diff options
Diffstat (limited to 'ml/Host.cc')
-rw-r--r-- | ml/Host.cc | 222 |
1 files changed, 137 insertions, 85 deletions
diff --git a/ml/Host.cc b/ml/Host.cc index b632710a4..3166720cc 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -20,13 +20,18 @@ static void updateDimensionsChart(RRDHOST *RH, static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "dimensions_on_" << localhost->machine_guid; + NameSS << "dimensions_on_" << localhost->hostname; + RS = rrdset_create( - RH, // host + RH, "anomaly_detection", // type - "dimensions", // id - NULL, // name + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name "dimensions", // family - NULL, // ctx + "anomaly_detection.dimensions", // ctx "Anomaly detection dimensions", // title "dimensions", // units "netdata", // plugin @@ -35,6 +40,7 @@ static void updateDimensionsChart(RRDHOST *RH, RH->rrd_update_every, // update_every RRDSET_TYPE_LINE // chart_type ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -60,13 +66,18 @@ static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) { static thread_local RRDDIM *AnomalyRateRD = nullptr; if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "anomaly_rate_on_" << localhost->machine_guid; + NameSS << "anomaly_rate_on_" << localhost->hostname; + RS = rrdset_create( - RH, // host + RH, "anomaly_detection", // type - "anomaly_rate", // id - NULL, // name + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name "anomaly_rate", // family - NULL, // ctx + "anomaly_detection.anomaly_rate", // ctx "Percentage of anomalous dimensions", // title "percentage", // units "netdata", // plugin @@ -75,6 +86,7 @@ static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) { RH->rrd_update_every, // update_every RRDSET_TYPE_LINE // chart_type ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE); @@ -91,13 +103,18 @@ static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) static thread_local RRDDIM *WindowLengthRD = nullptr; if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "detector_window_on_" << localhost->machine_guid; + NameSS << "detector_window_on_" << localhost->hostname; + RS = rrdset_create( - RH, // host + RH, "anomaly_detection", // type - "detector_window", // id - NULL, // name + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name "detector_window", // family - NULL, // ctx + "anomaly_detection.detector_window", // ctx "Anomaly detector window length", // title "seconds", // units "netdata", // plugin @@ -106,6 +123,7 @@ static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) RH->rrd_update_every, // update_every RRDSET_TYPE_LINE // chart_type ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); WindowLengthRD = rrddim_add(RS, "duration", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -126,13 +144,18 @@ static void updateEventsChart(RRDHOST *RH, static thread_local RRDDIM *NewAnomalyEventRD = nullptr; if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "detector_events_on_" << localhost->machine_guid; + NameSS << "detector_events_on_" << localhost->hostname; + RS = rrdset_create( - RH, // host + RH, "anomaly_detection", // type - "detector_events", // id - NULL, // name + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name "detector_events", // family - NULL, // ctx + "anomaly_detection.detector_events", // ctx "Anomaly events triggered", // title "boolean", // units "netdata", // plugin @@ -141,6 +164,7 @@ static void updateEventsChart(RRDHOST *RH, RH->rrd_update_every, // update_every RRDSET_TYPE_LINE // chart_type ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); @@ -161,82 +185,94 @@ static void updateEventsChart(RRDHOST *RH, rrdset_done(RS); } -static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) { +static void updateDetectionChart(RRDHOST *RH) { static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *PredictiobDurationRD = nullptr; + static thread_local RRDDIM *UserRD, *SystemRD = nullptr; if (!RS) { - RS = rrdset_create( - RH, // host - "anomaly_detection", // type - "prediction_stats", // id - NULL, // name - "prediction_stats", // family - NULL, // ctx - "Time it took to run prediction", // title - "milliseconds", // units + std::stringstream IdSS, NameSS; + + IdSS << "prediction_stats_" << RH->machine_guid; + NameSS << "prediction_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.prediction_stats", // ctx + "Prediction thread CPU usage", // title + "milliseconds/s", // units "netdata", // plugin "ml", // module - 39187, // priority + 136000, // priority RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type + RRDSET_TYPE_STACKED // chart_type ); - PredictiobDurationRD = rrddim_add(RS, "duration", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); } else rrdset_next(RS); - rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration); + struct rusage TRU; + getrusage(RUSAGE_THREAD, &TRU); + rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec); rrdset_done(RS); } -static void updateTrainingChart(RRDHOST *RH, - collected_number TotalTrainingDuration, - collected_number MaxTrainingDuration) +static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU) { static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *TotalTrainingDurationRD = nullptr; - static thread_local RRDDIM *MaxTrainingDurationRD = nullptr; + static thread_local RRDDIM *UserRD = nullptr; + static thread_local RRDDIM *SystemRD = nullptr; if (!RS) { - RS = rrdset_create( - RH, // host - "anomaly_detection", // type - "training_stats", // id - NULL, // name - "training_stats", // family - NULL, // ctx - "Training step statistics", // title - "milliseconds", // units + std::stringstream IdSS, NameSS; + + IdSS << "training_stats_" << RH->machine_guid; + NameSS << "training_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.training_stats", // ctx + "Training thread CPU usage", // title + "milliseconds/s", // units "netdata", // plugin "ml", // module - 39188, // priority + 136001, // priority RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type + RRDSET_TYPE_STACKED // chart_type ); - TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); } else rrdset_next(RS); - rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration); - rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration); - + rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec); rrdset_done(RS); } void RrdHost::addDimension(Dimension *D) { - std::lock_guard<std::mutex> Lock(Mutex); + RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL, + 1, 1000, RRD_ALGORITHM_ABSOLUTE); + D->setAnomalyRateRD(AnomalyRateRD); + + { + std::lock_guard<std::mutex> Lock(Mutex); - DimensionsMap[D->getRD()] = D; + DimensionsMap[D->getRD()] = D; - // Default construct mutex for dimension - LocksMap[D]; + // Default construct mutex for dimension + LocksMap[D]; + } } void RrdHost::removeDimension(Dimension *D) { @@ -272,6 +308,7 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const { Json["smooth-n"] = Cfg.SmoothN; Json["lag-n"] = Cfg.LagN; + Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio; Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; @@ -310,11 +347,7 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { return; D->LastTrainedAt = NowTP + Seconds{D->updateEvery()}; - - TimePoint StartTP = SteadyClock::now(); D->trainModel(); - Duration<double> Duration = SteadyClock::now() - StartTP; - D->updateTrainingDuration(Duration.count()); { std::lock_guard<std::mutex> Lock(Mutex); @@ -323,14 +356,21 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { } void TrainableHost::train() { - Duration<double> MaxSleepFor = Seconds{updateEvery()}; + Duration<double> MaxSleepFor = Seconds{10 * updateEvery()}; while (!netdata_exit) { + netdata_thread_testcancel(); + netdata_thread_disable_cancelability(); + + updateResourceUsage(); + TimePoint NowTP = SteadyClock::now(); auto P = findDimensionToTrain(NowTP); trainDimension(P.first, NowTP); + netdata_thread_enable_cancelability(); + Duration<double> AllottedDuration = P.second; Duration<double> RealDuration = SteadyClock::now() - NowTP; @@ -344,7 +384,7 @@ void TrainableHost::train() { } void DetectableHost::detectOnce() { - auto P = BRW.insert(AnomalyRate >= Cfg.HostAnomalyRateThreshold); + auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold); BitRateWindow::Edge Edge = P.first; size_t WindowLength = P.second; @@ -358,8 +398,9 @@ void DetectableHost::detectOnce() { size_t NumNormalDimensions = 0; size_t NumTrainedDimensions = 0; - double TotalTrainingDuration = 0.0; - double MaxTrainingDuration = 0.0; + bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery); + if (CollectAnomalyRates) + rrdset_next(AnomalyRateRS); { std::lock_guard<std::mutex> Lock(Mutex); @@ -371,38 +412,44 @@ void DetectableHost::detectOnce() { auto P = D->detect(WindowLength, ResetBitCounter); bool IsAnomalous = P.first; - double AnomalyRate = P.second; + double AnomalyScore = P.second; NumTrainedDimensions += D->isTrained(); - double DimTrainingDuration = D->updateTrainingDuration(0.0); - MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration); - TotalTrainingDuration += DimTrainingDuration; - if (IsAnomalous) NumAnomalousDimensions += 1; - if (NewAnomalyEvent && (AnomalyRate >= Cfg.ADDimensionRateThreshold)) - DimsOverThreshold.push_back({ AnomalyRate, D->getID() }); + if (NewAnomalyEvent && (AnomalyScore >= Cfg.ADDimensionRateThreshold)) + DimsOverThreshold.push_back({ AnomalyScore, D->getID() }); + + D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, IsAnomalous); } if (NumAnomalousDimensions) - AnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size(); + WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size(); else - AnomalyRate = 0.0; + WindowAnomalyRate = 0.0; NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions; } + if (CollectAnomalyRates) { + AnomalyRateTimer = 0; + rrdset_done(AnomalyRateRS); + } + this->NumAnomalousDimensions = NumAnomalousDimensions; this->NumNormalDimensions = NumNormalDimensions; this->NumTrainedDimensions = NumTrainedDimensions; updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); - updateRateChart(getRH(), AnomalyRate * 10000.0); + updateRateChart(getRH(), WindowAnomalyRate * 10000.0); updateWindowLengthChart(getRH(), WindowLength); updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent); - updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0); + + struct rusage TRU; + getResourceUsage(&TRU); + updateTrainingChart(getRH(), &TRU); if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0)) return; @@ -431,15 +478,17 @@ void DetectableHost::detectOnce() { void DetectableHost::detect() { std::this_thread::sleep_for(Seconds{10}); - while (!netdata_exit) { - TimePoint StartTP = SteadyClock::now(); - detectOnce(); - TimePoint EndTP = SteadyClock::now(); + heartbeat_t HB; + heartbeat_init(&HB); - Duration<double> Dur = EndTP - StartTP; - updateDetectionChart(getRH(), Dur.count() * 1000); + while (!netdata_exit) { + netdata_thread_testcancel(); + heartbeat_next(&HB, updateEvery() * USEC_PER_SEC); - std::this_thread::sleep_for(Seconds{updateEvery()}); + netdata_thread_disable_cancelability(); + detectOnce(); + updateDetectionChart(getRH()); + netdata_thread_enable_cancelability(); } } @@ -457,6 +506,9 @@ void DetectableHost::startAnomalyDetectionThreads() { } void DetectableHost::stopAnomalyDetectionThreads() { + netdata_thread_cancel(TrainingThread.native_handle()); + netdata_thread_cancel(DetectionThread.native_handle()); + TrainingThread.join(); DetectionThread.join(); } |