From a836a244a3d2bdd4da1ee2641e3e957850668cea Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 8 May 2023 18:27:04 +0200 Subject: Adding upstream version 1.39.0. Signed-off-by: Daniel Baumann --- ml/ADCharts.cc | 518 ---------------- ml/ADCharts.h | 21 - ml/Chart.cc | 0 ml/Chart.h | 128 ---- ml/Config.cc | 123 ++-- ml/Config.h | 52 -- ml/Dimension.cc | 346 ----------- ml/Dimension.h | 198 ------ ml/Host.cc | 387 ------------ ml/Host.h | 70 --- ml/KMeans.cc | 43 -- ml/KMeans.h | 41 -- ml/Mutex.h | 36 -- ml/Query.h | 57 -- ml/Queue.h | 66 -- ml/README.md | 13 +- ml/SamplesBuffer.cc | 183 ------ ml/SamplesBuffer.h | 149 ----- ml/Stats.h | 46 -- ml/ad_charts.cc | 475 +++++++++++++++ ml/ad_charts.h | 14 + ml/ml-dummy.c | 95 +-- ml/ml-private.h | 336 ++++++++++- ml/ml.cc | 1657 +++++++++++++++++++++++++++++++++++++++++++++++---- ml/ml.h | 42 +- 25 files changed, 2509 insertions(+), 2587 deletions(-) delete mode 100644 ml/ADCharts.cc delete mode 100644 ml/ADCharts.h delete mode 100644 ml/Chart.cc delete mode 100644 ml/Chart.h delete mode 100644 ml/Config.h delete mode 100644 ml/Dimension.cc delete mode 100644 ml/Dimension.h delete mode 100644 ml/Host.cc delete mode 100644 ml/Host.h delete mode 100644 ml/KMeans.cc delete mode 100644 ml/KMeans.h delete mode 100644 ml/Mutex.h delete mode 100644 ml/Query.h delete mode 100644 ml/Queue.h delete mode 100644 ml/SamplesBuffer.cc delete mode 100644 ml/SamplesBuffer.h delete mode 100644 ml/Stats.h create mode 100644 ml/ad_charts.cc create mode 100644 ml/ad_charts.h (limited to 'ml') diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc deleted file mode 100644 index cbb13f5d1..000000000 --- a/ml/ADCharts.cc +++ /dev/null @@ -1,518 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "ADCharts.h" -#include "Config.h" - -void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) { - /* - * Machine learning status - */ - { - static thread_local RRDSET *MachineLearningStatusRS = nullptr; - - static thread_local RRDDIM *Enabled = nullptr; - static thread_local RRDDIM *DisabledUE = nullptr; - static thread_local RRDDIM *DisabledSP = nullptr; - - if (!MachineLearningStatusRS) { - std::stringstream IdSS, NameSS; - - IdSS << "machine_learning_status_on_" << localhost->machine_guid; - NameSS << "machine_learning_status_on_" << rrdhost_hostname(localhost); - - MachineLearningStatusRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.machine_learning_status", // ctx - "Machine learning status", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(MachineLearningStatusRS , RRDSET_FLAG_ANOMALY_DETECTION); - - Enabled = rrddim_add(MachineLearningStatusRS, "enabled", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - DisabledUE = rrddim_add(MachineLearningStatusRS, "disabled-ue", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - DisabledSP = rrddim_add(MachineLearningStatusRS, "disabled-sp", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(MachineLearningStatusRS, Enabled, MLS.NumMachineLearningStatusEnabled); - rrddim_set_by_pointer(MachineLearningStatusRS, DisabledUE, MLS.NumMachineLearningStatusDisabledUE); - rrddim_set_by_pointer(MachineLearningStatusRS, DisabledSP, MLS.NumMachineLearningStatusDisabledSP); - - 
rrdset_done(MachineLearningStatusRS); - } - - /* - * Metric type - */ - { - static thread_local RRDSET *MetricTypesRS = nullptr; - - static thread_local RRDDIM *Constant = nullptr; - static thread_local RRDDIM *Variable = nullptr; - - if (!MetricTypesRS) { - std::stringstream IdSS, NameSS; - - IdSS << "metric_types_on_" << localhost->machine_guid; - NameSS << "metric_types_on_" << rrdhost_hostname(localhost); - - MetricTypesRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.metric_types", // ctx - "Dimensions by metric type", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_METRIC_TYPES, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(MetricTypesRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Constant = rrddim_add(MetricTypesRS, "constant", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Variable = rrddim_add(MetricTypesRS, "variable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(MetricTypesRS, Constant, MLS.NumMetricTypeConstant); - rrddim_set_by_pointer(MetricTypesRS, Variable, MLS.NumMetricTypeVariable); - - rrdset_done(MetricTypesRS); - } - - /* - * Training status - */ - { - static thread_local RRDSET *TrainingStatusRS = nullptr; - - static thread_local RRDDIM *Untrained = nullptr; - static thread_local RRDDIM *PendingWithoutModel = nullptr; - static thread_local RRDDIM *Trained = nullptr; - static thread_local RRDDIM *PendingWithModel = nullptr; - - if (!TrainingStatusRS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_status_on_" << localhost->machine_guid; - NameSS << "training_status_on_" << rrdhost_hostname(localhost); - - TrainingStatusRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_status", // ctx - "Training status of dimensions", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_STATUS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - - rrdset_flag_set(TrainingStatusRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Untrained = rrddim_add(TrainingStatusRS, "untrained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PendingWithoutModel = rrddim_add(TrainingStatusRS, "pending-without-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Trained = rrddim_add(TrainingStatusRS, "trained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PendingWithModel = rrddim_add(TrainingStatusRS, "pending-with-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(TrainingStatusRS, Untrained, MLS.NumTrainingStatusUntrained); - rrddim_set_by_pointer(TrainingStatusRS, PendingWithoutModel, MLS.NumTrainingStatusPendingWithoutModel); - rrddim_set_by_pointer(TrainingStatusRS, Trained, MLS.NumTrainingStatusTrained); - rrddim_set_by_pointer(TrainingStatusRS, PendingWithModel, MLS.NumTrainingStatusPendingWithModel); - - rrdset_done(TrainingStatusRS); - } - - /* - * Prediction status - */ - { - static thread_local RRDSET *PredictionRS = nullptr; - - static thread_local RRDDIM *Anomalous = nullptr; - static thread_local RRDDIM *Normal = nullptr; - - if (!PredictionRS) { - std::stringstream IdSS, NameSS; - - IdSS << "dimensions_on_" << localhost->machine_guid; - NameSS << "dimensions_on_" << rrdhost_hostname(localhost); - - 
PredictionRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "dimensions", // family - "anomaly_detection.dimensions", // ctx - "Anomaly detection dimensions", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - ML_CHART_PRIO_DIMENSIONS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(PredictionRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Anomalous = rrddim_add(PredictionRS, "anomalous", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Normal = rrddim_add(PredictionRS, "normal", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(PredictionRS, Anomalous, MLS.NumAnomalousDimensions); - rrddim_set_by_pointer(PredictionRS, Normal, MLS.NumNormalDimensions); - - rrdset_done(PredictionRS); - } - -} - -void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate) { - static thread_local RRDSET *HostRateRS = nullptr; - static thread_local RRDDIM *AnomalyRateRD = nullptr; - - if (!HostRateRS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_rate_on_" << localhost->machine_guid; - NameSS << "anomaly_rate_on_" << rrdhost_hostname(localhost); - - HostRateRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_rate", // family - "anomaly_detection.anomaly_rate", // ctx - "Percentage of anomalous dimensions", // title - "percentage", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_DETECTION, // module - ML_CHART_PRIO_ANOMALY_RATE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(HostRateRS, RRDSET_FLAG_ANOMALY_DETECTION); - - AnomalyRateRD = rrddim_add(HostRateRS, "anomaly_rate", NULL, - 1, 100, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(HostRateRS, AnomalyRateRD, AnomalyRate); - rrdset_done(HostRateRS); - - static thread_local RRDSET *AnomalyDetectionRS = nullptr; - static thread_local RRDDIM *AboveThresholdRD = nullptr; - static thread_local RRDDIM *NewAnomalyEventRD = nullptr; - - if (!AnomalyDetectionRS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_detection_on_" << localhost->machine_guid; - NameSS << "anomaly_detection_on_" << rrdhost_hostname(localhost); - - AnomalyDetectionRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_detection", // family - "anomaly_detection.detector_events", // ctx - "Anomaly detection events", // title - "percentage", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_DETECTION, // module - ML_CHART_PRIO_DETECTOR_EVENTS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(AnomalyDetectionRS, RRDSET_FLAG_ANOMALY_DETECTION); - - AboveThresholdRD = rrddim_add(AnomalyDetectionRS, "above_threshold", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NewAnomalyEventRD = rrddim_add(AnomalyDetectionRS, "new_anomaly_event", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - /* - * Compute the values of the dimensions based on the host rate chart - */ - ONEWAYALLOC *OWA = onewayalloc_create(0); - time_t Now = now_realtime_sec(); - time_t Before = Now - RH->rrd_update_every; - time_t After = Before - Cfg.AnomalyDetectionQueryDuration; - RRDR_OPTIONS Options = static_cast(0x00000000); - - RRDR *R = rrd2rrdr_legacy( - OWA, HostRateRS, - 1 /* points wanted */, - After, - Before, - 
Cfg.AnomalyDetectionGroupingMethod, - 0 /* resampling time */, - Options, "anomaly_rate", - NULL /* group options */, - 0, /* timeout */ - 0, /* tier */ - QUERY_SOURCE_ML, - STORAGE_PRIORITY_BEST_EFFORT - ); - - if(R) { - if(R->d == 1 && R->n == 1 && R->rows == 1) { - static thread_local bool PrevAboveThreshold = false; - bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold; - bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold; - PrevAboveThreshold = AboveThreshold; - - rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold); - rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent); - rrdset_done(AnomalyDetectionRS); - } - - rrdr_free(OWA, R); - } - - onewayalloc_destroy(OWA); -} - -void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionRU, const struct rusage &TrainingRU) { - /* - * prediction rusage - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *User = nullptr; - static thread_local RRDDIM *System = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "prediction_usage_for_" << RH->machine_guid; - NameSS << "prediction_usage_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.prediction_usage", // ctx - "Prediction resource usage", // title - "milliseconds/s", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_PREDICTION, // module - NETDATA_ML_CHART_PRIO_PREDICTION_USAGE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - User = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - System = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } - - rrddim_set_by_pointer(RS, User, PredictionRU.ru_utime.tv_sec * 1000000ULL + PredictionRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, System, PredictionRU.ru_stime.tv_sec * 1000000ULL + PredictionRU.ru_stime.tv_usec); - - rrdset_done(RS); - } - - /* - * training rusage - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *User = nullptr; - static thread_local RRDDIM *System = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_usage_for_" << RH->machine_guid; - NameSS << "training_usage_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_usage", // ctx - "Training resource usage", // title - "milliseconds/s", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_USAGE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - User = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - System = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } - - rrddim_set_by_pointer(RS, User, TrainingRU.ru_utime.tv_sec * 1000000ULL + TrainingRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, System, TrainingRU.ru_stime.tv_sec * 1000000ULL + TrainingRU.ru_stime.tv_usec); - - rrdset_done(RS); - } -} - -void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) { - /* - * queue stats - */ - { - static thread_local RRDSET *RS = nullptr; 
- - static thread_local RRDDIM *QueueSize = nullptr; - static thread_local RRDDIM *PoppedItems = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "queue_stats_on_" << localhost->machine_guid; - NameSS << "queue_stats_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.queue_stats", // ctx - "Training queue stats", // title - "items", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_QUEUE_STATS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - QueueSize = rrddim_add(RS, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PoppedItems = rrddim_add(RS, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, QueueSize, TS.QueueSize); - rrddim_set_by_pointer(RS, PoppedItems, TS.NumPoppedItems); - - rrdset_done(RS); - } - - /* - * training stats - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *Allotted = nullptr; - static thread_local RRDDIM *Consumed = nullptr; - static thread_local RRDDIM *Remaining = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_time_stats_on_" << localhost->machine_guid; - NameSS << "training_time_stats_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_time_stats", // ctx - "Training time stats", // title - "milliseconds", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - Allotted = rrddim_add(RS, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - Consumed = rrddim_add(RS, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - Remaining = rrddim_add(RS, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, Allotted, TS.AllottedUT); - rrddim_set_by_pointer(RS, Consumed, TS.ConsumedUT); - rrddim_set_by_pointer(RS, Remaining, TS.RemainingUT); - - rrdset_done(RS); - } - - /* - * training result stats - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *Ok = nullptr; - static thread_local RRDDIM *InvalidQueryTimeRange = nullptr; - static thread_local RRDDIM *NotEnoughCollectedValues = nullptr; - static thread_local RRDDIM *NullAcquiredDimension = nullptr; - static thread_local RRDDIM *ChartUnderReplication = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_results_on_" << localhost->machine_guid; - NameSS << "training_results_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_results", // ctx - "Training results", // title - "events", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_RESULTS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - Ok = rrddim_add(RS, "ok", NULL, 1, 1, 
RRD_ALGORITHM_ABSOLUTE); - InvalidQueryTimeRange = rrddim_add(RS, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - NotEnoughCollectedValues = rrddim_add(RS, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - NullAcquiredDimension = rrddim_add(RS, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - ChartUnderReplication = rrddim_add(RS, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, Ok, TS.TrainingResultOk); - rrddim_set_by_pointer(RS, InvalidQueryTimeRange, TS.TrainingResultInvalidQueryTimeRange); - rrddim_set_by_pointer(RS, NotEnoughCollectedValues, TS.TrainingResultNotEnoughCollectedValues); - rrddim_set_by_pointer(RS, NullAcquiredDimension, TS.TrainingResultNullAcquiredDimension); - rrddim_set_by_pointer(RS, ChartUnderReplication, TS.TrainingResultChartUnderReplication); - - rrdset_done(RS); - } -} diff --git a/ml/ADCharts.h b/ml/ADCharts.h deleted file mode 100644 index ee09669e2..000000000 --- a/ml/ADCharts.h +++ /dev/null @@ -1,21 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_ADCHARTS_H -#define ML_ADCHARTS_H - -#include "Stats.h" -#include "ml-private.h" - -namespace ml { - -void updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS); - -void updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate); - -void updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionRU, const struct rusage &TrainingRU); - -void updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS); - -} // namespace ml - -#endif /* ML_ADCHARTS_H */ diff --git a/ml/Chart.cc b/ml/Chart.cc deleted file mode 100644 index e69de29bb..000000000 diff --git a/ml/Chart.h b/ml/Chart.h deleted file mode 100644 index dbd6a910f..000000000 --- a/ml/Chart.h +++ /dev/null @@ -1,128 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_CHART_H -#define ML_CHART_H - -#include "Config.h" -#include "Dimension.h" - -#include "ml-private.h" -#include "json/single_include/nlohmann/json.hpp" - -namespace ml -{ - -class Chart { -public: - Chart(RRDSET *RS) : - RS(RS), - MLS() - { } - - RRDSET *getRS() const { - return RS; - } - - bool isAvailableForML() { - return rrdset_is_available_for_exporting_and_alarms(RS); - } - - void addDimension(Dimension *D) { - std::lock_guard L(M); - Dimensions[D->getRD()] = D; - } - - void removeDimension(Dimension *D) { - std::lock_guard L(M); - Dimensions.erase(D->getRD()); - } - - void getModelsAsJson(nlohmann::json &Json) { - std::lock_guard L(M); - - for (auto &DP : Dimensions) { - Dimension *D = DP.second; - nlohmann::json JsonArray = nlohmann::json::array(); - for (const KMeans &KM : D->getModels()) { - nlohmann::json J; - KM.toJson(J); - JsonArray.push_back(J); - } - - Json[getMLDimensionID(D->getRD())] = JsonArray; - } - } - - void updateBegin() { - M.lock(); - MLS = {}; - } - - void updateDimension(Dimension *D, bool IsAnomalous) { - switch (D->getMLS()) { - case MachineLearningStatus::DisabledDueToUniqueUpdateEvery: - MLS.NumMachineLearningStatusDisabledUE++; - return; - case MachineLearningStatus::DisabledDueToExcludedChart: - MLS.NumMachineLearningStatusDisabledSP++; - return; - case MachineLearningStatus::Enabled: { - MLS.NumMachineLearningStatusEnabled++; - - switch (D->getMT()) { - case MetricType::Constant: - MLS.NumMetricTypeConstant++; - MLS.NumTrainingStatusTrained++; - MLS.NumNormalDimensions++; - return; - case MetricType::Variable: - MLS.NumMetricTypeVariable++; - break; - } - - switch (D->getTS()) { - case 
TrainingStatus::Untrained: - MLS.NumTrainingStatusUntrained++; - return; - case TrainingStatus::PendingWithoutModel: - MLS.NumTrainingStatusPendingWithoutModel++; - return; - case TrainingStatus::Trained: - MLS.NumTrainingStatusTrained++; - - MLS.NumAnomalousDimensions += IsAnomalous; - MLS.NumNormalDimensions += !IsAnomalous; - return; - case TrainingStatus::PendingWithModel: - MLS.NumTrainingStatusPendingWithModel++; - - MLS.NumAnomalousDimensions += IsAnomalous; - MLS.NumNormalDimensions += !IsAnomalous; - return; - } - - return; - } - } - } - - void updateEnd() { - M.unlock(); - } - - MachineLearningStats getMLS() { - std::lock_guard L(M); - return MLS; - } - -private: - RRDSET *RS; - MachineLearningStats MLS; - - Mutex M; - std::unordered_map Dimensions; -}; - -} // namespace ml - -#endif /* ML_CHART_H */ diff --git a/ml/Config.cc b/ml/Config.cc index ba3a61445..d451c602c 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -1,15 +1,12 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "Config.h" #include "ml-private.h" -using namespace ml; - /* * Global configuration instance to be shared between training and * prediction threads. */ -Config ml::Cfg; +ml_config_t Cfg; template static T clamp(const T& Value, const T& Min, const T& Max) { @@ -19,96 +16,110 @@ static T clamp(const T& Value, const T& Min, const T& Max) { /* * Initialize global configuration variable. */ -void Config::readMLConfig(void) { - const char *ConfigSectionML = CONFIG_SECTION_ML; +void ml_config_load(ml_config_t *cfg) { + const char *config_section_ml = CONFIG_SECTION_ML; - bool EnableAnomalyDetection = config_get_boolean(ConfigSectionML, "enabled", true); + bool enable_anomaly_detection = config_get_boolean(config_section_ml, "enabled", true); /* * Read values */ - unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600); - unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 900); - unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600); - unsigned NumModelsToUse = config_get_number(ConfigSectionML, "number of models per dimension", 1); + unsigned max_train_samples = config_get_number(config_section_ml, "maximum num samples to train", 4 * 3600); + unsigned min_train_samples = config_get_number(config_section_ml, "minimum num samples to train", 1 * 900); + unsigned train_every = config_get_number(config_section_ml, "train every", 1 * 3600); + unsigned num_models_to_use = config_get_number(config_section_ml, "number of models per dimension", 1); + + unsigned diff_n = config_get_number(config_section_ml, "num samples to diff", 1); + unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3); + unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5); + + double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */); + unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000); - unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1); - unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3); - unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5); + double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99); - double RandomSamplingRatio = config_get_float(ConfigSectionML, "random sampling ratio", 1.0 / 
LagN); - unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000); + double host_anomaly_rate_threshold = config_get_float(config_section_ml, "host anomaly rate threshold", 1.0); + std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); + time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60); - double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99); + size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4); + size_t flush_models_batch_size = config_get_number(config_section_ml, "flush models batch size", 128); - double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 1.0); - std::string AnomalyDetectionGroupingMethod = config_get(ConfigSectionML, "anomaly detection grouping method", "average"); - time_t AnomalyDetectionQueryDuration = config_get_number(ConfigSectionML, "anomaly detection grouping duration", 5 * 60); + bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", true); /* * Clamp */ - MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600, 24 * 3600); - MinTrainSamples = clamp(MinTrainSamples, 1 * 900, 6 * 3600); - TrainEvery = clamp(TrainEvery, 1 * 3600, 6 * 3600); - NumModelsToUse = clamp(NumModelsToUse, 1, 7 * 24); + max_train_samples = clamp(max_train_samples, 1 * 3600, 24 * 3600); + min_train_samples = clamp(min_train_samples, 1 * 900, 6 * 3600); + train_every = clamp(train_every, 1 * 3600, 6 * 3600); + num_models_to_use = clamp(num_models_to_use, 1, 7 * 24); - DiffN = clamp(DiffN, 0u, 1u); - SmoothN = clamp(SmoothN, 0u, 5u); - LagN = clamp(LagN, 1u, 5u); + diff_n = clamp(diff_n, 0u, 1u); + smooth_n = clamp(smooth_n, 0u, 5u); + lag_n = clamp(lag_n, 1u, 5u); - RandomSamplingRatio = clamp(RandomSamplingRatio, 0.2, 1.0); - MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u); + random_sampling_ratio = clamp(random_sampling_ratio, 0.2, 1.0); + max_kmeans_iters = clamp(max_kmeans_iters, 500u, 1000u); - DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00); + dimension_anomaly_rate_threshold = clamp(dimension_anomaly_rate_threshold, 0.01, 5.00); - HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.1, 10.0); - AnomalyDetectionQueryDuration = clamp(AnomalyDetectionQueryDuration, 60, 15 * 60); + host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0); + anomaly_detection_query_duration = clamp(anomaly_detection_query_duration, 60, 15 * 60); + + num_training_threads = clamp(num_training_threads, 1, 128); + flush_models_batch_size = clamp(flush_models_batch_size, 8, 512); /* * Validate */ - if (MinTrainSamples >= MaxTrainSamples) { - error("invalid min/max train samples found (%u >= %u)", MinTrainSamples, MaxTrainSamples); + if (min_train_samples >= max_train_samples) { + error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples); - MinTrainSamples = 1 * 3600; - MaxTrainSamples = 4 * 3600; + min_train_samples = 1 * 3600; + max_train_samples = 4 * 3600; } /* * Assign to config instance */ - Cfg.EnableAnomalyDetection = EnableAnomalyDetection; + cfg->enable_anomaly_detection = enable_anomaly_detection; - Cfg.MaxTrainSamples = MaxTrainSamples; - Cfg.MinTrainSamples = MinTrainSamples; - Cfg.TrainEvery = TrainEvery; - 
Cfg.NumModelsToUse = NumModelsToUse; + cfg->max_train_samples = max_train_samples; + cfg->min_train_samples = min_train_samples; + cfg->train_every = train_every; - Cfg.DiffN = DiffN; - Cfg.SmoothN = SmoothN; - Cfg.LagN = LagN; + cfg->num_models_to_use = num_models_to_use; - Cfg.RandomSamplingRatio = RandomSamplingRatio; - Cfg.MaxKMeansIters = MaxKMeansIters; + cfg->diff_n = diff_n; + cfg->smooth_n = smooth_n; + cfg->lag_n = lag_n; - Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold; + cfg->random_sampling_ratio = random_sampling_ratio; + cfg->max_kmeans_iters = max_kmeans_iters; - Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; - Cfg.AnomalyDetectionGroupingMethod = web_client_api_request_v1_data_group(AnomalyDetectionGroupingMethod.c_str(), RRDR_GROUPING_AVERAGE); - Cfg.AnomalyDetectionQueryDuration = AnomalyDetectionQueryDuration; + cfg->host_anomaly_rate_threshold = host_anomaly_rate_threshold; + cfg->anomaly_detection_grouping_method = + time_grouping_parse(anomaly_detection_grouping_method.c_str(), RRDR_GROUPING_AVERAGE); + cfg->anomaly_detection_query_duration = anomaly_detection_query_duration; + cfg->dimension_anomaly_score_threshold = dimension_anomaly_rate_threshold; - Cfg.HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*"); - Cfg.SP_HostsToSkip = simple_pattern_create(Cfg.HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); + cfg->hosts_to_skip = config_get(config_section_ml, "hosts to skip from training", "!*"); + cfg->sp_host_to_skip = simple_pattern_create(cfg->hosts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true); // Always exclude anomaly_detection charts from training. - Cfg.ChartsToSkip = "anomaly_detection.* "; - Cfg.ChartsToSkip += config_get(ConfigSectionML, "charts to skip from training", "netdata.*"); - Cfg.SP_ChartsToSkip = simple_pattern_create(Cfg.ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); + cfg->charts_to_skip = "anomaly_detection.* "; + cfg->charts_to_skip += config_get(config_section_ml, "charts to skip from training", "netdata.*"); + cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true); + + cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true); + + cfg->num_training_threads = num_training_threads; + cfg->flush_models_batch_size = flush_models_batch_size; - Cfg.StreamADCharts = config_get_boolean(ConfigSectionML, "stream anomaly detection charts", true); + cfg->enable_statistics_charts = enable_statistics_charts; } diff --git a/ml/Config.h b/ml/Config.h deleted file mode 100644 index f10e11492..000000000 --- a/ml/Config.h +++ /dev/null @@ -1,52 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_CONFIG_H -#define ML_CONFIG_H - -#include "ml-private.h" - -namespace ml { - -class Config { -public: - bool EnableAnomalyDetection; - - unsigned MaxTrainSamples; - unsigned MinTrainSamples; - unsigned TrainEvery; - - unsigned NumModelsToUse; - - unsigned DBEngineAnomalyRateEvery; - - unsigned DiffN; - unsigned SmoothN; - unsigned LagN; - - double RandomSamplingRatio; - unsigned MaxKMeansIters; - - double DimensionAnomalyScoreThreshold; - - double HostAnomalyRateThreshold; - RRDR_GROUPING AnomalyDetectionGroupingMethod; - time_t AnomalyDetectionQueryDuration; - - bool StreamADCharts; - - std::string HostsToSkip; - SIMPLE_PATTERN *SP_HostsToSkip; - - std::string ChartsToSkip; - SIMPLE_PATTERN *SP_ChartsToSkip; - - std::vector RandomNums; - - void readMLConfig(); -}; - 
-extern Config Cfg; - -} // namespace ml - -#endif /* ML_CONFIG_H */ diff --git a/ml/Dimension.cc b/ml/Dimension.cc deleted file mode 100644 index db9256895..000000000 --- a/ml/Dimension.cc +++ /dev/null @@ -1,346 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Config.h" -#include "Dimension.h" -#include "Query.h" -#include "Host.h" - -using namespace ml; - -static const char *mls2str(MachineLearningStatus MLS) { - switch (MLS) { - case ml::MachineLearningStatus::Enabled: - return "enabled"; - case ml::MachineLearningStatus::DisabledDueToUniqueUpdateEvery: - return "disabled-ue"; - case ml::MachineLearningStatus::DisabledDueToExcludedChart: - return "disabled-sp"; - default: - return "unknown"; - } -} - -static const char *mt2str(MetricType MT) { - switch (MT) { - case ml::MetricType::Constant: - return "constant"; - case ml::MetricType::Variable: - return "variable"; - default: - return "unknown"; - } -} - -static const char *ts2str(TrainingStatus TS) { - switch (TS) { - case ml::TrainingStatus::PendingWithModel: - return "pending-with-model"; - case ml::TrainingStatus::PendingWithoutModel: - return "pending-without-model"; - case ml::TrainingStatus::Trained: - return "trained"; - case ml::TrainingStatus::Untrained: - return "untrained"; - default: - return "unknown"; - } -} - -static const char *tr2str(TrainingResult TR) { - switch (TR) { - case ml::TrainingResult::Ok: - return "ok"; - case ml::TrainingResult::InvalidQueryTimeRange: - return "invalid-query"; - case ml::TrainingResult::NotEnoughCollectedValues: - return "missing-values"; - case ml::TrainingResult::NullAcquiredDimension: - return "null-acquired-dim"; - case ml::TrainingResult::ChartUnderReplication: - return "chart-under-replication"; - default: - return "unknown"; - } -} - -std::pair Dimension::getCalculatedNumbers(const TrainingRequest &TrainingReq) { - TrainingResponse TrainingResp = {}; - - TrainingResp.RequestTime = TrainingReq.RequestTime; - TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest; - TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest; - - TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0); - TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0); - - size_t MinN = Cfg.MinTrainSamples; - size_t MaxN = Cfg.MaxTrainSamples; - - // Figure out what our time window should be. - TrainingResp.QueryBeforeT = TrainingResp.LastEntryOnResponse; - TrainingResp.QueryAfterT = std::max( - TrainingResp.QueryBeforeT - static_cast((MaxN - 1) * updateEvery()), - TrainingResp.FirstEntryOnResponse - ); - - if (TrainingResp.QueryAfterT >= TrainingResp.QueryBeforeT) { - TrainingResp.Result = TrainingResult::InvalidQueryTimeRange; - return { nullptr, TrainingResp }; - } - - if (rrdset_is_replicating(RD->rrdset)) { - TrainingResp.Result = TrainingResult::ChartUnderReplication; - return { nullptr, TrainingResp }; - } - - CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)](); - - // Start the query. 
- size_t Idx = 0; - - CalculatedNumber LastValue = std::numeric_limits::quiet_NaN(); - Query Q = Query(getRD()); - - Q.init(TrainingResp.QueryAfterT, TrainingResp.QueryBeforeT); - while (!Q.isFinished()) { - if (Idx == MaxN) - break; - - auto P = Q.nextMetric(); - - CalculatedNumber Value = P.second; - - if (netdata_double_isnumber(Value)) { - if (!TrainingResp.DbAfterT) - TrainingResp.DbAfterT = P.first; - TrainingResp.DbBeforeT = P.first; - - CNs[Idx] = Value; - LastValue = CNs[Idx]; - TrainingResp.CollectedValues++; - } else - CNs[Idx] = LastValue; - - Idx++; - } - TrainingResp.TotalValues = Idx; - - if (TrainingResp.CollectedValues < MinN) { - TrainingResp.Result = TrainingResult::NotEnoughCollectedValues; - - delete[] CNs; - return { nullptr, TrainingResp }; - } - - // Find first non-NaN value. - for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TrainingResp.TotalValues--) { } - - // Overwrite NaN values. - if (Idx != 0) - memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TrainingResp.TotalValues); - - TrainingResp.Result = TrainingResult::Ok; - return { CNs, TrainingResp }; -} - -TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) { - auto P = getCalculatedNumbers(TrainingReq); - CalculatedNumber *CNs = P.first; - TrainingResponse TrainingResp = P.second; - - if (TrainingResp.Result != TrainingResult::Ok) { - std::lock_guard L(M); - - MT = MetricType::Constant; - - switch (TS) { - case TrainingStatus::PendingWithModel: - TS = TrainingStatus::Trained; - break; - case TrainingStatus::PendingWithoutModel: - TS = TrainingStatus::Untrained; - break; - default: - break; - } - - TR = TrainingResp; - - LastTrainingTime = TrainingResp.LastEntryOnResponse; - return TrainingResp.Result; - } - - unsigned N = TrainingResp.TotalValues; - unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio; - double SamplingRatio = std::min(static_cast(TargetNumSamples) / N, 1.0); - - SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, - SamplingRatio, Cfg.RandomNums); - std::vector Samples; - SB.preprocess(Samples); - - KMeans KM; - KM.train(Samples, Cfg.MaxKMeansIters); - - { - std::lock_guard L(M); - - if (Models.size() < Cfg.NumModelsToUse) { - Models.push_back(std::move(KM)); - } else { - std::rotate(std::begin(Models), std::begin(Models) + 1, std::end(Models)); - Models[Models.size() - 1] = std::move(KM); - } - - MT = MetricType::Constant; - TS = TrainingStatus::Trained; - TR = TrainingResp; - LastTrainingTime = rrddim_last_entry_s(RD); - } - - delete[] CNs; - return TrainingResp.Result; -} - -void Dimension::scheduleForTraining(time_t CurrT) { - switch (MT) { - case MetricType::Constant: { - return; - } default: - break; - } - - switch (TS) { - case TrainingStatus::PendingWithModel: - case TrainingStatus::PendingWithoutModel: - break; - case TrainingStatus::Untrained: { - Host *H = reinterpret_cast(RD->rrdset->rrdhost->ml_host); - TS = TrainingStatus::PendingWithoutModel; - H->scheduleForTraining(getTrainingRequest(CurrT)); - break; - } - case TrainingStatus::Trained: { - bool NeedsTraining = (time_t)(LastTrainingTime + (Cfg.TrainEvery * updateEvery())) < CurrT; - - if (NeedsTraining) { - Host *H = reinterpret_cast(RD->rrdset->rrdhost->ml_host); - TS = TrainingStatus::PendingWithModel; - H->scheduleForTraining(getTrainingRequest(CurrT)); - } - break; - } - } -} - -bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) { - // Nothing to do if ML is disabled for this dimension - if (MLS != MachineLearningStatus::Enabled) - 
return false; - - // Don't treat values that don't exist as anomalous - if (!Exists) { - CNs.clear(); - return false; - } - - // Save the value and return if we don't have enough values for a sample - unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; - if (CNs.size() < N) { - CNs.push_back(Value); - return false; - } - - // Push the value and check if it's different from the last one - bool SameValue = true; - std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs)); - if (CNs[N - 1] != Value) - SameValue = false; - CNs[N - 1] = Value; - - // Create the sample - CalculatedNumber TmpCNs[N * (Cfg.LagN + 1)]; - memset(TmpCNs, 0, N * (Cfg.LagN + 1) * sizeof(CalculatedNumber)); - std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber)); - SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, - Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, - 1.0, Cfg.RandomNums); - SB.preprocess(Feature); - - /* - * Lock to predict and possibly schedule the dimension for training - */ - - std::unique_lock L(M, std::defer_lock); - if (!L.try_lock()) { - return false; - } - - // Mark the metric time as variable if we received different values - if (!SameValue) - MT = MetricType::Variable; - - // Decide if the dimension needs to be scheduled for training - scheduleForTraining(CurrT); - - // Nothing to do if we don't have a model - switch (TS) { - case TrainingStatus::Untrained: - case TrainingStatus::PendingWithoutModel: - return false; - default: - break; - } - - /* - * Use the KMeans models to check if the value is anomalous - */ - - size_t ModelsConsulted = 0; - size_t Sum = 0; - - for (const auto &KM : Models) { - ModelsConsulted++; - - double AnomalyScore = KM.anomalyScore(Feature); - if (AnomalyScore == std::numeric_limits::quiet_NaN()) - continue; - - if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) { - global_statistics_ml_models_consulted(ModelsConsulted); - return false; - } - - Sum += 1; - } - - global_statistics_ml_models_consulted(ModelsConsulted); - return Sum; -} - -std::vector Dimension::getModels() { - std::unique_lock L(M); - return Models; -} - -void Dimension::dump() const { - const char *ChartId = rrdset_id(RD->rrdset); - const char *DimensionId = rrddim_id(RD); - - const char *MLS_Str = mls2str(MLS); - const char *MT_Str = mt2str(MT); - const char *TS_Str = ts2str(TS); - const char *TR_Str = tr2str(TR.Result); - - const char *fmt = - "[ML] %s.%s: MLS=%s, MT=%s, TS=%s, Result=%s, " - "ReqTime=%ld, FEOReq=%ld, LEOReq=%ld, " - "FEOResp=%ld, LEOResp=%ld, QTR=<%ld, %ld>, DBTR=<%ld, %ld>, Collected=%zu, Total=%zu"; - - error(fmt, - ChartId, DimensionId, MLS_Str, MT_Str, TS_Str, TR_Str, - TR.RequestTime, TR.FirstEntryOnRequest, TR.LastEntryOnRequest, - TR.FirstEntryOnResponse, TR.LastEntryOnResponse, - TR.QueryAfterT, TR.QueryBeforeT, TR.DbAfterT, TR.DbBeforeT, TR.CollectedValues, TR.TotalValues - ); -} diff --git a/ml/Dimension.h b/ml/Dimension.h deleted file mode 100644 index 2b1adfff9..000000000 --- a/ml/Dimension.h +++ /dev/null @@ -1,198 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_DIMENSION_H -#define ML_DIMENSION_H - -#include "Mutex.h" -#include "Stats.h" -#include "Query.h" -#include "Config.h" - -#include "ml-private.h" - -namespace ml { - -static inline std::string getMLDimensionID(RRDDIM *RD) { - RRDSET *RS = RD->rrdset; - - std::stringstream SS; - SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD); - return SS.str(); -} - -enum class MachineLearningStatus { - // Enable training/prediction - Enabled, - - // Disable due to update every 
being different from the host's - DisabledDueToUniqueUpdateEvery, - - // Disable because configuration pattern matches the chart's id - DisabledDueToExcludedChart, -}; - -enum class TrainingStatus { - // We don't have a model for this dimension - Untrained, - - // Request for training sent, but we don't have any models yet - PendingWithoutModel, - - // Request to update existing models sent - PendingWithModel, - - // Have a valid, up-to-date model - Trained, -}; - -enum class MetricType { - // The dimension has constant values, no need to train - Constant, - - // The dimension's values fluctuate, we need to generate a model - Variable, -}; - -struct TrainingRequest { - // Chart/dimension we want to train - STRING *ChartId; - STRING *DimensionId; - - // Creation time of request - time_t RequestTime; - - // First/last entry of this dimension in DB - // at the point the request was made - time_t FirstEntryOnRequest; - time_t LastEntryOnRequest; -}; - -void dumpTrainingRequest(const TrainingRequest &TrainingReq, const char *Prefix); - -enum TrainingResult { - // We managed to create a KMeans model - Ok, - // Could not query DB with a correct time range - InvalidQueryTimeRange, - // Did not gather enough data from DB to run KMeans - NotEnoughCollectedValues, - // Acquired a null dimension - NullAcquiredDimension, - // Chart is under replication - ChartUnderReplication, -}; - -struct TrainingResponse { - // Time when the request for this response was made - time_t RequestTime; - - // First/last entry of the dimension in DB when generating the request - time_t FirstEntryOnRequest; - time_t LastEntryOnRequest; - - // First/last entry of the dimension in DB when generating the response - time_t FirstEntryOnResponse; - time_t LastEntryOnResponse; - - // After/Before timestamps of our DB query - time_t QueryAfterT; - time_t QueryBeforeT; - - // Actual after/before returned by the DB query ops - time_t DbAfterT; - time_t DbBeforeT; - - // Number of doubles returned by the DB query - size_t CollectedValues; - - // Number of values we return to the caller - size_t TotalValues; - - // Result of training response - TrainingResult Result; -}; - -void dumpTrainingResponse(const TrainingResponse &TrainingResp, const char *Prefix); - -class Dimension { -public: - Dimension(RRDDIM *RD) : - RD(RD), - MT(MetricType::Constant), - TS(TrainingStatus::Untrained), - TR(), - LastTrainingTime(0) - { - if (simple_pattern_matches(Cfg.SP_ChartsToSkip, rrdset_name(RD->rrdset))) - MLS = MachineLearningStatus::DisabledDueToExcludedChart; - else if (RD->update_every != RD->rrdset->rrdhost->rrd_update_every) - MLS = MachineLearningStatus::DisabledDueToUniqueUpdateEvery; - else - MLS = MachineLearningStatus::Enabled; - - Models.reserve(Cfg.NumModelsToUse); - } - - RRDDIM *getRD() const { - return RD; - } - - unsigned updateEvery() const { - return RD->update_every; - } - - MetricType getMT() const { - return MT; - } - - TrainingStatus getTS() const { - return TS; - } - - MachineLearningStatus getMLS() const { - return MLS; - } - - TrainingResult trainModel(const TrainingRequest &TR); - - void scheduleForTraining(time_t CurrT); - - bool predict(time_t CurrT, CalculatedNumber Value, bool Exists); - - std::vector getModels(); - - void dump() const; - -private: - TrainingRequest getTrainingRequest(time_t CurrT) const { - return TrainingRequest { - string_dup(RD->rrdset->id), - string_dup(RD->id), - CurrT, - rrddim_first_entry_s(RD), - rrddim_last_entry_s(RD) - }; - } - -private: - std::pair getCalculatedNumbers(const 
TrainingRequest &TrainingReq); - -public: - RRDDIM *RD; - MetricType MT; - TrainingStatus TS; - TrainingResponse TR; - - time_t LastTrainingTime; - - MachineLearningStatus MLS; - - std::vector CNs; - DSample Feature; - std::vector Models; - Mutex M; -}; - -} // namespace ml - -#endif /* ML_DIMENSION_H */ diff --git a/ml/Host.cc b/ml/Host.cc deleted file mode 100644 index a5f276a80..000000000 --- a/ml/Host.cc +++ /dev/null @@ -1,387 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Config.h" -#include "Host.h" -#include "Queue.h" -#include "ADCharts.h" - -#include "json/single_include/nlohmann/json.hpp" - -using namespace ml; - -void Host::addChart(Chart *C) { - std::lock_guard L(M); - Charts[C->getRS()] = C; -} - -void Host::removeChart(Chart *C) { - std::lock_guard L(M); - Charts.erase(C->getRS()); -} - -void Host::getConfigAsJson(nlohmann::json &Json) const { - Json["version"] = 1; - - Json["enabled"] = Cfg.EnableAnomalyDetection; - - Json["min-train-samples"] = Cfg.MinTrainSamples; - Json["max-train-samples"] = Cfg.MaxTrainSamples; - Json["train-every"] = Cfg.TrainEvery; - - Json["diff-n"] = Cfg.DiffN; - Json["smooth-n"] = Cfg.SmoothN; - Json["lag-n"] = Cfg.LagN; - - Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio; - Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; - - Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; - - Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; - Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod); - Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration; - - Json["hosts-to-skip"] = Cfg.HostsToSkip; - Json["charts-to-skip"] = Cfg.ChartsToSkip; -} - -void Host::getModelsAsJson(nlohmann::json &Json) { - std::lock_guard L(M); - - for (auto &CP : Charts) { - Chart *C = CP.second; - C->getModelsAsJson(Json); - } -} - -#define WORKER_JOB_DETECTION_PREP 0 -#define WORKER_JOB_DETECTION_DIM_CHART 1 -#define WORKER_JOB_DETECTION_HOST_CHART 2 -#define WORKER_JOB_DETECTION_STATS 3 -#define WORKER_JOB_DETECTION_RESOURCES 4 - -void Host::detectOnce() { - worker_is_busy(WORKER_JOB_DETECTION_PREP); - - MLS = {}; - MachineLearningStats MLSCopy = {}; - TrainingStats TSCopy = {}; - - { - std::lock_guard L(M); - - /* - * prediction/detection stats - */ - for (auto &CP : Charts) { - Chart *C = CP.second; - - if (!C->isAvailableForML()) - continue; - - MachineLearningStats ChartMLS = C->getMLS(); - - MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled; - MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE; - MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP; - - MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant; - MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable; - - MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained; - MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel; - MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained; - MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel; - - MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions; - MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions; - } - - HostAnomalyRate = 0.0; - size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; - if (NumActiveDimensions) - HostAnomalyRate = static_cast(MLS.NumAnomalousDimensions) / NumActiveDimensions; 
- - MLSCopy = MLS; - - /* - * training stats - */ - TSCopy = TS; - - TS.QueueSize = 0; - TS.NumPoppedItems = 0; - - TS.AllottedUT = 0; - TS.ConsumedUT = 0; - TS.RemainingUT = 0; - - TS.TrainingResultOk = 0; - TS.TrainingResultInvalidQueryTimeRange = 0; - TS.TrainingResultNotEnoughCollectedValues = 0; - TS.TrainingResultNullAcquiredDimension = 0; - TS.TrainingResultChartUnderReplication = 0; - } - - // Calc the avg values - if (TSCopy.NumPoppedItems) { - TSCopy.QueueSize /= TSCopy.NumPoppedItems; - TSCopy.AllottedUT /= TSCopy.NumPoppedItems; - TSCopy.ConsumedUT /= TSCopy.NumPoppedItems; - TSCopy.RemainingUT /= TSCopy.NumPoppedItems; - - TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems; - } else { - TSCopy.QueueSize = 0; - TSCopy.AllottedUT = 0; - TSCopy.ConsumedUT = 0; - TSCopy.RemainingUT = 0; - } - - if(!RH) - return; - - worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); - updateDimensionsChart(RH, MLSCopy); - - worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); - updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0); - -#ifdef NETDATA_ML_RESOURCE_CHARTS - worker_is_busy(WORKER_JOB_DETECTION_RESOURCES); - struct rusage PredictionRU; - getrusage(RUSAGE_THREAD, &PredictionRU); - updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU); -#endif - - worker_is_busy(WORKER_JOB_DETECTION_STATS); - updateTrainingStatisticsChart(RH, TSCopy); -} - -class AcquiredDimension { -public: - static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) { - RRDDIM_ACQUIRED *AcqRD = nullptr; - Dimension *D = nullptr; - - RRDSET *RS = rrdset_find(RH, string2str(ChartId)); - if (RS) { - AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId)); - if (AcqRD) { - RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD); - if (RD) - D = reinterpret_cast(RD->ml_dimension); - } - } - - return AcquiredDimension(AcqRD, D); - } - -private: - AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {} - -public: - TrainingResult train(const TrainingRequest &TR) { - if (!D) - return TrainingResult::NullAcquiredDimension; - - return D->trainModel(TR); - } - - ~AcquiredDimension() { - if (AcqRD) - rrddim_acquired_release(AcqRD); - } - -private: - RRDDIM_ACQUIRED *AcqRD; - Dimension *D; -}; - -void Host::scheduleForTraining(TrainingRequest TR) { - TrainingQueue.push(TR); -} - -#define WORKER_JOB_TRAINING_FIND 0 -#define WORKER_JOB_TRAINING_TRAIN 1 -#define WORKER_JOB_TRAINING_STATS 2 - -void Host::train() { - worker_register("MLTRAIN"); - worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find"); - worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); - worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); - - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); - - while (service_running(SERVICE_ML_TRAINING)) { - auto P = TrainingQueue.pop(); - TrainingRequest TrainingReq = P.first; - size_t Size = P.second; - - if (ThreadsCancelled) { - info("Stopping training thread because it was cancelled."); - break; - } - - usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size; - if (AllottedUT > USEC_PER_SEC) - AllottedUT = USEC_PER_SEC; - - usec_t StartUT = now_monotonic_usec(); - TrainingResult 
TrainingRes; - { - worker_is_busy(WORKER_JOB_TRAINING_FIND); - AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId); - - worker_is_busy(WORKER_JOB_TRAINING_TRAIN); - TrainingRes = AcqDim.train(TrainingReq); - - string_freez(TrainingReq.ChartId); - string_freez(TrainingReq.DimensionId); - } - usec_t ConsumedUT = now_monotonic_usec() - StartUT; - - worker_is_busy(WORKER_JOB_TRAINING_STATS); - - usec_t RemainingUT = 0; - if (ConsumedUT < AllottedUT) - RemainingUT = AllottedUT - ConsumedUT; - - { - std::lock_guard L(M); - - if (TS.AllottedUT == 0) { - struct rusage TRU; - getrusage(RUSAGE_THREAD, &TRU); - TS.TrainingRU = TRU; - } - - TS.QueueSize += Size; - TS.NumPoppedItems += 1; - - TS.AllottedUT += AllottedUT; - TS.ConsumedUT += ConsumedUT; - TS.RemainingUT += RemainingUT; - - switch (TrainingRes) { - case TrainingResult::Ok: - TS.TrainingResultOk += 1; - break; - case TrainingResult::InvalidQueryTimeRange: - TS.TrainingResultInvalidQueryTimeRange += 1; - break; - case TrainingResult::NotEnoughCollectedValues: - TS.TrainingResultNotEnoughCollectedValues += 1; - break; - case TrainingResult::NullAcquiredDimension: - TS.TrainingResultNullAcquiredDimension += 1; - break; - case TrainingResult::ChartUnderReplication: - TS.TrainingResultChartUnderReplication += 1; - break; - } - } - - worker_is_idle(); - std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT}); - worker_is_busy(0); - } -} - -void Host::detect() { - worker_register("MLDETECT"); - worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep"); - worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart"); - worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart"); - worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats"); - worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources"); - - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); - - heartbeat_t HB; - heartbeat_init(&HB); - - while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) { - worker_is_idle(); - heartbeat_next(&HB, (RH ? 
RH->rrd_update_every : default_rrd_update_every) * USEC_PER_SEC); - detectOnce(); - } -} - -void Host::getDetectionInfoAsJson(nlohmann::json &Json) const { - Json["version"] = 1; - Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions; - Json["normal-dimensions"] = MLS.NumNormalDimensions; - Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; - Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel; -} - -void *train_main(void *Arg) { - Host *H = reinterpret_cast(Arg); - H->train(); - return nullptr; -} - -void *detect_main(void *Arg) { - Host *H = reinterpret_cast(Arg); - H->detect(); - return nullptr; -} - -void Host::startAnomalyDetectionThreads() { - if (ThreadsRunning) { - error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH)); - return; - } - - ThreadsRunning = true; - ThreadsCancelled = false; - ThreadsJoined = false; - - char Tag[NETDATA_THREAD_TAG_MAX + 1]; - -// #define ML_DISABLE_JOINING - - snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast(this)); - - snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH)); - netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast(this)); -} - -void Host::stopAnomalyDetectionThreads(bool join) { - if (!ThreadsRunning) { - error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH)); - return; - } - - if(!ThreadsCancelled) { - ThreadsCancelled = true; - - // Signal the training queue to stop popping-items - TrainingQueue.signal(); - netdata_thread_cancel(TrainingThread); - netdata_thread_cancel(DetectionThread); - } - - if (join && !ThreadsJoined) { - ThreadsJoined = true; - ThreadsRunning = false; - - // these fail on alpine linux and our CI hangs forever - // failing to compile static builds - - // commenting them, until we find a solution - - // to enable again: - // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE - - netdata_thread_join(TrainingThread, nullptr); - netdata_thread_join(DetectionThread, nullptr); - } -} diff --git a/ml/Host.h b/ml/Host.h deleted file mode 100644 index 289cb5ab7..000000000 --- a/ml/Host.h +++ /dev/null @@ -1,70 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_HOST_H -#define ML_HOST_H - -#include "Mutex.h" -#include "Config.h" -#include "Dimension.h" -#include "Chart.h" -#include "Queue.h" - -#include "ml-private.h" -#include "json/single_include/nlohmann/json.hpp" - -namespace ml -{ - -class Host { - -friend void* train_main(void *); -friend void *detect_main(void *); - -public: - Host(RRDHOST *RH) : - RH(RH), - MLS(), - TS(), - HostAnomalyRate(0.0), - ThreadsRunning(false), - ThreadsCancelled(false), - ThreadsJoined(false) - {} - - void addChart(Chart *C); - void removeChart(Chart *C); - - void getConfigAsJson(nlohmann::json &Json) const; - void getModelsAsJson(nlohmann::json &Json); - void getDetectionInfoAsJson(nlohmann::json &Json) const; - - void startAnomalyDetectionThreads(); - void stopAnomalyDetectionThreads(bool join); - - void scheduleForTraining(TrainingRequest TR); - void train(); - - void detect(); - void detectOnce(); - -private: - RRDHOST *RH; - MachineLearningStats MLS; - TrainingStats TS; - CalculatedNumber HostAnomalyRate{0.0}; - std::atomic ThreadsRunning; - std::atomic ThreadsCancelled; - std::atomic 
ThreadsJoined; - - Queue TrainingQueue; - - Mutex M; - std::unordered_map Charts; - - netdata_thread_t TrainingThread; - netdata_thread_t DetectionThread; -}; - -} // namespace ml - -#endif /* ML_HOST_H */ diff --git a/ml/KMeans.cc b/ml/KMeans.cc deleted file mode 100644 index edc2ef49e..000000000 --- a/ml/KMeans.cc +++ /dev/null @@ -1,43 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "KMeans.h" -#include - -void KMeans::train(const std::vector &Samples, size_t MaxIterations) { - MinDist = std::numeric_limits::max(); - MaxDist = std::numeric_limits::min(); - - ClusterCenters.clear(); - - dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples); - dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations); - - for (const auto &S : Samples) { - CalculatedNumber MeanDist = 0.0; - - for (const auto &KMCenter : ClusterCenters) - MeanDist += dlib::length(KMCenter - S); - - MeanDist /= NumClusters; - - if (MeanDist < MinDist) - MinDist = MeanDist; - - if (MeanDist > MaxDist) - MaxDist = MeanDist; - } -} - -CalculatedNumber KMeans::anomalyScore(const DSample &Sample) const { - CalculatedNumber MeanDist = 0.0; - for (const auto &CC: ClusterCenters) - MeanDist += dlib::length(CC - Sample); - - MeanDist /= NumClusters; - - if (MaxDist == MinDist) - return 0.0; - - CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist)); - return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore; -} diff --git a/ml/KMeans.h b/ml/KMeans.h deleted file mode 100644 index 0398eeb86..000000000 --- a/ml/KMeans.h +++ /dev/null @@ -1,41 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef KMEANS_H -#define KMEANS_H - -#include -#include -#include -#include - -#include "SamplesBuffer.h" -#include "json/single_include/nlohmann/json.hpp" - -class KMeans { -public: - KMeans(size_t NumClusters = 2) : NumClusters(NumClusters) { - MinDist = std::numeric_limits::max(); - MaxDist = std::numeric_limits::min(); - }; - - void train(const std::vector &Samples, size_t MaxIterations); - CalculatedNumber anomalyScore(const DSample &Sample) const; - - void toJson(nlohmann::json &J) const { - J = nlohmann::json{ - {"CCs", ClusterCenters}, - {"MinDist", MinDist}, - {"MaxDist", MaxDist} - }; - } - -private: - size_t NumClusters; - - std::vector ClusterCenters; - - CalculatedNumber MinDist; - CalculatedNumber MaxDist; -}; - -#endif /* KMEANS_H */ diff --git a/ml/Mutex.h b/ml/Mutex.h deleted file mode 100644 index fcdb75313..000000000 --- a/ml/Mutex.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef ML_MUTEX_H -#define ML_MUTEX_H - -#include "ml-private.h" - -class Mutex { -public: - Mutex() { - netdata_mutex_init(&M); - } - - void lock() { - netdata_mutex_lock(&M); - } - - void unlock() { - netdata_mutex_unlock(&M); - } - - bool try_lock() { - return netdata_mutex_trylock(&M) == 0; - } - - netdata_mutex_t *inner() { - return &M; - } - - ~Mutex() { - netdata_mutex_destroy(&M); - } - -private: - netdata_mutex_t M; -}; - -#endif /* ML_MUTEX_H */ diff --git a/ml/Query.h b/ml/Query.h deleted file mode 100644 index 42a96e85b..000000000 --- a/ml/Query.h +++ /dev/null @@ -1,57 +0,0 @@ -#ifndef QUERY_H -#define QUERY_H - -#include "ml-private.h" - -namespace ml { - -class Query { -public: - Query(RRDDIM *RD) : RD(RD), Initialized(false) { - Ops = RD->tiers[0].query_ops; - } - - time_t latestTime() { - return Ops->latest_time_s(RD->tiers[0].db_metric_handle); - } - - time_t oldestTime() { - return Ops->oldest_time_s(RD->tiers[0].db_metric_handle); - } - - void init(time_t AfterT, 
time_t BeforeT) { - Ops->init(RD->tiers[0].db_metric_handle, &Handle, AfterT, BeforeT, STORAGE_PRIORITY_BEST_EFFORT); - Initialized = true; - points_read = 0; - } - - bool isFinished() { - return Ops->is_finished(&Handle); - } - - ~Query() { - if (Initialized) { - Ops->finalize(&Handle); - global_statistics_ml_query_completed(points_read); - points_read = 0; - } - } - - std::pair nextMetric() { - points_read++; - STORAGE_POINT sp = Ops->next_metric(&Handle); - return {sp.end_time_s, sp.sum / sp.count }; - } - -private: - RRDDIM *RD; - bool Initialized; - size_t points_read; - - struct storage_engine_query_ops *Ops; - struct storage_engine_query_handle Handle; -}; - -} // namespace ml - -#endif /* QUERY_H */ diff --git a/ml/Queue.h b/ml/Queue.h deleted file mode 100644 index 37a74bd07..000000000 --- a/ml/Queue.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef QUEUE_H -#define QUEUE_H - -#include "ml-private.h" -#include "Mutex.h" -#include -#include -#include - -template -class Queue { -public: - Queue(void) : Q(), M() { - pthread_cond_init(&CV, nullptr); - Exit = false; - } - - ~Queue() { - pthread_cond_destroy(&CV); - } - - void push(T t) { - std::lock_guard L(M); - - Q.push(t); - pthread_cond_signal(&CV); - } - - std::pair pop(void) { - std::lock_guard L(M); - - while (Q.empty()) { - pthread_cond_wait(&CV, M.inner()); - - if (Exit) { - // This should happen only when we are destroying a host. - // Callers should use a flag dedicated to checking if we - // are about to delete the host or exit the agent. The original - // implementation would call pthread_exit which would cause - // the queue's mutex to be destroyed twice (and fail on the - // 2nd time) - return { T(), 0 }; - } - } - - T V = Q.front(); - size_t Size = Q.size(); - Q.pop(); - - return { V, Size }; - } - - void signal() { - std::lock_guard L(M); - Exit = true; - pthread_cond_signal(&CV); - } - -private: - std::queue Q; - Mutex M; - pthread_cond_t CV; - std::atomic Exit; -}; - -#endif /* QUEUE_H */ diff --git a/ml/README.md b/ml/README.md index 7f3ed276b..ac7c7c013 100644 --- a/ml/README.md +++ b/ml/README.md @@ -5,18 +5,22 @@ description: "This is an in-depth look at how Netdata uses ML to detect anomalie sidebar_label: "Configure machine learning (ML) powered anomaly detection" learn_status: "Published" learn_topic_type: "Tasks" -learn_rel_path: "Setup" +learn_rel_path: "Configuration" --> # Machine learning (ML) powered anomaly detection ## Overview +Machine learning is a subfield of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed. In monitoring, machine learning can be used to detect patterns and anomalies in large datasets, enabling users to identify potential issues before they become critical. The importance of machine learning in monitoring lies in its ability to analyze vast amounts of data in real-time and provide actionable insights that can help optimize system performance and prevent downtime. Machine learning can also improve the efficiency and scalability of monitoring systems, enabling organizations to monitor complex infrastructures and applications with ease. + +The primary goal of implementing machine learning features in Netdata is to enable users to detect and alert on anomalies in their systems with advanced anomaly detection capabilities. 
Netdata's machine learning features are designed to be highly customizable and scalable, so users can tailor the ML models and training process to their specific requirements and monitor systems of any size or complexity. + As of [`v1.32.0`](https://github.com/netdata/netdata/releases/tag/v1.32.0), Netdata comes with ML powered [anomaly detection](https://en.wikipedia.org/wiki/Anomaly_detection) capabilities built into it and available to use out of the box, with zero configuration required (ML was enabled by default in `v1.35.0-29-nightly` in [this PR](https://github.com/netdata/netdata/pull/13158), previously it required a one line config change). 🚧 **Note**: If you would like to get involved and help us with some feedback, email us at analytics-ml-team@netdata.cloud, comment on the [beta launch post](https://community.netdata.cloud/t/anomaly-advisor-beta-launch/2717) in the Netdata community, or come join us in the [🤖-ml-powered-monitoring](https://discord.gg/4eRSEUpJnc) channel of the Netdata discord. -Once ML is enabled, Netdata will begin training a model for each dimension. By default this model is a [k-means clustering](https://en.wikipedia.org/wiki/K-means_clustering) model trained on the most recent 4 hours of data. Rather than just using the most recent value of each raw metric, the model works on a preprocessed ["feature vector"](#feature-vector) of recent smoothed and differenced values. This should enable the model to detect a wider range of potentially anomalous patterns in recent observations as opposed to just point anomalies like big spikes or drops. ([This infographic](https://user-images.githubusercontent.com/2178292/144414415-275a3477-5b47-43d6-8959-509eb48ebb20.png) shows some different types of anomalies.) +Once ML is enabled, Netdata will begin training a model for each dimension. By default this model is a [k-means clustering](https://en.wikipedia.org/wiki/K-means_clustering) model trained on the most recent 4 hours of data. Rather than just using the most recent value of each raw metric, the model works on a preprocessed ["feature vector"](#feature-vector) of recent smoothed and differenced values. This enables the model to detect a wider range of potentially anomalous patterns in recent observations as opposed to just point anomalies like big spikes or drops. ([This infographic](https://user-images.githubusercontent.com/2178292/144414415-275a3477-5b47-43d6-8959-509eb48ebb20.png) shows some different types of anomalies). The sections below will introduce some of the main concepts: - anomaly bit @@ -114,7 +118,9 @@ To enable or disable anomaly detection: 2. In the `[ml]` section, set `enabled = yes` to enable or `enabled = no` to disable. 3. Restart netdata (typically `sudo systemctl restart netdata`). -**Note**: If you would like to learn more about configuring Netdata please see [the configuration guide](https://github.com/netdata/netdata/blob/master/docs/guides/step-by-step/step-04.md). +> 📑 Note +> +> If you would like to learn more about configuring Netdata please see the [Configuration section](https://github.com/netdata/netdata/blob/master/docs/configure/nodes.md) of our documentation. Below is a list of all the available configuration params and their default values. @@ -266,3 +272,4 @@ The anomaly rate across all dimensions of a node. - You should benchmark Netdata resource usage before and after enabling ML. Typical overhead ranges from 1-2% additional CPU at most. 
- The "anomaly bit" has been implemented to be a building block to underpin many more ML based use cases that we plan to deliver soon. - At its core Netdata uses an approach and problem formulation very similar to the Netdata python [anomalies collector](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/anomalies/README.md), just implemented in a much much more efficient and scalable way in the agent in c++. So if you would like to learn more about the approach and are familiar with Python that is a useful resource to explore, as is the corresponding [deep dive tutorial](https://nbviewer.org/github/netdata/community/blob/main/netdata-agent-api/netdata-pandas/anomalies_collector_deepdive.ipynb) where the default model used is PCA instead of K-Means but the overall approach and formulation is similar. +- Check out our ML related blog posts over at [https://blog.netdata.cloud](https://blog.netdata.cloud/tags/machine-learning) diff --git a/ml/SamplesBuffer.cc b/ml/SamplesBuffer.cc deleted file mode 100644 index 359b60c23..000000000 --- a/ml/SamplesBuffer.cc +++ /dev/null @@ -1,183 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -// -#include "SamplesBuffer.h" - -#include -#include -#include - -void Sample::print(std::ostream &OS) const { - for (size_t Idx = 0; Idx != NumDims - 1; Idx++) - OS << CNs[Idx] << ", "; - - OS << CNs[NumDims - 1]; -} - -void SamplesBuffer::print(std::ostream &OS) const { - for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0; - Idx != NumSamples; Idx++) { - Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx); - OS << S << std::endl; - } -} - -std::vector SamplesBuffer::getPreprocessedSamples() const { - std::vector V; - - for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0; - Idx != NumSamples; Idx++) { - Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx); - V.push_back(S); - } - - return V; -} - -void SamplesBuffer::diffSamples() { - // Panda's DataFrame default behaviour is to subtract each element from - // itself. For us `DiffN = 0` means "disable diff-ing" when preprocessing - // the samples buffer. This deviation will make it easier for us to test - // the KMeans implementation. 
- if (DiffN == 0) - return; - - for (size_t Idx = 0; Idx != (NumSamples - DiffN); Idx++) { - size_t High = (NumSamples - 1) - Idx; - size_t Low = High - DiffN; - - Sample LHS = getSample(High); - Sample RHS = getSample(Low); - - LHS.diff(RHS); - } -} - -void SamplesBuffer::smoothSamples() { - // Holds the mean value of each window - CalculatedNumber AccCNs[1] = { 0 }; - Sample Acc(AccCNs, 1); - - // Used to avoid clobbering the accumulator when moving the window - CalculatedNumber TmpCNs[1] = { 0 }; - Sample Tmp(TmpCNs, 1); - - CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN; - - // Calculate the value of the 1st window - for (size_t Idx = 0; Idx != std::min(SmoothN, NumSamples); Idx++) { - Tmp.add(getSample(NumSamples - (Idx + 1))); - } - - Acc.add(Tmp); - Acc.scale(Factor); - - // Move the window and update the samples - for (size_t Idx = NumSamples; Idx != (DiffN + SmoothN - 1); Idx--) { - Sample S = getSample(Idx - 1); - - // Tmp <- Next window (if any) - if (Idx >= (SmoothN + 1)) { - Tmp.diff(S); - Tmp.add(getSample(Idx - (SmoothN + 1))); - } - - // S <- Acc - S.copy(Acc); - - // Acc <- Tmp - Acc.copy(Tmp); - Acc.scale(Factor); - } -} - -void SamplesBuffer::lagSamples() { - if (LagN == 0) - return; - - for (size_t Idx = NumSamples; Idx != LagN; Idx--) { - Sample PS = getPreprocessedSample(Idx - 1); - PS.lag(getSample(Idx - 1), LagN); - } -} - -void SamplesBuffer::preprocess(std::vector &Samples) { - assert(Preprocessed == false); - - size_t OutN = NumSamples; - - // Diff - if (DiffN >= OutN) - return; - OutN -= DiffN; - diffSamples(); - - // Smooth - if (SmoothN == 0 || SmoothN > OutN) - return; - OutN -= (SmoothN - 1); - smoothSamples(); - - // Lag - if (LagN >= OutN) - return; - OutN -= LagN; - lagSamples(); - - Samples.reserve(OutN); - Preprocessed = true; - - uint32_t MaxMT = std::numeric_limits::max(); - uint32_t CutOff = static_cast(MaxMT) * SamplingRatio; - - for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) { - if (RandNums[Idx] > CutOff) - continue; - - DSample DS; - DS.set_size(NumDimsPerSample * (LagN + 1)); - - const Sample PS = getPreprocessedSample(Idx); - PS.initDSample(DS); - - Samples.push_back(std::move(DS)); - } -} - -void SamplesBuffer::preprocess(DSample &Feature) { - assert(Preprocessed == false); - - size_t OutN = NumSamples; - - // Diff - if (DiffN >= OutN) - return; - OutN -= DiffN; - diffSamples(); - - // Smooth - if (SmoothN == 0 || SmoothN > OutN) - return; - OutN -= (SmoothN - 1); - smoothSamples(); - - // Lag - if (LagN >= OutN) - return; - OutN -= LagN; - lagSamples(); - - Preprocessed = true; - - uint32_t MaxMT = std::numeric_limits::max(); - uint32_t CutOff = static_cast(MaxMT) * SamplingRatio; - - for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) { - if (RandNums[Idx] > CutOff) - continue; - - Feature.set_size(NumDimsPerSample * (LagN + 1)); - - const Sample PS = getPreprocessedSample(Idx); - PS.initDSample(Feature); - } -} diff --git a/ml/SamplesBuffer.h b/ml/SamplesBuffer.h deleted file mode 100644 index ca60f4b91..000000000 --- a/ml/SamplesBuffer.h +++ /dev/null @@ -1,149 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef SAMPLES_BUFFER_H -#define SAMPLES_BUFFER_H - -#include -#include - -#include -#include -#include - -#include - -typedef double CalculatedNumber; -typedef dlib::matrix DSample; - -class Sample { -public: - Sample(CalculatedNumber *Buf, size_t N) : CNs(Buf), NumDims(N) {} - - void initDSample(DSample &DS) const { - for (size_t Idx = 0; Idx != NumDims; Idx++) { - DS(Idx) = 
std::abs(CNs[Idx]); - } - } - - void add(const Sample &RHS) const { - assert(NumDims == RHS.NumDims); - - for (size_t Idx = 0; Idx != NumDims; Idx++) - CNs[Idx] += RHS.CNs[Idx]; - }; - - void diff(const Sample &RHS) const { - assert(NumDims == RHS.NumDims); - - for (size_t Idx = 0; Idx != NumDims; Idx++) - CNs[Idx] -= RHS.CNs[Idx]; - }; - - void copy(const Sample &RHS) const { - assert(NumDims == RHS.NumDims); - - std::memcpy(CNs, RHS.CNs, NumDims * sizeof(CalculatedNumber)); - } - - void scale(CalculatedNumber Factor) { - for (size_t Idx = 0; Idx != NumDims; Idx++) - CNs[Idx] *= Factor; - } - - void lag(const Sample &S, size_t LagN) { - size_t N = S.NumDims; - - for (size_t Idx = 0; Idx != (LagN + 1); Idx++) { - Sample Src(S.CNs - (Idx * N), N); - Sample Dst(CNs + (Idx * N), N); - Dst.copy(Src); - } - } - - const CalculatedNumber *getCalculatedNumbers() const { - return CNs; - }; - - void print(std::ostream &OS) const; - -private: - CalculatedNumber *CNs; - size_t NumDims; -}; - -inline std::ostream& operator<<(std::ostream &OS, const Sample &S) { - S.print(OS); - return OS; -} - -class SamplesBuffer { -public: - SamplesBuffer(CalculatedNumber *CNs, - size_t NumSamples, size_t NumDimsPerSample, - size_t DiffN, size_t SmoothN, size_t LagN, - double SamplingRatio, std::vector &RandNums) : - CNs(CNs), NumSamples(NumSamples), NumDimsPerSample(NumDimsPerSample), - DiffN(DiffN), SmoothN(SmoothN), LagN(LagN), - SamplingRatio(SamplingRatio), RandNums(RandNums), - BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)), - Preprocessed(false) { - assert(NumDimsPerSample == 1 && "SamplesBuffer supports only one dimension per sample"); - }; - - void preprocess(std::vector &Samples); - void preprocess(DSample &Feature); - std::vector getPreprocessedSamples() const; - - size_t capacity() const { return NumSamples; } - void print(std::ostream &OS) const; - -private: - size_t getSampleOffset(size_t Index) const { - assert(Index < NumSamples); - return Index * NumDimsPerSample; - } - - size_t getPreprocessedSampleOffset(size_t Index) const { - assert(Index < NumSamples); - return getSampleOffset(Index) * (LagN + 1); - } - - void setSample(size_t Index, const Sample &S) const { - size_t Offset = getSampleOffset(Index); - std::memcpy(&CNs[Offset], S.getCalculatedNumbers(), BytesPerSample); - } - - const Sample getSample(size_t Index) const { - size_t Offset = getSampleOffset(Index); - return Sample(&CNs[Offset], NumDimsPerSample); - }; - - const Sample getPreprocessedSample(size_t Index) const { - size_t Offset = getPreprocessedSampleOffset(Index); - return Sample(&CNs[Offset], NumDimsPerSample * (LagN + 1)); - }; - - void diffSamples(); - void smoothSamples(); - void lagSamples(); - -private: - CalculatedNumber *CNs; - size_t NumSamples; - size_t NumDimsPerSample; - size_t DiffN; - size_t SmoothN; - size_t LagN; - double SamplingRatio; - std::vector &RandNums; - - size_t BytesPerSample; - bool Preprocessed; -}; - -inline std::ostream& operator<<(std::ostream& OS, const SamplesBuffer &SB) { - SB.print(OS); - return OS; -} - -#endif /* SAMPLES_BUFFER_H */ diff --git a/ml/Stats.h b/ml/Stats.h deleted file mode 100644 index b99bc39da..000000000 --- a/ml/Stats.h +++ /dev/null @@ -1,46 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_STATS_H -#define ML_STATS_H - -#include "ml-private.h" - -namespace ml { - -struct MachineLearningStats { - size_t NumMachineLearningStatusEnabled; - size_t NumMachineLearningStatusDisabledUE; - size_t NumMachineLearningStatusDisabledSP; - - size_t 
NumMetricTypeConstant; - size_t NumMetricTypeVariable; - - size_t NumTrainingStatusUntrained; - size_t NumTrainingStatusPendingWithoutModel; - size_t NumTrainingStatusTrained; - size_t NumTrainingStatusPendingWithModel; - - size_t NumAnomalousDimensions; - size_t NumNormalDimensions; -}; - -struct TrainingStats { - struct rusage TrainingRU; - - size_t QueueSize; - size_t NumPoppedItems; - - usec_t AllottedUT; - usec_t ConsumedUT; - usec_t RemainingUT; - - size_t TrainingResultOk; - size_t TrainingResultInvalidQueryTimeRange; - size_t TrainingResultNotEnoughCollectedValues; - size_t TrainingResultNullAcquiredDimension; - size_t TrainingResultChartUnderReplication; -}; - -} // namespace ml - -#endif /* ML_STATS_H */ diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc new file mode 100644 index 000000000..086cd5aa0 --- /dev/null +++ b/ml/ad_charts.cc @@ -0,0 +1,475 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "ad_charts.h" + +void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls) { + /* + * Machine learning status + */ + if (Cfg.enable_statistics_charts) { + if (!host->machine_learning_status_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "machine_learning_status_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "machine_learning_status_on_%s", rrdhost_hostname(localhost)); + + host->machine_learning_status_rs = rrdset_create( + host->rh, + "netdata", // type + id_buf, + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.machine_learning_status", // ctx + "Machine learning status", // title + "dimensions", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->machine_learning_status_rs , RRDSET_FLAG_ANOMALY_DETECTION); + + host->machine_learning_status_enabled_rd = + rrddim_add(host->machine_learning_status_rs, "enabled", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->machine_learning_status_disabled_sp_rd = + rrddim_add(host->machine_learning_status_rs, "disabled-sp", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->machine_learning_status_rs, + host->machine_learning_status_enabled_rd, mls.num_machine_learning_status_enabled); + rrddim_set_by_pointer(host->machine_learning_status_rs, + host->machine_learning_status_disabled_sp_rd, mls.num_machine_learning_status_disabled_sp); + + rrdset_done(host->machine_learning_status_rs); + } + + /* + * Metric type + */ + if (Cfg.enable_statistics_charts) { + if (!host->metric_type_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "metric_types_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "metric_types_on_%s", rrdhost_hostname(localhost)); + + host->metric_type_rs = rrdset_create( + host->rh, + "netdata", // type + id_buf, // id + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.metric_types", // ctx + "Dimensions by metric type", // title + "dimensions", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_METRIC_TYPES, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->metric_type_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->metric_type_constant_rd = + rrddim_add(host->metric_type_rs, "constant", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + 
host->metric_type_variable_rd = + rrddim_add(host->metric_type_rs, "variable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->metric_type_rs, + host->metric_type_constant_rd, mls.num_metric_type_constant); + rrddim_set_by_pointer(host->metric_type_rs, + host->metric_type_variable_rd, mls.num_metric_type_variable); + + rrdset_done(host->metric_type_rs); + } + + /* + * Training status + */ + if (Cfg.enable_statistics_charts) { + if (!host->training_status_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "training_status_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "training_status_on_%s", rrdhost_hostname(localhost)); + + host->training_status_rs = rrdset_create( + host->rh, + "netdata", // type + id_buf, // id + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.training_status", // ctx + "Training status of dimensions", // title + "dimensions", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_TRAINING_STATUS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + rrdset_flag_set(host->training_status_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->training_status_untrained_rd = + rrddim_add(host->training_status_rs, "untrained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_status_pending_without_model_rd = + rrddim_add(host->training_status_rs, "pending-without-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_status_trained_rd = + rrddim_add(host->training_status_rs, "trained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->training_status_pending_with_model_rd = + rrddim_add(host->training_status_rs, "pending-with-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->training_status_rs, + host->training_status_untrained_rd, mls.num_training_status_untrained); + rrddim_set_by_pointer(host->training_status_rs, + host->training_status_pending_without_model_rd, mls.num_training_status_pending_without_model); + rrddim_set_by_pointer(host->training_status_rs, + host->training_status_trained_rd, mls.num_training_status_trained); + rrddim_set_by_pointer(host->training_status_rs, + host->training_status_pending_with_model_rd, mls.num_training_status_pending_with_model); + + rrdset_done(host->training_status_rs); + } + + /* + * Prediction status + */ + { + if (!host->dimensions_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "dimensions_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "dimensions_on_%s", rrdhost_hostname(localhost)); + + host->dimensions_rs = rrdset_create( + host->rh, + "anomaly_detection", // type + id_buf, // id + name_buf, // name + "dimensions", // family + "anomaly_detection.dimensions", // ctx + "Anomaly detection dimensions", // title + "dimensions", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + ML_CHART_PRIO_DIMENSIONS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->dimensions_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->dimensions_anomalous_rd = + rrddim_add(host->dimensions_rs, "anomalous", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->dimensions_normal_rd = + rrddim_add(host->dimensions_rs, "normal", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->dimensions_rs, + host->dimensions_anomalous_rd, mls.num_anomalous_dimensions); + 
rrddim_set_by_pointer(host->dimensions_rs, + host->dimensions_normal_rd, mls.num_normal_dimensions); + + rrdset_done(host->dimensions_rs); + } +} + +void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) { + /* + * Anomaly rate + */ + { + if (!host->anomaly_rate_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "anomaly_rate_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "anomaly_rate_on_%s", rrdhost_hostname(localhost)); + + host->anomaly_rate_rs = rrdset_create( + host->rh, + "anomaly_detection", // type + id_buf, // id + name_buf, // name + "anomaly_rate", // family + "anomaly_detection.anomaly_rate", // ctx + "Percentage of anomalous dimensions", // title + "percentage", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_DETECTION, // module + ML_CHART_PRIO_ANOMALY_RATE, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->anomaly_rate_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->anomaly_rate_rd = + rrddim_add(host->anomaly_rate_rs, "anomaly_rate", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(host->anomaly_rate_rs, host->anomaly_rate_rd, AnomalyRate); + + rrdset_done(host->anomaly_rate_rs); + } + + /* + * Detector Events + */ + { + if (!host->detector_events_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "anomaly_detection_on_%s", localhost->machine_guid); + snprintfz(name_buf, 1024, "anomaly_detection_on_%s", rrdhost_hostname(localhost)); + + host->detector_events_rs = rrdset_create( + host->rh, + "anomaly_detection", // type + id_buf, // id + name_buf, // name + "anomaly_detection", // family + "anomaly_detection.detector_events", // ctx + "Anomaly detection events", // title + "percentage", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_DETECTION, // module + ML_CHART_PRIO_DETECTOR_EVENTS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(host->detector_events_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + host->detector_events_above_threshold_rd = + rrddim_add(host->detector_events_rs, "above_threshold", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + host->detector_events_new_anomaly_event_rd = + rrddim_add(host->detector_events_rs, "new_anomaly_event", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + /* + * Compute the values of the dimensions based on the host rate chart + */ + ONEWAYALLOC *OWA = onewayalloc_create(0); + time_t Now = now_realtime_sec(); + time_t Before = Now - host->rh->rrd_update_every; + time_t After = Before - Cfg.anomaly_detection_query_duration; + RRDR_OPTIONS Options = static_cast(0x00000000); + + RRDR *R = rrd2rrdr_legacy( + OWA, + host->anomaly_rate_rs, + 1 /* points wanted */, + After, + Before, + Cfg.anomaly_detection_grouping_method, + 0 /* resampling time */, + Options, "anomaly_rate", + NULL /* group options */, + 0, /* timeout */ + 0, /* tier */ + QUERY_SOURCE_ML, + STORAGE_PRIORITY_SYNCHRONOUS + ); + + if (R) { + if (R->d == 1 && R->n == 1 && R->rows == 1) { + static thread_local bool prev_above_threshold = false; + bool above_threshold = R->v[0] >= Cfg.host_anomaly_rate_threshold; + bool new_anomaly_event = above_threshold && !prev_above_threshold; + prev_above_threshold = above_threshold; + + rrddim_set_by_pointer(host->detector_events_rs, + host->detector_events_above_threshold_rd, above_threshold); + rrddim_set_by_pointer(host->detector_events_rs, + 
host->detector_events_new_anomaly_event_rd, new_anomaly_event); + + rrdset_done(host->detector_events_rs); + } + + rrdr_free(OWA, R); + } + + onewayalloc_destroy(OWA); + } +} + +void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) { + /* + * queue stats + */ + { + if (!training_thread->queue_stats_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id); + + training_thread->queue_stats_rs = rrdset_create( + localhost, + "netdata", // type + id_buf, // id + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.queue_stats", // ctx + "Training queue stats", // title + "items", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_QUEUE_STATS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE// chart_type + ); + rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + training_thread->queue_stats_queue_size_rd = + rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->queue_stats_popped_items_rd = + rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(training_thread->queue_stats_rs, + training_thread->queue_stats_queue_size_rd, ts.queue_size); + rrddim_set_by_pointer(training_thread->queue_stats_rs, + training_thread->queue_stats_popped_items_rd, ts.num_popped_items); + + rrdset_done(training_thread->queue_stats_rs); + } + + /* + * training stats + */ + { + if (!training_thread->training_time_stats_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); + + training_thread->training_time_stats_rs = rrdset_create( + localhost, + "netdata", // type + id_buf, // id + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.training_time_stats", // ctx + "Training time stats", // title + "milliseconds", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE// chart_type + ); + rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + training_thread->training_time_stats_allotted_rd = + rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_time_stats_consumed_rd = + rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_time_stats_remaining_rd = + rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_allotted_rd, ts.allotted_ut); + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_consumed_rd, ts.consumed_ut); + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_remaining_rd, ts.remaining_ut); + + rrdset_done(training_thread->training_time_stats_rs); + } + + /* + * training result stats + */ 
+ { + if (!training_thread->training_results_rs) { + char id_buf[1024]; + char name_buf[1024]; + + snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id); + + training_thread->training_results_rs = rrdset_create( + localhost, + "netdata", // type + id_buf, // id + name_buf, // name + NETDATA_ML_CHART_FAMILY, // family + "netdata.training_results", // ctx + "Training results", // title + "events", // units + NETDATA_ML_PLUGIN, // plugin + NETDATA_ML_MODULE_TRAINING, // module + NETDATA_ML_CHART_PRIO_TRAINING_RESULTS, // priority + localhost->rrd_update_every, // update_every + RRDSET_TYPE_LINE// chart_type + ); + rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + training_thread->training_results_ok_rd = + rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_invalid_query_time_range_rd = + rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_not_enough_collected_values_rd = + rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_null_acquired_dimension_rd = + rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_chart_under_replication_rd = + rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_ok_rd, ts.training_result_ok); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication); + + rrdset_done(training_thread->training_results_rs); + } +} + +void ml_update_global_statistics_charts(uint64_t models_consulted) { + if (Cfg.enable_statistics_charts) { + static RRDSET *st = NULL; + static RRDDIM *rd = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata" // type + , "ml_models_consulted" // id + , NULL // name + , NETDATA_ML_CHART_FAMILY // family + , NULL // context + , "KMeans models used for prediction" // title + , "models" // units + , NETDATA_ML_PLUGIN // plugin + , NETDATA_ML_MODULE_DETECTION // module + , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority + , localhost->rrd_update_every // update_every + , RRDSET_TYPE_AREA // chart_type + ); + + rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } + + rrddim_set_by_pointer(st, rd, (collected_number) models_consulted); + + rrdset_done(st); + } +} diff --git a/ml/ad_charts.h b/ml/ad_charts.h new file mode 100644 index 000000000..349b369a2 --- /dev/null +++ b/ml/ad_charts.h @@ -0,0 +1,14 @@ +// 
SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_ADCHARTS_H +#define ML_ADCHARTS_H + +#include "ml-private.h" + +void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls); + +void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate); + +void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts); + +#endif /* ML_ADCHARTS_H */ diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c index 178018898..708ab68ea 100644 --- a/ml/ml-dummy.c +++ b/ml/ml-dummy.c @@ -8,78 +8,97 @@ bool ml_capable() { return false; } -bool ml_enabled(RRDHOST *RH) { - (void) RH; +bool ml_enabled(RRDHOST *rh) { + UNUSED(rh); + return false; +} + +bool ml_streaming_enabled() { return false; } void ml_init(void) {} -void ml_host_new(RRDHOST *RH) { - UNUSED(RH); +void ml_fini(void) {} + +void ml_start_threads(void) {} + +void ml_stop_threads(void) {} + +void ml_host_new(RRDHOST *rh) { + UNUSED(rh); } -void ml_host_delete(RRDHOST *RH) { - UNUSED(RH); +void ml_host_delete(RRDHOST *rh) { + UNUSED(rh); } -void ml_chart_new(RRDSET *RS) { - UNUSED(RS); +void ml_host_start_training_thread(RRDHOST *rh) { + UNUSED(rh); } -void ml_chart_delete(RRDSET *RS) { - UNUSED(RS); +void ml_host_stop_training_thread(RRDHOST *rh) { + UNUSED(rh); } -void ml_dimension_new(RRDDIM *RD) { - UNUSED(RD); +void ml_host_cancel_training_thread(RRDHOST *rh) { + UNUSED(rh); } -void ml_dimension_delete(RRDDIM *RD) { - UNUSED(RD); +void ml_host_get_info(RRDHOST *rh, BUFFER *wb) { + UNUSED(rh); + UNUSED(wb); } -void ml_start_anomaly_detection_threads(RRDHOST *RH) { - UNUSED(RH); +void ml_host_get_models(RRDHOST *rh, BUFFER *wb) { + UNUSED(rh); + UNUSED(wb); } -void ml_stop_anomaly_detection_threads(RRDHOST *RH) { - UNUSED(RH); +void ml_host_get_runtime_info(RRDHOST *rh) { + UNUSED(rh); } -char *ml_get_host_info(RRDHOST *RH) { - (void) RH; - return NULL; +void ml_chart_new(RRDSET *rs) { + UNUSED(rs); } -char *ml_get_host_runtime_info(RRDHOST *RH) { - (void) RH; - return NULL; +void ml_chart_delete(RRDSET *rs) { + UNUSED(rs); } -void ml_chart_update_begin(RRDSET *RS) { - (void) RS; +bool ml_chart_update_begin(RRDSET *rs) { + UNUSED(rs); + return false; } -void ml_chart_update_end(RRDSET *RS) { - (void) RS; +void ml_chart_update_end(RRDSET *rs) { + UNUSED(rs); } -char *ml_get_host_models(RRDHOST *RH) { - (void) RH; - return NULL; +void ml_dimension_new(RRDDIM *rd) { + UNUSED(rd); } -bool ml_is_anomalous(RRDDIM *RD, time_t CurrT, double Value, bool Exists) { - (void) RD; - (void) CurrT; - (void) Value; - (void) Exists; - return false; +void ml_dimension_delete(RRDDIM *rd) { + UNUSED(rd); } -bool ml_streaming_enabled() { +bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists) { + UNUSED(rd); + UNUSED(curr_time); + UNUSED(value); + UNUSED(exists); return false; } +int ml_dimension_load_models(RRDDIM *rd) { + UNUSED(rd); + return 0; +} + +void ml_update_global_statistics_charts(uint64_t models_consulted) { + UNUSED(models_consulted); +} + #endif diff --git a/ml/ml-private.h b/ml/ml-private.h index e479f2351..327cc59a2 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -1,13 +1,335 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#ifndef ML_PRIVATE_H -#define ML_PRIVATE_H +#ifndef NETDATA_ML_PRIVATE_H +#define NETDATA_ML_PRIVATE_H -#include "KMeans.h" +#include "dlib/matrix.h" #include "ml/ml.h" -#include -#include -#include +#include +#include -#endif /* ML_PRIVATE_H */ +typedef double calculated_number_t; +typedef 
dlib::matrix DSample; + +/* + * Features + */ + +typedef struct { + size_t diff_n; + size_t smooth_n; + size_t lag_n; + + calculated_number_t *dst; + size_t dst_n; + + calculated_number_t *src; + size_t src_n; + + std::vector &preprocessed_features; +} ml_features_t; + +/* + * KMeans + */ + +typedef struct { + std::vector cluster_centers; + + calculated_number_t min_dist; + calculated_number_t max_dist; + + uint32_t after; + uint32_t before; +} ml_kmeans_t; + +typedef struct machine_learning_stats_t { + size_t num_machine_learning_status_enabled; + size_t num_machine_learning_status_disabled_sp; + + size_t num_metric_type_constant; + size_t num_metric_type_variable; + + size_t num_training_status_untrained; + size_t num_training_status_pending_without_model; + size_t num_training_status_trained; + size_t num_training_status_pending_with_model; + + size_t num_anomalous_dimensions; + size_t num_normal_dimensions; +} ml_machine_learning_stats_t; + +typedef struct training_stats_t { + size_t queue_size; + size_t num_popped_items; + + usec_t allotted_ut; + usec_t consumed_ut; + usec_t remaining_ut; + + size_t training_result_ok; + size_t training_result_invalid_query_time_range; + size_t training_result_not_enough_collected_values; + size_t training_result_null_acquired_dimension; + size_t training_result_chart_under_replication; +} ml_training_stats_t; + +enum ml_metric_type { + // The dimension has constant values, no need to train + METRIC_TYPE_CONSTANT, + + // The dimension's values fluctuate, we need to generate a model + METRIC_TYPE_VARIABLE, +}; + +enum ml_machine_learning_status { + // Enable training/prediction + MACHINE_LEARNING_STATUS_ENABLED, + + // Disable because configuration pattern matches the chart's id + MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART, +}; + +enum ml_training_status { + // We don't have a model for this dimension + TRAINING_STATUS_UNTRAINED, + + // Request for training sent, but we don't have any models yet + TRAINING_STATUS_PENDING_WITHOUT_MODEL, + + // Request to update existing models sent + TRAINING_STATUS_PENDING_WITH_MODEL, + + // Have a valid, up-to-date model + TRAINING_STATUS_TRAINED, +}; + +enum ml_training_result { + // We managed to create a KMeans model + TRAINING_RESULT_OK, + + // Could not query DB with a correct time range + TRAINING_RESULT_INVALID_QUERY_TIME_RANGE, + + // Did not gather enough data from DB to run KMeans + TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES, + + // Acquired a null dimension + TRAINING_RESULT_NULL_ACQUIRED_DIMENSION, + + // Chart is under replication + TRAINING_RESULT_CHART_UNDER_REPLICATION, +}; + +typedef struct { + // Chart/dimension we want to train + char machine_guid[GUID_LEN + 1]; + STRING *chart_id; + STRING *dimension_id; + + // Creation time of request + time_t request_time; + + // First/last entry of this dimension in DB + // at the point the request was made + time_t first_entry_on_request; + time_t last_entry_on_request; +} ml_training_request_t; + +typedef struct { + // Time when the request for this response was made + time_t request_time; + + // First/last entry of the dimension in DB when generating the request + time_t first_entry_on_request; + time_t last_entry_on_request; + + // First/last entry of the dimension in DB when generating the response + time_t first_entry_on_response; + time_t last_entry_on_response; + + // After/Before timestamps of our DB query + time_t query_after_t; + time_t query_before_t; + + // Actual after/before returned by the DB query ops + time_t db_after_t; + time_t 
db_before_t; + + // Number of doubles returned by the DB query + size_t collected_values; + + // Number of values we return to the caller + size_t total_values; + + // Result of training response + enum ml_training_result result; +} ml_training_response_t; + +/* + * Queue +*/ + +typedef struct { + std::queue internal; + netdata_mutex_t mutex; + pthread_cond_t cond_var; + std::atomic exit; +} ml_queue_t; + +typedef struct { + RRDDIM *rd; + + enum ml_metric_type mt; + enum ml_training_status ts; + enum ml_machine_learning_status mls; + + ml_training_response_t tr; + time_t last_training_time; + + std::vector cns; + + std::vector km_contexts; + netdata_mutex_t mutex; + ml_kmeans_t kmeans; + std::vector feature; +} ml_dimension_t; + +typedef struct { + RRDSET *rs; + ml_machine_learning_stats_t mls; + + netdata_mutex_t mutex; +} ml_chart_t; + +void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous); + +typedef struct { + RRDHOST *rh; + + ml_machine_learning_stats_t mls; + + calculated_number_t host_anomaly_rate; + + netdata_mutex_t mutex; + + ml_queue_t *training_queue; + + /* + * bookkeeping for anomaly detection charts + */ + + RRDSET *machine_learning_status_rs; + RRDDIM *machine_learning_status_enabled_rd; + RRDDIM *machine_learning_status_disabled_sp_rd; + + RRDSET *metric_type_rs; + RRDDIM *metric_type_constant_rd; + RRDDIM *metric_type_variable_rd; + + RRDSET *training_status_rs; + RRDDIM *training_status_untrained_rd; + RRDDIM *training_status_pending_without_model_rd; + RRDDIM *training_status_trained_rd; + RRDDIM *training_status_pending_with_model_rd; + + RRDSET *dimensions_rs; + RRDDIM *dimensions_anomalous_rd; + RRDDIM *dimensions_normal_rd; + + RRDSET *anomaly_rate_rs; + RRDDIM *anomaly_rate_rd; + + RRDSET *detector_events_rs; + RRDDIM *detector_events_above_threshold_rd; + RRDDIM *detector_events_new_anomaly_event_rd; +} ml_host_t; + +typedef struct { + uuid_t metric_uuid; + ml_kmeans_t kmeans; +} ml_model_info_t; + +typedef struct { + size_t id; + netdata_thread_t nd_thread; + netdata_mutex_t nd_mutex; + + ml_queue_t *training_queue; + ml_training_stats_t training_stats; + + calculated_number_t *training_cns; + calculated_number_t *scratch_training_cns; + std::vector training_samples; + + std::vector pending_model_info; + + RRDSET *queue_stats_rs; + RRDDIM *queue_stats_queue_size_rd; + RRDDIM *queue_stats_popped_items_rd; + + RRDSET *training_time_stats_rs; + RRDDIM *training_time_stats_allotted_rd; + RRDDIM *training_time_stats_consumed_rd; + RRDDIM *training_time_stats_remaining_rd; + + RRDSET *training_results_rs; + RRDDIM *training_results_ok_rd; + RRDDIM *training_results_invalid_query_time_range_rd; + RRDDIM *training_results_not_enough_collected_values_rd; + RRDDIM *training_results_null_acquired_dimension_rd; + RRDDIM *training_results_chart_under_replication_rd; +} ml_training_thread_t; + +typedef struct { + bool enable_anomaly_detection; + + unsigned max_train_samples; + unsigned min_train_samples; + unsigned train_every; + + unsigned num_models_to_use; + + unsigned db_engine_anomaly_rate_every; + + unsigned diff_n; + unsigned smooth_n; + unsigned lag_n; + + double random_sampling_ratio; + unsigned max_kmeans_iters; + + double dimension_anomaly_score_threshold; + + double host_anomaly_rate_threshold; + RRDR_TIME_GROUPING anomaly_detection_grouping_method; + time_t anomaly_detection_query_duration; + + bool stream_anomaly_detection_charts; + + std::string hosts_to_skip; + SIMPLE_PATTERN *sp_host_to_skip; + + std::string charts_to_skip; 
+ SIMPLE_PATTERN *sp_charts_to_skip; + + std::vector random_nums; + + netdata_thread_t detection_thread; + std::atomic detection_stop; + + size_t num_training_threads; + size_t flush_models_batch_size; + + std::vector training_threads; + std::atomic training_stop; + + bool enable_statistics_charts; +} ml_config_t; + +void ml_config_load(ml_config_t *cfg); + +extern ml_config_t Cfg; + +#endif /* NETDATA_ML_PRIVATE_H */ diff --git a/ml/ml.cc b/ml/ml.cc index 461c83baa..1f49f4bf1 100644 --- a/ml/ml.cc +++ b/ml/ml.cc @@ -1,202 +1,1621 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "Config.h" -#include "Dimension.h" -#include "Chart.h" -#include "Host.h" +#include + +#include "ml-private.h" #include -using namespace ml; +#include "ad_charts.h" +#include "database/sqlite/sqlite3.h" + +#define WORKER_TRAIN_QUEUE_POP 0 +#define WORKER_TRAIN_ACQUIRE_DIMENSION 1 +#define WORKER_TRAIN_QUERY 2 +#define WORKER_TRAIN_KMEANS 3 +#define WORKER_TRAIN_UPDATE_MODELS 4 +#define WORKER_TRAIN_RELEASE_DIMENSION 5 +#define WORKER_TRAIN_UPDATE_HOST 6 +#define WORKER_TRAIN_FLUSH_MODELS 7 + +static sqlite3 *db = NULL; +static netdata_mutex_t db_mutex = NETDATA_MUTEX_INITIALIZER; + +/* + * Functions to convert enums to strings +*/ + +__attribute__((unused)) static const char * +ml_machine_learning_status_to_string(enum ml_machine_learning_status mls) +{ + switch (mls) { + case MACHINE_LEARNING_STATUS_ENABLED: + return "enabled"; + case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART: + return "disabled-sp"; + default: + return "unknown"; + } +} + +__attribute__((unused)) static const char * +ml_metric_type_to_string(enum ml_metric_type mt) +{ + switch (mt) { + case METRIC_TYPE_CONSTANT: + return "constant"; + case METRIC_TYPE_VARIABLE: + return "variable"; + default: + return "unknown"; + } +} + +__attribute__((unused)) static const char * +ml_training_status_to_string(enum ml_training_status ts) +{ + switch (ts) { + case TRAINING_STATUS_PENDING_WITH_MODEL: + return "pending-with-model"; + case TRAINING_STATUS_PENDING_WITHOUT_MODEL: + return "pending-without-model"; + case TRAINING_STATUS_TRAINED: + return "trained"; + case TRAINING_STATUS_UNTRAINED: + return "untrained"; + default: + return "unknown"; + } +} + +__attribute__((unused)) static const char * +ml_training_result_to_string(enum ml_training_result tr) +{ + switch (tr) { + case TRAINING_RESULT_OK: + return "ok"; + case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE: + return "invalid-query"; + case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES: + return "missing-values"; + case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION: + return "null-acquired-dim"; + case TRAINING_RESULT_CHART_UNDER_REPLICATION: + return "chart-under-replication"; + default: + return "unknown"; + } +} + +/* + * Features +*/ + +// subtract elements that are `diff_n` positions apart +static void +ml_features_diff(ml_features_t *features) +{ + if (features->diff_n == 0) + return; + + for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) { + size_t high = (features->src_n - 1) - idx; + size_t low = high - features->diff_n; + + features->dst[low] = features->src[high] - features->src[low]; + } + + size_t n = features->src_n - features->diff_n; + memcpy(features->src, features->dst, n * sizeof(calculated_number_t)); + + for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++) + features->src[idx] = 0.0; +} + +// a function that computes the window average of an array inplace +static void +ml_features_smooth(ml_features_t *features) +{ + 
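// A standalone sketch of the same windowed-mean idea, kept separate for clarity; the real
// function below works in place on features->src and keeps a running sum instead of
// re-summing each window (helper name and signature are illustrative, <vector> assumed):
static void window_mean_example(const std::vector<double> &in, std::vector<double> &out, size_t smooth_n) {
    if (smooth_n == 0 || in.size() < smooth_n)
        return;
    out.resize(in.size() - smooth_n + 1);
    double sum = 0.0;
    for (size_t i = 0; i != smooth_n; i++)
        sum += in[i];
    out[0] = sum / smooth_n;
    for (size_t i = smooth_n; i != in.size(); i++) {
        sum += in[i] - in[i - smooth_n];          // slide the window by one sample
        out[i - smooth_n + 1] = sum / smooth_n;   // mean of in[i - smooth_n + 1 .. i]
    }
}
// e.g. {1, 2, 3, 4, 5} with smooth_n = 3 yields {2, 3, 4}.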
calculated_number_t sum = 0.0; + + size_t idx = 0; + for (; idx != features->smooth_n - 1; idx++) + sum += features->src[idx]; + + for (; idx != (features->src_n - features->diff_n); idx++) { + sum += features->src[idx]; + calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)]; + features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n; + sum -= prev_cn; + } + + for (idx = 0; idx != features->smooth_n; idx++) + features->src[(features->src_n - 1) - idx] = 0.0; +} + +// create lag'd vectors out of the preprocessed buffer +static void +ml_features_lag(ml_features_t *features) +{ + size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n; + features->preprocessed_features.resize(n); + + unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio; + double sampling_ratio = std::min(static_cast(target_num_samples) / n, 1.0); + + uint32_t max_mt = std::numeric_limits::max(); + uint32_t cutoff = static_cast(max_mt) * sampling_ratio; + + size_t sample_idx = 0; + + for (size_t idx = 0; idx != n; idx++) { + DSample &DS = features->preprocessed_features[sample_idx++]; + DS.set_size(features->lag_n); + + if (Cfg.random_nums[idx] > cutoff) { + sample_idx--; + continue; + } + + for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++) + DS(feature_idx) = features->src[idx + feature_idx]; + } + + features->preprocessed_features.resize(sample_idx); +} + +static void +ml_features_preprocess(ml_features_t *features) +{ + ml_features_diff(features); + ml_features_smooth(features); + ml_features_lag(features); +} + +/* + * KMeans +*/ + +static void +ml_kmeans_init(ml_kmeans_t *kmeans) +{ + kmeans->cluster_centers.reserve(2); + kmeans->min_dist = std::numeric_limits::max(); + kmeans->max_dist = std::numeric_limits::min(); +} + +static void +ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before) +{ + kmeans->after = (uint32_t) after; + kmeans->before = (uint32_t) before; + + kmeans->min_dist = std::numeric_limits::max(); + kmeans->max_dist = std::numeric_limits::min(); + + kmeans->cluster_centers.clear(); + + dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features); + dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters); + + for (const auto &preprocessed_feature : features->preprocessed_features) { + calculated_number_t mean_dist = 0.0; + + for (const auto &cluster_center : kmeans->cluster_centers) { + mean_dist += dlib::length(cluster_center - preprocessed_feature); + } + + mean_dist /= kmeans->cluster_centers.size(); + + if (mean_dist < kmeans->min_dist) + kmeans->min_dist = mean_dist; + + if (mean_dist > kmeans->max_dist) + kmeans->max_dist = mean_dist; + } +} + +static calculated_number_t +ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS) +{ + calculated_number_t mean_dist = 0.0; + for (const auto &CC: kmeans->cluster_centers) + mean_dist += dlib::length(CC - DS); + + mean_dist /= kmeans->cluster_centers.size(); + + if (kmeans->max_dist == kmeans->min_dist) + return 0.0; + + calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist)); + return (anomaly_score > 100.0) ? 
100.0 : anomaly_score; +} + +/* + * Queue +*/ + +static ml_queue_t * +ml_queue_init() +{ + ml_queue_t *q = new ml_queue_t(); + + netdata_mutex_init(&q->mutex); + pthread_cond_init(&q->cond_var, NULL); + q->exit = false; + return q; +} + +static void +ml_queue_destroy(ml_queue_t *q) +{ + netdata_mutex_destroy(&q->mutex); + pthread_cond_destroy(&q->cond_var); + delete q; +} + +static void +ml_queue_push(ml_queue_t *q, const ml_training_request_t req) +{ + netdata_mutex_lock(&q->mutex); + q->internal.push(req); + pthread_cond_signal(&q->cond_var); + netdata_mutex_unlock(&q->mutex); +} + +static ml_training_request_t +ml_queue_pop(ml_queue_t *q) +{ + netdata_mutex_lock(&q->mutex); + + ml_training_request_t req = { + {'\0'}, // machine_guid + NULL, // chart id + NULL, // dimension id + 0, // current time + 0, // first entry + 0 // last entry + }; + + while (q->internal.empty()) { + pthread_cond_wait(&q->cond_var, &q->mutex); + + if (q->exit) { + netdata_mutex_unlock(&q->mutex); + + // We return a dummy request because the queue has been signaled + return req; + } + } + + req = q->internal.front(); + q->internal.pop(); + + netdata_mutex_unlock(&q->mutex); + return req; +} + +static size_t +ml_queue_size(ml_queue_t *q) +{ + netdata_mutex_lock(&q->mutex); + size_t size = q->internal.size(); + netdata_mutex_unlock(&q->mutex); + return size; +} + +static void +ml_queue_signal(ml_queue_t *q) +{ + netdata_mutex_lock(&q->mutex); + q->exit = true; + pthread_cond_signal(&q->cond_var); + netdata_mutex_unlock(&q->mutex); +} + +/* + * Dimension +*/ + +static std::pair +ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) +{ + ml_training_response_t training_response = {}; + + training_response.request_time = training_request.request_time; + training_response.first_entry_on_request = training_request.first_entry_on_request; + training_response.last_entry_on_request = training_request.last_entry_on_request; + + training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0); + training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0); + + size_t min_n = Cfg.min_train_samples; + size_t max_n = Cfg.max_train_samples; + + // Figure out what our time window should be. 
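// A small sketch of the window arithmetic used just below, assuming update_every = 1s and a
// training window of roughly 4 hours of samples (max_n = 14400, matching the README's default
// of training on the most recent 4 hours of data); the helper name is illustrative only:
static inline time_t example_query_after(time_t last_entry, time_t first_entry,
                                          size_t max_n, int update_every) {
    // Look back (max_n - 1) collection intervals from the newest stored sample,
    // but never earlier than the oldest sample available for the dimension.
    time_t candidate = last_entry - (time_t)((max_n - 1) * update_every);
    return (candidate > first_entry) ? candidate : first_entry;
}
// With last_entry = 1700003600, first_entry = 1700000000, max_n = 14400 and update_every = 1,
// the candidate (1699989201) is older than first_entry, so the query start is clamped to
// 1700000000; otherwise the query covers the newest (max_n - 1) intervals.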
+ training_response.query_before_t = training_response.last_entry_on_response; + training_response.query_after_t = std::max( + training_response.query_before_t - static_cast((max_n - 1) * dim->rd->update_every), + training_response.first_entry_on_response + ); + + if (training_response.query_after_t >= training_response.query_before_t) { + training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE; + return { NULL, training_response }; + } + + if (rrdset_is_replicating(dim->rd->rrdset)) { + training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION; + return { NULL, training_response }; + } + + /* + * Execute the query + */ + struct storage_engine_query_handle handle; + + storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle, + training_response.query_after_t, training_response.query_before_t, + STORAGE_PRIORITY_BEST_EFFORT); + + size_t idx = 0; + memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); + calculated_number_t last_value = std::numeric_limits::quiet_NaN(); + + while (!storage_engine_query_is_finished(&handle)) { + if (idx == max_n) + break; + + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); + + time_t timestamp = sp.end_time_s; + calculated_number_t value = sp.sum / sp.count; + + if (netdata_double_isnumber(value)) { + if (!training_response.db_after_t) + training_response.db_after_t = timestamp; + training_response.db_before_t = timestamp; + + training_thread->training_cns[idx] = value; + last_value = training_thread->training_cns[idx]; + training_response.collected_values++; + } else + training_thread->training_cns[idx] = last_value; + + idx++; + } + storage_engine_query_finalize(&handle); + + global_statistics_ml_query_completed(/* points_read */ idx); + + training_response.total_values = idx; + if (training_response.collected_values < min_n) { + training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES; + return { NULL, training_response }; + } + + // Find first non-NaN value. + for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { } + + // Overwrite NaN values. 
+ if (idx != 0) + memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); + + training_response.result = TRAINING_RESULT_OK; + return { training_thread->training_cns, training_response }; +} + +const char *db_models_create_table = + "CREATE TABLE IF NOT EXISTS models(" + " dim_id BLOB, after INT, before INT," + " min_dist REAL, max_dist REAL," + " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL," + " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL," + " PRIMARY KEY(dim_id, after)" + ");"; + +const char *db_models_add_model = + "INSERT OR REPLACE INTO models(" + " dim_id, after, before," + " min_dist, max_dist," + " c00, c01, c02, c03, c04, c05," + " c10, c11, c12, c13, c14, c15)" + "VALUES(" + " @dim_id, @after, @before," + " @min_dist, @max_dist," + " @c00, @c01, @c02, @c03, @c04, @c05," + " @c10, @c11, @c12, @c13, @c14, @c15);"; + +const char *db_models_load = + "SELECT * FROM models " + "WHERE dim_id = @dim_id AND after >= @after ORDER BY before ASC;"; + +const char *db_models_delete = + "DELETE FROM models " + "WHERE dim_id = @dim_id AND before < @before;"; + +static int +ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km) +{ + static __thread sqlite3_stmt *res = NULL; + int param = 0; + int rc = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_add_model, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store model, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) km->after); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) km->before); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, km->min_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, km->max_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + if (km->cluster_centers.size() != 2) + fatal("Expected 2 cluster centers, got %zu", km->cluster_centers.size()); + + for (const DSample &ds : km->cluster_centers) { + if (ds.size() != 6) + fatal("Expected dsample with 6 dimensions, got %ld", ds.size()); + + for (long idx = 0; idx != ds.size(); idx++) { + calculated_number_t cn = ds(idx); + int rc = sqlite3_bind_double(res, ++param, cn); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + } + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store model, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when storing model, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to store model, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to store model, rc = %d", rc); + return 1; +} + +static int +ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before) +{ + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_delete, &res); + if 
(unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to delete models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) before); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to delete models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when deleting models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to delete models, rc = %d", rc); + return 1; +} + +int ml_dimension_load_models(RRDDIM *rd) { + ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension; + if (!dim) + return 0; + + netdata_mutex_lock(&dim->mutex); + bool is_empty = dim->km_contexts.empty(); + netdata_mutex_unlock(&dim->mutex); + + if (!is_empty) + return 0; + + std::vector V; + + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_load, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to load models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples)); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + netdata_mutex_lock(&dim->mutex); + + dim->km_contexts.reserve(Cfg.num_models_to_use); + while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) { + ml_kmeans_t km; + + km.after = sqlite3_column_int(res, 2); + km.before = sqlite3_column_int(res, 3); + + km.min_dist = sqlite3_column_int(res, 4); + km.max_dist = sqlite3_column_int(res, 5); + + km.cluster_centers.resize(2); + + km.cluster_centers[0].set_size(Cfg.lag_n + 1); + km.cluster_centers[0](0) = sqlite3_column_double(res, 6); + km.cluster_centers[0](1) = sqlite3_column_double(res, 7); + km.cluster_centers[0](2) = sqlite3_column_double(res, 8); + km.cluster_centers[0](3) = sqlite3_column_double(res, 9); + km.cluster_centers[0](4) = sqlite3_column_double(res, 10); + km.cluster_centers[0](5) = sqlite3_column_double(res, 11); + + km.cluster_centers[1].set_size(Cfg.lag_n + 1); + km.cluster_centers[1](0) = sqlite3_column_double(res, 12); + km.cluster_centers[1](1) = sqlite3_column_double(res, 13); + km.cluster_centers[1](2) = sqlite3_column_double(res, 14); + km.cluster_centers[1](3) = sqlite3_column_double(res, 15); + km.cluster_centers[1](4) = sqlite3_column_double(res, 16); + km.cluster_centers[1](5) = sqlite3_column_double(res, 17); + + dim->km_contexts.push_back(km); + } + + if (!dim->km_contexts.empty()) { + dim->ts = TRAINING_STATUS_TRAINED; + } + + netdata_mutex_unlock(&dim->mutex); + + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to load models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when loading models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to 
bind parameter %d to load models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to load models, rc = %d", rc); + return 1; +} + +static enum ml_training_result +ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) +{ + worker_is_busy(WORKER_TRAIN_QUERY); + auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request); + ml_training_response_t training_response = P.second; + + if (training_response.result != TRAINING_RESULT_OK) { + netdata_mutex_lock(&dim->mutex); + + dim->mt = METRIC_TYPE_CONSTANT; + + switch (dim->ts) { + case TRAINING_STATUS_PENDING_WITH_MODEL: + dim->ts = TRAINING_STATUS_TRAINED; + break; + case TRAINING_STATUS_PENDING_WITHOUT_MODEL: + dim->ts = TRAINING_STATUS_UNTRAINED; + break; + default: + break; + } + + dim->tr = training_response; + + dim->last_training_time = training_response.last_entry_on_response; + enum ml_training_result result = training_response.result; + netdata_mutex_unlock(&dim->mutex); + + return result; + } + + // compute kmeans + worker_is_busy(WORKER_TRAIN_KMEANS); + { + memcpy(training_thread->scratch_training_cns, training_thread->training_cns, + training_response.total_values * sizeof(calculated_number_t)); + + ml_features_t features = { + Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n, + training_thread->scratch_training_cns, training_response.total_values, + training_thread->training_cns, training_response.total_values, + training_thread->training_samples + }; + ml_features_preprocess(&features); + + ml_kmeans_init(&dim->kmeans); + ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t); + } + + // update models + worker_is_busy(WORKER_TRAIN_UPDATE_MODELS); + { + netdata_mutex_lock(&dim->mutex); + + if (dim->km_contexts.size() < Cfg.num_models_to_use) { + dim->km_contexts.push_back(std::move(dim->kmeans)); + } else { + bool can_drop_middle_km = false; + + if (Cfg.num_models_to_use > 2) { + const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1]; + const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2]; + const ml_kmeans_t *new_km = &dim->kmeans; + + can_drop_middle_km = (middle_km->after < old_km->before) && + (middle_km->before > new_km->after); + } + + if (can_drop_middle_km) { + dim->km_contexts.back() = dim->kmeans; + } else { + std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts)); + dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans); + } + } + + dim->mt = METRIC_TYPE_CONSTANT; + dim->ts = TRAINING_STATUS_TRAINED; + dim->tr = training_response; + dim->last_training_time = rrddim_last_entry_s(dim->rd); + + // Add the newly generated model to the list of pending models to flush + ml_model_info_t model_info; + uuid_copy(model_info.metric_uuid, dim->rd->metric_uuid); + model_info.kmeans = dim->km_contexts.back(); + training_thread->pending_model_info.push_back(model_info); + + netdata_mutex_unlock(&dim->mutex); + } + + return training_response.result; +} + +static void +ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time) +{ + switch (dim->mt) { + case METRIC_TYPE_CONSTANT: + return; + default: + break; + } + + bool schedule_for_training = false; + + switch (dim->ts) { + case TRAINING_STATUS_PENDING_WITH_MODEL: + case TRAINING_STATUS_PENDING_WITHOUT_MODEL: + schedule_for_training = false; + break; + 
case TRAINING_STATUS_UNTRAINED: + schedule_for_training = true; + dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL; + break; + case TRAINING_STATUS_TRAINED: + if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) { + schedule_for_training = true; + dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL; + } + break; + } + + if (schedule_for_training) { + ml_training_request_t req; + + memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1); + req.chart_id = string_dup(dim->rd->rrdset->id); + req.dimension_id = string_dup(dim->rd->id); + req.request_time = curr_time; + req.first_entry_on_request = rrddim_first_entry_s(dim->rd); + req.last_entry_on_request = rrddim_last_entry_s(dim->rd); + + ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; + ml_queue_push(host->training_queue, req); + } +} + +static bool +ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists) +{ + // Nothing to do if ML is disabled for this dimension + if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED) + return false; + + // Don't treat values that don't exist as anomalous + if (!exists) { + dim->cns.clear(); + return false; + } + + // Save the value and return if we don't have enough values for a sample + unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n; + if (dim->cns.size() < n) { + dim->cns.push_back(value); + return false; + } + + // Push the value and check if it's different from the last one + bool same_value = true; + std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns)); + if (dim->cns[n - 1] != value) + same_value = false; + dim->cns[n - 1] = value; + + // Create the sample + assert((n * (Cfg.lag_n + 1) <= 128) && + "Static buffers too small to perform prediction. 
" + "This should not be possible with the default clamping of feature extraction options"); + calculated_number_t src_cns[128]; + calculated_number_t dst_cns[128]; + + memset(src_cns, 0, n * (Cfg.lag_n + 1) * sizeof(calculated_number_t)); + memcpy(src_cns, dim->cns.data(), n * sizeof(calculated_number_t)); + memcpy(dst_cns, dim->cns.data(), n * sizeof(calculated_number_t)); + + ml_features_t features = { + Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n, + dst_cns, n, src_cns, n, + dim->feature + }; + ml_features_preprocess(&features); + + /* + * Lock to predict and possibly schedule the dimension for training + */ + if (netdata_mutex_trylock(&dim->mutex) != 0) + return false; + + // Mark the metric time as variable if we received different values + if (!same_value) + dim->mt = METRIC_TYPE_VARIABLE; + + // Decide if the dimension needs to be scheduled for training + ml_dimension_schedule_for_training(dim, curr_time); + + // Nothing to do if we don't have a model + switch (dim->ts) { + case TRAINING_STATUS_UNTRAINED: + case TRAINING_STATUS_PENDING_WITHOUT_MODEL: { + netdata_mutex_unlock(&dim->mutex); + return false; + } + default: + break; + } + + /* + * Use the KMeans models to check if the value is anomalous + */ + + size_t sum = 0; + size_t models_consulted = 0; + + for (const auto &km_ctx : dim->km_contexts) { + models_consulted++; + + calculated_number_t anomaly_score = ml_kmeans_anomaly_score(&km_ctx, features.preprocessed_features[0]); + if (anomaly_score == std::numeric_limits::quiet_NaN()) + continue; + + if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) { + global_statistics_ml_models_consulted(models_consulted); + netdata_mutex_unlock(&dim->mutex); + return false; + } + + sum += 1; + } + + netdata_mutex_unlock(&dim->mutex); + + global_statistics_ml_models_consulted(models_consulted); + return sum; +} + +/* + * Chart +*/ + +static bool +ml_chart_is_available_for_ml(ml_chart_t *chart) +{ + return rrdset_is_available_for_exporting_and_alarms(chart->rs); +} + +void +ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous) +{ + switch (dim->mls) { + case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART: + chart->mls.num_machine_learning_status_disabled_sp++; + return; + case MACHINE_LEARNING_STATUS_ENABLED: { + chart->mls.num_machine_learning_status_enabled++; + + switch (dim->mt) { + case METRIC_TYPE_CONSTANT: + chart->mls.num_metric_type_constant++; + chart->mls.num_training_status_trained++; + chart->mls.num_normal_dimensions++; + return; + case METRIC_TYPE_VARIABLE: + chart->mls.num_metric_type_variable++; + break; + } + + switch (dim->ts) { + case TRAINING_STATUS_UNTRAINED: + chart->mls.num_training_status_untrained++; + return; + case TRAINING_STATUS_PENDING_WITHOUT_MODEL: + chart->mls.num_training_status_pending_without_model++; + return; + case TRAINING_STATUS_TRAINED: + chart->mls.num_training_status_trained++; + + chart->mls.num_anomalous_dimensions += is_anomalous; + chart->mls.num_normal_dimensions += !is_anomalous; + return; + case TRAINING_STATUS_PENDING_WITH_MODEL: + chart->mls.num_training_status_pending_with_model++; + + chart->mls.num_anomalous_dimensions += is_anomalous; + chart->mls.num_normal_dimensions += !is_anomalous; + return; + } + + return; + } + } +} + +/* + * Host detection & training functions +*/ + +#define WORKER_JOB_DETECTION_COLLECT_STATS 0 +#define WORKER_JOB_DETECTION_DIM_CHART 1 +#define WORKER_JOB_DETECTION_HOST_CHART 2 +#define WORKER_JOB_DETECTION_STATS 3 + +static void +ml_host_detect_once(ml_host_t *host) 
+{ + worker_is_busy(WORKER_JOB_DETECTION_COLLECT_STATS); + + host->mls = {}; + ml_machine_learning_stats_t mls_copy = {}; + + { + netdata_mutex_lock(&host->mutex); + + /* + * prediction/detection stats + */ + void *rsp = NULL; + rrdset_foreach_read(rsp, host->rh) { + RRDSET *rs = static_cast(rsp); -bool ml_capable() { + ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; + if (!chart) + continue; + + if (!ml_chart_is_available_for_ml(chart)) + continue; + + ml_machine_learning_stats_t chart_mls = chart->mls; + + host->mls.num_machine_learning_status_enabled += chart_mls.num_machine_learning_status_enabled; + host->mls.num_machine_learning_status_disabled_sp += chart_mls.num_machine_learning_status_disabled_sp; + + host->mls.num_metric_type_constant += chart_mls.num_metric_type_constant; + host->mls.num_metric_type_variable += chart_mls.num_metric_type_variable; + + host->mls.num_training_status_untrained += chart_mls.num_training_status_untrained; + host->mls.num_training_status_pending_without_model += chart_mls.num_training_status_pending_without_model; + host->mls.num_training_status_trained += chart_mls.num_training_status_trained; + host->mls.num_training_status_pending_with_model += chart_mls.num_training_status_pending_with_model; + + host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions; + host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions; + } + rrdset_foreach_done(rsp); + + host->host_anomaly_rate = 0.0; + size_t NumActiveDimensions = host->mls.num_anomalous_dimensions + host->mls.num_normal_dimensions; + if (NumActiveDimensions) + host->host_anomaly_rate = static_cast(host->mls.num_anomalous_dimensions) / NumActiveDimensions; + + mls_copy = host->mls; + + netdata_mutex_unlock(&host->mutex); + } + + worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); + ml_update_dimensions_chart(host, mls_copy); + + worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); + ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0); +} + +typedef struct { + RRDHOST_ACQUIRED *acq_rh; + RRDSET_ACQUIRED *acq_rs; + RRDDIM_ACQUIRED *acq_rd; + ml_dimension_t *dim; +} ml_acquired_dimension_t; + +static ml_acquired_dimension_t +ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id) +{ + RRDHOST_ACQUIRED *acq_rh = NULL; + RRDSET_ACQUIRED *acq_rs = NULL; + RRDDIM_ACQUIRED *acq_rd = NULL; + ml_dimension_t *dim = NULL; + + rrd_rdlock(); + + acq_rh = rrdhost_find_and_acquire(machine_guid); + if (acq_rh) { + RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh); + if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) { + acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id)); + if (acq_rs) { + RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs); + if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) { + acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); + if (acq_rd) { + RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); + if (rd) + dim = (ml_dimension_t *) rd->ml_dimension; + } + } + } + } + } + + rrd_unlock(); + + ml_acquired_dimension_t acq_dim = { + acq_rh, acq_rs, acq_rd, dim + }; + + return acq_dim; +} + +static void +ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim) +{ + if (acq_dim.acq_rd) + rrddim_acquired_release(acq_dim.acq_rd); + + if (acq_dim.acq_rs) + rrdset_acquired_release(acq_dim.acq_rs); + + if (acq_dim.acq_rh) + rrdhost_acquired_release(acq_dim.acq_rh); +} + +static enum ml_training_result +ml_acquired_dimension_train(ml_training_thread_t 
*training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr) +{ + if (!acq_dim.dim) + return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION; + + return ml_dimension_train_model(training_thread, acq_dim.dim, tr); +} + +static void * +ml_detect_main(void *arg) +{ + UNUSED(arg); + + worker_register("MLDETECT"); + worker_register_job_name(WORKER_JOB_DETECTION_COLLECT_STATS, "collect stats"); + worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart"); + worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart"); + worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats"); + + heartbeat_t hb; + heartbeat_init(&hb); + + while (!Cfg.detection_stop) { + worker_is_idle(); + heartbeat_next(&hb, USEC_PER_SEC); + + RRDHOST *rh; + rrd_rdlock(); + rrdhost_foreach_read(rh) { + if (!rh->ml_host) + continue; + + ml_host_detect_once((ml_host_t *) rh->ml_host); + } + rrd_unlock(); + + if (Cfg.enable_statistics_charts) { + // collect and update training thread stats + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + netdata_mutex_lock(&training_thread->nd_mutex); + ml_training_stats_t training_stats = training_thread->training_stats; + training_thread->training_stats = {}; + netdata_mutex_unlock(&training_thread->nd_mutex); + + // calc the avg values + if (training_stats.num_popped_items) { + training_stats.queue_size /= training_stats.num_popped_items; + training_stats.allotted_ut /= training_stats.num_popped_items; + training_stats.consumed_ut /= training_stats.num_popped_items; + training_stats.remaining_ut /= training_stats.num_popped_items; + } else { + training_stats.queue_size = ml_queue_size(training_thread->training_queue); + training_stats.consumed_ut = 0; + training_stats.remaining_ut = training_stats.allotted_ut; + + training_stats.training_result_ok = 0; + training_stats.training_result_invalid_query_time_range = 0; + training_stats.training_result_not_enough_collected_values = 0; + training_stats.training_result_null_acquired_dimension = 0; + training_stats.training_result_chart_under_replication = 0; + } + + ml_update_training_statistics_chart(training_thread, training_stats); + } + } + } + + return NULL; +} + +/* + * Public API +*/ + +bool ml_capable() +{ return true; } -bool ml_enabled(RRDHOST *RH) { - if (!Cfg.EnableAnomalyDetection) +bool ml_enabled(RRDHOST *rh) +{ + if (!rh) + return false; + + if (!Cfg.enable_anomaly_detection) return false; - if (simple_pattern_matches(Cfg.SP_HostsToSkip, rrdhost_hostname(RH))) + if (simple_pattern_matches(Cfg.sp_host_to_skip, rrdhost_hostname(rh))) return false; return true; } -/* - * Assumptions: - * 1) hosts outlive their sets, and sets outlive their dimensions, - * 2) dimensions always have a set that has a host. 
- */ +bool ml_streaming_enabled() +{ + return Cfg.stream_anomaly_detection_charts; +} -void ml_init(void) { - // Read config values - Cfg.readMLConfig(); +void ml_host_new(RRDHOST *rh) +{ + if (!ml_enabled(rh)) + return; + + ml_host_t *host = new ml_host_t(); - if (!Cfg.EnableAnomalyDetection) + host->rh = rh; + host->mls = ml_machine_learning_stats_t(); + //host->ts = ml_training_stats_t(); + + static std::atomic times_called(0); + host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; + + host->host_anomaly_rate = 0.0; + + netdata_mutex_init(&host->mutex); + + rh->ml_host = (rrd_ml_host_t *) host; +} + +void ml_host_delete(RRDHOST *rh) +{ + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host) return; - // Generate random numbers to efficiently sample the features we need - // for KMeans clustering. - std::random_device RD; - std::mt19937 Gen(RD()); + netdata_mutex_destroy(&host->mutex); - Cfg.RandomNums.reserve(Cfg.MaxTrainSamples); - for (size_t Idx = 0; Idx != Cfg.MaxTrainSamples; Idx++) - Cfg.RandomNums.push_back(Gen()); + delete host; + rh->ml_host = NULL; } -void ml_host_new(RRDHOST *RH) { - if (!ml_enabled(RH)) +void ml_host_get_info(RRDHOST *rh, BUFFER *wb) +{ + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host) { + buffer_json_member_add_boolean(wb, "enabled", false); return; + } + + buffer_json_member_add_uint64(wb, "version", 1); + + buffer_json_member_add_boolean(wb, "enabled", Cfg.enable_anomaly_detection); - Host *H = new Host(RH); - RH->ml_host = reinterpret_cast(H); + buffer_json_member_add_uint64(wb, "min-train-samples", Cfg.min_train_samples); + buffer_json_member_add_uint64(wb, "max-train-samples", Cfg.max_train_samples); + buffer_json_member_add_uint64(wb, "train-every", Cfg.train_every); + + buffer_json_member_add_uint64(wb, "diff-n", Cfg.diff_n); + buffer_json_member_add_uint64(wb, "smooth-n", Cfg.smooth_n); + buffer_json_member_add_uint64(wb, "lag-n", Cfg.lag_n); + + buffer_json_member_add_double(wb, "random-sampling-ratio", Cfg.random_sampling_ratio); + buffer_json_member_add_uint64(wb, "max-kmeans-iters", Cfg.random_sampling_ratio); + + buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold); + + buffer_json_member_add_string(wb, "anomaly-detection-grouping-method", + time_grouping_method2string(Cfg.anomaly_detection_grouping_method)); + + buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration); + + buffer_json_member_add_string(wb, "hosts-to-skip", Cfg.hosts_to_skip.c_str()); + buffer_json_member_add_string(wb, "charts-to-skip", Cfg.charts_to_skip.c_str()); } -void ml_host_delete(RRDHOST *RH) { - Host *H = reinterpret_cast(RH->ml_host); - if (!H) +void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb) +{ + ml_host_t *host = (ml_host_t *) rh->ml_host; + if (!host) return; - delete H; - RH->ml_host = nullptr; + netdata_mutex_lock(&host->mutex); + + buffer_json_member_add_uint64(wb, "version", 1); + buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions); + buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions); + buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions + + host->mls.num_normal_dimensions); + buffer_json_member_add_uint64(wb, "trained-dimensions", host->mls.num_training_status_trained + + host->mls.num_training_status_pending_with_model); + netdata_mutex_unlock(&host->mutex); } -void 
ml_chart_new(RRDSET *RS) { - Host *H = reinterpret_cast(RS->rrdhost->ml_host); - if (!H) +void ml_host_get_models(RRDHOST *rh, BUFFER *wb) +{ + UNUSED(rh); + UNUSED(wb); + + // TODO: To be implemented + error("Fetching KMeans models is not supported yet"); +} + +void ml_chart_new(RRDSET *rs) +{ + ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host; + if (!host) return; - Chart *C = new Chart(RS); - RS->ml_chart = reinterpret_cast(C); + ml_chart_t *chart = new ml_chart_t(); - H->addChart(C); + chart->rs = rs; + chart->mls = ml_machine_learning_stats_t(); + + netdata_mutex_init(&chart->mutex); + + rs->ml_chart = (rrd_ml_chart_t *) chart; } -void ml_chart_delete(RRDSET *RS) { - Host *H = reinterpret_cast(RS->rrdhost->ml_host); - if (!H) +void ml_chart_delete(RRDSET *rs) +{ + ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host; + if (!host) return; - Chart *C = reinterpret_cast(RS->ml_chart); - H->removeChart(C); + ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; + + netdata_mutex_destroy(&chart->mutex); - delete C; - RS->ml_chart = nullptr; + delete chart; + rs->ml_chart = NULL; } -void ml_dimension_new(RRDDIM *RD) { - Chart *C = reinterpret_cast(RD->rrdset->ml_chart); - if (!C) +bool ml_chart_update_begin(RRDSET *rs) +{ + ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; + if (!chart) + return false; + + netdata_mutex_lock(&chart->mutex); + chart->mls = {}; + return true; +} + +void ml_chart_update_end(RRDSET *rs) +{ + ml_chart_t *chart = (ml_chart_t *) rs->ml_chart; + if (!chart) return; - Dimension *D = new Dimension(RD); - RD->ml_dimension = reinterpret_cast(D); - C->addDimension(D); + netdata_mutex_unlock(&chart->mutex); } -void ml_dimension_delete(RRDDIM *RD) { - Dimension *D = reinterpret_cast(RD->ml_dimension); - if (!D) +void ml_dimension_new(RRDDIM *rd) +{ + ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart; + if (!chart) return; - Chart *C = reinterpret_cast(RD->rrdset->ml_chart); - C->removeDimension(D); + ml_dimension_t *dim = new ml_dimension_t(); + + dim->rd = rd; + + dim->mt = METRIC_TYPE_CONSTANT; + dim->ts = TRAINING_STATUS_UNTRAINED; + + dim->last_training_time = 0; + + ml_kmeans_init(&dim->kmeans); + + if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset))) + dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART; + else + dim->mls = MACHINE_LEARNING_STATUS_ENABLED; + + netdata_mutex_init(&dim->mutex); + + dim->km_contexts.reserve(Cfg.num_models_to_use); + + rd->ml_dimension = (rrd_ml_dimension_t *) dim; - delete D; - RD->ml_dimension = nullptr; + metaqueue_ml_load_models(rd); } -char *ml_get_host_info(RRDHOST *RH) { - nlohmann::json ConfigJson; +void ml_dimension_delete(RRDDIM *rd) +{ + ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension; + if (!dim) + return; - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->getConfigAsJson(ConfigJson); - } else { - ConfigJson["enabled"] = false; - } + netdata_mutex_destroy(&dim->mutex); - return strdupz(ConfigJson.dump(2, '\t').c_str()); + delete dim; + rd->ml_dimension = NULL; } -char *ml_get_host_runtime_info(RRDHOST *RH) { - nlohmann::json ConfigJson; +bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists) +{ + ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension; + if (!dim) + return false; - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->getDetectionInfoAsJson(ConfigJson); - } else { - return nullptr; - } + ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart; - return strdup(ConfigJson.dump(1, 
'\t').c_str()); + bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists); + ml_chart_update_dimension(chart, dim, is_anomalous); + + return is_anomalous; } -char *ml_get_host_models(RRDHOST *RH) { - nlohmann::json ModelsJson; +static int ml_flush_pending_models(ml_training_thread_t *training_thread) { + (void) db_execute(db, "BEGIN TRANSACTION;"); + + for (const auto &pending_model: training_thread->pending_model_info) { + int rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans); + if (rc) + return rc; - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->getModelsAsJson(ModelsJson); - return strdup(ModelsJson.dump(2, '\t').c_str()); + rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every)); + if (rc) + return rc; } - return nullptr; + (void) db_execute(db, "COMMIT TRANSACTION;"); + + training_thread->pending_model_info.clear(); + return 0; } -void ml_start_anomaly_detection_threads(RRDHOST *RH) { - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->startAnomalyDetectionThreads(); +static void *ml_train_main(void *arg) { + ml_training_thread_t *training_thread = (ml_training_thread_t *) arg; + + char worker_name[1024]; + snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id); + worker_register("MLTRAIN"); + + worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue"); + worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire"); + worker_register_job_name(WORKER_TRAIN_QUERY, "query"); + worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans"); + worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models"); + worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release"); + worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host"); + worker_register_job_name(WORKER_TRAIN_FLUSH_MODELS, "flush models"); + + while (!Cfg.training_stop) { + worker_is_busy(WORKER_TRAIN_QUEUE_POP); + + ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue); + + // we know this thread has been cancelled, when the queue starts + // returning "null" requests without blocking on queue's pop(). 
+ if (training_req.chart_id == NULL) + break; + + size_t queue_size = ml_queue_size(training_thread->training_queue) + 1; + + usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size; + if (allotted_ut > USEC_PER_SEC) + allotted_ut = USEC_PER_SEC; + + usec_t start_ut = now_monotonic_usec(); + + enum ml_training_result training_res; + { + worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION); + ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get( + training_req.machine_guid, + training_req.chart_id, + training_req.dimension_id); + + training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req); + + string_freez(training_req.chart_id); + string_freez(training_req.dimension_id); + + worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION); + ml_acquired_dimension_release(acq_dim); + } + + usec_t consumed_ut = now_monotonic_usec() - start_ut; + + usec_t remaining_ut = 0; + if (consumed_ut < allotted_ut) + remaining_ut = allotted_ut - consumed_ut; + + if (Cfg.enable_statistics_charts) { + worker_is_busy(WORKER_TRAIN_UPDATE_HOST); + + netdata_mutex_lock(&training_thread->nd_mutex); + + training_thread->training_stats.queue_size += queue_size; + training_thread->training_stats.num_popped_items += 1; + + training_thread->training_stats.allotted_ut += allotted_ut; + training_thread->training_stats.consumed_ut += consumed_ut; + training_thread->training_stats.remaining_ut += remaining_ut; + + switch (training_res) { + case TRAINING_RESULT_OK: + training_thread->training_stats.training_result_ok += 1; + break; + case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE: + training_thread->training_stats.training_result_invalid_query_time_range += 1; + break; + case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES: + training_thread->training_stats.training_result_not_enough_collected_values += 1; + break; + case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION: + training_thread->training_stats.training_result_null_acquired_dimension += 1; + break; + case TRAINING_RESULT_CHART_UNDER_REPLICATION: + training_thread->training_stats.training_result_chart_under_replication += 1; + break; + } + + netdata_mutex_unlock(&training_thread->nd_mutex); + } + + if (training_thread->pending_model_info.size() >= Cfg.flush_models_batch_size) { + worker_is_busy(WORKER_TRAIN_FLUSH_MODELS); + netdata_mutex_lock(&db_mutex); + ml_flush_pending_models(training_thread); + netdata_mutex_unlock(&db_mutex); + continue; + } + + worker_is_idle(); + std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut}); } + + return NULL; } -void ml_stop_anomaly_detection_threads(RRDHOST *RH) { - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->stopAnomalyDetectionThreads(true); +void ml_init() +{ + // Read config values + ml_config_load(&Cfg); + + if (!Cfg.enable_anomaly_detection) + return; + + // Generate random numbers to efficiently sample the features we need + // for KMeans clustering. 
+ std::random_device RD; + std::mt19937 Gen(RD()); + + Cfg.random_nums.reserve(Cfg.max_train_samples); + for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++) + Cfg.random_nums.push_back(Gen()); + + // init training thread-specific data + Cfg.training_threads.resize(Cfg.num_training_threads); + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1); + training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training](); + training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training](); + + training_thread->id = idx; + training_thread->training_queue = ml_queue_init(); + training_thread->pending_model_info.reserve(Cfg.flush_models_batch_size); + netdata_mutex_init(&training_thread->nd_mutex); } -} -void ml_cancel_anomaly_detection_threads(RRDHOST *RH) { - if (RH && RH->ml_host) { - Host *H = reinterpret_cast(RH->ml_host); - H->stopAnomalyDetectionThreads(false); + // open sqlite db + char path[FILENAME_MAX]; + snprintfz(path, FILENAME_MAX - 1, "%s/%s", netdata_configured_cache_dir, "ml.db"); + int rc = sqlite3_open(path, &db); + if (rc != SQLITE_OK) { + error_report("Failed to initialize database at %s, due to \"%s\"", path, sqlite3_errstr(rc)); + sqlite3_close(db); + db = NULL; + } + + if (db) { + char *err = NULL; + int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err); + if (rc != SQLITE_OK) { + error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : ""); + sqlite3_close(db); + sqlite3_free(err); + db = NULL; + } } } -void ml_chart_update_begin(RRDSET *RS) { - Chart *C = reinterpret_cast(RS->ml_chart); - if (!C) +void ml_fini() { + if (!Cfg.enable_anomaly_detection) return; - C->updateBegin(); + int rc = sqlite3_close_v2(db); + if (unlikely(rc != SQLITE_OK)) + error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc)); } -void ml_chart_update_end(RRDSET *RS) { - Chart *C = reinterpret_cast(RS->ml_chart); - if (!C) +void ml_start_threads() { + if (!Cfg.enable_anomaly_detection) return; - C->updateEnd(); -} + // start detection & training threads + Cfg.detection_stop = false; + Cfg.training_stop = false; -bool ml_is_anomalous(RRDDIM *RD, time_t CurrT, double Value, bool Exists) { - Dimension *D = reinterpret_cast(RD->ml_dimension); - if (!D) - return false; + char tag[NETDATA_THREAD_TAG_MAX + 1]; - Chart *C = reinterpret_cast(RD->rrdset->ml_chart); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT"); + netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL); - bool IsAnomalous = D->predict(CurrT, Value, Exists); - C->updateDimension(D, IsAnomalous); - return IsAnomalous; + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id); + netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread); + } } -bool ml_streaming_enabled() { - return Cfg.StreamADCharts; -} +void ml_stop_threads() +{ + if (!Cfg.enable_anomaly_detection) + return; -#include "ml-private.h" + Cfg.detection_stop = true; + Cfg.training_stop = true; + + netdata_thread_cancel(Cfg.detection_thread); + netdata_thread_join(Cfg.detection_thread, NULL); + + // signal the 
training queue of each thread + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + ml_queue_signal(training_thread->training_queue); + } + + // cancel training threads + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + netdata_thread_cancel(training_thread->nd_thread); + } + + // join training threads + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + netdata_thread_join(training_thread->nd_thread, NULL); + } + + // clear training thread data + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + delete[] training_thread->training_cns; + delete[] training_thread->scratch_training_cns; + ml_queue_destroy(training_thread->training_queue); + netdata_mutex_destroy(&training_thread->nd_mutex); + } +} diff --git a/ml/ml.h b/ml/ml.h index 8bed627f5..964dd0821 100644 --- a/ml/ml.h +++ b/ml/ml.h @@ -10,39 +10,35 @@ extern "C" { #include "daemon/common.h" #include "web/api/queries/rrdr.h" -// This is a DBEngine function redeclared here so that we can free -// the anomaly rate dimension, whenever its backing dimension is freed. -void rrddim_free(RRDSET *st, RRDDIM *rd); - bool ml_capable(); - -bool ml_enabled(RRDHOST *RH); +bool ml_enabled(RRDHOST *rh); +bool ml_streaming_enabled(); void ml_init(void); +void ml_fini(void); -void ml_host_new(RRDHOST *RH); -void ml_host_delete(RRDHOST *RH); +void ml_start_threads(void); +void ml_stop_threads(void); -void ml_chart_new(RRDSET *RS); -void ml_chart_delete(RRDSET *RS); +void ml_host_new(RRDHOST *rh); +void ml_host_delete(RRDHOST *rh); -void ml_dimension_new(RRDDIM *RD); -void ml_dimension_delete(RRDDIM *RD); +void ml_host_get_info(RRDHOST *RH, BUFFER *wb); +void ml_host_get_detection_info(RRDHOST *RH, BUFFER *wb); +void ml_host_get_models(RRDHOST *RH, BUFFER *wb); -void ml_start_anomaly_detection_threads(RRDHOST *RH); -void ml_stop_anomaly_detection_threads(RRDHOST *RH); -void ml_cancel_anomaly_detection_threads(RRDHOST *RH); +void ml_chart_new(RRDSET *rs); +void ml_chart_delete(RRDSET *rs); +bool ml_chart_update_begin(RRDSET *rs); +void ml_chart_update_end(RRDSET *rs); -char *ml_get_host_info(RRDHOST *RH); -char *ml_get_host_runtime_info(RRDHOST *RH); -char *ml_get_host_models(RRDHOST *RH); +void ml_dimension_new(RRDDIM *rd); +void ml_dimension_delete(RRDDIM *rd); +bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists); -void ml_chart_update_begin(RRDSET *RS); -void ml_chart_update_end(RRDSET *RS); +int ml_dimension_load_models(RRDDIM *rd); -bool ml_is_anomalous(RRDDIM *RD, time_t curr_t, double value, bool exists); - -bool ml_streaming_enabled(); +void ml_update_global_statistics_charts(uint64_t models_consulted); #ifdef __cplusplus }; -- cgit v1.2.3
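
For orientation, the refactored ml.h above reduces the module to a small C lifecycle API. The sketch below is illustrative only: it shows one plausible call order for that API, assuming an already-created RRDHOST/RRDSET/RRDDIM; the real call sites live elsewhere in the netdata daemon and database layers, outside this patch, and the helper name example_ml_lifecycle is hypothetical.

// Hypothetical driver showing the intended call order of the new ml.h API;
// not part of this patch.
void example_ml_lifecycle(RRDHOST *rh, RRDDIM *rd) {
    ml_init();                          // load config, allocate training threads, open ml.db
    ml_start_threads();                 // spawn the PREDICT thread and the TRAIN[N] threads

    if (ml_enabled(rh))
        ml_host_new(rh);                // per-host state, assigned round-robin to a training queue

    ml_chart_new(rd->rrdset);
    ml_dimension_new(rd);               // schedules ml_dimension_load_models() via the metadata queue

    // per collected value, from the database layer:
    if (ml_chart_update_begin(rd->rrdset)) {
        bool anomalous = ml_dimension_is_anomalous(rd, now_realtime_sec(), 1.0, true);
        (void) anomalous;               // feeds the per-chart anomaly counters
        ml_chart_update_end(rd->rrdset);
    }

    // teardown, mirroring creation order
    ml_dimension_delete(rd);
    ml_chart_delete(rd->rrdset);
    ml_host_delete(rd->rrdset->rrdhost);

    ml_stop_threads();
    ml_fini();
}

One design point worth noting from the patch itself: keeping ml_start_threads()/ml_stop_threads() separate from ml_init()/ml_fini() lets the daemon open the models database before any training thread can touch it, and keeps the SQLite handle alive until every thread that writes to it has been joined.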