1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
-- Test streaming of two-phase commits
-- NOTE(review): this reads as a psql regression transcript (each statement is
-- followed by its expected output); confirm any comment change against the
-- matching .sql script.
SET synchronous_commit = on;
-- Create the logical slot on the test_decoding plugin. Per the PostgreSQL
-- documentation the third argument (false) is "temporary" and the fourth
-- (true) is "twophase".
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
?column?
----------
init
(1 row)
CREATE TABLE stream_test(data text);
-- Consume the DDL so the CREATE TABLE above does not show up in later reads;
-- this read is expected to return no rows.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
-- Streaming test with a sub-transaction and PREPARE / COMMIT PREPARED.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
-- 35 rows of roughly 2000 characters each, presumably large enough to trigger
-- streaming of the in-progress transaction -- TODO confirm the threshold.
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
-- Undo the 35 inserts and the TRUNCATE done since SAVEPOINT s1.
ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- Should show the message, then one streamed block holding the 20 inserts made
-- after the ROLLBACK TO SAVEPOINT, then the PREPARE of the streamed transaction.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
(24 rows)
COMMIT PREPARED 'test1';
-- Should show only the COMMIT PREPARED: the transaction's changes were already
-- returned by the read that followed PREPARE TRANSACTION.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------
COMMIT PREPARED 'test1'
(1 row)
-- Streaming test with a sub-transaction and PREPARE / COMMIT PREPARED, but with
-- a filtered gid. Gids with '_nodecode' are not decoded at prepare time.
BEGIN;
SAVEPOINT s1;
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
?column?
----------
msg5
(1 row)
-- Same workload as the unfiltered scenario: large inserts and a TRUNCATE that
-- are undone by the ROLLBACK TO SAVEPOINT, followed by 20 small inserts.
INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
TRUNCATE table stream_test;
ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- Should NOT show the inserts made after the ROLLBACK TO SAVEPOINT, nor the
-- PREPARE; only the message is expected.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
(1 row)
COMMIT PREPARED 'test1_nodecode';
-- Should now show the 20 inserts as an ordinary transaction: wrapped in
-- BEGIN ... COMMIT, with a plain COMMIT rather than a COMMIT PREPARED.
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
COMMIT
(22 rows)
-- Clean up: remove the test table and the replication slot created for this test.
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
(1 row)
|