summaryrefslogtreecommitdiffstats
path: root/ml/ml.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ml/ml.cc')
-rw-r--r--ml/ml.cc128
1 files changed, 120 insertions, 8 deletions
diff --git a/ml/ml.cc b/ml/ml.cc
index 75f2e727..0bba2060 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -9,6 +9,8 @@
#include "ad_charts.h"
#include "database/sqlite/sqlite3.h"
+#define ML_METADATA_VERSION 2
+
#define WORKER_TRAIN_QUEUE_POP 0
#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
#define WORKER_TRAIN_QUERY 2
@@ -436,6 +438,10 @@ const char *db_models_delete =
"DELETE FROM models "
"WHERE dim_id = @dim_id AND before < @before;";
+const char *db_models_prune =
+ "DELETE FROM models "
+ "WHERE after < @after LIMIT @n;";
+
static int
ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
{
@@ -563,6 +569,58 @@ bind_fail:
return rc;
}
+static int
+ml_prune_old_models(size_t num_models_to_prune)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_prune, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to prune models, rc = %d", rc);
+ return rc;
+ }
+ }
+
+ int after = (int) (now_realtime_sec() - Cfg.delete_models_older_than);
+
+ rc = sqlite3_bind_int(res, ++param, after);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, num_models_to_prune);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE)) {
+ error_report("Failed to prune old models, rc = %d", rc);
+ return rc;
+ }
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to reset statement when pruning old models, rc = %d", rc);
+ return rc;
+ }
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to prune old models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to prune old models, rc = %d", rc);
+ return rc;
+}
+
int ml_dimension_load_models(RRDDIM *rd) {
ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
if (!dim)
@@ -1026,6 +1084,21 @@ ml_host_detect_once(ml_host_t *host)
host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;
+
+ STRING *key = rs->parts.type;
+ auto &um = host->type_anomaly_rate;
+ auto it = um.find(key);
+ if (it == um.end()) {
+ um[key] = ml_type_anomaly_rate_t {
+ .rd = NULL,
+ .normal_dimensions = 0,
+ .anomalous_dimensions = 0
+ };
+ it = um.find(key);
+ }
+
+ it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
+ it->second.normal_dimensions += chart_mls.num_normal_dimensions;
}
rrdset_foreach_done(rsp);
@@ -1039,6 +1112,15 @@ ml_host_detect_once(ml_host_t *host)
netdata_mutex_unlock(&host->mutex);
} else {
host->host_anomaly_rate = 0.0;
+
+ auto &um = host->type_anomaly_rate;
+ for (auto &entry: um) {
+ entry.second = ml_type_anomaly_rate_t {
+ .rd = NULL,
+ .normal_dimensions = 0,
+ .anomalous_dimensions = 0
+ };
+ }
}
worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
@@ -1072,7 +1154,7 @@ ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimensio
acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id));
if (acq_rs) {
RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
- if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) {
+ if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_OBSOLETE)) {
acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
if (acq_rd) {
RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
@@ -1217,6 +1299,7 @@ void ml_host_new(RRDHOST *rh)
host->rh = rh;
host->mls = ml_machine_learning_stats_t();
host->host_anomaly_rate = 0.0;
+ host->anomaly_rate_rs = NULL;
static std::atomic<size_t> times_called(0);
host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
@@ -1497,9 +1580,12 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
}
static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
- int rc = db_execute(db, "BEGIN TRANSACTION;");
int op_no = 1;
+ // begin transaction
+ int rc = db_execute(db, "BEGIN TRANSACTION;");
+
+ // add/delete models
if (!rc) {
op_no++;
@@ -1512,12 +1598,22 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
}
}
+ // prune old models
+ if (!rc) {
+ if ((training_thread->num_db_transactions % 64) == 0) {
+ rc = ml_prune_old_models(training_thread->num_models_to_prune);
+ if (!rc)
+ training_thread->num_models_to_prune = 0;
+ }
+ }
+
+ // commit transaction
if (!rc) {
op_no++;
rc = db_execute(db, "COMMIT TRANSACTION;");
}
- // try to rollback transaction if we got any failures
+ // rollback transaction on failure
if (rc) {
netdata_log_error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
op_no++;
@@ -1526,6 +1622,13 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
netdata_log_error("ML transaction rollback failed with rc=%d", rc);
}
+ if (!rc) {
+ training_thread->num_db_transactions++;
+ training_thread->num_models_to_prune += training_thread->pending_model_info.size();
+ }
+
+ vacuum_database(db, "ML", 0, 0);
+
training_thread->pending_model_info.clear();
}
@@ -1676,15 +1779,24 @@ void ml_init()
db = NULL;
}
+ // create table
if (db) {
- char *err = NULL;
- int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
- if (rc != SQLITE_OK) {
- error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
+ int target_version = perform_ml_database_migration(db, ML_METADATA_VERSION);
+ if (configure_sqlite_database(db, target_version)) {
+ error_report("Failed to setup ML database");
sqlite3_close(db);
- sqlite3_free(err);
db = NULL;
}
+ else {
+ char *err = NULL;
+ int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
+ sqlite3_close(db);
+ sqlite3_free(err);
+ db = NULL;
+ }
+ }
}
}