summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c1052
1 files changed, 1052 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..4985c2a
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,1052 @@
+/* -------------------------------------------------------------------------
+ *
+ * decode.c
+ * This module decodes WAL records read using xlogreader.h's APIs for the
+ * purpose of logical decoding by passing information to the
+ * reorderbuffer module (containing the actual changes) and to the
+ * snapbuild module to build a fitting catalog snapshot (to be able to
+ * properly decode the changes in the reorderbuffer).
+ *
+ * NOTE:
+ * This basically tries to handle all low level xlog stuff for
+ * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
+ * specific record's struct is used to pass data along, but those just
+ * happen to contain the right amount of data in a convenient
+ * format. There isn't and shouldn't be much intelligence about the
+ * contents of records in here except turning them into a more usable
+ * format.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/decode.c
+ *
+ * -------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/heapam_xlog.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_control.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/message.h"
+#include "replication/origin.h"
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+#include "storage/standby.h"
+
+typedef struct XLogRecordBuffer
+{
+ XLogRecPtr origptr;
+ XLogRecPtr endptr;
+ XLogReaderState *record;
+} XLogRecordBuffer;
+
+/* RMGR Handlers */
+static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+/* individual record(group)'s handlers */
+static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_commit *parsed, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid);
+
+/* common function to decode tuples */
+static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+
+/*
+ * Take every XLogReadRecord()ed record and perform the actions required to
+ * decode it using the output plugin already setup in the logical decoding
+ * context.
+ *
+ * NB: Note that every record's xid needs to be processed by reorderbuffer
+ * (xids contained in the content of records are not relevant for this rule).
+ * That means that for records which'd otherwise not go through the
+ * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
+ * call ReorderBufferProcessXid for each record type by default, because
+ * e.g. empty xacts can be handled more efficiently if there's no previous
+ * state for them.
+ *
+ * We also support the ability to fast forward thru records, skipping some
+ * record types completely - see individual record types for details.
+ */
+void
+LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
+{
+ XLogRecordBuffer buf;
+
+ buf.origptr = ctx->reader->ReadRecPtr;
+ buf.endptr = ctx->reader->EndRecPtr;
+ buf.record = record;
+
+ /* cast so we get a warning when new rmgrs are added */
+ switch ((RmgrId) XLogRecGetRmid(record))
+ {
+ /*
+ * Rmgrs we care about for logical decoding. Add new rmgrs in
+ * rmgrlist.h's order.
+ */
+ case RM_XLOG_ID:
+ DecodeXLogOp(ctx, &buf);
+ break;
+
+ case RM_XACT_ID:
+ DecodeXactOp(ctx, &buf);
+ break;
+
+ case RM_STANDBY_ID:
+ DecodeStandbyOp(ctx, &buf);
+ break;
+
+ case RM_HEAP2_ID:
+ DecodeHeap2Op(ctx, &buf);
+ break;
+
+ case RM_HEAP_ID:
+ DecodeHeapOp(ctx, &buf);
+ break;
+
+ case RM_LOGICALMSG_ID:
+ DecodeLogicalMsgOp(ctx, &buf);
+ break;
+
+ /*
+ * Rmgrs irrelevant for logical decoding; they describe stuff not
+ * represented in logical decoding. Add new rmgrs in rmgrlist.h's
+ * order.
+ */
+ case RM_SMGR_ID:
+ case RM_CLOG_ID:
+ case RM_DBASE_ID:
+ case RM_TBLSPC_ID:
+ case RM_MULTIXACT_ID:
+ case RM_RELMAP_ID:
+ case RM_BTREE_ID:
+ case RM_HASH_ID:
+ case RM_GIN_ID:
+ case RM_GIST_ID:
+ case RM_SEQ_ID:
+ case RM_SPGIST_ID:
+ case RM_BRIN_ID:
+ case RM_COMMIT_TS_ID:
+ case RM_REPLORIGIN_ID:
+ case RM_GENERIC_ID:
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
+ break;
+ case RM_NEXT_ID:
+ elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+ }
+}
+
+/*
+ * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
+ buf->origptr);
+
+ switch (info)
+ {
+ /* this is also used in END_OF_RECOVERY checkpoints */
+ case XLOG_CHECKPOINT_SHUTDOWN:
+ case XLOG_END_OF_RECOVERY:
+ SnapBuildSerializationPoint(builder, buf->origptr);
+
+ break;
+ case XLOG_CHECKPOINT_ONLINE:
+
+ /*
+ * a RUNNING_XACTS record will have been logged near to this, we
+ * can restart from there.
+ */
+ break;
+ case XLOG_NOOP:
+ case XLOG_NEXTOID:
+ case XLOG_SWITCH:
+ case XLOG_BACKUP_END:
+ case XLOG_PARAMETER_CHANGE:
+ case XLOG_RESTORE_POINT:
+ case XLOG_FPW_CHANGE:
+ case XLOG_FPI_FOR_HINT:
+ case XLOG_FPI:
+ break;
+ default:
+ elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
+ }
+}
+
+/*
+ * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBuffer *reorder = ctx->reorder;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
+
+ /*
+ * If the snapshot isn't yet fully built, we cannot decode anything, so
+ * bail out.
+ *
+ * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
+ * when the snapshot is being built: it is possible to get later records
+ * that require subxids to be properly assigned.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
+ info != XLOG_XACT_ASSIGNMENT)
+ return;
+
+ switch (info)
+ {
+ case XLOG_XACT_COMMIT:
+ case XLOG_XACT_COMMIT_PREPARED:
+ {
+ xl_xact_commit *xlrec;
+ xl_xact_parsed_commit parsed;
+ TransactionId xid;
+
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
+ ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
+
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
+
+ DecodeCommit(ctx, buf, &parsed, xid);
+ break;
+ }
+ case XLOG_XACT_ABORT:
+ case XLOG_XACT_ABORT_PREPARED:
+ {
+ xl_xact_abort *xlrec;
+ xl_xact_parsed_abort parsed;
+ TransactionId xid;
+
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
+ ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
+
+ if (!TransactionIdIsValid(parsed.twophase_xid))
+ xid = XLogRecGetXid(r);
+ else
+ xid = parsed.twophase_xid;
+
+ DecodeAbort(ctx, buf, &parsed, xid);
+ break;
+ }
+ case XLOG_XACT_ASSIGNMENT:
+ {
+ xl_xact_assignment *xlrec;
+ int i;
+ TransactionId *sub_xid;
+
+ xlrec = (xl_xact_assignment *) XLogRecGetData(r);
+
+ sub_xid = &xlrec->xsub[0];
+
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ {
+ ReorderBufferAssignChild(reorder, xlrec->xtop,
+ *(sub_xid++), buf->origptr);
+ }
+ break;
+ }
+ case XLOG_XACT_PREPARE:
+
+ /*
+ * Currently decoding ignores PREPARE TRANSACTION and will just
+ * decode the transaction when the COMMIT PREPARED is sent or
+ * throw away the transaction's contents when a ROLLBACK PREPARED
+ * is received. In the future we could add code to expose prepared
+ * transactions in the changestream allowing for a kind of
+ * distributed 2PC.
+ */
+ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+ break;
+ default:
+ elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
+ }
+}
+
+/*
+ * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ switch (info)
+ {
+ case XLOG_RUNNING_XACTS:
+ {
+ xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
+
+ SnapBuildProcessRunningXacts(builder, buf->origptr, running);
+
+ /*
+ * Abort all transactions that we keep track of, that are
+ * older than the record's oldestRunningXid. This is the most
+ * convenient spot for doing so since, in contrast to shutdown
+ * or end-of-recovery checkpoints, we have information about
+ * all running transactions which includes prepared ones,
+ * while shutdown checkpoints just know that no non-prepared
+ * transactions are in progress.
+ */
+ ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
+ }
+ break;
+ case XLOG_STANDBY_LOCK:
+ break;
+ case XLOG_INVALIDATIONS:
+ {
+ xl_invalidations *invalidations =
+ (xl_invalidations *) XLogRecGetData(r);
+
+ if (!ctx->fast_forward)
+ ReorderBufferImmediateInvalidation(ctx->reorder,
+ invalidations->nmsgs,
+ invalidations->msgs);
+ }
+ break;
+ default:
+ elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
+ }
+}
+
+/*
+ * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
+ SnapBuild *builder = ctx->snapshot_builder;
+
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ switch (info)
+ {
+ case XLOG_HEAP2_MULTI_INSERT:
+ if (!ctx->fast_forward &&
+ SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeMultiInsert(ctx, buf);
+ break;
+ case XLOG_HEAP2_NEW_CID:
+ {
+ xl_heap_new_cid *xlrec;
+
+ xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
+ SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
+
+ break;
+ }
+ case XLOG_HEAP2_REWRITE:
+
+ /*
+ * Although these records only exist to serve the needs of logical
+ * decoding, all the work happens as part of crash or archive
+ * recovery, so we don't need to do anything here.
+ */
+ break;
+
+ /*
+ * Everything else here is just low level physical stuff we're not
+ * interested in.
+ */
+ case XLOG_HEAP2_FREEZE_PAGE:
+ case XLOG_HEAP2_CLEAN:
+ case XLOG_HEAP2_CLEANUP_INFO:
+ case XLOG_HEAP2_VISIBLE:
+ case XLOG_HEAP2_LOCK_UPDATED:
+ break;
+ default:
+ elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
+ }
+}
+
+/*
+ * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
+ SnapBuild *builder = ctx->snapshot_builder;
+
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding data changes.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ switch (info)
+ {
+ case XLOG_HEAP_INSERT:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeInsert(ctx, buf);
+ break;
+
+ /*
+ * Treat HOT update as normal updates. There is no useful
+ * information in the fact that we could make it a HOT update
+ * locally and the WAL layout is compatible.
+ */
+ case XLOG_HEAP_HOT_UPDATE:
+ case XLOG_HEAP_UPDATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeUpdate(ctx, buf);
+ break;
+
+ case XLOG_HEAP_DELETE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeDelete(ctx, buf);
+ break;
+
+ case XLOG_HEAP_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
+ case XLOG_HEAP_INPLACE:
+
+ /*
+ * Inplace updates are only ever performed on catalog tuples and
+ * can, per definition, not change tuple visibility. Since we
+ * don't decode catalog tuples, we're not interested in the
+ * record's contents.
+ *
+ * In-place updates can be used either by XID-bearing transactions
+ * (e.g. in CREATE INDEX CONCURRENTLY) or by XID-less
+ * transactions (e.g. VACUUM). In the former case, the commit
+ * record will include cache invalidations, so we mark the
+ * transaction as catalog modifying here. Currently that's
+ * redundant because the commit will do that as well, but once we
+ * support decoding in-progress relations, this will be important.
+ */
+ if (!TransactionIdIsValid(xid))
+ break;
+
+ SnapBuildProcessChange(builder, xid, buf->origptr);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ break;
+
+ case XLOG_HEAP_CONFIRM:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeSpecConfirm(ctx, buf);
+ break;
+
+ case XLOG_HEAP_LOCK:
+ /* we don't care about row level locks for now */
+ break;
+
+ default:
+ elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
+ break;
+ }
+}
+
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ Snapshot snapshot;
+ xl_logical_message *message;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ message = (xl_logical_message *) XLogRecGetData(r);
+
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!message->transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
+ message->transactional,
+ message->message, /* first part of message is
+ * prefix */
+ message->message_size,
+ message->message + message->prefix_size);
+}
+
+/*
+ * Consolidated commit record handling between the different form of commit
+ * records.
+ */
+static void
+DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_commit *parsed, TransactionId xid)
+{
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz commit_time = parsed->xact_time;
+ RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ commit_time = parsed->origin_timestamp;
+ }
+
+ /*
+ * Process invalidation messages, even if we're not interested in the
+ * transaction's contents, since the various caches need to always be
+ * consistent.
+ */
+ if (parsed->nmsgs > 0)
+ {
+ if (!ctx->fast_forward)
+ ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+ parsed->nmsgs, parsed->msgs);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ }
+
+ SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
+ parsed->nsubxacts, parsed->subxacts);
+
+ /* ----
+ * Check whether we are interested in this specific transaction, and tell
+ * the reorderbuffer to forget the content of the (sub-)transactions
+ * if not.
+ *
+ * There can be several reasons we might not be interested in this
+ * transaction:
+ * 1) We might not be interested in decoding transactions up to this
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
+ * 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
+ *
+ * We can't just use ReorderBufferAbort() here, because we need to execute
+ * the transaction's invalidations. This currently won't be needed if
+ * we're just skipping over the transaction because currently we only do
+ * so during startup, to get to the first transaction the client needs. As
+ * we have reset the catalog caches before starting to read WAL, and we
+ * haven't yet touched any catalogs, there can't be anything to invalidate.
+ * But if we're "forgetting" this commit because it's it happened in
+ * another database, the invalidations might be important, because they
+ * could be for shared catalogs and we might have loaded data into the
+ * relevant syscaches.
+ * ---
+ */
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+ }
+ ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+ return;
+ }
+
+ /* tell the reorderbuffer about the surviving subtransactions */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
+ /* replay actions of all transaction + subtransactions in order */
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Get the data from the various forms of abort records and pass it on to
+ * snapbuild.c and reorderbuffer.c
+ */
+static void
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_abort *parsed, TransactionId xid)
+{
+ int i;
+
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
+ }
+
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+}
+
+/*
+ * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
+ *
+ * Deletes can contain the new tuple.
+ */
+static void
+DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ Size datalen;
+ char *tupledata;
+ Size tuplelen;
+ XLogReaderState *r = buf->record;
+ xl_heap_insert *xlrec;
+ ReorderBufferChange *change;
+ RelFileNode target_node;
+
+ xlrec = (xl_heap_insert *) XLogRecGetData(r);
+
+ /*
+ * Ignore insert records without new tuples (this does happen when
+ * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
+ */
+ if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ tupledata = XLogRecGetBlockData(r, 0, &datalen);
+ tuplelen = datalen - SizeOfHeapHeader;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
+ * in the record, from wal into proper tuplebufs.
+ *
+ * Updates can possibly contain a new tuple and the old primary key.
+ */
+static void
+DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_update *xlrec;
+ ReorderBufferChange *change;
+ char *data;
+ RelFileNode target_node;
+
+ xlrec = (xl_heap_update *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
+ {
+ Size datalen;
+ Size tuplelen;
+
+ data = XLogRecGetBlockData(r, 0, &datalen);
+
+ tuplelen = datalen - SizeOfHeapHeader;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
+ }
+
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
+ {
+ Size datalen;
+ Size tuplelen;
+
+ /* caution, remaining data in record is not aligned */
+ data = XLogRecGetData(r) + SizeOfHeapUpdate;
+ datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
+ tuplelen = datalen - SizeOfHeapHeader;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
+ }
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
+ *
+ * Deletes can possibly contain the old primary key.
+ */
+static void
+DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_delete *xlrec;
+ ReorderBufferChange *change;
+ RelFileNode target_node;
+
+ xlrec = (xl_heap_delete *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+
+ if (xlrec->flags & XLH_DELETE_IS_SUPER)
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_DELETE;
+
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ /* old primary key stored */
+ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
+ {
+ Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
+ Size tuplelen = datalen - SizeOfHeapHeader;
+
+ Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
+ datalen, change->data.tp.oldtuple);
+ }
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_truncate *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->dbId != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+ change->data.truncate.cascade = true;
+ if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+ change->data.truncate.restart_seqs = true;
+ change->data.truncate.nrelids = xlrec->nrelids;
+ change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
+ xlrec->nrelids);
+ memcpy(change->data.truncate.relids, xlrec->relids,
+ xlrec->nrelids * sizeof(Oid));
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change);
+}
+
+/*
+ * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
+ *
+ * Currently MULTI_INSERT will always contain the full tuples.
+ */
+static void
+DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_multi_insert *xlrec;
+ int i;
+ char *data;
+ char *tupledata;
+ Size tuplelen;
+ RelFileNode rnode;
+
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
+
+ /*
+ * Ignore insert records without new tuples. This happens when a
+ * multi_insert is done on a catalog or on a non-persistent relation.
+ */
+ if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+ if (rnode.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ /*
+ * We know that this multi_insert isn't for a catalog, so the block should
+ * always have data even if a full-page write of it is taken.
+ */
+ tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
+ Assert(tupledata != NULL);
+
+ data = tupledata;
+ for (i = 0; i < xlrec->ntuples; i++)
+ {
+ ReorderBufferChange *change;
+ xl_multi_insert_tuple *xlhdr;
+ int datalen;
+ ReorderBufferTupleBuf *tuple;
+ HeapTupleHeader header;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
+
+ tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
+
+ /* not a disk based tuple */
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ /*
+ * We can only figure this out after reassembling the transactions.
+ */
+ tuple->tuple.t_tableOid = InvalidOid;
+
+ tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+ memset(header, 0, SizeofHeapTupleHeader);
+
+ memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
+ (char *) data,
+ datalen);
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
+
+ /*
+ * Reset toast reassembly state only after the last row in the last
+ * xl_multi_insert_tuple record emitted by one heap_multi_insert()
+ * call.
+ */
+ if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
+ (i + 1) == xlrec->ntuples)
+ change->data.tp.clear_toast_afterwards = true;
+ else
+ change->data.tp.clear_toast_afterwards = false;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change);
+
+ /* move to the next xl_multi_insert_tuple entry */
+ data += datalen;
+ }
+ Assert(data == tupledata + tuplelen);
+}
+
+/*
+ * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
+ *
+ * This is pretty trivial, all the state essentially already setup by the
+ * speculative insertion.
+ */
+static void
+DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ ReorderBufferChange *change;
+ RelFileNode target_node;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
+ * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
+ * (but not by heap_multi_insert) into a tuplebuf.
+ *
+ * The size 'len' and the pointer 'data' in the record need to be
+ * computed outside as they are record specific.
+ */
+static void
+DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+ xl_heap_header xlhdr;
+ int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
+
+ Assert(datalen >= 0);
+
+ tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->tuple.t_data;
+
+ /* not a disk based tuple */
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ /* we can only figure this out after reassembling the transactions */
+ tuple->tuple.t_tableOid = InvalidOid;
+
+ /* data is not stored aligned, copy to aligned storage */
+ memcpy((char *) &xlhdr,
+ data,
+ SizeOfHeapHeader);
+
+ memset(header, 0, SizeofHeapTupleHeader);
+
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ data + SizeOfHeapHeader,
+ datalen);
+
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
+}