diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/commands/copyfrom.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-46651ce6fe013220ed397add242004d764fc0153.tar.xz postgresql-14-46651ce6fe013220ed397add242004d764fc0153.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/commands/copyfrom.c')
-rw-r--r-- | src/backend/commands/copyfrom.c | 1619 |
1 files changed, 1619 insertions, 0 deletions
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c new file mode 100644 index 0000000..177d731 --- /dev/null +++ b/src/backend/commands/copyfrom.c @@ -0,0 +1,1619 @@ +/*------------------------------------------------------------------------- + * + * copyfrom.c + * COPY <table> FROM file/program/client + * + * This file contains routines needed to efficiently load tuples into a + * table. That includes looking up the correct partition, firing triggers, + * calling the table AM function to insert the data, and updating indexes. + * Reading data from the input file or client and parsing it into Datums + * is handled in copyfromparse.c. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/copyfrom.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <ctype.h> +#include <unistd.h> +#include <sys/stat.h> + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "catalog/namespace.h" +#include "commands/copy.h" +#include "commands/copyfrom_internal.h" +#include "commands/progress.h" +#include "commands/trigger.h" +#include "executor/execPartition.h" +#include "executor/executor.h" +#include "executor/nodeModifyTable.h" +#include "executor/tuptable.h" +#include "foreign/fdwapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" +#include "optimizer/optimizer.h" +#include "pgstat.h" +#include "rewrite/rewriteHandler.h" +#include "storage/fd.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/portal.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" + +/* + * No more than this many tuples per CopyMultiInsertBuffer + * + * Caution: Don't make this too big, as we could end up with this many + * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's + * multiInsertBuffers list. Increasing this can cause quadratic growth in + * memory requirements during copies into partitioned tables with a large + * number of partitions. + */ +#define MAX_BUFFERED_TUPLES 1000 + +/* + * Flush buffers if there are >= this many bytes, as counted by the input + * size, of tuples stored. + */ +#define MAX_BUFFERED_BYTES 65535 + +/* Trim the list of buffers back down to this number after flushing */ +#define MAX_PARTITION_BUFFERS 32 + +/* Stores multi-insert data related to a single relation in CopyFrom. */ +typedef struct CopyMultiInsertBuffer +{ + TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ + BulkInsertState bistate; /* BulkInsertState for this rel */ + int nused; /* number of 'slots' containing tuples */ + uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy + * stream */ +} CopyMultiInsertBuffer; + +/* + * Stores one or many CopyMultiInsertBuffers and details about the size and + * number of tuples which are stored in them. This allows multiple buffers to + * exist at once when COPYing into a partitioned table. + */ +typedef struct CopyMultiInsertInfo +{ + List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ + int bufferedTuples; /* number of tuples buffered over all buffers */ + int bufferedBytes; /* number of bytes from all buffered tuples */ + CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */ + EState *estate; /* Executor state used for COPY */ + CommandId mycid; /* Command Id used for COPY */ + int ti_options; /* table insert options */ +} CopyMultiInsertInfo; + + +/* non-export function prototypes */ +static char *limit_printout_length(const char *str); + +static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * error context callback for COPY FROM + * + * The argument for the error context must be CopyFromState. + */ +void +CopyFromErrorCallback(void *arg) +{ + CopyFromState cstate = (CopyFromState) arg; + char curlineno_str[32]; + + snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, + cstate->cur_lineno); + + if (cstate->opts.binary) + { + /* can't usefully display the data */ + if (cstate->cur_attname) + errcontext("COPY %s, line %s, column %s", + cstate->cur_relname, curlineno_str, + cstate->cur_attname); + else + errcontext("COPY %s, line %s", + cstate->cur_relname, curlineno_str); + } + else + { + if (cstate->cur_attname && cstate->cur_attval) + { + /* error is relevant to a particular column */ + char *attval; + + attval = limit_printout_length(cstate->cur_attval); + errcontext("COPY %s, line %s, column %s: \"%s\"", + cstate->cur_relname, curlineno_str, + cstate->cur_attname, attval); + pfree(attval); + } + else if (cstate->cur_attname) + { + /* error is relevant to a particular column, value is NULL */ + errcontext("COPY %s, line %s, column %s: null input", + cstate->cur_relname, curlineno_str, + cstate->cur_attname); + } + else + { + /* + * Error is relevant to a particular line. + * + * If line_buf still contains the correct line, print it. + */ + if (cstate->line_buf_valid) + { + char *lineval; + + lineval = limit_printout_length(cstate->line_buf.data); + errcontext("COPY %s, line %s: \"%s\"", + cstate->cur_relname, curlineno_str, lineval); + pfree(lineval); + } + else + { + errcontext("COPY %s, line %s", + cstate->cur_relname, curlineno_str); + } + } + } +} + +/* + * Make sure we don't print an unreasonable amount of COPY data in a message. + * + * Returns a pstrdup'd copy of the input. + */ +static char * +limit_printout_length(const char *str) +{ +#define MAX_COPY_DATA_DISPLAY 100 + + int slen = strlen(str); + int len; + char *res; + + /* Fast path if definitely okay */ + if (slen <= MAX_COPY_DATA_DISPLAY) + return pstrdup(str); + + /* Apply encoding-dependent truncation */ + len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY); + + /* + * Truncate, and add "..." to show we truncated the input. + */ + res = (char *) palloc(len + 4); + memcpy(res, str, len); + strcpy(res + len, "..."); + + return res; +} + +/* + * Allocate memory and initialize a new CopyMultiInsertBuffer for this + * ResultRelInfo. + */ +static CopyMultiInsertBuffer * +CopyMultiInsertBufferInit(ResultRelInfo *rri) +{ + CopyMultiInsertBuffer *buffer; + + buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); + memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->resultRelInfo = rri; + buffer->bistate = GetBulkInsertState(); + buffer->nused = 0; + + return buffer; +} + +/* + * Make a new buffer for this ResultRelInfo. + */ +static inline void +CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, + ResultRelInfo *rri) +{ + CopyMultiInsertBuffer *buffer; + + buffer = CopyMultiInsertBufferInit(rri); + + /* Setup back-link so we can easily find this buffer again */ + rri->ri_CopyMultiInsertBuffer = buffer; + /* Record that we're tracking this buffer */ + miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); +} + +/* + * Initialize an already allocated CopyMultiInsertInfo. + * + * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up + * for that table. + */ +static void +CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, + CopyFromState cstate, EState *estate, CommandId mycid, + int ti_options) +{ + miinfo->multiInsertBuffers = NIL; + miinfo->bufferedTuples = 0; + miinfo->bufferedBytes = 0; + miinfo->cstate = cstate; + miinfo->estate = estate; + miinfo->mycid = mycid; + miinfo->ti_options = ti_options; + + /* + * Only setup the buffer when not dealing with a partitioned table. + * Buffers for partitioned tables will just be setup when we need to send + * tuples their way for the first time. + */ + if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + CopyMultiInsertInfoSetupBuffer(miinfo, rri); +} + +/* + * Returns true if the buffers are full + */ +static inline bool +CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) +{ + if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || + miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) + return true; + return false; +} + +/* + * Returns true if we have no buffered tuples + */ +static inline bool +CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) +{ + return miinfo->bufferedTuples == 0; +} + +/* + * Write the tuples stored in 'buffer' out to the table. + */ +static inline void +CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, + CopyMultiInsertBuffer *buffer) +{ + MemoryContext oldcontext; + int i; + uint64 save_cur_lineno; + CopyFromState cstate = miinfo->cstate; + EState *estate = miinfo->estate; + CommandId mycid = miinfo->mycid; + int ti_options = miinfo->ti_options; + bool line_buf_valid = cstate->line_buf_valid; + int nused = buffer->nused; + ResultRelInfo *resultRelInfo = buffer->resultRelInfo; + TupleTableSlot **slots = buffer->slots; + + /* + * Print error context information correctly, if one of the operations + * below fails. + */ + cstate->line_buf_valid = false; + save_cur_lineno = cstate->cur_lineno; + + /* + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < nused; i++) + { + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + buffer->slots[i], estate, false, false, + NULL, NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, cstate->transition_capture); + } + + ExecClearTuple(slots[i]); + } + + /* Mark that all slots are free */ + buffer->nused = 0; + + /* reset cur_lineno and line_buf_valid to what they were */ + cstate->line_buf_valid = line_buf_valid; + cstate->cur_lineno = save_cur_lineno; +} + +/* + * Drop used slots and free member for this buffer. + * + * The buffer must be flushed before cleanup. + */ +static inline void +CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, + CopyMultiInsertBuffer *buffer) +{ + int i; + + /* Ensure buffer was flushed */ + Assert(buffer->nused == 0); + + /* Remove back-link to ourself */ + buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + + FreeBulkInsertState(buffer->bistate); + + /* Since we only create slots on demand, just drop the non-null ones. */ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); + + table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, + miinfo->ti_options); + + pfree(buffer); +} + +/* + * Write out all stored tuples in all buffers out to the tables. + * + * Once flushed we also trim the tracked buffers list down to size by removing + * the buffers created earliest first. + * + * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being + * used. When cleaning up old buffers we'll never remove the one for + * 'curr_rri'. + */ +static inline void +CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) +{ + ListCell *lc; + + foreach(lc, miinfo->multiInsertBuffers) + { + CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); + + CopyMultiInsertBufferFlush(miinfo, buffer); + } + + miinfo->bufferedTuples = 0; + miinfo->bufferedBytes = 0; + + /* + * Trim the list of tracked buffers down if it exceeds the limit. Here we + * remove buffers starting with the ones we created first. It seems less + * likely that these older ones will be needed than the ones that were + * just created. + */ + while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) + { + CopyMultiInsertBuffer *buffer; + + buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); + + /* + * We never want to remove the buffer that's currently being used, so + * if we happen to find that then move it to the end of the list. + */ + if (buffer->resultRelInfo == curr_rri) + { + miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); + miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); + buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); + } + + CopyMultiInsertBufferCleanup(miinfo, buffer); + miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); + } +} + +/* + * Cleanup allocated buffers and free memory + */ +static inline void +CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) +{ + ListCell *lc; + + foreach(lc, miinfo->multiInsertBuffers) + CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); + + list_free(miinfo->multiInsertBuffers); +} + +/* + * Get the next TupleTableSlot that the next tuple should be stored in. + * + * Callers must ensure that the buffer is not full. + * + * Note: 'miinfo' is unused but has been included for consistency with the + * other functions in this area. + */ +static inline TupleTableSlot * +CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, + ResultRelInfo *rri) +{ + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + int nused = buffer->nused; + + Assert(buffer != NULL); + Assert(nused < MAX_BUFFERED_TUPLES); + + if (buffer->slots[nused] == NULL) + buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); + return buffer->slots[nused]; +} + +/* + * Record the previously reserved TupleTableSlot that was reserved by + * CopyMultiInsertInfoNextFreeSlot as being consumed. + */ +static inline void +CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, + TupleTableSlot *slot, int tuplen, uint64 lineno) +{ + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + + Assert(buffer != NULL); + Assert(slot == buffer->slots[buffer->nused]); + + /* Store the line number so we can properly report any errors later */ + buffer->linenos[buffer->nused] = lineno; + + /* Record this slot as being used */ + buffer->nused++; + + /* Update how many tuples are stored and their size */ + miinfo->bufferedTuples++; + miinfo->bufferedBytes += tuplen; +} + +/* + * Copy FROM file to relation. + */ +uint64 +CopyFrom(CopyFromState cstate) +{ + ResultRelInfo *resultRelInfo; + ResultRelInfo *target_resultRelInfo; + ResultRelInfo *prevResultRelInfo = NULL; + EState *estate = CreateExecutorState(); /* for ExecConstraints() */ + ModifyTableState *mtstate; + ExprContext *econtext; + TupleTableSlot *singleslot = NULL; + MemoryContext oldcontext = CurrentMemoryContext; + + PartitionTupleRouting *proute = NULL; + ErrorContextCallback errcallback; + CommandId mycid = GetCurrentCommandId(true); + int ti_options = 0; /* start with default options for insert */ + BulkInsertState bistate = NULL; + CopyInsertMethod insertMethod; + CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ + int64 processed = 0; + int64 excluded = 0; + bool has_before_insert_row_trig; + bool has_instead_insert_row_trig; + bool leafpart_use_multi_insert = false; + + Assert(cstate->rel); + Assert(list_length(cstate->range_table) == 1); + + /* + * The target must be a plain, foreign, or partitioned relation, or have + * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only + * allowed on views, so we only hint about them in the view case.) + */ + if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && + cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && + cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && + !(cstate->rel->trigdesc && + cstate->rel->trigdesc->trig_insert_instead_row)) + { + if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to view \"%s\"", + RelationGetRelationName(cstate->rel)), + errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); + else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to materialized view \"%s\"", + RelationGetRelationName(cstate->rel)))); + else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to sequence \"%s\"", + RelationGetRelationName(cstate->rel)))); + else + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot copy to non-table relation \"%s\"", + RelationGetRelationName(cstate->rel)))); + } + + /* + * If the target file is new-in-transaction, we assume that checking FSM + * for free space is a waste of time. This could possibly be wrong, but + * it's unlikely. + */ + if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && + (cstate->rel->rd_createSubid != InvalidSubTransactionId || + cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId)) + ti_options |= TABLE_INSERT_SKIP_FSM; + + /* + * Optimize if new relfilenode was created in this subxact or one of its + * committed children and we won't see those rows later as part of an + * earlier scan or command. The subxact test ensures that if this subxact + * aborts then the frozen rows won't be visible after xact cleanup. Note + * that the stronger test of exactly which subtransaction created it is + * crucial for correctness of this optimization. The test for an earlier + * scan or command tolerates false negatives. FREEZE causes other sessions + * to see rows they would not see under MVCC, and a false negative merely + * spreads that anomaly to the current session. + */ + if (cstate->opts.freeze) + { + /* + * We currently disallow COPY FREEZE on partitioned tables. The + * reason for this is that we've simply not yet opened the partitions + * to determine if the optimization can be applied to them. We could + * go and open them all here, but doing so may be quite a costly + * overhead for small copies. In any case, we may just end up routing + * tuples to a small number of partitions. It seems better just to + * raise an ERROR for partitioned tables. + */ + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform COPY FREEZE on a partitioned table"))); + } + + /* + * Tolerate one registration for the benefit of FirstXactSnapshot. + * Scan-bearing queries generally create at least two registrations, + * though relying on that is fragile, as is ignoring ActiveSnapshot. + * Clear CatalogSnapshot to avoid counting its registration. We'll + * still detect ongoing catalog scans, each of which separately + * registers the snapshot it uses. + */ + InvalidateCatalogSnapshot(); + if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot perform COPY FREEZE because of prior transaction activity"))); + + if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && + cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); + + ti_options |= TABLE_INSERT_FROZEN; + } + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of code + * here that basically duplicated execUtils.c ...) + */ + ExecInitRangeTable(estate, cstate->range_table); + resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); + ExecInitResultRelation(estate, resultRelInfo, 1); + + /* Verify the named relation is a valid target for INSERT */ + CheckValidResultRel(resultRelInfo, CMD_INSERT); + + ExecOpenIndices(resultRelInfo, false); + + /* + * Set up a ModifyTableState so we can let FDW(s) init themselves for + * foreign-table result relation(s). + */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = CMD_INSERT; + mtstate->mt_nrels = 1; + mtstate->resultRelInfo = resultRelInfo; + mtstate->rootResultRelInfo = resultRelInfo; + + if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, + resultRelInfo); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + /* + * If there are any triggers with transition tables on the named relation, + * we need to be prepared to capture transition tuples. + * + * Because partition tuple routing would like to know about whether + * transition capture is active, we also set it in mtstate, which is + * passed to ExecFindPartition() below. + */ + cstate->transition_capture = mtstate->mt_transition_capture = + MakeTransitionCaptureState(cstate->rel->trigdesc, + RelationGetRelid(cstate->rel), + CMD_INSERT); + + /* + * If the named relation is a partitioned table, initialize state for + * CopyFrom tuple routing. + */ + if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + proute = ExecSetupPartitionTupleRouting(estate, cstate->rel); + + if (cstate->whereClause) + cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), + &mtstate->ps); + + /* + * It's generally more efficient to prepare a bunch of tuples for + * insertion, and insert them in one table_multi_insert() call, than call + * table_tuple_insert() separately for every tuple. However, there are a + * number of reasons why we might not be able to do this. These are + * explained below. + */ + if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_before_row || + resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) + { + /* + * Can't support multi-inserts when there are any BEFORE/INSTEAD OF + * triggers on the table. Such triggers might query the table we're + * inserting into and act differently if the tuples that have already + * been processed and prepared for insertion are not there. + */ + insertMethod = CIM_SINGLE; + } + else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && + resultRelInfo->ri_TrigDesc->trig_insert_new_table) + { + /* + * For partitioned tables we can't support multi-inserts when there + * are any statement level insert triggers. It might be possible to + * allow partitioned tables with such triggers in the future, but for + * now, CopyMultiInsertInfoFlush expects that any before row insert + * and statement level insert triggers are on the same relation. + */ + insertMethod = CIM_SINGLE; + } + else if (resultRelInfo->ri_FdwRoutine != NULL || + cstate->volatile_defexprs) + { + /* + * Can't support multi-inserts to foreign tables or if there are any + * volatile default expressions in the table. Similarly to the + * trigger case above, such expressions may query the table we're + * inserting into. + * + * Note: It does not matter if any partitions have any volatile + * default expressions as we use the defaults from the target of the + * COPY command. + */ + insertMethod = CIM_SINGLE; + } + else if (contain_volatile_functions(cstate->whereClause)) + { + /* + * Can't support multi-inserts if there are any volatile function + * expressions in WHERE clause. Similarly to the trigger case above, + * such expressions may query the table we're inserting into. + */ + insertMethod = CIM_SINGLE; + } + else + { + /* + * For partitioned tables, we may still be able to perform bulk + * inserts. However, the possibility of this depends on which types + * of triggers exist on the partition. We must disable bulk inserts + * if the partition is a foreign table or it has any before row insert + * or insert instead triggers (same as we checked above for the parent + * table). Since the partition's resultRelInfos are initialized only + * when we actually need to insert the first tuple into them, we must + * have the intermediate insert method of CIM_MULTI_CONDITIONAL to + * flag that we must later determine if we can use bulk-inserts for + * the partition being inserted into. + */ + if (proute) + insertMethod = CIM_MULTI_CONDITIONAL; + else + insertMethod = CIM_MULTI; + + CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, + estate, mycid, ti_options); + } + + /* + * If not using batch mode (which allocates slots as needed) set up a + * tuple slot too. When inserting into a partitioned table, we also need + * one, even if we might batch insert, to read the tuple in the root + * partition's form. + */ + if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) + { + singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + bistate = GetBulkInsertState(); + } + + has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row); + + has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_instead_row); + + /* + * Check BEFORE STATEMENT insertion triggers. It's debatable whether we + * should do this for COPY, since it's not really an "INSERT" statement as + * such. However, executing these triggers maintains consistency with the + * EACH ROW triggers that we already fire on COPY. + */ + ExecBSInsertTriggers(estate, resultRelInfo); + + econtext = GetPerTupleExprContext(estate); + + /* Set up callback to identify error line number */ + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + for (;;) + { + TupleTableSlot *myslot; + bool skip_tuple; + + CHECK_FOR_INTERRUPTS(); + + /* + * Reset the per-tuple exprcontext. We do this after every tuple, to + * clean-up after expression evaluations etc. + */ + ResetPerTupleExprContext(estate); + + /* select slot to (initially) load row into */ + if (insertMethod == CIM_SINGLE || proute) + { + myslot = singleslot; + Assert(myslot != NULL); + } + else + { + Assert(resultRelInfo == target_resultRelInfo); + Assert(insertMethod == CIM_MULTI); + + myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, + resultRelInfo); + } + + /* + * Switch to per-tuple context before calling NextCopyFrom, which does + * evaluate default expressions etc. and requires per-tuple context. + */ + MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + ExecClearTuple(myslot); + + /* Directly store the values/nulls array in the slot */ + if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) + break; + + ExecStoreVirtualTuple(myslot); + + /* + * Constraints and where clause might reference the tableoid column, + * so (re-)initialize tts_tableOid before evaluating them. + */ + myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); + + /* Triggers and stuff need to be invoked in query context. */ + MemoryContextSwitchTo(oldcontext); + + if (cstate->whereClause) + { + econtext->ecxt_scantuple = myslot; + /* Skip items that don't match COPY's WHERE clause */ + if (!ExecQual(cstate->qualexpr, econtext)) + { + /* + * Report that this tuple was filtered out by the WHERE + * clause. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, + ++excluded); + continue; + } + } + + /* Determine the partition to insert the tuple into */ + if (proute) + { + TupleConversionMap *map; + + /* + * Attempt to find a partition suitable for this tuple. + * ExecFindPartition() will raise an error if none can be found or + * if the found partition is not suitable for INSERTs. + */ + resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, + proute, myslot, estate); + + if (prevResultRelInfo != resultRelInfo) + { + /* Determine which triggers exist on this partition */ + has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row); + + has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_instead_row); + + /* + * Disable multi-inserts when the partition has BEFORE/INSTEAD + * OF triggers, or if the partition is a foreign partition. + */ + leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && + !has_before_insert_row_trig && + !has_instead_insert_row_trig && + resultRelInfo->ri_FdwRoutine == NULL; + + /* Set the multi-insert buffer to use for this partition. */ + if (leafpart_use_multi_insert) + { + if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) + CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, + resultRelInfo); + } + else if (insertMethod == CIM_MULTI_CONDITIONAL && + !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + { + /* + * Flush pending inserts if this partition can't use + * batching, so rows are visible to triggers etc. + */ + CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + } + + if (bistate != NULL) + ReleaseBulkInsertStatePin(bistate); + prevResultRelInfo = resultRelInfo; + } + + /* + * If we're capturing transition tuples, we might need to convert + * from the partition rowtype to root rowtype. But if there are no + * BEFORE triggers on the partition that could change the tuple, + * we can just remember the original unconverted tuple to avoid a + * needless round trip conversion. + */ + if (cstate->transition_capture != NULL) + cstate->transition_capture->tcs_original_insert_tuple = + !has_before_insert_row_trig ? myslot : NULL; + + /* + * We might need to convert from the root rowtype to the partition + * rowtype. + */ + map = resultRelInfo->ri_RootToPartitionMap; + if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) + { + /* non batch insert */ + if (map != NULL) + { + TupleTableSlot *new_slot; + + new_slot = resultRelInfo->ri_PartitionTupleSlot; + myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); + } + } + else + { + /* + * Prepare to queue up tuple for later batch insert into + * current partition. + */ + TupleTableSlot *batchslot; + + /* no other path available for partitioned table */ + Assert(insertMethod == CIM_MULTI_CONDITIONAL); + + batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, + resultRelInfo); + + if (map != NULL) + myslot = execute_attr_map_slot(map->attrMap, myslot, + batchslot); + else + { + /* + * This looks more expensive than it is (Believe me, I + * optimized it away. Twice.). The input is in virtual + * form, and we'll materialize the slot below - for most + * slot types the copy performs the work materialization + * would later require anyway. + */ + ExecCopySlot(batchslot, myslot); + myslot = batchslot; + } + } + + /* ensure that triggers etc see the right relation */ + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + } + + skip_tuple = false; + + /* BEFORE ROW INSERT Triggers */ + if (has_before_insert_row_trig) + { + if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot)) + skip_tuple = true; /* "do nothing" */ + } + + if (!skip_tuple) + { + /* + * If there is an INSTEAD OF INSERT ROW trigger, let it handle the + * tuple. Otherwise, proceed with inserting the tuple into the + * table or foreign table. + */ + if (has_instead_insert_row_trig) + { + ExecIRInsertTriggers(estate, resultRelInfo, myslot); + } + else + { + /* Compute stored generated columns */ + if (resultRelInfo->ri_RelationDesc->rd_att->constr && + resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) + ExecComputeStoredGenerated(resultRelInfo, estate, myslot, + CMD_INSERT); + + /* + * If the target is a plain table, check the constraints of + * the tuple. + */ + if (resultRelInfo->ri_FdwRoutine == NULL && + resultRelInfo->ri_RelationDesc->rd_att->constr) + ExecConstraints(resultRelInfo, myslot, estate); + + /* + * Also check the tuple against the partition constraint, if + * there is one; except that if we got here via tuple-routing, + * we don't need to if there's no BR trigger defined on the + * partition. + */ + if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition && + (proute == NULL || has_before_insert_row_trig)) + ExecPartitionCheck(resultRelInfo, myslot, estate, true); + + /* Store the slot in the multi-insert buffer, when enabled. */ + if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) + { + /* + * The slot previously might point into the per-tuple + * context. For batching it needs to be longer lived. + */ + ExecMaterializeSlot(myslot); + + /* Add this tuple to the tuple buffer */ + CopyMultiInsertInfoStore(&multiInsertInfo, + resultRelInfo, myslot, + cstate->line_buf.len, + cstate->cur_lineno); + + /* + * If enough inserts have queued up, then flush all + * buffers out to their tables. + */ + if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) + CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + } + else + { + List *recheckIndexes = NIL; + + /* OK, store the tuple */ + if (resultRelInfo->ri_FdwRoutine != NULL) + { + myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, + resultRelInfo, + myslot, + NULL); + + if (myslot == NULL) /* "do nothing" */ + continue; /* next tuple please */ + + /* + * AFTER ROW Triggers might reference the tableoid + * column, so (re-)initialize tts_tableOid before + * evaluating them. + */ + myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); + } + else + { + /* OK, store the tuple and create index entries for it */ + table_tuple_insert(resultRelInfo->ri_RelationDesc, + myslot, mycid, ti_options, bistate); + + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + myslot, + estate, + false, + false, + NULL, + NIL); + } + + /* AFTER ROW INSERT Triggers */ + ExecARInsertTriggers(estate, resultRelInfo, myslot, + recheckIndexes, cstate->transition_capture); + + list_free(recheckIndexes); + } + } + + /* + * We count only tuples not suppressed by a BEFORE INSERT trigger + * or FDW; this is the same definition used by nodeModifyTable.c + * for counting tuples inserted by an INSERT command. Update + * progress of the COPY command as well. + */ + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + ++processed); + } + } + + /* Flush any remaining buffered tuples */ + if (insertMethod != CIM_SINGLE) + { + if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); + } + + /* Done, clean up */ + error_context_stack = errcallback.previous; + + if (bistate != NULL) + FreeBulkInsertState(bistate); + + MemoryContextSwitchTo(oldcontext); + + /* Execute AFTER STATEMENT insertion triggers */ + ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); + + /* Handle queued AFTER triggers */ + AfterTriggerEndQuery(estate); + + ExecResetTupleTable(estate->es_tupleTable, false); + + /* Allow the FDW to shut down */ + if (target_resultRelInfo->ri_FdwRoutine != NULL && + target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) + target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, + target_resultRelInfo); + + /* Tear down the multi-insert buffer data */ + if (insertMethod != CIM_SINGLE) + CopyMultiInsertInfoCleanup(&multiInsertInfo); + + /* Close all the partitioned tables, leaf partitions, and their indices */ + if (proute) + ExecCleanupTupleRouting(mtstate, proute); + + /* Close the result relations, including any trigger target relations */ + ExecCloseResultRelations(estate); + ExecCloseRangeTableRelations(estate); + + FreeExecutorState(estate); + + return processed; +} + +/* + * Setup to read tuples from a file for COPY FROM. + * + * 'rel': Used as a template for the tuples + * 'whereClause': WHERE clause from the COPY FROM command + * 'filename': Name of server-local file to read, NULL for STDIN + * 'is_program': true if 'filename' is program to execute + * 'data_source_cb': callback that provides the input data + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. See copy_opt_item in gram.y for selections. + * + * Returns a CopyFromState, to be passed to NextCopyFrom and related functions. + */ +CopyFromState +BeginCopyFrom(ParseState *pstate, + Relation rel, + Node *whereClause, + const char *filename, + bool is_program, + copy_data_source_cb data_source_cb, + List *attnamelist, + List *options) +{ + CopyFromState cstate; + bool pipe = (filename == NULL); + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + num_defaults; + FmgrInfo *in_functions; + Oid *typioparams; + int attnum; + Oid in_func_oid; + int *defmap; + ExprState **defexprs; + MemoryContext oldcontext; + bool volatile_defexprs; + const int progress_cols[] = { + PROGRESS_COPY_COMMAND, + PROGRESS_COPY_TYPE, + PROGRESS_COPY_BYTES_TOTAL + }; + int64 progress_vals[] = { + PROGRESS_COPY_COMMAND_FROM, + 0, + 0 + }; + + /* Allocate workspace and zero all fields */ + cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); + + /* + * We allocate everything used by a cstate in a new memory context. This + * avoids memory leaks during repeated use of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Extract options from the statement node tree */ + ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + + /* Process the target relation */ + cstate->rel = rel; + + tupDesc = RelationGetDescr(cstate->rel); + + /* process commmon options or initialization */ + + /* Generate or convert list of attributes to process */ + cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); + + num_phys_attrs = tupDesc->natts; + + /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ + cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->opts.force_notnull) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->opts.force_notnull_flags[attnum - 1] = true; + } + } + + /* Convert FORCE_NULL name list to per-column flags, check validity */ + cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + if (cstate->opts.force_null) + { + List *attnums; + ListCell *cur; + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("FORCE_NULL column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->opts.force_null_flags[attnum - 1] = true; + } + } + + /* Convert convert_selectively name list to per-column flags */ + if (cstate->opts.convert_selectively) + { + List *attnums; + ListCell *cur; + + cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); + + attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select); + + foreach(cur, attnums) + { + int attnum = lfirst_int(cur); + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + if (!list_member_int(cstate->attnumlist, attnum)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg_internal("selected column \"%s\" not referenced by COPY", + NameStr(attr->attname)))); + cstate->convert_select_flags[attnum - 1] = true; + } + } + + /* Use client encoding when ENCODING option is not specified. */ + if (cstate->opts.file_encoding < 0) + cstate->file_encoding = pg_get_client_encoding(); + else + cstate->file_encoding = cstate->opts.file_encoding; + + /* + * Look up encoding conversion function. + */ + if (cstate->file_encoding == GetDatabaseEncoding() || + cstate->file_encoding == PG_SQL_ASCII || + GetDatabaseEncoding() == PG_SQL_ASCII) + { + cstate->need_transcoding = false; + } + else + { + cstate->need_transcoding = true; + cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding, + GetDatabaseEncoding()); + } + + cstate->copy_src = COPY_FILE; /* default */ + + cstate->whereClause = whereClause; + + MemoryContextSwitchTo(oldcontext); + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + + /* Initialize state variables */ + cstate->eol_type = EOL_UNKNOWN; + cstate->cur_relname = RelationGetRelationName(cstate->rel); + cstate->cur_lineno = 0; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + + /* + * Allocate buffers for the input pipeline. + * + * attribute_buf and raw_buf are used in both text and binary modes, but + * input_buf and line_buf only in text mode. + */ + cstate->raw_buf = palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; + cstate->raw_reached_eof = false; + + if (!cstate->opts.binary) + { + /* + * If encoding conversion is needed, we need another buffer to hold + * the converted input data. Otherwise, we can just point input_buf + * to the same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + } + + initStringInfo(&cstate->attribute_buf); + + /* Assign range table, we'll need it in CopyFrom. */ + if (pstate) + cstate->range_table = pstate->p_rtable; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + num_defaults = 0; + volatile_defexprs = false; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function), and info about defaults and constraints. (Which + * input function we use depends on text/binary format choice.) + */ + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + defmap = (int *) palloc(num_phys_attrs * sizeof(int)); + defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); + + for (attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + if (cstate->opts.binary) + getTypeBinaryInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + else + getTypeInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + + /* Get default info if needed */ + if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated) + { + /* attribute is NOT to be copied from input */ + /* use default value if one exists */ + Expr *defexpr = (Expr *) build_column_default(cstate->rel, + attnum); + + if (defexpr != NULL) + { + /* Run the expression through planner */ + defexpr = expression_planner(defexpr); + + /* Initialize executable expression in copycontext */ + defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); + defmap[num_defaults] = attnum - 1; + num_defaults++; + + /* + * If a default expression looks at the table being loaded, + * then it could give the wrong answer when using + * multi-insert. Since database access can be dynamic this is + * hard to test for exactly, so we use the much wider test of + * whether the default expression is volatile. We allow for + * the special case of when the default expression is the + * nextval() of a sequence which in this specific case is + * known to be safe for use with the multi-insert + * optimization. Hence we use this special case function + * checker rather than the standard check for + * contain_volatile_functions(). + */ + if (!volatile_defexprs) + volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); + } + } + } + + + /* initialize progress */ + pgstat_progress_start_command(PROGRESS_COMMAND_COPY, + cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); + cstate->bytes_processed = 0; + + /* We keep those variables in cstate. */ + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; + cstate->defmap = defmap; + cstate->defexprs = defexprs; + cstate->volatile_defexprs = volatile_defexprs; + cstate->num_defaults = num_defaults; + cstate->is_program = is_program; + + if (data_source_cb) + { + progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; + cstate->copy_src = COPY_CALLBACK; + cstate->data_source_cb = data_source_cb; + } + else if (pipe) + { + progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; + Assert(!is_program); /* the grammar does not allow this */ + if (whereToSendOutput == DestRemote) + ReceiveCopyBegin(cstate); + else + cstate->copy_file = stdin; + } + else + { + cstate->filename = pstrdup(filename); + + if (cstate->is_program) + { + progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; + cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not execute command \"%s\": %m", + cstate->filename))); + } + else + { + struct stat st; + + progress_vals[1] = PROGRESS_COPY_TYPE_FILE; + cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); + if (cstate->copy_file == NULL) + { + /* copy errno because ereport subfunctions might change it */ + int save_errno = errno; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", + cstate->filename), + (save_errno == ENOENT || save_errno == EACCES) ? + errhint("COPY FROM instructs the PostgreSQL server process to read a file. " + "You may want a client-side facility such as psql's \\copy.") : 0)); + } + + if (fstat(fileno(cstate->copy_file), &st)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + cstate->filename))); + + if (S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is a directory", cstate->filename))); + + progress_vals[2] = st.st_size; + } + } + + pgstat_progress_update_multi_param(3, progress_cols, progress_vals); + + if (cstate->opts.binary) + { + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); + } + + /* create workspace for CopyReadAttributes results */ + if (!cstate->opts.binary) + { + AttrNumber attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); + } + + MemoryContextSwitchTo(oldcontext); + + return cstate; +} + +/* + * Clean up storage and release resources for COPY FROM. + */ +void +EndCopyFrom(CopyFromState cstate) +{ + /* No COPY FROM related resources except memory. */ + if (cstate->is_program) + { + ClosePipeFromProgram(cstate); + } + else + { + if (cstate->filename != NULL && FreeFile(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + cstate->filename))); + } + + pgstat_progress_end_command(); + + MemoryContextDelete(cstate->copycontext); + pfree(cstate); +} + +/* + * Closes the pipe from an external program, checking the pclose() return code. + */ +static void +ClosePipeFromProgram(CopyFromState cstate) +{ + int pclose_rc; + + Assert(cstate->is_program); + + pclose_rc = ClosePipeStream(cstate->copy_file); + if (pclose_rc == -1) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close pipe to external command: %m"))); + else if (pclose_rc != 0) + { + /* + * If we ended a COPY FROM PROGRAM before reaching EOF, then it's + * expectable for the called program to fail with SIGPIPE, and we + * should not report that as an error. Otherwise, SIGPIPE indicates a + * problem. + */ + if (!cstate->raw_reached_eof && + wait_result_is_signal(pclose_rc, SIGPIPE)) + return; + + ereport(ERROR, + (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("program \"%s\" failed", + cstate->filename), + errdetail_internal("%s", wait_result_to_str(pclose_rc)))); + } +} |