summaryrefslogtreecommitdiffstats
path: root/src/backend/access/common
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/access/common
parentInitial commit. (diff)
downloadpostgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz
postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/access/common')
-rw-r--r--src/backend/access/common/Makefile33
-rw-r--r--src/backend/access/common/attmap.c324
-rw-r--r--src/backend/access/common/bufmask.c130
-rw-r--r--src/backend/access/common/detoast.c646
-rw-r--r--src/backend/access/common/heaptuple.c1501
-rw-r--r--src/backend/access/common/indextuple.c589
-rw-r--r--src/backend/access/common/printsimple.c132
-rw-r--r--src/backend/access/common/printtup.c485
-rw-r--r--src/backend/access/common/relation.c217
-rw-r--r--src/backend/access/common/reloptions.c2131
-rw-r--r--src/backend/access/common/scankey.c117
-rw-r--r--src/backend/access/common/session.c208
-rw-r--r--src/backend/access/common/syncscan.c322
-rw-r--r--src/backend/access/common/toast_compression.c318
-rw-r--r--src/backend/access/common/toast_internals.c664
-rw-r--r--src/backend/access/common/tupconvert.c293
-rw-r--r--src/backend/access/common/tupdesc.c912
17 files changed, 9022 insertions, 0 deletions
diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile
new file mode 100644
index 0000000..b9aff0c
--- /dev/null
+++ b/src/backend/access/common/Makefile
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for access/common (OBJS lists the object files built here)
+#
+# IDENTIFICATION
+# src/backend/access/common/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/common
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+ attmap.o \
+ bufmask.o \
+ detoast.o \
+ heaptuple.o \
+ indextuple.o \
+ printsimple.o \
+ printtup.o \
+ relation.o \
+ reloptions.o \
+ scankey.o \
+ session.o \
+ syncscan.o \
+ toast_compression.o \
+ toast_internals.o \
+ tupconvert.o \
+ tupdesc.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/common/attmap.c b/src/backend/access/common/attmap.c
new file mode 100644
index 0000000..32405f8
--- /dev/null
+++ b/src/backend/access/common/attmap.c
@@ -0,0 +1,324 @@
+/*-------------------------------------------------------------------------
+ *
+ * attmap.c
+ * Attribute mapping support.
+ *
+ * This file provides utility routines to build and manage attribute
+ * mappings by comparing input and output TupleDescs. Such mappings
+ * are typically used by DDL operating on inheritance and partition trees
+ * to do a conversion between rowtypes logically equivalent but with
+ * columns in a different order, taking into account dropped columns.
+ * They are also used by the tuple conversion routines in tupconvert.c.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/attmap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/attmap.h"
+#include "access/htup_details.h"
+#include "utils/builtins.h"
+
+
+static bool check_attrmap_match(TupleDesc indesc,
+ TupleDesc outdesc,
+ AttrMap *attrMap);
+
+/*
+ * make_attrmap
+ *
+ * Allocate an attribute map with room for "maplen" entries.  Both the map
+ * header and its attnums array are palloc0'd (hence zero-filled) in the
+ * current memory context.
+ */
+AttrMap *
+make_attrmap(int maplen)
+{
+	AttrMap *map = (AttrMap *) palloc0(sizeof(AttrMap));
+
+	map->attnums = (AttrNumber *) palloc0(sizeof(AttrNumber) * maplen);
+	map->maplen = maplen;
+
+	return map;
+}
+
+/*
+ * free_attrmap
+ *
+ * Release an attribute map: first its attnums array, then the map itself.
+ */
+void
+free_attrmap(AttrMap *map)
+{
+	AttrNumber *attnums = map->attnums;
+
+	pfree(attnums);
+	pfree(map);
+}
+
+/*
+ * build_attrmap_by_position
+ *
+ * Return a palloc'd bare attribute map for tuple conversion, matching input
+ * and output columns by position. Dropped columns are ignored in both input
+ * and output, marked as 0. This is normally a subroutine for
+ * convert_tuples_by_position in tupconvert.c, but it can be used standalone.
+ *
+ * indesc and outdesc are the input and output row descriptors. msg is the
+ * primary error message; it is passed through _() when a mismatch is
+ * reported.
+ *
+ * The map has outdesc->natts entries; attnums[i] holds the 1-based position
+ * of the input column feeding output column i, or 0 if output column i is
+ * dropped. The result is NULL instead when check_attrmap_match() finds the
+ * map to be one-to-one, in which case the map is freed here.
+ *
+ * An ERROR with ERRCODE_DATATYPE_MISMATCH is raised if a pair of matched
+ * columns differs in type OID, or in typmod while the expected typmod is
+ * >= 0, or if the numbers of non-dropped columns differ.
+ *
+ * Note: the errdetail messages speak of indesc as the "returned" rowtype,
+ * outdesc as the "expected" rowtype. This is okay for current uses but
+ * might need generalization in future.
+ */
+AttrMap *
+build_attrmap_by_position(TupleDesc indesc,
+ TupleDesc outdesc,
+ const char *msg)
+{
+ AttrMap *attrMap;
+ int nincols;
+ int noutcols;
+ int n;
+ int i;
+ int j;
+ bool same;
+
+ /*
+ * The length is computed as the number of attributes of the expected
+ * rowtype as it includes dropped attributes in its count.
+ */
+ n = outdesc->natts;
+ attrMap = make_attrmap(n);
+
+ j = 0; /* j is next physical input attribute */
+ nincols = noutcols = 0; /* these count non-dropped attributes */
+ same = true;
+ for (i = 0; i < n; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(outdesc, i);
+ Oid atttypid;
+ int32 atttypmod;
+
+ if (att->attisdropped)
+ continue; /* attrMap->attnums[i] is already 0 */
+ noutcols++;
+ atttypid = att->atttypid;
+ atttypmod = att->atttypmod;
+ for (; j < indesc->natts; j++) /* resume where the last match left off */
+ {
+ att = TupleDescAttr(indesc, j); /* att now refers to the input column */
+ if (att->attisdropped)
+ continue;
+ nincols++;
+
+ /*
+ * Found matching column, now check type. A typmod difference is
+ * tolerated when the expected typmod is negative.
+ */
+ if (atttypid != att->atttypid ||
+ (atttypmod != att->atttypmod && atttypmod >= 0))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg_internal("%s", _(msg)),
+ errdetail("Returned type %s does not match expected type %s in column %d.",
+ format_type_with_typemod(att->atttypid,
+ att->atttypmod),
+ format_type_with_typemod(atttypid,
+ atttypmod),
+ noutcols)));
+ attrMap->attnums[i] = (AttrNumber) (j + 1); /* attnums are 1-based */
+ j++; /* the break below skips the loop's own increment */
+ break;
+ }
+ if (attrMap->attnums[i] == 0) /* ran out of input columns */
+ same = false; /* we'll complain below */
+ }
+
+ /* Check for unused input columns */
+ for (; j < indesc->natts; j++)
+ {
+ if (TupleDescAttr(indesc, j)->attisdropped)
+ continue;
+ nincols++;
+ same = false; /* we'll complain below */
+ }
+
+ /* Report column count mismatch using the non-dropped-column counts */
+ if (!same)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg_internal("%s", _(msg)),
+ errdetail("Number of returned columns (%d) does not match "
+ "expected column count (%d).",
+ nincols, noutcols)));
+
+ /* Check if the map has a one-to-one match */
+ if (check_attrmap_match(indesc, outdesc, attrMap))
+ {
+ /* Runtime conversion is not needed */
+ free_attrmap(attrMap);
+ return NULL;
+ }
+
+ return attrMap;
+}
+
+/*
+ * build_attrmap_by_name
+ *
+ * Return a palloc'd bare attribute map for tuple conversion, matching input
+ * and output columns by name. (Dropped columns are ignored in both input and
+ * output.) This is normally a subroutine for convert_tuples_by_name in
+ * tupconvert.c, but can be used standalone.
+ *
+ * The map has outdesc->natts entries; attnums[i] is the attnum of the
+ * same-named input column, or 0 if output column i is dropped. Unlike
+ * build_attrmap_by_name_if_req, the map is returned even when it turns out
+ * to be one-to-one.
+ *
+ * An ERROR with ERRCODE_DATATYPE_MISMATCH is raised if a non-dropped output
+ * column has no same-named input column, or if the same-named column differs
+ * in type OID or typmod. Note that the errdetail messages format the
+ * tdtypeid of the two descriptors, not the types of the individual columns.
+ */
+AttrMap *
+build_attrmap_by_name(TupleDesc indesc,
+ TupleDesc outdesc)
+{
+ AttrMap *attrMap;
+ int outnatts;
+ int innatts;
+ int i;
+ int nextindesc = -1; /* index of the input column examined last */
+
+ outnatts = outdesc->natts;
+ innatts = indesc->natts;
+
+ attrMap = make_attrmap(outnatts);
+ for (i = 0; i < outnatts; i++)
+ {
+ Form_pg_attribute outatt = TupleDescAttr(outdesc, i);
+ char *attname;
+ Oid atttypid;
+ int32 atttypmod;
+ int j;
+
+ if (outatt->attisdropped)
+ continue; /* attrMap->attnums[i] is already 0 */
+ attname = NameStr(outatt->attname);
+ atttypid = outatt->atttypid;
+ atttypmod = outatt->atttypmod;
+
+ /*
+ * Now search for an attribute with the same name in the indesc. It
+ * seems likely that a partitioned table will have the attributes in
+ * the same order as the partition, so the search below is optimized
+ * for that case. It is possible that columns are dropped in one of
+ * the relations, but not the other, so we use the 'nextindesc'
+ * counter to track the starting point of the search. If the inner
+ * loop encounters dropped columns then it will have to skip over
+ * them, but it should leave 'nextindesc' at the correct position for
+ * the next outer loop.
+ *
+ * 'j' only bounds the search to one full pass over the input columns;
+ * the column actually examined is 'nextindesc', which wraps around to
+ * 0 at the end of the input descriptor.
+ */
+ for (j = 0; j < innatts; j++)
+ {
+ Form_pg_attribute inatt;
+
+ nextindesc++;
+ if (nextindesc >= innatts)
+ nextindesc = 0;
+
+ inatt = TupleDescAttr(indesc, nextindesc);
+ if (inatt->attisdropped)
+ continue;
+ if (strcmp(attname, NameStr(inatt->attname)) == 0)
+ {
+ /* Found it, check type (both type OID and typmod must be equal) */
+ if (atttypid != inatt->atttypid || atttypmod != inatt->atttypmod)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("could not convert row type"),
+ errdetail("Attribute \"%s\" of type %s does not match corresponding attribute of type %s.",
+ attname,
+ format_type_be(outdesc->tdtypeid),
+ format_type_be(indesc->tdtypeid))));
+ attrMap->attnums[i] = inatt->attnum;
+ break;
+ }
+ }
+ if (attrMap->attnums[i] == 0) /* no same-named input column was found */
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("could not convert row type"),
+ errdetail("Attribute \"%s\" of type %s does not exist in type %s.",
+ attname,
+ format_type_be(outdesc->tdtypeid),
+ format_type_be(indesc->tdtypeid))));
+ }
+ return attrMap;
+}
+
+/*
+ * build_attrmap_by_name_if_req
+ *
+ * Build a by-name attribute map with build_attrmap_by_name (which also
+ * verifies compatibility), then hand it back only if a conversion is
+ * actually required.  When check_attrmap_match reports a one-to-one match
+ * the map is freed and NULL is returned.  This is a convenience routine for
+ * convert_tuples_by_name() in tupconvert.c and other functions, but it can
+ * be used standalone.
+ */
+AttrMap *
+build_attrmap_by_name_if_req(TupleDesc indesc,
+							 TupleDesc outdesc)
+{
+	AttrMap *map = build_attrmap_by_name(indesc, outdesc);
+
+	/* A map that is not one-to-one is needed at runtime: return it */
+	if (!check_attrmap_match(indesc, outdesc, map))
+		return map;
+
+	/* One-to-one match, so runtime conversion is not needed */
+	free_attrmap(map);
+	return NULL;
+}
+
+/*
+ * check_attrmap_match
+ *
+ * Report whether the map is a one-to-one match between indesc and outdesc.
+ * If so, no tuple conversion is needed and the attribute map itself is
+ * unnecessary.
+ */
+static bool
+check_attrmap_match(TupleDesc indesc,
+					TupleDesc outdesc,
+					AttrMap *attrMap)
+{
+	int attoff;
+
+	/* descriptors with different attribute counts can never match */
+	if (indesc->natts != outdesc->natts)
+		return false;
+
+	for (attoff = 0; attoff < attrMap->maplen; attoff++)
+	{
+		Form_pg_attribute inatt = TupleDescAttr(indesc, attoff);
+		Form_pg_attribute outatt = TupleDescAttr(outdesc, attoff);
+		AttrNumber mapped = attrMap->attnums[attoff];
+
+		/* An input column with a missing attribute forces a conversion */
+		if (inatt->atthasmissing)
+			return false;
+
+		/* Column maps onto itself: nothing to convert here */
+		if (mapped == (attoff + 1))
+			continue;
+
+		/*
+		 * Otherwise the only acceptable case is a dropped output column
+		 * (map entry 0) whose input counterpart is dropped too, with attlen
+		 * and attalign in agreement.
+		 */
+		if (mapped != 0 ||
+			!inatt->attisdropped ||
+			inatt->attlen != outatt->attlen ||
+			inatt->attalign != outatt->attalign)
+			return false;
+	}
+
+	return true;
+}
diff --git a/src/backend/access/common/bufmask.c b/src/backend/access/common/bufmask.c
new file mode 100644
index 0000000..003a0be
--- /dev/null
+++ b/src/backend/access/common/bufmask.c
@@ -0,0 +1,130 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.c
+ * Routines for buffer masking. Used to mask certain bits
+ * in a page which can be different when the WAL is generated
+ * and when the WAL is applied.
+ *
+ * Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group
+ *
+ * Contains common routines required for masking a page.
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/bufmask.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/bufmask.h"
+
+/*
+ * mask_page_lsn_and_checksum
+ *
+ * Overwrite the page header's LSN and checksum with MASK_MARKER.
+ *
+ * In consistency checks the LSNs of the two pages being compared will
+ * likely differ, because of concurrent operations when the WAL is generated
+ * versus the page state when the WAL is applied.  The checksum is masked as
+ * well, since masking anything else on the page means it cannot match.
+ */
+void
+mask_page_lsn_and_checksum(Page page)
+{
+	PageHeader header = (PageHeader) page;
+
+	header->pd_checksum = MASK_MARKER;
+	PageXLogRecPtrSet(header->pd_lsn, (uint64) MASK_MARKER);
+}
+
+/*
+ * mask_page_hint_bits
+ *
+ * Mask the hint-like state kept in the PageHeader.  Differences in hint
+ * bits must be ignored, since they can be set without emitting any WAL.
+ */
+void
+mask_page_hint_bits(Page page)
+{
+	/*
+	 * During replay, if the page LSN has advanced past our XLOG record's LSN,
+	 * we don't mark the page all-visible. See heap_xlog_visible() for
+	 * details.
+	 */
+	PageClearAllVisible(page);
+
+	/* PD_HAS_FREE_LINES and PD_PAGE_FULL are only hints, so drop them too */
+	PageClearHasFreeLinePointers(page);
+	PageClearFull(page);
+
+	/* prune_xid behaves like a hint bit as well */
+	((PageHeader) page)->pd_prune_xid = MASK_MARKER;
+}
+
+/*
+ * mask_unused_space
+ *
+ * Mask the unused space of a page between pd_lower and pd_upper by filling
+ * it with MASK_MARKER.
+ *
+ * The three header offsets are sanity-checked first; an ERROR is raised if
+ * they are not ordered as
+ *		SizeOfPageHeaderData <= pd_lower <= pd_upper <= pd_special <= BLCKSZ
+ * since the memset below would otherwise run outside the unused area.
+ */
+void
+mask_unused_space(Page page)
+{
+	int pd_lower = ((PageHeader) page)->pd_lower;
+	int pd_upper = ((PageHeader) page)->pd_upper;
+	int pd_special = ((PageHeader) page)->pd_special;
+
+	/*
+	 * Sanity check.  The message carries no trailing newline, and the values
+	 * are printed with %d to match the int locals.
+	 */
+	if (pd_lower > pd_upper || pd_special < pd_upper ||
+		pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+	{
+		elog(ERROR, "invalid page pd_lower %d pd_upper %d pd_special %d",
+			 pd_lower, pd_upper, pd_special);
+	}
+
+	memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
+}
+
+/*
+ * mask_lp_flags
+ *
+ * Reset lp_flags to LP_UNUSED for every line pointer on the page that is
+ * currently in use.  In some index AMs, line pointer flags can be modified
+ * on the primary without emitting any WAL record.
+ */
+void
+mask_lp_flags(Page page)
+{
+	OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
+	OffsetNumber offnum = FirstOffsetNumber;
+
+	while (offnum <= maxoff)
+	{
+		ItemId itemId = PageGetItemId(page, offnum);
+
+		if (ItemIdIsUsed(itemId))
+			itemId->lp_flags = LP_UNUSED;
+
+		offnum = OffsetNumberNext(offnum);
+	}
+}
+
+/*
+ * mask_page_content
+ *
+ * Fill everything after the page header, plus the header's pd_lower and
+ * pd_upper fields, with MASK_MARKER.  In some index AMs, the contents of
+ * deleted pages need to be almost completely ignored.
+ */
+void
+mask_page_content(Page page)
+{
+	PageHeader header = (PageHeader) page;
+
+	/* Everything past the fixed-size page header */
+	memset(page + SizeOfPageHeaderData, MASK_MARKER,
+		   BLCKSZ - SizeOfPageHeaderData);
+
+	/* pd_lower and pd_upper inside the header */
+	memset(&header->pd_lower, MASK_MARKER, sizeof(uint16));
+	memset(&header->pd_upper, MASK_MARKER, sizeof(uint16));
+}
diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
new file mode 100644
index 0000000..545a6b8
--- /dev/null
+++ b/src/backend/access/common/detoast.c
@@ -0,0 +1,646 @@
+/*-------------------------------------------------------------------------
+ *
+ * detoast.c
+ * Retrieve compressed or external variable size attributes.
+ *
+ * Copyright (c) 2000-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/detoast.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/toast_internals.h"
+#include "common/int.h"
+#include "common/pg_lzcompress.h"
+#include "utils/expandeddatum.h"
+#include "utils/rel.h"
+
+static struct varlena *toast_fetch_datum(struct varlena *attr);
+static struct varlena *toast_fetch_datum_slice(struct varlena *attr,
+ int32 sliceoffset,
+ int32 slicelength);
+static struct varlena *toast_decompress_datum(struct varlena *attr);
+static struct varlena *toast_decompress_datum_slice(struct varlena *attr, int32 slicelength);
+
+/* ----------
+ * detoast_external_attr -
+ *
+ * Public entry point to get back a toasted value from
+ * external source (possibly still in compressed format).
+ *
+ * The returned datum holds all of its data internally, i.e. it does not
+ * rely on external storage or memory; it may still be compressed or have a
+ * short header. Note some callers assume that if the input is an
+ * EXTERNAL datum, the result will be a pfree'able chunk.
+ * ----------
+ */
+struct varlena *
+detoast_external_attr(struct varlena *attr)
+{
+	/* Externally stored plain value: fetch it from the toast relation */
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+		return toast_fetch_datum(attr);
+
+	/* Indirect pointer: dereference it */
+	if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect redirect;
+		struct varlena *target;
+		struct varlena *copy;
+
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		target = (struct varlena *) redirect.pointer;
+
+		/* nested indirect Datums aren't allowed */
+		Assert(!VARATT_IS_EXTERNAL_INDIRECT(target));
+
+		/* recurse if value is still external in some other way */
+		if (VARATT_IS_EXTERNAL(target))
+			return detoast_external_attr(target);
+
+		/*
+		 * Hand back a copy made in the caller's memory context, in case the
+		 * caller tries to pfree the result.
+		 */
+		copy = (struct varlena *) palloc(VARSIZE_ANY(target));
+		memcpy(copy, target, VARSIZE_ANY(target));
+		return copy;
+	}
+
+	/* Expanded-object pointer: produce the flat format */
+	if (VARATT_IS_EXTERNAL_EXPANDED(attr))
+	{
+		ExpandedObjectHeader *eoh = DatumGetEOHP(PointerGetDatum(attr));
+		Size flatsize = EOH_get_flat_size(eoh);
+		struct varlena *flat = (struct varlena *) palloc(flatsize);
+
+		EOH_flatten_into(eoh, (void *) flat, flatsize);
+		return flat;
+	}
+
+	/* Plain value inside of the main tuple: nothing to do, return as-is */
+	return attr;
+}
+
+
+/* ----------
+ * detoast_attr -
+ *
+ * Public entry point to get back a toasted value from compression
+ * or external storage. The result is always non-extended varlena form.
+ *
+ * Cases handled, in this order: on-disk external (fetched, then
+ * decompressed if the fetched datum is compressed), indirect pointer
+ * (dereferenced and processed recursively, copying the target if it needed
+ * no detoasting), expanded object (flattened via detoast_external_attr),
+ * inline compressed (decompressed), and short-header (copied into a new
+ * chunk with a 4-byte header). Any other input is returned as-is, without
+ * being copied.
+ *
+ * Note some callers assume that if the input is an EXTERNAL or COMPRESSED
+ * datum, the result will be a pfree'able chunk.
+ * ----------
+ */
+struct varlena *
+detoast_attr(struct varlena *attr)
+{
+ if (VARATT_IS_EXTERNAL_ONDISK(attr))
+ {
+ /*
+ * This is an externally stored datum --- fetch it back from there
+ */
+ attr = toast_fetch_datum(attr);
+ /* If it's compressed, decompress it */
+ if (VARATT_IS_COMPRESSED(attr))
+ {
+ struct varlena *tmp = attr;
+
+ attr = toast_decompress_datum(tmp);
+ pfree(tmp); /* the fetched compressed copy is no longer needed */
+ }
+ }
+ else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+ {
+ /*
+ * This is an indirect pointer --- dereference it
+ */
+ struct varatt_indirect redirect;
+
+ VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+ attr = (struct varlena *) redirect.pointer;
+
+ /* nested indirect Datums aren't allowed */
+ Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
+
+ /* recurse in case value is still extended in some other way */
+ attr = detoast_attr(attr);
+
+ /*
+ * if it isn't, we'd better copy it: an unchanged pointer means the
+ * recursive call returned the pointed-to datum itself
+ */
+ if (attr == (struct varlena *) redirect.pointer)
+ {
+ struct varlena *result;
+
+ result = (struct varlena *) palloc(VARSIZE_ANY(attr));
+ memcpy(result, attr, VARSIZE_ANY(attr));
+ attr = result;
+ }
+ }
+ else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
+ {
+ /*
+ * This is an expanded-object pointer --- get flat format
+ */
+ attr = detoast_external_attr(attr);
+ /* flatteners are not allowed to produce compressed/short output */
+ Assert(!VARATT_IS_EXTENDED(attr));
+ }
+ else if (VARATT_IS_COMPRESSED(attr))
+ {
+ /*
+ * This is a compressed value inside of the main tuple
+ */
+ attr = toast_decompress_datum(attr);
+ }
+ else if (VARATT_IS_SHORT(attr))
+ {
+ /*
+ * This is a short-header varlena --- convert to 4-byte header format
+ */
+ Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
+ Size new_size = data_size + VARHDRSZ;
+ struct varlena *new_attr;
+
+ new_attr = (struct varlena *) palloc(new_size);
+ SET_VARSIZE(new_attr, new_size);
+ memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
+ attr = new_attr;
+ }
+
+ return attr;
+}
+
+
+/* ----------
+ * detoast_attr_slice -
+ *
+ * Public entry point to get back part of a toasted value
+ * from compression or external storage.
+ *
+ * sliceoffset is where to start (zero or more)
+ * If slicelength < 0, return everything beyond sliceoffset
+ *
+ * The result is a newly palloc'd varlena holding the requested bytes.
+ * If sliceoffset is at or beyond the end of the value, the result is
+ * empty; a request that extends past the end is truncated to the data
+ * available. A negative sliceoffset raises ERROR.
+ * ----------
+ */
+struct varlena *
+detoast_attr_slice(struct varlena *attr,
+ int32 sliceoffset, int32 slicelength)
+{
+ struct varlena *preslice; /* detoasted data the slice is cut from */
+ struct varlena *result;
+ char *attrdata;
+ int32 slicelimit; /* sliceoffset + slicelength, or -1 for "all" */
+ int32 attrsize;
+
+ if (sliceoffset < 0)
+ elog(ERROR, "invalid sliceoffset: %d", sliceoffset);
+
+ /*
+ * Compute slicelimit = offset + length, or -1 if we must fetch all of the
+ * value. In case of integer overflow, we must fetch all.
+ */
+ if (slicelength < 0)
+ slicelimit = -1;
+ else if (pg_add_s32_overflow(sliceoffset, slicelength, &slicelimit))
+ slicelength = slicelimit = -1;
+
+ if (VARATT_IS_EXTERNAL_ONDISK(attr))
+ {
+ struct varatt_external toast_pointer;
+
+ VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+ /* fast path for non-compressed external datums */
+ if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
+ return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
+
+ /*
+ * For compressed values, we need to fetch enough slices to decompress
+ * at least the requested part (when a prefix is requested).
+ * Otherwise, just fetch all slices.
+ */
+ if (slicelimit >= 0)
+ {
+ int32 max_size = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
+
+ /*
+ * Determine maximum amount of compressed data needed for a prefix
+ * of a given length (after decompression).
+ *
+ * At least for now, if it's LZ4 data, we'll have to fetch the
+ * whole thing, because there doesn't seem to be an API call to
+ * determine how much compressed data we need to be sure of being
+ * able to decompress the required slice.
+ */
+ if (VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer) ==
+ TOAST_PGLZ_COMPRESSION_ID)
+ max_size = pglz_maximum_compressed_size(slicelimit, max_size);
+
+ /*
+ * Fetch enough compressed slices (compressed marker will get set
+ * automatically).
+ */
+ preslice = toast_fetch_datum_slice(attr, 0, max_size);
+ }
+ else
+ preslice = toast_fetch_datum(attr);
+ }
+ else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+ {
+ struct varatt_indirect redirect;
+
+ VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+
+ /* nested indirect Datums aren't allowed */
+ Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer));
+
+ return detoast_attr_slice(redirect.pointer,
+ sliceoffset, slicelength);
+ }
+ else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
+ {
+ /* pass it off to detoast_external_attr to flatten */
+ preslice = detoast_external_attr(attr);
+ }
+ else
+ preslice = attr;
+
+ Assert(!VARATT_IS_EXTERNAL(preslice));
+
+ if (VARATT_IS_COMPRESSED(preslice))
+ {
+ struct varlena *tmp = preslice;
+
+ /* Decompress enough to encompass the slice and the offset */
+ if (slicelimit >= 0)
+ preslice = toast_decompress_datum_slice(tmp, slicelimit);
+ else
+ preslice = toast_decompress_datum(tmp);
+
+ if (tmp != attr) /* free only a copy fetched above, never the input */
+ pfree(tmp);
+ }
+
+ if (VARATT_IS_SHORT(preslice))
+ {
+ attrdata = VARDATA_SHORT(preslice);
+ attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
+ }
+ else
+ {
+ attrdata = VARDATA(preslice);
+ attrsize = VARSIZE(preslice) - VARHDRSZ;
+ }
+
+ /*
+ * slicing of datum for compressed cases and plain value: an offset at or
+ * past the end gives an empty slice, and an open-ended or overlong
+ * request is clamped to the data available
+ */
+
+ if (sliceoffset >= attrsize)
+ {
+ sliceoffset = 0;
+ slicelength = 0;
+ }
+ else if (slicelength < 0 || slicelimit > attrsize)
+ slicelength = attrsize - sliceoffset;
+
+ result = (struct varlena *) palloc(slicelength + VARHDRSZ);
+ SET_VARSIZE(result, slicelength + VARHDRSZ);
+
+ memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
+
+ if (preslice != attr) /* release intermediate data, but not the input */
+ pfree(preslice);
+
+ return result;
+}
+
+/* ----------
+ * toast_fetch_datum -
+ *
+ * Rebuild a complete in-memory datum from the chunks stored in the toast
+ * relation. The result is palloc'd and is marked compressed if the toast
+ * pointer says the external data is compressed.
+ * ----------
+ */
+static struct varlena *
+toast_fetch_datum(struct varlena *attr)
+{
+	struct varatt_external toast_pointer;
+	struct varlena *result;
+	Relation toastrel;
+	int32 attrsize;
+
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+		elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");
+
+	/* Must copy to access aligned fields */
+	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+	attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
+
+	/* Allocate the result and stamp its header with the right size format */
+	result = (struct varlena *) palloc(attrsize + VARHDRSZ);
+	if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
+		SET_VARSIZE(result, attrsize + VARHDRSZ);
+	else
+		SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ);
+
+	/* A zero-length value probably shouldn't happen; if so, skip the fetch */
+	if (attrsize != 0)
+	{
+		/* Open the toast relation, fetch all chunks, and close it again */
+		toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
+		table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid,
+										 attrsize, 0, attrsize, result);
+		table_close(toastrel, AccessShareLock);
+	}
+
+	return result;
+}
+
+/* ----------
+ * toast_fetch_datum_slice -
+ *
+ * Reconstruct a segment of a Datum from the chunks saved
+ * in the toast relation
+ *
+ * Note that this function supports non-compressed external datums
+ * and compressed external datums (in which case the requested slice
+ * has to be a prefix, i.e. sliceoffset has to be 0).
+ *
+ * sliceoffset and slicelength are measured against the externally stored
+ * size taken from the toast pointer. A negative slicelength, or a slice
+ * running past the end, is clamped to the end of the stored data; an
+ * offset at or past the end yields an empty result. The result is
+ * palloc'd and is marked compressed if the external data is compressed.
+ * ----------
+ */
+static struct varlena *
+toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset,
+ int32 slicelength)
+{
+ Relation toastrel;
+ struct varlena *result;
+ struct varatt_external toast_pointer;
+ int32 attrsize;
+
+ if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+ elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums");
+
+ /* Must copy to access aligned fields */
+ VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+ /*
+ * It's nonsense to fetch slices of a compressed datum unless when it's a
+ * prefix -- this isn't lo_* we can't return a compressed datum which is
+ * meaningful to toast later.
+ */
+ Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) || 0 == sliceoffset);
+
+ attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
+
+ if (sliceoffset >= attrsize) /* nothing stored at or beyond this offset */
+ {
+ sliceoffset = 0;
+ slicelength = 0;
+ }
+
+ /*
+ * When fetching a prefix of a compressed external datum, account for the
+ * space required by va_tcinfo, which is stored at the beginning as an
+ * int32 value.
+ */
+ if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) && slicelength > 0)
+ slicelength = slicelength + sizeof(int32);
+
+ /*
+ * Adjust length request if needed. (Note: our sole caller,
+ * detoast_attr_slice, protects us against sliceoffset + slicelength
+ * overflowing.)
+ */
+ if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
+ slicelength = attrsize - sliceoffset;
+
+ result = (struct varlena *) palloc(slicelength + VARHDRSZ);
+
+ if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
+ SET_VARSIZE_COMPRESSED(result, slicelength + VARHDRSZ);
+ else
+ SET_VARSIZE(result, slicelength + VARHDRSZ);
+
+ if (slicelength == 0)
+ return result; /* Can save a lot of work at this point! */
+
+ /* Open the toast relation */
+ toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
+
+ /* Fetch the chunks covering the requested slice */
+ table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid,
+ attrsize, sliceoffset, slicelength,
+ result);
+
+ /* Close toast table */
+ table_close(toastrel, AccessShareLock);
+
+ return result;
+}
+
+/* ----------
+ * toast_decompress_datum -
+ *
+ * Decompress a compressed varlena datum, dispatching on the compression
+ * method id stored in its compression header. An unrecognized method id
+ * raises ERROR.
+ * ----------
+ */
+static struct varlena *
+toast_decompress_datum(struct varlena *attr)
+{
+	ToastCompressionId cmid;
+
+	Assert(VARATT_IS_COMPRESSED(attr));
+
+	/* The method id selects the matching decompression routine */
+	cmid = TOAST_COMPRESS_METHOD(attr);
+
+	if (cmid == TOAST_PGLZ_COMPRESSION_ID)
+		return pglz_decompress_datum(attr);
+
+	if (cmid == TOAST_LZ4_COMPRESSION_ID)
+		return lz4_decompress_datum(attr);
+
+	elog(ERROR, "invalid compression method id %d", cmid);
+	return NULL;				/* keep compiler quiet */
+}
+
+
+/* ----------
+ * toast_decompress_datum_slice -
+ *
+ * Decompress only the front "slicelength" bytes of a compressed varlena
+ * datum. Offset handling is done by detoast_attr_slice; this routine
+ * always starts from the beginning of the data.
+ * ----------
+ */
+static struct varlena *
+toast_decompress_datum_slice(struct varlena *attr, int32 slicelength)
+{
+	ToastCompressionId cmid;
+
+	Assert(VARATT_IS_COMPRESSED(attr));
+
+	/*
+	 * Callers may ask for more than the actual decompressed size. In that
+	 * case do a normal full decompression: it avoids possibly allocating a
+	 * larger-than-necessary result object, and may be faster and/or more
+	 * robust as well. Notably, some versions of liblz4 have been seen to
+	 * give wrong results if passed an output size that is more than the
+	 * data's true decompressed size.
+	 */
+	if ((uint32) slicelength >= TOAST_COMPRESS_EXTSIZE(attr))
+		return toast_decompress_datum(attr);
+
+	/* The method id selects the matching slice-decompression routine */
+	cmid = TOAST_COMPRESS_METHOD(attr);
+
+	if (cmid == TOAST_PGLZ_COMPRESSION_ID)
+		return pglz_decompress_datum_slice(attr, slicelength);
+
+	if (cmid == TOAST_LZ4_COMPRESSION_ID)
+		return lz4_decompress_datum_slice(attr, slicelength);
+
+	elog(ERROR, "invalid compression method id %d", cmid);
+	return NULL;				/* keep compiler quiet */
+}
+
+/* ----------
+ * toast_raw_datum_size -
+ *
+ * Return the raw (detoasted) size of a varlena datum
+ * (including the VARHDRSZ header)
+ * ----------
+ */
+Size
+toast_raw_datum_size(Datum value)
+{
+	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
+
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		struct varatt_external toast_pointer;
+
+		/* va_rawsize is the size of the original datum -- including header */
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		return toast_pointer.va_rawsize;
+	}
+
+	if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+		/* nested indirect Datums aren't allowed */
+		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
+
+		/* size up whatever the indirect pointer refers to */
+		return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer));
+	}
+
+	if (VARATT_IS_EXTERNAL_EXPANDED(attr))
+		return EOH_get_flat_size(DatumGetEOHP(value));
+
+	/* inline compressed: the stored raw size is just the payload size */
+	if (VARATT_IS_COMPRESSED(attr))
+		return VARDATA_COMPRESSED_GET_EXTSIZE(attr) + VARHDRSZ;
+
+	/*
+	 * short header: normalize the header length to VARHDRSZ, or else the
+	 * callers of this function will be confused.
+	 */
+	if (VARATT_IS_SHORT(attr))
+		return VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
+
+	/* plain untoasted datum */
+	return VARSIZE(attr);
+}
+
+/* ----------
+ * toast_datum_size
+ *
+ * Return the physical storage size (possibly compressed) of a varlena datum
+ *
+ * For an on-disk external datum this is the external size recorded in the
+ * toast pointer; for an indirect pointer, the size of the datum it points
+ * to; for an expanded object, its flat size; otherwise the datum's own
+ * size, header included.
+ * ----------
+ */
+Size
+toast_datum_size(Datum value)
+{
+	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
+	Size result;
+
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		/*
+		 * Attribute is stored externally - return the extsize whether
+		 * compressed or not. We do not count the size of the toast pointer
+		 * ... should we?
+		 */
+		struct varatt_external toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		result = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);
+	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		struct varatt_indirect toast_pointer;
+
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+
+		/*
+		 * nested indirect Datums aren't allowed.  The check must look at the
+		 * pointed-to datum: attr itself is known to be indirect in this
+		 * branch, so testing attr would trip on every indirect input.
+		 */
+		Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer));
+
+		return toast_datum_size(PointerGetDatum(toast_pointer.pointer));
+	}
+	else if (VARATT_IS_EXTERNAL_EXPANDED(attr))
+	{
+		result = EOH_get_flat_size(DatumGetEOHP(value));
+	}
+	else if (VARATT_IS_SHORT(attr))
+	{
+		/* short header: the size includes the 1-byte header */
+		result = VARSIZE_SHORT(attr);
+	}
+	else
+	{
+		/*
+		 * Attribute is stored inline either compressed or not, just calculate
+		 * the size of the datum in either case.
+		 */
+		result = VARSIZE(attr);
+	}
+	return result;
+}
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
new file mode 100644
index 0000000..0b56b0f
--- /dev/null
+++ b/src/backend/access/common/heaptuple.c
@@ -0,0 +1,1501 @@
+/*-------------------------------------------------------------------------
+ *
+ * heaptuple.c
+ * This file contains heap tuple accessor and mutator routines, as well
+ * as various tuple utilities.
+ *
+ * Some notes about varlenas and this code:
+ *
+ * Before Postgres 8.3 varlenas always had a 4-byte length header, and
+ * therefore always needed 4-byte alignment (at least). This wasted space
+ * for short varlenas, for example CHAR(1) took 5 bytes and could need up to
+ * 3 additional padding bytes for alignment.
+ *
+ * Now, a short varlena (up to 126 data bytes) is reduced to a 1-byte header
+ * and we don't align it. To hide this from datatype-specific functions that
+ * don't want to deal with it, such a datum is considered "toasted" and will
+ * be expanded back to the normal 4-byte-header format by pg_detoast_datum.
+ * (In performance-critical code paths we can use pg_detoast_datum_packed
+ * and the appropriate access macros to avoid that overhead.) Note that this
+ * conversion is performed directly in heap_form_tuple, without invoking
+ * heaptoast.c.
+ *
+ * This change will break any code that assumes it needn't detoast values
+ * that have been put into a tuple but never sent to disk. Hopefully there
+ * are few such places.
+ *
+ * Varlenas still have alignment INT (or DOUBLE) in pg_type/pg_attribute, since
+ * that's the normal requirement for the untoasted format. But we ignore that
+ * for the 1-byte-header format. This means that the actual start position
+ * of a varlena datum may vary depending on which format it has. To determine
+ * what is stored, we have to require that alignment padding bytes be zero.
+ * (Postgres actually has always zeroed them, but now it's required!) Since
+ * the first byte of a 1-byte-header varlena can never be zero, we can examine
+ * the first byte after the previous datum to tell if it's a pad byte or the
+ * start of a 1-byte-header varlena.
+ *
+ * Note that while formerly we could rely on the first varlena column of a
+ * system catalog to be at the offset suggested by the C struct for the
+ * catalog, this is now risky: it's only safe if the preceding field is
+ * word-aligned, so that there will never be any padding.
+ *
+ * We don't pack varlenas whose attstorage is PLAIN, since the data type
+ * isn't expecting to have to detoast values. This is used in particular
+ * by oidvector and int2vector, which are used in the system catalogs
+ * and we'd like to still refer to them via C struct offsets.
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/heaptuple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/heaptoast.h"
+#include "access/sysattr.h"
+#include "access/tupdesc_details.h"
+#include "executor/tuptable.h"
+#include "utils/expandeddatum.h"
+
+
+/*
+ * Does att's datatype allow packing into the 1-byte-header varlena format?
+ *
+ * True when att is a varlena attribute (attlen == -1) whose attstorage is
+ * anything other than TYPSTORAGE_PLAIN. Note that att is evaluated twice.
+ */
+#define ATT_IS_PACKABLE(att) \
+ ((att)->attlen == -1 && (att)->attstorage != TYPSTORAGE_PLAIN)
+/*
+ * Use this if it's already known varlena
+ *
+ * Same test as ATT_IS_PACKABLE, minus the attlen check.
+ */
+#define VARLENA_ATT_IS_PACKABLE(att) \
+ ((att)->attstorage != TYPSTORAGE_PLAIN)
+
+
+/* ----------------------------------------------------------------
+ * misc support routines
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * getmissingattr
+ *		Fetch the "missing" value recorded for attribute attnum (1-based).
+ *
+ * If the attribute is flagged atthasmissing and its entry in
+ * tupleDesc->constr->missing is marked present, *isnull is set to false and
+ * the stored value is returned.  In every other case *isnull is set to true
+ * and a NULL pointer Datum is returned.
+ */
+Datum
+getmissingattr(TupleDesc tupleDesc,
+			   int attnum, bool *isnull)
+{
+	int			attidx = attnum - 1;
+	bool		found = false;
+	Datum		result = PointerGetDatum(NULL);
+
+	Assert(attnum <= tupleDesc->natts);
+	Assert(attnum > 0);
+
+	if (TupleDescAttr(tupleDesc, attidx)->atthasmissing)
+	{
+		AttrMissing *entry;
+
+		/* the flag promises that the missing-value array exists */
+		Assert(tupleDesc->constr);
+		Assert(tupleDesc->constr->missing);
+
+		entry = &tupleDesc->constr->missing[attidx];
+		if (entry->am_present)
+		{
+			found = true;
+			result = entry->am_value;
+		}
+	}
+
+	*isnull = !found;
+	return result;
+}
+
+/*
+ * heap_compute_data_size
+ *		Work out how many bytes the data area of a tuple built from
+ *		values[]/isnull[] will need.
+ *
+ * Null columns contribute nothing.  For every other column one of three
+ * rules applies, tested in this order:
+ *	- a packable varlena that can take a short header adds its converted
+ *	  short size with no alignment padding;
+ *	- an expanded varlena adds nominal alignment plus its flat size;
+ *	- anything else adds datum-dependent alignment plus its length.
+ */
+Size
+heap_compute_data_size(TupleDesc tupleDesc,
+					   Datum *values,
+					   bool *isnull)
+{
+	Size		total = 0;
+	int			natts = tupleDesc->natts;
+	int			attidx;
+
+	for (attidx = 0; attidx < natts; attidx++)
+	{
+		if (!isnull[attidx])
+		{
+			Form_pg_attribute attr = TupleDescAttr(tupleDesc, attidx);
+			Datum		datum = values[attidx];
+
+			if (ATT_IS_PACKABLE(attr) &&
+				VARATT_CAN_MAKE_SHORT(DatumGetPointer(datum)))
+			{
+				/* will be stored with a short header: no padding */
+				total += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(datum));
+			}
+			else if (attr->attlen == -1 &&
+					 VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(datum)))
+			{
+				/* expanded objects get flattened into the tuple */
+				total = att_align_nominal(total, attr->attalign);
+				total += EOH_get_flat_size(DatumGetEOHP(datum));
+			}
+			else
+			{
+				total = att_align_datum(total, attr->attalign,
+										attr->attlen, datum);
+				total = att_addlength_datum(total, attr->attlen,
+											datum);
+			}
+		}
+	}
+
+	return total;
+}
+
+/*
+ * Per-attribute helper for heap_fill_tuple and other routines building tuples.
+ *
+ * Fill in either a data value or a bit in the null bitmask
+ *
+ * att: descriptor of the attribute being stored; attbyval, attlen, attalign
+ * and attstorage select the storage path taken below
+ * bit: address of the caller's pointer to the current null-bitmap byte, or
+ * NULL when no null bitmap is being built
+ * bitmask: address of the mask selecting the current bit within that byte
+ * dataP: address of the caller's write pointer into the data area; on a
+ * non-null store it is updated to point just past the stored value
+ * infomask: HEAP_HASNULL, HEAP_HASVARWIDTH and HEAP_HASEXTERNAL are OR'd in
+ * as the corresponding cases are encountered
+ * datum, isnull: the column value
+ *
+ * Note that isnull is only consulted when bit is not NULL; without a bitmap
+ * the datum is always stored.
+ */
+static inline void
+fill_val(Form_pg_attribute att,
+ bits8 **bit,
+ int *bitmask,
+ char **dataP,
+ uint16 *infomask,
+ Datum datum,
+ bool isnull)
+{
+ Size data_length;
+ char *data = *dataP;
+
+ /*
+ * If we're building a null bitmap, set the appropriate bit for the
+ * current column value here.
+ *
+ * The mask is advanced before use: it is shifted left by one, or, once
+ * it has reached HIGHBIT, the byte pointer moves to the next byte, that
+ * byte is cleared, and the mask restarts at 1.
+ */
+ if (bit != NULL)
+ {
+ if (*bitmask != HIGHBIT)
+ *bitmask <<= 1;
+ else
+ {
+ *bit += 1;
+ **bit = 0x0;
+ *bitmask = 1;
+ }
+
+ if (isnull)
+ {
+ *infomask |= HEAP_HASNULL;
+ return; /* a null stores no data and its bit is not set here */
+ }
+
+ **bit |= *bitmask;
+ }
+
+ /*
+ * XXX we use the att_align macros on the pointer value itself, not on an
+ * offset. This is a bit of a hack.
+ */
+ if (att->attbyval)
+ {
+ /* pass-by-value */
+ data = (char *) att_align_nominal(data, att->attalign);
+ store_att_byval(data, datum, att->attlen);
+ data_length = att->attlen;
+ }
+ else if (att->attlen == -1)
+ {
+ /* varlena */
+ Pointer val = DatumGetPointer(datum);
+
+ *infomask |= HEAP_HASVARWIDTH;
+ if (VARATT_IS_EXTERNAL(val))
+ {
+ if (VARATT_IS_EXTERNAL_EXPANDED(val))
+ {
+ /*
+ * we want to flatten the expanded value so that the
+ * constructed tuple doesn't depend on it
+ */
+ ExpandedObjectHeader *eoh = DatumGetEOHP(datum);
+
+ data = (char *) att_align_nominal(data,
+ att->attalign);
+ data_length = EOH_get_flat_size(eoh);
+ EOH_flatten_into(eoh, data, data_length);
+ }
+ else
+ {
+ *infomask |= HEAP_HASEXTERNAL;
+ /* no alignment, since it's short by definition */
+ data_length = VARSIZE_EXTERNAL(val);
+ memcpy(data, val, data_length);
+ }
+ }
+ else if (VARATT_IS_SHORT(val))
+ {
+ /* no alignment for short varlenas */
+ data_length = VARSIZE_SHORT(val);
+ memcpy(data, val, data_length);
+ }
+ else if (VARLENA_ATT_IS_PACKABLE(att) &&
+ VARATT_CAN_MAKE_SHORT(val))
+ {
+ /*
+ * convert to short varlena -- no alignment
+ *
+ * A one-byte header is written at data, followed by the payload
+ * copied from VARDATA(val).
+ */
+ data_length = VARATT_CONVERTED_SHORT_SIZE(val);
+ SET_VARSIZE_SHORT(data, data_length);
+ memcpy(data + 1, VARDATA(val), data_length - 1);
+ }
+ else
+ {
+ /* full 4-byte header varlena */
+ data = (char *) att_align_nominal(data,
+ att->attalign);
+ data_length = VARSIZE(val);
+ memcpy(data, val, data_length);
+ }
+ }
+ else if (att->attlen == -2)
+ {
+ /* cstring ... never needs alignment */
+ *infomask |= HEAP_HASVARWIDTH;
+ Assert(att->attalign == TYPALIGN_CHAR);
+ data_length = strlen(DatumGetCString(datum)) + 1; /* include the terminating NUL */
+ memcpy(data, DatumGetPointer(datum), data_length);
+ }
+ else
+ {
+ /* fixed-length pass-by-reference */
+ data = (char *) att_align_nominal(data, att->attalign);
+ Assert(att->attlen > 0);
+ data_length = att->attlen;
+ memcpy(data, DatumGetPointer(datum), data_length);
+ }
+
+ data += data_length;
+ *dataP = data; /* report the new write position back to the caller */
+}
+
+/*
+ * heap_fill_tuple
+ *		Write the data area of a tuple from the values/isnull arrays.
+ *
+ * Besides the column data, this fills the null bitmap when "bit" is given
+ * and recomputes the infomask bits describing the contents (HEAP_HASNULL,
+ * HEAP_HASVARWIDTH, HEAP_HASEXTERNAL): they are cleared first and then set
+ * again by fill_val as needed.
+ *
+ * A NULL values array is treated as all-NULL-pointer datums and a NULL
+ * isnull array as "every column is null".
+ *
+ * NOTE: it is now REQUIRED that the caller have pre-zeroed the data area.
+ */
+void
+heap_fill_tuple(TupleDesc tupleDesc,
+				Datum *values, bool *isnull,
+				char *data, Size data_size,
+				uint16 *infomask, bits8 *bit)
+{
+	int			natts = tupleDesc->natts;
+	bits8	   *curByte = NULL;
+	int			curMask = 0;
+	int			attidx;
+
+#ifdef USE_ASSERT_CHECKING
+	char	   *start = data;
+#endif
+
+	/*
+	 * fill_val advances the bitmap position before using it, so start one
+	 * byte before the bitmap with the mask at its last bit.
+	 */
+	if (bit != NULL)
+	{
+		curByte = bit - 1;
+		curMask = HIGHBIT;
+	}
+
+	*infomask &= ~(HEAP_HASNULL | HEAP_HASVARWIDTH | HEAP_HASEXTERNAL);
+
+	for (attidx = 0; attidx < natts; attidx++)
+	{
+		Datum		datum;
+		bool		datumIsNull;
+
+		if (values)
+			datum = values[attidx];
+		else
+			datum = PointerGetDatum(NULL);
+
+		if (isnull)
+			datumIsNull = isnull[attidx];
+		else
+			datumIsNull = true;
+
+		fill_val(TupleDescAttr(tupleDesc, attidx),
+				 curByte ? &curByte : NULL,
+				 &curMask,
+				 &data,
+				 infomask,
+				 datum,
+				 datumIsNull);
+	}
+
+	Assert((data - start) == data_size);
+}
+
+
+/* ----------------------------------------------------------------
+ * heap tuple interface
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------
+ *		heap_attisnull	- report whether a tuple attribute reads as null
+ *
+ *		attnum beyond the tuple's stored attribute count: null unless the
+ *		descriptor says the attribute has a missing value.
+ *		Positive attnum within the tuple: consult the null bitmap, if any.
+ *		Recognized system attribute numbers: never null.
+ *		Any other attnum raises an error.
+ * ----------------
+ */
+bool
+heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
+{
+	/*
+	 * A NULL tupledesc is accepted for relations that are not expected to
+	 * carry missing values, such as catalog relations and indexes.
+	 */
+	Assert(!tupleDesc || attnum <= tupleDesc->natts);
+
+	/* attribute not physically stored in this tuple */
+	if (attnum > (int) HeapTupleHeaderGetNatts(tup->t_data))
+		return !(tupleDesc &&
+				 TupleDescAttr(tupleDesc, attnum - 1)->atthasmissing);
+
+	/* ordinary user attribute */
+	if (attnum > 0)
+		return !HeapTupleNoNulls(tup) &&
+			att_isnull(attnum - 1, tup->t_data->t_bits);
+
+	/* what remains must be one of the known system attributes */
+	if (attnum != TableOidAttributeNumber &&
+		attnum != SelfItemPointerAttributeNumber &&
+		attnum != MinTransactionIdAttributeNumber &&
+		attnum != MinCommandIdAttributeNumber &&
+		attnum != MaxTransactionIdAttributeNumber &&
+		attnum != MaxCommandIdAttributeNumber)
+		elog(ERROR, "invalid attnum: %d", attnum);
+
+	/* system attributes are never null */
+	return false;
+}
+
+/* ----------------
+ * nocachegetattr
+ *
+ * This only gets called from fastgetattr() macro, in cases where
+ * we can't use a cacheoffset and the value is not null.
+ *
+ * tuple is the heap tuple to read, attnum is the 1-based number of the
+ * attribute wanted (it is turned into a 0-based index on entry), and
+ * tupleDesc describes the tuple's columns. The result is whatever
+ * fetchatt() yields for that attribute at its computed offset within
+ * the tuple's data area.
+ *
+ * This caches attribute offsets in the attribute descriptor.
+ *
+ * An alternative way to speed things up would be to cache offsets
+ * with the tuple, but that seems more difficult unless you take
+ * the storage hit of actually putting those offsets into the
+ * tuple you send to disk. Yuck.
+ *
+ * This scheme will be slightly slower than that, but should
+ * perform well for queries which hit large #'s of tuples. After
+ * you cache the offsets once, examining all the other tuples using
+ * the same attribute descriptor will go much quicker. -cim 5/4/91
+ *
+ * NOTE: if you need to change this code, see also heap_deform_tuple.
+ * Also see nocache_index_getattr, which is the same code for index
+ * tuples.
+ * ----------------
+ */
+Datum
+nocachegetattr(HeapTuple tuple,
+ int attnum,
+ TupleDesc tupleDesc)
+{
+ HeapTupleHeader tup = tuple->t_data;
+ char *tp; /* ptr to data part of tuple */
+ bits8 *bp = tup->t_bits; /* ptr to null bitmap in tuple */
+ bool slow = false; /* do we have to walk attrs? */
+ int off; /* current offset within data */
+
+ /* ----------------
+ * Three cases:
+ *
+ * 1: No nulls and no variable-width attributes.
+ * 2: Has a null or a var-width AFTER att.
+ * 3: Has nulls or var-widths BEFORE att.
+ * ----------------
+ */
+
+ attnum--; /* from here on attnum is a 0-based attribute index */
+
+ if (!HeapTupleNoNulls(tuple))
+ {
+ /*
+ * there's a null somewhere in the tuple
+ *
+ * check to see if any preceding bits are null...
+ */
+ int byte = attnum >> 3;
+ int finalbit = attnum & 0x07;
+
+ /* check for nulls "before" final bit of last byte */
+ if ((~bp[byte]) & ((1 << finalbit) - 1))
+ slow = true;
+ else
+ {
+ /* check for nulls in any "earlier" bytes */
+ int i;
+
+ for (i = 0; i < byte; i++)
+ {
+ if (bp[i] != 0xFF)
+ {
+ slow = true;
+ break;
+ }
+ }
+ }
+ }
+
+ tp = (char *) tup + tup->t_hoff;
+
+ if (!slow)
+ {
+ Form_pg_attribute att;
+
+ /*
+ * If we get here, there are no nulls up to and including the target
+ * attribute. If we have a cached offset, we can use it.
+ */
+ att = TupleDescAttr(tupleDesc, attnum);
+ if (att->attcacheoff >= 0)
+ return fetchatt(att, tp + att->attcacheoff);
+
+ /*
+ * Otherwise, check for non-fixed-length attrs up to and including
+ * target. If there aren't any, it's safe to cheaply initialize the
+ * cached offsets for these attrs.
+ */
+ if (HeapTupleHasVarWidth(tuple))
+ {
+ int j;
+
+ for (j = 0; j <= attnum; j++)
+ {
+ if (TupleDescAttr(tupleDesc, j)->attlen <= 0)
+ {
+ slow = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!slow)
+ {
+ int natts = tupleDesc->natts;
+ int j = 1;
+
+ /*
+ * If we get here, we have a tuple with no nulls or var-widths up to
+ * and including the target attribute, so we can use the cached offset
+ * ... only we don't have it yet, or we'd not have got here. Since
+ * it's cheap to compute offsets for fixed-width columns, we take the
+ * opportunity to initialize the cached offsets for *all* the leading
+ * fixed-width columns, in hope of avoiding future visits to this
+ * routine.
+ */
+ TupleDescAttr(tupleDesc, 0)->attcacheoff = 0;
+
+ /* we might have set some offsets in the slow path previously */
+ while (j < natts && TupleDescAttr(tupleDesc, j)->attcacheoff > 0)
+ j++;
+
+ off = TupleDescAttr(tupleDesc, j - 1)->attcacheoff +
+ TupleDescAttr(tupleDesc, j - 1)->attlen;
+
+ for (; j < natts; j++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, j);
+
+ if (att->attlen <= 0)
+ break;
+
+ off = att_align_nominal(off, att->attalign);
+
+ att->attcacheoff = off;
+
+ off += att->attlen;
+ }
+
+ Assert(j > attnum);
+
+ off = TupleDescAttr(tupleDesc, attnum)->attcacheoff;
+ }
+ else
+ {
+ bool usecache = true;
+ int i;
+
+ /*
+ * Now we know that we have to walk the tuple CAREFULLY. But we still
+ * might be able to cache some offsets for next time.
+ *
+ * Note - This loop is a little tricky. For each non-null attribute,
+ * we have to first account for alignment padding before the attr,
+ * then advance over the attr based on its length. Nulls have no
+ * storage and no alignment padding either. We can use/set
+ * attcacheoff until we reach either a null or a var-width attribute.
+ */
+ off = 0;
+ for (i = 0;; i++) /* loop exit is at "break" */
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+
+ if (HeapTupleHasNulls(tuple) && att_isnull(i, bp))
+ {
+ usecache = false;
+ continue; /* this cannot be the target att */
+ }
+
+ /* If we know the next offset, we can skip the rest */
+ if (usecache && att->attcacheoff >= 0)
+ off = att->attcacheoff;
+ else if (att->attlen == -1)
+ {
+ /*
+ * We can only cache the offset for a varlena attribute if the
+ * offset is already suitably aligned, so that there would be
+ * no pad bytes in any case: then the offset will be valid for
+ * either an aligned or unaligned value.
+ */
+ if (usecache &&
+ off == att_align_nominal(off, att->attalign))
+ att->attcacheoff = off;
+ else
+ {
+ off = att_align_pointer(off, att->attalign, -1,
+ tp + off);
+ usecache = false;
+ }
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, att->attalign);
+
+ if (usecache)
+ att->attcacheoff = off;
+ }
+
+ if (i == attnum)
+ break; /* off now addresses the target attribute */
+
+ off = att_addlength_pointer(off, att->attlen, tp + off);
+
+ if (usecache && att->attlen <= 0)
+ usecache = false;
+ }
+ }
+
+ return fetchatt(TupleDescAttr(tupleDesc, attnum), tp + off);
+}
+
+/* ----------------
+ *		heap_getsysattr
+ *
+ *		Return the value of one system attribute of a tuple.
+ *
+ *		Support routine for the heap_getattr macro, which has already
+ *		established that attnum designates a system attribute.  *isnull is
+ *		always set to false.  An unrecognized attnum raises an error.
+ * ----------------
+ */
+Datum
+heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
+{
+	Assert(tup);
+
+	/* Currently, no sys attribute ever reads as NULL. */
+	*isnull = false;
+
+	switch (attnum)
+	{
+		case TableOidAttributeNumber:
+			return ObjectIdGetDatum(tup->t_tableOid);
+
+		case SelfItemPointerAttributeNumber:
+			/* pass-by-reference: hand back a pointer into the tuple struct */
+			return PointerGetDatum(&(tup->t_self));
+
+		case MinTransactionIdAttributeNumber:
+			return TransactionIdGetDatum(HeapTupleHeaderGetRawXmin(tup->t_data));
+
+		case MaxTransactionIdAttributeNumber:
+			return TransactionIdGetDatum(HeapTupleHeaderGetRawXmax(tup->t_data));
+
+		case MinCommandIdAttributeNumber:
+		case MaxCommandIdAttributeNumber:
+
+			/*
+			 * cmin and cmax both read the same header field, which may also
+			 * hold a combo command id.  XXX perhaps the "real" cmin or cmax
+			 * should be returned when we are inside the originating
+			 * transaction?
+			 */
+			return CommandIdGetDatum(HeapTupleHeaderGetRawCommandId(tup->t_data));
+
+		default:
+			elog(ERROR, "invalid attnum: %d", attnum);
+			break;
+	}
+
+	return 0;					/* keep compiler quiet */
+}
+
+/* ----------------
+ *		heap_copytuple
+ *
+ *		Make a palloc'd duplicate of a whole tuple.
+ *
+ *		The HeapTuple management struct and the tuple body (header plus
+ *		data) live in one palloc() chunk, body directly after the struct.
+ *		Returns NULL if the input is invalid or has no t_data.
+ * ----------------
+ */
+HeapTuple
+heap_copytuple(HeapTuple tuple)
+{
+	HeapTuple	copy;
+	char	   *body;
+
+	if (!HeapTupleIsValid(tuple) || tuple->t_data == NULL)
+		return NULL;
+
+	copy = (HeapTuple) palloc(HEAPTUPLESIZE + tuple->t_len);
+	body = (char *) copy + HEAPTUPLESIZE;
+
+	/* duplicate the tuple body, then wire up the management fields */
+	memcpy(body, (char *) tuple->t_data, tuple->t_len);
+
+	copy->t_data = (HeapTupleHeader) body;
+	copy->t_len = tuple->t_len;
+	copy->t_self = tuple->t_self;
+	copy->t_tableOid = tuple->t_tableOid;
+
+	return copy;
+}
+
+/* ----------------
+ *		heap_copytuple_with_tuple
+ *
+ *		Copy a tuple into a HeapTuple management struct owned by the caller.
+ *
+ *		Only the tuple body is palloc'd here, so unlike heap_copytuple() the
+ *		resulting "dest" is not a single palloc() block.  If src is invalid
+ *		or has no t_data, dest->t_data is set to NULL and nothing else in
+ *		dest is touched.
+ * ----------------
+ */
+void
+heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)
+{
+	bool		haveSource = HeapTupleIsValid(src) && src->t_data != NULL;
+
+	if (haveSource)
+	{
+		HeapTupleHeader body = (HeapTupleHeader) palloc(src->t_len);
+
+		memcpy((char *) body, (char *) src->t_data, src->t_len);
+
+		dest->t_len = src->t_len;
+		dest->t_self = src->t_self;
+		dest->t_tableOid = src->t_tableOid;
+		dest->t_data = body;
+	}
+	else
+		dest->t_data = NULL;
+}
+
+/*
+ * Expand a tuple which has fewer attributes than required. For each attribute
+ * not present in the sourceTuple, if there is a missing value that will be
+ * used. Otherwise the attribute will be set to NULL.
+ *
+ * The source tuple must have fewer attributes than the required number.
+ *
+ * Only one of targetHeapTuple and targetMinimalTuple may be supplied. The
+ * other argument must be NULL.
+ *
+ * targetHeapTuple / targetMinimalTuple: output; whichever is non-NULL
+ *		receives a newly palloc0'd tuple with tupleDesc->natts attributes
+ * sourceTuple: the short tuple; its data area (and null bitmap, if it has
+ *		one) is copied verbatim into the result
+ * tupleDesc: descriptor of the wanted row; tupleDesc->constr->missing, when
+ *		present, supplies the values for the added attributes
+ */
+static void
+expand_tuple(HeapTuple *targetHeapTuple,
+			 MinimalTuple *targetMinimalTuple,
+			 HeapTuple sourceTuple,
+			 TupleDesc tupleDesc)
+{
+	AttrMissing *attrmiss = NULL;
+	int			attnum;
+	int			firstmissingnum;
+	bool		hasNulls = HeapTupleHasNulls(sourceTuple);
+	HeapTupleHeader targetTHeader;
+	HeapTupleHeader sourceTHeader = sourceTuple->t_data;
+	int			sourceNatts = HeapTupleHeaderGetNatts(sourceTHeader);
+	int			natts = tupleDesc->natts;
+	int			sourceNullLen;
+	int			targetNullLen;
+	Size		sourceDataLen = sourceTuple->t_len - sourceTHeader->t_hoff;
+	Size		targetDataLen;
+	Size		len;
+	int			hoff;
+	bits8	   *nullBits = NULL;
+	int			bitMask = 0;
+	char	   *targetData;
+	uint16	   *infoMask;
+
+	Assert((targetHeapTuple && !targetMinimalTuple)
+		   || (!targetHeapTuple && targetMinimalTuple));
+
+	Assert(sourceNatts < natts);
+
+	sourceNullLen = (hasNulls ? BITMAPLEN(sourceNatts) : 0);
+
+	targetDataLen = sourceDataLen;
+
+	if (tupleDesc->constr &&
+		tupleDesc->constr->missing)
+	{
+		/*
+		 * If there are missing values we want to put them into the tuple.
+		 * Before that we have to compute the extra length for the values
+		 * array and the variable length data.
+		 */
+		attrmiss = tupleDesc->constr->missing;
+
+		/*
+		 * Find the first item in attrmiss for which we don't have a value in
+		 * the source. We can ignore all the missing entries before that.
+		 */
+		for (firstmissingnum = sourceNatts;
+			 firstmissingnum < natts;
+			 firstmissingnum++)
+		{
+			if (attrmiss[firstmissingnum].am_present)
+				break;
+			else
+				hasNulls = true;
+		}
+
+		/*
+		 * Now walk the missing attributes. If there is a missing value make
+		 * space for it. Otherwise, it's going to be NULL.
+		 */
+		for (attnum = firstmissingnum;
+			 attnum < natts;
+			 attnum++)
+		{
+			if (attrmiss[attnum].am_present)
+			{
+				Form_pg_attribute att = TupleDescAttr(tupleDesc, attnum);
+
+				targetDataLen = att_align_datum(targetDataLen,
+												att->attalign,
+												att->attlen,
+												attrmiss[attnum].am_value);
+
+				/*
+				 * am_value is a Datum, so use the Datum flavor of the
+				 * length macro, as is done for alignment just above,
+				 * rather than handing a Datum to the pointer flavor.
+				 */
+				targetDataLen = att_addlength_datum(targetDataLen,
+													att->attlen,
+													attrmiss[attnum].am_value);
+			}
+			else
+			{
+				/* no missing value, so it must be null */
+				hasNulls = true;
+			}
+		}
+	}							/* end if have missing values */
+	else
+	{
+		/*
+		 * If there are no missing values at all then NULLS must be allowed,
+		 * since some of the attributes are known to be absent.
+		 */
+		hasNulls = true;
+	}
+
+	len = 0;
+
+	if (hasNulls)
+	{
+		targetNullLen = BITMAPLEN(natts);
+		len += targetNullLen;
+	}
+	else
+		targetNullLen = 0;
+
+	/*
+	 * Allocate and zero the space needed.  Note that the tuple body and
+	 * HeapTupleData management structure are allocated in one chunk.
+	 */
+	if (targetHeapTuple)
+	{
+		len += offsetof(HeapTupleHeaderData, t_bits);
+		hoff = len = MAXALIGN(len); /* align user data safely */
+		len += targetDataLen;
+
+		*targetHeapTuple = (HeapTuple) palloc0(HEAPTUPLESIZE + len);
+		(*targetHeapTuple)->t_data
+			= targetTHeader
+			= (HeapTupleHeader) ((char *) *targetHeapTuple + HEAPTUPLESIZE);
+		(*targetHeapTuple)->t_len = len;
+		(*targetHeapTuple)->t_tableOid = sourceTuple->t_tableOid;
+		(*targetHeapTuple)->t_self = sourceTuple->t_self;
+
+		targetTHeader->t_infomask = sourceTHeader->t_infomask;
+		targetTHeader->t_hoff = hoff;
+		HeapTupleHeaderSetNatts(targetTHeader, natts);
+		HeapTupleHeaderSetDatumLength(targetTHeader, len);
+		HeapTupleHeaderSetTypeId(targetTHeader, tupleDesc->tdtypeid);
+		HeapTupleHeaderSetTypMod(targetTHeader, tupleDesc->tdtypmod);
+		/* We also make sure that t_ctid is invalid unless explicitly set */
+		ItemPointerSetInvalid(&(targetTHeader->t_ctid));
+		if (targetNullLen > 0)
+			nullBits = (bits8 *) ((char *) (*targetHeapTuple)->t_data
+								  + offsetof(HeapTupleHeaderData, t_bits));
+		targetData = (char *) (*targetHeapTuple)->t_data + hoff;
+		infoMask = &(targetTHeader->t_infomask);
+	}
+	else
+	{
+		len += SizeofMinimalTupleHeader;
+		hoff = len = MAXALIGN(len); /* align user data safely */
+		len += targetDataLen;
+
+		*targetMinimalTuple = (MinimalTuple) palloc0(len);
+		(*targetMinimalTuple)->t_len = len;
+		(*targetMinimalTuple)->t_hoff = hoff + MINIMAL_TUPLE_OFFSET;
+		(*targetMinimalTuple)->t_infomask = sourceTHeader->t_infomask;
+		/* Same macro works for MinimalTuples */
+		HeapTupleHeaderSetNatts(*targetMinimalTuple, natts);
+		if (targetNullLen > 0)
+			nullBits = (bits8 *) ((char *) *targetMinimalTuple
+								  + offsetof(MinimalTupleData, t_bits));
+		targetData = (char *) *targetMinimalTuple + hoff;
+		infoMask = &((*targetMinimalTuple)->t_infomask);
+	}
+
+	if (targetNullLen > 0)
+	{
+		if (sourceNullLen > 0)
+		{
+			/* if bitmap pre-existed copy in - all is set */
+			memcpy(nullBits,
+				   ((char *) sourceTHeader)
+				   + offsetof(HeapTupleHeaderData, t_bits),
+				   sourceNullLen);
+			nullBits += sourceNullLen - 1;
+		}
+		else
+		{
+			sourceNullLen = BITMAPLEN(sourceNatts);
+			/* Set NOT NULL for all existing attributes */
+			memset(nullBits, 0xff, sourceNullLen);
+
+			nullBits += sourceNullLen - 1;
+
+			if (sourceNatts & 0x07)
+			{
+				/* build the mask (inverted!) */
+				bitMask = 0xff << (sourceNatts & 0x07);
+				/* Voila */
+				*nullBits = ~bitMask;
+			}
+		}
+
+		/*
+		 * Leave nullBits/bitMask on the bit of the last source attribute;
+		 * fill_val steps to the next bit before it sets or skips one.
+		 */
+		bitMask = (1 << ((sourceNatts - 1) & 0x07));
+	}							/* End if have null bitmap */
+
+	memcpy(targetData,
+		   ((char *) sourceTuple->t_data) + sourceTHeader->t_hoff,
+		   sourceDataLen);
+
+	targetData += sourceDataLen;
+
+	/* Now fill in the missing values */
+	for (attnum = sourceNatts; attnum < natts; attnum++)
+	{
+
+		Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum);
+
+		if (attrmiss && attrmiss[attnum].am_present)
+		{
+			fill_val(attr,
+					 nullBits ? &nullBits : NULL,
+					 &bitMask,
+					 &targetData,
+					 infoMask,
+					 attrmiss[attnum].am_value,
+					 false);
+		}
+		else
+		{
+			fill_val(attr,
+					 &nullBits,
+					 &bitMask,
+					 &targetData,
+					 infoMask,
+					 (Datum) 0,
+					 true);
+		}
+	}							/* end loop over missing attributes */
+}
+
+/*
+ * minimal_expand_tuple
+ *		Build a MinimalTuple from sourceTuple, with the attributes it lacks
+ *		relative to tupleDesc filled in by expand_tuple.
+ */
+MinimalTuple
+minimal_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc)
+{
+	MinimalTuple expanded = NULL;
+
+	/* pass no HeapTuple target, so the MinimalTuple form is produced */
+	expand_tuple(NULL, &expanded, sourceTuple, tupleDesc);
+
+	return expanded;
+}
+
+/*
+ * heap_expand_tuple
+ *		Build an ordinary HeapTuple from sourceTuple, with the attributes it
+ *		lacks relative to tupleDesc filled in by expand_tuple.
+ */
+HeapTuple
+heap_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc)
+{
+	HeapTuple	expanded = NULL;
+
+	/* pass no MinimalTuple target, so the HeapTuple form is produced */
+	expand_tuple(&expanded, NULL, sourceTuple, tupleDesc);
+
+	return expanded;
+}
+
+/* ----------------
+ *		heap_copy_tuple_as_datum
+ *
+ *		Produce a composite-type Datum holding a copy of the given tuple.
+ *
+ *		Tuples without external TOAST pointers are simply copied into
+ *		palloc'd memory and stamped with the composite-Datum header fields.
+ *		Tuples that do contain such pointers are handed to
+ *		toast_flatten_tuple_to_datum instead.
+ * ----------------
+ */
+Datum
+heap_copy_tuple_as_datum(HeapTuple tuple, TupleDesc tupleDesc)
+{
+	if (!HeapTupleHasExternal(tuple))
+	{
+		/*
+		 * Easy case: duplicate the tuple body and fill in the
+		 * composite-Datum header fields, which may not be set if the tuple
+		 * came from disk rather than from heap_form_tuple.
+		 */
+		HeapTupleHeader copy = (HeapTupleHeader) palloc(tuple->t_len);
+
+		memcpy((char *) copy, (char *) tuple->t_data, tuple->t_len);
+
+		HeapTupleHeaderSetDatumLength(copy, tuple->t_len);
+		HeapTupleHeaderSetTypeId(copy, tupleDesc->tdtypeid);
+		HeapTupleHeaderSetTypMod(copy, tupleDesc->tdtypmod);
+
+		return PointerGetDatum(copy);
+	}
+
+	/*
+	 * External TOAST pointers have to be inlined to meet the conventions for
+	 * composite-type Datums.
+	 */
+	return toast_flatten_tuple_to_datum(tuple->t_data,
+										tuple->t_len,
+										tupleDesc);
+}
+
+/*
+ * heap_form_tuple
+ *		Build a tuple out of values[] and isnull[], both of which must hold
+ *		tupleDescriptor->natts entries.
+ *
+ * Raises an error if the descriptor has more than MaxTupleAttributeNumber
+ * columns.  A null bitmap is included only when at least one column is
+ * null.
+ *
+ * The result is allocated in the current memory context.
+ */
+HeapTuple
+heap_form_tuple(TupleDesc tupleDescriptor,
+				Datum *values,
+				bool *isnull)
+{
+	int			natts = tupleDescriptor->natts;
+	bool		anyNull = false;
+	HeapTuple	result;
+	HeapTupleHeader header;
+	Size		totalLen;
+	Size		dataLen;
+	int			headerLen;
+	int			attidx;
+
+	if (natts > MaxTupleAttributeNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_TOO_MANY_COLUMNS),
+				 errmsg("number of columns (%d) exceeds limit (%d)",
+						natts, MaxTupleAttributeNumber)));
+
+	/* Scan for the first null, if any */
+	for (attidx = 0; attidx < natts && !anyNull; attidx++)
+	{
+		if (isnull[attidx])
+			anyNull = true;
+	}
+
+	/* Header size: fixed part, optional null bitmap, then alignment */
+	totalLen = offsetof(HeapTupleHeaderData, t_bits);
+	if (anyNull)
+		totalLen += BITMAPLEN(natts);
+	totalLen = MAXALIGN(totalLen);	/* align user data safely */
+	headerLen = totalLen;
+
+	/* Add the size of the data area */
+	dataLen = heap_compute_data_size(tupleDescriptor, values, isnull);
+	totalLen += dataLen;
+
+	/*
+	 * One zeroed chunk holds both the HeapTupleData management struct and
+	 * the tuple body that follows it.
+	 */
+	result = (HeapTuple) palloc0(HEAPTUPLESIZE + totalLen);
+	header = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE);
+	result->t_data = header;
+
+	/* Management-struct fields */
+	result->t_len = totalLen;
+	ItemPointerSetInvalid(&(result->t_self));
+	result->t_tableOid = InvalidOid;
+
+	/*
+	 * The Datum header fields are filled even though this tuple may never be
+	 * used as a Datum, so that HeapTupleHeaderGetDatum can identify the
+	 * tuple type if needed.
+	 */
+	HeapTupleHeaderSetDatumLength(header, totalLen);
+	HeapTupleHeaderSetTypeId(header, tupleDescriptor->tdtypeid);
+	HeapTupleHeaderSetTypMod(header, tupleDescriptor->tdtypmod);
+	/* t_ctid stays invalid unless explicitly set later */
+	ItemPointerSetInvalid(&(header->t_ctid));
+
+	HeapTupleHeaderSetNatts(header, natts);
+	header->t_hoff = headerLen;
+
+	heap_fill_tuple(tupleDescriptor,
+					values,
+					isnull,
+					(char *) header + headerLen,
+					dataLen,
+					&header->t_infomask,
+					(anyNull ? header->t_bits : NULL));
+
+	return result;
+}
+
+/*
+ * heap_modify_tuple
+ *		Build a new tuple from an existing one plus replacement values.
+ *
+ * replValues, replIsnull and doReplace must each hold tupleDesc->natts
+ * entries.  Columns whose doReplace entry is true take their value and
+ * null flag from replValues/replIsnull; all other columns keep the old
+ * tuple's data.  The new tuple inherits the old tuple's t_ctid, t_self and
+ * t_tableOid.
+ *
+ * The result is allocated in the current memory context.
+ */
+HeapTuple
+heap_modify_tuple(HeapTuple tuple,
+				  TupleDesc tupleDesc,
+				  Datum *replValues,
+				  bool *replIsnull,
+				  bool *doReplace)
+{
+	int			natts = tupleDesc->natts;
+	Datum	   *newValues = (Datum *) palloc(natts * sizeof(Datum));
+	bool	   *newIsnull = (bool *) palloc(natts * sizeof(bool));
+	HeapTuple	result;
+	int			col;
+
+	/*
+	 * Start from the old tuple's contents.  Deforming the whole tuple costs
+	 * O(N), whereas fetching only the non-replaced columns one at a time
+	 * with heap_getattr could cost O(N^2) when many columns are kept, so the
+	 * linear approach is preferred.
+	 */
+	heap_deform_tuple(tuple, tupleDesc, newValues, newIsnull);
+
+	/* Overlay the requested replacements */
+	for (col = 0; col < natts; col++)
+	{
+		if (!doReplace[col])
+			continue;
+
+		newValues[col] = replValues[col];
+		newIsnull[col] = replIsnull[col];
+	}
+
+	result = heap_form_tuple(tupleDesc, newValues, newIsnull);
+
+	pfree(newValues);
+	pfree(newIsnull);
+
+	/* carry over the old tuple's identification info */
+	result->t_data->t_ctid = tuple->t_data->t_ctid;
+	result->t_self = tuple->t_self;
+	result->t_tableOid = tuple->t_tableOid;
+
+	return result;
+}
+
+/*
+ * heap_modify_tuple_by_cols
+ *		Build a new tuple from an existing one plus replacement values.
+ *
+ * Works like heap_modify_tuple, but the columns to replace are named by an
+ * array of 1-based column numbers rather than a boolean map, which is often
+ * handier when a fixed set of columns is replaced.  replCols, replValues and
+ * replIsnull must each hold nCols entries.  A column number outside
+ * 1..tupleDesc->natts raises an error.  The new tuple inherits the old
+ * tuple's t_ctid, t_self and t_tableOid.
+ *
+ * The result is allocated in the current memory context.
+ */
+HeapTuple
+heap_modify_tuple_by_cols(HeapTuple tuple,
+						  TupleDesc tupleDesc,
+						  int nCols,
+						  int *replCols,
+						  Datum *replValues,
+						  bool *replIsnull)
+{
+	int			natts = tupleDesc->natts;
+	Datum	   *newValues = (Datum *) palloc(natts * sizeof(Datum));
+	bool	   *newIsnull = (bool *) palloc(natts * sizeof(bool));
+	HeapTuple	result;
+	int			k;
+
+	/* Start from the old tuple's contents */
+	heap_deform_tuple(tuple, tupleDesc, newValues, newIsnull);
+
+	/* Overlay each requested replacement */
+	for (k = 0; k < nCols; k++)
+	{
+		int			target = replCols[k];
+		int			slot;
+
+		if (target <= 0 || target > natts)
+			elog(ERROR, "invalid column number %d", target);
+
+		slot = target - 1;
+		newValues[slot] = replValues[k];
+		newIsnull[slot] = replIsnull[k];
+	}
+
+	result = heap_form_tuple(tupleDesc, newValues, newIsnull);
+
+	pfree(newValues);
+	pfree(newIsnull);
+
+	/* carry over the old tuple's identification info */
+	result->t_data->t_ctid = tuple->t_data->t_ctid;
+	result->t_self = tuple->t_self;
+	result->t_tableOid = tuple->t_tableOid;
+
+	return result;
+}
+
+/*
+ * heap_deform_tuple
+ * Given a tuple, extract data into values/isnull arrays; this is
+ * the inverse of heap_form_tuple.
+ *
+ * tuple is the heap tuple to take apart and tupleDesc describes its
+ * attributes; values[] and isnull[] receive one entry per attribute.
+ *
+ * Storage for the values/isnull arrays is provided by the caller;
+ * it should be sized according to tupleDesc->natts not
+ * HeapTupleHeaderGetNatts(tuple->t_data).
+ *
+ * Note that for pass-by-reference datatypes, the pointer placed
+ * in the Datum will point into the given tuple.
+ *
+ * As a side effect, offsets computed here may be stored into the
+ * attcacheoff fields of tupleDesc's attributes.
+ *
+ * When all or most of a tuple's fields need to be extracted,
+ * this routine will be significantly quicker than a loop around
+ * heap_getattr; the loop will become O(N^2) as soon as any
+ * noncacheable attribute offsets are involved.
+ */
+void
+heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,
+ Datum *values, bool *isnull)
+{
+ HeapTupleHeader tup = tuple->t_data;
+ bool hasnulls = HeapTupleHasNulls(tuple);
+ int tdesc_natts = tupleDesc->natts;
+ int natts; /* number of atts to extract */
+ int attnum;
+ char *tp; /* ptr to tuple data */
+ uint32 off; /* offset in tuple data */
+ bits8 *bp = tup->t_bits; /* ptr to null bitmap in tuple */
+ bool slow = false; /* true once attcacheoff can't be used/set */
+
+ natts = HeapTupleHeaderGetNatts(tup);
+
+ /*
+ * In inheritance situations, it is possible that the given tuple actually
+ * has more fields than the caller is expecting. Don't run off the end of
+ * the caller's arrays.
+ */
+ natts = Min(natts, tdesc_natts);
+
+ /* user data starts t_hoff bytes past the start of the tuple header */
+ tp = (char *) tup + tup->t_hoff;
+
+ off = 0;
+
+ for (attnum = 0; attnum < natts; attnum++)
+ {
+ Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum);
+
+ /* a null is reported without touching the data area or "off" */
+ if (hasnulls && att_isnull(attnum, bp))
+ {
+ values[attnum] = (Datum) 0;
+ isnull[attnum] = true;
+ slow = true; /* can't use attcacheoff anymore */
+ continue;
+ }
+
+ isnull[attnum] = false;
+
+ /* use the cached offset if still trustworthy, else compute it */
+ if (!slow && thisatt->attcacheoff >= 0)
+ off = thisatt->attcacheoff;
+ else if (thisatt->attlen == -1)
+ {
+ /*
+ * We can only cache the offset for a varlena attribute if the
+ * offset is already suitably aligned, so that there would be no
+ * pad bytes in any case: then the offset will be valid for either
+ * an aligned or unaligned value.
+ */
+ if (!slow &&
+ off == att_align_nominal(off, thisatt->attalign))
+ thisatt->attcacheoff = off;
+ else
+ {
+ off = att_align_pointer(off, thisatt->attalign, -1,
+ tp + off);
+ slow = true;
+ }
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, thisatt->attalign);
+
+ if (!slow)
+ thisatt->attcacheoff = off;
+ }
+
+ values[attnum] = fetchatt(thisatt, tp + off);
+
+ /* step past this attribute's data to where the next one may begin */
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+ /* after a non-fixed-length attribute, later offsets vary per tuple */
+ if (thisatt->attlen <= 0)
+ slow = true; /* can't use attcacheoff anymore */
+ }
+
+ /*
+ * If tuple doesn't have all the atts indicated by tupleDesc, read the
+ * rest as nulls or missing values as appropriate.
+ */
+ for (; attnum < tdesc_natts; attnum++)
+ values[attnum] = getmissingattr(tupleDesc, attnum + 1, &isnull[attnum]);
+}
+
+/*
+ * heap_freetuple
+ * Release a HeapTuple by passing the given pointer to pfree().
+ *
+ * Only that one pointer is freed here; nothing the tuple refers to is
+ * released separately.
+ */
+void
+heap_freetuple(HeapTuple htup)
+{
+ pfree(htup);
+}
+
+
+/*
+ * heap_form_minimal_tuple
+ *		build a MinimalTuple out of values[] and isnull[], both of which
+ *		hold tupleDescriptor->natts entries
+ *
+ * Same job as heap_form_tuple(), but the product is a "minimal" tuple:
+ * it has no HeapTupleData header and no room for system columns.
+ *
+ * The result is allocated in the current memory context.
+ */
+MinimalTuple
+heap_form_minimal_tuple(TupleDesc tupleDescriptor,
+                        Datum *values,
+                        bool *isnull)
+{
+    int         natts = tupleDescriptor->natts;
+    bool        anyNull = false;
+    Size        hdrLen;
+    Size        dataLen;
+    Size        totalLen;
+    int         hoff;
+    MinimalTuple mtup;
+    int         attno;
+
+    if (natts > MaxTupleAttributeNumber)
+        ereport(ERROR,
+                (errcode(ERRCODE_TOO_MANY_COLUMNS),
+                 errmsg("number of columns (%d) exceeds limit (%d)",
+                        natts, MaxTupleAttributeNumber)));
+
+    /* a null bitmap is needed only if at least one column is null */
+    for (attno = 0; attno < natts && !anyNull; attno++)
+    {
+        if (isnull[attno])
+            anyNull = true;
+    }
+
+    /* header: fixed part, optional null bitmap, then alignment padding */
+    hdrLen = SizeofMinimalTupleHeader;
+    if (anyNull)
+        hdrLen += BITMAPLEN(natts);
+    hdrLen = MAXALIGN(hdrLen);  /* align user data safely */
+    hoff = hdrLen;
+
+    /* the data area follows the header */
+    dataLen = heap_compute_data_size(tupleDescriptor, values, isnull);
+    totalLen = hdrLen + dataLen;
+
+    /* get zero-filled storage for the whole tuple */
+    mtup = (MinimalTuple) palloc0(totalLen);
+
+    /* fill in the header fields */
+    mtup->t_len = totalLen;
+    HeapTupleHeaderSetNatts(mtup, natts);
+    mtup->t_hoff = hoff + MINIMAL_TUPLE_OFFSET;
+
+    /* and let heap_fill_tuple write the data, infomask and null bitmap */
+    heap_fill_tuple(tupleDescriptor,
+                    values,
+                    isnull,
+                    (char *) mtup + hoff,
+                    dataLen,
+                    &mtup->t_infomask,
+                    (anyNull ? mtup->t_bits : NULL));
+
+    return mtup;
+}
+
+/*
+ * heap_free_minimal_tuple
+ * Release a MinimalTuple by passing the given pointer to pfree().
+ */
+void
+heap_free_minimal_tuple(MinimalTuple mtup)
+{
+ pfree(mtup);
+}
+
+/*
+ * heap_copy_minimal_tuple
+ *		make a byte-for-byte duplicate of a MinimalTuple
+ *
+ * The number of bytes copied is taken from the source's t_len field.
+ * The result is allocated in the current memory context.
+ */
+MinimalTuple
+heap_copy_minimal_tuple(MinimalTuple mtup)
+{
+    MinimalTuple copy = (MinimalTuple) palloc(mtup->t_len);
+
+    memcpy(copy, mtup, mtup->t_len);
+
+    return copy;
+}
+
+/*
+ * heap_tuple_from_minimal_tuple
+ *		build a HeapTuple holding a copy of a MinimalTuple's contents;
+ *		the system columns are zero-filled
+ *
+ * The result is allocated in the current memory context.
+ * One palloc() block holds the HeapTuple struct, the tuple header and
+ * the tuple data.
+ */
+HeapTuple
+heap_tuple_from_minimal_tuple(MinimalTuple mtup)
+{
+    uint32      fullLen = mtup->t_len + MINIMAL_TUPLE_OFFSET;
+    HeapTuple   htup = (HeapTuple) palloc(HEAPTUPLESIZE + fullLen);
+    HeapTupleHeader hdr = (HeapTupleHeader) ((char *) htup + HEAPTUPLESIZE);
+
+    /* the management struct sits in front of the tuple proper */
+    htup->t_data = hdr;
+    htup->t_len = fullLen;
+    htup->t_tableOid = InvalidOid;
+    ItemPointerSetInvalid(&(htup->t_self));
+
+    /*
+     * Copy the minimal tuple in at its offset within the full header, and
+     * only afterwards zero everything ahead of t_infomask2.  The order is
+     * kept as-is because the zeroing is what clears the leading fields.
+     */
+    memcpy((char *) hdr + MINIMAL_TUPLE_OFFSET, mtup, mtup->t_len);
+    memset(hdr, 0, offsetof(HeapTupleHeaderData, t_infomask2));
+
+    return htup;
+}
+
+/*
+ * minimal_tuple_from_heap_tuple
+ *		build a MinimalTuple holding a copy of a HeapTuple's contents
+ *
+ * The first MINIMAL_TUPLE_OFFSET bytes of the heap tuple are left out of
+ * the copy, and the new t_len reflects the shortened length.
+ *
+ * The result is allocated in the current memory context.
+ */
+MinimalTuple
+minimal_tuple_from_heap_tuple(HeapTuple htup)
+{
+    char       *src;
+    uint32      shortLen;
+    MinimalTuple mtup;
+
+    Assert(htup->t_len > MINIMAL_TUPLE_OFFSET);
+
+    src = (char *) htup->t_data + MINIMAL_TUPLE_OFFSET;
+    shortLen = htup->t_len - MINIMAL_TUPLE_OFFSET;
+
+    mtup = (MinimalTuple) palloc(shortLen);
+    memcpy(mtup, src, shortLen);
+
+    /* set the length field of the copy to the minimal tuple's own size */
+    mtup->t_len = shortLen;
+
+    return mtup;
+}
+
+/*
+ * varsize_any
+ * Function wrapper that returns the result of the VARSIZE_ANY() macro
+ * applied to p, as a size_t.
+ *
+ * This mainly exists so JIT can inline the definition, but it's also
+ * sometimes useful in debugging sessions.
+ */
+size_t
+varsize_any(void *p)
+{
+ return VARSIZE_ANY(p);
+}
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
new file mode 100644
index 0000000..8df882d
--- /dev/null
+++ b/src/backend/access/common/indextuple.c
@@ -0,0 +1,589 @@
+/*-------------------------------------------------------------------------
+ *
+ * indextuple.c
+ * This file contains index tuple accessor and mutator routines,
+ * as well as various tuple utilities.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/indextuple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/heaptoast.h"
+#include "access/htup_details.h"
+#include "access/itup.h"
+#include "access/toast_internals.h"
+
+/*
+ * This enables de-toasting of index entries. Needed until VACUUM is
+ * smart enough to rebuild indexes from scratch.
+ *
+ * When defined, index_form_tuple() fetches any externally stored varlena
+ * value and tries to compress oversized values in-line before it builds
+ * the index tuple.
+ */
+#define TOAST_INDEX_HACK
+
+/* ----------------------------------------------------------------
+ * index_ tuple interface routines
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------
+ * index_form_tuple
+ *
+ * Build a palloc'd IndexTuple from the values[] and isnull[] arrays,
+ * which have tupleDescriptor->natts entries each.
+ *
+ * Raises an error if the descriptor has more than INDEX_MAX_KEYS
+ * columns, or if the finished tuple's size does not fit within
+ * INDEX_SIZE_MASK. Only the t_info field of the tuple header is
+ * assigned here; the tuple's storage otherwise starts out zeroed by
+ * palloc0().
+ *
+ * This shouldn't leak any memory; otherwise, callers such as
+ * tuplesort_putindextuplevalues() will be very unhappy.
+ *
+ * This shouldn't perform external table access provided caller
+ * does not pass values that are stored EXTERNAL.
+ * ----------------
+ */
+IndexTuple
+index_form_tuple(TupleDesc tupleDescriptor,
+ Datum *values,
+ bool *isnull)
+{
+ char *tp; /* tuple pointer */
+ IndexTuple tuple; /* return tuple */
+ Size size,
+ data_size,
+ hoff;
+ int i;
+ unsigned short infomask = 0;
+ bool hasnull = false;
+ uint16 tupmask = 0;
+ int numberOfAttributes = tupleDescriptor->natts;
+
+#ifdef TOAST_INDEX_HACK
+ /* working copies of the inputs, plus flags for which copies we own */
+ Datum untoasted_values[INDEX_MAX_KEYS];
+ bool untoasted_free[INDEX_MAX_KEYS];
+#endif
+
+ if (numberOfAttributes > INDEX_MAX_KEYS)
+ ereport(ERROR,
+ (errcode(ERRCODE_TOO_MANY_COLUMNS),
+ errmsg("number of index columns (%d) exceeds limit (%d)",
+ numberOfAttributes, INDEX_MAX_KEYS)));
+
+#ifdef TOAST_INDEX_HACK
+ for (i = 0; i < numberOfAttributes; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDescriptor, i);
+
+ /* by default, use the caller's datum as-is; nothing of ours to free */
+ untoasted_values[i] = values[i];
+ untoasted_free[i] = false;
+
+ /* Do nothing if value is NULL or not of varlena type */
+ if (isnull[i] || att->attlen != -1)
+ continue;
+
+ /*
+ * If value is stored EXTERNAL, must fetch it so we are not depending
+ * on outside storage. This should be improved someday.
+ */
+ if (VARATT_IS_EXTERNAL(DatumGetPointer(values[i])))
+ {
+ untoasted_values[i] =
+ PointerGetDatum(detoast_external_attr((struct varlena *)
+ DatumGetPointer(values[i])));
+ untoasted_free[i] = true;
+ }
+
+ /*
+ * If value is above size target, and is of a compressible datatype,
+ * try to compress it in-line.
+ */
+ if (!VARATT_IS_EXTENDED(DatumGetPointer(untoasted_values[i])) &&
+ VARSIZE(DatumGetPointer(untoasted_values[i])) > TOAST_INDEX_TARGET &&
+ (att->attstorage == TYPSTORAGE_EXTENDED ||
+ att->attstorage == TYPSTORAGE_MAIN))
+ {
+ Datum cvalue;
+
+ cvalue = toast_compress_datum(untoasted_values[i],
+ att->attcompression);
+
+ if (DatumGetPointer(cvalue) != NULL)
+ {
+ /* successful compression */
+ /* drop the fetched copy, if any, now that cvalue replaces it */
+ if (untoasted_free[i])
+ pfree(DatumGetPointer(untoasted_values[i]));
+ untoasted_values[i] = cvalue;
+ untoasted_free[i] = true;
+ }
+ }
+ }
+#endif
+
+ /* see whether any column is null; one is enough to set the flag */
+ for (i = 0; i < numberOfAttributes; i++)
+ {
+ if (isnull[i])
+ {
+ hasnull = true;
+ break;
+ }
+ }
+
+ if (hasnull)
+ infomask |= INDEX_NULL_MASK;
+
+ /* find where the data area starts, given the flags set so far */
+ hoff = IndexInfoFindDataOffset(infomask);
+#ifdef TOAST_INDEX_HACK
+ data_size = heap_compute_data_size(tupleDescriptor,
+ untoasted_values, isnull);
+#else
+ data_size = heap_compute_data_size(tupleDescriptor,
+ values, isnull);
+#endif
+ size = hoff + data_size;
+ size = MAXALIGN(size); /* be conservative */
+
+ tp = (char *) palloc0(size);
+ tuple = (IndexTuple) tp;
+
+ /*
+ * Write the data area. When there are nulls, the null bitmap is placed
+ * directly after the fixed-size IndexTupleData header.
+ */
+ heap_fill_tuple(tupleDescriptor,
+#ifdef TOAST_INDEX_HACK
+ untoasted_values,
+#else
+ values,
+#endif
+ isnull,
+ (char *) tp + hoff,
+ data_size,
+ &tupmask,
+ (hasnull ? (bits8 *) tp + sizeof(IndexTupleData) : NULL));
+
+#ifdef TOAST_INDEX_HACK
+ /* release the fetched/compressed copies made in the loop above */
+ for (i = 0; i < numberOfAttributes; i++)
+ {
+ if (untoasted_free[i])
+ pfree(DatumGetPointer(untoasted_values[i]));
+ }
+#endif
+
+ /*
+ * We do this because heap_fill_tuple wants to initialize a "tupmask"
+ * which is used for HeapTuples, but we want an indextuple infomask. The
+ * only relevant info is the "has variable attributes" field. We have
+ * already set the hasnull bit above.
+ */
+ if (tupmask & HEAP_HASVARWIDTH)
+ infomask |= INDEX_VAR_MASK;
+
+ /* Also assert we got rid of external attributes */
+#ifdef TOAST_INDEX_HACK
+ Assert((tupmask & HEAP_HASEXTERNAL) == 0);
+#endif
+
+ /*
+ * Here we make sure that the size will fit in the field reserved for it
+ * in t_info.
+ */
+ if ((size & INDEX_SIZE_MASK) != size)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("index row requires %zu bytes, maximum size is %zu",
+ size, (Size) INDEX_SIZE_MASK)));
+
+ /* size was just checked to fit within INDEX_SIZE_MASK */
+ infomask |= size;
+
+ /*
+ * initialize metadata
+ */
+ tuple->t_info = infomask;
+ return tuple;
+}
+
+/* ----------------
+ * nocache_index_getattr
+ *
+ * This gets called from index_getattr() macro, and only in cases
+ * where we can't use cacheoffset and the value is not null.
+ *
+ * tup is the index tuple to read, attnum is the one-based number of
+ * the wanted attribute, and tupleDesc describes the tuple's
+ * attributes. The attribute's value is returned as a Datum.
+ *
+ * This caches attribute offsets in the attribute descriptor.
+ *
+ * An alternative way to speed things up would be to cache offsets
+ * with the tuple, but that seems more difficult unless you take
+ * the storage hit of actually putting those offsets into the
+ * tuple you send to disk. Yuck.
+ *
+ * This scheme will be slightly slower than that, but should
+ * perform well for queries which hit large #'s of tuples. After
+ * you cache the offsets once, examining all the other tuples using
+ * the same attribute descriptor will go much quicker. -cim 5/4/91
+ * ----------------
+ */
+Datum
+nocache_index_getattr(IndexTuple tup,
+ int attnum,
+ TupleDesc tupleDesc)
+{
+ char *tp; /* ptr to data part of tuple */
+ bits8 *bp = NULL; /* ptr to null bitmap in tuple */
+ bool slow = false; /* do we have to walk attrs? */
+ int data_off; /* tuple data offset */
+ int off; /* current offset within data */
+
+ /* ----------------
+ * Three cases:
+ *
+ * 1: No nulls and no variable-width attributes.
+ * 2: Has a null or a var-width AFTER att.
+ * 3: Has nulls or var-widths BEFORE att.
+ * ----------------
+ */
+
+ data_off = IndexInfoFindDataOffset(tup->t_info);
+
+ /* from here on attnum is a zero-based attribute index */
+ attnum--;
+
+ if (IndexTupleHasNulls(tup))
+ {
+ /*
+ * there's a null somewhere in the tuple
+ *
+ * (the desired att itself is not null; this routine is only called
+ * for non-null values)
+ */
+
+ /* XXX "knows" t_bits are just after fixed tuple header! */
+ bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData));
+
+ /*
+ * Now check to see if any preceding bits are null...
+ *
+ * A clear bit in the bitmap marks a null, so a nonzero result from
+ * masking ~bp[byte] means some earlier attribute in that byte is null.
+ */
+ {
+ int byte = attnum >> 3;
+ int finalbit = attnum & 0x07;
+
+ /* check for nulls "before" final bit of last byte */
+ if ((~bp[byte]) & ((1 << finalbit) - 1))
+ slow = true;
+ else
+ {
+ /* check for nulls in any "earlier" bytes */
+ int i;
+
+ for (i = 0; i < byte; i++)
+ {
+ if (bp[i] != 0xFF)
+ {
+ slow = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ tp = (char *) tup + data_off;
+
+ if (!slow)
+ {
+ Form_pg_attribute att;
+
+ /*
+ * If we get here, there are no nulls up to and including the target
+ * attribute. If we have a cached offset, we can use it.
+ */
+ att = TupleDescAttr(tupleDesc, attnum);
+ if (att->attcacheoff >= 0)
+ return fetchatt(att, tp + att->attcacheoff);
+
+ /*
+ * Otherwise, check for non-fixed-length attrs up to and including
+ * target. If there aren't any, it's safe to cheaply initialize the
+ * cached offsets for these attrs.
+ */
+ if (IndexTupleHasVarwidths(tup))
+ {
+ int j;
+
+ for (j = 0; j <= attnum; j++)
+ {
+ if (TupleDescAttr(tupleDesc, j)->attlen <= 0)
+ {
+ slow = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!slow)
+ {
+ int natts = tupleDesc->natts;
+ int j = 1;
+
+ /*
+ * If we get here, we have a tuple with no nulls or var-widths up to
+ * and including the target attribute, so we can use the cached offset
+ * ... only we don't have it yet, or we'd not have got here. Since
+ * it's cheap to compute offsets for fixed-width columns, we take the
+ * opportunity to initialize the cached offsets for *all* the leading
+ * fixed-width columns, in hope of avoiding future visits to this
+ * routine.
+ */
+ TupleDescAttr(tupleDesc, 0)->attcacheoff = 0;
+
+ /* we might have set some offsets in the slow path previously */
+ while (j < natts && TupleDescAttr(tupleDesc, j)->attcacheoff > 0)
+ j++;
+
+ /* resume just past the last attribute whose offset is already cached */
+ off = TupleDescAttr(tupleDesc, j - 1)->attcacheoff +
+ TupleDescAttr(tupleDesc, j - 1)->attlen;
+
+ for (; j < natts; j++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, j);
+
+ /* stop at the first attribute that is not fixed-width */
+ if (att->attlen <= 0)
+ break;
+
+ off = att_align_nominal(off, att->attalign);
+
+ att->attcacheoff = off;
+
+ off += att->attlen;
+ }
+
+ Assert(j > attnum);
+
+ off = TupleDescAttr(tupleDesc, attnum)->attcacheoff;
+ }
+ else
+ {
+ bool usecache = true;
+ int i;
+
+ /*
+ * Now we know that we have to walk the tuple CAREFULLY. But we still
+ * might be able to cache some offsets for next time.
+ *
+ * Note - This loop is a little tricky. For each non-null attribute,
+ * we have to first account for alignment padding before the attr,
+ * then advance over the attr based on its length. Nulls have no
+ * storage and no alignment padding either. We can use/set
+ * attcacheoff until we reach either a null or a var-width attribute.
+ */
+ off = 0;
+ for (i = 0;; i++) /* loop exit is at "break" */
+ {
+ Form_pg_attribute att = TupleDescAttr(tupleDesc, i);
+
+ if (IndexTupleHasNulls(tup) && att_isnull(i, bp))
+ {
+ usecache = false;
+ continue; /* this cannot be the target att */
+ }
+
+ /* If we know the next offset, we can skip the rest */
+ if (usecache && att->attcacheoff >= 0)
+ off = att->attcacheoff;
+ else if (att->attlen == -1)
+ {
+ /*
+ * We can only cache the offset for a varlena attribute if the
+ * offset is already suitably aligned, so that there would be
+ * no pad bytes in any case: then the offset will be valid for
+ * either an aligned or unaligned value.
+ */
+ if (usecache &&
+ off == att_align_nominal(off, att->attalign))
+ att->attcacheoff = off;
+ else
+ {
+ off = att_align_pointer(off, att->attalign, -1,
+ tp + off);
+ usecache = false;
+ }
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, att->attalign);
+
+ if (usecache)
+ att->attcacheoff = off;
+ }
+
+ /* "off" now addresses attribute i; stop if it is the target */
+ if (i == attnum)
+ break;
+
+ off = att_addlength_pointer(off, att->attlen, tp + off);
+
+ if (usecache && att->attlen <= 0)
+ usecache = false;
+ }
+ }
+
+ return fetchatt(TupleDescAttr(tupleDesc, attnum), tp + off);
+}
+
+/*
+ * index_deform_tuple
+ *		break an index tuple apart into Datum/isnull arrays.
+ *
+ * The output arrays belong to the caller and must be big enough
+ * (INDEX_MAX_KEYS entries should be enough).
+ *
+ * This is the IndexTuple counterpart of heap_deform_tuple() and works
+ * nearly the same way, except that an index tuple should never have
+ * any missing columns.
+ */
+void
+index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
+                   Datum *values, bool *isnull)
+{
+    char       *base = (char *) tup;
+
+    /* XXX "knows" t_bits are just after fixed tuple header! */
+    bits8      *nullbits = (bits8 *) (base + sizeof(IndexTupleData));
+
+    /* start of the tuple's data area */
+    char       *data = base + IndexInfoFindDataOffset(tup->t_info);
+
+    index_deform_tuple_internal(tupleDescriptor, values, isnull,
+                                data, nullbits, IndexTupleHasNulls(tup));
+}
+
+/*
+ * Convert an index tuple into Datum/isnull arrays,
+ * without assuming any specific layout of the index tuple header.
+ *
+ * Caller must supply pointer to data area, pointer to nulls bitmap
+ * (which can be NULL if !hasnulls), and hasnulls flag.
+ *
+ * values[] and isnull[] receive tupleDescriptor->natts entries each.
+ * As a side effect, offsets computed here may be stored into the
+ * attcacheoff fields of tupleDescriptor's attributes.
+ */
+void
+index_deform_tuple_internal(TupleDesc tupleDescriptor,
+ Datum *values, bool *isnull,
+ char *tp, bits8 *bp, int hasnulls)
+{
+ int natts = tupleDescriptor->natts; /* number of atts to extract */
+ int attnum;
+ int off = 0; /* offset in tuple data */
+ bool slow = false; /* true once attcacheoff can't be used/set */
+
+ /* Assert to protect callers who allocate fixed-size arrays */
+ Assert(natts <= INDEX_MAX_KEYS);
+
+ for (attnum = 0; attnum < natts; attnum++)
+ {
+ Form_pg_attribute thisatt = TupleDescAttr(tupleDescriptor, attnum);
+
+ /* a null is reported without touching the data area or "off" */
+ if (hasnulls && att_isnull(attnum, bp))
+ {
+ values[attnum] = (Datum) 0;
+ isnull[attnum] = true;
+ slow = true; /* can't use attcacheoff anymore */
+ continue;
+ }
+
+ isnull[attnum] = false;
+
+ /* use the cached offset if still trustworthy, else compute it */
+ if (!slow && thisatt->attcacheoff >= 0)
+ off = thisatt->attcacheoff;
+ else if (thisatt->attlen == -1)
+ {
+ /*
+ * We can only cache the offset for a varlena attribute if the
+ * offset is already suitably aligned, so that there would be no
+ * pad bytes in any case: then the offset will be valid for either
+ * an aligned or unaligned value.
+ */
+ if (!slow &&
+ off == att_align_nominal(off, thisatt->attalign))
+ thisatt->attcacheoff = off;
+ else
+ {
+ off = att_align_pointer(off, thisatt->attalign, -1,
+ tp + off);
+ slow = true;
+ }
+ }
+ else
+ {
+ /* not varlena, so safe to use att_align_nominal */
+ off = att_align_nominal(off, thisatt->attalign);
+
+ if (!slow)
+ thisatt->attcacheoff = off;
+ }
+
+ values[attnum] = fetchatt(thisatt, tp + off);
+
+ /* step past this attribute's data to where the next one may begin */
+ off = att_addlength_pointer(off, thisatt->attlen, tp + off);
+
+ /* after a non-fixed-length attribute, later offsets vary per tuple */
+ if (thisatt->attlen <= 0)
+ slow = true; /* can't use attcacheoff anymore */
+ }
+}
+
+/*
+ * CopyIndexTuple
+ *		return a freshly palloc'd duplicate of an index tuple.
+ *
+ * The number of bytes copied is whatever IndexTupleSize() reports for
+ * the source tuple.
+ */
+IndexTuple
+CopyIndexTuple(IndexTuple source)
+{
+    Size        nbytes = IndexTupleSize(source);
+    IndexTuple  copy = (IndexTuple) palloc(nbytes);
+
+    memcpy(copy, source, nbytes);
+
+    return copy;
+}
+
+/*
+ * index_truncate_tuple
+ *		return a palloc'd copy of an index tuple that keeps only its
+ *		first leavenatts attributes.
+ *
+ * The truncated tuple is guaranteed to be no larger than the original.
+ * It may be used together with the original tuple descriptor, but the
+ * caller must never actually access one of the truncated attributes in
+ * the returned tuple!  In practice that means calling index_getattr()
+ * with special care, and only touching the truncated tuple from code
+ * under the caller's direct control.
+ *
+ * Calling this with a buffer lock held is safe, because no external
+ * table access is ever performed here.  That would need to be revisited
+ * if index tuples could ever contain EXTERNAL TOAST values.
+ */
+IndexTuple
+index_truncate_tuple(TupleDesc sourceDescriptor, IndexTuple source,
+                     int leavenatts)
+{
+    TupleDesc   shortDesc;
+    Datum       attValues[INDEX_MAX_KEYS];
+    bool        attNulls[INDEX_MAX_KEYS];
+    IndexTuple  result;
+
+    Assert(leavenatts <= sourceDescriptor->natts);
+
+    /* Nothing to cut off: a plain copy is all that is needed */
+    if (sourceDescriptor->natts == leavenatts)
+        return CopyIndexTuple(source);
+
+    /* Work on a private copy of the descriptor with a reduced natts */
+    shortDesc = palloc(TupleDescSize(sourceDescriptor));
+    TupleDescCopy(shortDesc, sourceDescriptor);
+    shortDesc->natts = leavenatts;
+
+    /* Take the tuple apart and rebuild it with the kept attributes only */
+    index_deform_tuple(source, shortDesc, attValues, attNulls);
+    result = index_form_tuple(shortDesc, attValues, attNulls);
+    result->t_tid = source->t_tid;
+    Assert(IndexTupleSize(result) <= IndexTupleSize(source));
+
+    /*
+     * TupleDescCopy() allocates no inner structure, so a single pfree()
+     * releases everything the temporary descriptor used; nothing leaks.
+     */
+    pfree(shortDesc);
+
+    return result;
+}
diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c
new file mode 100644
index 0000000..93c3c4f
--- /dev/null
+++ b/src/backend/access/common/printsimple.c
@@ -0,0 +1,132 @@
+/*-------------------------------------------------------------------------
+ *
+ * printsimple.c
+ * Routines to print out tuples containing only a limited range of
+ * builtin types without catalog access. This is intended for
+ * backends that don't have catalog access because they are not bound
+ * to a specific database, such as some walsender processes. It
+ * doesn't handle standalone backends or protocol versions other than
+ * 3.0, because we don't need such handling for current applications.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/printsimple.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/printsimple.h"
+#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
+#include "utils/builtins.h"
+
+/*
+ * printsimple_startup
+ *		send a RowDescription message describing tupdesc's columns.
+ *
+ * This runs at startup time.  Every column is sent with zero for the
+ * table OID, the attribute number and the format code.
+ */
+void
+printsimple_startup(DestReceiver *self, int operation, TupleDesc tupdesc)
+{
+    StringInfoData msg;
+    int         natts = tupdesc->natts;
+    int         colno = 0;
+
+    pq_beginmessage(&msg, 'T'); /* RowDescription */
+    pq_sendint16(&msg, natts);
+
+    while (colno < natts)
+    {
+        Form_pg_attribute col = TupleDescAttr(tupdesc, colno);
+
+        pq_sendstring(&msg, NameStr(col->attname));
+        pq_sendint32(&msg, 0);  /* table oid */
+        pq_sendint16(&msg, 0);  /* attnum */
+        pq_sendint32(&msg, (int) col->atttypid);
+        pq_sendint16(&msg, col->attlen);
+        pq_sendint32(&msg, col->atttypmod);
+        pq_sendint16(&msg, 0);  /* format code */
+
+        ++colno;
+    }
+
+    pq_endmessage(&msg);
+}
+
+/*
+ * printsimple
+ *		send one DataRow message for the tuple held in slot.
+ *
+ * A null column is sent as length -1.  Only text, int4 and int8 columns
+ * can be sent; any other type raises an error.  Always returns true.
+ */
+bool
+printsimple(TupleTableSlot *slot, DestReceiver *self)
+{
+    TupleDesc   desc = slot->tts_tupleDescriptor;
+    StringInfoData msg;
+    int         colno;
+
+    /* Make sure the tuple is fully deconstructed */
+    slot_getallattrs(slot);
+
+    /* Prepare and send message */
+    pq_beginmessage(&msg, 'D');
+    pq_sendint16(&msg, desc->natts);
+
+    for (colno = 0; colno < desc->natts; ++colno)
+    {
+        Form_pg_attribute col = TupleDescAttr(desc, colno);
+        Oid         typid = col->atttypid;
+        Datum       datum;
+
+        if (slot->tts_isnull[colno])
+        {
+            pq_sendint32(&msg, -1);
+            continue;
+        }
+
+        datum = slot->tts_values[colno];
+
+        /*
+         * The regular type output functions are off limits here, since we
+         * might not have catalog access.  The handful of types we need is
+         * therefore hard-wired.
+         */
+        if (typid == TEXTOID)
+        {
+            text       *txt = DatumGetTextPP(datum);
+
+            pq_sendcountedtext(&msg,
+                               VARDATA_ANY(txt),
+                               VARSIZE_ANY_EXHDR(txt),
+                               false);
+        }
+        else if (typid == INT4OID)
+        {
+            int32       ival = DatumGetInt32(datum);
+            char        digits[12]; /* sign, 10 digits and '\0' */
+            int         ndigits;
+
+            ndigits = pg_ltoa(ival, digits);
+            pq_sendcountedtext(&msg, digits, ndigits, false);
+        }
+        else if (typid == INT8OID)
+        {
+            int64       lval = DatumGetInt64(datum);
+            char        digits[MAXINT8LEN + 1];
+            int         ndigits;
+
+            ndigits = pg_lltoa(lval, digits);
+            pq_sendcountedtext(&msg, digits, ndigits, false);
+        }
+        else
+            elog(ERROR, "unsupported type OID: %u", col->atttypid);
+    }
+
+    pq_endmessage(&msg);
+
+    return true;
+}
diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c
new file mode 100644
index 0000000..54b539f
--- /dev/null
+++ b/src/backend/access/common/printtup.c
@@ -0,0 +1,485 @@
+/*-------------------------------------------------------------------------
+ *
+ * printtup.c
+ * Routines to print out tuples to the destination (both frontend
+ * clients and standalone backends are supported here).
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/printtup.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/printtup.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "tcop/pquery.h"
+#include "utils/lsyscache.h"
+#include "utils/memdebug.h"
+#include "utils/memutils.h"
+
+
+static void printtup_startup(DestReceiver *self, int operation,
+ TupleDesc typeinfo);
+static bool printtup(TupleTableSlot *slot, DestReceiver *self);
+static void printtup_shutdown(DestReceiver *self);
+static void printtup_destroy(DestReceiver *self);
+
+/* ----------------------------------------------------------------
+ * printtup / debugtup support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------
+ * Private state for a printtup destination object
+ *
+ * NOTE: finfo is the lookup info for either typoutput or typsend, whichever
+ * we are using for this column.
+ *
+ * Only the one of typoutput/typsend that matches "format" is looked up
+ * (format 0 = text output, format 1 = binary output). The array holding
+ * these structs is allocated zero-filled, so the other field stays zero.
+ * ----------------
+ */
+typedef struct
+{ /* Per-attribute information */
+ Oid typoutput; /* Oid for the type's text output fn */
+ Oid typsend; /* Oid for the type's binary output fn */
+ bool typisvarlena; /* is it varlena (ie possibly toastable)? */
+ int16 format; /* format code for this column */
+ FmgrInfo finfo; /* Precomputed call info for output fn */
+} PrinttupAttrInfo;
+
+typedef struct
+{ /* State of one printtup DestReceiver */
+ DestReceiver pub; /* publicly-known function pointers */
+ Portal portal; /* the Portal we are printing from */
+ bool sendDescrip; /* send RowDescription at startup? */
+ TupleDesc attrinfo; /* The attr info we are set up for */
+ int nattrs; /* number of attrs myinfo[] was built for */
+ PrinttupAttrInfo *myinfo; /* Cached info about each attr */
+ StringInfoData buf; /* output buffer (*not* in tmpcontext) */
+ MemoryContext tmpcontext; /* Memory context for per-row workspace */
+} DR_printtup;
+
+/* ----------------
+ *		printtup_create_DR
+ *
+ * Build a zero-initialized printtup DestReceiver for the given destination
+ * and hook up its callbacks.  The output buffer, the per-row memory context
+ * and the per-column lookup info are all created later.
+ * ----------------
+ */
+DestReceiver *
+printtup_create_DR(CommandDest dest)
+{
+	DR_printtup *receiver;
+
+	receiver = (DR_printtup *) palloc0(sizeof(DR_printtup));
+
+	/* No per-query state yet; the startup and per-row code fill it in */
+	receiver->tmpcontext = NULL;
+	receiver->buf.data = NULL;
+	receiver->myinfo = NULL;
+	receiver->nattrs = 0;
+	receiver->attrinfo = NULL;
+
+	/*
+	 * The T message goes out automatically for DestRemote only, never for
+	 * DestRemoteExecute
+	 */
+	if (dest == DestRemote)
+		receiver->sendDescrip = true;
+	else
+		receiver->sendDescrip = false;
+
+	receiver->pub.mydest = dest;
+	receiver->pub.rDestroy = printtup_destroy;
+	receiver->pub.rShutdown = printtup_shutdown;
+	receiver->pub.rStartup = printtup_startup;
+	receiver->pub.receiveSlot = printtup;	/* might get changed later */
+
+	/* "pub" is the first member, so this is the same address as receiver */
+	return &receiver->pub;
+}
+
+/*
+ * Attach the source Portal to a receiver whose destination is DestRemote
+ * or DestRemoteExecute
+ */
+void
+SetRemoteDestReceiverParams(DestReceiver *self, Portal portal)
+{
+	DR_printtup *receiver = (DR_printtup *) self;
+
+	/* Any other destination kind is a caller bug */
+	Assert(receiver->pub.mydest == DestRemoteExecute ||
+		   receiver->pub.mydest == DestRemote);
+
+	receiver->portal = portal;
+}
+
+/*
+ * Startup callback: create the shared output buffer and the per-row memory
+ * context, then send the RowDescription message if this receiver is
+ * configured to do so.  The "operation" argument is not used.
+ *
+ * The per-column lookup info is deliberately NOT built here; printtup()
+ * builds it on the first row.  That costs nothing when no rows are
+ * produced, and lets printtup() cope with the tuple type changing midway
+ * (although that probably can't happen in the current executor).
+ */
+static void
+printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+	DR_printtup *receiver = (DR_printtup *) self;
+	Portal		srcPortal = receiver->portal;
+
+	/*
+	 * One I/O buffer serves every message.  It must live outside tmpcontext
+	 * because it is re-used across rows.
+	 */
+	initStringInfo(&receiver->buf);
+
+	/*
+	 * Scratch context that is reset once per row, which guards against
+	 * leaks in datatype output routines and should beat retail pfree's.
+	 */
+	receiver->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
+												 "printtup",
+												 ALLOCSET_DEFAULT_SIZES);
+
+	/* Nothing more to do unless row descriptions were requested */
+	if (!receiver->sendDescrip)
+		return;
+
+	SendRowDescriptionMessage(&receiver->buf,
+							  typeinfo,
+							  FetchPortalTargetList(srcPortal),
+							  srcPortal->formats);
+}
+
+/*
+ * SendRowDescriptionMessage --- send a RowDescription message to the frontend
+ *
+ * Notes: the TupleDesc has typically been manufactured by ExecTypeFromTL()
+ * or some similar function; it does not contain a full set of fields.
+ * The targetlist will be NIL when executing a utility function that does
+ * not have a plan. If the targetlist isn't NIL then it is a Query node's
+ * targetlist; it is up to us to ignore resjunk columns in it. The formats[]
+ * array pointer might be NULL (if we are doing Describe on a prepared stmt);
+ * send zeroes for the format codes in that case.
+ *
+ * The message is assembled in the caller-supplied "buf": the column count,
+ * then for each column its name, originating table OID and column number
+ * (zeroes when no targetlist entry is available), type OID, type length,
+ * typmod and format code. Domain types are reported as their base type.
+ */
+void
+SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,
+ List *targetlist, int16 *formats)
+{
+ int natts = typeinfo->natts;
+ int i;
+ ListCell *tlist_item = list_head(targetlist);
+
+ /* tuple descriptor message type */
+ pq_beginmessage_reuse(buf, 'T');
+ /* # of attrs in tuples */
+ pq_sendint16(buf, natts);
+
+ /*
+ * Preallocate memory for the entire message to be sent. That allows us
+ * to use the significantly faster inline pqformat.h functions and to
+ * avoid reallocations.
+ *
+ * Have to overestimate the size of the column-names, to account for
+ * character set overhead.
+ */
+ enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */
+ + sizeof(Oid) /* resorigtbl */
+ + sizeof(AttrNumber) /* resorigcol */
+ + sizeof(Oid) /* atttypid */
+ + sizeof(int16) /* attlen */
+ + sizeof(int32) /* atttypmod */
+ + sizeof(int16) /* format */
+ ) * natts);
+
+ for (i = 0; i < natts; ++i)
+ {
+ Form_pg_attribute att = TupleDescAttr(typeinfo, i);
+ Oid atttypid = att->atttypid;
+ int32 atttypmod = att->atttypmod;
+ Oid resorigtbl;
+ AttrNumber resorigcol;
+ int16 format;
+
+ /*
+ * If column is a domain, send the base type and typmod instead.
+ * Lookup before sending any ints, for efficiency.
+ */
+ atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod);
+
+ /* Do we have a non-resjunk tlist item? */
+ while (tlist_item &&
+ ((TargetEntry *) lfirst(tlist_item))->resjunk)
+ tlist_item = lnext(targetlist, tlist_item);
+ if (tlist_item)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(tlist_item);
+
+ resorigtbl = tle->resorigtbl;
+ resorigcol = tle->resorigcol;
+ tlist_item = lnext(targetlist, tlist_item);
+ }
+ else
+ {
+ /* No info available, so send zeroes */
+ resorigtbl = 0;
+ resorigcol = 0;
+ }
+
+ if (formats)
+ format = formats[i];
+ else
+ format = 0;
+
+ /* These writes rely on the space reserved by enlargeStringInfo above */
+ pq_writestring(buf, NameStr(att->attname));
+ pq_writeint32(buf, resorigtbl);
+ pq_writeint16(buf, resorigcol);
+ pq_writeint32(buf, atttypid);
+ pq_writeint16(buf, att->attlen);
+ pq_writeint32(buf, atttypmod);
+ pq_writeint16(buf, format);
+ }
+
+ pq_endmessage_reuse(buf);
+}
+
+/*
+ * (Re)build the per-column lookup info used by printtup()
+ *
+ * Any previously cached array is discarded.  For each column the output
+ * function matching its format code (0 = text, 1 = binary; all zeroes when
+ * the portal has no formats array) is looked up and cached.  Any other
+ * format code is rejected with an error.
+ */
+static void
+printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs)
+{
+	int16	   *formats = myState->portal->formats;
+	int			col;
+
+	/* Drop whatever was cached before */
+	if (myState->myinfo != NULL)
+	{
+		pfree(myState->myinfo);
+		myState->myinfo = NULL;
+	}
+
+	myState->nattrs = numAttrs;
+	myState->attrinfo = typeinfo;
+
+	/* No columns means there is nothing to look up */
+	if (numAttrs <= 0)
+		return;
+
+	myState->myinfo = (PrinttupAttrInfo *)
+		palloc0(numAttrs * sizeof(PrinttupAttrInfo));
+
+	for (col = 0; col < numAttrs; col++)
+	{
+		Form_pg_attribute attr = TupleDescAttr(typeinfo, col);
+		PrinttupAttrInfo *info = &myState->myinfo[col];
+		int16		format;
+
+		if (formats != NULL)
+			format = formats[col];
+		else
+			format = 0;
+
+		info->format = format;
+
+		switch (format)
+		{
+			case 0:
+				/* text: cache the type's output function */
+				getTypeOutputInfo(attr->atttypid,
+								  &info->typoutput,
+								  &info->typisvarlena);
+				fmgr_info(info->typoutput, &info->finfo);
+				break;
+
+			case 1:
+				/* binary: cache the type's send function */
+				getTypeBinaryOutputInfo(attr->atttypid,
+										&info->typsend,
+										&info->typisvarlena);
+				fmgr_info(info->typsend, &info->finfo);
+				break;
+
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("unsupported format code: %d", format)));
+				break;
+		}
+	}
+}
+
+/* ----------------
+ * printtup --- send a tuple to the client
+ *
+ * Builds one DataRow message in the receiver's reusable buffer: the column
+ * count, then for each column either -1 (NULL) or the value produced by
+ * the cached text-output or binary-send function. The cached per-column
+ * info is (re)built first if the slot's tuple descriptor or column count
+ * differs from what was cached. Output functions run in the per-row
+ * memory context, which is reset before returning. Always returns true.
+ * ----------------
+ */
+static bool
+printtup(TupleTableSlot *slot, DestReceiver *self)
+{
+ TupleDesc typeinfo = slot->tts_tupleDescriptor;
+ DR_printtup *myState = (DR_printtup *) self;
+ MemoryContext oldcontext;
+ StringInfo buf = &myState->buf;
+ int natts = typeinfo->natts;
+ int i;
+
+ /* Set or update my derived attribute info, if needed */
+ if (myState->attrinfo != typeinfo || myState->nattrs != natts)
+ printtup_prepare_info(myState, typeinfo, natts);
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ /* Switch into per-row context so we can recover memory below */
+ oldcontext = MemoryContextSwitchTo(myState->tmpcontext);
+
+ /*
+ * Prepare a DataRow message (note the buffer is NOT in the per-row
+ * context; it was created by printtup_startup and is reused across rows)
+ */
+ pq_beginmessage_reuse(buf, 'D');
+
+ pq_sendint16(buf, natts);
+
+ /*
+ * send the attributes of this tuple
+ */
+ for (i = 0; i < natts; ++i)
+ {
+ PrinttupAttrInfo *thisState = myState->myinfo + i;
+ Datum attr = slot->tts_values[i];
+
+ if (slot->tts_isnull[i])
+ {
+ pq_sendint32(buf, -1); /* NULL: emit -1 and no value bytes */
+ continue;
+ }
+
+ /*
+ * Here we catch undefined bytes in datums that are returned to the
+ * client without hitting disk; see comments at the related check in
+ * PageAddItem(). This test is most useful for uncompressed,
+ * non-external datums, but we're quite likely to see such here when
+ * testing new C functions.
+ */
+ if (thisState->typisvarlena)
+ VALGRIND_CHECK_MEM_IS_DEFINED(DatumGetPointer(attr),
+ VARSIZE_ANY(attr));
+
+ if (thisState->format == 0)
+ {
+ /* Text output */
+ char *outputstr;
+
+ outputstr = OutputFunctionCall(&thisState->finfo, attr);
+ pq_sendcountedtext(buf, outputstr, strlen(outputstr), false);
+ }
+ else
+ {
+ /* Binary output */
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&thisState->finfo, attr);
+ pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
+ pq_sendbytes(buf, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+
+ pq_endmessage_reuse(buf);
+
+ /* Return to caller's context, and flush row's temporary memory */
+ MemoryContextSwitchTo(oldcontext);
+ MemoryContextReset(myState->tmpcontext);
+
+ return true;
+}
+
+/* ----------------
+ *		printtup_shutdown
+ *
+ * Release everything the receiver built while running: the cached
+ * per-column info, the output buffer and the per-row memory context.
+ * Each pointer is cleared afterwards.
+ * ----------------
+ */
+static void
+printtup_shutdown(DestReceiver *self)
+{
+	DR_printtup *receiver = (DR_printtup *) self;
+
+	if (receiver->myinfo != NULL)
+	{
+		pfree(receiver->myinfo);
+		receiver->myinfo = NULL;
+	}
+
+	receiver->attrinfo = NULL;
+
+	if (receiver->buf.data != NULL)
+	{
+		pfree(receiver->buf.data);
+		receiver->buf.data = NULL;
+	}
+
+	if (receiver->tmpcontext != NULL)
+	{
+		MemoryContextDelete(receiver->tmpcontext);
+		receiver->tmpcontext = NULL;
+	}
+}
+
+/* ----------------
+ * printtup_destroy
+ *
+ * Free the receiver struct itself. Nothing else is released here; the
+ * buffer, per-row context and cached column info are freed by
+ * printtup_shutdown.
+ * ----------------
+ */
+static void
+printtup_destroy(DestReceiver *self)
+{
+ pfree(self);
+}
+
+/* ----------------
+ *		printatt
+ *
+ * Print one line describing an attribute on stdout: its number and name,
+ * the value in double quotes when "value" is not NULL, then the type OID,
+ * length, typmod and by-value flag.
+ * ----------------
+ */
+static void
+printatt(unsigned attributeId,
+		 Form_pg_attribute attributeP,
+		 char *value)
+{
+	/* attributeId is unsigned, so it is printed with %u rather than %d */
+	printf("\t%2u: %s%s%s%s\t(typeid = %u, len = %d, typmod = %d, byval = %c)\n",
+		   attributeId,
+		   NameStr(attributeP->attname),
+		   value != NULL ? " = \"" : "",
+		   value != NULL ? value : "",
+		   value != NULL ? "\"" : "",
+		   (unsigned int) (attributeP->atttypid),
+		   attributeP->attlen,
+		   attributeP->atttypmod,
+		   attributeP->attbyval ? 't' : 'f');
+}
+
+/* ----------------
+ *		debugStartup - prepare to print tuples for an interactive backend
+ *
+ * Prints one description line per column of the result type (no values),
+ * followed by a separator line.  "self" and "operation" are not used.
+ * ----------------
+ */
+void
+debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+	int			ncols = typeinfo->natts;
+	int			attno;
+
+	/* show the return type of the tuples, numbering columns from 1 */
+	for (attno = 1; attno <= ncols; attno++)
+		printatt((unsigned) attno, TupleDescAttr(typeinfo, attno - 1), NULL);
+
+	printf("\t----\n");
+}
+
+/* ----------------
+ *		debugtup - print one tuple for an interactive backend
+ *
+ * Each non-null column is converted with its type's text output function
+ * and printed on its own line; null columns are skipped.  A separator line
+ * ends the tuple.  "self" is not used.  Always returns true.
+ * ----------------
+ */
+bool
+debugtup(TupleTableSlot *slot, DestReceiver *self)
+{
+	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
+	int			natts = typeinfo->natts;
+	int			attno;
+
+	for (attno = 1; attno <= natts; attno++)
+	{
+		Form_pg_attribute att = TupleDescAttr(typeinfo, attno - 1);
+		bool		isnull;
+		Datum		datum;
+
+		datum = slot_getattr(slot, attno, &isnull);
+
+		if (!isnull)
+		{
+			Oid			typoutput;
+			bool		typisvarlena;
+			char	   *value;
+
+			getTypeOutputInfo(att->atttypid, &typoutput, &typisvarlena);
+			value = OidOutputFunctionCall(typoutput, datum);
+
+			printatt((unsigned) attno, att, value);
+		}
+	}
+
+	printf("\t----\n");
+
+	return true;
+}
diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c
new file mode 100644
index 0000000..632d13c
--- /dev/null
+++ b/src/backend/access/common/relation.c
@@ -0,0 +1,217 @@
+/*-------------------------------------------------------------------------
+ *
+ * relation.c
+ * Generic relation related routines.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/relation.c
+ *
+ * NOTES
+ * This file contains relation_ routines that implement access to relations
+ * (tables, indexes, etc). Support that's specific to subtypes of relations
+ * should go into their respective files, not here.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relation.h"
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/lmgr.h"
+#include "utils/inval.h"
+#include "utils/syscache.h"
+
+
+/* ----------------
+ * relation_open - open any relation by relation OID
+ *
+ * If lockmode is not "NoLock", the specified kind of lock is
+ * obtained on the relation. (Generally, NoLock should only be
+ * used if the caller knows it has some appropriate lock on the
+ * relation already.)
+ *
+ * An error is raised if the relation does not exist.
+ *
+ * NB: a "relation" is anything with a pg_class entry. The caller is
+ * expected to check whether the relkind is something it can handle.
+ *
+ * Side effects: sets XACT_FLAGS_ACCESSEDTEMPNAMESPACE in MyXactFlags
+ * when the relation uses local buffers, and calls pgstat_initstats()
+ * on the opened relation.
+ * ----------------
+ */
+Relation
+relation_open(Oid relationId, LOCKMODE lockmode)
+{
+ Relation r;
+
+ Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
+
+ /* Get the lock before trying to open the relcache entry */
+ if (lockmode != NoLock)
+ LockRelationOid(relationId, lockmode);
+
+ /* The relcache does all the real work... */
+ r = RelationIdGetRelation(relationId);
+
+ if (!RelationIsValid(r))
+ elog(ERROR, "could not open relation with OID %u", relationId);
+
+ /*
+ * If we didn't get the lock ourselves, assert that caller holds one,
+ * except in bootstrap mode where no locks are used.
+ */
+ Assert(lockmode != NoLock ||
+ IsBootstrapProcessingMode() ||
+ CheckRelationLockedByMe(r, AccessShareLock, true));
+
+ /* Make note that we've accessed a temporary relation */
+ if (RelationUsesLocalBuffers(r))
+ MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
+
+ pgstat_initstats(r);
+
+ return r;
+}
+
+/* ----------------
+ * try_relation_open - open any relation by relation OID
+ *
+ * Same as relation_open, except return NULL instead of failing
+ * if the relation does not exist.
+ *
+ * Existence is probed through the RELOID syscache after the lock
+ * (if any) has been taken; when the relation is missing, that lock
+ * is released again before returning NULL. Unlike relation_open,
+ * the NoLock assertion here has no bootstrap-mode exemption.
+ * ----------------
+ */
+Relation
+try_relation_open(Oid relationId, LOCKMODE lockmode)
+{
+ Relation r;
+
+ Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
+
+ /* Get the lock first */
+ if (lockmode != NoLock)
+ LockRelationOid(relationId, lockmode);
+
+ /*
+ * Now that we have the lock, probe to see if the relation really exists
+ * or not.
+ */
+ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relationId)))
+ {
+ /* Release useless lock */
+ if (lockmode != NoLock)
+ UnlockRelationOid(relationId, lockmode);
+
+ return NULL;
+ }
+
+ /* Should be safe to do a relcache load */
+ r = RelationIdGetRelation(relationId);
+
+ if (!RelationIsValid(r))
+ elog(ERROR, "could not open relation with OID %u", relationId);
+
+ /* If we didn't get the lock ourselves, assert that caller holds one */
+ Assert(lockmode != NoLock ||
+ CheckRelationLockedByMe(r, AccessShareLock, true));
+
+ /* Make note that we've accessed a temporary relation */
+ if (RelationUsesLocalBuffers(r))
+ MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE;
+
+ pgstat_initstats(r);
+
+ return r;
+}
+
+/* ----------------
+ *		relation_openrv - open any relation specified by a RangeVar
+ *
+ *		Works like relation_open, except that the relation is named by a
+ *		RangeVar and is looked up (and locked) via the namespace search.
+ *		A relation that cannot be found raises an error.
+ * ----------------
+ */
+Relation
+relation_openrv(const RangeVar *relation, LOCKMODE lockmode)
+{
+	/*
+	 * Absorb pending shared-cache-inval messages before the lookup.  This
+	 * matters even when we already hold a lock on the relation, because
+	 * GRANT/REVOKE are executed without taking any lock on the target
+	 * relation, and we want to be sure we see current ACL information.  For
+	 * NoLock we skip it, on the assumption that such a call is not the
+	 * first one in the current command, and so we should be reasonably
+	 * up-to-date already.  (XXX this all could stand to be redesigned, but
+	 * for the moment we'll keep doing this like it's been done
+	 * historically.)
+	 */
+	if (lockmode != NoLock)
+		AcceptInvalidationMessages();
+
+	/*
+	 * The namespace lookup takes the requested lock itself, so
+	 * relation_open is asked for NoLock.
+	 */
+	return relation_open(RangeVarGetRelid(relation, lockmode, false),
+						 NoLock);
+}
+
+/* ----------------
+ *		relation_openrv_extended - open any relation specified by a RangeVar
+ *
+ *		Variant of relation_openrv taking a missing_ok flag: when it is
+ *		true, a relation that is not found yields NULL instead of an error.
+ *		(Other failures, such as permissions problems, still ereport.)
+ * ----------------
+ */
+Relation
+relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode,
+						 bool missing_ok)
+{
+	Oid			target;
+
+	/*
+	 * Absorb pending shared-cache-inval messages before the lookup; the
+	 * reasoning is given in relation_openrv().
+	 */
+	if (lockmode != NoLock)
+		AcceptInvalidationMessages();
+
+	/* Namespace search both resolves and locks the relation */
+	target = RangeVarGetRelid(relation, lockmode, missing_ok);
+
+	/* Not found: report that as NULL; otherwise relation_open finishes */
+	return OidIsValid(target) ? relation_open(target, NoLock) : NULL;
+}
+
+/* ----------------
+ * relation_close - close any relation
+ *
+ * If lockmode is not "NoLock", we then release the specified lock.
+ *
+ * Note that it is often sensible to hold a lock beyond relation_close;
+ * in that case, the lock is released automatically at xact end.
+ *
+ * The lock identifier is copied out of the relation before it is
+ * closed and the copy is used for the unlock -- presumably because
+ * the relcache entry must not be touched after RelationClose().
+ * ----------------
+ */
+void
+relation_close(Relation relation, LOCKMODE lockmode)
+{
+ LockRelId relid = relation->rd_lockInfo.lockRelId;
+
+ Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);
+
+ /* The relcache does the real work... */
+ RelationClose(relation);
+
+ if (lockmode != NoLock)
+ UnlockRelationId(&relid, lockmode);
+}
diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c
new file mode 100644
index 0000000..b5602f5
--- /dev/null
+++ b/src/backend/access/common/reloptions.c
@@ -0,0 +1,2131 @@
+/*-------------------------------------------------------------------------
+ *
+ * reloptions.c
+ * Core support for relation options (pg_class.reloptions)
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/reloptions.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+
+#include "access/gist_private.h"
+#include "access/hash.h"
+#include "access/heaptoast.h"
+#include "access/htup_details.h"
+#include "access/nbtree.h"
+#include "access/reloptions.h"
+#include "access/spgist_private.h"
+#include "catalog/pg_type.h"
+#include "commands/defrem.h"
+#include "commands/tablespace.h"
+#include "commands/view.h"
+#include "nodes/makefuncs.h"
+#include "postmaster/postmaster.h"
+#include "utils/array.h"
+#include "utils/attoptcache.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+/*
+ * Contents of pg_class.reloptions
+ *
+ * To add an option:
+ *
+ * (i) decide on a type (integer, real, bool, string), name, default value,
+ * upper and lower bounds (if applicable); for strings, consider a validation
+ * routine.
+ * (ii) add a record below (or use add_<type>_reloption).
+ * (iii) add it to the appropriate options struct (perhaps StdRdOptions)
+ * (iv) add it to the appropriate handling routine (perhaps
+ * default_reloptions)
+ * (v) make sure the lock level is set correctly for that operation
+ * (vi) don't forget to document the option
+ *
+ * The default choice for any new option should be AccessExclusiveLock.
+ * In some cases the lock level can be reduced from there, but the lock
+ * level chosen should always conflict with itself to ensure that multiple
+ * changes aren't lost when we attempt concurrent changes.
+ * The choice of lock level depends completely upon how that parameter
+ * is used within the server, not upon how and when you'd like to change it.
+ * Safety first. Existing choices are documented here, and elsewhere in
+ * backend code where the parameters are used.
+ *
+ * In general, anything that affects the results obtained from a SELECT must be
+ * protected by AccessExclusiveLock.
+ *
+ * Autovacuum related parameters can be set at ShareUpdateExclusiveLock
+ * since they are only used by the AV procs and don't change anything
+ * currently executing.
+ *
+ * Fillfactor can be set at ShareUpdateExclusiveLock because it applies only
+ * to subsequent changes made to data blocks, as documented in hio.c
+ *
+ * n_distinct options can be set at ShareUpdateExclusiveLock because they
+ * are only used during ANALYZE, which uses a ShareUpdateExclusiveLock,
+ * so the ANALYZE will not be affected by in-flight changes. Changing those
+ * values has no effect until the next ANALYZE, so no need for stronger lock.
+ *
+ * Planner-related parameters can be set with ShareUpdateExclusiveLock because
+ * they only affect planning and not the correctness of the execution. Plans
+ * cannot be changed in mid-flight, so changes here could not easily result in
+ * new improved plans in any case. So we allow existing queries to continue
+ * and existing plans to survive, a small price to pay for allowing better
+ * plans to be introduced concurrently without interfering with users.
+ *
+ * Setting parallel_workers at ShareUpdateExclusiveLock is safe, since it
+ * acts the same as max_parallel_workers_per_gather which is a USERSET
+ * parameter that doesn't affect existing plans or queries.
+ *
+ * vacuum_truncate can be set at ShareUpdateExclusiveLock because it
+ * is only used during VACUUM, which uses a ShareUpdateExclusiveLock,
+ * so the VACUUM will not be affected by in-flight changes. Changing its
+ * value has no effect until the next VACUUM, so no need for stronger lock.
+ */
+
+static relopt_bool boolRelOpts[] =
+{ /* each entry: {name, description, applicable kinds, lock level}, default value */
+ {
+ {
+ "autosummarize",
+ "Enables automatic summarization on this BRIN index",
+ RELOPT_KIND_BRIN,
+ AccessExclusiveLock
+ },
+ false
+ },
+ {
+ {
+ "autovacuum_enabled",
+ "Enables autovacuum in this relation",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ true
+ },
+ {
+ {
+ "user_catalog_table",
+ "Declare a table as an additional catalog table, e.g. for the purpose of logical replication",
+ RELOPT_KIND_HEAP,
+ AccessExclusiveLock
+ },
+ false
+ },
+ {
+ {
+ "fastupdate",
+ "Enables \"fast update\" feature for this GIN index",
+ RELOPT_KIND_GIN,
+ AccessExclusiveLock
+ },
+ true
+ },
+ {
+ {
+ "security_barrier",
+ "View acts as a row security barrier",
+ RELOPT_KIND_VIEW,
+ AccessExclusiveLock
+ },
+ false
+ },
+ {
+ {
+ "vacuum_truncate",
+ "Enables vacuum to truncate empty pages at the end of this table",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ true
+ },
+ {
+ {
+ "deduplicate_items",
+ "Enables \"deduplicate items\" feature for this btree index",
+ RELOPT_KIND_BTREE,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ true
+ },
+ /* list terminator */
+ {{NULL}}
+};
+
+static relopt_int intRelOpts[] =
+{ /* each entry: {name, description, applicable kinds, lock level}, default, min, max */
+ {
+ {
+ "fillfactor",
+ "Packs table pages only to this percentage",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ HEAP_DEFAULT_FILLFACTOR, HEAP_MIN_FILLFACTOR, 100
+ },
+ {
+ {
+ "fillfactor",
+ "Packs btree index pages only to this percentage",
+ RELOPT_KIND_BTREE,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100
+ },
+ {
+ {
+ "fillfactor",
+ "Packs hash index pages only to this percentage",
+ RELOPT_KIND_HASH,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ HASH_DEFAULT_FILLFACTOR, HASH_MIN_FILLFACTOR, 100
+ },
+ {
+ {
+ "fillfactor",
+ "Packs gist index pages only to this percentage",
+ RELOPT_KIND_GIST,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ GIST_DEFAULT_FILLFACTOR, GIST_MIN_FILLFACTOR, 100
+ },
+ {
+ {
+ "fillfactor",
+ "Packs spgist index pages only to this percentage",
+ RELOPT_KIND_SPGIST,
+ ShareUpdateExclusiveLock /* since it applies only to later
+ * inserts */
+ },
+ SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100
+ },
+ {
+ {
+ "autovacuum_vacuum_threshold",
+ "Minimum number of tuple updates or deletes prior to vacuum",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0, INT_MAX
+ },
+ {
+ {
+ "autovacuum_vacuum_insert_threshold",
+ "Minimum number of tuple inserts prior to vacuum, or -1 to disable insert vacuums",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -2, -1, INT_MAX
+ },
+ {
+ {
+ "autovacuum_analyze_threshold",
+ "Minimum number of tuple inserts, updates or deletes prior to analyze",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0, INT_MAX
+ },
+ {
+ {
+ "autovacuum_vacuum_cost_limit",
+ "Vacuum cost amount available before napping, for autovacuum",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 1, 10000
+ },
+ {
+ {
+ "autovacuum_freeze_min_age",
+ "Minimum age at which VACUUM should freeze a table row, for autovacuum",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0, 1000000000
+ },
+ {
+ {
+ "autovacuum_multixact_freeze_min_age",
+ "Minimum multixact age at which VACUUM should freeze a row multixact's, for autovacuum",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0, 1000000000
+ },
+ {
+ {
+ "autovacuum_freeze_max_age",
+ "Age at which to autovacuum a table to prevent transaction ID wraparound",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 100000, 2000000000
+ },
+ {
+ {
+ "autovacuum_multixact_freeze_max_age",
+ "Multixact age at which to autovacuum a table to prevent multixact wraparound",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 10000, 2000000000
+ },
+ {
+ {
+ "autovacuum_freeze_table_age",
+ "Age at which VACUUM should perform a full table sweep to freeze row versions",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ }, -1, 0, 2000000000
+ },
+ {
+ {
+ "autovacuum_multixact_freeze_table_age",
+ "Age of multixact at which VACUUM should perform a full table sweep to freeze row versions",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ }, -1, 0, 2000000000
+ },
+ {
+ {
+ "log_autovacuum_min_duration",
+ "Sets the minimum execution time above which autovacuum actions will be logged",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, -1, INT_MAX
+ },
+ {
+ {
+ "toast_tuple_target",
+ "Sets the target tuple length at which external columns will be toasted",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock
+ },
+ TOAST_TUPLE_TARGET, 128, TOAST_TUPLE_TARGET_MAIN
+ },
+ {
+ {
+ "pages_per_range",
+ "Number of pages that each page range covers in a BRIN index",
+ RELOPT_KIND_BRIN,
+ AccessExclusiveLock
+ }, 128, 1, 131072
+ },
+ {
+ {
+ "gin_pending_list_limit",
+ "Maximum size of the pending list for this GIN index, in kilobytes.",
+ RELOPT_KIND_GIN,
+ AccessExclusiveLock
+ },
+ -1, 64, MAX_KILOBYTES
+ },
+ {
+ {
+ "effective_io_concurrency",
+ "Number of simultaneous requests that can be handled efficiently by the disk subsystem.",
+ RELOPT_KIND_TABLESPACE,
+ ShareUpdateExclusiveLock
+ },
+#ifdef USE_PREFETCH
+ -1, 0, MAX_IO_CONCURRENCY
+#else
+ 0, 0, 0
+#endif
+ },
+ {
+ {
+ "maintenance_io_concurrency",
+ "Number of simultaneous requests that can be handled efficiently by the disk subsystem for maintenance work.",
+ RELOPT_KIND_TABLESPACE,
+ ShareUpdateExclusiveLock
+ },
+#ifdef USE_PREFETCH
+ -1, 0, MAX_IO_CONCURRENCY
+#else
+ 0, 0, 0
+#endif
+ },
+ {
+ {
+ "parallel_workers",
+ "Number of parallel processes that can be used per executor node for this relation.",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0, 1024
+ },
+
+ /* list terminator */
+ {{NULL}}
+};
+
+static relopt_real realRelOpts[] =
+{ /* each entry: {name, description, applicable kinds, lock level}, default, min, max */
+ {
+ {
+ "autovacuum_vacuum_cost_delay",
+ "Vacuum cost delay in milliseconds, for autovacuum",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, 100.0
+ },
+ {
+ {
+ "autovacuum_vacuum_scale_factor",
+ "Number of tuple updates or deletes prior to vacuum as a fraction of reltuples",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, 100.0
+ },
+ {
+ {
+ "autovacuum_vacuum_insert_scale_factor",
+ "Number of tuple inserts prior to vacuum as a fraction of reltuples",
+ RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, 100.0
+ },
+ {
+ {
+ "autovacuum_analyze_scale_factor",
+ "Number of tuple inserts, updates or deletes prior to analyze as a fraction of reltuples",
+ RELOPT_KIND_HEAP,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, 100.0
+ },
+ {
+ {
+ "seq_page_cost",
+ "Sets the planner's estimate of the cost of a sequentially fetched disk page.",
+ RELOPT_KIND_TABLESPACE,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, DBL_MAX
+ },
+ {
+ {
+ "random_page_cost",
+ "Sets the planner's estimate of the cost of a nonsequentially fetched disk page.",
+ RELOPT_KIND_TABLESPACE,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, DBL_MAX
+ },
+ {
+ {
+ "n_distinct",
+ "Sets the planner's estimate of the number of distinct values appearing in a column (excluding child relations).",
+ RELOPT_KIND_ATTRIBUTE,
+ ShareUpdateExclusiveLock
+ },
+ 0, -1.0, DBL_MAX
+ },
+ {
+ {
+ "n_distinct_inherited",
+ "Sets the planner's estimate of the number of distinct values appearing in a column (including child relations).",
+ RELOPT_KIND_ATTRIBUTE,
+ ShareUpdateExclusiveLock
+ },
+ 0, -1.0, DBL_MAX
+ },
+ {
+ {
+ "vacuum_cleanup_index_scale_factor",
+ "Deprecated B-Tree parameter.",
+ RELOPT_KIND_BTREE,
+ ShareUpdateExclusiveLock
+ },
+ -1, 0.0, 1e10
+ },
+ /* list terminator */
+ {{NULL}}
+};
+
+/*
+ * values from StdRdOptIndexCleanup
+ *
+ * Besides "auto", "on" and "off", the boolean-style spellings
+ * true/false, yes/no and 1/0 are accepted and map to ON/OFF.
+ */
+relopt_enum_elt_def StdRdOptIndexCleanupValues[] =
+{
+ {"auto", STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO},
+ {"on", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON},
+ {"off", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF},
+ {"true", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON},
+ {"false", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF},
+ {"yes", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON},
+ {"no", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF},
+ {"1", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON},
+ {"0", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF},
+ {(const char *) NULL} /* list terminator */
+};
+
+/*
+ * Spellings accepted for an enum reloption backed by GistOptBufferingMode.
+ */
+relopt_enum_elt_def gistBufferingOptValues[] =
+{
+    {.string_val = "auto", .symbol_val = GIST_OPTION_BUFFERING_AUTO},
+    {.string_val = "on", .symbol_val = GIST_OPTION_BUFFERING_ON},
+    {.string_val = "off", .symbol_val = GIST_OPTION_BUFFERING_OFF},
+    {.string_val = NULL}        /* list terminator */
+};
+
+/*
+ * Spellings accepted for an enum reloption backed by ViewOptCheckOption.
+ * There is deliberately no spelling for the NOT_SET symbol.
+ */
+relopt_enum_elt_def viewCheckOptValues[] =
+{
+    {.string_val = "local", .symbol_val = VIEW_OPTION_CHECK_OPTION_LOCAL},
+    {.string_val = "cascaded", .symbol_val = VIEW_OPTION_CHECK_OPTION_CASCADED},
+    {.string_val = NULL}        /* list terminator */
+};
+
+/*
+ * Built-in enum-valued reloptions.  Each entry supplies the generic option
+ * header, the table of accepted spellings, the default symbol, and the
+ * detail message reported when an unsupported value is given.
+ */
+static relopt_enum enumRelOpts[] =
+{
+    {
+        .gen = {
+            .name = "vacuum_index_cleanup",
+            .desc = "Controls index vacuuming and index cleanup",
+            .kinds = RELOPT_KIND_HEAP | RELOPT_KIND_TOAST,
+            .lockmode = ShareUpdateExclusiveLock
+        },
+        .members = StdRdOptIndexCleanupValues,
+        .default_val = STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO,
+        .detailmsg = gettext_noop("Valid values are \"on\", \"off\", and \"auto\".")
+    },
+    {
+        .gen = {
+            .name = "buffering",
+            .desc = "Enables buffering build for this GiST index",
+            .kinds = RELOPT_KIND_GIST,
+            .lockmode = AccessExclusiveLock
+        },
+        .members = gistBufferingOptValues,
+        .default_val = GIST_OPTION_BUFFERING_AUTO,
+        .detailmsg = gettext_noop("Valid values are \"on\", \"off\", and \"auto\".")
+    },
+    {
+        .gen = {
+            .name = "check_option",
+            .desc = "View has WITH CHECK OPTION defined (local or cascaded).",
+            .kinds = RELOPT_KIND_VIEW,
+            .lockmode = AccessExclusiveLock
+        },
+        .members = viewCheckOptValues,
+        .default_val = VIEW_OPTION_CHECK_OPTION_NOT_SET,
+        .detailmsg = gettext_noop("Valid values are \"local\" and \"cascaded\".")
+    },
+    /* list terminator */
+    {.gen = {.name = NULL}}
+};
+
+/*
+ * Built-in string-valued reloptions.  There are none at present, so the
+ * table holds only its terminator.
+ */
+static relopt_string stringRelOpts[] =
+{
+    /* list terminator */
+    {.gen = {.name = NULL}}
+};
+
+static relopt_gen **relOpts = NULL;	/* NULL-terminated parser table, built by initialize_reloptions() */
+static bits32 last_assigned_kind = RELOPT_KIND_LAST_DEFAULT;	/* most recent kind bit; add_reloption_kind() shifts it left */
+
+static int num_custom_options = 0;	/* number of entries in use in custom_options */
+static relopt_gen **custom_options = NULL;	/* options registered via add_reloption() */
+static bool need_initialization = true;	/* set by add_reloption(), cleared by initialize_reloptions() */
+
+static void initialize_reloptions(void);
+static void parse_one_reloption(relopt_value *option, char *text_str,
+ int text_len, bool validate);
+
+/*
+ * Get the length of a string reloption (either default or the user-defined
+ * value). This is used for allocation purposes when building a set of
+ * relation options.
+ *
+ * "option" is a relopt_value (not a pointer) whose gen field refers to a
+ * relopt_string.  The result does not count a terminating NUL; callers add
+ * one themselves.  The argument is evaluated more than once, so it must be
+ * free of side effects.
+ */
+#define GET_STRING_RELOPTION_LEN(option) \
+ ((option).isset ? strlen((option).values.string_val) : \
+ ((relopt_string *) (option).gen)->default_len)
+
+/*
+ * initialize_reloptions
+ *      Rebuild the relOpts parser table.
+ *
+ * This must run before any parsing is done, and again whenever a custom
+ * option has been added.  Each built-in option gets its type tag and cached
+ * name length filled in; custom options are appended unchanged.  The table
+ * lives in TopMemoryContext and ends with a NULL entry.
+ */
+static void
+initialize_reloptions(void)
+{
+    int         idx;
+    int         total = 0;
+    int         pos = 0;
+    relopt_gen *gen;
+
+    /*
+     * First pass: count the built-in entries.  The assertions verify that
+     * each option's lock mode conflicts with itself.
+     */
+    for (idx = 0; boolRelOpts[idx].gen.name != NULL; idx++)
+    {
+        Assert(DoLockModesConflict(boolRelOpts[idx].gen.lockmode,
+                                   boolRelOpts[idx].gen.lockmode));
+        total++;
+    }
+    for (idx = 0; intRelOpts[idx].gen.name != NULL; idx++)
+    {
+        Assert(DoLockModesConflict(intRelOpts[idx].gen.lockmode,
+                                   intRelOpts[idx].gen.lockmode));
+        total++;
+    }
+    for (idx = 0; realRelOpts[idx].gen.name != NULL; idx++)
+    {
+        Assert(DoLockModesConflict(realRelOpts[idx].gen.lockmode,
+                                   realRelOpts[idx].gen.lockmode));
+        total++;
+    }
+    for (idx = 0; enumRelOpts[idx].gen.name != NULL; idx++)
+    {
+        Assert(DoLockModesConflict(enumRelOpts[idx].gen.lockmode,
+                                   enumRelOpts[idx].gen.lockmode));
+        total++;
+    }
+    for (idx = 0; stringRelOpts[idx].gen.name != NULL; idx++)
+    {
+        Assert(DoLockModesConflict(stringRelOpts[idx].gen.lockmode,
+                                   stringRelOpts[idx].gen.lockmode));
+        total++;
+    }
+    total += num_custom_options;
+
+    /* Throw away any previous table and allocate one with room for a NULL */
+    if (relOpts)
+        pfree(relOpts);
+    relOpts = MemoryContextAlloc(TopMemoryContext,
+                                 (total + 1) * sizeof(relopt_gen *));
+
+    /* Second pass: tag each built-in option and store it in the table */
+    for (idx = 0; boolRelOpts[idx].gen.name != NULL; idx++)
+    {
+        gen = &boolRelOpts[idx].gen;
+        gen->type = RELOPT_TYPE_BOOL;
+        gen->namelen = strlen(gen->name);
+        relOpts[pos++] = gen;
+    }
+
+    for (idx = 0; intRelOpts[idx].gen.name != NULL; idx++)
+    {
+        gen = &intRelOpts[idx].gen;
+        gen->type = RELOPT_TYPE_INT;
+        gen->namelen = strlen(gen->name);
+        relOpts[pos++] = gen;
+    }
+
+    for (idx = 0; realRelOpts[idx].gen.name != NULL; idx++)
+    {
+        gen = &realRelOpts[idx].gen;
+        gen->type = RELOPT_TYPE_REAL;
+        gen->namelen = strlen(gen->name);
+        relOpts[pos++] = gen;
+    }
+
+    for (idx = 0; enumRelOpts[idx].gen.name != NULL; idx++)
+    {
+        gen = &enumRelOpts[idx].gen;
+        gen->type = RELOPT_TYPE_ENUM;
+        gen->namelen = strlen(gen->name);
+        relOpts[pos++] = gen;
+    }
+
+    for (idx = 0; stringRelOpts[idx].gen.name != NULL; idx++)
+    {
+        gen = &stringRelOpts[idx].gen;
+        gen->type = RELOPT_TYPE_STRING;
+        gen->namelen = strlen(gen->name);
+        relOpts[pos++] = gen;
+    }
+
+    /* Custom options were fully set up when they were created */
+    for (idx = 0; idx < num_custom_options; idx++)
+        relOpts[pos++] = custom_options[idx];
+
+    /* add a list terminator */
+    relOpts[pos] = NULL;
+
+    /* flag the work is complete */
+    need_initialization = false;
+}
+
+/*
+ * add_reloption_kind
+ *      Hand out a fresh relopt_kind bit for custom reloptions defined by
+ *      user-defined AMs.
+ *
+ * Raises an error once the supply of kind bits is exhausted.
+ */
+relopt_kind
+add_reloption_kind(void)
+{
+    /* don't hand out the last bit so that the enum's behavior is portable */
+    if (last_assigned_kind >= RELOPT_KIND_MAX)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                 errmsg("user-defined relation parameter types limit exceeded")));
+    }
+
+    /* advance to the next bit and report it */
+    last_assigned_kind = last_assigned_kind << 1;
+
+    return (relopt_kind) last_assigned_kind;
+}
+
+/*
+ * add_reloption
+ *      Append an already-built custom reloption to custom_options and mark
+ *      the main parser table as needing a rebuild.
+ *
+ * The custom_options array lives in TopMemoryContext; it starts with room
+ * for 8 entries and doubles each time it fills up.
+ */
+static void
+add_reloption(relopt_gen *newoption)
+{
+    static int  max_custom_options = 0;
+
+    if (num_custom_options >= max_custom_options)
+    {
+        MemoryContext saved_cxt = MemoryContextSwitchTo(TopMemoryContext);
+
+        if (max_custom_options != 0)
+        {
+            /* grow the existing array */
+            max_custom_options *= 2;
+            custom_options = repalloc(custom_options,
+                                      max_custom_options * sizeof(relopt_gen *));
+        }
+        else
+        {
+            /* first registration: create the array */
+            max_custom_options = 8;
+            custom_options = palloc(max_custom_options * sizeof(relopt_gen *));
+        }
+
+        MemoryContextSwitchTo(saved_cxt);
+    }
+
+    custom_options[num_custom_options] = newoption;
+    num_custom_options++;
+
+    /* the parser table is now stale */
+    need_initialization = true;
+}
+
+/*
+ * init_local_reloptions
+ *      Prepare a local reloptions set whose options will be parsed into a
+ *      bytea structure of 'relopt_struct_size' bytes.
+ *
+ * The option and validator lists both start out empty.
+ */
+void
+init_local_reloptions(local_relopts *opts, Size relopt_struct_size)
+{
+    opts->relopt_struct_size = relopt_struct_size;
+    opts->validators = NIL;
+    opts->options = NIL;
+}
+
+/*
+ * register_reloptions_validator
+ *      Remember a custom validation callback for this local reloptions set.
+ *      It is meant to be called at the end of build_local_reloptions().
+ */
+void
+register_reloptions_validator(local_relopts *opts, relopts_validator validator)
+{
+    List       *updated;
+
+    updated = lappend(opts->validators, validator);
+    opts->validators = updated;
+}
+
+/*
+ * add_local_reloption
+ *      Attach an already-built custom reloption to a local option set.
+ *
+ * 'offset' is where the option's value goes inside the parsed structure; it
+ * must lie within relopt_struct_size.
+ */
+static void
+add_local_reloption(local_relopts *relopts, relopt_gen *newoption, int offset)
+{
+    local_relopt *entry = palloc(sizeof(local_relopt));
+
+    Assert(offset < relopts->relopt_struct_size);
+
+    entry->offset = offset;
+    entry->option = newoption;
+
+    relopts->options = lappend(relopts->options, entry);
+}
+
+/*
+ * allocate_reloption
+ *      Allocate a new reloption of the given type and fill in the fields
+ *      common to all types (name, description, kinds, name length, type and
+ *      lock mode).  Type-specific fields are left for the caller to set.
+ *
+ * Options of kind RELOPT_KIND_LOCAL are allocated in the current memory
+ * context; everything else goes into TopMemoryContext.  'desc' may be NULL.
+ * An unknown 'type' raises an error.
+ */
+static relopt_gen *
+allocate_reloption(bits32 kinds, int type, const char *name, const char *desc,
+                   LOCKMODE lockmode)
+{
+    MemoryContext saved_cxt;
+    size_t      alloc_size;
+    relopt_gen *opt;
+
+    /* NULL here means "no context switch to undo" */
+    saved_cxt = (kinds != RELOPT_KIND_LOCAL) ?
+        MemoryContextSwitchTo(TopMemoryContext) : NULL;
+
+    switch (type)
+    {
+        case RELOPT_TYPE_BOOL:
+            alloc_size = sizeof(relopt_bool);
+            break;
+        case RELOPT_TYPE_INT:
+            alloc_size = sizeof(relopt_int);
+            break;
+        case RELOPT_TYPE_REAL:
+            alloc_size = sizeof(relopt_real);
+            break;
+        case RELOPT_TYPE_ENUM:
+            alloc_size = sizeof(relopt_enum);
+            break;
+        case RELOPT_TYPE_STRING:
+            alloc_size = sizeof(relopt_string);
+            break;
+        default:
+            elog(ERROR, "unsupported reloption type %d", type);
+            return NULL;        /* keep compiler quiet */
+    }
+
+    opt = palloc(alloc_size);
+
+    opt->name = pstrdup(name);
+    opt->desc = desc ? pstrdup(desc) : NULL;
+    opt->kinds = kinds;
+    opt->namelen = strlen(name);
+    opt->type = type;
+    opt->lockmode = lockmode;
+
+    if (saved_cxt != NULL)
+        MemoryContextSwitchTo(saved_cxt);
+
+    return opt;
+}
+
+/*
+ * init_bool_reloption
+ *      Build a new boolean reloption with the given default.
+ */
+static relopt_bool *
+init_bool_reloption(bits32 kinds, const char *name, const char *desc,
+                    bool default_val, LOCKMODE lockmode)
+{
+    relopt_gen *gen = allocate_reloption(kinds, RELOPT_TYPE_BOOL,
+                                         name, desc, lockmode);
+    relopt_bool *boolopt = (relopt_bool *) gen;
+
+    boolopt->default_val = default_val;
+
+    return boolopt;
+}
+
+/*
+ * add_bool_reloption
+ *      Create a boolean reloption and register it as a custom option.
+ */
+void
+add_bool_reloption(bits32 kinds, const char *name, const char *desc,
+                   bool default_val, LOCKMODE lockmode)
+{
+    relopt_bool *boolopt;
+
+    boolopt = init_bool_reloption(kinds, name, desc, default_val, lockmode);
+    add_reloption((relopt_gen *) boolopt);
+}
+
+/*
+ * add_local_bool_reloption
+ *      Create a boolean reloption and add it to a local option set.
+ *
+ * 'offset' is offset of bool-typed field.
+ */
+void
+add_local_bool_reloption(local_relopts *relopts, const char *name,
+                         const char *desc, bool default_val, int offset)
+{
+    relopt_bool *boolopt;
+
+    /* local options are created with lock mode 0 */
+    boolopt = init_bool_reloption(RELOPT_KIND_LOCAL, name, desc,
+                                  default_val, 0);
+    add_local_reloption(relopts, (relopt_gen *) boolopt, offset);
+}
+
+
+/*
+ * init_int_reloption
+ *      Build a new integer reloption with the given default and bounds.
+ */
+static relopt_int *
+init_int_reloption(bits32 kinds, const char *name, const char *desc,
+                   int default_val, int min_val, int max_val,
+                   LOCKMODE lockmode)
+{
+    relopt_gen *gen = allocate_reloption(kinds, RELOPT_TYPE_INT,
+                                         name, desc, lockmode);
+    relopt_int *intopt = (relopt_int *) gen;
+
+    intopt->min = min_val;
+    intopt->max = max_val;
+    intopt->default_val = default_val;
+
+    return intopt;
+}
+
+/*
+ * add_int_reloption
+ *      Create an integer reloption and register it as a custom option.
+ */
+void
+add_int_reloption(bits32 kinds, const char *name, const char *desc, int default_val,
+                  int min_val, int max_val, LOCKMODE lockmode)
+{
+    relopt_int *intopt;
+
+    intopt = init_int_reloption(kinds, name, desc, default_val,
+                                min_val, max_val, lockmode);
+    add_reloption((relopt_gen *) intopt);
+}
+
+/*
+ * add_local_int_reloption
+ *      Create an integer reloption and add it to a local option set.
+ *
+ * 'offset' is offset of int-typed field.
+ */
+void
+add_local_int_reloption(local_relopts *relopts, const char *name,
+                        const char *desc, int default_val, int min_val,
+                        int max_val, int offset)
+{
+    relopt_int *intopt;
+
+    /* local options are created with lock mode 0 */
+    intopt = init_int_reloption(RELOPT_KIND_LOCAL, name, desc, default_val,
+                                min_val, max_val, 0);
+    add_local_reloption(relopts, (relopt_gen *) intopt, offset);
+}
+
+/*
+ * init_real_reloption
+ *      Build a new floating-point reloption with the given default and
+ *      bounds.
+ */
+static relopt_real *
+init_real_reloption(bits32 kinds, const char *name, const char *desc,
+                    double default_val, double min_val, double max_val,
+                    LOCKMODE lockmode)
+{
+    relopt_gen *gen = allocate_reloption(kinds, RELOPT_TYPE_REAL,
+                                         name, desc, lockmode);
+    relopt_real *realopt = (relopt_real *) gen;
+
+    realopt->min = min_val;
+    realopt->max = max_val;
+    realopt->default_val = default_val;
+
+    return realopt;
+}
+
+/*
+ * add_real_reloption
+ *      Create a floating-point reloption and register it as a custom option.
+ */
+void
+add_real_reloption(bits32 kinds, const char *name, const char *desc,
+                   double default_val, double min_val, double max_val,
+                   LOCKMODE lockmode)
+{
+    relopt_real *realopt;
+
+    realopt = init_real_reloption(kinds, name, desc, default_val,
+                                  min_val, max_val, lockmode);
+    add_reloption((relopt_gen *) realopt);
+}
+
+/*
+ * add_local_real_reloption
+ *      Create a floating-point reloption and add it to a local option set.
+ *
+ * 'offset' is offset of double-typed field.
+ */
+void
+add_local_real_reloption(local_relopts *relopts, const char *name,
+                         const char *desc, double default_val,
+                         double min_val, double max_val, int offset)
+{
+    relopt_real *realopt;
+
+    /* local options are created with lock mode 0 */
+    realopt = init_real_reloption(RELOPT_KIND_LOCAL, name, desc, default_val,
+                                  min_val, max_val, 0);
+    add_local_reloption(relopts, (relopt_gen *) realopt, offset);
+}
+
+/*
+ * init_enum_reloption
+ *      Build a new enum reloption.
+ *
+ * The 'members' array and 'detailmsg' are stored by reference, not copied.
+ */
+static relopt_enum *
+init_enum_reloption(bits32 kinds, const char *name, const char *desc,
+                    relopt_enum_elt_def *members, int default_val,
+                    const char *detailmsg, LOCKMODE lockmode)
+{
+    relopt_gen *gen = allocate_reloption(kinds, RELOPT_TYPE_ENUM,
+                                         name, desc, lockmode);
+    relopt_enum *enumopt = (relopt_enum *) gen;
+
+    enumopt->detailmsg = detailmsg;
+    enumopt->default_val = default_val;
+    enumopt->members = members;
+
+    return enumopt;
+}
+
+
+/*
+ * add_enum_reloption
+ *      Create an enum reloption and register it as a custom option.
+ *
+ * The members array must end with an entry whose string is NULL.
+ *
+ * The detailmsg is reported when an unsupported value is supplied, and
+ * should look like: "Valid values are \"foo\", \"bar\", and \"baz\"."
+ *
+ * Neither the members array nor detailmsg is copied, so the caller must
+ * keep both valid for the life of the process.
+ */
+void
+add_enum_reloption(bits32 kinds, const char *name, const char *desc,
+                   relopt_enum_elt_def *members, int default_val,
+                   const char *detailmsg, LOCKMODE lockmode)
+{
+    relopt_enum *enumopt;
+
+    enumopt = init_enum_reloption(kinds, name, desc, members, default_val,
+                                  detailmsg, lockmode);
+    add_reloption((relopt_gen *) enumopt);
+}
+
+/*
+ * add_local_enum_reloption
+ *      Create an enum reloption and add it to a local option set.
+ *
+ * 'offset' is offset of int-typed field.
+ */
+void
+add_local_enum_reloption(local_relopts *relopts, const char *name,
+                         const char *desc, relopt_enum_elt_def *members,
+                         int default_val, const char *detailmsg, int offset)
+{
+    relopt_enum *enumopt;
+
+    /* local options are created with lock mode 0 */
+    enumopt = init_enum_reloption(RELOPT_KIND_LOCAL, name, desc, members,
+                                  default_val, detailmsg, 0);
+    add_local_reloption(relopts, (relopt_gen *) enumopt, offset);
+}
+
+/*
+ * init_string_reloption
+ *      Allocate and initialize a new string reloption
+ *
+ * kinds, name, desc and lockmode are passed through to allocate_reloption().
+ * default_val may be NULL; the option's default is then flagged as null and
+ * an empty string is stored.  validator, if given, is run on default_val
+ * (NULL included) before anything is allocated, so that a default the
+ * validator rejects is reported right away.  filler is stored as the
+ * option's fill callback and may be NULL.
+ *
+ * The default string is copied with palloc-based routines so that it has
+ * the same lifetime as the option struct itself: the current memory context
+ * for RELOPT_KIND_LOCAL options, TopMemoryContext otherwise.  (Plain
+ * strdup() must not be used here: its result would need a NULL check and
+ * would never be released along with the memory context.)
+ */
+static relopt_string *
+init_string_reloption(bits32 kinds, const char *name, const char *desc,
+                      const char *default_val,
+                      validate_string_relopt validator,
+                      fill_string_relopt filler,
+                      LOCKMODE lockmode)
+{
+    relopt_string *newoption;
+
+    /* make sure the validator/default combination is sane */
+    if (validator)
+        (validator) (default_val);
+
+    newoption = (relopt_string *) allocate_reloption(kinds, RELOPT_TYPE_STRING,
+                                                     name, desc, lockmode);
+    newoption->validate_cb = validator;
+    newoption->fill_cb = filler;
+    if (default_val)
+    {
+        /* keep the copy in the same context as the option struct */
+        if (kinds == RELOPT_KIND_LOCAL)
+            newoption->default_val = pstrdup(default_val);
+        else
+            newoption->default_val = MemoryContextStrdup(TopMemoryContext, default_val);
+        newoption->default_len = strlen(default_val);
+        newoption->default_isnull = false;
+    }
+    else
+    {
+        newoption->default_val = "";
+        newoption->default_len = 0;
+        newoption->default_isnull = true;
+    }
+
+    return newoption;
+}
+
+/*
+ * add_string_reloption
+ *      Create a string reloption and register it as a custom option.
+ *
+ * "validator" is an optional callback for checking values.  It must
+ * elog(ERROR) when handed a string that is not acceptable for the variable.
+ * Note that the default value has to pass validation as well.  No fill
+ * callback is installed for options created this way.
+ */
+void
+add_string_reloption(bits32 kinds, const char *name, const char *desc,
+                     const char *default_val, validate_string_relopt validator,
+                     LOCKMODE lockmode)
+{
+    relopt_string *stropt;
+
+    stropt = init_string_reloption(kinds, name, desc, default_val,
+                                   validator, NULL, lockmode);
+    add_reloption((relopt_gen *) stropt);
+}
+
+/*
+ * add_local_string_reloption
+ *      Create a string reloption and add it to a local option set.
+ *
+ * 'offset' is offset of int-typed field that will store offset of string value
+ * in the resulting bytea structure.
+ */
+void
+add_local_string_reloption(local_relopts *relopts, const char *name,
+                           const char *desc, const char *default_val,
+                           validate_string_relopt validator,
+                           fill_string_relopt filler, int offset)
+{
+    relopt_string *stropt;
+
+    /* local options are created with lock mode 0 */
+    stropt = init_string_reloption(RELOPT_KIND_LOCAL, name, desc, default_val,
+                                   validator, filler, 0);
+    add_local_reloption(relopts, (relopt_gen *) stropt, offset);
+}
+
+/*
+ * Transform a relation options list (list of DefElem) into the text array
+ * format that is kept in pg_class.reloptions, including only those options
+ * that are in the passed namespace. The output values do not include the
+ * namespace.
+ *
+ * This is used for three cases: CREATE TABLE/INDEX, ALTER TABLE SET, and
+ * ALTER TABLE RESET. In the ALTER cases, oldOptions is the existing
+ * reloptions value (possibly NULL), and we replace or remove entries
+ * as needed.
+ *
+ * If acceptOidsOff is true, then we allow oids = false, but throw error when
+ * on. This is solely needed for backwards compatibility.
+ *
+ * Note that this is not responsible for determining whether the options
+ * are valid, but it does check that namespaces for all the options given are
+ * listed in validnsps. The NULL namespace is always valid and need not be
+ * explicitly listed. Passing a NULL pointer means that only the NULL
+ * namespace is valid.
+ *
+ * Both oldOptions and the result are text arrays (or NULL for "default"),
+ * but we declare them as Datums to avoid including array.h in reloptions.h.
+ *
+ * Arguments:
+ *	oldOptions: existing reloptions text array, or a null Datum if none
+ *	defList: DefElems to apply; if NIL, oldOptions is returned unchanged
+ *	namspace: namespace to process; NULL selects un-namespaced options
+ *	validnsps: NULL-terminated array of acceptable namespaces, or NULL
+ *	acceptOidsOff: drop "oids" when false-valued, error out when true-valued
+ *	isReset: true for RESET; no entries are added, and any DefElem that
+ *		carries a value is rejected.  The namespace validity check is
+ *		not performed in this case.
+ *
+ * An old entry is dropped when its text begins with the name of a DefElem
+ * in the same namespace followed by '='.  Returns the rebuilt text array,
+ * or (Datum) 0 when no entries remain.
+ */
+Datum
+transformRelOptions(Datum oldOptions, List *defList, const char *namspace,
+ char *validnsps[], bool acceptOidsOff, bool isReset)
+{
+ Datum result;
+ ArrayBuildState *astate;
+ ListCell *cell;
+
+ /* no change if empty list */
+ if (defList == NIL)
+ return oldOptions;
+
+ /* We build new array using accumArrayResult */
+ astate = NULL;
+
+ /* Copy any oldOptions that aren't to be replaced */
+ if (PointerIsValid(DatumGetPointer(oldOptions)))
+ {
+ ArrayType *array = DatumGetArrayTypeP(oldOptions);
+ Datum *oldoptions;
+ int noldoptions;
+ int i;
+
+ deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT,
+ &oldoptions, NULL, &noldoptions);
+
+ for (i = 0; i < noldoptions; i++)
+ {
+ char *text_str = VARDATA(oldoptions[i]);
+ int text_len = VARSIZE(oldoptions[i]) - VARHDRSZ;
+
+ /* Search for a match in defList */
+ foreach(cell, defList)
+ {
+ DefElem *def = (DefElem *) lfirst(cell);
+ int kw_len;
+
+ /* ignore if not in the same namespace */
+ if (namspace == NULL)
+ {
+ if (def->defnamespace != NULL)
+ continue;
+ }
+ else if (def->defnamespace == NULL)
+ continue;
+ else if (strcmp(def->defnamespace, namspace) != 0)
+ continue;
+
+ kw_len = strlen(def->defname);
+ if (text_len > kw_len && text_str[kw_len] == '=' &&
+ strncmp(text_str, def->defname, kw_len) == 0)
+ break;
+ }
+ if (!cell)	/* foreach ran to the end without hitting the break */
+ {
+ /* No match, so keep old option */
+ astate = accumArrayResult(astate, oldoptions[i],
+ false, TEXTOID,
+ CurrentMemoryContext);
+ }
+ }
+ }
+
+ /*
+ * If CREATE/SET, add new options to array; if RESET, just check that the
+ * user didn't say RESET (option=val). (Must do this because the grammar
+ * doesn't enforce it.)
+ */
+ foreach(cell, defList)
+ {
+ DefElem *def = (DefElem *) lfirst(cell);
+
+ if (isReset)
+ {
+ if (def->arg != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("RESET must not include values for parameters")));
+ }
+ else
+ {
+ text *t;
+ const char *value;
+ Size len;
+
+ /*
+ * Error out if the namespace is not valid. A NULL namespace is
+ * always valid.
+ */
+ if (def->defnamespace != NULL)
+ {
+ bool valid = false;
+ int i;
+
+ if (validnsps)
+ {
+ for (i = 0; validnsps[i]; i++)
+ {
+ if (strcmp(def->defnamespace, validnsps[i]) == 0)
+ {
+ valid = true;
+ break;
+ }
+ }
+ }
+
+ if (!valid)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized parameter namespace \"%s\"",
+ def->defnamespace)));
+ }
+
+ /* ignore if not in the same namespace */
+ if (namspace == NULL)
+ {
+ if (def->defnamespace != NULL)
+ continue;
+ }
+ else if (def->defnamespace == NULL)
+ continue;
+ else if (strcmp(def->defnamespace, namspace) != 0)
+ continue;
+
+ /*
+ * Flatten the DefElem into a text string like "name=arg". If we
+ * have just "name", assume "name=true" is meant. Note: the
+ * namespace is not output.
+ */
+ if (def->arg != NULL)
+ value = defGetString(def);
+ else
+ value = "true";
+
+ /*
+ * This is not a great place for this test, but there's no other
+ * convenient place to filter the option out. As WITH (oids =
+ * false) will be removed someday, this seems like an acceptable
+ * amount of ugly.
+ */
+ if (acceptOidsOff && def->defnamespace == NULL &&
+ strcmp(def->defname, "oids") == 0)
+ {
+ if (defGetBoolean(def))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("tables declared WITH OIDS are not supported")));
+ /* skip over option, reloptions machinery doesn't know it */
+ continue;
+ }
+
+ len = VARHDRSZ + strlen(def->defname) + 1 + strlen(value);	/* header + name + '=' + value */
+ /* +1 leaves room for sprintf's trailing null */
+ t = (text *) palloc(len + 1);
+ SET_VARSIZE(t, len);
+ sprintf(VARDATA(t), "%s=%s", def->defname, value);
+
+ astate = accumArrayResult(astate, PointerGetDatum(t),
+ false, TEXTOID,
+ CurrentMemoryContext);
+ }
+ }
+
+ if (astate)
+ result = makeArrayResult(astate, CurrentMemoryContext);
+ else
+ result = (Datum) 0;	/* nothing accumulated: report "no options" */
+
+ return result;
+}
+
+
+/*
+ * Convert the text-array format of reloptions into a List of DefElem.
+ * This is the inverse of transformRelOptions().
+ *
+ * 'options' is a text array Datum, or a null Datum, in which case NIL is
+ * returned.  Each element is split at its first '='; the part before it
+ * becomes the DefElem name and the part after it a String node.  An element
+ * without '=' yields a DefElem whose argument is NULL.
+ */
+List *
+untransformRelOptions(Datum options)
+{
+    List       *result = NIL;
+    ArrayType  *array;
+    Datum      *optiondatums;
+    int         noptions;
+    int         i;
+
+    /* Nothing to do if no options */
+    if (!PointerIsValid(DatumGetPointer(options)))
+        return result;
+
+    array = DatumGetArrayTypeP(options);
+
+    deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT,
+                      &optiondatums, NULL, &noptions);
+
+    for (i = 0; i < noptions; i++)
+    {
+        char       *name;
+        char       *sep;
+        Node       *val = NULL;
+
+        /* this is already a private, writable C-string copy of the element */
+        name = TextDatumGetCString(optiondatums[i]);
+        sep = strchr(name, '=');
+        if (sep)
+        {
+            /* cut the buffer at '=' and give the value its own allocation */
+            *sep++ = '\0';
+            val = (Node *) makeString(pstrdup(sep));
+        }
+
+        /*
+         * Hand the buffer itself to the DefElem as its name.  Duplicating
+         * it again would only leave the first copy behind as a leak.
+         */
+        result = lappend(result, makeDefElem(name, val, -1));
+    }
+
+    return result;
+}
+
+/*
+ * Extract and parse reloptions from a pg_class tuple.
+ *
+ * This is a low-level routine, expected to be used by relcache code and
+ * callers that do not have a table's relcache entry (e.g. autovacuum). For
+ * other uses, consider grabbing the rd_options pointer from the relcache entry
+ * instead.
+ *
+ * tupdesc is pg_class' tuple descriptor. amoptions is a pointer to the index
+ * AM's options parser function in the case of a tuple corresponding to an
+ * index, or NULL otherwise.
+ *
+ * Returns NULL when the reloptions column is null, and also for foreign
+ * tables.  Parsing is done with validation turned off.
+ */
+bytea *
+extractRelOptions(HeapTuple tuple, TupleDesc tupdesc,
+                  amoptions_function amoptions)
+{
+    Form_pg_class classForm;
+    Datum       datum;
+    bool        isnull;
+
+    datum = fastgetattr(tuple,
+                        Anum_pg_class_reloptions,
+                        tupdesc,
+                        &isnull);
+    if (isnull)
+        return NULL;
+
+    classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+    /* Parse into appropriate format; don't error out here */
+    switch (classForm->relkind)
+    {
+        case RELKIND_RELATION:
+        case RELKIND_TOASTVALUE:
+        case RELKIND_MATVIEW:
+            return heap_reloptions(classForm->relkind, datum, false);
+
+        case RELKIND_PARTITIONED_TABLE:
+            return partitioned_table_reloptions(datum, false);
+
+        case RELKIND_VIEW:
+            return view_reloptions(datum, false);
+
+        case RELKIND_INDEX:
+        case RELKIND_PARTITIONED_INDEX:
+            return index_reloptions(amoptions, datum, false);
+
+        case RELKIND_FOREIGN_TABLE:
+            return NULL;
+
+        default:
+            Assert(false);      /* can't get here */
+            return NULL;        /* keep compiler quiet */
+    }
+}
+
+/*
+ * Match each "name=value" element of the 'options' text array against the
+ * 'numoptions' entries of 'reloptions', and parse the value into the entry
+ * that matches.
+ *
+ * An element matches an entry when it starts with the entry's name followed
+ * by '='; only the first matching entry is used.  When 'validate' is true,
+ * an element that matches no entry raises an error; otherwise it is
+ * silently skipped.
+ */
+static void
+parseRelOptionsInternal(Datum options, bool validate,
+                        relopt_value *reloptions, int numoptions)
+{
+    ArrayType  *array = DatumGetArrayTypeP(options);
+    Datum      *optiondatums;
+    int         noptions;
+    int         i;
+
+    deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT,
+                      &optiondatums, NULL, &noptions);
+
+    for (i = 0; i < noptions; i++)
+    {
+        char       *text_str = VARDATA(optiondatums[i]);
+        int         text_len = VARSIZE(optiondatums[i]) - VARHDRSZ;
+        bool        matched = false;
+        int         j;
+
+        /* Look for the expected option this element refers to */
+        for (j = 0; j < numoptions && !matched; j++)
+        {
+            relopt_gen *gen = reloptions[j].gen;
+            int         kw_len = gen->namelen;
+
+            if (text_len > kw_len && text_str[kw_len] == '=' &&
+                strncmp(text_str, gen->name, kw_len) == 0)
+            {
+                parse_one_reloption(&reloptions[j], text_str, text_len,
+                                    validate);
+                matched = true;
+            }
+        }
+
+        if (!matched && validate)
+        {
+            char       *optname;
+            char       *eq;
+
+            /* report just the name part of the unknown element */
+            optname = TextDatumGetCString(optiondatums[i]);
+            eq = strchr(optname, '=');
+            if (eq)
+                *eq = '\0';
+            ereport(ERROR,
+                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("unrecognized parameter \"%s\"", optname)));
+        }
+    }
+
+    /* It's worth avoiding memory leaks in this function */
+    pfree(optiondatums);
+
+    /* free the array only if DatumGetArrayTypeP gave us a separate copy */
+    if (((void *) array) != DatumGetPointer(options))
+        pfree(array);
+}
+
+/*
+ * Interpret reloptions that are given in text-array format.
+ *
+ * options is a reloption text array as constructed by transformRelOptions.
+ * kind specifies the family of options to be processed.
+ *
+ * The result is an array of relopt_value holding one entry for every
+ * registered option of the given kind.  Entries for options actually
+ * present in the options array have isset=true; the others are included too
+ * so that the caller can easily find the default values.  The array length
+ * is stored in *numrelopts.
+ *
+ * If there are no options of the given kind, *numrelopts is set to 0 and
+ * NULL is returned (unless options are illegally supplied despite none
+ * being defined, in which case an error occurs).
+ *
+ * Note: values of type int, bool and real are allocated as part of the
+ * returned array.  Values of type string are allocated separately and must
+ * be freed by the caller.
+ */
+static relopt_value *
+parseRelOptions(Datum options, bool validate, relopt_kind kind,
+                int *numrelopts)
+{
+    relopt_value *expected = NULL;
+    int         nexpected = 0;
+    int         src;
+    int         dst = 0;
+
+    if (need_initialization)
+        initialize_reloptions();
+
+    /* Count the registered options that belong to this kind */
+    for (src = 0; relOpts[src] != NULL; src++)
+    {
+        if ((relOpts[src]->kinds & kind) != 0)
+            nexpected++;
+    }
+
+    /* Build the array of expected options, all initially unset */
+    if (nexpected > 0)
+    {
+        expected = palloc(nexpected * sizeof(relopt_value));
+
+        for (src = 0; relOpts[src] != NULL; src++)
+        {
+            if ((relOpts[src]->kinds & kind) == 0)
+                continue;
+
+            expected[dst].gen = relOpts[src];
+            expected[dst].isset = false;
+            dst++;
+        }
+    }
+
+    /* Parse the supplied option array, if there is one */
+    if (PointerIsValid(DatumGetPointer(options)))
+        parseRelOptionsInternal(options, validate, expected, nexpected);
+
+    *numrelopts = nexpected;
+    return expected;
+}
+
+/*
+ * Parse local (unregistered) options.
+ *
+ * Returns a palloc'd array with one relopt_value per option in
+ * relopts->options, in list order.  Every entry starts out unset; entries
+ * named in the 'options' text array are then filled in.
+ */
+static relopt_value *
+parseLocalRelOptions(local_relopts *relopts, Datum options, bool validate)
+{
+    int         nopts = list_length(relopts->options);
+    relopt_value *parsed = palloc(nopts * sizeof(relopt_value));
+    int         pos = 0;
+    ListCell   *cell;
+
+    foreach(cell, relopts->options)
+    {
+        local_relopt *local = lfirst(cell);
+
+        parsed[pos].gen = local->option;
+        parsed[pos].isset = false;
+        pos++;
+    }
+
+    if (options != (Datum) 0)
+        parseRelOptionsInternal(options, validate, parsed, nopts);
+
+    return parsed;
+}
+
+/*
+ * Subroutine for parseRelOptions, to parse and validate a single option's
+ * value
+ *
+ * option is the entry to fill in; its gen field selects the parsing rules.
+ * text_str points at the "name=value" text, of which text_len bytes are
+ * used; the value part is copied into a NUL-terminated palloc'd buffer
+ * before being parsed.
+ *
+ * When validate is true, a repeated option, an unparsable value, or (for
+ * int and real options) a value outside the option's bounds raises an
+ * error.  When validate is false, none of these checks raise an error.
+ * option->isset is set only if the value parsed successfully.
+ *
+ * For string options the value buffer is kept as values.string_val rather
+ * than freed.
+ */
+static void
+parse_one_reloption(relopt_value *option, char *text_str, int text_len,
+ bool validate)
+{
+ char *value;
+ int value_len;
+ bool parsed;
+ bool nofree = false;
+
+ if (option->isset && validate)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("parameter \"%s\" specified more than once",
+ option->gen->name)));
+
+ value_len = text_len - option->gen->namelen - 1;	/* what follows "name=" */
+ value = (char *) palloc(value_len + 1);
+ memcpy(value, text_str + option->gen->namelen + 1, value_len);
+ value[value_len] = '\0';
+
+ switch (option->gen->type)
+ {
+ case RELOPT_TYPE_BOOL:
+ {
+ parsed = parse_bool(value, &option->values.bool_val);
+ if (validate && !parsed)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for boolean option \"%s\": %s",
+ option->gen->name, value)));
+ }
+ break;
+ case RELOPT_TYPE_INT:
+ {
+ relopt_int *optint = (relopt_int *) option->gen;
+
+ parsed = parse_int(value, &option->values.int_val, 0, NULL);
+ if (validate && !parsed)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for integer option \"%s\": %s",
+ option->gen->name, value)));
+ if (validate && (option->values.int_val < optint->min ||
+ option->values.int_val > optint->max))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("value %s out of bounds for option \"%s\"",
+ value, option->gen->name),
+ errdetail("Valid values are between \"%d\" and \"%d\".",
+ optint->min, optint->max)));
+ }
+ break;
+ case RELOPT_TYPE_REAL:
+ {
+ relopt_real *optreal = (relopt_real *) option->gen;
+
+ parsed = parse_real(value, &option->values.real_val, 0, NULL);
+ if (validate && !parsed)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for floating point option \"%s\": %s",
+ option->gen->name, value)));
+ if (validate && (option->values.real_val < optreal->min ||
+ option->values.real_val > optreal->max))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("value %s out of bounds for option \"%s\"",
+ value, option->gen->name),
+ errdetail("Valid values are between \"%f\" and \"%f\".",
+ optreal->min, optreal->max)));
+ }
+ break;
+ case RELOPT_TYPE_ENUM:
+ {
+ relopt_enum *optenum = (relopt_enum *) option->gen;
+ relopt_enum_elt_def *elt;
+
+ parsed = false;
+ for (elt = optenum->members; elt->string_val; elt++)
+ {
+ if (pg_strcasecmp(value, elt->string_val) == 0)	/* case-insensitive match */
+ {
+ option->values.enum_val = elt->symbol_val;
+ parsed = true;
+ break;
+ }
+ }
+ if (validate && !parsed)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for enum option \"%s\": %s",
+ option->gen->name, value),
+ optenum->detailmsg ?
+ errdetail_internal("%s", _(optenum->detailmsg)) : 0));
+
+ /*
+ * If value is not among the allowed string values, but we are
+ * not asked to validate, just use the default numeric value.
+ */
+ if (!parsed)
+ option->values.enum_val = optenum->default_val;
+ }
+ break;
+ case RELOPT_TYPE_STRING:
+ {
+ relopt_string *optstring = (relopt_string *) option->gen;
+
+ option->values.string_val = value;
+ nofree = true;	/* the buffer now belongs to option->values */
+ if (validate && optstring->validate_cb)
+ (optstring->validate_cb) (value);
+ parsed = true;
+ }
+ break;
+ default:
+ elog(ERROR, "unsupported reloption type %d", option->gen->type);
+ parsed = true; /* quiet compiler */
+ break;
+ }
+
+ if (parsed)
+ option->isset = true;
+ if (!nofree)
+ pfree(value);
+}
+
+/*
+ * Given the result from parseRelOptions, allocate a zero-filled struct of
+ * the given base size plus whatever extra room the string options need.
+ *
+ * "base" should be sizeof(struct) of the reloptions struct (StdRdOptions or
+ * equivalent).
+ *
+ * For a string option that has a fill callback, the callback is invoked
+ * with a NULL destination and its return value is used as the space
+ * requirement.  Otherwise the string's length plus one byte for the
+ * terminator is reserved.
+ */
+static void *
+allocateReloptStruct(Size base, relopt_value *options, int numoptions)
+{
+    Size        total = base;
+    int         i;
+
+    for (i = 0; i < numoptions; i++)
+    {
+        relopt_value *optval = &options[i];
+        relopt_string *optstr;
+        const char *val;
+
+        /* only string options need room beyond the base struct */
+        if (optval->gen->type != RELOPT_TYPE_STRING)
+            continue;
+
+        optstr = (relopt_string *) optval->gen;
+
+        if (!optstr->fill_cb)
+        {
+            total += GET_STRING_RELOPTION_LEN(*optval) + 1;
+            continue;
+        }
+
+        /* pick the string the callback will eventually be given */
+        if (optval->isset)
+            val = optval->values.string_val;
+        else if (optstr->default_isnull)
+            val = NULL;
+        else
+            val = optstr->default_val;
+
+        total += optstr->fill_cb(val, NULL);
+    }
+
+    return palloc0(total);
+}
+
+/*
+ * Given the result of parseRelOptions and a parsing table, fill in the
+ * struct (previously allocated with allocateReloptStruct) with the parsed
+ * values.
+ *
+ * rdopts is the pointer to the allocated struct to be filled.
+ * basesize is the sizeof(struct) that was passed to allocateReloptStruct.
+ * options, of length numoptions, is parseRelOptions' output.
+ * elems, of length numelems, is the table describing the allowed options.
+ * When validate is true, it is expected that all options appear in elems.
+ *
+ * Each option is matched to an elems entry by name, and its value (or, when
+ * the option was not set, its default) is stored at that entry's offset
+ * within rdopts. String options are handled differently: the string bytes
+ * are appended after the fixed-size part of the struct, starting at
+ * basesize, and the field at the entry's offset receives, as an int, the
+ * byte offset of the string from the start of rdopts, or 0 when there is
+ * nothing to store. If the string option has a fill_cb, that callback is
+ * handed the destination and reports how many bytes it used.
+ *
+ * An option with no elems entry is skipped silently unless validate is true,
+ * in which case an error is raised. On exit SET_VARSIZE records in rdopts
+ * the total number of bytes used (fixed part plus appended strings).
+ */
+static void
+fillRelOptions(void *rdopts, Size basesize,
+ relopt_value *options, int numoptions,
+ bool validate,
+ const relopt_parse_elt *elems, int numelems)
+{
+ int i;
+ int offset = basesize; /* next free byte for appended string data */
+
+ for (i = 0; i < numoptions; i++)
+ {
+ int j;
+ bool found = false;
+
+ for (j = 0; j < numelems; j++)
+ {
+ if (strcmp(options[i].gen->name, elems[j].optname) == 0)
+ {
+ relopt_string *optstring;
+ char *itempos = ((char *) rdopts) + elems[j].offset; /* field to fill */
+ char *string_val;
+
+ switch (options[i].gen->type)
+ {
+ case RELOPT_TYPE_BOOL:
+ *(bool *) itempos = options[i].isset ?
+ options[i].values.bool_val :
+ ((relopt_bool *) options[i].gen)->default_val;
+ break;
+ case RELOPT_TYPE_INT:
+ *(int *) itempos = options[i].isset ?
+ options[i].values.int_val :
+ ((relopt_int *) options[i].gen)->default_val;
+ break;
+ case RELOPT_TYPE_REAL:
+ *(double *) itempos = options[i].isset ?
+ options[i].values.real_val :
+ ((relopt_real *) options[i].gen)->default_val;
+ break;
+ case RELOPT_TYPE_ENUM:
+ *(int *) itempos = options[i].isset ?
+ options[i].values.enum_val :
+ ((relopt_enum *) options[i].gen)->default_val;
+ break;
+ case RELOPT_TYPE_STRING:
+ optstring = (relopt_string *) options[i].gen;
+ if (options[i].isset)
+ string_val = options[i].values.string_val;
+ else if (!optstring->default_isnull)
+ string_val = optstring->default_val;
+ else
+ string_val = NULL;
+
+ if (optstring->fill_cb)
+ {
+ Size size =
+ optstring->fill_cb(string_val,
+ (char *) rdopts + offset);
+
+ if (size)
+ {
+ *(int *) itempos = offset;
+ offset += size;
+ }
+ else
+ *(int *) itempos = 0; /* callback used no space: record "no data" */
+ }
+ else if (string_val == NULL)
+ *(int *) itempos = 0; /* no value and a null default */
+ else
+ {
+ strcpy((char *) rdopts + offset, string_val);
+ *(int *) itempos = offset;
+ offset += strlen(string_val) + 1; /* skip the string and its terminator */
+ }
+ break;
+ default:
+ elog(ERROR, "unsupported reloption type %d",
+ options[i].gen->type);
+ break;
+ }
+ found = true;
+ break; /* the first entry with a matching name wins */
+ }
+ }
+ if (validate && !found)
+ elog(ERROR, "reloption \"%s\" not found in parse table",
+ options[i].gen->name);
+ }
+ SET_VARSIZE(rdopts, offset);
+}
+
+
+/*
+ * Option parser for anything that uses StdRdOptions.
+ *
+ * The static table below maps each accepted option name to its type and to
+ * the offset of the StdRdOptions field that receives it; the autovacuum
+ * settings land in the AutoVacOpts struct embedded at
+ * StdRdOptions.autovacuum. The actual work is delegated to
+ * build_reloptions(), using sizeof(StdRdOptions) as the fixed struct size,
+ * and its result is returned cast to bytea *.
+ *
+ * reloptions, validate and kind are passed through to build_reloptions()
+ * unchanged.
+ */
+bytea *
+default_reloptions(Datum reloptions, bool validate, relopt_kind kind)
+{
+ static const relopt_parse_elt tab[] = {
+ {"fillfactor", RELOPT_TYPE_INT, offsetof(StdRdOptions, fillfactor)},
+ {"autovacuum_enabled", RELOPT_TYPE_BOOL,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, enabled)},
+ {"autovacuum_vacuum_threshold", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_threshold)},
+ {"autovacuum_vacuum_insert_threshold", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_ins_threshold)},
+ {"autovacuum_analyze_threshold", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_threshold)},
+ {"autovacuum_vacuum_cost_limit", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_cost_limit)},
+ {"autovacuum_freeze_min_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_min_age)},
+ {"autovacuum_freeze_max_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_max_age)},
+ {"autovacuum_freeze_table_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_table_age)},
+ {"autovacuum_multixact_freeze_min_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_min_age)},
+ {"autovacuum_multixact_freeze_max_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_max_age)},
+ {"autovacuum_multixact_freeze_table_age", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_table_age)},
+ {"log_autovacuum_min_duration", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, log_min_duration)},
+ {"toast_tuple_target", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, toast_tuple_target)},
+ {"autovacuum_vacuum_cost_delay", RELOPT_TYPE_REAL,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_cost_delay)},
+ {"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_scale_factor)},
+ {"autovacuum_vacuum_insert_scale_factor", RELOPT_TYPE_REAL,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_ins_scale_factor)},
+ {"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL,
+ offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_scale_factor)},
+ {"user_catalog_table", RELOPT_TYPE_BOOL,
+ offsetof(StdRdOptions, user_catalog_table)},
+ {"parallel_workers", RELOPT_TYPE_INT,
+ offsetof(StdRdOptions, parallel_workers)},
+ {"vacuum_index_cleanup", RELOPT_TYPE_ENUM,
+ offsetof(StdRdOptions, vacuum_index_cleanup)},
+ {"vacuum_truncate", RELOPT_TYPE_BOOL,
+ offsetof(StdRdOptions, vacuum_truncate)}
+ };
+
+ return (bytea *) build_reloptions(reloptions, validate, kind,
+ sizeof(StdRdOptions),
+ tab, lengthof(tab));
+}
+
+/*
+ * build_reloptions
+ *
+ * Parse the caller's "reloptions" for the given option "kind" and return
+ * them in a freshly allocated structure of at least "relopt_struct_size"
+ * bytes. The parsing table "relopt_elems" (with "num_relopt_elems" entries)
+ * describes which options are allowed and where each one is stored.
+ *
+ * Pass "validate" as true when the reloptions value was freshly built by
+ * transformRelOptions(); a value read back from the catalog is expected to
+ * be valid already, so false is appropriate there.
+ *
+ * The result is NULL when the passed-in options did not match any of the
+ * options in the parsing table, unless validate is true in which case an
+ * error would be reported.
+ */
+void *
+build_reloptions(Datum reloptions, bool validate,
+				 relopt_kind kind,
+				 Size relopt_struct_size,
+				 const relopt_parse_elt *relopt_elems,
+				 int num_relopt_elems)
+{
+	int			nparsed;
+	relopt_value *parsed;
+	void	   *result = NULL;
+
+	/* collect the options that belong to this relation option kind */
+	parsed = parseRelOptions(reloptions, validate, kind, &nparsed);
+	Assert(nparsed <= num_relopt_elems);
+
+	if (nparsed != 0)
+	{
+		/* size the struct, copy the values in, then drop the scratch array */
+		result = allocateReloptStruct(relopt_struct_size, parsed, nparsed);
+		fillRelOptions(result, relopt_struct_size, parsed, nparsed,
+					   validate, relopt_elems, num_relopt_elems);
+		pfree(parsed);
+	}
+	else
+	{
+		/* nothing was parsed, so there is nothing to build or free */
+		Assert(parsed == NULL);
+	}
+
+	return result;
+}
+
+/*
+ * Parse a set of local options and return them in a newly allocated struct.
+ *
+ * The struct has relopts->relopt_struct_size bytes of fixed fields, followed
+ * by whatever extra space the string options need; each option is stored at
+ * the offset registered for it. Every validator registered in
+ * relopts->validators is then run against the filled struct.
+ */
+void *
+build_local_reloptions(local_relopts *relopts, Datum options, bool validate)
+{
+	int			nelems = list_length(relopts->options);
+	relopt_parse_elt *table = palloc(sizeof(*table) * nelems);
+	relopt_value *parsed;
+	void	   *result;
+	int			pos = 0;
+	ListCell   *optcell;
+	ListCell   *valcell;
+
+	/* derive a parsing table from the registered local options */
+	foreach(optcell, relopts->options)
+	{
+		local_relopt *lopt = lfirst(optcell);
+		relopt_parse_elt *slot = &table[pos];
+
+		slot->optname = lopt->option->name;
+		slot->opttype = lopt->option->type;
+		slot->offset = lopt->offset;
+		pos++;
+	}
+
+	parsed = parseLocalRelOptions(relopts, options, validate);
+	result = allocateReloptStruct(relopts->relopt_struct_size, parsed, nelems);
+	fillRelOptions(result, relopts->relopt_struct_size, parsed, nelems,
+				   validate, table, nelems);
+
+	/* give each registered validator a look at the finished struct */
+	foreach(valcell, relopts->validators)
+	{
+		relopts_validator check = (relopts_validator) lfirst(valcell);
+
+		check(result, parsed, nelems);
+	}
+
+	if (table)
+		pfree(table);
+
+	return result;
+}
+
+/*
+ * Option parser for partitioned tables
+ */
+bytea *
+partitioned_table_reloptions(Datum reloptions, bool validate)
+{
+	void	   *parsed;
+
+	/*
+	 * Partitioned tables have no options of their own so far, hence the
+	 * empty parsing table; going through build_reloptions() still allows
+	 * some validation to happen.
+	 */
+	parsed = build_reloptions(reloptions, validate,
+							  RELOPT_KIND_PARTITIONED,
+							  0, NULL, 0);
+
+	return (bytea *) parsed;
+}
+
+/*
+ * Option parser for views
+ *
+ * Recognizes "security_barrier" and "check_option" and stores them in a
+ * ViewOptions struct.
+ */
+bytea *
+view_reloptions(Datum reloptions, bool validate)
+{
+	static const relopt_parse_elt view_opt_table[] = {
+		{"security_barrier", RELOPT_TYPE_BOOL,
+		offsetof(ViewOptions, security_barrier)},
+		{"check_option", RELOPT_TYPE_ENUM,
+		offsetof(ViewOptions, check_option)}
+	};
+	void	   *parsed;
+
+	parsed = build_reloptions(reloptions, validate,
+							  RELOPT_KIND_VIEW,
+							  sizeof(ViewOptions),
+							  view_opt_table, lengthof(view_opt_table));
+
+	return (bytea *) parsed;
+}
+
+/*
+ * Parse options for plain tables, materialized views and TOAST tables.
+ *
+ * Any other relkind is not supported here and yields NULL.
+ */
+bytea *
+heap_reloptions(char relkind, Datum reloptions, bool validate)
+{
+	StdRdOptions *toastopts;
+
+	/* ordinary heaps and matviews take the standard heap options as-is */
+	if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW)
+		return default_reloptions(reloptions, validate, RELOPT_KIND_HEAP);
+
+	/* other relkinds are not supported */
+	if (relkind != RELKIND_TOASTVALUE)
+		return NULL;
+
+	toastopts = (StdRdOptions *)
+		default_reloptions(reloptions, validate, RELOPT_KIND_TOAST);
+	if (toastopts != NULL)
+	{
+		/* adjust default-only parameters for TOAST relations */
+		toastopts->fillfactor = 100;
+		toastopts->autovacuum.analyze_threshold = -1;
+		toastopts->autovacuum.analyze_scale_factor = -1;
+	}
+
+	return (bytea *) toastopts;
+}
+
+
+/*
+ * Parse options for indexes.
+ *
+ *	amoptions	the index AM's option parser function
+ *	reloptions	options as text[] datum
+ *	validate	error flag
+ *
+ * Returns whatever the AM's parser returns, or NULL when reloptions is not
+ * a valid pointer (the parser is then not called at all).
+ */
+bytea *
+index_reloptions(amoptions_function amoptions, Datum reloptions, bool validate)
+{
+	Assert(amoptions != NULL);
+
+	/* Assume function is strict: only call it when there is a value */
+	if (PointerIsValid(DatumGetPointer(reloptions)))
+		return amoptions(reloptions, validate);
+
+	return NULL;
+}
+
+/*
+ * Option parser for attribute reloptions
+ *
+ * Recognizes "n_distinct" and "n_distinct_inherited" and stores them in an
+ * AttributeOpts struct.
+ */
+bytea *
+attribute_reloptions(Datum reloptions, bool validate)
+{
+	static const relopt_parse_elt attr_opt_table[] = {
+		{"n_distinct", RELOPT_TYPE_REAL,
+		offsetof(AttributeOpts, n_distinct)},
+		{"n_distinct_inherited", RELOPT_TYPE_REAL,
+		offsetof(AttributeOpts, n_distinct_inherited)}
+	};
+	void	   *parsed;
+
+	parsed = build_reloptions(reloptions, validate,
+							  RELOPT_KIND_ATTRIBUTE,
+							  sizeof(AttributeOpts),
+							  attr_opt_table, lengthof(attr_opt_table));
+
+	return (bytea *) parsed;
+}
+
+/*
+ * Option parser for tablespace reloptions
+ *
+ * Recognizes the two page-cost settings and the two I/O concurrency
+ * settings and stores them in a TableSpaceOpts struct.
+ */
+bytea *
+tablespace_reloptions(Datum reloptions, bool validate)
+{
+	static const relopt_parse_elt spc_opt_table[] = {
+		{"random_page_cost", RELOPT_TYPE_REAL,
+		offsetof(TableSpaceOpts, random_page_cost)},
+		{"seq_page_cost", RELOPT_TYPE_REAL,
+		offsetof(TableSpaceOpts, seq_page_cost)},
+		{"effective_io_concurrency", RELOPT_TYPE_INT,
+		offsetof(TableSpaceOpts, effective_io_concurrency)},
+		{"maintenance_io_concurrency", RELOPT_TYPE_INT,
+		offsetof(TableSpaceOpts, maintenance_io_concurrency)}
+	};
+	void	   *parsed;
+
+	parsed = build_reloptions(reloptions, validate,
+							  RELOPT_KIND_TABLESPACE,
+							  sizeof(TableSpaceOpts),
+							  spc_opt_table, lengthof(spc_opt_table));
+
+	return (bytea *) parsed;
+}
+
+/*
+ * Work out which LOCKMODE an option list requires.
+ *
+ * The answer is the strongest lockmode among the known reloptions whose
+ * names appear in defList. An empty list gets AccessExclusiveLock.
+ *
+ * Called from AlterTableGetLockLevel(), see that function
+ * for a longer explanation of how this works.
+ */
+LOCKMODE
+AlterTableGetRelOptionsLockLevel(List *defList)
+{
+	LOCKMODE	strongest = NoLock;
+	ListCell   *lc;
+
+	if (defList == NIL)
+		return AccessExclusiveLock;
+
+	if (need_initialization)
+		initialize_reloptions();
+
+	foreach(lc, defList)
+	{
+		DefElem    *elem = (DefElem *) lfirst(lc);
+		int			k;
+
+		for (k = 0; relOpts[k]; k++)
+		{
+			/* comparing namelen + 1 bytes includes the terminator */
+			if (strncmp(relOpts[k]->name,
+						elem->defname,
+						relOpts[k]->namelen + 1) != 0)
+				continue;
+
+			if (relOpts[k]->lockmode > strongest)
+				strongest = relOpts[k]->lockmode;
+		}
+	}
+
+	return strongest;
+}
diff --git a/src/backend/access/common/scankey.c b/src/backend/access/common/scankey.c
new file mode 100644
index 0000000..bf33c50
--- /dev/null
+++ b/src/backend/access/common/scankey.c
@@ -0,0 +1,117 @@
+/*-------------------------------------------------------------------------
+ *
+ * scankey.c
+ * scan key support code
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/scankey.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/skey.h"
+#include "catalog/pg_collation.h"
+
+
+/*
+ * ScanKeyEntryInitialize
+ *		Fill in every field of a scan key entry from the given values.
+ *		The comparison procedure is identified by OID; an invalid OID is
+ *		acceptable only when SK_SEARCHNULL or SK_SEARCHNOTNULL is set, and
+ *		the entry's function info is then zeroed instead of looked up.
+ *
+ * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey
+ * itself, because that's what will be used for any subsidiary info attached
+ * to the ScanKey's FmgrInfo record.
+ */
+void
+ScanKeyEntryInitialize(ScanKey entry,
+					   int flags,
+					   AttrNumber attributeNumber,
+					   StrategyNumber strategy,
+					   Oid subtype,
+					   Oid collation,
+					   RegProcedure procedure,
+					   Datum argument)
+{
+	/* plain field copies first */
+	entry->sk_argument = argument;
+	entry->sk_collation = collation;
+	entry->sk_subtype = subtype;
+	entry->sk_strategy = strategy;
+	entry->sk_attno = attributeNumber;
+	entry->sk_flags = flags;
+
+	if (!RegProcedureIsValid(procedure))
+	{
+		/* no procedure: only legal for the IS [NOT] NULL search flags */
+		Assert(flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
+		MemSet(&entry->sk_func, 0, sizeof(entry->sk_func));
+	}
+	else
+		fmgr_info(procedure, &entry->sk_func);
+}
+
+/*
+ * ScanKeyInit
+ *		Convenience form of ScanKeyEntryInitialize: flags are set to zero
+ *		and subtype to InvalidOid (the usual values), and the collation is
+ *		defaulted.
+ *
+ * This is the recommended version for hardwired lookups in system catalogs.
+ * It cannot handle NULL arguments, unary operators, or nondefault operators,
+ * but we need none of those features for most hardwired lookups.
+ *
+ * The collation is always C_COLLATION_OID. That is the correct value for
+ * all collation-aware columns in system catalogs, and it will be ignored
+ * for other column types, so it's not worth trying to be more finicky.
+ *
+ * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey
+ * itself, because that's what will be used for any subsidiary info attached
+ * to the ScanKey's FmgrInfo record.
+ */
+void
+ScanKeyInit(ScanKey entry,
+			AttrNumber attributeNumber,
+			StrategyNumber strategy,
+			RegProcedure procedure,
+			Datum argument)
+{
+	/* caller-supplied fields */
+	entry->sk_attno = attributeNumber;
+	entry->sk_strategy = strategy;
+	entry->sk_argument = argument;
+
+	/* fixed defaults */
+	entry->sk_flags = 0;
+	entry->sk_subtype = InvalidOid;
+	entry->sk_collation = C_COLLATION_OID;
+
+	/* the procedure is looked up unconditionally here */
+	fmgr_info(procedure, &entry->sk_func);
+}
+
+/*
+ * ScanKeyEntryInitializeWithInfo
+ *		Fill in a scan key entry when the caller already holds a completed
+ *		FmgrInfo lookup record; that record is copied into the entry.
+ *
+ * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey
+ * itself, because that's what will be used for any subsidiary info attached
+ * to the ScanKey's FmgrInfo record.
+ */
+void
+ScanKeyEntryInitializeWithInfo(ScanKey entry,
+							   int flags,
+							   AttrNumber attributeNumber,
+							   StrategyNumber strategy,
+							   Oid subtype,
+							   Oid collation,
+							   FmgrInfo *finfo,
+							   Datum argument)
+{
+	/* plain field copies */
+	entry->sk_argument = argument;
+	entry->sk_collation = collation;
+	entry->sk_subtype = subtype;
+	entry->sk_strategy = strategy;
+	entry->sk_attno = attributeNumber;
+	entry->sk_flags = flags;
+
+	/* duplicate the caller's lookup record into the current context */
+	fmgr_info_copy(&entry->sk_func, finfo, CurrentMemoryContext);
+}
diff --git a/src/backend/access/common/session.c b/src/backend/access/common/session.c
new file mode 100644
index 0000000..61b3206
--- /dev/null
+++ b/src/backend/access/common/session.c
@@ -0,0 +1,208 @@
+/*-------------------------------------------------------------------------
+ *
+ * session.c
+ * Encapsulation of user session.
+ *
+ * This is intended to contain data that needs to be shared between backends
+ * performing work for a client session. In particular such a session is
+ * shared between the leader and worker processes for parallel queries. At
+ * some later point it might also become useful infrastructure for separating
+ * backends from client connections, e.g. for the purpose of pooling.
+ *
+ * Currently this infrastructure is used to share:
+ * - typemod registry for ephemeral row-types, i.e. BlessTupleDesc etc.
+ *
+ * Portions Copyright (c) 2017-2021, PostgreSQL Global Development Group
+ *
+ * src/backend/access/common/session.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/session.h"
+#include "storage/lwlock.h"
+#include "storage/shm_toc.h"
+#include "utils/memutils.h"
+#include "utils/typcache.h"
+
+/*
+ * Magic number for per-session DSM TOC. The same value is passed when the
+ * TOC is created (GetSessionDsmHandle) and when it is attached
+ * (AttachSession).
+ */
+#define SESSION_MAGIC 0xabb0fbc9
+
+/*
+ * We want to create a DSA area to store shared state that has the same
+ * lifetime as a session. So far, it's only used to hold the shared record
+ * type registry. We don't want it to have to create any DSM segments just
+ * yet in common cases, so we'll give it enough space to hold a very small
+ * SharedRecordTypmodRegistry.
+ *
+ * 0x30000 bytes is 192 kB.
+ */
+#define SESSION_DSA_SIZE 0x30000
+
+/*
+ * Magic numbers for state sharing in the per-session DSM area.
+ *
+ * These are the TOC keys under which the DSA area's space and the record
+ * typmod registry's space are inserted and later looked up.
+ */
+#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001)
+#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002)
+
+/*
+ * This backend's current session. NULL until InitializeSession() has run.
+ */
+Session *CurrentSession = NULL;
+
+/*
+ * Point CurrentSession at a new, all-zeroes Session object allocated in
+ * TopMemoryContext.
+ */
+void
+InitializeSession(void)
+{
+	Session    *fresh;
+
+	fresh = MemoryContextAllocZero(TopMemoryContext, sizeof(Session));
+	CurrentSession = fresh;
+}
+
+/*
+ * Initialize the per-session DSM segment if it isn't already initialized, and
+ * return its handle so that worker processes can attach to it.
+ *
+ * Unlike the per-context DSM segment, this segment and its contents are
+ * reused for future parallel queries.
+ *
+ * Return DSM_HANDLE_INVALID if a segment can't be allocated due to lack of
+ * resources.
+ *
+ * The segment carries a shm_toc with two entries: SESSION_KEY_DSA, the space
+ * for an in-place DSA area of SESSION_DSA_SIZE bytes, and
+ * SESSION_KEY_RECORD_TYPMOD_REGISTRY, the space for the shared record typmod
+ * registry. TopMemoryContext is made current while the segment is set up,
+ * and the caller's context is restored before returning on both the success
+ * path and the DSM_HANDLE_INVALID path.
+ */
+dsm_handle
+GetSessionDsmHandle(void)
+{
+ shm_toc_estimator estimator;
+ shm_toc *toc;
+ dsm_segment *seg;
+ size_t typmod_registry_size;
+ size_t size;
+ void *dsa_space;
+ void *typmod_registry_space;
+ dsa_area *dsa;
+ MemoryContext old_context;
+
+ /*
+ * If we have already created a session-scope DSM segment in this backend,
+ * return its handle. The same segment will be used for the rest of this
+ * backend's lifetime.
+ */
+ if (CurrentSession->segment != NULL)
+ return dsm_segment_handle(CurrentSession->segment);
+
+ /* Otherwise, prepare to set one up. */
+ old_context = MemoryContextSwitchTo(TopMemoryContext);
+ shm_toc_initialize_estimator(&estimator);
+
+ /* Estimate space for the per-session DSA area. */
+ shm_toc_estimate_keys(&estimator, 1);
+ shm_toc_estimate_chunk(&estimator, SESSION_DSA_SIZE);
+
+ /* Estimate space for the per-session record typmod registry. */
+ typmod_registry_size = SharedRecordTypmodRegistryEstimate();
+ shm_toc_estimate_keys(&estimator, 1);
+ shm_toc_estimate_chunk(&estimator, typmod_registry_size);
+
+ /* Set up segment and TOC. */
+ size = shm_toc_estimate(&estimator);
+ seg = dsm_create(size, DSM_CREATE_NULL_IF_MAXSEGMENTS); /* a NULL result is handled just below */
+ if (seg == NULL)
+ {
+ MemoryContextSwitchTo(old_context);
+
+ return DSM_HANDLE_INVALID;
+ }
+ toc = shm_toc_create(SESSION_MAGIC,
+ dsm_segment_address(seg),
+ size);
+
+ /* Create per-session DSA area. */
+ dsa_space = shm_toc_allocate(toc, SESSION_DSA_SIZE);
+ dsa = dsa_create_in_place(dsa_space,
+ SESSION_DSA_SIZE,
+ LWTRANCHE_PER_SESSION_DSA,
+ seg);
+ shm_toc_insert(toc, SESSION_KEY_DSA, dsa_space);
+
+
+ /* Create session-scoped shared record typmod registry. */
+ typmod_registry_space = shm_toc_allocate(toc, typmod_registry_size);
+ SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *)
+ typmod_registry_space, seg, dsa);
+ shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY,
+ typmod_registry_space);
+
+ /*
+ * If we got this far, we can pin the shared memory so it stays mapped for
+ * the rest of this backend's life. If we don't make it this far, cleanup
+ * callbacks for anything we installed above (ie currently
+ * SharedRecordTypmodRegistry) will run when the DSM segment is detached
+ * by CurrentResourceOwner so we aren't left with a broken CurrentSession.
+ */
+ dsm_pin_mapping(seg);
+ dsa_pin_mapping(dsa);
+
+ /*
+ * Make segment and area available via CurrentSession. This is done only
+ * after everything above succeeded, so the early-return test at the top
+ * of this function never sees a half-built segment.
+ */
+ CurrentSession->segment = seg;
+ CurrentSession->area = dsa;
+
+ MemoryContextSwitchTo(old_context);
+
+ return dsm_segment_handle(seg);
+}
+
+/*
+ * Attach to a per-session DSM segment provided by a parallel leader.
+ *
+ * handle identifies the segment to attach to. An error is raised if the
+ * segment cannot be attached. On success, CurrentSession->segment and
+ * CurrentSession->area point at the attached segment and its DSA area, the
+ * shared record typmod registry stored in the segment has been attached,
+ * and both mappings are pinned. TopMemoryContext is current while this
+ * runs; the caller's context is restored on normal return.
+ */
+void
+AttachSession(dsm_handle handle)
+{
+ dsm_segment *seg;
+ shm_toc *toc;
+ void *dsa_space;
+ void *typmod_registry_space;
+ dsa_area *dsa;
+ MemoryContext old_context;
+
+ old_context = MemoryContextSwitchTo(TopMemoryContext);
+
+ /* Attach to the DSM segment. */
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ elog(ERROR, "could not attach to per-session DSM segment");
+ toc = shm_toc_attach(SESSION_MAGIC, dsm_segment_address(seg));
+
+ /* Attach to the DSA area. */
+ dsa_space = shm_toc_lookup(toc, SESSION_KEY_DSA, false);
+ dsa = dsa_attach_in_place(dsa_space, seg);
+
+ /*
+ * Make them available via the current session. NOTE(review): this is
+ * done before the registry attach below, presumably because that call
+ * reads CurrentSession -- confirm before reordering.
+ */
+ CurrentSession->segment = seg;
+ CurrentSession->area = dsa;
+
+ /* Attach to the shared record typmod registry. */
+ typmod_registry_space =
+ shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false);
+ SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *)
+ typmod_registry_space);
+
+ /* Remain attached until end of backend or DetachSession(). */
+ dsm_pin_mapping(seg);
+ dsa_pin_mapping(dsa);
+
+ MemoryContextSwitchTo(old_context);
+}
+
+/*
+ * Detach from the current session DSM segment. It's not strictly necessary
+ * to do this explicitly since we'll detach automatically at backend exit, but
+ * if we ever reuse parallel workers it will become important for workers to
+ * detach from one session before attaching to another. Note that this runs
+ * detach hooks.
+ *
+ * The segment is detached first, then the DSA area; each CurrentSession
+ * pointer is reset to NULL right after its detach call.
+ *
+ * NOTE(review): there is no check that a segment is currently attached;
+ * presumably callers only use this after a successful AttachSession() or
+ * GetSessionDsmHandle() -- confirm against callers.
+ */
+void
+DetachSession(void)
+{
+ /* Runs detach hooks. */
+ dsm_detach(CurrentSession->segment);
+ CurrentSession->segment = NULL;
+ dsa_detach(CurrentSession->area);
+ CurrentSession->area = NULL;
+}
diff --git a/src/backend/access/common/syncscan.c b/src/backend/access/common/syncscan.c
new file mode 100644
index 0000000..b7a28af
--- /dev/null
+++ b/src/backend/access/common/syncscan.c
@@ -0,0 +1,322 @@
+/*-------------------------------------------------------------------------
+ *
+ * syncscan.c
+ * scan synchronization support
+ *
+ * When multiple backends run a sequential scan on the same table, we try
+ * to keep them synchronized to reduce the overall I/O needed. The goal is
+ * to read each page into shared buffer cache only once, and let all backends
+ * that take part in the shared scan process the page before it falls out of
+ * the cache.
+ *
+ * Since the "leader" in a pack of backends doing a seqscan will have to wait
+ * for I/O, while the "followers" don't, there is a strong self-synchronizing
+ * effect once we can get the backends examining approximately the same part
+ * of the table at the same time. Hence all that is really needed is to get
+ * a new backend beginning a seqscan to begin it close to where other backends
+ * are reading. We can scan the table circularly, from block X up to the
+ * end and then from block 0 to X-1, to ensure we visit all rows while still
+ * participating in the common scan.
+ *
+ * To accomplish that, we keep track of the scan position of each table, and
+ * start new scans close to where the previous scan(s) are. We don't try to
+ * do any extra synchronization to keep the scans together afterwards; some
+ * scans might progress much more slowly than others, for example if the
+ * results need to be transferred to the client over a slow network, and we
+ * don't want such queries to slow down others.
+ *
+ * There can realistically only be a few large sequential scans on different
+ * tables in progress at any time. Therefore we just keep the scan positions
+ * in a small LRU list which we scan every time we need to look up or update a
+ * scan position. The whole mechanism is only applied for tables exceeding
+ * a threshold size (but that is not the concern of this module).
+ *
+ * INTERFACE ROUTINES
+ * ss_get_location - return current scan location of a relation
+ * ss_report_location - update current scan location
+ *
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/syncscan.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/syncscan.h"
+#include "miscadmin.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "utils/rel.h"
+
+
+/* GUC variables */
+#ifdef TRACE_SYNCSCAN
+bool trace_syncscan = false; /* when true, scan start/progress/missed-update messages are logged */
+#endif
+
+
+/*
+ * Size of the LRU list.
+ *
+ * Note: the code assumes that SYNC_SCAN_NELEM > 1.
+ *
+ * XXX: What's a good value? It should be large enough to hold the
+ * maximum number of large tables scanned simultaneously. But a larger value
+ * means more traversing of the LRU list when starting a new scan.
+ *
+ * This count also determines the size of the shared memory area (see
+ * SyncScanShmemSize).
+ */
+#define SYNC_SCAN_NELEM 20
+
+/*
+ * Interval between reports of the location of the current scan, in pages.
+ *
+ * Note: This should be smaller than the ring size (see buffer/freelist.c)
+ * we use for bulk reads. Otherwise a scan joining other scans might start
+ * from a page that's no longer in the buffer cache. This is a bit fuzzy;
+ * there's no guarantee that the new scan will read the page before it leaves
+ * the buffer cache anyway, and on the other hand the page is most likely
+ * still in the OS cache.
+ *
+ * The value is the number of BLCKSZ-sized pages that make up 128 kB.
+ */
+#define SYNC_SCAN_REPORT_INTERVAL (128 * 1024 / BLCKSZ)
+
+
+/*
+ * The scan locations structure is essentially a doubly-linked LRU with head
+ * and tail pointer, but designed to hold a fixed maximum number of elements in
+ * fixed-size shared memory.
+ *
+ * ss_scan_location_t is the payload (which relation, and where its scan
+ * was last reported to be), ss_lru_item_t wraps one payload with the list
+ * links, and ss_scan_locations_t holds the list ends plus the array that
+ * all the items live in. SizeOfScanLocations(N) gives the number of bytes
+ * needed for a ss_scan_locations_t with N items.
+ */
+typedef struct ss_scan_location_t
+{
+ RelFileNode relfilenode; /* identity of a relation */
+ BlockNumber location; /* last-reported location in the relation */
+} ss_scan_location_t;
+
+typedef struct ss_lru_item_t
+{
+ struct ss_lru_item_t *prev; /* neighbor toward the head; NULL for the head item */
+ struct ss_lru_item_t *next; /* neighbor toward the tail; NULL for the tail item */
+ ss_scan_location_t location; /* the tracked relation and block */
+} ss_lru_item_t;
+
+typedef struct ss_scan_locations_t
+{
+ ss_lru_item_t *head; /* most recently used item */
+ ss_lru_item_t *tail; /* least recently used item */
+ ss_lru_item_t items[FLEXIBLE_ARRAY_MEMBER]; /* SYNC_SCAN_NELEM items */
+} ss_scan_locations_t;
+
+#define SizeOfScanLocations(N) \
+ (offsetof(ss_scan_locations_t, items) + (N) * sizeof(ss_lru_item_t))
+
+/* Pointer to struct in shared memory; assigned by SyncScanShmemInit() */
+static ss_scan_locations_t *scan_locations;
+
+/* prototypes for internal functions */
+static BlockNumber ss_search(RelFileNode relfilenode,
+ BlockNumber location, bool set);
+
+
+/*
+ * SyncScanShmemSize --- how many bytes of shared memory this module needs
+ *
+ * That is the size of a scan-locations struct with SYNC_SCAN_NELEM items.
+ */
+Size
+SyncScanShmemSize(void)
+{
+	Size		needed = SizeOfScanLocations(SYNC_SCAN_NELEM);
+
+	return needed;
+}
+
+/*
+ * SyncScanShmemInit --- set up this module's shared memory
+ *
+ * The shared struct is always looked up; its contents are initialized only
+ * when not running under the postmaster.
+ */
+void
+SyncScanShmemInit(void)
+{
+	bool		found;
+	ss_lru_item_t *slots;
+	int			idx;
+
+	scan_locations = (ss_scan_locations_t *)
+		ShmemInitStruct("Sync Scan Locations List",
+						SizeOfScanLocations(SYNC_SCAN_NELEM),
+						&found);
+
+	if (IsUnderPostmaster)
+	{
+		/* nothing to initialize here; the struct is expected to exist */
+		Assert(found);
+		return;
+	}
+
+	/* Initialize shared memory area */
+	Assert(!found);
+
+	slots = scan_locations->items;
+	scan_locations->head = &slots[0];
+	scan_locations->tail = &slots[SYNC_SCAN_NELEM - 1];
+
+	for (idx = 0; idx < SYNC_SCAN_NELEM; idx++)
+	{
+		ss_lru_item_t *slot = &slots[idx];
+
+		/* chain the slots together in array order */
+		slot->prev = (idx == 0) ? NULL : &slots[idx - 1];
+		slot->next = (idx == SYNC_SCAN_NELEM - 1) ? NULL : &slots[idx + 1];
+
+		/*
+		 * Every slot starts out holding invalid values. As scans are
+		 * started, these placeholders fall off the LRU list and are
+		 * replaced with real entries.
+		 */
+		slot->location.location = InvalidBlockNumber;
+		slot->location.relfilenode.relNode = InvalidOid;
+		slot->location.relfilenode.dbNode = InvalidOid;
+		slot->location.relfilenode.spcNode = InvalidOid;
+	}
+}
+
+/*
+ * ss_search --- search the scan_locations structure for an entry with the
+ * given relfilenode.
+ *
+ * If "set" is true, the location is updated to the given location. If no
+ * entry for the given relfilenode is found, it will be created at the head
+ * of the list with the given location, even if "set" is false.
+ *
+ * In any case, the location after possible update is returned.
+ *
+ * Caller is responsible for having acquired suitable lock on the shared
+ * data structure.
+ *
+ * The list is walked from the head. Because the walk stops at the last
+ * entry when nothing matches, a miss always recycles the entry at the end
+ * of the list. Whichever entry is used (matched or recycled) is moved to
+ * the head before returning.
+ */
+static BlockNumber
+ss_search(RelFileNode relfilenode, BlockNumber location, bool set)
+{
+ ss_lru_item_t *item;
+
+ item = scan_locations->head;
+ for (;;)
+ {
+ bool match;
+
+ match = RelFileNodeEquals(item->location.relfilenode, relfilenode);
+
+ if (match || item->next == NULL)
+ {
+ /*
+ * If we reached the end of list and no match was found, take over
+ * the last entry
+ */
+ if (!match)
+ {
+ item->location.relfilenode = relfilenode;
+ item->location.location = location;
+ }
+ else if (set)
+ item->location.location = location;
+
+ /* Move the entry to the front of the LRU list */
+ if (item != scan_locations->head)
+ {
+ /* unlink */
+ if (item == scan_locations->tail)
+ scan_locations->tail = item->prev;
+ item->prev->next = item->next; /* item is not the head, so it has a predecessor */
+ if (item->next)
+ item->next->prev = item->prev;
+
+ /* link */
+ item->prev = NULL;
+ item->next = scan_locations->head;
+ scan_locations->head->prev = item;
+ scan_locations->head = item;
+ }
+
+ return item->location.location;
+ }
+
+ item = item->next;
+ }
+
+ /* not reached */
+}
+
+/*
+ * ss_get_location --- pick the best block at which to start a scan
+ *
+ * Returns the last-reported location of a sequential scan on the
+ * relation, or 0 if no valid location is found.
+ *
+ * The caller is expected to have just called RelationGetNumberOfBlocks(),
+ * so the block count is passed in rather than computed again. The result
+ * is guaranteed less than relnblocks (assuming that's > 0).
+ */
+BlockNumber
+ss_get_location(Relation rel, BlockNumber relnblocks)
+{
+	BlockNumber reported;
+	BlockNumber result;
+
+	LWLockAcquire(SyncScanLock, LW_EXCLUSIVE);
+	reported = ss_search(rel->rd_node, 0, false);
+	LWLockRelease(SyncScanLock);
+
+	/*
+	 * Fall back to block 0 when the saved location is not a valid block
+	 * number for this scan.
+	 *
+	 * This can happen if for instance a VACUUM truncated the table since the
+	 * location was saved.
+	 */
+	result = (reported < relnblocks) ? reported : 0;
+
+#ifdef TRACE_SYNCSCAN
+	if (trace_syncscan)
+		elog(LOG,
+			 "SYNC_SCAN: start \"%s\" (size %u) at %u",
+			 RelationGetRelationName(rel), relnblocks, result);
+#endif
+
+	return result;
+}
+
+/*
+ * ss_report_location --- record how far the current scan has got
+ *
+ * Writes an entry into the shared Sync Scan state of the form
+ * (relfilenode, blocknumber), overwriting any existing entry for the
+ * same relfilenode.
+ */
+void
+ss_report_location(Relation rel, BlockNumber location)
+{
+#ifdef TRACE_SYNCSCAN
+	if (trace_syncscan && (location % 1024) == 0)
+		elog(LOG,
+			 "SYNC_SCAN: scanning \"%s\" at %u",
+			 RelationGetRelationName(rel), location);
+#endif
+
+	/*
+	 * To reduce lock contention, scan progress is only reported every N
+	 * pages; all other calls return here without touching the lock.
+	 */
+	if ((location % SYNC_SCAN_REPORT_INTERVAL) != 0)
+		return;
+
+	/*
+	 * For the same reason, don't block if the lock isn't immediately
+	 * available. Missing a few updates isn't critical, it just means that a
+	 * new scan that wants to join the pack will start a little bit behind
+	 * the head of the scan. Hopefully the pages are still in OS cache and
+	 * the scan catches up quickly.
+	 */
+	if (LWLockConditionalAcquire(SyncScanLock, LW_EXCLUSIVE))
+	{
+		(void) ss_search(rel->rd_node, location, true);
+		LWLockRelease(SyncScanLock);
+		return;
+	}
+
+#ifdef TRACE_SYNCSCAN
+	if (trace_syncscan)
+		elog(LOG,
+			 "SYNC_SCAN: missed update for \"%s\" at %u",
+			 RelationGetRelationName(rel), location);
+#endif
+}
diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c
new file mode 100644
index 0000000..8456183
--- /dev/null
+++ b/src/backend/access/common/toast_compression.c
@@ -0,0 +1,318 @@
+/*-------------------------------------------------------------------------
+ *
+ * toast_compression.c
+ * Functions for toast compression.
+ *
+ * Copyright (c) 2021, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/toast_compression.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#include "access/detoast.h"
+#include "access/toast_compression.h"
+#include "common/pg_lzcompress.h"
+#include "fmgr.h"
+#include "utils/builtins.h"
+
+/* GUC: compression method used when a caller does not supply a valid one */
+int default_toast_compression = TOAST_PGLZ_COMPRESSION;
+
+#define NO_LZ4_SUPPORT() /* raise ERROR: lz4 was requested but this build lacks it */ \
+ ereport(ERROR, \
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+ errmsg("compression method lz4 not supported"), \
+ errdetail("This functionality requires the server to be built with lz4 support."), \
+ errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")))
+
+/*
+ * Compress a varlena using PGLZ.
+ *
+ * value: the varlena to compress.  It is read through the VAR*_ANY macros,
+ * so a short-header input is acceptable.
+ *
+ * Returns a freshly palloc'd compressed varlena, or NULL if the payload size
+ * lies outside PGLZ_strategy_default's [min_input_size, max_input_size]
+ * range or pglz_compress() reports failure.  Only the varlena length word is
+ * set here (SET_VARSIZE_COMPRESSED); nothing else in the compression header
+ * is filled in by this function.
+ */
+struct varlena *
+pglz_compress_datum(const struct varlena *value)
+{
+	int32		valsize,
+				len;
+	struct varlena *tmp = NULL;
+
+	/*
+	 * "value" is already a pointer, so it is handed to the macro as-is; no
+	 * Datum-to-pointer conversion is appropriate here.
+	 */
+	valsize = VARSIZE_ANY_EXHDR(value);
+
+	/*
+	 * No point in wasting a palloc cycle if value size is outside the allowed
+	 * range for compression.
+	 */
+	if (valsize < PGLZ_strategy_default->min_input_size ||
+		valsize > PGLZ_strategy_default->max_input_size)
+		return NULL;
+
+	/*
+	 * Figure out the maximum possible size of the pglz output, add the bytes
+	 * that will be needed for varlena overhead, and allocate that amount.
+	 */
+	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
+									VARHDRSZ_COMPRESSED);
+
+	len = pglz_compress(VARDATA_ANY(value),
+						valsize,
+						(char *) tmp + VARHDRSZ_COMPRESSED,
+						NULL);
+	if (len < 0)
+	{
+		/* pglz gave up; release the scratch buffer and report failure */
+		pfree(tmp);
+		return NULL;
+	}
+
+	SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESSED);
+
+	return tmp;
+}
+
+/*
+ * Expand a PGLZ-compressed varlena back into a plain varlena.
+ *
+ * The result is palloc'd with room for the raw size recorded in the
+ * compressed datum's header plus VARHDRSZ.  pglz_decompress() is asked to
+ * verify that the stream is complete; a negative return from it is reported
+ * as ERRCODE_DATA_CORRUPTED.
+ */
+struct varlena *
+pglz_decompress_datum(const struct varlena *value)
+{
+	uint32		rawlen = VARDATA_COMPRESSED_GET_EXTSIZE(value);
+	struct varlena *dest;
+	int32		nbytes;
+
+	/* room for the expanded payload plus a regular varlena header */
+	dest = (struct varlena *) palloc(rawlen + VARHDRSZ);
+
+	/* expand, checking that input and output are both fully consumed */
+	nbytes = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED,
+							 VARSIZE(value) - VARHDRSZ_COMPRESSED,
+							 VARDATA(dest),
+							 rawlen, true);
+	if (nbytes < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed pglz data is corrupt")));
+
+	SET_VARSIZE(dest, nbytes + VARHDRSZ);
+
+	return dest;
+}
+
+/*
+ * Expand only a leading slice of a PGLZ-compressed varlena.
+ *
+ * slicelength: number of raw bytes to produce; the result buffer is sized
+ * for exactly that many bytes plus VARHDRSZ.
+ *
+ * The completeness check of pglz_decompress() is switched off, since the
+ * stream is deliberately not read to its end.  The result's length word
+ * reflects the byte count pglz_decompress() actually returned.  A negative
+ * return is reported as ERRCODE_DATA_CORRUPTED.
+ */
+struct varlena *
+pglz_decompress_datum_slice(const struct varlena *value,
+							int32 slicelength)
+{
+	struct varlena *dest;
+	int32		nbytes;
+
+	/* room for the requested slice plus a regular varlena header */
+	dest = (struct varlena *) palloc(slicelength + VARHDRSZ);
+
+	/* expand just the prefix we were asked for */
+	nbytes = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED,
+							 VARSIZE(value) - VARHDRSZ_COMPRESSED,
+							 VARDATA(dest),
+							 slicelength, false);
+	if (nbytes < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed pglz data is corrupt")));
+
+	SET_VARSIZE(dest, nbytes + VARHDRSZ);
+
+	return dest;
+}
+
+/*
+ * Compress a varlena using LZ4.
+ *
+ * Returns a freshly palloc'd compressed varlena, or NULL when the LZ4 output
+ * turns out larger than the input payload.  A non-positive result from
+ * LZ4_compress_default() is raised as an ERROR.
+ *
+ * In a build without USE_LZ4 this always raises the "lz4 not supported"
+ * error instead.
+ */
+struct varlena *
+lz4_compress_datum(const struct varlena *value)
+{
+#ifdef USE_LZ4
+	int32		srclen = VARSIZE_ANY_EXHDR(value);
+	int32		bound = LZ4_compressBound(srclen);
+	struct varlena *dest;
+	int32		written;
+
+	/*
+	 * Allocate for the worst case LZ4 can produce, plus the bytes needed for
+	 * the compressed-varlena header.
+	 */
+	dest = (struct varlena *) palloc(bound + VARHDRSZ_COMPRESSED);
+
+	written = LZ4_compress_default(VARDATA_ANY(value),
+								   (char *) dest + VARHDRSZ_COMPRESSED,
+								   srclen, bound);
+	if (written <= 0)
+		elog(ERROR, "lz4 compression failed");
+
+	/* output grew: treat the data as incompressible */
+	if (written > srclen)
+	{
+		pfree(dest);
+		return NULL;
+	}
+
+	SET_VARSIZE_COMPRESSED(dest, written + VARHDRSZ_COMPRESSED);
+
+	return dest;
+#else
+	NO_LZ4_SUPPORT();
+	return NULL;				/* keep compiler quiet */
+#endif
+}
+
+/*
+ * Expand an LZ4-compressed varlena back into a plain varlena.
+ *
+ * The result is palloc'd with room for the raw size recorded in the
+ * compressed datum's header plus VARHDRSZ.  A negative return from
+ * LZ4_decompress_safe() is reported as ERRCODE_DATA_CORRUPTED.
+ *
+ * In a build without USE_LZ4 this always raises the "lz4 not supported"
+ * error instead.
+ */
+struct varlena *
+lz4_decompress_datum(const struct varlena *value)
+{
+#ifdef USE_LZ4
+	uint32		rawlen = VARDATA_COMPRESSED_GET_EXTSIZE(value);
+	struct varlena *dest;
+	int32		nbytes;
+
+	/* room for the expanded payload plus a regular varlena header */
+	dest = (struct varlena *) palloc(rawlen + VARHDRSZ);
+
+	/* expand the whole stream */
+	nbytes = LZ4_decompress_safe((char *) value + VARHDRSZ_COMPRESSED,
+								 VARDATA(dest),
+								 VARSIZE(value) - VARHDRSZ_COMPRESSED,
+								 rawlen);
+	if (nbytes < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed lz4 data is corrupt")));
+
+	SET_VARSIZE(dest, nbytes + VARHDRSZ);
+
+	return dest;
+#else
+	NO_LZ4_SUPPORT();
+	return NULL;				/* keep compiler quiet */
+#endif
+}
+
+/*
+ * Expand only a leading slice of an LZ4-compressed varlena.
+ *
+ * slicelength: number of raw bytes wanted; it is used both as the target
+ * output size and as the destination capacity.
+ *
+ * When the liblz4 in use reports a version older than 1.8.3, partial
+ * decompression is not attempted and the whole datum is expanded via
+ * lz4_decompress_datum() instead.  A negative return from
+ * LZ4_decompress_safe_partial() is reported as ERRCODE_DATA_CORRUPTED.
+ *
+ * In a build without USE_LZ4 this always raises the "lz4 not supported"
+ * error instead.
+ */
+struct varlena *
+lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength)
+{
+#ifdef USE_LZ4
+	struct varlena *dest;
+	int32		nbytes;
+
+	/* slice decompression not supported prior to 1.8.3 */
+	if (LZ4_versionNumber() < 10803)
+		return lz4_decompress_datum(value);
+
+	/* room for the requested slice plus a regular varlena header */
+	dest = (struct varlena *) palloc(slicelength + VARHDRSZ);
+
+	/* expand just the prefix we were asked for */
+	nbytes = LZ4_decompress_safe_partial((char *) value + VARHDRSZ_COMPRESSED,
+										 VARDATA(dest),
+										 VARSIZE(value) - VARHDRSZ_COMPRESSED,
+										 slicelength,
+										 slicelength);
+	if (nbytes < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed lz4 data is corrupt")));
+
+	SET_VARSIZE(dest, nbytes + VARHDRSZ);
+
+	return dest;
+#else
+	NO_LZ4_SUPPORT();
+	return NULL;				/* keep compiler quiet */
+#endif
+}
+
+/*
+ * Report which compression method a varlena was compressed with.
+ *
+ * For a datum stored externally on disk the method is read from the toast
+ * pointer; for an inline-compressed datum it is read from the toast
+ * compression header.
+ *
+ * Returns TOAST_INVALID_COMPRESSION_ID if the varlena is not compressed.
+ */
+ToastCompressionId
+toast_get_compression_id(struct varlena *attr)
+{
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		struct varatt_external extptr;
+
+		VARATT_EXTERNAL_GET_POINTER(extptr, attr);
+
+		/* external but stored uncompressed */
+		if (!VARATT_EXTERNAL_IS_COMPRESSED(extptr))
+			return TOAST_INVALID_COMPRESSION_ID;
+
+		return VARATT_EXTERNAL_GET_COMPRESS_METHOD(extptr);
+	}
+
+	if (VARATT_IS_COMPRESSED(attr))
+		return VARDATA_COMPRESSED_GET_COMPRESS_METHOD(attr);
+
+	return TOAST_INVALID_COMPRESSION_ID;
+}
+
+/*
+ * CompressionNameToMethod - map a compression name to its method code
+ *
+ * Only the built-in names "pglz" and "lz4" are recognized.  If the name is
+ * not one of them, InvalidCompressionMethod is returned.  Asking for "lz4"
+ * in a build without USE_LZ4 raises the "lz4 not supported" error.
+ */
+char
+CompressionNameToMethod(const char *compression)
+{
+	if (strcmp(compression, "pglz") == 0)
+		return TOAST_PGLZ_COMPRESSION;
+
+	if (strcmp(compression, "lz4") != 0)
+		return InvalidCompressionMethod;
+
+#ifndef USE_LZ4
+	NO_LZ4_SUPPORT();
+#endif
+	return TOAST_LZ4_COMPRESSION;
+}
+
+/*
+ * GetCompressionMethodName - map a compression method code to its name
+ *
+ * Returns "pglz" or "lz4"; any other code raises an ERROR.
+ */
+const char *
+GetCompressionMethodName(char method)
+{
+	if (method == TOAST_PGLZ_COMPRESSION)
+		return "pglz";
+
+	if (method == TOAST_LZ4_COMPRESSION)
+		return "lz4";
+
+	elog(ERROR, "invalid compression method %c", method);
+	return NULL;				/* keep compiler quiet */
+}
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
new file mode 100644
index 0000000..2d2fd60
--- /dev/null
+++ b/src/backend/access/common/toast_internals.c
@@ -0,0 +1,664 @@
+/*-------------------------------------------------------------------------
+ *
+ * toast_internals.c
+ * Functions for internal use by the TOAST system.
+ *
+ * Copyright (c) 2000-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/toast_internals.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/detoast.h"
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/heaptoast.h"
+#include "access/table.h"
+#include "access/toast_internals.h"
+#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "common/pg_lzcompress.h"
+#include "miscadmin.h"
+#include "utils/fmgroids.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+
+static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
+static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
+
+/* ----------
+ * toast_compress_datum -
+ *
+ *	Create a compressed version of a varlena datum
+ *
+ *	value: the datum to compress
+ *	cmethod: requested compression method; if it is not a valid method,
+ *		default_toast_compression is used instead
+ *
+ *	If we fail (ie, compressed result is actually bigger than original)
+ *	then return NULL.  We must not use compressed data if it'd expand
+ *	the tuple!
+ *
+ *	We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
+ *	copying them.  But we can't handle external or compressed datums.
+ * ----------
+ */
+Datum
+toast_compress_datum(Datum value, char cmethod)
+{
+	/* convert the Datum once and use the pointer form everywhere below */
+	struct varlena *src = (struct varlena *) DatumGetPointer(value);
+	struct varlena *tmp = NULL;
+	int32		valsize;
+	ToastCompressionId cmid = TOAST_INVALID_COMPRESSION_ID;
+
+	Assert(!VARATT_IS_EXTERNAL(src));
+	Assert(!VARATT_IS_COMPRESSED(src));
+
+	valsize = VARSIZE_ANY_EXHDR(src);
+
+	/* If the compression method is not valid, use the current default */
+	if (!CompressionMethodIsValid(cmethod))
+		cmethod = default_toast_compression;
+
+	/*
+	 * Call appropriate compression routine for the compression method.
+	 */
+	switch (cmethod)
+	{
+		case TOAST_PGLZ_COMPRESSION:
+			tmp = pglz_compress_datum(src);
+			cmid = TOAST_PGLZ_COMPRESSION_ID;
+			break;
+		case TOAST_LZ4_COMPRESSION:
+			tmp = lz4_compress_datum(src);
+			cmid = TOAST_LZ4_COMPRESSION_ID;
+			break;
+		default:
+			elog(ERROR, "invalid compression method %c", cmethod);
+	}
+
+	/* the compression routine declined or failed */
+	if (tmp == NULL)
+		return PointerGetDatum(NULL);
+
+	/*
+	 * We recheck the actual size even if compression reports success, because
+	 * it might be satisfied with having saved as little as one byte in the
+	 * compressed data --- which could turn into a net loss once you consider
+	 * header and alignment padding.  Worst case, the compressed format might
+	 * require three padding bytes (plus header, which is included in
+	 * VARSIZE(tmp)), whereas the uncompressed format would take only one
+	 * header byte and no padding if the value is short enough.  So we insist
+	 * on a savings of more than 2 bytes to ensure we have a gain.
+	 */
+	if (VARSIZE(tmp) < valsize - 2)
+	{
+		/* successful compression */
+		Assert(cmid != TOAST_INVALID_COMPRESSION_ID);
+		TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(tmp, valsize, cmid);
+		return PointerGetDatum(tmp);
+	}
+	else
+	{
+		/* incompressible data */
+		pfree(tmp);
+		return PointerGetDatum(NULL);
+	}
+}
+
+/* ----------
+ * toast_save_datum -
+ *
+ *	Save one single datum into the secondary relation and return
+ *	a Datum reference for it.
+ *
+ * rel: the main relation we're working with (not the toast rel!)
+ * value: datum to be pushed to toast storage
+ * oldexternal: if not NULL, toast pointer previously representing the datum
+ * options: options to be passed to heap_insert() for toast rows
+ *
+ * Returns a palloc'd TOAST pointer (tagged VARTAG_ONDISK) as a Datum.  The
+ * toast relation and its indexes are closed before returning, but their
+ * locks are kept.
+ * ----------
+ */
+Datum
+toast_save_datum(Relation rel, Datum value,
+				 struct varlena *oldexternal, int options)
+{
+	Relation	toastrel;
+	Relation   *toastidxs;
+	HeapTuple	toasttup;
+	TupleDesc	toasttupDesc;
+	Datum		t_values[3];
+	bool		t_isnull[3];
+	CommandId	mycid = GetCurrentCommandId(true);
+	struct varlena *result;
+	struct varatt_external toast_pointer;
+	union
+	{
+		struct varlena hdr;
+		/* this is to make the union big enough for a chunk: */
+		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
+		/* ensure union is aligned well enough: */
+		int32		align_it;
+	}			chunk_data;
+	int32		chunk_size;
+	int32		chunk_seq = 0;
+	char	   *data_p;
+	int32		data_todo;
+	Pointer		dval = DatumGetPointer(value);
+	int			num_indexes;
+	int			validIndex;
+
+	/* Inspect the pointer form of the datum, not the raw Datum word */
+	Assert(!VARATT_IS_EXTERNAL(dval));
+
+	/*
+	 * Open the toast relation and its indexes.  We can use the index to check
+	 * uniqueness of the OID we assign to the toasted item, even though it has
+	 * additional columns besides OID.
+	 */
+	toastrel = table_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
+	toasttupDesc = toastrel->rd_att;
+
+	/* Open all the toast indexes and look for the valid one */
+	validIndex = toast_open_indexes(toastrel,
+									RowExclusiveLock,
+									&toastidxs,
+									&num_indexes);
+
+	/*
+	 * Get the data pointer and length, and compute va_rawsize and va_extinfo.
+	 *
+	 * va_rawsize is the size of the equivalent fully uncompressed datum, so
+	 * we have to adjust for short headers.
+	 *
+	 * va_extinfo stores the actual size of the data payload in the toast
+	 * records and the compression method in first 2 bits if data is
+	 * compressed.
+	 */
+	if (VARATT_IS_SHORT(dval))
+	{
+		data_p = VARDATA_SHORT(dval);
+		data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
+		toast_pointer.va_rawsize = data_todo + VARHDRSZ;	/* as if not short */
+		toast_pointer.va_extinfo = data_todo;
+	}
+	else if (VARATT_IS_COMPRESSED(dval))
+	{
+		data_p = VARDATA(dval);
+		data_todo = VARSIZE(dval) - VARHDRSZ;
+		/* rawsize in a compressed datum is just the size of the payload */
+		toast_pointer.va_rawsize = VARDATA_COMPRESSED_GET_EXTSIZE(dval) + VARHDRSZ;
+
+		/* set external size and compression method */
+		VARATT_EXTERNAL_SET_SIZE_AND_COMPRESS_METHOD(toast_pointer, data_todo,
+													 VARDATA_COMPRESSED_GET_COMPRESS_METHOD(dval));
+		/* Assert that the numbers look like it's compressed */
+		Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
+	}
+	else
+	{
+		data_p = VARDATA(dval);
+		data_todo = VARSIZE(dval) - VARHDRSZ;
+		toast_pointer.va_rawsize = VARSIZE(dval);
+		toast_pointer.va_extinfo = data_todo;
+	}
+
+	/*
+	 * Insert the correct table OID into the result TOAST pointer.
+	 *
+	 * Normally this is the actual OID of the target toast table, but during
+	 * table-rewriting operations such as CLUSTER, we have to insert the OID
+	 * of the table's real permanent toast table instead.  rd_toastoid is set
+	 * if we have to substitute such an OID.
+	 */
+	if (OidIsValid(rel->rd_toastoid))
+		toast_pointer.va_toastrelid = rel->rd_toastoid;
+	else
+		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
+
+	/*
+	 * Choose an OID to use as the value ID for this toast value.
+	 *
+	 * Normally we just choose an unused OID within the toast table.  But
+	 * during table-rewriting operations where we are preserving an existing
+	 * toast table OID, we want to preserve toast value OIDs too.  So, if
+	 * rd_toastoid is set and we had a prior external value from that same
+	 * toast table, re-use its value ID.  If we didn't have a prior external
+	 * value (which is a corner case, but possible if the table's attstorage
+	 * options have been changed), we have to pick a value ID that doesn't
+	 * conflict with either new or existing toast value OIDs.
+	 */
+	if (!OidIsValid(rel->rd_toastoid))
+	{
+		/* normal case: just choose an unused OID */
+		toast_pointer.va_valueid =
+			GetNewOidWithIndex(toastrel,
+							   RelationGetRelid(toastidxs[validIndex]),
+							   (AttrNumber) 1);
+	}
+	else
+	{
+		/* rewrite case: check to see if value was in old toast table */
+		toast_pointer.va_valueid = InvalidOid;
+		if (oldexternal != NULL)
+		{
+			struct varatt_external old_toast_pointer;
+
+			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
+			/* Must copy to access aligned fields */
+			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
+			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
+			{
+				/* This value came from the old toast table; reuse its OID */
+				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
+
+				/*
+				 * There is a corner case here: the table rewrite might have
+				 * to copy both live and recently-dead versions of a row, and
+				 * those versions could easily reference the same toast value.
+				 * When we copy the second or later version of such a row,
+				 * reusing the OID will mean we select an OID that's already
+				 * in the new toast table.  Check for that, and if so, just
+				 * fall through without writing the data again.
+				 *
+				 * While annoying and ugly-looking, this is a good thing
+				 * because it ensures that we wind up with only one copy of
+				 * the toast value when there is only one copy in the old
+				 * toast table.  Before we detected this case, we'd have made
+				 * multiple copies, wasting space; and what's worse, the
+				 * copies belonging to already-deleted heap tuples would not
+				 * be reclaimed by VACUUM.
+				 */
+				if (toastrel_valueid_exists(toastrel,
+											toast_pointer.va_valueid))
+				{
+					/* Match, so short-circuit the data storage loop below */
+					data_todo = 0;
+				}
+			}
+		}
+		if (toast_pointer.va_valueid == InvalidOid)
+		{
+			/*
+			 * new value; must choose an OID that doesn't conflict in either
+			 * old or new toast table
+			 */
+			do
+			{
+				toast_pointer.va_valueid =
+					GetNewOidWithIndex(toastrel,
+									   RelationGetRelid(toastidxs[validIndex]),
+									   (AttrNumber) 1);
+			} while (toastid_valueid_exists(rel->rd_toastoid,
+											toast_pointer.va_valueid));
+		}
+	}
+
+	/*
+	 * Initialize constant parts of the tuple data
+	 */
+	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
+	t_values[2] = PointerGetDatum(&chunk_data);
+	t_isnull[0] = false;
+	t_isnull[1] = false;
+	t_isnull[2] = false;
+
+	/*
+	 * Split up the item into chunks
+	 */
+	while (data_todo > 0)
+	{
+		int			i;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Calculate the size of this chunk
+		 */
+		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
+
+		/*
+		 * Build a tuple and store it
+		 */
+		t_values[1] = Int32GetDatum(chunk_seq++);
+		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
+		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
+		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
+
+		heap_insert(toastrel, toasttup, mycid, options, NULL);
+
+		/*
+		 * Create the index entry.  We cheat a little here by not using
+		 * FormIndexDatum: this relies on the knowledge that the index columns
+		 * are the same as the initial columns of the table for all the
+		 * indexes.  We also cheat by not providing an IndexInfo: this is okay
+		 * for now because btree doesn't need one, but we might have to be
+		 * more honest someday.
+		 *
+		 * Note also that there had better not be any user-created index on
+		 * the TOAST table, since we don't bother to update anything else.
+		 */
+		for (i = 0; i < num_indexes; i++)
+		{
+			/* Only index relations marked as ready can be updated */
+			if (toastidxs[i]->rd_index->indisready)
+				index_insert(toastidxs[i], t_values, t_isnull,
+							 &(toasttup->t_self),
+							 toastrel,
+							 toastidxs[i]->rd_index->indisunique ?
+							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
+							 false, NULL);
+		}
+
+		/*
+		 * Free memory
+		 */
+		heap_freetuple(toasttup);
+
+		/*
+		 * Move on to next chunk
+		 */
+		data_todo -= chunk_size;
+		data_p += chunk_size;
+	}
+
+	/*
+	 * Done - close toast relation and its indexes but keep the lock until
+	 * commit, so as a concurrent reindex done directly on the toast relation
+	 * would be able to wait for this transaction.
+	 */
+	toast_close_indexes(toastidxs, num_indexes, NoLock);
+	table_close(toastrel, NoLock);
+
+	/*
+	 * Create the TOAST pointer value that we'll return
+	 */
+	result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
+	SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK);
+	memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
+
+	return PointerGetDatum(result);
+}
+
+/* ----------
+ * toast_delete_datum -
+ *
+ *	Remove every chunk of one externally stored value from its toast
+ *	relation.
+ *
+ * value: the datum to inspect; nothing is done unless it is an on-disk
+ *		external TOAST pointer
+ * is_speculative: if true, chunks are removed with heap_abort_speculative()
+ *		rather than simple_heap_delete()
+ * ----------
+ */
+void
+toast_delete_datum(Relation rel, Datum value, bool is_speculative)
+{
+	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
+	struct varatt_external ptr;
+	Relation	toastrel;
+	Relation   *idxrels;
+	int			nidx;
+	int			validpos;
+	ScanKeyData key;
+	SysScanDesc scan;
+	SnapshotData toastsnap;
+
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+		return;
+
+	/* Must copy to access aligned fields */
+	VARATT_EXTERNAL_GET_POINTER(ptr, attr);
+
+	/* Open the toast relation, then its indexes, locating the valid one */
+	toastrel = table_open(ptr.va_toastrelid, RowExclusiveLock);
+	validpos = toast_open_indexes(toastrel, RowExclusiveLock,
+								  &idxrels, &nidx);
+
+	/* Scan key matching all chunks that carry this value ID */
+	ScanKeyInit(&key,
+				(AttrNumber) 1,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(ptr.va_valueid));
+
+	/*
+	 * Visit all the chunks.  The order in which we see them is irrelevant,
+	 * but since the index is already locked we might as well use
+	 * systable_beginscan_ordered.
+	 */
+	init_toast_snapshot(&toastsnap);
+	scan = systable_beginscan_ordered(toastrel, idxrels[validpos],
+									  &toastsnap, 1, &key);
+	for (;;)
+	{
+		HeapTuple	chunk = systable_getnext_ordered(scan,
+													 ForwardScanDirection);
+
+		if (chunk == NULL)
+			break;
+
+		/* Found a chunk; get rid of it */
+		if (is_speculative)
+			heap_abort_speculative(toastrel, &chunk->t_self);
+		else
+			simple_heap_delete(toastrel, &chunk->t_self);
+	}
+
+	/*
+	 * End scan and close relations but keep the lock until commit, so as a
+	 * concurrent reindex done directly on the toast relation would be able to
+	 * wait for this transaction.
+	 */
+	systable_endscan_ordered(scan);
+	toast_close_indexes(idxrels, nidx, NoLock);
+	table_close(toastrel, NoLock);
+}
+
+/* ----------
+ * toastrel_valueid_exists -
+ *
+ *	Report whether the given toast relation holds any row for the given
+ *	value ID.  The scan uses SnapshotAny, so for safety both live and dead
+ *	toast rows count as "exists"; see notes for GetNewOidWithIndex().
+ * ----------
+ */
+static bool
+toastrel_valueid_exists(Relation toastrel, Oid valueid)
+{
+	Relation   *idxrels;
+	int			nidx;
+	int			validpos;
+	ScanKeyData key;
+	SysScanDesc scan;
+	bool		exists;
+
+	/* Open the indexes and locate the valid one */
+	validpos = toast_open_indexes(toastrel, RowExclusiveLock,
+								  &idxrels, &nidx);
+
+	/* Scan key matching chunks that carry this value ID */
+	ScanKeyInit(&key,
+				(AttrNumber) 1,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(valueid));
+
+	/* One matching chunk is enough to answer the question */
+	scan = systable_beginscan(toastrel,
+							  RelationGetRelid(idxrels[validpos]),
+							  true, SnapshotAny, 1, &key);
+	exists = (systable_getnext(scan) != NULL);
+	systable_endscan(scan);
+
+	/* Clean up */
+	toast_close_indexes(idxrels, nidx, RowExclusiveLock);
+
+	return exists;
+}
+
+/* ----------
+ * toastid_valueid_exists -
+ *
+ *	Same test as toastrel_valueid_exists(), but starting from the toast
+ *	relation's OID: the relation is opened with AccessShareLock for the
+ *	duration of the check and closed, releasing that lock, afterwards.
+ * ----------
+ */
+static bool
+toastid_valueid_exists(Oid toastrelid, Oid valueid)
+{
+	Relation	rel = table_open(toastrelid, AccessShareLock);
+	bool		exists = toastrel_valueid_exists(rel, valueid);
+
+	table_close(rel, AccessShareLock);
+
+	return exists;
+}
+
+/* ----------
+ * toast_get_valid_index
+ *
+ *	Return the OID of the valid index of the given toast relation.  A toast
+ *	relation can have only one valid index at the same time.
+ *
+ *	The relation and its indexes are opened with the given lock mode and
+ *	closed again with NoLock, so the locks taken here are not released by
+ *	this function.
+ */
+Oid
+toast_get_valid_index(Oid toastoid, LOCKMODE lock)
+{
+	Relation	rel;
+	Relation   *idxrels;
+	int			nidx;
+	int			validpos;
+	Oid			indexoid;
+
+	/* Open the toast relation, then its indexes, locating the valid one */
+	rel = table_open(toastoid, lock);
+	validpos = toast_open_indexes(rel, lock, &idxrels, &nidx);
+
+	/* Remember the OID before the index array goes away */
+	indexoid = RelationGetRelid(idxrels[validpos]);
+
+	/* Close the toast relation and all its indexes */
+	toast_close_indexes(idxrels, nidx, NoLock);
+	table_close(rel, NoLock);
+
+	return indexoid;
+}
+
+/* ----------
+ * toast_open_indexes
+ *
+ *	Open every index of the given toast relation with the given lock mode.
+ *
+ *	*toastidxs receives a palloc'd array of the opened index relations and
+ *	*num_indexes its length.  The return value is the array position of the
+ *	first index marked valid; an ERROR is raised if there is none.  It is
+ *	the responsibility of the caller of this function to close the indexes
+ *	as well as free the array.
+ */
+int
+toast_open_indexes(Relation toastrel,
+				   LOCKMODE lock,
+				   Relation **toastidxs,
+				   int *num_indexes)
+{
+	List	   *idxoids;
+	ListCell   *cell;
+	Relation   *rels;
+	int			nidx;
+	int			pos;
+	int			validpos = -1;
+
+	/* Get index list of the toast relation */
+	idxoids = RelationGetIndexList(toastrel);
+	Assert(idxoids != NIL);
+
+	nidx = list_length(idxoids);
+	*num_indexes = nidx;
+
+	/* Open all the index relations */
+	rels = (Relation *) palloc(nidx * sizeof(Relation));
+	*toastidxs = rels;
+	pos = 0;
+	foreach(cell, idxoids)
+		rels[pos++] = index_open(lfirst_oid(cell), lock);
+
+	/* Remember the position of the first valid index in the list */
+	for (pos = 0; pos < nidx; pos++)
+	{
+		if (rels[pos]->rd_index->indisvalid)
+		{
+			validpos = pos;
+			break;
+		}
+	}
+
+	/* The OID list is no longer needed now that the relations are open */
+	list_free(idxoids);
+
+	/*
+	 * The toast relation should have one valid index, so something is going
+	 * wrong if there is nothing.
+	 */
+	if (validpos < 0)
+		elog(ERROR, "no valid index found for toast relation with Oid %u",
+			 RelationGetRelid(toastrel));
+
+	return validpos;
+}
+
+/* ----------
+ * toast_close_indexes
+ *
+ *	Close each index relation in the array with the given lock mode, then
+ *	pfree the array itself.  This should be called for a set of indexes
+ *	opened previously with toast_open_indexes.
+ */
+void
+toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock)
+{
+	int			pos = 0;
+
+	/* Close the relations in array order */
+	while (pos < num_indexes)
+	{
+		index_close(toastidxs[pos], lock);
+		pos++;
+	}
+
+	/* The array was palloc'd by toast_open_indexes */
+	pfree(toastidxs);
+}
+
+/* ----------
+ * init_toast_snapshot
+ *
+ *	Fill in *toast_snapshot as a TOAST snapshot.  An MVCC snapshot is
+ *	needed to do that; since we don't know which one to use, we take the
+ *	oldest one.  This is safe: at worst, we will get a "snapshot too old"
+ *	error that might have been avoided otherwise.
+ */
+void
+init_toast_snapshot(Snapshot toast_snapshot)
+{
+	Snapshot	oldest = GetOldestSnapshot();
+
+	/*
+	 * A NULL result means the session has no active snapshots.  That can
+	 * happen if, for example, a procedure fetches a toasted value into a
+	 * local variable, commits, and then tries to detoast the value.  Such
+	 * coding is unsafe, because once we commit there is nothing to prevent
+	 * the toast data from being deleted.  Detoasting *must* happen in the
+	 * same transaction that originally fetched the toast pointer.  Hence,
+	 * rather than trying to band-aid over the problem, throw an error.
+	 * (This is not very much protection, because in many scenarios the
+	 * procedure would have already created a new transaction snapshot,
+	 * preventing us from detecting the problem.  But it's better than
+	 * nothing, and for sure we shouldn't expend code on masking the problem
+	 * more.)
+	 */
+	if (oldest == NULL)
+		elog(ERROR, "cannot fetch toast data without an active snapshot");
+
+	InitToastSnapshot(*toast_snapshot, oldest->lsn, oldest->whenTaken);
+}
diff --git a/src/backend/access/common/tupconvert.c b/src/backend/access/common/tupconvert.c
new file mode 100644
index 0000000..64f5439
--- /dev/null
+++ b/src/backend/access/common/tupconvert.c
@@ -0,0 +1,293 @@
+/*-------------------------------------------------------------------------
+ *
+ * tupconvert.c
+ * Tuple conversion support.
+ *
+ * These functions provide conversion between rowtypes that are logically
+ * equivalent but might have columns in a different order or different sets of
+ * dropped columns.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/tupconvert.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/tupconvert.h"
+#include "executor/tuptable.h"
+
+
+/*
+ * The conversion setup routines have the following common API:
+ *
+ * The setup routine checks using attmap.c whether the given source and
+ * destination tuple descriptors are logically compatible. If not, it throws
+ * an error. If so, it returns NULL if they are physically compatible (ie, no
+ * conversion is needed), else a TupleConversionMap that can be used by
+ * execute_attr_map_tuple or execute_attr_map_slot to perform the conversion.
+ *
+ * The TupleConversionMap, if needed, is palloc'd in the caller's memory
+ * context. Also, the given tuple descriptors are referenced by the map,
+ * so they must survive as long as the map is needed.
+ *
+ * The caller must supply a suitable primary error message to be used if
+ * a compatibility error is thrown. Recommended coding practice is to use
+ * gettext_noop() on this string, so that it is translatable but won't
+ * actually be translated unless the error gets thrown.
+ *
+ *
+ * Implementation notes:
+ *
+ * The key component of a TupleConversionMap is an attrMap[] array with
+ * one entry per output column. This entry contains the 1-based index of
+ * the corresponding input column, or zero to force a NULL value (for
+ * a dropped output column). The TupleConversionMap also contains workspace
+ * arrays.
+ */
+
+
+/*
+ * convert_tuples_by_position
+ *		Prepare a tuple conversion in which input and output columns are
+ *		paired up by ordinal position.  (Dropped columns on either side
+ *		are ignored.)
+ *
+ * The attribute map comes from build_attrmap_by_position(), which also
+ * receives "msg".  When that routine returns NULL no run-time conversion is
+ * required and NULL is returned here too.
+ */
+TupleConversionMap *
+convert_tuples_by_position(TupleDesc indesc,
+						   TupleDesc outdesc,
+						   const char *msg)
+{
+	TupleConversionMap *result;
+	AttrMap *positions;
+	int nout;
+	int nin;
+
+	/* Check compatibility and obtain the attribute-number map */
+	positions = build_attrmap_by_position(indesc, outdesc, msg);
+	if (positions == NULL)
+		return NULL;			/* runtime conversion is not needed */
+
+	/* Both workspace sizes include one extra slot for the NULL entry */
+	nout = outdesc->natts + 1;
+	nin = indesc->natts + 1;
+
+	/* Assemble the map structure */
+	result = (TupleConversionMap *) palloc(sizeof(TupleConversionMap));
+	result->indesc = indesc;
+	result->outdesc = outdesc;
+	result->attrMap = positions;
+
+	/* Workspace for the Datum/null arrays is allocated once, up front */
+	result->outvalues = (Datum *) palloc(nout * sizeof(Datum));
+	result->outisnull = (bool *) palloc(nout * sizeof(bool));
+	result->invalues = (Datum *) palloc(nin * sizeof(Datum));
+	result->inisnull = (bool *) palloc(nin * sizeof(bool));
+
+	/* Input slot 0 permanently represents a NULL value */
+	result->invalues[0] = (Datum) 0;
+	result->inisnull[0] = true;
+
+	return result;
+}
+
+/*
+ * convert_tuples_by_name
+ *		Prepare a tuple conversion in which input and output columns are
+ *		paired up by column name.  (Dropped columns on either side are
+ *		ignored.)
+ *
+ * Intended for rowtypes related by inheritance, so type and typmod are
+ * expected to match exactly.  Error messages will not be very helpful
+ * unless both rowtypes are named composite types.
+ *
+ * The attribute map comes from build_attrmap_by_name_if_req(); when that
+ * routine returns NULL no run-time conversion is required and NULL is
+ * returned here too.
+ */
+TupleConversionMap *
+convert_tuples_by_name(TupleDesc indesc,
+					   TupleDesc outdesc)
+{
+	TupleConversionMap *result;
+	AttrMap *names;
+	int nout = outdesc->natts;
+	int nin;
+
+	/* Check compatibility and obtain the attribute-number map */
+	names = build_attrmap_by_name_if_req(indesc, outdesc);
+	if (names == NULL)
+		return NULL;			/* runtime conversion is not needed */
+
+	/* Assemble the map structure */
+	result = (TupleConversionMap *) palloc(sizeof(TupleConversionMap));
+	result->indesc = indesc;
+	result->outdesc = outdesc;
+	result->attrMap = names;
+
+	/* Output workspace: exactly one entry per output column */
+	result->outvalues = (Datum *) palloc(nout * sizeof(Datum));
+	result->outisnull = (bool *) palloc(nout * sizeof(bool));
+
+	/* Input workspace: one extra leading entry that stands for NULL */
+	nin = indesc->natts + 1;
+	result->invalues = (Datum *) palloc(nin * sizeof(Datum));
+	result->inisnull = (bool *) palloc(nin * sizeof(bool));
+	result->invalues[0] = (Datum) 0;
+	result->inisnull[0] = true;
+
+	return result;
+}
+
+/*
+ * execute_attr_map_tuple
+ *		Convert one heap tuple from map->indesc layout to map->outdesc
+ *		layout, using the workspace arrays stored in the map.
+ *
+ * Returns the tuple built by heap_form_tuple().
+ */
+HeapTuple
+execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
+{
+	AttrMap *colmap = map->attrMap;
+	int outno;
+
+	/*
+	 * Deform the source tuple starting at workspace index 1.  Index 0 keeps
+	 * its NULL entry, so the 1-based numbers stored in the attribute map can
+	 * be used directly as workspace indexes, with 0 meaning "NULL".
+	 */
+	heap_deform_tuple(tuple, map->indesc,
+					  map->invalues + 1, map->inisnull + 1);
+
+	/* Move every value to its output position */
+	Assert(colmap->maplen == map->outdesc->natts);
+	for (outno = 0; outno < colmap->maplen; outno++)
+	{
+		int inno = colmap->attnums[outno];
+
+		map->outvalues[outno] = map->invalues[inno];
+		map->outisnull[outno] = map->inisnull[inno];
+	}
+
+	/* Build and return the converted tuple */
+	return heap_form_tuple(map->outdesc, map->outvalues, map->outisnull);
+}
+
+/*
+ * execute_attr_map_slot
+ *		Convert the contents of in_slot into out_slot according to attrMap.
+ *
+ * out_slot is cleared, filled as a virtual tuple, and returned.
+ */
+TupleTableSlot *
+execute_attr_map_slot(AttrMap *attrMap,
+					  TupleTableSlot *in_slot,
+					  TupleTableSlot *out_slot)
+{
+	Datum *srcvals;
+	bool *srcnulls;
+	Datum *dstvals;
+	bool *dstnulls;
+	int ncols;
+	int col;
+
+	/* Both slots must have a descriptor and value storage */
+	Assert(in_slot->tts_tupleDescriptor != NULL &&
+		   out_slot->tts_tupleDescriptor != NULL);
+	Assert(in_slot->tts_values != NULL && out_slot->tts_values != NULL);
+
+	ncols = out_slot->tts_tupleDescriptor->natts;
+
+	/* Make every input attribute available in the input slot's arrays */
+	slot_getallattrs(in_slot);
+
+	/* Discard whatever the output slot held before */
+	ExecClearTuple(out_slot);
+
+	srcvals = in_slot->tts_values;
+	srcnulls = in_slot->tts_isnull;
+	dstvals = out_slot->tts_values;
+	dstnulls = out_slot->tts_isnull;
+
+	/* Fill each output column from its mapped input column */
+	for (col = 0; col < ncols; col++)
+	{
+		int src = attrMap->attnums[col];
+
+		if (src != 0)
+		{
+			/* map entries are 1-based, the slot arrays are 0-based */
+			dstvals[col] = srcvals[src - 1];
+			dstnulls[col] = srcnulls[src - 1];
+		}
+		else
+		{
+			/* a zero map entry means the output column is NULL */
+			dstvals[col] = (Datum) 0;
+			dstnulls[col] = true;
+		}
+	}
+
+	ExecStoreVirtualTuple(out_slot);
+
+	return out_slot;
+}
+
+/*
+ * execute_attr_map_cols
+ *		Translate a bitmap of columns through the attribute map.
+ *
+ * Members of both the input and the output bitmap are attribute numbers
+ * offset by FirstLowInvalidHeapAttributeNumber so that system columns fit,
+ * as in the column bitmaps of RangeTblEntry.
+ */
+Bitmapset *
+execute_attr_map_cols(AttrMap *attrMap, Bitmapset *in_cols)
+{
+	Bitmapset *result = NULL;
+	int attno;
+
+	/* An empty input set trivially maps to an empty output set */
+	if (in_cols == NULL)
+		return NULL;
+
+	/* Walk all output columns and look up the input column behind each */
+	for (attno = FirstLowInvalidHeapAttributeNumber;
+		 attno <= attrMap->maplen;
+		 attno++)
+	{
+		int src;
+
+		/* attribute number zero is skipped */
+		if (attno == 0)
+			continue;
+
+		/* system columns map to themselves, user columns go via the map */
+		src = (attno < 0) ? attno : attrMap->attnums[attno - 1];
+
+		/* a zero map entry has no input column behind it */
+		if (src == 0)
+			continue;
+
+		if (bms_is_member(src - FirstLowInvalidHeapAttributeNumber, in_cols))
+			result = bms_add_member(result,
+									attno - FirstLowInvalidHeapAttributeNumber);
+	}
+
+	return result;
+}
+
+/*
+ * free_conversion_map
+ *		Release a TupleConversionMap along with its attribute map and its
+ *		four workspace arrays.
+ *
+ * map->indesc and map->outdesc are only referenced by the map, not owned by
+ * it, and are therefore left untouched.
+ */
+void
+free_conversion_map(TupleConversionMap *map)
+{
+	/* attribute-number map */
+	free_attrmap(map->attrMap);
+
+	/* input workspace */
+	pfree(map->invalues);
+	pfree(map->inisnull);
+
+	/* output workspace */
+	pfree(map->outvalues);
+	pfree(map->outisnull);
+
+	/* and finally the container itself */
+	pfree(map);
+}
diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
new file mode 100644
index 0000000..4c63bd4
--- /dev/null
+++ b/src/backend/access/common/tupdesc.c
@@ -0,0 +1,912 @@
+/*-------------------------------------------------------------------------
+ *
+ * tupdesc.c
+ * POSTGRES tuple descriptor support code
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/common/tupdesc.c
+ *
+ * NOTES
+ * some of the executor utility code such as "ExecTypeFromTL" should be
+ * moved here.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/toast_compression.h"
+#include "access/tupdesc_details.h"
+#include "catalog/pg_collation.h"
+#include "catalog/pg_type.h"
+#include "common/hashfn.h"
+#include "miscadmin.h"
+#include "parser/parse_type.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/resowner_private.h"
+#include "utils/syscache.h"
+
+
+/*
+ * CreateTemplateTupleDesc
+ *		Allocate a tuple descriptor with room for natts attributes.  The
+ *		attribute entries themselves are left uninitialized.
+ *
+ * The type identification starts out as that of an anonymous record type
+ * (RECORDOID, typmod -1); callers may overwrite it.
+ */
+TupleDesc
+CreateTemplateTupleDesc(int natts)
+{
+	TupleDesc result;
+	Size nbytes;
+
+	/* sanity check */
+	AssertArg(natts >= 0);
+
+	/*
+	 * Space needed: the fixed header plus the attribute array.
+	 *
+	 * The array stride is sizeof(FormData_pg_attribute), because the
+	 * elements are declared as FormData_pg_attribute for notational
+	 * convenience.  Only the leading ATTRIBUTE_FIXED_PART_SIZE bytes of each
+	 * element are guaranteed valid, though, and most code copying tupdesc
+	 * entries copies just that much.  In principle that could be smaller
+	 * because of trailing padding, although the current pg_attribute
+	 * definition probably has none.
+	 */
+	nbytes = offsetof(struct TupleDescData, attrs) +
+		natts * sizeof(FormData_pg_attribute);
+	result = (TupleDesc) palloc(nbytes);
+
+	/* Set up the header fields */
+	result->natts = natts;
+	result->tdtypeid = RECORDOID;
+	result->tdtypmod = -1;
+	result->tdrefcount = -1;	/* assume not reference-counted */
+	result->constr = NULL;
+
+	return result;
+}
+
+/*
+ * CreateTupleDesc
+ *		Build a new TupleDesc whose attributes are copies of the entries of
+ *		the given Form_pg_attribute array.
+ *
+ * The type identification starts out as that of an anonymous record type;
+ * callers may overwrite it.
+ */
+TupleDesc
+CreateTupleDesc(int natts, Form_pg_attribute *attrs)
+{
+	TupleDesc result = CreateTemplateTupleDesc(natts);
+	int attidx = 0;
+
+	/* Only the fixed-size part of each attribute entry is copied */
+	while (attidx < natts)
+	{
+		memcpy(TupleDescAttr(result, attidx),
+			   attrs[attidx],
+			   ATTRIBUTE_FIXED_PART_SIZE);
+		++attidx;
+	}
+
+	return result;
+}
+
+/*
+ * CreateTupleDescCopy
+ *		Build a new TupleDesc as a copy of an existing one.
+ *
+ * !!! Constraints and defaults are not copied !!!
+ */
+TupleDesc
+CreateTupleDescCopy(TupleDesc tupdesc)
+{
+	TupleDesc result = CreateTemplateTupleDesc(tupdesc->natts);
+	int attidx;
+
+	/* Copy the whole attribute array in one go */
+	memcpy(TupleDescAttr(result, 0),
+		   TupleDescAttr(tupdesc, 0),
+		   result->natts * sizeof(FormData_pg_attribute));
+
+	/*
+	 * Constraints and defaults are not carried over, so the per-attribute
+	 * fields that refer to them have to be reset.
+	 */
+	attidx = 0;
+	while (attidx < result->natts)
+	{
+		Form_pg_attribute entry = TupleDescAttr(result, attidx);
+
+		entry->attnotnull = false;
+		entry->atthasdef = false;
+		entry->atthasmissing = false;
+		entry->attidentity = '\0';
+		entry->attgenerated = '\0';
+		attidx++;
+	}
+
+	/* The tuple type identification can be taken over as well */
+	result->tdtypeid = tupdesc->tdtypeid;
+	result->tdtypmod = tupdesc->tdtypmod;
+
+	return result;
+}
+
+/*
+ * CreateTupleDescCopyConstr
+ *		Build a new TupleDesc as a copy of an existing one, this time
+ *		including its constraints and defaults.
+ */
+TupleDesc
+CreateTupleDescCopyConstr(TupleDesc tupdesc)
+{
+	TupleConstr *src = tupdesc->constr;
+	TupleConstr *dup;
+	TupleDesc result;
+	int k;
+
+	result = CreateTemplateTupleDesc(tupdesc->natts);
+
+	/* Copy the whole attribute array in one go */
+	memcpy(TupleDescAttr(result, 0),
+		   TupleDescAttr(tupdesc, 0),
+		   result->natts * sizeof(FormData_pg_attribute));
+
+	/* The tuple type identification can be taken over as well */
+	result->tdtypeid = tupdesc->tdtypeid;
+	result->tdtypmod = tupdesc->tdtypmod;
+
+	/* Without a TupleConstr in the source there is nothing more to do */
+	if (src == NULL)
+		return result;
+
+	dup = (TupleConstr *) palloc0(sizeof(TupleConstr));
+	dup->has_not_null = src->has_not_null;
+	dup->has_generated_stored = src->has_generated_stored;
+
+	/* Column defaults: copy the array, then give each entry its own adbin */
+	dup->num_defval = src->num_defval;
+	if (dup->num_defval > 0)
+	{
+		dup->defval = (AttrDefault *)
+			palloc(dup->num_defval * sizeof(AttrDefault));
+		memcpy(dup->defval, src->defval,
+			   dup->num_defval * sizeof(AttrDefault));
+
+		k = dup->num_defval;
+		while (k-- > 0)
+			dup->defval[k].adbin = pstrdup(src->defval[k].adbin);
+	}
+
+	/* Missing values: one entry per attribute, values copied if present */
+	if (src->missing)
+	{
+		dup->missing = (AttrMissing *)
+			palloc(tupdesc->natts * sizeof(AttrMissing));
+		memcpy(dup->missing, src->missing,
+			   tupdesc->natts * sizeof(AttrMissing));
+
+		k = tupdesc->natts;
+		while (k-- > 0)
+		{
+			Form_pg_attribute entry;
+
+			if (!src->missing[k].am_present)
+				continue;
+
+			entry = TupleDescAttr(tupdesc, k);
+			dup->missing[k].am_value = datumCopy(src->missing[k].am_value,
+												 entry->attbyval,
+												 entry->attlen);
+		}
+	}
+
+	/* Check constraints: copy the array, then duplicate the strings */
+	dup->num_check = src->num_check;
+	if (dup->num_check > 0)
+	{
+		dup->check = (ConstrCheck *)
+			palloc(dup->num_check * sizeof(ConstrCheck));
+		memcpy(dup->check, src->check,
+			   dup->num_check * sizeof(ConstrCheck));
+
+		k = dup->num_check;
+		while (k-- > 0)
+		{
+			ConstrCheck *from = &src->check[k];
+			ConstrCheck *to = &dup->check[k];
+
+			to->ccname = pstrdup(from->ccname);
+			to->ccbin = pstrdup(from->ccbin);
+			to->ccvalid = from->ccvalid;
+			to->ccnoinherit = from->ccnoinherit;
+		}
+	}
+
+	result->constr = dup;
+
+	return result;
+}
+
+/*
+ * TupleDescCopy
+ *		Copy a tuple descriptor into memory supplied by the caller.
+ *
+ * The destination may be shared memory mapped at any address; it has to be
+ * large enough for TupleDescSize(src) bytes.
+ *
+ * !!! Constraints and defaults are not copied !!!
+ */
+void
+TupleDescCopy(TupleDesc dst, TupleDesc src)
+{
+	int attidx = 0;
+
+	/* Header and attribute array are copied as one flat chunk */
+	memcpy(dst, src, TupleDescSize(src));
+
+	/*
+	 * Constraints and defaults are not carried over, so the per-attribute
+	 * fields that refer to them have to be reset, as does the constr link.
+	 */
+	while (attidx < dst->natts)
+	{
+		Form_pg_attribute entry = TupleDescAttr(dst, attidx);
+
+		entry->attnotnull = false;
+		entry->atthasdef = false;
+		entry->atthasmissing = false;
+		entry->attidentity = '\0';
+		entry->attgenerated = '\0';
+		attidx++;
+	}
+	dst->constr = NULL;
+
+	/*
+	 * The destination is assumed not to be reference-counted.  (Taking over
+	 * the source's refcount would be wrong in any case.)
+	 */
+	dst->tdrefcount = -1;
+}
+
+/*
+ * TupleDescCopyEntry
+ *		Copy one attribute entry from a tuple descriptor into another.
+ *		Both attribute numbers are 1-based.
+ *
+ * !!! Constraints and defaults are not copied !!!
+ */
+void
+TupleDescCopyEntry(TupleDesc dst, AttrNumber dstAttno,
+				   TupleDesc src, AttrNumber srcAttno)
+{
+	Form_pg_attribute target = TupleDescAttr(dst, dstAttno - 1);
+	Form_pg_attribute source = TupleDescAttr(src, srcAttno - 1);
+
+	/* sanity checks */
+	AssertArg(PointerIsValid(src));
+	AssertArg(PointerIsValid(dst));
+	AssertArg(srcAttno >= 1);
+	AssertArg(srcAttno <= src->natts);
+	AssertArg(dstAttno >= 1);
+	AssertArg(dstAttno <= dst->natts);
+
+	/* Only the fixed-size part of the entry is copied */
+	memcpy(target, source, ATTRIBUTE_FIXED_PART_SIZE);
+
+	/*
+	 * The copy carries the source's attnum and attcacheoff; fix up the
+	 * former and invalidate the latter.
+	 *
+	 * XXX Strictly speaking the attcacheoff of every later column in dst
+	 * would need resetting too.  Present callers do not depend on that,
+	 * since all later columns get set up through this function or
+	 * TupleDescInitEntry anyway, so we skip it and avoid a pointless O(N^2)
+	 * cost.
+	 */
+	target->attcacheoff = -1;
+	target->attnum = dstAttno;
+
+	/* constraints and defaults are not copied, so clear their markers */
+	target->attgenerated = '\0';
+	target->attidentity = '\0';
+	target->atthasmissing = false;
+	target->atthasdef = false;
+	target->attnotnull = false;
+}
+
+/*
+ * FreeTupleDesc
+ *		Release a TupleDesc together with all of its substructure: default
+ *		expressions, missing values, check constraints and the TupleConstr
+ *		itself.
+ */
+void
+FreeTupleDesc(TupleDesc tupdesc)
+{
+	TupleConstr *constr;
+	int k;
+
+	/*
+	 * Possibly this should assert tdrefcount == 0, to disallow explicit
+	 * freeing of un-refcounted tupdescs?
+	 */
+	Assert(tupdesc->tdrefcount <= 0);
+
+	constr = tupdesc->constr;
+	if (constr == NULL)
+	{
+		/* no substructure, just the descriptor */
+		pfree(tupdesc);
+		return;
+	}
+
+	/* default expressions */
+	if (constr->num_defval > 0)
+	{
+		AttrDefault *defaults = constr->defval;
+
+		k = constr->num_defval;
+		while (k-- > 0)
+			pfree(defaults[k].adbin);
+		pfree(defaults);
+	}
+
+	/* missing values; only pass-by-reference datums own separate memory */
+	if (constr->missing)
+	{
+		AttrMissing *missing = constr->missing;
+
+		k = tupdesc->natts;
+		while (k-- > 0)
+		{
+			if (!missing[k].am_present)
+				continue;
+			if (TupleDescAttr(tupdesc, k)->attbyval)
+				continue;
+			pfree(DatumGetPointer(missing[k].am_value));
+		}
+		pfree(missing);
+	}
+
+	/* check constraints */
+	if (constr->num_check > 0)
+	{
+		ConstrCheck *checks = constr->check;
+
+		k = constr->num_check;
+		while (k-- > 0)
+		{
+			pfree(checks[k].ccname);
+			pfree(checks[k].ccbin);
+		}
+		pfree(checks);
+	}
+
+	pfree(constr);
+	pfree(tupdesc);
+}
+
+/*
+ * IncrTupleDescRefCount
+ *		Add one reference to a tupdesc and record that reference in
+ *		CurrentResourceOwner.
+ *
+ * Must not be used on tupdescs that are not reference-counted.  (When the
+ * status of a tupdesc is uncertain, use the PinTupleDesc macro instead.)
+ */
+void
+IncrTupleDescRefCount(TupleDesc tupdesc)
+{
+	Assert(tupdesc->tdrefcount >= 0);
+
+	/* make room in the owner before touching the count */
+	ResourceOwnerEnlargeTupleDescs(CurrentResourceOwner);
+
+	tupdesc->tdrefcount += 1;
+	ResourceOwnerRememberTupleDesc(CurrentResourceOwner, tupdesc);
+}
+
+/*
+ * DecrTupleDescRefCount
+ *		Drop one reference to a tupdesc, forget that reference in
+ *		CurrentResourceOwner, and free the tupdesc once the last reference
+ *		is gone.
+ *
+ * Must not be used on tupdescs that are not reference-counted.  (When the
+ * status of a tupdesc is uncertain, use the ReleaseTupleDesc macro instead.)
+ */
+void
+DecrTupleDescRefCount(TupleDesc tupdesc)
+{
+	Assert(tupdesc->tdrefcount > 0);
+
+	ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc);
+
+	tupdesc->tdrefcount -= 1;
+	if (tupdesc->tdrefcount == 0)
+		FreeTupleDesc(tupdesc);
+}
+
+/*
+ * equalTupleDescs
+ *		Report whether two TupleDesc structures are logically equal.
+ *
+ * Note: attrelid and tdtypmod are deliberately left out of the comparison.
+ * That lets typcache.c use this routine to find out whether a cached record
+ * type matches a requested one, and does no harm to relcache.c's uses.
+ * tdrefcount is not compared either.
+ */
+bool
+equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
+{
+	TupleConstr *lhs;
+	TupleConstr *rhs;
+	int idx;
+	int count;
+
+	if (tupdesc1->natts != tupdesc2->natts ||
+		tupdesc1->tdtypeid != tupdesc2->tdtypeid)
+		return false;
+
+	for (idx = 0; idx < tupdesc1->natts; idx++)
+	{
+		Form_pg_attribute a = TupleDescAttr(tupdesc1, idx);
+		Form_pg_attribute b = TupleDescAttr(tupdesc2, idx);
+
+		/*
+		 * Not every field has to be looked at.  attrelid and attnum can be
+		 * ignored (they decided where the row went in the attrs array to
+		 * begin with).  attlen/attbyval/attalign may look redundant, being
+		 * derived from atttypid, but dropped columns all have atttypid zero
+		 * and so these must be compared for them; comparing them always
+		 * seems the safer course.
+		 *
+		 * attcacheoff must NOT be compared, as it may not be set in both
+		 * copies.  atthasmissing is ignored on purpose as well: it is of
+		 * little relevance in tupdescs, which lack the attmissingval field.
+		 *
+		 * Variable-length fields are not even present in a tupdesc.
+		 */
+		if (strcmp(NameStr(a->attname), NameStr(b->attname)) != 0 ||
+			a->atttypid != b->atttypid ||
+			a->attstattarget != b->attstattarget ||
+			a->attlen != b->attlen ||
+			a->attndims != b->attndims ||
+			a->atttypmod != b->atttypmod ||
+			a->attbyval != b->attbyval ||
+			a->attalign != b->attalign ||
+			a->attstorage != b->attstorage ||
+			a->attcompression != b->attcompression ||
+			a->attnotnull != b->attnotnull ||
+			a->atthasdef != b->atthasdef ||
+			a->attidentity != b->attidentity ||
+			a->attgenerated != b->attgenerated ||
+			a->attisdropped != b->attisdropped ||
+			a->attislocal != b->attislocal ||
+			a->attinhcount != b->attinhcount ||
+			a->attcollation != b->attcollation)
+			return false;
+	}
+
+	/* Either both descriptors carry a TupleConstr or neither does */
+	lhs = tupdesc1->constr;
+	rhs = tupdesc2->constr;
+	if (lhs == NULL)
+		return rhs == NULL;
+	if (rhs == NULL)
+		return false;
+
+	if (lhs->has_not_null != rhs->has_not_null ||
+		lhs->has_generated_stored != rhs->has_generated_stored)
+		return false;
+
+	/* Defaults; both AttrDefault arrays are assumed to be in adnum order */
+	count = lhs->num_defval;
+	if (count != (int) rhs->num_defval)
+		return false;
+	for (idx = 0; idx < count; idx++)
+	{
+		AttrDefault *d1 = &lhs->defval[idx];
+		AttrDefault *d2 = &rhs->defval[idx];
+
+		if (d1->adnum != d2->adnum ||
+			strcmp(d1->adbin, d2->adbin) != 0)
+			return false;
+	}
+
+	/* Missing values; the arrays must be present on both sides or neither */
+	if ((lhs->missing != NULL) != (rhs->missing != NULL))
+		return false;
+	if (lhs->missing != NULL)
+	{
+		for (idx = 0; idx < tupdesc1->natts; idx++)
+		{
+			AttrMissing *m1 = &lhs->missing[idx];
+			AttrMissing *m2 = &rhs->missing[idx];
+			Form_pg_attribute att;
+
+			if (m1->am_present != m2->am_present)
+				return false;
+			if (!m1->am_present)
+				continue;
+
+			att = TupleDescAttr(tupdesc1, idx);
+			if (!datumIsEqual(m1->am_value, m2->am_value,
+							  att->attbyval, att->attlen))
+				return false;
+		}
+	}
+
+	/*
+	 * Check constraints.  As with defaults, this relies on the ConstrCheck
+	 * entries being sorted, here by name.  With duplicate names the result
+	 * of the comparison is uncertain, but that should not happen.
+	 */
+	count = lhs->num_check;
+	if (count != (int) rhs->num_check)
+		return false;
+	for (idx = 0; idx < count; idx++)
+	{
+		ConstrCheck *c1 = &lhs->check[idx];
+		ConstrCheck *c2 = &rhs->check[idx];
+
+		if (strcmp(c1->ccname, c2->ccname) != 0 ||
+			strcmp(c1->ccbin, c2->ccbin) != 0 ||
+			c1->ccvalid != c2->ccvalid ||
+			c1->ccnoinherit != c2->ccnoinherit)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * hashTupleDesc
+ *		Produce a hash value for a tuple descriptor.
+ *
+ * Descriptors that equalTupleDescs() considers equal get equal hash values
+ * from this function.
+ *
+ * The contents of the constraints are currently not hashed: doing so would
+ * be somewhat painful, and collisions caused purely by constraints are
+ * unlikely.
+ */
+uint32
+hashTupleDesc(TupleDesc desc)
+{
+	uint32 hash;
+	int attidx = 0;
+
+	/* start from the attribute count and the row type OID ... */
+	hash = hash_combine(0, hash_uint32(desc->natts));
+	hash = hash_combine(hash, hash_uint32(desc->tdtypeid));
+
+	/* ... then mix in the type OID of every attribute */
+	while (attidx < desc->natts)
+	{
+		hash = hash_combine(hash,
+							hash_uint32(TupleDescAttr(desc, attidx)->atttypid));
+		++attidx;
+	}
+
+	return hash;
+}
+
+/*
+ * TupleDescInitEntry
+ *		Set up one attribute entry of an already allocated tuple descriptor.
+ *
+ * A NULL attributeName leaves attname as an empty string (for cases where
+ * the field's name is unknown or not needed).  Some callers use this
+ * function to change only the datatype-related fields of an existing
+ * tupdesc; they pass attributeName = NameStr(att->attname) to signal that
+ * attname is to be left alone.
+ *
+ * attcollation is set to the default of the given datatype.  When a
+ * different collation is wanted, install it afterwards with
+ * TupleDescInitEntryCollation.
+ */
+void
+TupleDescInitEntry(TupleDesc desc,
+				   AttrNumber attributeNumber,
+				   const char *attributeName,
+				   Oid oidtypeid,
+				   int32 typmod,
+				   int attdim)
+{
+	Form_pg_attribute entry;
+	Form_pg_type typinfo;
+	HeapTuple typtup;
+
+	/* sanity checks */
+	AssertArg(PointerIsValid(desc));
+	AssertArg(attributeNumber >= 1);
+	AssertArg(attributeNumber <= desc->natts);
+
+	entry = TupleDescAttr(desc, attributeNumber - 1);
+
+	/* identification */
+	entry->attrelid = 0;		/* dummy value */
+
+	/*
+	 * attributeName may be NULL, since the planner does not always put valid
+	 * resname values into targetlists, resjunk attributes in particular.
+	 * When the caller hands back the entry's own name buffer, the name stays
+	 * as it is.
+	 */
+	if (attributeName == NULL)
+		MemSet(NameStr(entry->attname), 0, NAMEDATALEN);
+	else if (attributeName != NameStr(entry->attname))
+		namestrcpy(&(entry->attname), attributeName);
+
+	entry->attnum = attributeNumber;
+	entry->attndims = attdim;
+	entry->atttypmod = typmod;
+	entry->attstattarget = -1;
+	entry->attcacheoff = -1;
+
+	/* a fresh entry has no constraints, defaults or inheritance */
+	entry->attnotnull = false;
+	entry->atthasdef = false;
+	entry->atthasmissing = false;
+	entry->attidentity = '\0';
+	entry->attgenerated = '\0';
+	entry->attisdropped = false;
+	entry->attislocal = true;
+	entry->attinhcount = 0;
+	/* attacl, attoptions and attfdwoptions are not present in tupledescs */
+
+	/* the remaining fields come from the type's syscache entry */
+	typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oidtypeid));
+	if (!HeapTupleIsValid(typtup))
+		elog(ERROR, "cache lookup failed for type %u", oidtypeid);
+	typinfo = (Form_pg_type) GETSTRUCT(typtup);
+
+	entry->atttypid = oidtypeid;
+	entry->attlen = typinfo->typlen;
+	entry->attbyval = typinfo->typbyval;
+	entry->attalign = typinfo->typalign;
+	entry->attstorage = typinfo->typstorage;
+	entry->attcollation = typinfo->typcollation;
+	entry->attcompression = InvalidCompressionMethod;
+
+	ReleaseSysCache(typtup);
+}
+
+/*
+ * TupleDescInitBuiltinEntry
+ *		Set up one attribute entry of a tuple descriptor without any catalog
+ *		access.  Only a small set of builtin types is supported; any other
+ *		type OID raises an error.
+ */
+void
+TupleDescInitBuiltinEntry(TupleDesc desc,
+						  AttrNumber attributeNumber,
+						  const char *attributeName,
+						  Oid oidtypeid,
+						  int32 typmod,
+						  int attdim)
+{
+	Form_pg_attribute entry;
+
+	/* sanity checks */
+	AssertArg(PointerIsValid(desc));
+	AssertArg(attributeNumber >= 1);
+	AssertArg(attributeNumber <= desc->natts);
+
+	entry = TupleDescAttr(desc, attributeNumber - 1);
+
+	/* identification */
+	entry->attrelid = 0;		/* dummy value */
+
+	/* in contrast to TupleDescInitEntry, a name is mandatory here */
+	Assert(attributeName != NULL);
+	namestrcpy(&(entry->attname), attributeName);
+
+	entry->attnum = attributeNumber;
+	entry->attndims = attdim;
+	entry->atttypmod = typmod;
+	entry->attstattarget = -1;
+	entry->attcacheoff = -1;
+
+	/* a fresh entry has no constraints, defaults or inheritance */
+	entry->attnotnull = false;
+	entry->atthasdef = false;
+	entry->atthasmissing = false;
+	entry->attidentity = '\0';
+	entry->attgenerated = '\0';
+	entry->attisdropped = false;
+	entry->attislocal = true;
+	entry->attinhcount = 0;
+	/* attacl, attoptions and attfdwoptions are not present in tupledescs */
+
+	entry->atttypid = oidtypeid;
+
+	/*
+	 * The aim is to cover just enough types for basic builtin commands to
+	 * work without catalog access, e.g. so that certain things are possible
+	 * even in processes not connected to a database.
+	 */
+	switch (oidtypeid)
+	{
+		case TEXTOID:
+		case TEXTARRAYOID:
+			entry->attlen = -1;
+			entry->attbyval = false;
+			entry->attalign = TYPALIGN_INT;
+			entry->attstorage = TYPSTORAGE_EXTENDED;
+			entry->attcollation = DEFAULT_COLLATION_OID;
+			break;
+
+		case BOOLOID:
+			entry->attlen = 1;
+			entry->attbyval = true;
+			entry->attalign = TYPALIGN_CHAR;
+			entry->attstorage = TYPSTORAGE_PLAIN;
+			entry->attcollation = InvalidOid;
+			break;
+
+		case INT4OID:
+			entry->attlen = 4;
+			entry->attbyval = true;
+			entry->attalign = TYPALIGN_INT;
+			entry->attstorage = TYPSTORAGE_PLAIN;
+			entry->attcollation = InvalidOid;
+			break;
+
+		case INT8OID:
+			entry->attlen = 8;
+			entry->attbyval = FLOAT8PASSBYVAL;
+			entry->attalign = TYPALIGN_DOUBLE;
+			entry->attstorage = TYPSTORAGE_PLAIN;
+			entry->attcollation = InvalidOid;
+			break;
+
+		default:
+			elog(ERROR, "unsupported type %u", oidtypeid);
+	}
+
+	/* identical for every supported type; not reached after the error above */
+	entry->attcompression = InvalidCompressionMethod;
+}
+
+/*
+ * TupleDescInitEntryCollation
+ *
+ * Store a nondefault collation in a tuple descriptor entry that has already
+ * been initialized.  attributeNumber is 1-based.
+ */
+void
+TupleDescInitEntryCollation(TupleDesc desc,
+							AttrNumber attributeNumber,
+							Oid collationid)
+{
+	Form_pg_attribute entry;
+
+	/* sanity checks */
+	AssertArg(PointerIsValid(desc));
+	AssertArg(attributeNumber >= 1);
+	AssertArg(attributeNumber <= desc->natts);
+
+	entry = TupleDescAttr(desc, attributeNumber - 1);
+	entry->attcollation = collationid;
+}
+
+
+/*
+ * BuildDescForRelation
+ *
+ * Given a relation schema (list of ColumnDef nodes), build a TupleDesc.
+ *
+ * Each column's type is resolved, the current user's USAGE privilege on
+ * that type is checked, and the column's collation and array dimension
+ * count are determined before the attribute entry is initialized.  A column
+ * declared SETOF is rejected with an error.
+ *
+ * A TupleConstr is attached only when at least one column is marked NOT
+ * NULL; it records just that fact, with no defaults, missing values or
+ * check constraints.
+ *
+ * Note: tdtypeid will need to be filled in later on.
+ */
+TupleDesc
+BuildDescForRelation(List *schema)
+{
+ int natts;
+ AttrNumber attnum;
+ ListCell *l;
+ TupleDesc desc;
+ bool has_not_null;
+ char *attname;
+ Oid atttypid;
+ int32 atttypmod;
+ Oid attcollation;
+ int attdim;
+
+ /*
+ * allocate a new tuple descriptor
+ */
+ natts = list_length(schema);
+ desc = CreateTemplateTupleDesc(natts);
+ has_not_null = false;
+
+ /* attnum is 1-based; it is incremented at the top of each iteration */
+ attnum = 0;
+
+ foreach(l, schema)
+ {
+ ColumnDef *entry = lfirst(l);
+ AclResult aclresult;
+ Form_pg_attribute att;
+
+ /*
+ * for each entry in the list, get the name and type information from
+ * the list and have TupleDescInitEntry fill in the attribute
+ * information we need.
+ */
+ attnum++;
+
+ attname = entry->colname;
+ typenameTypeIdAndMod(NULL, entry->typeName, &atttypid, &atttypmod);
+
+ /* the current user needs USAGE privilege on the column's type */
+ aclresult = pg_type_aclcheck(atttypid, GetUserId(), ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error_type(aclresult, atttypid);
+
+ attcollation = GetColumnDefCollation(NULL, entry, atttypid);
+ /* number of array dimensions = length of the arrayBounds list */
+ attdim = list_length(entry->typeName->arrayBounds);
+
+ if (entry->typeName->setof)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("column \"%s\" cannot be declared SETOF",
+ attname)));
+
+ TupleDescInitEntry(desc, attnum, attname,
+ atttypid, atttypmod, attdim);
+ att = TupleDescAttr(desc, attnum - 1);
+
+ /* Override TupleDescInitEntry's settings as requested */
+ TupleDescInitEntryCollation(desc, attnum, attcollation);
+ /* a zero entry->storage keeps the storage TupleDescInitEntry chose */
+ if (entry->storage)
+ att->attstorage = entry->storage;
+
+ /* Fill in additional stuff not handled by TupleDescInitEntry */
+ att->attnotnull = entry->is_not_null;
+ has_not_null |= entry->is_not_null;
+ att->attislocal = entry->is_local;
+ att->attinhcount = entry->inhcount;
+ }
+
+ /* attach a TupleConstr only if some column is NOT NULL */
+ if (has_not_null)
+ {
+ TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+ constr->has_not_null = true;
+ constr->has_generated_stored = false;
+ constr->defval = NULL;
+ constr->missing = NULL;
+ constr->num_defval = 0;
+ constr->check = NULL;
+ constr->num_check = 0;
+ desc->constr = constr;
+ }
+ else
+ {
+ desc->constr = NULL;
+ }
+
+ return desc;
+}
+
+/*
+ * BuildDescFromLists
+ *
+ * Build a TupleDesc from four parallel lists: column names (as String
+ * nodes), column type OIDs, typmods, and collation OIDs.
+ *
+ * No constraints are generated.
+ *
+ * In essence this is a reduced form of BuildDescForRelation, meant for
+ * functions returning RECORD.
+ */
+TupleDesc
+BuildDescFromLists(List *names, List *types, List *typmods, List *collations)
+{
+	TupleDesc result;
+	ListCell *namecell;
+	ListCell *typecell;
+	ListCell *typmodcell;
+	ListCell *collcell;
+	AttrNumber colno;
+	int ncols = list_length(names);
+
+	/* all four lists must have the same length */
+	Assert(ncols == list_length(types));
+	Assert(ncols == list_length(typmods));
+	Assert(ncols == list_length(collations));
+
+	/* allocate a new tuple descriptor */
+	result = CreateTemplateTupleDesc(ncols);
+
+	/* attribute numbers are 1-based */
+	colno = 1;
+	forfour(namecell, names, typecell, types,
+			typmodcell, typmods, collcell, collations)
+	{
+		char *colname = strVal(lfirst(namecell));
+		Oid coltype = lfirst_oid(typecell);
+		int32 coltypmod = lfirst_int(typmodcell);
+		Oid colcoll = lfirst_oid(collcell);
+
+		/* attdim is passed as 0; the listed collation then replaces the type default */
+		TupleDescInitEntry(result, colno, colname, coltype, coltypmod, 0);
+		TupleDescInitEntryCollation(result, colno, colcoll);
+
+		colno++;
+	}
+
+	return result;
+}