summaryrefslogtreecommitdiffstats
path: root/src/include/replication/output_plugin.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/include/replication/output_plugin.h248
1 files changed, 248 insertions, 0 deletions
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
new file mode 100644
index 0000000..539dc8e
--- /dev/null
+++ b/src/include/replication/output_plugin.h
@@ -0,0 +1,248 @@
+/*-------------------------------------------------------------------------
+ * output_plugin.h
+ * PostgreSQL Logical Decode Plugin Interface
+ *
+ * Copyright (c) 2012-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef OUTPUT_PLUGIN_H
+#define OUTPUT_PLUGIN_H
+
+#include "replication/reorderbuffer.h"
+
+struct LogicalDecodingContext;
+struct OutputPluginCallbacks;
+
+typedef enum OutputPluginOutputType
+{
+ OUTPUT_PLUGIN_BINARY_OUTPUT,
+ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
+} OutputPluginOutputType;
+
+/*
+ * Options set by the output plugin, in the startup callback.
+ */
+typedef struct OutputPluginOptions
+{
+ OutputPluginOutputType output_type;
+ bool receive_rewrites;
+} OutputPluginOptions;
+
+/*
+ * Type of the shared library symbol _PG_output_plugin_init that is looked up
+ * when loading an output plugin shared library.
+ */
+typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
+
+/*
+ * Callback that gets called in a user-defined plugin. ctx->private_data can
+ * be set to some private data.
+ *
+ * "is_init" will be set to "true" if the decoding slot just got defined. When
+ * the same slot is used from there one, it will be "false".
+ */
+typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
+ OutputPluginOptions *options,
+ bool is_init);
+
+/*
+ * Callback called for every (explicit or implicit) BEGIN of a successful
+ * transaction.
+ */
+typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Callback for every individual change in a successful transaction.
+ */
+typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for every TRUNCATE in a successful transaction.
+ */
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
+ * Called for every (explicit or implicit) COMMIT of a successful transaction.
+ */
+typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
+
+/*
+ * Called to shutdown an output plugin.
+ */
+typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
+
+/*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare and
+ * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
+ const char *gid);
+
+/*
+ * Callback called for every BEGIN of a prepared trnsaction.
+ */
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+
+
+/*
+ * Called when starting to stream a block of changes from in-progress
+ * transaction (may be called repeatedly, if it's streamed in multiple
+ * chunks).
+ */
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called when stopping to stream a block of changes from in-progress
+ * transaction to a remote node (may be called repeatedly, if it's streamed
+ * in multiple chunks).
+ */
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+
+/*
+ * Called to discard changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+
+/*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+
+/*
+ * Called to apply changes streamed to remote node from in-progress
+ * transaction.
+ */
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+
+/*
+ * Callback for streaming individual changes from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+
+/*
+ * Callback for streaming generic logical decoding messages from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
+ * Callback for streaming truncates from in-progress transactions.
+ */
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+
+/*
+ * Output plugin callbacks
+ */
+typedef struct OutputPluginCallbacks
+{
+ LogicalDecodeStartupCB startup_cb;
+ LogicalDecodeBeginCB begin_cb;
+ LogicalDecodeChangeCB change_cb;
+ LogicalDecodeTruncateCB truncate_cb;
+ LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeShutdownCB shutdown_cb;
+
+ /* streaming of changes at prepare time */
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+
+ /* streaming of changes */
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
+} OutputPluginCallbacks;
+
+/* Functions in replication/logical/logical.c */
+extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
+
+#endif /* OUTPUT_PLUGIN_H */