diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
commit | 6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch) | |
tree | 657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/bin/pg_dump/pg_backup_custom.c | |
parent | Initial commit. (diff) | |
download | postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.tar.xz postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.zip |
Adding upstream version 13.4. (refs: upstream/13.4, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_custom.c | 1027 |
1 files changed, 1027 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c new file mode 100644 index 0000000..895ff76 --- /dev/null +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -0,0 +1,1027 @@ +/*------------------------------------------------------------------------- + * + * pg_backup_custom.c + * + * Implements the custom output format. + * + * The comments with the routines in this code are a good place to + * understand how to write a new format. + * + * See the headers to pg_restore for more details. + * + * Copyright (c) 2000, Philip Warner + * Rights are granted to use this software in any way so long + * as this notice is not removed. + * + * The author is not responsible for loss or damages that may + * and any liability will be limited to the time taken to fix any + * related bug. + * + * + * IDENTIFICATION + * src/bin/pg_dump/pg_backup_custom.c + * + *------------------------------------------------------------------------- + */ +#include "postgres_fe.h" + +#include "common/file_utils.h" +#include "compress_io.h" +#include "parallel.h" +#include "pg_backup_utils.h" + +/*-------- + * Routines in the format interface + *-------- + */ + +static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te); +static void _StartData(ArchiveHandle *AH, TocEntry *te); +static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen); +static void _EndData(ArchiveHandle *AH, TocEntry *te); +static int _WriteByte(ArchiveHandle *AH, const int i); +static int _ReadByte(ArchiveHandle *); +static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); +static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); +static void _CloseArchive(ArchiveHandle *AH); +static void _ReopenArchive(ArchiveHandle *AH); +static void _PrintTocData(ArchiveHandle *AH, TocEntry *te); +static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); +static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); +static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te); + 
+static void _PrintData(ArchiveHandle *AH); +static void _skipData(ArchiveHandle *AH); +static void _skipBlobs(ArchiveHandle *AH); + +static void _StartBlobs(ArchiveHandle *AH, TocEntry *te); +static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); +static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); +static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); +static void _LoadBlobs(ArchiveHandle *AH, bool drop); + +static void _PrepParallelRestore(ArchiveHandle *AH); +static void _Clone(ArchiveHandle *AH); +static void _DeClone(ArchiveHandle *AH); + +static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); + +typedef struct +{ + CompressorState *cs; + int hasSeek; + /* lastFilePos is used only when reading, and may be invalid if !hasSeek */ + pgoff_t lastFilePos; /* position after last data block we've read */ +} lclContext; + +typedef struct +{ + int dataState; + pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */ +} lclTocEntry; + + +/*------ + * Static declarations + *------ + */ +static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id); +static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx); + +static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len); +static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen); + + +/* + * Init routine required by ALL formats. This is a global routine + * and should be declared in pg_backup_archiver.h + * + * It's task is to create any extra archive context (using AH->formatData), + * and to initialize the supported function pointers. + * + * It should also prepare whatever it's input source is for reading/writing, + * and in the case of a read mode connection, it should load the Header & TOC. + */ +void +InitArchiveFmt_Custom(ArchiveHandle *AH) +{ + lclContext *ctx; + + /* Assuming static functions, this can be copied for each format. 
*/ + AH->ArchiveEntryPtr = _ArchiveEntry; + AH->StartDataPtr = _StartData; + AH->WriteDataPtr = _WriteData; + AH->EndDataPtr = _EndData; + AH->WriteBytePtr = _WriteByte; + AH->ReadBytePtr = _ReadByte; + AH->WriteBufPtr = _WriteBuf; + AH->ReadBufPtr = _ReadBuf; + AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; + AH->PrintTocDataPtr = _PrintTocData; + AH->ReadExtraTocPtr = _ReadExtraToc; + AH->WriteExtraTocPtr = _WriteExtraToc; + AH->PrintExtraTocPtr = _PrintExtraToc; + + AH->StartBlobsPtr = _StartBlobs; + AH->StartBlobPtr = _StartBlob; + AH->EndBlobPtr = _EndBlob; + AH->EndBlobsPtr = _EndBlobs; + + AH->PrepParallelRestorePtr = _PrepParallelRestore; + AH->ClonePtr = _Clone; + AH->DeClonePtr = _DeClone; + + /* no parallel dump in the custom archive, only parallel restore */ + AH->WorkerJobDumpPtr = NULL; + AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; + + /* Set up a private area. */ + ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); + AH->formatData = (void *) ctx; + + /* Initialize LO buffering */ + AH->lo_buf_size = LOBBUFSIZE; + AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE); + + /* + * Now open the file + */ + if (AH->mode == archModeWrite) + { + if (AH->fSpec && strcmp(AH->fSpec, "") != 0) + { + AH->FH = fopen(AH->fSpec, PG_BINARY_W); + if (!AH->FH) + fatal("could not open output file \"%s\": %m", AH->fSpec); + } + else + { + AH->FH = stdout; + if (!AH->FH) + fatal("could not open output file: %m"); + } + + ctx->hasSeek = checkSeek(AH->FH); + } + else + { + if (AH->fSpec && strcmp(AH->fSpec, "") != 0) + { + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + fatal("could not open input file \"%s\": %m", AH->fSpec); + } + else + { + AH->FH = stdin; + if (!AH->FH) + fatal("could not open input file: %m"); + } + + ctx->hasSeek = checkSeek(AH->FH); + + ReadHead(AH); + ReadToc(AH); + + /* + * Remember location of first data block (i.e., the point after TOC) + * in case we have to search for desired data blocks. 
+ */ + ctx->lastFilePos = _getFilePos(AH, ctx); + } +} + +/* + * Called by the Archiver when the dumper creates a new TOC entry. + * + * Optional. + * + * Set up extract format-related TOC data. +*/ +static void +_ArchiveEntry(ArchiveHandle *AH, TocEntry *te) +{ + lclTocEntry *ctx; + + ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); + if (te->dataDumper) + ctx->dataState = K_OFFSET_POS_NOT_SET; + else + ctx->dataState = K_OFFSET_NO_DATA; + + te->formatData = (void *) ctx; +} + +/* + * Called by the Archiver to save any extra format-related TOC entry + * data. + * + * Optional. + * + * Use the Archiver routines to write data - they are non-endian, and + * maintain other important file information. + */ +static void +_WriteExtraToc(ArchiveHandle *AH, TocEntry *te) +{ + lclTocEntry *ctx = (lclTocEntry *) te->formatData; + + WriteOffset(AH, ctx->dataPos, ctx->dataState); +} + +/* + * Called by the Archiver to read any extra format-related TOC data. + * + * Optional. + * + * Needs to match the order defined in _WriteExtraToc, and should also + * use the Archiver input routines. + */ +static void +_ReadExtraToc(ArchiveHandle *AH, TocEntry *te) +{ + lclTocEntry *ctx = (lclTocEntry *) te->formatData; + + if (ctx == NULL) + { + ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); + te->formatData = (void *) ctx; + } + + ctx->dataState = ReadOffset(AH, &(ctx->dataPos)); + + /* + * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't + * dump it at all. + */ + if (AH->version < K_VERS_1_7) + ReadInt(AH); +} + +/* + * Called by the Archiver when restoring an archive to output a comment + * that includes useful information about the TOC entry. + * + * Optional. 
+ * + */ +static void +_PrintExtraToc(ArchiveHandle *AH, TocEntry *te) +{ + lclTocEntry *ctx = (lclTocEntry *) te->formatData; + + if (AH->public.verbose) + ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n", + (int64) ctx->dataPos); +} + +/* + * Called by the archiver when saving TABLE DATA (not schema). This routine + * should save whatever format-specific information is needed to read + * the archive back. + * + * It is called just prior to the dumper's 'DataDumper' routine being called. + * + * Optional, but strongly recommended. + * + */ +static void +_StartData(ArchiveHandle *AH, TocEntry *te) +{ + lclContext *ctx = (lclContext *) AH->formatData; + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + + tctx->dataPos = _getFilePos(AH, ctx); + if (tctx->dataPos >= 0) + tctx->dataState = K_OFFSET_POS_SET; + + _WriteByte(AH, BLK_DATA); /* Block type */ + WriteInt(AH, te->dumpId); /* For sanity check */ + + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); +} + +/* + * Called by archiver when dumper calls WriteData. This routine is + * called for both BLOB and TABLE data; it is the responsibility of + * the format to manage each kind of data using StartBlob/StartData. + * + * It should only be called from within a DataDumper routine. + * + * Mandatory. + */ +static void +_WriteData(ArchiveHandle *AH, const void *data, size_t dLen) +{ + lclContext *ctx = (lclContext *) AH->formatData; + CompressorState *cs = ctx->cs; + + if (dLen > 0) + /* WriteDataToArchive() internally throws write errors */ + WriteDataToArchive(AH, cs, data, dLen); +} + +/* + * Called by the archiver when a dumper's 'DataDumper' routine has + * finished. + * + * Optional. + * + */ +static void +_EndData(ArchiveHandle *AH, TocEntry *te) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); +} + +/* + * Called by the archiver when starting to save all BLOB DATA (not schema). 
+ * This routine should save whatever format-specific information is needed + * to read the BLOBs back into memory. + * + * It is called just prior to the dumper's DataDumper routine. + * + * Optional, but strongly recommended. + */ +static void +_StartBlobs(ArchiveHandle *AH, TocEntry *te) +{ + lclContext *ctx = (lclContext *) AH->formatData; + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + + tctx->dataPos = _getFilePos(AH, ctx); + if (tctx->dataPos >= 0) + tctx->dataState = K_OFFSET_POS_SET; + + _WriteByte(AH, BLK_BLOBS); /* Block type */ + WriteInt(AH, te->dumpId); /* For sanity check */ +} + +/* + * Called by the archiver when the dumper calls StartBlob. + * + * Mandatory. + * + * Must save the passed OID for retrieval at restore-time. + */ +static void +_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + if (oid == 0) + fatal("invalid OID for large object"); + + WriteInt(AH, oid); + + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); +} + +/* + * Called by the archiver when the dumper calls EndBlob. + * + * Optional. + */ +static void +_EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); +} + +/* + * Called by the archiver when finishing saving all BLOB DATA. + * + * Optional. + */ +static void +_EndBlobs(ArchiveHandle *AH, TocEntry *te) +{ + /* Write out a fake zero OID to mark end-of-blobs. */ + WriteInt(AH, 0); +} + +/* + * Print data for a given TOC entry + */ +static void +_PrintTocData(ArchiveHandle *AH, TocEntry *te) +{ + lclContext *ctx = (lclContext *) AH->formatData; + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + int blkType; + int id; + + if (tctx->dataState == K_OFFSET_NO_DATA) + return; + + if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET) + { + /* + * We cannot seek directly to the desired block. 
Instead, skip over + * block headers until we find the one we want. Remember the + * positions of skipped-over blocks, so that if we later decide we + * need to read one, we'll be able to seek to it. + * + * When our input file is seekable, we can do the search starting from + * the point after the last data block we scanned in previous + * iterations of this function. + */ + if (ctx->hasSeek) + { + if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0) + fatal("error during file seek: %m"); + } + + for (;;) + { + pgoff_t thisBlkPos = _getFilePos(AH, ctx); + + _readBlockHeader(AH, &blkType, &id); + + if (blkType == EOF || id == te->dumpId) + break; + + /* Remember the block position, if we got one */ + if (thisBlkPos >= 0) + { + TocEntry *otherte = getTocEntryByDumpId(AH, id); + + if (otherte && otherte->formatData) + { + lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData; + + /* + * Note: on Windows, multiple threads might access/update + * the same lclTocEntry concurrently, but that should be + * safe as long as we update dataPos before dataState. + * Ideally, we'd use pg_write_barrier() to enforce that, + * but the needed infrastructure doesn't exist in frontend + * code. But Windows only runs on machines with strong + * store ordering, so it should be okay for now. + */ + if (othertctx->dataState == K_OFFSET_POS_NOT_SET) + { + othertctx->dataPos = thisBlkPos; + othertctx->dataState = K_OFFSET_POS_SET; + } + else if (othertctx->dataPos != thisBlkPos || + othertctx->dataState != K_OFFSET_POS_SET) + { + /* sanity check */ + pg_log_warning("data block %d has wrong seek position", + id); + } + } + } + + switch (blkType) + { + case BLK_DATA: + _skipData(AH); + break; + + case BLK_BLOBS: + _skipBlobs(AH); + break; + + default: /* Always have a default */ + fatal("unrecognized data block type (%d) while searching archive", + blkType); + break; + } + } + } + else + { + /* We can just seek to the place we need to be. 
*/ + if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0) + fatal("error during file seek: %m"); + + _readBlockHeader(AH, &blkType, &id); + } + + /* + * If we reached EOF without finding the block we want, then either it + * doesn't exist, or it does but we lack the ability to seek back to it. + */ + if (blkType == EOF) + { + if (!ctx->hasSeek) + fatal("could not find block ID %d in archive -- " + "possibly due to out-of-order restore request, " + "which cannot be handled due to non-seekable input file", + te->dumpId); + else + fatal("could not find block ID %d in archive -- " + "possibly corrupt archive", + te->dumpId); + } + + /* Are we sane? */ + if (id != te->dumpId) + fatal("found unexpected block ID (%d) when reading data -- expected %d", + id, te->dumpId); + + switch (blkType) + { + case BLK_DATA: + _PrintData(AH); + break; + + case BLK_BLOBS: + _LoadBlobs(AH, AH->public.ropt->dropSchema); + break; + + default: /* Always have a default */ + fatal("unrecognized data block type %d while restoring archive", + blkType); + break; + } + + /* + * If our input file is seekable but lacks data offsets, update our + * knowledge of where to start future searches from. (Note that we did + * not update the current TE's dataState/dataPos. We could have, but + * there is no point since it will not be visited again.) + */ + if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET) + { + pgoff_t curPos = _getFilePos(AH, ctx); + + if (curPos > ctx->lastFilePos) + ctx->lastFilePos = curPos; + } +} + +/* + * Print data from current file position. 
+*/ +static void +_PrintData(ArchiveHandle *AH) +{ + ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); +} + +static void +_LoadBlobs(ArchiveHandle *AH, bool drop) +{ + Oid oid; + + StartRestoreBlobs(AH); + + oid = ReadInt(AH); + while (oid != 0) + { + StartRestoreBlob(AH, oid, drop); + _PrintData(AH); + EndRestoreBlob(AH, oid); + oid = ReadInt(AH); + } + + EndRestoreBlobs(AH); +} + +/* + * Skip the BLOBs from the current file position. + * BLOBS are written sequentially as data blocks (see below). + * Each BLOB is preceded by it's original OID. + * A zero OID indicated the end of the BLOBS + */ +static void +_skipBlobs(ArchiveHandle *AH) +{ + Oid oid; + + oid = ReadInt(AH); + while (oid != 0) + { + _skipData(AH); + oid = ReadInt(AH); + } +} + +/* + * Skip data from current file position. + * Data blocks are formatted as an integer length, followed by data. + * A zero length denoted the end of the block. +*/ +static void +_skipData(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + size_t blkLen; + char *buf = NULL; + int buflen = 0; + size_t cnt; + + blkLen = ReadInt(AH); + while (blkLen != 0) + { + if (ctx->hasSeek) + { + if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0) + fatal("error during file seek: %m"); + } + else + { + if (blkLen > buflen) + { + if (buf) + free(buf); + buf = (char *) pg_malloc(blkLen); + buflen = blkLen; + } + if ((cnt = fread(buf, 1, blkLen, AH->FH)) != blkLen) + { + if (feof(AH->FH)) + fatal("could not read from input file: end of file"); + else + fatal("could not read from input file: %m"); + } + } + + blkLen = ReadInt(AH); + } + + if (buf) + free(buf); +} + +/* + * Write a byte of data to the archive. + * + * Mandatory. + * + * Called by the archiver to do integer & byte output to the archive. + */ +static int +_WriteByte(ArchiveHandle *AH, const int i) +{ + int res; + + if ((res = fputc(i, AH->FH)) == EOF) + WRITE_ERROR_EXIT; + + return 1; +} + +/* + * Read a byte of data from the archive. 
+ * + * Mandatory + * + * Called by the archiver to read bytes & integers from the archive. + * EOF should be treated as a fatal error. + */ +static int +_ReadByte(ArchiveHandle *AH) +{ + int res; + + res = getc(AH->FH); + if (res == EOF) + READ_ERROR_EXIT(AH->FH); + return res; +} + +/* + * Write a buffer of data to the archive. + * + * Mandatory. + * + * Called by the archiver to write a block of bytes to the archive. + */ +static void +_WriteBuf(ArchiveHandle *AH, const void *buf, size_t len) +{ + if (fwrite(buf, 1, len, AH->FH) != len) + WRITE_ERROR_EXIT; +} + +/* + * Read a block of bytes from the archive. + * + * Mandatory. + * + * Called by the archiver to read a block of bytes from the archive + */ +static void +_ReadBuf(ArchiveHandle *AH, void *buf, size_t len) +{ + if (fread(buf, 1, len, AH->FH) != len) + READ_ERROR_EXIT(AH->FH); +} + +/* + * Close the archive. + * + * Mandatory. + * + * When writing the archive, this is the routine that actually starts + * the process of saving it to files. No data should be written prior + * to this point, since the user could sort the TOC after creating it. + * + * If an archive is to be written, this routine must call: + * WriteHead to save the archive header + * WriteToc to save the TOC entries + * WriteDataChunks to save all DATA & BLOBs. + * + */ +static void +_CloseArchive(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + { + WriteHead(AH); + /* Remember TOC's seek position for use below */ + tpos = ftello(AH->FH); + if (tpos < 0 && ctx->hasSeek) + fatal("could not determine seek position in archive file: %m"); + WriteToc(AH); + WriteDataChunks(AH, NULL); + + /* + * If possible, re-write the TOC in order to update the data offset + * information. This is not essential, as pg_restore can cope in most + * cases without it; but it can make pg_restore significantly faster + * in some situations (especially parallel restore). 
+ */ + if (ctx->hasSeek && + fseeko(AH->FH, tpos, SEEK_SET) == 0) + WriteToc(AH); + } + + if (fclose(AH->FH) != 0) + fatal("could not close archive file: %m"); + + /* Sync the output file if one is defined */ + if (AH->dosync && AH->mode == archModeWrite && AH->fSpec) + (void) fsync_fname(AH->fSpec, false); + + AH->FH = NULL; +} + +/* + * Reopen the archive's file handle. + * + * We close the original file handle, except on Windows. (The difference + * is because on Windows, this is used within a multithreading context, + * and we don't want a thread closing the parent file handle.) + */ +static void +_ReopenArchive(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + fatal("can only reopen input archives"); + + /* + * These two cases are user-facing errors since they represent unsupported + * (but not invalid) use-cases. Word the error messages appropriately. + */ + if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0) + fatal("parallel restore from standard input is not supported"); + if (!ctx->hasSeek) + fatal("parallel restore from non-seekable file is not supported"); + + tpos = ftello(AH->FH); + if (tpos < 0) + fatal("could not determine seek position in archive file: %m"); + +#ifndef WIN32 + if (fclose(AH->FH) != 0) + fatal("could not close archive file: %m"); +#endif + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + fatal("could not open input file \"%s\": %m", AH->fSpec); + + if (fseeko(AH->FH, tpos, SEEK_SET) != 0) + fatal("could not set seek position in archive file: %m"); +} + +/* + * Prepare for parallel restore. + * + * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS + * TOC entries' dataLength fields with appropriate values to guide the + * ordering of restore jobs. The source of said data is format-dependent, + * as is the exact meaning of the values. + * + * A format module might also choose to do other setup here. 
+ */ +static void +_PrepParallelRestore(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + TocEntry *prev_te = NULL; + lclTocEntry *prev_tctx = NULL; + TocEntry *te; + + /* + * Knowing that the data items were dumped out in TOC order, we can + * reconstruct the length of each item as the delta to the start offset of + * the next data item. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + + /* + * Ignore entries without a known data offset; if we were unable to + * seek to rewrite the TOC when creating the archive, this'll be all + * of them, and we'll end up with no size estimates. + */ + if (tctx->dataState != K_OFFSET_POS_SET) + continue; + + /* Compute previous data item's length */ + if (prev_te) + { + if (tctx->dataPos > prev_tctx->dataPos) + prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos; + } + + prev_te = te; + prev_tctx = tctx; + } + + /* If OK to seek, we can determine the length of the last item */ + if (prev_te && ctx->hasSeek) + { + pgoff_t endpos; + + if (fseeko(AH->FH, 0, SEEK_END) != 0) + fatal("error during file seek: %m"); + endpos = ftello(AH->FH); + if (endpos > prev_tctx->dataPos) + prev_te->dataLength = endpos - prev_tctx->dataPos; + } +} + +/* + * Clone format-specific fields during parallel restoration. + */ +static void +_Clone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + /* + * Each thread must have private lclContext working state. + */ + AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext)); + memcpy(AH->formatData, ctx, sizeof(lclContext)); + ctx = (lclContext *) AH->formatData; + + /* sanity check, shouldn't happen */ + if (ctx->cs != NULL) + fatal("compressor active"); + + /* + * We intentionally do not clone TOC-entry-local state: it's useful to + * share knowledge about where the data blocks are across threads. 
+ * _PrintTocData has to be careful about the order of operations on that + * state, though. + * + * Note: we do not make a local lo_buf because we expect at most one BLOBS + * entry per archive, so no parallelism is possible. + */ +} + +static void +_DeClone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + free(ctx); +} + +/* + * This function is executed in the child of a parallel restore from a + * custom-format archive and restores the actual data for one TOC entry. + */ +static int +_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) +{ + return parallel_restore(AH, te); +} + +/*-------------------------------------------------- + * END OF FORMAT CALLBACKS + *-------------------------------------------------- + */ + +/* + * Get the current position in the archive file. + * + * With a non-seekable archive file, we may not be able to obtain the + * file position. If so, just return -1. It's not too important in + * that case because we won't be able to rewrite the TOC to fill in + * data block offsets anyway. + */ +static pgoff_t +_getFilePos(ArchiveHandle *AH, lclContext *ctx) +{ + pgoff_t pos; + + pos = ftello(AH->FH); + if (pos < 0) + { + /* Not expected if we found we can seek. */ + if (ctx->hasSeek) + fatal("could not determine seek position in archive file: %m"); + } + return pos; +} + +/* + * Read a data block header. The format changed in V1.3, so we + * centralize the code here for simplicity. Returns *type = EOF + * if at EOF. + */ +static void +_readBlockHeader(ArchiveHandle *AH, int *type, int *id) +{ + int byt; + + /* + * Note: if we are at EOF with a pre-1.3 input file, we'll fatal() inside + * ReadInt rather than returning EOF. It doesn't seem worth jumping + * through hoops to deal with that case better, because no such files are + * likely to exist in the wild: only some 7.1 development versions of + * pg_dump ever generated such files. 
+ */ + if (AH->version < K_VERS_1_3) + *type = BLK_DATA; + else + { + byt = getc(AH->FH); + *type = byt; + if (byt == EOF) + { + *id = 0; /* don't return an uninitialized value */ + return; + } + } + + *id = ReadInt(AH); +} + +/* + * Callback function for WriteDataToArchive. Writes one block of (compressed) + * data to the archive. + */ +static void +_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len) +{ + /* never write 0-byte blocks (this should not happen) */ + if (len > 0) + { + WriteInt(AH, len); + _WriteBuf(AH, buf, len); + } +} + +/* + * Callback function for ReadDataFromArchive. To keep things simple, we + * always read one compressed block at a time. + */ +static size_t +_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen) +{ + size_t blkLen; + + /* Read length */ + blkLen = ReadInt(AH); + if (blkLen == 0) + return 0; + + /* If the caller's buffer is not large enough, allocate a bigger one */ + if (blkLen > *buflen) + { + free(*buf); + *buf = (char *) pg_malloc(blkLen); + *buflen = blkLen; + } + + /* exits app on read errors */ + _ReadBuf(AH, *buf, blkLen); + + return blkLen; +} |