diff options
Diffstat (limited to 'src/backend/commands/copyfromparse.c')
-rw-r--r-- | src/backend/commands/copyfromparse.c | 1872 |
1 files changed, 1872 insertions, 0 deletions
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c new file mode 100644 index 0000000..89ceb23 --- /dev/null +++ b/src/backend/commands/copyfromparse.c @@ -0,0 +1,1872 @@ +/*------------------------------------------------------------------------- + * + * copyfromparse.c + * Parse CSV/text/binary format for COPY FROM. + * + * This file contains routines to parse the text, CSV and binary input + * formats. The main entry point is NextCopyFrom(), which parses the + * next input line and returns it as Datums. + * + * In text/CSV mode, the parsing happens in multiple stages: + * + * [data source] --> raw_buf --> input_buf --> line_buf --> attribute_buf + * 1. 2. 3. 4. + * + * 1. CopyLoadRawBuf() reads raw data from the input file or client, and + * places it into 'raw_buf'. + * + * 2. CopyConvertBuf() calls the encoding conversion function to convert + * the data in 'raw_buf' from client to server encoding, placing the + * converted result in 'input_buf'. + * + * 3. CopyReadLine() parses the data in 'input_buf', one line at a time. + * It is responsible for finding the next newline marker, taking quote and + * escape characters into account according to the COPY options. The line + * is copied into 'line_buf', with quotes and escape characters still + * intact. + * + * 4. CopyReadAttributesText/CSV() function takes the input line from + * 'line_buf', and splits it into fields, unescaping the data as required. + * The fields are stored in 'attribute_buf', and 'raw_fields' array holds + * pointers to each field. + * + * If encoding conversion is not required, a shortcut is taken in step 2 to + * avoid copying the data unnecessarily. The 'input_buf' pointer is set to + * point directly to 'raw_buf', so that CopyLoadRawBuf() loads the raw data + * directly into 'input_buf'. CopyConvertBuf() then merely validates that + * the data is valid in the current encoding. + * + * In binary mode, the pipeline is much simpler. Input is loaded into + * into 'raw_buf', and encoding conversion is done in the datatype-specific + * receive functions, if required. 'input_buf' and 'line_buf' are not used, + * but 'attribute_buf' is used as a temporary buffer to hold one attribute's + * data when it's passed the receive function. + * + * 'raw_buf' is always 64 kB in size (RAW_BUF_SIZE). 'input_buf' is also + * 64 kB (INPUT_BUF_SIZE), if encoding conversion is required. 'line_buf' + * and 'attribute_buf' are expanded on demand, to hold the longest line + * encountered so far. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/copyfromparse.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <ctype.h> +#include <unistd.h> +#include <sys/stat.h> + +#include "commands/copy.h" +#include "commands/copyfrom_internal.h" +#include "commands/progress.h" +#include "executor/executor.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "port/pg_bswap.h" +#include "utils/memutils.h" +#include "utils/rel.h" + +#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) +#define OCTVALUE(c) ((c) - '0') + +/* + * These macros centralize code used to process line_buf and input_buf buffers. + * They are macros because they often do continue/break control and to avoid + * function call overhead in tight COPY loops. + * + * We must use "if (1)" because the usual "do {...} while(0)" wrapper would + * prevent the continue/break processing from working. We end the "if (1)" + * with "else ((void) 0)" to ensure the "if" does not unintentionally match + * any "else" in the calling code, and to avoid any compiler warnings about + * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros. + */ + +/* + * This keeps the character read at the top of the loop in the buffer + * even if there is more than one read-ahead. + */ +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \ +if (1) \ +{ \ + if (input_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \ + { \ + input_buf_ptr = prev_raw_ptr; /* undo fetch */ \ + need_data = true; \ + continue; \ + } \ +} else ((void) 0) + +/* This consumes the remainder of the buffer and breaks */ +#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \ +if (1) \ +{ \ + if (input_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \ + { \ + if (extralen) \ + input_buf_ptr = copy_buf_len; /* consume the partial character */ \ + /* backslash just before EOF, treat as data char */ \ + result = true; \ + break; \ + } \ +} else ((void) 0) + +/* + * Transfer any approved data to line_buf; must do this to be sure + * there is some room in input_buf. + */ +#define REFILL_LINEBUF \ +if (1) \ +{ \ + if (input_buf_ptr > cstate->input_buf_index) \ + { \ + appendBinaryStringInfo(&cstate->line_buf, \ + cstate->input_buf + cstate->input_buf_index, \ + input_buf_ptr - cstate->input_buf_index); \ + cstate->input_buf_index = input_buf_ptr; \ + } \ +} else ((void) 0) + +/* Undo any read-ahead and jump out of the block. */ +#define NO_END_OF_COPY_GOTO \ +if (1) \ +{ \ + input_buf_ptr = prev_raw_ptr + 1; \ + goto not_end_of_copy; \ +} else ((void) 0) + +/* NOTE: there's a copy of this in copyto.c */ +static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; + + +/* non-export function prototypes */ +static bool CopyReadLine(CopyFromState cstate); +static bool CopyReadLineText(CopyFromState cstate); +static int CopyReadAttributesText(CopyFromState cstate); +static int CopyReadAttributesCSV(CopyFromState cstate); +static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, + bool *isnull); + + +/* Low-level communications functions */ +static int CopyGetData(CopyFromState cstate, void *databuf, + int minread, int maxread); +static inline bool CopyGetInt32(CopyFromState cstate, int32 *val); +static inline bool CopyGetInt16(CopyFromState cstate, int16 *val); +static void CopyLoadInputBuf(CopyFromState cstate); +static int CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes); + +void +ReceiveCopyBegin(CopyFromState cstate) +{ + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = (cstate->opts.binary ? 1 : 0); + int i; + + pq_beginmessage(&buf, 'G'); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); + cstate->copy_src = COPY_FRONTEND; + cstate->fe_msgbuf = makeStringInfo(); + /* We *must* flush here to ensure FE knows it can send. */ + pq_flush(); +} + +void +ReceiveCopyBinaryHeader(CopyFromState cstate) +{ + char readSig[11]; + int32 tmp; + + /* Signature */ + if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + memcmp(readSig, BinarySignature, 11) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("COPY file signature not recognized"))); + /* Flags field */ + if (!CopyGetInt32(cstate, &tmp)) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing flags)"))); + if ((tmp & (1 << 16)) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (WITH OIDS)"))); + tmp &= ~(1 << 16); + if ((tmp >> 16) != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unrecognized critical flags in COPY file header"))); + /* Header extension length */ + if (!CopyGetInt32(cstate, &tmp) || + tmp < 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (missing length)"))); + /* Skip extension header, if present */ + while (tmp-- > 0) + { + if (CopyReadBinaryData(cstate, readSig, 1) != 1) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid COPY file header (wrong length)"))); + } +} + +/* + * CopyGetData reads data from the source (file or frontend) + * + * We attempt to read at least minread, and at most maxread, bytes from + * the source. The actual number of bytes read is returned; if this is + * less than minread, EOF was detected. + * + * Note: when copying from the frontend, we expect a proper EOF mark per + * protocol; if the frontend simply drops the connection, we raise error. + * It seems unwise to allow the COPY IN to complete normally in that case. + * + * NB: no data conversion is applied here. + */ +static int +CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) +{ + int bytesread = 0; + + switch (cstate->copy_src) + { + case COPY_FILE: + bytesread = fread(databuf, 1, maxread, cstate->copy_file); + if (ferror(cstate->copy_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from COPY file: %m"))); + if (bytesread == 0) + cstate->raw_reached_eof = true; + break; + case COPY_FRONTEND: + while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) + { + int avail; + + while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len) + { + /* Try to receive another message */ + int mtype; + int maxmsglen; + + readmessage: + HOLD_CANCEL_INTERRUPTS(); + pq_startmsgread(); + mtype = pq_getbyte(); + if (mtype == EOF) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on client connection with an open transaction"))); + /* Validate message type and set packet size limit */ + switch (mtype) + { + case 'd': /* CopyData */ + maxmsglen = PQ_LARGE_MESSAGE_LIMIT; + break; + case 'c': /* CopyDone */ + case 'f': /* CopyFail */ + case 'H': /* Flush */ + case 'S': /* Sync */ + maxmsglen = PQ_SMALL_MESSAGE_LIMIT; + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type 0x%02X during COPY from stdin", + mtype))); + maxmsglen = 0; /* keep compiler quiet */ + break; + } + /* Now collect the message body */ + if (pq_getmessage(cstate->fe_msgbuf, maxmsglen)) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unexpected EOF on client connection with an open transaction"))); + RESUME_CANCEL_INTERRUPTS(); + /* ... and process it */ + switch (mtype) + { + case 'd': /* CopyData */ + break; + case 'c': /* CopyDone */ + /* COPY IN correctly terminated by frontend */ + cstate->raw_reached_eof = true; + return bytesread; + case 'f': /* CopyFail */ + ereport(ERROR, + (errcode(ERRCODE_QUERY_CANCELED), + errmsg("COPY from stdin failed: %s", + pq_getmsgstring(cstate->fe_msgbuf)))); + break; + case 'H': /* Flush */ + case 'S': /* Sync */ + + /* + * Ignore Flush/Sync for the convenience of client + * libraries (such as libpq) that may send those + * without noticing that the command they just + * sent was COPY. + */ + goto readmessage; + default: + Assert(false); /* NOT REACHED */ + } + } + avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor; + if (avail > maxread) + avail = maxread; + pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail); + databuf = (void *) ((char *) databuf + avail); + maxread -= avail; + bytesread += avail; + } + break; + case COPY_CALLBACK: + bytesread = cstate->data_source_cb(databuf, minread, maxread); + break; + } + + return bytesread; +} + + +/* + * These functions do apply some data conversion + */ + +/* + * CopyGetInt32 reads an int32 that appears in network byte order + * + * Returns true if OK, false if EOF + */ +static inline bool +CopyGetInt32(CopyFromState cstate, int32 *val) +{ + uint32 buf; + + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + *val = (int32) pg_ntoh32(buf); + return true; +} + +/* + * CopyGetInt16 reads an int16 that appears in network byte order + */ +static inline bool +CopyGetInt16(CopyFromState cstate, int16 *val) +{ + uint16 buf; + + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + *val = (int16) pg_ntoh16(buf); + return true; +} + + +/* + * Perform encoding conversion on data in 'raw_buf', writing the converted + * data into 'input_buf'. + * + * On entry, there must be some data to convert in 'raw_buf'. + */ +static void +CopyConvertBuf(CopyFromState cstate) +{ + /* + * If the file and server encoding are the same, no encoding conversion is + * required. However, we still need to verify that the input is valid for + * the encoding. + */ + if (!cstate->need_transcoding) + { + /* + * When conversion is not required, input_buf and raw_buf are the + * same. raw_buf_len is the total number of bytes in the buffer, and + * input_buf_len tracks how many of those bytes have already been + * verified. + */ + int preverifiedlen = cstate->input_buf_len; + int unverifiedlen = cstate->raw_buf_len - cstate->input_buf_len; + int nverified; + + if (unverifiedlen == 0) + { + /* + * If no more raw data is coming, report the EOF to the caller. + */ + if (cstate->raw_reached_eof) + cstate->input_reached_eof = true; + return; + } + + /* + * Verify the new data, including any residual unverified bytes from + * previous round. + */ + nverified = pg_encoding_verifymbstr(cstate->file_encoding, + cstate->raw_buf + preverifiedlen, + unverifiedlen); + if (nverified == 0) + { + /* + * Could not verify anything. + * + * If there is no more raw input data coming, it means that there + * was an incomplete multi-byte sequence at the end. Also, if + * there's "enough" input left, we should be able to verify at + * least one character, and a failure to do so means that we've + * hit an invalid byte sequence. + */ + if (cstate->raw_reached_eof || unverifiedlen >= pg_encoding_max_length(cstate->file_encoding)) + cstate->input_reached_error = true; + return; + } + cstate->input_buf_len += nverified; + } + else + { + /* + * Encoding conversion is needed. + */ + int nbytes; + unsigned char *src; + int srclen; + unsigned char *dst; + int dstlen; + int convertedlen; + + if (RAW_BUF_BYTES(cstate) == 0) + { + /* + * If no more raw data is coming, report the EOF to the caller. + */ + if (cstate->raw_reached_eof) + cstate->input_reached_eof = true; + return; + } + + /* + * First, copy down any unprocessed data. + */ + nbytes = INPUT_BUF_BYTES(cstate); + if (nbytes > 0 && cstate->input_buf_index > 0) + memmove(cstate->input_buf, cstate->input_buf + cstate->input_buf_index, + nbytes); + cstate->input_buf_index = 0; + cstate->input_buf_len = nbytes; + cstate->input_buf[nbytes] = '\0'; + + src = (unsigned char *) cstate->raw_buf + cstate->raw_buf_index; + srclen = cstate->raw_buf_len - cstate->raw_buf_index; + dst = (unsigned char *) cstate->input_buf + cstate->input_buf_len; + dstlen = INPUT_BUF_SIZE - cstate->input_buf_len + 1; + + /* + * Do the conversion. This might stop short, if there is an invalid + * byte sequence in the input. We'll convert as much as we can in + * that case. + * + * Note: Even if we hit an invalid byte sequence, we don't report the + * error until all the valid bytes have been consumed. The input + * might contain an end-of-input marker (\.), and we don't want to + * report an error if the invalid byte sequence is after the + * end-of-input marker. We might unnecessarily convert some data + * after the end-of-input marker as long as it's valid for the + * encoding, but that's harmless. + */ + convertedlen = pg_do_encoding_conversion_buf(cstate->conversion_proc, + cstate->file_encoding, + GetDatabaseEncoding(), + src, srclen, + dst, dstlen, + true); + if (convertedlen == 0) + { + /* + * Could not convert anything. If there is no more raw input data + * coming, it means that there was an incomplete multi-byte + * sequence at the end. Also, if there is plenty of input left, + * we should be able to convert at least one character, so a + * failure to do so must mean that we've hit a byte sequence + * that's invalid. + */ + if (cstate->raw_reached_eof || srclen >= MAX_CONVERSION_INPUT_LENGTH) + cstate->input_reached_error = true; + return; + } + cstate->raw_buf_index += convertedlen; + cstate->input_buf_len += strlen((char *) dst); + } +} + +/* + * Report an encoding or conversion error. + */ +static void +CopyConversionError(CopyFromState cstate) +{ + Assert(cstate->raw_buf_len > 0); + Assert(cstate->input_reached_error); + + if (!cstate->need_transcoding) + { + /* + * Everything up to input_buf_len was successfully verified, and + * input_buf_len points to the invalid or incomplete character. + */ + report_invalid_encoding(cstate->file_encoding, + cstate->raw_buf + cstate->input_buf_len, + cstate->raw_buf_len - cstate->input_buf_len); + } + else + { + /* + * raw_buf_index points to the invalid or untranslatable character. We + * let the conversion routine report the error, because it can provide + * a more specific error message than we could here. An earlier call + * to the conversion routine in CopyConvertBuf() detected that there + * is an error, now we call the conversion routine again with + * noError=false, to have it throw the error. + */ + unsigned char *src; + int srclen; + unsigned char *dst; + int dstlen; + + src = (unsigned char *) cstate->raw_buf + cstate->raw_buf_index; + srclen = cstate->raw_buf_len - cstate->raw_buf_index; + dst = (unsigned char *) cstate->input_buf + cstate->input_buf_len; + dstlen = INPUT_BUF_SIZE - cstate->input_buf_len + 1; + + (void) pg_do_encoding_conversion_buf(cstate->conversion_proc, + cstate->file_encoding, + GetDatabaseEncoding(), + src, srclen, + dst, dstlen, + false); + + /* + * The conversion routine should have reported an error, so this + * should not be reached. + */ + elog(ERROR, "encoding conversion failed without error"); + } +} + +/* + * Load more data from data source to raw_buf. + * + * If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the + * beginning of the buffer, and we load new data after that. + */ +static void +CopyLoadRawBuf(CopyFromState cstate) +{ + int nbytes; + int inbytes; + + /* + * In text mode, if encoding conversion is not required, raw_buf and + * input_buf point to the same buffer. Their len/index better agree, too. + */ + if (cstate->raw_buf == cstate->input_buf) + { + Assert(!cstate->need_transcoding); + Assert(cstate->raw_buf_index == cstate->input_buf_index); + Assert(cstate->input_buf_len <= cstate->raw_buf_len); + } + + /* + * Copy down the unprocessed data if any. + */ + nbytes = RAW_BUF_BYTES(cstate); + if (nbytes > 0 && cstate->raw_buf_index > 0) + memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, + nbytes); + cstate->raw_buf_len -= cstate->raw_buf_index; + cstate->raw_buf_index = 0; + + /* + * If raw_buf and input_buf are in fact the same buffer, adjust the + * input_buf variables, too. + */ + if (cstate->raw_buf == cstate->input_buf) + { + cstate->input_buf_len -= cstate->input_buf_index; + cstate->input_buf_index = 0; + } + + /* Load more data */ + inbytes = CopyGetData(cstate, cstate->raw_buf + cstate->raw_buf_len, + 1, RAW_BUF_SIZE - cstate->raw_buf_len); + nbytes += inbytes; + cstate->raw_buf[nbytes] = '\0'; + cstate->raw_buf_len = nbytes; + + cstate->bytes_processed += inbytes; + pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed); + + if (inbytes == 0) + cstate->raw_reached_eof = true; +} + +/* + * CopyLoadInputBuf loads some more data into input_buf + * + * On return, at least one more input character is loaded into + * input_buf, or input_reached_eof is set. + * + * If INPUT_BUF_BYTES(cstate) > 0, the unprocessed bytes are moved to the start + * of the buffer and then we load more data after that. + */ +static void +CopyLoadInputBuf(CopyFromState cstate) +{ + int nbytes = INPUT_BUF_BYTES(cstate); + + /* + * The caller has updated input_buf_index to indicate how much of the + * input has been consumed and isn't needed anymore. If input_buf is the + * same physical area as raw_buf, update raw_buf_index accordingly. + */ + if (cstate->raw_buf == cstate->input_buf) + { + Assert(!cstate->need_transcoding); + Assert(cstate->input_buf_index >= cstate->raw_buf_index); + cstate->raw_buf_index = cstate->input_buf_index; + } + + for (;;) + { + /* If we now have some unconverted data, try to convert it */ + CopyConvertBuf(cstate); + + /* If we now have some more input bytes ready, return them */ + if (INPUT_BUF_BYTES(cstate) > nbytes) + return; + + /* + * If we reached an invalid byte sequence, or we're at an incomplete + * multi-byte character but there is no more raw input data, report + * conversion error. + */ + if (cstate->input_reached_error) + CopyConversionError(cstate); + + /* no more input, and everything has been converted */ + if (cstate->input_reached_eof) + break; + + /* Try to load more raw data */ + Assert(!cstate->raw_reached_eof); + CopyLoadRawBuf(cstate); + } +} + +/* + * CopyReadBinaryData + * + * Reads up to 'nbytes' bytes from cstate->copy_file via cstate->raw_buf + * and writes them to 'dest'. Returns the number of bytes read (which + * would be less than 'nbytes' only if we reach EOF). + */ +static int +CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) +{ + int copied_bytes = 0; + + if (RAW_BUF_BYTES(cstate) >= nbytes) + { + /* Enough bytes are present in the buffer. */ + memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, nbytes); + cstate->raw_buf_index += nbytes; + copied_bytes = nbytes; + } + else + { + /* + * Not enough bytes in the buffer, so must read from the file. Need + * to loop since 'nbytes' could be larger than the buffer size. + */ + do + { + int copy_bytes; + + /* Load more data if buffer is empty. */ + if (RAW_BUF_BYTES(cstate) == 0) + { + CopyLoadRawBuf(cstate); + if (cstate->raw_reached_eof) + break; /* EOF */ + } + + /* Transfer some bytes. */ + copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate)); + memcpy(dest, cstate->raw_buf + cstate->raw_buf_index, copy_bytes); + cstate->raw_buf_index += copy_bytes; + dest += copy_bytes; + copied_bytes += copy_bytes; + } while (copied_bytes < nbytes); + } + + return copied_bytes; +} + +/* + * Read raw fields in the next line for COPY FROM in text or csv mode. + * Return false if no more lines. + * + * An internal temporary buffer is returned via 'fields'. It is valid until + * the next call of the function. Since the function returns all raw fields + * in the input file, 'nfields' could be different from the number of columns + * in the relation. + * + * NOTE: force_not_null option are not applied to the returned fields. + */ +bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +{ + int fldct; + bool done; + + /* only available for text or csv input */ + Assert(!cstate->opts.binary); + + /* on input just throw the header line away */ + if (cstate->cur_lineno == 0 && cstate->opts.header_line) + { + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + return false; /* done */ + } + + cstate->cur_lineno++; + + /* Actually read the line into memory here */ + done = CopyReadLine(cstate); + + /* + * EOF at start of line means we're done. If we see EOF after some + * characters, we act as though it was newline followed by EOF, ie, + * process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + return false; + + /* Parse the line into de-escaped field values */ + if (cstate->opts.csv_mode) + fldct = CopyReadAttributesCSV(cstate); + else + fldct = CopyReadAttributesText(cstate); + + *fields = cstate->raw_fields; + *nfields = fldct; + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each columns not + * read from the file. It can be NULL when no default values are used, i.e. + * when all columns are read from the file. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + attr_count, + num_defaults = cstate->num_defaults; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + attr_count = list_length(cstate->attnumlist); + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.binary) + { + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - + * convert it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the + * string would already have been set to NULL. Convert it + * to NULL as specified. + */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); + if (string != NULL) + nulls[m] = false; + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + } + else + { + /* binary */ + int16 fld_count; + ListCell *cur; + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and + * complain if it doesn't come immediately. In COPY FROM STDIN, + * this ensures that we correctly handle CopyFail, if client + * chooses to send that now. When copying from file, we could + * ignore the rest of the file like in text mode, but we choose to + * be consistent with the COPY FROM STDIN case. + */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + + /* + * Now compute and insert any defaults available for the columns not + * provided by the input data. Anything not processed here or above will + * remain NULL. + */ + for (i = 0; i < num_defaults; i++) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, + &nulls[defmap[i]]); + } + + return true; +} + +/* + * Read the next input line and stash it in line_buf. + * + * Result is true if read was terminated by EOF, false if terminated + * by newline. The terminating newline or EOF marker is not included + * in the final value of line_buf. + */ +static bool +CopyReadLine(CopyFromState cstate) +{ + bool result; + + resetStringInfo(&cstate->line_buf); + cstate->line_buf_valid = false; + + /* Parse data and transfer into line_buf */ + result = CopyReadLineText(cstate); + + if (result) + { + /* + * Reached EOF. In protocol version 3, we should ignore anything + * after \. up to the protocol end of copy data. (XXX maybe better + * not to treat \. as special?) + */ + if (cstate->copy_src == COPY_FRONTEND) + { + int inbytes; + + do + { + inbytes = CopyGetData(cstate, cstate->input_buf, + 1, INPUT_BUF_SIZE); + } while (inbytes > 0); + cstate->input_buf_index = 0; + cstate->input_buf_len = 0; + cstate->raw_buf_index = 0; + cstate->raw_buf_len = 0; + } + } + else + { + /* + * If we didn't hit EOF, then we must have transferred the EOL marker + * to line_buf along with the data. Get rid of it. + */ + switch (cstate->eol_type) + { + case EOL_NL: + Assert(cstate->line_buf.len >= 1); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); + cstate->line_buf.len--; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_CR: + Assert(cstate->line_buf.len >= 1); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r'); + cstate->line_buf.len--; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_CRNL: + Assert(cstate->line_buf.len >= 2); + Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r'); + Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n'); + cstate->line_buf.len -= 2; + cstate->line_buf.data[cstate->line_buf.len] = '\0'; + break; + case EOL_UNKNOWN: + /* shouldn't get here */ + Assert(false); + break; + } + } + + /* Now it's safe to use the buffer in error messages */ + cstate->line_buf_valid = true; + + return result; +} + +/* + * CopyReadLineText - inner loop of CopyReadLine for text mode + */ +static bool +CopyReadLineText(CopyFromState cstate) +{ + char *copy_input_buf; + int input_buf_ptr; + int copy_buf_len; + bool need_data = false; + bool hit_eof = false; + bool result = false; + + /* CSV variables */ + bool first_char_in_line = true; + bool in_quote = false, + last_was_esc = false; + char quotec = '\0'; + char escapec = '\0'; + + if (cstate->opts.csv_mode) + { + quotec = cstate->opts.quote[0]; + escapec = cstate->opts.escape[0]; + /* ignore special escape processing if it's the same as quotec */ + if (quotec == escapec) + escapec = '\0'; + } + + /* + * The objective of this loop is to transfer the entire next input line + * into line_buf. Hence, we only care for detecting newlines (\r and/or + * \n) and the end-of-copy marker (\.). + * + * In CSV mode, \r and \n inside a quoted field are just part of the data + * value and are put in line_buf. We keep just enough state to know if we + * are currently in a quoted field or not. + * + * These four characters, and the CSV escape and quote characters, are + * assumed the same in frontend and backend encodings. + * + * The input has already been converted to the database encoding. All + * supported server encodings have the property that all bytes in a + * multi-byte sequence have the high bit set, so a multibyte character + * cannot contain any newline or escape characters embedded in the + * multibyte sequence. Therefore, we can process the input byte-by-byte, + * regardless of the encoding. + * + * For speed, we try to move data from input_buf to line_buf in chunks + * rather than one character at a time. input_buf_ptr points to the next + * character to examine; any characters from input_buf_index to + * input_buf_ptr have been determined to be part of the line, but not yet + * transferred to line_buf. + * + * For a little extra speed within the loop, we copy input_buf and + * input_buf_len into local variables. + */ + copy_input_buf = cstate->input_buf; + input_buf_ptr = cstate->input_buf_index; + copy_buf_len = cstate->input_buf_len; + + for (;;) + { + int prev_raw_ptr; + char c; + + /* + * Load more data if needed. + * + * TODO: We could just force four bytes of read-ahead and avoid the + * many calls to IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(). That was + * unsafe with the old v2 COPY protocol, but we don't support that + * anymore. + */ + if (input_buf_ptr >= copy_buf_len || need_data) + { + REFILL_LINEBUF; + + CopyLoadInputBuf(cstate); + /* update our local variables */ + hit_eof = cstate->input_reached_eof; + input_buf_ptr = cstate->input_buf_index; + copy_buf_len = cstate->input_buf_len; + + /* + * If we are completely out of data, break out of the loop, + * reporting EOF. + */ + if (INPUT_BUF_BYTES(cstate) <= 0) + { + result = true; + break; + } + need_data = false; + } + + /* OK to fetch a character */ + prev_raw_ptr = input_buf_ptr; + c = copy_input_buf[input_buf_ptr++]; + + if (cstate->opts.csv_mode) + { + /* + * If character is '\\' or '\r', we may need to look ahead below. + * Force fetch of the next character if we don't already have it. + * We need to do this before changing CSV state, in case one of + * these characters is also the quote or escape character. + */ + if (c == '\\' || c == '\r') + { + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + } + + /* + * Dealing with quotes and escapes here is mildly tricky. If the + * quote char is also the escape char, there's no problem - we + * just use the char as a toggle. If they are different, we need + * to ensure that we only take account of an escape inside a + * quoted field and immediately preceding a quote char, and not + * the second in an escape-escape sequence. + */ + if (in_quote && c == escapec) + last_was_esc = !last_was_esc; + if (c == quotec && !last_was_esc) + in_quote = !in_quote; + if (c != escapec) + last_was_esc = false; + + /* + * Updating the line count for embedded CR and/or LF chars is + * necessarily a little fragile - this test is probably about the + * best we can do. (XXX it's arguable whether we should do this + * at all --- is cur_lineno a physical or logical count?) + */ + if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r')) + cstate->cur_lineno++; + } + + /* Process \r */ + if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + { + /* Check for \r\n on first line, _and_ handle \r\n. */ + if (cstate->eol_type == EOL_UNKNOWN || + cstate->eol_type == EOL_CRNL) + { + /* + * If need more data, go back to loop top to load it. + * + * Note that if we are at EOF, c will wind up as '\0' because + * of the guaranteed pad of input_buf. + */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + + /* get next char */ + c = copy_input_buf[input_buf_ptr]; + + if (c == '\n') + { + input_buf_ptr++; /* eat newline */ + cstate->eol_type = EOL_CRNL; /* in case not set yet */ + } + else + { + /* found \r, but no \n */ + if (cstate->eol_type == EOL_CRNL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal carriage return found in data") : + errmsg("unquoted carriage return found in data"), + !cstate->opts.csv_mode ? + errhint("Use \"\\r\" to represent carriage return.") : + errhint("Use quoted CSV field to represent carriage return."))); + + /* + * if we got here, it is the first line and we didn't find + * \n, so don't consume the peeked character + */ + cstate->eol_type = EOL_CR; + } + } + else if (cstate->eol_type == EOL_NL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal carriage return found in data") : + errmsg("unquoted carriage return found in data"), + !cstate->opts.csv_mode ? + errhint("Use \"\\r\" to represent carriage return.") : + errhint("Use quoted CSV field to represent carriage return."))); + /* If reach here, we have found the line terminator */ + break; + } + + /* Process \n */ + if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + { + if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + !cstate->opts.csv_mode ? + errmsg("literal newline found in data") : + errmsg("unquoted newline found in data"), + !cstate->opts.csv_mode ? + errhint("Use \"\\n\" to represent newline.") : + errhint("Use quoted CSV field to represent newline."))); + cstate->eol_type = EOL_NL; /* in case not set yet */ + /* If reach here, we have found the line terminator */ + break; + } + + /* + * In CSV mode, we only recognize \. alone on a line. This is because + * \. is a valid CSV data value. + */ + if (c == '\\' && (!cstate->opts.csv_mode || first_char_in_line)) + { + char c2; + + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + IF_NEED_REFILL_AND_EOF_BREAK(0); + + /* ----- + * get next character + * Note: we do not change c so if it isn't \., we can fall + * through and continue processing. + * ----- + */ + c2 = copy_input_buf[input_buf_ptr]; + + if (c2 == '.') + { + input_buf_ptr++; /* consume the '.' */ + + /* + * Note: if we loop back for more data here, it does not + * matter that the CSV state change checks are re-executed; we + * will come back here with no important state changed. + */ + if (cstate->eol_type == EOL_CRNL) + { + /* Get the next character */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + /* if hit_eof, c2 will become '\0' */ + c2 = copy_input_buf[input_buf_ptr++]; + + if (c2 == '\n') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker does not match previous newline style"))); + else + NO_END_OF_COPY_GOTO; + } + else if (c2 != '\r') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker corrupt"))); + else + NO_END_OF_COPY_GOTO; + } + } + + /* Get the next character */ + IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0); + /* if hit_eof, c2 will become '\0' */ + c2 = copy_input_buf[input_buf_ptr++]; + + if (c2 != '\r' && c2 != '\n') + { + if (!cstate->opts.csv_mode) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker corrupt"))); + else + NO_END_OF_COPY_GOTO; + } + + if ((cstate->eol_type == EOL_NL && c2 != '\n') || + (cstate->eol_type == EOL_CRNL && c2 != '\n') || + (cstate->eol_type == EOL_CR && c2 != '\r')) + { + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("end-of-copy marker does not match previous newline style"))); + } + + /* + * Transfer only the data before the \. into line_buf, then + * discard the data and the \. sequence. + */ + if (prev_raw_ptr > cstate->input_buf_index) + appendBinaryStringInfo(&cstate->line_buf, + cstate->input_buf + cstate->input_buf_index, + prev_raw_ptr - cstate->input_buf_index); + cstate->input_buf_index = input_buf_ptr; + result = true; /* report EOF */ + break; + } + else if (!cstate->opts.csv_mode) + { + /* + * If we are here, it means we found a backslash followed by + * something other than a period. In non-CSV mode, anything + * after a backslash is special, so we skip over that second + * character too. If we didn't do that \\. would be + * considered an eof-of copy, while in non-CSV mode it is a + * literal backslash followed by a period. In CSV mode, + * backslashes are not special, so we want to process the + * character after the backslash just like a normal character, + * so we don't increment in those cases. + */ + input_buf_ptr++; + } + } + + /* + * This label is for CSV cases where \. appears at the start of a + * line, but there is more text after it, meaning it was a data value. + * We are more strict for \. in CSV mode because \. could be a data + * value, while in non-CSV mode, \. cannot be a data value. + */ +not_end_of_copy: + first_char_in_line = false; + } /* end of outer loop */ + + /* + * Transfer any still-uncopied data to line_buf. + */ + REFILL_LINEBUF; + + return result; +} + +/* + * Return decimal value for a hexadecimal digit + */ +static int +GetDecimalFromHex(char hex) +{ + if (isdigit((unsigned char) hex)) + return hex - '0'; + else + return tolower((unsigned char) hex) - 'a' + 10; +} + +/* + * Parse the current line into separate attributes (fields), + * performing de-escaping as needed. + * + * The input is in line_buf. We use attribute_buf to hold the result + * strings. cstate->raw_fields[k] is set to point to the k'th attribute + * string, or NULL when the input matches the null marker string. + * This array is expanded as necessary. + * + * (Note that the caller cannot check for nulls since the returned + * string would be the post-de-escaping equivalent, which may look + * the same as some valid data string.) + * + * delim is the column delimiter string (must be just one byte for now). + * null_print is the null marker string. Note that this is compared to + * the pre-de-escaped input string. + * + * The return value is the number of fields actually read. + */ +static int +CopyReadAttributesText(CopyFromState cstate) +{ + char delimc = cstate->opts.delim[0]; + int fieldno; + char *output_ptr; + char *cur_ptr; + char *line_end_ptr; + + /* + * We need a special case for zero-column tables: check that the input + * line is empty, and return. + */ + if (cstate->max_fields <= 0) + { + if (cstate->line_buf.len != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + return 0; + } + + resetStringInfo(&cstate->attribute_buf); + + /* + * The de-escaped attributes will certainly not be longer than the input + * data line, so we can just force attribute_buf to be large enough and + * then transfer data without any checks for enough space. We need to do + * it this way because enlarging attribute_buf mid-stream would invalidate + * pointers already stored into cstate->raw_fields[]. + */ + if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) + enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); + output_ptr = cstate->attribute_buf.data; + + /* set pointer variables for loop */ + cur_ptr = cstate->line_buf.data; + line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; + + /* Outer loop iterates over fields */ + fieldno = 0; + for (;;) + { + bool found_delim = false; + char *start_ptr; + char *end_ptr; + int input_len; + bool saw_non_ascii = false; + + /* Make sure there is enough space for the next value */ + if (fieldno >= cstate->max_fields) + { + cstate->max_fields *= 2; + cstate->raw_fields = + repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); + } + + /* Remember start of field on both input and output sides */ + start_ptr = cur_ptr; + cstate->raw_fields[fieldno] = output_ptr; + + /* + * Scan data for field. + * + * Note that in this loop, we are scanning to locate the end of field + * and also speculatively performing de-escaping. Once we find the + * end-of-field, we can match the raw field contents against the null + * marker string. Only after that comparison fails do we know that + * de-escaping is actually the right thing to do; therefore we *must + * not* throw any syntax errors before we've done the null-marker + * check. + */ + for (;;) + { + char c; + + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + break; + c = *cur_ptr++; + if (c == delimc) + { + found_delim = true; + break; + } + if (c == '\\') + { + if (cur_ptr >= line_end_ptr) + break; + c = *cur_ptr++; + switch (c) + { + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + { + /* handle \013 */ + int val; + + val = OCTVALUE(c); + if (cur_ptr < line_end_ptr) + { + c = *cur_ptr; + if (ISOCTAL(c)) + { + cur_ptr++; + val = (val << 3) + OCTVALUE(c); + if (cur_ptr < line_end_ptr) + { + c = *cur_ptr; + if (ISOCTAL(c)) + { + cur_ptr++; + val = (val << 3) + OCTVALUE(c); + } + } + } + } + c = val & 0377; + if (c == '\0' || IS_HIGHBIT_SET(c)) + saw_non_ascii = true; + } + break; + case 'x': + /* Handle \x3F */ + if (cur_ptr < line_end_ptr) + { + char hexchar = *cur_ptr; + + if (isxdigit((unsigned char) hexchar)) + { + int val = GetDecimalFromHex(hexchar); + + cur_ptr++; + if (cur_ptr < line_end_ptr) + { + hexchar = *cur_ptr; + if (isxdigit((unsigned char) hexchar)) + { + cur_ptr++; + val = (val << 4) + GetDecimalFromHex(hexchar); + } + } + c = val & 0xff; + if (c == '\0' || IS_HIGHBIT_SET(c)) + saw_non_ascii = true; + } + } + break; + case 'b': + c = '\b'; + break; + case 'f': + c = '\f'; + break; + case 'n': + c = '\n'; + break; + case 'r': + c = '\r'; + break; + case 't': + c = '\t'; + break; + case 'v': + c = '\v'; + break; + + /* + * in all other cases, take the char after '\' + * literally + */ + } + } + + /* Add c to output string */ + *output_ptr++ = c; + } + + /* Check whether raw input matched null marker */ + input_len = end_ptr - start_ptr; + if (input_len == cstate->opts.null_print_len && + strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) + cstate->raw_fields[fieldno] = NULL; + else + { + /* + * At this point we know the field is supposed to contain data. + * + * If we de-escaped any non-7-bit-ASCII chars, make sure the + * resulting string is valid data for the db encoding. + */ + if (saw_non_ascii) + { + char *fld = cstate->raw_fields[fieldno]; + + pg_verifymbstr(fld, output_ptr - fld, false); + } + } + + /* Terminate attribute value in output area */ + *output_ptr++ = '\0'; + + fieldno++; + /* Done if we hit EOL instead of a delim */ + if (!found_delim) + break; + } + + /* Clean up state of attribute_buf */ + output_ptr--; + Assert(*output_ptr == '\0'); + cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); + + return fieldno; +} + +/* + * Parse the current line into separate attributes (fields), + * performing de-escaping as needed. This has exactly the same API as + * CopyReadAttributesText, except we parse the fields according to + * "standard" (i.e. common) CSV usage. + */ +static int +CopyReadAttributesCSV(CopyFromState cstate) +{ + char delimc = cstate->opts.delim[0]; + char quotec = cstate->opts.quote[0]; + char escapec = cstate->opts.escape[0]; + int fieldno; + char *output_ptr; + char *cur_ptr; + char *line_end_ptr; + + /* + * We need a special case for zero-column tables: check that the input + * line is empty, and return. + */ + if (cstate->max_fields <= 0) + { + if (cstate->line_buf.len != 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + return 0; + } + + resetStringInfo(&cstate->attribute_buf); + + /* + * The de-escaped attributes will certainly not be longer than the input + * data line, so we can just force attribute_buf to be large enough and + * then transfer data without any checks for enough space. We need to do + * it this way because enlarging attribute_buf mid-stream would invalidate + * pointers already stored into cstate->raw_fields[]. + */ + if (cstate->attribute_buf.maxlen <= cstate->line_buf.len) + enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len); + output_ptr = cstate->attribute_buf.data; + + /* set pointer variables for loop */ + cur_ptr = cstate->line_buf.data; + line_end_ptr = cstate->line_buf.data + cstate->line_buf.len; + + /* Outer loop iterates over fields */ + fieldno = 0; + for (;;) + { + bool found_delim = false; + bool saw_quote = false; + char *start_ptr; + char *end_ptr; + int input_len; + + /* Make sure there is enough space for the next value */ + if (fieldno >= cstate->max_fields) + { + cstate->max_fields *= 2; + cstate->raw_fields = + repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *)); + } + + /* Remember start of field on both input and output sides */ + start_ptr = cur_ptr; + cstate->raw_fields[fieldno] = output_ptr; + + /* + * Scan data for field, + * + * The loop starts in "not quote" mode and then toggles between that + * and "in quote" mode. The loop exits normally if it is in "not + * quote" mode and a delimiter or line end is seen. + */ + for (;;) + { + char c; + + /* Not in quote */ + for (;;) + { + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + goto endfield; + c = *cur_ptr++; + /* unquoted field delimiter */ + if (c == delimc) + { + found_delim = true; + goto endfield; + } + /* start of quoted field (or part of field) */ + if (c == quotec) + { + saw_quote = true; + break; + } + /* Add c to output string */ + *output_ptr++ = c; + } + + /* In quote */ + for (;;) + { + end_ptr = cur_ptr; + if (cur_ptr >= line_end_ptr) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unterminated CSV quoted field"))); + + c = *cur_ptr++; + + /* escape within a quoted field */ + if (c == escapec) + { + /* + * peek at the next char if available, and escape it if it + * is an escape char or a quote char + */ + if (cur_ptr < line_end_ptr) + { + char nextc = *cur_ptr; + + if (nextc == escapec || nextc == quotec) + { + *output_ptr++ = nextc; + cur_ptr++; + continue; + } + } + } + + /* + * end of quoted field. Must do this test after testing for + * escape in case quote char and escape char are the same + * (which is the common case). + */ + if (c == quotec) + break; + + /* Add c to output string */ + *output_ptr++ = c; + } + } +endfield: + + /* Terminate attribute value in output area */ + *output_ptr++ = '\0'; + + /* Check whether raw input matched null marker */ + input_len = end_ptr - start_ptr; + if (!saw_quote && input_len == cstate->opts.null_print_len && + strncmp(start_ptr, cstate->opts.null_print, input_len) == 0) + cstate->raw_fields[fieldno] = NULL; + + fieldno++; + /* Done if we hit EOL instead of a delim */ + if (!found_delim) + break; + } + + /* Clean up state of attribute_buf */ + output_ptr--; + Assert(*output_ptr == '\0'); + cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data); + + return fieldno; +} + + +/* + * Read a binary attribute + */ +static Datum +CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, + bool *isnull) +{ + int32 fld_size; + Datum result; + + if (!CopyGetInt32(cstate, &fld_size)) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + if (fld_size == -1) + { + *isnull = true; + return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); + } + if (fld_size < 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("invalid field size"))); + + /* reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("unexpected EOF in COPY data"))); + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} |