diff options
Diffstat (limited to '')
-rw-r--r-- | src/backend/access/common/Makefile | 33 | ||||
-rw-r--r-- | src/backend/access/common/attmap.c | 324 | ||||
-rw-r--r-- | src/backend/access/common/bufmask.c | 130 | ||||
-rw-r--r-- | src/backend/access/common/detoast.c | 646 | ||||
-rw-r--r-- | src/backend/access/common/heaptuple.c | 1501 | ||||
-rw-r--r-- | src/backend/access/common/indextuple.c | 589 | ||||
-rw-r--r-- | src/backend/access/common/printsimple.c | 132 | ||||
-rw-r--r-- | src/backend/access/common/printtup.c | 485 | ||||
-rw-r--r-- | src/backend/access/common/relation.c | 217 | ||||
-rw-r--r-- | src/backend/access/common/reloptions.c | 2131 | ||||
-rw-r--r-- | src/backend/access/common/scankey.c | 117 | ||||
-rw-r--r-- | src/backend/access/common/session.c | 208 | ||||
-rw-r--r-- | src/backend/access/common/syncscan.c | 322 | ||||
-rw-r--r-- | src/backend/access/common/toast_compression.c | 318 | ||||
-rw-r--r-- | src/backend/access/common/toast_internals.c | 664 | ||||
-rw-r--r-- | src/backend/access/common/tupconvert.c | 293 | ||||
-rw-r--r-- | src/backend/access/common/tupdesc.c | 912 |
17 files changed, 9022 insertions, 0 deletions
diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile new file mode 100644 index 0000000..b9aff0c --- /dev/null +++ b/src/backend/access/common/Makefile @@ -0,0 +1,33 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/common +# +# IDENTIFICATION +# src/backend/access/common/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/common +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + attmap.o \ + bufmask.o \ + detoast.o \ + heaptuple.o \ + indextuple.o \ + printsimple.o \ + printtup.o \ + relation.o \ + reloptions.o \ + scankey.o \ + session.o \ + syncscan.o \ + toast_compression.o \ + toast_internals.o \ + tupconvert.o \ + tupdesc.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/common/attmap.c b/src/backend/access/common/attmap.c new file mode 100644 index 0000000..32405f8 --- /dev/null +++ b/src/backend/access/common/attmap.c @@ -0,0 +1,324 @@ +/*------------------------------------------------------------------------- + * + * attmap.c + * Attribute mapping support. + * + * This file provides utility routines to build and manage attribute + * mappings by comparing input and output TupleDescs. Such mappings + * are typically used by DDL operating on inheritance and partition trees + * to do a conversion between rowtypes logically equivalent but with + * columns in a different order, taking into account dropped columns. + * They are also used by the tuple conversion routines in tupconvert.c. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/attmap.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/attmap.h" +#include "access/htup_details.h" +#include "utils/builtins.h" + + +static bool check_attrmap_match(TupleDesc indesc, + TupleDesc outdesc, + AttrMap *attrMap); + +/* + * make_attrmap + * + * Utility routine to allocate an attribute map in the current memory + * context. + */ +AttrMap * +make_attrmap(int maplen) +{ + AttrMap *res; + + res = (AttrMap *) palloc0(sizeof(AttrMap)); + res->maplen = maplen; + res->attnums = (AttrNumber *) palloc0(sizeof(AttrNumber) * maplen); + return res; +} + +/* + * free_attrmap + * + * Utility routine to release an attribute map. + */ +void +free_attrmap(AttrMap *map) +{ + pfree(map->attnums); + pfree(map); +} + +/* + * build_attrmap_by_position + * + * Return a palloc'd bare attribute map for tuple conversion, matching input + * and output columns by position. Dropped columns are ignored in both input + * and output, marked as 0. This is normally a subroutine for + * convert_tuples_by_position in tupconvert.c, but it can be used standalone. + * + * Note: the errdetail messages speak of indesc as the "returned" rowtype, + * outdesc as the "expected" rowtype. This is okay for current uses but + * might need generalization in future. + */ +AttrMap * +build_attrmap_by_position(TupleDesc indesc, + TupleDesc outdesc, + const char *msg) +{ + AttrMap *attrMap; + int nincols; + int noutcols; + int n; + int i; + int j; + bool same; + + /* + * The length is computed as the number of attributes of the expected + * rowtype as it includes dropped attributes in its count. + */ + n = outdesc->natts; + attrMap = make_attrmap(n); + + j = 0; /* j is next physical input attribute */ + nincols = noutcols = 0; /* these count non-dropped attributes */ + same = true; + for (i = 0; i < n; i++) + { + Form_pg_attribute att = TupleDescAttr(outdesc, i); + Oid atttypid; + int32 atttypmod; + + if (att->attisdropped) + continue; /* attrMap->attnums[i] is already 0 */ + noutcols++; + atttypid = att->atttypid; + atttypmod = att->atttypmod; + for (; j < indesc->natts; j++) + { + att = TupleDescAttr(indesc, j); + if (att->attisdropped) + continue; + nincols++; + + /* Found matching column, now check type */ + if (atttypid != att->atttypid || + (atttypmod != att->atttypmod && atttypmod >= 0)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg_internal("%s", _(msg)), + errdetail("Returned type %s does not match expected type %s in column %d.", + format_type_with_typemod(att->atttypid, + att->atttypmod), + format_type_with_typemod(atttypid, + atttypmod), + noutcols))); + attrMap->attnums[i] = (AttrNumber) (j + 1); + j++; + break; + } + if (attrMap->attnums[i] == 0) + same = false; /* we'll complain below */ + } + + /* Check for unused input columns */ + for (; j < indesc->natts; j++) + { + if (TupleDescAttr(indesc, j)->attisdropped) + continue; + nincols++; + same = false; /* we'll complain below */ + } + + /* Report column count mismatch using the non-dropped-column counts */ + if (!same) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg_internal("%s", _(msg)), + errdetail("Number of returned columns (%d) does not match " + "expected column count (%d).", + nincols, noutcols))); + + /* Check if the map has a one-to-one match */ + if (check_attrmap_match(indesc, outdesc, attrMap)) + { + /* Runtime conversion is not needed */ + free_attrmap(attrMap); + return NULL; + } + + return attrMap; +} + +/* + * build_attrmap_by_name + * + * Return a palloc'd bare attribute map for tuple conversion, matching input + * and output columns by name. (Dropped columns are ignored in both input and + * output.) This is normally a subroutine for convert_tuples_by_name in + * tupconvert.c, but can be used standalone. + */ +AttrMap * +build_attrmap_by_name(TupleDesc indesc, + TupleDesc outdesc) +{ + AttrMap *attrMap; + int outnatts; + int innatts; + int i; + int nextindesc = -1; + + outnatts = outdesc->natts; + innatts = indesc->natts; + + attrMap = make_attrmap(outnatts); + for (i = 0; i < outnatts; i++) + { + Form_pg_attribute outatt = TupleDescAttr(outdesc, i); + char *attname; + Oid atttypid; + int32 atttypmod; + int j; + + if (outatt->attisdropped) + continue; /* attrMap->attnums[i] is already 0 */ + attname = NameStr(outatt->attname); + atttypid = outatt->atttypid; + atttypmod = outatt->atttypmod; + + /* + * Now search for an attribute with the same name in the indesc. It + * seems likely that a partitioned table will have the attributes in + * the same order as the partition, so the search below is optimized + * for that case. It is possible that columns are dropped in one of + * the relations, but not the other, so we use the 'nextindesc' + * counter to track the starting point of the search. If the inner + * loop encounters dropped columns then it will have to skip over + * them, but it should leave 'nextindesc' at the correct position for + * the next outer loop. + */ + for (j = 0; j < innatts; j++) + { + Form_pg_attribute inatt; + + nextindesc++; + if (nextindesc >= innatts) + nextindesc = 0; + + inatt = TupleDescAttr(indesc, nextindesc); + if (inatt->attisdropped) + continue; + if (strcmp(attname, NameStr(inatt->attname)) == 0) + { + /* Found it, check type */ + if (atttypid != inatt->atttypid || atttypmod != inatt->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("could not convert row type"), + errdetail("Attribute \"%s\" of type %s does not match corresponding attribute of type %s.", + attname, + format_type_be(outdesc->tdtypeid), + format_type_be(indesc->tdtypeid)))); + attrMap->attnums[i] = inatt->attnum; + break; + } + } + if (attrMap->attnums[i] == 0) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("could not convert row type"), + errdetail("Attribute \"%s\" of type %s does not exist in type %s.", + attname, + format_type_be(outdesc->tdtypeid), + format_type_be(indesc->tdtypeid)))); + } + return attrMap; +} + +/* + * build_attrmap_by_name_if_req + * + * Returns mapping created by build_attrmap_by_name, or NULL if no + * conversion is required. This is a convenience routine for + * convert_tuples_by_name() in tupconvert.c and other functions, but it + * can be used standalone. + */ +AttrMap * +build_attrmap_by_name_if_req(TupleDesc indesc, + TupleDesc outdesc) +{ + AttrMap *attrMap; + + /* Verify compatibility and prepare attribute-number map */ + attrMap = build_attrmap_by_name(indesc, outdesc); + + /* Check if the map has a one-to-one match */ + if (check_attrmap_match(indesc, outdesc, attrMap)) + { + /* Runtime conversion is not needed */ + free_attrmap(attrMap); + return NULL; + } + + return attrMap; +} + +/* + * check_attrmap_match + * + * Check to see if the map is a one-to-one match, in which case we need + * not to do a tuple conversion, and the attribute map is not necessary. + */ +static bool +check_attrmap_match(TupleDesc indesc, + TupleDesc outdesc, + AttrMap *attrMap) +{ + int i; + + /* no match if attribute numbers are not the same */ + if (indesc->natts != outdesc->natts) + return false; + + for (i = 0; i < attrMap->maplen; i++) + { + Form_pg_attribute inatt = TupleDescAttr(indesc, i); + Form_pg_attribute outatt = TupleDescAttr(outdesc, i); + + /* + * If the input column has a missing attribute, we need a conversion. + */ + if (inatt->atthasmissing) + return false; + + if (attrMap->attnums[i] == (i + 1)) + continue; + + /* + * If it's a dropped column and the corresponding input column is also + * dropped, we don't need a conversion. However, attlen and attalign + * must agree. + */ + if (attrMap->attnums[i] == 0 && + inatt->attisdropped && + inatt->attlen == outatt->attlen && + inatt->attalign == outatt->attalign) + continue; + + return false; + } + + return true; +} diff --git a/src/backend/access/common/bufmask.c b/src/backend/access/common/bufmask.c new file mode 100644 index 0000000..003a0be --- /dev/null +++ b/src/backend/access/common/bufmask.c @@ -0,0 +1,130 @@ +/*------------------------------------------------------------------------- + * + * bufmask.c + * Routines for buffer masking. Used to mask certain bits + * in a page which can be different when the WAL is generated + * and when the WAL is applied. + * + * Portions Copyright (c) 2016-2021, PostgreSQL Global Development Group + * + * Contains common routines required for masking a page. + * + * IDENTIFICATION + * src/backend/access/common/bufmask.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/bufmask.h" + +/* + * mask_page_lsn_and_checksum + * + * In consistency checks, the LSN of the two pages compared will likely be + * different because of concurrent operations when the WAL is generated and + * the state of the page when WAL is applied. Also, mask out checksum as + * masking anything else on page means checksum is not going to match as well. + */ +void +mask_page_lsn_and_checksum(Page page) +{ + PageHeader phdr = (PageHeader) page; + + PageXLogRecPtrSet(phdr->pd_lsn, (uint64) MASK_MARKER); + phdr->pd_checksum = MASK_MARKER; +} + +/* + * mask_page_hint_bits + * + * Mask hint bits in PageHeader. We want to ignore differences in hint bits, + * since they can be set without emitting any WAL. + */ +void +mask_page_hint_bits(Page page) +{ + PageHeader phdr = (PageHeader) page; + + /* Ignore prune_xid (it's like a hint-bit) */ + phdr->pd_prune_xid = MASK_MARKER; + + /* Ignore PD_PAGE_FULL and PD_HAS_FREE_LINES flags, they are just hints. */ + PageClearFull(page); + PageClearHasFreeLinePointers(page); + + /* + * During replay, if the page LSN has advanced past our XLOG record's LSN, + * we don't mark the page all-visible. See heap_xlog_visible() for + * details. + */ + PageClearAllVisible(page); +} + +/* + * mask_unused_space + * + * Mask the unused space of a page between pd_lower and pd_upper. + */ +void +mask_unused_space(Page page) +{ + int pd_lower = ((PageHeader) page)->pd_lower; + int pd_upper = ((PageHeader) page)->pd_upper; + int pd_special = ((PageHeader) page)->pd_special; + + /* Sanity check */ + if (pd_lower > pd_upper || pd_special < pd_upper || + pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ) + { + elog(ERROR, "invalid page pd_lower %u pd_upper %u pd_special %u\n", + pd_lower, pd_upper, pd_special); + } + + memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower); +} + +/* + * mask_lp_flags + * + * In some index AMs, line pointer flags can be modified on the primary + * without emitting any WAL record. + */ +void +mask_lp_flags(Page page) +{ + OffsetNumber offnum, + maxoff; + + maxoff = PageGetMaxOffsetNumber(page); + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemId = PageGetItemId(page, offnum); + + if (ItemIdIsUsed(itemId)) + itemId->lp_flags = LP_UNUSED; + } +} + +/* + * mask_page_content + * + * In some index AMs, the contents of deleted pages need to be almost + * completely ignored. + */ +void +mask_page_content(Page page) +{ + /* Mask Page Content */ + memset(page + SizeOfPageHeaderData, MASK_MARKER, + BLCKSZ - SizeOfPageHeaderData); + + /* Mask pd_lower and pd_upper */ + memset(&((PageHeader) page)->pd_lower, MASK_MARKER, + sizeof(uint16)); + memset(&((PageHeader) page)->pd_upper, MASK_MARKER, + sizeof(uint16)); +} diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c new file mode 100644 index 0000000..545a6b8 --- /dev/null +++ b/src/backend/access/common/detoast.c @@ -0,0 +1,646 @@ +/*------------------------------------------------------------------------- + * + * detoast.c + * Retrieve compressed or external variable size attributes. + * + * Copyright (c) 2000-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/common/detoast.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/detoast.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/toast_internals.h" +#include "common/int.h" +#include "common/pg_lzcompress.h" +#include "utils/expandeddatum.h" +#include "utils/rel.h" + +static struct varlena *toast_fetch_datum(struct varlena *attr); +static struct varlena *toast_fetch_datum_slice(struct varlena *attr, + int32 sliceoffset, + int32 slicelength); +static struct varlena *toast_decompress_datum(struct varlena *attr); +static struct varlena *toast_decompress_datum_slice(struct varlena *attr, int32 slicelength); + +/* ---------- + * detoast_external_attr - + * + * Public entry point to get back a toasted value from + * external source (possibly still in compressed format). + * + * This will return a datum that contains all the data internally, ie, not + * relying on external storage or memory, but it can still be compressed or + * have a short header. Note some callers assume that if the input is an + * EXTERNAL datum, the result will be a pfree'able chunk. + * ---------- + */ +struct varlena * +detoast_external_attr(struct varlena *attr) +{ + struct varlena *result; + + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + /* + * This is an external stored plain value + */ + result = toast_fetch_datum(attr); + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + /* + * This is an indirect pointer --- dereference it + */ + struct varatt_indirect redirect; + + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + attr = (struct varlena *) redirect.pointer; + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr)); + + /* recurse if value is still external in some other way */ + if (VARATT_IS_EXTERNAL(attr)) + return detoast_external_attr(attr); + + /* + * Copy into the caller's memory context, in case caller tries to + * pfree the result. + */ + result = (struct varlena *) palloc(VARSIZE_ANY(attr)); + memcpy(result, attr, VARSIZE_ANY(attr)); + } + else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) + { + /* + * This is an expanded-object pointer --- get flat format + */ + ExpandedObjectHeader *eoh; + Size resultsize; + + eoh = DatumGetEOHP(PointerGetDatum(attr)); + resultsize = EOH_get_flat_size(eoh); + result = (struct varlena *) palloc(resultsize); + EOH_flatten_into(eoh, (void *) result, resultsize); + } + else + { + /* + * This is a plain value inside of the main tuple - why am I called? + */ + result = attr; + } + + return result; +} + + +/* ---------- + * detoast_attr - + * + * Public entry point to get back a toasted value from compression + * or external storage. The result is always non-extended varlena form. + * + * Note some callers assume that if the input is an EXTERNAL or COMPRESSED + * datum, the result will be a pfree'able chunk. + * ---------- + */ +struct varlena * +detoast_attr(struct varlena *attr) +{ + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + /* + * This is an externally stored datum --- fetch it back from there + */ + attr = toast_fetch_datum(attr); + /* If it's compressed, decompress it */ + if (VARATT_IS_COMPRESSED(attr)) + { + struct varlena *tmp = attr; + + attr = toast_decompress_datum(tmp); + pfree(tmp); + } + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + /* + * This is an indirect pointer --- dereference it + */ + struct varatt_indirect redirect; + + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + attr = (struct varlena *) redirect.pointer; + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr)); + + /* recurse in case value is still extended in some other way */ + attr = detoast_attr(attr); + + /* if it isn't, we'd better copy it */ + if (attr == (struct varlena *) redirect.pointer) + { + struct varlena *result; + + result = (struct varlena *) palloc(VARSIZE_ANY(attr)); + memcpy(result, attr, VARSIZE_ANY(attr)); + attr = result; + } + } + else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) + { + /* + * This is an expanded-object pointer --- get flat format + */ + attr = detoast_external_attr(attr); + /* flatteners are not allowed to produce compressed/short output */ + Assert(!VARATT_IS_EXTENDED(attr)); + } + else if (VARATT_IS_COMPRESSED(attr)) + { + /* + * This is a compressed value inside of the main tuple + */ + attr = toast_decompress_datum(attr); + } + else if (VARATT_IS_SHORT(attr)) + { + /* + * This is a short-header varlena --- convert to 4-byte header format + */ + Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT; + Size new_size = data_size + VARHDRSZ; + struct varlena *new_attr; + + new_attr = (struct varlena *) palloc(new_size); + SET_VARSIZE(new_attr, new_size); + memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size); + attr = new_attr; + } + + return attr; +} + + +/* ---------- + * detoast_attr_slice - + * + * Public entry point to get back part of a toasted value + * from compression or external storage. + * + * sliceoffset is where to start (zero or more) + * If slicelength < 0, return everything beyond sliceoffset + * ---------- + */ +struct varlena * +detoast_attr_slice(struct varlena *attr, + int32 sliceoffset, int32 slicelength) +{ + struct varlena *preslice; + struct varlena *result; + char *attrdata; + int32 slicelimit; + int32 attrsize; + + if (sliceoffset < 0) + elog(ERROR, "invalid sliceoffset: %d", sliceoffset); + + /* + * Compute slicelimit = offset + length, or -1 if we must fetch all of the + * value. In case of integer overflow, we must fetch all. + */ + if (slicelength < 0) + slicelimit = -1; + else if (pg_add_s32_overflow(sliceoffset, slicelength, &slicelimit)) + slicelength = slicelimit = -1; + + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + struct varatt_external toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + /* fast path for non-compressed external datums */ + if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + return toast_fetch_datum_slice(attr, sliceoffset, slicelength); + + /* + * For compressed values, we need to fetch enough slices to decompress + * at least the requested part (when a prefix is requested). + * Otherwise, just fetch all slices. + */ + if (slicelimit >= 0) + { + int32 max_size = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + + /* + * Determine maximum amount of compressed data needed for a prefix + * of a given length (after decompression). + * + * At least for now, if it's LZ4 data, we'll have to fetch the + * whole thing, because there doesn't seem to be an API call to + * determine how much compressed data we need to be sure of being + * able to decompress the required slice. + */ + if (VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer) == + TOAST_PGLZ_COMPRESSION_ID) + max_size = pglz_maximum_compressed_size(slicelimit, max_size); + + /* + * Fetch enough compressed slices (compressed marker will get set + * automatically). + */ + preslice = toast_fetch_datum_slice(attr, 0, max_size); + } + else + preslice = toast_fetch_datum(attr); + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + struct varatt_indirect redirect; + + VARATT_EXTERNAL_GET_POINTER(redirect, attr); + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(redirect.pointer)); + + return detoast_attr_slice(redirect.pointer, + sliceoffset, slicelength); + } + else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) + { + /* pass it off to detoast_external_attr to flatten */ + preslice = detoast_external_attr(attr); + } + else + preslice = attr; + + Assert(!VARATT_IS_EXTERNAL(preslice)); + + if (VARATT_IS_COMPRESSED(preslice)) + { + struct varlena *tmp = preslice; + + /* Decompress enough to encompass the slice and the offset */ + if (slicelimit >= 0) + preslice = toast_decompress_datum_slice(tmp, slicelimit); + else + preslice = toast_decompress_datum(tmp); + + if (tmp != attr) + pfree(tmp); + } + + if (VARATT_IS_SHORT(preslice)) + { + attrdata = VARDATA_SHORT(preslice); + attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT; + } + else + { + attrdata = VARDATA(preslice); + attrsize = VARSIZE(preslice) - VARHDRSZ; + } + + /* slicing of datum for compressed cases and plain value */ + + if (sliceoffset >= attrsize) + { + sliceoffset = 0; + slicelength = 0; + } + else if (slicelength < 0 || slicelimit > attrsize) + slicelength = attrsize - sliceoffset; + + result = (struct varlena *) palloc(slicelength + VARHDRSZ); + SET_VARSIZE(result, slicelength + VARHDRSZ); + + memcpy(VARDATA(result), attrdata + sliceoffset, slicelength); + + if (preslice != attr) + pfree(preslice); + + return result; +} + +/* ---------- + * toast_fetch_datum - + * + * Reconstruct an in memory Datum from the chunks saved + * in the toast relation + * ---------- + */ +static struct varlena * +toast_fetch_datum(struct varlena *attr) +{ + Relation toastrel; + struct varlena *result; + struct varatt_external toast_pointer; + int32 attrsize; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums"); + + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + + result = (struct varlena *) palloc(attrsize + VARHDRSZ); + + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ); + else + SET_VARSIZE(result, attrsize + VARHDRSZ); + + if (attrsize == 0) + return result; /* Probably shouldn't happen, but just in + * case. */ + + /* + * Open the toast relation and its indexes + */ + toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock); + + /* Fetch all chunks */ + table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, + attrsize, 0, attrsize, result); + + /* Close toast table */ + table_close(toastrel, AccessShareLock); + + return result; +} + +/* ---------- + * toast_fetch_datum_slice - + * + * Reconstruct a segment of a Datum from the chunks saved + * in the toast relation + * + * Note that this function supports non-compressed external datums + * and compressed external datums (in which case the requested slice + * has to be a prefix, i.e. sliceoffset has to be 0). + * ---------- + */ +static struct varlena * +toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, + int32 slicelength) +{ + Relation toastrel; + struct varlena *result; + struct varatt_external toast_pointer; + int32 attrsize; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + elog(ERROR, "toast_fetch_datum_slice shouldn't be called for non-ondisk datums"); + + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + /* + * It's nonsense to fetch slices of a compressed datum unless when it's a + * prefix -- this isn't lo_* we can't return a compressed datum which is + * meaningful to toast later. + */ + Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) || 0 == sliceoffset); + + attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + + if (sliceoffset >= attrsize) + { + sliceoffset = 0; + slicelength = 0; + } + + /* + * When fetching a prefix of a compressed external datum, account for the + * space required by va_tcinfo, which is stored at the beginning as an + * int32 value. + */ + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) && slicelength > 0) + slicelength = slicelength + sizeof(int32); + + /* + * Adjust length request if needed. (Note: our sole caller, + * detoast_attr_slice, protects us against sliceoffset + slicelength + * overflowing.) + */ + if (((sliceoffset + slicelength) > attrsize) || slicelength < 0) + slicelength = attrsize - sliceoffset; + + result = (struct varlena *) palloc(slicelength + VARHDRSZ); + + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + SET_VARSIZE_COMPRESSED(result, slicelength + VARHDRSZ); + else + SET_VARSIZE(result, slicelength + VARHDRSZ); + + if (slicelength == 0) + return result; /* Can save a lot of work at this point! */ + + /* Open the toast relation */ + toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock); + + /* Fetch all chunks */ + table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, + attrsize, sliceoffset, slicelength, + result); + + /* Close toast table */ + table_close(toastrel, AccessShareLock); + + return result; +} + +/* ---------- + * toast_decompress_datum - + * + * Decompress a compressed version of a varlena datum + */ +static struct varlena * +toast_decompress_datum(struct varlena *attr) +{ + ToastCompressionId cmid; + + Assert(VARATT_IS_COMPRESSED(attr)); + + /* + * Fetch the compression method id stored in the compression header and + * decompress the data using the appropriate decompression routine. + */ + cmid = TOAST_COMPRESS_METHOD(attr); + switch (cmid) + { + case TOAST_PGLZ_COMPRESSION_ID: + return pglz_decompress_datum(attr); + case TOAST_LZ4_COMPRESSION_ID: + return lz4_decompress_datum(attr); + default: + elog(ERROR, "invalid compression method id %d", cmid); + return NULL; /* keep compiler quiet */ + } +} + + +/* ---------- + * toast_decompress_datum_slice - + * + * Decompress the front of a compressed version of a varlena datum. + * offset handling happens in detoast_attr_slice. + * Here we just decompress a slice from the front. + */ +static struct varlena * +toast_decompress_datum_slice(struct varlena *attr, int32 slicelength) +{ + ToastCompressionId cmid; + + Assert(VARATT_IS_COMPRESSED(attr)); + + /* + * Some callers may pass a slicelength that's more than the actual + * decompressed size. If so, just decompress normally. This avoids + * possibly allocating a larger-than-necessary result object, and may be + * faster and/or more robust as well. Notably, some versions of liblz4 + * have been seen to give wrong results if passed an output size that is + * more than the data's true decompressed size. + */ + if ((uint32) slicelength >= TOAST_COMPRESS_EXTSIZE(attr)) + return toast_decompress_datum(attr); + + /* + * Fetch the compression method id stored in the compression header and + * decompress the data slice using the appropriate decompression routine. + */ + cmid = TOAST_COMPRESS_METHOD(attr); + switch (cmid) + { + case TOAST_PGLZ_COMPRESSION_ID: + return pglz_decompress_datum_slice(attr, slicelength); + case TOAST_LZ4_COMPRESSION_ID: + return lz4_decompress_datum_slice(attr, slicelength); + default: + elog(ERROR, "invalid compression method id %d", cmid); + return NULL; /* keep compiler quiet */ + } +} + +/* ---------- + * toast_raw_datum_size - + * + * Return the raw (detoasted) size of a varlena datum + * (including the VARHDRSZ header) + * ---------- + */ +Size +toast_raw_datum_size(Datum value) +{ + struct varlena *attr = (struct varlena *) DatumGetPointer(value); + Size result; + + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + /* va_rawsize is the size of the original datum -- including header */ + struct varatt_external toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + result = toast_pointer.va_rawsize; + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + struct varatt_indirect toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(toast_pointer.pointer)); + + return toast_raw_datum_size(PointerGetDatum(toast_pointer.pointer)); + } + else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) + { + result = EOH_get_flat_size(DatumGetEOHP(value)); + } + else if (VARATT_IS_COMPRESSED(attr)) + { + /* here, va_rawsize is just the payload size */ + result = VARDATA_COMPRESSED_GET_EXTSIZE(attr) + VARHDRSZ; + } + else if (VARATT_IS_SHORT(attr)) + { + /* + * we have to normalize the header length to VARHDRSZ or else the + * callers of this function will be confused. + */ + result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ; + } + else + { + /* plain untoasted datum */ + result = VARSIZE(attr); + } + return result; +} + +/* ---------- + * toast_datum_size + * + * Return the physical storage size (possibly compressed) of a varlena datum + * ---------- + */ +Size +toast_datum_size(Datum value) +{ + struct varlena *attr = (struct varlena *) DatumGetPointer(value); + Size result; + + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + /* + * Attribute is stored externally - return the extsize whether + * compressed or not. We do not count the size of the toast pointer + * ... should we? + */ + struct varatt_external toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + result = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer); + } + else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) + { + struct varatt_indirect toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + /* nested indirect Datums aren't allowed */ + Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr)); + + return toast_datum_size(PointerGetDatum(toast_pointer.pointer)); + } + else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) + { + result = EOH_get_flat_size(DatumGetEOHP(value)); + } + else if (VARATT_IS_SHORT(attr)) + { + result = VARSIZE_SHORT(attr); + } + else + { + /* + * Attribute is stored inline either compressed or not, just calculate + * the size of the datum in either case. + */ + result = VARSIZE(attr); + } + return result; +} diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c new file mode 100644 index 0000000..0b56b0f --- /dev/null +++ b/src/backend/access/common/heaptuple.c @@ -0,0 +1,1501 @@ +/*------------------------------------------------------------------------- + * + * heaptuple.c + * This file contains heap tuple accessor and mutator routines, as well + * as various tuple utilities. + * + * Some notes about varlenas and this code: + * + * Before Postgres 8.3 varlenas always had a 4-byte length header, and + * therefore always needed 4-byte alignment (at least). This wasted space + * for short varlenas, for example CHAR(1) took 5 bytes and could need up to + * 3 additional padding bytes for alignment. + * + * Now, a short varlena (up to 126 data bytes) is reduced to a 1-byte header + * and we don't align it. To hide this from datatype-specific functions that + * don't want to deal with it, such a datum is considered "toasted" and will + * be expanded back to the normal 4-byte-header format by pg_detoast_datum. + * (In performance-critical code paths we can use pg_detoast_datum_packed + * and the appropriate access macros to avoid that overhead.) Note that this + * conversion is performed directly in heap_form_tuple, without invoking + * heaptoast.c. + * + * This change will break any code that assumes it needn't detoast values + * that have been put into a tuple but never sent to disk. Hopefully there + * are few such places. + * + * Varlenas still have alignment INT (or DOUBLE) in pg_type/pg_attribute, since + * that's the normal requirement for the untoasted format. But we ignore that + * for the 1-byte-header format. This means that the actual start position + * of a varlena datum may vary depending on which format it has. To determine + * what is stored, we have to require that alignment padding bytes be zero. + * (Postgres actually has always zeroed them, but now it's required!) Since + * the first byte of a 1-byte-header varlena can never be zero, we can examine + * the first byte after the previous datum to tell if it's a pad byte or the + * start of a 1-byte-header varlena. + * + * Note that while formerly we could rely on the first varlena column of a + * system catalog to be at the offset suggested by the C struct for the + * catalog, this is now risky: it's only safe if the preceding field is + * word-aligned, so that there will never be any padding. + * + * We don't pack varlenas whose attstorage is PLAIN, since the data type + * isn't expecting to have to detoast values. This is used in particular + * by oidvector and int2vector, which are used in the system catalogs + * and we'd like to still refer to them via C struct offsets. + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/heaptuple.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heaptoast.h" +#include "access/sysattr.h" +#include "access/tupdesc_details.h" +#include "executor/tuptable.h" +#include "utils/expandeddatum.h" + + +/* Does att's datatype allow packing into the 1-byte-header varlena format? */ +#define ATT_IS_PACKABLE(att) \ + ((att)->attlen == -1 && (att)->attstorage != TYPSTORAGE_PLAIN) +/* Use this if it's already known varlena */ +#define VARLENA_ATT_IS_PACKABLE(att) \ + ((att)->attstorage != TYPSTORAGE_PLAIN) + + +/* ---------------------------------------------------------------- + * misc support routines + * ---------------------------------------------------------------- + */ + +/* + * Return the missing value of an attribute, or NULL if there isn't one. + */ +Datum +getmissingattr(TupleDesc tupleDesc, + int attnum, bool *isnull) +{ + Form_pg_attribute att; + + Assert(attnum <= tupleDesc->natts); + Assert(attnum > 0); + + att = TupleDescAttr(tupleDesc, attnum - 1); + + if (att->atthasmissing) + { + AttrMissing *attrmiss; + + Assert(tupleDesc->constr); + Assert(tupleDesc->constr->missing); + + attrmiss = tupleDesc->constr->missing + (attnum - 1); + + if (attrmiss->am_present) + { + *isnull = false; + return attrmiss->am_value; + } + } + + *isnull = true; + return PointerGetDatum(NULL); +} + +/* + * heap_compute_data_size + * Determine size of the data area of a tuple to be constructed + */ +Size +heap_compute_data_size(TupleDesc tupleDesc, + Datum *values, + bool *isnull) +{ + Size data_length = 0; + int i; + int numberOfAttributes = tupleDesc->natts; + + for (i = 0; i < numberOfAttributes; i++) + { + Datum val; + Form_pg_attribute atti; + + if (isnull[i]) + continue; + + val = values[i]; + atti = TupleDescAttr(tupleDesc, i); + + if (ATT_IS_PACKABLE(atti) && + VARATT_CAN_MAKE_SHORT(DatumGetPointer(val))) + { + /* + * we're anticipating converting to a short varlena header, so + * adjust length and don't count any alignment + */ + data_length += VARATT_CONVERTED_SHORT_SIZE(DatumGetPointer(val)); + } + else if (atti->attlen == -1 && + VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val))) + { + /* + * we want to flatten the expanded value so that the constructed + * tuple doesn't depend on it + */ + data_length = att_align_nominal(data_length, atti->attalign); + data_length += EOH_get_flat_size(DatumGetEOHP(val)); + } + else + { + data_length = att_align_datum(data_length, atti->attalign, + atti->attlen, val); + data_length = att_addlength_datum(data_length, atti->attlen, + val); + } + } + + return data_length; +} + +/* + * Per-attribute helper for heap_fill_tuple and other routines building tuples. + * + * Fill in either a data value or a bit in the null bitmask + */ +static inline void +fill_val(Form_pg_attribute att, + bits8 **bit, + int *bitmask, + char **dataP, + uint16 *infomask, + Datum datum, + bool isnull) +{ + Size data_length; + char *data = *dataP; + + /* + * If we're building a null bitmap, set the appropriate bit for the + * current column value here. + */ + if (bit != NULL) + { + if (*bitmask != HIGHBIT) + *bitmask <<= 1; + else + { + *bit += 1; + **bit = 0x0; + *bitmask = 1; + } + + if (isnull) + { + *infomask |= HEAP_HASNULL; + return; + } + + **bit |= *bitmask; + } + + /* + * XXX we use the att_align macros on the pointer value itself, not on an + * offset. This is a bit of a hack. + */ + if (att->attbyval) + { + /* pass-by-value */ + data = (char *) att_align_nominal(data, att->attalign); + store_att_byval(data, datum, att->attlen); + data_length = att->attlen; + } + else if (att->attlen == -1) + { + /* varlena */ + Pointer val = DatumGetPointer(datum); + + *infomask |= HEAP_HASVARWIDTH; + if (VARATT_IS_EXTERNAL(val)) + { + if (VARATT_IS_EXTERNAL_EXPANDED(val)) + { + /* + * we want to flatten the expanded value so that the + * constructed tuple doesn't depend on it + */ + ExpandedObjectHeader *eoh = DatumGetEOHP(datum); + + data = (char *) att_align_nominal(data, + att->attalign); + data_length = EOH_get_flat_size(eoh); + EOH_flatten_into(eoh, data, data_length); + } + else + { + *infomask |= HEAP_HASEXTERNAL; + /* no alignment, since it's short by definition */ + data_length = VARSIZE_EXTERNAL(val); + memcpy(data, val, data_length); + } + } + else if (VARATT_IS_SHORT(val)) + { + /* no alignment for short varlenas */ + data_length = VARSIZE_SHORT(val); + memcpy(data, val, data_length); + } + else if (VARLENA_ATT_IS_PACKABLE(att) && + VARATT_CAN_MAKE_SHORT(val)) + { + /* convert to short varlena -- no alignment */ + data_length = VARATT_CONVERTED_SHORT_SIZE(val); + SET_VARSIZE_SHORT(data, data_length); + memcpy(data + 1, VARDATA(val), data_length - 1); + } + else + { + /* full 4-byte header varlena */ + data = (char *) att_align_nominal(data, + att->attalign); + data_length = VARSIZE(val); + memcpy(data, val, data_length); + } + } + else if (att->attlen == -2) + { + /* cstring ... never needs alignment */ + *infomask |= HEAP_HASVARWIDTH; + Assert(att->attalign == TYPALIGN_CHAR); + data_length = strlen(DatumGetCString(datum)) + 1; + memcpy(data, DatumGetPointer(datum), data_length); + } + else + { + /* fixed-length pass-by-reference */ + data = (char *) att_align_nominal(data, att->attalign); + Assert(att->attlen > 0); + data_length = att->attlen; + memcpy(data, DatumGetPointer(datum), data_length); + } + + data += data_length; + *dataP = data; +} + +/* + * heap_fill_tuple + * Load data portion of a tuple from values/isnull arrays + * + * We also fill the null bitmap (if any) and set the infomask bits + * that reflect the tuple's data contents. + * + * NOTE: it is now REQUIRED that the caller have pre-zeroed the data area. + */ +void +heap_fill_tuple(TupleDesc tupleDesc, + Datum *values, bool *isnull, + char *data, Size data_size, + uint16 *infomask, bits8 *bit) +{ + bits8 *bitP; + int bitmask; + int i; + int numberOfAttributes = tupleDesc->natts; + +#ifdef USE_ASSERT_CHECKING + char *start = data; +#endif + + if (bit != NULL) + { + bitP = &bit[-1]; + bitmask = HIGHBIT; + } + else + { + /* just to keep compiler quiet */ + bitP = NULL; + bitmask = 0; + } + + *infomask &= ~(HEAP_HASNULL | HEAP_HASVARWIDTH | HEAP_HASEXTERNAL); + + for (i = 0; i < numberOfAttributes; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupleDesc, i); + + fill_val(attr, + bitP ? &bitP : NULL, + &bitmask, + &data, + infomask, + values ? values[i] : PointerGetDatum(NULL), + isnull ? isnull[i] : true); + } + + Assert((data - start) == data_size); +} + + +/* ---------------------------------------------------------------- + * heap tuple interface + * ---------------------------------------------------------------- + */ + +/* ---------------- + * heap_attisnull - returns true iff tuple attribute is not present + * ---------------- + */ +bool +heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc) +{ + /* + * We allow a NULL tupledesc for relations not expected to have missing + * values, such as catalog relations and indexes. + */ + Assert(!tupleDesc || attnum <= tupleDesc->natts); + if (attnum > (int) HeapTupleHeaderGetNatts(tup->t_data)) + { + if (tupleDesc && TupleDescAttr(tupleDesc, attnum - 1)->atthasmissing) + return false; + else + return true; + } + + if (attnum > 0) + { + if (HeapTupleNoNulls(tup)) + return false; + return att_isnull(attnum - 1, tup->t_data->t_bits); + } + + switch (attnum) + { + case TableOidAttributeNumber: + case SelfItemPointerAttributeNumber: + case MinTransactionIdAttributeNumber: + case MinCommandIdAttributeNumber: + case MaxTransactionIdAttributeNumber: + case MaxCommandIdAttributeNumber: + /* these are never null */ + break; + + default: + elog(ERROR, "invalid attnum: %d", attnum); + } + + return false; +} + +/* ---------------- + * nocachegetattr + * + * This only gets called from fastgetattr() macro, in cases where + * we can't use a cacheoffset and the value is not null. + * + * This caches attribute offsets in the attribute descriptor. + * + * An alternative way to speed things up would be to cache offsets + * with the tuple, but that seems more difficult unless you take + * the storage hit of actually putting those offsets into the + * tuple you send to disk. Yuck. + * + * This scheme will be slightly slower than that, but should + * perform well for queries which hit large #'s of tuples. After + * you cache the offsets once, examining all the other tuples using + * the same attribute descriptor will go much quicker. -cim 5/4/91 + * + * NOTE: if you need to change this code, see also heap_deform_tuple. + * Also see nocache_index_getattr, which is the same code for index + * tuples. + * ---------------- + */ +Datum +nocachegetattr(HeapTuple tuple, + int attnum, + TupleDesc tupleDesc) +{ + HeapTupleHeader tup = tuple->t_data; + char *tp; /* ptr to data part of tuple */ + bits8 *bp = tup->t_bits; /* ptr to null bitmap in tuple */ + bool slow = false; /* do we have to walk attrs? */ + int off; /* current offset within data */ + + /* ---------------- + * Three cases: + * + * 1: No nulls and no variable-width attributes. + * 2: Has a null or a var-width AFTER att. + * 3: Has nulls or var-widths BEFORE att. + * ---------------- + */ + + attnum--; + + if (!HeapTupleNoNulls(tuple)) + { + /* + * there's a null somewhere in the tuple + * + * check to see if any preceding bits are null... + */ + int byte = attnum >> 3; + int finalbit = attnum & 0x07; + + /* check for nulls "before" final bit of last byte */ + if ((~bp[byte]) & ((1 << finalbit) - 1)) + slow = true; + else + { + /* check for nulls in any "earlier" bytes */ + int i; + + for (i = 0; i < byte; i++) + { + if (bp[i] != 0xFF) + { + slow = true; + break; + } + } + } + } + + tp = (char *) tup + tup->t_hoff; + + if (!slow) + { + Form_pg_attribute att; + + /* + * If we get here, there are no nulls up to and including the target + * attribute. If we have a cached offset, we can use it. + */ + att = TupleDescAttr(tupleDesc, attnum); + if (att->attcacheoff >= 0) + return fetchatt(att, tp + att->attcacheoff); + + /* + * Otherwise, check for non-fixed-length attrs up to and including + * target. If there aren't any, it's safe to cheaply initialize the + * cached offsets for these attrs. + */ + if (HeapTupleHasVarWidth(tuple)) + { + int j; + + for (j = 0; j <= attnum; j++) + { + if (TupleDescAttr(tupleDesc, j)->attlen <= 0) + { + slow = true; + break; + } + } + } + } + + if (!slow) + { + int natts = tupleDesc->natts; + int j = 1; + + /* + * If we get here, we have a tuple with no nulls or var-widths up to + * and including the target attribute, so we can use the cached offset + * ... only we don't have it yet, or we'd not have got here. Since + * it's cheap to compute offsets for fixed-width columns, we take the + * opportunity to initialize the cached offsets for *all* the leading + * fixed-width columns, in hope of avoiding future visits to this + * routine. + */ + TupleDescAttr(tupleDesc, 0)->attcacheoff = 0; + + /* we might have set some offsets in the slow path previously */ + while (j < natts && TupleDescAttr(tupleDesc, j)->attcacheoff > 0) + j++; + + off = TupleDescAttr(tupleDesc, j - 1)->attcacheoff + + TupleDescAttr(tupleDesc, j - 1)->attlen; + + for (; j < natts; j++) + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, j); + + if (att->attlen <= 0) + break; + + off = att_align_nominal(off, att->attalign); + + att->attcacheoff = off; + + off += att->attlen; + } + + Assert(j > attnum); + + off = TupleDescAttr(tupleDesc, attnum)->attcacheoff; + } + else + { + bool usecache = true; + int i; + + /* + * Now we know that we have to walk the tuple CAREFULLY. But we still + * might be able to cache some offsets for next time. + * + * Note - This loop is a little tricky. For each non-null attribute, + * we have to first account for alignment padding before the attr, + * then advance over the attr based on its length. Nulls have no + * storage and no alignment padding either. We can use/set + * attcacheoff until we reach either a null or a var-width attribute. + */ + off = 0; + for (i = 0;; i++) /* loop exit is at "break" */ + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, i); + + if (HeapTupleHasNulls(tuple) && att_isnull(i, bp)) + { + usecache = false; + continue; /* this cannot be the target att */ + } + + /* If we know the next offset, we can skip the rest */ + if (usecache && att->attcacheoff >= 0) + off = att->attcacheoff; + else if (att->attlen == -1) + { + /* + * We can only cache the offset for a varlena attribute if the + * offset is already suitably aligned, so that there would be + * no pad bytes in any case: then the offset will be valid for + * either an aligned or unaligned value. + */ + if (usecache && + off == att_align_nominal(off, att->attalign)) + att->attcacheoff = off; + else + { + off = att_align_pointer(off, att->attalign, -1, + tp + off); + usecache = false; + } + } + else + { + /* not varlena, so safe to use att_align_nominal */ + off = att_align_nominal(off, att->attalign); + + if (usecache) + att->attcacheoff = off; + } + + if (i == attnum) + break; + + off = att_addlength_pointer(off, att->attlen, tp + off); + + if (usecache && att->attlen <= 0) + usecache = false; + } + } + + return fetchatt(TupleDescAttr(tupleDesc, attnum), tp + off); +} + +/* ---------------- + * heap_getsysattr + * + * Fetch the value of a system attribute for a tuple. + * + * This is a support routine for the heap_getattr macro. The macro + * has already determined that the attnum refers to a system attribute. + * ---------------- + */ +Datum +heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull) +{ + Datum result; + + Assert(tup); + + /* Currently, no sys attribute ever reads as NULL. */ + *isnull = false; + + switch (attnum) + { + case SelfItemPointerAttributeNumber: + /* pass-by-reference datatype */ + result = PointerGetDatum(&(tup->t_self)); + break; + case MinTransactionIdAttributeNumber: + result = TransactionIdGetDatum(HeapTupleHeaderGetRawXmin(tup->t_data)); + break; + case MaxTransactionIdAttributeNumber: + result = TransactionIdGetDatum(HeapTupleHeaderGetRawXmax(tup->t_data)); + break; + case MinCommandIdAttributeNumber: + case MaxCommandIdAttributeNumber: + + /* + * cmin and cmax are now both aliases for the same field, which + * can in fact also be a combo command id. XXX perhaps we should + * return the "real" cmin or cmax if possible, that is if we are + * inside the originating transaction? + */ + result = CommandIdGetDatum(HeapTupleHeaderGetRawCommandId(tup->t_data)); + break; + case TableOidAttributeNumber: + result = ObjectIdGetDatum(tup->t_tableOid); + break; + default: + elog(ERROR, "invalid attnum: %d", attnum); + result = 0; /* keep compiler quiet */ + break; + } + return result; +} + +/* ---------------- + * heap_copytuple + * + * returns a copy of an entire tuple + * + * The HeapTuple struct, tuple header, and tuple data are all allocated + * as a single palloc() block. + * ---------------- + */ +HeapTuple +heap_copytuple(HeapTuple tuple) +{ + HeapTuple newTuple; + + if (!HeapTupleIsValid(tuple) || tuple->t_data == NULL) + return NULL; + + newTuple = (HeapTuple) palloc(HEAPTUPLESIZE + tuple->t_len); + newTuple->t_len = tuple->t_len; + newTuple->t_self = tuple->t_self; + newTuple->t_tableOid = tuple->t_tableOid; + newTuple->t_data = (HeapTupleHeader) ((char *) newTuple + HEAPTUPLESIZE); + memcpy((char *) newTuple->t_data, (char *) tuple->t_data, tuple->t_len); + return newTuple; +} + +/* ---------------- + * heap_copytuple_with_tuple + * + * copy a tuple into a caller-supplied HeapTuple management struct + * + * Note that after calling this function, the "dest" HeapTuple will not be + * allocated as a single palloc() block (unlike with heap_copytuple()). + * ---------------- + */ +void +heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest) +{ + if (!HeapTupleIsValid(src) || src->t_data == NULL) + { + dest->t_data = NULL; + return; + } + + dest->t_len = src->t_len; + dest->t_self = src->t_self; + dest->t_tableOid = src->t_tableOid; + dest->t_data = (HeapTupleHeader) palloc(src->t_len); + memcpy((char *) dest->t_data, (char *) src->t_data, src->t_len); +} + +/* + * Expand a tuple which has fewer attributes than required. For each attribute + * not present in the sourceTuple, if there is a missing value that will be + * used. Otherwise the attribute will be set to NULL. + * + * The source tuple must have fewer attributes than the required number. + * + * Only one of targetHeapTuple and targetMinimalTuple may be supplied. The + * other argument must be NULL. + */ +static void +expand_tuple(HeapTuple *targetHeapTuple, + MinimalTuple *targetMinimalTuple, + HeapTuple sourceTuple, + TupleDesc tupleDesc) +{ + AttrMissing *attrmiss = NULL; + int attnum; + int firstmissingnum; + bool hasNulls = HeapTupleHasNulls(sourceTuple); + HeapTupleHeader targetTHeader; + HeapTupleHeader sourceTHeader = sourceTuple->t_data; + int sourceNatts = HeapTupleHeaderGetNatts(sourceTHeader); + int natts = tupleDesc->natts; + int sourceNullLen; + int targetNullLen; + Size sourceDataLen = sourceTuple->t_len - sourceTHeader->t_hoff; + Size targetDataLen; + Size len; + int hoff; + bits8 *nullBits = NULL; + int bitMask = 0; + char *targetData; + uint16 *infoMask; + + Assert((targetHeapTuple && !targetMinimalTuple) + || (!targetHeapTuple && targetMinimalTuple)); + + Assert(sourceNatts < natts); + + sourceNullLen = (hasNulls ? BITMAPLEN(sourceNatts) : 0); + + targetDataLen = sourceDataLen; + + if (tupleDesc->constr && + tupleDesc->constr->missing) + { + /* + * If there are missing values we want to put them into the tuple. + * Before that we have to compute the extra length for the values + * array and the variable length data. + */ + attrmiss = tupleDesc->constr->missing; + + /* + * Find the first item in attrmiss for which we don't have a value in + * the source. We can ignore all the missing entries before that. + */ + for (firstmissingnum = sourceNatts; + firstmissingnum < natts; + firstmissingnum++) + { + if (attrmiss[firstmissingnum].am_present) + break; + else + hasNulls = true; + } + + /* + * Now walk the missing attributes. If there is a missing value make + * space for it. Otherwise, it's going to be NULL. + */ + for (attnum = firstmissingnum; + attnum < natts; + attnum++) + { + if (attrmiss[attnum].am_present) + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, attnum); + + targetDataLen = att_align_datum(targetDataLen, + att->attalign, + att->attlen, + attrmiss[attnum].am_value); + + targetDataLen = att_addlength_pointer(targetDataLen, + att->attlen, + attrmiss[attnum].am_value); + } + else + { + /* no missing value, so it must be null */ + hasNulls = true; + } + } + } /* end if have missing values */ + else + { + /* + * If there are no missing values at all then NULLS must be allowed, + * since some of the attributes are known to be absent. + */ + hasNulls = true; + } + + len = 0; + + if (hasNulls) + { + targetNullLen = BITMAPLEN(natts); + len += targetNullLen; + } + else + targetNullLen = 0; + + /* + * Allocate and zero the space needed. Note that the tuple body and + * HeapTupleData management structure are allocated in one chunk. + */ + if (targetHeapTuple) + { + len += offsetof(HeapTupleHeaderData, t_bits); + hoff = len = MAXALIGN(len); /* align user data safely */ + len += targetDataLen; + + *targetHeapTuple = (HeapTuple) palloc0(HEAPTUPLESIZE + len); + (*targetHeapTuple)->t_data + = targetTHeader + = (HeapTupleHeader) ((char *) *targetHeapTuple + HEAPTUPLESIZE); + (*targetHeapTuple)->t_len = len; + (*targetHeapTuple)->t_tableOid = sourceTuple->t_tableOid; + (*targetHeapTuple)->t_self = sourceTuple->t_self; + + targetTHeader->t_infomask = sourceTHeader->t_infomask; + targetTHeader->t_hoff = hoff; + HeapTupleHeaderSetNatts(targetTHeader, natts); + HeapTupleHeaderSetDatumLength(targetTHeader, len); + HeapTupleHeaderSetTypeId(targetTHeader, tupleDesc->tdtypeid); + HeapTupleHeaderSetTypMod(targetTHeader, tupleDesc->tdtypmod); + /* We also make sure that t_ctid is invalid unless explicitly set */ + ItemPointerSetInvalid(&(targetTHeader->t_ctid)); + if (targetNullLen > 0) + nullBits = (bits8 *) ((char *) (*targetHeapTuple)->t_data + + offsetof(HeapTupleHeaderData, t_bits)); + targetData = (char *) (*targetHeapTuple)->t_data + hoff; + infoMask = &(targetTHeader->t_infomask); + } + else + { + len += SizeofMinimalTupleHeader; + hoff = len = MAXALIGN(len); /* align user data safely */ + len += targetDataLen; + + *targetMinimalTuple = (MinimalTuple) palloc0(len); + (*targetMinimalTuple)->t_len = len; + (*targetMinimalTuple)->t_hoff = hoff + MINIMAL_TUPLE_OFFSET; + (*targetMinimalTuple)->t_infomask = sourceTHeader->t_infomask; + /* Same macro works for MinimalTuples */ + HeapTupleHeaderSetNatts(*targetMinimalTuple, natts); + if (targetNullLen > 0) + nullBits = (bits8 *) ((char *) *targetMinimalTuple + + offsetof(MinimalTupleData, t_bits)); + targetData = (char *) *targetMinimalTuple + hoff; + infoMask = &((*targetMinimalTuple)->t_infomask); + } + + if (targetNullLen > 0) + { + if (sourceNullLen > 0) + { + /* if bitmap pre-existed copy in - all is set */ + memcpy(nullBits, + ((char *) sourceTHeader) + + offsetof(HeapTupleHeaderData, t_bits), + sourceNullLen); + nullBits += sourceNullLen - 1; + } + else + { + sourceNullLen = BITMAPLEN(sourceNatts); + /* Set NOT NULL for all existing attributes */ + memset(nullBits, 0xff, sourceNullLen); + + nullBits += sourceNullLen - 1; + + if (sourceNatts & 0x07) + { + /* build the mask (inverted!) */ + bitMask = 0xff << (sourceNatts & 0x07); + /* Voila */ + *nullBits = ~bitMask; + } + } + + bitMask = (1 << ((sourceNatts - 1) & 0x07)); + } /* End if have null bitmap */ + + memcpy(targetData, + ((char *) sourceTuple->t_data) + sourceTHeader->t_hoff, + sourceDataLen); + + targetData += sourceDataLen; + + /* Now fill in the missing values */ + for (attnum = sourceNatts; attnum < natts; attnum++) + { + + Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum); + + if (attrmiss && attrmiss[attnum].am_present) + { + fill_val(attr, + nullBits ? &nullBits : NULL, + &bitMask, + &targetData, + infoMask, + attrmiss[attnum].am_value, + false); + } + else + { + fill_val(attr, + &nullBits, + &bitMask, + &targetData, + infoMask, + (Datum) 0, + true); + } + } /* end loop over missing attributes */ +} + +/* + * Fill in the missing values for a minimal HeapTuple + */ +MinimalTuple +minimal_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc) +{ + MinimalTuple minimalTuple; + + expand_tuple(NULL, &minimalTuple, sourceTuple, tupleDesc); + return minimalTuple; +} + +/* + * Fill in the missing values for an ordinary HeapTuple + */ +HeapTuple +heap_expand_tuple(HeapTuple sourceTuple, TupleDesc tupleDesc) +{ + HeapTuple heapTuple; + + expand_tuple(&heapTuple, NULL, sourceTuple, tupleDesc); + return heapTuple; +} + +/* ---------------- + * heap_copy_tuple_as_datum + * + * copy a tuple as a composite-type Datum + * ---------------- + */ +Datum +heap_copy_tuple_as_datum(HeapTuple tuple, TupleDesc tupleDesc) +{ + HeapTupleHeader td; + + /* + * If the tuple contains any external TOAST pointers, we have to inline + * those fields to meet the conventions for composite-type Datums. + */ + if (HeapTupleHasExternal(tuple)) + return toast_flatten_tuple_to_datum(tuple->t_data, + tuple->t_len, + tupleDesc); + + /* + * Fast path for easy case: just make a palloc'd copy and insert the + * correct composite-Datum header fields (since those may not be set if + * the given tuple came from disk, rather than from heap_form_tuple). + */ + td = (HeapTupleHeader) palloc(tuple->t_len); + memcpy((char *) td, (char *) tuple->t_data, tuple->t_len); + + HeapTupleHeaderSetDatumLength(td, tuple->t_len); + HeapTupleHeaderSetTypeId(td, tupleDesc->tdtypeid); + HeapTupleHeaderSetTypMod(td, tupleDesc->tdtypmod); + + return PointerGetDatum(td); +} + +/* + * heap_form_tuple + * construct a tuple from the given values[] and isnull[] arrays, + * which are of the length indicated by tupleDescriptor->natts + * + * The result is allocated in the current memory context. + */ +HeapTuple +heap_form_tuple(TupleDesc tupleDescriptor, + Datum *values, + bool *isnull) +{ + HeapTuple tuple; /* return tuple */ + HeapTupleHeader td; /* tuple data */ + Size len, + data_len; + int hoff; + bool hasnull = false; + int numberOfAttributes = tupleDescriptor->natts; + int i; + + if (numberOfAttributes > MaxTupleAttributeNumber) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of columns (%d) exceeds limit (%d)", + numberOfAttributes, MaxTupleAttributeNumber))); + + /* + * Check for nulls + */ + for (i = 0; i < numberOfAttributes; i++) + { + if (isnull[i]) + { + hasnull = true; + break; + } + } + + /* + * Determine total space needed + */ + len = offsetof(HeapTupleHeaderData, t_bits); + + if (hasnull) + len += BITMAPLEN(numberOfAttributes); + + hoff = len = MAXALIGN(len); /* align user data safely */ + + data_len = heap_compute_data_size(tupleDescriptor, values, isnull); + + len += data_len; + + /* + * Allocate and zero the space needed. Note that the tuple body and + * HeapTupleData management structure are allocated in one chunk. + */ + tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + len); + tuple->t_data = td = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); + + /* + * And fill in the information. Note we fill the Datum fields even though + * this tuple may never become a Datum. This lets HeapTupleHeaderGetDatum + * identify the tuple type if needed. + */ + tuple->t_len = len; + ItemPointerSetInvalid(&(tuple->t_self)); + tuple->t_tableOid = InvalidOid; + + HeapTupleHeaderSetDatumLength(td, len); + HeapTupleHeaderSetTypeId(td, tupleDescriptor->tdtypeid); + HeapTupleHeaderSetTypMod(td, tupleDescriptor->tdtypmod); + /* We also make sure that t_ctid is invalid unless explicitly set */ + ItemPointerSetInvalid(&(td->t_ctid)); + + HeapTupleHeaderSetNatts(td, numberOfAttributes); + td->t_hoff = hoff; + + heap_fill_tuple(tupleDescriptor, + values, + isnull, + (char *) td + hoff, + data_len, + &td->t_infomask, + (hasnull ? td->t_bits : NULL)); + + return tuple; +} + +/* + * heap_modify_tuple + * form a new tuple from an old tuple and a set of replacement values. + * + * The replValues, replIsnull, and doReplace arrays must be of the length + * indicated by tupleDesc->natts. The new tuple is constructed using the data + * from replValues/replIsnull at columns where doReplace is true, and using + * the data from the old tuple at columns where doReplace is false. + * + * The result is allocated in the current memory context. + */ +HeapTuple +heap_modify_tuple(HeapTuple tuple, + TupleDesc tupleDesc, + Datum *replValues, + bool *replIsnull, + bool *doReplace) +{ + int numberOfAttributes = tupleDesc->natts; + int attoff; + Datum *values; + bool *isnull; + HeapTuple newTuple; + + /* + * allocate and fill values and isnull arrays from either the tuple or the + * repl information, as appropriate. + * + * NOTE: it's debatable whether to use heap_deform_tuple() here or just + * heap_getattr() only the non-replaced columns. The latter could win if + * there are many replaced columns and few non-replaced ones. However, + * heap_deform_tuple costs only O(N) while the heap_getattr way would cost + * O(N^2) if there are many non-replaced columns, so it seems better to + * err on the side of linear cost. + */ + values = (Datum *) palloc(numberOfAttributes * sizeof(Datum)); + isnull = (bool *) palloc(numberOfAttributes * sizeof(bool)); + + heap_deform_tuple(tuple, tupleDesc, values, isnull); + + for (attoff = 0; attoff < numberOfAttributes; attoff++) + { + if (doReplace[attoff]) + { + values[attoff] = replValues[attoff]; + isnull[attoff] = replIsnull[attoff]; + } + } + + /* + * create a new tuple from the values and isnull arrays + */ + newTuple = heap_form_tuple(tupleDesc, values, isnull); + + pfree(values); + pfree(isnull); + + /* + * copy the identification info of the old tuple: t_ctid, t_self + */ + newTuple->t_data->t_ctid = tuple->t_data->t_ctid; + newTuple->t_self = tuple->t_self; + newTuple->t_tableOid = tuple->t_tableOid; + + return newTuple; +} + +/* + * heap_modify_tuple_by_cols + * form a new tuple from an old tuple and a set of replacement values. + * + * This is like heap_modify_tuple, except that instead of specifying which + * column(s) to replace by a boolean map, an array of target column numbers + * is used. This is often more convenient when a fixed number of columns + * are to be replaced. The replCols, replValues, and replIsnull arrays must + * be of length nCols. Target column numbers are indexed from 1. + * + * The result is allocated in the current memory context. + */ +HeapTuple +heap_modify_tuple_by_cols(HeapTuple tuple, + TupleDesc tupleDesc, + int nCols, + int *replCols, + Datum *replValues, + bool *replIsnull) +{ + int numberOfAttributes = tupleDesc->natts; + Datum *values; + bool *isnull; + HeapTuple newTuple; + int i; + + /* + * allocate and fill values and isnull arrays from the tuple, then replace + * selected columns from the input arrays. + */ + values = (Datum *) palloc(numberOfAttributes * sizeof(Datum)); + isnull = (bool *) palloc(numberOfAttributes * sizeof(bool)); + + heap_deform_tuple(tuple, tupleDesc, values, isnull); + + for (i = 0; i < nCols; i++) + { + int attnum = replCols[i]; + + if (attnum <= 0 || attnum > numberOfAttributes) + elog(ERROR, "invalid column number %d", attnum); + values[attnum - 1] = replValues[i]; + isnull[attnum - 1] = replIsnull[i]; + } + + /* + * create a new tuple from the values and isnull arrays + */ + newTuple = heap_form_tuple(tupleDesc, values, isnull); + + pfree(values); + pfree(isnull); + + /* + * copy the identification info of the old tuple: t_ctid, t_self + */ + newTuple->t_data->t_ctid = tuple->t_data->t_ctid; + newTuple->t_self = tuple->t_self; + newTuple->t_tableOid = tuple->t_tableOid; + + return newTuple; +} + +/* + * heap_deform_tuple + * Given a tuple, extract data into values/isnull arrays; this is + * the inverse of heap_form_tuple. + * + * Storage for the values/isnull arrays is provided by the caller; + * it should be sized according to tupleDesc->natts not + * HeapTupleHeaderGetNatts(tuple->t_data). + * + * Note that for pass-by-reference datatypes, the pointer placed + * in the Datum will point into the given tuple. + * + * When all or most of a tuple's fields need to be extracted, + * this routine will be significantly quicker than a loop around + * heap_getattr; the loop will become O(N^2) as soon as any + * noncacheable attribute offsets are involved. + */ +void +heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, + Datum *values, bool *isnull) +{ + HeapTupleHeader tup = tuple->t_data; + bool hasnulls = HeapTupleHasNulls(tuple); + int tdesc_natts = tupleDesc->natts; + int natts; /* number of atts to extract */ + int attnum; + char *tp; /* ptr to tuple data */ + uint32 off; /* offset in tuple data */ + bits8 *bp = tup->t_bits; /* ptr to null bitmap in tuple */ + bool slow = false; /* can we use/set attcacheoff? */ + + natts = HeapTupleHeaderGetNatts(tup); + + /* + * In inheritance situations, it is possible that the given tuple actually + * has more fields than the caller is expecting. Don't run off the end of + * the caller's arrays. + */ + natts = Min(natts, tdesc_natts); + + tp = (char *) tup + tup->t_hoff; + + off = 0; + + for (attnum = 0; attnum < natts; attnum++) + { + Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, attnum); + + if (hasnulls && att_isnull(attnum, bp)) + { + values[attnum] = (Datum) 0; + isnull[attnum] = true; + slow = true; /* can't use attcacheoff anymore */ + continue; + } + + isnull[attnum] = false; + + if (!slow && thisatt->attcacheoff >= 0) + off = thisatt->attcacheoff; + else if (thisatt->attlen == -1) + { + /* + * We can only cache the offset for a varlena attribute if the + * offset is already suitably aligned, so that there would be no + * pad bytes in any case: then the offset will be valid for either + * an aligned or unaligned value. + */ + if (!slow && + off == att_align_nominal(off, thisatt->attalign)) + thisatt->attcacheoff = off; + else + { + off = att_align_pointer(off, thisatt->attalign, -1, + tp + off); + slow = true; + } + } + else + { + /* not varlena, so safe to use att_align_nominal */ + off = att_align_nominal(off, thisatt->attalign); + + if (!slow) + thisatt->attcacheoff = off; + } + + values[attnum] = fetchatt(thisatt, tp + off); + + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + + if (thisatt->attlen <= 0) + slow = true; /* can't use attcacheoff anymore */ + } + + /* + * If tuple doesn't have all the atts indicated by tupleDesc, read the + * rest as nulls or missing values as appropriate. + */ + for (; attnum < tdesc_natts; attnum++) + values[attnum] = getmissingattr(tupleDesc, attnum + 1, &isnull[attnum]); +} + +/* + * heap_freetuple + */ +void +heap_freetuple(HeapTuple htup) +{ + pfree(htup); +} + + +/* + * heap_form_minimal_tuple + * construct a MinimalTuple from the given values[] and isnull[] arrays, + * which are of the length indicated by tupleDescriptor->natts + * + * This is exactly like heap_form_tuple() except that the result is a + * "minimal" tuple lacking a HeapTupleData header as well as room for system + * columns. + * + * The result is allocated in the current memory context. + */ +MinimalTuple +heap_form_minimal_tuple(TupleDesc tupleDescriptor, + Datum *values, + bool *isnull) +{ + MinimalTuple tuple; /* return tuple */ + Size len, + data_len; + int hoff; + bool hasnull = false; + int numberOfAttributes = tupleDescriptor->natts; + int i; + + if (numberOfAttributes > MaxTupleAttributeNumber) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of columns (%d) exceeds limit (%d)", + numberOfAttributes, MaxTupleAttributeNumber))); + + /* + * Check for nulls + */ + for (i = 0; i < numberOfAttributes; i++) + { + if (isnull[i]) + { + hasnull = true; + break; + } + } + + /* + * Determine total space needed + */ + len = SizeofMinimalTupleHeader; + + if (hasnull) + len += BITMAPLEN(numberOfAttributes); + + hoff = len = MAXALIGN(len); /* align user data safely */ + + data_len = heap_compute_data_size(tupleDescriptor, values, isnull); + + len += data_len; + + /* + * Allocate and zero the space needed. + */ + tuple = (MinimalTuple) palloc0(len); + + /* + * And fill in the information. + */ + tuple->t_len = len; + HeapTupleHeaderSetNatts(tuple, numberOfAttributes); + tuple->t_hoff = hoff + MINIMAL_TUPLE_OFFSET; + + heap_fill_tuple(tupleDescriptor, + values, + isnull, + (char *) tuple + hoff, + data_len, + &tuple->t_infomask, + (hasnull ? tuple->t_bits : NULL)); + + return tuple; +} + +/* + * heap_free_minimal_tuple + */ +void +heap_free_minimal_tuple(MinimalTuple mtup) +{ + pfree(mtup); +} + +/* + * heap_copy_minimal_tuple + * copy a MinimalTuple + * + * The result is allocated in the current memory context. + */ +MinimalTuple +heap_copy_minimal_tuple(MinimalTuple mtup) +{ + MinimalTuple result; + + result = (MinimalTuple) palloc(mtup->t_len); + memcpy(result, mtup, mtup->t_len); + return result; +} + +/* + * heap_tuple_from_minimal_tuple + * create a HeapTuple by copying from a MinimalTuple; + * system columns are filled with zeroes + * + * The result is allocated in the current memory context. + * The HeapTuple struct, tuple header, and tuple data are all allocated + * as a single palloc() block. + */ +HeapTuple +heap_tuple_from_minimal_tuple(MinimalTuple mtup) +{ + HeapTuple result; + uint32 len = mtup->t_len + MINIMAL_TUPLE_OFFSET; + + result = (HeapTuple) palloc(HEAPTUPLESIZE + len); + result->t_len = len; + ItemPointerSetInvalid(&(result->t_self)); + result->t_tableOid = InvalidOid; + result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE); + memcpy((char *) result->t_data + MINIMAL_TUPLE_OFFSET, mtup, mtup->t_len); + memset(result->t_data, 0, offsetof(HeapTupleHeaderData, t_infomask2)); + return result; +} + +/* + * minimal_tuple_from_heap_tuple + * create a MinimalTuple by copying from a HeapTuple + * + * The result is allocated in the current memory context. + */ +MinimalTuple +minimal_tuple_from_heap_tuple(HeapTuple htup) +{ + MinimalTuple result; + uint32 len; + + Assert(htup->t_len > MINIMAL_TUPLE_OFFSET); + len = htup->t_len - MINIMAL_TUPLE_OFFSET; + result = (MinimalTuple) palloc(len); + memcpy(result, (char *) htup->t_data + MINIMAL_TUPLE_OFFSET, len); + result->t_len = len; + return result; +} + +/* + * This mainly exists so JIT can inline the definition, but it's also + * sometimes useful in debugging sessions. + */ +size_t +varsize_any(void *p) +{ + return VARSIZE_ANY(p); +} diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c new file mode 100644 index 0000000..8df882d --- /dev/null +++ b/src/backend/access/common/indextuple.c @@ -0,0 +1,589 @@ +/*------------------------------------------------------------------------- + * + * indextuple.c + * This file contains index tuple accessor and mutator routines, + * as well as various tuple utilities. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/indextuple.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/detoast.h" +#include "access/heaptoast.h" +#include "access/htup_details.h" +#include "access/itup.h" +#include "access/toast_internals.h" + +/* + * This enables de-toasting of index entries. Needed until VACUUM is + * smart enough to rebuild indexes from scratch. + */ +#define TOAST_INDEX_HACK + +/* ---------------------------------------------------------------- + * index_ tuple interface routines + * ---------------------------------------------------------------- + */ + +/* ---------------- + * index_form_tuple + * + * This shouldn't leak any memory; otherwise, callers such as + * tuplesort_putindextuplevalues() will be very unhappy. + * + * This shouldn't perform external table access provided caller + * does not pass values that are stored EXTERNAL. + * ---------------- + */ +IndexTuple +index_form_tuple(TupleDesc tupleDescriptor, + Datum *values, + bool *isnull) +{ + char *tp; /* tuple pointer */ + IndexTuple tuple; /* return tuple */ + Size size, + data_size, + hoff; + int i; + unsigned short infomask = 0; + bool hasnull = false; + uint16 tupmask = 0; + int numberOfAttributes = tupleDescriptor->natts; + +#ifdef TOAST_INDEX_HACK + Datum untoasted_values[INDEX_MAX_KEYS]; + bool untoasted_free[INDEX_MAX_KEYS]; +#endif + + if (numberOfAttributes > INDEX_MAX_KEYS) + ereport(ERROR, + (errcode(ERRCODE_TOO_MANY_COLUMNS), + errmsg("number of index columns (%d) exceeds limit (%d)", + numberOfAttributes, INDEX_MAX_KEYS))); + +#ifdef TOAST_INDEX_HACK + for (i = 0; i < numberOfAttributes; i++) + { + Form_pg_attribute att = TupleDescAttr(tupleDescriptor, i); + + untoasted_values[i] = values[i]; + untoasted_free[i] = false; + + /* Do nothing if value is NULL or not of varlena type */ + if (isnull[i] || att->attlen != -1) + continue; + + /* + * If value is stored EXTERNAL, must fetch it so we are not depending + * on outside storage. This should be improved someday. + */ + if (VARATT_IS_EXTERNAL(DatumGetPointer(values[i]))) + { + untoasted_values[i] = + PointerGetDatum(detoast_external_attr((struct varlena *) + DatumGetPointer(values[i]))); + untoasted_free[i] = true; + } + + /* + * If value is above size target, and is of a compressible datatype, + * try to compress it in-line. + */ + if (!VARATT_IS_EXTENDED(DatumGetPointer(untoasted_values[i])) && + VARSIZE(DatumGetPointer(untoasted_values[i])) > TOAST_INDEX_TARGET && + (att->attstorage == TYPSTORAGE_EXTENDED || + att->attstorage == TYPSTORAGE_MAIN)) + { + Datum cvalue; + + cvalue = toast_compress_datum(untoasted_values[i], + att->attcompression); + + if (DatumGetPointer(cvalue) != NULL) + { + /* successful compression */ + if (untoasted_free[i]) + pfree(DatumGetPointer(untoasted_values[i])); + untoasted_values[i] = cvalue; + untoasted_free[i] = true; + } + } + } +#endif + + for (i = 0; i < numberOfAttributes; i++) + { + if (isnull[i]) + { + hasnull = true; + break; + } + } + + if (hasnull) + infomask |= INDEX_NULL_MASK; + + hoff = IndexInfoFindDataOffset(infomask); +#ifdef TOAST_INDEX_HACK + data_size = heap_compute_data_size(tupleDescriptor, + untoasted_values, isnull); +#else + data_size = heap_compute_data_size(tupleDescriptor, + values, isnull); +#endif + size = hoff + data_size; + size = MAXALIGN(size); /* be conservative */ + + tp = (char *) palloc0(size); + tuple = (IndexTuple) tp; + + heap_fill_tuple(tupleDescriptor, +#ifdef TOAST_INDEX_HACK + untoasted_values, +#else + values, +#endif + isnull, + (char *) tp + hoff, + data_size, + &tupmask, + (hasnull ? (bits8 *) tp + sizeof(IndexTupleData) : NULL)); + +#ifdef TOAST_INDEX_HACK + for (i = 0; i < numberOfAttributes; i++) + { + if (untoasted_free[i]) + pfree(DatumGetPointer(untoasted_values[i])); + } +#endif + + /* + * We do this because heap_fill_tuple wants to initialize a "tupmask" + * which is used for HeapTuples, but we want an indextuple infomask. The + * only relevant info is the "has variable attributes" field. We have + * already set the hasnull bit above. + */ + if (tupmask & HEAP_HASVARWIDTH) + infomask |= INDEX_VAR_MASK; + + /* Also assert we got rid of external attributes */ +#ifdef TOAST_INDEX_HACK + Assert((tupmask & HEAP_HASEXTERNAL) == 0); +#endif + + /* + * Here we make sure that the size will fit in the field reserved for it + * in t_info. + */ + if ((size & INDEX_SIZE_MASK) != size) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("index row requires %zu bytes, maximum size is %zu", + size, (Size) INDEX_SIZE_MASK))); + + infomask |= size; + + /* + * initialize metadata + */ + tuple->t_info = infomask; + return tuple; +} + +/* ---------------- + * nocache_index_getattr + * + * This gets called from index_getattr() macro, and only in cases + * where we can't use cacheoffset and the value is not null. + * + * This caches attribute offsets in the attribute descriptor. + * + * An alternative way to speed things up would be to cache offsets + * with the tuple, but that seems more difficult unless you take + * the storage hit of actually putting those offsets into the + * tuple you send to disk. Yuck. + * + * This scheme will be slightly slower than that, but should + * perform well for queries which hit large #'s of tuples. After + * you cache the offsets once, examining all the other tuples using + * the same attribute descriptor will go much quicker. -cim 5/4/91 + * ---------------- + */ +Datum +nocache_index_getattr(IndexTuple tup, + int attnum, + TupleDesc tupleDesc) +{ + char *tp; /* ptr to data part of tuple */ + bits8 *bp = NULL; /* ptr to null bitmap in tuple */ + bool slow = false; /* do we have to walk attrs? */ + int data_off; /* tuple data offset */ + int off; /* current offset within data */ + + /* ---------------- + * Three cases: + * + * 1: No nulls and no variable-width attributes. + * 2: Has a null or a var-width AFTER att. + * 3: Has nulls or var-widths BEFORE att. + * ---------------- + */ + + data_off = IndexInfoFindDataOffset(tup->t_info); + + attnum--; + + if (IndexTupleHasNulls(tup)) + { + /* + * there's a null somewhere in the tuple + * + * check to see if desired att is null + */ + + /* XXX "knows" t_bits are just after fixed tuple header! */ + bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData)); + + /* + * Now check to see if any preceding bits are null... + */ + { + int byte = attnum >> 3; + int finalbit = attnum & 0x07; + + /* check for nulls "before" final bit of last byte */ + if ((~bp[byte]) & ((1 << finalbit) - 1)) + slow = true; + else + { + /* check for nulls in any "earlier" bytes */ + int i; + + for (i = 0; i < byte; i++) + { + if (bp[i] != 0xFF) + { + slow = true; + break; + } + } + } + } + } + + tp = (char *) tup + data_off; + + if (!slow) + { + Form_pg_attribute att; + + /* + * If we get here, there are no nulls up to and including the target + * attribute. If we have a cached offset, we can use it. + */ + att = TupleDescAttr(tupleDesc, attnum); + if (att->attcacheoff >= 0) + return fetchatt(att, tp + att->attcacheoff); + + /* + * Otherwise, check for non-fixed-length attrs up to and including + * target. If there aren't any, it's safe to cheaply initialize the + * cached offsets for these attrs. + */ + if (IndexTupleHasVarwidths(tup)) + { + int j; + + for (j = 0; j <= attnum; j++) + { + if (TupleDescAttr(tupleDesc, j)->attlen <= 0) + { + slow = true; + break; + } + } + } + } + + if (!slow) + { + int natts = tupleDesc->natts; + int j = 1; + + /* + * If we get here, we have a tuple with no nulls or var-widths up to + * and including the target attribute, so we can use the cached offset + * ... only we don't have it yet, or we'd not have got here. Since + * it's cheap to compute offsets for fixed-width columns, we take the + * opportunity to initialize the cached offsets for *all* the leading + * fixed-width columns, in hope of avoiding future visits to this + * routine. + */ + TupleDescAttr(tupleDesc, 0)->attcacheoff = 0; + + /* we might have set some offsets in the slow path previously */ + while (j < natts && TupleDescAttr(tupleDesc, j)->attcacheoff > 0) + j++; + + off = TupleDescAttr(tupleDesc, j - 1)->attcacheoff + + TupleDescAttr(tupleDesc, j - 1)->attlen; + + for (; j < natts; j++) + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, j); + + if (att->attlen <= 0) + break; + + off = att_align_nominal(off, att->attalign); + + att->attcacheoff = off; + + off += att->attlen; + } + + Assert(j > attnum); + + off = TupleDescAttr(tupleDesc, attnum)->attcacheoff; + } + else + { + bool usecache = true; + int i; + + /* + * Now we know that we have to walk the tuple CAREFULLY. But we still + * might be able to cache some offsets for next time. + * + * Note - This loop is a little tricky. For each non-null attribute, + * we have to first account for alignment padding before the attr, + * then advance over the attr based on its length. Nulls have no + * storage and no alignment padding either. We can use/set + * attcacheoff until we reach either a null or a var-width attribute. + */ + off = 0; + for (i = 0;; i++) /* loop exit is at "break" */ + { + Form_pg_attribute att = TupleDescAttr(tupleDesc, i); + + if (IndexTupleHasNulls(tup) && att_isnull(i, bp)) + { + usecache = false; + continue; /* this cannot be the target att */ + } + + /* If we know the next offset, we can skip the rest */ + if (usecache && att->attcacheoff >= 0) + off = att->attcacheoff; + else if (att->attlen == -1) + { + /* + * We can only cache the offset for a varlena attribute if the + * offset is already suitably aligned, so that there would be + * no pad bytes in any case: then the offset will be valid for + * either an aligned or unaligned value. + */ + if (usecache && + off == att_align_nominal(off, att->attalign)) + att->attcacheoff = off; + else + { + off = att_align_pointer(off, att->attalign, -1, + tp + off); + usecache = false; + } + } + else + { + /* not varlena, so safe to use att_align_nominal */ + off = att_align_nominal(off, att->attalign); + + if (usecache) + att->attcacheoff = off; + } + + if (i == attnum) + break; + + off = att_addlength_pointer(off, att->attlen, tp + off); + + if (usecache && att->attlen <= 0) + usecache = false; + } + } + + return fetchatt(TupleDescAttr(tupleDesc, attnum), tp + off); +} + +/* + * Convert an index tuple into Datum/isnull arrays. + * + * The caller must allocate sufficient storage for the output arrays. + * (INDEX_MAX_KEYS entries should be enough.) + * + * This is nearly the same as heap_deform_tuple(), but for IndexTuples. + * One difference is that the tuple should never have any missing columns. + */ +void +index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor, + Datum *values, bool *isnull) +{ + char *tp; /* ptr to tuple data */ + bits8 *bp; /* ptr to null bitmap in tuple */ + + /* XXX "knows" t_bits are just after fixed tuple header! */ + bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData)); + + tp = (char *) tup + IndexInfoFindDataOffset(tup->t_info); + + index_deform_tuple_internal(tupleDescriptor, values, isnull, + tp, bp, IndexTupleHasNulls(tup)); +} + +/* + * Convert an index tuple into Datum/isnull arrays, + * without assuming any specific layout of the index tuple header. + * + * Caller must supply pointer to data area, pointer to nulls bitmap + * (which can be NULL if !hasnulls), and hasnulls flag. + */ +void +index_deform_tuple_internal(TupleDesc tupleDescriptor, + Datum *values, bool *isnull, + char *tp, bits8 *bp, int hasnulls) +{ + int natts = tupleDescriptor->natts; /* number of atts to extract */ + int attnum; + int off = 0; /* offset in tuple data */ + bool slow = false; /* can we use/set attcacheoff? */ + + /* Assert to protect callers who allocate fixed-size arrays */ + Assert(natts <= INDEX_MAX_KEYS); + + for (attnum = 0; attnum < natts; attnum++) + { + Form_pg_attribute thisatt = TupleDescAttr(tupleDescriptor, attnum); + + if (hasnulls && att_isnull(attnum, bp)) + { + values[attnum] = (Datum) 0; + isnull[attnum] = true; + slow = true; /* can't use attcacheoff anymore */ + continue; + } + + isnull[attnum] = false; + + if (!slow && thisatt->attcacheoff >= 0) + off = thisatt->attcacheoff; + else if (thisatt->attlen == -1) + { + /* + * We can only cache the offset for a varlena attribute if the + * offset is already suitably aligned, so that there would be no + * pad bytes in any case: then the offset will be valid for either + * an aligned or unaligned value. + */ + if (!slow && + off == att_align_nominal(off, thisatt->attalign)) + thisatt->attcacheoff = off; + else + { + off = att_align_pointer(off, thisatt->attalign, -1, + tp + off); + slow = true; + } + } + else + { + /* not varlena, so safe to use att_align_nominal */ + off = att_align_nominal(off, thisatt->attalign); + + if (!slow) + thisatt->attcacheoff = off; + } + + values[attnum] = fetchatt(thisatt, tp + off); + + off = att_addlength_pointer(off, thisatt->attlen, tp + off); + + if (thisatt->attlen <= 0) + slow = true; /* can't use attcacheoff anymore */ + } +} + +/* + * Create a palloc'd copy of an index tuple. + */ +IndexTuple +CopyIndexTuple(IndexTuple source) +{ + IndexTuple result; + Size size; + + size = IndexTupleSize(source); + result = (IndexTuple) palloc(size); + memcpy(result, source, size); + return result; +} + +/* + * Create a palloc'd copy of an index tuple, leaving only the first + * leavenatts attributes remaining. + * + * Truncation is guaranteed to result in an index tuple that is no + * larger than the original. It is safe to use the IndexTuple with + * the original tuple descriptor, but caller must avoid actually + * accessing truncated attributes from returned tuple! In practice + * this means that index_getattr() must be called with special care, + * and that the truncated tuple should only ever be accessed by code + * under caller's direct control. + * + * It's safe to call this function with a buffer lock held, since it + * never performs external table access. If it ever became possible + * for index tuples to contain EXTERNAL TOAST values, then this would + * have to be revisited. + */ +IndexTuple +index_truncate_tuple(TupleDesc sourceDescriptor, IndexTuple source, + int leavenatts) +{ + TupleDesc truncdesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + IndexTuple truncated; + + Assert(leavenatts <= sourceDescriptor->natts); + + /* Easy case: no truncation actually required */ + if (leavenatts == sourceDescriptor->natts) + return CopyIndexTuple(source); + + /* Create temporary descriptor to scribble on */ + truncdesc = palloc(TupleDescSize(sourceDescriptor)); + TupleDescCopy(truncdesc, sourceDescriptor); + truncdesc->natts = leavenatts; + + /* Deform, form copy of tuple with fewer attributes */ + index_deform_tuple(source, truncdesc, values, isnull); + truncated = index_form_tuple(truncdesc, values, isnull); + truncated->t_tid = source->t_tid; + Assert(IndexTupleSize(truncated) <= IndexTupleSize(source)); + + /* + * Cannot leak memory here, TupleDescCopy() doesn't allocate any inner + * structure, so, plain pfree() should clean all allocated memory + */ + pfree(truncdesc); + + return truncated; +} diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c new file mode 100644 index 0000000..93c3c4f --- /dev/null +++ b/src/backend/access/common/printsimple.c @@ -0,0 +1,132 @@ +/*------------------------------------------------------------------------- + * + * printsimple.c + * Routines to print out tuples containing only a limited range of + * builtin types without catalog access. This is intended for + * backends that don't have catalog access because they are not bound + * to a specific database, such as some walsender processes. It + * doesn't handle standalone backends or protocol versions other than + * 3.0, because we don't need such handling for current applications. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/printsimple.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/printsimple.h" +#include "catalog/pg_type.h" +#include "libpq/pqformat.h" +#include "utils/builtins.h" + +/* + * At startup time, send a RowDescription message. + */ +void +printsimple_startup(DestReceiver *self, int operation, TupleDesc tupdesc) +{ + StringInfoData buf; + int i; + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, tupdesc->natts); + + for (i = 0; i < tupdesc->natts; ++i) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + + pq_sendstring(&buf, NameStr(attr->attname)); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, (int) attr->atttypid); + pq_sendint16(&buf, attr->attlen); + pq_sendint32(&buf, attr->atttypmod); + pq_sendint16(&buf, 0); /* format code */ + } + + pq_endmessage(&buf); +} + +/* + * For each tuple, send a DataRow message. + */ +bool +printsimple(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc tupdesc = slot->tts_tupleDescriptor; + StringInfoData buf; + int i; + + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + /* Prepare and send message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, tupdesc->natts); + + for (i = 0; i < tupdesc->natts; ++i) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + Datum value; + + if (slot->tts_isnull[i]) + { + pq_sendint32(&buf, -1); + continue; + } + + value = slot->tts_values[i]; + + /* + * We can't call the regular type output functions here because we + * might not have catalog access. Instead, we must hard-wire + * knowledge of the required types. + */ + switch (attr->atttypid) + { + case TEXTOID: + { + text *t = DatumGetTextPP(value); + + pq_sendcountedtext(&buf, + VARDATA_ANY(t), + VARSIZE_ANY_EXHDR(t), + false); + } + break; + + case INT4OID: + { + int32 num = DatumGetInt32(value); + char str[12]; /* sign, 10 digits and '\0' */ + int len; + + len = pg_ltoa(num, str); + pq_sendcountedtext(&buf, str, len, false); + } + break; + + case INT8OID: + { + int64 num = DatumGetInt64(value); + char str[MAXINT8LEN + 1]; + int len; + + len = pg_lltoa(num, str); + pq_sendcountedtext(&buf, str, len, false); + } + break; + + default: + elog(ERROR, "unsupported type OID: %u", attr->atttypid); + } + } + + pq_endmessage(&buf); + + return true; +} diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c new file mode 100644 index 0000000..54b539f --- /dev/null +++ b/src/backend/access/common/printtup.c @@ -0,0 +1,485 @@ +/*------------------------------------------------------------------------- + * + * printtup.c + * Routines to print out tuples to the destination (both frontend + * clients and standalone backends are supported here). + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/printtup.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/printtup.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "tcop/pquery.h" +#include "utils/lsyscache.h" +#include "utils/memdebug.h" +#include "utils/memutils.h" + + +static void printtup_startup(DestReceiver *self, int operation, + TupleDesc typeinfo); +static bool printtup(TupleTableSlot *slot, DestReceiver *self); +static void printtup_shutdown(DestReceiver *self); +static void printtup_destroy(DestReceiver *self); + +/* ---------------------------------------------------------------- + * printtup / debugtup support + * ---------------------------------------------------------------- + */ + +/* ---------------- + * Private state for a printtup destination object + * + * NOTE: finfo is the lookup info for either typoutput or typsend, whichever + * we are using for this column. + * ---------------- + */ +typedef struct +{ /* Per-attribute information */ + Oid typoutput; /* Oid for the type's text output fn */ + Oid typsend; /* Oid for the type's binary output fn */ + bool typisvarlena; /* is it varlena (ie possibly toastable)? */ + int16 format; /* format code for this column */ + FmgrInfo finfo; /* Precomputed call info for output fn */ +} PrinttupAttrInfo; + +typedef struct +{ + DestReceiver pub; /* publicly-known function pointers */ + Portal portal; /* the Portal we are printing from */ + bool sendDescrip; /* send RowDescription at startup? */ + TupleDesc attrinfo; /* The attr info we are set up for */ + int nattrs; + PrinttupAttrInfo *myinfo; /* Cached info about each attr */ + StringInfoData buf; /* output buffer (*not* in tmpcontext) */ + MemoryContext tmpcontext; /* Memory context for per-row workspace */ +} DR_printtup; + +/* ---------------- + * Initialize: create a DestReceiver for printtup + * ---------------- + */ +DestReceiver * +printtup_create_DR(CommandDest dest) +{ + DR_printtup *self = (DR_printtup *) palloc0(sizeof(DR_printtup)); + + self->pub.receiveSlot = printtup; /* might get changed later */ + self->pub.rStartup = printtup_startup; + self->pub.rShutdown = printtup_shutdown; + self->pub.rDestroy = printtup_destroy; + self->pub.mydest = dest; + + /* + * Send T message automatically if DestRemote, but not if + * DestRemoteExecute + */ + self->sendDescrip = (dest == DestRemote); + + self->attrinfo = NULL; + self->nattrs = 0; + self->myinfo = NULL; + self->buf.data = NULL; + self->tmpcontext = NULL; + + return (DestReceiver *) self; +} + +/* + * Set parameters for a DestRemote (or DestRemoteExecute) receiver + */ +void +SetRemoteDestReceiverParams(DestReceiver *self, Portal portal) +{ + DR_printtup *myState = (DR_printtup *) self; + + Assert(myState->pub.mydest == DestRemote || + myState->pub.mydest == DestRemoteExecute); + + myState->portal = portal; +} + +static void +printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + DR_printtup *myState = (DR_printtup *) self; + Portal portal = myState->portal; + + /* + * Create I/O buffer to be used for all messages. This cannot be inside + * tmpcontext, since we want to re-use it across rows. + */ + initStringInfo(&myState->buf); + + /* + * Create a temporary memory context that we can reset once per row to + * recover palloc'd memory. This avoids any problems with leaks inside + * datatype output routines, and should be faster than retail pfree's + * anyway. + */ + myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext, + "printtup", + ALLOCSET_DEFAULT_SIZES); + + /* + * If we are supposed to emit row descriptions, then send the tuple + * descriptor of the tuples. + */ + if (myState->sendDescrip) + SendRowDescriptionMessage(&myState->buf, + typeinfo, + FetchPortalTargetList(portal), + portal->formats); + + /* ---------------- + * We could set up the derived attr info at this time, but we postpone it + * until the first call of printtup, for 2 reasons: + * 1. We don't waste time (compared to the old way) if there are no + * tuples at all to output. + * 2. Checking in printtup allows us to handle the case that the tuples + * change type midway through (although this probably can't happen in + * the current executor). + * ---------------- + */ +} + +/* + * SendRowDescriptionMessage --- send a RowDescription message to the frontend + * + * Notes: the TupleDesc has typically been manufactured by ExecTypeFromTL() + * or some similar function; it does not contain a full set of fields. + * The targetlist will be NIL when executing a utility function that does + * not have a plan. If the targetlist isn't NIL then it is a Query node's + * targetlist; it is up to us to ignore resjunk columns in it. The formats[] + * array pointer might be NULL (if we are doing Describe on a prepared stmt); + * send zeroes for the format codes in that case. + */ +void +SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo, + List *targetlist, int16 *formats) +{ + int natts = typeinfo->natts; + int i; + ListCell *tlist_item = list_head(targetlist); + + /* tuple descriptor message type */ + pq_beginmessage_reuse(buf, 'T'); + /* # of attrs in tuples */ + pq_sendint16(buf, natts); + + /* + * Preallocate memory for the entire message to be sent. That allows to + * use the significantly faster inline pqformat.h functions and to avoid + * reallocations. + * + * Have to overestimate the size of the column-names, to account for + * character set overhead. + */ + enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */ + + sizeof(Oid) /* resorigtbl */ + + sizeof(AttrNumber) /* resorigcol */ + + sizeof(Oid) /* atttypid */ + + sizeof(int16) /* attlen */ + + sizeof(int32) /* attypmod */ + + sizeof(int16) /* format */ + ) * natts); + + for (i = 0; i < natts; ++i) + { + Form_pg_attribute att = TupleDescAttr(typeinfo, i); + Oid atttypid = att->atttypid; + int32 atttypmod = att->atttypmod; + Oid resorigtbl; + AttrNumber resorigcol; + int16 format; + + /* + * If column is a domain, send the base type and typmod instead. + * Lookup before sending any ints, for efficiency. + */ + atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod); + + /* Do we have a non-resjunk tlist item? */ + while (tlist_item && + ((TargetEntry *) lfirst(tlist_item))->resjunk) + tlist_item = lnext(targetlist, tlist_item); + if (tlist_item) + { + TargetEntry *tle = (TargetEntry *) lfirst(tlist_item); + + resorigtbl = tle->resorigtbl; + resorigcol = tle->resorigcol; + tlist_item = lnext(targetlist, tlist_item); + } + else + { + /* No info available, so send zeroes */ + resorigtbl = 0; + resorigcol = 0; + } + + if (formats) + format = formats[i]; + else + format = 0; + + pq_writestring(buf, NameStr(att->attname)); + pq_writeint32(buf, resorigtbl); + pq_writeint16(buf, resorigcol); + pq_writeint32(buf, atttypid); + pq_writeint16(buf, att->attlen); + pq_writeint32(buf, atttypmod); + pq_writeint16(buf, format); + } + + pq_endmessage_reuse(buf); +} + +/* + * Get the lookup info that printtup() needs + */ +static void +printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs) +{ + int16 *formats = myState->portal->formats; + int i; + + /* get rid of any old data */ + if (myState->myinfo) + pfree(myState->myinfo); + myState->myinfo = NULL; + + myState->attrinfo = typeinfo; + myState->nattrs = numAttrs; + if (numAttrs <= 0) + return; + + myState->myinfo = (PrinttupAttrInfo *) + palloc0(numAttrs * sizeof(PrinttupAttrInfo)); + + for (i = 0; i < numAttrs; i++) + { + PrinttupAttrInfo *thisState = myState->myinfo + i; + int16 format = (formats ? formats[i] : 0); + Form_pg_attribute attr = TupleDescAttr(typeinfo, i); + + thisState->format = format; + if (format == 0) + { + getTypeOutputInfo(attr->atttypid, + &thisState->typoutput, + &thisState->typisvarlena); + fmgr_info(thisState->typoutput, &thisState->finfo); + } + else if (format == 1) + { + getTypeBinaryOutputInfo(attr->atttypid, + &thisState->typsend, + &thisState->typisvarlena); + fmgr_info(thisState->typsend, &thisState->finfo); + } + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unsupported format code: %d", format))); + } +} + +/* ---------------- + * printtup --- send a tuple to the client + * ---------------- + */ +static bool +printtup(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc typeinfo = slot->tts_tupleDescriptor; + DR_printtup *myState = (DR_printtup *) self; + MemoryContext oldcontext; + StringInfo buf = &myState->buf; + int natts = typeinfo->natts; + int i; + + /* Set or update my derived attribute info, if needed */ + if (myState->attrinfo != typeinfo || myState->nattrs != natts) + printtup_prepare_info(myState, typeinfo, natts); + + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + /* Switch into per-row context so we can recover memory below */ + oldcontext = MemoryContextSwitchTo(myState->tmpcontext); + + /* + * Prepare a DataRow message (note buffer is in per-row context) + */ + pq_beginmessage_reuse(buf, 'D'); + + pq_sendint16(buf, natts); + + /* + * send the attributes of this tuple + */ + for (i = 0; i < natts; ++i) + { + PrinttupAttrInfo *thisState = myState->myinfo + i; + Datum attr = slot->tts_values[i]; + + if (slot->tts_isnull[i]) + { + pq_sendint32(buf, -1); + continue; + } + + /* + * Here we catch undefined bytes in datums that are returned to the + * client without hitting disk; see comments at the related check in + * PageAddItem(). This test is most useful for uncompressed, + * non-external datums, but we're quite likely to see such here when + * testing new C functions. + */ + if (thisState->typisvarlena) + VALGRIND_CHECK_MEM_IS_DEFINED(DatumGetPointer(attr), + VARSIZE_ANY(attr)); + + if (thisState->format == 0) + { + /* Text output */ + char *outputstr; + + outputstr = OutputFunctionCall(&thisState->finfo, attr); + pq_sendcountedtext(buf, outputstr, strlen(outputstr), false); + } + else + { + /* Binary output */ + bytea *outputbytes; + + outputbytes = SendFunctionCall(&thisState->finfo, attr); + pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ); + pq_sendbytes(buf, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + pq_endmessage_reuse(buf); + + /* Return to caller's context, and flush row's temporary memory */ + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(myState->tmpcontext); + + return true; +} + +/* ---------------- + * printtup_shutdown + * ---------------- + */ +static void +printtup_shutdown(DestReceiver *self) +{ + DR_printtup *myState = (DR_printtup *) self; + + if (myState->myinfo) + pfree(myState->myinfo); + myState->myinfo = NULL; + + myState->attrinfo = NULL; + + if (myState->buf.data) + pfree(myState->buf.data); + myState->buf.data = NULL; + + if (myState->tmpcontext) + MemoryContextDelete(myState->tmpcontext); + myState->tmpcontext = NULL; +} + +/* ---------------- + * printtup_destroy + * ---------------- + */ +static void +printtup_destroy(DestReceiver *self) +{ + pfree(self); +} + +/* ---------------- + * printatt + * ---------------- + */ +static void +printatt(unsigned attributeId, + Form_pg_attribute attributeP, + char *value) +{ + printf("\t%2d: %s%s%s%s\t(typeid = %u, len = %d, typmod = %d, byval = %c)\n", + attributeId, + NameStr(attributeP->attname), + value != NULL ? " = \"" : "", + value != NULL ? value : "", + value != NULL ? "\"" : "", + (unsigned int) (attributeP->atttypid), + attributeP->attlen, + attributeP->atttypmod, + attributeP->attbyval ? 't' : 'f'); +} + +/* ---------------- + * debugStartup - prepare to print tuples for an interactive backend + * ---------------- + */ +void +debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + int natts = typeinfo->natts; + int i; + + /* + * show the return type of the tuples + */ + for (i = 0; i < natts; ++i) + printatt((unsigned) i + 1, TupleDescAttr(typeinfo, i), NULL); + printf("\t----\n"); +} + +/* ---------------- + * debugtup - print one tuple for an interactive backend + * ---------------- + */ +bool +debugtup(TupleTableSlot *slot, DestReceiver *self) +{ + TupleDesc typeinfo = slot->tts_tupleDescriptor; + int natts = typeinfo->natts; + int i; + Datum attr; + char *value; + bool isnull; + Oid typoutput; + bool typisvarlena; + + for (i = 0; i < natts; ++i) + { + attr = slot_getattr(slot, i + 1, &isnull); + if (isnull) + continue; + getTypeOutputInfo(TupleDescAttr(typeinfo, i)->atttypid, + &typoutput, &typisvarlena); + + value = OidOutputFunctionCall(typoutput, attr); + + printatt((unsigned) i + 1, TupleDescAttr(typeinfo, i), value); + } + printf("\t----\n"); + + return true; +} diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c new file mode 100644 index 0000000..632d13c --- /dev/null +++ b/src/backend/access/common/relation.c @@ -0,0 +1,217 @@ +/*------------------------------------------------------------------------- + * + * relation.c + * Generic relation related routines. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/relation.c + * + * NOTES + * This file contains relation_ routines that implement access to relations + * (tables, indexes, etc). Support that's specific to subtypes of relations + * should go into their respective files, not here. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relation.h" +#include "access/xact.h" +#include "catalog/namespace.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/lmgr.h" +#include "utils/inval.h" +#include "utils/syscache.h" + + +/* ---------------- + * relation_open - open any relation by relation OID + * + * If lockmode is not "NoLock", the specified kind of lock is + * obtained on the relation. (Generally, NoLock should only be + * used if the caller knows it has some appropriate lock on the + * relation already.) + * + * An error is raised if the relation does not exist. + * + * NB: a "relation" is anything with a pg_class entry. The caller is + * expected to check whether the relkind is something it can handle. + * ---------------- + */ +Relation +relation_open(Oid relationId, LOCKMODE lockmode) +{ + Relation r; + + Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); + + /* Get the lock before trying to open the relcache entry */ + if (lockmode != NoLock) + LockRelationOid(relationId, lockmode); + + /* The relcache does all the real work... */ + r = RelationIdGetRelation(relationId); + + if (!RelationIsValid(r)) + elog(ERROR, "could not open relation with OID %u", relationId); + + /* + * If we didn't get the lock ourselves, assert that caller holds one, + * except in bootstrap mode where no locks are used. + */ + Assert(lockmode != NoLock || + IsBootstrapProcessingMode() || + CheckRelationLockedByMe(r, AccessShareLock, true)); + + /* Make note that we've accessed a temporary relation */ + if (RelationUsesLocalBuffers(r)) + MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE; + + pgstat_initstats(r); + + return r; +} + +/* ---------------- + * try_relation_open - open any relation by relation OID + * + * Same as relation_open, except return NULL instead of failing + * if the relation does not exist. + * ---------------- + */ +Relation +try_relation_open(Oid relationId, LOCKMODE lockmode) +{ + Relation r; + + Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); + + /* Get the lock first */ + if (lockmode != NoLock) + LockRelationOid(relationId, lockmode); + + /* + * Now that we have the lock, probe to see if the relation really exists + * or not. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relationId))) + { + /* Release useless lock */ + if (lockmode != NoLock) + UnlockRelationOid(relationId, lockmode); + + return NULL; + } + + /* Should be safe to do a relcache load */ + r = RelationIdGetRelation(relationId); + + if (!RelationIsValid(r)) + elog(ERROR, "could not open relation with OID %u", relationId); + + /* If we didn't get the lock ourselves, assert that caller holds one */ + Assert(lockmode != NoLock || + CheckRelationLockedByMe(r, AccessShareLock, true)); + + /* Make note that we've accessed a temporary relation */ + if (RelationUsesLocalBuffers(r)) + MyXactFlags |= XACT_FLAGS_ACCESSEDTEMPNAMESPACE; + + pgstat_initstats(r); + + return r; +} + +/* ---------------- + * relation_openrv - open any relation specified by a RangeVar + * + * Same as relation_open, but the relation is specified by a RangeVar. + * ---------------- + */ +Relation +relation_openrv(const RangeVar *relation, LOCKMODE lockmode) +{ + Oid relOid; + + /* + * Check for shared-cache-inval messages before trying to open the + * relation. This is needed even if we already hold a lock on the + * relation, because GRANT/REVOKE are executed without taking any lock on + * the target relation, and we want to be sure we see current ACL + * information. We can skip this if asked for NoLock, on the assumption + * that such a call is not the first one in the current command, and so we + * should be reasonably up-to-date already. (XXX this all could stand to + * be redesigned, but for the moment we'll keep doing this like it's been + * done historically.) + */ + if (lockmode != NoLock) + AcceptInvalidationMessages(); + + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, false); + + /* Let relation_open do the rest */ + return relation_open(relOid, NoLock); +} + +/* ---------------- + * relation_openrv_extended - open any relation specified by a RangeVar + * + * Same as relation_openrv, but with an additional missing_ok argument + * allowing a NULL return rather than an error if the relation is not + * found. (Note that some other causes, such as permissions problems, + * will still result in an ereport.) + * ---------------- + */ +Relation +relation_openrv_extended(const RangeVar *relation, LOCKMODE lockmode, + bool missing_ok) +{ + Oid relOid; + + /* + * Check for shared-cache-inval messages before trying to open the + * relation. See comments in relation_openrv(). + */ + if (lockmode != NoLock) + AcceptInvalidationMessages(); + + /* Look up and lock the appropriate relation using namespace search */ + relOid = RangeVarGetRelid(relation, lockmode, missing_ok); + + /* Return NULL on not-found */ + if (!OidIsValid(relOid)) + return NULL; + + /* Let relation_open do the rest */ + return relation_open(relOid, NoLock); +} + +/* ---------------- + * relation_close - close any relation + * + * If lockmode is not "NoLock", we then release the specified lock. + * + * Note that it is often sensible to hold a lock beyond relation_close; + * in that case, the lock is released automatically at xact end. + * ---------------- + */ +void +relation_close(Relation relation, LOCKMODE lockmode) +{ + LockRelId relid = relation->rd_lockInfo.lockRelId; + + Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES); + + /* The relcache does the real work... */ + RelationClose(relation); + + if (lockmode != NoLock) + UnlockRelationId(&relid, lockmode); +} diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c new file mode 100644 index 0000000..b5602f5 --- /dev/null +++ b/src/backend/access/common/reloptions.c @@ -0,0 +1,2131 @@ +/*------------------------------------------------------------------------- + * + * reloptions.c + * Core support for relation options (pg_class.reloptions) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/reloptions.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <float.h> + +#include "access/gist_private.h" +#include "access/hash.h" +#include "access/heaptoast.h" +#include "access/htup_details.h" +#include "access/nbtree.h" +#include "access/reloptions.h" +#include "access/spgist_private.h" +#include "catalog/pg_type.h" +#include "commands/defrem.h" +#include "commands/tablespace.h" +#include "commands/view.h" +#include "nodes/makefuncs.h" +#include "postmaster/postmaster.h" +#include "utils/array.h" +#include "utils/attoptcache.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/rel.h" + +/* + * Contents of pg_class.reloptions + * + * To add an option: + * + * (i) decide on a type (integer, real, bool, string), name, default value, + * upper and lower bounds (if applicable); for strings, consider a validation + * routine. + * (ii) add a record below (or use add_<type>_reloption). + * (iii) add it to the appropriate options struct (perhaps StdRdOptions) + * (iv) add it to the appropriate handling routine (perhaps + * default_reloptions) + * (v) make sure the lock level is set correctly for that operation + * (vi) don't forget to document the option + * + * The default choice for any new option should be AccessExclusiveLock. + * In some cases the lock level can be reduced from there, but the lock + * level chosen should always conflict with itself to ensure that multiple + * changes aren't lost when we attempt concurrent changes. + * The choice of lock level depends completely upon how that parameter + * is used within the server, not upon how and when you'd like to change it. + * Safety first. Existing choices are documented here, and elsewhere in + * backend code where the parameters are used. + * + * In general, anything that affects the results obtained from a SELECT must be + * protected by AccessExclusiveLock. + * + * Autovacuum related parameters can be set at ShareUpdateExclusiveLock + * since they are only used by the AV procs and don't change anything + * currently executing. + * + * Fillfactor can be set because it applies only to subsequent changes made to + * data blocks, as documented in hio.c + * + * n_distinct options can be set at ShareUpdateExclusiveLock because they + * are only used during ANALYZE, which uses a ShareUpdateExclusiveLock, + * so the ANALYZE will not be affected by in-flight changes. Changing those + * values has no effect until the next ANALYZE, so no need for stronger lock. + * + * Planner-related parameters can be set with ShareUpdateExclusiveLock because + * they only affect planning and not the correctness of the execution. Plans + * cannot be changed in mid-flight, so changes here could not easily result in + * new improved plans in any case. So we allow existing queries to continue + * and existing plans to survive, a small price to pay for allowing better + * plans to be introduced concurrently without interfering with users. + * + * Setting parallel_workers is safe, since it acts the same as + * max_parallel_workers_per_gather which is a USERSET parameter that doesn't + * affect existing plans or queries. + * + * vacuum_truncate can be set at ShareUpdateExclusiveLock because it + * is only used during VACUUM, which uses a ShareUpdateExclusiveLock, + * so the VACUUM will not be affected by in-flight changes. Changing its + * value has no effect until the next VACUUM, so no need for stronger lock. + */ + +static relopt_bool boolRelOpts[] = +{ + { + { + "autosummarize", + "Enables automatic summarization on this BRIN index", + RELOPT_KIND_BRIN, + AccessExclusiveLock + }, + false + }, + { + { + "autovacuum_enabled", + "Enables autovacuum in this relation", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + true + }, + { + { + "user_catalog_table", + "Declare a table as an additional catalog table, e.g. for the purpose of logical replication", + RELOPT_KIND_HEAP, + AccessExclusiveLock + }, + false + }, + { + { + "fastupdate", + "Enables \"fast update\" feature for this GIN index", + RELOPT_KIND_GIN, + AccessExclusiveLock + }, + true + }, + { + { + "security_barrier", + "View acts as a row security barrier", + RELOPT_KIND_VIEW, + AccessExclusiveLock + }, + false + }, + { + { + "vacuum_truncate", + "Enables vacuum to truncate empty pages at the end of this table", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + true + }, + { + { + "deduplicate_items", + "Enables \"deduplicate items\" feature for this btree index", + RELOPT_KIND_BTREE, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + true + }, + /* list terminator */ + {{NULL}} +}; + +static relopt_int intRelOpts[] = +{ + { + { + "fillfactor", + "Packs table pages only to this percentage", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + HEAP_DEFAULT_FILLFACTOR, HEAP_MIN_FILLFACTOR, 100 + }, + { + { + "fillfactor", + "Packs btree index pages only to this percentage", + RELOPT_KIND_BTREE, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100 + }, + { + { + "fillfactor", + "Packs hash index pages only to this percentage", + RELOPT_KIND_HASH, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + HASH_DEFAULT_FILLFACTOR, HASH_MIN_FILLFACTOR, 100 + }, + { + { + "fillfactor", + "Packs gist index pages only to this percentage", + RELOPT_KIND_GIST, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + GIST_DEFAULT_FILLFACTOR, GIST_MIN_FILLFACTOR, 100 + }, + { + { + "fillfactor", + "Packs spgist index pages only to this percentage", + RELOPT_KIND_SPGIST, + ShareUpdateExclusiveLock /* since it applies only to later + * inserts */ + }, + SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100 + }, + { + { + "autovacuum_vacuum_threshold", + "Minimum number of tuple updates or deletes prior to vacuum", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0, INT_MAX + }, + { + { + "autovacuum_vacuum_insert_threshold", + "Minimum number of tuple inserts prior to vacuum, or -1 to disable insert vacuums", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -2, -1, INT_MAX + }, + { + { + "autovacuum_analyze_threshold", + "Minimum number of tuple inserts, updates or deletes prior to analyze", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + -1, 0, INT_MAX + }, + { + { + "autovacuum_vacuum_cost_limit", + "Vacuum cost amount available before napping, for autovacuum", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 1, 10000 + }, + { + { + "autovacuum_freeze_min_age", + "Minimum age at which VACUUM should freeze a table row, for autovacuum", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0, 1000000000 + }, + { + { + "autovacuum_multixact_freeze_min_age", + "Minimum multixact age at which VACUUM should freeze a row multixact's, for autovacuum", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0, 1000000000 + }, + { + { + "autovacuum_freeze_max_age", + "Age at which to autovacuum a table to prevent transaction ID wraparound", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 100000, 2000000000 + }, + { + { + "autovacuum_multixact_freeze_max_age", + "Multixact age at which to autovacuum a table to prevent multixact wraparound", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 10000, 2000000000 + }, + { + { + "autovacuum_freeze_table_age", + "Age at which VACUUM should perform a full table sweep to freeze row versions", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, -1, 0, 2000000000 + }, + { + { + "autovacuum_multixact_freeze_table_age", + "Age of multixact at which VACUUM should perform a full table sweep to freeze row versions", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, -1, 0, 2000000000 + }, + { + { + "log_autovacuum_min_duration", + "Sets the minimum execution time above which autovacuum actions will be logged", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, -1, INT_MAX + }, + { + { + "toast_tuple_target", + "Sets the target tuple length at which external columns will be toasted", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + TOAST_TUPLE_TARGET, 128, TOAST_TUPLE_TARGET_MAIN + }, + { + { + "pages_per_range", + "Number of pages that each page range covers in a BRIN index", + RELOPT_KIND_BRIN, + AccessExclusiveLock + }, 128, 1, 131072 + }, + { + { + "gin_pending_list_limit", + "Maximum size of the pending list for this GIN index, in kilobytes.", + RELOPT_KIND_GIN, + AccessExclusiveLock + }, + -1, 64, MAX_KILOBYTES + }, + { + { + "effective_io_concurrency", + "Number of simultaneous requests that can be handled efficiently by the disk subsystem.", + RELOPT_KIND_TABLESPACE, + ShareUpdateExclusiveLock + }, +#ifdef USE_PREFETCH + -1, 0, MAX_IO_CONCURRENCY +#else + 0, 0, 0 +#endif + }, + { + { + "maintenance_io_concurrency", + "Number of simultaneous requests that can be handled efficiently by the disk subsystem for maintenance work.", + RELOPT_KIND_TABLESPACE, + ShareUpdateExclusiveLock + }, +#ifdef USE_PREFETCH + -1, 0, MAX_IO_CONCURRENCY +#else + 0, 0, 0 +#endif + }, + { + { + "parallel_workers", + "Number of parallel processes that can be used per executor node for this relation.", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + -1, 0, 1024 + }, + + /* list terminator */ + {{NULL}} +}; + +static relopt_real realRelOpts[] = +{ + { + { + "autovacuum_vacuum_cost_delay", + "Vacuum cost delay in milliseconds, for autovacuum", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0.0, 100.0 + }, + { + { + "autovacuum_vacuum_scale_factor", + "Number of tuple updates or deletes prior to vacuum as a fraction of reltuples", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0.0, 100.0 + }, + { + { + "autovacuum_vacuum_insert_scale_factor", + "Number of tuple inserts prior to vacuum as a fraction of reltuples", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + -1, 0.0, 100.0 + }, + { + { + "autovacuum_analyze_scale_factor", + "Number of tuple inserts, updates or deletes prior to analyze as a fraction of reltuples", + RELOPT_KIND_HEAP, + ShareUpdateExclusiveLock + }, + -1, 0.0, 100.0 + }, + { + { + "seq_page_cost", + "Sets the planner's estimate of the cost of a sequentially fetched disk page.", + RELOPT_KIND_TABLESPACE, + ShareUpdateExclusiveLock + }, + -1, 0.0, DBL_MAX + }, + { + { + "random_page_cost", + "Sets the planner's estimate of the cost of a nonsequentially fetched disk page.", + RELOPT_KIND_TABLESPACE, + ShareUpdateExclusiveLock + }, + -1, 0.0, DBL_MAX + }, + { + { + "n_distinct", + "Sets the planner's estimate of the number of distinct values appearing in a column (excluding child relations).", + RELOPT_KIND_ATTRIBUTE, + ShareUpdateExclusiveLock + }, + 0, -1.0, DBL_MAX + }, + { + { + "n_distinct_inherited", + "Sets the planner's estimate of the number of distinct values appearing in a column (including child relations).", + RELOPT_KIND_ATTRIBUTE, + ShareUpdateExclusiveLock + }, + 0, -1.0, DBL_MAX + }, + { + { + "vacuum_cleanup_index_scale_factor", + "Deprecated B-Tree parameter.", + RELOPT_KIND_BTREE, + ShareUpdateExclusiveLock + }, + -1, 0.0, 1e10 + }, + /* list terminator */ + {{NULL}} +}; + +/* values from StdRdOptIndexCleanup */ +relopt_enum_elt_def StdRdOptIndexCleanupValues[] = +{ + {"auto", STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO}, + {"on", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON}, + {"off", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF}, + {"true", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON}, + {"false", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF}, + {"yes", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON}, + {"no", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF}, + {"1", STDRD_OPTION_VACUUM_INDEX_CLEANUP_ON}, + {"0", STDRD_OPTION_VACUUM_INDEX_CLEANUP_OFF}, + {(const char *) NULL} /* list terminator */ +}; + +/* values from GistOptBufferingMode */ +relopt_enum_elt_def gistBufferingOptValues[] = +{ + {"auto", GIST_OPTION_BUFFERING_AUTO}, + {"on", GIST_OPTION_BUFFERING_ON}, + {"off", GIST_OPTION_BUFFERING_OFF}, + {(const char *) NULL} /* list terminator */ +}; + +/* values from ViewOptCheckOption */ +relopt_enum_elt_def viewCheckOptValues[] = +{ + /* no value for NOT_SET */ + {"local", VIEW_OPTION_CHECK_OPTION_LOCAL}, + {"cascaded", VIEW_OPTION_CHECK_OPTION_CASCADED}, + {(const char *) NULL} /* list terminator */ +}; + +static relopt_enum enumRelOpts[] = +{ + { + { + "vacuum_index_cleanup", + "Controls index vacuuming and index cleanup", + RELOPT_KIND_HEAP | RELOPT_KIND_TOAST, + ShareUpdateExclusiveLock + }, + StdRdOptIndexCleanupValues, + STDRD_OPTION_VACUUM_INDEX_CLEANUP_AUTO, + gettext_noop("Valid values are \"on\", \"off\", and \"auto\".") + }, + { + { + "buffering", + "Enables buffering build for this GiST index", + RELOPT_KIND_GIST, + AccessExclusiveLock + }, + gistBufferingOptValues, + GIST_OPTION_BUFFERING_AUTO, + gettext_noop("Valid values are \"on\", \"off\", and \"auto\".") + }, + { + { + "check_option", + "View has WITH CHECK OPTION defined (local or cascaded).", + RELOPT_KIND_VIEW, + AccessExclusiveLock + }, + viewCheckOptValues, + VIEW_OPTION_CHECK_OPTION_NOT_SET, + gettext_noop("Valid values are \"local\" and \"cascaded\".") + }, + /* list terminator */ + {{NULL}} +}; + +static relopt_string stringRelOpts[] = +{ + /* list terminator */ + {{NULL}} +}; + +static relopt_gen **relOpts = NULL; +static bits32 last_assigned_kind = RELOPT_KIND_LAST_DEFAULT; + +static int num_custom_options = 0; +static relopt_gen **custom_options = NULL; +static bool need_initialization = true; + +static void initialize_reloptions(void); +static void parse_one_reloption(relopt_value *option, char *text_str, + int text_len, bool validate); + +/* + * Get the length of a string reloption (either default or the user-defined + * value). This is used for allocation purposes when building a set of + * relation options. + */ +#define GET_STRING_RELOPTION_LEN(option) \ + ((option).isset ? strlen((option).values.string_val) : \ + ((relopt_string *) (option).gen)->default_len) + +/* + * initialize_reloptions + * initialization routine, must be called before parsing + * + * Initialize the relOpts array and fill each variable's type and name length. + */ +static void +initialize_reloptions(void) +{ + int i; + int j; + + j = 0; + for (i = 0; boolRelOpts[i].gen.name; i++) + { + Assert(DoLockModesConflict(boolRelOpts[i].gen.lockmode, + boolRelOpts[i].gen.lockmode)); + j++; + } + for (i = 0; intRelOpts[i].gen.name; i++) + { + Assert(DoLockModesConflict(intRelOpts[i].gen.lockmode, + intRelOpts[i].gen.lockmode)); + j++; + } + for (i = 0; realRelOpts[i].gen.name; i++) + { + Assert(DoLockModesConflict(realRelOpts[i].gen.lockmode, + realRelOpts[i].gen.lockmode)); + j++; + } + for (i = 0; enumRelOpts[i].gen.name; i++) + { + Assert(DoLockModesConflict(enumRelOpts[i].gen.lockmode, + enumRelOpts[i].gen.lockmode)); + j++; + } + for (i = 0; stringRelOpts[i].gen.name; i++) + { + Assert(DoLockModesConflict(stringRelOpts[i].gen.lockmode, + stringRelOpts[i].gen.lockmode)); + j++; + } + j += num_custom_options; + + if (relOpts) + pfree(relOpts); + relOpts = MemoryContextAlloc(TopMemoryContext, + (j + 1) * sizeof(relopt_gen *)); + + j = 0; + for (i = 0; boolRelOpts[i].gen.name; i++) + { + relOpts[j] = &boolRelOpts[i].gen; + relOpts[j]->type = RELOPT_TYPE_BOOL; + relOpts[j]->namelen = strlen(relOpts[j]->name); + j++; + } + + for (i = 0; intRelOpts[i].gen.name; i++) + { + relOpts[j] = &intRelOpts[i].gen; + relOpts[j]->type = RELOPT_TYPE_INT; + relOpts[j]->namelen = strlen(relOpts[j]->name); + j++; + } + + for (i = 0; realRelOpts[i].gen.name; i++) + { + relOpts[j] = &realRelOpts[i].gen; + relOpts[j]->type = RELOPT_TYPE_REAL; + relOpts[j]->namelen = strlen(relOpts[j]->name); + j++; + } + + for (i = 0; enumRelOpts[i].gen.name; i++) + { + relOpts[j] = &enumRelOpts[i].gen; + relOpts[j]->type = RELOPT_TYPE_ENUM; + relOpts[j]->namelen = strlen(relOpts[j]->name); + j++; + } + + for (i = 0; stringRelOpts[i].gen.name; i++) + { + relOpts[j] = &stringRelOpts[i].gen; + relOpts[j]->type = RELOPT_TYPE_STRING; + relOpts[j]->namelen = strlen(relOpts[j]->name); + j++; + } + + for (i = 0; i < num_custom_options; i++) + { + relOpts[j] = custom_options[i]; + j++; + } + + /* add a list terminator */ + relOpts[j] = NULL; + + /* flag the work is complete */ + need_initialization = false; +} + +/* + * add_reloption_kind + * Create a new relopt_kind value, to be used in custom reloptions by + * user-defined AMs. + */ +relopt_kind +add_reloption_kind(void) +{ + /* don't hand out the last bit so that the enum's behavior is portable */ + if (last_assigned_kind >= RELOPT_KIND_MAX) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("user-defined relation parameter types limit exceeded"))); + last_assigned_kind <<= 1; + return (relopt_kind) last_assigned_kind; +} + +/* + * add_reloption + * Add an already-created custom reloption to the list, and recompute the + * main parser table. + */ +static void +add_reloption(relopt_gen *newoption) +{ + static int max_custom_options = 0; + + if (num_custom_options >= max_custom_options) + { + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + + if (max_custom_options == 0) + { + max_custom_options = 8; + custom_options = palloc(max_custom_options * sizeof(relopt_gen *)); + } + else + { + max_custom_options *= 2; + custom_options = repalloc(custom_options, + max_custom_options * sizeof(relopt_gen *)); + } + MemoryContextSwitchTo(oldcxt); + } + custom_options[num_custom_options++] = newoption; + + need_initialization = true; +} + +/* + * init_local_reloptions + * Initialize local reloptions that will parsed into bytea structure of + * 'relopt_struct_size'. + */ +void +init_local_reloptions(local_relopts *opts, Size relopt_struct_size) +{ + opts->options = NIL; + opts->validators = NIL; + opts->relopt_struct_size = relopt_struct_size; +} + +/* + * register_reloptions_validator + * Register custom validation callback that will be called at the end of + * build_local_reloptions(). + */ +void +register_reloptions_validator(local_relopts *opts, relopts_validator validator) +{ + opts->validators = lappend(opts->validators, validator); +} + +/* + * add_local_reloption + * Add an already-created custom reloption to the local list. + */ +static void +add_local_reloption(local_relopts *relopts, relopt_gen *newoption, int offset) +{ + local_relopt *opt = palloc(sizeof(*opt)); + + Assert(offset < relopts->relopt_struct_size); + + opt->option = newoption; + opt->offset = offset; + + relopts->options = lappend(relopts->options, opt); +} + +/* + * allocate_reloption + * Allocate a new reloption and initialize the type-agnostic fields + * (for types other than string) + */ +static relopt_gen * +allocate_reloption(bits32 kinds, int type, const char *name, const char *desc, + LOCKMODE lockmode) +{ + MemoryContext oldcxt; + size_t size; + relopt_gen *newoption; + + if (kinds != RELOPT_KIND_LOCAL) + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + else + oldcxt = NULL; + + switch (type) + { + case RELOPT_TYPE_BOOL: + size = sizeof(relopt_bool); + break; + case RELOPT_TYPE_INT: + size = sizeof(relopt_int); + break; + case RELOPT_TYPE_REAL: + size = sizeof(relopt_real); + break; + case RELOPT_TYPE_ENUM: + size = sizeof(relopt_enum); + break; + case RELOPT_TYPE_STRING: + size = sizeof(relopt_string); + break; + default: + elog(ERROR, "unsupported reloption type %d", type); + return NULL; /* keep compiler quiet */ + } + + newoption = palloc(size); + + newoption->name = pstrdup(name); + if (desc) + newoption->desc = pstrdup(desc); + else + newoption->desc = NULL; + newoption->kinds = kinds; + newoption->namelen = strlen(name); + newoption->type = type; + newoption->lockmode = lockmode; + + if (oldcxt != NULL) + MemoryContextSwitchTo(oldcxt); + + return newoption; +} + +/* + * init_bool_reloption + * Allocate and initialize a new boolean reloption + */ +static relopt_bool * +init_bool_reloption(bits32 kinds, const char *name, const char *desc, + bool default_val, LOCKMODE lockmode) +{ + relopt_bool *newoption; + + newoption = (relopt_bool *) allocate_reloption(kinds, RELOPT_TYPE_BOOL, + name, desc, lockmode); + newoption->default_val = default_val; + + return newoption; +} + +/* + * add_bool_reloption + * Add a new boolean reloption + */ +void +add_bool_reloption(bits32 kinds, const char *name, const char *desc, + bool default_val, LOCKMODE lockmode) +{ + relopt_bool *newoption = init_bool_reloption(kinds, name, desc, + default_val, lockmode); + + add_reloption((relopt_gen *) newoption); +} + +/* + * add_local_bool_reloption + * Add a new boolean local reloption + * + * 'offset' is offset of bool-typed field. + */ +void +add_local_bool_reloption(local_relopts *relopts, const char *name, + const char *desc, bool default_val, int offset) +{ + relopt_bool *newoption = init_bool_reloption(RELOPT_KIND_LOCAL, + name, desc, + default_val, 0); + + add_local_reloption(relopts, (relopt_gen *) newoption, offset); +} + + +/* + * init_real_reloption + * Allocate and initialize a new integer reloption + */ +static relopt_int * +init_int_reloption(bits32 kinds, const char *name, const char *desc, + int default_val, int min_val, int max_val, + LOCKMODE lockmode) +{ + relopt_int *newoption; + + newoption = (relopt_int *) allocate_reloption(kinds, RELOPT_TYPE_INT, + name, desc, lockmode); + newoption->default_val = default_val; + newoption->min = min_val; + newoption->max = max_val; + + return newoption; +} + +/* + * add_int_reloption + * Add a new integer reloption + */ +void +add_int_reloption(bits32 kinds, const char *name, const char *desc, int default_val, + int min_val, int max_val, LOCKMODE lockmode) +{ + relopt_int *newoption = init_int_reloption(kinds, name, desc, + default_val, min_val, + max_val, lockmode); + + add_reloption((relopt_gen *) newoption); +} + +/* + * add_local_int_reloption + * Add a new local integer reloption + * + * 'offset' is offset of int-typed field. + */ +void +add_local_int_reloption(local_relopts *relopts, const char *name, + const char *desc, int default_val, int min_val, + int max_val, int offset) +{ + relopt_int *newoption = init_int_reloption(RELOPT_KIND_LOCAL, + name, desc, default_val, + min_val, max_val, 0); + + add_local_reloption(relopts, (relopt_gen *) newoption, offset); +} + +/* + * init_real_reloption + * Allocate and initialize a new real reloption + */ +static relopt_real * +init_real_reloption(bits32 kinds, const char *name, const char *desc, + double default_val, double min_val, double max_val, + LOCKMODE lockmode) +{ + relopt_real *newoption; + + newoption = (relopt_real *) allocate_reloption(kinds, RELOPT_TYPE_REAL, + name, desc, lockmode); + newoption->default_val = default_val; + newoption->min = min_val; + newoption->max = max_val; + + return newoption; +} + +/* + * add_real_reloption + * Add a new float reloption + */ +void +add_real_reloption(bits32 kinds, const char *name, const char *desc, + double default_val, double min_val, double max_val, + LOCKMODE lockmode) +{ + relopt_real *newoption = init_real_reloption(kinds, name, desc, + default_val, min_val, + max_val, lockmode); + + add_reloption((relopt_gen *) newoption); +} + +/* + * add_local_real_reloption + * Add a new local float reloption + * + * 'offset' is offset of double-typed field. + */ +void +add_local_real_reloption(local_relopts *relopts, const char *name, + const char *desc, double default_val, + double min_val, double max_val, int offset) +{ + relopt_real *newoption = init_real_reloption(RELOPT_KIND_LOCAL, + name, desc, + default_val, min_val, + max_val, 0); + + add_local_reloption(relopts, (relopt_gen *) newoption, offset); +} + +/* + * init_enum_reloption + * Allocate and initialize a new enum reloption + */ +static relopt_enum * +init_enum_reloption(bits32 kinds, const char *name, const char *desc, + relopt_enum_elt_def *members, int default_val, + const char *detailmsg, LOCKMODE lockmode) +{ + relopt_enum *newoption; + + newoption = (relopt_enum *) allocate_reloption(kinds, RELOPT_TYPE_ENUM, + name, desc, lockmode); + newoption->members = members; + newoption->default_val = default_val; + newoption->detailmsg = detailmsg; + + return newoption; +} + + +/* + * add_enum_reloption + * Add a new enum reloption + * + * The members array must have a terminating NULL entry. + * + * The detailmsg is shown when unsupported values are passed, and has this + * form: "Valid values are \"foo\", \"bar\", and \"bar\"." + * + * The members array and detailmsg are not copied -- caller must ensure that + * they are valid throughout the life of the process. + */ +void +add_enum_reloption(bits32 kinds, const char *name, const char *desc, + relopt_enum_elt_def *members, int default_val, + const char *detailmsg, LOCKMODE lockmode) +{ + relopt_enum *newoption = init_enum_reloption(kinds, name, desc, + members, default_val, + detailmsg, lockmode); + + add_reloption((relopt_gen *) newoption); +} + +/* + * add_local_enum_reloption + * Add a new local enum reloption + * + * 'offset' is offset of int-typed field. + */ +void +add_local_enum_reloption(local_relopts *relopts, const char *name, + const char *desc, relopt_enum_elt_def *members, + int default_val, const char *detailmsg, int offset) +{ + relopt_enum *newoption = init_enum_reloption(RELOPT_KIND_LOCAL, + name, desc, + members, default_val, + detailmsg, 0); + + add_local_reloption(relopts, (relopt_gen *) newoption, offset); +} + +/* + * init_string_reloption + * Allocate and initialize a new string reloption + */ +static relopt_string * +init_string_reloption(bits32 kinds, const char *name, const char *desc, + const char *default_val, + validate_string_relopt validator, + fill_string_relopt filler, + LOCKMODE lockmode) +{ + relopt_string *newoption; + + /* make sure the validator/default combination is sane */ + if (validator) + (validator) (default_val); + + newoption = (relopt_string *) allocate_reloption(kinds, RELOPT_TYPE_STRING, + name, desc, lockmode); + newoption->validate_cb = validator; + newoption->fill_cb = filler; + if (default_val) + { + if (kinds == RELOPT_KIND_LOCAL) + newoption->default_val = strdup(default_val); + else + newoption->default_val = MemoryContextStrdup(TopMemoryContext, default_val); + newoption->default_len = strlen(default_val); + newoption->default_isnull = false; + } + else + { + newoption->default_val = ""; + newoption->default_len = 0; + newoption->default_isnull = true; + } + + return newoption; +} + +/* + * add_string_reloption + * Add a new string reloption + * + * "validator" is an optional function pointer that can be used to test the + * validity of the values. It must elog(ERROR) when the argument string is + * not acceptable for the variable. Note that the default value must pass + * the validation. + */ +void +add_string_reloption(bits32 kinds, const char *name, const char *desc, + const char *default_val, validate_string_relopt validator, + LOCKMODE lockmode) +{ + relopt_string *newoption = init_string_reloption(kinds, name, desc, + default_val, + validator, NULL, + lockmode); + + add_reloption((relopt_gen *) newoption); +} + +/* + * add_local_string_reloption + * Add a new local string reloption + * + * 'offset' is offset of int-typed field that will store offset of string value + * in the resulting bytea structure. + */ +void +add_local_string_reloption(local_relopts *relopts, const char *name, + const char *desc, const char *default_val, + validate_string_relopt validator, + fill_string_relopt filler, int offset) +{ + relopt_string *newoption = init_string_reloption(RELOPT_KIND_LOCAL, + name, desc, + default_val, + validator, filler, + 0); + + add_local_reloption(relopts, (relopt_gen *) newoption, offset); +} + +/* + * Transform a relation options list (list of DefElem) into the text array + * format that is kept in pg_class.reloptions, including only those options + * that are in the passed namespace. The output values do not include the + * namespace. + * + * This is used for three cases: CREATE TABLE/INDEX, ALTER TABLE SET, and + * ALTER TABLE RESET. In the ALTER cases, oldOptions is the existing + * reloptions value (possibly NULL), and we replace or remove entries + * as needed. + * + * If acceptOidsOff is true, then we allow oids = false, but throw error when + * on. This is solely needed for backwards compatibility. + * + * Note that this is not responsible for determining whether the options + * are valid, but it does check that namespaces for all the options given are + * listed in validnsps. The NULL namespace is always valid and need not be + * explicitly listed. Passing a NULL pointer means that only the NULL + * namespace is valid. + * + * Both oldOptions and the result are text arrays (or NULL for "default"), + * but we declare them as Datums to avoid including array.h in reloptions.h. + */ +Datum +transformRelOptions(Datum oldOptions, List *defList, const char *namspace, + char *validnsps[], bool acceptOidsOff, bool isReset) +{ + Datum result; + ArrayBuildState *astate; + ListCell *cell; + + /* no change if empty list */ + if (defList == NIL) + return oldOptions; + + /* We build new array using accumArrayResult */ + astate = NULL; + + /* Copy any oldOptions that aren't to be replaced */ + if (PointerIsValid(DatumGetPointer(oldOptions))) + { + ArrayType *array = DatumGetArrayTypeP(oldOptions); + Datum *oldoptions; + int noldoptions; + int i; + + deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT, + &oldoptions, NULL, &noldoptions); + + for (i = 0; i < noldoptions; i++) + { + char *text_str = VARDATA(oldoptions[i]); + int text_len = VARSIZE(oldoptions[i]) - VARHDRSZ; + + /* Search for a match in defList */ + foreach(cell, defList) + { + DefElem *def = (DefElem *) lfirst(cell); + int kw_len; + + /* ignore if not in the same namespace */ + if (namspace == NULL) + { + if (def->defnamespace != NULL) + continue; + } + else if (def->defnamespace == NULL) + continue; + else if (strcmp(def->defnamespace, namspace) != 0) + continue; + + kw_len = strlen(def->defname); + if (text_len > kw_len && text_str[kw_len] == '=' && + strncmp(text_str, def->defname, kw_len) == 0) + break; + } + if (!cell) + { + /* No match, so keep old option */ + astate = accumArrayResult(astate, oldoptions[i], + false, TEXTOID, + CurrentMemoryContext); + } + } + } + + /* + * If CREATE/SET, add new options to array; if RESET, just check that the + * user didn't say RESET (option=val). (Must do this because the grammar + * doesn't enforce it.) + */ + foreach(cell, defList) + { + DefElem *def = (DefElem *) lfirst(cell); + + if (isReset) + { + if (def->arg != NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("RESET must not include values for parameters"))); + } + else + { + text *t; + const char *value; + Size len; + + /* + * Error out if the namespace is not valid. A NULL namespace is + * always valid. + */ + if (def->defnamespace != NULL) + { + bool valid = false; + int i; + + if (validnsps) + { + for (i = 0; validnsps[i]; i++) + { + if (strcmp(def->defnamespace, validnsps[i]) == 0) + { + valid = true; + break; + } + } + } + + if (!valid) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized parameter namespace \"%s\"", + def->defnamespace))); + } + + /* ignore if not in the same namespace */ + if (namspace == NULL) + { + if (def->defnamespace != NULL) + continue; + } + else if (def->defnamespace == NULL) + continue; + else if (strcmp(def->defnamespace, namspace) != 0) + continue; + + /* + * Flatten the DefElem into a text string like "name=arg". If we + * have just "name", assume "name=true" is meant. Note: the + * namespace is not output. + */ + if (def->arg != NULL) + value = defGetString(def); + else + value = "true"; + + /* + * This is not a great place for this test, but there's no other + * convenient place to filter the option out. As WITH (oids = + * false) will be removed someday, this seems like an acceptable + * amount of ugly. + */ + if (acceptOidsOff && def->defnamespace == NULL && + strcmp(def->defname, "oids") == 0) + { + if (defGetBoolean(def)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("tables declared WITH OIDS are not supported"))); + /* skip over option, reloptions machinery doesn't know it */ + continue; + } + + len = VARHDRSZ + strlen(def->defname) + 1 + strlen(value); + /* +1 leaves room for sprintf's trailing null */ + t = (text *) palloc(len + 1); + SET_VARSIZE(t, len); + sprintf(VARDATA(t), "%s=%s", def->defname, value); + + astate = accumArrayResult(astate, PointerGetDatum(t), + false, TEXTOID, + CurrentMemoryContext); + } + } + + if (astate) + result = makeArrayResult(astate, CurrentMemoryContext); + else + result = (Datum) 0; + + return result; +} + + +/* + * Convert the text-array format of reloptions into a List of DefElem. + * This is the inverse of transformRelOptions(). + */ +List * +untransformRelOptions(Datum options) +{ + List *result = NIL; + ArrayType *array; + Datum *optiondatums; + int noptions; + int i; + + /* Nothing to do if no options */ + if (!PointerIsValid(DatumGetPointer(options))) + return result; + + array = DatumGetArrayTypeP(options); + + deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT, + &optiondatums, NULL, &noptions); + + for (i = 0; i < noptions; i++) + { + char *s; + char *p; + Node *val = NULL; + + s = TextDatumGetCString(optiondatums[i]); + p = strchr(s, '='); + if (p) + { + *p++ = '\0'; + val = (Node *) makeString(pstrdup(p)); + } + result = lappend(result, makeDefElem(pstrdup(s), val, -1)); + } + + return result; +} + +/* + * Extract and parse reloptions from a pg_class tuple. + * + * This is a low-level routine, expected to be used by relcache code and + * callers that do not have a table's relcache entry (e.g. autovacuum). For + * other uses, consider grabbing the rd_options pointer from the relcache entry + * instead. + * + * tupdesc is pg_class' tuple descriptor. amoptions is a pointer to the index + * AM's options parser function in the case of a tuple corresponding to an + * index, or NULL otherwise. + */ +bytea * +extractRelOptions(HeapTuple tuple, TupleDesc tupdesc, + amoptions_function amoptions) +{ + bytea *options; + bool isnull; + Datum datum; + Form_pg_class classForm; + + datum = fastgetattr(tuple, + Anum_pg_class_reloptions, + tupdesc, + &isnull); + if (isnull) + return NULL; + + classForm = (Form_pg_class) GETSTRUCT(tuple); + + /* Parse into appropriate format; don't error out here */ + switch (classForm->relkind) + { + case RELKIND_RELATION: + case RELKIND_TOASTVALUE: + case RELKIND_MATVIEW: + options = heap_reloptions(classForm->relkind, datum, false); + break; + case RELKIND_PARTITIONED_TABLE: + options = partitioned_table_reloptions(datum, false); + break; + case RELKIND_VIEW: + options = view_reloptions(datum, false); + break; + case RELKIND_INDEX: + case RELKIND_PARTITIONED_INDEX: + options = index_reloptions(amoptions, datum, false); + break; + case RELKIND_FOREIGN_TABLE: + options = NULL; + break; + default: + Assert(false); /* can't get here */ + options = NULL; /* keep compiler quiet */ + break; + } + + return options; +} + +static void +parseRelOptionsInternal(Datum options, bool validate, + relopt_value *reloptions, int numoptions) +{ + ArrayType *array = DatumGetArrayTypeP(options); + Datum *optiondatums; + int noptions; + int i; + + deconstruct_array(array, TEXTOID, -1, false, TYPALIGN_INT, + &optiondatums, NULL, &noptions); + + for (i = 0; i < noptions; i++) + { + char *text_str = VARDATA(optiondatums[i]); + int text_len = VARSIZE(optiondatums[i]) - VARHDRSZ; + int j; + + /* Search for a match in reloptions */ + for (j = 0; j < numoptions; j++) + { + int kw_len = reloptions[j].gen->namelen; + + if (text_len > kw_len && text_str[kw_len] == '=' && + strncmp(text_str, reloptions[j].gen->name, kw_len) == 0) + { + parse_one_reloption(&reloptions[j], text_str, text_len, + validate); + break; + } + } + + if (j >= numoptions && validate) + { + char *s; + char *p; + + s = TextDatumGetCString(optiondatums[i]); + p = strchr(s, '='); + if (p) + *p = '\0'; + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized parameter \"%s\"", s))); + } + } + + /* It's worth avoiding memory leaks in this function */ + pfree(optiondatums); + + if (((void *) array) != DatumGetPointer(options)) + pfree(array); +} + +/* + * Interpret reloptions that are given in text-array format. + * + * options is a reloption text array as constructed by transformRelOptions. + * kind specifies the family of options to be processed. + * + * The return value is a relopt_value * array on which the options actually + * set in the options array are marked with isset=true. The length of this + * array is returned in *numrelopts. Options not set are also present in the + * array; this is so that the caller can easily locate the default values. + * + * If there are no options of the given kind, numrelopts is set to 0 and NULL + * is returned (unless options are illegally supplied despite none being + * defined, in which case an error occurs). + * + * Note: values of type int, bool and real are allocated as part of the + * returned array. Values of type string are allocated separately and must + * be freed by the caller. + */ +static relopt_value * +parseRelOptions(Datum options, bool validate, relopt_kind kind, + int *numrelopts) +{ + relopt_value *reloptions = NULL; + int numoptions = 0; + int i; + int j; + + if (need_initialization) + initialize_reloptions(); + + /* Build a list of expected options, based on kind */ + + for (i = 0; relOpts[i]; i++) + if (relOpts[i]->kinds & kind) + numoptions++; + + if (numoptions > 0) + { + reloptions = palloc(numoptions * sizeof(relopt_value)); + + for (i = 0, j = 0; relOpts[i]; i++) + { + if (relOpts[i]->kinds & kind) + { + reloptions[j].gen = relOpts[i]; + reloptions[j].isset = false; + j++; + } + } + } + + /* Done if no options */ + if (PointerIsValid(DatumGetPointer(options))) + parseRelOptionsInternal(options, validate, reloptions, numoptions); + + *numrelopts = numoptions; + return reloptions; +} + +/* Parse local unregistered options. */ +static relopt_value * +parseLocalRelOptions(local_relopts *relopts, Datum options, bool validate) +{ + int nopts = list_length(relopts->options); + relopt_value *values = palloc(sizeof(*values) * nopts); + ListCell *lc; + int i = 0; + + foreach(lc, relopts->options) + { + local_relopt *opt = lfirst(lc); + + values[i].gen = opt->option; + values[i].isset = false; + + i++; + } + + if (options != (Datum) 0) + parseRelOptionsInternal(options, validate, values, nopts); + + return values; +} + +/* + * Subroutine for parseRelOptions, to parse and validate a single option's + * value + */ +static void +parse_one_reloption(relopt_value *option, char *text_str, int text_len, + bool validate) +{ + char *value; + int value_len; + bool parsed; + bool nofree = false; + + if (option->isset && validate) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("parameter \"%s\" specified more than once", + option->gen->name))); + + value_len = text_len - option->gen->namelen - 1; + value = (char *) palloc(value_len + 1); + memcpy(value, text_str + option->gen->namelen + 1, value_len); + value[value_len] = '\0'; + + switch (option->gen->type) + { + case RELOPT_TYPE_BOOL: + { + parsed = parse_bool(value, &option->values.bool_val); + if (validate && !parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for boolean option \"%s\": %s", + option->gen->name, value))); + } + break; + case RELOPT_TYPE_INT: + { + relopt_int *optint = (relopt_int *) option->gen; + + parsed = parse_int(value, &option->values.int_val, 0, NULL); + if (validate && !parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for integer option \"%s\": %s", + option->gen->name, value))); + if (validate && (option->values.int_val < optint->min || + option->values.int_val > optint->max)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("value %s out of bounds for option \"%s\"", + value, option->gen->name), + errdetail("Valid values are between \"%d\" and \"%d\".", + optint->min, optint->max))); + } + break; + case RELOPT_TYPE_REAL: + { + relopt_real *optreal = (relopt_real *) option->gen; + + parsed = parse_real(value, &option->values.real_val, 0, NULL); + if (validate && !parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for floating point option \"%s\": %s", + option->gen->name, value))); + if (validate && (option->values.real_val < optreal->min || + option->values.real_val > optreal->max)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("value %s out of bounds for option \"%s\"", + value, option->gen->name), + errdetail("Valid values are between \"%f\" and \"%f\".", + optreal->min, optreal->max))); + } + break; + case RELOPT_TYPE_ENUM: + { + relopt_enum *optenum = (relopt_enum *) option->gen; + relopt_enum_elt_def *elt; + + parsed = false; + for (elt = optenum->members; elt->string_val; elt++) + { + if (pg_strcasecmp(value, elt->string_val) == 0) + { + option->values.enum_val = elt->symbol_val; + parsed = true; + break; + } + } + if (validate && !parsed) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for enum option \"%s\": %s", + option->gen->name, value), + optenum->detailmsg ? + errdetail_internal("%s", _(optenum->detailmsg)) : 0)); + + /* + * If value is not among the allowed string values, but we are + * not asked to validate, just use the default numeric value. + */ + if (!parsed) + option->values.enum_val = optenum->default_val; + } + break; + case RELOPT_TYPE_STRING: + { + relopt_string *optstring = (relopt_string *) option->gen; + + option->values.string_val = value; + nofree = true; + if (validate && optstring->validate_cb) + (optstring->validate_cb) (value); + parsed = true; + } + break; + default: + elog(ERROR, "unsupported reloption type %d", option->gen->type); + parsed = true; /* quiet compiler */ + break; + } + + if (parsed) + option->isset = true; + if (!nofree) + pfree(value); +} + +/* + * Given the result from parseRelOptions, allocate a struct that's of the + * specified base size plus any extra space that's needed for string variables. + * + * "base" should be sizeof(struct) of the reloptions struct (StdRdOptions or + * equivalent). + */ +static void * +allocateReloptStruct(Size base, relopt_value *options, int numoptions) +{ + Size size = base; + int i; + + for (i = 0; i < numoptions; i++) + { + relopt_value *optval = &options[i]; + + if (optval->gen->type == RELOPT_TYPE_STRING) + { + relopt_string *optstr = (relopt_string *) optval->gen; + + if (optstr->fill_cb) + { + const char *val = optval->isset ? optval->values.string_val : + optstr->default_isnull ? NULL : optstr->default_val; + + size += optstr->fill_cb(val, NULL); + } + else + size += GET_STRING_RELOPTION_LEN(*optval) + 1; + } + } + + return palloc0(size); +} + +/* + * Given the result of parseRelOptions and a parsing table, fill in the + * struct (previously allocated with allocateReloptStruct) with the parsed + * values. + * + * rdopts is the pointer to the allocated struct to be filled. + * basesize is the sizeof(struct) that was passed to allocateReloptStruct. + * options, of length numoptions, is parseRelOptions' output. + * elems, of length numelems, is the table describing the allowed options. + * When validate is true, it is expected that all options appear in elems. + */ +static void +fillRelOptions(void *rdopts, Size basesize, + relopt_value *options, int numoptions, + bool validate, + const relopt_parse_elt *elems, int numelems) +{ + int i; + int offset = basesize; + + for (i = 0; i < numoptions; i++) + { + int j; + bool found = false; + + for (j = 0; j < numelems; j++) + { + if (strcmp(options[i].gen->name, elems[j].optname) == 0) + { + relopt_string *optstring; + char *itempos = ((char *) rdopts) + elems[j].offset; + char *string_val; + + switch (options[i].gen->type) + { + case RELOPT_TYPE_BOOL: + *(bool *) itempos = options[i].isset ? + options[i].values.bool_val : + ((relopt_bool *) options[i].gen)->default_val; + break; + case RELOPT_TYPE_INT: + *(int *) itempos = options[i].isset ? + options[i].values.int_val : + ((relopt_int *) options[i].gen)->default_val; + break; + case RELOPT_TYPE_REAL: + *(double *) itempos = options[i].isset ? + options[i].values.real_val : + ((relopt_real *) options[i].gen)->default_val; + break; + case RELOPT_TYPE_ENUM: + *(int *) itempos = options[i].isset ? + options[i].values.enum_val : + ((relopt_enum *) options[i].gen)->default_val; + break; + case RELOPT_TYPE_STRING: + optstring = (relopt_string *) options[i].gen; + if (options[i].isset) + string_val = options[i].values.string_val; + else if (!optstring->default_isnull) + string_val = optstring->default_val; + else + string_val = NULL; + + if (optstring->fill_cb) + { + Size size = + optstring->fill_cb(string_val, + (char *) rdopts + offset); + + if (size) + { + *(int *) itempos = offset; + offset += size; + } + else + *(int *) itempos = 0; + } + else if (string_val == NULL) + *(int *) itempos = 0; + else + { + strcpy((char *) rdopts + offset, string_val); + *(int *) itempos = offset; + offset += strlen(string_val) + 1; + } + break; + default: + elog(ERROR, "unsupported reloption type %d", + options[i].gen->type); + break; + } + found = true; + break; + } + } + if (validate && !found) + elog(ERROR, "reloption \"%s\" not found in parse table", + options[i].gen->name); + } + SET_VARSIZE(rdopts, offset); +} + + +/* + * Option parser for anything that uses StdRdOptions. + */ +bytea * +default_reloptions(Datum reloptions, bool validate, relopt_kind kind) +{ + static const relopt_parse_elt tab[] = { + {"fillfactor", RELOPT_TYPE_INT, offsetof(StdRdOptions, fillfactor)}, + {"autovacuum_enabled", RELOPT_TYPE_BOOL, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, enabled)}, + {"autovacuum_vacuum_threshold", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_threshold)}, + {"autovacuum_vacuum_insert_threshold", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_ins_threshold)}, + {"autovacuum_analyze_threshold", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_threshold)}, + {"autovacuum_vacuum_cost_limit", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_cost_limit)}, + {"autovacuum_freeze_min_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_min_age)}, + {"autovacuum_freeze_max_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_max_age)}, + {"autovacuum_freeze_table_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, freeze_table_age)}, + {"autovacuum_multixact_freeze_min_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_min_age)}, + {"autovacuum_multixact_freeze_max_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_max_age)}, + {"autovacuum_multixact_freeze_table_age", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, multixact_freeze_table_age)}, + {"log_autovacuum_min_duration", RELOPT_TYPE_INT, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, log_min_duration)}, + {"toast_tuple_target", RELOPT_TYPE_INT, + offsetof(StdRdOptions, toast_tuple_target)}, + {"autovacuum_vacuum_cost_delay", RELOPT_TYPE_REAL, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_cost_delay)}, + {"autovacuum_vacuum_scale_factor", RELOPT_TYPE_REAL, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_scale_factor)}, + {"autovacuum_vacuum_insert_scale_factor", RELOPT_TYPE_REAL, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, vacuum_ins_scale_factor)}, + {"autovacuum_analyze_scale_factor", RELOPT_TYPE_REAL, + offsetof(StdRdOptions, autovacuum) + offsetof(AutoVacOpts, analyze_scale_factor)}, + {"user_catalog_table", RELOPT_TYPE_BOOL, + offsetof(StdRdOptions, user_catalog_table)}, + {"parallel_workers", RELOPT_TYPE_INT, + offsetof(StdRdOptions, parallel_workers)}, + {"vacuum_index_cleanup", RELOPT_TYPE_ENUM, + offsetof(StdRdOptions, vacuum_index_cleanup)}, + {"vacuum_truncate", RELOPT_TYPE_BOOL, + offsetof(StdRdOptions, vacuum_truncate)} + }; + + return (bytea *) build_reloptions(reloptions, validate, kind, + sizeof(StdRdOptions), + tab, lengthof(tab)); +} + +/* + * build_reloptions + * + * Parses "reloptions" provided by the caller, returning them in a + * structure containing the parsed options. The parsing is done with + * the help of a parsing table describing the allowed options, defined + * by "relopt_elems" of length "num_relopt_elems". + * + * "validate" must be true if reloptions value is freshly built by + * transformRelOptions(), as opposed to being read from the catalog, in which + * case the values contained in it must already be valid. + * + * NULL is returned if the passed-in options did not match any of the options + * in the parsing table, unless validate is true in which case an error would + * be reported. + */ +void * +build_reloptions(Datum reloptions, bool validate, + relopt_kind kind, + Size relopt_struct_size, + const relopt_parse_elt *relopt_elems, + int num_relopt_elems) +{ + int numoptions; + relopt_value *options; + void *rdopts; + + /* parse options specific to given relation option kind */ + options = parseRelOptions(reloptions, validate, kind, &numoptions); + Assert(numoptions <= num_relopt_elems); + + /* if none set, we're done */ + if (numoptions == 0) + { + Assert(options == NULL); + return NULL; + } + + /* allocate and fill the structure */ + rdopts = allocateReloptStruct(relopt_struct_size, options, numoptions); + fillRelOptions(rdopts, relopt_struct_size, options, numoptions, + validate, relopt_elems, num_relopt_elems); + + pfree(options); + + return rdopts; +} + +/* + * Parse local options, allocate a bytea struct that's of the specified + * 'base_size' plus any extra space that's needed for string variables, + * fill its option's fields located at the given offsets and return it. + */ +void * +build_local_reloptions(local_relopts *relopts, Datum options, bool validate) +{ + int noptions = list_length(relopts->options); + relopt_parse_elt *elems = palloc(sizeof(*elems) * noptions); + relopt_value *vals; + void *opts; + int i = 0; + ListCell *lc; + + foreach(lc, relopts->options) + { + local_relopt *opt = lfirst(lc); + + elems[i].optname = opt->option->name; + elems[i].opttype = opt->option->type; + elems[i].offset = opt->offset; + + i++; + } + + vals = parseLocalRelOptions(relopts, options, validate); + opts = allocateReloptStruct(relopts->relopt_struct_size, vals, noptions); + fillRelOptions(opts, relopts->relopt_struct_size, vals, noptions, validate, + elems, noptions); + + foreach(lc, relopts->validators) + ((relopts_validator) lfirst(lc)) (opts, vals, noptions); + + if (elems) + pfree(elems); + + return opts; +} + +/* + * Option parser for partitioned tables + */ +bytea * +partitioned_table_reloptions(Datum reloptions, bool validate) +{ + /* + * There are no options for partitioned tables yet, but this is able to do + * some validation. + */ + return (bytea *) build_reloptions(reloptions, validate, + RELOPT_KIND_PARTITIONED, + 0, NULL, 0); +} + +/* + * Option parser for views + */ +bytea * +view_reloptions(Datum reloptions, bool validate) +{ + static const relopt_parse_elt tab[] = { + {"security_barrier", RELOPT_TYPE_BOOL, + offsetof(ViewOptions, security_barrier)}, + {"check_option", RELOPT_TYPE_ENUM, + offsetof(ViewOptions, check_option)} + }; + + return (bytea *) build_reloptions(reloptions, validate, + RELOPT_KIND_VIEW, + sizeof(ViewOptions), + tab, lengthof(tab)); +} + +/* + * Parse options for heaps, views and toast tables. + */ +bytea * +heap_reloptions(char relkind, Datum reloptions, bool validate) +{ + StdRdOptions *rdopts; + + switch (relkind) + { + case RELKIND_TOASTVALUE: + rdopts = (StdRdOptions *) + default_reloptions(reloptions, validate, RELOPT_KIND_TOAST); + if (rdopts != NULL) + { + /* adjust default-only parameters for TOAST relations */ + rdopts->fillfactor = 100; + rdopts->autovacuum.analyze_threshold = -1; + rdopts->autovacuum.analyze_scale_factor = -1; + } + return (bytea *) rdopts; + case RELKIND_RELATION: + case RELKIND_MATVIEW: + return default_reloptions(reloptions, validate, RELOPT_KIND_HEAP); + default: + /* other relkinds are not supported */ + return NULL; + } +} + + +/* + * Parse options for indexes. + * + * amoptions index AM's option parser function + * reloptions options as text[] datum + * validate error flag + */ +bytea * +index_reloptions(amoptions_function amoptions, Datum reloptions, bool validate) +{ + Assert(amoptions != NULL); + + /* Assume function is strict */ + if (!PointerIsValid(DatumGetPointer(reloptions))) + return NULL; + + return amoptions(reloptions, validate); +} + +/* + * Option parser for attribute reloptions + */ +bytea * +attribute_reloptions(Datum reloptions, bool validate) +{ + static const relopt_parse_elt tab[] = { + {"n_distinct", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct)}, + {"n_distinct_inherited", RELOPT_TYPE_REAL, offsetof(AttributeOpts, n_distinct_inherited)} + }; + + return (bytea *) build_reloptions(reloptions, validate, + RELOPT_KIND_ATTRIBUTE, + sizeof(AttributeOpts), + tab, lengthof(tab)); +} + +/* + * Option parser for tablespace reloptions + */ +bytea * +tablespace_reloptions(Datum reloptions, bool validate) +{ + static const relopt_parse_elt tab[] = { + {"random_page_cost", RELOPT_TYPE_REAL, offsetof(TableSpaceOpts, random_page_cost)}, + {"seq_page_cost", RELOPT_TYPE_REAL, offsetof(TableSpaceOpts, seq_page_cost)}, + {"effective_io_concurrency", RELOPT_TYPE_INT, offsetof(TableSpaceOpts, effective_io_concurrency)}, + {"maintenance_io_concurrency", RELOPT_TYPE_INT, offsetof(TableSpaceOpts, maintenance_io_concurrency)} + }; + + return (bytea *) build_reloptions(reloptions, validate, + RELOPT_KIND_TABLESPACE, + sizeof(TableSpaceOpts), + tab, lengthof(tab)); +} + +/* + * Determine the required LOCKMODE from an option list. + * + * Called from AlterTableGetLockLevel(), see that function + * for a longer explanation of how this works. + */ +LOCKMODE +AlterTableGetRelOptionsLockLevel(List *defList) +{ + LOCKMODE lockmode = NoLock; + ListCell *cell; + + if (defList == NIL) + return AccessExclusiveLock; + + if (need_initialization) + initialize_reloptions(); + + foreach(cell, defList) + { + DefElem *def = (DefElem *) lfirst(cell); + int i; + + for (i = 0; relOpts[i]; i++) + { + if (strncmp(relOpts[i]->name, + def->defname, + relOpts[i]->namelen + 1) == 0) + { + if (lockmode < relOpts[i]->lockmode) + lockmode = relOpts[i]->lockmode; + } + } + } + + return lockmode; +} diff --git a/src/backend/access/common/scankey.c b/src/backend/access/common/scankey.c new file mode 100644 index 0000000..bf33c50 --- /dev/null +++ b/src/backend/access/common/scankey.c @@ -0,0 +1,117 @@ +/*------------------------------------------------------------------------- + * + * scankey.c + * scan key support code + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/scankey.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/skey.h" +#include "catalog/pg_collation.h" + + +/* + * ScanKeyEntryInitialize + * Initializes a scan key entry given all the field values. + * The target procedure is specified by OID (but can be invalid + * if SK_SEARCHNULL or SK_SEARCHNOTNULL is set). + * + * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey + * itself, because that's what will be used for any subsidiary info attached + * to the ScanKey's FmgrInfo record. + */ +void +ScanKeyEntryInitialize(ScanKey entry, + int flags, + AttrNumber attributeNumber, + StrategyNumber strategy, + Oid subtype, + Oid collation, + RegProcedure procedure, + Datum argument) +{ + entry->sk_flags = flags; + entry->sk_attno = attributeNumber; + entry->sk_strategy = strategy; + entry->sk_subtype = subtype; + entry->sk_collation = collation; + entry->sk_argument = argument; + if (RegProcedureIsValid(procedure)) + { + fmgr_info(procedure, &entry->sk_func); + } + else + { + Assert(flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL)); + MemSet(&entry->sk_func, 0, sizeof(entry->sk_func)); + } +} + +/* + * ScanKeyInit + * Shorthand version of ScanKeyEntryInitialize: flags and subtype + * are assumed to be zero (the usual value), and collation is defaulted. + * + * This is the recommended version for hardwired lookups in system catalogs. + * It cannot handle NULL arguments, unary operators, or nondefault operators, + * but we need none of those features for most hardwired lookups. + * + * We set collation to C_COLLATION_OID always. This is the correct value + * for all collation-aware columns in system catalogs, and it will be ignored + * for other column types, so it's not worth trying to be more finicky. + * + * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey + * itself, because that's what will be used for any subsidiary info attached + * to the ScanKey's FmgrInfo record. + */ +void +ScanKeyInit(ScanKey entry, + AttrNumber attributeNumber, + StrategyNumber strategy, + RegProcedure procedure, + Datum argument) +{ + entry->sk_flags = 0; + entry->sk_attno = attributeNumber; + entry->sk_strategy = strategy; + entry->sk_subtype = InvalidOid; + entry->sk_collation = C_COLLATION_OID; + entry->sk_argument = argument; + fmgr_info(procedure, &entry->sk_func); +} + +/* + * ScanKeyEntryInitializeWithInfo + * Initializes a scan key entry using an already-completed FmgrInfo + * function lookup record. + * + * Note: CurrentMemoryContext at call should be as long-lived as the ScanKey + * itself, because that's what will be used for any subsidiary info attached + * to the ScanKey's FmgrInfo record. + */ +void +ScanKeyEntryInitializeWithInfo(ScanKey entry, + int flags, + AttrNumber attributeNumber, + StrategyNumber strategy, + Oid subtype, + Oid collation, + FmgrInfo *finfo, + Datum argument) +{ + entry->sk_flags = flags; + entry->sk_attno = attributeNumber; + entry->sk_strategy = strategy; + entry->sk_subtype = subtype; + entry->sk_collation = collation; + entry->sk_argument = argument; + fmgr_info_copy(&entry->sk_func, finfo, CurrentMemoryContext); +} diff --git a/src/backend/access/common/session.c b/src/backend/access/common/session.c new file mode 100644 index 0000000..61b3206 --- /dev/null +++ b/src/backend/access/common/session.c @@ -0,0 +1,208 @@ +/*------------------------------------------------------------------------- + * + * session.c + * Encapsulation of user session. + * + * This is intended to contain data that needs to be shared between backends + * performing work for a client session. In particular such a session is + * shared between the leader and worker processes for parallel queries. At + * some later point it might also become useful infrastructure for separating + * backends from client connections, e.g. for the purpose of pooling. + * + * Currently this infrastructure is used to share: + * - typemod registry for ephemeral row-types, i.e. BlessTupleDesc etc. + * + * Portions Copyright (c) 2017-2021, PostgreSQL Global Development Group + * + * src/backend/access/common/session.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/session.h" +#include "storage/lwlock.h" +#include "storage/shm_toc.h" +#include "utils/memutils.h" +#include "utils/typcache.h" + +/* Magic number for per-session DSM TOC. */ +#define SESSION_MAGIC 0xabb0fbc9 + +/* + * We want to create a DSA area to store shared state that has the same + * lifetime as a session. So far, it's only used to hold the shared record + * type registry. We don't want it to have to create any DSM segments just + * yet in common cases, so we'll give it enough space to hold a very small + * SharedRecordTypmodRegistry. + */ +#define SESSION_DSA_SIZE 0x30000 + +/* + * Magic numbers for state sharing in the per-session DSM area. + */ +#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001) +#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002) + +/* This backend's current session. */ +Session *CurrentSession = NULL; + +/* + * Set up CurrentSession to point to an empty Session object. + */ +void +InitializeSession(void) +{ + CurrentSession = MemoryContextAllocZero(TopMemoryContext, sizeof(Session)); +} + +/* + * Initialize the per-session DSM segment if it isn't already initialized, and + * return its handle so that worker processes can attach to it. + * + * Unlike the per-context DSM segment, this segment and its contents are + * reused for future parallel queries. + * + * Return DSM_HANDLE_INVALID if a segment can't be allocated due to lack of + * resources. + */ +dsm_handle +GetSessionDsmHandle(void) +{ + shm_toc_estimator estimator; + shm_toc *toc; + dsm_segment *seg; + size_t typmod_registry_size; + size_t size; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + /* + * If we have already created a session-scope DSM segment in this backend, + * return its handle. The same segment will be used for the rest of this + * backend's lifetime. + */ + if (CurrentSession->segment != NULL) + return dsm_segment_handle(CurrentSession->segment); + + /* Otherwise, prepare to set one up. */ + old_context = MemoryContextSwitchTo(TopMemoryContext); + shm_toc_initialize_estimator(&estimator); + + /* Estimate space for the per-session DSA area. */ + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, SESSION_DSA_SIZE); + + /* Estimate space for the per-session record typmod registry. */ + typmod_registry_size = SharedRecordTypmodRegistryEstimate(); + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, typmod_registry_size); + + /* Set up segment and TOC. */ + size = shm_toc_estimate(&estimator); + seg = dsm_create(size, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + { + MemoryContextSwitchTo(old_context); + + return DSM_HANDLE_INVALID; + } + toc = shm_toc_create(SESSION_MAGIC, + dsm_segment_address(seg), + size); + + /* Create per-session DSA area. */ + dsa_space = shm_toc_allocate(toc, SESSION_DSA_SIZE); + dsa = dsa_create_in_place(dsa_space, + SESSION_DSA_SIZE, + LWTRANCHE_PER_SESSION_DSA, + seg); + shm_toc_insert(toc, SESSION_KEY_DSA, dsa_space); + + + /* Create session-scoped shared record typmod registry. */ + typmod_registry_space = shm_toc_allocate(toc, typmod_registry_size); + SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *) + typmod_registry_space, seg, dsa); + shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, + typmod_registry_space); + + /* + * If we got this far, we can pin the shared memory so it stays mapped for + * the rest of this backend's life. If we don't make it this far, cleanup + * callbacks for anything we installed above (ie currently + * SharedRecordTypmodRegistry) will run when the DSM segment is detached + * by CurrentResourceOwner so we aren't left with a broken CurrentSession. + */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + /* Make segment and area available via CurrentSession. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + MemoryContextSwitchTo(old_context); + + return dsm_segment_handle(seg); +} + +/* + * Attach to a per-session DSM segment provided by a parallel leader. + */ +void +AttachSession(dsm_handle handle) +{ + dsm_segment *seg; + shm_toc *toc; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Attach to the DSM segment. */ + seg = dsm_attach(handle); + if (seg == NULL) + elog(ERROR, "could not attach to per-session DSM segment"); + toc = shm_toc_attach(SESSION_MAGIC, dsm_segment_address(seg)); + + /* Attach to the DSA area. */ + dsa_space = shm_toc_lookup(toc, SESSION_KEY_DSA, false); + dsa = dsa_attach_in_place(dsa_space, seg); + + /* Make them available via the current session. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + /* Attach to the shared record typmod registry. */ + typmod_registry_space = + shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false); + SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *) + typmod_registry_space); + + /* Remain attached until end of backend or DetachSession(). */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + MemoryContextSwitchTo(old_context); +} + +/* + * Detach from the current session DSM segment. It's not strictly necessary + * to do this explicitly since we'll detach automatically at backend exit, but + * if we ever reuse parallel workers it will become important for workers to + * detach from one session before attaching to another. Note that this runs + * detach hooks. + */ +void +DetachSession(void) +{ + /* Runs detach hooks. */ + dsm_detach(CurrentSession->segment); + CurrentSession->segment = NULL; + dsa_detach(CurrentSession->area); + CurrentSession->area = NULL; +} diff --git a/src/backend/access/common/syncscan.c b/src/backend/access/common/syncscan.c new file mode 100644 index 0000000..b7a28af --- /dev/null +++ b/src/backend/access/common/syncscan.c @@ -0,0 +1,322 @@ +/*------------------------------------------------------------------------- + * + * syncscan.c + * scan synchronization support + * + * When multiple backends run a sequential scan on the same table, we try + * to keep them synchronized to reduce the overall I/O needed. The goal is + * to read each page into shared buffer cache only once, and let all backends + * that take part in the shared scan process the page before it falls out of + * the cache. + * + * Since the "leader" in a pack of backends doing a seqscan will have to wait + * for I/O, while the "followers" don't, there is a strong self-synchronizing + * effect once we can get the backends examining approximately the same part + * of the table at the same time. Hence all that is really needed is to get + * a new backend beginning a seqscan to begin it close to where other backends + * are reading. We can scan the table circularly, from block X up to the + * end and then from block 0 to X-1, to ensure we visit all rows while still + * participating in the common scan. + * + * To accomplish that, we keep track of the scan position of each table, and + * start new scans close to where the previous scan(s) are. We don't try to + * do any extra synchronization to keep the scans together afterwards; some + * scans might progress much more slowly than others, for example if the + * results need to be transferred to the client over a slow network, and we + * don't want such queries to slow down others. + * + * There can realistically only be a few large sequential scans on different + * tables in progress at any time. Therefore we just keep the scan positions + * in a small LRU list which we scan every time we need to look up or update a + * scan position. The whole mechanism is only applied for tables exceeding + * a threshold size (but that is not the concern of this module). + * + * INTERFACE ROUTINES + * ss_get_location - return current scan location of a relation + * ss_report_location - update current scan location + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/syncscan.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/syncscan.h" +#include "miscadmin.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "utils/rel.h" + + +/* GUC variables */ +#ifdef TRACE_SYNCSCAN +bool trace_syncscan = false; +#endif + + +/* + * Size of the LRU list. + * + * Note: the code assumes that SYNC_SCAN_NELEM > 1. + * + * XXX: What's a good value? It should be large enough to hold the + * maximum number of large tables scanned simultaneously. But a larger value + * means more traversing of the LRU list when starting a new scan. + */ +#define SYNC_SCAN_NELEM 20 + +/* + * Interval between reports of the location of the current scan, in pages. + * + * Note: This should be smaller than the ring size (see buffer/freelist.c) + * we use for bulk reads. Otherwise a scan joining other scans might start + * from a page that's no longer in the buffer cache. This is a bit fuzzy; + * there's no guarantee that the new scan will read the page before it leaves + * the buffer cache anyway, and on the other hand the page is most likely + * still in the OS cache. + */ +#define SYNC_SCAN_REPORT_INTERVAL (128 * 1024 / BLCKSZ) + + +/* + * The scan locations structure is essentially a doubly-linked LRU with head + * and tail pointer, but designed to hold a fixed maximum number of elements in + * fixed-size shared memory. + */ +typedef struct ss_scan_location_t +{ + RelFileNode relfilenode; /* identity of a relation */ + BlockNumber location; /* last-reported location in the relation */ +} ss_scan_location_t; + +typedef struct ss_lru_item_t +{ + struct ss_lru_item_t *prev; + struct ss_lru_item_t *next; + ss_scan_location_t location; +} ss_lru_item_t; + +typedef struct ss_scan_locations_t +{ + ss_lru_item_t *head; + ss_lru_item_t *tail; + ss_lru_item_t items[FLEXIBLE_ARRAY_MEMBER]; /* SYNC_SCAN_NELEM items */ +} ss_scan_locations_t; + +#define SizeOfScanLocations(N) \ + (offsetof(ss_scan_locations_t, items) + (N) * sizeof(ss_lru_item_t)) + +/* Pointer to struct in shared memory */ +static ss_scan_locations_t *scan_locations; + +/* prototypes for internal functions */ +static BlockNumber ss_search(RelFileNode relfilenode, + BlockNumber location, bool set); + + +/* + * SyncScanShmemSize --- report amount of shared memory space needed + */ +Size +SyncScanShmemSize(void) +{ + return SizeOfScanLocations(SYNC_SCAN_NELEM); +} + +/* + * SyncScanShmemInit --- initialize this module's shared memory + */ +void +SyncScanShmemInit(void) +{ + int i; + bool found; + + scan_locations = (ss_scan_locations_t *) + ShmemInitStruct("Sync Scan Locations List", + SizeOfScanLocations(SYNC_SCAN_NELEM), + &found); + + if (!IsUnderPostmaster) + { + /* Initialize shared memory area */ + Assert(!found); + + scan_locations->head = &scan_locations->items[0]; + scan_locations->tail = &scan_locations->items[SYNC_SCAN_NELEM - 1]; + + for (i = 0; i < SYNC_SCAN_NELEM; i++) + { + ss_lru_item_t *item = &scan_locations->items[i]; + + /* + * Initialize all slots with invalid values. As scans are started, + * these invalid entries will fall off the LRU list and get + * replaced with real entries. + */ + item->location.relfilenode.spcNode = InvalidOid; + item->location.relfilenode.dbNode = InvalidOid; + item->location.relfilenode.relNode = InvalidOid; + item->location.location = InvalidBlockNumber; + + item->prev = (i > 0) ? + (&scan_locations->items[i - 1]) : NULL; + item->next = (i < SYNC_SCAN_NELEM - 1) ? + (&scan_locations->items[i + 1]) : NULL; + } + } + else + Assert(found); +} + +/* + * ss_search --- search the scan_locations structure for an entry with the + * given relfilenode. + * + * If "set" is true, the location is updated to the given location. If no + * entry for the given relfilenode is found, it will be created at the head + * of the list with the given location, even if "set" is false. + * + * In any case, the location after possible update is returned. + * + * Caller is responsible for having acquired suitable lock on the shared + * data structure. + */ +static BlockNumber +ss_search(RelFileNode relfilenode, BlockNumber location, bool set) +{ + ss_lru_item_t *item; + + item = scan_locations->head; + for (;;) + { + bool match; + + match = RelFileNodeEquals(item->location.relfilenode, relfilenode); + + if (match || item->next == NULL) + { + /* + * If we reached the end of list and no match was found, take over + * the last entry + */ + if (!match) + { + item->location.relfilenode = relfilenode; + item->location.location = location; + } + else if (set) + item->location.location = location; + + /* Move the entry to the front of the LRU list */ + if (item != scan_locations->head) + { + /* unlink */ + if (item == scan_locations->tail) + scan_locations->tail = item->prev; + item->prev->next = item->next; + if (item->next) + item->next->prev = item->prev; + + /* link */ + item->prev = NULL; + item->next = scan_locations->head; + scan_locations->head->prev = item; + scan_locations->head = item; + } + + return item->location.location; + } + + item = item->next; + } + + /* not reached */ +} + +/* + * ss_get_location --- get the optimal starting location for scan + * + * Returns the last-reported location of a sequential scan on the + * relation, or 0 if no valid location is found. + * + * We expect the caller has just done RelationGetNumberOfBlocks(), and + * so that number is passed in rather than computing it again. The result + * is guaranteed less than relnblocks (assuming that's > 0). + */ +BlockNumber +ss_get_location(Relation rel, BlockNumber relnblocks) +{ + BlockNumber startloc; + + LWLockAcquire(SyncScanLock, LW_EXCLUSIVE); + startloc = ss_search(rel->rd_node, 0, false); + LWLockRelease(SyncScanLock); + + /* + * If the location is not a valid block number for this scan, start at 0. + * + * This can happen if for instance a VACUUM truncated the table since the + * location was saved. + */ + if (startloc >= relnblocks) + startloc = 0; + +#ifdef TRACE_SYNCSCAN + if (trace_syncscan) + elog(LOG, + "SYNC_SCAN: start \"%s\" (size %u) at %u", + RelationGetRelationName(rel), relnblocks, startloc); +#endif + + return startloc; +} + +/* + * ss_report_location --- update the current scan location + * + * Writes an entry into the shared Sync Scan state of the form + * (relfilenode, blocknumber), overwriting any existing entry for the + * same relfilenode. + */ +void +ss_report_location(Relation rel, BlockNumber location) +{ +#ifdef TRACE_SYNCSCAN + if (trace_syncscan) + { + if ((location % 1024) == 0) + elog(LOG, + "SYNC_SCAN: scanning \"%s\" at %u", + RelationGetRelationName(rel), location); + } +#endif + + /* + * To reduce lock contention, only report scan progress every N pages. For + * the same reason, don't block if the lock isn't immediately available. + * Missing a few updates isn't critical, it just means that a new scan + * that wants to join the pack will start a little bit behind the head of + * the scan. Hopefully the pages are still in OS cache and the scan + * catches up quickly. + */ + if ((location % SYNC_SCAN_REPORT_INTERVAL) == 0) + { + if (LWLockConditionalAcquire(SyncScanLock, LW_EXCLUSIVE)) + { + (void) ss_search(rel->rd_node, location, true); + LWLockRelease(SyncScanLock); + } +#ifdef TRACE_SYNCSCAN + else if (trace_syncscan) + elog(LOG, + "SYNC_SCAN: missed update for \"%s\" at %u", + RelationGetRelationName(rel), location); +#endif + } +} diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c new file mode 100644 index 0000000..8456183 --- /dev/null +++ b/src/backend/access/common/toast_compression.c @@ -0,0 +1,318 @@ +/*------------------------------------------------------------------------- + * + * toast_compression.c + * Functions for toast compression. + * + * Copyright (c) 2021, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/common/toast_compression.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#ifdef USE_LZ4 +#include <lz4.h> +#endif + +#include "access/detoast.h" +#include "access/toast_compression.h" +#include "common/pg_lzcompress.h" +#include "fmgr.h" +#include "utils/builtins.h" + +/* GUC */ +int default_toast_compression = TOAST_PGLZ_COMPRESSION; + +#define NO_LZ4_SUPPORT() \ + ereport(ERROR, \ + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ + errmsg("compression method lz4 not supported"), \ + errdetail("This functionality requires the server to be built with lz4 support."), \ + errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4"))) + +/* + * Compress a varlena using PGLZ. + * + * Returns the compressed varlena, or NULL if compression fails. + */ +struct varlena * +pglz_compress_datum(const struct varlena *value) +{ + int32 valsize, + len; + struct varlena *tmp = NULL; + + valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); + + /* + * No point in wasting a palloc cycle if value size is outside the allowed + * range for compression. + */ + if (valsize < PGLZ_strategy_default->min_input_size || + valsize > PGLZ_strategy_default->max_input_size) + return NULL; + + /* + * Figure out the maximum possible size of the pglz output, add the bytes + * that will be needed for varlena overhead, and allocate that amount. + */ + tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) + + VARHDRSZ_COMPRESSED); + + len = pglz_compress(VARDATA_ANY(value), + valsize, + (char *) tmp + VARHDRSZ_COMPRESSED, + NULL); + if (len < 0) + { + pfree(tmp); + return NULL; + } + + SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESSED); + + return tmp; +} + +/* + * Decompress a varlena that was compressed using PGLZ. + */ +struct varlena * +pglz_decompress_datum(const struct varlena *value) +{ + struct varlena *result; + int32 rawsize; + + /* allocate memory for the uncompressed data */ + result = (struct varlena *) palloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + /* decompress the data */ + rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED, + VARSIZE(value) - VARHDRSZ_COMPRESSED, + VARDATA(result), + VARDATA_COMPRESSED_GET_EXTSIZE(value), true); + if (rawsize < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed pglz data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +} + +/* + * Decompress part of a varlena that was compressed using PGLZ. + */ +struct varlena * +pglz_decompress_datum_slice(const struct varlena *value, + int32 slicelength) +{ + struct varlena *result; + int32 rawsize; + + /* allocate memory for the uncompressed data */ + result = (struct varlena *) palloc(slicelength + VARHDRSZ); + + /* decompress the data */ + rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED, + VARSIZE(value) - VARHDRSZ_COMPRESSED, + VARDATA(result), + slicelength, false); + if (rawsize < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed pglz data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +} + +/* + * Compress a varlena using LZ4. + * + * Returns the compressed varlena, or NULL if compression fails. + */ +struct varlena * +lz4_compress_datum(const struct varlena *value) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + int32 valsize; + int32 len; + int32 max_size; + struct varlena *tmp = NULL; + + valsize = VARSIZE_ANY_EXHDR(value); + + /* + * Figure out the maximum possible size of the LZ4 output, add the bytes + * that will be needed for varlena overhead, and allocate that amount. + */ + max_size = LZ4_compressBound(valsize); + tmp = (struct varlena *) palloc(max_size + VARHDRSZ_COMPRESSED); + + len = LZ4_compress_default(VARDATA_ANY(value), + (char *) tmp + VARHDRSZ_COMPRESSED, + valsize, max_size); + if (len <= 0) + elog(ERROR, "lz4 compression failed"); + + /* data is incompressible so just free the memory and return NULL */ + if (len > valsize) + { + pfree(tmp); + return NULL; + } + + SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESSED); + + return tmp; +#endif +} + +/* + * Decompress a varlena that was compressed using LZ4. + */ +struct varlena * +lz4_decompress_datum(const struct varlena *value) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + int32 rawsize; + struct varlena *result; + + /* allocate memory for the uncompressed data */ + result = (struct varlena *) palloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ); + + /* decompress the data */ + rawsize = LZ4_decompress_safe((char *) value + VARHDRSZ_COMPRESSED, + VARDATA(result), + VARSIZE(value) - VARHDRSZ_COMPRESSED, + VARDATA_COMPRESSED_GET_EXTSIZE(value)); + if (rawsize < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed lz4 data is corrupt"))); + + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +#endif +} + +/* + * Decompress part of a varlena that was compressed using LZ4. + */ +struct varlena * +lz4_decompress_datum_slice(const struct varlena *value, int32 slicelength) +{ +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); + return NULL; /* keep compiler quiet */ +#else + int32 rawsize; + struct varlena *result; + + /* slice decompression not supported prior to 1.8.3 */ + if (LZ4_versionNumber() < 10803) + return lz4_decompress_datum(value); + + /* allocate memory for the uncompressed data */ + result = (struct varlena *) palloc(slicelength + VARHDRSZ); + + /* decompress the data */ + rawsize = LZ4_decompress_safe_partial((char *) value + VARHDRSZ_COMPRESSED, + VARDATA(result), + VARSIZE(value) - VARHDRSZ_COMPRESSED, + slicelength, + slicelength); + if (rawsize < 0) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed lz4 data is corrupt"))); + + SET_VARSIZE(result, rawsize + VARHDRSZ); + + return result; +#endif +} + +/* + * Extract compression ID from a varlena. + * + * Returns TOAST_INVALID_COMPRESSION_ID if the varlena is not compressed. + */ +ToastCompressionId +toast_get_compression_id(struct varlena *attr) +{ + ToastCompressionId cmid = TOAST_INVALID_COMPRESSION_ID; + + /* + * If it is stored externally then fetch the compression method id from + * the external toast pointer. If compressed inline, fetch it from the + * toast compression header. + */ + if (VARATT_IS_EXTERNAL_ONDISK(attr)) + { + struct varatt_external toast_pointer; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + cmid = VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer); + } + else if (VARATT_IS_COMPRESSED(attr)) + cmid = VARDATA_COMPRESSED_GET_COMPRESS_METHOD(attr); + + return cmid; +} + +/* + * CompressionNameToMethod - Get compression method from compression name + * + * Search in the available built-in methods. If the compression not found + * in the built-in methods then return InvalidCompressionMethod. + */ +char +CompressionNameToMethod(const char *compression) +{ + if (strcmp(compression, "pglz") == 0) + return TOAST_PGLZ_COMPRESSION; + else if (strcmp(compression, "lz4") == 0) + { +#ifndef USE_LZ4 + NO_LZ4_SUPPORT(); +#endif + return TOAST_LZ4_COMPRESSION; + } + + return InvalidCompressionMethod; +} + +/* + * GetCompressionMethodName - Get compression method name + */ +const char * +GetCompressionMethodName(char method) +{ + switch (method) + { + case TOAST_PGLZ_COMPRESSION: + return "pglz"; + case TOAST_LZ4_COMPRESSION: + return "lz4"; + default: + elog(ERROR, "invalid compression method %c", method); + return NULL; /* keep compiler quiet */ + } +} diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c new file mode 100644 index 0000000..2d2fd60 --- /dev/null +++ b/src/backend/access/common/toast_internals.c @@ -0,0 +1,664 @@ +/*------------------------------------------------------------------------- + * + * toast_internals.c + * Functions for internal use by the TOAST system. + * + * Copyright (c) 2000-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/common/toast_internals.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/detoast.h" +#include "access/genam.h" +#include "access/heapam.h" +#include "access/heaptoast.h" +#include "access/table.h" +#include "access/toast_internals.h" +#include "access/xact.h" +#include "catalog/catalog.h" +#include "common/pg_lzcompress.h" +#include "miscadmin.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" + +static bool toastrel_valueid_exists(Relation toastrel, Oid valueid); +static bool toastid_valueid_exists(Oid toastrelid, Oid valueid); + +/* ---------- + * toast_compress_datum - + * + * Create a compressed version of a varlena datum + * + * If we fail (ie, compressed result is actually bigger than original) + * then return NULL. We must not use compressed data if it'd expand + * the tuple! + * + * We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without + * copying them. But we can't handle external or compressed datums. + * ---------- + */ +Datum +toast_compress_datum(Datum value, char cmethod) +{ + struct varlena *tmp = NULL; + int32 valsize; + ToastCompressionId cmid = TOAST_INVALID_COMPRESSION_ID; + + Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value))); + Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value))); + + valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value)); + + /* If the compression method is not valid, use the current default */ + if (!CompressionMethodIsValid(cmethod)) + cmethod = default_toast_compression; + + /* + * Call appropriate compression routine for the compression method. + */ + switch (cmethod) + { + case TOAST_PGLZ_COMPRESSION: + tmp = pglz_compress_datum((const struct varlena *) value); + cmid = TOAST_PGLZ_COMPRESSION_ID; + break; + case TOAST_LZ4_COMPRESSION: + tmp = lz4_compress_datum((const struct varlena *) value); + cmid = TOAST_LZ4_COMPRESSION_ID; + break; + default: + elog(ERROR, "invalid compression method %c", cmethod); + } + + if (tmp == NULL) + return PointerGetDatum(NULL); + + /* + * We recheck the actual size even if compression reports success, because + * it might be satisfied with having saved as little as one byte in the + * compressed data --- which could turn into a net loss once you consider + * header and alignment padding. Worst case, the compressed format might + * require three padding bytes (plus header, which is included in + * VARSIZE(tmp)), whereas the uncompressed format would take only one + * header byte and no padding if the value is short enough. So we insist + * on a savings of more than 2 bytes to ensure we have a gain. + */ + if (VARSIZE(tmp) < valsize - 2) + { + /* successful compression */ + Assert(cmid != TOAST_INVALID_COMPRESSION_ID); + TOAST_COMPRESS_SET_SIZE_AND_COMPRESS_METHOD(tmp, valsize, cmid); + return PointerGetDatum(tmp); + } + else + { + /* incompressible data */ + pfree(tmp); + return PointerGetDatum(NULL); + } +} + +/* ---------- + * toast_save_datum - + * + * Save one single datum into the secondary relation and return + * a Datum reference for it. + * + * rel: the main relation we're working with (not the toast rel!) + * value: datum to be pushed to toast storage + * oldexternal: if not NULL, toast pointer previously representing the datum + * options: options to be passed to heap_insert() for toast rows + * ---------- + */ +Datum +toast_save_datum(Relation rel, Datum value, + struct varlena *oldexternal, int options) +{ + Relation toastrel; + Relation *toastidxs; + HeapTuple toasttup; + TupleDesc toasttupDesc; + Datum t_values[3]; + bool t_isnull[3]; + CommandId mycid = GetCurrentCommandId(true); + struct varlena *result; + struct varatt_external toast_pointer; + union + { + struct varlena hdr; + /* this is to make the union big enough for a chunk: */ + char data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ]; + /* ensure union is aligned well enough: */ + int32 align_it; + } chunk_data; + int32 chunk_size; + int32 chunk_seq = 0; + char *data_p; + int32 data_todo; + Pointer dval = DatumGetPointer(value); + int num_indexes; + int validIndex; + + Assert(!VARATT_IS_EXTERNAL(value)); + + /* + * Open the toast relation and its indexes. We can use the index to check + * uniqueness of the OID we assign to the toasted item, even though it has + * additional columns besides OID. + */ + toastrel = table_open(rel->rd_rel->reltoastrelid, RowExclusiveLock); + toasttupDesc = toastrel->rd_att; + + /* Open all the toast indexes and look for the valid one */ + validIndex = toast_open_indexes(toastrel, + RowExclusiveLock, + &toastidxs, + &num_indexes); + + /* + * Get the data pointer and length, and compute va_rawsize and va_extinfo. + * + * va_rawsize is the size of the equivalent fully uncompressed datum, so + * we have to adjust for short headers. + * + * va_extinfo stored the actual size of the data payload in the toast + * records and the compression method in first 2 bits if data is + * compressed. + */ + if (VARATT_IS_SHORT(dval)) + { + data_p = VARDATA_SHORT(dval); + data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT; + toast_pointer.va_rawsize = data_todo + VARHDRSZ; /* as if not short */ + toast_pointer.va_extinfo = data_todo; + } + else if (VARATT_IS_COMPRESSED(dval)) + { + data_p = VARDATA(dval); + data_todo = VARSIZE(dval) - VARHDRSZ; + /* rawsize in a compressed datum is just the size of the payload */ + toast_pointer.va_rawsize = VARDATA_COMPRESSED_GET_EXTSIZE(dval) + VARHDRSZ; + + /* set external size and compression method */ + VARATT_EXTERNAL_SET_SIZE_AND_COMPRESS_METHOD(toast_pointer, data_todo, + VARDATA_COMPRESSED_GET_COMPRESS_METHOD(dval)); + /* Assert that the numbers look like it's compressed */ + Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)); + } + else + { + data_p = VARDATA(dval); + data_todo = VARSIZE(dval) - VARHDRSZ; + toast_pointer.va_rawsize = VARSIZE(dval); + toast_pointer.va_extinfo = data_todo; + } + + /* + * Insert the correct table OID into the result TOAST pointer. + * + * Normally this is the actual OID of the target toast table, but during + * table-rewriting operations such as CLUSTER, we have to insert the OID + * of the table's real permanent toast table instead. rd_toastoid is set + * if we have to substitute such an OID. + */ + if (OidIsValid(rel->rd_toastoid)) + toast_pointer.va_toastrelid = rel->rd_toastoid; + else + toast_pointer.va_toastrelid = RelationGetRelid(toastrel); + + /* + * Choose an OID to use as the value ID for this toast value. + * + * Normally we just choose an unused OID within the toast table. But + * during table-rewriting operations where we are preserving an existing + * toast table OID, we want to preserve toast value OIDs too. So, if + * rd_toastoid is set and we had a prior external value from that same + * toast table, re-use its value ID. If we didn't have a prior external + * value (which is a corner case, but possible if the table's attstorage + * options have been changed), we have to pick a value ID that doesn't + * conflict with either new or existing toast value OIDs. + */ + if (!OidIsValid(rel->rd_toastoid)) + { + /* normal case: just choose an unused OID */ + toast_pointer.va_valueid = + GetNewOidWithIndex(toastrel, + RelationGetRelid(toastidxs[validIndex]), + (AttrNumber) 1); + } + else + { + /* rewrite case: check to see if value was in old toast table */ + toast_pointer.va_valueid = InvalidOid; + if (oldexternal != NULL) + { + struct varatt_external old_toast_pointer; + + Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal)); + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal); + if (old_toast_pointer.va_toastrelid == rel->rd_toastoid) + { + /* This value came from the old toast table; reuse its OID */ + toast_pointer.va_valueid = old_toast_pointer.va_valueid; + + /* + * There is a corner case here: the table rewrite might have + * to copy both live and recently-dead versions of a row, and + * those versions could easily reference the same toast value. + * When we copy the second or later version of such a row, + * reusing the OID will mean we select an OID that's already + * in the new toast table. Check for that, and if so, just + * fall through without writing the data again. + * + * While annoying and ugly-looking, this is a good thing + * because it ensures that we wind up with only one copy of + * the toast value when there is only one copy in the old + * toast table. Before we detected this case, we'd have made + * multiple copies, wasting space; and what's worse, the + * copies belonging to already-deleted heap tuples would not + * be reclaimed by VACUUM. + */ + if (toastrel_valueid_exists(toastrel, + toast_pointer.va_valueid)) + { + /* Match, so short-circuit the data storage loop below */ + data_todo = 0; + } + } + } + if (toast_pointer.va_valueid == InvalidOid) + { + /* + * new value; must choose an OID that doesn't conflict in either + * old or new toast table + */ + do + { + toast_pointer.va_valueid = + GetNewOidWithIndex(toastrel, + RelationGetRelid(toastidxs[validIndex]), + (AttrNumber) 1); + } while (toastid_valueid_exists(rel->rd_toastoid, + toast_pointer.va_valueid)); + } + } + + /* + * Initialize constant parts of the tuple data + */ + t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid); + t_values[2] = PointerGetDatum(&chunk_data); + t_isnull[0] = false; + t_isnull[1] = false; + t_isnull[2] = false; + + /* + * Split up the item into chunks + */ + while (data_todo > 0) + { + int i; + + CHECK_FOR_INTERRUPTS(); + + /* + * Calculate the size of this chunk + */ + chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo); + + /* + * Build a tuple and store it + */ + t_values[1] = Int32GetDatum(chunk_seq++); + SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ); + memcpy(VARDATA(&chunk_data), data_p, chunk_size); + toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull); + + heap_insert(toastrel, toasttup, mycid, options, NULL); + + /* + * Create the index entry. We cheat a little here by not using + * FormIndexDatum: this relies on the knowledge that the index columns + * are the same as the initial columns of the table for all the + * indexes. We also cheat by not providing an IndexInfo: this is okay + * for now because btree doesn't need one, but we might have to be + * more honest someday. + * + * Note also that there had better not be any user-created index on + * the TOAST table, since we don't bother to update anything else. + */ + for (i = 0; i < num_indexes; i++) + { + /* Only index relations marked as ready can be updated */ + if (toastidxs[i]->rd_index->indisready) + index_insert(toastidxs[i], t_values, t_isnull, + &(toasttup->t_self), + toastrel, + toastidxs[i]->rd_index->indisunique ? + UNIQUE_CHECK_YES : UNIQUE_CHECK_NO, + false, NULL); + } + + /* + * Free memory + */ + heap_freetuple(toasttup); + + /* + * Move on to next chunk + */ + data_todo -= chunk_size; + data_p += chunk_size; + } + + /* + * Done - close toast relation and its indexes but keep the lock until + * commit, so as a concurrent reindex done directly on the toast relation + * would be able to wait for this transaction. + */ + toast_close_indexes(toastidxs, num_indexes, NoLock); + table_close(toastrel, NoLock); + + /* + * Create the TOAST pointer value that we'll return + */ + result = (struct varlena *) palloc(TOAST_POINTER_SIZE); + SET_VARTAG_EXTERNAL(result, VARTAG_ONDISK); + memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer)); + + return PointerGetDatum(result); +} + +/* ---------- + * toast_delete_datum - + * + * Delete a single external stored value. + * ---------- + */ +void +toast_delete_datum(Relation rel, Datum value, bool is_speculative) +{ + struct varlena *attr = (struct varlena *) DatumGetPointer(value); + struct varatt_external toast_pointer; + Relation toastrel; + Relation *toastidxs; + ScanKeyData toastkey; + SysScanDesc toastscan; + HeapTuple toasttup; + int num_indexes; + int validIndex; + SnapshotData SnapshotToast; + + if (!VARATT_IS_EXTERNAL_ONDISK(attr)) + return; + + /* Must copy to access aligned fields */ + VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr); + + /* + * Open the toast relation and its indexes + */ + toastrel = table_open(toast_pointer.va_toastrelid, RowExclusiveLock); + + /* Fetch valid relation used for process */ + validIndex = toast_open_indexes(toastrel, + RowExclusiveLock, + &toastidxs, + &num_indexes); + + /* + * Setup a scan key to find chunks with matching va_valueid + */ + ScanKeyInit(&toastkey, + (AttrNumber) 1, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(toast_pointer.va_valueid)); + + /* + * Find all the chunks. (We don't actually care whether we see them in + * sequence or not, but since we've already locked the index we might as + * well use systable_beginscan_ordered.) + */ + init_toast_snapshot(&SnapshotToast); + toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex], + &SnapshotToast, 1, &toastkey); + while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL) + { + /* + * Have a chunk, delete it + */ + if (is_speculative) + heap_abort_speculative(toastrel, &toasttup->t_self); + else + simple_heap_delete(toastrel, &toasttup->t_self); + } + + /* + * End scan and close relations but keep the lock until commit, so as a + * concurrent reindex done directly on the toast relation would be able to + * wait for this transaction. + */ + systable_endscan_ordered(toastscan); + toast_close_indexes(toastidxs, num_indexes, NoLock); + table_close(toastrel, NoLock); +} + +/* ---------- + * toastrel_valueid_exists - + * + * Test whether a toast value with the given ID exists in the toast relation. + * For safety, we consider a value to exist if there are either live or dead + * toast rows with that ID; see notes for GetNewOidWithIndex(). + * ---------- + */ +static bool +toastrel_valueid_exists(Relation toastrel, Oid valueid) +{ + bool result = false; + ScanKeyData toastkey; + SysScanDesc toastscan; + int num_indexes; + int validIndex; + Relation *toastidxs; + + /* Fetch a valid index relation */ + validIndex = toast_open_indexes(toastrel, + RowExclusiveLock, + &toastidxs, + &num_indexes); + + /* + * Setup a scan key to find chunks with matching va_valueid + */ + ScanKeyInit(&toastkey, + (AttrNumber) 1, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(valueid)); + + /* + * Is there any such chunk? + */ + toastscan = systable_beginscan(toastrel, + RelationGetRelid(toastidxs[validIndex]), + true, SnapshotAny, 1, &toastkey); + + if (systable_getnext(toastscan) != NULL) + result = true; + + systable_endscan(toastscan); + + /* Clean up */ + toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock); + + return result; +} + +/* ---------- + * toastid_valueid_exists - + * + * As above, but work from toast rel's OID not an open relation + * ---------- + */ +static bool +toastid_valueid_exists(Oid toastrelid, Oid valueid) +{ + bool result; + Relation toastrel; + + toastrel = table_open(toastrelid, AccessShareLock); + + result = toastrel_valueid_exists(toastrel, valueid); + + table_close(toastrel, AccessShareLock); + + return result; +} + +/* ---------- + * toast_get_valid_index + * + * Get OID of valid index associated to given toast relation. A toast + * relation can have only one valid index at the same time. + */ +Oid +toast_get_valid_index(Oid toastoid, LOCKMODE lock) +{ + int num_indexes; + int validIndex; + Oid validIndexOid; + Relation *toastidxs; + Relation toastrel; + + /* Open the toast relation */ + toastrel = table_open(toastoid, lock); + + /* Look for the valid index of the toast relation */ + validIndex = toast_open_indexes(toastrel, + lock, + &toastidxs, + &num_indexes); + validIndexOid = RelationGetRelid(toastidxs[validIndex]); + + /* Close the toast relation and all its indexes */ + toast_close_indexes(toastidxs, num_indexes, NoLock); + table_close(toastrel, NoLock); + + return validIndexOid; +} + +/* ---------- + * toast_open_indexes + * + * Get an array of the indexes associated to the given toast relation + * and return as well the position of the valid index used by the toast + * relation in this array. It is the responsibility of the caller of this + * function to close the indexes as well as free them. + */ +int +toast_open_indexes(Relation toastrel, + LOCKMODE lock, + Relation **toastidxs, + int *num_indexes) +{ + int i = 0; + int res = 0; + bool found = false; + List *indexlist; + ListCell *lc; + + /* Get index list of the toast relation */ + indexlist = RelationGetIndexList(toastrel); + Assert(indexlist != NIL); + + *num_indexes = list_length(indexlist); + + /* Open all the index relations */ + *toastidxs = (Relation *) palloc(*num_indexes * sizeof(Relation)); + foreach(lc, indexlist) + (*toastidxs)[i++] = index_open(lfirst_oid(lc), lock); + + /* Fetch the first valid index in list */ + for (i = 0; i < *num_indexes; i++) + { + Relation toastidx = (*toastidxs)[i]; + + if (toastidx->rd_index->indisvalid) + { + res = i; + found = true; + break; + } + } + + /* + * Free index list, not necessary anymore as relations are opened and a + * valid index has been found. + */ + list_free(indexlist); + + /* + * The toast relation should have one valid index, so something is going + * wrong if there is nothing. + */ + if (!found) + elog(ERROR, "no valid index found for toast relation with Oid %u", + RelationGetRelid(toastrel)); + + return res; +} + +/* ---------- + * toast_close_indexes + * + * Close an array of indexes for a toast relation and free it. This should + * be called for a set of indexes opened previously with toast_open_indexes. + */ +void +toast_close_indexes(Relation *toastidxs, int num_indexes, LOCKMODE lock) +{ + int i; + + /* Close relations and clean up things */ + for (i = 0; i < num_indexes; i++) + index_close(toastidxs[i], lock); + pfree(toastidxs); +} + +/* ---------- + * init_toast_snapshot + * + * Initialize an appropriate TOAST snapshot. We must use an MVCC snapshot + * to initialize the TOAST snapshot; since we don't know which one to use, + * just use the oldest one. This is safe: at worst, we will get a "snapshot + * too old" error that might have been avoided otherwise. + */ +void +init_toast_snapshot(Snapshot toast_snapshot) +{ + Snapshot snapshot = GetOldestSnapshot(); + + /* + * GetOldestSnapshot returns NULL if the session has no active snapshots. + * We can get that if, for example, a procedure fetches a toasted value + * into a local variable, commits, and then tries to detoast the value. + * Such coding is unsafe, because once we commit there is nothing to + * prevent the toast data from being deleted. Detoasting *must* happen in + * the same transaction that originally fetched the toast pointer. Hence, + * rather than trying to band-aid over the problem, throw an error. (This + * is not very much protection, because in many scenarios the procedure + * would have already created a new transaction snapshot, preventing us + * from detecting the problem. But it's better than nothing, and for sure + * we shouldn't expend code on masking the problem more.) + */ + if (snapshot == NULL) + elog(ERROR, "cannot fetch toast data without an active snapshot"); + + InitToastSnapshot(*toast_snapshot, snapshot->lsn, snapshot->whenTaken); +} diff --git a/src/backend/access/common/tupconvert.c b/src/backend/access/common/tupconvert.c new file mode 100644 index 0000000..64f5439 --- /dev/null +++ b/src/backend/access/common/tupconvert.c @@ -0,0 +1,293 @@ +/*------------------------------------------------------------------------- + * + * tupconvert.c + * Tuple conversion support. + * + * These functions provide conversion between rowtypes that are logically + * equivalent but might have columns in a different order or different sets of + * dropped columns. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/tupconvert.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/tupconvert.h" +#include "executor/tuptable.h" + + +/* + * The conversion setup routines have the following common API: + * + * The setup routine checks using attmap.c whether the given source and + * destination tuple descriptors are logically compatible. If not, it throws + * an error. If so, it returns NULL if they are physically compatible (ie, no + * conversion is needed), else a TupleConversionMap that can be used by + * execute_attr_map_tuple or execute_attr_map_slot to perform the conversion. + * + * The TupleConversionMap, if needed, is palloc'd in the caller's memory + * context. Also, the given tuple descriptors are referenced by the map, + * so they must survive as long as the map is needed. + * + * The caller must supply a suitable primary error message to be used if + * a compatibility error is thrown. Recommended coding practice is to use + * gettext_noop() on this string, so that it is translatable but won't + * actually be translated unless the error gets thrown. + * + * + * Implementation notes: + * + * The key component of a TupleConversionMap is an attrMap[] array with + * one entry per output column. This entry contains the 1-based index of + * the corresponding input column, or zero to force a NULL value (for + * a dropped output column). The TupleConversionMap also contains workspace + * arrays. + */ + + +/* + * Set up for tuple conversion, matching input and output columns by + * position. (Dropped columns are ignored in both input and output.) + */ +TupleConversionMap * +convert_tuples_by_position(TupleDesc indesc, + TupleDesc outdesc, + const char *msg) +{ + TupleConversionMap *map; + int n; + AttrMap *attrMap; + + /* Verify compatibility and prepare attribute-number map */ + attrMap = build_attrmap_by_position(indesc, outdesc, msg); + + if (attrMap == NULL) + { + /* runtime conversion is not needed */ + return NULL; + } + + /* Prepare the map structure */ + map = (TupleConversionMap *) palloc(sizeof(TupleConversionMap)); + map->indesc = indesc; + map->outdesc = outdesc; + map->attrMap = attrMap; + /* preallocate workspace for Datum arrays */ + n = outdesc->natts + 1; /* +1 for NULL */ + map->outvalues = (Datum *) palloc(n * sizeof(Datum)); + map->outisnull = (bool *) palloc(n * sizeof(bool)); + n = indesc->natts + 1; /* +1 for NULL */ + map->invalues = (Datum *) palloc(n * sizeof(Datum)); + map->inisnull = (bool *) palloc(n * sizeof(bool)); + map->invalues[0] = (Datum) 0; /* set up the NULL entry */ + map->inisnull[0] = true; + + return map; +} + +/* + * Set up for tuple conversion, matching input and output columns by name. + * (Dropped columns are ignored in both input and output.) This is intended + * for use when the rowtypes are related by inheritance, so we expect an exact + * match of both type and typmod. The error messages will be a bit unhelpful + * unless both rowtypes are named composite types. + */ +TupleConversionMap * +convert_tuples_by_name(TupleDesc indesc, + TupleDesc outdesc) +{ + TupleConversionMap *map; + AttrMap *attrMap; + int n = outdesc->natts; + + /* Verify compatibility and prepare attribute-number map */ + attrMap = build_attrmap_by_name_if_req(indesc, outdesc); + + if (attrMap == NULL) + { + /* runtime conversion is not needed */ + return NULL; + } + + /* Prepare the map structure */ + map = (TupleConversionMap *) palloc(sizeof(TupleConversionMap)); + map->indesc = indesc; + map->outdesc = outdesc; + map->attrMap = attrMap; + /* preallocate workspace for Datum arrays */ + map->outvalues = (Datum *) palloc(n * sizeof(Datum)); + map->outisnull = (bool *) palloc(n * sizeof(bool)); + n = indesc->natts + 1; /* +1 for NULL */ + map->invalues = (Datum *) palloc(n * sizeof(Datum)); + map->inisnull = (bool *) palloc(n * sizeof(bool)); + map->invalues[0] = (Datum) 0; /* set up the NULL entry */ + map->inisnull[0] = true; + + return map; +} + +/* + * Perform conversion of a tuple according to the map. + */ +HeapTuple +execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map) +{ + AttrMap *attrMap = map->attrMap; + Datum *invalues = map->invalues; + bool *inisnull = map->inisnull; + Datum *outvalues = map->outvalues; + bool *outisnull = map->outisnull; + int i; + + /* + * Extract all the values of the old tuple, offsetting the arrays so that + * invalues[0] is left NULL and invalues[1] is the first source attribute; + * this exactly matches the numbering convention in attrMap. + */ + heap_deform_tuple(tuple, map->indesc, invalues + 1, inisnull + 1); + + /* + * Transpose into proper fields of the new tuple. + */ + Assert(attrMap->maplen == map->outdesc->natts); + for (i = 0; i < attrMap->maplen; i++) + { + int j = attrMap->attnums[i]; + + outvalues[i] = invalues[j]; + outisnull[i] = inisnull[j]; + } + + /* + * Now form the new tuple. + */ + return heap_form_tuple(map->outdesc, outvalues, outisnull); +} + +/* + * Perform conversion of a tuple slot according to the map. + */ +TupleTableSlot * +execute_attr_map_slot(AttrMap *attrMap, + TupleTableSlot *in_slot, + TupleTableSlot *out_slot) +{ + Datum *invalues; + bool *inisnull; + Datum *outvalues; + bool *outisnull; + int outnatts; + int i; + + /* Sanity checks */ + Assert(in_slot->tts_tupleDescriptor != NULL && + out_slot->tts_tupleDescriptor != NULL); + Assert(in_slot->tts_values != NULL && out_slot->tts_values != NULL); + + outnatts = out_slot->tts_tupleDescriptor->natts; + + /* Extract all the values of the in slot. */ + slot_getallattrs(in_slot); + + /* Before doing the mapping, clear any old contents from the out slot */ + ExecClearTuple(out_slot); + + invalues = in_slot->tts_values; + inisnull = in_slot->tts_isnull; + outvalues = out_slot->tts_values; + outisnull = out_slot->tts_isnull; + + /* Transpose into proper fields of the out slot. */ + for (i = 0; i < outnatts; i++) + { + int j = attrMap->attnums[i] - 1; + + /* attrMap->attnums[i] == 0 means it's a NULL datum. */ + if (j == -1) + { + outvalues[i] = (Datum) 0; + outisnull[i] = true; + } + else + { + outvalues[i] = invalues[j]; + outisnull[i] = inisnull[j]; + } + } + + ExecStoreVirtualTuple(out_slot); + + return out_slot; +} + +/* + * Perform conversion of bitmap of columns according to the map. + * + * The input and output bitmaps are offset by + * FirstLowInvalidHeapAttributeNumber to accommodate system cols, like the + * column-bitmaps in RangeTblEntry. + */ +Bitmapset * +execute_attr_map_cols(AttrMap *attrMap, Bitmapset *in_cols) +{ + Bitmapset *out_cols; + int out_attnum; + + /* fast path for the common trivial case */ + if (in_cols == NULL) + return NULL; + + /* + * For each output column, check which input column it corresponds to. + */ + out_cols = NULL; + + for (out_attnum = FirstLowInvalidHeapAttributeNumber; + out_attnum <= attrMap->maplen; + out_attnum++) + { + int in_attnum; + + if (out_attnum < 0) + { + /* System column. No mapping. */ + in_attnum = out_attnum; + } + else if (out_attnum == 0) + continue; + else + { + /* normal user column */ + in_attnum = attrMap->attnums[out_attnum - 1]; + + if (in_attnum == 0) + continue; + } + + if (bms_is_member(in_attnum - FirstLowInvalidHeapAttributeNumber, in_cols)) + out_cols = bms_add_member(out_cols, out_attnum - FirstLowInvalidHeapAttributeNumber); + } + + return out_cols; +} + +/* + * Free a TupleConversionMap structure. + */ +void +free_conversion_map(TupleConversionMap *map) +{ + /* indesc and outdesc are not ours to free */ + free_attrmap(map->attrMap); + pfree(map->invalues); + pfree(map->inisnull); + pfree(map->outvalues); + pfree(map->outisnull); + pfree(map); +} diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c new file mode 100644 index 0000000..4c63bd4 --- /dev/null +++ b/src/backend/access/common/tupdesc.c @@ -0,0 +1,912 @@ +/*------------------------------------------------------------------------- + * + * tupdesc.c + * POSTGRES tuple descriptor support code + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/common/tupdesc.c + * + * NOTES + * some of the executor utility code such as "ExecTypeFromTL" should be + * moved here. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/toast_compression.h" +#include "access/tupdesc_details.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_type.h" +#include "common/hashfn.h" +#include "miscadmin.h" +#include "parser/parse_type.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/resowner_private.h" +#include "utils/syscache.h" + + +/* + * CreateTemplateTupleDesc + * This function allocates an empty tuple descriptor structure. + * + * Tuple type ID information is initially set for an anonymous record type; + * caller can overwrite this if needed. + */ +TupleDesc +CreateTemplateTupleDesc(int natts) +{ + TupleDesc desc; + + /* + * sanity checks + */ + AssertArg(natts >= 0); + + /* + * Allocate enough memory for the tuple descriptor, including the + * attribute rows. + * + * Note: the attribute array stride is sizeof(FormData_pg_attribute), + * since we declare the array elements as FormData_pg_attribute for + * notational convenience. However, we only guarantee that the first + * ATTRIBUTE_FIXED_PART_SIZE bytes of each entry are valid; most code that + * copies tupdesc entries around copies just that much. In principle that + * could be less due to trailing padding, although with the current + * definition of pg_attribute there probably isn't any padding. + */ + desc = (TupleDesc) palloc(offsetof(struct TupleDescData, attrs) + + natts * sizeof(FormData_pg_attribute)); + + /* + * Initialize other fields of the tupdesc. + */ + desc->natts = natts; + desc->constr = NULL; + desc->tdtypeid = RECORDOID; + desc->tdtypmod = -1; + desc->tdrefcount = -1; /* assume not reference-counted */ + + return desc; +} + +/* + * CreateTupleDesc + * This function allocates a new TupleDesc by copying a given + * Form_pg_attribute array. + * + * Tuple type ID information is initially set for an anonymous record type; + * caller can overwrite this if needed. + */ +TupleDesc +CreateTupleDesc(int natts, Form_pg_attribute *attrs) +{ + TupleDesc desc; + int i; + + desc = CreateTemplateTupleDesc(natts); + + for (i = 0; i < natts; ++i) + memcpy(TupleDescAttr(desc, i), attrs[i], ATTRIBUTE_FIXED_PART_SIZE); + + return desc; +} + +/* + * CreateTupleDescCopy + * This function creates a new TupleDesc by copying from an existing + * TupleDesc. + * + * !!! Constraints and defaults are not copied !!! + */ +TupleDesc +CreateTupleDescCopy(TupleDesc tupdesc) +{ + TupleDesc desc; + int i; + + desc = CreateTemplateTupleDesc(tupdesc->natts); + + /* Flat-copy the attribute array */ + memcpy(TupleDescAttr(desc, 0), + TupleDescAttr(tupdesc, 0), + desc->natts * sizeof(FormData_pg_attribute)); + + /* + * Since we're not copying constraints and defaults, clear fields + * associated with them. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + att->attnotnull = false; + att->atthasdef = false; + att->atthasmissing = false; + att->attidentity = '\0'; + att->attgenerated = '\0'; + } + + /* We can copy the tuple type identification, too */ + desc->tdtypeid = tupdesc->tdtypeid; + desc->tdtypmod = tupdesc->tdtypmod; + + return desc; +} + +/* + * CreateTupleDescCopyConstr + * This function creates a new TupleDesc by copying from an existing + * TupleDesc (including its constraints and defaults). + */ +TupleDesc +CreateTupleDescCopyConstr(TupleDesc tupdesc) +{ + TupleDesc desc; + TupleConstr *constr = tupdesc->constr; + int i; + + desc = CreateTemplateTupleDesc(tupdesc->natts); + + /* Flat-copy the attribute array */ + memcpy(TupleDescAttr(desc, 0), + TupleDescAttr(tupdesc, 0), + desc->natts * sizeof(FormData_pg_attribute)); + + /* Copy the TupleConstr data structure, if any */ + if (constr) + { + TupleConstr *cpy = (TupleConstr *) palloc0(sizeof(TupleConstr)); + + cpy->has_not_null = constr->has_not_null; + cpy->has_generated_stored = constr->has_generated_stored; + + if ((cpy->num_defval = constr->num_defval) > 0) + { + cpy->defval = (AttrDefault *) palloc(cpy->num_defval * sizeof(AttrDefault)); + memcpy(cpy->defval, constr->defval, cpy->num_defval * sizeof(AttrDefault)); + for (i = cpy->num_defval - 1; i >= 0; i--) + cpy->defval[i].adbin = pstrdup(constr->defval[i].adbin); + } + + if (constr->missing) + { + cpy->missing = (AttrMissing *) palloc(tupdesc->natts * sizeof(AttrMissing)); + memcpy(cpy->missing, constr->missing, tupdesc->natts * sizeof(AttrMissing)); + for (i = tupdesc->natts - 1; i >= 0; i--) + { + if (constr->missing[i].am_present) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i); + + cpy->missing[i].am_value = datumCopy(constr->missing[i].am_value, + attr->attbyval, + attr->attlen); + } + } + } + + if ((cpy->num_check = constr->num_check) > 0) + { + cpy->check = (ConstrCheck *) palloc(cpy->num_check * sizeof(ConstrCheck)); + memcpy(cpy->check, constr->check, cpy->num_check * sizeof(ConstrCheck)); + for (i = cpy->num_check - 1; i >= 0; i--) + { + cpy->check[i].ccname = pstrdup(constr->check[i].ccname); + cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin); + cpy->check[i].ccvalid = constr->check[i].ccvalid; + cpy->check[i].ccnoinherit = constr->check[i].ccnoinherit; + } + } + + desc->constr = cpy; + } + + /* We can copy the tuple type identification, too */ + desc->tdtypeid = tupdesc->tdtypeid; + desc->tdtypmod = tupdesc->tdtypmod; + + return desc; +} + +/* + * TupleDescCopy + * Copy a tuple descriptor into caller-supplied memory. + * The memory may be shared memory mapped at any address, and must + * be sufficient to hold TupleDescSize(src) bytes. + * + * !!! Constraints and defaults are not copied !!! + */ +void +TupleDescCopy(TupleDesc dst, TupleDesc src) +{ + int i; + + /* Flat-copy the header and attribute array */ + memcpy(dst, src, TupleDescSize(src)); + + /* + * Since we're not copying constraints and defaults, clear fields + * associated with them. + */ + for (i = 0; i < dst->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(dst, i); + + att->attnotnull = false; + att->atthasdef = false; + att->atthasmissing = false; + att->attidentity = '\0'; + att->attgenerated = '\0'; + } + dst->constr = NULL; + + /* + * Also, assume the destination is not to be ref-counted. (Copying the + * source's refcount would be wrong in any case.) + */ + dst->tdrefcount = -1; +} + +/* + * TupleDescCopyEntry + * This function copies a single attribute structure from one tuple + * descriptor to another. + * + * !!! Constraints and defaults are not copied !!! + */ +void +TupleDescCopyEntry(TupleDesc dst, AttrNumber dstAttno, + TupleDesc src, AttrNumber srcAttno) +{ + Form_pg_attribute dstAtt = TupleDescAttr(dst, dstAttno - 1); + Form_pg_attribute srcAtt = TupleDescAttr(src, srcAttno - 1); + + /* + * sanity checks + */ + AssertArg(PointerIsValid(src)); + AssertArg(PointerIsValid(dst)); + AssertArg(srcAttno >= 1); + AssertArg(srcAttno <= src->natts); + AssertArg(dstAttno >= 1); + AssertArg(dstAttno <= dst->natts); + + memcpy(dstAtt, srcAtt, ATTRIBUTE_FIXED_PART_SIZE); + + /* + * Aside from updating the attno, we'd better reset attcacheoff. + * + * XXX Actually, to be entirely safe we'd need to reset the attcacheoff of + * all following columns in dst as well. Current usage scenarios don't + * require that though, because all following columns will get initialized + * by other uses of this function or TupleDescInitEntry. So we cheat a + * bit to avoid a useless O(N^2) penalty. + */ + dstAtt->attnum = dstAttno; + dstAtt->attcacheoff = -1; + + /* since we're not copying constraints or defaults, clear these */ + dstAtt->attnotnull = false; + dstAtt->atthasdef = false; + dstAtt->atthasmissing = false; + dstAtt->attidentity = '\0'; + dstAtt->attgenerated = '\0'; +} + +/* + * Free a TupleDesc including all substructure + */ +void +FreeTupleDesc(TupleDesc tupdesc) +{ + int i; + + /* + * Possibly this should assert tdrefcount == 0, to disallow explicit + * freeing of un-refcounted tupdescs? + */ + Assert(tupdesc->tdrefcount <= 0); + + if (tupdesc->constr) + { + if (tupdesc->constr->num_defval > 0) + { + AttrDefault *attrdef = tupdesc->constr->defval; + + for (i = tupdesc->constr->num_defval - 1; i >= 0; i--) + pfree(attrdef[i].adbin); + pfree(attrdef); + } + if (tupdesc->constr->missing) + { + AttrMissing *attrmiss = tupdesc->constr->missing; + + for (i = tupdesc->natts - 1; i >= 0; i--) + { + if (attrmiss[i].am_present + && !TupleDescAttr(tupdesc, i)->attbyval) + pfree(DatumGetPointer(attrmiss[i].am_value)); + } + pfree(attrmiss); + } + if (tupdesc->constr->num_check > 0) + { + ConstrCheck *check = tupdesc->constr->check; + + for (i = tupdesc->constr->num_check - 1; i >= 0; i--) + { + pfree(check[i].ccname); + pfree(check[i].ccbin); + } + pfree(check); + } + pfree(tupdesc->constr); + } + + pfree(tupdesc); +} + +/* + * Increment the reference count of a tupdesc, and log the reference in + * CurrentResourceOwner. + * + * Do not apply this to tupdescs that are not being refcounted. (Use the + * macro PinTupleDesc for tupdescs of uncertain status.) + */ +void +IncrTupleDescRefCount(TupleDesc tupdesc) +{ + Assert(tupdesc->tdrefcount >= 0); + + ResourceOwnerEnlargeTupleDescs(CurrentResourceOwner); + tupdesc->tdrefcount++; + ResourceOwnerRememberTupleDesc(CurrentResourceOwner, tupdesc); +} + +/* + * Decrement the reference count of a tupdesc, remove the corresponding + * reference from CurrentResourceOwner, and free the tupdesc if no more + * references remain. + * + * Do not apply this to tupdescs that are not being refcounted. (Use the + * macro ReleaseTupleDesc for tupdescs of uncertain status.) + */ +void +DecrTupleDescRefCount(TupleDesc tupdesc) +{ + Assert(tupdesc->tdrefcount > 0); + + ResourceOwnerForgetTupleDesc(CurrentResourceOwner, tupdesc); + if (--tupdesc->tdrefcount == 0) + FreeTupleDesc(tupdesc); +} + +/* + * Compare two TupleDesc structures for logical equality + * + * Note: we deliberately do not check the attrelid and tdtypmod fields. + * This allows typcache.c to use this routine to see if a cached record type + * matches a requested type, and is harmless for relcache.c's uses. + * We don't compare tdrefcount, either. + */ +bool +equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2) +{ + int i, + n; + + if (tupdesc1->natts != tupdesc2->natts) + return false; + if (tupdesc1->tdtypeid != tupdesc2->tdtypeid) + return false; + + for (i = 0; i < tupdesc1->natts; i++) + { + Form_pg_attribute attr1 = TupleDescAttr(tupdesc1, i); + Form_pg_attribute attr2 = TupleDescAttr(tupdesc2, i); + + /* + * We do not need to check every single field here: we can disregard + * attrelid and attnum (which were used to place the row in the attrs + * array in the first place). It might look like we could dispense + * with checking attlen/attbyval/attalign, since these are derived + * from atttypid; but in the case of dropped columns we must check + * them (since atttypid will be zero for all dropped columns) and in + * general it seems safer to check them always. + * + * attcacheoff must NOT be checked since it's possibly not set in both + * copies. We also intentionally ignore atthasmissing, since that's + * not very relevant in tupdescs, which lack the attmissingval field. + */ + if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0) + return false; + if (attr1->atttypid != attr2->atttypid) + return false; + if (attr1->attstattarget != attr2->attstattarget) + return false; + if (attr1->attlen != attr2->attlen) + return false; + if (attr1->attndims != attr2->attndims) + return false; + if (attr1->atttypmod != attr2->atttypmod) + return false; + if (attr1->attbyval != attr2->attbyval) + return false; + if (attr1->attalign != attr2->attalign) + return false; + if (attr1->attstorage != attr2->attstorage) + return false; + if (attr1->attcompression != attr2->attcompression) + return false; + if (attr1->attnotnull != attr2->attnotnull) + return false; + if (attr1->atthasdef != attr2->atthasdef) + return false; + if (attr1->attidentity != attr2->attidentity) + return false; + if (attr1->attgenerated != attr2->attgenerated) + return false; + if (attr1->attisdropped != attr2->attisdropped) + return false; + if (attr1->attislocal != attr2->attislocal) + return false; + if (attr1->attinhcount != attr2->attinhcount) + return false; + if (attr1->attcollation != attr2->attcollation) + return false; + /* variable-length fields are not even present... */ + } + + if (tupdesc1->constr != NULL) + { + TupleConstr *constr1 = tupdesc1->constr; + TupleConstr *constr2 = tupdesc2->constr; + + if (constr2 == NULL) + return false; + if (constr1->has_not_null != constr2->has_not_null) + return false; + if (constr1->has_generated_stored != constr2->has_generated_stored) + return false; + n = constr1->num_defval; + if (n != (int) constr2->num_defval) + return false; + /* We assume here that both AttrDefault arrays are in adnum order */ + for (i = 0; i < n; i++) + { + AttrDefault *defval1 = constr1->defval + i; + AttrDefault *defval2 = constr2->defval + i; + + if (defval1->adnum != defval2->adnum) + return false; + if (strcmp(defval1->adbin, defval2->adbin) != 0) + return false; + } + if (constr1->missing) + { + if (!constr2->missing) + return false; + for (i = 0; i < tupdesc1->natts; i++) + { + AttrMissing *missval1 = constr1->missing + i; + AttrMissing *missval2 = constr2->missing + i; + + if (missval1->am_present != missval2->am_present) + return false; + if (missval1->am_present) + { + Form_pg_attribute missatt1 = TupleDescAttr(tupdesc1, i); + + if (!datumIsEqual(missval1->am_value, missval2->am_value, + missatt1->attbyval, missatt1->attlen)) + return false; + } + } + } + else if (constr2->missing) + return false; + n = constr1->num_check; + if (n != (int) constr2->num_check) + return false; + + /* + * Similarly, we rely here on the ConstrCheck entries being sorted by + * name. If there are duplicate names, the outcome of the comparison + * is uncertain, but that should not happen. + */ + for (i = 0; i < n; i++) + { + ConstrCheck *check1 = constr1->check + i; + ConstrCheck *check2 = constr2->check + i; + + if (!(strcmp(check1->ccname, check2->ccname) == 0 && + strcmp(check1->ccbin, check2->ccbin) == 0 && + check1->ccvalid == check2->ccvalid && + check1->ccnoinherit == check2->ccnoinherit)) + return false; + } + } + else if (tupdesc2->constr != NULL) + return false; + return true; +} + +/* + * hashTupleDesc + * Compute a hash value for a tuple descriptor. + * + * If two tuple descriptors would be considered equal by equalTupleDescs() + * then their hash value will be equal according to this function. + * + * Note that currently contents of constraint are not hashed - it'd be a bit + * painful to do so, and conflicts just due to constraints are unlikely. + */ +uint32 +hashTupleDesc(TupleDesc desc) +{ + uint32 s; + int i; + + s = hash_combine(0, hash_uint32(desc->natts)); + s = hash_combine(s, hash_uint32(desc->tdtypeid)); + for (i = 0; i < desc->natts; ++i) + s = hash_combine(s, hash_uint32(TupleDescAttr(desc, i)->atttypid)); + + return s; +} + +/* + * TupleDescInitEntry + * This function initializes a single attribute structure in + * a previously allocated tuple descriptor. + * + * If attributeName is NULL, the attname field is set to an empty string + * (this is for cases where we don't know or need a name for the field). + * Also, some callers use this function to change the datatype-related fields + * in an existing tupdesc; they pass attributeName = NameStr(att->attname) + * to indicate that the attname field shouldn't be modified. + * + * Note that attcollation is set to the default for the specified datatype. + * If a nondefault collation is needed, insert it afterwards using + * TupleDescInitEntryCollation. + */ +void +TupleDescInitEntry(TupleDesc desc, + AttrNumber attributeNumber, + const char *attributeName, + Oid oidtypeid, + int32 typmod, + int attdim) +{ + HeapTuple tuple; + Form_pg_type typeForm; + Form_pg_attribute att; + + /* + * sanity checks + */ + AssertArg(PointerIsValid(desc)); + AssertArg(attributeNumber >= 1); + AssertArg(attributeNumber <= desc->natts); + + /* + * initialize the attribute fields + */ + att = TupleDescAttr(desc, attributeNumber - 1); + + att->attrelid = 0; /* dummy value */ + + /* + * Note: attributeName can be NULL, because the planner doesn't always + * fill in valid resname values in targetlists, particularly for resjunk + * attributes. Also, do nothing if caller wants to re-use the old attname. + */ + if (attributeName == NULL) + MemSet(NameStr(att->attname), 0, NAMEDATALEN); + else if (attributeName != NameStr(att->attname)) + namestrcpy(&(att->attname), attributeName); + + att->attstattarget = -1; + att->attcacheoff = -1; + att->atttypmod = typmod; + + att->attnum = attributeNumber; + att->attndims = attdim; + + att->attnotnull = false; + att->atthasdef = false; + att->atthasmissing = false; + att->attidentity = '\0'; + att->attgenerated = '\0'; + att->attisdropped = false; + att->attislocal = true; + att->attinhcount = 0; + /* attacl, attoptions and attfdwoptions are not present in tupledescs */ + + tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(oidtypeid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for type %u", oidtypeid); + typeForm = (Form_pg_type) GETSTRUCT(tuple); + + att->atttypid = oidtypeid; + att->attlen = typeForm->typlen; + att->attbyval = typeForm->typbyval; + att->attalign = typeForm->typalign; + att->attstorage = typeForm->typstorage; + att->attcompression = InvalidCompressionMethod; + att->attcollation = typeForm->typcollation; + + ReleaseSysCache(tuple); +} + +/* + * TupleDescInitBuiltinEntry + * Initialize a tuple descriptor without catalog access. Only + * a limited range of builtin types are supported. + */ +void +TupleDescInitBuiltinEntry(TupleDesc desc, + AttrNumber attributeNumber, + const char *attributeName, + Oid oidtypeid, + int32 typmod, + int attdim) +{ + Form_pg_attribute att; + + /* sanity checks */ + AssertArg(PointerIsValid(desc)); + AssertArg(attributeNumber >= 1); + AssertArg(attributeNumber <= desc->natts); + + /* initialize the attribute fields */ + att = TupleDescAttr(desc, attributeNumber - 1); + att->attrelid = 0; /* dummy value */ + + /* unlike TupleDescInitEntry, we require an attribute name */ + Assert(attributeName != NULL); + namestrcpy(&(att->attname), attributeName); + + att->attstattarget = -1; + att->attcacheoff = -1; + att->atttypmod = typmod; + + att->attnum = attributeNumber; + att->attndims = attdim; + + att->attnotnull = false; + att->atthasdef = false; + att->atthasmissing = false; + att->attidentity = '\0'; + att->attgenerated = '\0'; + att->attisdropped = false; + att->attislocal = true; + att->attinhcount = 0; + /* attacl, attoptions and attfdwoptions are not present in tupledescs */ + + att->atttypid = oidtypeid; + + /* + * Our goal here is to support just enough types to let basic builtin + * commands work without catalog access - e.g. so that we can do certain + * things even in processes that are not connected to a database. + */ + switch (oidtypeid) + { + case TEXTOID: + case TEXTARRAYOID: + att->attlen = -1; + att->attbyval = false; + att->attalign = TYPALIGN_INT; + att->attstorage = TYPSTORAGE_EXTENDED; + att->attcompression = InvalidCompressionMethod; + att->attcollation = DEFAULT_COLLATION_OID; + break; + + case BOOLOID: + att->attlen = 1; + att->attbyval = true; + att->attalign = TYPALIGN_CHAR; + att->attstorage = TYPSTORAGE_PLAIN; + att->attcompression = InvalidCompressionMethod; + att->attcollation = InvalidOid; + break; + + case INT4OID: + att->attlen = 4; + att->attbyval = true; + att->attalign = TYPALIGN_INT; + att->attstorage = TYPSTORAGE_PLAIN; + att->attcompression = InvalidCompressionMethod; + att->attcollation = InvalidOid; + break; + + case INT8OID: + att->attlen = 8; + att->attbyval = FLOAT8PASSBYVAL; + att->attalign = TYPALIGN_DOUBLE; + att->attstorage = TYPSTORAGE_PLAIN; + att->attcompression = InvalidCompressionMethod; + att->attcollation = InvalidOid; + break; + + default: + elog(ERROR, "unsupported type %u", oidtypeid); + } +} + +/* + * TupleDescInitEntryCollation + * + * Assign a nondefault collation to a previously initialized tuple descriptor + * entry. + */ +void +TupleDescInitEntryCollation(TupleDesc desc, + AttrNumber attributeNumber, + Oid collationid) +{ + /* + * sanity checks + */ + AssertArg(PointerIsValid(desc)); + AssertArg(attributeNumber >= 1); + AssertArg(attributeNumber <= desc->natts); + + TupleDescAttr(desc, attributeNumber - 1)->attcollation = collationid; +} + + +/* + * BuildDescForRelation + * + * Given a relation schema (list of ColumnDef nodes), build a TupleDesc. + * + * Note: tdtypeid will need to be filled in later on. + */ +TupleDesc +BuildDescForRelation(List *schema) +{ + int natts; + AttrNumber attnum; + ListCell *l; + TupleDesc desc; + bool has_not_null; + char *attname; + Oid atttypid; + int32 atttypmod; + Oid attcollation; + int attdim; + + /* + * allocate a new tuple descriptor + */ + natts = list_length(schema); + desc = CreateTemplateTupleDesc(natts); + has_not_null = false; + + attnum = 0; + + foreach(l, schema) + { + ColumnDef *entry = lfirst(l); + AclResult aclresult; + Form_pg_attribute att; + + /* + * for each entry in the list, get the name and type information from + * the list and have TupleDescInitEntry fill in the attribute + * information we need. + */ + attnum++; + + attname = entry->colname; + typenameTypeIdAndMod(NULL, entry->typeName, &atttypid, &atttypmod); + + aclresult = pg_type_aclcheck(atttypid, GetUserId(), ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error_type(aclresult, atttypid); + + attcollation = GetColumnDefCollation(NULL, entry, atttypid); + attdim = list_length(entry->typeName->arrayBounds); + + if (entry->typeName->setof) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" cannot be declared SETOF", + attname))); + + TupleDescInitEntry(desc, attnum, attname, + atttypid, atttypmod, attdim); + att = TupleDescAttr(desc, attnum - 1); + + /* Override TupleDescInitEntry's settings as requested */ + TupleDescInitEntryCollation(desc, attnum, attcollation); + if (entry->storage) + att->attstorage = entry->storage; + + /* Fill in additional stuff not handled by TupleDescInitEntry */ + att->attnotnull = entry->is_not_null; + has_not_null |= entry->is_not_null; + att->attislocal = entry->is_local; + att->attinhcount = entry->inhcount; + } + + if (has_not_null) + { + TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr)); + + constr->has_not_null = true; + constr->has_generated_stored = false; + constr->defval = NULL; + constr->missing = NULL; + constr->num_defval = 0; + constr->check = NULL; + constr->num_check = 0; + desc->constr = constr; + } + else + { + desc->constr = NULL; + } + + return desc; +} + +/* + * BuildDescFromLists + * + * Build a TupleDesc given lists of column names (as String nodes), + * column type OIDs, typmods, and collation OIDs. + * + * No constraints are generated. + * + * This is essentially a cut-down version of BuildDescForRelation for use + * with functions returning RECORD. + */ +TupleDesc +BuildDescFromLists(List *names, List *types, List *typmods, List *collations) +{ + int natts; + AttrNumber attnum; + ListCell *l1; + ListCell *l2; + ListCell *l3; + ListCell *l4; + TupleDesc desc; + + natts = list_length(names); + Assert(natts == list_length(types)); + Assert(natts == list_length(typmods)); + Assert(natts == list_length(collations)); + + /* + * allocate a new tuple descriptor + */ + desc = CreateTemplateTupleDesc(natts); + + attnum = 0; + forfour(l1, names, l2, types, l3, typmods, l4, collations) + { + char *attname = strVal(lfirst(l1)); + Oid atttypid = lfirst_oid(l2); + int32 atttypmod = lfirst_int(l3); + Oid attcollation = lfirst_oid(l4); + + attnum++; + + TupleDescInitEntry(desc, attnum, attname, atttypid, atttypmod, 0); + TupleDescInitEntryCollation(desc, attnum, attcollation); + } + + return desc; +} |