1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
|
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Test logical replication of 2PC with streaming.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# When the caller reports that parallel apply is in use, ask the subscriber
# node to wait for the DEBUG log line announcing that the given STREAM
# command ($type, e.g. 'PREPARE') has been processed.  Only log content from
# $offset onwards is passed along for matching.
sub check_parallel_log
{
	my ($node_subscriber, $offset, $is_parallel, $type) = @_;

	# Nothing to verify without parallel apply; hand the (false) flag back.
	return $is_parallel if !$is_parallel;

	# The optional group allows for an extra " CODE:" token after "DEBUG:".
	my $finished_msg =
	  qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/;

	return $node_subscriber->wait_for_log($finished_msg, $offset);
}
# Run the workload shared by every scenario on the publisher and leave it
# prepared: insert rows 3..5, update the even-numbered rows, delete the
# multiples of three, then PREPARE TRANSACTION 'test_prepared_tab'.
sub _prepare_test_xact
{
	my ($node_publisher) = @_;

	$node_publisher->safe_psql('postgres',
		"\nBEGIN;\n"
		  . "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);\n"
		  . "UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;\n"
		  . "DELETE FROM test_tab WHERE mod(a,3) = 0;\n"
		  . "PREPARE TRANSACTION 'test_prepared_tab';");
	return;
}

# Assert that the subscriber currently has $expected rows in
# pg_prepared_xacts.
sub _check_prepared_xacts
{
	my ($node_subscriber, $expected, $test_name) = @_;

	# Report failures at the caller's line rather than inside this helper.
	local $Test::Builder::Level = $Test::Builder::Level + 1;

	my $result = $node_subscriber->safe_psql('postgres',
		"SELECT count(*) FROM pg_prepared_xacts;");
	is($result, $expected, $test_name);
	return;
}

# Assert the subscriber's view of test_tab.  $expected is the
# "rows|rows with c set|rows with d = 999" triple as printed by psql.
sub _check_test_tab
{
	my ($node_subscriber, $expected, $test_name) = @_;

	# Report failures at the caller's line rather than inside this helper.
	local $Test::Builder::Level = $Test::Builder::Level + 1;

	# count(d = 999) would count every row whose d is merely non-NULL, so
	# use FILTER to count only the rows that really hold the value 999.
	my $result = $node_subscriber->safe_psql('postgres',
		"SELECT count(*), count(c), count(*) FILTER (WHERE d = 999) FROM test_tab"
	);
	is($result, $expected, $test_name);
	return;
}

# Prepare the shared workload on the publisher, wait for the subscriber to
# catch up (and, for parallel apply, for its log confirmation), then assert
# that exactly one transaction is prepared on the subscriber.
sub _prepare_and_verify
{
	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;

	# Report failures at the caller's line rather than inside this helper.
	local $Test::Builder::Level = $Test::Builder::Level + 1;

	# Check the subscriber log from now on.
	my $offset = -s $node_subscriber->logfile;

	_prepare_test_xact($node_publisher);
	$node_publisher->wait_for_catchup($appname);

	check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');

	_check_prepared_xacts($node_subscriber, '1',
		'transaction is prepared on subscriber');
	return;
}

# Common test steps for both the streaming=on and streaming=parallel cases.
#
# Arguments:
#   $node_publisher  - publisher node object
#   $node_subscriber - subscriber node object
#   $appname         - application_name of the subscription's connection,
#                      used when waiting for catchup
#   $is_parallel     - true when the subscription uses streaming = parallel;
#                      enables the extra subscriber-log checks
#
# Expects test_tab to hold only rows 1 and 2 on entry and leaves it in that
# state again on exit.
sub test_streaming
{
	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;

	my $trim_sql = "DELETE FROM test_tab WHERE a > 2;";
	my $committed_name =
	  'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults';

	###############################
	# Test 2PC PREPARE / COMMIT PREPARED.
	# 1. Data is streamed as a 2PC transaction.
	# 2. Then do commit prepared.
	#
	# Expect all data is replicated on subscriber side after the commit.
	###############################

	# Insert, update and delete some rows, and check that the prepared
	# transaction shows up on the subscriber.
	_prepare_and_verify($node_publisher, $node_subscriber, $appname,
		$is_parallel);

	# 2PC transaction gets committed
	$node_publisher->safe_psql('postgres',
		"COMMIT PREPARED 'test_prepared_tab';");
	$node_publisher->wait_for_catchup($appname);

	# check that transaction is committed on subscriber
	_check_test_tab($node_subscriber, '4|4|4', $committed_name);
	_check_prepared_xacts($node_subscriber, '0',
		'transaction is committed on subscriber');

	###############################
	# Test 2PC PREPARE / ROLLBACK PREPARED.
	# 1. Table is deleted back to 2 rows which are replicated on subscriber.
	# 2. Data is streamed using 2PC.
	# 3. Do rollback prepared.
	#
	# Expect data rolls back leaving only the original 2 rows.
	###############################

	# First, delete the data except for 2 rows (will be replicated)
	$node_publisher->safe_psql('postgres', $trim_sql);

	# Then insert, update and delete some rows.
	_prepare_and_verify($node_publisher, $node_subscriber, $appname,
		$is_parallel);

	# 2PC transaction gets aborted
	$node_publisher->safe_psql('postgres',
		"ROLLBACK PREPARED 'test_prepared_tab';");
	$node_publisher->wait_for_catchup($appname);

	# check that transaction is aborted on subscriber
	_check_test_tab($node_subscriber, '2|2|2',
		'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'
	);
	_check_prepared_xacts($node_subscriber, '0',
		'transaction is aborted on subscriber');

	###############################
	# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
	# 1. insert, update and delete some rows.
	# 2. Then server crashes before the 2PC transaction is committed.
	# 3. After servers are restarted the pending transaction is committed.
	#
	# Expect all data is replicated on subscriber side after the commit.
	# Note: both publisher and subscriber do crash/restart.
	###############################

	_prepare_test_xact($node_publisher);

	$node_subscriber->stop('immediate');
	$node_publisher->stop('immediate');

	$node_publisher->start;
	$node_subscriber->start;

	# We don't try to check the log for parallel option here as the subscriber
	# may have stopped after finishing the prepare and before logging the
	# appropriate message.  For the same reason no log offset is recorded.

	# commit post the restart
	$node_publisher->safe_psql('postgres',
		"COMMIT PREPARED 'test_prepared_tab';");
	$node_publisher->wait_for_catchup($appname);

	# check inserts are visible
	_check_test_tab($node_subscriber, '4|4|4', $committed_name);

	###############################
	# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
	# 1. Table is deleted back to 2 rows which are replicated on subscriber.
	# 2. Data is streamed using 2PC.
	# 3. A single row INSERT is done which is after the PREPARE.
	# 4. Then do a ROLLBACK PREPARED.
	#
	# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
	# (the original 2 + inserted 1).
	###############################

	# First, delete the data except for 2 rows (will be replicated)
	$node_publisher->safe_psql('postgres', $trim_sql);

	# Then insert, update and delete some rows.
	_prepare_and_verify($node_publisher, $node_subscriber, $appname,
		$is_parallel);

	# Insert a different record (now we are outside of the 2PC transaction)
	# Note: the 2PC transaction still holds row locks so make sure this
	# insert is for a separate primary key
	$node_publisher->safe_psql('postgres',
		"INSERT INTO test_tab VALUES (99999, 'foobar')");

	# 2PC transaction gets aborted
	$node_publisher->safe_psql('postgres',
		"ROLLBACK PREPARED 'test_prepared_tab';");
	$node_publisher->wait_for_catchup($appname);

	# check that transaction is aborted on subscriber,
	# but the extra INSERT outside of the 2PC still was replicated
	_check_test_tab($node_subscriber, '3|3|3',
		'check the outside insert was copied to subscriber');
	_check_prepared_xacts($node_subscriber, '0',
		'transaction is aborted on subscriber');

	###############################
	# Do INSERT after the PREPARE but before COMMIT PREPARED.
	# 1. Table is deleted back to 2 rows which are replicated on subscriber.
	# 2. Data is streamed using 2PC.
	# 3. A single row INSERT is done which is after the PREPARE.
	# 4. Then do a COMMIT PREPARED.
	#
	# Expect 2PC data + the extra row are on the subscriber
	# (the 4 rows left by the 2PC transaction + inserted 1 = 5).
	###############################

	# First, delete the data except for 2 rows (will be replicated)
	$node_publisher->safe_psql('postgres', $trim_sql);

	# Then insert, update and delete some rows.
	_prepare_and_verify($node_publisher, $node_subscriber, $appname,
		$is_parallel);

	# Insert a different record (now we are outside of the 2PC transaction)
	# Note: the 2PC transaction still holds row locks so make sure this
	# insert is for a separate primary key
	$node_publisher->safe_psql('postgres',
		"INSERT INTO test_tab VALUES (99999, 'foobar')");

	# 2PC transaction gets committed
	$node_publisher->safe_psql('postgres',
		"COMMIT PREPARED 'test_prepared_tab';");
	$node_publisher->wait_for_catchup($appname);

	# check that transaction is committed on subscriber
	_check_test_tab($node_subscriber, '5|5|5',
		'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
	);
	_check_prepared_xacts($node_subscriber, '0',
		'transaction is committed on subscriber');

	# Cleanup the test data
	$node_publisher->safe_psql('postgres', $trim_sql);
	$node_publisher->wait_for_catchup($appname);

	return;
}
###############################
# Setup
###############################

# Publisher node: set up for logical replication, with room for prepared
# transactions.  debug_logical_replication_streaming = immediate is
# presumably what lets the tiny transactions below get streamed at all;
# see the PostgreSQL documentation for that developer option.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
	"\nmax_prepared_transactions = 10\n"
	  . "debug_logical_replication_streaming = immediate\n");
$node_publisher->start;

# Subscriber node: only needs prepared transactions enabled at this point.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
	"\nmax_prepared_transactions = 10\n");
$node_subscriber->start;

# Seed the publisher with two rows that exist before replication starts,
# plus an empty second table.
$node_publisher->safe_psql('postgres',
	'CREATE TABLE test_tab (a int primary key, b varchar)');
$node_publisher->safe_psql('postgres',
	q{INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')});
$node_publisher->safe_psql('postgres', 'CREATE TABLE test_tab_2 (a int)');

# Matching tables on the subscriber.  Columns a and b line up with the
# publisher's test_tab; c and d exist only here and are filled from their
# local defaults.
$node_subscriber->safe_psql('postgres',
	'CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)'
);
$node_subscriber->safe_psql('postgres', 'CREATE TABLE test_tab_2 (a int)');

# Publication side of the logical replication setup, together with the
# connection string and application name used by the subscription.
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	'CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2');

my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################

# Create the subscription with both streaming and two-phase commit enabled.
$node_subscriber->safe_psql('postgres',
	"\nCREATE SUBSCRIPTION tap_sub\n"
	  . "CONNECTION '$publisher_connstr application_name=$appname'\n"
	  . "PUBLICATION tap_pub\n"
	  . "WITH (streaming = on, two_phase = on)");

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);

# Also wait for two-phase to be enabled, i.e. until no subscription is left
# in a subtwophasestate other than 'e'.
my $twophase_query =
  "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
$node_subscriber->poll_query_until('postgres', $twophase_query)
  or die "Timed out while waiting for subscriber to enable twophase";

# Check initial data was copied to subscriber.  The third column counts only
# rows whose d really equals the local default; count(d = 999) would count
# every row where d is merely non-NULL.
my $result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*), count(c), count(*) FILTER (WHERE d = 999) FROM test_tab"
);
is($result, qq(2|2|2), 'check initial data was copied to subscriber');

# Run the common scenarios without the parallel-apply log checks.
test_streaming($node_publisher, $node_subscriber, $appname, 0);
######################################
# Test using streaming mode 'parallel'
######################################

# Both queries below look at the streaming walsender serving our
# subscription; only the select list differs.
my $walsender_filter =
  "FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';";

# Remember the current walsender pid so that a change of pid can be detected
# after the subscription is altered.
my $oldpid = $node_publisher->safe_psql('postgres',
	"SELECT pid $walsender_filter");

$node_subscriber->safe_psql('postgres',
	'ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)');

# Poll until a streaming walsender with a different pid is reported.
$node_publisher->poll_query_until('postgres',
	"SELECT pid != $oldpid $walsender_filter")
  or die
  "Timed out while waiting for apply to restart after changing SUBSCRIPTION";

# The parallel apply worker reports its progress at DEBUG level, and the
# common scenarios look for those messages, so raise the log verbosity.
$node_subscriber->append_conf('postgresql.conf', 'log_min_messages = debug1');
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', 'SELECT 1');

# Same scenarios again, this time with the parallel-apply log checks on.
test_streaming($node_publisher, $node_subscriber, $appname, 1);
# Test serializing changes to files and notify the parallel apply worker to
# apply them at the end of the transaction.  The first setting turns that
# behaviour on for the subscriber; the second resets log_min_messages to
# its default.
for my $setting (
	'debug_logical_replication_streaming = immediate',
	'log_min_messages = warning')
{
	$node_subscriber->append_conf('postgresql.conf', $setting);
}
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', 'SELECT 1');

# Only subscriber log output written from here on is of interest.
my $offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql('postgres',
	"\nBEGIN;\n"
	  . "INSERT INTO test_tab_2 values(1);\n"
	  . "PREPARE TRANSACTION 'xact';\n");

# Ensure that the changes are serialized.
my $serialize_msg =
  qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/;
$node_subscriber->wait_for_log($serialize_msg, $offset);

$node_publisher->wait_for_catchup($appname);

# Check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
	'SELECT count(*) FROM pg_prepared_xacts;');
is($result, '1', 'transaction is prepared on subscriber');

# Check that 2PC gets committed on subscriber
$node_publisher->safe_psql('postgres', q{COMMIT PREPARED 'xact';});
$node_publisher->wait_for_catchup($appname);

# Check that transaction is committed on subscriber
$result =
  $node_subscriber->safe_psql('postgres', 'SELECT count(*) FROM test_tab_2');
is($result, '1', 'transaction is committed on subscriber');
###############################
# check all the cleanup
###############################

$node_subscriber->safe_psql('postgres', 'DROP SUBSCRIPTION tap_sub');

# After the drop, each of these catalogs/views must be empty on the given
# node.  Entries are [ node, relation to count, test name ], checked in
# order.
my @leftover_checks = (
	[
		$node_subscriber, 'pg_subscription',
		'check subscription was dropped on subscriber'
	],
	[
		$node_publisher, 'pg_replication_slots',
		'check replication slot was dropped on publisher'
	],
	[
		$node_subscriber, 'pg_subscription_rel',
		'check subscription relation status was dropped on subscriber'
	],
	[
		$node_subscriber, 'pg_replication_origin',
		'check replication origin was dropped on subscriber'
	],
);

foreach my $check (@leftover_checks)
{
	my ($node, $relation, $test_name) = @{$check};

	$result = $node->safe_psql('postgres', "SELECT count(*) FROM $relation");
	is($result, '0', $test_name);
}

$node_subscriber->stop('fast');
$node_publisher->stop('fast');

done_testing();
|