diff options
Diffstat (limited to 'ml/Host.cc')
-rw-r--r-- | ml/Host.cc | 448 |
1 files changed, 290 insertions, 158 deletions
diff --git a/ml/Host.cc b/ml/Host.cc index 4a57178c7..a5f276a80 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -2,42 +2,24 @@ #include "Config.h" #include "Host.h" +#include "Queue.h" #include "ADCharts.h" #include "json/single_include/nlohmann/json.hpp" using namespace ml; -void RrdHost::addDimension(Dimension *D) { - std::lock_guard<std::mutex> Lock(Mutex); - - DimensionsMap[D->getRD()] = D; - - // Default construct mutex for dimension - LocksMap[D]; +void Host::addChart(Chart *C) { + std::lock_guard<Mutex> L(M); + Charts[C->getRS()] = C; } -void RrdHost::removeDimension(Dimension *D) { - // Remove the dimension from the hosts map. - { - std::lock_guard<std::mutex> Lock(Mutex); - DimensionsMap.erase(D->getRD()); - } - - // Delete the dimension by locking the mutex that protects it. - { - std::lock_guard<std::mutex> Lock(LocksMap[D]); - delete D; - } - - // Remove the lock entry for the deleted dimension. - { - std::lock_guard<std::mutex> Lock(Mutex); - LocksMap.erase(D); - } +void Host::removeChart(Chart *C) { + std::lock_guard<Mutex> L(M); + Charts.erase(C->getRS()); } -void RrdHost::getConfigAsJson(nlohmann::json &Json) const { +void Host::getConfigAsJson(nlohmann::json &Json) const { Json["version"] = 1; Json["enabled"] = Cfg.EnableAnomalyDetection; @@ -63,193 +45,343 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const { Json["charts-to-skip"] = Cfg.ChartsToSkip; } -void TrainableHost::getModelsAsJson(nlohmann::json &Json) { - std::lock_guard<std::mutex> Lock(Mutex); +void Host::getModelsAsJson(nlohmann::json &Json) { + std::lock_guard<Mutex> L(M); - for (auto &DP : DimensionsMap) { - Dimension *D = DP.second; - - nlohmann::json JsonArray = nlohmann::json::array(); - for (const KMeans &KM : D->getModels()) { - nlohmann::json J; - KM.toJson(J); - JsonArray.push_back(J); - } - Json[getMLDimensionID(D->getRD())] = JsonArray; + for (auto &CP : Charts) { + Chart *C = CP.second; + C->getModelsAsJson(Json); } - - return; } -std::pair<Dimension *, Duration<double>> -TrainableHost::findDimensionToTrain(const TimePoint &NowTP) { - std::lock_guard<std::mutex> Lock(Mutex); +#define WORKER_JOB_DETECTION_PREP 0 +#define WORKER_JOB_DETECTION_DIM_CHART 1 +#define WORKER_JOB_DETECTION_HOST_CHART 2 +#define WORKER_JOB_DETECTION_STATS 3 +#define WORKER_JOB_DETECTION_RESOURCES 4 - Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1); +void Host::detectOnce() { + worker_is_busy(WORKER_JOB_DETECTION_PREP); - for (auto &DP : DimensionsMap) { - Dimension *D = DP.second; + MLS = {}; + MachineLearningStats MLSCopy = {}; + TrainingStats TSCopy = {}; - if (D->shouldTrain(NowTP)) { - LocksMap[D].lock(); - return { D, AllottedDuration }; - } - } + { + std::lock_guard<Mutex> L(M); - return { nullptr, AllottedDuration }; -} + /* + * prediction/detection stats + */ + for (auto &CP : Charts) { + Chart *C = CP.second; -void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { - if (D == nullptr) - return; + if (!C->isAvailableForML()) + continue; - D->LastTrainedAt = NowTP + Seconds{D->updateEvery()}; - D->trainModel(); + MachineLearningStats ChartMLS = C->getMLS(); - { - std::lock_guard<std::mutex> Lock(Mutex); - LocksMap[D].unlock(); - } -} + MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled; + MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE; + MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP; -void TrainableHost::train() { - Duration<double> MaxSleepFor = Seconds{10 * updateEvery()}; + MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant; + MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable; - worker_register("MLTRAIN"); - worker_register_job_name(0, "dimensions"); + MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained; + MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel; + MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained; + MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel; - worker_is_busy(0); - while (!netdata_exit) { - netdata_thread_testcancel(); - netdata_thread_disable_cancelability(); + MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions; + MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions; + } - updateResourceUsage(); + HostAnomalyRate = 0.0; + size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; + if (NumActiveDimensions) + HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions; - TimePoint NowTP = SteadyClock::now(); + MLSCopy = MLS; - auto P = findDimensionToTrain(NowTP); - trainDimension(P.first, NowTP); + /* + * training stats + */ + TSCopy = TS; - netdata_thread_enable_cancelability(); + TS.QueueSize = 0; + TS.NumPoppedItems = 0; - Duration<double> AllottedDuration = P.second; - Duration<double> RealDuration = SteadyClock::now() - NowTP; + TS.AllottedUT = 0; + TS.ConsumedUT = 0; + TS.RemainingUT = 0; - Duration<double> SleepFor; - if (RealDuration >= AllottedDuration) - continue; + TS.TrainingResultOk = 0; + TS.TrainingResultInvalidQueryTimeRange = 0; + TS.TrainingResultNotEnoughCollectedValues = 0; + TS.TrainingResultNullAcquiredDimension = 0; + TS.TrainingResultChartUnderReplication = 0; + } - worker_is_idle(); - SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor); - TimePoint Now = SteadyClock::now(); - auto Until = Now + SleepFor; - while (Now < Until && !netdata_exit) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - Now = SteadyClock::now(); - } - worker_is_busy(0); + // Calc the avg values + if (TSCopy.NumPoppedItems) { + TSCopy.QueueSize /= TSCopy.NumPoppedItems; + TSCopy.AllottedUT /= TSCopy.NumPoppedItems; + TSCopy.ConsumedUT /= TSCopy.NumPoppedItems; + TSCopy.RemainingUT /= TSCopy.NumPoppedItems; + + TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems; + TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems; + TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems; + TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems; + TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems; + } else { + TSCopy.QueueSize = 0; + TSCopy.AllottedUT = 0; + TSCopy.ConsumedUT = 0; + TSCopy.RemainingUT = 0; } -} -#define WORKER_JOB_DETECT_DIMENSION 0 -#define WORKER_JOB_UPDATE_DETECTION_CHART 1 -#define WORKER_JOB_UPDATE_ANOMALY_RATES 2 -#define WORKER_JOB_UPDATE_CHARTS 3 + if(!RH) + return; -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5 -#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5 + worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); + updateDimensionsChart(RH, MLSCopy); + + worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); + updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0); + +#ifdef NETDATA_ML_RESOURCE_CHARTS + worker_is_busy(WORKER_JOB_DETECTION_RESOURCES); + struct rusage PredictionRU; + getrusage(RUSAGE_THREAD, &PredictionRU); + updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU); #endif -void DetectableHost::detectOnce() { - size_t NumAnomalousDimensions = 0; - size_t NumNormalDimensions = 0; - size_t NumTrainedDimensions = 0; - size_t NumActiveDimensions = 0; + worker_is_busy(WORKER_JOB_DETECTION_STATS); + updateTrainingStatisticsChart(RH, TSCopy); +} - { - std::lock_guard<std::mutex> Lock(Mutex); +class AcquiredDimension { +public: + static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) { + RRDDIM_ACQUIRED *AcqRD = nullptr; + Dimension *D = nullptr; + + RRDSET *RS = rrdset_find(RH, string2str(ChartId)); + if (RS) { + AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId)); + if (AcqRD) { + RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD); + if (RD) + D = reinterpret_cast<Dimension *>(RD->ml_dimension); + } + } - for (auto &DP : DimensionsMap) { - worker_is_busy(WORKER_JOB_DETECT_DIMENSION); + return AcquiredDimension(AcqRD, D); + } - Dimension *D = DP.second; +private: + AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {} - if (!D->isActive()) - continue; +public: + TrainingResult train(const TrainingRequest &TR) { + if (!D) + return TrainingResult::NullAcquiredDimension; - NumActiveDimensions++; - NumTrainedDimensions += D->isTrained(); + return D->trainModel(TR); + } - bool IsAnomalous = D->isAnomalous(); - if (IsAnomalous) - NumAnomalousDimensions += 1; + ~AcquiredDimension() { + if (AcqRD) + rrddim_acquired_release(AcqRD); + } + +private: + RRDDIM_ACQUIRED *AcqRD; + Dimension *D; +}; + +void Host::scheduleForTraining(TrainingRequest TR) { + TrainingQueue.push(TR); +} + +#define WORKER_JOB_TRAINING_FIND 0 +#define WORKER_JOB_TRAINING_TRAIN 1 +#define WORKER_JOB_TRAINING_STATS 2 + +void Host::train() { + worker_register("MLTRAIN"); + worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find"); + worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); + worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); + + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); + + while (service_running(SERVICE_ML_TRAINING)) { + auto P = TrainingQueue.pop(); + TrainingRequest TrainingReq = P.first; + size_t Size = P.second; + + if (ThreadsCancelled) { + info("Stopping training thread because it was cancelled."); + break; } - if (NumAnomalousDimensions) - HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions; - else - HostAnomalyRate = 0.0; + usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size; + if (AllottedUT > USEC_PER_SEC) + AllottedUT = USEC_PER_SEC; - NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions; - } + usec_t StartUT = now_monotonic_usec(); + TrainingResult TrainingRes; + { + worker_is_busy(WORKER_JOB_TRAINING_FIND); + AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId); - this->NumAnomalousDimensions = NumAnomalousDimensions; - this->NumNormalDimensions = NumNormalDimensions; - this->NumTrainedDimensions = NumTrainedDimensions; - this->NumActiveDimensions = NumActiveDimensions; + worker_is_busy(WORKER_JOB_TRAINING_TRAIN); + TrainingRes = AcqDim.train(TrainingReq); - worker_is_busy(WORKER_JOB_UPDATE_CHARTS); - updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); - updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0); + string_freez(TrainingReq.ChartId); + string_freez(TrainingReq.DimensionId); + } + usec_t ConsumedUT = now_monotonic_usec() - StartUT; + + worker_is_busy(WORKER_JOB_TRAINING_STATS); + + usec_t RemainingUT = 0; + if (ConsumedUT < AllottedUT) + RemainingUT = AllottedUT - ConsumedUT; + + { + std::lock_guard<Mutex> L(M); + + if (TS.AllottedUT == 0) { + struct rusage TRU; + getrusage(RUSAGE_THREAD, &TRU); + TS.TrainingRU = TRU; + } + + TS.QueueSize += Size; + TS.NumPoppedItems += 1; + + TS.AllottedUT += AllottedUT; + TS.ConsumedUT += ConsumedUT; + TS.RemainingUT += RemainingUT; + + switch (TrainingRes) { + case TrainingResult::Ok: + TS.TrainingResultOk += 1; + break; + case TrainingResult::InvalidQueryTimeRange: + TS.TrainingResultInvalidQueryTimeRange += 1; + break; + case TrainingResult::NotEnoughCollectedValues: + TS.TrainingResultNotEnoughCollectedValues += 1; + break; + case TrainingResult::NullAcquiredDimension: + TS.TrainingResultNullAcquiredDimension += 1; + break; + case TrainingResult::ChartUnderReplication: + TS.TrainingResultChartUnderReplication += 1; + break; + } + } - struct rusage TRU; - getResourceUsage(&TRU); - updateTrainingChart(getRH(), &TRU); + worker_is_idle(); + std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT}); + worker_is_busy(0); + } } -void DetectableHost::detect() { +void Host::detect() { worker_register("MLDETECT"); - worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions"); - worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart"); - worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates"); - worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts"); + worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep"); + worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart"); + worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart"); + worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats"); + worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources"); - std::this_thread::sleep_for(Seconds{10}); + service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); heartbeat_t HB; heartbeat_init(&HB); - while (!netdata_exit) { - netdata_thread_testcancel(); + while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) { worker_is_idle(); - heartbeat_next(&HB, updateEvery() * USEC_PER_SEC); - - netdata_thread_disable_cancelability(); + heartbeat_next(&HB, (RH ? RH->rrd_update_every : default_rrd_update_every) * USEC_PER_SEC); detectOnce(); - - worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART); - updateDetectionChart(getRH()); - netdata_thread_enable_cancelability(); } } -void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const { +void Host::getDetectionInfoAsJson(nlohmann::json &Json) const { Json["version"] = 1; - Json["anomalous-dimensions"] = NumAnomalousDimensions; - Json["normal-dimensions"] = NumNormalDimensions; - Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions; - Json["trained-dimensions"] = NumTrainedDimensions; + Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions; + Json["normal-dimensions"] = MLS.NumNormalDimensions; + Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; + Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel; +} + +void *train_main(void *Arg) { + Host *H = reinterpret_cast<Host *>(Arg); + H->train(); + return nullptr; } -void DetectableHost::startAnomalyDetectionThreads() { - TrainingThread = std::thread(&TrainableHost::train, this); - DetectionThread = std::thread(&DetectableHost::detect, this); +void *detect_main(void *Arg) { + Host *H = reinterpret_cast<Host *>(Arg); + H->detect(); + return nullptr; } -void DetectableHost::stopAnomalyDetectionThreads() { - netdata_thread_cancel(TrainingThread.native_handle()); - netdata_thread_cancel(DetectionThread.native_handle()); +void Host::startAnomalyDetectionThreads() { + if (ThreadsRunning) { + error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH)); + return; + } + + ThreadsRunning = true; + ThreadsCancelled = false; + ThreadsJoined = false; + + char Tag[NETDATA_THREAD_TAG_MAX + 1]; + +// #define ML_DISABLE_JOINING - TrainingThread.join(); - DetectionThread.join(); + snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH)); + netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this)); + + snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH)); + netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this)); +} + +void Host::stopAnomalyDetectionThreads(bool join) { + if (!ThreadsRunning) { + error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH)); + return; + } + + if(!ThreadsCancelled) { + ThreadsCancelled = true; + + // Signal the training queue to stop popping-items + TrainingQueue.signal(); + netdata_thread_cancel(TrainingThread); + netdata_thread_cancel(DetectionThread); + } + + if (join && !ThreadsJoined) { + ThreadsJoined = true; + ThreadsRunning = false; + + // these fail on alpine linux and our CI hangs forever + // failing to compile static builds + + // commenting them, until we find a solution + + // to enable again: + // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE + + netdata_thread_join(TrainingThread, nullptr); + netdata_thread_join(DetectionThread, nullptr); + } } |