diff options
Diffstat (limited to '')
-rw-r--r-- | src/include/replication/decode.h | 34 | ||||
-rw-r--r-- | src/include/replication/logical.h | 147 | ||||
-rw-r--r-- | src/include/replication/logicallauncher.h | 29 | ||||
-rw-r--r-- | src/include/replication/logicalproto.h | 254 | ||||
-rw-r--r-- | src/include/replication/logicalrelation.h | 50 | ||||
-rw-r--r-- | src/include/replication/logicalworker.h | 19 | ||||
-rw-r--r-- | src/include/replication/message.h | 41 | ||||
-rw-r--r-- | src/include/replication/origin.h | 73 | ||||
-rw-r--r-- | src/include/replication/output_plugin.h | 248 | ||||
-rw-r--r-- | src/include/replication/pgoutput.h | 34 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 685 | ||||
-rw-r--r-- | src/include/replication/slot.h | 230 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 97 | ||||
-rw-r--r-- | src/include/replication/syncrep.h | 115 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 472 | ||||
-rw-r--r-- | src/include/replication/walsender.h | 74 | ||||
-rw-r--r-- | src/include/replication/walsender_private.h | 128 | ||||
-rw-r--r-- | src/include/replication/worker_internal.h | 112 |
18 files changed, 2842 insertions, 0 deletions
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h new file mode 100644 index 0000000..741bf65 --- /dev/null +++ b/src/include/replication/decode.h @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * decode.h + * PostgreSQL WAL to logical transformation + * + * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef DECODE_H +#define DECODE_H + +#include "access/xlogreader.h" +#include "access/xlogrecord.h" +#include "replication/logical.h" +#include "replication/reorderbuffer.h" + +typedef struct XLogRecordBuffer +{ + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogReaderState *record; +} XLogRecordBuffer; + +extern void xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + +extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, + XLogReaderState *record); + +#endif diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h new file mode 100644 index 0000000..edadacd --- /dev/null +++ b/src/include/replication/logical.h @@ -0,0 +1,147 @@ +/*------------------------------------------------------------------------- + * logical.h + * PostgreSQL logical decoding coordination + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_H +#define LOGICAL_H + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "replication/output_plugin.h" +#include "replication/slot.h" + +struct LogicalDecodingContext; + +typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool last_write +); + +typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; + +typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool skipped_xact +); + +typedef struct LogicalDecodingContext +{ + /* memory context this is all allocated in */ + MemoryContext context; + + /* The associated replication slot */ + ReplicationSlot *slot; + + /* infrastructure pieces for decoding */ + XLogReaderState *reader; + struct ReorderBuffer *reorder; + struct SnapBuild *snapshot_builder; + + /* + * Marks the logical decoding context as fast forward decoding one. Such a + * context does not have plugin loaded so most of the following properties + * are unused. + */ + bool fast_forward; + + OutputPluginCallbacks callbacks; + OutputPluginOptions options; + + /* + * User specified options + */ + List *output_plugin_options; + + /* + * User-Provided callback for writing/streaming out data. + */ + LogicalOutputPluginWriterPrepareWrite prepare_write; + LogicalOutputPluginWriterWrite write; + LogicalOutputPluginWriterUpdateProgress update_progress; + + /* + * Output buffer. + */ + StringInfo out; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; + + /* + * Private data pointer for the data writer. + */ + void *output_writer_private; + + /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + + /* + * Does the output plugin support two-phase decoding, and is it enabled? + */ + bool twophase; + + /* + * Is two-phase option given by output plugin? + * + * This flag indicates that the plugin passed in the two-phase option as + * part of the START_STREAMING command. We can't rely solely on the + * twophase flag which only tells whether the plugin provided all the + * necessary two-phase callbacks. + */ + bool twophase_opt_given; + + /* + * State for writing output. + */ + bool accept_writes; + bool prepared_write; + XLogRecPtr write_location; + TransactionId write_xid; + /* Are we processing the end LSN of a transaction? */ + bool end_xact; +} LogicalDecodingContext; + + +extern void CheckLogicalDecodingRequirements(void); + +extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, + List *output_plugin_options, + bool need_full_snapshot, + XLogRecPtr restart_lsn, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); +extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, + List *output_plugin_options, + bool fast_forward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); +extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); +extern bool DecodingContextReady(LogicalDecodingContext *ctx); +extern void FreeDecodingContext(LogicalDecodingContext *ctx); + +extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); +extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, + XLogRecPtr restart_lsn); +extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); + +extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); +extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern void ResetLogicalStreamingState(void); +extern void UpdateDecodingStats(LogicalDecodingContext *ctx); + +#endif diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h new file mode 100644 index 0000000..f1e2821 --- /dev/null +++ b/src/include/replication/logicallauncher.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * logicallauncher.h + * Exports for logical replication launcher. + * + * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group + * + * src/include/replication/logicallauncher.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALLAUNCHER_H +#define LOGICALLAUNCHER_H + +extern PGDLLIMPORT int max_logical_replication_workers; +extern PGDLLIMPORT int max_sync_workers_per_subscription; + +extern void ApplyLauncherRegister(void); +extern void ApplyLauncherMain(Datum main_arg); + +extern Size ApplyLauncherShmemSize(void); +extern void ApplyLauncherShmemInit(void); + +extern void ApplyLauncherWakeupAtCommit(void); +extern void AtEOXact_ApplyLauncher(bool isCommit); + +extern bool IsLogicalLauncher(void); + +#endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h new file mode 100644 index 0000000..8cdc04a --- /dev/null +++ b/src/include/replication/logicalproto.h @@ -0,0 +1,254 @@ +/*------------------------------------------------------------------------- + * + * logicalproto.h + * logical replication protocol + * + * Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/logicalproto.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_PROTO_H +#define LOGICAL_PROTO_H + +#include "access/xact.h" +#include "executor/tuptable.h" +#include "replication/reorderbuffer.h" +#include "utils/rel.h" + +/* + * Protocol capabilities + * + * LOGICALREP_PROTO_VERSION_NUM is our native protocol. + * LOGICALREP_PROTO_MAX_VERSION_NUM is the greatest version we can support. + * LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we + * have backwards compatibility for. The client requests protocol version at + * connect time. + * + * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with + * support for streaming large transactions. Introduced in PG14. + * + * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with + * support for two-phase commit decoding (at prepare time). Introduced in PG15. + */ +#define LOGICALREP_PROTO_MIN_VERSION_NUM 1 +#define LOGICALREP_PROTO_VERSION_NUM 1 +#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3 +#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM + +/* + * Logical message types + * + * Used by logical replication wire protocol. + * + * Note: though this is an enum, the values are used to identify message types + * in logical replication protocol, which uses a single byte to identify a + * message type. Hence the values should be single-byte wide and preferably + * human-readable characters. + */ +typedef enum LogicalRepMsgType +{ + LOGICAL_REP_MSG_BEGIN = 'B', + LOGICAL_REP_MSG_COMMIT = 'C', + LOGICAL_REP_MSG_ORIGIN = 'O', + LOGICAL_REP_MSG_INSERT = 'I', + LOGICAL_REP_MSG_UPDATE = 'U', + LOGICAL_REP_MSG_DELETE = 'D', + LOGICAL_REP_MSG_TRUNCATE = 'T', + LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', + LOGICAL_REP_MSG_STREAM_START = 'S', + LOGICAL_REP_MSG_STREAM_STOP = 'E', + LOGICAL_REP_MSG_STREAM_COMMIT = 'c', + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' +} LogicalRepMsgType; + +/* + * This struct stores a tuple received via logical replication. + * Keep in mind that the columns correspond to the *remote* table. + */ +typedef struct LogicalRepTupleData +{ + /* Array of StringInfos, one per column; some may be unused */ + StringInfoData *colvalues; + /* Array of markers for null/unchanged/text/binary, one per column */ + char *colstatus; + /* Length of above arrays */ + int ncols; +} LogicalRepTupleData; + +/* Possible values for LogicalRepTupleData.colstatus[colnum] */ +/* These values are also used in the on-the-wire protocol */ +#define LOGICALREP_COLUMN_NULL 'n' +#define LOGICALREP_COLUMN_UNCHANGED 'u' +#define LOGICALREP_COLUMN_TEXT 't' +#define LOGICALREP_COLUMN_BINARY 'b' /* added in PG14 */ + +typedef uint32 LogicalRepRelId; + +/* Relation information */ +typedef struct LogicalRepRelation +{ + /* Info coming from the remote side. */ + LogicalRepRelId remoteid; /* unique id of the relation */ + char *nspname; /* schema name */ + char *relname; /* relation name */ + int natts; /* number of columns */ + char **attnames; /* column names */ + Oid *atttyps; /* column types */ + char replident; /* replica identity */ + char relkind; /* remote relation kind */ + Bitmapset *attkeys; /* Bitmap of key columns */ +} LogicalRepRelation; + +/* Type mapping info */ +typedef struct LogicalRepTyp +{ + Oid remoteid; /* unique id of the remote type */ + char *nspname; /* schema name of remote type */ + char *typname; /* name of the remote type */ +} LogicalRepTyp; + +/* Transaction info */ +typedef struct LogicalRepBeginData +{ + XLogRecPtr final_lsn; + TimestampTz committime; + TransactionId xid; +} LogicalRepBeginData; + +typedef struct LogicalRepCommitData +{ + XLogRecPtr commit_lsn; + XLogRecPtr end_lsn; + TimestampTz committime; +} LogicalRepCommitData; + +/* + * Prepared transaction protocol information for begin_prepare, and prepare. + */ +typedef struct LogicalRepPreparedTxnData +{ + XLogRecPtr prepare_lsn; + XLogRecPtr end_lsn; + TimestampTz prepare_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepPreparedTxnData; + +/* + * Prepared transaction protocol information for commit prepared. + */ +typedef struct LogicalRepCommitPreparedTxnData +{ + XLogRecPtr commit_lsn; + XLogRecPtr end_lsn; + TimestampTz commit_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepCommitPreparedTxnData; + +/* + * Rollback Prepared transaction protocol information. The prepare information + * prepare_end_lsn and prepare_time are used to check if the downstream has + * received this prepared transaction in which case it can apply the rollback, + * otherwise, it can skip the rollback operation. The gid alone is not + * sufficient because the downstream node can have a prepared transaction with + * same identifier. + */ +typedef struct LogicalRepRollbackPreparedTxnData +{ + XLogRecPtr prepare_end_lsn; + XLogRecPtr rollback_end_lsn; + TimestampTz prepare_time; + TimestampTz rollback_time; + TransactionId xid; + char gid[GIDSIZE]; +} LogicalRepRollbackPreparedTxnData; + +extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin(StringInfo in, + LogicalRepBeginData *begin_data); +extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit(StringInfo in, + LogicalRepCommitData *commit_data); +extern void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn); +extern void logicalrep_read_begin_prepare(StringInfo in, + LogicalRepPreparedTxnData *begin_data); +extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); +extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern void logicalrep_read_commit_prepared(StringInfo in, + LogicalRepCommitPreparedTxnData *prepare_data); +extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); +extern void logicalrep_read_rollback_prepared(StringInfo in, + LogicalRepRollbackPreparedTxnData *rollback_data); +extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +extern void logicalrep_read_stream_prepare(StringInfo in, + LogicalRepPreparedTxnData *prepare_data); + +extern void logicalrep_write_origin(StringInfo out, const char *origin, + XLogRecPtr origin_lsn); +extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); +extern void logicalrep_write_insert(StringInfo out, TransactionId xid, + Relation rel, + TupleTableSlot *newslot, + bool binary, Bitmapset *columns); +extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); +extern void logicalrep_write_update(StringInfo out, TransactionId xid, + Relation rel, + TupleTableSlot *oldslot, + TupleTableSlot *newslot, bool binary, Bitmapset *columns); +extern LogicalRepRelId logicalrep_read_update(StringInfo in, + bool *has_oldtuple, LogicalRepTupleData *oldtup, + LogicalRepTupleData *newtup); +extern void logicalrep_write_delete(StringInfo out, TransactionId xid, + Relation rel, TupleTableSlot *oldtuple, + bool binary, Bitmapset *columns); +extern LogicalRepRelId logicalrep_read_delete(StringInfo in, + LogicalRepTupleData *oldtup); +extern void logicalrep_write_truncate(StringInfo out, TransactionId xid, + int nrelids, Oid relids[], + bool cascade, bool restart_seqs); +extern List *logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs); +extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, const char *message); +extern void logicalrep_write_rel(StringInfo out, TransactionId xid, + Relation rel, Bitmapset *columns); +extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); +extern void logicalrep_write_typ(StringInfo out, TransactionId xid, + Oid typoid); +extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp); +extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid, + bool first_segment); +extern TransactionId logicalrep_read_stream_start(StringInfo in, + bool *first_segment); +extern void logicalrep_write_stream_stop(StringInfo out); +extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +extern TransactionId logicalrep_read_stream_commit(StringInfo out, + LogicalRepCommitData *commit_data); +extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid); +extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid); +extern const char *logicalrep_message_type(LogicalRepMsgType action); + +#endif /* LOGICAL_PROTO_H */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h new file mode 100644 index 0000000..78cd7e7 --- /dev/null +++ b/src/include/replication/logicalrelation.h @@ -0,0 +1,50 @@ +/*------------------------------------------------------------------------- + * + * logicalrelation.h + * Relation definitions for logical replication relation mapping. + * + * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group + * + * src/include/replication/logicalrelation.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALRELATION_H +#define LOGICALRELATION_H + +#include "access/attmap.h" +#include "replication/logicalproto.h" + +typedef struct LogicalRepRelMapEntry +{ + LogicalRepRelation remoterel; /* key is remoterel.remoteid */ + + /* + * Validity flag -- when false, revalidate all derived info at next + * logicalrep_rel_open. (While the localrel is open, we assume our lock + * on that rel ensures the info remains good.) + */ + bool localrelvalid; + + /* Mapping to local relation. */ + Oid localreloid; /* local relation id */ + Relation localrel; /* relcache entry (NULL when closed) */ + AttrMap *attrmap; /* map of local attributes to remote ones */ + bool updatable; /* Can apply updates/deletes? */ + + /* Sync state. */ + char state; + XLogRecPtr statelsn; +} LogicalRepRelMapEntry; + +extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); +extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); + +extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, + LOCKMODE lockmode); +extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map); +extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, + LOCKMODE lockmode); + +#endif /* LOGICALRELATION_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h new file mode 100644 index 0000000..cd1b6e8 --- /dev/null +++ b/src/include/replication/logicalworker.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * logicalworker.h + * Exports for logical replication workers. + * + * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group + * + * src/include/replication/logicalworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALWORKER_H +#define LOGICALWORKER_H + +extern void ApplyWorkerMain(Datum main_arg); + +extern bool IsLogicalWorker(void); + +#endif /* LOGICALWORKER_H */ diff --git a/src/include/replication/message.h b/src/include/replication/message.h new file mode 100644 index 0000000..0b396c5 --- /dev/null +++ b/src/include/replication/message.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * message.h + * Exports from replication/logical/message.c + * + * Copyright (c) 2013-2022, PostgreSQL Global Development Group + * + * src/include/replication/message.h + *------------------------------------------------------------------------- + */ +#ifndef PG_LOGICAL_MESSAGE_H +#define PG_LOGICAL_MESSAGE_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" + +/* + * Generic logical decoding message wal record. + */ +typedef struct xl_logical_message +{ + Oid dbId; /* database Oid emitted from */ + bool transactional; /* is message transactional? */ + Size prefix_size; /* length of prefix */ + Size message_size; /* size of the message */ + /* payload, including null-terminated prefix of length prefix_size */ + char message[FLEXIBLE_ARRAY_MEMBER]; +} xl_logical_message; + +#define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) + +extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, + size_t size, bool transactional); + +/* RMGR API */ +#define XLOG_LOGICAL_MESSAGE 0x00 +extern void logicalmsg_redo(XLogReaderState *record); +extern void logicalmsg_desc(StringInfo buf, XLogReaderState *record); +extern const char *logicalmsg_identify(uint8 info); + +#endif /* PG_LOGICAL_MESSAGE_H */ diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h new file mode 100644 index 0000000..2d1b5e5 --- /dev/null +++ b/src/include/replication/origin.h @@ -0,0 +1,73 @@ +/*------------------------------------------------------------------------- + * origin.h + * Exports from replication/logical/origin.c + * + * Copyright (c) 2013-2022, PostgreSQL Global Development Group + * + * src/include/replication/origin.h + *------------------------------------------------------------------------- + */ +#ifndef PG_ORIGIN_H +#define PG_ORIGIN_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "catalog/pg_replication_origin.h" + +typedef struct xl_replorigin_set +{ + XLogRecPtr remote_lsn; + RepOriginId node_id; + bool force; +} xl_replorigin_set; + +typedef struct xl_replorigin_drop +{ + RepOriginId node_id; +} xl_replorigin_drop; + +#define XLOG_REPLORIGIN_SET 0x00 +#define XLOG_REPLORIGIN_DROP 0x10 + +#define InvalidRepOriginId 0 +#define DoNotReplicateId PG_UINT16_MAX + +extern PGDLLIMPORT RepOriginId replorigin_session_origin; +extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; +extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; + +/* API for querying & manipulating replication origins */ +extern RepOriginId replorigin_by_name(const char *name, bool missing_ok); +extern RepOriginId replorigin_create(const char *name); +extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait); +extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, + char **roname); + +/* API for querying & manipulating replication progress tracking */ +extern void replorigin_advance(RepOriginId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit, + bool go_backward, bool wal_log); +extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush); + +extern void replorigin_session_advance(XLogRecPtr remote_commit, + XLogRecPtr local_commit); +extern void replorigin_session_setup(RepOriginId node); +extern void replorigin_session_reset(void); +extern XLogRecPtr replorigin_session_get_progress(bool flush); + +/* Checkpoint/Startup integration */ +extern void CheckPointReplicationOrigin(void); +extern void StartupReplicationOrigin(void); + +/* WAL logging */ +extern void replorigin_redo(XLogReaderState *record); +extern void replorigin_desc(StringInfo buf, XLogReaderState *record); +extern const char *replorigin_identify(uint8 info); + +/* shared memory allocation */ +extern Size ReplicationOriginShmemSize(void); +extern void ReplicationOriginShmemInit(void); + +#endif /* PG_ORIGIN_H */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h new file mode 100644 index 0000000..539dc8e --- /dev/null +++ b/src/include/replication/output_plugin.h @@ -0,0 +1,248 @@ +/*------------------------------------------------------------------------- + * output_plugin.h + * PostgreSQL Logical Decode Plugin Interface + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef OUTPUT_PLUGIN_H +#define OUTPUT_PLUGIN_H + +#include "replication/reorderbuffer.h" + +struct LogicalDecodingContext; +struct OutputPluginCallbacks; + +typedef enum OutputPluginOutputType +{ + OUTPUT_PLUGIN_BINARY_OUTPUT, + OUTPUT_PLUGIN_TEXTUAL_OUTPUT +} OutputPluginOutputType; + +/* + * Options set by the output plugin, in the startup callback. + */ +typedef struct OutputPluginOptions +{ + OutputPluginOutputType output_type; + bool receive_rewrites; +} OutputPluginOptions; + +/* + * Type of the shared library symbol _PG_output_plugin_init that is looked up + * when loading an output plugin shared library. + */ +typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); + +/* + * Callback that gets called in a user-defined plugin. ctx->private_data can + * be set to some private data. + * + * "is_init" will be set to "true" if the decoding slot just got defined. When + * the same slot is used from there one, it will be "false". + */ +typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, + OutputPluginOptions *options, + bool is_init); + +/* + * Callback called for every (explicit or implicit) BEGIN of a successful + * transaction. + */ +typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Callback for every individual change in a successful transaction. + */ +typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for every TRUNCATE in a successful transaction. + */ +typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* + * Called for every (explicit or implicit) COMMIT of a successful transaction. + */ +typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for the generic logical decoding messages. + */ +typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Filter changes by origin. + */ +typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, + RepOriginId origin_id); + +/* + * Called to shutdown an output plugin. + */ +typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx); + +/* + * Called before decoding of PREPARE record to decide whether this + * transaction should be decoded with separate calls to prepare and + * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED + * and sent as usual transaction. + */ +typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, + TransactionId xid, + const char *gid); + +/* + * Callback called for every BEGIN of a prepared trnsaction. + */ +typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called for PREPARE record unless it was filtered by filter_prepare() + * callback. + */ +typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called for COMMIT PREPARED. + */ +typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called for ROLLBACK PREPARED. + */ +typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + + +/* + * Called when starting to stream a block of changes from in-progress + * transaction (may be called repeatedly, if it's streamed in multiple + * chunks). + */ +typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called when stopping to stream a block of changes from in-progress + * transaction to a remote node (may be called repeatedly, if it's streamed + * in multiple chunks). + */ +typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); + +/* + * Called to discard changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit. + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* + * Called to apply changes streamed to remote node from in-progress + * transaction. + */ +typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Callback for streaming individual changes from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* + * Callback for streaming generic logical decoding messages from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, + Size message_size, + const char *message); + +/* + * Callback for streaming truncates from in-progress transactions. + */ +typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* + * Output plugin callbacks + */ +typedef struct OutputPluginCallbacks +{ + LogicalDecodeStartupCB startup_cb; + LogicalDecodeBeginCB begin_cb; + LogicalDecodeChangeCB change_cb; + LogicalDecodeTruncateCB truncate_cb; + LogicalDecodeCommitCB commit_cb; + LogicalDecodeMessageCB message_cb; + LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeShutdownCB shutdown_cb; + + /* streaming of changes at prepare time */ + LogicalDecodeFilterPrepareCB filter_prepare_cb; + LogicalDecodeBeginPrepareCB begin_prepare_cb; + LogicalDecodePrepareCB prepare_cb; + LogicalDecodeCommitPreparedCB commit_prepared_cb; + LogicalDecodeRollbackPreparedCB rollback_prepared_cb; + + /* streaming of changes */ + LogicalDecodeStreamStartCB stream_start_cb; + LogicalDecodeStreamStopCB stream_stop_cb; + LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; + LogicalDecodeStreamCommitCB stream_commit_cb; + LogicalDecodeStreamChangeCB stream_change_cb; + LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamTruncateCB stream_truncate_cb; +} OutputPluginCallbacks; + +/* Functions in replication/logical/logical.c */ +extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write); +extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write); +extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact); + +#endif /* OUTPUT_PLUGIN_H */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h new file mode 100644 index 0000000..eafedd6 --- /dev/null +++ b/src/include/replication/pgoutput.h @@ -0,0 +1,34 @@ +/*------------------------------------------------------------------------- + * + * pgoutput.h + * Logical Replication output plugin + * + * Copyright (c) 2015-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/pgoutput.h + * + *------------------------------------------------------------------------- + */ +#ifndef PGOUTPUT_H +#define PGOUTPUT_H + +#include "nodes/pg_list.h" + +typedef struct PGOutputData +{ + MemoryContext context; /* private memory context for transient + * allocations */ + MemoryContext cachectx; /* private memory context for cache data */ + + /* client-supplied info: */ + uint32 protocol_version; + List *publication_names; + List *publications; + bool binary; + bool streaming; + bool messages; + bool two_phase; +} PGOutputData; + +#endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h new file mode 100644 index 0000000..4a01f87 --- /dev/null +++ b/src/include/replication/reorderbuffer.h @@ -0,0 +1,685 @@ +/* + * reorderbuffer.h + * PostgreSQL logical replay/reorder buffer management. + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + * src/include/replication/reorderbuffer.h + */ +#ifndef REORDERBUFFER_H +#define REORDERBUFFER_H + +#include "access/htup_details.h" +#include "lib/ilist.h" +#include "storage/sinval.h" +#include "utils/hsearch.h" +#include "utils/relcache.h" +#include "utils/snapshot.h" +#include "utils/timestamp.h" + +extern PGDLLIMPORT int logical_decoding_work_mem; + +/* an individual tuple, stored in one chunk of memory */ +typedef struct ReorderBufferTupleBuf +{ + /* position in preallocated list */ + slist_node node; + + /* tuple header, the interesting bit for users of logical decoding */ + HeapTupleData tuple; + + /* pre-allocated size of tuple buffer, different from tuple size */ + Size alloc_tuple_size; + + /* actual tuple data follows */ +} ReorderBufferTupleBuf; + +/* pointer to the data stored in a TupleBuf */ +#define ReorderBufferTupleBufData(p) \ + ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) + +/* + * Types of the change passed to a 'change' callback. + * + * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds + * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE + * changes. Users of the decoding facilities will never see changes with + * *_INTERNAL_* actions. + * + * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT + * changes concern "speculative insertions", their confirmation, and abort + * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of + * logical decoding don't have to care about these. + */ +typedef enum ReorderBufferChangeType +{ + REORDER_BUFFER_CHANGE_INSERT, + REORDER_BUFFER_CHANGE_UPDATE, + REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_MESSAGE, + REORDER_BUFFER_CHANGE_INVALIDATION, + REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, + REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, + REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, + REORDER_BUFFER_CHANGE_TRUNCATE +} ReorderBufferChangeType; + +/* forward declaration */ +struct ReorderBufferTXN; + +/* + * a single 'change', can be an insert (with one tuple), an update (old, new), + * or a delete (old). + * + * The same struct is also used internally for other purposes but that should + * never be visible outside reorderbuffer.c. + */ +typedef struct ReorderBufferChange +{ + XLogRecPtr lsn; + + /* The type of change. */ + ReorderBufferChangeType action; + + /* Transaction this change belongs to. */ + struct ReorderBufferTXN *txn; + + RepOriginId origin_id; + + /* + * Context data for the change. Which part of the union is valid depends + * on action. + */ + union + { + /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */ + struct + { + /* relation that has been changed */ + RelFileNode relnode; + + /* no previously reassembled toast chunks are necessary anymore */ + bool clear_toast_afterwards; + + /* valid for DELETE || UPDATE */ + ReorderBufferTupleBuf *oldtuple; + /* valid for INSERT || UPDATE */ + ReorderBufferTupleBuf *newtuple; + } tp; + + /* + * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one + * set of relations to be truncated. + */ + struct + { + Size nrelids; + bool cascade; + bool restart_seqs; + Oid *relids; + } truncate; + + /* Message with arbitrary data. */ + struct + { + char *prefix; + Size message_size; + char *message; + } msg; + + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ + Snapshot snapshot; + + /* + * New command id for existing snapshot in a catalog changing tx. Set + * when action == *_INTERNAL_COMMAND_ID. + */ + CommandId command_id; + + /* + * New cid mapping for catalog changing transaction, set when action + * == *_INTERNAL_TUPLECID. + */ + struct + { + RelFileNode node; + ItemPointerData tid; + CommandId cmin; + CommandId cmax; + CommandId combocid; + } tuplecid; + + /* Invalidation. */ + struct + { + uint32 ninvalidations; /* Number of messages */ + SharedInvalidationMessage *invalidations; /* invalidation message */ + } inval; + } data; + + /* + * While in use this is how a change is linked into a transactions, + * otherwise it's the preallocated list. + */ + dlist_node node; +} ReorderBufferChange; + +/* ReorderBufferTXN txn_flags */ +#define RBTXN_HAS_CATALOG_CHANGES 0x0001 +#define RBTXN_IS_SUBXACT 0x0002 +#define RBTXN_IS_SERIALIZED 0x0004 +#define RBTXN_IS_SERIALIZED_CLEAR 0x0008 +#define RBTXN_IS_STREAMED 0x0010 +#define RBTXN_HAS_PARTIAL_CHANGE 0x0020 +#define RBTXN_PREPARE 0x0040 +#define RBTXN_SKIPPED_PREPARE 0x0080 + +/* Does the transaction have catalog changes? */ +#define rbtxn_has_catalog_changes(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \ +) + +/* Is the transaction known as a subxact? */ +#define rbtxn_is_known_subxact(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \ +) + +/* Has this transaction been spilled to disk? */ +#define rbtxn_is_serialized(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \ +) + +/* Has this transaction ever been spilled to disk? */ +#define rbtxn_is_serialized_clear(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \ +) + +/* Has this transaction contains partial changes? */ +#define rbtxn_has_partial_change(txn) \ +( \ + ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \ +) + +/* + * Has this transaction been streamed to downstream? + * + * (It's not possible to deduce this from nentries and nentries_mem for + * various reasons. For example, all changes may be in subtransactions in + * which case we'd have nentries==0 for the toplevel one, which would say + * nothing about the streaming. So we maintain this flag, but only for the + * toplevel transaction.) + */ +#define rbtxn_is_streamed(txn) \ +( \ + ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ +) + +/* Has this transaction been prepared? */ +#define rbtxn_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ +) + +/* prepare for this transaction skipped? */ +#define rbtxn_skip_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ +) + +typedef struct ReorderBufferTXN +{ + /* See above */ + bits32 txn_flags; + + /* The transaction's transaction id, can be a toplevel or sub xid. */ + TransactionId xid; + + /* Xid of top-level transaction, if known */ + TransactionId toplevel_xid; + + /* + * Global transaction id required for identification of prepared + * transactions. + */ + char *gid; + + /* + * LSN of the first data carrying, WAL record with knowledge about this + * xid. This is allowed to *not* be first record adorned with this xid, if + * the previous records aren't relevant for logical decoding. + */ + XLogRecPtr first_lsn; + + /* ---- + * LSN of the record that lead to this xact to be prepared or committed or + * aborted. This can be a + * * plain commit record + * * plain commit record, of a parent transaction + * * prepared tansaction + * * prepared transaction commit + * * plain abort record + * * prepared transaction abort + * + * This can also become set to earlier values than transaction end when + * a transaction is spilled to disk; specifically it's set to the LSN of + * the latest change written to disk so far. + * ---- + */ + XLogRecPtr final_lsn; + + /* + * LSN pointing to the end of the commit record + 1. + */ + XLogRecPtr end_lsn; + + /* Toplevel transaction for this subxact (NULL for top-level). */ + struct ReorderBufferTXN *toptxn; + + /* + * LSN of the last lsn at which snapshot information reside, so we can + * restart decoding from there and fully recover this transaction from + * WAL. + */ + XLogRecPtr restart_decoding_lsn; + + /* origin of the change that caused this transaction */ + RepOriginId origin_id; + XLogRecPtr origin_lsn; + + /* + * Commit or Prepare time, only known when we read the actual commit or + * prepare record. + */ + union + { + TimestampTz commit_time; + TimestampTz prepare_time; + } xact_time; + + /* + * The base snapshot is used to decode all changes until either this + * transaction modifies the catalog, or another catalog-modifying + * transaction commits. + */ + Snapshot base_snapshot; + XLogRecPtr base_snapshot_lsn; + dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */ + + /* + * Snapshot/CID from the previous streaming run. Only valid for already + * streamed transactions (NULL/InvalidCommandId otherwise). + */ + Snapshot snapshot_now; + CommandId command_id; + + /* + * How many ReorderBufferChange's do we have in this txn. + * + * Changes in subtransactions are *not* included but tracked separately. + */ + uint64 nentries; + + /* + * How many of the above entries are stored in memory in contrast to being + * spilled to disk. + */ + uint64 nentries_mem; + + /* + * List of ReorderBufferChange structs, including new Snapshots, new + * CommandIds and command invalidation messages. + */ + dlist_head changes; + + /* + * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples. + * Those are always assigned to the toplevel transaction. (Keep track of + * #entries to create a hash of the right size) + */ + dlist_head tuplecids; + uint64 ntuplecids; + + /* + * On-demand built hash for looking up the above values. + */ + HTAB *tuplecid_hash; + + /* + * Hash containing (potentially partial) toast entries. NULL if no toast + * tuples have been found for the current change. + */ + HTAB *toast_hash; + + /* + * non-hierarchical list of subtransactions that are *not* aborted. Only + * used in toplevel transactions. + */ + dlist_head subtxns; + uint32 nsubtxns; + + /* + * Stored cache invalidations. This is not a linked list because we get + * all the invalidations at once. + */ + uint32 ninvalidations; + SharedInvalidationMessage *invalidations; + + /* --- + * Position in one of three lists: + * * list of subtransactions if we are *known* to be subxact + * * list of toplevel xacts (can be an as-yet unknown subxact) + * * list of preallocated ReorderBufferTXNs (if unused) + * --- + */ + dlist_node node; + + /* + * Size of this transaction (changes currently in memory, in bytes). + */ + Size size; + + /* Size of top-transaction including sub-transactions. */ + Size total_size; + + /* If we have detected concurrent abort then ignore future changes. */ + bool concurrent_abort; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; +} ReorderBufferTXN; + +/* so we can define the callbacks used inside struct ReorderBuffer itself */ +typedef struct ReorderBuffer ReorderBuffer; + +/* change callback signature */ +typedef void (*ReorderBufferApplyChangeCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* truncate callback signature */ +typedef void (*ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +/* begin callback signature */ +typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* commit callback signature */ +typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* message callback signature */ +typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* begin prepare callback signature */ +typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn); + +/* prepare callback signature */ +typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit prepared callback signature */ +typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* rollback prepared callback signature */ +typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); + +/* start streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStartCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr first_lsn); + +/* stop streaming transaction callback signature */ +typedef void (*ReorderBufferStreamStopCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr last_lsn); + +/* discard streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); + +/* commit streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* stream change callback signature */ +typedef void (*ReorderBufferStreamChangeCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + Relation relation, + ReorderBufferChange *change); + +/* stream message callback signature */ +typedef void (*ReorderBufferStreamMessageCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr message_lsn, + bool transactional, + const char *prefix, Size sz, + const char *message); + +/* stream truncate callback signature */ +typedef void (*ReorderBufferStreamTruncateCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + int nrelations, + Relation relations[], + ReorderBufferChange *change); + +struct ReorderBuffer +{ + /* + * xid => ReorderBufferTXN lookup table + */ + HTAB *by_txn; + + /* + * Transactions that could be a toplevel xact, ordered by LSN of the first + * record bearing that xid. + */ + dlist_head toplevel_by_lsn; + + /* + * Transactions and subtransactions that have a base snapshot, ordered by + * LSN of the record which caused us to first obtain the base snapshot. + * This is not the same as toplevel_by_lsn, because we only set the base + * snapshot on the first logical-decoding-relevant record (eg. heap + * writes), whereas the initial LSN could be set by other operations. + */ + dlist_head txns_by_base_snapshot_lsn; + + /* + * one-entry sized cache for by_txn. Very frequently the same txn gets + * looked up over and over again. + */ + TransactionId by_txn_last_xid; + ReorderBufferTXN *by_txn_last_txn; + + /* + * Callbacks to be called when a transactions commits. + */ + ReorderBufferBeginCB begin; + ReorderBufferApplyChangeCB apply_change; + ReorderBufferApplyTruncateCB apply_truncate; + ReorderBufferCommitCB commit; + ReorderBufferMessageCB message; + + /* + * Callbacks to be called when streaming a transaction at prepare time. + */ + ReorderBufferBeginCB begin_prepare; + ReorderBufferPrepareCB prepare; + ReorderBufferCommitPreparedCB commit_prepared; + ReorderBufferRollbackPreparedCB rollback_prepared; + + /* + * Callbacks to be called when streaming a transaction. + */ + ReorderBufferStreamStartCB stream_start; + ReorderBufferStreamStopCB stream_stop; + ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; + ReorderBufferStreamCommitCB stream_commit; + ReorderBufferStreamChangeCB stream_change; + ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamTruncateCB stream_truncate; + + /* + * Pointer that will be passed untouched to the callbacks. + */ + void *private_data; + + /* + * Saved output plugin option + */ + bool output_rewrites; + + /* + * Private memory context. + */ + MemoryContext context; + + /* + * Memory contexts for specific types objects + */ + MemoryContext change_context; + MemoryContext txn_context; + MemoryContext tup_context; + + XLogRecPtr current_restart_decoding_lsn; + + /* buffer for disk<->memory conversions */ + char *outbuf; + Size outbufsize; + + /* memory accounting */ + Size size; + + /* + * Statistics about transactions spilled to disk. + * + * A single transaction may be spilled repeatedly, which is why we keep + * two different counters. For spilling, the transaction counter includes + * both toplevel transactions and subtransactions. + */ + int64 spillTxns; /* number of transactions spilled to disk */ + int64 spillCount; /* spill-to-disk invocation counter */ + int64 spillBytes; /* amount of data spilled to disk */ + + /* Statistics about transactions streamed to the decoding output plugin */ + int64 streamTxns; /* number of transactions streamed */ + int64 streamCount; /* streaming invocation counter */ + int64 streamBytes; /* amount of data decoded */ + + /* + * Statistics about all the transactions sent to the decoding output + * plugin + */ + int64 totalTxns; /* total number of transactions sent */ + int64 totalBytes; /* total amount of data decoded */ +}; + + +extern ReorderBuffer *ReorderBufferAllocate(void); +extern void ReorderBufferFree(ReorderBuffer *); + +extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); +extern void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); +extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); +extern void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool); + +extern Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids); +extern void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids); + +extern void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, + XLogRecPtr lsn, ReorderBufferChange *, + bool toast_insert); +extern void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message); +extern void ReorderBufferCommit(ReorderBuffer *, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr two_phase_at, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); +extern void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); +extern void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn); +extern void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +extern void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); +extern void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +extern void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn); + +extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +extern void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); +extern void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + CommandId cid); +extern void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + RelFileNode node, ItemPointerData pt, + CommandId cmin, CommandId cmax, CommandId combocid); +extern void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, + Size nmsgs, SharedInvalidationMessage *msgs); +extern void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, + SharedInvalidationMessage *invalidations); +extern void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); + +extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); +extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); +extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); + +extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn); +extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); +extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); +extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); +extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); + +extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); + +extern void StartupReorderBuffer(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h new file mode 100644 index 0000000..deba2c4 --- /dev/null +++ b/src/include/replication/slot.h @@ -0,0 +1,230 @@ +/*------------------------------------------------------------------------- + * slot.h + * Replication slot management. + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef SLOT_H +#define SLOT_H + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "storage/condition_variable.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "replication/walreceiver.h" + +/* + * Behaviour of replication slots, upon release or crash. + * + * Slots marked as PERSISTENT are crash-safe and will not be dropped when + * released. Slots marked as EPHEMERAL will be dropped when released or after + * restarts. Slots marked TEMPORARY will be dropped at the end of a session + * or on error. + * + * EPHEMERAL is used as a not-quite-ready state when creating persistent + * slots. EPHEMERAL slots can be made PERSISTENT by calling + * ReplicationSlotPersist(). For a slot that goes away at the end of a + * session, TEMPORARY is the appropriate choice. + */ +typedef enum ReplicationSlotPersistency +{ + RS_PERSISTENT, + RS_EPHEMERAL, + RS_TEMPORARY +} ReplicationSlotPersistency; + +/* + * On-Disk data of a replication slot, preserved across restarts. + */ +typedef struct ReplicationSlotPersistentData +{ + /* The slot's identifier */ + NameData name; + + /* database the slot is active on */ + Oid database; + + /* + * The slot's behaviour when being dropped (or restored after a crash). + */ + ReplicationSlotPersistency persistency; + + /* + * xmin horizon for data + * + * NB: This may represent a value that hasn't been written to disk yet; + * see notes for effective_xmin, below. + */ + TransactionId xmin; + + /* + * xmin horizon for catalog tuples + * + * NB: This may represent a value that hasn't been written to disk yet; + * see notes for effective_xmin, below. + */ + TransactionId catalog_xmin; + + /* oldest LSN that might be required by this replication slot */ + XLogRecPtr restart_lsn; + + /* restart_lsn is copied here when the slot is invalidated */ + XLogRecPtr invalidated_at; + + /* + * Oldest LSN that the client has acked receipt for. This is used as the + * start_lsn point in case the client doesn't specify one, and also as a + * safety measure to jump forwards in case the client specifies a + * start_lsn that's further in the past than this value. + */ + XLogRecPtr confirmed_flush; + + /* + * LSN at which we enabled two_phase commit for this slot or LSN at which + * we found a consistent point at the time of slot creation. + */ + XLogRecPtr two_phase_at; + + /* + * Allow decoding of prepared transactions? + */ + bool two_phase; + + /* plugin name */ + NameData plugin; +} ReplicationSlotPersistentData; + +/* + * Shared memory state of a single replication slot. + * + * The in-memory data of replication slots follows a locking model based + * on two linked concepts: + * - A replication slot's in_use flag is switched when added or discarded using + * the LWLock ReplicationSlotControlLock, which needs to be hold in exclusive + * mode when updating the flag by the backend owning the slot and doing the + * operation, while readers (concurrent backends not owning the slot) need + * to hold it in shared mode when looking at replication slot data. + * - Individual fields are protected by mutex where only the backend owning + * the slot is authorized to update the fields from its own slot. The + * backend owning the slot does not need to take this lock when reading its + * own fields, while concurrent backends not owning this slot should take the + * lock when reading this slot's data. + */ +typedef struct ReplicationSlot +{ + /* lock, on same cacheline as effective_xmin */ + slock_t mutex; + + /* is this slot defined */ + bool in_use; + + /* Who is streaming out changes for this slot? 0 in unused slots. */ + pid_t active_pid; + + /* any outstanding modifications? */ + bool just_dirtied; + bool dirty; + + /* + * For logical decoding, it's extremely important that we never remove any + * data that's still needed for decoding purposes, even after a crash; + * otherwise, decoding will produce wrong answers. Ordinary streaming + * replication also needs to prevent old row versions from being removed + * too soon, but the worst consequence we might encounter there is + * unwanted query cancellations on the standby. Thus, for logical + * decoding, this value represents the latest xmin that has actually been + * written to disk, whereas for streaming replication, it's just the same + * as the persistent value (data.xmin). + */ + TransactionId effective_xmin; + TransactionId effective_catalog_xmin; + + /* data surviving shutdowns and crashes */ + ReplicationSlotPersistentData data; + + /* is somebody performing io on this slot? */ + LWLock io_in_progress_lock; + + /* Condition variable signaled when active_pid changes */ + ConditionVariable active_cv; + + /* all the remaining data is only used for logical slots */ + + /* + * When the client has confirmed flushes >= candidate_xmin_lsn we can + * advance the catalog xmin. When restart_valid has been passed, + * restart_lsn can be increased. + */ + TransactionId candidate_catalog_xmin; + XLogRecPtr candidate_xmin_lsn; + XLogRecPtr candidate_restart_valid; + XLogRecPtr candidate_restart_lsn; +} ReplicationSlot; + +#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) +#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid) + +/* + * Shared memory control area for all of replication slots. + */ +typedef struct ReplicationSlotCtlData +{ + /* + * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some + * reason you can't do that in an otherwise-empty struct. + */ + ReplicationSlot replication_slots[1]; +} ReplicationSlotCtlData; + +/* + * Pointers to shared memory + */ +extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl; +extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; + +/* GUCs */ +extern PGDLLIMPORT int max_replication_slots; + +/* shmem initialization functions */ +extern Size ReplicationSlotsShmemSize(void); +extern void ReplicationSlotsShmemInit(void); + +/* management of individual slots */ +extern void ReplicationSlotCreate(const char *name, bool db_specific, + ReplicationSlotPersistency p, bool two_phase); +extern void ReplicationSlotPersist(void); +extern void ReplicationSlotDrop(const char *name, bool nowait); + +extern void ReplicationSlotAcquire(const char *name, bool nowait); +extern void ReplicationSlotRelease(void); +extern void ReplicationSlotCleanup(void); +extern void ReplicationSlotSave(void); +extern void ReplicationSlotMarkDirty(void); + +/* misc stuff */ +extern void ReplicationSlotInitialize(void); +extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern void ReplicationSlotReserveWal(void); +extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); +extern void ReplicationSlotsComputeRequiredLSN(void); +extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); +extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); +extern void ReplicationSlotsDropDBSlots(Oid dboid); +extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); +extern int ReplicationSlotIndex(ReplicationSlot *slot); +extern bool ReplicationSlotName(int index, Name name); +extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); + +extern void StartupReplicationSlots(void); +extern void CheckPointReplicationSlots(void); + +extern void CheckSlotRequirements(void); +extern void CheckSlotPermissions(void); + +#endif /* SLOT_H */ diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h new file mode 100644 index 0000000..53d83f3 --- /dev/null +++ b/src/include/replication/snapbuild.h @@ -0,0 +1,97 @@ +/*------------------------------------------------------------------------- + * + * snapbuild.h + * Exports from replication/logical/snapbuild.c. + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + * src/include/replication/snapbuild.h + * + *------------------------------------------------------------------------- + */ +#ifndef SNAPBUILD_H +#define SNAPBUILD_H + +#include "access/xlogdefs.h" +#include "utils/snapmgr.h" + +typedef enum +{ + /* + * Initial state, we can't do much yet. + */ + SNAPBUILD_START = -1, + + /* + * Collecting committed transactions, to build the initial catalog + * snapshot. + */ + SNAPBUILD_BUILDING_SNAPSHOT = 0, + + /* + * We have collected enough information to decode tuples in transactions + * that started after this. + * + * Once we reached this we start to collect changes. We cannot apply them + * yet, because they might be based on transactions that were still + * running when FULL_SNAPSHOT was reached. + */ + SNAPBUILD_FULL_SNAPSHOT = 1, + + /* + * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that + * were running at that point finished. Till we reach that we hold off + * calling any commit callbacks. + */ + SNAPBUILD_CONSISTENT = 2 +} SnapBuildState; + +/* forward declare so we don't have to expose the struct to the public */ +struct SnapBuild; +typedef struct SnapBuild SnapBuild; + +/* forward declare so we don't have to include reorderbuffer.h */ +struct ReorderBuffer; + +/* forward declare so we don't have to include heapam_xlog.h */ +struct xl_heap_new_cid; +struct xl_running_xacts; + +extern void CheckPointSnapBuild(void); + +extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, + TransactionId xmin_horizon, XLogRecPtr start_lsn, + bool need_full_snapshot, + XLogRecPtr two_phase_at); +extern void FreeSnapshotBuilder(SnapBuild *cache); + +extern void SnapBuildSnapDecRefcount(Snapshot snap); + +extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder); +extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); +extern void SnapBuildClearExportedSnapshot(void); +extern void SnapBuildResetExportedSnapshotState(void); + +extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate); +extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, + TransactionId xid); + +extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); +extern XLogRecPtr SnapBuildGetTwoPhaseAt(SnapBuild *builder); +extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr); + +extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, + TransactionId xid, int nsubxacts, + TransactionId *subxacts); +extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn); +extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, + XLogRecPtr lsn, struct xl_heap_new_cid *cid); +extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, + struct xl_running_xacts *running); +extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); + +extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn); +#endif /* SNAPBUILD_H */ diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h new file mode 100644 index 0000000..4d7c90b --- /dev/null +++ b/src/include/replication/syncrep.h @@ -0,0 +1,115 @@ +/*------------------------------------------------------------------------- + * + * syncrep.h + * Exports from replication/syncrep.c. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/syncrep.h + * + *------------------------------------------------------------------------- + */ +#ifndef _SYNCREP_H +#define _SYNCREP_H + +#include "access/xlogdefs.h" +#include "utils/guc.h" + +#define SyncRepRequested() \ + (max_wal_senders > 0 && synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) + +/* SyncRepWaitMode */ +#define SYNC_REP_NO_WAIT (-1) +#define SYNC_REP_WAIT_WRITE 0 +#define SYNC_REP_WAIT_FLUSH 1 +#define SYNC_REP_WAIT_APPLY 2 + +#define NUM_SYNC_REP_WAIT_MODE 3 + +/* syncRepState */ +#define SYNC_REP_NOT_WAITING 0 +#define SYNC_REP_WAITING 1 +#define SYNC_REP_WAIT_COMPLETE 2 + +/* syncrep_method of SyncRepConfigData */ +#define SYNC_REP_PRIORITY 0 +#define SYNC_REP_QUORUM 1 + +/* + * SyncRepGetCandidateStandbys returns an array of these structs, + * one per candidate synchronous walsender. + */ +typedef struct SyncRepStandbyData +{ + /* Copies of relevant fields from WalSnd shared-memory struct */ + pid_t pid; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + int sync_standby_priority; + /* Index of this walsender in the WalSnd shared-memory array */ + int walsnd_index; + /* This flag indicates whether this struct is about our own process */ + bool is_me; +} SyncRepStandbyData; + +/* + * Struct for the configuration of synchronous replication. + * + * Note: this must be a flat representation that can be held in a single + * chunk of malloc'd memory, so that it can be stored as the "extra" data + * for the synchronous_standby_names GUC. + */ +typedef struct SyncRepConfigData +{ + int config_size; /* total size of this struct, in bytes */ + int num_sync; /* number of sync standbys that we need to + * wait for */ + uint8 syncrep_method; /* method to choose sync standbys */ + int nmembers; /* number of members in the following list */ + /* member_names contains nmembers consecutive nul-terminated C strings */ + char member_names[FLEXIBLE_ARRAY_MEMBER]; +} SyncRepConfigData; + +extern PGDLLIMPORT SyncRepConfigData *SyncRepConfig; + +/* communication variables for parsing synchronous_standby_names GUC */ +extern PGDLLIMPORT SyncRepConfigData *syncrep_parse_result; +extern PGDLLIMPORT char *syncrep_parse_error_msg; + +/* user-settable parameters for synchronous replication */ +extern PGDLLIMPORT char *SyncRepStandbyNames; + +/* called by user backend */ +extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); + +/* called at backend exit */ +extern void SyncRepCleanupAtProcExit(void); + +/* called by wal sender */ +extern void SyncRepInitConfig(void); +extern void SyncRepReleaseWaiters(void); + +/* called by wal sender and user backend */ +extern int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys); + +/* called by checkpointer */ +extern void SyncRepUpdateSyncStandbysDefined(void); + +/* GUC infrastructure */ +extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); +extern void assign_synchronous_standby_names(const char *newval, void *extra); +extern void assign_synchronous_commit(int newval, void *extra); + +/* + * Internal functions for parsing synchronous_standby_names grammar, + * in syncrep_gram.y and syncrep_scanner.l + */ +extern int syncrep_yyparse(void); +extern int syncrep_yylex(void); +extern void syncrep_yyerror(const char *str); +extern void syncrep_scanner_init(const char *query_string); +extern void syncrep_scanner_finish(void); + +#endif /* _SYNCREP_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h new file mode 100644 index 0000000..81184aa --- /dev/null +++ b/src/include/replication/walreceiver.h @@ -0,0 +1,472 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.h + * Exports from replication/walreceiverfuncs.c. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/include/replication/walreceiver.h + * + *------------------------------------------------------------------------- + */ +#ifndef _WALRECEIVER_H +#define _WALRECEIVER_H + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "getaddrinfo.h" /* for NI_MAXHOST */ +#include "pgtime.h" +#include "port/atomics.h" +#include "replication/logicalproto.h" +#include "replication/walsender.h" +#include "storage/condition_variable.h" +#include "storage/latch.h" +#include "storage/spin.h" +#include "utils/tuplestore.h" + +/* user-settable parameters */ +extern PGDLLIMPORT int wal_receiver_status_interval; +extern PGDLLIMPORT int wal_receiver_timeout; +extern PGDLLIMPORT bool hot_standby_feedback; + +/* + * MAXCONNINFO: maximum size of a connection string. + * + * XXX: Should this move to pg_config_manual.h? + */ +#define MAXCONNINFO 1024 + +/* Can we allow the standby to accept replication connection from another standby? */ +#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0) + +/* + * Values for WalRcv->walRcvState. + */ +typedef enum +{ + WALRCV_STOPPED, /* stopped and mustn't start up again */ + WALRCV_STARTING, /* launched, but the process hasn't + * initialized yet */ + WALRCV_STREAMING, /* walreceiver is streaming */ + WALRCV_WAITING, /* stopped streaming, waiting for orders */ + WALRCV_RESTARTING, /* asked to restart streaming */ + WALRCV_STOPPING /* requested to stop, but still running */ +} WalRcvState; + +/* Shared memory area for management of walreceiver process */ +typedef struct +{ + /* + * PID of currently active walreceiver process, its current state and + * start time (actually, the time at which it was requested to be + * started). + */ + pid_t pid; + WalRcvState walRcvState; + ConditionVariable walRcvStoppedCV; + pg_time_t startTime; + + /* + * receiveStart and receiveStartTLI indicate the first byte position and + * timeline that will be received. When startup process starts the + * walreceiver, it sets these to the point where it wants the streaming to + * begin. + */ + XLogRecPtr receiveStart; + TimeLineID receiveStartTLI; + + /* + * flushedUpto-1 is the last byte position that has already been received, + * and receivedTLI is the timeline it came from. At the first startup of + * walreceiver, these are set to receiveStart and receiveStartTLI. After + * that, walreceiver updates these whenever it flushes the received WAL to + * disk. + */ + XLogRecPtr flushedUpto; + TimeLineID receivedTLI; + + /* + * latestChunkStart is the starting byte position of the current "batch" + * of received WAL. It's actually the same as the previous value of + * flushedUpto before the last flush to disk. Startup process can use + * this to detect whether it's keeping up or not. + */ + XLogRecPtr latestChunkStart; + + /* + * Time of send and receive of any message received. + */ + TimestampTz lastMsgSendTime; + TimestampTz lastMsgReceiptTime; + + /* + * Latest reported end of WAL on the sender + */ + XLogRecPtr latestWalEnd; + TimestampTz latestWalEndTime; + + /* + * connection string; initially set to connect to the primary, and later + * clobbered to hide security-sensitive fields. + */ + char conninfo[MAXCONNINFO]; + + /* + * Host name (this can be a host name, an IP address, or a directory path) + * and port number of the active replication connection. + */ + char sender_host[NI_MAXHOST]; + int sender_port; + + /* + * replication slot name; is also used for walreceiver to connect with the + * primary + */ + char slotname[NAMEDATALEN]; + + /* + * If it's a temporary replication slot, it needs to be recreated when + * connecting. + */ + bool is_temp_slot; + + /* set true once conninfo is ready to display (obfuscated pwds etc) */ + bool ready_to_display; + + /* + * Latch used by startup process to wake up walreceiver after telling it + * where to start streaming (after setting receiveStart and + * receiveStartTLI), and also to tell it to send apply feedback to the + * primary whenever specially marked commit records are applied. This is + * normally mapped to procLatch when walreceiver is running. + */ + Latch *latch; + + slock_t mutex; /* locks shared variables shown above */ + + /* + * Like flushedUpto, but advanced after writing and before flushing, + * without the need to acquire the spin lock. Data can be read by another + * process up to this point, but shouldn't be used for data integrity + * purposes. + */ + pg_atomic_uint64 writtenUpto; + + /* + * force walreceiver reply? This doesn't need to be locked; memory + * barriers for ordering are sufficient. But we do need atomic fetch and + * store semantics, so use sig_atomic_t. + */ + sig_atomic_t force_reply; /* used as a bool */ +} WalRcvData; + +extern PGDLLIMPORT WalRcvData *WalRcv; + +typedef struct +{ + bool logical; /* True if this is logical replication stream, + * false if physical stream. */ + char *slotname; /* Name of the replication slot or NULL. */ + XLogRecPtr startpoint; /* LSN of starting point. */ + + union + { + struct + { + TimeLineID startpointTLI; /* Starting timeline */ + } physical; + struct + { + uint32 proto_version; /* Logical protocol version */ + List *publication_names; /* String list of publications */ + bool binary; /* Ask publisher to use binary */ + bool streaming; /* Streaming of large transactions */ + bool twophase; /* Streaming of two-phase transactions at + * prepare time */ + } logical; + } proto; +} WalRcvStreamOptions; + +struct WalReceiverConn; +typedef struct WalReceiverConn WalReceiverConn; + +/* + * Status of walreceiver query execution. + * + * We only define statuses that are currently used. + */ +typedef enum +{ + WALRCV_ERROR, /* There was error when executing the query. */ + WALRCV_OK_COMMAND, /* Query executed utility or replication + * command. */ + WALRCV_OK_TUPLES, /* Query returned tuples. */ + WALRCV_OK_COPY_IN, /* Query started COPY FROM. */ + WALRCV_OK_COPY_OUT, /* Query started COPY TO. */ + WALRCV_OK_COPY_BOTH /* Query started COPY BOTH replication + * protocol. */ +} WalRcvExecStatus; + +/* + * Return value for walrcv_exec, returns the status of the execution and + * tuples if any. + */ +typedef struct WalRcvExecResult +{ + WalRcvExecStatus status; + int sqlstate; + char *err; + Tuplestorestate *tuplestore; + TupleDesc tupledesc; +} WalRcvExecResult; + +/* WAL receiver - libpqwalreceiver hooks */ + +/* + * walrcv_connect_fn + * + * Establish connection to a cluster. 'logical' is true if the + * connection is logical, and false if the connection is physical. + * 'appname' is a name associated to the connection, to use for example + * with fallback_application_name or application_name. Returns the + * details about the connection established, as defined by + * WalReceiverConn for each WAL receiver module. On error, NULL is + * returned with 'err' including the error generated. + */ +typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool logical, + const char *appname, + char **err); + +/* + * walrcv_check_conninfo_fn + * + * Parse and validate the connection string given as of 'conninfo'. + */ +typedef void (*walrcv_check_conninfo_fn) (const char *conninfo); + +/* + * walrcv_get_conninfo_fn + * + * Returns a user-displayable conninfo string. Note that any + * security-sensitive fields should be obfuscated. + */ +typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn); + +/* + * walrcv_get_senderinfo_fn + * + * Provide information of the WAL sender this WAL receiver is connected + * to, as of 'sender_host' for the host of the sender and 'sender_port' + * for its port. + */ +typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, + char **sender_host, + int *sender_port); + +/* + * walrcv_identify_system_fn + * + * Run IDENTIFY_SYSTEM on the cluster connected to and validate the + * identity of the cluster. Returns the system ID of the cluster + * connected to. 'primary_tli' is the timeline ID of the sender. + */ +typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, + TimeLineID *primary_tli); + +/* + * walrcv_server_version_fn + * + * Returns the version number of the cluster connected to. + */ +typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn); + +/* + * walrcv_readtimelinehistoryfile_fn + * + * Fetch from cluster the timeline history file for timeline 'tli'. + * Returns the name of the timeline history file as of 'filename', its + * contents as of 'content' and its 'size'. + */ +typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn, + TimeLineID tli, + char **filename, + char **content, + int *size); + +/* + * walrcv_startstreaming_fn + * + * Start streaming WAL data from given streaming options. Returns true + * if the connection has switched successfully to copy-both mode and false + * if the server received the command and executed it successfully, but + * didn't switch to copy-mode. + */ +typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn, + const WalRcvStreamOptions *options); + +/* + * walrcv_endstreaming_fn + * + * Stop streaming of WAL data. Returns the next timeline ID of the cluster + * connected to in 'next_tli', or 0 if there was no report. + */ +typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn, + TimeLineID *next_tli); + +/* + * walrcv_receive_fn + * + * Receive a message available from the WAL stream. 'buffer' is a pointer + * to a buffer holding the message received. Returns the length of the data, + * 0 if no data is available yet ('wait_fd' is a socket descriptor which can + * be waited on before a retry), and -1 if the cluster ended the COPY. + */ +typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, + char **buffer, + pgsocket *wait_fd); + +/* + * walrcv_send_fn + * + * Send a message of size 'nbytes' to the WAL stream with 'buffer' as + * contents. + */ +typedef void (*walrcv_send_fn) (WalReceiverConn *conn, + const char *buffer, + int nbytes); + +/* + * walrcv_create_slot_fn + * + * Create a new replication slot named 'slotname'. 'temporary' defines + * if the slot is temporary. 'snapshot_action' defines the behavior wanted + * for an exported snapshot (see replication protocol for more details). + * 'lsn' includes the LSN position at which the created slot became + * consistent. Returns the name of the exported snapshot for a logical + * slot, or NULL for a physical slot. + */ +typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool temporary, + bool two_phase, + CRSSnapshotAction snapshot_action, + XLogRecPtr *lsn); + +/* + * walrcv_get_backend_pid_fn + * + * Returns the PID of the remote backend process. + */ +typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn); + +/* + * walrcv_exec_fn + * + * Send generic queries (and commands) to the remote cluster. 'nRetTypes' + * is the expected number of returned attributes, and 'retTypes' an array + * including their type OIDs. Returns the status of the execution and + * tuples if any. + */ +typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, + const char *query, + const int nRetTypes, + const Oid *retTypes); + +/* + * walrcv_disconnect_fn + * + * Disconnect with the cluster. + */ +typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); + +typedef struct WalReceiverFunctionsType +{ + walrcv_connect_fn walrcv_connect; + walrcv_check_conninfo_fn walrcv_check_conninfo; + walrcv_get_conninfo_fn walrcv_get_conninfo; + walrcv_get_senderinfo_fn walrcv_get_senderinfo; + walrcv_identify_system_fn walrcv_identify_system; + walrcv_server_version_fn walrcv_server_version; + walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; + walrcv_startstreaming_fn walrcv_startstreaming; + walrcv_endstreaming_fn walrcv_endstreaming; + walrcv_receive_fn walrcv_receive; + walrcv_send_fn walrcv_send; + walrcv_create_slot_fn walrcv_create_slot; + walrcv_get_backend_pid_fn walrcv_get_backend_pid; + walrcv_exec_fn walrcv_exec; + walrcv_disconnect_fn walrcv_disconnect; +} WalReceiverFunctionsType; + +extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; + +#define walrcv_connect(conninfo, logical, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err) +#define walrcv_check_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_check_conninfo(conninfo) +#define walrcv_get_conninfo(conn) \ + WalReceiverFunctions->walrcv_get_conninfo(conn) +#define walrcv_get_senderinfo(conn, sender_host, sender_port) \ + WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) +#define walrcv_identify_system(conn, primary_tli) \ + WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_server_version(conn) \ + WalReceiverFunctions->walrcv_server_version(conn) +#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ + WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) +#define walrcv_startstreaming(conn, options) \ + WalReceiverFunctions->walrcv_startstreaming(conn, options) +#define walrcv_endstreaming(conn, next_tli) \ + WalReceiverFunctions->walrcv_endstreaming(conn, next_tli) +#define walrcv_receive(conn, buffer, wait_fd) \ + WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) +#define walrcv_send(conn, buffer, nbytes) \ + WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_get_backend_pid(conn) \ + WalReceiverFunctions->walrcv_get_backend_pid(conn) +#define walrcv_exec(conn, exec, nRetTypes, retTypes) \ + WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) +#define walrcv_disconnect(conn) \ + WalReceiverFunctions->walrcv_disconnect(conn) + +static inline void +walrcv_clear_result(WalRcvExecResult *walres) +{ + if (!walres) + return; + + if (walres->err) + pfree(walres->err); + + if (walres->tuplestore) + tuplestore_end(walres->tuplestore); + + if (walres->tupledesc) + FreeTupleDesc(walres->tupledesc); + + pfree(walres); +} + +/* prototypes for functions in walreceiver.c */ +extern void WalReceiverMain(void) pg_attribute_noreturn(); +extern void ProcessWalRcvInterrupts(void); + +/* prototypes for functions in walreceiverfuncs.c */ +extern Size WalRcvShmemSize(void); +extern void WalRcvShmemInit(void); +extern void ShutdownWalRcv(void); +extern bool WalRcvStreaming(void); +extern bool WalRcvRunning(void); +extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, + const char *conninfo, const char *slotname, + bool create_temp_slot); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); +extern int GetReplicationApplyDelay(void); +extern int GetReplicationTransferLatency(void); +extern void WalRcvForceReply(void); + +#endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h new file mode 100644 index 0000000..d99a21b --- /dev/null +++ b/src/include/replication/walsender.h @@ -0,0 +1,74 @@ +/*------------------------------------------------------------------------- + * + * walsender.h + * Exports from replication/walsender.c. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/include/replication/walsender.h + * + *------------------------------------------------------------------------- + */ +#ifndef _WALSENDER_H +#define _WALSENDER_H + +#include <signal.h> + +/* + * What to do with a snapshot in create replication slot command. + */ +typedef enum +{ + CRS_EXPORT_SNAPSHOT, + CRS_NOEXPORT_SNAPSHOT, + CRS_USE_SNAPSHOT +} CRSSnapshotAction; + +/* global state */ +extern PGDLLIMPORT bool am_walsender; +extern PGDLLIMPORT bool am_cascading_walsender; +extern PGDLLIMPORT bool am_db_walsender; +extern PGDLLIMPORT bool wake_wal_senders; + +/* user-settable parameters */ +extern PGDLLIMPORT int max_wal_senders; +extern PGDLLIMPORT int wal_sender_timeout; +extern PGDLLIMPORT bool log_replication_commands; + +extern void InitWalSender(void); +extern bool exec_replication_command(const char *query_string); +extern void WalSndErrorCleanup(void); +extern void WalSndResourceCleanup(bool isCommit); +extern void WalSndSignals(void); +extern Size WalSndShmemSize(void); +extern void WalSndShmemInit(void); +extern void WalSndWakeup(void); +extern void WalSndInitStopping(void); +extern void WalSndWaitStopping(void); +extern void HandleWalSndInitStopping(void); +extern void WalSndRqstFileReload(void); + +/* + * Remember that we want to wakeup walsenders later + * + * This is separated from doing the actual wakeup because the writeout is done + * while holding contended locks. + */ +#define WalSndWakeupRequest() \ + do { wake_wal_senders = true; } while (0) + +/* + * wakeup walsenders if there is work to be done + */ +#define WalSndWakeupProcessRequests() \ + do \ + { \ + if (wake_wal_senders) \ + { \ + wake_wal_senders = false; \ + if (max_wal_senders > 0) \ + WalSndWakeup(); \ + } \ + } while (0) + +#endif /* _WALSENDER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h new file mode 100644 index 0000000..c14888e --- /dev/null +++ b/src/include/replication/walsender_private.h @@ -0,0 +1,128 @@ +/*------------------------------------------------------------------------- + * + * walsender_private.h + * Private definitions from replication/walsender.c. + * + * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group + * + * src/include/replication/walsender_private.h + * + *------------------------------------------------------------------------- + */ +#ifndef _WALSENDER_PRIVATE_H +#define _WALSENDER_PRIVATE_H + +#include "access/xlog.h" +#include "nodes/nodes.h" +#include "replication/syncrep.h" +#include "storage/latch.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +typedef enum WalSndState +{ + WALSNDSTATE_STARTUP = 0, + WALSNDSTATE_BACKUP, + WALSNDSTATE_CATCHUP, + WALSNDSTATE_STREAMING, + WALSNDSTATE_STOPPING +} WalSndState; + +/* + * Each walsender has a WalSnd struct in shared memory. + * + * This struct is protected by its 'mutex' spinlock field, except that some + * members are only written by the walsender process itself, and thus that + * process is free to read those members without holding spinlock. pid and + * needreload always require the spinlock to be held for all accesses. + */ +typedef struct WalSnd +{ + pid_t pid; /* this walsender's PID, or 0 if not active */ + + WalSndState state; /* this walsender's state */ + XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + bool needreload; /* does currently-open file need to be + * reloaded? */ + + /* + * The xlog locations that have been written, flushed, and applied by + * standby-side. These may be invalid if the standby-side has not offered + * values yet. + */ + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; + + /* Measured lag times, or -1 for unknown/none. */ + TimeOffset writeLag; + TimeOffset flushLag; + TimeOffset applyLag; + + /* + * The priority order of the standby managed by this WALSender, as listed + * in synchronous_standby_names, or 0 if not-listed. + */ + int sync_standby_priority; + + /* Protects shared variables shown above. */ + slock_t mutex; + + /* + * Pointer to the walsender's latch. Used by backends to wake up this + * walsender when it has work to do. NULL if the walsender isn't active. + */ + Latch *latch; + + /* + * Timestamp of the last message received from standby. + */ + TimestampTz replyTime; +} WalSnd; + +extern PGDLLIMPORT WalSnd *MyWalSnd; + +/* There is one WalSndCtl struct for the whole database cluster */ +typedef struct +{ + /* + * Synchronous replication queue with one queue per request type. + * Protected by SyncRepLock. + */ + SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; + + /* + * Current location of the head of the queue. All waiters should have a + * waitLSN that follows this value. Protected by SyncRepLock. + */ + XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; + + /* + * Are any sync standbys defined? Waiting backends can't reload the + * config file safely, so checkpointer updates this value as needed. + * Protected by SyncRepLock. + */ + bool sync_standbys_defined; + + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; +} WalSndCtlData; + +extern PGDLLIMPORT WalSndCtlData *WalSndCtl; + + +extern void WalSndSetState(WalSndState state); + +/* + * Internal functions for parsing the replication grammar, in repl_gram.y and + * repl_scanner.l + */ +extern int replication_yyparse(void); +extern int replication_yylex(void); +extern void replication_yyerror(const char *str) pg_attribute_noreturn(); +extern void replication_scanner_init(const char *query_string); +extern void replication_scanner_finish(void); +extern bool replication_scanner_is_replication_command(void); + +extern PGDLLIMPORT Node *replication_parse_result; + +#endif /* _WALSENDER_PRIVATE_H */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h new file mode 100644 index 0000000..901845a --- /dev/null +++ b/src/include/replication/worker_internal.h @@ -0,0 +1,112 @@ +/*------------------------------------------------------------------------- + * + * worker_internal.h + * Internal headers shared by logical replication workers. + * + * Portions Copyright (c) 2016-2022, PostgreSQL Global Development Group + * + * src/include/replication/worker_internal.h + * + *------------------------------------------------------------------------- + */ +#ifndef WORKER_INTERNAL_H +#define WORKER_INTERNAL_H + +#include <signal.h> + +#include "access/xlogdefs.h" +#include "catalog/pg_subscription.h" +#include "datatype/timestamp.h" +#include "storage/fileset.h" +#include "storage/lock.h" +#include "storage/spin.h" + + +typedef struct LogicalRepWorker +{ + /* Time at which this worker was launched. */ + TimestampTz launch_time; + + /* Indicates if this slot is used or free. */ + bool in_use; + + /* Increased every time the slot is taken by new worker. */ + uint16 generation; + + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* Database id to connect to. */ + Oid dbid; + + /* User to use for connection (will be same as owner of subscription). */ + Oid userid; + + /* Subscription id for the worker. */ + Oid subid; + + /* Used for initial table synchronization. */ + Oid relid; + char relstate; + XLogRecPtr relstate_lsn; + slock_t relmutex; + + /* + * Used to create the changes and subxact files for the streaming + * transactions. Upon the arrival of the first streaming transaction, the + * fileset will be initialized, and it will be deleted when the worker + * exits. Under this, separate buffiles would be created for each + * transaction which will be deleted after the transaction is finished. + */ + FileSet *stream_fileset; + + /* Stats. */ + XLogRecPtr last_lsn; + TimestampTz last_send_time; + TimestampTz last_recv_time; + XLogRecPtr reply_lsn; + TimestampTz reply_time; +} LogicalRepWorker; + +/* Main memory context for apply worker. Permanent during worker lifetime. */ +extern PGDLLIMPORT MemoryContext ApplyContext; + +/* libpqreceiver connection */ +extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; + +/* Worker and subscription objects. */ +extern PGDLLIMPORT Subscription *MySubscription; +extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; + +extern PGDLLIMPORT bool in_remote_transaction; + +extern void logicalrep_worker_attach(int slot); +extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + bool only_running); +extern List *logicalrep_workers_find(Oid subid, bool only_running); +extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, + Oid userid, Oid relid); +extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); + +extern int logicalrep_sync_worker_count(Oid subid); + +extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, + char *originname, int szorgname); +extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); + +extern bool AllTablesyncsReady(void); +extern void UpdateTwoPhaseState(Oid suboid, char new_state); + +extern void process_syncing_tables(XLogRecPtr current_lsn); +extern void invalidate_syncing_table_states(Datum arg, int cacheid, + uint32 hashvalue); + +static inline bool +am_tablesync_worker(void) +{ + return OidIsValid(MyLogicalRepWorker->relid); +} + +#endif /* WORKER_INTERNAL_H */ |