diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/logical/snapbuild.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 1995 |
1 files changed, 1995 insertions, 0 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c new file mode 100644 index 0000000..6df6024 --- /dev/null +++ b/src/backend/replication/logical/snapbuild.c @@ -0,0 +1,1995 @@ +/*------------------------------------------------------------------------- + * + * snapbuild.c + * + * Infrastructure for building historic catalog snapshots based on contents + * of the WAL, for the purpose of decoding heapam.c style values in the + * WAL. + * + * NOTES: + * + * We build snapshots which can *only* be used to read catalog contents and we + * do so by reading and interpreting the WAL stream. The aim is to build a + * snapshot that behaves the same as a freshly taken MVCC snapshot would have + * at the time the XLogRecord was generated. + * + * To build the snapshots we reuse the infrastructure built for Hot + * Standby. The in-memory snapshots we build look different than HS' because + * we have different needs. To successfully decode data from the WAL we only + * need to access catalog tables and (sys|rel|cat)cache, not the actual user + * tables since the data we decode is wholly contained in the WAL + * records. Also, our snapshots need to be different in comparison to normal + * MVCC ones because in contrast to those we cannot fully rely on the clog and + * pg_subtrans for information about committed transactions because they might + * commit in the future from the POV of the WAL entry we're currently + * decoding. This definition has the advantage that we only need to prevent + * removal of catalog rows, while normal table's rows can still be + * removed. This is achieved by using the replication slot mechanism. + * + * As the percentage of transactions modifying the catalog normally is fairly + * small in comparisons to ones only manipulating user data, we keep track of + * the committed catalog modifying ones inside [xmin, xmax) instead of keeping + * track of all running transactions like it's done in a normal snapshot. Note + * that we're generally only looking at transactions that have acquired an + * xid. That is we keep a list of transactions between snapshot->(xmin, xmax) + * that we consider committed, everything else is considered aborted/in + * progress. That also allows us not to care about subtransactions before they + * have committed which means this module, in contrast to HS, doesn't have to + * care about suboverflowed subtransactions and similar. + * + * One complexity of doing this is that to e.g. handle mixed DDL/DML + * transactions we need Snapshots that see intermediate versions of the + * catalog in a transaction. During normal operation this is achieved by using + * CommandIds/cmin/cmax. The problem with that however is that for space + * efficiency reasons only one value of that is stored + * (cf. combocid.c). Since combo CIDs are only available in memory we log + * additional information which allows us to get the original (cmin, cmax) + * pair during visibility checks. Check the reorderbuffer.c's comment above + * ResolveCminCmaxDuringDecoding() for details. + * + * To facilitate all this we need our own visibility routine, as the normal + * ones are optimized for different usecases. + * + * To replace the normal catalog snapshots with decoding ones use the + * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions. + * + * + * + * The snapbuild machinery is starting up in several stages, as illustrated + * by the following graph describing the SnapBuild->state transitions: + * + * +-------------------------+ + * +----| START |-------------+ + * | +-------------------------+ | + * | | | + * | | | + * | running_xacts #1 | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | | BUILDING_SNAPSHOT |------------>| + * | +-------------------------+ | + * | | | + * | | | + * | running_xacts #2, xacts from #1 finished | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | | FULL_SNAPSHOT |------------>| + * | +-------------------------+ | + * | | | + * running_xacts | saved snapshot + * with zero xacts | at running_xacts's lsn + * | | | + * | running_xacts with xacts from #2 finished | + * | | | + * | v | + * | +-------------------------+ | + * +--->|SNAPBUILD_CONSISTENT |<------------+ + * +-------------------------+ + * + * Initially the machinery is in the START stage. When an xl_running_xacts + * record is read that is sufficiently new (above the safe xmin horizon), + * there's a state transition. If there were no running xacts when the + * running_xacts record was generated, we'll directly go into CONSISTENT + * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full + * snapshot means that all transactions that start henceforth can be decoded + * in their entirety, but transactions that started previously can't. In + * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously + * running transactions have committed or aborted. + * + * Only transactions that commit after CONSISTENT state has been reached will + * be replayed, even though they might have started while still in + * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous + * changes has been exported, but all the following ones will be. That point + * is a convenient point to initialize replication from, which is why we + * export a snapshot at that point, which *can* be used to read normal data. + * + * Copyright (c) 2012-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/snapbuild.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <sys/stat.h> +#include <unistd.h> + +#include "access/heapam_xlog.h" +#include "access/transam.h" +#include "access/xact.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "replication/logical.h" +#include "replication/reorderbuffer.h" +#include "replication/snapbuild.h" +#include "storage/block.h" /* debugging output */ +#include "storage/fd.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/standby.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/snapmgr.h" +#include "utils/snapshot.h" + +/* + * This struct contains the current state of the snapshot building + * machinery. Besides a forward declaration in the header, it is not exposed + * to the public, so we can easily change its contents. + */ +struct SnapBuild +{ + /* how far are we along building our first full snapshot */ + SnapBuildState state; + + /* private memory context used to allocate memory for this module. */ + MemoryContext context; + + /* all transactions < than this have committed/aborted */ + TransactionId xmin; + + /* all transactions >= than this are uncommitted */ + TransactionId xmax; + + /* + * Don't replay commits from an LSN < this LSN. This can be set externally + * but it will also be advanced (never retreat) from within snapbuild.c. + */ + XLogRecPtr start_decoding_at; + + /* + * LSN at which we found a consistent point at the time of slot creation. + * This is also the point where we have exported a snapshot for the + * initial copy. + * + * The prepared transactions that are not covered by initial snapshot + * needs to be sent later along with commit prepared and they must be + * before this point. + */ + XLogRecPtr initial_consistent_point; + + /* + * Don't start decoding WAL until the "xl_running_xacts" information + * indicates there are no running xids with an xid smaller than this. + */ + TransactionId initial_xmin_horizon; + + /* Indicates if we are building full snapshot or just catalog one. */ + bool building_full_snapshot; + + /* + * Snapshot that's valid to see the catalog state seen at this moment. + */ + Snapshot snapshot; + + /* + * LSN of the last location we are sure a snapshot has been serialized to. + */ + XLogRecPtr last_serialized_snapshot; + + /* + * The reorderbuffer we need to update with usable snapshots et al. + */ + ReorderBuffer *reorder; + + /* + * TransactionId at which the next phase of initial snapshot building will + * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or + * when no next phase necessary (SNAPBUILD_CONSISTENT). + */ + TransactionId next_phase_at; + + /* + * Array of transactions which could have catalog changes that committed + * between xmin and xmax. + */ + struct + { + /* number of committed transactions */ + size_t xcnt; + + /* available space for committed transactions */ + size_t xcnt_space; + + /* + * Until we reach a CONSISTENT state, we record commits of all + * transactions, not just the catalog changing ones. Record when that + * changes so we know we cannot export a snapshot safely anymore. + */ + bool includes_all_transactions; + + /* + * Array of committed transactions that have modified the catalog. + * + * As this array is frequently modified we do *not* keep it in + * xidComparator order. Instead we sort the array when building & + * distributing a snapshot. + * + * TODO: It's unclear whether that reasoning has much merit. Every + * time we add something here after becoming consistent will also + * require distributing a snapshot. Storing them sorted would + * potentially also make it easier to purge (but more complicated wrt + * wraparound?). Should be improved if sorting while building the + * snapshot shows up in profiles. + */ + TransactionId *xip; + } committed; +}; + +/* + * Starting a transaction -- which we need to do while exporting a snapshot -- + * removes knowledge about the previously used resowner, so we save it here. + */ +static ResourceOwner SavedResourceOwnerDuringExport = NULL; +static bool ExportInProgress = false; + +/* ->committed manipulation */ +static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); + +/* snapshot building/manipulation/distribution functions */ +static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); + +static void SnapBuildFreeSnapshot(Snapshot snap); + +static void SnapBuildSnapIncRefcount(Snapshot snap); + +static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); + +/* xlog reading helper functions for SnapBuildProcessRunningXacts */ +static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); +static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); + +/* serialization functions */ +static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); +static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); + +/* + * Allocate a new snapshot builder. + * + * xmin_horizon is the xid >= which we can be sure no catalog rows have been + * removed, start_lsn is the LSN >= we want to replay commits. + */ +SnapBuild * +AllocateSnapshotBuilder(ReorderBuffer *reorder, + TransactionId xmin_horizon, + XLogRecPtr start_lsn, + bool need_full_snapshot, + XLogRecPtr initial_consistent_point) +{ + MemoryContext context; + MemoryContext oldcontext; + SnapBuild *builder; + + /* allocate memory in own context, to have better accountability */ + context = AllocSetContextCreate(CurrentMemoryContext, + "snapshot builder context", + ALLOCSET_DEFAULT_SIZES); + oldcontext = MemoryContextSwitchTo(context); + + builder = palloc0(sizeof(SnapBuild)); + + builder->state = SNAPBUILD_START; + builder->context = context; + builder->reorder = reorder; + /* Other struct members initialized by zeroing via palloc0 above */ + + builder->committed.xcnt = 0; + builder->committed.xcnt_space = 128; /* arbitrary number */ + builder->committed.xip = + palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); + builder->committed.includes_all_transactions = true; + + builder->initial_xmin_horizon = xmin_horizon; + builder->start_decoding_at = start_lsn; + builder->building_full_snapshot = need_full_snapshot; + builder->initial_consistent_point = initial_consistent_point; + + MemoryContextSwitchTo(oldcontext); + + return builder; +} + +/* + * Free a snapshot builder. + */ +void +FreeSnapshotBuilder(SnapBuild *builder) +{ + MemoryContext context = builder->context; + + /* free snapshot explicitly, that contains some error checking */ + if (builder->snapshot != NULL) + { + SnapBuildSnapDecRefcount(builder->snapshot); + builder->snapshot = NULL; + } + + /* other resources are deallocated via memory context reset */ + MemoryContextDelete(context); +} + +/* + * Free an unreferenced snapshot that has previously been built by us. + */ +static void +SnapBuildFreeSnapshot(Snapshot snap) +{ + /* make sure we don't get passed an external snapshot */ + Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC); + + /* make sure nobody modified our snapshot */ + Assert(snap->curcid == FirstCommandId); + Assert(!snap->suboverflowed); + Assert(!snap->takenDuringRecovery); + Assert(snap->regd_count == 0); + + /* slightly more likely, so it's checked even without c-asserts */ + if (snap->copied) + elog(ERROR, "cannot free a copied snapshot"); + + if (snap->active_count) + elog(ERROR, "cannot free an active snapshot"); + + pfree(snap); +} + +/* + * In which state of snapshot building are we? + */ +SnapBuildState +SnapBuildCurrentState(SnapBuild *builder) +{ + return builder->state; +} + +/* + * Return the LSN at which the snapshot was exported + */ +XLogRecPtr +SnapBuildInitialConsistentPoint(SnapBuild *builder) +{ + return builder->initial_consistent_point; +} + +/* + * Should the contents of transaction ending at 'ptr' be decoded? + */ +bool +SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr) +{ + return ptr < builder->start_decoding_at; +} + +/* + * Increase refcount of a snapshot. + * + * This is used when handing out a snapshot to some external resource or when + * adding a Snapshot as builder->snapshot. + */ +static void +SnapBuildSnapIncRefcount(Snapshot snap) +{ + snap->active_count++; +} + +/* + * Decrease refcount of a snapshot and free if the refcount reaches zero. + * + * Externally visible, so that external resources that have been handed an + * IncRef'ed Snapshot can adjust its refcount easily. + */ +void +SnapBuildSnapDecRefcount(Snapshot snap) +{ + /* make sure we don't get passed an external snapshot */ + Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC); + + /* make sure nobody modified our snapshot */ + Assert(snap->curcid == FirstCommandId); + Assert(!snap->suboverflowed); + Assert(!snap->takenDuringRecovery); + + Assert(snap->regd_count == 0); + + Assert(snap->active_count > 0); + + /* slightly more likely, so it's checked even without casserts */ + if (snap->copied) + elog(ERROR, "cannot free a copied snapshot"); + + snap->active_count--; + if (snap->active_count == 0) + SnapBuildFreeSnapshot(snap); +} + +/* + * Build a new snapshot, based on currently committed catalog-modifying + * transactions. + * + * In-progress transactions with catalog access are *not* allowed to modify + * these snapshots; they have to copy them and fill in appropriate ->curcid + * and ->subxip/subxcnt values. + */ +static Snapshot +SnapBuildBuildSnapshot(SnapBuild *builder) +{ + Snapshot snapshot; + Size ssize; + + Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT); + + ssize = sizeof(SnapshotData) + + sizeof(TransactionId) * builder->committed.xcnt + + sizeof(TransactionId) * 1 /* toplevel xid */ ; + + snapshot = MemoryContextAllocZero(builder->context, ssize); + + snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC; + + /* + * We misuse the original meaning of SnapshotData's xip and subxip fields + * to make the more fitting for our needs. + * + * In the 'xip' array we store transactions that have to be treated as + * committed. Since we will only ever look at tuples from transactions + * that have modified the catalog it's more efficient to store those few + * that exist between xmin and xmax (frequently there are none). + * + * Snapshots that are used in transactions that have modified the catalog + * also use the 'subxip' array to store their toplevel xid and all the + * subtransaction xids so we can recognize when we need to treat rows as + * visible that are not in xip but still need to be visible. Subxip only + * gets filled when the transaction is copied into the context of a + * catalog modifying transaction since we otherwise share a snapshot + * between transactions. As long as a txn hasn't modified the catalog it + * doesn't need to treat any uncommitted rows as visible, so there is no + * need for those xids. + * + * Both arrays are qsort'ed so that we can use bsearch() on them. + */ + Assert(TransactionIdIsNormal(builder->xmin)); + Assert(TransactionIdIsNormal(builder->xmax)); + + snapshot->xmin = builder->xmin; + snapshot->xmax = builder->xmax; + + /* store all transactions to be treated as committed by this snapshot */ + snapshot->xip = + (TransactionId *) ((char *) snapshot + sizeof(SnapshotData)); + snapshot->xcnt = builder->committed.xcnt; + memcpy(snapshot->xip, + builder->committed.xip, + builder->committed.xcnt * sizeof(TransactionId)); + + /* sort so we can bsearch() */ + qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator); + + /* + * Initially, subxip is empty, i.e. it's a snapshot to be used by + * transactions that don't modify the catalog. Will be filled by + * ReorderBufferCopySnap() if necessary. + */ + snapshot->subxcnt = 0; + snapshot->subxip = NULL; + + snapshot->suboverflowed = false; + snapshot->takenDuringRecovery = false; + snapshot->copied = false; + snapshot->curcid = FirstCommandId; + snapshot->active_count = 0; + snapshot->regd_count = 0; + snapshot->snapXactCompletionCount = 0; + + return snapshot; +} + +/* + * Build the initial slot snapshot and convert it to a normal snapshot that + * is understood by HeapTupleSatisfiesMVCC. + * + * The snapshot will be usable directly in current transaction or exported + * for loading in different transaction. + */ +Snapshot +SnapBuildInitialSnapshot(SnapBuild *builder) +{ + Snapshot snap; + TransactionId xid; + TransactionId *newxip; + int newxcnt = 0; + + Assert(!FirstSnapshotSet); + Assert(XactIsoLevel == XACT_REPEATABLE_READ); + + if (builder->state != SNAPBUILD_CONSISTENT) + elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state"); + + if (!builder->committed.includes_all_transactions) + elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore"); + + /* so we don't overwrite the existing value */ + if (TransactionIdIsValid(MyProc->xmin)) + elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid"); + + snap = SnapBuildBuildSnapshot(builder); + + /* + * We know that snap->xmin is alive, enforced by the logical xmin + * mechanism. Due to that we can do this without locks, we're only + * changing our own value. + */ +#ifdef USE_ASSERT_CHECKING + { + TransactionId safeXid; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + safeXid = GetOldestSafeDecodingTransactionId(false); + LWLockRelease(ProcArrayLock); + + Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin)); + } +#endif + + MyProc->xmin = snap->xmin; + + /* allocate in transaction context */ + newxip = (TransactionId *) + palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount()); + + /* + * snapbuild.c builds transactions in an "inverted" manner, which means it + * stores committed transactions in ->xip, not ones in progress. Build a + * classical snapshot by marking all non-committed transactions as + * in-progress. This can be expensive. + */ + for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);) + { + void *test; + + /* + * Check whether transaction committed using the decoding snapshot + * meaning of ->xip. + */ + test = bsearch(&xid, snap->xip, snap->xcnt, + sizeof(TransactionId), xidComparator); + + if (test == NULL) + { + if (newxcnt >= GetMaxSnapshotXidCount()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("initial slot snapshot too large"))); + + newxip[newxcnt++] = xid; + } + + TransactionIdAdvance(xid); + } + + /* adjust remaining snapshot fields as needed */ + snap->snapshot_type = SNAPSHOT_MVCC; + snap->xcnt = newxcnt; + snap->xip = newxip; + + return snap; +} + +/* + * Export a snapshot so it can be set in another session with SET TRANSACTION + * SNAPSHOT. + * + * For that we need to start a transaction in the current backend as the + * importing side checks whether the source transaction is still open to make + * sure the xmin horizon hasn't advanced since then. + */ +const char * +SnapBuildExportSnapshot(SnapBuild *builder) +{ + Snapshot snap; + char *snapname; + + if (IsTransactionOrTransactionBlock()) + elog(ERROR, "cannot export a snapshot from within a transaction"); + + if (SavedResourceOwnerDuringExport) + elog(ERROR, "can only export one snapshot at a time"); + + SavedResourceOwnerDuringExport = CurrentResourceOwner; + ExportInProgress = true; + + StartTransactionCommand(); + + /* There doesn't seem to a nice API to set these */ + XactIsoLevel = XACT_REPEATABLE_READ; + XactReadOnly = true; + + snap = SnapBuildInitialSnapshot(builder); + + /* + * now that we've built a plain snapshot, make it active and use the + * normal mechanisms for exporting it + */ + snapname = ExportSnapshot(snap); + + ereport(LOG, + (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID", + "exported logical decoding snapshot: \"%s\" with %u transaction IDs", + snap->xcnt, + snapname, snap->xcnt))); + return snapname; +} + +/* + * Ensure there is a snapshot and if not build one for current transaction. + */ +Snapshot +SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) +{ + Assert(builder->state == SNAPBUILD_CONSISTENT); + + /* only build a new snapshot if we don't have a prebuilt one */ + if (builder->snapshot == NULL) + { + builder->snapshot = SnapBuildBuildSnapshot(builder); + /* increase refcount for the snapshot builder */ + SnapBuildSnapIncRefcount(builder->snapshot); + } + + return builder->snapshot; +} + +/* + * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is + * any. Aborts the previously started transaction and resets the resource + * owner back to its original value. + */ +void +SnapBuildClearExportedSnapshot(void) +{ + ResourceOwner tmpResOwner; + + /* nothing exported, that is the usual case */ + if (!ExportInProgress) + return; + + if (!IsTransactionState()) + elog(ERROR, "clearing exported snapshot in wrong transaction state"); + + /* + * AbortCurrentTransaction() takes care of resetting the snapshot state, + * so remember SavedResourceOwnerDuringExport. + */ + tmpResOwner = SavedResourceOwnerDuringExport; + + /* make sure nothing could have ever happened */ + AbortCurrentTransaction(); + + CurrentResourceOwner = tmpResOwner; +} + +/* + * Clear snapshot export state during transaction abort. + */ +void +SnapBuildResetExportedSnapshotState(void) +{ + SavedResourceOwnerDuringExport = NULL; + ExportInProgress = false; +} + +/* + * Handle the effects of a single heap change, appropriate to the current state + * of the snapshot builder and returns whether changes made at (xid, lsn) can + * be decoded. + */ +bool +SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) +{ + /* + * We can't handle data in transactions if we haven't built a snapshot + * yet, so don't store them. + */ + if (builder->state < SNAPBUILD_FULL_SNAPSHOT) + return false; + + /* + * No point in keeping track of changes in transactions that we don't have + * enough information about to decode. This means that they started before + * we got into the SNAPBUILD_FULL_SNAPSHOT state. + */ + if (builder->state < SNAPBUILD_CONSISTENT && + TransactionIdPrecedes(xid, builder->next_phase_at)) + return false; + + /* + * If the reorderbuffer doesn't yet have a snapshot, add one now, it will + * be needed to decode the change we're currently processing. + */ + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + { + /* only build a new snapshot if we don't have a prebuilt one */ + if (builder->snapshot == NULL) + { + builder->snapshot = SnapBuildBuildSnapshot(builder); + /* increase refcount for the snapshot builder */ + SnapBuildSnapIncRefcount(builder->snapshot); + } + + /* + * Increase refcount for the transaction we're handing the snapshot + * out to. + */ + SnapBuildSnapIncRefcount(builder->snapshot); + ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn, + builder->snapshot); + } + + return true; +} + +/* + * Do CommandId/combo CID handling after reading an xl_heap_new_cid record. + * This implies that a transaction has done some form of write to system + * catalogs. + */ +void +SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn, xl_heap_new_cid *xlrec) +{ + CommandId cid; + + /* + * we only log new_cid's if a catalog tuple was modified, so mark the + * transaction as containing catalog modifications + */ + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, + xlrec->target_node, xlrec->target_tid, + xlrec->cmin, xlrec->cmax, + xlrec->combocid); + + /* figure out new command id */ + if (xlrec->cmin != InvalidCommandId && + xlrec->cmax != InvalidCommandId) + cid = Max(xlrec->cmin, xlrec->cmax); + else if (xlrec->cmax != InvalidCommandId) + cid = xlrec->cmax; + else if (xlrec->cmin != InvalidCommandId) + cid = xlrec->cmin; + else + { + cid = InvalidCommandId; /* silence compiler */ + elog(ERROR, "xl_heap_new_cid record without a valid CommandId"); + } + + ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1); +} + +/* + * Add a new Snapshot to all transactions we're decoding that currently are + * in-progress so they can see new catalog contents made by the transaction + * that just committed. This is necessary because those in-progress + * transactions will use the new catalog's contents from here on (at the very + * least everything they do needs to be compatible with newer catalog + * contents). + */ +static void +SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) +{ + dlist_iter txn_i; + ReorderBufferTXN *txn; + + /* + * Iterate through all toplevel transactions. This can include + * subtransactions which we just don't yet know to be that, but that's + * fine, they will just get an unnecessary snapshot queued. + */ + dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) + { + txn = dlist_container(ReorderBufferTXN, node, txn_i.cur); + + Assert(TransactionIdIsValid(txn->xid)); + + /* + * If we don't have a base snapshot yet, there are no changes in this + * transaction which in turn implies we don't yet need a snapshot at + * all. We'll add a snapshot when the first change gets queued. + * + * NB: This works correctly even for subtransactions because + * ReorderBufferAssignChild() takes care to transfer the base snapshot + * to the top-level transaction, and while iterating the changequeue + * we'll get the change from the subtxn. + */ + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) + continue; + + /* + * We don't need to add snapshot to prepared transactions as they + * should not see the new catalog contents. + */ + if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) + continue; + + elog(DEBUG2, "adding a new snapshot to %u at %X/%X", + txn->xid, LSN_FORMAT_ARGS(lsn)); + + /* + * increase the snapshot's refcount for the transaction we are handing + * it out to + */ + SnapBuildSnapIncRefcount(builder->snapshot); + ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, + builder->snapshot); + } +} + +/* + * Keep track of a new catalog changing transaction that has committed. + */ +static void +SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) +{ + Assert(TransactionIdIsValid(xid)); + + if (builder->committed.xcnt == builder->committed.xcnt_space) + { + builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1; + + elog(DEBUG1, "increasing space for committed transactions to %u", + (uint32) builder->committed.xcnt_space); + + builder->committed.xip = repalloc(builder->committed.xip, + builder->committed.xcnt_space * sizeof(TransactionId)); + } + + /* + * TODO: It might make sense to keep the array sorted here instead of + * doing it every time we build a new snapshot. On the other hand this + * gets called repeatedly when a transaction with subtransactions commits. + */ + builder->committed.xip[builder->committed.xcnt++] = xid; +} + +/* + * Remove knowledge about transactions we treat as committed that are smaller + * than ->xmin. Those won't ever get checked via the ->committed array but via + * the clog machinery, so we don't need to waste memory on them. + */ +static void +SnapBuildPurgeCommittedTxn(SnapBuild *builder) +{ + int off; + TransactionId *workspace; + int surviving_xids = 0; + + /* not ready yet */ + if (!TransactionIdIsNormal(builder->xmin)) + return; + + /* TODO: Neater algorithm than just copying and iterating? */ + workspace = + MemoryContextAlloc(builder->context, + builder->committed.xcnt * sizeof(TransactionId)); + + /* copy xids that still are interesting to workspace */ + for (off = 0; off < builder->committed.xcnt; off++) + { + if (NormalTransactionIdPrecedes(builder->committed.xip[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = builder->committed.xip[off]; + } + + /* copy workspace back to persistent state */ + memcpy(builder->committed.xip, workspace, + surviving_xids * sizeof(TransactionId)); + + elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", + (uint32) builder->committed.xcnt, (uint32) surviving_xids, + builder->xmin, builder->xmax); + builder->committed.xcnt = surviving_xids; + + pfree(workspace); +} + +/* + * Handle everything that needs to be done when a transaction commits + */ +void +SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, + int nsubxacts, TransactionId *subxacts) +{ + int nxact; + + bool needs_snapshot = false; + bool needs_timetravel = false; + bool sub_needs_timetravel = false; + + TransactionId xmax = xid; + + /* + * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor + * will they be part of a snapshot. So we don't need to record anything. + */ + if (builder->state == SNAPBUILD_START || + (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && + TransactionIdPrecedes(xid, builder->next_phase_at))) + { + /* ensure that only commits after this are getting replayed */ + if (builder->start_decoding_at <= lsn) + builder->start_decoding_at = lsn + 1; + return; + } + + if (builder->state < SNAPBUILD_CONSISTENT) + { + /* ensure that only commits after this are getting replayed */ + if (builder->start_decoding_at <= lsn) + builder->start_decoding_at = lsn + 1; + + /* + * If building an exportable snapshot, force xid to be tracked, even + * if the transaction didn't modify the catalog. + */ + if (builder->building_full_snapshot) + { + needs_timetravel = true; + } + } + + for (nxact = 0; nxact < nsubxacts; nxact++) + { + TransactionId subxid = subxacts[nxact]; + + /* + * Add subtransaction to base snapshot if catalog modifying, we don't + * distinguish to toplevel transactions there. + */ + if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + { + sub_needs_timetravel = true; + needs_snapshot = true; + + elog(DEBUG1, "found subtransaction %u:%u with catalog changes", + xid, subxid); + + SnapBuildAddCommittedTxn(builder, subxid); + + if (NormalTransactionIdFollows(subxid, xmax)) + xmax = subxid; + } + + /* + * If we're forcing timetravel we also need visibility information + * about subtransaction, so keep track of subtransaction's state, even + * if not catalog modifying. Don't need to distribute a snapshot in + * that case. + */ + else if (needs_timetravel) + { + SnapBuildAddCommittedTxn(builder, subxid); + if (NormalTransactionIdFollows(subxid, xmax)) + xmax = subxid; + } + } + + /* if top-level modified catalog, it'll need a snapshot */ + if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + { + elog(DEBUG2, "found top level transaction %u, with catalog changes", + xid); + needs_snapshot = true; + needs_timetravel = true; + SnapBuildAddCommittedTxn(builder, xid); + } + else if (sub_needs_timetravel) + { + /* track toplevel txn as well, subxact alone isn't meaningful */ + SnapBuildAddCommittedTxn(builder, xid); + } + else if (needs_timetravel) + { + elog(DEBUG2, "forced transaction %u to do timetravel", xid); + + SnapBuildAddCommittedTxn(builder, xid); + } + + if (!needs_timetravel) + { + /* record that we cannot export a general snapshot anymore */ + builder->committed.includes_all_transactions = false; + } + + Assert(!needs_snapshot || needs_timetravel); + + /* + * Adjust xmax of the snapshot builder, we only do that for committed, + * catalog modifying, transactions, everything else isn't interesting for + * us since we'll never look at the respective rows. + */ + if (needs_timetravel && + (!TransactionIdIsValid(builder->xmax) || + TransactionIdFollowsOrEquals(xmax, builder->xmax))) + { + builder->xmax = xmax; + TransactionIdAdvance(builder->xmax); + } + + /* if there's any reason to build a historic snapshot, do so now */ + if (needs_snapshot) + { + /* + * If we haven't built a complete snapshot yet there's no need to hand + * it out, it wouldn't (and couldn't) be used anyway. + */ + if (builder->state < SNAPBUILD_FULL_SNAPSHOT) + return; + + /* + * Decrease the snapshot builder's refcount of the old snapshot, note + * that it still will be used if it has been handed out to the + * reorderbuffer earlier. + */ + if (builder->snapshot) + SnapBuildSnapDecRefcount(builder->snapshot); + + builder->snapshot = SnapBuildBuildSnapshot(builder); + + /* we might need to execute invalidations, add snapshot */ + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + { + SnapBuildSnapIncRefcount(builder->snapshot); + ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn, + builder->snapshot); + } + + /* refcount of the snapshot builder for the new snapshot */ + SnapBuildSnapIncRefcount(builder->snapshot); + + /* add a new catalog snapshot to all currently running transactions */ + SnapBuildDistributeNewCatalogSnapshot(builder, lsn); + } +} + + +/* ----------------------------------- + * Snapshot building functions dealing with xlog records + * ----------------------------------- + */ + +/* + * Process a running xacts record, and use its information to first build a + * historic snapshot and later to release resources that aren't needed + * anymore. + */ +void +SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) +{ + ReorderBufferTXN *txn; + TransactionId xmin; + + /* + * If we're not consistent yet, inspect the record to see whether it + * allows to get closer to being consistent. If we are consistent, dump + * our snapshot so others or we, after a restart, can use it. + */ + if (builder->state < SNAPBUILD_CONSISTENT) + { + /* returns false if there's no point in performing cleanup just yet */ + if (!SnapBuildFindSnapshot(builder, lsn, running)) + return; + } + else + SnapBuildSerialize(builder, lsn); + + /* + * Update range of interesting xids based on the running xacts + * information. We don't increase ->xmax using it, because once we are in + * a consistent state we can do that ourselves and much more efficiently + * so, because we only need to do it for catalog transactions since we + * only ever look at those. + * + * NB: We only increase xmax when a catalog modifying transaction commits + * (see SnapBuildCommitTxn). Because of this, xmax can be lower than + * xmin, which looks odd but is correct and actually more efficient, since + * we hit fast paths in heapam_visibility.c. + */ + builder->xmin = running->oldestRunningXid; + + /* Remove transactions we don't need to keep track off anymore */ + SnapBuildPurgeCommittedTxn(builder); + + /* + * Advance the xmin limit for the current replication slot, to allow + * vacuum to clean up the tuples this slot has been protecting. + * + * The reorderbuffer might have an xmin among the currently running + * snapshots; use it if so. If not, we need only consider the snapshots + * we'll produce later, which can't be less than the oldest running xid in + * the record we're reading now. + */ + xmin = ReorderBufferGetOldestXmin(builder->reorder); + if (xmin == InvalidTransactionId) + xmin = running->oldestRunningXid; + elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", + builder->xmin, builder->xmax, running->oldestRunningXid, xmin); + LogicalIncreaseXminForSlot(lsn, xmin); + + /* + * Also tell the slot where we can restart decoding from. We don't want to + * do that after every commit because changing that implies an fsync of + * the logical slot's state file, so we only do it every time we see a + * running xacts record. + * + * Do so by looking for the oldest in progress transaction (determined by + * the first LSN of any of its relevant records). Every transaction + * remembers the last location we stored the snapshot to disk before its + * beginning. That point is where we can restart from. + */ + + /* + * Can't know about a serialized snapshot's location if we're not + * consistent. + */ + if (builder->state < SNAPBUILD_CONSISTENT) + return; + + txn = ReorderBufferGetOldestTXN(builder->reorder); + + /* + * oldest ongoing txn might have started when we didn't yet serialize + * anything because we hadn't reached a consistent state yet. + */ + if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) + LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + + /* + * No in-progress transaction, can reuse the last serialized snapshot if + * we have one. + */ + else if (txn == NULL && + builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && + builder->last_serialized_snapshot != InvalidXLogRecPtr) + LogicalIncreaseRestartDecodingForSlot(lsn, + builder->last_serialized_snapshot); +} + + +/* + * Build the start of a snapshot that's capable of decoding the catalog. + * + * Helper function for SnapBuildProcessRunningXacts() while we're not yet + * consistent. + * + * Returns true if there is a point in performing internal maintenance/cleanup + * using the xl_running_xacts record. + */ +static bool +SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) +{ + /* --- + * Build catalog decoding snapshot incrementally using information about + * the currently running transactions. There are several ways to do that: + * + * a) There were no running transactions when the xl_running_xacts record + * was inserted, jump to CONSISTENT immediately. We might find such a + * state while waiting on c)'s sub-states. + * + * b) This (in a previous run) or another decoding slot serialized a + * snapshot to disk that we can use. Can't use this method for the + * initial snapshot when slot is being created and needs full snapshot + * for export or direct use, as that snapshot will only contain catalog + * modifying transactions. + * + * c) First incrementally build a snapshot for catalog tuples + * (BUILDING_SNAPSHOT), that requires all, already in-progress, + * transactions to finish. Every transaction starting after that + * (FULL_SNAPSHOT state), has enough information to be decoded. But + * for older running transactions no viable snapshot exists yet, so + * CONSISTENT will only be reached once all of those have finished. + * --- + */ + + /* + * xl_running_xact record is older than what we can use, we might not have + * all necessary catalog rows anymore. + */ + if (TransactionIdIsNormal(builder->initial_xmin_horizon) && + NormalTransactionIdPrecedes(running->oldestRunningXid, + builder->initial_xmin_horizon)) + { + ereport(DEBUG1, + (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low", + LSN_FORMAT_ARGS(lsn)), + errdetail_internal("initial xmin horizon of %u vs the snapshot's %u", + builder->initial_xmin_horizon, running->oldestRunningXid))); + + + SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon); + + return true; + } + + /* + * a) No transaction were running, we can jump to consistent. + * + * This is not affected by races around xl_running_xacts, because we can + * miss transaction commits, but currently not transactions starting. + * + * NB: We might have already started to incrementally assemble a snapshot, + * so we need to be careful to deal with that. + */ + if (running->oldestRunningXid == running->nextXid) + { + if (builder->start_decoding_at == InvalidXLogRecPtr || + builder->start_decoding_at <= lsn) + /* can decode everything after this */ + builder->start_decoding_at = lsn + 1; + + /* As no transactions were running xmin/xmax can be trivially set. */ + builder->xmin = running->nextXid; /* < are finished */ + builder->xmax = running->nextXid; /* >= are running */ + + /* so we can safely use the faster comparisons */ + Assert(TransactionIdIsNormal(builder->xmin)); + Assert(TransactionIdIsNormal(builder->xmax)); + + builder->state = SNAPBUILD_CONSISTENT; + builder->next_phase_at = InvalidTransactionId; + + ereport(LOG, + (errmsg("logical decoding found consistent point at %X/%X", + LSN_FORMAT_ARGS(lsn)), + errdetail("There are no running transactions."))); + + return false; + } + /* b) valid on disk state and not building full snapshot */ + else if (!builder->building_full_snapshot && + SnapBuildRestore(builder, lsn)) + { + /* there won't be any state to cleanup */ + return false; + } + + /* + * c) transition from START to BUILDING_SNAPSHOT. + * + * In START state, and a xl_running_xacts record with running xacts is + * encountered. In that case, switch to BUILDING_SNAPSHOT state, and + * record xl_running_xacts->nextXid. Once all running xacts have finished + * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It + * might look that we could use xl_running_xact's ->xids information to + * get there quicker, but that is problematic because transactions marked + * as running, might already have inserted their commit record - it's + * infeasible to change that with locking. + */ + else if (builder->state == SNAPBUILD_START) + { + builder->state = SNAPBUILD_BUILDING_SNAPSHOT; + builder->next_phase_at = running->nextXid; + + /* + * Start with an xmin/xmax that's correct for future, when all the + * currently running transactions have finished. We'll update both + * while waiting for the pending transactions to finish. + */ + builder->xmin = running->nextXid; /* < are finished */ + builder->xmax = running->nextXid; /* >= are running */ + + /* so we can safely use the faster comparisons */ + Assert(TransactionIdIsNormal(builder->xmin)); + Assert(TransactionIdIsNormal(builder->xmax)); + + ereport(LOG, + (errmsg("logical decoding found initial starting point at %X/%X", + LSN_FORMAT_ARGS(lsn)), + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); + + SnapBuildWaitSnapshot(running, running->nextXid); + } + + /* + * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT. + * + * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid + * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This + * means all transactions starting afterwards have enough information to + * be decoded. Switch to FULL_SNAPSHOT. + */ + else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && + TransactionIdPrecedesOrEquals(builder->next_phase_at, + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_FULL_SNAPSHOT; + builder->next_phase_at = running->nextXid; + + ereport(LOG, + (errmsg("logical decoding found initial consistent point at %X/%X", + LSN_FORMAT_ARGS(lsn)), + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); + + SnapBuildWaitSnapshot(running, running->nextXid); + } + + /* + * c) transition from FULL_SNAPSHOT to CONSISTENT. + * + * In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts' + * oldestRunningXid is >= than nextXid from when we switched to + * FULL_SNAPSHOT. This means all transactions that are currently in + * progress have a catalog snapshot, and all their changes have been + * collected. Switch to CONSISTENT. + */ + else if (builder->state == SNAPBUILD_FULL_SNAPSHOT && + TransactionIdPrecedesOrEquals(builder->next_phase_at, + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_CONSISTENT; + builder->next_phase_at = InvalidTransactionId; + + ereport(LOG, + (errmsg("logical decoding found consistent point at %X/%X", + LSN_FORMAT_ARGS(lsn)), + errdetail("There are no old transactions anymore."))); + } + + /* + * We already started to track running xacts and need to wait for all + * in-progress ones to finish. We fall through to the normal processing of + * records so incremental cleanup can be performed. + */ + return true; + +} + +/* --- + * Iterate through xids in record, wait for all older than the cutoff to + * finish. Then, if possible, log a new xl_running_xacts record. + * + * This isn't required for the correctness of decoding, but to: + * a) allow isolationtester to notice that we're currently waiting for + * something. + * b) log a new xl_running_xacts record where it'd be helpful, without having + * to wait for bgwriter or checkpointer. + * --- + */ +static void +SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) +{ + int off; + + for (off = 0; off < running->xcnt; off++) + { + TransactionId xid = running->xids[off]; + + /* + * Upper layers should prevent that we ever need to wait on ourselves. + * Check anyway, since failing to do so would either result in an + * endless wait or an Assert() failure. + */ + if (TransactionIdIsCurrentTransactionId(xid)) + elog(ERROR, "waiting for ourselves"); + + if (TransactionIdFollows(xid, cutoff)) + continue; + + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } + + /* + * All transactions we needed to finish finished - try to ensure there is + * another xl_running_xacts record in a timely manner, without having to + * wait for bgwriter or checkpointer to log one. During recovery we can't + * enforce that, so we'll have to wait. + */ + if (!RecoveryInProgress()) + { + LogStandbySnapshot(); + } +} + +/* ----------------------------------- + * Snapshot serialization support + * ----------------------------------- + */ + +/* + * We store current state of struct SnapBuild on disk in the following manner: + * + * struct SnapBuildOnDisk; + * TransactionId * running.xcnt_space; + * TransactionId * committed.xcnt; (*not xcnt_space*) + * + */ +typedef struct SnapBuildOnDisk +{ + /* first part of this struct needs to be version independent */ + + /* data not covered by checksum */ + uint32 magic; + pg_crc32c checksum; + + /* data covered by checksum */ + + /* version, in case we want to support pg_upgrade */ + uint32 version; + /* how large is the on disk data, excluding the constant sized part */ + uint32 length; + + /* version dependent part */ + SnapBuild builder; + + /* variable amount of TransactionIds follows */ +} SnapBuildOnDisk; + +#define SnapBuildOnDiskConstantSize \ + offsetof(SnapBuildOnDisk, builder) +#define SnapBuildOnDiskNotChecksummedSize \ + offsetof(SnapBuildOnDisk, version) + +#define SNAPBUILD_MAGIC 0x51A1E001 +#define SNAPBUILD_VERSION 4 + +/* + * Store/Load a snapshot from disk, depending on the snapshot builder's state. + * + * Supposed to be used by external (i.e. not snapbuild.c) code that just read + * a record that's a potential location for a serialized snapshot. + */ +void +SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) +{ + if (builder->state < SNAPBUILD_CONSISTENT) + SnapBuildRestore(builder, lsn); + else + SnapBuildSerialize(builder, lsn); +} + +/* + * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already + * been done by another decoding process. + */ +static void +SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) +{ + Size needed_length; + SnapBuildOnDisk *ondisk = NULL; + char *ondisk_c; + int fd; + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + int ret; + struct stat stat_buf; + Size sz; + + Assert(lsn != InvalidXLogRecPtr); + Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr || + builder->last_serialized_snapshot <= lsn); + + /* + * no point in serializing if we cannot continue to work immediately after + * restoring the snapshot + */ + if (builder->state < SNAPBUILD_CONSISTENT) + return; + + /* consistent snapshots have no next phase */ + Assert(builder->next_phase_at == InvalidTransactionId); + + /* + * We identify snapshots by the LSN they are valid for. We don't need to + * include timelines in the name as each LSN maps to exactly one timeline + * unless the user used pg_resetwal or similar. If a user did so, there's + * no hope continuing to decode anyway. + */ + sprintf(path, "pg_logical/snapshots/%X-%X.snap", + LSN_FORMAT_ARGS(lsn)); + + /* + * first check whether some other backend already has written the snapshot + * for this LSN. It's perfectly fine if there's none, so we accept ENOENT + * as a valid state. Everything else is an unexpected error. + */ + ret = stat(path, &stat_buf); + + if (ret != 0 && errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", path))); + + else if (ret == 0) + { + /* + * somebody else has already serialized to this point, don't overwrite + * but remember location, so we don't need to read old data again. + * + * To be sure it has been synced to disk after the rename() from the + * tempfile filename to the real filename, we just repeat the fsync. + * That ought to be cheap because in most scenarios it should already + * be safely on disk. + */ + fsync_fname(path, false); + fsync_fname("pg_logical/snapshots", true); + + builder->last_serialized_snapshot = lsn; + goto out; + } + + /* + * there is an obvious race condition here between the time we stat(2) the + * file and us writing the file. But we rename the file into place + * atomically and all files created need to contain the same data anyway, + * so this is perfectly fine, although a bit of a resource waste. Locking + * seems like pointless complication. + */ + elog(DEBUG1, "serializing snapshot to %s", path); + + /* to make sure only we will write to this tempfile, include pid */ + sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp", + LSN_FORMAT_ARGS(lsn), MyProcPid); + + /* + * Unlink temporary file if it already exists, needs to have been before a + * crash/error since we won't enter this function twice from within a + * single decoding slot/backend and the temporary file contains the pid of + * the current process. + */ + if (unlink(tmppath) != 0 && errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", tmppath))); + + needed_length = sizeof(SnapBuildOnDisk) + + sizeof(TransactionId) * builder->committed.xcnt; + + ondisk_c = MemoryContextAllocZero(builder->context, needed_length); + ondisk = (SnapBuildOnDisk *) ondisk_c; + ondisk->magic = SNAPBUILD_MAGIC; + ondisk->version = SNAPBUILD_VERSION; + ondisk->length = needed_length; + INIT_CRC32C(ondisk->checksum); + COMP_CRC32C(ondisk->checksum, + ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + ondisk_c += sizeof(SnapBuildOnDisk); + + memcpy(&ondisk->builder, builder, sizeof(SnapBuild)); + /* NULL-ify memory-only data */ + ondisk->builder.context = NULL; + ondisk->builder.snapshot = NULL; + ondisk->builder.reorder = NULL; + ondisk->builder.committed.xip = NULL; + + COMP_CRC32C(ondisk->checksum, + &ondisk->builder, + sizeof(SnapBuild)); + + /* copy committed xacts */ + sz = sizeof(TransactionId) * builder->committed.xcnt; + memcpy(ondisk_c, builder->committed.xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + + FIN_CRC32C(ondisk->checksum); + + /* we have valid data now, open tempfile and write it there */ + fd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", tmppath))); + + errno = 0; + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE); + if ((write(fd, ondisk, needed_length)) != needed_length) + { + int save_errno = errno; + + CloseTransientFile(fd); + + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", tmppath))); + } + pgstat_report_wait_end(); + + /* + * fsync the file before renaming so that even if we crash after this we + * have either a fully valid file or nothing. + * + * It's safe to just ERROR on fsync() here because we'll retry the whole + * operation including the writes. + * + * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has + * some noticeable overhead since it's performed synchronously during + * decoding? + */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC); + if (pg_fsync(fd) != 0) + { + int save_errno = errno; + + CloseTransientFile(fd); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync file \"%s\": %m", tmppath))); + } + pgstat_report_wait_end(); + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", tmppath))); + + fsync_fname("pg_logical/snapshots", true); + + /* + * We may overwrite the work from some other backend, but that's ok, our + * snapshot is valid as well, we'll just have done some superfluous work. + */ + if (rename(tmppath, path) != 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rename file \"%s\" to \"%s\": %m", + tmppath, path))); + } + + /* make sure we persist */ + fsync_fname(path, false); + fsync_fname("pg_logical/snapshots", true); + + /* + * Now there's no way we can loose the dumped state anymore, remember this + * as a serialization point. + */ + builder->last_serialized_snapshot = lsn; + +out: + ReorderBufferSetRestartPoint(builder->reorder, + builder->last_serialized_snapshot); + /* be tidy */ + if (ondisk) + pfree(ondisk); +} + +/* + * Restore a snapshot into 'builder' if previously one has been stored at the + * location indicated by 'lsn'. Returns true if successful, false otherwise. + */ +static bool +SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) +{ + SnapBuildOnDisk ondisk; + int fd; + char path[MAXPGPATH]; + Size sz; + int readBytes; + pg_crc32c checksum; + + /* no point in loading a snapshot if we're already there */ + if (builder->state == SNAPBUILD_CONSISTENT) + return false; + + sprintf(path, "pg_logical/snapshots/%X-%X.snap", + LSN_FORMAT_ARGS(lsn)); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + + if (fd < 0 && errno == ENOENT) + return false; + else if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + /* ---- + * Make sure the snapshot had been stored safely to disk, that's normally + * cheap. + * Note that we do not need PANIC here, nobody will be able to use the + * slot without fsyncing, and saving it won't succeed without an fsync() + * either... + * ---- + */ + fsync_fname(path, false); + fsync_fname("pg_logical/snapshots", true); + + + /* read statically sized portion of snapshot */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); + pgstat_report_wait_end(); + if (readBytes != SnapBuildOnDiskConstantSize) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, + (Size) SnapBuildOnDiskConstantSize))); + } + + if (ondisk.magic != SNAPBUILD_MAGIC) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u", + path, ondisk.magic, SNAPBUILD_MAGIC))); + + if (ondisk.version != SNAPBUILD_VERSION) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u", + path, ondisk.version, SNAPBUILD_VERSION))); + + INIT_CRC32C(checksum); + COMP_CRC32C(checksum, + ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize, + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + + /* read SnapBuild */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); + pgstat_report_wait_end(); + if (readBytes != sizeof(SnapBuild)) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(SnapBuild)))); + } + COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); + + /* restore committed xacts information */ + sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; + ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, ondisk.builder.committed.xip, sz); + pgstat_report_wait_end(); + if (readBytes != sz) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sz))); + } + COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); + + FIN_CRC32C(checksum); + + /* verify checksum of what we've read */ + if (!EQ_CRC32C(checksum, ondisk.checksum)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u", + path, checksum, ondisk.checksum))); + + /* + * ok, we now have a sensible snapshot here, figure out if it has more + * information than we have. + */ + + /* + * We are only interested in consistent snapshots for now, comparing + * whether one incomplete snapshot is more "advanced" seems to be + * unnecessarily complex. + */ + if (ondisk.builder.state < SNAPBUILD_CONSISTENT) + goto snapshot_not_interesting; + + /* + * Don't use a snapshot that requires an xmin that we cannot guarantee to + * be available. + */ + if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon)) + goto snapshot_not_interesting; + + /* consistent snapshots have no next phase */ + Assert(ondisk.builder.next_phase_at == InvalidTransactionId); + + /* ok, we think the snapshot is sensible, copy over everything important */ + builder->xmin = ondisk.builder.xmin; + builder->xmax = ondisk.builder.xmax; + builder->state = ondisk.builder.state; + + builder->committed.xcnt = ondisk.builder.committed.xcnt; + /* We only allocated/stored xcnt, not xcnt_space xids ! */ + /* don't overwrite preallocated xip, if we don't have anything here */ + if (builder->committed.xcnt > 0) + { + pfree(builder->committed.xip); + builder->committed.xcnt_space = ondisk.builder.committed.xcnt; + builder->committed.xip = ondisk.builder.committed.xip; + } + ondisk.builder.committed.xip = NULL; + + /* our snapshot is not interesting anymore, build a new one */ + if (builder->snapshot != NULL) + { + SnapBuildSnapDecRefcount(builder->snapshot); + } + builder->snapshot = SnapBuildBuildSnapshot(builder); + SnapBuildSnapIncRefcount(builder->snapshot); + + ReorderBufferSetRestartPoint(builder->reorder, lsn); + + Assert(builder->state == SNAPBUILD_CONSISTENT); + + ereport(LOG, + (errmsg("logical decoding found consistent point at %X/%X", + LSN_FORMAT_ARGS(lsn)), + errdetail("Logical decoding will begin using saved snapshot."))); + return true; + +snapshot_not_interesting: + if (ondisk.builder.committed.xip != NULL) + pfree(ondisk.builder.committed.xip); + return false; +} + +/* + * Remove all serialized snapshots that are not required anymore because no + * slot can need them. This doesn't actually have to run during a checkpoint, + * but it's a convenient point to schedule this. + * + * NB: We run this during checkpoints even if logical decoding is disabled so + * we cleanup old slots at some point after it got disabled. + */ +void +CheckPointSnapBuild(void) +{ + XLogRecPtr cutoff; + XLogRecPtr redo; + DIR *snap_dir; + struct dirent *snap_de; + char path[MAXPGPATH + 21]; + + /* + * We start off with a minimum of the last redo pointer. No new + * replication slot will start before that, so that's a safe upper bound + * for removal. + */ + redo = GetRedoRecPtr(); + + /* now check for the restart ptrs from existing slots */ + cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + + /* don't start earlier than the restart lsn */ + if (redo < cutoff) + cutoff = redo; + + snap_dir = AllocateDir("pg_logical/snapshots"); + while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL) + { + uint32 hi; + uint32 lo; + XLogRecPtr lsn; + struct stat statbuf; + + if (strcmp(snap_de->d_name, ".") == 0 || + strcmp(snap_de->d_name, "..") == 0) + continue; + + snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name); + + if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode)) + { + elog(DEBUG1, "only regular files expected: %s", path); + continue; + } + + /* + * temporary filenames from SnapBuildSerialize() include the LSN and + * everything but are postfixed by .$pid.tmp. We can just remove them + * the same as other files because there can be none that are + * currently being written that are older than cutoff. + * + * We just log a message if a file doesn't fit the pattern, it's + * probably some editors lock/state file or similar... + */ + if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2) + { + ereport(LOG, + (errmsg("could not parse file name \"%s\"", path))); + continue; + } + + lsn = ((uint64) hi) << 32 | lo; + + /* check whether we still need it */ + if (lsn < cutoff || cutoff == InvalidXLogRecPtr) + { + elog(DEBUG1, "removing snapbuild snapshot %s", path); + + /* + * It's not particularly harmful, though strange, if we can't + * remove the file here. Don't prevent the checkpoint from + * completing, that'd be a cure worse than the disease. + */ + if (unlink(path) < 0) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + path))); + continue; + } + } + } + FreeDir(snap_dir); +} |