/* Asynchronous replication implementation. * * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * * Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * Neither the name of Redis nor the names of its contributors may be used * to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. 
*/ #include "server.h" #include "cluster.h" #include "bio.h" #include "functions.h" #include "connection.h" #include #include #include #include #include #include void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); int replicaPutOnline(client *slave); void replicaStartCommandStream(client *slave); int cancelReplicationHandshake(int reconnect); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case * the instance is configured to have no persistence. */ int RDBGeneratedByReplication = 0; /* --------------------------- Utility functions ---------------------------- */ static ConnectionType *connTypeOfReplication(void) { if (server.tls_replication) { return connectionTypeTls(); } return connectionTypeTcp(); } /* Return the pointer to a string representing the slave ip:listening_port * pair. Mostly useful for logging, since we want to log a slave using its * IP address and its listening port which is more clear for the user, for * example: "Closing connection with replica 10.1.2.3:6380". */ char *replicationGetSlaveName(client *c) { static char buf[NET_HOST_PORT_STR_LEN]; char ip[NET_IP_STR_LEN]; ip[0] = '\0'; buf[0] = '\0'; if (c->slave_addr || connAddrPeerName(c->conn,ip,sizeof(ip),NULL) != -1) { char *addr = c->slave_addr ? c->slave_addr : ip; if (c->slave_listening_port) formatAddr(buf,sizeof(buf),addr,c->slave_listening_port); else snprintf(buf,sizeof(buf),"%s:",addr); } else { snprintf(buf,sizeof(buf),"client id #%llu", (unsigned long long) c->id); } return buf; } /* Plain unlink() can block for quite some time in order to actually apply * the file deletion to the filesystem. This call removes the file in a * background thread instead. 
We actually just do close() in the thread, * by using the fact that if there is another instance of the same file open, * the foreground unlink() will only remove the fs name, and deleting the * file's storage space will only happen once the last reference is lost. */ int bg_unlink(const char *filename) { int fd = open(filename,O_RDONLY|O_NONBLOCK); if (fd == -1) { /* Can't open the file? Fall back to unlinking in the main thread. */ return unlink(filename); } else { /* The following unlink() removes the name but doesn't free the * file contents because a process still has it open. */ int retval = unlink(filename); if (retval == -1) { /* If we got an unlink error, we just return it, closing the * new reference we have to the file. */ int old_errno = errno; close(fd); /* This would overwrite our errno. So we saved it. */ errno = old_errno; return -1; } bioCreateCloseJob(fd, 0, 0); return 0; /* Success. */ } } /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { serverAssert(server.repl_backlog == NULL); server.repl_backlog = zmalloc(sizeof(replBacklog)); server.repl_backlog->ref_repl_buf_node = NULL; server.repl_backlog->unindexed_count = 0; server.repl_backlog->blocks_index = raxNew(); server.repl_backlog->histlen = 0; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the * replication stream. */ server.repl_backlog->offset = server.master_repl_offset+1; } /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to resize the buffer and setup it * so that it contains the same data as the previous one (possibly less data, * but the most recent bytes, or the same data and more free space in case the * buffer is enlarged). 
*/ void resizeReplicationBacklog(void) { if (server.repl_backlog_size < CONFIG_REPL_BACKLOG_MIN_SIZE) server.repl_backlog_size = CONFIG_REPL_BACKLOG_MIN_SIZE; if (server.repl_backlog) incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); } void freeReplicationBacklog(void) { serverAssert(listLength(server.slaves) == 0); if (server.repl_backlog == NULL) return; /* Decrease the start buffer node reference count. */ if (server.repl_backlog->ref_repl_buf_node) { replBufBlock *o = listNodeValue( server.repl_backlog->ref_repl_buf_node); serverAssert(o->refcount == 1); /* Last reference. */ o->refcount--; } /* Replication buffer blocks are completely released when we free the * backlog, since the backlog is released only when there are no replicas * and the backlog keeps the last reference of all blocks. */ freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, server.repl_backlog->blocks_index); resetReplicationBuffer(); zfree(server.repl_backlog); server.repl_backlog = NULL; } /* To make search offset from replication buffer blocks quickly * when replicas ask partial resynchronization, we create one index * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ void createReplicationBacklogIndex(listNode *ln) { server.repl_backlog->unindexed_count++; if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { replBufBlock *o = listNodeValue(ln); uint64_t encoded_offset = htonu64(o->repl_offset); raxInsert(server.repl_backlog->blocks_index, (unsigned char*)&encoded_offset, sizeof(uint64_t), ln, NULL); server.repl_backlog->unindexed_count = 0; } } /* Rebase replication buffer blocks' offset since the initial * setting offset starts from 0 when master restart. 
*/ void rebaseReplicationBuffer(long long base_repl_offset) { raxFree(server.repl_backlog->blocks_index); server.repl_backlog->blocks_index = raxNew(); server.repl_backlog->unindexed_count = 0; listIter li; listNode *ln; listRewind(server.repl_buffer_blocks, &li); while ((ln = listNext(&li))) { replBufBlock *o = listNodeValue(ln); o->repl_offset += base_repl_offset; createReplicationBacklogIndex(ln); } } void resetReplicationBuffer(void) { server.repl_buffer_mem = 0; server.repl_buffer_blocks = listCreate(); listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree); } int canFeedReplicaReplBuffer(client *replica) { /* Don't feed replicas that only want the RDB. */ if (replica->flags & CLIENT_REPL_RDBONLY) return 0; /* Don't feed replicas that are still waiting for BGSAVE to start. */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; return 1; } /* Similar with 'prepareClientToWrite', note that we must call this function * before feeding replication stream into global replication buffer, since * clientHasPendingReplies in prepareClientToWrite will access the global * replication buffer to make judgements. */ int prepareReplicasToWrite(void) { listIter li; listNode *ln; int prepared = 0; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; if (!canFeedReplicaReplBuffer(slave)) continue; if (prepareClientToWrite(slave) == C_ERR) continue; prepared++; } return prepared; } /* Wrapper for feedReplicationBuffer() that takes Redis string objects * as input. */ void feedReplicationBufferWithObject(robj *o) { char llstr[LONG_STR_SIZE]; void *p; size_t len; if (o->encoding == OBJ_ENCODING_INT) { len = ll2string(llstr,sizeof(llstr),(long)o->ptr); p = llstr; } else { len = sdslen(o->ptr); p = o->ptr; } feedReplicationBuffer(p,len); } /* Generally, we only have one replication buffer block to trim when replication * backlog size exceeds our setting and no replica reference it. 
But if replica
 * clients disconnect, we need to free many replication buffer blocks that are
 * referenced. Freeing a large number of blocks in one go could take long
 * enough to stall the server, so the backlog is trimmed incrementally.
 *
 * max_blocks: upper bound on the number of blocks released by this call.
 *
 * Trimming stops early when only one block is left, when the head block is
 * still referenced by someone other than the backlog (refcount != 1), or
 * when dropping the head block would leave no more history than
 * repl_backlog_size. In every case the backlog's first-byte offset is
 * recomputed before returning. */
void incrementalTrimReplicationBacklog(size_t max_blocks) {
    serverAssert(server.repl_backlog != NULL);

    size_t trimmed_blocks = 0;
    while (server.repl_backlog->histlen > server.repl_backlog_size &&
           trimmed_blocks < max_blocks)
    {
        /* We never trim backlog to less than one block. */
        if (listLength(server.repl_buffer_blocks) <= 1) break;

        /* Replicas increment the refcount of the first replication buffer block
         * they refer to, in that case, we don't trim the backlog even if
         * backlog_histlen exceeds backlog_size. This implicitly makes backlog
         * bigger than our setting, but makes the master accept partial resync as
         * much as possible. So that backlog must be the last reference of
         * replication buffer blocks. */
        listNode *first = listFirst(server.repl_buffer_blocks);
        serverAssert(first == server.repl_backlog->ref_repl_buf_node);
        replBufBlock *fo = listNodeValue(first);
        if (fo->refcount != 1) break;

        /* Don't trim if releasing the first block would leave the backlog
         * with no more valid history than the configured backlog size. */
        if (server.repl_backlog->histlen - (long long)fo->size <=
            server.repl_backlog_size) break;

        /* Drop the backlog's reference; the block itself is released below. */
        fo->refcount--;
        trimmed_blocks++;
        server.repl_backlog->histlen -= fo->size;

        /* Move the backlog's start reference to the next block node. */
        listNode *next = listNextNode(first);
        server.repl_backlog->ref_repl_buf_node = next;
        serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
        /* Incr reference count to keep the new head node. */
        ((replBufBlock *)listNodeValue(next))->refcount++;

        /* Remove the released block from the offset index (no-op for blocks
         * that were never indexed). */
        uint64_t encoded_offset = htonu64(fo->repl_offset);
        raxRemove(server.repl_backlog->blocks_index,
            (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);

        /* Delete the first node from global replication buffer. The memory
         * subtracted here mirrors what feedReplicationBuffer() added:
         * block payload + block header + list node. */
        serverAssert(fo->refcount == 0 && fo->used == fo->size);
        server.repl_buffer_mem -= (fo->size +
            sizeof(listNode) + sizeof(replBufBlock));
        listDelNode(server.repl_buffer_blocks, first);
    }

    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog->offset = server.master_repl_offset -
                              server.repl_backlog->histlen + 1;
}

/* Release the reference 'replica' holds on its starting replication buffer
 * block (if any), give the backlog a chance to trim blocks that may now be
 * unreferenced, and reset the replica's buffer position. */
void freeReplicaReferencedReplBuffer(client *replica) {
    if (replica->ref_repl_buf_node != NULL) {
        /* Decrease the start buffer node reference count. */
        replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
        serverAssert(o->refcount > 0);
        o->refcount--;
        incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
    }
    replica->ref_repl_buf_node = NULL;
    replica->ref_block_pos = 0;
}

/* Append bytes into the global replication buffer list, replication backlog and
 * all replica clients use replication buffers collectively, this function replace
 * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
 * First we add buffer into global replication buffer block list, and then
 * update replica / replication-backlog referenced node and block position.
 *
 * s/len: bytes to append. Does nothing when there is no backlog.
 * Every appended byte advances server.master_repl_offset and the backlog's
 * histlen. The data is copied in chunks: first into the spare room of the
 * tail block, then into newly allocated blocks, looping until len is 0. */
void feedReplicationBuffer(char *s, size_t len) {
    /* Monotonically increasing id assigned to each newly created block. */
    static long long repl_block_id = 0;

    if (server.repl_backlog == NULL) return;

    while(len > 0) {
        size_t start_pos = 0; /* The position of referenced block to start sending. */
        listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
        int add_new_block = 0; /* Create new block if current block is total used. */
        listNode *ln = listLast(server.repl_buffer_blocks);
        replBufBlock *tail = ln ? listNodeValue(ln) : NULL;

        /* Append to tail string when possible. */
        if (tail && tail->size > tail->used) {
            start_node = listLast(server.repl_buffer_blocks);
            start_pos = tail->used;
            /* Copy the part we can fit into the tail, and leave the rest for a
             * new node */
            size_t avail = tail->size - tail->used;
            size_t copy = (avail >= len) ? len : avail;
            memcpy(tail->buf + tail->used, s, copy);
            tail->used += copy;
            s += copy;
            len -= copy;
            server.master_repl_offset += copy;
            server.repl_backlog->histlen += copy;
        }
        if (len) {
            /* Create a new node, make sure it is allocated to at
             * least PROTO_REPLY_CHUNK_BYTES */
            size_t usable_size;
            /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
             * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
             * trim when we only still need to hold a small portion from them. */
            size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
            size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
            tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
            /* Take over the allocation's internal fragmentation */
            tail->size = usable_size - sizeof(replBufBlock);
            /* If len exceeds the block, the remainder is handled by the next
             * iteration of the outer loop. */
            size_t copy = (tail->size >= len) ? len : tail->size;
            tail->used = copy;
            tail->refcount = 0;
            tail->repl_offset = server.master_repl_offset + 1;
            tail->id = repl_block_id++;
            memcpy(tail->buf, s, copy);
            listAddNodeTail(server.repl_buffer_blocks, tail);
            /* We also count the list node memory into replication buffer memory. */
            server.repl_buffer_mem += (usable_size + sizeof(listNode));
            add_new_block = 1;
            if (start_node == NULL) {
                start_node = listLast(server.repl_buffer_blocks);
                start_pos = 0;
            }
            s += copy;
            len -= copy;
            server.master_repl_offset += copy;
            server.repl_backlog->histlen += copy;
        }

        /* For output buffer of replicas. */
        listIter li;
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (!canFeedReplicaReplBuffer(slave)) continue;

            /* Update shared replication buffer start position. */
            if (slave->ref_repl_buf_node == NULL) {
                slave->ref_repl_buf_node = start_node;
                slave->ref_block_pos = start_pos;
                /* Only increase the start block reference count. */
                ((replBufBlock *)listNodeValue(start_node))->refcount++;
            }

            /* Check output buffer limit only when add new block. */
            if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
        }

        /* For replication backlog */
        if (server.repl_backlog->ref_repl_buf_node == NULL) {
            server.repl_backlog->ref_repl_buf_node = start_node;
            /* Only increase the start block reference count. */
            ((replBufBlock *)listNodeValue(start_node))->refcount++;

            /* Replication buffer must be empty before adding replication stream
             * into replication backlog. */
            serverAssert(add_new_block == 1 && start_pos == 0);
        }
        if (add_new_block) {
            createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));

            /* It is important to trim after adding replication data to keep the backlog size close to
             * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
             * unnecessary trimming attempts when small amounts of data are added. See comments in
             * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
            incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
        }
    }
}

/* Propagate write commands to replication stream.
 *
 * This function is used if the instance is a master: we use the commands
 * received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use * replicationFeedStreamFromMasterStream() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int j, len; char llstr[LONG_STR_SIZE]; /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we * pass dbid=-1 that indicate there is no need to replicate `select` command. */ serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ if (server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ if (server.repl_backlog == NULL && listLength(slaves) == 0) { /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs * even when there's no replication active. This code will not be reached if AOF * is also disabled. */ server.master_repl_offset += 1; return; } /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); /* Must install write handler for all replicas first before feeding * replication stream. */ prepareReplicasToWrite(); /* Send SELECT command to every slave if needed. */ if (dictid != -1 && server.slaveseldb != dictid) { robj *selectcmd; /* For a few DBs we have pre-computed SELECT command. 
*/ if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { selectcmd = shared.select[dictid]; } else { int dictid_len; dictid_len = ll2string(llstr,sizeof(llstr),dictid); selectcmd = createObject(OBJ_STRING, sdscatprintf(sdsempty(), "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, llstr)); } feedReplicationBufferWithObject(selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); server.slaveseldb = dictid; } /* Write the command to the replication buffer if any. */ char aux[LONG_STR_SIZE+3]; /* Add the multi bulk reply length. */ aux[0] = '*'; len = ll2string(aux+1,sizeof(aux)-1,argc); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBuffer(aux,len+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); /* We need to feed the buffer with the object as a bulk reply * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBuffer(aux,len+3); feedReplicationBufferWithObject(argv[j]); feedReplicationBuffer(aux+len+1,2); } } /* This is a debugging function that gets called when we detect something * wrong with the replication protocol: the goal is to peek into the * replication backlog and show a few final bytes to make simpler to * guess what kind of bug it could be. */ void showLatestBacklog(void) { if (server.repl_backlog == NULL) return; if (listLength(server.repl_buffer_blocks) == 0) return; size_t dumplen = 256; if (server.repl_backlog->histlen < (long long)dumplen) dumplen = server.repl_backlog->histlen; sds dump = sdsempty(); listNode *node = listLast(server.repl_buffer_blocks); while(dumplen) { if (node == NULL) break; replBufBlock *o = listNodeValue(node); size_t thislen = o->used >= dumplen ? 
dumplen : o->used; sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); sds tmp = sdscatsds(head, dump); sdsfree(dump); dump = tmp; dumplen -= thislen; node = listPrevNode(node); } /* Finally log such bytes: this is vital debugging info to * understand what happened. */ serverLog(LL_NOTICE,"Latest backlog is: '%s'", dump); sdsfree(dump); } /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { printf("%zu:",buflen); for (size_t j = 0; j < buflen; j++) { printf("%c", isprint(buf[j]) ? buf[j] : '.'); } printf("\n"); } /* There must be replication backlog if having attached slaves. */ if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); if (server.repl_backlog) { /* Must install write handler for all replicas first before feeding * replication stream. */ prepareReplicasToWrite(); feedReplicationBuffer(buf,buflen); } } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { /* Fast path to return if the monitors list is empty or the server is in loading. 
*/ if (monitors == NULL || listLength(monitors) == 0 || server.loading) return; listNode *ln; listIter li; int j; sds cmdrepr = sdsnew("+"); robj *cmdobj; struct timeval tv; gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); if (c->flags & CLIENT_SCRIPT) { cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); } else if (c->flags & CLIENT_UNIX_SOCKET) { cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); } else { cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); } for (j = 0; j < argc; j++) { if (argv[j]->encoding == OBJ_ENCODING_INT) { cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); } else { cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, sdslen(argv[j]->ptr)); } if (j != argc-1) cmdrepr = sdscatlen(cmdrepr," ",1); } cmdrepr = sdscatlen(cmdrepr,"\r\n",2); cmdobj = createObject(OBJ_STRING,cmdrepr); listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = ln->value; addReply(monitor,cmdobj); updateClientMemUsageAndBucket(monitor); } decrRefCount(cmdobj); } /* Feed the slave 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { long long skip; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); if (server.repl_backlog->histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); return 0; } serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", server.repl_backlog_size); serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", server.repl_backlog->offset); serverLog(LL_DEBUG, "[PSYNC] History len: %lld", server.repl_backlog->histlen); /* Compute the amount of bytes we need to discard. */ skip = offset - server.repl_backlog->offset; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); /* Iterate recorded blocks, quickly search the approximate node. 
*/ listNode *node = NULL; if (raxSize(server.repl_backlog->blocks_index) > 0) { uint64_t encoded_offset = htonu64(offset); raxIterator ri; raxStart(&ri, server.repl_backlog->blocks_index); raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t)); if (raxEOF(&ri)) { /* No found, so search from the last recorded node. */ raxSeek(&ri, "$", NULL, 0); raxPrev(&ri); node = (listNode *)ri.data; } else { raxPrev(&ri); /* Skip the sought node. */ /* We should search from the prev node since the offset of current * sought node exceeds searching offset. */ if (raxPrev(&ri)) node = (listNode *)ri.data; else node = server.repl_backlog->ref_repl_buf_node; } raxStop(&ri); } else { /* No recorded blocks, just from the start node to search. */ node = server.repl_backlog->ref_repl_buf_node; } /* Search the exact node. */ while (node != NULL) { replBufBlock *o = listNodeValue(node); if (o->repl_offset + (long long)o->used >= offset) break; node = listNextNode(node); } serverAssert(node != NULL); /* Install a writer handler first.*/ prepareClientToWrite(c); /* Setting output buffer of the replica. */ replBufBlock *o = listNodeValue(node); o->refcount++; c->ref_repl_buf_node = node; c->ref_block_pos = offset - o->repl_offset; return server.repl_backlog->histlen - skip; } /* Return the offset to provide as reply to the PSYNC command received * from the slave. The returned value is only valid immediately after * the BGSAVE process started and before executing any other command * from clients. */ long long getPsyncInitialOffset(void) { return server.master_repl_offset; } /* Send a FULLRESYNC reply in the specific case of a full resynchronization, * as a side effect setup the slave for a full sync in different ways: * * 1) Remember, into the slave client structure, the replication offset * we sent here, so that if new slaves will later attach to the same * background RDB saving process (by duplicating this client output * buffer), we can get the right offset from this slave. 
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
 *    we start accumulating differences from this point.
 * 3) Force the replication stream to re-emit a SELECT statement so
 *    the new slave incremental differences will start selecting the
 *    right database number.
 *
 * Normally this function should be called immediately after a successful
 * BGSAVE for replication was started, or when there is one already in
 * progress that we attached our slave to.
 *
 * Returns C_OK, or C_ERR if the reply could not be written in full (the
 * client is then scheduled for asynchronous release). */
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

    /* This replica will accumulate incremental changes too; resetting
     * slaveseldb to -1 makes the next command propagated with a database
     * id emit a SELECT first. */
    server.slaveseldb = -1;

    /* Replicas that approached us with the legacy SYNC command don't
     * expect this reply. */
    if (slave->flags & CLIENT_PRE_PSYNC) return C_OK;

    char reply[128];
    int replylen = snprintf(reply, sizeof(reply), "+FULLRESYNC %s %lld\r\n",
                            server.replid, offset);
    if (connWrite(slave->conn, reply, replylen) == replylen) return C_OK;

    freeClientAsync(slave);
    return C_ERR;
}

/* This function handles the PSYNC command from the point of view of a
 * master receiving a request for partial resynchronization.
 *
 * On success return C_OK, otherwise C_ERR is returned and we proceed
 * with the usual full resync. Note that when a full resync is needed we
 * cannot reply to PSYNC here: the reply must include the master offset at
 * the time the RDB file to transfer is generated, so it is delayed until
 * then.
 *
 * Note also that C_OK is returned when the +CONTINUE reply could not be
 * written: the client is scheduled for release and no full resync is
 * attempted. */
int masterTryPartialResynchronization(client *c, long long psync_offset) {
    char *master_replid = c->argv[1]->ptr;

    /* Two replication IDs may be valid: ID1 without restriction, and ID2
     * only up to server.second_replid_offset. */
    int same_as_id1 = strcasecmp(master_replid, server.replid) == 0;
    int same_as_id2 = strcasecmp(master_replid, server.replid2) == 0;
    int history_ok = same_as_id1 ||
        (same_as_id2 && psync_offset <= server.second_replid_offset);

    if (!history_ok) {
        if (master_replid[0] == '?') {
            /* Replid "?" is used by slaves that want to force a full resync. */
            serverLog(LL_NOTICE,"Full resync requested by replica %s",
                replicationGetSlaveName(c));
        } else if (!same_as_id1 && !same_as_id2) {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Replication ID mismatch (Replica asked for '%s', my "
                "replication IDs are '%s' and '%s')",
                master_replid, server.replid, server.replid2);
        } else {
            serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                "Requested offset for second ID was %lld, but I can reply "
                "up to %lld", psync_offset, server.second_replid_offset);
        }
        return C_ERR;
    }

    /* The requested offset must still be covered by the backlog. */
    replBacklog *bl = server.repl_backlog;
    int backlog_covers = bl != NULL &&
        psync_offset >= bl->offset &&
        psync_offset <= (bl->offset + bl->histlen);

    if (!backlog_covers) {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        return C_ERR;
    }

    /* Partial resync accepted: promote the client to an online replica. */
    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_start_cmd_stream_on_ack = 0;
    listAddNodeTail(server.slaves,c);

    /* The client's output buffers are already accumulating new commands, so
     * the +CONTINUE reply is written straight to the socket, whose send
     * buffer is expected to be empty at this point. */
    char reply[128];
    int replylen = (c->slave_capa & SLAVE_CAPA_PSYNC2) ?
        snprintf(reply,sizeof(reply),"+CONTINUE %s\r\n", server.replid) :
        snprintf(reply,sizeof(reply),"+CONTINUE\r\n");

    if (connWrite(c->conn,reply,replylen) != replylen) {
        freeClientAsync(c);
        return C_OK;
    }

    long long sent = addReplyReplicationBacklog(c,psync_offset);
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
            replicationGetSlaveName(c),
            sent, psync_offset);

    /* No need to reset server.slaveseldb: the replica already has the
     * selected DB from its previous connection with the master. */
    refreshGoodSlavesCount();

    /* Fire the replica change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                          NULL);
    return C_OK;
}

/* Start a BGSAVE for replication goals, which is, selecting the disk or
 * socket target depending on the configuration, and making sure that
 * the script cache is flushed before to start.
 *
 * The mincapa argument is the bitwise AND among all the slaves capabilities
 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
 *
 * Side effects, other than starting a BGSAVE:
 *
 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
 *    sync if the BGSAVE was successfully started, or sending them an error
 *    and dropping them from the list of slaves.
* 2) Flush the Lua scripting script cache if the BGSAVE was actually
 *    started.
 *
 * NOTE(review): nothing in this function flushes a script cache; item 2
 * (and the "script cache is flushed" remark above) look stale — confirm
 * and update.
 *
 * req: the SLAVE_REQ_* requirements (e.g. a filtered RDB) shared by the
 * replicas this BGSAVE is for; only WAIT_BGSAVE_START replicas whose
 * slave_req equals req are set up for a disk-target full resync.
 *
 * Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa, int req) {
    int retval;
    int socket_target = 0;
    listIter li;
    listNode *ln;

    /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs.
     * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication
     * to avoid overwriting the snapshot RDB file with filtered data. */
    socket_target = (server.repl_diskless_sync || req & SLAVE_REQ_RDB_MASK) && (mincapa & SLAVE_CAPA_EOF);
    /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
    serverAssert(socket_target || !(req & SLAVE_REQ_RDB_MASK));

    serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
        socket_target ? "replicas sockets" : "disk");

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    /* Only do rdbSave* when rsiptr is not NULL,
     * otherwise slave will miss repl-stream-db. */
    if (rsiptr) {
        if (socket_target)
            retval = rdbSaveToSlavesSockets(req,rsiptr);
        else {
            /* Keep the page cache since it'll get used soon */
            retval = rdbSaveBackground(req,server.rdb_filename,rsiptr,RDBFLAGS_KEEP_CACHE);
        }
    } else {
        serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
        retval = C_ERR;
    }

    /* If we succeeded to start a BGSAVE with disk target, let's remember
     * this fact, so that we can later delete the file if needed. Note
     * that we don't set the flag to 1 if the feature is disabled, otherwise
     * it would never be cleared: the file is not deleted. This way if
     * the user enables it later with CONFIG SET, we are fine. */
    if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
        RDBGeneratedByReplication = 1;

    /* If we failed to BGSAVE, remove the slaves waiting for a full
     * resynchronization from the list of slaves, inform them with
     * an error about what happened, close the connection ASAP. */
    if (retval == C_ERR) {
        serverLog(LL_WARNING,"BGSAVE for replication failed");
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                /* Deleting the node just returned by listNext() while
                 * iterating relies on the adlist iterator contract, which
                 * allows removing the current element. */
                listDelNode(server.slaves,ln);
                addReplyError(slave,
                    "BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    /* If the target is socket, rdbSaveToSlavesSockets() already setup
     * the slaves for a full resync. Otherwise for disk target do it now.*/
    if (!socket_target) {
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                /* Check slave has the exact requirements */
                if (slave->slave_req != req)
                    continue;
                replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
            }
        }
    }

    return retval;
}

/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    /* Check if this is a failover request to a replica with the same replid and
     * become a master if so. */
    if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
        !strcasecmp(c->argv[3]->ptr,"failover"))
    {
        serverLog(LL_NOTICE, "Failover request received for replid %s.",
            (unsigned char *)c->argv[1]->ptr);
        if (!server.masterhost) {
            addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
            return;
        }

        if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,
                "MASTER MODE enabled (failover request from '%s')",client);
            sdsfree(client);
        } else {
            addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
            return;
        }
    }

    /* Don't let replicas sync with us while we're failing over */
    if (server.failover_state != NO_FAILOVER) {
        addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");
        return;
    }

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (clientHasPendingReplies(c)) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
     * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
     * use of a socket is handled, if needed, in `startBgsaveForReplication`.
*/ if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) { addReplyError(c,"Filtered replica requires EOF capability"); return; } serverLog(LL_NOTICE,"Replica %s asks for synchronization", replicationGetSlaveName(c)); /* Try a partial resynchronization if this is a PSYNC command. * If it fails, we continue with usual full resynchronization, however * when this happens replicationSetupSlaveForFullResync will replied * with: * * +FULLRESYNC * * So the slave knows the new replid and offset to try a PSYNC later * if the connection with the master is lost. */ if (!strcasecmp(c->argv[0]->ptr,"psync")) { long long psync_offset; if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) { serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset", replicationGetSlaveName(c)); return; } if (masterTryPartialResynchronization(c, psync_offset) == C_OK) { server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { char *master_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the * replid is not "?", as this is used by slaves to force a full * resync on purpose when they are not able to partially * resync. */ if (master_replid[0] != '?') server.stat_sync_partial_err++; } } else { /* If a slave uses SYNC, we are dealing with an old implementation * of the replication protocol (like redis-cli --slave). Flag the client * so that we don't expect to receive REPLCONF ACK feedbacks. */ c->flags |= CLIENT_PRE_PSYNC; } /* Full resynchronization. */ server.stat_sync_full++; /* Setup the slave as one waiting for BGSAVE to start. The following code * paths will change the state if we handle the slave differently. */ c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. 
*/ c->repldbfd = -1; c->flags |= CLIENT_SLAVE; listAddNodeTail(server.slaves,c); /* Create the replication backlog if needed. */ if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { /* When we create the backlog from scratch, we always use a new * replication ID and clear the ID2, since there is no valid * past history. */ changeReplicationId(); clearReplicationId2(); createReplicationBacklog(); serverLog(LL_NOTICE,"Replication backlog created, my new " "replication IDs are '%s' and '%s'", server.replid, server.replid2); } /* CASE 1: BGSAVE is in progress, with disk target. */ if (server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save. */ client *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; /* If the client needs a buffer of commands, we can't use * a replica without replication buffer. */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && (!(slave->flags & CLIENT_REPL_RDBONLY) || (c->flags & CLIENT_REPL_RDBONLY))) break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE * and its exact requirements. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) && c->slave_req == slave->slave_req) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. * We don't copy buffer if clients don't want. */ if (!(c->flags & CLIENT_REPL_RDBONLY)) copyReplicaOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences. 
*/ serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); } /* CASE 2: BGSAVE is in progress, with socket target. */ } else if (server.child_type == CHILD_TYPE_RDB && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); /* CASE 3: There is no BGSAVE is in progress. */ } else { if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) && server.repl_diskless_sync_delay) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a * few seconds to wait for more slaves to arrive. */ serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); } else { /* We don't have a BGSAVE in progress, let's start one. Diskless * or disk-based mode is determined by replica's capacity. */ if (!hasActiveChildProcess()) { startBgsaveForReplication(c->slave_capa, c->slave_req); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); } } } return; } /* REPLCONF