summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_dump/pg_backup_custom.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:19:15 +0000
commit6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch)
tree657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/bin/pg_dump/pg_backup_custom.c
parentInitial commit. (diff)
downloadpostgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.tar.xz
postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.zip
Adding upstream version 13.4.upstream/13.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r--src/bin/pg_dump/pg_backup_custom.c1027
1 files changed, 1027 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
new file mode 100644
index 0000000..895ff76
--- /dev/null
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -0,0 +1,1027 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_backup_custom.c
+ *
+ * Implements the custom output format.
+ *
+ * The comments with the routines in this code are a good place to
+ * understand how to write a new format.
+ *
+ * See the headers to pg_restore for more details.
+ *
+ * Copyright (c) 2000, Philip Warner
+ * Rights are granted to use this software in any way so long
+ * as this notice is not removed.
+ *
+ * The author is not responsible for loss or damages that may
+ * and any liability will be limited to the time taken to fix any
+ * related bug.
+ *
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/pg_backup_custom.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include "common/file_utils.h"
+#include "compress_io.h"
+#include "parallel.h"
+#include "pg_backup_utils.h"
+
+/*--------
+ * Routines in the format interface
+ *--------
+ */
+
+static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
+static void _StartData(ArchiveHandle *AH, TocEntry *te);
+static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
+static void _EndData(ArchiveHandle *AH, TocEntry *te);
+static int _WriteByte(ArchiveHandle *AH, const int i);
+static int _ReadByte(ArchiveHandle *);
+static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
+static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
+static void _CloseArchive(ArchiveHandle *AH);
+static void _ReopenArchive(ArchiveHandle *AH);
+static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
+static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
+static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
+static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
+
+static void _PrintData(ArchiveHandle *AH);
+static void _skipData(ArchiveHandle *AH);
+static void _skipBlobs(ArchiveHandle *AH);
+
+static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
+static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
+static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
+static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
+static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+
+static void _PrepParallelRestore(ArchiveHandle *AH);
+static void _Clone(ArchiveHandle *AH);
+static void _DeClone(ArchiveHandle *AH);
+
+static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+
+typedef struct
+{
+ CompressorState *cs;
+ int hasSeek;
+ /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
+ pgoff_t lastFilePos; /* position after last data block we've read */
+} lclContext;
+
+typedef struct
+{
+ int dataState;
+ pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
+} lclTocEntry;
+
+
+/*------
+ * Static declarations
+ *------
+ */
+static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
+static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
+
+static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
+static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
+
+
+/*
+ * Init routine required by ALL formats. This is a global routine
+ * and should be declared in pg_backup_archiver.h
+ *
+ * It's task is to create any extra archive context (using AH->formatData),
+ * and to initialize the supported function pointers.
+ *
+ * It should also prepare whatever it's input source is for reading/writing,
+ * and in the case of a read mode connection, it should load the Header & TOC.
+ */
+void
+InitArchiveFmt_Custom(ArchiveHandle *AH)
+{
+ lclContext *ctx;
+
+ /* Assuming static functions, this can be copied for each format. */
+ AH->ArchiveEntryPtr = _ArchiveEntry;
+ AH->StartDataPtr = _StartData;
+ AH->WriteDataPtr = _WriteData;
+ AH->EndDataPtr = _EndData;
+ AH->WriteBytePtr = _WriteByte;
+ AH->ReadBytePtr = _ReadByte;
+ AH->WriteBufPtr = _WriteBuf;
+ AH->ReadBufPtr = _ReadBuf;
+ AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = _ReopenArchive;
+ AH->PrintTocDataPtr = _PrintTocData;
+ AH->ReadExtraTocPtr = _ReadExtraToc;
+ AH->WriteExtraTocPtr = _WriteExtraToc;
+ AH->PrintExtraTocPtr = _PrintExtraToc;
+
+ AH->StartBlobsPtr = _StartBlobs;
+ AH->StartBlobPtr = _StartBlob;
+ AH->EndBlobPtr = _EndBlob;
+ AH->EndBlobsPtr = _EndBlobs;
+
+ AH->PrepParallelRestorePtr = _PrepParallelRestore;
+ AH->ClonePtr = _Clone;
+ AH->DeClonePtr = _DeClone;
+
+ /* no parallel dump in the custom archive, only parallel restore */
+ AH->WorkerJobDumpPtr = NULL;
+ AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+
+ /* Set up a private area. */
+ ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
+ AH->formatData = (void *) ctx;
+
+ /* Initialize LO buffering */
+ AH->lo_buf_size = LOBBUFSIZE;
+ AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
+
+ /*
+ * Now open the file
+ */
+ if (AH->mode == archModeWrite)
+ {
+ if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
+ {
+ AH->FH = fopen(AH->fSpec, PG_BINARY_W);
+ if (!AH->FH)
+ fatal("could not open output file \"%s\": %m", AH->fSpec);
+ }
+ else
+ {
+ AH->FH = stdout;
+ if (!AH->FH)
+ fatal("could not open output file: %m");
+ }
+
+ ctx->hasSeek = checkSeek(AH->FH);
+ }
+ else
+ {
+ if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
+ {
+ AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+ if (!AH->FH)
+ fatal("could not open input file \"%s\": %m", AH->fSpec);
+ }
+ else
+ {
+ AH->FH = stdin;
+ if (!AH->FH)
+ fatal("could not open input file: %m");
+ }
+
+ ctx->hasSeek = checkSeek(AH->FH);
+
+ ReadHead(AH);
+ ReadToc(AH);
+
+ /*
+ * Remember location of first data block (i.e., the point after TOC)
+ * in case we have to search for desired data blocks.
+ */
+ ctx->lastFilePos = _getFilePos(AH, ctx);
+ }
+}
+
+/*
+ * Called by the Archiver when the dumper creates a new TOC entry.
+ *
+ * Optional.
+ *
+ * Set up extract format-related TOC data.
+*/
+static void
+_ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
+{
+ lclTocEntry *ctx;
+
+ ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
+ if (te->dataDumper)
+ ctx->dataState = K_OFFSET_POS_NOT_SET;
+ else
+ ctx->dataState = K_OFFSET_NO_DATA;
+
+ te->formatData = (void *) ctx;
+}
+
+/*
+ * Called by the Archiver to save any extra format-related TOC entry
+ * data.
+ *
+ * Optional.
+ *
+ * Use the Archiver routines to write data - they are non-endian, and
+ * maintain other important file information.
+ */
+static void
+_WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
+{
+ lclTocEntry *ctx = (lclTocEntry *) te->formatData;
+
+ WriteOffset(AH, ctx->dataPos, ctx->dataState);
+}
+
+/*
+ * Called by the Archiver to read any extra format-related TOC data.
+ *
+ * Optional.
+ *
+ * Needs to match the order defined in _WriteExtraToc, and should also
+ * use the Archiver input routines.
+ */
+static void
+_ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
+{
+ lclTocEntry *ctx = (lclTocEntry *) te->formatData;
+
+ if (ctx == NULL)
+ {
+ ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
+ te->formatData = (void *) ctx;
+ }
+
+ ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
+
+ /*
+ * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
+ * dump it at all.
+ */
+ if (AH->version < K_VERS_1_7)
+ ReadInt(AH);
+}
+
+/*
+ * Called by the Archiver when restoring an archive to output a comment
+ * that includes useful information about the TOC entry.
+ *
+ * Optional.
+ *
+ */
+static void
+_PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
+{
+ lclTocEntry *ctx = (lclTocEntry *) te->formatData;
+
+ if (AH->public.verbose)
+ ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
+ (int64) ctx->dataPos);
+}
+
+/*
+ * Called by the archiver when saving TABLE DATA (not schema). This routine
+ * should save whatever format-specific information is needed to read
+ * the archive back.
+ *
+ * It is called just prior to the dumper's 'DataDumper' routine being called.
+ *
+ * Optional, but strongly recommended.
+ *
+ */
+static void
+_StartData(ArchiveHandle *AH, TocEntry *te)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+ tctx->dataPos = _getFilePos(AH, ctx);
+ if (tctx->dataPos >= 0)
+ tctx->dataState = K_OFFSET_POS_SET;
+
+ _WriteByte(AH, BLK_DATA); /* Block type */
+ WriteInt(AH, te->dumpId); /* For sanity check */
+
+ ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+}
+
+/*
+ * Called by archiver when dumper calls WriteData. This routine is
+ * called for both BLOB and TABLE data; it is the responsibility of
+ * the format to manage each kind of data using StartBlob/StartData.
+ *
+ * It should only be called from within a DataDumper routine.
+ *
+ * Mandatory.
+ */
+static void
+_WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ CompressorState *cs = ctx->cs;
+
+ if (dLen > 0)
+ /* WriteDataToArchive() internally throws write errors */
+ WriteDataToArchive(AH, cs, data, dLen);
+}
+
+/*
+ * Called by the archiver when a dumper's 'DataDumper' routine has
+ * finished.
+ *
+ * Optional.
+ *
+ */
+static void
+_EndData(ArchiveHandle *AH, TocEntry *te)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ EndCompressor(AH, ctx->cs);
+ /* Send the end marker */
+ WriteInt(AH, 0);
+}
+
+/*
+ * Called by the archiver when starting to save all BLOB DATA (not schema).
+ * This routine should save whatever format-specific information is needed
+ * to read the BLOBs back into memory.
+ *
+ * It is called just prior to the dumper's DataDumper routine.
+ *
+ * Optional, but strongly recommended.
+ */
+static void
+_StartBlobs(ArchiveHandle *AH, TocEntry *te)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+ tctx->dataPos = _getFilePos(AH, ctx);
+ if (tctx->dataPos >= 0)
+ tctx->dataState = K_OFFSET_POS_SET;
+
+ _WriteByte(AH, BLK_BLOBS); /* Block type */
+ WriteInt(AH, te->dumpId); /* For sanity check */
+}
+
+/*
+ * Called by the archiver when the dumper calls StartBlob.
+ *
+ * Mandatory.
+ *
+ * Must save the passed OID for retrieval at restore-time.
+ */
+static void
+_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ if (oid == 0)
+ fatal("invalid OID for large object");
+
+ WriteInt(AH, oid);
+
+ ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc);
+}
+
+/*
+ * Called by the archiver when the dumper calls EndBlob.
+ *
+ * Optional.
+ */
+static void
+_EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ EndCompressor(AH, ctx->cs);
+ /* Send the end marker */
+ WriteInt(AH, 0);
+}
+
+/*
+ * Called by the archiver when finishing saving all BLOB DATA.
+ *
+ * Optional.
+ */
+static void
+_EndBlobs(ArchiveHandle *AH, TocEntry *te)
+{
+ /* Write out a fake zero OID to mark end-of-blobs. */
+ WriteInt(AH, 0);
+}
+
+/*
+ * Print data for a given TOC entry
+ */
+static void
+_PrintTocData(ArchiveHandle *AH, TocEntry *te)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+ int blkType;
+ int id;
+
+ if (tctx->dataState == K_OFFSET_NO_DATA)
+ return;
+
+ if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
+ {
+ /*
+ * We cannot seek directly to the desired block. Instead, skip over
+ * block headers until we find the one we want. Remember the
+ * positions of skipped-over blocks, so that if we later decide we
+ * need to read one, we'll be able to seek to it.
+ *
+ * When our input file is seekable, we can do the search starting from
+ * the point after the last data block we scanned in previous
+ * iterations of this function.
+ */
+ if (ctx->hasSeek)
+ {
+ if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
+ fatal("error during file seek: %m");
+ }
+
+ for (;;)
+ {
+ pgoff_t thisBlkPos = _getFilePos(AH, ctx);
+
+ _readBlockHeader(AH, &blkType, &id);
+
+ if (blkType == EOF || id == te->dumpId)
+ break;
+
+ /* Remember the block position, if we got one */
+ if (thisBlkPos >= 0)
+ {
+ TocEntry *otherte = getTocEntryByDumpId(AH, id);
+
+ if (otherte && otherte->formatData)
+ {
+ lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
+
+ /*
+ * Note: on Windows, multiple threads might access/update
+ * the same lclTocEntry concurrently, but that should be
+ * safe as long as we update dataPos before dataState.
+ * Ideally, we'd use pg_write_barrier() to enforce that,
+ * but the needed infrastructure doesn't exist in frontend
+ * code. But Windows only runs on machines with strong
+ * store ordering, so it should be okay for now.
+ */
+ if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
+ {
+ othertctx->dataPos = thisBlkPos;
+ othertctx->dataState = K_OFFSET_POS_SET;
+ }
+ else if (othertctx->dataPos != thisBlkPos ||
+ othertctx->dataState != K_OFFSET_POS_SET)
+ {
+ /* sanity check */
+ pg_log_warning("data block %d has wrong seek position",
+ id);
+ }
+ }
+ }
+
+ switch (blkType)
+ {
+ case BLK_DATA:
+ _skipData(AH);
+ break;
+
+ case BLK_BLOBS:
+ _skipBlobs(AH);
+ break;
+
+ default: /* Always have a default */
+ fatal("unrecognized data block type (%d) while searching archive",
+ blkType);
+ break;
+ }
+ }
+ }
+ else
+ {
+ /* We can just seek to the place we need to be. */
+ if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
+ fatal("error during file seek: %m");
+
+ _readBlockHeader(AH, &blkType, &id);
+ }
+
+ /*
+ * If we reached EOF without finding the block we want, then either it
+ * doesn't exist, or it does but we lack the ability to seek back to it.
+ */
+ if (blkType == EOF)
+ {
+ if (!ctx->hasSeek)
+ fatal("could not find block ID %d in archive -- "
+ "possibly due to out-of-order restore request, "
+ "which cannot be handled due to non-seekable input file",
+ te->dumpId);
+ else
+ fatal("could not find block ID %d in archive -- "
+ "possibly corrupt archive",
+ te->dumpId);
+ }
+
+ /* Are we sane? */
+ if (id != te->dumpId)
+ fatal("found unexpected block ID (%d) when reading data -- expected %d",
+ id, te->dumpId);
+
+ switch (blkType)
+ {
+ case BLK_DATA:
+ _PrintData(AH);
+ break;
+
+ case BLK_BLOBS:
+ _LoadBlobs(AH, AH->public.ropt->dropSchema);
+ break;
+
+ default: /* Always have a default */
+ fatal("unrecognized data block type %d while restoring archive",
+ blkType);
+ break;
+ }
+
+ /*
+ * If our input file is seekable but lacks data offsets, update our
+ * knowledge of where to start future searches from. (Note that we did
+ * not update the current TE's dataState/dataPos. We could have, but
+ * there is no point since it will not be visited again.)
+ */
+ if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
+ {
+ pgoff_t curPos = _getFilePos(AH, ctx);
+
+ if (curPos > ctx->lastFilePos)
+ ctx->lastFilePos = curPos;
+ }
+}
+
+/*
+ * Print data from current file position.
+*/
+static void
+_PrintData(ArchiveHandle *AH)
+{
+ ReadDataFromArchive(AH, AH->compression, _CustomReadFunc);
+}
+
+static void
+_LoadBlobs(ArchiveHandle *AH, bool drop)
+{
+ Oid oid;
+
+ StartRestoreBlobs(AH);
+
+ oid = ReadInt(AH);
+ while (oid != 0)
+ {
+ StartRestoreBlob(AH, oid, drop);
+ _PrintData(AH);
+ EndRestoreBlob(AH, oid);
+ oid = ReadInt(AH);
+ }
+
+ EndRestoreBlobs(AH);
+}
+
+/*
+ * Skip the BLOBs from the current file position.
+ * BLOBS are written sequentially as data blocks (see below).
+ * Each BLOB is preceded by it's original OID.
+ * A zero OID indicated the end of the BLOBS
+ */
+static void
+_skipBlobs(ArchiveHandle *AH)
+{
+ Oid oid;
+
+ oid = ReadInt(AH);
+ while (oid != 0)
+ {
+ _skipData(AH);
+ oid = ReadInt(AH);
+ }
+}
+
+/*
+ * Skip data from current file position.
+ * Data blocks are formatted as an integer length, followed by data.
+ * A zero length denoted the end of the block.
+*/
+static void
+_skipData(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ size_t blkLen;
+ char *buf = NULL;
+ int buflen = 0;
+ size_t cnt;
+
+ blkLen = ReadInt(AH);
+ while (blkLen != 0)
+ {
+ if (ctx->hasSeek)
+ {
+ if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
+ fatal("error during file seek: %m");
+ }
+ else
+ {
+ if (blkLen > buflen)
+ {
+ if (buf)
+ free(buf);
+ buf = (char *) pg_malloc(blkLen);
+ buflen = blkLen;
+ }
+ if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen)
+ {
+ if (feof(AH->FH))
+ fatal("could not read from input file: end of file");
+ else
+ fatal("could not read from input file: %m");
+ }
+ }
+
+ blkLen = ReadInt(AH);
+ }
+
+ if (buf)
+ free(buf);
+}
+
+/*
+ * Write a byte of data to the archive.
+ *
+ * Mandatory.
+ *
+ * Called by the archiver to do integer & byte output to the archive.
+ */
+static int
+_WriteByte(ArchiveHandle *AH, const int i)
+{
+ int res;
+
+ if ((res = fputc(i, AH->FH)) == EOF)
+ WRITE_ERROR_EXIT;
+
+ return 1;
+}
+
+/*
+ * Read a byte of data from the archive.
+ *
+ * Mandatory
+ *
+ * Called by the archiver to read bytes & integers from the archive.
+ * EOF should be treated as a fatal error.
+ */
+static int
+_ReadByte(ArchiveHandle *AH)
+{
+ int res;
+
+ res = getc(AH->FH);
+ if (res == EOF)
+ READ_ERROR_EXIT(AH->FH);
+ return res;
+}
+
+/*
+ * Write a buffer of data to the archive.
+ *
+ * Mandatory.
+ *
+ * Called by the archiver to write a block of bytes to the archive.
+ */
+static void
+_WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
+{
+ if (fwrite(buf, 1, len, AH->FH) != len)
+ WRITE_ERROR_EXIT;
+}
+
+/*
+ * Read a block of bytes from the archive.
+ *
+ * Mandatory.
+ *
+ * Called by the archiver to read a block of bytes from the archive
+ */
+static void
+_ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
+{
+ if (fread(buf, 1, len, AH->FH) != len)
+ READ_ERROR_EXIT(AH->FH);
+}
+
+/*
+ * Close the archive.
+ *
+ * Mandatory.
+ *
+ * When writing the archive, this is the routine that actually starts
+ * the process of saving it to files. No data should be written prior
+ * to this point, since the user could sort the TOC after creating it.
+ *
+ * If an archive is to be written, this routine must call:
+ * WriteHead to save the archive header
+ * WriteToc to save the TOC entries
+ * WriteDataChunks to save all DATA & BLOBs.
+ *
+ */
+static void
+_CloseArchive(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ pgoff_t tpos;
+
+ if (AH->mode == archModeWrite)
+ {
+ WriteHead(AH);
+ /* Remember TOC's seek position for use below */
+ tpos = ftello(AH->FH);
+ if (tpos < 0 && ctx->hasSeek)
+ fatal("could not determine seek position in archive file: %m");
+ WriteToc(AH);
+ WriteDataChunks(AH, NULL);
+
+ /*
+ * If possible, re-write the TOC in order to update the data offset
+ * information. This is not essential, as pg_restore can cope in most
+ * cases without it; but it can make pg_restore significantly faster
+ * in some situations (especially parallel restore).
+ */
+ if (ctx->hasSeek &&
+ fseeko(AH->FH, tpos, SEEK_SET) == 0)
+ WriteToc(AH);
+ }
+
+ if (fclose(AH->FH) != 0)
+ fatal("could not close archive file: %m");
+
+ /* Sync the output file if one is defined */
+ if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
+ (void) fsync_fname(AH->fSpec, false);
+
+ AH->FH = NULL;
+}
+
+/*
+ * Reopen the archive's file handle.
+ *
+ * We close the original file handle, except on Windows. (The difference
+ * is because on Windows, this is used within a multithreading context,
+ * and we don't want a thread closing the parent file handle.)
+ */
+static void
+_ReopenArchive(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ pgoff_t tpos;
+
+ if (AH->mode == archModeWrite)
+ fatal("can only reopen input archives");
+
+ /*
+ * These two cases are user-facing errors since they represent unsupported
+ * (but not invalid) use-cases. Word the error messages appropriately.
+ */
+ if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
+ fatal("parallel restore from standard input is not supported");
+ if (!ctx->hasSeek)
+ fatal("parallel restore from non-seekable file is not supported");
+
+ tpos = ftello(AH->FH);
+ if (tpos < 0)
+ fatal("could not determine seek position in archive file: %m");
+
+#ifndef WIN32
+ if (fclose(AH->FH) != 0)
+ fatal("could not close archive file: %m");
+#endif
+
+ AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+ if (!AH->FH)
+ fatal("could not open input file \"%s\": %m", AH->fSpec);
+
+ if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
+ fatal("could not set seek position in archive file: %m");
+}
+
+/*
+ * Prepare for parallel restore.
+ *
+ * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
+ * TOC entries' dataLength fields with appropriate values to guide the
+ * ordering of restore jobs. The source of said data is format-dependent,
+ * as is the exact meaning of the values.
+ *
+ * A format module might also choose to do other setup here.
+ */
+static void
+_PrepParallelRestore(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+ TocEntry *prev_te = NULL;
+ lclTocEntry *prev_tctx = NULL;
+ TocEntry *te;
+
+ /*
+ * Knowing that the data items were dumped out in TOC order, we can
+ * reconstruct the length of each item as the delta to the start offset of
+ * the next data item.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ lclTocEntry *tctx = (lclTocEntry *) te->formatData;
+
+ /*
+ * Ignore entries without a known data offset; if we were unable to
+ * seek to rewrite the TOC when creating the archive, this'll be all
+ * of them, and we'll end up with no size estimates.
+ */
+ if (tctx->dataState != K_OFFSET_POS_SET)
+ continue;
+
+ /* Compute previous data item's length */
+ if (prev_te)
+ {
+ if (tctx->dataPos > prev_tctx->dataPos)
+ prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
+ }
+
+ prev_te = te;
+ prev_tctx = tctx;
+ }
+
+ /* If OK to seek, we can determine the length of the last item */
+ if (prev_te && ctx->hasSeek)
+ {
+ pgoff_t endpos;
+
+ if (fseeko(AH->FH, 0, SEEK_END) != 0)
+ fatal("error during file seek: %m");
+ endpos = ftello(AH->FH);
+ if (endpos > prev_tctx->dataPos)
+ prev_te->dataLength = endpos - prev_tctx->dataPos;
+ }
+}
+
+/*
+ * Clone format-specific fields during parallel restoration.
+ */
+static void
+_Clone(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ /*
+ * Each thread must have private lclContext working state.
+ */
+ AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
+ memcpy(AH->formatData, ctx, sizeof(lclContext));
+ ctx = (lclContext *) AH->formatData;
+
+ /* sanity check, shouldn't happen */
+ if (ctx->cs != NULL)
+ fatal("compressor active");
+
+ /*
+ * We intentionally do not clone TOC-entry-local state: it's useful to
+ * share knowledge about where the data blocks are across threads.
+ * _PrintTocData has to be careful about the order of operations on that
+ * state, though.
+ *
+ * Note: we do not make a local lo_buf because we expect at most one BLOBS
+ * entry per archive, so no parallelism is possible.
+ */
+}
+
+static void
+_DeClone(ArchiveHandle *AH)
+{
+ lclContext *ctx = (lclContext *) AH->formatData;
+
+ free(ctx);
+}
+
+/*
+ * This function is executed in the child of a parallel restore from a
+ * custom-format archive and restores the actual data for one TOC entry.
+ */
+static int
+_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+{
+ return parallel_restore(AH, te);
+}
+
+/*--------------------------------------------------
+ * END OF FORMAT CALLBACKS
+ *--------------------------------------------------
+ */
+
+/*
+ * Get the current position in the archive file.
+ *
+ * With a non-seekable archive file, we may not be able to obtain the
+ * file position. If so, just return -1. It's not too important in
+ * that case because we won't be able to rewrite the TOC to fill in
+ * data block offsets anyway.
+ */
+static pgoff_t
+_getFilePos(ArchiveHandle *AH, lclContext *ctx)
+{
+ pgoff_t pos;
+
+ pos = ftello(AH->FH);
+ if (pos < 0)
+ {
+ /* Not expected if we found we can seek. */
+ if (ctx->hasSeek)
+ fatal("could not determine seek position in archive file: %m");
+ }
+ return pos;
+}
+
+/*
+ * Read a data block header. The format changed in V1.3, so we
+ * centralize the code here for simplicity. Returns *type = EOF
+ * if at EOF.
+ */
+static void
+_readBlockHeader(ArchiveHandle *AH, int *type, int *id)
+{
+ int byt;
+
+ /*
+ * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside
+ * ReadInt rather than returning EOF. It doesn't seem worth jumping
+ * through hoops to deal with that case better, because no such files are
+ * likely to exist in the wild: only some 7.1 development versions of
+ * pg_dump ever generated such files.
+ */
+ if (AH->version < K_VERS_1_3)
+ *type = BLK_DATA;
+ else
+ {
+ byt = getc(AH->FH);
+ *type = byt;
+ if (byt == EOF)
+ {
+ *id = 0; /* don't return an uninitialized value */
+ return;
+ }
+ }
+
+ *id = ReadInt(AH);
+}
+
+/*
+ * Callback function for WriteDataToArchive. Writes one block of (compressed)
+ * data to the archive.
+ */
+static void
+_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
+{
+ /* never write 0-byte blocks (this should not happen) */
+ if (len > 0)
+ {
+ WriteInt(AH, len);
+ _WriteBuf(AH, buf, len);
+ }
+}
+
+/*
+ * Callback function for ReadDataFromArchive. To keep things simple, we
+ * always read one compressed block at a time.
+ */
+static size_t
+_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
+{
+ size_t blkLen;
+
+ /* Read length */
+ blkLen = ReadInt(AH);
+ if (blkLen == 0)
+ return 0;
+
+ /* If the caller's buffer is not large enough, allocate a bigger one */
+ if (blkLen > *buflen)
+ {
+ free(*buf);
+ *buf = (char *) pg_malloc(blkLen);
+ *buflen = blkLen;
+ }
+
+ /* exits app on read errors */
+ _ReadBuf(AH, *buf, blkLen);
+
+ return blkLen;
+}