/*-------------------------------------------------------------------------
 *
 * snapbuild.c
 *
 *	  Infrastructure for building historic catalog snapshots based on contents
 *	  of the WAL, for the purpose of decoding heapam.c style values in the
 *	  WAL.
 *
 * NOTES:
 *
 * We build snapshots which can *only* be used to read catalog contents and we
 * do so by reading and interpreting the WAL stream. The aim is to build a
 * snapshot that behaves the same as a freshly taken MVCC snapshot would have
 * at the time the XLogRecord was generated.
 *
 * To build the snapshots we reuse the infrastructure built for Hot
 * Standby. The in-memory snapshots we build look different from HS's because
 * we have different needs. To successfully decode data from the WAL we only
 * need to access catalog tables and (sys|rel|cat)cache, not the actual user
 * tables, since the data we decode is wholly contained in the WAL
 * records. Also, our snapshots need to be different from normal MVCC ones
 * because, unlike those, we cannot fully rely on the clog and pg_subtrans for
 * information about committed transactions: they might commit in the future
 * from the POV of the WAL entry we're currently decoding. This definition has
 * the advantage that we only need to prevent removal of catalog rows, while
 * normal tables' rows can still be removed. This is achieved by using the
 * replication slot mechanism.
 *
 * As the percentage of transactions modifying the catalog normally is fairly
 * small in comparison to ones only manipulating user data, we keep track of
 * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
 * track of all running transactions like it's done in a normal snapshot. Note
 * that we're generally only looking at transactions that have acquired an
 * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
 * that we consider committed; everything else is considered aborted/in
 * progress.
That also allows us not to care about subtransactions before they
 * have committed, which means this module, in contrast to HS, doesn't have to
 * care about suboverflowed subtransactions and similar.
 *
 * One complexity of doing this is that to e.g. handle mixed DDL/DML
 * transactions we need Snapshots that see intermediate versions of the
 * catalog in a transaction. During normal operation this is achieved by using
 * CommandIds/cmin/cmax. The problem with that however is that for space
 * efficiency reasons only one value of that is stored
 * (cf. combocid.c). Since combo CIDs are only available in memory we log
 * additional information which allows us to get the original (cmin, cmax)
 * pair during visibility checks. See the comment above
 * ResolveCminCmaxDuringDecoding() in reorderbuffer.c for details.
 *
 * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
 *
 * To replace the normal catalog snapshots with decoding ones use the
 * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
 *
 *
 *
 * The snapbuild machinery starts up in several stages, as illustrated
 * by the following graph describing the SnapBuild->state transitions:
 *
 *                +-------------------------+
 *           +----|         START           |-------------+
 *           |    +-------------------------+             |
 *           |                 |                          |
 *           |                 |                          |
 *           |         running_xacts #1                   |
 *           |                 |                          |
 *           |                 |                          |
 *           |                 v                          |
 *           |    +-------------------------+             v
 *           |    |   BUILDING_SNAPSHOT     |------------>|
 *           |    +-------------------------+             |
 *           |                 |                          |
 *           |                 |                          |
 *           | running_xacts #2, xacts from #1 finished   |
 *           |                 |                          |
 *           |                 |                          |
 *           |                 v                          |
 *           |    +-------------------------+             v
 *           |    |      FULL_SNAPSHOT      |------------>|
 *           |    +-------------------------+             |
 *           |                 |                          |
 * running_xacts               |                    saved snapshot
 * with zero xacts             |                at running_xacts's lsn
 *           |                 |                          |
 *           | running_xacts with xacts from #2 finished  |
 *           |                 |                          |
 *           |                 v                          |
 *           |    +-------------------------+             |
 *           +--->|SNAPBUILD_CONSISTENT     |<------------+
 *                +-------------------------+
 *
 * Initially the machinery is in the START stage.
When an xl_running_xacts record is read that is sufficiently new (above
 * the safe xmin horizon), there's a state transition. If there were no
 * running xacts when the xl_running_xacts record was generated, we'll
 * directly go into CONSISTENT state; otherwise we'll switch to the
 * BUILDING_SNAPSHOT state. Once the transactions that were running at that
 * point have finished, a later xl_running_xacts record moves us into
 * FULL_SNAPSHOT. Having a full snapshot means that all transactions that
 * start henceforth can be decoded in their entirety, but transactions that
 * started previously can't. In FULL_SNAPSHOT we'll switch into CONSISTENT
 * once all those previously running transactions have committed or aborted.
 * From START, BUILDING_SNAPSHOT or FULL_SNAPSHOT we can also go directly to
 * CONSISTENT if a snapshot previously saved at the xl_running_xacts record's
 * LSN is available.
 *
 * Only transactions that commit after CONSISTENT state has been reached will
 * be replayed, even though they might have started while still in
 * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
 * is a convenient point to initialize replication from, which is why we
 * export a snapshot at that point, which *can* be used to read normal data.
 *
 * Copyright (c) 2012-2023, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/snapbuild.c
 *
 *-------------------------------------------------------------------------
 */ #include "postgres.h" #include #include #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/block.h" /* debugging output */ #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/standby.h" #include "utils/builtins.h" #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/snapshot.h" /* * This struct contains the current state of the snapshot building * machinery.
Besides a forward declaration in the header, it is not exposed
 * to the public, so we can easily change its contents.
 *
 * NOTE(review): SnapBuildSerialize()/SnapBuildRestore() (declared below)
 * persist builder state; if they write this struct out verbatim, any layout
 * change here also needs an on-disk format version bump -- confirm before
 * reordering or resizing fields.
 */
struct SnapBuild
{
	/* how far are we along building our first full snapshot */
	SnapBuildState state;

	/* private memory context used to allocate memory for this module. */
	MemoryContext context;

	/* all transactions < this have committed/aborted */
	TransactionId xmin;

	/* all transactions >= this are uncommitted */
	TransactionId xmax;

	/*
	 * Don't replay commits from an LSN < this LSN. This can be set externally
	 * but it will also be advanced (never retreat) from within snapbuild.c.
	 */
	XLogRecPtr	start_decoding_at;

	/*
	 * LSN at which two-phase decoding was enabled or LSN at which we found a
	 * consistent point at the time of slot creation.
	 *
	 * The prepared transactions, that were skipped because previously
	 * two-phase was not enabled or are not covered by initial snapshot, need
	 * to be sent later along with commit prepared and they must be before
	 * this point.
	 */
	XLogRecPtr	two_phase_at;

	/*
	 * Don't start decoding WAL until the "xl_running_xacts" information
	 * indicates there are no running xids with an xid smaller than this.
	 */
	TransactionId initial_xmin_horizon;

	/* Indicates if we are building full snapshot or just catalog one. */
	bool		building_full_snapshot;

	/*
	 * Snapshot that's valid to see the catalog state seen at this moment.
	 * Refcounted: while set, the builder itself holds one reference.
	 */
	Snapshot	snapshot;

	/*
	 * LSN of the last location we are sure a snapshot has been serialized to.
	 */
	XLogRecPtr	last_serialized_snapshot;

	/*
	 * The reorderbuffer we need to update with usable snapshots et al.
	 */
	ReorderBuffer *reorder;

	/*
	 * TransactionId at which the next phase of initial snapshot building will
	 * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
	 * when no next phase necessary (SNAPBUILD_CONSISTENT).
	 */
	TransactionId next_phase_at;

	/*
	 * Array of transactions which could have catalog changes that committed
	 * between xmin and xmax.
	 */
	struct
	{
		/* number of committed transactions */
		size_t		xcnt;

		/* available space for committed transactions */
		size_t		xcnt_space;

		/*
		 * Until we reach a CONSISTENT state, we record commits of all
		 * transactions, not just the catalog changing ones. Record when that
		 * changes so we know we cannot export a snapshot safely anymore.
		 */
		bool		includes_all_transactions;

		/*
		 * Array of committed transactions that have modified the catalog.
		 *
		 * As this array is frequently modified we do *not* keep it in
		 * xidComparator order. Instead we sort the array when building &
		 * distributing a snapshot.
		 *
		 * TODO: It's unclear whether that reasoning has much merit. Every
		 * time we add something here after becoming consistent will also
		 * require distributing a snapshot. Storing them sorted would
		 * potentially also make it easier to purge (but more complicated wrt
		 * wraparound?). Should be improved if sorting while building the
		 * snapshot shows up in profiles.
		 */
		TransactionId *xip;
	}			committed;

	/*
	 * Array of transactions and subtransactions that had modified catalogs
	 * and were running when the snapshot was serialized.
	 *
	 * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
	 * if the transaction has changed the catalog. But it could happen that
	 * the logical decoding decodes only the commit record of the transaction
	 * after restoring the previously serialized snapshot in which case we
	 * will miss adding the xid to the snapshot and end up looking at the
	 * catalogs with the wrong snapshot.
	 *
	 * Now to avoid the above problem, we serialize the transactions that had
	 * modified the catalogs and are still running at the time of snapshot
	 * serialization. We fill this array while restoring the snapshot and then
	 * refer to it while decoding commit to ensure if the xact has modified
	 * the catalog. Entries older than ->xmin are too old to matter and are
	 * discarded, and the array is freed once none remain; see
	 * SnapBuildPurgeOlderTxn for details.
	 */
	struct
	{
		/* number of transactions */
		size_t		xcnt;

		/* This array must be sorted in xidComparator order */
		TransactionId *xip;
	}			catchange;
};

/*
 * Starting a transaction -- which we need to do while exporting a snapshot --
 * removes knowledge about the previously used resowner, so we save it here.
 * ExportInProgress is true while an export's transaction is open; both are
 * reset by SnapBuildResetExportedSnapshotState().
 */
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;

/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);

/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);

static void SnapBuildFreeSnapshot(Snapshot snap);

static void SnapBuildSnapIncRefcount(Snapshot snap);

static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);

static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
												 uint32 xinfo);

/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);

/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);

/*
 * Allocate a new snapshot builder.
 *
 * xmin_horizon is the xid >= which we can be sure no catalog rows have been
 * removed, start_lsn is the LSN >= we want to replay commits.
*/
SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder,
						TransactionId xmin_horizon,
						XLogRecPtr start_lsn,
						bool need_full_snapshot,	/* track every committed xact */
						XLogRecPtr two_phase_at)
{
	MemoryContext builder_cxt;
	MemoryContext caller_cxt;
	SnapBuild  *sb;

	/* a private context keeps the builder's memory use easy to account for */
	builder_cxt = AllocSetContextCreate(CurrentMemoryContext,
										"snapshot builder context",
										ALLOCSET_DEFAULT_SIZES);
	caller_cxt = MemoryContextSwitchTo(builder_cxt);

	/* zeroed: every member not assigned below starts out as 0/NULL/false */
	sb = palloc0(sizeof(SnapBuild));

	sb->state = SNAPBUILD_START;
	sb->context = builder_cxt;
	sb->reorder = reorder;

	/* initial room for 128 committed xids; SnapBuildAddCommittedTxn grows it */
	sb->committed.xcnt = 0;
	sb->committed.xcnt_space = 128;
	sb->committed.xip =
		palloc0(sb->committed.xcnt_space * sizeof(TransactionId));
	sb->committed.includes_all_transactions = true;

	/* filled only when a serialized snapshot is restored */
	sb->catchange.xcnt = 0;
	sb->catchange.xip = NULL;

	sb->initial_xmin_horizon = xmin_horizon;
	sb->start_decoding_at = start_lsn;
	sb->building_full_snapshot = need_full_snapshot;
	sb->two_phase_at = two_phase_at;

	MemoryContextSwitchTo(caller_cxt);

	return sb;
}

/*
 * Release a snapshot builder together with all memory it owns.
 *
 * The builder's own snapshot reference is dropped explicitly so that the
 * sanity checks in SnapBuildSnapDecRefcount() get a chance to run; all other
 * allocations disappear with the builder's memory context.
 */
void
FreeSnapshotBuilder(SnapBuild *builder)
{
	MemoryContext builder_cxt = builder->context;

	if (builder->snapshot)
	{
		SnapBuildSnapDecRefcount(builder->snapshot);
		builder->snapshot = NULL;
	}

	MemoryContextDelete(builder_cxt);
}

/*
 * Free an unreferenced snapshot that has previously been built by us.
*/ static void SnapBuildFreeSnapshot(Snapshot snap) { /* make sure we don't get passed an external snapshot */ Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC); /* make sure nobody modified our snapshot */ Assert(snap->curcid == FirstCommandId); Assert(!snap->suboverflowed); Assert(!snap->takenDuringRecovery); Assert(snap->regd_count == 0); /* slightly more likely, so it's checked even without c-asserts */ if (snap->copied) elog(ERROR, "cannot free a copied snapshot"); if (snap->active_count) elog(ERROR, "cannot free an active snapshot"); pfree(snap); } /* * In which state of snapshot building are we? */ SnapBuildState SnapBuildCurrentState(SnapBuild *builder) { return builder->state; } /* * Return the LSN at which the two-phase decoding was first enabled. */ XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder) { return builder->two_phase_at; } /* * Set the LSN at which two-phase decoding is enabled. */ void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) { builder->two_phase_at = ptr; } /* * Should the contents of transaction ending at 'ptr' be decoded? */ bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr) { return ptr < builder->start_decoding_at; } /* * Increase refcount of a snapshot. * * This is used when handing out a snapshot to some external resource or when * adding a Snapshot as builder->snapshot. */ static void SnapBuildSnapIncRefcount(Snapshot snap) { snap->active_count++; } /* * Decrease refcount of a snapshot and free if the refcount reaches zero. * * Externally visible, so that external resources that have been handed an * IncRef'ed Snapshot can adjust its refcount easily. 
*/
void
SnapBuildSnapDecRefcount(Snapshot snap)
{
	/* the snapshot must be one of ours ... */
	Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);

	/* ... and must not have been modified by whoever held it */
	Assert(snap->curcid == FirstCommandId);
	Assert(!snap->suboverflowed);
	Assert(!snap->takenDuringRecovery);
	Assert(snap->regd_count == 0);

	/* releasing a reference that was never taken is a caller bug */
	Assert(snap->active_count > 0);

	/* plausible enough to be worth checking without cassert, too */
	if (snap->copied)
		elog(ERROR, "cannot free a copied snapshot");

	/* dropping the last reference frees the snapshot */
	if (--snap->active_count == 0)
		SnapBuildFreeSnapshot(snap);
}

/*
 * Build a new historic snapshot from the catalog-modifying transactions that
 * are currently recorded as committed in builder->committed.
 *
 * The snapshot is allocated in builder->context and starts with a refcount
 * of zero.  Transactions that modify the catalog must not change it in place;
 * they copy it and fill in ->curcid and ->subxip/->subxcnt on the copy.
 */
static Snapshot
SnapBuildBuildSnapshot(SnapBuild *builder)
{
	Snapshot	snap;
	Size		xip_bytes;
	Size		alloc_size;

	Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);

	/*
	 * A single allocation holds the SnapshotData header, the committed xids
	 * and room for one more (toplevel) xid.
	 */
	xip_bytes = sizeof(TransactionId) * builder->committed.xcnt;
	alloc_size = sizeof(SnapshotData) + xip_bytes + sizeof(TransactionId);

	snap = MemoryContextAllocZero(builder->context, alloc_size);
	snap->snapshot_type = SNAPSHOT_HISTORIC_MVCC;

	/*
	 * Historic snapshots repurpose the xid arrays.  'xip' lists transactions
	 * in [xmin, xmax) that must be treated as *committed*: since only rows of
	 * catalog-modifying transactions are ever examined, storing those (usually
	 * few) is cheaper than storing everything still running.
	 *
	 * 'subxip' is only used by copies handed to a catalog-modifying
	 * transaction, where it holds that transaction's own toplevel xid and
	 * subxids so its not-yet-committed changes are visible to it.  A shared
	 * snapshot never needs that, because a transaction that did not touch the
	 * catalog has no uncommitted catalog rows of its own to see.
	 *
	 * Both arrays are kept sorted so they can be searched with bsearch().
	 */
	Assert(TransactionIdIsNormal(builder->xmin));
	Assert(TransactionIdIsNormal(builder->xmax));

	snap->xmin = builder->xmin;
	snap->xmax = builder->xmax;

	/* the committed xids live directly behind the SnapshotData header */
	snap->xip = (TransactionId *) ((char *) snap + sizeof(SnapshotData));
	snap->xcnt = builder->committed.xcnt;
	memcpy(snap->xip, builder->committed.xip, xip_bytes);

	/* builder->committed is unsorted; sort the copy for bsearch() */
	qsort(snap->xip, snap->xcnt, sizeof(TransactionId), xidComparator);

	/*
	 * No subxip yet: this snapshot serves transactions that did not modify
	 * the catalog.  ReorderBufferCopySnap() fills it in when needed.
	 */
	snap->subxcnt = 0;
	snap->subxip = NULL;

	snap->suboverflowed = false;
	snap->takenDuringRecovery = false;
	snap->copied = false;
	snap->curcid = FirstCommandId;
	snap->active_count = 0;
	snap->regd_count = 0;
	snap->snapXactCompletionCount = 0;

	return snap;
}

/*
 * Build the initial slot snapshot and convert it to a normal snapshot that
 * is understood by HeapTupleSatisfiesMVCC.
 *
 * The snapshot will be usable directly in current transaction or exported
 * for loading in different transaction.
*/
Snapshot
SnapBuildInitialSnapshot(SnapBuild *builder)
{
	/*
	 * Caller contract, enforced by the checks below (ERROR or Assert): we are
	 * in a REPEATABLE READ transaction without registered/active snapshots
	 * and without MyProc->xmin, and the builder was created for a full
	 * snapshot, is CONSISTENT, and still tracks all transactions.
	 *
	 * On success MyProc->xmin is set to the snapshot's xmin, and the returned
	 * snapshot has been converted to SNAPSHOT_MVCC; since it is no longer
	 * SNAPSHOT_HISTORIC_MVCC it must not be passed to
	 * SnapBuildSnapDecRefcount().
	 */
	Snapshot	snap;
	TransactionId xid;
	TransactionId safeXid;
	TransactionId *newxip;
	int			newxcnt = 0;

	Assert(XactIsoLevel == XACT_REPEATABLE_READ);
	Assert(builder->building_full_snapshot);

	/* don't allow older snapshots */
	InvalidateCatalogSnapshot();	/* about to overwrite MyProc->xmin */
	if (HaveRegisteredOrActiveSnapshot())
		elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
	Assert(!HistoricSnapshotActive());

	if (builder->state != SNAPBUILD_CONSISTENT)
		elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");

	/* a non-catalog commit was not recorded, so ->committed is incomplete */
	if (!builder->committed.includes_all_transactions)
		elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");

	/* so we don't overwrite the existing value */
	if (TransactionIdIsValid(MyProc->xmin))
		elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");

	snap = SnapBuildBuildSnapshot(builder);

	/*
	 * We know that snap->xmin is alive, enforced by the logical xmin
	 * mechanism. Due to that we can do this without locks, we're only
	 * changing our own value.
	 *
	 * Building an initial snapshot is expensive and an unenforced xmin
	 * horizon would have bad consequences, therefore always double-check that
	 * the horizon is enforced.
	 */
	LWLockAcquire(ProcArrayLock, LW_SHARED);
	safeXid = GetOldestSafeDecodingTransactionId(false);
	LWLockRelease(ProcArrayLock);

	if (TransactionIdFollows(safeXid, snap->xmin))
		elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
			 safeXid, snap->xmin);

	/* publish our xmin only after the horizon check above succeeded */
	MyProc->xmin = snap->xmin;

	/* allocate in transaction context */
	newxip = (TransactionId *)
		palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());

	/*
	 * snapbuild.c builds transactions in an "inverted" manner, which means it
	 * stores committed transactions in ->xip, not ones in progress. Build a
	 * classical snapshot by marking all non-committed transactions as
	 * in-progress. This can be expensive.
	 */
	for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
	{
		void	   *test;

		/*
		 * Check whether transaction committed using the decoding snapshot
		 * meaning of ->xip.
		 */
		test = bsearch(&xid, snap->xip, snap->xcnt,
					   sizeof(TransactionId), xidComparator);

		if (test == NULL)
		{
			/* newxip has room for GetMaxSnapshotXidCount() entries only */
			if (newxcnt >= GetMaxSnapshotXidCount())
				ereport(ERROR,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("initial slot snapshot too large")));

			newxip[newxcnt++] = xid;
		}

		TransactionIdAdvance(xid);
	}

	/*
	 * adjust remaining snapshot fields as needed; ->xip now points at the
	 * separately palloc'd in-progress array instead of the inline one
	 */
	snap->snapshot_type = SNAPSHOT_MVCC;
	snap->xcnt = newxcnt;
	snap->xip = newxip;

	return snap;
}

/*
 * Export a snapshot so it can be set in another session with SET TRANSACTION
 * SNAPSHOT.
 *
 * For that we need to start a transaction in the current backend as the
 * importing side checks whether the source transaction is still open to make
 * sure the xmin horizon hasn't advanced since then.
 *
 * The transaction started here is left open; SnapBuildClearExportedSnapshot()
 * later aborts it and restores the saved resource owner.  Returns the name
 * under which the snapshot was exported.
 */
const char *
SnapBuildExportSnapshot(SnapBuild *builder)
{
	Snapshot	snap;
	char	   *snapname;

	if (IsTransactionOrTransactionBlock())
		elog(ERROR, "cannot export a snapshot from within a transaction");

	if (SavedResourceOwnerDuringExport)
		elog(ERROR, "can only export one snapshot at a time");

	SavedResourceOwnerDuringExport = CurrentResourceOwner;
	ExportInProgress = true;

	StartTransactionCommand();

	/* There doesn't seem to be a nice API to set these */
	XactIsoLevel = XACT_REPEATABLE_READ;
	XactReadOnly = true;

	snap = SnapBuildInitialSnapshot(builder);

	/*
	 * now that we've built a plain snapshot, make it active and use the
	 * normal mechanisms for exporting it
	 */
	snapname = ExportSnapshot(snap);

	ereport(LOG,
			(errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
						   "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
						   snap->xcnt,
						   snapname, snap->xcnt)));
	return snapname;
}

/*
 * Ensure there is a snapshot and if not build one for current transaction.
*/ Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder) { Assert(builder->state == SNAPBUILD_CONSISTENT); /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } return builder->snapshot; } /* * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is * any. Aborts the previously started transaction and resets the resource * owner back to its original value. */ void SnapBuildClearExportedSnapshot(void) { ResourceOwner tmpResOwner; /* nothing exported, that is the usual case */ if (!ExportInProgress) return; if (!IsTransactionState()) elog(ERROR, "clearing exported snapshot in wrong transaction state"); /* * AbortCurrentTransaction() takes care of resetting the snapshot state, * so remember SavedResourceOwnerDuringExport. */ tmpResOwner = SavedResourceOwnerDuringExport; /* make sure nothing could have ever happened */ AbortCurrentTransaction(); CurrentResourceOwner = tmpResOwner; } /* * Clear snapshot export state during transaction abort. */ void SnapBuildResetExportedSnapshotState(void) { SavedResourceOwnerDuringExport = NULL; ExportInProgress = false; } /* * Handle the effects of a single heap change, appropriate to the current state * of the snapshot builder and returns whether changes made at (xid, lsn) can * be decoded. */ bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { /* * We can't handle data in transactions if we haven't built a snapshot * yet, so don't store them. */ if (builder->state < SNAPBUILD_FULL_SNAPSHOT) return false; /* * No point in keeping track of changes in transactions that we don't have * enough information about to decode. This means that they started before * we got into the SNAPBUILD_FULL_SNAPSHOT state. 
*/ if (builder->state < SNAPBUILD_CONSISTENT && TransactionIdPrecedes(xid, builder->next_phase_at)) return false; /* * If the reorderbuffer doesn't yet have a snapshot, add one now, it will * be needed to decode the change we're currently processing. */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) { /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } /* * Increase refcount for the transaction we're handing the snapshot * out to. */ SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn, builder->snapshot); } return true; } /* * Do CommandId/combo CID handling after reading an xl_heap_new_cid record. * This implies that a transaction has done some form of write to system * catalogs. */ void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn, xl_heap_new_cid *xlrec) { CommandId cid; /* * we only log new_cid's if a catalog tuple was modified, so mark the * transaction as containing catalog modifications */ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, xlrec->target_locator, xlrec->target_tid, xlrec->cmin, xlrec->cmax, xlrec->combocid); /* figure out new command id */ if (xlrec->cmin != InvalidCommandId && xlrec->cmax != InvalidCommandId) cid = Max(xlrec->cmin, xlrec->cmax); else if (xlrec->cmax != InvalidCommandId) cid = xlrec->cmax; else if (xlrec->cmin != InvalidCommandId) cid = xlrec->cmin; else { cid = InvalidCommandId; /* silence compiler */ elog(ERROR, "xl_heap_new_cid record without a valid CommandId"); } ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1); } /* * Add a new Snapshot to all transactions we're decoding that currently are * in-progress so they can see new catalog contents 
made by the transaction * that just committed. This is necessary because those in-progress * transactions will use the new catalog's contents from here on (at the very * least everything they do needs to be compatible with newer catalog * contents). */ static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) { dlist_iter txn_i; ReorderBufferTXN *txn; /* * Iterate through all toplevel transactions. This can include * subtransactions which we just don't yet know to be that, but that's * fine, they will just get an unnecessary snapshot queued. */ dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) { txn = dlist_container(ReorderBufferTXN, node, txn_i.cur); Assert(TransactionIdIsValid(txn->xid)); /* * If we don't have a base snapshot yet, there are no changes in this * transaction which in turn implies we don't yet need a snapshot at * all. We'll add a snapshot when the first change gets queued. * * NB: This works correctly even for subtransactions because * ReorderBufferAssignChild() takes care to transfer the base snapshot * to the top-level transaction, and while iterating the changequeue * we'll get the change from the subtxn. */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; /* * We don't need to add snapshot to prepared transactions as they * should not see the new catalog contents. */ if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) continue; elog(DEBUG2, "adding a new snapshot to %u at %X/%X", txn->xid, LSN_FORMAT_ARGS(lsn)); /* * increase the snapshot's refcount for the transaction we are handing * it out to */ SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, builder->snapshot); } } /* * Keep track of a new catalog changing transaction that has committed. 
*/ static void SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) { Assert(TransactionIdIsValid(xid)); if (builder->committed.xcnt == builder->committed.xcnt_space) { builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1; elog(DEBUG1, "increasing space for committed transactions to %u", (uint32) builder->committed.xcnt_space); builder->committed.xip = repalloc(builder->committed.xip, builder->committed.xcnt_space * sizeof(TransactionId)); } /* * TODO: It might make sense to keep the array sorted here instead of * doing it every time we build a new snapshot. On the other hand this * gets called repeatedly when a transaction with subtransactions commits. */ builder->committed.xip[builder->committed.xcnt++] = xid; } /* * Remove knowledge about transactions we treat as committed or containing catalog * changes that are smaller than ->xmin. Those won't ever get checked via * the ->committed or ->catchange array, respectively. The committed xids will * get checked via the clog machinery. * * We can ideally remove the transaction from catchange array once it is * finished (committed/aborted) but that could be costly as we need to maintain * the xids order in the array. */ static void SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; int surviving_xids = 0; /* not ready yet */ if (!TransactionIdIsNormal(builder->xmin)) return; /* TODO: Neater algorithm than just copying and iterating? 
*/
	workspace = MemoryContextAlloc(builder->context, builder->committed.xcnt * sizeof(TransactionId));

	/* copy xids that still are interesting to workspace */
	for (off = 0; off < builder->committed.xcnt; off++)
	{
		/* xids preceding builder->xmin are dropped, the rest are kept */
		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
										builder->xmin))
			;					/* remove */
		else
			workspace[surviving_xids++] = builder->committed.xip[off];
	}

	/* copy workspace back to persistent state */
	memcpy(builder->committed.xip, workspace,
		   surviving_xids * sizeof(TransactionId));

	elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
		 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
		 builder->xmin, builder->xmax);
	builder->committed.xcnt = surviving_xids;

	pfree(workspace);

	/*
	 * Purge xids in ->catchange as well. The purged array must also be sorted
	 * in xidComparator order.
	 */
	if (builder->catchange.xcnt > 0)
	{
		/*
		 * Since catchange.xip is sorted, we find the lower bound of xids that
		 * are still interesting.
		 */
		for (off = 0; off < builder->catchange.xcnt; off++)
		{
			if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
											 builder->xmin))
				break;
		}

		surviving_xids = builder->catchange.xcnt - off;

		if (surviving_xids > 0)
		{
			/* source and destination may overlap, hence memmove() */
			memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
					surviving_xids * sizeof(TransactionId));
		}
		else
		{
			pfree(builder->catchange.xip);
			builder->catchange.xip = NULL;
		}

		elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
			 (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
			 builder->xmin, builder->xmax);
		builder->catchange.xcnt = surviving_xids;
	}
}

/*
 * Handle everything that needs to be done when a transaction commits.
 *
 * Commits that precede BUILDING_SNAPSHOT (or, while in BUILDING_SNAPSHOT,
 * whose xid precedes next_phase_at) only push start_decoding_at past 'lsn'.
 *
 * Otherwise the top-level xid and its subxids are recorded as committed
 * (SnapBuildAddCommittedTxn) when they need timetravel.  That is the case
 * when they modified the catalog, or when a full (exportable) snapshot is
 * being built before CONSISTENT is reached.  builder->xmax is advanced
 * past the largest such xid.  If the commit changed the catalog and the
 * builder is at least in FULL_SNAPSHOT, a new historic snapshot is built
 * and distributed to the currently running transactions.
 *
 * 'xinfo' is only consulted through SnapBuildXidHasCatalogChanges().
 */
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
				   int nsubxacts, TransactionId *subxacts, uint32 xinfo)
{
	int			nxact;

	bool		needs_snapshot = false;
	bool		needs_timetravel = false;
	bool		sub_needs_timetravel = false;

	/* largest tracked xid of this commit; used below to advance builder->xmax */
	TransactionId xmax = xid;

	/*
	 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
	 * will they be part of a snapshot.  So we don't need to record anything.
	 */
	if (builder->state == SNAPBUILD_START ||
		(builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
		 TransactionIdPrecedes(xid, builder->next_phase_at)))
	{
		/* ensure that only commits after this are getting replayed */
		if (builder->start_decoding_at <= lsn)
			builder->start_decoding_at = lsn + 1;
		return;
	}

	if (builder->state < SNAPBUILD_CONSISTENT)
	{
		/* ensure that only commits after this are getting replayed */
		if (builder->start_decoding_at <= lsn)
			builder->start_decoding_at = lsn + 1;

		/*
		 * If building an exportable snapshot, force xid to be tracked, even
		 * if the transaction didn't modify the catalog.
		 */
		if (builder->building_full_snapshot)
		{
			needs_timetravel = true;
		}
	}

	for (nxact = 0; nxact < nsubxacts; nxact++)
	{
		TransactionId subxid = subxacts[nxact];

		/*
		 * Add subtransaction to base snapshot if catalog modifying, we don't
		 * distinguish between subtransactions and toplevel transactions
		 * there.
		 */
		if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
		{
			sub_needs_timetravel = true;
			needs_snapshot = true;

			elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
				 xid, subxid);

			SnapBuildAddCommittedTxn(builder, subxid);

			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}

		/*
		 * If we're forcing timetravel we also need visibility information
		 * about subtransaction, so keep track of subtransaction's state, even
		 * if not catalog modifying.  Don't need to distribute a snapshot in
		 * that case.
		 */
		else if (needs_timetravel)
		{
			SnapBuildAddCommittedTxn(builder, subxid);
			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}
	}

	/* if top-level modified catalog, it'll need a snapshot */
	if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
	{
		elog(DEBUG2, "found top level transaction %u, with catalog changes",
			 xid);
		needs_snapshot = true;
		needs_timetravel = true;
		SnapBuildAddCommittedTxn(builder, xid);
	}
	else if (sub_needs_timetravel)
	{
		/* track toplevel txn as well, subxact alone isn't meaningful */
		elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
			 xid);
		needs_timetravel = true;
		SnapBuildAddCommittedTxn(builder, xid);
	}
	else if (needs_timetravel)
	{
		elog(DEBUG2, "forced transaction %u to do timetravel", xid);

		SnapBuildAddCommittedTxn(builder, xid);
	}

	if (!needs_timetravel)
	{
		/* record that we cannot export a general snapshot anymore */
		builder->committed.includes_all_transactions = false;
	}

	Assert(!needs_snapshot || needs_timetravel);

	/*
	 * Adjust xmax of the snapshot builder, we only do that for committed,
	 * catalog modifying, transactions, everything else isn't interesting for
	 * us since we'll never look at the respective rows.
	 */
	if (needs_timetravel &&
		(!TransactionIdIsValid(builder->xmax) ||
		 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
	{
		/* builder->xmax is exclusive (">= are running"), so go one past */
		builder->xmax = xmax;
		TransactionIdAdvance(builder->xmax);
	}

	/* if there's any reason to build a historic snapshot, do so now */
	if (needs_snapshot)
	{
		/*
		 * If we haven't built a complete snapshot yet there's no need to hand
		 * it out, it wouldn't (and couldn't) be used anyway.
		 */
		if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
			return;

		/*
		 * Decrease the snapshot builder's refcount of the old snapshot, note
		 * that it still will be used if it has been handed out to the
		 * reorderbuffer earlier.
		 */
		if (builder->snapshot)
			SnapBuildSnapDecRefcount(builder->snapshot);

		builder->snapshot = SnapBuildBuildSnapshot(builder);

		/* we might need to execute invalidations, add snapshot */
		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
		{
			/* reference for the base snapshot handed to the reorder buffer */
			SnapBuildSnapIncRefcount(builder->snapshot);
			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
										 builder->snapshot);
		}

		/* refcount of the snapshot builder for the new snapshot */
		SnapBuildSnapIncRefcount(builder->snapshot);

		/* add a new catalog snapshot to all currently running transactions */
		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
	}
}

/*
 * Check the reorder buffer and the snapshot to see if the given transaction has
 * modified catalogs.
 *
 * The reorder buffer is asked first.  Failing that, 'xid' is looked up in
 * the sorted builder->catchange array (which SnapBuildRestore() fills from a
 * serialized snapshot).  That lookup only happens when 'xinfo' has
 * XACT_XINFO_HAS_INVALS set.
 */
static inline bool
SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
							  uint32 xinfo)
{
	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
		return true;

	/*
	 * The transactions that have changed catalogs must have invalidation
	 * info.
	 */
	if (!(xinfo & XACT_XINFO_HAS_INVALS))
		return false;

	/* Check the catchange XID array (bsearch relies on xidComparator order) */
	return ((builder->catchange.xcnt > 0) &&
			(bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
					 sizeof(TransactionId), xidComparator) != NULL));
}

/* -----------------------------------
 * Snapshot building functions dealing with xlog records
 * -----------------------------------
 */

/*
 * Process a running xacts record, and use its information to first build a
 * historic snapshot and later to release resources that aren't needed
 * anymore.
 *
 * Besides driving the snapshot state machine, this raises builder->xmin to
 * the record's oldestRunningXid and purges older committed/catchange xids.
 * It also advances the slot's xmin and, once consistent, its restart
 * decoding LSN.
 */
void
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	ReorderBufferTXN *txn;
	TransactionId xmin;

	/*
	 * If we're not consistent yet, inspect the record to see whether it
	 * allows to get closer to being consistent. If we are consistent, dump
	 * our snapshot so others or we, after a restart, can use it.
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
	{
		/* returns false if there's no point in performing cleanup just yet */
		if (!SnapBuildFindSnapshot(builder, lsn, running))
			return;
	}
	else
		SnapBuildSerialize(builder, lsn);

	/*
	 * Update range of interesting xids based on the running xacts
	 * information. We don't increase ->xmax using it, because once we are in
	 * a consistent state we can do that ourselves and much more efficiently
	 * so, because we only need to do it for catalog transactions since we
	 * only ever look at those.
	 *
	 * NB: We only increase xmax when a catalog modifying transaction commits
	 * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
	 * xmin, which looks odd but is correct and actually more efficient, since
	 * we hit fast paths in heapam_visibility.c.
	 */
	builder->xmin = running->oldestRunningXid;

	/* Remove transactions we don't need to keep track of anymore */
	SnapBuildPurgeOlderTxn(builder);

	/*
	 * Advance the xmin limit for the current replication slot, to allow
	 * vacuum to clean up the tuples this slot has been protecting.
	 *
	 * The reorderbuffer might have an xmin among the currently running
	 * snapshots; use it if so.  If not, we need only consider the snapshots
	 * we'll produce later, which can't be less than the oldest running xid in
	 * the record we're reading now.
	 */
	xmin = ReorderBufferGetOldestXmin(builder->reorder);
	if (xmin == InvalidTransactionId)
		xmin = running->oldestRunningXid;
	elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
		 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
	LogicalIncreaseXminForSlot(lsn, xmin);

	/*
	 * Also tell the slot where we can restart decoding from. We don't want to
	 * do that after every commit because changing that implies an fsync of
	 * the logical slot's state file, so we only do it every time we see a
	 * running xacts record.
	 *
	 * Do so by looking for the oldest in progress transaction (determined by
	 * the first LSN of any of its relevant records). Every transaction
	 * remembers the last location we stored the snapshot to disk before its
	 * beginning. That point is where we can restart from.
	 */

	/*
	 * Can't know about a serialized snapshot's location if we're not
	 * consistent.
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	txn = ReorderBufferGetOldestTXN(builder->reorder);

	/*
	 * oldest ongoing txn might have started when we didn't yet serialize
	 * anything because we hadn't reached a consistent state yet.
	 */
	if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
		LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);

	/*
	 * No in-progress transaction, can reuse the last serialized snapshot if
	 * we have one.
	 */
	else if (txn == NULL &&
			 builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
			 builder->last_serialized_snapshot != InvalidXLogRecPtr)
		LogicalIncreaseRestartDecodingForSlot(lsn,
											  builder->last_serialized_snapshot);
}


/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent.
 *
 * Each call moves builder->state at most one step along
 * START -> BUILDING_SNAPSHOT -> FULL_SNAPSHOT -> CONSISTENT.  Cases a) and
 * b) below jump straight to CONSISTENT.
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record.
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
	 *	  state while waiting on c)'s sub-states.
	 *
	 * b) This (in a previous run) or another decoding slot serialized a
	 *	  snapshot to disk that we can use. Can't use this method for the
	 *	  initial snapshot when slot is being created and needs full snapshot
	 *	  for export or direct use, as that snapshot will only contain catalog
	 *	  modifying transactions.
	 *
	 * c) First incrementally build a snapshot for catalog tuples
	 *	  (BUILDING_SNAPSHOT), that requires all, already in-progress,
	 *	  transactions to finish.  Every transaction starting after that
	 *	  (FULL_SNAPSHOT state), has enough information to be decoded.  But
	 *	  for older running transactions no viable snapshot exists yet, so
	 *	  CONSISTENT will only be reached once all of those have finished.
	 * ---
	 */

	/*
	 * xl_running_xacts record is older than what we can use, we might not
	 * have all necessary catalog rows anymore.
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
								 LSN_FORMAT_ARGS(lsn)),
				 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
									builder->initial_xmin_horizon, running->oldestRunningXid)));


		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);

		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * This is not affected by races around xl_running_xacts, because we can
	 * miss transaction commits, but currently not transactions starting.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->oldestRunningXid == running->nextXid)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* As no transactions were running xmin/xmax can be trivially set. */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		builder->state = SNAPBUILD_CONSISTENT;
		builder->next_phase_at = InvalidTransactionId;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("There are no running transactions.")));

		return false;
	}
	/* b) valid on disk state and not building full snapshot */
	else if (!builder->building_full_snapshot &&
			 SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * c) transition from START to BUILDING_SNAPSHOT.
	 *
	 * In START state, and a xl_running_xacts record with running xacts is
	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
	 * might look like we could use xl_running_xacts's ->xids information to
	 * get there quicker, but that is problematic because transactions marked
	 * as running, might already have inserted their commit record - it's
	 * infeasible to change that with locking.
	 */
	else if (builder->state == SNAPBUILD_START)
	{
		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
		builder->next_phase_at = running->nextXid;

		/*
		 * Start with an xmin/xmax that's correct for future, when all the
		 * currently running transactions have finished. We'll update both
		 * while waiting for the pending transactions to finish.
		 */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		ereport(LOG,
				(errmsg("logical decoding found initial starting point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
	 *
	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
	 * is >= the nextXid recorded when we switched to BUILDING_SNAPSHOT.  This
	 * means all transactions starting afterwards have enough information to
	 * be decoded.  Switch to FULL_SNAPSHOT.
	 */
	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_FULL_SNAPSHOT;
		builder->next_phase_at = running->nextXid;

		ereport(LOG,
				(errmsg("logical decoding found initial consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
	 *
	 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
	 * >= the nextXid recorded when we switched to FULL_SNAPSHOT.  This means
	 * all transactions that are currently in progress have a catalog
	 * snapshot, and all their changes have been collected.  Switch to
	 * CONSISTENT.
	 */
	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_CONSISTENT;
		builder->next_phase_at = InvalidTransactionId;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("There are no old transactions anymore.")));
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish. We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;
}

/* ---
 * Iterate through xids in record, wait for all older than the cutoff to
 * finish.  Then, if possible, log a new xl_running_xacts record.
 *
 * Xids that follow 'cutoff' are not waited for.
 *
 * This isn't required for the correctness of decoding, but to:
 * a) allow isolationtester to notice that we're currently waiting for
 *	  something.
 * b) log a new xl_running_xacts record where it'd be helpful, without having
 *	  to wait for bgwriter or checkpointer.
 * ---
 */
static void
SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
{
	int			off;

	for (off = 0; off < running->xcnt; off++)
	{
		TransactionId xid = running->xids[off];

		/*
		 * Upper layers should prevent that we ever need to wait on ourselves.
		 * Check anyway, since failing to do so would either result in an
		 * endless wait or an Assert() failure.
		 */
		if (TransactionIdIsCurrentTransactionId(xid))
			elog(ERROR, "waiting for ourselves");

		if (TransactionIdFollows(xid, cutoff))
			continue;

		XactLockTableWait(xid, NULL, NULL, XLTW_None);
	}

	/*
	 * All transactions we needed to finish finished - try to ensure there is
	 * another xl_running_xacts record in a timely manner, without having to
	 * wait for bgwriter or checkpointer to log one.  During recovery we can't
	 * enforce that, so we'll have to wait.
	 */
	if (!RecoveryInProgress())
	{
		LogStandbySnapshot();
	}
}

/* -----------------------------------
 * Snapshot serialization support
 * -----------------------------------
 */

/*
 * We store current state of struct SnapBuild on disk in the following manner:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 * TransactionId * catchange.xcnt;
 *
 * The CRC covers everything from 'version' onwards, including the trailing
 * xid arrays; 'magic' and 'checksum' themselves are not checksummed.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;
	pg_crc32c	checksum;

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;

	/*
	 * How large is the on disk data.  NOTE(review): historically described
	 * as "excluding the constant sized part", but SnapBuildSerialize() stores
	 * the total size here (sizeof(SnapBuildOnDisk) plus the trailing xid
	 * arrays), and SnapBuildRestore() does not read this field.
	 */
	uint32		length;

	/* version dependent part */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* size of the version-independent header that precedes 'builder' */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* size of the leading fields excluded from the checksum (magic, checksum) */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 5

/*
 * Store/Load a snapshot from disk, depending on the snapshot builder's state.
 *
 * Supposed to be used by external (i.e. not snapbuild.c) code that just read
 * a record that's a potential location for a serialized snapshot.
 */
void
SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
{
	if (builder->state < SNAPBUILD_CONSISTENT)
		SnapBuildRestore(builder, lsn);
	else
		SnapBuildSerialize(builder, lsn);
}

/*
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 * been done by another decoding process.
 *
 * Does nothing unless the builder is CONSISTENT.  Otherwise the state is
 * written to a pid-suffixed temporary file, which is fsync'ed and renamed to
 * pg_logical/snapshots/<lsn>.snap.  Whether the file was written here or
 * already existed, builder->last_serialized_snapshot ends up as 'lsn' and is
 * passed to ReorderBufferSetRestartPoint().  Any file-system failure raises
 * ERROR.
 */
static void
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
	Size		needed_length;
	SnapBuildOnDisk *ondisk = NULL;
	TransactionId *catchange_xip = NULL;
	MemoryContext old_ctx;
	size_t		catchange_xcnt;
	char	   *ondisk_c;
	int			fd;
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			ret;
	struct stat stat_buf;
	Size		sz;

	Assert(lsn != InvalidXLogRecPtr);
	Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
		   builder->last_serialized_snapshot <= lsn);

	/*
	 * no point in serializing if we cannot continue to work immediately after
	 * restoring the snapshot
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	/* consistent snapshots have no next phase */
	Assert(builder->next_phase_at == InvalidTransactionId);

	/*
	 * We identify snapshots by the LSN they are valid for. We don't need to
	 * include timelines in the name as each LSN maps to exactly one timeline
	 * unless the user used pg_resetwal or similar. If a user did so, there's
	 * no hope continuing to decode anyway.
	 */
	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			LSN_FORMAT_ARGS(lsn));

	/*
	 * first check whether some other backend already has written the snapshot
	 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
	 * as a valid state. Everything else is an unexpected error.
	 */
	ret = stat(path, &stat_buf);

	if (ret != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	else if (ret == 0)
	{
		/*
		 * somebody else has already serialized to this point, don't overwrite
		 * but remember location, so we don't need to read old data again.
		 *
		 * To be sure it has been synced to disk after the rename() from the
		 * tempfile filename to the real filename, we just repeat the fsync.
		 * That ought to be cheap because in most scenarios it should already
		 * be safely on disk.
		 */
		fsync_fname(path, false);
		fsync_fname("pg_logical/snapshots", true);

		builder->last_serialized_snapshot = lsn;
		goto out;
	}

	/*
	 * there is an obvious race condition here between the time we stat(2) the
	 * file and us writing the file. But we rename the file into place
	 * atomically and all files created need to contain the same data anyway,
	 * so this is perfectly fine, although a bit of a resource waste. Locking
	 * seems like pointless complication.
	 */
	elog(DEBUG1, "serializing snapshot to %s", path);

	/* to make sure only we will write to this tempfile, include pid */
	sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
			LSN_FORMAT_ARGS(lsn), MyProcPid);

	/*
	 * Unlink temporary file if it already exists, needs to have been before a
	 * crash/error since we won't enter this function twice from within a
	 * single decoding slot/backend and the temporary file contains the pid of
	 * the current process.
	 */
	if (unlink(tmppath) != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", tmppath)));

	/* allocations below are made in builder->context */
	old_ctx = MemoryContextSwitchTo(builder->context);

	/* Get the catalog modifying transactions that have not committed yet */
	catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
	catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);

	/* fixed-size header plus both trailing xid arrays */
	needed_length = sizeof(SnapBuildOnDisk) +
		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);

	ondisk_c = palloc0(needed_length);
	ondisk = (SnapBuildOnDisk *) ondisk_c;
	ondisk->magic = SNAPBUILD_MAGIC;
	ondisk->version = SNAPBUILD_VERSION;
	ondisk->length = needed_length;
	INIT_CRC32C(ondisk->checksum);
	/* checksum 'version' and 'length', skipping 'magic' and 'checksum' */
	COMP_CRC32C(ondisk->checksum,
				((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
	/* ondisk_c now points at where the xid arrays go */
	ondisk_c += sizeof(SnapBuildOnDisk);

	memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
	/* NULL-ify memory-only data */
	ondisk->builder.context = NULL;
	ondisk->builder.snapshot = NULL;
	ondisk->builder.reorder = NULL;
	ondisk->builder.committed.xip = NULL;
	ondisk->builder.catchange.xip = NULL;
	/* update catchange only on disk data */
	ondisk->builder.catchange.xcnt = catchange_xcnt;

	COMP_CRC32C(ondisk->checksum,
				&ondisk->builder,
				sizeof(SnapBuild));

	/* copy committed xacts */
	if (builder->committed.xcnt > 0)
	{
		sz = sizeof(TransactionId) * builder->committed.xcnt;
		memcpy(ondisk_c, builder->committed.xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	/* copy catalog modifying xacts */
	if (catchange_xcnt > 0)
	{
		sz = sizeof(TransactionId) * catchange_xcnt;
		memcpy(ondisk_c, catchange_xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	FIN_CRC32C(ondisk->checksum);

	/* we have valid data now, open tempfile and write it there */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", tmppath)));

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
	if ((write(fd, ondisk, needed_length)) != needed_length)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	/*
	 * fsync the file before renaming so that even if we crash after this we
	 * have either a fully valid file or nothing.
	 *
	 * It's safe to just ERROR on fsync() here because we'll retry the whole
	 * operation including the writes.
	 *
	 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
	 * some noticeable overhead since it's performed synchronously during
	 * decoding?
	 */
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	fsync_fname("pg_logical/snapshots", true);

	/*
	 * We may overwrite the work from some other backend, but that's ok, our
	 * snapshot is valid as well, we'll just have done some superfluous work.
	 */
	if (rename(tmppath, path) != 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
	}

	/* make sure we persist */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * Now there's no way we can lose the dumped state anymore, remember this
	 * as a serialization point.
	 */
	builder->last_serialized_snapshot = lsn;

	MemoryContextSwitchTo(old_ctx);

out:
	ReorderBufferSetRestartPoint(builder->reorder,
								 builder->last_serialized_snapshot);
	/* be tidy */
	if (ondisk)
		pfree(ondisk);
	if (catchange_xip)
		pfree(catchange_xip);
}

/*
 * Restore a snapshot into 'builder' if previously one has been stored at the
 * location indicated by 'lsn'. Returns true if successful, false otherwise.
 *
 * Returns false when:
 * - the builder is already CONSISTENT;
 * - no file exists for 'lsn';
 * - the stored snapshot is not CONSISTENT;
 * - the stored snapshot needs an xmin older than
 *   builder->initial_xmin_horizon.
 *
 * Open/read/close failures, a wrong magic number or version, and checksum
 * mismatches raise ERROR instead.
 */
static bool
SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
{
	SnapBuildOnDisk ondisk;
	int			fd;
	char		path[MAXPGPATH];
	Size		sz;
	pg_crc32c	checksum;

	/* no point in loading a snapshot if we're already there */
	if (builder->state == SNAPBUILD_CONSISTENT)
		return false;

	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			LSN_FORMAT_ARGS(lsn));

	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);

	if (fd < 0 && errno == ENOENT)
		return false;
	else if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/* ----
	 * Make sure the snapshot had been stored safely to disk, that's normally
	 * cheap.
	 * Note that we do not need PANIC here, nobody will be able to use the
	 * slot without fsyncing, and saving it won't succeed without an fsync()
	 * either...
	 * ----
	 */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);


	/* read statically sized portion of snapshot */
	SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);

	if (ondisk.magic != SNAPBUILD_MAGIC)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
						path, ondisk.magic, SNAPBUILD_MAGIC)));

	if (ondisk.version != SNAPBUILD_VERSION)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
						path, ondisk.version, SNAPBUILD_VERSION)));

	/* recompute the CRC in the same order SnapBuildSerialize() built it */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);

	/* read SnapBuild */
	SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));

	/* restore committed xacts information */
	if (ondisk.builder.committed.xcnt > 0)
	{
		sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
		ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
		SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
		COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
	}

	/* restore catalog modifying xacts information */
	if (ondisk.builder.catchange.xcnt > 0)
	{
		sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
		ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
		SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
		COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
	}

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	FIN_CRC32C(checksum);

	/* verify checksum of what we've read */
	if (!EQ_CRC32C(checksum, ondisk.checksum))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
						path, checksum, ondisk.checksum)));

	/*
	 * ok, we now have a sensible snapshot here, figure out if it has more
	 * information than we have.
	 */

	/*
	 * We are only interested in consistent snapshots for now, comparing
	 * whether one incomplete snapshot is more "advanced" seems to be
	 * unnecessarily complex.
	 */
	if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
		goto snapshot_not_interesting;

	/*
	 * Don't use a snapshot that requires an xmin that we cannot guarantee to
	 * be available.
	 */
	if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
		goto snapshot_not_interesting;

	/*
	 * Consistent snapshots have no next phase. Reset next_phase_at as it is
	 * possible that an old value may remain.
	 */
	Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
	builder->next_phase_at = InvalidTransactionId;

	/* ok, we think the snapshot is sensible, copy over everything important */
	builder->xmin = ondisk.builder.xmin;
	builder->xmax = ondisk.builder.xmax;
	builder->state = ondisk.builder.state;

	builder->committed.xcnt = ondisk.builder.committed.xcnt;
	/* We only allocated/stored xcnt, not xcnt_space xids ! */
	/* don't overwrite preallocated xip, if we don't have anything here */
	if (builder->committed.xcnt > 0)
	{
		pfree(builder->committed.xip);
		builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
		builder->committed.xip = ondisk.builder.committed.xip;
	}
	/* ownership moved to builder (or nothing to own); skip the cleanup pfree */
	ondisk.builder.committed.xip = NULL;

	/* set catalog modifying transactions */
	if (builder->catchange.xip)
		pfree(builder->catchange.xip);
	builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
	builder->catchange.xip = ondisk.builder.catchange.xip;
	ondisk.builder.catchange.xip = NULL;

	/* our snapshot is not interesting anymore, build a new one */
	if (builder->snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(builder->snapshot);
	}
	builder->snapshot = SnapBuildBuildSnapshot(builder);
	SnapBuildSnapIncRefcount(builder->snapshot);

	ReorderBufferSetRestartPoint(builder->reorder, lsn);

	Assert(builder->state == SNAPBUILD_CONSISTENT);

	ereport(LOG,
			(errmsg("logical decoding found consistent point at %X/%X",
					LSN_FORMAT_ARGS(lsn)),
			 errdetail("Logical decoding will begin using saved snapshot.")));
	return true;

snapshot_not_interesting:
	/* free the xid arrays read from disk; builder is left untouched */
	if (ondisk.builder.committed.xip != NULL)
		pfree(ondisk.builder.committed.xip);
	if (ondisk.builder.catchange.xip != NULL)
		pfree(ondisk.builder.catchange.xip);
	return false;
}

/*
 * Read the contents of the serialized snapshot to 'dest'.
 *
 * Reads exactly 'size' bytes from 'fd'.  On a failed or short read, 'fd' is
 * closed and ERROR is raised ('path' is only used in the message).
 */
static void
SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
{
	int			readBytes;

	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
	readBytes = read(fd, dest, size);
	pgstat_report_wait_end();
	if (readBytes != size)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		if (readBytes < 0)
		{
			errno = save_errno;
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, size)));
	}
}

/*
 * Remove all serialized snapshots that are not required anymore because no
 * slot can need them. This doesn't actually have to run during a checkpoint,
 * but it's a convenient point to schedule this.
 *
 * NB: We run this during checkpoints even if logical decoding is disabled so
 * we clean up old snapshot files at some point after it got disabled.
 */
void
CheckPointSnapBuild(void)
{
	XLogRecPtr	cutoff;
	XLogRecPtr	redo;
	DIR		   *snap_dir;
	struct dirent *snap_de;
	char		path[MAXPGPATH + 21];

	/*
	 * We start off with a minimum of the last redo pointer. No new
	 * replication slot will start before that, so that's a safe upper bound
	 * for removal.
	 */
	redo = GetRedoRecPtr();

	/* now check for the restart ptrs from existing slots */
	cutoff = ReplicationSlotsComputeLogicalRestartLSN();

	/*
	 * Don't start earlier than the restart lsn: cutoff = min(redo, cutoff).
	 * An invalid cutoff is left as is and, below, makes every snapshot file
	 * removable (presumably meaning no logical slot exists -- TODO confirm).
	 */
	if (redo < cutoff)
		cutoff = redo;

	snap_dir = AllocateDir("pg_logical/snapshots");
	while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
	{
		uint32		hi;
		uint32		lo;
		XLogRecPtr	lsn;
		PGFileType	de_type;

		if (strcmp(snap_de->d_name, ".") == 0 ||
			strcmp(snap_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
		de_type = get_dirent_type(path, snap_de, false, DEBUG1);

		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
		{
			elog(DEBUG1, "only regular files expected: %s", path);
			continue;
		}

		/*
		 * temporary filenames from SnapBuildSerialize() include the LSN and
		 * everything but are postfixed by .$pid.tmp. We can just remove them
		 * the same as other files because there can be none that are
		 * currently being written that are older than cutoff.
		 *
		 * We just log a message if a file doesn't fit the pattern, it's
		 * probably some editor's lock/state file or similar...
		 */
		if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
		{
			ereport(LOG,
					(errmsg("could not parse file name \"%s\"", path)));
			continue;
		}

		/* reassemble the 64-bit LSN from the two hex halves of the name */
		lsn = ((uint64) hi) << 32 | lo;

		/* check whether we still need it */
		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
		{
			elog(DEBUG1, "removing snapbuild snapshot %s", path);

			/*
			 * It's not particularly harmful, though strange, if we can't
			 * remove the file here. Don't prevent the checkpoint from
			 * completing, that'd be a cure worse than the disease.
			 */
			if (unlink(path) < 0)
			{
				ereport(LOG,
						(errcode_for_file_access(),
						 errmsg("could not remove file \"%s\": %m",
								path)));
				continue;
			}
		}
	}
	FreeDir(snap_dir);
}