summaryrefslogtreecommitdiffstats
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c6
-rw-r--r--src/backend/replication/logical/snapbuild.c10
-rw-r--r--src/backend/replication/logical/tablesync.c48
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c4
4 files changed, 46 insertions, 22 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 096a08e..a468420 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -373,6 +373,10 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
+ /*
+ * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
+ * 9.4 and onwards.
+ */
if (PQnfields(res) < 3 || PQntuples(res) != 1)
{
int ntuples = PQntuples(res);
@@ -383,7 +387,7 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
- ntuples, nfields, 3, 1)));
+ ntuples, nfields, 1, 3)));
}
primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3eaaaac..3981994 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2126,11 +2126,13 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt
TransactionId *subxacts, XLogRecPtr lsn)
{
/*
- * Skip if there is no initial running xacts information or the
- * transaction is already marked as containing catalog changes.
+ * Skip if there is no initial running xacts information.
+ *
+ * Even if the transaction has been marked as containing catalog
+ * changes, it cannot be skipped because its subtransactions that
+ * modified the catalog may not be marked.
*/
- if (NInitialRunningXacts == 0 ||
- ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ if (NInitialRunningXacts == 0)
return;
if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b7b933d..c3dd902 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -518,15 +518,25 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
+ if (started_tx)
+ {
+ /*
+ * We must commit the existing transaction to release
+ * the existing locks before entering a busy loop.
+ * This is required to avoid any undetected deadlocks
+ * due to any existing lock as deadlock detector won't
+ * be able to detect the waits on the latch.
+ */
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+ }
+
/*
* Enter busy loop and wait for synchronization worker to
* reach expected state (or die trying).
*/
- if (!started_tx)
- {
- StartTransactionCommand();
- started_tx = true;
- }
+ StartTransactionCommand();
+ started_tx = true;
wait_for_relation_state_change(rstate->relid,
SUBREL_STATE_SYNCDONE);
@@ -1043,22 +1053,30 @@ copy_table(Relation rel)
/* Regular table with no row filter */
if (lrel.relkind == RELKIND_RELATION && qual == NIL)
{
- appendStringInfo(&cmd, "COPY %s (",
+ appendStringInfo(&cmd, "COPY %s",
quote_qualified_identifier(lrel.nspname, lrel.relname));
- /*
- * XXX Do we need to list the columns in all cases? Maybe we're
- * replicating all columns?
- */
- for (int i = 0; i < lrel.natts; i++)
+ /* If the table has columns, then specify the columns */
+ if (lrel.natts)
{
- if (i > 0)
- appendStringInfoString(&cmd, ", ");
+ appendStringInfoString(&cmd, " (");
- appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ /*
+ * XXX Do we need to list the columns in all cases? Maybe we're
+ * replicating all columns?
+ */
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ if (i > 0)
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ }
+
+ appendStringInfoString(&cmd, ")");
}
- appendStringInfo(&cmd, ") TO STDOUT");
+ appendStringInfo(&cmd, " TO STDOUT");
}
else
{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 63a8442..5fc0def 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1095,8 +1095,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
* Create tuple table slots. Create a copy of the TupleDesc as it needs to
* live as long as the cache remains.
*/
- oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
- newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
+ oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
+ newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);