summaryrefslogtreecommitdiffstats
path: root/src/backend/access/table
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/backend/access/table
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/table')
-rw-r--r--src/backend/access/table/Makefile21
-rw-r--r--src/backend/access/table/table.c170
-rw-r--r--src/backend/access/table/tableam.c761
-rw-r--r--src/backend/access/table/tableamapi.c158
-rw-r--r--src/backend/access/table/toast_helper.c337
5 files changed, 1447 insertions, 0 deletions
diff --git a/src/backend/access/table/Makefile b/src/backend/access/table/Makefile
new file mode 100644
index 0000000..9aba3ff
--- /dev/null
+++ b/src/backend/access/table/Makefile
@@ -0,0 +1,21 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/table
+#
+# IDENTIFICATION
+# src/backend/access/table/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/table
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ table.o \
+ tableam.o \
+ tableamapi.o \
+ toast_helper.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/table/table.c b/src/backend/access/table/table.c
new file mode 100644
index 0000000..744d3b5
--- /dev/null
+++ b/src/backend/access/table/table.c
@@ -0,0 +1,170 @@
+/*-------------------------------------------------------------------------
+ *
+ * table.c
+ * Generic routines for table related code.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/table/table.c
+ *
+ *
+ * NOTES
+ * This file contains table_ routines that implement access to tables (in
+ * contrast to other relation types like indexes) that are independent of
+ * individual table access methods.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "access/table.h"
+#include "storage/lmgr.h"
+
+
+/* ----------------
+ * table_open - open a table relation by relation OID
+ *
+ * This is essentially relation_open plus check that the relation
+ * is not an index nor a composite type. (The caller should also
+ * check that it's not a view or foreign table before assuming it has
+ * storage.)
+ * ----------------
+ */
+Relation
+table_open(Oid relationId, LOCKMODE lockmode)
+{
+ Relation r;
+
+ r = relation_open(relationId, lockmode);
+
+ if (r->rd_rel->relkind == RELKIND_INDEX ||
+ r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is an index",
+ RelationGetRelationName(r))));
+ else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(r))));
+
+ return r;
+}
+
+
+/* ----------------
+ * try_table_open - open a table relation by relation OID
+ *
+ * Same as table_open, except return NULL instead of failing
+ * if the relation does not exist.
+ * ----------------
+ */
+Relation
+try_table_open(Oid relationId, LOCKMODE lockmode)
+{
+ Relation r;
+
+ r = try_relation_open(relationId, lockmode);
+
+ /* leave if table does not exist */
+ if (!r)
+ return NULL;
+
+ if (r->rd_rel->relkind == RELKIND_INDEX ||
+ r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is an index",
+ RelationGetRelationName(r))));
+ else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(r))));
+
+ return r;
+}
+
+/* ----------------
+ * table_openrv - open a table relation specified
+ * by a RangeVar node
+ *
+ * As above, but relation is specified by a RangeVar.
+ * ----------------
+ */
+Relation
+table_openrv(const RangeVar *relation, LOCKMODE lockmode)
+{
+ Relation r;
+
+ r = relation_openrv(relation, lockmode);
+
+ if (r->rd_rel->relkind == RELKIND_INDEX ||
+ r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is an index",
+ RelationGetRelationName(r))));
+ else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(r))));
+
+ return r;
+}
+
+/* ----------------
+ * table_openrv_extended - open a table relation specified
+ * by a RangeVar node
+ *
+ * As above, but optionally return NULL instead of failing for
+ * relation-not-found.
+ * ----------------
+ */
+Relation
+table_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
+ bool missing_ok)
+{
+ Relation r;
+
+ r = relation_openrv_extended(relation, lockmode, missing_ok);
+
+ if (r)
+ {
+ if (r->rd_rel->relkind == RELKIND_INDEX ||
+ r->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is an index",
+ RelationGetRelationName(r))));
+ else if (r->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a composite type",
+ RelationGetRelationName(r))));
+ }
+
+ return r;
+}
+
+/* ----------------
+ * table_close - close a table
+ *
+ * If lockmode is not "NoLock", we then release the specified lock.
+ *
+ * Note that it is often sensible to hold a lock beyond relation_close;
+ * in that case, the lock is released automatically at xact end.
+ * ----------------
+ */
+void
+table_close(Relation relation, LOCKMODE lockmode)
+{
+ relation_close(relation, lockmode);
+}
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
new file mode 100644
index 0000000..b3d1a6c
--- /dev/null
+++ b/src/backend/access/table/tableam.c
@@ -0,0 +1,761 @@
+/*----------------------------------------------------------------------
+ *
+ * tableam.c
+ * Table access method routines too big to be inline functions.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/table/tableam.c
+ *
+ * NOTES
+ * Note that most function in here are documented in tableam.h, rather than
+ * here. That's because there's a lot of inline functions in tableam.h and
+ * it'd be harder to understand if one constantly had to switch between files.
+ *
+ *----------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/syncscan.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "optimizer/plancat.h"
+#include "port/pg_bitutils.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+
+/*
+ * Constants to control the behavior of block allocation to parallel workers
+ * during a parallel seqscan. Technically these values do not need to be
+ * powers of 2, but having them as powers of 2 makes the math more optimal
+ * and makes the ramp-down stepping more even.
+ */
+
+/* The number of I/O chunks we try to break a parallel seqscan down into */
+#define PARALLEL_SEQSCAN_NCHUNKS 2048
+/* Ramp down size of allocations when we've only this number of chunks left */
+#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64
+/* Cap the size of parallel I/O chunks to this number of blocks */
+#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192
+
+/* GUC variables */
+char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
+bool synchronize_seqscans = true;
+
+
+/* ----------------------------------------------------------------------------
+ * Slot functions.
+ * ----------------------------------------------------------------------------
+ */
+
+const TupleTableSlotOps *
+table_slot_callbacks(Relation relation)
+{
+ const TupleTableSlotOps *tts_cb;
+
+ if (relation->rd_tableam)
+ tts_cb = relation->rd_tableam->slot_callbacks(relation);
+ else if (relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /*
+ * Historically FDWs expect to store heap tuples in slots. Continue
+ * handing them one, to make it less painful to adapt FDWs to new
+ * versions. The cost of a heap slot over a virtual slot is pretty
+ * small.
+ */
+ tts_cb = &TTSOpsHeapTuple;
+ }
+ else
+ {
+ /*
+ * These need to be supported, as some parts of the code (like COPY)
+ * need to create slots for such relations too. It seems better to
+ * centralize the knowledge that a heap slot is the right thing in
+ * that case here.
+ */
+ Assert(relation->rd_rel->relkind == RELKIND_VIEW ||
+ relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
+ tts_cb = &TTSOpsVirtual;
+ }
+
+ return tts_cb;
+}
+
+TupleTableSlot *
+table_slot_create(Relation relation, List **reglist)
+{
+ const TupleTableSlotOps *tts_cb;
+ TupleTableSlot *slot;
+
+ tts_cb = table_slot_callbacks(relation);
+ slot = MakeSingleTupleTableSlot(RelationGetDescr(relation), tts_cb);
+
+ if (reglist)
+ *reglist = lappend(*reglist, slot);
+
+ return slot;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Table scan functions.
+ * ----------------------------------------------------------------------------
+ */
+
+TableScanDesc
+table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
+{
+ uint32 flags = SO_TYPE_SEQSCAN |
+ SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE | SO_TEMP_SNAPSHOT;
+ Oid relid = RelationGetRelid(relation);
+ Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
+
+ return relation->rd_tableam->scan_begin(relation, snapshot, nkeys, key,
+ NULL, flags);
+}
+
+void
+table_scan_update_snapshot(TableScanDesc scan, Snapshot snapshot)
+{
+ Assert(IsMVCCSnapshot(snapshot));
+
+ RegisterSnapshot(snapshot);
+ scan->rs_snapshot = snapshot;
+ scan->rs_flags |= SO_TEMP_SNAPSHOT;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Parallel table scan related functions.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_parallelscan_estimate(Relation rel, Snapshot snapshot)
+{
+ Size sz = 0;
+
+ if (IsMVCCSnapshot(snapshot))
+ sz = add_size(sz, EstimateSnapshotSpace(snapshot));
+ else
+ Assert(snapshot == SnapshotAny);
+
+ sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel));
+
+ return sz;
+}
+
+void
+table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
+ Snapshot snapshot)
+{
+ Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+
+ pscan->phs_snapshot_off = snapshot_off;
+
+ if (IsMVCCSnapshot(snapshot))
+ {
+ SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off);
+ pscan->phs_snapshot_any = false;
+ }
+ else
+ {
+ Assert(snapshot == SnapshotAny);
+ pscan->phs_snapshot_any = true;
+ }
+}
+
+TableScanDesc
+table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
+{
+ Snapshot snapshot;
+ uint32 flags = SO_TYPE_SEQSCAN |
+ SO_ALLOW_STRAT | SO_ALLOW_SYNC | SO_ALLOW_PAGEMODE;
+
+ Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+
+ if (!parallel_scan->phs_snapshot_any)
+ {
+ /* Snapshot was serialized -- restore it */
+ snapshot = RestoreSnapshot((char *) parallel_scan +
+ parallel_scan->phs_snapshot_off);
+ RegisterSnapshot(snapshot);
+ flags |= SO_TEMP_SNAPSHOT;
+ }
+ else
+ {
+ /* SnapshotAny passed by caller (not serialized) */
+ snapshot = SnapshotAny;
+ }
+
+ return relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+ parallel_scan, flags);
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Index scan related functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * To perform that check simply start an index scan, create the necessary
+ * slot, do the heap lookup, and shut everything down again. This could be
+ * optimized, but is unlikely to matter from a performance POV. If there
+ * frequently are live index pointers also matching a unique index key, the
+ * CPU overhead of this routine is unlikely to matter.
+ *
+ * Note that *tid may be modified when we return true if the AM supports
+ * storing multiple row versions reachable via a single index entry (like
+ * heap's HOT).
+ */
+bool
+table_index_fetch_tuple_check(Relation rel,
+ ItemPointer tid,
+ Snapshot snapshot,
+ bool *all_dead)
+{
+ IndexFetchTableData *scan;
+ TupleTableSlot *slot;
+ bool call_again = false;
+ bool found;
+
+ slot = table_slot_create(rel, NULL);
+ scan = table_index_fetch_begin(rel);
+ found = table_index_fetch_tuple(scan, tid, snapshot, slot, &call_again,
+ all_dead);
+ table_index_fetch_end(scan);
+ ExecDropSingleTupleTableSlot(slot);
+
+ return found;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Functions for non-modifying operations on individual tuples
+ * ------------------------------------------------------------------------
+ */
+
+void
+table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
+{
+ Relation rel = scan->rs_rd;
+ const TableAmRoutine *tableam = rel->rd_tableam;
+
+ /*
+ * We don't expect direct calls to table_tuple_get_latest_tid with valid
+ * CheckXidAlive for catalog or regular tables. See detailed comments in
+ * xact.c where these variables are declared.
+ */
+ if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+ elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
+ /*
+ * Since this can be called with user-supplied TID, don't trust the input
+ * too much.
+ */
+ if (!tableam->tuple_tid_valid(scan, tid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("tid (%u, %u) is not valid for relation \"%s\"",
+ ItemPointerGetBlockNumberNoCheck(tid),
+ ItemPointerGetOffsetNumberNoCheck(tid),
+ RelationGetRelationName(rel))));
+
+ tableam->tuple_get_latest_tid(scan, tid);
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Functions to make modifications a bit simpler.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * simple_table_tuple_insert - insert a tuple
+ *
+ * Currently, this routine differs from table_tuple_insert only in supplying a
+ * default command ID and not allowing access to the speedup options.
+ */
+void
+simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
+{
+ table_tuple_insert(rel, slot, GetCurrentCommandId(true), 0, NULL);
+}
+
+/*
+ * simple_table_tuple_delete - delete a tuple
+ *
+ * This routine may be used to delete a tuple when concurrent updates of
+ * the target tuple are not expected (for example, because we have a lock
+ * on the relation associated with the tuple). Any failure is reported
+ * via ereport().
+ */
+void
+simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
+{
+ TM_Result result;
+ TM_FailureData tmfd;
+
+ result = table_tuple_delete(rel, tid,
+ GetCurrentCommandId(true),
+ snapshot, InvalidSnapshot,
+ true /* wait for commit */ ,
+ &tmfd, false /* changingPart */ );
+
+ switch (result)
+ {
+ case TM_SelfModified:
+ /* Tuple was already updated in current command? */
+ elog(ERROR, "tuple already updated by self");
+ break;
+
+ case TM_Ok:
+ /* done successfully */
+ break;
+
+ case TM_Updated:
+ elog(ERROR, "tuple concurrently updated");
+ break;
+
+ case TM_Deleted:
+ elog(ERROR, "tuple concurrently deleted");
+ break;
+
+ default:
+ elog(ERROR, "unrecognized table_tuple_delete status: %u", result);
+ break;
+ }
+}
+
+/*
+ * simple_table_tuple_update - replace a tuple
+ *
+ * This routine may be used to update a tuple when concurrent updates of
+ * the target tuple are not expected (for example, because we have a lock
+ * on the relation associated with the tuple). Any failure is reported
+ * via ereport().
+ */
+void
+simple_table_tuple_update(Relation rel, ItemPointer otid,
+ TupleTableSlot *slot,
+ Snapshot snapshot,
+ bool *update_indexes)
+{
+ TM_Result result;
+ TM_FailureData tmfd;
+ LockTupleMode lockmode;
+
+ result = table_tuple_update(rel, otid, slot,
+ GetCurrentCommandId(true),
+ snapshot, InvalidSnapshot,
+ true /* wait for commit */ ,
+ &tmfd, &lockmode, update_indexes);
+
+ switch (result)
+ {
+ case TM_SelfModified:
+ /* Tuple was already updated in current command? */
+ elog(ERROR, "tuple already updated by self");
+ break;
+
+ case TM_Ok:
+ /* done successfully */
+ break;
+
+ case TM_Updated:
+ elog(ERROR, "tuple concurrently updated");
+ break;
+
+ case TM_Deleted:
+ elog(ERROR, "tuple concurrently deleted");
+ break;
+
+ default:
+ elog(ERROR, "unrecognized table_tuple_update status: %u", result);
+ break;
+ }
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Helper functions to implement parallel scans for block oriented AMs.
+ * ----------------------------------------------------------------------------
+ */
+
+Size
+table_block_parallelscan_estimate(Relation rel)
+{
+ return sizeof(ParallelBlockTableScanDescData);
+}
+
+Size
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+{
+ ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+ bpscan->base.phs_relid = RelationGetRelid(rel);
+ bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
+ /* compare phs_syncscan initialization to similar logic in initscan */
+ bpscan->base.phs_syncscan = synchronize_seqscans &&
+ !RelationUsesLocalBuffers(rel) &&
+ bpscan->phs_nblocks > NBuffers / 4;
+ SpinLockInit(&bpscan->phs_mutex);
+ bpscan->phs_startblock = InvalidBlockNumber;
+ pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
+
+ return sizeof(ParallelBlockTableScanDescData);
+}
+
+void
+table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
+{
+ ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
+
+ pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
+}
+
+/*
+ * find and set the scan's startblock
+ *
+ * Determine where the parallel seq scan should start. This function may be
+ * called many times, once by each parallel worker. We must be careful only
+ * to set the startblock once.
+ */
+void
+table_block_parallelscan_startblock_init(Relation rel,
+ ParallelBlockTableScanWorker pbscanwork,
+ ParallelBlockTableScanDesc pbscan)
+{
+ BlockNumber sync_startpage = InvalidBlockNumber;
+
+ /* Reset the state we use for controlling allocation size. */
+ memset(pbscanwork, 0, sizeof(*pbscanwork));
+
+ StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
+ "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
+
+ /*
+ * We determine the chunk size based on the size of the relation. First we
+ * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
+ * take the next highest power of 2 number of the chunk size. This means
+ * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
+ * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
+ */
+ pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
+ PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+ /*
+ * Ensure we don't go over the maximum chunk size with larger tables. This
+ * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
+ * tables. Too large a chunk size has been shown to be detrimental to
+ * synchronous scan performance.
+ */
+ pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+ PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
+
+retry:
+ /* Grab the spinlock. */
+ SpinLockAcquire(&pbscan->phs_mutex);
+
+ /*
+ * If the scan's startblock has not yet been initialized, we must do so
+ * now. If this is not a synchronized scan, we just start at block 0, but
+ * if it is a synchronized scan, we must get the starting position from
+ * the synchronized scan machinery. We can't hold the spinlock while
+ * doing that, though, so release the spinlock, get the information we
+ * need, and retry. If nobody else has initialized the scan in the
+ * meantime, we'll fill in the value we fetched on the second time
+ * through.
+ */
+ if (pbscan->phs_startblock == InvalidBlockNumber)
+ {
+ if (!pbscan->base.phs_syncscan)
+ pbscan->phs_startblock = 0;
+ else if (sync_startpage != InvalidBlockNumber)
+ pbscan->phs_startblock = sync_startpage;
+ else
+ {
+ SpinLockRelease(&pbscan->phs_mutex);
+ sync_startpage = ss_get_location(rel, pbscan->phs_nblocks);
+ goto retry;
+ }
+ }
+ SpinLockRelease(&pbscan->phs_mutex);
+}
+
+/*
+ * get the next page to scan
+ *
+ * Get the next page to scan. Even if there are no pages left to scan,
+ * another backend could have grabbed a page to scan and not yet finished
+ * looking at it, so it doesn't follow that the scan is done when the first
+ * backend gets an InvalidBlockNumber return.
+ */
+BlockNumber
+table_block_parallelscan_nextpage(Relation rel,
+ ParallelBlockTableScanWorker pbscanwork,
+ ParallelBlockTableScanDesc pbscan)
+{
+ BlockNumber page;
+ uint64 nallocated;
+
+ /*
+ * The logic below allocates block numbers out to parallel workers in a
+ * way that each worker will receive a set of consecutive block numbers to
+ * scan. Earlier versions of this would allocate the next highest block
+ * number to the next worker to call this function. This would generally
+ * result in workers never receiving consecutive block numbers. Some
+ * operating systems would not detect the sequential I/O pattern due to
+ * each backend being a different process which could result in poor
+ * performance due to inefficient or no readahead. To work around this
+ * issue, we now allocate a range of block numbers for each worker and
+ * when they come back for another block, we give them the next one in
+ * that range until the range is complete. When the worker completes the
+ * range of blocks we then allocate another range for it and return the
+ * first block number from that range.
+ *
+ * Here we name these ranges of blocks "chunks". The initial size of
+ * these chunks is determined in table_block_parallelscan_startblock_init
+ * based on the size of the relation. Towards the end of the scan, we
+ * start making reductions in the size of the chunks in order to attempt
+ * to divide the remaining work over all the workers as evenly as
+ * possible.
+ *
+ * Here pbscanwork is local worker memory. phsw_chunk_remaining tracks
+ * the number of blocks remaining in the chunk. When that reaches 0 then
+ * we must allocate a new chunk for the worker.
+ *
+ * phs_nallocated tracks how many blocks have been allocated to workers
+ * already. When phs_nallocated >= rs_nblocks, all blocks have been
+ * allocated.
+ *
+ * Because we use an atomic fetch-and-add to fetch the current value, the
+ * phs_nallocated counter will exceed rs_nblocks, because workers will
+ * still increment the value, when they try to allocate the next block but
+ * all blocks have been allocated already. The counter must be 64 bits
+ * wide because of that, to avoid wrapping around when rs_nblocks is close
+ * to 2^32.
+ *
+ * The actual block to return is calculated by adding the counter to the
+ * starting block number, modulo nblocks.
+ */
+
+ /*
+ * First check if we have any remaining blocks in a previous chunk for
+ * this worker. We must consume all of the blocks from that before we
+ * allocate a new chunk to the worker.
+ */
+ if (pbscanwork->phsw_chunk_remaining > 0)
+ {
+ /*
+ * Give them the next block in the range and update the remaining
+ * number of blocks.
+ */
+ nallocated = ++pbscanwork->phsw_nallocated;
+ pbscanwork->phsw_chunk_remaining--;
+ }
+ else
+ {
+ /*
+ * When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks
+ * remaining in the scan, we half the chunk size. Since we reduce the
+ * chunk size here, we'll hit this again after doing
+ * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS at the new size. After a few
+ * iterations of this, we'll end up doing the last few blocks with the
+ * chunk size set to 1.
+ */
+ if (pbscanwork->phsw_chunk_size > 1 &&
+ pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
+ (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
+ pbscanwork->phsw_chunk_size >>= 1;
+
+ nallocated = pbscanwork->phsw_nallocated =
+ pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
+ pbscanwork->phsw_chunk_size);
+
+ /*
+ * Set the remaining number of blocks in this chunk so that subsequent
+ * calls from this worker continue on with this chunk until it's done.
+ */
+ pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
+ }
+
+ if (nallocated >= pbscan->phs_nblocks)
+ page = InvalidBlockNumber; /* all blocks have been allocated */
+ else
+ page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
+
+ /*
+ * Report scan location. Normally, we report the current page number.
+ * When we reach the end of the scan, though, we report the starting page,
+ * not the ending page, just so the starting positions for later scans
+ * doesn't slew backwards. We only report the position at the end of the
+ * scan once, though: subsequent callers will report nothing.
+ */
+ if (pbscan->base.phs_syncscan)
+ {
+ if (page != InvalidBlockNumber)
+ ss_report_location(rel, page);
+ else if (nallocated == pbscan->phs_nblocks)
+ ss_report_location(rel, pbscan->phs_startblock);
+ }
+
+ return page;
+}
+
+/* ----------------------------------------------------------------------------
+ * Helper functions to implement relation sizing for block oriented AMs.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * table_block_relation_size
+ *
+ * If a table AM uses the various relation forks as the sole place where data
+ * is stored, and if it uses them in the expected manner (e.g. the actual data
+ * is in the main fork rather than some other), it can use this implementation
+ * of the relation_size callback rather than implementing its own.
+ */
+uint64
+table_block_relation_size(Relation rel, ForkNumber forkNumber)
+{
+ uint64 nblocks = 0;
+
+ /* InvalidForkNumber indicates returning the size for all forks */
+ if (forkNumber == InvalidForkNumber)
+ {
+ for (int i = 0; i < MAX_FORKNUM; i++)
+ nblocks += smgrnblocks(RelationGetSmgr(rel), i);
+ }
+ else
+ nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber);
+
+ return nblocks * BLCKSZ;
+}
+
+/*
+ * table_block_relation_estimate_size
+ *
+ * This function can't be directly used as the implementation of the
+ * relation_estimate_size callback, because it has a few additional parameters.
+ * Instead, it is intended to be used as a helper function; the caller can
+ * pass through the arguments to its relation_estimate_size function plus the
+ * additional values required here.
+ *
+ * overhead_bytes_per_tuple should contain the approximate number of bytes
+ * of storage required to store a tuple above and beyond what is required for
+ * the tuple data proper. Typically, this would include things like the
+ * size of the tuple header and item pointer. This is only used for query
+ * planning, so a table AM where the value is not constant could choose to
+ * pass a "best guess".
+ *
+ * usable_bytes_per_page should contain the approximate number of bytes per
+ * page usable for tuple data, excluding the page header and any anticipated
+ * special space.
+ */
+void
+table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
+ BlockNumber *pages, double *tuples,
+ double *allvisfrac,
+ Size overhead_bytes_per_tuple,
+ Size usable_bytes_per_page)
+{
+ BlockNumber curpages;
+ BlockNumber relpages;
+ double reltuples;
+ BlockNumber relallvisible;
+ double density;
+
+ /* it should have storage, so we can call the smgr */
+ curpages = RelationGetNumberOfBlocks(rel);
+
+ /* coerce values in pg_class to more desirable types */
+ relpages = (BlockNumber) rel->rd_rel->relpages;
+ reltuples = (double) rel->rd_rel->reltuples;
+ relallvisible = (BlockNumber) rel->rd_rel->relallvisible;
+
+ /*
+ * HACK: if the relation has never yet been vacuumed, use a minimum size
+ * estimate of 10 pages. The idea here is to avoid assuming a
+ * newly-created table is really small, even if it currently is, because
+ * that may not be true once some data gets loaded into it. Once a vacuum
+ * or analyze cycle has been done on it, it's more reasonable to believe
+ * the size is somewhat stable.
+ *
+ * (Note that this is only an issue if the plan gets cached and used again
+ * after the table has been filled. What we're trying to avoid is using a
+ * nestloop-type plan on a table that has grown substantially since the
+ * plan was made. Normally, autovacuum/autoanalyze will occur once enough
+ * inserts have happened and cause cached-plan invalidation; but that
+ * doesn't happen instantaneously, and it won't happen at all for cases
+ * such as temporary tables.)
+ *
+ * We test "never vacuumed" by seeing whether reltuples < 0.
+ *
+ * If the table has inheritance children, we don't apply this heuristic.
+ * Totally empty parent tables are quite common, so we should be willing
+ * to believe that they are empty.
+ */
+ if (curpages < 10 &&
+ reltuples < 0 &&
+ !rel->rd_rel->relhassubclass)
+ curpages = 10;
+
+ /* report estimated # pages */
+ *pages = curpages;
+ /* quick exit if rel is clearly empty */
+ if (curpages == 0)
+ {
+ *tuples = 0;
+ *allvisfrac = 0;
+ return;
+ }
+
+ /* estimate number of tuples from previous tuple density */
+ if (reltuples >= 0 && relpages > 0)
+ density = reltuples / (double) relpages;
+ else
+ {
+ /*
+ * When we have no data because the relation was never yet vacuumed,
+ * estimate tuple width from attribute datatypes. We assume here that
+ * the pages are completely full, which is OK for tables but is
+ * probably an overestimate for indexes. Fortunately
+ * get_relation_info() can clamp the overestimate to the parent
+ * table's size.
+ *
+ * Note: this code intentionally disregards alignment considerations,
+ * because (a) that would be gilding the lily considering how crude
+ * the estimate is, (b) it creates platform dependencies in the
+ * default plans which are kind of a headache for regression testing,
+ * and (c) different table AMs might use different padding schemes.
+ */
+ int32 tuple_width;
+
+ tuple_width = get_rel_data_width(rel, attr_widths);
+ tuple_width += overhead_bytes_per_tuple;
+ /* note: integer division is intentional here */
+ density = usable_bytes_per_page / tuple_width;
+ }
+ *tuples = rint(density * (double) curpages);
+
+ /*
+ * We use relallvisible as-is, rather than scaling it up like we do for
+ * the pages and tuples counts, on the theory that any pages added since
+ * the last VACUUM are most likely not marked all-visible. But costsize.c
+ * wants it converted to a fraction.
+ */
+ if (relallvisible == 0 || curpages <= 0)
+ *allvisfrac = 0;
+ else if ((double) relallvisible >= curpages)
+ *allvisfrac = 1;
+ else
+ *allvisfrac = (double) relallvisible / curpages;
+}
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
new file mode 100644
index 0000000..76df798
--- /dev/null
+++ b/src/backend/access/table/tableamapi.c
@@ -0,0 +1,158 @@
+/*----------------------------------------------------------------------
+ *
+ * tableamapi.c
+ * Support routines for API for Postgres table access methods
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/table/tableamapi.c
+ *----------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_proc.h"
+#include "commands/defrem.h"
+#include "miscadmin.h"
+#include "utils/fmgroids.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+
+/*
+ * GetTableAmRoutine
+ * Call the specified access method handler routine to get its
+ * TableAmRoutine struct, which will be palloc'd in the caller's
+ * memory context.
+ */
+const TableAmRoutine *
+GetTableAmRoutine(Oid amhandler)
+{
+ Datum datum;
+ const TableAmRoutine *routine;
+
+ datum = OidFunctionCall0(amhandler);
+ routine = (TableAmRoutine *) DatumGetPointer(datum);
+
+ if (routine == NULL || !IsA(routine, TableAmRoutine))
+ elog(ERROR, "table access method handler %u did not return a TableAmRoutine struct",
+ amhandler);
+
+ /*
+ * Assert that all required callbacks are present. That makes it a bit
+ * easier to keep AMs up to date, e.g. when forward porting them to a new
+ * major version.
+ */
+ Assert(routine->scan_begin != NULL);
+ Assert(routine->scan_end != NULL);
+ Assert(routine->scan_rescan != NULL);
+ Assert(routine->scan_getnextslot != NULL);
+
+ Assert(routine->parallelscan_estimate != NULL);
+ Assert(routine->parallelscan_initialize != NULL);
+ Assert(routine->parallelscan_reinitialize != NULL);
+
+ Assert(routine->index_fetch_begin != NULL);
+ Assert(routine->index_fetch_reset != NULL);
+ Assert(routine->index_fetch_end != NULL);
+ Assert(routine->index_fetch_tuple != NULL);
+
+ Assert(routine->tuple_fetch_row_version != NULL);
+ Assert(routine->tuple_tid_valid != NULL);
+ Assert(routine->tuple_get_latest_tid != NULL);
+ Assert(routine->tuple_satisfies_snapshot != NULL);
+ Assert(routine->index_delete_tuples != NULL);
+
+ Assert(routine->tuple_insert != NULL);
+
+ /*
+ * Could be made optional, but would require throwing error during
+ * parse-analysis.
+ */
+ Assert(routine->tuple_insert_speculative != NULL);
+ Assert(routine->tuple_complete_speculative != NULL);
+
+ Assert(routine->multi_insert != NULL);
+ Assert(routine->tuple_delete != NULL);
+ Assert(routine->tuple_update != NULL);
+ Assert(routine->tuple_lock != NULL);
+
+ Assert(routine->relation_set_new_filenode != NULL);
+ Assert(routine->relation_nontransactional_truncate != NULL);
+ Assert(routine->relation_copy_data != NULL);
+ Assert(routine->relation_copy_for_cluster != NULL);
+ Assert(routine->relation_vacuum != NULL);
+ Assert(routine->scan_analyze_next_block != NULL);
+ Assert(routine->scan_analyze_next_tuple != NULL);
+ Assert(routine->index_build_range_scan != NULL);
+ Assert(routine->index_validate_scan != NULL);
+
+ Assert(routine->relation_size != NULL);
+ Assert(routine->relation_needs_toast_table != NULL);
+
+ Assert(routine->relation_estimate_size != NULL);
+
+ /* optional, but one callback implies presence of the other */
+ Assert((routine->scan_bitmap_next_block == NULL) ==
+ (routine->scan_bitmap_next_tuple == NULL));
+ Assert(routine->scan_sample_next_block != NULL);
+ Assert(routine->scan_sample_next_tuple != NULL);
+
+ return routine;
+}
+
+/* check_hook: validate new default_table_access_method */
+bool
+check_default_table_access_method(char **newval, void **extra, GucSource source)
+{
+ if (**newval == '\0')
+ {
+ GUC_check_errdetail("%s cannot be empty.",
+ "default_table_access_method");
+ return false;
+ }
+
+ if (strlen(*newval) >= NAMEDATALEN)
+ {
+ GUC_check_errdetail("%s is too long (maximum %d characters).",
+ "default_table_access_method", NAMEDATALEN - 1);
+ return false;
+ }
+
+ /*
+ * If we aren't inside a transaction, or not connected to a database, we
+ * cannot do the catalog access necessary to verify the method. Must
+ * accept the value on faith.
+ */
+ if (IsTransactionState() && MyDatabaseId != InvalidOid)
+ {
+ if (!OidIsValid(get_table_am_oid(*newval, true)))
+ {
+ /*
+ * When source == PGC_S_TEST, don't throw a hard error for a
+ * nonexistent table access method, only a NOTICE. See comments in
+ * guc.h.
+ */
+ if (source == PGC_S_TEST)
+ {
+ ereport(NOTICE,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("table access method \"%s\" does not exist",
+ *newval)));
+ }
+ else
+ {
+ GUC_check_errdetail("Table access method \"%s\" does not exist.",
+ *newval);
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c
new file mode 100644
index 0000000..0cc5a30
--- /dev/null
+++ b/src/backend/access/table/toast_helper.c
@@ -0,0 +1,337 @@
+/*-------------------------------------------------------------------------
+ *
+ * toast_helper.c
+ * Helper functions for table AMs implementing compressed or
+ * out-of-line storage of varlena attributes.
+ *
+ * Copyright (c) 2000-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/table/toast_helper.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/table.h"
+#include "access/toast_helper.h"
+#include "access/toast_internals.h"
+#include "catalog/pg_type_d.h"
+
+
+/*
+ * Prepare to TOAST a tuple.
+ *
+ * tupleDesc, toast_values, and toast_isnull are required parameters; they
+ * provide the necessary details about the tuple to be toasted.
+ *
+ * toast_oldvalues and toast_oldisnull should be NULL for a newly-inserted
+ * tuple; for an update, they should describe the existing tuple.
+ *
+ * All of these arrays should have a length equal to tupleDesc->natts.
+ *
+ * On return, toast_flags and toast_attr will have been initialized.
+ * toast_flags is just a single uint8, but toast_attr is a caller-provided
+ * array with a length equal to tupleDesc->natts. The caller need not
+ * perform any initialization of the array before calling this function.
+ */
+void
+toast_tuple_init(ToastTupleContext *ttc)
+{
+ TupleDesc tupleDesc = ttc->ttc_rel->rd_att;
+ int numAttrs = tupleDesc->natts;
+ int i;
+
+ ttc->ttc_flags = 0;
+
+ for (i = 0; i < numAttrs; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+ struct varlena *old_value;
+ struct varlena *new_value;
+
+ ttc->ttc_attr[i].tai_colflags = 0;
+ ttc->ttc_attr[i].tai_oldexternal = NULL;
+ ttc->ttc_attr[i].tai_compression = att->attcompression;
+
+ if (ttc->ttc_oldvalues != NULL)
+ {
+ /*
+ * For UPDATE get the old and new values of this attribute
+ */
+ old_value =
+ (struct varlena *) DatumGetPointer(ttc->ttc_oldvalues[i]);
+ new_value =
+ (struct varlena *) DatumGetPointer(ttc->ttc_values[i]);
+
+ /*
+ * If the old value is stored on disk, check if it has changed so
+ * we have to delete it later.
+ */
+ if (att->attlen == -1 && !ttc->ttc_oldisnull[i] &&
+ VARATT_IS_EXTERNAL_ONDISK(old_value))
+ {
+ if (ttc->ttc_isnull[i] ||
+ !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
+ memcmp((char *) old_value, (char *) new_value,
+ VARSIZE_EXTERNAL(old_value)) != 0)
+ {
+ /*
+ * The old external stored value isn't needed any more
+ * after the update
+ */
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_NEEDS_DELETE_OLD;
+ ttc->ttc_flags |= TOAST_NEEDS_DELETE_OLD;
+ }
+ else
+ {
+ /*
+ * This attribute isn't changed by this update so we reuse
+ * the original reference to the old value in the new
+ * tuple.
+ */
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_IGNORE;
+ continue;
+ }
+ }
+ }
+ else
+ {
+ /*
+ * For INSERT simply get the new value
+ */
+ new_value = (struct varlena *) DatumGetPointer(ttc->ttc_values[i]);
+ }
+
+ /*
+ * Handle NULL attributes
+ */
+ if (ttc->ttc_isnull[i])
+ {
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_IGNORE;
+ ttc->ttc_flags |= TOAST_HAS_NULLS;
+ continue;
+ }
+
+ /*
+ * Now look at varlena attributes
+ */
+ if (att->attlen == -1)
+ {
+ /*
+ * If the table's attribute says PLAIN always, force it so.
+ */
+ if (att->attstorage == TYPSTORAGE_PLAIN)
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_IGNORE;
+
+ /*
+ * We took care of UPDATE above, so any external value we find
+ * still in the tuple must be someone else's that we cannot reuse
+ * (this includes the case of an out-of-line in-memory datum).
+ * Fetch it back (without decompression, unless we are forcing
+ * PLAIN storage). If necessary, we'll push it out as a new
+ * external value below.
+ */
+ if (VARATT_IS_EXTERNAL(new_value))
+ {
+ ttc->ttc_attr[i].tai_oldexternal = new_value;
+ if (att->attstorage == TYPSTORAGE_PLAIN)
+ new_value = detoast_attr(new_value);
+ else
+ new_value = detoast_external_attr(new_value);
+ ttc->ttc_values[i] = PointerGetDatum(new_value);
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_NEEDS_FREE;
+ ttc->ttc_flags |= (TOAST_NEEDS_CHANGE | TOAST_NEEDS_FREE);
+ }
+
+ /*
+ * Remember the size of this attribute
+ */
+ ttc->ttc_attr[i].tai_size = VARSIZE_ANY(new_value);
+ }
+ else
+ {
+ /*
+ * Not a varlena attribute, plain storage always
+ */
+ ttc->ttc_attr[i].tai_colflags |= TOASTCOL_IGNORE;
+ }
+ }
+}
+
+/*
+ * Find the largest varlena attribute that satisfies certain criteria.
+ *
+ * The relevant column must not be marked TOASTCOL_IGNORE, and if the
+ * for_compression flag is passed as true, it must also not be marked
+ * TOASTCOL_INCOMPRESSIBLE.
+ *
+ * The column must have attstorage EXTERNAL or EXTENDED if check_main is
+ * false, and must have attstorage MAIN if check_main is true.
+ *
+ * The column must have a minimum size of MAXALIGN(TOAST_POINTER_SIZE);
+ * if not, no benefit is to be expected by compressing it.
+ *
+ * The return value is the index of the biggest suitable column, or
+ * -1 if there is none.
+ */
+int
+toast_tuple_find_biggest_attribute(ToastTupleContext *ttc,
+ bool for_compression, bool check_main)
+{
+ TupleDesc tupleDesc = ttc->ttc_rel->rd_att;
+ int numAttrs = tupleDesc->natts;
+ int biggest_attno = -1;
+ int32 biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
+ int32 skip_colflags = TOASTCOL_IGNORE;
+ int i;
+
+ if (for_compression)
+ skip_colflags |= TOASTCOL_INCOMPRESSIBLE;
+
+ for (i = 0; i < numAttrs; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+
+ if ((ttc->ttc_attr[i].tai_colflags & skip_colflags) != 0)
+ continue;
+ if (VARATT_IS_EXTERNAL(DatumGetPointer(ttc->ttc_values[i])))
+ continue; /* can't happen, toast_action would be PLAIN */
+ if (for_compression &&
+ VARATT_IS_COMPRESSED(DatumGetPointer(ttc->ttc_values[i])))
+ continue;
+ if (check_main && att->attstorage != TYPSTORAGE_MAIN)
+ continue;
+ if (!check_main && att->attstorage != TYPSTORAGE_EXTENDED &&
+ att->attstorage != TYPSTORAGE_EXTERNAL)
+ continue;
+
+ if (ttc->ttc_attr[i].tai_size > biggest_size)
+ {
+ biggest_attno = i;
+ biggest_size = ttc->ttc_attr[i].tai_size;
+ }
+ }
+
+ return biggest_attno;
+}
+
+/*
+ * Try compression for an attribute.
+ *
+ * If we find that the attribute is not compressible, mark it so.
+ */
+void
+toast_tuple_try_compression(ToastTupleContext *ttc, int attribute)
+{
+ Datum *value = &ttc->ttc_values[attribute];
+ Datum new_value;
+ ToastAttrInfo *attr = &ttc->ttc_attr[attribute];
+
+ new_value = toast_compress_datum(*value, attr->tai_compression);
+
+ if (DatumGetPointer(new_value) != NULL)
+ {
+ /* successful compression */
+ if ((attr->tai_colflags & TOASTCOL_NEEDS_FREE) != 0)
+ pfree(DatumGetPointer(*value));
+ *value = new_value;
+ attr->tai_colflags |= TOASTCOL_NEEDS_FREE;
+ attr->tai_size = VARSIZE(DatumGetPointer(*value));
+ ttc->ttc_flags |= (TOAST_NEEDS_CHANGE | TOAST_NEEDS_FREE);
+ }
+ else
+ {
+ /* incompressible, ignore on subsequent compression passes */
+ attr->tai_colflags |= TOASTCOL_INCOMPRESSIBLE;
+ }
+}
+
+/*
+ * Move an attribute to external storage.
+ */
+void
+toast_tuple_externalize(ToastTupleContext *ttc, int attribute, int options)
+{
+ Datum *value = &ttc->ttc_values[attribute];
+ Datum old_value = *value;
+ ToastAttrInfo *attr = &ttc->ttc_attr[attribute];
+
+ attr->tai_colflags |= TOASTCOL_IGNORE;
+ *value = toast_save_datum(ttc->ttc_rel, old_value, attr->tai_oldexternal,
+ options);
+ if ((attr->tai_colflags & TOASTCOL_NEEDS_FREE) != 0)
+ pfree(DatumGetPointer(old_value));
+ attr->tai_colflags |= TOASTCOL_NEEDS_FREE;
+ ttc->ttc_flags |= (TOAST_NEEDS_CHANGE | TOAST_NEEDS_FREE);
+}
+
+/*
+ * Perform appropriate cleanup after one tuple has been subjected to TOAST.
+ */
+void
+toast_tuple_cleanup(ToastTupleContext *ttc)
+{
+ TupleDesc tupleDesc = ttc->ttc_rel->rd_att;
+ int numAttrs = tupleDesc->natts;
+
+ /*
+ * Free allocated temp values
+ */
+ if ((ttc->ttc_flags & TOAST_NEEDS_FREE) != 0)
+ {
+ int i;
+
+ for (i = 0; i < numAttrs; i++)
+ {
+ ToastAttrInfo *attr = &ttc->ttc_attr[i];
+
+ if ((attr->tai_colflags & TOASTCOL_NEEDS_FREE) != 0)
+ pfree(DatumGetPointer(ttc->ttc_values[i]));
+ }
+ }
+
+ /*
+ * Delete external values from the old tuple
+ */
+ if ((ttc->ttc_flags & TOAST_NEEDS_DELETE_OLD) != 0)
+ {
+ int i;
+
+ for (i = 0; i < numAttrs; i++)
+ {
+ ToastAttrInfo *attr = &ttc->ttc_attr[i];
+
+ if ((attr->tai_colflags & TOASTCOL_NEEDS_DELETE_OLD) != 0)
+ toast_delete_datum(ttc->ttc_rel, ttc->ttc_oldvalues[i], false);
+ }
+ }
+}
+
+/*
+ * Check for external stored attributes and delete them from the secondary
+ * relation.
+ */
+void
+toast_delete_external(Relation rel, Datum *values, bool *isnull,
+ bool is_speculative)
+{
+ TupleDesc tupleDesc = rel->rd_att;
+ int numAttrs = tupleDesc->natts;
+ int i;
+
+ for (i = 0; i < numAttrs; i++)
+ {
+ if (TupleDescAttr(tupleDesc, i)->attlen == -1)
+ {
+ Datum value = values[i];
+
+ if (isnull[i])
+ continue;
+ else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
+ toast_delete_datum(rel, value, is_speculative);
+ }
+ }
+}