diff options
Diffstat (limited to '')
-rw-r--r-- | src/include/replication/logical.h | 147 |
1 files changed, 147 insertions, 0 deletions
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h new file mode 100644 index 0000000..edadacd --- /dev/null +++ b/src/include/replication/logical.h @@ -0,0 +1,147 @@ +/*------------------------------------------------------------------------- + * logical.h + * PostgreSQL logical decoding coordination + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICAL_H +#define LOGICAL_H + +#include "access/xlog.h" +#include "access/xlogreader.h" +#include "replication/output_plugin.h" +#include "replication/slot.h" + +struct LogicalDecodingContext; + +typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool last_write +); + +typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; + +typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr, + XLogRecPtr Ptr, + TransactionId xid, + bool skipped_xact +); + +typedef struct LogicalDecodingContext +{ + /* memory context this is all allocated in */ + MemoryContext context; + + /* The associated replication slot */ + ReplicationSlot *slot; + + /* infrastructure pieces for decoding */ + XLogReaderState *reader; + struct ReorderBuffer *reorder; + struct SnapBuild *snapshot_builder; + + /* + * Marks the logical decoding context as fast forward decoding one. Such a + * context does not have plugin loaded so most of the following properties + * are unused. + */ + bool fast_forward; + + OutputPluginCallbacks callbacks; + OutputPluginOptions options; + + /* + * User specified options + */ + List *output_plugin_options; + + /* + * User-Provided callback for writing/streaming out data. + */ + LogicalOutputPluginWriterPrepareWrite prepare_write; + LogicalOutputPluginWriterWrite write; + LogicalOutputPluginWriterUpdateProgress update_progress; + + /* + * Output buffer. + */ + StringInfo out; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; + + /* + * Private data pointer for the data writer. + */ + void *output_writer_private; + + /* + * Does the output plugin support streaming, and is it enabled? + */ + bool streaming; + + /* + * Does the output plugin support two-phase decoding, and is it enabled? + */ + bool twophase; + + /* + * Is two-phase option given by output plugin? + * + * This flag indicates that the plugin passed in the two-phase option as + * part of the START_STREAMING command. We can't rely solely on the + * twophase flag which only tells whether the plugin provided all the + * necessary two-phase callbacks. + */ + bool twophase_opt_given; + + /* + * State for writing output. + */ + bool accept_writes; + bool prepared_write; + XLogRecPtr write_location; + TransactionId write_xid; + /* Are we processing the end LSN of a transaction? */ + bool end_xact; +} LogicalDecodingContext; + + +extern void CheckLogicalDecodingRequirements(void); + +extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, + List *output_plugin_options, + bool need_full_snapshot, + XLogRecPtr restart_lsn, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); +extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, + List *output_plugin_options, + bool fast_forward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); +extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); +extern bool DecodingContextReady(LogicalDecodingContext *ctx); +extern void FreeDecodingContext(LogicalDecodingContext *ctx); + +extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); +extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, + XLogRecPtr restart_lsn); +extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); + +extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, + TransactionId xid, const char *gid); +extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern void ResetLogicalStreamingState(void); +extern void UpdateDecodingStats(LogicalDecodingContext *ctx); + +#endif |