diff options
Diffstat (limited to '')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 6 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 10 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 48 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 4 |
4 files changed, 46 insertions, 22 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 096a08e..a468420 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -373,6 +373,10 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) "the primary server: %s", pchomp(PQerrorMessage(conn->streamConn))))); } + /* + * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in + * 9.4 and onwards. + */ if (PQnfields(res) < 3 || PQntuples(res) != 1) { int ntuples = PQntuples(res); @@ -383,7 +387,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid response from primary server"), errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.", - ntuples, nfields, 3, 1))); + ntuples, nfields, 1, 3))); } primary_sysid = pstrdup(PQgetvalue(res, 0, 0)); *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1)); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 3eaaaac..3981994 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -2126,11 +2126,13 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt TransactionId *subxacts, XLogRecPtr lsn) { /* - * Skip if there is no initial running xacts information or the - * transaction is already marked as containing catalog changes. + * Skip if there is no initial running xacts information. + * + * Even if the transaction has been marked as containing catalog + * changes, it cannot be skipped because its subtransactions that + * modified the catalog may not be marked. */ - if (NInitialRunningXacts == 0 || - ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (NInitialRunningXacts == 0) return; if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b7b933d..c3dd902 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -518,15 +518,25 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); + if (started_tx) + { + /* + * We must commit the existing transaction to release + * the existing locks before entering a busy loop. + * This is required to avoid any undetected deadlocks + * due to any existing lock as deadlock detector won't + * be able to detect the waits on the latch. + */ + CommitTransactionCommand(); + pgstat_report_stat(false); + } + /* * Enter busy loop and wait for synchronization worker to * reach expected state (or die trying). */ - if (!started_tx) - { - StartTransactionCommand(); - started_tx = true; - } + StartTransactionCommand(); + started_tx = true; wait_for_relation_state_change(rstate->relid, SUBREL_STATE_SYNCDONE); @@ -1043,22 +1053,30 @@ copy_table(Relation rel) /* Regular table with no row filter */ if (lrel.relkind == RELKIND_RELATION && qual == NIL) { - appendStringInfo(&cmd, "COPY %s (", + appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); - /* - * XXX Do we need to list the columns in all cases? Maybe we're - * replicating all columns? - */ - for (int i = 0; i < lrel.natts; i++) + /* If the table has columns, then specify the columns */ + if (lrel.natts) { - if (i > 0) - appendStringInfoString(&cmd, ", "); + appendStringInfoString(&cmd, " ("); - appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + /* + * XXX Do we need to list the columns in all cases? Maybe we're + * replicating all columns? + */ + for (int i = 0; i < lrel.natts; i++) + { + if (i > 0) + appendStringInfoString(&cmd, ", "); + + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + } + + appendStringInfoString(&cmd, ")"); } - appendStringInfo(&cmd, ") TO STDOUT"); + appendStringInfo(&cmd, " TO STDOUT"); } else { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 63a8442..5fc0def 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1095,8 +1095,8 @@ init_tuple_slot(PGOutputData *data, Relation relation, * Create tuple table slots. Create a copy of the TupleDesc as it needs to * live as long as the cache remains. */ - oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); - newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation)); + oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation)); + newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation)); entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple); entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple); |