diff options
Diffstat (limited to 'src/backend/catalog/pg_inherits.c')
-rw-r--r-- | src/backend/catalog/pg_inherits.c | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c new file mode 100644 index 0000000..5145a56 --- /dev/null +++ b/src/backend/catalog/pg_inherits.c @@ -0,0 +1,493 @@ +/*------------------------------------------------------------------------- + * + * pg_inherits.c + * routines to support manipulation of the pg_inherits relation + * + * Note: currently, this module mostly contains inquiry functions; actual + * creation and deletion of pg_inherits entries is mostly done in tablecmds.c. + * Perhaps someday that code should be moved here, but it'd have to be + * disentangled from other stuff such as pg_depend updates. + * + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/catalog/pg_inherits.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/table.h" +#include "catalog/indexing.h" +#include "catalog/pg_inherits.h" +#include "parser/parse_type.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +/* + * Entry of a hash table used in find_all_inheritors. See below. + */ +typedef struct SeenRelsEntry +{ + Oid rel_id; /* relation oid */ + int list_index; /* its position in output list(s) */ +} SeenRelsEntry; + +/* + * find_inheritance_children + * + * Returns a list containing the OIDs of all relations which + * inherit *directly* from the relation with OID 'parentrelId'. + * + * The specified lock type is acquired on each child relation (but not on the + * given rel; caller should already have locked it). If lockmode is NoLock + * then no locks are acquired, but caller must beware of race conditions + * against possible DROPs of child relations. + */ +List * +find_inheritance_children(Oid parentrelId, LOCKMODE lockmode) +{ + List *list = NIL; + Relation relation; + SysScanDesc scan; + ScanKeyData key[1]; + HeapTuple inheritsTuple; + Oid inhrelid; + Oid *oidarr; + int maxoids, + numoids, + i; + + /* + * Can skip the scan if pg_class shows the relation has never had a + * subclass. + */ + if (!has_subclass(parentrelId)) + return NIL; + + /* + * Scan pg_inherits and build a working array of subclass OIDs. + */ + maxoids = 32; + oidarr = (Oid *) palloc(maxoids * sizeof(Oid)); + numoids = 0; + + relation = table_open(InheritsRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_inherits_inhparent, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(parentrelId)); + + scan = systable_beginscan(relation, InheritsParentIndexId, true, + NULL, 1, key); + + while ((inheritsTuple = systable_getnext(scan)) != NULL) + { + inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid; + if (numoids >= maxoids) + { + maxoids *= 2; + oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid)); + } + oidarr[numoids++] = inhrelid; + } + + systable_endscan(scan); + + table_close(relation, AccessShareLock); + + /* + * If we found more than one child, sort them by OID. This ensures + * reasonably consistent behavior regardless of the vagaries of an + * indexscan. This is important since we need to be sure all backends + * lock children in the same order to avoid needless deadlocks. + */ + if (numoids > 1) + qsort(oidarr, numoids, sizeof(Oid), oid_cmp); + + /* + * Acquire locks and build the result list. + */ + for (i = 0; i < numoids; i++) + { + inhrelid = oidarr[i]; + + if (lockmode != NoLock) + { + /* Get the lock to synchronize against concurrent drop */ + LockRelationOid(inhrelid, lockmode); + + /* + * Now that we have the lock, double-check to see if the relation + * really exists or not. If not, assume it was dropped while we + * waited to acquire lock, and ignore it. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid))) + { + /* Release useless lock */ + UnlockRelationOid(inhrelid, lockmode); + /* And ignore this relation */ + continue; + } + } + + list = lappend_oid(list, inhrelid); + } + + pfree(oidarr); + + return list; +} + + +/* + * find_all_inheritors - + * Returns a list of relation OIDs including the given rel plus + * all relations that inherit from it, directly or indirectly. + * Optionally, it also returns the number of parents found for + * each such relation within the inheritance tree rooted at the + * given rel. + * + * The specified lock type is acquired on all child relations (but not on the + * given rel; caller should already have locked it). If lockmode is NoLock + * then no locks are acquired, but caller must beware of race conditions + * against possible DROPs of child relations. + */ +List * +find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents) +{ + /* hash table for O(1) rel_oid -> rel_numparents cell lookup */ + HTAB *seen_rels; + HASHCTL ctl; + List *rels_list, + *rel_numparents; + ListCell *l; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(SeenRelsEntry); + ctl.hcxt = CurrentMemoryContext; + + seen_rels = hash_create("find_all_inheritors temporary table", + 32, /* start small and extend */ + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* + * We build a list starting with the given rel and adding all direct and + * indirect children. We can use a single list as both the record of + * already-found rels and the agenda of rels yet to be scanned for more + * children. This is a bit tricky but works because the foreach() macro + * doesn't fetch the next list element until the bottom of the loop. Note + * that we can't keep pointers into the output lists; but an index is + * sufficient. + */ + rels_list = list_make1_oid(parentrelId); + rel_numparents = list_make1_int(0); + + foreach(l, rels_list) + { + Oid currentrel = lfirst_oid(l); + List *currentchildren; + ListCell *lc; + + /* Get the direct children of this rel */ + currentchildren = find_inheritance_children(currentrel, lockmode); + + /* + * Add to the queue only those children not already seen. This avoids + * making duplicate entries in case of multiple inheritance paths from + * the same parent. (It'll also keep us from getting into an infinite + * loop, though theoretically there can't be any cycles in the + * inheritance graph anyway.) + */ + foreach(lc, currentchildren) + { + Oid child_oid = lfirst_oid(lc); + bool found; + SeenRelsEntry *hash_entry; + + hash_entry = hash_search(seen_rels, &child_oid, HASH_ENTER, &found); + if (found) + { + /* if the rel is already there, bump number-of-parents counter */ + ListCell *numparents_cell; + + numparents_cell = list_nth_cell(rel_numparents, + hash_entry->list_index); + lfirst_int(numparents_cell)++; + } + else + { + /* if it's not there, add it. expect 1 parent, initially. */ + hash_entry->list_index = list_length(rels_list); + rels_list = lappend_oid(rels_list, child_oid); + rel_numparents = lappend_int(rel_numparents, 1); + } + } + } + + if (numparents) + *numparents = rel_numparents; + else + list_free(rel_numparents); + + hash_destroy(seen_rels); + + return rels_list; +} + + +/* + * has_subclass - does this relation have any children? + * + * In the current implementation, has_subclass returns whether a + * particular class *might* have a subclass. It will not return the + * correct result if a class had a subclass which was later dropped. + * This is because relhassubclass in pg_class is not updated immediately + * when a subclass is dropped, primarily because of concurrency concerns. + * + * Currently has_subclass is only used as an efficiency hack to skip + * unnecessary inheritance searches, so this is OK. Note that ANALYZE + * on a childless table will clean up the obsolete relhassubclass flag. + * + * Although this doesn't actually touch pg_inherits, it seems reasonable + * to keep it here since it's normally used with the other routines here. + */ +bool +has_subclass(Oid relationId) +{ + HeapTuple tuple; + bool result; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relationId)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", relationId); + + result = ((Form_pg_class) GETSTRUCT(tuple))->relhassubclass; + ReleaseSysCache(tuple); + return result; +} + +/* + * has_superclass - does this relation inherit from another? + * + * Unlike has_subclass, this can be relied on to give an accurate answer. + * However, the caller must hold a lock on the given relation so that it + * can't be concurrently added to or removed from an inheritance hierarchy. + */ +bool +has_superclass(Oid relationId) +{ + Relation catalog; + SysScanDesc scan; + ScanKeyData skey; + bool result; + + catalog = table_open(InheritsRelationId, AccessShareLock); + ScanKeyInit(&skey, Anum_pg_inherits_inhrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(relationId)); + scan = systable_beginscan(catalog, InheritsRelidSeqnoIndexId, true, + NULL, 1, &skey); + result = HeapTupleIsValid(systable_getnext(scan)); + systable_endscan(scan); + table_close(catalog, AccessShareLock); + + return result; +} + +/* + * Given two type OIDs, determine whether the first is a complex type + * (class type) that inherits from the second. + * + * This essentially asks whether the first type is guaranteed to be coercible + * to the second. Therefore, we allow the first type to be a domain over a + * complex type that inherits from the second; that creates no difficulties. + * But the second type cannot be a domain. + */ +bool +typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId) +{ + bool result = false; + Oid subclassRelid; + Oid superclassRelid; + Relation inhrel; + List *visited, + *queue; + ListCell *queue_item; + + /* We need to work with the associated relation OIDs */ + subclassRelid = typeOrDomainTypeRelid(subclassTypeId); + if (subclassRelid == InvalidOid) + return false; /* not a complex type or domain over one */ + superclassRelid = typeidTypeRelid(superclassTypeId); + if (superclassRelid == InvalidOid) + return false; /* not a complex type */ + + /* No point in searching if the superclass has no subclasses */ + if (!has_subclass(superclassRelid)) + return false; + + /* + * Begin the search at the relation itself, so add its relid to the queue. + */ + queue = list_make1_oid(subclassRelid); + visited = NIL; + + inhrel = table_open(InheritsRelationId, AccessShareLock); + + /* + * Use queue to do a breadth-first traversal of the inheritance graph from + * the relid supplied up to the root. Notice that we append to the queue + * inside the loop --- this is okay because the foreach() macro doesn't + * advance queue_item until the next loop iteration begins. + */ + foreach(queue_item, queue) + { + Oid this_relid = lfirst_oid(queue_item); + ScanKeyData skey; + SysScanDesc inhscan; + HeapTuple inhtup; + + /* + * If we've seen this relid already, skip it. This avoids extra work + * in multiple-inheritance scenarios, and also protects us from an + * infinite loop in case there is a cycle in pg_inherits (though + * theoretically that shouldn't happen). + */ + if (list_member_oid(visited, this_relid)) + continue; + + /* + * Okay, this is a not-yet-seen relid. Add it to the list of + * already-visited OIDs, then find all the types this relid inherits + * from and add them to the queue. + */ + visited = lappend_oid(visited, this_relid); + + ScanKeyInit(&skey, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(this_relid)); + + inhscan = systable_beginscan(inhrel, InheritsRelidSeqnoIndexId, true, + NULL, 1, &skey); + + while ((inhtup = systable_getnext(inhscan)) != NULL) + { + Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup); + Oid inhparent = inh->inhparent; + + /* If this is the target superclass, we're done */ + if (inhparent == superclassRelid) + { + result = true; + break; + } + + /* Else add to queue */ + queue = lappend_oid(queue, inhparent); + } + + systable_endscan(inhscan); + + if (result) + break; + } + + /* clean up ... */ + table_close(inhrel, AccessShareLock); + + list_free(visited); + list_free(queue); + + return result; +} + +/* + * Create a single pg_inherits row with the given data + */ +void +StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber) +{ + Datum values[Natts_pg_inherits]; + bool nulls[Natts_pg_inherits]; + HeapTuple tuple; + Relation inhRelation; + + inhRelation = table_open(InheritsRelationId, RowExclusiveLock); + + /* + * Make the pg_inherits entry + */ + values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId); + values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid); + values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber); + + memset(nulls, 0, sizeof(nulls)); + + tuple = heap_form_tuple(RelationGetDescr(inhRelation), values, nulls); + + CatalogTupleInsert(inhRelation, tuple); + + heap_freetuple(tuple); + + table_close(inhRelation, RowExclusiveLock); +} + +/* + * DeleteInheritsTuple + * + * Delete pg_inherits tuples with the given inhrelid. inhparent may be given + * as InvalidOid, in which case all tuples matching inhrelid are deleted; + * otherwise only delete tuples with the specified inhparent. + * + * Returns whether at least one row was deleted. + */ +bool +DeleteInheritsTuple(Oid inhrelid, Oid inhparent) +{ + bool found = false; + Relation catalogRelation; + ScanKeyData key; + SysScanDesc scan; + HeapTuple inheritsTuple; + + /* + * Find pg_inherits entries by inhrelid. + */ + catalogRelation = table_open(InheritsRelationId, RowExclusiveLock); + ScanKeyInit(&key, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(inhrelid)); + scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, + true, NULL, 1, &key); + + while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) + { + Oid parent; + + /* Compare inhparent if it was given, and do the actual deletion. */ + parent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; + if (!OidIsValid(inhparent) || parent == inhparent) + { + CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self); + found = true; + } + } + + /* Done */ + systable_endscan(scan); + table_close(catalogRelation, RowExclusiveLock); + + return found; +} |