diff options
Diffstat (limited to '')
-rw-r--r-- | src/ml/ml.cc (renamed from ml/ml.cc) | 85 |
1 files changed, 46 insertions, 39 deletions
diff --git a/ml/ml.cc b/src/ml/ml.cc index 5f8f5033e..7ecdce418 100644 --- a/ml/ml.cc +++ b/src/ml/ml.cc @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include <dlib/clustering.h> +#include "dlib/dlib/clustering.h" #include "ml-private.h" @@ -358,7 +358,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens */ struct storage_engine_query_handle handle; - storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle, + storage_engine_query_init(dim->rd->tiers[0].seb, dim->rd->tiers[0].smh, &handle, training_response.query_after_t, training_response.query_before_t, STORAGE_PRIORITY_BEST_EFFORT); @@ -443,7 +443,7 @@ const char *db_models_prune = "WHERE after < @after LIMIT @n;"; static int -ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km) +ml_dimension_add_model(const nd_uuid_t *metric_uuid, const ml_kmeans_t *km) { static __thread sqlite3_stmt *res = NULL; int param = 0; @@ -520,7 +520,7 @@ bind_fail: } static int -ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before) +ml_dimension_delete_models(const nd_uuid_t *metric_uuid, time_t before) { static __thread sqlite3_stmt *res = NULL; int rc = 0; @@ -1090,20 +1090,24 @@ ml_host_detect_once(ml_host_t *host) host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions; host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions; - STRING *key = rs->parts.type; - auto &um = host->type_anomaly_rate; - auto it = um.find(key); - if (it == um.end()) { - um[key] = ml_type_anomaly_rate_t { - .rd = NULL, - .normal_dimensions = 0, - .anomalous_dimensions = 0 - }; - it = um.find(key); - } + if (spinlock_trylock_cancelable(&host->type_anomaly_rate_spinlock)) + { + STRING *key = rs->parts.type; + auto &um = host->type_anomaly_rate; + auto it = um.find(key); + if (it == um.end()) { + um[key] = ml_type_anomaly_rate_t { + .rd = NULL, + .normal_dimensions = 0, + .anomalous_dimensions = 0 + }; + it = um.find(key); + } - it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions; - it->second.normal_dimensions += chart_mls.num_normal_dimensions; + it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions; + it->second.normal_dimensions += chart_mls.num_normal_dimensions; + spinlock_unlock_cancelable(&host->type_anomaly_rate_spinlock); + } } rrdset_foreach_done(rsp); @@ -1171,7 +1175,7 @@ ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimensio } } - rrd_unlock(); + rrd_rdunlock(); ml_acquired_dimension_t acq_dim = { acq_rh, acq_rs, acq_rd, dim @@ -1216,7 +1220,7 @@ ml_detect_main(void *arg) heartbeat_t hb; heartbeat_init(&hb); - while (!Cfg.detection_stop) { + while (!Cfg.detection_stop && service_running(SERVICE_COLLECTORS)) { worker_is_idle(); heartbeat_next(&hb, USEC_PER_SEC); @@ -1226,9 +1230,12 @@ ml_detect_main(void *arg) if (!rh->ml_host) continue; + if (!service_running(SERVICE_COLLECTORS)) + break; + ml_host_detect_once((ml_host_t *) rh->ml_host); } - rrd_unlock(); + rrd_rdunlock(); if (Cfg.enable_statistics_charts) { // collect and update training thread stats @@ -1262,6 +1269,7 @@ ml_detect_main(void *arg) } } } + Cfg.training_stop = true; return NULL; } @@ -1310,6 +1318,7 @@ void ml_host_new(RRDHOST *rh) host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; netdata_mutex_init(&host->mutex); + spinlock_init(&host->type_anomaly_rate_spinlock); host->ml_running = true; rh->ml_host = (rrd_ml_host_t *) host; @@ -1416,8 +1425,7 @@ void ml_host_get_info(RRDHOST *rh, BUFFER *wb) buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold); - buffer_json_member_add_string(wb, "anomaly-detection-grouping-method", - time_grouping_method2string(Cfg.anomaly_detection_grouping_method)); + buffer_json_member_add_string(wb, "anomaly-detection-grouping-method", time_grouping_id2txt(Cfg.anomaly_detection_grouping_method)); buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration); @@ -1787,7 +1795,7 @@ void ml_init() // create table if (db) { int target_version = perform_ml_database_migration(db, ML_METADATA_VERSION); - if (configure_sqlite_database(db, target_version)) { + if (configure_sqlite_database(db, target_version, "ml_config")) { error_report("Failed to setup ML database"); sqlite3_close(db); db = NULL; @@ -1805,13 +1813,17 @@ void ml_init() } } +uint64_t sqlite_get_ml_space(void) +{ + return sqlite_get_db_space(db); +} + void ml_fini() { - if (!Cfg.enable_anomaly_detection) + if (!Cfg.enable_anomaly_detection || !db) return; - int rc = sqlite3_close_v2(db); - if (unlikely(rc != SQLITE_OK)) - error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc)); + sql_close_database(db, "ML"); + db = NULL; } void ml_start_threads() { @@ -1825,12 +1837,14 @@ void ml_start_threads() { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT"); - netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL); + Cfg.detection_thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, + ml_detect_main, NULL); for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id); - netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread); + training_thread->nd_thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE, + ml_train_main, training_thread); } } @@ -1845,8 +1859,8 @@ void ml_stop_threads() if (!Cfg.detection_thread) return; - netdata_thread_cancel(Cfg.detection_thread); - netdata_thread_join(Cfg.detection_thread, NULL); + nd_thread_join(Cfg.detection_thread); + Cfg.detection_thread = 0; // signal the training queue of each thread for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { @@ -1855,18 +1869,11 @@ void ml_stop_threads() ml_queue_signal(training_thread->training_queue); } - // cancel training threads - for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { - ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - - netdata_thread_cancel(training_thread->nd_thread); - } - // join training threads for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; - netdata_thread_join(training_thread->nd_thread, NULL); + nd_thread_join(training_thread->nd_thread); } // clear training thread data |