diff options
Diffstat (limited to 'contrib/test_decoding/sql/slot.sql')
-rw-r--r-- | contrib/test_decoding/sql/slot.sql | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql new file mode 100644 index 0000000..1aa27c5 --- /dev/null +++ b/contrib/test_decoding/sql/slot.sql @@ -0,0 +1,178 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true); + +SELECT pg_drop_replication_slot('regression_slot_p'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false); + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true); + +SELECT pg_create_logical_replication_slot('foo', 'nonexistent'); + +-- here we want to start a new session and wait till old one is gone +select pg_backend_pid() as oldpid \gset +\c - +SET synchronous_commit = on; + +do 'declare c int = 0; +begin + while (select count(*) from pg_replication_slots where active_pid = ' + :'oldpid' + ') > 0 loop c := c + 1; perform pg_sleep(0.01); end loop; + raise log ''slot test looped % times'', c; +end'; + +-- should fail because the temporary slots were dropped automatically +SELECT pg_drop_replication_slot('regression_slot_t'); +SELECT pg_drop_replication_slot('regression_slot_t2'); + +-- monitoring functions for slot directories +SELECT count(*) >= 0 AS ok FROM pg_ls_logicalmapdir(); +SELECT count(*) >= 0 AS ok FROM pg_ls_logicalsnapdir(); +SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('regression_slot_p'); +SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('not_existing_slot'); -- fails + +-- permanent slot has survived +SELECT pg_drop_replication_slot('regression_slot_p'); + +-- test switching between slots in a session +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); + +CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120)); +BEGIN; +INSERT INTO replication_example(somedata, text) VALUES (1, 1); +INSERT INTO replication_example(somedata, text) VALUES (1, 2); +COMMIT; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true); + +INSERT INTO replication_example(somedata, text) VALUES (1, 3); + +SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +INSERT INTO replication_example(somedata, text) VALUES (1, 4); +INSERT INTO replication_example(somedata, text) VALUES (1, 5); + +SELECT pg_current_wal_lsn() AS wal_lsn \gset + +INSERT INTO replication_example(somedata, text) VALUES (1, 6); + +SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset +SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn()); + +SELECT :'wal_lsn' = :'end_lsn'; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +DROP TABLE replication_example; + +-- error +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true); + +-- both should error as they should be dropped on error +SELECT pg_drop_replication_slot('regression_slot1'); +SELECT pg_drop_replication_slot('regression_slot2'); + +-- slot advance with physical slot, error with non-reserved slot +SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); +SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN +SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error +SELECT pg_drop_replication_slot('regression_slot3'); + +-- +-- Test copy functions for logical replication slots +-- + +-- Create and copy logical slots +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput'); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Now we have maximum 4 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + +-- Test based on the temporary logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput'); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Cannot copy a logical slot to a physical slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); + +-- +-- Test copy functions for physical replication slots +-- + +-- Create and copy physical slots +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + +-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + +-- Cannot copy a physical slot to a logical slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- Cannot copy a physical slot that doesn't reserve WAL +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + +-- Test based on the temporary physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); + +-- Check all copied slots status +SELECT + o.slot_name, o.temporary, c.slot_name, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot2_no_change'); +SELECT pg_drop_replication_slot('copied_slot2_notemp'); |