summaryrefslogtreecommitdiffstats
path: root/src/test/subscription/t/026_stats.pl
blob: 4719321e4df2795faca4e010d632e1eca71be6f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Tests for subscription stats.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;

# Create subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;


sub create_sub_pub_w_errors
{
	my ($node_publisher, $node_subscriber, $db, $table_name) = @_;
	# Initial table setup on both publisher and subscriber. On subscriber we
	# create the same tables but with primary keys. Also, insert some data that
	# will conflict with the data replicated from publisher later.
	$node_publisher->safe_psql(
		$db,
		qq[
	BEGIN;
	CREATE TABLE $table_name(a int);
	INSERT INTO $table_name VALUES (1);
	COMMIT;
	]);
	$node_subscriber->safe_psql(
		$db,
		qq[
	BEGIN;
	CREATE TABLE $table_name(a int primary key);
	INSERT INTO $table_name VALUES (1);
	COMMIT;
	]);

	# Set up publication.
	my $pub_name          = $table_name . '_pub';
	my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db);

	$node_publisher->safe_psql($db,
		qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name));

	# Create subscription. The tablesync for table on subscription will enter into
	# infinite error loop due to violating the unique constraint.
	my $sub_name = $table_name . '_sub';
	$node_subscriber->safe_psql($db,
		qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name)
	);

	$node_publisher->wait_for_catchup($sub_name);

	# Wait for the tablesync error to be reported.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT sync_error_count > 0
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub_name'
	])
	  or die
	  qq(Timed out while waiting for tablesync errors for subscription '$sub_name');

	# Truncate test_tab1 so that tablesync worker can continue.
	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));

	# Wait for initial tablesync to finish.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT count(1) = 1 FROM pg_subscription_rel
	WHERE srrelid = '$table_name'::regclass AND srsubstate in ('r', 's')
	])
	  or die
	  qq(Timed out while waiting for subscriber to synchronize data for table '$table_name'.);

	# Check test table on the subscriber has one row.
	my $result =
	  $node_subscriber->safe_psql($db, qq(SELECT a FROM $table_name));
	is($result, qq(1), qq(Check that table '$table_name' now has 1 row.));

	# Insert data to test table on the publisher, raising an error on the
	# subscriber due to violation of the unique constraint on test table.
	$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));

	# Wait for the apply error to be reported.
	$node_subscriber->poll_query_until(
		$db,
		qq[
	SELECT apply_error_count > 0
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub_name'
	])
	  or die
	  qq(Timed out while waiting for apply error for subscription '$sub_name');

	# Truncate test table so that apply worker can continue.
	$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));

	return ($pub_name, $sub_name);
}

my $db = 'postgres';

# There shouldn't be any subscription errors before starting logical replication.
my $result = $node_subscriber->safe_psql($db,
	qq(SELECT count(1) FROM pg_stat_subscription_stats));
is($result, qq(0),
	'Check that there are no subscription errors before starting logical replication.'
);

# Create the publication and subscription with sync and apply errors
my $table1_name = 'test_tab1';
my ($pub1_name, $sub1_name) =
  create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
	$table1_name);

# Apply and Sync errors are > 0 and reset timestamp is NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count > 0,
	sync_error_count > 0,
	stats_reset IS NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);

# Reset a single subscription
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);

# Apply and Sync errors are 0 and stats reset is not NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);

# Get reset timestamp
my $reset_time1 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')
);

# Reset single sub again
$node_subscriber->safe_psql(
	$db,
	qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);

# check reset timestamp is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time1'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')
	),
	qq(t),
	qq(Check reset timestamp for '$sub1_name' is newer after second reset.));

# Make second subscription and publication
my $table2_name = 'test_tab2';
my ($pub2_name, $sub2_name) =
  create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
	$table2_name);

# Apply and Sync errors are > 0 and reset timestamp is NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count > 0,
	sync_error_count > 0,
	stats_reset IS NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub2_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
);

# Reset all subscriptions
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats(NULL)));

# Apply and Sync errors are 0 and stats reset is not NULL
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub1_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);

is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT apply_error_count = 0,
	sync_error_count = 0,
	stats_reset IS NOT NULL
	FROM pg_stat_subscription_stats
	WHERE subname = '$sub2_name')
	),
	qq(t|t|t),
	qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);

$reset_time1 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')
);
my $reset_time2 = $node_subscriber->safe_psql($db,
	qq(SELECT stats_reset FROM pg_stat_subscription_stats WHERE subname = '$sub2_name')
);

# Reset all subscriptions
$node_subscriber->safe_psql($db,
	qq(SELECT pg_stat_reset_subscription_stats(NULL)));

# check reset timestamp for sub1 is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time1'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub1_name')
	),
	qq(t),
	qq(Confirm that reset timestamp for '$sub1_name' is newer after second reset.)
);

# check reset timestamp for sub2 is newer after reset
is( $node_subscriber->safe_psql(
		$db,
		qq(SELECT stats_reset > '$reset_time2'::timestamptz FROM
	pg_stat_subscription_stats WHERE subname = '$sub2_name')
	),
	qq(t),
	qq(Confirm that reset timestamp for '$sub2_name' is newer after second reset.)
);

# Get subscription 1 oid
my $sub1_oid = $node_subscriber->safe_psql($db,
	qq(SELECT oid FROM pg_subscription WHERE subname = '$sub1_name'));

# Drop subscription 1
$node_subscriber->safe_psql($db, qq(DROP SUBSCRIPTION $sub1_name));

# Subscription stats for sub1 should be gone
is( $node_subscriber->safe_psql(
		$db, qq(SELECT pg_stat_have_stats('subscription', 0, $sub1_oid))),
	qq(f),
	qq(Subscription stats for subscription '$sub1_name' should be removed.));

# Get subscription 2 oid
my $sub2_oid = $node_subscriber->safe_psql($db,
	qq(SELECT oid FROM pg_subscription WHERE subname = '$sub2_name'));

# Diassociate the subscription 2 from its replication slot and drop it
$node_subscriber->safe_psql(
	$db,
	qq(
ALTER SUBSCRIPTION $sub2_name DISABLE;
ALTER SUBSCRIPTION $sub2_name SET (slot_name = NONE);
DROP SUBSCRIPTION $sub2_name;
			    ));

# Subscription stats for sub2 should be gone
is( $node_subscriber->safe_psql(
		$db, qq(SELECT pg_stat_have_stats('subscription', 0, $sub2_oid))),
	qq(f),
	qq(Subscription stats for subscription '$sub2_name' should be removed.));
$node_publisher->safe_psql($db,
	qq(SELECT pg_drop_replication_slot('$sub2_name')));

$node_subscriber->stop('fast');
$node_publisher->stop('fast');

done_testing();