summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/pgoutput
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:44:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 13:44:03 +0000
commit293913568e6a7a86fd1479e1cff8e2ecb58d6568 (patch)
treefc3b469a3ec5ab71b36ea97cc7aaddb838423a0c /src/backend/replication/pgoutput
parentInitial commit. (diff)
downloadpostgresql-16-293913568e6a7a86fd1479e1cff8e2ecb58d6568.tar.xz
postgresql-16-293913568e6a7a86fd1479e1cff8e2ecb58d6568.zip
Adding upstream version 16.2.upstream/16.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/pgoutput')
-rw-r--r--src/backend/replication/pgoutput/Makefile32
-rw-r--r--src/backend/replication/pgoutput/meson.build18
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c2362
3 files changed, 2412 insertions, 0 deletions
diff --git a/src/backend/replication/pgoutput/Makefile b/src/backend/replication/pgoutput/Makefile
new file mode 100644
index 0000000..3b41fbc
--- /dev/null
+++ b/src/backend/replication/pgoutput/Makefile
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/replication/pgoutput
+#
+# IDENTIFICATION
+# src/backend/replication/pgoutput
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/pgoutput
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+# Objects making up the pgoutput shared library; WIN32RES adds the
+# Windows resource file (version info) on that platform only.
+OBJS = \
+ $(WIN32RES) \
+ pgoutput.o
+PGFILEDESC = "pgoutput - standard logical replication output plugin"
+NAME = pgoutput
+
+# pgoutput is built as a dynamically loadable shared library, not linked
+# into the backend; the shared-library rules come from Makefile.shlib.
+all: all-shared-lib
+
+include $(top_srcdir)/src/Makefile.shlib
+
+install: all installdirs install-lib
+
+installdirs: installdirs-lib
+
+uninstall: uninstall-lib
+
+clean distclean maintainer-clean: clean-lib
+ rm -f $(OBJS)
diff --git a/src/backend/replication/pgoutput/meson.build b/src/backend/replication/pgoutput/meson.build
new file mode 100644
index 0000000..243c92d
--- /dev/null
+++ b/src/backend/replication/pgoutput/meson.build
@@ -0,0 +1,18 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+pgoutput_sources = files(
+ 'pgoutput.c',
+)
+
+# On Windows, attach version/description resource metadata to the DLL.
+if host_system == 'windows'
+ pgoutput_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'pgoutput',
+ '--FILEDESC', 'pgoutput - standard logical replication output plugin',])
+endif
+
+# Build pgoutput as a loadable module with the standard backend-module args.
+pgoutput = shared_module('pgoutput',
+ pgoutput_sources,
+ kwargs: pg_mod_args,
+)
+
+backend_targets += pgoutput
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
new file mode 100644
index 0000000..05688cd
--- /dev/null
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -0,0 +1,2362 @@
+/*-------------------------------------------------------------------------
+ *
+ * pgoutput.c
+ * Logical Replication output plugin
+ *
+ * Copyright (c) 2012-2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/pgoutput/pgoutput.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/tupconvert.h"
+#include "catalog/partition.h"
+#include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
+#include "catalog/pg_subscription.h"
+#include "commands/defrem.h"
+#include "commands/subscriptioncmds.h"
+#include "executor/executor.h"
+#include "fmgr.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_relation.h"
+#include "replication/logical.h"
+#include "replication/logicalproto.h"
+#include "replication/origin.h"
+#include "replication/pgoutput.h"
+#include "utils/builtins.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/varlena.h"
+
+PG_MODULE_MAGIC;
+
+static void pgoutput_startup(LogicalDecodingContext *ctx,
+ OutputPluginOptions *opt, bool is_init);
+static void pgoutput_shutdown(LogicalDecodingContext *ctx);
+static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_change(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, Relation relation,
+ ReorderBufferChange *change);
+static void pgoutput_truncate(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, int nrelations, Relation relations[],
+ ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
+static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
+static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+
+static bool publications_valid;
+static bool in_streaming;
+
+static List *LoadPublications(List *pubnames);
+static void publication_invalidation_cb(Datum arg, int cacheid,
+ uint32 hashvalue);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns);
+static void send_repl_origin(LogicalDecodingContext *ctx,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ bool send_origin);
+
+/*
+ * Only 3 publication actions are used for row filtering ("insert", "update",
+ * "delete"). See RelationSyncEntry.exprstate[].
+ */
+enum RowFilterPubAction
+{
+ PUBACTION_INSERT,
+ PUBACTION_UPDATE,
+ PUBACTION_DELETE
+};
+
+/* Array length for per-pubaction state such as exprstate[] and rfnodes[]. */
+#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
+
+/*
+ * Entry in the map used to remember which relation schemas we sent.
+ *
+ * The schema_sent flag determines if the current schema record for the
+ * relation (and for its ancestor if publish_as_relid is set) was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on downstream is however updated only at commit time,
+ * and with streamed transactions the commit order may be different from
+ * the order the transactions are sent in. Also, the (sub) transactions
+ * might get aborted so we need to send the schema for each (sub) transaction
+ * so that we don't lose the schema information on abort. For handling this,
+ * we maintain the list of xids (streamed_txns) for those we have already sent
+ * the schema.
+ *
+ * For partitions, 'pubactions' considers not only the table's own
+ * publications, but also those of all of its ancestors.
+ */
+typedef struct RelationSyncEntry
+{
+ Oid relid; /* relation oid */
+
+ bool replicate_valid; /* overall validity flag for entry */
+
+ bool schema_sent; /* already sent schema for non-streamed txns? */
+ List *streamed_txns; /* streamed toplevel transactions with this
+ * schema */
+
+ /* are we publishing this rel? */
+ PublicationActions pubactions;
+
+ /*
+ * ExprState array for row filter. Different publication actions don't
+ * allow multiple expressions to always be combined into one, because
+ * updates or deletes restrict the column in expression to be part of the
+ * replica identity index whereas inserts do not have this restriction, so
+ * there is one ExprState per publication action.
+ */
+ ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
+ EState *estate; /* executor state used for row filter */
+ TupleTableSlot *new_slot; /* slot for storing new tuple */
+ TupleTableSlot *old_slot; /* slot for storing old tuple */
+
+ /*
+ * OID of the relation to publish changes as. For a partition, this may
+ * be set to one of its ancestors whose schema will be used when
+ * replicating changes, if publish_via_partition_root is set for the
+ * publication.
+ */
+ Oid publish_as_relid;
+
+ /*
+ * Map used when replicating using an ancestor's schema to convert tuples
+ * from partition's type to the ancestor's; NULL if publish_as_relid is
+ * same as 'relid' or if unnecessary due to partition and the ancestor
+ * having identical TupleDesc.
+ */
+ AttrMap *attrmap;
+
+ /*
+ * Columns included in the publication, or NULL if all columns are
+ * included implicitly. Note that the attnums in this bitmap are not
+ * shifted by FirstLowInvalidHeapAttributeNumber.
+ */
+ Bitmapset *columns;
+
+ /*
+ * Private context to store additional data for this entry - state for the
+ * row filter expressions, column list, etc.
+ */
+ MemoryContext entry_cxt;
+} RelationSyncEntry;
+
+/*
+ * Maintain a per-transaction level variable to track whether the transaction
+ * has sent BEGIN. BEGIN is only sent when the first change in a transaction
+ * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
+ * messages for empty transactions which saves network bandwidth.
+ *
+ * This optimization is not used for prepared transactions because if the
+ * WALSender restarts after prepare of a transaction and before commit prepared
+ * of the same transaction then we won't be able to figure out if we have
+ * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
+ * because we would have lost the in-memory txndata information that was
+ * present prior to the restart. This will result in sending a spurious
+ * COMMIT PREPARED without a corresponding prepared transaction at the
+ * downstream which would lead to an error when it tries to process it.
+ *
+ * XXX We could achieve this optimization by changing protocol to send
+ * additional information so that downstream can detect that the corresponding
+ * prepare has not been sent. However, adding such a check for every
+ * transaction in the downstream could be costly so we might want to do it
+ * optionally.
+ *
+ * We also don't have this optimization for streamed transactions because
+ * they can contain prepared transactions.
+ *
+ * Allocated in pgoutput_begin_txn() and freed in pgoutput_commit_txn().
+ */
+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
+} PGOutputTxnData;
+
+/* Map used to remember which relation schemas we sent. */
+static HTAB *RelationSyncCache = NULL;
+
+static void init_rel_sync_cache(MemoryContext cachectx);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
+ Relation relation);
+static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
+static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
+ uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
+static void init_tuple_slot(PGOutputData *data, Relation relation,
+ RelationSyncEntry *entry);
+
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static void pgoutput_row_filter_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+static bool pgoutput_row_filter_exec_expr(ExprState *state,
+ ExprContext *econtext);
+static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+ TupleTableSlot **new_slot_ptr,
+ RelationSyncEntry *entry,
+ ReorderBufferChangeType *action);
+
+/* column list routines */
+static void pgoutput_column_list_init(PGOutputData *data,
+ List *publications,
+ RelationSyncEntry *entry);
+
+/*
+ * Specify output plugin callbacks
+ *
+ * Entry point called by the logical decoding machinery when the plugin is
+ * loaded; fills in the callback table that drives everything else here.
+ */
+void
+_PG_output_plugin_init(OutputPluginCallbacks *cb)
+{
+ cb->startup_cb = pgoutput_startup;
+ cb->begin_cb = pgoutput_begin_txn;
+ cb->change_cb = pgoutput_change;
+ cb->truncate_cb = pgoutput_truncate;
+ cb->message_cb = pgoutput_message;
+ cb->commit_cb = pgoutput_commit_txn;
+
+ cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
+ cb->prepare_cb = pgoutput_prepare_txn;
+ cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+ cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
+ cb->filter_by_origin_cb = pgoutput_origin_filter;
+ cb->shutdown_cb = pgoutput_shutdown;
+
+ /* transaction streaming */
+ cb->stream_start_cb = pgoutput_stream_start;
+ cb->stream_stop_cb = pgoutput_stream_stop;
+ cb->stream_abort_cb = pgoutput_stream_abort;
+ cb->stream_commit_cb = pgoutput_stream_commit;
+ /* change/message/truncate callbacks are shared with non-streaming mode */
+ cb->stream_change_cb = pgoutput_change;
+ cb->stream_message_cb = pgoutput_message;
+ cb->stream_truncate_cb = pgoutput_truncate;
+ /* transaction streaming - two-phase commit */
+ cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
+}
+
+/*
+ * Parse the output plugin options supplied by the client at replication
+ * start and fill in the corresponding fields of *data.
+ *
+ * Unrecognized, duplicated, or invalid options raise an ERROR. Options not
+ * supplied keep the defaults assigned at the top of this function (binary
+ * off, streaming off, messages off, two_phase off).
+ */
+static void
+parse_output_parameters(List *options, PGOutputData *data)
+{
+ ListCell *lc;
+ bool protocol_version_given = false;
+ bool publication_names_given = false;
+ bool binary_option_given = false;
+ bool messages_option_given = false;
+ bool streaming_given = false;
+ bool two_phase_option_given = false;
+ bool origin_option_given = false;
+
+ data->binary = false;
+ data->streaming = LOGICALREP_STREAM_OFF;
+ data->messages = false;
+ data->two_phase = false;
+
+ foreach(lc, options)
+ {
+ DefElem *defel = (DefElem *) lfirst(lc);
+
+ Assert(defel->arg == NULL || IsA(defel->arg, String));
+
+ /* Check each param, whether or not we recognize it */
+ if (strcmp(defel->defname, "proto_version") == 0)
+ {
+ unsigned long parsed;
+ char *endptr;
+
+ if (protocol_version_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ protocol_version_given = true;
+
+ /* strtol-style parse: reject trailing junk and out-of-range input */
+ errno = 0;
+ parsed = strtoul(strVal(defel->arg), &endptr, 10);
+ if (errno != 0 || *endptr != '\0')
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid proto_version")));
+
+ if (parsed > PG_UINT32_MAX)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("proto_version \"%s\" out of range",
+ strVal(defel->arg))));
+
+ data->protocol_version = (uint32) parsed;
+ }
+ else if (strcmp(defel->defname, "publication_names") == 0)
+ {
+ if (publication_names_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ publication_names_given = true;
+
+ if (!SplitIdentifierString(strVal(defel->arg), ',',
+ &data->publication_names))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("invalid publication_names syntax")));
+ }
+ else if (strcmp(defel->defname, "binary") == 0)
+ {
+ if (binary_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ binary_option_given = true;
+
+ data->binary = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "messages") == 0)
+ {
+ if (messages_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ messages_option_given = true;
+
+ data->messages = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "streaming") == 0)
+ {
+ if (streaming_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ streaming_given = true;
+
+ data->streaming = defGetStreamingMode(defel);
+ }
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_option_given = true;
+
+ data->two_phase = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "origin") == 0)
+ {
+ if (origin_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ origin_option_given = true;
+
+ data->origin = defGetString(defel);
+
+ /* Only the literal values "none" and "any" are accepted. */
+ if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) != 0 &&
+ pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized origin value: \"%s\"", data->origin)));
+ }
+ else
+ elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
+ }
+}
+
+/*
+ * Initialize this plugin
+ *
+ * Called both when a replication slot is created (is_init) and at every
+ * replication start. Option parsing and protocol negotiation happen only
+ * in the latter case.
+ */
+static void
+pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
+ bool is_init)
+{
+ PGOutputData *data = palloc0(sizeof(PGOutputData));
+
+ /*
+ * Static so the syscache callback is registered at most once per backend
+ * process, even across repeated plugin startups.
+ */
+ static bool publication_callback_registered = false;
+
+ /* Create our memory context for private allocations. */
+ data->context = AllocSetContextCreate(ctx->context,
+ "logical replication output context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ data->cachectx = AllocSetContextCreate(ctx->context,
+ "logical replication cache context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ ctx->output_plugin_private = data;
+
+ /* This plugin uses binary protocol. */
+ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
+
+ /*
+ * This is replication start and not slot initialization.
+ *
+ * Parse and validate options passed by the client.
+ */
+ if (!is_init)
+ {
+ /* Parse the params and ERROR if we see any we don't recognize */
+ parse_output_parameters(ctx->output_plugin_options, data);
+
+ /* Check if we support requested protocol */
+ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
+ data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
+
+ if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
+
+ if (data->publication_names == NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("publication_names parameter missing")));
+
+ /*
+ * Decide whether to enable streaming. It is disabled by default, in
+ * which case we just update the flag in decoding context. Otherwise
+ * we only allow it with sufficient version of the protocol, and when
+ * the output plugin supports it.
+ */
+ if (data->streaming == LOGICALREP_STREAM_OFF)
+ ctx->streaming = false;
+ else if (data->streaming == LOGICALREP_STREAM_ON &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
+ else if (!ctx->streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("streaming requested, but not supported by output plugin")));
+
+ /* Also remember we're currently not streaming any transaction. */
+ in_streaming = false;
+
+ /*
+ * Here, we just check whether the two-phase option is passed by
+ * plugin and decide whether to enable it at later point of time. It
+ * remains enabled if the previous start-up has done so. But we only
+ * allow the option to be passed in with sufficient version of the
+ * protocol, and when the output plugin supports it.
+ */
+ if (!data->two_phase)
+ ctx->twophase_opt_given = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+ else if (!ctx->twophase)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("two-phase commit requested, but not supported by output plugin")));
+ else
+ ctx->twophase_opt_given = true;
+
+ /* Init publication state. */
+ data->publications = NIL;
+ publications_valid = false;
+
+ /*
+ * Register callback for pg_publication if we didn't already do that
+ * during some previous call in this process.
+ */
+ if (!publication_callback_registered)
+ {
+ CacheRegisterSyscacheCallback(PUBLICATIONOID,
+ publication_invalidation_cb,
+ (Datum) 0);
+ publication_callback_registered = true;
+ }
+
+ /* Initialize relation schema cache. */
+ init_rel_sync_cache(CacheMemoryContext);
+ }
+ else
+ {
+ /*
+ * Disable the streaming and prepared transactions during the slot
+ * initialization mode.
+ */
+ ctx->streaming = false;
+ ctx->twophase = false;
+ }
+}
+
+/*
+ * BEGIN callback.
+ *
+ * Don't send the BEGIN message here instead postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set of
+ * tables (instead of all tables) and transactions whose changes were on
+ * the table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
+ */
+static void
+pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ /* Zero-initialized, so sent_begin_txn starts out false. */
+ PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ *
+ * This is called while processing the first change of the transaction.
+ * Writes the BEGIN message, marks it as sent in the per-transaction data,
+ * and optionally follows it with a replication-origin message.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);
+ Assert(!txndata->sent_begin_txn);
+
+ /* Not the final write of the group if an origin message follows. */
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_begin(ctx->out, txn);
+ txndata->sent_begin_txn = true;
+
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT callback
+ *
+ * Sends COMMIT only if a BEGIN was actually sent for this transaction;
+ * otherwise the whole (empty) transaction is skipped.
+ */
+static void
+pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+ bool sent_begin_txn;
+
+ Assert(txndata);
+
+ /*
+ * We don't need to send the commit message unless some relevant change
+ * from this transaction has been sent to the downstream.
+ */
+ sent_begin_txn = txndata->sent_begin_txn;
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+ /* Copy of the flag taken above; the per-transaction data dies here. */
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
+ return;
+ }
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_commit(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * BEGIN PREPARE callback
+ *
+ * Unlike plain BEGIN, this is sent unconditionally: the empty-transaction
+ * skip optimization is not used for prepared transactions (see the
+ * PGOutputTxnData comment).
+ */
+static void
+pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_begin_prepare(ctx->out, txn);
+
+ send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
+ send_replication_origin);
+
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ *
+ * Writes the PREPARE message unconditionally (no empty-transaction skip
+ * for two-phase transactions; see the PGOutputTxnData comment).
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ OutputPluginUpdateProgress(ctx, false);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ *
+ * Second phase of a two-phase commit: tell the subscriber to commit the
+ * previously prepared transaction.
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ OutputPluginUpdateProgress(ctx, false);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * ROLLBACK PREPARED callback
+ *
+ * Tell the subscriber to roll back a previously prepared transaction.
+ */
+static void
+pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time)
+{
+ OutputPluginUpdateProgress(ctx, false);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
+ prepare_time);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Write the current schema of the relation and its ancestor (if any) if not
+ * done yet.
+ *
+ * For streamed transactions the "already sent" bookkeeping is tracked per
+ * toplevel xid; otherwise a simple flag on the cache entry suffices.
+ */
+static void
+maybe_send_schema(LogicalDecodingContext *ctx,
+ ReorderBufferChange *change,
+ Relation relation, RelationSyncEntry *relentry)
+{
+ bool schema_sent;
+ TransactionId xid = InvalidTransactionId;
+ TransactionId topxid = InvalidTransactionId;
+
+ /*
+ * Remember XID of the (sub)transaction for the change. We don't care if
+ * it's top-level transaction or not (we have already sent that XID in
+ * start of the current streaming block).
+ *
+ * If we're not in a streaming block, just use InvalidTransactionId and
+ * the write methods will not include it.
+ */
+ if (in_streaming)
+ xid = change->txn->xid;
+
+ if (rbtxn_is_subtxn(change->txn))
+ topxid = rbtxn_get_toptxn(change->txn)->xid;
+ else
+ topxid = xid;
+
+ /*
+ * Do we need to send the schema? We do track streamed transactions
+ * separately, because those may be applied later (and the regular
+ * transactions won't see their effects until then) and in an order that
+ * we don't know at this point.
+ *
+ * XXX There is a scope of optimization here. Currently, we always send
+ * the schema first time in a streaming transaction but we can probably
+ * avoid that by checking 'relentry->schema_sent' flag. However, before
+ * doing that we need to study its impact on the case where we have a mix
+ * of streaming and non-streaming transactions.
+ */
+ if (in_streaming)
+ schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ schema_sent = relentry->schema_sent;
+
+ /* Nothing to do if we already sent the schema. */
+ if (schema_sent)
+ return;
+
+ /*
+ * Send the schema. If the changes will be published using an ancestor's
+ * schema, not the relation's own, send that ancestor's schema before
+ * sending relation's own (XXX - maybe sending only the former suffices?).
+ */
+ if (relentry->publish_as_relid != RelationGetRelid(relation))
+ {
+ Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+
+ send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+ RelationClose(ancestor);
+ }
+
+ send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+
+ /* Record that the schema is now on the wire for this (top)xid / entry. */
+ if (in_streaming)
+ set_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ relentry->schema_sent = true;
+}
+
+/*
+ * Sends a relation
+ *
+ * Emits TYPE messages for any user-defined column types first, then the
+ * RELATION message itself. Dropped and generated columns, and columns not
+ * in the publication's column list, are skipped.
+ */
+static void
+send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx,
+ Bitmapset *columns)
+{
+ TupleDesc desc = RelationGetDescr(relation);
+ int i;
+
+ /*
+ * Write out type info if needed. We do that only for user-created types.
+ * We use FirstGenbkiObjectId as the cutoff, so that we only consider
+ * objects with hand-assigned OIDs to be "built in", not for instance any
+ * function or type defined in the information_schema. This is important
+ * because only hand-assigned OIDs can be expected to remain stable across
+ * major versions.
+ */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ if (att->atttypid < FirstGenbkiObjectId)
+ continue;
+
+ /* Skip this attribute if it's not present in the column list */
+ if (columns != NULL && !bms_is_member(att->attnum, columns))
+ continue;
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_typ(ctx->out, xid, att->atttypid);
+ OutputPluginWrite(ctx, false);
+ }
+
+ OutputPluginPrepareWrite(ctx, false);
+ logicalrep_write_rel(ctx->out, xid, relation, columns);
+ OutputPluginWrite(ctx, false);
+}
+
+/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ *
+ * Builds a minimal EState with a single-entry range table for 'rel'; the
+ * relation is locked with AccessShareLock since filters only read it.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+ EState *estate;
+ RangeTblEntry *rte;
+ List *perminfos = NIL;
+
+ estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel);
+ rte->relkind = rel->rd_rel->relkind;
+ rte->rellockmode = AccessShareLock;
+
+ addRTEPermissionInfo(&perminfos, rte);
+
+ ExecInitRangeTable(estate, list_make1(rte), perminfos);
+
+ /* false: no new command ID needed, filter evaluation does not write */
+ estate->es_output_cid = GetCurrentCommandId(false);
+
+ return estate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ *
+ * Returns the boolean result of the expression evaluated in 'econtext'.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+ Datum ret;
+ bool isnull;
+
+ Assert(state != NULL);
+
+ ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+ elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+ isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
+ isnull ? "true" : "false");
+
+ if (isnull)
+ return false;
+
+ return DatumGetBool(ret);
+}
+
+/*
+ * Make sure the per-entry memory context exists.
+ *
+ * The context lives under data->cachectx and is named after the relation
+ * being published for easier debugging.
+ */
+static void
+pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
+{
+ Relation relation;
+
+ /* The context may already exist, in which case bail out. */
+ if (entry->entry_cxt)
+ return;
+
+ /*
+ * NOTE(review): the relation opened here is only used to name the
+ * context and is not visibly closed in this function -- confirm the
+ * refcount handling against the caller / upstream source.
+ */
+ relation = RelationIdGetRelation(entry->publish_as_relid);
+
+ entry->entry_cxt = AllocSetContextCreate(data->cachectx,
+ "entry private context",
+ ALLOCSET_SMALL_SIZES);
+
+ MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
+ RelationGetRelationName(relation));
+}
+
+/*
+ * Initialize the row filter.
+ *
+ * Looks up the row filter expressions (pg_publication_rel.prqual) that the
+ * given publications define for entry->publish_as_relid, combines the
+ * filters per pubaction with OR, and caches the prepared ExprStates in
+ * entry->exprstate.  Any pubaction covered by a publication without a
+ * filter ends up with no filter at all (NULL ExprState).
+ */
+static void
+pgoutput_row_filter_init(PGOutputData *data, List *publications,
+						 RelationSyncEntry *entry)
+{
+	ListCell   *lc;
+	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
+	bool		no_filter[] = {false, false, false};	/* One per pubaction */
+	MemoryContext oldctx;
+	int			idx;
+	bool		has_filter = true;
+	Oid			schemaid = get_rel_namespace(entry->publish_as_relid);
+
+	/*
+	 * Find if there are any row filters for this relation. If there are, then
+	 * prepare the necessary ExprState and cache it in entry->exprstate. To
+	 * build an expression state, we need to ensure the following:
+	 *
+	 * All the given publication-table mappings must be checked.
+	 *
+	 * Multiple publications might have multiple row filters for this
+	 * relation. Since row filter usage depends on the DML operation, there
+	 * are multiple lists (one for each operation) to which row filters will
+	 * be appended.
+	 *
+	 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
+	 * expression" so it takes precedence.
+	 */
+	foreach(lc, publications)
+	{
+		Publication *pub = lfirst(lc);
+		HeapTuple	rftuple = NULL;
+		Datum		rfdatum = 0;
+		bool		pub_no_filter = true;
+
+		/*
+		 * If the publication is FOR ALL TABLES, or the publication includes a
+		 * FOR TABLES IN SCHEMA where the table belongs to the referred
+		 * schema, then it is treated the same as if there are no row filters
+		 * (even if other publications have a row filter).
+		 */
+		if (!pub->alltables &&
+			!SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
+								   ObjectIdGetDatum(schemaid),
+								   ObjectIdGetDatum(pub->oid)))
+		{
+			/*
+			 * Check for the presence of a row filter in this publication.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP,
+									  ObjectIdGetDatum(entry->publish_as_relid),
+									  ObjectIdGetDatum(pub->oid));
+
+			if (HeapTupleIsValid(rftuple))
+			{
+				/* Null indicates no filter. */
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+										  Anum_pg_publication_rel_prqual,
+										  &pub_no_filter);
+			}
+		}
+
+		if (pub_no_filter)
+		{
+			if (rftuple)
+				ReleaseSysCache(rftuple);
+
+			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
+			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
+			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
+
+			/*
+			 * Quick exit if all the DML actions are publicized via this
+			 * publication.
+			 */
+			if (no_filter[PUBACTION_INSERT] &&
+				no_filter[PUBACTION_UPDATE] &&
+				no_filter[PUBACTION_DELETE])
+			{
+				has_filter = false;
+				break;
+			}
+
+			/* No additional work for this publication. Next one. */
+			continue;
+		}
+
+		/*
+		 * Form the per pubaction row filter lists.  The filter text is
+		 * copied out of the syscache tuple (TextDatumGetCString) before the
+		 * tuple is released below; it is parsed into a node tree only after
+		 * all publications have been examined.
+		 */
+		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
+			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
+												TextDatumGetCString(rfdatum));
+		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
+			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
+												TextDatumGetCString(rfdatum));
+		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
+			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
+												TextDatumGetCString(rfdatum));
+
+		/* pub_no_filter is false here, so rftuple must be valid */
+		ReleaseSysCache(rftuple);
+	}							/* loop all subscribed publications */
+
+	/* Clean the row filter */
+	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+	{
+		if (no_filter[idx])
+		{
+			list_free_deep(rfnodes[idx]);
+			rfnodes[idx] = NIL;
+		}
+	}
+
+	if (has_filter)
+	{
+		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
+
+		pgoutput_ensure_entry_cxt(data, entry);
+
+		/*
+		 * Now all the filters for all pubactions are known. Combine them when
+		 * their pubactions are the same.
+		 */
+		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
+		entry->estate = create_estate_for_relation(relation);
+		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
+		{
+			List	   *filters = NIL;
+			Expr	   *rfnode;
+
+			if (rfnodes[idx] == NIL)
+				continue;
+
+			foreach(lc, rfnodes[idx])
+				filters = lappend(filters, stringToNode((char *) lfirst(lc)));
+
+			/* combine the row filter and cache the ExprState */
+			rfnode = make_orclause(filters);
+			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
+		}						/* for each pubaction */
+		MemoryContextSwitchTo(oldctx);
+
+		RelationClose(relation);
+	}
+}
+
+/*
+ * Initialize the column list.
+ *
+ * Builds entry->columns, a bitmap of the attribute numbers to publish for
+ * entry->publish_as_relid, from the column lists (prattrs) of the given
+ * publications.  entry->columns is left NULL when all columns are to be
+ * published.  Raises an error if two publications specify different column
+ * lists for the relation.
+ */
+static void
+pgoutput_column_list_init(PGOutputData *data, List *publications,
+						  RelationSyncEntry *entry)
+{
+	ListCell   *lc;
+	bool		first = true;	/* still looking at the first publication? */
+	Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
+
+	/*
+	 * Find if there are any column lists for this relation. If there are,
+	 * build a bitmap using the column lists.
+	 *
+	 * Multiple publications might have multiple column lists for this
+	 * relation.
+	 *
+	 * Note that we don't support the case where the column list is different
+	 * for the same table when combining publications. See comments atop
+	 * fetch_table_list. But one can later change the publication so we still
+	 * need to check all the given publication-table mappings and report an
+	 * error if any publications have a different column list.
+	 *
+	 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
+	 */
+	foreach(lc, publications)
+	{
+		Publication *pub = lfirst(lc);
+		HeapTuple	cftuple = NULL;
+		Datum		cfdatum = 0;
+		Bitmapset  *cols = NULL;
+
+		/*
+		 * If the publication is FOR ALL TABLES then it is treated the same as
+		 * if there are no column lists (even if other publications have a
+		 * list).
+		 */
+		if (!pub->alltables)
+		{
+			bool		pub_no_list = true;
+
+			/*
+			 * Check for the presence of a column list in this publication.
+			 *
+			 * Note: If we find no pg_publication_rel row, it's a publication
+			 * defined for a whole schema, so it can't have a column list,
+			 * just like a FOR ALL TABLES publication.
+			 */
+			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
+									  ObjectIdGetDatum(entry->publish_as_relid),
+									  ObjectIdGetDatum(pub->oid));
+
+			if (HeapTupleIsValid(cftuple))
+			{
+				/* Lookup the column list attribute. */
+				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
+										  Anum_pg_publication_rel_prattrs,
+										  &pub_no_list);
+
+				/* Build the column list bitmap in the per-entry context. */
+				if (!pub_no_list)	/* when not null */
+				{
+					int			i;
+					int			nliveatts = 0;
+					TupleDesc	desc = RelationGetDescr(relation);
+
+					/* Create entry->entry_cxt first if it doesn't exist. */
+					pgoutput_ensure_entry_cxt(data, entry);
+
+					cols = pub_collist_to_bitmapset(cols, cfdatum,
+													entry->entry_cxt);
+
+					/*
+					 * Get the number of live attributes, i.e. those neither
+					 * dropped nor generated.
+					 */
+					for (i = 0; i < desc->natts; i++)
+					{
+						Form_pg_attribute att = TupleDescAttr(desc, i);
+
+						if (att->attisdropped || att->attgenerated)
+							continue;
+
+						nliveatts++;
+					}
+
+					/*
+					 * If column list includes all the columns of the table,
+					 * set it to NULL.
+					 */
+					if (bms_num_members(cols) == nliveatts)
+					{
+						bms_free(cols);
+						cols = NULL;
+					}
+				}
+
+				ReleaseSysCache(cftuple);
+			}
+		}
+
+		/*
+		 * Remember the first column list, then insist that every later
+		 * publication yields the same one (NULL meaning "all columns").
+		 */
+		if (first)
+		{
+			entry->columns = cols;
+			first = false;
+		}
+		else if (!bms_equal(entry->columns, cols))
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
+						   get_namespace_name(RelationGetNamespace(relation)),
+						   RelationGetRelationName(relation)));
+	}							/* loop all subscribed publications */
+
+	RelationClose(relation);
+}
+
+/*
+ * Set up the tuple table slots used to hold this relation's old and new
+ * tuples, and, when changes are published under an ancestor's identity,
+ * the attribute map used to convert the relation's tuples into the
+ * ancestor's row format.
+ */
+static void
+init_tuple_slot(PGOutputData *data, Relation relation,
+				RelationSyncEntry *entry)
+{
+	MemoryContext oldcxt;
+
+	/*
+	 * Both slots must survive for as long as the cache entry does, so
+	 * allocate them in the cache context.  Copy the relation's TupleDesc
+	 * (with constraints) for the same reason.
+	 */
+	oldcxt = MemoryContextSwitchTo(data->cachectx);
+
+	entry->old_slot =
+		MakeSingleTupleTableSlot(CreateTupleDescCopyConstr(RelationGetDescr(relation)),
+								 &TTSOpsHeapTuple);
+	entry->new_slot =
+		MakeSingleTupleTableSlot(CreateTupleDescCopyConstr(RelationGetDescr(relation)),
+								 &TTSOpsHeapTuple);
+
+	MemoryContextSwitchTo(oldcxt);
+
+	/*
+	 * If publishing via an ancestor, build (and cache) the map converting
+	 * this relation's tuples into the ancestor's format; the helper returns
+	 * NULL when no conversion is required.  The map must live as long as
+	 * the session does, hence CacheMemoryContext.
+	 */
+	if (entry->publish_as_relid != RelationGetRelid(relation))
+	{
+		Relation	ancestor = RelationIdGetRelation(entry->publish_as_relid);
+
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		entry->attrmap = build_attrmap_by_name_if_req(RelationGetDescr(relation),
+													  RelationGetDescr(ancestor),
+													  false);
+		MemoryContextSwitchTo(oldcxt);
+
+		RelationClose(ancestor);
+	}
+}
+
+/*
+ * Change is checked against the row filter if any.
+ *
+ * Returns true if the change is to be replicated, else false.
+ *
+ * For inserts, evaluate the row filter for new tuple.
+ * For deletes, evaluate the row filter for old tuple.
+ * For updates, evaluate the row filter for old and new tuple.
+ *
+ * For updates, if both evaluations are true, we allow sending the UPDATE and
+ * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
+ * only one of the tuples matches the row filter expression, we transform
+ * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
+ * following rules:
+ *
+ * Case 1: old-row (no match) new-row (no match) -> (drop change)
+ * Case 2: old-row (no match) new row (match) -> INSERT
+ * Case 3: old-row (match) new-row (no match) -> DELETE
+ * Case 4: old-row (match) new row (match) -> UPDATE
+ *
+ * The new action is updated in the action parameter.
+ *
+ * The new slot could be updated when transforming the UPDATE into INSERT,
+ * because the original new tuple might not have column values from the replica
+ * identity.
+ *
+ * Examples:
+ * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
+ * Since the old tuple satisfies, the initial table synchronization copied this
+ * row (or another method was used to guarantee that there is data
+ * consistency). However, after the UPDATE the new tuple doesn't satisfy the
+ * row filter, so from a data consistency perspective, that row should be
+ * removed on the subscriber. The UPDATE should be transformed into a DELETE
+ * statement and be sent to the subscriber. Keeping this row on the subscriber
+ * is undesirable because it doesn't reflect what was defined in the row filter
+ * expression on the publisher. This row on the subscriber would likely not be
+ * modified by replication again. If someone inserted a new row with the same
+ * old identifier, replication could stop due to a constraint violation.
+ *
+ * Let's say the old tuple doesn't match the row filter but the new tuple does.
+ * Since the old tuple doesn't satisfy, the initial table synchronization
+ * probably didn't copy this row. However, after the UPDATE the new tuple does
+ * satisfy the row filter, so from a data consistency perspective, that row
+ * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
+ * statements have no effect (it matches no row -- see
+ * apply_handle_update_internal()). So, the UPDATE should be transformed into a
+ * INSERT statement and be sent to the subscriber. However, this might surprise
+ * someone who expects the data set to satisfy the row filter expression on the
+ * provider.
+ */
+static bool
+pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
+					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
+					ReorderBufferChangeType *action)
+{
+	TupleDesc	desc;
+	int			i;
+	bool		old_matched,
+				new_matched,
+				result;
+	TupleTableSlot *tmp_new_slot;
+	TupleTableSlot *new_slot = *new_slot_ptr;
+	ExprContext *ecxt;
+	ExprState  *filter_exprstate;
+
+	/*
+	 * We need this map to avoid relying on ReorderBufferChangeType enums
+	 * having specific values.
+	 */
+	static const int map_changetype_pubaction[] = {
+		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
+		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
+		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
+	};
+
+	Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
+		   *action == REORDER_BUFFER_CHANGE_UPDATE ||
+		   *action == REORDER_BUFFER_CHANGE_DELETE);
+
+	Assert(new_slot || old_slot);
+
+	/* Get the corresponding row filter */
+	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+
+	/* Bail out if there is no row filter */
+	if (!filter_exprstate)
+		return true;
+
+	elog(DEBUG3, "table \"%s.%s\" has row filter",
+		 get_namespace_name(RelationGetNamespace(relation)),
+		 RelationGetRelationName(relation));
+
+	/* Release memory used by any previous filter evaluation. */
+	ResetPerTupleExprContext(entry->estate);
+
+	ecxt = GetPerTupleExprContext(entry->estate);
+
+	/*
+	 * For the following occasions where there is only one tuple, we can
+	 * evaluate the row filter for that tuple and return.
+	 *
+	 * For inserts, we only have the new tuple.
+	 *
+	 * For updates, we can have only a new tuple when none of the replica
+	 * identity columns changed and none of those columns have external data
+	 * but we still need to evaluate the row filter for the new tuple as the
+	 * existing values of those columns might not match the filter. Also,
+	 * users can use constant expressions in the row filter, so we anyway need
+	 * to evaluate it for the new tuple.
+	 *
+	 * For deletes, we only have the old tuple.
+	 */
+	if (!new_slot || !old_slot)
+	{
+		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+		return result;
+	}
+
+	/*
+	 * Both the old and new tuples must be valid only for updates and need to
+	 * be checked against the row filter.
+	 */
+	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+	/* Deform both tuples so tts_values/tts_isnull can be inspected below. */
+	slot_getallattrs(new_slot);
+	slot_getallattrs(old_slot);
+
+	tmp_new_slot = NULL;
+	desc = RelationGetDescr(relation);
+
+	/*
+	 * The new tuple might not have all the replica identity columns, in which
+	 * case it needs to be copied over from the old tuple.
+	 */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		/*
+		 * if the column in the new tuple or old tuple is null, nothing to do
+		 */
+		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+			continue;
+
+		/*
+		 * Unchanged toasted replica identity columns are only logged in the
+		 * old tuple. Copy this over to the new tuple. The changed (or WAL
+		 * Logged) toast values are always assembled in memory and set as
+		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+		 */
+		if (att->attlen == -1 &&
+			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+		{
+			/*
+			 * Lazily create a virtual working copy of the new tuple the
+			 * first time a column needs to be filled in from the old one.
+			 */
+			if (!tmp_new_slot)
+			{
+				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+				ExecClearTuple(tmp_new_slot);
+
+				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+					   desc->natts * sizeof(Datum));
+				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+					   desc->natts * sizeof(bool));
+			}
+
+			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+		}
+	}
+
+	/* Evaluate the filter against the old tuple. */
+	ecxt->ecxt_scantuple = old_slot;
+	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+	if (tmp_new_slot)
+	{
+		/* Mark the virtual tuple as complete before evaluating it. */
+		ExecStoreVirtualTuple(tmp_new_slot);
+		ecxt->ecxt_scantuple = tmp_new_slot;
+	}
+	else
+		ecxt->ecxt_scantuple = new_slot;
+
+	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+
+	/*
+	 * Case 1: if both tuples don't match the row filter, bailout. Send
+	 * nothing.
+	 */
+	if (!old_matched && !new_matched)
+		return false;
+
+	/*
+	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
+	 * tuple does, transform the UPDATE into INSERT.
+	 *
+	 * Use the newly transformed tuple that must contain the column values for
+	 * all the replica identity columns. This is required to ensure that the
+	 * while inserting the tuple in the downstream node, we have all the
+	 * required column values.
+	 */
+	if (!old_matched && new_matched)
+	{
+		*action = REORDER_BUFFER_CHANGE_INSERT;
+
+		if (tmp_new_slot)
+			*new_slot_ptr = tmp_new_slot;
+	}
+
+	/*
+	 * Case 3: if the old tuple satisfies the row filter but the new tuple
+	 * doesn't, transform the UPDATE into DELETE.
+	 *
+	 * This transformation does not require another tuple. The Old tuple will
+	 * be used for DELETE.
+	 */
+	else if (old_matched && !new_matched)
+		*action = REORDER_BUFFER_CHANGE_DELETE;
+
+	/*
+	 * Case 4: if both tuples match the row filter, transformation isn't
+	 * required. (*action is default UPDATE).
+	 */
+
+	return true;
+}
+
+/*
+ * Sends the decoded DML over wire.
+ *
+ * This is called both in streaming and non-streaming modes.
+ */
+static void
+pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				Relation relation, ReorderBufferChange *change)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	MemoryContext old;
+	RelationSyncEntry *relentry;
+	TransactionId xid = InvalidTransactionId;
+	Relation	ancestor = NULL;
+	Relation	targetrel = relation;
+	ReorderBufferChangeType action = change->action;
+	TupleTableSlot *old_slot = NULL;
+	TupleTableSlot *new_slot = NULL;
+
+	if (!is_publishable_relation(relation))
+		return;
+
+	/*
+	 * Remember the xid for the change in streaming mode. We need to send xid
+	 * with each change in the streaming mode so that subscriber can make
+	 * their association and on aborts, it can discard the corresponding
+	 * changes.
+	 */
+	if (in_streaming)
+		xid = change->txn->xid;
+
+	relentry = get_rel_sync_entry(data, relation);
+
+	/* First check the table filter */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!relentry->pubactions.pubinsert)
+				return;
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!relentry->pubactions.pubupdate)
+				return;
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!relentry->pubactions.pubdelete)
+				return;
+
+			/*
+			 * This is only possible if deletes are allowed even when replica
+			 * identity is not defined for a table. Since the DELETE action
+			 * can't be published, we simply return.
+			 */
+			if (!change->data.tp.oldtuple)
+			{
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				return;
+			}
+			break;
+		default:
+			Assert(false);
+	}
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
+	{
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
+
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
+
+		/*
+		 * Convert tuple if needed.  The conversion slot is created in
+		 * data->context and so is freed by the reset at the end.
+		 */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
+
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
+		}
+	}
+
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
+
+		/* Convert tuple if needed.  See note on the old tuple above. */
+		if (relentry->attrmap)
+		{
+			TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
+													  &TTSOpsVirtual);
+
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
+		}
+	}
+
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
+
+	/*
+	 * Send BEGIN if we haven't yet.
+	 *
+	 * We send the BEGIN message after ensuring that we will actually send the
+	 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
+	 * transactions.
+	 */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+
+	/*
+	 * Schema should be sent using the original relation because it also sends
+	 * the ancestor's relation.
+	 */
+	maybe_send_schema(ctx, change, relation, relentry);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
+									data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+									data->binary, relentry->columns);
+			break;
+		default:
+			Assert(false);
+	}
+
+	OutputPluginWrite(ctx, true);
+
+	/* Common exit: close the ancestor (if any) and reclaim all temp memory. */
+cleanup:
+	if (RelationIsValid(ancestor))
+	{
+		RelationClose(ancestor);
+		ancestor = NULL;
+	}
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+/*
+ * Sends the decoded TRUNCATE over the wire.
+ *
+ * Collects the subset of the truncated relations that is actually published
+ * (publishable, pubtruncate enabled, and not a partition whose changes are
+ * published via an ancestor) and emits a single TRUNCATE message for them.
+ * Nothing is sent when no relation qualifies.
+ */
+static void
+pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				  int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	MemoryContext old;
+	RelationSyncEntry *relentry;
+	int			i;
+	int			nrelids;
+	Oid		   *relids;
+	TransactionId xid = InvalidTransactionId;
+
+	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
+	if (in_streaming)
+		xid = change->txn->xid;
+
+	/* Use our own context so all allocations below are reclaimed at exit. */
+	old = MemoryContextSwitchTo(data->context);
+
+	relids = palloc0(nrelations * sizeof(Oid));
+	nrelids = 0;
+
+	for (i = 0; i < nrelations; i++)
+	{
+		Relation	relation = relations[i];
+		Oid			relid = RelationGetRelid(relation);
+
+		if (!is_publishable_relation(relation))
+			continue;
+
+		relentry = get_rel_sync_entry(data, relation);
+
+		if (!relentry->pubactions.pubtruncate)
+			continue;
+
+		/*
+		 * Don't send partitions if the publication wants to send only the
+		 * root tables through it.
+		 */
+		if (relation->rd_rel->relispartition &&
+			relentry->publish_as_relid != relid)
+			continue;
+
+		relids[nrelids++] = relid;
+
+		/* Send BEGIN if we haven't yet */
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+
+		maybe_send_schema(ctx, change, relation, relentry);
+	}
+
+	/* Only send the message if at least one relation survived filtering. */
+	if (nrelids > 0)
+	{
+		OutputPluginPrepareWrite(ctx, true);
+		logicalrep_write_truncate(ctx->out,
+								  xid,
+								  nrelids,
+								  relids,
+								  change->data.truncate.cascade,
+								  change->data.truncate.restart_seqs);
+		OutputPluginWrite(ctx, true);
+	}
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+}
+
+/*
+ * Sends a logical decoding message (pg_logical_emit_message) over the wire,
+ * if the subscriber requested messages at all.
+ */
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+				 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+				 const char *message)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	TransactionId xid = InvalidTransactionId;
+
+	/* The subscriber did not ask for messages; nothing to do. */
+	if (!data->messages)
+		return;
+
+	/*
+	 * In streaming mode every message must carry the xid so the subscriber
+	 * can associate it with the right transaction.  See pgoutput_change.
+	 */
+	if (in_streaming)
+		xid = txn->xid;
+
+	/*
+	 * A transactional message may be the first output for its transaction,
+	 * so emit BEGIN first if that hasn't happened yet.  No BEGIN is emitted
+	 * for non-transactional messages.
+	 */
+	if (transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+	}
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_message(ctx->out, xid, message_lsn, transactional,
+							 prefix, sz, message);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Decide whether a change should be filtered out based on its replication
+ * origin.
+ *
+ * Returns true (i.e. skip the change) only when the subscriber requested
+ * origin "none" and the change carries some origin id.
+ */
+static bool
+pgoutput_origin_filter(LogicalDecodingContext *ctx,
+					   RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	/* Changes without an origin are never filtered. */
+	if (origin_id == InvalidRepOriginId)
+		return false;
+
+	/* Filter only when origin = "none" was requested. */
+	return data->origin &&
+		pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0;
+}
+
+/*
+ * Shutdown the output plugin.
+ *
+ * Note, we don't need to clean the data->context and data->cachectx as
+ * they are child contexts of the ctx->context so they will be cleaned up by
+ * logical decoding machinery.
+ */
+static void
+pgoutput_shutdown(LogicalDecodingContext *ctx)
+{
+	/* Drop the relation schema cache, if one was built for this session. */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_destroy(RelationSyncCache);
+	RelationSyncCache = NULL;
+}
+
+/*
+ * Resolve a list of publication names into a list of Publication structs.
+ *
+ * An error is raised for any name that does not exist (missing_ok = false).
+ */
+static List *
+LoadPublications(List *pubnames)
+{
+	List	   *pubs = NIL;
+	ListCell   *cell;
+
+	foreach(cell, pubnames)
+	{
+		char	   *name = (char *) lfirst(cell);
+
+		pubs = lappend(pubs, GetPublicationByName(name, false));
+	}
+
+	return pubs;
+}
+
+/*
+ * Publication syscache invalidation callback.
+ *
+ * Called for invalidations on pg_publication.
+ */
+static void
+publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	/*
+	 * Force the publication list to be rebuilt lazily; get_rel_sync_entry
+	 * reloads it the next time it validates an entry.
+	 */
+	publications_valid = false;
+
+	/*
+	 * Also invalidate per-relation cache so that next time the filtering info
+	 * is checked it will be updated with the new publication settings.
+	 */
+	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
+}
+
+/*
+ * START STREAM callback
+ */
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	bool		first_segment = !rbtxn_is_streamed(txn);
+	bool		send_replication_origin;
+
+	/* streaming blocks cannot be nested */
+	Assert(!in_streaming);
+
+	/*
+	 * The replication origin is sent at most once per transaction: only with
+	 * the very first stream, and only when the transaction has one.
+	 */
+	send_replication_origin = first_segment &&
+		txn->origin_id != InvalidRepOriginId;
+
+	OutputPluginPrepareWrite(ctx, !send_replication_origin);
+	logicalrep_write_stream_start(ctx->out, txn->xid, first_segment);
+
+	send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
+					 send_replication_origin);
+
+	OutputPluginWrite(ctx, true);
+
+	/* from here on we are inside a streaming block */
+	in_streaming = true;
+}
+
+/*
+ * STOP STREAM callback
+ */
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn)
+{
+	/* a streaming block must currently be open */
+	Assert(in_streaming);
+
+	/* tell the downstream the current stream chunk is over */
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_stop(ctx->out);
+	OutputPluginWrite(ctx, true);
+
+	/* leave the streaming block */
+	in_streaming = false;
+}
+
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr abort_lsn)
+{
+	ReorderBufferTXN *toptxn;
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	/*
+	 * Extra abort info (LSN and abort time) is requested only when parallel
+	 * streaming is in use; see logicalrep_write_stream_abort.
+	 */
+	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
+
+	/*
+	 * The abort should happen outside streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+
+	/* determine the toplevel transaction */
+	toptxn = rbtxn_get_toptxn(txn);
+
+	Assert(rbtxn_is_streamed(toptxn));
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
+								  txn->xact_time.abort_time, write_abort_info);
+
+	OutputPluginWrite(ctx, true);
+
+	/* Forget the schema-sent state tracked for this streamed transaction. */
+	cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify the downstream to apply a previously streamed transaction, along
+ * with all of its subtransactions.
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn,
+					   XLogRecPtr commit_lsn)
+{
+	/*
+	 * Stream commits are sent outside any streaming block; the transaction
+	 * must already have been marked as streamed.
+	 */
+	Assert(!in_streaming);
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx, false);
+
+	/* write the stream-commit protocol message */
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+
+	/* the transaction is done; drop its entries from the schema cache */
+	cleanup_rel_sync_cache(txn->xid, true);
+}
+
+/*
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+							ReorderBufferTXN *txn,
+							XLogRecPtr prepare_lsn)
+{
+	/* only a transaction that was actually streamed can be prepared here */
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx, false);
+
+	/* write the stream-prepare protocol message */
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Initialize the relation schema sync cache for a decoding session.
+ *
+ * The hash table is destroyed at the end of a decoding session. While
+ * relcache invalidations still exist and will still be invoked, they
+ * will just see the null hash table global and take no action.
+ */
+static void
+init_rel_sync_cache(MemoryContext cachectx)
+{
+	HASHCTL		ctl;
+	static bool relation_callbacks_registered = false;
+
+	/* Nothing to do if hash table already exists */
+	if (RelationSyncCache != NULL)
+		return;
+
+	/* Make a new hash table for the cache */
+	ctl.keysize = sizeof(Oid);
+	ctl.entrysize = sizeof(RelationSyncEntry);
+	ctl.hcxt = cachectx;
+
+	RelationSyncCache = hash_create("logical replication output relation cache",
+									128, &ctl,
+									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	Assert(RelationSyncCache != NULL);
+
+	/*
+	 * No more to do if we already registered callbacks.  The static flag
+	 * ensures the callbacks are registered at most once per backend, even
+	 * though the cache itself may be rebuilt across decoding sessions.
+	 */
+	if (relation_callbacks_registered)
+		return;
+
+	/* We must update the cache entry for a relation after a relcache flush */
+	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	/*
+	 * Flush all cache entries after any publication changes. (We need no
+	 * callback entry for pg_publication, because publication_invalidation_cb
+	 * will take care of it.)
+	 */
+	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	relation_callbacks_registered = true;
+}
+
+/*
+ * Has the schema of this relation already been sent within the given
+ * streamed transaction?
+ */
+static bool
+get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+	/*
+	 * A simple linear membership test is sufficient here, as only a
+	 * relatively small number of streamed transactions is expected.
+	 */
+	return list_member_xid(entry->streamed_txns, xid);
+}
+
+/*
+ * Record that the relation's schema has been sent within the given
+ * streamed transaction.
+ */
+static void
+set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+	/* The cache entry outlives any decoding context, so use CacheMemoryContext. */
+	MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
+
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
+ * Find or create entry in the relation schema cache.
+ *
+ * This looks up publications that the given relation is directly or
+ * indirectly part of (the latter if it's really the relation's ancestor that
+ * is part of a publication) and fills up the found entry with the information
+ * about which operations to publish and whether to use an ancestor's schema
+ * when publishing.
+ */
+static RelationSyncEntry *
+get_rel_sync_entry(PGOutputData *data, Relation relation)
+{
+ RelationSyncEntry *entry;
+ bool found;
+ MemoryContext oldctx;
+ Oid relid = RelationGetRelid(relation);
+
+ Assert(RelationSyncCache != NULL);
+
+ /* Find cached relation info, creating if not found */
+ entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
+ &relid,
+ HASH_ENTER, &found);
+ Assert(entry != NULL);
+
+ /* initialize entry, if it's new */
+ if (!found)
+ {
+ entry->replicate_valid = false;
+ entry->schema_sent = false;
+ entry->streamed_txns = NIL;
+ entry->pubactions.pubinsert = entry->pubactions.pubupdate =
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->new_slot = NULL;
+ entry->old_slot = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+ entry->entry_cxt = NULL;
+ entry->publish_as_relid = InvalidOid;
+ entry->columns = NULL;
+ entry->attrmap = NULL;
+ }
+
+ /* Validate the entry */
+ if (!entry->replicate_valid)
+ {
+ Oid schemaId = get_rel_namespace(relid);
+ List *pubids = GetRelationPublications(relid);
+
+ /*
+ * We don't acquire a lock on the namespace system table as we build
+ * the cache entry using a historic snapshot and all the later changes
+ * are absorbed while decoding WAL.
+ */
+ List *schemaPubids = GetSchemaPublications(schemaId);
+ ListCell *lc;
+ Oid publish_as_relid = relid;
+ int publish_ancestor_level = 0;
+ bool am_partition = get_rel_relispartition(relid);
+ char relkind = get_rel_relkind(relid);
+ List *rel_publications = NIL;
+
+ /* Reload publications if needed before use. */
+ if (!publications_valid)
+ {
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ if (data->publications)
+ {
+ list_free_deep(data->publications);
+ data->publications = NIL;
+ }
+ data->publications = LoadPublications(data->publication_names);
+ MemoryContextSwitchTo(oldctx);
+ publications_valid = true;
+ }
+
+ /*
+ * Reset schema_sent status as the relation definition may have
+ * changed. Also reset pubactions to empty in case rel was dropped
+ * from a publication. Also free any objects that depended on the
+ * earlier definition.
+ */
+ entry->schema_sent = false;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NIL;
+ bms_free(entry->columns);
+ entry->columns = NULL;
+ entry->pubactions.pubinsert = false;
+ entry->pubactions.pubupdate = false;
+ entry->pubactions.pubdelete = false;
+ entry->pubactions.pubtruncate = false;
+
+ /*
+ * Tuple slots cleanups. (Will be rebuilt later if needed).
+ */
+ if (entry->old_slot)
+ ExecDropSingleTupleTableSlot(entry->old_slot);
+ if (entry->new_slot)
+ ExecDropSingleTupleTableSlot(entry->new_slot);
+
+ entry->old_slot = NULL;
+ entry->new_slot = NULL;
+
+ if (entry->attrmap)
+ free_attrmap(entry->attrmap);
+ entry->attrmap = NULL;
+
+ /*
+ * Row filter cache cleanups.
+ */
+ if (entry->entry_cxt)
+ MemoryContextDelete(entry->entry_cxt);
+
+ entry->entry_cxt = NULL;
+ entry->estate = NULL;
+ memset(entry->exprstate, 0, sizeof(entry->exprstate));
+
+ /*
+ * Build publication cache. We can't use one provided by relcache as
+ * relcache considers all publications that the given relation is in,
+ * but here we only need to consider ones that the subscriber
+ * requested.
+ */
+ foreach(lc, data->publications)
+ {
+ Publication *pub = lfirst(lc);
+ bool publish = false;
+
+ /*
+ * Under what relid should we publish changes in this publication?
+ * We'll use the top-most relid across all publications. Also
+ * track the ancestor level for this publication.
+ */
+ Oid pub_relid = relid;
+ int ancestor_level = 0;
+
+ /*
+ * If this is a FOR ALL TABLES publication, pick the partition
+ * root and set the ancestor level accordingly.
+ */
+ if (pub->alltables)
+ {
+ publish = true;
+ if (pub->pubviaroot && am_partition)
+ {
+ List *ancestors = get_partition_ancestors(relid);
+
+ pub_relid = llast_oid(ancestors);
+ ancestor_level = list_length(ancestors);
+ }
+ }
+
+ if (!publish)
+ {
+ bool ancestor_published = false;
+
+ /*
+ * For a partition, check if any of the ancestors are
+ * published. If so, note down the topmost ancestor that is
+ * published via this publication, which will be used as the
+ * relation via which to publish the partition's changes.
+ */
+ if (am_partition)
+ {
+ Oid ancestor;
+ int level;
+ List *ancestors = get_partition_ancestors(relid);
+
+ ancestor = GetTopMostAncestorInPublication(pub->oid,
+ ancestors,
+ &level);
+
+ if (ancestor != InvalidOid)
+ {
+ ancestor_published = true;
+ if (pub->pubviaroot)
+ {
+ pub_relid = ancestor;
+ ancestor_level = level;
+ }
+ }
+ }
+
+ if (list_member_oid(pubids, pub->oid) ||
+ list_member_oid(schemaPubids, pub->oid) ||
+ ancestor_published)
+ publish = true;
+ }
+
+ /*
+ * If the relation is to be published, determine actions to
+ * publish, and list of columns, if appropriate.
+ *
+ * Don't publish changes for partitioned tables, because
+ * publishing those of its partitions suffices, unless partition
+ * changes won't be published due to pubviaroot being set.
+ */
+ if (publish &&
+ (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
+ {
+ entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
+ entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
+ entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
+ entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+
+ /*
+ * We want to publish the changes as the top-most ancestor
+ * across all publications. So we need to check if the already
+ * calculated level is higher than the new one. If yes, we can
+ * ignore the new value (as it's a child). Otherwise the new
+ * value is an ancestor, so we keep it.
+ */
+ if (publish_ancestor_level > ancestor_level)
+ continue;
+
+ /*
+ * If we found an ancestor higher up in the tree, discard the
+ * list of publications through which we replicate it, and use
+ * the new ancestor.
+ */
+ if (publish_ancestor_level < ancestor_level)
+ {
+ publish_as_relid = pub_relid;
+ publish_ancestor_level = ancestor_level;
+
+ /* reset the publication list for this relation */
+ rel_publications = NIL;
+ }
+ else
+ {
+ /* Same ancestor level, has to be the same OID. */
+ Assert(publish_as_relid == pub_relid);
+ }
+
+ /* Track publications for this ancestor. */
+ rel_publications = lappend(rel_publications, pub);
+ }
+ }
+
+ entry->publish_as_relid = publish_as_relid;
+
+ /*
+ * Initialize the tuple slot, map, and row filter. These are only used
+ * when publishing inserts, updates, or deletes.
+ */
+ if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
+ entry->pubactions.pubdelete)
+ {
+ /* Initialize the tuple slot and map */
+ init_tuple_slot(data, relation, entry);
+
+ /* Initialize the row filter */
+ pgoutput_row_filter_init(data, rel_publications, entry);
+
+ /* Initialize the column list */
+ pgoutput_column_list_init(data, rel_publications, entry);
+ }
+
+ list_free(pubids);
+ list_free(schemaPubids);
+ list_free(rel_publications);
+
+ entry->replicate_valid = true;
+ }
+
+ return entry;
+}
+
+/*
+ * Cleanup list of streamed transactions and update the schema_sent flag.
+ *
+ * When a streamed transaction commits or aborts, we need to remove the
+ * toplevel XID from the schema cache. If the transaction aborted, the
+ * subscriber will simply throw away the schema records we streamed, so
+ * we don't need to do anything else.
+ *
+ * If the transaction is committed, the subscriber will update the relation
+ * cache - so tweak the schema_sent flag accordingly.
+ *
+ * 'xid' is the toplevel transaction ID of the streamed transaction that
+ * just ended; 'is_commit' says whether it committed (true) or aborted
+ * (false).
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+	HASH_SEQ_STATUS hash_seq;
+	RelationSyncEntry *entry;
+	ListCell   *lc;
+
+	Assert(RelationSyncCache != NULL);
+
+	/*
+	 * Walk every cached relation entry; any of them may have recorded this
+	 * xid in its streamed_txns list.
+	 */
+	hash_seq_init(&hash_seq, RelationSyncCache);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		/*
+		 * We can set the schema_sent flag for an entry that has committed xid
+		 * in the list as that ensures that the subscriber would have the
+		 * corresponding schema and we don't need to send it unless there is
+		 * any invalidation for that relation.
+		 */
+		foreach(lc, entry->streamed_txns)
+		{
+			if (xid == lfirst_xid(lc))
+			{
+				if (is_commit)
+					entry->schema_sent = true;
+
+				/* Remove the xid and stop scanning this entry's list. */
+				entry->streamed_txns =
+					foreach_delete_current(entry->streamed_txns, lc);
+				break;
+			}
+		}
+	}
+}
+
+/*
+ * Relcache invalidation callback
+ *
+ * Invalidate the RelationSyncCache entry for 'relid'; an InvalidOid relid
+ * means the whole cache must be invalidated.  'arg' is the (unused)
+ * argument registered with the callback.
+ */
+static void
+rel_sync_cache_relation_cb(Datum arg, Oid relid)
+{
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	/*
+	 * Nobody keeps pointers to entries in this hash table around outside
+	 * logical decoding callback calls - but invalidation events can come in
+	 * *during* a callback if we do any syscache access in the callback.
+	 * Because of that we must mark the cache entry as invalid but not damage
+	 * any of its substructure here. The next get_rel_sync_entry() call will
+	 * rebuild it all.
+	 */
+	if (OidIsValid(relid))
+	{
+		/*
+		 * Getting invalidations for relations that aren't in the table is
+		 * entirely normal. So we don't care if it's found or not.
+		 */
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
+												  HASH_FIND, NULL);
+		if (entry != NULL)
+			entry->replicate_valid = false;
+	}
+	else
+	{
+		/* Whole cache must be flushed. */
+		HASH_SEQ_STATUS status;
+
+		/* Mark every entry invalid; substructure is freed lazily later. */
+		hash_seq_init(&status, RelationSyncCache);
+		while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+		{
+			entry->replicate_valid = false;
+		}
+	}
+}
+
+/*
+ * Publication relation/schema map syscache invalidation callback
+ *
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
+ *
+ * 'cacheid' and 'hashvalue' are ignored: there is no way to map them back
+ * to the affected relations, so every entry is invalidated (see below).
+ */
+static void
+rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the invalidation callbacks.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	/*
+	 * We have no easy way to identify which cache entries this invalidation
+	 * event might have affected, so just mark them all invalid.
+	 */
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		entry->replicate_valid = false;
+	}
+}
+
+/*
+ * Send Replication origin
+ *
+ * If 'send_origin' is set and the origin name for 'origin_id' can be
+ * resolved, emit an origin message carrying that name and 'origin_lsn'.
+ * If the name cannot be resolved, silently send nothing (see XXX below).
+ */
+static void
+send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
+				 XLogRecPtr origin_lsn, bool send_origin)
+{
+	if (send_origin)
+	{
+		char	   *origin;
+
+		/*----------
+		 * XXX: which behaviour do we want here?
+		 *
+		 * Alternatives:
+		 *  - don't send origin message if origin name not found
+		 *    (that's what we do now)
+		 *  - throw error - that will break replication, not good
+		 *  - send some special "unknown" origin
+		 *----------
+		 */
+		if (replorigin_by_oid(origin_id, true, &origin))
+		{
+			/*
+			 * Message boundary: flush the message the caller has already
+			 * prepared, then start a fresh one for the origin data.
+			 */
+			OutputPluginWrite(ctx, false);
+			OutputPluginPrepareWrite(ctx, true);
+
+			logicalrep_write_origin(ctx->out, origin, origin_lsn);
+		}
+	}
+}