diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:11 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2021-12-01 06:15:11 +0000 |
commit | 483926a283e118590da3f9ecfa75a8a4d62143ce (patch) | |
tree | cb77052778df9a128a8cd3ff5bf7645322a13bc5 /ml | |
parent | Releasing debian version 1.31.0-4. (diff) | |
download | netdata-483926a283e118590da3f9ecfa75a8a4d62143ce.tar.xz netdata-483926a283e118590da3f9ecfa75a8a4d62143ce.zip |
Merging upstream version 1.32.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ml')
-rw-r--r-- | ml/BitBufferCounter.cc | 29 | ||||
-rw-r--r-- | ml/BitBufferCounter.h | 54 | ||||
-rw-r--r-- | ml/BitRateWindow.cc | 75 | ||||
-rw-r--r-- | ml/BitRateWindow.h | 170 | ||||
-rw-r--r-- | ml/Config.cc | 128 | ||||
-rw-r--r-- | ml/Config.h | 45 | ||||
-rw-r--r-- | ml/Database.cc | 127 | ||||
-rw-r--r-- | ml/Database.h | 131 | ||||
-rw-r--r-- | ml/Dimension.cc | 169 | ||||
-rw-r--r-- | ml/Dimension.h | 124 | ||||
-rw-r--r-- | ml/Host.cc | 458 | ||||
-rw-r--r-- | ml/Host.h | 104 | ||||
-rw-r--r-- | ml/Makefile.am | 8 | ||||
-rw-r--r-- | ml/Query.h | 49 | ||||
-rw-r--r-- | ml/Tests.cc | 301 | ||||
-rw-r--r-- | ml/kmeans/KMeans.cc | 55 | ||||
-rw-r--r-- | ml/kmeans/KMeans.h | 34 | ||||
-rw-r--r-- | ml/kmeans/Makefile.am | 4 | ||||
-rw-r--r-- | ml/kmeans/SamplesBuffer.cc | 144 | ||||
-rw-r--r-- | ml/kmeans/SamplesBuffer.h | 140 | ||||
-rw-r--r-- | ml/kmeans/Tests.cc | 143 | ||||
-rw-r--r-- | ml/ml-dummy.c | 38 | ||||
-rw-r--r-- | ml/ml-private.h | 26 | ||||
-rw-r--r-- | ml/ml.cc | 153 | ||||
-rw-r--r-- | ml/ml.h | 41 |
25 files changed, 2750 insertions, 0 deletions
diff --git a/ml/BitBufferCounter.cc b/ml/BitBufferCounter.cc new file mode 100644 index 000000000..5e1ab5aca --- /dev/null +++ b/ml/BitBufferCounter.cc @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "BitBufferCounter.h" + +using namespace ml; + +std::vector<bool> BitBufferCounter::getBuffer() const { + std::vector<bool> Buffer; + + for (size_t Idx = start(); Idx != (start() + size()); Idx++) + Buffer.push_back(V[Idx % V.size()]); + + return Buffer; +} + +void BitBufferCounter::insert(bool Bit) { + if (N >= V.size()) + NumSetBits -= (V[start()] == true); + + NumSetBits += (Bit == true); + V[N++ % V.size()] = Bit; +} + +void BitBufferCounter::print(std::ostream &OS) const { + std::vector<bool> Buffer = getBuffer(); + + for (bool B : Buffer) + OS << B; +} diff --git a/ml/BitBufferCounter.h b/ml/BitBufferCounter.h new file mode 100644 index 000000000..db924d776 --- /dev/null +++ b/ml/BitBufferCounter.h @@ -0,0 +1,54 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef BIT_BUFFER_COUNTER_H +#define BIT_BUFFER_COUNTER_H + +#include "ml-private.h" + +namespace ml { + +class BitBufferCounter { +public: + BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {} + + std::vector<bool> getBuffer() const; + + void insert(bool Bit); + + void print(std::ostream &OS) const; + + bool isFilled() const { + return N >= V.size(); + } + + size_t numSetBits() const { + return NumSetBits; + } + +private: + inline size_t size() const { + return N < V.size() ? N : V.size(); + } + + inline size_t start() const { + if (N <= V.size()) + return 0; + + return N % V.size(); + } + +private: + std::vector<bool> V; + size_t NumSetBits; + + size_t N; +}; + +} // namespace ml + +inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) { + BBC.print(OS); + return OS; +} + +#endif /* BIT_BUFFER_COUNTER_H */ diff --git a/ml/BitRateWindow.cc b/ml/BitRateWindow.cc new file mode 100644 index 000000000..c4c994c42 --- /dev/null +++ b/ml/BitRateWindow.cc @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "BitRateWindow.h" + +using namespace ml; + +std::pair<BitRateWindow::Edge, size_t> BitRateWindow::insert(bool Bit) { + Edge E; + + BBC.insert(Bit); + switch (CurrState) { + case State::NotFilled: { + if (BBC.isFilled()) { + if (BBC.numSetBits() < SetBitsThreshold) { + CurrState = State::BelowThreshold; + } else { + CurrState = State::AboveThreshold; + } + } else { + CurrState = State::NotFilled; + } + + E = {State::NotFilled, CurrState}; + break; + } case State::BelowThreshold: { + if (BBC.numSetBits() >= SetBitsThreshold) { + CurrState = State::AboveThreshold; + } + + E = {State::BelowThreshold, CurrState}; + break; + } case State::AboveThreshold: { + if ((BBC.numSetBits() < SetBitsThreshold) || + (CurrLength == MaxLength)) { + CurrState = State::Idle; + } + + E = {State::AboveThreshold, CurrState}; + break; + } case State::Idle: { + if (CurrLength == IdleLength) { + CurrState = State::NotFilled; + } + + E = {State::Idle, CurrState}; + break; + } + } + + Action A = EdgeActions[E]; + size_t L = (this->*A)(E.first, Bit); + return {E, L}; +} + +void BitRateWindow::print(std::ostream &OS) const { + switch (CurrState) { + case State::NotFilled: + OS << "NotFilled"; + break; + case State::BelowThreshold: + OS << "BelowThreshold"; + break; + case State::AboveThreshold: + OS << "AboveThreshold"; + break; + case State::Idle: + OS << "Idle"; + break; + default: + OS << "UnknownState"; + break; + } + + OS << ": " << BBC << " (Current Length: " << CurrLength << ")"; +} diff --git a/ml/BitRateWindow.h b/ml/BitRateWindow.h new file mode 100644 index 000000000..0d99008b8 --- /dev/null +++ b/ml/BitRateWindow.h @@ -0,0 +1,170 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef BIT_RATE_WINDOW_H +#define BIT_RATE_WINDOW_H + +#include "BitBufferCounter.h" +#include "ml-private.h" + +namespace ml { + +class BitRateWindow { +public: + enum class State { + NotFilled, + BelowThreshold, + AboveThreshold, + Idle + }; + + using Edge = std::pair<State, State>; + using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit); + +private: + std::map<Edge, Action> EdgeActions = { + // From == To + { + Edge(State::NotFilled, State::NotFilled), + &BitRateWindow::onRoundtripNotFilled, + }, + { + Edge(State::BelowThreshold, State::BelowThreshold), + &BitRateWindow::onRoundtripBelowThreshold, + }, + { + Edge(State::AboveThreshold, State::AboveThreshold), + &BitRateWindow::onRoundtripAboveThreshold, + }, + { + Edge(State::Idle, State::Idle), + &BitRateWindow::onRoundtripIdle, + }, + + + // NotFilled => {BelowThreshold, AboveThreshold} + { + Edge(State::NotFilled, State::BelowThreshold), + &BitRateWindow::onNotFilledToBelowThreshold + }, + { + Edge(State::NotFilled, State::AboveThreshold), + &BitRateWindow::onNotFilledToAboveThreshold + }, + + // BelowThreshold => AboveThreshold + { + Edge(State::BelowThreshold, State::AboveThreshold), + &BitRateWindow::onBelowToAboveThreshold + }, + + // AboveThreshold => Idle + { + Edge(State::AboveThreshold, State::Idle), + &BitRateWindow::onAboveThresholdToIdle + }, + + // Idle => NotFilled + { + Edge(State::Idle, State::NotFilled), + &BitRateWindow::onIdleToNotFilled + }, + }; + +public: + BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength, + size_t SetBitsThreshold) : + MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength), + SetBitsThreshold(SetBitsThreshold), + CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {} + + std::pair<Edge, size_t> insert(bool Bit); + + void print(std::ostream &OS) const; + +private: + size_t onRoundtripNotFilled(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength += 1; + return CurrLength; + } + + size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength = MinLength; + return CurrLength; + } + + size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength += 1; + return CurrLength; + } + + size_t onRoundtripIdle(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength += 1; + return CurrLength; + } + + size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength = MinLength; + return CurrLength; + } + + size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength += 1; + return CurrLength; + } + + size_t onBelowToAboveThreshold(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + CurrLength = MinLength; + return CurrLength; + } + + size_t onAboveThresholdToIdle(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + size_t PrevLength = CurrLength; + CurrLength = 1; + return PrevLength; + } + + size_t onIdleToNotFilled(State PrevState, bool NewBit) { + (void) PrevState, (void) NewBit; + + BBC = BitBufferCounter(MinLength); + BBC.insert(NewBit); + + CurrLength = 1; + return CurrLength; + } + +private: + size_t MinLength; + size_t MaxLength; + size_t IdleLength; + size_t SetBitsThreshold; + + State CurrState; + size_t CurrLength; + BitBufferCounter BBC; +}; + +} // namespace ml + +inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) { + BRW.print(OS); + return OS; +} + +#endif /* BIT_RATE_WINDOW_H */ diff --git a/ml/Config.cc b/ml/Config.cc new file mode 100644 index 000000000..f48f9b39f --- /dev/null +++ b/ml/Config.cc @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "Config.h" +#include "ml-private.h" + +using namespace ml; + +/* + * Global configuration instance to be shared between training and + * prediction threads. + */ +Config ml::Cfg; + +template <typename T> +static T clamp(const T& Value, const T& Min, const T& Max) { + return std::max(Min, std::min(Value, Max)); +} + +/* + * Initialize global configuration variable. + */ +void Config::readMLConfig(void) { + const char *ConfigSectionML = CONFIG_SECTION_ML; + + bool EnableAnomalyDetection = config_get_boolean(ConfigSectionML, "enabled", false); + + /* + * Read values + */ + + unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600); + unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 3600); + unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600); + + unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1); + unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3); + unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5); + + unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000); + + double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99); + double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01); + + double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30); + double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600); + double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30); + double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25); + double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05); + + std::string HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*"); + std::string ChartsToSkip = config_get(ConfigSectionML, "charts to skip from training", + "!system.* !cpu.* !mem.* !disk.* !disk_* " + "!ip.* !ipv4.* !ipv6.* !net.* !net_* !netfilter.* " + "!services.* !apps.* !groups.* !user.* !ebpf.* !netdata.* *"); + + std::stringstream SS; + SS << netdata_configured_cache_dir << "/anomaly-detection.db"; + Cfg.AnomalyDBPath = SS.str(); + + /* + * Clamp + */ + + MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 6 * 3600u); + MinTrainSamples = clamp(MinTrainSamples, 1 * 3600u, 6 * 3600u); + TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u); + + DiffN = clamp(DiffN, 0u, 1u); + SmoothN = clamp(SmoothN, 0u, 5u); + LagN = clamp(LagN, 0u, 5u); + + MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u); + + DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00); + HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0); + + ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0); + ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0); + ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0); + ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99); + ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99); + + /* + * Validate + */ + + if (MinTrainSamples >= MaxTrainSamples) { + error("invalid min/max train samples found (%u >= %u)", MinTrainSamples, MaxTrainSamples); + + MinTrainSamples = 1 * 3600; + MaxTrainSamples = 4 * 3600; + } + + if (ADMinWindowSize >= ADMaxWindowSize) { + error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize); + + ADMinWindowSize = 30.0; + ADMaxWindowSize = 600.0; + } + + /* + * Assign to config instance + */ + + Cfg.EnableAnomalyDetection = EnableAnomalyDetection; + + Cfg.MaxTrainSamples = MaxTrainSamples; + Cfg.MinTrainSamples = MinTrainSamples; + Cfg.TrainEvery = TrainEvery; + + Cfg.DiffN = DiffN; + Cfg.SmoothN = SmoothN; + Cfg.LagN = LagN; + + Cfg.MaxKMeansIters = MaxKMeansIters; + + Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold; + Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; + + Cfg.ADMinWindowSize = ADMinWindowSize; + Cfg.ADMaxWindowSize = ADMaxWindowSize; + Cfg.ADIdleWindowSize = ADIdleWindowSize; + Cfg.ADWindowRateThreshold = ADWindowRateThreshold; + Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold; + + Cfg.SP_HostsToSkip = simple_pattern_create(HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); + Cfg.SP_ChartsToSkip = simple_pattern_create(ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); +} diff --git a/ml/Config.h b/ml/Config.h new file mode 100644 index 000000000..f29bae3a6 --- /dev/null +++ b/ml/Config.h @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_CONFIG_H +#define ML_CONFIG_H + +#include "ml-private.h" + +namespace ml { + +class Config { +public: + bool EnableAnomalyDetection; + + unsigned MaxTrainSamples; + unsigned MinTrainSamples; + unsigned TrainEvery; + + unsigned DiffN; + unsigned SmoothN; + unsigned LagN; + + unsigned MaxKMeansIters; + + double DimensionAnomalyScoreThreshold; + double HostAnomalyRateThreshold; + + double ADMinWindowSize; + double ADMaxWindowSize; + double ADIdleWindowSize; + double ADWindowRateThreshold; + double ADDimensionRateThreshold; + + SIMPLE_PATTERN *SP_HostsToSkip; + SIMPLE_PATTERN *SP_ChartsToSkip; + + std::string AnomalyDBPath; + + void readMLConfig(); +}; + +extern Config Cfg; + +} // namespace ml + +#endif /* ML_CONFIG_H */ diff --git a/ml/Database.cc b/ml/Database.cc new file mode 100644 index 000000000..06d0cdecb --- /dev/null +++ b/ml/Database.cc @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "Database.h" + +const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE = + "CREATE TABLE IF NOT EXISTS anomaly_events( " + " anomaly_detector_name text NOT NULL, " + " anomaly_detector_version int NOT NULL, " + " host_id text NOT NULL, " + " after int NOT NULL, " + " before int NOT NULL, " + " anomaly_event_info text, " + " PRIMARY KEY( " + " anomaly_detector_name, anomaly_detector_version, " + " host_id, after, before " + " ) " + ");"; + +const char *ml::Database::SQL_INSERT_ANOMALY = + "INSERT INTO anomaly_events( " + " anomaly_detector_name, anomaly_detector_version, " + " host_id, after, before, anomaly_event_info) " + "VALUES (?1, ?2, ?3, ?4, ?5, ?6);"; + +const char *ml::Database::SQL_SELECT_ANOMALY = + "SELECT anomaly_event_info FROM anomaly_events WHERE" + " anomaly_detector_name == ?1 AND" + " anomaly_detector_version == ?2 AND" + " host_id == ?3 AND" + " after == ?4 AND" + " before == ?5;"; + +const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS = + "SELECT after, before FROM anomaly_events WHERE" + " anomaly_detector_name == ?1 AND" + " anomaly_detector_version == ?2 AND" + " host_id == ?3 AND" + " after >= ?4 AND" + " before <= ?5;"; + +using namespace ml; + +bool Statement::prepare(sqlite3 *Conn) { + if (!Conn) + return false; + + if (ParsedStmt) + return true; + + int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr); + if (RC == SQLITE_OK) + return true; + + std::string Msg = "Statement \"%s\" preparation failed due to \"%s\""; + error(Msg.c_str(), RawStmt, sqlite3_errstr(RC)); + + return false; +} + +bool Statement::bindValue(size_t Pos, const std::string &Value) { + int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT); + if (RC == SQLITE_OK) + return true; + + error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt); + return false; +} + +bool Statement::bindValue(size_t Pos, const int Value) { + int RC = sqlite3_bind_int(ParsedStmt, Pos, Value); + if (RC == SQLITE_OK) + return true; + + error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt); + return false; +} + +bool Statement::resetAndClear(bool Ret) { + int RC = sqlite3_reset(ParsedStmt); + if (RC != SQLITE_OK) { + error("Could not reset statement: '%s'", RawStmt); + return false; + } + + RC = sqlite3_clear_bindings(ParsedStmt); + if (RC != SQLITE_OK) { + error("Could not clear bindings in statement: '%s'", RawStmt); + return false; + } + + return Ret; +} + +Database::Database(const std::string &Path) { + // Get sqlite3 connection handle. + int RC = sqlite3_open(Path.c_str(), &Conn); + if (RC != SQLITE_OK) { + std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\""; + error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC)); + + sqlite3_close(Conn); + Conn = nullptr; + return; + } + + // Create anomaly events table if it does not exist. + char *ErrMsg; + RC = sqlite3_exec(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg); + if (RC == SQLITE_OK) + return; + + error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg); + error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE); + + sqlite3_free(ErrMsg); + sqlite3_close(Conn); + Conn = nullptr; +} + +Database::~Database() { + if (!Conn) + return; + + int RC = sqlite3_close(Conn); + if (RC != SQLITE_OK) + error("Could not close connection properly (rc=%d)", RC); +} diff --git a/ml/Database.h b/ml/Database.h new file mode 100644 index 000000000..cc7b75872 --- /dev/null +++ b/ml/Database.h @@ -0,0 +1,131 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_DATABASE_H +#define ML_DATABASE_H + +#include "Dimension.h" +#include "ml-private.h" + +#include "json/single_include/nlohmann/json.hpp" + +namespace ml { + +class Statement { +public: + using RowCallback = std::function<void(sqlite3_stmt *Stmt)>; + +public: + Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {} + + template<typename ...ArgTypes> + bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) { + if (!prepare(Conn)) + return false; + + switch (bind(1, Args...)) { + case 0: + return false; + case sizeof...(Args): + break; + default: + return resetAndClear(false); + } + + while (true) { + switch (int RC = sqlite3_step(ParsedStmt)) { + case SQLITE_BUSY: case SQLITE_LOCKED: + usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); + continue; + case SQLITE_ROW: + RowCb(ParsedStmt); + continue; + case SQLITE_DONE: + return resetAndClear(true); + default: + error("Stepping through '%s' returned rc=%d", RawStmt, RC); + return resetAndClear(false); + } + } + } + + ~Statement() { + if (!ParsedStmt) + return; + + int RC = sqlite3_finalize(ParsedStmt); + if (RC != SQLITE_OK) + error("Could not properly finalize statement (rc=%d)", RC); + } + +private: + bool prepare(sqlite3 *Conn); + + bool bindValue(size_t Pos, const int Value); + bool bindValue(size_t Pos, const std::string &Value); + + template<typename ArgType, typename ...ArgTypes> + size_t bind(size_t Pos, ArgType T) { + return bindValue(Pos, T); + } + + template<typename ArgType, typename ...ArgTypes> + size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) { + return bindValue(Pos, T) + bind(Pos + 1, Args...); + } + + bool resetAndClear(bool Ret); + +private: + const char *RawStmt; + sqlite3_stmt *ParsedStmt; +}; + +class Database { +private: + static const char *SQL_CREATE_ANOMALIES_TABLE; + static const char *SQL_INSERT_ANOMALY; + static const char *SQL_SELECT_ANOMALY; + static const char *SQL_SELECT_ANOMALY_EVENTS; + +public: + Database(const std::string &Path); + + ~Database(); + + template<typename ...ArgTypes> + bool insertAnomaly(ArgTypes... Args) { + Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; }; + return InsertAnomalyStmt.exec(Conn, RowCb, Args...); + } + + template<typename ...ArgTypes> + bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) { + Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { + const char *Text = static_cast<const char *>(sqlite3_column_blob(Stmt, 0)); + Json = nlohmann::json::parse(Text); + }; + return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...); + } + + template<typename ...ArgTypes> + bool getAnomaliesInRange(std::vector<std::pair<time_t, time_t>> &V, ArgTypes&&... Args) { + Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { + V.push_back({ + sqlite3_column_int64(Stmt, 0), + sqlite3_column_int64(Stmt, 1) + }); + }; + return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...); + } + +private: + sqlite3 *Conn; + + Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY}; + Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY}; + Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS}; +}; + +} + +#endif /* ML_DATABASE_H */ diff --git a/ml/Dimension.cc b/ml/Dimension.cc new file mode 100644 index 000000000..c27f30bb4 --- /dev/null +++ b/ml/Dimension.cc @@ -0,0 +1,169 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "Config.h" +#include "Dimension.h" +#include "Query.h" + +using namespace ml; + +/* + * Copy of the unpack_storage_number which allows us to convert + * a storage_number to double. + */ +static CalculatedNumber unpack_storage_number_dbl(storage_number value) { + if(!value) + return 0; + + int sign = 0, exp = 0; + int factor = 10; + + // bit 32 = 0:positive, 1:negative + if(unlikely(value & (1 << 31))) + sign = 1; + + // bit 31 = 0:divide, 1:multiply + if(unlikely(value & (1 << 30))) + exp = 1; + + // bit 27 SN_EXISTS_100 + if(unlikely(value & (1 << 26))) + factor = 100; + + // bit 26 SN_EXISTS_RESET + // bit 25 SN_ANOMALY_BIT + + // bit 30, 29, 28 = (multiplier or divider) 0-7 (8 total) + int mul = (value & ((1<<29)|(1<<28)|(1<<27))) >> 27; + + // bit 24 to bit 1 = the value, so remove all other bits + value ^= value & ((1<<31)|(1<<30)|(1<<29)|(1<<28)|(1<<27)|(1<<26)|(1<<25)|(1<<24)); + + CalculatedNumber CN = value; + + if(exp) { + for(; mul; mul--) + CN *= factor; + } + else { + for( ; mul ; mul--) + CN /= 10; + } + + if(sign) + CN = -CN; + + return CN; +} + +std::pair<CalculatedNumber *, size_t> +TrainableDimension::getCalculatedNumbers() { + size_t MinN = Cfg.MinTrainSamples; + size_t MaxN = Cfg.MaxTrainSamples; + + // Figure out what our time window should be. + time_t BeforeT = now_realtime_sec() - 1; + time_t AfterT = BeforeT - (MaxN * updateEvery()); + + BeforeT -= (BeforeT % updateEvery()); + AfterT -= (AfterT % updateEvery()); + + BeforeT = std::min(BeforeT, latestTime()); + AfterT = std::max(AfterT, oldestTime()); + + if (AfterT >= BeforeT) + return { nullptr, 0 }; + + CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)](); + + // Start the query. + unsigned Idx = 0; + unsigned CollectedValues = 0; + unsigned TotalValues = 0; + + CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN(); + Query Q = Query(getRD()); + + Q.init(AfterT, BeforeT); + while (!Q.isFinished()) { + if (Idx == MaxN) + break; + + auto P = Q.nextMetric(); + storage_number SN = P.second; + + if (does_storage_number_exist(SN)) { + CNs[Idx] = unpack_storage_number_dbl(SN); + LastValue = CNs[Idx]; + CollectedValues++; + } else + CNs[Idx] = LastValue; + + Idx++; + } + TotalValues = Idx; + + if (CollectedValues < MinN) { + delete[] CNs; + return { nullptr, 0 }; + } + + // Find first non-NaN value. + for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TotalValues--) { } + + // Overwrite NaN values. + if (Idx != 0) + memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TotalValues); + + return { CNs, TotalValues }; +} + +MLResult TrainableDimension::trainModel() { + auto P = getCalculatedNumbers(); + CalculatedNumber *CNs = P.first; + unsigned N = P.second; + + if (!CNs) + return MLResult::MissingData; + + SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN); + KM.train(SB, Cfg.MaxKMeansIters); + Trained = true; + + delete[] CNs; + return MLResult::Success; +} + +void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) { + if (!Exists) { + CNs.clear(); + return; + } + + unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; + if (CNs.size() < N) { + CNs.push_back(Value); + return; + } + + std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs)); + CNs[N - 1] = Value; +} + +std::pair<MLResult, bool> PredictableDimension::predict() { + unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; + if (CNs.size() != N) + return { MLResult::MissingData, AnomalyBit }; + + CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)](); + std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber)); + + SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN); + AnomalyScore = computeAnomalyScore(SB); + delete[] TmpCNs; + + if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) + return { MLResult::NaN, AnomalyBit }; + + AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold); + return { MLResult::Success, AnomalyBit }; +} diff --git a/ml/Dimension.h b/ml/Dimension.h new file mode 100644 index 000000000..fdf923ccc --- /dev/null +++ b/ml/Dimension.h @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_DIMENSION_H +#define ML_DIMENSION_H + +#include "BitBufferCounter.h" +#include "Config.h" + +#include "ml-private.h" + +namespace ml { + +class RrdDimension { +public: + RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->state->query_ops) { + std::stringstream SS; + SS << RD->rrdset->id << "|" << RD->name; + ID = SS.str(); + } + + RRDDIM *getRD() const { return RD; } + + time_t latestTime() { return Ops->latest_time(RD); } + + time_t oldestTime() { return Ops->oldest_time(RD); } + + unsigned updateEvery() const { return RD->update_every; } + + const std::string getID() const { return ID; } + + virtual ~RrdDimension() {} + +private: + RRDDIM *RD; + struct rrddim_volatile::rrddim_query_ops *Ops; + + std::string ID; +}; + +enum class MLResult { + Success = 0, + MissingData, + NaN, +}; + +class TrainableDimension : public RrdDimension { +public: + TrainableDimension(RRDDIM *RD) : + RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {} + + MLResult trainModel(); + + CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) { + return Trained ? KM.anomalyScore(SB) : 0.0; + } + + bool shouldTrain(const TimePoint &TP) const { + return (LastTrainedAt + TrainEvery) < TP; + } + + bool isTrained() const { return Trained; } + + double updateTrainingDuration(double Duration) { + return TrainingDuration.exchange(Duration); + } + +private: + std::pair<CalculatedNumber *, size_t> getCalculatedNumbers(); + +public: + TimePoint LastTrainedAt{Seconds{0}}; + +private: + Seconds TrainEvery; + KMeans KM; + + std::atomic<bool> Trained{false}; + std::atomic<double> TrainingDuration{0.0}; +}; + +class PredictableDimension : public TrainableDimension { +public: + PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {} + + std::pair<MLResult, bool> predict(); + + void addValue(CalculatedNumber Value, bool Exists); + + bool isAnomalous() { return AnomalyBit; } + +private: + CalculatedNumber AnomalyScore{0.0}; + std::atomic<bool> AnomalyBit{false}; + + std::vector<CalculatedNumber> CNs; +}; + +class DetectableDimension : public PredictableDimension { +public: + DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {} + + std::pair<bool, double> detect(size_t WindowLength, bool Reset) { + bool AnomalyBit = isAnomalous(); + + if (Reset) + NumSetBits = BBC.numSetBits(); + + NumSetBits += AnomalyBit; + BBC.insert(AnomalyBit); + + double AnomalyRate = static_cast<double>(NumSetBits) / WindowLength; + return { AnomalyBit, AnomalyRate }; + } + +private: + BitBufferCounter BBC{static_cast<size_t>(Cfg.ADMinWindowSize)}; + size_t NumSetBits{0}; +}; + +using Dimension = DetectableDimension; + +} // namespace ml + +#endif /* ML_DIMENSION_H */ diff --git a/ml/Host.cc b/ml/Host.cc new file mode 100644 index 000000000..d26ff2ae4 --- /dev/null +++ b/ml/Host.cc @@ -0,0 +1,458 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include <dlib/statistics.h> + +#include "Config.h" +#include "Host.h" + +#include "json/single_include/nlohmann/json.hpp" + +using namespace ml; + +static void updateDimensionsChart(RRDHOST *RH, + collected_number NumTrainedDimensions, + collected_number NumNormalDimensions, + collected_number NumAnomalousDimensions) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *NumTotalDimensionsRD = nullptr; + static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr; + static thread_local RRDDIM *NumNormalDimensionsRD = nullptr; + static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "dimensions", // id + NULL, // name + "dimensions", // family + NULL, // ctx + "Anomaly detection dimensions", // title + "dimensions", // units + "netdata", // plugin + "ml", // module + 39183, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions); + rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions); + rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions); + rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions); + + rrdset_done(RS); +} + +static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *AnomalyRateRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "anomaly_rate", // id + NULL, // name + "anomaly_rate", // family + NULL, // ctx + "Percentage of anomalous dimensions", // title + "percentage", // units + "netdata", // plugin + "ml", // module + 39184, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL, + 1, 100, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate); + + rrdset_done(RS); +} + +static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *WindowLengthRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "detector_window", // id + NULL, // name + "detector_window", // family + NULL, // ctx + "Anomaly detector window length", // title + "seconds", // units + "netdata", // plugin + "ml", // module + 39185, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + WindowLengthRD = rrddim_add(RS, "duration", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every); + rrdset_done(RS); +} + +static void updateEventsChart(RRDHOST *RH, + std::pair<BitRateWindow::Edge, size_t> P, + bool ResetBitCounter, + bool NewAnomalyEvent) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *AboveThresholdRD = nullptr; + static thread_local RRDDIM *ResetBitCounterRD = nullptr; + static thread_local RRDDIM *NewAnomalyEventRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "detector_events", // id + NULL, // name + "detector_events", // family + NULL, // ctx + "Anomaly events triggered", // title + "boolean", // units + "netdata", // plugin + "ml", // module + 39186, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + BitRateWindow::Edge E = P.first; + bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold; + + rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold); + rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter); + rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent); + + rrdset_done(RS); +} + +static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *PredictiobDurationRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "prediction_stats", // id + NULL, // name + "prediction_stats", // family + NULL, // ctx + "Time it took to run prediction", // title + "milliseconds", // units + "netdata", // plugin + "ml", // module + 39187, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + PredictiobDurationRD = rrddim_add(RS, "duration", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration); + + rrdset_done(RS); +} + +static void updateTrainingChart(RRDHOST *RH, + collected_number TotalTrainingDuration, + collected_number MaxTrainingDuration) +{ + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *TotalTrainingDurationRD = nullptr; + static thread_local RRDDIM *MaxTrainingDurationRD = nullptr; + + if (!RS) { + RS = rrdset_create( + RH, // host + "anomaly_detection", // type + "training_stats", // id + NULL, // name + "training_stats", // family + NULL, // ctx + "Training step statistics", // title + "milliseconds", // units + "netdata", // plugin + "ml", // module + 39188, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + + TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration); + rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration); + + rrdset_done(RS); +} + +void RrdHost::addDimension(Dimension *D) { + std::lock_guard<std::mutex> Lock(Mutex); + + DimensionsMap[D->getRD()] = D; + + // Default construct mutex for dimension + LocksMap[D]; +} + +void RrdHost::removeDimension(Dimension *D) { + // Remove the dimension from the hosts map. + { + std::lock_guard<std::mutex> Lock(Mutex); + DimensionsMap.erase(D->getRD()); + } + + // Delete the dimension by locking the mutex that protects it. + { + std::lock_guard<std::mutex> Lock(LocksMap[D]); + delete D; + } + + // Remove the lock entry for the deleted dimension. + { + std::lock_guard<std::mutex> Lock(Mutex); + LocksMap.erase(D); + } +} + +void RrdHost::getConfigAsJson(nlohmann::json &Json) const { + Json["version"] = 1; + + Json["enabled"] = Cfg.EnableAnomalyDetection; + + Json["min-train-samples"] = Cfg.MinTrainSamples; + Json["max-train-samples"] = Cfg.MaxTrainSamples; + Json["train-every"] = Cfg.TrainEvery; + + Json["diff-n"] = Cfg.DiffN; + Json["smooth-n"] = Cfg.SmoothN; + Json["lag-n"] = Cfg.LagN; + + Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; + + Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; + Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; + + Json["min-window-size"] = Cfg.ADMinWindowSize; + Json["max-window-size"] = Cfg.ADMaxWindowSize; + Json["idle-window-size"] = Cfg.ADIdleWindowSize; + Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold; + Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold; +} + +std::pair<Dimension *, Duration<double>> +TrainableHost::findDimensionToTrain(const TimePoint &NowTP) { + std::lock_guard<std::mutex> Lock(Mutex); + + Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + if (D->shouldTrain(NowTP)) { + LocksMap[D].lock(); + return { D, AllottedDuration }; + } + } + + return { nullptr, AllottedDuration }; +} + +void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) { + if (D == nullptr) + return; + + D->LastTrainedAt = NowTP + Seconds{D->updateEvery()}; + + TimePoint StartTP = SteadyClock::now(); + D->trainModel(); + Duration<double> Duration = SteadyClock::now() - StartTP; + D->updateTrainingDuration(Duration.count()); + + { + std::lock_guard<std::mutex> Lock(Mutex); + LocksMap[D].unlock(); + } +} + +void TrainableHost::train() { + Duration<double> MaxSleepFor = Seconds{updateEvery()}; + + while (!netdata_exit) { + TimePoint NowTP = SteadyClock::now(); + + auto P = findDimensionToTrain(NowTP); + trainDimension(P.first, NowTP); + + Duration<double> AllottedDuration = P.second; + Duration<double> RealDuration = SteadyClock::now() - NowTP; + + Duration<double> SleepFor; + if (RealDuration >= AllottedDuration) + continue; + + SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor); + std::this_thread::sleep_for(SleepFor); + } +} + +void DetectableHost::detectOnce() { + auto P = BRW.insert(AnomalyRate >= Cfg.HostAnomalyRateThreshold); + BitRateWindow::Edge Edge = P.first; + size_t WindowLength = P.second; + + bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold); + bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) && + (Edge.second == BitRateWindow::State::Idle); + + std::vector<std::pair<double, std::string>> DimsOverThreshold; + + size_t NumAnomalousDimensions = 0; + size_t NumNormalDimensions = 0; + size_t NumTrainedDimensions = 0; + + double TotalTrainingDuration = 0.0; + double MaxTrainingDuration = 0.0; + + { + std::lock_guard<std::mutex> Lock(Mutex); + + DimsOverThreshold.reserve(DimensionsMap.size()); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + auto P = D->detect(WindowLength, ResetBitCounter); + bool IsAnomalous = P.first; + double AnomalyRate = P.second; + + NumTrainedDimensions += D->isTrained(); + + double DimTrainingDuration = D->updateTrainingDuration(0.0); + MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration); + TotalTrainingDuration += DimTrainingDuration; + + if (IsAnomalous) + NumAnomalousDimensions += 1; + + if (NewAnomalyEvent && (AnomalyRate >= Cfg.ADDimensionRateThreshold)) + DimsOverThreshold.push_back({ AnomalyRate, D->getID() }); + } + + if (NumAnomalousDimensions) + AnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size(); + else + AnomalyRate = 0.0; + + NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions; + } + + this->NumAnomalousDimensions = NumAnomalousDimensions; + this->NumNormalDimensions = NumNormalDimensions; + this->NumTrainedDimensions = NumTrainedDimensions; + + updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions); + updateRateChart(getRH(), AnomalyRate * 10000.0); + updateWindowLengthChart(getRH(), WindowLength); + updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent); + updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0); + + if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0)) + return; + + std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end()); + std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end()); + + // Make sure the JSON response won't grow beyond a specific number + // of dimensions. Log an error message if this happens, because it + // most likely means that the user specified a very-low anomaly rate + // threshold. + size_t NumMaxDimsOverThreshold = 2000; + if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) { + error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.", + DimsOverThreshold.size(), NumMaxDimsOverThreshold); + DimsOverThreshold.resize(NumMaxDimsOverThreshold); + } + + nlohmann::json JsonResult = DimsOverThreshold; + + time_t Before = now_realtime_sec(); + time_t After = Before - (WindowLength * updateEvery()); + DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4)); +} + +void DetectableHost::detect() { + std::this_thread::sleep_for(Seconds{10}); + + while (!netdata_exit) { + TimePoint StartTP = SteadyClock::now(); + detectOnce(); + TimePoint EndTP = SteadyClock::now(); + + Duration<double> Dur = EndTP - StartTP; + updateDetectionChart(getRH(), Dur.count() * 1000); + + std::this_thread::sleep_for(Seconds{updateEvery()}); + } +} + +void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const { + Json["anomalous-dimensions"] = NumAnomalousDimensions; + Json["normal-dimensions"] = NumNormalDimensions; + Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions; + Json["trained-dimensions"] = NumTrainedDimensions; +} + +void DetectableHost::startAnomalyDetectionThreads() { + TrainingThread = std::thread(&TrainableHost::train, this); + DetectionThread = std::thread(&DetectableHost::detect, this); +} + +void DetectableHost::stopAnomalyDetectionThreads() { + TrainingThread.join(); + DetectionThread.join(); +} diff --git a/ml/Host.h b/ml/Host.h new file mode 100644 index 000000000..86591d7ae --- /dev/null +++ b/ml/Host.h @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_HOST_H +#define ML_HOST_H + +#include "BitRateWindow.h" +#include "Config.h" +#include "Database.h" +#include "Dimension.h" + +#include "ml-private.h" + +namespace ml { + +class RrdHost { +public: + RrdHost(RRDHOST *RH) : RH(RH) {} + + RRDHOST *getRH() { return RH; } + + unsigned updateEvery() { return RH->rrd_update_every; } + + std::string getUUID() { + char S[UUID_STR_LEN]; + uuid_unparse_lower(RH->host_uuid, S); + return S; + } + + void addDimension(Dimension *D); + void removeDimension(Dimension *D); + + void getConfigAsJson(nlohmann::json &Json) const; + + virtual ~RrdHost() {}; + +protected: + RRDHOST *RH; + + // Protect dimension and lock maps + std::mutex Mutex; + + std::map<RRDDIM *, Dimension *> DimensionsMap; + std::map<Dimension *, std::mutex> LocksMap; +}; + +class TrainableHost : public RrdHost { +public: + TrainableHost(RRDHOST *RH) : RrdHost(RH) {} + + void train(); + +private: + std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP); + void trainDimension(Dimension *D, const TimePoint &NowTP); +}; + +class DetectableHost : public TrainableHost { +public: + DetectableHost(RRDHOST *RH) : TrainableHost(RH) {} + + void startAnomalyDetectionThreads(); + void stopAnomalyDetectionThreads(); + + template<typename ...ArgTypes> + bool getAnomalyInfo(ArgTypes&&... Args) { + return DB.getAnomalyInfo(Args...); + } + + template<typename ...ArgTypes> + bool getAnomaliesInRange(ArgTypes&&... Args) { + return DB.getAnomaliesInRange(Args...); + } + + void getDetectionInfoAsJson(nlohmann::json &Json) const; + +private: + void detect(); + void detectOnce(); + +private: + std::thread TrainingThread; + std::thread DetectionThread; + + BitRateWindow BRW{ + static_cast<size_t>(Cfg.ADMinWindowSize), + static_cast<size_t>(Cfg.ADMaxWindowSize), + static_cast<size_t>(Cfg.ADIdleWindowSize), + static_cast<size_t>(Cfg.ADMinWindowSize * Cfg.ADWindowRateThreshold) + }; + + CalculatedNumber AnomalyRate{0.0}; + + size_t NumAnomalousDimensions{0}; + size_t NumNormalDimensions{0}; + size_t NumTrainedDimensions{0}; + + Database DB{Cfg.AnomalyDBPath}; +}; + +using Host = DetectableHost; + +} // namespace ml + +#endif /* ML_HOST_H */ diff --git a/ml/Makefile.am b/ml/Makefile.am new file mode 100644 index 000000000..27449d659 --- /dev/null +++ b/ml/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +SUBDIRS = \ + kmeans \ + $(NULL) diff --git a/ml/Query.h b/ml/Query.h new file mode 100644 index 000000000..cbaf6c297 --- /dev/null +++ b/ml/Query.h @@ -0,0 +1,49 @@ +#ifndef QUERY_H +#define QUERY_H + +#include "ml-private.h" + +namespace ml { + +class Query { +public: + Query(RRDDIM *RD) : RD(RD) { + Ops = &RD->state->query_ops; + } + + time_t latestTime() { + return Ops->latest_time(RD); + } + + time_t oldestTime() { + return Ops->oldest_time(RD); + } + + void init(time_t AfterT, time_t BeforeT) { + Ops->init(RD, &Handle, AfterT, BeforeT); + } + + bool isFinished() { + return Ops->is_finished(&Handle); + } + + std::pair<time_t, storage_number> nextMetric() { + time_t CurrT; + storage_number SN = Ops->next_metric(&Handle, &CurrT); + return { CurrT, SN }; + } + + ~Query() { + Ops->finalize(&Handle); + } + +private: + RRDDIM *RD; + + struct rrddim_volatile::rrddim_query_ops *Ops; + struct rrddim_query_handle Handle; +}; + +} // namespace ml + +#endif /* QUERY_H */ diff --git a/ml/Tests.cc b/ml/Tests.cc new file mode 100644 index 000000000..7d369d48d --- /dev/null +++ b/ml/Tests.cc @@ -0,0 +1,301 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "BitBufferCounter.h" +#include "BitRateWindow.h" + +#include "gtest/gtest.h" + +using namespace ml; + +TEST(BitBufferCounterTest, Cap_4) { + size_t Capacity = 4; + BitBufferCounter BBC(Capacity); + + // No bits set + EXPECT_EQ(BBC.numSetBits(), 0); + + // All ones + for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) { + BBC.insert(true); + + EXPECT_EQ(BBC.numSetBits(), std::min(Idx + 1, Capacity)); + } + + // All zeroes + for (size_t Idx = 0; Idx != Capacity; Idx++) { + BBC.insert(false); + + if (Idx < Capacity) + EXPECT_EQ(BBC.numSetBits(), Capacity - (Idx + 1)); + else + EXPECT_EQ(BBC.numSetBits(), 0); + } + + // Even ones/zeroes + for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) + BBC.insert(Idx % 2 == 0); + EXPECT_EQ(BBC.numSetBits(), Capacity / 2); +} + +using State = BitRateWindow::State; +using Edge = BitRateWindow::Edge; +using Result = std::pair<Edge, size_t>; + +TEST(BitRateWindowTest, Cycles) { + /* Test the FSM by going through its two cycles: + * 1) NotFilled -> AboveThreshold -> Idle -> NotFilled + * 2) NotFilled -> BelowThreshold -> AboveThreshold -> Idle -> NotFilled + * + * Check the window's length on every new state transition. + */ + + size_t MinLength = 4, MaxLength = 6, IdleLength = 5; + size_t SetBitsThreshold = 3; + + Result R; + BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + /* + * 1st cycle + */ + + // NotFilled -> AboveThreshold + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + // AboveThreshold -> Idle + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); + + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); + EXPECT_EQ(R.second, MaxLength); + + + // Idle -> NotFilled + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); + EXPECT_EQ(R.second, 1); + + // NotFilled -> AboveThreshold + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + /* + * 2nd cycle + */ + + BRW = BitRateWindow(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + // NotFilled -> BelowThreshold + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::BelowThreshold)); + EXPECT_EQ(R.second, MinLength); + + // BelowThreshold -> BelowThreshold: + // Check the state's self loop by adding set bits that will keep the + // bit buffer below the specified threshold. + // + for (size_t Idx = 0; Idx != 2 * MaxLength; Idx++) { + R = BRW.insert(Idx % 2 == 0); + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); + EXPECT_EQ(R.second, MinLength); + } + + // Verify that at the end of the loop the internal bit buffer contains + // "1010". Do so by adding one set bit and checking that we remain below + // the specified threshold. + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); + EXPECT_EQ(R.second, MinLength); + + // BelowThreshold -> AboveThreshold + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + // AboveThreshold -> Idle: + // Do the transition without filling the max window size this time. + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); + EXPECT_EQ(R.second, MinLength); + + // Idle -> NotFilled + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); + EXPECT_EQ(R.second, 1); + + // NotFilled -> AboveThreshold + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled)); + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); +} + +TEST(BitRateWindowTest, ConsecutiveOnes) { + size_t MinLength = 120, MaxLength = 240, IdleLength = 30; + size_t SetBitsThreshold = 30; + + Result R; + BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + for (size_t Idx = 0; Idx != MaxLength; Idx++) + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); + EXPECT_EQ(R.second, MinLength); + + for (size_t Idx = 0; Idx != SetBitsThreshold; Idx++) { + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold)); + R = BRW.insert(true); + } + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + // At this point the window's buffer contains: + // (MinLength - SetBitsThreshold = 90) 0s, followed by + // (SetBitsThreshold = 30) 1s. + // + // To go below the threshold, we need to add (90 + 1) more 0s in the window's + // buffer. At that point, the the window's buffer will contain: + // (SetBitsThreshold = 29) 1s, followed by + // (MinLength - SetBitsThreshold = 91) 0s. + // + // Right before adding the last 0, we expect the window's length to be equal to 210, + // because the bit buffer has gone through these bits: + // (MinLength - SetBitsThreshold = 90) 0s, followed by + // (SetBitsThreshold = 30) 1s, followed by + // (MinLength - SetBitsThreshold = 90) 0s. + + for (size_t Idx = 0; Idx != (MinLength - SetBitsThreshold); Idx++) { + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); + } + EXPECT_EQ(R.second, 2 * MinLength - SetBitsThreshold); + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); + + // Continue with the Idle -> NotFilled edge. + for (size_t Idx = 0; Idx != IdleLength - 1; Idx++) { + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle)); + } + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled)); + EXPECT_EQ(R.second, 1); +} + +TEST(BitRateWindowTest, WithHoles) { + size_t MinLength = 120, MaxLength = 240, IdleLength = 30; + size_t SetBitsThreshold = 30; + + Result R; + BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + for (size_t Idx = 0; Idx != MaxLength; Idx++) + R = BRW.insert(false); + + for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) + R = BRW.insert(true); + for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) + R = BRW.insert(false); + for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) + R = BRW.insert(true); + for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) + R = BRW.insert(false); + for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++) + R = BRW.insert(true); + + EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + // The window's bit buffer contains: + // 70 0s, 10 1s, 10 0s, 10 1s, 10 0s, 10 1s. + // Where: 70 = MinLength - (5 / 3) * SetBitsThresholds, ie. we need + // to add (70 + 1) more zeros to make the bit buffer go below the + // threshold and then the window's length should be: + // 70 + 50 + 70 = 190. + + BitRateWindow::Edge E; + do { + R = BRW.insert(false); + E = R.first; + } while (E.first != State::AboveThreshold || E.second != State::Idle); + EXPECT_EQ(R.second, 2 * MinLength - (5 * SetBitsThreshold) / 3); +} + +TEST(BitRateWindowTest, MinWindow) { + size_t MinLength = 120, MaxLength = 240, IdleLength = 30; + size_t SetBitsThreshold = 30; + + Result R; + BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + BRW.insert(true); + BRW.insert(false); + for (size_t Idx = 2; Idx != SetBitsThreshold; Idx++) + BRW.insert(true); + for (size_t Idx = SetBitsThreshold; Idx != MinLength - 1; Idx++) + BRW.insert(false); + + R = BRW.insert(true); + EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold)); + EXPECT_EQ(R.second, MinLength); + + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); +} + +TEST(BitRateWindowTest, MaxWindow) { + size_t MinLength = 100, MaxLength = 200, IdleLength = 30; + size_t SetBitsThreshold = 50; + + Result R; + BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold); + + for (size_t Idx = 0; Idx != MaxLength; Idx++) + R = BRW.insert(Idx % 2 == 0); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold)); + EXPECT_EQ(R.second, MaxLength); + + R = BRW.insert(false); + EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle)); +} diff --git a/ml/kmeans/KMeans.cc b/ml/kmeans/KMeans.cc new file mode 100644 index 000000000..e66c66c16 --- /dev/null +++ b/ml/kmeans/KMeans.cc @@ -0,0 +1,55 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "KMeans.h" +#include <dlib/clustering.h> + +void KMeans::train(SamplesBuffer &SB, size_t MaxIterations) { + std::vector<DSample> Samples = SB.preprocess(); + + MinDist = std::numeric_limits<CalculatedNumber>::max(); + MaxDist = std::numeric_limits<CalculatedNumber>::min(); + + { + std::lock_guard<std::mutex> Lock(Mutex); + + ClusterCenters.clear(); + + dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples); + dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations); + + for (const auto &S : Samples) { + CalculatedNumber MeanDist = 0.0; + + for (const auto &KMCenter : ClusterCenters) + MeanDist += dlib::length(KMCenter - S); + + MeanDist /= NumClusters; + + if (MeanDist < MinDist) + MinDist = MeanDist; + + if (MeanDist > MaxDist) + MaxDist = MeanDist; + } + } +} + +CalculatedNumber KMeans::anomalyScore(SamplesBuffer &SB) { + std::vector<DSample> DSamples = SB.preprocess(); + + std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock); + if (!Lock.try_lock()) + return std::numeric_limits<CalculatedNumber>::quiet_NaN(); + + CalculatedNumber MeanDist = 0.0; + for (const auto &CC: ClusterCenters) + MeanDist += dlib::length(CC - DSamples.back()); + + MeanDist /= NumClusters; + + if (MaxDist == MinDist) + return 0.0; + + CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist)); + return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore; +} diff --git a/ml/kmeans/KMeans.h b/ml/kmeans/KMeans.h new file mode 100644 index 000000000..4ea3b6a89 --- /dev/null +++ b/ml/kmeans/KMeans.h @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef KMEANS_H +#define KMEANS_H + +#include <atomic> +#include <vector> +#include <limits> +#include <mutex> + +#include "SamplesBuffer.h" + +class KMeans { +public: + KMeans(size_t NumClusters = 2) : NumClusters(NumClusters) { + MinDist = std::numeric_limits<CalculatedNumber>::max(); + MaxDist = std::numeric_limits<CalculatedNumber>::min(); + }; + + void train(SamplesBuffer &SB, size_t MaxIterations); + CalculatedNumber anomalyScore(SamplesBuffer &SB); + +private: + size_t NumClusters; + + std::vector<DSample> ClusterCenters; + + CalculatedNumber MinDist; + CalculatedNumber MaxDist; + + std::mutex Mutex; +}; + +#endif /* KMEANS_H */ diff --git a/ml/kmeans/Makefile.am b/ml/kmeans/Makefile.am new file mode 100644 index 000000000..babdcf0df --- /dev/null +++ b/ml/kmeans/Makefile.am @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in diff --git a/ml/kmeans/SamplesBuffer.cc b/ml/kmeans/SamplesBuffer.cc new file mode 100644 index 000000000..f8211fb54 --- /dev/null +++ b/ml/kmeans/SamplesBuffer.cc @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +// +#include "SamplesBuffer.h" + +#include <fstream> +#include <sstream> +#include <string> + +void Sample::print(std::ostream &OS) const { + for (size_t Idx = 0; Idx != NumDims - 1; Idx++) + OS << CNs[Idx] << ", "; + + OS << CNs[NumDims - 1]; +} + +void SamplesBuffer::print(std::ostream &OS) const { + for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0; + Idx != NumSamples; Idx++) { + Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx); + OS << S << std::endl; + } +} + +std::vector<Sample> SamplesBuffer::getPreprocessedSamples() const { + std::vector<Sample> V; + + for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0; + Idx != NumSamples; Idx++) { + Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx); + V.push_back(S); + } + + return V; +} + +void SamplesBuffer::diffSamples() { + // Panda's DataFrame default behaviour is to subtract each element from + // itself. For us `DiffN = 0` means "disable diff-ing" when preprocessing + // the samples buffer. This deviation will make it easier for us to test + // the KMeans implementation. + if (DiffN == 0) + return; + + for (size_t Idx = 0; Idx != (NumSamples - DiffN); Idx++) { + size_t High = (NumSamples - 1) - Idx; + size_t Low = High - DiffN; + + Sample LHS = getSample(High); + Sample RHS = getSample(Low); + + LHS.diff(RHS); + } +} + +void SamplesBuffer::smoothSamples() { + // Holds the mean value of each window + CalculatedNumber *AccCNs = new CalculatedNumber[NumDimsPerSample](); + Sample Acc(AccCNs, NumDimsPerSample); + + // Used to avoid clobbering the accumulator when moving the window + CalculatedNumber *TmpCNs = new CalculatedNumber[NumDimsPerSample](); + Sample Tmp(TmpCNs, NumDimsPerSample); + + CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN; + + // Calculate the value of the 1st window + for (size_t Idx = 0; Idx != std::min(SmoothN, NumSamples); Idx++) { + Tmp.add(getSample(NumSamples - (Idx + 1))); + } + + Acc.add(Tmp); + Acc.scale(Factor); + + // Move the window and update the samples + for (size_t Idx = NumSamples; Idx != (DiffN + SmoothN - 1); Idx--) { + Sample S = getSample(Idx - 1); + + // Tmp <- Next window (if any) + if (Idx >= (SmoothN + 1)) { + Tmp.diff(S); + Tmp.add(getSample(Idx - (SmoothN + 1))); + } + + // S <- Acc + S.copy(Acc); + + // Acc <- Tmp + Acc.copy(Tmp); + Acc.scale(Factor); + } + + delete[] AccCNs; + delete[] TmpCNs; +} + +void SamplesBuffer::lagSamples() { + if (LagN == 0) + return; + + for (size_t Idx = NumSamples; Idx != LagN; Idx--) { + Sample PS = getPreprocessedSample(Idx - 1); + PS.lag(getSample(Idx - 1), LagN); + } +} + +std::vector<DSample> SamplesBuffer::preprocess() { + assert(Preprocessed == false); + + std::vector<DSample> DSamples; + size_t OutN = NumSamples; + + // Diff + if (DiffN >= OutN) + return DSamples; + OutN -= DiffN; + diffSamples(); + + // Smooth + if (SmoothN == 0 || SmoothN > OutN) + return DSamples; + OutN -= (SmoothN - 1); + smoothSamples(); + + // Lag + if (LagN >= OutN) + return DSamples; + OutN -= LagN; + lagSamples(); + + DSamples.reserve(OutN); + Preprocessed = true; + + for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) { + DSample DS; + DS.set_size(NumDimsPerSample * (LagN + 1)); + + const Sample PS = getPreprocessedSample(Idx); + PS.initDSample(DS); + + DSamples.push_back(DS); + } + + return DSamples; +} diff --git a/ml/kmeans/SamplesBuffer.h b/ml/kmeans/SamplesBuffer.h new file mode 100644 index 000000000..fccd216d5 --- /dev/null +++ b/ml/kmeans/SamplesBuffer.h @@ -0,0 +1,140 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef SAMPLES_BUFFER_H +#define SAMPLES_BUFFER_H + +#include <iostream> +#include <vector> + +#include <cassert> +#include <cstdlib> +#include <cstring> + +#include <dlib/matrix.h> + +typedef double CalculatedNumber; +typedef dlib::matrix<CalculatedNumber, 0, 1> DSample; + +class Sample { +public: + Sample(CalculatedNumber *Buf, size_t N) : CNs(Buf), NumDims(N) {} + + void initDSample(DSample &DS) const { + for (size_t Idx = 0; Idx != NumDims; Idx++) + DS(Idx) = CNs[Idx]; + } + + void add(const Sample &RHS) const { + assert(NumDims == RHS.NumDims); + + for (size_t Idx = 0; Idx != NumDims; Idx++) + CNs[Idx] += RHS.CNs[Idx]; + }; + + void diff(const Sample &RHS) const { + assert(NumDims == RHS.NumDims); + + for (size_t Idx = 0; Idx != NumDims; Idx++) + CNs[Idx] -= RHS.CNs[Idx]; + }; + + void copy(const Sample &RHS) const { + assert(NumDims == RHS.NumDims); + + std::memcpy(CNs, RHS.CNs, NumDims * sizeof(CalculatedNumber)); + } + + void scale(CalculatedNumber Factor) { + for (size_t Idx = 0; Idx != NumDims; Idx++) + CNs[Idx] *= Factor; + } + + void lag(const Sample &S, size_t LagN) { + size_t N = S.NumDims; + + for (size_t Idx = 0; Idx != (LagN + 1); Idx++) { + Sample Src(S.CNs - (Idx * N), N); + Sample Dst(CNs + (Idx * N), N); + Dst.copy(Src); + } + } + + const CalculatedNumber *getCalculatedNumbers() const { + return CNs; + }; + + void print(std::ostream &OS) const; + +private: + CalculatedNumber *CNs; + size_t NumDims; +}; + +inline std::ostream& operator<<(std::ostream &OS, const Sample &S) { + S.print(OS); + return OS; +} + +class SamplesBuffer { +public: + SamplesBuffer(CalculatedNumber *CNs, + size_t NumSamples, size_t NumDimsPerSample, + size_t DiffN = 1, size_t SmoothN = 3, size_t LagN = 3) : + CNs(CNs), NumSamples(NumSamples), NumDimsPerSample(NumDimsPerSample), + DiffN(DiffN), SmoothN(SmoothN), LagN(LagN), + BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)), + Preprocessed(false) {}; + + std::vector<DSample> preprocess(); + std::vector<Sample> getPreprocessedSamples() const; + + size_t capacity() const { return NumSamples; } + void print(std::ostream &OS) const; + +private: + size_t getSampleOffset(size_t Index) const { + assert(Index < NumSamples); + return Index * NumDimsPerSample; + } + + size_t getPreprocessedSampleOffset(size_t Index) const { + assert(Index < NumSamples); + return getSampleOffset(Index) * (LagN + 1); + } + + void setSample(size_t Index, const Sample &S) const { + size_t Offset = getSampleOffset(Index); + std::memcpy(&CNs[Offset], S.getCalculatedNumbers(), BytesPerSample); + } + + const Sample getSample(size_t Index) const { + size_t Offset = getSampleOffset(Index); + return Sample(&CNs[Offset], NumDimsPerSample); + }; + + const Sample getPreprocessedSample(size_t Index) const { + size_t Offset = getPreprocessedSampleOffset(Index); + return Sample(&CNs[Offset], NumDimsPerSample * (LagN + 1)); + }; + + void diffSamples(); + void smoothSamples(); + void lagSamples(); + +private: + CalculatedNumber *CNs; + size_t NumSamples; + size_t NumDimsPerSample; + size_t DiffN; + size_t SmoothN; + size_t LagN; + size_t BytesPerSample; + bool Preprocessed; +}; + +inline std::ostream& operator<<(std::ostream& OS, const SamplesBuffer &SB) { + SB.print(OS); + return OS; +} + +#endif /* SAMPLES_BUFFER_H */ diff --git a/ml/kmeans/Tests.cc b/ml/kmeans/Tests.cc new file mode 100644 index 000000000..0cb595945 --- /dev/null +++ b/ml/kmeans/Tests.cc @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "ml/ml-private.h" +#include <gtest/gtest.h> + +/* + * The SamplesBuffer class implements the functionality of the following python + * code: + * >> df = pd.DataFrame(data=samples) + * >> df = df.diff(diff_n).dropna() + * >> df = df.rolling(smooth_n).mean().dropna() + * >> df = pd.concat([df.shift(n) for n in range(lag_n + 1)], axis=1).dropna() + * + * Its correctness has been verified by automatically generating random + * data frames in Python and comparing them with the correspondent preprocessed + * SampleBuffers. + * + * The following tests are meant to catch unintended changes in the SamplesBuffer + * implementation. For development purposes, one should compare changes against + * the aforementioned python code. +*/ + +TEST(SamplesBufferTest, NS_8_NDPS_1_DN_1_SN_3_LN_1) { + size_t NumSamples = 8, NumDimsPerSample = 1; + size_t DiffN = 1, SmoothN = 3, LagN = 3; + + size_t N = NumSamples * NumDimsPerSample * (LagN + 1); + CalculatedNumber *CNs = new CalculatedNumber[N](); + + CNs[0] = 0.7568336679490107; + CNs[1] = 0.4814406581763254; + CNs[2] = 0.40073555156221874; + CNs[3] = 0.5973257298194408; + CNs[4] = 0.5334727814345868; + CNs[5] = 0.2632477193454843; + CNs[6] = 0.2684839023122384; + CNs[7] = 0.851332948637479; + + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + SB.preprocess(); + + std::vector<Sample> Samples = SB.getPreprocessedSamples(); + EXPECT_EQ(Samples.size(), 2); + + Sample S0 = Samples[0]; + const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers(); + Sample S1 = Samples[1]; + const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers(); + + EXPECT_NEAR(S0_CNs[0], -0.109614, 0.001); + EXPECT_NEAR(S0_CNs[1], -0.0458293, 0.001); + EXPECT_NEAR(S0_CNs[2], 0.017344, 0.001); + EXPECT_NEAR(S0_CNs[3], -0.0531693, 0.001); + + EXPECT_NEAR(S1_CNs[0], 0.105953, 0.001); + EXPECT_NEAR(S1_CNs[1], -0.109614, 0.001); + EXPECT_NEAR(S1_CNs[2], -0.0458293, 0.001); + EXPECT_NEAR(S1_CNs[3], 0.017344, 0.001); + + delete[] CNs; +} + +TEST(SamplesBufferTest, NS_8_NDPS_1_DN_2_SN_3_LN_2) { + size_t NumSamples = 8, NumDimsPerSample = 1; + size_t DiffN = 2, SmoothN = 3, LagN = 2; + + size_t N = NumSamples * NumDimsPerSample * (LagN + 1); + CalculatedNumber *CNs = new CalculatedNumber[N](); + + CNs[0] = 0.20511885291342846; + CNs[1] = 0.13151717360306558; + CNs[2] = 0.6017085062423134; + CNs[3] = 0.46256882933941545; + CNs[4] = 0.7887758447877941; + CNs[5] = 0.9237989080034406; + CNs[6] = 0.15552559051428083; + CNs[7] = 0.6309750314597955; + + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + SB.preprocess(); + + std::vector<Sample> Samples = SB.getPreprocessedSamples(); + EXPECT_EQ(Samples.size(), 2); + + Sample S0 = Samples[0]; + const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers(); + Sample S1 = Samples[1]; + const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers(); + + EXPECT_NEAR(S0_CNs[0], 0.005016, 0.001); + EXPECT_NEAR(S0_CNs[1], 0.326450, 0.001); + EXPECT_NEAR(S0_CNs[2], 0.304903, 0.001); + + EXPECT_NEAR(S1_CNs[0], -0.154948, 0.001); + EXPECT_NEAR(S1_CNs[1], 0.005016, 0.001); + EXPECT_NEAR(S1_CNs[2], 0.326450, 0.001); + + delete[] CNs; +} + +TEST(SamplesBufferTest, NS_8_NDPS_3_DN_2_SN_4_LN_1) { + size_t NumSamples = 8, NumDimsPerSample = 3; + size_t DiffN = 2, SmoothN = 4, LagN = 1; + + size_t N = NumSamples * NumDimsPerSample * (LagN + 1); + CalculatedNumber *CNs = new CalculatedNumber[N](); + + CNs[0] = 0.34310900399667765; CNs[1] = 0.14694315994488194; CNs[2] = 0.8246677800938796; + CNs[3] = 0.48249504592307835; CNs[4] = 0.23241087965531182; CNs[5] = 0.9595348555892567; + CNs[6] = 0.44281094035598334; CNs[7] = 0.5143142171362715; CNs[8] = 0.06391303014242555; + CNs[9] = 0.7460491027783901; CNs[10] = 0.43887217459032923; CNs[11] = 0.2814395025355999; + CNs[12] = 0.9231114281214198; CNs[13] = 0.326882401786898; CNs[14] = 0.26747939220376216; + CNs[15] = 0.7787571209969636; CNs[16] =0.5851700001235088; CNs[17] = 0.34410728945321567; + CNs[18] = 0.9394494507088997; CNs[19] =0.17567223681734334; CNs[20] = 0.42732886195446984; + CNs[21] = 0.9460522396152958; CNs[22] =0.23462747016780894; CNs[23] = 0.35983249900892145; + + SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN); + SB.preprocess(); + + std::vector<Sample> Samples = SB.getPreprocessedSamples(); + EXPECT_EQ(Samples.size(), 2); + + Sample S0 = Samples[0]; + const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers(); + Sample S1 = Samples[1]; + const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers(); + + EXPECT_NEAR(S0_CNs[0], 0.198225, 0.001); + EXPECT_NEAR(S0_CNs[1], 0.003529, 0.001); + EXPECT_NEAR(S0_CNs[2], -0.063003, 0.001); + EXPECT_NEAR(S0_CNs[3], 0.219066, 0.001); + EXPECT_NEAR(S0_CNs[4], 0.133175, 0.001); + EXPECT_NEAR(S0_CNs[5], -0.293154, 0.001); + + EXPECT_NEAR(S1_CNs[0], 0.174160, 0.001); + EXPECT_NEAR(S1_CNs[1], -0.135722, 0.001); + EXPECT_NEAR(S1_CNs[2], 0.110452, 0.001); + EXPECT_NEAR(S1_CNs[3], 0.198225, 0.001); + EXPECT_NEAR(S1_CNs[4], 0.003529, 0.001); + EXPECT_NEAR(S1_CNs[5], -0.063003, 0.001); + + delete[] CNs; +} diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c new file mode 100644 index 000000000..795b80c34 --- /dev/null +++ b/ml/ml-dummy.c @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "ml.h" + +#if !defined(ENABLE_ML) + +void ml_init(void) {} + +void ml_new_host(RRDHOST *RH) { (void) RH; } + +void ml_delete_host(RRDHOST *RH) { (void) RH; } + +char *ml_get_host_info(RRDHOST *RH) { (void) RH; } + +void ml_new_dimension(RRDDIM *RD) { (void) RD; } + +void ml_delete_dimension(RRDDIM *RD) { (void) RD; } + +bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { + (void) RD; (void) Value; (void) Exists; + return false; +} + +char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before) { + (void) RH; (void) AnomalyDetectorName; + (void) AnomalyDetectorVersion; (void) After; (void) Before; + return NULL; +} + +char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before) { + (void) RH; (void) AnomalyDetectorName; + (void) AnomalyDetectorVersion; (void) After; (void) Before; + return NULL; +} + +#endif diff --git a/ml/ml-private.h b/ml/ml-private.h new file mode 100644 index 000000000..7b3e00684 --- /dev/null +++ b/ml/ml-private.h @@ -0,0 +1,26 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_PRIVATE_H +#define ML_PRIVATE_H + +#include "kmeans/KMeans.h" +#include "ml/ml.h" + +#include <chrono> +#include <map> +#include <mutex> +#include <sstream> + +namespace ml { + +using SteadyClock = std::chrono::steady_clock; +using TimePoint = std::chrono::time_point<SteadyClock>; + +template<typename T> +using Duration = std::chrono::duration<T>; + +using Seconds = std::chrono::seconds; + +} // namespace ml + +#endif /* ML_PRIVATE_H */ diff --git a/ml/ml.cc b/ml/ml.cc new file mode 100644 index 000000000..857d23d33 --- /dev/null +++ b/ml/ml.cc @@ -0,0 +1,153 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "Config.h" +#include "Dimension.h" +#include "Host.h" + +using namespace ml; + +/* + * Assumptions: + * 1) hosts outlive their sets, and sets outlive their dimensions, + * 2) dimensions always have a set that has a host. + */ + +void ml_init(void) { + Cfg.readMLConfig(); +} + +void ml_new_host(RRDHOST *RH) { + if (!Cfg.EnableAnomalyDetection) + return; + + if (simple_pattern_matches(Cfg.SP_HostsToSkip, RH->hostname)) + return; + + Host *H = new Host(RH); + RH->ml_host = static_cast<ml_host_t>(H); + + H->startAnomalyDetectionThreads(); +} + +void ml_delete_host(RRDHOST *RH) { + Host *H = static_cast<Host *>(RH->ml_host); + if (!H) + return; + + H->stopAnomalyDetectionThreads(); + + delete H; + RH->ml_host = nullptr; +} + +void ml_new_dimension(RRDDIM *RD) { + RRDSET *RS = RD->rrdset; + + Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host); + if (!H) + return; + + if (static_cast<unsigned>(RD->update_every) != H->updateEvery()) + return; + + if (simple_pattern_matches(Cfg.SP_ChartsToSkip, RS->name)) + return; + + Dimension *D = new Dimension(RD); + RD->state->ml_dimension = static_cast<ml_dimension_t>(D); + H->addDimension(D); +} + +void ml_delete_dimension(RRDDIM *RD) { + Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension); + if (!D) + return; + + Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host); + H->removeDimension(D); + + RD->state->ml_dimension = nullptr; +} + +char *ml_get_host_info(RRDHOST *RH) { + nlohmann::json ConfigJson; + + if (RH && RH->ml_host) { + Host *H = static_cast<Host *>(RH->ml_host); + H->getConfigAsJson(ConfigJson); + H->getDetectionInfoAsJson(ConfigJson); + } else { + ConfigJson["enabled"] = false; + } + + return strdup(ConfigJson.dump(2, '\t').c_str()); +} + +bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) { + Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension); + if (!D) + return false; + + D->addValue(Value, Exists); + bool Result = D->predict().second; + return Result; +} + +char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before) { + if (!RH || !RH->ml_host) { + error("No host"); + return nullptr; + } + + Host *H = static_cast<Host *>(RH->ml_host); + std::vector<std::pair<time_t, time_t>> TimeRanges; + + bool Res = H->getAnomaliesInRange(TimeRanges, AnomalyDetectorName, + AnomalyDetectorVersion, + H->getUUID(), + After, Before); + if (!Res) { + error("DB result is empty"); + return nullptr; + } + + nlohmann::json Json = TimeRanges; + return strdup(Json.dump(4).c_str()); +} + +char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before) { + if (!RH || !RH->ml_host) { + error("No host"); + return nullptr; + } + + Host *H = static_cast<Host *>(RH->ml_host); + + nlohmann::json Json; + bool Res = H->getAnomalyInfo(Json, AnomalyDetectorName, + AnomalyDetectorVersion, + H->getUUID(), + After, Before); + if (!Res) { + error("DB result is empty"); + return nullptr; + } + + return strdup(Json.dump(4, '\t').c_str()); +} + +#if defined(ENABLE_ML_TESTS) + +#include "gtest/gtest.h" + +int test_ml(int argc, char *argv[]) { + (void) argc; + (void) argv; + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#endif // ENABLE_ML_TESTS diff --git a/ml/ml.h b/ml/ml.h new file mode 100644 index 000000000..96448453c --- /dev/null +++ b/ml/ml.h @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_ML_H +#define NETDATA_ML_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "daemon/common.h" + +typedef void* ml_host_t; +typedef void* ml_dimension_t; + +void ml_init(void); + +void ml_new_host(RRDHOST *RH); +void ml_delete_host(RRDHOST *RH); + +char *ml_get_host_info(RRDHOST *RH); + +void ml_new_dimension(RRDDIM *RD); +void ml_delete_dimension(RRDDIM *RD); + +bool ml_is_anomalous(RRDDIM *RD, double value, bool exists); + +char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before); + +char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName, + int AnomalyDetectorVersion, time_t After, time_t Before); + +#if defined(ENABLE_ML_TESTS) +int test_ml(int argc, char *argv[]); +#endif + +#ifdef __cplusplus +}; +#endif + +#endif /* NETDATA_ML_H */ |