summaryrefslogtreecommitdiffstats
path: root/src/backend/executor/execReplication.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/executor/execReplication.c
parentInitial commit. (diff)
downloadpostgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz
postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r--src/backend/executor/execReplication.c629
1 files changed, 629 insertions, 0 deletions
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
new file mode 100644
index 0000000..1e285e0
--- /dev/null
+++ b/src/backend/executor/execReplication.c
@@ -0,0 +1,629 @@
+/*-------------------------------------------------------------------------
+ *
+ * execReplication.c
+ * miscellaneous executor routines for logical replication
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execReplication.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/relscan.h"
+#include "access/tableam.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "commands/trigger.h"
+#include "executor/executor.h"
+#include "executor/nodeModifyTable.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_relation.h"
+#include "parser/parsetree.h"
+#include "storage/bufmgr.h"
+#include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+
+/*
+ * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
+ * is setup to match 'rel' (*NOT* idxrel!).
+ *
+ * Returns whether any column contains NULLs.
+ *
+ * This is not generic routine, it expects the idxrel to be replication
+ * identity of a rel and meet all limitations associated with that.
+ */
+static bool
+build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
+ TupleTableSlot *searchslot)
+{
+ int attoff;
+ bool isnull;
+ Datum indclassDatum;
+ oidvector *opclass;
+ int2vector *indkey = &idxrel->rd_index->indkey;
+ bool hasnulls = false;
+
+ Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
+ RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
+
+ indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
+ Anum_pg_index_indclass, &isnull);
+ Assert(!isnull);
+ opclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+ /* Build scankey for every attribute in the index. */
+ for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+ {
+ Oid operator;
+ Oid opfamily;
+ RegProcedure regop;
+ int pkattno = attoff + 1;
+ int mainattno = indkey->values[attoff];
+ Oid optype = get_opclass_input_type(opclass->values[attoff]);
+
+ /*
+ * Load the operator info. We need this to get the equality operator
+ * function for the scan key.
+ */
+ opfamily = get_opclass_family(opclass->values[attoff]);
+
+ operator = get_opfamily_member(opfamily, optype,
+ optype,
+ BTEqualStrategyNumber);
+ if (!OidIsValid(operator))
+ elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
+ BTEqualStrategyNumber, optype, optype, opfamily);
+
+ regop = get_opcode(operator);
+
+ /* Initialize the scankey. */
+ ScanKeyInit(&skey[attoff],
+ pkattno,
+ BTEqualStrategyNumber,
+ regop,
+ searchslot->tts_values[mainattno - 1]);
+
+ skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+
+ /* Check for null value. */
+ if (searchslot->tts_isnull[mainattno - 1])
+ {
+ hasnulls = true;
+ skey[attoff].sk_flags |= SK_ISNULL;
+ }
+ }
+
+ return hasnulls;
+}
+
+/*
+ * Search the relation 'rel' for tuple using the index.
+ *
+ * If a matching tuple is found, lock it with lockmode, fill the slot with its
+ * contents, and return true. Return false otherwise.
+ */
+bool
+RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
+ LockTupleMode lockmode,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *outslot)
+{
+ ScanKeyData skey[INDEX_MAX_KEYS];
+ IndexScanDesc scan;
+ SnapshotData snap;
+ TransactionId xwait;
+ Relation idxrel;
+ bool found;
+
+ /* Open the index. */
+ idxrel = index_open(idxoid, RowExclusiveLock);
+
+ /* Start an index scan. */
+ InitDirtySnapshot(snap);
+ scan = index_beginscan(rel, idxrel, &snap,
+ IndexRelationGetNumberOfKeyAttributes(idxrel),
+ 0);
+
+ /* Build scan key. */
+ build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+retry:
+ found = false;
+
+ index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+
+ /* Try to find the tuple */
+ if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+ {
+ found = true;
+ ExecMaterializeSlot(outslot);
+
+ xwait = TransactionIdIsValid(snap.xmin) ?
+ snap.xmin : snap.xmax;
+
+ /*
+ * If the tuple is locked, wait for locking transaction to finish and
+ * retry.
+ */
+ if (TransactionIdIsValid(xwait))
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
+ }
+
+ /* Found tuple, try to lock it in the lockmode. */
+ if (found)
+ {
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+ outslot,
+ GetCurrentCommandId(false),
+ lockmode,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ goto retry;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ goto retry;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+ }
+
+ index_endscan(scan);
+
+ /* Don't release lock until commit. */
+ index_close(idxrel, NoLock);
+
+ return found;
+}
+
+/*
+ * Compare the tuples in the slots by checking if they have equal values.
+ */
+static bool
+tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+ TypeCacheEntry **eq)
+{
+ int attrnum;
+
+ Assert(slot1->tts_tupleDescriptor->natts ==
+ slot2->tts_tupleDescriptor->natts);
+
+ slot_getallattrs(slot1);
+ slot_getallattrs(slot2);
+
+ /* Check equality of the attributes. */
+ for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
+ {
+ Form_pg_attribute att;
+ TypeCacheEntry *typentry;
+
+ /*
+ * If one value is NULL and other is not, then they are certainly not
+ * equal
+ */
+ if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
+ return false;
+
+ /*
+ * If both are NULL, they can be considered equal.
+ */
+ if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
+ continue;
+
+ att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
+
+ typentry = eq[attrnum];
+ if (typentry == NULL)
+ {
+ typentry = lookup_type_cache(att->atttypid,
+ TYPECACHE_EQ_OPR_FINFO);
+ if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("could not identify an equality operator for type %s",
+ format_type_be(att->atttypid))));
+ eq[attrnum] = typentry;
+ }
+
+ if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
+ att->attcollation,
+ slot1->tts_values[attrnum],
+ slot2->tts_values[attrnum])))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Search the relation 'rel' for tuple using the sequential scan.
+ *
+ * If a matching tuple is found, lock it with lockmode, fill the slot with its
+ * contents, and return true. Return false otherwise.
+ *
+ * Note that this stops on the first matching tuple.
+ *
+ * This can obviously be quite slow on tables that have more than few rows.
+ */
+bool
+RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
+ TupleTableSlot *searchslot, TupleTableSlot *outslot)
+{
+ TupleTableSlot *scanslot;
+ TableScanDesc scan;
+ SnapshotData snap;
+ TypeCacheEntry **eq;
+ TransactionId xwait;
+ bool found;
+ TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+ Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
+
+ eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+
+ /* Start a heap scan. */
+ InitDirtySnapshot(snap);
+ scan = table_beginscan(rel, &snap, 0, NULL);
+ scanslot = table_slot_create(rel, NULL);
+
+retry:
+ found = false;
+
+ table_rescan(scan, NULL);
+
+ /* Try to find the tuple */
+ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+ {
+ if (!tuples_equal(scanslot, searchslot, eq))
+ continue;
+
+ found = true;
+ ExecCopySlot(outslot, scanslot);
+
+ xwait = TransactionIdIsValid(snap.xmin) ?
+ snap.xmin : snap.xmax;
+
+ /*
+ * If the tuple is locked, wait for locking transaction to finish and
+ * retry.
+ */
+ if (TransactionIdIsValid(xwait))
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
+
+ /* Found our tuple and it's not locked */
+ break;
+ }
+
+ /* Found tuple, try to lock it in the lockmode. */
+ if (found)
+ {
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+ outslot,
+ GetCurrentCommandId(false),
+ lockmode,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ switch (res)
+ {
+ case TM_Ok:
+ break;
+ case TM_Updated:
+ /* XXX: Improve handling here */
+ if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ goto retry;
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ goto retry;
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+ default:
+ elog(ERROR, "unexpected table_tuple_lock status: %u", res);
+ break;
+ }
+ }
+
+ table_endscan(scan);
+ ExecDropSingleTupleTableSlot(scanslot);
+
+ return found;
+}
+
+/*
+ * Insert tuple represented in the slot to the relation, update the indexes,
+ * and execute any constraints and per-row triggers.
+ *
+ * Caller is responsible for opening the indexes.
+ */
+void
+ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
+ EState *estate, TupleTableSlot *slot)
+{
+ bool skip_tuple = false;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+
+ /* For now we support only tables. */
+ Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+ CheckCmdReplicaIdentity(rel, CMD_INSERT);
+
+ /* BEFORE ROW INSERT Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+ {
+ if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+ skip_tuple = true; /* "do nothing" */
+ }
+
+ if (!skip_tuple)
+ {
+ List *recheckIndexes = NIL;
+
+ /* Compute stored generated columns */
+ if (rel->rd_att->constr &&
+ rel->rd_att->constr->has_generated_stored)
+ ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+ CMD_INSERT);
+
+ /* Check the constraints of the tuple */
+ if (rel->rd_att->constr)
+ ExecConstraints(resultRelInfo, slot, estate);
+ if (rel->rd_rel->relispartition)
+ ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+ /* OK, store the tuple and create index entries for it */
+ simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, false, false,
+ NULL, NIL);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, slot,
+ recheckIndexes, NULL);
+
+ /*
+ * XXX we should in theory pass a TransitionCaptureState object to the
+ * above to capture transition tuples, but after statement triggers
+ * don't actually get fired by replication yet anyway
+ */
+
+ list_free(recheckIndexes);
+ }
+}
+
+/*
+ * Find the searchslot tuple and update it with data in the slot,
+ * update the indexes, and execute any constraints and per-row triggers.
+ *
+ * Caller is responsible for opening the indexes.
+ */
+void
+ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
+ EState *estate, EPQState *epqstate,
+ TupleTableSlot *searchslot, TupleTableSlot *slot)
+{
+ bool skip_tuple = false;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointer tid = &(searchslot->tts_tid);
+
+ /* For now we support only tables. */
+ Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+ CheckCmdReplicaIdentity(rel, CMD_UPDATE);
+
+ /* BEFORE ROW UPDATE Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_update_before_row)
+ {
+ if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
+ tid, NULL, slot))
+ skip_tuple = true; /* "do nothing" */
+ }
+
+ if (!skip_tuple)
+ {
+ List *recheckIndexes = NIL;
+ bool update_indexes;
+
+ /* Compute stored generated columns */
+ if (rel->rd_att->constr &&
+ rel->rd_att->constr->has_generated_stored)
+ ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+ CMD_UPDATE);
+
+ /* Check the constraints of the tuple */
+ if (rel->rd_att->constr)
+ ExecConstraints(resultRelInfo, slot, estate);
+ if (rel->rd_rel->relispartition)
+ ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+ simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
+ &update_indexes);
+
+ if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
+ recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
+ slot, estate, true, false,
+ NULL, NIL);
+
+ /* AFTER ROW UPDATE Triggers */
+ ExecARUpdateTriggers(estate, resultRelInfo,
+ tid, NULL, slot,
+ recheckIndexes, NULL);
+
+ list_free(recheckIndexes);
+ }
+}
+
+/*
+ * Find the searchslot tuple and delete it, and execute any constraints
+ * and per-row triggers.
+ *
+ * Caller is responsible for opening the indexes.
+ */
+void
+ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
+ EState *estate, EPQState *epqstate,
+ TupleTableSlot *searchslot)
+{
+ bool skip_tuple = false;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ ItemPointer tid = &searchslot->tts_tid;
+
+ CheckCmdReplicaIdentity(rel, CMD_DELETE);
+
+ /* BEFORE ROW DELETE Triggers */
+ if (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_delete_before_row)
+ {
+ skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
+ tid, NULL, NULL);
+
+ }
+
+ if (!skip_tuple)
+ {
+ /* OK, delete the tuple */
+ simple_table_tuple_delete(rel, tid, estate->es_snapshot);
+
+ /* AFTER ROW DELETE Triggers */
+ ExecARDeleteTriggers(estate, resultRelInfo,
+ tid, NULL, NULL);
+ }
+}
+
+/*
+ * Check if command can be executed with current replica identity.
+ */
+void
+CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
+{
+ PublicationActions *pubactions;
+
+ /* We only need to do checks for UPDATE and DELETE. */
+ if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
+ return;
+
+ /* If relation has replica identity we are always good. */
+ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ OidIsValid(RelationGetReplicaIndex(rel)))
+ return;
+
+ /*
+ * This is either UPDATE OR DELETE and there is no replica identity.
+ *
+ * Check if the table publishes UPDATES or DELETES.
+ */
+ pubactions = GetRelationPublicationActions(rel);
+ if (cmd == CMD_UPDATE && pubactions->pubupdate)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
+ RelationGetRelationName(rel)),
+ errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
+ else if (cmd == CMD_DELETE && pubactions->pubdelete)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
+ RelationGetRelationName(rel)),
+ errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
+}
+
+
+/*
+ * Check if we support writing into specific relkind.
+ *
+ * The nspname and relname are only needed for error reporting.
+ */
+void
+CheckSubscriptionRelkind(char relkind, const char *nspname,
+ const char *relname)
+{
+ /*
+ * Give a more specific error for foreign tables.
+ */
+ if (relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot use relation \"%s.%s\" as logical replication target",
+ nspname, relname),
+ errdetail("\"%s.%s\" is a foreign table.",
+ nspname, relname)));
+
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot use relation \"%s.%s\" as logical replication target",
+ nspname, relname),
+ errdetail("\"%s.%s\" is not a table.",
+ nspname, relname)));
+}