summaryrefslogtreecommitdiffstats
path: root/src/backend/commands/publicationcmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/publicationcmds.c')
-rw-r--r--src/backend/commands/publicationcmds.c801
1 files changed, 801 insertions, 0 deletions
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
new file mode 100644
index 0000000..a5e29b5
--- /dev/null
+++ b/src/backend/commands/publicationcmds.c
@@ -0,0 +1,801 @@
+/*-------------------------------------------------------------------------
+ *
+ * publicationcmds.c
+ * publication manipulation
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * publicationcmds.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/htup_details.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "catalog/catalog.h"
+#include "catalog/indexing.h"
+#include "catalog/namespace.h"
+#include "catalog/objectaccess.h"
+#include "catalog/objectaddress.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
+#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+#include "catalog/pg_type.h"
+#include "commands/dbcommands.h"
+#include "commands/defrem.h"
+#include "commands/event_trigger.h"
+#include "commands/publicationcmds.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "utils/acl.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/fmgroids.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/varlena.h"
+
+/* Same as MAXNUMMESSAGES in sinvaladt.c */
+#define MAX_RELCACHE_INVAL_MSGS 4096
+
+static List *OpenTableList(List *tables);
+static void CloseTableList(List *rels);
+static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+
+static void
+parse_publication_options(List *options,
+ bool *publish_given,
+ PublicationActions *pubactions,
+ bool *publish_via_partition_root_given,
+ bool *publish_via_partition_root)
+{
+ ListCell *lc;
+
+ *publish_given = false;
+ *publish_via_partition_root_given = false;
+
+ /* defaults */
+ pubactions->pubinsert = true;
+ pubactions->pubupdate = true;
+ pubactions->pubdelete = true;
+ pubactions->pubtruncate = true;
+ *publish_via_partition_root = false;
+
+ /* Parse options */
+ foreach(lc, options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lc);
+
+ if (strcmp(defel->defname, "publish") == 0)
+ {
+ char *publish;
+ List *publish_list;
+ ListCell *lc;
+
+ if (*publish_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ /*
+ * If publish option was given only the explicitly listed actions
+ * should be published.
+ */
+ pubactions->pubinsert = false;
+ pubactions->pubupdate = false;
+ pubactions->pubdelete = false;
+ pubactions->pubtruncate = false;
+
+ *publish_given = true;
+ publish = defGetString(defel);
+
+ if (!SplitIdentifierString(publish, ',', &publish_list))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid list syntax for \"publish\" option")));
+
+ /* Process the option list. */
+ foreach(lc, publish_list)
+ {
+ char *publish_opt = (char *) lfirst(lc);
+
+ if (strcmp(publish_opt, "insert") == 0)
+ pubactions->pubinsert = true;
+ else if (strcmp(publish_opt, "update") == 0)
+ pubactions->pubupdate = true;
+ else if (strcmp(publish_opt, "delete") == 0)
+ pubactions->pubdelete = true;
+ else if (strcmp(publish_opt, "truncate") == 0)
+ pubactions->pubtruncate = true;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
+ }
+ }
+ else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
+ {
+ if (*publish_via_partition_root_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ *publish_via_partition_root_given = true;
+ *publish_via_partition_root = defGetBoolean(defel);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
+ }
+}
+
+/*
+ * Create new publication.
+ */
+ObjectAddress
+CreatePublication(CreatePublicationStmt *stmt)
+{
+ Relation rel;
+ ObjectAddress myself;
+ Oid puboid;
+ bool nulls[Natts_pg_publication];
+ Datum values[Natts_pg_publication];
+ HeapTuple tup;
+ bool publish_given;
+ PublicationActions pubactions;
+ bool publish_via_partition_root_given;
+ bool publish_via_partition_root;
+ AclResult aclresult;
+
+ /* must have CREATE privilege on database */
+ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_DATABASE,
+ get_database_name(MyDatabaseId));
+
+ /* FOR ALL TABLES requires superuser */
+ if (stmt->for_all_tables && !superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to create FOR ALL TABLES publication")));
+
+ rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+ /* Check if name is used */
+ puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
+ CStringGetDatum(stmt->pubname));
+ if (OidIsValid(puboid))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("publication \"%s\" already exists",
+ stmt->pubname)));
+ }
+
+ /* Form a tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+
+ values[Anum_pg_publication_pubname - 1] =
+ DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
+ values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
+
+ parse_publication_options(stmt->options,
+ &publish_given, &pubactions,
+ &publish_via_partition_root_given,
+ &publish_via_partition_root);
+
+ puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
+ Anum_pg_publication_oid);
+ values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
+ values[Anum_pg_publication_puballtables - 1] =
+ BoolGetDatum(stmt->for_all_tables);
+ values[Anum_pg_publication_pubinsert - 1] =
+ BoolGetDatum(pubactions.pubinsert);
+ values[Anum_pg_publication_pubupdate - 1] =
+ BoolGetDatum(pubactions.pubupdate);
+ values[Anum_pg_publication_pubdelete - 1] =
+ BoolGetDatum(pubactions.pubdelete);
+ values[Anum_pg_publication_pubtruncate - 1] =
+ BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubviaroot - 1] =
+ BoolGetDatum(publish_via_partition_root);
+
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+
+ /* Insert tuple into catalog. */
+ CatalogTupleInsert(rel, tup);
+ heap_freetuple(tup);
+
+ recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
+
+ ObjectAddressSet(myself, PublicationRelationId, puboid);
+
+ /* Make the changes visible. */
+ CommandCounterIncrement();
+
+ if (stmt->tables)
+ {
+ List *rels;
+
+ Assert(list_length(stmt->tables) > 0);
+
+ rels = OpenTableList(stmt->tables);
+ PublicationAddTables(puboid, rels, true, NULL);
+ CloseTableList(rels);
+ }
+
+ table_close(rel, RowExclusiveLock);
+
+ InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
+
+ if (wal_level != WAL_LEVEL_LOGICAL)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("wal_level is insufficient to publish logical changes"),
+ errhint("Set wal_level to logical before creating subscriptions.")));
+ }
+
+ return myself;
+}
+
+/*
+ * Change options of a publication.
+ */
+static void
+AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
+ HeapTuple tup)
+{
+ bool nulls[Natts_pg_publication];
+ bool replaces[Natts_pg_publication];
+ Datum values[Natts_pg_publication];
+ bool publish_given;
+ PublicationActions pubactions;
+ bool publish_via_partition_root_given;
+ bool publish_via_partition_root;
+ ObjectAddress obj;
+ Form_pg_publication pubform;
+
+ parse_publication_options(stmt->options,
+ &publish_given, &pubactions,
+ &publish_via_partition_root_given,
+ &publish_via_partition_root);
+
+ /* Everything ok, form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ if (publish_given)
+ {
+ values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
+ replaces[Anum_pg_publication_pubinsert - 1] = true;
+
+ values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
+ replaces[Anum_pg_publication_pubupdate - 1] = true;
+
+ values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
+ replaces[Anum_pg_publication_pubdelete - 1] = true;
+
+ values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
+ replaces[Anum_pg_publication_pubtruncate - 1] = true;
+ }
+
+ if (publish_via_partition_root_given)
+ {
+ values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
+ replaces[Anum_pg_publication_pubviaroot - 1] = true;
+ }
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ CommandCounterIncrement();
+
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /* Invalidate the relcache. */
+ if (pubform->puballtables)
+ {
+ CacheInvalidateRelcacheAll();
+ }
+ else
+ {
+ /*
+ * For any partitioned tables contained in the publication, we must
+ * invalidate all partitions contained in the respective partition
+ * trees, not just those explicitly mentioned in the publication.
+ */
+ List *relids = GetPublicationRelations(pubform->oid,
+ PUBLICATION_PART_ALL);
+
+ /*
+ * We don't want to send too many individual messages, at some point
+ * it's cheaper to just reset whole relcache.
+ */
+ if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+ {
+ ListCell *lc;
+
+ foreach(lc, relids)
+ {
+ Oid relid = lfirst_oid(lc);
+
+ CacheInvalidateRelcacheByRelid(relid);
+ }
+ }
+ else
+ CacheInvalidateRelcacheAll();
+ }
+
+ ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
+}
+
+/*
+ * Add or remove table to/from publication.
+ */
+static void
+AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
+ HeapTuple tup)
+{
+ List *rels = NIL;
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+ Oid pubid = pubform->oid;
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (pubform->puballtables)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+ NameStr(pubform->pubname)),
+ errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+ Assert(list_length(stmt->tables) > 0);
+
+ rels = OpenTableList(stmt->tables);
+
+ if (stmt->tableAction == DEFELEM_ADD)
+ PublicationAddTables(pubid, rels, false, stmt);
+ else if (stmt->tableAction == DEFELEM_DROP)
+ PublicationDropTables(pubid, rels, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldrelids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ROOT);
+ List *delrels = NIL;
+ ListCell *oldlc;
+
+ /* Calculate which relations to drop. */
+ foreach(oldlc, oldrelids)
+ {
+ Oid oldrelid = lfirst_oid(oldlc);
+ ListCell *newlc;
+ bool found = false;
+
+ foreach(newlc, rels)
+ {
+ Relation newrel = (Relation) lfirst(newlc);
+
+ if (RelationGetRelid(newrel) == oldrelid)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ Relation oldrel = table_open(oldrelid,
+ ShareUpdateExclusiveLock);
+
+ delrels = lappend(delrels, oldrel);
+ }
+ }
+
+ /* And drop them. */
+ PublicationDropTables(pubid, delrels, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddTables(pubid, rels, true, stmt);
+
+ CloseTableList(delrels);
+ }
+
+ CloseTableList(rels);
+}
+
+/*
+ * Alter the existing publication.
+ *
+ * This is dispatcher function for AlterPublicationOptions and
+ * AlterPublicationTables.
+ */
+void
+AlterPublication(AlterPublicationStmt *stmt)
+{
+ Relation rel;
+ HeapTuple tup;
+ Form_pg_publication pubform;
+
+ rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+ tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+ CStringGetDatum(stmt->pubname));
+
+ if (!HeapTupleIsValid(tup))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication \"%s\" does not exist",
+ stmt->pubname)));
+
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+ /* must be owner */
+ if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+ stmt->pubname);
+
+ if (stmt->options)
+ AlterPublicationOptions(stmt, rel, tup);
+ else
+ AlterPublicationTables(stmt, rel, tup);
+
+ /* Cleanup. */
+ heap_freetuple(tup);
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Drop publication by OID
+ */
+void
+RemovePublicationById(Oid pubid)
+{
+ Relation rel;
+ HeapTuple tup;
+
+ rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication %u", pubid);
+
+ CatalogTupleDelete(rel, &tup->t_self);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Remove relation from publication by mapping OID.
+ */
+void
+RemovePublicationRelById(Oid proid)
+{
+ Relation rel;
+ HeapTuple tup;
+ Form_pg_publication_rel pubrel;
+
+ rel = table_open(PublicationRelRelationId, RowExclusiveLock);
+
+ tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for publication table %u",
+ proid);
+
+ pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ /* Invalidate relcache so that publication info is rebuilt. */
+ CacheInvalidateRelcacheByRelid(pubrel->prrelid);
+
+ CatalogTupleDelete(rel, &tup->t_self);
+
+ ReleaseSysCache(tup);
+
+ table_close(rel, RowExclusiveLock);
+}
+
+/*
+ * Open relations specified by a RangeVar list.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
+ */
+static List *
+OpenTableList(List *tables)
+{
+ List *relids = NIL;
+ List *rels = NIL;
+ ListCell *lc;
+
+ /*
+ * Open, share-lock, and check all the explicitly-specified relations
+ */
+ foreach(lc, tables)
+ {
+ RangeVar *rv = castNode(RangeVar, lfirst(lc));
+ bool recurse = rv->inh;
+ Relation rel;
+ Oid myrelid;
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ rel = table_openrv(rv, ShareUpdateExclusiveLock);
+ myrelid = RelationGetRelid(rel);
+
+ /*
+ * Filter out duplicates if user specifies "foo, foo".
+ *
+ * Note that this algorithm is known to not be very efficient (O(N^2))
+ * but given that it only works on list of tables given to us by user
+ * it's deemed acceptable.
+ */
+ if (list_member_oid(relids, myrelid))
+ {
+ table_close(rel, ShareUpdateExclusiveLock);
+ continue;
+ }
+
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, myrelid);
+
+ /*
+ * Add children of this rel, if requested, so that they too are added
+ * to the publication. A partitioned table can't have any inheritance
+ * children other than its partitions, which need not be explicitly
+ * added to the publication.
+ */
+ if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ {
+ List *children;
+ ListCell *child;
+
+ children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
+ NULL);
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Skip duplicates if user specified both parent and child
+ * tables.
+ */
+ if (list_member_oid(relids, childrelid))
+ continue;
+
+ /* find_all_inheritors already got lock */
+ rel = table_open(childrelid, NoLock);
+ rels = lappend(rels, rel);
+ relids = lappend_oid(relids, childrelid);
+ }
+ }
+ }
+
+ list_free(relids);
+
+ return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseTableList(List *rels)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+
+ table_close(rel, NoLock);
+ }
+}
+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_tables);
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ ObjectAddress obj;
+
+ /* Must be owner of the table or superuser. */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+ RelationGetRelationName(rel));
+
+ obj = publication_add_relation(pubid, rel, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationRelRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed tables from the publication.
+ */
+static void
+PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid prid;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ Oid relid = RelationGetRelid(rel);
+
+ prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(prid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("relation \"%s\" is not part of the publication",
+ RelationGetRelationName(rel))));
+ }
+
+ ObjectAddressSet(obj, PublicationRelRelationId, prid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+/*
+ * Internal workhorse for changing a publication owner
+ */
+static void
+AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
+{
+ Form_pg_publication form;
+
+ form = (Form_pg_publication) GETSTRUCT(tup);
+
+ if (form->pubowner == newOwnerId)
+ return;
+
+ if (!superuser())
+ {
+ AclResult aclresult;
+
+ /* Must be owner */
+ if (!pg_publication_ownercheck(form->oid, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+ NameStr(form->pubname));
+
+ /* Must be able to become new owner */
+ check_is_member_of_role(GetUserId(), newOwnerId);
+
+ /* New owner must have CREATE privilege on database */
+ aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_DATABASE,
+ get_database_name(MyDatabaseId));
+
+ if (form->puballtables && !superuser_arg(newOwnerId))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to change owner of publication \"%s\"",
+ NameStr(form->pubname)),
+ errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
+ }
+
+ form->pubowner = newOwnerId;
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ /* Update owner dependency reference */
+ changeDependencyOnOwner(PublicationRelationId,
+ form->oid,
+ newOwnerId);
+
+ InvokeObjectPostAlterHook(PublicationRelationId,
+ form->oid, 0);
+}
+
+/*
+ * Change publication owner -- by name
+ */
+ObjectAddress
+AlterPublicationOwner(const char *name, Oid newOwnerId)
+{
+ Oid subid;
+ HeapTuple tup;
+ Relation rel;
+ ObjectAddress address;
+ Form_pg_publication pubform;
+
+ rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+ tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
+
+ if (!HeapTupleIsValid(tup))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication \"%s\" does not exist", name)));
+
+ pubform = (Form_pg_publication) GETSTRUCT(tup);
+ subid = pubform->oid;
+
+ AlterPublicationOwner_internal(rel, tup, newOwnerId);
+
+ ObjectAddressSet(address, PublicationRelationId, subid);
+
+ heap_freetuple(tup);
+
+ table_close(rel, RowExclusiveLock);
+
+ return address;
+}
+
+/*
+ * Change publication owner -- by OID
+ */
+void
+AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
+{
+ HeapTuple tup;
+ Relation rel;
+
+ rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+ tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("publication with OID %u does not exist", subid)));
+
+ AlterPublicationOwner_internal(rel, tup, newOwnerId);
+
+ heap_freetuple(tup);
+
+ table_close(rel, RowExclusiveLock);
+}