summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_dump/pg_backup_db.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/pg_backup_db.c')
-rw-r--r--src/bin/pg_dump/pg_backup_db.c576
1 files changed, 576 insertions, 0 deletions
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
new file mode 100644
index 0000000..d0752a3
--- /dev/null
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -0,0 +1,576 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_backup_db.c
+ *
+ * Implements the basic DB functions used by the archiver.
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/pg_backup_db.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <unistd.h>
+#include <ctype.h>
+#ifdef HAVE_TERMIOS_H
+#include <termios.h>
+#endif
+
+#include "common/connect.h"
+#include "dumputils.h"
+#include "fe_utils/string_utils.h"
+#include "parallel.h"
+#include "pg_backup_archiver.h"
+#include "pg_backup_db.h"
+#include "pg_backup_utils.h"
+
+static void _check_database_version(ArchiveHandle *AH);
+static void notice_processor(void *arg, const char *message);
+
+static void
+_check_database_version(ArchiveHandle *AH)
+{
+ const char *remoteversion_str;
+ int remoteversion;
+ PGresult *res;
+
+ remoteversion_str = PQparameterStatus(AH->connection, "server_version");
+ remoteversion = PQserverVersion(AH->connection);
+ if (remoteversion == 0 || !remoteversion_str)
+ fatal("could not get server_version from libpq");
+
+ AH->public.remoteVersionStr = pg_strdup(remoteversion_str);
+ AH->public.remoteVersion = remoteversion;
+ if (!AH->archiveRemoteVersion)
+ AH->archiveRemoteVersion = AH->public.remoteVersionStr;
+
+ if (remoteversion != PG_VERSION_NUM
+ && (remoteversion < AH->public.minRemoteVersion ||
+ remoteversion > AH->public.maxRemoteVersion))
+ {
+ pg_log_error("server version: %s; %s version: %s",
+ remoteversion_str, progname, PG_VERSION);
+ fatal("aborting because of server version mismatch");
+ }
+
+ /*
+ * When running against 9.0 or later, check if we are in recovery mode,
+ * which means we are on a hot standby.
+ */
+ if (remoteversion >= 90000)
+ {
+ res = ExecuteSqlQueryForSingleRow((Archive *) AH, "SELECT pg_catalog.pg_is_in_recovery()");
+
+ AH->public.isStandby = (strcmp(PQgetvalue(res, 0, 0), "t") == 0);
+ PQclear(res);
+ }
+ else
+ AH->public.isStandby = false;
+}
+
+/*
+ * Reconnect to the server. If dbname is not NULL, use that database,
+ * else the one associated with the archive handle.
+ */
+void
+ReconnectToServer(ArchiveHandle *AH, const char *dbname)
+{
+ PGconn *oldConn = AH->connection;
+ RestoreOptions *ropt = AH->public.ropt;
+
+ /*
+ * Save the dbname, if given, in override_dbname so that it will also
+ * affect any later reconnection attempt.
+ */
+ if (dbname)
+ ropt->cparams.override_dbname = pg_strdup(dbname);
+
+ /*
+ * Note: we want to establish the new connection, and in particular update
+ * ArchiveHandle's connCancel, before closing old connection. Otherwise
+ * an ill-timed SIGINT could try to access a dead connection.
+ */
+ AH->connection = NULL; /* dodge error check in ConnectDatabase */
+
+ ConnectDatabase((Archive *) AH, &ropt->cparams, true);
+
+ PQfinish(oldConn);
+}
+
+/*
+ * Make, or remake, a database connection with the given parameters.
+ *
+ * The resulting connection handle is stored in AHX->connection.
+ *
+ * An interactive password prompt is automatically issued if required.
+ * We store the results of that in AHX->savedPassword.
+ * Note: it's not really all that sensible to use a single-entry password
+ * cache if the username keeps changing. In current usage, however, the
+ * username never does change, so one savedPassword is sufficient.
+ */
+void
+ConnectDatabase(Archive *AHX,
+ const ConnParams *cparams,
+ bool isReconnect)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ trivalue prompt_password;
+ char *password;
+ char passbuf[100];
+ bool new_pass;
+
+ if (AH->connection)
+ fatal("already connected to a database");
+
+ /* Never prompt for a password during a reconnection */
+ prompt_password = isReconnect ? TRI_NO : cparams->promptPassword;
+
+ password = AH->savedPassword;
+
+ if (prompt_password == TRI_YES && password == NULL)
+ {
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
+ }
+
+ /*
+ * Start the connection. Loop until we have a password if requested by
+ * backend.
+ */
+ do
+ {
+ const char *keywords[8];
+ const char *values[8];
+ int i = 0;
+
+ /*
+ * If dbname is a connstring, its entries can override the other
+ * values obtained from cparams; but in turn, override_dbname can
+ * override the dbname component of it.
+ */
+ keywords[i] = "host";
+ values[i++] = cparams->pghost;
+ keywords[i] = "port";
+ values[i++] = cparams->pgport;
+ keywords[i] = "user";
+ values[i++] = cparams->username;
+ keywords[i] = "password";
+ values[i++] = password;
+ keywords[i] = "dbname";
+ values[i++] = cparams->dbname;
+ if (cparams->override_dbname)
+ {
+ keywords[i] = "dbname";
+ values[i++] = cparams->override_dbname;
+ }
+ keywords[i] = "fallback_application_name";
+ values[i++] = progname;
+ keywords[i] = NULL;
+ values[i++] = NULL;
+ Assert(i <= lengthof(keywords));
+
+ new_pass = false;
+ AH->connection = PQconnectdbParams(keywords, values, true);
+
+ if (!AH->connection)
+ fatal("could not connect to database");
+
+ if (PQstatus(AH->connection) == CONNECTION_BAD &&
+ PQconnectionNeedsPassword(AH->connection) &&
+ password == NULL &&
+ prompt_password != TRI_NO)
+ {
+ PQfinish(AH->connection);
+ simple_prompt("Password: ", passbuf, sizeof(passbuf), false);
+ password = passbuf;
+ new_pass = true;
+ }
+ } while (new_pass);
+
+ /* check to see that the backend connection was successfully made */
+ if (PQstatus(AH->connection) == CONNECTION_BAD)
+ {
+ if (isReconnect)
+ fatal("reconnection to database \"%s\" failed: %s",
+ PQdb(AH->connection) ? PQdb(AH->connection) : "",
+ PQerrorMessage(AH->connection));
+ else
+ fatal("connection to database \"%s\" failed: %s",
+ PQdb(AH->connection) ? PQdb(AH->connection) : "",
+ PQerrorMessage(AH->connection));
+ }
+
+ /* Start strict; later phases may override this. */
+ PQclear(ExecuteSqlQueryForSingleRow((Archive *) AH,
+ ALWAYS_SECURE_SEARCH_PATH_SQL));
+
+ /*
+ * We want to remember connection's actual password, whether or not we got
+ * it by prompting. So we don't just store the password variable.
+ */
+ if (PQconnectionUsedPassword(AH->connection))
+ {
+ if (AH->savedPassword)
+ free(AH->savedPassword);
+ AH->savedPassword = pg_strdup(PQpass(AH->connection));
+ }
+
+ /* check for version mismatch */
+ _check_database_version(AH);
+
+ PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
+
+ /* arrange for SIGINT to issue a query cancel on this connection */
+ set_archive_cancel_info(AH, AH->connection);
+}
+
+/*
+ * Close the connection to the database and also cancel off the query if we
+ * have one running.
+ */
+void
+DisconnectDatabase(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ char errbuf[1];
+
+ if (!AH->connection)
+ return;
+
+ if (AH->connCancel)
+ {
+ /*
+ * If we have an active query, send a cancel before closing, ignoring
+ * any errors. This is of no use for a normal exit, but might be
+ * helpful during fatal().
+ */
+ if (PQtransactionStatus(AH->connection) == PQTRANS_ACTIVE)
+ (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
+
+ /*
+ * Prevent signal handler from sending a cancel after this.
+ */
+ set_archive_cancel_info(AH, NULL);
+ }
+
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+}
+
+PGconn *
+GetConnection(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ return AH->connection;
+}
+
+static void
+notice_processor(void *arg, const char *message)
+{
+ pg_log_generic(PG_LOG_INFO, "%s", message);
+}
+
+/* Like fatal(), but with a complaint about a particular query. */
+static void
+die_on_query_failure(ArchiveHandle *AH, const char *query)
+{
+ pg_log_error("query failed: %s",
+ PQerrorMessage(AH->connection));
+ fatal("query was: %s", query);
+}
+
+void
+ExecuteSqlStatement(Archive *AHX, const char *query)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ PGresult *res;
+
+ res = PQexec(AH->connection, query);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ die_on_query_failure(AH, query);
+ PQclear(res);
+}
+
+PGresult *
+ExecuteSqlQuery(Archive *AHX, const char *query, ExecStatusType status)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ PGresult *res;
+
+ res = PQexec(AH->connection, query);
+ if (PQresultStatus(res) != status)
+ die_on_query_failure(AH, query);
+ return res;
+}
+
+/*
+ * Execute an SQL query and verify that we got exactly one row back.
+ */
+PGresult *
+ExecuteSqlQueryForSingleRow(Archive *fout, const char *query)
+{
+ PGresult *res;
+ int ntups;
+
+ res = ExecuteSqlQuery(fout, query, PGRES_TUPLES_OK);
+
+ /* Expecting a single result only */
+ ntups = PQntuples(res);
+ if (ntups != 1)
+ fatal(ngettext("query returned %d row instead of one: %s",
+ "query returned %d rows instead of one: %s",
+ ntups),
+ ntups, query);
+
+ return res;
+}
+
+/*
+ * Convenience function to send a query.
+ * Monitors result to detect COPY statements
+ */
+static void
+ExecuteSqlCommand(ArchiveHandle *AH, const char *qry, const char *desc)
+{
+ PGconn *conn = AH->connection;
+ PGresult *res;
+
+#ifdef NOT_USED
+ fprintf(stderr, "Executing: '%s'\n\n", qry);
+#endif
+ res = PQexec(conn, qry);
+
+ switch (PQresultStatus(res))
+ {
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* A-OK */
+ break;
+ case PGRES_COPY_IN:
+ /* Assume this is an expected result */
+ AH->pgCopyIn = true;
+ break;
+ default:
+ /* trouble */
+ warn_or_exit_horribly(AH, "%s: %sCommand was: %s",
+ desc, PQerrorMessage(conn), qry);
+ break;
+ }
+
+ PQclear(res);
+}
+
+
+/*
+ * Process non-COPY table data (that is, INSERT commands).
+ *
+ * The commands have been run together as one long string for compressibility,
+ * and we are receiving them in bufferloads with arbitrary boundaries, so we
+ * have to locate command boundaries and save partial commands across calls.
+ * All state must be kept in AH->sqlparse, not in local variables of this
+ * routine. We assume that AH->sqlparse was filled with zeroes when created.
+ *
+ * We have to lex the data to the extent of identifying literals and quoted
+ * identifiers, so that we can recognize statement-terminating semicolons.
+ * We assume that INSERT data will not contain SQL comments, E'' literals,
+ * or dollar-quoted strings, so this is much simpler than a full SQL lexer.
+ *
+ * Note: when restoring from a pre-9.0 dump file, this code is also used to
+ * process BLOB COMMENTS data, which has the same problem of containing
+ * multiple SQL commands that might be split across bufferloads. Fortunately,
+ * that data won't contain anything complicated to lex either.
+ */
+static void
+ExecuteSimpleCommands(ArchiveHandle *AH, const char *buf, size_t bufLen)
+{
+ const char *qry = buf;
+ const char *eos = buf + bufLen;
+
+ /* initialize command buffer if first time through */
+ if (AH->sqlparse.curCmd == NULL)
+ AH->sqlparse.curCmd = createPQExpBuffer();
+
+ for (; qry < eos; qry++)
+ {
+ char ch = *qry;
+
+ /* For neatness, we skip any newlines between commands */
+ if (!(ch == '\n' && AH->sqlparse.curCmd->len == 0))
+ appendPQExpBufferChar(AH->sqlparse.curCmd, ch);
+
+ switch (AH->sqlparse.state)
+ {
+ case SQL_SCAN: /* Default state == 0, set in _allocAH */
+ if (ch == ';')
+ {
+ /*
+ * We've found the end of a statement. Send it and reset
+ * the buffer.
+ */
+ ExecuteSqlCommand(AH, AH->sqlparse.curCmd->data,
+ "could not execute query");
+ resetPQExpBuffer(AH->sqlparse.curCmd);
+ }
+ else if (ch == '\'')
+ {
+ AH->sqlparse.state = SQL_IN_SINGLE_QUOTE;
+ AH->sqlparse.backSlash = false;
+ }
+ else if (ch == '"')
+ {
+ AH->sqlparse.state = SQL_IN_DOUBLE_QUOTE;
+ }
+ break;
+
+ case SQL_IN_SINGLE_QUOTE:
+ /* We needn't handle '' specially */
+ if (ch == '\'' && !AH->sqlparse.backSlash)
+ AH->sqlparse.state = SQL_SCAN;
+ else if (ch == '\\' && !AH->public.std_strings)
+ AH->sqlparse.backSlash = !AH->sqlparse.backSlash;
+ else
+ AH->sqlparse.backSlash = false;
+ break;
+
+ case SQL_IN_DOUBLE_QUOTE:
+ /* We needn't handle "" specially */
+ if (ch == '"')
+ AH->sqlparse.state = SQL_SCAN;
+ break;
+ }
+ }
+}
+
+
+/*
+ * Implement ahwrite() for direct-to-DB restore
+ */
+int
+ExecuteSqlCommandBuf(Archive *AHX, const char *buf, size_t bufLen)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (AH->outputKind == OUTPUT_COPYDATA)
+ {
+ /*
+ * COPY data.
+ *
+ * We drop the data on the floor if libpq has failed to enter COPY
+ * mode; this allows us to behave reasonably when trying to continue
+ * after an error in a COPY command.
+ */
+ if (AH->pgCopyIn &&
+ PQputCopyData(AH->connection, buf, bufLen) <= 0)
+ fatal("error returned by PQputCopyData: %s",
+ PQerrorMessage(AH->connection));
+ }
+ else if (AH->outputKind == OUTPUT_OTHERDATA)
+ {
+ /*
+ * Table data expressed as INSERT commands; or, in old dump files,
+ * BLOB COMMENTS data (which is expressed as COMMENT ON commands).
+ */
+ ExecuteSimpleCommands(AH, buf, bufLen);
+ }
+ else
+ {
+ /*
+ * General SQL commands; we assume that commands will not be split
+ * across calls.
+ *
+ * In most cases the data passed to us will be a null-terminated
+ * string, but if it's not, we have to add a trailing null.
+ */
+ if (buf[bufLen] == '\0')
+ ExecuteSqlCommand(AH, buf, "could not execute query");
+ else
+ {
+ char *str = (char *) pg_malloc(bufLen + 1);
+
+ memcpy(str, buf, bufLen);
+ str[bufLen] = '\0';
+ ExecuteSqlCommand(AH, str, "could not execute query");
+ free(str);
+ }
+ }
+
+ return bufLen;
+}
+
+/*
+ * Terminate a COPY operation during direct-to-DB restore
+ */
+void
+EndDBCopyMode(Archive *AHX, const char *tocEntryTag)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ if (AH->pgCopyIn)
+ {
+ PGresult *res;
+
+ if (PQputCopyEnd(AH->connection, NULL) <= 0)
+ fatal("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(AH->connection));
+
+ /* Check command status and return to normal libpq state */
+ res = PQgetResult(AH->connection);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ warn_or_exit_horribly(AH, "COPY failed for table \"%s\": %s",
+ tocEntryTag, PQerrorMessage(AH->connection));
+ PQclear(res);
+
+ /* Do this to ensure we've pumped libpq back to idle state */
+ if (PQgetResult(AH->connection) != NULL)
+ pg_log_warning("unexpected extra results during COPY of table \"%s\"",
+ tocEntryTag);
+
+ AH->pgCopyIn = false;
+ }
+}
+
+void
+StartTransaction(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ ExecuteSqlCommand(AH, "BEGIN", "could not start database transaction");
+}
+
+void
+CommitTransaction(Archive *AHX)
+{
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+ ExecuteSqlCommand(AH, "COMMIT", "could not commit database transaction");
+}
+
+void
+DropBlobIfExists(ArchiveHandle *AH, Oid oid)
+{
+ /*
+ * If we are not restoring to a direct database connection, we have to
+ * guess about how to detect whether the blob exists. Assume new-style.
+ */
+ if (AH->connection == NULL ||
+ PQserverVersion(AH->connection) >= 90000)
+ {
+ ahprintf(AH,
+ "SELECT pg_catalog.lo_unlink(oid) "
+ "FROM pg_catalog.pg_largeobject_metadata "
+ "WHERE oid = '%u';\n",
+ oid);
+ }
+ else
+ {
+ /* Restoring to pre-9.0 server, so do it the old way */
+ ahprintf(AH,
+ "SELECT CASE WHEN EXISTS("
+ "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
+ ") THEN pg_catalog.lo_unlink('%u') END;\n",
+ oid, oid);
+ }
+}