diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2022-11-30 18:47:05 +0000 |
commit | 97e01009d69b8fbebfebf68f51e3d126d0ed43fc (patch) | |
tree | 02e8b836c3a9d89806f3e67d4a5fe9f52dbb0061 /ml | |
parent | Releasing debian version 1.36.1-1. (diff) | |
download | netdata-97e01009d69b8fbebfebf68f51e3d126d0ed43fc.tar.xz netdata-97e01009d69b8fbebfebf68f51e3d126d0ed43fc.zip |
Merging upstream version 1.37.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ml')
-rw-r--r-- | ml/ADCharts.cc | 233 | ||||
-rw-r--r-- | ml/ADCharts.h | 23 | ||||
-rw-r--r-- | ml/BitBufferCounter.cc | 29 | ||||
-rw-r--r-- | ml/BitBufferCounter.h | 54 | ||||
-rw-r--r-- | ml/BitRateWindow.cc | 75 | ||||
-rw-r--r-- | ml/BitRateWindow.h | 170 | ||||
-rw-r--r-- | ml/Config.cc | 52 | ||||
-rw-r--r-- | ml/Config.h | 11 | ||||
-rw-r--r-- | ml/Database.cc | 127 | ||||
-rw-r--r-- | ml/Database.h | 131 | ||||
-rw-r--r-- | ml/Dimension.cc | 76 | ||||
-rw-r--r-- | ml/Dimension.h | 165 | ||||
-rw-r--r-- | ml/Host.cc | 374 | ||||
-rw-r--r-- | ml/Host.h | 51 | ||||
-rw-r--r-- | ml/KMeans.cc | 43 | ||||
-rw-r--r-- | ml/KMeans.h (renamed from ml/kmeans/KMeans.h) | 15 | ||||
-rw-r--r-- | ml/Makefile.am | 8 | ||||
-rw-r--r-- | ml/Query.h | 27 | ||||
-rw-r--r-- | ml/README.md | 2 | ||||
-rw-r--r-- | ml/SamplesBuffer.cc (renamed from ml/kmeans/SamplesBuffer.cc) | 0 | ||||
-rw-r--r-- | ml/SamplesBuffer.h (renamed from ml/kmeans/SamplesBuffer.h) | 0 | ||||
-rw-r--r-- | ml/SamplesBufferTests.cc (renamed from ml/kmeans/Tests.cc) | 9 | ||||
-rw-r--r-- | ml/Tests.cc | 301 | ||||
-rw-r--r-- | ml/kmeans/KMeans.cc | 55 | ||||
-rw-r--r-- | ml/kmeans/Makefile.am | 4 | ||||
-rw-r--r-- | ml/ml-dummy.c | 35 | ||||
-rw-r--r-- | ml/ml-private.h | 2 | ||||
-rw-r--r-- | ml/ml.cc | 99 | ||||
-rw-r--r-- | ml/ml.h | 13 |
29 files changed, 527 insertions, 1657 deletions
diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc new file mode 100644 index 000000000..00c593c0c --- /dev/null +++ b/ml/ADCharts.cc @@ -0,0 +1,233 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "ADCharts.h" +#include "Config.h" + +void ml::updateDimensionsChart(RRDHOST *RH, + collected_number NumTrainedDimensions, + collected_number NumNormalDimensions, + collected_number NumAnomalousDimensions) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *NumTotalDimensionsRD = nullptr; + static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr; + static thread_local RRDDIM *NumNormalDimensionsRD = nullptr; + static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "dimensions_on_" << localhost->machine_guid; + NameSS << "dimensions_on_" << localhost->hostname; + + RS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "dimensions", // family + "anomaly_detection.dimensions", // ctx + "Anomaly detection dimensions", // title + "dimensions", // units + "netdata", // plugin + "ml", // module + 39183, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); + + NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions); + rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions); + rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions); + rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions); + + rrdset_done(RS); +} + +void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate) { + static thread_local RRDSET *HostRateRS = nullptr; + static thread_local RRDDIM *AnomalyRateRD = nullptr; + + if (!HostRateRS) { + std::stringstream IdSS, NameSS; + + IdSS << "anomaly_rate_on_" << localhost->machine_guid; + NameSS << "anomaly_rate_on_" << localhost->hostname; + + HostRateRS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "anomaly_rate", // family + "anomaly_detection.anomaly_rate", // ctx + "Percentage of anomalous dimensions", // title + "percentage", // units + "netdata", // plugin + "ml", // module + 39184, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(HostRateRS, RRDSET_FLAG_ANOMALY_DETECTION); + + AnomalyRateRD = rrddim_add(HostRateRS, "anomaly_rate", NULL, + 1, 100, RRD_ALGORITHM_ABSOLUTE); + } + + rrddim_set_by_pointer(HostRateRS, AnomalyRateRD, AnomalyRate); + rrdset_done(HostRateRS); + + static thread_local RRDSET *AnomalyDetectionRS = nullptr; + static thread_local RRDDIM *AboveThresholdRD = nullptr; + static thread_local RRDDIM *NewAnomalyEventRD = nullptr; + + if (!AnomalyDetectionRS) { + std::stringstream IdSS, NameSS; + + IdSS << "anomaly_detection_on_" << localhost->machine_guid; + NameSS << "anomaly_detection_on_" << localhost->hostname; + + AnomalyDetectionRS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "anomaly_detection", // family + "anomaly_detection.detector_events", // ctx + "Anomaly detection events", // title + "percentage", // units + "netdata", // plugin + "ml", // module + 39185, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(AnomalyDetectionRS, RRDSET_FLAG_ANOMALY_DETECTION); + + AboveThresholdRD = rrddim_add(AnomalyDetectionRS, "above_threshold", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NewAnomalyEventRD = rrddim_add(AnomalyDetectionRS, "new_anomaly_event", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + + /* + * Compute the values of the dimensions based on the host rate chart + */ + ONEWAYALLOC *OWA = onewayalloc_create(0); + time_t Now = now_realtime_sec(); + time_t Before = Now - RH->rrd_update_every; + time_t After = Before - Cfg.AnomalyDetectionQueryDuration; + RRDR_OPTIONS Options = static_cast<RRDR_OPTIONS>(0x00000000); + + RRDR *R = rrd2rrdr_legacy( + OWA, HostRateRS, + 1 /* points wanted */, + After, + Before, + Cfg.AnomalyDetectionGroupingMethod, + 0 /* resampling time */, + Options, "anomaly_rate", + NULL /* group options */, + 0, /* timeout */ + 0, /* tier */ + QUERY_SOURCE_ML + ); + if(R) { + assert(R->d == 1 && R->n == 1 && R->rows == 1); + + static thread_local bool PrevAboveThreshold = false; + bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold; + bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold; + PrevAboveThreshold = AboveThreshold; + + rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold); + rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent); + rrdset_done(AnomalyDetectionRS); + + rrdr_free(OWA, R); + } + onewayalloc_destroy(OWA); +} + +void ml::updateDetectionChart(RRDHOST *RH) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *UserRD, *SystemRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "prediction_stats_" << RH->machine_guid; + NameSS << "prediction_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.prediction_stats", // ctx + "Prediction thread CPU usage", // title + "milliseconds/s", // units + "netdata", // plugin + "ml", // module + 136000, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_STACKED // chart_type + ); + + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + } + + struct rusage TRU; + getrusage(RUSAGE_THREAD, &TRU); + + rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec); + rrdset_done(RS); +} + +void ml::updateTrainingChart(RRDHOST *RH, struct rusage *TRU) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *UserRD = nullptr; + static thread_local RRDDIM *SystemRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "training_stats_" << RH->machine_guid; + NameSS << "training_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.training_stats", // ctx + "Training thread CPU usage", // title + "milliseconds/s", // units + "netdata", // plugin + "ml", // module + 136001, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_STACKED // chart_type + ); + + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + } + + rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec); + rrdset_done(RS); +} diff --git a/ml/ADCharts.h b/ml/ADCharts.h new file mode 100644 index 000000000..0be324f7d --- /dev/null +++ b/ml/ADCharts.h @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_ADCHARTS_H +#define ML_ADCHARTS_H + +#include "ml-private.h" + +namespace ml { + +void updateDimensionsChart(RRDHOST *RH, + collected_number NumTrainedDimensions, + collected_number NumNormalDimensions, + collected_number NumAnomalousDimensions); + +void updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate); + +void updateDetectionChart(RRDHOST *RH); + +void updateTrainingChart(RRDHOST *RH, struct rusage *TRU); + +} // namespace ml + +#endif /* ML_ADCHARTS_H */ diff --git a/ml/BitBufferCounter.cc b/ml/BitBufferCounter.cc deleted file mode 100644 index 5e1ab5aca..000000000 --- a/ml/BitBufferCounter.cc +++ /dev/null @@ -1,29 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "BitBufferCounter.h" - -using namespace ml; - -std::vector<bool> BitBufferCounter::getBuffer() const { - std::vector<bool> Buffer; - - for (size_t Idx = start(); Idx != (start() + size()); Idx++) - Buffer.push_back(V[Idx % V.size()]); - - return Buffer; -} - -void BitBufferCounter::insert(bool Bit) { - if (N >= V.size()) - NumSetBits -= (V[start()] == true); - - NumSetBits += (Bit == true); - V[N++ % V.size()] = Bit; -} - -void BitBufferCounter::print(std::ostream &OS) const { - std::vector<bool> Buffer = getBuffer(); - - for (bool B : Buffer) - OS << B; -} diff --git a/ml/BitBufferCounter.h b/ml/BitBufferCounter.h deleted file mode 100644 index db924d776..000000000 --- a/ml/BitBufferCounter.h +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef BIT_BUFFER_COUNTER_H -#define BIT_BUFFER_COUNTER_H - -#include "ml-private.h" - -namespace ml { - -class BitBufferCounter { -public: - BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {} - - std::vector<bool> getBuffer() const; - - void insert(bool Bit); - - void print(std::ostream &OS) const; - - bool isFilled() const { - return N >= V.size(); - } - - size_t numSetBits() const { - return NumSetBits; - } - -private: - inline size_t size() const { - return N < V.size() ? N : V.size(); - } - - inline size_t start() const { - if (N <= V.size()) - return 0; - - return N % V.size(); - } - -private: - std::vector<bool> V; - size_t NumSetBits; - - size_t N; -}; - -} // namespace ml - -inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) { - BBC.print(OS); - return OS; -} - -#endif /* BIT_BUFFER_COUNTER_H */ diff --git a/ml/BitRateWindow.cc b/ml/BitRateWindow.cc deleted file mode 100644 index c4c994c42..000000000 --- a/ml/BitRateWindow.cc +++ /dev/null @@ -1,75 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "BitRateWindow.h" - -using namespace ml; - -std::pair<BitRateWindow::Edge, size_t> BitRateWindow::insert(bool Bit) { - Edge E; - - BBC.insert(Bit); - switch (CurrState) { - case State::NotFilled: { - if (BBC.isFilled()) { - if (BBC.numSetBits() < SetBitsThreshold) { - CurrState = State::BelowThreshold; - } else { - CurrState = State::AboveThreshold; - } - } else { - CurrState = State::NotFilled; - } - - E = {State::NotFilled, CurrState}; - break; - } case State::BelowThreshold: { - if (BBC.numSetBits() >= SetBitsThreshold) { - CurrState = State::AboveThreshold; - } - - E = {State::BelowThreshold, CurrState}; - break; - } case State::AboveThreshold: { - if ((BBC.numSetBits() < SetBitsThreshold) || - (CurrLength == MaxLength)) { - CurrState = State::Idle; - } - - E = {State::AboveThreshold, CurrState}; - break; - } case State::Idle: { - if (CurrLength == IdleLength) { - CurrState = State::NotFilled; - } - - E = {State::Idle, CurrState}; - break; - } - } - - Action A = EdgeActions[E]; - size_t L = (this->*A)(E.first, Bit); - return {E, L}; -} - -void BitRateWindow::print(std::ostream &OS) const { - switch (CurrState) { - case State::NotFilled: - OS << "NotFilled"; - break; - case State::BelowThreshold: - OS << "BelowThreshold"; - break; - case State::AboveThreshold: - OS << "AboveThreshold"; - break; - case State::Idle: - OS << "Idle"; - break; - default: - OS << "UnknownState"; - break; - } - - OS << ": " << BBC << " (Current Length: " << CurrLength << ")"; -} diff --git a/ml/BitRateWindow.h b/ml/BitRateWindow.h deleted file mode 100644 index 0d99008b8..000000000 --- a/ml/BitRateWindow.h +++ /dev/null @@ -1,170 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef BIT_RATE_WINDOW_H -#define BIT_RATE_WINDOW_H - -#include "BitBufferCounter.h" -#include "ml-private.h" - -namespace ml { - -class BitRateWindow { -public: - enum class State { - NotFilled, - BelowThreshold, - AboveThreshold, - Idle - }; - - using Edge = std::pair<State, State>; - using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit); - -private: - std::map<Edge, Action> EdgeActions = { - // From == To - { - Edge(State::NotFilled, State::NotFilled), - &BitRateWindow::onRoundtripNotFilled, - }, - { - Edge(State::BelowThreshold, State::BelowThreshold), - &BitRateWindow::onRoundtripBelowThreshold, - }, - { - Edge(State::AboveThreshold, State::AboveThreshold), - &BitRateWindow::onRoundtripAboveThreshold, - }, - { - Edge(State::Idle, State::Idle), - &BitRateWindow::onRoundtripIdle, - }, - - - // NotFilled => {BelowThreshold, AboveThreshold} - { - Edge(State::NotFilled, State::BelowThreshold), - &BitRateWindow::onNotFilledToBelowThreshold - }, - { - Edge(State::NotFilled, State::AboveThreshold), - &BitRateWindow::onNotFilledToAboveThreshold - }, - - // BelowThreshold => AboveThreshold - { - Edge(State::BelowThreshold, State::AboveThreshold), - &BitRateWindow::onBelowToAboveThreshold - }, - - // AboveThreshold => Idle - { - Edge(State::AboveThreshold, State::Idle), - &BitRateWindow::onAboveThresholdToIdle - }, - - // Idle => NotFilled - { - Edge(State::Idle, State::NotFilled), - &BitRateWindow::onIdleToNotFilled - }, - }; - -public: - BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength, - size_t SetBitsThreshold) : - MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength), - SetBitsThreshold(SetBitsThreshold), - CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {} - - std::pair<Edge, size_t> insert(bool Bit); - - void print(std::ostream &OS) const; - -private: - size_t onRoundtripNotFilled(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onRoundtripIdle(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onBelowToAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onAboveThresholdToIdle(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - size_t PrevLength = CurrLength; - CurrLength = 1; - return PrevLength; - } - - size_t onIdleToNotFilled(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - BBC = BitBufferCounter(MinLength); - BBC.insert(NewBit); - - CurrLength = 1; - return CurrLength; - } - -private: - size_t MinLength; - size_t MaxLength; - size_t IdleLength; - size_t SetBitsThreshold; - - State CurrState; - size_t CurrLength; - BitBufferCounter BBC; -}; - -} // namespace ml - -inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) { - BRW.print(OS); - return OS; -} - -#endif /* BIT_RATE_WINDOW_H */ diff --git a/ml/Config.cc b/ml/Config.cc index 65b05a34d..eedd8c29f 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -31,8 +31,7 @@ void Config::readMLConfig(void) { unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600); unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 900); unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600); - - unsigned DBEngineAnomalyRateEvery = config_get_number(ConfigSectionML, "dbengine anomaly rate every", 30); + unsigned NumModelsToUse = config_get_number(ConfigSectionML, "number of models per dimension", 1 * 24); unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1); unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3); @@ -42,27 +41,19 @@ void Config::readMLConfig(void) { unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000); double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99); - double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01); - - double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30); - double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600); - double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30); - double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25); - double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05); - std::stringstream SS; - SS << netdata_configured_cache_dir << "/anomaly-detection.db"; - Cfg.AnomalyDBPath = SS.str(); + double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 1.0); + std::string AnomalyDetectionGroupingMethod = config_get(ConfigSectionML, "anomaly detection grouping method", "average"); + time_t AnomalyDetectionQueryDuration = config_get_number(ConfigSectionML, "anomaly detection grouping duration", 5 * 60); /* * Clamp */ - MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 24 * 3600u); - MinTrainSamples = clamp(MinTrainSamples, 1 * 900u, 6 * 3600u); - TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u); - - DBEngineAnomalyRateEvery = clamp(DBEngineAnomalyRateEvery, 1 * 30u, 15 * 60u); + MaxTrainSamples = clamp<unsigned>(MaxTrainSamples, 1 * 3600, 24 * 3600); + MinTrainSamples = clamp<unsigned>(MinTrainSamples, 1 * 900, 6 * 3600); + TrainEvery = clamp<unsigned>(TrainEvery, 1 * 3600, 6 * 3600); + NumModelsToUse = clamp<unsigned>(TrainEvery, 1, 7 * 24); DiffN = clamp(DiffN, 0u, 1u); SmoothN = clamp(SmoothN, 0u, 5u); @@ -72,13 +63,9 @@ void Config::readMLConfig(void) { MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u); DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00); - HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0); - ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0); - ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0); - ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0); - ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99); - ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99); + HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.1, 10.0); + AnomalyDetectionQueryDuration = clamp<time_t>(AnomalyDetectionQueryDuration, 60, 15 * 60); /* * Validate @@ -91,13 +78,6 @@ void Config::readMLConfig(void) { MaxTrainSamples = 4 * 3600; } - if (ADMinWindowSize >= ADMaxWindowSize) { - error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize); - - ADMinWindowSize = 30.0; - ADMaxWindowSize = 600.0; - } - /* * Assign to config instance */ @@ -107,8 +87,7 @@ void Config::readMLConfig(void) { Cfg.MaxTrainSamples = MaxTrainSamples; Cfg.MinTrainSamples = MinTrainSamples; Cfg.TrainEvery = TrainEvery; - - Cfg.DBEngineAnomalyRateEvery = DBEngineAnomalyRateEvery; + Cfg.NumModelsToUse = NumModelsToUse; Cfg.DiffN = DiffN; Cfg.SmoothN = SmoothN; @@ -118,13 +97,10 @@ void Config::readMLConfig(void) { Cfg.MaxKMeansIters = MaxKMeansIters; Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold; - Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; - Cfg.ADMinWindowSize = ADMinWindowSize; - Cfg.ADMaxWindowSize = ADMaxWindowSize; - Cfg.ADIdleWindowSize = ADIdleWindowSize; - Cfg.ADWindowRateThreshold = ADWindowRateThreshold; - Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold; + Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; + Cfg.AnomalyDetectionGroupingMethod = web_client_api_request_v1_data_group(AnomalyDetectionGroupingMethod.c_str(), RRDR_GROUPING_AVERAGE); + Cfg.AnomalyDetectionQueryDuration = AnomalyDetectionQueryDuration; Cfg.HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*"); Cfg.SP_HostsToSkip = simple_pattern_create(Cfg.HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); diff --git a/ml/Config.h b/ml/Config.h index 595fd072b..d876d4aa4 100644 --- a/ml/Config.h +++ b/ml/Config.h @@ -14,6 +14,7 @@ public: unsigned MaxTrainSamples; unsigned MinTrainSamples; unsigned TrainEvery; + unsigned NumModelsToUse; unsigned DBEngineAnomalyRateEvery; @@ -25,13 +26,10 @@ public: unsigned MaxKMeansIters; double DimensionAnomalyScoreThreshold; - double HostAnomalyRateThreshold; - double ADMinWindowSize; - double ADMaxWindowSize; - double ADIdleWindowSize; - double ADWindowRateThreshold; - double ADDimensionRateThreshold; + double HostAnomalyRateThreshold; + RRDR_GROUPING AnomalyDetectionGroupingMethod; + time_t AnomalyDetectionQueryDuration; bool StreamADCharts; @@ -41,7 +39,6 @@ public: std::string ChartsToSkip; SIMPLE_PATTERN *SP_ChartsToSkip; - std::string AnomalyDBPath; std::vector<uint32_t> RandomNums; void readMLConfig(); diff --git a/ml/Database.cc b/ml/Database.cc deleted file mode 100644 index 06d0cdecb..000000000 --- a/ml/Database.cc +++ /dev/null @@ -1,127 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Database.h" - -const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE = - "CREATE TABLE IF NOT EXISTS anomaly_events( " - " anomaly_detector_name text NOT NULL, " - " anomaly_detector_version int NOT NULL, " - " host_id text NOT NULL, " - " after int NOT NULL, " - " before int NOT NULL, " - " anomaly_event_info text, " - " PRIMARY KEY( " - " anomaly_detector_name, anomaly_detector_version, " - " host_id, after, before " - " ) " - ");"; - -const char *ml::Database::SQL_INSERT_ANOMALY = - "INSERT INTO anomaly_events( " - " anomaly_detector_name, anomaly_detector_version, " - " host_id, after, before, anomaly_event_info) " - "VALUES (?1, ?2, ?3, ?4, ?5, ?6);"; - -const char *ml::Database::SQL_SELECT_ANOMALY = - "SELECT anomaly_event_info FROM anomaly_events WHERE" - " anomaly_detector_name == ?1 AND" - " anomaly_detector_version == ?2 AND" - " host_id == ?3 AND" - " after == ?4 AND" - " before == ?5;"; - -const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS = - "SELECT after, before FROM anomaly_events WHERE" - " anomaly_detector_name == ?1 AND" - " anomaly_detector_version == ?2 AND" - " host_id == ?3 AND" - " after >= ?4 AND" - " before <= ?5;"; - -using namespace ml; - -bool Statement::prepare(sqlite3 *Conn) { - if (!Conn) - return false; - - if (ParsedStmt) - return true; - - int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr); - if (RC == SQLITE_OK) - return true; - - std::string Msg = "Statement \"%s\" preparation failed due to \"%s\""; - error(Msg.c_str(), RawStmt, sqlite3_errstr(RC)); - - return false; -} - -bool Statement::bindValue(size_t Pos, const std::string &Value) { - int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT); - if (RC == SQLITE_OK) - return true; - - error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt); - return false; -} - -bool Statement::bindValue(size_t Pos, const int Value) { - int RC = sqlite3_bind_int(ParsedStmt, Pos, Value); - if (RC == SQLITE_OK) - return true; - - error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt); - return false; -} - -bool Statement::resetAndClear(bool Ret) { - int RC = sqlite3_reset(ParsedStmt); - if (RC != SQLITE_OK) { - error("Could not reset statement: '%s'", RawStmt); - return false; - } - - RC = sqlite3_clear_bindings(ParsedStmt); - if (RC != SQLITE_OK) { - error("Could not clear bindings in statement: '%s'", RawStmt); - return false; - } - - return Ret; -} - -Database::Database(const std::string &Path) { - // Get sqlite3 connection handle. - int RC = sqlite3_open(Path.c_str(), &Conn); - if (RC != SQLITE_OK) { - std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\""; - error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC)); - - sqlite3_close(Conn); - Conn = nullptr; - return; - } - - // Create anomaly events table if it does not exist. - char *ErrMsg; - RC = sqlite3_exec(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg); - if (RC == SQLITE_OK) - return; - - error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg); - error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE); - - sqlite3_free(ErrMsg); - sqlite3_close(Conn); - Conn = nullptr; -} - -Database::~Database() { - if (!Conn) - return; - - int RC = sqlite3_close(Conn); - if (RC != SQLITE_OK) - error("Could not close connection properly (rc=%d)", RC); -} diff --git a/ml/Database.h b/ml/Database.h deleted file mode 100644 index cc7b75872..000000000 --- a/ml/Database.h +++ /dev/null @@ -1,131 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_DATABASE_H -#define ML_DATABASE_H - -#include "Dimension.h" -#include "ml-private.h" - -#include "json/single_include/nlohmann/json.hpp" - -namespace ml { - -class Statement { -public: - using RowCallback = std::function<void(sqlite3_stmt *Stmt)>; - -public: - Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {} - - template<typename ...ArgTypes> - bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) { - if (!prepare(Conn)) - return false; - - switch (bind(1, Args...)) { - case 0: - return false; - case sizeof...(Args): - break; - default: - return resetAndClear(false); - } - - while (true) { - switch (int RC = sqlite3_step(ParsedStmt)) { - case SQLITE_BUSY: case SQLITE_LOCKED: - usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); - continue; - case SQLITE_ROW: - RowCb(ParsedStmt); - continue; - case SQLITE_DONE: - return resetAndClear(true); - default: - error("Stepping through '%s' returned rc=%d", RawStmt, RC); - return resetAndClear(false); - } - } - } - - ~Statement() { - if (!ParsedStmt) - return; - - int RC = sqlite3_finalize(ParsedStmt); - if (RC != SQLITE_OK) - error("Could not properly finalize statement (rc=%d)", RC); - } - -private: - bool prepare(sqlite3 *Conn); - - bool bindValue(size_t Pos, const int Value); - bool bindValue(size_t Pos, const std::string &Value); - - template<typename ArgType, typename ...ArgTypes> - size_t bind(size_t Pos, ArgType T) { - return bindValue(Pos, T); - } - - template<typename ArgType, typename ...ArgTypes> - size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) { - return bindValue(Pos, T) + bind(Pos + 1, Args...); - } - - bool resetAndClear(bool Ret); - -private: - const char *RawStmt; - sqlite3_stmt *ParsedStmt; -}; - -class Database { -private: - static const char *SQL_CREATE_ANOMALIES_TABLE; - static const char *SQL_INSERT_ANOMALY; - static const char *SQL_SELECT_ANOMALY; - static const char *SQL_SELECT_ANOMALY_EVENTS; - -public: - Database(const std::string &Path); - - ~Database(); - - template<typename ...ArgTypes> - bool insertAnomaly(ArgTypes... Args) { - Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; }; - return InsertAnomalyStmt.exec(Conn, RowCb, Args...); - } - - template<typename ...ArgTypes> - bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) { - Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { - const char *Text = static_cast<const char *>(sqlite3_column_blob(Stmt, 0)); - Json = nlohmann::json::parse(Text); - }; - return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...); - } - - template<typename ...ArgTypes> - bool getAnomaliesInRange(std::vector<std::pair<time_t, time_t>> &V, ArgTypes&&... Args) { - Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { - V.push_back({ - sqlite3_column_int64(Stmt, 0), - sqlite3_column_int64(Stmt, 1) - }); - }; - return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...); - } - -private: - sqlite3 *Conn; - - Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY}; - Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY}; - Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS}; -}; - -} - -#endif /* ML_DATABASE_H */ diff --git a/ml/Dimension.cc b/ml/Dimension.cc index 0fe07530c..bf34abb72 100644 --- a/ml/Dimension.cc +++ b/ml/Dimension.cc @@ -6,8 +6,13 @@ using namespace ml; -std::pair<CalculatedNumber *, size_t> -TrainableDimension::getCalculatedNumbers() { +bool Dimension::isActive() const { + bool SetObsolete = rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE); + bool DimObsolete = rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE); + return !SetObsolete && !DimObsolete; +} + +std::pair<CalculatedNumber *, size_t> Dimension::getCalculatedNumbers() { size_t MinN = Cfg.MinTrainSamples; size_t MaxN = Cfg.MaxTrainSamples; @@ -68,7 +73,7 @@ TrainableDimension::getCalculatedNumbers() { return { CNs, TotalValues }; } -MLResult TrainableDimension::trainModel() { +MLResult Dimension::trainModel() { auto P = getCalculatedNumbers(); CalculatedNumber *CNs = P.first; unsigned N = P.second; @@ -81,7 +86,15 @@ MLResult TrainableDimension::trainModel() { SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, SamplingRatio, Cfg.RandomNums); - KM.train(SB, Cfg.MaxKMeansIters); + std::vector<DSample> Samples = SB.preprocess(); + + KMeans KM; + KM.train(Samples, Cfg.MaxKMeansIters); + + { + std::lock_guard<std::mutex> Lock(Mutex); + Models[0] = KM; + } Trained = true; ConstantModel = true; @@ -90,16 +103,25 @@ MLResult TrainableDimension::trainModel() { return MLResult::Success; } -void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) { +bool Dimension::shouldTrain(const TimePoint &TP) const { + if (ConstantModel) + return false; + + return (LastTrainedAt + Seconds(Cfg.TrainEvery * updateEvery())) < TP; +} + +bool Dimension::predict(CalculatedNumber Value, bool Exists) { if (!Exists) { CNs.clear(); - return; + AnomalyBit = false; + return false; } unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; if (CNs.size() < N) { CNs.push_back(Value); - return; + AnomalyBit = false; + return false; } std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs)); @@ -108,28 +130,44 @@ void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) { ConstantModel = false; CNs[N - 1] = Value; -} -std::pair<MLResult, bool> PredictableDimension::predict() { - unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; - if (CNs.size() != N) { + if (!isTrained() || ConstantModel) { AnomalyBit = false; - return { MLResult::MissingData, AnomalyBit }; + return false; } CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)](); std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber)); - - SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, + SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, + Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, 1.0, Cfg.RandomNums); - AnomalyScore = computeAnomalyScore(SB); + const DSample Sample = SB.preprocess().back(); delete[] TmpCNs; - if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) { + std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock); + if (!Lock.try_lock()) { AnomalyBit = false; - return { MLResult::NaN, AnomalyBit }; + return false; } - AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold); - return { MLResult::Success, AnomalyBit }; + for (const auto &KM : Models) { + double AnomalyScore = KM.anomalyScore(Sample); + if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) { + AnomalyBit = false; + continue; + } + + if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) { + AnomalyBit = false; + return false; + } + } + + AnomalyBit = true; + return true; +} + +std::array<KMeans, 1> Dimension::getModels() { + std::unique_lock<std::mutex> Lock(Mutex); + return Models; } diff --git a/ml/Dimension.h b/ml/Dimension.h index 4fbc09b98..3ec56e098 100644 --- a/ml/Dimension.h +++ b/ml/Dimension.h @@ -3,157 +3,92 @@ #ifndef ML_DIMENSION_H #define ML_DIMENSION_H -#include "BitBufferCounter.h" +#include "Query.h" #include "Config.h" #include "ml-private.h" namespace ml { -class RrdDimension { -public: - RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->tiers[0]->query_ops) { } - - RRDDIM *getRD() const { return RD; } - - time_t latestTime() { return Ops->latest_time(RD->tiers[0]->db_metric_handle); } - - time_t oldestTime() { return Ops->oldest_time(RD->tiers[0]->db_metric_handle); } - - unsigned updateEvery() const { return RD->update_every; } - - const std::string getID() const { - RRDSET *RS = RD->rrdset; - - std::stringstream SS; - SS << RS->context << "|" << RS->id << "|" << RD->name; - return SS.str(); - } - - bool isActive() const { - if (rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE)) - return false; - - if (rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE)) - return false; - - return true; - } - - void setAnomalyRateRD(RRDDIM *ARRD) { AnomalyRateRD = ARRD; } - RRDDIM *getAnomalyRateRD() const { return AnomalyRateRD; } - - void setAnomalyRateRDName(const char *Name) const { - rrddim_set_name(AnomalyRateRD->rrdset, AnomalyRateRD, Name); - } - - virtual ~RrdDimension() { - rrddim_free(AnomalyRateRD->rrdset, AnomalyRateRD); - } - -private: - RRDDIM *RD; - RRDDIM *AnomalyRateRD; - - struct rrddim_query_ops *Ops; - - std::string ID; -}; - enum class MLResult { Success = 0, MissingData, NaN, }; -class TrainableDimension : public RrdDimension { -public: - TrainableDimension(RRDDIM *RD) : - RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {} +static inline std::string getMLDimensionID(RRDDIM *RD) { + RRDSET *RS = RD->rrdset; - MLResult trainModel(); + std::stringstream SS; + SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD); + return SS.str(); +} - CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) { - return Trained ? KM.anomalyScore(SB) : 0.0; +class Dimension { +public: + Dimension(RRDDIM *RD) : + RD(RD), + LastTrainedAt(Seconds(0)), + Trained(false), + ConstantModel(false), + AnomalyScore(0.0), + AnomalyBit(0) + { } + + RRDDIM *getRD() const { + return RD; } - bool shouldTrain(const TimePoint &TP) const { - if (ConstantModel) - return false; - - return (LastTrainedAt + TrainEvery) < TP; + unsigned updateEvery() const { + return RD->update_every; } - bool isTrained() const { return Trained; } - -private: - std::pair<CalculatedNumber *, size_t> getCalculatedNumbers(); - -public: - TimePoint LastTrainedAt{Seconds{0}}; + time_t latestTime() const { + return Query(RD).latestTime(); + } -protected: - std::atomic<bool> ConstantModel{false}; + time_t oldestTime() const { + return Query(RD).oldestTime(); + } -private: - Seconds TrainEvery; - KMeans KM; + bool isTrained() const { + return Trained; + } - std::atomic<bool> Trained{false}; -}; + bool isAnomalous() const { + return AnomalyBit; + } -class PredictableDimension : public TrainableDimension { -public: - PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {} + bool shouldTrain(const TimePoint &TP) const; - std::pair<MLResult, bool> predict(); + bool isActive() const; - void addValue(CalculatedNumber Value, bool Exists); + MLResult trainModel(); - bool isAnomalous() { return AnomalyBit; } + bool predict(CalculatedNumber Value, bool Exists); - void updateAnomalyBitCounter(RRDSET *RS, unsigned Elapsed, bool IsAnomalous) { - AnomalyBitCounter += IsAnomalous; + std::pair<bool, double> detect(size_t WindowLength, bool Reset); - if (Elapsed == Cfg.DBEngineAnomalyRateEvery) { - double AR = static_cast<double>(AnomalyBitCounter) / Cfg.DBEngineAnomalyRateEvery; - rrddim_set_by_pointer(RS, getAnomalyRateRD(), AR * 1000); - AnomalyBitCounter = 0; - } - } + std::array<KMeans, 1> getModels(); private: - CalculatedNumber AnomalyScore{0.0}; - std::atomic<bool> AnomalyBit{false}; - unsigned AnomalyBitCounter{0}; - - std::vector<CalculatedNumber> CNs; -}; + std::pair<CalculatedNumber *, size_t> getCalculatedNumbers(); -class DetectableDimension : public PredictableDimension { public: - DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {} - - std::pair<bool, double> detect(size_t WindowLength, bool Reset) { - bool AnomalyBit = isAnomalous(); - - if (Reset) - NumSetBits = BBC.numSetBits(); + RRDDIM *RD; - NumSetBits += AnomalyBit; - BBC.insert(AnomalyBit); + TimePoint LastTrainedAt; + std::atomic<bool> Trained; + std::atomic<bool> ConstantModel; - double AnomalyRate = static_cast<double>(NumSetBits) / WindowLength; - return { AnomalyBit, AnomalyRate }; - } + CalculatedNumber AnomalyScore; + std::atomic<bool> AnomalyBit; -private: - BitBufferCounter BBC{static_cast<size_t>(Cfg.ADMinWindowSize)}; - size_t NumSetBits{0}; + std::vector<CalculatedNumber> CNs; + std::array<KMeans, 1> Models; + std::mutex Mutex; }; -using Dimension = DetectableDimension; - } // namespace ml #endif /* ML_DIMENSION_H */ diff --git a/ml/Host.cc b/ml/Host.cc index f8cba9d64..4a57178c7 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -1,278 +1,20 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include <dlib/statistics.h> - #include "Config.h" #include "Host.h" +#include "ADCharts.h" #include "json/single_include/nlohmann/json.hpp" using namespace ml; -static void updateDimensionsChart(RRDHOST *RH, - collected_number NumTrainedDimensions, - collected_number NumNormalDimensions, - collected_number NumAnomalousDimensions) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *NumTotalDimensionsRD = nullptr; - static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr; - static thread_local RRDDIM *NumNormalDimensionsRD = nullptr; - static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "dimensions_on_" << localhost->machine_guid; - NameSS << "dimensions_on_" << localhost->hostname; - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "dimensions", // family - "anomaly_detection.dimensions", // ctx - "Anomaly detection dimensions", // title - "dimensions", // units - "netdata", // plugin - "ml", // module - 39183, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions); - rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions); - rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions); - rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions); - - rrdset_done(RS); -} - -static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *AnomalyRateRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_rate_on_" << localhost->machine_guid; - NameSS << "anomaly_rate_on_" << localhost->hostname; - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_rate", // family - "anomaly_detection.anomaly_rate", // ctx - "Percentage of anomalous dimensions", // title - "percentage", // units - "netdata", // plugin - "ml", // module - 39184, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL, - 1, 100, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate); - - rrdset_done(RS); -} - -static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *WindowLengthRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "detector_window_on_" << localhost->machine_guid; - NameSS << "detector_window_on_" << localhost->hostname; - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "detector_window", // family - "anomaly_detection.detector_window", // ctx - "Anomaly detector window length", // title - "seconds", // units - "netdata", // plugin - "ml", // module - 39185, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - WindowLengthRD = rrddim_add(RS, "duration", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every); - rrdset_done(RS); -} - -static void updateEventsChart(RRDHOST *RH, - std::pair<BitRateWindow::Edge, size_t> P, - bool ResetBitCounter, - bool NewAnomalyEvent) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *AboveThresholdRD = nullptr; - static thread_local RRDDIM *ResetBitCounterRD = nullptr; - static thread_local RRDDIM *NewAnomalyEventRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "detector_events_on_" << localhost->machine_guid; - NameSS << "detector_events_on_" << localhost->hostname; - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "detector_events", // family - "anomaly_detection.detector_events", // ctx - "Anomaly events triggered", // title - "boolean", // units - "netdata", // plugin - "ml", // module - 39186, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - BitRateWindow::Edge E = P.first; - bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold; - - rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold); - rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter); - rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent); - - rrdset_done(RS); -} - -static void updateDetectionChart(RRDHOST *RH) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *UserRD, *SystemRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "prediction_stats_" << RH->machine_guid; - NameSS << "prediction_stats_for_" << RH->hostname; - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "ml", // family - "netdata.prediction_stats", // ctx - "Prediction thread CPU usage", // title - "milliseconds/s", // units - "netdata", // plugin - "ml", // module - 136000, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - - UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } else - rrdset_next(RS); - - struct rusage TRU; - getrusage(RUSAGE_THREAD, &TRU); - - rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec); - rrdset_done(RS); -} - -static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU) -{ - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *UserRD = nullptr; - static thread_local RRDDIM *SystemRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_stats_" << RH->machine_guid; - NameSS << "training_stats_for_" << RH->hostname; - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "ml", // family - "netdata.training_stats", // ctx - "Training thread CPU usage", // title - "milliseconds/s", // units - "netdata", // plugin - "ml", // module - 136001, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - - UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec); - rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec); - rrdset_done(RS); -} - void RrdHost::addDimension(Dimension *D) { - RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL, - 1, 1000, RRD_ALGORITHM_ABSOLUTE); - D->setAnomalyRateRD(AnomalyRateRD); - - { - std::lock_guard<std::mutex> Lock(Mutex); + std::lock_guard<std::mutex> Lock(Mutex); - DimensionsMap[D->getRD()] = D; + DimensionsMap[D->getRD()] = D; - // Default construct mutex for dimension - LocksMap[D]; - } + // Default construct mutex for dimension + LocksMap[D]; } void RrdHost::removeDimension(Dimension *D) { @@ -312,18 +54,33 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const { Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; - Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; - Json["min-window-size"] = Cfg.ADMinWindowSize; - Json["max-window-size"] = Cfg.ADMaxWindowSize; - Json["idle-window-size"] = Cfg.ADIdleWindowSize; - Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold; - Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold; + Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; + Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod); + Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration; Json["hosts-to-skip"] = Cfg.HostsToSkip; Json["charts-to-skip"] = Cfg.ChartsToSkip; } +void TrainableHost::getModelsAsJson(nlohmann::json &Json) { + std::lock_guard<std::mutex> Lock(Mutex); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + nlohmann::json JsonArray = nlohmann::json::array(); + for (const KMeans &KM : D->getModels()) { + nlohmann::json J; + KM.toJson(J); + JsonArray.push_back(J); + } + Json[getMLDimensionID(D->getRD())] = JsonArray; + } + + return; +} + std::pair<Dimension *, Duration<double>> TrainableHost::findDimensionToTrain(const TimePoint &NowTP) { std::lock_guard<std::mutex> Lock(Mutex); @@ -384,7 +141,12 @@ void TrainableHost::train() { worker_is_idle(); SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor); - std::this_thread::sleep_for(SleepFor); + TimePoint Now = SteadyClock::now(); + auto Until = Now + SleepFor; + while (Now < Until && !netdata_exit) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + Now = SteadyClock::now(); + } worker_is_busy(0); } } @@ -393,78 +155,44 @@ void TrainableHost::train() { #define WORKER_JOB_UPDATE_DETECTION_CHART 1 #define WORKER_JOB_UPDATE_ANOMALY_RATES 2 #define WORKER_JOB_UPDATE_CHARTS 3 -#define WORKER_JOB_SAVE_ANOMALY_EVENT 4 #if WORKER_UTILIZATION_MAX_JOB_TYPES < 5 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5 #endif void DetectableHost::detectOnce() { - auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold); - BitRateWindow::Edge Edge = P.first; - size_t WindowLength = P.second; - - bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold); - bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) && - (Edge.second == BitRateWindow::State::Idle); - - std::vector<std::pair<double, std::string>> DimsOverThreshold; - size_t NumAnomalousDimensions = 0; size_t NumNormalDimensions = 0; size_t NumTrainedDimensions = 0; size_t NumActiveDimensions = 0; - bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery); - if (CollectAnomalyRates) - rrdset_next(AnomalyRateRS); - { std::lock_guard<std::mutex> Lock(Mutex); - DimsOverThreshold.reserve(DimensionsMap.size()); - for (auto &DP : DimensionsMap) { worker_is_busy(WORKER_JOB_DETECT_DIMENSION); Dimension *D = DP.second; - if (!D->isActive()) { - D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, false); + if (!D->isActive()) continue; - } NumActiveDimensions++; - - auto P = D->detect(WindowLength, ResetBitCounter); - bool IsAnomalous = P.first; - double AnomalyScore = P.second; - NumTrainedDimensions += D->isTrained(); + bool IsAnomalous = D->isAnomalous(); if (IsAnomalous) NumAnomalousDimensions += 1; - - if (NewAnomalyEvent && (AnomalyScore >= Cfg.ADDimensionRateThreshold)) - DimsOverThreshold.push_back({ AnomalyScore, D->getID() }); - - D->updateAnomalyBitCounter(AnomalyRateRS, AnomalyRateTimer, IsAnomalous); } if (NumAnomalousDimensions) - WindowAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions; + HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions; else - WindowAnomalyRate = 0.0; + HostAnomalyRate = 0.0; NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions; } - if (CollectAnomalyRates) { - worker_is_busy(WORKER_JOB_UPDATE_ANOMALY_RATES); - AnomalyRateTimer = 0; - rrdset_done(AnomalyRateRS); - } - this->NumAnomalousDimensions = NumAnomalousDimensions; this->NumNormalDimensions = NumNormalDimensions; this->NumTrainedDimensions = NumTrainedDimensions; @@ -472,38 +200,11 @@ void DetectableHost::detectOnce() { worker_is_busy(WORKER_JOB_UPDATE_CHARTS); updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); - updateRateChart(getRH(), WindowAnomalyRate * 10000.0); - updateWindowLengthChart(getRH(), WindowLength); - updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent); + updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0); struct rusage TRU; getResourceUsage(&TRU); updateTrainingChart(getRH(), &TRU); - - if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0)) - return; - - worker_is_busy(WORKER_JOB_SAVE_ANOMALY_EVENT); - - std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end()); - std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end()); - - // Make sure the JSON response won't grow beyond a specific number - // of dimensions. Log an error message if this happens, because it - // most likely means that the user specified a very-low anomaly rate - // threshold. - size_t NumMaxDimsOverThreshold = 2000; - if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) { - error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.", - DimsOverThreshold.size(), NumMaxDimsOverThreshold); - DimsOverThreshold.resize(NumMaxDimsOverThreshold); - } - - nlohmann::json JsonResult = DimsOverThreshold; - - time_t Before = now_realtime_sec(); - time_t After = Before - (WindowLength * updateEvery()); - DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4)); } void DetectableHost::detect() { @@ -512,7 +213,6 @@ void DetectableHost::detect() { worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart"); worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates"); worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts"); - worker_register_job_name(WORKER_JOB_SAVE_ANOMALY_EVENT, "anomaly event"); std::this_thread::sleep_for(Seconds{10}); @@ -3,38 +3,17 @@ #ifndef ML_HOST_H #define ML_HOST_H -#include "BitRateWindow.h" #include "Config.h" -#include "Database.h" #include "Dimension.h" #include "ml-private.h" +#include "json/single_include/nlohmann/json.hpp" namespace ml { class RrdHost { public: - RrdHost(RRDHOST *RH) : RH(RH) { - AnomalyRateRS = rrdset_create( - RH, - "anomaly_detection", - "anomaly_rates", - NULL, // name - "anomaly_rates", - NULL, // ctx - "Average anomaly rate", - "anomaly rate", - "netdata", - "ml", - 39189, - Cfg.DBEngineAnomalyRateEvery, - RRDSET_TYPE_LINE - ); - - AnomalyRateRS->flags = static_cast<RRDSET_FLAGS>( - static_cast<int>(AnomalyRateRS->flags) | RRDSET_FLAG_HIDDEN - ); - } + RrdHost(RRDHOST *RH) : RH(RH) {}; RRDHOST *getRH() { return RH; } @@ -55,7 +34,6 @@ public: protected: RRDHOST *RH; - RRDSET *AnomalyRateRS; // Protect dimension and lock maps std::mutex Mutex; @@ -80,6 +58,8 @@ public: memcpy(RU, &ResourceUsage, sizeof(struct rusage)); } + void getModelsAsJson(nlohmann::json &Json); + private: std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP); void trainDimension(Dimension *D, const TimePoint &NowTP); @@ -95,16 +75,6 @@ public: void startAnomalyDetectionThreads(); void stopAnomalyDetectionThreads(); - template<typename ...ArgTypes> - bool getAnomalyInfo(ArgTypes&&... Args) { - return DB.getAnomalyInfo(Args...); - } - - template<typename ...ArgTypes> - bool getAnomaliesInRange(ArgTypes&&... Args) { - return DB.getAnomaliesInRange(Args...); - } - void getDetectionInfoAsJson(nlohmann::json &Json) const; private: @@ -115,23 +85,12 @@ private: std::thread TrainingThread; std::thread DetectionThread; - BitRateWindow BRW{ - static_cast<size_t>(Cfg.ADMinWindowSize), - static_cast<size_t>(Cfg.ADMaxWindowSize), - static_cast<size_t>(Cfg.ADIdleWindowSize), - static_cast<size_t>(Cfg.ADMinWindowSize * Cfg.ADWindowRateThreshold) - }; - - CalculatedNumber WindowAnomalyRate{0.0}; + CalculatedNumber HostAnomalyRate{0.0}; size_t NumAnomalousDimensions{0}; size_t NumNormalDimensions{0}; size_t NumTrainedDimensions{0}; size_t NumActiveDimensions{0}; - - unsigned AnomalyRateTimer{0}; - - Database DB{Cfg.AnomalyDBPath}; }; using Host = DetectableHost; diff --git a/ml/KMeans.cc b/ml/KMeans.cc new file mode 100644 index 000000000..edc2ef49e --- /dev/null +++ b/ml/KMeans.cc @@ -0,0 +1,43 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "KMeans.h" +#include <dlib/clustering.h> + +void KMeans::train(const std::vector<DSample> &Samples, size_t MaxIterations) { + MinDist = std::numeric_limits<CalculatedNumber>::max(); + MaxDist = std::numeric_limits<CalculatedNumber>::min(); + + ClusterCenters.clear(); + + dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples); + dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations); + + for (const auto &S : Samples) { + CalculatedNumber MeanDist = 0.0; + + for (const auto &KMCenter : ClusterCenters) + MeanDist += dlib::length(KMCenter - S); + + MeanDist /= NumClusters; + + if (MeanDist < MinDist) + MinDist = MeanDist; + + if (MeanDist > MaxDist) + MaxDist = MeanDist; + } +} + +CalculatedNumber KMeans::anomalyScore(const DSample &Sample) const { + CalculatedNumber MeanDist = 0.0; + for (const auto &CC: ClusterCenters) + MeanDist += dlib::length(CC - Sample); + + MeanDist /= NumClusters; + + if (MaxDist == MinDist) + return 0.0; + + CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist)); + return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore; +} diff --git a/ml/kmeans/KMeans.h b/ml/KMeans.h index 4ea3b6a89..0398eeb86 100644 --- a/ml/kmeans/KMeans.h +++ b/ml/KMeans.h @@ -9,6 +9,7 @@ #include <mutex> #include "SamplesBuffer.h" +#include "json/single_include/nlohmann/json.hpp" class KMeans { public: @@ -17,8 +18,16 @@ public: MaxDist = std::numeric_limits<CalculatedNumber>::min(); }; - void train(SamplesBuffer &SB, size_t MaxIterations); - CalculatedNumber anomalyScore(SamplesBuffer &SB); + void train(const std::vector<DSample> &Samples, size_t MaxIterations); + CalculatedNumber anomalyScore(const DSample &Sample) const; + + void toJson(nlohmann::json &J) const { + J = nlohmann::json{ + {"CCs", ClusterCenters}, + {"MinDist", MinDist}, + {"MaxDist", MaxDist} + }; + } private: size_t NumClusters; @@ -27,8 +36,6 @@ private: CalculatedNumber MinDist; CalculatedNumber MaxDist; - - std::mutex Mutex; }; #endif /* KMEANS_H */ diff --git a/ml/Makefile.am b/ml/Makefile.am deleted file mode 100644 index 27449d659..000000000 --- a/ml/Makefile.am +++ /dev/null @@ -1,8 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -SUBDIRS = \ - kmeans \ - $(NULL) diff --git a/ml/Query.h b/ml/Query.h index 24c5fa384..78d117003 100644 --- a/ml/Query.h +++ b/ml/Query.h @@ -7,8 +7,8 @@ namespace ml { class Query { public: - Query(RRDDIM *RD) : RD(RD) { - Ops = &RD->tiers[0]->query_ops; + Query(RRDDIM *RD) : RD(RD), Initialized(false) { + Ops = RD->tiers[0]->query_ops; } time_t latestTime() { @@ -20,27 +20,36 @@ public: } void init(time_t AfterT, time_t BeforeT) { - Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT, TIER_QUERY_FETCH_SUM); + Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT); + Initialized = true; + points_read = 0; } bool isFinished() { return Ops->is_finished(&Handle); } + ~Query() { + if (Initialized) { + Ops->finalize(&Handle); + global_statistics_ml_query_completed(points_read); + points_read = 0; + } + } + std::pair<time_t, CalculatedNumber> nextMetric() { + points_read++; STORAGE_POINT sp = Ops->next_metric(&Handle); return { sp.start_time, sp.sum / sp.count }; } - ~Query() { - Ops->finalize(&Handle); - } - private: RRDDIM *RD; + bool Initialized; + size_t points_read; - struct rrddim_query_ops *Ops; - struct rrddim_query_handle Handle; + struct storage_engine_query_ops *Ops; + struct storage_engine_query_handle Handle; }; } // namespace ml diff --git a/ml/README.md b/ml/README.md index 2578993e2..f6fd923ab 100644 --- a/ml/README.md +++ b/ml/README.md @@ -248,8 +248,6 @@ In terms of anomaly detection, the most interesting charts would be the `anomaly - `anomaly_detection.dimensions`: Percentage of anomalous dimensions. - `anomaly_detection.detector_window`: The length of the active window used by the detector. - `anomaly_detection.detector_events`: Flags (0 or 1) to show when an anomaly event has been triggered by the detector. -- `anomaly_detection.prediction_stats`: Diagnostic metrics relating to prediction time of anomaly detection. -- `anomaly_detection.training_stats`: Diagnostic metrics relating to training time of anomaly detection. Below is an example of how these charts may look in the presence of an anomaly event. diff --git a/ml/kmeans/SamplesBuffer.cc b/ml/SamplesBuffer.cc index d276c6e09..d276c6e09 100644 --- a/ml/kmeans/SamplesBuffer.cc +++ b/ml/SamplesBuffer.cc diff --git a/ml/kmeans/SamplesBuffer.h b/ml/SamplesBuffer.h index 1c7215cca..1c7215cca 100644 --- a/ml/kmeans/SamplesBuffer.h +++ b/ml/SamplesBuffer.h diff --git a/ml/kmeans/Tests.cc b/ml/SamplesBufferTests.cc index 0cb595945..5997a2a15 100644 --- a/ml/kmeans/Tests.cc +++ b/ml/SamplesBufferTests.cc @@ -36,7 +36,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_1_DN_1_SN_3_LN_1) { CNs[6] = 0.2684839023122384; CNs[7] = 0.851332948637479; - SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max()); + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums); SB.preprocess(); std::vector<Sample> Samples = SB.getPreprocessedSamples(); @@ -76,7 +77,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_1_DN_2_SN_3_LN_2) { CNs[6] = 0.15552559051428083; CNs[7] = 0.6309750314597955; - SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max()); + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums); SB.preprocess(); std::vector<Sample> Samples = SB.getPreprocessedSamples(); @@ -114,7 +116,8 @@ TEST(SamplesBufferTest, NS_8_NDPS_3_DN_2_SN_4_LN_1) { CNs[18] = 0.9394494507088997; CNs[19] =0.17567223681734334; CNs[20] = 0.42732886195446984; CNs[21] = 0.9460522396152958; CNs[22] =0.23462747016780894; CNs[23] = 0.35983249900892145; - SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + std::vector<uint32_t> RandNums(NumSamples, std::numeric_limits<uint32_t>::max()); + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN, 1.0, RandNums); SB.preprocess(); std::vector<Sample> Samples = SB.getPreprocessedSamples(); diff --git a/ml/Tests.cc b/ml/Tests.cc deleted file mode 100644 index 7d369d48d..000000000 --- a/ml/Tests.cc +++ /dev/null @@ -1,301 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "BitBufferCounter.h" -#include "BitRateWindow.h" - -#include "gtest/gtest.h" - -using namespace ml; - -TEST(BitBufferCounterTest, Cap_4) { - size_t Capacity = 4; - BitBufferCounter BBC(Capacity); - - // No bits set - EXPECT_EQ(BBC.numSetBits(), 0); - - // All ones - for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) { - BBC.insert(true); - - EXPECT_EQ(BBC.numSetBits(), std::min(Idx + 1, Capacity)); - } - - // All zeroes - for (size_t Idx = 0; Idx != Capacity; Idx++) { - BBC.insert(false); - - if (Idx < Capacity) - EXPECT_EQ(BBC.numSetBits(), Capacity - (Idx + 1)); - else - EXPECT_EQ(BBC.numSetBits(), 0); - } - - // Even ones/zeroes - for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) - BBC.insert(Idx % 2 == 0); - EXPECT_EQ(BBC.numSetBits(), Capacity / 2); -} - -using State = BitRateWindow::State; -using Edge = BitRateWindow::Edge; -using Result = std::pair<Edge, size_t>; - -TEST(BitRateWindowTest, Cycles) { - /* Test the FSM by going through its two cycles: - * 1) NotFilled -> AboveThreshold -> Idle -> NotFilled - * 2) NotFilled -> BelowThreshold -> AboveThreshold -> Idle -> NotFilled - * - * Check the window's length on every new state transition. - */ - - size_t MinLength = 4, MaxLength = 6, IdleLength = 5; - size_t SetBitsThreshold = 3; - - Result R; - BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - /* - * 1st cycle - */ - - // NotFilled -> AboveThreshold - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - // AboveThreshold -> Idle - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); - - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); - EXPECT_EQ(R.second, MaxLength); - - - // Idle -> NotFilled - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); - EXPECT_EQ(R.second, 1); - - // NotFilled -> AboveThreshold - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - /* - * 2nd cycle - */ - - BRW = BitRateWindow(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - // NotFilled -> BelowThreshold - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::BelowThreshold)); - EXPECT_EQ(R.second, MinLength); - - // BelowThreshold -> BelowThreshold: - // Check the state's self loop by adding set bits that will keep the - // bit buffer below the specified threshold. - // - for (size_t Idx = 0; Idx != 2 * MaxLength; Idx++) { - R = BRW.insert(Idx % 2 == 0); - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); - EXPECT_EQ(R.second, MinLength); - } - - // Verify that at the end of the loop the internal bit buffer contains - // "1010". Do so by adding one set bit and checking that we remain below - // the specified threshold. - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); - EXPECT_EQ(R.second, MinLength); - - // BelowThreshold -> AboveThreshold - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - // AboveThreshold -> Idle: - // Do the transition without filling the max window size this time. - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); - EXPECT_EQ(R.second, MinLength); - - // Idle -> NotFilled - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); - EXPECT_EQ(R.second, 1); - - // NotFilled -> AboveThreshold - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); -} - -TEST(BitRateWindowTest, ConsecutiveOnes) { - size_t MinLength = 120, MaxLength = 240, IdleLength = 30; - size_t SetBitsThreshold = 30; - - Result R; - BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - for (size_t Idx = 0; Idx != MaxLength; Idx++) - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); - EXPECT_EQ(R.second, MinLength); - - for (size_t Idx = 0; Idx != SetBitsThreshold; Idx++) { - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); - R = BRW.insert(true); - } - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - // At this point the window's buffer contains: - // (MinLength - SetBitsThreshold = 90) 0s, followed by - // (SetBitsThreshold = 30) 1s. - // - // To go below the threshold, we need to add (90 + 1) more 0s in the window's - // buffer. At that point, the the window's buffer will contain: - // (SetBitsThreshold = 29) 1s, followed by - // (MinLength - SetBitsThreshold = 91) 0s. - // - // Right before adding the last 0, we expect the window's length to be equal to 210, - // because the bit buffer has gone through these bits: - // (MinLength - SetBitsThreshold = 90) 0s, followed by - // (SetBitsThreshold = 30) 1s, followed by - // (MinLength - SetBitsThreshold = 90) 0s. - - for (size_t Idx = 0; Idx != (MinLength - SetBitsThreshold); Idx++) { - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); - } - EXPECT_EQ(R.second, 2 * MinLength - SetBitsThreshold); - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); - - // Continue with the Idle -> NotFilled edge. - for (size_t Idx = 0; Idx != IdleLength - 1; Idx++) { - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); - } - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); - EXPECT_EQ(R.second, 1); -} - -TEST(BitRateWindowTest, WithHoles) { - size_t MinLength = 120, MaxLength = 240, IdleLength = 30; - size_t SetBitsThreshold = 30; - - Result R; - BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - for (size_t Idx = 0; Idx != MaxLength; Idx++) - R = BRW.insert(false); - - for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) - R = BRW.insert(true); - for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) - R = BRW.insert(false); - for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) - R = BRW.insert(true); - for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) - R = BRW.insert(false); - for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) - R = BRW.insert(true); - - EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - // The window's bit buffer contains: - // 70 0s, 10 1s, 10 0s, 10 1s, 10 0s, 10 1s. - // Where: 70 = MinLength - (5 / 3) * SetBitsThresholds, ie. we need - // to add (70 + 1) more zeros to make the bit buffer go below the - // threshold and then the window's length should be: - // 70 + 50 + 70 = 190. - - BitRateWindow::Edge E; - do { - R = BRW.insert(false); - E = R.first; - } while (E.first != State::AboveThreshold || E.second != State::Idle); - EXPECT_EQ(R.second, 2 * MinLength - (5 * SetBitsThreshold) / 3); -} - -TEST(BitRateWindowTest, MinWindow) { - size_t MinLength = 120, MaxLength = 240, IdleLength = 30; - size_t SetBitsThreshold = 30; - - Result R; - BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - BRW.insert(true); - BRW.insert(false); - for (size_t Idx = 2; Idx != SetBitsThreshold; Idx++) - BRW.insert(true); - for (size_t Idx = SetBitsThreshold; Idx != MinLength - 1; Idx++) - BRW.insert(false); - - R = BRW.insert(true); - EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); - EXPECT_EQ(R.second, MinLength); - - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); -} - -TEST(BitRateWindowTest, MaxWindow) { - size_t MinLength = 100, MaxLength = 200, IdleLength = 30; - size_t SetBitsThreshold = 50; - - Result R; - BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); - - for (size_t Idx = 0; Idx != MaxLength; Idx++) - R = BRW.insert(Idx % 2 == 0); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); - EXPECT_EQ(R.second, MaxLength); - - R = BRW.insert(false); - EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); -} diff --git a/ml/kmeans/KMeans.cc b/ml/kmeans/KMeans.cc deleted file mode 100644 index e66c66c16..000000000 --- a/ml/kmeans/KMeans.cc +++ /dev/null @@ -1,55 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "KMeans.h" -#include <dlib/clustering.h> - -void KMeans::train(SamplesBuffer &SB, size_t MaxIterations) { - std::vector<DSample> Samples = SB.preprocess(); - - MinDist = std::numeric_limits<CalculatedNumber>::max(); - MaxDist = std::numeric_limits<CalculatedNumber>::min(); - - { - std::lock_guard<std::mutex> Lock(Mutex); - - ClusterCenters.clear(); - - dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples); - dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations); - - for (const auto &S : Samples) { - CalculatedNumber MeanDist = 0.0; - - for (const auto &KMCenter : ClusterCenters) - MeanDist += dlib::length(KMCenter - S); - - MeanDist /= NumClusters; - - if (MeanDist < MinDist) - MinDist = MeanDist; - - if (MeanDist > MaxDist) - MaxDist = MeanDist; - } - } -} - -CalculatedNumber KMeans::anomalyScore(SamplesBuffer &SB) { - std::vector<DSample> DSamples = SB.preprocess(); - - std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock); - if (!Lock.try_lock()) - return std::numeric_limits<CalculatedNumber>::quiet_NaN(); - - CalculatedNumber MeanDist = 0.0; - for (const auto &CC: ClusterCenters) - MeanDist += dlib::length(CC - DSamples.back()); - - MeanDist /= NumClusters; - - if (MaxDist == MinDist) - return 0.0; - - CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist)); - return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore; -} diff --git a/ml/kmeans/Makefile.am b/ml/kmeans/Makefile.am deleted file mode 100644 index babdcf0df..000000000 --- a/ml/kmeans/Makefile.am +++ /dev/null @@ -1,4 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c index fad480f4b..492dfe2fc 100644 --- a/ml/ml-dummy.c +++ b/ml/ml-dummy.c @@ -24,38 +24,23 @@ char *ml_get_host_info(RRDHOST *RH) { return NULL; } -void ml_new_dimension(RRDDIM *RD) { (void) RD; } - -void ml_delete_dimension(RRDDIM *RD) { (void) RD; } - -bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { - (void) RD; (void) Value; (void) Exists; - return false; -} - -char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before) { - (void) RH; (void) AnomalyDetectorName; - (void) AnomalyDetectorVersion; (void) After; (void) Before; +char *ml_get_host_runtime_info(RRDHOST *RH) { + (void) RH; return NULL; } -char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before) { - (void) RH; (void) AnomalyDetectorName; - (void) AnomalyDetectorVersion; (void) After; (void) Before; +char *ml_get_host_models(RRDHOST *RH) { + (void) RH; return NULL; } -void ml_process_rrdr(RRDR *R, int MaxAnomalyRates) { - (void) R; - (void) MaxAnomalyRates; -} +void ml_new_dimension(RRDDIM *RD) { (void) RD; } -void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *name) { - (void) RS; - (void) RD; - (void) name; +void ml_delete_dimension(RRDDIM *RD) { (void) RD; } + +bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { + (void) RD; (void) Value; (void) Exists; + return false; } bool ml_streaming_enabled() { diff --git a/ml/ml-private.h b/ml/ml-private.h index 7b3e00684..2bd72ac5a 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -3,7 +3,7 @@ #ifndef ML_PRIVATE_H #define ML_PRIVATE_H -#include "kmeans/KMeans.h" +#include "KMeans.h" #include "ml/ml.h" #include <chrono> @@ -16,7 +16,7 @@ bool ml_enabled(RRDHOST *RH) { if (!Cfg.EnableAnomalyDetection) return false; - if (simple_pattern_matches(Cfg.SP_HostsToSkip, RH->hostname)) + if (simple_pattern_matches(Cfg.SP_HostsToSkip, rrdhost_hostname(RH))) return false; return true; @@ -76,7 +76,7 @@ void ml_new_dimension(RRDDIM *RD) { if (static_cast<unsigned>(RD->update_every) != H->updateEvery()) return; - if (simple_pattern_matches(Cfg.SP_ChartsToSkip, RS->name)) + if (simple_pattern_matches(Cfg.SP_ChartsToSkip, rrdset_name(RS))) return; Dimension *D = new Dimension(RD); @@ -108,7 +108,7 @@ char *ml_get_host_info(RRDHOST *RH) { ConfigJson["enabled"] = false; } - return strdup(ConfigJson.dump(2, '\t').c_str()); + return strdupz(ConfigJson.dump(2, '\t').c_str()); } char *ml_get_host_runtime_info(RRDHOST *RH) { @@ -124,97 +124,24 @@ char *ml_get_host_runtime_info(RRDHOST *RH) { return strdup(ConfigJson.dump(1, '\t').c_str()); } -bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { - Dimension *D = static_cast<Dimension *>(RD->ml_dimension); - if (!D) - return false; - - D->addValue(Value, Exists); - bool Result = D->predict().second; - return Result; -} - -char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before) { - if (!RH || !RH->ml_host) { - error("No host"); - return nullptr; - } - - Host *H = static_cast<Host *>(RH->ml_host); - std::vector<std::pair<time_t, time_t>> TimeRanges; - - bool Res = H->getAnomaliesInRange(TimeRanges, AnomalyDetectorName, - AnomalyDetectorVersion, - H->getUUID(), - After, Before); - if (!Res) { - error("DB result is empty"); - return nullptr; - } - - nlohmann::json Json = TimeRanges; - return strdup(Json.dump(4).c_str()); -} - -char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before) { - if (!RH || !RH->ml_host) { - error("No host"); - return nullptr; - } - - Host *H = static_cast<Host *>(RH->ml_host); +char *ml_get_host_models(RRDHOST *RH) { + nlohmann::json ModelsJson; - nlohmann::json Json; - bool Res = H->getAnomalyInfo(Json, AnomalyDetectorName, - AnomalyDetectorVersion, - H->getUUID(), - After, Before); - if (!Res) { - error("DB result is empty"); - return nullptr; + if (RH && RH->ml_host) { + Host *H = static_cast<Host *>(RH->ml_host); + H->getModelsAsJson(ModelsJson); + return strdup(ModelsJson.dump(2, '\t').c_str()); } - return strdup(Json.dump(4, '\t').c_str()); -} - -void ml_process_rrdr(RRDR *R, int MaxAnomalyRates) { - if (R->rows != 1) - return; - - if (MaxAnomalyRates < 1 || MaxAnomalyRates >= R->d) - return; - - NETDATA_DOUBLE *CNs = R->v; - RRDR_DIMENSION_FLAGS *DimFlags = R->od; - - std::vector<std::pair<NETDATA_DOUBLE, int>> V; - - V.reserve(R->d); - for (int Idx = 0; Idx != R->d; Idx++) - V.emplace_back(CNs[Idx], Idx); - - std::sort(V.rbegin(), V.rend()); - - for (int Idx = MaxAnomalyRates; Idx != R->d; Idx++) { - int UnsortedIdx = V[Idx].second; - - int OldFlags = static_cast<int>(DimFlags[UnsortedIdx]); - int NewFlags = OldFlags | RRDR_DIMENSION_HIDDEN; - - DimFlags[UnsortedIdx] = static_cast<rrdr_dimension_flag>(NewFlags); - } + return nullptr; } -void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *Name) { - (void) RS; - +bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { Dimension *D = static_cast<Dimension *>(RD->ml_dimension); if (!D) - return; + return false; - D->setAnomalyRateRDName(Name); + return D->predict(Value, Exists); } bool ml_streaming_enabled() { @@ -12,7 +12,7 @@ extern "C" { // This is a DBEngine function redeclared here so that we can free // the anomaly rate dimension, whenever its backing dimension is freed. -extern void rrddim_free(RRDSET *st, RRDDIM *rd); +void rrddim_free(RRDSET *st, RRDDIM *rd); typedef void* ml_host_t; typedef void* ml_dimension_t; @@ -28,22 +28,13 @@ void ml_delete_host(RRDHOST *RH); char *ml_get_host_info(RRDHOST *RH); char *ml_get_host_runtime_info(RRDHOST *RH); +char *ml_get_host_models(RRDHOST *RH); void ml_new_dimension(RRDDIM *RD); void ml_delete_dimension(RRDDIM *RD); bool ml_is_anomalous(RRDDIM *RD, double value, bool exists); -char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before); - -char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, - int AnomalyDetectorVersion, time_t After, time_t Before); - -void ml_process_rrdr(RRDR *R, int MaxAnomalyRates); - -void ml_dimension_update_name(RRDSET *RS, RRDDIM *RD, const char *name); - bool ml_streaming_enabled(); #define ML_ANOMALY_RATES_CHART_ID "anomaly_detection.anomaly_rates" |