diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-14 13:40:54 +0000 |
commit | 317c0644ccf108aa23ef3fd8358bd66c2840bfc0 (patch) | |
tree | c417b3d25c86b775989cb5ac042f37611b626c8a /src/sentinel.c | |
parent | Initial commit. (diff) | |
download | redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.tar.xz redis-317c0644ccf108aa23ef3fd8358bd66c2840bfc0.zip |
Adding upstream version 5:7.2.4.upstream/5%7.2.4
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/sentinel.c')
-rw-r--r-- | src/sentinel.c | 5484 |
1 files changed, 5484 insertions, 0 deletions
diff --git a/src/sentinel.c b/src/sentinel.c new file mode 100644 index 0000000..238be90 --- /dev/null +++ b/src/sentinel.c @@ -0,0 +1,5484 @@ +/* Redis Sentinel implementation + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "hiredis.h" +#if USE_OPENSSL == 1 /* BUILD_YES */ +#include "openssl/ssl.h" +#include "hiredis_ssl.h" +#endif +#include "async.h" + +#include <ctype.h> +#include <arpa/inet.h> +#include <sys/socket.h> +#include <sys/wait.h> +#include <fcntl.h> + +extern char **environ; + +#if USE_OPENSSL == 1 /* BUILD_YES */ +extern SSL_CTX *redis_tls_ctx; +extern SSL_CTX *redis_tls_client_ctx; +#endif + +#define REDIS_SENTINEL_PORT 26379 + +/* ======================== Sentinel global state =========================== */ + +/* Address object, used to describe an ip:port pair. */ +typedef struct sentinelAddr { + char *hostname; /* Hostname OR address, as specified */ + char *ip; /* Always a resolved address */ + int port; +} sentinelAddr; + +/* A Sentinel Redis Instance object is monitoring. */ +#define SRI_MASTER (1<<0) +#define SRI_SLAVE (1<<1) +#define SRI_SENTINEL (1<<2) +#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */ +#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */ +#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that + its master is down. */ +#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for + this master. */ +#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */ +#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */ +#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */ +#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */ +#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */ +#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */ +#define SRI_MASTER_REBOOT (1<<13) /* Master was detected as rebooting */ +/* Note: when adding new flags, please check the flags section in addReplySentinelRedisInstance. */ + +/* Note: times are in milliseconds. */ +#define SENTINEL_PING_PERIOD 1000 + +static mstime_t sentinel_info_period = 10000; +static mstime_t sentinel_ping_period = SENTINEL_PING_PERIOD; +static mstime_t sentinel_ask_period = 1000; +static mstime_t sentinel_publish_period = 2000; +static mstime_t sentinel_default_down_after = 30000; +static mstime_t sentinel_tilt_trigger = 2000; +static mstime_t sentinel_tilt_period = SENTINEL_PING_PERIOD * 30; +static mstime_t sentinel_slave_reconf_timeout = 10000; +static mstime_t sentinel_min_link_reconnect_period = 15000; +static mstime_t sentinel_election_timeout = 10000; +static mstime_t sentinel_script_max_runtime = 60000; /* 60 seconds max exec time. */ +static mstime_t sentinel_script_retry_delay = 30000; /* 30 seconds between retries. */ +static mstime_t sentinel_default_failover_timeout = 60*3*1000; + +#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" +#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100 +#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1 +#define SENTINEL_MAX_PENDING_COMMANDS 100 + +#define SENTINEL_MAX_DESYNC 1000 +#define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1 +#define SENTINEL_DEFAULT_RESOLVE_HOSTNAMES 0 +#define SENTINEL_DEFAULT_ANNOUNCE_HOSTNAMES 0 + +/* Failover machine different states. */ +#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */ +#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/ +#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */ +#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */ +#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */ +#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */ +#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */ + +#define SENTINEL_MASTER_LINK_STATUS_UP 0 +#define SENTINEL_MASTER_LINK_STATUS_DOWN 1 + +/* Generic flags that can be used with different functions. + * They use higher bits to avoid colliding with the function specific + * flags. */ +#define SENTINEL_NO_FLAGS 0 +#define SENTINEL_GENERATE_EVENT (1<<16) +#define SENTINEL_LEADER (1<<17) +#define SENTINEL_OBSERVER (1<<18) + +/* Script execution flags and limits. */ +#define SENTINEL_SCRIPT_NONE 0 +#define SENTINEL_SCRIPT_RUNNING 1 +#define SENTINEL_SCRIPT_MAX_QUEUE 256 +#define SENTINEL_SCRIPT_MAX_RUNNING 16 +#define SENTINEL_SCRIPT_MAX_RETRY 10 + +/* SENTINEL SIMULATE-FAILURE command flags. */ +#define SENTINEL_SIMFAILURE_NONE 0 +#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0) +#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1) + +/* The link to a sentinelRedisInstance. When we have the same set of Sentinels + * monitoring many masters, we have different instances representing the + * same Sentinels, one per master, and we need to share the hiredis connections + * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create + * 500 outgoing connections instead of 5. + * + * So this structure represents a reference counted link in terms of the two + * hiredis connections for commands and Pub/Sub, and the fields needed for + * failure detection, since the ping/pong time are now local to the link: if + * the link is available, the instance is available. This way we don't just + * have 5 connections instead of 500, we also send 5 pings instead of 500. + * + * Links are shared only for Sentinels: master and slave instances have + * a link with refcount = 1, always. */ +typedef struct instanceLink { + int refcount; /* Number of sentinelRedisInstance owners. */ + int disconnected; /* Non-zero if we need to reconnect cc or pc. */ + int pending_commands; /* Number of commands sent waiting for a reply. */ + redisAsyncContext *cc; /* Hiredis context for commands. */ + redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */ + mstime_t cc_conn_time; /* cc connection time. */ + mstime_t pc_conn_time; /* pc connection time. */ + mstime_t pc_last_activity; /* Last time we received any message. */ + mstime_t last_avail_time; /* Last time the instance replied to ping with + a reply we consider valid. */ + mstime_t act_ping_time; /* Time at which the last pending ping (no pong + received after it) was sent. This field is + set to 0 when a pong is received, and set again + to the current time if the value is 0 and a new + ping is sent. */ + mstime_t last_ping_time; /* Time at which we sent the last ping. This is + only used to avoid sending too many pings + during failure. Idle time is computed using + the act_ping_time field. */ + mstime_t last_pong_time; /* Last time the instance replied to ping, + whatever the reply was. That's used to check + if the link is idle and must be reconnected. */ + mstime_t last_reconn_time; /* Last reconnection attempt performed when + the link was down. */ +} instanceLink; + +typedef struct sentinelRedisInstance { + int flags; /* See SRI_... defines */ + char *name; /* Master name from the point of view of this sentinel. */ + char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/ + uint64_t config_epoch; /* Configuration epoch. */ + sentinelAddr *addr; /* Master host. */ + instanceLink *link; /* Link to the instance, may be shared for Sentinels. */ + mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */ + mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time + we received a hello from this Sentinel + via Pub/Sub. */ + mstime_t last_master_down_reply_time; /* Time of last reply to + SENTINEL is-master-down command. */ + mstime_t s_down_since_time; /* Subjectively down since time. */ + mstime_t o_down_since_time; /* Objectively down since time. */ + mstime_t down_after_period; /* Consider it down after that period. */ + mstime_t master_reboot_down_after_period; /* Consider master down after that period. */ + mstime_t master_reboot_since_time; /* master reboot time since time. */ + mstime_t info_refresh; /* Time at which we received INFO output from it. */ + dict *renamed_commands; /* Commands renamed in this instance: + Sentinel will use the alternative commands + mapped on this table to send things like + SLAVEOF, CONFIG, INFO, ... */ + + /* Role and the first time we observed it. + * This is useful in order to delay replacing what the instance reports + * with our own configuration. We need to always wait some time in order + * to give a chance to the leader to report the new configuration before + * we do silly things. */ + int role_reported; + mstime_t role_reported_time; + mstime_t slave_conf_change_time; /* Last time slave master addr changed. */ + + /* Master specific. */ + dict *sentinels; /* Other sentinels monitoring the same master. */ + dict *slaves; /* Slaves for this master instance. */ + unsigned int quorum;/* Number of sentinels that need to agree on failure. */ + int parallel_syncs; /* How many slaves to reconfigure at same time. */ + char *auth_pass; /* Password to use for AUTH against master & replica. */ + char *auth_user; /* Username for ACLs AUTH against master & replica. */ + + /* Slave specific. */ + mstime_t master_link_down_time; /* Slave replication link down time. */ + int slave_priority; /* Slave priority according to its INFO output. */ + int replica_announced; /* Replica announcing according to its INFO output. */ + mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */ + struct sentinelRedisInstance *master; /* Master instance if it's slave. */ + char *slave_master_host; /* Master host as reported by INFO */ + int slave_master_port; /* Master port as reported by INFO */ + int slave_master_link_status; /* Master link status as reported by INFO */ + unsigned long long slave_repl_offset; /* Slave replication offset. */ + /* Failover */ + char *leader; /* If this is a master instance, this is the runid of + the Sentinel that should perform the failover. If + this is a Sentinel, this is the runid of the Sentinel + that this Sentinel voted as leader. */ + uint64_t leader_epoch; /* Epoch of the 'leader' field. */ + uint64_t failover_epoch; /* Epoch of the currently started failover. */ + int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */ + mstime_t failover_state_change_time; + mstime_t failover_start_time; /* Last failover attempt start time. */ + mstime_t failover_timeout; /* Max time to refresh failover state. */ + mstime_t failover_delay_logged; /* For what failover_start_time value we + logged the failover delay. */ + struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */ + /* Scripts executed to notify admin or reconfigure clients: when they + * are set to NULL no script is executed. */ + char *notification_script; + char *client_reconfig_script; + sds info; /* cached INFO output */ +} sentinelRedisInstance; + +/* Main state. */ +struct sentinelState { + char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */ + uint64_t current_epoch; /* Current epoch. */ + dict *masters; /* Dictionary of master sentinelRedisInstances. + Key is the instance name, value is the + sentinelRedisInstance structure pointer. */ + int tilt; /* Are we in TILT mode? */ + int running_scripts; /* Number of scripts in execution right now. */ + mstime_t tilt_start_time; /* When TITL started. */ + mstime_t previous_time; /* Last time we ran the time handler. */ + list *scripts_queue; /* Queue of user scripts to execute. */ + char *announce_ip; /* IP addr that is gossiped to other sentinels if + not NULL. */ + int announce_port; /* Port that is gossiped to other sentinels if + non zero. */ + unsigned long simfailure_flags; /* Failures simulation. */ + int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script + paths at runtime? */ + char *sentinel_auth_pass; /* Password to use for AUTH against other sentinel */ + char *sentinel_auth_user; /* Username for ACLs AUTH against other sentinel. */ + int resolve_hostnames; /* Support use of hostnames, assuming DNS is well configured. */ + int announce_hostnames; /* Announce hostnames instead of IPs when we have them. */ +} sentinel; + +/* A script execution job. */ +typedef struct sentinelScriptJob { + int flags; /* Script job flags: SENTINEL_SCRIPT_* */ + int retry_num; /* Number of times we tried to execute it. */ + char **argv; /* Arguments to call the script. */ + mstime_t start_time; /* Script execution time if the script is running, + otherwise 0 if we are allowed to retry the + execution at any time. If the script is not + running and it's not 0, it means: do not run + before the specified time. */ + pid_t pid; /* Script execution pid. */ +} sentinelScriptJob; + +/* ======================= hiredis ae.c adapters ============================= + * Note: this implementation is taken from hiredis/adapters/ae.h, however + * we have our modified copy for Sentinel in order to use our allocator + * and to have full control over how the adapter works. */ + +typedef struct redisAeEvents { + redisAsyncContext *context; + aeEventLoop *loop; + int fd; + int reading, writing; +} redisAeEvents; + +static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleRead(e->context); +} + +static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleWrite(e->context); +} + +static void redisAeAddRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->reading) { + e->reading = 1; + aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); + } +} + +static void redisAeDelRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->reading) { + e->reading = 0; + aeDeleteFileEvent(loop,e->fd,AE_READABLE); + } +} + +static void redisAeAddWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->writing) { + e->writing = 1; + aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); + } +} + +static void redisAeDelWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->writing) { + e->writing = 0; + aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); + } +} + +static void redisAeCleanup(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + redisAeDelRead(privdata); + redisAeDelWrite(privdata); + zfree(e); +} + +static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisAeEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return C_ERR; + + /* Create container for context and r/w events */ + e = (redisAeEvents*)zmalloc(sizeof(*e)); + e->context = ac; + e->loop = loop; + e->fd = c->fd; + e->reading = e->writing = 0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisAeAddRead; + ac->ev.delRead = redisAeDelRead; + ac->ev.addWrite = redisAeAddWrite; + ac->ev.delWrite = redisAeDelWrite; + ac->ev.cleanup = redisAeCleanup; + ac->ev.data = e; + + return C_OK; +} + +/* ============================= Prototypes ================================= */ + +void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status); +void sentinelDisconnectCallback(const redisAsyncContext *c, int status); +void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata); +sentinelRedisInstance *sentinelGetMasterByName(char *name); +char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master); +char *sentinelGetObjectiveLeader(sentinelRedisInstance *master); +void instanceLinkConnectionError(const redisAsyncContext *c); +const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri); +void sentinelAbortFailover(sentinelRedisInstance *ri); +void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...); +sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master); +void sentinelScheduleScriptExecution(char *path, ...); +void sentinelStartFailover(sentinelRedisInstance *master); +void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata); +int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr); +char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch); +int sentinelFlushConfig(void); +void sentinelGenerateInitialMonitorEvents(void); +int sentinelSendPing(sentinelRedisInstance *ri); +int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master); +sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid); +void sentinelSimFailureCrash(void); + +/* ========================= Dictionary types =============================== */ + +void releaseSentinelRedisInstance(sentinelRedisInstance *ri); + +void dictInstancesValDestructor (dict *d, void *obj) { + UNUSED(d); + releaseSentinelRedisInstance(obj); +} + +/* Instance name (sds) -> instance (sentinelRedisInstance pointer) + * + * also used for: sentinelRedisInstance->sentinels dictionary that maps + * sentinels ip:port to last seen time in Pub/Sub hello message. */ +dictType instancesDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + dictInstancesValDestructor,/* val destructor */ + NULL /* allow to expand */ +}; + +/* Instance runid (sds) -> votes (long casted to void*) + * + * This is useful into sentinelGetObjectiveLeader() function in order to + * count the votes and understand who is the leader. */ +dictType leaderVotesDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +/* Instance renamed commands table. */ +dictType renamedCommandsDictType = { + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictSdsDestructor, /* val destructor */ + NULL /* allow to expand */ +}; + +/* =========================== Initialization =============================== */ + +void sentinelSetCommand(client *c); +void sentinelConfigGetCommand(client *c); +void sentinelConfigSetCommand(client *c); + +/* this array is used for sentinel config lookup, which need to be loaded + * before monitoring masters config to avoid dependency issues */ +const char *preMonitorCfgName[] = { + "announce-ip", + "announce-port", + "deny-scripts-reconfig", + "sentinel-user", + "sentinel-pass", + "current-epoch", + "myid", + "resolve-hostnames", + "announce-hostnames" +}; + +/* This function overwrites a few normal Redis config default with Sentinel + * specific defaults. */ +void initSentinelConfig(void) { + server.port = REDIS_SENTINEL_PORT; + server.protected_mode = 0; /* Sentinel must be exposed. */ +} + +void freeSentinelLoadQueueEntry(void *item); + +/* Perform the Sentinel mode initialization. */ +void initSentinel(void) { + /* Initialize various data structures. */ + sentinel.current_epoch = 0; + sentinel.masters = dictCreate(&instancesDictType); + sentinel.tilt = 0; + sentinel.tilt_start_time = 0; + sentinel.previous_time = mstime(); + sentinel.running_scripts = 0; + sentinel.scripts_queue = listCreate(); + sentinel.announce_ip = NULL; + sentinel.announce_port = 0; + sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; + sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG; + sentinel.sentinel_auth_pass = NULL; + sentinel.sentinel_auth_user = NULL; + sentinel.resolve_hostnames = SENTINEL_DEFAULT_RESOLVE_HOSTNAMES; + sentinel.announce_hostnames = SENTINEL_DEFAULT_ANNOUNCE_HOSTNAMES; + memset(sentinel.myid,0,sizeof(sentinel.myid)); + server.sentinel_config = NULL; +} + +/* This function is for checking whether sentinel config file has been set, + * also checking whether we have write permissions. */ +void sentinelCheckConfigFile(void) { + if (server.configfile == NULL) { + serverLog(LL_WARNING, + "Sentinel needs config file on disk to save state. Exiting..."); + exit(1); + } else if (access(server.configfile,W_OK) == -1) { + serverLog(LL_WARNING, + "Sentinel config file %s is not writable: %s. Exiting...", + server.configfile,strerror(errno)); + exit(1); + } +} + +/* This function gets called when the server is in Sentinel mode, started, + * loaded the configuration, and is ready for normal operations. */ +void sentinelIsRunning(void) { + int j; + + /* If this Sentinel has yet no ID set in the configuration file, we + * pick a random one and persist the config on disk. From now on this + * will be this Sentinel ID across restarts. */ + for (j = 0; j < CONFIG_RUN_ID_SIZE; j++) + if (sentinel.myid[j] != 0) break; + + if (j == CONFIG_RUN_ID_SIZE) { + /* Pick ID and persist the config. */ + getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE); + sentinelFlushConfig(); + } + + /* Log its ID to make debugging of issues simpler. */ + serverLog(LL_NOTICE,"Sentinel ID is %s", sentinel.myid); + + /* We want to generate a +monitor event for every configured master + * at startup. */ + sentinelGenerateInitialMonitorEvents(); +} + +/* ============================== sentinelAddr ============================== */ + +/* Create a sentinelAddr object and return it on success. + * On error NULL is returned and errno is set to: + * ENOENT: Can't resolve the hostname, unless accept_unresolved is non-zero. + * EINVAL: Invalid port number. + */ +sentinelAddr *createSentinelAddr(char *hostname, int port, int is_accept_unresolved) { + char ip[NET_IP_STR_LEN]; + sentinelAddr *sa; + + if (port < 0 || port > 65535) { + errno = EINVAL; + return NULL; + } + if (anetResolve(NULL,hostname,ip,sizeof(ip), + sentinel.resolve_hostnames ? ANET_NONE : ANET_IP_ONLY) == ANET_ERR) { + serverLog(LL_WARNING, "Failed to resolve hostname '%s'", hostname); + if (sentinel.resolve_hostnames && is_accept_unresolved) { + ip[0] = '\0'; + } + else { + errno = ENOENT; + return NULL; + } + } + sa = zmalloc(sizeof(*sa)); + sa->hostname = sdsnew(hostname); + sa->ip = sdsnew(ip); + sa->port = port; + return sa; +} + +/* Return a duplicate of the source address. */ +sentinelAddr *dupSentinelAddr(sentinelAddr *src) { + sentinelAddr *sa; + + sa = zmalloc(sizeof(*sa)); + sa->hostname = sdsnew(src->hostname); + sa->ip = sdsnew(src->ip); + sa->port = src->port; + return sa; +} + +/* Free a Sentinel address. Can't fail. */ +void releaseSentinelAddr(sentinelAddr *sa) { + sdsfree(sa->hostname); + sdsfree(sa->ip); + zfree(sa); +} + +/* Return non-zero if the two addresses are equal, either by address + * or by hostname if they could not have been resolved. + */ +int sentinelAddrOrHostnameEqual(sentinelAddr *a, sentinelAddr *b) { + return a->port == b->port && + (!strcmp(a->ip, b->ip) || + !strcasecmp(a->hostname, b->hostname)); +} + +/* Return non-zero if a hostname matches an address. */ +int sentinelAddrEqualsHostname(sentinelAddr *a, char *hostname) { + char ip[NET_IP_STR_LEN]; + + /* Try resolve the hostname and compare it to the address */ + if (anetResolve(NULL, hostname, ip, sizeof(ip), + sentinel.resolve_hostnames ? ANET_NONE : ANET_IP_ONLY) == ANET_ERR) { + + /* If failed resolve then compare based on hostnames. That is our best effort as + * long as the server is unavailable for some reason. It is fine since Redis + * instance cannot have multiple hostnames for a given setup */ + return !strcasecmp(sentinel.resolve_hostnames ? a->hostname : a->ip, hostname); + } + /* Compare based on address */ + return !strcasecmp(a->ip, ip); +} + +const char *announceSentinelAddr(const sentinelAddr *a) { + return sentinel.announce_hostnames ? a->hostname : a->ip; +} + +/* Return an allocated sds with hostname/address:port. IPv6 + * addresses are bracketed the same way anetFormatAddr() does. + */ +sds announceSentinelAddrAndPort(const sentinelAddr *a) { + const char *addr = announceSentinelAddr(a); + if (strchr(addr, ':') != NULL) + return sdscatprintf(sdsempty(), "[%s]:%d", addr, a->port); + else + return sdscatprintf(sdsempty(), "%s:%d", addr, a->port); +} + +/* =========================== Events notification ========================== */ + +/* Send an event to log, pub/sub, user notification script. + * + * 'level' is the log level for logging. Only LL_WARNING events will trigger + * the execution of the user notification script. + * + * 'type' is the message type, also used as a pub/sub channel name. + * + * 'ri', is the redis instance target of this event if applicable, and is + * used to obtain the path of the notification script to execute. + * + * The remaining arguments are printf-alike. + * If the format specifier starts with the two characters "%@" then ri is + * not NULL, and the message is prefixed with an instance identifier in the + * following format: + * + * <instance type> <instance name> <ip> <port> + * + * If the instance type is not master, than the additional string is + * added to specify the originating master: + * + * @ <master name> <master ip> <master port> + * + * Any other specifier after "%@" is processed by printf itself. + */ +void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, + const char *fmt, ...) { + va_list ap; + char msg[LOG_MAX_LEN]; + robj *channel, *payload; + + /* Handle %@ */ + if (fmt[0] == '%' && fmt[1] == '@') { + sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? + NULL : ri->master; + + if (master) { + snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d", + sentinelRedisInstanceTypeStr(ri), + ri->name, announceSentinelAddr(ri->addr), ri->addr->port, + master->name, announceSentinelAddr(master->addr), master->addr->port); + } else { + snprintf(msg, sizeof(msg), "%s %s %s %d", + sentinelRedisInstanceTypeStr(ri), + ri->name, announceSentinelAddr(ri->addr), ri->addr->port); + } + fmt += 2; + } else { + msg[0] = '\0'; + } + + /* Use vsprintf for the rest of the formatting if any. */ + if (fmt[0] != '\0') { + va_start(ap, fmt); + vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap); + va_end(ap); + } + + /* Log the message if the log level allows it to be logged. */ + if (level >= server.verbosity) + serverLog(level,"%s %s",type,msg); + + /* Publish the message via Pub/Sub if it's not a debugging one. */ + if (level != LL_DEBUG) { + channel = createStringObject(type,strlen(type)); + payload = createStringObject(msg,strlen(msg)); + pubsubPublishMessage(channel,payload,0); + decrRefCount(channel); + decrRefCount(payload); + } + + /* Call the notification script if applicable. */ + if (level == LL_WARNING && ri != NULL) { + sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? + ri : ri->master; + if (master && master->notification_script) { + sentinelScheduleScriptExecution(master->notification_script, + type,msg,NULL); + } + } +} + +/* This function is called only at startup and is used to generate a + * +monitor event for every configured master. The same events are also + * generated when a master to monitor is added at runtime via the + * SENTINEL MONITOR command. */ +void sentinelGenerateInitialMonitorEvents(void) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); + } + dictReleaseIterator(di); +} + +/* ============================ script execution ============================ */ + +/* Release a script job structure and all the associated data. */ +void sentinelReleaseScriptJob(sentinelScriptJob *sj) { + int j = 0; + + while(sj->argv[j]) sdsfree(sj->argv[j++]); + zfree(sj->argv); + zfree(sj); +} + +#define SENTINEL_SCRIPT_MAX_ARGS 16 +void sentinelScheduleScriptExecution(char *path, ...) { + va_list ap; + char *argv[SENTINEL_SCRIPT_MAX_ARGS+1]; + int argc = 1; + sentinelScriptJob *sj; + + va_start(ap, path); + while(argc < SENTINEL_SCRIPT_MAX_ARGS) { + argv[argc] = va_arg(ap,char*); + if (!argv[argc]) break; + argv[argc] = sdsnew(argv[argc]); /* Copy the string. */ + argc++; + } + va_end(ap); + argv[0] = sdsnew(path); + + sj = zmalloc(sizeof(*sj)); + sj->flags = SENTINEL_SCRIPT_NONE; + sj->retry_num = 0; + sj->argv = zmalloc(sizeof(char*)*(argc+1)); + sj->start_time = 0; + sj->pid = 0; + memcpy(sj->argv,argv,sizeof(char*)*(argc+1)); + + listAddNodeTail(sentinel.scripts_queue,sj); + + /* Remove the oldest non running script if we already hit the limit. */ + if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) { + listNode *ln; + listIter li; + + listRewind(sentinel.scripts_queue,&li); + while ((ln = listNext(&li)) != NULL) { + sj = ln->value; + + if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue; + /* The first node is the oldest as we add on tail. */ + listDelNode(sentinel.scripts_queue,ln); + sentinelReleaseScriptJob(sj); + break; + } + serverAssert(listLength(sentinel.scripts_queue) <= + SENTINEL_SCRIPT_MAX_QUEUE); + } +} + +/* Lookup a script in the scripts queue via pid, and returns the list node + * (so that we can easily remove it from the queue if needed). */ +listNode *sentinelGetScriptListNodeByPid(pid_t pid) { + listNode *ln; + listIter li; + + listRewind(sentinel.scripts_queue,&li); + while ((ln = listNext(&li)) != NULL) { + sentinelScriptJob *sj = ln->value; + + if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid) + return ln; + } + return NULL; +} + +/* Run pending scripts if we are not already at max number of running + * scripts. */ +void sentinelRunPendingScripts(void) { + listNode *ln; + listIter li; + mstime_t now = mstime(); + + /* Find jobs that are not running and run them, from the top to the + * tail of the queue, so we run older jobs first. */ + listRewind(sentinel.scripts_queue,&li); + while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING && + (ln = listNext(&li)) != NULL) + { + sentinelScriptJob *sj = ln->value; + pid_t pid; + + /* Skip if already running. */ + if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue; + + /* Skip if it's a retry, but not enough time has elapsed. */ + if (sj->start_time && sj->start_time > now) continue; + + sj->flags |= SENTINEL_SCRIPT_RUNNING; + sj->start_time = mstime(); + sj->retry_num++; + pid = fork(); + + if (pid == -1) { + /* Parent (fork error). + * We report fork errors as signal 99, in order to unify the + * reporting with other kind of errors. */ + sentinelEvent(LL_WARNING,"-script-error",NULL, + "%s %d %d", sj->argv[0], 99, 0); + sj->flags &= ~SENTINEL_SCRIPT_RUNNING; + sj->pid = 0; + } else if (pid == 0) { + /* Child */ + connTypeCleanupAll(); + execve(sj->argv[0],sj->argv,environ); + /* If we are here an error occurred. */ + _exit(2); /* Don't retry execution. */ + } else { + sentinel.running_scripts++; + sj->pid = pid; + sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid); + } + } +} + +/* How much to delay the execution of a script that we need to retry after + * an error? + * + * We double the retry delay for every further retry we do. So for instance + * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10 + * starting from the second attempt to execute the script the delays are: + * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */ +mstime_t sentinelScriptRetryDelay(int retry_num) { + mstime_t delay = sentinel_script_retry_delay; + + while (retry_num-- > 1) delay *= 2; + return delay; +} + +/* Check for scripts that terminated, and remove them from the queue if the + * script terminated successfully. If instead the script was terminated by + * a signal, or returned exit code "1", it is scheduled to run again if + * the max number of retries did not already elapsed. */ +void sentinelCollectTerminatedScripts(void) { + int statloc; + pid_t pid; + + while ((pid = waitpid(-1, &statloc, WNOHANG)) > 0) { + int exitcode = WEXITSTATUS(statloc); + int bysignal = 0; + listNode *ln; + sentinelScriptJob *sj; + + if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); + sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d", + (long)pid, exitcode, bysignal); + + ln = sentinelGetScriptListNodeByPid(pid); + if (ln == NULL) { + serverLog(LL_WARNING,"waitpid() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid); + continue; + } + sj = ln->value; + + /* If the script was terminated by a signal or returns an + * exit code of "1" (that means: please retry), we reschedule it + * if the max number of retries is not already reached. */ + if ((bysignal || exitcode == 1) && + sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY) + { + sj->flags &= ~SENTINEL_SCRIPT_RUNNING; + sj->pid = 0; + sj->start_time = mstime() + + sentinelScriptRetryDelay(sj->retry_num); + } else { + /* Otherwise let's remove the script, but log the event if the + * execution did not terminated in the best of the ways. */ + if (bysignal || exitcode != 0) { + sentinelEvent(LL_WARNING,"-script-error",NULL, + "%s %d %d", sj->argv[0], bysignal, exitcode); + } + listDelNode(sentinel.scripts_queue,ln); + sentinelReleaseScriptJob(sj); + } + sentinel.running_scripts--; + } +} + +/* Kill scripts in timeout, they'll be collected by the + * sentinelCollectTerminatedScripts() function. */ +void sentinelKillTimedoutScripts(void) { + listNode *ln; + listIter li; + mstime_t now = mstime(); + + listRewind(sentinel.scripts_queue,&li); + while ((ln = listNext(&li)) != NULL) { + sentinelScriptJob *sj = ln->value; + + if (sj->flags & SENTINEL_SCRIPT_RUNNING && + (now - sj->start_time) > sentinel_script_max_runtime) + { + sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld", + sj->argv[0], (long)sj->pid); + kill(sj->pid,SIGKILL); + } + } +} + +/* Implements SENTINEL PENDING-SCRIPTS command. */ +void sentinelPendingScriptsCommand(client *c) { + listNode *ln; + listIter li; + + addReplyArrayLen(c,listLength(sentinel.scripts_queue)); + listRewind(sentinel.scripts_queue,&li); + while ((ln = listNext(&li)) != NULL) { + sentinelScriptJob *sj = ln->value; + int j = 0; + + addReplyMapLen(c,5); + + addReplyBulkCString(c,"argv"); + while (sj->argv[j]) j++; + addReplyArrayLen(c,j); + j = 0; + while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]); + + addReplyBulkCString(c,"flags"); + addReplyBulkCString(c, + (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled"); + + addReplyBulkCString(c,"pid"); + addReplyBulkLongLong(c,sj->pid); + + if (sj->flags & SENTINEL_SCRIPT_RUNNING) { + addReplyBulkCString(c,"run-time"); + addReplyBulkLongLong(c,mstime() - sj->start_time); + } else { + mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0; + if (delay < 0) delay = 0; + addReplyBulkCString(c,"run-delay"); + addReplyBulkLongLong(c,delay); + } + + addReplyBulkCString(c,"retry-num"); + addReplyBulkLongLong(c,sj->retry_num); + } +} + +/* This function calls, if any, the client reconfiguration script with the + * following parameters: + * + * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> + * + * It is called every time a failover is performed. + * + * <state> is currently always "start". + * <role> is either "leader" or "observer". + * + * from/to fields are respectively master -> promoted slave addresses for + * "start" and "end". */ +void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) { + char fromport[32], toport[32]; + + if (master->client_reconfig_script == NULL) return; + ll2string(fromport,sizeof(fromport),from->port); + ll2string(toport,sizeof(toport),to->port); + sentinelScheduleScriptExecution(master->client_reconfig_script, + master->name, + (role == SENTINEL_LEADER) ? "leader" : "observer", + state, announceSentinelAddr(from), fromport, + announceSentinelAddr(to), toport, NULL); +} + +/* =============================== instanceLink ============================= */ + +/* Create a not yet connected link object. */ +instanceLink *createInstanceLink(void) { + instanceLink *link = zmalloc(sizeof(*link)); + + link->refcount = 1; + link->disconnected = 1; + link->pending_commands = 0; + link->cc = NULL; + link->pc = NULL; + link->cc_conn_time = 0; + link->pc_conn_time = 0; + link->last_reconn_time = 0; + link->pc_last_activity = 0; + /* We set the act_ping_time to "now" even if we actually don't have yet + * a connection with the node, nor we sent a ping. + * This is useful to detect a timeout in case we'll not be able to connect + * with the node at all. */ + link->act_ping_time = mstime(); + link->last_ping_time = 0; + link->last_avail_time = mstime(); + link->last_pong_time = mstime(); + return link; +} + +/* Disconnect a hiredis connection in the context of an instance link. */ +void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) { + if (c == NULL) return; + + if (link->cc == c) { + link->cc = NULL; + link->pending_commands = 0; + } + if (link->pc == c) link->pc = NULL; + c->data = NULL; + link->disconnected = 1; + redisAsyncFree(c); +} + +/* Decrement the refcount of a link object, if it drops to zero, actually + * free it and return NULL. Otherwise don't do anything and return the pointer + * to the object. + * + * If we are not going to free the link and ri is not NULL, we rebind all the + * pending requests in link->cc (hiredis connection for commands) to a + * callback that will just ignore them. This is useful to avoid processing + * replies for an instance that no longer exists. */ +instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri) +{ + serverAssert(link->refcount > 0); + link->refcount--; + if (link->refcount != 0) { + if (ri && ri->link->cc) { + /* This instance may have pending callbacks in the hiredis async + * context, having as 'privdata' the instance that we are going to + * free. Let's rewrite the callback list, directly exploiting + * hiredis internal data structures, in order to bind them with + * a callback that will ignore the reply at all. */ + redisCallback *cb; + redisCallbackList *callbacks = &link->cc->replies; + + cb = callbacks->head; + while(cb) { + if (cb->privdata == ri) { + cb->fn = sentinelDiscardReplyCallback; + cb->privdata = NULL; /* Not strictly needed. */ + } + cb = cb->next; + } + } + return link; /* Other active users. */ + } + + instanceLinkCloseConnection(link,link->cc); + instanceLinkCloseConnection(link,link->pc); + zfree(link); + return NULL; +} + +/* This function will attempt to share the instance link we already have + * for the same Sentinel in the context of a different master, with the + * instance we are passing as argument. + * + * This way multiple Sentinel objects that refer all to the same physical + * Sentinel instance but in the context of different masters will use + * a single connection, will send a single PING per second for failure + * detection and so forth. + * + * Return C_OK if a matching Sentinel was found in the context of a + * different master and sharing was performed. Otherwise C_ERR + * is returned. */ +int sentinelTryConnectionSharing(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_SENTINEL); + dictIterator *di; + dictEntry *de; + + if (ri->runid == NULL) return C_ERR; /* No way to identify it. */ + if (ri->link->refcount > 1) return C_ERR; /* Already shared. */ + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master = dictGetVal(de), *match; + /* We want to share with the same physical Sentinel referenced + * in other masters, so skip our master. */ + if (master == ri->master) continue; + match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels, + NULL,0,ri->runid); + if (match == NULL) continue; /* No match. */ + if (match == ri) continue; /* Should never happen but... safer. */ + + /* We identified a matching Sentinel, great! Let's free our link + * and use the one of the matching Sentinel. */ + releaseInstanceLink(ri->link,NULL); + ri->link = match->link; + match->link->refcount++; + dictReleaseIterator(di); + return C_OK; + } + dictReleaseIterator(di); + return C_ERR; +} + +/* Disconnect the relevant master and its replicas. */ +void dropInstanceConnections(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_MASTER); + + /* Disconnect with the master. */ + instanceLinkCloseConnection(ri->link, ri->link->cc); + instanceLinkCloseConnection(ri->link, ri->link->pc); + + /* Disconnect with all replicas. */ + dictIterator *di; + dictEntry *de; + sentinelRedisInstance *repl_ri; + di = dictGetIterator(ri->slaves); + while ((de = dictNext(di)) != NULL) { + repl_ri = dictGetVal(de); + instanceLinkCloseConnection(repl_ri->link, repl_ri->link->cc); + instanceLinkCloseConnection(repl_ri->link, repl_ri->link->pc); + } + dictReleaseIterator(di); +} + +/* Drop all connections to other sentinels. Returns the number of connections + * dropped.*/ +int sentinelDropConnections(void) { + dictIterator *di; + dictEntry *de; + int dropped = 0; + + di = dictGetIterator(sentinel.masters); + while ((de = dictNext(di)) != NULL) { + dictIterator *sdi; + dictEntry *sde; + + sentinelRedisInstance *ri = dictGetVal(de); + sdi = dictGetIterator(ri->sentinels); + while ((sde = dictNext(sdi)) != NULL) { + sentinelRedisInstance *si = dictGetVal(sde); + if (!si->link->disconnected) { + instanceLinkCloseConnection(si->link, si->link->pc); + instanceLinkCloseConnection(si->link, si->link->cc); + dropped++; + } + } + dictReleaseIterator(sdi); + } + dictReleaseIterator(di); + + return dropped; +} + +/* When we detect a Sentinel to switch address (reporting a different IP/port + * pair in Hello messages), let's update all the matching Sentinels in the + * context of other masters as well and disconnect the links, so that everybody + * will be updated. + * + * Return the number of updated Sentinel addresses. */ +int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_SENTINEL); + dictIterator *di; + dictEntry *de; + int reconfigured = 0; + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master = dictGetVal(de), *match; + match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels, + NULL,0,ri->runid); + /* If there is no match, this master does not know about this + * Sentinel, try with the next one. */ + if (match == NULL) continue; + + /* Disconnect the old links if connected. */ + if (match->link->cc != NULL) + instanceLinkCloseConnection(match->link,match->link->cc); + if (match->link->pc != NULL) + instanceLinkCloseConnection(match->link,match->link->pc); + + if (match == ri) continue; /* Address already updated for it. */ + + /* Update the address of the matching Sentinel by copying the address + * of the Sentinel object that received the address update. */ + releaseSentinelAddr(match->addr); + match->addr = dupSentinelAddr(ri->addr); + reconfigured++; + } + dictReleaseIterator(di); + if (reconfigured) + sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri, + "%@ %d additional matching instances", reconfigured); + return reconfigured; +} + +/* This function is called when a hiredis connection reported an error. + * We set it to NULL and mark the link as disconnected so that it will be + * reconnected again. + * + * Note: we don't free the hiredis context as hiredis will do it for us + * for async connections. */ +void instanceLinkConnectionError(const redisAsyncContext *c) { + instanceLink *link = c->data; + int pubsub; + + if (!link) return; + + pubsub = (link->pc == c); + if (pubsub) + link->pc = NULL; + else + link->cc = NULL; + link->disconnected = 1; +} + +/* Hiredis connection established / disconnected callbacks. We need them + * just to cleanup our link state. */ +void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) { + if (status != C_OK) instanceLinkConnectionError(c); +} + +void sentinelDisconnectCallback(const redisAsyncContext *c, int status) { + UNUSED(status); + instanceLinkConnectionError(c); +} + +/* ========================== sentinelRedisInstance ========================= */ + +/* Create a redis instance, the following fields must be populated by the + * caller if needed: + * runid: set to NULL but will be populated once INFO output is received. + * info_refresh: is set to 0 to mean that we never received INFO so far. + * + * If SRI_MASTER is set into initial flags the instance is added to + * sentinel.masters table. + * + * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the + * instance is added into master->slaves or master->sentinels table. + * + * If the instance is a slave, the name parameter is ignored and is created + * automatically as ip/hostname:port. + * + * The function fails if hostname can't be resolved or port is out of range. + * When this happens NULL is returned and errno is set accordingly to the + * createSentinelAddr() function. + * + * The function may also fail and return NULL with errno set to EBUSY if + * a master with the same name, a slave with the same address, or a sentinel + * with the same ID already exists. */ + +sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) { + sentinelRedisInstance *ri; + sentinelAddr *addr; + dict *table = NULL; + sds sdsname; + + serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL)); + serverAssert((flags & SRI_MASTER) || master != NULL); + + /* Check address validity. */ + addr = createSentinelAddr(hostname,port,1); + if (addr == NULL) return NULL; + + /* For slaves use ip/host:port as name. */ + if (flags & SRI_SLAVE) + sdsname = announceSentinelAddrAndPort(addr); + else + sdsname = sdsnew(name); + + /* Make sure the entry is not duplicated. This may happen when the same + * name for a master is used multiple times inside the configuration or + * if we try to add multiple times a slave or sentinel with same ip/port + * to a master. */ + if (flags & SRI_MASTER) table = sentinel.masters; + else if (flags & SRI_SLAVE) table = master->slaves; + else if (flags & SRI_SENTINEL) table = master->sentinels; + if (dictFind(table,sdsname)) { + releaseSentinelAddr(addr); + sdsfree(sdsname); + errno = EBUSY; + return NULL; + } + + /* Create the instance object. */ + ri = zmalloc(sizeof(*ri)); + /* Note that all the instances are started in the disconnected state, + * the event loop will take care of connecting them. */ + ri->flags = flags; + ri->name = sdsname; + ri->runid = NULL; + ri->config_epoch = 0; + ri->addr = addr; + ri->link = createInstanceLink(); + ri->last_pub_time = mstime(); + ri->last_hello_time = mstime(); + ri->last_master_down_reply_time = mstime(); + ri->s_down_since_time = 0; + ri->o_down_since_time = 0; + ri->down_after_period = master ? master->down_after_period : sentinel_default_down_after; + ri->master_reboot_down_after_period = 0; + ri->master_link_down_time = 0; + ri->auth_pass = NULL; + ri->auth_user = NULL; + ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; + ri->replica_announced = 1; + ri->slave_reconf_sent_time = 0; + ri->slave_master_host = NULL; + ri->slave_master_port = 0; + ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN; + ri->slave_repl_offset = 0; + ri->sentinels = dictCreate(&instancesDictType); + ri->quorum = quorum; + ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS; + ri->master = master; + ri->slaves = dictCreate(&instancesDictType); + ri->info_refresh = 0; + ri->renamed_commands = dictCreate(&renamedCommandsDictType); + + /* Failover state. */ + ri->leader = NULL; + ri->leader_epoch = 0; + ri->failover_epoch = 0; + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = 0; + ri->failover_start_time = 0; + ri->failover_timeout = sentinel_default_failover_timeout; + ri->failover_delay_logged = 0; + ri->promoted_slave = NULL; + ri->notification_script = NULL; + ri->client_reconfig_script = NULL; + ri->info = NULL; + + /* Role */ + ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE); + ri->role_reported_time = mstime(); + ri->slave_conf_change_time = mstime(); + + /* Add into the right table. */ + dictAdd(table, ri->name, ri); + return ri; +} + +/* Release this instance and all its slaves, sentinels, hiredis connections. + * This function does not take care of unlinking the instance from the main + * masters table (if it is a master) or from its master sentinels/slaves table + * if it is a slave or sentinel. */ +void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { + /* Release all its slaves or sentinels if any. */ + dictRelease(ri->sentinels); + dictRelease(ri->slaves); + + /* Disconnect the instance. */ + releaseInstanceLink(ri->link,ri); + + /* Free other resources. */ + sdsfree(ri->name); + sdsfree(ri->runid); + sdsfree(ri->notification_script); + sdsfree(ri->client_reconfig_script); + sdsfree(ri->slave_master_host); + sdsfree(ri->leader); + sdsfree(ri->auth_pass); + sdsfree(ri->auth_user); + sdsfree(ri->info); + releaseSentinelAddr(ri->addr); + dictRelease(ri->renamed_commands); + + /* Clear state into the master if needed. */ + if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master) + ri->master->promoted_slave = NULL; + + zfree(ri); +} + +/* Lookup a slave in a master Redis instance, by ip and port. */ +sentinelRedisInstance *sentinelRedisInstanceLookupSlave( + sentinelRedisInstance *ri, char *slave_addr, int port) +{ + sds key; + sentinelRedisInstance *slave; + sentinelAddr *addr; + + serverAssert(ri->flags & SRI_MASTER); + + /* We need to handle a slave_addr that is potentially a hostname. + * If that is the case, depending on configuration we either resolve + * it and use the IP address or fail. + */ + addr = createSentinelAddr(slave_addr, port, 0); + if (!addr) return NULL; + key = announceSentinelAddrAndPort(addr); + releaseSentinelAddr(addr); + + slave = dictFetchValue(ri->slaves,key); + sdsfree(key); + return slave; +} + +/* Return the name of the type of the instance as a string. */ +const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) { + if (ri->flags & SRI_MASTER) return "master"; + else if (ri->flags & SRI_SLAVE) return "slave"; + else if (ri->flags & SRI_SENTINEL) return "sentinel"; + else return "unknown"; +} + +/* This function remove the Sentinel with the specified ID from the + * specified master. + * + * If "runid" is NULL the function returns ASAP. + * + * This function is useful because on Sentinels address switch, we want to + * remove our old entry and add a new one for the same ID but with the new + * address. + * + * The function returns 1 if the matching Sentinel was removed, otherwise + * 0 if there was no Sentinel with this ID. */ +int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) { + dictIterator *di; + dictEntry *de; + int removed = 0; + + if (runid == NULL) return 0; + + di = dictGetSafeIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->runid && strcmp(ri->runid,runid) == 0) { + dictDelete(master->sentinels,ri->name); + removed++; + } + } + dictReleaseIterator(di); + return removed; +} + +/* Search an instance with the same runid, ip and port into a dictionary + * of instances. Return NULL if not found, otherwise return the instance + * pointer. + * + * runid or addr can be NULL. In such a case the search is performed only + * by the non-NULL field. */ +sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *addr, int port, char *runid) { + dictIterator *di; + dictEntry *de; + sentinelRedisInstance *instance = NULL; + sentinelAddr *ri_addr = NULL; + + serverAssert(addr || runid); /* User must pass at least one search param. */ + if (addr != NULL) { + /* Try to resolve addr. If hostnames are used, we're accepting an ri_addr + * that contains an hostname only and can still be matched based on that. + */ + ri_addr = createSentinelAddr(addr,port,1); + if (!ri_addr) return NULL; + } + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (runid && !ri->runid) continue; + if ((runid == NULL || strcmp(ri->runid, runid) == 0) && + (addr == NULL || sentinelAddrOrHostnameEqual(ri->addr, ri_addr))) + { + instance = ri; + break; + } + } + dictReleaseIterator(di); + if (ri_addr != NULL) + releaseSentinelAddr(ri_addr); + + return instance; +} + +/* Master lookup by name */ +sentinelRedisInstance *sentinelGetMasterByName(char *name) { + sentinelRedisInstance *ri; + sds sdsname = sdsnew(name); + + ri = dictFetchValue(sentinel.masters,sdsname); + sdsfree(sdsname); + return ri; +} + +/* Reset the state of a monitored master: + * 1) Remove all slaves. + * 2) Remove all sentinels. + * 3) Remove most of the flags resulting from runtime operations. + * 4) Reset timers to their default value. For example after a reset it will be + * possible to failover again the same master ASAP, without waiting the + * failover timeout delay. + * 5) In the process of doing this undo the failover if in progress. + * 6) Disconnect the connections with the master (will reconnect automatically). + */ + +#define SENTINEL_RESET_NO_SENTINELS (1<<0) +void sentinelResetMaster(sentinelRedisInstance *ri, int flags) { + serverAssert(ri->flags & SRI_MASTER); + dictRelease(ri->slaves); + ri->slaves = dictCreate(&instancesDictType); + if (!(flags & SENTINEL_RESET_NO_SENTINELS)) { + dictRelease(ri->sentinels); + ri->sentinels = dictCreate(&instancesDictType); + } + instanceLinkCloseConnection(ri->link,ri->link->cc); + instanceLinkCloseConnection(ri->link,ri->link->pc); + ri->flags &= SRI_MASTER; + if (ri->leader) { + sdsfree(ri->leader); + ri->leader = NULL; + } + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = 0; + ri->failover_start_time = 0; /* We can failover again ASAP. */ + ri->promoted_slave = NULL; + sdsfree(ri->runid); + sdsfree(ri->slave_master_host); + ri->runid = NULL; + ri->slave_master_host = NULL; + ri->link->act_ping_time = mstime(); + ri->link->last_ping_time = 0; + ri->link->last_avail_time = mstime(); + ri->link->last_pong_time = mstime(); + ri->role_reported_time = mstime(); + ri->role_reported = SRI_MASTER; + if (flags & SENTINEL_GENERATE_EVENT) + sentinelEvent(LL_WARNING,"+reset-master",ri,"%@"); +} + +/* Call sentinelResetMaster() on every master with a name matching the specified + * pattern. */ +int sentinelResetMastersByPattern(char *pattern, int flags) { + dictIterator *di; + dictEntry *de; + int reset = 0; + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->name) { + if (stringmatch(pattern,ri->name,0)) { + sentinelResetMaster(ri,flags); + reset++; + } + } + } + dictReleaseIterator(di); + return reset; +} + +/* Reset the specified master with sentinelResetMaster(), and also change + * the ip:port address, but take the name of the instance unmodified. + * + * This is used to handle the +switch-master event. + * + * The function returns C_ERR if the address can't be resolved for some + * reason. Otherwise C_OK is returned. */ +int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *hostname, int port) { + sentinelAddr *oldaddr, *newaddr; + sentinelAddr **slaves = NULL; + int numslaves = 0, j; + dictIterator *di; + dictEntry *de; + + newaddr = createSentinelAddr(hostname,port,0); + if (newaddr == NULL) return C_ERR; + + /* There can be only 0 or 1 slave that has the newaddr. + * and It can add old master 1 more slave. + * so It allocates dictSize(master->slaves) + 1 */ + slaves = zmalloc(sizeof(sentinelAddr*)*(dictSize(master->slaves) + 1)); + + /* Don't include the one having the address we are switching to. */ + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + + if (sentinelAddrOrHostnameEqual(slave->addr,newaddr)) continue; + slaves[numslaves++] = dupSentinelAddr(slave->addr); + } + dictReleaseIterator(di); + + /* If we are switching to a different address, include the old address + * as a slave as well, so that we'll be able to sense / reconfigure + * the old master. */ + if (!sentinelAddrOrHostnameEqual(newaddr,master->addr)) { + slaves[numslaves++] = dupSentinelAddr(master->addr); + } + + /* Reset and switch address. */ + sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS); + oldaddr = master->addr; + master->addr = newaddr; + master->o_down_since_time = 0; + master->s_down_since_time = 0; + + /* Add slaves back. */ + for (j = 0; j < numslaves; j++) { + sentinelRedisInstance *slave; + + slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->hostname, + slaves[j]->port, master->quorum, master); + releaseSentinelAddr(slaves[j]); + if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); + } + zfree(slaves); + + /* Release the old address at the end so we are safe even if the function + * gets the master->addr->ip and master->addr->port as arguments. */ + releaseSentinelAddr(oldaddr); + sentinelFlushConfig(); + return C_OK; +} + +/* Return non-zero if there was no SDOWN or ODOWN error associated to this + * instance in the latest 'ms' milliseconds. */ +int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) { + mstime_t most_recent; + + most_recent = ri->s_down_since_time; + if (ri->o_down_since_time > most_recent) + most_recent = ri->o_down_since_time; + return most_recent == 0 || (mstime() - most_recent) > ms; +} + +/* Return the current master address, that is, its address or the address + * of the promoted slave if already operational. */ +sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) { + /* If we are failing over the master, and the state is already + * SENTINEL_FAILOVER_STATE_RECONF_SLAVES or greater, it means that we + * already have the new configuration epoch in the master, and the + * slave acknowledged the configuration switch. Advertise the new + * address. */ + if ((master->flags & SRI_FAILOVER_IN_PROGRESS) && + master->promoted_slave && + master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES) + { + return master->promoted_slave->addr; + } else { + return master->addr; + } +} + +/* This function sets the down_after_period field value in 'master' to all + * the slaves and sentinel instances connected to this master. */ +void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + int j; + dict *d[] = {master->slaves, master->sentinels, NULL}; + + for (j = 0; d[j]; j++) { + di = dictGetIterator(d[j]); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + ri->down_after_period = master->down_after_period; + } + dictReleaseIterator(di); + } +} + +/* This function is used in order to send commands to Redis instances: the + * commands we send from Sentinel may be renamed, a common case is a master + * with CONFIG and SLAVEOF commands renamed for security concerns. In that + * case we check the ri->renamed_command table (or if the instance is a slave, + * we check the one of the master), and map the command that we should send + * to the set of renamed commands. However, if the command was not renamed, + * we just return "command" itself. */ +char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) { + sds sc = sdsnew(command); + if (ri->master) ri = ri->master; + char *retval = dictFetchValue(ri->renamed_commands, sc); + sdsfree(sc); + return retval ? retval : command; +} + +/* ============================ Config handling ============================= */ + +/* Generalise handling create instance error. Use SRI_MASTER, SRI_SLAVE or + * SRI_SENTINEL as a role value. */ +const char *sentinelCheckCreateInstanceErrors(int role) { + switch(errno) { + case EBUSY: + switch (role) { + case SRI_MASTER: + return "Duplicate master name."; + case SRI_SLAVE: + return "Duplicate hostname and port for replica."; + case SRI_SENTINEL: + return "Duplicate runid for sentinel."; + default: + serverAssert(0); + break; + } + break; + case ENOENT: + return "Can't resolve instance hostname."; + case EINVAL: + return "Invalid port number."; + default: + return "Unknown Error for creating instances."; + } +} + +/* init function for server.sentinel_config */ +void initializeSentinelConfig(void) { + server.sentinel_config = zmalloc(sizeof(struct sentinelConfig)); + server.sentinel_config->monitor_cfg = listCreate(); + server.sentinel_config->pre_monitor_cfg = listCreate(); + server.sentinel_config->post_monitor_cfg = listCreate(); + listSetFreeMethod(server.sentinel_config->monitor_cfg,freeSentinelLoadQueueEntry); + listSetFreeMethod(server.sentinel_config->pre_monitor_cfg,freeSentinelLoadQueueEntry); + listSetFreeMethod(server.sentinel_config->post_monitor_cfg,freeSentinelLoadQueueEntry); +} + +/* destroy function for server.sentinel_config */ +void freeSentinelConfig(void) { + /* release these three config queues since we will not use it anymore */ + listRelease(server.sentinel_config->pre_monitor_cfg); + listRelease(server.sentinel_config->monitor_cfg); + listRelease(server.sentinel_config->post_monitor_cfg); + zfree(server.sentinel_config); + server.sentinel_config = NULL; +} + +/* Search config name in pre monitor config name array, return 1 if found, + * 0 if not found. */ +int searchPreMonitorCfgName(const char *name) { + for (unsigned int i = 0; i < sizeof(preMonitorCfgName)/sizeof(preMonitorCfgName[0]); i++) { + if (!strcasecmp(preMonitorCfgName[i],name)) return 1; + } + return 0; +} + +/* free method for sentinelLoadQueueEntry when release the list */ +void freeSentinelLoadQueueEntry(void *item) { + struct sentinelLoadQueueEntry *entry = item; + sdsfreesplitres(entry->argv,entry->argc); + sdsfree(entry->line); + zfree(entry); +} + +/* This function is used for queuing sentinel configuration, the main + * purpose of this function is to delay parsing the sentinel config option + * in order to avoid the order dependent issue from the config. */ +void queueSentinelConfig(sds *argv, int argc, int linenum, sds line) { + int i; + struct sentinelLoadQueueEntry *entry; + + /* initialize sentinel_config for the first call */ + if (server.sentinel_config == NULL) initializeSentinelConfig(); + + entry = zmalloc(sizeof(struct sentinelLoadQueueEntry)); + entry->argv = zmalloc(sizeof(char*)*argc); + entry->argc = argc; + entry->linenum = linenum; + entry->line = sdsdup(line); + for (i = 0; i < argc; i++) { + entry->argv[i] = sdsdup(argv[i]); + } + /* Separate config lines with pre monitor config, monitor config and + * post monitor config, in order to parsing config dependencies + * correctly. */ + if (!strcasecmp(argv[0],"monitor")) { + listAddNodeTail(server.sentinel_config->monitor_cfg,entry); + } else if (searchPreMonitorCfgName(argv[0])) { + listAddNodeTail(server.sentinel_config->pre_monitor_cfg,entry); + } else{ + listAddNodeTail(server.sentinel_config->post_monitor_cfg,entry); + } +} + +/* This function is used for loading the sentinel configuration from + * pre_monitor_cfg, monitor_cfg and post_monitor_cfg list */ +void loadSentinelConfigFromQueue(void) { + const char *err = NULL; + listIter li; + listNode *ln; + int linenum = 0; + sds line = NULL; + unsigned int j; + + /* if there is no sentinel_config entry, we can return immediately */ + if (server.sentinel_config == NULL) return; + + list *sentinel_configs[3] = { + server.sentinel_config->pre_monitor_cfg, + server.sentinel_config->monitor_cfg, + server.sentinel_config->post_monitor_cfg + }; + /* loading from pre monitor config queue first to avoid dependency issues + * loading from monitor config queue + * loading from the post monitor config queue */ + for (j = 0; j < sizeof(sentinel_configs) / sizeof(sentinel_configs[0]); j++) { + listRewind(sentinel_configs[j],&li); + while((ln = listNext(&li))) { + struct sentinelLoadQueueEntry *entry = ln->value; + err = sentinelHandleConfiguration(entry->argv,entry->argc); + if (err) { + linenum = entry->linenum; + line = entry->line; + goto loaderr; + } + } + } + + /* free sentinel_config when config loading is finished */ + freeSentinelConfig(); + return; + +loaderr: + fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR (Redis %s) ***\n", + REDIS_VERSION); + fprintf(stderr, "Reading the configuration file, at line %d\n", linenum); + fprintf(stderr, ">>> '%s'\n", line); + fprintf(stderr, "%s\n", err); + exit(1); +} + +const char *sentinelHandleConfiguration(char **argv, int argc) { + + sentinelRedisInstance *ri; + + if (!strcasecmp(argv[0],"monitor") && argc == 5) { + /* monitor <name> <host> <port> <quorum> */ + int quorum = atoi(argv[4]); + + if (quorum <= 0) return "Quorum must be 1 or greater."; + if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2], + atoi(argv[3]),quorum,NULL) == NULL) + { + return sentinelCheckCreateInstanceErrors(SRI_MASTER); + } + } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) { + /* down-after-milliseconds <name> <milliseconds> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->down_after_period = atoi(argv[2]); + if (ri->down_after_period <= 0) + return "negative or zero time parameter."; + sentinelPropagateDownAfterPeriod(ri); + } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) { + /* failover-timeout <name> <milliseconds> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->failover_timeout = atoi(argv[2]); + if (ri->failover_timeout <= 0) + return "negative or zero time parameter."; + } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { + /* parallel-syncs <name> <milliseconds> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->parallel_syncs = atoi(argv[2]); + } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { + /* notification-script <name> <path> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if (access(argv[2],X_OK) == -1) + return "Notification script seems non existing or non executable."; + ri->notification_script = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { + /* client-reconfig-script <name> <path> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if (access(argv[2],X_OK) == -1) + return "Client reconfiguration script seems non existing or " + "non executable."; + ri->client_reconfig_script = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { + /* auth-pass <name> <password> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->auth_pass = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) { + /* auth-user <name> <username> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->auth_user = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) { + /* current-epoch <epoch> */ + unsigned long long current_epoch = strtoull(argv[1],NULL,10); + if (current_epoch > sentinel.current_epoch) + sentinel.current_epoch = current_epoch; + } else if (!strcasecmp(argv[0],"myid") && argc == 2) { + if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE) + return "Malformed Sentinel id in myid option."; + memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE); + } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) { + /* config-epoch <name> <epoch> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->config_epoch = strtoull(argv[2],NULL,10); + /* The following update of current_epoch is not really useful as + * now the current epoch is persisted on the config file, but + * we leave this check here for redundancy. */ + if (ri->config_epoch > sentinel.current_epoch) + sentinel.current_epoch = ri->config_epoch; + } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) { + /* leader-epoch <name> <epoch> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->leader_epoch = strtoull(argv[2],NULL,10); + } else if ((!strcasecmp(argv[0],"known-slave") || + !strcasecmp(argv[0],"known-replica")) && argc == 4) + { + sentinelRedisInstance *slave; + + /* known-replica <name> <ip> <port> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2], + atoi(argv[3]), ri->quorum, ri)) == NULL) + { + return sentinelCheckCreateInstanceErrors(SRI_SLAVE); + } + } else if (!strcasecmp(argv[0],"known-sentinel") && + (argc == 4 || argc == 5)) { + sentinelRedisInstance *si; + + if (argc == 5) { /* Ignore the old form without runid. */ + /* known-sentinel <name> <ip> <port> [runid] */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2], + atoi(argv[3]), ri->quorum, ri)) == NULL) + { + return sentinelCheckCreateInstanceErrors(SRI_SENTINEL); + } + si->runid = sdsnew(argv[4]); + sentinelTryConnectionSharing(si); + } + } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) { + /* rename-command <name> <command> <renamed-command> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + sds oldcmd = sdsnew(argv[2]); + sds newcmd = sdsnew(argv[3]); + if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) { + sdsfree(oldcmd); + sdsfree(newcmd); + return "Same command renamed multiple times with rename-command."; + } + } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) { + /* announce-ip <ip-address> */ + if (strlen(argv[1])) + sentinel.announce_ip = sdsnew(argv[1]); + } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) { + /* announce-port <port> */ + sentinel.announce_port = atoi(argv[1]); + } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) { + /* deny-scripts-reconfig <yes|no> */ + if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) { + return "Please specify yes or no for the " + "deny-scripts-reconfig options."; + } + } else if (!strcasecmp(argv[0],"sentinel-user") && argc == 2) { + /* sentinel-user <user-name> */ + if (strlen(argv[1])) + sentinel.sentinel_auth_user = sdsnew(argv[1]); + } else if (!strcasecmp(argv[0],"sentinel-pass") && argc == 2) { + /* sentinel-pass <password> */ + if (strlen(argv[1])) + sentinel.sentinel_auth_pass = sdsnew(argv[1]); + } else if (!strcasecmp(argv[0],"resolve-hostnames") && argc == 2) { + /* resolve-hostnames <yes|no> */ + if ((sentinel.resolve_hostnames = yesnotoi(argv[1])) == -1) { + return "Please specify yes or no for the resolve-hostnames option."; + } + } else if (!strcasecmp(argv[0],"announce-hostnames") && argc == 2) { + /* announce-hostnames <yes|no> */ + if ((sentinel.announce_hostnames = yesnotoi(argv[1])) == -1) { + return "Please specify yes or no for the announce-hostnames option."; + } + } else if (!strcasecmp(argv[0],"master-reboot-down-after-period") && argc == 3) { + /* master-reboot-down-after-period <name> <milliseconds> */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->master_reboot_down_after_period = atoi(argv[2]); + if (ri->master_reboot_down_after_period < 0) + return "negative time parameter."; + } else { + return "Unrecognized sentinel configuration statement."; + } + return NULL; +} + +/* Implements CONFIG REWRITE for "sentinel" option. + * This is used not just to rewrite the configuration given by the user + * (the configured masters) but also in order to retain the state of + * Sentinel across restarts: config epoch of masters, associated slaves + * and sentinel instances, and so forth. */ +void rewriteConfigSentinelOption(struct rewriteConfigState *state) { + dictIterator *di, *di2; + dictEntry *de; + sds line; + + /* sentinel unique ID. */ + line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid); + rewriteConfigRewriteLine(state,"sentinel myid",line,1); + + /* sentinel deny-scripts-reconfig. */ + line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s", + sentinel.deny_scripts_reconfig ? "yes" : "no"); + rewriteConfigRewriteLine(state,"sentinel deny-scripts-reconfig",line, + sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG); + + /* sentinel resolve-hostnames. + * This must be included early in the file so it is already in effect + * when reading the file. + */ + line = sdscatprintf(sdsempty(), "sentinel resolve-hostnames %s", + sentinel.resolve_hostnames ? "yes" : "no"); + rewriteConfigRewriteLine(state,"sentinel resolve-hostnames",line, + sentinel.resolve_hostnames != SENTINEL_DEFAULT_RESOLVE_HOSTNAMES); + + /* sentinel announce-hostnames. */ + line = sdscatprintf(sdsempty(), "sentinel announce-hostnames %s", + sentinel.announce_hostnames ? "yes" : "no"); + rewriteConfigRewriteLine(state,"sentinel announce-hostnames",line, + sentinel.announce_hostnames != SENTINEL_DEFAULT_ANNOUNCE_HOSTNAMES); + + /* For every master emit a "sentinel monitor" config entry. */ + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master, *ri; + sentinelAddr *master_addr; + + /* sentinel monitor */ + master = dictGetVal(de); + master_addr = sentinelGetCurrentMasterAddress(master); + line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d", + master->name, announceSentinelAddr(master_addr), master_addr->port, + master->quorum); + rewriteConfigRewriteLine(state,"sentinel monitor",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + + /* sentinel down-after-milliseconds */ + if (master->down_after_period != sentinel_default_down_after) { + line = sdscatprintf(sdsempty(), + "sentinel down-after-milliseconds %s %ld", + master->name, (long) master->down_after_period); + rewriteConfigRewriteLine(state,"sentinel down-after-milliseconds",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel failover-timeout */ + if (master->failover_timeout != sentinel_default_failover_timeout) { + line = sdscatprintf(sdsempty(), + "sentinel failover-timeout %s %ld", + master->name, (long) master->failover_timeout); + rewriteConfigRewriteLine(state,"sentinel failover-timeout",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + + } + + /* sentinel parallel-syncs */ + if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) { + line = sdscatprintf(sdsempty(), + "sentinel parallel-syncs %s %d", + master->name, master->parallel_syncs); + rewriteConfigRewriteLine(state,"sentinel parallel-syncs",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel notification-script */ + if (master->notification_script) { + line = sdscatprintf(sdsempty(), + "sentinel notification-script %s %s", + master->name, master->notification_script); + rewriteConfigRewriteLine(state,"sentinel notification-script",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel client-reconfig-script */ + if (master->client_reconfig_script) { + line = sdscatprintf(sdsempty(), + "sentinel client-reconfig-script %s %s", + master->name, master->client_reconfig_script); + rewriteConfigRewriteLine(state,"sentinel client-reconfig-script",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel auth-pass & auth-user */ + if (master->auth_pass) { + line = sdscatprintf(sdsempty(), + "sentinel auth-pass %s %s", + master->name, master->auth_pass); + rewriteConfigRewriteLine(state,"sentinel auth-pass",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + if (master->auth_user) { + line = sdscatprintf(sdsempty(), + "sentinel auth-user %s %s", + master->name, master->auth_user); + rewriteConfigRewriteLine(state,"sentinel auth-user",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel master-reboot-down-after-period */ + if (master->master_reboot_down_after_period != 0) { + line = sdscatprintf(sdsempty(), + "sentinel master-reboot-down-after-period %s %ld", + master->name, (long) master->master_reboot_down_after_period); + rewriteConfigRewriteLine(state,"sentinel master-reboot-down-after-period",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + + /* sentinel config-epoch */ + line = sdscatprintf(sdsempty(), + "sentinel config-epoch %s %llu", + master->name, (unsigned long long) master->config_epoch); + rewriteConfigRewriteLine(state,"sentinel config-epoch",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + + + /* sentinel leader-epoch */ + line = sdscatprintf(sdsempty(), + "sentinel leader-epoch %s %llu", + master->name, (unsigned long long) master->leader_epoch); + rewriteConfigRewriteLine(state,"sentinel leader-epoch",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + + /* sentinel known-slave */ + di2 = dictGetIterator(master->slaves); + while((de = dictNext(di2)) != NULL) { + sentinelAddr *slave_addr; + + ri = dictGetVal(de); + slave_addr = ri->addr; + + /* If master_addr (obtained using sentinelGetCurrentMasterAddress() + * so it may be the address of the promoted slave) is equal to this + * slave's address, a failover is in progress and the slave was + * already successfully promoted. So as the address of this slave + * we use the old master address instead. */ + if (sentinelAddrOrHostnameEqual(slave_addr,master_addr)) + slave_addr = master->addr; + line = sdscatprintf(sdsempty(), + "sentinel known-replica %s %s %d", + master->name, announceSentinelAddr(slave_addr), slave_addr->port); + /* try to replace any known-slave option first if found */ + if (rewriteConfigRewriteLine(state, "sentinel known-slave", sdsdup(line), 0) == 0) { + rewriteConfigRewriteLine(state, "sentinel known-replica", line, 1); + } else { + sdsfree(line); + } + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + dictReleaseIterator(di2); + + /* sentinel known-sentinel */ + di2 = dictGetIterator(master->sentinels); + while((de = dictNext(di2)) != NULL) { + ri = dictGetVal(de); + if (ri->runid == NULL) continue; + line = sdscatprintf(sdsempty(), + "sentinel known-sentinel %s %s %d %s", + master->name, announceSentinelAddr(ri->addr), ri->addr->port, ri->runid); + rewriteConfigRewriteLine(state,"sentinel known-sentinel",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + dictReleaseIterator(di2); + + /* sentinel rename-command */ + di2 = dictGetIterator(master->renamed_commands); + while((de = dictNext(di2)) != NULL) { + sds oldname = dictGetKey(de); + sds newname = dictGetVal(de); + line = sdscatprintf(sdsempty(), + "sentinel rename-command %s %s %s", + master->name, oldname, newname); + rewriteConfigRewriteLine(state,"sentinel rename-command",line,1); + /* rewriteConfigMarkAsProcessed is handled after the loop */ + } + dictReleaseIterator(di2); + } + + /* sentinel current-epoch is a global state valid for all the masters. */ + line = sdscatprintf(sdsempty(), + "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch); + rewriteConfigRewriteLine(state,"sentinel current-epoch",line,1); + + /* sentinel announce-ip. */ + if (sentinel.announce_ip) { + line = sdsnew("sentinel announce-ip "); + line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip)); + rewriteConfigRewriteLine(state,"sentinel announce-ip",line,1); + } else { + rewriteConfigMarkAsProcessed(state,"sentinel announce-ip"); + } + + /* sentinel announce-port. */ + if (sentinel.announce_port) { + line = sdscatprintf(sdsempty(),"sentinel announce-port %d", + sentinel.announce_port); + rewriteConfigRewriteLine(state,"sentinel announce-port",line,1); + } else { + rewriteConfigMarkAsProcessed(state,"sentinel announce-port"); + } + + /* sentinel sentinel-user. */ + if (sentinel.sentinel_auth_user) { + line = sdscatprintf(sdsempty(), "sentinel sentinel-user %s", sentinel.sentinel_auth_user); + rewriteConfigRewriteLine(state,"sentinel sentinel-user",line,1); + } else { + rewriteConfigMarkAsProcessed(state,"sentinel sentinel-user"); + } + + /* sentinel sentinel-pass. */ + if (sentinel.sentinel_auth_pass) { + line = sdscatprintf(sdsempty(), "sentinel sentinel-pass %s", sentinel.sentinel_auth_pass); + rewriteConfigRewriteLine(state,"sentinel sentinel-pass",line,1); + } else { + rewriteConfigMarkAsProcessed(state,"sentinel sentinel-pass"); + } + + dictReleaseIterator(di); + + /* NOTE: the purpose here is in case due to the state change, the config rewrite + does not handle the configs, however, previously the config was set in the config file, + rewriteConfigMarkAsProcessed should be put here to mark it as processed in order to + delete the old config entry. + */ + rewriteConfigMarkAsProcessed(state,"sentinel monitor"); + rewriteConfigMarkAsProcessed(state,"sentinel down-after-milliseconds"); + rewriteConfigMarkAsProcessed(state,"sentinel failover-timeout"); + rewriteConfigMarkAsProcessed(state,"sentinel parallel-syncs"); + rewriteConfigMarkAsProcessed(state,"sentinel notification-script"); + rewriteConfigMarkAsProcessed(state,"sentinel client-reconfig-script"); + rewriteConfigMarkAsProcessed(state,"sentinel auth-pass"); + rewriteConfigMarkAsProcessed(state,"sentinel auth-user"); + rewriteConfigMarkAsProcessed(state,"sentinel config-epoch"); + rewriteConfigMarkAsProcessed(state,"sentinel leader-epoch"); + rewriteConfigMarkAsProcessed(state,"sentinel known-replica"); + rewriteConfigMarkAsProcessed(state,"sentinel known-sentinel"); + rewriteConfigMarkAsProcessed(state,"sentinel rename-command"); + rewriteConfigMarkAsProcessed(state,"sentinel master-reboot-down-after-period"); +} + +/* This function uses the config rewriting Redis engine in order to persist + * the state of the Sentinel in the current configuration file. + * + * On failure the function logs a warning on the Redis log. */ +int sentinelFlushConfig(void) { + int saved_hz = server.hz; + int rewrite_status; + + server.hz = CONFIG_DEFAULT_HZ; + rewrite_status = rewriteConfig(server.configfile, 0); + server.hz = saved_hz; + + if (rewrite_status == -1) { + serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno)); + return C_ERR; + } else { + serverLog(LL_NOTICE,"Sentinel new configuration saved on disk"); + return C_OK; + } +} + +/* Call sentinelFlushConfig() produce a success/error reply to the + * calling client. + */ +static void sentinelFlushConfigAndReply(client *c) { + if (sentinelFlushConfig() == C_ERR) + addReplyError(c, "Failed to save config file. Check server logs."); + else + addReply(c, shared.ok); +} + +/* ====================== hiredis connection handling ======================= */ + +/* Send the AUTH command with the specified master password if needed. + * Note that for slaves the password set for the master is used. + * + * In case this Sentinel requires a password as well, via the "requirepass" + * configuration directive, we assume we should use the local password in + * order to authenticate when connecting with the other Sentinels as well. + * So basically all the Sentinels share the same password and use it to + * authenticate reciprocally. + * + * We don't check at all if the command was successfully transmitted + * to the instance as if it fails Sentinel will detect the instance down, + * will disconnect and reconnect the link and so forth. */ +void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { + char *auth_pass = NULL; + char *auth_user = NULL; + + if (ri->flags & SRI_MASTER) { + auth_pass = ri->auth_pass; + auth_user = ri->auth_user; + } else if (ri->flags & SRI_SLAVE) { + auth_pass = ri->master->auth_pass; + auth_user = ri->master->auth_user; + } else if (ri->flags & SRI_SENTINEL) { + /* If sentinel_auth_user is NULL, AUTH will use default user + with sentinel_auth_pass to authenticate */ + if (sentinel.sentinel_auth_pass) { + auth_pass = sentinel.sentinel_auth_pass; + auth_user = sentinel.sentinel_auth_user; + } else { + /* Compatibility with old configs. requirepass is used + * for both incoming and outgoing authentication. */ + auth_pass = server.requirepass; + auth_user = NULL; + } + } + + if (auth_pass && auth_user == NULL) { + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", + sentinelInstanceMapCommand(ri,"AUTH"), + auth_pass) == C_OK) ri->link->pending_commands++; + } else if (auth_pass && auth_user) { + /* If we also have an username, use the ACL-style AUTH command + * with two arguments, username and password. */ + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"AUTH"), + auth_user, auth_pass) == C_OK) ri->link->pending_commands++; + } +} + +/* Use CLIENT SETNAME to name the connection in the Redis instance as + * sentinel-<first_8_chars_of_runid>-<connection_type> + * The connection type is "cmd" or "pubsub" as specified by 'type'. + * + * This makes it possible to list all the sentinel instances connected + * to a Redis server with CLIENT LIST, grepping for a specific name format. */ +void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) { + char name[64]; + + snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type); + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, + "%s SETNAME %s", + sentinelInstanceMapCommand(ri,"CLIENT"), + name) == C_OK) + { + ri->link->pending_commands++; + } +} + +static int instanceLinkNegotiateTLS(redisAsyncContext *context) { +#if USE_OPENSSL == 1 /* BUILD_YES */ + if (!redis_tls_ctx) return C_ERR; + SSL *ssl = SSL_new(redis_tls_client_ctx ? redis_tls_client_ctx : redis_tls_ctx); + if (!ssl) return C_ERR; + + if (redisInitiateSSL(&context->c, ssl) == REDIS_ERR) { + SSL_free(ssl); + return C_ERR; + } +#else + UNUSED(context); +#endif + return C_OK; +} + +/* Create the async connections for the instance link if the link + * is disconnected. Note that link->disconnected is true even if just + * one of the two links (commands and pub/sub) is missing. */ +void sentinelReconnectInstance(sentinelRedisInstance *ri) { + + if (ri->link->disconnected == 0) return; + if (ri->addr->port == 0) return; /* port == 0 means invalid address. */ + instanceLink *link = ri->link; + mstime_t now = mstime(); + + if (now - ri->link->last_reconn_time < sentinel_ping_period) return; + ri->link->last_reconn_time = now; + + /* Commands connection. */ + if (link->cc == NULL) { + + /* It might be that the instance is disconnected because it wasn't available earlier when the instance + * allocated, say during failover, and therefore we failed to resolve its ip. + * Another scenario is that the instance restarted with new ip, and we should resolve its new ip based on + * its hostname */ + if (sentinel.resolve_hostnames) { + sentinelAddr *tryResolveAddr = createSentinelAddr(ri->addr->hostname, ri->addr->port, 0); + if (tryResolveAddr != NULL) { + releaseSentinelAddr(ri->addr); + ri->addr = tryResolveAddr; + } + } + + link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr); + + if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd); + if (!link->cc) { + sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection"); + } else if (!link->cc->err && server.tls_replication && + (instanceLinkNegotiateTLS(link->cc) == C_ERR)) { + sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS"); + instanceLinkCloseConnection(link,link->cc); + } else if (link->cc->err) { + sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s", + link->cc->errstr); + instanceLinkCloseConnection(link,link->cc); + } else { + link->pending_commands = 0; + link->cc_conn_time = mstime(); + link->cc->data = link; + redisAeAttach(server.el,link->cc); + redisAsyncSetConnectCallback(link->cc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(link->cc, + sentinelDisconnectCallback); + sentinelSendAuthIfNeeded(ri,link->cc); + sentinelSetClientName(ri,link->cc,"cmd"); + + /* Send a PING ASAP when reconnecting. */ + sentinelSendPing(ri); + } + } + /* Pub / Sub */ + if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) { + link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,server.bind_source_addr); + if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd); + if (!link->pc) { + sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection"); + } else if (!link->pc->err && server.tls_replication && + (instanceLinkNegotiateTLS(link->pc) == C_ERR)) { + sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS"); + } else if (link->pc->err) { + sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s", + link->pc->errstr); + instanceLinkCloseConnection(link,link->pc); + } else { + int retval; + link->pc_conn_time = mstime(); + link->pc->data = link; + redisAeAttach(server.el,link->pc); + redisAsyncSetConnectCallback(link->pc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(link->pc, + sentinelDisconnectCallback); + sentinelSendAuthIfNeeded(ri,link->pc); + sentinelSetClientName(ri,link->pc,"pubsub"); + /* Now we subscribe to the Sentinels "Hello" channel. */ + retval = redisAsyncCommand(link->pc, + sentinelReceiveHelloMessages, ri, "%s %s", + sentinelInstanceMapCommand(ri,"SUBSCRIBE"), + SENTINEL_HELLO_CHANNEL); + if (retval != C_OK) { + /* If we can't subscribe, the Pub/Sub connection is useless + * and we can simply disconnect it and try again. */ + instanceLinkCloseConnection(link,link->pc); + return; + } + } + } + /* Clear the disconnected status only if we have both the connections + * (or just the commands connection if this is a sentinel instance). */ + if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) + link->disconnected = 0; +} + +/* ======================== Redis instances pinging ======================== */ + +/* Return true if master looks "sane", that is: + * 1) It is actually a master in the current configuration. + * 2) It reports itself as a master. + * 3) It is not SDOWN or ODOWN. + * 4) We obtained last INFO no more than two times the INFO period time ago. */ +int sentinelMasterLooksSane(sentinelRedisInstance *master) { + return + master->flags & SRI_MASTER && + master->role_reported == SRI_MASTER && + (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 && + (mstime() - master->info_refresh) < sentinel_info_period*2; +} + +/* Process the INFO output from masters. */ +void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { + sds *lines; + int numlines, j; + int role = 0; + + /* cache full INFO output for instance */ + sdsfree(ri->info); + ri->info = sdsnew(info); + + /* The following fields must be reset to a given value in the case they + * are not found at all in the INFO output. */ + ri->master_link_down_time = 0; + + /* Process line by line. */ + lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines); + for (j = 0; j < numlines; j++) { + sentinelRedisInstance *slave; + sds l = lines[j]; + + /* run_id:<40 hex chars>*/ + if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) { + if (ri->runid == NULL) { + ri->runid = sdsnewlen(l+7,40); + } else { + if (strncmp(ri->runid,l+7,40) != 0) { + sentinelEvent(LL_NOTICE,"+reboot",ri,"%@"); + + if (ri->flags & SRI_MASTER && ri->master_reboot_down_after_period != 0) { + ri->flags |= SRI_MASTER_REBOOT; + ri->master_reboot_since_time = mstime(); + } + + sdsfree(ri->runid); + ri->runid = sdsnewlen(l+7,40); + } + } + } + + /* old versions: slave0:<ip>,<port>,<state> + * new versions: slave0:ip=127.0.0.1,port=9999,... */ + if ((ri->flags & SRI_MASTER) && + sdslen(l) >= 7 && + !memcmp(l,"slave",5) && isdigit(l[5])) + { + char *ip, *port, *end; + + if (strstr(l,"ip=") == NULL) { + /* Old format. */ + ip = strchr(l,':'); if (!ip) continue; + ip++; /* Now ip points to start of ip address. */ + port = strchr(ip,','); if (!port) continue; + *port = '\0'; /* nul term for easy access. */ + port++; /* Now port points to start of port number. */ + end = strchr(port,','); if (!end) continue; + *end = '\0'; /* nul term for easy access. */ + } else { + /* New format. */ + ip = strstr(l,"ip="); if (!ip) continue; + ip += 3; /* Now ip points to start of ip address. */ + port = strstr(l,"port="); if (!port) continue; + port += 5; /* Now port points to start of port number. */ + /* Nul term both fields for easy access. */ + end = strchr(ip,','); if (end) *end = '\0'; + end = strchr(port,','); if (end) *end = '\0'; + } + + /* Check if we already have this slave into our table, + * otherwise add it. */ + if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) { + if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, + atoi(port), ri->quorum, ri)) != NULL) + { + sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); + sentinelFlushConfig(); + } + } + } + + /* master_link_down_since_seconds:<seconds> */ + if (sdslen(l) >= 32 && + !memcmp(l,"master_link_down_since_seconds",30)) + { + ri->master_link_down_time = strtoll(l+31,NULL,10)*1000; + } + + /* role:<role> */ + if (sdslen(l) >= 11 && !memcmp(l,"role:master",11)) role = SRI_MASTER; + else if (sdslen(l) >= 10 && !memcmp(l,"role:slave",10)) role = SRI_SLAVE; + + if (role == SRI_SLAVE) { + /* master_host:<host> */ + if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) { + if (ri->slave_master_host == NULL || + strcasecmp(l+12,ri->slave_master_host)) + { + sdsfree(ri->slave_master_host); + ri->slave_master_host = sdsnew(l+12); + ri->slave_conf_change_time = mstime(); + } + } + + /* master_port:<port> */ + if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) { + int slave_master_port = atoi(l+12); + + if (ri->slave_master_port != slave_master_port) { + ri->slave_master_port = slave_master_port; + ri->slave_conf_change_time = mstime(); + } + } + + /* master_link_status:<status> */ + if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) { + ri->slave_master_link_status = + (strcasecmp(l+19,"up") == 0) ? + SENTINEL_MASTER_LINK_STATUS_UP : + SENTINEL_MASTER_LINK_STATUS_DOWN; + } + + /* slave_priority:<priority> */ + if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15)) + ri->slave_priority = atoi(l+15); + + /* slave_repl_offset:<offset> */ + if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18)) + ri->slave_repl_offset = strtoull(l+18,NULL,10); + + /* replica_announced:<announcement> */ + if (sdslen(l) >= 18 && !memcmp(l,"replica_announced:",18)) + ri->replica_announced = atoi(l+18); + } + } + ri->info_refresh = mstime(); + sdsfreesplitres(lines,numlines); + + /* ---------------------------- Acting half ----------------------------- + * Some things will not happen if sentinel.tilt is true, but some will + * still be processed. */ + + /* Remember when the role changed. */ + if (role != ri->role_reported) { + ri->role_reported_time = mstime(); + ri->role_reported = role; + if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime(); + /* Log the event with +role-change if the new role is coherent or + * with -role-change if there is a mismatch with the current config. */ + sentinelEvent(LL_VERBOSE, + ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ? + "+role-change" : "-role-change", + ri, "%@ new reported role is %s", + role == SRI_MASTER ? "master" : "slave"); + } + + /* None of the following conditions are processed when in tilt mode, so + * return asap. */ + if (sentinel.tilt) return; + + /* Handle master -> slave role switch. */ + if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) { + /* Nothing to do, but masters claiming to be slaves are + * considered to be unreachable by Sentinel, so eventually + * a failover will be triggered. */ + } + + /* Handle slave -> master role switch. */ + if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) { + /* If this is a promoted slave we can change state to the + * failover state machine. */ + if ((ri->flags & SRI_PROMOTED) && + (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && + (ri->master->failover_state == + SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) + { + /* Now that we are sure the slave was reconfigured as a master + * set the master configuration epoch to the epoch we won the + * election to perform this failover. This will force the other + * Sentinels to update their config (assuming there is not + * a newer one already available). */ + ri->master->config_epoch = ri->master->failover_epoch; + ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; + ri->master->failover_state_change_time = mstime(); + sentinelFlushConfig(); + sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@"); + if (sentinel.simfailure_flags & + SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION) + sentinelSimFailureCrash(); + sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves", + ri->master,"%@"); + sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER, + "start",ri->master->addr,ri->addr); + sentinelForceHelloUpdateForMaster(ri->master); + } else { + /* A slave turned into a master. We want to force our view and + * reconfigure as slave. Wait some time after the change before + * going forward, to receive new configs if any. */ + mstime_t wait_time = sentinel_publish_period*4; + + if (!(ri->flags & SRI_PROMOTED) && + sentinelMasterLooksSane(ri->master) && + sentinelRedisInstanceNoDownFor(ri,wait_time) && + mstime() - ri->role_reported_time > wait_time) + { + int retval = sentinelSendSlaveOf(ri,ri->master->addr); + if (retval == C_OK) + sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@"); + } + } + } + + /* Handle slaves replicating to a different master address. */ + if ((ri->flags & SRI_SLAVE) && + role == SRI_SLAVE && + (ri->slave_master_port != ri->master->addr->port || + !sentinelAddrEqualsHostname(ri->master->addr, ri->slave_master_host))) + { + mstime_t wait_time = ri->master->failover_timeout; + + /* Make sure the master is sane before reconfiguring this instance + * into a slave. */ + if (sentinelMasterLooksSane(ri->master) && + sentinelRedisInstanceNoDownFor(ri,wait_time) && + mstime() - ri->slave_conf_change_time > wait_time) + { + int retval = sentinelSendSlaveOf(ri,ri->master->addr); + if (retval == C_OK) + sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@"); + } + } + + /* Detect if the slave that is in the process of being reconfigured + * changed state. */ + if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE && + (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))) + { + /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */ + if ((ri->flags & SRI_RECONF_SENT) && + ri->slave_master_host && + sentinelAddrEqualsHostname(ri->master->promoted_slave->addr, + ri->slave_master_host) && + ri->slave_master_port == ri->master->promoted_slave->addr->port) + { + ri->flags &= ~SRI_RECONF_SENT; + ri->flags |= SRI_RECONF_INPROG; + sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@"); + } + + /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */ + if ((ri->flags & SRI_RECONF_INPROG) && + ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) + { + ri->flags &= ~SRI_RECONF_INPROG; + ri->flags |= SRI_RECONF_DONE; + sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@"); + } + } +} + +void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; + redisReply *r; + + if (!reply || !link) return; + link->pending_commands--; + r = reply; + + /* INFO reply type is verbatim in resp3. Normally, sentinel will not use + * resp3 but this is required for testing (see logreqres.c). */ + if (r->type == REDIS_REPLY_STRING || r->type == REDIS_REPLY_VERB) + sentinelRefreshInstanceInfo(ri,r->str); +} + +/* Just discard the reply. We use this when we are not monitoring the return + * value of the command but its effects directly. */ +void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + instanceLink *link = c->data; + UNUSED(reply); + UNUSED(privdata); + + if (link) link->pending_commands--; +} + +void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; + redisReply *r; + + if (!reply || !link) return; + link->pending_commands--; + r = reply; + + if (r->type == REDIS_REPLY_STATUS || + r->type == REDIS_REPLY_ERROR) { + /* Update the "instance available" field only if this is an + * acceptable reply. */ + if (strncmp(r->str,"PONG",4) == 0 || + strncmp(r->str,"LOADING",7) == 0 || + strncmp(r->str,"MASTERDOWN",10) == 0) + { + link->last_avail_time = mstime(); + link->act_ping_time = 0; /* Flag the pong as received. */ + + if (ri->flags & SRI_MASTER_REBOOT && strncmp(r->str,"PONG",4) == 0) + ri->flags &= ~SRI_MASTER_REBOOT; + + } else { + /* Send a SCRIPT KILL command if the instance appears to be + * down because of a busy script. */ + if (strncmp(r->str,"BUSY",4) == 0 && + (ri->flags & SRI_S_DOWN) && + !(ri->flags & SRI_SCRIPT_KILL_SENT)) + { + if (redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, + "%s KILL", + sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK) + { + ri->link->pending_commands++; + } + ri->flags |= SRI_SCRIPT_KILL_SENT; + } + } + } + link->last_pong_time = mstime(); +} + +/* This is called when we get the reply about the PUBLISH command we send + * to the master to advertise this sentinel. */ +void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; + redisReply *r; + + if (!reply || !link) return; + link->pending_commands--; + r = reply; + + /* Only update pub_time if we actually published our message. Otherwise + * we'll retry again in 100 milliseconds. */ + if (r->type != REDIS_REPLY_ERROR) + ri->last_pub_time = mstime(); +} + +/* Process a hello message received via Pub/Sub in master or slave instance, + * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel. + * + * If the master name specified in the message is not known, the message is + * discarded. */ +void sentinelProcessHelloMessage(char *hello, int hello_len) { + /* Format is composed of 8 tokens: + * 0=ip,1=port,2=runid,3=current_epoch,4=master_name, + * 5=master_ip,6=master_port,7=master_config_epoch. */ + int numtokens, port, removed, master_port; + uint64_t current_epoch, master_config_epoch; + char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens); + sentinelRedisInstance *si, *master; + + if (numtokens == 8) { + /* Obtain a reference to the master this hello message is about */ + master = sentinelGetMasterByName(token[4]); + if (!master) goto cleanup; /* Unknown master, skip the message. */ + + /* First, try to see if we already have this sentinel. */ + port = atoi(token[1]); + master_port = atoi(token[6]); + si = getSentinelRedisInstanceByAddrAndRunID( + master->sentinels,token[0],port,token[2]); + current_epoch = strtoull(token[3],NULL,10); + master_config_epoch = strtoull(token[7],NULL,10); + + if (!si) { + /* If not, remove all the sentinels that have the same runid + * because there was an address change, and add the same Sentinel + * with the new address back. */ + removed = removeMatchingSentinelFromMaster(master,token[2]); + if (removed) { + sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master, + "%@ ip %s port %d for %s", token[0],port,token[2]); + } else { + /* Check if there is another Sentinel with the same address this + * new one is reporting. What we do if this happens is to set its + * port to 0, to signal the address is invalid. We'll update it + * later if we get an HELLO message. */ + sentinelRedisInstance *other = + getSentinelRedisInstanceByAddrAndRunID( + master->sentinels, token[0],port,NULL); + if (other) { + /* If there is already other sentinel with same address (but + * different runid) then remove the old one across all masters */ + sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@"); + dictIterator *di; + dictEntry *de; + + /* Keep a copy of runid. 'other' about to be deleted in loop. */ + sds runid_obsolete = sdsnew(other->runid); + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *master = dictGetVal(de); + removeMatchingSentinelFromMaster(master, runid_obsolete); + } + dictReleaseIterator(di); + sdsfree(runid_obsolete); + } + } + + /* Add the new sentinel. */ + si = createSentinelRedisInstance(token[2],SRI_SENTINEL, + token[0],port,master->quorum,master); + + if (si) { + if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@"); + /* The runid is NULL after a new instance creation and + * for Sentinels we don't have a later chance to fill it, + * so do it now. */ + si->runid = sdsnew(token[2]); + sentinelTryConnectionSharing(si); + if (removed) sentinelUpdateSentinelAddressInAllMasters(si); + sentinelFlushConfig(); + } + } + + /* Update local current_epoch if received current_epoch is greater.*/ + if (current_epoch > sentinel.current_epoch) { + sentinel.current_epoch = current_epoch; + sentinelFlushConfig(); + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", + (unsigned long long) sentinel.current_epoch); + } + + /* Update master info if received configuration is newer. */ + if (si && master->config_epoch < master_config_epoch) { + master->config_epoch = master_config_epoch; + if (master_port != master->addr->port || + !sentinelAddrEqualsHostname(master->addr, token[5])) + { + sentinelAddr *old_addr; + + sentinelEvent(LL_WARNING,"+config-update-from",si,"%@"); + sentinelEvent(LL_WARNING,"+switch-master", + master,"%s %s %d %s %d", + master->name, + announceSentinelAddr(master->addr), master->addr->port, + token[5], master_port); + + old_addr = dupSentinelAddr(master->addr); + sentinelResetMasterAndChangeAddress(master, token[5], master_port); + sentinelCallClientReconfScript(master, + SENTINEL_OBSERVER,"start", + old_addr,master->addr); + releaseSentinelAddr(old_addr); + } + } + + /* Update the state of the Sentinel. */ + if (si) si->last_hello_time = mstime(); + } + +cleanup: + sdsfreesplitres(token,numtokens); +} + + +/* This is our Pub/Sub callback for the Hello channel. It's useful in order + * to discover other sentinels attached at the same master. */ +void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = privdata; + redisReply *r; + UNUSED(c); + + if (!reply || !ri) return; + r = reply; + + /* Update the last activity in the pubsub channel. Note that since we + * receive our messages as well this timestamp can be used to detect + * if the link is probably disconnected even if it seems otherwise. */ + ri->link->pc_last_activity = mstime(); + + /* Sanity check in the reply we expect, so that the code that follows + * can avoid to check for details. + * Note: Reply type is PUSH in resp3. Normally, sentinel will not use + * resp3 but this is required for testing (see logreqres.c). */ + if ((r->type != REDIS_REPLY_ARRAY && r->type != REDIS_REPLY_PUSH) || + r->elements != 3 || + r->element[0]->type != REDIS_REPLY_STRING || + r->element[1]->type != REDIS_REPLY_STRING || + r->element[2]->type != REDIS_REPLY_STRING || + strcmp(r->element[0]->str,"message") != 0) return; + + /* We are not interested in meeting ourselves */ + if (strstr(r->element[2]->str,sentinel.myid) != NULL) return; + + sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len); +} + +/* Send a "Hello" message via Pub/Sub to the specified 'ri' Redis + * instance in order to broadcast the current configuration for this + * master, and to advertise the existence of this Sentinel at the same time. + * + * The message has the following format: + * + * sentinel_ip,sentinel_port,sentinel_runid,current_epoch, + * master_name,master_ip,master_port,master_config_epoch. + * + * Returns C_OK if the PUBLISH was queued correctly, otherwise + * C_ERR is returned. */ +int sentinelSendHello(sentinelRedisInstance *ri) { + char ip[NET_IP_STR_LEN]; + char payload[NET_IP_STR_LEN+1024]; + int retval; + char *announce_ip; + int announce_port; + sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master; + sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master); + + if (ri->link->disconnected) return C_ERR; + + /* Use the specified announce address if specified, otherwise try to + * obtain our own IP address. */ + if (sentinel.announce_ip) { + announce_ip = sentinel.announce_ip; + } else { + if (anetFdToString(ri->link->cc->c.fd,ip,sizeof(ip),NULL,0) == -1) + return C_ERR; + announce_ip = ip; + } + if (sentinel.announce_port) announce_port = sentinel.announce_port; + else if (server.tls_replication && server.tls_port) announce_port = server.tls_port; + else announce_port = server.port; + + /* Format and send the Hello message. */ + snprintf(payload,sizeof(payload), + "%s,%d,%s,%llu," /* Info about this sentinel. */ + "%s,%s,%d,%llu", /* Info about current master. */ + announce_ip, announce_port, sentinel.myid, + (unsigned long long) sentinel.current_epoch, + /* --- */ + master->name,announceSentinelAddr(master_addr),master_addr->port, + (unsigned long long) master->config_epoch); + retval = redisAsyncCommand(ri->link->cc, + sentinelPublishReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"PUBLISH"), + SENTINEL_HELLO_CHANNEL,payload); + if (retval != C_OK) return C_ERR; + ri->link->pending_commands++; + return C_OK; +} + +/* Reset last_pub_time in all the instances in the specified dictionary + * in order to force the delivery of a Hello update ASAP. */ +void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + if (ri->last_pub_time >= (sentinel_publish_period+1)) + ri->last_pub_time -= (sentinel_publish_period+1); + } + dictReleaseIterator(di); +} + +/* This function forces the delivery of a "Hello" message (see + * sentinelSendHello() top comment for further information) to all the Redis + * and Sentinel instances related to the specified 'master'. + * + * It is technically not needed since we send an update to every instance + * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a + * Sentinel upgrades a configuration it is a good idea to deliver an update + * to the other Sentinels ASAP. */ +int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) { + if (!(master->flags & SRI_MASTER)) return C_ERR; + if (master->last_pub_time >= (sentinel_publish_period+1)) + master->last_pub_time -= (sentinel_publish_period+1); + sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels); + sentinelForceHelloUpdateDictOfRedisInstances(master->slaves); + return C_OK; +} + +/* Send a PING to the specified instance and refresh the act_ping_time + * if it is zero (that is, if we received a pong for the previous ping). + * + * On error zero is returned, and we can't consider the PING command + * queued in the connection. */ +int sentinelSendPing(sentinelRedisInstance *ri) { + int retval = redisAsyncCommand(ri->link->cc, + sentinelPingReplyCallback, ri, "%s", + sentinelInstanceMapCommand(ri,"PING")); + if (retval == C_OK) { + ri->link->pending_commands++; + ri->link->last_ping_time = mstime(); + /* We update the active ping time only if we received the pong for + * the previous ping, otherwise we are technically waiting since the + * first ping that did not receive a reply. */ + if (ri->link->act_ping_time == 0) + ri->link->act_ping_time = ri->link->last_ping_time; + return 1; + } else { + return 0; + } +} + +/* Send periodic PING, INFO, and PUBLISH to the Hello channel to + * the specified master or slave instance. */ +void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { + mstime_t now = mstime(); + mstime_t info_period, ping_period; + int retval; + + /* Return ASAP if we have already a PING or INFO already pending, or + * in the case the instance is not properly connected. */ + if (ri->link->disconnected) return; + + /* For INFO, PING, PUBLISH that are not critical commands to send we + * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't + * want to use a lot of memory just because a link is not working + * properly (note that anyway there is a redundant protection about this, + * that is, the link will be disconnected and reconnected if a long + * timeout condition is detected. */ + if (ri->link->pending_commands >= + SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return; + + /* If this is a slave of a master in O_DOWN condition we start sending + * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD + * period. In this state we want to closely monitor slaves in case they + * are turned into masters by another Sentinel, or by the sysadmin. + * + * Similarly we monitor the INFO output more often if the slave reports + * to be disconnected from the master, so that we can have a fresh + * disconnection time figure. */ + if ((ri->flags & SRI_SLAVE) && + ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) || + (ri->master_link_down_time != 0))) + { + info_period = 1000; + } else { + info_period = sentinel_info_period; + } + + /* We ping instances every time the last received pong is older than + * the configured 'down-after-milliseconds' time, but every second + * anyway if 'down-after-milliseconds' is greater than 1 second. */ + ping_period = ri->down_after_period; + if (ping_period > sentinel_ping_period) ping_period = sentinel_ping_period; + + /* Send INFO to masters and slaves, not sentinels. */ + if ((ri->flags & SRI_SENTINEL) == 0 && + (ri->info_refresh == 0 || + (now - ri->info_refresh) > info_period)) + { + retval = redisAsyncCommand(ri->link->cc, + sentinelInfoReplyCallback, ri, "%s", + sentinelInstanceMapCommand(ri,"INFO")); + if (retval == C_OK) ri->link->pending_commands++; + } + + /* Send PING to all the three kinds of instances. */ + if ((now - ri->link->last_pong_time) > ping_period && + (now - ri->link->last_ping_time) > ping_period/2) { + sentinelSendPing(ri); + } + + /* PUBLISH hello messages to all the three kinds of instances. */ + if ((now - ri->last_pub_time) > sentinel_publish_period) { + sentinelSendHello(ri); + } +} + +/* =========================== SENTINEL command ============================= */ +static void populateDict(dict *options_dict, char **options) { + for (int i=0; options[i]; i++) { + sds option = sdsnew(options[i]); + if (dictAdd(options_dict, option, NULL)==DICT_ERR) + sdsfree(option); + } +} + +const char* getLogLevel(void) { + switch (server.verbosity) { + case LL_DEBUG: return "debug"; + case LL_VERBOSE: return "verbose"; + case LL_NOTICE: return "notice"; + case LL_WARNING: return "warning"; + case LL_NOTHING: return "nothing"; + } + return "unknown"; +} + +/* SENTINEL CONFIG SET option value [option value ...] */ +void sentinelConfigSetCommand(client *c) { + long long numval; + int drop_conns = 0; + char *option; + robj *val; + char *options[] = { + "announce-ip", + "sentinel-user", + "sentinel-pass", + "resolve-hostnames", + "announce-port", + "announce-hostnames", + "loglevel", + NULL}; + static dict *options_dict = NULL; + if (!options_dict) { + options_dict = dictCreate(&stringSetDictType); + populateDict(options_dict, options); + } + dict *set_configs = dictCreate(&stringSetDictType); + + /* Validate arguments are valid */ + for (int i = 3; i < c->argc; i++) { + option = c->argv[i]->ptr; + + /* Validate option is valid */ + if (dictFind(options_dict, option) == NULL) { + addReplyErrorFormat(c, "Invalid argument '%s' to SENTINEL CONFIG SET", option); + goto exit; + } + + /* Check duplicates */ + if (dictFind(set_configs, option) != NULL) { + addReplyErrorFormat(c, "Duplicate argument '%s' to SENTINEL CONFIG SET", option); + goto exit; + } + + serverAssert(dictAdd(set_configs, sdsnew(option), NULL) == C_OK); + + /* Validate argument */ + if (i + 1 == c->argc) { + addReplyErrorFormat(c, "Missing argument '%s' value", option); + goto exit; + } + val = c->argv[++i]; + + if (!strcasecmp(option, "resolve-hostnames")) { + if ((yesnotoi(val->ptr)) == -1) goto badfmt; + } else if (!strcasecmp(option, "announce-hostnames")) { + if ((yesnotoi(val->ptr)) == -1) goto badfmt; + } else if (!strcasecmp(option, "announce-port")) { + if (getLongLongFromObject(val, &numval) == C_ERR || + numval < 0 || numval > 65535) goto badfmt; + } else if (!strcasecmp(option, "loglevel")) { + if (!(!strcasecmp(val->ptr, "debug") || !strcasecmp(val->ptr, "verbose") || + !strcasecmp(val->ptr, "notice") || !strcasecmp(val->ptr, "warning") || + !strcasecmp(val->ptr, "nothing"))) goto badfmt; + } + } + + /* Apply changes */ + for (int i = 3; i < c->argc; i++) { + int moreargs = (c->argc-1) - i; + option = c->argv[i]->ptr; + if (!strcasecmp(option, "loglevel") && moreargs > 0) { + val = c->argv[++i]; + if (!strcasecmp(val->ptr, "debug")) + server.verbosity = LL_DEBUG; + else if (!strcasecmp(val->ptr, "verbose")) + server.verbosity = LL_VERBOSE; + else if (!strcasecmp(val->ptr, "notice")) + server.verbosity = LL_NOTICE; + else if (!strcasecmp(val->ptr, "warning")) + server.verbosity = LL_WARNING; + else if (!strcasecmp(val->ptr, "nothing")) + server.verbosity = LL_NOTHING; + } else if (!strcasecmp(option, "resolve-hostnames") && moreargs > 0) { + val = c->argv[++i]; + numval = yesnotoi(val->ptr); + sentinel.resolve_hostnames = numval; + } else if (!strcasecmp(option, "announce-hostnames") && moreargs > 0) { + val = c->argv[++i]; + numval = yesnotoi(val->ptr); + sentinel.announce_hostnames = numval; + } else if (!strcasecmp(option, "announce-ip") && moreargs > 0) { + val = c->argv[++i]; + if (sentinel.announce_ip) sdsfree(sentinel.announce_ip); + sentinel.announce_ip = sdsnew(val->ptr); + } else if (!strcasecmp(option, "announce-port") && moreargs > 0) { + val = c->argv[++i]; + getLongLongFromObject(val, &numval); + sentinel.announce_port = numval; + } else if (!strcasecmp(option, "sentinel-user") && moreargs > 0) { + val = c->argv[++i]; + sdsfree(sentinel.sentinel_auth_user); + sentinel.sentinel_auth_user = sdslen(val->ptr) == 0 ? + NULL : sdsdup(val->ptr); + drop_conns = 1; + } else if (!strcasecmp(option, "sentinel-pass") && moreargs > 0) { + val = c->argv[++i]; + sdsfree(sentinel.sentinel_auth_pass); + sentinel.sentinel_auth_pass = sdslen(val->ptr) == 0 ? + NULL : sdsdup(val->ptr); + drop_conns = 1; + } else { + /* Should never reach here */ + serverAssert(0); + } + } + + sentinelFlushConfigAndReply(c); + + /* Drop Sentinel connections to initiate a reconnect if needed. */ + if (drop_conns) + sentinelDropConnections(); + +exit: + dictRelease(set_configs); + return; + +badfmt: + addReplyErrorFormat(c, "Invalid value '%s' to SENTINEL CONFIG SET '%s'", + (char *) val->ptr, option); + dictRelease(set_configs); +} + +/* SENTINEL CONFIG GET <option> [<option> ...] */ +void sentinelConfigGetCommand(client *c) { + char *pattern; + void *replylen = addReplyDeferredLen(c); + int matches = 0; + /* Create a dictionary to store the input configs,to avoid adding duplicate twice */ + dict *d = dictCreate(&externalStringType); + for (int i = 3; i < c->argc; i++) { + pattern = c->argv[i]->ptr; + /* If the string doesn't contain glob patterns and available in dictionary, don't look further, just continue. */ + if (!strpbrk(pattern, "[*?") && dictFind(d, pattern)) continue; + /* we want to print all the matched patterns and avoid printing duplicates twice */ + if (stringmatch(pattern,"resolve-hostnames",1) && !dictFind(d, "resolve-hostnames")) { + addReplyBulkCString(c,"resolve-hostnames"); + addReplyBulkCString(c,sentinel.resolve_hostnames ? "yes" : "no"); + dictAdd(d, "resolve-hostnames", NULL); + matches++; + } + if (stringmatch(pattern, "announce-hostnames", 1) && !dictFind(d, "announce-hostnames")) { + addReplyBulkCString(c,"announce-hostnames"); + addReplyBulkCString(c,sentinel.announce_hostnames ? "yes" : "no"); + dictAdd(d, "announce-hostnames", NULL); + matches++; + } + if (stringmatch(pattern, "announce-ip", 1) && !dictFind(d, "announce-ip")) { + addReplyBulkCString(c,"announce-ip"); + addReplyBulkCString(c,sentinel.announce_ip ? sentinel.announce_ip : ""); + dictAdd(d, "announce-ip", NULL); + matches++; + } + if (stringmatch(pattern, "announce-port", 1) && !dictFind(d, "announce-port")) { + addReplyBulkCString(c, "announce-port"); + addReplyBulkLongLong(c, sentinel.announce_port); + dictAdd(d, "announce-port", NULL); + matches++; + } + if (stringmatch(pattern, "sentinel-user", 1) && !dictFind(d, "sentinel-user")) { + addReplyBulkCString(c, "sentinel-user"); + addReplyBulkCString(c, sentinel.sentinel_auth_user ? sentinel.sentinel_auth_user : ""); + dictAdd(d, "sentinel-user", NULL); + matches++; + } + if (stringmatch(pattern, "sentinel-pass", 1) && !dictFind(d, "sentinel-pass")) { + addReplyBulkCString(c, "sentinel-pass"); + addReplyBulkCString(c, sentinel.sentinel_auth_pass ? sentinel.sentinel_auth_pass : ""); + dictAdd(d, "sentinel-pass", NULL); + matches++; + } + if (stringmatch(pattern, "loglevel", 1) && !dictFind(d, "loglevel")) { + addReplyBulkCString(c, "loglevel"); + addReplyBulkCString(c, getLogLevel()); + dictAdd(d, "loglevel", NULL); + matches++; + } + } + dictRelease(d); + setDeferredMapLen(c, replylen, matches); +} + +const char *sentinelFailoverStateStr(int state) { + switch(state) { + case SENTINEL_FAILOVER_STATE_NONE: return "none"; + case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start"; + case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave"; + case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone"; + case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion"; + case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves"; + case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config"; + default: return "unknown"; + } +} + +/* Redis instance to Redis protocol representation. */ +void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) { + char *flags = sdsempty(); + void *mbl; + int fields = 0; + + mbl = addReplyDeferredLen(c); + + addReplyBulkCString(c,"name"); + addReplyBulkCString(c,ri->name); + fields++; + + addReplyBulkCString(c,"ip"); + addReplyBulkCString(c,announceSentinelAddr(ri->addr)); + fields++; + + addReplyBulkCString(c,"port"); + addReplyBulkLongLong(c,ri->addr->port); + fields++; + + addReplyBulkCString(c,"runid"); + addReplyBulkCString(c,ri->runid ? ri->runid : ""); + fields++; + + addReplyBulkCString(c,"flags"); + if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,"); + if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,"); + if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,"); + if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,"); + if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,"); + if (ri->link->disconnected) flags = sdscat(flags,"disconnected,"); + if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,"); + if (ri->flags & SRI_FAILOVER_IN_PROGRESS) + flags = sdscat(flags,"failover_in_progress,"); + if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,"); + if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,"); + if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,"); + if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,"); + if (ri->flags & SRI_FORCE_FAILOVER) flags = sdscat(flags,"force_failover,"); + if (ri->flags & SRI_SCRIPT_KILL_SENT) flags = sdscat(flags,"script_kill_sent,"); + if (ri->flags & SRI_MASTER_REBOOT) flags = sdscat(flags,"master_reboot,"); + + if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */ + addReplyBulkCString(c,flags); + sdsfree(flags); + fields++; + + addReplyBulkCString(c,"link-pending-commands"); + addReplyBulkLongLong(c,ri->link->pending_commands); + fields++; + + addReplyBulkCString(c,"link-refcount"); + addReplyBulkLongLong(c,ri->link->refcount); + fields++; + + if (ri->flags & SRI_FAILOVER_IN_PROGRESS) { + addReplyBulkCString(c,"failover-state"); + addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state)); + fields++; + } + + addReplyBulkCString(c,"last-ping-sent"); + addReplyBulkLongLong(c, + ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0); + fields++; + + addReplyBulkCString(c,"last-ok-ping-reply"); + addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time); + fields++; + + addReplyBulkCString(c,"last-ping-reply"); + addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time); + fields++; + + if (ri->flags & SRI_S_DOWN) { + addReplyBulkCString(c,"s-down-time"); + addReplyBulkLongLong(c,mstime()-ri->s_down_since_time); + fields++; + } + + if (ri->flags & SRI_O_DOWN) { + addReplyBulkCString(c,"o-down-time"); + addReplyBulkLongLong(c,mstime()-ri->o_down_since_time); + fields++; + } + + addReplyBulkCString(c,"down-after-milliseconds"); + addReplyBulkLongLong(c,ri->down_after_period); + fields++; + + /* Masters and Slaves */ + if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { + addReplyBulkCString(c,"info-refresh"); + addReplyBulkLongLong(c, + ri->info_refresh ? (mstime() - ri->info_refresh) : 0); + fields++; + + addReplyBulkCString(c,"role-reported"); + addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" : + "slave"); + fields++; + + addReplyBulkCString(c,"role-reported-time"); + addReplyBulkLongLong(c,mstime() - ri->role_reported_time); + fields++; + } + + /* Only masters */ + if (ri->flags & SRI_MASTER) { + addReplyBulkCString(c,"config-epoch"); + addReplyBulkLongLong(c,ri->config_epoch); + fields++; + + addReplyBulkCString(c,"num-slaves"); + addReplyBulkLongLong(c,dictSize(ri->slaves)); + fields++; + + addReplyBulkCString(c,"num-other-sentinels"); + addReplyBulkLongLong(c,dictSize(ri->sentinels)); + fields++; + + addReplyBulkCString(c,"quorum"); + addReplyBulkLongLong(c,ri->quorum); + fields++; + + addReplyBulkCString(c,"failover-timeout"); + addReplyBulkLongLong(c,ri->failover_timeout); + fields++; + + addReplyBulkCString(c,"parallel-syncs"); + addReplyBulkLongLong(c,ri->parallel_syncs); + fields++; + + if (ri->notification_script) { + addReplyBulkCString(c,"notification-script"); + addReplyBulkCString(c,ri->notification_script); + fields++; + } + + if (ri->client_reconfig_script) { + addReplyBulkCString(c,"client-reconfig-script"); + addReplyBulkCString(c,ri->client_reconfig_script); + fields++; + } + } + + /* Only slaves */ + if (ri->flags & SRI_SLAVE) { + addReplyBulkCString(c,"master-link-down-time"); + addReplyBulkLongLong(c,ri->master_link_down_time); + fields++; + + addReplyBulkCString(c,"master-link-status"); + addReplyBulkCString(c, + (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ? + "ok" : "err"); + fields++; + + addReplyBulkCString(c,"master-host"); + addReplyBulkCString(c, + ri->slave_master_host ? ri->slave_master_host : "?"); + fields++; + + addReplyBulkCString(c,"master-port"); + addReplyBulkLongLong(c,ri->slave_master_port); + fields++; + + addReplyBulkCString(c,"slave-priority"); + addReplyBulkLongLong(c,ri->slave_priority); + fields++; + + addReplyBulkCString(c,"slave-repl-offset"); + addReplyBulkLongLong(c,ri->slave_repl_offset); + fields++; + + addReplyBulkCString(c,"replica-announced"); + addReplyBulkLongLong(c,ri->replica_announced); + fields++; + } + + /* Only sentinels */ + if (ri->flags & SRI_SENTINEL) { + addReplyBulkCString(c,"last-hello-message"); + addReplyBulkLongLong(c,mstime() - ri->last_hello_time); + fields++; + + addReplyBulkCString(c,"voted-leader"); + addReplyBulkCString(c,ri->leader ? ri->leader : "?"); + fields++; + + addReplyBulkCString(c,"voted-leader-epoch"); + addReplyBulkLongLong(c,ri->leader_epoch); + fields++; + } + + setDeferredMapLen(c,mbl,fields); +} + +void sentinelSetDebugConfigParameters(client *c){ + int j; + int badarg = 0; /* Bad argument position for error reporting. */ + char *option; + + /* Process option - value pairs. */ + for (j = 2; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; + option = c->argv[j]->ptr; + long long ll; + + if (!strcasecmp(option,"info-period") && moreargs > 0) { + /* info-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_info_period = ll; + + } else if (!strcasecmp(option,"ping-period") && moreargs > 0) { + /* ping-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_ping_period = ll; + + } else if (!strcasecmp(option,"ask-period") && moreargs > 0) { + /* ask-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_ask_period = ll; + + } else if (!strcasecmp(option,"publish-period") && moreargs > 0) { + /* publish-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_publish_period = ll; + + } else if (!strcasecmp(option,"default-down-after") && moreargs > 0) { + /* default-down-after <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_default_down_after = ll; + + } else if (!strcasecmp(option,"tilt-trigger") && moreargs > 0) { + /* tilt-trigger <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_tilt_trigger = ll; + + } else if (!strcasecmp(option,"tilt-period") && moreargs > 0) { + /* tilt-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_tilt_period = ll; + + } else if (!strcasecmp(option,"slave-reconf-timeout") && moreargs > 0) { + /* slave-reconf-timeout <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_slave_reconf_timeout = ll; + + } else if (!strcasecmp(option,"min-link-reconnect-period") && moreargs > 0) { + /* min-link-reconnect-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_min_link_reconnect_period = ll; + + } else if (!strcasecmp(option,"default-failover-timeout") && moreargs > 0) { + /* default-failover-timeout <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_default_failover_timeout = ll; + + } else if (!strcasecmp(option,"election-timeout") && moreargs > 0) { + /* election-timeout <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_election_timeout = ll; + + } else if (!strcasecmp(option,"script-max-runtime") && moreargs > 0) { + /* script-max-runtime <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_script_max_runtime = ll; + + } else if (!strcasecmp(option,"script-retry-delay") && moreargs > 0) { + /* script-retry-delay <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + sentinel_script_retry_delay = ll; + + } else { + addReplyErrorFormat(c,"Unknown option or number of arguments for " + "SENTINEL DEBUG '%s'", option); + return; + } + } + + addReply(c,shared.ok); + return; + +badfmt: /* Bad format errors */ + addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL DEBUG '%s'", + (char*)c->argv[badarg]->ptr,option); + + return; +} + +void addReplySentinelDebugInfo(client *c) { + void *mbl; + int fields = 0; + + mbl = addReplyDeferredLen(c); + + addReplyBulkCString(c,"INFO-PERIOD"); + addReplyBulkLongLong(c,sentinel_info_period); + fields++; + + addReplyBulkCString(c,"PING-PERIOD"); + addReplyBulkLongLong(c,sentinel_ping_period); + fields++; + + addReplyBulkCString(c,"ASK-PERIOD"); + addReplyBulkLongLong(c,sentinel_ask_period); + fields++; + + addReplyBulkCString(c,"PUBLISH-PERIOD"); + addReplyBulkLongLong(c,sentinel_publish_period); + fields++; + + addReplyBulkCString(c,"DEFAULT-DOWN-AFTER"); + addReplyBulkLongLong(c,sentinel_default_down_after); + fields++; + + addReplyBulkCString(c,"DEFAULT-FAILOVER-TIMEOUT"); + addReplyBulkLongLong(c,sentinel_default_failover_timeout); + fields++; + + addReplyBulkCString(c,"TILT-TRIGGER"); + addReplyBulkLongLong(c,sentinel_tilt_trigger); + fields++; + + addReplyBulkCString(c,"TILT-PERIOD"); + addReplyBulkLongLong(c,sentinel_tilt_period); + fields++; + + addReplyBulkCString(c,"SLAVE-RECONF-TIMEOUT"); + addReplyBulkLongLong(c,sentinel_slave_reconf_timeout); + fields++; + + addReplyBulkCString(c,"MIN-LINK-RECONNECT-PERIOD"); + addReplyBulkLongLong(c,sentinel_min_link_reconnect_period); + fields++; + + addReplyBulkCString(c,"ELECTION-TIMEOUT"); + addReplyBulkLongLong(c,sentinel_election_timeout); + fields++; + + addReplyBulkCString(c,"SCRIPT-MAX-RUNTIME"); + addReplyBulkLongLong(c,sentinel_script_max_runtime); + fields++; + + addReplyBulkCString(c,"SCRIPT-RETRY-DELAY"); + addReplyBulkLongLong(c,sentinel_script_retry_delay); + fields++; + + setDeferredMapLen(c,mbl,fields); +} + +/* Output a number of instances contained inside a dictionary as + * Redis protocol. */ +void addReplyDictOfRedisInstances(client *c, dict *instances) { + dictIterator *di; + dictEntry *de; + long slaves = 0; + void *replylen = addReplyDeferredLen(c); + + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + /* don't announce unannounced replicas */ + if (ri->flags & SRI_SLAVE && !ri->replica_announced) continue; + addReplySentinelRedisInstance(c,ri); + slaves++; + } + dictReleaseIterator(di); + setDeferredArrayLen(c, replylen, slaves); +} + +/* Lookup the named master into sentinel.masters. + * If the master is not found reply to the client with an error and returns + * NULL. */ +sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c, + robj *name) +{ + sentinelRedisInstance *ri; + + ri = dictFetchValue(sentinel.masters,name->ptr); + if (!ri) { + addReplyError(c,"No such master with that name"); + return NULL; + } + return ri; +} + +#define SENTINEL_ISQR_OK 0 +#define SENTINEL_ISQR_NOQUORUM (1<<0) +#define SENTINEL_ISQR_NOAUTH (1<<1) +int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) { + dictIterator *di; + dictEntry *de; + int usable = 1; /* Number of usable Sentinels. Init to 1 to count myself. */ + int result = SENTINEL_ISQR_OK; + int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */ + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue; + usable++; + } + dictReleaseIterator(di); + + if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM; + if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH; + if (usableptr) *usableptr = usable; + return result; +} + +void sentinelCommand(client *c) { + if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { + const char *help[] = { +"CKQUORUM <master-name>", +" Check if the current Sentinel configuration is able to reach the quorum", +" needed to failover a master and the majority needed to authorize the", +" failover.", +"CONFIG SET param value [param value ...]", +" Set a global Sentinel configuration parameter.", +"CONFIG GET <param> [param param param ...]", +" Get global Sentinel configuration parameter.", +"DEBUG [<param> <value> ...]", +" Show a list of configurable time parameters and their values (milliseconds).", +" Or update current configurable parameters values (one or more).", +"GET-MASTER-ADDR-BY-NAME <master-name>", +" Return the ip and port number of the master with that name.", +"FAILOVER <master-name>", +" Manually failover a master node without asking for agreement from other", +" Sentinels", +"FLUSHCONFIG", +" Force Sentinel to rewrite its configuration on disk, including the current", +" Sentinel state.", +"INFO-CACHE <master-name>", +" Return last cached INFO output from masters and all its replicas.", +"IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>", +" Check if the master specified by ip:port is down from current Sentinel's", +" point of view.", +"MASTER <master-name>", +" Show the state and info of the specified master.", +"MASTERS", +" Show a list of monitored masters and their state.", +"MONITOR <name> <ip> <port> <quorum>", +" Start monitoring a new master with the specified name, ip, port and quorum.", +"MYID", +" Return the ID of the Sentinel instance.", +"PENDING-SCRIPTS", +" Get pending scripts information.", +"REMOVE <master-name>", +" Remove master from Sentinel's monitor list.", +"REPLICAS <master-name>", +" Show a list of replicas for this master and their state.", +"RESET <pattern>", +" Reset masters for specific master name matching this pattern.", +"SENTINELS <master-name>", +" Show a list of Sentinel instances for this master and their state.", +"SET <master-name> <option> <value> [<option> <value> ...]", +" Set configuration parameters for certain masters.", +"SIMULATE-FAILURE [CRASH-AFTER-ELECTION] [CRASH-AFTER-PROMOTION] [HELP]", +" Simulate a Sentinel crash.", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(c->argv[1]->ptr,"masters")) { + /* SENTINEL MASTERS */ + if (c->argc != 2) goto numargserr; + addReplyDictOfRedisInstances(c,sentinel.masters); + } else if (!strcasecmp(c->argv[1]->ptr,"master")) { + /* SENTINEL MASTER <name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) + == NULL) return; + addReplySentinelRedisInstance(c,ri); + } else if (!strcasecmp(c->argv[1]->ptr,"slaves") || + !strcasecmp(c->argv[1]->ptr,"replicas")) + { + /* SENTINEL REPLICAS <master-name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) + return; + addReplyDictOfRedisInstances(c,ri->slaves); + } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) { + /* SENTINEL SENTINELS <master-name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) + return; + addReplyDictOfRedisInstances(c,ri->sentinels); + } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { + /* SENTINEL MYID */ + addReplyBulkCBuffer(c,sentinel.myid,CONFIG_RUN_ID_SIZE); + } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { + /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> + * + * Arguments: + * + * ip and port are the ip and port of the master we want to be + * checked by Sentinel. Note that the command will not check by + * name but just by master, in theory different Sentinels may monitor + * different masters with the same name. + * + * current-epoch is needed in order to understand if we are allowed + * to vote for a failover leader or not. Each Sentinel can vote just + * one time per epoch. + * + * runid is "*" if we are not seeking for a vote from the Sentinel + * in order to elect the failover leader. Otherwise it is set to the + * runid we want the Sentinel to vote if it did not already voted. + */ + sentinelRedisInstance *ri; + long long req_epoch; + uint64_t leader_epoch = 0; + char *leader = NULL; + long port; + int isdown = 0; + + if (c->argc != 6) goto numargserr; + if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK || + getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL) + != C_OK) + return; + ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, + c->argv[2]->ptr,port,NULL); + + /* It exists? Is actually a master? Is subjectively down? It's down. + * Note: if we are in tilt mode we always reply with "0". */ + if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && + (ri->flags & SRI_MASTER)) + isdown = 1; + + /* Vote for the master (or fetch the previous vote) if the request + * includes a runid, otherwise the sender is not seeking for a vote. */ + if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) { + leader = sentinelVoteLeader(ri,(uint64_t)req_epoch, + c->argv[5]->ptr, + &leader_epoch); + } + + /* Reply with a three-elements multi-bulk reply: + * down state, leader, vote epoch. */ + addReplyArrayLen(c,3); + addReply(c, isdown ? shared.cone : shared.czero); + addReplyBulkCString(c, leader ? leader : "*"); + addReplyLongLong(c, (long long)leader_epoch); + if (leader) sdsfree(leader); + } else if (!strcasecmp(c->argv[1]->ptr,"reset")) { + /* SENTINEL RESET <pattern> */ + if (c->argc != 3) goto numargserr; + addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT)); + } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) { + /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + ri = sentinelGetMasterByName(c->argv[2]->ptr); + if (ri == NULL) { + addReplyNullArray(c); + } else { + sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri); + + addReplyArrayLen(c,2); + addReplyBulkCString(c,announceSentinelAddr(addr)); + addReplyBulkLongLong(c,addr->port); + } + } else if (!strcasecmp(c->argv[1]->ptr,"failover")) { + /* SENTINEL FAILOVER <master-name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) + return; + if (ri->flags & SRI_FAILOVER_IN_PROGRESS) { + addReplyError(c,"-INPROG Failover already in progress"); + return; + } + if (sentinelSelectSlave(ri) == NULL) { + addReplyError(c,"-NOGOODSLAVE No suitable replica to promote"); + return; + } + serverLog(LL_NOTICE,"Executing user requested FAILOVER of '%s'", + ri->name); + sentinelStartFailover(ri); + ri->flags |= SRI_FORCE_FAILOVER; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) { + /* SENTINEL PENDING-SCRIPTS */ + + if (c->argc != 2) goto numargserr; + sentinelPendingScriptsCommand(c); + } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) { + /* SENTINEL MONITOR <name> <ip> <port> <quorum> */ + sentinelRedisInstance *ri; + long quorum, port; + char ip[NET_IP_STR_LEN]; + + if (c->argc != 6) goto numargserr; + if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum") + != C_OK) return; + if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port") + != C_OK) return; + + if (quorum <= 0) { + addReplyError(c, "Quorum must be 1 or greater."); + return; + } + + /* If resolve-hostnames is used, actual DNS resolution may take place. + * Otherwise just validate address. + */ + if (anetResolve(NULL,c->argv[3]->ptr,ip,sizeof(ip), + sentinel.resolve_hostnames ? ANET_NONE : ANET_IP_ONLY) == ANET_ERR) { + addReplyError(c, "Invalid IP address or hostname specified"); + return; + } + + /* Parameters are valid. Try to create the master instance. */ + ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER, + c->argv[3]->ptr,port,quorum,NULL); + if (ri == NULL) { + addReplyError(c,sentinelCheckCreateInstanceErrors(SRI_MASTER)); + } else { + sentinelFlushConfigAndReply(c); + sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum); + } + } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) { + if (c->argc != 2) goto numargserr; + sentinelFlushConfigAndReply(c); + return; + } else if (!strcasecmp(c->argv[1]->ptr,"remove")) { + /* SENTINEL REMOVE <name> */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) + == NULL) return; + sentinelEvent(LL_WARNING,"-monitor",ri,"%@"); + dictDelete(sentinel.masters,c->argv[2]->ptr); + sentinelFlushConfigAndReply(c); + } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) { + /* SENTINEL CKQUORUM <name> */ + sentinelRedisInstance *ri; + int usable; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) + == NULL) return; + int result = sentinelIsQuorumReachable(ri,&usable); + if (result == SENTINEL_ISQR_OK) { + addReplySds(c, sdscatfmt(sdsempty(), + "+OK %i usable Sentinels. Quorum and failover authorization " + "can be reached\r\n",usable)); + } else { + sds e = sdscatfmt(sdsempty(), + "-NOQUORUM %i usable Sentinels. ",usable); + if (result & SENTINEL_ISQR_NOQUORUM) + e = sdscat(e,"Not enough available Sentinels to reach the" + " specified quorum for this master"); + if (result & SENTINEL_ISQR_NOAUTH) { + if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". "); + e = sdscat(e, "Not enough available Sentinels to reach the" + " majority and authorize a failover"); + } + addReplyErrorSds(c,e); + } + } else if (!strcasecmp(c->argv[1]->ptr,"set")) { + sentinelSetCommand(c); + } else if (!strcasecmp(c->argv[1]->ptr,"config")) { + if (c->argc < 4) goto numargserr; + if (!strcasecmp(c->argv[2]->ptr,"set") && c->argc >= 5) + sentinelConfigSetCommand(c); + else if (!strcasecmp(c->argv[2]->ptr,"get") && c->argc >= 4) + sentinelConfigGetCommand(c); + else + addReplyError(c, "Only SENTINEL CONFIG GET <param> [<param> <param> ...]/ SET <param> <value> [<param> <value> ...] are supported."); + } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) { + /* SENTINEL INFO-CACHE <name> */ + if (c->argc < 2) goto numargserr; + mstime_t now = mstime(); + + /* Create an ad-hoc dictionary type so that we can iterate + * a dictionary composed of just the master groups the user + * requested. */ + dictType copy_keeper = instancesDictType; + copy_keeper.valDestructor = NULL; + dict *masters_local = sentinel.masters; + if (c->argc > 2) { + masters_local = dictCreate(©_keeper); + + for (int i = 2; i < c->argc; i++) { + sentinelRedisInstance *ri; + ri = sentinelGetMasterByName(c->argv[i]->ptr); + if (!ri) continue; /* ignore non-existing names */ + dictAdd(masters_local, ri->name, ri); + } + } + + /* Reply format: + * 1) master name + * 2) 1) 1) info cached ms + * 2) info from master + * 2) 1) info cached ms + * 2) info from replica1 + * ... + * 3) other master name + * ... + * ... + */ + addReplyArrayLen(c,dictSize(masters_local) * 2); + + dictIterator *di; + dictEntry *de; + di = dictGetIterator(masters_local); + while ((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + addReplyBulkCBuffer(c,ri->name,strlen(ri->name)); + addReplyArrayLen(c,dictSize(ri->slaves) + 1); /* +1 for self */ + addReplyArrayLen(c,2); + addReplyLongLong(c, + ri->info_refresh ? (now - ri->info_refresh) : 0); + if (ri->info) + addReplyBulkCBuffer(c,ri->info,sdslen(ri->info)); + else + addReplyNull(c); + + dictIterator *sdi; + dictEntry *sde; + sdi = dictGetIterator(ri->slaves); + while ((sde = dictNext(sdi)) != NULL) { + sentinelRedisInstance *sri = dictGetVal(sde); + addReplyArrayLen(c,2); + addReplyLongLong(c, + ri->info_refresh ? (now - sri->info_refresh) : 0); + if (sri->info) + addReplyBulkCBuffer(c,sri->info,sdslen(sri->info)); + else + addReplyNull(c); + } + dictReleaseIterator(sdi); + } + dictReleaseIterator(di); + if (masters_local != sentinel.masters) dictRelease(masters_local); + } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) { + /* SENTINEL SIMULATE-FAILURE [CRASH-AFTER-ELECTION] [CRASH-AFTER-PROMOTION] [HELP] */ + int j; + + sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; + for (j = 2; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) { + sentinel.simfailure_flags |= + SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION; + serverLog(LL_WARNING,"Failure simulation: this Sentinel " + "will crash after being successfully elected as failover " + "leader"); + } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) { + sentinel.simfailure_flags |= + SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION; + serverLog(LL_WARNING,"Failure simulation: this Sentinel " + "will crash after promoting the selected replica to master"); + } else if (!strcasecmp(c->argv[j]->ptr,"help")) { + addReplyArrayLen(c,2); + addReplyBulkCString(c,"crash-after-election"); + addReplyBulkCString(c,"crash-after-promotion"); + return; + } else { + addReplyError(c,"Unknown failure simulation specified"); + return; + } + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"debug")) { + if(c->argc == 2) + addReplySentinelDebugInfo(c); + else + sentinelSetDebugConfigParameters(c); + } + else { + addReplySubcommandSyntaxError(c); + } + return; + +numargserr: + addReplyErrorArity(c); +} + +void addInfoSectionsToDict(dict *section_dict, char **sections); + +/* INFO [<section> [<section> ...]] */ +void sentinelInfoCommand(client *c) { + char *sentinel_sections[] = {"server", "clients", "cpu", "stats", "sentinel", NULL}; + int sec_all = 0, sec_everything = 0; + static dict *cached_all_info_sections = NULL; + + /* Get requested section list. */ + dict *sections_dict = genInfoSectionDict(c->argv+1, c->argc-1, sentinel_sections, &sec_all, &sec_everything); + + /* Purge unsupported sections from the requested ones. */ + dictEntry *de; + dictIterator *di = dictGetSafeIterator(sections_dict); + while((de = dictNext(di)) != NULL) { + int i; + sds sec = dictGetKey(de); + for (i=0; sentinel_sections[i]; i++) + if (!strcasecmp(sentinel_sections[i], sec)) + break; + /* section not found? remove it */ + if (!sentinel_sections[i]) + dictDelete(sections_dict, sec); + } + dictReleaseIterator(di); + + /* Insert explicit all sections (don't pass these vars to genRedisInfoString) */ + if (sec_all || sec_everything) { + releaseInfoSectionDict(sections_dict); + /* We cache this dict as an optimization. */ + if (!cached_all_info_sections) { + cached_all_info_sections = dictCreate(&stringSetDictType); + addInfoSectionsToDict(cached_all_info_sections, sentinel_sections); + } + sections_dict = cached_all_info_sections; + } + + sds info = genRedisInfoString(sections_dict, 0, 0); + if (sec_all || (dictFind(sections_dict, "sentinel") != NULL)) { + dictIterator *di; + dictEntry *de; + int master_id = 0; + + if (sdslen(info) != 0) + info = sdscat(info,"\r\n"); + info = sdscatprintf(info, + "# Sentinel\r\n" + "sentinel_masters:%lu\r\n" + "sentinel_tilt:%d\r\n" + "sentinel_tilt_since_seconds:%jd\r\n" + "sentinel_running_scripts:%d\r\n" + "sentinel_scripts_queue_length:%ld\r\n" + "sentinel_simulate_failure_flags:%lu\r\n", + dictSize(sentinel.masters), + sentinel.tilt, + sentinel.tilt ? (intmax_t)((mstime()-sentinel.tilt_start_time)/1000) : -1, + sentinel.running_scripts, + listLength(sentinel.scripts_queue), + sentinel.simfailure_flags); + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + char *status = "ok"; + + if (ri->flags & SRI_O_DOWN) status = "odown"; + else if (ri->flags & SRI_S_DOWN) status = "sdown"; + info = sdscatprintf(info, + "master%d:name=%s,status=%s,address=%s:%d," + "slaves=%lu,sentinels=%lu\r\n", + master_id++, ri->name, status, + announceSentinelAddr(ri->addr), ri->addr->port, + dictSize(ri->slaves), + dictSize(ri->sentinels)+1); + } + dictReleaseIterator(di); + } + if (sections_dict != cached_all_info_sections) + releaseInfoSectionDict(sections_dict); + addReplyBulkSds(c, info); +} + +/* Implements Sentinel version of the ROLE command. The output is + * "sentinel" and the list of currently monitored master names. */ +void sentinelRoleCommand(client *c) { + dictIterator *di; + dictEntry *de; + + addReplyArrayLen(c,2); + addReplyBulkCBuffer(c,"sentinel",8); + addReplyArrayLen(c,dictSize(sentinel.masters)); + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + addReplyBulkCString(c,ri->name); + } + dictReleaseIterator(di); +} + +/* SENTINEL SET <mastername> [<option> <value> ...] */ +void sentinelSetCommand(client *c) { + sentinelRedisInstance *ri; + int j, changes = 0; + int badarg = 0; /* Bad argument position for error reporting. */ + char *option; + int redacted; + + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) + == NULL) return; + + /* Process option - value pairs. */ + for (j = 3; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; + option = c->argv[j]->ptr; + long long ll; + int old_j = j; /* Used to know what to log as an event. */ + redacted = 0; + + if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) { + /* down-after-milliseconds <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + ri->down_after_period = ll; + sentinelPropagateDownAfterPeriod(ri); + changes++; + } else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) { + /* failover-timeout <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + ri->failover_timeout = ll; + changes++; + } else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) { + /* parallel-syncs <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + ri->parallel_syncs = ll; + changes++; + } else if (!strcasecmp(option,"notification-script") && moreargs > 0) { + /* notification-script <path> */ + char *value = c->argv[++j]->ptr; + if (sentinel.deny_scripts_reconfig) { + addReplyError(c, + "Reconfiguration of scripts path is denied for " + "security reasons. Check the deny-scripts-reconfig " + "configuration directive in your Sentinel configuration"); + goto seterr; + } + + if (strlen(value) && access(value,X_OK) == -1) { + addReplyError(c, + "Notification script seems non existing or non executable"); + goto seterr; + } + sdsfree(ri->notification_script); + ri->notification_script = strlen(value) ? sdsnew(value) : NULL; + changes++; + } else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) { + /* client-reconfig-script <path> */ + char *value = c->argv[++j]->ptr; + if (sentinel.deny_scripts_reconfig) { + addReplyError(c, + "Reconfiguration of scripts path is denied for " + "security reasons. Check the deny-scripts-reconfig " + "configuration directive in your Sentinel configuration"); + goto seterr; + } + + if (strlen(value) && access(value,X_OK) == -1) { + addReplyError(c, + "Client reconfiguration script seems non existing or " + "non executable"); + goto seterr; + } + sdsfree(ri->client_reconfig_script); + ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL; + changes++; + } else if (!strcasecmp(option,"auth-pass") && moreargs > 0) { + /* auth-pass <password> */ + char *value = c->argv[++j]->ptr; + sdsfree(ri->auth_pass); + ri->auth_pass = strlen(value) ? sdsnew(value) : NULL; + dropInstanceConnections(ri); + changes++; + redacted = 1; + } else if (!strcasecmp(option,"auth-user") && moreargs > 0) { + /* auth-user <username> */ + char *value = c->argv[++j]->ptr; + sdsfree(ri->auth_user); + ri->auth_user = strlen(value) ? sdsnew(value) : NULL; + dropInstanceConnections(ri); + changes++; + } else if (!strcasecmp(option,"quorum") && moreargs > 0) { + /* quorum <count> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) { + badarg = j; + goto badfmt; + } + ri->quorum = ll; + changes++; + } else if (!strcasecmp(option,"rename-command") && moreargs > 1) { + /* rename-command <oldname> <newname> */ + sds oldname = c->argv[++j]->ptr; + sds newname = c->argv[++j]->ptr; + + if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) { + badarg = sdslen(newname) ? j-1 : j; + goto badfmt; + } + + /* Remove any older renaming for this command. */ + dictDelete(ri->renamed_commands,oldname); + + /* If the target name is the same as the source name there + * is no need to add an entry mapping to itself. */ + if (!dictSdsKeyCaseCompare(ri->renamed_commands,oldname,newname)) { + oldname = sdsdup(oldname); + newname = sdsdup(newname); + dictAdd(ri->renamed_commands,oldname,newname); + } + changes++; + } else if (!strcasecmp(option,"master-reboot-down-after-period") && moreargs > 0) { + /* master-reboot-down-after-period <milliseconds> */ + robj *o = c->argv[++j]; + if (getLongLongFromObject(o,&ll) == C_ERR || ll < 0) { + badarg = j; + goto badfmt; + } + ri->master_reboot_down_after_period = ll; + changes++; + } else { + addReplyErrorFormat(c,"Unknown option or number of arguments for " + "SENTINEL SET '%s'", option); + goto seterr; + } + + /* Log the event. */ + int numargs = j-old_j+1; + switch(numargs) { + case 2: + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",(char*)c->argv[old_j]->ptr, + redacted ? "******" : (char*)c->argv[old_j+1]->ptr); + break; + case 3: + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",(char*)c->argv[old_j]->ptr, + (char*)c->argv[old_j+1]->ptr, + (char*)c->argv[old_j+2]->ptr); + break; + default: + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",(char*)c->argv[old_j]->ptr); + break; + } + } + if (changes) sentinelFlushConfigAndReply(c); + return; + +badfmt: /* Bad format errors */ + addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'", + (char*)c->argv[badarg]->ptr,option); +seterr: + /* TODO: Handle the case of both bad input and save error, possibly handling + * SENTINEL SET atomically. */ + if (changes) sentinelFlushConfig(); +} + +/* Our fake PUBLISH command: it is actually useful only to receive hello messages + * from the other sentinel instances, and publishing to a channel other than + * SENTINEL_HELLO_CHANNEL is forbidden. + * + * Because we have a Sentinel PUBLISH, the code to send hello messages is the same + * for all the three kind of instances: masters, slaves, sentinels. */ +void sentinelPublishCommand(client *c) { + if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) { + addReplyError(c, "Only HELLO messages are accepted by Sentinel instances."); + return; + } + sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr)); + addReplyLongLong(c,1); +} + +/* ===================== SENTINEL availability checks ======================= */ + +/* Is this instance down from our point of view? */ +void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { + mstime_t elapsed = 0; + + if (ri->link->act_ping_time) + elapsed = mstime() - ri->link->act_ping_time; + else if (ri->link->disconnected) + elapsed = mstime() - ri->link->last_avail_time; + + /* Check if we are in need for a reconnection of one of the + * links, because we are detecting low activity. + * + * 1) Check if the command link seems connected, was connected not less + * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a + * pending ping for more than half the timeout. */ + if (ri->link->cc && + (mstime() - ri->link->cc_conn_time) > + sentinel_min_link_reconnect_period && + ri->link->act_ping_time != 0 && /* There is a pending ping... */ + /* The pending ping is delayed, and we did not receive + * error replies as well. */ + (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && + (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) + { + instanceLinkCloseConnection(ri->link,ri->link->cc); + } + + /* 2) Check if the pubsub link seems connected, was connected not less + * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no + * activity in the Pub/Sub channel for more than + * SENTINEL_PUBLISH_PERIOD * 3. + */ + if (ri->link->pc && + (mstime() - ri->link->pc_conn_time) > + sentinel_min_link_reconnect_period && + (mstime() - ri->link->pc_last_activity) > (sentinel_publish_period*3)) + { + instanceLinkCloseConnection(ri->link,ri->link->pc); + } + + /* Update the SDOWN flag. We believe the instance is SDOWN if: + * + * 1) It is not replying. + * 2) We believe it is a master, it reports to be a slave for enough time + * to meet the down_after_period, plus enough time to get two times + * INFO report from the instance. */ + if (elapsed > ri->down_after_period || + (ri->flags & SRI_MASTER && + ri->role_reported == SRI_SLAVE && + mstime() - ri->role_reported_time > + (ri->down_after_period+sentinel_info_period*2)) || + (ri->flags & SRI_MASTER_REBOOT && + mstime()-ri->master_reboot_since_time > ri->master_reboot_down_after_period)) + { + /* Is subjectively down */ + if ((ri->flags & SRI_S_DOWN) == 0) { + sentinelEvent(LL_WARNING,"+sdown",ri,"%@"); + ri->s_down_since_time = mstime(); + ri->flags |= SRI_S_DOWN; + } + } else { + /* Is subjectively up */ + if (ri->flags & SRI_S_DOWN) { + sentinelEvent(LL_WARNING,"-sdown",ri,"%@"); + ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); + } + } +} + +/* Is this instance down according to the configured quorum? + * + * Note that ODOWN is a weak quorum, it only means that enough Sentinels + * reported in a given time range that the instance was not reachable. + * However messages can be delayed so there are no strong guarantees about + * N instances agreeing at the same time about the down state. */ +void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + unsigned int quorum = 0, odown = 0; + + if (master->flags & SRI_S_DOWN) { + /* Is down for enough sentinels? */ + quorum = 1; /* the current sentinel. */ + /* Count all the other sentinels. */ + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->flags & SRI_MASTER_DOWN) quorum++; + } + dictReleaseIterator(di); + if (quorum >= master->quorum) odown = 1; + } + + /* Set the flag accordingly to the outcome. */ + if (odown) { + if ((master->flags & SRI_O_DOWN) == 0) { + sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d", + quorum, master->quorum); + master->flags |= SRI_O_DOWN; + master->o_down_since_time = mstime(); + } + } else { + if (master->flags & SRI_O_DOWN) { + sentinelEvent(LL_WARNING,"-odown",master,"%@"); + master->flags &= ~SRI_O_DOWN; + } + } +} + +/* Receive the SENTINEL is-master-down-by-addr reply, see the + * sentinelAskMasterStateToOtherSentinels() function for more information. */ +void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = privdata; + instanceLink *link = c->data; + redisReply *r; + + if (!reply || !link) return; + link->pending_commands--; + r = reply; + + /* Ignore every error or unexpected reply. + * Note that if the command returns an error for any reason we'll + * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */ + if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 && + r->element[0]->type == REDIS_REPLY_INTEGER && + r->element[1]->type == REDIS_REPLY_STRING && + r->element[2]->type == REDIS_REPLY_INTEGER) + { + ri->last_master_down_reply_time = mstime(); + if (r->element[0]->integer == 1) { + ri->flags |= SRI_MASTER_DOWN; + } else { + ri->flags &= ~SRI_MASTER_DOWN; + } + if (strcmp(r->element[1]->str,"*")) { + /* If the runid in the reply is not "*" the Sentinel actually + * replied with a vote. */ + sdsfree(ri->leader); + if ((long long)ri->leader_epoch != r->element[2]->integer) + serverLog(LL_NOTICE, + "%s voted for %s %llu", ri->name, + r->element[1]->str, + (unsigned long long) r->element[2]->integer); + ri->leader = sdsnew(r->element[1]->str); + ri->leader_epoch = r->element[2]->integer; + } + } +} + +/* If we think the master is down, we start sending + * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels + * in order to get the replies that allow to reach the quorum + * needed to mark the master in ODOWN state and trigger a failover. */ +#define SENTINEL_ASK_FORCED (1<<0) +void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + mstime_t elapsed = mstime() - ri->last_master_down_reply_time; + char port[32]; + int retval; + + /* If the master state from other sentinel is too old, we clear it. */ + if (elapsed > sentinel_ask_period*5) { + ri->flags &= ~SRI_MASTER_DOWN; + sdsfree(ri->leader); + ri->leader = NULL; + } + + /* Only ask if master is down to other sentinels if: + * + * 1) We believe it is down, or there is a failover in progress. + * 2) Sentinel is connected. + * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */ + if ((master->flags & SRI_S_DOWN) == 0) continue; + if (ri->link->disconnected) continue; + if (!(flags & SENTINEL_ASK_FORCED) && + mstime() - ri->last_master_down_reply_time < sentinel_ask_period) + continue; + + /* Ask */ + ll2string(port,sizeof(port),master->addr->port); + retval = redisAsyncCommand(ri->link->cc, + sentinelReceiveIsMasterDownReply, ri, + "%s is-master-down-by-addr %s %s %llu %s", + sentinelInstanceMapCommand(ri,"SENTINEL"), + announceSentinelAddr(master->addr), port, + sentinel.current_epoch, + (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ? + sentinel.myid : "*"); + if (retval == C_OK) ri->link->pending_commands++; + } + dictReleaseIterator(di); +} + +/* =============================== FAILOVER ================================= */ + +/* Crash because of user request via SENTINEL simulate-failure command. */ +void sentinelSimFailureCrash(void) { + serverLog(LL_WARNING, + "Sentinel CRASH because of SENTINEL simulate-failure"); + exit(99); +} + +/* Vote for the sentinel with 'req_runid' or return the old vote if already + * voted for the specified 'req_epoch' or one greater. + * + * If a vote is not available returns NULL, otherwise return the Sentinel + * runid and populate the leader_epoch with the epoch of the vote. */ +char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) { + if (req_epoch > sentinel.current_epoch) { + sentinel.current_epoch = req_epoch; + sentinelFlushConfig(); + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", + (unsigned long long) sentinel.current_epoch); + } + + if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch) + { + sdsfree(master->leader); + master->leader = sdsnew(req_runid); + master->leader_epoch = sentinel.current_epoch; + sentinelFlushConfig(); + sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu", + master->leader, (unsigned long long) master->leader_epoch); + /* If we did not voted for ourselves, set the master failover start + * time to now, in order to force a delay before we can start a + * failover for the same master. */ + if (strcasecmp(master->leader,sentinel.myid)) + master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; + } + + *leader_epoch = master->leader_epoch; + return master->leader ? sdsnew(master->leader) : NULL; +} + +struct sentinelLeader { + char *runid; + unsigned long votes; +}; + +/* Helper function for sentinelGetLeader, increment the counter + * relative to the specified runid. */ +int sentinelLeaderIncr(dict *counters, char *runid) { + dictEntry *existing, *de; + uint64_t oldval; + + de = dictAddRaw(counters,runid,&existing); + if (existing) { + oldval = dictGetUnsignedIntegerVal(existing); + dictSetUnsignedIntegerVal(existing,oldval+1); + return oldval+1; + } else { + serverAssert(de != NULL); + dictSetUnsignedIntegerVal(de,1); + return 1; + } +} + +/* Scan all the Sentinels attached to this master to check if there + * is a leader for the specified epoch. + * + * To be a leader for a given epoch, we should have the majority of + * the Sentinels we know (ever seen since the last SENTINEL RESET) that + * reported the same instance as leader for the same epoch. */ +char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) { + dict *counters; + dictIterator *di; + dictEntry *de; + unsigned int voters = 0, voters_quorum; + char *myvote; + char *winner = NULL; + uint64_t leader_epoch; + uint64_t max_votes = 0; + + serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)); + counters = dictCreate(&leaderVotesDictType); + + voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/ + + /* Count other sentinels votes */ + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch) + sentinelLeaderIncr(counters,ri->leader); + } + dictReleaseIterator(di); + + /* Check what's the winner. For the winner to win, it needs two conditions: + * 1) Absolute majority between voters (50% + 1). + * 2) And anyway at least master->quorum votes. */ + di = dictGetIterator(counters); + while((de = dictNext(di)) != NULL) { + uint64_t votes = dictGetUnsignedIntegerVal(de); + + if (votes > max_votes) { + max_votes = votes; + winner = dictGetKey(de); + } + } + dictReleaseIterator(di); + + /* Count this Sentinel vote: + * if this Sentinel did not voted yet, either vote for the most + * common voted sentinel, or for itself if no vote exists at all. */ + if (winner) + myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch); + else + myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch); + + if (myvote && leader_epoch == epoch) { + uint64_t votes = sentinelLeaderIncr(counters,myvote); + + if (votes > max_votes) { + max_votes = votes; + winner = myvote; + } + } + + voters_quorum = voters/2+1; + if (winner && (max_votes < voters_quorum || max_votes < master->quorum)) + winner = NULL; + + winner = winner ? sdsnew(winner) : NULL; + sdsfree(myvote); + dictRelease(counters); + return winner; +} + +/* Send SLAVEOF to the specified instance, always followed by a + * CONFIG REWRITE command in order to store the new configuration on disk + * when possible (that is, if the Redis instance is recent enough to support + * config rewriting, and if the server was started with a configuration file). + * + * If Host is NULL the function sends "SLAVEOF NO ONE". + * + * The command returns C_OK if the SLAVEOF command was accepted for + * (later) delivery otherwise C_ERR. The command replies are just + * discarded. */ +int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) { + char portstr[32]; + const char *host; + int retval; + + /* If host is NULL we send SLAVEOF NO ONE that will turn the instance + * into a master. */ + if (!addr) { + host = "NO"; + memcpy(portstr,"ONE",4); + } else { + host = announceSentinelAddr(addr); + ll2string(portstr,sizeof(portstr),addr->port); + } + + /* In order to send SLAVEOF in a safe way, we send a transaction performing + * the following tasks: + * 1) Reconfigure the instance according to the specified host/port params. + * 2) Rewrite the configuration. + * 3) Disconnect all clients (but this one sending the command) in order + * to trigger the ask-master-on-reconnection protocol for connected + * clients. + * + * Note that we don't check the replies returned by commands, since we + * will observe instead the effects in the next INFO output. */ + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s", + sentinelInstanceMapCommand(ri,"MULTI")); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"SLAVEOF"), + host, portstr); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s REWRITE", + sentinelInstanceMapCommand(ri,"CONFIG")); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + + /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12, + * however sending it to an instance not understanding this command is not + * an issue because CLIENT is variadic command, so Redis will not + * recognized as a syntax error, and the transaction will not fail (but + * only the unsupported command will fail). */ + for (int type = 0; type < 2; type++) { + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s", + sentinelInstanceMapCommand(ri,"CLIENT"), + type == 0 ? "normal" : "pubsub"); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + } + + retval = redisAsyncCommand(ri->link->cc, + sentinelDiscardReplyCallback, ri, "%s", + sentinelInstanceMapCommand(ri,"EXEC")); + if (retval == C_ERR) return retval; + ri->link->pending_commands++; + + return C_OK; +} + +/* Setup the master state to start a failover. */ +void sentinelStartFailover(sentinelRedisInstance *master) { + serverAssert(master->flags & SRI_MASTER); + + master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; + master->flags |= SRI_FAILOVER_IN_PROGRESS; + master->failover_epoch = ++sentinel.current_epoch; + sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", + (unsigned long long) sentinel.current_epoch); + sentinelEvent(LL_WARNING,"+try-failover",master,"%@"); + master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC; + master->failover_state_change_time = mstime(); +} + +/* This function checks if there are the conditions to start the failover, + * that is: + * + * 1) Master must be in ODOWN condition. + * 2) No failover already in progress. + * 3) No failover already attempted recently. + * + * We still don't know if we'll win the election so it is possible that we + * start the failover but that we'll not be able to act. + * + * Return non-zero if a failover was started. */ +int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) { + /* We can't failover if the master is not in O_DOWN state. */ + if (!(master->flags & SRI_O_DOWN)) return 0; + + /* Failover already in progress? */ + if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0; + + /* Last failover attempt started too little time ago? */ + if (mstime() - master->failover_start_time < + master->failover_timeout*2) + { + if (master->failover_delay_logged != master->failover_start_time) { + time_t clock = (master->failover_start_time + + master->failover_timeout*2) / 1000; + char ctimebuf[26]; + + ctime_r(&clock,ctimebuf); + ctimebuf[24] = '\0'; /* Remove newline. */ + master->failover_delay_logged = master->failover_start_time; + serverLog(LL_NOTICE, + "Next failover delay: I will not start a failover before %s", + ctimebuf); + } + return 0; + } + + sentinelStartFailover(master); + return 1; +} + +/* Select a suitable slave to promote. The current algorithm only uses + * the following parameters: + * + * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED. + * 2) Last time the slave replied to ping no more than 5 times the PING period. + * 3) info_refresh not older than 3 times the INFO refresh period. + * 4) master_link_down_time no more than: + * (now - master->s_down_since_time) + (master->down_after_period * 10). + * Basically since the master is down from our POV, the slave reports + * to be disconnected no more than 10 times the configured down-after-period. + * This is pretty much black magic but the idea is, the master was not + * available so the slave may be lagging, but not over a certain time. + * Anyway we'll select the best slave according to replication offset. + * 5) Slave priority can't be zero, otherwise the slave is discarded. + * + * Among all the slaves matching the above conditions we select the slave + * with, in order of sorting key: + * + * - lower slave_priority. + * - bigger processed replication offset. + * - lexicographically smaller runid. + * + * Basically if runid is the same, the slave that processed more commands + * from the master is selected. + * + * The function returns the pointer to the selected slave, otherwise + * NULL if no suitable slave was found. + */ + +/* Helper for sentinelSelectSlave(). This is used by qsort() in order to + * sort suitable slaves in a "better first" order, to take the first of + * the list. */ +int compareSlavesForPromotion(const void *a, const void *b) { + sentinelRedisInstance **sa = (sentinelRedisInstance **)a, + **sb = (sentinelRedisInstance **)b; + char *sa_runid, *sb_runid; + + if ((*sa)->slave_priority != (*sb)->slave_priority) + return (*sa)->slave_priority - (*sb)->slave_priority; + + /* If priority is the same, select the slave with greater replication + * offset (processed more data from the master). */ + if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) { + return -1; /* a < b */ + } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) { + return 1; /* a > b */ + } + + /* If the replication offset is the same select the slave with that has + * the lexicographically smaller runid. Note that we try to handle runid + * == NULL as there are old Redis versions that don't publish runid in + * INFO. A NULL runid is considered bigger than any other runid. */ + sa_runid = (*sa)->runid; + sb_runid = (*sb)->runid; + if (sa_runid == NULL && sb_runid == NULL) return 0; + else if (sa_runid == NULL) return 1; /* a > b */ + else if (sb_runid == NULL) return -1; /* a < b */ + return strcasecmp(sa_runid, sb_runid); +} + +sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) { + sentinelRedisInstance **instance = + zmalloc(sizeof(instance[0])*dictSize(master->slaves)); + sentinelRedisInstance *selected = NULL; + int instances = 0; + dictIterator *di; + dictEntry *de; + mstime_t max_master_down_time = 0; + + if (master->flags & SRI_S_DOWN) + max_master_down_time += mstime() - master->s_down_since_time; + max_master_down_time += master->down_after_period * 10; + + di = dictGetIterator(master->slaves); + + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + mstime_t info_validity_time; + + if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue; + if (slave->link->disconnected) continue; + if (mstime() - slave->link->last_avail_time > sentinel_ping_period*5) continue; + if (slave->slave_priority == 0) continue; + + /* If the master is in SDOWN state we get INFO for slaves every second. + * Otherwise we get it with the usual period so we need to account for + * a larger delay. */ + if (master->flags & SRI_S_DOWN) + info_validity_time = sentinel_ping_period*5; + else + info_validity_time = sentinel_info_period*3; + if (mstime() - slave->info_refresh > info_validity_time) continue; + if (slave->master_link_down_time > max_master_down_time) continue; + instance[instances++] = slave; + } + dictReleaseIterator(di); + if (instances) { + qsort(instance,instances,sizeof(sentinelRedisInstance*), + compareSlavesForPromotion); + selected = instance[0]; + } + zfree(instance); + return selected; +} + +/* ---------------- Failover state machine implementation ------------------- */ +void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { + char *leader; + int isleader; + + /* Check if we are the leader for the failover epoch. */ + leader = sentinelGetLeader(ri, ri->failover_epoch); + isleader = leader && strcasecmp(leader,sentinel.myid) == 0; + sdsfree(leader); + + /* If I'm not the leader, and it is not a forced failover via + * SENTINEL FAILOVER, then I can't continue with the failover. */ + if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) { + mstime_t election_timeout = sentinel_election_timeout; + + /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT + * and the configured failover timeout. */ + if (election_timeout > ri->failover_timeout) + election_timeout = ri->failover_timeout; + /* Abort the failover if I'm not the leader after some time. */ + if (mstime() - ri->failover_start_time > election_timeout) { + sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@"); + sentinelAbortFailover(ri); + } + return; + } + sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@"); + if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION) + sentinelSimFailureCrash(); + ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; + ri->failover_state_change_time = mstime(); + sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@"); +} + +void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { + sentinelRedisInstance *slave = sentinelSelectSlave(ri); + + /* We don't handle the timeout in this state as the function aborts + * the failover or go forward in the next state. */ + if (slave == NULL) { + sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@"); + sentinelAbortFailover(ri); + } else { + sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@"); + slave->flags |= SRI_PROMOTED; + ri->promoted_slave = slave; + ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE; + ri->failover_state_change_time = mstime(); + sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone", + slave, "%@"); + } +} + +void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) { + int retval; + + /* We can't send the command to the promoted slave if it is now + * disconnected. Retry again and again with this state until the timeout + * is reached, then abort the failover. */ + if (ri->promoted_slave->link->disconnected) { + if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { + sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@"); + sentinelAbortFailover(ri); + } + return; + } + + /* Send SLAVEOF NO ONE command to turn the slave into a master. + * We actually register a generic callback for this command as we don't + * really care about the reply. We check if it worked indirectly observing + * if INFO returns a different role (master instead of slave). */ + retval = sentinelSendSlaveOf(ri->promoted_slave,NULL); + if (retval != C_OK) return; + sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion", + ri->promoted_slave,"%@"); + ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION; + ri->failover_state_change_time = mstime(); +} + +/* We actually wait for promotion indirectly checking with INFO when the + * slave turns into a master. */ +void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) { + /* Just handle the timeout. Switching to the next state is handled + * by the function parsing the INFO command of the promoted slave. */ + if (mstime() - ri->failover_state_change_time > ri->failover_timeout) { + sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@"); + sentinelAbortFailover(ri); + } +} + +void sentinelFailoverDetectEnd(sentinelRedisInstance *master) { + int not_reconfigured = 0, timeout = 0; + dictIterator *di; + dictEntry *de; + mstime_t elapsed = mstime() - master->failover_state_change_time; + + /* We can't consider failover finished if the promoted slave is + * not reachable. */ + if (master->promoted_slave == NULL || + master->promoted_slave->flags & SRI_S_DOWN) return; + + /* The failover terminates once all the reachable slaves are properly + * configured. */ + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue; + if (slave->flags & SRI_S_DOWN) continue; + not_reconfigured++; + } + dictReleaseIterator(di); + + /* Force end of failover on timeout. */ + if (elapsed > master->failover_timeout) { + not_reconfigured = 0; + timeout = 1; + sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@"); + } + + if (not_reconfigured == 0) { + sentinelEvent(LL_WARNING,"+failover-end",master,"%@"); + master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG; + master->failover_state_change_time = mstime(); + } + + /* If I'm the leader it is a good idea to send a best effort SLAVEOF + * command to all the slaves still not reconfigured to replicate with + * the new master. */ + if (timeout) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + int retval; + + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue; + if (slave->link->disconnected) continue; + + retval = sentinelSendSlaveOf(slave,master->promoted_slave->addr); + if (retval == C_OK) { + sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@"); + slave->flags |= SRI_RECONF_SENT; + } + } + dictReleaseIterator(di); + } +} + +/* Send SLAVE OF <new master address> to all the remaining slaves that + * still don't appear to have the configuration updated. */ +void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + int in_progress = 0; + + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + + if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) + in_progress++; + } + dictReleaseIterator(di); + + di = dictGetIterator(master->slaves); + while(in_progress < master->parallel_syncs && + (de = dictNext(di)) != NULL) + { + sentinelRedisInstance *slave = dictGetVal(de); + int retval; + + /* Skip the promoted slave, and already configured slaves. */ + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue; + + /* If too much time elapsed without the slave moving forward to + * the next state, consider it reconfigured even if it is not. + * Sentinels will detect the slave as misconfigured and fix its + * configuration later. */ + if ((slave->flags & SRI_RECONF_SENT) && + (mstime() - slave->slave_reconf_sent_time) > + sentinel_slave_reconf_timeout) + { + sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@"); + slave->flags &= ~SRI_RECONF_SENT; + slave->flags |= SRI_RECONF_DONE; + } + + /* Nothing to do for instances that are disconnected or already + * in RECONF_SENT state. */ + if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue; + if (slave->link->disconnected) continue; + + /* Send SLAVEOF <new master>. */ + retval = sentinelSendSlaveOf(slave,master->promoted_slave->addr); + if (retval == C_OK) { + slave->flags |= SRI_RECONF_SENT; + slave->slave_reconf_sent_time = mstime(); + sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@"); + in_progress++; + } + } + dictReleaseIterator(di); + + /* Check if all the slaves are reconfigured and handle timeout. */ + sentinelFailoverDetectEnd(master); +} + +/* This function is called when the slave is in + * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need + * to remove it from the master table and add the promoted slave instead. */ +void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { + sentinelRedisInstance *ref = master->promoted_slave ? + master->promoted_slave : master; + + sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d", + master->name, announceSentinelAddr(master->addr), master->addr->port, + announceSentinelAddr(ref->addr), ref->addr->port); + + sentinelResetMasterAndChangeAddress(master,ref->addr->hostname,ref->addr->port); +} + +void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_MASTER); + + if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return; + + switch(ri->failover_state) { + case SENTINEL_FAILOVER_STATE_WAIT_START: + sentinelFailoverWaitStart(ri); + break; + case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: + sentinelFailoverSelectSlave(ri); + break; + case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: + sentinelFailoverSendSlaveOfNoOne(ri); + break; + case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: + sentinelFailoverWaitPromotion(ri); + break; + case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: + sentinelFailoverReconfNextSlave(ri); + break; + } +} + +/* Abort a failover in progress: + * + * This function can only be called before the promoted slave acknowledged + * the slave -> master switch. Otherwise the failover can't be aborted and + * will reach its end (possibly by timeout). */ +void sentinelAbortFailover(sentinelRedisInstance *ri) { + serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS); + serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION); + + ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER); + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = mstime(); + if (ri->promoted_slave) { + ri->promoted_slave->flags &= ~SRI_PROMOTED; + ri->promoted_slave = NULL; + } +} + +/* ======================== SENTINEL timer handler ========================== + * This is the "main" our Sentinel, being sentinel completely non blocking + * in design. + * -------------------------------------------------------------------------- */ + +/* Perform scheduled operations for the specified Redis instance. */ +void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { + /* ========== MONITORING HALF ============ */ + /* Every kind of instance */ + sentinelReconnectInstance(ri); + sentinelSendPeriodicCommands(ri); + + /* ============== ACTING HALF ============= */ + /* We don't proceed with the acting half if we are in TILT mode. + * TILT happens when we find something odd with the time, like a + * sudden change in the clock. */ + if (sentinel.tilt) { + if (mstime()-sentinel.tilt_start_time < sentinel_tilt_period) return; + sentinel.tilt = 0; + sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited"); + } + + /* Every kind of instance */ + sentinelCheckSubjectivelyDown(ri); + + /* Masters and slaves */ + if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { + /* Nothing so far. */ + } + + /* Only masters */ + if (ri->flags & SRI_MASTER) { + sentinelCheckObjectivelyDown(ri); + if (sentinelStartFailoverIfNeeded(ri)) + sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); + sentinelFailoverStateMachine(ri); + sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); + } +} + +/* Perform scheduled operations for all the instances in the dictionary. + * Recursively call the function against dictionaries of slaves. */ +void sentinelHandleDictOfRedisInstances(dict *instances) { + dictIterator *di; + dictEntry *de; + sentinelRedisInstance *switch_to_promoted = NULL; + + /* There are a number of things we need to perform against every master. */ + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + sentinelHandleRedisInstance(ri); + if (ri->flags & SRI_MASTER) { + sentinelHandleDictOfRedisInstances(ri->slaves); + sentinelHandleDictOfRedisInstances(ri->sentinels); + if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) { + switch_to_promoted = ri; + } + } + } + if (switch_to_promoted) + sentinelFailoverSwitchToPromotedSlave(switch_to_promoted); + dictReleaseIterator(di); +} + +/* This function checks if we need to enter the TILT mode. + * + * The TILT mode is entered if we detect that between two invocations of the + * timer interrupt, a negative amount of time, or too much time has passed. + * Note that we expect that more or less just 100 milliseconds will pass + * if everything is fine. However we'll see a negative number or a + * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the + * following conditions happen: + * + * 1) The Sentinel process for some time is blocked, for every kind of + * random reason: the load is huge, the computer was frozen for some time + * in I/O or alike, the process was stopped by a signal. Everything. + * 2) The system clock was altered significantly. + * + * Under both this conditions we'll see everything as timed out and failing + * without good reasons. Instead we enter the TILT mode and wait + * for SENTINEL_TILT_PERIOD to elapse before starting to act again. + * + * During TILT time we still collect information, we just do not act. */ +void sentinelCheckTiltCondition(void) { + mstime_t now = mstime(); + mstime_t delta = now - sentinel.previous_time; + + if (delta < 0 || delta > sentinel_tilt_trigger) { + sentinel.tilt = 1; + sentinel.tilt_start_time = mstime(); + sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); + } + sentinel.previous_time = mstime(); +} + +void sentinelTimer(void) { + sentinelCheckTiltCondition(); + sentinelHandleDictOfRedisInstances(sentinel.masters); + sentinelRunPendingScripts(); + sentinelCollectTerminatedScripts(); + sentinelKillTimedoutScripts(); + + /* We continuously change the frequency of the Redis "timer interrupt" + * in order to desynchronize every Sentinel from every other. + * This non-determinism avoids that Sentinels started at the same time + * exactly continue to stay synchronized asking to be voted at the + * same time again and again (resulting in nobody likely winning the + * election because of split brain voting). */ + server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ; +} |