# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Tests that logical decoding messages emitted with pg_logical_emit_message()
# are visible through pgoutput on a logical replication slot, for both the
# transactional and the non-transactional flavor.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
    "CREATE TABLE tab_test (a int primary key)");

# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
    "CREATE TABLE tab_test (a int primary key)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub FOR TABLE tab_test");

$node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

$node_publisher->wait_for_catchup('tap_sub');

# Ensure a transactional logical decoding message shows up on the slot.
# The subscription is disabled first so that this script can read the
# subscription's slot ('tap_sub') directly with the SQL-level functions.
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");

# Wait for the replication slot to become inactive on the publisher.  Bail
# out with a clear message on timeout rather than failing later in an
# unrelated query.
$node_publisher->poll_query_until(
    'postgres',
    "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'",
    1)
  or die "Timed out while waiting for replication slot tap_sub to become inactive";

$node_publisher->safe_psql('postgres',
    "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
);

# Peek (do not consume) the changes and look at the first byte of each one.
my $result = $node_publisher->safe_psql(
    'postgres', qq(
        SELECT get_byte(data, 0)
        FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
            'proto_version', '1',
            'publication_names', 'tap_pub',
            'messages', 'true')
));

# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
# safe_psql returns one row per line, so the expected value is built with
# explicit newlines.
is( $result,
    join("\n", qw(66 77 67)),
    'messages on slot are B M C with message option');

# Inspect the second change (the message itself): byte 1 is checked as the
# transactional flag and the 8 bytes starting at position 11 as the prefix.
$result = $node_publisher->safe_psql(
    'postgres', qq(
        SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
        FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
            'proto_version', '1',
            'publication_names', 'tap_pub',
            'messages', 'true')
        OFFSET 1 LIMIT 1
));

is($result, qq(1|pgoutput),
    "flag transactional is set to 1 and prefix is pgoutput");

# Consume the changes, this time without the 'messages' option.
$result = $node_publisher->safe_psql(
    'postgres', qq(
        SELECT get_byte(data, 0)
        FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
            'proto_version', '1',
            'publication_names', 'tap_pub')
));

# no message and no BEGIN and COMMIT because of empty transaction optimization
is($result, qq(),
    'option messages defaults to false so message (M) is not available on slot'
);

# Emit a non-transactional message between two ordinary inserts and remember
# the value returned by pg_logical_emit_message() so the matching change can
# be picked out by LSN below.
$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");

my $message_lsn = $node_publisher->safe_psql('postgres',
    "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"
);

$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");

$result = $node_publisher->safe_psql(
    'postgres', qq(
        SELECT get_byte(data, 0), get_byte(data, 1)
        FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
            'proto_version', '1',
            'publication_names', 'tap_pub',
            'messages', 'true')
        WHERE lsn = '$message_lsn' AND xid = 0
));

is($result, qq(77|0), 'non-transactional message on slot is M');

# Ensure a non-transactional logical decoding message shows up on the slot when
# it is emitted within an aborted transaction. The message won't emit until
# something advances the LSN; hence, we intentionally force the server to
# switch to a new WAL file.
$node_publisher->safe_psql(
    'postgres', qq(
        BEGIN;
        SELECT pg_logical_emit_message(false, 'pgoutput',
            'a non-transactional message is available even if the transaction is aborted 1');
        INSERT INTO tab_test VALUES (3);
        SELECT pg_logical_emit_message(true, 'pgoutput',
            'a transactional message is not available if the transaction is aborted');
        SELECT pg_logical_emit_message(false, 'pgoutput',
            'a non-transactional message is available even if the transaction is aborted 2');
        ROLLBACK;
        SELECT pg_switch_wal();
));

$result = $node_publisher->safe_psql(
    'postgres', qq(
        SELECT get_byte(data, 0), get_byte(data, 1)
        FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
            'proto_version', '1',
            'publication_names', 'tap_pub',
            'messages', 'true')
));

# Only the two non-transactional messages are expected, one row each.
is( $result,
    join("\n", '77|0', '77|0'),
    'non-transactional message on slot from aborted transaction is M');

$node_subscriber->stop('fast');
$node_publisher->stop('fast');

done_testing();