summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_dump/pg_backup_archiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c4824
1 files changed, 4824 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
new file mode 100644
index 0000000..87113cf
--- /dev/null
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -0,0 +1,4824 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_backup_archiver.c
+ *
+ * Private implementation of the archiver routines.
+ *
+ * See the headers to pg_restore for more details.
+ *
+ * Copyright (c) 2000, Philip Warner
+ * Rights are granted to use this software in any way so long
+ * as this notice is not removed.
+ *
+ * The author is not responsible for loss or damages that may
+ * result from its use.
+ *
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/pg_backup_archiver.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <ctype.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <sys/wait.h>
+#ifdef WIN32
+#include <io.h>
+#endif
+
+#include "common/string.h"
+#include "dumputils.h"
+#include "fe_utils/string_utils.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq-fs.h"
+#include "parallel.h"
+#include "pg_backup_archiver.h"
+#include "pg_backup_db.h"
+#include "pg_backup_utils.h"
+
+#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
+#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
+
+/* state needed to save/restore an archive's output target */
+typedef struct _outputContext
+{
+ void *OF;
+ int gzOut;
+} OutputContext;
+
+/*
+ * State for tracking TocEntrys that are ready to process during a parallel
+ * restore. (This used to be a list, and we still call it that, though now
+ * it's really an array so that we can apply qsort to it.)
+ *
+ * tes[] is sized large enough that we can't overrun it.
+ * The valid entries are indexed first_te .. last_te inclusive.
+ * We periodically sort the array to bring larger-by-dataLength entries to
+ * the front; "sorted" is true if the valid entries are known sorted.
+ */
+typedef struct _parallelReadyList
+{
+ TocEntry **tes; /* Ready-to-dump TocEntrys */
+ int first_te; /* index of first valid entry in tes[] */
+ int last_te; /* index of last valid entry in tes[] */
+ bool sorted; /* are valid entries currently sorted? */
+} ParallelReadyList;
+
+
+static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
+ const int compression, bool dosync, ArchiveMode mode,
+ SetupWorkerPtrType setupWorkerPtr);
+static void _getObjectDescription(PQExpBuffer buf, TocEntry *te);
+static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
+static char *sanitize_line(const char *str, bool want_hyphen);
+static void _doSetFixedOutputState(ArchiveHandle *AH);
+static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
+static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
+static void _becomeUser(ArchiveHandle *AH, const char *user);
+static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
+static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
+static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
+static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
+static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
+static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
+static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
+static int _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
+static RestorePass _tocEntryRestorePass(TocEntry *te);
+static bool _tocEntryIsACL(TocEntry *te);
+static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
+static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
+static void buildTocEntryArrays(ArchiveHandle *AH);
+static void _moveBefore(TocEntry *pos, TocEntry *te);
+static int _discoverArchiveFormat(ArchiveHandle *AH);
+
+static int RestoringToDB(ArchiveHandle *AH);
+static void dump_lo_buf(ArchiveHandle *AH);
+static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
+static void SetOutput(ArchiveHandle *AH, const char *filename, int compression);
+static OutputContext SaveOutput(ArchiveHandle *AH);
+static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
+
+static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
+static void restore_toc_entries_prefork(ArchiveHandle *AH,
+ TocEntry *pending_list);
+static void restore_toc_entries_parallel(ArchiveHandle *AH,
+ ParallelState *pstate,
+ TocEntry *pending_list);
+static void restore_toc_entries_postfork(ArchiveHandle *AH,
+ TocEntry *pending_list);
+static void pending_list_header_init(TocEntry *l);
+static void pending_list_append(TocEntry *l, TocEntry *te);
+static void pending_list_remove(TocEntry *te);
+static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
+static void ready_list_free(ParallelReadyList *ready_list);
+static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
+static void ready_list_remove(ParallelReadyList *ready_list, int i);
+static void ready_list_sort(ParallelReadyList *ready_list);
+static int TocEntrySizeCompare(const void *p1, const void *p2);
+static void move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
+ RestorePass pass);
+static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+ ParallelState *pstate);
+static void mark_dump_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data);
+static void mark_restore_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data);
+static void fix_dependencies(ArchiveHandle *AH);
+static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
+static void repoint_table_dependencies(ArchiveHandle *AH);
+static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
+static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ ParallelReadyList *ready_list);
+static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
+static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
+
+static void StrictNamesCheck(RestoreOptions *ropt);
+
+
+/*
+ * Allocate a new DumpOptions block containing all default values.
+ */
+DumpOptions *
+NewDumpOptions(void)
+{
+ DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
+
+ InitDumpOptions(opts);
+ return opts;
+}
+
+/*
+ * Initialize a DumpOptions struct to all default values
+ */
+void
+InitDumpOptions(DumpOptions *opts)
+{
+ memset(opts, 0, sizeof(DumpOptions));
+ /* set any fields that shouldn't default to zeroes */
+ opts->include_everything = true;
+ opts->cparams.promptPassword = TRI_DEFAULT;
+ opts->dumpSections = DUMP_UNSECTIONED;
+}
+
+/*
+ * Create a freshly allocated DumpOptions with options equivalent to those
+ * found in the given RestoreOptions.
+ */
+DumpOptions *
+dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
+{
+ DumpOptions *dopt = NewDumpOptions();
+
+ /* this is the inverse of what's at the end of pg_dump.c's main() */
+ dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
+ dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
+ dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
+ dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
+ dopt->cparams.promptPassword = ropt->cparams.promptPassword;
+ dopt->outputClean = ropt->dropSchema;
+ dopt->dataOnly = ropt->dataOnly;
+ dopt->schemaOnly = ropt->schemaOnly;
+ dopt->if_exists = ropt->if_exists;
+ dopt->column_inserts = ropt->column_inserts;
+ dopt->dumpSections = ropt->dumpSections;
+ dopt->aclsSkip = ropt->aclsSkip;
+ dopt->outputSuperuser = ropt->superuser;
+ dopt->outputCreateDB = ropt->createDB;
+ dopt->outputNoOwner = ropt->noOwner;
+ dopt->outputNoTablespaces = ropt->noTablespace;
+ dopt->disable_triggers = ropt->disable_triggers;
+ dopt->use_setsessauth = ropt->use_setsessauth;
+ dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
+ dopt->dump_inserts = ropt->dump_inserts;
+ dopt->no_comments = ropt->no_comments;
+ dopt->no_publications = ropt->no_publications;
+ dopt->no_security_labels = ropt->no_security_labels;
+ dopt->no_subscriptions = ropt->no_subscriptions;
+ dopt->lockWaitTimeout = ropt->lockWaitTimeout;
+ dopt->include_everything = ropt->include_everything;
+ dopt->enable_row_security = ropt->enable_row_security;
+ dopt->sequence_data = ropt->sequence_data;
+
+ return dopt;
+}
+
+
+/*
+ * Wrapper functions.
+ *
+ * The objective is to make writing new formats and dumpers as simple
+ * as possible, if necessary at the expense of extra function calls etc.
+ *
+ */
+
+/*
+ * The dump worker setup needs lots of knowledge of the internals of pg_dump,
+ * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
+ * setup doesn't need to know anything much, so it's defined here.
+ */
+static void
+setupRestoreWorker(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ AH->ReopenPtr(AH);
+}
+
+
+/* Create a new archive */
+/* Public */
+Archive *
+CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
+ const int compression, bool dosync, ArchiveMode mode,
+ SetupWorkerPtrType setupDumpWorker)
+
+{
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression, dosync,
+ mode, setupDumpWorker);
+
+ return (Archive *) AH;
+}
+
+/* Open an existing archive */
+/* Public */
+Archive *
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+{
+ ArchiveHandle *AH = _allocAH(FileSpec, fmt, 0, true, archModeRead, setupRestoreWorker);
+
+ return (Archive *) AH;
+}
+
+/* Public */
+void
+CloseArchive(Archive *AHX)
+{
+ int res = 0;
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ AH->ClosePtr(AH);
+
+ /* Close the output */
+ errno = 0; /* in case gzclose() doesn't set it */
+ if (AH->gzOut)
+ res = GZCLOSE(AH->OF);
+ else if (AH->OF != stdout)
+ res = fclose(AH->OF);
+
+ if (res != 0)
+ fatal("could not close output file: %m");
+}
+
+/* Public */
+void
+SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
+{
+ /* Caller can omit dump options, in which case we synthesize them */
+ if (dopt == NULL && ropt != NULL)
+ dopt = dumpOptionsFromRestoreOptions(ropt);
+
+ /* Save options for later access */
+ AH->dopt = dopt;
+ AH->ropt = ropt;
+}
+
+/* Public */
+void
+ProcessArchiveRestoreOptions(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ RestoreOptions *ropt = AH->public.ropt;
+ TocEntry *te;
+ teSection curSection;
+
+ /* Decide which TOC entries will be dumped/restored, and mark them */
+ curSection = SECTION_PRE_DATA;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /*
+ * When writing an archive, we also take this opportunity to check
+ * that we have generated the entries in a sane order that respects
+ * the section divisions. When reading, don't complain, since buggy
+ * old versions of pg_dump might generate out-of-order archives.
+ */
+ if (AH->mode != archModeRead)
+ {
+ switch (te->section)
+ {
+ case SECTION_NONE:
+ /* ok to be anywhere */
+ break;
+ case SECTION_PRE_DATA:
+ if (curSection != SECTION_PRE_DATA)
+ pg_log_warning("archive items not in correct section order");
+ break;
+ case SECTION_DATA:
+ if (curSection == SECTION_POST_DATA)
+ pg_log_warning("archive items not in correct section order");
+ break;
+ case SECTION_POST_DATA:
+ /* ok no matter which section we were in */
+ break;
+ default:
+ fatal("unexpected section code %d",
+ (int) te->section);
+ break;
+ }
+ }
+
+ if (te->section != SECTION_NONE)
+ curSection = te->section;
+
+ te->reqs = _tocEntryRequired(te, curSection, AH);
+ }
+
+ /* Enforce strict names checking */
+ if (ropt->strict_names)
+ StrictNamesCheck(ropt);
+}
+
+/* Public */
+void
+RestoreArchive(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ RestoreOptions *ropt = AH->public.ropt;
+ bool parallel_mode;
+ TocEntry *te;
+ OutputContext sav;
+
+ AH->stage = STAGE_INITIALIZING;
+
+ /*
+ * If we're going to do parallel restore, there are some restrictions.
+ */
+ parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
+ if (parallel_mode)
+ {
+ /* We haven't got round to making this work for all archive formats */
+ if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
+ fatal("parallel restore is not supported with this archive file format");
+
+ /* Doesn't work if the archive represents dependencies as OIDs */
+ if (AH->version < K_VERS_1_8)
+ fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
+
+ /*
+ * It's also not gonna work if we can't reopen the input file, so
+ * let's try that immediately.
+ */
+ AH->ReopenPtr(AH);
+ }
+
+ /*
+ * Make sure we won't need (de)compression we haven't got
+ */
+#ifndef HAVE_LIBZ
+ if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
+ fatal("cannot restore from compressed archive (compression not supported in this installation)");
+ }
+ }
+#endif
+
+ /*
+ * Prepare index arrays, so we can assume we have them throughout restore.
+ * It's possible we already did this, though.
+ */
+ if (AH->tocsByDumpId == NULL)
+ buildTocEntryArrays(AH);
+
+ /*
+ * If we're using a DB connection, then connect it.
+ */
+ if (ropt->useDB)
+ {
+ pg_log_info("connecting to database for restore");
+ if (AH->version < K_VERS_1_3)
+ fatal("direct database connections are not supported in pre-1.3 archives");
+
+ /*
+ * We don't want to guess at whether the dump will successfully
+ * restore; allow the attempt regardless of the version of the restore
+ * target.
+ */
+ AHX->minRemoteVersion = 0;
+ AHX->maxRemoteVersion = 9999999;
+
+ ConnectDatabase(AHX, &ropt->cparams, false);
+
+ /*
+ * If we're talking to the DB directly, don't send comments since they
+ * obscure SQL when displaying errors
+ */
+ AH->noTocComments = 1;
+ }
+
+ /*
+ * Work out if we have an implied data-only restore. This can happen if
+ * the dump was data only or if the user has used a toc list to exclude
+ * all of the schema data. All we do is look for schema entries - if none
+ * are found then we set the dataOnly flag.
+ *
+ * We could scan for wanted TABLE entries, but that is not the same as
+ * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
+ */
+ if (!ropt->dataOnly)
+ {
+ int impliedDataOnly = 1;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & REQ_SCHEMA) != 0)
+ { /* It's schema, and it's wanted */
+ impliedDataOnly = 0;
+ break;
+ }
+ }
+ if (impliedDataOnly)
+ {
+ ropt->dataOnly = impliedDataOnly;
+ pg_log_info("implied data-only restore");
+ }
+ }
+
+ /*
+ * Setup the output file if necessary.
+ */
+ sav = SaveOutput(AH);
+ if (ropt->filename || ropt->compression)
+ SetOutput(AH, ropt->filename, ropt->compression);
+
+ ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
+
+ if (AH->archiveRemoteVersion)
+ ahprintf(AH, "-- Dumped from database version %s\n",
+ AH->archiveRemoteVersion);
+ if (AH->archiveDumpVersion)
+ ahprintf(AH, "-- Dumped by pg_dump version %s\n",
+ AH->archiveDumpVersion);
+
+ ahprintf(AH, "\n");
+
+ if (AH->public.verbose)
+ dumpTimestamp(AH, "Started on", AH->createDate);
+
+ if (ropt->single_txn)
+ {
+ if (AH->connection)
+ StartTransaction(AHX);
+ else
+ ahprintf(AH, "BEGIN;\n\n");
+ }
+
+ /*
+ * Establish important parameter values right away.
+ */
+ _doSetFixedOutputState(AH);
+
+ AH->stage = STAGE_PROCESSING;
+
+ /*
+ * Drop the items at the start, in reverse order
+ */
+ if (ropt->dropSchema)
+ {
+ for (te = AH->toc->prev; te != AH->toc; te = te->prev)
+ {
+ AH->currentTE = te;
+
+ /*
+ * In createDB mode, issue a DROP *only* for the database as a
+ * whole. Issuing drops against anything else would be wrong,
+ * because at this point we're connected to the wrong database.
+ * (The DATABASE PROPERTIES entry, if any, should be treated like
+ * the DATABASE entry.)
+ */
+ if (ropt->createDB)
+ {
+ if (strcmp(te->desc, "DATABASE") != 0 &&
+ strcmp(te->desc, "DATABASE PROPERTIES") != 0)
+ continue;
+ }
+
+ /* Otherwise, drop anything that's selected and has a dropStmt */
+ if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
+ {
+ pg_log_info("dropping %s %s", te->desc, te->tag);
+ /* Select owner and schema as necessary */
+ _becomeOwner(AH, te);
+ _selectOutputSchema(AH, te->namespace);
+
+ /*
+ * Now emit the DROP command, if the object has one. Note we
+ * don't necessarily emit it verbatim; at this point we add an
+ * appropriate IF EXISTS clause, if the user requested it.
+ */
+ if (*te->dropStmt != '\0')
+ {
+ if (!ropt->if_exists)
+ {
+ /* No --if-exists? Then just use the original */
+ ahprintf(AH, "%s", te->dropStmt);
+ }
+ else
+ {
+ /*
+ * Inject an appropriate spelling of "if exists". For
+ * large objects, we have a separate routine that
+ * knows how to do it, without depending on
+ * te->dropStmt; use that. For other objects we need
+ * to parse the command.
+ */
+ if (strncmp(te->desc, "BLOB", 4) == 0)
+ {
+ DropBlobIfExists(AH, te->catalogId.oid);
+ }
+ else
+ {
+ char *dropStmt = pg_strdup(te->dropStmt);
+ char *dropStmtOrig = dropStmt;
+ PQExpBuffer ftStmt = createPQExpBuffer();
+
+ /*
+ * Need to inject IF EXISTS clause after ALTER
+ * TABLE part in ALTER TABLE .. DROP statement
+ */
+ if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
+ {
+ appendPQExpBufferStr(ftStmt,
+ "ALTER TABLE IF EXISTS");
+ dropStmt = dropStmt + 11;
+ }
+
+ /*
+ * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
+ * not support the IF EXISTS clause, and therefore
+ * we simply emit the original command for DEFAULT
+ * objects (modulo the adjustment made above).
+ *
+ * Likewise, don't mess with DATABASE PROPERTIES.
+ *
+ * If we used CREATE OR REPLACE VIEW as a means of
+ * quasi-dropping an ON SELECT rule, that should
+ * be emitted unchanged as well.
+ *
+ * For other object types, we need to extract the
+ * first part of the DROP which includes the
+ * object type. Most of the time this matches
+ * te->desc, so search for that; however for the
+ * different kinds of CONSTRAINTs, we know to
+ * search for hardcoded "DROP CONSTRAINT" instead.
+ */
+ if (strcmp(te->desc, "DEFAULT") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
+ strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
+ appendPQExpBufferStr(ftStmt, dropStmt);
+ else
+ {
+ char buffer[40];
+ char *mark;
+
+ if (strcmp(te->desc, "CONSTRAINT") == 0 ||
+ strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
+ strcmp(te->desc, "FK CONSTRAINT") == 0)
+ strcpy(buffer, "DROP CONSTRAINT");
+ else
+ snprintf(buffer, sizeof(buffer), "DROP %s",
+ te->desc);
+
+ mark = strstr(dropStmt, buffer);
+
+ if (mark)
+ {
+ *mark = '\0';
+ appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
+ dropStmt, buffer,
+ mark + strlen(buffer));
+ }
+ else
+ {
+ /* complain and emit unmodified command */
+ pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
+ dropStmtOrig);
+ appendPQExpBufferStr(ftStmt, dropStmt);
+ }
+ }
+
+ ahprintf(AH, "%s", ftStmt->data);
+
+ destroyPQExpBuffer(ftStmt);
+ pg_free(dropStmtOrig);
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * _selectOutputSchema may have set currSchema to reflect the effect
+ * of a "SET search_path" command it emitted. However, by now we may
+ * have dropped that schema; or it might not have existed in the first
+ * place. In either case the effective value of search_path will not
+ * be what we think. Forcibly reset currSchema so that we will
+ * re-establish the search_path setting when needed (after creating
+ * the schema).
+ *
+ * If we treated users as pg_dump'able objects then we'd need to reset
+ * currUser here too.
+ */
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = NULL;
+ }
+
+ if (parallel_mode)
+ {
+ /*
+ * In parallel mode, turn control over to the parallel-restore logic.
+ */
+ ParallelState *pstate;
+ TocEntry pending_list;
+
+ /* The archive format module may need some setup for this */
+ if (AH->PrepParallelRestorePtr)
+ AH->PrepParallelRestorePtr(AH);
+
+ pending_list_header_init(&pending_list);
+
+ /* This runs PRE_DATA items and then disconnects from the database */
+ restore_toc_entries_prefork(AH, &pending_list);
+ Assert(AH->connection == NULL);
+
+ /* ParallelBackupStart() will actually fork the processes */
+ pstate = ParallelBackupStart(AH);
+ restore_toc_entries_parallel(AH, pstate, &pending_list);
+ ParallelBackupEnd(AH, pstate);
+
+ /* reconnect the leader and see if we missed something */
+ restore_toc_entries_postfork(AH, &pending_list);
+ Assert(AH->connection != NULL);
+ }
+ else
+ {
+ /*
+ * In serial mode, process everything in three phases: normal items,
+ * then ACLs, then post-ACL items. We might be able to skip one or
+ * both extra phases in some cases, eg data-only restores.
+ */
+ bool haveACL = false;
+ bool havePostACL = false;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
+ continue; /* ignore if not to be dumped at all */
+
+ switch (_tocEntryRestorePass(te))
+ {
+ case RESTORE_PASS_MAIN:
+ (void) restore_toc_entry(AH, te, false);
+ break;
+ case RESTORE_PASS_ACL:
+ haveACL = true;
+ break;
+ case RESTORE_PASS_POST_ACL:
+ havePostACL = true;
+ break;
+ }
+ }
+
+ if (haveACL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
+ _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
+ (void) restore_toc_entry(AH, te, false);
+ }
+ }
+
+ if (havePostACL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
+ _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
+ (void) restore_toc_entry(AH, te, false);
+ }
+ }
+ }
+
+ if (ropt->single_txn)
+ {
+ if (AH->connection)
+ CommitTransaction(AHX);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+
+ if (AH->public.verbose)
+ dumpTimestamp(AH, "Completed on", time(NULL));
+
+ ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
+
+ /*
+ * Clean up & we're done.
+ */
+ AH->stage = STAGE_FINALIZING;
+
+ if (ropt->filename || ropt->compression)
+ RestoreOutput(AH, sav);
+
+ if (ropt->useDB)
+ DisconnectDatabase(&AH->public);
+}
+
+/*
+ * Restore a single TOC item. Used in both parallel and non-parallel restore;
+ * is_parallel is true if we are in a worker child process.
+ *
+ * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
+ * the parallel parent has to make the corresponding status update.
+ */
+static int
+restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+ int status = WORKER_OK;
+ int reqs;
+ bool defnDumped;
+
+ AH->currentTE = te;
+
+ /* Dump any relevant dump warnings to stderr */
+ if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+ {
+ if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+ pg_log_warning("warning from original dump file: %s", te->defn);
+ else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+ pg_log_warning("warning from original dump file: %s", te->copyStmt);
+ }
+
+ /* Work out what, if anything, we want from this entry */
+ reqs = te->reqs;
+
+ defnDumped = false;
+
+ /*
+ * If it has a schema component that we want, then process that
+ */
+ if ((reqs & REQ_SCHEMA) != 0)
+ {
+ /* Show namespace in log message if available */
+ if (te->namespace)
+ pg_log_info("creating %s \"%s.%s\"",
+ te->desc, te->namespace, te->tag);
+ else
+ pg_log_info("creating %s \"%s\"",
+ te->desc, te->tag);
+
+ _printTocEntry(AH, te, false);
+ defnDumped = true;
+
+ if (strcmp(te->desc, "TABLE") == 0)
+ {
+ if (AH->lastErrorTE == te)
+ {
+ /*
+ * We failed to create the table. If
+ * --no-data-for-failed-tables was given, mark the
+ * corresponding TABLE DATA to be ignored.
+ *
+ * In the parallel case this must be done in the parent, so we
+ * just set the return value.
+ */
+ if (ropt->noDataForFailedTables)
+ {
+ if (is_parallel)
+ status = WORKER_INHIBIT_DATA;
+ else
+ inhibit_data_for_failed_table(AH, te);
+ }
+ }
+ else
+ {
+ /*
+ * We created the table successfully. Mark the corresponding
+ * TABLE DATA for possible truncation.
+ *
+ * In the parallel case this must be done in the parent, so we
+ * just set the return value.
+ */
+ if (is_parallel)
+ status = WORKER_CREATE_DONE;
+ else
+ mark_create_done(AH, te);
+ }
+ }
+
+ /*
+ * If we created a DB, connect to it. Also, if we changed DB
+ * properties, reconnect to ensure that relevant GUC settings are
+ * applied to our session.
+ */
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ pg_log_info("connecting to new database \"%s\"", te->tag);
+ _reconnectToDB(AH, te->tag);
+ }
+ }
+
+ /*
+ * If it has a data component that we want, then process that
+ */
+ if ((reqs & REQ_DATA) != 0)
+ {
+ /*
+ * hadDumper will be set if there is genuine data component for this
+ * node. Otherwise, we need to check the defn field for statements
+ * that need to be executed in data-only restores.
+ */
+ if (te->hadDumper)
+ {
+ /*
+ * If we can output the data, then restore it.
+ */
+ if (AH->PrintTocDataPtr != NULL)
+ {
+ _printTocEntry(AH, te, true);
+
+ if (strcmp(te->desc, "BLOBS") == 0 ||
+ strcmp(te->desc, "BLOB COMMENTS") == 0)
+ {
+ pg_log_info("processing %s", te->desc);
+
+ _selectOutputSchema(AH, "pg_catalog");
+
+ /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
+ if (strcmp(te->desc, "BLOB COMMENTS") == 0)
+ AH->outputKind = OUTPUT_OTHERDATA;
+
+ AH->PrintTocDataPtr(AH, te);
+
+ AH->outputKind = OUTPUT_SQLCMDS;
+ }
+ else
+ {
+ _disableTriggersIfNecessary(AH, te);
+
+ /* Select owner and schema as necessary */
+ _becomeOwner(AH, te);
+ _selectOutputSchema(AH, te->namespace);
+
+ pg_log_info("processing data for table \"%s.%s\"",
+ te->namespace, te->tag);
+
+ /*
+ * In parallel restore, if we created the table earlier in
+ * the run then we wrap the COPY in a transaction and
+ * precede it with a TRUNCATE. If archiving is not on
+ * this prevents WAL-logging the COPY. This obtains a
+ * speedup similar to that from using single_txn mode in
+ * non-parallel restores.
+ */
+ if (is_parallel && te->created)
+ {
+ /*
+ * Parallel restore is always talking directly to a
+ * server, so no need to see if we should issue BEGIN.
+ */
+ StartTransaction(&AH->public);
+
+ /*
+ * If the server version is >= 8.4, make sure we issue
+ * TRUNCATE with ONLY so that child tables are not
+ * wiped.
+ */
+ ahprintf(AH, "TRUNCATE TABLE %s%s;\n\n",
+ (PQserverVersion(AH->connection) >= 80400 ?
+ "ONLY " : ""),
+ fmtQualifiedId(te->namespace, te->tag));
+ }
+
+ /*
+ * If we have a copy statement, use it.
+ */
+ if (te->copyStmt && strlen(te->copyStmt) > 0)
+ {
+ ahprintf(AH, "%s", te->copyStmt);
+ AH->outputKind = OUTPUT_COPYDATA;
+ }
+ else
+ AH->outputKind = OUTPUT_OTHERDATA;
+
+ AH->PrintTocDataPtr(AH, te);
+
+ /*
+ * Terminate COPY if needed.
+ */
+ if (AH->outputKind == OUTPUT_COPYDATA &&
+ RestoringToDB(AH))
+ EndDBCopyMode(&AH->public, te->tag);
+ AH->outputKind = OUTPUT_SQLCMDS;
+
+ /* close out the transaction started above */
+ if (is_parallel && te->created)
+ CommitTransaction(&AH->public);
+
+ _enableTriggersIfNecessary(AH, te);
+ }
+ }
+ }
+ else if (!defnDumped)
+ {
+ /* If we haven't already dumped the defn part, do so now */
+ pg_log_info("executing %s %s", te->desc, te->tag);
+ _printTocEntry(AH, te, false);
+ }
+ }
+
+ if (AH->public.n_errors > 0 && status == WORKER_OK)
+ status = WORKER_IGNORED_ERRORS;
+
+ return status;
+}
+
+/*
+ * Allocate a new RestoreOptions block.
+ * This is mainly so we can initialize it, but also for future expansion,
+ */
+RestoreOptions *
+NewRestoreOptions(void)
+{
+ RestoreOptions *opts;
+
+ opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
+
+ /* set any fields that shouldn't default to zeroes */
+ opts->format = archUnknown;
+ opts->cparams.promptPassword = TRI_DEFAULT;
+ opts->dumpSections = DUMP_UNSECTIONED;
+
+ return opts;
+}
+
+static void
+_disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /* This hack is only needed in a data-only restore */
+ if (!ropt->dataOnly || !ropt->disable_triggers)
+ return;
+
+ pg_log_info("disabling triggers for %s", te->tag);
+
+ /*
+ * Become superuser if possible, since they are the only ones who can
+ * disable constraint triggers. If -S was not given, assume the initial
+ * user identity is a superuser. (XXX would it be better to become the
+ * table owner?)
+ */
+ _becomeUser(AH, ropt->superuser);
+
+ /*
+ * Disable them.
+ */
+ ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
+ fmtQualifiedId(te->namespace, te->tag));
+}
+
+static void
+_enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /* This hack is only needed in a data-only restore */
+ if (!ropt->dataOnly || !ropt->disable_triggers)
+ return;
+
+ pg_log_info("enabling triggers for %s", te->tag);
+
+ /*
+ * Become superuser if possible, since they are the only ones who can
+ * disable constraint triggers. If -S was not given, assume the initial
+ * user identity is a superuser. (XXX would it be better to become the
+ * table owner?)
+ */
+ _becomeUser(AH, ropt->superuser);
+
+ /*
+ * Enable them.
+ */
+ ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
+ fmtQualifiedId(te->namespace, te->tag));
+}
+
+/*
+ * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
+ */
+
+/* Public */
+void
+WriteData(Archive *AHX, const void *data, size_t dLen)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (!AH->currToc)
+ fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
+
+ AH->WriteDataPtr(AH, data, dLen);
+}
+
+/*
+ * Create a new TOC entry. The TOC was designed as a TOC, but is now the
+ * repository for all metadata. But the name has stuck.
+ *
+ * The new entry is added to the Archive's TOC list. Most callers can ignore
+ * the result value because nothing else need be done, but a few want to
+ * manipulate the TOC entry further.
+ */
+
+/* Public */
+TocEntry *
+ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
+ ArchiveOpts *opts)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ TocEntry *newToc;
+
+ newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
+
+ AH->tocCount++;
+ if (dumpId > AH->maxDumpId)
+ AH->maxDumpId = dumpId;
+
+ newToc->prev = AH->toc->prev;
+ newToc->next = AH->toc;
+ AH->toc->prev->next = newToc;
+ AH->toc->prev = newToc;
+
+ newToc->catalogId = catalogId;
+ newToc->dumpId = dumpId;
+ newToc->section = opts->section;
+
+ newToc->tag = pg_strdup(opts->tag);
+ newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
+ newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
+ newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
+ newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
+ newToc->desc = pg_strdup(opts->description);
+ newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
+ newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
+ newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
+
+ if (opts->nDeps > 0)
+ {
+ newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
+ memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
+ newToc->nDeps = opts->nDeps;
+ }
+ else
+ {
+ newToc->dependencies = NULL;
+ newToc->nDeps = 0;
+ }
+
+ newToc->dataDumper = opts->dumpFn;
+ newToc->dataDumperArg = opts->dumpArg;
+ newToc->hadDumper = opts->dumpFn ? true : false;
+
+ newToc->formatData = NULL;
+ newToc->dataLength = 0;
+
+ if (AH->ArchiveEntryPtr != NULL)
+ AH->ArchiveEntryPtr(AH, newToc);
+
+ return newToc;
+}
+
+/* Public */
+void
+PrintTOCSummary(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ RestoreOptions *ropt = AH->public.ropt;
+ TocEntry *te;
+ teSection curSection;
+ OutputContext sav;
+ const char *fmtName;
+ char stamp_str[64];
+
+ sav = SaveOutput(AH);
+ if (ropt->filename)
+ SetOutput(AH, ropt->filename, 0 /* no compression */ );
+
+ if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
+ localtime(&AH->createDate)) == 0)
+ strcpy(stamp_str, "[unknown]");
+
+ ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
+ ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %d\n",
+ sanitize_line(AH->archdbname, false),
+ AH->tocCount, AH->compression);
+
+ switch (AH->format)
+ {
+ case archCustom:
+ fmtName = "CUSTOM";
+ break;
+ case archDirectory:
+ fmtName = "DIRECTORY";
+ break;
+ case archTar:
+ fmtName = "TAR";
+ break;
+ default:
+ fmtName = "UNKNOWN";
+ }
+
+ ahprintf(AH, "; Dump Version: %d.%d-%d\n",
+ ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
+ ahprintf(AH, "; Format: %s\n", fmtName);
+ ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
+ ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
+ if (AH->archiveRemoteVersion)
+ ahprintf(AH, "; Dumped from database version: %s\n",
+ AH->archiveRemoteVersion);
+ if (AH->archiveDumpVersion)
+ ahprintf(AH, "; Dumped by pg_dump version: %s\n",
+ AH->archiveDumpVersion);
+
+ ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
+
+ curSection = SECTION_PRE_DATA;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->section != SECTION_NONE)
+ curSection = te->section;
+ if (ropt->verbose ||
+ (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
+ {
+ char *sanitized_name;
+ char *sanitized_schema;
+ char *sanitized_owner;
+
+ /*
+ */
+ sanitized_name = sanitize_line(te->tag, false);
+ sanitized_schema = sanitize_line(te->namespace, true);
+ sanitized_owner = sanitize_line(te->owner, false);
+
+ ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
+ te->catalogId.tableoid, te->catalogId.oid,
+ te->desc, sanitized_schema, sanitized_name,
+ sanitized_owner);
+
+ free(sanitized_name);
+ free(sanitized_schema);
+ free(sanitized_owner);
+ }
+ if (ropt->verbose && te->nDeps > 0)
+ {
+ int i;
+
+ ahprintf(AH, ";\tdepends on:");
+ for (i = 0; i < te->nDeps; i++)
+ ahprintf(AH, " %d", te->dependencies[i]);
+ ahprintf(AH, "\n");
+ }
+ }
+
+ /* Enforce strict names checking */
+ if (ropt->strict_names)
+ StrictNamesCheck(ropt);
+
+ if (ropt->filename)
+ RestoreOutput(AH, sav);
+}
+
+/***********
+ * BLOB Archival
+ ***********/
+
+/* Called by a dumper to signal start of a BLOB */
+int
+StartBlob(Archive *AHX, Oid oid)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (!AH->StartBlobPtr)
+ fatal("large-object output not supported in chosen format");
+
+ AH->StartBlobPtr(AH, AH->currToc, oid);
+
+ return 1;
+}
+
+/* Called by a dumper to signal end of a BLOB */
+int
+EndBlob(Archive *AHX, Oid oid)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (AH->EndBlobPtr)
+ AH->EndBlobPtr(AH, AH->currToc, oid);
+
+ return 1;
+}
+
+/**********
+ * BLOB Restoration
+ **********/
+
+/*
+ * Called by a format handler before any blobs are restored
+ */
+void
+StartRestoreBlobs(ArchiveHandle *AH)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ if (!ropt->single_txn)
+ {
+ if (AH->connection)
+ StartTransaction(&AH->public);
+ else
+ ahprintf(AH, "BEGIN;\n\n");
+ }
+
+ AH->blobCount = 0;
+}
+
+/*
+ * Called by a format handler after all blobs are restored
+ */
+void
+EndRestoreBlobs(ArchiveHandle *AH)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ if (!ropt->single_txn)
+ {
+ if (AH->connection)
+ CommitTransaction(&AH->public);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+
+ pg_log_info(ngettext("restored %d large object",
+ "restored %d large objects",
+ AH->blobCount),
+ AH->blobCount);
+}
+
+
+/*
+ * Called by a format handler to initiate restoration of a blob
+ */
+void
+StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
+{
+ bool old_blob_style = (AH->version < K_VERS_1_12);
+ Oid loOid;
+
+ AH->blobCount++;
+
+ /* Initialize the LO Buffer */
+ AH->lo_buf_used = 0;
+
+ pg_log_info("restoring large object with OID %u", oid);
+
+ /* With an old archive we must do drop and create logic here */
+ if (old_blob_style && drop)
+ DropBlobIfExists(AH, oid);
+
+ if (AH->connection)
+ {
+ if (old_blob_style)
+ {
+ loOid = lo_create(AH->connection, oid);
+ if (loOid == 0 || loOid != oid)
+ fatal("could not create large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
+ }
+ AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
+ if (AH->loFd == -1)
+ fatal("could not open large object %u: %s",
+ oid, PQerrorMessage(AH->connection));
+ }
+ else
+ {
+ if (old_blob_style)
+ ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+ oid, INV_WRITE);
+ else
+ ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+ oid, INV_WRITE);
+ }
+
+ AH->writingBlob = 1;
+}
+
+void
+EndRestoreBlob(ArchiveHandle *AH, Oid oid)
+{
+ if (AH->lo_buf_used > 0)
+ {
+ /* Write remaining bytes from the LO buffer */
+ dump_lo_buf(AH);
+ }
+
+ AH->writingBlob = 0;
+
+ if (AH->connection)
+ {
+ lo_close(AH->connection, AH->loFd);
+ AH->loFd = -1;
+ }
+ else
+ {
+ ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
+ }
+}
+
+/***********
+ * Sorting and Reordering
+ ***********/
+
+void
+SortTocFromFile(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ RestoreOptions *ropt = AH->public.ropt;
+ FILE *fh;
+ StringInfoData linebuf;
+
+ /* Allocate space for the 'wanted' array, and init it */
+ ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
+
+ /* Setup the file */
+ fh = fopen(ropt->tocFile, PG_BINARY_R);
+ if (!fh)
+ fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
+
+ initStringInfo(&linebuf);
+
+ while (pg_get_line_buf(fh, &linebuf))
+ {
+ char *cmnt;
+ char *endptr;
+ DumpId id;
+ TocEntry *te;
+
+ /* Truncate line at comment, if any */
+ cmnt = strchr(linebuf.data, ';');
+ if (cmnt != NULL)
+ {
+ cmnt[0] = '\0';
+ linebuf.len = cmnt - linebuf.data;
+ }
+
+ /* Ignore if all blank */
+ if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
+ continue;
+
+ /* Get an ID, check it's valid and not already seen */
+ id = strtol(linebuf.data, &endptr, 10);
+ if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
+ ropt->idWanted[id - 1])
+ {
+ pg_log_warning("line ignored: %s", linebuf.data);
+ continue;
+ }
+
+ /* Find TOC entry */
+ te = getTocEntryByDumpId(AH, id);
+ if (!te)
+ fatal("could not find entry for ID %d",
+ id);
+
+ /* Mark it wanted */
+ ropt->idWanted[id - 1] = true;
+
+ /*
+ * Move each item to the end of the list as it is selected, so that
+ * they are placed in the desired order. Any unwanted items will end
+ * up at the front of the list, which may seem unintuitive but it's
+ * what we need. In an ordinary serial restore that makes no
+ * difference, but in a parallel restore we need to mark unrestored
+ * items' dependencies as satisfied before we start examining
+ * restorable items. Otherwise they could have surprising
+ * side-effects on the order in which restorable items actually get
+ * restored.
+ */
+ _moveBefore(AH->toc, te);
+ }
+
+ pg_free(linebuf.data);
+
+ if (fclose(fh) != 0)
+ fatal("could not close TOC file: %m");
+}
+
+/**********************
+ * Convenience functions that look like standard IO functions
+ * for writing data when in dump mode.
+ **********************/
+
+/* Public */
+void
+archputs(const char *s, Archive *AH)
+{
+ WriteData(AH, s, strlen(s));
+}
+
+/* Public */
+int
+archprintf(Archive *AH, const char *fmt,...)
+{
+ int save_errno = errno;
+ char *p;
+ size_t len = 128; /* initial assumption about buffer size */
+ size_t cnt;
+
+ for (;;)
+ {
+ va_list args;
+
+ /* Allocate work buffer. */
+ p = (char *) pg_malloc(len);
+
+ /* Try to format the data. */
+ errno = save_errno;
+ va_start(args, fmt);
+ cnt = pvsnprintf(p, len, fmt, args);
+ va_end(args);
+
+ if (cnt < len)
+ break; /* success */
+
+ /* Release buffer and loop around to try again with larger len. */
+ free(p);
+ len = cnt;
+ }
+
+ WriteData(AH, p, cnt);
+ free(p);
+ return (int) cnt;
+}
+
+
+/*******************************
+ * Stuff below here should be 'private' to the archiver routines
+ *******************************/
+
+static void
+SetOutput(ArchiveHandle *AH, const char *filename, int compression)
+{
+ int fn;
+
+ if (filename)
+ {
+ if (strcmp(filename, "-") == 0)
+ fn = fileno(stdout);
+ else
+ fn = -1;
+ }
+ else if (AH->FH)
+ fn = fileno(AH->FH);
+ else if (AH->fSpec)
+ {
+ fn = -1;
+ filename = AH->fSpec;
+ }
+ else
+ fn = fileno(stdout);
+
+ /* If compression explicitly requested, use gzopen */
+#ifdef HAVE_LIBZ
+ if (compression != 0)
+ {
+ char fmode[14];
+
+ /* Don't use PG_BINARY_x since this is zlib */
+ sprintf(fmode, "wb%d", compression);
+ if (fn >= 0)
+ AH->OF = gzdopen(dup(fn), fmode);
+ else
+ AH->OF = gzopen(filename, fmode);
+ AH->gzOut = 1;
+ }
+ else
+#endif
+ { /* Use fopen */
+ if (AH->mode == archModeAppend)
+ {
+ if (fn >= 0)
+ AH->OF = fdopen(dup(fn), PG_BINARY_A);
+ else
+ AH->OF = fopen(filename, PG_BINARY_A);
+ }
+ else
+ {
+ if (fn >= 0)
+ AH->OF = fdopen(dup(fn), PG_BINARY_W);
+ else
+ AH->OF = fopen(filename, PG_BINARY_W);
+ }
+ AH->gzOut = 0;
+ }
+
+ if (!AH->OF)
+ {
+ if (filename)
+ fatal("could not open output file \"%s\": %m", filename);
+ else
+ fatal("could not open output file: %m");
+ }
+}
+
+static OutputContext
+SaveOutput(ArchiveHandle *AH)
+{
+ OutputContext sav;
+
+ sav.OF = AH->OF;
+ sav.gzOut = AH->gzOut;
+
+ return sav;
+}
+
+static void
+RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
+{
+ int res;
+
+ errno = 0; /* in case gzclose() doesn't set it */
+ if (AH->gzOut)
+ res = GZCLOSE(AH->OF);
+ else
+ res = fclose(AH->OF);
+
+ if (res != 0)
+ fatal("could not close output file: %m");
+
+ AH->gzOut = savedContext.gzOut;
+ AH->OF = savedContext.OF;
+}
+
+
+
+/*
+ * Print formatted text to the output file (usually stdout).
+ */
+int
+ahprintf(ArchiveHandle *AH, const char *fmt,...)
+{
+ int save_errno = errno;
+ char *p;
+ size_t len = 128; /* initial assumption about buffer size */
+ size_t cnt;
+
+ for (;;)
+ {
+ va_list args;
+
+ /* Allocate work buffer. */
+ p = (char *) pg_malloc(len);
+
+ /* Try to format the data. */
+ errno = save_errno;
+ va_start(args, fmt);
+ cnt = pvsnprintf(p, len, fmt, args);
+ va_end(args);
+
+ if (cnt < len)
+ break; /* success */
+
+ /* Release buffer and loop around to try again with larger len. */
+ free(p);
+ len = cnt;
+ }
+
+ ahwrite(p, 1, cnt, AH);
+ free(p);
+ return (int) cnt;
+}
+
+/*
+ * Single place for logic which says 'We are restoring to a direct DB connection'.
+ */
+static int
+RestoringToDB(ArchiveHandle *AH)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ return (ropt && ropt->useDB && AH->connection);
+}
+
+/*
+ * Dump the current contents of the LO data buffer while writing a BLOB
+ */
+static void
+dump_lo_buf(ArchiveHandle *AH)
+{
+ if (AH->connection)
+ {
+ int res;
+
+ res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
+ pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
+ "wrote %zu bytes of large object data (result = %d)",
+ AH->lo_buf_used),
+ AH->lo_buf_used, res);
+ /* We assume there are no short writes, only errors */
+ if (res != AH->lo_buf_used)
+ warn_or_exit_horribly(AH, "could not write to large object: %s",
+ PQerrorMessage(AH->connection));
+ }
+ else
+ {
+ PQExpBuffer buf = createPQExpBuffer();
+
+ appendByteaLiteralAHX(buf,
+ (const unsigned char *) AH->lo_buf,
+ AH->lo_buf_used,
+ AH);
+
+ /* Hack: turn off writingBlob so ahwrite doesn't recurse to here */
+ AH->writingBlob = 0;
+ ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
+ AH->writingBlob = 1;
+
+ destroyPQExpBuffer(buf);
+ }
+ AH->lo_buf_used = 0;
+}
+
+
+/*
+ * Write buffer to the output file (usually stdout). This is used for
+ * outputting 'restore' scripts etc. It is even possible for an archive
+ * format to create a custom output routine to 'fake' a restore if it
+ * wants to generate a script (see TAR output).
+ */
+void
+ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
+{
+ int bytes_written = 0;
+
+ if (AH->writingBlob)
+ {
+ size_t remaining = size * nmemb;
+
+ while (AH->lo_buf_used + remaining > AH->lo_buf_size)
+ {
+ size_t avail = AH->lo_buf_size - AH->lo_buf_used;
+
+ memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
+ ptr = (const void *) ((const char *) ptr + avail);
+ remaining -= avail;
+ AH->lo_buf_used += avail;
+ dump_lo_buf(AH);
+ }
+
+ memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
+ AH->lo_buf_used += remaining;
+
+ bytes_written = size * nmemb;
+ }
+ else if (AH->gzOut)
+ bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
+ else if (AH->CustomOutPtr)
+ bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
+
+ else
+ {
+ /*
+ * If we're doing a restore, and it's direct to DB, and we're
+ * connected then send it to the DB.
+ */
+ if (RestoringToDB(AH))
+ bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
+ else
+ bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
+ }
+
+ if (bytes_written != size * nmemb)
+ WRITE_ERROR_EXIT;
+}
+
+/* on some error, we may decide to go on... */
+void
+warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
+{
+ va_list ap;
+
+ switch (AH->stage)
+ {
+
+ case STAGE_NONE:
+ /* Do nothing special */
+ break;
+
+ case STAGE_INITIALIZING:
+ if (AH->stage != AH->lastErrorStage)
+ pg_log_generic(PG_LOG_INFO, "while INITIALIZING:");
+ break;
+
+ case STAGE_PROCESSING:
+ if (AH->stage != AH->lastErrorStage)
+ pg_log_generic(PG_LOG_INFO, "while PROCESSING TOC:");
+ break;
+
+ case STAGE_FINALIZING:
+ if (AH->stage != AH->lastErrorStage)
+ pg_log_generic(PG_LOG_INFO, "while FINALIZING:");
+ break;
+ }
+ if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
+ {
+ pg_log_generic(PG_LOG_INFO, "from TOC entry %d; %u %u %s %s %s",
+ AH->currentTE->dumpId,
+ AH->currentTE->catalogId.tableoid,
+ AH->currentTE->catalogId.oid,
+ AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
+ AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
+ AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
+ }
+ AH->lastErrorStage = AH->stage;
+ AH->lastErrorTE = AH->currentTE;
+
+ va_start(ap, fmt);
+ pg_log_generic_v(PG_LOG_ERROR, fmt, ap);
+ va_end(ap);
+
+ if (AH->public.exit_on_error)
+ exit_nicely(1);
+ else
+ AH->public.n_errors++;
+}
+
+#ifdef NOT_USED
+
+static void
+_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
+{
+ /* Unlink te from list */
+ te->prev->next = te->next;
+ te->next->prev = te->prev;
+
+ /* and insert it after "pos" */
+ te->prev = pos;
+ te->next = pos->next;
+ pos->next->prev = te;
+ pos->next = te;
+}
+#endif
+
+static void
+_moveBefore(TocEntry *pos, TocEntry *te)
+{
+ /* Unlink te from list */
+ te->prev->next = te->next;
+ te->next->prev = te->prev;
+
+ /* and insert it before "pos" */
+ te->prev = pos->prev;
+ te->next = pos;
+ pos->prev->next = te;
+ pos->prev = te;
+}
+
+/*
+ * Build index arrays for the TOC list
+ *
+ * This should be invoked only after we have created or read in all the TOC
+ * items.
+ *
+ * The arrays are indexed by dump ID (so entry zero is unused). Note that the
+ * array entries run only up to maxDumpId. We might see dependency dump IDs
+ * beyond that (if the dump was partial); so always check the array bound
+ * before trying to touch an array entry.
+ */
+static void
+buildTocEntryArrays(ArchiveHandle *AH)
+{
+ DumpId maxDumpId = AH->maxDumpId;
+ TocEntry *te;
+
+ AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
+ AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /* this check is purely paranoia, maxDumpId should be correct */
+ if (te->dumpId <= 0 || te->dumpId > maxDumpId)
+ fatal("bad dumpId");
+
+ /* tocsByDumpId indexes all TOCs by their dump ID */
+ AH->tocsByDumpId[te->dumpId] = te;
+
+ /*
+ * tableDataId provides the TABLE DATA item's dump ID for each TABLE
+ * TOC entry that has a DATA item. We compute this by reversing the
+ * TABLE DATA item's dependency, knowing that a TABLE DATA item has
+ * just one dependency and it is the TABLE item.
+ */
+ if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
+ {
+ DumpId tableId = te->dependencies[0];
+
+ /*
+ * The TABLE item might not have been in the archive, if this was
+ * a data-only dump; but its dump ID should be less than its data
+ * item's dump ID, so there should be a place for it in the array.
+ */
+ if (tableId <= 0 || tableId > maxDumpId)
+ fatal("bad table dumpId for TABLE DATA item");
+
+ AH->tableDataId[tableId] = te->dumpId;
+ }
+ }
+}
+
+TocEntry *
+getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
+{
+ /* build index arrays if we didn't already */
+ if (AH->tocsByDumpId == NULL)
+ buildTocEntryArrays(AH);
+
+ if (id > 0 && id <= AH->maxDumpId)
+ return AH->tocsByDumpId[id];
+
+ return NULL;
+}
+
+int
+TocIDRequired(ArchiveHandle *AH, DumpId id)
+{
+ TocEntry *te = getTocEntryByDumpId(AH, id);
+
+ if (!te)
+ return 0;
+
+ return te->reqs;
+}
+
+size_t
+WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
+{
+ int off;
+
+ /* Save the flag */
+ AH->WriteBytePtr(AH, wasSet);
+
+ /* Write out pgoff_t smallest byte first, prevents endian mismatch */
+ for (off = 0; off < sizeof(pgoff_t); off++)
+ {
+ AH->WriteBytePtr(AH, o & 0xFF);
+ o >>= 8;
+ }
+ return sizeof(pgoff_t) + 1;
+}
+
+int
+ReadOffset(ArchiveHandle *AH, pgoff_t * o)
+{
+ int i;
+ int off;
+ int offsetFlg;
+
+ /* Initialize to zero */
+ *o = 0;
+
+ /* Check for old version */
+ if (AH->version < K_VERS_1_7)
+ {
+ /* Prior versions wrote offsets using WriteInt */
+ i = ReadInt(AH);
+ /* -1 means not set */
+ if (i < 0)
+ return K_OFFSET_POS_NOT_SET;
+ else if (i == 0)
+ return K_OFFSET_NO_DATA;
+
+ /* Cast to pgoff_t because it was written as an int. */
+ *o = (pgoff_t) i;
+ return K_OFFSET_POS_SET;
+ }
+
+ /*
+ * Read the flag indicating the state of the data pointer. Check if valid
+ * and die if not.
+ *
+ * This used to be handled by a negative or zero pointer, now we use an
+ * extra byte specifically for the state.
+ */
+ offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
+
+ switch (offsetFlg)
+ {
+ case K_OFFSET_POS_NOT_SET:
+ case K_OFFSET_NO_DATA:
+ case K_OFFSET_POS_SET:
+
+ break;
+
+ default:
+ fatal("unexpected data offset flag %d", offsetFlg);
+ }
+
+ /*
+ * Read the bytes
+ */
+ for (off = 0; off < AH->offSize; off++)
+ {
+ if (off < sizeof(pgoff_t))
+ *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
+ else
+ {
+ if (AH->ReadBytePtr(AH) != 0)
+ fatal("file offset in dump file is too large");
+ }
+ }
+
+ return offsetFlg;
+}
+
+size_t
+WriteInt(ArchiveHandle *AH, int i)
+{
+ int b;
+
+ /*
+ * This is a bit yucky, but I don't want to make the binary format very
+ * dependent on representation, and not knowing much about it, I write out
+ * a sign byte. If you change this, don't forget to change the file
+ * version #, and modify ReadInt to read the new format AS WELL AS the old
+ * formats.
+ */
+
+ /* SIGN byte */
+ if (i < 0)
+ {
+ AH->WriteBytePtr(AH, 1);
+ i = -i;
+ }
+ else
+ AH->WriteBytePtr(AH, 0);
+
+ for (b = 0; b < AH->intSize; b++)
+ {
+ AH->WriteBytePtr(AH, i & 0xFF);
+ i >>= 8;
+ }
+
+ return AH->intSize + 1;
+}
+
+int
+ReadInt(ArchiveHandle *AH)
+{
+ int res = 0;
+ int bv,
+ b;
+ int sign = 0; /* Default positive */
+ int bitShift = 0;
+
+ if (AH->version > K_VERS_1_0)
+ /* Read a sign byte */
+ sign = AH->ReadBytePtr(AH);
+
+ for (b = 0; b < AH->intSize; b++)
+ {
+ bv = AH->ReadBytePtr(AH) & 0xFF;
+ if (bv != 0)
+ res = res + (bv << bitShift);
+ bitShift += 8;
+ }
+
+ if (sign)
+ res = -res;
+
+ return res;
+}
+
+size_t
+WriteStr(ArchiveHandle *AH, const char *c)
+{
+ size_t res;
+
+ if (c)
+ {
+ int len = strlen(c);
+
+ res = WriteInt(AH, len);
+ AH->WriteBufPtr(AH, c, len);
+ res += len;
+ }
+ else
+ res = WriteInt(AH, -1);
+
+ return res;
+}
+
+char *
+ReadStr(ArchiveHandle *AH)
+{
+ char *buf;
+ int l;
+
+ l = ReadInt(AH);
+ if (l < 0)
+ buf = NULL;
+ else
+ {
+ buf = (char *) pg_malloc(l + 1);
+ AH->ReadBufPtr(AH, (void *) buf, l);
+
+ buf[l] = '\0';
+ }
+
+ return buf;
+}
+
+static int
+_discoverArchiveFormat(ArchiveHandle *AH)
+{
+ FILE *fh;
+ char sig[6]; /* More than enough */
+ size_t cnt;
+ int wantClose = 0;
+
+ pg_log_debug("attempting to ascertain archive format");
+
+ if (AH->lookahead)
+ free(AH->lookahead);
+
+ AH->readHeader = 0;
+ AH->lookaheadSize = 512;
+ AH->lookahead = pg_malloc0(512);
+ AH->lookaheadLen = 0;
+ AH->lookaheadPos = 0;
+
+ if (AH->fSpec)
+ {
+ struct stat st;
+
+ wantClose = 1;
+
+ /*
+ * Check if the specified archive is a directory. If so, check if
+ * there's a "toc.dat" (or "toc.dat.gz") file in it.
+ */
+ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
+ {
+ char buf[MAXPGPATH];
+
+ if (snprintf(buf, MAXPGPATH, "%s/toc.dat", AH->fSpec) >= MAXPGPATH)
+ fatal("directory name too long: \"%s\"",
+ AH->fSpec);
+ if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
+ {
+ AH->format = archDirectory;
+ return AH->format;
+ }
+
+#ifdef HAVE_LIBZ
+ if (snprintf(buf, MAXPGPATH, "%s/toc.dat.gz", AH->fSpec) >= MAXPGPATH)
+ fatal("directory name too long: \"%s\"",
+ AH->fSpec);
+ if (stat(buf, &st) == 0 && S_ISREG(st.st_mode))
+ {
+ AH->format = archDirectory;
+ return AH->format;
+ }
+#endif
+ fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
+ AH->fSpec);
+ fh = NULL; /* keep compiler quiet */
+ }
+ else
+ {
+ fh = fopen(AH->fSpec, PG_BINARY_R);
+ if (!fh)
+ fatal("could not open input file \"%s\": %m", AH->fSpec);
+ }
+ }
+ else
+ {
+ fh = stdin;
+ if (!fh)
+ fatal("could not open input file: %m");
+ }
+
+ if ((cnt = fread(sig, 1, 5, fh)) != 5)
+ {
+ if (ferror(fh))
+ fatal("could not read input file: %m");
+ else
+ fatal("input file is too short (read %lu, expected 5)",
+ (unsigned long) cnt);
+ }
+
+ /* Save it, just in case we need it later */
+ memcpy(&AH->lookahead[0], sig, 5);
+ AH->lookaheadLen = 5;
+
+ if (strncmp(sig, "PGDMP", 5) == 0)
+ {
+ /* It's custom format, stop here */
+ AH->format = archCustom;
+ AH->readHeader = 1;
+ }
+ else
+ {
+ /*
+ * *Maybe* we have a tar archive format file or a text dump ... So,
+ * read first 512 byte header...
+ */
+ cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
+ /* read failure is checked below */
+ AH->lookaheadLen += cnt;
+
+ if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
+ (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
+ strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
+ {
+ /*
+ * looks like it's probably a text format dump. so suggest they
+ * try psql
+ */
+ fatal("input file appears to be a text format dump. Please use psql.");
+ }
+
+ if (AH->lookaheadLen != 512)
+ {
+ if (feof(fh))
+ fatal("input file does not appear to be a valid archive (too short?)");
+ else
+ READ_ERROR_EXIT(fh);
+ }
+
+ if (!isValidTarHeader(AH->lookahead))
+ fatal("input file does not appear to be a valid archive");
+
+ AH->format = archTar;
+ }
+
+ /* Close the file if we opened it */
+ if (wantClose)
+ {
+ if (fclose(fh) != 0)
+ fatal("could not close input file: %m");
+ /* Forget lookahead, since we'll re-read header after re-opening */
+ AH->readHeader = 0;
+ AH->lookaheadLen = 0;
+ }
+
+ return AH->format;
+}
+
+
+/*
+ * Allocate an archive handle
+ */
+static ArchiveHandle *
+_allocAH(const char *FileSpec, const ArchiveFormat fmt,
+ const int compression, bool dosync, ArchiveMode mode,
+ SetupWorkerPtrType setupWorkerPtr)
+{
+ ArchiveHandle *AH;
+
+ pg_log_debug("allocating AH for %s, format %d",
+ FileSpec ? FileSpec : "(stdio)", fmt);
+
+ AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
+
+ AH->version = K_VERS_SELF;
+
+ /* initialize for backwards compatible string processing */
+ AH->public.encoding = 0; /* PG_SQL_ASCII */
+ AH->public.std_strings = false;
+
+ /* sql error handling */
+ AH->public.exit_on_error = true;
+ AH->public.n_errors = 0;
+
+ AH->archiveDumpVersion = PG_VERSION;
+
+ AH->createDate = time(NULL);
+
+ AH->intSize = sizeof(int);
+ AH->offSize = sizeof(pgoff_t);
+ if (FileSpec)
+ {
+ AH->fSpec = pg_strdup(FileSpec);
+
+ /*
+ * Not used; maybe later....
+ *
+ * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
+ * i--) if (AH->workDir[i-1] == '/')
+ */
+ }
+ else
+ AH->fSpec = NULL;
+
+ AH->currUser = NULL; /* unknown */
+ AH->currSchema = NULL; /* ditto */
+ AH->currTablespace = NULL; /* ditto */
+ AH->currTableAm = NULL; /* ditto */
+
+ AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
+
+ AH->toc->next = AH->toc;
+ AH->toc->prev = AH->toc;
+
+ AH->mode = mode;
+ AH->compression = compression;
+ AH->dosync = dosync;
+
+ memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
+
+ /* Open stdout with no compression for AH output handle */
+ AH->gzOut = 0;
+ AH->OF = stdout;
+
+ /*
+ * On Windows, we need to use binary mode to read/write non-text files,
+ * which include all archive formats as well as compressed plain text.
+ * Force stdin/stdout into binary mode if that is what we are using.
+ */
+#ifdef WIN32
+ if ((fmt != archNull || compression != 0) &&
+ (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
+ {
+ if (mode == archModeWrite)
+ _setmode(fileno(stdout), O_BINARY);
+ else
+ _setmode(fileno(stdin), O_BINARY);
+ }
+#endif
+
+ AH->SetupWorkerPtr = setupWorkerPtr;
+
+ if (fmt == archUnknown)
+ AH->format = _discoverArchiveFormat(AH);
+ else
+ AH->format = fmt;
+
+ switch (AH->format)
+ {
+ case archCustom:
+ InitArchiveFmt_Custom(AH);
+ break;
+
+ case archNull:
+ InitArchiveFmt_Null(AH);
+ break;
+
+ case archDirectory:
+ InitArchiveFmt_Directory(AH);
+ break;
+
+ case archTar:
+ InitArchiveFmt_Tar(AH);
+ break;
+
+ default:
+ fatal("unrecognized file format \"%d\"", fmt);
+ }
+
+ return AH;
+}
+
+/*
+ * Write out all data (tables & blobs)
+ */
+void
+WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
+{
+ TocEntry *te;
+
+ if (pstate && pstate->numWorkers > 1)
+ {
+ /*
+ * In parallel mode, this code runs in the leader process. We
+ * construct an array of candidate TEs, then sort it into decreasing
+ * size order, then dispatch each TE to a data-transfer worker. By
+ * dumping larger tables first, we avoid getting into a situation
+ * where we're down to one job and it's big, losing parallelism.
+ */
+ TocEntry **tes;
+ int ntes;
+
+ tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
+ ntes = 0;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /* Consider only TEs with dataDumper functions ... */
+ if (!te->dataDumper)
+ continue;
+ /* ... and ignore ones not enabled for dump */
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ tes[ntes++] = te;
+ }
+
+ if (ntes > 1)
+ qsort((void *) tes, ntes, sizeof(TocEntry *),
+ TocEntrySizeCompare);
+
+ for (int i = 0; i < ntes; i++)
+ DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
+ mark_dump_job_done, NULL);
+
+ pg_free(tes);
+
+ /* Now wait for workers to finish. */
+ WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
+ }
+ else
+ {
+ /* Non-parallel mode: just dump all candidate TEs sequentially. */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ /* Must have same filter conditions as above */
+ if (!te->dataDumper)
+ continue;
+ if ((te->reqs & REQ_DATA) == 0)
+ continue;
+
+ WriteDataChunksForTocEntry(AH, te);
+ }
+ }
+}
+
+
+/*
+ * Callback function that's invoked in the leader process after a step has
+ * been parallel dumped.
+ *
+ * We don't need to do anything except check for worker failure.
+ */
+static void
+mark_dump_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data)
+{
+ pg_log_info("finished item %d %s %s",
+ te->dumpId, te->desc, te->tag);
+
+ if (status != 0)
+ fatal("worker process failed: exit code %d",
+ status);
+}
+
+
+void
+WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ StartDataPtrType startPtr;
+ EndDataPtrType endPtr;
+
+ AH->currToc = te;
+
+ if (strcmp(te->desc, "BLOBS") == 0)
+ {
+ startPtr = AH->StartBlobsPtr;
+ endPtr = AH->EndBlobsPtr;
+ }
+ else
+ {
+ startPtr = AH->StartDataPtr;
+ endPtr = AH->EndDataPtr;
+ }
+
+ if (startPtr != NULL)
+ (*startPtr) (AH, te);
+
+ /*
+ * The user-provided DataDumper routine needs to call AH->WriteData
+ */
+ te->dataDumper((Archive *) AH, te->dataDumperArg);
+
+ if (endPtr != NULL)
+ (*endPtr) (AH, te);
+
+ AH->currToc = NULL;
+}
+
+void
+WriteToc(ArchiveHandle *AH)
+{
+ TocEntry *te;
+ char workbuf[32];
+ int tocCount;
+ int i;
+
+ /* count entries that will actually be dumped */
+ tocCount = 0;
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
+ tocCount++;
+ }
+
+ /* printf("%d TOC Entries to save\n", tocCount); */
+
+ WriteInt(AH, tocCount);
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
+ continue;
+
+ WriteInt(AH, te->dumpId);
+ WriteInt(AH, te->dataDumper ? 1 : 0);
+
+ /* OID is recorded as a string for historical reasons */
+ sprintf(workbuf, "%u", te->catalogId.tableoid);
+ WriteStr(AH, workbuf);
+ sprintf(workbuf, "%u", te->catalogId.oid);
+ WriteStr(AH, workbuf);
+
+ WriteStr(AH, te->tag);
+ WriteStr(AH, te->desc);
+ WriteInt(AH, te->section);
+ WriteStr(AH, te->defn);
+ WriteStr(AH, te->dropStmt);
+ WriteStr(AH, te->copyStmt);
+ WriteStr(AH, te->namespace);
+ WriteStr(AH, te->tablespace);
+ WriteStr(AH, te->tableam);
+ WriteStr(AH, te->owner);
+ WriteStr(AH, "false");
+
+ /* Dump list of dependencies */
+ for (i = 0; i < te->nDeps; i++)
+ {
+ sprintf(workbuf, "%d", te->dependencies[i]);
+ WriteStr(AH, workbuf);
+ }
+ WriteStr(AH, NULL); /* Terminate List */
+
+ if (AH->WriteExtraTocPtr)
+ AH->WriteExtraTocPtr(AH, te);
+ }
+}
+
+void
+ReadToc(ArchiveHandle *AH)
+{
+ int i;
+ char *tmp;
+ DumpId *deps;
+ int depIdx;
+ int depSize;
+ TocEntry *te;
+
+ AH->tocCount = ReadInt(AH);
+ AH->maxDumpId = 0;
+
+ for (i = 0; i < AH->tocCount; i++)
+ {
+ te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
+ te->dumpId = ReadInt(AH);
+
+ if (te->dumpId > AH->maxDumpId)
+ AH->maxDumpId = te->dumpId;
+
+ /* Sanity check */
+ if (te->dumpId <= 0)
+ fatal("entry ID %d out of range -- perhaps a corrupt TOC",
+ te->dumpId);
+
+ te->hadDumper = ReadInt(AH);
+
+ if (AH->version >= K_VERS_1_8)
+ {
+ tmp = ReadStr(AH);
+ sscanf(tmp, "%u", &te->catalogId.tableoid);
+ free(tmp);
+ }
+ else
+ te->catalogId.tableoid = InvalidOid;
+ tmp = ReadStr(AH);
+ sscanf(tmp, "%u", &te->catalogId.oid);
+ free(tmp);
+
+ te->tag = ReadStr(AH);
+ te->desc = ReadStr(AH);
+
+ if (AH->version >= K_VERS_1_11)
+ {
+ te->section = ReadInt(AH);
+ }
+ else
+ {
+ /*
+ * Rules for pre-8.4 archives wherein pg_dump hasn't classified
+ * the entries into sections. This list need not cover entry
+ * types added later than 8.4.
+ */
+ if (strcmp(te->desc, "COMMENT") == 0 ||
+ strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0)
+ te->section = SECTION_NONE;
+ else if (strcmp(te->desc, "TABLE DATA") == 0 ||
+ strcmp(te->desc, "BLOBS") == 0 ||
+ strcmp(te->desc, "BLOB COMMENTS") == 0)
+ te->section = SECTION_DATA;
+ else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
+ strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
+ strcmp(te->desc, "FK CONSTRAINT") == 0 ||
+ strcmp(te->desc, "INDEX") == 0 ||
+ strcmp(te->desc, "RULE") == 0 ||
+ strcmp(te->desc, "TRIGGER") == 0)
+ te->section = SECTION_POST_DATA;
+ else
+ te->section = SECTION_PRE_DATA;
+ }
+
+ te->defn = ReadStr(AH);
+ te->dropStmt = ReadStr(AH);
+
+ if (AH->version >= K_VERS_1_3)
+ te->copyStmt = ReadStr(AH);
+
+ if (AH->version >= K_VERS_1_6)
+ te->namespace = ReadStr(AH);
+
+ if (AH->version >= K_VERS_1_10)
+ te->tablespace = ReadStr(AH);
+
+ if (AH->version >= K_VERS_1_14)
+ te->tableam = ReadStr(AH);
+
+ te->owner = ReadStr(AH);
+ if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
+ pg_log_warning("restoring tables WITH OIDS is not supported anymore");
+
+ /* Read TOC entry dependencies */
+ if (AH->version >= K_VERS_1_5)
+ {
+ depSize = 100;
+ deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
+ depIdx = 0;
+ for (;;)
+ {
+ tmp = ReadStr(AH);
+ if (!tmp)
+ break; /* end of list */
+ if (depIdx >= depSize)
+ {
+ depSize *= 2;
+ deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
+ }
+ sscanf(tmp, "%d", &deps[depIdx]);
+ free(tmp);
+ depIdx++;
+ }
+
+ if (depIdx > 0) /* We have a non-null entry */
+ {
+ deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
+ te->dependencies = deps;
+ te->nDeps = depIdx;
+ }
+ else
+ {
+ free(deps);
+ te->dependencies = NULL;
+ te->nDeps = 0;
+ }
+ }
+ else
+ {
+ te->dependencies = NULL;
+ te->nDeps = 0;
+ }
+ te->dataLength = 0;
+
+ if (AH->ReadExtraTocPtr)
+ AH->ReadExtraTocPtr(AH, te);
+
+ pg_log_debug("read TOC entry %d (ID %d) for %s %s",
+ i, te->dumpId, te->desc, te->tag);
+
+ /* link completed entry into TOC circular list */
+ te->prev = AH->toc->prev;
+ AH->toc->prev->next = te;
+ AH->toc->prev = te;
+ te->next = AH->toc;
+
+ /* special processing immediately upon read for some items */
+ if (strcmp(te->desc, "ENCODING") == 0)
+ processEncodingEntry(AH, te);
+ else if (strcmp(te->desc, "STDSTRINGS") == 0)
+ processStdStringsEntry(AH, te);
+ else if (strcmp(te->desc, "SEARCHPATH") == 0)
+ processSearchPathEntry(AH, te);
+ }
+}
+
+static void
+processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ /* te->defn should have the form SET client_encoding = 'foo'; */
+ char *defn = pg_strdup(te->defn);
+ char *ptr1;
+ char *ptr2 = NULL;
+ int encoding;
+
+ ptr1 = strchr(defn, '\'');
+ if (ptr1)
+ ptr2 = strchr(++ptr1, '\'');
+ if (ptr2)
+ {
+ *ptr2 = '\0';
+ encoding = pg_char_to_encoding(ptr1);
+ if (encoding < 0)
+ fatal("unrecognized encoding \"%s\"",
+ ptr1);
+ AH->public.encoding = encoding;
+ }
+ else
+ fatal("invalid ENCODING item: %s",
+ te->defn);
+
+ free(defn);
+}
+
+static void
+processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ /* te->defn should have the form SET standard_conforming_strings = 'x'; */
+ char *ptr1;
+
+ ptr1 = strchr(te->defn, '\'');
+ if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
+ AH->public.std_strings = true;
+ else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
+ AH->public.std_strings = false;
+ else
+ fatal("invalid STDSTRINGS item: %s",
+ te->defn);
+}
+
+static void
+processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ /*
+ * te->defn should contain a command to set search_path. We just copy it
+ * verbatim for use later.
+ */
+ AH->public.searchpath = pg_strdup(te->defn);
+}
+
+static void
+StrictNamesCheck(RestoreOptions *ropt)
+{
+ const char *missing_name;
+
+ Assert(ropt->strict_names);
+
+ if (ropt->schemaNames.head != NULL)
+ {
+ missing_name = simple_string_list_not_touched(&ropt->schemaNames);
+ if (missing_name != NULL)
+ fatal("schema \"%s\" not found", missing_name);
+ }
+
+ if (ropt->tableNames.head != NULL)
+ {
+ missing_name = simple_string_list_not_touched(&ropt->tableNames);
+ if (missing_name != NULL)
+ fatal("table \"%s\" not found", missing_name);
+ }
+
+ if (ropt->indexNames.head != NULL)
+ {
+ missing_name = simple_string_list_not_touched(&ropt->indexNames);
+ if (missing_name != NULL)
+ fatal("index \"%s\" not found", missing_name);
+ }
+
+ if (ropt->functionNames.head != NULL)
+ {
+ missing_name = simple_string_list_not_touched(&ropt->functionNames);
+ if (missing_name != NULL)
+ fatal("function \"%s\" not found", missing_name);
+ }
+
+ if (ropt->triggerNames.head != NULL)
+ {
+ missing_name = simple_string_list_not_touched(&ropt->triggerNames);
+ if (missing_name != NULL)
+ fatal("trigger \"%s\" not found", missing_name);
+ }
+}
+
+/*
+ * Determine whether we want to restore this TOC entry.
+ *
+ * Returns 0 if entry should be skipped, or some combination of the
+ * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
+ * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
+ */
+static int
+_tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
+{
+ int res = REQ_SCHEMA | REQ_DATA;
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /* These items are treated specially */
+ if (strcmp(te->desc, "ENCODING") == 0 ||
+ strcmp(te->desc, "STDSTRINGS") == 0 ||
+ strcmp(te->desc, "SEARCHPATH") == 0)
+ return REQ_SPECIAL;
+
+ /*
+ * DATABASE and DATABASE PROPERTIES also have a special rule: they are
+ * restored in createDB mode, and not restored otherwise, independently of
+ * all else.
+ */
+ if (strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0)
+ {
+ if (ropt->createDB)
+ return REQ_SCHEMA;
+ else
+ return 0;
+ }
+
+ /*
+ * Process exclusions that affect certain classes of TOC entries.
+ */
+
+ /* If it's an ACL, maybe ignore it */
+ if (ropt->aclsSkip && _tocEntryIsACL(te))
+ return 0;
+
+ /* If it's a comment, maybe ignore it */
+ if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
+ return 0;
+
+ /*
+ * If it's a publication or a table part of a publication, maybe ignore
+ * it.
+ */
+ if (ropt->no_publications &&
+ (strcmp(te->desc, "PUBLICATION") == 0 ||
+ strcmp(te->desc, "PUBLICATION TABLE") == 0))
+ return 0;
+
+ /* If it's a security label, maybe ignore it */
+ if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
+ return 0;
+
+ /* If it's a subscription, maybe ignore it */
+ if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
+ return 0;
+
+ /* Ignore it if section is not to be dumped/restored */
+ switch (curSection)
+ {
+ case SECTION_PRE_DATA:
+ if (!(ropt->dumpSections & DUMP_PRE_DATA))
+ return 0;
+ break;
+ case SECTION_DATA:
+ if (!(ropt->dumpSections & DUMP_DATA))
+ return 0;
+ break;
+ case SECTION_POST_DATA:
+ if (!(ropt->dumpSections & DUMP_POST_DATA))
+ return 0;
+ break;
+ default:
+ /* shouldn't get here, really, but ignore it */
+ return 0;
+ }
+
+ /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
+ if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
+ return 0;
+
+ /*
+ * Check options for selective dump/restore.
+ */
+ if (strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "COMMENT") == 0 ||
+ strcmp(te->desc, "SECURITY LABEL") == 0)
+ {
+ /* Database properties react to createDB, not selectivity options. */
+ if (strncmp(te->tag, "DATABASE ", 9) == 0)
+ {
+ if (!ropt->createDB)
+ return 0;
+ }
+ else if (ropt->schemaNames.head != NULL ||
+ ropt->schemaExcludeNames.head != NULL ||
+ ropt->selTypes)
+ {
+ /*
+ * In a selective dump/restore, we want to restore these dependent
+ * TOC entry types only if their parent object is being restored.
+ * Without selectivity options, we let through everything in the
+ * archive. Note there may be such entries with no parent, eg
+ * non-default ACLs for built-in objects.
+ *
+ * This code depends on the parent having been marked already,
+ * which should be the case; if it isn't, perhaps due to
+ * SortTocFromFile rearrangement, skipping the dependent entry
+ * seems prudent anyway.
+ *
+ * Ideally we'd handle, eg, table CHECK constraints this way too.
+ * But it's hard to tell which of their dependencies is the one to
+ * consult.
+ */
+ if (te->nDeps != 1 ||
+ TocIDRequired(AH, te->dependencies[0]) == 0)
+ return 0;
+ }
+ }
+ else
+ {
+ /* Apply selective-restore rules for standalone TOC entries. */
+ if (ropt->schemaNames.head != NULL)
+ {
+ /* If no namespace is specified, it means all. */
+ if (!te->namespace)
+ return 0;
+ if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
+ return 0;
+ }
+
+ if (ropt->schemaExcludeNames.head != NULL &&
+ te->namespace &&
+ simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
+ return 0;
+
+ if (ropt->selTypes)
+ {
+ if (strcmp(te->desc, "TABLE") == 0 ||
+ strcmp(te->desc, "TABLE DATA") == 0 ||
+ strcmp(te->desc, "VIEW") == 0 ||
+ strcmp(te->desc, "FOREIGN TABLE") == 0 ||
+ strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
+ strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
+ strcmp(te->desc, "SEQUENCE") == 0 ||
+ strcmp(te->desc, "SEQUENCE SET") == 0)
+ {
+ if (!ropt->selTable)
+ return 0;
+ if (ropt->tableNames.head != NULL &&
+ !simple_string_list_member(&ropt->tableNames, te->tag))
+ return 0;
+ }
+ else if (strcmp(te->desc, "INDEX") == 0)
+ {
+ if (!ropt->selIndex)
+ return 0;
+ if (ropt->indexNames.head != NULL &&
+ !simple_string_list_member(&ropt->indexNames, te->tag))
+ return 0;
+ }
+ else if (strcmp(te->desc, "FUNCTION") == 0 ||
+ strcmp(te->desc, "AGGREGATE") == 0 ||
+ strcmp(te->desc, "PROCEDURE") == 0)
+ {
+ if (!ropt->selFunction)
+ return 0;
+ if (ropt->functionNames.head != NULL &&
+ !simple_string_list_member(&ropt->functionNames, te->tag))
+ return 0;
+ }
+ else if (strcmp(te->desc, "TRIGGER") == 0)
+ {
+ if (!ropt->selTrigger)
+ return 0;
+ if (ropt->triggerNames.head != NULL &&
+ !simple_string_list_member(&ropt->triggerNames, te->tag))
+ return 0;
+ }
+ else
+ return 0;
+ }
+ }
+
+ /*
+ * Determine whether the TOC entry contains schema and/or data components,
+ * and mask off inapplicable REQ bits. If it had a dataDumper, assume
+ * it's both schema and data. Otherwise it's probably schema-only, but
+ * there are exceptions.
+ */
+ if (!te->hadDumper)
+ {
+ /*
+ * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs, then
+ * it is considered a data entry. We don't need to check for the
+ * BLOBS entry or old-style BLOB COMMENTS, because they will have
+ * hadDumper = true ... but we do need to check new-style BLOB ACLs,
+ * comments, etc.
+ */
+ if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
+ (strcmp(te->desc, "ACL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "COMMENT") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "SECURITY LABEL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
+ res = res & REQ_DATA;
+ else
+ res = res & ~REQ_DATA;
+ }
+
+ /* If there's no definition command, there's no schema component */
+ if (!te->defn || !te->defn[0])
+ res = res & ~REQ_SCHEMA;
+
+ /*
+ * Special case: <Init> type with <Max OID> tag; this is obsolete and we
+ * always ignore it.
+ */
+ if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
+ return 0;
+
+ /* Mask it if we only want schema */
+ if (ropt->schemaOnly)
+ {
+ /*
+ * The sequence_data option overrides schemaOnly for SEQUENCE SET.
+ *
+ * In binary-upgrade mode, even with schemaOnly set, we do not mask
+ * out large objects. (Only large object definitions, comments and
+ * other metadata should be generated in binary-upgrade mode, not the
+ * actual data, but that need not concern us here.)
+ */
+ if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
+ !(ropt->binary_upgrade &&
+ (strcmp(te->desc, "BLOB") == 0 ||
+ (strcmp(te->desc, "ACL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "COMMENT") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+ (strcmp(te->desc, "SECURITY LABEL") == 0 &&
+ strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
+ res = res & REQ_SCHEMA;
+ }
+
+ /* Mask it if we only want data */
+ if (ropt->dataOnly)
+ res = res & REQ_DATA;
+
+ return res;
+}
+
+/*
+ * Identify which pass we should restore this TOC entry in.
+ *
+ * See notes with the RestorePass typedef in pg_backup_archiver.h.
+ */
+static RestorePass
+_tocEntryRestorePass(TocEntry *te)
+{
+ /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+ if (strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+ strcmp(te->desc, "DEFAULT ACL") == 0)
+ return RESTORE_PASS_ACL;
+ if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
+ strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
+ return RESTORE_PASS_POST_ACL;
+
+ /*
+ * Comments need to be emitted in the same pass as their parent objects.
+ * ACLs haven't got comments, and neither do matview data objects, but
+ * event triggers do. (Fortunately, event triggers haven't got ACLs, or
+ * we'd need yet another weird special case.)
+ */
+ if (strcmp(te->desc, "COMMENT") == 0 &&
+ strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
+ return RESTORE_PASS_POST_ACL;
+
+ /* All else can be handled in the main pass. */
+ return RESTORE_PASS_MAIN;
+}
+
+/*
+ * Identify TOC entries that are ACLs.
+ *
+ * Note: it seems worth duplicating some code here to avoid a hard-wired
+ * assumption that these are exactly the same entries that we restore during
+ * the RESTORE_PASS_ACL phase.
+ */
+static bool
+_tocEntryIsACL(TocEntry *te)
+{
+ /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+ if (strcmp(te->desc, "ACL") == 0 ||
+ strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+ strcmp(te->desc, "DEFAULT ACL") == 0)
+ return true;
+ return false;
+}
+
+/*
+ * Issue SET commands for parameters that we want to have set the same way
+ * at all times during execution of a restore script.
+ */
+static void
+_doSetFixedOutputState(ArchiveHandle *AH)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /*
+ * Disable timeouts to allow for slow commands, idle parallel workers, etc
+ */
+ ahprintf(AH, "SET statement_timeout = 0;\n");
+ ahprintf(AH, "SET lock_timeout = 0;\n");
+ ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
+
+ /* Select the correct character set encoding */
+ ahprintf(AH, "SET client_encoding = '%s';\n",
+ pg_encoding_to_char(AH->public.encoding));
+
+ /* Select the correct string literal syntax */
+ ahprintf(AH, "SET standard_conforming_strings = %s;\n",
+ AH->public.std_strings ? "on" : "off");
+
+ /* Select the role to be used during restore */
+ if (ropt && ropt->use_role)
+ ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
+
+ /* Select the dump-time search_path */
+ if (AH->public.searchpath)
+ ahprintf(AH, "%s", AH->public.searchpath);
+
+ /* Make sure function checking is disabled */
+ ahprintf(AH, "SET check_function_bodies = false;\n");
+
+ /* Ensure that all valid XML data will be accepted */
+ ahprintf(AH, "SET xmloption = content;\n");
+
+ /* Avoid annoying notices etc */
+ ahprintf(AH, "SET client_min_messages = warning;\n");
+ if (!AH->public.std_strings)
+ ahprintf(AH, "SET escape_string_warning = off;\n");
+
+ /* Adjust row-security state */
+ if (ropt && ropt->enable_row_security)
+ ahprintf(AH, "SET row_security = on;\n");
+ else
+ ahprintf(AH, "SET row_security = off;\n");
+
+ ahprintf(AH, "\n");
+}
+
+/*
+ * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
+ * for updating state if appropriate. If user is NULL or an empty string,
+ * the specification DEFAULT will be used.
+ */
+static void
+_doSetSessionAuth(ArchiveHandle *AH, const char *user)
+{
+ PQExpBuffer cmd = createPQExpBuffer();
+
+ appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
+
+ /*
+ * SQL requires a string literal here. Might as well be correct.
+ */
+ if (user && *user)
+ appendStringLiteralAHX(cmd, user, AH);
+ else
+ appendPQExpBufferStr(cmd, "DEFAULT");
+ appendPQExpBufferChar(cmd, ';');
+
+ if (RestoringToDB(AH))
+ {
+ PGresult *res;
+
+ res = PQexec(AH->connection, cmd->data);
+
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ /* NOT warn_or_exit_horribly... use -O instead to skip this. */
+ fatal("could not set session user to \"%s\": %s",
+ user, PQerrorMessage(AH->connection));
+
+ PQclear(res);
+ }
+ else
+ ahprintf(AH, "%s\n\n", cmd->data);
+
+ destroyPQExpBuffer(cmd);
+}
+
+
+/*
+ * Issue the commands to connect to the specified database.
+ *
+ * If we're currently restoring right into a database, this will
+ * actually establish a connection. Otherwise it puts a \connect into
+ * the script output.
+ */
+static void
+_reconnectToDB(ArchiveHandle *AH, const char *dbname)
+{
+ if (RestoringToDB(AH))
+ ReconnectToServer(AH, dbname);
+ else
+ {
+ PQExpBufferData connectbuf;
+
+ initPQExpBuffer(&connectbuf);
+ appendPsqlMetaConnect(&connectbuf, dbname);
+ ahprintf(AH, "%s\n", connectbuf.data);
+ termPQExpBuffer(&connectbuf);
+ }
+
+ /*
+ * NOTE: currUser keeps track of what the imaginary session user in our
+ * script is. It's now effectively reset to the original userID.
+ */
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = NULL;
+
+ /* don't assume we still know the output schema, tablespace, etc either */
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = NULL;
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ AH->currTablespace = NULL;
+
+ /* re-establish fixed state */
+ _doSetFixedOutputState(AH);
+}
+
+/*
+ * Become the specified user, and update state to avoid redundant commands
+ *
+ * NULL or empty argument is taken to mean restoring the session default
+ */
+static void
+_becomeUser(ArchiveHandle *AH, const char *user)
+{
+ if (!user)
+ user = ""; /* avoid null pointers */
+
+ if (AH->currUser && strcmp(AH->currUser, user) == 0)
+ return; /* no need to do anything */
+
+ _doSetSessionAuth(AH, user);
+
+ /*
+ * NOTE: currUser keeps track of what the imaginary session user in our
+ * script is
+ */
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = pg_strdup(user);
+}
+
+/*
+ * Become the owner of the given TOC entry object. If
+ * changes in ownership are not allowed, this doesn't do anything.
+ */
+static void
+_becomeOwner(ArchiveHandle *AH, TocEntry *te)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
+ return;
+
+ _becomeUser(AH, te->owner);
+}
+
+
+/*
+ * Issue the commands to select the specified schema as the current schema
+ * in the target database.
+ */
+static void
+_selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
+{
+ PQExpBuffer qry;
+
+ /*
+ * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
+ * that search_path rather than switching to entry-specific paths.
+ * Otherwise, it's an old archive that will not restore correctly unless
+ * we set the search_path as it's expecting.
+ */
+ if (AH->public.searchpath)
+ return;
+
+ if (!schemaName || *schemaName == '\0' ||
+ (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
+ return; /* no need to do anything */
+
+ qry = createPQExpBuffer();
+
+ appendPQExpBuffer(qry, "SET search_path = %s",
+ fmtId(schemaName));
+ if (strcmp(schemaName, "pg_catalog") != 0)
+ appendPQExpBufferStr(qry, ", pg_catalog");
+
+ if (RestoringToDB(AH))
+ {
+ PGresult *res;
+
+ res = PQexec(AH->connection, qry->data);
+
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ warn_or_exit_horribly(AH,
+ "could not set search_path to \"%s\": %s",
+ schemaName, PQerrorMessage(AH->connection));
+
+ PQclear(res);
+ }
+ else
+ ahprintf(AH, "%s;\n\n", qry->data);
+
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = pg_strdup(schemaName);
+
+ destroyPQExpBuffer(qry);
+}
+
+/*
+ * Issue the commands to select the specified tablespace as the current one
+ * in the target database.
+ */
+static void
+_selectTablespace(ArchiveHandle *AH, const char *tablespace)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+ PQExpBuffer qry;
+ const char *want,
+ *have;
+
+ /* do nothing in --no-tablespaces mode */
+ if (ropt->noTablespace)
+ return;
+
+ have = AH->currTablespace;
+ want = tablespace;
+
+ /* no need to do anything for non-tablespace object */
+ if (!want)
+ return;
+
+ if (have && strcmp(want, have) == 0)
+ return; /* no need to do anything */
+
+ qry = createPQExpBuffer();
+
+ if (strcmp(want, "") == 0)
+ {
+ /* We want the tablespace to be the database's default */
+ appendPQExpBufferStr(qry, "SET default_tablespace = ''");
+ }
+ else
+ {
+ /* We want an explicit tablespace */
+ appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
+ }
+
+ if (RestoringToDB(AH))
+ {
+ PGresult *res;
+
+ res = PQexec(AH->connection, qry->data);
+
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ warn_or_exit_horribly(AH,
+ "could not set default_tablespace to %s: %s",
+ fmtId(want), PQerrorMessage(AH->connection));
+
+ PQclear(res);
+ }
+ else
+ ahprintf(AH, "%s;\n\n", qry->data);
+
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ AH->currTablespace = pg_strdup(want);
+
+ destroyPQExpBuffer(qry);
+}
+
+/*
+ * Set the proper default_table_access_method value for the table.
+ */
+static void
+_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
+{
+ PQExpBuffer cmd;
+ const char *want,
+ *have;
+
+ have = AH->currTableAm;
+ want = tableam;
+
+ if (!want)
+ return;
+
+ if (have && strcmp(want, have) == 0)
+ return;
+
+ cmd = createPQExpBuffer();
+ appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
+
+ if (RestoringToDB(AH))
+ {
+ PGresult *res;
+
+ res = PQexec(AH->connection, cmd->data);
+
+ if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+ warn_or_exit_horribly(AH,
+ "could not set default_table_access_method: %s",
+ PQerrorMessage(AH->connection));
+
+ PQclear(res);
+ }
+ else
+ ahprintf(AH, "%s\n\n", cmd->data);
+
+ destroyPQExpBuffer(cmd);
+
+ AH->currTableAm = pg_strdup(want);
+}
+
+/*
+ * Extract an object description for a TOC entry, and append it to buf.
+ *
+ * This is used for ALTER ... OWNER TO.
+ */
+static void
+_getObjectDescription(PQExpBuffer buf, TocEntry *te)
+{
+ const char *type = te->desc;
+
+ /* Use ALTER TABLE for views and sequences */
+ if (strcmp(type, "VIEW") == 0 || strcmp(type, "SEQUENCE") == 0 ||
+ strcmp(type, "MATERIALIZED VIEW") == 0)
+ type = "TABLE";
+
+ /* objects that don't require special decoration */
+ if (strcmp(type, "COLLATION") == 0 ||
+ strcmp(type, "CONVERSION") == 0 ||
+ strcmp(type, "DOMAIN") == 0 ||
+ strcmp(type, "TABLE") == 0 ||
+ strcmp(type, "TYPE") == 0 ||
+ strcmp(type, "FOREIGN TABLE") == 0 ||
+ strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
+ strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
+ strcmp(type, "STATISTICS") == 0 ||
+ /* non-schema-specified objects */
+ strcmp(type, "DATABASE") == 0 ||
+ strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
+ strcmp(type, "SCHEMA") == 0 ||
+ strcmp(type, "EVENT TRIGGER") == 0 ||
+ strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
+ strcmp(type, "SERVER") == 0 ||
+ strcmp(type, "PUBLICATION") == 0 ||
+ strcmp(type, "SUBSCRIPTION") == 0 ||
+ strcmp(type, "USER MAPPING") == 0)
+ {
+ appendPQExpBuffer(buf, "%s ", type);
+ if (te->namespace && *te->namespace)
+ appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
+ appendPQExpBufferStr(buf, fmtId(te->tag));
+ return;
+ }
+
+ /* BLOBs just have a name, but it's numeric so must not use fmtId */
+ if (strcmp(type, "BLOB") == 0)
+ {
+ appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+ return;
+ }
+
+ /*
+ * These object types require additional decoration. Fortunately, the
+ * information needed is exactly what's in the DROP command.
+ */
+ if (strcmp(type, "AGGREGATE") == 0 ||
+ strcmp(type, "FUNCTION") == 0 ||
+ strcmp(type, "OPERATOR") == 0 ||
+ strcmp(type, "OPERATOR CLASS") == 0 ||
+ strcmp(type, "OPERATOR FAMILY") == 0 ||
+ strcmp(type, "PROCEDURE") == 0)
+ {
+ /* Chop "DROP " off the front and make a modifiable copy */
+ char *first = pg_strdup(te->dropStmt + 5);
+ char *last;
+
+ /* point to last character in string */
+ last = first + strlen(first) - 1;
+
+ /* Strip off any ';' or '\n' at the end */
+ while (last >= first && (*last == '\n' || *last == ';'))
+ last--;
+ *(last + 1) = '\0';
+
+ appendPQExpBufferStr(buf, first);
+
+ free(first);
+ return;
+ }
+
+ pg_log_warning("don't know how to set owner for object type \"%s\"",
+ type);
+}
+
+/*
+ * Emit the SQL commands to create the object represented by a TOC entry
+ *
+ * This now also includes issuing an ALTER OWNER command to restore the
+ * object's ownership, if wanted. But note that the object's permissions
+ * will remain at default, until the matching ACL TOC entry is restored.
+ */
+static void
+_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /* Select owner, schema, tablespace and default AM as necessary */
+ _becomeOwner(AH, te);
+ _selectOutputSchema(AH, te->namespace);
+ _selectTablespace(AH, te->tablespace);
+ _selectTableAccessMethod(AH, te->tableam);
+
+ /* Emit header comment for item */
+ if (!AH->noTocComments)
+ {
+ const char *pfx;
+ char *sanitized_name;
+ char *sanitized_schema;
+ char *sanitized_owner;
+
+ if (isData)
+ pfx = "Data for ";
+ else
+ pfx = "";
+
+ ahprintf(AH, "--\n");
+ if (AH->public.verbose)
+ {
+ ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
+ te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
+ if (te->nDeps > 0)
+ {
+ int i;
+
+ ahprintf(AH, "-- Dependencies:");
+ for (i = 0; i < te->nDeps; i++)
+ ahprintf(AH, " %d", te->dependencies[i]);
+ ahprintf(AH, "\n");
+ }
+ }
+
+ sanitized_name = sanitize_line(te->tag, false);
+ sanitized_schema = sanitize_line(te->namespace, true);
+ sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
+
+ ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
+ pfx, sanitized_name, te->desc, sanitized_schema,
+ sanitized_owner);
+
+ free(sanitized_name);
+ free(sanitized_schema);
+ free(sanitized_owner);
+
+ if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
+ {
+ char *sanitized_tablespace;
+
+ sanitized_tablespace = sanitize_line(te->tablespace, false);
+ ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
+ free(sanitized_tablespace);
+ }
+ ahprintf(AH, "\n");
+
+ if (AH->PrintExtraTocPtr != NULL)
+ AH->PrintExtraTocPtr(AH, te);
+ ahprintf(AH, "--\n\n");
+ }
+
+ /*
+ * Actually print the definition.
+ *
+ * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
+ * versions put into CREATE SCHEMA. We have to do this when --no-owner
+ * mode is selected. This is ugly, but I see no other good way ...
+ */
+ if (ropt->noOwner && strcmp(te->desc, "SCHEMA") == 0)
+ {
+ ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
+ }
+ else
+ {
+ if (te->defn && strlen(te->defn) > 0)
+ ahprintf(AH, "%s\n\n", te->defn);
+ }
+
+ /*
+ * If we aren't using SET SESSION AUTH to determine ownership, we must
+ * instead issue an ALTER OWNER command. We assume that anything without
+ * a DROP command is not a separately ownable object. All the categories
+ * with DROP commands must appear in one list or the other.
+ */
+ if (!ropt->noOwner && !ropt->use_setsessauth &&
+ te->owner && strlen(te->owner) > 0 &&
+ te->dropStmt && strlen(te->dropStmt) > 0)
+ {
+ if (strcmp(te->desc, "AGGREGATE") == 0 ||
+ strcmp(te->desc, "BLOB") == 0 ||
+ strcmp(te->desc, "COLLATION") == 0 ||
+ strcmp(te->desc, "CONVERSION") == 0 ||
+ strcmp(te->desc, "DATABASE") == 0 ||
+ strcmp(te->desc, "DOMAIN") == 0 ||
+ strcmp(te->desc, "FUNCTION") == 0 ||
+ strcmp(te->desc, "OPERATOR") == 0 ||
+ strcmp(te->desc, "OPERATOR CLASS") == 0 ||
+ strcmp(te->desc, "OPERATOR FAMILY") == 0 ||
+ strcmp(te->desc, "PROCEDURE") == 0 ||
+ strcmp(te->desc, "PROCEDURAL LANGUAGE") == 0 ||
+ strcmp(te->desc, "SCHEMA") == 0 ||
+ strcmp(te->desc, "EVENT TRIGGER") == 0 ||
+ strcmp(te->desc, "TABLE") == 0 ||
+ strcmp(te->desc, "TYPE") == 0 ||
+ strcmp(te->desc, "VIEW") == 0 ||
+ strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
+ strcmp(te->desc, "SEQUENCE") == 0 ||
+ strcmp(te->desc, "FOREIGN TABLE") == 0 ||
+ strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
+ strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
+ strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
+ strcmp(te->desc, "SERVER") == 0 ||
+ strcmp(te->desc, "STATISTICS") == 0 ||
+ strcmp(te->desc, "PUBLICATION") == 0 ||
+ strcmp(te->desc, "SUBSCRIPTION") == 0)
+ {
+ PQExpBuffer temp = createPQExpBuffer();
+
+ appendPQExpBufferStr(temp, "ALTER ");
+ _getObjectDescription(temp, te);
+ appendPQExpBuffer(temp, " OWNER TO %s;", fmtId(te->owner));
+ ahprintf(AH, "%s\n\n", temp->data);
+ destroyPQExpBuffer(temp);
+ }
+ else if (strcmp(te->desc, "CAST") == 0 ||
+ strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
+ strcmp(te->desc, "CONSTRAINT") == 0 ||
+ strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
+ strcmp(te->desc, "DEFAULT") == 0 ||
+ strcmp(te->desc, "FK CONSTRAINT") == 0 ||
+ strcmp(te->desc, "INDEX") == 0 ||
+ strcmp(te->desc, "RULE") == 0 ||
+ strcmp(te->desc, "TRIGGER") == 0 ||
+ strcmp(te->desc, "ROW SECURITY") == 0 ||
+ strcmp(te->desc, "POLICY") == 0 ||
+ strcmp(te->desc, "USER MAPPING") == 0)
+ {
+ /* these object types don't have separate owners */
+ }
+ else
+ {
+ pg_log_warning("don't know how to set owner for object type \"%s\"",
+ te->desc);
+ }
+ }
+
+ /*
+ * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
+ * commands, so we can no longer assume we know the current auth setting.
+ */
+ if (_tocEntryIsACL(te))
+ {
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = NULL;
+ }
+}
+
+/*
+ * Sanitize a string to be included in an SQL comment or TOC listing, by
+ * replacing any newlines with spaces. This ensures each logical output line
+ * is in fact one physical output line, to prevent corruption of the dump
+ * (which could, in the worst case, present an SQL injection vulnerability
+ * if someone were to incautiously load a dump containing objects with
+ * maliciously crafted names).
+ *
+ * The result is a freshly malloc'd string. If the input string is NULL,
+ * return a malloc'ed empty string, unless want_hyphen, in which case return a
+ * malloc'ed hyphen.
+ *
+ * Note that we currently don't bother to quote names, meaning that the name
+ * fields aren't automatically parseable. "pg_restore -L" doesn't care because
+ * it only examines the dumpId field, but someday we might want to try harder.
+ */
+static char *
+sanitize_line(const char *str, bool want_hyphen)
+{
+ char *result;
+ char *s;
+
+ if (!str)
+ return pg_strdup(want_hyphen ? "-" : "");
+
+ result = pg_strdup(str);
+
+ for (s = result; *s != '\0'; s++)
+ {
+ if (*s == '\n' || *s == '\r')
+ *s = ' ';
+ }
+
+ return result;
+}
+
+/*
+ * Write the file header for a custom-format archive
+ */
+void
+WriteHead(ArchiveHandle *AH)
+{
+ struct tm crtm;
+
+ AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
+ AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
+ AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
+ AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
+ AH->WriteBytePtr(AH, AH->intSize);
+ AH->WriteBytePtr(AH, AH->offSize);
+ AH->WriteBytePtr(AH, AH->format);
+ WriteInt(AH, AH->compression);
+ crtm = *localtime(&AH->createDate);
+ WriteInt(AH, crtm.tm_sec);
+ WriteInt(AH, crtm.tm_min);
+ WriteInt(AH, crtm.tm_hour);
+ WriteInt(AH, crtm.tm_mday);
+ WriteInt(AH, crtm.tm_mon);
+ WriteInt(AH, crtm.tm_year);
+ WriteInt(AH, crtm.tm_isdst);
+ WriteStr(AH, PQdb(AH->connection));
+ WriteStr(AH, AH->public.remoteVersionStr);
+ WriteStr(AH, PG_VERSION);
+}
+
+void
+ReadHead(ArchiveHandle *AH)
+{
+ char vmaj,
+ vmin,
+ vrev;
+ int fmt;
+
+ /*
+ * If we haven't already read the header, do so.
+ *
+ * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
+ * way to unify the cases?
+ */
+ if (!AH->readHeader)
+ {
+ char tmpMag[7];
+
+ AH->ReadBufPtr(AH, tmpMag, 5);
+
+ if (strncmp(tmpMag, "PGDMP", 5) != 0)
+ fatal("did not find magic string in file header");
+ }
+
+ vmaj = AH->ReadBytePtr(AH);
+ vmin = AH->ReadBytePtr(AH);
+
+ if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
+ vrev = AH->ReadBytePtr(AH);
+ else
+ vrev = 0;
+
+ AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
+
+ if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
+ fatal("unsupported version (%d.%d) in file header",
+ vmaj, vmin);
+
+ AH->intSize = AH->ReadBytePtr(AH);
+ if (AH->intSize > 32)
+ fatal("sanity check on integer size (%lu) failed",
+ (unsigned long) AH->intSize);
+
+ if (AH->intSize > sizeof(int))
+ pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
+
+ if (AH->version >= K_VERS_1_7)
+ AH->offSize = AH->ReadBytePtr(AH);
+ else
+ AH->offSize = AH->intSize;
+
+ fmt = AH->ReadBytePtr(AH);
+
+ if (AH->format != fmt)
+ fatal("expected format (%d) differs from format found in file (%d)",
+ AH->format, fmt);
+
+ if (AH->version >= K_VERS_1_2)
+ {
+ if (AH->version < K_VERS_1_4)
+ AH->compression = AH->ReadBytePtr(AH);
+ else
+ AH->compression = ReadInt(AH);
+ }
+ else
+ AH->compression = Z_DEFAULT_COMPRESSION;
+
+#ifndef HAVE_LIBZ
+ if (AH->compression != 0)
+ pg_log_warning("archive is compressed, but this installation does not support compression -- no data will be available");
+#endif
+
+ if (AH->version >= K_VERS_1_4)
+ {
+ struct tm crtm;
+
+ crtm.tm_sec = ReadInt(AH);
+ crtm.tm_min = ReadInt(AH);
+ crtm.tm_hour = ReadInt(AH);
+ crtm.tm_mday = ReadInt(AH);
+ crtm.tm_mon = ReadInt(AH);
+ crtm.tm_year = ReadInt(AH);
+ crtm.tm_isdst = ReadInt(AH);
+
+ /*
+ * Newer versions of glibc have mktime() report failure if tm_isdst is
+ * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
+ * TZ=UTC. This is problematic when restoring an archive under a
+ * different timezone setting. If we get a failure, try again with
+ * tm_isdst set to -1 ("don't know").
+ *
+ * XXX with or without this hack, we reconstruct createDate
+ * incorrectly when the prevailing timezone is different from
+ * pg_dump's. Next time we bump the archive version, we should flush
+ * this representation and store a plain seconds-since-the-Epoch
+ * timestamp instead.
+ */
+ AH->createDate = mktime(&crtm);
+ if (AH->createDate == (time_t) -1)
+ {
+ crtm.tm_isdst = -1;
+ AH->createDate = mktime(&crtm);
+ if (AH->createDate == (time_t) -1)
+ pg_log_warning("invalid creation date in header");
+ }
+ }
+
+ if (AH->version >= K_VERS_1_4)
+ {
+ AH->archdbname = ReadStr(AH);
+ }
+
+ if (AH->version >= K_VERS_1_10)
+ {
+ AH->archiveRemoteVersion = ReadStr(AH);
+ AH->archiveDumpVersion = ReadStr(AH);
+ }
+}
+
+
+/*
+ * checkSeek
+ * check to see if ftell/fseek can be performed.
+ */
+bool
+checkSeek(FILE *fp)
+{
+ pgoff_t tpos;
+
+ /* Check that ftello works on this file */
+ tpos = ftello(fp);
+ if (tpos < 0)
+ return false;
+
+ /*
+ * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
+ * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
+ * successful no-op even on files that are otherwise unseekable.
+ */
+ if (fseeko(fp, tpos, SEEK_SET) != 0)
+ return false;
+
+ return true;
+}
+
+
+/*
+ * dumpTimestamp
+ */
+static void
+dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
+{
+ char buf[64];
+
+ if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
+ ahprintf(AH, "-- %s %s\n\n", msg, buf);
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Parallel restore is done in three phases. In this first phase,
+ * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
+ * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
+ * PRE_DATA items other than ACLs.) Entries we can't process now are
+ * added to the pending_list for later phases to deal with.
+ */
+static void
+restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
+{
+ bool skipped_some;
+ TocEntry *next_work_item;
+
+ pg_log_debug("entering restore_toc_entries_prefork");
+
+ /* Adjust dependency information */
+ fix_dependencies(AH);
+
+ /*
+ * Do all the early stuff in a single connection in the parent. There's no
+ * great point in running it in parallel, in fact it will actually run
+ * faster in a single connection because we avoid all the connection and
+ * setup overhead. Also, pre-9.2 pg_dump versions were not very good
+ * about showing all the dependencies of SECTION_PRE_DATA items, so we do
+ * not risk trying to process them out-of-order.
+ *
+ * Stuff that we can't do immediately gets added to the pending_list.
+ * Note: we don't yet filter out entries that aren't going to be restored.
+ * They might participate in dependency chains connecting entries that
+ * should be restored, so we treat them as live until we actually process
+ * them.
+ *
+ * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
+ * before DATA items, and all DATA items before POST_DATA items. That is
+ * not certain to be true in older archives, though, and in any case use
+ * of a list file would destroy that ordering (cf. SortTocFromFile). So
+ * this loop cannot assume that it holds.
+ */
+ AH->restorePass = RESTORE_PASS_MAIN;
+ skipped_some = false;
+ for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
+ {
+ bool do_now = true;
+
+ if (next_work_item->section != SECTION_PRE_DATA)
+ {
+ /* DATA and POST_DATA items are just ignored for now */
+ if (next_work_item->section == SECTION_DATA ||
+ next_work_item->section == SECTION_POST_DATA)
+ {
+ do_now = false;
+ skipped_some = true;
+ }
+ else
+ {
+ /*
+ * SECTION_NONE items, such as comments, can be processed now
+ * if we are still in the PRE_DATA part of the archive. Once
+ * we've skipped any items, we have to consider whether the
+ * comment's dependencies are satisfied, so skip it for now.
+ */
+ if (skipped_some)
+ do_now = false;
+ }
+ }
+
+ /*
+ * Also skip items that need to be forced into later passes. We need
+ * not set skipped_some in this case, since by assumption no main-pass
+ * items could depend on these.
+ */
+ if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
+ do_now = false;
+
+ if (do_now)
+ {
+ /* OK, restore the item and update its dependencies */
+ pg_log_info("processing item %d %s %s",
+ next_work_item->dumpId,
+ next_work_item->desc, next_work_item->tag);
+
+ (void) restore_toc_entry(AH, next_work_item, false);
+
+ /* Reduce dependencies, but don't move anything to ready_list */
+ reduce_dependencies(AH, next_work_item, NULL);
+ }
+ else
+ {
+ /* Nope, so add it to pending_list */
+ pending_list_append(pending_list, next_work_item);
+ }
+ }
+
+ /*
+ * Now close parent connection in prep for parallel steps. We do this
+ * mainly to ensure that we don't exceed the specified number of parallel
+ * connections.
+ */
+ DisconnectDatabase(&AH->public);
+
+ /* blow away any transient state from the old connection */
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = NULL;
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = NULL;
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ AH->currTablespace = NULL;
+ if (AH->currTableAm)
+ free(AH->currTableAm);
+ AH->currTableAm = NULL;
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Parallel restore is done in three phases. In this second phase,
+ * we process entries by dispatching them to parallel worker children
+ * (processes on Unix, threads on Windows), each of which connects
+ * separately to the database. Inter-entry dependencies are respected,
+ * and so is the RestorePass multi-pass structure. When we can no longer
+ * make any entries ready to process, we exit. Normally, there will be
+ * nothing left to do; but if there is, the third phase will mop up.
+ */
+static void
+restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
+ TocEntry *pending_list)
+{
+ ParallelReadyList ready_list;
+ TocEntry *next_work_item;
+
+ pg_log_debug("entering restore_toc_entries_parallel");
+
+ /* Set up ready_list with enough room for all known TocEntrys */
+ ready_list_init(&ready_list, AH->tocCount);
+
+ /*
+ * The pending_list contains all items that we need to restore. Move all
+ * items that are available to process immediately into the ready_list.
+ * After this setup, the pending list is everything that needs to be done
+ * but is blocked by one or more dependencies, while the ready list
+ * contains items that have no remaining dependencies and are OK to
+ * process in the current restore pass.
+ */
+ AH->restorePass = RESTORE_PASS_MAIN;
+ move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+
+ /*
+ * main parent loop
+ *
+ * Keep going until there is no worker still running AND there is no work
+ * left to be done. Note invariant: at top of loop, there should always
+ * be at least one worker available to dispatch a job to.
+ */
+ pg_log_info("entering main parallel loop");
+
+ for (;;)
+ {
+ /* Look for an item ready to be dispatched to a worker */
+ next_work_item = pop_next_work_item(&ready_list, pstate);
+ if (next_work_item != NULL)
+ {
+ /* If not to be restored, don't waste time launching a worker */
+ if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
+ {
+ pg_log_info("skipping item %d %s %s",
+ next_work_item->dumpId,
+ next_work_item->desc, next_work_item->tag);
+ /* Update its dependencies as though we'd completed it */
+ reduce_dependencies(AH, next_work_item, &ready_list);
+ /* Loop around to see if anything else can be dispatched */
+ continue;
+ }
+
+ pg_log_info("launching item %d %s %s",
+ next_work_item->dumpId,
+ next_work_item->desc, next_work_item->tag);
+
+ /* Dispatch to some worker */
+ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
+ mark_restore_job_done, &ready_list);
+ }
+ else if (IsEveryWorkerIdle(pstate))
+ {
+ /*
+ * Nothing is ready and no worker is running, so we're done with
+ * the current pass or maybe with the whole process.
+ */
+ if (AH->restorePass == RESTORE_PASS_LAST)
+ break; /* No more parallel processing is possible */
+
+ /* Advance to next restore pass */
+ AH->restorePass++;
+ /* That probably allows some stuff to be made ready */
+ move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+ /* Loop around to see if anything's now ready */
+ continue;
+ }
+ else
+ {
+ /*
+ * We have nothing ready, but at least one child is working, so
+ * wait for some subjob to finish.
+ */
+ }
+
+ /*
+ * Before dispatching another job, check to see if anything has
+ * finished. We should check every time through the loop so as to
+ * reduce dependencies as soon as possible. If we were unable to
+ * dispatch any job this time through, wait until some worker finishes
+ * (and, hopefully, unblocks some pending item). If we did dispatch
+ * something, continue as soon as there's at least one idle worker.
+ * Note that in either case, there's guaranteed to be at least one
+ * idle worker when we return to the top of the loop. This ensures we
+ * won't block inside DispatchJobForTocEntry, which would be
+ * undesirable: we'd rather postpone dispatching until we see what's
+ * been unblocked by finished jobs.
+ */
+ WaitForWorkers(AH, pstate,
+ next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
+ }
+
+ /* There should now be nothing in ready_list. */
+ Assert(ready_list.first_te > ready_list.last_te);
+
+ ready_list_free(&ready_list);
+
+ pg_log_info("finished main parallel loop");
+}
+
+/*
+ * Main engine for parallel restore.
+ *
+ * Parallel restore is done in three phases. In this third phase,
+ * we mop up any remaining TOC entries by processing them serially.
+ * This phase normally should have nothing to do, but if we've somehow
+ * gotten stuck due to circular dependencies or some such, this provides
+ * at least some chance of completing the restore successfully.
+ */
+static void
+restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
+{
+ RestoreOptions *ropt = AH->public.ropt;
+ TocEntry *te;
+
+ pg_log_debug("entering restore_toc_entries_postfork");
+
+ /*
+ * Now reconnect the single parent connection.
+ */
+ ConnectDatabase((Archive *) AH, &ropt->cparams, true);
+
+ /* re-establish fixed state */
+ _doSetFixedOutputState(AH);
+
+ /*
+ * Make sure there is no work left due to, say, circular dependencies, or
+ * some other pathological condition. If so, do it in the single parent
+ * connection. We don't sweat about RestorePass ordering; it's likely we
+ * already violated that.
+ */
+ for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
+ {
+ pg_log_info("processing missed item %d %s %s",
+ te->dumpId, te->desc, te->tag);
+ (void) restore_toc_entry(AH, te, false);
+ }
+}
+
+/*
+ * Check if te1 has an exclusive lock requirement for an item that te2 also
+ * requires, whether or not te2's requirement is for an exclusive lock.
+ */
+static bool
+has_lock_conflicts(TocEntry *te1, TocEntry *te2)
+{
+ int j,
+ k;
+
+ for (j = 0; j < te1->nLockDeps; j++)
+ {
+ for (k = 0; k < te2->nDeps; k++)
+ {
+ if (te1->lockDeps[j] == te2->dependencies[k])
+ return true;
+ }
+ }
+ return false;
+}
+
+
+/*
+ * Initialize the header of the pending-items list.
+ *
+ * This is a circular list with a dummy TocEntry as header, just like the
+ * main TOC list; but we use separate list links so that an entry can be in
+ * the main TOC list as well as in the pending list.
+ */
+static void
+pending_list_header_init(TocEntry *l)
+{
+ l->pending_prev = l->pending_next = l;
+}
+
+/* Append te to the end of the pending-list headed by l */
+static void
+pending_list_append(TocEntry *l, TocEntry *te)
+{
+ te->pending_prev = l->pending_prev;
+ l->pending_prev->pending_next = te;
+ l->pending_prev = te;
+ te->pending_next = l;
+}
+
+/* Remove te from the pending-list */
+static void
+pending_list_remove(TocEntry *te)
+{
+ te->pending_prev->pending_next = te->pending_next;
+ te->pending_next->pending_prev = te->pending_prev;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
+}
+
+
+/*
+ * Initialize the ready_list with enough room for up to tocCount entries.
+ */
+static void
+ready_list_init(ParallelReadyList *ready_list, int tocCount)
+{
+ ready_list->tes = (TocEntry **)
+ pg_malloc(tocCount * sizeof(TocEntry *));
+ ready_list->first_te = 0;
+ ready_list->last_te = -1;
+ ready_list->sorted = false;
+}
+
+/*
+ * Free storage for a ready_list.
+ */
+static void
+ready_list_free(ParallelReadyList *ready_list)
+{
+ pg_free(ready_list->tes);
+}
+
+/* Add te to the ready_list */
+static void
+ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
+{
+ ready_list->tes[++ready_list->last_te] = te;
+ /* List is (probably) not sorted anymore. */
+ ready_list->sorted = false;
+}
+
+/* Remove the i'th entry in the ready_list */
+static void
+ready_list_remove(ParallelReadyList *ready_list, int i)
+{
+ int f = ready_list->first_te;
+
+ Assert(i >= f && i <= ready_list->last_te);
+
+ /*
+ * In the typical case where the item to be removed is the first ready
+ * entry, we need only increment first_te to remove it. Otherwise, move
+ * the entries before it to compact the list. (This preserves sortedness,
+ * if any.) We could alternatively move the entries after i, but there
+ * are typically many more of those.
+ */
+ if (i > f)
+ {
+ TocEntry **first_te_ptr = &ready_list->tes[f];
+
+ memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
+ }
+ ready_list->first_te++;
+}
+
+/* Sort the ready_list into the desired order */
+static void
+ready_list_sort(ParallelReadyList *ready_list)
+{
+ if (!ready_list->sorted)
+ {
+ int n = ready_list->last_te - ready_list->first_te + 1;
+
+ if (n > 1)
+ qsort(ready_list->tes + ready_list->first_te, n,
+ sizeof(TocEntry *),
+ TocEntrySizeCompare);
+ ready_list->sorted = true;
+ }
+}
+
+/* qsort comparator for sorting TocEntries by dataLength */
+static int
+TocEntrySizeCompare(const void *p1, const void *p2)
+{
+ const TocEntry *te1 = *(const TocEntry *const *) p1;
+ const TocEntry *te2 = *(const TocEntry *const *) p2;
+
+ /* Sort by decreasing dataLength */
+ if (te1->dataLength > te2->dataLength)
+ return -1;
+ if (te1->dataLength < te2->dataLength)
+ return 1;
+
+ /* For equal dataLengths, sort by dumpId, just to be stable */
+ if (te1->dumpId < te2->dumpId)
+ return -1;
+ if (te1->dumpId > te2->dumpId)
+ return 1;
+
+ return 0;
+}
+
+
+/*
+ * Move all immediately-ready items from pending_list to ready_list.
+ *
+ * Items are considered ready if they have no remaining dependencies and
+ * they belong in the current restore pass. (See also reduce_dependencies,
+ * which applies the same logic one-at-a-time.)
+ */
+static void
+move_to_ready_list(TocEntry *pending_list,
+ ParallelReadyList *ready_list,
+ RestorePass pass)
+{
+ TocEntry *te;
+ TocEntry *next_te;
+
+ for (te = pending_list->pending_next; te != pending_list; te = next_te)
+ {
+ /* must save list link before possibly removing te from list */
+ next_te = te->pending_next;
+
+ if (te->depCount == 0 &&
+ _tocEntryRestorePass(te) == pass)
+ {
+ /* Remove it from pending_list ... */
+ pending_list_remove(te);
+ /* ... and add to ready_list */
+ ready_list_insert(ready_list, te);
+ }
+ }
+}
+
+/*
+ * Find the next work item (if any) that is capable of being run now,
+ * and remove it from the ready_list.
+ *
+ * Returns the item, or NULL if nothing is runnable.
+ *
+ * To qualify, the item must have no remaining dependencies
+ * and no requirements for locks that are incompatible with
+ * items currently running. Items in the ready_list are known to have
+ * no remaining dependencies, but we have to check for lock conflicts.
+ */
+static TocEntry *
+pop_next_work_item(ParallelReadyList *ready_list,
+ ParallelState *pstate)
+{
+ /*
+ * Sort the ready_list so that we'll tackle larger jobs first.
+ */
+ ready_list_sort(ready_list);
+
+ /*
+ * Search the ready_list until we find a suitable item.
+ */
+ for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+ {
+ TocEntry *te = ready_list->tes[i];
+ bool conflicts = false;
+
+ /*
+ * Check to see if the item would need exclusive lock on something
+ * that a currently running item also needs lock on, or vice versa. If
+ * so, we don't want to schedule them together.
+ */
+ for (int k = 0; k < pstate->numWorkers; k++)
+ {
+ TocEntry *running_te = pstate->te[k];
+
+ if (running_te == NULL)
+ continue;
+ if (has_lock_conflicts(te, running_te) ||
+ has_lock_conflicts(running_te, te))
+ {
+ conflicts = true;
+ break;
+ }
+ }
+
+ if (conflicts)
+ continue;
+
+ /* passed all tests, so this item can run */
+ ready_list_remove(ready_list, i);
+ return te;
+ }
+
+ pg_log_debug("no item ready");
+ return NULL;
+}
+
+
+/*
+ * Restore a single TOC item in parallel with others
+ *
+ * this is run in the worker, i.e. in a thread (Windows) or a separate process
+ * (everything else). A worker process executes several such work items during
+ * a parallel backup or restore. Once we terminate here and report back that
+ * our work is finished, the leader process will assign us a new work item.
+ */
+int
+parallel_restore(ArchiveHandle *AH, TocEntry *te)
+{
+ int status;
+
+ Assert(AH->connection != NULL);
+
+ /* Count only errors associated with this TOC entry */
+ AH->public.n_errors = 0;
+
+ /* Restore the TOC item */
+ status = restore_toc_entry(AH, te, true);
+
+ return status;
+}
+
+
+/*
+ * Callback function that's invoked in the leader process after a step has
+ * been parallel restored.
+ *
+ * Update status and reduce the dependency count of any dependent items.
+ */
+static void
+mark_restore_job_done(ArchiveHandle *AH,
+ TocEntry *te,
+ int status,
+ void *callback_data)
+{
+ ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+
+ pg_log_info("finished item %d %s %s",
+ te->dumpId, te->desc, te->tag);
+
+ if (status == WORKER_CREATE_DONE)
+ mark_create_done(AH, te);
+ else if (status == WORKER_INHIBIT_DATA)
+ {
+ inhibit_data_for_failed_table(AH, te);
+ AH->public.n_errors++;
+ }
+ else if (status == WORKER_IGNORED_ERRORS)
+ AH->public.n_errors++;
+ else if (status != 0)
+ fatal("worker process failed: exit code %d",
+ status);
+
+ reduce_dependencies(AH, te, ready_list);
+}
+
+
+/*
+ * Process the dependency information into a form useful for parallel restore.
+ *
+ * This function takes care of fixing up some missing or badly designed
+ * dependencies, and then prepares subsidiary data structures that will be
+ * used in the main parallel-restore logic, including:
+ * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
+ * 2. We set up depCount fields that are the number of as-yet-unprocessed
+ * dependencies for each TOC entry.
+ *
+ * We also identify locking dependencies so that we can avoid trying to
+ * schedule conflicting items at the same time.
+ */
+static void
+fix_dependencies(ArchiveHandle *AH)
+{
+ TocEntry *te;
+ int i;
+
+ /*
+ * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
+ * items are marked as not being in any parallel-processing list.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ te->depCount = te->nDeps;
+ te->revDeps = NULL;
+ te->nRevDeps = 0;
+ te->pending_prev = NULL;
+ te->pending_next = NULL;
+ }
+
+ /*
+ * POST_DATA items that are shown as depending on a table need to be
+ * re-pointed to depend on that table's data, instead. This ensures they
+ * won't get scheduled until the data has been loaded.
+ */
+ repoint_table_dependencies(AH);
+
+ /*
+ * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
+ * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
+ * one BLOB COMMENTS in such files.)
+ */
+ if (AH->version < K_VERS_1_11)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
+ {
+ TocEntry *te2;
+
+ for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
+ {
+ if (strcmp(te2->desc, "BLOBS") == 0)
+ {
+ te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
+ te->dependencies[0] = te2->dumpId;
+ te->nDeps++;
+ te->depCount++;
+ break;
+ }
+ }
+ break;
+ }
+ }
+ }
+
+ /*
+ * At this point we start to build the revDeps reverse-dependency arrays,
+ * so all changes of dependencies must be complete.
+ */
+
+ /*
+ * Count the incoming dependencies for each item. Also, it is possible
+ * that the dependencies list items that are not in the archive at all
+ * (that should not happen in 9.2 and later, but is highly likely in older
+ * archives). Subtract such items from the depCounts.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ for (i = 0; i < te->nDeps; i++)
+ {
+ DumpId depid = te->dependencies[i];
+
+ if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
+ AH->tocsByDumpId[depid]->nRevDeps++;
+ else
+ te->depCount--;
+ }
+ }
+
+ /*
+ * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
+ * it as a counter below.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->nRevDeps > 0)
+ te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
+ te->nRevDeps = 0;
+ }
+
+ /*
+ * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
+ * better agree with the loops above.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ for (i = 0; i < te->nDeps; i++)
+ {
+ DumpId depid = te->dependencies[i];
+
+ if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
+ {
+ TocEntry *otherte = AH->tocsByDumpId[depid];
+
+ otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
+ }
+ }
+ }
+
+ /*
+ * Lastly, work out the locking dependencies.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ te->lockDeps = NULL;
+ te->nLockDeps = 0;
+ identify_locking_dependencies(AH, te);
+ }
+}
+
+/*
+ * Change dependencies on table items to depend on table data items instead,
+ * but only in POST_DATA items.
+ *
+ * Also, for any item having such dependency(s), set its dataLength to the
+ * largest dataLength of the table data items it depends on. This ensures
+ * that parallel restore will prioritize larger jobs (index builds, FK
+ * constraint checks, etc) over smaller ones, avoiding situations where we
+ * end a restore with only one active job working on a large table.
+ */
+static void
+repoint_table_dependencies(ArchiveHandle *AH)
+{
+ TocEntry *te;
+ int i;
+ DumpId olddep;
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (te->section != SECTION_POST_DATA)
+ continue;
+ for (i = 0; i < te->nDeps; i++)
+ {
+ olddep = te->dependencies[i];
+ if (olddep <= AH->maxDumpId &&
+ AH->tableDataId[olddep] != 0)
+ {
+ DumpId tabledataid = AH->tableDataId[olddep];
+ TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
+
+ te->dependencies[i] = tabledataid;
+ te->dataLength = Max(te->dataLength, tabledatate->dataLength);
+ pg_log_debug("transferring dependency %d -> %d to %d",
+ te->dumpId, olddep, tabledataid);
+ }
+ }
+ }
+}
+
+/*
+ * Identify which objects we'll need exclusive lock on in order to restore
+ * the given TOC entry (*other* than the one identified by the TOC entry
+ * itself). Record their dump IDs in the entry's lockDeps[] array.
+ */
+static void
+identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
+{
+ DumpId *lockids;
+ int nlockids;
+ int i;
+
+ /*
+ * We only care about this for POST_DATA items. PRE_DATA items are not
+ * run in parallel, and DATA items are all independent by assumption.
+ */
+ if (te->section != SECTION_POST_DATA)
+ return;
+
+ /* Quick exit if no dependencies at all */
+ if (te->nDeps == 0)
+ return;
+
+ /*
+ * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
+ * and hence require exclusive lock. However, we know that CREATE INDEX
+ * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
+ * category too ... but today is not that day.)
+ */
+ if (strcmp(te->desc, "INDEX") == 0)
+ return;
+
+ /*
+ * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
+ * item listed among its dependencies. Originally all of these would have
+ * been TABLE items, but repoint_table_dependencies would have repointed
+ * them to the TABLE DATA items if those are present (which they might not
+ * be, eg in a schema-only dump). Note that all of the entries we are
+ * processing here are POST_DATA; otherwise there might be a significant
+ * difference between a dependency on a table and a dependency on its
+ * data, so that closer analysis would be needed here.
+ */
+ lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
+ nlockids = 0;
+ for (i = 0; i < te->nDeps; i++)
+ {
+ DumpId depid = te->dependencies[i];
+
+ if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
+ ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
+ strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
+ lockids[nlockids++] = depid;
+ }
+
+ if (nlockids == 0)
+ {
+ free(lockids);
+ return;
+ }
+
+ te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
+ te->nLockDeps = nlockids;
+}
+
+/*
+ * Remove the specified TOC entry from the depCounts of items that depend on
+ * it, thereby possibly making them ready-to-run. Any pending item that
+ * becomes ready should be moved to the ready_list, if that's provided.
+ */
+static void
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+ ParallelReadyList *ready_list)
+{
+ int i;
+
+ pg_log_debug("reducing dependencies for %d", te->dumpId);
+
+ for (i = 0; i < te->nRevDeps; i++)
+ {
+ TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
+
+ Assert(otherte->depCount > 0);
+ otherte->depCount--;
+
+ /*
+ * It's ready if it has no remaining dependencies, and it belongs in
+ * the current restore pass, and it is currently a member of the
+ * pending list (that check is needed to prevent double restore in
+ * some cases where a list-file forces out-of-order restoring).
+ * However, if ready_list == NULL then caller doesn't want any list
+ * memberships changed.
+ */
+ if (otherte->depCount == 0 &&
+ _tocEntryRestorePass(otherte) == AH->restorePass &&
+ otherte->pending_prev != NULL &&
+ ready_list != NULL)
+ {
+ /* Remove it from pending list ... */
+ pending_list_remove(otherte);
+ /* ... and add to ready_list */
+ ready_list_insert(ready_list, otherte);
+ }
+ }
+}
+
+/*
+ * Set the created flag on the DATA member corresponding to the given
+ * TABLE member
+ */
+static void
+mark_create_done(ArchiveHandle *AH, TocEntry *te)
+{
+ if (AH->tableDataId[te->dumpId] != 0)
+ {
+ TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
+
+ ted->created = true;
+ }
+}
+
+/*
+ * Mark the DATA member corresponding to the given TABLE member
+ * as not wanted
+ */
+static void
+inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
+{
+ pg_log_info("table \"%s\" could not be created, will not restore its data",
+ te->tag);
+
+ if (AH->tableDataId[te->dumpId] != 0)
+ {
+ TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
+
+ ted->reqs = 0;
+ }
+}
+
+/*
+ * Clone and de-clone routines used in parallel restoration.
+ *
+ * Enough of the structure is cloned to ensure that there is no
+ * conflict between different threads each with their own clone.
+ */
+ArchiveHandle *
+CloneArchive(ArchiveHandle *AH)
+{
+ ArchiveHandle *clone;
+
+ /* Make a "flat" copy */
+ clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
+ memcpy(clone, AH, sizeof(ArchiveHandle));
+
+ /* Handle format-independent fields */
+ memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
+
+ /* The clone will have its own connection, so disregard connection state */
+ clone->connection = NULL;
+ clone->connCancel = NULL;
+ clone->currUser = NULL;
+ clone->currSchema = NULL;
+ clone->currTablespace = NULL;
+
+ /* savedPassword must be local in case we change it while connecting */
+ if (clone->savedPassword)
+ clone->savedPassword = pg_strdup(clone->savedPassword);
+
+ /* clone has its own error count, too */
+ clone->public.n_errors = 0;
+
+ /*
+ * Connect our new clone object to the database, using the same connection
+ * parameters used for the original connection.
+ */
+ ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);
+
+ /* re-establish fixed state */
+ if (AH->mode == archModeRead)
+ _doSetFixedOutputState(clone);
+ /* in write case, setupDumpWorker will fix up connection state */
+
+ /* Let the format-specific code have a chance too */
+ clone->ClonePtr(clone);
+
+ Assert(clone->connection != NULL);
+ return clone;
+}
+
+/*
+ * Release clone-local storage.
+ *
+ * Note: we assume any clone-local connection was already closed.
+ */
+void
+DeCloneArchive(ArchiveHandle *AH)
+{
+ /* Should not have an open database connection */
+ Assert(AH->connection == NULL);
+
+ /* Clear format-specific state */
+ AH->DeClonePtr(AH);
+
+ /* Clear state allocated by CloneArchive */
+ if (AH->sqlparse.curCmd)
+ destroyPQExpBuffer(AH->sqlparse.curCmd);
+
+ /* Clear any connection-local state */
+ if (AH->currUser)
+ free(AH->currUser);
+ if (AH->currSchema)
+ free(AH->currSchema);
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ if (AH->currTableAm)
+ free(AH->currTableAm);
+ if (AH->savedPassword)
+ free(AH->savedPassword);
+
+ free(AH);
+}