summaryrefslogtreecommitdiffstats
path: root/src/test/subscription/t/020_messages.pl
blob: 0e218e0048b4aebb760d10127978b16e2cdd3503 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (c) 2021, PostgreSQL Global Development Group

# Tests that logical decoding messages
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 5;

# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
	"CREATE TABLE tab_test (a int primary key)");

# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_test (a int primary key)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub FOR TABLE tab_test");

$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

$node_publisher->wait_for_catchup('tap_sub');

# Ensure a transactional logical decoding message shows up on the slot
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");

# wait for the replication slot to become inactive in the publisher
$node_publisher->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'",
	1);

$node_publisher->safe_psql('postgres',
	"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
);

my $result = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT get_byte(data, 0)
		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'tap_pub',
			'messages', 'true')
));

# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
is( $result, qq(66
77
67),
	'messages on slot are B M C with message option');

$result = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'tap_pub',
			'messages', 'true')
		OFFSET 1 LIMIT 1
));

is($result, qq(1|pgoutput),
	"flag transactional is set to 1 and prefix is pgoutput");

$result = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT get_byte(data, 0)
		FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'tap_pub')
));

# 66 67 == B C == BEGIN COMMIT
is( $result, qq(66
67),
	'option messages defaults to false so message (M) is not available on slot'
);

$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");

my $message_lsn = $node_publisher->safe_psql('postgres',
	"SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"
);

$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");

$result = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT get_byte(data, 0), get_byte(data, 1)
		FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'tap_pub',
			'messages', 'true')
		WHERE lsn = '$message_lsn' AND xid = 0
));

is($result, qq(77|0), 'non-transactional message on slot is M');

# Ensure a non-transactional logical decoding message shows up on the slot when
# it is emitted within an aborted transaction. The message won't emit until
# something advances the LSN, hence, we intentionally forces the server to
# switch to a new WAL file.
$node_publisher->safe_psql(
	'postgres', qq(
		BEGIN;
		SELECT pg_logical_emit_message(false, 'pgoutput',
			'a non-transactional message is available even if the transaction is aborted 1');
		INSERT INTO tab_test VALUES (3);
		SELECT pg_logical_emit_message(true, 'pgoutput',
			'a transactional message is not available if the transaction is aborted');
		SELECT pg_logical_emit_message(false, 'pgoutput',
			'a non-transactional message is available even if the transaction is aborted 2');
		ROLLBACK;
		SELECT pg_switch_wal();
));

$result = $node_publisher->safe_psql(
	'postgres', qq(
		SELECT get_byte(data, 0), get_byte(data, 1)
		FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
			'proto_version', '1',
			'publication_names', 'tap_pub',
			'messages', 'true')
));

is( $result, qq(77|0
77|0),
	'non-transactional message on slot from aborted transaction is M');

$node_subscriber->stop('fast');
$node_publisher->stop('fast');