diff options
Diffstat (limited to 'ml')
-rw-r--r-- | ml/Config.cc | 2 | ||||
-rw-r--r-- | ml/ad_charts.cc | 119 | ||||
-rw-r--r-- | ml/ml-dummy.c | 17 | ||||
-rw-r--r-- | ml/ml-private.h | 9 | ||||
-rw-r--r-- | ml/ml.cc | 157 | ||||
-rw-r--r-- | ml/ml.h | 6 |
6 files changed, 235 insertions, 75 deletions
diff --git a/ml/Config.cc b/ml/Config.cc index c00d2e8ee..f733bcf7d 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -83,7 +83,7 @@ void ml_config_load(ml_config_t *cfg) { */ if (min_train_samples >= max_train_samples) { - error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples); + netdata_log_error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples); min_train_samples = 1 * 3600; max_train_samples = 6 * 3600; diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc index bd065cfcc..ca4dca139 100644 --- a/ml/ad_charts.cc +++ b/ml/ad_charts.cc @@ -183,6 +183,41 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats rrdset_done(host->dimensions_rs); } + + // ML running + { + if (!host->ml_running_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "ml_running_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "ml_running_on_%s", rrdhost_hostname(localhost)); + + host->ml_running_rs = rrdset_create( + host->rh, + "anomaly_detection", // type + id_buf, // id + name_buf, // name + "anomaly_detection", // family + "anomaly_detection.ml_running", // ctx + "ML running", // title + "boolean", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_DETECTION, // module + NETDATA_ML_CHART_RUNNING, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->ml_running_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->ml_running_rd = + rrddim_add(host->ml_running_rs, "ml_running", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->ml_running_rs, + host->ml_running_rd, host->ml_running); + rrdset_done(host->ml_running_rs); + } } void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) { @@ -260,47 +295,55 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number /* * Compute the values of the dimensions based on the host rate chart */ - ONEWAYALLOC *OWA = onewayalloc_create(0); - time_t Now = now_realtime_sec(); - time_t Before = Now - host->rh->rrd_update_every; - time_t After = Before - Cfg.anomaly_detection_query_duration; - RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000); - - RRDR *R = rrd2rrdr_legacy( - OWA, - host->anomaly_rate_rs, - 1 /* points wanted */, - After, - Before, - Cfg.anomaly_detection_grouping_method, - 0 /* resampling time */, - Options, "anomaly_rate", - NULL /* group options */, - 0, /* timeout */ - 0, /* tier */ - QUERY_SOURCE_ML, - STORAGE_PRIORITY_SYNCHRONOUS - ); - - if (R) { - if (R->d == 1 && R->n == 1 && R->rows == 1) { - static thread_local bool prev_above_threshold = false; - bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold; - bool new_anomaly_event = above_threshold && !prev_above_threshold; - prev_above_threshold = above_threshold; - - rrddim_set_by_pointer(host->detector_events_rs, - host->detector_events_above_threshold_rd, above_threshold); - rrddim_set_by_pointer(host->detector_events_rs, - host->detector_events_new_anomaly_event_rd, new_anomaly_event); - - rrdset_done(host->detector_events_rs); + if (host->ml_running) { + ONEWAYALLOC *OWA = onewayalloc_create(0); + time_t Now = now_realtime_sec(); + time_t Before = Now - host->rh->rrd_update_every; + time_t After = Before - Cfg.anomaly_detection_query_duration; + RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000); + + RRDR *R = rrd2rrdr_legacy( + OWA, + host->anomaly_rate_rs, + 1 /* points wanted */, + After, + Before, + Cfg.anomaly_detection_grouping_method, + 0 /* resampling time */, + Options, "anomaly_rate", + NULL /* group options */, + 0, /* timeout */ + 0, /* tier */ + QUERY_SOURCE_ML, + STORAGE_PRIORITY_SYNCHRONOUS + ); + + if (R) { + if (R->d == 1 && R->n == 1 && R->rows == 1) { + static thread_local bool prev_above_threshold = false; + bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold; + bool new_anomaly_event = above_threshold && !prev_above_threshold; + prev_above_threshold = above_threshold; + + rrddim_set_by_pointer(host->detector_events_rs, + host->detector_events_above_threshold_rd, above_threshold); + rrddim_set_by_pointer(host->detector_events_rs, + host->detector_events_new_anomaly_event_rd, new_anomaly_event); + + rrdset_done(host->detector_events_rs); + } + + rrdr_free(OWA, R); } - rrdr_free(OWA, R); + onewayalloc_destroy(OWA); + } else { + rrddim_set_by_pointer(host->detector_events_rs, + host->detector_events_above_threshold_rd, 0); + rrddim_set_by_pointer(host->detector_events_rs, + host->detector_events_new_anomaly_event_rd, 0); + rrdset_done(host->detector_events_rs); } - - onewayalloc_destroy(OWA); } } diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c index 708ab68ea..2ad6cc726 100644 --- a/ml/ml-dummy.c +++ b/ml/ml-dummy.c @@ -33,6 +33,14 @@ void ml_host_delete(RRDHOST *rh) { UNUSED(rh); } +void ml_host_start(RRDHOST *rh) { + UNUSED(rh); +} + +void ml_host_stop(RRDHOST *rh) { + UNUSED(rh); +} + void ml_host_start_training_thread(RRDHOST *rh) { UNUSED(rh); } @@ -101,4 +109,13 @@ void ml_update_global_statistics_charts(uint64_t models_consulted) { UNUSED(models_consulted); } +bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm) { + memset(mlm, 0, sizeof(*mlm)); + return false; +} + +bool ml_host_running(RRDHOST *rh) { + return false; +} + #endif diff --git a/ml/ml-private.h b/ml/ml-private.h index 2ed70d1ca..f0e2e7eaf 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -195,7 +195,7 @@ typedef struct { std::vector<calculated_number_t> cns; std::vector<ml_kmeans_t> km_contexts; - netdata_mutex_t mutex; + SPINLOCK slock; ml_kmeans_t kmeans; std::vector<DSample> feature; @@ -206,8 +206,6 @@ typedef struct { typedef struct { RRDSET *rs; ml_machine_learning_stats_t mls; - - netdata_mutex_t mutex; } ml_chart_t; void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous); @@ -215,6 +213,8 @@ void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_a typedef struct { RRDHOST *rh; + std::atomic<bool> ml_running; + ml_machine_learning_stats_t mls; calculated_number_t host_anomaly_rate; @@ -227,6 +227,9 @@ typedef struct { * bookkeeping for anomaly detection charts */ + RRDSET *ml_running_rs; + RRDDIM *ml_running_rd; + RRDSET *machine_learning_status_rs; RRDDIM *machine_learning_status_enabled_rd; RRDDIM *machine_learning_status_disabled_sp_rd; @@ -337,7 +337,7 @@ ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimens // Figure out what our time window should be. training_response.query_before_t = training_response.last_entry_on_response; training_response.query_after_t = std::max( - training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every), + training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->rrdset->update_every), training_response.first_entry_on_response ); @@ -568,9 +568,9 @@ int ml_dimension_load_models(RRDDIM *rd) { if (!dim) return 0; - netdata_mutex_lock(&dim->mutex); + spinlock_lock(&dim->slock); bool is_empty = dim->km_contexts.empty(); - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); if (!is_empty) return 0; @@ -602,7 +602,7 @@ int ml_dimension_load_models(RRDDIM *rd) { if (unlikely(rc != SQLITE_OK)) goto bind_fail; - netdata_mutex_lock(&dim->mutex); + spinlock_lock(&dim->slock); dim->km_contexts.reserve(Cfg.num_models_to_use); while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) { @@ -639,7 +639,7 @@ int ml_dimension_load_models(RRDDIM *rd) { dim->ts = TRAINING_STATUS_TRAINED; } - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); if (unlikely(rc != SQLITE_DONE)) error_report("Failed to load models, rc = %d", rc); @@ -666,7 +666,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * ml_training_response_t training_response = P.second; if (training_response.result != TRAINING_RESULT_OK) { - netdata_mutex_lock(&dim->mutex); + spinlock_lock(&dim->slock); dim->mt = METRIC_TYPE_CONSTANT; @@ -687,7 +687,8 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * dim->last_training_time = training_response.last_entry_on_response; enum ml_training_result result = training_response.result; - netdata_mutex_unlock(&dim->mutex); + + spinlock_unlock(&dim->slock); return result; } @@ -713,7 +714,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * // update models worker_is_busy(WORKER_TRAIN_UPDATE_MODELS); { - netdata_mutex_lock(&dim->mutex); + spinlock_lock(&dim->slock); if (dim->km_contexts.size() < Cfg.num_models_to_use) { dim->km_contexts.push_back(std::move(dim->kmeans)); @@ -752,7 +753,7 @@ ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t * model_info.kmeans = dim->km_contexts.back(); training_thread->pending_model_info.push_back(model_info); - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); } return training_response.result; @@ -781,7 +782,7 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time) break; case TRAINING_STATUS_SILENCED: case TRAINING_STATUS_TRAINED: - if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) { + if ((dim->last_training_time + (Cfg.train_every * dim->rd->rrdset->update_every)) < curr_time) { schedule_for_training = true; dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL; } @@ -851,7 +852,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t /* * Lock to predict and possibly schedule the dimension for training */ - if (netdata_mutex_trylock(&dim->mutex) != 0) + if (spinlock_trylock(&dim->slock) == 0) return false; // Mark the metric time as variable if we received different values @@ -866,7 +867,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t case TRAINING_STATUS_UNTRAINED: case TRAINING_STATUS_PENDING_WITHOUT_MODEL: { case TRAINING_STATUS_SILENCED: - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); return false; } default: @@ -891,7 +892,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) { global_statistics_ml_models_consulted(models_consulted); - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); return false; } @@ -905,7 +906,7 @@ ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t dim->ts = TRAINING_STATUS_SILENCED; } - netdata_mutex_unlock(&dim->mutex); + spinlock_unlock(&dim->slock); global_statistics_ml_models_consulted(models_consulted); return sum; @@ -992,7 +993,7 @@ ml_host_detect_once(ml_host_t *host) host->mls = {}; ml_machine_learning_stats_t mls_copy = {}; - { + if (host->ml_running) { netdata_mutex_lock(&host->mutex); /* @@ -1036,6 +1037,8 @@ ml_host_detect_once(ml_host_t *host) mls_copy = host->mls; netdata_mutex_unlock(&host->mutex); + } else { + host->host_anomaly_rate = 0.0; } worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); @@ -1213,15 +1216,14 @@ void ml_host_new(RRDHOST *rh) host->rh = rh; host->mls = ml_machine_learning_stats_t(); - //host->ts = ml_training_stats_t(); + host->host_anomaly_rate = 0.0; static std::atomic<size_t> times_called(0); host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; - host->host_anomaly_rate = 0.0; - netdata_mutex_init(&host->mutex); + host->ml_running = true; rh->ml_host = (rrd_ml_host_t *) host; } @@ -1237,6 +1239,70 @@ void ml_host_delete(RRDHOST *rh) rh->ml_host = NULL; } +void ml_host_start(RRDHOST *rh) { + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host) + return; + + host->ml_running = true; +} + +void ml_host_stop(RRDHOST *rh) { + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host || !host->ml_running) + return; + + netdata_mutex_lock(&host->mutex); + + // reset host stats + host->mls = ml_machine_learning_stats_t(); + + // reset charts/dims + void *rsp = NULL; + rrdset_foreach_read(rsp, host->rh) { + RRDSET *rs = static_cast<RRDSET *>(rsp); + + ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; + if (!chart) + continue; + + // reset chart + chart->mls = ml_machine_learning_stats_t(); + + void *rdp = NULL; + rrddim_foreach_read(rdp, rs) { + RRDDIM *rd = static_cast<RRDDIM *>(rdp); + + ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension; + if (!dim) + continue; + + spinlock_lock(&dim->slock); + + // reset dim + // TODO: should we drop in-mem models, or mark them as stale? Is it + // okay to resume training straight away? + + dim->mt = METRIC_TYPE_CONSTANT; + dim->ts = TRAINING_STATUS_UNTRAINED; + dim->last_training_time = 0; + dim->suppression_anomaly_counter = 0; + dim->suppression_window_counter = 0; + dim->cns.clear(); + + ml_kmeans_init(&dim->kmeans); + + spinlock_unlock(&dim->slock); + } + rrddim_foreach_done(rdp); + } + rrdset_foreach_done(rsp); + + netdata_mutex_unlock(&host->mutex); + + host->ml_running = false; +} + void ml_host_get_info(RRDHOST *rh, BUFFER *wb) { ml_host_t *host = (ml_host_t *) rh->ml_host; @@ -1279,7 +1345,8 @@ void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb) netdata_mutex_lock(&host->mutex); - buffer_json_member_add_uint64(wb, "version", 1); + buffer_json_member_add_uint64(wb, "version", 2); + buffer_json_member_add_uint64(wb, "ml-running", host->ml_running); buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions); buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions); buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions + @@ -1289,13 +1356,41 @@ void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb) netdata_mutex_unlock(&host->mutex); } +bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm) { + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host) { + memset(mlm, 0, sizeof(*mlm)); + return false; + } + + netdata_mutex_lock(&host->mutex); + + mlm->anomalous = host->mls.num_anomalous_dimensions; + mlm->normal = host->mls.num_normal_dimensions; + mlm->trained = host->mls.num_training_status_trained + host->mls.num_training_status_pending_with_model; + mlm->pending = host->mls.num_training_status_untrained + host->mls.num_training_status_pending_without_model; + mlm->silenced = host->mls.num_training_status_silenced; + + netdata_mutex_unlock(&host->mutex); + + return true; +} + +bool ml_host_running(RRDHOST *rh) { + ml_host_t *host = (ml_host_t *) rh->ml_host; + if(!host) + return false; + + return true; +} + void ml_host_get_models(RRDHOST *rh, BUFFER *wb) { UNUSED(rh); UNUSED(wb); // TODO: To be implemented - error("Fetching KMeans models is not supported yet"); + netdata_log_error("Fetching KMeans models is not supported yet"); } void ml_chart_new(RRDSET *rs) @@ -1309,8 +1404,6 @@ void ml_chart_new(RRDSET *rs) chart->rs = rs; chart->mls = ml_machine_learning_stats_t(); - netdata_mutex_init(&chart->mutex); - rs->ml_chart = (rrd_ml_chart_t *) chart; } @@ -1322,8 +1415,6 @@ void ml_chart_delete(RRDSET *rs) ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; - netdata_mutex_destroy(&chart->mutex); - delete chart; rs->ml_chart = NULL; } @@ -1334,7 +1425,6 @@ bool ml_chart_update_begin(RRDSET *rs) if (!chart) return false; - netdata_mutex_lock(&chart->mutex); chart->mls = {}; return true; } @@ -1344,8 +1434,6 @@ void ml_chart_update_end(RRDSET *rs) ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; if (!chart) return; - - netdata_mutex_unlock(&chart->mutex); } void ml_dimension_new(RRDDIM *rd) @@ -1360,8 +1448,9 @@ void ml_dimension_new(RRDDIM *rd) dim->mt = METRIC_TYPE_CONSTANT; dim->ts = TRAINING_STATUS_UNTRAINED; - dim->last_training_time = 0; + dim->suppression_anomaly_counter = 0; + dim->suppression_window_counter = 0; ml_kmeans_init(&dim->kmeans); @@ -1370,7 +1459,7 @@ void ml_dimension_new(RRDDIM *rd) else dim->mls = MACHINE_LEARNING_STATUS_ENABLED; - netdata_mutex_init(&dim->mutex); + spinlock_init(&dim->slock); dim->km_contexts.reserve(Cfg.num_models_to_use); @@ -1385,8 +1474,6 @@ void ml_dimension_delete(RRDDIM *rd) if (!dim) return; - netdata_mutex_destroy(&dim->mutex); - delete dim; rd->ml_dimension = NULL; } @@ -1397,6 +1484,10 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool if (!dim) return false; + ml_host_t *host = (ml_host_t *) rd->rrdset->rrdhost->ml_host; + if (!host->ml_running) + return false; + ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart; bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists); @@ -1428,11 +1519,11 @@ static void ml_flush_pending_models(ml_training_thread_t *training_thread) { // try to rollback transaction if we got any failures if (rc) { - error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no); + netdata_log_error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no); op_no++; rc = db_execute(db, "ROLLBACK;"); if (rc) - error("ML transaction rollback failed with rc=%d", rc); + netdata_log_error("ML transaction rollback failed with rc=%d", rc); } training_thread->pending_model_info.clear(); @@ -23,6 +23,9 @@ void ml_stop_threads(void); void ml_host_new(RRDHOST *rh); void ml_host_delete(RRDHOST *rh); +void ml_host_start(RRDHOST *RH); +void ml_host_stop(RRDHOST *RH); + void ml_host_get_info(RRDHOST *RH, BUFFER *wb); void ml_host_get_detection_info(RRDHOST *RH, BUFFER *wb); void ml_host_get_models(RRDHOST *RH, BUFFER *wb); @@ -40,6 +43,9 @@ int ml_dimension_load_models(RRDDIM *rd); void ml_update_global_statistics_charts(uint64_t models_consulted); +bool ml_host_get_host_status(RRDHOST *rh, struct ml_metrics_statistics *mlm); +bool ml_host_running(RRDHOST *rh); + #ifdef __cplusplus }; #endif |