summaryrefslogtreecommitdiffstats
path: root/src/backend/catalog/pg_subscription.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
commit6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch)
tree657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/backend/catalog/pg_subscription.c
parentInitial commit. (diff)
downloadpostgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.tar.xz
postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.zip
Adding upstream version 13.4.upstream/13.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/catalog/pg_subscription.c')
-rw-r--r--src/backend/catalog/pg_subscription.c536
1 files changed, 536 insertions, 0 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
new file mode 100644
index 0000000..0d23b20
--- /dev/null
+++ b/src/backend/catalog/pg_subscription.c
@@ -0,0 +1,536 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_subscription.c
+ * replication subscriptions
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/catalog/pg_subscription.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_type.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "storage/lmgr.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/pg_lsn.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+
+static List *textarray_to_stringlist(ArrayType *textarray);
+
+/*
+ * Fetch the subscription from the syscache.
+ */
+Subscription *
+GetSubscription(Oid subid, bool missing_ok)
+{
+ HeapTuple tup;
+ Subscription *sub;
+ Form_pg_subscription subform;
+ Datum datum;
+ bool isnull;
+
+ tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ {
+ if (missing_ok)
+ return NULL;
+
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+ }
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+ sub = (Subscription *) palloc(sizeof(Subscription));
+ sub->oid = subid;
+ sub->dbid = subform->subdbid;
+ sub->name = pstrdup(NameStr(subform->subname));
+ sub->owner = subform->subowner;
+ sub->enabled = subform->subenabled;
+
+ /* Get conninfo */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconninfo,
+ &isnull);
+ Assert(!isnull);
+ sub->conninfo = TextDatumGetCString(datum);
+
+ /* Get slotname */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subslotname,
+ &isnull);
+ if (!isnull)
+ sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
+ else
+ sub->slotname = NULL;
+
+ /* Get synccommit */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subsynccommit,
+ &isnull);
+ Assert(!isnull);
+ sub->synccommit = TextDatumGetCString(datum);
+
+ /* Get publications */
+ datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subpublications,
+ &isnull);
+ Assert(!isnull);
+ sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
+
+ ReleaseSysCache(tup);
+
+ return sub;
+}
+
+/*
+ * Return number of subscriptions defined in given database.
+ * Used by dropdb() to check if database can indeed be dropped.
+ */
+int
+CountDBSubscriptions(Oid dbid)
+{
+ int nsubs = 0;
+ Relation rel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_subscription_subdbid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(dbid));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, 1, &scankey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ nsubs++;
+
+ systable_endscan(scan);
+
+ table_close(rel, NoLock);
+
+ return nsubs;
+}
+
+/*
+ * Free memory allocated by subscription struct.
+ */
+void
+FreeSubscription(Subscription *sub)
+{
+ pfree(sub->name);
+ pfree(sub->conninfo);
+ if (sub->slotname)
+ pfree(sub->slotname);
+ list_free_deep(sub->publications);
+ pfree(sub);
+}
+
+/*
+ * get_subscription_oid - given a subscription name, look up the OID
+ *
+ * If missing_ok is false, throw an error if name not found. If true, just
+ * return InvalidOid.
+ */
+Oid
+get_subscription_oid(const char *subname, bool missing_ok)
+{
+ Oid oid;
+
+ oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
+ MyDatabaseId, CStringGetDatum(subname));
+ if (!OidIsValid(oid) && !missing_ok)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("subscription \"%s\" does not exist", subname)));
+ return oid;
+}
+
+/*
+ * get_subscription_name - given a subscription OID, look up the name
+ *
+ * If missing_ok is false, throw an error if name not found. If true, just
+ * return NULL.
+ */
+char *
+get_subscription_name(Oid subid, bool missing_ok)
+{
+ HeapTuple tup;
+ char *subname;
+ Form_pg_subscription subform;
+
+ tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ {
+ if (!missing_ok)
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+ return NULL;
+ }
+
+ subform = (Form_pg_subscription) GETSTRUCT(tup);
+ subname = pstrdup(NameStr(subform->subname));
+
+ ReleaseSysCache(tup);
+
+ return subname;
+}
+
+/*
+ * Convert text array to list of strings.
+ *
+ * Note: the resulting list of strings is pallocated here.
+ */
+static List *
+textarray_to_stringlist(ArrayType *textarray)
+{
+ Datum *elems;
+ int nelems,
+ i;
+ List *res = NIL;
+
+ deconstruct_array(textarray,
+ TEXTOID, -1, false, TYPALIGN_INT,
+ &elems, NULL, &nelems);
+
+ if (nelems == 0)
+ return NIL;
+
+ for (i = 0; i < nelems; i++)
+ res = lappend(res, makeString(TextDatumGetCString(elems[i])));
+
+ return res;
+}
+
+/*
+ * Add new state record for a subscription table.
+ */
+void
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
+
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+
+ /* Insert tuple into catalog. */
+ CatalogTupleInsert(rel, tup);
+
+ heap_freetuple(tup);
+
+ /* Cleanup. */
+ table_close(rel, NoLock);
+}
+
+/*
+ * Update the state of a subscription table.
+ */
+void
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ /* Cleanup. */
+ table_close(rel, NoLock);
+}
+
+/*
+ * Get state of subscription table.
+ *
+ * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
+ */
+char
+GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
+ bool missing_ok)
+{
+ Relation rel;
+ HeapTuple tup;
+ char substate;
+ bool isnull;
+ Datum d;
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ /* Try finding the mapping. */
+ tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ {
+ if (missing_ok)
+ {
+ table_close(rel, AccessShareLock);
+ *sublsn = InvalidXLogRecPtr;
+ return SUBREL_STATE_UNKNOWN;
+ }
+
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+ }
+
+ /* Get the state. */
+ d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+ Anum_pg_subscription_rel_srsubstate, &isnull);
+ Assert(!isnull);
+ substate = DatumGetChar(d);
+ d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+ Anum_pg_subscription_rel_srsublsn, &isnull);
+ if (isnull)
+ *sublsn = InvalidXLogRecPtr;
+ else
+ *sublsn = DatumGetLSN(d);
+
+ /* Cleanup */
+ ReleaseSysCache(tup);
+ table_close(rel, AccessShareLock);
+
+ return substate;
+}
+
+/*
+ * Drop subscription relation mapping. These can be for a particular
+ * subscription, or for a particular relation, or both.
+ */
+void
+RemoveSubscriptionRel(Oid subid, Oid relid)
+{
+ Relation rel;
+ TableScanDesc scan;
+ ScanKeyData skey[2];
+ HeapTuple tup;
+ int nkeys = 0;
+
+ rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ if (OidIsValid(subid))
+ {
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber,
+ F_OIDEQ,
+ ObjectIdGetDatum(subid));
+ }
+
+ if (OidIsValid(relid))
+ {
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srrelid,
+ BTEqualStrategyNumber,
+ F_OIDEQ,
+ ObjectIdGetDatum(relid));
+ }
+
+ /* Do the search and delete what we found. */
+ scan = table_beginscan_catalog(rel, nkeys, skey);
+ while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+ {
+ CatalogTupleDelete(rel, &tup->t_self);
+ }
+ table_endscan(scan);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+
+/*
+ * Get all relations for subscription.
+ *
+ * Returned list is palloc'ed in current memory context.
+ */
+List *
+GetSubscriptionRelations(Oid subid)
+{
+ List *res = NIL;
+ Relation rel;
+ HeapTuple tup;
+ int nkeys = 0;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, nkeys, skey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+ SubscriptionRelState *relstate;
+ Datum d;
+ bool isnull;
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+ relstate->relid = subrel->srrelid;
+ relstate->state = subrel->srsubstate;
+ d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+ Anum_pg_subscription_rel_srsublsn, &isnull);
+ if (isnull)
+ relstate->lsn = InvalidXLogRecPtr;
+ else
+ relstate->lsn = DatumGetLSN(d);
+
+ res = lappend(res, relstate);
+ }
+
+ /* Cleanup */
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ return res;
+}
+
+/*
+ * Get all relations for subscription that are not in a ready state.
+ *
+ * Returned list is palloc'ed in current memory context.
+ */
+List *
+GetSubscriptionNotReadyRelations(Oid subid)
+{
+ List *res = NIL;
+ Relation rel;
+ HeapTuple tup;
+ int nkeys = 0;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));
+
+ ScanKeyInit(&skey[nkeys++],
+ Anum_pg_subscription_rel_srsubstate,
+ BTEqualStrategyNumber, F_CHARNE,
+ CharGetDatum(SUBREL_STATE_READY));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, nkeys, skey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+ SubscriptionRelState *relstate;
+ Datum d;
+ bool isnull;
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+ relstate->relid = subrel->srrelid;
+ relstate->state = subrel->srsubstate;
+ d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+ Anum_pg_subscription_rel_srsublsn, &isnull);
+ if (isnull)
+ relstate->lsn = InvalidXLogRecPtr;
+ else
+ relstate->lsn = DatumGetLSN(d);
+
+ res = lappend(res, relstate);
+ }
+
+ /* Cleanup */
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ return res;
+}