summaryrefslogtreecommitdiffstats
path: root/lib/db_ido_mysql/idomysqlconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/db_ido_mysql/idomysqlconnection.cpp')
-rw-r--r--lib/db_ido_mysql/idomysqlconnection.cpp1269
1 files changed, 1269 insertions, 0 deletions
diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp
new file mode 100644
index 0000000..aacb7d7
--- /dev/null
+++ b/lib/db_ido_mysql/idomysqlconnection.cpp
@@ -0,0 +1,1269 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "db_ido_mysql/idomysqlconnection.hpp"
+#include "db_ido_mysql/idomysqlconnection-ti.cpp"
+#include "db_ido/dbtype.hpp"
+#include "db_ido/dbvalue.hpp"
+#include "base/logger.hpp"
+#include "base/objectlock.hpp"
+#include "base/convert.hpp"
+#include "base/utility.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/application.hpp"
+#include "base/configtype.hpp"
+#include "base/exception.hpp"
+#include "base/statsfunction.hpp"
+#include "base/defer.hpp"
+#include <utility>
+
+using namespace icinga;
+
+REGISTER_TYPE(IdoMysqlConnection);
+REGISTER_STATSFUNCTION(IdoMysqlConnection, &IdoMysqlConnection::StatsFunc);
+
+const char * IdoMysqlConnection::GetLatestSchemaVersion() const noexcept
+{
+ return "1.15.1";
+}
+
+const char * IdoMysqlConnection::GetCompatSchemaVersion() const noexcept
+{
+ return "1.14.3";
+}
+
+void IdoMysqlConnection::OnConfigLoaded()
+{
+ ObjectImpl<IdoMysqlConnection>::OnConfigLoaded();
+
+ m_QueryQueue.SetName("IdoMysqlConnection, " + GetName());
+
+ Library shimLibrary{"mysql_shim"};
+
+ auto create_mysql_shim = shimLibrary.GetSymbolAddress<create_mysql_shim_ptr>("create_mysql_shim");
+
+ m_Mysql.reset(create_mysql_shim());
+
+ std::swap(m_Library, shimLibrary);
+}
+
+void IdoMysqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
+{
+ DictionaryData nodes;
+
+ for (const IdoMysqlConnection::Ptr& idomysqlconnection : ConfigType::GetObjectsByType<IdoMysqlConnection>()) {
+ size_t queryQueueItems = idomysqlconnection->m_QueryQueue.GetLength();
+ double queryQueueItemRate = idomysqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
+
+ nodes.emplace_back(idomysqlconnection->GetName(), new Dictionary({
+ { "version", idomysqlconnection->GetSchemaVersion() },
+ { "instance_name", idomysqlconnection->GetInstanceName() },
+ { "connected", idomysqlconnection->GetConnected() },
+ { "query_queue_items", queryQueueItems },
+ { "query_queue_item_rate", queryQueueItemRate }
+ }));
+
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_rate", idomysqlconnection->GetQueryCount(60) / 60.0));
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_1min", idomysqlconnection->GetQueryCount(60)));
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_5mins", idomysqlconnection->GetQueryCount(5 * 60)));
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_15mins", idomysqlconnection->GetQueryCount(15 * 60)));
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_items", queryQueueItems));
+ perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
+ }
+
+ status->Set("idomysqlconnection", new Dictionary(std::move(nodes)));
+}
+
+void IdoMysqlConnection::Resume()
+{
+ Log(LogInformation, "IdoMysqlConnection")
+ << "'" << GetName() << "' resumed.";
+
+ SetConnected(false);
+
+ m_QueryQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
+
+ /* Immediately try to connect on Resume() without timer. */
+ m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
+
+ m_TxTimer = Timer::Create();
+ m_TxTimer->SetInterval(1);
+ m_TxTimer->OnTimerExpired.connect([this](const Timer * const&) { NewTransaction(); });
+ m_TxTimer->Start();
+
+ m_ReconnectTimer = Timer::Create();
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&){ ReconnectTimerHandler(); });
+ m_ReconnectTimer->Start();
+
+ /* Start with queries after connect. */
+ DbConnection::Resume();
+
+ ASSERT(m_Mysql->thread_safe());
+}
+
+void IdoMysqlConnection::Pause()
+{
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Attempting to pause '" << GetName() << "'.";
+
+ DbConnection::Pause();
+
+ m_ReconnectTimer->Stop(true);
+ m_TxTimer->Stop(true);
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Rescheduling disconnect task.";
+#endif /* I2_DEBUG */
+
+ Log(LogInformation, "IdoMysqlConnection")
+ << "'" << GetName() << "' paused.";
+
+}
+
+void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
+{
+ Log(LogCritical, "IdoMysqlConnection", "Exception during database operation: Verify that your database is operational!");
+
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Exception during database operation: " << DiagnosticInformation(std::move(exp));
+
+ if (GetConnected()) {
+ m_Mysql->close(&m_Connection);
+
+ SetConnected(false);
+ }
+}
+
+void IdoMysqlConnection::AssertOnWorkQueue()
+{
+ ASSERT(m_QueryQueue.IsWorkerThread());
+}
+
+void IdoMysqlConnection::Disconnect()
+{
+ AssertOnWorkQueue();
+
+ if (!GetConnected())
+ return;
+
+ Query("COMMIT");
+ m_Mysql->close(&m_Connection);
+
+ SetConnected(false);
+
+ Log(LogInformation, "IdoMysqlConnection")
+ << "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
+}
+
+void IdoMysqlConnection::NewTransaction()
+{
+ if (IsPaused() && GetPauseCalled())
+ return;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling new transaction and finishing async queries.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this]() { InternalNewTransaction(); }, PriorityHigh);
+}
+
+void IdoMysqlConnection::InternalNewTransaction()
+{
+ AssertOnWorkQueue();
+
+ if (!GetConnected())
+ return;
+
+ IncreasePendingQueries(2);
+
+ AsyncQuery("COMMIT");
+ AsyncQuery("BEGIN");
+
+ FinishAsyncQueries();
+}
+
+void IdoMysqlConnection::ReconnectTimerHandler()
+{
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling reconnect task.";
+#endif /* I2_DEBUG */
+
+ /* Only allow Reconnect events with high priority. */
+ m_QueryQueue.Enqueue([this]() { Reconnect(); }, PriorityImmediate);
+}
+
+void IdoMysqlConnection::Reconnect()
+{
+ AssertOnWorkQueue();
+
+ if (!IsActive())
+ return;
+
+ CONTEXT("Reconnecting to MySQL IDO database '" << GetName() << "'");
+
+ double startTime = Utility::GetTime();
+
+ SetShouldConnect(true);
+
+ bool reconnect = false;
+
+ /* Ensure to close old connections first. */
+ if (GetConnected()) {
+ /* Check if we're really still connected */
+ if (m_Mysql->ping(&m_Connection) == 0)
+ return;
+
+ m_Mysql->close(&m_Connection);
+ SetConnected(false);
+ reconnect = true;
+ }
+
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Reconnect: Clearing ID cache.";
+
+ ClearIDCache();
+
+ String ihost, isocket_path, iuser, ipasswd, idb;
+ String isslKey, isslCert, isslCa, isslCaPath, isslCipher;
+ const char *host, *socket_path, *user , *passwd, *db;
+ const char *sslKey, *sslCert, *sslCa, *sslCaPath, *sslCipher;
+ bool enableSsl;
+ long port;
+
+ ihost = GetHost();
+ isocket_path = GetSocketPath();
+ iuser = GetUser();
+ ipasswd = GetPassword();
+ idb = GetDatabase();
+
+ enableSsl = GetEnableSsl();
+ isslKey = GetSslKey();
+ isslCert = GetSslCert();
+ isslCa = GetSslCa();
+ isslCaPath = GetSslCapath();
+ isslCipher = GetSslCipher();
+
+ host = (!ihost.IsEmpty()) ? ihost.CStr() : nullptr;
+ port = GetPort();
+ socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : nullptr;
+ user = (!iuser.IsEmpty()) ? iuser.CStr() : nullptr;
+ passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : nullptr;
+ db = (!idb.IsEmpty()) ? idb.CStr() : nullptr;
+
+ sslKey = (!isslKey.IsEmpty()) ? isslKey.CStr() : nullptr;
+ sslCert = (!isslCert.IsEmpty()) ? isslCert.CStr() : nullptr;
+ sslCa = (!isslCa.IsEmpty()) ? isslCa.CStr() : nullptr;
+ sslCaPath = (!isslCaPath.IsEmpty()) ? isslCaPath.CStr() : nullptr;
+ sslCipher = (!isslCipher.IsEmpty()) ? isslCipher.CStr() : nullptr;
+
+ /* connection */
+ if (!m_Mysql->init(&m_Connection)) {
+ Log(LogCritical, "IdoMysqlConnection")
+ << "mysql_init() failed: out of memory";
+
+ BOOST_THROW_EXCEPTION(std::bad_alloc());
+ }
+
+ /* Read "latin1" (here, in the schema and in Icinga Web) as "bytes".
+ Icinga 2 and Icinga Web use byte-strings everywhere and every byte-string is a valid latin1 string.
+ This way the (actually mostly UTF-8) bytes are transferred end-to-end as-is. */
+ m_Mysql->options(&m_Connection, MYSQL_SET_CHARSET_NAME, "latin1");
+
+ if (enableSsl)
+ m_Mysql->ssl_set(&m_Connection, sslKey, sslCert, sslCa, sslCaPath, sslCipher);
+
+ if (!m_Mysql->real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
+ Log(LogCritical, "IdoMysqlConnection")
+ << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
+ << "' " << (enableSsl ? "(SSL enabled) " : "") << "failed: \"" << m_Mysql->error(&m_Connection) << "\"";
+
+ BOOST_THROW_EXCEPTION(std::runtime_error(m_Mysql->error(&m_Connection)));
+ }
+
+ Log(LogNotice, "IdoMysqlConnection")
+ << "Reconnect: '" << GetName() << "' is now connected to database '" << GetDatabase() << "'.";
+
+ SetConnected(true);
+
+ IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet");
+
+ Dictionary::Ptr row = FetchRow(result);
+
+ if (row)
+ m_MaxPacketSize = row->Get("max_allowed_packet");
+ else
+ m_MaxPacketSize = 64 * 1024;
+
+ DiscardRows(result);
+
+ String dbVersionName = "idoutils";
+ result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
+
+ row = FetchRow(result);
+
+ if (!row) {
+ m_Mysql->close(&m_Connection);
+ SetConnected(false);
+
+ Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
+
+ BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
+ }
+
+ DiscardRows(result);
+
+ String version = row->Get("version");
+
+ SetSchemaVersion(version);
+
+ if (Utility::CompareVersion(GetCompatSchemaVersion(), version) < 0) {
+ m_Mysql->close(&m_Connection);
+ SetConnected(false);
+
+ Log(LogCritical, "IdoMysqlConnection")
+ << "Schema version '" << version << "' does not match the required version '"
+ << GetCompatSchemaVersion() << "' (or newer)! Please check the upgrade documentation at "
+ << "https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/#upgrading-mysql-db";
+
+ BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
+ }
+
+ String instanceName = GetInstanceName();
+
+ result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
+ row = FetchRow(result);
+
+ if (!row) {
+ Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
+ m_InstanceID = GetLastInsertID();
+ } else {
+ m_InstanceID = DbReference(row->Get("instance_id"));
+ }
+
+ DiscardRows(result);
+
+ Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
+
+ /* we have an endpoint in a cluster setup, so decide if we can proceed here */
+ if (my_endpoint && GetHAMode() == HARunOnce) {
+ /* get the current endpoint writing to programstatus table */
+ result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
+ GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
+ row = FetchRow(result);
+ DiscardRows(result);
+
+ String endpoint_name;
+
+ if (row)
+ endpoint_name = row->Get("endpoint_name");
+ else
+ Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
+
+ /* if we did not write into the database earlier, another instance is active */
+ if (endpoint_name != my_endpoint->GetName()) {
+ double status_update_time;
+
+ if (row)
+ status_update_time = row->Get("status_update_time");
+ else
+ status_update_time = 0;
+
+ double now = Utility::GetTime();
+
+ double status_update_age = now - status_update_time;
+ double failoverTimeout = GetFailoverTimeout();
+
+ if (status_update_age < failoverTimeout) {
+ Log(LogInformation, "IdoMysqlConnection")
+ << "Last update by endpoint '" << endpoint_name << "' was "
+ << status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
+
+ m_Mysql->close(&m_Connection);
+ SetConnected(false);
+ SetShouldConnect(false);
+
+ return;
+ }
+
+ /* activate the IDO only, if we're authoritative in this zone */
+ if (IsPaused()) {
+ Log(LogNotice, "IdoMysqlConnection")
+ << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
+
+ m_Mysql->close(&m_Connection);
+ SetConnected(false);
+
+ return;
+ }
+
+ SetLastFailover(now);
+
+ Log(LogInformation, "IdoMysqlConnection")
+ << "Last update by endpoint '" << endpoint_name << "' was "
+ << status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
+ }
+
+ Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection in HA zone.");
+ }
+
+ Log(LogInformation, "IdoMysqlConnection")
+ << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
+
+ /* set session time zone to utc */
+ Query("SET SESSION TIME_ZONE='+00:00'");
+
+ Query("SET SESSION SQL_MODE='NO_AUTO_VALUE_ON_ZERO'");
+
+ Query("BEGIN");
+
+ /* update programstatus table */
+ UpdateProgramStatus();
+
+ /* record connection */
+ Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
+ "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
+ + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetAppVersion())
+ + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
+
+ /* clear config tables for the initial config dump */
+ PrepareDatabase();
+
+ std::ostringstream q1buf;
+ q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
+ result = Query(q1buf.str());
+
+ std::vector<DbObject::Ptr> activeDbObjs;
+
+ while ((row = FetchRow(result))) {
+ DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
+
+ if (!dbtype)
+ continue;
+
+ DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
+ SetObjectID(dbobj, DbReference(row->Get("object_id")));
+ bool active = row->Get("is_active");
+ SetObjectActive(dbobj, active);
+
+ if (active)
+ activeDbObjs.emplace_back(std::move(dbobj));
+ }
+
+ SetIDCacheValid(true);
+
+ EnableActiveChangedHandler();
+
+ for (const DbObject::Ptr& dbobj : activeDbObjs) {
+ if (dbobj->GetObject())
+ continue;
+
+ Log(LogNotice, "IdoMysqlConnection")
+ << "Deactivate deleted object name1: '" << dbobj->GetName1()
+ << "' name2: '" << dbobj->GetName2() + "'.";
+ DeactivateObject(dbobj);
+ }
+
+ UpdateAllObjects();
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling session table clear and finish connect task.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this]() { ClearTablesBySession(); }, PriorityNormal);
+
+ m_QueryQueue.Enqueue([this, startTime]() { FinishConnect(startTime); }, PriorityNormal);
+}
+
+void IdoMysqlConnection::FinishConnect(double startTime)
+{
+ AssertOnWorkQueue();
+
+ if (!GetConnected() || IsPaused())
+ return;
+
+ FinishAsyncQueries();
+
+ Log(LogInformation, "IdoMysqlConnection")
+ << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
+ << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
+
+ Query("COMMIT");
+ Query("BEGIN");
+}
+
+void IdoMysqlConnection::ClearTablesBySession()
+{
+ /* delete all comments and downtimes without current session token */
+ ClearTableBySession("comments");
+ ClearTableBySession("scheduleddowntime");
+}
+
+void IdoMysqlConnection::ClearTableBySession(const String& table)
+{
+ Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
+ Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
+ Convert::ToString(GetSessionToken()));
+}
+
+void IdoMysqlConnection::AsyncQuery(const String& query, const std::function<void (const IdoMysqlResult&)>& callback)
+{
+ AssertOnWorkQueue();
+
+ IdoAsyncQuery aq;
+ aq.Query = query;
+ /* XXX: Important: The callback must not immediately execute a query, but enqueue it!
+ * See https://github.com/Icinga/icinga2/issues/4603 for details.
+ */
+ aq.Callback = callback;
+ m_AsyncQueries.emplace_back(std::move(aq));
+}
+
+void IdoMysqlConnection::FinishAsyncQueries()
+{
+ std::vector<IdoAsyncQuery> queries;
+ m_AsyncQueries.swap(queries);
+
+ std::vector<IdoAsyncQuery>::size_type offset = 0;
+
+ // This will be executed if there is a problem with executing the queries,
+ // at which point this function throws an exception and the queries should
+ // not be listed as still pending in the queue.
+ Defer decreaseQueries ([this, &offset, &queries]() {
+ auto lostQueries = queries.size() - offset;
+
+ if (lostQueries > 0) {
+ DecreasePendingQueries(lostQueries);
+ }
+ });
+
+ while (offset < queries.size()) {
+ std::ostringstream querybuf;
+
+ std::vector<IdoAsyncQuery>::size_type count = 0;
+ size_t num_bytes = 0;
+
+ Defer decreaseQueries ([this, &offset, &count]() {
+ offset += count;
+ DecreasePendingQueries(count);
+ m_UncommittedAsyncQueries += count;
+ });
+
+ for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
+ const IdoAsyncQuery& aq = queries[i];
+
+ size_t size_query = aq.Query.GetLength() + 1;
+
+ if (count > 0) {
+ if (num_bytes + size_query > m_MaxPacketSize - 512)
+ break;
+
+ querybuf << ";";
+ }
+
+ IncreaseQueryCount();
+ count++;
+
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Query: " << aq.Query;
+
+ querybuf << aq.Query;
+ num_bytes += size_query;
+ }
+
+ String query = querybuf.str();
+
+ if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
+ std::ostringstream msgbuf;
+ String message = m_Mysql->error(&m_Connection);
+ msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+ Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+ BOOST_THROW_EXCEPTION(
+ database_error()
+ << errinfo_message(m_Mysql->error(&m_Connection))
+ << errinfo_database_query(query)
+ );
+ }
+
+ for (std::vector<IdoAsyncQuery>::size_type i = offset; i < offset + count; i++) {
+ const IdoAsyncQuery& aq = queries[i];
+
+ MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
+
+ m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
+
+ IdoMysqlResult iresult;
+
+ if (!result) {
+ if (m_Mysql->field_count(&m_Connection) > 0) {
+ std::ostringstream msgbuf;
+ String message = m_Mysql->error(&m_Connection);
+ msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\"";
+ Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+ BOOST_THROW_EXCEPTION(
+ database_error()
+ << errinfo_message(m_Mysql->error(&m_Connection))
+ << errinfo_database_query(query)
+ );
+ }
+ } else
+ iresult = IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
+
+ if (aq.Callback)
+ aq.Callback(iresult);
+
+ if (m_Mysql->next_result(&m_Connection) > 0) {
+ std::ostringstream msgbuf;
+ String message = m_Mysql->error(&m_Connection);
+ msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+ Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+ BOOST_THROW_EXCEPTION(
+ database_error()
+ << errinfo_message(m_Mysql->error(&m_Connection))
+ << errinfo_database_query(query)
+ );
+ }
+ }
+ }
+
+ if (m_UncommittedAsyncQueries > 25000) {
+ m_UncommittedAsyncQueries = 0;
+
+ Query("COMMIT");
+ Query("BEGIN");
+ }
+}
+
+IdoMysqlResult IdoMysqlConnection::Query(const String& query)
+{
+ AssertOnWorkQueue();
+
+ IncreasePendingQueries(1);
+ Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
+
+ /* finish all async queries to maintain the right order for queries */
+ FinishAsyncQueries();
+
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Query: " << query;
+
+ IncreaseQueryCount();
+
+ if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
+ std::ostringstream msgbuf;
+ String message = m_Mysql->error(&m_Connection);
+ msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+ Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+ BOOST_THROW_EXCEPTION(
+ database_error()
+ << errinfo_message(m_Mysql->error(&m_Connection))
+ << errinfo_database_query(query)
+ );
+ }
+
+ MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
+
+ m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
+
+ if (!result) {
+ if (m_Mysql->field_count(&m_Connection) > 0) {
+ std::ostringstream msgbuf;
+ String message = m_Mysql->error(&m_Connection);
+ msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+ Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+ BOOST_THROW_EXCEPTION(
+ database_error()
+ << errinfo_message(m_Mysql->error(&m_Connection))
+ << errinfo_database_query(query)
+ );
+ }
+
+ return IdoMysqlResult();
+ }
+
+ return IdoMysqlResult(result, [this](MYSQL_RES* result) { m_Mysql->free_result(result); });
+}
+
+DbReference IdoMysqlConnection::GetLastInsertID()
+{
+ AssertOnWorkQueue();
+
+ return {static_cast<long>(m_Mysql->insert_id(&m_Connection))};
+}
+
+int IdoMysqlConnection::GetAffectedRows()
+{
+ AssertOnWorkQueue();
+
+ return m_AffectedRows;
+}
+
+String IdoMysqlConnection::Escape(const String& s)
+{
+ AssertOnWorkQueue();
+
+ String utf8s = Utility::ValidateUTF8(s);
+
+ size_t length = utf8s.GetLength();
+ auto *to = new char[utf8s.GetLength() * 2 + 1];
+
+ m_Mysql->real_escape_string(&m_Connection, to, utf8s.CStr(), length);
+
+ String result = String(to);
+
+ delete [] to;
+
+ return result;
+}
+
+Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
+{
+ AssertOnWorkQueue();
+
+ MYSQL_ROW row;
+ MYSQL_FIELD *field;
+ unsigned long *lengths, i;
+
+ row = m_Mysql->fetch_row(result.get());
+
+ if (!row)
+ return nullptr;
+
+ lengths = m_Mysql->fetch_lengths(result.get());
+
+ if (!lengths)
+ return nullptr;
+
+ Dictionary::Ptr dict = new Dictionary();
+
+ m_Mysql->field_seek(result.get(), 0);
+ for (field = m_Mysql->fetch_field(result.get()), i = 0; field; field = m_Mysql->fetch_field(result.get()), i++)
+ dict->Set(field->name, String(row[i], row[i] + lengths[i]));
+
+ return dict;
+}
+
+void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
+{
+ Dictionary::Ptr row;
+
+ while ((row = FetchRow(result)))
+ ; /* empty loop body */
+}
+
+void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
+{
+ if (IsPaused())
+ return;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, dbobj]() { InternalActivateObject(dbobj); }, PriorityNormal);
+}
+
+void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused())
+ return;
+
+ if (!GetConnected())
+ return;
+
+ DbReference dbref = GetObjectID(dbobj);
+ std::ostringstream qbuf;
+
+ if (!dbref.IsValid()) {
+ if (!dbobj->GetName2().IsEmpty()) {
+ qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
+ << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
+ << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
+ } else {
+ qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
+ << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
+ << "'" << Escape(dbobj->GetName1()) << "', 1)";
+ }
+
+ Query(qbuf.str());
+ SetObjectID(dbobj, GetLastInsertID());
+ } else {
+ qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
+ IncreasePendingQueries(1);
+ AsyncQuery(qbuf.str());
+ }
+}
+
+void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
+{
+ if (IsPaused())
+ return;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, dbobj]() { InternalDeactivateObject(dbobj); }, PriorityNormal);
+}
+
+void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused())
+ return;
+
+ if (!GetConnected())
+ return;
+
+ DbReference dbref = GetObjectID(dbobj);
+
+ if (!dbref.IsValid())
+ return;
+
+ std::ostringstream qbuf;
+ qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
+ IncreasePendingQueries(1);
+ AsyncQuery(qbuf.str());
+
+ /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
+ * because the object is still in the database. */
+
+ SetObjectActive(dbobj, false);
+}
+
+bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
+{
+ if (key == "instance_id") {
+ *result = static_cast<long>(m_InstanceID);
+ return true;
+ } else if (key == "session_token") {
+ *result = GetSessionToken();
+ return true;
+ }
+
+ Value rawvalue = DbValue::ExtractValue(value);
+
+ if (rawvalue.GetType() == ValueEmpty) {
+ *result = "NULL";
+ } else if (rawvalue.IsObjectType<ConfigObject>()) {
+ DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
+
+ if (!dbobjcol) {
+ *result = 0;
+ return true;
+ }
+
+ if (!IsIDCacheValid())
+ return false;
+
+ DbReference dbrefcol;
+
+ if (DbValue::IsObjectInsertID(value)) {
+ dbrefcol = GetInsertID(dbobjcol);
+
+ if (!dbrefcol.IsValid())
+ return false;
+ } else {
+ dbrefcol = GetObjectID(dbobjcol);
+
+ if (!dbrefcol.IsValid()) {
+ InternalActivateObject(dbobjcol);
+
+ dbrefcol = GetObjectID(dbobjcol);
+
+ if (!dbrefcol.IsValid())
+ return false;
+ }
+ }
+
+ *result = static_cast<long>(dbrefcol);
+ } else if (DbValue::IsTimestamp(value)) {
+ long ts = rawvalue;
+ std::ostringstream msgbuf;
+ msgbuf << "FROM_UNIXTIME(" << ts << ")";
+ *result = Value(msgbuf.str());
+ } else if (DbValue::IsObjectInsertID(value)) {
+ auto id = static_cast<long>(rawvalue);
+
+ if (id <= 0)
+ return false;
+
+ *result = id;
+ return true;
+ } else {
+ Value fvalue;
+
+ if (rawvalue.IsBoolean())
+ fvalue = Convert::ToLong(rawvalue);
+ else
+ fvalue = rawvalue;
+
+ *result = "'" + Escape(fvalue) + "'";
+ }
+
+ return true;
+}
+
+void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
+{
+ if (IsPaused() && GetPauseCalled())
+ return;
+
+ ASSERT(query.Category != DbCatInvalid);
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
+#endif /* I2_DEBUG */
+
+ IncreasePendingQueries(1);
+ m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority, true);
+}
+
+void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
+{
+ if (IsPaused())
+ return;
+
+ if (queries.empty())
+ return;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
+#endif /* I2_DEBUG */
+
+ IncreasePendingQueries(queries.size());
+ m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, queries[0].Priority, true);
+}
+
+bool IdoMysqlConnection::CanExecuteQuery(const DbQuery& query)
+{
+ if (query.Object && !IsIDCacheValid())
+ return false;
+
+ if (query.WhereCriteria) {
+ ObjectLock olock(query.WhereCriteria);
+ Value value;
+
+ for (const Dictionary::Pair& kv : query.WhereCriteria) {
+ if (!FieldToEscapedString(kv.first, kv.second, &value))
+ return false;
+ }
+ }
+
+ if (query.Fields) {
+ ObjectLock olock(query.Fields);
+
+ for (const Dictionary::Pair& kv : query.Fields) {
+ Value value;
+
+ if (!FieldToEscapedString(kv.first, kv.second, &value))
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused()) {
+ DecreasePendingQueries(queries.size());
+ return;
+ }
+
+ if (!GetConnected()) {
+ DecreasePendingQueries(queries.size());
+ return;
+ }
+
+
+ for (const DbQuery& query : queries) {
+ ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
+
+ if (!CanExecuteQuery(query)) {
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling multiple execute query task again: Cannot execute query now. Type '"
+ << query.Type << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, queries]() { InternalExecuteMultipleQueries(queries); }, query.Priority);
+ return;
+ }
+ }
+
+ for (const DbQuery& query : queries) {
+ InternalExecuteQuery(query);
+ }
+}
+
+void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused() && GetPauseCalled()) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ if (!GetConnected()) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ if (query.Type == DbQueryNewTransaction) {
+ DecreasePendingQueries(1);
+ InternalNewTransaction();
+ return;
+ }
+
+ /* check whether we're allowed to execute the query first */
+ if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ /* check if there are missing object/insert ids and re-enqueue the query */
+ if (!CanExecuteQuery(query)) {
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling execute query task again: Cannot execute query now. Type '"
+ << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, query, typeOverride]() { InternalExecuteQuery(query, typeOverride); }, query.Priority);
+ return;
+ }
+
+ std::ostringstream qbuf, where;
+ int type;
+
+ if (query.WhereCriteria) {
+ where << " WHERE ";
+
+ ObjectLock olock(query.WhereCriteria);
+ Value value;
+ bool first = true;
+
+ for (const Dictionary::Pair& kv : query.WhereCriteria) {
+ if (!FieldToEscapedString(kv.first, kv.second, &value)) {
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling execute query task again: Cannot execute query now. Type '"
+ << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
+ return;
+ }
+
+ if (!first)
+ where << " AND ";
+
+ where << kv.first << " = " << value;
+
+ if (first)
+ first = false;
+ }
+ }
+
+ type = (typeOverride != -1) ? typeOverride : query.Type;
+
+ bool upsert = false;
+
+ if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
+ bool hasid = false;
+
+ if (query.Object) {
+ if (query.ConfigUpdate)
+ hasid = GetConfigUpdate(query.Object);
+ else if (query.StatusUpdate)
+ hasid = GetStatusUpdate(query.Object);
+ }
+
+ if (!hasid)
+ upsert = true;
+
+ type = DbQueryUpdate;
+ }
+
+ if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
+ std::ostringstream qdel;
+ qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
+ IncreasePendingQueries(1);
+ AsyncQuery(qdel.str());
+
+ type = DbQueryInsert;
+ }
+
+ switch (type) {
+ case DbQueryInsert:
+ qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
+ break;
+ case DbQueryUpdate:
+ qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
+ break;
+ case DbQueryDelete:
+ qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
+ break;
+ default:
+ VERIFY(!"Invalid query type.");
+ }
+
+ if (type == DbQueryInsert || type == DbQueryUpdate) {
+ std::ostringstream colbuf, valbuf;
+
+ if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
+ return;
+
+ ObjectLock olock(query.Fields);
+
+ bool first = true;
+ for (const Dictionary::Pair& kv : query.Fields) {
+ Value value;
+
+ if (!FieldToEscapedString(kv.first, kv.second, &value)) {
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Scheduling execute query task again: Cannot extract required INSERT/UPDATE fields, key '"
+ << kv.first << "', val '" << kv.second << "', type " << typeOverride << ", table '" << query.Table << "'.";
+#endif /* I2_DEBUG */
+
+ m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, -1); }, query.Priority);
+ return;
+ }
+
+ if (type == DbQueryInsert) {
+ if (!first) {
+ colbuf << ", ";
+ valbuf << ", ";
+ }
+
+ colbuf << kv.first;
+ valbuf << value;
+ } else {
+ if (!first)
+ qbuf << ", ";
+
+ qbuf << " " << kv.first << " = " << value;
+ }
+
+ if (first)
+ first = false;
+ }
+
+ if (type == DbQueryInsert)
+ qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
+ }
+
+ if (type != DbQueryInsert)
+ qbuf << where.str();
+
+ AsyncQuery(qbuf.str(), [this, query, type, upsert](const IdoMysqlResult&) { FinishExecuteQuery(query, type, upsert); });
+}
+
+void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert)
+{
+ if (upsert && GetAffectedRows() == 0) {
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
+#endif /* I2_DEBUG */
+
+ IncreasePendingQueries(1);
+ m_QueryQueue.Enqueue([this, query]() { InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert); }, query.Priority);
+
+ return;
+ }
+
+ if (type == DbQueryInsert && query.Object) {
+ if (query.ConfigUpdate) {
+ SetInsertID(query.Object, GetLastInsertID());
+ SetConfigUpdate(query.Object, true);
+ } else if (query.StatusUpdate)
+ SetStatusUpdate(query.Object, true);
+ }
+
+ if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID)
+ query.NotificationInsertID->SetValue(static_cast<long>(GetLastInsertID()));
+}
+
+void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
+{
+ if (IsPaused())
+ return;
+
+#ifdef I2_DEBUG /* I2_DEBUG */
+ Log(LogDebug, "IdoMysqlConnection")
+ << "Rescheduling cleanup query for table '" << table << "' and column '"
+ << time_column << "'. max_age is set to '" << max_age << "'.";
+#endif /* I2_DEBUG */
+
+ IncreasePendingQueries(1);
+ m_QueryQueue.Enqueue([this, table, time_column, max_age]() { InternalCleanUpExecuteQuery(table, time_column, max_age); }, PriorityLow, true);
+}
+
+void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
+{
+ AssertOnWorkQueue();
+
+ if (IsPaused()) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ if (!GetConnected()) {
+ DecreasePendingQueries(1);
+ return;
+ }
+
+ AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
+ Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
+ " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
+}
+
+void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
+{
+ String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
+ IdoMysqlResult result = Query(query);
+
+ Dictionary::Ptr row;
+
+ while ((row = FetchRow(result))) {
+ DbReference dbref(row->Get("object_id"));
+ SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
+ SetConfigHash(type, dbref, row->Get("config_hash"));
+ }
+}
+
+int IdoMysqlConnection::GetPendingQueryCount() const
+{
+ return m_QueryQueue.GetLength();
+}