summaryrefslogtreecommitdiffstats
path: root/src/ml/ml.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/ml/ml.cc (renamed from ml/ml.cc)85
1 files changed, 46 insertions, 39 deletions
diff --git a/ml/ml.cc b/src/ml/ml.cc
index 5f8f5033e..7ecdce418 100644
--- a/ml/ml.cc
+++ b/src/ml/ml.cc
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include <dlib/clustering.h>
+#include "dlib/dlib/clustering.h"
#include "ml-private.h"
@@ -358,7 +358,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
*/
struct storage_engine_query_handle handle;
- storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle,
+ storage_engine_query_init(dim->rd->tiers[0].seb, dim->rd->tiers[0].smh, &handle,
training_response.query_after_t, training_response.query_before_t,
STORAGE_PRIORITY_BEST_EFFORT);
@@ -443,7 +443,7 @@ const char *db_models_prune =
"WHERE after < @after LIMIT @n;";
static int
-ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
+ml_dimension_add_model(const nd_uuid_t *metric_uuid, const ml_kmeans_t *km)
{
static __thread sqlite3_stmt *res = NULL;
int param = 0;
@@ -520,7 +520,7 @@ bind_fail:
}
static int
-ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
+ml_dimension_delete_models(const nd_uuid_t *metric_uuid, time_t before)
{
static __thread sqlite3_stmt *res = NULL;
int rc = 0;
@@ -1090,20 +1090,24 @@ ml_host_detect_once(ml_host_t *host)
host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;
- STRING *key = rs->parts.type;
- auto &um = host->type_anomaly_rate;
- auto it = um.find(key);
- if (it == um.end()) {
- um[key] = ml_type_anomaly_rate_t {
- .rd = NULL,
- .normal_dimensions = 0,
- .anomalous_dimensions = 0
- };
- it = um.find(key);
- }
+ if (spinlock_trylock_cancelable(&host->type_anomaly_rate_spinlock))
+ {
+ STRING *key = rs->parts.type;
+ auto &um = host->type_anomaly_rate;
+ auto it = um.find(key);
+ if (it == um.end()) {
+ um[key] = ml_type_anomaly_rate_t {
+ .rd = NULL,
+ .normal_dimensions = 0,
+ .anomalous_dimensions = 0
+ };
+ it = um.find(key);
+ }
- it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
- it->second.normal_dimensions += chart_mls.num_normal_dimensions;
+ it->second.anomalous_dimensions += chart_mls.num_anomalous_dimensions;
+ it->second.normal_dimensions += chart_mls.num_normal_dimensions;
+ spinlock_unlock_cancelable(&host->type_anomaly_rate_spinlock);
+ }
}
rrdset_foreach_done(rsp);
@@ -1171,7 +1175,7 @@ ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimensio
}
}
- rrd_unlock();
+ rrd_rdunlock();
ml_acquired_dimension_t acq_dim = {
acq_rh, acq_rs, acq_rd, dim
@@ -1216,7 +1220,7 @@ ml_detect_main(void *arg)
heartbeat_t hb;
heartbeat_init(&hb);
- while (!Cfg.detection_stop) {
+ while (!Cfg.detection_stop && service_running(SERVICE_COLLECTORS)) {
worker_is_idle();
heartbeat_next(&hb, USEC_PER_SEC);
@@ -1226,9 +1230,12 @@ ml_detect_main(void *arg)
if (!rh->ml_host)
continue;
+ if (!service_running(SERVICE_COLLECTORS))
+ break;
+
ml_host_detect_once((ml_host_t *) rh->ml_host);
}
- rrd_unlock();
+ rrd_rdunlock();
if (Cfg.enable_statistics_charts) {
// collect and update training thread stats
@@ -1262,6 +1269,7 @@ ml_detect_main(void *arg)
}
}
}
+ Cfg.training_stop = true;
return NULL;
}
@@ -1310,6 +1318,7 @@ void ml_host_new(RRDHOST *rh)
host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
netdata_mutex_init(&host->mutex);
+ spinlock_init(&host->type_anomaly_rate_spinlock);
host->ml_running = true;
rh->ml_host = (rrd_ml_host_t *) host;
@@ -1416,8 +1425,7 @@ void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold);
- buffer_json_member_add_string(wb, "anomaly-detection-grouping-method",
- time_grouping_method2string(Cfg.anomaly_detection_grouping_method));
+ buffer_json_member_add_string(wb, "anomaly-detection-grouping-method", time_grouping_id2txt(Cfg.anomaly_detection_grouping_method));
buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration);
@@ -1787,7 +1795,7 @@ void ml_init()
// create table
if (db) {
int target_version = perform_ml_database_migration(db, ML_METADATA_VERSION);
- if (configure_sqlite_database(db, target_version)) {
+ if (configure_sqlite_database(db, target_version, "ml_config")) {
error_report("Failed to setup ML database");
sqlite3_close(db);
db = NULL;
@@ -1805,13 +1813,17 @@ void ml_init()
}
}
+uint64_t sqlite_get_ml_space(void)
+{
+ return sqlite_get_db_space(db);
+}
+
void ml_fini() {
- if (!Cfg.enable_anomaly_detection)
+ if (!Cfg.enable_anomaly_detection || !db)
return;
- int rc = sqlite3_close_v2(db);
- if (unlikely(rc != SQLITE_OK))
- error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
+ sql_close_database(db, "ML");
+ db = NULL;
}
void ml_start_threads() {
@@ -1825,12 +1837,14 @@ void ml_start_threads() {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
- netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
+ Cfg.detection_thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE,
+ ml_detect_main, NULL);
for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
- netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
+ training_thread->nd_thread = nd_thread_create(tag, NETDATA_THREAD_OPTION_JOINABLE,
+ ml_train_main, training_thread);
}
}
@@ -1845,8 +1859,8 @@ void ml_stop_threads()
if (!Cfg.detection_thread)
return;
- netdata_thread_cancel(Cfg.detection_thread);
- netdata_thread_join(Cfg.detection_thread, NULL);
+ nd_thread_join(Cfg.detection_thread);
+ Cfg.detection_thread = 0;
// signal the training queue of each thread
for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
@@ -1855,18 +1869,11 @@ void ml_stop_threads()
ml_queue_signal(training_thread->training_queue);
}
- // cancel training threads
- for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
- ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
-
- netdata_thread_cancel(training_thread->nd_thread);
- }
-
// join training threads
for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
- netdata_thread_join(training_thread->nd_thread, NULL);
+ nd_thread_join(training_thread->nd_thread);
}
// clear training thread data