summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-11-30 18:47:00 +0000
commit03bf87dcb06f7021bfb2df2fa8691593c6148aff (patch)
treee16b06711a2ed77cafb4b7754be0220c3d14a9d7 /ml
parentAdding upstream version 1.36.1. (diff)
downloadnetdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.tar.xz
netdata-03bf87dcb06f7021bfb2df2fa8691593c6148aff.zip
Adding upstream version 1.37.0.upstream/1.37.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ml')
-rw-r--r--ml/ADCharts.cc233
-rw-r--r--ml/ADCharts.h23
-rw-r--r--ml/BitBufferCounter.cc29
-rw-r--r--ml/BitBufferCounter.h54
-rw-r--r--ml/BitRateWindow.cc75
-rw-r--r--ml/BitRateWindow.h170
-rw-r--r--ml/Config.cc52
-rw-r--r--ml/Config.h11
-rw-r--r--ml/Database.cc127
-rw-r--r--ml/Database.h131
-rw-r--r--ml/Dimension.cc76
-rw-r--r--ml/Dimension.h165
-rw-r--r--ml/Host.cc374
-rw-r--r--ml/Host.h51
-rw-r--r--ml/KMeans.cc43
-rw-r--r--ml/KMeans.h (renamed from ml/kmeans/KMeans.h)15
-rw-r--r--ml/Makefile.am8
-rw-r--r--ml/Query.h27
-rw-r--r--ml/README.md2
-rw-r--r--ml/SamplesBuffer.cc (renamed from ml/kmeans/SamplesBuffer.cc)0
-rw-r--r--ml/SamplesBuffer.h (renamed from ml/kmeans/SamplesBuffer.h)0
-rw-r--r--ml/SamplesBufferTests.cc (renamed from ml/kmeans/Tests.cc)9
-rw-r--r--ml/Tests.cc301
-rw-r--r--ml/kmeans/KMeans.cc55
-rw-r--r--ml/kmeans/Makefile.am4
-rw-r--r--ml/ml-dummy.c35
-rw-r--r--ml/ml-private.h2
-rw-r--r--ml/ml.cc99
-rw-r--r--ml/ml.h13
29 files changed, 527 insertions, 1657 deletions
diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc
new file mode 100644
index 000000000..00c593c0c
--- /dev/null
+++ b/ml/ADCharts.cc
@@ -0,0 +1,233 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "ADCharts.h"
+#include "Config.h"
+
+void ml::updateDimensionsChart(RRDHOST *RH,
+ collected_number NumTrainedDimensions,
+ collected_number NumNormalDimensions,
+ collected_number NumAnomalousDimensions) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *NumTotalDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumNormalDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
+
+ if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "dimensions_on_" << localhost->machine_guid;
+ NameSS << "dimensions_on_" << localhost->hostname;
+
+ RS = rrdset_create(
+ RH,
+ "anomaly_detection", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "dimensions", // family
+ "anomaly_detection.dimensions", // ctx
+ "Anomaly detection dimensions", // title
+ "dimensions", // units
+ "netdata", // plugin
+ "ml", // module
+ 39183, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+ rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions);
+ rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions);
+ rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions);
+ rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions);
+
+ rrdset_done(RS);
+}
+
+void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate) {
+ static thread_local RRDSET *HostRateRS = nullptr;
+ static thread_local RRDDIM *AnomalyRateRD = nullptr;
+
+ if (!HostRateRS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "anomaly_rate_on_" << localhost->machine_guid;
+ NameSS << "anomaly_rate_on_" << localhost->hostname;
+
+ HostRateRS = rrdset_create(
+ RH,
+ "anomaly_detection", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "anomaly_rate", // family
+ "anomaly_detection.anomaly_rate", // ctx
+ "Percentage of anomalous dimensions", // title
+ "percentage", // units
+ "netdata", // plugin
+ "ml", // module
+ 39184, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+ rrdset_flag_set(HostRateRS, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ AnomalyRateRD = rrddim_add(HostRateRS, "anomaly_rate", NULL,
+ 1, 100, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ rrddim_set_by_pointer(HostRateRS, AnomalyRateRD, AnomalyRate);
+ rrdset_done(HostRateRS);
+
+ static thread_local RRDSET *AnomalyDetectionRS = nullptr;
+ static thread_local RRDDIM *AboveThresholdRD = nullptr;
+ static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
+
+ if (!AnomalyDetectionRS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "anomaly_detection_on_" << localhost->machine_guid;
+ NameSS << "anomaly_detection_on_" << localhost->hostname;
+
+ AnomalyDetectionRS = rrdset_create(
+ RH,
+ "anomaly_detection", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "anomaly_detection", // family
+ "anomaly_detection.detector_events", // ctx
+ "Anomaly detection events", // title
+ "percentage", // units
+ "netdata", // plugin
+ "ml", // module
+ 39185, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+ rrdset_flag_set(AnomalyDetectionRS, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ AboveThresholdRD = rrddim_add(AnomalyDetectionRS, "above_threshold", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NewAnomalyEventRD = rrddim_add(AnomalyDetectionRS, "new_anomaly_event", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+
+ /*
+ * Compute the values of the dimensions based on the host rate chart
+ */
+ ONEWAYALLOC *OWA = onewayalloc_create(0);
+ time_t Now = now_realtime_sec();
+ time_t Before = Now - RH->rrd_update_every;
+ time_t After = Before - Cfg.AnomalyDetectionQueryDuration;
+ RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000);
+
+ RRDR *R = rrd2rrdr_legacy(
+ OWA, HostRateRS,
+ 1 /* points wanted */,
+ After,
+ Before,
+ Cfg.AnomalyDetectionGroupingMethod,
+ 0 /* resampling time */,
+ Options, "anomaly_rate",
+ NULL /* group options */,
+ 0, /* timeout */
+ 0, /* tier */
+ QUERY_SOURCE_ML
+ );
+ if(R) {
+ assert(R->d == 1 && R->n == 1 && R->rows == 1);
+
+ static thread_local bool PrevAboveThreshold = false;
+ bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold;
+ bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold;
+ PrevAboveThreshold = AboveThreshold;
+
+ rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold);
+ rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent);
+ rrdset_done(AnomalyDetectionRS);
+
+ rrdr_free(OWA, R);
+ }
+ onewayalloc_destroy(OWA);
+}
+
+void ml::updateDetectionChart(RRDHOST *RH) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
+
+ if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "prediction_stats_" << RH->machine_guid;
+ NameSS << "prediction_stats_for_" << RH->hostname;
+
+ RS = rrdset_create_localhost(
+ "netdata", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "ml", // family
+ "netdata.prediction_stats", // ctx
+ "Prediction thread CPU usage", // title
+ "milliseconds/s", // units
+ "netdata", // plugin
+ "ml", // module
+ 136000, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_STACKED // chart_type
+ );
+
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ }
+
+ struct rusage TRU;
+ getrusage(RUSAGE_THREAD, &TRU);
+
+ rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
+ rrdset_done(RS);
+}
+
+void ml::updateTrainingChart(RRDHOST *RH, struct rusage *TRU) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *UserRD = nullptr;
+ static thread_local RRDDIM *SystemRD = nullptr;
+
+ if (!RS) {
+ std::stringstream IdSS, NameSS;
+
+ IdSS << "training_stats_" << RH->machine_guid;
+ NameSS << "training_stats_for_" << RH->hostname;
+
+ RS = rrdset_create_localhost(
+ "netdata", // type
+ IdSS.str().c_str(), // id
+ NameSS.str().c_str(), // name
+ "ml", // family
+ "netdata.training_stats", // ctx
+ "Training thread CPU usage", // title
+ "milliseconds/s", // units
+ "netdata", // plugin
+ "ml", // module
+ 136001, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_STACKED // chart_type
+ );
+
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ }
+
+ rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
+ rrdset_done(RS);
+}
diff --git a/ml/ADCharts.h b/ml/ADCharts.h
new file mode 100644
index 000000000..0be324f7d
--- /dev/null
+++ b/ml/ADCharts.h
@@ -0,0 +1,23 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_ADCHARTS_H
+#define ML_ADCHARTS_H
+
+#include "ml-private.h"
+
+namespace ml {
+
+void updateDimensionsChart(RRDHOST *RH,
+ collected_number NumTrainedDimensions,
+ collected_number NumNormalDimensions,
+ collected_number NumAnomalousDimensions);
+
+void updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate);
+
+void updateDetectionChart(RRDHOST *RH);
+
+void updateTrainingChart(RRDHOST *RH, struct rusage *TRU);
+
+} // namespace ml
+
+#endif /* ML_ADCHARTS_H */
diff --git a/ml/BitBufferCounter.cc b/ml/BitBufferCounter.cc
deleted file mode 100644
index 5e1ab5aca..000000000
--- a/ml/BitBufferCounter.cc
+++ /dev/null
@@ -1,29 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "BitBufferCounter.h"
-
-using namespace ml;
-
-std::vector<bool> BitBufferCounter::getBuffer() const {
- std::vector<bool> Buffer;
-
- for (size_t Idx = start(); Idx != (start() + size()); Idx++)
- Buffer.push_back(V[Idx % V.size()]);
-
- return Buffer;
-}
-
-void BitBufferCounter::insert(bool Bit) {
- if (N >= V.size())
- NumSetBits -= (V[start()] == true);
-
- NumSetBits += (Bit == true);
- V[N++ % V.size()] = Bit;
-}
-
-void BitBufferCounter::print(std::ostream &OS) const {
- std::vector<bool> Buffer = getBuffer();
-
- for (bool B : Buffer)
- OS << B;
-}
diff --git a/ml/BitBufferCounter.h b/ml/BitBufferCounter.h
deleted file mode 100644
index db924d776..000000000
--- a/ml/BitBufferCounter.h
+++ /dev/null
@@ -1,54 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef BIT_BUFFER_COUNTER_H
-#define BIT_BUFFER_COUNTER_H
-
-#include "ml-private.h"
-
-namespace ml {
-
-class BitBufferCounter {
-public:
- BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {}
-
- std::vector<bool> getBuffer() const;
-
- void insert(bool Bit);
-
- void print(std::ostream &OS) const;
-
- bool isFilled() const {
- return N >= V.size();
- }
-
- size_t numSetBits() const {
- return NumSetBits;
- }
-
-private:
- inline size_t size() const {
- return N < V.size() ? N : V.size();
- }
-
- inline size_t start() const {
- if (N <= V.size())
- return 0;
-
- return N % V.size();
- }
-
-private:
- std::vector<bool> V;
- size_t NumSetBits;
-
- size_t N;
-};
-
-} // namespace ml
-
-inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) {
- BBC.print(OS);
- return OS;
-}
-
-#endif /* BIT_BUFFER_COUNTER_H */
diff --git a/ml/BitRateWindow.cc b/ml/BitRateWindow.cc
deleted file mode 100644
index c4c994c42..000000000
--- a/ml/BitRateWindow.cc
+++ /dev/null
@@ -1,75 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "BitRateWindow.h"
-
-using namespace ml;
-
-std::pair<BitRateWindow::Edge, size_t> BitRateWindow::insert(bool Bit) {
- Edge E;
-
- BBC.insert(Bit);
- switch (CurrState) {
- case State::NotFilled: {
- if (BBC.isFilled()) {
- if (BBC.numSetBits() < SetBitsThreshold) {
- CurrState = State::BelowThreshold;
- } else {
- CurrState = State::AboveThreshold;
- }
- } else {
- CurrState = State::NotFilled;
- }
-
- E = {State::NotFilled, CurrState};
- break;
- } case State::BelowThreshold: {
- if (BBC.numSetBits() >= SetBitsThreshold) {
- CurrState = State::AboveThreshold;
- }
-
- E = {State::BelowThreshold, CurrState};
- break;
- } case State::AboveThreshold: {
- if ((BBC.numSetBits() < SetBitsThreshold) ||
- (CurrLength == MaxLength)) {
- CurrState = State::Idle;
- }
-
- E = {State::AboveThreshold, CurrState};
- break;
- } case State::Idle: {
- if (CurrLength == IdleLength) {
- CurrState = State::NotFilled;
- }
-
- E = {State::Idle, CurrState};
- break;
- }
- }
-
- Action A = EdgeActions[E];
- size_t L = (this->*A)(E.first, Bit);
- return {E, L};
-}
-
-void BitRateWindow::print(std::ostream &OS) const {
- switch (CurrState) {
- case State::NotFilled:
- OS << "NotFilled";
- break;
- case State::BelowThreshold:
- OS << "BelowThreshold";
- break;
- case State::AboveThreshold:
- OS << "AboveThreshold";
- break;
- case State::Idle:
- OS << "Idle";
- break;
- default:
- OS << "UnknownState";
- break;
- }
-
- OS << ": " << BBC << " (Current Length: " << CurrLength << ")";
-}
diff --git a/ml/BitRateWindow.h b/ml/BitRateWindow.h
deleted file mode 100644
index 0d99008b8..000000000
--- a/ml/BitRateWindow.h
+++ /dev/null
@@ -1,170 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef BIT_RATE_WINDOW_H
-#define BIT_RATE_WINDOW_H
-
-#include "BitBufferCounter.h"
-#include "ml-private.h"
-
-namespace ml {
-
-class BitRateWindow {
-public:
- enum class State {
- NotFilled,
- BelowThreshold,
- AboveThreshold,
- Idle
- };
-
- using Edge = std::pair<State, State>;
- using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit);
-
-private:
- std::map<Edge, Action> EdgeActions = {
- // From == To
- {
- Edge(State::NotFilled, State::NotFilled),
- &BitRateWindow::onRoundtripNotFilled,
- },
- {
- Edge(State::BelowThreshold, State::BelowThreshold),
- &BitRateWindow::onRoundtripBelowThreshold,
- },
- {
- Edge(State::AboveThreshold, State::AboveThreshold),
- &BitRateWindow::onRoundtripAboveThreshold,
- },
- {
- Edge(State::Idle, State::Idle),
- &BitRateWindow::onRoundtripIdle,
- },
-
-
- // NotFilled => {BelowThreshold, AboveThreshold}
- {
- Edge(State::NotFilled, State::BelowThreshold),
- &BitRateWindow::onNotFilledToBelowThreshold
- },
- {
- Edge(State::NotFilled, State::AboveThreshold),
- &BitRateWindow::onNotFilledToAboveThreshold
- },
-
- // BelowThreshold => AboveThreshold
- {
- Edge(State::BelowThreshold, State::AboveThreshold),
- &BitRateWindow::onBelowToAboveThreshold
- },
-
- // AboveThreshold => Idle
- {
- Edge(State::AboveThreshold, State::Idle),
- &BitRateWindow::onAboveThresholdToIdle
- },
-
- // Idle => NotFilled
- {
- Edge(State::Idle, State::NotFilled),
- &BitRateWindow::onIdleToNotFilled
- },
- };
-
-public:
- BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength,
- size_t SetBitsThreshold) :
- MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength),
- SetBitsThreshold(SetBitsThreshold),
- CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {}
-
- std::pair<Edge, size_t> insert(bool Bit);
-
- void print(std::ostream &OS) const;
-
-private:
- size_t onRoundtripNotFilled(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength += 1;
- return CurrLength;
- }
-
- size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength = MinLength;
- return CurrLength;
- }
-
- size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength += 1;
- return CurrLength;
- }
-
- size_t onRoundtripIdle(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength += 1;
- return CurrLength;
- }
-
- size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength = MinLength;
- return CurrLength;
- }
-
- size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength += 1;
- return CurrLength;
- }
-
- size_t onBelowToAboveThreshold(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- CurrLength = MinLength;
- return CurrLength;
- }
-
- size_t onAboveThresholdToIdle(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- size_t PrevLength = CurrLength;
- CurrLength = 1;
- return PrevLength;
- }
-
- size_t onIdleToNotFilled(State PrevState, bool NewBit) {
- (void) PrevState, (void) NewBit;
-
- BBC = BitBufferCounter(MinLength);
- BBC.insert(NewBit);
-
- CurrLength = 1;
- return CurrLength;
- }
-
-private:
- size_t MinLength;
- size_t MaxLength;
- size_t IdleLength;
- size_t SetBitsThreshold;
-
- State CurrState;
- size_t CurrLength;
- BitBufferCounter BBC;
-};
-
-} // namespace ml
-
-inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) {
- BRW.print(OS);
- return OS;
-}
-
-#endif /* BIT_RATE_WINDOW_H */
diff --git a/ml/Config.cc b/ml/Config.cc
index 65b05a34d..eedd8c29f 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -31,8 +31,7 @@ void Config::readMLConfig(void) {
unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600);
unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 900);
unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600);
-
- unsigned DBEngineAnomalyRateEvery = config_get_number(ConfigSectionML, "dbengine anomaly rate every", 30);
+ unsigned NumModelsToUse = config_get_number(ConfigSectionML, "number of models per dimension", 1 * 24);
unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1);
unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3);
@@ -42,27 +41,19 @@ void Config::readMLConfig(void) {
unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000);
double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99);
- double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01);
-
- double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30);
- double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600);
- double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30);
- double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25);
- double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05);
- std::stringstream SS;
- SS << netdata_configured_cache_dir << "/anomaly-detection.db";
- Cfg.AnomalyDBPath = SS.str();
+ double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 1.0);
+ std::string AnomalyDetectionGroupingMethod = config_get(ConfigSectionML, "anomaly detection grouping method", "average");
+ time_t AnomalyDetectionQueryDuration = config_get_number(ConfigSectionML, "anomaly detection grouping duration", 5 * 60);
/*
* Clamp
*/
- MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 24 * 3600u);
- MinTrainSamples = clamp(MinTrainSamples, 1 * 900u, 6 * 3600u);
- TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u);
-
- DBEngineAnomalyRateEvery = clamp(DBEngineAnomalyRateEvery, 1 * 30u, 15 * 60u);
+ MaxTrainSamples = clamp<unsigned>(MaxTrainSamples, 1 * 3600, 24 * 3600);
+ MinTrainSamples = clamp<unsigned>(MinTrainSamples, 1 * 900, 6 * 3600);
+ TrainEvery = clamp<unsigned>(TrainEvery, 1 * 3600, 6 * 3600);
+ NumModelsToUse = clamp<unsigned>(TrainEvery, 1, 7 * 24);
DiffN = clamp(DiffN, 0u, 1u);
SmoothN = clamp(SmoothN, 0u, 5u);
@@ -72,13 +63,9 @@ void Config::readMLConfig(void) {
MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u);
DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00);
- HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0);
- ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0);
- ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0);
- ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0);
- ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99);
- ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99);
+ HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.1, 10.0);
+ AnomalyDetectionQueryDuration = clamp<time_t>(AnomalyDetectionQueryDuration, 60, 15 * 60);
/*
* Validate
@@ -91,13 +78,6 @@ void Config::readMLConfig(void) {
MaxTrainSamples = 4 * 3600;
}
- if (ADMinWindowSize >= ADMaxWindowSize) {
- error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize);
-
- ADMinWindowSize = 30.0;
- ADMaxWindowSize = 600.0;
- }
-
/*
* Assign to config instance
*/
@@ -107,8 +87,7 @@ void Config::readMLConfig(void) {
Cfg.MaxTrainSamples = MaxTrainSamples;
Cfg.MinTrainSamples = MinTrainSamples;
Cfg.TrainEvery = TrainEvery;
-
- Cfg.DBEngineAnomalyRateEvery = DBEngineAnomalyRateEvery;
+ Cfg.NumModelsToUse = NumModelsToUse;
Cfg.DiffN = DiffN;
Cfg.SmoothN = SmoothN;
@@ -118,13 +97,10 @@ void Config::readMLConfig(void) {
Cfg.MaxKMeansIters = MaxKMeansIters;
Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold;
- Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold;
- Cfg.ADMinWindowSize = ADMinWindowSize;
- Cfg.ADMaxWindowSize = ADMaxWindowSize;
- Cfg.ADIdleWindowSize = ADIdleWindowSize;
- Cfg.ADWindowRateThreshold = ADWindowRateThreshold;
- Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold;
+ Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold;
+ Cfg.AnomalyDetectionGroupingMethod = web_client_api_request_v1_data_group(AnomalyDetectionGroupingMethod.c_str(), RRDR_GROUPING_AVERAGE);
+ Cfg.AnomalyDetectionQueryDuration = AnomalyDetectionQueryDuration;
Cfg.HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*");
Cfg.SP_HostsToSkip = simple_pattern_create(Cfg.HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
diff --git a/ml/Config.h b/ml/Config.h
index 595fd072b..d876d4aa4 100644
--- a/ml/Config.h
+++ b/ml/Config.h
@@ -14,6 +14,7 @@ public:
unsigned MaxTrainSamples;
unsigned MinTrainSamples;
unsigned TrainEvery;
+ unsigned NumModelsToUse;
unsigned DBEngineAnomalyRateEvery;
@@ -25,13 +26,10 @@ public:
unsigned MaxKMeansIters;
double DimensionAnomalyScoreThreshold;
- double HostAnomalyRateThreshold;
- double ADMinWindowSize;
- double ADMaxWindowSize;
- double ADIdleWindowSize;
- double ADWindowRateThreshold;
- double ADDimensionRateThreshold;
+ double HostAnomalyRateThreshold;
+ RRDR_GROUPING AnomalyDetectionGroupingMethod;
+ time_t AnomalyDetectionQueryDuration;
bool StreamADCharts;
@@ -41,7 +39,6 @@ public:
std::string ChartsToSkip;
SIMPLE_PATTERN *SP_ChartsToSkip;
- std::string AnomalyDBPath;
std::vector<uint32_t> RandomNums;
void readMLConfig();
diff --git a/ml/Database.cc b/ml/Database.cc
deleted file mode 100644
index 06d0cdecb..000000000
--- a/ml/Database.cc
+++ /dev/null
@@ -1,127 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "Database.h"
-
-const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE =
- "CREATE TABLE IF NOT EXISTS anomaly_events( "
- " anomaly_detector_name text NOT NULL, "
- " anomaly_detector_version int NOT NULL, "
- " host_id text NOT NULL, "
- " after int NOT NULL, "
- " before int NOT NULL, "
- " anomaly_event_info text, "
- " PRIMARY KEY( "
- " anomaly_detector_name, anomaly_detector_version, "
- " host_id, after, before "
- " ) "
- ");";
-
-const char *ml::Database::SQL_INSERT_ANOMALY =
- "INSERT INTO anomaly_events( "
- " anomaly_detector_name, anomaly_detector_version, "
- " host_id, after, before, anomaly_event_info) "
- "VALUES (?1, ?2, ?3, ?4, ?5, ?6);";
-
-const char *ml::Database::SQL_SELECT_ANOMALY =
- "SELECT anomaly_event_info FROM anomaly_events WHERE"
- " anomaly_detector_name == ?1 AND"
- " anomaly_detector_version == ?2 AND"
- " host_id == ?3 AND"
- " after == ?4 AND"
- " before == ?5;";
-
-const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS =
- "SELECT after, before FROM anomaly_events WHERE"
- " anomaly_detector_name == ?1 AND"
- " anomaly_detector_version == ?2 AND"
- " host_id == ?3 AND"
- " after >= ?4 AND"
- " before <= ?5;";
-
-using namespace ml;
-
-bool Statement::prepare(sqlite3 *Conn) {
- if (!Conn)
- return false;
-
- if (ParsedStmt)
- return true;
-
- int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr);
- if (RC == SQLITE_OK)
- return true;
-
- std::string Msg = "Statement \"%s\" preparation failed due to \"%s\"";
- error(Msg.c_str(), RawStmt, sqlite3_errstr(RC));
-
- return false;
-}
-
-bool Statement::bindValue(size_t Pos, const std::string &Value) {
- int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT);
- if (RC == SQLITE_OK)
- return true;
-
- error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt);
- return false;
-}
-
-bool Statement::bindValue(size_t Pos, const int Value) {
- int RC = sqlite3_bind_int(ParsedStmt, Pos, Value);
- if (RC == SQLITE_OK)
- return true;
-
- error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt);
- return false;
-}
-
-bool Statement::resetAndClear(bool Ret) {
- int RC = sqlite3_reset(ParsedStmt);
- if (RC != SQLITE_OK) {
- error("Could not reset statement: '%s'", RawStmt);
- return false;
- }
-
- RC = sqlite3_clear_bindings(ParsedStmt);
- if (RC != SQLITE_OK) {
- error("Could not clear bindings in statement: '%s'", RawStmt);
- return false;
- }
-
- return Ret;
-}
-
-Database::Database(const std::string &Path) {
- // Get sqlite3 connection handle.
- int RC = sqlite3_open(Path.c_str(), &Conn);
- if (RC != SQLITE_OK) {
- std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\"";
- error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC));
-
- sqlite3_close(Conn);
- Conn = nullptr;
- return;
- }
-
- // Create anomaly events table if it does not exist.
- char *ErrMsg;
- RC = sqlite3_exec(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg);
- if (RC == SQLITE_OK)
- return;
-
- error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg);
- error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE);
-
- sqlite3_free(ErrMsg);
- sqlite3_close(Conn);
- Conn = nullptr;
-}
-
-Database::~Database() {
- if (!Conn)
- return;
-
- int RC = sqlite3_close(Conn);
- if (RC != SQLITE_OK)
- error("Could not close connection properly (rc=%d)", RC);
-}
diff --git a/ml/Database.h b/ml/Database.h
deleted file mode 100644
index cc7b75872..000000000
--- a/ml/Database.h
+++ /dev/null
@@ -1,131 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef ML_DATABASE_H
-#define ML_DATABASE_H
-
-#include "Dimension.h"
-#include "ml-private.h"
-
-#include "json/single_include/nlohmann/json.hpp"
-
-namespace ml {
-
-class Statement {
-public:
- using RowCallback = std::function<void(sqlite3_stmt *Stmt)>;
-
-public:
- Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {}
-
- template<typename ...ArgTypes>
- bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) {
- if (!prepare(Conn))
- return false;
-
- switch (bind(1, Args...)) {
- case 0:
- return false;
- case sizeof...(Args):
- break;
- default:
- return resetAndClear(false);
- }
-
- while (true) {
- switch (int RC = sqlite3_step(ParsedStmt)) {
- case SQLITE_BUSY: case SQLITE_LOCKED:
- usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
- continue;
- case SQLITE_ROW:
- RowCb(ParsedStmt);
- continue;
- case SQLITE_DONE:
- return resetAndClear(true);
- default:
- error("Stepping through '%s' returned rc=%d", RawStmt, RC);
- return resetAndClear(false);
- }
- }
- }
-
- ~Statement() {
- if (!ParsedStmt)
- return;
-
- int RC = sqlite3_finalize(ParsedStmt);
- if (RC != SQLITE_OK)
- error("Could not properly finalize statement (rc=%d)", RC);
- }
-
-private:
- bool prepare(sqlite3 *Conn);
-
- bool bindValue(size_t Pos, const int Value);
- bool bindValue(size_t Pos, const std::string &Value);
-
- template<typename ArgType, typename ...ArgTypes>
- size_t bind(size_t Pos, ArgType T) {
- return bindValue(Pos, T);
- }
-
- template<typename ArgType, typename ...ArgTypes>
- size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) {
- return bindValue(Pos, T) + bind(Pos + 1, Args...);
- }
-
- bool resetAndClear(bool Ret);
-
-private:
- const char *RawStmt;
- sqlite3_stmt *ParsedStmt;
-};
-
-class Database {
-private:
- static const char *SQL_CREATE_ANOMALIES_TABLE;
- static const char *SQL_INSERT_ANOMALY;
- static const char *SQL_SELECT_ANOMALY;
- static const char *SQL_SELECT_ANOMALY_EVENTS;
-
-public:
- Database(const std::string &Path);
-
- ~Database();
-
- template<typename ...ArgTypes>
- bool insertAnomaly(ArgTypes... Args) {
- Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; };
- return InsertAnomalyStmt.exec(Conn, RowCb, Args...);
- }
-
- template<typename ...ArgTypes>
- bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) {
- Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
- const char *Text = static_cast<const char *>(sqlite3_column_blob(Stmt, 0));
- Json = nlohmann::json::parse(Text);
- };
- return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...);
- }
-
- template<typename ...ArgTypes>
- bool getAnomaliesInRange(std::vector<std::pair<time_t, time_t>> &V, ArgTypes&&... Args) {
- Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
- V.push_back({
- sqlite3_column_int64(Stmt, 0),
- sqlite3_column_int64(Stmt, 1)
- });
- };
- return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...);
- }
-
-private:
- sqlite3 *Conn;
-
- Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY};
- Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY};
- Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS};
-};
-
-}
-
-#endif /* ML_DATABASE_H */
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
index 0fe07530c..bf34abb72 100644
--- a/ml/Dimension.cc
+++ b/ml/Dimension.cc
@@ -6,8 +6,13 @@
using namespace ml;
-std::pair<CalculatedNumber *, size_t>
-TrainableDimension::getCalculatedNumbers() {
+bool Dimension::isActive() const {
+ bool SetObsolete = rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE);
+ bool DimObsolete = rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE);
+ return !SetObsolete && !DimObsolete;
+}
+
+std::pair<CalculatedNumber *, size_t> Dimension::getCalculatedNumbers() {
size_t MinN = Cfg.MinTrainSamples;
size_t MaxN = Cfg.MaxTrainSamples;
@@ -68,7 +73,7 @@ TrainableDimension::getCalculatedNumbers() {
return { CNs, TotalValues };
}
-MLResult TrainableDimension::trainModel() {
+MLResult Dimension::trainModel() {
auto P = getCalculatedNumbers();
CalculatedNumber *CNs = P.first;
unsigned N = P.second;
@@ -81,7 +86,15 @@ MLResult TrainableDimension::trainModel() {
SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
SamplingRatio, Cfg.RandomNums);
- KM.train(SB, Cfg.MaxKMeansIters);
+ std::vector<DSample> Samples = SB.preprocess();
+
+ KMeans KM;
+ KM.train(Samples, Cfg.MaxKMeansIters);
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ Models[0] = KM;
+ }
Trained = true;
ConstantModel = true;
@@ -90,16 +103,25 @@ MLResult TrainableDimension::trainModel() {
return MLResult::Success;
}
-void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) {
+bool Dimension::shouldTrain(const TimePoint &TP) const {
+ if (ConstantModel)
+ return false;
+
+ return (LastTrainedAt + Seconds(Cfg.TrainEvery * updateEvery())) < TP;
+}
+
+bool Dimension::predict(CalculatedNumber Value, bool Exists) {
if (!Exists) {
CNs.clear();
- return;
+ AnomalyBit = false;
+ return false;
}
unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
if (CNs.size() < N) {
CNs.push_back(Value);
- return;
+ AnomalyBit = false;
+ return false;
}
std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
@@ -108,28 +130,44 @@ void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) {
ConstantModel = false;
CNs[N - 1] = Value;
-}
-std::pair<MLResult, bool> PredictableDimension::predict() {
- unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
- if (CNs.size() != N) {
+ if (!isTrained() || ConstantModel) {
AnomalyBit = false;
- return { MLResult::MissingData, AnomalyBit };
+ return false;
}
CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
-
- SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
+ SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1,
+ Cfg.DiffN, Cfg.SmoothN, Cfg.LagN,
1.0, Cfg.RandomNums);
- AnomalyScore = computeAnomalyScore(SB);
+ const DSample Sample = SB.preprocess().back();
delete[] TmpCNs;
- if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) {
+ std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
+ if (!Lock.try_lock()) {
AnomalyBit = false;
- return { MLResult::NaN, AnomalyBit };
+ return false;
}
- AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold);
- return { MLResult::Success, AnomalyBit };
+ for (const auto &KM : Models) {
+ double AnomalyScore = KM.anomalyScore(Sample);
+ if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) {
+ AnomalyBit = false;
+ continue;
+ }
+
+ if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) {
+ AnomalyBit = false;
+ return false;
+ }
+ }
+
+ AnomalyBit = true;
+ return true;
+}
+
+std::array<KMeans, 1> Dimension::getModels() {
+ std::unique_lock<std::mutex> Lock(Mutex);
+ return Models;
}
diff --git a/ml/Dimension.h b/ml/Dimension.h
index 4fbc09b98..3ec56e098 100644
--- a/ml/Dimension.h
+++ b/ml/Dimension.h
@@ -3,157 +3,92 @@
#ifndef ML_DIMENSION_H
#define ML_DIMENSION_H
-#include "BitBufferCounter.h"
+#include "Query.h"
#include "Config.h"
#include "ml-private.h"
namespace ml {
-class RrdDimension {
-public:
- RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->tiers[0]->query_ops) { }
-
- RRDDIM *getRD() const { return RD; }
-
- time_t latestTime() { return Ops->latest_time(RD->tiers[0]->db_metric_handle); }
-
- time_t oldestTime() { return Ops->oldest_time(RD->tiers[0]->db_metric_handle); }
-
- unsigned updateEvery() const { return RD->update_every; }
-
- const std::string getID() const {
- RRDSET *RS = RD->rrdset;
-
- std::stringstream SS;
- SS << RS->context << "|" << RS->id << "|" << RD->name;
- return SS.str();
- }
-
- bool isActive() const {
- if (rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE))
- return false;
-
- if (rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE))
- return false;
-
- return true;
- }
-
- void setAnomalyRateRD(RRDDIM *ARRD) { AnomalyRateRD = ARRD; }
- RRDDIM *getAnomalyRateRD() const { return AnomalyRateRD; }
-
- void setAnomalyRateRDName(const char *Name) const {
- rrddim_set_name(AnomalyRateRD->rrdset, AnomalyRateRD, Name);
- }
-
- virtual ~RrdDimension() {
- rrddim_free(AnomalyRateRD->rrdset, AnomalyRateRD);
- }
-
-private:
- RRDDIM *RD;
- RRDDIM *AnomalyRateRD;
-
- struct rrddim_query_ops *Ops;
-
- std::string ID;
-};
-
enum class MLResult {
Success = 0,
MissingData,
NaN,
};
-class TrainableDimension : public RrdDimension {
-public:
- TrainableDimension(RRDDIM *RD) :
- RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {}
+static inline std::string getMLDimensionID(RRDDIM *RD) {
+ RRDSET *RS = RD->rrdset;
- MLResult trainModel();
+ std::stringstream SS;
+ SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD);
+ return SS.str();
+}
- CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) {
- return Trained ? KM.anomalyScore(SB) : 0.0;
+class Dimension {
+public:
+ Dimension(RRDDIM *RD) :
+ RD(RD),
+ LastTrainedAt(Seconds(0)),
+ Trained(false),
+ ConstantModel(false),
+ AnomalyScore(0.0),
+ AnomalyBit(0)
+ { }
+
+ RRDDIM *getRD() const {
+ return RD;
}
- bool shouldTrain(const TimePoint &TP) const {
- if (ConstantModel)
- return false;
-
- return (LastTrainedAt + TrainEvery) < TP;
+ unsigned updateEvery() const {
+ return RD->update_every;
}
- bool isTrained() const { return Trained; }
-
-private:
- std::pair<CalculatedNumber *, size_t> getCalculatedNumbers();
-
-public:
- TimePoint LastTrainedAt{Seconds{0}};
+ time_t latestTime() const {
+ return Query(RD).latestTime();
+ }
-protected:
- std::atomic<bool> ConstantModel{false};
+ time_t oldestTime() const {
+ return Query(RD).oldestTime();
+ }
-private:
- Seconds TrainEvery;
- KMeans KM;
+ bool isTrained() const {
+ return Trained;
+ }
- std::atomic<bool> Trained{false};
-};
+ bool isAnomalous() const {
+ return AnomalyBit;
+ }
-class PredictableDimension : public TrainableDimension {
-public:
- PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {}
+ bool shouldTrain(const TimePoint &TP) const;
- std::pair<MLResult, bool> predict();
+ bool isActive() const;
- void addValue(CalculatedNumber Value, bool Exists);
+ MLResult trainModel();
- bool isAnomalous() { return AnomalyBit; }
+ bool predict(CalculatedNumber Value, bool Exists);
- void updateAnomalyBitCounter(RRDSET *RS, unsigned Elapsed, bool IsAnomalous) {
- AnomalyBitCounter += IsAnomalous;
+ std::pair<bool, double> detect(size_t WindowLength, bool Reset);
- if (Elapsed == Cfg.DBEngineAnomalyRateEvery) {
- double AR = static_cast<double>(AnomalyBitCounter) / Cfg.DBEngineAnomalyRateEvery;
- rrddim_set_by_pointer(RS, getAnomalyRateRD(), AR * 1000);
- AnomalyBitCounter = 0;
- }
- }
+ std::array<KMeans, 1> getModels();
private:
- CalculatedNumber AnomalyScore{0.0};
- std::atomic<bool> AnomalyBit{false};
- unsigned AnomalyBitCounter{0};
-
- std::vector<CalculatedNumber> CNs;
-};
+ std::pair<CalculatedNumber *, size_t> getCalculatedNumbers();
-class DetectableDimension : public PredictableDimension {
public:
- DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {}
-
- std::pair<bool, double> detect(size_t WindowLength, bool Reset) {
- bool AnomalyBit = isAnomalous();
-
- if (Reset)
- NumSetBits = BBC.numSetBits();
+ RRDDIM *RD;
- NumSetBits += AnomalyBit;
- BBC.insert(AnomalyBit);
+ TimePoint LastTrainedAt;
+ std::atomic<bool> Trained;
+ std::atomic<bool> ConstantModel;
- double AnomalyRate = static_cast<double>(NumSetBits) / WindowLength;
- return { AnomalyBit, AnomalyRate };
- }
+ CalculatedNumber AnomalyScore;
+ std::atomic<bool> AnomalyBit;
-private:
- BitBufferCounter BBC{static_cast<size_t>(Cfg.ADMinWindowSize)};
- size_t NumSetBits{0};
+ std::vector<CalculatedNumber> CNs;
+ std::array<KMeans, 1> Models;
+ std::mutex Mutex;
};
-using Dimension = DetectableDimension;
-
} // namespace ml
#endif /* ML_DIMENSION_H */
diff --git a/ml/Host.cc b/ml/Host.cc
index f8cba9d64..4a57178c7 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -1,278 +1,20 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include <dlib/statistics.h>
-
#include "Config.h"
#include "Host.h"
+#include "ADCharts.h"
#include "json/single_include/nlohmann/json.hpp"
using namespace ml;
-static void updateDimensionsChart(RRDHOST *RH,
- collected_number NumTrainedDimensions,
- collected_number NumNormalDimensions,
- collected_number NumAnomalousDimensions) {
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *NumTotalDimensionsRD = nullptr;
- static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr;
- static thread_local RRDDIM *NumNormalDimensionsRD = nullptr;
- static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "dimensions_on_" << localhost->machine_guid;
- NameSS << "dimensions_on_" << localhost->hostname;
-
- RS = rrdset_create(
- RH,
- "anomaly_detection", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "dimensions", // family
- "anomaly_detection.dimensions", // ctx
- "Anomaly detection dimensions", // title
- "dimensions", // units
- "netdata", // plugin
- "ml", // module
- 39183, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
- );
- rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
-
- NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(RS);
-
- rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions);
- rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions);
- rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions);
- rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions);
-
- rrdset_done(RS);
-}
-
-static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *AnomalyRateRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "anomaly_rate_on_" << localhost->machine_guid;
- NameSS << "anomaly_rate_on_" << localhost->hostname;
-
- RS = rrdset_create(
- RH,
- "anomaly_detection", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "anomaly_rate", // family
- "anomaly_detection.anomaly_rate", // ctx
- "Percentage of anomalous dimensions", // title
- "percentage", // units
- "netdata", // plugin
- "ml", // module
- 39184, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
- );
- rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
-
- AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL,
- 1, 100, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(RS);
-
- rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate);
-
- rrdset_done(RS);
-}
-
-static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) {
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *WindowLengthRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "detector_window_on_" << localhost->machine_guid;
- NameSS << "detector_window_on_" << localhost->hostname;
-
- RS = rrdset_create(
- RH,
- "anomaly_detection", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "detector_window", // family
- "anomaly_detection.detector_window", // ctx
- "Anomaly detector window length", // title
- "seconds", // units
- "netdata", // plugin
- "ml", // module
- 39185, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
- );
- rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
-
- WindowLengthRD = rrddim_add(RS, "duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(RS);
-
- rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every);
- rrdset_done(RS);
-}
-
-static void updateEventsChart(RRDHOST *RH,
- std::pair<BitRateWindow::Edge, size_t> P,
- bool ResetBitCounter,
- bool NewAnomalyEvent) {
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *AboveThresholdRD = nullptr;
- static thread_local RRDDIM *ResetBitCounterRD = nullptr;
- static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "detector_events_on_" << localhost->machine_guid;
- NameSS << "detector_events_on_" << localhost->hostname;
-
- RS = rrdset_create(
- RH,
- "anomaly_detection", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "detector_events", // family
- "anomaly_detection.detector_events", // ctx
- "Anomaly events triggered", // title
- "boolean", // units
- "netdata", // plugin
- "ml", // module
- 39186, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
- );
- rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION);
-
- AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(RS);
-
- BitRateWindow::Edge E = P.first;
- bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold;
-
- rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold);
- rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter);
- rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent);
-
- rrdset_done(RS);
-}
-
-static void updateDetectionChart(RRDHOST *RH) {
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "prediction_stats_" << RH->machine_guid;
- NameSS << "prediction_stats_for_" << RH->hostname;
-
- RS = rrdset_create_localhost(
- "netdata", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "ml", // family
- "netdata.prediction_stats", // ctx
- "Prediction thread CPU usage", // title
- "milliseconds/s", // units
- "netdata", // plugin
- "ml", // module
- 136000, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_STACKED // chart_type
- );
-
- UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- } else
- rrdset_next(RS);
-
- struct rusage TRU;
- getrusage(RUSAGE_THREAD, &TRU);
-
- rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
- rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
- rrdset_done(RS);
-}
-
-static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU)
-{
- static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *UserRD = nullptr;
- static thread_local RRDDIM *SystemRD = nullptr;
-
- if (!RS) {
- std::stringstream IdSS, NameSS;
-
- IdSS << "training_stats_" << RH->machine_guid;
- NameSS << "training_stats_for_" << RH->hostname;
-
- RS = rrdset_create_localhost(
- "netdata", // type
- IdSS.str().c_str(), // id
- NameSS.str().c_str(), // name
- "ml", // family
- "netdata.training_stats", // ctx
- "Training thread CPU usage", // title
- "milliseconds/s", // units
- "netdata", // plugin
- "ml", // module
- 136001, // priority
- RH->rrd_update_every, // update_every
- RRDSET_TYPE_STACKED // chart_type
- );
-
- UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- } else
- rrdset_next(RS);
-
- rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
- rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
- rrdset_done(RS);
-}
-
void RrdHost::addDimension(Dimension *D) {
- RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL,
- 1, 1000, RRD_ALGORITHM_ABSOLUTE);
- D->setAnomalyRateRD(AnomalyRateRD);
-
- {
- std::lock_guard<std::mutex> Lock(Mutex);
+ std::lock_guard<std::mutex> Lock(Mutex);
- DimensionsMap[D->getRD()] = D;
+ DimensionsMap[D->getRD()] = D;
- // Default construct mutex for dimension
- LocksMap[D];
- }
+ // Default construct mutex for dimension
+ LocksMap[D];
}
void RrdHost::removeDimension(Dimension *D) {
@@ -312,18 +54,33 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
- Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
- Json["min-window-size"] = Cfg.ADMinWindowSize;
- Json["max-window-size"] = Cfg.ADMaxWindowSize;
- Json["idle-window-size"] = Cfg.ADIdleWindowSize;
- Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold;
- Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold;
+ Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
+ Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod);
+ Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration;
Json["hosts-to-skip"] = Cfg.HostsToSkip;
Json["charts-to-skip"] = Cfg.ChartsToSkip;
}
+void TrainableHost::getModelsAsJson(nlohmann::json &Json) {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ for (auto &DP : DimensionsMap) {
+ Dimension *D = DP.second;
+
+ nlohmann::json JsonArray = nlohmann::json::array();
+ for (const KMeans &KM : D->getModels()) {
+ nlohmann::json J;
+ KM.toJson(J);
+ JsonArray.push_back(J);
+ }
+ Json[getMLDimensionID(D->getRD())] = JsonArray;
+ }
+
+ return;
+}
+
std::pair<Dimension *, Duration<double>>
TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
std::lock_guard<std::mutex> Lock(Mutex);
@@ -384,7 +141,12 @@ void TrainableHost::train() {
worker_is_idle();
SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
- std::this_thread::sleep_for(SleepFor);
+ TimePoint Now = SteadyClock::now();
+ auto Until = Now + SleepFor;
+ while (Now < Until && !netdata_exit) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+ Now = SteadyClock::now();
+ }
worker_is_busy(0);
}
}
@@ -393,78 +155,44 @@ void TrainableHost::train() {
#define WORKER_JOB_UPDATE_DETECTION_CHART 1
#define WORKER_JOB_UPDATE_ANOMALY_RATES 2
#define WORKER_JOB_UPDATE_CHARTS 3
-#define WORKER_JOB_SAVE_ANOMALY_EVENT 4
#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
#endif
void DetectableHost::detectOnce() {
- auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold);
- BitRateWindow::Edge Edge = P.first;
- size_t WindowLength = P.second;
-
- bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold);
- bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) &&
- (Edge.second == BitRateWindow::State::Idle);
-
- std::vector<std::pair<double, std::string>> DimsOverThreshold;
-
size_t NumAnomalousDimensions = 0;
size_t NumNormalDimensions = 0;
size_t NumTrainedDimensions = 0;
size_t NumActiveDimensions = 0;
- bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
- if (CollectAnomalyRates)
- rrdset_next(AnomalyRateRS);
-
{
std::lock_guard<std::mutex> Lock(Mutex);
- DimsOverThreshold.reserve(DimensionsMap.size());
-
for (auto &DP : DimensionsMap) {
worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
Dimension *D = DP.second;
- if (!D->isActive()) {
- D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, false);
+ if (!D->isActive())
continue;
- }
NumActiveDimensions++;
-
- auto P = D->detect(WindowLength, ResetBitCounter);
- bool IsAnomalous = P.first;
- double AnomalyScore = P.second;
-
NumTrainedDimensions += D->isTrained();
+ bool IsAnomalous = D->isAnomalous();
if (IsAnomalous)
NumAnomalousDimensions += 1;
-
- if (NewAnomalyEvent && (AnomalyScore >= Cfg.ADDimensionRateThreshold))
- DimsOverThreshold.push_back({ AnomalyScore, D->getID() });
-
- D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, IsAnomalous);
}
if (NumAnomalousDimensions)
- WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
+ HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
else
- WindowAnomalyRate = 0.0;
+ HostAnomalyRate = 0.0;
NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
}
- if (CollectAnomalyRates) {
- worker_is_busy(WORKER_JOB_UPDATE_ANOMALY_RATES);
- AnomalyRateTimer = 0;
- rrdset_done(AnomalyRateRS);
- }
-
this->NumAnomalousDimensions = NumAnomalousDimensions;
this->NumNormalDimensions = NumNormalDimensions;
this->NumTrainedDimensions = NumTrainedDimensions;
@@ -472,38 +200,11 @@ void DetectableHost::detectOnce() {
worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
- updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
- updateWindowLengthChart(getRH(), WindowLength);
- updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
+ updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0);
struct rusage TRU;
getResourceUsage(&TRU);
updateTrainingChart(getRH(), &TRU);
-
- if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
- return;
-
- worker_is_busy(WORKER_JOB_SAVE_ANOMALY_EVENT);
-
- std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end());
- std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end());
-
- // Make sure the JSON response won't grow beyond a specific number
- // of dimensions. Log an error message if this happens, because it
- // most likely means that the user specified a very-low anomaly rate
- // threshold.
- size_t NumMaxDimsOverThreshold = 2000;
- if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) {
- error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.",
- DimsOverThreshold.size(), NumMaxDimsOverThreshold);
- DimsOverThreshold.resize(NumMaxDimsOverThreshold);
- }
-
- nlohmann::json JsonResult = DimsOverThreshold;
-
- time_t Before = now_realtime_sec();
- time_t After = Before - (WindowLength * updateEvery());
- DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4));
}
void DetectableHost::detect() {
@@ -512,7 +213,6 @@ void DetectableHost::detect() {
worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
- worker_register_job_name(WORKER_JOB_SAVE_ANOMALY_EVENT, "anomaly event");
std::this_thread::sleep_for(Seconds{10});
diff --git a/ml/Host.h b/ml/Host.h
index 2715008f0..52a0cd095 100644
--- a/ml/Host.h
+++ b/ml/Host.h
@@ -3,38 +3,17 @@
#ifndef ML_HOST_H
#define ML_HOST_H
-#include "BitRateWindow.h"
#include "Config.h"
-#include "Database.h"
#include "Dimension.h"
#include "ml-private.h"
+#include "json/single_include/nlohmann/json.hpp"
namespace ml {
class RrdHost {
public:
- RrdHost(RRDHOST *RH) : RH(RH) {
- AnomalyRateRS = rrdset_create(
- RH,
- "anomaly_detection",
- "anomaly_rates",
- NULL, // name
- "anomaly_rates",
- NULL, // ctx
- "Average anomaly rate",
- "anomaly rate",
- "netdata",
- "ml",
- 39189,
- Cfg.DBEngineAnomalyRateEvery,
- RRDSET_TYPE_LINE
- );
-
- AnomalyRateRS->flags = static_cast<RRDSET_FLAGS>(
- static_cast<int>(AnomalyRateRS->flags) | RRDSET_FLAG_HIDDEN
- );
- }
+ RrdHost(RRDHOST *RH) : RH(RH) {};
RRDHOST *getRH() { return RH; }
@@ -55,7 +34,6 @@ public:
protected:
RRDHOST *RH;
- RRDSET *AnomalyRateRS;
// Protect dimension and lock maps
std::mutex Mutex;
@@ -80,6 +58,8 @@ public:
memcpy(RU, &ResourceUsage, sizeof(struct rusage));
}
+ void getModelsAsJson(nlohmann::json &Json);
+
private:
std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP);
void trainDimension(Dimension *D, const TimePoint &NowTP);
@@ -95,16 +75,6 @@ public:
void startAnomalyDetectionThreads();
void stopAnomalyDetectionThreads();
- template<typename ...ArgTypes>
- bool getAnomalyInfo(ArgTypes&&... Args) {
- return DB.getAnomalyInfo(Args...);
- }
-
- template<typename ...ArgTypes>
- bool getAnomaliesInRange(ArgTypes&&... Args) {
- return DB.getAnomaliesInRange(Args...);
- }
-
void getDetectionInfoAsJson(nlohmann::json &Json) const;
private:
@@ -115,23 +85,12 @@ private:
std::thread TrainingThread;
std::thread DetectionThread;
- BitRateWindow BRW{
- static_cast<size_t>(Cfg.ADMinWindowSize),
- static_cast<size_t>(Cfg.ADMaxWindowSize),
- static_cast<size_t>(Cfg.ADIdleWindowSize),
- static_cast<size_t>(Cfg.ADMinWindowSize * Cfg.ADWindowRateThreshold)
- };
-
- CalculatedNumber WindowAnomalyRate{0.0};
+ CalculatedNumber HostAnomalyRate{0.0};
size_t NumAnomalousDimensions{0};
size_t NumNormalDimensions{0};
size_t NumTrainedDimensions{0};
size_t NumActiveDimensions{0};
-
- unsigned AnomalyRateTimer{0};
-
- Database DB{Cfg.AnomalyDBPath};
};
using Host = DetectableHost;
diff --git a/ml/KMeans.cc b/ml/KMeans.cc
new file mode 100644
index 000000000..edc2ef49e
--- /dev/null
+++ b/ml/KMeans.cc
@@ -0,0 +1,43 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "KMeans.h"
+#include <dlib/clustering.h>
+
+void KMeans::train(const std::vector<DSample> &Samples, size_t MaxIterations) {
+ MinDist = std::numeric_limits<CalculatedNumber>::max();
+ MaxDist = std::numeric_limits<CalculatedNumber>::min();
+
+ ClusterCenters.clear();
+
+ dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples);
+ dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations);
+
+ for (const auto &S : Samples) {
+ CalculatedNumber MeanDist = 0.0;
+
+ for (const auto &KMCenter : ClusterCenters)
+ MeanDist += dlib::length(KMCenter - S);
+
+ MeanDist /= NumClusters;
+
+ if (MeanDist < MinDist)
+ MinDist = MeanDist;
+
+ if (MeanDist > MaxDist)
+ MaxDist = MeanDist;
+ }
+}
+
+CalculatedNumber KMeans::anomalyScore(const DSample &Sample) const {
+ CalculatedNumber MeanDist = 0.0;
+ for (const auto &CC: ClusterCenters)
+ MeanDist += dlib::length(CC - Sample);
+
+ MeanDist /= NumClusters;
+
+ if (MaxDist == MinDist)
+ return 0.0;
+
+ CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist));
+ return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore;
+}
diff --git a/ml/kmeans/KMeans.h b/ml/KMeans.h
index 4ea3b6a89..0398eeb86 100644
--- a/ml/kmeans/KMeans.h
+++ b/ml/KMeans.h
@@ -9,6 +9,7 @@
#include <mutex>
#include "SamplesBuffer.h"
+#include "json/single_include/nlohmann/json.hpp"
class KMeans {
public:
@@ -17,8 +18,16 @@ public:
MaxDist = std::numeric_limits<CalculatedNumber>::min();
};
- void train(SamplesBuffer &SB, size_t MaxIterations);
- CalculatedNumber anomalyScore(SamplesBuffer &SB);
+ void train(const std::vector<DSample> &Samples, size_t MaxIterations);
+ CalculatedNumber anomalyScore(const DSample &Sample) const;
+
+ void toJson(nlohmann::json &J) const {
+ J = nlohmann::json{
+ {"CCs", ClusterCenters},
+ {"MinDist", MinDist},
+ {"MaxDist", MaxDist}
+ };
+ }
private:
size_t NumClusters;
@@ -27,8 +36,6 @@ private:
CalculatedNumber MinDist;
CalculatedNumber MaxDist;
-
- std::mutex Mutex;
};
#endif /* KMEANS_H */
diff --git a/ml/Makefile.am b/ml/Makefile.am
deleted file mode 100644
index 27449d659..000000000
--- a/ml/Makefile.am
+++ /dev/null
@@ -1,8 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-SUBDIRS = \
- kmeans \
- $(NULL)
diff --git a/ml/Query.h b/ml/Query.h
index 24c5fa384..78d117003 100644
--- a/ml/Query.h
+++ b/ml/Query.h
@@ -7,8 +7,8 @@ namespace ml {
class Query {
public:
- Query(RRDDIM *RD) : RD(RD) {
- Ops = &RD->tiers[0]->query_ops;
+ Query(RRDDIM *RD) : RD(RD), Initialized(false) {
+ Ops = RD->tiers[0]->query_ops;
}
time_t latestTime() {
@@ -20,27 +20,36 @@ public:
}
void init(time_t AfterT, time_t BeforeT) {
- Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT, TIER_QUERY_FETCH_SUM);
+ Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT);
+ Initialized = true;
+ points_read = 0;
}
bool isFinished() {
return Ops->is_finished(&Handle);
}
+ ~Query() {
+ if (Initialized) {
+ Ops->finalize(&Handle);
+ global_statistics_ml_query_completed(points_read);
+ points_read = 0;
+ }
+ }
+
std::pair<time_t, CalculatedNumber> nextMetric() {
+ points_read++;
STORAGE_POINT sp = Ops->next_metric(&Handle);
return { sp.start_time, sp.sum / sp.count };
}
- ~Query() {
- Ops->finalize(&Handle);
- }
-
private:
RRDDIM *RD;
+ bool Initialized;
+ size_t points_read;
- struct rrddim_query_ops *Ops;
- struct rrddim_query_handle Handle;
+ struct storage_engine_query_ops *Ops;
+ struct storage_engine_query_handle Handle;
};
} // namespace ml
diff --git a/ml/README.md b/ml/README.md
index 2578993e2..f6fd923ab 100644
--- a/ml/README.md
+++ b/ml/README.md
@@ -248,8 +248,6 @@ In terms of anomaly detection, the most interesting charts would be the `anomaly
- `anomaly_detection.dimensions`: Percentage of anomalous dimensions.
- `anomaly_detection.detector_window`: The length of the active window used by the detector.
- `anomaly_detection.detector_events`: Flags (0 or 1) to show when an anomaly event has been triggered by the detector.
-- `anomaly_detection.prediction_stats`: Diagnostic metrics relating to prediction time of anomaly detection.
-- `anomaly_detection.training_stats`: Diagnostic metrics relating to training time of anomaly detection.
Below is an example of how these charts may look in the presence of an anomaly event.
diff --git a/ml/kmeans/SamplesBuffer.cc b/ml/SamplesBuffer.cc
index d276c6e09..d276c6e09 100644
--- a/ml/kmeans/SamplesBuffer.cc
+++ b/ml/SamplesBuffer.cc
diff --git a/ml/kmeans/SamplesBuffer.h b/ml/SamplesBuffer.h
index 1c7215cca..1c7215cca 100644
--- a/ml/kmeans/SamplesBuffer.h
+++ b/ml/SamplesBuffer.h
diff --git a/ml/kmeans/Tests.cc b/ml/SamplesBufferTests.cc
index 0cb595945..5997a2a15 100644
--- a/ml/kmeans/Tests.cc
+++ b/ml/SamplesBufferTests.cc
@@ -36,7 +36,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_1_DN_1_SN_3_LN_1) {
CNs[6] = 0.2684839023122384;
CNs[7] = 0.851332948637479;
- SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max());
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
@@ -76,7 +77,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_1_DN_2_SN_3_LN_2) {
CNs[6] = 0.15552559051428083;
CNs[7] = 0.6309750314597955;
- SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max());
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
@@ -114,7 +116,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_3_DN_2_SN_4_LN_1) {
CNs[18] = 0.9394494507088997; CNs[19] =0.17567223681734334; CNs[20] = 0.42732886195446984;
CNs[21] = 0.9460522396152958; CNs[22] =0.23462747016780894; CNs[23] = 0.35983249900892145;
- SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max());
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
diff --git a/ml/Tests.cc b/ml/Tests.cc
deleted file mode 100644
index 7d369d48d..000000000
--- a/ml/Tests.cc
+++ /dev/null
@@ -1,301 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "BitBufferCounter.h"
-#include "BitRateWindow.h"
-
-#include "gtest/gtest.h"
-
-using namespace ml;
-
-TEST(BitBufferCounterTest, Cap_4) {
- size_t Capacity = 4;
- BitBufferCounter BBC(Capacity);
-
- // No bits set
- EXPECT_EQ(BBC.numSetBits(), 0);
-
- // All ones
- for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) {
- BBC.insert(true);
-
- EXPECT_EQ(BBC.numSetBits(), std::min(Idx + 1, Capacity));
- }
-
- // All zeroes
- for (size_t Idx = 0; Idx != Capacity; Idx++) {
- BBC.insert(false);
-
- if (Idx < Capacity)
- EXPECT_EQ(BBC.numSetBits(), Capacity - (Idx + 1));
- else
- EXPECT_EQ(BBC.numSetBits(), 0);
- }
-
- // Even ones/zeroes
- for (size_t Idx = 0; Idx != (2 * Capacity); Idx++)
- BBC.insert(Idx % 2 == 0);
- EXPECT_EQ(BBC.numSetBits(), Capacity / 2);
-}
-
-using State = BitRateWindow::State;
-using Edge = BitRateWindow::Edge;
-using Result = std::pair<Edge, size_t>;
-
-TEST(BitRateWindowTest, Cycles) {
- /* Test the FSM by going through its two cycles:
- * 1) NotFilled -> AboveThreshold -> Idle -> NotFilled
- * 2) NotFilled -> BelowThreshold -> AboveThreshold -> Idle -> NotFilled
- *
- * Check the window's length on every new state transition.
- */
-
- size_t MinLength = 4, MaxLength = 6, IdleLength = 5;
- size_t SetBitsThreshold = 3;
-
- Result R;
- BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- /*
- * 1st cycle
- */
-
- // NotFilled -> AboveThreshold
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // AboveThreshold -> Idle
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
-
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
- EXPECT_EQ(R.second, MaxLength);
-
-
- // Idle -> NotFilled
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
- EXPECT_EQ(R.second, 1);
-
- // NotFilled -> AboveThreshold
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- /*
- * 2nd cycle
- */
-
- BRW = BitRateWindow(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- // NotFilled -> BelowThreshold
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::BelowThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // BelowThreshold -> BelowThreshold:
- // Check the state's self loop by adding set bits that will keep the
- // bit buffer below the specified threshold.
- //
- for (size_t Idx = 0; Idx != 2 * MaxLength; Idx++) {
- R = BRW.insert(Idx % 2 == 0);
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
- EXPECT_EQ(R.second, MinLength);
- }
-
- // Verify that at the end of the loop the internal bit buffer contains
- // "1010". Do so by adding one set bit and checking that we remain below
- // the specified threshold.
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // BelowThreshold -> AboveThreshold
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // AboveThreshold -> Idle:
- // Do the transition without filling the max window size this time.
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
- EXPECT_EQ(R.second, MinLength);
-
- // Idle -> NotFilled
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
- EXPECT_EQ(R.second, 1);
-
- // NotFilled -> AboveThreshold
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-}
-
-TEST(BitRateWindowTest, ConsecutiveOnes) {
- size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
- size_t SetBitsThreshold = 30;
-
- Result R;
- BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- for (size_t Idx = 0; Idx != MaxLength; Idx++)
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- for (size_t Idx = 0; Idx != SetBitsThreshold; Idx++) {
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
- R = BRW.insert(true);
- }
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // At this point the window's buffer contains:
- // (MinLength - SetBitsThreshold = 90) 0s, followed by
- // (SetBitsThreshold = 30) 1s.
- //
- // To go below the threshold, we need to add (90 + 1) more 0s in the window's
- // buffer. At that point, the the window's buffer will contain:
- // (SetBitsThreshold = 29) 1s, followed by
- // (MinLength - SetBitsThreshold = 91) 0s.
- //
- // Right before adding the last 0, we expect the window's length to be equal to 210,
- // because the bit buffer has gone through these bits:
- // (MinLength - SetBitsThreshold = 90) 0s, followed by
- // (SetBitsThreshold = 30) 1s, followed by
- // (MinLength - SetBitsThreshold = 90) 0s.
-
- for (size_t Idx = 0; Idx != (MinLength - SetBitsThreshold); Idx++) {
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
- }
- EXPECT_EQ(R.second, 2 * MinLength - SetBitsThreshold);
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
-
- // Continue with the Idle -> NotFilled edge.
- for (size_t Idx = 0; Idx != IdleLength - 1; Idx++) {
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
- }
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
- EXPECT_EQ(R.second, 1);
-}
-
-TEST(BitRateWindowTest, WithHoles) {
- size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
- size_t SetBitsThreshold = 30;
-
- Result R;
- BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- for (size_t Idx = 0; Idx != MaxLength; Idx++)
- R = BRW.insert(false);
-
- for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
- R = BRW.insert(true);
- for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
- R = BRW.insert(false);
- for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
- R = BRW.insert(true);
- for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
- R = BRW.insert(false);
- for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
- R = BRW.insert(true);
-
- EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- // The window's bit buffer contains:
- // 70 0s, 10 1s, 10 0s, 10 1s, 10 0s, 10 1s.
- // Where: 70 = MinLength - (5 / 3) * SetBitsThresholds, ie. we need
- // to add (70 + 1) more zeros to make the bit buffer go below the
- // threshold and then the window's length should be:
- // 70 + 50 + 70 = 190.
-
- BitRateWindow::Edge E;
- do {
- R = BRW.insert(false);
- E = R.first;
- } while (E.first != State::AboveThreshold || E.second != State::Idle);
- EXPECT_EQ(R.second, 2 * MinLength - (5 * SetBitsThreshold) / 3);
-}
-
-TEST(BitRateWindowTest, MinWindow) {
- size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
- size_t SetBitsThreshold = 30;
-
- Result R;
- BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- BRW.insert(true);
- BRW.insert(false);
- for (size_t Idx = 2; Idx != SetBitsThreshold; Idx++)
- BRW.insert(true);
- for (size_t Idx = SetBitsThreshold; Idx != MinLength - 1; Idx++)
- BRW.insert(false);
-
- R = BRW.insert(true);
- EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
- EXPECT_EQ(R.second, MinLength);
-
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
-}
-
-TEST(BitRateWindowTest, MaxWindow) {
- size_t MinLength = 100, MaxLength = 200, IdleLength = 30;
- size_t SetBitsThreshold = 50;
-
- Result R;
- BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
-
- for (size_t Idx = 0; Idx != MaxLength; Idx++)
- R = BRW.insert(Idx % 2 == 0);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
- EXPECT_EQ(R.second, MaxLength);
-
- R = BRW.insert(false);
- EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
-}
diff --git a/ml/kmeans/KMeans.cc b/ml/kmeans/KMeans.cc
deleted file mode 100644
index e66c66c16..000000000
--- a/ml/kmeans/KMeans.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "KMeans.h"
-#include <dlib/clustering.h>
-
-void KMeans::train(SamplesBuffer &SB, size_t MaxIterations) {
- std::vector<DSample> Samples = SB.preprocess();
-
- MinDist = std::numeric_limits<CalculatedNumber>::max();
- MaxDist = std::numeric_limits<CalculatedNumber>::min();
-
- {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- ClusterCenters.clear();
-
- dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples);
- dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations);
-
- for (const auto &S : Samples) {
- CalculatedNumber MeanDist = 0.0;
-
- for (const auto &KMCenter : ClusterCenters)
- MeanDist += dlib::length(KMCenter - S);
-
- MeanDist /= NumClusters;
-
- if (MeanDist < MinDist)
- MinDist = MeanDist;
-
- if (MeanDist > MaxDist)
- MaxDist = MeanDist;
- }
- }
-}
-
-CalculatedNumber KMeans::anomalyScore(SamplesBuffer &SB) {
- std::vector<DSample> DSamples = SB.preprocess();
-
- std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
- if (!Lock.try_lock())
- return std::numeric_limits<CalculatedNumber>::quiet_NaN();
-
- CalculatedNumber MeanDist = 0.0;
- for (const auto &CC: ClusterCenters)
- MeanDist += dlib::length(CC - DSamples.back());
-
- MeanDist /= NumClusters;
-
- if (MaxDist == MinDist)
- return 0.0;
-
- CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist));
- return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore;
-}
diff --git a/ml/kmeans/Makefile.am b/ml/kmeans/Makefile.am
deleted file mode 100644
index babdcf0df..000000000
--- a/ml/kmeans/Makefile.am
+++ /dev/null
@@ -1,4 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index fad480f4b..492dfe2fc 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -24,38 +24,23 @@ char *ml_get_host_info(RRDHOST *RH) {
return NULL;
}
-void ml_new_dimension(RRDDIM *RD) { (void) RD; }
-
-void ml_delete_dimension(RRDDIM *RD) { (void) RD; }
-
-bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
- (void) RD; (void) Value; (void) Exists;
- return false;
-}
-
-char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before) {
- (void) RH; (void) AnomalyDetectorName;
- (void) AnomalyDetectorVersion; (void) After; (void) Before;
+char *ml_get_host_runtime_info(RRDHOST *RH) {
+ (void) RH;
return NULL;
}
-char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before) {
- (void) RH; (void) AnomalyDetectorName;
- (void) AnomalyDetectorVersion; (void) After; (void) Before;
+char *ml_get_host_models(RRDHOST *RH) {
+ (void) RH;
return NULL;
}
-void ml_process_rrdr(RRDR *R, int MaxAnomalyRates) {
- (void) R;
- (void) MaxAnomalyRates;
-}
+void ml_new_dimension(RRDDIM *RD) { (void) RD; }
-void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *name) {
- (void) RS;
- (void) RD;
- (void) name;
+void ml_delete_dimension(RRDDIM *RD) { (void) RD; }
+
+bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
+ (void) RD; (void) Value; (void) Exists;
+ return false;
}
bool ml_streaming_enabled() {
diff --git a/ml/ml-private.h b/ml/ml-private.h
index 7b3e00684..2bd72ac5a 100644
--- a/ml/ml-private.h
+++ b/ml/ml-private.h
@@ -3,7 +3,7 @@
#ifndef ML_PRIVATE_H
#define ML_PRIVATE_H
-#include "kmeans/KMeans.h"
+#include "KMeans.h"
#include "ml/ml.h"
#include <chrono>
diff --git a/ml/ml.cc b/ml/ml.cc
index 7275d88b8..1a7d6ae25 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -16,7 +16,7 @@ bool ml_enabled(RRDHOST *RH) {
if (!Cfg.EnableAnomalyDetection)
return false;
- if (simple_pattern_matches(Cfg.SP_HostsToSkip, RH->hostname))
+ if (simple_pattern_matches(Cfg.SP_HostsToSkip, rrdhost_hostname(RH)))
return false;
return true;
@@ -76,7 +76,7 @@ void ml_new_dimension(RRDDIM *RD) {
if (static_cast<unsigned>(RD->update_every) != H->updateEvery())
return;
- if (simple_pattern_matches(Cfg.SP_ChartsToSkip, RS->name))
+ if (simple_pattern_matches(Cfg.SP_ChartsToSkip, rrdset_name(RS)))
return;
Dimension *D = new Dimension(RD);
@@ -108,7 +108,7 @@ char *ml_get_host_info(RRDHOST *RH) {
ConfigJson["enabled"] = false;
}
- return strdup(ConfigJson.dump(2, '\t').c_str());
+ return strdupz(ConfigJson.dump(2, '\t').c_str());
}
char *ml_get_host_runtime_info(RRDHOST *RH) {
@@ -124,97 +124,24 @@ char *ml_get_host_runtime_info(RRDHOST *RH) {
return strdup(ConfigJson.dump(1, '\t').c_str());
}
-bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
- Dimension *D = static_cast<Dimension *>(RD->ml_dimension);
- if (!D)
- return false;
-
- D->addValue(Value, Exists);
- bool Result = D->predict().second;
- return Result;
-}
-
-char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before) {
- if (!RH || !RH->ml_host) {
- error("No host");
- return nullptr;
- }
-
- Host *H = static_cast<Host *>(RH->ml_host);
- std::vector<std::pair<time_t, time_t>> TimeRanges;
-
- bool Res = H->getAnomaliesInRange(TimeRanges, AnomalyDetectorName,
- AnomalyDetectorVersion,
- H->getUUID(),
- After, Before);
- if (!Res) {
- error("DB result is empty");
- return nullptr;
- }
-
- nlohmann::json Json = TimeRanges;
- return strdup(Json.dump(4).c_str());
-}
-
-char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before) {
- if (!RH || !RH->ml_host) {
- error("No host");
- return nullptr;
- }
-
- Host *H = static_cast<Host *>(RH->ml_host);
+char *ml_get_host_models(RRDHOST *RH) {
+ nlohmann::json ModelsJson;
- nlohmann::json Json;
- bool Res = H->getAnomalyInfo(Json, AnomalyDetectorName,
- AnomalyDetectorVersion,
- H->getUUID(),
- After, Before);
- if (!Res) {
- error("DB result is empty");
- return nullptr;
+ if (RH && RH->ml_host) {
+ Host *H = static_cast<Host *>(RH->ml_host);
+ H->getModelsAsJson(ModelsJson);
+ return strdup(ModelsJson.dump(2, '\t').c_str());
}
- return strdup(Json.dump(4, '\t').c_str());
-}
-
-void ml_process_rrdr(RRDR *R, int MaxAnomalyRates) {
- if (R->rows != 1)
- return;
-
- if (MaxAnomalyRates < 1 || MaxAnomalyRates >= R->d)
- return;
-
- NETDATA_DOUBLE *CNs = R->v;
- RRDR_DIMENSION_FLAGS *DimFlags = R->od;
-
- std::vector<std::pair<NETDATA_DOUBLE, int>> V;
-
- V.reserve(R->d);
- for (int Idx = 0; Idx != R->d; Idx++)
- V.emplace_back(CNs[Idx], Idx);
-
- std::sort(V.rbegin(), V.rend());
-
- for (int Idx = MaxAnomalyRates; Idx != R->d; Idx++) {
- int UnsortedIdx = V[Idx].second;
-
- int OldFlags = static_cast<int>(DimFlags[UnsortedIdx]);
- int NewFlags = OldFlags | RRDR_DIMENSION_HIDDEN;
-
- DimFlags[UnsortedIdx] = static_cast<rrdr_dimension_flag>(NewFlags);
- }
+ return nullptr;
}
-void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *Name) {
- (void) RS;
-
+bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
Dimension *D = static_cast<Dimension *>(RD->ml_dimension);
if (!D)
- return;
+ return false;
- D->setAnomalyRateRDName(Name);
+ return D->predict(Value, Exists);
}
bool ml_streaming_enabled() {
diff --git a/ml/ml.h b/ml/ml.h
index e07eb094e..8e62c4988 100644
--- a/ml/ml.h
+++ b/ml/ml.h
@@ -12,7 +12,7 @@ extern "C" {
// This is a DBEngine function redeclared here so that we can free
// the anomaly rate dimension, whenever its backing dimension is freed.
-extern void rrddim_free(RRDSET *st, RRDDIM *rd);
+void rrddim_free(RRDSET *st, RRDDIM *rd);
typedef void* ml_host_t;
typedef void* ml_dimension_t;
@@ -28,22 +28,13 @@ void ml_delete_host(RRDHOST *RH);
char *ml_get_host_info(RRDHOST *RH);
char *ml_get_host_runtime_info(RRDHOST *RH);
+char *ml_get_host_models(RRDHOST *RH);
void ml_new_dimension(RRDDIM *RD);
void ml_delete_dimension(RRDDIM *RD);
bool ml_is_anomalous(RRDDIM *RD, double value, bool exists);
-char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before);
-
-char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
- int AnomalyDetectorVersion, time_t After, time_t Before);
-
-void ml_process_rrdr(RRDR *R, int MaxAnomalyRates);
-
-void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *name);
-
bool ml_streaming_enabled();
#define ML_ANOMALY_RATES_CHART_ID "anomaly_detection.anomaly_rates"