/*------------------------------------------------------------------------- * * copy.c * Implements the COPY utility command * * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/backend/commands/copy.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include #include "access/sysattr.h" #include "access/table.h" #include "access/xact.h" #include "catalog/pg_authid.h" #include "commands/copy.h" #include "commands/defrem.h" #include "executor/executor.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/optimizer.h" #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_relation.h" #include "rewrite/rewriteHandler.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/rls.h" /* * DoCopy executes the SQL COPY statement * * Either unload or reload contents of table , depending on . * ( = true means we are inserting into the table.) In the "TO" case * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE * or DELETE query. * * If is false, transfer is between the table and the file named * . Otherwise, transfer is between the table and our regular * input/output stream. The latter could be either stdin/stdout or a * socket, depending on whether we're running under Postmaster control. * * Do not allow a Postgres user without the 'pg_read_server_files' or * 'pg_write_server_files' role to read from or write to a file. * * Do not allow the copy if user doesn't have proper permission to access * the table or the specifically requested columns. */ void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed) { bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); Relation rel; Oid relid; RawStmt *query = NULL; Node *whereClause = NULL; /* * Disallow COPY to/from file or program except to users with the * appropriate role. */ if (!pipe) { if (stmt->is_program) { if (!is_member_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } else { if (is_from && !is_member_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); if (!is_from && !is_member_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"), errhint("Anyone can COPY to stdout or from stdin. " "psql's \\copy command also works for anyone."))); } } if (stmt->relation) { LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock; ParseNamespaceItem *nsitem; RangeTblEntry *rte; TupleDesc tupDesc; List *attnums; ListCell *cur; Assert(!stmt->query); /* Open and lock the relation, using the appropriate lock type. */ rel = table_openrv(stmt->relation, lockmode); relid = RelationGetRelid(rel); nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode, NULL, false, false); rte = nsitem->p_rte; rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT); if (stmt->whereClause) { /* add nsitem to query namespace */ addNSItemToQuery(pstate, nsitem, false, true, true); /* Transform the raw expression tree */ whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE); /* Make sure it yields a boolean result. */ whereClause = coerce_to_boolean(pstate, whereClause, "WHERE"); /* we have to fix its collations too */ assign_expr_collations(pstate, whereClause); whereClause = eval_const_expressions(NULL, whereClause); whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false); whereClause = (Node *) make_ands_implicit((Expr *) whereClause); } tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); foreach(cur, attnums) { int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; if (is_from) rte->insertedCols = bms_add_member(rte->insertedCols, attno); else rte->selectedCols = bms_add_member(rte->selectedCols, attno); } ExecCheckRTPerms(pstate->p_rtable, true); /* * Permission check for row security policies. * * check_enable_rls will ereport(ERROR) if the user has requested * something invalid and will otherwise indicate if we should enable * RLS (returns RLS_ENABLED) or not for this COPY statement. * * If the relation has a row security policy and we are to apply it * then perform a "query" copy and allow the normal query processing * to handle the policies. * * If RLS is not enabled for this, then just fall through to the * normal non-filtering relation handling. */ if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED) { SelectStmt *select; ColumnRef *cr; ResTarget *target; RangeVar *from; List *targetList = NIL; if (is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY FROM not supported with row-level security"), errhint("Use INSERT statements instead."))); /* * Build target list * * If no columns are specified in the attribute list of the COPY * command, then the target list is 'all' columns. Therefore, '*' * should be used as the target list for the resulting SELECT * statement. * * In the case that columns are specified in the attribute list, * create a ColumnRef and ResTarget for each column and add them * to the target list for the resulting SELECT statement. */ if (!stmt->attlist) { cr = makeNode(ColumnRef); cr->fields = list_make1(makeNode(A_Star)); cr->location = -1; target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; targetList = list_make1(target); } else { ListCell *lc; foreach(lc, stmt->attlist) { /* * Build the ColumnRef for each column. The ColumnRef * 'fields' property is a String 'Value' node (see * nodes/value.h) that corresponds to the column name * respectively. */ cr = makeNode(ColumnRef); cr->fields = list_make1(lfirst(lc)); cr->location = -1; /* Build the ResTarget and add the ColumnRef to it. */ target = makeNode(ResTarget); target->name = NULL; target->indirection = NIL; target->val = (Node *) cr; target->location = -1; /* Add each column to the SELECT statement's target list */ targetList = lappend(targetList, target); } } /* * Build RangeVar for from clause, fully qualified based on the * relation which we have opened and locked. */ from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), pstrdup(RelationGetRelationName(rel)), -1); /* Build query */ select = makeNode(SelectStmt); select->targetList = targetList; select->fromClause = list_make1(from); query = makeNode(RawStmt); query->stmt = (Node *) select; query->stmt_location = stmt_location; query->stmt_len = stmt_len; /* * Close the relation for now, but keep the lock on it to prevent * changes between now and when we start the query-based COPY. * * We'll reopen it later as part of the query-based COPY. */ table_close(rel, NoLock); rel = NULL; } } else { Assert(stmt->query); query = makeNode(RawStmt); query->stmt = stmt->query; query->stmt_location = stmt_location; query->stmt_len = stmt_len; relid = InvalidOid; rel = NULL; } if (is_from) { CopyFromState cstate; Assert(rel); /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); cstate = BeginCopyFrom(pstate, rel, whereClause, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); } else { CopyToState cstate; cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); } if (rel != NULL) table_close(rel, NoLock); } /* * Process the statement option list for COPY. * * Scan the options list (a list of DefElem) and transpose the information * into *opts_out, applying appropriate error checking. * * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially. * * This is exported so that external users of the COPY API can sanity-check * a list of options. In that usage, 'opts_out' can be passed as NULL and * the collected data is just leaked until CurrentMemoryContext is reset. * * Note that additional checking, such as whether column names listed in FORCE * QUOTE actually exist, has to be applied later. This just checks for * self-consistency of the options list. */ void ProcessCopyOptions(ParseState *pstate, CopyFormatOptions *opts_out, bool is_from, List *options) { bool format_specified = false; bool freeze_specified = false; bool header_specified = false; ListCell *option; /* Support external use for option sanity checking */ if (opts_out == NULL) opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions)); opts_out->file_encoding = -1; /* Extract options from the statement node tree */ foreach(option, options) { DefElem *defel = lfirst_node(DefElem, option); if (strcmp(defel->defname, "format") == 0) { char *fmt = defGetString(defel); if (format_specified) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); format_specified = true; if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) opts_out->csv_mode = true; else if (strcmp(fmt, "binary") == 0) opts_out->binary = true; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY format \"%s\" not recognized", fmt), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "freeze") == 0) { if (freeze_specified) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); freeze_specified = true; opts_out->freeze = defGetBoolean(defel); } else if (strcmp(defel->defname, "delimiter") == 0) { if (opts_out->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->delim = defGetString(defel); } else if (strcmp(defel->defname, "null") == 0) { if (opts_out->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->null_print = defGetString(defel); } else if (strcmp(defel->defname, "header") == 0) { if (header_specified) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); header_specified = true; opts_out->header_line = defGetBoolean(defel); } else if (strcmp(defel->defname, "quote") == 0) { if (opts_out->quote) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->quote = defGetString(defel); } else if (strcmp(defel->defname, "escape") == 0) { if (opts_out->escape) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->escape = defGetString(defel); } else if (strcmp(defel->defname, "force_quote") == 0) { if (opts_out->force_quote || opts_out->force_quote_all) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, A_Star)) opts_out->force_quote_all = true; else if (defel->arg && IsA(defel->arg, List)) opts_out->force_quote = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "force_not_null") == 0) { if (opts_out->force_notnull) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); if (defel->arg && IsA(defel->arg, List)) opts_out->force_notnull = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "force_null") == 0) { if (opts_out->force_null) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); if (defel->arg && IsA(defel->arg, List)) opts_out->force_null = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "convert_selectively") == 0) { /* * Undocumented, not-accessible-from-SQL option: convert only the * named columns to binary form, storing the rest as NULLs. It's * allowed for the column list to be NIL. */ if (opts_out->convert_selectively) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->convert_selectively = true; if (defel->arg == NULL || IsA(defel->arg, List)) opts_out->convert_select = castNode(List, defel->arg); else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a list of column names", defel->defname), parser_errposition(pstate, defel->location))); } else if (strcmp(defel->defname, "encoding") == 0) { if (opts_out->file_encoding >= 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); opts_out->file_encoding = pg_char_to_encoding(defGetString(defel)); if (opts_out->file_encoding < 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("argument to option \"%s\" must be a valid encoding name", defel->defname), parser_errposition(pstate, defel->location))); } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("option \"%s\" not recognized", defel->defname), parser_errposition(pstate, defel->location))); } /* * Check for incompatible options (must do these two before inserting * defaults) */ if (opts_out->binary && opts_out->delim) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DELIMITER in BINARY mode"))); if (opts_out->binary && opts_out->null_print) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify NULL in BINARY mode"))); /* Set defaults for omitted options */ if (!opts_out->delim) opts_out->delim = opts_out->csv_mode ? "," : "\t"; if (!opts_out->null_print) opts_out->null_print = opts_out->csv_mode ? "" : "\\N"; opts_out->null_print_len = strlen(opts_out->null_print); if (opts_out->csv_mode) { if (!opts_out->quote) opts_out->quote = "\""; if (!opts_out->escape) opts_out->escape = opts_out->quote; } /* Only single-byte delimiter strings are supported. */ if (strlen(opts_out->delim) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must be a single one-byte character"))); /* Disallow end-of-line characters */ if (strchr(opts_out->delim, '\r') != NULL || strchr(opts_out->delim, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter cannot be newline or carriage return"))); if (strchr(opts_out->null_print, '\r') != NULL || strchr(opts_out->null_print, '\n') != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY null representation cannot use newline or carriage return"))); /* * Disallow unsafe delimiter characters in non-CSV mode. We can't allow * backslash because it would be ambiguous. We can't allow the other * cases because data characters matching the delimiter must be * backslashed, and certain backslash combinations are interpreted * non-literally by COPY IN. Disallowing all lower case ASCII letters is * more than strictly necessary, but seems best for consistency and * future-proofing. Likewise we disallow all digits though only octal * digits are actually dangerous. */ if (!opts_out->csv_mode && strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", opts_out->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim))); /* Check header */ if (!opts_out->csv_mode && opts_out->header_line) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY HEADER available only in CSV mode"))); /* Check quote */ if (!opts_out->csv_mode && opts_out->quote != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote available only in CSV mode"))); if (opts_out->csv_mode && strlen(opts_out->quote) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY quote must be a single one-byte character"))); if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0]) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("COPY delimiter and quote must be different"))); /* Check escape */ if (!opts_out->csv_mode && opts_out->escape != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape available only in CSV mode"))); if (opts_out->csv_mode && strlen(opts_out->escape) != 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY escape must be a single one-byte character"))); /* Check force_quote */ if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote available only in CSV mode"))); if ((opts_out->force_quote || opts_out->force_quote_all) && is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force quote only available using COPY TO"))); /* Check force_notnull */ if (!opts_out->csv_mode && opts_out->force_notnull != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null available only in CSV mode"))); if (opts_out->force_notnull != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force not null only available using COPY FROM"))); /* Check force_null */ if (!opts_out->csv_mode && opts_out->force_null != NIL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null available only in CSV mode"))); if (opts_out->force_null != NIL && !is_from) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY force null only available using COPY FROM"))); /* Don't allow the delimiter to appear in the null string. */ if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("COPY delimiter must not appear in the NULL specification"))); /* Don't allow the CSV quote char to appear in the null string. */ if (opts_out->csv_mode && strchr(opts_out->null_print, opts_out->quote[0]) != NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); } /* * CopyGetAttnums - build an integer list of attnums to be copied * * The input attnamelist is either the user-specified column list, * or NIL if there was none (in which case we want all the non-dropped * columns). * * We don't include generated columns in the generated full list and we don't * allow them to be specified explicitly. They don't make sense for COPY * FROM, but we could possibly allow them for COPY TO. But this way it's at * least ensured that whatever we copy out can be copied back in. * * rel can be NULL ... it's only used for error reports. */ List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) { List *attnums = NIL; if (attnamelist == NIL) { /* Generate default column list */ int attr_count = tupDesc->natts; int i; for (i = 0; i < attr_count; i++) { if (TupleDescAttr(tupDesc, i)->attisdropped) continue; if (TupleDescAttr(tupDesc, i)->attgenerated) continue; attnums = lappend_int(attnums, i + 1); } } else { /* Validate the user-supplied list and extract attnums */ ListCell *l; foreach(l, attnamelist) { char *name = strVal(lfirst(l)); int attnum; int i; /* Lookup column name */ attnum = InvalidAttrNumber; for (i = 0; i < tupDesc->natts; i++) { Form_pg_attribute att = TupleDescAttr(tupDesc, i); if (att->attisdropped) continue; if (namestrcmp(&(att->attname), name) == 0) { if (att->attgenerated) ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), errmsg("column \"%s\" is a generated column", name), errdetail("Generated columns cannot be used in COPY."))); attnum = att->attnum; break; } } if (attnum == InvalidAttrNumber) { if (rel != NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", name, RelationGetRelationName(rel)))); else ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" does not exist", name))); } /* Check for duplicates */ if (list_member_int(attnums, attnum)) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_COLUMN), errmsg("column \"%s\" specified more than once", name))); attnums = lappend_int(attnums, attnum); } } return attnums; }