diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:17:33 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:17:33 +0000 |
commit | 5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch) | |
tree | 739caf8c461053357daa9f162bef34516c7bf452 /src/backend/replication/logical/tablesync.c | |
parent | Initial commit. (diff) | |
download | postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip |
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 1564 |
1 files changed, 1564 insertions, 0 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c new file mode 100644 index 0000000..b7b933d --- /dev/null +++ b/src/backend/replication/logical/tablesync.c @@ -0,0 +1,1564 @@ +/*------------------------------------------------------------------------- + * tablesync.c + * PostgreSQL logical replication: initial table data synchronization + * + * Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/tablesync.c + * + * NOTES + * This file contains code for initial table data synchronization for + * logical replication. + * + * The initial data synchronization is done separately for each table, + * in a separate apply worker that only fetches the initial snapshot data + * from the publisher and then synchronizes the position in the stream with + * the main apply worker. + * + * There are several reasons for doing the synchronization this way: + * - It allows us to parallelize the initial data synchronization + * which lowers the time needed for it to happen. + * - The initial synchronization does not have to hold the xid and LSN + * for the time it takes to copy data of all tables, causing less + * bloat and lower disk consumption compared to doing the + * synchronization in a single process for the whole database. + * - It allows us to synchronize any tables added after the initial + * synchronization has finished. + * + * The stream position synchronization works in multiple steps: + * - Apply worker requests a tablesync worker to start, setting the new + * table state to INIT. + * - Tablesync worker starts; changes table state from INIT to DATASYNC while + * copying. + * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync + * worker specific) state to indicate when the copy phase has completed, so + * if the worker crashes with this (non-memory) state then the copy will not + * be re-attempted. + * - Tablesync worker then sets table state to SYNCWAIT; waits for state change. + * - Apply worker periodically checks for tables in SYNCWAIT state. When + * any appear, it sets the table state to CATCHUP and starts loop-waiting + * until either the table state is set to SYNCDONE or the sync worker + * exits. + * - After the sync worker has seen the state change to CATCHUP, it will + * read the stream and apply changes (acting like an apply worker) until + * it catches up to the specified stream position. Then it sets the + * state to SYNCDONE. There might be zero changes applied between + * CATCHUP and SYNCDONE, because the sync worker might be ahead of the + * apply worker. + * - Once the state is set to SYNCDONE, the apply will continue tracking + * the table until it reaches the SYNCDONE stream position, at which + * point it sets state to READY and stops tracking. Again, there might + * be zero changes in between. + * + * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY + * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY. + * + * The catalog pg_subscription_rel is used to keep information about + * subscribed tables and their state. The catalog holds all states + * except SYNCWAIT and CATCHUP which are only in shared memory. + * + * Example flows look like this: + * - Apply is in front: + * sync:8 + * -> set in catalog FINISHEDCOPY + * -> set in memory SYNCWAIT + * apply:10 + * -> set in memory CATCHUP + * -> enter wait-loop + * sync:10 + * -> set in catalog SYNCDONE + * -> exit + * apply:10 + * -> exit wait-loop + * -> continue rep + * apply:11 + * -> set in catalog READY + * + * - Sync is in front: + * sync:10 + * -> set in catalog FINISHEDCOPY + * -> set in memory SYNCWAIT + * apply:8 + * -> set in memory CATCHUP + * -> continue per-table filtering + * sync:10 + * -> set in catalog SYNCDONE + * -> exit + * apply:10 + * -> set in catalog READY + * -> stop per-table filtering + * -> continue rep + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "catalog/pg_subscription_rel.h" +#include "catalog/pg_type.h" +#include "commands/copy.h" +#include "miscadmin.h" +#include "parser/parse_relation.h" +#include "pgstat.h" +#include "replication/logicallauncher.h" +#include "replication/logicalrelation.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "replication/slot.h" +#include "replication/origin.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "utils/acl.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rls.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" + +static bool table_states_valid = false; +static List *table_states_not_ready = NIL; +static bool FetchTableStates(bool *started_tx); + +static StringInfo copybuf = NULL; + +/* + * Exit routine for synchronization worker. + */ +static void +pg_attribute_noreturn() +finish_sync_worker(void) +{ + /* + * Commit any outstanding transaction. This is the usual case, unless + * there was nothing to do for the table. + */ + if (IsTransactionState()) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + CommitTransactionCommand(); + + /* Find the main apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + /* Stop gracefully */ + proc_exit(0); +} + +/* + * Wait until the relation sync state is set in the catalog to the expected + * one; return true when it happens. + * + * Returns false if the table sync worker or the table itself have + * disappeared, or the table state has been reset. + * + * Currently, this is used in the apply worker when transitioning from + * CATCHUP state to SYNCDONE. + */ +static bool +wait_for_relation_state_change(Oid relid, char expected_state) +{ + char state; + + for (;;) + { + LogicalRepWorker *worker; + XLogRecPtr statelsn; + + CHECK_FOR_INTERRUPTS(); + + InvalidateCatalogSnapshot(); + state = GetSubscriptionRelState(MyLogicalRepWorker->subid, + relid, &statelsn); + + if (state == SUBREL_STATE_UNKNOWN) + break; + + if (state == expected_state) + return true; + + /* Check if the sync worker is still running and bail if not. */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, + false); + LWLockRelease(LogicalRepWorkerLock); + if (!worker) + break; + + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + + ResetLatch(MyLatch); + } + + return false; +} + +/* + * Wait until the apply worker changes the state of our synchronization + * worker to the expected one. + * + * Used when transitioning from SYNCWAIT state to CATCHUP. + * + * Returns false if the apply worker has disappeared. + */ +static bool +wait_for_worker_state_change(char expected_state) +{ + int rc; + + for (;;) + { + LogicalRepWorker *worker; + + CHECK_FOR_INTERRUPTS(); + + /* + * Done if already in correct state. (We assume this fetch is atomic + * enough to not give a misleading answer if we do it with no lock.) + */ + if (MyLogicalRepWorker->relstate == expected_state) + return true; + + /* + * Bail out if the apply worker has died, else signal it we're + * waiting. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + if (worker && worker->proc) + logicalrep_worker_wakeup_ptr(worker); + LWLockRelease(LogicalRepWorkerLock); + if (!worker) + break; + + /* + * Wait. We expect to get a latch signal back from the apply worker, + * but use a timeout in case it dies without sending one. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + return false; +} + +/* + * Callback from syscache invalidation. + */ +void +invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) +{ + table_states_valid = false; +} + +/* + * Handle table synchronization cooperation from the synchronization + * worker. + * + * If the sync worker is in CATCHUP state and reached (or passed) the + * predetermined synchronization point in the WAL stream, mark the table as + * SYNCDONE and finish. + */ +static void +process_syncing_tables_for_sync(XLogRecPtr current_lsn) +{ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && + current_lsn >= MyLogicalRepWorker->relstate_lsn) + { + TimeLineID tli; + char syncslotname[NAMEDATALEN] = {0}; + + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; + MyLogicalRepWorker->relstate_lsn = current_lsn; + + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * UpdateSubscriptionRelState must be called within a transaction. + * That transaction will be ended within the finish_sync_worker(). + */ + if (!IsTransactionState()) + StartTransactionCommand(); + + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + + /* + * End streaming so that LogRepWorkerWalRcvConn can be used to drop + * the slot. + */ + walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); + + /* + * Cleanup the tablesync slot. + * + * This has to be done after updating the state because otherwise if + * there is an error while doing the database operations we won't be + * able to rollback dropped slot. + */ + ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + syncslotname, + sizeof(syncslotname)); + + /* + * It is important to give an error if we are unable to drop the slot, + * otherwise, it won't be dropped till the corresponding subscription + * is dropped. So passing missing_ok = false. + */ + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); + + finish_sync_worker(); + } + else + SpinLockRelease(&MyLogicalRepWorker->relmutex); +} + +/* + * Handle table synchronization cooperation from the apply worker. + * + * Walk over all subscription tables that are individually tracked by the + * apply process (currently, all that have state other than + * SUBREL_STATE_READY) and manage synchronization for them. + * + * If there are tables that need synchronizing and are not being synchronized + * yet, start sync workers for them (if there are free slots for sync + * workers). To prevent starting the sync worker for the same relation at a + * high frequency after a failure, we store its last start time with each sync + * state info. We start the sync worker for the same relation after waiting + * at least wal_retrieve_retry_interval. + * + * For tables that are being synchronized already, check if sync workers + * either need action from the apply worker or have finished. This is the + * SYNCWAIT to CATCHUP transition. + * + * If the synchronization position is reached (SYNCDONE), then the table can + * be marked as READY and is no longer tracked. + */ +static void +process_syncing_tables_for_apply(XLogRecPtr current_lsn) +{ + struct tablesync_start_time_mapping + { + Oid relid; + TimestampTz last_start_time; + }; + static HTAB *last_start_times = NULL; + ListCell *lc; + bool started_tx = false; + + Assert(!IsTransactionState()); + + /* We need up-to-date sync state info for subscription tables here. */ + FetchTableStates(&started_tx); + + /* + * Prepare a hash table for tracking last start times of workers, to avoid + * immediate restarts. We don't need it if there are no tables that need + * syncing. + */ + if (table_states_not_ready && !last_start_times) + { + HASHCTL ctl; + + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(struct tablesync_start_time_mapping); + last_start_times = hash_create("Logical replication table sync worker start times", + 256, &ctl, HASH_ELEM | HASH_BLOBS); + } + + /* + * Clean up the hash table when we're done with all tables (just to + * release the bit of memory). + */ + else if (!table_states_not_ready && last_start_times) + { + hash_destroy(last_start_times); + last_start_times = NULL; + } + + /* + * Even when the two_phase mode is requested by the user, it remains as + * 'pending' until all tablesyncs have reached READY state. + * + * When this happens, we restart the apply worker and (if the conditions + * are still ok) then the two_phase tri-state will become 'enabled' at + * that time. + * + * Note: If the subscription has no tables then leave the state as + * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && + AllTablesyncsReady()) + { + ereport(LOG, + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", + MySubscription->name))); + + proc_exit(0); + } + + /* + * Process all tables that are being synchronized. + */ + foreach(lc, table_states_not_ready) + { + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); + + if (rstate->state == SUBREL_STATE_SYNCDONE) + { + /* + * Apply has caught up to the position where the table sync has + * finished. Mark the table as ready so that the apply will just + * continue to replicate it normally. + */ + if (current_lsn >= rstate->lsn) + { + char originname[NAMEDATALEN]; + + rstate->state = SUBREL_STATE_READY; + rstate->lsn = current_lsn; + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Remove the tablesync origin tracking if exists. + * + * The normal case origin drop is done here instead of in the + * process_syncing_tables_for_sync function because we don't + * allow to drop the origin till the process owning the origin + * is alive. + * + * There is a chance that the user is concurrently performing + * refresh for the subscription where we remove the table + * state and its origin and by this time the origin might be + * already removed. So passing missing_ok = true. + */ + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, + rstate->relid, + originname, + sizeof(originname)); + replorigin_drop_by_name(originname, true, false); + + /* + * Update the state to READY only after the origin cleanup. + */ + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + } + } + else + { + LogicalRepWorker *syncworker; + + /* + * Look for a sync worker for this relation. + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + rstate->relid, false); + + if (syncworker) + { + /* Found one, update our copy of its state */ + SpinLockAcquire(&syncworker->relmutex); + rstate->state = syncworker->relstate; + rstate->lsn = syncworker->relstate_lsn; + if (rstate->state == SUBREL_STATE_SYNCWAIT) + { + /* + * Sync worker is waiting for apply. Tell sync worker it + * can catchup now. + */ + syncworker->relstate = SUBREL_STATE_CATCHUP; + syncworker->relstate_lsn = + Max(syncworker->relstate_lsn, current_lsn); + } + SpinLockRelease(&syncworker->relmutex); + + /* If we told worker to catch up, wait for it. */ + if (rstate->state == SUBREL_STATE_SYNCWAIT) + { + /* Signal the sync worker, as it may be waiting for us. */ + if (syncworker->proc) + logicalrep_worker_wakeup_ptr(syncworker); + + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + + /* + * Enter busy loop and wait for synchronization worker to + * reach expected state (or die trying). + */ + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + + wait_for_relation_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); + } + else + LWLockRelease(LogicalRepWorkerLock); + } + else + { + /* + * If there is no sync worker for this table yet, count + * running sync workers for this subscription, while we have + * the lock. + */ + int nsyncworkers = + logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + + /* Now safe to release the LWLock */ + LWLockRelease(LogicalRepWorkerLock); + + /* + * If there are free sync worker slot(s), start a new sync + * worker for the table. + */ + if (nsyncworkers < max_sync_workers_per_subscription) + { + TimestampTz now = GetCurrentTimestamp(); + struct tablesync_start_time_mapping *hentry; + bool found; + + hentry = hash_search(last_start_times, &rstate->relid, + HASH_ENTER, &found); + + if (!found || + TimestampDifferenceExceeds(hentry->last_start_time, now, + wal_retrieve_retry_interval)) + { + logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid); + hentry->last_start_time = now; + } + } + } + } + } + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } +} + +/* + * Process possible state change(s) of tables that are being synchronized. + */ +void +process_syncing_tables(XLogRecPtr current_lsn) +{ + if (am_tablesync_worker()) + process_syncing_tables_for_sync(current_lsn); + else + process_syncing_tables_for_apply(current_lsn); +} + +/* + * Create list of columns for COPY based on logical relation mapping. + */ +static List * +make_copy_attnamelist(LogicalRepRelMapEntry *rel) +{ + List *attnamelist = NIL; + int i; + + for (i = 0; i < rel->remoterel.natts; i++) + { + attnamelist = lappend(attnamelist, + makeString(rel->remoterel.attnames[i])); + } + + + return attnamelist; +} + +/* + * Data source callback for the COPY FROM, which reads from the remote + * connection and passes the data back to our local COPY. + */ +static int +copy_read_data(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail; + + /* If there are some leftover data from previous read, use it. */ + avail = copybuf->len - copybuf->cursor; + if (avail) + { + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + while (maxread > 0 && bytesread < minread) + { + pgsocket fd = PGINVALID_SOCKET; + int len; + char *buf = NULL; + + for (;;) + { + /* Try read the data. */ + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + break; + else if (len < 0) + return bytesread; + else + { + /* Process the data */ + copybuf->data = buf; + copybuf->len = len; + copybuf->cursor = 0; + + avail = copybuf->len - copybuf->cursor; + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + outbuf = (void *) ((char *) outbuf + avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + if (maxread <= 0 || bytesread >= minread) + return bytesread; + } + + /* + * Wait for more data or latch. + */ + (void) WaitLatchOrSocket(MyLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); + + ResetLatch(MyLatch); + } + + return bytesread; +} + + +/* + * Get information about remote relation in similar fashion the RELATION + * message provides during replication. This function also returns the relation + * qualifications to be used in the COPY command. + */ +static void +fetch_remote_table_info(char *nspname, char *relname, + LogicalRepRelation *lrel, List **qual) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; + Oid qualRow[] = {TEXTOID}; + bool isnull; + int natt; + ListCell *lc; + Bitmapset *included_cols = NULL; + + lrel->nspname = nspname; + lrel->relname = relname; + + /* First fetch Oid and replica identity. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + " FROM pg_catalog.pg_class c" + " INNER JOIN pg_catalog.pg_namespace n" + " ON (c.relnamespace = n.oid)" + " WHERE n.nspname = %s" + " AND c.relname = %s", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(tableRow), tableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("table \"%s.%s\" not found on publisher", + nspname, relname))); + + lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + + /* + * Get column lists for each relation. + * + * We need to do this before fetching info about column names and types, + * so that we can skip columns that should not be replicated. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *tslot; + Oid attrsRow[] = {INT2VECTOROID}; + StringInfoData pub_names; + initStringInfo(&pub_names); + foreach(lc, MySubscription->publications) + { + if (foreach_current_index(lc) > 0) + appendStringInfo(&pub_names, ", "); + appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc)))); + } + + /* + * Fetch info about column lists for the relation (from all the + * publications). + */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT" + " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" + " THEN NULL ELSE gpt.attrs END)" + " FROM pg_publication p," + " LATERAL pg_get_publication_tables(p.pubname) gpt," + " pg_class c" + " WHERE gpt.relid = %u AND c.oid = gpt.relid" + " AND p.pubname IN ( %s )", + lrel->remoteid, + pub_names.data); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + /* + * We don't support the case where the column list is different for + * the same table when combining publications. See comments atop + * fetch_table_list. So there should be only one row returned. + * Although we already checked this when creating the subscription, we + * still need to check here in case the column list was changed after + * creating the subscription and before the sync worker is started. + */ + if (tuplestore_tuple_count(pubres->tuplestore) > 1) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + + /* + * Get the column list and build a single bitmap with the attnums. + * + * If we find a NULL value, it means all the columns should be + * replicated. + */ + tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot)) + { + Datum cfval = slot_getattr(tslot, 1, &isnull); + + if (!isnull) + { + ArrayType *arr; + int nelems; + int16 *elems; + + arr = DatumGetArrayTypeP(cfval); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (natt = 0; natt < nelems; natt++) + included_cols = bms_add_member(included_cols, elems[natt]); + } + + ExecClearTuple(tslot); + } + ExecDropSingleTupleTableSlot(tslot); + + walrcv_clear_result(pubres); + + pfree(pub_names.data); + } + + /* + * Now fetch column names and types. + */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT a.attnum," + " a.attname," + " a.atttypid," + " a.attnum = ANY(i.indkey)" + " FROM pg_catalog.pg_attribute a" + " LEFT JOIN pg_catalog.pg_index i" + " ON (i.indexrelid = pg_get_replica_identity_index(%u))" + " WHERE a.attnum > 0::pg_catalog.int2" + " AND NOT a.attisdropped %s" + " AND a.attrelid = %u" + " ORDER BY a.attnum", + lrel->remoteid, + (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? + "AND a.attgenerated = ''" : ""), + lrel->remoteid); + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrRow), attrRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + /* We don't know the number of rows coming, so allocate enough space. */ + lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); + lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); + lrel->attkeys = NULL; + + /* + * Store the columns as a list of names. Ignore those that are not + * present in the column list, if there is one. + */ + natt = 0; + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + /* If the column is not in the column list, skip it. */ + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + { + ExecClearTuple(slot); + continue; + } + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) + lrel->attkeys = bms_add_member(lrel->attkeys, natt); + + /* Should never happen. */ + if (++natt >= MaxTupleAttributeNumber) + elog(ERROR, "too many columns in remote table \"%s.%s\"", + nspname, relname); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + lrel->natts = natt; + + walrcv_clear_result(res); + + /* + * Get relation's row filter expressions. DISTINCT avoids the same + * expression of a table in multiple publications from being included + * multiple times in the final expression. + * + * We need to copy the row even if it matches just one of the + * publications, so we later combine all the quals with OR. + * + * For initial synchronization, row filtering can be ignored in following + * cases: + * + * 1) one of the subscribed publications for the table hasn't specified + * any row filter + * + * 2) one of the subscribed publications has puballtables set to true + * + * 3) one of the subscribed publications is declared as TABLES IN SCHEMA + * that includes this relation + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + StringInfoData pub_names; + + /* Build the pubname list. */ + initStringInfo(&pub_names); + foreach(lc, MySubscription->publications) + { + char *pubname = strVal(lfirst(lc)); + + if (foreach_current_index(lc) > 0) + appendStringInfoString(&pub_names, ", "); + + appendStringInfoString(&pub_names, quote_literal_cstr(pubname)); + } + + /* Check for row filters. */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)" + " FROM pg_publication p," + " LATERAL pg_get_publication_tables(p.pubname) gpt" + " WHERE gpt.relid = %u" + " AND p.pubname IN ( %s )", + lrel->remoteid, + pub_names.data); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + /* + * Multiple row filter expressions for the same table will be combined + * by COPY using OR. If any of the filter expressions for this table + * are null, it means the whole table will be copied. In this case it + * is not necessary to construct a unified row filter expression at + * all. + */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Datum rf = slot_getattr(slot, 1, &isnull); + + if (!isnull) + *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); + else + { + /* Ignore filters and cleanup as necessary. */ + if (*qual) + { + list_free_deep(*qual); + *qual = NIL; + } + break; + } + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); + } + + pfree(cmd.data); +} + +/* + * Copy existing data of a table from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_table(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + List *qual = NIL; + WalRcvExecResult *res; + StringInfoData cmd; + CopyFromState cstate; + List *attnamelist; + ParseState *pstate; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel, &qual); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + /* Regular table with no row filter */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL) + { + appendStringInfo(&cmd, "COPY %s (", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + + /* + * XXX Do we need to list the columns in all cases? Maybe we're + * replicating all columns? + */ + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + + appendStringInfo(&cmd, ") TO STDOUT"); + } + else + { + /* + * For non-tables and tables with row filters, we need to do COPY + * (SELECT ...), but we can't just do SELECT * because we need to not + * copy generated columns. For tables with any row filters, build a + * SELECT query with OR'ed row filters for COPY. + */ + appendStringInfoString(&cmd, "COPY (SELECT "); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + + appendStringInfoString(&cmd, " FROM "); + + /* + * For regular tables, make sure we don't copy data from a child that + * inherits the named table as those will be copied separately. + */ + if (lrel.relkind == RELKIND_RELATION) + appendStringInfoString(&cmd, "ONLY "); + + appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname)); + /* list of OR'ed filters */ + if (qual != NIL) + { + ListCell *lc; + char *q = strVal(linitial(qual)); + + appendStringInfo(&cmd, " WHERE %s", q); + for_each_from(lc, qual, 1) + { + q = strVal(lfirst(lc)); + appendStringInfo(&cmd, " OR %s", q); + } + list_free_deep(qual); + } + + appendStringInfoString(&cmd, ") TO STDOUT"); + } + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); + pfree(cmd.data); + if (res->status != WALRCV_OK_COPY_OUT) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not start initial contents copy for table \"%s.%s\": %s", + lrel.nspname, lrel.relname, res->err))); + walrcv_clear_result(res); + + copybuf = makeStringInfo(); + + pstate = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, + NULL, false, false); + + attnamelist = make_copy_attnamelist(relmapentry); + cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL); + + /* Do the copy */ + (void) CopyFrom(cstate); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* + * Determine the tablesync slot name. + * + * The name must not exceed NAMEDATALEN - 1 because of remote node constraints + * on slot name length. We append system_identifier to avoid slot_name + * collision with subscriptions in other clusters. With the current scheme + * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum + * length of slot_name will be 50. + * + * The returned slot name is stored in the supplied buffer (syncslotname) with + * the given size. + * + * Note: We don't use the subscription slot name as part of tablesync slot name + * because we are responsible for cleaning up these slots and it could become + * impossible to recalculate what name to cleanup if the subscription slot name + * had changed. + */ +void +ReplicationSlotNameForTablesync(Oid suboid, Oid relid, + char *syncslotname, int szslot) +{ + snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, + relid, GetSystemIdentifier()); +} + +/* + * Form the origin name for tablesync. + * + * Return the name in the supplied buffer. + */ +void +ReplicationOriginNameForTablesync(Oid suboid, Oid relid, + char *originname, int szorgname) +{ + snprintf(originname, szorgname, "pg_%u_%u", suboid, relid); +} + +/* + * Start syncing the table in the sync worker. + * + * If nothing needs to be done to sync the table, we exit the worker without + * any further action. + * + * The returned slot name is palloc'ed in current memory context. + */ +char * +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) +{ + char *slotname; + char *err; + char relstate; + XLogRecPtr relstate_lsn; + Relation rel; + AclResult aclresult; + WalRcvExecResult *res; + char originname[NAMEDATALEN]; + RepOriginId originid; + + /* Check the state of the table synchronization. */ + StartTransactionCommand(); + relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + &relstate_lsn); + CommitTransactionCommand(); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = relstate; + MyLogicalRepWorker->relstate_lsn = relstate_lsn; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * If synchronization is already done or no longer necessary, exit now + * that we've updated shared memory state. + */ + switch (relstate) + { + case SUBREL_STATE_SYNCDONE: + case SUBREL_STATE_READY: + case SUBREL_STATE_UNKNOWN: + finish_sync_worker(); /* doesn't return */ + } + + /* Calculate the name of the tablesync slot. */ + slotname = (char *) palloc(NAMEDATALEN); + ReplicationSlotNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + slotname, + NAMEDATALEN); + + /* + * Here we use the slot name instead of the subscription name as the + * application_name, so that it is different from the main apply worker, + * so that synchronous replication can distinguish them. + */ + LogRepWorkerWalRcvConn = + walrcv_connect(MySubscription->conninfo, true, slotname, &err); + if (LogRepWorkerWalRcvConn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the publisher: %s", err))); + + Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || + MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); + + /* Assign the origin tracking record name. */ + ReplicationOriginNameForTablesync(MySubscription->oid, + MyLogicalRepWorker->relid, + originname, + sizeof(originname)); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) + { + /* + * We have previously errored out before finishing the copy so the + * replication slot might exist. We want to remove the slot if it + * already exists and proceed. + * + * XXX We could also instead try to drop the slot, last time we failed + * but for that, we might need to clean up the copy state as it might + * be in the middle of fetching the rows. Also, if there is a network + * breakdown then it wouldn't have succeeded so trying it next time + * seems like a better bet. + */ + ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true); + } + else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) + { + /* + * The COPY phase was previously done, but tablesync then crashed + * before it was able to finish normally. + */ + StartTransactionCommand(); + + /* + * The origin tracking name must already exist. It was created first + * time this tablesync was launched. + */ + originid = replorigin_by_name(originname, false); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + *origin_startpos = replorigin_session_get_progress(false); + + CommitTransactionCommand(); + + goto copy_table_done; + } + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; + MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Update the state and make it visible to others. */ + StartTransactionCommand(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + CommitTransactionCommand(); + pgstat_report_stat(true); + + StartTransactionCommand(); + + /* + * Use a standard write lock here. It might be better to disallow access + * to the table while it's being synchronized. But we don't want to block + * the main apply process from working and it has to open the relation in + * RowExclusiveLock when remapping remote relation id to local one. + */ + rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); + + /* + * Check that our table sync worker has permission to insert into the + * target table. + */ + aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), + ACL_INSERT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, + get_relkind_objtype(rel->rd_rel->relkind), + RelationGetRelationName(rel)); + + /* + * COPY FROM does not honor RLS policies. That is not a problem for + * subscriptions owned by roles with BYPASSRLS privilege (or superuser, + * who has it implicitly), but other roles should not be able to + * circumvent RLS. Disallow logical replication into RLS enabled + * relations for such roles. + */ + if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"", + GetUserNameFromId(GetUserId(), true), + RelationGetRelationName(rel)))); + + /* + * Start a transaction in the remote node in REPEATABLE READ mode. This + * ensures that both the replication slot we create (see below) and the + * COPY are consistent with each other. + */ + res = walrcv_exec(LogRepWorkerWalRcvConn, + "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", + 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not start transaction on publisher: %s", + res->err))); + walrcv_clear_result(res); + + /* + * Create a new permanent logical decoding slot. This slot will be used + * for the catchup phase after COPY is done, so tell it to use the + * snapshot to make the final data consistent. + */ + walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , + CRS_USE_SNAPSHOT, origin_startpos); + + /* + * Setup replication origin tracking. The purpose of doing this before the + * copy is to avoid doing the copy again due to any error in setting up + * origin tracking. + */ + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + { + /* + * Origin tracking does not exist, so create it now. + * + * Then advance to the LSN got from walrcv_create_slot. This is WAL + * logged for the purpose of recovery. Locks are to prevent the + * replication origin from vanishing while advancing. + */ + originid = replorigin_create(originname); + + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + replorigin_session_setup(originid); + replorigin_session_origin = originid; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("replication origin \"%s\" already exists", + originname))); + } + + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + + res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not finish transaction on publisher: %s", + res->err))); + walrcv_clear_result(res); + + table_close(rel, NoLock); + + /* Make the copy visible. */ + CommandCounterIncrement(); + + /* + * Update the persisted state to indicate the COPY phase is done; make it + * visible to others. + */ + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relstate_lsn); + + CommitTransactionCommand(); + +copy_table_done: + + elog(DEBUG1, + "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", + originname, LSN_FORMAT_ARGS(*origin_startpos)); + + /* + * We are done with the initial data synchronization, update the state. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; + MyLogicalRepWorker->relstate_lsn = *origin_startpos; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * Finally, wait until the main apply worker tells us to catch up and then + * return to let LogicalRepApplyLoop do it. + */ + wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + return slotname; +} + +/* + * Common code to fetch the up-to-date sync state info into the static lists. + * + * Returns true if subscription has 1 or more tables, else false. + * + * Note: If this function started the transaction (indicated by the parameter) + * then it is the caller's responsibility to commit it. + */ +static bool +FetchTableStates(bool *started_tx) +{ + static bool has_subrels = false; + + *started_tx = false; + + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old lists. */ + list_free_deep(table_states_not_ready); + table_states_not_ready = NIL; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + *started_tx = true; + } + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states_not_ready = lappend(table_states_not_ready, rstate); + } + MemoryContextSwitchTo(oldctx); + + /* + * Does the subscription have tables? + * + * If there were not-READY relations found then we know it does. But + * if table_state_not_ready was empty we still need to check again to + * see if there are 0 tables. + */ + has_subrels = (list_length(table_states_not_ready) > 0) || + HasSubscriptionRelations(MySubscription->oid); + + table_states_valid = true; + } + + return has_subrels; +} + +/* + * If the subscription has no tables then return false. + * + * Otherwise, are all tablesyncs READY? + * + * Note: This function is not suitable to be called from outside of apply or + * tablesync workers because MySubscription needs to be already initialized. + */ +bool +AllTablesyncsReady(void) +{ + bool started_tx = false; + bool has_subrels = false; + + /* We need up-to-date sync state info for subscription tables here. */ + has_subrels = FetchTableStates(&started_tx); + + if (started_tx) + { + CommitTransactionCommand(); + pgstat_report_stat(true); + } + + /* + * Return false when there are no tables in subscription or not all tables + * are in ready state; true otherwise. + */ + return has_subrels && list_length(table_states_not_ready) == 0; +} + +/* + * Update the two_phase state of the specified subscription in pg_subscription. + */ +void +UpdateTwoPhaseState(Oid suboid, char new_state) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || + new_state == LOGICALREP_TWOPHASE_STATE_PENDING || + new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, + "cache lookup failed for subscription oid %u", + suboid); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* And update/set two_phase state */ + values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), + values, nulls, replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + heap_freetuple(tup); + table_close(rel, RowExclusiveLock); +} |