diff options
Diffstat (limited to 'ml/Host.cc')
-rw-r--r-- | ml/Host.cc | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/ml/Host.cc b/ml/Host.cc new file mode 100644 index 0000000..4a57178 --- /dev/null +++ b/ml/Host.cc @@ -0,0 +1,255 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "Config.h" +#include "Host.h" +#include "ADCharts.h" + +#include "json/single_include/nlohmann/json.hpp" + +using namespace ml; + +void RrdHost::addDimension(Dimension *D) { + std::lock_guard<std::mutex> Lock(Mutex); + + DimensionsMap[D->getRD()] = D; + + // Default construct mutex for dimension + LocksMap[D]; +} + +void RrdHost::removeDimension(Dimension *D) { + // Remove the dimension from the hosts map. + { + std::lock_guard<std::mutex> Lock(Mutex); + DimensionsMap.erase(D->getRD()); + } + + // Delete the dimension by locking the mutex that protects it. + { + std::lock_guard<std::mutex> Lock(LocksMap[D]); + delete D; + } + + // Remove the lock entry for the deleted dimension. + { + std::lock_guard<std::mutex> Lock(Mutex); + LocksMap.erase(D); + } +} + +void RrdHost::getConfigAsJson(nlohmann::json &Json) const { + Json["version"] = 1; + + Json["enabled"] = Cfg.EnableAnomalyDetection; + + Json["min-train-samples"] = Cfg.MinTrainSamples; + Json["max-train-samples"] = Cfg.MaxTrainSamples; + Json["train-every"] = Cfg.TrainEvery; + + Json["diff-n"] = Cfg.DiffN; + Json["smooth-n"] = Cfg.SmoothN; + Json["lag-n"] = Cfg.LagN; + + Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio; + Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; + + Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; + + Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; + Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod); + Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration; + + Json["hosts-to-skip"] = Cfg.HostsToSkip; + Json["charts-to-skip"] = Cfg.ChartsToSkip; +} + +void TrainableHost::getModelsAsJson(nlohmann::json &Json) { + std::lock_guard<std::mutex> Lock(Mutex); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + nlohmann::json JsonArray = nlohmann::json::array(); + for (const KMeans &KM : D->getModels()) { + nlohmann::json J; + KM.toJson(J); + JsonArray.push_back(J); + } + Json[getMLDimensionID(D->getRD())] = JsonArray; + } + + return; +} + +std::pair<Dimension *, Duration<double>> +TrainableHost::findDimensionToTrain(const TimePoint &NowTP) { + std::lock_guard<std::mutex> Lock(Mutex); + + Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + if (D->shouldTrain(NowTP)) { + LocksMap[D].lock(); + return { D, AllottedDuration }; + } + } + + return { nullptr, AllottedDuration }; +} + +void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { + if (D == nullptr) + return; + + D->LastTrainedAt = NowTP + Seconds{D->updateEvery()}; + D->trainModel(); + + { + std::lock_guard<std::mutex> Lock(Mutex); + LocksMap[D].unlock(); + } +} + +void TrainableHost::train() { + Duration<double> MaxSleepFor = Seconds{10 * updateEvery()}; + + worker_register("MLTRAIN"); + worker_register_job_name(0, "dimensions"); + + worker_is_busy(0); + while (!netdata_exit) { + netdata_thread_testcancel(); + netdata_thread_disable_cancelability(); + + updateResourceUsage(); + + TimePoint NowTP = SteadyClock::now(); + + auto P = findDimensionToTrain(NowTP); + trainDimension(P.first, NowTP); + + netdata_thread_enable_cancelability(); + + Duration<double> AllottedDuration = P.second; + Duration<double> RealDuration = SteadyClock::now() - NowTP; + + Duration<double> SleepFor; + if (RealDuration >= AllottedDuration) + continue; + + worker_is_idle(); + SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor); + TimePoint Now = SteadyClock::now(); + auto Until = Now + SleepFor; + while (Now < Until && !netdata_exit) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + Now = SteadyClock::now(); + } + worker_is_busy(0); + } +} + +#define WORKER_JOB_DETECT_DIMENSION 0 +#define WORKER_JOB_UPDATE_DETECTION_CHART 1 +#define WORKER_JOB_UPDATE_ANOMALY_RATES 2 +#define WORKER_JOB_UPDATE_CHARTS 3 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5 +#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5 +#endif + +void DetectableHost::detectOnce() { + size_t NumAnomalousDimensions = 0; + size_t NumNormalDimensions = 0; + size_t NumTrainedDimensions = 0; + size_t NumActiveDimensions = 0; + + { + std::lock_guard<std::mutex> Lock(Mutex); + + for (auto &DP : DimensionsMap) { + worker_is_busy(WORKER_JOB_DETECT_DIMENSION); + + Dimension *D = DP.second; + + if (!D->isActive()) + continue; + + NumActiveDimensions++; + NumTrainedDimensions += D->isTrained(); + + bool IsAnomalous = D->isAnomalous(); + if (IsAnomalous) + NumAnomalousDimensions += 1; + } + + if (NumAnomalousDimensions) + HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions; + else + HostAnomalyRate = 0.0; + + NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions; + } + + this->NumAnomalousDimensions = NumAnomalousDimensions; + this->NumNormalDimensions = NumNormalDimensions; + this->NumTrainedDimensions = NumTrainedDimensions; + this->NumActiveDimensions = NumActiveDimensions; + + worker_is_busy(WORKER_JOB_UPDATE_CHARTS); + updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); + updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0); + + struct rusage TRU; + getResourceUsage(&TRU); + updateTrainingChart(getRH(), &TRU); +} + +void DetectableHost::detect() { + worker_register("MLDETECT"); + worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions"); + worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart"); + worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates"); + worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts"); + + std::this_thread::sleep_for(Seconds{10}); + + heartbeat_t HB; + heartbeat_init(&HB); + + while (!netdata_exit) { + netdata_thread_testcancel(); + worker_is_idle(); + heartbeat_next(&HB, updateEvery() * USEC_PER_SEC); + + netdata_thread_disable_cancelability(); + detectOnce(); + + worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART); + updateDetectionChart(getRH()); + netdata_thread_enable_cancelability(); + } +} + +void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const { + Json["version"] = 1; + Json["anomalous-dimensions"] = NumAnomalousDimensions; + Json["normal-dimensions"] = NumNormalDimensions; + Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions; + Json["trained-dimensions"] = NumTrainedDimensions; +} + +void DetectableHost::startAnomalyDetectionThreads() { + TrainingThread = std::thread(&TrainableHost::train, this); + DetectionThread = std::thread(&DetectableHost::detect, this); +} + +void DetectableHost::stopAnomalyDetectionThreads() { + netdata_thread_cancel(TrainingThread.native_handle()); + netdata_thread_cancel(DetectionThread.native_handle()); + + TrainingThread.join(); + DetectionThread.join(); +} |