diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/executor/execReplication.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r-- | src/backend/executor/execReplication.c | 629 |
1 files changed, 629 insertions, 0 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c new file mode 100644 index 0000000..1e285e0 --- /dev/null +++ b/src/backend/executor/execReplication.c @@ -0,0 +1,629 @@ +/*------------------------------------------------------------------------- + * + * execReplication.c + * miscellaneous executor routines for logical replication + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execReplication.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/relscan.h" +#include "access/tableam.h" +#include "access/transam.h" +#include "access/xact.h" +#include "commands/trigger.h" +#include "executor/executor.h" +#include "executor/nodeModifyTable.h" +#include "nodes/nodeFuncs.h" +#include "parser/parse_relation.h" +#include "parser/parsetree.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "utils/typcache.h" + + +/* + * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that + * is setup to match 'rel' (*NOT* idxrel!). + * + * Returns whether any column contains NULLs. + * + * This is not generic routine, it expects the idxrel to be replication + * identity of a rel and meet all limitations associated with that. + */ +static bool +build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, + TupleTableSlot *searchslot) +{ + int attoff; + bool isnull; + Datum indclassDatum; + oidvector *opclass; + int2vector *indkey = &idxrel->rd_index->indkey; + bool hasnulls = false; + + Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) || + RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel)); + + indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + opclass = (oidvector *) DatumGetPointer(indclassDatum); + + /* Build scankey for every attribute in the index. */ + for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++) + { + Oid operator; + Oid opfamily; + RegProcedure regop; + int pkattno = attoff + 1; + int mainattno = indkey->values[attoff]; + Oid optype = get_opclass_input_type(opclass->values[attoff]); + + /* + * Load the operator info. We need this to get the equality operator + * function for the scan key. + */ + opfamily = get_opclass_family(opclass->values[attoff]); + + operator = get_opfamily_member(opfamily, optype, + optype, + BTEqualStrategyNumber); + if (!OidIsValid(operator)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + BTEqualStrategyNumber, optype, optype, opfamily); + + regop = get_opcode(operator); + + /* Initialize the scankey. */ + ScanKeyInit(&skey[attoff], + pkattno, + BTEqualStrategyNumber, + regop, + searchslot->tts_values[mainattno - 1]); + + skey[attoff].sk_collation = idxrel->rd_indcollation[attoff]; + + /* Check for null value. */ + if (searchslot->tts_isnull[mainattno - 1]) + { + hasnulls = true; + skey[attoff].sk_flags |= SK_ISNULL; + } + } + + return hasnulls; +} + +/* + * Search the relation 'rel' for tuple using the index. + * + * If a matching tuple is found, lock it with lockmode, fill the slot with its + * contents, and return true. Return false otherwise. + */ +bool +RelationFindReplTupleByIndex(Relation rel, Oid idxoid, + LockTupleMode lockmode, + TupleTableSlot *searchslot, + TupleTableSlot *outslot) +{ + ScanKeyData skey[INDEX_MAX_KEYS]; + IndexScanDesc scan; + SnapshotData snap; + TransactionId xwait; + Relation idxrel; + bool found; + + /* Open the index. */ + idxrel = index_open(idxoid, RowExclusiveLock); + + /* Start an index scan. */ + InitDirtySnapshot(snap); + scan = index_beginscan(rel, idxrel, &snap, + IndexRelationGetNumberOfKeyAttributes(idxrel), + 0); + + /* Build scan key. */ + build_replindex_scan_key(skey, rel, idxrel, searchslot); + +retry: + found = false; + + index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0); + + /* Try to find the tuple */ + if (index_getnext_slot(scan, ForwardScanDirection, outslot)) + { + found = true; + ExecMaterializeSlot(outslot); + + xwait = TransactionIdIsValid(snap.xmin) ? + snap.xmin : snap.xmax; + + /* + * If the tuple is locked, wait for locking transaction to finish and + * retry. + */ + if (TransactionIdIsValid(xwait)) + { + XactLockTableWait(xwait, NULL, NULL, XLTW_None); + goto retry; + } + } + + /* Found tuple, try to lock it in the lockmode. */ + if (found) + { + TM_FailureData tmfd; + TM_Result res; + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(), + outslot, + GetCurrentCommandId(false), + lockmode, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + switch (res) + { + case TM_Ok: + break; + case TM_Updated: + /* XXX: Improve handling here */ + if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + goto retry; + case TM_Deleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; + case TM_Invisible: + elog(ERROR, "attempted to lock invisible tuple"); + break; + default: + elog(ERROR, "unexpected table_tuple_lock status: %u", res); + break; + } + } + + index_endscan(scan); + + /* Don't release lock until commit. */ + index_close(idxrel, NoLock); + + return found; +} + +/* + * Compare the tuples in the slots by checking if they have equal values. + */ +static bool +tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, + TypeCacheEntry **eq) +{ + int attrnum; + + Assert(slot1->tts_tupleDescriptor->natts == + slot2->tts_tupleDescriptor->natts); + + slot_getallattrs(slot1); + slot_getallattrs(slot2); + + /* Check equality of the attributes. */ + for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++) + { + Form_pg_attribute att; + TypeCacheEntry *typentry; + + /* + * If one value is NULL and other is not, then they are certainly not + * equal + */ + if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum]) + return false; + + /* + * If both are NULL, they can be considered equal. + */ + if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum]) + continue; + + att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum); + + typentry = eq[attrnum]; + if (typentry == NULL) + { + typentry = lookup_type_cache(att->atttypid, + TYPECACHE_EQ_OPR_FINFO); + if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be(att->atttypid)))); + eq[attrnum] = typentry; + } + + if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo, + att->attcollation, + slot1->tts_values[attrnum], + slot2->tts_values[attrnum]))) + return false; + } + + return true; +} + +/* + * Search the relation 'rel' for tuple using the sequential scan. + * + * If a matching tuple is found, lock it with lockmode, fill the slot with its + * contents, and return true. Return false otherwise. + * + * Note that this stops on the first matching tuple. + * + * This can obviously be quite slow on tables that have more than few rows. + */ +bool +RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, + TupleTableSlot *searchslot, TupleTableSlot *outslot) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + SnapshotData snap; + TypeCacheEntry **eq; + TransactionId xwait; + bool found; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor)); + + eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); + + /* Start a heap scan. */ + InitDirtySnapshot(snap); + scan = table_beginscan(rel, &snap, 0, NULL); + scanslot = table_slot_create(rel, NULL); + +retry: + found = false; + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + if (!tuples_equal(scanslot, searchslot, eq)) + continue; + + found = true; + ExecCopySlot(outslot, scanslot); + + xwait = TransactionIdIsValid(snap.xmin) ? + snap.xmin : snap.xmax; + + /* + * If the tuple is locked, wait for locking transaction to finish and + * retry. + */ + if (TransactionIdIsValid(xwait)) + { + XactLockTableWait(xwait, NULL, NULL, XLTW_None); + goto retry; + } + + /* Found our tuple and it's not locked */ + break; + } + + /* Found tuple, try to lock it in the lockmode. */ + if (found) + { + TM_FailureData tmfd; + TM_Result res; + + PushActiveSnapshot(GetLatestSnapshot()); + + res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(), + outslot, + GetCurrentCommandId(false), + lockmode, + LockWaitBlock, + 0 /* don't follow updates */ , + &tmfd); + + PopActiveSnapshot(); + + switch (res) + { + case TM_Ok: + break; + case TM_Updated: + /* XXX: Improve handling here */ + if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + goto retry; + case TM_Deleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; + case TM_Invisible: + elog(ERROR, "attempted to lock invisible tuple"); + break; + default: + elog(ERROR, "unexpected table_tuple_lock status: %u", res); + break; + } + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return found; +} + +/* + * Insert tuple represented in the slot to the relation, update the indexes, + * and execute any constraints and per-row triggers. + * + * Caller is responsible for opening the indexes. + */ +void +ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, + EState *estate, TupleTableSlot *slot) +{ + bool skip_tuple = false; + Relation rel = resultRelInfo->ri_RelationDesc; + + /* For now we support only tables. */ + Assert(rel->rd_rel->relkind == RELKIND_RELATION); + + CheckCmdReplicaIdentity(rel, CMD_INSERT); + + /* BEFORE ROW INSERT Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row) + { + if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) + skip_tuple = true; /* "do nothing" */ + } + + if (!skip_tuple) + { + List *recheckIndexes = NIL; + + /* Compute stored generated columns */ + if (rel->rd_att->constr && + rel->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, slot, + CMD_INSERT); + + /* Check the constraints of the tuple */ + if (rel->rd_att->constr) + ExecConstraints(resultRelInfo, slot, estate); + if (rel->rd_rel->relispartition) + ExecPartitionCheck(resultRelInfo, slot, estate, true); + + /* OK, store the tuple and create index entries for it */ + simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot); + + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, false, + NULL, NIL); + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, slot, + recheckIndexes, NULL); + + /* + * XXX we should in theory pass a TransitionCaptureState object to the + * above to capture transition tuples, but after statement triggers + * don't actually get fired by replication yet anyway + */ + + list_free(recheckIndexes); + } +} + +/* + * Find the searchslot tuple and update it with data in the slot, + * update the indexes, and execute any constraints and per-row triggers. + * + * Caller is responsible for opening the indexes. + */ +void +ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, + EState *estate, EPQState *epqstate, + TupleTableSlot *searchslot, TupleTableSlot *slot) +{ + bool skip_tuple = false; + Relation rel = resultRelInfo->ri_RelationDesc; + ItemPointer tid = &(searchslot->tts_tid); + + /* For now we support only tables. */ + Assert(rel->rd_rel->relkind == RELKIND_RELATION); + + CheckCmdReplicaIdentity(rel, CMD_UPDATE); + + /* BEFORE ROW UPDATE Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_update_before_row) + { + if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, + tid, NULL, slot)) + skip_tuple = true; /* "do nothing" */ + } + + if (!skip_tuple) + { + List *recheckIndexes = NIL; + bool update_indexes; + + /* Compute stored generated columns */ + if (rel->rd_att->constr && + rel->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, slot, + CMD_UPDATE); + + /* Check the constraints of the tuple */ + if (rel->rd_att->constr) + ExecConstraints(resultRelInfo, slot, estate); + if (rel->rd_rel->relispartition) + ExecPartitionCheck(resultRelInfo, slot, estate, true); + + simple_table_tuple_update(rel, tid, slot, estate->es_snapshot, + &update_indexes); + + if (resultRelInfo->ri_NumIndices > 0 && update_indexes) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, true, false, + NULL, NIL); + + /* AFTER ROW UPDATE Triggers */ + ExecARUpdateTriggers(estate, resultRelInfo, + tid, NULL, slot, + recheckIndexes, NULL); + + list_free(recheckIndexes); + } +} + +/* + * Find the searchslot tuple and delete it, and execute any constraints + * and per-row triggers. + * + * Caller is responsible for opening the indexes. + */ +void +ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, + EState *estate, EPQState *epqstate, + TupleTableSlot *searchslot) +{ + bool skip_tuple = false; + Relation rel = resultRelInfo->ri_RelationDesc; + ItemPointer tid = &searchslot->tts_tid; + + CheckCmdReplicaIdentity(rel, CMD_DELETE); + + /* BEFORE ROW DELETE Triggers */ + if (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_delete_before_row) + { + skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo, + tid, NULL, NULL); + + } + + if (!skip_tuple) + { + /* OK, delete the tuple */ + simple_table_tuple_delete(rel, tid, estate->es_snapshot); + + /* AFTER ROW DELETE Triggers */ + ExecARDeleteTriggers(estate, resultRelInfo, + tid, NULL, NULL); + } +} + +/* + * Check if command can be executed with current replica identity. + */ +void +CheckCmdReplicaIdentity(Relation rel, CmdType cmd) +{ + PublicationActions *pubactions; + + /* We only need to do checks for UPDATE and DELETE. */ + if (cmd != CMD_UPDATE && cmd != CMD_DELETE) + return; + + /* If relation has replica identity we are always good. */ + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + OidIsValid(RelationGetReplicaIndex(rel))) + return; + + /* + * This is either UPDATE OR DELETE and there is no replica identity. + * + * Check if the table publishes UPDATES or DELETES. + */ + pubactions = GetRelationPublicationActions(rel); + if (cmd == CMD_UPDATE && pubactions->pubupdate) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates", + RelationGetRelationName(rel)), + errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE."))); + else if (cmd == CMD_DELETE && pubactions->pubdelete) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes", + RelationGetRelationName(rel)), + errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE."))); +} + + +/* + * Check if we support writing into specific relkind. + * + * The nspname and relname are only needed for error reporting. + */ +void +CheckSubscriptionRelkind(char relkind, const char *nspname, + const char *relname) +{ + /* + * Give a more specific error for foreign tables. + */ + if (relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s.%s\" as logical replication target", + nspname, relname), + errdetail("\"%s.%s\" is a foreign table.", + nspname, relname))); + + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s.%s\" as logical replication target", + nspname, relname), + errdetail("\"%s.%s\" is not a table.", + nspname, relname))); +} |