summaryrefslogtreecommitdiffstats
path: root/doc/src/sgml/logicaldecoding.sgml
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /doc/src/sgml/logicaldecoding.sgml
parentInitial commit. (diff)
downloadpostgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz
postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'doc/src/sgml/logicaldecoding.sgml')
-rw-r--r--doc/src/sgml/logicaldecoding.sgml1327
1 files changed, 1327 insertions, 0 deletions
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
new file mode 100644
index 0000000..7805dd4
--- /dev/null
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -0,0 +1,1327 @@
+<!-- doc/src/sgml/logicaldecoding.sgml -->
+ <chapter id="logicaldecoding">
+ <title>Logical Decoding</title>
+ <indexterm zone="logicaldecoding">
+ <primary>Logical Decoding</primary>
+ </indexterm>
+ <para>
+ PostgreSQL provides infrastructure to stream the modifications performed
+ via SQL to external consumers. This functionality can be used for a
+ variety of purposes, including replication solutions and auditing.
+ </para>
+
+ <para>
+ Changes are sent out in streams identified by logical replication slots.
+ </para>
+
+ <para>
+ The format in which those changes are streamed is determined by the output
+ plugin used. An example plugin is provided in the PostgreSQL distribution.
+ Additional plugins can be
+ written to extend the choice of available formats without modifying any
+ core code.
+ Every output plugin has access to each individual new row produced
+ by <command>INSERT</command> and the new row version created
+ by <command>UPDATE</command>. Availability of old row versions for
+ <command>UPDATE</command> and <command>DELETE</command> depends on
+ the configured replica identity (see <xref linkend="sql-altertable-replica-identity"/>).
+ </para>
+
+ <para>
+ Changes can be consumed either using the streaming replication protocol
+ (see <xref linkend="protocol-replication"/> and
+ <xref linkend="logicaldecoding-walsender"/>), or by calling functions
+ via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
+ to write additional methods of consuming the output of a replication slot
+ without modifying core code
+ (see <xref linkend="logicaldecoding-writer"/>).
+ </para>
+
+ <sect1 id="logicaldecoding-example">
+ <title>Logical Decoding Examples</title>
+
+ <para>
+ The following example demonstrates controlling logical decoding using the
+ SQL interface.
+ </para>
+
+ <para>
+ Before you can use logical decoding, you must set
+ <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
+ <xref linkend="guc-max-replication-slots"/> to at least 1. Then, you
+ should connect to the target database (in the example
+ below, <literal>postgres</literal>) as a superuser.
+ </para>
+
+<programlisting>
+postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
+ slot_name | lsn
+-----------------+-----------
+ regression_slot | 0/16B1970
+(1 row)
+
+postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
+ slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
+-----------------+---------------+-----------+----------+--------+-------------+-----------------
+ regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
+(1 row)
+
+postgres=# -- There are no changes to see yet
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----+-----+------
+(0 rows)
+
+postgres=# CREATE TABLE data(id serial primary key, data text);
+CREATE TABLE
+
+postgres=# -- DDL isn't replicated, so all you'll see is the transaction
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-------+--------------
+ 0/BA2DA58 | 10297 | BEGIN 10297
+ 0/BA5A5A0 | 10297 | COMMIT 10297
+(2 rows)
+
+postgres=# -- Once changes are read, they're consumed and not emitted
+postgres=# -- in a subsequent call:
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----+-----+------
+(0 rows)
+
+postgres=# BEGIN;
+postgres=*# INSERT INTO data(data) VALUES('1');
+postgres=*# INSERT INTO data(data) VALUES('2');
+postgres=*# COMMIT;
+
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-------+---------------------------------------------------------
+ 0/BA5A688 | 10298 | BEGIN 10298
+ 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
+ 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
+ 0/BA5A8A8 | 10298 | COMMIT 10298
+(4 rows)
+
+postgres=# INSERT INTO data(data) VALUES('3');
+
+postgres=# -- You can also peek ahead in the change stream without consuming changes
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-------+---------------------------------------------------------
+ 0/BA5A8E0 | 10299 | BEGIN 10299
+ 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
+ 0/BA5A990 | 10299 | COMMIT 10299
+(3 rows)
+
+postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-------+---------------------------------------------------------
+ 0/BA5A8E0 | 10299 | BEGIN 10299
+ 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
+ 0/BA5A990 | 10299 | COMMIT 10299
+(3 rows)
+
+postgres=# -- options can be passed to output plugin, to influence the formatting
+postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
+ lsn | xid | data
+-----------+-------+---------------------------------------------------------
+ 0/BA5A8E0 | 10299 | BEGIN 10299
+ 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
+ 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
+(3 rows)
+
+postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
+postgres=# -- server resources:
+postgres=# SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+-----------------------
+
+(1 row)
+</programlisting>
+
+ <para>
+ The following example shows how logical decoding is controlled over the
+ streaming replication protocol, using the
+ program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
+ distribution. This requires that client authentication is set up to allow
+ replication connections
+ (see <xref linkend="streaming-replication-authentication"/>) and
+ that <varname>max_wal_senders</varname> is set sufficiently high to allow
+ an additional connection.
+ </para>
+<programlisting>
+$ pg_recvlogical -d postgres --slot=test --create-slot
+$ pg_recvlogical -d postgres --slot=test --start -f -
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
+$ fg
+BEGIN 693
+table public.data: INSERT: id[integer]:4 data[text]:'4'
+COMMIT 693
+<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
+$ pg_recvlogical -d postgres --slot=test --drop-slot
+</programlisting>
+
+ <para>
+ The following example shows SQL interface that can be used to decode prepared
+ transactions. Before you use two-phase commit commands, you must set
+ <varname>max_prepared_transactions</varname> to at least 1. You must also have
+ set the two-phase parameter as 'true' while creating the slot using
+ <function>pg_create_logical_replication_slot</function>
+ Note that we will stream the entire transaction after the commit if it
+ is not already decoded.
+ </para>
+<programlisting>
+postgres=# BEGIN;
+postgres=*# INSERT INTO data(data) VALUES('5');
+postgres=*# PREPARE TRANSACTION 'test_prepared1';
+
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-----+---------------------------------------------------------
+ 0/1689DC0 | 529 | BEGIN 529
+ 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
+ 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
+(3 rows)
+
+postgres=# COMMIT PREPARED 'test_prepared1';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-----+--------------------------------------------
+ 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
+(4 row)
+
+postgres=#-- you can also rollback a prepared transaction
+postgres=# BEGIN;
+postgres=*# INSERT INTO data(data) VALUES('6');
+postgres=*# PREPARE TRANSACTION 'test_prepared2';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-----+---------------------------------------------------------
+ 0/168A180 | 530 | BEGIN 530
+ 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
+ 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
+(3 rows)
+
+postgres=# ROLLBACK PREPARED 'test_prepared2';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
+ lsn | xid | data
+-----------+-----+----------------------------------------------
+ 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
+(1 row)
+</programlisting>
+</sect1>
+
+ <sect1 id="logicaldecoding-explanation">
+ <title>Logical Decoding Concepts</title>
+ <sect2>
+ <title>Logical Decoding</title>
+
+ <indexterm>
+ <primary>Logical Decoding</primary>
+ </indexterm>
+
+ <para>
+ Logical decoding is the process of extracting all persistent changes
+ to a database's tables into a coherent, easy to understand format which
+ can be interpreted without detailed knowledge of the database's internal
+ state.
+ </para>
+
+ <para>
+ In <productname>PostgreSQL</productname>, logical decoding is implemented
+ by decoding the contents of the <link linkend="wal">write-ahead
+ log</link>, which describe changes on a storage level, into an
+ application-specific form such as a stream of tuples or SQL statements.
+ </para>
+ </sect2>
+
+ <sect2 id="logicaldecoding-replication-slots">
+ <title>Replication Slots</title>
+
+ <indexterm>
+ <primary>replication slot</primary>
+ <secondary>logical replication</secondary>
+ </indexterm>
+
+ <para>
+ In the context of logical replication, a slot represents a stream of
+ changes that can be replayed to a client in the order they were made on
+ the origin server. Each slot streams a sequence of changes from a single
+ database.
+ </para>
+
+ <note>
+ <para><productname>PostgreSQL</productname> also has streaming replication slots
+ (see <xref linkend="streaming-replication"/>), but they are used somewhat
+ differently there.
+ </para>
+ </note>
+
+ <para>
+ A replication slot has an identifier that is unique across all databases
+ in a <productname>PostgreSQL</productname> cluster. Slots persist
+ independently of the connection using them and are crash-safe.
+ </para>
+
+ <para>
+ A logical slot will emit each change just once in normal operation.
+ The current position of each slot is persisted only at checkpoint, so in
+ the case of a crash the slot may return to an earlier LSN, which will
+ then cause recent changes to be sent again when the server restarts.
+ Logical decoding clients are responsible for avoiding ill effects from
+ handling the same message more than once. Clients may wish to record
+ the last LSN they saw when decoding and skip over any repeated data or
+ (when using the replication protocol) request that decoding start from
+ that LSN rather than letting the server determine the start point.
+ The Replication Progress Tracking feature is designed for this purpose,
+ refer to <link linkend="replication-origins">replication origins</link>.
+ </para>
+
+ <para>
+ Multiple independent slots may exist for a single database. Each slot has
+ its own state, allowing different consumers to receive changes from
+ different points in the database change stream. For most applications, a
+ separate slot will be required for each consumer.
+ </para>
+
+ <para>
+ A logical replication slot knows nothing about the state of the
+ receiver(s). It's even possible to have multiple different receivers using
+ the same slot at different times; they'll just get the changes following
+ on from when the last receiver stopped consuming them. Only one receiver
+ may consume changes from a slot at any given time.
+ </para>
+
+ <caution>
+ <para>
+ Replication slots persist across crashes and know nothing about the state
+ of their consumer(s). They will prevent removal of required resources
+ even when there is no connection using them. This consumes storage
+ because neither required WAL nor required rows from the system catalogs
+ can be removed by <command>VACUUM</command> as long as they are required by a replication
+ slot. In extreme cases this could cause the database to shut down to prevent
+ transaction ID wraparound (see <xref linkend="vacuum-for-wraparound"/>).
+ So if a slot is no longer required it should be dropped.
+ </para>
+ </caution>
+ </sect2>
+
+ <sect2>
+ <title>Output Plugins</title>
+ <para>
+ Output plugins transform the data from the write-ahead log's internal
+ representation into the format the consumer of a replication slot desires.
+ </para>
+ </sect2>
+
+ <sect2>
+ <title>Exported Snapshots</title>
+ <para>
+ When a new replication slot is created using the streaming replication
+ interface (see <xref linkend="protocol-replication-create-slot"/>), a
+ snapshot is exported
+ (see <xref linkend="functions-snapshot-synchronization"/>), which will show
+ exactly the state of the database after which all changes will be
+ included in the change stream. This can be used to create a new replica by
+ using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
+ SNAPSHOT</literal></link> to read the state of the database at the moment
+ the slot was created. This transaction can then be used to dump the
+ database's state at that point in time, which afterwards can be updated
+ using the slot's contents without losing any changes.
+ </para>
+ <para>
+ Creation of a snapshot is not always possible. In particular, it will
+ fail when connected to a hot standby. Applications that do not require
+ snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
+ option.
+ </para>
+ </sect2>
+ </sect1>
+
+ <sect1 id="logicaldecoding-walsender">
+ <title>Streaming Replication Protocol Interface</title>
+
+ <para>
+ The commands
+ <itemizedlist>
+ <listitem>
+ <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
+ </listitem>
+
+ <listitem>
+ <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
+ </listitem>
+
+ <listitem>
+ <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
+ </listitem>
+ </itemizedlist>
+ are used to create, drop, and stream changes from a replication
+ slot, respectively. These commands are only available over a replication
+ connection; they cannot be used via SQL.
+ See <xref linkend="protocol-replication"/> for details on these commands.
+ </para>
+
+ <para>
+ The command <xref linkend="app-pgrecvlogical"/> can be used to control
+ logical decoding over a streaming replication connection. (It uses
+ these commands internally.)
+ </para>
+ </sect1>
+
+ <sect1 id="logicaldecoding-sql">
+ <title>Logical Decoding <acronym>SQL</acronym> Interface</title>
+
+ <para>
+ See <xref linkend="functions-replication"/> for detailed documentation on
+ the SQL-level API for interacting with logical decoding.
+ </para>
+
+ <para>
+ Synchronous replication (see <xref linkend="synchronous-replication"/>) is
+ only supported on replication slots used over the streaming replication interface. The
+ function interface and additional, non-core interfaces do not support
+ synchronous replication.
+ </para>
+ </sect1>
+
+ <sect1 id="logicaldecoding-catalogs">
+ <title>System Catalogs Related to Logical Decoding</title>
+
+ <para>
+ The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
+ view and the
+ <link linkend="monitoring-pg-stat-replication-view">
+ <structname>pg_stat_replication</structname></link>
+ view provide information about the current state of replication slots and
+ streaming replication connections respectively. These views apply to both physical and
+ logical replication. The
+ <link linkend="monitoring-pg-stat-replication-slots-view">
+ <structname>pg_stat_replication_slots</structname></link>
+ view provides statistics information about the logical replication slots.
+ </para>
+ </sect1>
+
+ <sect1 id="logicaldecoding-output-plugin">
+ <title>Logical Decoding Output Plugins</title>
+ <para>
+ An example output plugin can be found in the
+ <link linkend="test-decoding">
+ <filename>contrib/test_decoding</filename>
+ </link>
+ subdirectory of the PostgreSQL source tree.
+ </para>
+ <sect2 id="logicaldecoding-output-init">
+ <title>Initialization Function</title>
+ <indexterm zone="logicaldecoding-output-init">
+ <primary>_PG_output_plugin_init</primary>
+ </indexterm>
+ <para>
+ An output plugin is loaded by dynamically loading a shared library with
+ the output plugin's name as the library base name. The normal library
+ search path is used to locate the library. To provide the required output
+ plugin callbacks and to indicate that the library is actually an output
+ plugin it needs to provide a function named
+ <function>_PG_output_plugin_init</function>. This function is passed a
+ struct that needs to be filled with the callback function pointers for
+ individual actions.
+<programlisting>
+typedef struct OutputPluginCallbacks
+{
+ LogicalDecodeStartupCB startup_cb;
+ LogicalDecodeBeginCB begin_cb;
+ LogicalDecodeChangeCB change_cb;
+ LogicalDecodeTruncateCB truncate_cb;
+ LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+ LogicalDecodeShutdownCB shutdown_cb;
+ LogicalDecodeFilterPrepareCB filter_prepare_cb;
+ LogicalDecodeBeginPrepareCB begin_prepare_cb;
+ LogicalDecodePrepareCB prepare_cb;
+ LogicalDecodeCommitPreparedCB commit_prepared_cb;
+ LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
+ LogicalDecodeStreamStartCB stream_start_cb;
+ LogicalDecodeStreamStopCB stream_stop_cb;
+ LogicalDecodeStreamAbortCB stream_abort_cb;
+ LogicalDecodeStreamPrepareCB stream_prepare_cb;
+ LogicalDecodeStreamCommitCB stream_commit_cb;
+ LogicalDecodeStreamChangeCB stream_change_cb;
+ LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamTruncateCB stream_truncate_cb;
+} OutputPluginCallbacks;
+
+typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
+</programlisting>
+ The <function>begin_cb</function>, <function>change_cb</function>
+ and <function>commit_cb</function> callbacks are required,
+ while <function>startup_cb</function>,
+ <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
+ and <function>shutdown_cb</function> are optional.
+ If <function>truncate_cb</function> is not set but a
+ <command>TRUNCATE</command> is to be decoded, the action will be ignored.
+ </para>
+
+ <para>
+ An output plugin may also define functions to support streaming of large,
+ in-progress transactions. The <function>stream_start_cb</function>,
+ <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
+ <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
+ and <function>stream_prepare_cb</function>
+ are required, while <function>stream_message_cb</function> and
+ <function>stream_truncate_cb</function> are optional.
+ </para>
+
+ <para>
+ An output plugin may also define functions to support two-phase commits,
+ which allows actions to be decoded on the <command>PREPARE TRANSACTION</command>.
+ The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
+ <function>stream_prepare_cb</function>,
+ <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
+ callbacks are required, while <function>filter_prepare_cb</function> is optional.
+ </para>
+ </sect2>
+
+ <sect2 id="logicaldecoding-capabilities">
+ <title>Capabilities</title>
+
+ <para>
+ To decode, format and output changes, output plugins can use most of the
+ backend's normal infrastructure, including calling output functions. Read
+ only access to relations is permitted as long as only relations are
+ accessed that either have been created by <command>initdb</command> in
+ the <literal>pg_catalog</literal> schema, or have been marked as user
+ provided catalog tables using
+<programlisting>
+ALTER TABLE user_catalog_table SET (user_catalog_table = true);
+CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
+</programlisting>
+ Note that access to user catalog tables or regular system catalog tables
+ in the output plugins has to be done via the <literal>systable_*</literal>
+ scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
+ error out. Additionally, any actions leading to transaction ID assignment
+ are prohibited. That, among others, includes writing to tables, performing
+ DDL changes, and calling <literal>pg_current_xact_id()</literal>.
+ </para>
+ </sect2>
+
+ <sect2 id="logicaldecoding-output-mode">
+ <title>Output Modes</title>
+
+ <para>
+ Output plugin callbacks can pass data to the consumer in nearly arbitrary
+ formats. For some use cases, like viewing the changes via SQL, returning
+ data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
+ cumbersome. If the output plugin only outputs textual data in the
+ server's encoding, it can declare that by
+ setting <literal>OutputPluginOptions.output_type</literal>
+ to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
+ of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
+ the <link linkend="logicaldecoding-output-plugin-startup">startup
+ callback</link>. In that case, all the data has to be in the server's encoding
+ so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
+ builds.
+ </para>
+ </sect2>
+
+ <sect2 id="logicaldecoding-output-plugin-callbacks">
+ <title>Output Plugin Callbacks</title>
+
+ <para>
+ An output plugin gets notified about changes that are happening via
+ various callbacks it needs to provide.
+ </para>
+
+ <para>
+ Concurrent transactions are decoded in commit order, and only changes
+ belonging to a specific transaction are decoded between
+ the <literal>begin</literal> and <literal>commit</literal>
+ callbacks. Transactions that were rolled back explicitly or implicitly
+ never get
+ decoded. Successful savepoints are
+ folded into the transaction containing them in the order they were
+ executed within that transaction. A transaction that is prepared for
+ a two-phase commit using <command>PREPARE TRANSACTION</command> will
+ also be decoded if the output plugin callbacks needed for decoding
+ them are provided. It is possible that the current prepared transaction
+ which is being decoded is aborted concurrently via a
+ <command>ROLLBACK PREPARED</command> command. In that case, the logical
+ decoding of this transaction will be aborted too. All the changes of such
+ a transaction are skipped once the abort is detected and the
+ <function>prepare_cb</function> callback is invoked. Thus even in case of
+ a concurrent abort, enough information is provided to the output plugin
+ for it to properly deal with <command>ROLLBACK PREPARED</command> once
+ that is decoded.
+ </para>
+
+ <note>
+ <para>
+ Only transactions that have already safely been flushed to disk will be
+ decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
+ directly following <literal>pg_logical_slot_get_changes()</literal>
+ when <varname>synchronous_commit</varname> is set
+ to <literal>off</literal>.
+ </para>
+ </note>
+
+ <sect3 id="logicaldecoding-output-plugin-startup">
+ <title>Startup Callback</title>
+ <para>
+ The optional <function>startup_cb</function> callback is called whenever
+ a replication slot is created or asked to stream changes, independent
+ of the number of changes that are ready to be put out.
+<programlisting>
+typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
+ OutputPluginOptions *options,
+ bool is_init);
+</programlisting>
+ The <literal>is_init</literal> parameter will be true when the
+ replication slot is being created and false
+ otherwise. <parameter>options</parameter> points to a struct of options
+ that output plugins can set:
+<programlisting>
+typedef struct OutputPluginOptions
+{
+ OutputPluginOutputType output_type;
+ bool receive_rewrites;
+} OutputPluginOptions;
+</programlisting>
+ <literal>output_type</literal> has to either be set to
+ <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
+ or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
+ <xref linkend="logicaldecoding-output-mode"/>.
+ If <literal>receive_rewrites</literal> is true, the output plugin will
+ also be called for changes made by heap rewrites during certain DDL
+ operations. These are of interest to plugins that handle DDL
+ replication, but they require special handling.
+ </para>
+
+ <para>
+ The startup callback should validate the options present in
+ <literal>ctx-&gt;output_plugin_options</literal>. If the output plugin
+ needs to have a state, it can
+ use <literal>ctx-&gt;output_plugin_private</literal> to store it.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-shutdown">
+ <title>Shutdown Callback</title>
+
+ <para>
+ The optional <function>shutdown_cb</function> callback is called
+ whenever a formerly active replication slot is not used anymore and can
+ be used to deallocate resources private to the output plugin. The slot
+ isn't necessarily being dropped, streaming is just being stopped.
+<programlisting>
+typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-begin">
+ <title>Transaction Begin Callback</title>
+
+ <para>
+ The required <function>begin_cb</function> callback is called whenever a
+ start of a committed transaction has been decoded. Aborted transactions
+ and their contents never get decoded.
+<programlisting>
+typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ The <parameter>txn</parameter> parameter contains meta information about
+ the transaction, like the time stamp at which it has been committed and
+ its XID.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-commit">
+ <title>Transaction End Callback</title>
+
+ <para>
+ The required <function>commit_cb</function> callback is called whenever
+ a transaction commit has been
+ decoded. The <function>change_cb</function> callbacks for all modified
+ rows will have been called before this, if there have been any modified
+ rows.
+<programlisting>
+typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-change">
+ <title>Change Callback</title>
+
+ <para>
+ The required <function>change_cb</function> callback is called for every
+ individual row modification inside a transaction, may it be
+ an <command>INSERT</command>, <command>UPDATE</command>,
+ or <command>DELETE</command>. Even if the original command modified
+ several rows at once the callback will be called individually for each
+ row. The <function>change_cb</function> callback may access system or
+ user catalog tables to aid in the process of outputting the row
+ modification details. In case of decoding a prepared (but yet
+ uncommitted) transaction or decoding of an uncommitted transaction, this
+ change callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
+<programlisting>
+typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+</programlisting>
+ The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+ have the same contents as for the <function>begin_cb</function>
+ and <function>commit_cb</function> callbacks, but additionally the
+ relation descriptor <parameter>relation</parameter> points to the
+ relation the row belongs to and a struct
+ <parameter>change</parameter> describing the row modification are passed
+ in.
+ </para>
+
+ <note>
+ <para>
+ Only changes in user defined tables that are not unlogged
+ (see <xref linkend="sql-createtable-unlogged"/>) and not temporary
+ (see <xref linkend="sql-createtable-temporary"/>) can be extracted using
+ logical decoding.
+ </para>
+ </note>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-truncate">
+ <title>Truncate Callback</title>
+
+ <para>
+ The <function>truncate_cb</function> callback is called for a
+ <command>TRUNCATE</command> command.
+<programlisting>
+typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+</programlisting>
+ The parameters are analogous to the <function>change_cb</function>
+ callback. However, because <command>TRUNCATE</command> actions on
+ tables connected by foreign keys need to be executed together, this
+ callback receives an array of relations instead of just a single one.
+ See the description of the <xref linkend="sql-truncate"/> statement for
+ details.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-filter-origin">
+ <title>Origin Filter Callback</title>
+
+ <para>
+ The optional <function>filter_by_origin_cb</function> callback
+ is called to determine whether data that has been replayed
+ from <parameter>origin_id</parameter> is of interest to the
+ output plugin.
+<programlisting>
+typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
+</programlisting>
+ The <parameter>ctx</parameter> parameter has the same contents
+ as for the other callbacks. No information but the origin is
+ available. To signal that changes originating on the passed in
+ node are irrelevant, return true, causing them to be filtered
+ away; false otherwise. The other callbacks will not be called
+ for transactions and changes that have been filtered away.
+ </para>
+ <para>
+ This is useful when implementing cascading or multidirectional
+ replication solutions. Filtering by the origin allows to
+ prevent replicating the same changes back and forth in such
+ setups. While transactions and changes also carry information
+ about the origin, filtering via this callback is noticeably
+ more efficient.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-message">
+ <title>Generic Message Callback</title>
+
+ <para>
+ The optional <function>message_cb</function> callback is called whenever
+ a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+</programlisting>
+ The <parameter>txn</parameter> parameter contains meta information about
+ the transaction, like the time stamp at which it has been committed and
+ its XID. Note however that it can be NULL when the message is
+ non-transactional and the XID was not assigned yet in the transaction
+ which logged the message. The <parameter>lsn</parameter> has WAL
+ location of the message. The <parameter>transactional</parameter> says
+ if the message was sent as transactional or not. Similar to the change
+ callback, in case of decoding a prepared (but yet uncommitted)
+ transaction or decoding of an uncommitted transaction, this message
+ callback might also error out due to simultaneous rollback of
+ this very same transaction. In that case, the logical decoding of this
+ aborted transaction is stopped gracefully.
+
+ The <parameter>prefix</parameter> is arbitrary null-terminated prefix
+ which can be used for identifying interesting messages for the current
+ plugin. And finally the <parameter>message</parameter> parameter holds
+ the actual message of <parameter>message_size</parameter> size.
+ </para>
+ <para>
+ Extra care should be taken to ensure that the prefix the output plugin
+ considers interesting is unique. Using name of the extension or the
+ output plugin itself is often a good choice.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+ <title>Prepare Filter Callback</title>
+
+ <para>
+ The optional <function>filter_prepare_cb</function> callback
+ is called to determine whether data that is part of the current
+ two-phase commit transaction should be considered for decoding
+ at this prepare stage or later as a regular one-phase transaction at
+ <command>COMMIT PREPARED</command> time. To signal that
+ decoding should be skipped, return <literal>true</literal>;
+ <literal>false</literal> otherwise. When the callback is not
+ defined, <literal>false</literal> is assumed (i.e. no filtering, all
+ transactions using two-phase commit are decoded in two phases as well).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
+ const char *gid);
+</programlisting>
+ The <parameter>ctx</parameter> parameter has the same contents as for
+ the other callbacks. The parameters <parameter>xid</parameter>
+ and <parameter>gid</parameter> provide two different ways to identify
+ the transaction. The later <command>COMMIT PREPARED</command> or
+ <command>ROLLBACK PREPARED</command> carries both identifiers,
+ providing an output plugin the choice of what to use.
+ </para>
+ <para>
+ The callback may be invoked multiple times per transaction to decode
+ and must provide the same static answer for a given pair of
+ <parameter>xid</parameter> and <parameter>gid</parameter> every time
+ it is called.
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-begin-prepare">
+ <title>Transaction Begin Prepare Callback</title>
+
+ <para>
+ The required <function>begin_prepare_cb</function> callback is called
+ whenever the start of a prepared transaction has been decoded. The
+ <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter, can be used in this callback to
+ check if the plugin has already received this <command>PREPARE</command>
+ in which case it can either error out or skip the remaining changes of
+ the transaction.
+<programlisting>
+typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-prepare">
+ <title>Transaction Prepare Callback</title>
+
+ <para>
+ The required <function>prepare_cb</function> callback is called whenever
+ a transaction which is prepared for two-phase commit has been
+ decoded. The <function>change_cb</function> callback for all modified
+ rows will have been called before this, if there have been any modified
+ rows. The <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter, can be used in this callback.
+<programlisting>
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+ <title>Transaction Commit Prepared Callback</title>
+
+ <para>
+ The required <function>commit_prepared_cb</function> callback is called
+ whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
+ The <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter, can be used in this callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
+ <title>Transaction Rollback Prepared Callback</title>
+
+ <para>
+ The required <function>rollback_prepared_cb</function> callback is called
+ whenever a transaction <command>ROLLBACK PREPARED</command> has been
+ decoded. The <parameter>gid</parameter> field, which is part of the
+ <parameter>txn</parameter> parameter, can be used in this callback. The
+ parameters <parameter>prepare_end_lsn</parameter> and
+ <parameter>prepare_time</parameter> can be used to check if the plugin
+ has received this <command>PREPARE TRANSACTION</command> in which case
+ it can apply the rollback, otherwise, it can skip the rollback operation. The
+ <parameter>gid</parameter> alone is not sufficient because the downstream
+ node can have a prepared transaction with same identifier.
+<programlisting>
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_end_lsn,
+ TimestampTz prepare_time);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-start">
+ <title>Stream Start Callback</title>
+ <para>
+ The <function>stream_start_cb</function> callback is called when opening
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-stop">
+ <title>Stream Stop Callback</title>
+ <para>
+ The <function>stream_stop_cb</function> callback is called when closing
+ a block of streamed changes from an in-progress transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-abort">
+ <title>Stream Abort Callback</title>
+ <para>
+ The <function>stream_abort_cb</function> callback is called to abort
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+ <title>Stream Prepare Callback</title>
+ <para>
+ The <function>stream_prepare_cb</function> callback is called to prepare
+ a previously streamed transaction as part of a two-phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-commit">
+ <title>Stream Commit Callback</title>
+ <para>
+ The <function>stream_commit_cb</function> callback is called to commit
+ a previously streamed transaction.
+<programlisting>
+typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-change">
+ <title>Stream Change Callback</title>
+ <para>
+ The <function>stream_change_cb</function> callback is called when sending
+ a change in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The actual changes are not displayed as the transaction can abort at a later
+ point in time and we don't decode changes for aborted transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ Relation relation,
+ ReorderBufferChange *change);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-message">
+ <title>Stream Message Callback</title>
+ <para>
+ The <function>stream_message_cb</function> callback is called when sending
+ a generic message in a block of streamed changes (demarcated by
+ <function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
+ The message contents for transactional messages are not displayed as the transaction
+ can abort at a later point in time and we don't decode changes for aborted
+ transactions.
+<programlisting>
+typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-stream-truncate">
+ <title>Stream Truncate Callback</title>
+ <para>
+ The <function>stream_truncate_cb</function> callback is called for a
+ <command>TRUNCATE</command> command in a block of streamed changes
+ (demarcated by <function>stream_start_cb</function> and
+ <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ int nrelations,
+ Relation relations[],
+ ReorderBufferChange *change);
+</programlisting>
+ The parameters are analogous to the <function>stream_change_cb</function>
+ callback. However, because <command>TRUNCATE</command> actions on
+ tables connected by foreign keys need to be executed together, this
+ callback receives an array of relations instead of just a single one.
+ See the description of the <xref linkend="sql-truncate"/> statement for
+ details.
+ </para>
+ </sect3>
+
+ </sect2>
+
+ <sect2 id="logicaldecoding-output-plugin-output">
+ <title>Functions for Producing Output</title>
+
+ <para>
+ To actually produce output, output plugins can write data to
+ the <literal>StringInfo</literal> output buffer
+ in <literal>ctx-&gt;out</literal> when inside
+ the <function>begin_cb</function>, <function>commit_cb</function>,
+ or <function>change_cb</function> callbacks. Before writing to the output
+ buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
+ to be called, and after finishing writing to the
+ buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
+ called to perform the write. The <parameter>last_write</parameter>
+ indicates whether a particular write was the callback's last write.
+ </para>
+
+ <para>
+ The following example shows how to output data to the consumer of an
+ output plugin:
+<programlisting>
+OutputPluginPrepareWrite(ctx, true);
+appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
+OutputPluginWrite(ctx, true);
+</programlisting>
+ </para>
+ </sect2>
+ </sect1>
+
+ <sect1 id="logicaldecoding-writer">
+ <title>Logical Decoding Output Writers</title>
+
+ <para>
+ It is possible to add more output methods for logical decoding.
+ For details, see
+ <filename>src/backend/replication/logical/logicalfuncs.c</filename>.
+ Essentially, three functions need to be provided: one to read WAL, one to
+ prepare writing output, and one to write the output
+ (see <xref linkend="logicaldecoding-output-plugin-output"/>).
+ </para>
+ </sect1>
+
+ <sect1 id="logicaldecoding-synchronous">
+ <title>Synchronous Replication Support for Logical Decoding</title>
+ <sect2>
+ <title>Overview</title>
+
+ <para>
+ Logical decoding can be used to build
+ <link linkend="synchronous-replication">synchronous
+ replication</link> solutions with the same user interface as synchronous
+ replication for <link linkend="streaming-replication">streaming
+ replication</link>. To do this, the streaming replication interface
+ (see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
+ data. Clients have to send <literal>Standby status update (F)</literal>
+ (see <xref linkend="protocol-replication"/>) messages, just like streaming
+ replication clients do.
+ </para>
+
+ <note>
+ <para>
+ A synchronous replica receiving changes via logical decoding will work in
+ the scope of a single database. Since, in contrast to
+ that, <parameter>synchronous_standby_names</parameter> currently is
+ server wide, this means this technique will not work properly if more
+ than one database is actively used.
+ </para>
+ </note>
+ </sect2>
+
+ <sect2 id="logicaldecoding-synchronous-caveats">
+ <title>Caveats</title>
+
+ <para>
+ In synchronous replication setup, a deadlock can happen, if the transaction
+ has locked [user] catalog tables exclusively. See
+ <xref linkend="logicaldecoding-capabilities"/> for information on user
+ catalog tables. This is because logical decoding of transactions can lock
+ catalog tables to access them. To avoid this users must refrain from taking
+ an exclusive lock on [user] catalog tables. This can happen in the following
+ ways:
+
+ <itemizedlist>
+ <listitem>
+ <para>
+ Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
+ in a transaction.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ Perform <command>CLUSTER</command> on <structname>pg_class</structname> in
+ a transaction.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ <command>PREPARE TRANSACTION</command> after <command>LOCK</command> command
+ on <structname>pg_class</structname> and allow logical decoding of two-phase
+ transactions.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ <command>PREPARE TRANSACTION</command> after <command>CLUSTER</command>
+ command on <structname>pg_trigger</structname> and allow logical decoding of
+ two-phase transactions. This will lead to deadlock only when published table
+ have a trigger.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ Executing <command>TRUNCATE</command> on [user] catalog table in a
+ transaction.
+ </para>
+ </listitem>
+ </itemizedlist>
+
+ Note that these commands that can cause deadlock apply to not only explicitly
+ indicated system catalog tables above but also to any other [user] catalog
+ table.
+ </para>
+ </sect2>
+ </sect1>
+
+ <sect1 id="logicaldecoding-streaming">
+ <title>Streaming of Large Transactions for Logical Decoding</title>
+
+ <para>
+ The basic output plugin callbacks (e.g., <function>begin_cb</function>,
+ <function>change_cb</function>, <function>commit_cb</function> and
+ <function>message_cb</function>) are only invoked when the transaction
+ actually commits. The changes are still decoded from the transaction
+ log, but are only passed to the output plugin at commit (and discarded
+ if the transaction aborts).
+ </para>
+
+ <para>
+ This means that while the decoding happens incrementally, and may spill
+ to disk to keep memory usage under control, all the decoded changes have
+ to be transmitted when the transaction finally commits (or more precisely,
+ when the commit is decoded from the transaction log). Depending on the
+ size of the transaction and network bandwidth, the transfer time may
+ significantly increase the apply lag.
+ </para>
+
+ <para>
+ To reduce the apply lag caused by large transactions, an output plugin
+ may provide additional callback to support incremental streaming of
+ in-progress transactions. There are multiple required streaming callbacks
+ (<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
+ <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
+ and <function>stream_change_cb</function>) and two optional callbacks
+ (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+ </para>
+
+ <para>
+ When streaming an in-progress transaction, the changes (and messages) are
+ streamed in blocks demarcated by <function>stream_start_cb</function>
+ and <function>stream_stop_cb</function> callbacks. Once all the decoded
+ changes are transmitted, the transaction can be committed using the
+ the <function>stream_commit_cb</function> callback
+ (or possibly aborted using the <function>stream_abort_cb</function> callback).
+ If two-phase commits are supported, the transaction can be prepared using the
+ <function>stream_prepare_cb</function> callback,
+ <command>COMMIT PREPARED</command> using the
+ <function>commit_prepared_cb</function> callback or aborted using the
+ <function>rollback_prepared_cb</function>.
+ </para>
+
+ <para>
+ One example sequence of streaming callback calls for one transaction may
+ look like this:
+<programlisting>
+stream_start_cb(...); &lt;-- start of first block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_message_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of first block of changes
+
+stream_start_cb(...); &lt;-- start of second block of changes
+ stream_change_cb(...);
+ stream_change_cb(...);
+ stream_change_cb(...);
+ ...
+ stream_message_cb(...);
+ stream_change_cb(...);
+stream_stop_cb(...); &lt;-- end of second block of changes
+
+stream_commit_cb(...); &lt;-- commit of the streamed transaction
+</programlisting>
+ </para>
+
+ <para>
+ The actual sequence of callback calls may be more complicated, of course.
+ There may be blocks for multiple streamed transactions, some of the
+ transactions may get aborted, etc.
+ </para>
+
+ <para>
+ Similar to spill-to-disk behavior, streaming is triggered when the total
+ amount of changes decoded from the WAL (for all in-progress transactions)
+ exceeds the limit defined by <varname>logical_decoding_work_mem</varname> setting.
+ At that point, the largest top-level transaction (measured by the amount of memory
+ currently used for decoded changes) is selected and streamed. However, in
+ some cases we still have to spill to disk even if streaming is enabled
+ because we exceed the memory threshold but still have not decoded the
+ complete tuple e.g., only decoded toast table insert but not the main table
+ insert.
+ </para>
+
+ <para>
+ Even when streaming large transactions, the changes are still applied in
+ commit order, preserving the same guarantees as the non-streaming mode.
+ </para>
+
+ </sect1>
+
+ <sect1 id="logicaldecoding-two-phase-commits">
+ <title>Two-phase Commit Support for Logical Decoding</title>
+
+ <para>
+ With the basic output plugin callbacks (eg., <function>begin_cb</function>,
+ <function>change_cb</function>, <function>commit_cb</function> and
+ <function>message_cb</function>) two-phase commit commands like
+ <command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command>
+ and <command>ROLLBACK PREPARED</command> are not decoded. While the
+ <command>PREPARE TRANSACTION</command> is ignored,
+ <command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command>
+ and <command>ROLLBACK PREPARED</command> is decoded as a
+ <command>ROLLBACK</command>.
+ </para>
+
+ <para>
+ To support the streaming of two-phase commands, an output plugin needs to
+ provide additional callbacks. There are multiple two-phase commit callbacks
+ that are required, (<function>begin_prepare_cb</function>,
+ <function>prepare_cb</function>, <function>commit_prepared_cb</function>,
+ <function>rollback_prepared_cb</function> and
+ <function>stream_prepare_cb</function>) and an optional callback
+ (<function>filter_prepare_cb</function>).
+ </para>
+
+ <para>
+ If the output plugin callbacks for decoding two-phase commit commands are
+ provided, then on <command>PREPARE TRANSACTION</command>, the changes of
+ that transaction are decoded, passed to the output plugin, and the
+ <function>prepare_cb</function> callback is invoked. This differs from the
+ basic decoding setup where changes are only passed to the output plugin
+ when a transaction is committed. The start of a prepared transaction is
+ indicated by the <function>begin_prepare_cb</function> callback.
+ </para>
+
+ <para>
+ When a prepared transaction is rolled back using the
+ <command>ROLLBACK PREPARED</command>, then the
+ <function>rollback_prepared_cb</function> callback is invoked and when the
+ prepared transaction is committed using <command>COMMIT PREPARED</command>,
+ then the <function>commit_prepared_cb</function> callback is invoked.
+ </para>
+
+ <para>
+ Optionally the output plugin can define filtering rules via
+ <function>filter_prepare_cb</function> to decode only specific transaction
+ in two phases. This can be achieved by pattern matching on the
+ <parameter>gid</parameter> or via lookups using the
+ <parameter>xid</parameter>.
+ </para>
+
+ <para>
+ The users that want to decode prepared transactions need to be careful about
+ below mentioned points:
+
+ <itemizedlist>
+ <listitem>
+ <para>
+ If the prepared transaction has locked [user] catalog tables exclusively
+ then decoding prepare can block till the main transaction is committed.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
+ The logical replication solution that builds distributed two phase commit
+ using this feature can deadlock if the prepared transaction has locked
+ [user] catalog tables exclusively. To avoid this users must refrain from
+ having locks on catalog tables (e.g. explicit <command>LOCK</command> command)
+ in such transactions.
+ See <xref linkend="logicaldecoding-synchronous-caveats"/> for the details.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+
+ </sect1>
+ </chapter>