1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Test streaming of transaction containing multiple subtransactions and rollbacks
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# When $is_parallel is true, block until the subscriber's log contains the
# DEBUG message reporting that processing of the STREAM $type command has
# finished (callers in this file pass 'COMMIT' or 'ABORT' as $type).  When
# $is_parallel is false, the log is not consulted at all.
#
# $offset is handed to wait_for_log unchanged; callers in this file pass the
# size of the subscriber's log file taken before running the transaction.
sub check_parallel_log
{
    my ($node_subscriber, $offset, $is_parallel, $type) = @_;

    if ($is_parallel)
    {
        # The optional group tolerates an extra " XXXXX:" token between the
        # severity and the message text (presumably an SQLSTATE code emitted
        # under verbose logging -- TODO confirm).
        my $finished_re =
          qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/;

        $node_subscriber->wait_for_log($finished_re, $offset);
    }
}
# Test steps shared by the streaming=on and streaming=parallel cases.
#
# Runs three transactions on the publisher, each mixing INSERTs with
# savepoints and rollbacks, and after each one verifies the row counts seen
# on the subscriber.  Finally removes the rows it added (a > 2) so that the
# function can be run again.
#
# Arguments:
#   $node_publisher  - node on which the transactions are executed
#   $node_subscriber - node whose table contents (and log) are checked
#   $appname         - name passed to wait_for_catchup
#   $is_parallel     - passed through to check_parallel_log; when true, each
#                      step also waits for the matching STREAM log message
#
# The expected counts assume test_tab holds exactly two rows on entry and
# that column c is never populated on the subscriber.
sub test_streaming
{
    my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;

    my $count_query = "SELECT count(*), count(c) FROM test_tab";

    # Each scenario: the SQL to run on the publisher, the STREAM command whose
    # completion is awaited in parallel mode, the expected "rows|non-null c"
    # result on the subscriber, and the test description.
    my @scenarios = (

        # streamed transaction with DML, SAVEPOINTs and ROLLBACK TO
        # SAVEPOINT; only rows 3, 8, 9 and 10 survive, giving 2 + 4 = 6 rows.
        {
            sql => q{
BEGIN;
INSERT INTO test_tab VALUES (3, md5(3::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (4, md5(4::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (5, md5(5::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (6, md5(6::text));
ROLLBACK TO s2;
INSERT INTO test_tab VALUES (7, md5(7::text));
ROLLBACK TO s1;
INSERT INTO test_tab VALUES (8, md5(8::text));
SAVEPOINT s4;
INSERT INTO test_tab VALUES (9, md5(9::text));
SAVEPOINT s5;
INSERT INTO test_tab VALUES (10, md5(10::text));
COMMIT;
},
            stream_cmd => 'COMMIT',
            expected => '6|0',
            description =>
              'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults',
        },

        # streamed transaction with subscriber receiving out of order
        # subtransaction ROLLBACKs; only row 11 survives, giving 7 rows.
        {
            sql => q{
BEGIN;
INSERT INTO test_tab VALUES (11, md5(11::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (12, md5(12::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (13, md5(13::text));
SAVEPOINT s3;
INSERT INTO test_tab VALUES (14, md5(14::text));
RELEASE s2;
INSERT INTO test_tab VALUES (15, md5(15::text));
ROLLBACK TO s1;
COMMIT;
},
            stream_cmd => 'COMMIT',
            expected => '7|0',
            description =>
              'check rollback to savepoint was reflected on subscriber',
        },

        # streamed transaction with subscriber receiving rollback; nothing
        # is added, so the count stays at 7.
        {
            sql => q{
BEGIN;
INSERT INTO test_tab VALUES (16, md5(16::text));
SAVEPOINT s1;
INSERT INTO test_tab VALUES (17, md5(17::text));
SAVEPOINT s2;
INSERT INTO test_tab VALUES (18, md5(18::text));
ROLLBACK;
},
            stream_cmd => 'ABORT',
            expected => '7|0',
            description => 'check rollback was reflected on subscriber',
        },
    );

    foreach my $scenario (@scenarios)
    {
        # Check the subscriber log from now on.
        my $offset = -s $node_subscriber->logfile;

        $node_publisher->safe_psql('postgres', $scenario->{sql});
        $node_publisher->wait_for_catchup($appname);

        check_parallel_log($node_subscriber, $offset, $is_parallel,
            $scenario->{stream_cmd});

        my $result = $node_subscriber->safe_psql('postgres', $count_query);
        is($result, $scenario->{expected}, $scenario->{description});
    }

    # Cleanup the test data
    $node_publisher->safe_psql('postgres',
        "DELETE FROM test_tab WHERE (a > 2)");
    $node_publisher->wait_for_catchup($appname);
}
# Create publisher node.  debug_logical_replication_streaming is set to
# 'immediate' so that the transactions below are streamed even though they
# are small.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
    'debug_logical_replication_streaming = immediate');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create some preexisting content on publisher
foreach my $publisher_sql (
    "CREATE TABLE test_tab (a int primary key, b varchar)",
    "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')",
    "CREATE TABLE test_tab_2 (a int)")
{
    $node_publisher->safe_psql('postgres', $publisher_sql);
}

# Setup structure on subscriber.  Its test_tab has extra columns (c, d, e)
# that do not exist on the publisher.
foreach my $subscriber_sql (
    "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)",
    "CREATE TABLE test_tab_2 (a int)")
{
    $node_subscriber->safe_psql('postgres', $subscriber_sql);
}

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");

my $appname = 'tap_sub';

################################
# Test using streaming mode 'on'
################################
my $create_subscription =
    "CREATE SUBSCRIPTION tap_sub "
  . "CONNECTION '$publisher_connstr application_name=$appname' "
  . "PUBLICATION tap_pub WITH (streaming = on)";
$node_subscriber->safe_psql('postgres', $create_subscription);

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);

my $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c) FROM test_tab");
is($result, '2|0', 'check initial data was copied to subscriber');

test_streaming($node_publisher, $node_subscriber, $appname, 0);

######################################
# Test using streaming mode 'parallel'
######################################

# Common tail of the two queries that look up this subscription's connection
# in pg_stat_replication on the publisher.
my $streaming_conn =
  "FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';";

# Remember the current pid so that we can tell when the connection has been
# replaced after altering the subscription.
my $oldpid =
  $node_publisher->safe_psql('postgres', "SELECT pid $streaming_conn");

$node_subscriber->safe_psql('postgres',
    "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");

my $apply_restarted = $node_publisher->poll_query_until('postgres',
    "SELECT pid != $oldpid $streaming_conn");
die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"
  unless $apply_restarted;

# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', 'SELECT 1');

test_streaming($node_publisher, $node_subscriber, $appname, 1);

# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
    'debug_logical_replication_streaming = immediate');

# Reset the log_min_messages to default.
# NOTE(review): the serialization message awaited below is emitted at LOG
# level, which should still reach the server log at this setting -- confirm
# against the log_min_messages documentation.
$node_subscriber->append_conf('postgresql.conf',
    "log_min_messages = warning");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', 'SELECT 1');

# Log line showing that the apply worker fell back to serializing the
# remaining changes of a remote transaction to a file.
my $serialize_re =
  qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/;

my $count_test_tab_2 = "SELECT count(*) FROM test_tab_2";

# Serialize a transaction that is rolled back as a whole.
my $offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql(
    'postgres', q{
BEGIN;
INSERT INTO test_tab_2 values(1);
ROLLBACK;
});

# Ensure that the changes are serialized.
$node_subscriber->wait_for_log($serialize_re, $offset);

$node_publisher->wait_for_catchup($appname);

# Check that transaction is aborted on subscriber
$result = $node_subscriber->safe_psql('postgres', $count_test_tab_2);
is($result, '0', 'check rollback was reflected on subscriber');

# Serialize the ABORT sub-transaction.
$offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql(
    'postgres', q{
BEGIN;
INSERT INTO test_tab_2 values(1);
SAVEPOINT sp;
INSERT INTO test_tab_2 values(1);
ROLLBACK TO sp;
COMMIT;
});

# Ensure that the changes are serialized.
$node_subscriber->wait_for_log($serialize_re, $offset);

$node_publisher->wait_for_catchup($appname);

# Check that only sub-transaction is aborted on subscriber.
$result = $node_subscriber->safe_psql('postgres', $count_test_tab_2);
is($result, '1', 'check rollback to savepoint was reflected on subscriber');

$node_subscriber->stop;
$node_publisher->stop;

done_testing();
|