summaryrefslogtreecommitdiffstats
path: root/src/backend/utils/cache/relcache.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/backend/utils/cache/relcache.c
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/utils/cache/relcache.c')
-rw-r--r--src/backend/utils/cache/relcache.c6800
1 files changed, 6800 insertions, 0 deletions
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
new file mode 100644
index 0000000..0ce4400
--- /dev/null
+++ b/src/backend/utils/cache/relcache.c
@@ -0,0 +1,6800 @@
+/*-------------------------------------------------------------------------
+ *
+ * relcache.c
+ * POSTGRES relation descriptor cache code
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/utils/cache/relcache.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ * RelationCacheInitialize - initialize relcache (to empty)
+ * RelationCacheInitializePhase2 - initialize shared-catalog entries
+ * RelationCacheInitializePhase3 - finish initializing relcache
+ * RelationIdGetRelation - get a reldesc by relation id
+ * RelationClose - close an open relation
+ *
+ * NOTES
+ * The following code contains many undocumented hacks. Please be
+ * careful....
+ */
+#include "postgres.h"
+
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/htup_details.h"
+#include "access/multixact.h"
+#include "access/nbtree.h"
+#include "access/parallel.h"
+#include "access/reloptions.h"
+#include "access/sysattr.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/tupdesc_details.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "catalog/binary_upgrade.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_am.h"
+#include "catalog/pg_amproc.h"
+#include "catalog/pg_attrdef.h"
+#include "catalog/pg_auth_members.h"
+#include "catalog/pg_authid.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_opclass.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_publication.h"
+#include "catalog/pg_rewrite.h"
+#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_tablespace.h"
+#include "catalog/pg_trigger.h"
+#include "catalog/pg_type.h"
+#include "catalog/schemapg.h"
+#include "catalog/storage.h"
+#include "commands/policy.h"
+#include "commands/publicationcmds.h"
+#include "commands/trigger.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/optimizer.h"
+#include "pgstat.h"
+#include "rewrite/rewriteDefine.h"
+#include "rewrite/rowsecurity.h"
+#include "storage/lmgr.h"
+#include "storage/smgr.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/relmapper.h"
+#include "utils/resowner_private.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+
+#define RELCACHE_INIT_FILEMAGIC 0x573266 /* version ID value */
+
+/*
+ * Whether to bother checking if relation cache memory needs to be freed
+ * eagerly. See also RelationBuildDesc() and pg_config_manual.h.
+ */
+#if defined(RECOVER_RELATION_BUILD_MEMORY) && (RECOVER_RELATION_BUILD_MEMORY != 0)
+#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
+#else
+#define RECOVER_RELATION_BUILD_MEMORY 0
+#ifdef DISCARD_CACHES_ENABLED
+#define MAYBE_RECOVER_RELATION_BUILD_MEMORY 1
+#endif
+#endif
+
+/*
+ * hardcoded tuple descriptors, contents generated by genbki.pl
+ */
+static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
+static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
+static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
+static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
+static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
+static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
+static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
+static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
+static const FormData_pg_attribute Desc_pg_shseclabel[Natts_pg_shseclabel] = {Schema_pg_shseclabel};
+static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
+
+/*
+ * Hash tables that index the relation cache
+ *
+ * We used to index the cache by both name and OID, but now there
+ * is only an index by OID.
+ */
+typedef struct relidcacheent
+{
+ Oid reloid;
+ Relation reldesc;
+} RelIdCacheEnt;
+
+static HTAB *RelationIdCache;
+
+/*
+ * This flag is false until we have prepared the critical relcache entries
+ * that are needed to do indexscans on the tables read by relcache building.
+ */
+bool criticalRelcachesBuilt = false;
+
+/*
+ * This flag is false until we have prepared the critical relcache entries
+ * for shared catalogs (which are the tables needed for login).
+ */
+bool criticalSharedRelcachesBuilt = false;
+
+/*
+ * This counter counts relcache inval events received since backend startup
+ * (but only for rels that are actually in cache). Presently, we use it only
+ * to detect whether data about to be written by write_relcache_init_file()
+ * might already be obsolete.
+ */
+static long relcacheInvalsReceived = 0L;
+
+/*
+ * in_progress_list is a stack of ongoing RelationBuildDesc() calls. CREATE
+ * INDEX CONCURRENTLY makes catalog changes under ShareUpdateExclusiveLock.
+ * It critically relies on each backend absorbing those changes no later than
+ * next transaction start. Hence, RelationBuildDesc() loops until it finishes
+ * without accepting a relevant invalidation. (Most invalidation consumers
+ * don't do this.)
+ */
+typedef struct inprogressent
+{
+ Oid reloid; /* OID of relation being built */
+ bool invalidated; /* whether an invalidation arrived for it */
+} InProgressEnt;
+
+static InProgressEnt *in_progress_list;
+static int in_progress_list_len;
+static int in_progress_list_maxlen;
+
+/*
+ * eoxact_list[] stores the OIDs of relations that (might) need AtEOXact
+ * cleanup work. This list intentionally has limited size; if it overflows,
+ * we fall back to scanning the whole hashtable. There is no value in a very
+ * large list because (1) at some point, a hash_seq_search scan is faster than
+ * retail lookups, and (2) the value of this is to reduce EOXact work for
+ * short transactions, which can't have dirtied all that many tables anyway.
+ * EOXactListAdd() does not bother to prevent duplicate list entries, so the
+ * cleanup processing must be idempotent.
+ */
+#define MAX_EOXACT_LIST 32
+static Oid eoxact_list[MAX_EOXACT_LIST];
+static int eoxact_list_len = 0;
+static bool eoxact_list_overflowed = false;
+
+#define EOXactListAdd(rel) \
+ do { \
+ if (eoxact_list_len < MAX_EOXACT_LIST) \
+ eoxact_list[eoxact_list_len++] = (rel)->rd_id; \
+ else \
+ eoxact_list_overflowed = true; \
+ } while (0)
+
+/*
+ * EOXactTupleDescArray stores TupleDescs that (might) need AtEOXact
+ * cleanup work. The array expands as needed; there is no hashtable because
+ * we don't need to access individual items except at EOXact.
+ */
+static TupleDesc *EOXactTupleDescArray;
+static int NextEOXactTupleDescNum = 0;
+static int EOXactTupleDescArrayLen = 0;
+
+/*
+ * macros to manipulate the lookup hashtable
+ */
+#define RelationCacheInsert(RELATION, replace_allowed) \
+do { \
+ RelIdCacheEnt *hentry; bool found; \
+ hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
+ (void *) &((RELATION)->rd_id), \
+ HASH_ENTER, &found); \
+ if (found) \
+ { \
+ /* see comments in RelationBuildDesc and RelationBuildLocalRelation */ \
+ Relation _old_rel = hentry->reldesc; \
+ Assert(replace_allowed); \
+ hentry->reldesc = (RELATION); \
+ if (RelationHasReferenceCountZero(_old_rel)) \
+ RelationDestroyRelation(_old_rel, false); \
+ else if (!IsBootstrapProcessingMode()) \
+ elog(WARNING, "leaking still-referenced relcache entry for \"%s\"", \
+ RelationGetRelationName(_old_rel)); \
+ } \
+ else \
+ hentry->reldesc = (RELATION); \
+} while(0)
+
+#define RelationIdCacheLookup(ID, RELATION) \
+do { \
+ RelIdCacheEnt *hentry; \
+ hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
+ (void *) &(ID), \
+ HASH_FIND, NULL); \
+ if (hentry) \
+ RELATION = hentry->reldesc; \
+ else \
+ RELATION = NULL; \
+} while(0)
+
+#define RelationCacheDelete(RELATION) \
+do { \
+ RelIdCacheEnt *hentry; \
+ hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
+ (void *) &((RELATION)->rd_id), \
+ HASH_REMOVE, NULL); \
+ if (hentry == NULL) \
+ elog(WARNING, "failed to delete relcache entry for OID %u", \
+ (RELATION)->rd_id); \
+} while(0)
+
+
+/*
+ * Special cache for opclass-related information
+ *
+ * Note: only default support procs get cached, ie, those with
+ * lefttype = righttype = opcintype.
+ */
+typedef struct opclasscacheent
+{
+ Oid opclassoid; /* lookup key: OID of opclass */
+ bool valid; /* set true after successful fill-in */
+ StrategyNumber numSupport; /* max # of support procs (from pg_am) */
+ Oid opcfamily; /* OID of opclass's family */
+ Oid opcintype; /* OID of opclass's declared input type */
+ RegProcedure *supportProcs; /* OIDs of support procedures */
+} OpClassCacheEnt;
+
+static HTAB *OpClassCache = NULL;
+
+
+/* non-export function prototypes */
+
+static void RelationDestroyRelation(Relation relation, bool remember_tupdesc);
+static void RelationClearRelation(Relation relation, bool rebuild);
+
+static void RelationReloadIndexInfo(Relation relation);
+static void RelationReloadNailed(Relation relation);
+static void RelationFlushRelation(Relation relation);
+static void RememberToFreeTupleDescAtEOX(TupleDesc td);
+#ifdef USE_ASSERT_CHECKING
+static void AssertPendingSyncConsistency(Relation relation);
+#endif
+static void AtEOXact_cleanup(Relation relation, bool isCommit);
+static void AtEOSubXact_cleanup(Relation relation, bool isCommit,
+ SubTransactionId mySubid, SubTransactionId parentSubid);
+static bool load_relcache_init_file(bool shared);
+static void write_relcache_init_file(bool shared);
+static void write_item(const void *data, Size len, FILE *fp);
+
+static void formrdesc(const char *relationName, Oid relationReltype,
+ bool isshared, int natts, const FormData_pg_attribute *attrs);
+
+static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
+static Relation AllocateRelationDesc(Form_pg_class relp);
+static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
+static void RelationBuildTupleDesc(Relation relation);
+static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
+static void RelationInitPhysicalAddr(Relation relation);
+static void load_critical_index(Oid indexoid, Oid heapoid);
+static TupleDesc GetPgClassDescriptor(void);
+static TupleDesc GetPgIndexDescriptor(void);
+static void AttrDefaultFetch(Relation relation, int ndef);
+static int AttrDefaultCmp(const void *a, const void *b);
+static void CheckConstraintFetch(Relation relation);
+static int CheckConstraintCmp(const void *a, const void *b);
+static void InitIndexAmRoutine(Relation relation);
+static void IndexSupportInitialize(oidvector *indclass,
+ RegProcedure *indexSupport,
+ Oid *opFamily,
+ Oid *opcInType,
+ StrategyNumber maxSupportNumber,
+ AttrNumber maxAttributeNumber);
+static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
+ StrategyNumber numSupport);
+static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
+static void unlink_initfile(const char *initfilename, int elevel);
+
+
+/*
+ * ScanPgRelation
+ *
+ * This is used by RelationBuildDesc to find a pg_class
+ * tuple matching targetRelId. The caller must hold at least
+ * AccessShareLock on the target relid to prevent concurrent-update
+ * scenarios; it isn't guaranteed that all scans used to build the
+ * relcache entry will use the same snapshot. If, for example,
+ * an attribute were to be added after scanning pg_class and before
+ * scanning pg_attribute, relnatts wouldn't match.
+ *
+ * NB: the returned tuple has been copied into palloc'd storage
+ * and must eventually be freed with heap_freetuple.
+ */
+static HeapTuple
+ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
+{
+ HeapTuple pg_class_tuple;
+ Relation pg_class_desc;
+ SysScanDesc pg_class_scan;
+ ScanKeyData key[1];
+ Snapshot snapshot = NULL;
+
+ /*
+ * If something goes wrong during backend startup, we might find ourselves
+ * trying to read pg_class before we've selected a database. That ain't
+ * gonna work, so bail out with a useful error message. If this happens,
+ * it probably means a relcache entry that needs to be nailed isn't.
+ */
+ if (!OidIsValid(MyDatabaseId))
+ elog(FATAL, "cannot read pg_class without having selected a database");
+
+ /*
+ * form a scan key
+ */
+ ScanKeyInit(&key[0],
+ Anum_pg_class_oid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(targetRelId));
+
+ /*
+ * Open pg_class and fetch a tuple. Force heap scan if we haven't yet
+ * built the critical relcache entries (this includes initdb and startup
+ * without a pg_internal.init file). The caller can also force a heap
+ * scan by setting indexOK == false.
+ */
+ pg_class_desc = table_open(RelationRelationId, AccessShareLock);
+
+ /*
+ * The caller might need a tuple that's newer than the one the historic
+ * snapshot; currently the only case requiring to do so is looking up the
+ * relfilenode of non mapped system relations during decoding. That
+ * snapshot can't change in the midst of a relcache build, so there's no
+ * need to register the snapshot.
+ */
+ if (force_non_historic)
+ snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
+
+ pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
+ indexOK && criticalRelcachesBuilt,
+ snapshot,
+ 1, key);
+
+ pg_class_tuple = systable_getnext(pg_class_scan);
+
+ /*
+ * Must copy tuple before releasing buffer.
+ */
+ if (HeapTupleIsValid(pg_class_tuple))
+ pg_class_tuple = heap_copytuple(pg_class_tuple);
+
+ /* all done */
+ systable_endscan(pg_class_scan);
+ table_close(pg_class_desc, AccessShareLock);
+
+ return pg_class_tuple;
+}
+
+/*
+ * AllocateRelationDesc
+ *
+ * This is used to allocate memory for a new relation descriptor
+ * and initialize the rd_rel field from the given pg_class tuple.
+ */
+static Relation
+AllocateRelationDesc(Form_pg_class relp)
+{
+ Relation relation;
+ MemoryContext oldcxt;
+ Form_pg_class relationForm;
+
+ /* Relcache entries must live in CacheMemoryContext */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ /*
+ * allocate and zero space for new relation descriptor
+ */
+ relation = (Relation) palloc0(sizeof(RelationData));
+
+ /* make sure relation is marked as having no open file yet */
+ relation->rd_smgr = NULL;
+
+ /*
+ * Copy the relation tuple form
+ *
+ * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
+ * variable-length fields (relacl, reloptions) are NOT stored in the
+ * relcache --- there'd be little point in it, since we don't copy the
+ * tuple's nulls bitmap and hence wouldn't know if the values are valid.
+ * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
+ * it from the syscache if you need it. The same goes for the original
+ * form of reloptions (however, we do store the parsed form of reloptions
+ * in rd_options).
+ */
+ relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
+
+ memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
+
+ /* initialize relation tuple form */
+ relation->rd_rel = relationForm;
+
+ /* and allocate attribute tuple form storage */
+ relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts);
+ /* which we mark as a reference-counted tupdesc */
+ relation->rd_att->tdrefcount = 1;
+
+ MemoryContextSwitchTo(oldcxt);
+
+ return relation;
+}
+
+/*
+ * RelationParseRelOptions
+ * Convert pg_class.reloptions into pre-parsed rd_options
+ *
+ * tuple is the real pg_class tuple (not rd_rel!) for relation
+ *
+ * Note: rd_rel and (if an index) rd_indam must be valid already
+ */
+static void
+RelationParseRelOptions(Relation relation, HeapTuple tuple)
+{
+ bytea *options;
+ amoptions_function amoptsfn;
+
+ relation->rd_options = NULL;
+
+ /*
+ * Look up any AM-specific parse function; fall out if relkind should not
+ * have options.
+ */
+ switch (relation->rd_rel->relkind)
+ {
+ case RELKIND_RELATION:
+ case RELKIND_TOASTVALUE:
+ case RELKIND_VIEW:
+ case RELKIND_MATVIEW:
+ case RELKIND_PARTITIONED_TABLE:
+ amoptsfn = NULL;
+ break;
+ case RELKIND_INDEX:
+ case RELKIND_PARTITIONED_INDEX:
+ amoptsfn = relation->rd_indam->amoptions;
+ break;
+ default:
+ return;
+ }
+
+ /*
+ * Fetch reloptions from tuple; have to use a hardwired descriptor because
+ * we might not have any other for pg_class yet (consider executing this
+ * code for pg_class itself)
+ */
+ options = extractRelOptions(tuple, GetPgClassDescriptor(), amoptsfn);
+
+ /*
+ * Copy parsed data into CacheMemoryContext. To guard against the
+ * possibility of leaks in the reloptions code, we want to do the actual
+ * parsing in the caller's memory context and copy the results into
+ * CacheMemoryContext after the fact.
+ */
+ if (options)
+ {
+ relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
+ VARSIZE(options));
+ memcpy(relation->rd_options, options, VARSIZE(options));
+ pfree(options);
+ }
+}
+
+/*
+ * RelationBuildTupleDesc
+ *
+ * Form the relation's tuple descriptor from information in
+ * the pg_attribute, pg_attrdef & pg_constraint system catalogs.
+ */
+static void
+RelationBuildTupleDesc(Relation relation)
+{
+ HeapTuple pg_attribute_tuple;
+ Relation pg_attribute_desc;
+ SysScanDesc pg_attribute_scan;
+ ScanKeyData skey[2];
+ int need;
+ TupleConstr *constr;
+ AttrMissing *attrmiss = NULL;
+ int ndef = 0;
+
+ /* fill rd_att's type ID fields (compare heap.c's AddNewRelationTuple) */
+ relation->rd_att->tdtypeid =
+ relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
+ relation->rd_att->tdtypmod = -1; /* just to be sure */
+
+ constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
+ sizeof(TupleConstr));
+ constr->has_not_null = false;
+ constr->has_generated_stored = false;
+
+ /*
+ * Form a scan key that selects only user attributes (attnum > 0).
+ * (Eliminating system attribute rows at the index level is lots faster
+ * than fetching them.)
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+ ScanKeyInit(&skey[1],
+ Anum_pg_attribute_attnum,
+ BTGreaterStrategyNumber, F_INT2GT,
+ Int16GetDatum(0));
+
+ /*
+ * Open pg_attribute and begin a scan. Force heap scan if we haven't yet
+ * built the critical relcache entries (this includes initdb and startup
+ * without a pg_internal.init file).
+ */
+ pg_attribute_desc = table_open(AttributeRelationId, AccessShareLock);
+ pg_attribute_scan = systable_beginscan(pg_attribute_desc,
+ AttributeRelidNumIndexId,
+ criticalRelcachesBuilt,
+ NULL,
+ 2, skey);
+
+ /*
+ * add attribute data to relation->rd_att
+ */
+ need = RelationGetNumberOfAttributes(relation);
+
+ while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
+ {
+ Form_pg_attribute attp;
+ int attnum;
+
+ attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
+
+ attnum = attp->attnum;
+ if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
+ elog(ERROR, "invalid attribute number %d for relation \"%s\"",
+ attp->attnum, RelationGetRelationName(relation));
+
+ memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
+ attp,
+ ATTRIBUTE_FIXED_PART_SIZE);
+
+ /* Update constraint/default info */
+ if (attp->attnotnull)
+ constr->has_not_null = true;
+ if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
+ constr->has_generated_stored = true;
+ if (attp->atthasdef)
+ ndef++;
+
+ /* If the column has a "missing" value, put it in the attrmiss array */
+ if (attp->atthasmissing)
+ {
+ Datum missingval;
+ bool missingNull;
+
+ /* Do we have a missing value? */
+ missingval = heap_getattr(pg_attribute_tuple,
+ Anum_pg_attribute_attmissingval,
+ pg_attribute_desc->rd_att,
+ &missingNull);
+ if (!missingNull)
+ {
+ /* Yes, fetch from the array */
+ MemoryContext oldcxt;
+ bool is_null;
+ int one = 1;
+ Datum missval;
+
+ if (attrmiss == NULL)
+ attrmiss = (AttrMissing *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ relation->rd_rel->relnatts *
+ sizeof(AttrMissing));
+
+ missval = array_get_element(missingval,
+ 1,
+ &one,
+ -1,
+ attp->attlen,
+ attp->attbyval,
+ attp->attalign,
+ &is_null);
+ Assert(!is_null);
+ if (attp->attbyval)
+ {
+ /* for copy by val just copy the datum direct */
+ attrmiss[attnum - 1].am_value = missval;
+ }
+ else
+ {
+ /* otherwise copy in the correct context */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ attrmiss[attnum - 1].am_value = datumCopy(missval,
+ attp->attbyval,
+ attp->attlen);
+ MemoryContextSwitchTo(oldcxt);
+ }
+ attrmiss[attnum - 1].am_present = true;
+ }
+ }
+ need--;
+ if (need == 0)
+ break;
+ }
+
+ /*
+ * end the scan and close the attribute relation
+ */
+ systable_endscan(pg_attribute_scan);
+ table_close(pg_attribute_desc, AccessShareLock);
+
+ if (need != 0)
+ elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
+ need, RelationGetRelid(relation));
+
+ /*
+ * The attcacheoff values we read from pg_attribute should all be -1
+ * ("unknown"). Verify this if assert checking is on. They will be
+ * computed when and if needed during tuple access.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
+ Assert(TupleDescAttr(relation->rd_att, i)->attcacheoff == -1);
+ }
+#endif
+
+ /*
+ * However, we can easily set the attcacheoff value for the first
+ * attribute: it must be zero. This eliminates the need for special cases
+ * for attnum=1 that used to exist in fastgetattr() and index_getattr().
+ */
+ if (RelationGetNumberOfAttributes(relation) > 0)
+ TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
+
+ /*
+ * Set up constraint/default info
+ */
+ if (constr->has_not_null ||
+ constr->has_generated_stored ||
+ ndef > 0 ||
+ attrmiss ||
+ relation->rd_rel->relchecks > 0)
+ {
+ relation->rd_att->constr = constr;
+
+ if (ndef > 0) /* DEFAULTs */
+ AttrDefaultFetch(relation, ndef);
+ else
+ constr->num_defval = 0;
+
+ constr->missing = attrmiss;
+
+ if (relation->rd_rel->relchecks > 0) /* CHECKs */
+ CheckConstraintFetch(relation);
+ else
+ constr->num_check = 0;
+ }
+ else
+ {
+ pfree(constr);
+ relation->rd_att->constr = NULL;
+ }
+}
+
+/*
+ * RelationBuildRuleLock
+ *
+ * Form the relation's rewrite rules from information in
+ * the pg_rewrite system catalog.
+ *
+ * Note: The rule parsetrees are potentially very complex node structures.
+ * To allow these trees to be freed when the relcache entry is flushed,
+ * we make a private memory context to hold the RuleLock information for
+ * each relcache entry that has associated rules. The context is used
+ * just for rule info, not for any other subsidiary data of the relcache
+ * entry, because that keeps the update logic in RelationClearRelation()
+ * manageable. The other subsidiary data structures are simple enough
+ * to be easy to free explicitly, anyway.
+ *
+ * Note: The relation's reloptions must have been extracted first.
+ */
+static void
+RelationBuildRuleLock(Relation relation)
+{
+ MemoryContext rulescxt;
+ MemoryContext oldcxt;
+ HeapTuple rewrite_tuple;
+ Relation rewrite_desc;
+ TupleDesc rewrite_tupdesc;
+ SysScanDesc rewrite_scan;
+ ScanKeyData key;
+ RuleLock *rulelock;
+ int numlocks;
+ RewriteRule **rules;
+ int maxlocks;
+
+ /*
+ * Make the private context. Assume it'll not contain much data.
+ */
+ rulescxt = AllocSetContextCreate(CacheMemoryContext,
+ "relation rules",
+ ALLOCSET_SMALL_SIZES);
+ relation->rd_rulescxt = rulescxt;
+ MemoryContextCopyAndSetIdentifier(rulescxt,
+ RelationGetRelationName(relation));
+
+ /*
+ * allocate an array to hold the rewrite rules (the array is extended if
+ * necessary)
+ */
+ maxlocks = 4;
+ rules = (RewriteRule **)
+ MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
+ numlocks = 0;
+
+ /*
+ * form a scan key
+ */
+ ScanKeyInit(&key,
+ Anum_pg_rewrite_ev_class,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ /*
+ * open pg_rewrite and begin a scan
+ *
+ * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
+ * be reading the rules in name order, except possibly during
+ * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
+ * ensures that rules will be fired in name order.
+ */
+ rewrite_desc = table_open(RewriteRelationId, AccessShareLock);
+ rewrite_tupdesc = RelationGetDescr(rewrite_desc);
+ rewrite_scan = systable_beginscan(rewrite_desc,
+ RewriteRelRulenameIndexId,
+ true, NULL,
+ 1, &key);
+
+ while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
+ {
+ Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
+ bool isnull;
+ Datum rule_datum;
+ char *rule_str;
+ RewriteRule *rule;
+ Oid check_as_user;
+
+ rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
+ sizeof(RewriteRule));
+
+ rule->ruleId = rewrite_form->oid;
+
+ rule->event = rewrite_form->ev_type - '0';
+ rule->enabled = rewrite_form->ev_enabled;
+ rule->isInstead = rewrite_form->is_instead;
+
+ /*
+ * Must use heap_getattr to fetch ev_action and ev_qual. Also, the
+ * rule strings are often large enough to be toasted. To avoid
+ * leaking memory in the caller's context, do the detoasting here so
+ * we can free the detoasted version.
+ */
+ rule_datum = heap_getattr(rewrite_tuple,
+ Anum_pg_rewrite_ev_action,
+ rewrite_tupdesc,
+ &isnull);
+ Assert(!isnull);
+ rule_str = TextDatumGetCString(rule_datum);
+ oldcxt = MemoryContextSwitchTo(rulescxt);
+ rule->actions = (List *) stringToNode(rule_str);
+ MemoryContextSwitchTo(oldcxt);
+ pfree(rule_str);
+
+ rule_datum = heap_getattr(rewrite_tuple,
+ Anum_pg_rewrite_ev_qual,
+ rewrite_tupdesc,
+ &isnull);
+ Assert(!isnull);
+ rule_str = TextDatumGetCString(rule_datum);
+ oldcxt = MemoryContextSwitchTo(rulescxt);
+ rule->qual = (Node *) stringToNode(rule_str);
+ MemoryContextSwitchTo(oldcxt);
+ pfree(rule_str);
+
+ /*
+ * If this is a SELECT rule defining a view, and the view has
+ * "security_invoker" set, we must perform all permissions checks on
+ * relations referred to by the rule as the invoking user.
+ *
+ * In all other cases (including non-SELECT rules on security invoker
+ * views), perform the permissions checks as the relation owner.
+ */
+ if (rule->event == CMD_SELECT &&
+ relation->rd_rel->relkind == RELKIND_VIEW &&
+ RelationHasSecurityInvoker(relation))
+ check_as_user = InvalidOid;
+ else
+ check_as_user = relation->rd_rel->relowner;
+
+ /*
+ * Scan through the rule's actions and set the checkAsUser field on
+ * all rtable entries. We have to look at the qual as well, in case it
+ * contains sublinks.
+ *
+ * The reason for doing this when the rule is loaded, rather than when
+ * it is stored, is that otherwise ALTER TABLE OWNER would have to
+ * grovel through stored rules to update checkAsUser fields. Scanning
+ * the rule tree during load is relatively cheap (compared to
+ * constructing it in the first place), so we do it here.
+ */
+ setRuleCheckAsUser((Node *) rule->actions, check_as_user);
+ setRuleCheckAsUser(rule->qual, check_as_user);
+
+ if (numlocks >= maxlocks)
+ {
+ maxlocks *= 2;
+ rules = (RewriteRule **)
+ repalloc(rules, sizeof(RewriteRule *) * maxlocks);
+ }
+ rules[numlocks++] = rule;
+ }
+
+ /*
+ * end the scan and close the attribute relation
+ */
+ systable_endscan(rewrite_scan);
+ table_close(rewrite_desc, AccessShareLock);
+
+ /*
+ * there might not be any rules (if relhasrules is out-of-date)
+ */
+ if (numlocks == 0)
+ {
+ relation->rd_rules = NULL;
+ relation->rd_rulescxt = NULL;
+ MemoryContextDelete(rulescxt);
+ return;
+ }
+
+ /*
+ * form a RuleLock and insert into relation
+ */
+ rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
+ rulelock->numLocks = numlocks;
+ rulelock->rules = rules;
+
+ relation->rd_rules = rulelock;
+}
+
+/*
+ * equalRuleLocks
+ *
+ * Determine whether two RuleLocks are equivalent
+ *
+ * Probably this should be in the rules code someplace...
+ */
+static bool
+equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
+{
+ int i;
+
+ /*
+ * As of 7.3 we assume the rule ordering is repeatable, because
+ * RelationBuildRuleLock should read 'em in a consistent order. So just
+ * compare corresponding slots.
+ */
+ if (rlock1 != NULL)
+ {
+ if (rlock2 == NULL)
+ return false;
+ if (rlock1->numLocks != rlock2->numLocks)
+ return false;
+ for (i = 0; i < rlock1->numLocks; i++)
+ {
+ RewriteRule *rule1 = rlock1->rules[i];
+ RewriteRule *rule2 = rlock2->rules[i];
+
+ if (rule1->ruleId != rule2->ruleId)
+ return false;
+ if (rule1->event != rule2->event)
+ return false;
+ if (rule1->enabled != rule2->enabled)
+ return false;
+ if (rule1->isInstead != rule2->isInstead)
+ return false;
+ if (!equal(rule1->qual, rule2->qual))
+ return false;
+ if (!equal(rule1->actions, rule2->actions))
+ return false;
+ }
+ }
+ else if (rlock2 != NULL)
+ return false;
+ return true;
+}
+
+/*
+ * equalPolicy
+ *
+ * Determine whether two policies are equivalent
+ */
+static bool
+equalPolicy(RowSecurityPolicy *policy1, RowSecurityPolicy *policy2)
+{
+ int i;
+ Oid *r1,
+ *r2;
+
+ if (policy1 != NULL)
+ {
+ if (policy2 == NULL)
+ return false;
+
+ if (policy1->polcmd != policy2->polcmd)
+ return false;
+ if (policy1->hassublinks != policy2->hassublinks)
+ return false;
+ if (strcmp(policy1->policy_name, policy2->policy_name) != 0)
+ return false;
+ if (ARR_DIMS(policy1->roles)[0] != ARR_DIMS(policy2->roles)[0])
+ return false;
+
+ r1 = (Oid *) ARR_DATA_PTR(policy1->roles);
+ r2 = (Oid *) ARR_DATA_PTR(policy2->roles);
+
+ for (i = 0; i < ARR_DIMS(policy1->roles)[0]; i++)
+ {
+ if (r1[i] != r2[i])
+ return false;
+ }
+
+ if (!equal(policy1->qual, policy2->qual))
+ return false;
+ if (!equal(policy1->with_check_qual, policy2->with_check_qual))
+ return false;
+ }
+ else if (policy2 != NULL)
+ return false;
+
+ return true;
+}
+
+/*
+ * equalRSDesc
+ *
+ * Determine whether two RowSecurityDesc's are equivalent
+ */
+static bool
+equalRSDesc(RowSecurityDesc *rsdesc1, RowSecurityDesc *rsdesc2)
+{
+ ListCell *lc,
+ *rc;
+
+ if (rsdesc1 == NULL && rsdesc2 == NULL)
+ return true;
+
+ if ((rsdesc1 != NULL && rsdesc2 == NULL) ||
+ (rsdesc1 == NULL && rsdesc2 != NULL))
+ return false;
+
+ if (list_length(rsdesc1->policies) != list_length(rsdesc2->policies))
+ return false;
+
+ /* RelationBuildRowSecurity should build policies in order */
+ forboth(lc, rsdesc1->policies, rc, rsdesc2->policies)
+ {
+ RowSecurityPolicy *l = (RowSecurityPolicy *) lfirst(lc);
+ RowSecurityPolicy *r = (RowSecurityPolicy *) lfirst(rc);
+
+ if (!equalPolicy(l, r))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * RelationBuildDesc
+ *
+ * Build a relation descriptor. The caller must hold at least
+ * AccessShareLock on the target relid.
+ *
+ * The new descriptor is inserted into the hash table if insertIt is true.
+ *
+ * Returns NULL if no pg_class row could be found for the given relid
+ * (suggesting we are trying to access a just-deleted relation).
+ * Any other error is reported via elog.
+ */
+static Relation
+RelationBuildDesc(Oid targetRelId, bool insertIt)
+{
+ int in_progress_offset;
+ Relation relation;
+ Oid relid;
+ HeapTuple pg_class_tuple;
+ Form_pg_class relp;
+
+ /*
+ * This function and its subroutines can allocate a good deal of transient
+ * data in CurrentMemoryContext. Traditionally we've just leaked that
+ * data, reasoning that the caller's context is at worst of transaction
+ * scope, and relcache loads shouldn't happen so often that it's essential
+ * to recover transient data before end of statement/transaction. However
+ * that's definitely not true when debug_discard_caches is active, and
+ * perhaps it's not true in other cases.
+ *
+ * When debug_discard_caches is active or when forced to by
+ * RECOVER_RELATION_BUILD_MEMORY=1, arrange to allocate the junk in a
+ * temporary context that we'll free before returning. Make it a child of
+ * caller's context so that it will get cleaned up appropriately if we
+ * error out partway through.
+ */
+#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
+ MemoryContext tmpcxt = NULL;
+ MemoryContext oldcxt = NULL;
+
+ if (RECOVER_RELATION_BUILD_MEMORY || debug_discard_caches > 0)
+ {
+ tmpcxt = AllocSetContextCreate(CurrentMemoryContext,
+ "RelationBuildDesc workspace",
+ ALLOCSET_DEFAULT_SIZES);
+ oldcxt = MemoryContextSwitchTo(tmpcxt);
+ }
+#endif
+
+ /* Register to catch invalidation messages */
+ if (in_progress_list_len >= in_progress_list_maxlen)
+ {
+ int allocsize;
+
+ allocsize = in_progress_list_maxlen * 2;
+ in_progress_list = repalloc(in_progress_list,
+ allocsize * sizeof(*in_progress_list));
+ in_progress_list_maxlen = allocsize;
+ }
+ in_progress_offset = in_progress_list_len++;
+ in_progress_list[in_progress_offset].reloid = targetRelId;
+retry:
+ in_progress_list[in_progress_offset].invalidated = false;
+
+ /*
+ * find the tuple in pg_class corresponding to the given relation id
+ */
+ pg_class_tuple = ScanPgRelation(targetRelId, true, false);
+
+ /*
+ * if no such tuple exists, return NULL
+ */
+ if (!HeapTupleIsValid(pg_class_tuple))
+ {
+#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
+ if (tmpcxt)
+ {
+ /* Return to caller's context, and blow away the temporary context */
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(tmpcxt);
+ }
+#endif
+ Assert(in_progress_offset + 1 == in_progress_list_len);
+ in_progress_list_len--;
+ return NULL;
+ }
+
+ /*
+ * get information from the pg_class_tuple
+ */
+ relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+ relid = relp->oid;
+ Assert(relid == targetRelId);
+
+ /*
+ * allocate storage for the relation descriptor, and copy pg_class_tuple
+ * to relation->rd_rel.
+ */
+ relation = AllocateRelationDesc(relp);
+
+ /*
+ * initialize the relation's relation id (relation->rd_id)
+ */
+ RelationGetRelid(relation) = relid;
+
+ /*
+ * Normal relations are not nailed into the cache. Since we don't flush
+ * new relations, it won't be new. It could be temp though.
+ */
+ relation->rd_refcnt = 0;
+ relation->rd_isnailed = false;
+ relation->rd_createSubid = InvalidSubTransactionId;
+ relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_droppedSubid = InvalidSubTransactionId;
+ switch (relation->rd_rel->relpersistence)
+ {
+ case RELPERSISTENCE_UNLOGGED:
+ case RELPERSISTENCE_PERMANENT:
+ relation->rd_backend = InvalidBackendId;
+ relation->rd_islocaltemp = false;
+ break;
+ case RELPERSISTENCE_TEMP:
+ if (isTempOrTempToastNamespace(relation->rd_rel->relnamespace))
+ {
+ relation->rd_backend = BackendIdForTempRelations();
+ relation->rd_islocaltemp = true;
+ }
+ else
+ {
+ /*
+ * If it's a temp table, but not one of ours, we have to use
+ * the slow, grotty method to figure out the owning backend.
+ *
+ * Note: it's possible that rd_backend gets set to MyBackendId
+ * here, in case we are looking at a pg_class entry left over
+ * from a crashed backend that coincidentally had the same
+ * BackendId we're using. We should *not* consider such a
+ * table to be "ours"; this is why we need the separate
+ * rd_islocaltemp flag. The pg_class entry will get flushed
+ * if/when we clean out the corresponding temp table namespace
+ * in preparation for using it.
+ */
+ relation->rd_backend =
+ GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
+ Assert(relation->rd_backend != InvalidBackendId);
+ relation->rd_islocaltemp = false;
+ }
+ break;
+ default:
+ elog(ERROR, "invalid relpersistence: %c",
+ relation->rd_rel->relpersistence);
+ break;
+ }
+
+ /*
+ * initialize the tuple descriptor (relation->rd_att).
+ */
+ RelationBuildTupleDesc(relation);
+
+ /* foreign key data is not loaded till asked for */
+ relation->rd_fkeylist = NIL;
+ relation->rd_fkeyvalid = false;
+
+ /* partitioning data is not loaded till asked for */
+ relation->rd_partkey = NULL;
+ relation->rd_partkeycxt = NULL;
+ relation->rd_partdesc = NULL;
+ relation->rd_partdesc_nodetached = NULL;
+ relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
+ relation->rd_pdcxt = NULL;
+ relation->rd_pddcxt = NULL;
+ relation->rd_partcheck = NIL;
+ relation->rd_partcheckvalid = false;
+ relation->rd_partcheckcxt = NULL;
+
+ /*
+ * initialize access method information
+ */
+ if (relation->rd_rel->relkind == RELKIND_INDEX ||
+ relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ RelationInitIndexAccessInfo(relation);
+ else if (RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind) ||
+ relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ RelationInitTableAccessMethod(relation);
+ else
+ Assert(relation->rd_rel->relam == InvalidOid);
+
+ /* extract reloptions if any */
+ RelationParseRelOptions(relation, pg_class_tuple);
+
+ /*
+ * Fetch rules and triggers that affect this relation.
+ *
+ * Note that RelationBuildRuleLock() relies on this being done after
+ * extracting the relation's reloptions.
+ */
+ if (relation->rd_rel->relhasrules)
+ RelationBuildRuleLock(relation);
+ else
+ {
+ relation->rd_rules = NULL;
+ relation->rd_rulescxt = NULL;
+ }
+
+ if (relation->rd_rel->relhastriggers)
+ RelationBuildTriggers(relation);
+ else
+ relation->trigdesc = NULL;
+
+ if (relation->rd_rel->relrowsecurity)
+ RelationBuildRowSecurity(relation);
+ else
+ relation->rd_rsdesc = NULL;
+
+ /*
+ * initialize the relation lock manager information
+ */
+ RelationInitLockInfo(relation); /* see lmgr.c */
+
+ /*
+ * initialize physical addressing information for the relation
+ */
+ RelationInitPhysicalAddr(relation);
+
+ /* make sure relation is marked as having no open file yet */
+ relation->rd_smgr = NULL;
+
+ /*
+ * now we can free the memory allocated for pg_class_tuple
+ */
+ heap_freetuple(pg_class_tuple);
+
+ /*
+ * If an invalidation arrived mid-build, start over. Between here and the
+ * end of this function, don't add code that does or reasonably could read
+ * system catalogs. That range must be free from invalidation processing
+ * for the !insertIt case. For the insertIt case, RelationCacheInsert()
+ * will enroll this relation in ordinary relcache invalidation processing,
+ */
+ if (in_progress_list[in_progress_offset].invalidated)
+ {
+ RelationDestroyRelation(relation, false);
+ goto retry;
+ }
+ Assert(in_progress_offset + 1 == in_progress_list_len);
+ in_progress_list_len--;
+
+ /*
+ * Insert newly created relation into relcache hash table, if requested.
+ *
+ * There is one scenario in which we might find a hashtable entry already
+ * present, even though our caller failed to find it: if the relation is a
+ * system catalog or index that's used during relcache load, we might have
+ * recursively created the same relcache entry during the preceding steps.
+ * So allow RelationCacheInsert to delete any already-present relcache
+ * entry for the same OID. The already-present entry should have refcount
+ * zero (else somebody forgot to close it); in the event that it doesn't,
+ * we'll elog a WARNING and leak the already-present entry.
+ */
+ if (insertIt)
+ RelationCacheInsert(relation, true);
+
+ /* It's fully valid */
+ relation->rd_isvalid = true;
+
+#ifdef MAYBE_RECOVER_RELATION_BUILD_MEMORY
+ if (tmpcxt)
+ {
+ /* Return to caller's context, and blow away the temporary context */
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(tmpcxt);
+ }
+#endif
+
+ return relation;
+}
+
+/*
+ * Initialize the physical addressing info (RelFileNode) for a relcache entry
+ *
+ * Note: at the physical level, relations in the pg_global tablespace must
+ * be treated as shared, even if relisshared isn't set. Hence we do not
+ * look at relisshared here.
+ */
+static void
+RelationInitPhysicalAddr(Relation relation)
+{
+ Oid oldnode = relation->rd_node.relNode;
+
+ /* these relations kinds never have storage */
+ if (!RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
+ return;
+
+ if (relation->rd_rel->reltablespace)
+ relation->rd_node.spcNode = relation->rd_rel->reltablespace;
+ else
+ relation->rd_node.spcNode = MyDatabaseTableSpace;
+ if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
+ relation->rd_node.dbNode = InvalidOid;
+ else
+ relation->rd_node.dbNode = MyDatabaseId;
+
+ if (relation->rd_rel->relfilenode)
+ {
+ /*
+ * Even if we are using a decoding snapshot that doesn't represent the
+ * current state of the catalog we need to make sure the filenode
+ * points to the current file since the older file will be gone (or
+ * truncated). The new file will still contain older rows so lookups
+ * in them will work correctly. This wouldn't work correctly if
+ * rewrites were allowed to change the schema in an incompatible way,
+ * but those are prevented both on catalog tables and on user tables
+ * declared as additional catalog tables.
+ */
+ if (HistoricSnapshotActive()
+ && RelationIsAccessibleInLogicalDecoding(relation)
+ && IsTransactionState())
+ {
+ HeapTuple phys_tuple;
+ Form_pg_class physrel;
+
+ phys_tuple = ScanPgRelation(RelationGetRelid(relation),
+ RelationGetRelid(relation) != ClassOidIndexId,
+ true);
+ if (!HeapTupleIsValid(phys_tuple))
+ elog(ERROR, "could not find pg_class entry for %u",
+ RelationGetRelid(relation));
+ physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
+
+ relation->rd_rel->reltablespace = physrel->reltablespace;
+ relation->rd_rel->relfilenode = physrel->relfilenode;
+ heap_freetuple(phys_tuple);
+ }
+
+ relation->rd_node.relNode = relation->rd_rel->relfilenode;
+ }
+ else
+ {
+ /* Consult the relation mapper */
+ relation->rd_node.relNode =
+ RelationMapOidToFilenode(relation->rd_id,
+ relation->rd_rel->relisshared);
+ if (!OidIsValid(relation->rd_node.relNode))
+ elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
+ RelationGetRelationName(relation), relation->rd_id);
+ }
+
+ /*
+ * For RelationNeedsWAL() to answer correctly on parallel workers, restore
+ * rd_firstRelfilenodeSubid. No subtransactions start or end while in
+ * parallel mode, so the specific SubTransactionId does not matter.
+ */
+ if (IsParallelWorker() && oldnode != relation->rd_node.relNode)
+ {
+ if (RelFileNodeSkippingWAL(relation->rd_node))
+ relation->rd_firstRelfilenodeSubid = TopSubTransactionId;
+ else
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ }
+}
+
+/*
+ * Fill in the IndexAmRoutine for an index relation.
+ *
+ * relation's rd_amhandler and rd_indexcxt must be valid already.
+ */
+static void
+InitIndexAmRoutine(Relation relation)
+{
+ IndexAmRoutine *cached,
+ *tmp;
+
+ /*
+ * Call the amhandler in current, short-lived memory context, just in case
+ * it leaks anything (it probably won't, but let's be paranoid).
+ */
+ tmp = GetIndexAmRoutine(relation->rd_amhandler);
+
+ /* OK, now transfer the data into relation's rd_indexcxt. */
+ cached = (IndexAmRoutine *) MemoryContextAlloc(relation->rd_indexcxt,
+ sizeof(IndexAmRoutine));
+ memcpy(cached, tmp, sizeof(IndexAmRoutine));
+ relation->rd_indam = cached;
+
+ pfree(tmp);
+}
+
+/*
+ * Initialize index-access-method support data for an index relation
+ */
+void
+RelationInitIndexAccessInfo(Relation relation)
+{
+ HeapTuple tuple;
+ Form_pg_am aform;
+ Datum indcollDatum;
+ Datum indclassDatum;
+ Datum indoptionDatum;
+ bool isnull;
+ oidvector *indcoll;
+ oidvector *indclass;
+ int2vector *indoption;
+ MemoryContext indexcxt;
+ MemoryContext oldcontext;
+ int indnatts;
+ int indnkeyatts;
+ uint16 amsupport;
+
+ /*
+ * Make a copy of the pg_index entry for the index. Since pg_index
+ * contains variable-length and possibly-null fields, we have to do this
+ * honestly rather than just treating it as a Form_pg_index struct.
+ */
+ tuple = SearchSysCache1(INDEXRELID,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for index %u",
+ RelationGetRelid(relation));
+ oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+ relation->rd_indextuple = heap_copytuple(tuple);
+ relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
+ MemoryContextSwitchTo(oldcontext);
+ ReleaseSysCache(tuple);
+
+ /*
+ * Look up the index's access method, save the OID of its handler function
+ */
+ Assert(relation->rd_rel->relam != InvalidOid);
+ tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for access method %u",
+ relation->rd_rel->relam);
+ aform = (Form_pg_am) GETSTRUCT(tuple);
+ relation->rd_amhandler = aform->amhandler;
+ ReleaseSysCache(tuple);
+
+ indnatts = RelationGetNumberOfAttributes(relation);
+ if (indnatts != IndexRelationGetNumberOfAttributes(relation))
+ elog(ERROR, "relnatts disagrees with indnatts for index %u",
+ RelationGetRelid(relation));
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
+
+ /*
+ * Make the private context to hold index access info. The reason we need
+ * a context, and not just a couple of pallocs, is so that we won't leak
+ * any subsidiary info attached to fmgr lookup records.
+ */
+ indexcxt = AllocSetContextCreate(CacheMemoryContext,
+ "index info",
+ ALLOCSET_SMALL_SIZES);
+ relation->rd_indexcxt = indexcxt;
+ MemoryContextCopyAndSetIdentifier(indexcxt,
+ RelationGetRelationName(relation));
+
+ /*
+ * Now we can fetch the index AM's API struct
+ */
+ InitIndexAmRoutine(relation);
+
+ /*
+ * Allocate arrays to hold data. Opclasses are not used for included
+ * columns, so allocate them for indnkeyatts only.
+ */
+ relation->rd_opfamily = (Oid *)
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
+ relation->rd_opcintype = (Oid *)
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
+
+ amsupport = relation->rd_indam->amsupport;
+ if (amsupport > 0)
+ {
+ int nsupport = indnatts * amsupport;
+
+ relation->rd_support = (RegProcedure *)
+ MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
+ relation->rd_supportinfo = (FmgrInfo *)
+ MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
+ }
+ else
+ {
+ relation->rd_support = NULL;
+ relation->rd_supportinfo = NULL;
+ }
+
+ relation->rd_indcollation = (Oid *)
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
+
+ relation->rd_indoption = (int16 *)
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(int16));
+
+ /*
+ * indcollation cannot be referenced directly through the C struct,
+ * because it comes after the variable-width indkey field. Must extract
+ * the datum the hard way...
+ */
+ indcollDatum = fastgetattr(relation->rd_indextuple,
+ Anum_pg_index_indcollation,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ indcoll = (oidvector *) DatumGetPointer(indcollDatum);
+ memcpy(relation->rd_indcollation, indcoll->values, indnkeyatts * sizeof(Oid));
+
+ /*
+ * indclass cannot be referenced directly through the C struct, because it
+ * comes after the variable-width indkey field. Must extract the datum
+ * the hard way...
+ */
+ indclassDatum = fastgetattr(relation->rd_indextuple,
+ Anum_pg_index_indclass,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+ /*
+ * Fill the support procedure OID array, as well as the info about
+ * opfamilies and opclass input types. (aminfo and supportinfo are left
+ * as zeroes, and are filled on-the-fly when used)
+ */
+ IndexSupportInitialize(indclass, relation->rd_support,
+ relation->rd_opfamily, relation->rd_opcintype,
+ amsupport, indnkeyatts);
+
+ /*
+ * Similarly extract indoption and copy it to the cache entry
+ */
+ indoptionDatum = fastgetattr(relation->rd_indextuple,
+ Anum_pg_index_indoption,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ indoption = (int2vector *) DatumGetPointer(indoptionDatum);
+ memcpy(relation->rd_indoption, indoption->values, indnkeyatts * sizeof(int16));
+
+ (void) RelationGetIndexAttOptions(relation, false);
+
+ /*
+ * expressions, predicate, exclusion caches will be filled later
+ */
+ relation->rd_indexprs = NIL;
+ relation->rd_indpred = NIL;
+ relation->rd_exclops = NULL;
+ relation->rd_exclprocs = NULL;
+ relation->rd_exclstrats = NULL;
+ relation->rd_amcache = NULL;
+}
+
+/*
+ * IndexSupportInitialize
+ * Initializes an index's cached opclass information,
+ * given the index's pg_index.indclass entry.
+ *
+ * Data is returned into *indexSupport, *opFamily, and *opcInType,
+ * which are arrays allocated by the caller.
+ *
+ * The caller also passes maxSupportNumber and maxAttributeNumber, since these
+ * indicate the size of the arrays it has allocated --- but in practice these
+ * numbers must always match those obtainable from the system catalog entries
+ * for the index and access method.
+ */
+static void
+IndexSupportInitialize(oidvector *indclass,
+ RegProcedure *indexSupport,
+ Oid *opFamily,
+ Oid *opcInType,
+ StrategyNumber maxSupportNumber,
+ AttrNumber maxAttributeNumber)
+{
+ int attIndex;
+
+ for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
+ {
+ OpClassCacheEnt *opcentry;
+
+ if (!OidIsValid(indclass->values[attIndex]))
+ elog(ERROR, "bogus pg_index tuple");
+
+ /* look up the info for this opclass, using a cache */
+ opcentry = LookupOpclassInfo(indclass->values[attIndex],
+ maxSupportNumber);
+
+ /* copy cached data into relcache entry */
+ opFamily[attIndex] = opcentry->opcfamily;
+ opcInType[attIndex] = opcentry->opcintype;
+ if (maxSupportNumber > 0)
+ memcpy(&indexSupport[attIndex * maxSupportNumber],
+ opcentry->supportProcs,
+ maxSupportNumber * sizeof(RegProcedure));
+ }
+}
+
+/*
+ * LookupOpclassInfo
+ *
+ * This routine maintains a per-opclass cache of the information needed
+ * by IndexSupportInitialize(). This is more efficient than relying on
+ * the catalog cache, because we can load all the info about a particular
+ * opclass in a single indexscan of pg_amproc.
+ *
+ * The information from pg_am about expected range of support function
+ * numbers is passed in, rather than being looked up, mainly because the
+ * caller will have it already.
+ *
+ * Note there is no provision for flushing the cache. This is OK at the
+ * moment because there is no way to ALTER any interesting properties of an
+ * existing opclass --- all you can do is drop it, which will result in
+ * a useless but harmless dead entry in the cache. To support altering
+ * opclass membership (not the same as opfamily membership!), we'd need to
+ * be able to flush this cache as well as the contents of relcache entries
+ * for indexes.
+ */
+static OpClassCacheEnt *
+LookupOpclassInfo(Oid operatorClassOid,
+ StrategyNumber numSupport)
+{
+ OpClassCacheEnt *opcentry;
+ bool found;
+ Relation rel;
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+ HeapTuple htup;
+ bool indexOK;
+
+ if (OpClassCache == NULL)
+ {
+ /* First time through: initialize the opclass cache */
+ HASHCTL ctl;
+
+ /* Also make sure CacheMemoryContext exists */
+ if (!CacheMemoryContext)
+ CreateCacheMemoryContext();
+
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(OpClassCacheEnt);
+ OpClassCache = hash_create("Operator class cache", 64,
+ &ctl, HASH_ELEM | HASH_BLOBS);
+ }
+
+ opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
+ (void *) &operatorClassOid,
+ HASH_ENTER, &found);
+
+ if (!found)
+ {
+ /* Initialize new entry */
+ opcentry->valid = false; /* until known OK */
+ opcentry->numSupport = numSupport;
+ opcentry->supportProcs = NULL; /* filled below */
+ }
+ else
+ {
+ Assert(numSupport == opcentry->numSupport);
+ }
+
+ /*
+ * When aggressively testing cache-flush hazards, we disable the operator
+ * class cache and force reloading of the info on each call. This models
+ * no real-world behavior, since the cache entries are never invalidated
+ * otherwise. However it can be helpful for detecting bugs in the cache
+ * loading logic itself, such as reliance on a non-nailed index. Given
+ * the limited use-case and the fact that this adds a great deal of
+ * expense, we enable it only for high values of debug_discard_caches.
+ */
+#ifdef DISCARD_CACHES_ENABLED
+ if (debug_discard_caches > 2)
+ opcentry->valid = false;
+#endif
+
+ if (opcentry->valid)
+ return opcentry;
+
+ /*
+ * Need to fill in new entry. First allocate space, unless we already did
+ * so in some previous attempt.
+ */
+ if (opcentry->supportProcs == NULL && numSupport > 0)
+ opcentry->supportProcs = (RegProcedure *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ numSupport * sizeof(RegProcedure));
+
+ /*
+ * To avoid infinite recursion during startup, force heap scans if we're
+ * looking up info for the opclasses used by the indexes we would like to
+ * reference here.
+ */
+ indexOK = criticalRelcachesBuilt ||
+ (operatorClassOid != OID_BTREE_OPS_OID &&
+ operatorClassOid != INT2_BTREE_OPS_OID);
+
+ /*
+ * We have to fetch the pg_opclass row to determine its opfamily and
+ * opcintype, which are needed to look up related operators and functions.
+ * It'd be convenient to use the syscache here, but that probably doesn't
+ * work while bootstrapping.
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_opclass_oid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(operatorClassOid));
+ rel = table_open(OperatorClassRelationId, AccessShareLock);
+ scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
+ NULL, 1, skey);
+
+ if (HeapTupleIsValid(htup = systable_getnext(scan)))
+ {
+ Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
+
+ opcentry->opcfamily = opclassform->opcfamily;
+ opcentry->opcintype = opclassform->opcintype;
+ }
+ else
+ elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
+
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ /*
+ * Scan pg_amproc to obtain support procs for the opclass. We only fetch
+ * the default ones (those with lefttype = righttype = opcintype).
+ */
+ if (numSupport > 0)
+ {
+ ScanKeyInit(&skey[0],
+ Anum_pg_amproc_amprocfamily,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(opcentry->opcfamily));
+ ScanKeyInit(&skey[1],
+ Anum_pg_amproc_amproclefttype,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(opcentry->opcintype));
+ ScanKeyInit(&skey[2],
+ Anum_pg_amproc_amprocrighttype,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(opcentry->opcintype));
+ rel = table_open(AccessMethodProcedureRelationId, AccessShareLock);
+ scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
+ NULL, 3, skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(scan)))
+ {
+ Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
+
+ if (amprocform->amprocnum <= 0 ||
+ (StrategyNumber) amprocform->amprocnum > numSupport)
+ elog(ERROR, "invalid amproc number %d for opclass %u",
+ amprocform->amprocnum, operatorClassOid);
+
+ opcentry->supportProcs[amprocform->amprocnum - 1] =
+ amprocform->amproc;
+ }
+
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+ }
+
+ opcentry->valid = true;
+ return opcentry;
+}
+
+/*
+ * Fill in the TableAmRoutine for a relation
+ *
+ * relation's rd_amhandler must be valid already.
+ */
+static void
+InitTableAmRoutine(Relation relation)
+{
+ relation->rd_tableam = GetTableAmRoutine(relation->rd_amhandler);
+}
+
+/*
+ * Initialize table access method support for a table like relation
+ */
+void
+RelationInitTableAccessMethod(Relation relation)
+{
+ HeapTuple tuple;
+ Form_pg_am aform;
+
+ if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+ {
+ /*
+ * Sequences are currently accessed like heap tables, but it doesn't
+ * seem prudent to show that in the catalog. So just overwrite it
+ * here.
+ */
+ Assert(relation->rd_rel->relam == InvalidOid);
+ relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
+ }
+ else if (IsCatalogRelation(relation))
+ {
+ /*
+ * Avoid doing a syscache lookup for catalog tables.
+ */
+ Assert(relation->rd_rel->relam == HEAP_TABLE_AM_OID);
+ relation->rd_amhandler = F_HEAP_TABLEAM_HANDLER;
+ }
+ else
+ {
+ /*
+ * Look up the table access method, save the OID of its handler
+ * function.
+ */
+ Assert(relation->rd_rel->relam != InvalidOid);
+ tuple = SearchSysCache1(AMOID,
+ ObjectIdGetDatum(relation->rd_rel->relam));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for access method %u",
+ relation->rd_rel->relam);
+ aform = (Form_pg_am) GETSTRUCT(tuple);
+ relation->rd_amhandler = aform->amhandler;
+ ReleaseSysCache(tuple);
+ }
+
+ /*
+ * Now we can fetch the table AM's API struct
+ */
+ InitTableAmRoutine(relation);
+}
+
+/*
+ * formrdesc
+ *
+ * This is a special cut-down version of RelationBuildDesc(),
+ * used while initializing the relcache.
+ * The relation descriptor is built just from the supplied parameters,
+ * without actually looking at any system table entries. We cheat
+ * quite a lot since we only need to work for a few basic system
+ * catalogs.
+ *
+ * The catalogs this is used for can't have constraints (except attnotnull),
+ * default values, rules, or triggers, since we don't cope with any of that.
+ * (Well, actually, this only matters for properties that need to be valid
+ * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
+ * these properties matter then...)
+ *
+ * NOTE: we assume we are already switched into CacheMemoryContext.
+ */
+static void
+formrdesc(const char *relationName, Oid relationReltype,
+ bool isshared,
+ int natts, const FormData_pg_attribute *attrs)
+{
+ Relation relation;
+ int i;
+ bool has_not_null;
+
+ /*
+ * allocate new relation desc, clear all fields of reldesc
+ */
+ relation = (Relation) palloc0(sizeof(RelationData));
+
+ /* make sure relation is marked as having no open file yet */
+ relation->rd_smgr = NULL;
+
+ /*
+ * initialize reference count: 1 because it is nailed in cache
+ */
+ relation->rd_refcnt = 1;
+
+ /*
+ * all entries built with this routine are nailed-in-cache; none are for
+ * new or temp relations.
+ */
+ relation->rd_isnailed = true;
+ relation->rd_createSubid = InvalidSubTransactionId;
+ relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_droppedSubid = InvalidSubTransactionId;
+ relation->rd_backend = InvalidBackendId;
+ relation->rd_islocaltemp = false;
+
+ /*
+ * initialize relation tuple form
+ *
+ * The data we insert here is pretty incomplete/bogus, but it'll serve to
+ * get us launched. RelationCacheInitializePhase3() will read the real
+ * data from pg_class and replace what we've done here. Note in
+ * particular that relowner is left as zero; this cues
+ * RelationCacheInitializePhase3 that the real data isn't there yet.
+ */
+ relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
+
+ namestrcpy(&relation->rd_rel->relname, relationName);
+ relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
+ relation->rd_rel->reltype = relationReltype;
+
+ /*
+ * It's important to distinguish between shared and non-shared relations,
+ * even at bootstrap time, to make sure we know where they are stored.
+ */
+ relation->rd_rel->relisshared = isshared;
+ if (isshared)
+ relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
+
+ /* formrdesc is used only for permanent relations */
+ relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
+
+ /* ... and they're always populated, too */
+ relation->rd_rel->relispopulated = true;
+
+ relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
+ relation->rd_rel->relpages = 0;
+ relation->rd_rel->reltuples = -1;
+ relation->rd_rel->relallvisible = 0;
+ relation->rd_rel->relkind = RELKIND_RELATION;
+ relation->rd_rel->relnatts = (int16) natts;
+ relation->rd_rel->relam = HEAP_TABLE_AM_OID;
+
+ /*
+ * initialize attribute tuple form
+ *
+ * Unlike the case with the relation tuple, this data had better be right
+ * because it will never be replaced. The data comes from
+ * src/include/catalog/ headers via genbki.pl.
+ */
+ relation->rd_att = CreateTemplateTupleDesc(natts);
+ relation->rd_att->tdrefcount = 1; /* mark as refcounted */
+
+ relation->rd_att->tdtypeid = relationReltype;
+ relation->rd_att->tdtypmod = -1; /* just to be sure */
+
+ /*
+ * initialize tuple desc info
+ */
+ has_not_null = false;
+ for (i = 0; i < natts; i++)
+ {
+ memcpy(TupleDescAttr(relation->rd_att, i),
+ &attrs[i],
+ ATTRIBUTE_FIXED_PART_SIZE);
+ has_not_null |= attrs[i].attnotnull;
+ /* make sure attcacheoff is valid */
+ TupleDescAttr(relation->rd_att, i)->attcacheoff = -1;
+ }
+
+ /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
+ TupleDescAttr(relation->rd_att, 0)->attcacheoff = 0;
+
+ /* mark not-null status */
+ if (has_not_null)
+ {
+ TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+ constr->has_not_null = true;
+ relation->rd_att->constr = constr;
+ }
+
+ /*
+ * initialize relation id from info in att array (my, this is ugly)
+ */
+ RelationGetRelid(relation) = TupleDescAttr(relation->rd_att, 0)->attrelid;
+
+ /*
+ * All relations made with formrdesc are mapped. This is necessarily so
+ * because there is no other way to know what filenode they currently
+ * have. In bootstrap mode, add them to the initial relation mapper data,
+ * specifying that the initial filenode is the same as the OID.
+ */
+ relation->rd_rel->relfilenode = InvalidOid;
+ if (IsBootstrapProcessingMode())
+ RelationMapUpdateMap(RelationGetRelid(relation),
+ RelationGetRelid(relation),
+ isshared, true);
+
+ /*
+ * initialize the relation lock manager information
+ */
+ RelationInitLockInfo(relation); /* see lmgr.c */
+
+ /*
+ * initialize physical addressing information for the relation
+ */
+ RelationInitPhysicalAddr(relation);
+
+ /*
+ * initialize the table am handler
+ */
+ relation->rd_rel->relam = HEAP_TABLE_AM_OID;
+ relation->rd_tableam = GetHeapamTableAmRoutine();
+
+ /*
+ * initialize the rel-has-index flag, using hardwired knowledge
+ */
+ if (IsBootstrapProcessingMode())
+ {
+ /* In bootstrap mode, we have no indexes */
+ relation->rd_rel->relhasindex = false;
+ }
+ else
+ {
+ /* Otherwise, all the rels formrdesc is used for have indexes */
+ relation->rd_rel->relhasindex = true;
+ }
+
+ /*
+ * add new reldesc to relcache
+ */
+ RelationCacheInsert(relation, false);
+
+ /* It's fully valid */
+ relation->rd_isvalid = true;
+}
+
+
+/* ----------------------------------------------------------------
+ * Relation Descriptor Lookup Interface
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * RelationIdGetRelation
+ *
+ * Lookup a reldesc by OID; make one if not already in cache.
+ *
+ * Returns NULL if no pg_class row could be found for the given relid
+ * (suggesting we are trying to access a just-deleted relation).
+ * Any other error is reported via elog.
+ *
+ * NB: caller should already have at least AccessShareLock on the
+ * relation ID, else there are nasty race conditions.
+ *
+ * NB: relation ref count is incremented, or set to 1 if new entry.
+ * Caller should eventually decrement count. (Usually,
+ * that happens by calling RelationClose().)
+ */
+Relation
+RelationIdGetRelation(Oid relationId)
+{
+ Relation rd;
+
+ /* Make sure we're in an xact, even if this ends up being a cache hit */
+ Assert(IsTransactionState());
+
+ /*
+ * first try to find reldesc in the cache
+ */
+ RelationIdCacheLookup(relationId, rd);
+
+ if (RelationIsValid(rd))
+ {
+ /* return NULL for dropped relations */
+ if (rd->rd_droppedSubid != InvalidSubTransactionId)
+ {
+ Assert(!rd->rd_isvalid);
+ return NULL;
+ }
+
+ RelationIncrementReferenceCount(rd);
+ /* revalidate cache entry if necessary */
+ if (!rd->rd_isvalid)
+ {
+ /*
+ * Indexes only have a limited number of possible schema changes,
+ * and we don't want to use the full-blown procedure because it's
+ * a headache for indexes that reload itself depends on.
+ */
+ if (rd->rd_rel->relkind == RELKIND_INDEX ||
+ rd->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
+ RelationReloadIndexInfo(rd);
+ else
+ RelationClearRelation(rd, true);
+
+ /*
+ * Normally entries need to be valid here, but before the relcache
+ * has been initialized, not enough infrastructure exists to
+ * perform pg_class lookups. The structure of such entries doesn't
+ * change, but we still want to update the rd_rel entry. So
+ * rd_isvalid = false is left in place for a later lookup.
+ */
+ Assert(rd->rd_isvalid ||
+ (rd->rd_isnailed && !criticalRelcachesBuilt));
+ }
+ return rd;
+ }
+
+ /*
+ * no reldesc in the cache, so have RelationBuildDesc() build one and add
+ * it.
+ */
+ rd = RelationBuildDesc(relationId, true);
+ if (RelationIsValid(rd))
+ RelationIncrementReferenceCount(rd);
+ return rd;
+}
+
+/* ----------------------------------------------------------------
+ * cache invalidation support routines
+ * ----------------------------------------------------------------
+ */
+
+/*
+ * RelationIncrementReferenceCount
+ * Increments relation reference count.
+ *
+ * Note: bootstrap mode has its own weird ideas about relation refcount
+ * behavior; we ought to fix it someday, but for now, just disable
+ * reference count ownership tracking in bootstrap mode.
+ */
+void
+RelationIncrementReferenceCount(Relation rel)
+{
+ ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
+ rel->rd_refcnt += 1;
+ if (!IsBootstrapProcessingMode())
+ ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
+}
+
+/*
+ * RelationDecrementReferenceCount
+ * Decrements relation reference count.
+ */
+void
+RelationDecrementReferenceCount(Relation rel)
+{
+ Assert(rel->rd_refcnt > 0);
+ rel->rd_refcnt -= 1;
+ if (!IsBootstrapProcessingMode())
+ ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
+}
+
+/*
+ * RelationClose - close an open relation
+ *
+ * Actually, we just decrement the refcount.
+ *
+ * NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
+ * will be freed as soon as their refcount goes to zero. In combination
+ * with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
+ * to catch references to already-released relcache entries. It slows
+ * things down quite a bit, however.
+ */
+void
+RelationClose(Relation relation)
+{
+ /* Note: no locking manipulations needed */
+ RelationDecrementReferenceCount(relation);
+
+ /*
+ * If the relation is no longer open in this session, we can clean up any
+ * stale partition descriptors it has. This is unlikely, so check to see
+ * if there are child contexts before expending a call to mcxt.c.
+ */
+ if (RelationHasReferenceCountZero(relation))
+ {
+ if (relation->rd_pdcxt != NULL &&
+ relation->rd_pdcxt->firstchild != NULL)
+ MemoryContextDeleteChildren(relation->rd_pdcxt);
+
+ if (relation->rd_pddcxt != NULL &&
+ relation->rd_pddcxt->firstchild != NULL)
+ MemoryContextDeleteChildren(relation->rd_pddcxt);
+ }
+
+#ifdef RELCACHE_FORCE_RELEASE
+ if (RelationHasReferenceCountZero(relation) &&
+ relation->rd_createSubid == InvalidSubTransactionId &&
+ relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
+ RelationClearRelation(relation, false);
+#endif
+}
+
+/*
+ * RelationReloadIndexInfo - reload minimal information for an open index
+ *
+ * This function is used only for indexes. A relcache inval on an index
+ * can mean that its pg_class or pg_index row changed. There are only
+ * very limited changes that are allowed to an existing index's schema,
+ * so we can update the relcache entry without a complete rebuild; which
+ * is fortunate because we can't rebuild an index entry that is "nailed"
+ * and/or in active use. We support full replacement of the pg_class row,
+ * as well as updates of a few simple fields of the pg_index row.
+ *
+ * We can't necessarily reread the catalog rows right away; we might be
+ * in a failed transaction when we receive the SI notification. If so,
+ * RelationClearRelation just marks the entry as invalid by setting
+ * rd_isvalid to false. This routine is called to fix the entry when it
+ * is next needed.
+ *
+ * We assume that at the time we are called, we have at least AccessShareLock
+ * on the target index. (Note: in the calls from RelationClearRelation,
+ * this is legitimate because we know the rel has positive refcount.)
+ *
+ * If the target index is an index on pg_class or pg_index, we'd better have
+ * previously gotten at least AccessShareLock on its underlying catalog,
+ * else we are at risk of deadlock against someone trying to exclusive-lock
+ * the heap and index in that order. This is ensured in current usage by
+ * only applying this to indexes being opened or having positive refcount.
+ */
+static void
+RelationReloadIndexInfo(Relation relation)
+{
+ bool indexOK;
+ HeapTuple pg_class_tuple;
+ Form_pg_class relp;
+
+ /* Should be called only for invalidated, live indexes */
+ Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
+ relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
+ !relation->rd_isvalid &&
+ relation->rd_droppedSubid == InvalidSubTransactionId);
+
+ /* Ensure it's closed at smgr level */
+ RelationCloseSmgr(relation);
+
+ /* Must free any AM cached data upon relcache flush */
+ if (relation->rd_amcache)
+ pfree(relation->rd_amcache);
+ relation->rd_amcache = NULL;
+
+ /*
+ * If it's a shared index, we might be called before backend startup has
+ * finished selecting a database, in which case we have no way to read
+ * pg_class yet. However, a shared index can never have any significant
+ * schema updates, so it's okay to ignore the invalidation signal. Just
+ * mark it valid and return without doing anything more.
+ */
+ if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
+ {
+ relation->rd_isvalid = true;
+ return;
+ }
+
+ /*
+ * Read the pg_class row
+ *
+ * Don't try to use an indexscan of pg_class_oid_index to reload the info
+ * for pg_class_oid_index ...
+ */
+ indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
+ pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
+ if (!HeapTupleIsValid(pg_class_tuple))
+ elog(ERROR, "could not find pg_class tuple for index %u",
+ RelationGetRelid(relation));
+ relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+ memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
+ /* Reload reloptions in case they changed */
+ if (relation->rd_options)
+ pfree(relation->rd_options);
+ RelationParseRelOptions(relation, pg_class_tuple);
+ /* done with pg_class tuple */
+ heap_freetuple(pg_class_tuple);
+ /* We must recalculate physical address in case it changed */
+ RelationInitPhysicalAddr(relation);
+
+ /*
+ * For a non-system index, there are fields of the pg_index row that are
+ * allowed to change, so re-read that row and update the relcache entry.
+ * Most of the info derived from pg_index (such as support function lookup
+ * info) cannot change, and indeed the whole point of this routine is to
+ * update the relcache entry without clobbering that data; so wholesale
+ * replacement is not appropriate.
+ */
+ if (!IsSystemRelation(relation))
+ {
+ HeapTuple tuple;
+ Form_pg_index index;
+
+ tuple = SearchSysCache1(INDEXRELID,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for index %u",
+ RelationGetRelid(relation));
+ index = (Form_pg_index) GETSTRUCT(tuple);
+
+ /*
+ * Basically, let's just copy all the bool fields. There are one or
+ * two of these that can't actually change in the current code, but
+ * it's not worth it to track exactly which ones they are. None of
+ * the array fields are allowed to change, though.
+ */
+ relation->rd_index->indisunique = index->indisunique;
+ relation->rd_index->indnullsnotdistinct = index->indnullsnotdistinct;
+ relation->rd_index->indisprimary = index->indisprimary;
+ relation->rd_index->indisexclusion = index->indisexclusion;
+ relation->rd_index->indimmediate = index->indimmediate;
+ relation->rd_index->indisclustered = index->indisclustered;
+ relation->rd_index->indisvalid = index->indisvalid;
+ relation->rd_index->indcheckxmin = index->indcheckxmin;
+ relation->rd_index->indisready = index->indisready;
+ relation->rd_index->indislive = index->indislive;
+ relation->rd_index->indisreplident = index->indisreplident;
+
+ /* Copy xmin too, as that is needed to make sense of indcheckxmin */
+ HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
+ HeapTupleHeaderGetXmin(tuple->t_data));
+
+ ReleaseSysCache(tuple);
+ }
+
+ /* Okay, now it's valid again */
+ relation->rd_isvalid = true;
+}
+
+/*
+ * RelationReloadNailed - reload minimal information for nailed relations.
+ *
+ * The structure of a nailed relation can never change (which is good, because
+ * we rely on knowing their structure to be able to read catalog content). But
+ * some parts, e.g. pg_class.relfrozenxid, are still important to have
+ * accurate content for. Therefore those need to be reloaded after the arrival
+ * of invalidations.
+ */
+static void
+RelationReloadNailed(Relation relation)
+{
+ Assert(relation->rd_isnailed);
+
+ /*
+ * Redo RelationInitPhysicalAddr in case it is a mapped relation whose
+ * mapping changed.
+ */
+ RelationInitPhysicalAddr(relation);
+
+ /* flag as needing to be revalidated */
+ relation->rd_isvalid = false;
+
+ /*
+ * Can only reread catalog contents if in a transaction. If the relation
+ * is currently open (not counting the nailed refcount), do so
+ * immediately. Otherwise we've already marked the entry as possibly
+ * invalid, and it'll be fixed when next opened.
+ */
+ if (!IsTransactionState() || relation->rd_refcnt <= 1)
+ return;
+
+ if (relation->rd_rel->relkind == RELKIND_INDEX)
+ {
+ /*
+ * If it's a nailed-but-not-mapped index, then we need to re-read the
+ * pg_class row to see if its relfilenode changed.
+ */
+ RelationReloadIndexInfo(relation);
+ }
+ else
+ {
+ /*
+ * Reload a non-index entry. We can't easily do so if relcaches
+ * aren't yet built, but that's fine because at that stage the
+ * attributes that need to be current (like relfrozenxid) aren't yet
+ * accessed. To ensure the entry will later be revalidated, we leave
+ * it in invalid state, but allow use (cf. RelationIdGetRelation()).
+ */
+ if (criticalRelcachesBuilt)
+ {
+ HeapTuple pg_class_tuple;
+ Form_pg_class relp;
+
+ /*
+ * NB: Mark the entry as valid before starting to scan, to avoid
+ * self-recursion when re-building pg_class.
+ */
+ relation->rd_isvalid = true;
+
+ pg_class_tuple = ScanPgRelation(RelationGetRelid(relation),
+ true, false);
+ relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+ memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
+ heap_freetuple(pg_class_tuple);
+
+ /*
+ * Again mark as valid, to protect against concurrently arriving
+ * invalidations.
+ */
+ relation->rd_isvalid = true;
+ }
+ }
+}
+
+/*
+ * RelationDestroyRelation
+ *
+ * Physically delete a relation cache entry and all subsidiary data.
+ * Caller must already have unhooked the entry from the hash table.
+ */
+static void
+RelationDestroyRelation(Relation relation, bool remember_tupdesc)
+{
+ Assert(RelationHasReferenceCountZero(relation));
+
+ /*
+ * Make sure smgr and lower levels close the relation's files, if they
+ * weren't closed already. (This was probably done by caller, but let's
+ * just be real sure.)
+ */
+ RelationCloseSmgr(relation);
+
+ /* break mutual link with stats entry */
+ pgstat_unlink_relation(relation);
+
+ /*
+ * Free all the subsidiary data structures of the relcache entry, then the
+ * entry itself.
+ */
+ if (relation->rd_rel)
+ pfree(relation->rd_rel);
+ /* can't use DecrTupleDescRefCount here */
+ Assert(relation->rd_att->tdrefcount > 0);
+ if (--relation->rd_att->tdrefcount == 0)
+ {
+ /*
+ * If we Rebuilt a relcache entry during a transaction then its
+ * possible we did that because the TupDesc changed as the result of
+ * an ALTER TABLE that ran at less than AccessExclusiveLock. It's
+ * possible someone copied that TupDesc, in which case the copy would
+ * point to free'd memory. So if we rebuild an entry we keep the
+ * TupDesc around until end of transaction, to be safe.
+ */
+ if (remember_tupdesc)
+ RememberToFreeTupleDescAtEOX(relation->rd_att);
+ else
+ FreeTupleDesc(relation->rd_att);
+ }
+ FreeTriggerDesc(relation->trigdesc);
+ list_free_deep(relation->rd_fkeylist);
+ list_free(relation->rd_indexlist);
+ list_free(relation->rd_statlist);
+ bms_free(relation->rd_indexattr);
+ bms_free(relation->rd_keyattr);
+ bms_free(relation->rd_pkattr);
+ bms_free(relation->rd_idattr);
+ if (relation->rd_pubdesc)
+ pfree(relation->rd_pubdesc);
+ if (relation->rd_options)
+ pfree(relation->rd_options);
+ if (relation->rd_indextuple)
+ pfree(relation->rd_indextuple);
+ if (relation->rd_amcache)
+ pfree(relation->rd_amcache);
+ if (relation->rd_fdwroutine)
+ pfree(relation->rd_fdwroutine);
+ if (relation->rd_indexcxt)
+ MemoryContextDelete(relation->rd_indexcxt);
+ if (relation->rd_rulescxt)
+ MemoryContextDelete(relation->rd_rulescxt);
+ if (relation->rd_rsdesc)
+ MemoryContextDelete(relation->rd_rsdesc->rscxt);
+ if (relation->rd_partkeycxt)
+ MemoryContextDelete(relation->rd_partkeycxt);
+ if (relation->rd_pdcxt)
+ MemoryContextDelete(relation->rd_pdcxt);
+ if (relation->rd_pddcxt)
+ MemoryContextDelete(relation->rd_pddcxt);
+ if (relation->rd_partcheckcxt)
+ MemoryContextDelete(relation->rd_partcheckcxt);
+ pfree(relation);
+}
+
+/*
+ * RelationClearRelation
+ *
+ * Physically blow away a relation cache entry, or reset it and rebuild
+ * it from scratch (that is, from catalog entries). The latter path is
+ * used when we are notified of a change to an open relation (one with
+ * refcount > 0).
+ *
+ * NB: when rebuilding, we'd better hold some lock on the relation,
+ * else the catalog data we need to read could be changing under us.
+ * Also, a rel to be rebuilt had better have refcnt > 0. This is because
+ * a sinval reset could happen while we're accessing the catalogs, and
+ * the rel would get blown away underneath us by RelationCacheInvalidate
+ * if it has zero refcnt.
+ *
+ * The "rebuild" parameter is redundant in current usage because it has
+ * to match the relation's refcnt status, but we keep it as a crosscheck
+ * that we're doing what the caller expects.
+ */
+static void
+RelationClearRelation(Relation relation, bool rebuild)
+{
+ /*
+ * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
+ * course it would be an equally bad idea to blow away one with nonzero
+ * refcnt, since that would leave someone somewhere with a dangling
+ * pointer. All callers are expected to have verified that this holds.
+ */
+ Assert(rebuild ?
+ !RelationHasReferenceCountZero(relation) :
+ RelationHasReferenceCountZero(relation));
+
+ /*
+ * Make sure smgr and lower levels close the relation's files, if they
+ * weren't closed already. If the relation is not getting deleted, the
+ * next smgr access should reopen the files automatically. This ensures
+ * that the low-level file access state is updated after, say, a vacuum
+ * truncation.
+ */
+ RelationCloseSmgr(relation);
+
+ /* Free AM cached data, if any */
+ if (relation->rd_amcache)
+ pfree(relation->rd_amcache);
+ relation->rd_amcache = NULL;
+
+ /*
+ * Treat nailed-in system relations separately, they always need to be
+ * accessible, so we can't blow them away.
+ */
+ if (relation->rd_isnailed)
+ {
+ RelationReloadNailed(relation);
+ return;
+ }
+
+ /* Mark it invalid until we've finished rebuild */
+ relation->rd_isvalid = false;
+
+ /* See RelationForgetRelation(). */
+ if (relation->rd_droppedSubid != InvalidSubTransactionId)
+ return;
+
+ /*
+ * Even non-system indexes should not be blown away if they are open and
+ * have valid index support information. This avoids problems with active
+ * use of the index support information. As with nailed indexes, we
+ * re-read the pg_class row to handle possible physical relocation of the
+ * index, and we check for pg_index updates too.
+ */
+ if ((relation->rd_rel->relkind == RELKIND_INDEX ||
+ relation->rd_rel->relkind == RELKIND_PARTITIONED_INDEX) &&
+ relation->rd_refcnt > 0 &&
+ relation->rd_indexcxt != NULL)
+ {
+ if (IsTransactionState())
+ RelationReloadIndexInfo(relation);
+ return;
+ }
+
+ /*
+ * If we're really done with the relcache entry, blow it away. But if
+ * someone is still using it, reconstruct the whole deal without moving
+ * the physical RelationData record (so that the someone's pointer is
+ * still valid).
+ */
+ if (!rebuild)
+ {
+ /* Remove it from the hash table */
+ RelationCacheDelete(relation);
+
+ /* And release storage */
+ RelationDestroyRelation(relation, false);
+ }
+ else if (!IsTransactionState())
+ {
+ /*
+ * If we're not inside a valid transaction, we can't do any catalog
+ * access so it's not possible to rebuild yet. Just exit, leaving
+ * rd_isvalid = false so that the rebuild will occur when the entry is
+ * next opened.
+ *
+ * Note: it's possible that we come here during subtransaction abort,
+ * and the reason for wanting to rebuild is that the rel is open in
+ * the outer transaction. In that case it might seem unsafe to not
+ * rebuild immediately, since whatever code has the rel already open
+ * will keep on using the relcache entry as-is. However, in such a
+ * case the outer transaction should be holding a lock that's
+ * sufficient to prevent any significant change in the rel's schema,
+ * so the existing entry contents should be good enough for its
+ * purposes; at worst we might be behind on statistics updates or the
+ * like. (See also CheckTableNotInUse() and its callers.) These same
+ * remarks also apply to the cases above where we exit without having
+ * done RelationReloadIndexInfo() yet.
+ */
+ return;
+ }
+ else
+ {
+ /*
+ * Our strategy for rebuilding an open relcache entry is to build a
+ * new entry from scratch, swap its contents with the old entry, and
+ * finally delete the new entry (along with any infrastructure swapped
+ * over from the old entry). This is to avoid trouble in case an
+ * error causes us to lose control partway through. The old entry
+ * will still be marked !rd_isvalid, so we'll try to rebuild it again
+ * on next access. Meanwhile it's not any less valid than it was
+ * before, so any code that might expect to continue accessing it
+ * isn't hurt by the rebuild failure. (Consider for example a
+ * subtransaction that ALTERs a table and then gets canceled partway
+ * through the cache entry rebuild. The outer transaction should
+ * still see the not-modified cache entry as valid.) The worst
+ * consequence of an error is leaking the necessarily-unreferenced new
+ * entry, and this shouldn't happen often enough for that to be a big
+ * problem.
+ *
+ * When rebuilding an open relcache entry, we must preserve ref count,
+ * rd_*Subid, and rd_toastoid state. Also attempt to preserve the
+ * pg_class entry (rd_rel), tupledesc, rewrite-rule, partition key,
+ * and partition descriptor substructures in place, because various
+ * places assume that these structures won't move while they are
+ * working with an open relcache entry. (Note: the refcount
+ * mechanism for tupledescs might someday allow us to remove this hack
+ * for the tupledesc.)
+ *
+ * Note that this process does not touch CurrentResourceOwner; which
+ * is good because whatever ref counts the entry may have do not
+ * necessarily belong to that resource owner.
+ */
+ Relation newrel;
+ Oid save_relid = RelationGetRelid(relation);
+ bool keep_tupdesc;
+ bool keep_rules;
+ bool keep_policies;
+ bool keep_partkey;
+ bool keep_pgstats;
+
+ /* Build temporary entry, but don't link it into hashtable */
+ newrel = RelationBuildDesc(save_relid, false);
+
+ /*
+ * Between here and the end of the swap, don't add code that does or
+ * reasonably could read system catalogs. That range must be free
+ * from invalidation processing. See RelationBuildDesc() manipulation
+ * of in_progress_list.
+ */
+
+ if (newrel == NULL)
+ {
+ /*
+ * We can validly get here, if we're using a historic snapshot in
+ * which a relation, accessed from outside logical decoding, is
+ * still invisible. In that case it's fine to just mark the
+ * relation as invalid and return - it'll fully get reloaded by
+ * the cache reset at the end of logical decoding (or at the next
+ * access). During normal processing we don't want to ignore this
+ * case as it shouldn't happen there, as explained below.
+ */
+ if (HistoricSnapshotActive())
+ return;
+
+ /*
+ * This shouldn't happen as dropping a relation is intended to be
+ * impossible if still referenced (cf. CheckTableNotInUse()). But
+ * if we get here anyway, we can't just delete the relcache entry,
+ * as it possibly could get accessed later (as e.g. the error
+ * might get trapped and handled via a subtransaction rollback).
+ */
+ elog(ERROR, "relation %u deleted while still in use", save_relid);
+ }
+
+ keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
+ keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
+ keep_policies = equalRSDesc(relation->rd_rsdesc, newrel->rd_rsdesc);
+ /* partkey is immutable once set up, so we can always keep it */
+ keep_partkey = (relation->rd_partkey != NULL);
+
+ /*
+ * Keep stats pointers, except when the relkind changes (e.g. when
+ * converting tables into views). Different kinds of relations might
+ * have different types of stats.
+ *
+ * If we don't want to keep the stats, unlink the stats and relcache
+ * entry (and do so before entering the "critical section"
+ * below). This is important because otherwise
+ * PgStat_TableStatus->relation would get out of sync with
+ * relation->pgstat_info.
+ */
+ keep_pgstats = relation->rd_rel->relkind == newrel->rd_rel->relkind;
+ if (!keep_pgstats)
+ pgstat_unlink_relation(relation);
+
+ /*
+ * Perform swapping of the relcache entry contents. Within this
+ * process the old entry is momentarily invalid, so there *must* be no
+ * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
+ * all-in-line code for safety.
+ *
+ * Since the vast majority of fields should be swapped, our method is
+ * to swap the whole structures and then re-swap those few fields we
+ * didn't want swapped.
+ */
+#define SWAPFIELD(fldtype, fldname) \
+ do { \
+ fldtype _tmp = newrel->fldname; \
+ newrel->fldname = relation->fldname; \
+ relation->fldname = _tmp; \
+ } while (0)
+
+ /* swap all Relation struct fields */
+ {
+ RelationData tmpstruct;
+
+ memcpy(&tmpstruct, newrel, sizeof(RelationData));
+ memcpy(newrel, relation, sizeof(RelationData));
+ memcpy(relation, &tmpstruct, sizeof(RelationData));
+ }
+
+ /* rd_smgr must not be swapped, due to back-links from smgr level */
+ SWAPFIELD(SMgrRelation, rd_smgr);
+ /* rd_refcnt must be preserved */
+ SWAPFIELD(int, rd_refcnt);
+ /* isnailed shouldn't change */
+ Assert(newrel->rd_isnailed == relation->rd_isnailed);
+ /* creation sub-XIDs must be preserved */
+ SWAPFIELD(SubTransactionId, rd_createSubid);
+ SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
+ SWAPFIELD(SubTransactionId, rd_firstRelfilenodeSubid);
+ SWAPFIELD(SubTransactionId, rd_droppedSubid);
+ /* un-swap rd_rel pointers, swap contents instead */
+ SWAPFIELD(Form_pg_class, rd_rel);
+ /* ... but actually, we don't have to update newrel->rd_rel */
+ memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
+ /* preserve old tupledesc, rules, policies if no logical change */
+ if (keep_tupdesc)
+ SWAPFIELD(TupleDesc, rd_att);
+ if (keep_rules)
+ {
+ SWAPFIELD(RuleLock *, rd_rules);
+ SWAPFIELD(MemoryContext, rd_rulescxt);
+ }
+ if (keep_policies)
+ SWAPFIELD(RowSecurityDesc *, rd_rsdesc);
+ /* toast OID override must be preserved */
+ SWAPFIELD(Oid, rd_toastoid);
+
+ /* pgstat_info / enabled must be preserved */
+ if (keep_pgstats)
+ {
+ SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
+ SWAPFIELD(bool, pgstat_enabled);
+ }
+
+ /* preserve old partition key if we have one */
+ if (keep_partkey)
+ {
+ SWAPFIELD(PartitionKey, rd_partkey);
+ SWAPFIELD(MemoryContext, rd_partkeycxt);
+ }
+ if (newrel->rd_pdcxt != NULL || newrel->rd_pddcxt != NULL)
+ {
+ /*
+ * We are rebuilding a partitioned relation with a non-zero
+ * reference count, so we must keep the old partition descriptor
+ * around, in case there's a PartitionDirectory with a pointer to
+ * it. This means we can't free the old rd_pdcxt yet. (This is
+ * necessary because RelationGetPartitionDesc hands out direct
+ * pointers to the relcache's data structure, unlike our usual
+ * practice which is to hand out copies. We'd have the same
+ * problem with rd_partkey, except that we always preserve that
+ * once created.)
+ *
+ * To ensure that it's not leaked completely, re-attach it to the
+ * new reldesc, or make it a child of the new reldesc's rd_pdcxt
+ * in the unlikely event that there is one already. (Compare hack
+ * in RelationBuildPartitionDesc.) RelationClose will clean up
+ * any such contexts once the reference count reaches zero.
+ *
+ * In the case where the reference count is zero, this code is not
+ * reached, which should be OK because in that case there should
+ * be no PartitionDirectory with a pointer to the old entry.
+ *
+ * Note that newrel and relation have already been swapped, so the
+ * "old" partition descriptor is actually the one hanging off of
+ * newrel.
+ */
+ relation->rd_partdesc = NULL; /* ensure rd_partdesc is invalid */
+ relation->rd_partdesc_nodetached = NULL;
+ relation->rd_partdesc_nodetached_xmin = InvalidTransactionId;
+ if (relation->rd_pdcxt != NULL) /* probably never happens */
+ MemoryContextSetParent(newrel->rd_pdcxt, relation->rd_pdcxt);
+ else
+ relation->rd_pdcxt = newrel->rd_pdcxt;
+ if (relation->rd_pddcxt != NULL)
+ MemoryContextSetParent(newrel->rd_pddcxt, relation->rd_pddcxt);
+ else
+ relation->rd_pddcxt = newrel->rd_pddcxt;
+ /* drop newrel's pointers so we don't destroy it below */
+ newrel->rd_partdesc = NULL;
+ newrel->rd_partdesc_nodetached = NULL;
+ newrel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
+ newrel->rd_pdcxt = NULL;
+ newrel->rd_pddcxt = NULL;
+ }
+
+#undef SWAPFIELD
+
+ /* And now we can throw away the temporary entry */
+ RelationDestroyRelation(newrel, !keep_tupdesc);
+ }
+}
+
+/*
+ * RelationFlushRelation
+ *
+ * Rebuild the relation if it is open (refcount > 0), else blow it away.
+ * This is used when we receive a cache invalidation event for the rel.
+ */
+static void
+RelationFlushRelation(Relation relation)
+{
+ if (relation->rd_createSubid != InvalidSubTransactionId ||
+ relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
+ {
+ /*
+ * New relcache entries are always rebuilt, not flushed; else we'd
+ * forget the "new" status of the relation. Ditto for the
+ * new-relfilenode status.
+ *
+ * The rel could have zero refcnt here, so temporarily increment the
+ * refcnt to ensure it's safe to rebuild it. We can assume that the
+ * current transaction has some lock on the rel already.
+ */
+ RelationIncrementReferenceCount(relation);
+ RelationClearRelation(relation, true);
+ RelationDecrementReferenceCount(relation);
+ }
+ else
+ {
+ /*
+ * Pre-existing rels can be dropped from the relcache if not open.
+ */
+ bool rebuild = !RelationHasReferenceCountZero(relation);
+
+ RelationClearRelation(relation, rebuild);
+ }
+}
+
+/*
+ * RelationForgetRelation - caller reports that it dropped the relation
+ */
+void
+RelationForgetRelation(Oid rid)
+{
+ Relation relation;
+
+ RelationIdCacheLookup(rid, relation);
+
+ if (!PointerIsValid(relation))
+ return; /* not in cache, nothing to do */
+
+ if (!RelationHasReferenceCountZero(relation))
+ elog(ERROR, "relation %u is still open", rid);
+
+ Assert(relation->rd_droppedSubid == InvalidSubTransactionId);
+ if (relation->rd_createSubid != InvalidSubTransactionId ||
+ relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
+ {
+ /*
+ * In the event of subtransaction rollback, we must not forget
+ * rd_*Subid. Mark the entry "dropped" so RelationClearRelation()
+ * invalidates it in lieu of destroying it. (If we're in a top
+ * transaction, we could opt to destroy the entry.)
+ */
+ relation->rd_droppedSubid = GetCurrentSubTransactionId();
+ }
+
+ RelationClearRelation(relation, false);
+}
+
+/*
+ * RelationCacheInvalidateEntry
+ *
+ * This routine is invoked for SI cache flush messages.
+ *
+ * Any relcache entry matching the relid must be flushed. (Note: caller has
+ * already determined that the relid belongs to our database or is a shared
+ * relation.)
+ *
+ * We used to skip local relations, on the grounds that they could
+ * not be targets of cross-backend SI update messages; but it seems
+ * safer to process them, so that our *own* SI update messages will
+ * have the same effects during CommandCounterIncrement for both
+ * local and nonlocal relations.
+ */
+void
+RelationCacheInvalidateEntry(Oid relationId)
+{
+ Relation relation;
+
+ RelationIdCacheLookup(relationId, relation);
+
+ if (PointerIsValid(relation))
+ {
+ relcacheInvalsReceived++;
+ RelationFlushRelation(relation);
+ }
+ else
+ {
+ int i;
+
+ for (i = 0; i < in_progress_list_len; i++)
+ if (in_progress_list[i].reloid == relationId)
+ in_progress_list[i].invalidated = true;
+ }
+}
+
+/*
+ * RelationCacheInvalidate
+ * Blow away cached relation descriptors that have zero reference counts,
+ * and rebuild those with positive reference counts. Also reset the smgr
+ * relation cache and re-read relation mapping data.
+ *
+ * Apart from debug_discard_caches, this is currently used only to recover
+ * from SI message buffer overflow, so we do not touch relations having
+ * new-in-transaction relfilenodes; they cannot be targets of cross-backend
+ * SI updates (and our own updates now go through a separate linked list
+ * that isn't limited by the SI message buffer size).
+ *
+ * We do this in two phases: the first pass deletes deletable items, and
+ * the second one rebuilds the rebuildable items. This is essential for
+ * safety, because hash_seq_search only copes with concurrent deletion of
+ * the element it is currently visiting. If a second SI overflow were to
+ * occur while we are walking the table, resulting in recursive entry to
+ * this routine, we could crash because the inner invocation blows away
+ * the entry next to be visited by the outer scan. But this way is OK,
+ * because (a) during the first pass we won't process any more SI messages,
+ * so hash_seq_search will complete safely; (b) during the second pass we
+ * only hold onto pointers to nondeletable entries.
+ *
+ * The two-phase approach also makes it easy to update relfilenodes for
+ * mapped relations before we do anything else, and to ensure that the
+ * second pass processes nailed-in-cache items before other nondeletable
+ * items. This should ensure that system catalogs are up to date before
+ * we attempt to use them to reload information about other open relations.
+ *
+ * After those two phases of work having immediate effects, we normally
+ * signal any RelationBuildDesc() on the stack to start over. However, we
+ * don't do this if called as part of debug_discard_caches. Otherwise,
+ * RelationBuildDesc() would become an infinite loop.
+ */
+void
+RelationCacheInvalidate(bool debug_discard)
+{
+ HASH_SEQ_STATUS status;
+ RelIdCacheEnt *idhentry;
+ Relation relation;
+ List *rebuildFirstList = NIL;
+ List *rebuildList = NIL;
+ ListCell *l;
+ int i;
+
+ /*
+ * Reload relation mapping data before starting to reconstruct cache.
+ */
+ RelationMapInvalidateAll();
+
+ /* Phase 1 */
+ hash_seq_init(&status, RelationIdCache);
+
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ {
+ relation = idhentry->reldesc;
+
+ /* Must close all smgr references to avoid leaving dangling ptrs */
+ RelationCloseSmgr(relation);
+
+ /*
+ * Ignore new relations; no other backend will manipulate them before
+ * we commit. Likewise, before replacing a relation's relfilenode, we
+ * shall have acquired AccessExclusiveLock and drained any applicable
+ * pending invalidations.
+ */
+ if (relation->rd_createSubid != InvalidSubTransactionId ||
+ relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId)
+ continue;
+
+ relcacheInvalsReceived++;
+
+ if (RelationHasReferenceCountZero(relation))
+ {
+ /* Delete this entry immediately */
+ Assert(!relation->rd_isnailed);
+ RelationClearRelation(relation, false);
+ }
+ else
+ {
+ /*
+ * If it's a mapped relation, immediately update its rd_node in
+ * case its relfilenode changed. We must do this during phase 1
+ * in case the relation is consulted during rebuild of other
+ * relcache entries in phase 2. It's safe since consulting the
+ * map doesn't involve any access to relcache entries.
+ */
+ if (RelationIsMapped(relation))
+ RelationInitPhysicalAddr(relation);
+
+ /*
+ * Add this entry to list of stuff to rebuild in second pass.
+ * pg_class goes to the front of rebuildFirstList while
+ * pg_class_oid_index goes to the back of rebuildFirstList, so
+ * they are done first and second respectively. Other nailed
+ * relations go to the front of rebuildList, so they'll be done
+ * next in no particular order; and everything else goes to the
+ * back of rebuildList.
+ */
+ if (RelationGetRelid(relation) == RelationRelationId)
+ rebuildFirstList = lcons(relation, rebuildFirstList);
+ else if (RelationGetRelid(relation) == ClassOidIndexId)
+ rebuildFirstList = lappend(rebuildFirstList, relation);
+ else if (relation->rd_isnailed)
+ rebuildList = lcons(relation, rebuildList);
+ else
+ rebuildList = lappend(rebuildList, relation);
+ }
+ }
+
+ /*
+ * Now zap any remaining smgr cache entries. This must happen before we
+ * start to rebuild entries, since that may involve catalog fetches which
+ * will re-open catalog files.
+ */
+ smgrcloseall();
+
+ /* Phase 2: rebuild the items found to need rebuild in phase 1 */
+ foreach(l, rebuildFirstList)
+ {
+ relation = (Relation) lfirst(l);
+ RelationClearRelation(relation, true);
+ }
+ list_free(rebuildFirstList);
+ foreach(l, rebuildList)
+ {
+ relation = (Relation) lfirst(l);
+ RelationClearRelation(relation, true);
+ }
+ list_free(rebuildList);
+
+ if (!debug_discard)
+ /* Any RelationBuildDesc() on the stack must start over. */
+ for (i = 0; i < in_progress_list_len; i++)
+ in_progress_list[i].invalidated = true;
+}
+
+/*
+ * RelationCloseSmgrByOid - close a relcache entry's smgr link
+ *
+ * Needed in some cases where we are changing a relation's physical mapping.
+ * The link will be automatically reopened on next use.
+ */
+void
+RelationCloseSmgrByOid(Oid relationId)
+{
+ Relation relation;
+
+ RelationIdCacheLookup(relationId, relation);
+
+ if (!PointerIsValid(relation))
+ return; /* not in cache, nothing to do */
+
+ RelationCloseSmgr(relation);
+}
+
+static void
+RememberToFreeTupleDescAtEOX(TupleDesc td)
+{
+ if (EOXactTupleDescArray == NULL)
+ {
+ MemoryContext oldcxt;
+
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ EOXactTupleDescArray = (TupleDesc *) palloc(16 * sizeof(TupleDesc));
+ EOXactTupleDescArrayLen = 16;
+ NextEOXactTupleDescNum = 0;
+ MemoryContextSwitchTo(oldcxt);
+ }
+ else if (NextEOXactTupleDescNum >= EOXactTupleDescArrayLen)
+ {
+ int32 newlen = EOXactTupleDescArrayLen * 2;
+
+ Assert(EOXactTupleDescArrayLen > 0);
+
+ EOXactTupleDescArray = (TupleDesc *) repalloc(EOXactTupleDescArray,
+ newlen * sizeof(TupleDesc));
+ EOXactTupleDescArrayLen = newlen;
+ }
+
+ EOXactTupleDescArray[NextEOXactTupleDescNum++] = td;
+}
+
+#ifdef USE_ASSERT_CHECKING
+static void
+AssertPendingSyncConsistency(Relation relation)
+{
+ bool relcache_verdict =
+ RelationIsPermanent(relation) &&
+ ((relation->rd_createSubid != InvalidSubTransactionId &&
+ RELKIND_HAS_STORAGE(relation->rd_rel->relkind)) ||
+ relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId);
+
+ Assert(relcache_verdict == RelFileNodeSkippingWAL(relation->rd_node));
+
+ if (relation->rd_droppedSubid != InvalidSubTransactionId)
+ Assert(!relation->rd_isvalid &&
+ (relation->rd_createSubid != InvalidSubTransactionId ||
+ relation->rd_firstRelfilenodeSubid != InvalidSubTransactionId));
+}
+
+/*
+ * AssertPendingSyncs_RelationCache
+ *
+ * Assert that relcache.c and storage.c agree on whether to skip WAL.
+ */
+void
+AssertPendingSyncs_RelationCache(void)
+{
+ HASH_SEQ_STATUS status;
+ LOCALLOCK *locallock;
+ Relation *rels;
+ int maxrels;
+ int nrels;
+ RelIdCacheEnt *idhentry;
+ int i;
+
+ /*
+ * Open every relation that this transaction has locked. If, for some
+ * relation, storage.c is skipping WAL and relcache.c is not skipping WAL,
+ * a CommandCounterIncrement() typically yields a local invalidation
+ * message that destroys the relcache entry. By recreating such entries
+ * here, we detect the problem.
+ */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ maxrels = 1;
+ rels = palloc(maxrels * sizeof(*rels));
+ nrels = 0;
+ hash_seq_init(&status, GetLockMethodLocalHash());
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ Oid relid;
+ Relation r;
+
+ if (locallock->nLocks <= 0)
+ continue;
+ if ((LockTagType) locallock->tag.lock.locktag_type !=
+ LOCKTAG_RELATION)
+ continue;
+ relid = ObjectIdGetDatum(locallock->tag.lock.locktag_field2);
+ r = RelationIdGetRelation(relid);
+ if (!RelationIsValid(r))
+ continue;
+ if (nrels >= maxrels)
+ {
+ maxrels *= 2;
+ rels = repalloc(rels, maxrels * sizeof(*rels));
+ }
+ rels[nrels++] = r;
+ }
+
+ hash_seq_init(&status, RelationIdCache);
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ AssertPendingSyncConsistency(idhentry->reldesc);
+
+ for (i = 0; i < nrels; i++)
+ RelationClose(rels[i]);
+ PopActiveSnapshot();
+}
+#endif
+
+/*
+ * AtEOXact_RelationCache
+ *
+ * Clean up the relcache at main-transaction commit or abort.
+ *
+ * Note: this must be called *before* processing invalidation messages.
+ * In the case of abort, we don't want to try to rebuild any invalidated
+ * cache entries (since we can't safely do database accesses). Therefore
+ * we must reset refcnts before handling pending invalidations.
+ *
+ * As of PostgreSQL 8.1, relcache refcnts should get released by the
+ * ResourceOwner mechanism. This routine just does a debugging
+ * cross-check that no pins remain. However, we also need to do special
+ * cleanup when the current transaction created any relations or made use
+ * of forced index lists.
+ */
+void
+AtEOXact_RelationCache(bool isCommit)
+{
+ HASH_SEQ_STATUS status;
+ RelIdCacheEnt *idhentry;
+ int i;
+
+ /*
+ * Forget in_progress_list. This is relevant when we're aborting due to
+ * an error during RelationBuildDesc().
+ */
+ Assert(in_progress_list_len == 0 || !isCommit);
+ in_progress_list_len = 0;
+
+ /*
+ * Unless the eoxact_list[] overflowed, we only need to examine the rels
+ * listed in it. Otherwise fall back on a hash_seq_search scan.
+ *
+ * For simplicity, eoxact_list[] entries are not deleted till end of
+ * top-level transaction, even though we could remove them at
+ * subtransaction end in some cases, or remove relations from the list if
+ * they are cleared for other reasons. Therefore we should expect the
+ * case that list entries are not found in the hashtable; if not, there's
+ * nothing to do for them.
+ */
+ if (eoxact_list_overflowed)
+ {
+ hash_seq_init(&status, RelationIdCache);
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ {
+ AtEOXact_cleanup(idhentry->reldesc, isCommit);
+ }
+ }
+ else
+ {
+ for (i = 0; i < eoxact_list_len; i++)
+ {
+ idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
+ (void *) &eoxact_list[i],
+ HASH_FIND,
+ NULL);
+ if (idhentry != NULL)
+ AtEOXact_cleanup(idhentry->reldesc, isCommit);
+ }
+ }
+
+ if (EOXactTupleDescArrayLen > 0)
+ {
+ Assert(EOXactTupleDescArray != NULL);
+ for (i = 0; i < NextEOXactTupleDescNum; i++)
+ FreeTupleDesc(EOXactTupleDescArray[i]);
+ pfree(EOXactTupleDescArray);
+ EOXactTupleDescArray = NULL;
+ }
+
+ /* Now we're out of the transaction and can clear the lists */
+ eoxact_list_len = 0;
+ eoxact_list_overflowed = false;
+ NextEOXactTupleDescNum = 0;
+ EOXactTupleDescArrayLen = 0;
+}
+
+/*
+ * AtEOXact_cleanup
+ *
+ * Clean up a single rel at main-transaction commit or abort
+ *
+ * NB: this processing must be idempotent, because EOXactListAdd() doesn't
+ * bother to prevent duplicate entries in eoxact_list[].
+ */
+static void
+AtEOXact_cleanup(Relation relation, bool isCommit)
+{
+ bool clear_relcache = false;
+
+ /*
+ * The relcache entry's ref count should be back to its normal
+ * not-in-a-transaction state: 0 unless it's nailed in cache.
+ *
+ * In bootstrap mode, this is NOT true, so don't check it --- the
+ * bootstrap code expects relations to stay open across start/commit
+ * transaction calls. (That seems bogus, but it's not worth fixing.)
+ *
+ * Note: ideally this check would be applied to every relcache entry, not
+ * just those that have eoxact work to do. But it's not worth forcing a
+ * scan of the whole relcache just for this. (Moreover, doing so would
+ * mean that assert-enabled testing never tests the hash_search code path
+ * above, which seems a bad idea.)
+ */
+#ifdef USE_ASSERT_CHECKING
+ if (!IsBootstrapProcessingMode())
+ {
+ int expected_refcnt;
+
+ expected_refcnt = relation->rd_isnailed ? 1 : 0;
+ Assert(relation->rd_refcnt == expected_refcnt);
+ }
+#endif
+
+ /*
+ * Is the relation live after this transaction ends?
+ *
+ * During commit, clear the relcache entry if it is preserved after
+ * relation drop, in order not to orphan the entry. During rollback,
+ * clear the relcache entry if the relation is created in the current
+ * transaction since it isn't interesting any longer once we are out of
+ * the transaction.
+ */
+ clear_relcache =
+ (isCommit ?
+ relation->rd_droppedSubid != InvalidSubTransactionId :
+ relation->rd_createSubid != InvalidSubTransactionId);
+
+ /*
+ * Since we are now out of the transaction, reset the subids to zero. That
+ * also lets RelationClearRelation() drop the relcache entry.
+ */
+ relation->rd_createSubid = InvalidSubTransactionId;
+ relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_droppedSubid = InvalidSubTransactionId;
+
+ if (clear_relcache)
+ {
+ if (RelationHasReferenceCountZero(relation))
+ {
+ RelationClearRelation(relation, false);
+ return;
+ }
+ else
+ {
+ /*
+ * Hmm, somewhere there's a (leaked?) reference to the relation.
+ * We daren't remove the entry for fear of dereferencing a
+ * dangling pointer later. Bleat, and mark it as not belonging to
+ * the current transaction. Hopefully it'll get cleaned up
+ * eventually. This must be just a WARNING to avoid
+ * error-during-error-recovery loops.
+ */
+ elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
+ RelationGetRelationName(relation));
+ }
+ }
+}
+
+/*
+ * AtEOSubXact_RelationCache
+ *
+ * Clean up the relcache at sub-transaction commit or abort.
+ *
+ * Note: this must be called *before* processing invalidation messages.
+ */
+void
+AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
+ SubTransactionId parentSubid)
+{
+ HASH_SEQ_STATUS status;
+ RelIdCacheEnt *idhentry;
+ int i;
+
+ /*
+ * Forget in_progress_list. This is relevant when we're aborting due to
+ * an error during RelationBuildDesc(). We don't commit subtransactions
+ * during RelationBuildDesc().
+ */
+ Assert(in_progress_list_len == 0 || !isCommit);
+ in_progress_list_len = 0;
+
+ /*
+ * Unless the eoxact_list[] overflowed, we only need to examine the rels
+ * listed in it. Otherwise fall back on a hash_seq_search scan. Same
+ * logic as in AtEOXact_RelationCache.
+ */
+ if (eoxact_list_overflowed)
+ {
+ hash_seq_init(&status, RelationIdCache);
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ {
+ AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
+ mySubid, parentSubid);
+ }
+ }
+ else
+ {
+ for (i = 0; i < eoxact_list_len; i++)
+ {
+ idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache,
+ (void *) &eoxact_list[i],
+ HASH_FIND,
+ NULL);
+ if (idhentry != NULL)
+ AtEOSubXact_cleanup(idhentry->reldesc, isCommit,
+ mySubid, parentSubid);
+ }
+ }
+
+ /* Don't reset the list; we still need more cleanup later */
+}
+
+/*
+ * AtEOSubXact_cleanup
+ *
+ * Clean up a single rel at subtransaction commit or abort
+ *
+ * NB: this processing must be idempotent, because EOXactListAdd() doesn't
+ * bother to prevent duplicate entries in eoxact_list[].
+ */
+static void
+AtEOSubXact_cleanup(Relation relation, bool isCommit,
+ SubTransactionId mySubid, SubTransactionId parentSubid)
+{
+ /*
+ * Is it a relation created in the current subtransaction?
+ *
+ * During subcommit, mark it as belonging to the parent, instead, as long
+ * as it has not been dropped. Otherwise simply delete the relcache entry.
+ * --- it isn't interesting any longer.
+ */
+ if (relation->rd_createSubid == mySubid)
+ {
+ /*
+ * Valid rd_droppedSubid means the corresponding relation is dropped
+ * but the relcache entry is preserved for at-commit pending sync. We
+ * need to drop it explicitly here not to make the entry orphan.
+ */
+ Assert(relation->rd_droppedSubid == mySubid ||
+ relation->rd_droppedSubid == InvalidSubTransactionId);
+ if (isCommit && relation->rd_droppedSubid == InvalidSubTransactionId)
+ relation->rd_createSubid = parentSubid;
+ else if (RelationHasReferenceCountZero(relation))
+ {
+ /* allow the entry to be removed */
+ relation->rd_createSubid = InvalidSubTransactionId;
+ relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ relation->rd_droppedSubid = InvalidSubTransactionId;
+ RelationClearRelation(relation, false);
+ return;
+ }
+ else
+ {
+ /*
+ * Hmm, somewhere there's a (leaked?) reference to the relation.
+ * We daren't remove the entry for fear of dereferencing a
+ * dangling pointer later. Bleat, and transfer it to the parent
+ * subtransaction so we can try again later. This must be just a
+ * WARNING to avoid error-during-error-recovery loops.
+ */
+ relation->rd_createSubid = parentSubid;
+ elog(WARNING, "cannot remove relcache entry for \"%s\" because it has nonzero refcount",
+ RelationGetRelationName(relation));
+ }
+ }
+
+ /*
+ * Likewise, update or drop any new-relfilenode-in-subtransaction record
+ * or drop record.
+ */
+ if (relation->rd_newRelfilenodeSubid == mySubid)
+ {
+ if (isCommit)
+ relation->rd_newRelfilenodeSubid = parentSubid;
+ else
+ relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ }
+
+ if (relation->rd_firstRelfilenodeSubid == mySubid)
+ {
+ if (isCommit)
+ relation->rd_firstRelfilenodeSubid = parentSubid;
+ else
+ relation->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ }
+
+ if (relation->rd_droppedSubid == mySubid)
+ {
+ if (isCommit)
+ relation->rd_droppedSubid = parentSubid;
+ else
+ relation->rd_droppedSubid = InvalidSubTransactionId;
+ }
+}
+
+
+/*
+ * RelationBuildLocalRelation
+ * Build a relcache entry for an about-to-be-created relation,
+ * and enter it into the relcache.
+ */
+Relation
+RelationBuildLocalRelation(const char *relname,
+ Oid relnamespace,
+ TupleDesc tupDesc,
+ Oid relid,
+ Oid accessmtd,
+ Oid relfilenode,
+ Oid reltablespace,
+ bool shared_relation,
+ bool mapped_relation,
+ char relpersistence,
+ char relkind)
+{
+ Relation rel;
+ MemoryContext oldcxt;
+ int natts = tupDesc->natts;
+ int i;
+ bool has_not_null;
+ bool nailit;
+
+ AssertArg(natts >= 0);
+
+ /*
+ * check for creation of a rel that must be nailed in cache.
+ *
+ * XXX this list had better match the relations specially handled in
+ * RelationCacheInitializePhase2/3.
+ */
+ switch (relid)
+ {
+ case DatabaseRelationId:
+ case AuthIdRelationId:
+ case AuthMemRelationId:
+ case RelationRelationId:
+ case AttributeRelationId:
+ case ProcedureRelationId:
+ case TypeRelationId:
+ nailit = true;
+ break;
+ default:
+ nailit = false;
+ break;
+ }
+
+ /*
+ * check that hardwired list of shared rels matches what's in the
+ * bootstrap .bki file. If you get a failure here during initdb, you
+ * probably need to fix IsSharedRelation() to match whatever you've done
+ * to the set of shared relations.
+ */
+ if (shared_relation != IsSharedRelation(relid))
+ elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
+ relname, relid);
+
+ /* Shared relations had better be mapped, too */
+ Assert(mapped_relation || !shared_relation);
+
+ /*
+ * switch to the cache context to create the relcache entry.
+ */
+ if (!CacheMemoryContext)
+ CreateCacheMemoryContext();
+
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ /*
+ * allocate a new relation descriptor and fill in basic state fields.
+ */
+ rel = (Relation) palloc0(sizeof(RelationData));
+
+ /* make sure relation is marked as having no open file yet */
+ rel->rd_smgr = NULL;
+
+ /* mark it nailed if appropriate */
+ rel->rd_isnailed = nailit;
+
+ rel->rd_refcnt = nailit ? 1 : 0;
+
+ /* it's being created in this transaction */
+ rel->rd_createSubid = GetCurrentSubTransactionId();
+ rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ rel->rd_droppedSubid = InvalidSubTransactionId;
+
+ /*
+ * create a new tuple descriptor from the one passed in. We do this
+ * partly to copy it into the cache context, and partly because the new
+ * relation can't have any defaults or constraints yet; they have to be
+ * added in later steps, because they require additions to multiple system
+ * catalogs. We can copy attnotnull constraints here, however.
+ */
+ rel->rd_att = CreateTupleDescCopy(tupDesc);
+ rel->rd_att->tdrefcount = 1; /* mark as refcounted */
+ has_not_null = false;
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute satt = TupleDescAttr(tupDesc, i);
+ Form_pg_attribute datt = TupleDescAttr(rel->rd_att, i);
+
+ datt->attidentity = satt->attidentity;
+ datt->attgenerated = satt->attgenerated;
+ datt->attnotnull = satt->attnotnull;
+ has_not_null |= satt->attnotnull;
+ }
+
+ if (has_not_null)
+ {
+ TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+ constr->has_not_null = true;
+ rel->rd_att->constr = constr;
+ }
+
+ /*
+ * initialize relation tuple form (caller may add/override data later)
+ */
+ rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
+
+ namestrcpy(&rel->rd_rel->relname, relname);
+ rel->rd_rel->relnamespace = relnamespace;
+
+ rel->rd_rel->relkind = relkind;
+ rel->rd_rel->relnatts = natts;
+ rel->rd_rel->reltype = InvalidOid;
+ /* needed when bootstrapping: */
+ rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
+
+ /* set up persistence and relcache fields dependent on it */
+ rel->rd_rel->relpersistence = relpersistence;
+ switch (relpersistence)
+ {
+ case RELPERSISTENCE_UNLOGGED:
+ case RELPERSISTENCE_PERMANENT:
+ rel->rd_backend = InvalidBackendId;
+ rel->rd_islocaltemp = false;
+ break;
+ case RELPERSISTENCE_TEMP:
+ Assert(isTempOrTempToastNamespace(relnamespace));
+ rel->rd_backend = BackendIdForTempRelations();
+ rel->rd_islocaltemp = true;
+ break;
+ default:
+ elog(ERROR, "invalid relpersistence: %c", relpersistence);
+ break;
+ }
+
+ /* if it's a materialized view, it's not populated initially */
+ if (relkind == RELKIND_MATVIEW)
+ rel->rd_rel->relispopulated = false;
+ else
+ rel->rd_rel->relispopulated = true;
+
+ /* set replica identity -- system catalogs and non-tables don't have one */
+ if (!IsCatalogNamespace(relnamespace) &&
+ (relkind == RELKIND_RELATION ||
+ relkind == RELKIND_MATVIEW ||
+ relkind == RELKIND_PARTITIONED_TABLE))
+ rel->rd_rel->relreplident = REPLICA_IDENTITY_DEFAULT;
+ else
+ rel->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING;
+
+ /*
+ * Insert relation physical and logical identifiers (OIDs) into the right
+ * places. For a mapped relation, we set relfilenode to zero and rely on
+ * RelationInitPhysicalAddr to consult the map.
+ */
+ rel->rd_rel->relisshared = shared_relation;
+
+ RelationGetRelid(rel) = relid;
+
+ for (i = 0; i < natts; i++)
+ TupleDescAttr(rel->rd_att, i)->attrelid = relid;
+
+ rel->rd_rel->reltablespace = reltablespace;
+
+ if (mapped_relation)
+ {
+ rel->rd_rel->relfilenode = InvalidOid;
+ /* Add it to the active mapping information */
+ RelationMapUpdateMap(relid, relfilenode, shared_relation, true);
+ }
+ else
+ rel->rd_rel->relfilenode = relfilenode;
+
+ RelationInitLockInfo(rel); /* see lmgr.c */
+
+ RelationInitPhysicalAddr(rel);
+
+ rel->rd_rel->relam = accessmtd;
+
+ /*
+ * RelationInitTableAccessMethod will do syscache lookups, so we mustn't
+ * run it in CacheMemoryContext. Fortunately, the remaining steps don't
+ * require a long-lived current context.
+ */
+ MemoryContextSwitchTo(oldcxt);
+
+ if (RELKIND_HAS_TABLE_AM(relkind) || relkind == RELKIND_SEQUENCE)
+ RelationInitTableAccessMethod(rel);
+
+ /*
+ * Okay to insert into the relcache hash table.
+ *
+ * Ordinarily, there should certainly not be an existing hash entry for
+ * the same OID; but during bootstrap, when we create a "real" relcache
+ * entry for one of the bootstrap relations, we'll be overwriting the
+ * phony one created with formrdesc. So allow that to happen for nailed
+ * rels.
+ */
+ RelationCacheInsert(rel, nailit);
+
+ /*
+ * Flag relation as needing eoxact cleanup (to clear rd_createSubid). We
+ * can't do this before storing relid in it.
+ */
+ EOXactListAdd(rel);
+
+ /* It's fully valid */
+ rel->rd_isvalid = true;
+
+ /*
+ * Caller expects us to pin the returned entry.
+ */
+ RelationIncrementReferenceCount(rel);
+
+ return rel;
+}
+
+
+/*
+ * RelationSetNewRelfilenode
+ *
+ * Assign a new relfilenode (physical file name), and possibly a new
+ * persistence setting, to the relation.
+ *
+ * This allows a full rewrite of the relation to be done with transactional
+ * safety (since the filenode assignment can be rolled back). Note however
+ * that there is no simple way to access the relation's old data for the
+ * remainder of the current transaction. This limits the usefulness to cases
+ * such as TRUNCATE or rebuilding an index from scratch.
+ *
+ * Caller must already hold exclusive lock on the relation.
+ */
+void
+RelationSetNewRelfilenode(Relation relation, char persistence)
+{
+ Oid newrelfilenode;
+ Relation pg_class;
+ HeapTuple tuple;
+ Form_pg_class classform;
+ MultiXactId minmulti = InvalidMultiXactId;
+ TransactionId freezeXid = InvalidTransactionId;
+ RelFileNode newrnode;
+
+ if (!IsBinaryUpgrade)
+ {
+ /* Allocate a new relfilenode */
+ newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace,
+ NULL, persistence);
+ }
+ else if (relation->rd_rel->relkind == RELKIND_INDEX)
+ {
+ if (!OidIsValid(binary_upgrade_next_index_pg_class_relfilenode))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("index relfilenode value not set when in binary upgrade mode")));
+
+ newrelfilenode = binary_upgrade_next_index_pg_class_relfilenode;
+ binary_upgrade_next_index_pg_class_relfilenode = InvalidOid;
+ }
+ else if (relation->rd_rel->relkind == RELKIND_RELATION)
+ {
+ if (!OidIsValid(binary_upgrade_next_heap_pg_class_relfilenode))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("heap relfilenode value not set when in binary upgrade mode")));
+
+ newrelfilenode = binary_upgrade_next_heap_pg_class_relfilenode;
+ binary_upgrade_next_heap_pg_class_relfilenode = InvalidOid;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unexpected request for new relfilenode in binary upgrade mode")));
+
+ /*
+ * Get a writable copy of the pg_class tuple for the given relation.
+ */
+ pg_class = table_open(RelationRelationId, RowExclusiveLock);
+
+ tuple = SearchSysCacheCopy1(RELOID,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "could not find tuple for relation %u",
+ RelationGetRelid(relation));
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+
+ /*
+ * Schedule unlinking of the old storage at transaction commit, except
+ * when performing a binary upgrade, when we must do it immediately.
+ */
+ if (IsBinaryUpgrade)
+ {
+ SMgrRelation srel;
+
+ /*
+ * During a binary upgrade, we use this code path to ensure that
+ * pg_largeobject and its index have the same relfilenode values as in
+ * the old cluster. This is necessary because pg_upgrade treats
+ * pg_largeobject like a user table, not a system table. It is however
+ * possible that a table or index may need to end up with the same
+ * relfilenode in the new cluster as what it had in the old cluster.
+ * Hence, we can't wait until commit time to remove the old storage.
+ *
+ * In general, this function needs to have transactional semantics,
+ * and removing the old storage before commit time surely isn't.
+ * However, it doesn't really matter, because if a binary upgrade
+ * fails at this stage, the new cluster will need to be recreated
+ * anyway.
+ */
+ srel = smgropen(relation->rd_node, relation->rd_backend);
+ smgrdounlinkall(&srel, 1, false);
+ smgrclose(srel);
+ }
+ else
+ {
+ /* Not a binary upgrade, so just schedule it to happen later. */
+ RelationDropStorage(relation);
+ }
+
+ /*
+ * Create storage for the main fork of the new relfilenode. If it's a
+ * table-like object, call into the table AM to do so, which'll also
+ * create the table's init fork if needed.
+ *
+ * NOTE: If relevant for the AM, any conflict in relfilenode value will be
+ * caught here, if GetNewRelFileNode messes up for any reason.
+ */
+ newrnode = relation->rd_node;
+ newrnode.relNode = newrelfilenode;
+
+ if (RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind))
+ {
+ table_relation_set_new_filenode(relation, &newrnode,
+ persistence,
+ &freezeXid, &minmulti);
+ }
+ else if (RELKIND_HAS_STORAGE(relation->rd_rel->relkind))
+ {
+ /* handle these directly, at least for now */
+ SMgrRelation srel;
+
+ srel = RelationCreateStorage(newrnode, persistence, true);
+ smgrclose(srel);
+ }
+ else
+ {
+ /* we shouldn't be called for anything else */
+ elog(ERROR, "relation \"%s\" does not have storage",
+ RelationGetRelationName(relation));
+ }
+
+ /*
+ * If we're dealing with a mapped index, pg_class.relfilenode doesn't
+ * change; instead we have to send the update to the relation mapper.
+ *
+ * For mapped indexes, we don't actually change the pg_class entry at all;
+ * this is essential when reindexing pg_class itself. That leaves us with
+ * possibly-inaccurate values of relpages etc, but those will be fixed up
+ * later.
+ */
+ if (RelationIsMapped(relation))
+ {
+ /* This case is only supported for indexes */
+ Assert(relation->rd_rel->relkind == RELKIND_INDEX);
+
+ /* Since we're not updating pg_class, these had better not change */
+ Assert(classform->relfrozenxid == freezeXid);
+ Assert(classform->relminmxid == minmulti);
+ Assert(classform->relpersistence == persistence);
+
+ /*
+ * In some code paths it's possible that the tuple update we'd
+ * otherwise do here is the only thing that would assign an XID for
+ * the current transaction. However, we must have an XID to delete
+ * files, so make sure one is assigned.
+ */
+ (void) GetCurrentTransactionId();
+
+ /* Do the deed */
+ RelationMapUpdateMap(RelationGetRelid(relation),
+ newrelfilenode,
+ relation->rd_rel->relisshared,
+ false);
+
+ /* Since we're not updating pg_class, must trigger inval manually */
+ CacheInvalidateRelcache(relation);
+ }
+ else
+ {
+ /* Normal case, update the pg_class entry */
+ classform->relfilenode = newrelfilenode;
+
+ /* relpages etc. never change for sequences */
+ if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
+ {
+ classform->relpages = 0; /* it's empty until further notice */
+ classform->reltuples = -1;
+ classform->relallvisible = 0;
+ }
+ classform->relfrozenxid = freezeXid;
+ classform->relminmxid = minmulti;
+ classform->relpersistence = persistence;
+
+ CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
+ }
+
+ heap_freetuple(tuple);
+
+ table_close(pg_class, RowExclusiveLock);
+
+ /*
+ * Make the pg_class row change or relation map change visible. This will
+ * cause the relcache entry to get updated, too.
+ */
+ CommandCounterIncrement();
+
+ RelationAssumeNewRelfilenode(relation);
+}
+
+/*
+ * RelationAssumeNewRelfilenode
+ *
+ * Code that modifies pg_class.reltablespace or pg_class.relfilenode must call
+ * this. The call shall precede any code that might insert WAL records whose
+ * replay would modify bytes in the new RelFileNode, and the call shall follow
+ * any WAL modifying bytes in the prior RelFileNode. See struct RelationData.
+ * Ideally, call this as near as possible to the CommandCounterIncrement()
+ * that makes the pg_class change visible (before it or after it); that
+ * minimizes the chance of future development adding a forbidden WAL insertion
+ * between RelationAssumeNewRelfilenode() and CommandCounterIncrement().
+ */
+void
+RelationAssumeNewRelfilenode(Relation relation)
+{
+ relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+ if (relation->rd_firstRelfilenodeSubid == InvalidSubTransactionId)
+ relation->rd_firstRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
+
+ /* Flag relation as needing eoxact cleanup (to clear these fields) */
+ EOXactListAdd(relation);
+}
+
+
+/*
+ * RelationCacheInitialize
+ *
+ * This initializes the relation descriptor cache. At the time
+ * that this is invoked, we can't do database access yet (mainly
+ * because the transaction subsystem is not up); all we are doing
+ * is making an empty cache hashtable. This must be done before
+ * starting the initialization transaction, because otherwise
+ * AtEOXact_RelationCache would crash if that transaction aborts
+ * before we can get the relcache set up.
+ */
+
+#define INITRELCACHESIZE 400
+
+void
+RelationCacheInitialize(void)
+{
+ HASHCTL ctl;
+ int allocsize;
+
+ /*
+ * make sure cache memory context exists
+ */
+ if (!CacheMemoryContext)
+ CreateCacheMemoryContext();
+
+ /*
+ * create hashtable that indexes the relcache
+ */
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(RelIdCacheEnt);
+ RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
+ &ctl, HASH_ELEM | HASH_BLOBS);
+
+ /*
+ * reserve enough in_progress_list slots for many cases
+ */
+ allocsize = 4;
+ in_progress_list =
+ MemoryContextAlloc(CacheMemoryContext,
+ allocsize * sizeof(*in_progress_list));
+ in_progress_list_maxlen = allocsize;
+
+ /*
+ * relation mapper needs to be initialized too
+ */
+ RelationMapInitialize();
+}
+
+/*
+ * RelationCacheInitializePhase2
+ *
+ * This is called to prepare for access to shared catalogs during startup.
+ * We must at least set up nailed reldescs for pg_database, pg_authid,
+ * pg_auth_members, and pg_shseclabel. Ideally we'd like to have reldescs
+ * for their indexes, too. We attempt to load this information from the
+ * shared relcache init file. If that's missing or broken, just make
+ * phony entries for the catalogs themselves.
+ * RelationCacheInitializePhase3 will clean up as needed.
+ */
+void
+RelationCacheInitializePhase2(void)
+{
+ MemoryContext oldcxt;
+
+ /*
+ * relation mapper needs initialized too
+ */
+ RelationMapInitializePhase2();
+
+ /*
+ * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
+ * nothing.
+ */
+ if (IsBootstrapProcessingMode())
+ return;
+
+ /*
+ * switch to cache memory context
+ */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ /*
+ * Try to load the shared relcache cache file. If unsuccessful, bootstrap
+ * the cache with pre-made descriptors for the critical shared catalogs.
+ */
+ if (!load_relcache_init_file(true))
+ {
+ formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
+ Natts_pg_database, Desc_pg_database);
+ formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
+ Natts_pg_authid, Desc_pg_authid);
+ formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
+ Natts_pg_auth_members, Desc_pg_auth_members);
+ formrdesc("pg_shseclabel", SharedSecLabelRelation_Rowtype_Id, true,
+ Natts_pg_shseclabel, Desc_pg_shseclabel);
+ formrdesc("pg_subscription", SubscriptionRelation_Rowtype_Id, true,
+ Natts_pg_subscription, Desc_pg_subscription);
+
+#define NUM_CRITICAL_SHARED_RELS 5 /* fix if you change list above */
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * RelationCacheInitializePhase3
+ *
+ * This is called as soon as the catcache and transaction system
+ * are functional and we have determined MyDatabaseId. At this point
+ * we can actually read data from the database's system catalogs.
+ * We first try to read pre-computed relcache entries from the local
+ * relcache init file. If that's missing or broken, make phony entries
+ * for the minimum set of nailed-in-cache relations. Then (unless
+ * bootstrapping) make sure we have entries for the critical system
+ * indexes. Once we've done all this, we have enough infrastructure to
+ * open any system catalog or use any catcache. The last step is to
+ * rewrite the cache files if needed.
+ */
+void
+RelationCacheInitializePhase3(void)
+{
+ HASH_SEQ_STATUS status;
+ RelIdCacheEnt *idhentry;
+ MemoryContext oldcxt;
+ bool needNewCacheFile = !criticalSharedRelcachesBuilt;
+
+ /*
+ * relation mapper needs initialized too
+ */
+ RelationMapInitializePhase3();
+
+ /*
+ * switch to cache memory context
+ */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ /*
+ * Try to load the local relcache cache file. If unsuccessful, bootstrap
+ * the cache with pre-made descriptors for the critical "nailed-in" system
+ * catalogs.
+ */
+ if (IsBootstrapProcessingMode() ||
+ !load_relcache_init_file(false))
+ {
+ needNewCacheFile = true;
+
+ formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
+ Natts_pg_class, Desc_pg_class);
+ formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
+ Natts_pg_attribute, Desc_pg_attribute);
+ formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
+ Natts_pg_proc, Desc_pg_proc);
+ formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
+ Natts_pg_type, Desc_pg_type);
+
+#define NUM_CRITICAL_LOCAL_RELS 4 /* fix if you change list above */
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+
+ /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
+ if (IsBootstrapProcessingMode())
+ return;
+
+ /*
+ * If we didn't get the critical system indexes loaded into relcache, do
+ * so now. These are critical because the catcache and/or opclass cache
+ * depend on them for fetches done during relcache load. Thus, we have an
+ * infinite-recursion problem. We can break the recursion by doing
+ * heapscans instead of indexscans at certain key spots. To avoid hobbling
+ * performance, we only want to do that until we have the critical indexes
+ * loaded into relcache. Thus, the flag criticalRelcachesBuilt is used to
+ * decide whether to do heapscan or indexscan at the key spots, and we set
+ * it true after we've loaded the critical indexes.
+ *
+ * The critical indexes are marked as "nailed in cache", partly to make it
+ * easy for load_relcache_init_file to count them, but mainly because we
+ * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
+ * true. (NOTE: perhaps it would be possible to reload them by
+ * temporarily setting criticalRelcachesBuilt to false again. For now,
+ * though, we just nail 'em in.)
+ *
+ * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
+ * in the same way as the others, because the critical catalogs don't
+ * (currently) have any rules or triggers, and so these indexes can be
+ * rebuilt without inducing recursion. However they are used during
+ * relcache load when a rel does have rules or triggers, so we choose to
+ * nail them for performance reasons.
+ */
+ if (!criticalRelcachesBuilt)
+ {
+ load_critical_index(ClassOidIndexId,
+ RelationRelationId);
+ load_critical_index(AttributeRelidNumIndexId,
+ AttributeRelationId);
+ load_critical_index(IndexRelidIndexId,
+ IndexRelationId);
+ load_critical_index(OpclassOidIndexId,
+ OperatorClassRelationId);
+ load_critical_index(AccessMethodProcedureIndexId,
+ AccessMethodProcedureRelationId);
+ load_critical_index(RewriteRelRulenameIndexId,
+ RewriteRelationId);
+ load_critical_index(TriggerRelidNameIndexId,
+ TriggerRelationId);
+
+#define NUM_CRITICAL_LOCAL_INDEXES 7 /* fix if you change list above */
+
+ criticalRelcachesBuilt = true;
+ }
+
+ /*
+ * Process critical shared indexes too.
+ *
+ * DatabaseNameIndexId isn't critical for relcache loading, but rather for
+ * initial lookup of MyDatabaseId, without which we'll never find any
+ * non-shared catalogs at all. Autovacuum calls InitPostgres with a
+ * database OID, so it instead depends on DatabaseOidIndexId. We also
+ * need to nail up some indexes on pg_authid and pg_auth_members for use
+ * during client authentication. SharedSecLabelObjectIndexId isn't
+ * critical for the core system, but authentication hooks might be
+ * interested in it.
+ */
+ if (!criticalSharedRelcachesBuilt)
+ {
+ load_critical_index(DatabaseNameIndexId,
+ DatabaseRelationId);
+ load_critical_index(DatabaseOidIndexId,
+ DatabaseRelationId);
+ load_critical_index(AuthIdRolnameIndexId,
+ AuthIdRelationId);
+ load_critical_index(AuthIdOidIndexId,
+ AuthIdRelationId);
+ load_critical_index(AuthMemMemRoleIndexId,
+ AuthMemRelationId);
+ load_critical_index(SharedSecLabelObjectIndexId,
+ SharedSecLabelRelationId);
+
+#define NUM_CRITICAL_SHARED_INDEXES 6 /* fix if you change list above */
+
+ criticalSharedRelcachesBuilt = true;
+ }
+
+ /*
+ * Now, scan all the relcache entries and update anything that might be
+ * wrong in the results from formrdesc or the relcache cache file. If we
+ * faked up relcache entries using formrdesc, then read the real pg_class
+ * rows and replace the fake entries with them. Also, if any of the
+ * relcache entries have rules, triggers, or security policies, load that
+ * info the hard way since it isn't recorded in the cache file.
+ *
+ * Whenever we access the catalogs to read data, there is a possibility of
+ * a shared-inval cache flush causing relcache entries to be removed.
+ * Since hash_seq_search only guarantees to still work after the *current*
+ * entry is removed, it's unsafe to continue the hashtable scan afterward.
+ * We handle this by restarting the scan from scratch after each access.
+ * This is theoretically O(N^2), but the number of entries that actually
+ * need to be fixed is small enough that it doesn't matter.
+ */
+ hash_seq_init(&status, RelationIdCache);
+
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ {
+ Relation relation = idhentry->reldesc;
+ bool restart = false;
+
+ /*
+ * Make sure *this* entry doesn't get flushed while we work with it.
+ */
+ RelationIncrementReferenceCount(relation);
+
+ /*
+ * If it's a faked-up entry, read the real pg_class tuple.
+ */
+ if (relation->rd_rel->relowner == InvalidOid)
+ {
+ HeapTuple htup;
+ Form_pg_class relp;
+
+ htup = SearchSysCache1(RELOID,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+ if (!HeapTupleIsValid(htup))
+ elog(FATAL, "cache lookup failed for relation %u",
+ RelationGetRelid(relation));
+ relp = (Form_pg_class) GETSTRUCT(htup);
+
+ /*
+ * Copy tuple to relation->rd_rel. (See notes in
+ * AllocateRelationDesc())
+ */
+ memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
+
+ /* Update rd_options while we have the tuple */
+ if (relation->rd_options)
+ pfree(relation->rd_options);
+ RelationParseRelOptions(relation, htup);
+
+ /*
+ * Check the values in rd_att were set up correctly. (We cannot
+ * just copy them over now: formrdesc must have set up the rd_att
+ * data correctly to start with, because it may already have been
+ * copied into one or more catcache entries.)
+ */
+ Assert(relation->rd_att->tdtypeid == relp->reltype);
+ Assert(relation->rd_att->tdtypmod == -1);
+
+ ReleaseSysCache(htup);
+
+ /* relowner had better be OK now, else we'll loop forever */
+ if (relation->rd_rel->relowner == InvalidOid)
+ elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
+ RelationGetRelationName(relation));
+
+ restart = true;
+ }
+
+ /*
+ * Fix data that isn't saved in relcache cache file.
+ *
+ * relhasrules or relhastriggers could possibly be wrong or out of
+ * date. If we don't actually find any rules or triggers, clear the
+ * local copy of the flag so that we don't get into an infinite loop
+ * here. We don't make any attempt to fix the pg_class entry, though.
+ */
+ if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
+ {
+ RelationBuildRuleLock(relation);
+ if (relation->rd_rules == NULL)
+ relation->rd_rel->relhasrules = false;
+ restart = true;
+ }
+ if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
+ {
+ RelationBuildTriggers(relation);
+ if (relation->trigdesc == NULL)
+ relation->rd_rel->relhastriggers = false;
+ restart = true;
+ }
+
+ /*
+ * Re-load the row security policies if the relation has them, since
+ * they are not preserved in the cache. Note that we can never NOT
+ * have a policy while relrowsecurity is true,
+ * RelationBuildRowSecurity will create a single default-deny policy
+ * if there is no policy defined in pg_policy.
+ */
+ if (relation->rd_rel->relrowsecurity && relation->rd_rsdesc == NULL)
+ {
+ RelationBuildRowSecurity(relation);
+
+ Assert(relation->rd_rsdesc != NULL);
+ restart = true;
+ }
+
+ /* Reload tableam data if needed */
+ if (relation->rd_tableam == NULL &&
+ (RELKIND_HAS_TABLE_AM(relation->rd_rel->relkind) || relation->rd_rel->relkind == RELKIND_SEQUENCE))
+ {
+ RelationInitTableAccessMethod(relation);
+ Assert(relation->rd_tableam != NULL);
+
+ restart = true;
+ }
+
+ /* Release hold on the relation */
+ RelationDecrementReferenceCount(relation);
+
+ /* Now, restart the hashtable scan if needed */
+ if (restart)
+ {
+ hash_seq_term(&status);
+ hash_seq_init(&status, RelationIdCache);
+ }
+ }
+
+ /*
+ * Lastly, write out new relcache cache files if needed. We don't bother
+ * to distinguish cases where only one of the two needs an update.
+ */
+ if (needNewCacheFile)
+ {
+ /*
+ * Force all the catcaches to finish initializing and thereby open the
+ * catalogs and indexes they use. This will preload the relcache with
+ * entries for all the most important system catalogs and indexes, so
+ * that the init files will be most useful for future backends.
+ */
+ InitCatalogCachePhase2();
+
+ /* now write the files */
+ write_relcache_init_file(true);
+ write_relcache_init_file(false);
+ }
+}
+
+/*
+ * Load one critical system index into the relcache
+ *
+ * indexoid is the OID of the target index, heapoid is the OID of the catalog
+ * it belongs to.
+ */
+static void
+load_critical_index(Oid indexoid, Oid heapoid)
+{
+ Relation ird;
+
+ /*
+ * We must lock the underlying catalog before locking the index to avoid
+ * deadlock, since RelationBuildDesc might well need to read the catalog,
+ * and if anyone else is exclusive-locking this catalog and index they'll
+ * be doing it in that order.
+ */
+ LockRelationOid(heapoid, AccessShareLock);
+ LockRelationOid(indexoid, AccessShareLock);
+ ird = RelationBuildDesc(indexoid, true);
+ if (ird == NULL)
+ elog(PANIC, "could not open critical system index %u", indexoid);
+ ird->rd_isnailed = true;
+ ird->rd_refcnt = 1;
+ UnlockRelationOid(indexoid, AccessShareLock);
+ UnlockRelationOid(heapoid, AccessShareLock);
+
+ (void) RelationGetIndexAttOptions(ird, false);
+}
+
+/*
+ * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
+ * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
+ *
+ * We need this kluge because we have to be able to access non-fixed-width
+ * fields of pg_class and pg_index before we have the standard catalog caches
+ * available. We use predefined data that's set up in just the same way as
+ * the bootstrapped reldescs used by formrdesc(). The resulting tupdesc is
+ * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
+ * does it have a TupleConstr field. But it's good enough for the purpose of
+ * extracting fields.
+ */
+static TupleDesc
+BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs)
+{
+ TupleDesc result;
+ MemoryContext oldcxt;
+ int i;
+
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+ result = CreateTemplateTupleDesc(natts);
+ result->tdtypeid = RECORDOID; /* not right, but we don't care */
+ result->tdtypmod = -1;
+
+ for (i = 0; i < natts; i++)
+ {
+ memcpy(TupleDescAttr(result, i), &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
+ /* make sure attcacheoff is valid */
+ TupleDescAttr(result, i)->attcacheoff = -1;
+ }
+
+ /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
+ TupleDescAttr(result, 0)->attcacheoff = 0;
+
+ /* Note: we don't bother to set up a TupleConstr entry */
+
+ MemoryContextSwitchTo(oldcxt);
+
+ return result;
+}
+
+static TupleDesc
+GetPgClassDescriptor(void)
+{
+ static TupleDesc pgclassdesc = NULL;
+
+ /* Already done? */
+ if (pgclassdesc == NULL)
+ pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
+ Desc_pg_class);
+
+ return pgclassdesc;
+}
+
+static TupleDesc
+GetPgIndexDescriptor(void)
+{
+ static TupleDesc pgindexdesc = NULL;
+
+ /* Already done? */
+ if (pgindexdesc == NULL)
+ pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
+ Desc_pg_index);
+
+ return pgindexdesc;
+}
+
+/*
+ * Load any default attribute value definitions for the relation.
+ *
+ * ndef is the number of attributes that were marked atthasdef.
+ *
+ * Note: we don't make it a hard error to be missing some pg_attrdef records.
+ * We can limp along as long as nothing needs to use the default value. Code
+ * that fails to find an expected AttrDefault record should throw an error.
+ */
+static void
+AttrDefaultFetch(Relation relation, int ndef)
+{
+ AttrDefault *attrdef;
+ Relation adrel;
+ SysScanDesc adscan;
+ ScanKeyData skey;
+ HeapTuple htup;
+ int found = 0;
+
+ /* Allocate array with room for as many entries as expected */
+ attrdef = (AttrDefault *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ ndef * sizeof(AttrDefault));
+
+ /* Search pg_attrdef for relevant entries */
+ ScanKeyInit(&skey,
+ Anum_pg_attrdef_adrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ adrel = table_open(AttrDefaultRelationId, AccessShareLock);
+ adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
+ NULL, 1, &skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(adscan)))
+ {
+ Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
+ Datum val;
+ bool isnull;
+
+ /* protect limited size of array */
+ if (found >= ndef)
+ {
+ elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
+ adform->adnum, RelationGetRelationName(relation));
+ break;
+ }
+
+ val = fastgetattr(htup,
+ Anum_pg_attrdef_adbin,
+ adrel->rd_att, &isnull);
+ if (isnull)
+ elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
+ adform->adnum, RelationGetRelationName(relation));
+ else
+ {
+ /* detoast and convert to cstring in caller's context */
+ char *s = TextDatumGetCString(val);
+
+ attrdef[found].adnum = adform->adnum;
+ attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
+ pfree(s);
+ found++;
+ }
+ }
+
+ systable_endscan(adscan);
+ table_close(adrel, AccessShareLock);
+
+ if (found != ndef)
+ elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
+ ndef - found, RelationGetRelationName(relation));
+
+ /*
+ * Sort the AttrDefault entries by adnum, for the convenience of
+ * equalTupleDescs(). (Usually, they already will be in order, but this
+ * might not be so if systable_getnext isn't using an index.)
+ */
+ if (found > 1)
+ qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);
+
+ /* Install array only after it's fully valid */
+ relation->rd_att->constr->defval = attrdef;
+ relation->rd_att->constr->num_defval = found;
+}
+
+/*
+ * qsort comparator to sort AttrDefault entries by adnum
+ */
+static int
+AttrDefaultCmp(const void *a, const void *b)
+{
+ const AttrDefault *ada = (const AttrDefault *) a;
+ const AttrDefault *adb = (const AttrDefault *) b;
+
+ return ada->adnum - adb->adnum;
+}
+
+/*
+ * Load any check constraints for the relation.
+ *
+ * As with defaults, if we don't find the expected number of them, just warn
+ * here. The executor should throw an error if an INSERT/UPDATE is attempted.
+ */
+static void
+CheckConstraintFetch(Relation relation)
+{
+ ConstrCheck *check;
+ int ncheck = relation->rd_rel->relchecks;
+ Relation conrel;
+ SysScanDesc conscan;
+ ScanKeyData skey[1];
+ HeapTuple htup;
+ int found = 0;
+
+ /* Allocate array with room for as many entries as expected */
+ check = (ConstrCheck *)
+ MemoryContextAllocZero(CacheMemoryContext,
+ ncheck * sizeof(ConstrCheck));
+
+ /* Search pg_constraint for relevant entries */
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ conrel = table_open(ConstraintRelationId, AccessShareLock);
+ conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(conscan)))
+ {
+ Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
+ Datum val;
+ bool isnull;
+
+ /* We want check constraints only */
+ if (conform->contype != CONSTRAINT_CHECK)
+ continue;
+
+ /* protect limited size of array */
+ if (found >= ncheck)
+ {
+ elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
+ RelationGetRelationName(relation));
+ break;
+ }
+
+ check[found].ccvalid = conform->convalidated;
+ check[found].ccnoinherit = conform->connoinherit;
+ check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
+ NameStr(conform->conname));
+
+ /* Grab and test conbin is actually set */
+ val = fastgetattr(htup,
+ Anum_pg_constraint_conbin,
+ conrel->rd_att, &isnull);
+ if (isnull)
+ elog(WARNING, "null conbin for relation \"%s\"",
+ RelationGetRelationName(relation));
+ else
+ {
+ /* detoast and convert to cstring in caller's context */
+ char *s = TextDatumGetCString(val);
+
+ check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
+ pfree(s);
+ found++;
+ }
+ }
+
+ systable_endscan(conscan);
+ table_close(conrel, AccessShareLock);
+
+ if (found != ncheck)
+ elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
+ ncheck - found, RelationGetRelationName(relation));
+
+ /*
+ * Sort the records by name. This ensures that CHECKs are applied in a
+ * deterministic order, and it also makes equalTupleDescs() faster.
+ */
+ if (found > 1)
+ qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);
+
+ /* Install array only after it's fully valid */
+ relation->rd_att->constr->check = check;
+ relation->rd_att->constr->num_check = found;
+}
+
+/*
+ * qsort comparator to sort ConstrCheck entries by name
+ */
+static int
+CheckConstraintCmp(const void *a, const void *b)
+{
+ const ConstrCheck *ca = (const ConstrCheck *) a;
+ const ConstrCheck *cb = (const ConstrCheck *) b;
+
+ return strcmp(ca->ccname, cb->ccname);
+}
+
+/*
+ * RelationGetFKeyList -- get a list of foreign key info for the relation
+ *
+ * Returns a list of ForeignKeyCacheInfo structs, one per FK constraining
+ * the given relation. This data is a direct copy of relevant fields from
+ * pg_constraint. The list items are in no particular order.
+ *
+ * CAUTION: the returned list is part of the relcache's data, and could
+ * vanish in a relcache entry reset. Callers must inspect or copy it
+ * before doing anything that might trigger a cache flush, such as
+ * system catalog accesses. copyObject() can be used if desired.
+ * (We define it this way because current callers want to filter and
+ * modify the list entries anyway, so copying would be a waste of time.)
+ */
+List *
+RelationGetFKeyList(Relation relation)
+{
+ List *result;
+ Relation conrel;
+ SysScanDesc conscan;
+ ScanKeyData skey;
+ HeapTuple htup;
+ List *oldlist;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the list. */
+ if (relation->rd_fkeyvalid)
+ return relation->rd_fkeylist;
+
+ /* Fast path: non-partitioned tables without triggers can't have FKs */
+ if (!relation->rd_rel->relhastriggers &&
+ relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ return NIL;
+
+ /*
+ * We build the list we intend to return (in the caller's context) while
+ * doing the scan. After successfully completing the scan, we copy that
+ * list into the relcache entry. This avoids cache-context memory leakage
+ * if we get some sort of error partway through.
+ */
+ result = NIL;
+
+ /* Prepare to scan pg_constraint for entries having conrelid = this rel. */
+ ScanKeyInit(&skey,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ conrel = table_open(ConstraintRelationId, AccessShareLock);
+ conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, &skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(conscan)))
+ {
+ Form_pg_constraint constraint = (Form_pg_constraint) GETSTRUCT(htup);
+ ForeignKeyCacheInfo *info;
+
+ /* consider only foreign keys */
+ if (constraint->contype != CONSTRAINT_FOREIGN)
+ continue;
+
+ info = makeNode(ForeignKeyCacheInfo);
+ info->conoid = constraint->oid;
+ info->conrelid = constraint->conrelid;
+ info->confrelid = constraint->confrelid;
+
+ DeconstructFkConstraintRow(htup, &info->nkeys,
+ info->conkey,
+ info->confkey,
+ info->conpfeqop,
+ NULL, NULL, NULL, NULL);
+
+ /* Add FK's node to the result list */
+ result = lappend(result, info);
+ }
+
+ systable_endscan(conscan);
+ table_close(conrel, AccessShareLock);
+
+ /* Now save a copy of the completed list in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ oldlist = relation->rd_fkeylist;
+ relation->rd_fkeylist = copyObject(result);
+ relation->rd_fkeyvalid = true;
+ MemoryContextSwitchTo(oldcxt);
+
+ /* Don't leak the old list, if there is one */
+ list_free_deep(oldlist);
+
+ return result;
+}
+
+/*
+ * RelationGetIndexList -- get a list of OIDs of indexes on this relation
+ *
+ * The index list is created only if someone requests it. We scan pg_index
+ * to find relevant indexes, and add the list to the relcache entry so that
+ * we won't have to compute it again. Note that shared cache inval of a
+ * relcache entry will delete the old list and set rd_indexvalid to false,
+ * so that we must recompute the index list on next request. This handles
+ * creation or deletion of an index.
+ *
+ * Indexes that are marked not indislive are omitted from the returned list.
+ * Such indexes are expected to be dropped momentarily, and should not be
+ * touched at all by any caller of this function.
+ *
+ * The returned list is guaranteed to be sorted in order by OID. This is
+ * needed by the executor, since for index types that we obtain exclusive
+ * locks on when updating the index, all backends must lock the indexes in
+ * the same order or we will get deadlocks (see ExecOpenIndices()). Any
+ * consistent ordering would do, but ordering by OID is easy.
+ *
+ * Since shared cache inval causes the relcache's copy of the list to go away,
+ * we return a copy of the list palloc'd in the caller's context. The caller
+ * may list_free() the returned list after scanning it. This is necessary
+ * since the caller will typically be doing syscache lookups on the relevant
+ * indexes, and syscache lookup could cause SI messages to be processed!
+ *
+ * In exactly the same way, we update rd_pkindex, which is the OID of the
+ * relation's primary key index if any, else InvalidOid; and rd_replidindex,
+ * which is the pg_class OID of an index to be used as the relation's
+ * replication identity index, or InvalidOid if there is no such index.
+ */
+List *
+RelationGetIndexList(Relation relation)
+{
+ Relation indrel;
+ SysScanDesc indscan;
+ ScanKeyData skey;
+ HeapTuple htup;
+ List *result;
+ List *oldlist;
+ char replident = relation->rd_rel->relreplident;
+ Oid pkeyIndex = InvalidOid;
+ Oid candidateIndex = InvalidOid;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the list. */
+ if (relation->rd_indexvalid)
+ return list_copy(relation->rd_indexlist);
+
+ /*
+ * We build the list we intend to return (in the caller's context) while
+ * doing the scan. After successfully completing the scan, we copy that
+ * list into the relcache entry. This avoids cache-context memory leakage
+ * if we get some sort of error partway through.
+ */
+ result = NIL;
+
+ /* Prepare to scan pg_index for entries having indrelid = this rel. */
+ ScanKeyInit(&skey,
+ Anum_pg_index_indrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ indrel = table_open(IndexRelationId, AccessShareLock);
+ indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
+ NULL, 1, &skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(indscan)))
+ {
+ Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
+
+ /*
+ * Ignore any indexes that are currently being dropped. This will
+ * prevent them from being searched, inserted into, or considered in
+ * HOT-safety decisions. It's unsafe to touch such an index at all
+ * since its catalog entries could disappear at any instant.
+ */
+ if (!index->indislive)
+ continue;
+
+ /* add index's OID to result list */
+ result = lappend_oid(result, index->indexrelid);
+
+ /*
+ * Invalid, non-unique, non-immediate or predicate indexes aren't
+ * interesting for either oid indexes or replication identity indexes,
+ * so don't check them.
+ */
+ if (!index->indisvalid || !index->indisunique ||
+ !index->indimmediate ||
+ !heap_attisnull(htup, Anum_pg_index_indpred, NULL))
+ continue;
+
+ /* remember primary key index if any */
+ if (index->indisprimary)
+ pkeyIndex = index->indexrelid;
+
+ /* remember explicitly chosen replica index */
+ if (index->indisreplident)
+ candidateIndex = index->indexrelid;
+ }
+
+ systable_endscan(indscan);
+
+ table_close(indrel, AccessShareLock);
+
+ /* Sort the result list into OID order, per API spec. */
+ list_sort(result, list_oid_cmp);
+
+ /* Now save a copy of the completed list in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ oldlist = relation->rd_indexlist;
+ relation->rd_indexlist = list_copy(result);
+ relation->rd_pkindex = pkeyIndex;
+ if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex))
+ relation->rd_replidindex = pkeyIndex;
+ else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
+ relation->rd_replidindex = candidateIndex;
+ else
+ relation->rd_replidindex = InvalidOid;
+ relation->rd_indexvalid = true;
+ MemoryContextSwitchTo(oldcxt);
+
+ /* Don't leak the old list, if there is one */
+ list_free(oldlist);
+
+ return result;
+}
+
+/*
+ * RelationGetStatExtList
+ * get a list of OIDs of statistics objects on this relation
+ *
+ * The statistics list is created only if someone requests it, in a way
+ * similar to RelationGetIndexList(). We scan pg_statistic_ext to find
+ * relevant statistics, and add the list to the relcache entry so that we
+ * won't have to compute it again. Note that shared cache inval of a
+ * relcache entry will delete the old list and set rd_statvalid to 0,
+ * so that we must recompute the statistics list on next request. This
+ * handles creation or deletion of a statistics object.
+ *
+ * The returned list is guaranteed to be sorted in order by OID, although
+ * this is not currently needed.
+ *
+ * Since shared cache inval causes the relcache's copy of the list to go away,
+ * we return a copy of the list palloc'd in the caller's context. The caller
+ * may list_free() the returned list after scanning it. This is necessary
+ * since the caller will typically be doing syscache lookups on the relevant
+ * statistics, and syscache lookup could cause SI messages to be processed!
+ */
+List *
+RelationGetStatExtList(Relation relation)
+{
+ Relation indrel;
+ SysScanDesc indscan;
+ ScanKeyData skey;
+ HeapTuple htup;
+ List *result;
+ List *oldlist;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the list. */
+ if (relation->rd_statvalid != 0)
+ return list_copy(relation->rd_statlist);
+
+ /*
+ * We build the list we intend to return (in the caller's context) while
+ * doing the scan. After successfully completing the scan, we copy that
+ * list into the relcache entry. This avoids cache-context memory leakage
+ * if we get some sort of error partway through.
+ */
+ result = NIL;
+
+ /*
+ * Prepare to scan pg_statistic_ext for entries having stxrelid = this
+ * rel.
+ */
+ ScanKeyInit(&skey,
+ Anum_pg_statistic_ext_stxrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(relation)));
+
+ indrel = table_open(StatisticExtRelationId, AccessShareLock);
+ indscan = systable_beginscan(indrel, StatisticExtRelidIndexId, true,
+ NULL, 1, &skey);
+
+ while (HeapTupleIsValid(htup = systable_getnext(indscan)))
+ {
+ Oid oid = ((Form_pg_statistic_ext) GETSTRUCT(htup))->oid;
+
+ result = lappend_oid(result, oid);
+ }
+
+ systable_endscan(indscan);
+
+ table_close(indrel, AccessShareLock);
+
+ /* Sort the result list into OID order, per API spec. */
+ list_sort(result, list_oid_cmp);
+
+ /* Now save a copy of the completed list in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ oldlist = relation->rd_statlist;
+ relation->rd_statlist = list_copy(result);
+
+ relation->rd_statvalid = true;
+ MemoryContextSwitchTo(oldcxt);
+
+ /* Don't leak the old list, if there is one */
+ list_free(oldlist);
+
+ return result;
+}
+
+/*
+ * RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
+ *
+ * Returns InvalidOid if there is no such index.
+ */
+Oid
+RelationGetPrimaryKeyIndex(Relation relation)
+{
+ List *ilist;
+
+ if (!relation->rd_indexvalid)
+ {
+ /* RelationGetIndexList does the heavy lifting. */
+ ilist = RelationGetIndexList(relation);
+ list_free(ilist);
+ Assert(relation->rd_indexvalid);
+ }
+
+ return relation->rd_pkindex;
+}
+
+/*
+ * RelationGetReplicaIndex -- get OID of the relation's replica identity index
+ *
+ * Returns InvalidOid if there is no such index.
+ */
+Oid
+RelationGetReplicaIndex(Relation relation)
+{
+ List *ilist;
+
+ if (!relation->rd_indexvalid)
+ {
+ /* RelationGetIndexList does the heavy lifting. */
+ ilist = RelationGetIndexList(relation);
+ list_free(ilist);
+ Assert(relation->rd_indexvalid);
+ }
+
+ return relation->rd_replidindex;
+}
+
+/*
+ * RelationGetIndexExpressions -- get the index expressions for an index
+ *
+ * We cache the result of transforming pg_index.indexprs into a node tree.
+ * If the rel is not an index or has no expressional columns, we return NIL.
+ * Otherwise, the returned tree is copied into the caller's memory context.
+ * (We don't want to return a pointer to the relcache copy, since it could
+ * disappear due to relcache invalidation.)
+ */
+List *
+RelationGetIndexExpressions(Relation relation)
+{
+ List *result;
+ Datum exprsDatum;
+ bool isnull;
+ char *exprsString;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the result. */
+ if (relation->rd_indexprs)
+ return copyObject(relation->rd_indexprs);
+
+ /* Quick exit if there is nothing to do. */
+ if (relation->rd_indextuple == NULL ||
+ heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
+ return NIL;
+
+ /*
+ * We build the tree we intend to return in the caller's context. After
+ * successfully completing the work, we copy it into the relcache entry.
+ * This avoids problems if we get some sort of error partway through.
+ */
+ exprsDatum = heap_getattr(relation->rd_indextuple,
+ Anum_pg_index_indexprs,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ exprsString = TextDatumGetCString(exprsDatum);
+ result = (List *) stringToNode(exprsString);
+ pfree(exprsString);
+
+ /*
+ * Run the expressions through eval_const_expressions. This is not just an
+ * optimization, but is necessary, because the planner will be comparing
+ * them to similarly-processed qual clauses, and may fail to detect valid
+ * matches without this. We must not use canonicalize_qual, however,
+ * since these aren't qual expressions.
+ */
+ result = (List *) eval_const_expressions(NULL, (Node *) result);
+
+ /* May as well fix opfuncids too */
+ fix_opfuncids((Node *) result);
+
+ /* Now save a copy of the completed tree in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
+ relation->rd_indexprs = copyObject(result);
+ MemoryContextSwitchTo(oldcxt);
+
+ return result;
+}
+
+/*
+ * RelationGetDummyIndexExpressions -- get dummy expressions for an index
+ *
+ * Return a list of dummy expressions (just Const nodes) with the same
+ * types/typmods/collations as the index's real expressions. This is
+ * useful in situations where we don't want to run any user-defined code.
+ */
+List *
+RelationGetDummyIndexExpressions(Relation relation)
+{
+ List *result;
+ Datum exprsDatum;
+ bool isnull;
+ char *exprsString;
+ List *rawExprs;
+ ListCell *lc;
+
+ /* Quick exit if there is nothing to do. */
+ if (relation->rd_indextuple == NULL ||
+ heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs, NULL))
+ return NIL;
+
+ /* Extract raw node tree(s) from index tuple. */
+ exprsDatum = heap_getattr(relation->rd_indextuple,
+ Anum_pg_index_indexprs,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ exprsString = TextDatumGetCString(exprsDatum);
+ rawExprs = (List *) stringToNode(exprsString);
+ pfree(exprsString);
+
+ /* Construct null Consts; the typlen and typbyval are arbitrary. */
+ result = NIL;
+ foreach(lc, rawExprs)
+ {
+ Node *rawExpr = (Node *) lfirst(lc);
+
+ result = lappend(result,
+ makeConst(exprType(rawExpr),
+ exprTypmod(rawExpr),
+ exprCollation(rawExpr),
+ 1,
+ (Datum) 0,
+ true,
+ true));
+ }
+
+ return result;
+}
+
+/*
+ * RelationGetIndexPredicate -- get the index predicate for an index
+ *
+ * We cache the result of transforming pg_index.indpred into an implicit-AND
+ * node tree (suitable for use in planning).
+ * If the rel is not an index or has no predicate, we return NIL.
+ * Otherwise, the returned tree is copied into the caller's memory context.
+ * (We don't want to return a pointer to the relcache copy, since it could
+ * disappear due to relcache invalidation.)
+ */
+List *
+RelationGetIndexPredicate(Relation relation)
+{
+ List *result;
+ Datum predDatum;
+ bool isnull;
+ char *predString;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the result. */
+ if (relation->rd_indpred)
+ return copyObject(relation->rd_indpred);
+
+ /* Quick exit if there is nothing to do. */
+ if (relation->rd_indextuple == NULL ||
+ heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred, NULL))
+ return NIL;
+
+ /*
+ * We build the tree we intend to return in the caller's context. After
+ * successfully completing the work, we copy it into the relcache entry.
+ * This avoids problems if we get some sort of error partway through.
+ */
+ predDatum = heap_getattr(relation->rd_indextuple,
+ Anum_pg_index_indpred,
+ GetPgIndexDescriptor(),
+ &isnull);
+ Assert(!isnull);
+ predString = TextDatumGetCString(predDatum);
+ result = (List *) stringToNode(predString);
+ pfree(predString);
+
+ /*
+ * Run the expression through const-simplification and canonicalization.
+ * This is not just an optimization, but is necessary, because the planner
+ * will be comparing it to similarly-processed qual clauses, and may fail
+ * to detect valid matches without this. This must match the processing
+ * done to qual clauses in preprocess_expression()! (We can skip the
+ * stuff involving subqueries, however, since we don't allow any in index
+ * predicates.)
+ */
+ result = (List *) eval_const_expressions(NULL, (Node *) result);
+
+ result = (List *) canonicalize_qual((Expr *) result, false);
+
+ /* Also convert to implicit-AND format */
+ result = make_ands_implicit((Expr *) result);
+
+ /* May as well fix opfuncids too */
+ fix_opfuncids((Node *) result);
+
+ /* Now save a copy of the completed tree in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
+ relation->rd_indpred = copyObject(result);
+ MemoryContextSwitchTo(oldcxt);
+
+ return result;
+}
+
+/*
+ * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
+ *
+ * The result has a bit set for each attribute used anywhere in the index
+ * definitions of all the indexes on this relation. (This includes not only
+ * simple index keys, but attributes used in expressions and partial-index
+ * predicates.)
+ *
+ * Depending on attrKind, a bitmap covering the attnums for all index columns,
+ * for all potential foreign key columns, or for all columns in the configured
+ * replica identity index is returned.
+ *
+ * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
+ * we can include system attributes (e.g., OID) in the bitmap representation.
+ *
+ * Caller had better hold at least RowExclusiveLock on the target relation
+ * to ensure it is safe (deadlock-free) for us to take locks on the relation's
+ * indexes. Note that since the introduction of CREATE INDEX CONCURRENTLY,
+ * that lock level doesn't guarantee a stable set of indexes, so we have to
+ * be prepared to retry here in case of a change in the set of indexes.
+ *
+ * The returned result is palloc'd in the caller's memory context and should
+ * be bms_free'd when not needed anymore.
+ */
+Bitmapset *
+RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
+{
+ Bitmapset *indexattrs; /* indexed columns */
+ Bitmapset *uindexattrs; /* columns in unique indexes */
+ Bitmapset *pkindexattrs; /* columns in the primary index */
+ Bitmapset *idindexattrs; /* columns in the replica identity */
+ List *indexoidlist;
+ List *newindexoidlist;
+ Oid relpkindex;
+ Oid relreplindex;
+ ListCell *l;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the result. */
+ if (relation->rd_indexattr != NULL)
+ {
+ switch (attrKind)
+ {
+ case INDEX_ATTR_BITMAP_ALL:
+ return bms_copy(relation->rd_indexattr);
+ case INDEX_ATTR_BITMAP_KEY:
+ return bms_copy(relation->rd_keyattr);
+ case INDEX_ATTR_BITMAP_PRIMARY_KEY:
+ return bms_copy(relation->rd_pkattr);
+ case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+ return bms_copy(relation->rd_idattr);
+ default:
+ elog(ERROR, "unknown attrKind %u", attrKind);
+ }
+ }
+
+ /* Fast path if definitely no indexes */
+ if (!RelationGetForm(relation)->relhasindex)
+ return NULL;
+
+ /*
+ * Get cached list of index OIDs. If we have to start over, we do so here.
+ */
+restart:
+ indexoidlist = RelationGetIndexList(relation);
+
+ /* Fall out if no indexes (but relhasindex was set) */
+ if (indexoidlist == NIL)
+ return NULL;
+
+ /*
+ * Copy the rd_pkindex and rd_replidindex values computed by
+ * RelationGetIndexList before proceeding. This is needed because a
+ * relcache flush could occur inside index_open below, resetting the
+ * fields managed by RelationGetIndexList. We need to do the work with
+ * stable values of these fields.
+ */
+ relpkindex = relation->rd_pkindex;
+ relreplindex = relation->rd_replidindex;
+
+ /*
+ * For each index, add referenced attributes to indexattrs.
+ *
+ * Note: we consider all indexes returned by RelationGetIndexList, even if
+ * they are not indisready or indisvalid. This is important because an
+ * index for which CREATE INDEX CONCURRENTLY has just started must be
+ * included in HOT-safety decisions (see README.HOT). If a DROP INDEX
+ * CONCURRENTLY is far enough along that we should ignore the index, it
+ * won't be returned at all by RelationGetIndexList.
+ */
+ indexattrs = NULL;
+ uindexattrs = NULL;
+ pkindexattrs = NULL;
+ idindexattrs = NULL;
+ foreach(l, indexoidlist)
+ {
+ Oid indexOid = lfirst_oid(l);
+ Relation indexDesc;
+ Datum datum;
+ bool isnull;
+ Node *indexExpressions;
+ Node *indexPredicate;
+ int i;
+ bool isKey; /* candidate key */
+ bool isPK; /* primary key */
+ bool isIDKey; /* replica identity index */
+
+ indexDesc = index_open(indexOid, AccessShareLock);
+
+ /*
+ * Extract index expressions and index predicate. Note: Don't use
+ * RelationGetIndexExpressions()/RelationGetIndexPredicate(), because
+ * those might run constant expressions evaluation, which needs a
+ * snapshot, which we might not have here. (Also, it's probably more
+ * sound to collect the bitmaps before any transformations that might
+ * eliminate columns, but the practical impact of this is limited.)
+ */
+
+ datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indexprs,
+ GetPgIndexDescriptor(), &isnull);
+ if (!isnull)
+ indexExpressions = stringToNode(TextDatumGetCString(datum));
+ else
+ indexExpressions = NULL;
+
+ datum = heap_getattr(indexDesc->rd_indextuple, Anum_pg_index_indpred,
+ GetPgIndexDescriptor(), &isnull);
+ if (!isnull)
+ indexPredicate = stringToNode(TextDatumGetCString(datum));
+ else
+ indexPredicate = NULL;
+
+ /* Can this index be referenced by a foreign key? */
+ isKey = indexDesc->rd_index->indisunique &&
+ indexExpressions == NULL &&
+ indexPredicate == NULL;
+
+ /* Is this a primary key? */
+ isPK = (indexOid == relpkindex);
+
+ /* Is this index the configured (or default) replica identity? */
+ isIDKey = (indexOid == relreplindex);
+
+ /* Collect simple attribute references */
+ for (i = 0; i < indexDesc->rd_index->indnatts; i++)
+ {
+ int attrnum = indexDesc->rd_index->indkey.values[i];
+
+ /*
+ * Since we have covering indexes with non-key columns, we must
+ * handle them accurately here. non-key columns must be added into
+ * indexattrs, since they are in index, and HOT-update shouldn't
+ * miss them. Obviously, non-key columns couldn't be referenced by
+ * foreign key or identity key. Hence we do not include them into
+ * uindexattrs, pkindexattrs and idindexattrs bitmaps.
+ */
+ if (attrnum != 0)
+ {
+ indexattrs = bms_add_member(indexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+
+ if (isKey && i < indexDesc->rd_index->indnkeyatts)
+ uindexattrs = bms_add_member(uindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+
+ if (isPK && i < indexDesc->rd_index->indnkeyatts)
+ pkindexattrs = bms_add_member(pkindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+
+ if (isIDKey && i < indexDesc->rd_index->indnkeyatts)
+ idindexattrs = bms_add_member(idindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+
+ /* Collect all attributes used in expressions, too */
+ pull_varattnos(indexExpressions, 1, &indexattrs);
+
+ /* Collect all attributes in the index predicate, too */
+ pull_varattnos(indexPredicate, 1, &indexattrs);
+
+ index_close(indexDesc, AccessShareLock);
+ }
+
+ /*
+ * During one of the index_opens in the above loop, we might have received
+ * a relcache flush event on this relcache entry, which might have been
+ * signaling a change in the rel's index list. If so, we'd better start
+ * over to ensure we deliver up-to-date attribute bitmaps.
+ */
+ newindexoidlist = RelationGetIndexList(relation);
+ if (equal(indexoidlist, newindexoidlist) &&
+ relpkindex == relation->rd_pkindex &&
+ relreplindex == relation->rd_replidindex)
+ {
+ /* Still the same index set, so proceed */
+ list_free(newindexoidlist);
+ list_free(indexoidlist);
+ }
+ else
+ {
+ /* Gotta do it over ... might as well not leak memory */
+ list_free(newindexoidlist);
+ list_free(indexoidlist);
+ bms_free(uindexattrs);
+ bms_free(pkindexattrs);
+ bms_free(idindexattrs);
+ bms_free(indexattrs);
+
+ goto restart;
+ }
+
+ /* Don't leak the old values of these bitmaps, if any */
+ bms_free(relation->rd_indexattr);
+ relation->rd_indexattr = NULL;
+ bms_free(relation->rd_keyattr);
+ relation->rd_keyattr = NULL;
+ bms_free(relation->rd_pkattr);
+ relation->rd_pkattr = NULL;
+ bms_free(relation->rd_idattr);
+ relation->rd_idattr = NULL;
+
+ /*
+ * Now save copies of the bitmaps in the relcache entry. We intentionally
+ * set rd_indexattr last, because that's the one that signals validity of
+ * the values; if we run out of memory before making that copy, we won't
+ * leave the relcache entry looking like the other ones are valid but
+ * empty.
+ */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ relation->rd_keyattr = bms_copy(uindexattrs);
+ relation->rd_pkattr = bms_copy(pkindexattrs);
+ relation->rd_idattr = bms_copy(idindexattrs);
+ relation->rd_indexattr = bms_copy(indexattrs);
+ MemoryContextSwitchTo(oldcxt);
+
+ /* We return our original working copy for caller to play with */
+ switch (attrKind)
+ {
+ case INDEX_ATTR_BITMAP_ALL:
+ return indexattrs;
+ case INDEX_ATTR_BITMAP_KEY:
+ return uindexattrs;
+ case INDEX_ATTR_BITMAP_PRIMARY_KEY:
+ return pkindexattrs;
+ case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+ return idindexattrs;
+ default:
+ elog(ERROR, "unknown attrKind %u", attrKind);
+ return NULL;
+ }
+}
+
+/*
+ * RelationGetIdentityKeyBitmap -- get a bitmap of replica identity attribute
+ * numbers
+ *
+ * A bitmap of index attribute numbers for the configured replica identity
+ * index is returned.
+ *
+ * See also comments of RelationGetIndexAttrBitmap().
+ *
+ * This is a special purpose function used during logical replication. Here,
+ * unlike RelationGetIndexAttrBitmap(), we don't acquire a lock on the required
+ * index as we build the cache entry using a historic snapshot and all the
+ * later changes are absorbed while decoding WAL. Due to this reason, we don't
+ * need to retry here in case of a change in the set of indexes.
+ */
+Bitmapset *
+RelationGetIdentityKeyBitmap(Relation relation)
+{
+ Bitmapset *idindexattrs = NULL; /* columns in the replica identity */
+ Relation indexDesc;
+ int i;
+ Oid replidindex;
+ MemoryContext oldcxt;
+
+ /* Quick exit if we already computed the result */
+ if (relation->rd_idattr != NULL)
+ return bms_copy(relation->rd_idattr);
+
+ /* Fast path if definitely no indexes */
+ if (!RelationGetForm(relation)->relhasindex)
+ return NULL;
+
+ /* Historic snapshot must be set. */
+ Assert(HistoricSnapshotActive());
+
+ replidindex = RelationGetReplicaIndex(relation);
+
+ /* Fall out if there is no replica identity index */
+ if (!OidIsValid(replidindex))
+ return NULL;
+
+ /* Look up the description for the replica identity index */
+ indexDesc = RelationIdGetRelation(replidindex);
+
+ if (!RelationIsValid(indexDesc))
+ elog(ERROR, "could not open relation with OID %u",
+ relation->rd_replidindex);
+
+ /* Add referenced attributes to idindexattrs */
+ for (i = 0; i < indexDesc->rd_index->indnatts; i++)
+ {
+ int attrnum = indexDesc->rd_index->indkey.values[i];
+
+ /*
+ * We don't include non-key columns into idindexattrs bitmaps. See
+ * RelationGetIndexAttrBitmap.
+ */
+ if (attrnum != 0)
+ {
+ if (i < indexDesc->rd_index->indnkeyatts)
+ idindexattrs = bms_add_member(idindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+
+ RelationClose(indexDesc);
+
+ /* Don't leak the old values of these bitmaps, if any */
+ bms_free(relation->rd_idattr);
+ relation->rd_idattr = NULL;
+
+ /* Now save copy of the bitmap in the relcache entry */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ relation->rd_idattr = bms_copy(idindexattrs);
+ MemoryContextSwitchTo(oldcxt);
+
+ /* We return our original working copy for caller to play with */
+ return idindexattrs;
+}
+
+/*
+ * RelationGetExclusionInfo -- get info about index's exclusion constraint
+ *
+ * This should be called only for an index that is known to have an
+ * associated exclusion constraint. It returns arrays (palloc'd in caller's
+ * context) of the exclusion operator OIDs, their underlying functions'
+ * OIDs, and their strategy numbers in the index's opclasses. We cache
+ * all this information since it requires a fair amount of work to get.
+ */
+void
+RelationGetExclusionInfo(Relation indexRelation,
+ Oid **operators,
+ Oid **procs,
+ uint16 **strategies)
+{
+ int indnkeyatts;
+ Oid *ops;
+ Oid *funcs;
+ uint16 *strats;
+ Relation conrel;
+ SysScanDesc conscan;
+ ScanKeyData skey[1];
+ HeapTuple htup;
+ bool found;
+ MemoryContext oldcxt;
+ int i;
+
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
+ /* Allocate result space in caller context */
+ *operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ *procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ *strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
+
+ /* Quick exit if we have the data cached already */
+ if (indexRelation->rd_exclstrats != NULL)
+ {
+ memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
+ memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
+ memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
+ return;
+ }
+
+ /*
+ * Search pg_constraint for the constraint associated with the index. To
+ * make this not too painfully slow, we use the index on conrelid; that
+ * will hold the parent relation's OID not the index's own OID.
+ *
+ * Note: if we wanted to rely on the constraint name matching the index's
+ * name, we could just do a direct lookup using pg_constraint's unique
+ * index. For the moment it doesn't seem worth requiring that.
+ */
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(indexRelation->rd_index->indrelid));
+
+ conrel = table_open(ConstraintRelationId, AccessShareLock);
+ conscan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, skey);
+ found = false;
+
+ while (HeapTupleIsValid(htup = systable_getnext(conscan)))
+ {
+ Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
+ Datum val;
+ bool isnull;
+ ArrayType *arr;
+ int nelem;
+
+ /* We want the exclusion constraint owning the index */
+ if (conform->contype != CONSTRAINT_EXCLUSION ||
+ conform->conindid != RelationGetRelid(indexRelation))
+ continue;
+
+ /* There should be only one */
+ if (found)
+ elog(ERROR, "unexpected exclusion constraint record found for rel %s",
+ RelationGetRelationName(indexRelation));
+ found = true;
+
+ /* Extract the operator OIDS from conexclop */
+ val = fastgetattr(htup,
+ Anum_pg_constraint_conexclop,
+ conrel->rd_att, &isnull);
+ if (isnull)
+ elog(ERROR, "null conexclop for rel %s",
+ RelationGetRelationName(indexRelation));
+
+ arr = DatumGetArrayTypeP(val); /* ensure not toasted */
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem != indnkeyatts ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conexclop is not a 1-D Oid array");
+
+ memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
+ }
+
+ systable_endscan(conscan);
+ table_close(conrel, AccessShareLock);
+
+ if (!found)
+ elog(ERROR, "exclusion constraint record missing for rel %s",
+ RelationGetRelationName(indexRelation));
+
+ /* We need the func OIDs and strategy numbers too */
+ for (i = 0; i < indnkeyatts; i++)
+ {
+ funcs[i] = get_opcode(ops[i]);
+ strats[i] = get_op_opfamily_strategy(ops[i],
+ indexRelation->rd_opfamily[i]);
+ /* shouldn't fail, since it was checked at index creation */
+ if (strats[i] == InvalidStrategy)
+ elog(ERROR, "could not find strategy for operator %u in family %u",
+ ops[i], indexRelation->rd_opfamily[i]);
+ }
+
+ /* Save a copy of the results in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
+ indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
+ memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
+ memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
+ memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Get the publication information for the given relation.
+ *
+ * Traverse all the publications which the relation is in to get the
+ * publication actions and validate the row filter expressions for such
+ * publications if any. We consider the row filter expression as invalid if it
+ * references any column which is not part of REPLICA IDENTITY.
+ *
+ * To avoid fetching the publication information repeatedly, we cache the
+ * publication actions and row filter validation information.
+ */
+void
+RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
+{
+ List *puboids;
+ ListCell *lc;
+ MemoryContext oldcxt;
+ Oid schemaid;
+ List *ancestors = NIL;
+ Oid relid = RelationGetRelid(relation);
+
+ /*
+ * If not publishable, it publishes no actions. (pgoutput_change() will
+ * ignore it.)
+ */
+ if (!is_publishable_relation(relation))
+ {
+ memset(pubdesc, 0, sizeof(PublicationDesc));
+ pubdesc->rf_valid_for_update = true;
+ pubdesc->rf_valid_for_delete = true;
+ pubdesc->cols_valid_for_update = true;
+ pubdesc->cols_valid_for_delete = true;
+ return;
+ }
+
+ if (relation->rd_pubdesc)
+ {
+ memcpy(pubdesc, relation->rd_pubdesc, sizeof(PublicationDesc));
+ return;
+ }
+
+ memset(pubdesc, 0, sizeof(PublicationDesc));
+ pubdesc->rf_valid_for_update = true;
+ pubdesc->rf_valid_for_delete = true;
+ pubdesc->cols_valid_for_update = true;
+ pubdesc->cols_valid_for_delete = true;
+
+ /* Fetch the publication membership info. */
+ puboids = GetRelationPublications(relid);
+ schemaid = RelationGetNamespace(relation);
+ puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid));
+
+ if (relation->rd_rel->relispartition)
+ {
+ /* Add publications that the ancestors are in too. */
+ ancestors = get_partition_ancestors(relid);
+
+ foreach(lc, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc);
+
+ puboids = list_concat_unique_oid(puboids,
+ GetRelationPublications(ancestor));
+ schemaid = get_rel_namespace(ancestor);
+ puboids = list_concat_unique_oid(puboids,
+ GetSchemaPublications(schemaid));
+ }
+ }
+ puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
+
+ foreach(lc, puboids)
+ {
+ Oid pubid = lfirst_oid(lc);
+ HeapTuple tup;
+ Form_pg_publication pubform;
+
+ tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication %u", pubid);
+
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ pubdesc->pubactions.pubinsert |= pubform->pubinsert;
+ pubdesc->pubactions.pubupdate |= pubform->pubupdate;
+ pubdesc->pubactions.pubdelete |= pubform->pubdelete;
+ pubdesc->pubactions.pubtruncate |= pubform->pubtruncate;
+
+ /*
+ * Check if all columns referenced in the filter expression are part
+ * of the REPLICA IDENTITY index or not.
+ *
+ * If the publication is FOR ALL TABLES then it means the table has no
+ * row filters and we can skip the validation.
+ */
+ if (!pubform->puballtables &&
+ (pubform->pubupdate || pubform->pubdelete) &&
+ pub_rf_contains_invalid_column(pubid, relation, ancestors,
+ pubform->pubviaroot))
+ {
+ if (pubform->pubupdate)
+ pubdesc->rf_valid_for_update = false;
+ if (pubform->pubdelete)
+ pubdesc->rf_valid_for_delete = false;
+ }
+
+ /*
+ * Check if all columns are part of the REPLICA IDENTITY index or not.
+ *
+ * If the publication is FOR ALL TABLES then it means the table has no
+ * column list and we can skip the validation.
+ */
+ if (!pubform->puballtables &&
+ (pubform->pubupdate || pubform->pubdelete) &&
+ pub_collist_contains_invalid_column(pubid, relation, ancestors,
+ pubform->pubviaroot))
+ {
+ if (pubform->pubupdate)
+ pubdesc->cols_valid_for_update = false;
+ if (pubform->pubdelete)
+ pubdesc->cols_valid_for_delete = false;
+ }
+
+ ReleaseSysCache(tup);
+
+ /*
+ * If we know everything is replicated and the row filter is invalid
+ * for update and delete, there is no point to check for other
+ * publications.
+ */
+ if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
+ pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
+ !pubdesc->rf_valid_for_update && !pubdesc->rf_valid_for_delete)
+ break;
+
+ /*
+ * If we know everything is replicated and the column list is invalid
+ * for update and delete, there is no point to check for other
+ * publications.
+ */
+ if (pubdesc->pubactions.pubinsert && pubdesc->pubactions.pubupdate &&
+ pubdesc->pubactions.pubdelete && pubdesc->pubactions.pubtruncate &&
+ !pubdesc->cols_valid_for_update && !pubdesc->cols_valid_for_delete)
+ break;
+ }
+
+ if (relation->rd_pubdesc)
+ {
+ pfree(relation->rd_pubdesc);
+ relation->rd_pubdesc = NULL;
+ }
+
+ /* Now save copy of the descriptor in the relcache entry. */
+ oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+ relation->rd_pubdesc = palloc(sizeof(PublicationDesc));
+ memcpy(relation->rd_pubdesc, pubdesc, sizeof(PublicationDesc));
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * RelationGetIndexRawAttOptions -- get AM/opclass-specific options for the index
+ */
+Datum *
+RelationGetIndexRawAttOptions(Relation indexrel)
+{
+ Oid indexrelid = RelationGetRelid(indexrel);
+ int16 natts = RelationGetNumberOfAttributes(indexrel);
+ Datum *options = NULL;
+ int16 attnum;
+
+ for (attnum = 1; attnum <= natts; attnum++)
+ {
+ if (indexrel->rd_indam->amoptsprocnum == 0)
+ continue;
+
+ if (!OidIsValid(index_getprocid(indexrel, attnum,
+ indexrel->rd_indam->amoptsprocnum)))
+ continue;
+
+ if (!options)
+ options = palloc0(sizeof(Datum) * natts);
+
+ options[attnum - 1] = get_attoptions(indexrelid, attnum);
+ }
+
+ return options;
+}
+
+static bytea **
+CopyIndexAttOptions(bytea **srcopts, int natts)
+{
+ bytea **opts = palloc(sizeof(*opts) * natts);
+
+ for (int i = 0; i < natts; i++)
+ {
+ bytea *opt = srcopts[i];
+
+ opts[i] = !opt ? NULL : (bytea *)
+ DatumGetPointer(datumCopy(PointerGetDatum(opt), false, -1));
+ }
+
+ return opts;
+}
+
+/*
+ * RelationGetIndexAttOptions
+ * get AM/opclass-specific options for an index parsed into a binary form
+ */
+bytea **
+RelationGetIndexAttOptions(Relation relation, bool copy)
+{
+ MemoryContext oldcxt;
+ bytea **opts = relation->rd_opcoptions;
+ Oid relid = RelationGetRelid(relation);
+ int natts = RelationGetNumberOfAttributes(relation); /* XXX
+ * IndexRelationGetNumberOfKeyAttributes */
+ int i;
+
+ /* Try to copy cached options. */
+ if (opts)
+ return copy ? CopyIndexAttOptions(opts, natts) : opts;
+
+ /* Get and parse opclass options. */
+ opts = palloc0(sizeof(*opts) * natts);
+
+ for (i = 0; i < natts; i++)
+ {
+ if (criticalRelcachesBuilt && relid != AttributeRelidNumIndexId)
+ {
+ Datum attoptions = get_attoptions(relid, i + 1);
+
+ opts[i] = index_opclass_options(relation, i + 1, attoptions, false);
+
+ if (attoptions != (Datum) 0)
+ pfree(DatumGetPointer(attoptions));
+ }
+ }
+
+ /* Copy parsed options to the cache. */
+ oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
+ relation->rd_opcoptions = CopyIndexAttOptions(opts, natts);
+ MemoryContextSwitchTo(oldcxt);
+
+ if (copy)
+ return opts;
+
+ for (i = 0; i < natts; i++)
+ {
+ if (opts[i])
+ pfree(opts[i]);
+ }
+
+ pfree(opts);
+
+ return relation->rd_opcoptions;
+}
+
+/*
+ * Routines to support ereport() reports of relation-related errors
+ *
+ * These could have been put into elog.c, but it seems like a module layering
+ * violation to have elog.c calling relcache or syscache stuff --- and we
+ * definitely don't want elog.h including rel.h. So we put them here.
+ */
+
+/*
+ * errtable --- stores schema_name and table_name of a table
+ * within the current errordata.
+ */
+int
+errtable(Relation rel)
+{
+ err_generic_string(PG_DIAG_SCHEMA_NAME,
+ get_namespace_name(RelationGetNamespace(rel)));
+ err_generic_string(PG_DIAG_TABLE_NAME, RelationGetRelationName(rel));
+
+ return 0; /* return value does not matter */
+}
+
+/*
+ * errtablecol --- stores schema_name, table_name and column_name
+ * of a table column within the current errordata.
+ *
+ * The column is specified by attribute number --- for most callers, this is
+ * easier and less error-prone than getting the column name for themselves.
+ */
+int
+errtablecol(Relation rel, int attnum)
+{
+ TupleDesc reldesc = RelationGetDescr(rel);
+ const char *colname;
+
+ /* Use reldesc if it's a user attribute, else consult the catalogs */
+ if (attnum > 0 && attnum <= reldesc->natts)
+ colname = NameStr(TupleDescAttr(reldesc, attnum - 1)->attname);
+ else
+ colname = get_attname(RelationGetRelid(rel), attnum, false);
+
+ return errtablecolname(rel, colname);
+}
+
+/*
+ * errtablecolname --- stores schema_name, table_name and column_name
+ * of a table column within the current errordata, where the column name is
+ * given directly rather than extracted from the relation's catalog data.
+ *
+ * Don't use this directly unless errtablecol() is inconvenient for some
+ * reason. This might possibly be needed during intermediate states in ALTER
+ * TABLE, for instance.
+ */
+int
+errtablecolname(Relation rel, const char *colname)
+{
+ errtable(rel);
+ err_generic_string(PG_DIAG_COLUMN_NAME, colname);
+
+ return 0; /* return value does not matter */
+}
+
+/*
+ * errtableconstraint --- stores schema_name, table_name and constraint_name
+ * of a table-related constraint within the current errordata.
+ */
+int
+errtableconstraint(Relation rel, const char *conname)
+{
+ errtable(rel);
+ err_generic_string(PG_DIAG_CONSTRAINT_NAME, conname);
+
+ return 0; /* return value does not matter */
+}
+
+
+/*
+ * load_relcache_init_file, write_relcache_init_file
+ *
+ * In late 1992, we started regularly having databases with more than
+ * a thousand classes in them. With this number of classes, it became
+ * critical to do indexed lookups on the system catalogs.
+ *
+ * Bootstrapping these lookups is very hard. We want to be able to
+ * use an index on pg_attribute, for example, but in order to do so,
+ * we must have read pg_attribute for the attributes in the index,
+ * which implies that we need to use the index.
+ *
+ * In order to get around the problem, we do the following:
+ *
+ * + When the database system is initialized (at initdb time), we
+ * don't use indexes. We do sequential scans.
+ *
+ * + When the backend is started up in normal mode, we load an image
+ * of the appropriate relation descriptors, in internal format,
+ * from an initialization file in the data/base/... directory.
+ *
+ * + If the initialization file isn't there, then we create the
+ * relation descriptors using sequential scans and write 'em to
+ * the initialization file for use by subsequent backends.
+ *
+ * As of Postgres 9.0, there is one local initialization file in each
+ * database, plus one shared initialization file for shared catalogs.
+ *
+ * We could dispense with the initialization files and just build the
+ * critical reldescs the hard way on every backend startup, but that
+ * slows down backend startup noticeably.
+ *
+ * We can in fact go further, and save more relcache entries than
+ * just the ones that are absolutely critical; this allows us to speed
+ * up backend startup by not having to build such entries the hard way.
+ * Presently, all the catalog and index entries that are referred to
+ * by catcaches are stored in the initialization files.
+ *
+ * The same mechanism that detects when catcache and relcache entries
+ * need to be invalidated (due to catalog updates) also arranges to
+ * unlink the initialization files when the contents may be out of date.
+ * The files will then be rebuilt during the next backend startup.
+ */
+
+/*
+ * load_relcache_init_file -- attempt to load cache from the shared
+ * or local cache init file
+ *
+ * If successful, return true and set criticalRelcachesBuilt or
+ * criticalSharedRelcachesBuilt to true.
+ * If not successful, return false.
+ *
+ * NOTE: we assume we are already switched into CacheMemoryContext.
+ */
+static bool
+load_relcache_init_file(bool shared)
+{
+ FILE *fp;
+ char initfilename[MAXPGPATH];
+ Relation *rels;
+ int relno,
+ num_rels,
+ max_rels,
+ nailed_rels,
+ nailed_indexes,
+ magic;
+ int i;
+
+ if (shared)
+ snprintf(initfilename, sizeof(initfilename), "global/%s",
+ RELCACHE_INIT_FILENAME);
+ else
+ snprintf(initfilename, sizeof(initfilename), "%s/%s",
+ DatabasePath, RELCACHE_INIT_FILENAME);
+
+ fp = AllocateFile(initfilename, PG_BINARY_R);
+ if (fp == NULL)
+ return false;
+
+ /*
+ * Read the index relcache entries from the file. Note we will not enter
+ * any of them into the cache if the read fails partway through; this
+ * helps to guard against broken init files.
+ */
+ max_rels = 100;
+ rels = (Relation *) palloc(max_rels * sizeof(Relation));
+ num_rels = 0;
+ nailed_rels = nailed_indexes = 0;
+
+ /* check for correct magic number (compatible version) */
+ if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
+ goto read_failed;
+ if (magic != RELCACHE_INIT_FILEMAGIC)
+ goto read_failed;
+
+ for (relno = 0;; relno++)
+ {
+ Size len;
+ size_t nread;
+ Relation rel;
+ Form_pg_class relform;
+ bool has_not_null;
+
+ /* first read the relation descriptor length */
+ nread = fread(&len, 1, sizeof(len), fp);
+ if (nread != sizeof(len))
+ {
+ if (nread == 0)
+ break; /* end of file */
+ goto read_failed;
+ }
+
+ /* safety check for incompatible relcache layout */
+ if (len != sizeof(RelationData))
+ goto read_failed;
+
+ /* allocate another relcache header */
+ if (num_rels >= max_rels)
+ {
+ max_rels *= 2;
+ rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
+ }
+
+ rel = rels[num_rels++] = (Relation) palloc(len);
+
+ /* then, read the Relation structure */
+ if (fread(rel, 1, len, fp) != len)
+ goto read_failed;
+
+ /* next read the relation tuple form */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ relform = (Form_pg_class) palloc(len);
+ if (fread(relform, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_rel = relform;
+
+ /* initialize attribute tuple forms */
+ rel->rd_att = CreateTemplateTupleDesc(relform->relnatts);
+ rel->rd_att->tdrefcount = 1; /* mark as refcounted */
+
+ rel->rd_att->tdtypeid = relform->reltype ? relform->reltype : RECORDOID;
+ rel->rd_att->tdtypmod = -1; /* just to be sure */
+
+ /* next read all the attribute tuple form data entries */
+ has_not_null = false;
+ for (i = 0; i < relform->relnatts; i++)
+ {
+ Form_pg_attribute attr = TupleDescAttr(rel->rd_att, i);
+
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+ if (len != ATTRIBUTE_FIXED_PART_SIZE)
+ goto read_failed;
+ if (fread(attr, 1, len, fp) != len)
+ goto read_failed;
+
+ has_not_null |= attr->attnotnull;
+ }
+
+ /* next read the access method specific field */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+ if (len > 0)
+ {
+ rel->rd_options = palloc(len);
+ if (fread(rel->rd_options, 1, len, fp) != len)
+ goto read_failed;
+ if (len != VARSIZE(rel->rd_options))
+ goto read_failed; /* sanity check */
+ }
+ else
+ {
+ rel->rd_options = NULL;
+ }
+
+ /* mark not-null status */
+ if (has_not_null)
+ {
+ TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
+
+ constr->has_not_null = true;
+ rel->rd_att->constr = constr;
+ }
+
+ /*
+ * If it's an index, there's more to do. Note we explicitly ignore
+ * partitioned indexes here.
+ */
+ if (rel->rd_rel->relkind == RELKIND_INDEX)
+ {
+ MemoryContext indexcxt;
+ Oid *opfamily;
+ Oid *opcintype;
+ RegProcedure *support;
+ int nsupport;
+ int16 *indoption;
+ Oid *indcollation;
+
+ /* Count nailed indexes to ensure we have 'em all */
+ if (rel->rd_isnailed)
+ nailed_indexes++;
+
+ /* next, read the pg_index tuple */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ rel->rd_indextuple = (HeapTuple) palloc(len);
+ if (fread(rel->rd_indextuple, 1, len, fp) != len)
+ goto read_failed;
+
+ /* Fix up internal pointers in the tuple -- see heap_copytuple */
+ rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
+ rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
+
+ /*
+ * prepare index info context --- parameters should match
+ * RelationInitIndexAccessInfo
+ */
+ indexcxt = AllocSetContextCreate(CacheMemoryContext,
+ "index info",
+ ALLOCSET_SMALL_SIZES);
+ rel->rd_indexcxt = indexcxt;
+ MemoryContextCopyAndSetIdentifier(indexcxt,
+ RelationGetRelationName(rel));
+
+ /*
+ * Now we can fetch the index AM's API struct. (We can't store
+ * that in the init file, since it contains function pointers that
+ * might vary across server executions. Fortunately, it should be
+ * safe to call the amhandler even while bootstrapping indexes.)
+ */
+ InitIndexAmRoutine(rel);
+
+ /* next, read the vector of opfamily OIDs */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
+ if (fread(opfamily, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_opfamily = opfamily;
+
+ /* next, read the vector of opcintype OIDs */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
+ if (fread(opcintype, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_opcintype = opcintype;
+
+ /* next, read the vector of support procedure OIDs */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+ support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
+ if (fread(support, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_support = support;
+
+ /* next, read the vector of collation OIDs */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ indcollation = (Oid *) MemoryContextAlloc(indexcxt, len);
+ if (fread(indcollation, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_indcollation = indcollation;
+
+ /* finally, read the vector of indoption values */
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
+ if (fread(indoption, 1, len, fp) != len)
+ goto read_failed;
+
+ rel->rd_indoption = indoption;
+
+ /* finally, read the vector of opcoptions values */
+ rel->rd_opcoptions = (bytea **)
+ MemoryContextAllocZero(indexcxt, sizeof(*rel->rd_opcoptions) * relform->relnatts);
+
+ for (i = 0; i < relform->relnatts; i++)
+ {
+ if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
+ goto read_failed;
+
+ if (len > 0)
+ {
+ rel->rd_opcoptions[i] = (bytea *) MemoryContextAlloc(indexcxt, len);
+ if (fread(rel->rd_opcoptions[i], 1, len, fp) != len)
+ goto read_failed;
+ }
+ }
+
+ /* set up zeroed fmgr-info vector */
+ nsupport = relform->relnatts * rel->rd_indam->amsupport;
+ rel->rd_supportinfo = (FmgrInfo *)
+ MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
+ }
+ else
+ {
+ /* Count nailed rels to ensure we have 'em all */
+ if (rel->rd_isnailed)
+ nailed_rels++;
+
+ /* Load table AM data */
+ if (RELKIND_HAS_TABLE_AM(rel->rd_rel->relkind) || rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ RelationInitTableAccessMethod(rel);
+
+ Assert(rel->rd_index == NULL);
+ Assert(rel->rd_indextuple == NULL);
+ Assert(rel->rd_indexcxt == NULL);
+ Assert(rel->rd_indam == NULL);
+ Assert(rel->rd_opfamily == NULL);
+ Assert(rel->rd_opcintype == NULL);
+ Assert(rel->rd_support == NULL);
+ Assert(rel->rd_supportinfo == NULL);
+ Assert(rel->rd_indoption == NULL);
+ Assert(rel->rd_indcollation == NULL);
+ Assert(rel->rd_opcoptions == NULL);
+ }
+
+ /*
+ * Rules and triggers are not saved (mainly because the internal
+ * format is complex and subject to change). They must be rebuilt if
+ * needed by RelationCacheInitializePhase3. This is not expected to
+ * be a big performance hit since few system catalogs have such. Ditto
+ * for RLS policy data, partition info, index expressions, predicates,
+ * exclusion info, and FDW info.
+ */
+ rel->rd_rules = NULL;
+ rel->rd_rulescxt = NULL;
+ rel->trigdesc = NULL;
+ rel->rd_rsdesc = NULL;
+ rel->rd_partkey = NULL;
+ rel->rd_partkeycxt = NULL;
+ rel->rd_partdesc = NULL;
+ rel->rd_partdesc_nodetached = NULL;
+ rel->rd_partdesc_nodetached_xmin = InvalidTransactionId;
+ rel->rd_pdcxt = NULL;
+ rel->rd_pddcxt = NULL;
+ rel->rd_partcheck = NIL;
+ rel->rd_partcheckvalid = false;
+ rel->rd_partcheckcxt = NULL;
+ rel->rd_indexprs = NIL;
+ rel->rd_indpred = NIL;
+ rel->rd_exclops = NULL;
+ rel->rd_exclprocs = NULL;
+ rel->rd_exclstrats = NULL;
+ rel->rd_fdwroutine = NULL;
+
+ /*
+ * Reset transient-state fields in the relcache entry
+ */
+ rel->rd_smgr = NULL;
+ if (rel->rd_isnailed)
+ rel->rd_refcnt = 1;
+ else
+ rel->rd_refcnt = 0;
+ rel->rd_indexvalid = false;
+ rel->rd_indexlist = NIL;
+ rel->rd_pkindex = InvalidOid;
+ rel->rd_replidindex = InvalidOid;
+ rel->rd_indexattr = NULL;
+ rel->rd_keyattr = NULL;
+ rel->rd_pkattr = NULL;
+ rel->rd_idattr = NULL;
+ rel->rd_pubdesc = NULL;
+ rel->rd_statvalid = false;
+ rel->rd_statlist = NIL;
+ rel->rd_fkeyvalid = false;
+ rel->rd_fkeylist = NIL;
+ rel->rd_createSubid = InvalidSubTransactionId;
+ rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
+ rel->rd_firstRelfilenodeSubid = InvalidSubTransactionId;
+ rel->rd_droppedSubid = InvalidSubTransactionId;
+ rel->rd_amcache = NULL;
+ MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
+
+ /*
+ * Recompute lock and physical addressing info. This is needed in
+ * case the pg_internal.init file was copied from some other database
+ * by CREATE DATABASE.
+ */
+ RelationInitLockInfo(rel);
+ RelationInitPhysicalAddr(rel);
+ }
+
+ /*
+ * We reached the end of the init file without apparent problem. Did we
+ * get the right number of nailed items? This is a useful crosscheck in
+ * case the set of critical rels or indexes changes. However, that should
+ * not happen in a normally-running system, so let's bleat if it does.
+ *
+ * For the shared init file, we're called before client authentication is
+ * done, which means that elog(WARNING) will go only to the postmaster
+ * log, where it's easily missed. To ensure that developers notice bad
+ * values of NUM_CRITICAL_SHARED_RELS/NUM_CRITICAL_SHARED_INDEXES, we put
+ * an Assert(false) there.
+ */
+ if (shared)
+ {
+ if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
+ nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
+ {
+ elog(WARNING, "found %d nailed shared rels and %d nailed shared indexes in init file, but expected %d and %d respectively",
+ nailed_rels, nailed_indexes,
+ NUM_CRITICAL_SHARED_RELS, NUM_CRITICAL_SHARED_INDEXES);
+ /* Make sure we get developers' attention about this */
+ Assert(false);
+ /* In production builds, recover by bootstrapping the relcache */
+ goto read_failed;
+ }
+ }
+ else
+ {
+ if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
+ nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
+ {
+ elog(WARNING, "found %d nailed rels and %d nailed indexes in init file, but expected %d and %d respectively",
+ nailed_rels, nailed_indexes,
+ NUM_CRITICAL_LOCAL_RELS, NUM_CRITICAL_LOCAL_INDEXES);
+ /* We don't need an Assert() in this case */
+ goto read_failed;
+ }
+ }
+
+ /*
+ * OK, all appears well.
+ *
+ * Now insert all the new relcache entries into the cache.
+ */
+ for (relno = 0; relno < num_rels; relno++)
+ {
+ RelationCacheInsert(rels[relno], false);
+ }
+
+ pfree(rels);
+ FreeFile(fp);
+
+ if (shared)
+ criticalSharedRelcachesBuilt = true;
+ else
+ criticalRelcachesBuilt = true;
+ return true;
+
+ /*
+ * init file is broken, so do it the hard way. We don't bother trying to
+ * free the clutter we just allocated; it's not in the relcache so it
+ * won't hurt.
+ */
+read_failed:
+ pfree(rels);
+ FreeFile(fp);
+
+ return false;
+}
+
+/*
+ * Write out a new initialization file with the current contents
+ * of the relcache (either shared rels or local rels, as indicated).
+ */
+static void
+write_relcache_init_file(bool shared)
+{
+ FILE *fp;
+ char tempfilename[MAXPGPATH];
+ char finalfilename[MAXPGPATH];
+ int magic;
+ HASH_SEQ_STATUS status;
+ RelIdCacheEnt *idhentry;
+ int i;
+
+ /*
+ * If we have already received any relcache inval events, there's no
+ * chance of succeeding so we may as well skip the whole thing.
+ */
+ if (relcacheInvalsReceived != 0L)
+ return;
+
+ /*
+ * We must write a temporary file and rename it into place. Otherwise,
+ * another backend starting at about the same time might crash trying to
+ * read the partially-complete file.
+ */
+ if (shared)
+ {
+ snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
+ RELCACHE_INIT_FILENAME, MyProcPid);
+ snprintf(finalfilename, sizeof(finalfilename), "global/%s",
+ RELCACHE_INIT_FILENAME);
+ }
+ else
+ {
+ snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
+ DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
+ snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
+ DatabasePath, RELCACHE_INIT_FILENAME);
+ }
+
+ unlink(tempfilename); /* in case it exists w/wrong permissions */
+
+ fp = AllocateFile(tempfilename, PG_BINARY_W);
+ if (fp == NULL)
+ {
+ /*
+ * We used to consider this a fatal error, but we might as well
+ * continue with backend startup ...
+ */
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not create relation-cache initialization file \"%s\": %m",
+ tempfilename),
+ errdetail("Continuing anyway, but there's something wrong.")));
+ return;
+ }
+
+ /*
+ * Write a magic number to serve as a file version identifier. We can
+ * change the magic number whenever the relcache layout changes.
+ */
+ magic = RELCACHE_INIT_FILEMAGIC;
+ if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
+ elog(FATAL, "could not write init file");
+
+ /*
+ * Write all the appropriate reldescs (in no particular order).
+ */
+ hash_seq_init(&status, RelationIdCache);
+
+ while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
+ {
+ Relation rel = idhentry->reldesc;
+ Form_pg_class relform = rel->rd_rel;
+
+ /* ignore if not correct group */
+ if (relform->relisshared != shared)
+ continue;
+
+ /*
+ * Ignore if not supposed to be in init file. We can allow any shared
+ * relation that's been loaded so far to be in the shared init file,
+ * but unshared relations must be ones that should be in the local
+ * file per RelationIdIsInInitFile. (Note: if you want to change the
+ * criterion for rels to be kept in the init file, see also inval.c.
+ * The reason for filtering here is to be sure that we don't put
+ * anything into the local init file for which a relcache inval would
+ * not cause invalidation of that init file.)
+ */
+ if (!shared && !RelationIdIsInInitFile(RelationGetRelid(rel)))
+ {
+ /* Nailed rels had better get stored. */
+ Assert(!rel->rd_isnailed);
+ continue;
+ }
+
+ /* first write the relcache entry proper */
+ write_item(rel, sizeof(RelationData), fp);
+
+ /* next write the relation tuple form */
+ write_item(relform, CLASS_TUPLE_SIZE, fp);
+
+ /* next, do all the attribute tuple form data entries */
+ for (i = 0; i < relform->relnatts; i++)
+ {
+ write_item(TupleDescAttr(rel->rd_att, i),
+ ATTRIBUTE_FIXED_PART_SIZE, fp);
+ }
+
+ /* next, do the access method specific field */
+ write_item(rel->rd_options,
+ (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
+ fp);
+
+ /*
+ * If it's an index, there's more to do. Note we explicitly ignore
+ * partitioned indexes here.
+ */
+ if (rel->rd_rel->relkind == RELKIND_INDEX)
+ {
+ /* write the pg_index tuple */
+ /* we assume this was created by heap_copytuple! */
+ write_item(rel->rd_indextuple,
+ HEAPTUPLESIZE + rel->rd_indextuple->t_len,
+ fp);
+
+ /* next, write the vector of opfamily OIDs */
+ write_item(rel->rd_opfamily,
+ relform->relnatts * sizeof(Oid),
+ fp);
+
+ /* next, write the vector of opcintype OIDs */
+ write_item(rel->rd_opcintype,
+ relform->relnatts * sizeof(Oid),
+ fp);
+
+ /* next, write the vector of support procedure OIDs */
+ write_item(rel->rd_support,
+ relform->relnatts * (rel->rd_indam->amsupport * sizeof(RegProcedure)),
+ fp);
+
+ /* next, write the vector of collation OIDs */
+ write_item(rel->rd_indcollation,
+ relform->relnatts * sizeof(Oid),
+ fp);
+
+ /* finally, write the vector of indoption values */
+ write_item(rel->rd_indoption,
+ relform->relnatts * sizeof(int16),
+ fp);
+
+ Assert(rel->rd_opcoptions);
+
+ /* finally, write the vector of opcoptions values */
+ for (i = 0; i < relform->relnatts; i++)
+ {
+ bytea *opt = rel->rd_opcoptions[i];
+
+ write_item(opt, opt ? VARSIZE(opt) : 0, fp);
+ }
+ }
+ }
+
+ if (FreeFile(fp))
+ elog(FATAL, "could not write init file");
+
+ /*
+ * Now we have to check whether the data we've so painstakingly
+ * accumulated is already obsolete due to someone else's just-committed
+ * catalog changes. If so, we just delete the temp file and leave it to
+ * the next backend to try again. (Our own relcache entries will be
+ * updated by SI message processing, but we can't be sure whether what we
+ * wrote out was up-to-date.)
+ *
+ * This mustn't run concurrently with the code that unlinks an init file
+ * and sends SI messages, so grab a serialization lock for the duration.
+ */
+ LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
+
+ /* Make sure we have seen all incoming SI messages */
+ AcceptInvalidationMessages();
+
+ /*
+ * If we have received any SI relcache invals since backend start, assume
+ * we may have written out-of-date data.
+ */
+ if (relcacheInvalsReceived == 0L)
+ {
+ /*
+ * OK, rename the temp file to its final name, deleting any
+ * previously-existing init file.
+ *
+ * Note: a failure here is possible under Cygwin, if some other
+ * backend is holding open an unlinked-but-not-yet-gone init file. So
+ * treat this as a noncritical failure; just remove the useless temp
+ * file on failure.
+ */
+ if (rename(tempfilename, finalfilename) < 0)
+ unlink(tempfilename);
+ }
+ else
+ {
+ /* Delete the already-obsolete temp file */
+ unlink(tempfilename);
+ }
+
+ LWLockRelease(RelCacheInitLock);
+}
+
+/* write a chunk of data preceded by its length */
+static void
+write_item(const void *data, Size len, FILE *fp)
+{
+ if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
+ elog(FATAL, "could not write init file");
+ if (len > 0 && fwrite(data, 1, len, fp) != len)
+ elog(FATAL, "could not write init file");
+}
+
+/*
+ * Determine whether a given relation (identified by OID) is one of the ones
+ * we should store in a relcache init file.
+ *
+ * We must cache all nailed rels, and for efficiency we should cache every rel
+ * that supports a syscache. The former set is almost but not quite a subset
+ * of the latter. The special cases are relations where
+ * RelationCacheInitializePhase2/3 chooses to nail for efficiency reasons, but
+ * which do not support any syscache.
+ */
+bool
+RelationIdIsInInitFile(Oid relationId)
+{
+ if (relationId == SharedSecLabelRelationId ||
+ relationId == TriggerRelidNameIndexId ||
+ relationId == DatabaseNameIndexId ||
+ relationId == SharedSecLabelObjectIndexId)
+ {
+ /*
+ * If this Assert fails, we don't need the applicable special case
+ * anymore.
+ */
+ Assert(!RelationSupportsSysCache(relationId));
+ return true;
+ }
+ return RelationSupportsSysCache(relationId);
+}
+
+/*
+ * Invalidate (remove) the init file during commit of a transaction that
+ * changed one or more of the relation cache entries that are kept in the
+ * local init file.
+ *
+ * To be safe against concurrent inspection or rewriting of the init file,
+ * we must take RelCacheInitLock, then remove the old init file, then send
+ * the SI messages that include relcache inval for such relations, and then
+ * release RelCacheInitLock. This serializes the whole affair against
+ * write_relcache_init_file, so that we can be sure that any other process
+ * that's concurrently trying to create a new init file won't move an
+ * already-stale version into place after we unlink. Also, because we unlink
+ * before sending the SI messages, a backend that's currently starting cannot
+ * read the now-obsolete init file and then miss the SI messages that will
+ * force it to update its relcache entries. (This works because the backend
+ * startup sequence gets into the sinval array before trying to load the init
+ * file.)
+ *
+ * We take the lock and do the unlink in RelationCacheInitFilePreInvalidate,
+ * then release the lock in RelationCacheInitFilePostInvalidate. Caller must
+ * send any pending SI messages between those calls.
+ */
+void
+RelationCacheInitFilePreInvalidate(void)
+{
+ char localinitfname[MAXPGPATH];
+ char sharedinitfname[MAXPGPATH];
+
+ if (DatabasePath)
+ snprintf(localinitfname, sizeof(localinitfname), "%s/%s",
+ DatabasePath, RELCACHE_INIT_FILENAME);
+ snprintf(sharedinitfname, sizeof(sharedinitfname), "global/%s",
+ RELCACHE_INIT_FILENAME);
+
+ LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
+
+ /*
+ * The files might not be there if no backend has been started since the
+ * last removal. But complain about failures other than ENOENT with
+ * ERROR. Fortunately, it's not too late to abort the transaction if we
+ * can't get rid of the would-be-obsolete init file.
+ */
+ if (DatabasePath)
+ unlink_initfile(localinitfname, ERROR);
+ unlink_initfile(sharedinitfname, ERROR);
+}
+
+void
+RelationCacheInitFilePostInvalidate(void)
+{
+ LWLockRelease(RelCacheInitLock);
+}
+
+/*
+ * Remove the init files during postmaster startup.
+ *
+ * We used to keep the init files across restarts, but that is unsafe in PITR
+ * scenarios, and even in simple crash-recovery cases there are windows for
+ * the init files to become out-of-sync with the database. So now we just
+ * remove them during startup and expect the first backend launch to rebuild
+ * them. Of course, this has to happen in each database of the cluster.
+ */
+void
+RelationCacheInitFileRemove(void)
+{
+ const char *tblspcdir = "pg_tblspc";
+ DIR *dir;
+ struct dirent *de;
+ char path[MAXPGPATH + 10 + sizeof(TABLESPACE_VERSION_DIRECTORY)];
+
+ snprintf(path, sizeof(path), "global/%s",
+ RELCACHE_INIT_FILENAME);
+ unlink_initfile(path, LOG);
+
+ /* Scan everything in the default tablespace */
+ RelationCacheInitFileRemoveInDir("base");
+
+ /* Scan the tablespace link directory to find non-default tablespaces */
+ dir = AllocateDir(tblspcdir);
+
+ while ((de = ReadDirExtended(dir, tblspcdir, LOG)) != NULL)
+ {
+ if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+ {
+ /* Scan the tablespace dir for per-database dirs */
+ snprintf(path, sizeof(path), "%s/%s/%s",
+ tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
+ RelationCacheInitFileRemoveInDir(path);
+ }
+ }
+
+ FreeDir(dir);
+}
+
+/* Process one per-tablespace directory for RelationCacheInitFileRemove */
+static void
+RelationCacheInitFileRemoveInDir(const char *tblspcpath)
+{
+ DIR *dir;
+ struct dirent *de;
+ char initfilename[MAXPGPATH * 2];
+
+ /* Scan the tablespace directory to find per-database directories */
+ dir = AllocateDir(tblspcpath);
+
+ while ((de = ReadDirExtended(dir, tblspcpath, LOG)) != NULL)
+ {
+ if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+ {
+ /* Try to remove the init file in each database */
+ snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
+ tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
+ unlink_initfile(initfilename, LOG);
+ }
+ }
+
+ FreeDir(dir);
+}
+
+static void
+unlink_initfile(const char *initfilename, int elevel)
+{
+ if (unlink(initfilename) < 0)
+ {
+ /* It might not be there, but log any error other than ENOENT */
+ if (errno != ENOENT)
+ ereport(elevel,
+ (errcode_for_file_access(),
+ errmsg("could not remove cache file \"%s\": %m",
+ initfilename)));
+ }
+}