1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
# Checks that snapshots on standbys behave in a minimally reasonable
# way.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Initialize primary node
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);
$node_primary->append_conf('postgresql.conf', 'max_prepared_transactions=10');
$node_primary->start;
# Initialize with empty test table
$node_primary->safe_psql('postgres',
'CREATE TABLE public.test_visibility (data text not null)');
# Take backup
my $backup_name = 'my_backup';
$node_primary->backup($backup_name);
# Create streaming standby from backup
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
$node_standby->init_from_backup($node_primary, $backup_name,
has_streaming => 1);
$node_standby->append_conf('postgresql.conf', 'max_prepared_transactions=10');
$node_standby->start;
my $psql_timeout =
IPC::Run::timer(2 * $PostgreSQL::Test::Utils::timeout_default);
# One psql to primary and standby each, for all queries. That allows
# to check uncommitted changes being replicated and such.
my %psql_primary = (stdin => '', stdout => '', stderr => '');
$psql_primary{run} = IPC::Run::start(
[ 'psql', '-XA', '-f', '-', '-d', $node_primary->connstr('postgres') ],
'<',
\$psql_primary{stdin},
'>',
\$psql_primary{stdout},
'2>',
\$psql_primary{stderr},
$psql_timeout);
my %psql_standby = ('stdin' => '', 'stdout' => '', 'stderr' => '');
$psql_standby{run} = IPC::Run::start(
[ 'psql', '-XA', '-f', '-', '-d', $node_standby->connstr('postgres') ],
'<',
\$psql_standby{stdin},
'>',
\$psql_standby{stdout},
'2>',
\$psql_standby{stderr},
$psql_timeout);
#
# 1. Check initial data is the same
#
ok( send_query_and_wait(
\%psql_standby,
q/SELECT * FROM test_visibility ORDER BY data;/,
qr/^\(0 rows\)$/m),
'data not visible');
#
# 2. Check if an INSERT is replayed and visible
#
$node_primary->psql('postgres',
"INSERT INTO test_visibility VALUES ('first insert')");
$node_primary->wait_for_catchup($node_standby);
ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/first insert.*\n\(1 row\)/m),
'insert visible');
#
# 3. Verify that uncommitted changes aren't visible.
#
ok( send_query_and_wait(
\%psql_primary,
q[
BEGIN;
UPDATE test_visibility SET data = 'first update' RETURNING data;
],
qr/^UPDATE 1$/m),
'UPDATE');
$node_primary->psql('postgres', "SELECT txid_current();"); # ensure WAL flush
$node_primary->wait_for_catchup($node_standby);
ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/first insert.*\n\(1 row\)/m),
'uncommitted update invisible');
#
# 4. That a commit turns 3. visible
#
ok(send_query_and_wait(\%psql_primary, q[COMMIT;], qr/^COMMIT$/m), 'COMMIT');
$node_primary->wait_for_catchup($node_standby);
ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/first update\n\(1 row\)$/m),
'committed update visible');
#
# 5. Check that changes in prepared xacts is invisible
#
ok( send_query_and_wait(
\%psql_primary, q[
DELETE from test_visibility; -- delete old data, so we start with clean slate
BEGIN;
INSERT INTO test_visibility VALUES('inserted in prepared will_commit');
PREPARE TRANSACTION 'will_commit';],
qr/^PREPARE TRANSACTION$/m),
'prepared will_commit');
ok( send_query_and_wait(
\%psql_primary, q[
BEGIN;
INSERT INTO test_visibility VALUES('inserted in prepared will_abort');
PREPARE TRANSACTION 'will_abort';
],
qr/^PREPARE TRANSACTION$/m),
'prepared will_abort');
$node_primary->wait_for_catchup($node_standby);
ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/^\(0 rows\)$/m),
'uncommitted prepared invisible');
# For some variation, finish prepared xacts via separate connections
$node_primary->safe_psql('postgres', "COMMIT PREPARED 'will_commit';");
$node_primary->safe_psql('postgres', "ROLLBACK PREPARED 'will_abort';");
$node_primary->wait_for_catchup($node_standby);
ok( send_query_and_wait(
\%psql_standby,
q[SELECT * FROM test_visibility ORDER BY data;],
qr/will_commit.*\n\(1 row\)$/m),
'finished prepared visible');
# explicitly shut down psql instances gracefully - to avoid hangs
# or worse on windows
$psql_primary{stdin} .= "\\q\n";
$psql_primary{run}->finish;
$psql_standby{stdin} .= "\\q\n";
$psql_standby{run}->finish;
$node_primary->stop;
$node_standby->stop;
# Send query, wait until string matches
sub send_query_and_wait
{
my ($psql, $query, $untl) = @_;
my $ret;
# send query
$$psql{stdin} .= $query;
$$psql{stdin} .= "\n";
# wait for query results
$$psql{run}->pump_nb();
while (1)
{
last if $$psql{stdout} =~ /$untl/;
if ($psql_timeout->is_expired)
{
BAIL_OUT("aborting wait: program timed out\n"
. "stream contents: >>$$psql{stdout}<<\n"
. "pattern searched for: $untl\n");
return 0;
}
if (not $$psql{run}->pumpable())
{
BAIL_OUT("aborting wait: program died\n"
. "stream contents: >>$$psql{stdout}<<\n"
. "pattern searched for: $untl\n");
return 0;
}
$$psql{run}->pump();
}
$$psql{stdout} = '';
return 1;
}
done_testing();
|