summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_dump/parallel.c
blob: da0723ad38587c36f53a6878349b433d3e735b4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
/*-------------------------------------------------------------------------
 *
 * parallel.c
 *
 *	Parallel support for pg_dump and pg_restore
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/bin/pg_dump/parallel.c
 *
 *-------------------------------------------------------------------------
 */

/*
 * Parallel operation works like this:
 *
 * The original, leader process calls ParallelBackupStart(), which forks off
 * the desired number of worker processes, which each enter WaitForCommands().
 *
 * The leader process dispatches an individual work item to one of the worker
 * processes in DispatchJobForTocEntry().  We send a command string such as
 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
 * The worker process receives and decodes the command and passes it to the
 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
 * which are routines of the current archive format.  That routine performs
 * the required action (dump or restore) and returns an integer status code.
 * This is passed back to the leader where we pass it to the
 * ParallelCompletionPtr callback function that was passed to
 * DispatchJobForTocEntry().  The callback function does state updating
 * for the leader control logic in pg_backup_archiver.c.
 *
 * In principle additional archive-format-specific information might be needed
 * in commands or worker status responses, but so far that hasn't proved
 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
 * data structures.  Remember that we have forked off the workers only after
 * we have read in the catalog.  That's why our worker processes can also
 * access the catalog information.  (In the Windows case, the workers are
 * threads in the same process.  To avoid problems, they work with cloned
 * copies of the Archive data structure; see RunWorker().)
 *
 * In the leader process, the workerStatus field for each worker has one of
 * the following values:
 *		WRKR_NOT_STARTED: we've not yet forked this worker
 *		WRKR_IDLE: it's waiting for a command
 *		WRKR_WORKING: it's working on a command
 *		WRKR_TERMINATED: process ended
 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
 * state, and must be NULL in other states.
 */

#include "postgres_fe.h"

#ifndef WIN32
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#endif

#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_utils.h"
#include "port/pg_bswap.h"

/* Mnemonic macros for indexing the fd array returned by pipe(2) */
#define PIPE_READ							0
#define PIPE_WRITE							1

#define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */

/* Worker process statuses */
typedef enum
{
	WRKR_NOT_STARTED = 0,
	WRKR_IDLE,
	WRKR_WORKING,
	WRKR_TERMINATED
} T_WorkerStatus;

#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

/*
 * Private per-parallel-worker state (typedef for this is in parallel.h).
 *
 * Much of this is valid only in the leader process (or, on Windows, should
 * be touched only by the leader thread).  But the AH field should be touched
 * only by workers.  The pipe descriptors are valid everywhere.
 */
struct ParallelSlot
{
	T_WorkerStatus workerStatus;	/* see enum above */

	/* These fields are valid if workerStatus == WRKR_WORKING: */
	ParallelCompletionPtr callback; /* function to call on completion */
	void	   *callback_data;	/* passthrough data for it */

	ArchiveHandle *AH;			/* Archive data worker is using */

	int			pipeRead;		/* leader's end of the pipes */
	int			pipeWrite;
	int			pipeRevRead;	/* child's end of the pipes */
	int			pipeRevWrite;

	/* Child process/thread identity info: */
#ifdef WIN32
	uintptr_t	hThread;
	unsigned int threadId;
#else
	pid_t		pid;
#endif
};

#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/* Windows implementation of pipe access */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */

/*
 * State info for archive_close_connection() shutdown callback.
 */
typedef struct ShutdownInformation
{
	ParallelState *pstate;
	Archive    *AHX;
} ShutdownInformation;

static ShutdownInformation shutdown_info;

/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes.
 *
 * On Unix, myAH is the leader DB connection in the leader process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the leader connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

static volatile DumpSignalInformation signal_info;

#ifdef WIN32
static CRITICAL_SECTION signal_info_lock;
#endif

/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)


#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false;
DWORD		mainThreadId;
#endif							/* WIN32 */

/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int	GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
							bool do_wait);
static char *getMessageFromLeader(int pipefd[2]);
static void sendMessageToLeader(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
								  bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
								int worker, const char *str);
static char *readMessageFromPipe(int fd);

#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)


/*
 * Initialize parallel dump support --- should be called early in process
 * startup.  (Currently, this is called whether or not we intend parallel
 * activity.)
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	if (!parallel_init_done)
	{
		WSADATA		wsaData;
		int			err;

		/* Prepare for threaded operation */
		tls_index = TlsAlloc();
		mainThreadId = GetCurrentThreadId();

		/* Initialize socket access */
		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
		if (err != 0)
			pg_fatal("%s() failed: error code %d", "WSAStartup", err);

		parallel_init_done = true;
	}
#endif
}

/*
 * Find the ParallelSlot for the current worker process or thread.
 *
 * Returns NULL if no matching slot is found (this implies we're the leader).
 */
static ParallelSlot *
GetMyPSlot(ParallelState *pstate)
{
	int			i;

	for (i = 0; i < pstate->numWorkers; i++)
	{
#ifdef WIN32
		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
#else
		if (pstate->parallelSlot[i].pid == getpid())
#endif
			return &(pstate->parallelSlot[i]);
	}

	return NULL;
}

/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Non-reentrant but reduces memory leakage: we'll consume one buffer per
 * thread, which is much better than one per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if we use a static var, so we provide for both
	 * static and auto, and omit any use of the static var when using Tls. We
	 * rely on TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer id_return;

	if (parallel_init_done)
		id_return = (PQExpBuffer) TlsGetValue(tls_index);
	else
		id_return = s_id_return;

	if (id_return)				/* first time through? */
	{
		/* same buffer, just wipe contents */
		resetPQExpBuffer(id_return);
	}
	else
	{
		/* new buffer */
		id_return = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, id_return);
		else
			s_id_return = id_return;
	}

	return id_return;
}
#endif							/* WIN32 */

/*
 * pg_dump and pg_restore call this to register the cleanup handler
 * as soon as they've created the ArchiveHandle.
 */
void
on_exit_close_archive(Archive *AHX)
{
	shutdown_info.AHX = AHX;
	on_exit_nicely(archive_close_connection, &shutdown_info);
}

/*
 * on_exit_nicely handler for shutting down database connections and
 * worker processes cleanly.
 */
static void
archive_close_connection(int code, void *arg)
{
	ShutdownInformation *si = (ShutdownInformation *) arg;

	if (si->pstate)
	{
		/* In parallel mode, must figure out who we are */
		ParallelSlot *slot = GetMyPSlot(si->pstate);

		if (!slot)
		{
			/*
			 * We're the leader.  Forcibly shut down workers, then close our
			 * own database connection, if any.
			 */
			ShutdownWorkersHard(si->pstate);

			if (si->AHX)
				DisconnectDatabase(si->AHX);
		}
		else
		{
			/*
			 * We're a worker.  Shut down our own DB connection if any.  On
			 * Windows, we also have to close our communication sockets, to
			 * emulate what will happen on Unix when the worker process exits.
			 * (Without this, if this is a premature exit, the leader would
			 * fail to detect it because there would be no EOF condition on
			 * the other end of the pipe.)
			 */
			if (slot->AH)
				DisconnectDatabase(&(slot->AH->public));

#ifdef WIN32
			closesocket(slot->pipeRevRead);
			closesocket(slot->pipeRevWrite);
#endif
		}
	}
	else
	{
		/* Non-parallel operation: just kill the leader DB connection */
		if (si->AHX)
			DisconnectDatabase(si->AHX);
	}
}

/*
 * Forcibly shut down any remaining workers, waiting for them to finish.
 *
 * Note that we don't expect to come here during normal exit (the workers
 * should be long gone, and the ParallelState too).  We're only here in a
 * pg_fatal() situation, so intervening to cancel active commands is
 * appropriate.
 */
static void
ShutdownWorkersHard(ParallelState *pstate)
{
	int			i;

	/*
	 * Close our write end of the sockets so that any workers waiting for
	 * commands know they can exit.  (Note: some of the pipeWrite fields might
	 * still be zero, if we failed to initialize all the workers.  Hence, just
	 * ignore errors here.)
	 */
	for (i = 0; i < pstate->numWorkers; i++)
		closesocket(pstate->parallelSlot[i].pipeWrite);

	/*
	 * Force early termination of any commands currently in progress.
	 */
#ifndef WIN32
	/* On non-Windows, send SIGTERM to each worker process. */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		pid_t		pid = pstate->parallelSlot[i].pid;

		if (pid != 0)
			kill(pid, SIGTERM);
	}
#else

	/*
	 * On Windows, send query cancels directly to the workers' backends.  Use
	 * a critical section to ensure worker threads don't change state.
	 */
	EnterCriticalSection(&signal_info_lock);
	for (i = 0; i < pstate->numWorkers; i++)
	{
		ArchiveHandle *AH = pstate->parallelSlot[i].AH;
		char		errbuf[1];

		if (AH != NULL && AH->connCancel != NULL)
			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
	}
	LeaveCriticalSection(&signal_info_lock);
#endif

	/* Now wait for them to terminate. */
	WaitForTerminatingWorkers(pstate);
}

/*
 * Wait for all workers to terminate.
 */
static void
WaitForTerminatingWorkers(ParallelState *pstate)
{
	while (!HasEveryWorkerTerminated(pstate))
	{
		ParallelSlot *slot = NULL;
		int			j;

#ifndef WIN32
		/* On non-Windows, use wait() to wait for next worker to end */
		int			status;
		pid_t		pid = wait(&status);

		/* Find dead worker's slot, and clear the PID field */
		for (j = 0; j < pstate->numWorkers; j++)
		{
			slot = &(pstate->parallelSlot[j]);
			if (slot->pid == pid)
			{
				slot->pid = 0;
				break;
			}
		}
#else							/* WIN32 */
		/* On Windows, we must use WaitForMultipleObjects() */
		HANDLE	   *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
		int			nrun = 0;
		DWORD		ret;
		uintptr_t	hThread;

		for (j = 0; j < pstate->numWorkers; j++)
		{
			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
			{
				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
				nrun++;
			}
		}
		ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
		Assert(ret != WAIT_FAILED);
		hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
		free(lpHandles);

		/* Find dead worker's slot, and clear the hThread field */
		for (j = 0; j < pstate->numWorkers; j++)
		{
			slot = &(pstate->parallelSlot[j]);
			if (slot->hThread == hThread)
			{
				/* For cleanliness, close handles for dead threads */
				CloseHandle((HANDLE) slot->hThread);
				slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
				break;
			}
		}
#endif							/* WIN32 */

		/* On all platforms, update workerStatus and te[] as well */
		Assert(j < pstate->numWorkers);
		slot->workerStatus = WRKR_TERMINATED;
		pstate->te[j] = NULL;
	}
}


/*
 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
 *
 * This doesn't quite belong in this module, but it needs access to the
 * ParallelState data, so there's not really a better place either.
 *
 * When we get a cancel interrupt, we could just die, but in pg_restore that
 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
 * for a long time.  Instead, we try to send a cancel request and then die.
 * pg_dump probably doesn't really need this, but we might as well use it
 * there too.  Note that sending the cancel directly from the signal handler
 * is safe because PQcancel() is written to make it so.
 *
 * In parallel operation on Unix, each process is responsible for canceling
 * its own connection (this must be so because nobody else has access to it).
 * Furthermore, the leader process should attempt to forward its signal to
 * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
 * needed because typing control-C at the console would deliver SIGINT to
 * every member of the terminal process group --- but in other scenarios it
 * might be that only the leader gets signaled.
 *
 * On Windows, the cancel handler runs in a separate thread, because that's
 * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
 * cancels on all active connections, and then return FALSE, which will allow
 * the process to die.  For safety's sake, we use a critical section to
 * protect the PGcancel structures against being changed while the signal
 * thread runs.
 */

#ifndef WIN32

/*
 * Signal handler (Unix only)
 */
static void
sigTermHandler(SIGNAL_ARGS)
{
	int			i;
	char		errbuf[1];

	/*
	 * Some platforms allow delivery of new signals to interrupt an active
	 * signal handler.  That could muck up our attempt to send PQcancel, so
	 * disable the signals that setup_cancel_handler enabled.
	 */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
	pqsignal(SIGQUIT, SIG_IGN);

	/*
	 * If we're in the leader, forward signal to all workers.  (It seems best
	 * to do this before PQcancel; killing the leader transaction will result
	 * in invalid-snapshot errors from active workers, which maybe we can
	 * quiet by killing workers first.)  Ignore any errors.
	 */
	if (signal_info.pstate != NULL)
	{
		for (i = 0; i < signal_info.pstate->numWorkers; i++)
		{
			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;

			if (pid != 0)
				kill(pid, SIGTERM);
		}
	}

	/*
	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
	 * there's not much we can do about them anyway.
	 */
	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));

	/*
	 * Report we're quitting, using nothing more complicated than write(2).
	 * When in parallel operation, only the leader process should do this.
	 */
	if (!signal_info.am_worker)
	{
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/*
	 * And die, using _exit() not exit() because the latter will invoke atexit
	 * handlers that can fail if we interrupted related code.
	 */
	_exit(1);
}

/*
 * Enable cancel interrupt handler, if not already done.
 */
static void
setup_cancel_handler(void)
{
	/*
	 * When forking, signal_info.handler_set will propagate into the new
	 * process, but that's fine because the signal handler state does too.
	 */
	if (!signal_info.handler_set)
	{
		signal_info.handler_set = true;

		pqsignal(SIGINT, sigTermHandler);
		pqsignal(SIGTERM, sigTermHandler);
		pqsignal(SIGQUIT, sigTermHandler);
	}
}

#else							/* WIN32 */

/*
 * Console interrupt handler --- runs in a newly-started thread.
 *
 * After stopping other threads and sending cancel requests on all open
 * connections, we return FALSE which will allow the default ExitProcess()
 * action to be taken.
 */
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
	int			i;
	char		errbuf[1];

	if (dwCtrlType == CTRL_C_EVENT ||
		dwCtrlType == CTRL_BREAK_EVENT)
	{
		/* Critical section prevents changing data we look at here */
		EnterCriticalSection(&signal_info_lock);

		/*
		 * If in parallel mode, stop worker threads and send QueryCancel to
		 * their connected backends.  The main point of stopping the worker
		 * threads is to keep them from reporting the query cancels as errors,
		 * which would clutter the user's screen.  We needn't stop the leader
		 * thread since it won't be doing much anyway.  Do this before
		 * canceling the main transaction, else we might get invalid-snapshot
		 * errors reported before we can stop the workers.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.pstate != NULL)
		{
			for (i = 0; i < signal_info.pstate->numWorkers; i++)
			{
				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
				ArchiveHandle *AH = slot->AH;
				HANDLE		hThread = (HANDLE) slot->hThread;

				/*
				 * Using TerminateThread here may leave some resources leaked,
				 * but it doesn't matter since we're about to end the whole
				 * process.
				 */
				if (hThread != INVALID_HANDLE_VALUE)
					TerminateThread(hThread, 0);

				if (AH != NULL && AH->connCancel != NULL)
					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
			}
		}

		/*
		 * Send QueryCancel to leader connection, if enabled.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
			(void) PQcancel(signal_info.myAH->connCancel,
							errbuf, sizeof(errbuf));

		LeaveCriticalSection(&signal_info_lock);

		/*
		 * Report we're quitting, using nothing more complicated than
		 * write(2).  (We might be able to get away with using pg_log_*()
		 * here, but since we terminated other threads uncleanly above, it
		 * seems better to assume as little as possible.)
		 */
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/* Always return FALSE to allow signal handling to continue */
	return FALSE;
}

/*
 * Enable cancel interrupt handler, if not already done.
 */
static void
setup_cancel_handler(void)
{
	if (!signal_info.handler_set)
	{
		signal_info.handler_set = true;

		InitializeCriticalSection(&signal_info_lock);

		SetConsoleCtrlHandler(consoleHandler, TRUE);
	}
}

#endif							/* WIN32 */


/*
 * set_archive_cancel_info
 *
 * Fill AH->connCancel with cancellation info for the specified database
 * connection; or clear it if conn is NULL.
 */
void
set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
{
	PGcancel   *oldConnCancel;

	/*
	 * Activate the interrupt handler if we didn't yet in this process.  On
	 * Windows, this also initializes signal_info_lock; therefore it's
	 * important that this happen at least once before we fork off any
	 * threads.
	 */
	setup_cancel_handler();

	/*
	 * On Unix, we assume that storing a pointer value is atomic with respect
	 * to any possible signal interrupt.  On Windows, use a critical section.
	 */

#ifdef WIN32
	EnterCriticalSection(&signal_info_lock);
#endif

	/* Free the old one if we have one */
	oldConnCancel = AH->connCancel;
	/* be sure interrupt handler doesn't use pointer while freeing */
	AH->connCancel = NULL;

	if (oldConnCancel != NULL)
		PQfreeCancel(oldConnCancel);

	/* Set the new one if specified */
	if (conn)
		AH->connCancel = PQgetCancel(conn);

	/*
	 * On Unix, there's only ever one active ArchiveHandle per process, so we
	 * can just set signal_info.myAH unconditionally.  On Windows, do that
	 * only in the main thread; worker threads have to make sure their
	 * ArchiveHandle appears in the pstate data, which is dealt with in
	 * RunWorker().
	 */
#ifndef WIN32
	signal_info.myAH = AH;
#else
	if (mainThreadId == GetCurrentThreadId())
		signal_info.myAH = AH;
#endif

#ifdef WIN32
	LeaveCriticalSection(&signal_info_lock);
#endif
}

/*
 * set_cancel_pstate
 *
 * Set signal_info.pstate to point to the specified ParallelState, if any.
 * We need this mainly to have an interlock against Windows signal thread.
 */
static void
set_cancel_pstate(ParallelState *pstate)
{
#ifdef WIN32
	EnterCriticalSection(&signal_info_lock);
#endif

	signal_info.pstate = pstate;

#ifdef WIN32
	LeaveCriticalSection(&signal_info_lock);
#endif
}

/*
 * set_cancel_slot_archive
 *
 * Set ParallelSlot's AH field to point to the specified archive, if any.
 * We need this mainly to have an interlock against Windows signal thread.
 */
static void
set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
{
#ifdef WIN32
	EnterCriticalSection(&signal_info_lock);
#endif

	slot->AH = AH;

#ifdef WIN32
	LeaveCriticalSection(&signal_info_lock);
#endif
}


/*
 * This function is called by both Unix and Windows variants to set up
 * and run a worker process.  Caller should exit the process (or thread)
 * upon return.
 */
static void
RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
{
	int			pipefd[2];

	/* fetch child ends of pipes */
	pipefd[PIPE_READ] = slot->pipeRevRead;
	pipefd[PIPE_WRITE] = slot->pipeRevWrite;

	/*
	 * Clone the archive so that we have our own state to work with, and in
	 * particular our own database connection.
	 *
	 * We clone on Unix as well as Windows, even though technically we don't
	 * need to because fork() gives us a copy in our own address space
	 * already.  But CloneArchive resets the state information and also clones
	 * the database connection which both seem kinda helpful.
	 */
	AH = CloneArchive(AH);

	/* Remember cloned archive where signal handler can find it */
	set_cancel_slot_archive(slot, AH);

	/*
	 * Call the setup worker function that's defined in the ArchiveHandle.
	 */
	(AH->SetupWorkerPtr) ((Archive *) AH);

	/*
	 * Execute commands until done.
	 */
	WaitForCommands(AH, pipefd);

	/*
	 * Disconnect from database and clean up.
	 */
	set_cancel_slot_archive(slot, NULL);
	DisconnectDatabase(&(AH->public));
	DeCloneArchive(AH);
}

/*
 * Thread base function for Windows
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *AH = wi->AH;
	ParallelSlot *slot = wi->slot;

	/* Don't need WorkerInfo anymore */
	free(wi);

	/* Run the worker ... */
	RunWorker(AH, slot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */

/*
 * This function starts a parallel dump or restore by spawning off the worker
 * processes.  For Windows, it creates a number of threads; on Unix the
 * workers are created with fork().
 */
ParallelState *
ParallelBackupStart(ArchiveHandle *AH)
{
	ParallelState *pstate;
	int			i;

	Assert(AH->public.numWorkers > 0);

	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));

	pstate->numWorkers = AH->public.numWorkers;
	pstate->te = NULL;
	pstate->parallelSlot = NULL;

	if (AH->public.numWorkers == 1)
		return pstate;

	/* Create status arrays, being sure to initialize all fields to 0 */
	pstate->te = (TocEntry **)
		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
	pstate->parallelSlot = (ParallelSlot *)
		pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));

#ifdef WIN32
	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
#endif

	/*
	 * Set the pstate in shutdown_info, to tell the exit handler that it must
	 * clean up workers as well as the main database connection.  But we don't
	 * set this in signal_info yet, because we don't want child processes to
	 * inherit non-NULL signal_info.pstate.
	 */
	shutdown_info.pstate = pstate;

	/*
	 * Temporarily disable query cancellation on the leader connection.  This
	 * ensures that child processes won't inherit valid AH->connCancel
	 * settings and thus won't try to issue cancels against the leader's
	 * connection.  No harm is done if we fail while it's disabled, because
	 * the leader connection is idle at this point anyway.
	 */
	set_archive_cancel_info(AH, NULL);

	/* Ensure stdio state is quiesced before forking */
	fflush(NULL);

	/* Create desired number of workers */
	for (i = 0; i < pstate->numWorkers; i++)
	{
#ifdef WIN32
		WorkerInfo *wi;
		uintptr_t	handle;
#else
		pid_t		pid;
#endif
		ParallelSlot *slot = &(pstate->parallelSlot[i]);
		int			pipeMW[2],
					pipeWM[2];

		/* Create communication pipes for this worker */
		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
			pg_fatal("could not create communication channels: %m");

		/* leader's ends of the pipes */
		slot->pipeRead = pipeWM[PIPE_READ];
		slot->pipeWrite = pipeMW[PIPE_WRITE];
		/* child's ends of the pipes */
		slot->pipeRevRead = pipeMW[PIPE_READ];
		slot->pipeRevWrite = pipeWM[PIPE_WRITE];

#ifdef WIN32
		/* Create transient structure to pass args to worker function */
		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

		wi->AH = AH;
		wi->slot = slot;

		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
								wi, 0, &(slot->threadId));
		slot->hThread = handle;
		slot->workerStatus = WRKR_IDLE;
#else							/* !WIN32 */
		pid = fork();
		if (pid == 0)
		{
			/* we are the worker */
			int			j;

			/* this is needed for GetMyPSlot() */
			slot->pid = getpid();

			/* instruct signal handler that we're in a worker now */
			signal_info.am_worker = true;

			/* close read end of Worker -> Leader */
			closesocket(pipeWM[PIPE_READ]);
			/* close write end of Leader -> Worker */
			closesocket(pipeMW[PIPE_WRITE]);

			/*
			 * Close all inherited fds for communication of the leader with
			 * previously-forked workers.
			 */
			for (j = 0; j < i; j++)
			{
				closesocket(pstate->parallelSlot[j].pipeRead);
				closesocket(pstate->parallelSlot[j].pipeWrite);
			}

			/* Run the worker ... */
			RunWorker(AH, slot);

			/* We can just exit(0) when done */
			exit(0);
		}
		else if (pid < 0)
		{
			/* fork failed */
			pg_fatal("could not create worker process: %m");
		}

		/* In Leader after successful fork */
		slot->pid = pid;
		slot->workerStatus = WRKR_IDLE;

		/* close read end of Leader -> Worker */
		closesocket(pipeMW[PIPE_READ]);
		/* close write end of Worker -> Leader */
		closesocket(pipeWM[PIPE_WRITE]);
#endif							/* WIN32 */
	}

	/*
	 * Having forked off the workers, disable SIGPIPE so that leader isn't
	 * killed if it tries to send a command to a dead worker.  We don't want
	 * the workers to inherit this setting, though.
	 */
#ifndef WIN32
	pqsignal(SIGPIPE, SIG_IGN);
#endif

	/*
	 * Re-establish query cancellation on the leader connection.
	 */
	set_archive_cancel_info(AH, AH->connection);

	/*
	 * Tell the cancel signal handler to forward signals to worker processes,
	 * too.  (As with query cancel, we did not need this earlier because the
	 * workers have not yet been given anything to do; if we die before this
	 * point, any already-started workers will see EOF and quit promptly.)
	 */
	set_cancel_pstate(pstate);

	return pstate;
}

/*
 * Close down a parallel dump or restore.
 */
void
ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
{
	int			i;

	/* No work if non-parallel */
	if (pstate->numWorkers == 1)
		return;

	/* There should not be any unfinished jobs */
	Assert(IsEveryWorkerIdle(pstate));

	/* Close the sockets so that the workers know they can exit */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		closesocket(pstate->parallelSlot[i].pipeRead);
		closesocket(pstate->parallelSlot[i].pipeWrite);
	}

	/* Wait for them to exit */
	WaitForTerminatingWorkers(pstate);

	/*
	 * Unlink pstate from shutdown_info, so the exit handler will not try to
	 * use it; and likewise unlink from signal_info.
	 */
	shutdown_info.pstate = NULL;
	set_cancel_pstate(NULL);

	/* Release state (mere neatnik-ism, since we're about to terminate) */
	free(pstate->te);
	free(pstate->parallelSlot);
	free(pstate);
}

/*
 * These next four functions handle construction and parsing of the command
 * strings and response strings for parallel workers.
 *
 * Currently, these can be the same regardless of which archive format we are
 * processing.  In future, we might want to let format modules override these
 * functions to add format-specific data to a command or response.
 */

/*
 * buildWorkerCommand: format a command string to send to a worker.
 *
 * The string is built in the caller-supplied buffer of size buflen.
 */
static void
buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
				   char *buf, int buflen)
{
	if (act == ACT_DUMP)
		snprintf(buf, buflen, "DUMP %d", te->dumpId);
	else if (act == ACT_RESTORE)
		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
	else
		Assert(false);
}

/*
 * parseWorkerCommand: interpret a command string in a worker.
 */
static void
parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
				   const char *msg)
{
	DumpId		dumpId;
	int			nBytes;

	if (messageStartsWith(msg, "DUMP "))
	{
		*act = ACT_DUMP;
		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
		Assert(nBytes == strlen(msg));
		*te = getTocEntryByDumpId(AH, dumpId);
		Assert(*te != NULL);
	}
	else if (messageStartsWith(msg, "RESTORE "))
	{
		*act = ACT_RESTORE;
		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
		Assert(nBytes == strlen(msg));
		*te = getTocEntryByDumpId(AH, dumpId);
		Assert(*te != NULL);
	}
	else
		pg_fatal("unrecognized command received from leader: \"%s\"",
				 msg);
}

/*
 * buildWorkerResponse: format a response string to send to the leader.
 *
 * The string is built in the caller-supplied buffer of size buflen.
 */
static void
buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
					char *buf, int buflen)
{
	snprintf(buf, buflen, "OK %d %d %d",
			 te->dumpId,
			 status,
			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
}

/*
 * parseWorkerResponse: parse the status message returned by a worker.
 *
 * Returns the integer status code, and may update fields of AH and/or te.
 */
static int
parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
					const char *msg)
{
	DumpId		dumpId;
	int			nBytes,
				n_errors;
	int			status = 0;

	if (messageStartsWith(msg, "OK "))
	{
		sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);

		Assert(dumpId == te->dumpId);
		Assert(nBytes == strlen(msg));

		AH->public.n_errors += n_errors;
	}
	else
		pg_fatal("invalid message received from worker: \"%s\"",
				 msg);

	return status;
}

/*
 * Dispatch a job to some free worker.
 *
 * te is the TocEntry to be processed, act is the action to be taken on it.
 * callback is the function to call on completion of the job.
 *
 * If no worker is currently available, this will block, and previously
 * registered callback functions may be called.
 */
void
DispatchJobForTocEntry(ArchiveHandle *AH,
					   ParallelState *pstate,
					   TocEntry *te,
					   T_Action act,
					   ParallelCompletionPtr callback,
					   void *callback_data)
{
	int			worker;
	char		buf[256];

	/* Get a worker, waiting if none are idle */
	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);

	/* Construct and send command string */
	buildWorkerCommand(AH, te, act, buf, sizeof(buf));

	sendMessageToWorker(pstate, worker, buf);

	/* Remember worker is busy, and which TocEntry it's working on */
	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
	pstate->parallelSlot[worker].callback = callback;
	pstate->parallelSlot[worker].callback_data = callback_data;
	pstate->te[worker] = te;
}

/*
 * Find an idle worker and return its slot number.
 * Return NO_SLOT if none are idle.
 */
static int
GetIdleWorker(ParallelState *pstate)
{
	int			i;

	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
			return i;
	}
	return NO_SLOT;
}

/*
 * Return true iff no worker is running.
 */
static bool
HasEveryWorkerTerminated(ParallelState *pstate)
{
	int			i;

	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
			return false;
	}
	return true;
}

/*
 * Return true iff every worker is in the WRKR_IDLE state.
 */
bool
IsEveryWorkerIdle(ParallelState *pstate)
{
	int			i;

	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
			return false;
	}
	return true;
}

/*
 * Acquire lock on a table to be dumped by a worker process.
 *
 * The leader process is already holding an ACCESS SHARE lock.  Ordinarily
 * it's no problem for a worker to get one too, but if anything else besides
 * pg_dump is running, there's a possible deadlock:
 *
 * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
 *	  because the leader holds a conflicting ACCESS SHARE lock).
 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
 *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
 * 4) Now we have a deadlock, since the leader is effectively waiting for
 *	  the worker.  The server cannot detect that, however.
 *
 * To prevent an infinite wait, prior to touching a table in a worker, request
 * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
 * so we have a deadlock.  We must fail the backup in that case.
 */
static void
lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
{
	const char *qualId;
	PQExpBuffer query;
	PGresult   *res;

	/* Nothing to do for BLOBS */
	if (strcmp(te->desc, "BLOBS") == 0)
		return;

	query = createPQExpBuffer();

	qualId = fmtQualifiedId(te->namespace, te->tag);

	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
					  qualId);

	res = PQexec(AH->connection, query->data);

	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
		pg_fatal("could not obtain lock on relation \"%s\"\n"
				 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
				 "on the table after the pg_dump parent process had gotten the "
				 "initial ACCESS SHARE lock on the table.", qualId);

	PQclear(res);
	destroyPQExpBuffer(query);
}

/*
 * WaitForCommands: main routine for a worker process.
 *
 * Read and execute commands from the leader until we see EOF on the pipe.
 */
static void
WaitForCommands(ArchiveHandle *AH, int pipefd[2])
{
	char	   *command;
	TocEntry   *te;
	T_Action	act;
	int			status = 0;
	char		buf[256];

	for (;;)
	{
		if (!(command = getMessageFromLeader(pipefd)))
		{
			/* EOF, so done */
			return;
		}

		/* Decode the command */
		parseWorkerCommand(AH, &te, &act, command);

		if (act == ACT_DUMP)
		{
			/* Acquire lock on this table within the worker's session */
			lockTableForWorker(AH, te);

			/* Perform the dump command */
			status = (AH->WorkerJobDumpPtr) (AH, te);
		}
		else if (act == ACT_RESTORE)
		{
			/* Perform the restore command */
			status = (AH->WorkerJobRestorePtr) (AH, te);
		}
		else
			Assert(false);

		/* Return status to leader */
		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));

		sendMessageToLeader(pipefd, buf);

		/* command was pg_malloc'd and we are responsible for free()ing it. */
		free(command);
	}
}

/*
 * Check for status messages from workers.
 *
 * If do_wait is true, wait to get a status message; otherwise, just return
 * immediately if there is none available.
 *
 * When we get a status message, we pass the status code to the callback
 * function that was specified to DispatchJobForTocEntry, then reset the
 * worker status to IDLE.
 *
 * Returns true if we collected a status message, else false.
 *
 * XXX is it worth checking for more than one status message per call?
 * It seems somewhat unlikely that multiple workers would finish at exactly
 * the same time.
 */
static bool
ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
{
	int			worker;
	char	   *msg;

	/* Try to collect a status message */
	msg = getMessageFromWorker(pstate, do_wait, &worker);

	if (!msg)
	{
		/* If do_wait is true, we must have detected EOF on some socket */
		if (do_wait)
			pg_fatal("a worker process died unexpectedly");
		return false;
	}

	/* Process it and update our idea of the worker's status */
	if (messageStartsWith(msg, "OK "))
	{
		ParallelSlot *slot = &pstate->parallelSlot[worker];
		TocEntry   *te = pstate->te[worker];
		int			status;

		status = parseWorkerResponse(AH, te, msg);
		slot->callback(AH, te, status, slot->callback_data);
		slot->workerStatus = WRKR_IDLE;
		pstate->te[worker] = NULL;
	}
	else
		pg_fatal("invalid message received from worker: \"%s\"",
				 msg);

	/* Free the string returned from getMessageFromWorker */
	free(msg);

	return true;
}

/*
 * Check for status results from workers, waiting if necessary.
 *
 * Available wait modes are:
 * WFW_NO_WAIT: reap any available status, but don't block
 * WFW_GOT_STATUS: wait for at least one more worker to finish
 * WFW_ONE_IDLE: wait for at least one worker to be idle
 * WFW_ALL_IDLE: wait for all workers to be idle
 *
 * Any received results are passed to the callback specified to
 * DispatchJobForTocEntry.
 *
 * This function is executed in the leader process.
 */
void
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
{
	bool		do_wait = false;

	/*
	 * In GOT_STATUS mode, always block waiting for a message, since we can't
	 * return till we get something.  In other modes, we don't block the first
	 * time through the loop.
	 */
	if (mode == WFW_GOT_STATUS)
	{
		/* Assert that caller knows what it's doing */
		Assert(!IsEveryWorkerIdle(pstate));
		do_wait = true;
	}

	for (;;)
	{
		/*
		 * Check for status messages, even if we don't need to block.  We do
		 * not try very hard to reap all available messages, though, since
		 * there's unlikely to be more than one.
		 */
		if (ListenToWorkers(AH, pstate, do_wait))
		{
			/*
			 * If we got a message, we are done by definition for GOT_STATUS
			 * mode, and we can also be certain that there's at least one idle
			 * worker.  So we're done in all but ALL_IDLE mode.
			 */
			if (mode != WFW_ALL_IDLE)
				return;
		}

		/* Check whether we must wait for new status messages */
		switch (mode)
		{
			case WFW_NO_WAIT:
				return;			/* never wait */
			case WFW_GOT_STATUS:
				Assert(false);	/* can't get here, because we waited */
				break;
			case WFW_ONE_IDLE:
				if (GetIdleWorker(pstate) != NO_SLOT)
					return;
				break;
			case WFW_ALL_IDLE:
				if (IsEveryWorkerIdle(pstate))
					return;
				break;
		}

		/* Loop back, and this time wait for something to happen */
		do_wait = true;
	}
}

/*
 * Read one command message from the leader, blocking if necessary
 * until one is available, and return it as a malloc'd string.
 * On EOF, return NULL.
 *
 * This function is executed in worker processes.
 */
static char *
getMessageFromLeader(int pipefd[2])
{
	return readMessageFromPipe(pipefd[PIPE_READ]);
}

/*
 * Send a status message to the leader.
 *
 * This function is executed in worker processes.
 */
static void
sendMessageToLeader(int pipefd[2], const char *str)
{
	int			len = strlen(str) + 1;

	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
		pg_fatal("could not write to the communication channel: %m");
}

/*
 * Wait until some descriptor in "workerset" becomes readable.
 * Returns -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	int			i;
	fd_set		saveSet = *workerset;

	for (;;)
	{
		*workerset = saveSet;
		i = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		if (i < 0 && errno == EINTR)
			continue;
#else
		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
#endif
		break;
	}

	return i;
}


/*
 * Check for messages from worker processes.
 *
 * If a message is available, return it as a malloc'd string, and put the
 * index of the sending worker in *worker.
 *
 * If nothing is available, wait if "do_wait" is true, else return NULL.
 *
 * If we detect EOF on any socket, we'll return NULL.  It's not great that
 * that's hard to distinguish from the no-data-available case, but for now
 * our one caller is okay with that.
 *
 * This function is executed in the leader process.
 */
static char *
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
{
	int			i;
	fd_set		workerset;
	int			maxFd = -1;
	struct timeval nowait = {0, 0};

	/* construct bitmap of socket descriptors for select() */
	FD_ZERO(&workerset);
	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
			continue;
		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
		if (pstate->parallelSlot[i].pipeRead > maxFd)
			maxFd = pstate->parallelSlot[i].pipeRead;
	}

	if (do_wait)
	{
		i = select_loop(maxFd, &workerset);
		Assert(i != 0);
	}
	else
	{
		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
			return NULL;
	}

	if (i < 0)
		pg_fatal("%s() failed: %m", "select");

	for (i = 0; i < pstate->numWorkers; i++)
	{
		char	   *msg;

		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
			continue;
		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
			continue;

		/*
		 * Read the message if any.  If the socket is ready because of EOF,
		 * we'll return NULL instead (and the socket will stay ready, so the
		 * condition will persist).
		 *
		 * Note: because this is a blocking read, we'll wait if only part of
		 * the message is available.  Waiting a long time would be bad, but
		 * since worker status messages are short and are always sent in one
		 * operation, it shouldn't be a problem in practice.
		 */
		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
		*worker = i;
		return msg;
	}
	Assert(false);
	return NULL;
}

/*
 * Send a command message to the specified worker process.
 *
 * This function is executed in the leader process.
 */
static void
sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
{
	int			len = strlen(str) + 1;

	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
	{
		pg_fatal("could not write to the communication channel: %m");
	}
}

/*
 * Read one message from the specified pipe (fd), blocking if necessary
 * until one is available, and return it as a malloc'd string.
 * On EOF, return NULL.
 *
 * A "message" on the channel is just a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
	char	   *msg;
	int			msgsize,
				bufsize;
	int			ret;

	/*
	 * In theory, if we let piperead() read multiple bytes, it might give us
	 * back fragments of multiple messages.  (That can't actually occur, since
	 * neither leader nor workers send more than one message without waiting
	 * for a reply, but we don't wish to assume that here.)  For simplicity,
	 * read a byte at a time until we get the terminating '\0'.  This method
	 * is a bit inefficient, but since this is only used for relatively short
	 * command and status strings, it shouldn't matter.
	 */
	bufsize = 64;				/* could be any number */
	msg = (char *) pg_malloc(bufsize);
	msgsize = 0;
	for (;;)
	{
		Assert(msgsize < bufsize);
		ret = piperead(fd, msg + msgsize, 1);
		if (ret <= 0)
			break;				/* error or connection closure */

		Assert(ret == 1);

		if (msg[msgsize] == '\0')
			return msg;			/* collected whole message */

		msgsize++;
		if (msgsize == bufsize) /* enlarge buffer if needed */
		{
			bufsize += 16;		/* could be any number */
			msg = (char *) pg_realloc(msg, bufsize);
		}
	}

	/* Other end has closed the connection */
	pg_free(msg);
	return NULL;
}

#ifdef WIN32

/*
 * This is a replacement version of pipe(2) for Windows which allows the pipe
 * handles to be used in select().
 *
 * Reads and writes on the pipe must go through piperead()/pipewrite().
 *
 * For consistency with Unix we declare the returned handles as "int".
 * This is okay even on WIN64 because system handles are not more than
 * 32 bits wide, but we do have to do some casting.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	s,
				tmp_sock;
	struct sockaddr_in serv_addr;
	int			len = sizeof(serv_addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * setup listen socket
	 */
	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = pg_hton16(0);
	serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	if (listen(s, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}

	/*
	 * setup pipe handles
	 */
	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	handles[1] = (int) tmp_sock;

	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	handles[0] = (int) tmp_sock;

	closesocket(s);
	return 0;
}

#endif							/* WIN32 */