diff options
Diffstat (limited to 'ml/ml.cc')
-rw-r--r-- | ml/ml.cc | 128 |
1 files changed, 120 insertions, 8 deletions
@@ -9,6 +9,8 @@ #include "ad_charts.h" #include "database/sqlite/sqlite3.h" +#define ML_METADATA_VERSION 2 + #define WORKER_TRAIN_QUEUE_POP 0 #define WORKER_TRAIN_ACQUIRE_DIMENSION 1 #define WORKER_TRAIN_QUERY 2 @@ -436,6 +438,10 @@ const char *db_models_delete = "DELETE FROM models " "WHERE dim_id = @dim_id AND before < @before;"; +const char *db_models_prune = + "DELETE FROM models " + "WHERE after < @after LIMIT @n;"; + static int ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km) { @@ -563,6 +569,58 @@ bind_fail: return rc; } +static int +ml_prune_old_models(size_t num_models_to_prune) +{ + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_prune, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to prune models, rc = %d", rc); + return rc; + } + } + + int after = (int) (now_realtime_sec() - Cfg.delete_models_older_than); + + rc = sqlite3_bind_int(res, ++param, after); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, num_models_to_prune); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) { + error_report("Failed to prune old models, rc = %d", rc); + return rc; + } + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to reset statement when pruning old models, rc = %d", rc); + return rc; + } + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to prune old models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to prune old models, rc = %d", rc); + return rc; +} + int ml_dimension_load_models(RRDDIM *rd) { ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension; if (!dim) @@ -1026,6 +1084,21 @@ ml_host_detect_once(ml_host_t *host) host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions; host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions; + + STRING *key = rs->parts.type; + auto &um = host->type_anomaly_rate; + auto it = um.find(key); + if (it == um.end()) { + um[key] = ml_type_anomaly_rate_t { + .rd = NULL, + .normal_dimensions = 0, + .anomalous_dimensions = 0 + }; + it = um.find(key); + } + + it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions; + it->second.normal_dimensions += chart_mls.num_normal_dimensions; } rrdset_foreach_done(rsp); @@ -1039,6 +1112,15 @@ ml_host_detect_once(ml_host_t *host) netdata_mutex_unlock(&host->mutex); } else { host->host_anomaly_rate = 0.0; + + auto &um = host->type_anomaly_rate; + for (auto &entry: um) { + entry.second = ml_type_anomaly_rate_t { + .rd = NULL, + .normal_dimensions = 0, + .anomalous_dimensions = 0 + }; + } } worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); @@ -1072,7 +1154,7 @@ ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimensio acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id)); if (acq_rs) { RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs); - if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) { + if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_OBSOLETE)) { acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); if (acq_rd) { RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); @@ -1217,6 +1299,7 @@ void ml_host_new(RRDHOST *rh) host->rh = rh; host->mls = ml_machine_learning_stats_t(); host->host_anomaly_rate = 0.0; + host->anomaly_rate_rs = NULL; static std::atomic<size_t> times_called(0); host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; @@ -1497,9 +1580,12 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool } static void ml_flush_pending_models(ml_training_thread_t *training_thread) { - int rc = db_execute(db, "BEGIN TRANSACTION;"); int op_no = 1; + // begin transaction + int rc = db_execute(db, "BEGIN TRANSACTION;"); + + // add/delete models if (!rc) { op_no++; @@ -1512,12 +1598,22 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) { } } + // prune old models + if (!rc) { + if ((training_thread->num_db_transactions % 64) == 0) { + rc = ml_prune_old_models(training_thread->num_models_to_prune); + if (!rc) + training_thread->num_models_to_prune = 0; + } + } + + // commit transaction if (!rc) { op_no++; rc = db_execute(db, "COMMIT TRANSACTION;"); } - // try to rollback transaction if we got any failures + // rollback transaction on failure if (rc) { netdata_log_error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no); op_no++; @@ -1526,6 +1622,13 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) { netdata_log_error("ML transaction rollback failed with rc=%d", rc); } + if (!rc) { + training_thread->num_db_transactions++; + training_thread->num_models_to_prune += training_thread->pending_model_info.size(); + } + + vacuum_database(db, "ML", 0, 0); + training_thread->pending_model_info.clear(); } @@ -1676,15 +1779,24 @@ void ml_init() db = NULL; } + // create table if (db) { - char *err = NULL; - int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err); - if (rc != SQLITE_OK) { - error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : ""); + int target_version = perform_ml_database_migration(db, ML_METADATA_VERSION); + if (configure_sqlite_database(db, target_version)) { + error_report("Failed to setup ML database"); sqlite3_close(db); - sqlite3_free(err); db = NULL; } + else { + char *err = NULL; + int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err); + if (rc != SQLITE_OK) { + error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : ""); + sqlite3_close(db); + sqlite3_free(err); + db = NULL; + } + } } } |