summaryrefslogtreecommitdiffstats
path: root/src/test/subscription/t/020_messages.pl
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/subscription/t/020_messages.pl')
-rw-r--r--src/test/subscription/t/020_messages.pl149
1 files changed, 149 insertions, 0 deletions
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000..533419b
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,149 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Tests that logical decoding messages
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication slot to become inactive on the publisher
+$node_publisher->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'",
+ 1);
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is( $result, qq(66
+77
+67),
+ 'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+ "flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub')
+));
+
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
+ 'option messages defaults to false so message (M) is not available on slot'
+);
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"
+);
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't emit until
+# something advances the LSN, hence, we intentionally forces the server to
+# switch to a new WAL file.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 1');
+ INSERT INTO tab_test VALUES (3);
+ SELECT pg_logical_emit_message(true, 'pgoutput',
+ 'a transactional message is not available if the transaction is aborted');
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 2');
+ ROLLBACK;
+ SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+is( $result, qq(77|0
+77|0),
+ 'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();