1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
-- Test logical decoding of prepared (two-phase) transactions. When
-- two-phase-commit is enabled, transactions are decoded at PREPARE time
-- rather than at COMMIT PREPARED time.
SET synchronous_commit = on;
-- Create the slot consumed by every query below. The two trailing arguments
-- are temporary = false and twophase = true, i.e. a permanent slot with
-- two-phase decoding enabled.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
?column?
----------
init
(1 row)
-- Two tables are needed: a later test inserts into test_prepared2 while a
-- prepared transaction still holds an exclusive lock on test_prepared1.
CREATE TABLE test_prepared1(id integer primary key);
CREATE TABLE test_prepared2(id integer primary key);
-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
-- The transaction's changes are emitted once, together with the PREPARE
-- TRANSACTION; the later COMMIT PREPARED is decoded as a single record.
BEGIN;
INSERT INTO test_prepared1 VALUES (1);
INSERT INTO test_prepared1 VALUES (2);
-- should show nothing because the xact has not been prepared yet.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
PREPARE TRANSACTION 'test_prepared#1';
-- should show both the above inserts and the PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:1
table public.test_prepared1: INSERT: id[integer]:2
PREPARE TRANSACTION 'test_prepared#1'
(4 rows)
COMMIT PREPARED 'test_prepared#1';
-- should show only the COMMIT PREPARED; the inserts were already consumed
-- by the previous call.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------
COMMIT PREPARED 'test_prepared#1'
(1 row)
-- Test that rollback of a prepared xact is decoded.
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
-- should show the insert and the PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:3
PREPARE TRANSACTION 'test_prepared#2'
(3 rows)
ROLLBACK PREPARED 'test_prepared#2';
-- should show only the ROLLBACK PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------
ROLLBACK PREPARED 'test_prepared#2'
(1 row)
-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
BEGIN;
ALTER TABLE test_prepared1 ADD COLUMN data text;
INSERT INTO test_prepared1 VALUES (4, 'frakbar');
PREPARE TRANSACTION 'test_prepared#3';
-- Confirm that the exclusive lock taken by the ALTER command is still held on
-- the test_prepared1 table after PREPARE. The first column is a literal label
-- naming the table being inspected.
SELECT 'test_prepared1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
relation | locktype | mode
----------------+----------+---------------------
test_prepared1 | relation | RowExclusiveLock
test_prepared1 | relation | AccessExclusiveLock
(2 rows)
-- The insert should show the newly altered column but not the DDL.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
PREPARE TRANSACTION 'test_prepared#3'
(3 rows)
-- Test that we decode correctly while an uncommitted prepared xact
-- with ddl exists.
--
-- Use a separate table for the concurrent transaction because the lock from
-- the ALTER will stop us inserting into the other one.
--
INSERT INTO test_prepared2 VALUES (5);
-- should show only the committed insert into test_prepared2.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
table public.test_prepared2: INSERT: id[integer]:5
COMMIT
(3 rows)
COMMIT PREPARED 'test_prepared#3';
-- should show only the COMMIT PREPARED of the xact that was still pending.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------
COMMIT PREPARED 'test_prepared#3'
(1 row)
-- Make sure ordinary (non-prepared) transactions are still decoded on both
-- tables. test_prepared1 now carries the added "data" column, so its row is
-- decoded with data = null.
INSERT INTO test_prepared1 VALUES (6);
INSERT INTO test_prepared2 VALUES (7);
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:6 data[text]:null
COMMIT
BEGIN
table public.test_prepared2: INSERT: id[integer]:7
COMMIT
(6 rows)
-- Check that 'CLUSTER' (as an operation that holds an exclusive lock) doesn't
-- block logical decoding.
BEGIN;
INSERT INTO test_prepared1 VALUES (8, 'othercol');
CLUSTER test_prepared1 USING test_prepared1_pkey;
INSERT INTO test_prepared1 VALUES (9, 'othercol2');
PREPARE TRANSACTION 'test_prepared_lock';
-- Show the relation-level locks on test_prepared1 now that the transaction
-- is prepared.
SELECT 'test_prepared1' AS relation, locktype, mode
FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
relation | locktype | mode
----------------+----------+---------------------
test_prepared1 | relation | RowExclusiveLock
test_prepared1 | relation | ShareLock
test_prepared1 | relation | AccessExclusiveLock
(3 rows)
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
-- Bound the decoding call with a statement timeout so that a lock wait
-- fails the test instead of hanging: use PG_TEST_TIMEOUT_DEFAULT (in
-- seconds) from the environment if it is set and non-empty, else 180s.
\set env_timeout ''
\getenv env_timeout PG_TEST_TIMEOUT_DEFAULT
SELECT COALESCE(NULLIF(:'env_timeout', ''), '180') || 's' AS timeout \gset
SET statement_timeout = :'timeout';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
PREPARE TRANSACTION 'test_prepared_lock'
(4 rows)
RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------
COMMIT PREPARED 'test_prepared_lock'
(1 row)
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
BEGIN;
CREATE TABLE test_prepared_savepoint (a int);
INSERT INTO test_prepared_savepoint VALUES (1);
SAVEPOINT test_savepoint;
INSERT INTO test_prepared_savepoint VALUES (2);
ROLLBACK TO SAVEPOINT test_savepoint;
PREPARE TRANSACTION 'test_prepared_savepoint';
-- should show only the insert of 1; the insert of 2 was undone by the
-- rollback to the savepoint.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------------------------------------------------------------
BEGIN
table public.test_prepared_savepoint: INSERT: a[integer]:1
PREPARE TRANSACTION 'test_prepared_savepoint'
(3 rows)
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------
COMMIT PREPARED 'test_prepared_savepoint'
(1 row)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
INSERT INTO test_prepared1 VALUES (20);
PREPARE TRANSACTION 'test_prepared_nodecode';
-- should show nothing: the PREPARE is not decoded for this GID.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
COMMIT PREPARED 'test_prepared_nodecode';
-- should be decoded now, as an ordinary BEGIN ... COMMIT transaction rather
-- than as PREPARE TRANSACTION / COMMIT PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------
BEGIN
table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
COMMIT
(3 rows)
-- Cleanup: drop the test tables and make sure nothing further is decoded.
-- NOTE(review): test_prepared_savepoint is not dropped here -- confirm
-- whether that is intentional.
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
-- show results. There should be nothing to show: the DROPs are DDL, and
-- empty transactions are skipped ('skip-empty-xacts').
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- Drop the replication slot created at the start of the test.
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
|