summaryrefslogtreecommitdiffstats
path: root/ml/Host.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ml/Host.cc45
1 files changed, 43 insertions, 2 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index 3166720cc..f8cba9d64 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -358,6 +358,10 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
void TrainableHost::train() {
Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
+ worker_register("MLTRAIN");
+ worker_register_job_name(0, "dimensions");
+
+ worker_is_busy(0);
while (!netdata_exit) {
netdata_thread_testcancel();
netdata_thread_disable_cancelability();
@@ -378,11 +382,23 @@ void TrainableHost::train() {
if (RealDuration >= AllottedDuration)
continue;
+ worker_is_idle();
SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
std::this_thread::sleep_for(SleepFor);
+ worker_is_busy(0);
}
}
+#define WORKER_JOB_DETECT_DIMENSION 0
+#define WORKER_JOB_UPDATE_DETECTION_CHART 1
+#define WORKER_JOB_UPDATE_ANOMALY_RATES 2
+#define WORKER_JOB_UPDATE_CHARTS 3
+#define WORKER_JOB_SAVE_ANOMALY_EVENT 4
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
+#endif
+
void DetectableHost::detectOnce() {
auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold);
BitRateWindow::Edge Edge = P.first;
@@ -397,6 +413,7 @@ void DetectableHost::detectOnce() {
size_t NumAnomalousDimensions = 0;
size_t NumNormalDimensions = 0;
size_t NumTrainedDimensions = 0;
+ size_t NumActiveDimensions = 0;
bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
if (CollectAnomalyRates)
@@ -408,8 +425,17 @@ void DetectableHost::detectOnce() {
DimsOverThreshold.reserve(DimensionsMap.size());
for (auto &DP : DimensionsMap) {
+ worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
+
Dimension *D = DP.second;
+ if (!D->isActive()) {
+ D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, false);
+ continue;
+ }
+
+ NumActiveDimensions++;
+
auto P = D->detect(WindowLength, ResetBitCounter);
bool IsAnomalous = P.first;
double AnomalyScore = P.second;
@@ -426,14 +452,15 @@ void DetectableHost::detectOnce() {
}
if (NumAnomalousDimensions)
- WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size();
+ WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
else
WindowAnomalyRate = 0.0;
- NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions;
+ NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
}
if (CollectAnomalyRates) {
+ worker_is_busy(WORKER_JOB_UPDATE_ANOMALY_RATES);
AnomalyRateTimer = 0;
rrdset_done(AnomalyRateRS);
}
@@ -441,7 +468,9 @@ void DetectableHost::detectOnce() {
this->NumAnomalousDimensions = NumAnomalousDimensions;
this->NumNormalDimensions = NumNormalDimensions;
this->NumTrainedDimensions = NumTrainedDimensions;
+ this->NumActiveDimensions = NumActiveDimensions;
+ worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
updateWindowLengthChart(getRH(), WindowLength);
@@ -454,6 +483,8 @@ void DetectableHost::detectOnce() {
if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
return;
+ worker_is_busy(WORKER_JOB_SAVE_ANOMALY_EVENT);
+
std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end());
std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end());
@@ -476,6 +507,13 @@ void DetectableHost::detectOnce() {
}
void DetectableHost::detect() {
+ worker_register("MLDETECT");
+ worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions");
+ worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
+ worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
+ worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
+ worker_register_job_name(WORKER_JOB_SAVE_ANOMALY_EVENT, "anomaly event");
+
std::this_thread::sleep_for(Seconds{10});
heartbeat_t HB;
@@ -483,10 +521,13 @@ void DetectableHost::detect() {
while (!netdata_exit) {
netdata_thread_testcancel();
+ worker_is_idle();
heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
netdata_thread_disable_cancelability();
detectOnce();
+
+ worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART);
updateDetectionChart(getRH());
netdata_thread_enable_cancelability();
}