/*-------------------------------------------------------------------------
 *
 * origin.c
 *    Logical replication progress tracking support.
 *
 * Copyright (c) 2013-2021, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/origin.c
 *
 * NOTES
 *
 * This file provides the following:
 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an efficient
 *   and durable manner.
 *
 * A replication origin consists of a descriptive, user-defined, external
 * name and a short, thus space-efficient, internal 2-byte id. This split
 * exists because replication origins have to be stored in WAL and in shared
 * memory, where long descriptors would be inefficient. For now we use only
 * 2 bytes for the internal id of a replication origin, as it seems unlikely
 * that there will soon be more than 65k nodes in one replication setup;
 * using only two bytes allows us to be more space efficient.
 *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id, because that
 * allows us to advance replication progress during crash recovery. To allow
 * doing so we store the original LSN (from the originating system) of a
 * transaction in the commit record. That allows us to recover the precise
 * replayed state after crash recovery, without requiring synchronous
 * commits. Allowing logical replication to use asynchronous commit is
 * generally good for performance, but especially important because it
 * allows a single-threaded replay process to keep up with a source that has
 * multiple backends generating changes concurrently. For efficiency and
 * simplicity, a backend can set up one replication origin that is, from
 * then on, used as the source of changes produced by the backend, until
 * reset again.
 *
 * This infrastructure is intended to be used in cooperation with logical
 * decoding. When replaying from a remote system the configured origin is
 * provided to output plugins, allowing prevention of replication loops and
 * other filtering.
 *
 * There are several levels of locking at work:
 *
 * * To create and drop replication origins an exclusive lock on
 *   pg_replication_origin is required for the duration. That allows us to
 *   safely and conflict-free assign new origins using a dirty snapshot.
 *
 * * When creating an in-memory replication progress slot the
 *   ReplicationOrigin LWLock has to be held exclusively; when iterating
 *   over the replication progress a shared lock has to be held, and the
 *   same applies when advancing the replication progress of an origin that
 *   has not been set up as the session's replication origin.
 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of
 *   a replication progress slot, that slot's lwlock has to be held. That's
 *   primarily because we do not assume that 8-byte writes (the LSN) are
 *   atomic on all our platforms, but it also simplifies memory ordering
 *   concerns between the remote and local lsn. We use an lwlock instead of
 *   a spinlock so it's less harmful to hold the lock over a WAL write
 *   (cf. replorigin_advance).
 *
 * ---------------------------------------------------------------------------
 */
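
/*
 * Illustrative sketch (editorial addition, not part of the upstream logic):
 * a typical apply process is expected to use the session-level API defined
 * below roughly as follows. The function and variable names are the real
 * ones from this file and origin.h; the surrounding control flow is a
 * simplified assumption, not a verbatim copy of any particular caller.
 *
 *		RepOriginId originid = replorigin_by_name("my_origin", false);
 *
 *		replorigin_session_setup(originid);
 *		replorigin_session_origin = originid;
 *
 *		StartTransactionCommand();
 *		... apply the remote transaction's changes ...
 *		replorigin_session_origin_lsn = remote_commit_lsn;
 *		replorigin_session_origin_timestamp = remote_commit_time;
 *		CommitTransactionCommand();
 *
 * Setting replorigin_session_origin_lsn before committing makes the local
 * commit record carry the remote LSN, which is what lets crash recovery
 * restore the replication progress (see replorigin_advance() and xact.c).
 */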
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

/*
 * Replay progress of a single remote node.
 */
typedef struct ReplicationState
{
    /*
     * Local identifier for the remote node.
     */
    RepOriginId roident;

    /*
     * Location of the latest commit from the remote side.
     */
    XLogRecPtr  remote_lsn;

    /*
     * Remember the local lsn of the commit record so we can XLogFlush() to
     * it during a checkpoint, ensuring the commit record actually is safe on
     * disk.
     */
    XLogRecPtr  local_lsn;

    /*
     * PID of the backend that has acquired the slot, or 0 if none.
     */
    int         acquired_by;

    /*
     * Condition variable that's signaled when acquired_by changes.
     */
    ConditionVariable origin_cv;

    /*
     * Lock protecting remote_lsn and local_lsn.
     */
    LWLock      lock;
} ReplicationState;

/*
 * On-disk version of ReplicationState.
 */
typedef struct ReplicationStateOnDisk
{
    RepOriginId roident;
    XLogRecPtr  remote_lsn;
} ReplicationStateOnDisk;


typedef struct ReplicationStateCtl
{
    /* Tranche to use for per-origin LWLocks */
    int         tranche_id;
    /* Array of length max_replication_slots */
    ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;

/* external variables */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.
 */
static ReplicationState *session_replication_state = NULL;

/* Magic for on-disk files. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)

static void
replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
{
    if (check_slots && max_replication_slots == 0)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));

    if (!recoveryOK && RecoveryInProgress())
        ereport(ERROR,
                (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
                 errmsg("cannot manipulate replication origins during recovery")));
}


/* ---------------------------------------------------------------------------
 * Functions for working with replication origins themselves.
 * ---------------------------------------------------------------------------
 */

/*
 * Check for a persistent replication origin identified by name.
 *
 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
 */
RepOriginId
replorigin_by_name(const char *roname, bool missing_ok)
{
    Form_pg_replication_origin ident;
    Oid         roident = InvalidOid;
    HeapTuple   tuple;
    Datum       roname_d;

    roname_d = CStringGetTextDatum(roname);

    tuple = SearchSysCache1(REPLORIGNAME, roname_d);
    if (HeapTupleIsValid(tuple))
    {
        ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
        roident = ident->roident;
        ReleaseSysCache(tuple);
    }
    else if (!missing_ok)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("replication origin \"%s\" does not exist",
                        roname)));

    return roident;
}

/*
 * Create a replication origin.
 *
 * Needs to be called in a transaction.
 */
RepOriginId
replorigin_create(const char *roname)
{
    Oid         roident;
    HeapTuple   tuple = NULL;
    Relation    rel;
    Datum       roname_d;
    SnapshotData SnapshotDirty;
    SysScanDesc scan;
    ScanKeyData key;

    roname_d = CStringGetTextDatum(roname);

    Assert(IsTransactionState());

    /*
     * We need the numeric replication origin to be 16 bits wide, so we
     * cannot rely on the normal oid allocation. Instead we simply scan
     * pg_replication_origin for the first unused id. That's not particularly
     * efficient, but this should be a fairly infrequent operation - we can
     * easily spend a bit more code on this when it turns out it needs to be
     * faster.
     *
     * We handle concurrency by taking an exclusive lock (allowing reads!)
     * over the table for the duration of the search. Because we use a "dirty
     * snapshot" we can read rows that other in-progress sessions have
     * written, even though they would be invisible with normal snapshots.
     * Due to the exclusive lock there's no danger that new rows can appear
     * while we're checking.
     */
    InitDirtySnapshot(SnapshotDirty);

    rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

    for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
    {
        bool        nulls[Natts_pg_replication_origin];
        Datum       values[Natts_pg_replication_origin];
        bool        collides;

        CHECK_FOR_INTERRUPTS();

        ScanKeyInit(&key,
                    Anum_pg_replication_origin_roident,
                    BTEqualStrategyNumber, F_OIDEQ,
                    ObjectIdGetDatum(roident));

        scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
                                  true /* indexOK */ ,
                                  &SnapshotDirty,
                                  1, &key);

        collides = HeapTupleIsValid(systable_getnext(scan));

        systable_endscan(scan);

        if (!collides)
        {
            /*
             * Ok, found an unused roident, insert the new row and do a CCI,
             * so our callers can look it up if they want to.
             */
            memset(&nulls, 0, sizeof(nulls));

            values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
            values[Anum_pg_replication_origin_roname - 1] = roname_d;

            tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
            CatalogTupleInsert(rel, tuple);
            CommandCounterIncrement();
            break;
        }
    }

    /* now release lock again */
    table_close(rel, ExclusiveLock);

    if (tuple == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("could not find free replication origin OID")));

    heap_freetuple(tuple);
    return roident;
}
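
/*
 * Worked example (editorial addition): on a fresh cluster the loop above
 * assigns internal id 1 to the first origin created, 2 to the second, and
 * so on. If an origin is dropped, its id becomes free again and may be
 * reused by a later replorigin_create() call, so the internal id is not
 * stable across a drop and re-create of the same name.
 */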
/*
 * Helper function to drop a replication origin.
 */
static void
replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
{
    HeapTuple   tuple;
    int         i;

    /*
     * First, clean up the slot state info, if there is any matching slot.
     */
restart:
    tuple = NULL;
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationState *state = &replication_states[i];

        if (state->roident == roident)
        {
            /* found our slot, is it busy? */
            if (state->acquired_by != 0)
            {
                ConditionVariable *cv;

                if (nowait)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_IN_USE),
                             errmsg("could not drop replication origin with OID %d, in use by PID %d",
                                    state->roident,
                                    state->acquired_by)));

                /*
                 * We must wait and then retry.  Since we don't know which CV
                 * to wait on until here, we can't readily use
                 * ConditionVariablePrepareToSleep (calling it here would be
                 * wrong, since we could miss the signal if we did so); just
                 * use ConditionVariableSleep directly.
                 */
                cv = &state->origin_cv;

                LWLockRelease(ReplicationOriginLock);

                ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);

                goto restart;
            }

            /* first make a WAL log entry */
            {
                xl_replorigin_drop xlrec;

                xlrec.node_id = roident;
                XLogBeginInsert();
                XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
                XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
            }

            /* then clear the in-memory slot */
            state->roident = InvalidRepOriginId;
            state->remote_lsn = InvalidXLogRecPtr;
            state->local_lsn = InvalidXLogRecPtr;
            break;
        }
    }
    LWLockRelease(ReplicationOriginLock);
    ConditionVariableCancelSleep();

    /*
     * Now, we can delete the catalog entry.
     */
    tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
    if (!HeapTupleIsValid(tuple))
        elog(ERROR, "cache lookup failed for replication origin with oid %u",
             roident);

    CatalogTupleDelete(rel, &tuple->t_self);
    ReleaseSysCache(tuple);

    CommandCounterIncrement();
}

/*
 * Drop replication origin (by name).
 *
 * Needs to be called in a transaction.
 */
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
    RepOriginId roident;
    Relation    rel;

    Assert(IsTransactionState());

    /*
     * To interlock against concurrent drops, we hold ExclusiveLock on
     * pg_replication_origin till xact commit.
     *
     * XXX We can optimize this by acquiring the lock on a specific origin by
     * using LockSharedObject if required. However, for that, we first need
     * to acquire a lock on ReplicationOriginRelationId, get the origin_id,
     * lock the specific origin and then re-check if the origin still exists.
     */
    rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

    roident = replorigin_by_name(name, missing_ok);

    if (OidIsValid(roident))
        replorigin_drop_guts(rel, roident, nowait);

    /* We keep the lock on pg_replication_origin until commit */
    table_close(rel, NoLock);
}

/*
 * Lookup replication origin via its oid and return the name.
 *
 * The external name is palloc'd in the calling context.
 *
 * Returns true if the origin is known, false otherwise.
 */
bool
replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
{
    HeapTuple   tuple;
    Form_pg_replication_origin ric;

    Assert(OidIsValid((Oid) roident));
    Assert(roident != InvalidRepOriginId);
    Assert(roident != DoNotReplicateId);

    tuple = SearchSysCache1(REPLORIGIDENT,
                            ObjectIdGetDatum((Oid) roident));

    if (HeapTupleIsValid(tuple))
    {
        ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
        *roname = text_to_cstring(&ric->roname);
        ReleaseSysCache(tuple);

        return true;
    }
    else
    {
        *roname = NULL;

        if (!missing_ok)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_OBJECT),
                     errmsg("replication origin with OID %u does not exist",
                            roident)));

        return false;
    }
}


/* ---------------------------------------------------------------------------
 * Functions for handling replication progress.
 * ---------------------------------------------------------------------------
 */

Size
ReplicationOriginShmemSize(void)
{
    Size        size = 0;

    /*
     * XXX: max_replication_slots is arguably the wrong thing to use, as here
     * we keep the replay state of *remote* transactions.
     * But for now it seems sufficient to reuse it, rather than introduce a
     * separate GUC.
     */
    if (max_replication_slots == 0)
        return size;

    size = add_size(size, offsetof(ReplicationStateCtl, states));
    size = add_size(size,
                    mul_size(max_replication_slots, sizeof(ReplicationState)));

    return size;
}

void
ReplicationOriginShmemInit(void)
{
    bool        found;

    if (max_replication_slots == 0)
        return;

    replication_states_ctl = (ReplicationStateCtl *)
        ShmemInitStruct("ReplicationOriginState",
                        ReplicationOriginShmemSize(),
                        &found);
    replication_states = replication_states_ctl->states;

    if (!found)
    {
        int         i;

        MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());

        replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;

        for (i = 0; i < max_replication_slots; i++)
        {
            LWLockInitialize(&replication_states[i].lock,
                             replication_states_ctl->tranche_id);
            ConditionVariableInit(&replication_states[i].origin_cv);
        }
    }
}
/* ---------------------------------------------------------------------------
 * Perform a checkpoint of each replication origin's progress with respect to
 * the replayed remote_lsn. Make sure that all transactions we refer to in the
 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
 * if the transactions were originally committed asynchronously.
 *
 * We store checkpoints in the following format:
 * +-------+------------------------+------------------+-----+--------+
 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
 * +-------+------------------------+------------------+-----+--------+
 *
 * So it's just the magic, followed by the statically-sized
 * ReplicationStateOnDisk structs. Note that the maximum number of
 * ReplicationState entries is determined by max_replication_slots.
 * ---------------------------------------------------------------------------
 */
void
CheckPointReplicationOrigin(void)
{
    const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
    const char *path = "pg_logical/replorigin_checkpoint";
    int         tmpfd;
    int         i;
    uint32      magic = REPLICATION_STATE_MAGIC;
    pg_crc32c   crc;

    if (max_replication_slots == 0)
        return;

    INIT_CRC32C(crc);

    /* make sure no old temp file is remaining */
    if (unlink(tmppath) < 0 && errno != ENOENT)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not remove file \"%s\": %m",
                        tmppath)));

    /*
     * no other backend can perform this at the same time; only one
     * checkpoint can happen at a time.
     */
    tmpfd = OpenTransientFile(tmppath,
                              O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    if (tmpfd < 0)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not create file \"%s\": %m",
                        tmppath)));

    /* write magic */
    errno = 0;
    if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
    {
        /* if write didn't set errno, assume problem is no disk space */
        if (errno == 0)
            errno = ENOSPC;
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not write to file \"%s\": %m",
                        tmppath)));
    }
    COMP_CRC32C(crc, &magic, sizeof(magic));

    /* prevent concurrent creations/drops */
    LWLockAcquire(ReplicationOriginLock, LW_SHARED);

    /* write actual data */
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationStateOnDisk disk_state;
        ReplicationState *curstate = &replication_states[i];
        XLogRecPtr  local_lsn;

        if (curstate->roident == InvalidRepOriginId)
            continue;

        /* zero, to avoid uninitialized padding bytes */
        memset(&disk_state, 0, sizeof(disk_state));

        LWLockAcquire(&curstate->lock, LW_SHARED);

        disk_state.roident = curstate->roident;

        disk_state.remote_lsn = curstate->remote_lsn;
        local_lsn = curstate->local_lsn;

        LWLockRelease(&curstate->lock);

        /* make sure we only write out a commit that's persistent */
        XLogFlush(local_lsn);

        errno = 0;
        if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
            sizeof(disk_state))
        {
            /* if write didn't set errno, assume problem is no disk space */
            if (errno == 0)
                errno = ENOSPC;
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not write to file \"%s\": %m",
                            tmppath)));
        }

        COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
    }

    LWLockRelease(ReplicationOriginLock);

    /* write out the CRC */
    FIN_CRC32C(crc);
    errno = 0;
    if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
    {
        /* if write didn't set errno, assume problem is no disk space */
        if (errno == 0)
            errno = ENOSPC;
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not write to file \"%s\": %m",
                        tmppath)));
    }

    if (CloseTransientFile(tmpfd) != 0)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not close file \"%s\": %m",
                        tmppath)));

    /* fsync, rename to permanent file, fsync file and directory */
    durable_rename(tmppath, path, PANIC);
}
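
/*
 * Editorial note with a concrete example of the checkpoint file layout
 * produced above (sizes assume a typical 64-bit platform where
 * sizeof(ReplicationStateOnDisk) is 16 bytes due to alignment padding; the
 * exact numbers are an assumption, not something this file guarantees):
 *
 *		offset 0:  uint32 magic (0x1257DADE)                      4 bytes
 *		offset 4:  ReplicationStateOnDisk for first used slot    16 bytes
 *		...        one entry per in-use slot, in slot order
 *		last 4:    pg_crc32c over everything before it            4 bytes
 *
 * Only slots with a valid roident are written, so the file length varies
 * with the number of origins in use, not with max_replication_slots.
 */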
/*
 * Recover replication replay status from checkpoint data saved earlier by
 * CheckPointReplicationOrigin.
 *
 * This only needs to be called at startup and *not* during every checkpoint
 * read during recovery (e.g. in HS or PITR from a base backup) afterwards.
 * All state thereafter can be recovered by looking at commit records.
 */
void
StartupReplicationOrigin(void)
{
    const char *path = "pg_logical/replorigin_checkpoint";
    int         fd;
    int         readBytes;
    uint32      magic = REPLICATION_STATE_MAGIC;
    int         last_state = 0;
    pg_crc32c   file_crc;
    pg_crc32c   crc;

    /* don't want to overwrite already existing state */
#ifdef USE_ASSERT_CHECKING
    static bool already_started = false;

    Assert(!already_started);
    already_started = true;
#endif

    if (max_replication_slots == 0)
        return;

    INIT_CRC32C(crc);

    elog(DEBUG2, "starting up replication origin progress state");

    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);

    /*
     * might have had max_replication_slots == 0 last run, or we just brought
     * up a standby.
     */
    if (fd < 0 && errno == ENOENT)
        return;
    else if (fd < 0)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not open file \"%s\": %m",
                        path)));

    /* verify magic, which is written even if nothing was active */
    readBytes = read(fd, &magic, sizeof(magic));
    if (readBytes != sizeof(magic))
    {
        if (readBytes < 0)
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not read file \"%s\": %m",
                            path)));
        else
            ereport(PANIC,
                    (errcode(ERRCODE_DATA_CORRUPTED),
                     errmsg("could not read file \"%s\": read %d of %zu",
                            path, readBytes, sizeof(magic))));
    }
    COMP_CRC32C(crc, &magic, sizeof(magic));

    if (magic != REPLICATION_STATE_MAGIC)
        ereport(PANIC,
                (errmsg("replication checkpoint has wrong magic %u instead of %u",
                        magic, REPLICATION_STATE_MAGIC)));

    /* we can skip locking here, no other access is possible */

    /* recover individual states, until there are no more to be found */
    while (true)
    {
        ReplicationStateOnDisk disk_state;

        readBytes = read(fd, &disk_state, sizeof(disk_state));

        /* no further data */
        if (readBytes == sizeof(crc))
        {
            /* not pretty, but simple ... */
            file_crc = *(pg_crc32c *) &disk_state;
            break;
        }

        if (readBytes < 0)
        {
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not read file \"%s\": %m",
                            path)));
        }

        if (readBytes != sizeof(disk_state))
        {
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not read file \"%s\": read %d of %zu",
                            path, readBytes, sizeof(disk_state))));
        }

        COMP_CRC32C(crc, &disk_state, sizeof(disk_state));

        if (last_state == max_replication_slots)
            ereport(PANIC,
                    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                     errmsg("could not find free replication state, increase max_replication_slots")));

        /* copy data to shared memory */
        replication_states[last_state].roident = disk_state.roident;
        replication_states[last_state].remote_lsn = disk_state.remote_lsn;
        last_state++;

        ereport(LOG,
                (errmsg("recovered replication state of node %u to %X/%X",
                        disk_state.roident,
                        LSN_FORMAT_ARGS(disk_state.remote_lsn))));
    }

    /* now check checksum */
    FIN_CRC32C(crc);
    if (file_crc != crc)
        ereport(PANIC,
                (errcode(ERRCODE_DATA_CORRUPTED),
                 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
                        crc, file_crc)));

    if (CloseTransientFile(fd) != 0)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not close file \"%s\": %m",
                        path)));
}

void
replorigin_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (info)
    {
        case XLOG_REPLORIGIN_SET:
            {
                xl_replorigin_set *xlrec =
                    (xl_replorigin_set *) XLogRecGetData(record);

                replorigin_advance(xlrec->node_id,
                                   xlrec->remote_lsn, record->EndRecPtr,
                                   xlrec->force /* backward */ ,
                                   false /* WAL log */ );
                break;
            }
        case XLOG_REPLORIGIN_DROP:
            {
                xl_replorigin_drop *xlrec;
                int         i;

                xlrec = (xl_replorigin_drop *) XLogRecGetData(record);

                for (i = 0; i < max_replication_slots; i++)
                {
                    ReplicationState *state = &replication_states[i];

                    /* found our slot */
                    if (state->roident == xlrec->node_id)
                    {
                        /* reset entry */
                        state->roident = InvalidRepOriginId;
                        state->remote_lsn = InvalidXLogRecPtr;
                        state->local_lsn = InvalidXLogRecPtr;
                        break;
                    }
                }
                break;
            }
        default:
            elog(PANIC, "replorigin_redo: unknown op code %u", info);
    }
}


/*
 * Tell the replication origin progress machinery that a commit from 'node'
 * that originated at the LSN remote_commit on the remote node was replayed
 * successfully and that we don't need to do so again.
 * In combination with setting up replorigin_session_origin_lsn and
 * replorigin_session_origin, that ensures we won't lose knowledge about that
 * after a crash if the transaction had a persistent effect (think of
 * asynchronous commits).
 *
 * local_commit needs to be a local LSN of the commit so that we can make
 * sure upon a checkpoint that enough WAL has been persisted to disk.
 *
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
 * unless running in recovery.
 */
void
replorigin_advance(RepOriginId node,
                   XLogRecPtr remote_commit, XLogRecPtr local_commit,
                   bool go_backward, bool wal_log)
{
    int         i;
    ReplicationState *replication_state = NULL;
    ReplicationState *free_state = NULL;

    Assert(node != InvalidRepOriginId);

    /* we don't track DoNotReplicateId */
    if (node == DoNotReplicateId)
        return;

    /*
     * XXX: For the case where this is called by WAL replay, it'd be more
     * efficient to restore into a backend-local hashtable and only dump into
     * shmem after recovery is finished. Let's hold off on implementing that
     * until it's shown to be a measurable expense.
     */

    /* Lock exclusively, as we may have to create a new table entry. */
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    /*
     * Search for either an existing slot for the origin, or a free one we
     * can use.
     */
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationState *curstate = &replication_states[i];

        /* remember where to insert if necessary */
        if (curstate->roident == InvalidRepOriginId &&
            free_state == NULL)
        {
            free_state = curstate;
            continue;
        }

        /* not our slot */
        if (curstate->roident != node)
        {
            continue;
        }

        /* ok, found slot */
        replication_state = curstate;

        LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);

        /* Make sure it's not used by somebody else */
        if (replication_state->acquired_by != 0)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication origin with OID %d is already active for PID %d",
                            replication_state->roident,
                            replication_state->acquired_by)));
        }

        break;
    }

    if (replication_state == NULL && free_state == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("could not find free replication state slot for replication origin with OID %u",
                        node),
                 errhint("Increase max_replication_slots and try again.")));

    if (replication_state == NULL)
    {
        /* initialize new slot */
        LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
        replication_state = free_state;
        Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
        Assert(replication_state->local_lsn == InvalidXLogRecPtr);
        replication_state->roident = node;
    }

    Assert(replication_state->roident != InvalidRepOriginId);

    /*
     * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     * and the standby gets the message. Primarily this will be called during
     * WAL replay (of commit records) where no WAL logging is necessary.
     */
    if (wal_log)
    {
        xl_replorigin_set xlrec;

        xlrec.remote_lsn = remote_commit;
        xlrec.node_id = node;
        xlrec.force = go_backward;

        XLogBeginInsert();
        XLogRegisterData((char *) (&xlrec), sizeof(xlrec));

        XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    }

    /*
     * Due to - harmless - race conditions during a checkpoint we could see
     * values here that are older than the ones we already have in memory.
     * Don't overwrite those.
     */
    if (go_backward || replication_state->remote_lsn < remote_commit)
        replication_state->remote_lsn = remote_commit;
    if (local_commit != InvalidXLogRecPtr &&
        (go_backward || replication_state->local_lsn < local_commit))
        replication_state->local_lsn = local_commit;
    LWLockRelease(&replication_state->lock);

    /*
     * Release *after* changing the LSNs; the slot isn't acquired and could
     * otherwise be dropped at any time.
     */
    LWLockRelease(ReplicationOriginLock);
}

XLogRecPtr
replorigin_get_progress(RepOriginId node, bool flush)
{
    int         i;
    XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    XLogRecPtr  remote_lsn = InvalidXLogRecPtr;

    /* prevent slots from being concurrently dropped */
    LWLockAcquire(ReplicationOriginLock, LW_SHARED);

    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationState *state;

        state = &replication_states[i];

        if (state->roident == node)
        {
            LWLockAcquire(&state->lock, LW_SHARED);

            remote_lsn = state->remote_lsn;
            local_lsn = state->local_lsn;

            LWLockRelease(&state->lock);

            break;
        }
    }

    LWLockRelease(ReplicationOriginLock);

    if (flush && local_lsn != InvalidXLogRecPtr)
        XLogFlush(local_lsn);

    return remote_lsn;
}

/*
 * Tear down a (possibly) configured session replication origin during
 * process exit.
 */
static void
ReplicationOriginExitCleanup(int code, Datum arg)
{
    ConditionVariable *cv = NULL;

    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    if (session_replication_state != NULL &&
        session_replication_state->acquired_by == MyProcPid)
    {
        cv = &session_replication_state->origin_cv;

        session_replication_state->acquired_by = 0;
        session_replication_state = NULL;
    }

    LWLockRelease(ReplicationOriginLock);

    if (cv)
        ConditionVariableBroadcast(cv);
}

/*
 * Set up a replication origin in the shared memory struct if it doesn't
 * already exist, and cache a pointer to the specific ReplicationState entry
 * so the array doesn't have to be searched when calling
 * replorigin_session_advance().
 *
 * Obviously only one such cached origin can exist per process and the
 * current cached value can only be set again after the previous value is
 * torn down with replorigin_session_reset().
 */
void
replorigin_session_setup(RepOriginId node)
{
    static bool registered_cleanup;
    int         i;
    int         free_slot = -1;

    if (!registered_cleanup)
    {
        on_shmem_exit(ReplicationOriginExitCleanup, 0);
        registered_cleanup = true;
    }

    Assert(max_replication_slots > 0);

    if (session_replication_state != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot setup replication origin when one is already setup")));

    /* Lock exclusively, as we may have to create a new table entry. */
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    /*
     * Search for either an existing slot for the origin, or a free one we
     * can use.
     */
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationState *curstate = &replication_states[i];

        /* remember where to insert if necessary */
        if (curstate->roident == InvalidRepOriginId &&
            free_slot == -1)
        {
            free_slot = i;
            continue;
        }

        /* not our slot */
        if (curstate->roident != node)
            continue;
        else if (curstate->acquired_by != 0)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication origin with OID %d is already active for PID %d",
                            curstate->roident, curstate->acquired_by)));
        }

        /* ok, found slot */
        session_replication_state = curstate;
    }

    if (session_replication_state == NULL && free_slot == -1)
        ereport(ERROR,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("could not find free replication state slot for replication origin with OID %u",
                        node),
                 errhint("Increase max_replication_slots and try again.")));
    else if (session_replication_state == NULL)
    {
        /* initialize new slot */
        session_replication_state = &replication_states[free_slot];
        Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
        Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
        session_replication_state->roident = node;
    }

    Assert(session_replication_state->roident != InvalidRepOriginId);

    session_replication_state->acquired_by = MyProcPid;

    LWLockRelease(ReplicationOriginLock);

    /* probably this one is pointless */
    ConditionVariableBroadcast(&session_replication_state->origin_cv);
}

/*
 * Reset replay state previously set up in this session.
 *
 * This function may only be called if an origin was set up with
 * replorigin_session_setup().
 */
void
replorigin_session_reset(void)
{
    ConditionVariable *cv;

    Assert(max_replication_slots != 0);

    if (session_replication_state == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("no replication origin is configured")));

    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    session_replication_state->acquired_by = 0;
    cv = &session_replication_state->origin_cv;
    session_replication_state = NULL;

    LWLockRelease(ReplicationOriginLock);

    ConditionVariableBroadcast(cv);
}

/*
 * Do the same work replorigin_advance() does, just on the session's
 * configured origin.
 *
 * This is noticeably cheaper than using replorigin_advance().
 */
void
replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
{
    Assert(session_replication_state != NULL);
    Assert(session_replication_state->roident != InvalidRepOriginId);

    LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    if (session_replication_state->local_lsn < local_commit)
        session_replication_state->local_lsn = local_commit;
    if (session_replication_state->remote_lsn < remote_commit)
        session_replication_state->remote_lsn = remote_commit;
    LWLockRelease(&session_replication_state->lock);
}

/*
 * Ask the machinery about the point up to which we successfully replayed
 * changes from an already set up replication origin.
 */
XLogRecPtr
replorigin_session_get_progress(bool flush)
{
    XLogRecPtr  remote_lsn;
    XLogRecPtr  local_lsn;

    Assert(session_replication_state != NULL);

    LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    remote_lsn = session_replication_state->remote_lsn;
    local_lsn = session_replication_state->local_lsn;
    LWLockRelease(&session_replication_state->lock);

    if (flush && local_lsn != InvalidXLogRecPtr)
        XLogFlush(local_lsn);

    return remote_lsn;
}
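
/*
 * Editorial example (an assumed, typical interactive usage pattern, not
 * taken verbatim from the documentation): the SQL-level wrappers below can
 * be exercised like this, e.g. to seed or inspect progress manually:
 *
 *		SELECT pg_replication_origin_create('node_a');
 *		SELECT pg_replication_origin_session_setup('node_a');
 *		SELECT pg_replication_origin_xact_setup('0/AABBCCDD', now());
 *		-- ... perform the replayed transaction's changes and commit ...
 *		SELECT pg_replication_origin_progress('node_a', true);
 *		SELECT pg_replication_origin_session_reset();
 */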
/* ---------------------------------------------------------------------------
 * SQL functions for working with replication origin.
 *
 * These mostly should be fairly short wrappers around more generic functions.
 * ---------------------------------------------------------------------------
 */

/*
 * Create replication origin for the passed in name, and return the assigned
 * oid.
 */
Datum
pg_replication_origin_create(PG_FUNCTION_ARGS)
{
    char       *name;
    RepOriginId roident;

    replorigin_check_prerequisites(false, false);

    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));

    /* Replication origins "pg_xxx" are reserved for internal use */
    if (IsReservedName(name))
        ereport(ERROR,
                (errcode(ERRCODE_RESERVED_NAME),
                 errmsg("replication origin name \"%s\" is reserved",
                        name),
                 errdetail("Origin names starting with \"pg_\" are reserved.")));

    /*
     * If built with appropriate switch, whine when regression-testing
     * conventions for replication origin names are violated.
     */
#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    if (strncmp(name, "regress_", 8) != 0)
        elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
#endif

    roident = replorigin_create(name);

    pfree(name);

    PG_RETURN_OID(roident);
}

/*
 * Drop replication origin.
 */
Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)
{
    char       *name;

    replorigin_check_prerequisites(false, false);

    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));

    replorigin_drop_by_name(name, false, true);

    pfree(name);

    PG_RETURN_VOID();
}

/*
 * Return the OID of a replication origin.
 */
Datum
pg_replication_origin_oid(PG_FUNCTION_ARGS)
{
    char       *name;
    RepOriginId roident;

    replorigin_check_prerequisites(false, false);

    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    roident = replorigin_by_name(name, true);

    pfree(name);

    if (OidIsValid(roident))
        PG_RETURN_OID(roident);
    PG_RETURN_NULL();
}

/*
 * Set up a replication origin for this session.
 */
Datum
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
    char       *name;
    RepOriginId origin;

    replorigin_check_prerequisites(true, false);

    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    origin = replorigin_by_name(name, false);
    replorigin_session_setup(origin);

    replorigin_session_origin = origin;

    pfree(name);

    PG_RETURN_VOID();
}

/*
 * Reset the previously set up origin in this session.
 */
Datum
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
{
    replorigin_check_prerequisites(true, false);

    replorigin_session_reset();

    replorigin_session_origin = InvalidRepOriginId;
    replorigin_session_origin_lsn = InvalidXLogRecPtr;
    replorigin_session_origin_timestamp = 0;

    PG_RETURN_VOID();
}

/*
 * Has a replication origin been set up for this session?
 */
Datum
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
{
    replorigin_check_prerequisites(false, false);

    PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
}


/*
 * Return the replication progress for the origin set up in the current
 * session.
 *
 * If 'flush' is set to true it is ensured that the returned value
 * corresponds to a local transaction that has been flushed. This is useful
 * if asynchronous commits are used when replaying replicated transactions.
 */
Datum
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
{
    XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    bool        flush = PG_GETARG_BOOL(0);

    replorigin_check_prerequisites(true, false);

    if (session_replication_state == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("no replication origin is configured")));

    remote_lsn = replorigin_session_get_progress(flush);

    if (remote_lsn == InvalidXLogRecPtr)
        PG_RETURN_NULL();

    PG_RETURN_LSN(remote_lsn);
}

Datum
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
{
    XLogRecPtr  location = PG_GETARG_LSN(0);

    replorigin_check_prerequisites(true, false);

    if (session_replication_state == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("no replication origin is configured")));

    replorigin_session_origin_lsn = location;
    replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);

    PG_RETURN_VOID();
}

Datum
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
{
    replorigin_check_prerequisites(true, false);

    replorigin_session_origin_lsn = InvalidXLogRecPtr;
    replorigin_session_origin_timestamp = 0;

    PG_RETURN_VOID();
}

Datum
pg_replication_origin_advance(PG_FUNCTION_ARGS)
{
    text       *name = PG_GETARG_TEXT_PP(0);
    XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    RepOriginId node;

    replorigin_check_prerequisites(true, false);

    /* lock to prevent the replication origin from vanishing */
    LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

    node = replorigin_by_name(text_to_cstring(name), false);

    /*
     * Can't sensibly pass a local commit to be flushed at checkpoint - this
     * xact hasn't committed yet. This is why this function should be used to
     * set up the initial replication state, but not for replay.
     */
    replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
                       true /* go backward */ , true /* WAL log */ );

    UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

    PG_RETURN_VOID();
}


/*
 * Return the replication progress for an individual replication origin.
 *
 * If 'flush' is set to true it is ensured that the returned value
 * corresponds to a local transaction that has been flushed. This is useful
 * if asynchronous commits are used when replaying replicated transactions.
 */
Datum
pg_replication_origin_progress(PG_FUNCTION_ARGS)
{
    char       *name;
    bool        flush;
    RepOriginId roident;
    XLogRecPtr  remote_lsn = InvalidXLogRecPtr;

    replorigin_check_prerequisites(true, true);

    name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    flush = PG_GETARG_BOOL(1);

    roident = replorigin_by_name(name, false);
    Assert(OidIsValid(roident));

    remote_lsn = replorigin_get_progress(roident, flush);

    if (remote_lsn == InvalidXLogRecPtr)
        PG_RETURN_NULL();

    PG_RETURN_LSN(remote_lsn);
}


Datum
pg_show_replication_origin_status(PG_FUNCTION_ARGS)
{
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
    int         i;
#define REPLICATION_ORIGIN_PROGRESS_COLS 4

    /* we want to return 0 rows if max_replication_slots is set to zero */
    replorigin_check_prerequisites(false, true);

    if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("set-valued function called in context that cannot accept a set")));
    if (!(rsinfo->allowedModes & SFRM_Materialize))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("materialize mode required, but it is not allowed in this context")));
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");

    if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
        elog(ERROR, "wrong function definition");

    per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    oldcontext = MemoryContextSwitchTo(per_query_ctx);

    tupstore = tuplestore_begin_heap(true, false, work_mem);
    rsinfo->returnMode = SFRM_Materialize;
    rsinfo->setResult = tupstore;
    rsinfo->setDesc = tupdesc;

    MemoryContextSwitchTo(oldcontext);

    /* prevent slots from being concurrently dropped */
    LWLockAcquire(ReplicationOriginLock, LW_SHARED);

    /*
     * Iterate through all possible replication_states, displaying the ones
     * that are in use. The per-slot LSNs are read under that slot's lock,
     * but different slots are not read atomically with respect to each
     * other, so the view is only approximately consistent.
     */
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationState *state;
        Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
        bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
        char       *roname;

        state = &replication_states[i];

        /* unused slot, nothing to display */
        if (state->roident == InvalidRepOriginId)
            continue;

        memset(values, 0, sizeof(values));
        memset(nulls, 1, sizeof(nulls));

        values[0] = ObjectIdGetDatum(state->roident);
        nulls[0] = false;

        /*
         * We're not preventing the origin from being dropped concurrently,
         * so silently accept that it might be gone.
         */
        if (replorigin_by_oid(state->roident, true,
                              &roname))
        {
            values[1] = CStringGetTextDatum(roname);
            nulls[1] = false;
        }

        LWLockAcquire(&state->lock, LW_SHARED);

        values[2] = LSNGetDatum(state->remote_lsn);
        nulls[2] = false;

        values[3] = LSNGetDatum(state->local_lsn);
        nulls[3] = false;

        LWLockRelease(&state->lock);

        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }

    tuplestore_donestoring(tupstore);

    LWLockRelease(ReplicationOriginLock);

#undef REPLICATION_ORIGIN_PROGRESS_COLS

    return (Datum) 0;
}
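
/*
 * Editorial note: the pg_replication_origin_status system view (defined in
 * system_views.sql) is a thin wrapper around the set-returning function
 * above, so the usual way to inspect progress is simply, for example:
 *
 *		SELECT * FROM pg_replication_origin_status;
 */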