diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/pgoutput/pgoutput.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 1346 |
1 files changed, 1346 insertions, 0 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c new file mode 100644 index 0000000..ff9cf5d --- /dev/null +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -0,0 +1,1346 @@ +/*------------------------------------------------------------------------- + * + * pgoutput.c + * Logical Replication output plugin + * + * Copyright (c) 2012-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/pgoutput/pgoutput.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/tupconvert.h" +#include "catalog/partition.h" +#include "catalog/pg_publication.h" +#include "commands/defrem.h" +#include "fmgr.h" +#include "replication/logical.h" +#include "replication/logicalproto.h" +#include "replication/origin.h" +#include "replication/pgoutput.h" +#include "utils/int8.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "utils/varlena.h" + +PG_MODULE_MAGIC; + +extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); + +static void pgoutput_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void pgoutput_shutdown(LogicalDecodingContext *ctx); +static void pgoutput_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_change(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, Relation rel, + ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); +static void pgoutput_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); +static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id); +static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); +static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +static bool publications_valid; +static bool in_streaming; + +static List *LoadPublications(List *pubnames); +static void publication_invalidation_cb(Datum arg, int cacheid, + uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx); +static void update_replication_progress(LogicalDecodingContext *ctx); + +/* + * Entry in the map used to remember which relation schemas we sent. + * + * The schema_sent flag determines if the current schema record for the + * relation (and for its ancestor if publish_as_relid is set) was already + * sent to the subscriber (in which case we don't need to send it again). + * + * The schema cache on downstream is however updated only at commit time, + * and with streamed transactions the commit order may be different from + * the order the transactions are sent in. Also, the (sub) transactions + * might get aborted so we need to send the schema for each (sub) transaction + * so that we don't lose the schema information on abort. For handling this, + * we maintain the list of xids (streamed_txns) for those we have already sent + * the schema. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ +typedef struct RelationSyncEntry +{ + Oid relid; /* relation oid */ + + bool schema_sent; + List *streamed_txns; /* streamed toplevel transactions with this + * schema */ + + bool replicate_valid; + PublicationActions pubactions; + + /* + * OID of the relation to publish changes as. For a partition, this may + * be set to one of its ancestors whose schema will be used when + * replicating changes, if publish_via_partition_root is set for the + * publication. + */ + Oid publish_as_relid; + + /* + * Map used when replicating using an ancestor's schema to convert tuples + * from partition's type to the ancestor's; NULL if publish_as_relid is + * same as 'relid' or if unnecessary due to partition and the ancestor + * having identical TupleDesc. + */ + TupleConversionMap *map; +} RelationSyncEntry; + +/* Map used to remember which relation schemas we sent. */ +static HTAB *RelationSyncCache = NULL; + +static void init_rel_sync_cache(MemoryContext decoding_context); +static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static void rel_sync_cache_relation_cb(Datum arg, Oid relid); +static void rel_sync_cache_publication_cb(Datum arg, int cacheid, + uint32 hashvalue); +static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); +static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, + TransactionId xid); + +/* + * Specify output plugin callbacks + */ +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + + cb->startup_cb = pgoutput_startup; + cb->begin_cb = pgoutput_begin_txn; + cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; + cb->message_cb = pgoutput_message; + cb->commit_cb = pgoutput_commit_txn; + cb->filter_by_origin_cb = pgoutput_origin_filter; + cb->shutdown_cb = pgoutput_shutdown; + + /* transaction streaming */ + cb->stream_start_cb = pgoutput_stream_start; + cb->stream_stop_cb = pgoutput_stream_stop; + cb->stream_abort_cb = pgoutput_stream_abort; + cb->stream_commit_cb = pgoutput_stream_commit; + cb->stream_change_cb = pgoutput_change; + cb->stream_message_cb = pgoutput_message; + cb->stream_truncate_cb = pgoutput_truncate; +} + +static void +parse_output_parameters(List *options, PGOutputData *data) +{ + ListCell *lc; + bool protocol_version_given = false; + bool publication_names_given = false; + bool binary_option_given = false; + bool messages_option_given = false; + bool streaming_given = false; + + data->binary = false; + data->streaming = false; + data->messages = false; + + foreach(lc, options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + Assert(defel->arg == NULL || IsA(defel->arg, String)); + + /* Check each param, whether or not we recognize it */ + if (strcmp(defel->defname, "proto_version") == 0) + { + int64 parsed; + + if (protocol_version_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + protocol_version_given = true; + + if (!scanint8(strVal(defel->arg), true, &parsed)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid proto_version"))); + + if (parsed > PG_UINT32_MAX || parsed < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("proto_version \"%s\" out of range", + strVal(defel->arg)))); + + data->protocol_version = (uint32) parsed; + } + else if (strcmp(defel->defname, "publication_names") == 0) + { + if (publication_names_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + publication_names_given = true; + + if (!SplitIdentifierString(strVal(defel->arg), ',', + &data->publication_names)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("invalid publication_names syntax"))); + } + else if (strcmp(defel->defname, "binary") == 0) + { + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + binary_option_given = true; + + data->binary = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "messages") == 0) + { + if (messages_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + messages_option_given = true; + + data->messages = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "streaming") == 0) + { + if (streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + streaming_given = true; + + data->streaming = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); + } +} + +/* + * Initialize this plugin + */ +static void +pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) +{ + PGOutputData *data = palloc0(sizeof(PGOutputData)); + + /* Create our memory context for private allocations. */ + data->context = AllocSetContextCreate(ctx->context, + "logical replication output context", + ALLOCSET_DEFAULT_SIZES); + + ctx->output_plugin_private = data; + + /* This plugin uses binary protocol. */ + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + + /* + * This is replication start and not slot initialization. + * + * Parse and validate options passed by the client. + */ + if (!is_init) + { + /* Parse the params and ERROR if we see any we don't recognize */ + parse_output_parameters(ctx->output_plugin_options, data); + + /* Check if we support requested protocol */ + if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("client sent proto_version=%d but we only support protocol %d or lower", + data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM))); + + if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("client sent proto_version=%d but we only support protocol %d or higher", + data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM))); + + if (list_length(data->publication_names) < 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("publication_names parameter missing"))); + + /* + * Decide whether to enable streaming. It is disabled by default, in + * which case we just update the flag in decoding context. Otherwise + * we only allow it with sufficient version of the protocol, and when + * the output plugin supports it. + */ + if (!data->streaming) + ctx->streaming = false; + else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support streaming, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM))); + else if (!ctx->streaming) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("streaming requested, but not supported by output plugin"))); + + /* Also remember we're currently not streaming any transaction. */ + in_streaming = false; + + /* Init publication state. */ + data->publications = NIL; + publications_valid = false; + CacheRegisterSyscacheCallback(PUBLICATIONOID, + publication_invalidation_cb, + (Datum) 0); + + /* Initialize relation schema cache. */ + init_rel_sync_cache(CacheMemoryContext); + } + else + { + /* Disable the streaming during the slot initialization mode. */ + ctx->streaming = false; + } +} + +/* + * BEGIN callback + */ +static void +pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_begin(ctx->out, txn); + + if (send_replication_origin) + { + char *origin; + + /*---------- + * XXX: which behaviour do we want here? + * + * Alternatives: + * - don't send origin message if origin name not found + * (that's what we do now) + * - throw error - that will break replication, not good + * - send some special "unknown" origin + *---------- + */ + if (replorigin_by_oid(txn->origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_origin(ctx->out, origin, txn->origin_lsn); + } + + } + + OutputPluginWrite(ctx, true); +} + +/* + * COMMIT callback + */ +static void +pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + update_replication_progress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_commit(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); +} + +/* + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, ReorderBufferChange *change, + Relation relation, RelationSyncEntry *relentry) +{ + bool schema_sent; + TransactionId xid = InvalidTransactionId; + TransactionId topxid = InvalidTransactionId; + + /* + * Remember XID of the (sub)transaction for the change. We don't care if + * it's top-level transaction or not (we have already sent that XID in + * start of the current streaming block). + * + * If we're not in a streaming block, just use InvalidTransactionId and + * the write methods will not include it. + */ + if (in_streaming) + xid = change->txn->xid; + + if (change->txn->toptxn) + topxid = change->txn->toptxn->xid; + else + topxid = xid; + + /* + * Do we need to send the schema? We do track streamed transactions + * separately, because those may be applied later (and the regular + * transactions won't see their effects until then) and in an order that + * we don't know at this point. + * + * XXX There is a scope of optimization here. Currently, we always send + * the schema first time in a streaming transaction but we can probably + * avoid that by checking 'relentry->schema_sent' flag. However, before + * doing that we need to study its impact on the case where we have a mix + * of streaming and non-streaming transactions. + */ + if (in_streaming) + schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid); + else + schema_sent = relentry->schema_sent; + + /* Nothing to do if we already sent the schema. */ + if (schema_sent) + return; + + /* + * Nope, so send the schema. If the changes will be published using an + * ancestor's schema, not the relation's own, send that ancestor's schema + * before sending relation's own (XXX - maybe sending only the former + * suffices?). This is also a good place to set the map that will be used + * to convert the relation's tuples into the ancestor's format, if needed. + */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; + + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + /* + * Make copies of the TupleDescs that will live as long as the map + * does before putting into the map. + */ + indesc = CreateTupleDescCopy(indesc); + outdesc = CreateTupleDescCopy(outdesc); + relentry->map = convert_tuples_by_name(indesc, outdesc); + if (relentry->map == NULL) + { + /* Map not necessary, so free the TupleDescs too. */ + FreeTupleDesc(indesc); + FreeTupleDesc(outdesc); + } + + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, xid, ctx); + RelationClose(ancestor); + } + + send_relation_and_attrs(relation, xid, ctx); + + if (in_streaming) + set_schema_sent_in_streamed_txn(relentry, topxid); + else + relentry->schema_sent = true; +} + +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; + + /* + * Write out type info if needed. We do that only for user-created types. + * We use FirstGenbkiObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. This is important + * because only hand-assigned OIDs can be expected to remain stable across + * major versions. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atttypid < FirstGenbkiObjectId) + continue; + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_typ(ctx->out, xid, att->atttypid); + OutputPluginWrite(ctx, false); + } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, xid, relation); + OutputPluginWrite(ctx, false); +} + +/* + * Sends the decoded DML over wire. + * + * This is called both in streaming and non-streaming modes. + */ +static void +pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + TransactionId xid = InvalidTransactionId; + Relation ancestor = NULL; + + update_replication_progress(ctx); + + if (!is_publishable_relation(relation)) + return; + + /* + * Remember the xid for the change in streaming mode. We need to send xid + * with each change in the streaming mode so that subscriber can make + * their association and on aborts, it can discard the corresponding + * changes. + */ + if (in_streaming) + xid = change->txn->xid; + + relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + + /* First check the table filter */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + if (!relentry->pubactions.pubinsert) + return; + break; + case REORDER_BUFFER_CHANGE_UPDATE: + if (!relentry->pubactions.pubupdate) + return; + break; + case REORDER_BUFFER_CHANGE_DELETE: + if (!relentry->pubactions.pubdelete) + return; + break; + default: + Assert(false); + } + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + maybe_send_schema(ctx, txn, change, relation, relentry); + + /* Send the data */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + relation = ancestor; + /* Convert tuple if needed. */ + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, xid, relation, tuple, + data->binary); + OutputPluginWrite(ctx, true); + break; + } + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple oldtuple = change->data.tp.oldtuple ? + &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + relation = ancestor; + /* Convert tuples if needed. */ + if (relentry->map) + { + if (oldtuple) + oldtuple = execute_attr_map_tuple(oldtuple, + relentry->map); + newtuple = execute_attr_map_tuple(newtuple, + relentry->map); + } + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_update(ctx->out, xid, relation, oldtuple, + newtuple, data->binary); + OutputPluginWrite(ctx, true); + break; + } + case REORDER_BUFFER_CHANGE_DELETE: + if (change->data.tp.oldtuple) + { + HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + + /* Switch relation if publishing via root. */ + if (relentry->publish_as_relid != RelationGetRelid(relation)) + { + Assert(relation->rd_rel->relispartition); + ancestor = RelationIdGetRelation(relentry->publish_as_relid); + relation = ancestor; + /* Convert tuple if needed. */ + if (relentry->map) + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_delete(ctx->out, xid, relation, oldtuple, + data->binary); + OutputPluginWrite(ctx, true); + } + else + elog(DEBUG1, "didn't send DELETE change because of missing oldtuple"); + break; + default: + Assert(false); + } + + if (RelationIsValid(ancestor)) + { + RelationClose(ancestor); + ancestor = NULL; + } + + /* Cleanup */ + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + +static void +pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + int i; + int nrelids; + Oid *relids; + TransactionId xid = InvalidTransactionId; + + update_replication_progress(ctx); + + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ + if (in_streaming) + xid = change->txn->xid; + + old = MemoryContextSwitchTo(data->context); + + relids = palloc0(nrelations * sizeof(Oid)); + nrelids = 0; + + for (i = 0; i < nrelations; i++) + { + Relation relation = relations[i]; + Oid relid = RelationGetRelid(relation); + + if (!is_publishable_relation(relation)) + continue; + + relentry = get_rel_sync_entry(data, relid); + + if (!relentry->pubactions.pubtruncate) + continue; + + /* + * Don't send partitions if the publication wants to send only the + * root tables through it. + */ + if (relation->rd_rel->relispartition && + relentry->publish_as_relid != relid) + continue; + + relids[nrelids++] = relid; + maybe_send_schema(ctx, txn, change, relation, relentry); + } + + if (nrelids > 0) + { + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, + xid, + nrelids, + relids, + change->data.truncate.cascade, + change->data.truncate.restart_seqs); + OutputPluginWrite(ctx, true); + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + +static void +pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, + const char *message) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + TransactionId xid = InvalidTransactionId; + + update_replication_progress(ctx); + + if (!data->messages) + return; + + /* + * Remember the xid for the message in streaming mode. See + * pgoutput_change. + */ + if (in_streaming) + xid = txn->xid; + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_message(ctx->out, + xid, + message_lsn, + transactional, + prefix, + sz, + message); + OutputPluginWrite(ctx, true); +} + +/* + * Currently we always forward. + */ +static bool +pgoutput_origin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + return false; +} + +/* + * Shutdown the output plugin. + * + * Note, we don't need to clean the data->context as it's child context + * of the ctx->context so it will be cleaned up by logical decoding machinery. + */ +static void +pgoutput_shutdown(LogicalDecodingContext *ctx) +{ + if (RelationSyncCache) + { + hash_destroy(RelationSyncCache); + RelationSyncCache = NULL; + } +} + +/* + * Load publications from the list of publication names. + */ +static List * +LoadPublications(List *pubnames) +{ + List *result = NIL; + ListCell *lc; + + foreach(lc, pubnames) + { + char *pubname = (char *) lfirst(lc); + Publication *pub = GetPublicationByName(pubname, false); + + result = lappend(result, pub); + } + + return result; +} + +/* + * Publication cache invalidation callback. + */ +static void +publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + publications_valid = false; + + /* + * Also invalidate per-relation cache so that next time the filtering info + * is checked it will be updated with the new publication settings. + */ + rel_sync_cache_publication_cb(arg, cacheid, hashvalue); +} + +/* + * START STREAM callback + */ +static void +pgoutput_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + + /* we can't nest streaming of transactions */ + Assert(!in_streaming); + + /* + * If we already sent the first stream for this transaction then don't + * send the origin id in the subsequent streams. + */ + if (rbtxn_is_streamed(txn)) + send_replication_origin = false; + + OutputPluginPrepareWrite(ctx, !send_replication_origin); + logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); + + if (send_replication_origin) + { + char *origin; + + if (replorigin_by_oid(txn->origin_id, true, &origin)) + { + /* Message boundary */ + OutputPluginWrite(ctx, false); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr); + } + } + + OutputPluginWrite(ctx, true); + + /* we're streaming a chunk of transaction now */ + in_streaming = true; +} + +/* + * STOP STREAM callback + */ +static void +pgoutput_stream_stop(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + /* we should be streaming a trasanction */ + Assert(in_streaming); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_stop(ctx->out); + OutputPluginWrite(ctx, true); + + /* we've stopped streaming a transaction */ + in_streaming = false; +} + +/* + * Notify downstream to discard the streamed transaction (along with all + * it's subtransactions, if it's a toplevel transaction). + */ +static void +pgoutput_stream_abort(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn) +{ + ReorderBufferTXN *toptxn; + + /* + * The abort should happen outside streaming block, even for streamed + * transactions. The transaction has to be marked as streamed, though. + */ + Assert(!in_streaming); + + /* determine the toplevel transaction */ + toptxn = (txn->toptxn) ? txn->toptxn : txn; + + Assert(rbtxn_is_streamed(toptxn)); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid); + OutputPluginWrite(ctx, true); + + cleanup_rel_sync_cache(toptxn->xid, false); +} + +/* + * Notify downstream to apply the streamed transaction (along with all + * it's subtransactions). + */ +static void +pgoutput_stream_commit(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + /* + * The commit should happen outside streaming block, even for streamed + * transactions. The transaction has to be marked as streamed, though. + */ + Assert(!in_streaming); + Assert(rbtxn_is_streamed(txn)); + + update_replication_progress(ctx); + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); + OutputPluginWrite(ctx, true); + + cleanup_rel_sync_cache(txn->xid, true); +} + +/* + * Initialize the relation schema sync cache for a decoding session. + * + * The hash table is destroyed at the end of a decoding session. While + * relcache invalidations still exist and will still be invoked, they + * will just see the null hash table global and take no action. + */ +static void +init_rel_sync_cache(MemoryContext cachectx) +{ + HASHCTL ctl; + + if (RelationSyncCache != NULL) + return; + + /* Make a new hash table for the cache */ + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(RelationSyncEntry); + ctl.hcxt = cachectx; + + RelationSyncCache = hash_create("logical replication output relation cache", + 128, &ctl, + HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + Assert(RelationSyncCache != NULL); + + CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, + rel_sync_cache_publication_cb, + (Datum) 0); +} + +/* + * We expect relatively small number of streamed transactions. + */ +static bool +get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + ListCell *lc; + + foreach(lc, entry->streamed_txns) + { + if (xid == (uint32) lfirst_int(lc)) + return true; + } + + return false; +} + +/* + * Add the xid in the rel sync entry for which we have already sent the schema + * of the relation. + */ +static void +set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid) +{ + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + + entry->streamed_txns = lappend_int(entry->streamed_txns, xid); + + MemoryContextSwitchTo(oldctx); +} + +/* + * Find or create entry in the relation schema cache. + * + * This looks up publications that the given relation is directly or + * indirectly part of (the latter if it's really the relation's ancestor that + * is part of a publication) and fills up the found entry with the information + * about which operations to publish and whether to use an ancestor's schema + * when publishing. + */ +static RelationSyncEntry * +get_rel_sync_entry(PGOutputData *data, Oid relid) +{ + RelationSyncEntry *entry; + bool found; + MemoryContext oldctx; + + Assert(RelationSyncCache != NULL); + + /* Find cached relation info, creating if not found */ + entry = (RelationSyncEntry *) hash_search(RelationSyncCache, + (void *) &relid, + HASH_ENTER, &found); + Assert(entry != NULL); + + /* Not found means schema wasn't sent */ + if (!found) + { + /* immediately make a new entry valid enough to satisfy callbacks */ + entry->schema_sent = false; + entry->streamed_txns = NIL; + entry->replicate_valid = false; + entry->pubactions.pubinsert = entry->pubactions.pubupdate = + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + entry->publish_as_relid = InvalidOid; + entry->map = NULL; /* will be set by maybe_send_schema() if + * needed */ + } + + /* Validate the entry */ + if (!entry->replicate_valid) + { + List *pubids = GetRelationPublications(relid); + ListCell *lc; + Oid publish_as_relid = relid; + int publish_ancestor_level = 0; + bool am_partition = get_rel_relispartition(relid); + char relkind = get_rel_relkind(relid); + + /* Reload publications if needed before use. */ + if (!publications_valid) + { + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + if (data->publications) + list_free_deep(data->publications); + + data->publications = LoadPublications(data->publication_names); + MemoryContextSwitchTo(oldctx); + publications_valid = true; + } + + /* + * Build publication cache. We can't use one provided by relcache as + * relcache considers all publications given relation is in, but here + * we only need to consider ones that the subscriber requested. + */ + foreach(lc, data->publications) + { + Publication *pub = lfirst(lc); + bool publish = false; + + /* + * Under what relid should we publish changes in this publication? + * We'll use the top-most relid across all publications. Also track + * the ancestor level for this publication. + */ + Oid pub_relid = relid; + int ancestor_level = 0; + + /* + * If this is a FOR ALL TABLES publication, pick the partition root + * and set the ancestor level accordingly. + */ + if (pub->alltables) + { + publish = true; + if (pub->pubviaroot && am_partition) + { + List *ancestors = get_partition_ancestors(relid); + + pub_relid = llast_oid(ancestors); + ancestor_level = list_length(ancestors); + } + } + + if (!publish) + { + bool ancestor_published = false; + + /* + * For a partition, check if any of the ancestors are + * published. If so, note down the topmost ancestor that is + * published via this publication, which will be used as the + * relation via which to publish the partition's changes. + */ + if (am_partition) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc2; + int level = 0; + + /* + * Find the "topmost" ancestor that is in this + * publication. + */ + foreach(lc2, ancestors) + { + Oid ancestor = lfirst_oid(lc2); + + level++; + + if (list_member_oid(GetRelationPublications(ancestor), + pub->oid)) + { + ancestor_published = true; + if (pub->pubviaroot) + { + pub_relid = ancestor; + ancestor_level = level; + } + } + } + } + + if (list_member_oid(pubids, pub->oid) || ancestor_published) + publish = true; + } + + /* + * Don't publish changes for partitioned tables, because + * publishing those of its partitions suffices, unless partition + * changes won't be published due to pubviaroot being set. + */ + if (publish && + (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) + { + entry->pubactions.pubinsert |= pub->pubactions.pubinsert; + entry->pubactions.pubupdate |= pub->pubactions.pubupdate; + entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + + /* + * We want to publish the changes as the top-most ancestor + * across all publications. So we need to check if the + * already calculated level is higher than the new one. If + * yes, we can ignore the new value (as it's a child). + * Otherwise the new value is an ancestor, so we keep it. + */ + if (publish_ancestor_level > ancestor_level) + continue; + + /* The new value is an ancestor, so let's keep it. */ + publish_as_relid = pub_relid; + publish_ancestor_level = ancestor_level; + } + } + + list_free(pubids); + + entry->publish_as_relid = publish_as_relid; + entry->replicate_valid = true; + } + + return entry; +} + +/* + * Cleanup list of streamed transactions and update the schema_sent flag. + * + * When a streamed transaction commits or aborts, we need to remove the + * toplevel XID from the schema cache. If the transaction aborted, the + * subscriber will simply throw away the schema records we streamed, so + * we don't need to do anything else. + * + * If the transaction is committed, the subscriber will update the relation + * cache - so tweak the schema_sent flag accordingly. + */ +static void +cleanup_rel_sync_cache(TransactionId xid, bool is_commit) +{ + HASH_SEQ_STATUS hash_seq; + RelationSyncEntry *entry; + ListCell *lc; + + Assert(RelationSyncCache != NULL); + + hash_seq_init(&hash_seq, RelationSyncCache); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + /* + * We can set the schema_sent flag for an entry that has committed xid + * in the list as that ensures that the subscriber would have the + * corresponding schema and we don't need to send it unless there is + * any invalidation for that relation. + */ + foreach(lc, entry->streamed_txns) + { + if (xid == (uint32) lfirst_int(lc)) + { + if (is_commit) + entry->schema_sent = true; + + entry->streamed_txns = + foreach_delete_current(entry->streamed_txns, lc); + break; + } + } + } +} + +/* + * Relcache invalidation callback + */ +static void +rel_sync_cache_relation_cb(Datum arg, Oid relid) +{ + RelationSyncEntry *entry; + + /* + * We can get here if the plugin was used in SQL interface as the + * RelSchemaSyncCache is destroyed when the decoding finishes, but there + * is no way to unregister the relcache invalidation callback. + */ + if (RelationSyncCache == NULL) + return; + + /* + * Nobody keeps pointers to entries in this hash table around outside + * logical decoding callback calls - but invalidation events can come in + * *during* a callback if we access the relcache in the callback. Because + * of that we must mark the cache entry as invalid but not remove it from + * the hash while it could still be referenced, then prune it at a later + * safe point. + * + * Getting invalidations for relations that aren't in the table is + * entirely normal, since there's no way to unregister for an invalidation + * event. So we don't care if it's found or not. + */ + entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid, + HASH_FIND, NULL); + + /* + * Reset schema sent status as the relation definition may have changed. + * Also free any objects that depended on the earlier definition. + */ + if (entry != NULL) + { + entry->schema_sent = false; + list_free(entry->streamed_txns); + entry->streamed_txns = NIL; + if (entry->map) + { + /* + * Must free the TupleDescs contained in the map explicitly, + * because free_conversion_map() doesn't. + */ + FreeTupleDesc(entry->map->indesc); + FreeTupleDesc(entry->map->outdesc); + free_conversion_map(entry->map); + } + entry->map = NULL; + } +} + +/* + * Publication relation map syscache invalidation callback + */ +static void +rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS status; + RelationSyncEntry *entry; + + /* + * We can get here if the plugin was used in SQL interface as the + * RelSchemaSyncCache is destroyed when the decoding finishes, but there + * is no way to unregister the relcache invalidation callback. + */ + if (RelationSyncCache == NULL) + return; + + /* + * There is no way to find which entry in our cache the hash belongs to so + * mark the whole cache as invalid. + */ + hash_seq_init(&status, RelationSyncCache); + while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) + { + entry->replicate_valid = false; + + /* + * There might be some relations dropped from the publication so we + * don't need to publish the changes for them. + */ + entry->pubactions.pubinsert = false; + entry->pubactions.pubupdate = false; + entry->pubactions.pubdelete = false; + entry->pubactions.pubtruncate = false; + } +} + +/* + * Try to update progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. + * This can happen when all or most of the changes are not published. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx); + changes_count = 0; + } +} |