/*------------------------------------------------------------------------- * tablesync.c * PostgreSQL logical replication: initial table data synchronization * * Copyright (c) 2012-2023, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/logical/tablesync.c * * NOTES * This file contains code for initial table data synchronization for * logical replication. * * The initial data synchronization is done separately for each table, * in a separate apply worker that only fetches the initial snapshot data * from the publisher and then synchronizes the position in the stream with * the leader apply worker. * * There are several reasons for doing the synchronization this way: * - It allows us to parallelize the initial data synchronization * which lowers the time needed for it to happen. * - The initial synchronization does not have to hold the xid and LSN * for the time it takes to copy data of all tables, causing less * bloat and lower disk consumption compared to doing the * synchronization in a single process for the whole database. * - It allows us to synchronize any tables added after the initial * synchronization has finished. * * The stream position synchronization works in multiple steps: * - Apply worker requests a tablesync worker to start, setting the new * table state to INIT. * - Tablesync worker starts; changes table state from INIT to DATASYNC while * copying. * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync * worker specific) state to indicate when the copy phase has completed, so * if the worker crashes with this (non-memory) state then the copy will not * be re-attempted. * - Tablesync worker then sets table state to SYNCWAIT; waits for state change. * - Apply worker periodically checks for tables in SYNCWAIT state. When * any appear, it sets the table state to CATCHUP and starts loop-waiting * until either the table state is set to SYNCDONE or the sync worker * exits. * - After the sync worker has seen the state change to CATCHUP, it will * read the stream and apply changes (acting like an apply worker) until * it catches up to the specified stream position. Then it sets the * state to SYNCDONE. There might be zero changes applied between * CATCHUP and SYNCDONE, because the sync worker might be ahead of the * apply worker. * - Once the state is set to SYNCDONE, the apply will continue tracking * the table until it reaches the SYNCDONE stream position, at which * point it sets state to READY and stops tracking. Again, there might * be zero changes in between. * * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about * subscribed tables and their state. The catalog holds all states * except SYNCWAIT and CATCHUP which are only in shared memory. * * Example flows look like this: * - Apply is in front: * sync:8 * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:10 * -> set in memory CATCHUP * -> enter wait-loop * sync:10 * -> set in catalog SYNCDONE * -> exit * apply:10 * -> exit wait-loop * -> continue rep * apply:11 * -> set in catalog READY * * - Sync is in front: * sync:10 * -> set in catalog FINISHEDCOPY * -> set in memory SYNCWAIT * apply:8 * -> set in memory CATCHUP * -> continue per-table filtering * sync:10 * -> set in catalog SYNCDONE * -> exit * apply:10 * -> set in catalog READY * -> stop per-table filtering * -> continue rep *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/table.h" #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "pgstat.h" #include "replication/logicallauncher.h" #include "replication/logicalrelation.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "replication/slot.h" #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rls.h" #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/usercontext.h" static bool table_states_valid = false; static List *table_states_not_ready = NIL; static bool FetchTableStates(bool *started_tx); static StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ static void pg_attribute_noreturn() finish_sync_worker(void) { /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. */ if (IsTransactionState()) { CommitTransactionCommand(); pgstat_report_stat(true); } /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); StartTransactionCommand(); ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ proc_exit(0); } /* * Wait until the relation sync state is set in the catalog to the expected * one; return true when it happens. * * Returns false if the table sync worker or the table itself have * disappeared, or the table state has been reset. * * Currently, this is used in the apply worker when transitioning from * CATCHUP state to SYNCDONE. */ static bool wait_for_relation_state_change(Oid relid, char expected_state) { char state; for (;;) { LogicalRepWorker *worker; XLogRecPtr statelsn; CHECK_FOR_INTERRUPTS(); InvalidateCatalogSnapshot(); state = GetSubscriptionRelState(MyLogicalRepWorker->subid, relid, &statelsn); if (state == SUBREL_STATE_UNKNOWN) break; if (state == expected_state) return true; /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) break; (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); ResetLatch(MyLatch); } return false; } /* * Wait until the apply worker changes the state of our synchronization * worker to the expected one. * * Used when transitioning from SYNCWAIT state to CATCHUP. * * Returns false if the apply worker has disappeared. */ static bool wait_for_worker_state_change(char expected_state) { int rc; for (;;) { LogicalRepWorker *worker; CHECK_FOR_INTERRUPTS(); /* * Done if already in correct state. (We assume this fetch is atomic * enough to not give a misleading answer if we do it with no lock.) */ if (MyLogicalRepWorker->relstate == expected_state) return true; /* * Bail out if the apply worker has died, else signal it we're * waiting. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, false); if (worker && worker->proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) break; /* * Wait. We expect to get a latch signal back from the apply worker, * but use a timeout in case it dies without sending one. */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); if (rc & WL_LATCH_SET) ResetLatch(MyLatch); } return false; } /* * Callback from syscache invalidation. */ void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) { table_states_valid = false; } /* * Handle table synchronization cooperation from the synchronization * worker. * * If the sync worker is in CATCHUP state and reached (or passed) the * predetermined synchronization point in the WAL stream, mark the table as * SYNCDONE and finish. */ static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) { SpinLockAcquire(&MyLogicalRepWorker->relmutex); if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && current_lsn >= MyLogicalRepWorker->relstate_lsn) { TimeLineID tli; char syncslotname[NAMEDATALEN] = {0}; char originname[NAMEDATALEN] = {0}; MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); /* * UpdateSubscriptionRelState must be called within a transaction. */ if (!IsTransactionState()) StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); /* * End streaming so that LogRepWorkerWalRcvConn can be used to drop * the slot. */ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); /* * Cleanup the tablesync slot. * * This has to be done after updating the state because otherwise if * there is an error while doing the database operations we won't be * able to rollback dropped slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, syncslotname, sizeof(syncslotname)); /* * It is important to give an error if we are unable to drop the slot, * otherwise, it won't be dropped till the corresponding subscription * is dropped. So passing missing_ok = false. */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false); CommitTransactionCommand(); pgstat_report_stat(false); /* * Start a new transaction to clean up the tablesync origin tracking. * This transaction will be ended within the finish_sync_worker(). * Now, even, if we fail to remove this here, the apply worker will * ensure to clean it up afterward. * * We need to do this after the table state is set to SYNCDONE. * Otherwise, if an error occurs while performing the database * operation, the worker will be restarted and the in-memory state of * replication progress (remote_lsn) won't be rolled-back which would * have been cleared before restart. So, the restarted worker will use * invalid replication progress state resulting in replay of * transactions that have already been applied. */ StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, originname, sizeof(originname)); /* * Resetting the origin session removes the ownership of the slot. * This is needed to allow the origin to be dropped. */ replorigin_session_reset(); replorigin_session_origin = InvalidRepOriginId; replorigin_session_origin_lsn = InvalidXLogRecPtr; replorigin_session_origin_timestamp = 0; /* * Drop the tablesync's origin tracking if exists. * * There is a chance that the user is concurrently performing refresh * for the subscription where we remove the table state and its origin * or the apply worker would have removed this origin. So passing * missing_ok = true. */ replorigin_drop_by_name(originname, true, false); finish_sync_worker(); } else SpinLockRelease(&MyLogicalRepWorker->relmutex); } /* * Handle table synchronization cooperation from the apply worker. * * Walk over all subscription tables that are individually tracked by the * apply process (currently, all that have state other than * SUBREL_STATE_READY) and manage synchronization for them. * * If there are tables that need synchronizing and are not being synchronized * yet, start sync workers for them (if there are free slots for sync * workers). To prevent starting the sync worker for the same relation at a * high frequency after a failure, we store its last start time with each sync * state info. We start the sync worker for the same relation after waiting * at least wal_retrieve_retry_interval. * * For tables that are being synchronized already, check if sync workers * either need action from the apply worker or have finished. This is the * SYNCWAIT to CATCHUP transition. * * If the synchronization position is reached (SYNCDONE), then the table can * be marked as READY and is no longer tracked. */ static void process_syncing_tables_for_apply(XLogRecPtr current_lsn) { struct tablesync_start_time_mapping { Oid relid; TimestampTz last_start_time; }; static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; bool should_exit = false; Assert(!IsTransactionState()); /* We need up-to-date sync state info for subscription tables here. */ FetchTableStates(&started_tx); /* * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ if (table_states_not_ready != NIL && !last_start_times) { HASHCTL ctl; ctl.keysize = sizeof(Oid); ctl.entrysize = sizeof(struct tablesync_start_time_mapping); last_start_times = hash_create("Logical replication table sync worker start times", 256, &ctl, HASH_ELEM | HASH_BLOBS); } /* * Clean up the hash table when we're done with all tables (just to * release the bit of memory). */ else if (table_states_not_ready == NIL && last_start_times) { hash_destroy(last_start_times); last_start_times = NULL; } /* * Process all tables that are being synchronized. */ foreach(lc, table_states_not_ready) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); if (rstate->state == SUBREL_STATE_SYNCDONE) { /* * Apply has caught up to the position where the table sync has * finished. Mark the table as ready so that the apply will just * continue to replicate it normally. */ if (current_lsn >= rstate->lsn) { char originname[NAMEDATALEN]; rstate->state = SUBREL_STATE_READY; rstate->lsn = current_lsn; if (!started_tx) { StartTransactionCommand(); started_tx = true; } /* * Remove the tablesync origin tracking if exists. * * There is a chance that the user is concurrently performing * refresh for the subscription where we remove the table * state and its origin or the tablesync worker would have * already removed this origin. We can't rely on tablesync * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. */ ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); /* * Update the state to READY only after the origin cleanup. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, rstate->lsn); } } else { LogicalRepWorker *syncworker; /* * Look for a sync worker for this relation. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, rstate->relid, false); if (syncworker) { /* Found one, update our copy of its state */ SpinLockAcquire(&syncworker->relmutex); rstate->state = syncworker->relstate; rstate->lsn = syncworker->relstate_lsn; if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* * Sync worker is waiting for apply. Tell sync worker it * can catchup now. */ syncworker->relstate = SUBREL_STATE_CATCHUP; syncworker->relstate_lsn = Max(syncworker->relstate_lsn, current_lsn); } SpinLockRelease(&syncworker->relmutex); /* If we told worker to catch up, wait for it. */ if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. */ if (syncworker->proc) logicalrep_worker_wakeup_ptr(syncworker); /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); if (started_tx) { /* * We must commit the existing transaction to release * the existing locks before entering a busy loop. * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. */ CommitTransactionCommand(); pgstat_report_stat(false); } /* * Enter busy loop and wait for synchronization worker to * reach expected state (or die trying). */ StartTransactionCommand(); started_tx = true; wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); } else LWLockRelease(LogicalRepWorkerLock); } else { /* * If there is no sync worker for this table yet, count * running sync workers for this subscription, while we have * the lock. */ int nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); /* * If there are free sync worker slot(s), start a new sync * worker for the table. */ if (nsyncworkers < max_sync_workers_per_subscription) { TimestampTz now = GetCurrentTimestamp(); struct tablesync_start_time_mapping *hentry; bool found; hentry = hash_search(last_start_times, &rstate->relid, HASH_ENTER, &found); if (!found || TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { logicalrep_worker_launch(MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, DSM_HANDLE_INVALID); hentry->last_start_time = now; } } } } } if (started_tx) { /* * Even when the two_phase mode is requested by the user, it remains * as 'pending' until all tablesyncs have reached READY state. * * When this happens, we restart the apply worker and (if the * conditions are still ok) then the two_phase tri-state will become * 'enabled' at that time. * * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. */ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) { CommandCounterIncrement(); /* make updates visible */ if (AllTablesyncsReady()) { ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", MySubscription->name))); should_exit = true; } } CommitTransactionCommand(); pgstat_report_stat(true); } if (should_exit) { /* * Reset the last-start time for this worker so that the launcher will * restart it without waiting for wal_retrieve_retry_interval. */ ApplyLauncherForgetWorkerStartTime(MySubscription->oid); proc_exit(0); } } /* * Process possible state change(s) of tables that are being synchronized. */ void process_syncing_tables(XLogRecPtr current_lsn) { /* * Skip for parallel apply workers because they only operate on tables * that are in a READY state. See pa_can_start() and * should_apply_changes_for_rel(). */ if (am_parallel_apply_worker()) return; if (am_tablesync_worker()) process_syncing_tables_for_sync(current_lsn); else process_syncing_tables_for_apply(current_lsn); } /* * Create list of columns for COPY based on logical relation mapping. */ static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel) { List *attnamelist = NIL; int i; for (i = 0; i < rel->remoterel.natts; i++) { attnamelist = lappend(attnamelist, makeString(rel->remoterel.attnames[i])); } return attnamelist; } /* * Data source callback for the COPY FROM, which reads from the remote * connection and passes the data back to our local COPY. */ static int copy_read_data(void *outbuf, int minread, int maxread) { int bytesread = 0; int avail; /* If there are some leftover data from previous read, use it. */ avail = copybuf->len - copybuf->cursor; if (avail) { if (avail > maxread) avail = maxread; memcpy(outbuf, ©buf->data[copybuf->cursor], avail); copybuf->cursor += avail; maxread -= avail; bytesread += avail; } while (maxread > 0 && bytesread < minread) { pgsocket fd = PGINVALID_SOCKET; int len; char *buf = NULL; for (;;) { /* Try read the data. */ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); CHECK_FOR_INTERRUPTS(); if (len == 0) break; else if (len < 0) return bytesread; else { /* Process the data */ copybuf->data = buf; copybuf->len = len; copybuf->cursor = 0; avail = copybuf->len - copybuf->cursor; if (avail > maxread) avail = maxread; memcpy(outbuf, ©buf->data[copybuf->cursor], avail); outbuf = (void *) ((char *) outbuf + avail); copybuf->cursor += avail; maxread -= avail; bytesread += avail; } if (maxread <= 0 || bytesread >= minread) return bytesread; } /* * Wait for more data or latch. */ (void) WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); ResetLatch(MyLatch); } return bytesread; } /* * Get information about remote relation in similar fashion the RELATION * message provides during replication. This function also returns the relation * qualifications to be used in the COPY command. */ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; ListCell *lc; Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. */ initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s", quote_literal_cstr(nspname), quote_literal_cstr(relname)); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(tableRow), tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("table \"%s.%s\" not found on publisher", nspname, relname))); lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); /* * Get column lists for each relation. * * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) { WalRcvExecResult *pubres; TupleTableSlot *tslot; Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; initStringInfo(&pub_names); foreach(lc, MySubscription->publications) { if (foreach_current_index(lc) > 0) appendStringInfoString(&pub_names, ", "); appendStringInfoString(&pub_names, quote_literal_cstr(strVal(lfirst(lc)))); } /* * Fetch info about column lists for the relation (from all the * publications). */ resetStringInfo(&cmd); appendStringInfo(&cmd, "SELECT DISTINCT" " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)" " THEN NULL ELSE gpt.attrs END)" " FROM pg_publication p," " LATERAL pg_get_publication_tables(p.pubname) gpt," " pg_class c" " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", lrel->remoteid, pub_names.data); pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(attrsRow), attrsRow); if (pubres->status != WALRCV_OK_TUPLES) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", nspname, relname, pubres->err))); /* * We don't support the case where the column list is different for * the same table when combining publications. See comments atop * fetch_table_list. So there should be only one row returned. * Although we already checked this when creating the subscription, we * still need to check here in case the column list was changed after * creating the subscription and before the sync worker is started. */ if (tuplestore_tuple_count(pubres->tuplestore) > 1) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", nspname, relname)); /* * Get the column list and build a single bitmap with the attnums. * * If we find a NULL value, it means all the columns should be * replicated. */ tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot)) { Datum cfval = slot_getattr(tslot, 1, &isnull); if (!isnull) { ArrayType *arr; int nelems; int16 *elems; arr = DatumGetArrayTypeP(cfval); nelems = ARR_DIMS(arr)[0]; elems = (int16 *) ARR_DATA_PTR(arr); for (natt = 0; natt < nelems; natt++) included_cols = bms_add_member(included_cols, elems[natt]); } ExecClearTuple(tslot); } ExecDropSingleTupleTableSlot(tslot); walrcv_clear_result(pubres); pfree(pub_names.data); } /* * Now fetch column names and types. */ resetStringInfo(&cmd); appendStringInfo(&cmd, "SELECT a.attnum," " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" " AND NOT a.attisdropped %s" " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? "AND a.attgenerated = ''" : ""), lrel->remoteid); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(attrRow), attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); /* We don't know the number of rows coming, so allocate enough space. */ lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; /* * Store the columns as a list of names. Ignore those that are not * present in the column list, if there is one. */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { char *rel_colname; AttrNumber attnum; attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); /* If the column is not in the column list, skip it. */ if (included_cols != NULL && !bms_is_member(attnum, included_cols)) { ExecClearTuple(slot); continue; } rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); lrel->attnames[natt] = rel_colname; lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", nspname, relname); ExecClearTuple(slot); } ExecDropSingleTupleTableSlot(slot); lrel->natts = natt; walrcv_clear_result(res); /* * Get relation's row filter expressions. DISTINCT avoids the same * expression of a table in multiple publications from being included * multiple times in the final expression. * * We need to copy the row even if it matches just one of the * publications, so we later combine all the quals with OR. * * For initial synchronization, row filtering can be ignored in following * cases: * * 1) one of the subscribed publications for the table hasn't specified * any row filter * * 2) one of the subscribed publications has puballtables set to true * * 3) one of the subscribed publications is declared as TABLES IN SCHEMA * that includes this relation */ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) { StringInfoData pub_names; /* Build the pubname list. */ initStringInfo(&pub_names); foreach(lc, MySubscription->publications) { char *pubname = strVal(lfirst(lc)); if (foreach_current_index(lc) > 0) appendStringInfoString(&pub_names, ", "); appendStringInfoString(&pub_names, quote_literal_cstr(pubname)); } /* Check for row filters. */ resetStringInfo(&cmd); appendStringInfo(&cmd, "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)" " FROM pg_publication p," " LATERAL pg_get_publication_tables(p.pubname) gpt" " WHERE gpt.relid = %u" " AND p.pubname IN ( %s )", lrel->remoteid, pub_names.data); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s", nspname, relname, res->err))); /* * Multiple row filter expressions for the same table will be combined * by COPY using OR. If any of the filter expressions for this table * are null, it means the whole table will be copied. In this case it * is not necessary to construct a unified row filter expression at * all. */ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { Datum rf = slot_getattr(slot, 1, &isnull); if (!isnull) *qual = lappend(*qual, makeString(TextDatumGetCString(rf))); else { /* Ignore filters and cleanup as necessary. */ if (*qual) { list_free_deep(*qual); *qual = NIL; } break; } ExecClearTuple(slot); } ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); } pfree(cmd.data); } /* * Copy existing data of a table from publisher. * * Caller is responsible for locking the local relation. */ static void copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; LogicalRepRelation lrel; List *qual = NIL; WalRcvExecResult *res; StringInfoData cmd; CopyFromState cstate; List *attnamelist; ParseState *pstate; List *options = NIL; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), RelationGetRelationName(rel), &lrel, &qual); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); /* Start copy on the publisher. */ initStringInfo(&cmd); /* Regular table with no row filter */ if (lrel.relkind == RELKIND_RELATION && qual == NIL) { appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); /* If the table has columns, then specify the columns */ if (lrel.natts) { appendStringInfoString(&cmd, " ("); /* * XXX Do we need to list the columns in all cases? Maybe we're * replicating all columns? */ for (int i = 0; i < lrel.natts; i++) { if (i > 0) appendStringInfoString(&cmd, ", "); appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); } appendStringInfoString(&cmd, ")"); } appendStringInfoString(&cmd, " TO STDOUT"); } else { /* * For non-tables and tables with row filters, we need to do COPY * (SELECT ...), but we can't just do SELECT * because we need to not * copy generated columns. For tables with any row filters, build a * SELECT query with OR'ed row filters for COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) { appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); if (i < lrel.natts - 1) appendStringInfoString(&cmd, ", "); } appendStringInfoString(&cmd, " FROM "); /* * For regular tables, make sure we don't copy data from a child that * inherits the named table as those will be copied separately. */ if (lrel.relkind == RELKIND_RELATION) appendStringInfoString(&cmd, "ONLY "); appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname)); /* list of OR'ed filters */ if (qual != NIL) { ListCell *lc; char *q = strVal(linitial(qual)); appendStringInfo(&cmd, " WHERE %s", q); for_each_from(lc, qual, 1) { q = strVal(lfirst(lc)); appendStringInfo(&cmd, " OR %s", q); } list_free_deep(qual); } appendStringInfoString(&cmd, ") TO STDOUT"); } /* * Prior to v16, initial table synchronization will use text format even * if the binary option is enabled for a subscription. */ if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 && MySubscription->binary) { appendStringInfoString(&cmd, " WITH (FORMAT binary)"); options = list_make1(makeDefElem("format", (Node *) makeString("binary"), -1)); } res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not start initial contents copy for table \"%s.%s\": %s", lrel.nspname, lrel.relname, res->err))); walrcv_clear_result(res); copybuf = makeStringInfo(); pstate = make_parsestate(NULL); (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, NULL, false, false); attnamelist = make_copy_attnamelist(relmapentry); cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options); /* Do the copy */ (void) CopyFrom(cstate); logicalrep_rel_close(relmapentry, NoLock); } /* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints * on slot name length. We append system_identifier to avoid slot_name * collision with subscriptions in other clusters. With the current scheme * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum * length of slot_name will be 50. * * The returned slot name is stored in the supplied buffer (syncslotname) with * the given size. * * Note: We don't use the subscription slot name as part of tablesync slot name * because we are responsible for cleaning up these slots and it could become * impossible to recalculate what name to cleanup if the subscription slot name * had changed. */ void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot) { snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid, GetSystemIdentifier()); } /* * Start syncing the table in the sync worker. * * If nothing needs to be done to sync the table, we exit the worker without * any further action. * * The returned slot name is palloc'ed in current memory context. */ char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { char *slotname; char *err; char relstate; XLogRecPtr relstate_lsn; Relation rel; AclResult aclresult; WalRcvExecResult *res; char originname[NAMEDATALEN]; RepOriginId originid; UserContext ucxt; bool must_use_password; bool run_as_owner; /* Check the state of the table synchronization. */ StartTransactionCommand(); relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, &relstate_lsn); /* Is the use of a password mandatory? */ must_use_password = MySubscription->passwordrequired && !superuser_arg(MySubscription->owner); /* Note that the superuser_arg call can access the DB */ CommitTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = relstate; MyLogicalRepWorker->relstate_lsn = relstate_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); /* * If synchronization is already done or no longer necessary, exit now * that we've updated shared memory state. */ switch (relstate) { case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: case SUBREL_STATE_UNKNOWN: finish_sync_worker(); /* doesn't return */ } /* Calculate the name of the tablesync slot. */ slotname = (char *) palloc(NAMEDATALEN); ReplicationSlotNameForTablesync(MySubscription->oid, MyLogicalRepWorker->relid, slotname, NAMEDATALEN); /* * Here we use the slot name instead of the subscription name as the * application_name, so that it is different from the leader apply worker, * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not connect to the publisher: %s", err))); Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC || MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY); /* Assign the origin tracking record name. */ ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, sizeof(originname)); if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC) { /* * We have previously errored out before finishing the copy so the * replication slot might exist. We want to remove the slot if it * already exists and proceed. * * XXX We could also instead try to drop the slot, last time we failed * but for that, we might need to clean up the copy state as it might * be in the middle of fetching the rows. Also, if there is a network * breakdown then it wouldn't have succeeded so trying it next time * seems like a better bet. */ ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true); } else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY) { /* * The COPY phase was previously done, but tablesync then crashed * before it was able to finish normally. */ StartTransactionCommand(); /* * The origin tracking name must already exist. It was created first * time this tablesync was launched. */ originid = replorigin_by_name(originname, false); replorigin_session_setup(originid, 0); replorigin_session_origin = originid; *origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); goto copy_table_done; } SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; SpinLockRelease(&MyLogicalRepWorker->relmutex); /* Update the state and make it visible to others. */ StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); pgstat_report_stat(true); StartTransactionCommand(); /* * Use a standard write lock here. It might be better to disallow access * to the table while it's being synchronized. But we don't want to block * the main apply process from working and it has to open the relation in * RowExclusiveLock when remapping remote relation id to local one. */ rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); /* * Start a transaction in the remote node in REPEATABLE READ mode. This * ensures that both the replication slot we create (see below) and the * COPY are consistent with each other. */ res = walrcv_exec(LogRepWorkerWalRcvConn, "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("table copy could not start transaction on publisher: %s", res->err))); walrcv_clear_result(res); /* * Create a new permanent logical decoding slot. This slot will be used * for the catchup phase after COPY is done, so tell it to use the * snapshot to make the final data consistent. */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , CRS_USE_SNAPSHOT, origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the * copy is to avoid doing the copy again due to any error in setting up * origin tracking. */ originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) { /* * Origin tracking does not exist, so create it now. * * Then advance to the LSN got from walrcv_create_slot. This is WAL * logged for the purpose of recovery. Locks are to prevent the * replication origin from vanishing while advancing. */ originid = replorigin_create(originname); LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, true /* go backward */ , true /* WAL log */ ); UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_session_setup(originid, 0); replorigin_session_origin = originid; } else { ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("replication origin \"%s\" already exists", originname))); } /* * Make sure that the copy command runs as the table owner, unless the * user has opted out of that behaviour. */ run_as_owner = MySubscription->runasowner; if (!run_as_owner) SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt); /* * Check that our table sync worker has permission to insert into the * target table. */ aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_INSERT); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); /* * COPY FROM does not honor RLS policies. That is not a problem for * subscriptions owned by roles with BYPASSRLS privilege (or superuser, * who has it implicitly), but other roles should not be able to * circumvent RLS. Disallow logical replication into RLS enabled * relations for such roles. */ if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"", GetUserNameFromId(GetUserId(), true), RelationGetRelationName(rel)))); /* Now do the initial data copy */ PushActiveSnapshot(GetTransactionSnapshot()); copy_table(rel); PopActiveSnapshot(); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("table copy could not finish transaction on publisher: %s", res->err))); walrcv_clear_result(res); if (!run_as_owner) RestoreUserContext(&ucxt); table_close(rel, NoLock); /* Make the copy visible. */ CommandCounterIncrement(); /* * Update the persisted state to indicate the COPY phase is done; make it * visible to others. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); copy_table_done: elog(DEBUG1, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X", originname, LSN_FORMAT_ARGS(*origin_startpos)); /* * We are done with the initial data synchronization, update the state. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; MyLogicalRepWorker->relstate_lsn = *origin_startpos; SpinLockRelease(&MyLogicalRepWorker->relmutex); /* * Finally, wait until the leader apply worker tells us to catch up and * then return to let LogicalRepApplyLoop do it. */ wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } /* * Common code to fetch the up-to-date sync state info into the static lists. * * Returns true if subscription has 1 or more tables, else false. * * Note: If this function started the transaction (indicated by the parameter) * then it is the caller's responsibility to commit it. */ static bool FetchTableStates(bool *started_tx) { static bool has_subrels = false; *started_tx = false; if (!table_states_valid) { MemoryContext oldctx; List *rstates; ListCell *lc; SubscriptionRelState *rstate; /* Clean the old lists. */ list_free_deep(table_states_not_ready); table_states_not_ready = NIL; if (!IsTransactionState()) { StartTransactionCommand(); *started_tx = true; } /* Fetch all non-ready tables. */ rstates = GetSubscriptionRelations(MySubscription->oid, true); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); foreach(lc, rstates) { rstate = palloc(sizeof(SubscriptionRelState)); memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); table_states_not_ready = lappend(table_states_not_ready, rstate); } MemoryContextSwitchTo(oldctx); /* * Does the subscription have tables? * * If there were not-READY relations found then we know it does. But * if table_states_not_ready was empty we still need to check again to * see if there are 0 tables. */ has_subrels = (table_states_not_ready != NIL) || HasSubscriptionRelations(MySubscription->oid); table_states_valid = true; } return has_subrels; } /* * If the subscription has no tables then return false. * * Otherwise, are all tablesyncs READY? * * Note: This function is not suitable to be called from outside of apply or * tablesync workers because MySubscription needs to be already initialized. */ bool AllTablesyncsReady(void) { bool started_tx = false; bool has_subrels = false; /* We need up-to-date sync state info for subscription tables here. */ has_subrels = FetchTableStates(&started_tx); if (started_tx) { CommitTransactionCommand(); pgstat_report_stat(true); } /* * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ return has_subrels && (table_states_not_ready == NIL); } /* * Update the two_phase state of the specified subscription in pg_subscription. */ void UpdateTwoPhaseState(Oid suboid, char new_state) { Relation rel; HeapTuple tup; bool nulls[Natts_pg_subscription]; bool replaces[Natts_pg_subscription]; Datum values[Natts_pg_subscription]; Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || new_state == LOGICALREP_TWOPHASE_STATE_PENDING || new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); rel = table_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for subscription oid %u", suboid); /* Form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); /* And update/set two_phase state */ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); replaces[Anum_pg_subscription_subtwophasestate - 1] = true; tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); CatalogTupleUpdate(rel, &tup->t_self, tup); heap_freetuple(tup); table_close(rel, RowExclusiveLock); }