diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 11:32:39 +0000 |
commit | 56ae875861ab260b80a030f50c4aff9f9dc8fff0 (patch) | |
tree | 531412110fc901a5918c7f7442202804a83cada9 /lib/db_ido_pgsql/idopgsqlconnection.cpp | |
parent | Initial commit. (diff) | |
download | icinga2-e6c8b97d844e301093c7e2c03da489629676e2c4.tar.xz icinga2-e6c8b97d844e301093c7e2c03da489629676e2c4.zip |
Adding upstream version 2.14.2.upstream/2.14.2upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'lib/db_ido_pgsql/idopgsqlconnection.cpp')
-rw-r--r-- | lib/db_ido_pgsql/idopgsqlconnection.cpp | 1029 |
1 files changed, 1029 insertions, 0 deletions
diff --git a/lib/db_ido_pgsql/idopgsqlconnection.cpp b/lib/db_ido_pgsql/idopgsqlconnection.cpp new file mode 100644 index 0000000..07e88e6 --- /dev/null +++ b/lib/db_ido_pgsql/idopgsqlconnection.cpp @@ -0,0 +1,1029 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "db_ido_pgsql/idopgsqlconnection.hpp" +#include "db_ido_pgsql/idopgsqlconnection-ti.cpp" +#include "db_ido/dbtype.hpp" +#include "db_ido/dbvalue.hpp" +#include "base/logger.hpp" +#include "base/objectlock.hpp" +#include "base/convert.hpp" +#include "base/utility.hpp" +#include "base/perfdatavalue.hpp" +#include "base/application.hpp" +#include "base/configtype.hpp" +#include "base/exception.hpp" +#include "base/context.hpp" +#include "base/statsfunction.hpp" +#include "base/defer.hpp" +#include <utility> + +using namespace icinga; + +REGISTER_TYPE(IdoPgsqlConnection); + +REGISTER_STATSFUNCTION(IdoPgsqlConnection, &IdoPgsqlConnection::StatsFunc); + +const char * IdoPgsqlConnection::GetLatestSchemaVersion() const noexcept +{ + return "1.14.3"; +} + +const char * IdoPgsqlConnection::GetCompatSchemaVersion() const noexcept +{ + return "1.14.3"; +} + +IdoPgsqlConnection::IdoPgsqlConnection() +{ + m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName()); +} + +void IdoPgsqlConnection::OnConfigLoaded() +{ + ObjectImpl<IdoPgsqlConnection>::OnConfigLoaded(); + + m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName()); + + Library shimLibrary{"pgsql_shim"}; + + auto create_pgsql_shim = shimLibrary.GetSymbolAddress<create_pgsql_shim_ptr>("create_pgsql_shim"); + + m_Pgsql.reset(create_pgsql_shim()); + + std::swap(m_Library, shimLibrary); +} + +void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) +{ + DictionaryData nodes; + + for (const IdoPgsqlConnection::Ptr& idopgsqlconnection : ConfigType::GetObjectsByType<IdoPgsqlConnection>()) { + size_t queryQueueItems = idopgsqlconnection->m_QueryQueue.GetLength(); + double queryQueueItemRate = idopgsqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0; + + nodes.emplace_back(idopgsqlconnection->GetName(), new Dictionary({ + { "version", idopgsqlconnection->GetSchemaVersion() }, + { "instance_name", idopgsqlconnection->GetInstanceName() }, + { "connected", idopgsqlconnection->GetConnected() }, + { "query_queue_items", queryQueueItems }, + { "query_queue_item_rate", queryQueueItemRate } + })); + + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_rate", idopgsqlconnection->GetQueryCount(60) / 60.0)); + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_1min", idopgsqlconnection->GetQueryCount(60))); + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_5mins", idopgsqlconnection->GetQueryCount(5 * 60))); + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_15mins", idopgsqlconnection->GetQueryCount(15 * 60))); + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_items", queryQueueItems)); + perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate)); + } + + status->Set("idopgsqlconnection", new Dictionary(std::move(nodes))); +} + +void IdoPgsqlConnection::Resume() +{ + Log(LogInformation, "IdoPgsqlConnection") + << "'" << GetName() << "' resumed."; + + SetConnected(false); + + m_QueryQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + /* Immediately try to connect on Resume() without timer. */ + m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate); + + m_TxTimer = Timer::Create(); + m_TxTimer->SetInterval(1); + m_TxTimer->OnTimerExpired.connect([this](const Timer * const&) { NewTransaction(); }); + m_TxTimer->Start(); + + m_ReconnectTimer = Timer::Create(); + m_ReconnectTimer->SetInterval(10); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + + /* Start with queries after connect. */ + DbConnection::Resume(); + + ASSERT(m_Pgsql->isthreadsafe()); +} + +void IdoPgsqlConnection::Pause() +{ + DbConnection::Pause(); + + m_ReconnectTimer->Stop(true); + m_TxTimer->Stop(true); + + Log(LogInformation, "IdoPgsqlConnection") + << "'" << GetName() << "' paused."; +} + +void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogWarning, "IdoPgsqlConnection", "Exception during database operation: Verify that your database is operational!"); + + Log(LogDebug, "IdoPgsqlConnection") + << "Exception during database operation: " << DiagnosticInformation(std::move(exp)); + + if (GetConnected()) { + m_Pgsql->finish(m_Connection); + SetConnected(false); + } +} + +void IdoPgsqlConnection::AssertOnWorkQueue() +{ + ASSERT(m_QueryQueue.IsWorkerThread()); +} + +void IdoPgsqlConnection::Disconnect() +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + IncreasePendingQueries(1); + Query("COMMIT"); + + m_Pgsql->finish(m_Connection); + SetConnected(false); + + Log(LogInformation, "IdoPgsqlConnection") + << "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'."; +} + +void IdoPgsqlConnection::NewTransaction() +{ + if (IsPaused()) + return; + + m_QueryQueue.Enqueue([this]() { InternalNewTransaction(); }, PriorityNormal, true); +} + +void IdoPgsqlConnection::InternalNewTransaction() +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + IncreasePendingQueries(2); + Query("COMMIT"); + Query("BEGIN"); +} + +void IdoPgsqlConnection::ReconnectTimerHandler() +{ + /* Only allow Reconnect events with high priority. */ + m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh); +} + +void IdoPgsqlConnection::Reconnect() +{ + AssertOnWorkQueue(); + + CONTEXT("Reconnecting to PostgreSQL IDO database '" << GetName() << "'"); + + double startTime = Utility::GetTime(); + + SetShouldConnect(true); + + bool reconnect = false; + + if (GetConnected()) { + /* Check if we're really still connected */ + try { + IncreasePendingQueries(1); + Query("SELECT 1"); + return; + } catch (const std::exception&) { + m_Pgsql->finish(m_Connection); + SetConnected(false); + reconnect = true; + } + } + + ClearIDCache(); + + String host = GetHost(); + String port = GetPort(); + String user = GetUser(); + String password = GetPassword(); + String database = GetDatabase(); + + String sslMode = GetSslMode(); + String sslKey = GetSslKey(); + String sslCert = GetSslCert(); + String sslCa = GetSslCa(); + + String conninfo; + + if (!host.IsEmpty()) + conninfo += " host=" + host; + if (!port.IsEmpty()) + conninfo += " port=" + port; + if (!user.IsEmpty()) + conninfo += " user=" + user; + if (!password.IsEmpty()) + conninfo += " password=" + password; + if (!database.IsEmpty()) + conninfo += " dbname=" + database; + + if (!sslMode.IsEmpty()) + conninfo += " sslmode=" + sslMode; + if (!sslKey.IsEmpty()) + conninfo += " sslkey=" + sslKey; + if (!sslCert.IsEmpty()) + conninfo += " sslcert=" + sslCert; + if (!sslCa.IsEmpty()) + conninfo += " sslrootcert=" + sslCa; + + /* connection */ + m_Connection = m_Pgsql->connectdb(conninfo.CStr()); + + if (!m_Connection) + return; + + if (m_Pgsql->status(m_Connection) != CONNECTION_OK) { + String message = m_Pgsql->errorMessage(m_Connection); + m_Pgsql->finish(m_Connection); + SetConnected(false); + + Log(LogCritical, "IdoPgsqlConnection") + << "Connection to database '" << database << "' with user '" << user << "' on '" << host << ":" << port + << "' failed: \"" << message << "\""; + + BOOST_THROW_EXCEPTION(std::runtime_error(message)); + } + + SetConnected(true); + + IdoPgsqlResult result; + + String dbVersionName = "idoutils"; + IncreasePendingQueries(1); + result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); + + Dictionary::Ptr row = FetchRow(result, 0); + + if (!row) { + m_Pgsql->finish(m_Connection); + SetConnected(false); + + Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation."); + + BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema.")); + } + + String version = row->Get("version"); + + SetSchemaVersion(version); + + if (Utility::CompareVersion(GetCompatSchemaVersion(), version) < 0) { + m_Pgsql->finish(m_Connection); + SetConnected(false); + + Log(LogCritical, "IdoPgsqlConnection") + << "Schema version '" << version << "' does not match the required version '" + << GetCompatSchemaVersion() << "' (or newer)! Please check the upgrade documentation at " + << "https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/#upgrading-postgresql-db"; + + BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch.")); + } + + String instanceName = GetInstanceName(); + + IncreasePendingQueries(1); + result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); + row = FetchRow(result, 0); + + if (!row) { + IncreasePendingQueries(1); + Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); + m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id"); + } else { + m_InstanceID = DbReference(row->Get("instance_id")); + } + + Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint(); + + /* we have an endpoint in a cluster setup, so decide if we can proceed here */ + if (my_endpoint && GetHAMode() == HARunOnce) { + /* get the current endpoint writing to programstatus table */ + IncreasePendingQueries(1); + result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " + + GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID)); + row = FetchRow(result, 0); + + String endpoint_name; + + if (row) + endpoint_name = row->Get("endpoint_name"); + else + Log(LogNotice, "IdoPgsqlConnection", "Empty program status table"); + + /* if we did not write into the database earlier, another instance is active */ + if (endpoint_name != my_endpoint->GetName()) { + double status_update_time; + + if (row) + status_update_time = row->Get("status_update_time"); + else + status_update_time = 0; + + double now = Utility::GetTime(); + + double status_update_age = now - status_update_time; + double failoverTimeout = GetFailoverTimeout(); + + if (status_update_age < GetFailoverTimeout()) { + Log(LogInformation, "IdoPgsqlConnection") + << "Last update by endpoint '" << endpoint_name << "' was " + << status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying."; + + m_Pgsql->finish(m_Connection); + SetConnected(false); + SetShouldConnect(false); + + return; + } + + /* activate the IDO only, if we're authoritative in this zone */ + if (IsPaused()) { + Log(LogNotice, "IdoPgsqlConnection") + << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out."; + + m_Pgsql->finish(m_Connection); + SetConnected(false); + + return; + } + + SetLastFailover(now); + + Log(LogInformation, "IdoPgsqlConnection") + << "Last update by endpoint '" << endpoint_name << "' was " + << status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'."; + } + + Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection."); + } + + Log(LogInformation, "IdoPgsqlConnection") + << "PGSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')" + << (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : ""); + + IncreasePendingQueries(1); + Query("BEGIN"); + + /* update programstatus table */ + UpdateProgramStatus(); + + /* record connection */ + IncreasePendingQueries(1); + Query("INSERT INTO " + GetTablePrefix() + "conninfo " + + "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES (" + + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_pgsql', '" + Escape(Application::GetAppVersion()) + + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())"); + + /* clear config tables for the initial config dump */ + PrepareDatabase(); + + std::ostringstream q1buf; + q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID); + IncreasePendingQueries(1); + result = Query(q1buf.str()); + + std::vector<DbObject::Ptr> activeDbObjs; + + int index = 0; + while ((row = FetchRow(result, index))) { + index++; + + DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); + + if (!dbtype) + continue; + + DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2")); + SetObjectID(dbobj, DbReference(row->Get("object_id"))); + bool active = row->Get("is_active"); + SetObjectActive(dbobj, active); + + if (active) + activeDbObjs.push_back(dbobj); + } + + SetIDCacheValid(true); + + EnableActiveChangedHandler(); + + for (const DbObject::Ptr& dbobj : activeDbObjs) { + if (dbobj->GetObject()) + continue; + + Log(LogNotice, "IdoPgsqlConnection") + << "Deactivate deleted object name1: '" << dbobj->GetName1() + << "' name2: '" << dbobj->GetName2() + "'."; + DeactivateObject(dbobj); + } + + UpdateAllObjects(); + + m_QueryQueue.Enqueue([this]() { ClearTablesBySession(); }, PriorityNormal); + + m_QueryQueue.Enqueue([this, startTime]() { FinishConnect(startTime); }, PriorityNormal); +} + +void IdoPgsqlConnection::FinishConnect(double startTime) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + Log(LogInformation, "IdoPgsqlConnection") + << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in " + << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; + + IncreasePendingQueries(2); + Query("COMMIT"); + Query("BEGIN"); +} + +void IdoPgsqlConnection::ClearTablesBySession() +{ + /* delete all comments and downtimes without current session token */ + ClearTableBySession("comments"); + ClearTableBySession("scheduleddowntime"); +} + +void IdoPgsqlConnection::ClearTableBySession(const String& table) +{ + IncreasePendingQueries(1); + Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + + Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " + + Convert::ToString(GetSessionToken())); +} + +IdoPgsqlResult IdoPgsqlConnection::Query(const String& query) +{ + AssertOnWorkQueue(); + + Defer decreaseQueries ([this]() { DecreasePendingQueries(1); }); + + Log(LogDebug, "IdoPgsqlConnection") + << "Query: " << query; + + IncreaseQueryCount(); + + PGresult *result = m_Pgsql->exec(m_Connection, query.CStr()); + + if (!result) { + String message = m_Pgsql->errorMessage(m_Connection); + Log(LogCritical, "IdoPgsqlConnection") + << "Error \"" << message << "\" when executing query \"" << query << "\""; + + BOOST_THROW_EXCEPTION( + database_error() + << errinfo_message(message) + << errinfo_database_query(query) + ); + } + + char *rowCount = m_Pgsql->cmdTuples(result); + m_AffectedRows = atoi(rowCount); + + if (m_Pgsql->resultStatus(result) == PGRES_COMMAND_OK) { + m_Pgsql->clear(result); + return IdoPgsqlResult(); + } + + if (m_Pgsql->resultStatus(result) != PGRES_TUPLES_OK) { + String message = m_Pgsql->resultErrorMessage(result); + m_Pgsql->clear(result); + + Log(LogCritical, "IdoPgsqlConnection") + << "Error \"" << message << "\" when executing query \"" << query << "\""; + + BOOST_THROW_EXCEPTION( + database_error() + << errinfo_message(message) + << errinfo_database_query(query) + ); + } + + return IdoPgsqlResult(result, [this](PGresult* result) { m_Pgsql->clear(result); }); +} + +DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column) +{ + AssertOnWorkQueue(); + + IncreasePendingQueries(1); + IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence('" + Escape(table) + "', '" + Escape(column) + "')) AS id"); + + Dictionary::Ptr row = FetchRow(result, 0); + + ASSERT(row); + + Log(LogDebug, "IdoPgsqlConnection") + << "Sequence Value: " << row->Get("id"); + + return {Convert::ToLong(row->Get("id"))}; +} + +int IdoPgsqlConnection::GetAffectedRows() +{ + AssertOnWorkQueue(); + + return m_AffectedRows; +} + +String IdoPgsqlConnection::Escape(const String& s) +{ + AssertOnWorkQueue(); + + String utf8s = Utility::ValidateUTF8(s); + + size_t length = utf8s.GetLength(); + auto *to = new char[utf8s.GetLength() * 2 + 1]; + + m_Pgsql->escapeStringConn(m_Connection, to, utf8s.CStr(), length, nullptr); + + String result = String(to); + + delete [] to; + + return result; +} + +Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row) +{ + AssertOnWorkQueue(); + + if (row >= m_Pgsql->ntuples(result.get())) + return nullptr; + + int columns = m_Pgsql->nfields(result.get()); + + DictionaryData dict; + + for (int column = 0; column < columns; column++) { + Value value; + + if (!m_Pgsql->getisnull(result.get(), row, column)) + value = m_Pgsql->getvalue(result.get(), row, column); + + dict.emplace_back(m_Pgsql->fname(result.get(), column), value); + } + + return new Dictionary(std::move(dict)); +} + +void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) +{ + if (IsPaused()) + return; + + m_QueryQueue.Enqueue([this, dbobj]() { InternalActivateObject(dbobj); }, PriorityNormal); +} + +void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + DbReference dbref = GetObjectID(dbobj); + std::ostringstream qbuf; + + if (!dbref.IsValid()) { + if (!dbobj->GetName2().IsEmpty()) { + qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES (" + << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", " + << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)"; + } else { + qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES (" + << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", " + << "'" << Escape(dbobj->GetName1()) << "', 1)"; + } + + IncreasePendingQueries(1); + Query(qbuf.str()); + SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id")); + } else { + qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref); + IncreasePendingQueries(1); + Query(qbuf.str()); + } +} + +void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) +{ + if (IsPaused()) + return; + + m_QueryQueue.Enqueue([this, dbobj]() { InternalDeactivateObject(dbobj); }, PriorityNormal); +} + +void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) + return; + + DbReference dbref = GetObjectID(dbobj); + + if (!dbref.IsValid()) + return; + + std::ostringstream qbuf; + qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref); + IncreasePendingQueries(1); + Query(qbuf.str()); + + /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate + * because the object is still in the database. */ + + SetObjectActive(dbobj, false); +} + +bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result) +{ + if (key == "instance_id") { + *result = static_cast<long>(m_InstanceID); + return true; + } else if (key == "session_token") { + *result = GetSessionToken(); + return true; + } + + Value rawvalue = DbValue::ExtractValue(value); + + if (rawvalue.GetType() == ValueEmpty) { + *result = "NULL"; + } else if (rawvalue.IsObjectType<ConfigObject>()) { + DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue); + + if (!dbobjcol) { + *result = 0; + return true; + } + + if (!IsIDCacheValid()) + return false; + + DbReference dbrefcol; + + if (DbValue::IsObjectInsertID(value)) { + dbrefcol = GetInsertID(dbobjcol); + + if (!dbrefcol.IsValid()) + return false; + } else { + dbrefcol = GetObjectID(dbobjcol); + + if (!dbrefcol.IsValid()) { + InternalActivateObject(dbobjcol); + + dbrefcol = GetObjectID(dbobjcol); + + if (!dbrefcol.IsValid()) + return false; + } + } + + *result = static_cast<long>(dbrefcol); + } else if (DbValue::IsTimestamp(value)) { + long ts = rawvalue; + std::ostringstream msgbuf; + msgbuf << "TO_TIMESTAMP(" << ts << ") AT TIME ZONE 'UTC'"; + *result = Value(msgbuf.str()); + } else if (DbValue::IsObjectInsertID(value)) { + auto id = static_cast<long>(rawvalue); + + if (id <= 0) + return false; + + *result = id; + return true; + } else { + Value fvalue; + + if (rawvalue.IsBoolean()) + fvalue = Convert::ToLong(rawvalue); + else + fvalue = rawvalue; + + *result = "'" + Escape(fvalue) + "'"; + } + + return true; +} + +void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query) +{ + if (IsPaused() && GetPauseCalled()) + return; + + ASSERT(query.Category != DbCatInvalid); + + IncreasePendingQueries(1); + m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority, true); +} + +void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries) +{ + if (IsPaused()) + return; + + if (queries.empty()) + return; + + IncreasePendingQueries(queries.size()); + m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, queries[0].Priority, true); +} + +bool IdoPgsqlConnection::CanExecuteQuery(const DbQuery& query) +{ + if (query.Object && !IsIDCacheValid()) + return false; + + if (query.WhereCriteria) { + ObjectLock olock(query.WhereCriteria); + Value value; + + for (const Dictionary::Pair& kv : query.WhereCriteria) { + if (!FieldToEscapedString(kv.first, kv.second, &value)) + return false; + } + } + + if (query.Fields) { + ObjectLock olock(query.Fields); + + for (const Dictionary::Pair& kv : query.Fields) { + Value value; + + if (!FieldToEscapedString(kv.first, kv.second, &value)) + return false; + } + } + + return true; +} + +void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries) +{ + AssertOnWorkQueue(); + + if (IsPaused()) { + DecreasePendingQueries(queries.size()); + return; + } + + if (!GetConnected()) { + DecreasePendingQueries(queries.size()); + return; + } + + for (const DbQuery& query : queries) { + ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid); + + if (!CanExecuteQuery(query)) { + m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, query.Priority); + return; + } + } + + for (const DbQuery& query : queries) { + InternalExecuteQuery(query); + } +} + +void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride) +{ + AssertOnWorkQueue(); + + if (IsPaused() && GetPauseCalled()) { + DecreasePendingQueries(1); + return; + } + + if (!GetConnected()) { + DecreasePendingQueries(1); + return; + } + + if (query.Type == DbQueryNewTransaction) { + DecreasePendingQueries(1); + InternalNewTransaction(); + return; + } + + /* check whether we're allowed to execute the query first */ + if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) { + DecreasePendingQueries(1); + return; + } + + if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) { + DecreasePendingQueries(1); + return; + } + + /* check if there are missing object/insert ids and re-enqueue the query */ + if (!CanExecuteQuery(query)) { + m_QueryQueue.Enqueue([this, query, typeOverride]() { InternalExecuteQuery(query, typeOverride); }, query.Priority); + return; + } + + std::ostringstream qbuf, where; + int type; + + if (query.WhereCriteria) { + where << " WHERE "; + + ObjectLock olock(query.WhereCriteria); + Value value; + bool first = true; + + for (const Dictionary::Pair& kv : query.WhereCriteria) { + if (!FieldToEscapedString(kv.first, kv.second, &value)) { + m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority); + return; + } + + if (!first) + where << " AND "; + + where << kv.first << " = " << value; + + if (first) + first = false; + } + } + + type = (typeOverride != -1) ? typeOverride : query.Type; + + bool upsert = false; + + if ((type & DbQueryInsert) && (type & DbQueryUpdate)) { + bool hasid = false; + + if (query.Object) { + if (query.ConfigUpdate) + hasid = GetConfigUpdate(query.Object); + else if (query.StatusUpdate) + hasid = GetStatusUpdate(query.Object); + } + + if (!hasid) + upsert = true; + + type = DbQueryUpdate; + } + + if ((type & DbQueryInsert) && (type & DbQueryDelete)) { + std::ostringstream qdel; + qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str(); + IncreasePendingQueries(1); + Query(qdel.str()); + + type = DbQueryInsert; + } + + switch (type) { + case DbQueryInsert: + qbuf << "INSERT INTO " << GetTablePrefix() << query.Table; + break; + case DbQueryUpdate: + qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET"; + break; + case DbQueryDelete: + qbuf << "DELETE FROM " << GetTablePrefix() << query.Table; + break; + default: + VERIFY(!"Invalid query type."); + } + + if (type == DbQueryInsert || type == DbQueryUpdate) { + std::ostringstream colbuf, valbuf; + + if (type == DbQueryUpdate && query.Fields->GetLength() == 0) + return; + + ObjectLock olock(query.Fields); + + Value value; + bool first = true; + for (const Dictionary::Pair& kv : query.Fields) { + if (!FieldToEscapedString(kv.first, kv.second, &value)) { + m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority); + return; + } + + if (type == DbQueryInsert) { + if (!first) { + colbuf << ", "; + valbuf << ", "; + } + + colbuf << kv.first; + valbuf << value; + } else { + if (!first) + qbuf << ", "; + + qbuf << " " << kv.first << " = " << value; + } + + if (first) + first = false; + } + + if (type == DbQueryInsert) + qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")"; + } + + if (type != DbQueryInsert) + qbuf << where.str(); + + Query(qbuf.str()); + + if (upsert && GetAffectedRows() == 0) { + IncreasePendingQueries(1); + InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert); + + return; + } + + if (type == DbQueryInsert && query.Object) { + if (query.ConfigUpdate) { + String idField = query.IdColumn; + + if (idField.IsEmpty()) + idField = query.Table.SubStr(0, query.Table.GetLength() - 1) + "_id"; + + SetInsertID(query.Object, GetSequenceValue(GetTablePrefix() + query.Table, idField)); + + SetConfigUpdate(query.Object, true); + } else if (query.StatusUpdate) + SetStatusUpdate(query.Object, true); + } + + if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID) { + DbReference seqval = GetSequenceValue(GetTablePrefix() + query.Table, "notification_id"); + query.NotificationInsertID->SetValue(static_cast<long>(seqval)); + } +} + +void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) +{ + if (IsPaused()) + return; + + IncreasePendingQueries(1); + m_QueryQueue.Enqueue([this, table, time_column, max_age]() { InternalCleanUpExecuteQuery(table, time_column, max_age); }, PriorityLow, true); +} + +void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) { + DecreasePendingQueries(1); + return; + } + + Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + + Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column + + " < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ") AT TIME ZONE 'UTC'"); +} + +void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type) +{ + String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s"; + IncreasePendingQueries(1); + IdoPgsqlResult result = Query(query); + + Dictionary::Ptr row; + + int index = 0; + while ((row = FetchRow(result, index))) { + index++; + DbReference dbref(row->Get("object_id")); + SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id"))); + SetConfigHash(type, dbref, row->Get("config_hash")); + } +} + +int IdoPgsqlConnection::GetPendingQueryCount() const +{ + return m_QueryQueue.GetLength(); +} |