Diffstat (limited to 'ml/Host.cc')
 -rw-r--r--  ml/Host.cc  448
 1 file changed, 290 insertions(+), 158 deletions(-)
diff --git a/ml/Host.cc b/ml/Host.cc
index 4a57178c7..a5f276a80 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -2,42 +2,24 @@
#include "Config.h"
#include "Host.h"
+#include "Queue.h"
#include "ADCharts.h"
#include "json/single_include/nlohmann/json.hpp"
using namespace ml;
-void RrdHost::addDimension(Dimension *D) {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- DimensionsMap[D->getRD()] = D;
-
- // Default construct mutex for dimension
- LocksMap[D];
+void Host::addChart(Chart *C) {
+ std::lock_guard<Mutex> L(M);
+ Charts[C->getRS()] = C;
}
-void RrdHost::removeDimension(Dimension *D) {
- // Remove the dimension from the hosts map.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- DimensionsMap.erase(D->getRD());
- }
-
- // Delete the dimension by locking the mutex that protects it.
- {
- std::lock_guard<std::mutex> Lock(LocksMap[D]);
- delete D;
- }
-
- // Remove the lock entry for the deleted dimension.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap.erase(D);
- }
+void Host::removeChart(Chart *C) {
+ std::lock_guard<Mutex> L(M);
+ Charts.erase(C->getRS());
}
-void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
+void Host::getConfigAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
Json["enabled"] = Cfg.EnableAnomalyDetection;
@@ -63,193 +45,343 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["charts-to-skip"] = Cfg.ChartsToSkip;
}
-void TrainableHost::getModelsAsJson(nlohmann::json &Json) {
- std::lock_guard<std::mutex> Lock(Mutex);
+void Host::getModelsAsJson(nlohmann::json &Json) {
+ std::lock_guard<Mutex> L(M);
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
-
- nlohmann::json JsonArray = nlohmann::json::array();
- for (const KMeans &KM : D->getModels()) {
- nlohmann::json J;
- KM.toJson(J);
- JsonArray.push_back(J);
- }
- Json[getMLDimensionID(D->getRD())] = JsonArray;
+ for (auto &CP : Charts) {
+ Chart *C = CP.second;
+ C->getModelsAsJson(Json);
}
-
- return;
}
-std::pair<Dimension *, Duration<double>>
-TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
- std::lock_guard<std::mutex> Lock(Mutex);
+#define WORKER_JOB_DETECTION_PREP 0
+#define WORKER_JOB_DETECTION_DIM_CHART 1
+#define WORKER_JOB_DETECTION_HOST_CHART 2
+#define WORKER_JOB_DETECTION_STATS 3
+#define WORKER_JOB_DETECTION_RESOURCES 4
- Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
+void Host::detectOnce() {
+ worker_is_busy(WORKER_JOB_DETECTION_PREP);
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
+ MLS = {};
+ MachineLearningStats MLSCopy = {};
+ TrainingStats TSCopy = {};
- if (D->shouldTrain(NowTP)) {
- LocksMap[D].lock();
- return { D, AllottedDuration };
- }
- }
+ {
+ std::lock_guard<Mutex> L(M);
- return { nullptr, AllottedDuration };
-}
+ /*
+ * prediction/detection stats
+ */
+ for (auto &CP : Charts) {
+ Chart *C = CP.second;
-void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
- if (D == nullptr)
- return;
+ if (!C->isAvailableForML())
+ continue;
- D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
- D->trainModel();
+ MachineLearningStats ChartMLS = C->getMLS();
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap[D].unlock();
- }
-}
+ MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled;
+ MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE;
+ MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP;
-void TrainableHost::train() {
- Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
+ MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant;
+ MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable;
- worker_register("MLTRAIN");
- worker_register_job_name(0, "dimensions");
+ MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained;
+ MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel;
+ MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained;
+ MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel;
- worker_is_busy(0);
- while (!netdata_exit) {
- netdata_thread_testcancel();
- netdata_thread_disable_cancelability();
+ MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions;
+ MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions;
+ }
- updateResourceUsage();
+ HostAnomalyRate = 0.0;
+ size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
+ if (NumActiveDimensions)
+ HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions;
- TimePoint NowTP = SteadyClock::now();
+ MLSCopy = MLS;
- auto P = findDimensionToTrain(NowTP);
- trainDimension(P.first, NowTP);
+ /*
+ * training stats
+ */
+ TSCopy = TS;
- netdata_thread_enable_cancelability();
+ TS.QueueSize = 0;
+ TS.NumPoppedItems = 0;
- Duration<double> AllottedDuration = P.second;
- Duration<double> RealDuration = SteadyClock::now() - NowTP;
+ TS.AllottedUT = 0;
+ TS.ConsumedUT = 0;
+ TS.RemainingUT = 0;
- Duration<double> SleepFor;
- if (RealDuration >= AllottedDuration)
- continue;
+ TS.TrainingResultOk = 0;
+ TS.TrainingResultInvalidQueryTimeRange = 0;
+ TS.TrainingResultNotEnoughCollectedValues = 0;
+ TS.TrainingResultNullAcquiredDimension = 0;
+ TS.TrainingResultChartUnderReplication = 0;
+ }
- worker_is_idle();
- SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
- TimePoint Now = SteadyClock::now();
- auto Until = Now + SleepFor;
- while (Now < Until && !netdata_exit) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- Now = SteadyClock::now();
- }
- worker_is_busy(0);
+ // Calc the avg values
+ if (TSCopy.NumPoppedItems) {
+ TSCopy.QueueSize /= TSCopy.NumPoppedItems;
+ TSCopy.AllottedUT /= TSCopy.NumPoppedItems;
+ TSCopy.ConsumedUT /= TSCopy.NumPoppedItems;
+ TSCopy.RemainingUT /= TSCopy.NumPoppedItems;
+
+ TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems;
+ } else {
+ TSCopy.QueueSize = 0;
+ TSCopy.AllottedUT = 0;
+ TSCopy.ConsumedUT = 0;
+ TSCopy.RemainingUT = 0;
}
-}
-#define WORKER_JOB_DETECT_DIMENSION 0
-#define WORKER_JOB_UPDATE_DETECTION_CHART 1
-#define WORKER_JOB_UPDATE_ANOMALY_RATES 2
-#define WORKER_JOB_UPDATE_CHARTS 3
+ if(!RH)
+ return;
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
+ worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
+ updateDimensionsChart(RH, MLSCopy);
+
+ worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
+ updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
+
+#ifdef NETDATA_ML_RESOURCE_CHARTS
+ worker_is_busy(WORKER_JOB_DETECTION_RESOURCES);
+ struct rusage PredictionRU;
+ getrusage(RUSAGE_THREAD, &PredictionRU);
+ updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
#endif
-void DetectableHost::detectOnce() {
- size_t NumAnomalousDimensions = 0;
- size_t NumNormalDimensions = 0;
- size_t NumTrainedDimensions = 0;
- size_t NumActiveDimensions = 0;
+ worker_is_busy(WORKER_JOB_DETECTION_STATS);
+ updateTrainingStatisticsChart(RH, TSCopy);
+}
- {
- std::lock_guard<std::mutex> Lock(Mutex);
+class AcquiredDimension {
+public:
+ static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) {
+ RRDDIM_ACQUIRED *AcqRD = nullptr;
+ Dimension *D = nullptr;
+
+ RRDSET *RS = rrdset_find(RH, string2str(ChartId));
+ if (RS) {
+ AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
+ if (AcqRD) {
+ RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD);
+ if (RD)
+ D = reinterpret_cast<Dimension *>(RD->ml_dimension);
+ }
+ }
- for (auto &DP : DimensionsMap) {
- worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
+ return AcquiredDimension(AcqRD, D);
+ }
- Dimension *D = DP.second;
+private:
+ AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {}
- if (!D->isActive())
- continue;
+public:
+ TrainingResult train(const TrainingRequest &TR) {
+ if (!D)
+ return TrainingResult::NullAcquiredDimension;
- NumActiveDimensions++;
- NumTrainedDimensions += D->isTrained();
+ return D->trainModel(TR);
+ }
- bool IsAnomalous = D->isAnomalous();
- if (IsAnomalous)
- NumAnomalousDimensions += 1;
+ ~AcquiredDimension() {
+ if (AcqRD)
+ rrddim_acquired_release(AcqRD);
+ }
+
+private:
+ RRDDIM_ACQUIRED *AcqRD;
+ Dimension *D;
+};
+
+void Host::scheduleForTraining(TrainingRequest TR) {
+ TrainingQueue.push(TR);
+}
+
+#define WORKER_JOB_TRAINING_FIND 0
+#define WORKER_JOB_TRAINING_TRAIN 1
+#define WORKER_JOB_TRAINING_STATS 2
+
+void Host::train() {
+ worker_register("MLTRAIN");
+ worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
+ worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
+ worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
+
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
+
+ while (service_running(SERVICE_ML_TRAINING)) {
+ auto P = TrainingQueue.pop();
+ TrainingRequest TrainingReq = P.first;
+ size_t Size = P.second;
+
+ if (ThreadsCancelled) {
+ info("Stopping training thread because it was cancelled.");
+ break;
}
- if (NumAnomalousDimensions)
- HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
- else
- HostAnomalyRate = 0.0;
+ usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
+ if (AllottedUT > USEC_PER_SEC)
+ AllottedUT = USEC_PER_SEC;
- NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
- }
+ usec_t StartUT = now_monotonic_usec();
+ TrainingResult TrainingRes;
+ {
+ worker_is_busy(WORKER_JOB_TRAINING_FIND);
+ AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
- this->NumAnomalousDimensions = NumAnomalousDimensions;
- this->NumNormalDimensions = NumNormalDimensions;
- this->NumTrainedDimensions = NumTrainedDimensions;
- this->NumActiveDimensions = NumActiveDimensions;
+ worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
+ TrainingRes = AcqDim.train(TrainingReq);
- worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
- updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
- updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0);
+ string_freez(TrainingReq.ChartId);
+ string_freez(TrainingReq.DimensionId);
+ }
+ usec_t ConsumedUT = now_monotonic_usec() - StartUT;
+
+ worker_is_busy(WORKER_JOB_TRAINING_STATS);
+
+ usec_t RemainingUT = 0;
+ if (ConsumedUT < AllottedUT)
+ RemainingUT = AllottedUT - ConsumedUT;
+
+ {
+ std::lock_guard<Mutex> L(M);
+
+ if (TS.AllottedUT == 0) {
+ struct rusage TRU;
+ getrusage(RUSAGE_THREAD, &TRU);
+ TS.TrainingRU = TRU;
+ }
+
+ TS.QueueSize += Size;
+ TS.NumPoppedItems += 1;
+
+ TS.AllottedUT += AllottedUT;
+ TS.ConsumedUT += ConsumedUT;
+ TS.RemainingUT += RemainingUT;
+
+ switch (TrainingRes) {
+ case TrainingResult::Ok:
+ TS.TrainingResultOk += 1;
+ break;
+ case TrainingResult::InvalidQueryTimeRange:
+ TS.TrainingResultInvalidQueryTimeRange += 1;
+ break;
+ case TrainingResult::NotEnoughCollectedValues:
+ TS.TrainingResultNotEnoughCollectedValues += 1;
+ break;
+ case TrainingResult::NullAcquiredDimension:
+ TS.TrainingResultNullAcquiredDimension += 1;
+ break;
+ case TrainingResult::ChartUnderReplication:
+ TS.TrainingResultChartUnderReplication += 1;
+ break;
+ }
+ }
- struct rusage TRU;
- getResourceUsage(&TRU);
- updateTrainingChart(getRH(), &TRU);
+ worker_is_idle();
+ std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
+ worker_is_busy(0);
+ }
}
-void DetectableHost::detect() {
+void Host::detect() {
worker_register("MLDETECT");
- worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions");
- worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
- worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
- worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
+ worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep");
+ worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
+ worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
- std::this_thread::sleep_for(Seconds{10});
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true);
heartbeat_t HB;
heartbeat_init(&HB);
- while (!netdata_exit) {
- netdata_thread_testcancel();
+ while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
worker_is_idle();
- heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
-
- netdata_thread_disable_cancelability();
+ heartbeat_next(&HB, (RH ? RH->rrd_update_every : default_rrd_update_every) * USEC_PER_SEC);
detectOnce();
-
- worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART);
- updateDetectionChart(getRH());
- netdata_thread_enable_cancelability();
}
}
-void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
+void Host::getDetectionInfoAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
- Json["anomalous-dimensions"] = NumAnomalousDimensions;
- Json["normal-dimensions"] = NumNormalDimensions;
- Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
- Json["trained-dimensions"] = NumTrainedDimensions;
+ Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions;
+ Json["normal-dimensions"] = MLS.NumNormalDimensions;
+ Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
+ Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel;
+}
+
+void *train_main(void *Arg) {
+ Host *H = reinterpret_cast<Host *>(Arg);
+ H->train();
+ return nullptr;
}
-void DetectableHost::startAnomalyDetectionThreads() {
- TrainingThread = std::thread(&TrainableHost::train, this);
- DetectionThread = std::thread(&DetectableHost::detect, this);
+void *detect_main(void *Arg) {
+ Host *H = reinterpret_cast<Host *>(Arg);
+ H->detect();
+ return nullptr;
}
-void DetectableHost::stopAnomalyDetectionThreads() {
- netdata_thread_cancel(TrainingThread.native_handle());
- netdata_thread_cancel(DetectionThread.native_handle());
+void Host::startAnomalyDetectionThreads() {
+ if (ThreadsRunning) {
+ error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH));
+ return;
+ }
+
+ ThreadsRunning = true;
+ ThreadsCancelled = false;
+ ThreadsJoined = false;
+
+ char Tag[NETDATA_THREAD_TAG_MAX + 1];
+
+// #define ML_DISABLE_JOINING
- TrainingThread.join();
- DetectionThread.join();
+ snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH));
+ netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
+
+ snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH));
+ netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
+}
+
+void Host::stopAnomalyDetectionThreads(bool join) {
+ if (!ThreadsRunning) {
+ error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
+ return;
+ }
+
+ if(!ThreadsCancelled) {
+ ThreadsCancelled = true;
+
+ // Signal the training queue to stop popping-items
+ TrainingQueue.signal();
+ netdata_thread_cancel(TrainingThread);
+ netdata_thread_cancel(DetectionThread);
+ }
+
+ if (join && !ThreadsJoined) {
+ ThreadsJoined = true;
+ ThreadsRunning = false;
+
+ // these fail on alpine linux and our CI hangs forever
+ // failing to compile static builds
+
+ // commenting them, until we find a solution
+
+ // to enable again:
+ // NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE
+
+ netdata_thread_join(TrainingThread, nullptr);
+ netdata_thread_join(DetectionThread, nullptr);
+ }
}