summaryrefslogtreecommitdiffstats
path: root/src/backend/catalog/pg_constraint.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/catalog/pg_constraint.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/catalog/pg_constraint.c')
-rw-r--r--src/backend/catalog/pg_constraint.c1318
1 files changed, 1318 insertions, 0 deletions
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
new file mode 100644
index 0000000..a4e8900
--- /dev/null
+++ b/src/backend/catalog/pg_constraint.c
@@ -0,0 +1,1318 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_constraint.c
+ * routines to support manipulation of the pg_constraint relation
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/catalog/pg_constraint.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/htup_details.h"
+#include "access/sysattr.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/dependency.h"
+#include "catalog/indexing.h"
+#include "catalog/objectaccess.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_operator.h"
+#include "catalog/pg_type.h"
+#include "commands/defrem.h"
+#include "commands/tablecmds.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+
+
+/*
+ * CreateConstraintEntry
+ * Create a constraint table entry.
+ *
+ * Subsidiary records (such as triggers or indexes to implement the
+ * constraint) are *not* created here. But we do make dependency links
+ * from the constraint to the things it depends on.
+ *
+ * The new constraint's OID is returned.
+ */
+Oid
+CreateConstraintEntry(const char *constraintName,
+ Oid constraintNamespace,
+ char constraintType,
+ bool isDeferrable,
+ bool isDeferred,
+ bool isValidated,
+ Oid parentConstrId,
+ Oid relId,
+ const int16 *constraintKey,
+ int constraintNKeys,
+ int constraintNTotalKeys,
+ Oid domainId,
+ Oid indexRelId,
+ Oid foreignRelId,
+ const int16 *foreignKey,
+ const Oid *pfEqOp,
+ const Oid *ppEqOp,
+ const Oid *ffEqOp,
+ int foreignNKeys,
+ char foreignUpdateType,
+ char foreignDeleteType,
+ char foreignMatchType,
+ const Oid *exclOp,
+ Node *conExpr,
+ const char *conBin,
+ bool conIsLocal,
+ int conInhCount,
+ bool conNoInherit,
+ bool is_internal)
+{
+ Relation conDesc;
+ Oid conOid;
+ HeapTuple tup;
+ bool nulls[Natts_pg_constraint];
+ Datum values[Natts_pg_constraint];
+ ArrayType *conkeyArray;
+ ArrayType *confkeyArray;
+ ArrayType *conpfeqopArray;
+ ArrayType *conppeqopArray;
+ ArrayType *conffeqopArray;
+ ArrayType *conexclopArray;
+ NameData cname;
+ int i;
+ ObjectAddress conobject;
+ ObjectAddresses *addrs_auto;
+ ObjectAddresses *addrs_normal;
+
+ conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ Assert(constraintName);
+ namestrcpy(&cname, constraintName);
+
+ /*
+ * Convert C arrays into Postgres arrays.
+ */
+ if (constraintNKeys > 0)
+ {
+ Datum *conkey;
+
+ conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
+ for (i = 0; i < constraintNKeys; i++)
+ conkey[i] = Int16GetDatum(constraintKey[i]);
+ conkeyArray = construct_array(conkey, constraintNKeys,
+ INT2OID, 2, true, TYPALIGN_SHORT);
+ }
+ else
+ conkeyArray = NULL;
+
+ if (foreignNKeys > 0)
+ {
+ Datum *fkdatums;
+
+ fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
+ for (i = 0; i < foreignNKeys; i++)
+ fkdatums[i] = Int16GetDatum(foreignKey[i]);
+ confkeyArray = construct_array(fkdatums, foreignNKeys,
+ INT2OID, 2, true, TYPALIGN_SHORT);
+ for (i = 0; i < foreignNKeys; i++)
+ fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
+ conpfeqopArray = construct_array(fkdatums, foreignNKeys,
+ OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+ for (i = 0; i < foreignNKeys; i++)
+ fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
+ conppeqopArray = construct_array(fkdatums, foreignNKeys,
+ OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+ for (i = 0; i < foreignNKeys; i++)
+ fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
+ conffeqopArray = construct_array(fkdatums, foreignNKeys,
+ OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+ }
+ else
+ {
+ confkeyArray = NULL;
+ conpfeqopArray = NULL;
+ conppeqopArray = NULL;
+ conffeqopArray = NULL;
+ }
+
+ if (exclOp != NULL)
+ {
+ Datum *opdatums;
+
+ opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
+ for (i = 0; i < constraintNKeys; i++)
+ opdatums[i] = ObjectIdGetDatum(exclOp[i]);
+ conexclopArray = construct_array(opdatums, constraintNKeys,
+ OIDOID, sizeof(Oid), true, TYPALIGN_INT);
+ }
+ else
+ conexclopArray = NULL;
+
+ /* initialize nulls and values */
+ for (i = 0; i < Natts_pg_constraint; i++)
+ {
+ nulls[i] = false;
+ values[i] = (Datum) NULL;
+ }
+
+ conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
+ Anum_pg_constraint_oid);
+ values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
+ values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
+ values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
+ values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
+ values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
+ values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
+ values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
+ values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
+ values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
+ values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
+ values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
+ values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
+ values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
+ values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
+ values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
+ values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
+ values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
+ values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
+
+ if (conkeyArray)
+ values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
+ else
+ nulls[Anum_pg_constraint_conkey - 1] = true;
+
+ if (confkeyArray)
+ values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
+ else
+ nulls[Anum_pg_constraint_confkey - 1] = true;
+
+ if (conpfeqopArray)
+ values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
+ else
+ nulls[Anum_pg_constraint_conpfeqop - 1] = true;
+
+ if (conppeqopArray)
+ values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
+ else
+ nulls[Anum_pg_constraint_conppeqop - 1] = true;
+
+ if (conffeqopArray)
+ values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
+ else
+ nulls[Anum_pg_constraint_conffeqop - 1] = true;
+
+ if (conexclopArray)
+ values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
+ else
+ nulls[Anum_pg_constraint_conexclop - 1] = true;
+
+ if (conBin)
+ values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
+ else
+ nulls[Anum_pg_constraint_conbin - 1] = true;
+
+ tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
+
+ CatalogTupleInsert(conDesc, tup);
+
+ ObjectAddressSet(conobject, ConstraintRelationId, conOid);
+
+ table_close(conDesc, RowExclusiveLock);
+
+ /* Handle set of auto dependencies */
+ addrs_auto = new_object_addresses();
+
+ if (OidIsValid(relId))
+ {
+ /*
+ * Register auto dependency from constraint to owning relation, or to
+ * specific column(s) if any are mentioned.
+ */
+ ObjectAddress relobject;
+
+ if (constraintNTotalKeys > 0)
+ {
+ for (i = 0; i < constraintNTotalKeys; i++)
+ {
+ ObjectAddressSubSet(relobject, RelationRelationId, relId,
+ constraintKey[i]);
+ add_exact_object_address(&relobject, addrs_auto);
+ }
+ }
+ else
+ {
+ ObjectAddressSet(relobject, RelationRelationId, relId);
+ add_exact_object_address(&relobject, addrs_auto);
+ }
+ }
+
+ if (OidIsValid(domainId))
+ {
+ /*
+ * Register auto dependency from constraint to owning domain
+ */
+ ObjectAddress domobject;
+
+ ObjectAddressSet(domobject, TypeRelationId, domainId);
+ add_exact_object_address(&domobject, addrs_auto);
+ }
+
+ record_object_address_dependencies(&conobject, addrs_auto,
+ DEPENDENCY_AUTO);
+ free_object_addresses(addrs_auto);
+
+ /* Handle set of normal dependencies */
+ addrs_normal = new_object_addresses();
+
+ if (OidIsValid(foreignRelId))
+ {
+ /*
+ * Register normal dependency from constraint to foreign relation, or
+ * to specific column(s) if any are mentioned.
+ */
+ ObjectAddress relobject;
+
+ if (foreignNKeys > 0)
+ {
+ for (i = 0; i < foreignNKeys; i++)
+ {
+ ObjectAddressSubSet(relobject, RelationRelationId,
+ foreignRelId, foreignKey[i]);
+ add_exact_object_address(&relobject, addrs_normal);
+ }
+ }
+ else
+ {
+ ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
+ add_exact_object_address(&relobject, addrs_normal);
+ }
+ }
+
+ if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
+ {
+ /*
+ * Register normal dependency on the unique index that supports a
+ * foreign-key constraint. (Note: for indexes associated with unique
+ * or primary-key constraints, the dependency runs the other way, and
+ * is not made here.)
+ */
+ ObjectAddress relobject;
+
+ ObjectAddressSet(relobject, RelationRelationId, indexRelId);
+ add_exact_object_address(&relobject, addrs_normal);
+ }
+
+ if (foreignNKeys > 0)
+ {
+ /*
+ * Register normal dependencies on the equality operators that support
+ * a foreign-key constraint. If the PK and FK types are the same then
+ * all three operators for a column are the same; otherwise they are
+ * different.
+ */
+ ObjectAddress oprobject;
+
+ oprobject.classId = OperatorRelationId;
+ oprobject.objectSubId = 0;
+
+ for (i = 0; i < foreignNKeys; i++)
+ {
+ oprobject.objectId = pfEqOp[i];
+ add_exact_object_address(&oprobject, addrs_normal);
+ if (ppEqOp[i] != pfEqOp[i])
+ {
+ oprobject.objectId = ppEqOp[i];
+ add_exact_object_address(&oprobject, addrs_normal);
+ }
+ if (ffEqOp[i] != pfEqOp[i])
+ {
+ oprobject.objectId = ffEqOp[i];
+ add_exact_object_address(&oprobject, addrs_normal);
+ }
+ }
+ }
+
+ record_object_address_dependencies(&conobject, addrs_normal,
+ DEPENDENCY_NORMAL);
+ free_object_addresses(addrs_normal);
+
+ /*
+ * We don't bother to register dependencies on the exclusion operators of
+ * an exclusion constraint. We assume they are members of the opclass
+ * supporting the index, so there's an indirect dependency via that. (This
+ * would be pretty dicey for cross-type operators, but exclusion operators
+ * can never be cross-type.)
+ */
+
+ if (conExpr != NULL)
+ {
+ /*
+ * Register dependencies from constraint to objects mentioned in CHECK
+ * expression.
+ */
+ recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
+ DEPENDENCY_NORMAL,
+ DEPENDENCY_NORMAL, false);
+ }
+
+ /* Post creation hook for new constraint */
+ InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
+ is_internal);
+
+ return conOid;
+}
+
+/*
+ * Test whether given name is currently used as a constraint name
+ * for the given object (relation or domain).
+ *
+ * This is used to decide whether to accept a user-specified constraint name.
+ * It is deliberately not the same test as ChooseConstraintName uses to decide
+ * whether an auto-generated name is OK: here, we will allow it unless there
+ * is an identical constraint name in use *on the same object*.
+ *
+ * NB: Caller should hold exclusive lock on the given object, else
+ * this test can be fooled by concurrent additions.
+ */
+bool
+ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
+ const char *conname)
+{
+ bool found;
+ Relation conDesc;
+ SysScanDesc conscan;
+ ScanKeyData skey[3];
+
+ conDesc = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
+ ? objId : InvalidOid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
+ ? objId : InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
+ true, NULL, 3, skey);
+
+ /* There can be at most one matching row */
+ found = (HeapTupleIsValid(systable_getnext(conscan)));
+
+ systable_endscan(conscan);
+ table_close(conDesc, AccessShareLock);
+
+ return found;
+}
+
+/*
+ * Does any constraint of the given name exist in the given namespace?
+ *
+ * This is used for code that wants to match ChooseConstraintName's rule
+ * that we should avoid autogenerating duplicate constraint names within a
+ * namespace.
+ */
+bool
+ConstraintNameExists(const char *conname, Oid namespaceid)
+{
+ bool found;
+ Relation conDesc;
+ SysScanDesc conscan;
+ ScanKeyData skey[2];
+
+ conDesc = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_connamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(namespaceid));
+
+ conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+ NULL, 2, skey);
+
+ found = (HeapTupleIsValid(systable_getnext(conscan)));
+
+ systable_endscan(conscan);
+ table_close(conDesc, AccessShareLock);
+
+ return found;
+}
+
+/*
+ * Select a nonconflicting name for a new constraint.
+ *
+ * The objective here is to choose a name that is unique within the
+ * specified namespace. Postgres does not require this, but the SQL
+ * spec does, and some apps depend on it. Therefore we avoid choosing
+ * default names that so conflict.
+ *
+ * name1, name2, and label are used the same way as for makeObjectName(),
+ * except that the label can't be NULL; digits will be appended to the label
+ * if needed to create a name that is unique within the specified namespace.
+ *
+ * 'others' can be a list of string names already chosen within the current
+ * command (but not yet reflected into the catalogs); we will not choose
+ * a duplicate of one of these either.
+ *
+ * Note: it is theoretically possible to get a collision anyway, if someone
+ * else chooses the same name concurrently. This is fairly unlikely to be
+ * a problem in practice, especially if one is holding an exclusive lock on
+ * the relation identified by name1.
+ *
+ * Returns a palloc'd string.
+ */
+char *
+ChooseConstraintName(const char *name1, const char *name2,
+ const char *label, Oid namespaceid,
+ List *others)
+{
+ int pass = 0;
+ char *conname = NULL;
+ char modlabel[NAMEDATALEN];
+ Relation conDesc;
+ SysScanDesc conscan;
+ ScanKeyData skey[2];
+ bool found;
+ ListCell *l;
+
+ conDesc = table_open(ConstraintRelationId, AccessShareLock);
+
+ /* try the unmodified label first */
+ strlcpy(modlabel, label, sizeof(modlabel));
+
+ for (;;)
+ {
+ conname = makeObjectName(name1, name2, modlabel);
+
+ found = false;
+
+ foreach(l, others)
+ {
+ if (strcmp((char *) lfirst(l), conname) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_connamespace,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(namespaceid));
+
+ conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
+ NULL, 2, skey);
+
+ found = (HeapTupleIsValid(systable_getnext(conscan)));
+
+ systable_endscan(conscan);
+ }
+
+ if (!found)
+ break;
+
+ /* found a conflict, so try a new name component */
+ pfree(conname);
+ snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
+ }
+
+ table_close(conDesc, AccessShareLock);
+
+ return conname;
+}
+
+/*
+ * Delete a single constraint record.
+ */
+void
+RemoveConstraintById(Oid conId)
+{
+ Relation conDesc;
+ HeapTuple tup;
+ Form_pg_constraint con;
+
+ conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
+ if (!HeapTupleIsValid(tup)) /* should not happen */
+ elog(ERROR, "cache lookup failed for constraint %u", conId);
+ con = (Form_pg_constraint) GETSTRUCT(tup);
+
+ /*
+ * Special processing depending on what the constraint is for.
+ */
+ if (OidIsValid(con->conrelid))
+ {
+ Relation rel;
+
+ /*
+ * If the constraint is for a relation, open and exclusive-lock the
+ * relation it's for.
+ */
+ rel = table_open(con->conrelid, AccessExclusiveLock);
+
+ /*
+ * We need to update the relchecks count if it is a check constraint
+ * being dropped. This update will force backends to rebuild relcache
+ * entries when we commit.
+ */
+ if (con->contype == CONSTRAINT_CHECK)
+ {
+ Relation pgrel;
+ HeapTuple relTup;
+ Form_pg_class classForm;
+
+ pgrel = table_open(RelationRelationId, RowExclusiveLock);
+ relTup = SearchSysCacheCopy1(RELOID,
+ ObjectIdGetDatum(con->conrelid));
+ if (!HeapTupleIsValid(relTup))
+ elog(ERROR, "cache lookup failed for relation %u",
+ con->conrelid);
+ classForm = (Form_pg_class) GETSTRUCT(relTup);
+
+ if (classForm->relchecks == 0) /* should not happen */
+ elog(ERROR, "relation \"%s\" has relchecks = 0",
+ RelationGetRelationName(rel));
+ classForm->relchecks--;
+
+ CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
+
+ heap_freetuple(relTup);
+
+ table_close(pgrel, RowExclusiveLock);
+ }
+
+ /* Keep lock on constraint's rel until end of xact */
+ table_close(rel, NoLock);
+ }
+ else if (OidIsValid(con->contypid))
+ {
+ /*
+ * XXX for now, do nothing special when dropping a domain constraint
+ *
+ * Probably there should be some form of locking on the domain type,
+ * but we have no such concept at the moment.
+ */
+ }
+ else
+ elog(ERROR, "constraint %u is not of a known type", conId);
+
+ /* Fry the constraint itself */
+ CatalogTupleDelete(conDesc, &tup->t_self);
+
+ /* Clean up */
+ ReleaseSysCache(tup);
+ table_close(conDesc, RowExclusiveLock);
+}
+
+/*
+ * RenameConstraintById
+ * Rename a constraint.
+ *
+ * Note: this isn't intended to be a user-exposed function; it doesn't check
+ * permissions etc. Currently this is only invoked when renaming an index
+ * that is associated with a constraint, but it's made a little more general
+ * than that with the expectation of someday having ALTER TABLE RENAME
+ * CONSTRAINT.
+ */
+void
+RenameConstraintById(Oid conId, const char *newname)
+{
+ Relation conDesc;
+ HeapTuple tuple;
+ Form_pg_constraint con;
+
+ conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", conId);
+ con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /*
+ * For user-friendliness, check whether the name is already in use.
+ */
+ if (OidIsValid(con->conrelid) &&
+ ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ con->conrelid,
+ newname))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for relation \"%s\" already exists",
+ newname, get_rel_name(con->conrelid))));
+ if (OidIsValid(con->contypid) &&
+ ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
+ con->contypid,
+ newname))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for domain %s already exists",
+ newname, format_type_be(con->contypid))));
+
+ /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
+ namestrcpy(&(con->conname), newname);
+
+ CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
+
+ InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
+
+ heap_freetuple(tuple);
+ table_close(conDesc, RowExclusiveLock);
+}
+
+/*
+ * AlterConstraintNamespaces
+ * Find any constraints belonging to the specified object,
+ * and move them to the specified new namespace.
+ *
+ * isType indicates whether the owning object is a type or a relation.
+ */
+void
+AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
+ Oid newNspId, bool isType, ObjectAddresses *objsMoved)
+{
+ Relation conRel;
+ ScanKeyData key[2];
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ conRel = table_open(ConstraintRelationId, RowExclusiveLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(isType ? InvalidOid : ownerId));
+ ScanKeyInit(&key[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(isType ? ownerId : InvalidOid));
+
+ scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
+ NULL, 2, key);
+
+ while (HeapTupleIsValid((tup = systable_getnext(scan))))
+ {
+ Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
+ ObjectAddress thisobj;
+
+ ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
+
+ if (object_address_present(&thisobj, objsMoved))
+ continue;
+
+ /* Don't update if the object is already part of the namespace */
+ if (conform->connamespace == oldNspId && oldNspId != newNspId)
+ {
+ tup = heap_copytuple(tup);
+ conform = (Form_pg_constraint) GETSTRUCT(tup);
+
+ conform->connamespace = newNspId;
+
+ CatalogTupleUpdate(conRel, &tup->t_self, tup);
+
+ /*
+ * Note: currently, the constraint will not have its own
+ * dependency on the namespace, so we don't need to do
+ * changeDependencyFor().
+ */
+ }
+
+ InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
+
+ add_exact_object_address(&thisobj, objsMoved);
+ }
+
+ systable_endscan(scan);
+
+ table_close(conRel, RowExclusiveLock);
+}
+
+/*
+ * ConstraintSetParentConstraint
+ * Set a partition's constraint as child of its parent constraint,
+ * or remove the linkage if parentConstrId is InvalidOid.
+ *
+ * This updates the constraint's pg_constraint row to show it as inherited, and
+ * adds PARTITION dependencies to prevent the constraint from being deleted
+ * on its own. Alternatively, reverse that.
+ */
+void
+ConstraintSetParentConstraint(Oid childConstrId,
+ Oid parentConstrId,
+ Oid childTableId)
+{
+ Relation constrRel;
+ Form_pg_constraint constrForm;
+ HeapTuple tuple,
+ newtup;
+ ObjectAddress depender;
+ ObjectAddress referenced;
+
+ constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
+ tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
+ newtup = heap_copytuple(tuple);
+ constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
+ if (OidIsValid(parentConstrId))
+ {
+ /* don't allow setting parent for a constraint that already has one */
+ Assert(constrForm->coninhcount == 0);
+ if (constrForm->conparentid != InvalidOid)
+ elog(ERROR, "constraint %u already has a parent constraint",
+ childConstrId);
+
+ constrForm->conislocal = false;
+ constrForm->coninhcount++;
+ constrForm->conparentid = parentConstrId;
+
+ CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
+
+ ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
+
+ ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
+ recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
+
+ ObjectAddressSet(referenced, RelationRelationId, childTableId);
+ recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
+ }
+ else
+ {
+ constrForm->coninhcount--;
+ constrForm->conislocal = true;
+ constrForm->conparentid = InvalidOid;
+
+ /* Make sure there's no further inheritance. */
+ Assert(constrForm->coninhcount == 0);
+
+ CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
+
+ deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
+ ConstraintRelationId,
+ DEPENDENCY_PARTITION_PRI);
+ deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
+ RelationRelationId,
+ DEPENDENCY_PARTITION_SEC);
+ }
+
+ ReleaseSysCache(tuple);
+ table_close(constrRel, RowExclusiveLock);
+}
+
+
+/*
+ * get_relation_constraint_oid
+ * Find a constraint on the specified relation with the specified name.
+ * Returns constraint's OID.
+ */
+Oid
+get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
+{
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+ Oid conOid = InvalidOid;
+
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+ NULL, 3, skey);
+
+ /* There can be at most one matching row */
+ if (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+ systable_endscan(scan);
+
+ /* If no such constraint exists, complain */
+ if (!OidIsValid(conOid) && !missing_ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" for table \"%s\" does not exist",
+ conname, get_rel_name(relid))));
+
+ table_close(pg_constraint, AccessShareLock);
+
+ return conOid;
+}
+
+/*
+ * get_relation_constraint_attnos
+ * Find a constraint on the specified relation with the specified name
+ * and return the constrained columns.
+ *
+ * Returns a Bitmapset of the column attnos of the constrained columns, with
+ * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
+ * columns can be represented.
+ *
+ * *constraintOid is set to the OID of the constraint, or InvalidOid on
+ * failure.
+ */
+Bitmapset *
+get_relation_constraint_attnos(Oid relid, const char *conname,
+ bool missing_ok, Oid *constraintOid)
+{
+ Bitmapset *conattnos = NULL;
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+
+ /* Set *constraintOid, to avoid complaints about uninitialized vars */
+ *constraintOid = InvalidOid;
+
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+ NULL, 3, skey);
+
+ /* There can be at most one matching row */
+ if (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ Datum adatum;
+ bool isNull;
+
+ *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+ /* Extract the conkey array, ie, attnums of constrained columns */
+ adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
+ RelationGetDescr(pg_constraint), &isNull);
+ if (!isNull)
+ {
+ ArrayType *arr;
+ int numcols;
+ int16 *attnums;
+ int i;
+
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ numcols = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ numcols < 0 ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ attnums = (int16 *) ARR_DATA_PTR(arr);
+
+ /* Construct the result value */
+ for (i = 0; i < numcols; i++)
+ {
+ conattnos = bms_add_member(conattnos,
+ attnums[i] - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+ }
+
+ systable_endscan(scan);
+
+ /* If no such constraint exists, complain */
+ if (!OidIsValid(*constraintOid) && !missing_ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" for table \"%s\" does not exist",
+ conname, get_rel_name(relid))));
+
+ table_close(pg_constraint, AccessShareLock);
+
+ return conattnos;
+}
+
+/*
+ * Return the OID of the constraint associated with the given index in the
+ * given relation; or InvalidOid if no such index is catalogued.
+ */
+Oid
+get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
+{
+ Relation pg_constraint;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple tuple;
+ Oid constraintId = InvalidOid;
+
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber,
+ F_OIDEQ,
+ ObjectIdGetDatum(relationId));
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
+ true, NULL, 1, &key);
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_constraint constrForm;
+
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+ if (constrForm->conindid == indexId)
+ {
+ constraintId = constrForm->oid;
+ break;
+ }
+ }
+ systable_endscan(scan);
+
+ table_close(pg_constraint, AccessShareLock);
+ return constraintId;
+}
+
+/*
+ * get_domain_constraint_oid
+ * Find a constraint on the specified domain with the specified name.
+ * Returns constraint's OID.
+ */
+Oid
+get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
+{
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData skey[3];
+ Oid conOid = InvalidOid;
+
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(InvalidOid));
+ ScanKeyInit(&skey[1],
+ Anum_pg_constraint_contypid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(typid));
+ ScanKeyInit(&skey[2],
+ Anum_pg_constraint_conname,
+ BTEqualStrategyNumber, F_NAMEEQ,
+ CStringGetDatum(conname));
+
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+ NULL, 3, skey);
+
+ /* There can be at most one matching row */
+ if (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+ systable_endscan(scan);
+
+ /* If no such constraint exists, complain */
+ if (!OidIsValid(conOid) && !missing_ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" for domain %s does not exist",
+ conname, format_type_be(typid))));
+
+ table_close(pg_constraint, AccessShareLock);
+
+ return conOid;
+}
+
+/*
+ * get_primary_key_attnos
+ * Identify the columns in a relation's primary key, if any.
+ *
+ * Returns a Bitmapset of the column attnos of the primary key's columns,
+ * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
+ * system columns can be represented.
+ *
+ * If there is no primary key, return NULL. We also return NULL if the pkey
+ * constraint is deferrable and deferrableOk is false.
+ *
+ * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
+ * on failure.
+ */
+Bitmapset *
+get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
+{
+ Bitmapset *pkattnos = NULL;
+ Relation pg_constraint;
+ HeapTuple tuple;
+ SysScanDesc scan;
+ ScanKeyData skey[1];
+
+ /* Set *constraintOid, to avoid complaints about uninitialized vars */
+ *constraintOid = InvalidOid;
+
+ /* Scan pg_constraint for constraints of the target rel */
+ pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_constraint_conrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(relid));
+
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, skey);
+
+ while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+ {
+ Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
+ Datum adatum;
+ bool isNull;
+ ArrayType *arr;
+ int16 *attnums;
+ int numkeys;
+ int i;
+
+ /* Skip constraints that are not PRIMARY KEYs */
+ if (con->contype != CONSTRAINT_PRIMARY)
+ continue;
+
+ /*
+ * If the primary key is deferrable, but we've been instructed to
+ * ignore deferrable constraints, then we might as well give up
+ * searching, since there can only be a single primary key on a table.
+ */
+ if (con->condeferrable && !deferrableOk)
+ break;
+
+ /* Extract the conkey array, ie, attnums of PK's columns */
+ adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
+ RelationGetDescr(pg_constraint), &isNull);
+ if (isNull)
+ elog(ERROR, "null conkey for constraint %u",
+ ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ numkeys = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ numkeys < 0 ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ attnums = (int16 *) ARR_DATA_PTR(arr);
+
+ /* Construct the result value */
+ for (i = 0; i < numkeys; i++)
+ {
+ pkattnos = bms_add_member(pkattnos,
+ attnums[i] - FirstLowInvalidHeapAttributeNumber);
+ }
+ *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+ /* No need to search further */
+ break;
+ }
+
+ systable_endscan(scan);
+
+ table_close(pg_constraint, AccessShareLock);
+
+ return pkattnos;
+}
+
+/*
+ * Extract data from the pg_constraint tuple of a foreign-key constraint.
+ *
+ * All arguments save the first are output arguments; the last three of them
+ * can be passed as NULL if caller doesn't need them.
+ */
+void
+DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
+ AttrNumber *conkey, AttrNumber *confkey,
+ Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
+{
+ Oid constrId;
+ Datum adatum;
+ bool isNull;
+ ArrayType *arr;
+ int numkeys;
+
+ constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
+
+ /*
+ * We expect the arrays to be 1-D arrays of the right types; verify that.
+ * We don't need to use deconstruct_array() since the array data is just
+ * going to look like a C array of values.
+ */
+ adatum = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conkey, &isNull);
+ if (isNull)
+ elog(ERROR, "null conkey for constraint %u", constrId);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ if (ARR_NDIM(arr) != 1 ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ numkeys = ARR_DIMS(arr)[0];
+ if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
+ elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
+ memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
+ if ((Pointer) arr != DatumGetPointer(adatum))
+ pfree(arr); /* free de-toasted copy, if any */
+
+ adatum = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_confkey, &isNull);
+ if (isNull)
+ elog(ERROR, "null confkey for constraint %u", constrId);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ if (ARR_NDIM(arr) != 1 ||
+ ARR_DIMS(arr)[0] != numkeys ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "confkey is not a 1-D smallint array");
+ memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
+ if ((Pointer) arr != DatumGetPointer(adatum))
+ pfree(arr); /* free de-toasted copy, if any */
+
+ if (pf_eq_oprs)
+ {
+ adatum = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conpfeqop, &isNull);
+ if (isNull)
+ elog(ERROR, "null conpfeqop for constraint %u", constrId);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ /* see TryReuseForeignKey if you change the test below */
+ if (ARR_NDIM(arr) != 1 ||
+ ARR_DIMS(arr)[0] != numkeys ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conpfeqop is not a 1-D Oid array");
+ memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+ if ((Pointer) arr != DatumGetPointer(adatum))
+ pfree(arr); /* free de-toasted copy, if any */
+ }
+
+ if (pp_eq_oprs)
+ {
+ adatum = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conppeqop, &isNull);
+ if (isNull)
+ elog(ERROR, "null conppeqop for constraint %u", constrId);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ if (ARR_NDIM(arr) != 1 ||
+ ARR_DIMS(arr)[0] != numkeys ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conppeqop is not a 1-D Oid array");
+ memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+ if ((Pointer) arr != DatumGetPointer(adatum))
+ pfree(arr); /* free de-toasted copy, if any */
+ }
+
+ if (ff_eq_oprs)
+ {
+ adatum = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conffeqop, &isNull);
+ if (isNull)
+ elog(ERROR, "null conffeqop for constraint %u", constrId);
+ arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
+ if (ARR_NDIM(arr) != 1 ||
+ ARR_DIMS(arr)[0] != numkeys ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conffeqop is not a 1-D Oid array");
+ memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
+ if ((Pointer) arr != DatumGetPointer(adatum))
+ pfree(arr); /* free de-toasted copy, if any */
+ }
+
+ *numfks = numkeys;
+}
+
+/*
+ * Determine whether a relation can be proven functionally dependent on
+ * a set of grouping columns. If so, return true and add the pg_constraint
+ * OIDs of the constraints needed for the proof to the *constraintDeps list.
+ *
+ * grouping_columns is a list of grouping expressions, in which columns of
+ * the rel of interest are Vars with the indicated varno/varlevelsup.
+ *
+ * Currently we only check to see if the rel has a primary key that is a
+ * subset of the grouping_columns. We could also use plain unique constraints
+ * if all their columns are known not null, but there's a problem: we need
+ * to be able to represent the not-null-ness as part of the constraints added
+ * to *constraintDeps. FIXME whenever not-null constraints get represented
+ * in pg_constraint.
+ */
+bool
+check_functional_grouping(Oid relid,
+ Index varno, Index varlevelsup,
+ List *grouping_columns,
+ List **constraintDeps)
+{
+ Bitmapset *pkattnos;
+ Bitmapset *groupbyattnos;
+ Oid constraintOid;
+ ListCell *gl;
+
+ /* If the rel has no PK, then we can't prove functional dependency */
+ pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
+ if (pkattnos == NULL)
+ return false;
+
+ /* Identify all the rel's columns that appear in grouping_columns */
+ groupbyattnos = NULL;
+ foreach(gl, grouping_columns)
+ {
+ Var *gvar = (Var *) lfirst(gl);
+
+ if (IsA(gvar, Var) &&
+ gvar->varno == varno &&
+ gvar->varlevelsup == varlevelsup)
+ groupbyattnos = bms_add_member(groupbyattnos,
+ gvar->varattno - FirstLowInvalidHeapAttributeNumber);
+ }
+
+ if (bms_is_subset(pkattnos, groupbyattnos))
+ {
+ /* The PK is a subset of grouping_columns, so we win */
+ *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
+ return true;
+ }
+
+ return false;
+}