Diffstat (limited to 'ml')
-rw-r--r--   ml/Config.cc       2
-rw-r--r--   ml/ad_charts.cc  119
-rw-r--r--   ml/ml-dummy.c     17
-rw-r--r--   ml/ml-private.h    9
-rw-r--r--   ml/ml.cc         157
-rw-r--r--   ml/ml.h            6
6 files changed, 235 insertions, 75 deletions
diff --git a/ml/Config.cc b/ml/Config.cc
index c00d2e8ee..f733bcf7d 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -83,7 +83,7 @@ void ml_config_load(ml_config_t *cfg) {
*/
if (min_train_samples >= max_train_samples) {
- error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples);
+ netdata_log_error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples);
min_train_samples = 1 * 3600;
max_train_samples = 6 * 3600;
diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc
index bd065cfcc..ca4dca139 100644
--- a/ml/ad_charts.cc
+++ b/ml/ad_charts.cc
@@ -183,6 +183,41 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
rrdset_done(host->dimensions_rs);
}
+
+ // ML running
+ {
+ if (!host->ml_running_rs) {
+ char id_buf[1024];
+ char name_buf[1024];
+
+ snprintfz(id_buf, 1024, "ml_running_on_%s", localhost->machine_guid);
+ snprintfz(name_buf, 1024, "ml_running_on_%s", rrdhost_hostname(localhost));
+
+ host->ml_running_rs = rrdset_create(
+ host->rh,
+ "anomaly_detection", // type
+ id_buf, // id
+ name_buf, // name
+ "anomaly_detection", // family
+ "anomaly_detection.ml_running", // ctx
+ "ML running", // title
+ "boolean", // units
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_DETECTION, // module
+ NETDATA_ML_CHART_RUNNING, // priority
+ localhost->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+ rrdset_flag_set(host->ml_running_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ host->ml_running_rd =
+ rrddim_add(host->ml_running_rs, "ml_running", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(host->ml_running_rs,
+ host->ml_running_rd, host->ml_running);
+ rrdset_done(host->ml_running_rs);
+ }
}
void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
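
The "ML running" block added above follows Netdata's usual create-once, update-every-iteration charting pattern: the RRDSET/RRDDIM pair is allocated lazily the first time the function runs, and each later detection cycle only sets the boolean dimension and calls rrdset_done(). A minimal sketch of the same pattern in isolation, with illustrative chart id, title, plugin/module strings and priority that are not part of this patch, and assuming the usual internal Netdata headers:

    // Sketch only: create the chart once, then update it on every iteration.
    static void update_example_flag_chart(bool flag_value) {
        static RRDSET *st = NULL;
        static RRDDIM *rd = NULL;

        if (!st) {
            st = rrdset_create_localhost(
                "anomaly_detection",              // type
                "example_flag",                   // id (illustrative)
                NULL,                             // name
                "anomaly_detection",              // family
                "anomaly_detection.example_flag", // ctx (illustrative)
                "Example boolean flag",           // title
                "boolean",                        // units
                "ml.plugin",                      // plugin (illustrative)
                "detection",                      // module (illustrative)
                39190,                            // priority (illustrative)
                localhost->rrd_update_every,      // update_every
                RRDSET_TYPE_LINE);                // chart_type
            rd = rrddim_add(st, "flag", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
        }

        rrddim_set_by_pointer(st, rd, flag_value ? 1 : 0);
        rrdset_done(st);
    }
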
@@ -260,47 +295,55 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
/*
* Compute the values of the dimensions based on the host rate chart
*/
- ONEWAYALLOC *OWA = onewayalloc_create(0);
- time_t Now = now_realtime_sec();
- time_t Before = Now - host->rh->rrd_update_every;
- time_t After = Before - Cfg.anomaly_detection_query_duration;
- RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000);
-
- RRDR *R = rrd2rrdr_legacy(
- OWA,
- host->anomaly_rate_rs,
- 1 /* points wanted */,
- After,
- Before,
- Cfg.anomaly_detection_grouping_method,
- 0 /* resampling time */,
- Options, "anomaly_rate",
- NULL /* group options */,
- 0, /* timeout */
- 0, /* tier */
- QUERY_SOURCE_ML,
- STORAGE_PRIORITY_SYNCHRONOUS
- );
-
- if (R) {
- if (R->d == 1 && R->n == 1 && R->rows == 1) {
- static thread_local bool prev_above_threshold = false;
- bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold;
- bool new_anomaly_event = above_threshold && !prev_above_threshold;
- prev_above_threshold = above_threshold;
-
- rrddim_set_by_pointer(host->detector_events_rs,
- host->detector_events_above_threshold_rd, above_threshold);
- rrddim_set_by_pointer(host->detector_events_rs,
- host->detector_events_new_anomaly_event_rd, new_anomaly_event);
-
- rrdset_done(host->detector_events_rs);
+ if (host->ml_running) {
+ ONEWAYALLOC *OWA = onewayalloc_create(0);
+ time_t Now = now_realtime_sec();
+ time_t Before = Now - host->rh->rrd_update_every;
+ time_t After = Before - Cfg.anomaly_detection_query_duration;
+ RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000);
+
+ RRDR *R = rrd2rrdr_legacy(
+ OWA,
+ host->anomaly_rate_rs,
+ 1 /* points wanted */,
+ After,
+ Before,
+ Cfg.anomaly_detection_grouping_method,
+ 0 /* resampling time */,
+ Options, "anomaly_rate",
+ NULL /* group options */,
+ 0, /* timeout */
+ 0, /* tier */
+ QUERY_SOURCE_ML,
+ STORAGE_PRIORITY_SYNCHRONOUS
+ );
+
+ if (R) {
+ if (R->d == 1 && R->n == 1 && R->rows == 1) {
+ static thread_local bool prev_above_threshold = false;
+ bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold;
+ bool new_anomaly_event = above_threshold && !prev_above_threshold;
+ prev_above_threshold = above_threshold;
+
+ rrddim_set_by_pointer(host->detector_events_rs,
+ host->detector_events_above_threshold_rd, above_threshold);
+ rrddim_set_by_pointer(host->detector_events_rs,
+ host->detector_events_new_anomaly_event_rd, new_anomaly_event);
+
+ rrdset_done(host->detector_events_rs);
+ }
+
+ rrdr_free(OWA, R);
}
- rrdr_free(OWA, R);
+ onewayalloc_destroy(OWA);
+ } else {
+ rrddim_set_by_pointer(host->detector_events_rs,
+ host->detector_events_above_threshold_rd, 0);
+ rrddim_set_by_pointer(host->detector_events_rs,
+ host->detector_events_new_anomaly_event_rd, 0);
+ rrdset_done(host->detector_events_rs);
}
-
- onewayalloc_destroy(OWA);
}
}
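
The second hunk wraps the host anomaly-rate query in an if (host->ml_running) check: while ML is stopped, rrd2rrdr_legacy() is not called at all and both detector-event dimensions are pinned to 0, so the chart keeps ticking with neutral values. The "new anomaly event" dimension itself is a small rising-edge detector kept in thread-local state; distilled on its own, with illustrative names that are not part of the patch:

    // Fires exactly once when the anomaly rate first crosses the threshold,
    // then stays quiet until the rate drops below it and crosses again.
    static bool detect_new_anomaly_event(double anomaly_rate, double threshold) {
        static thread_local bool prev_above_threshold = false;

        bool above_threshold = anomaly_rate >= threshold;
        bool new_event = above_threshold && !prev_above_threshold;
        prev_above_threshold = above_threshold;

        return new_event;
    }
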
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index 708ab68ea..2ad6cc726 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -33,6 +33,14 @@ void ml_host_delete(RRDHOST *rh) {
UNUSED(rh);
}
+void ml_host_start(RRDHOST *rh) {
+ UNUSED(rh);
+}
+
+void ml_host_stop(RRDHOST *rh) {
+ UNUSED(rh);
+}
+
void ml_host_start_training_thread(RRDHOST *rh) {
UNUSED(rh);
}
@@ -101,4 +109,13 @@ void ml_update_global_statistics_charts(uint64_t models_consulted) {
UNUSED(models_consulted);
}
+bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm) {
+ memset(mlm, 0, sizeof(*mlm));
+ return false;
+}
+
+bool ml_host_running(RRDHOST *rh) {
+ return false;
+}
+
#endif
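
These no-op stubs mirror the new public entry points so that builds without ML support (the configuration served by ml-dummy.c) still link and behave predictably: ml_host_get_host_status() zeroes the caller's struct and returns false, and ml_host_running() always returns false. A hypothetical caller, not part of the patch and using netdata_log_info() purely for illustration, can therefore use the API unconditionally:

    // Hypothetical caller sketch: safe whether or not Netdata was built with ML.
    static void log_ml_status(RRDHOST *host) {
        struct ml_metrics_statistics mlm;

        if (ml_host_get_host_status(host, &mlm))
            netdata_log_info("ML: trained=%zu pending=%zu anomalous=%zu normal=%zu silenced=%zu",
                             (size_t) mlm.trained, (size_t) mlm.pending, (size_t) mlm.anomalous,
                             (size_t) mlm.normal, (size_t) mlm.silenced);
        else
            netdata_log_info("ML: no status for this host (ML disabled or not yet initialized)");
    }
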
diff --git a/ml/ml-private.h b/ml/ml-private.h
index 2ed70d1ca..f0e2e7eaf 100644
--- a/ml/ml-private.h
+++ b/ml/ml-private.h
@@ -195,7 +195,7 @@ typedef struct {
std::vector<calculated_number_t> cns;
std::vector<ml_kmeans_t> km_contexts;
- netdata_mutex_t mutex;
+ SPINLOCK slock;
ml_kmeans_t kmeans;
std::vector<DSample> feature;
@@ -206,8 +206,6 @@ typedef struct {
typedef struct {
RRDSET *rs;
ml_machine_learning_stats_t mls;
-
- netdata_mutex_t mutex;
} ml_chart_t;
void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous);
@@ -215,6 +213,8 @@ void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_a
typedef struct {
RRDHOST *rh;
+ std::atomic<bool> ml_running;
+
ml_machine_learning_stats_t mls;
calculated_number_t host_anomaly_rate;
@@ -227,6 +227,9 @@ typedef struct {
* bookkeeping for anomaly detection charts
*/
+ RRDSET *ml_running_rs;
+ RRDDIM *ml_running_rd;
+
RRDSET *machine_learning_status_rs;
RRDDIM *machine_learning_status_enabled_rd;
RRDDIM *machine_learning_status_disabled_sp_rd;
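
The locking changes above replace the per-dimension netdata_mutex_t with a Netdata SPINLOCK and drop the per-chart mutex entirely. One detail worth flagging for reviewers: the two try-lock APIs use opposite return conventions, which is why the condition in ml.cc below flips from != 0 to == 0. A minimal before/after sketch, assuming netdata_mutex_trylock() follows the pthread convention (0 on success) and spinlock_trylock() returns true (non-zero) on success, consistent with the inverted check in that hunk:

    // Before: pthread-style convention, 0 means the lock was acquired.
    if (netdata_mutex_trylock(&dim->mutex) != 0)
        return false;   // lock is busy, skip this prediction cycle

    // After: boolean convention, non-zero (true) means the lock was acquired.
    if (spinlock_trylock(&dim->slock) == 0)
        return false;   // lock is busy, skip this prediction cycle
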
diff --git a/ml/ml.cc b/ml/ml.cc
index a5f0fa062..396967492 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -337,7 +337,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens
// Figure out what our time window should be.
training_response.query_before_t = training_response.last_entry_on_response;
training_response.query_after_t = std::max(
- training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every),
+ training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->rrdset->update_every),
training_response.first_entry_on_response
);
@@ -568,9 +568,9 @@ int ml_dimension_load_models(RRDDIM *rd) {
if (!dim)
return 0;
- netdata_mutex_lock(&dim->mutex);
+ spinlock_lock(&dim->slock);
bool is_empty = dim->km_contexts.empty();
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
if (!is_empty)
return 0;
@@ -602,7 +602,7 @@ int ml_dimension_load_models(RRDDIM *rd) {
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- netdata_mutex_lock(&dim->mutex);
+ spinlock_lock(&dim->slock);
dim->km_contexts.reserve(Cfg.num_models_to_use);
while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
@@ -639,7 +639,7 @@ int ml_dimension_load_models(RRDDIM *rd) {
dim->ts = TRAINING_STATUS_TRAINED;
}
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
if (unlikely(rc != SQLITE_DONE))
error_report("Failed to load models, rc = %d", rc);
@@ -666,7 +666,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
ml_training_response_t training_response = P.second;
if (training_response.result != TRAINING_RESULT_OK) {
- netdata_mutex_lock(&dim->mutex);
+ spinlock_lock(&dim->slock);
dim->mt = METRIC_TYPE_CONSTANT;
@@ -687,7 +687,8 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
dim->last_training_time = training_response.last_entry_on_response;
enum ml_training_result result = training_response.result;
- netdata_mutex_unlock(&dim->mutex);
+
+ spinlock_unlock(&dim->slock);
return result;
}
@@ -713,7 +714,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
// update models
worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
{
- netdata_mutex_lock(&dim->mutex);
+ spinlock_lock(&dim->slock);
if (dim->km_contexts.size() < Cfg.num_models_to_use) {
dim->km_contexts.push_back(std::move(dim->kmeans));
@@ -752,7 +753,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *
model_info.kmeans = dim->km_contexts.back();
training_thread->pending_model_info.push_back(model_info);
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
}
return training_response.result;
@@ -781,7 +782,7 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
break;
case TRAINING_STATUS_SILENCED:
case TRAINING_STATUS_TRAINED:
- if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) {
+ if ((dim->last_training_time + (Cfg.train_every * dim->rd->rrdset->update_every)) < curr_time) {
schedule_for_training = true;
dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
}
@@ -851,7 +852,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
/*
* Lock to predict and possibly schedule the dimension for training
*/
- if (netdata_mutex_trylock(&dim->mutex) != 0)
+ if (spinlock_trylock(&dim->slock) == 0)
return false;
// Mark the metric time as variable if we received different values
@@ -866,7 +867,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
case TRAINING_STATUS_UNTRAINED:
case TRAINING_STATUS_PENDING_WITHOUT_MODEL: {
case TRAINING_STATUS_SILENCED:
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
return false;
}
default:
@@ -891,7 +892,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) {
global_statistics_ml_models_consulted(models_consulted);
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
return false;
}
@@ -905,7 +906,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t
dim->ts = TRAINING_STATUS_SILENCED;
}
- netdata_mutex_unlock(&dim->mutex);
+ spinlock_unlock(&dim->slock);
global_statistics_ml_models_consulted(models_consulted);
return sum;
@@ -992,7 +993,7 @@ ml_host_detect_once(ml_host_t *host)
host->mls = {};
ml_machine_learning_stats_t mls_copy = {};
- {
+ if (host->ml_running) {
netdata_mutex_lock(&host->mutex);
/*
@@ -1036,6 +1037,8 @@ ml_host_detect_once(ml_host_t *host)
mls_copy = host->mls;
netdata_mutex_unlock(&host->mutex);
+ } else {
+ host->host_anomaly_rate = 0.0;
}
worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
@@ -1213,15 +1216,14 @@ void ml_host_new(RRDHOST *rh)
host->rh = rh;
host->mls = ml_machine_learning_stats_t();
- //host->ts = ml_training_stats_t();
+ host->host_anomaly_rate = 0.0;
static std::atomic<size_t> times_called(0);
host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
- host->host_anomaly_rate = 0.0;
-
netdata_mutex_init(&host->mutex);
+ host->ml_running = true;
rh->ml_host = (rrd_ml_host_t *) host;
}
@@ -1237,6 +1239,70 @@ void ml_host_delete(RRDHOST *rh)
rh->ml_host = NULL;
}
+void ml_host_start(RRDHOST *rh) {
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host)
+ return;
+
+ host->ml_running = true;
+}
+
+void ml_host_stop(RRDHOST *rh) {
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host || !host->ml_running)
+ return;
+
+ netdata_mutex_lock(&host->mutex);
+
+ // reset host stats
+ host->mls = ml_machine_learning_stats_t();
+
+ // reset charts/dims
+ void *rsp = NULL;
+ rrdset_foreach_read(rsp, host->rh) {
+ RRDSET *rs = static_cast<RRDSET *>(rsp);
+
+ ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+ if (!chart)
+ continue;
+
+ // reset chart
+ chart->mls = ml_machine_learning_stats_t();
+
+ void *rdp = NULL;
+ rrddim_foreach_read(rdp, rs) {
+ RRDDIM *rd = static_cast<RRDDIM *>(rdp);
+
+ ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+ if (!dim)
+ continue;
+
+ spinlock_lock(&dim->slock);
+
+ // reset dim
+ // TODO: should we drop in-mem models, or mark them as stale? Is it
+ // okay to resume training straight away?
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+ dim->ts = TRAINING_STATUS_UNTRAINED;
+ dim->last_training_time = 0;
+ dim->suppression_anomaly_counter = 0;
+ dim->suppression_window_counter = 0;
+ dim->cns.clear();
+
+ ml_kmeans_init(&dim->kmeans);
+
+ spinlock_unlock(&dim->slock);
+ }
+ rrddim_foreach_done(rdp);
+ }
+ rrdset_foreach_done(rsp);
+
+ netdata_mutex_unlock(&host->mutex);
+
+ host->ml_running = false;
+}
+
void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
{
ml_host_t *host = (ml_host_t *) rh->ml_host;
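
ml_host_start() only flips the atomic ml_running flag back to true, while ml_host_stop() takes the host mutex, resets the per-host and per-chart stats, and, under each dimension's spinlock, puts the dimension back to METRIC_TYPE_CONSTANT / TRAINING_STATUS_UNTRAINED, zeroes the suppression counters, clears the collected samples and re-initializes the k-means context before clearing the flag; the TODO above leaves open whether the in-memory km_contexts models should also be dropped. A hypothetical caller, with illustrative hook names that are not part of the patch, might tie this to a child host's connection lifecycle:

    // ml_host_stop() is already a no-op when the host has no ML state or ML is
    // stopped, so these hooks are safe to call unconditionally.
    static void on_child_disconnected(RRDHOST *host) {
        ml_host_stop(host);    // freeze detection and reset per-dimension training state
    }

    static void on_child_reconnected(RRDHOST *host) {
        ml_host_start(host);   // just sets ml_running back to true
    }
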
@@ -1279,7 +1345,8 @@ void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
netdata_mutex_lock(&host->mutex);
- buffer_json_member_add_uint64(wb, "version", 1);
+ buffer_json_member_add_uint64(wb, "version", 2);
+ buffer_json_member_add_uint64(wb, "ml-running", host->ml_running);
buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions);
buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions);
buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions +
@@ -1289,13 +1356,41 @@ void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
netdata_mutex_unlock(&host->mutex);
}
+bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm) {
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host) {
+ memset(mlm, 0, sizeof(*mlm));
+ return false;
+ }
+
+ netdata_mutex_lock(&host->mutex);
+
+ mlm->anomalous = host->mls.num_anomalous_dimensions;
+ mlm->normal = host->mls.num_normal_dimensions;
+ mlm->trained = host->mls.num_training_status_trained + host->mls.num_training_status_pending_with_model;
+ mlm->pending = host->mls.num_training_status_untrained + host->mls.num_training_status_pending_without_model;
+ mlm->silenced = host->mls.num_training_status_silenced;
+
+ netdata_mutex_unlock(&host->mutex);
+
+ return true;
+}
+
+bool ml_host_running(RRDHOST *rh) {
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if(!host)
+ return false;
+
+ return true;
+}
+
void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
{
UNUSED(rh);
UNUSED(wb);
// TODO: To be implemented
- error("Fetching KMeans models is not supported yet");
+ netdata_log_error("Fetching KMeans models is not supported yet");
}
void ml_chart_new(RRDSET *rs)
@@ -1309,8 +1404,6 @@ void ml_chart_new(RRDSET *rs)
chart->rs = rs;
chart->mls = ml_machine_learning_stats_t();
- netdata_mutex_init(&chart->mutex);
-
rs->ml_chart = (rrd_ml_chart_t *) chart;
}
@@ -1322,8 +1415,6 @@ void ml_chart_delete(RRDSET *rs)
ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
- netdata_mutex_destroy(&chart->mutex);
-
delete chart;
rs->ml_chart = NULL;
}
@@ -1334,7 +1425,6 @@ bool ml_chart_update_begin(RRDSET *rs)
if (!chart)
return false;
- netdata_mutex_lock(&chart->mutex);
chart->mls = {};
return true;
}
@@ -1344,8 +1434,6 @@ void ml_chart_update_end(RRDSET *rs)
ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
if (!chart)
return;
-
- netdata_mutex_unlock(&chart->mutex);
}
void ml_dimension_new(RRDDIM *rd)
@@ -1360,8 +1448,9 @@ void ml_dimension_new(RRDDIM *rd)
dim->mt = METRIC_TYPE_CONSTANT;
dim->ts = TRAINING_STATUS_UNTRAINED;
-
dim->last_training_time = 0;
+ dim->suppression_anomaly_counter = 0;
+ dim->suppression_window_counter = 0;
ml_kmeans_init(&dim->kmeans);
@@ -1370,7 +1459,7 @@ void ml_dimension_new(RRDDIM *rd)
else
dim->mls = MACHINE_LEARNING_STATUS_ENABLED;
- netdata_mutex_init(&dim->mutex);
+ spinlock_init(&dim->slock);
dim->km_contexts.reserve(Cfg.num_models_to_use);
@@ -1385,8 +1474,6 @@ void ml_dimension_delete(RRDDIM *rd)
if (!dim)
return;
- netdata_mutex_destroy(&dim->mutex);
-
delete dim;
rd->ml_dimension = NULL;
}
@@ -1397,6 +1484,10 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
if (!dim)
return false;
+ ml_host_t *host = (ml_host_t *) rd->rrdset->rrdhost->ml_host;
+ if (!host->ml_running)
+ return false;
+
ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists);
@@ -1428,11 +1519,11 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
// try to rollback transaction if we got any failures
if (rc) {
- error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
+ netdata_log_error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
op_no++;
rc = db_execute(db, "ROLLBACK;");
if (rc)
- error("ML transaction rollback failed with rc=%d", rc);
+ netdata_log_error("ML transaction rollback failed with rc=%d", rc);
}
training_thread->pending_model_info.clear();
diff --git a/ml/ml.h b/ml/ml.h
index 964dd0821..9384f8a9e 100644
--- a/ml/ml.h
+++ b/ml/ml.h
@@ -23,6 +23,9 @@ void ml_stop_threads(void);
void ml_host_new(RRDHOST *rh);
void ml_host_delete(RRDHOST *rh);
+void ml_host_start(RRDHOST *RH);
+void ml_host_stop(RRDHOST *RH);
+
void ml_host_get_info(RRDHOST *RH, BUFFER *wb);
void ml_host_get_detection_info(RRDHOST *RH, BUFFER *wb);
void ml_host_get_models(RRDHOST *RH, BUFFER *wb);
@@ -40,6 +43,9 @@ int ml_dimension_load_models(RRDDIM *rd);
void ml_update_global_statistics_charts(uint64_t models_consulted);
+bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm);
+bool ml_host_running(RRDHOST *rh);
+
#ifdef __cplusplus
};
#endif