summaryrefslogtreecommitdiffstats
path: root/src/backend/commands/copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/copy.c')
-rw-r--r--src/backend/commands/copy.c760
1 files changed, 760 insertions, 0 deletions
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
new file mode 100644
index 0000000..8265b98
--- /dev/null
+++ b/src/backend/commands/copy.c
@@ -0,0 +1,760 @@
+/*-------------------------------------------------------------------------
+ *
+ * copy.c
+ * Implements the COPY utility command
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/commands/copy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <ctype.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/sysattr.h"
+#include "access/table.h"
+#include "access/xact.h"
+#include "catalog/pg_authid.h"
+#include "commands/copy.h"
+#include "commands/defrem.h"
+#include "executor/executor.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_expr.h"
+#include "parser/parse_relation.h"
+#include "rewrite/rewriteHandler.h"
+#include "utils/acl.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/rls.h"
+
+/*
+ * DoCopy executes the SQL COPY statement
+ *
+ * Either unload or reload contents of table <relation>, depending on <from>.
+ * (<from> = true means we are inserting into the table.) In the "TO" case
+ * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
+ * or DELETE query.
+ *
+ * If <pipe> is false, transfer is between the table and the file named
+ * <filename>. Otherwise, transfer is between the table and our regular
+ * input/output stream. The latter could be either stdin/stdout or a
+ * socket, depending on whether we're running under Postmaster control.
+ *
+ * Do not allow a Postgres user without the 'pg_read_server_files' or
+ * 'pg_write_server_files' role to read from or write to a file.
+ *
+ * Do not allow the copy if user doesn't have proper permission to access
+ * the table or the specifically requested columns.
+ */
+void
+DoCopy(ParseState *pstate, const CopyStmt *stmt,
+ int stmt_location, int stmt_len,
+ uint64 *processed)
+{
+ bool is_from = stmt->is_from;
+ bool pipe = (stmt->filename == NULL);
+ Relation rel;
+ Oid relid;
+ RawStmt *query = NULL;
+ Node *whereClause = NULL;
+
+ /*
+ * Disallow COPY to/from file or program except to users with the
+ * appropriate role.
+ */
+ if (!pipe)
+ {
+ if (stmt->is_program)
+ {
+ if (!is_member_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ }
+ else
+ {
+ if (is_from && !is_member_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+
+ if (!is_from && !is_member_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
+ errhint("Anyone can COPY to stdout or from stdin. "
+ "psql's \\copy command also works for anyone.")));
+ }
+ }
+
+ if (stmt->relation)
+ {
+ LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
+ ParseNamespaceItem *nsitem;
+ RangeTblEntry *rte;
+ TupleDesc tupDesc;
+ List *attnums;
+ ListCell *cur;
+
+ Assert(!stmt->query);
+
+ /* Open and lock the relation, using the appropriate lock type. */
+ rel = table_openrv(stmt->relation, lockmode);
+
+ relid = RelationGetRelid(rel);
+
+ nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
+ NULL, false, false);
+ rte = nsitem->p_rte;
+ rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
+
+ if (stmt->whereClause)
+ {
+ /* add nsitem to query namespace */
+ addNSItemToQuery(pstate, nsitem, false, true, true);
+
+ /* Transform the raw expression tree */
+ whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
+
+ /* Make sure it yields a boolean result. */
+ whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
+
+ /* we have to fix its collations too */
+ assign_expr_collations(pstate, whereClause);
+
+ whereClause = eval_const_expressions(NULL, whereClause);
+
+ whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
+ whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
+ }
+
+ tupDesc = RelationGetDescr(rel);
+ attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
+ foreach(cur, attnums)
+ {
+ int attno = lfirst_int(cur) -
+ FirstLowInvalidHeapAttributeNumber;
+
+ if (is_from)
+ rte->insertedCols = bms_add_member(rte->insertedCols, attno);
+ else
+ rte->selectedCols = bms_add_member(rte->selectedCols, attno);
+ }
+ ExecCheckRTPerms(pstate->p_rtable, true);
+
+ /*
+ * Permission check for row security policies.
+ *
+ * check_enable_rls will ereport(ERROR) if the user has requested
+ * something invalid and will otherwise indicate if we should enable
+ * RLS (returns RLS_ENABLED) or not for this COPY statement.
+ *
+ * If the relation has a row security policy and we are to apply it
+ * then perform a "query" copy and allow the normal query processing
+ * to handle the policies.
+ *
+ * If RLS is not enabled for this, then just fall through to the
+ * normal non-filtering relation handling.
+ */
+ if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
+ {
+ SelectStmt *select;
+ ColumnRef *cr;
+ ResTarget *target;
+ RangeVar *from;
+ List *targetList = NIL;
+
+ if (is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY FROM not supported with row-level security"),
+ errhint("Use INSERT statements instead.")));
+
+ /*
+ * Build target list
+ *
+ * If no columns are specified in the attribute list of the COPY
+ * command, then the target list is 'all' columns. Therefore, '*'
+ * should be used as the target list for the resulting SELECT
+ * statement.
+ *
+ * In the case that columns are specified in the attribute list,
+ * create a ColumnRef and ResTarget for each column and add them
+ * to the target list for the resulting SELECT statement.
+ */
+ if (!stmt->attlist)
+ {
+ cr = makeNode(ColumnRef);
+ cr->fields = list_make1(makeNode(A_Star));
+ cr->location = -1;
+
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ targetList = list_make1(target);
+ }
+ else
+ {
+ ListCell *lc;
+
+ foreach(lc, stmt->attlist)
+ {
+ /*
+ * Build the ColumnRef for each column. The ColumnRef
+ * 'fields' property is a String 'Value' node (see
+ * nodes/value.h) that corresponds to the column name
+ * respectively.
+ */
+ cr = makeNode(ColumnRef);
+ cr->fields = list_make1(lfirst(lc));
+ cr->location = -1;
+
+ /* Build the ResTarget and add the ColumnRef to it. */
+ target = makeNode(ResTarget);
+ target->name = NULL;
+ target->indirection = NIL;
+ target->val = (Node *) cr;
+ target->location = -1;
+
+ /* Add each column to the SELECT statement's target list */
+ targetList = lappend(targetList, target);
+ }
+ }
+
+ /*
+ * Build RangeVar for from clause, fully qualified based on the
+ * relation which we have opened and locked.
+ */
+ from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
+ pstrdup(RelationGetRelationName(rel)),
+ -1);
+
+ /* Build query */
+ select = makeNode(SelectStmt);
+ select->targetList = targetList;
+ select->fromClause = list_make1(from);
+
+ query = makeNode(RawStmt);
+ query->stmt = (Node *) select;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
+ /*
+ * Close the relation for now, but keep the lock on it to prevent
+ * changes between now and when we start the query-based COPY.
+ *
+ * We'll reopen it later as part of the query-based COPY.
+ */
+ table_close(rel, NoLock);
+ rel = NULL;
+ }
+ }
+ else
+ {
+ Assert(stmt->query);
+
+ query = makeNode(RawStmt);
+ query->stmt = stmt->query;
+ query->stmt_location = stmt_location;
+ query->stmt_len = stmt_len;
+
+ relid = InvalidOid;
+ rel = NULL;
+ }
+
+ if (is_from)
+ {
+ CopyFromState cstate;
+
+ Assert(rel);
+
+ /* check read-only transaction and parallel mode */
+ if (XactReadOnly && !rel->rd_islocaltemp)
+ PreventCommandIfReadOnly("COPY FROM");
+
+ cstate = BeginCopyFrom(pstate, rel, whereClause,
+ stmt->filename, stmt->is_program,
+ NULL, stmt->attlist, stmt->options);
+ *processed = CopyFrom(cstate); /* copy from file to database */
+ EndCopyFrom(cstate);
+ }
+ else
+ {
+ CopyToState cstate;
+
+ cstate = BeginCopyTo(pstate, rel, query, relid,
+ stmt->filename, stmt->is_program,
+ stmt->attlist, stmt->options);
+ *processed = DoCopyTo(cstate); /* copy from database to file */
+ EndCopyTo(cstate);
+ }
+
+ if (rel != NULL)
+ table_close(rel, NoLock);
+}
+
+/*
+ * Process the statement option list for COPY.
+ *
+ * Scan the options list (a list of DefElem) and transpose the information
+ * into *opts_out, applying appropriate error checking.
+ *
+ * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially.
+ *
+ * This is exported so that external users of the COPY API can sanity-check
+ * a list of options. In that usage, 'opts_out' can be passed as NULL and
+ * the collected data is just leaked until CurrentMemoryContext is reset.
+ *
+ * Note that additional checking, such as whether column names listed in FORCE
+ * QUOTE actually exist, has to be applied later. This just checks for
+ * self-consistency of the options list.
+ */
+void
+ProcessCopyOptions(ParseState *pstate,
+ CopyFormatOptions *opts_out,
+ bool is_from,
+ List *options)
+{
+ bool format_specified = false;
+ bool freeze_specified = false;
+ bool header_specified = false;
+ ListCell *option;
+
+ /* Support external use for option sanity checking */
+ if (opts_out == NULL)
+ opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions));
+
+ opts_out->file_encoding = -1;
+
+ /* Extract options from the statement node tree */
+ foreach(option, options)
+ {
+ DefElem *defel = lfirst_node(DefElem, option);
+
+ if (strcmp(defel->defname, "format") == 0)
+ {
+ char *fmt = defGetString(defel);
+
+ if (format_specified)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ format_specified = true;
+ if (strcmp(fmt, "text") == 0)
+ /* default format */ ;
+ else if (strcmp(fmt, "csv") == 0)
+ opts_out->csv_mode = true;
+ else if (strcmp(fmt, "binary") == 0)
+ opts_out->binary = true;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY format \"%s\" not recognized", fmt),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "freeze") == 0)
+ {
+ if (freeze_specified)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ freeze_specified = true;
+ opts_out->freeze = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "delimiter") == 0)
+ {
+ if (opts_out->delim)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->delim = defGetString(defel);
+ }
+ else if (strcmp(defel->defname, "null") == 0)
+ {
+ if (opts_out->null_print)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->null_print = defGetString(defel);
+ }
+ else if (strcmp(defel->defname, "header") == 0)
+ {
+ if (header_specified)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ header_specified = true;
+ opts_out->header_line = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "quote") == 0)
+ {
+ if (opts_out->quote)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->quote = defGetString(defel);
+ }
+ else if (strcmp(defel->defname, "escape") == 0)
+ {
+ if (opts_out->escape)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->escape = defGetString(defel);
+ }
+ else if (strcmp(defel->defname, "force_quote") == 0)
+ {
+ if (opts_out->force_quote || opts_out->force_quote_all)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ if (defel->arg && IsA(defel->arg, A_Star))
+ opts_out->force_quote_all = true;
+ else if (defel->arg && IsA(defel->arg, List))
+ opts_out->force_quote = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "force_not_null") == 0)
+ {
+ if (opts_out->force_notnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ if (defel->arg && IsA(defel->arg, List))
+ opts_out->force_notnull = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "force_null") == 0)
+ {
+ if (opts_out->force_null)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ if (defel->arg && IsA(defel->arg, List))
+ opts_out->force_null = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "convert_selectively") == 0)
+ {
+ /*
+ * Undocumented, not-accessible-from-SQL option: convert only the
+ * named columns to binary form, storing the rest as NULLs. It's
+ * allowed for the column list to be NIL.
+ */
+ if (opts_out->convert_selectively)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->convert_selectively = true;
+ if (defel->arg == NULL || IsA(defel->arg, List))
+ opts_out->convert_select = castNode(List, defel->arg);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a list of column names",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else if (strcmp(defel->defname, "encoding") == 0)
+ {
+ if (opts_out->file_encoding >= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ opts_out->file_encoding = pg_char_to_encoding(defGetString(defel));
+ if (opts_out->file_encoding < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("argument to option \"%s\" must be a valid encoding name",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"%s\" not recognized",
+ defel->defname),
+ parser_errposition(pstate, defel->location)));
+ }
+
+ /*
+ * Check for incompatible options (must do these two before inserting
+ * defaults)
+ */
+ if (opts_out->binary && opts_out->delim)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify DELIMITER in BINARY mode")));
+
+ if (opts_out->binary && opts_out->null_print)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify NULL in BINARY mode")));
+
+ /* Set defaults for omitted options */
+ if (!opts_out->delim)
+ opts_out->delim = opts_out->csv_mode ? "," : "\t";
+
+ if (!opts_out->null_print)
+ opts_out->null_print = opts_out->csv_mode ? "" : "\\N";
+ opts_out->null_print_len = strlen(opts_out->null_print);
+
+ if (opts_out->csv_mode)
+ {
+ if (!opts_out->quote)
+ opts_out->quote = "\"";
+ if (!opts_out->escape)
+ opts_out->escape = opts_out->quote;
+ }
+
+ /* Only single-byte delimiter strings are supported. */
+ if (strlen(opts_out->delim) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY delimiter must be a single one-byte character")));
+
+ /* Disallow end-of-line characters */
+ if (strchr(opts_out->delim, '\r') != NULL ||
+ strchr(opts_out->delim, '\n') != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY delimiter cannot be newline or carriage return")));
+
+ if (strchr(opts_out->null_print, '\r') != NULL ||
+ strchr(opts_out->null_print, '\n') != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY null representation cannot use newline or carriage return")));
+
+ /*
+ * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
+ * backslash because it would be ambiguous. We can't allow the other
+ * cases because data characters matching the delimiter must be
+ * backslashed, and certain backslash combinations are interpreted
+ * non-literally by COPY IN. Disallowing all lower case ASCII letters is
+ * more than strictly necessary, but seems best for consistency and
+ * future-proofing. Likewise we disallow all digits though only octal
+ * digits are actually dangerous.
+ */
+ if (!opts_out->csv_mode &&
+ strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
+ opts_out->delim[0]) != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim)));
+
+ /* Check header */
+ if (!opts_out->csv_mode && opts_out->header_line)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY HEADER available only in CSV mode")));
+
+ /* Check quote */
+ if (!opts_out->csv_mode && opts_out->quote != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY quote available only in CSV mode")));
+
+ if (opts_out->csv_mode && strlen(opts_out->quote) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY quote must be a single one-byte character")));
+
+ if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0])
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("COPY delimiter and quote must be different")));
+
+ /* Check escape */
+ if (!opts_out->csv_mode && opts_out->escape != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY escape available only in CSV mode")));
+
+ if (opts_out->csv_mode && strlen(opts_out->escape) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY escape must be a single one-byte character")));
+
+ /* Check force_quote */
+ if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force quote available only in CSV mode")));
+ if ((opts_out->force_quote || opts_out->force_quote_all) && is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force quote only available using COPY TO")));
+
+ /* Check force_notnull */
+ if (!opts_out->csv_mode && opts_out->force_notnull != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force not null available only in CSV mode")));
+ if (opts_out->force_notnull != NIL && !is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force not null only available using COPY FROM")));
+
+ /* Check force_null */
+ if (!opts_out->csv_mode && opts_out->force_null != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force null available only in CSV mode")));
+
+ if (opts_out->force_null != NIL && !is_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY force null only available using COPY FROM")));
+
+ /* Don't allow the delimiter to appear in the null string. */
+ if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY delimiter must not appear in the NULL specification")));
+
+ /* Don't allow the CSV quote char to appear in the null string. */
+ if (opts_out->csv_mode &&
+ strchr(opts_out->null_print, opts_out->quote[0]) != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("CSV quote character must not appear in the NULL specification")));
+}
+
+/*
+ * CopyGetAttnums - build an integer list of attnums to be copied
+ *
+ * The input attnamelist is either the user-specified column list,
+ * or NIL if there was none (in which case we want all the non-dropped
+ * columns).
+ *
+ * We don't include generated columns in the generated full list and we don't
+ * allow them to be specified explicitly. They don't make sense for COPY
+ * FROM, but we could possibly allow them for COPY TO. But this way it's at
+ * least ensured that whatever we copy out can be copied back in.
+ *
+ * rel can be NULL ... it's only used for error reports.
+ */
+List *
+CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
+{
+ List *attnums = NIL;
+
+ if (attnamelist == NIL)
+ {
+ /* Generate default column list */
+ int attr_count = tupDesc->natts;
+ int i;
+
+ for (i = 0; i < attr_count; i++)
+ {
+ if (TupleDescAttr(tupDesc, i)->attisdropped)
+ continue;
+ if (TupleDescAttr(tupDesc, i)->attgenerated)
+ continue;
+ attnums = lappend_int(attnums, i + 1);
+ }
+ }
+ else
+ {
+ /* Validate the user-supplied list and extract attnums */
+ ListCell *l;
+
+ foreach(l, attnamelist)
+ {
+ char *name = strVal(lfirst(l));
+ int attnum;
+ int i;
+
+ /* Lookup column name */
+ attnum = InvalidAttrNumber;
+ for (i = 0; i < tupDesc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(tupDesc, i);
+
+ if (att->attisdropped)
+ continue;
+ if (namestrcmp(&(att->attname), name) == 0)
+ {
+ if (att->attgenerated)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("column \"%s\" is a generated column",
+ name),
+ errdetail("Generated columns cannot be used in COPY.")));
+ attnum = att->attnum;
+ break;
+ }
+ }
+ if (attnum == InvalidAttrNumber)
+ {
+ if (rel != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ name, RelationGetRelationName(rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" does not exist",
+ name)));
+ }
+ /* Check for duplicates */
+ if (list_member_int(attnums, attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_COLUMN),
+ errmsg("column \"%s\" specified more than once",
+ name)));
+ attnums = lappend_int(attnums, attnum);
+ }
+ }
+
+ return attnums;
+}