summaryrefslogtreecommitdiffstats
path: root/src/backend/access/transam/commit_ts.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/access/transam/commit_ts.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/transam/commit_ts.c')
-rw-r--r--src/backend/access/transam/commit_ts.c1032
1 files changed, 1032 insertions, 0 deletions
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
new file mode 100644
index 0000000..edbe3cf
--- /dev/null
+++ b/src/backend/access/transam/commit_ts.c
@@ -0,0 +1,1032 @@
+/*-------------------------------------------------------------------------
+ *
+ * commit_ts.c
+ * PostgreSQL commit timestamp manager
+ *
+ * This module is a pg_xact-like system that stores the commit timestamp
+ * for each transaction.
+ *
+ * XLOG interactions: this module generates an XLOG record whenever a new
+ * CommitTs page is initialized to zeroes. Also, one XLOG record is
+ * generated for setting of values when the caller requests it; this allows
+ * us to support values coming from places other than transaction commit.
+ * Other writes of CommitTS come from recording of transaction commit in
+ * xact.c, which generates its own XLOG records for these events and will
+ * re-perform the status update on redo; so we need make no additional XLOG
+ * entry here.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/commit_ts.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/commit_ts.h"
+#include "access/htup_details.h"
+#include "access/slru.h"
+#include "access/transam.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "storage/shmem.h"
+#include "utils/builtins.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+
+/*
+ * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CommitTs page numbering also wraps around at
+ * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
+ * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+ */
+
+/*
+ * On-page representation of the commit timestamp data kept for one
+ * transaction: the commit time plus the replication origin ("node") that
+ * was recorded with it.
+ *
+ * We need 8+2 bytes per xact. Note that enlarging this struct might mean
+ * the largest possible file name is more than 5 chars long; see
+ * SlruScanDirectory.
+ *
+ * SizeOfCommitTimestampEntry is computed as the offset of the last field
+ * plus that field's size, rather than with sizeof(), so it does not count
+ * any padding that follows nodeid in the struct. Entries are stored in a
+ * page at multiples of that size, COMMIT_TS_XACTS_PER_PAGE of them per
+ * BLCKSZ-sized page.
+ *
+ * TransactionIdToCTsPage and TransactionIdToCTsEntry map an xid to the
+ * number of the page holding its entry and to the entry's index within
+ * that page, respectively.
+ */
+typedef struct CommitTimestampEntry
+{
+ TimestampTz time; /* commit timestamp */
+ RepOriginId nodeid; /* replication origin recorded for the commit */
+} CommitTimestampEntry;
+
+#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
+ sizeof(RepOriginId))
+
+#define COMMIT_TS_XACTS_PER_PAGE \
+ (BLCKSZ / SizeOfCommitTimestampEntry)
+
+#define TransactionIdToCTsPage(xid) \
+ ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+#define TransactionIdToCTsEntry(xid) \
+ ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CommitTs control
+ *
+ * CommitTsCtl is the handle passed to every SimpleLru/Slru call made by
+ * this file.
+ */
+static SlruCtlData CommitTsCtlData;
+
+#define CommitTsCtl (&CommitTsCtlData)
+
+/*
+ * We keep a cache of the last value set in shared memory.
+ *
+ * This is also good place to keep the activation status. We keep this
+ * separate from the GUC so that the standby can activate the module if the
+ * primary has it active independently of the value of the GUC.
+ *
+ * This is protected by CommitTsLock. In some places, we use commitTsActive
+ * without acquiring the lock; where this happens, a comment explains the
+ * rationale for it.
+ *
+ * The cache is refreshed by TransactionTreeSetCommitTsData and reset to
+ * "nothing recorded" by CommitTsShmemInit and DeactivateCommitTs.
+ */
+typedef struct CommitTimestampShared
+{
+ TransactionId xidLastCommit; /* top-level xid of the last recorded commit */
+ CommitTimestampEntry dataLastCommit; /* timestamp and origin of that commit */
+ bool commitTsActive; /* is the module currently active? */
+} CommitTimestampShared;
+
+CommitTimestampShared *commitTsShared; /* assigned in CommitTsShmemInit */
+
+
+/* GUC variable */
+bool track_commit_timestamp;
+
+static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+ TransactionId *subxids, TimestampTz ts,
+ RepOriginId nodeid, int pageno);
+static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+ RepOriginId nodeid, int slotno);
+static void error_commit_ts_disabled(void);
+static int ZeroCommitTsPage(int pageno, bool writeXlog);
+static bool CommitTsPagePrecedes(int page1, int page2);
+static void ActivateCommitTs(void);
+static void DeactivateCommitTs(void);
+static void WriteZeroPageXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
+
+/*
+ * TransactionTreeSetCommitTsData
+ *
+ * Record the final commit timestamp of transaction entries in the commit log
+ * for a transaction and its subtransaction tree, as efficiently as possible.
+ *
+ * xid is the top level transaction id.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ * The reason why tracking just the parent xid commit timestamp is not enough
+ * is that the subtrans SLRU does not stay valid across crashes (it's not
+ * permanent) so we need to keep the information about them here. If the
+ * subtrans implementation changes in the future, we might want to revisit the
+ * decision of storing timestamp info for each subxid.
+ *
+ * timestamp and nodeid are the commit time and replication origin that get
+ * stored for xid and for every entry of subxids.
+ *
+ * Besides writing the SLRU entries, this updates the cached "last commit"
+ * data in shared memory (keyed by the top-level xid) and advances
+ * newestCommitTsXid when the newest xid of this batch follows the current
+ * value.
+ *
+ * Does nothing at all if the module is not active.
+ */
+void
+TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+ TransactionId *subxids, TimestampTz timestamp,
+ RepOriginId nodeid)
+{
+ int i;
+ TransactionId headxid;
+ TransactionId newestXact;
+
+ /*
+ * No-op if the module is not active.
+ *
+ * An unlocked read here is fine, because in a standby (the only place
+ * where the flag can change in flight) this routine is only called by the
+ * recovery process, which is also the only process which can change the
+ * flag.
+ */
+ if (!commitTsShared->commitTsActive)
+ return;
+
+ /*
+ * Figure out the latest Xid in this batch: either the last subxid if
+ * there's any, otherwise the parent xid.
+ */
+ if (nsubxids > 0)
+ newestXact = subxids[nsubxids - 1];
+ else
+ newestXact = xid;
+
+ /*
+ * We split the xids to set the timestamp to in groups belonging to the
+ * same SLRU page; the first element in each such set is its head. The
+ * first group has the main XID as the head; subsequent sets use the first
+ * subxid not on the previous page as head. This way, we only have to
+ * lock/modify each SLRU page once.
+ */
+ headxid = xid;
+ i = 0;
+ for (;;)
+ {
+ int pageno = TransactionIdToCTsPage(headxid);
+ int j;
+
+ for (j = i; j < nsubxids; j++)
+ {
+ if (TransactionIdToCTsPage(subxids[j]) != pageno)
+ break;
+ }
+ /* subxids[i..j-1] are on the same page as the head */
+
+ SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
+ pageno);
+
+ /* if we wrote out all subxids, we're done. */
+ if (j >= nsubxids)
+ break;
+
+ /*
+ * Set the new head and skip over it, as well as over the subxids we
+ * just wrote.
+ */
+ headxid = subxids[j];
+ i = j + 1;
+ }
+
+ /* update the cached value in shared memory */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ commitTsShared->xidLastCommit = xid;
+ commitTsShared->dataLastCommit.time = timestamp;
+ commitTsShared->dataLastCommit.nodeid = nodeid;
+
+ /* and move forwards our endpoint, if needed */
+ if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
+ ShmemVariableCache->newestCommitTsXid = newestXact;
+ LWLockRelease(CommitTsLock);
+}
+
+/*
+ * Store one commit timestamp/origin pair for a head xid and a run of
+ * subxids that all live on the same CommitTs page.
+ *
+ * xid is the head of the group, subxids/nsubxids the remaining members, and
+ * pageno the page they share.  The whole group is written while holding
+ * CommitTsSLRULock exclusively, so the update is atomic for this page only.
+ */
+static void
+SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+					 TransactionId *subxids, TimestampTz ts,
+					 RepOriginId nodeid, int pageno)
+{
+	int			slot;
+	int			k = 0;
+
+	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+
+	/* bring the target page into a buffer slot */
+	slot = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+
+	/* head first, then each subxid in array order */
+	TransactionIdSetCommitTs(xid, ts, nodeid, slot);
+	while (k < nsubxids)
+	{
+		TransactionIdSetCommitTs(subxids[k], ts, nodeid, slot);
+		k++;
+	}
+
+	/* the buffer now differs from what is on disk */
+	CommitTsCtl->shared->page_dirty[slot] = true;
+
+	LWLockRelease(CommitTsSLRULock);
+}
+
+/*
+ * Write the commit timestamp entry of a single transaction into an
+ * already-loaded page buffer.
+ *
+ * slotno identifies the buffer slot holding the page that contains xid's
+ * entry.  Only the first SizeOfCommitTimestampEntry bytes of the struct are
+ * copied into the page.
+ *
+ * Must be called with CommitTsSLRULock held
+ */
+static void
+TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+						 RepOriginId nodeid, int slotno)
+{
+	CommitTimestampEntry newEntry;
+	int			byteOffset;
+
+	Assert(TransactionIdIsNormal(xid));
+
+	/* position of this xid's entry within its page */
+	byteOffset = SizeOfCommitTimestampEntry * TransactionIdToCTsEntry(xid);
+
+	newEntry.nodeid = nodeid;
+	newEntry.time = ts;
+
+	memcpy(CommitTsCtl->shared->page_buffer[slotno] + byteOffset,
+		   &newEntry,
+		   SizeOfCommitTimestampEntry);
+}
+
+/*
+ * Interrogate the commit timestamp of a transaction.
+ *
+ * xid is the transaction to look up.  The timestamp value is returned in
+ * *ts (which may not be null), and the origin node for the Xid is returned
+ * in *nodeid, if it's not null.
+ *
+ * The return value indicates whether a commit timestamp record was found
+ * for the given xid; a stored timestamp of zero counts as "not found".
+ *
+ * For special (non-normal) xids and for xids outside the range currently
+ * tracked, this returns false with *ts = 0 and *nodeid =
+ * InvalidRepOriginId.
+ *
+ * Raises an ERROR if xid is InvalidTransactionId, or (via
+ * error_commit_ts_disabled) if the module is not active.
+ */
+bool
+TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+							 RepOriginId *nodeid)
+{
+	int			pageno = TransactionIdToCTsPage(xid);
+	int			entryno = TransactionIdToCTsEntry(xid);
+	int			slotno;
+	CommitTimestampEntry entry;
+	TransactionId oldestCommitTsXid;
+	TransactionId newestCommitTsXid;
+
+	if (!TransactionIdIsValid(xid))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
+	else if (!TransactionIdIsNormal(xid))
+	{
+		/* frozen and bootstrap xids are always committed far in the past */
+		*ts = 0;
+		if (nodeid)
+			*nodeid = InvalidRepOriginId;	/* same "no origin" marker as below */
+		return false;
+	}
+
+	LWLockAcquire(CommitTsLock, LW_SHARED);
+
+	/* Error if module not enabled */
+	if (!commitTsShared->commitTsActive)
+		error_commit_ts_disabled();
+
+	/*
+	 * If we're asked for the cached value, return that.  Otherwise, fall
+	 * through to read from SLRU.
+	 */
+	if (commitTsShared->xidLastCommit == xid)
+	{
+		*ts = commitTsShared->dataLastCommit.time;
+		if (nodeid)
+			*nodeid = commitTsShared->dataLastCommit.nodeid;
+
+		LWLockRelease(CommitTsLock);
+		return *ts != 0;
+	}
+
+	/* snapshot the valid range while still holding the lock */
+	oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
+	newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
+	/* neither is invalid, or both are */
+	Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
+	LWLockRelease(CommitTsLock);
+
+	/*
+	 * Return empty if the requested value is outside our valid range.
+	 */
+	if (!TransactionIdIsValid(oldestCommitTsXid) ||
+		TransactionIdPrecedes(xid, oldestCommitTsXid) ||
+		TransactionIdPrecedes(newestCommitTsXid, xid))
+	{
+		*ts = 0;
+		if (nodeid)
+			*nodeid = InvalidRepOriginId;
+		return false;
+	}
+
+	/* lock is acquired by SimpleLruReadPage_ReadOnly */
+	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+	memcpy(&entry,
+		   CommitTsCtl->shared->page_buffer[slotno] +
+		   SizeOfCommitTimestampEntry * entryno,
+		   SizeOfCommitTimestampEntry);
+
+	*ts = entry.time;
+	if (nodeid)
+		*nodeid = entry.nodeid;
+
+	LWLockRelease(CommitTsSLRULock);
+	return *ts != 0;
+}
+
+/*
+ * Report the most recently recorded commit, as cached in shared memory.
+ *
+ * Returns the cached xid.  (As far as this module is concerned, anyway;
+ * it's up to the caller to ensure the value is useful for its purposes.)
+ * When non-NULL, *ts and *nodeid receive the matching timestamp and origin.
+ *
+ * Raises an ERROR if the module is not active.
+ */
+TransactionId
+GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
+{
+	TransactionId latest;
+
+	LWLockAcquire(CommitTsLock, LW_SHARED);
+
+	/* refuse to answer while the module is inactive */
+	if (!commitTsShared->commitTsActive)
+		error_commit_ts_disabled();
+
+	if (ts != NULL)
+		*ts = commitTsShared->dataLastCommit.time;
+	if (nodeid != NULL)
+		*nodeid = commitTsShared->dataLastCommit.nodeid;
+	latest = commitTsShared->xidLastCommit;
+
+	LWLockRelease(CommitTsLock);
+
+	return latest;
+}
+
+/*
+ * Raise the ERROR used whenever commit timestamp data is requested while
+ * the module is inactive.  The hint text depends on whether the server is
+ * currently in recovery.
+ */
+static void
+error_commit_ts_disabled(void)
+{
+	if (RecoveryInProgress())
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not get commit timestamp data"),
+				 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
+						 "track_commit_timestamp")));
+	}
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not get commit timestamp data"),
+				 errhint("Make sure the configuration parameter \"%s\" is set.",
+						 "track_commit_timestamp")));
+	}
+}
+
+/*
+ * SQL-callable wrapper to obtain commit time of a transaction
+ *
+ * Takes the xid as its only argument and yields its commit timestamp, or
+ * SQL NULL when TransactionIdGetCommitTsData reports no record for it.
+ */
+Datum
+pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
+{
+	TransactionId xid = PG_GETARG_TRANSACTIONID(0);
+	TimestampTz commitTime;
+
+	/* origin is not wanted here, hence the NULL */
+	if (!TransactionIdGetCommitTsData(xid, &commitTime, NULL))
+		PG_RETURN_NULL();
+
+	PG_RETURN_TIMESTAMPTZ(commitTime);
+}
+
+
+/*
+ * pg_last_committed_xact
+ *
+ * SQL-callable wrapper to obtain some information about the latest
+ * committed transaction: transaction ID, timestamp and replication
+ * origin.
+ *
+ * The result is a single (xid, timestamp, roident) row.  All three columns
+ * are NULL when the cached xid is not a normal transaction id.
+ */
+Datum
+pg_last_committed_xact(PG_FUNCTION_ARGS)
+{
+	TransactionId lastXid;
+	TimestampTz commitTime;
+	RepOriginId origin;
+	TupleDesc	rowDesc;
+	HeapTuple	row;
+	Datum		values[3];
+	bool		nulls[3];
+
+	/* fetch the cached data about the latest commit */
+	lastXid = GetLatestCommitTsData(&commitTime, &origin);
+
+	/*
+	 * Build the descriptor of the result row.  This must match this
+	 * function's pg_proc entry!
+	 */
+	rowDesc = CreateTemplateTupleDesc(3);
+	TupleDescInitEntry(rowDesc, (AttrNumber) 1, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(rowDesc, (AttrNumber) 2, "timestamp",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(rowDesc, (AttrNumber) 3, "roident",
+					   OIDOID, -1, 0);
+	rowDesc = BlessTupleDesc(rowDesc);
+
+	if (TransactionIdIsNormal(lastXid))
+	{
+		memset(nulls, false, sizeof(nulls));
+		values[0] = TransactionIdGetDatum(lastXid);
+		values[1] = TimestampTzGetDatum(commitTime);
+		values[2] = ObjectIdGetDatum((Oid) origin);
+	}
+	else
+	{
+		/* nothing usable recorded: return an all-NULL row */
+		memset(nulls, true, sizeof(nulls));
+	}
+
+	/* and construct a tuple with our data */
+	row = heap_form_tuple(rowDesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(row));
+}
+
+/*
+ * pg_xact_commit_timestamp_origin
+ *
+ * SQL-callable wrapper to obtain commit timestamp and replication origin
+ * of a given transaction.
+ *
+ * The result is a single (timestamp, roident) row; both columns are NULL
+ * when no commit timestamp record was found for the xid.
+ */
+Datum
+pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
+{
+	TransactionId xid = PG_GETARG_TRANSACTIONID(0);
+	TimestampTz commitTime;
+	RepOriginId origin;
+	bool		haveData;
+	TupleDesc	rowDesc;
+	HeapTuple	row;
+	Datum		values[2];
+	bool		nulls[2];
+
+	haveData = TransactionIdGetCommitTsData(xid, &commitTime, &origin);
+
+	/*
+	 * Build the descriptor of the result row.  This must match this
+	 * function's pg_proc entry!
+	 */
+	rowDesc = CreateTemplateTupleDesc(2);
+	TupleDescInitEntry(rowDesc, (AttrNumber) 1, "timestamp",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(rowDesc, (AttrNumber) 2, "roident",
+					   OIDOID, -1, 0);
+	rowDesc = BlessTupleDesc(rowDesc);
+
+	if (haveData)
+	{
+		memset(nulls, false, sizeof(nulls));
+		values[0] = TimestampTzGetDatum(commitTime);
+		values[1] = ObjectIdGetDatum((Oid) origin);
+	}
+	else
+	{
+		/* no record: return an all-NULL row */
+		memset(nulls, true, sizeof(nulls));
+	}
+
+	row = heap_form_tuple(rowDesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(row));
+}
+
+/*
+ * Number of shared CommitTS buffers.
+ *
+ * The count is NBuffers / 1024, clamped to the range 4..16.  We use a very
+ * similar logic as for the number of CLOG buffers; see comments in
+ * CLOGShmemBuffers.
+ */
+Size
+CommitTsShmemBuffers(void)
+{
+	int			nbuffers = NBuffers / 1024;
+
+	if (nbuffers < 4)
+		nbuffers = 4;
+	if (nbuffers > 16)
+		nbuffers = 16;
+
+	return nbuffers;
+}
+
+/*
+ * Shared memory sizing for CommitTs
+ *
+ * That is the space for the SLRU buffers plus the CommitTimestampShared
+ * struct.
+ */
+Size
+CommitTsShmemSize(void)
+{
+	Size		total;
+
+	total = SimpleLruShmemSize(CommitTsShmemBuffers(), 0);
+	total += sizeof(CommitTimestampShared);
+
+	return total;
+}
+
+/*
+ * Initialize CommitTs at system startup (postmaster start or standalone
+ * backend)
+ *
+ * Sets up the SLRU control structure and attaches to (or creates) the
+ * CommitTimestampShared struct.  When not running under the postmaster the
+ * struct is expected to be new and is reset to "inactive, nothing
+ * recorded"; otherwise it is expected to exist already.
+ */
+void
+CommitTsShmemInit(void)
+{
+	bool		found;
+
+	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+	SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
+				  CommitTsSLRULock, "pg_commit_ts",
+				  LWTRANCHE_COMMITTS_BUFFER,
+				  SYNC_HANDLER_COMMIT_TS);
+	SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
+
+	commitTsShared = ShmemInitStruct("CommitTs shared",
+									 sizeof(CommitTimestampShared),
+									 &found);
+
+	if (IsUnderPostmaster)
+	{
+		/* somebody else initialized it already */
+		Assert(found);
+		return;
+	}
+
+	Assert(!found);
+
+	/* start out inactive with an empty last-commit cache */
+	commitTsShared->commitTsActive = false;
+	commitTsShared->xidLastCommit = InvalidTransactionId;
+	TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+	commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+}
+
+/*
+ * This function must be called ONCE on system install.
+ *
+ * (The CommitTs directory is assumed to have been created by initdb, and
+ * CommitTsShmemInit must have been called already.)
+ *
+ * The body is intentionally empty; see the comment inside.
+ */
+void
+BootStrapCommitTs(void)
+{
+ /*
+ * Nothing to do here at present, unlike most other SLRU modules; segments
+ * are created when the server is started with this module enabled. See
+ * ActivateCommitTs.
+ */
+}
+
+/*
+ * Initialize (or reinitialize) a page of CommitTs to zeroes.
+ * If writeXlog is true, also emit an XLOG record saying we did this.
+ *
+ * Only the shared-memory copy is set up here; nothing is written to disk.
+ * Returns the buffer slot number now holding the page.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroCommitTsPage(int pageno, bool writeXlog)
+{
+	int			newSlot = SimpleLruZeroPage(CommitTsCtl, pageno);
+
+	if (!writeXlog)
+		return newSlot;
+
+	WriteZeroPageXlogRec(pageno);
+	return newSlot;
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ *
+ * It calls ActivateCommitTs unconditionally; that function returns early
+ * when the module is already marked active.
+ */
+void
+StartupCommitTs(void)
+{
+ ActivateCommitTs();
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after recovery has finished.
+ *
+ * The module's state is brought in line with the track_commit_timestamp
+ * setting.
+ */
+void
+CompleteCommitTsInitialization(void)
+{
+	/*
+	 * We activate the module if the feature is enabled.  This is necessary
+	 * for primary and standby as the activation depends on the control file
+	 * contents at the beginning of recovery or when a XLOG_PARAMETER_CHANGE
+	 * is replayed.
+	 *
+	 * Conversely, if the feature is not enabled, turn it off for good.  This
+	 * also removes any leftover data.
+	 */
+	if (track_commit_timestamp)
+		ActivateCommitTs();
+	else
+		DeactivateCommitTs();
+}
+
+/*
+ * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
+ * XLog record during recovery.
+ *
+ * newvalue is the setting carried by the record.  oldvalue is accepted but
+ * not consulted; the decision is based on the module's current activation
+ * status instead.
+ */
+void
+CommitTsParameterChange(bool newvalue, bool oldvalue)
+{
+	/*
+	 * Note this only runs in the recovery process, so an unlocked read is
+	 * fine.
+	 */
+	bool		currentlyActive = commitTsShared->commitTsActive;
+
+	/*
+	 * If the commit_ts module is disabled in this server and we get word from
+	 * the primary server that it is enabled there, activate it so that we can
+	 * replay future WAL records involving it; also mark it as active on
+	 * pg_control.  If it is active already, we already did this, so don't do
+	 * anything.
+	 *
+	 * If the module is disabled in the primary, disable it here too, unless
+	 * the module is enabled locally.
+	 */
+	if (newvalue && !currentlyActive)
+		ActivateCommitTs();
+	else if (!newvalue && currentlyActive)
+		DeactivateCommitTs();
+}
+
+/*
+ * Activate this module whenever necessary.
+ * This must happen during postmaster or standalone-backend startup,
+ * or during WAL replay anytime the track_commit_timestamp setting is
+ * changed in the primary.
+ *
+ * The reason why this SLRU needs separate activation/deactivation functions is
+ * that it can be enabled/disabled during start and the activation/deactivation
+ * on the primary is propagated to the standby via replay. Other SLRUs don't
+ * have this property and they can be just initialized during normal startup.
+ *
+ * This is in charge of creating the currently active segment, if it's not
+ * already there. The reason for this is that the server might have been
+ * running with this module disabled for a while and thus might have skipped
+ * the normal creation point.
+ *
+ * In order, this function: returns early if the module is already marked
+ * active; points the SLRU's latest_page_number at the page containing
+ * nextXid; if oldestCommitTsXid is invalid, sets both the oldest and newest
+ * commit-ts xids to the next transaction id; zeroes and writes out the page
+ * for the current position if it does not physically exist (without
+ * emitting an XLOG record for it); and finally sets commitTsActive.
+ */
+static void
+ActivateCommitTs(void)
+{
+ TransactionId xid;
+ int pageno;
+
+ /* If we've done this already, there's nothing to do */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ if (commitTsShared->commitTsActive)
+ {
+ LWLockRelease(CommitTsLock);
+ return;
+ }
+ LWLockRelease(CommitTsLock);
+
+ xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
+ pageno = TransactionIdToCTsPage(xid);
+
+ /*
+ * Re-Initialize our idea of the latest page number.
+ */
+ LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ CommitTsCtl->shared->latest_page_number = pageno;
+ LWLockRelease(CommitTsSLRULock);
+
+ /*
+ * If CommitTs is enabled, but it wasn't in the previous server run, we
+ * need to set the oldest and newest values to the next Xid; that way, we
+ * will not try to read data that might not have been set.
+ *
+ * XXX does this have a problem if a server is started with commitTs
+ * enabled, then started with commitTs disabled, then restarted with it
+ * enabled again? It doesn't look like it does, because there should be a
+ * checkpoint that sets the value to InvalidTransactionId at end of
+ * recovery; and so any chance of injecting new transactions without
+ * CommitTs values would occur after the oldestCommitTsXid has been set to
+ * Invalid temporarily.
+ */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
+ {
+ ShmemVariableCache->oldestCommitTsXid =
+ ShmemVariableCache->newestCommitTsXid = ReadNextTransactionId();
+ }
+ LWLockRelease(CommitTsLock);
+
+ /* Create the current segment file, if necessary */
+ if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
+ {
+ int slotno;
+
+ LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ slotno = ZeroCommitTsPage(pageno, false);
+ SimpleLruWritePage(CommitTsCtl, slotno);
+ Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+ LWLockRelease(CommitTsSLRULock);
+ }
+
+ /* Change the activation status in shared memory. */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ commitTsShared->commitTsActive = true;
+ LWLockRelease(CommitTsLock);
+}
+
+/*
+ * Deactivate this module.
+ *
+ * This must be called when the track_commit_timestamp parameter is turned off.
+ * This happens during postmaster or standalone-backend startup, or during WAL
+ * replay.
+ *
+ * Resets CommitTs into invalid state to make sure we don't hand back
+ * possibly-invalid data; also removes segments of old data.
+ *
+ * The shared state is reset under CommitTsLock first; the files are then
+ * deleted under CommitTsSLRULock.
+ */
+static void
+DeactivateCommitTs(void)
+{
+ /*
+ * Cleanup the status in the shared memory.
+ *
+ * We reset everything in the commitTsShared record to prevent user from
+ * getting confusing data about last committed transaction on the standby
+ * when the module was activated repeatedly on the primary.
+ */
+ LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+ commitTsShared->commitTsActive = false;
+ commitTsShared->xidLastCommit = InvalidTransactionId;
+ TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+
+ ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
+ ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
+
+ LWLockRelease(CommitTsLock);
+
+ /*
+ * Remove *all* files. This is necessary so that there are no leftover
+ * files; in the case where this feature is later enabled after running
+ * with it disabled for some time there may be a gap in the file sequence.
+ * (We can probably tolerate out-of-sequence files, as they are going to
+ * be overwritten anyway when we wrap around, but it seems better to be
+ * tidy.)
+ */
+ LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+ (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
+ LWLockRelease(CommitTsSLRULock);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ *
+ * This is a single call to SimpleLruWriteAll for the CommitTs SLRU.
+ */
+void
+CheckPointCommitTs(void)
+{
+ /*
+ * Write dirty CommitTs pages to disk. This may result in sync requests
+ * queued for later handling by ProcessSyncRequests(), as part of the
+ * checkpoint.
+ */
+ SimpleLruWriteAll(CommitTsCtl, true);
+}
+
+/*
+ * Make sure that CommitTs has room for a newly-allocated XID.
+ *
+ * newestXact is the xid just allocated.  A new zeroed page is set up (and
+ * an XLOG record emitted) only when that xid is the first one on its page.
+ *
+ * NB: this is called while holding XidGenLock.  We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty CommitTs or xlog page to make room
+ * in shared memory.
+ *
+ * NB: the current implementation relies on track_commit_timestamp being
+ * PGC_POSTMASTER.
+ */
+void
+ExtendCommitTs(TransactionId newestXact)
+{
+	bool		startsPage;
+
+	/*
+	 * Nothing to do if module not enabled.  Note we do an unlocked read of
+	 * the flag here, which is okay because this routine is only called from
+	 * GetNewTransactionId, which is never called in a standby.
+	 */
+	Assert(!InRecovery);
+	if (!commitTsShared->commitTsActive)
+		return;
+
+	/*
+	 * No work except at first XID of a page.  But beware: just after
+	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+	 */
+	startsPage = (TransactionIdToCTsEntry(newestXact) == 0 ||
+				  TransactionIdEquals(newestXact, FirstNormalTransactionId));
+	if (!startsPage)
+		return;
+
+	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+
+	/* Zero the page and make an XLOG entry about it */
+	ZeroCommitTsPage(TransactionIdToCTsPage(newestXact), !InRecovery);
+
+	LWLockRelease(CommitTsSLRULock);
+}
+
+/*
+ * Remove all CommitTs segments before the one holding the passed
+ * transaction ID.
+ *
+ * A TRUNCATE XLOG record is written before the segments are removed, and
+ * only when the directory scan reports that something can be removed.
+ *
+ * Note that we don't need to flush XLOG here.
+ */
+void
+TruncateCommitTs(TransactionId oldestXact)
+{
+	/*
+	 * The cutoff point is the start of the segment containing oldestXact.  We
+	 * pass the *page* containing oldestXact to SimpleLruTruncate.
+	 */
+	int			cutoff = TransactionIdToCTsPage(oldestXact);
+	bool		haveOldFiles;
+
+	/* Check to see if there's any files that could be removed */
+	haveOldFiles = SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
+									 &cutoff);
+
+	if (haveOldFiles)
+	{
+		/* Write XLOG record */
+		WriteTruncateXlogRec(cutoff, oldestXact);
+
+		/* Now we can remove the old CommitTs segment(s) */
+		SimpleLruTruncate(CommitTsCtl, cutoff);
+	}
+}
+
+/*
+ * Set the limit values between which commit TS can be consulted.
+ *
+ * If no limits are currently set (oldestCommitTsXid is invalid), both are
+ * installed as given.  Otherwise the oldest limit is only ever moved
+ * forwards, and the newest limit is replaced only when newestXact precedes
+ * the stored value.
+ *
+ * NOTE(review): the second rule lowers newestCommitTsXid; whether that is
+ * the intended direction is not evident from this file -- confirm against
+ * the callers before relying on it.
+ */
+void
+SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
+{
+	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+	if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
+	{
+		/* nothing set yet: take the caller's values as they are */
+		Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
+		ShmemVariableCache->oldestCommitTsXid = oldestXact;
+		ShmemVariableCache->newestCommitTsXid = newestXact;
+	}
+	else
+	{
+		/* limits already exist: adjust each one independently */
+		if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+			ShmemVariableCache->oldestCommitTsXid = oldestXact;
+		if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
+			ShmemVariableCache->newestCommitTsXid = newestXact;
+	}
+
+	LWLockRelease(CommitTsLock);
+}
+
+/*
+ * Move forwards the oldest commitTS value that can be consulted
+ *
+ * The stored value is left alone when it is invalid or when it does not
+ * precede oldestXact.
+ */
+void
+AdvanceOldestCommitTsXid(TransactionId oldestXact)
+{
+	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+
+	if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
+	{
+		if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
+			ShmemVariableCache->oldestCommitTsXid = oldestXact;
+	}
+
+	LWLockRelease(CommitTsLock);
+}
+
+
+/*
+ * Decide whether a commitTS page number is "older" for truncation purposes.
+ * Analogous to CLOGPagePrecedes().
+ *
+ * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
+ * introduces differences compared to CLOG and the other SLRUs having (1 <<
+ * 31) % per_page == 0. This function never tests exactly
+ * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
+ * there are two possible counts of page boundaries between oldestXact and the
+ * latest XID assigned, depending on whether oldestXact is within the first
+ * 128 entries of its page. Since this function doesn't know the location of
+ * oldestXact within page2, it returns false for one page that actually is
+ * expendable. This is a wider (yet still negligible) version of the
+ * truncation opportunity that CLOGPagePrecedes() cannot recognize.
+ *
+ * For the sake of a worked example, number entries with decimal values such
+ * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
+ * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
+ * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
+ * because entry=2.85 is the border that toggles whether entries precede the
+ * last entry of the oldestXact page. While page 2 is expendable at
+ * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
+ *
+ * Mechanically: each page number is turned into a representative xid
+ * (page * COMMIT_TS_XACTS_PER_PAGE + FirstNormalTransactionId + 1), and the
+ * result is true only if page1's xid precedes both page2's xid and that xid
+ * plus COMMIT_TS_XACTS_PER_PAGE - 1.
+ */
+static bool
+CommitTsPagePrecedes(int page1, int page2)
+{
+ TransactionId xid1;
+ TransactionId xid2;
+
+ xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
+ xid1 += FirstNormalTransactionId + 1;
+ xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
+ xid2 += FirstNormalTransactionId + 1;
+
+ return (TransactionIdPrecedes(xid1, xid2) &&
+ TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
+}
+
+
+/*
+ * Write a ZEROPAGE xlog record
+ *
+ * The record payload is just the page number, as a plain int.
+ */
+static void
+WriteZeroPageXlogRec(int pageno)
+{
+	XLogBeginInsert();
+	XLogRegisterData((char *) &pageno, sizeof(pageno));
+
+	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
+}
+
+/*
+ * Write a TRUNCATE xlog record
+ *
+ * The record carries the cutoff page number and the oldest xid that goes
+ * with it.
+ */
+static void
+WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
+{
+	xl_commit_ts_truncate truncRec;
+
+	truncRec.oldestXid = oldestXid;
+	truncRec.pageno = pageno;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &truncRec, SizeOfCommitTsTruncate);
+
+	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
+}
+
+/*
+ * CommitTS resource manager's routines
+ *
+ * commit_ts_redo replays one CommitTs XLOG record. Two record types are
+ * handled: COMMIT_TS_ZEROPAGE zeroes the named page and writes it out
+ * immediately; COMMIT_TS_TRUNCATE advances the oldest consultable xid and
+ * truncates the SLRU at the recorded page. Any other op code is a PANIC.
+ */
+void
+commit_ts_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ /* Backup blocks are not used in commit_ts records */
+ Assert(!XLogRecHasAnyBlockRefs(record));
+
+ if (info == COMMIT_TS_ZEROPAGE)
+ {
+ int pageno;
+ int slotno;
+
+ memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+ LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+
+ slotno = ZeroCommitTsPage(pageno, false);
+ SimpleLruWritePage(CommitTsCtl, slotno);
+ Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+
+ LWLockRelease(CommitTsSLRULock);
+ }
+ else if (info == COMMIT_TS_TRUNCATE)
+ {
+ xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
+
+ AdvanceOldestCommitTsXid(trunc->oldestXid);
+
+ /*
+ * During XLOG replay, latest_page_number isn't set up yet; insert a
+ * suitable value to bypass the sanity test in SimpleLruTruncate.
+ */
+ CommitTsCtl->shared->latest_page_number = trunc->pageno;
+
+ SimpleLruTruncate(CommitTsCtl, trunc->pageno);
+ }
+ else
+ elog(PANIC, "commit_ts_redo: unknown op code %u", info);
+}
+
+/*
+ * Entrypoint for sync.c to sync commit_ts files.
+ *
+ * Hands ftag and path to SlruSyncFileTag for the CommitTs SLRU and returns
+ * that function's result unchanged.
+ */
+int
+committssyncfiletag(const FileTag *ftag, char *path)
+{
+ return SlruSyncFileTag(CommitTsCtl, ftag, path);
+}