diff options
Diffstat (limited to '')
-rw-r--r-- | ml/Host.cc | 45 |
1 files changed, 43 insertions, 2 deletions
diff --git a/ml/Host.cc b/ml/Host.cc index 3166720cc..f8cba9d64 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -358,6 +358,10 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { void TrainableHost::train() { Duration<double> MaxSleepFor = Seconds{10 * updateEvery()}; + worker_register("MLTRAIN"); + worker_register_job_name(0, "dimensions"); + + worker_is_busy(0); while (!netdata_exit) { netdata_thread_testcancel(); netdata_thread_disable_cancelability(); @@ -378,11 +382,23 @@ void TrainableHost::train() { if (RealDuration >= AllottedDuration) continue; + worker_is_idle(); SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor); std::this_thread::sleep_for(SleepFor); + worker_is_busy(0); } } +#define WORKER_JOB_DETECT_DIMENSION 0 +#define WORKER_JOB_UPDATE_DETECTION_CHART 1 +#define WORKER_JOB_UPDATE_ANOMALY_RATES 2 +#define WORKER_JOB_UPDATE_CHARTS 3 +#define WORKER_JOB_SAVE_ANOMALY_EVENT 4 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5 +#endif + void DetectableHost::detectOnce() { auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold); BitRateWindow::Edge Edge = P.first; @@ -397,6 +413,7 @@ void DetectableHost::detectOnce() { size_t NumAnomalousDimensions = 0; size_t NumNormalDimensions = 0; size_t NumTrainedDimensions = 0; + size_t NumActiveDimensions = 0; bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery); if (CollectAnomalyRates) @@ -408,8 +425,17 @@ void DetectableHost::detectOnce() { DimsOverThreshold.reserve(DimensionsMap.size()); for (auto &DP : DimensionsMap) { + worker_is_busy(WORKER_JOB_DETECT_DIMENSION); + Dimension *D = DP.second; + if (!D->isActive()) { + D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, false); + continue; + } + + NumActiveDimensions++; + auto P = D->detect(WindowLength, ResetBitCounter); bool IsAnomalous = P.first; double AnomalyScore = P.second; @@ -426,14 +452,15 @@ void DetectableHost::detectOnce() { } if (NumAnomalousDimensions) - WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size(); + WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions; else WindowAnomalyRate = 0.0; - NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions; + NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions; } if (CollectAnomalyRates) { + worker_is_busy(WORKER_JOB_UPDATE_ANOMALY_RATES); AnomalyRateTimer = 0; rrdset_done(AnomalyRateRS); } @@ -441,7 +468,9 @@ void DetectableHost::detectOnce() { this->NumAnomalousDimensions = NumAnomalousDimensions; this->NumNormalDimensions = NumNormalDimensions; this->NumTrainedDimensions = NumTrainedDimensions; + this->NumActiveDimensions = NumActiveDimensions; + worker_is_busy(WORKER_JOB_UPDATE_CHARTS); updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); updateRateChart(getRH(), WindowAnomalyRate * 10000.0); updateWindowLengthChart(getRH(), WindowLength); @@ -454,6 +483,8 @@ void DetectableHost::detectOnce() { if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0)) return; + worker_is_busy(WORKER_JOB_SAVE_ANOMALY_EVENT); + std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end()); std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end()); @@ -476,6 +507,13 @@ void DetectableHost::detectOnce() { } void DetectableHost::detect() { + worker_register("MLDETECT"); + worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions"); + worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart"); + worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates"); + worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts"); + worker_register_job_name(WORKER_JOB_SAVE_ANOMALY_EVENT, "anomaly event"); + std::this_thread::sleep_for(Seconds{10}); heartbeat_t HB; @@ -483,10 +521,13 @@ void DetectableHost::detect() { while (!netdata_exit) { netdata_thread_testcancel(); + worker_is_idle(); heartbeat_next(&HB, updateEvery() * USEC_PER_SEC); netdata_thread_disable_cancelability(); detectOnce(); + + worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART); updateDetectionChart(getRH()); netdata_thread_enable_cancelability(); } |