/*-------------------------------------------------------------------------
 *
 * pg_backup_db.c
 *
 *	Implements the basic DB functions used by the archiver.
 *
 * IDENTIFICATION
 *	  src/bin/pg_dump/pg_backup_db.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <unistd.h>
#include <ctype.h>
#ifdef HAVE_TERMIOS_H
#include <termios.h>
#endif

#include "common/connect.h"
#include "dumputils.h"
#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_archiver.h"
#include "pg_backup_db.h"
#include "pg_backup_utils.h"

static void _check_database_version(ArchiveHandle *AH);
static void notice_processor(void *arg, const char *message);

/*
 * Record the connected server's version in the archive handle and check
 * that it is one we are willing to talk to.
 *
 * The version string and number are taken from libpq and stored in
 * AH->public.remoteVersionStr / remoteVersion.  If the archive does not
 * yet carry a remote version string, it is pointed at the same string.
 *
 * fatal() is called when libpq cannot report a version, or when the server
 * version differs from PG_VERSION_NUM and lies outside the
 * [minRemoteVersion, maxRemoteVersion] range of the archive.
 *
 * AH->public.isStandby is set as a side effect.
 */
static void
_check_database_version(ArchiveHandle *AH)
{
	const char *remoteversion_str;
	int			remoteversion;
	PGresult   *res;

	remoteversion_str = PQparameterStatus(AH->connection, "server_version");
	remoteversion = PQserverVersion(AH->connection);
	if (remoteversion == 0 || !remoteversion_str)
		fatal("could not get server_version from libpq");

	/*
	 * Keep a private copy of the string.  Note that archiveRemoteVersion may
	 * end up sharing this pointer, so the copy is not freed here.
	 */
	AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
	AH->public.remoteVersion = remoteversion;
	if (!AH->archiveRemoteVersion)
		AH->archiveRemoteVersion = AH->public.remoteVersionStr;

	/* An exact match with our own version is always acceptable. */
	if (remoteversion != PG_VERSION_NUM
		&& (remoteversion < AH->public.minRemoteVersion ||
			remoteversion > AH->public.maxRemoteVersion))
	{
		pg_log_error("server version: %s; %s version: %s",
					 remoteversion_str, progname, PG_VERSION);
		fatal("aborting because of server version mismatch");
	}

	/*
	 * When running against 9.0 or later, check if we are in recovery mode,
	 * which means we are on a hot standby.
	 */
	if (remoteversion >= 90000)
	{
		res = ExecuteSqlQueryForSingleRow((Archive *) AH,
										  "SELECT pg_catalog.pg_is_in_recovery()");

		AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
		PQclear(res);
	}
	else
		AH->public.isStandby = false;
}

/*
 * Reconnect to the server.  If dbname is not NULL, use that database,
 * else the one associated with the archive handle.
*/ void ReconnectToServer(ArchiveHandle *AH, const char *dbname) { PGconn *oldConn = AH->connection; RestoreOptions *ropt = AH->public.ropt; /* * Save the dbname, if given, in override_dbname so that it will also * affect any later reconnection attempt. */ if (dbname) ropt->cparams.override_dbname = pg_strdup(dbname); /* * Note: we want to establish the new connection, and in particular update * ArchiveHandle's connCancel, before closing old connection. Otherwise * an ill-timed SIGINT could try to access a dead connection. */ AH->connection = NULL; /* dodge error check in ConnectDatabase */ ConnectDatabase((Archive *) AH, &ropt->cparams, true); PQfinish(oldConn); } /* * Make, or remake, a database connection with the given parameters. * * The resulting connection handle is stored in AHX->connection. * * An interactive password prompt is automatically issued if required. * We store the results of that in AHX->savedPassword. * Note: it's not really all that sensible to use a single-entry password * cache if the username keeps changing. In current usage, however, the * username never does change, so one savedPassword is sufficient. */ void ConnectDatabase(Archive *AHX, const ConnParams *cparams, bool isReconnect) { ArchiveHandle *AH = (ArchiveHandle *) AHX; trivalue prompt_password; char *password; char passbuf[100]; bool new_pass; if (AH->connection) fatal("already connected to a database"); /* Never prompt for a password during a reconnection */ prompt_password = isReconnect ? TRI_NO : cparams->promptPassword; password = AH->savedPassword; if (prompt_password == TRI_YES && password == NULL) { simple_prompt("Password: ", passbuf, sizeof(passbuf), false); password = passbuf; } /* * Start the connection. Loop until we have a password if requested by * backend. 
*/ do { const char *keywords[8]; const char *values[8]; int i = 0; /* * If dbname is a connstring, its entries can override the other * values obtained from cparams; but in turn, override_dbname can * override the dbname component of it. */ keywords[i] = "host"; values[i++] = cparams->pghost; keywords[i] = "port"; values[i++] = cparams->pgport; keywords[i] = "user"; values[i++] = cparams->username; keywords[i] = "password"; values[i++] = password; keywords[i] = "dbname"; values[i++] = cparams->dbname; if (cparams->override_dbname) { keywords[i] = "dbname"; values[i++] = cparams->override_dbname; } keywords[i] = "fallback_application_name"; values[i++] = progname; keywords[i] = NULL; values[i++] = NULL; Assert(i <= lengthof(keywords)); new_pass = false; AH->connection = PQconnectdbParams(keywords, values, true); if (!AH->connection) fatal("could not connect to database"); if (PQstatus(AH->connection) == CONNECTION_BAD && PQconnectionNeedsPassword(AH->connection) && password == NULL && prompt_password != TRI_NO) { PQfinish(AH->connection); simple_prompt("Password: ", passbuf, sizeof(passbuf), false); password = passbuf; new_pass = true; } } while (new_pass); /* check to see that the backend connection was successfully made */ if (PQstatus(AH->connection) == CONNECTION_BAD) { if (isReconnect) fatal("reconnection to database \"%s\" failed: %s", PQdb(AH->connection) ? PQdb(AH->connection) : "", PQerrorMessage(AH->connection)); else fatal("connection to database \"%s\" failed: %s", PQdb(AH->connection) ? PQdb(AH->connection) : "", PQerrorMessage(AH->connection)); } /* Start strict; later phases may override this. */ PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH, ALWAYS_SECURE_SEARCH_PATH_SQL)); /* * We want to remember connection's actual password, whether or not we got * it by prompting. So we don't just store the password variable. 
*/ if (PQconnectionUsedPassword(AH->connection)) { if (AH->savedPassword) free(AH->savedPassword); AH->savedPassword = pg_strdup(PQpass(AH->connection)); } /* check for version mismatch */ _check_database_version(AH); PQsetNoticeProcessor(AH->connection, notice_processor, NULL); /* arrange for SIGINT to issue a query cancel on this connection */ set_archive_cancel_info(AH, AH->connection); } /* * Close the connection to the database and also cancel off the query if we * have one running. */ void DisconnectDatabase(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; char errbuf[1]; if (!AH->connection) return; if (AH->connCancel) { /* * If we have an active query, send a cancel before closing, ignoring * any errors. This is of no use for a normal exit, but might be * helpful during fatal(). */ if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE) (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); /* * Prevent signal handler from sending a cancel after this. */ set_archive_cancel_info(AH, NULL); } PQfinish(AH->connection); AH->connection = NULL; } PGconn * GetConnection(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; return AH->connection; } static void notice_processor(void *arg, const char *message) { pg_log_generic(PG_LOG_INFO, "%s", message); } /* Like fatal(), but with a complaint about a particular query. 
*/ static void die_on_query_failure(ArchiveHandle *AH, const char *query) { pg_log_error("query failed: %s", PQerrorMessage(AH->connection)); fatal("query was: %s", query); } void ExecuteSqlStatement(Archive *AHX, const char *query) { ArchiveHandle *AH = (ArchiveHandle *) AHX; PGresult *res; res = PQexec(AH->connection, query); if (PQresultStatus(res) != PGRES_COMMAND_OK) die_on_query_failure(AH, query); PQclear(res); } PGresult * ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status) { ArchiveHandle *AH = (ArchiveHandle *) AHX; PGresult *res; res = PQexec(AH->connection, query); if (PQresultStatus(res) != status) die_on_query_failure(AH, query); return res; } /* * Execute an SQL query and verify that we got exactly one row back. */ PGresult * ExecuteSqlQueryForSingleRow(Archive *fout, const char *query) { PGresult *res; int ntups; res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK); /* Expecting a single result only */ ntups = PQntuples(res); if (ntups != 1) fatal(ngettext("query returned %d row instead of one: %s", "query returned %d rows instead of one: %s", ntups), ntups, query); return res; } /* * Convenience function to send a query. * Monitors result to detect COPY statements */ static void ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc) { PGconn *conn = AH->connection; PGresult *res; #ifdef NOT_USED fprintf(stderr, "Executing: '%s'\n\n", qry); #endif res = PQexec(conn, qry); switch (PQresultStatus(res)) { case PGRES_COMMAND_OK: case PGRES_TUPLES_OK: case PGRES_EMPTY_QUERY: /* A-OK */ break; case PGRES_COPY_IN: /* Assume this is an expected result */ AH->pgCopyIn = true; break; default: /* trouble */ warn_or_exit_horribly(AH, "%s: %sCommand was: %s", desc, PQerrorMessage(conn), qry); break; } PQclear(res); } /* * Process non-COPY table data (that is, INSERT commands). 
* * The commands have been run together as one long string for compressibility, * and we are receiving them in bufferloads with arbitrary boundaries, so we * have to locate command boundaries and save partial commands across calls. * All state must be kept in AH->sqlparse, not in local variables of this * routine. We assume that AH->sqlparse was filled with zeroes when created. * * We have to lex the data to the extent of identifying literals and quoted * identifiers, so that we can recognize statement-terminating semicolons. * We assume that INSERT data will not contain SQL comments, E'' literals, * or dollar-quoted strings, so this is much simpler than a full SQL lexer. * * Note: when restoring from a pre-9.0 dump file, this code is also used to * process BLOB COMMENTS data, which has the same problem of containing * multiple SQL commands that might be split across bufferloads. Fortunately, * that data won't contain anything complicated to lex either. */ static void ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen) { const char *qry = buf; const char *eos = buf + bufLen; /* initialize command buffer if first time through */ if (AH->sqlparse.curCmd == NULL) AH->sqlparse.curCmd = createPQExpBuffer(); for (; qry < eos; qry++) { char ch = *qry; /* For neatness, we skip any newlines between commands */ if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0)) appendPQExpBufferChar(AH->sqlparse.curCmd, ch); switch (AH->sqlparse.state) { case SQL_SCAN: /* Default state == 0, set in _allocAH */ if (ch == ';') { /* * We've found the end of a statement. Send it and reset * the buffer. 
*/ ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data, "could not execute query"); resetPQExpBuffer(AH->sqlparse.curCmd); } else if (ch == '\'') { AH->sqlparse.state = SQL_IN_SINGLE_QUOTE; AH->sqlparse.backSlash = false; } else if (ch == '"') { AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE; } break; case SQL_IN_SINGLE_QUOTE: /* We needn't handle '' specially */ if (ch == '\'' && !AH->sqlparse.backSlash) AH->sqlparse.state = SQL_SCAN; else if (ch == '\\' && !AH->public.std_strings) AH->sqlparse.backSlash = !AH->sqlparse.backSlash; else AH->sqlparse.backSlash = false; break; case SQL_IN_DOUBLE_QUOTE: /* We needn't handle "" specially */ if (ch == '"') AH->sqlparse.state = SQL_SCAN; break; } } } /* * Implement ahwrite() for direct-to-DB restore */ int ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen) { ArchiveHandle *AH = (ArchiveHandle *) AHX; if (AH->outputKind == OUTPUT_COPYDATA) { /* * COPY data. * * We drop the data on the floor if libpq has failed to enter COPY * mode; this allows us to behave reasonably when trying to continue * after an error in a COPY command. */ if (AH->pgCopyIn && PQputCopyData(AH->connection, buf, bufLen) <= 0) fatal("error returned by PQputCopyData: %s", PQerrorMessage(AH->connection)); } else if (AH->outputKind == OUTPUT_OTHERDATA) { /* * Table data expressed as INSERT commands; or, in old dump files, * BLOB COMMENTS data (which is expressed as COMMENT ON commands). */ ExecuteSimpleCommands(AH, buf, bufLen); } else { /* * General SQL commands; we assume that commands will not be split * across calls. * * In most cases the data passed to us will be a null-terminated * string, but if it's not, we have to add a trailing null. 
*/ if (buf[bufLen] == '\0') ExecuteSqlCommand(AH, buf, "could not execute query"); else { char *str = (char *) pg_malloc(bufLen + 1); memcpy(str, buf, bufLen); str[bufLen] = '\0'; ExecuteSqlCommand(AH, str, "could not execute query"); free(str); } } return bufLen; } /* * Terminate a COPY operation during direct-to-DB restore */ void EndDBCopyMode(Archive *AHX, const char *tocEntryTag) { ArchiveHandle *AH = (ArchiveHandle *) AHX; if (AH->pgCopyIn) { PGresult *res; if (PQputCopyEnd(AH->connection, NULL) <= 0) fatal("error returned by PQputCopyEnd: %s", PQerrorMessage(AH->connection)); /* Check command status and return to normal libpq state */ res = PQgetResult(AH->connection); if (PQresultStatus(res) != PGRES_COMMAND_OK) warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s", tocEntryTag, PQerrorMessage(AH->connection)); PQclear(res); /* Do this to ensure we've pumped libpq back to idle state */ if (PQgetResult(AH->connection) != NULL) pg_log_warning("unexpected extra results during COPY of table \"%s\"", tocEntryTag); AH->pgCopyIn = false; } } void StartTransaction(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction"); } void CommitTransaction(Archive *AHX) { ArchiveHandle *AH = (ArchiveHandle *) AHX; ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction"); } void DropBlobIfExists(ArchiveHandle *AH, Oid oid) { /* * If we are not restoring to a direct database connection, we have to * guess about how to detect whether the blob exists. Assume new-style. */ if (AH->connection == NULL || PQserverVersion(AH->connection) >= 90000) { ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) " "FROM pg_catalog.pg_largeobject_metadata " "WHERE oid = '%u';\n", oid); } else { /* Restoring to pre-9.0 server, so do it the old way */ ahprintf(AH, "SELECT CASE WHEN EXISTS(" "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'" ") THEN pg_catalog.lo_unlink('%u') END;\n", oid, oid); } }