summaryrefslogtreecommitdiffstats
path: root/ml/Dimension.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ml/Dimension.cc309
1 files changed, 241 insertions, 68 deletions
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
index bf34abb72..db9256895 100644
--- a/ml/Dimension.cc
+++ b/ml/Dimension.cc
@@ -3,171 +3,344 @@
#include "Config.h"
#include "Dimension.h"
#include "Query.h"
+#include "Host.h"
using namespace ml;
-bool Dimension::isActive() const {
- bool SetObsolete = rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE);
- bool DimObsolete = rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE);
- return !SetObsolete && !DimObsolete;
+static const char *mls2str(MachineLearningStatus MLS) {
+ switch (MLS) {
+ case ml::MachineLearningStatus::Enabled:
+ return "enabled";
+ case ml::MachineLearningStatus::DisabledDueToUniqueUpdateEvery:
+ return "disabled-ue";
+ case ml::MachineLearningStatus::DisabledDueToExcludedChart:
+ return "disabled-sp";
+ default:
+ return "unknown";
+ }
+}
+
+static const char *mt2str(MetricType MT) {
+ switch (MT) {
+ case ml::MetricType::Constant:
+ return "constant";
+ case ml::MetricType::Variable:
+ return "variable";
+ default:
+ return "unknown";
+ }
}
-std::pair<CalculatedNumber *, size_t> Dimension::getCalculatedNumbers() {
+static const char *ts2str(TrainingStatus TS) {
+ switch (TS) {
+ case ml::TrainingStatus::PendingWithModel:
+ return "pending-with-model";
+ case ml::TrainingStatus::PendingWithoutModel:
+ return "pending-without-model";
+ case ml::TrainingStatus::Trained:
+ return "trained";
+ case ml::TrainingStatus::Untrained:
+ return "untrained";
+ default:
+ return "unknown";
+ }
+}
+
+static const char *tr2str(TrainingResult TR) {
+ switch (TR) {
+ case ml::TrainingResult::Ok:
+ return "ok";
+ case ml::TrainingResult::InvalidQueryTimeRange:
+ return "invalid-query";
+ case ml::TrainingResult::NotEnoughCollectedValues:
+ return "missing-values";
+ case ml::TrainingResult::NullAcquiredDimension:
+ return "null-acquired-dim";
+ case ml::TrainingResult::ChartUnderReplication:
+ return "chart-under-replication";
+ default:
+ return "unknown";
+ }
+}
+
+std::pair<CalculatedNumber *, TrainingResponse> Dimension::getCalculatedNumbers(const TrainingRequest &TrainingReq) {
+ TrainingResponse TrainingResp = {};
+
+ TrainingResp.RequestTime = TrainingReq.RequestTime;
+ TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest;
+ TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest;
+
+ TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0);
+ TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0);
+
size_t MinN = Cfg.MinTrainSamples;
size_t MaxN = Cfg.MaxTrainSamples;
// Figure out what our time window should be.
- time_t BeforeT = now_realtime_sec() - 1;
- time_t AfterT = BeforeT - (MaxN * updateEvery());
-
- BeforeT -= (BeforeT % updateEvery());
- AfterT -= (AfterT % updateEvery());
-
- BeforeT = std::min(BeforeT, latestTime());
- AfterT = std::max(AfterT, oldestTime());
+ TrainingResp.QueryBeforeT = TrainingResp.LastEntryOnResponse;
+ TrainingResp.QueryAfterT = std::max(
+ TrainingResp.QueryBeforeT - static_cast<time_t>((MaxN - 1) * updateEvery()),
+ TrainingResp.FirstEntryOnResponse
+ );
+
+ if (TrainingResp.QueryAfterT >= TrainingResp.QueryBeforeT) {
+ TrainingResp.Result = TrainingResult::InvalidQueryTimeRange;
+ return { nullptr, TrainingResp };
+ }
- if (AfterT >= BeforeT)
- return { nullptr, 0 };
+ if (rrdset_is_replicating(RD->rrdset)) {
+ TrainingResp.Result = TrainingResult::ChartUnderReplication;
+ return { nullptr, TrainingResp };
+ }
CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)]();
// Start the query.
- unsigned Idx = 0;
- unsigned CollectedValues = 0;
- unsigned TotalValues = 0;
+ size_t Idx = 0;
CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN();
Query Q = Query(getRD());
- Q.init(AfterT, BeforeT);
+ Q.init(TrainingResp.QueryAfterT, TrainingResp.QueryBeforeT);
while (!Q.isFinished()) {
if (Idx == MaxN)
break;
auto P = Q.nextMetric();
+
CalculatedNumber Value = P.second;
if (netdata_double_isnumber(Value)) {
+ if (!TrainingResp.DbAfterT)
+ TrainingResp.DbAfterT = P.first;
+ TrainingResp.DbBeforeT = P.first;
+
CNs[Idx] = Value;
LastValue = CNs[Idx];
- CollectedValues++;
+ TrainingResp.CollectedValues++;
} else
CNs[Idx] = LastValue;
Idx++;
}
- TotalValues = Idx;
+ TrainingResp.TotalValues = Idx;
+
+ if (TrainingResp.CollectedValues < MinN) {
+ TrainingResp.Result = TrainingResult::NotEnoughCollectedValues;
- if (CollectedValues < MinN) {
delete[] CNs;
- return { nullptr, 0 };
+ return { nullptr, TrainingResp };
}
// Find first non-NaN value.
- for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TotalValues--) { }
+ for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TrainingResp.TotalValues--) { }
// Overwrite NaN values.
if (Idx != 0)
- memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TotalValues);
+ memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TrainingResp.TotalValues);
- return { CNs, TotalValues };
+ TrainingResp.Result = TrainingResult::Ok;
+ return { CNs, TrainingResp };
}
-MLResult Dimension::trainModel() {
- auto P = getCalculatedNumbers();
+TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
+ auto P = getCalculatedNumbers(TrainingReq);
CalculatedNumber *CNs = P.first;
- unsigned N = P.second;
+ TrainingResponse TrainingResp = P.second;
+
+ if (TrainingResp.Result != TrainingResult::Ok) {
+ std::lock_guard<Mutex> L(M);
+
+ MT = MetricType::Constant;
+
+ switch (TS) {
+ case TrainingStatus::PendingWithModel:
+ TS = TrainingStatus::Trained;
+ break;
+ case TrainingStatus::PendingWithoutModel:
+ TS = TrainingStatus::Untrained;
+ break;
+ default:
+ break;
+ }
+
+ TR = TrainingResp;
- if (!CNs)
- return MLResult::MissingData;
+ LastTrainingTime = TrainingResp.LastEntryOnResponse;
+ return TrainingResp.Result;
+ }
+ unsigned N = TrainingResp.TotalValues;
unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio;
double SamplingRatio = std::min(static_cast<double>(TargetNumSamples) / N, 1.0);
SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
SamplingRatio, Cfg.RandomNums);
- std::vector<DSample> Samples = SB.preprocess();
+ std::vector<DSample> Samples;
+ SB.preprocess(Samples);
KMeans KM;
KM.train(Samples, Cfg.MaxKMeansIters);
{
- std::lock_guard<std::mutex> Lock(Mutex);
- Models[0] = KM;
- }
+ std::lock_guard<Mutex> L(M);
- Trained = true;
- ConstantModel = true;
+ if (Models.size() < Cfg.NumModelsToUse) {
+ Models.push_back(std::move(KM));
+ } else {
+ std::rotate(std::begin(Models), std::begin(Models) + 1, std::end(Models));
+ Models[Models.size() - 1] = std::move(KM);
+ }
+
+ MT = MetricType::Constant;
+ TS = TrainingStatus::Trained;
+ TR = TrainingResp;
+ LastTrainingTime = rrddim_last_entry_s(RD);
+ }
delete[] CNs;
- return MLResult::Success;
+ return TrainingResp.Result;
}
-bool Dimension::shouldTrain(const TimePoint &TP) const {
- if (ConstantModel)
- return false;
+void Dimension::scheduleForTraining(time_t CurrT) {
+ switch (MT) {
+ case MetricType::Constant: {
+ return;
+ } default:
+ break;
+ }
- return (LastTrainedAt + Seconds(Cfg.TrainEvery * updateEvery())) < TP;
+ switch (TS) {
+ case TrainingStatus::PendingWithModel:
+ case TrainingStatus::PendingWithoutModel:
+ break;
+ case TrainingStatus::Untrained: {
+ Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
+ TS = TrainingStatus::PendingWithoutModel;
+ H->scheduleForTraining(getTrainingRequest(CurrT));
+ break;
+ }
+ case TrainingStatus::Trained: {
+ bool NeedsTraining = (time_t)(LastTrainingTime + (Cfg.TrainEvery * updateEvery())) < CurrT;
+
+ if (NeedsTraining) {
+ Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
+ TS = TrainingStatus::PendingWithModel;
+ H->scheduleForTraining(getTrainingRequest(CurrT));
+ }
+ break;
+ }
+ }
}
-bool Dimension::predict(CalculatedNumber Value, bool Exists) {
+bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) {
+ // Nothing to do if ML is disabled for this dimension
+ if (MLS != MachineLearningStatus::Enabled)
+ return false;
+
+ // Don't treat values that don't exist as anomalous
if (!Exists) {
CNs.clear();
- AnomalyBit = false;
return false;
}
+ // Save the value and return if we don't have enough values for a sample
unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
if (CNs.size() < N) {
CNs.push_back(Value);
- AnomalyBit = false;
return false;
}
+ // Push the value and check if it's different from the last one
+ bool SameValue = true;
std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
-
if (CNs[N - 1] != Value)
- ConstantModel = false;
-
+ SameValue = false;
CNs[N - 1] = Value;
- if (!isTrained() || ConstantModel) {
- AnomalyBit = false;
- return false;
- }
-
- CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
+ // Create the sample
+ CalculatedNumber TmpCNs[N * (Cfg.LagN + 1)];
+ memset(TmpCNs, 0, N * (Cfg.LagN + 1) * sizeof(CalculatedNumber));
std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1,
Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
1.0, Cfg.RandomNums);
- const DSample Sample = SB.preprocess().back();
- delete[] TmpCNs;
+ SB.preprocess(Feature);
- std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
- if (!Lock.try_lock()) {
- AnomalyBit = false;
+ /*
+ * Lock to predict and possibly schedule the dimension for training
+ */
+
+ std::unique_lock<Mutex> L(M, std::defer_lock);
+ if (!L.try_lock()) {
return false;
}
+ // Mark the metric time as variable if we received different values
+ if (!SameValue)
+ MT = MetricType::Variable;
+
+ // Decide if the dimension needs to be scheduled for training
+ scheduleForTraining(CurrT);
+
+ // Nothing to do if we don't have a model
+ switch (TS) {
+ case TrainingStatus::Untrained:
+ case TrainingStatus::PendingWithoutModel:
+ return false;
+ default:
+ break;
+ }
+
+ /*
+ * Use the KMeans models to check if the value is anomalous
+ */
+
+ size_t ModelsConsulted = 0;
+ size_t Sum = 0;
+
for (const auto &KM : Models) {
- double AnomalyScore = KM.anomalyScore(Sample);
- if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) {
- AnomalyBit = false;
+ ModelsConsulted++;
+
+ double AnomalyScore = KM.anomalyScore(Feature);
+ if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN())
continue;
- }
if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) {
- AnomalyBit = false;
+ global_statistics_ml_models_consulted(ModelsConsulted);
return false;
}
+
+ Sum += 1;
}
- AnomalyBit = true;
- return true;
+ global_statistics_ml_models_consulted(ModelsConsulted);
+ return Sum;
}
-std::array<KMeans, 1> Dimension::getModels() {
- std::unique_lock<std::mutex> Lock(Mutex);
+std::vector<KMeans> Dimension::getModels() {
+ std::unique_lock<Mutex> L(M);
return Models;
}
+
+void Dimension::dump() const {
+ const char *ChartId = rrdset_id(RD->rrdset);
+ const char *DimensionId = rrddim_id(RD);
+
+ const char *MLS_Str = mls2str(MLS);
+ const char *MT_Str = mt2str(MT);
+ const char *TS_Str = ts2str(TS);
+ const char *TR_Str = tr2str(TR.Result);
+
+ const char *fmt =
+ "[ML] %s.%s: MLS=%s, MT=%s, TS=%s, Result=%s, "
+ "ReqTime=%ld, FEOReq=%ld, LEOReq=%ld, "
+ "FEOResp=%ld, LEOResp=%ld, QTR=<%ld, %ld>, DBTR=<%ld, %ld>, Collected=%zu, Total=%zu";
+
+ error(fmt,
+ ChartId, DimensionId, MLS_Str, MT_Str, TS_Str, TR_Str,
+ TR.RequestTime, TR.FirstEntryOnRequest, TR.LastEntryOnRequest,
+ TR.FirstEntryOnResponse, TR.LastEntryOnResponse,
+ TR.QueryAfterT, TR.QueryBeforeT, TR.DbAfterT, TR.DbBeforeT, TR.CollectedValues, TR.TotalValues
+ );
+}