1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Tests for disable_on_error and SKIP transaction features.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Position in the subscriber's log file from which test_skip_lsn() reads.
# It starts at 0 and is replaced by the value wait_for_log() returns on each
# call, so that a later call starts reading from there.
my $offset = 0;
# Skip the transaction that is currently failing on the subscriber and check
# that logical replication keeps working afterwards.
#
# This function must be called after the caller has inserted data on the
# publisher that conflicts with the subscriber.  The finish LSN of the failed
# transaction, which is what ALTER SUBSCRIPTION ... SKIP takes, is fetched
# from the subscriber's server log, read starting at the file-level $offset.
#
# Arguments:
#   $node_publisher   - publisher node; receives the non-conflicting INSERT
#   $node_subscriber  - subscriber node that owns subscription "sub"
#   $nonconflict_data - VALUES list (e.g. "(2, NULL)") inserted into tbl on
#                       the publisher after the transaction has been skipped
#   $expected         - expected result of "SELECT count(*) FROM tbl" on the
#                       subscriber
#   $msg              - test name passed to is()
#
# Side effects: stores the value returned by wait_for_log() in $offset so that
# the next call starts from there.
#
# Dies if either wait on pg_subscription times out or if the error LSN cannot
# be found in the log.
sub test_skip_lsn
{
    # Make a failing is() below be reported at the caller's line instead of
    # at a line inside this helper.
    local $Test::Builder::Level = $Test::Builder::Level + 1;

    my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
      = @_;

    # Wait until a conflict occurs on the subscriber; it shows up as the
    # subscription becoming disabled.  Fail right here on a timeout rather
    # than with a misleading "could not get error-LSN" below.
    $node_subscriber->poll_query_until('postgres',
        "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'"
    ) or die "Timed out while waiting for subscriber to be disabled";

    # Get the finish LSN of the error transaction.
    my $contents = slurp_file($node_subscriber->logfile, $offset);
    $contents =~
      qr/processing remote data for replication origin \"pg_\d+\" during message type "INSERT" for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/
      or die "could not get error-LSN";
    my $lsn = $1;

    # Set skip lsn.
    $node_subscriber->safe_psql('postgres',
        "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");

    # Re-enable the subscription.
    $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");

    # Wait for the failed transaction to be skipped; subskiplsn reads '0/0'
    # once that has happened.
    $node_subscriber->poll_query_until('postgres',
        "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'"
    ) or die "Timed out while waiting for the transaction to be skipped";

    # Check the log to ensure that the transaction is skipped, and advance the
    # offset of the log file for the next test.
    $offset = $node_subscriber->wait_for_log(
        qr/LOG: ( [A-Z0-9]+:)? logical replication completed skipping transaction at LSN $lsn/,
        $offset);

    # Insert non-conflict data
    $node_publisher->safe_psql('postgres',
        "INSERT INTO tbl VALUES $nonconflict_data");
    $node_publisher->wait_for_catchup('sub');

    # Check replicated data
    my $res =
      $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
    is($res, $expected, $msg);
}
# Publisher node.  logical_decoding_work_mem is kept low so that the
# streaming cases can be tested.
my $publisher_conf = qq[
logical_decoding_work_mem = 64kB
max_prepared_transactions = 10
];
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', $publisher_conf);
$node_publisher->start();

# Subscriber node.
my $subscriber_conf = qq[
max_prepared_transactions = 10
];
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init();
$node_subscriber->append_conf('postgresql.conf', $subscriber_conf);
$node_subscriber->start();
# Create tbl on both nodes.  The subscriber's copy additionally has a primary
# key and already holds a row, so the data replicated from the publisher
# later on will conflict with it.
my $publisher_tbl_sql = qq[
CREATE TABLE tbl (i INT, t TEXT);
INSERT INTO tbl VALUES (1, NULL);
];
my $subscriber_tbl_sql = qq[
CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT);
INSERT INTO tbl VALUES (1, NULL);
];
$node_publisher->safe_psql('postgres', $publisher_tbl_sql);
$node_subscriber->safe_psql('postgres', $subscriber_tbl_sql);
# Set up logical replication with a publication and a subscription.  The
# initial synchronization is expected to hit the uniqueness violation, which
# makes the subscription fail and, because of disable_on_error, get disabled.
my $publisher_connstr = $node_publisher->connstr;
$publisher_connstr .= ' dbname=postgres';

my $create_pub_sql = "CREATE PUBLICATION pub FOR TABLE tbl";
my $create_sub_sql =
  "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)";
$node_publisher->safe_psql('postgres', $create_pub_sql);
$node_subscriber->safe_psql('postgres', $create_sub_sql);

# The failed initial synchronization must leave the subscription disabled.
my $sub_is_disabled = $node_subscriber->poll_query_until('postgres',
    "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
);
die "Timed out while waiting for subscriber to be disabled"
  unless $sub_is_disabled;

# Remove the conflicting row on the subscriber, then switch the subscription
# "sub" back on.
$node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");

# Let the table sync finish and confirm the publisher's row has arrived.
$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
my $result =
  $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl");
is($result, qq(1), "subscription sub replicated data");
# Plain COMMIT case: the inserted row violates the unique constraint on the
# subscriber's tbl, so applying it raises an error there.  Skip that
# transaction.
my $conflicting_commit_sql = qq[
BEGIN;
INSERT INTO tbl VALUES (1, NULL);
COMMIT;
];
$node_publisher->safe_psql('postgres', $conflicting_commit_sql);
test_skip_lsn(
    $node_publisher, $node_subscriber,
    "(2, NULL)", "2",
    "test skipping transaction");
# PREPARE / COMMIT PREPARED case: insert the same conflicting row, this time
# in a transaction that is prepared before being committed, which raises an
# error.  Skip that transaction.
my $conflicting_prepare_sql = qq[
BEGIN;
INSERT INTO tbl VALUES (1, NULL);
PREPARE TRANSACTION 'gtx';
COMMIT PREPARED 'gtx';
];
$node_publisher->safe_psql('postgres', $conflicting_prepare_sql);
test_skip_lsn(
    $node_publisher, $node_subscriber,
    "(3, NULL)", "3",
    "test skipping prepare and commit prepared ");
# STREAM COMMIT case: insert enough rows into tbl to exceed the 64kB limit.
# The subscriber again raises an error for the same reason, this time while
# applying the spooled changes.  Skip that transaction.
my $conflicting_stream_sql = qq[
BEGIN;
INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
COMMIT;
];
$node_publisher->safe_psql('postgres', $conflicting_stream_sql);
test_skip_lsn(
    $node_publisher, $node_subscriber,
    "(4, md5(4::text))", "4",
    "test skipping stream-commit");
# No prepared transaction may be left behind on the subscriber.
$result = $node_subscriber->safe_psql('postgres',
    "SELECT COUNT(*) FROM pg_prepared_xacts");
is($result, "0",
    "check all prepared transactions are resolved on the subscriber");

# Shut down the subscriber first, then the publisher.
foreach my $node ($node_subscriber, $node_publisher)
{
    $node->stop;
}

done_testing();
|