summaryrefslogtreecommitdiffstats
path: root/ml/Host.cc
diff options
context:
space:
mode:
Diffstat (limited to 'ml/Host.cc')
-rw-r--r--ml/Host.cc222
1 files changed, 137 insertions, 85 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index b632710a..3166720c 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -20,13 +20,18 @@ static void updateDimensionsChart(RRDHOST *RH,
static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "dimensions_on_" << localhost->machine_guid;
+ NameSS << "dimensions_on_" << localhost->hostname;
+
RS = rrdset_create(
- RH, // host
+ RH,
"anomaly_detection", // type
- "dimensions", // id
- NULL, // name
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
"dimensions", // family
- NULL, // ctx
+ "anomaly_detection.dimensions", // ctx
"Anomaly detection dimensions", // title
"dimensions", // units
"netdata", // plugin
@@ -35,6 +40,7 @@ static void updateDimensionsChart(RRDHOST *RH,
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
+ rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -60,13 +66,18 @@ static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
static thread_local RRDDIM *AnomalyRateRD = nullptr;
if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "anomaly_rate_on_" << localhost->machine_guid;
+ NameSS << "anomaly_rate_on_" << localhost->hostname;
+
RS = rrdset_create(
- RH, // host
+ RH,
"anomaly_detection", // type
- "anomaly_rate", // id
- NULL, // name
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
"anomaly_rate", // family
- NULL, // ctx
+ "anomaly_detection.anomaly_rate", // ctx
"Percentage of anomalous dimensions", // title
"percentage", // units
"netdata", // plugin
@@ -75,6 +86,7 @@ static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
+ rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL,
1, 100, RRD_ALGORITHM_ABSOLUTE);
@@ -91,13 +103,18 @@ static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength)
static thread_local RRDDIM *WindowLengthRD = nullptr;
if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "detector_window_on_" << localhost->machine_guid;
+ NameSS << "detector_window_on_" << localhost->hostname;
+
RS = rrdset_create(
- RH, // host
+ RH,
"anomaly_detection", // type
- "detector_window", // id
- NULL, // name
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
"detector_window", // family
- NULL, // ctx
+ "anomaly_detection.detector_window", // ctx
"Anomaly detector window length", // title
"seconds", // units
"netdata", // plugin
@@ -106,6 +123,7 @@ static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength)
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
+ rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
WindowLengthRD = rrddim_add(RS, "duration", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -126,13 +144,18 @@ static void updateEventsChart(RRDHOST *RH,
static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "detector_events_on_" << localhost->machine_guid;
+ NameSS << "detector_events_on_" << localhost->hostname;
+
RS = rrdset_create(
- RH, // host
+ RH,
"anomaly_detection", // type
- "detector_events", // id
- NULL, // name
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
"detector_events", // family
- NULL, // ctx
+ "anomaly_detection.detector_events", // ctx
"Anomaly events triggered", // title
"boolean", // units
"netdata", // plugin
@@ -141,6 +164,7 @@ static void updateEventsChart(RRDHOST *RH,
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
+ rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
@@ -161,82 +185,94 @@ static void updateEventsChart(RRDHOST *RH,
rrdset_done(RS);
}
-static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) {
+static void updateDetectionChart(RRDHOST *RH) {
static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *PredictiobDurationRD = nullptr;
+ static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
if (!RS) {
- RS = rrdset_create(
- RH, // host
- "anomaly_detection", // type
- "prediction_stats", // id
- NULL, // name
- "prediction_stats", // family
- NULL, // ctx
- "Time it took to run prediction", // title
- "milliseconds", // units
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "prediction_stats_" << RH->machine_guid;
+ NameSS << "prediction_stats_for_" << RH->hostname;
+
+ RS = rrdset_create_localhost(
+ "netdata", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "ml", // family
+ "netdata.prediction_stats", // ctx
+ "Prediction thread CPU usage", // title
+ "milliseconds/s", // units
"netdata", // plugin
"ml", // module
- 39187, // priority
+ 136000, // priority
RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
+ RRDSET_TYPE_STACKED // chart_type
);
- PredictiobDurationRD = rrddim_add(RS, "duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
rrdset_next(RS);
- rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration);
+ struct rusage TRU;
+ getrusage(RUSAGE_THREAD, &TRU);
+ rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
rrdset_done(RS);
}
-static void updateTrainingChart(RRDHOST *RH,
- collected_number TotalTrainingDuration,
- collected_number MaxTrainingDuration)
+static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU)
{
static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *TotalTrainingDurationRD = nullptr;
- static thread_local RRDDIM *MaxTrainingDurationRD = nullptr;
+ static thread_local RRDDIM *UserRD = nullptr;
+ static thread_local RRDDIM *SystemRD = nullptr;
if (!RS) {
- RS = rrdset_create(
- RH, // host
- "anomaly_detection", // type
- "training_stats", // id
- NULL, // name
- "training_stats", // family
- NULL, // ctx
- "Training step statistics", // title
- "milliseconds", // units
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "training_stats_" << RH->machine_guid;
+ NameSS << "training_stats_for_" << RH->hostname;
+
+ RS = rrdset_create_localhost(
+ "netdata", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "ml", // family
+ "netdata.training_stats", // ctx
+ "Training thread CPU usage", // title
+ "milliseconds/s", // units
"netdata", // plugin
"ml", // module
- 39188, // priority
+ 136001, // priority
RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
+ RRDSET_TYPE_STACKED // chart_type
);
- TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
rrdset_next(RS);
- rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration);
- rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration);
-
+ rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
rrdset_done(RS);
}
void RrdHost::addDimension(Dimension *D) {
- std::lock_guard<std::mutex> Lock(Mutex);
+ RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL,
+ 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ D->setAnomalyRateRD(AnomalyRateRD);
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
- DimensionsMap[D->getRD()] = D;
+ DimensionsMap[D->getRD()] = D;
- // Default construct mutex for dimension
- LocksMap[D];
+ // Default construct mutex for dimension
+ LocksMap[D];
+ }
}
void RrdHost::removeDimension(Dimension *D) {
@@ -272,6 +308,7 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["smooth-n"] = Cfg.SmoothN;
Json["lag-n"] = Cfg.LagN;
+ Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
@@ -310,11 +347,7 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
return;
D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
-
- TimePoint StartTP = SteadyClock::now();
D->trainModel();
- Duration<double> Duration = SteadyClock::now() - StartTP;
- D->updateTrainingDuration(Duration.count());
{
std::lock_guard<std::mutex> Lock(Mutex);
@@ -323,14 +356,21 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
}
void TrainableHost::train() {
- Duration<double> MaxSleepFor = Seconds{updateEvery()};
+ Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
while (!netdata_exit) {
+ netdata_thread_testcancel();
+ netdata_thread_disable_cancelability();
+
+ updateResourceUsage();
+
TimePoint NowTP = SteadyClock::now();
auto P = findDimensionToTrain(NowTP);
trainDimension(P.first, NowTP);
+ netdata_thread_enable_cancelability();
+
Duration<double> AllottedDuration = P.second;
Duration<double> RealDuration = SteadyClock::now() - NowTP;
@@ -344,7 +384,7 @@ void TrainableHost::train() {
}
void DetectableHost::detectOnce() {
- auto P = BRW.insert(AnomalyRate >= Cfg.HostAnomalyRateThreshold);
+ auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold);
BitRateWindow::Edge Edge = P.first;
size_t WindowLength = P.second;
@@ -358,8 +398,9 @@ void DetectableHost::detectOnce() {
size_t NumNormalDimensions = 0;
size_t NumTrainedDimensions = 0;
- double TotalTrainingDuration = 0.0;
- double MaxTrainingDuration = 0.0;
+ bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
+ if (CollectAnomalyRates)
+ rrdset_next(AnomalyRateRS);
{
std::lock_guard<std::mutex> Lock(Mutex);
@@ -371,38 +412,44 @@ void DetectableHost::detectOnce() {
auto P = D->detect(WindowLength, ResetBitCounter);
bool IsAnomalous = P.first;
- double AnomalyRate = P.second;
+ double AnomalyScore = P.second;
NumTrainedDimensions += D->isTrained();
- double DimTrainingDuration = D->updateTrainingDuration(0.0);
- MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration);
- TotalTrainingDuration += DimTrainingDuration;
-
if (IsAnomalous)
NumAnomalousDimensions += 1;
- if (NewAnomalyEvent && (AnomalyRate >= Cfg.ADDimensionRateThreshold))
- DimsOverThreshold.push_back({ AnomalyRate, D->getID() });
+ if (NewAnomalyEvent && (AnomalyScore >= Cfg.ADDimensionRateThreshold))
+ DimsOverThreshold.push_back({ AnomalyScore, D->getID() });
+
+ D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, IsAnomalous);
}
if (NumAnomalousDimensions)
- AnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size();
+ WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size();
else
- AnomalyRate = 0.0;
+ WindowAnomalyRate = 0.0;
NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions;
}
+ if (CollectAnomalyRates) {
+ AnomalyRateTimer = 0;
+ rrdset_done(AnomalyRateRS);
+ }
+
this->NumAnomalousDimensions = NumAnomalousDimensions;
this->NumNormalDimensions = NumNormalDimensions;
this->NumTrainedDimensions = NumTrainedDimensions;
updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
- updateRateChart(getRH(), AnomalyRate * 10000.0);
+ updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
updateWindowLengthChart(getRH(), WindowLength);
updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
- updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0);
+
+ struct rusage TRU;
+ getResourceUsage(&TRU);
+ updateTrainingChart(getRH(), &TRU);
if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
return;
@@ -431,15 +478,17 @@ void DetectableHost::detectOnce() {
void DetectableHost::detect() {
std::this_thread::sleep_for(Seconds{10});
- while (!netdata_exit) {
- TimePoint StartTP = SteadyClock::now();
- detectOnce();
- TimePoint EndTP = SteadyClock::now();
+ heartbeat_t HB;
+ heartbeat_init(&HB);
- Duration<double> Dur = EndTP - StartTP;
- updateDetectionChart(getRH(), Dur.count() * 1000);
+ while (!netdata_exit) {
+ netdata_thread_testcancel();
+ heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
- std::this_thread::sleep_for(Seconds{updateEvery()});
+ netdata_thread_disable_cancelability();
+ detectOnce();
+ updateDetectionChart(getRH());
+ netdata_thread_enable_cancelability();
}
}
@@ -457,6 +506,9 @@ void DetectableHost::startAnomalyDetectionThreads() {
}
void DetectableHost::stopAnomalyDetectionThreads() {
+ netdata_thread_cancel(TrainingThread.native_handle());
+ netdata_thread_cancel(DetectionThread.native_handle());
+
TrainingThread.join();
DetectionThread.join();
}