summaryrefslogtreecommitdiffstats
path: root/src/include/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/replication')
-rw-r--r--src/include/replication/decode.h34
-rw-r--r--src/include/replication/logical.h147
-rw-r--r--src/include/replication/logicallauncher.h29
-rw-r--r--src/include/replication/logicalproto.h254
-rw-r--r--src/include/replication/logicalrelation.h50
-rw-r--r--src/include/replication/logicalworker.h19
-rw-r--r--src/include/replication/message.h41
-rw-r--r--src/include/replication/origin.h73
-rw-r--r--src/include/replication/output_plugin.h248
-rw-r--r--src/include/replication/pgoutput.h34
-rw-r--r--src/include/replication/reorderbuffer.h685
-rw-r--r--src/include/replication/slot.h230
-rw-r--r--src/include/replication/snapbuild.h97
-rw-r--r--src/include/replication/syncrep.h115
-rw-r--r--src/include/replication/walreceiver.h472
-rw-r--r--src/include/replication/walsender.h74
-rw-r--r--src/include/replication/walsender_private.h128
-rw-r--r--src/include/replication/worker_internal.h112
18 files changed, 2842 insertions, 0 deletions
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..741bf65
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,34 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ * PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "replication/logical.h"
+#include "replication/reorderbuffer.h"
+
+typedef struct XLogRecordBuffer
+{
+ XLogRecPtr origptr;
+ XLogRecPtr endptr;
+ XLogReaderState *record;
+} XLogRecordBuffer;
+
+extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
+extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
+ XLogReaderState *record);
+
+#endif
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 0000000..edadacd
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,147 @@
+/*-------------------------------------------------------------------------
+ * logical.h
+ * PostgreSQL logical decoding coordination
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "replication/output_plugin.h"
+#include "replication/slot.h"
+
+struct LogicalDecodingContext;
+
+typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
+ XLogRecPtr Ptr,
+ TransactionId xid,
+ bool last_write
+);
+
+typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
+
+typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
+ XLogRecPtr Ptr,
+ TransactionId xid,
+ bool skipped_xact
+);
+
+typedef struct LogicalDecodingContext
+{
+ /* memory context this is all allocated in */
+ MemoryContext context;
+
+ /* The associated replication slot */
+ ReplicationSlot *slot;
+
+ /* infrastructure pieces for decoding */
+ XLogReaderState *reader;
+ struct ReorderBuffer *reorder;
+ struct SnapBuild *snapshot_builder;
+
+ /*
+ * Marks the logical decoding context as fast forward decoding one. Such a
+ * context does not have plugin loaded so most of the following properties
+ * are unused.
+ */
+ bool fast_forward;
+
+ OutputPluginCallbacks callbacks;
+ OutputPluginOptions options;
+
+ /*
+ * User specified options
+ */
+ List *output_plugin_options;
+
+ /*
+ * User-Provided callback for writing/streaming out data.
+ */
+ LogicalOutputPluginWriterPrepareWrite prepare_write;
+ LogicalOutputPluginWriterWrite write;
+ LogicalOutputPluginWriterUpdateProgress update_progress;
+
+ /*
+ * Output buffer.
+ */
+ StringInfo out;
+
+ /*
+ * Private data pointer of the output plugin.
+ */
+ void *output_plugin_private;
+
+ /*
+ * Private data pointer for the data writer.
+ */
+ void *output_writer_private;
+
+ /*
+ * Does the output plugin support streaming, and is it enabled?
+ */
+ bool streaming;
+
+ /*
+ * Does the output plugin support two-phase decoding, and is it enabled?
+ */
+ bool twophase;
+
+ /*
+ * Is two-phase option given by output plugin?
+ *
+ * This flag indicates that the plugin passed in the two-phase option as
+ * part of the START_STREAMING command. We can't rely solely on the
+ * twophase flag which only tells whether the plugin provided all the
+ * necessary two-phase callbacks.
+ */
+ bool twophase_opt_given;
+
+ /*
+ * State for writing output.
+ */
+ bool accept_writes;
+ bool prepared_write;
+ XLogRecPtr write_location;
+ TransactionId write_xid;
+ /* Are we processing the end LSN of a transaction? */
+ bool end_xact;
+} LogicalDecodingContext;
+
+
+extern void CheckLogicalDecodingRequirements(void);
+
+extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
+ List *output_plugin_options,
+ bool need_full_snapshot,
+ XLogRecPtr restart_lsn,
+ XLogReaderRoutine *xl_routine,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress);
+extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
+ List *output_plugin_options,
+ bool fast_forward,
+ XLogReaderRoutine *xl_routine,
+ LogicalOutputPluginWriterPrepareWrite prepare_write,
+ LogicalOutputPluginWriterWrite do_write,
+ LogicalOutputPluginWriterUpdateProgress update_progress);
+extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
+extern bool DecodingContextReady(LogicalDecodingContext *ctx);
+extern void FreeDecodingContext(LogicalDecodingContext *ctx);
+
+extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
+extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
+ XLogRecPtr restart_lsn);
+extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
+extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void ResetLogicalStreamingState(void);
+extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+
+#endif
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
new file mode 100644
index 0000000..f1e2821
--- /dev/null
+++ b/src/include/replication/logicallauncher.h
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicallauncher.h
+ * Exports for logical replication launcher.
+ *
+ * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicallauncher.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALLAUNCHER_H
+#define LOGICALLAUNCHER_H
+
+extern PGDLLIMPORT int max_logical_replication_workers;
+extern PGDLLIMPORT int max_sync_workers_per_subscription;
+
+extern void ApplyLauncherRegister(void);
+extern void ApplyLauncherMain(Datum main_arg);
+
+extern Size ApplyLauncherShmemSize(void);
+extern void ApplyLauncherShmemInit(void);
+
+extern void ApplyLauncherWakeupAtCommit(void);
+extern void AtEOXact_ApplyLauncher(bool isCommit);
+
+extern bool IsLogicalLauncher(void);
+
+#endif /* LOGICALLAUNCHER_H */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
new file mode 100644
index 0000000..8cdc04a
--- /dev/null
+++ b/src/include/replication/logicalproto.h
@@ -0,0 +1,254 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalproto.h
+ * logical replication protocol
+ *
+ * Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/replication/logicalproto.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICAL_PROTO_H
+#define LOGICAL_PROTO_H
+
+#include "access/xact.h"
+#include "executor/tuptable.h"
+#include "replication/reorderbuffer.h"
+#include "utils/rel.h"
+
+/*
+ * Protocol capabilities
+ *
+ * LOGICALREP_PROTO_VERSION_NUM is our native protocol.
+ * LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support.
+ * LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
+ * have backwards compatibility for. The client requests protocol version at
+ * connect time.
+ *
+ * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
+ * support for streaming large transactions. Introduced in PG14.
+ *
+ * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
+ * support for two-phase commit decoding (at prepare time). Introduced in PG15.
+ */
+#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
+#define LOGICALREP_PROTO_VERSION_NUM 1
+#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
+#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
+
+/*
+ * Logical message types
+ *
+ * Used by logical replication wire protocol.
+ *
+ * Note: though this is an enum, the values are used to identify message types
+ * in logical replication protocol, which uses a single byte to identify a
+ * message type. Hence the values should be single-byte wide and preferably
+ * human-readable characters.
+ */
+typedef enum LogicalRepMsgType
+{
+ LOGICAL_REP_MSG_BEGIN = 'B',
+ LOGICAL_REP_MSG_COMMIT = 'C',
+ LOGICAL_REP_MSG_ORIGIN = 'O',
+ LOGICAL_REP_MSG_INSERT = 'I',
+ LOGICAL_REP_MSG_UPDATE = 'U',
+ LOGICAL_REP_MSG_DELETE = 'D',
+ LOGICAL_REP_MSG_TRUNCATE = 'T',
+ LOGICAL_REP_MSG_RELATION = 'R',
+ LOGICAL_REP_MSG_TYPE = 'Y',
+ LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
+ LOGICAL_REP_MSG_PREPARE = 'P',
+ LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
+ LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r',
+ LOGICAL_REP_MSG_STREAM_START = 'S',
+ LOGICAL_REP_MSG_STREAM_STOP = 'E',
+ LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+ LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+ LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
+} LogicalRepMsgType;
+
+/*
+ * This struct stores a tuple received via logical replication.
+ * Keep in mind that the columns correspond to the *remote* table.
+ */
+typedef struct LogicalRepTupleData
+{
+ /* Array of StringInfos, one per column; some may be unused */
+ StringInfoData *colvalues;
+ /* Array of markers for null/unchanged/text/binary, one per column */
+ char *colstatus;
+ /* Length of above arrays */
+ int ncols;
+} LogicalRepTupleData;
+
+/* Possible values for LogicalRepTupleData.colstatus[colnum] */
+/* These values are also used in the on-the-wire protocol */
+#define LOGICALREP_COLUMN_NULL 'n'
+#define LOGICALREP_COLUMN_UNCHANGED 'u'
+#define LOGICALREP_COLUMN_TEXT 't'
+#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */
+
+typedef uint32 LogicalRepRelId;
+
+/* Relation information */
+typedef struct LogicalRepRelation
+{
+ /* Info coming from the remote side. */
+ LogicalRepRelId remoteid; /* unique id of the relation */
+ char *nspname; /* schema name */
+ char *relname; /* relation name */
+ int natts; /* number of columns */
+ char **attnames; /* column names */
+ Oid *atttyps; /* column types */
+ char replident; /* replica identity */
+ char relkind; /* remote relation kind */
+ Bitmapset *attkeys; /* Bitmap of key columns */
+} LogicalRepRelation;
+
+/* Type mapping info */
+typedef struct LogicalRepTyp
+{
+ Oid remoteid; /* unique id of the remote type */
+ char *nspname; /* schema name of remote type */
+ char *typname; /* name of the remote type */
+} LogicalRepTyp;
+
+/* Transaction info */
+typedef struct LogicalRepBeginData
+{
+ XLogRecPtr final_lsn;
+ TimestampTz committime;
+ TransactionId xid;
+} LogicalRepBeginData;
+
+typedef struct LogicalRepCommitData
+{
+ XLogRecPtr commit_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz committime;
+} LogicalRepCommitData;
+
+/*
+ * Prepared transaction protocol information for begin_prepare, and prepare.
+ */
+typedef struct LogicalRepPreparedTxnData
+{
+ XLogRecPtr prepare_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz prepare_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepPreparedTxnData;
+
+/*
+ * Prepared transaction protocol information for commit prepared.
+ */
+typedef struct LogicalRepCommitPreparedTxnData
+{
+ XLogRecPtr commit_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz commit_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepCommitPreparedTxnData;
+
+/*
+ * Rollback Prepared transaction protocol information. The prepare information
+ * prepare_end_lsn and prepare_time are used to check if the downstream has
+ * received this prepared transaction in which case it can apply the rollback,
+ * otherwise, it can skip the rollback operation. The gid alone is not
+ * sufficient because the downstream node can have a prepared transaction with
+ * same identifier.
+ */
+typedef struct LogicalRepRollbackPreparedTxnData
+{
+ XLogRecPtr prepare_end_lsn;
+ XLogRecPtr rollback_end_lsn;
+ TimestampTz prepare_time;
+ TimestampTz rollback_time;
+ TransactionId xid;
+ char gid[GIDSIZE];
+} LogicalRepRollbackPreparedTxnData;
+
+extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
+extern void logicalrep_read_begin(StringInfo in,
+ LogicalRepBeginData *begin_data);
+extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern void logicalrep_read_commit(StringInfo in,
+ LogicalRepCommitData *commit_data);
+extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn);
+extern void logicalrep_read_begin_prepare(StringInfo in,
+ LogicalRepPreparedTxnData *begin_data);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+extern void logicalrep_read_prepare(StringInfo in,
+ LogicalRepPreparedTxnData *prepare_data);
+extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern void logicalrep_read_commit_prepared(StringInfo in,
+ LogicalRepCommitPreparedTxnData *prepare_data);
+extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+extern void logicalrep_read_rollback_prepared(StringInfo in,
+ LogicalRepRollbackPreparedTxnData *rollback_data);
+extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+extern void logicalrep_read_stream_prepare(StringInfo in,
+ LogicalRepPreparedTxnData *prepare_data);
+
+extern void logicalrep_write_origin(StringInfo out, const char *origin,
+ XLogRecPtr origin_lsn);
+extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
+extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
+ Relation rel,
+ TupleTableSlot *newslot,
+ bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
+extern void logicalrep_write_update(StringInfo out, TransactionId xid,
+ Relation rel,
+ TupleTableSlot *oldslot,
+ TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_update(StringInfo in,
+ bool *has_oldtuple, LogicalRepTupleData *oldtup,
+ LogicalRepTupleData *newtup);
+extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation rel, TupleTableSlot *oldtuple,
+ bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
+ LogicalRepTupleData *oldtup);
+extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
+ int nrelids, Oid relids[],
+ bool cascade, bool restart_seqs);
+extern List *logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
+ Relation rel, Bitmapset *columns);
+extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
+extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
+ Oid typoid);
+extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
+extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
+ bool first_segment);
+extern TransactionId logicalrep_read_stream_start(StringInfo in,
+ bool *first_segment);
+extern void logicalrep_write_stream_stop(StringInfo out);
+extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+ LogicalRepCommitData *commit_data);
+extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+ TransactionId subxid);
+extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+ TransactionId *subxid);
+extern const char *logicalrep_message_type(LogicalRepMsgType action);
+
+#endif /* LOGICAL_PROTO_H */
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
new file mode 100644
index 0000000..78cd7e7
--- /dev/null
+++ b/src/include/replication/logicalrelation.h
@@ -0,0 +1,50 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalrelation.h
+ * Relation definitions for logical replication relation mapping.
+ *
+ * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicalrelation.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALRELATION_H
+#define LOGICALRELATION_H
+
+#include "access/attmap.h"
+#include "replication/logicalproto.h"
+
+typedef struct LogicalRepRelMapEntry
+{
+ LogicalRepRelation remoterel; /* key is remoterel.remoteid */
+
+ /*
+ * Validity flag -- when false, revalidate all derived info at next
+ * logicalrep_rel_open. (While the localrel is open, we assume our lock
+ * on that rel ensures the info remains good.)
+ */
+ bool localrelvalid;
+
+ /* Mapping to local relation. */
+ Oid localreloid; /* local relation id */
+ Relation localrel; /* relcache entry (NULL when closed) */
+ AttrMap *attrmap; /* map of local attributes to remote ones */
+ bool updatable; /* Can apply updates/deletes? */
+
+ /* Sync state. */
+ char state;
+ XLogRecPtr statelsn;
+} LogicalRepRelMapEntry;
+
+extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
+
+extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
+ LOCKMODE lockmode);
+extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+ Relation partrel, AttrMap *map);
+extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
+ LOCKMODE lockmode);
+
+#endif /* LOGICALRELATION_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
new file mode 100644
index 0000000..cd1b6e8
--- /dev/null
+++ b/src/include/replication/logicalworker.h
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalworker.h
+ * Exports for logical replication workers.
+ *
+ * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/logicalworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALWORKER_H
+#define LOGICALWORKER_H
+
+extern void ApplyWorkerMain(Datum main_arg);
+
+extern bool IsLogicalWorker(void);
+
+#endif /* LOGICALWORKER_H */
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 0000000..0b396c5
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ * Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+ Oid dbId; /* database Oid emitted from */
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size message_size; /* size of the message */
+ /* payload, including null-terminated prefix of length prefix_size */
+ char message[FLEXIBLE_ARRAY_MEMBER];
+} xl_logical_message;
+
+#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+ size_t size, bool transactional);
+
+/* RMGR API */
+#define XLOG_LOGICAL_MESSAGE 0x00
+extern void logicalmsg_redo(XLogReaderState *record);
+extern void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+extern const char *logicalmsg_identify(uint8 info);
+
+#endif /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
new file mode 100644
index 0000000..2d1b5e5
--- /dev/null
+++ b/src/include/replication/origin.h
@@ -0,0 +1,73 @@
+/*-------------------------------------------------------------------------
+ * origin.h
+ * Exports from replication/logical/origin.c
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/origin.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_ORIGIN_H
+#define PG_ORIGIN_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+#include "catalog/pg_replication_origin.h"
+
+typedef struct xl_replorigin_set
+{
+ XLogRecPtr remote_lsn;
+ RepOriginId node_id;
+ bool force;
+} xl_replorigin_set;
+
+typedef struct xl_replorigin_drop
+{
+ RepOriginId node_id;
+} xl_replorigin_drop;
+
+#define XLOG_REPLORIGIN_SET 0x00
+#define XLOG_REPLORIGIN_DROP 0x10
+
+#define InvalidRepOriginId 0
+#define DoNotReplicateId PG_UINT16_MAX
+
+extern PGDLLIMPORT RepOriginId replorigin_session_origin;
+extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
+extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+
+/* API for querying & manipulating replication origins */
+extern RepOriginId replorigin_by_name(const char *name, bool missing_ok);
+extern RepOriginId replorigin_create(const char *name);
+extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait);
+extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
+ char **roname);
+
+/* API for querying & manipulating replication progress tracking */
+extern void replorigin_advance(RepOriginId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit,
+ bool go_backward, bool wal_log);
+extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
+
+extern void replorigin_session_advance(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_reset(void);
+extern XLogRecPtr replorigin_session_get_progress(bool flush);
+
+/* Checkpoint/Startup integration */
+extern void CheckPointReplicationOrigin(void);
+extern void StartupReplicationOrigin(void);
+
+/* WAL logging */
+extern void replorigin_redo(XLogReaderState *record);
+extern void replorigin_desc(StringInfo buf, XLogReaderState *record);
+extern const char *replorigin_identify(uint8 info);
+
+/* shared memory allocation */
+extern Size ReplicationOriginShmemSize(void);
+extern void ReplicationOriginShmemInit(void);
+
+#endif /* PG_ORIGIN_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
new file mode 100644
index 0000000..539dc8e
--- /dev/null
+++ b/src/include/replication/output_plugin.h
@@ -0,0 +1,248 @@
+/*-------------------------------------------------------------------------
+ * output_plugin.h
+ * PostgreSQL Logical Decode Plugin Interface
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef OUTPUT_PLUGIN_H
+#define OUTPUT_PLUGIN_H
+
+#include "replication/reorderbuffer.h"
+
+struct LogicalDecodingContext;
+struct OutputPluginCallbacks;
+
+typedef enum OutputPluginOutputType
+{
+ OUTPUT_PLUGIN_BINARY_OUTPUT,
+ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
+} OutputPluginOutputType;
+
+/*
+ * Options set by the output plugin, in the startup callback.
+ */
+typedef struct OutputPluginOptions
+{
+ OutputPluginOutputType output_type;
+ bool receive_rewrites;
+} OutputPluginOptions;
+
+/*
+ * Type of the shared library symbol _PG_output_plugin_init that is looked up
+ * when loading an output plugin shared library.
+ */
+typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
+
+/*
+ * Callback that gets called in a user-defined plugin. ctx->private_data can
+ * be set to some private data.
+ *
+ * "is_init" will be set to "true" if the decoding slot just got defined. When
+ * the same slot is used from there one, it will be "false".
+ */
+typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
+ OutputPluginOptions *options,
+ bool is_init);
+
+/*
+ * Callback called for every (explicit or implicit) BEGIN of a successful
+ * transaction.
+ */
+typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Callback for every individual change in a successful transaction.
+ */
+typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for every TRUNCATE in a successful transaction.
+ */
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
+ * Called for every (explicit or implicit) COMMIT of a successful transaction.
+ */
+typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
+
+/*
+ * Called to shutdown an output plugin.
+ */
+typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
+
+/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
+ const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+
+/*
+ * Called when starting to stream a block of changes from in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping to stream a block of changes from in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called to discard changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called to apply changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
+ * Output plugin callbacks
+ */
+typedef struct OutputPluginCallbacks
+{
+ LogicalDecodeStartupCB startup_cb;
+ LogicalDecodeBeginCB begin_cb;
+ LogicalDecodeChangeCB change_cb;
+ LogicalDecodeTruncateCB truncate_cb;
+ LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeShutdownCB shutdown_cb;
+
+ /* streaming of changes at prepare time */
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
+ /* streaming of changes */
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
+} OutputPluginCallbacks;
+
+/* Functions in replication/logical/logical.c */
+extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
+
+#endif /* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
new file mode 100644
index 0000000..eafedd6
--- /dev/null
+++ b/src/include/replication/pgoutput.h
@@ -0,0 +1,34 @@
+/*-------------------------------------------------------------------------
+ *
+ * pgoutput.h
+ * Logical Replication output plugin
+ *
+ * Copyright (c) 2015-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/replication/pgoutput.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PGOUTPUT_H
+#define PGOUTPUT_H
+
+#include "nodes/pg_list.h"
+
+typedef struct PGOutputData
+{
+ MemoryContext context; /* private memory context for transient
+ * allocations */
+ MemoryContext cachectx; /* private memory context for cache data */
+
+ /* client-supplied info: */
+ uint32 protocol_version;
+ List *publication_names;
+ List *publications;
+ bool binary;
+ bool streaming;
+ bool messages;
+ bool two_phase;
+} PGOutputData;
+
+#endif /* PGOUTPUT_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
new file mode 100644
index 0000000..4a01f87
--- /dev/null
+++ b/src/include/replication/reorderbuffer.h
@@ -0,0 +1,685 @@
+/*
+ * reorderbuffer.h
+ * PostgreSQL logical replay/reorder buffer management.
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/reorderbuffer.h
+ */
+#ifndef REORDERBUFFER_H
+#define REORDERBUFFER_H
+
+#include "access/htup_details.h"
+#include "lib/ilist.h"
+#include "storage/sinval.h"
+#include "utils/hsearch.h"
+#include "utils/relcache.h"
+#include "utils/snapshot.h"
+#include "utils/timestamp.h"
+
+extern PGDLLIMPORT int logical_decoding_work_mem;
+
+/* an individual tuple, stored in one chunk of memory */
+typedef struct ReorderBufferTupleBuf
+{
+ /* position in preallocated list */
+ slist_node node;
+
+ /* tuple header, the interesting bit for users of logical decoding */
+ HeapTupleData tuple;
+
+ /* pre-allocated size of tuple buffer, different from tuple size */
+ Size alloc_tuple_size;
+
+ /* actual tuple data follows */
+} ReorderBufferTupleBuf;
+
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferTupleBufData(p) \
+ ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
+
+/*
+ * Types of the change passed to a 'change' callback.
+ *
+ * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
+ * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
+ * changes. Users of the decoding facilities will never see changes with
+ * *_INTERNAL_* actions.
+ *
+ * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
+ * changes concern "speculative insertions", their confirmation, and abort
+ * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
+ * logical decoding don't have to care about these.
+ */
+typedef enum ReorderBufferChangeType
+{
+ REORDER_BUFFER_CHANGE_INSERT,
+ REORDER_BUFFER_CHANGE_UPDATE,
+ REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_MESSAGE,
+ REORDER_BUFFER_CHANGE_INVALIDATION,
+ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
+ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
+ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
+ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
+ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
+ REORDER_BUFFER_CHANGE_TRUNCATE
+} ReorderBufferChangeType;
+
+/* forward declaration */
+struct ReorderBufferTXN;
+
+/*
+ * a single 'change', can be an insert (with one tuple), an update (old, new),
+ * or a delete (old).
+ *
+ * The same struct is also used internally for other purposes but that should
+ * never be visible outside reorderbuffer.c.
+ */
+typedef struct ReorderBufferChange
+{
+ XLogRecPtr lsn;
+
+ /* The type of change. */
+ ReorderBufferChangeType action;
+
+ /* Transaction this change belongs to. */
+ struct ReorderBufferTXN *txn;
+
+ RepOriginId origin_id;
+
+ /*
+ * Context data for the change. Which part of the union is valid depends
+ * on action.
+ */
+ union
+ {
+ /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
+ struct
+ {
+ /* relation that has been changed */
+ RelFileNode relnode;
+
+ /* no previously reassembled toast chunks are necessary anymore */
+ bool clear_toast_afterwards;
+
+ /* valid for DELETE || UPDATE */
+ ReorderBufferTupleBuf *oldtuple;
+ /* valid for INSERT || UPDATE */
+ ReorderBufferTupleBuf *newtuple;
+ } tp;
+
+ /*
+ * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
+ * set of relations to be truncated.
+ */
+ struct
+ {
+ Size nrelids;
+ bool cascade;
+ bool restart_seqs;
+ Oid *relids;
+ } truncate;
+
+ /* Message with arbitrary data. */
+ struct
+ {
+ char *prefix;
+ Size message_size;
+ char *message;
+ } msg;
+
+ /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
+ Snapshot snapshot;
+
+ /*
+ * New command id for existing snapshot in a catalog changing tx. Set
+ * when action == *_INTERNAL_COMMAND_ID.
+ */
+ CommandId command_id;
+
+ /*
+ * New cid mapping for catalog changing transaction, set when action
+ * == *_INTERNAL_TUPLECID.
+ */
+ struct
+ {
+ RelFileNode node;
+ ItemPointerData tid;
+ CommandId cmin;
+ CommandId cmax;
+ CommandId combocid;
+ } tuplecid;
+
+ /* Invalidation. */
+ struct
+ {
+ uint32 ninvalidations; /* Number of messages */
+ SharedInvalidationMessage *invalidations; /* invalidation message */
+ } inval;
+ } data;
+
+ /*
+ * While in use this is how a change is linked into a transactions,
+ * otherwise it's the preallocated list.
+ */
+ dlist_node node;
+} ReorderBufferChange;
+
+/* ReorderBufferTXN txn_flags */
+#define RBTXN_HAS_CATALOG_CHANGES 0x0001
+#define RBTXN_IS_SUBXACT 0x0002
+#define RBTXN_IS_SERIALIZED 0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
+#define RBTXN_IS_STREAMED 0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
+#define RBTXN_PREPARE 0x0040
+#define RBTXN_SKIPPED_PREPARE 0x0080
+
+/* Does the transaction have catalog changes? */
+#define rbtxn_has_catalog_changes(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
+)
+
+/* Is the transaction known as a subxact? */
+#define rbtxn_is_known_subxact(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
+)
+
+/* Has this transaction been spilled to disk? */
+#define rbtxn_is_serialized(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
+)
+
+/* Has this transaction ever been spilled to disk? */
+#define rbtxn_is_serialized_clear(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
+)
+
+/* Has this transaction contains partial changes? */
+#define rbtxn_has_partial_change(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
+)
+
+/*
+ * Has this transaction been streamed to downstream?
+ *
+ * (It's not possible to deduce this from nentries and nentries_mem for
+ * various reasons. For example, all changes may be in subtransactions in
+ * which case we'd have nentries==0 for the toplevel one, which would say
+ * nothing about the streaming. So we maintain this flag, but only for the
+ * toplevel transaction.)
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)
+
+/* Has this transaction been prepared? */
+#define rbtxn_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+)
+
+/* prepare for this transaction skipped? */
+#define rbtxn_skip_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
+)
+
+typedef struct ReorderBufferTXN
+{
+ /* See above */
+ bits32 txn_flags;
+
+ /* The transaction's transaction id, can be a toplevel or sub xid. */
+ TransactionId xid;
+
+ /* Xid of top-level transaction, if known */
+ TransactionId toplevel_xid;
+
+ /*
+ * Global transaction id required for identification of prepared
+ * transactions.
+ */
+ char *gid;
+
+ /*
+ * LSN of the first data carrying, WAL record with knowledge about this
+ * xid. This is allowed to *not* be first record adorned with this xid, if
+ * the previous records aren't relevant for logical decoding.
+ */
+ XLogRecPtr first_lsn;
+
+ /* ----
+ * LSN of the record that lead to this xact to be prepared or committed or
+ * aborted. This can be a
+ * * plain commit record
+ * * plain commit record, of a parent transaction
+ * * prepared tansaction
+ * * prepared transaction commit
+ * * plain abort record
+ * * prepared transaction abort
+ *
+ * This can also become set to earlier values than transaction end when
+ * a transaction is spilled to disk; specifically it's set to the LSN of
+ * the latest change written to disk so far.
+ * ----
+ */
+ XLogRecPtr final_lsn;
+
+ /*
+ * LSN pointing to the end of the commit record + 1.
+ */
+ XLogRecPtr end_lsn;
+
+ /* Toplevel transaction for this subxact (NULL for top-level). */
+ struct ReorderBufferTXN *toptxn;
+
+ /*
+ * LSN of the last lsn at which snapshot information reside, so we can
+ * restart decoding from there and fully recover this transaction from
+ * WAL.
+ */
+ XLogRecPtr restart_decoding_lsn;
+
+ /* origin of the change that caused this transaction */
+ RepOriginId origin_id;
+ XLogRecPtr origin_lsn;
+
+ /*
+ * Commit or Prepare time, only known when we read the actual commit or
+ * prepare record.
+ */
+ union
+ {
+ TimestampTz commit_time;
+ TimestampTz prepare_time;
+ } xact_time;
+
+ /*
+ * The base snapshot is used to decode all changes until either this
+ * transaction modifies the catalog, or another catalog-modifying
+ * transaction commits.
+ */
+ Snapshot base_snapshot;
+ XLogRecPtr base_snapshot_lsn;
+ dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
+
+ /*
+ * Snapshot/CID from the previous streaming run. Only valid for already
+ * streamed transactions (NULL/InvalidCommandId otherwise).
+ */
+ Snapshot snapshot_now;
+ CommandId command_id;
+
+ /*
+ * How many ReorderBufferChange's do we have in this txn.
+ *
+ * Changes in subtransactions are *not* included but tracked separately.
+ */
+ uint64 nentries;
+
+ /*
+ * How many of the above entries are stored in memory in contrast to being
+ * spilled to disk.
+ */
+ uint64 nentries_mem;
+
+ /*
+ * List of ReorderBufferChange structs, including new Snapshots, new
+ * CommandIds and command invalidation messages.
+ */
+ dlist_head changes;
+
+ /*
+ * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
+ * Those are always assigned to the toplevel transaction. (Keep track of
+ * #entries to create a hash of the right size)
+ */
+ dlist_head tuplecids;
+ uint64 ntuplecids;
+
+ /*
+ * On-demand built hash for looking up the above values.
+ */
+ HTAB *tuplecid_hash;
+
+ /*
+ * Hash containing (potentially partial) toast entries. NULL if no toast
+ * tuples have been found for the current change.
+ */
+ HTAB *toast_hash;
+
+ /*
+ * non-hierarchical list of subtransactions that are *not* aborted. Only
+ * used in toplevel transactions.
+ */
+ dlist_head subtxns;
+ uint32 nsubtxns;
+
+ /*
+ * Stored cache invalidations. This is not a linked list because we get
+ * all the invalidations at once.
+ */
+ uint32 ninvalidations;
+ SharedInvalidationMessage *invalidations;
+
+ /* ---
+ * Position in one of three lists:
+ * * list of subtransactions if we are *known* to be subxact
+ * * list of toplevel xacts (can be an as-yet unknown subxact)
+ * * list of preallocated ReorderBufferTXNs (if unused)
+ * ---
+ */
+ dlist_node node;
+
+ /*
+ * Size of this transaction (changes currently in memory, in bytes).
+ */
+ Size size;
+
+ /* Size of top-transaction including sub-transactions. */
+ Size total_size;
+
+ /* If we have detected concurrent abort then ignore future changes. */
+ bool concurrent_abort;
+
+ /*
+ * Private data pointer of the output plugin.
+ */
+ void *output_plugin_private;
+} ReorderBufferTXN;
+
+/* so we can define the callbacks used inside struct ReorderBuffer itself */
+typedef struct ReorderBuffer ReorderBuffer;
+
+/* change callback signature */
+typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* truncate callback signature */
+typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/* begin callback signature */
+typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* commit callback signature */
+typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
+/* begin prepare callback signature */
+typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* rollback prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+/* start streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStartCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr first_lsn);
+
+/* stop streaming transaction callback signature */
+typedef void (*ReorderBufferStreamStopCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr last_lsn);
+
+/* discard streamed transaction callback signature */
+typedef void (*ReorderBufferStreamAbortCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/* commit streamed transaction callback signature */
+typedef void (*ReorderBufferStreamCommitCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/* stream change callback signature */
+typedef void (*ReorderBufferStreamChangeCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/* stream message callback signature */
+typedef void (*ReorderBufferStreamMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
+/* stream truncate callback signature */
+typedef void (*ReorderBufferStreamTruncateCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+struct ReorderBuffer
+{
+ /*
+ * xid => ReorderBufferTXN lookup table
+ */
+ HTAB *by_txn;
+
+ /*
+ * Transactions that could be a toplevel xact, ordered by LSN of the first
+ * record bearing that xid.
+ */
+ dlist_head toplevel_by_lsn;
+
+ /*
+ * Transactions and subtransactions that have a base snapshot, ordered by
+ * LSN of the record which caused us to first obtain the base snapshot.
+ * This is not the same as toplevel_by_lsn, because we only set the base
+ * snapshot on the first logical-decoding-relevant record (eg. heap
+ * writes), whereas the initial LSN could be set by other operations.
+ */
+ dlist_head txns_by_base_snapshot_lsn;
+
+ /*
+ * one-entry sized cache for by_txn. Very frequently the same txn gets
+ * looked up over and over again.
+ */
+ TransactionId by_txn_last_xid;
+ ReorderBufferTXN *by_txn_last_txn;
+
+ /*
+ * Callbacks to be called when a transactions commits.
+ */
+ ReorderBufferBeginCB begin;
+ ReorderBufferApplyChangeCB apply_change;
+ ReorderBufferApplyTruncateCB apply_truncate;
+ ReorderBufferCommitCB commit;
+ ReorderBufferMessageCB message;
+
+ /*
+ * Callbacks to be called when streaming a transaction at prepare time.
+ */
+ ReorderBufferBeginCB begin_prepare;
+ ReorderBufferPrepareCB prepare;
+ ReorderBufferCommitPreparedCB commit_prepared;
+ ReorderBufferRollbackPreparedCB rollback_prepared;
+
+ /*
+ * Callbacks to be called when streaming a transaction.
+ */
+ ReorderBufferStreamStartCB stream_start;
+ ReorderBufferStreamStopCB stream_stop;
+ ReorderBufferStreamAbortCB stream_abort;
+ ReorderBufferStreamPrepareCB stream_prepare;
+ ReorderBufferStreamCommitCB stream_commit;
+ ReorderBufferStreamChangeCB stream_change;
+ ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamTruncateCB stream_truncate;
+
+ /*
+ * Pointer that will be passed untouched to the callbacks.
+ */
+ void *private_data;
+
+ /*
+ * Saved output plugin option
+ */
+ bool output_rewrites;
+
+ /*
+ * Private memory context.
+ */
+ MemoryContext context;
+
+ /*
+ * Memory contexts for specific types objects
+ */
+ MemoryContext change_context;
+ MemoryContext txn_context;
+ MemoryContext tup_context;
+
+ XLogRecPtr current_restart_decoding_lsn;
+
+ /* buffer for disk<->memory conversions */
+ char *outbuf;
+ Size outbufsize;
+
+ /* memory accounting */
+ Size size;
+
+ /*
+ * Statistics about transactions spilled to disk.
+ *
+ * A single transaction may be spilled repeatedly, which is why we keep
+ * two different counters. For spilling, the transaction counter includes
+ * both toplevel transactions and subtransactions.
+ */
+ int64 spillTxns; /* number of transactions spilled to disk */
+ int64 spillCount; /* spill-to-disk invocation counter */
+ int64 spillBytes; /* amount of data spilled to disk */
+
+ /* Statistics about transactions streamed to the decoding output plugin */
+ int64 streamTxns; /* number of transactions streamed */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamBytes; /* amount of data decoded */
+
+ /*
+ * Statistics about all the transactions sent to the decoding output
+ * plugin
+ */
+ int64 totalTxns; /* total number of transactions sent */
+ int64 totalBytes; /* total amount of data decoded */
+};
+
+
+extern ReorderBuffer *ReorderBufferAllocate(void);
+extern void ReorderBufferFree(ReorderBuffer *);
+
+extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
+extern void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
+extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
+extern void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
+
+extern Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
+extern void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
+
+extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
+ XLogRecPtr lsn, ReorderBufferChange *,
+ bool toast_insert);
+extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message);
+extern void ReorderBufferCommit(ReorderBuffer *, TransactionId,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr two_phase_at,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid, bool is_commit);
+extern void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
+extern void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
+extern void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+extern void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
+extern void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+extern void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+
+extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
+extern void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
+extern void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ CommandId cid);
+extern void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ RelFileNode node, ItemPointerData pt,
+ CommandId cmin, CommandId cmax, CommandId combocid);
+extern void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+ Size nmsgs, SharedInvalidationMessage *msgs);
+extern void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations);
+extern void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+
+extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
+extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
+
+extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn);
+extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
+extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
+
+extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
+
+extern void StartupReorderBuffer(void);
+
+#endif
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
new file mode 100644
index 0000000..deba2c4
--- /dev/null
+++ b/src/include/replication/slot.h
@@ -0,0 +1,230 @@
+/*-------------------------------------------------------------------------
+ * slot.h
+ * Replication slot management.
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_H
+#define SLOT_H
+
+#include "access/xlog.h"
+#include "access/xlogreader.h"
+#include "storage/condition_variable.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "replication/walreceiver.h"
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts. Slots marked TEMPORARY will be dropped at the end of a session
+ * or on error.
+ *
+ * EPHEMERAL is used as a not-quite-ready state when creating persistent
+ * slots. EPHEMERAL slots can be made PERSISTENT by calling
+ * ReplicationSlotPersist(). For a slot that goes away at the end of a
+ * session, TEMPORARY is the appropriate choice.
+ */
+typedef enum ReplicationSlotPersistency
+{
+ RS_PERSISTENT,
+ RS_EPHEMERAL,
+ RS_TEMPORARY
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+ /* The slot's identifier */
+ NameData name;
+
+ /* database the slot is active on */
+ Oid database;
+
+ /*
+ * The slot's behaviour when being dropped (or restored after a crash).
+ */
+ ReplicationSlotPersistency persistency;
+
+ /*
+ * xmin horizon for data
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId xmin;
+
+ /*
+ * xmin horizon for catalog tuples
+ *
+ * NB: This may represent a value that hasn't been written to disk yet;
+ * see notes for effective_xmin, below.
+ */
+ TransactionId catalog_xmin;
+
+ /* oldest LSN that might be required by this replication slot */
+ XLogRecPtr restart_lsn;
+
+ /* restart_lsn is copied here when the slot is invalidated */
+ XLogRecPtr invalidated_at;
+
+ /*
+ * Oldest LSN that the client has acked receipt for. This is used as the
+ * start_lsn point in case the client doesn't specify one, and also as a
+ * safety measure to jump forwards in case the client specifies a
+ * start_lsn that's further in the past than this value.
+ */
+ XLogRecPtr confirmed_flush;
+
+ /*
+ * LSN at which we enabled two_phase commit for this slot or LSN at which
+ * we found a consistent point at the time of slot creation.
+ */
+ XLogRecPtr two_phase_at;
+
+ /*
+ * Allow decoding of prepared transactions?
+ */
+ bool two_phase;
+
+ /* plugin name */
+ NameData plugin;
+} ReplicationSlotPersistentData;
+
+/*
+ * Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use flag is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot. The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
+ */
+typedef struct ReplicationSlot
+{
+ /* lock, on same cacheline as effective_xmin */
+ slock_t mutex;
+
+ /* is this slot defined */
+ bool in_use;
+
+ /* Who is streaming out changes for this slot? 0 in unused slots. */
+ pid_t active_pid;
+
+ /* any outstanding modifications? */
+ bool just_dirtied;
+ bool dirty;
+
+ /*
+ * For logical decoding, it's extremely important that we never remove any
+ * data that's still needed for decoding purposes, even after a crash;
+ * otherwise, decoding will produce wrong answers. Ordinary streaming
+ * replication also needs to prevent old row versions from being removed
+ * too soon, but the worst consequence we might encounter there is
+ * unwanted query cancellations on the standby. Thus, for logical
+ * decoding, this value represents the latest xmin that has actually been
+ * written to disk, whereas for streaming replication, it's just the same
+ * as the persistent value (data.xmin).
+ */
+ TransactionId effective_xmin;
+ TransactionId effective_catalog_xmin;
+
+ /* data surviving shutdowns and crashes */
+ ReplicationSlotPersistentData data;
+
+ /* is somebody performing io on this slot? */
+ LWLock io_in_progress_lock;
+
+ /* Condition variable signaled when active_pid changes */
+ ConditionVariable active_cv;
+
+ /* all the remaining data is only used for logical slots */
+
+ /*
+ * When the client has confirmed flushes >= candidate_xmin_lsn we can
+ * advance the catalog xmin. When restart_valid has been passed,
+ * restart_lsn can be increased.
+ */
+ TransactionId candidate_catalog_xmin;
+ XLogRecPtr candidate_xmin_lsn;
+ XLogRecPtr candidate_restart_valid;
+ XLogRecPtr candidate_restart_lsn;
+} ReplicationSlot;
+
+#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
+#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
+
+/*
+ * Shared memory control area for all of replication slots.
+ */
+typedef struct ReplicationSlotCtlData
+{
+ /*
+ * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
+ * reason you can't do that in an otherwise-empty struct.
+ */
+ ReplicationSlot replication_slots[1];
+} ReplicationSlotCtlData;
+
+/*
+ * Pointers to shared memory
+ */
+extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
+extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
+
+/* GUCs */
+extern PGDLLIMPORT int max_replication_slots;
+
+/* shmem initialization functions */
+extern Size ReplicationSlotsShmemSize(void);
+extern void ReplicationSlotsShmemInit(void);
+
+/* management of individual slots */
+extern void ReplicationSlotCreate(const char *name, bool db_specific,
+ ReplicationSlotPersistency p, bool two_phase);
+extern void ReplicationSlotPersist(void);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
+
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotRelease(void);
+extern void ReplicationSlotCleanup(void);
+extern void ReplicationSlotSave(void);
+extern void ReplicationSlotMarkDirty(void);
+
+/* misc stuff */
+extern void ReplicationSlotInitialize(void);
+extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern void ReplicationSlotReserveWal(void);
+extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
+extern void ReplicationSlotsComputeRequiredLSN(void);
+extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
+extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
+extern int ReplicationSlotIndex(ReplicationSlot *slot);
+extern bool ReplicationSlotName(int index, Name name);
+extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+
+extern void StartupReplicationSlots(void);
+extern void CheckPointReplicationSlots(void);
+
+extern void CheckSlotRequirements(void);
+extern void CheckSlotPermissions(void);
+
+#endif /* SLOT_H */
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
new file mode 100644
index 0000000..53d83f3
--- /dev/null
+++ b/src/include/replication/snapbuild.h
@@ -0,0 +1,97 @@
+/*-------------------------------------------------------------------------
+ *
+ * snapbuild.h
+ * Exports from replication/logical/snapbuild.c.
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/snapbuild.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SNAPBUILD_H
+#define SNAPBUILD_H
+
+#include "access/xlogdefs.h"
+#include "utils/snapmgr.h"
+
+typedef enum
+{
+ /*
+ * Initial state, we can't do much yet.
+ */
+ SNAPBUILD_START = -1,
+
+ /*
+ * Collecting committed transactions, to build the initial catalog
+ * snapshot.
+ */
+ SNAPBUILD_BUILDING_SNAPSHOT = 0,
+
+ /*
+ * We have collected enough information to decode tuples in transactions
+ * that started after this.
+ *
+ * Once we reached this we start to collect changes. We cannot apply them
+ * yet, because they might be based on transactions that were still
+ * running when FULL_SNAPSHOT was reached.
+ */
+ SNAPBUILD_FULL_SNAPSHOT = 1,
+
+ /*
+ * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that
+ * were running at that point finished. Till we reach that we hold off
+ * calling any commit callbacks.
+ */
+ SNAPBUILD_CONSISTENT = 2
+} SnapBuildState;
+
+/* forward declare so we don't have to expose the struct to the public */
+struct SnapBuild;
+typedef struct SnapBuild SnapBuild;
+
+/* forward declare so we don't have to include reorderbuffer.h */
+struct ReorderBuffer;
+
+/* forward declare so we don't have to include heapam_xlog.h */
+struct xl_heap_new_cid;
+struct xl_running_xacts;
+
+extern void CheckPointSnapBuild(void);
+
+extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
+ TransactionId xmin_horizon, XLogRecPtr start_lsn,
+ bool need_full_snapshot,
+ XLogRecPtr two_phase_at);
+extern void FreeSnapshotBuilder(SnapBuild *cache);
+
+extern void SnapBuildSnapDecRefcount(Snapshot snap);
+
+extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
+extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
+extern void SnapBuildClearExportedSnapshot(void);
+extern void SnapBuildResetExportedSnapshotState(void);
+
+extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+ TransactionId xid);
+
+extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder);
+extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
+
+extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
+ TransactionId xid, int nsubxacts,
+ TransactionId *subxacts);
+extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
+ XLogRecPtr lsn);
+extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
+ XLogRecPtr lsn, struct xl_heap_new_cid *cid);
+extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
+ struct xl_running_xacts *running);
+extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+
+extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid,
+ int subxcnt, TransactionId *subxacts,
+ XLogRecPtr lsn);
+#endif /* SNAPBUILD_H */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
new file mode 100644
index 0000000..4d7c90b
--- /dev/null
+++ b/src/include/replication/syncrep.h
@@ -0,0 +1,115 @@
+/*-------------------------------------------------------------------------
+ *
+ * syncrep.h
+ * Exports from replication/syncrep.c.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/include/replication/syncrep.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _SYNCREP_H
+#define _SYNCREP_H
+
+#include "access/xlogdefs.h"
+#include "utils/guc.h"
+
+#define SyncRepRequested() \
+ (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+
+/* SyncRepWaitMode */
+#define SYNC_REP_NO_WAIT (-1)
+#define SYNC_REP_WAIT_WRITE 0
+#define SYNC_REP_WAIT_FLUSH 1
+#define SYNC_REP_WAIT_APPLY 2
+
+#define NUM_SYNC_REP_WAIT_MODE 3
+
+/* syncRepState */
+#define SYNC_REP_NOT_WAITING 0
+#define SYNC_REP_WAITING 1
+#define SYNC_REP_WAIT_COMPLETE 2
+
+/* syncrep_method of SyncRepConfigData */
+#define SYNC_REP_PRIORITY 0
+#define SYNC_REP_QUORUM 1
+
+/*
+ * SyncRepGetCandidateStandbys returns an array of these structs,
+ * one per candidate synchronous walsender.
+ */
+typedef struct SyncRepStandbyData
+{
+ /* Copies of relevant fields from WalSnd shared-memory struct */
+ pid_t pid;
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+ int sync_standby_priority;
+ /* Index of this walsender in the WalSnd shared-memory array */
+ int walsnd_index;
+ /* This flag indicates whether this struct is about our own process */
+ bool is_me;
+} SyncRepStandbyData;
+
+/*
+ * Struct for the configuration of synchronous replication.
+ *
+ * Note: this must be a flat representation that can be held in a single
+ * chunk of malloc'd memory, so that it can be stored as the "extra" data
+ * for the synchronous_standby_names GUC.
+ */
+typedef struct SyncRepConfigData
+{
+ int config_size; /* total size of this struct, in bytes */
+ int num_sync; /* number of sync standbys that we need to
+ * wait for */
+ uint8 syncrep_method; /* method to choose sync standbys */
+ int nmembers; /* number of members in the following list */
+ /* member_names contains nmembers consecutive nul-terminated C strings */
+ char member_names[FLEXIBLE_ARRAY_MEMBER];
+} SyncRepConfigData;
+
+extern PGDLLIMPORT SyncRepConfigData *SyncRepConfig;
+
+/* communication variables for parsing synchronous_standby_names GUC */
+extern PGDLLIMPORT SyncRepConfigData *syncrep_parse_result;
+extern PGDLLIMPORT char *syncrep_parse_error_msg;
+
+/* user-settable parameters for synchronous replication */
+extern PGDLLIMPORT char *SyncRepStandbyNames;
+
+/* called by user backend */
+extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
+
+/* called at backend exit */
+extern void SyncRepCleanupAtProcExit(void);
+
+/* called by wal sender */
+extern void SyncRepInitConfig(void);
+extern void SyncRepReleaseWaiters(void);
+
+/* called by wal sender and user backend */
+extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
+
+/* called by checkpointer */
+extern void SyncRepUpdateSyncStandbysDefined(void);
+
+/* GUC infrastructure */
+extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
+extern void assign_synchronous_standby_names(const char *newval, void *extra);
+extern void assign_synchronous_commit(int newval, void *extra);
+
+/*
+ * Internal functions for parsing synchronous_standby_names grammar,
+ * in syncrep_gram.y and syncrep_scanner.l
+ */
+extern int syncrep_yyparse(void);
+extern int syncrep_yylex(void);
+extern void syncrep_yyerror(const char *str);
+extern void syncrep_scanner_init(const char *query_string);
+extern void syncrep_scanner_finish(void);
+
+#endif /* _SYNCREP_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
new file mode 100644
index 0000000..81184aa
--- /dev/null
+++ b/src/include/replication/walreceiver.h
@@ -0,0 +1,472 @@
+/*-------------------------------------------------------------------------
+ *
+ * walreceiver.h
+ * Exports from replication/walreceiverfuncs.c.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/walreceiver.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALRECEIVER_H
+#define _WALRECEIVER_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "getaddrinfo.h" /* for NI_MAXHOST */
+#include "pgtime.h"
+#include "port/atomics.h"
+#include "replication/logicalproto.h"
+#include "replication/walsender.h"
+#include "storage/condition_variable.h"
+#include "storage/latch.h"
+#include "storage/spin.h"
+#include "utils/tuplestore.h"
+
+/* user-settable parameters */
+extern PGDLLIMPORT int wal_receiver_status_interval;
+extern PGDLLIMPORT int wal_receiver_timeout;
+extern PGDLLIMPORT bool hot_standby_feedback;
+
+/*
+ * MAXCONNINFO: maximum size of a connection string.
+ *
+ * XXX: Should this move to pg_config_manual.h?
+ */
+#define MAXCONNINFO 1024
+
+/* Can we allow the standby to accept replication connection from another standby? */
+#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
+
+/*
+ * Values for WalRcv->walRcvState.
+ */
+typedef enum
+{
+ WALRCV_STOPPED, /* stopped and mustn't start up again */
+ WALRCV_STARTING, /* launched, but the process hasn't
+ * initialized yet */
+ WALRCV_STREAMING, /* walreceiver is streaming */
+ WALRCV_WAITING, /* stopped streaming, waiting for orders */
+ WALRCV_RESTARTING, /* asked to restart streaming */
+ WALRCV_STOPPING /* requested to stop, but still running */
+} WalRcvState;
+
+/* Shared memory area for management of walreceiver process */
+typedef struct
+{
+ /*
+ * PID of currently active walreceiver process, its current state and
+ * start time (actually, the time at which it was requested to be
+ * started).
+ */
+ pid_t pid;
+ WalRcvState walRcvState;
+ ConditionVariable walRcvStoppedCV;
+ pg_time_t startTime;
+
+ /*
+ * receiveStart and receiveStartTLI indicate the first byte position and
+ * timeline that will be received. When startup process starts the
+ * walreceiver, it sets these to the point where it wants the streaming to
+ * begin.
+ */
+ XLogRecPtr receiveStart;
+ TimeLineID receiveStartTLI;
+
+ /*
+ * flushedUpto-1 is the last byte position that has already been received,
+ * and receivedTLI is the timeline it came from. At the first startup of
+ * walreceiver, these are set to receiveStart and receiveStartTLI. After
+ * that, walreceiver updates these whenever it flushes the received WAL to
+ * disk.
+ */
+ XLogRecPtr flushedUpto;
+ TimeLineID receivedTLI;
+
+ /*
+ * latestChunkStart is the starting byte position of the current "batch"
+ * of received WAL. It's actually the same as the previous value of
+ * flushedUpto before the last flush to disk. Startup process can use
+ * this to detect whether it's keeping up or not.
+ */
+ XLogRecPtr latestChunkStart;
+
+ /*
+ * Time of send and receive of any message received.
+ */
+ TimestampTz lastMsgSendTime;
+ TimestampTz lastMsgReceiptTime;
+
+ /*
+ * Latest reported end of WAL on the sender
+ */
+ XLogRecPtr latestWalEnd;
+ TimestampTz latestWalEndTime;
+
+ /*
+ * connection string; initially set to connect to the primary, and later
+ * clobbered to hide security-sensitive fields.
+ */
+ char conninfo[MAXCONNINFO];
+
+ /*
+ * Host name (this can be a host name, an IP address, or a directory path)
+ * and port number of the active replication connection.
+ */
+ char sender_host[NI_MAXHOST];
+ int sender_port;
+
+ /*
+ * replication slot name; is also used for walreceiver to connect with the
+ * primary
+ */
+ char slotname[NAMEDATALEN];
+
+ /*
+ * If it's a temporary replication slot, it needs to be recreated when
+ * connecting.
+ */
+ bool is_temp_slot;
+
+ /* set true once conninfo is ready to display (obfuscated pwds etc) */
+ bool ready_to_display;
+
+ /*
+ * Latch used by startup process to wake up walreceiver after telling it
+ * where to start streaming (after setting receiveStart and
+ * receiveStartTLI), and also to tell it to send apply feedback to the
+ * primary whenever specially marked commit records are applied. This is
+ * normally mapped to procLatch when walreceiver is running.
+ */
+ Latch *latch;
+
+ slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * Like flushedUpto, but advanced after writing and before flushing,
+ * without the need to acquire the spin lock. Data can be read by another
+ * process up to this point, but shouldn't be used for data integrity
+ * purposes.
+ */
+ pg_atomic_uint64 writtenUpto;
+
+ /*
+ * force walreceiver reply? This doesn't need to be locked; memory
+ * barriers for ordering are sufficient. But we do need atomic fetch and
+ * store semantics, so use sig_atomic_t.
+ */
+ sig_atomic_t force_reply; /* used as a bool */
+} WalRcvData;
+
+extern PGDLLIMPORT WalRcvData *WalRcv;
+
+typedef struct
+{
+ bool logical; /* True if this is logical replication stream,
+ * false if physical stream. */
+ char *slotname; /* Name of the replication slot or NULL. */
+ XLogRecPtr startpoint; /* LSN of starting point. */
+
+ union
+ {
+ struct
+ {
+ TimeLineID startpointTLI; /* Starting timeline */
+ } physical;
+ struct
+ {
+ uint32 proto_version; /* Logical protocol version */
+ List *publication_names; /* String list of publications */
+ bool binary; /* Ask publisher to use binary */
+ bool streaming; /* Streaming of large transactions */
+ bool twophase; /* Streaming of two-phase transactions at
+ * prepare time */
+ } logical;
+ } proto;
+} WalRcvStreamOptions;
+
+struct WalReceiverConn;
+typedef struct WalReceiverConn WalReceiverConn;
+
+/*
+ * Status of walreceiver query execution.
+ *
+ * We only define statuses that are currently used.
+ */
+typedef enum
+{
+ WALRCV_ERROR, /* There was error when executing the query. */
+ WALRCV_OK_COMMAND, /* Query executed utility or replication
+ * command. */
+ WALRCV_OK_TUPLES, /* Query returned tuples. */
+ WALRCV_OK_COPY_IN, /* Query started COPY FROM. */
+ WALRCV_OK_COPY_OUT, /* Query started COPY TO. */
+ WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication
+ * protocol. */
+} WalRcvExecStatus;
+
+/*
+ * Return value for walrcv_exec, returns the status of the execution and
+ * tuples if any.
+ */
+typedef struct WalRcvExecResult
+{
+ WalRcvExecStatus status;
+ int sqlstate;
+ char *err;
+ Tuplestorestate *tuplestore;
+ TupleDesc tupledesc;
+} WalRcvExecResult;
+
+/* WAL receiver - libpqwalreceiver hooks */
+
+/*
+ * walrcv_connect_fn
+ *
+ * Establish connection to a cluster. 'logical' is true if the
+ * connection is logical, and false if the connection is physical.
+ * 'appname' is a name associated to the connection, to use for example
+ * with fallback_application_name or application_name. Returns the
+ * details about the connection established, as defined by
+ * WalReceiverConn for each WAL receiver module. On error, NULL is
+ * returned with 'err' including the error generated.
+ */
+typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
+ bool logical,
+ const char *appname,
+ char **err);
+
+/*
+ * walrcv_check_conninfo_fn
+ *
+ * Parse and validate the connection string given as of 'conninfo'.
+ */
+typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
+
+/*
+ * walrcv_get_conninfo_fn
+ *
+ * Returns a user-displayable conninfo string. Note that any
+ * security-sensitive fields should be obfuscated.
+ */
+typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
+
+/*
+ * walrcv_get_senderinfo_fn
+ *
+ * Provide information of the WAL sender this WAL receiver is connected
+ * to, as of 'sender_host' for the host of the sender and 'sender_port'
+ * for its port.
+ */
+typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
+ char **sender_host,
+ int *sender_port);
+
+/*
+ * walrcv_identify_system_fn
+ *
+ * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
+ * identity of the cluster. Returns the system ID of the cluster
+ * connected to. 'primary_tli' is the timeline ID of the sender.
+ */
+typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
+ TimeLineID *primary_tli);
+
+/*
+ * walrcv_server_version_fn
+ *
+ * Returns the version number of the cluster connected to.
+ */
+typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
+
+/*
+ * walrcv_readtimelinehistoryfile_fn
+ *
+ * Fetch from cluster the timeline history file for timeline 'tli'.
+ * Returns the name of the timeline history file as of 'filename', its
+ * contents as of 'content' and its 'size'.
+ */
+typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
+ TimeLineID tli,
+ char **filename,
+ char **content,
+ int *size);
+
+/*
+ * walrcv_startstreaming_fn
+ *
+ * Start streaming WAL data from given streaming options. Returns true
+ * if the connection has switched successfully to copy-both mode and false
+ * if the server received the command and executed it successfully, but
+ * didn't switch to copy-mode.
+ */
+typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
+ const WalRcvStreamOptions *options);
+
+/*
+ * walrcv_endstreaming_fn
+ *
+ * Stop streaming of WAL data. Returns the next timeline ID of the cluster
+ * connected to in 'next_tli', or 0 if there was no report.
+ */
+typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
+ TimeLineID *next_tli);
+
+/*
+ * walrcv_receive_fn
+ *
+ * Receive a message available from the WAL stream. 'buffer' is a pointer
+ * to a buffer holding the message received. Returns the length of the data,
+ * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
+ * be waited on before a retry), and -1 if the cluster ended the COPY.
+ */
+typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
+ char **buffer,
+ pgsocket *wait_fd);
+
+/*
+ * walrcv_send_fn
+ *
+ * Send a message of size 'nbytes' to the WAL stream with 'buffer' as
+ * contents.
+ */
+typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
+ const char *buffer,
+ int nbytes);
+
+/*
+ * walrcv_create_slot_fn
+ *
+ * Create a new replication slot named 'slotname'. 'temporary' defines
+ * if the slot is temporary. 'snapshot_action' defines the behavior wanted
+ * for an exported snapshot (see replication protocol for more details).
+ * 'lsn' includes the LSN position at which the created slot became
+ * consistent. Returns the name of the exported snapshot for a logical
+ * slot, or NULL for a physical slot.
+ */
+typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
+ const char *slotname,
+ bool temporary,
+ bool two_phase,
+ CRSSnapshotAction snapshot_action,
+ XLogRecPtr *lsn);
+
+/*
+ * walrcv_get_backend_pid_fn
+ *
+ * Returns the PID of the remote backend process.
+ */
+typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
+
+/*
+ * walrcv_exec_fn
+ *
+ * Send generic queries (and commands) to the remote cluster. 'nRetTypes'
+ * is the expected number of returned attributes, and 'retTypes' an array
+ * including their type OIDs. Returns the status of the execution and
+ * tuples if any.
+ */
+typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
+ const char *query,
+ const int nRetTypes,
+ const Oid *retTypes);
+
+/*
+ * walrcv_disconnect_fn
+ *
+ * Disconnect with the cluster.
+ */
+typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
+
+typedef struct WalReceiverFunctionsType
+{
+ walrcv_connect_fn walrcv_connect;
+ walrcv_check_conninfo_fn walrcv_check_conninfo;
+ walrcv_get_conninfo_fn walrcv_get_conninfo;
+ walrcv_get_senderinfo_fn walrcv_get_senderinfo;
+ walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_server_version_fn walrcv_server_version;
+ walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
+ walrcv_startstreaming_fn walrcv_startstreaming;
+ walrcv_endstreaming_fn walrcv_endstreaming;
+ walrcv_receive_fn walrcv_receive;
+ walrcv_send_fn walrcv_send;
+ walrcv_create_slot_fn walrcv_create_slot;
+ walrcv_get_backend_pid_fn walrcv_get_backend_pid;
+ walrcv_exec_fn walrcv_exec;
+ walrcv_disconnect_fn walrcv_disconnect;
+} WalReceiverFunctionsType;
+
+extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
+
+#define walrcv_connect(conninfo, logical, appname, err) \
+ WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
+#define walrcv_check_conninfo(conninfo) \
+ WalReceiverFunctions->walrcv_check_conninfo(conninfo)
+#define walrcv_get_conninfo(conn) \
+ WalReceiverFunctions->walrcv_get_conninfo(conn)
+#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
+ WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
+#define walrcv_identify_system(conn, primary_tli) \
+ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_server_version(conn) \
+ WalReceiverFunctions->walrcv_server_version(conn)
+#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
+ WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
+#define walrcv_startstreaming(conn, options) \
+ WalReceiverFunctions->walrcv_startstreaming(conn, options)
+#define walrcv_endstreaming(conn, next_tli) \
+ WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
+#define walrcv_receive(conn, buffer, wait_fd) \
+ WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
+#define walrcv_send(conn, buffer, nbytes) \
+ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
+ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_get_backend_pid(conn) \
+ WalReceiverFunctions->walrcv_get_backend_pid(conn)
+#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
+ WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes)
+#define walrcv_disconnect(conn) \
+ WalReceiverFunctions->walrcv_disconnect(conn)
+
+static inline void
+walrcv_clear_result(WalRcvExecResult *walres)
+{
+ if (!walres)
+ return;
+
+ if (walres->err)
+ pfree(walres->err);
+
+ if (walres->tuplestore)
+ tuplestore_end(walres->tuplestore);
+
+ if (walres->tupledesc)
+ FreeTupleDesc(walres->tupledesc);
+
+ pfree(walres);
+}
+
+/* prototypes for functions in walreceiver.c */
+extern void WalReceiverMain(void) pg_attribute_noreturn();
+extern void ProcessWalRcvInterrupts(void);
+
+/* prototypes for functions in walreceiverfuncs.c */
+extern Size WalRcvShmemSize(void);
+extern void WalRcvShmemInit(void);
+extern void ShutdownWalRcv(void);
+extern bool WalRcvStreaming(void);
+extern bool WalRcvRunning(void);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
+ const char *conninfo, const char *slotname,
+ bool create_temp_slot);
+extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+extern int GetReplicationApplyDelay(void);
+extern int GetReplicationTransferLatency(void);
+extern void WalRcvForceReply(void);
+
+#endif /* _WALRECEIVER_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
new file mode 100644
index 0000000..d99a21b
--- /dev/null
+++ b/src/include/replication/walsender.h
@@ -0,0 +1,74 @@
+/*-------------------------------------------------------------------------
+ *
+ * walsender.h
+ * Exports from replication/walsender.c.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/walsender.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALSENDER_H
+#define _WALSENDER_H
+
+#include <signal.h>
+
+/*
+ * What to do with a snapshot in create replication slot command.
+ */
+typedef enum
+{
+ CRS_EXPORT_SNAPSHOT,
+ CRS_NOEXPORT_SNAPSHOT,
+ CRS_USE_SNAPSHOT
+} CRSSnapshotAction;
+
+/* global state */
+extern PGDLLIMPORT bool am_walsender;
+extern PGDLLIMPORT bool am_cascading_walsender;
+extern PGDLLIMPORT bool am_db_walsender;
+extern PGDLLIMPORT bool wake_wal_senders;
+
+/* user-settable parameters */
+extern PGDLLIMPORT int max_wal_senders;
+extern PGDLLIMPORT int wal_sender_timeout;
+extern PGDLLIMPORT bool log_replication_commands;
+
+extern void InitWalSender(void);
+extern bool exec_replication_command(const char *query_string);
+extern void WalSndErrorCleanup(void);
+extern void WalSndResourceCleanup(bool isCommit);
+extern void WalSndSignals(void);
+extern Size WalSndShmemSize(void);
+extern void WalSndShmemInit(void);
+extern void WalSndWakeup(void);
+extern void WalSndInitStopping(void);
+extern void WalSndWaitStopping(void);
+extern void HandleWalSndInitStopping(void);
+extern void WalSndRqstFileReload(void);
+
+/*
+ * Remember that we want to wakeup walsenders later
+ *
+ * This is separated from doing the actual wakeup because the writeout is done
+ * while holding contended locks.
+ */
+#define WalSndWakeupRequest() \
+ do { wake_wal_senders = true; } while (0)
+
+/*
+ * wakeup walsenders if there is work to be done
+ */
+#define WalSndWakeupProcessRequests() \
+ do \
+ { \
+ if (wake_wal_senders) \
+ { \
+ wake_wal_senders = false; \
+ if (max_wal_senders > 0) \
+ WalSndWakeup(); \
+ } \
+ } while (0)
+
+#endif /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
new file mode 100644
index 0000000..c14888e
--- /dev/null
+++ b/src/include/replication/walsender_private.h
@@ -0,0 +1,128 @@
+/*-------------------------------------------------------------------------
+ *
+ * walsender_private.h
+ * Private definitions from replication/walsender.c.
+ *
+ * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/walsender_private.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALSENDER_PRIVATE_H
+#define _WALSENDER_PRIVATE_H
+
+#include "access/xlog.h"
+#include "nodes/nodes.h"
+#include "replication/syncrep.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+typedef enum WalSndState
+{
+ WALSNDSTATE_STARTUP = 0,
+ WALSNDSTATE_BACKUP,
+ WALSNDSTATE_CATCHUP,
+ WALSNDSTATE_STREAMING,
+ WALSNDSTATE_STOPPING
+} WalSndState;
+
+/*
+ * Each walsender has a WalSnd struct in shared memory.
+ *
+ * This struct is protected by its 'mutex' spinlock field, except that some
+ * members are only written by the walsender process itself, and thus that
+ * process is free to read those members without holding spinlock. pid and
+ * needreload always require the spinlock to be held for all accesses.
+ */
+typedef struct WalSnd
+{
+ pid_t pid; /* this walsender's PID, or 0 if not active */
+
+ WalSndState state; /* this walsender's state */
+ XLogRecPtr sentPtr; /* WAL has been sent up to this point */
+ bool needreload; /* does currently-open file need to be
+ * reloaded? */
+
+ /*
+ * The xlog locations that have been written, flushed, and applied by
+ * standby-side. These may be invalid if the standby-side has not offered
+ * values yet.
+ */
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+
+ /* Measured lag times, or -1 for unknown/none. */
+ TimeOffset writeLag;
+ TimeOffset flushLag;
+ TimeOffset applyLag;
+
+ /*
+ * The priority order of the standby managed by this WALSender, as listed
+ * in synchronous_standby_names, or 0 if not-listed.
+ */
+ int sync_standby_priority;
+
+ /* Protects shared variables shown above. */
+ slock_t mutex;
+
+ /*
+ * Pointer to the walsender's latch. Used by backends to wake up this
+ * walsender when it has work to do. NULL if the walsender isn't active.
+ */
+ Latch *latch;
+
+ /*
+ * Timestamp of the last message received from standby.
+ */
+ TimestampTz replyTime;
+} WalSnd;
+
+extern PGDLLIMPORT WalSnd *MyWalSnd;
+
+/* There is one WalSndCtl struct for the whole database cluster */
+typedef struct
+{
+ /*
+ * Synchronous replication queue with one queue per request type.
+ * Protected by SyncRepLock.
+ */
+ SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
+
+ /*
+ * Current location of the head of the queue. All waiters should have a
+ * waitLSN that follows this value. Protected by SyncRepLock.
+ */
+ XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];
+
+ /*
+ * Are any sync standbys defined? Waiting backends can't reload the
+ * config file safely, so checkpointer updates this value as needed.
+ * Protected by SyncRepLock.
+ */
+ bool sync_standbys_defined;
+
+ WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
+} WalSndCtlData;
+
+extern PGDLLIMPORT WalSndCtlData *WalSndCtl;
+
+
+extern void WalSndSetState(WalSndState state);
+
+/*
+ * Internal functions for parsing the replication grammar, in repl_gram.y and
+ * repl_scanner.l
+ */
+extern int replication_yyparse(void);
+extern int replication_yylex(void);
+extern void replication_yyerror(const char *str) pg_attribute_noreturn();
+extern void replication_scanner_init(const char *query_string);
+extern void replication_scanner_finish(void);
+extern bool replication_scanner_is_replication_command(void);
+
+extern PGDLLIMPORT Node *replication_parse_result;
+
+#endif /* _WALSENDER_PRIVATE_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
new file mode 100644
index 0000000..901845a
--- /dev/null
+++ b/src/include/replication/worker_internal.h
@@ -0,0 +1,112 @@
+/*-------------------------------------------------------------------------
+ *
+ * worker_internal.h
+ * Internal headers shared by logical replication workers.
+ *
+ * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group
+ *
+ * src/include/replication/worker_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WORKER_INTERNAL_H
+#define WORKER_INTERNAL_H
+
+#include <signal.h>
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_subscription.h"
+#include "datatype/timestamp.h"
+#include "storage/fileset.h"
+#include "storage/lock.h"
+#include "storage/spin.h"
+
+
+typedef struct LogicalRepWorker
+{
+ /* Time at which this worker was launched. */
+ TimestampTz launch_time;
+
+ /* Indicates if this slot is used or free. */
+ bool in_use;
+
+ /* Increased every time the slot is taken by new worker. */
+ uint16 generation;
+
+ /* Pointer to proc array. NULL if not running. */
+ PGPROC *proc;
+
+ /* Database id to connect to. */
+ Oid dbid;
+
+ /* User to use for connection (will be same as owner of subscription). */
+ Oid userid;
+
+ /* Subscription id for the worker. */
+ Oid subid;
+
+ /* Used for initial table synchronization. */
+ Oid relid;
+ char relstate;
+ XLogRecPtr relstate_lsn;
+ slock_t relmutex;
+
+ /*
+ * Used to create the changes and subxact files for the streaming
+ * transactions. Upon the arrival of the first streaming transaction, the
+ * fileset will be initialized, and it will be deleted when the worker
+ * exits. Under this, separate buffiles would be created for each
+ * transaction which will be deleted after the transaction is finished.
+ */
+ FileSet *stream_fileset;
+
+ /* Stats. */
+ XLogRecPtr last_lsn;
+ TimestampTz last_send_time;
+ TimestampTz last_recv_time;
+ XLogRecPtr reply_lsn;
+ TimestampTz reply_time;
+} LogicalRepWorker;
+
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern PGDLLIMPORT MemoryContext ApplyContext;
+
+/* libpqreceiver connection */
+extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
+
+/* Worker and subscription objects. */
+extern PGDLLIMPORT Subscription *MySubscription;
+extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
+
+extern PGDLLIMPORT bool in_remote_transaction;
+
+extern void logicalrep_worker_attach(int slot);
+extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+ bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+ Oid userid, Oid relid);
+extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+
+extern int logicalrep_sync_worker_count(Oid subid);
+
+extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
+ char *originname, int szorgname);
+extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+
+extern bool AllTablesyncsReady(void);
+extern void UpdateTwoPhaseState(Oid suboid, char new_state);
+
+extern void process_syncing_tables(XLogRecPtr current_lsn);
+extern void invalidate_syncing_table_states(Datum arg, int cacheid,
+ uint32 hashvalue);
+
+static inline bool
+am_tablesync_worker(void)
+{
+ return OidIsValid(MyLogicalRepWorker->relid);
+}
+
+#endif /* WORKER_INTERNAL_H */