1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"><html xmlns="http://www.w3.org/1999/xhtml"><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8" /><title>49.1. Logical Decoding Examples</title><link rel="stylesheet" type="text/css" href="stylesheet.css" /><link rev="made" href="pgsql-docs@lists.postgresql.org" /><meta name="generator" content="DocBook XSL Stylesheets Vsnapshot" /><link rel="prev" href="logicaldecoding.html" title="Chapter 49. Logical Decoding" /><link rel="next" href="logicaldecoding-explanation.html" title="49.2. Logical Decoding Concepts" /></head><body id="docContent" class="container-fluid col-10"><div class="navheader"><table width="100%" summary="Navigation header"><tr><th colspan="5" align="center">49.1. Logical Decoding Examples</th></tr><tr><td width="10%" align="left"><a accesskey="p" href="logicaldecoding.html" title="Chapter 49. Logical Decoding">Prev</a> </td><td width="10%" align="left"><a accesskey="u" href="logicaldecoding.html" title="Chapter 49. Logical Decoding">Up</a></td><th width="60%" align="center">Chapter 49. Logical Decoding</th><td width="10%" align="right"><a accesskey="h" href="index.html" title="PostgreSQL 16.2 Documentation">Home</a></td><td width="10%" align="right"> <a accesskey="n" href="logicaldecoding-explanation.html" title="49.2. Logical Decoding Concepts">Next</a></td></tr></table><hr /></div><div class="sect1" id="LOGICALDECODING-EXAMPLE"><div class="titlepage"><div><div><h2 class="title" style="clear: both">49.1. Logical Decoding Examples <a href="#LOGICALDECODING-EXAMPLE" class="id_link">#</a></h2></div></div></div><p>
The following example demonstrates controlling logical decoding using the
SQL interface.
</p><p>
Before you can use logical decoding, you must set
<a class="xref" href="runtime-config-wal.html#GUC-WAL-LEVEL">wal_level</a> to <code class="literal">logical</code> and
<a class="xref" href="runtime-config-replication.html#GUC-MAX-REPLICATION-SLOTS">max_replication_slots</a> to at least 1. Then, you
should connect to the target database (in the example
below, <code class="literal">postgres</code>) as a superuser.
</p><pre class="programlisting">
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
slot_name | lsn
-----------------+-----------
regression_slot | 0/16B1970
(1 row)
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+-----------------
regression_slot | test_decoding | logical | postgres | f | 0/16A4408 | 0/16A4440
(1 row)
postgres=# -- There are no changes to see yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
postgres=# -- DDL isn't replicated, so all you'll see is the transaction
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+--------------
0/BA2DA58 | 10297 | BEGIN 10297
0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)
postgres=# -- Once changes are read, they're consumed and not emitted
postgres=# -- in a subsequent call:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('1');
postgres=*# INSERT INTO data(data) VALUES('2');
postgres=*# COMMIT;
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A688 | 10298 | BEGIN 10298
0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)
postgres=# INSERT INTO data(data) VALUES('3');
postgres=# -- You can also peek ahead in the change stream without consuming changes
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299
(3 rows)
postgres=# -- options can be passed to output plugin, to influence the formatting
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
lsn | xid | data
-----------+-------+---------------------------------------------------------
0/BA5A8E0 | 10299 | BEGIN 10299
0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)
postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
postgres=# -- server resources:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)
</pre><p>
The following examples shows how logical decoding is controlled over the
streaming replication protocol, using the
program <a class="xref" href="app-pgrecvlogical.html" title="pg_recvlogical"><span class="refentrytitle"><span class="application">pg_recvlogical</span></span></a> included in the PostgreSQL
distribution. This requires that client authentication is set up to allow
replication connections
(see <a class="xref" href="warm-standby.html#STREAMING-REPLICATION-AUTHENTICATION" title="27.2.5.1. Authentication">Section 27.2.5.1</a>) and
that <code class="varname">max_wal_senders</code> is set sufficiently high to allow
an additional connection. The second example shows how to stream two-phase
transactions. Before you use two-phase commands, you must set
<a class="xref" href="runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS">max_prepared_transactions</a> to at least 1.
</p><pre class="programlisting">
Example 1:
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
<span class="keycap"><strong>Control</strong></span>+<span class="keycap"><strong>Z</strong></span>
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
<span class="keycap"><strong>Control</strong></span>+<span class="keycap"><strong>C</strong></span>
$ pg_recvlogical -d postgres --slot=test --drop-slot
Example 2:
$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
$ pg_recvlogical -d postgres --slot=test --start -f -
<span class="keycap"><strong>Control</strong></span>+<span class="keycap"><strong>Z</strong></span>
$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
$ fg
BEGIN 694
table public.data: INSERT: id[integer]:5 data[text]:'5'
PREPARE TRANSACTION 'test', txid 694
<span class="keycap"><strong>Control</strong></span>+<span class="keycap"><strong>Z</strong></span>
$ psql -d postgres -c "COMMIT PREPARED 'test';"
$ fg
COMMIT PREPARED 'test', txid 694
<span class="keycap"><strong>Control</strong></span>+<span class="keycap"><strong>C</strong></span>
$ pg_recvlogical -d postgres --slot=test --drop-slot
</pre><p>
The following example shows SQL interface that can be used to decode prepared
transactions. Before you use two-phase commit commands, you must set
<code class="varname">max_prepared_transactions</code> to at least 1. You must also have
set the two-phase parameter as 'true' while creating the slot using
<code class="function">pg_create_logical_replication_slot</code>
Note that we will stream the entire transaction after the commit if it
is not already decoded.
</p><pre class="programlisting">
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('5');
postgres=*# PREPARE TRANSACTION 'test_prepared1';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/1689DC0 | 529 | BEGIN 529
0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
(3 rows)
postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+--------------------------------------------
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row)
postgres=#-- you can also rollback a prepared transaction
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('6');
postgres=*# PREPARE TRANSACTION 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/168A180 | 530 | BEGIN 530
0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
(3 rows)
postgres=# ROLLBACK PREPARED 'test_prepared2';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+----------------------------------------------
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
(1 row)
</pre></div><div class="navfooter"><hr /><table width="100%" summary="Navigation footer"><tr><td width="40%" align="left"><a accesskey="p" href="logicaldecoding.html" title="Chapter 49. Logical Decoding">Prev</a> </td><td width="20%" align="center"><a accesskey="u" href="logicaldecoding.html" title="Chapter 49. Logical Decoding">Up</a></td><td width="40%" align="right"> <a accesskey="n" href="logicaldecoding-explanation.html" title="49.2. Logical Decoding Concepts">Next</a></td></tr><tr><td width="40%" align="left" valign="top">Chapter 49. Logical Decoding </td><td width="20%" align="center"><a accesskey="h" href="index.html" title="PostgreSQL 16.2 Documentation">Home</a></td><td width="40%" align="right" valign="top"> 49.2. Logical Decoding Concepts</td></tr></table></div></body></html>
|