author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-13 13:44:03 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-13 13:44:03 +0000
commit     293913568e6a7a86fd1479e1cff8e2ecb58d6568 (patch)
tree       fc3b469a3ec5ab71b36ea97cc7aaddb838423a0c /src/backend/utils/time
parent     Initial commit. (diff)
Adding upstream version 16.2. (upstream/16.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/utils/time')
-rw-r--r--   src/backend/utils/time/Makefile     |   19
-rw-r--r--   src/backend/utils/time/combocid.c   |  364
-rw-r--r--   src/backend/utils/time/meson.build  |    6
-rw-r--r--   src/backend/utils/time/snapmgr.c    | 2381
4 files changed, 2770 insertions(+), 0 deletions(-)
diff --git a/src/backend/utils/time/Makefile b/src/backend/utils/time/Makefile
new file mode 100644
index 0000000..380dd2f
--- /dev/null
+++ b/src/backend/utils/time/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for utils/time
+#
+# IDENTIFICATION
+# src/backend/utils/time/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/utils/time
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ combocid.o \
+ snapmgr.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
new file mode 100644
index 0000000..0e94bc9
--- /dev/null
+++ b/src/backend/utils/time/combocid.c
@@ -0,0 +1,364 @@
+/*-------------------------------------------------------------------------
+ *
+ * combocid.c
+ * Combo command ID support routines
+ *
+ * Before version 8.3, HeapTupleHeaderData had separate fields for cmin
+ * and cmax. To reduce the header size, cmin and cmax are now overlaid
+ * in the same field in the header. That usually works because you rarely
+ * insert and delete a tuple in the same transaction, and we don't need
+ * either field to remain valid after the originating transaction exits.
+ * To make it work when the inserting transaction does delete the tuple,
+ * we create a "combo" command ID and store that in the tuple header
+ * instead of cmin and cmax. The combo command ID can be mapped to the
+ * real cmin and cmax using a backend-private array, which is managed by
+ * this module.
+ *
+ * To allow reusing existing combo CIDs, we also keep a hash table that
+ * maps cmin,cmax pairs to combo CIDs. This keeps the data structure size
+ * reasonable in most cases, since the number of unique pairs used by any
+ * one transaction is likely to be small.
+ *
+ * With a 32-bit combo command id we can represent 2^32 distinct cmin,cmax
+ * combinations. In the most perverse case where each command deletes a tuple
+ * generated by every previous command, the number of combo command ids
+ * required for N commands is N*(N+1)/2. That means that in the worst case,
+ * that's enough for 92682 commands. In practice, you'll run out of memory
+ * and/or disk space way before you reach that limit.
+ *
+ * The array and hash table are kept in TopTransactionContext, and are
+ * destroyed at the end of each transaction.
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/time/combocid.c
+ *
+ *-------------------------------------------------------------------------
+ */
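
    As a minimal sketch of the mapping described above (an editor's illustration
    with a hypothetical helper name, not part of this commit): the module boils
    down to an array of (cmin, cmax) pairs indexed by the combo CID, plus a hash
    table so an existing pair is reused.

        typedef struct { CommandId cmin, cmax; } ComboCidKeyData;

        static ComboCidKeyData comboCids[4];    /* grown dynamically in the real code */
        static int usedComboCids = 0;

        /* hypothetical helper: allocate (or, in the real code, reuse) a combo CID */
        static CommandId
        combo_cid_for(CommandId cmin, CommandId cmax)
        {
            comboCids[usedComboCids].cmin = cmin;
            comboCids[usedComboCids].cmax = cmax;
            return usedComboCids++;     /* the combo CID is just the array index */
        }
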
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/shmem.h"
+#include "utils/combocid.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+
+/* Hash table to lookup combo CIDs by cmin and cmax */
+static HTAB *comboHash = NULL;
+
+/* Key and entry structures for the hash table */
+typedef struct
+{
+ CommandId cmin;
+ CommandId cmax;
+} ComboCidKeyData;
+
+typedef ComboCidKeyData *ComboCidKey;
+
+typedef struct
+{
+ ComboCidKeyData key;
+ CommandId combocid;
+} ComboCidEntryData;
+
+typedef ComboCidEntryData *ComboCidEntry;
+
+/* Initial size of the hash table */
+#define CCID_HASH_SIZE 100
+
+
+/*
+ * An array of cmin,cmax pairs, indexed by combo command id.
+ * To convert a combo CID to cmin and cmax, you do a simple array lookup.
+ */
+static ComboCidKey comboCids = NULL;
+static int usedComboCids = 0; /* number of elements in comboCids */
+static int sizeComboCids = 0; /* allocated size of array */
+
+/* Initial size of the array */
+#define CCID_ARRAY_SIZE 100
+
+
+/* prototypes for internal functions */
+static CommandId GetComboCommandId(CommandId cmin, CommandId cmax);
+static CommandId GetRealCmin(CommandId combocid);
+static CommandId GetRealCmax(CommandId combocid);
+
+
+/**** External API ****/
+
+/*
+ * GetCmin and GetCmax assert that they are only called in situations where
+ * they make sense, that is, can deliver a useful answer. If you have
+ * reason to examine a tuple's t_cid field from a transaction other than
+ * the originating one, use HeapTupleHeaderGetRawCommandId() directly.
+ */
+
+CommandId
+HeapTupleHeaderGetCmin(HeapTupleHeader tup)
+{
+ CommandId cid = HeapTupleHeaderGetRawCommandId(tup);
+
+ Assert(!(tup->t_infomask & HEAP_MOVED));
+ Assert(TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tup)));
+
+ if (tup->t_infomask & HEAP_COMBOCID)
+ return GetRealCmin(cid);
+ else
+ return cid;
+}
+
+CommandId
+HeapTupleHeaderGetCmax(HeapTupleHeader tup)
+{
+ CommandId cid = HeapTupleHeaderGetRawCommandId(tup);
+
+ Assert(!(tup->t_infomask & HEAP_MOVED));
+
+ /*
+ * Because GetUpdateXid() performs memory allocations if xmax is a
+ * multixact we can't Assert() if we're inside a critical section. This
+ * weakens the check, but not using GetCmax() inside one would complicate
+ * things too much.
+ */
+ Assert(CritSectionCount > 0 ||
+ TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tup)));
+
+ if (tup->t_infomask & HEAP_COMBOCID)
+ return GetRealCmax(cid);
+ else
+ return cid;
+}
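
    An editor's sketch of how the rule above plays out for callers (not part of
    this commit): inside the originating transaction the accessors decode combo
    CIDs back to real values, while any other transaction may only look at the
    raw field.

        CommandId   cid;

        if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tup)))
            cid = HeapTupleHeaderGetCmin(tup);          /* decodes a combo CID */
        else
            cid = HeapTupleHeaderGetRawCommandId(tup);  /* may itself be a combo CID */
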
+
+/*
+ * Given a tuple we are about to delete, determine the correct value to store
+ * into its t_cid field.
+ *
+ * If we don't need a combo CID, *cmax is unchanged and *iscombo is set to
+ * false. If we do need one, *cmax is replaced by a combo CID and *iscombo
+ * is set to true.
+ *
+ * The reason this is separate from the actual HeapTupleHeaderSetCmax()
+ * operation is that this could fail due to out-of-memory conditions. Hence
+ * we need to do this before entering the critical section that actually
+ * changes the tuple in shared buffers.
+ */
+void
+HeapTupleHeaderAdjustCmax(HeapTupleHeader tup,
+ CommandId *cmax,
+ bool *iscombo)
+{
+ /*
+ * If we're marking a tuple deleted that was inserted by (any
+ * subtransaction of) our transaction, we need to use a combo command id.
+ * Test for HeapTupleHeaderXminCommitted() first, because it's cheaper
+ * than a TransactionIdIsCurrentTransactionId call.
+ */
+ if (!HeapTupleHeaderXminCommitted(tup) &&
+ TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmin(tup)))
+ {
+ CommandId cmin = HeapTupleHeaderGetCmin(tup);
+
+ *cmax = GetComboCommandId(cmin, *cmax);
+ *iscombo = true;
+ }
+ else
+ {
+ *iscombo = false;
+ }
+}
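
    An illustrative calling pattern (editor's sketch, loosely modeled on
    heap_delete(); buffer and WAL handling elided): the combo CID is resolved
    while errors are still allowed, and only the infallible
    HeapTupleHeaderSetCmax() runs inside the critical section.

        CommandId   cmax = GetCurrentCommandId(true);
        bool        iscombo;

        HeapTupleHeaderAdjustCmax(tup, &cmax, &iscombo);    /* may palloc, may ERROR */

        START_CRIT_SECTION();
        HeapTupleHeaderSetCmax(tup, cmax, iscombo);         /* no failure paths here */
        /* ... mark the buffer dirty, emit WAL ... */
        END_CRIT_SECTION();
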
+
+/*
+ * Combo command ids are only interesting to the inserting and deleting
+ * transaction, so we can forget about them at the end of transaction.
+ */
+void
+AtEOXact_ComboCid(void)
+{
+ /*
+ * Don't bother to pfree. These are allocated in TopTransactionContext, so
+ * they're going to go away at the end of transaction anyway.
+ */
+ comboHash = NULL;
+
+ comboCids = NULL;
+ usedComboCids = 0;
+ sizeComboCids = 0;
+}
+
+
+/**** Internal routines ****/
+
+/*
+ * Get a combo command id that maps to cmin and cmax.
+ *
+ * We try to reuse old combo command ids when possible.
+ */
+static CommandId
+GetComboCommandId(CommandId cmin, CommandId cmax)
+{
+ CommandId combocid;
+ ComboCidKeyData key;
+ ComboCidEntry entry;
+ bool found;
+
+ /*
+ * Create the hash table and array the first time we need to use combo
+ * cids in the transaction.
+ */
+ if (comboHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ /* Make array first; existence of hash table asserts array exists */
+ comboCids = (ComboCidKeyData *)
+ MemoryContextAlloc(TopTransactionContext,
+ sizeof(ComboCidKeyData) * CCID_ARRAY_SIZE);
+ sizeComboCids = CCID_ARRAY_SIZE;
+ usedComboCids = 0;
+
+ hash_ctl.keysize = sizeof(ComboCidKeyData);
+ hash_ctl.entrysize = sizeof(ComboCidEntryData);
+ hash_ctl.hcxt = TopTransactionContext;
+
+ comboHash = hash_create("Combo CIDs",
+ CCID_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ /*
+ * Grow the array if there's not at least one free slot. We must do this
+ * before possibly entering a new hashtable entry, else failure to
+ * repalloc would leave a corrupt hashtable entry behind.
+ */
+ if (usedComboCids >= sizeComboCids)
+ {
+ int newsize = sizeComboCids * 2;
+
+ comboCids = (ComboCidKeyData *)
+ repalloc(comboCids, sizeof(ComboCidKeyData) * newsize);
+ sizeComboCids = newsize;
+ }
+
+ /* Lookup or create a hash entry with the desired cmin/cmax */
+
+ /* We assume there is no struct padding in ComboCidKeyData! */
+ key.cmin = cmin;
+ key.cmax = cmax;
+ entry = (ComboCidEntry) hash_search(comboHash,
+ &key,
+ HASH_ENTER,
+ &found);
+
+ if (found)
+ {
+ /* Reuse an existing combo CID */
+ return entry->combocid;
+ }
+
+ /* We have to create a new combo CID; we already made room in the array */
+ combocid = usedComboCids;
+
+ comboCids[combocid].cmin = cmin;
+ comboCids[combocid].cmax = cmax;
+ usedComboCids++;
+
+ entry->combocid = combocid;
+
+ return combocid;
+}
+
+static CommandId
+GetRealCmin(CommandId combocid)
+{
+ Assert(combocid < usedComboCids);
+ return comboCids[combocid].cmin;
+}
+
+static CommandId
+GetRealCmax(CommandId combocid)
+{
+ Assert(combocid < usedComboCids);
+ return comboCids[combocid].cmax;
+}
+
+/*
+ * Estimate the amount of space required to serialize the current combo CID
+ * state.
+ */
+Size
+EstimateComboCIDStateSpace(void)
+{
+ Size size;
+
+ /* Add space required for saving usedComboCids */
+ size = sizeof(int);
+
+ /* Add space required for saving ComboCidKeyData */
+ size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
+
+ return size;
+}
+
+/*
+ * Serialize the combo CID state into the memory, beginning at start_address.
+ * maxsize should be at least as large as the value returned by
+ * EstimateComboCIDStateSpace.
+ */
+void
+SerializeComboCIDState(Size maxsize, char *start_address)
+{
+ char *endptr;
+
+ /* First, we store the number of currently-existing combo CIDs. */
+ *(int *) start_address = usedComboCids;
+
+ /* If maxsize is too small, throw an error. */
+ endptr = start_address + sizeof(int) +
+ (sizeof(ComboCidKeyData) * usedComboCids);
+ if (endptr < start_address || endptr > start_address + maxsize)
+ elog(ERROR, "not enough space to serialize ComboCID state");
+
+ /* Now, copy the actual cmin/cmax pairs. */
+ if (usedComboCids > 0)
+ memcpy(start_address + sizeof(int), comboCids,
+ (sizeof(ComboCidKeyData) * usedComboCids));
+}
+
+/*
+ * Read the combo CID state at the specified address and initialize this
+ * backend with the same combo CIDs. This is only valid in a backend that
+ * currently has no combo CIDs (and only makes sense if the transaction state
+ * is serialized and restored as well).
+ */
+void
+RestoreComboCIDState(char *comboCIDstate)
+{
+ int num_elements;
+ ComboCidKeyData *keydata;
+ int i;
+ CommandId cid;
+
+ Assert(!comboCids && !comboHash);
+
+ /* First, we retrieve the number of combo CIDs that were serialized. */
+ num_elements = *(int *) comboCIDstate;
+ keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
+
+ /* Use GetComboCommandId to restore each combo CID. */
+ for (i = 0; i < num_elements; i++)
+ {
+ cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
+
+ /* Verify that we got the expected answer. */
+ if (cid != i)
+ elog(ERROR, "unexpected command ID while restoring combo CIDs");
+ }
+}
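
    Editor's sketch of the round trip the three functions above support for
    parallel query (parallel.c does roughly this, alongside many other pieces of
    serialized state):

        Size    len = EstimateComboCIDStateSpace();
        char   *buf = palloc(len);              /* real code: space in the shm_toc */

        SerializeComboCIDState(len, buf);       /* leader, while launching workers */
        /* ... buf travels to the worker via dynamic shared memory ... */
        RestoreComboCIDState(buf);              /* worker, before executing the plan */
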
diff --git a/src/backend/utils/time/meson.build b/src/backend/utils/time/meson.build
new file mode 100644
index 0000000..36166b2
--- /dev/null
+++ b/src/backend/utils/time/meson.build
@@ -0,0 +1,6 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+backend_sources += files(
+ 'combocid.c',
+ 'snapmgr.c',
+)
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
new file mode 100644
index 0000000..3a419e3
--- /dev/null
+++ b/src/backend/utils/time/snapmgr.c
@@ -0,0 +1,2381 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapmgr.c
+ * PostgreSQL snapshot manager
+ *
+ * We keep track of snapshots in two ways: those "registered" by resowner.c,
+ * and the "active snapshot" stack. All snapshots in either of them live in
+ * persistent memory. When a snapshot is no longer in any of these lists
+ * (tracked by separate refcounts on each snapshot), its memory can be freed.
+ *
+ * The FirstXactSnapshot, if any, is treated a bit specially: we increment its
+ * regd_count and list it in RegisteredSnapshots, but this reference is not
+ * tracked by a resource owner. We used to use the TopTransactionResourceOwner
+ * to track this snapshot reference, but that introduces logical circularity
+ * and thus makes it impossible to clean up in a sane fashion. It's better to
+ * handle this reference as an internally-tracked registration, so that this
+ * module is entirely lower-level than ResourceOwners.
+ *
+ * Likewise, any snapshots that have been exported by pg_export_snapshot
+ * have regd_count = 1 and are listed in RegisteredSnapshots, but are not
+ * tracked by any resource owner.
+ *
+ * Likewise, the CatalogSnapshot is listed in RegisteredSnapshots when it
+ * is valid, but is not tracked by any resource owner.
+ *
+ * The same is true for historic snapshots used during logical decoding,
+ * their lifetime is managed separately (as they live longer than one xact.c
+ * transaction).
+ *
+ * These arrangements let us reset MyProc->xmin when there are no snapshots
+ * referenced by this transaction, and advance it when the one with oldest
+ * Xmin is no longer referenced. For simplicity however, only registered
+ * snapshots, not active snapshots, participate in tracking which one is oldest;
+ * we don't try to change MyProc->xmin except when the active-snapshot
+ * stack is empty.
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/utils/time/snapmgr.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/catalog.h"
+#include "datatype/timestamp.h"
+#include "lib/pairingheap.h"
+#include "miscadmin.h"
+#include "port/pg_lfind.h"
+#include "storage/predicate.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/sinval.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "utils/old_snapshot.h"
+#include "utils/rel.h"
+#include "utils/resowner_private.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+
+
+/*
+ * GUC parameters
+ */
+int old_snapshot_threshold; /* number of minutes, -1 disables */
+
+volatile OldSnapshotControlData *oldSnapshotControl;
+
+
+/*
+ * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
+ * mode, and to the latest one taken in a read-committed transaction.
+ * SecondarySnapshot is a snapshot that's always up-to-date as of the current
+ * instant, even in transaction-snapshot mode. It should only be used for
+ * special-purpose code (say, RI checking.) CatalogSnapshot points to an
+ * MVCC snapshot intended to be used for catalog scans; we must invalidate it
+ * whenever a system catalog change occurs.
+ *
+ * These SnapshotData structs are static to simplify memory allocation
+ * (see the hack in GetSnapshotData to avoid repeated malloc/free).
+ */
+static SnapshotData CurrentSnapshotData = {SNAPSHOT_MVCC};
+static SnapshotData SecondarySnapshotData = {SNAPSHOT_MVCC};
+SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC};
+SnapshotData SnapshotSelfData = {SNAPSHOT_SELF};
+SnapshotData SnapshotAnyData = {SNAPSHOT_ANY};
+
+/* Pointers to valid snapshots */
+static Snapshot CurrentSnapshot = NULL;
+static Snapshot SecondarySnapshot = NULL;
+static Snapshot CatalogSnapshot = NULL;
+static Snapshot HistoricSnapshot = NULL;
+
+/*
+ * These are updated by GetSnapshotData. We initialize them this way
+ * for the convenience of TransactionIdIsInProgress: even in bootstrap
+ * mode, we don't want it to say that BootstrapTransactionId is in progress.
+ */
+TransactionId TransactionXmin = FirstNormalTransactionId;
+TransactionId RecentXmin = FirstNormalTransactionId;
+
+/* (table, ctid) => (cmin, cmax) mapping during timetravel */
+static HTAB *tuplecid_data = NULL;
+
+/*
+ * Elements of the active snapshot stack.
+ *
+ * Each element here accounts for exactly one active_count on SnapshotData.
+ *
+ * NB: the code assumes that elements in this list are in non-increasing
+ * order of as_level; also, the list must be NULL-terminated.
+ */
+typedef struct ActiveSnapshotElt
+{
+ Snapshot as_snap;
+ int as_level;
+ struct ActiveSnapshotElt *as_next;
+} ActiveSnapshotElt;
+
+/* Top of the stack of active snapshots */
+static ActiveSnapshotElt *ActiveSnapshot = NULL;
+
+/* Bottom of the stack of active snapshots */
+static ActiveSnapshotElt *OldestActiveSnapshot = NULL;
+
+/*
+ * Currently registered Snapshots. Ordered in a heap by xmin, so that we can
+ * quickly find the one with lowest xmin, to advance our MyProc->xmin.
+ */
+static int xmin_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
+
+static pairingheap RegisteredSnapshots = {&xmin_cmp, NULL, NULL};
+
+/* first GetTransactionSnapshot call in a transaction? */
+bool FirstSnapshotSet = false;
+
+/*
+ * Remember the serializable transaction snapshot, if any. We cannot trust
+ * FirstSnapshotSet in combination with IsolationUsesXactSnapshot(), because
+ * GUC may be reset before us, changing the value of IsolationUsesXactSnapshot.
+ */
+static Snapshot FirstXactSnapshot = NULL;
+
+/* Define pathname of exported-snapshot files */
+#define SNAPSHOT_EXPORT_DIR "pg_snapshots"
+
+/* Structure holding info about exported snapshot. */
+typedef struct ExportedSnapshot
+{
+ char *snapfile;
+ Snapshot snapshot;
+} ExportedSnapshot;
+
+/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */
+static List *exportedSnapshots = NIL;
+
+/* Prototypes for local functions */
+static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
+static Snapshot CopySnapshot(Snapshot snapshot);
+static void FreeSnapshot(Snapshot snapshot);
+static void SnapshotResetXmin(void);
+
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) be set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+ TransactionId xmin;
+ TransactionId xmax;
+ uint32 xcnt;
+ int32 subxcnt;
+ bool suboverflowed;
+ bool takenDuringRecovery;
+ CommandId curcid;
+ TimestampTz whenTaken;
+ XLogRecPtr lsn;
+} SerializedSnapshotData;
+
+Size
+SnapMgrShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(OldSnapshotControlData, xid_by_minute);
+ if (old_snapshot_threshold > 0)
+ size = add_size(size, mul_size(sizeof(TransactionId),
+ OLD_SNAPSHOT_TIME_MAP_ENTRIES));
+
+ return size;
+}
+
+/*
+ * Initialize for managing old snapshot detection.
+ */
+void
+SnapMgrInit(void)
+{
+ bool found;
+
+ /*
+ * Create or attach to the OldSnapshotControlData structure.
+ */
+ oldSnapshotControl = (volatile OldSnapshotControlData *)
+ ShmemInitStruct("OldSnapshotControlData",
+ SnapMgrShmemSize(), &found);
+
+ if (!found)
+ {
+ SpinLockInit(&oldSnapshotControl->mutex_current);
+ oldSnapshotControl->current_timestamp = 0;
+ SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
+ oldSnapshotControl->latest_xmin = InvalidTransactionId;
+ oldSnapshotControl->next_map_update = 0;
+ SpinLockInit(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = 0;
+ oldSnapshotControl->threshold_xid = InvalidTransactionId;
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = 0;
+ oldSnapshotControl->count_used = 0;
+ }
+}
+
+/*
+ * GetTransactionSnapshot
+ * Get the appropriate snapshot for a new query in a transaction.
+ *
+ * Note that the return value may point at static storage that will be modified
+ * by future calls and by CommandCounterIncrement(). Callers should call
+ * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
+ * used very long.
+ */
+Snapshot
+GetTransactionSnapshot(void)
+{
+ /*
+ * Return historic snapshot if doing logical decoding. We'll never need a
+ * non-historic transaction snapshot in this (sub-)transaction, so there's
+ * no need to be careful to set one up for later calls to
+ * GetTransactionSnapshot().
+ */
+ if (HistoricSnapshotActive())
+ {
+ Assert(!FirstSnapshotSet);
+ return HistoricSnapshot;
+ }
+
+ /* First call in transaction? */
+ if (!FirstSnapshotSet)
+ {
+ /*
+ * Don't allow catalog snapshot to be older than xact snapshot. Must
+ * do this first to allow the empty-heap Assert to succeed.
+ */
+ InvalidateCatalogSnapshot();
+
+ Assert(pairingheap_is_empty(&RegisteredSnapshots));
+ Assert(FirstXactSnapshot == NULL);
+
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot take query snapshot during a parallel operation");
+
+ /*
+ * In transaction-snapshot mode, the first snapshot must live until
+ * end of xact regardless of what the caller does with it, so we must
+ * make a copy of it rather than returning CurrentSnapshotData
+ * directly. Furthermore, if we're running in serializable mode,
+ * predicate.c needs to wrap the snapshot fetch in its own processing.
+ */
+ if (IsolationUsesXactSnapshot())
+ {
+ /* First, create the snapshot in CurrentSnapshotData */
+ if (IsolationIsSerializable())
+ CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
+ else
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+ /* Make a saved copy */
+ CurrentSnapshot = CopySnapshot(CurrentSnapshot);
+ FirstXactSnapshot = CurrentSnapshot;
+ /* Mark it as "registered" in FirstXactSnapshot */
+ FirstXactSnapshot->regd_count++;
+ pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
+ }
+ else
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+ FirstSnapshotSet = true;
+ return CurrentSnapshot;
+ }
+
+ if (IsolationUsesXactSnapshot())
+ return CurrentSnapshot;
+
+ /* Don't allow catalog snapshot to be older than xact snapshot. */
+ InvalidateCatalogSnapshot();
+
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+ return CurrentSnapshot;
+}
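
    Editor's sketch (not part of this commit) of the two ways a caller typically
    holds on to the result, as the header comment above requires:

        /* keep it alive via the resource owner ... */
        Snapshot    snap = RegisterSnapshot(GetTransactionSnapshot());
        /* ... scan using snap ... */
        UnregisterSnapshot(snap);

        /* ... or make it the active snapshot around query execution */
        PushActiveSnapshot(GetTransactionSnapshot());
        /* ... execute ... */
        PopActiveSnapshot();
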
+
+/*
+ * GetLatestSnapshot
+ * Get a snapshot that is up-to-date as of the current instant,
+ * even if we are executing in transaction-snapshot mode.
+ */
+Snapshot
+GetLatestSnapshot(void)
+{
+ /*
+ * We might be able to relax this, but nothing that could otherwise work
+ * needs it.
+ */
+ if (IsInParallelMode())
+ elog(ERROR,
+ "cannot update SecondarySnapshot during a parallel operation");
+
+ /*
+ * So far there are no cases requiring support for GetLatestSnapshot()
+ * during logical decoding, but it wouldn't be hard to add if required.
+ */
+ Assert(!HistoricSnapshotActive());
+
+ /* If first call in transaction, go ahead and set the xact snapshot */
+ if (!FirstSnapshotSet)
+ return GetTransactionSnapshot();
+
+ SecondarySnapshot = GetSnapshotData(&SecondarySnapshotData);
+
+ return SecondarySnapshot;
+}
+
+/*
+ * GetOldestSnapshot
+ *
+ * Get the transaction's oldest known snapshot, as judged by the LSN.
+ * Will return NULL if there are no active or registered snapshots.
+ */
+Snapshot
+GetOldestSnapshot(void)
+{
+ Snapshot OldestRegisteredSnapshot = NULL;
+ XLogRecPtr RegisteredLSN = InvalidXLogRecPtr;
+
+ if (!pairingheap_is_empty(&RegisteredSnapshots))
+ {
+ OldestRegisteredSnapshot = pairingheap_container(SnapshotData, ph_node,
+ pairingheap_first(&RegisteredSnapshots));
+ RegisteredLSN = OldestRegisteredSnapshot->lsn;
+ }
+
+ if (OldestActiveSnapshot != NULL)
+ {
+ XLogRecPtr ActiveLSN = OldestActiveSnapshot->as_snap->lsn;
+
+ if (XLogRecPtrIsInvalid(RegisteredLSN) || RegisteredLSN > ActiveLSN)
+ return OldestActiveSnapshot->as_snap;
+ }
+
+ return OldestRegisteredSnapshot;
+}
+
+/*
+ * GetCatalogSnapshot
+ * Get a snapshot that is sufficiently up-to-date for scan of the
+ * system catalog with the specified OID.
+ */
+Snapshot
+GetCatalogSnapshot(Oid relid)
+{
+ /*
+ * Return historic snapshot while we're doing logical decoding, so we can
+ * see the appropriate state of the catalog.
+ *
+ * This is the primary reason for needing to reset the system caches after
+ * finishing decoding.
+ */
+ if (HistoricSnapshotActive())
+ return HistoricSnapshot;
+
+ return GetNonHistoricCatalogSnapshot(relid);
+}
+
+/*
+ * GetNonHistoricCatalogSnapshot
+ * Get a snapshot that is sufficiently up-to-date for scan of the system
+ * catalog with the specified OID, even while historic snapshots are set
+ * up.
+ */
+Snapshot
+GetNonHistoricCatalogSnapshot(Oid relid)
+{
+ /*
+ * If the caller is trying to scan a relation that has no syscache, no
+ * catcache invalidations will be sent when it is updated. For a few key
+ * relations, snapshot invalidations are sent instead. If we're trying to
+ * scan a relation for which neither catcache nor snapshot invalidations
+ * are sent, we must refresh the snapshot every time.
+ */
+ if (CatalogSnapshot &&
+ !RelationInvalidatesSnapshotsOnly(relid) &&
+ !RelationHasSysCache(relid))
+ InvalidateCatalogSnapshot();
+
+ if (CatalogSnapshot == NULL)
+ {
+ /* Get new snapshot. */
+ CatalogSnapshot = GetSnapshotData(&CatalogSnapshotData);
+
+ /*
+ * Make sure the catalog snapshot will be accounted for in decisions
+ * about advancing PGPROC->xmin. We could apply RegisterSnapshot, but
+ * that would result in making a physical copy, which is overkill; and
+ * it would also create a dependency on some resource owner, which we
+ * do not want for reasons explained at the head of this file. Instead
+ * just shove the CatalogSnapshot into the pairing heap manually. This
+ * has to be reversed in InvalidateCatalogSnapshot, of course.
+ *
+ * NB: it had better be impossible for this to throw error, since the
+ * CatalogSnapshot pointer is already valid.
+ */
+ pairingheap_add(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
+ }
+
+ return CatalogSnapshot;
+}
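
    For context (editor's note, not part of this commit): most catalog scans
    reach this function through systable_beginscan() in genam.c, which, when
    handed a NULL snapshot, does roughly the following:

        if (snapshot == NULL)
            snapshot = RegisterSnapshot(GetCatalogSnapshot(RelationGetRelid(heapRelation)));
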
+
+/*
+ * InvalidateCatalogSnapshot
+ * Mark the current catalog snapshot, if any, as invalid
+ *
+ * We could change this API to allow the caller to provide more fine-grained
+ * invalidation details, so that a change to relation A wouldn't prevent us
+ * from using our cached snapshot to scan relation B, but so far there's no
+ * evidence that the CPU cycles we spent tracking such fine details would be
+ * well-spent.
+ */
+void
+InvalidateCatalogSnapshot(void)
+{
+ if (CatalogSnapshot)
+ {
+ pairingheap_remove(&RegisteredSnapshots, &CatalogSnapshot->ph_node);
+ CatalogSnapshot = NULL;
+ SnapshotResetXmin();
+ }
+}
+
+/*
+ * InvalidateCatalogSnapshotConditionally
+ * Drop catalog snapshot if it's the only one we have
+ *
+ * This is called when we are about to wait for client input, so we don't
+ * want to continue holding the catalog snapshot if it might mean that the
+ * global xmin horizon can't advance. However, if there are other snapshots
+ * still active or registered, the catalog snapshot isn't likely to be the
+ * oldest one, so we might as well keep it.
+ */
+void
+InvalidateCatalogSnapshotConditionally(void)
+{
+ if (CatalogSnapshot &&
+ ActiveSnapshot == NULL &&
+ pairingheap_is_singular(&RegisteredSnapshots))
+ InvalidateCatalogSnapshot();
+}
+
+/*
+ * SnapshotSetCommandId
+ * Propagate CommandCounterIncrement into the static snapshots, if set
+ */
+void
+SnapshotSetCommandId(CommandId curcid)
+{
+ if (!FirstSnapshotSet)
+ return;
+
+ if (CurrentSnapshot)
+ CurrentSnapshot->curcid = curcid;
+ if (SecondarySnapshot)
+ SecondarySnapshot->curcid = curcid;
+ /* Should we do the same with CatalogSnapshot? */
+}
+
+/*
+ * SetTransactionSnapshot
+ * Set the transaction's snapshot from an imported MVCC snapshot.
+ *
+ * Note that this is very closely tied to GetTransactionSnapshot --- it
+ * must take care of all the same considerations as the first-snapshot case
+ * in GetTransactionSnapshot.
+ */
+static void
+SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid,
+ int sourcepid, PGPROC *sourceproc)
+{
+ /* Caller should have checked this already */
+ Assert(!FirstSnapshotSet);
+
+ /* Better do this to ensure following Assert succeeds. */
+ InvalidateCatalogSnapshot();
+
+ Assert(pairingheap_is_empty(&RegisteredSnapshots));
+ Assert(FirstXactSnapshot == NULL);
+ Assert(!HistoricSnapshotActive());
+
+ /*
+ * Even though we are not going to use the snapshot it computes, we must
+ * call GetSnapshotData, for two reasons: (1) to be sure that
+ * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
+ * the state for GlobalVis*.
+ */
+ CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
+
+ /*
+ * Now copy appropriate fields from the source snapshot.
+ */
+ CurrentSnapshot->xmin = sourcesnap->xmin;
+ CurrentSnapshot->xmax = sourcesnap->xmax;
+ CurrentSnapshot->xcnt = sourcesnap->xcnt;
+ Assert(sourcesnap->xcnt <= GetMaxSnapshotXidCount());
+ if (sourcesnap->xcnt > 0)
+ memcpy(CurrentSnapshot->xip, sourcesnap->xip,
+ sourcesnap->xcnt * sizeof(TransactionId));
+ CurrentSnapshot->subxcnt = sourcesnap->subxcnt;
+ Assert(sourcesnap->subxcnt <= GetMaxSnapshotSubxidCount());
+ if (sourcesnap->subxcnt > 0)
+ memcpy(CurrentSnapshot->subxip, sourcesnap->subxip,
+ sourcesnap->subxcnt * sizeof(TransactionId));
+ CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
+ CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;
+ /* NB: curcid should NOT be copied, it's a local matter */
+
+ CurrentSnapshot->snapXactCompletionCount = 0;
+
+ /*
+ * Now we have to fix what GetSnapshotData did with MyProc->xmin and
+ * TransactionXmin. There is a race condition: to make sure we are not
+ * causing the global xmin to go backwards, we have to test that the
+ * source transaction is still running, and that has to be done
+ * atomically. So let procarray.c do it.
+ *
+ * Note: in serializable mode, predicate.c will do this a second time. It
+ * doesn't seem worth contorting the logic here to avoid two calls,
+ * especially since it's not clear that predicate.c *must* do this.
+ */
+ if (sourceproc != NULL)
+ {
+ if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import the requested snapshot"),
+ errdetail("The source transaction is not running anymore.")));
+ }
+ else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not import the requested snapshot"),
+ errdetail("The source process with PID %d is not running anymore.",
+ sourcepid)));
+
+ /*
+ * In transaction-snapshot mode, the first snapshot must live until end of
+ * xact, so we must make a copy of it. Furthermore, if we're running in
+ * serializable mode, predicate.c needs to do its own processing.
+ */
+ if (IsolationUsesXactSnapshot())
+ {
+ if (IsolationIsSerializable())
+ SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid,
+ sourcepid);
+ /* Make a saved copy */
+ CurrentSnapshot = CopySnapshot(CurrentSnapshot);
+ FirstXactSnapshot = CurrentSnapshot;
+ /* Mark it as "registered" in FirstXactSnapshot */
+ FirstXactSnapshot->regd_count++;
+ pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
+ }
+
+ FirstSnapshotSet = true;
+}
+
+/*
+ * CopySnapshot
+ * Copy the given snapshot.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0. The returned snapshot has the copied flag set.
+ */
+static Snapshot
+CopySnapshot(Snapshot snapshot)
+{
+ Snapshot newsnap;
+ Size subxipoff;
+ Size size;
+
+ Assert(snapshot != InvalidSnapshot);
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = subxipoff = sizeof(SnapshotData) +
+ snapshot->xcnt * sizeof(TransactionId);
+ if (snapshot->subxcnt > 0)
+ size += snapshot->subxcnt * sizeof(TransactionId);
+
+ newsnap = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+ memcpy(newsnap, snapshot, sizeof(SnapshotData));
+
+ newsnap->regd_count = 0;
+ newsnap->active_count = 0;
+ newsnap->copied = true;
+ newsnap->snapXactCompletionCount = 0;
+
+ /* setup XID array */
+ if (snapshot->xcnt > 0)
+ {
+ newsnap->xip = (TransactionId *) (newsnap + 1);
+ memcpy(newsnap->xip, snapshot->xip,
+ snapshot->xcnt * sizeof(TransactionId));
+ }
+ else
+ newsnap->xip = NULL;
+
+ /*
+ * Setup subXID array. Don't bother to copy it if it had overflowed,
+ * though, because it's not used anywhere in that case. Except if it's a
+ * snapshot taken during recovery; all the top-level XIDs are in subxip as
+ * well in that case, so we mustn't lose them.
+ */
+ if (snapshot->subxcnt > 0 &&
+ (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
+ {
+ newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
+ memcpy(newsnap->subxip, snapshot->subxip,
+ snapshot->subxcnt * sizeof(TransactionId));
+ }
+ else
+ newsnap->subxip = NULL;
+
+ return newsnap;
+}
+
+/*
+ * FreeSnapshot
+ * Free the memory associated with a snapshot.
+ */
+static void
+FreeSnapshot(Snapshot snapshot)
+{
+ Assert(snapshot->regd_count == 0);
+ Assert(snapshot->active_count == 0);
+ Assert(snapshot->copied);
+
+ pfree(snapshot);
+}
+
+/*
+ * PushActiveSnapshot
+ * Set the given snapshot as the current active snapshot
+ *
+ * If the passed snapshot is a statically-allocated one, or it is possibly
+ * subject to a future command counter update, create a new long-lived copy
+ * with active refcount=1. Otherwise, only increment the refcount.
+ */
+void
+PushActiveSnapshot(Snapshot snapshot)
+{
+ PushActiveSnapshotWithLevel(snapshot, GetCurrentTransactionNestLevel());
+}
+
+/*
+ * PushActiveSnapshotWithLevel
+ * Set the given snapshot as the current active snapshot
+ *
+ * Same as PushActiveSnapshot except that caller can specify the
+ * transaction nesting level that "owns" the snapshot. This level
+ * must not be deeper than the current top of the snapshot stack.
+ */
+void
+PushActiveSnapshotWithLevel(Snapshot snapshot, int snap_level)
+{
+ ActiveSnapshotElt *newactive;
+
+ Assert(snapshot != InvalidSnapshot);
+ Assert(ActiveSnapshot == NULL || snap_level >= ActiveSnapshot->as_level);
+
+ newactive = MemoryContextAlloc(TopTransactionContext, sizeof(ActiveSnapshotElt));
+
+ /*
+ * Checking SecondarySnapshot is probably useless here, but it seems
+ * better to be sure.
+ */
+ if (snapshot == CurrentSnapshot || snapshot == SecondarySnapshot ||
+ !snapshot->copied)
+ newactive->as_snap = CopySnapshot(snapshot);
+ else
+ newactive->as_snap = snapshot;
+
+ newactive->as_next = ActiveSnapshot;
+ newactive->as_level = snap_level;
+
+ newactive->as_snap->active_count++;
+
+ ActiveSnapshot = newactive;
+ if (OldestActiveSnapshot == NULL)
+ OldestActiveSnapshot = ActiveSnapshot;
+}
+
+/*
+ * PushCopiedSnapshot
+ * As above, except forcibly copy the presented snapshot.
+ *
+ * This should be used when the ActiveSnapshot has to be modifiable, for
+ * example if the caller intends to call UpdateActiveSnapshotCommandId.
+ * The new snapshot will be released when popped from the stack.
+ */
+void
+PushCopiedSnapshot(Snapshot snapshot)
+{
+ PushActiveSnapshot(CopySnapshot(snapshot));
+}
+
+/*
+ * UpdateActiveSnapshotCommandId
+ *
+ * Update the current CID of the active snapshot. This can only be applied
+ * to a snapshot that is not referenced elsewhere.
+ */
+void
+UpdateActiveSnapshotCommandId(void)
+{
+ CommandId save_curcid,
+ curcid;
+
+ Assert(ActiveSnapshot != NULL);
+ Assert(ActiveSnapshot->as_snap->active_count == 1);
+ Assert(ActiveSnapshot->as_snap->regd_count == 0);
+
+ /*
+ * Don't allow modification of the active snapshot during parallel
+ * operation. We share the snapshot to worker backends at the beginning
+ * of parallel operation, so any change to the snapshot can lead to
+ * inconsistencies. We have other defenses against
+ * CommandCounterIncrement, but there are a few places that call this
+ * directly, so we put an additional guard here.
+ */
+ save_curcid = ActiveSnapshot->as_snap->curcid;
+ curcid = GetCurrentCommandId(false);
+ if (IsInParallelMode() && save_curcid != curcid)
+ elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+ ActiveSnapshot->as_snap->curcid = curcid;
+}
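
    Editor's sketch of the pattern PushCopiedSnapshot() and this function are
    meant for (compare the utility and SPI call sites): push a private copy, then
    refresh its curcid after each command-counter bump.

        PushCopiedSnapshot(GetTransactionSnapshot());
        /* ... run one statement ... */
        CommandCounterIncrement();
        UpdateActiveSnapshotCommandId();
        /* ... run the next statement with the same snapshot but a newer curcid ... */
        PopActiveSnapshot();
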
+
+/*
+ * PopActiveSnapshot
+ *
+ * Remove the topmost snapshot from the active snapshot stack, decrementing the
+ * reference count, and free it if this was the last reference.
+ */
+void
+PopActiveSnapshot(void)
+{
+ ActiveSnapshotElt *newstack;
+
+ newstack = ActiveSnapshot->as_next;
+
+ Assert(ActiveSnapshot->as_snap->active_count > 0);
+
+ ActiveSnapshot->as_snap->active_count--;
+
+ if (ActiveSnapshot->as_snap->active_count == 0 &&
+ ActiveSnapshot->as_snap->regd_count == 0)
+ FreeSnapshot(ActiveSnapshot->as_snap);
+
+ pfree(ActiveSnapshot);
+ ActiveSnapshot = newstack;
+ if (ActiveSnapshot == NULL)
+ OldestActiveSnapshot = NULL;
+
+ SnapshotResetXmin();
+}
+
+/*
+ * GetActiveSnapshot
+ * Return the topmost snapshot in the Active stack.
+ */
+Snapshot
+GetActiveSnapshot(void)
+{
+ Assert(ActiveSnapshot != NULL);
+
+ return ActiveSnapshot->as_snap;
+}
+
+/*
+ * ActiveSnapshotSet
+ * Return whether there is at least one snapshot in the Active stack
+ */
+bool
+ActiveSnapshotSet(void)
+{
+ return ActiveSnapshot != NULL;
+}
+
+/*
+ * RegisterSnapshot
+ * Register a snapshot as being in use by the current resource owner
+ *
+ * If InvalidSnapshot is passed, it is not registered.
+ */
+Snapshot
+RegisterSnapshot(Snapshot snapshot)
+{
+ if (snapshot == InvalidSnapshot)
+ return InvalidSnapshot;
+
+ return RegisterSnapshotOnOwner(snapshot, CurrentResourceOwner);
+}
+
+/*
+ * RegisterSnapshotOnOwner
+ * As above, but use the specified resource owner
+ */
+Snapshot
+RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
+{
+ Snapshot snap;
+
+ if (snapshot == InvalidSnapshot)
+ return InvalidSnapshot;
+
+ /* Static snapshot? Create a persistent copy */
+ snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
+
+ /* and tell resowner.c about it */
+ ResourceOwnerEnlargeSnapshots(owner);
+ snap->regd_count++;
+ ResourceOwnerRememberSnapshot(owner, snap);
+
+ if (snap->regd_count == 1)
+ pairingheap_add(&RegisteredSnapshots, &snap->ph_node);
+
+ return snap;
+}
+
+/*
+ * UnregisterSnapshot
+ *
+ * Decrement the reference count of a snapshot, remove the corresponding
+ * reference from CurrentResourceOwner, and free the snapshot if no more
+ * references remain.
+ */
+void
+UnregisterSnapshot(Snapshot snapshot)
+{
+ if (snapshot == NULL)
+ return;
+
+ UnregisterSnapshotFromOwner(snapshot, CurrentResourceOwner);
+}
+
+/*
+ * UnregisterSnapshotFromOwner
+ * As above, but use the specified resource owner
+ */
+void
+UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
+{
+ if (snapshot == NULL)
+ return;
+
+ Assert(snapshot->regd_count > 0);
+ Assert(!pairingheap_is_empty(&RegisteredSnapshots));
+
+ ResourceOwnerForgetSnapshot(owner, snapshot);
+
+ snapshot->regd_count--;
+ if (snapshot->regd_count == 0)
+ pairingheap_remove(&RegisteredSnapshots, &snapshot->ph_node);
+
+ if (snapshot->regd_count == 0 && snapshot->active_count == 0)
+ {
+ FreeSnapshot(snapshot);
+ SnapshotResetXmin();
+ }
+}
+
+/*
+ * Comparison function for RegisteredSnapshots heap. Snapshots are ordered
+ * by xmin, so that the snapshot with smallest xmin is at the top.
+ */
+static int
+xmin_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+ const SnapshotData *asnap = pairingheap_const_container(SnapshotData, ph_node, a);
+ const SnapshotData *bsnap = pairingheap_const_container(SnapshotData, ph_node, b);
+
+ if (TransactionIdPrecedes(asnap->xmin, bsnap->xmin))
+ return 1;
+ else if (TransactionIdFollows(asnap->xmin, bsnap->xmin))
+ return -1;
+ else
+ return 0;
+}
+
+/*
+ * SnapshotResetXmin
+ *
+ * If there are no more snapshots, we can reset our PGPROC->xmin to
+ * InvalidTransactionId. Note we can do this without locking because we assume
+ * that storing an Xid is atomic.
+ *
+ * Even if there are some remaining snapshots, we may be able to advance our
+ * PGPROC->xmin to some degree. This typically happens when a portal is
+ * dropped. For efficiency, we only consider recomputing PGPROC->xmin when
+ * the active snapshot stack is empty; this allows us not to need to track
+ * which active snapshot is oldest.
+ *
+ * Note: it's tempting to use GetOldestSnapshot() here so that we can include
+ * active snapshots in the calculation. However, that compares by LSN not
+ * xmin so it's not entirely clear that it's the same thing. Also, we'd be
+ * critically dependent on the assumption that the bottommost active snapshot
+ * stack entry has the oldest xmin. (Current uses of GetOldestSnapshot() are
+ * not actually critical, but this would be.)
+ */
+static void
+SnapshotResetXmin(void)
+{
+ Snapshot minSnapshot;
+
+ if (ActiveSnapshot != NULL)
+ return;
+
+ if (pairingheap_is_empty(&RegisteredSnapshots))
+ {
+ MyProc->xmin = InvalidTransactionId;
+ return;
+ }
+
+ minSnapshot = pairingheap_container(SnapshotData, ph_node,
+ pairingheap_first(&RegisteredSnapshots));
+
+ if (TransactionIdPrecedes(MyProc->xmin, minSnapshot->xmin))
+ MyProc->xmin = minSnapshot->xmin;
+}
+
+/*
+ * AtSubCommit_Snapshot
+ */
+void
+AtSubCommit_Snapshot(int level)
+{
+ ActiveSnapshotElt *active;
+
+ /*
+ * Relabel the active snapshots set in this subtransaction as though they
+ * are owned by the parent subxact.
+ */
+ for (active = ActiveSnapshot; active != NULL; active = active->as_next)
+ {
+ if (active->as_level < level)
+ break;
+ active->as_level = level - 1;
+ }
+}
+
+/*
+ * AtSubAbort_Snapshot
+ * Clean up snapshots after a subtransaction abort
+ */
+void
+AtSubAbort_Snapshot(int level)
+{
+ /* Forget the active snapshots set by this subtransaction */
+ while (ActiveSnapshot && ActiveSnapshot->as_level >= level)
+ {
+ ActiveSnapshotElt *next;
+
+ next = ActiveSnapshot->as_next;
+
+ /*
+ * Decrement the snapshot's active count. If it's still registered or
+ * marked as active by an outer subtransaction, we can't free it yet.
+ */
+ Assert(ActiveSnapshot->as_snap->active_count >= 1);
+ ActiveSnapshot->as_snap->active_count -= 1;
+
+ if (ActiveSnapshot->as_snap->active_count == 0 &&
+ ActiveSnapshot->as_snap->regd_count == 0)
+ FreeSnapshot(ActiveSnapshot->as_snap);
+
+ /* and free the stack element */
+ pfree(ActiveSnapshot);
+
+ ActiveSnapshot = next;
+ if (ActiveSnapshot == NULL)
+ OldestActiveSnapshot = NULL;
+ }
+
+ SnapshotResetXmin();
+}
+
+/*
+ * AtEOXact_Snapshot
+ * Snapshot manager's cleanup function for end of transaction
+ */
+void
+AtEOXact_Snapshot(bool isCommit, bool resetXmin)
+{
+ /*
+ * In transaction-snapshot mode we must release our privately-managed
+ * reference to the transaction snapshot. We must remove it from
+ * RegisteredSnapshots to keep the check below happy. But we don't bother
+ * to do FreeSnapshot, for two reasons: the memory will go away with
+ * TopTransactionContext anyway, and if someone has left the snapshot
+ * stacked as active, we don't want the code below to be chasing through a
+ * dangling pointer.
+ */
+ if (FirstXactSnapshot != NULL)
+ {
+ Assert(FirstXactSnapshot->regd_count > 0);
+ Assert(!pairingheap_is_empty(&RegisteredSnapshots));
+ pairingheap_remove(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
+ }
+ FirstXactSnapshot = NULL;
+
+ /*
+ * If we exported any snapshots, clean them up.
+ */
+ if (exportedSnapshots != NIL)
+ {
+ ListCell *lc;
+
+ /*
+ * Get rid of the files. Unlink failure is only a WARNING because (1)
+ * it's too late to abort the transaction, and (2) leaving a leaked
+ * file around has little real consequence anyway.
+ *
+ * We also need to remove the snapshots from RegisteredSnapshots to
+ * prevent a warning below.
+ *
+ * As with the FirstXactSnapshot, we don't need to free resources of
+ * the snapshot itself as it will go away with the memory context.
+ */
+ foreach(lc, exportedSnapshots)
+ {
+ ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc);
+
+ if (unlink(esnap->snapfile))
+ elog(WARNING, "could not unlink file \"%s\": %m",
+ esnap->snapfile);
+
+ pairingheap_remove(&RegisteredSnapshots,
+ &esnap->snapshot->ph_node);
+ }
+
+ exportedSnapshots = NIL;
+ }
+
+ /* Drop catalog snapshot if any */
+ InvalidateCatalogSnapshot();
+
+ /* On commit, complain about leftover snapshots */
+ if (isCommit)
+ {
+ ActiveSnapshotElt *active;
+
+ if (!pairingheap_is_empty(&RegisteredSnapshots))
+ elog(WARNING, "registered snapshots seem to remain after cleanup");
+
+ /* complain about unpopped active snapshots */
+ for (active = ActiveSnapshot; active != NULL; active = active->as_next)
+ elog(WARNING, "snapshot %p still active", active);
+ }
+
+ /*
+ * And reset our state. We don't need to free the memory explicitly --
+ * it'll go away with TopTransactionContext.
+ */
+ ActiveSnapshot = NULL;
+ OldestActiveSnapshot = NULL;
+ pairingheap_reset(&RegisteredSnapshots);
+
+ CurrentSnapshot = NULL;
+ SecondarySnapshot = NULL;
+
+ FirstSnapshotSet = false;
+
+ /*
+ * During normal commit processing, we call ProcArrayEndTransaction() to
+ * reset the MyProc->xmin. That call happens prior to the call to
+ * AtEOXact_Snapshot(), so we need not touch xmin here at all.
+ */
+ if (resetXmin)
+ SnapshotResetXmin();
+
+ Assert(resetXmin || MyProc->xmin == 0);
+}
+
+
+/*
+ * ExportSnapshot
+ * Export the snapshot to a file so that other backends can import it.
+ * Returns the token (the file name) that can be used to import this
+ * snapshot.
+ */
+char *
+ExportSnapshot(Snapshot snapshot)
+{
+ TransactionId topXid;
+ TransactionId *children;
+ ExportedSnapshot *esnap;
+ int nchildren;
+ int addTopXid;
+ StringInfoData buf;
+ FILE *f;
+ int i;
+ MemoryContext oldcxt;
+ char path[MAXPGPATH];
+ char pathtmp[MAXPGPATH];
+
+ /*
+ * It's tempting to call RequireTransactionBlock here, since it's not very
+ * useful to export a snapshot that will disappear immediately afterwards.
+ * However, we haven't got enough information to do that, since we don't
+ * know if we're at top level or not. For example, we could be inside a
+ * plpgsql function that is going to fire off other transactions via
+ * dblink. Rather than disallow perfectly legitimate usages, don't make a
+ * check.
+ *
+ * Also note that we don't make any restriction on the transaction's
+ * isolation level; however, importers must check the level if they are
+ * serializable.
+ */
+
+ /*
+ * Get our transaction ID if there is one, to include in the snapshot.
+ */
+ topXid = GetTopTransactionIdIfAny();
+
+ /*
+ * We cannot export a snapshot from a subtransaction because there's no
+ * easy way for importers to verify that the same subtransaction is still
+ * running.
+ */
+ if (IsSubTransaction())
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("cannot export a snapshot from a subtransaction")));
+
+ /*
+ * We do however allow previous committed subtransactions to exist.
+ * Importers of the snapshot must see them as still running, so get their
+ * XIDs to add them to the snapshot.
+ */
+ nchildren = xactGetCommittedChildren(&children);
+
+ /*
+ * Generate file path for the snapshot. We start numbering of snapshots
+ * inside the transaction from 1.
+ */
+ snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
+ MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1);
+
+ /*
+ * Copy the snapshot into TopTransactionContext, add it to the
+ * exportedSnapshots list, and mark it pseudo-registered. We do this to
+ * ensure that the snapshot's xmin is honored for the rest of the
+ * transaction.
+ */
+ snapshot = CopySnapshot(snapshot);
+
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+ esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
+ esnap->snapfile = pstrdup(path);
+ esnap->snapshot = snapshot;
+ exportedSnapshots = lappend(exportedSnapshots, esnap);
+ MemoryContextSwitchTo(oldcxt);
+
+ snapshot->regd_count++;
+ pairingheap_add(&RegisteredSnapshots, &snapshot->ph_node);
+
+ /*
+ * Fill buf with a text serialization of the snapshot, plus identification
+ * data about this transaction. The format expected by ImportSnapshot is
+ * pretty rigid: each line must be fieldname:value.
+ */
+ initStringInfo(&buf);
+
+ appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid);
+ appendStringInfo(&buf, "pid:%d\n", MyProcPid);
+ appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
+ appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);
+ appendStringInfo(&buf, "ro:%d\n", XactReadOnly);
+
+ appendStringInfo(&buf, "xmin:%u\n", snapshot->xmin);
+ appendStringInfo(&buf, "xmax:%u\n", snapshot->xmax);
+
+ /*
+ * We must include our own top transaction ID in the top-xid data, since
+ * by definition we will still be running when the importing transaction
+ * adopts the snapshot, but GetSnapshotData never includes our own XID in
+ * the snapshot. (There must, therefore, be enough room to add it.)
+ *
+ * However, it could be that our topXid is after the xmax, in which case
+ * we shouldn't include it because xip[] members are expected to be before
+ * xmax. (We need not make the same check for subxip[] members, see
+ * snapshot.h.)
+ */
+ addTopXid = (TransactionIdIsValid(topXid) &&
+ TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
+ appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid);
+ for (i = 0; i < snapshot->xcnt; i++)
+ appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]);
+ if (addTopXid)
+ appendStringInfo(&buf, "xip:%u\n", topXid);
+
+ /*
+ * Similarly, we add our subcommitted child XIDs to the subxid data. Here,
+ * we have to cope with possible overflow.
+ */
+ if (snapshot->suboverflowed ||
+ snapshot->subxcnt + nchildren > GetMaxSnapshotSubxidCount())
+ appendStringInfoString(&buf, "sof:1\n");
+ else
+ {
+ appendStringInfoString(&buf, "sof:0\n");
+ appendStringInfo(&buf, "sxcnt:%d\n", snapshot->subxcnt + nchildren);
+ for (i = 0; i < snapshot->subxcnt; i++)
+ appendStringInfo(&buf, "sxp:%u\n", snapshot->subxip[i]);
+ for (i = 0; i < nchildren; i++)
+ appendStringInfo(&buf, "sxp:%u\n", children[i]);
+ }
+ appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
+
+ /*
+ * Now write the text representation into a file. We first write to a
+ * ".tmp" filename, and rename to final filename if no error. This
+ * ensures that no other backend can read an incomplete file
+ * (ImportSnapshot won't allow it because of its valid-characters check).
+ */
+ snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path);
+ if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", pathtmp)));
+
+ if (fwrite(buf.data, buf.len, 1, f) != 1)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+ /* no fsync() since file need not survive a system crash */
+
+ if (FreeFile(f))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", pathtmp)));
+
+ /*
+ * Now that we have written everything into a .tmp file, rename the file
+ * to remove the .tmp suffix.
+ */
+ if (rename(pathtmp, path) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ pathtmp, path)));
+
+ /*
+ * The basename of the file is what we return from pg_export_snapshot().
+ * It's already in path in a textual format and we know that the path
+ * starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
+ * and pstrdup it so as not to return the address of a local variable.
+ */
+ return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
+}
+
+/*
+ * pg_export_snapshot
+ * SQL-callable wrapper for ExportSnapshot.
+ */
+Datum
+pg_export_snapshot(PG_FUNCTION_ARGS)
+{
+ char *snapshotName;
+
+ snapshotName = ExportSnapshot(GetActiveSnapshot());
+ PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
+}
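
    Editor's sketch of the mechanism end to end, as a client would drive it over
    libpq (error handling elided; pg_dump's parallel mode synchronizes its
    workers this way):

        #include <libpq-fe.h>
        #include <stdio.h>

        static void
        share_snapshot(PGconn *exporter, PGconn *importer)
        {
            char        cmd[128];
            PGresult   *res;

            PQclear(PQexec(exporter, "BEGIN ISOLATION LEVEL REPEATABLE READ"));
            res = PQexec(exporter, "SELECT pg_export_snapshot()");
            snprintf(cmd, sizeof(cmd), "SET TRANSACTION SNAPSHOT '%s'",
                     PQgetvalue(res, 0, 0));
            PQclear(res);

            PQclear(PQexec(importer, "BEGIN ISOLATION LEVEL REPEATABLE READ"));
            PQclear(PQexec(importer, cmd));     /* must precede any query in this xact */
        }
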
+
+
+/*
+ * Parsing subroutines for ImportSnapshot: parse a line with the given
+ * prefix followed by a value, and advance *s to the next line. The
+ * filename is provided for use in error messages.
+ */
+static int
+parseIntFromText(const char *prefix, char **s, const char *filename)
+{
+ char *ptr = *s;
+ int prefixlen = strlen(prefix);
+ int val;
+
+ if (strncmp(ptr, prefix, prefixlen) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr += prefixlen;
+ if (sscanf(ptr, "%d", &val) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr = strchr(ptr, '\n');
+ if (!ptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ *s = ptr + 1;
+ return val;
+}
+
+static TransactionId
+parseXidFromText(const char *prefix, char **s, const char *filename)
+{
+ char *ptr = *s;
+ int prefixlen = strlen(prefix);
+ TransactionId val;
+
+ if (strncmp(ptr, prefix, prefixlen) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr += prefixlen;
+ if (sscanf(ptr, "%u", &val) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr = strchr(ptr, '\n');
+ if (!ptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ *s = ptr + 1;
+ return val;
+}
+
+static void
+parseVxidFromText(const char *prefix, char **s, const char *filename,
+ VirtualTransactionId *vxid)
+{
+ char *ptr = *s;
+ int prefixlen = strlen(prefix);
+
+ if (strncmp(ptr, prefix, prefixlen) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr += prefixlen;
+ if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ ptr = strchr(ptr, '\n');
+ if (!ptr)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", filename)));
+ *s = ptr + 1;
+}
+
+/*
+ * ImportSnapshot
+ * Import a previously exported snapshot. The argument should be a
+ * filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
+ * This is called by "SET TRANSACTION SNAPSHOT 'foo'".
+ */
+void
+ImportSnapshot(const char *idstr)
+{
+ char path[MAXPGPATH];
+ FILE *f;
+ struct stat stat_buf;
+ char *filebuf;
+ int xcnt;
+ int i;
+ VirtualTransactionId src_vxid;
+ int src_pid;
+ Oid src_dbid;
+ int src_isolevel;
+ bool src_readonly;
+ SnapshotData snapshot;
+
+ /*
+ * Must be at top level of a fresh transaction. Note in particular that
+ * we check we haven't acquired an XID --- if we have, it's conceivable
+ * that the snapshot would show it as not running, making for very screwy
+ * behavior.
+ */
+ if (FirstSnapshotSet ||
+ GetTopTransactionIdIfAny() != InvalidTransactionId ||
+ IsSubTransaction())
+ ereport(ERROR,
+ (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
+ errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
+
+ /*
+ * If we are in read committed mode then the next query would execute with
+ * a new snapshot thus making this function call quite useless.
+ */
+ if (!IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
+
+ /*
+ * Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
+ * this mainly to prevent reading arbitrary files.
+ */
+ if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid snapshot identifier: \"%s\"", idstr)));
+
+ /* OK, read the file */
+ snprintf(path, MAXPGPATH, SNAPSHOT_EXPORT_DIR "/%s", idstr);
+
+ f = AllocateFile(path, PG_BINARY_R);
+ if (!f)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid snapshot identifier: \"%s\"", idstr)));
+
+ /* get the size of the file so that we know how much memory we need */
+ if (fstat(fileno(f), &stat_buf))
+ elog(ERROR, "could not stat file \"%s\": %m", path);
+
+ /* and read the file into a palloc'd string */
+ filebuf = (char *) palloc(stat_buf.st_size + 1);
+ if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
+ elog(ERROR, "could not read file \"%s\": %m", path);
+
+ filebuf[stat_buf.st_size] = '\0';
+
+ FreeFile(f);
+
+ /*
+ * Construct a snapshot struct by parsing the file content.
+ */
+ memset(&snapshot, 0, sizeof(snapshot));
+
+ parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
+ src_pid = parseIntFromText("pid:", &filebuf, path);
+ /* we abuse parseXidFromText a bit here ... */
+ src_dbid = parseXidFromText("dbid:", &filebuf, path);
+ src_isolevel = parseIntFromText("iso:", &filebuf, path);
+ src_readonly = parseIntFromText("ro:", &filebuf, path);
+
+ snapshot.snapshot_type = SNAPSHOT_MVCC;
+
+ snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
+ snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
+
+ snapshot.xcnt = xcnt = parseIntFromText("xcnt:", &filebuf, path);
+
+ /* sanity-check the xid count before palloc */
+ if (xcnt < 0 || xcnt > GetMaxSnapshotXidCount())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ snapshot.xip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+ for (i = 0; i < xcnt; i++)
+ snapshot.xip[i] = parseXidFromText("xip:", &filebuf, path);
+
+ snapshot.suboverflowed = parseIntFromText("sof:", &filebuf, path);
+
+ if (!snapshot.suboverflowed)
+ {
+ snapshot.subxcnt = xcnt = parseIntFromText("sxcnt:", &filebuf, path);
+
+ /* sanity-check the xid count before palloc */
+ if (xcnt < 0 || xcnt > GetMaxSnapshotSubxidCount())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ snapshot.subxip = (TransactionId *) palloc(xcnt * sizeof(TransactionId));
+ for (i = 0; i < xcnt; i++)
+ snapshot.subxip[i] = parseXidFromText("sxp:", &filebuf, path);
+ }
+ else
+ {
+ snapshot.subxcnt = 0;
+ snapshot.subxip = NULL;
+ }
+
+ snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
+
+ /*
+ * Do some additional sanity checking, just to protect ourselves. We
+ * don't trouble to check the array elements, just the most critical
+ * fields.
+ */
+ if (!VirtualTransactionIdIsValid(src_vxid) ||
+ !OidIsValid(src_dbid) ||
+ !TransactionIdIsNormal(snapshot.xmin) ||
+ !TransactionIdIsNormal(snapshot.xmax))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+ errmsg("invalid snapshot data in file \"%s\"", path)));
+
+ /*
+ * If we're serializable, the source transaction must be too, otherwise
+ * predicate.c has problems (SxactGlobalXmin could go backwards). Also, a
+ * non-read-only transaction can't adopt a snapshot from a read-only
+ * transaction, as predicate.c handles the cases very differently.
+ */
+ if (IsolationIsSerializable())
+ {
+ if (src_isolevel != XACT_SERIALIZABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
+ if (src_readonly && !XactReadOnly)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only transaction")));
+ }
+
+ /*
+ * We cannot import a snapshot that was taken in a different database,
+ * because vacuum calculates OldestXmin on a per-database basis; so the
+ * source transaction's xmin doesn't protect us from data loss. This
+ * restriction could be removed if the source transaction were to mark its
+ * xmin as being globally applicable. But that would require some
+ * additional syntax, since that has to be known when the snapshot is
+ * initially taken. (See pgsql-hackers discussion of 2011-10-21.)
+ */
+ if (src_dbid != MyDatabaseId)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot import a snapshot from a different database")));
+
+ /* OK, install the snapshot */
+ SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL);
+}
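+
+/*
+ * Illustrative SQL-level usage of the above (the snapshot identifier shown
+ * is hypothetical; a real one is obtained from pg_export_snapshot() in the
+ * exporting session):
+ *
+ *		BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+ *		SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
+ */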
+
+/*
+ * XactHasExportedSnapshots
+ * Test whether current transaction has exported any snapshots.
+ */
+bool
+XactHasExportedSnapshots(void)
+{
+ return (exportedSnapshots != NIL);
+}
+
+/*
+ * DeleteAllExportedSnapshotFiles
+ * Clean up any files that have been left behind by a crashed backend
+ * that had exported snapshots before it died.
+ *
+ * This should be called during database startup or crash recovery.
+ */
+void
+DeleteAllExportedSnapshotFiles(void)
+{
+ char buf[MAXPGPATH + sizeof(SNAPSHOT_EXPORT_DIR)];
+ DIR *s_dir;
+ struct dirent *s_de;
+
+ /*
+ * Problems in reading the directory, or unlinking files, are reported at
+ * LOG level. Since we're running in the startup process, ERROR level
+ * would prevent database start, and it's not important enough for that.
+ */
+ s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR);
+
+ while ((s_de = ReadDirExtended(s_dir, SNAPSHOT_EXPORT_DIR, LOG)) != NULL)
+ {
+ if (strcmp(s_de->d_name, ".") == 0 ||
+ strcmp(s_de->d_name, "..") == 0)
+ continue;
+
+ snprintf(buf, sizeof(buf), SNAPSHOT_EXPORT_DIR "/%s", s_de->d_name);
+
+ if (unlink(buf) != 0)
+ ereport(LOG,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m", buf)));
+ }
+
+ FreeDir(s_dir);
+}
+
+/*
+ * ThereAreNoPriorRegisteredSnapshots
+ * Is the registered snapshot count less than or equal to one?
+ *
+ * Don't use this to settle important decisions. While zero registrations and
+ * no ActiveSnapshot would confirm a certain idleness, the system makes no
+ * guarantees about the significance of one registered snapshot.
+ */
+bool
+ThereAreNoPriorRegisteredSnapshots(void)
+{
+ if (pairingheap_is_empty(&RegisteredSnapshots) ||
+ pairingheap_is_singular(&RegisteredSnapshots))
+ return true;
+
+ return false;
+}
+
+/*
+ * HaveRegisteredOrActiveSnapshot
+ * Is there any registered or active snapshot?
+ *
+ * NB: Unless pushed or active, the cached catalog snapshot will not cause
+ * this function to return true. That allows this function to be used in
+ * checks enforcing a longer-lived snapshot.
+ */
+bool
+HaveRegisteredOrActiveSnapshot(void)
+{
+ if (ActiveSnapshot != NULL)
+ return true;
+
+ /*
+ * The catalog snapshot is in RegisteredSnapshots when valid, but can be
+ * removed at any time due to invalidation processing. If explicitly
+ * registered more than one snapshot has to be in RegisteredSnapshots.
+ */
+ if (CatalogSnapshot != NULL &&
+ pairingheap_is_singular(&RegisteredSnapshots))
+ return false;
+
+ return !pairingheap_is_empty(&RegisteredSnapshots);
+}
+
+
+/*
+ * Return a timestamp that is exactly on a minute boundary.
+ *
+ * If the argument is already aligned, return that value; otherwise move to
+ * the next minute boundary following the given time.
+ */
+static TimestampTz
+AlignTimestampToMinuteBoundary(TimestampTz ts)
+{
+ TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
+
+ return retval - (retval % USECS_PER_MINUTE);
+}
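+
+/*
+ * Worked example for AlignTimestampToMinuteBoundary (illustrative numbers):
+ * with USECS_PER_MINUTE = 60000000, a ts of 90000000 (1.5 minutes) gives
+ * retval = 149999999, and 149999999 - (149999999 % 60000000) = 120000000,
+ * i.e. the 2-minute boundary; an already-aligned ts is returned unchanged.
+ */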
+
+/*
+ * Get current timestamp for snapshots
+ *
+ * This is basically GetCurrentTimestamp(), but with a guarantee that
+ * the result never moves backward.
+ */
+TimestampTz
+GetSnapshotCurrentTimestamp(void)
+{
+ TimestampTz now = GetCurrentTimestamp();
+
+ /*
+ * Don't let time move backward; if it hasn't advanced, use the old value.
+ */
+ SpinLockAcquire(&oldSnapshotControl->mutex_current);
+ if (now <= oldSnapshotControl->current_timestamp)
+ now = oldSnapshotControl->current_timestamp;
+ else
+ oldSnapshotControl->current_timestamp = now;
+ SpinLockRelease(&oldSnapshotControl->mutex_current);
+
+ return now;
+}
+
+/*
+ * Get the timestamp through which vacuum may have processed, based on the
+ * last stored value of threshold_timestamp.
+ *
+ * XXX: So far, we never trust that a 64-bit value can be read atomically; if
+ * that ever changes, we could get rid of the spinlock here.
+ */
+TimestampTz
+GetOldSnapshotThresholdTimestamp(void)
+{
+ TimestampTz threshold_timestamp;
+
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ return threshold_timestamp;
+}
+
+void
+SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
+{
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ Assert(oldSnapshotControl->threshold_timestamp <= ts);
+ Assert(TransactionIdPrecedesOrEquals(oldSnapshotControl->threshold_xid, xlimit));
+ oldSnapshotControl->threshold_timestamp = ts;
+ oldSnapshotControl->threshold_xid = xlimit;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * XXX: Magic to keep the old_snapshot_threshold tests appearing to "work".
+ * They are currently broken, and discussion of what to do about them is
+ * ongoing. See
+ * https://www.postgresql.org/message-id/20200403001235.e6jfdll3gh2ygbuc%40alap3.anarazel.de
+ */
+void
+SnapshotTooOldMagicForTest(void)
+{
+ TimestampTz ts = GetSnapshotCurrentTimestamp();
+
+ Assert(old_snapshot_threshold == 0);
+
+ ts -= 5 * USECS_PER_SEC;
+
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ oldSnapshotControl->threshold_timestamp = ts;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+}
+
+/*
+ * If there is a valid mapping for the timestamp, set *xlimitp to the mapped
+ * xid. Returns whether there is such a mapping.
+ */
+static bool
+GetOldSnapshotFromTimeMapping(TimestampTz ts, TransactionId *xlimitp)
+{
+ bool in_mapping = false;
+
+ Assert(ts == AlignTimestampToMinuteBoundary(ts));
+
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);
+
+ if (oldSnapshotControl->count_used > 0
+ && ts >= oldSnapshotControl->head_timestamp)
+ {
+ int offset;
+
+ offset = ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE);
+ if (offset > oldSnapshotControl->count_used - 1)
+ offset = oldSnapshotControl->count_used - 1;
+ offset = (oldSnapshotControl->head_offset + offset)
+ % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
+
+ *xlimitp = oldSnapshotControl->xid_by_minute[offset];
+
+ in_mapping = true;
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+
+ return in_mapping;
+}
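+
+/*
+ * Worked example for the lookup above (illustrative numbers): with
+ * head_timestamp at minute 0, head_offset = 0 and count_used = 3, a ts at
+ * minute 5 yields a raw offset of 5, which is clamped to count_used - 1 = 2,
+ * so the xid from the newest (tail) bucket is used.
+ */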
+
+/*
+ * TransactionIdLimitedForOldSnapshots
+ *
+ * Apply old snapshot limit. This is intended to be called for page pruning
+ * and table vacuuming, to allow old_snapshot_threshold to override the normal
+ * global xmin value. Actual testing for snapshot too old will be based on
+ * whether a snapshot timestamp is prior to the threshold timestamp set in
+ * this function.
+ *
+ * If the limited horizon allows a cleanup action that otherwise would not be
+ * possible, SetOldSnapshotThresholdTimestamp(*limit_ts, *limit_xid) needs to
+ * be called before that cleanup action.
+ */
+bool
+TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ Relation relation,
+ TransactionId *limit_xid,
+ TimestampTz *limit_ts)
+{
+ TimestampTz ts;
+ TransactionId xlimit = recentXmin;
+ TransactionId latest_xmin;
+ TimestampTz next_map_update_ts;
+ TimestampTz threshold_timestamp;
+ TransactionId threshold_xid;
+
+ Assert(TransactionIdIsNormal(recentXmin));
+ Assert(OldSnapshotThresholdActive());
+ Assert(limit_ts != NULL && limit_xid != NULL);
+
+ /*
+ * TestForOldSnapshot() assumes early pruning advances the page LSN, so we
+ * can't prune early when skipping WAL.
+ */
+ if (!RelationAllowsEarlyPruning(relation) || !RelationNeedsWAL(relation))
+ return false;
+
+ ts = GetSnapshotCurrentTimestamp();
+
+ SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+ latest_xmin = oldSnapshotControl->latest_xmin;
+ next_map_update_ts = oldSnapshotControl->next_map_update;
+ SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+ /*
+ * Zero threshold always overrides to latest xmin, if valid. Without some
+ * heuristic it will find its own snapshot too old on, for example, a
+ * simple UPDATE -- which would make it useless for most testing, but
+ * there is no principled way to ensure that it doesn't fail in this way.
+ * Use a five-second delay to try to get useful testing behavior, but this
+ * may need adjustment.
+ */
+ if (old_snapshot_threshold == 0)
+ {
+ if (TransactionIdPrecedes(latest_xmin, MyProc->xmin)
+ && TransactionIdFollows(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+
+ ts -= 5 * USECS_PER_SEC;
+ }
+ else
+ {
+ ts = AlignTimestampToMinuteBoundary(ts)
+ - (old_snapshot_threshold * USECS_PER_MINUTE);
+
+ /* Check for fast exit without LW locking. */
+ SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
+ threshold_timestamp = oldSnapshotControl->threshold_timestamp;
+ threshold_xid = oldSnapshotControl->threshold_xid;
+ SpinLockRelease(&oldSnapshotControl->mutex_threshold);
+
+ if (ts == threshold_timestamp)
+ {
+ /*
+ * Current timestamp is in same bucket as the last limit that was
+ * applied. Reuse.
+ */
+ xlimit = threshold_xid;
+ }
+ else if (ts == next_map_update_ts)
+ {
+ /*
+ * FIXME: This branch is super iffy - but that should probably be
+ * fixed separately.
+ */
+ xlimit = latest_xmin;
+ }
+ else if (GetOldSnapshotFromTimeMapping(ts, &xlimit))
+ {
+ }
+
+ /*
+ * Failsafe protection against vacuuming work of active transaction.
+ *
+ * This is not an assertion because we avoid the spinlock for
+ * performance, leaving open the possibility that xlimit could advance
+ * and be more current; but it seems prudent to apply this limit. It
+ * might make pruning a tiny bit less aggressive than it could be, but
+ * protects against data loss bugs.
+ */
+ if (TransactionIdIsNormal(latest_xmin)
+ && TransactionIdPrecedes(latest_xmin, xlimit))
+ xlimit = latest_xmin;
+ }
+
+ if (TransactionIdIsValid(xlimit) &&
+ TransactionIdFollowsOrEquals(xlimit, recentXmin))
+ {
+ *limit_ts = ts;
+ *limit_xid = xlimit;
+
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Take care of the circular buffer that maps time to xid.
+ */
+void
+MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
+{
+ TimestampTz ts;
+ TransactionId latest_xmin;
+ TimestampTz update_ts;
+ bool map_update_required = false;
+
+ /* Never call this function when old snapshot checking is disabled. */
+ Assert(old_snapshot_threshold >= 0);
+
+ ts = AlignTimestampToMinuteBoundary(whenTaken);
+
+ /*
+ * Keep track of the latest xmin seen by any process. Update mapping with
+ * a new value when we have crossed a bucket boundary.
+ */
+ SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
+ latest_xmin = oldSnapshotControl->latest_xmin;
+ update_ts = oldSnapshotControl->next_map_update;
+ if (ts > update_ts)
+ {
+ oldSnapshotControl->next_map_update = ts;
+ map_update_required = true;
+ }
+ if (TransactionIdFollows(xmin, latest_xmin))
+ oldSnapshotControl->latest_xmin = xmin;
+ SpinLockRelease(&oldSnapshotControl->mutex_latest_xmin);
+
+ /* We only needed to update the most recent xmin value. */
+ if (!map_update_required)
+ return;
+
+ /* No further tracking needed for 0 (used for testing). */
+ if (old_snapshot_threshold == 0)
+ return;
+
+ /*
+ * We don't want to do something stupid with unusual values, but we don't
+ * want to litter the log with warnings or break otherwise normal
+ * processing for this feature; so if something seems unreasonable, just
+ * log at DEBUG level and return without doing anything.
+ */
+ if (whenTaken < 0)
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ if (!TransactionIdIsNormal(xmin))
+ {
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with xmin = %lu",
+ (unsigned long) xmin);
+ return;
+ }
+
+ LWLockAcquire(OldSnapshotTimeMapLock, LW_EXCLUSIVE);
+
+ Assert(oldSnapshotControl->head_offset >= 0);
+ Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
+ Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
+ Assert(oldSnapshotControl->count_used >= 0);
+ Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);
+
+ if (oldSnapshotControl->count_used == 0)
+ {
+ /* set up first entry for empty mapping */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->head_timestamp = ts;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ }
+ else if (ts < oldSnapshotControl->head_timestamp)
+ {
+ /* old ts; log it at DEBUG */
+ LWLockRelease(OldSnapshotTimeMapLock);
+ elog(DEBUG1,
+ "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
+ (long) whenTaken);
+ return;
+ }
+ else if (ts <= (oldSnapshotControl->head_timestamp +
+ ((oldSnapshotControl->count_used - 1)
+ * USECS_PER_MINUTE)))
+ {
+ /* existing mapping; advance xid if possible */
+ int bucket = (oldSnapshotControl->head_offset
+ + ((ts - oldSnapshotControl->head_timestamp)
+ / USECS_PER_MINUTE))
+ % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
+
+ if (TransactionIdPrecedes(oldSnapshotControl->xid_by_minute[bucket], xmin))
+ oldSnapshotControl->xid_by_minute[bucket] = xmin;
+ }
+ else
+ {
+ /* We need a new bucket, but it might not be the very next one. */
+ int distance_to_new_tail;
+ int distance_to_current_tail;
+ int advance;
+
+ /*
+ * Our goal is for the new "tail" of the mapping, that is, the entry
+ * which is newest and thus furthest from the "head" entry, to
+ * correspond to "ts". Since there's one entry per minute, the
+ * distance between the current head and the new tail is just the
+ * number of minutes of difference between ts and the current
+ * head_timestamp.
+ *
+ * The distance from the current head to the current tail is one less
+ * than the number of entries in the mapping, because the entry at the
+ * head_offset is for 0 minutes after head_timestamp.
+ *
+ * The difference between these two values is the number of minutes by
+ * which we need to advance the mapping, either adding new entries or
+ * rotating old ones out.
+ */
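+ /*
+ * Illustrative numbers: if head_timestamp is at minute 10 and count_used
+ * is 4 (so the current tail is at minute 13), a ts at minute 16 gives
+ * distance_to_new_tail = 6 and distance_to_current_tail = 3, so we must
+ * advance by 3 buckets.
+ */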
+ distance_to_new_tail =
+ (ts - oldSnapshotControl->head_timestamp) / USECS_PER_MINUTE;
+ distance_to_current_tail =
+ oldSnapshotControl->count_used - 1;
+ advance = distance_to_new_tail - distance_to_current_tail;
+ Assert(advance > 0);
+
+ if (advance >= OLD_SNAPSHOT_TIME_MAP_ENTRIES)
+ {
+ /* Advance is so far that all old data is junk; start over. */
+ oldSnapshotControl->head_offset = 0;
+ oldSnapshotControl->count_used = 1;
+ oldSnapshotControl->xid_by_minute[0] = xmin;
+ oldSnapshotControl->head_timestamp = ts;
+ }
+ else
+ {
+ /* Store the new value in one or more buckets. */
+ int i;
+
+ for (i = 0; i < advance; i++)
+ {
+ if (oldSnapshotControl->count_used == OLD_SNAPSHOT_TIME_MAP_ENTRIES)
+ {
+ /* Map full and new value replaces old head. */
+ int old_head = oldSnapshotControl->head_offset;
+
+ if (old_head == (OLD_SNAPSHOT_TIME_MAP_ENTRIES - 1))
+ oldSnapshotControl->head_offset = 0;
+ else
+ oldSnapshotControl->head_offset = old_head + 1;
+ oldSnapshotControl->xid_by_minute[old_head] = xmin;
+ oldSnapshotControl->head_timestamp += USECS_PER_MINUTE;
+ }
+ else
+ {
+ /* Extend map to unused entry. */
+ int new_tail = (oldSnapshotControl->head_offset
+ + oldSnapshotControl->count_used)
+ % OLD_SNAPSHOT_TIME_MAP_ENTRIES;
+
+ oldSnapshotControl->count_used++;
+ oldSnapshotControl->xid_by_minute[new_tail] = xmin;
+ }
+ }
+ }
+ }
+
+ LWLockRelease(OldSnapshotTimeMapLock);
+}
+
+
+/*
+ * Set up a snapshot that replaces normal catalog snapshots, allowing catalog
+ * access to behave just as it did at a certain point in the past.
+ *
+ * Needed for logical decoding.
+ */
+void
+SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
+{
+ Assert(historic_snapshot != NULL);
+
+ /* setup the timetravel snapshot */
+ HistoricSnapshot = historic_snapshot;
+
+ /* setup (cmin, cmax) lookup hash */
+ tuplecid_data = tuplecids;
+}
+
+
+/*
+ * Make catalog snapshots behave normally again.
+ */
+void
+TeardownHistoricSnapshot(bool is_error)
+{
+ HistoricSnapshot = NULL;
+ tuplecid_data = NULL;
+}
+
+bool
+HistoricSnapshotActive(void)
+{
+ return HistoricSnapshot != NULL;
+}
+
+HTAB *
+HistoricSnapshotGetTupleCids(void)
+{
+ Assert(HistoricSnapshotActive());
+ return tuplecid_data;
+}
+
+/*
+ * EstimateSnapshotSpace
+ * Returns the size needed to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snapshot)
+{
+ Size size;
+
+ Assert(snapshot != InvalidSnapshot);
+ Assert(snapshot->snapshot_type == SNAPSHOT_MVCC);
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = add_size(sizeof(SerializedSnapshotData),
+ mul_size(snapshot->xcnt, sizeof(TransactionId)));
+ if (snapshot->subxcnt > 0 &&
+ (!snapshot->suboverflowed || snapshot->takenDuringRecovery))
+ size = add_size(size,
+ mul_size(snapshot->subxcnt, sizeof(TransactionId)));
+
+ return size;
+}
+
+/*
+ * SerializeSnapshot
+ * Dumps the serialized form of the given snapshot onto the memory
+ * location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, char *start_address)
+{
+ SerializedSnapshotData serialized_snapshot;
+
+ Assert(snapshot->subxcnt >= 0);
+
+ /* Copy all required fields */
+ serialized_snapshot.xmin = snapshot->xmin;
+ serialized_snapshot.xmax = snapshot->xmax;
+ serialized_snapshot.xcnt = snapshot->xcnt;
+ serialized_snapshot.subxcnt = snapshot->subxcnt;
+ serialized_snapshot.suboverflowed = snapshot->suboverflowed;
+ serialized_snapshot.takenDuringRecovery = snapshot->takenDuringRecovery;
+ serialized_snapshot.curcid = snapshot->curcid;
+ serialized_snapshot.whenTaken = snapshot->whenTaken;
+ serialized_snapshot.lsn = snapshot->lsn;
+
+ /*
+ * Ignore the SubXID array if it has overflowed, unless the snapshot was
+ * taken during recovery - in that case, top-level XIDs are in subxip as
+ * well, and we mustn't lose them.
+ */
+ if (serialized_snapshot.suboverflowed && !snapshot->takenDuringRecovery)
+ serialized_snapshot.subxcnt = 0;
+
+ /* Copy struct to possibly-unaligned buffer */
+ memcpy(start_address,
+ &serialized_snapshot, sizeof(SerializedSnapshotData));
+
+ /* Copy XID array */
+ if (snapshot->xcnt > 0)
+ memcpy((TransactionId *) (start_address +
+ sizeof(SerializedSnapshotData)),
+ snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+ /*
+ * Copy SubXID array. Don't bother to copy it if it had overflowed,
+ * though, because it's not used anywhere in that case. Except if it's a
+ * snapshot taken during recovery; all the top-level XIDs are in subxip as
+ * well in that case, so we mustn't lose them.
+ */
+ if (serialized_snapshot.subxcnt > 0)
+ {
+ Size subxipoff = sizeof(SerializedSnapshotData) +
+ snapshot->xcnt * sizeof(TransactionId);
+
+ memcpy((TransactionId *) (start_address + subxipoff),
+ snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+ }
+}
+
+/*
+ * RestoreSnapshot
+ * Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0. The returned snapshot has the copied flag set.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+ SerializedSnapshotData serialized_snapshot;
+ Size size;
+ Snapshot snapshot;
+ TransactionId *serialized_xids;
+
+ memcpy(&serialized_snapshot, start_address,
+ sizeof(SerializedSnapshotData));
+ serialized_xids = (TransactionId *)
+ (start_address + sizeof(SerializedSnapshotData));
+
+ /* We allocate any XID arrays needed in the same palloc block. */
+ size = sizeof(SnapshotData)
+ + serialized_snapshot.xcnt * sizeof(TransactionId)
+ + serialized_snapshot.subxcnt * sizeof(TransactionId);
+
+ /* Copy all required fields */
+ snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+ snapshot->snapshot_type = SNAPSHOT_MVCC;
+ snapshot->xmin = serialized_snapshot.xmin;
+ snapshot->xmax = serialized_snapshot.xmax;
+ snapshot->xip = NULL;
+ snapshot->xcnt = serialized_snapshot.xcnt;
+ snapshot->subxip = NULL;
+ snapshot->subxcnt = serialized_snapshot.subxcnt;
+ snapshot->suboverflowed = serialized_snapshot.suboverflowed;
+ snapshot->takenDuringRecovery = serialized_snapshot.takenDuringRecovery;
+ snapshot->curcid = serialized_snapshot.curcid;
+ snapshot->whenTaken = serialized_snapshot.whenTaken;
+ snapshot->lsn = serialized_snapshot.lsn;
+ snapshot->snapXactCompletionCount = 0;
+
+ /* Copy XIDs, if present. */
+ if (serialized_snapshot.xcnt > 0)
+ {
+ snapshot->xip = (TransactionId *) (snapshot + 1);
+ memcpy(snapshot->xip, serialized_xids,
+ serialized_snapshot.xcnt * sizeof(TransactionId));
+ }
+
+ /* Copy SubXIDs, if present. */
+ if (serialized_snapshot.subxcnt > 0)
+ {
+ snapshot->subxip = ((TransactionId *) (snapshot + 1)) +
+ serialized_snapshot.xcnt;
+ memcpy(snapshot->subxip, serialized_xids + serialized_snapshot.xcnt,
+ serialized_snapshot.subxcnt * sizeof(TransactionId));
+ }
+
+ /* Set the copied flag so that the caller will set refcounts correctly. */
+ snapshot->regd_count = 0;
+ snapshot->active_count = 0;
+ snapshot->copied = true;
+
+ return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
+{
+ SetTransactionSnapshot(snapshot, NULL, InvalidPid, source_pgproc);
+}
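+
+/*
+ * Illustrative calling pattern for the serialization functions above (a
+ * sketch, not taken from any specific caller): the sending backend sizes and
+ * fills a buffer, and the receiving backend reconstructs the snapshot.
+ *
+ *		Size	len = EstimateSnapshotSpace(snap);
+ *		char   *buf = palloc(len);
+ *
+ *		SerializeSnapshot(snap, buf);
+ *		... transfer buf to the other backend ...
+ *		snap = RestoreSnapshot(buf);
+ */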
+
+/*
+ * XidInMVCCSnapshot
+ * Is the given XID still-in-progress according to the snapshot?
+ *
+ * Note: GetSnapshotData never stores either top xid or subxids of our own
+ * backend into a snapshot, so these xids will not be reported as "running"
+ * by this function. This is OK for current uses, because we always check
+ * TransactionIdIsCurrentTransactionId first, except when it's known the
+ * XID could not be ours anyway.
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+ /*
+ * Make a quick range check to eliminate most XIDs without looking at the
+ * xip arrays. Note that this is OK even if we convert a subxact XID to
+ * its parent below, because a subxact with XID < xmin has surely also got
+ * a parent with XID < xmin, while one with XID >= xmax must belong to a
+ * parent that was not yet committed at the time of this snapshot.
+ */
+
+ /* Any xid < xmin is not in-progress */
+ if (TransactionIdPrecedes(xid, snapshot->xmin))
+ return false;
+ /* Any xid >= xmax is in-progress */
+ if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
+ return true;
+
+ /*
+ * Snapshot information is stored slightly differently in snapshots taken
+ * during recovery.
+ */
+ if (!snapshot->takenDuringRecovery)
+ {
+ /*
+ * If the snapshot contains full subxact data, the fastest way to
+ * check things is just to compare the given XID against both subxact
+ * XIDs and top-level XIDs. If the snapshot overflowed, we have to
+ * use pg_subtrans to convert a subxact XID to its parent XID, but
+ * then we need only look at top-level XIDs not subxacts.
+ */
+ if (!snapshot->suboverflowed)
+ {
+ /* we have full data, so search subxip */
+ if (pg_lfind32(xid, snapshot->subxip, snapshot->subxcnt))
+ return true;
+
+ /* not there, fall through to search xip[] */
+ }
+ else
+ {
+ /*
+ * Snapshot overflowed, so convert xid to top-level. This is safe
+ * because we eliminated too-old XIDs above.
+ */
+ xid = SubTransGetTopmostTransaction(xid);
+
+ /*
+ * If xid was indeed a subxact, we might now have an xid < xmin,
+ * so recheck to avoid an array scan. No point in rechecking
+ * xmax.
+ */
+ if (TransactionIdPrecedes(xid, snapshot->xmin))
+ return false;
+ }
+
+ if (pg_lfind32(xid, snapshot->xip, snapshot->xcnt))
+ return true;
+ }
+ else
+ {
+ /*
+ * In recovery we store all xids in the subxip array because it is by
+ * far the bigger array, and we mostly don't know which xids are
+ * top-level and which are subxacts. The xip array is empty.
+ *
+ * We start by searching subtrans, if we overflowed.
+ */
+ if (snapshot->suboverflowed)
+ {
+ /*
+ * Snapshot overflowed, so convert xid to top-level. This is safe
+ * because we eliminated too-old XIDs above.
+ */
+ xid = SubTransGetTopmostTransaction(xid);
+
+ /*
+ * If xid was indeed a subxact, we might now have an xid < xmin,
+ * so recheck to avoid an array scan. No point in rechecking
+ * xmax.
+ */
+ if (TransactionIdPrecedes(xid, snapshot->xmin))
+ return false;
+ }
+
+ /*
+ * We now have either a top-level xid higher than xmin or an
+ * indeterminate xid. We don't know whether it's top level or subxact
+ * but it doesn't matter. If it's present, the xid is still in progress.
+ */
+ if (pg_lfind32(xid, snapshot->subxip, snapshot->subxcnt))
+ return true;
+ }
+
+ return false;
+}
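+
+/*
+ * Illustrative caller shape (a sketch, not any specific function): per the
+ * note above, our own XIDs are checked first, so XidInMVCCSnapshot() only
+ * ever needs to consider other backends' XIDs.
+ *
+ *		if (TransactionIdIsCurrentTransactionId(xid))
+ *			... apply cmin/cmax rules ...
+ *		else if (XidInMVCCSnapshot(xid, snapshot))
+ *			... treat as still in progress, hence not visible ...
+ *		else if (TransactionIdDidCommit(xid))
+ *			... committed before the snapshot was taken, hence visible ...
+ */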