summaryrefslogtreecommitdiffstats
path: root/src/backend/commands/copyto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/copyto.c')
-rw-r--r--src/backend/commands/copyto.c1289
1 files changed, 1289 insertions, 0 deletions
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
new file mode 100644
index 0000000..9e4b243
--- /dev/null
+++ b/src/backend/commands/copyto.c
@@ -0,0 +1,1289 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyto.c
+ * COPY <table> TO file/program/client
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/commands/copyto.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <ctype.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "commands/copy.h"
+#include "commands/progress.h"
+#include "executor/execdesc.h"
+#include "executor/executor.h"
+#include "executor/tuptable.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "optimizer/optimizer.h"
+#include "pgstat.h"
+#include "rewrite/rewriteHandler.h"
+#include "storage/fd.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/partcache.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+ COPY_FILE, /* to file (or a piped program) */
+ COPY_FRONTEND, /* to frontend */
+ COPY_CALLBACK /* to callback function */
+} CopyDest;
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+ /* low-level state data */
+ CopyDest copy_dest; /* type of copy source/destination */
+ FILE *copy_file; /* used if copy_dest == COPY_FILE */
+ StringInfo fe_msgbuf; /* used for all dests during COPY TO */
+
+ int file_encoding; /* file or remote side's character encoding */
+ bool need_transcoding; /* file encoding diff from server? */
+ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+
+ /* parameters from the COPY command */
+ Relation rel; /* relation to copy to */
+ QueryDesc *queryDesc; /* executable query to copy from */
+ List *attnumlist; /* integer list of attnums to copy */
+ char *filename; /* filename, or NULL for STDOUT */
+ bool is_program; /* is 'filename' a program to popen? */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+ CopyFormatOptions opts;
+ Node *whereClause; /* WHERE condition (or NULL) */
+
+ /*
+ * Working state
+ */
+ MemoryContext copycontext; /* per-copy execution context */
+
+ FmgrInfo *out_functions; /* lookup info for output functions */
+ MemoryContext rowcontext; /* per-row evaluation context */
+ uint64 bytes_processed; /* number of bytes processed so far */
+} CopyToStateData;
+
+/* DestReceiver for COPY (query) TO */
+typedef struct
+{
+ DestReceiver pub; /* publicly-known function pointers */
+ CopyToState cstate; /* CopyToStateData for the command */
+ uint64 processed; /* # of tuples processed */
+} DR_copy;
+
+/* NOTE: there's a copy of this in copyfromparse.c */
+static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
+
+
+/* non-export function prototypes */
+static void EndCopy(CopyToState cstate);
+static void ClosePipeToProgram(CopyToState cstate);
+static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
+static void CopyAttributeOutText(CopyToState cstate, const char *string);
+static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
+ bool use_quote, bool single_attr);
+
+/* Low-level communications functions */
+static void SendCopyBegin(CopyToState cstate);
+static void SendCopyEnd(CopyToState cstate);
+static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
+static void CopySendString(CopyToState cstate, const char *str);
+static void CopySendChar(CopyToState cstate, char c);
+static void CopySendEndOfRow(CopyToState cstate);
+static void CopySendInt32(CopyToState cstate, int32 val);
+static void CopySendInt16(CopyToState cstate, int16 val);
+
+
+/*
+ * Send copy start/stop messages for frontend copies. These have changed
+ * in past protocol redesigns.
+ */
+static void
+SendCopyBegin(CopyToState cstate)
+{
+ StringInfoData buf;
+ int natts = list_length(cstate->attnumlist);
+ int16 format = (cstate->opts.binary ? 1 : 0);
+ int i;
+
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, format); /* overall format */
+ pq_sendint16(&buf, natts);
+ for (i = 0; i < natts; i++)
+ pq_sendint16(&buf, format); /* per-column formats */
+ pq_endmessage(&buf);
+ cstate->copy_dest = COPY_FRONTEND;
+}
+
+static void
+SendCopyEnd(CopyToState cstate)
+{
+ /* Shouldn't have any unsent data */
+ Assert(cstate->fe_msgbuf->len == 0);
+ /* Send Copy Done message */
+ pq_putemptymessage('c');
+}
+
+/*----------
+ * CopySendData sends output data to the destination (file or frontend)
+ * CopySendString does the same for null-terminated strings
+ * CopySendChar does the same for single characters
+ * CopySendEndOfRow does the appropriate thing at end of each data row
+ * (data is not actually flushed except by CopySendEndOfRow)
+ *
+ * NB: no data conversion is applied by these functions
+ *----------
+ */
+static void
+CopySendData(CopyToState cstate, const void *databuf, int datasize)
+{
+ appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
+}
+
+static void
+CopySendString(CopyToState cstate, const char *str)
+{
+ appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
+}
+
+static void
+CopySendChar(CopyToState cstate, char c)
+{
+ appendStringInfoCharMacro(cstate->fe_msgbuf, c);
+}
+
+static void
+CopySendEndOfRow(CopyToState cstate)
+{
+ StringInfo fe_msgbuf = cstate->fe_msgbuf;
+
+ switch (cstate->copy_dest)
+ {
+ case COPY_FILE:
+ if (!cstate->opts.binary)
+ {
+ /* Default line termination depends on platform */
+#ifndef WIN32
+ CopySendChar(cstate, '\n');
+#else
+ CopySendString(cstate, "\r\n");
+#endif
+ }
+
+ if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
+ cstate->copy_file) != 1 ||
+ ferror(cstate->copy_file))
+ {
+ if (cstate->is_program)
+ {
+ if (errno == EPIPE)
+ {
+ /*
+ * The pipe will be closed automatically on error at
+ * the end of transaction, but we might get a better
+ * error message from the subprocess' exit code than
+ * just "Broken Pipe"
+ */
+ ClosePipeToProgram(cstate);
+
+ /*
+ * If ClosePipeToProgram() didn't throw an error, the
+ * program terminated normally, but closed the pipe
+ * first. Restore errno, and throw an error.
+ */
+ errno = EPIPE;
+ }
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY program: %m")));
+ }
+ else
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to COPY file: %m")));
+ }
+ break;
+ case COPY_FRONTEND:
+ /* The FE/BE protocol uses \n as newline for all platforms */
+ if (!cstate->opts.binary)
+ CopySendChar(cstate, '\n');
+
+ /* Dump the accumulated row as one CopyData message */
+ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
+ break;
+ case COPY_CALLBACK:
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
+ break;
+ }
+
+ /* Update the progress */
+ cstate->bytes_processed += fe_msgbuf->len;
+ pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
+ resetStringInfo(fe_msgbuf);
+}
+
+/*
+ * These functions do apply some data conversion
+ */
+
+/*
+ * CopySendInt32 sends an int32 in network byte order
+ */
+static inline void
+CopySendInt32(CopyToState cstate, int32 val)
+{
+ uint32 buf;
+
+ buf = pg_hton32((uint32) val);
+ CopySendData(cstate, &buf, sizeof(buf));
+}
+
+/*
+ * CopySendInt16 sends an int16 in network byte order
+ */
+static inline void
+CopySendInt16(CopyToState cstate, int16 val)
+{
+ uint16 buf;
+
+ buf = pg_hton16((uint16) val);
+ CopySendData(cstate, &buf, sizeof(buf));
+}
+
+/*
+ * Closes the pipe to an external program, checking the pclose() return code.
+ */
+static void
+ClosePipeToProgram(CopyToState cstate)
+{
+ int pclose_rc;
+
+ Assert(cstate->is_program);
+
+ pclose_rc = ClosePipeStream(cstate->copy_file);
+ if (pclose_rc == -1)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close pipe to external command: %m")));
+ else if (pclose_rc != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("program \"%s\" failed",
+ cstate->filename),
+ errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+ }
+}
+
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyToState cstate)
+{
+ if (cstate->is_program)
+ {
+ ClosePipeToProgram(cstate);
+ }
+ else
+ {
+ if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->filename)));
+ }
+
+ pgstat_progress_end_command();
+
+ MemoryContextDelete(cstate->copycontext);
+ pfree(cstate);
+}
+
+/*
+ * Setup CopyToState to read tuples from a table or a query for COPY TO.
+ *
+ * 'rel': Relation to be copied
+ * 'raw_query': Query whose results are to be copied
+ * 'queryRelId': OID of base relation to convert to a query (for RLS)
+ * 'filename': Name of server-local file to write, NULL for STDOUT
+ * 'is_program': true if 'filename' is program to execute
+ * 'data_dest_cb': Callback that processes the output data
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
+ */
+CopyToState
+BeginCopyTo(ParseState *pstate,
+ Relation rel,
+ RawStmt *raw_query,
+ Oid queryRelId,
+ const char *filename,
+ bool is_program,
+ copy_data_dest_cb data_dest_cb,
+ List *attnamelist,
+ List *options)
+{
+ CopyToState cstate;
+ bool pipe = (filename == NULL && data_dest_cb == NULL);
+ TupleDesc tupDesc;
+ int num_phys_attrs;
+ MemoryContext oldcontext;
+ const int progress_cols[] = {
+ PROGRESS_COPY_COMMAND,
+ PROGRESS_COPY_TYPE
+ };
+ int64 progress_vals[] = {
+ PROGRESS_COPY_COMMAND_TO,
+ 0
+ };
+
+ if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
+ {
+ if (rel->rd_rel->relkind == RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from view \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from materialized view \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from foreign table \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from sequence \"%s\"",
+ RelationGetRelationName(rel))));
+ else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from partitioned table \"%s\"",
+ RelationGetRelationName(rel)),
+ errhint("Try the COPY (SELECT ...) TO variant.")));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot copy from non-table relation \"%s\"",
+ RelationGetRelationName(rel))));
+ }
+
+
+ /* Allocate workspace and zero all fields */
+ cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
+
+ /*
+ * We allocate everything used by a cstate in a new memory context. This
+ * avoids memory leaks during repeated use of COPY in a query.
+ */
+ cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
+ "COPY",
+ ALLOCSET_DEFAULT_SIZES);
+
+ oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+ /* Extract options from the statement node tree */
+ ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
+
+ /* Process the source/target relation or query */
+ if (rel)
+ {
+ Assert(!raw_query);
+
+ cstate->rel = rel;
+
+ tupDesc = RelationGetDescr(cstate->rel);
+ }
+ else
+ {
+ List *rewritten;
+ Query *query;
+ PlannedStmt *plan;
+ DestReceiver *dest;
+
+ cstate->rel = NULL;
+
+ /*
+ * Run parse analysis and rewrite. Note this also acquires sufficient
+ * locks on the source table(s).
+ */
+ rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
+ pstate->p_sourcetext, NULL, 0,
+ NULL);
+
+ /* check that we got back something we can work with */
+ if (rewritten == NIL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
+ }
+ else if (list_length(rewritten) > 1)
+ {
+ ListCell *lc;
+
+ /* examine queries to determine which error message to issue */
+ foreach(lc, rewritten)
+ {
+ Query *q = lfirst_node(Query, lc);
+
+ if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("conditional DO INSTEAD rules are not supported for COPY")));
+ if (q->querySource == QSRC_NON_INSTEAD_RULE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("DO ALSO rules are not supported for the COPY")));
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
+ }
+
+ query = linitial_node(Query, rewritten);
+
+ /* The grammar allows SELECT INTO, but we don't support that */
+ if (query->utilityStmt != NULL &&
+ IsA(query->utilityStmt, CreateTableAsStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY (SELECT INTO) is not supported")));
+
+ Assert(query->utilityStmt == NULL);
+
+ /*
+ * Similarly the grammar doesn't enforce the presence of a RETURNING
+ * clause, but this is required here.
+ */
+ if (query->commandType != CMD_SELECT &&
+ query->returningList == NIL)
+ {
+ Assert(query->commandType == CMD_INSERT ||
+ query->commandType == CMD_UPDATE ||
+ query->commandType == CMD_DELETE);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("COPY query must have a RETURNING clause")));
+ }
+
+ /* plan the query */
+ plan = pg_plan_query(query, pstate->p_sourcetext,
+ CURSOR_OPT_PARALLEL_OK, NULL);
+
+ /*
+ * With row-level security and a user using "COPY relation TO", we
+ * have to convert the "COPY relation TO" to a query-based COPY (eg:
+ * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
+ * add in any RLS clauses.
+ *
+ * When this happens, we are passed in the relid of the originally
+ * found relation (which we have locked). As the planner will look up
+ * the relation again, we double-check here to make sure it found the
+ * same one that we have locked.
+ */
+ if (queryRelId != InvalidOid)
+ {
+ /*
+ * Note that with RLS involved there may be multiple relations,
+ * and while the one we need is almost certainly first, we don't
+ * make any guarantees of that in the planner, so check the whole
+ * list and make sure we find the original relation.
+ */
+ if (!list_member_oid(plan->relationOids, queryRelId))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("relation referenced by COPY statement has changed")));
+ }
+
+ /*
+ * Use a snapshot with an updated command ID to ensure this query sees
+ * results of any previously executed queries.
+ */
+ PushCopiedSnapshot(GetActiveSnapshot());
+ UpdateActiveSnapshotCommandId();
+
+ /* Create dest receiver for COPY OUT */
+ dest = CreateDestReceiver(DestCopyOut);
+ ((DR_copy *) dest)->cstate = cstate;
+
+ /* Create a QueryDesc requesting no output */
+ cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
+ GetActiveSnapshot(),
+ InvalidSnapshot,
+ dest, NULL, NULL, 0);
+
+ /*
+ * Call ExecutorStart to prepare the plan for execution.
+ *
+ * ExecutorStart computes a result tupdesc for us
+ */
+ ExecutorStart(cstate->queryDesc, 0);
+
+ tupDesc = cstate->queryDesc->tupDesc;
+ }
+
+ /* Generate or convert list of attributes to process */
+ cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
+
+ num_phys_attrs = tupDesc->natts;
+
+ /* Convert FORCE_QUOTE name list to per-column flags, check validity */
+ cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+ if (cstate->opts.force_quote_all)
+ {
+ int i;
+
+ for (i = 0; i < num_phys_attrs; i++)
+ cstate->opts.force_quote_flags[i] = true;
+ }
+ else if (cstate->opts.force_quote)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
+
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ if (!list_member_int(cstate->attnumlist, attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+ errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
+ NameStr(attr->attname))));
+ cstate->opts.force_quote_flags[attnum - 1] = true;
+ }
+ }
+
+ /* Use client encoding when ENCODING option is not specified. */
+ if (cstate->opts.file_encoding < 0)
+ cstate->file_encoding = pg_get_client_encoding();
+ else
+ cstate->file_encoding = cstate->opts.file_encoding;
+
+ /*
+ * Set up encoding conversion info. Even if the file and server encodings
+ * are the same, we must apply pg_any_to_server() to validate data in
+ * multibyte encodings.
+ */
+ cstate->need_transcoding =
+ (cstate->file_encoding != GetDatabaseEncoding() ||
+ pg_database_encoding_max_length() > 1);
+ /* See Multibyte encoding comment above */
+ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
+
+ cstate->copy_dest = COPY_FILE; /* default */
+
+ if (data_dest_cb)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
+ else if (pipe)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
+
+ Assert(!is_program); /* the grammar does not allow this */
+ if (whereToSendOutput != DestRemote)
+ cstate->copy_file = stdout;
+ }
+ else
+ {
+ cstate->filename = pstrdup(filename);
+ cstate->is_program = is_program;
+
+ if (is_program)
+ {
+ progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
+ cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
+ if (cstate->copy_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not execute command \"%s\": %m",
+ cstate->filename)));
+ }
+ else
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
+
+ progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
+
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for COPY to file")));
+
+ oumask = umask(S_IWGRP | S_IWOTH);
+ PG_TRY();
+ {
+ cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+ }
+ PG_FINALLY();
+ {
+ umask(oumask);
+ }
+ PG_END_TRY();
+ if (cstate->copy_file == NULL)
+ {
+ /* copy errno because ereport subfunctions might change it */
+ int save_errno = errno;
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->filename),
+ (save_errno == ENOENT || save_errno == EACCES) ?
+ errhint("COPY TO instructs the PostgreSQL server process to write a file. "
+ "You may want a client-side facility such as psql's \\copy.") : 0));
+ }
+
+ if (fstat(fileno(cstate->copy_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->filename)));
+ }
+ }
+
+ /* initialize progress */
+ pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
+ cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+ pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
+
+ cstate->bytes_processed = 0;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return cstate;
+}
+
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+void
+EndCopyTo(CopyToState cstate)
+{
+ if (cstate->queryDesc != NULL)
+ {
+ /* Close down the query and free resources. */
+ ExecutorFinish(cstate->queryDesc);
+ ExecutorEnd(cstate->queryDesc);
+ FreeQueryDesc(cstate->queryDesc);
+ PopActiveSnapshot();
+ }
+
+ /* Clean up storage */
+ EndCopy(cstate);
+}
+
+/*
+ * Copy from relation or query TO file.
+ *
+ * Returns the number of rows processed.
+ */
+uint64
+DoCopyTo(CopyToState cstate)
+{
+ bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
+ bool fe_copy = (pipe && whereToSendOutput == DestRemote);
+ TupleDesc tupDesc;
+ int num_phys_attrs;
+ ListCell *cur;
+ uint64 processed;
+
+ if (fe_copy)
+ SendCopyBegin(cstate);
+
+ if (cstate->rel)
+ tupDesc = RelationGetDescr(cstate->rel);
+ else
+ tupDesc = cstate->queryDesc->tupDesc;
+ num_phys_attrs = tupDesc->natts;
+ cstate->opts.null_print_client = cstate->opts.null_print; /* default */
+
+ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
+ cstate->fe_msgbuf = makeStringInfo();
+
+ /* Get info about the columns we need to process. */
+ cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+ Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+ if (cstate->opts.binary)
+ getTypeBinaryOutputInfo(attr->atttypid,
+ &out_func_oid,
+ &isvarlena);
+ else
+ getTypeOutputInfo(attr->atttypid,
+ &out_func_oid,
+ &isvarlena);
+ fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * Create a temporary memory context that we can reset once per row to
+ * recover palloc'd memory. This avoids any problems with leaks inside
+ * datatype output routines, and should be faster than retail pfree's
+ * anyway. (We don't need a whole econtext as CopyFrom does.)
+ */
+ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "COPY TO",
+ ALLOCSET_DEFAULT_SIZES);
+
+ if (cstate->opts.binary)
+ {
+ /* Generate header for a binary copy */
+ int32 tmp;
+
+ /* Signature */
+ CopySendData(cstate, BinarySignature, 11);
+ /* Flags field */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ /* No header extension */
+ tmp = 0;
+ CopySendInt32(cstate, tmp);
+ }
+ else
+ {
+ /*
+ * For non-binary copy, we need to convert null_print to file
+ * encoding, because it will be sent directly with CopySendString.
+ */
+ if (cstate->need_transcoding)
+ cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+ cstate->opts.null_print_len,
+ cstate->file_encoding);
+
+ /* if a header has been requested send the line */
+ if (cstate->opts.header_line)
+ {
+ bool hdr_delim = false;
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ char *colname;
+
+ if (hdr_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ hdr_delim = true;
+
+ colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, colname, false,
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, colname);
+ }
+
+ CopySendEndOfRow(cstate);
+ }
+ }
+
+ if (cstate->rel)
+ {
+ TupleTableSlot *slot;
+ TableScanDesc scandesc;
+
+ scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
+ slot = table_slot_create(cstate->rel, NULL);
+
+ processed = 0;
+ while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Deconstruct the tuple ... */
+ slot_getallattrs(slot);
+
+ /* Format and send the data */
+ CopyOneRowTo(cstate, slot);
+
+ /*
+ * Increment the number of processed tuples, and report the
+ * progress.
+ */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++processed);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ table_endscan(scandesc);
+ }
+ else
+ {
+ /* run the plan --- the dest receiver will send tuples */
+ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0, true);
+ processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
+ }
+
+ if (cstate->opts.binary)
+ {
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+ }
+
+ MemoryContextDelete(cstate->rowcontext);
+
+ if (fe_copy)
+ SendCopyEnd(cstate);
+
+ return processed;
+}
+
+/*
+ * Emit one row during DoCopyTo().
+ */
+static void
+CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
+{
+ bool need_delim = false;
+ FmgrInfo *out_functions = cstate->out_functions;
+ MemoryContext oldcontext;
+ ListCell *cur;
+ char *string;
+
+ MemoryContextReset(cstate->rowcontext);
+ oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
+
+ if (cstate->opts.binary)
+ {
+ /* Binary per-tuple header */
+ CopySendInt16(cstate, list_length(cstate->attnumlist));
+ }
+
+ /* Make sure the tuple is fully deconstructed */
+ slot_getallattrs(slot);
+
+ foreach(cur, cstate->attnumlist)
+ {
+ int attnum = lfirst_int(cur);
+ Datum value = slot->tts_values[attnum - 1];
+ bool isnull = slot->tts_isnull[attnum - 1];
+
+ if (!cstate->opts.binary)
+ {
+ if (need_delim)
+ CopySendChar(cstate, cstate->opts.delim[0]);
+ need_delim = true;
+ }
+
+ if (isnull)
+ {
+ if (!cstate->opts.binary)
+ CopySendString(cstate, cstate->opts.null_print_client);
+ else
+ CopySendInt32(cstate, -1);
+ }
+ else
+ {
+ if (!cstate->opts.binary)
+ {
+ string = OutputFunctionCall(&out_functions[attnum - 1],
+ value);
+ if (cstate->opts.csv_mode)
+ CopyAttributeOutCSV(cstate, string,
+ cstate->opts.force_quote_flags[attnum - 1],
+ list_length(cstate->attnumlist) == 1);
+ else
+ CopyAttributeOutText(cstate, string);
+ }
+ else
+ {
+ bytea *outputbytes;
+
+ outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+ value);
+ CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+ CopySendData(cstate, VARDATA(outputbytes),
+ VARSIZE(outputbytes) - VARHDRSZ);
+ }
+ }
+ }
+
+ CopySendEndOfRow(cstate);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Send text representation of one attribute, with conversion and escaping
+ */
+#define DUMPSOFAR() \
+ do { \
+ if (ptr > start) \
+ CopySendData(cstate, start, ptr - start); \
+ } while (0)
+
+static void
+CopyAttributeOutText(CopyToState cstate, const char *string)
+{
+ const char *ptr;
+ const char *start;
+ char c;
+ char delimc = cstate->opts.delim[0];
+
+ if (cstate->need_transcoding)
+ ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
+ else
+ ptr = string;
+
+ /*
+ * We have to grovel through the string searching for control characters
+ * and instances of the delimiter character. In most cases, though, these
+ * are infrequent. To avoid overhead from calling CopySendData once per
+ * character, we dump out all characters between escaped characters in a
+ * single call. The loop invariant is that the data from "start" to "ptr"
+ * can be sent literally, but hasn't yet been.
+ *
+ * We can skip pg_encoding_mblen() overhead when encoding is safe, because
+ * in valid backend encodings, extra bytes of a multibyte character never
+ * look like ASCII. This loop is sufficiently performance-critical that
+ * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
+ * of the normal safe-encoding path.
+ */
+ if (cstate->encoding_embeds_ascii)
+ {
+ start = ptr;
+ while ((c = *ptr) != '\0')
+ {
+ if ((unsigned char) c < (unsigned char) 0x20)
+ {
+ /*
+ * \r and \n must be escaped, the others are traditional. We
+ * prefer to dump these using the C-like notation, rather than
+ * a backslash and the literal character, because it makes the
+ * dump file a bit more proof against Microsoftish data
+ * mangling.
+ */
+ switch (c)
+ {
+ case '\b':
+ c = 'b';
+ break;
+ case '\f':
+ c = 'f';
+ break;
+ case '\n':
+ c = 'n';
+ break;
+ case '\r':
+ c = 'r';
+ break;
+ case '\t':
+ c = 't';
+ break;
+ case '\v':
+ c = 'v';
+ break;
+ default:
+ /* If it's the delimiter, must backslash it */
+ if (c == delimc)
+ break;
+ /* All ASCII control chars are length 1 */
+ ptr++;
+ continue; /* fall to end of loop */
+ }
+ /* if we get here, we need to convert the control char */
+ DUMPSOFAR();
+ CopySendChar(cstate, '\\');
+ CopySendChar(cstate, c);
+ start = ++ptr; /* do not include char in next run */
+ }
+ else if (c == '\\' || c == delimc)
+ {
+ DUMPSOFAR();
+ CopySendChar(cstate, '\\');
+ start = ptr++; /* we include char in next run */
+ }
+ else if (IS_HIGHBIT_SET(c))
+ ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
+ else
+ ptr++;
+ }
+ }
+ else
+ {
+ start = ptr;
+ while ((c = *ptr) != '\0')
+ {
+ if ((unsigned char) c < (unsigned char) 0x20)
+ {
+ /*
+ * \r and \n must be escaped, the others are traditional. We
+ * prefer to dump these using the C-like notation, rather than
+ * a backslash and the literal character, because it makes the
+ * dump file a bit more proof against Microsoftish data
+ * mangling.
+ */
+ switch (c)
+ {
+ case '\b':
+ c = 'b';
+ break;
+ case '\f':
+ c = 'f';
+ break;
+ case '\n':
+ c = 'n';
+ break;
+ case '\r':
+ c = 'r';
+ break;
+ case '\t':
+ c = 't';
+ break;
+ case '\v':
+ c = 'v';
+ break;
+ default:
+ /* If it's the delimiter, must backslash it */
+ if (c == delimc)
+ break;
+ /* All ASCII control chars are length 1 */
+ ptr++;
+ continue; /* fall to end of loop */
+ }
+ /* if we get here, we need to convert the control char */
+ DUMPSOFAR();
+ CopySendChar(cstate, '\\');
+ CopySendChar(cstate, c);
+ start = ++ptr; /* do not include char in next run */
+ }
+ else if (c == '\\' || c == delimc)
+ {
+ DUMPSOFAR();
+ CopySendChar(cstate, '\\');
+ start = ptr++; /* we include char in next run */
+ }
+ else
+ ptr++;
+ }
+ }
+
+ DUMPSOFAR();
+}
+
+/*
+ * Send text representation of one attribute, with conversion and
+ * CSV-style escaping
+ */
+static void
+CopyAttributeOutCSV(CopyToState cstate, const char *string,
+ bool use_quote, bool single_attr)
+{
+ const char *ptr;
+ const char *start;
+ char c;
+ char delimc = cstate->opts.delim[0];
+ char quotec = cstate->opts.quote[0];
+ char escapec = cstate->opts.escape[0];
+
+ /* force quoting if it matches null_print (before conversion!) */
+ if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
+ use_quote = true;
+
+ if (cstate->need_transcoding)
+ ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
+ else
+ ptr = string;
+
+ /*
+ * Make a preliminary pass to discover if it needs quoting
+ */
+ if (!use_quote)
+ {
+ /*
+ * Because '\.' can be a data value, quote it if it appears alone on a
+ * line so it is not interpreted as the end-of-data marker.
+ */
+ if (single_attr && strcmp(ptr, "\\.") == 0)
+ use_quote = true;
+ else
+ {
+ const char *tptr = ptr;
+
+ while ((c = *tptr) != '\0')
+ {
+ if (c == delimc || c == quotec || c == '\n' || c == '\r')
+ {
+ use_quote = true;
+ break;
+ }
+ if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
+ tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
+ else
+ tptr++;
+ }
+ }
+ }
+
+ if (use_quote)
+ {
+ CopySendChar(cstate, quotec);
+
+ /*
+ * We adopt the same optimization strategy as in CopyAttributeOutText
+ */
+ start = ptr;
+ while ((c = *ptr) != '\0')
+ {
+ if (c == quotec || c == escapec)
+ {
+ DUMPSOFAR();
+ CopySendChar(cstate, escapec);
+ start = ptr; /* we include char in next run */
+ }
+ if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
+ ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
+ else
+ ptr++;
+ }
+ DUMPSOFAR();
+
+ CopySendChar(cstate, quotec);
+ }
+ else
+ {
+ /* If it doesn't need quoting, we can just dump it as-is */
+ CopySendString(cstate, ptr);
+ }
+}
+
+/*
+ * copy_dest_startup --- executor startup
+ */
+static void
+copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
+{
+ /* no-op */
+}
+
+/*
+ * copy_dest_receive --- receive one tuple
+ */
+static bool
+copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
+{
+ DR_copy *myState = (DR_copy *) self;
+ CopyToState cstate = myState->cstate;
+
+ /* Send the data */
+ CopyOneRowTo(cstate, slot);
+
+ /* Increment the number of processed tuples, and report the progress */
+ pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+ ++myState->processed);
+
+ return true;
+}
+
+/*
+ * copy_dest_shutdown --- executor end
+ */
+static void
+copy_dest_shutdown(DestReceiver *self)
+{
+ /* no-op */
+}
+
+/*
+ * copy_dest_destroy --- release DestReceiver object
+ */
+static void
+copy_dest_destroy(DestReceiver *self)
+{
+ pfree(self);
+}
+
+/*
+ * CreateCopyDestReceiver -- create a suitable DestReceiver object
+ */
+DestReceiver *
+CreateCopyDestReceiver(void)
+{
+ DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
+
+ self->pub.receiveSlot = copy_dest_receive;
+ self->pub.rStartup = copy_dest_startup;
+ self->pub.rShutdown = copy_dest_shutdown;
+ self->pub.rDestroy = copy_dest_destroy;
+ self->pub.mydest = DestCopyOut;
+
+ self->cstate = NULL; /* will be set later */
+ self->processed = 0;
+
+ return (DestReceiver *) self;
+}