summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--ml/BitBufferCounter.cc29
-rw-r--r--ml/BitBufferCounter.h54
-rw-r--r--ml/BitRateWindow.cc75
-rw-r--r--ml/BitRateWindow.h170
-rw-r--r--ml/Config.cc128
-rw-r--r--ml/Config.h45
-rw-r--r--ml/Database.cc127
-rw-r--r--ml/Database.h131
-rw-r--r--ml/Dimension.cc169
-rw-r--r--ml/Dimension.h124
-rw-r--r--ml/Host.cc458
-rw-r--r--ml/Host.h104
-rw-r--r--ml/Makefile.am8
-rw-r--r--ml/Query.h49
-rw-r--r--ml/Tests.cc301
-rw-r--r--ml/kmeans/KMeans.cc55
-rw-r--r--ml/kmeans/KMeans.h34
-rw-r--r--ml/kmeans/Makefile.am4
-rw-r--r--ml/kmeans/SamplesBuffer.cc144
-rw-r--r--ml/kmeans/SamplesBuffer.h140
-rw-r--r--ml/kmeans/Tests.cc143
-rw-r--r--ml/ml-dummy.c38
-rw-r--r--ml/ml-private.h26
-rw-r--r--ml/ml.cc153
-rw-r--r--ml/ml.h41
25 files changed, 2750 insertions, 0 deletions
diff --git a/ml/BitBufferCounter.cc b/ml/BitBufferCounter.cc
new file mode 100644
index 000000000..5e1ab5aca
--- /dev/null
+++ b/ml/BitBufferCounter.cc
@@ -0,0 +1,29 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "BitBufferCounter.h"
+
+using namespace ml;
+
+std::vector<bool> BitBufferCounter::getBuffer() const {
+ std::vector<bool> Buffer;
+
+ for (size_t Idx = start(); Idx != (start() + size()); Idx++)
+ Buffer.push_back(V[Idx % V.size()]);
+
+ return Buffer;
+}
+
+void BitBufferCounter::insert(bool Bit) {
+ if (N >= V.size())
+ NumSetBits -= (V[start()] == true);
+
+ NumSetBits += (Bit == true);
+ V[N++ % V.size()] = Bit;
+}
+
+void BitBufferCounter::print(std::ostream &OS) const {
+ std::vector<bool> Buffer = getBuffer();
+
+ for (bool B : Buffer)
+ OS << B;
+}
diff --git a/ml/BitBufferCounter.h b/ml/BitBufferCounter.h
new file mode 100644
index 000000000..db924d776
--- /dev/null
+++ b/ml/BitBufferCounter.h
@@ -0,0 +1,54 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef BIT_BUFFER_COUNTER_H
+#define BIT_BUFFER_COUNTER_H
+
+#include "ml-private.h"
+
+namespace ml {
+
+class BitBufferCounter {
+public:
+ BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {}
+
+ std::vector<bool> getBuffer() const;
+
+ void insert(bool Bit);
+
+ void print(std::ostream &OS) const;
+
+ bool isFilled() const {
+ return N >= V.size();
+ }
+
+ size_t numSetBits() const {
+ return NumSetBits;
+ }
+
+private:
+ inline size_t size() const {
+ return N < V.size() ? N : V.size();
+ }
+
+ inline size_t start() const {
+ if (N <= V.size())
+ return 0;
+
+ return N % V.size();
+ }
+
+private:
+ std::vector<bool> V;
+ size_t NumSetBits;
+
+ size_t N;
+};
+
+} // namespace ml
+
+inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) {
+ BBC.print(OS);
+ return OS;
+}
+
+#endif /* BIT_BUFFER_COUNTER_H */
diff --git a/ml/BitRateWindow.cc b/ml/BitRateWindow.cc
new file mode 100644
index 000000000..c4c994c42
--- /dev/null
+++ b/ml/BitRateWindow.cc
@@ -0,0 +1,75 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "BitRateWindow.h"
+
+using namespace ml;
+
+std::pair<BitRateWindow::Edge, size_t> BitRateWindow::insert(bool Bit) {
+ Edge E;
+
+ BBC.insert(Bit);
+ switch (CurrState) {
+ case State::NotFilled: {
+ if (BBC.isFilled()) {
+ if (BBC.numSetBits() < SetBitsThreshold) {
+ CurrState = State::BelowThreshold;
+ } else {
+ CurrState = State::AboveThreshold;
+ }
+ } else {
+ CurrState = State::NotFilled;
+ }
+
+ E = {State::NotFilled, CurrState};
+ break;
+ } case State::BelowThreshold: {
+ if (BBC.numSetBits() >= SetBitsThreshold) {
+ CurrState = State::AboveThreshold;
+ }
+
+ E = {State::BelowThreshold, CurrState};
+ break;
+ } case State::AboveThreshold: {
+ if ((BBC.numSetBits() < SetBitsThreshold) ||
+ (CurrLength == MaxLength)) {
+ CurrState = State::Idle;
+ }
+
+ E = {State::AboveThreshold, CurrState};
+ break;
+ } case State::Idle: {
+ if (CurrLength == IdleLength) {
+ CurrState = State::NotFilled;
+ }
+
+ E = {State::Idle, CurrState};
+ break;
+ }
+ }
+
+ Action A = EdgeActions[E];
+ size_t L = (this->*A)(E.first, Bit);
+ return {E, L};
+}
+
+void BitRateWindow::print(std::ostream &OS) const {
+ switch (CurrState) {
+ case State::NotFilled:
+ OS << "NotFilled";
+ break;
+ case State::BelowThreshold:
+ OS << "BelowThreshold";
+ break;
+ case State::AboveThreshold:
+ OS << "AboveThreshold";
+ break;
+ case State::Idle:
+ OS << "Idle";
+ break;
+ default:
+ OS << "UnknownState";
+ break;
+ }
+
+ OS << ": " << BBC << " (Current Length: " << CurrLength << ")";
+}
diff --git a/ml/BitRateWindow.h b/ml/BitRateWindow.h
new file mode 100644
index 000000000..0d99008b8
--- /dev/null
+++ b/ml/BitRateWindow.h
@@ -0,0 +1,170 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef BIT_RATE_WINDOW_H
+#define BIT_RATE_WINDOW_H
+
+#include "BitBufferCounter.h"
+#include "ml-private.h"
+
+namespace ml {
+
+class BitRateWindow {
+public:
+ enum class State {
+ NotFilled,
+ BelowThreshold,
+ AboveThreshold,
+ Idle
+ };
+
+ using Edge = std::pair<State, State>;
+ using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit);
+
+private:
+ std::map<Edge, Action> EdgeActions = {
+ // From == To
+ {
+ Edge(State::NotFilled, State::NotFilled),
+ &BitRateWindow::onRoundtripNotFilled,
+ },
+ {
+ Edge(State::BelowThreshold, State::BelowThreshold),
+ &BitRateWindow::onRoundtripBelowThreshold,
+ },
+ {
+ Edge(State::AboveThreshold, State::AboveThreshold),
+ &BitRateWindow::onRoundtripAboveThreshold,
+ },
+ {
+ Edge(State::Idle, State::Idle),
+ &BitRateWindow::onRoundtripIdle,
+ },
+
+
+ // NotFilled => {BelowThreshold, AboveThreshold}
+ {
+ Edge(State::NotFilled, State::BelowThreshold),
+ &BitRateWindow::onNotFilledToBelowThreshold
+ },
+ {
+ Edge(State::NotFilled, State::AboveThreshold),
+ &BitRateWindow::onNotFilledToAboveThreshold
+ },
+
+ // BelowThreshold => AboveThreshold
+ {
+ Edge(State::BelowThreshold, State::AboveThreshold),
+ &BitRateWindow::onBelowToAboveThreshold
+ },
+
+ // AboveThreshold => Idle
+ {
+ Edge(State::AboveThreshold, State::Idle),
+ &BitRateWindow::onAboveThresholdToIdle
+ },
+
+ // Idle => NotFilled
+ {
+ Edge(State::Idle, State::NotFilled),
+ &BitRateWindow::onIdleToNotFilled
+ },
+ };
+
+public:
+ BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength,
+ size_t SetBitsThreshold) :
+ MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength),
+ SetBitsThreshold(SetBitsThreshold),
+ CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {}
+
+ std::pair<Edge, size_t> insert(bool Bit);
+
+ void print(std::ostream &OS) const;
+
+private:
+ size_t onRoundtripNotFilled(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength += 1;
+ return CurrLength;
+ }
+
+ size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength = MinLength;
+ return CurrLength;
+ }
+
+ size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength += 1;
+ return CurrLength;
+ }
+
+ size_t onRoundtripIdle(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength += 1;
+ return CurrLength;
+ }
+
+ size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength = MinLength;
+ return CurrLength;
+ }
+
+ size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength += 1;
+ return CurrLength;
+ }
+
+ size_t onBelowToAboveThreshold(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ CurrLength = MinLength;
+ return CurrLength;
+ }
+
+ size_t onAboveThresholdToIdle(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ size_t PrevLength = CurrLength;
+ CurrLength = 1;
+ return PrevLength;
+ }
+
+ size_t onIdleToNotFilled(State PrevState, bool NewBit) {
+ (void) PrevState, (void) NewBit;
+
+ BBC = BitBufferCounter(MinLength);
+ BBC.insert(NewBit);
+
+ CurrLength = 1;
+ return CurrLength;
+ }
+
+private:
+ size_t MinLength;
+ size_t MaxLength;
+ size_t IdleLength;
+ size_t SetBitsThreshold;
+
+ State CurrState;
+ size_t CurrLength;
+ BitBufferCounter BBC;
+};
+
+} // namespace ml
+
+inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) {
+ BRW.print(OS);
+ return OS;
+}
+
+#endif /* BIT_RATE_WINDOW_H */
diff --git a/ml/Config.cc b/ml/Config.cc
new file mode 100644
index 000000000..f48f9b39f
--- /dev/null
+++ b/ml/Config.cc
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "Config.h"
+#include "ml-private.h"
+
+using namespace ml;
+
+/*
+ * Global configuration instance to be shared between training and
+ * prediction threads.
+ */
+Config ml::Cfg;
+
+template <typename T>
+static T clamp(const T& Value, const T& Min, const T& Max) {
+ return std::max(Min, std::min(Value, Max));
+}
+
+/*
+ * Initialize global configuration variable.
+ */
+void Config::readMLConfig(void) {
+ const char *ConfigSectionML = CONFIG_SECTION_ML;
+
+ bool EnableAnomalyDetection = config_get_boolean(ConfigSectionML, "enabled", false);
+
+ /*
+ * Read values
+ */
+
+ unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600);
+ unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 3600);
+ unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600);
+
+ unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1);
+ unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3);
+ unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5);
+
+ unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000);
+
+ double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99);
+ double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01);
+
+ double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30);
+ double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600);
+ double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30);
+ double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25);
+ double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05);
+
+ std::string HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*");
+ std::string ChartsToSkip = config_get(ConfigSectionML, "charts to skip from training",
+ "!system.* !cpu.* !mem.* !disk.* !disk_* "
+ "!ip.* !ipv4.* !ipv6.* !net.* !net_* !netfilter.* "
+ "!services.* !apps.* !groups.* !user.* !ebpf.* !netdata.* *");
+
+ std::stringstream SS;
+ SS << netdata_configured_cache_dir << "/anomaly-detection.db";
+ Cfg.AnomalyDBPath = SS.str();
+
+ /*
+ * Clamp
+ */
+
+ MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 6 * 3600u);
+ MinTrainSamples = clamp(MinTrainSamples, 1 * 3600u, 6 * 3600u);
+ TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u);
+
+ DiffN = clamp(DiffN, 0u, 1u);
+ SmoothN = clamp(SmoothN, 0u, 5u);
+ LagN = clamp(LagN, 0u, 5u);
+
+ MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u);
+
+ DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00);
+ HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0);
+
+ ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0);
+ ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0);
+ ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0);
+ ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99);
+ ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99);
+
+ /*
+ * Validate
+ */
+
+ if (MinTrainSamples >= MaxTrainSamples) {
+ error("invalid min/max train samples found (%u >= %u)", MinTrainSamples, MaxTrainSamples);
+
+ MinTrainSamples = 1 * 3600;
+ MaxTrainSamples = 4 * 3600;
+ }
+
+ if (ADMinWindowSize >= ADMaxWindowSize) {
+ error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize);
+
+ ADMinWindowSize = 30.0;
+ ADMaxWindowSize = 600.0;
+ }
+
+ /*
+ * Assign to config instance
+ */
+
+ Cfg.EnableAnomalyDetection = EnableAnomalyDetection;
+
+ Cfg.MaxTrainSamples = MaxTrainSamples;
+ Cfg.MinTrainSamples = MinTrainSamples;
+ Cfg.TrainEvery = TrainEvery;
+
+ Cfg.DiffN = DiffN;
+ Cfg.SmoothN = SmoothN;
+ Cfg.LagN = LagN;
+
+ Cfg.MaxKMeansIters = MaxKMeansIters;
+
+ Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold;
+ Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold;
+
+ Cfg.ADMinWindowSize = ADMinWindowSize;
+ Cfg.ADMaxWindowSize = ADMaxWindowSize;
+ Cfg.ADIdleWindowSize = ADIdleWindowSize;
+ Cfg.ADWindowRateThreshold = ADWindowRateThreshold;
+ Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold;
+
+ Cfg.SP_HostsToSkip = simple_pattern_create(HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
+ Cfg.SP_ChartsToSkip = simple_pattern_create(ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
+}
diff --git a/ml/Config.h b/ml/Config.h
new file mode 100644
index 000000000..f29bae3a6
--- /dev/null
+++ b/ml/Config.h
@@ -0,0 +1,45 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_CONFIG_H
+#define ML_CONFIG_H
+
+#include "ml-private.h"
+
+namespace ml {
+
+class Config {
+public:
+ bool EnableAnomalyDetection;
+
+ unsigned MaxTrainSamples;
+ unsigned MinTrainSamples;
+ unsigned TrainEvery;
+
+ unsigned DiffN;
+ unsigned SmoothN;
+ unsigned LagN;
+
+ unsigned MaxKMeansIters;
+
+ double DimensionAnomalyScoreThreshold;
+ double HostAnomalyRateThreshold;
+
+ double ADMinWindowSize;
+ double ADMaxWindowSize;
+ double ADIdleWindowSize;
+ double ADWindowRateThreshold;
+ double ADDimensionRateThreshold;
+
+ SIMPLE_PATTERN *SP_HostsToSkip;
+ SIMPLE_PATTERN *SP_ChartsToSkip;
+
+ std::string AnomalyDBPath;
+
+ void readMLConfig();
+};
+
+extern Config Cfg;
+
+} // namespace ml
+
+#endif /* ML_CONFIG_H */
diff --git a/ml/Database.cc b/ml/Database.cc
new file mode 100644
index 000000000..06d0cdecb
--- /dev/null
+++ b/ml/Database.cc
@@ -0,0 +1,127 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "Database.h"
+
+const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE =
+ "CREATE TABLE IF NOT EXISTS anomaly_events( "
+ " anomaly_detector_name text NOT NULL, "
+ " anomaly_detector_version int NOT NULL, "
+ " host_id text NOT NULL, "
+ " after int NOT NULL, "
+ " before int NOT NULL, "
+ " anomaly_event_info text, "
+ " PRIMARY KEY( "
+ " anomaly_detector_name, anomaly_detector_version, "
+ " host_id, after, before "
+ " ) "
+ ");";
+
+const char *ml::Database::SQL_INSERT_ANOMALY =
+ "INSERT INTO anomaly_events( "
+ " anomaly_detector_name, anomaly_detector_version, "
+ " host_id, after, before, anomaly_event_info) "
+ "VALUES (?1, ?2, ?3, ?4, ?5, ?6);";
+
+const char *ml::Database::SQL_SELECT_ANOMALY =
+ "SELECT anomaly_event_info FROM anomaly_events WHERE"
+ " anomaly_detector_name == ?1 AND"
+ " anomaly_detector_version == ?2 AND"
+ " host_id == ?3 AND"
+ " after == ?4 AND"
+ " before == ?5;";
+
+const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS =
+ "SELECT after, before FROM anomaly_events WHERE"
+ " anomaly_detector_name == ?1 AND"
+ " anomaly_detector_version == ?2 AND"
+ " host_id == ?3 AND"
+ " after >= ?4 AND"
+ " before <= ?5;";
+
+using namespace ml;
+
+bool Statement::prepare(sqlite3 *Conn) {
+ if (!Conn)
+ return false;
+
+ if (ParsedStmt)
+ return true;
+
+ int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr);
+ if (RC == SQLITE_OK)
+ return true;
+
+ std::string Msg = "Statement \"%s\" preparation failed due to \"%s\"";
+ error(Msg.c_str(), RawStmt, sqlite3_errstr(RC));
+
+ return false;
+}
+
+bool Statement::bindValue(size_t Pos, const std::string &Value) {
+ int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT);
+ if (RC == SQLITE_OK)
+ return true;
+
+ error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt);
+ return false;
+}
+
+bool Statement::bindValue(size_t Pos, const int Value) {
+ int RC = sqlite3_bind_int(ParsedStmt, Pos, Value);
+ if (RC == SQLITE_OK)
+ return true;
+
+ error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt);
+ return false;
+}
+
+bool Statement::resetAndClear(bool Ret) {
+ int RC = sqlite3_reset(ParsedStmt);
+ if (RC != SQLITE_OK) {
+ error("Could not reset statement: '%s'", RawStmt);
+ return false;
+ }
+
+ RC = sqlite3_clear_bindings(ParsedStmt);
+ if (RC != SQLITE_OK) {
+ error("Could not clear bindings in statement: '%s'", RawStmt);
+ return false;
+ }
+
+ return Ret;
+}
+
+Database::Database(const std::string &Path) {
+ // Get sqlite3 connection handle.
+ int RC = sqlite3_open(Path.c_str(), &Conn);
+ if (RC != SQLITE_OK) {
+ std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\"";
+ error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC));
+
+ sqlite3_close(Conn);
+ Conn = nullptr;
+ return;
+ }
+
+ // Create anomaly events table if it does not exist.
+ char *ErrMsg;
+ RC = sqlite3_exec(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg);
+ if (RC == SQLITE_OK)
+ return;
+
+ error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg);
+ error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE);
+
+ sqlite3_free(ErrMsg);
+ sqlite3_close(Conn);
+ Conn = nullptr;
+}
+
+Database::~Database() {
+ if (!Conn)
+ return;
+
+ int RC = sqlite3_close(Conn);
+ if (RC != SQLITE_OK)
+ error("Could not close connection properly (rc=%d)", RC);
+}
diff --git a/ml/Database.h b/ml/Database.h
new file mode 100644
index 000000000..cc7b75872
--- /dev/null
+++ b/ml/Database.h
@@ -0,0 +1,131 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_DATABASE_H
+#define ML_DATABASE_H
+
+#include "Dimension.h"
+#include "ml-private.h"
+
+#include "json/single_include/nlohmann/json.hpp"
+
+namespace ml {
+
+class Statement {
+public:
+ using RowCallback = std::function<void(sqlite3_stmt *Stmt)>;
+
+public:
+ Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {}
+
+ template<typename ...ArgTypes>
+ bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) {
+ if (!prepare(Conn))
+ return false;
+
+ switch (bind(1, Args...)) {
+ case 0:
+ return false;
+ case sizeof...(Args):
+ break;
+ default:
+ return resetAndClear(false);
+ }
+
+ while (true) {
+ switch (int RC = sqlite3_step(ParsedStmt)) {
+ case SQLITE_BUSY: case SQLITE_LOCKED:
+ usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
+ continue;
+ case SQLITE_ROW:
+ RowCb(ParsedStmt);
+ continue;
+ case SQLITE_DONE:
+ return resetAndClear(true);
+ default:
+ error("Stepping through '%s' returned rc=%d", RawStmt, RC);
+ return resetAndClear(false);
+ }
+ }
+ }
+
+ ~Statement() {
+ if (!ParsedStmt)
+ return;
+
+ int RC = sqlite3_finalize(ParsedStmt);
+ if (RC != SQLITE_OK)
+ error("Could not properly finalize statement (rc=%d)", RC);
+ }
+
+private:
+ bool prepare(sqlite3 *Conn);
+
+ bool bindValue(size_t Pos, const int Value);
+ bool bindValue(size_t Pos, const std::string &Value);
+
+ template<typename ArgType, typename ...ArgTypes>
+ size_t bind(size_t Pos, ArgType T) {
+ return bindValue(Pos, T);
+ }
+
+ template<typename ArgType, typename ...ArgTypes>
+ size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) {
+ return bindValue(Pos, T) + bind(Pos + 1, Args...);
+ }
+
+ bool resetAndClear(bool Ret);
+
+private:
+ const char *RawStmt;
+ sqlite3_stmt *ParsedStmt;
+};
+
+class Database {
+private:
+ static const char *SQL_CREATE_ANOMALIES_TABLE;
+ static const char *SQL_INSERT_ANOMALY;
+ static const char *SQL_SELECT_ANOMALY;
+ static const char *SQL_SELECT_ANOMALY_EVENTS;
+
+public:
+ Database(const std::string &Path);
+
+ ~Database();
+
+ template<typename ...ArgTypes>
+ bool insertAnomaly(ArgTypes... Args) {
+ Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; };
+ return InsertAnomalyStmt.exec(Conn, RowCb, Args...);
+ }
+
+ template<typename ...ArgTypes>
+ bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) {
+ Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
+ const char *Text = static_cast<const char *>(sqlite3_column_blob(Stmt, 0));
+ Json = nlohmann::json::parse(Text);
+ };
+ return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...);
+ }
+
+ template<typename ...ArgTypes>
+ bool getAnomaliesInRange(std::vector<std::pair<time_t, time_t>> &V, ArgTypes&&... Args) {
+ Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
+ V.push_back({
+ sqlite3_column_int64(Stmt, 0),
+ sqlite3_column_int64(Stmt, 1)
+ });
+ };
+ return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...);
+ }
+
+private:
+ sqlite3 *Conn;
+
+ Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY};
+ Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY};
+ Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS};
+};
+
+}
+
+#endif /* ML_DATABASE_H */
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
new file mode 100644
index 000000000..c27f30bb4
--- /dev/null
+++ b/ml/Dimension.cc
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "Config.h"
+#include "Dimension.h"
+#include "Query.h"
+
+using namespace ml;
+
+/*
+ * Copy of the unpack_storage_number which allows us to convert
+ * a storage_number to double.
+ */
+static CalculatedNumber unpack_storage_number_dbl(storage_number value) {
+ if(!value)
+ return 0;
+
+ int sign = 0, exp = 0;
+ int factor = 10;
+
+ // bit 32 = 0:positive, 1:negative
+ if(unlikely(value & (1 << 31)))
+ sign = 1;
+
+ // bit 31 = 0:divide, 1:multiply
+ if(unlikely(value & (1 << 30)))
+ exp = 1;
+
+ // bit 27 SN_EXISTS_100
+ if(unlikely(value & (1 << 26)))
+ factor = 100;
+
+ // bit 26 SN_EXISTS_RESET
+ // bit 25 SN_ANOMALY_BIT
+
+ // bit 30, 29, 28 = (multiplier or divider) 0-7 (8 total)
+ int mul = (value & ((1<<29)|(1<<28)|(1<<27))) >> 27;
+
+ // bit 24 to bit 1 = the value, so remove all other bits
+ value ^= value & ((1<<31)|(1<<30)|(1<<29)|(1<<28)|(1<<27)|(1<<26)|(1<<25)|(1<<24));
+
+ CalculatedNumber CN = value;
+
+ if(exp) {
+ for(; mul; mul--)
+ CN *= factor;
+ }
+ else {
+ for( ; mul ; mul--)
+ CN /= 10;
+ }
+
+ if(sign)
+ CN = -CN;
+
+ return CN;
+}
+
+std::pair<CalculatedNumber *, size_t>
+TrainableDimension::getCalculatedNumbers() {
+ size_t MinN = Cfg.MinTrainSamples;
+ size_t MaxN = Cfg.MaxTrainSamples;
+
+ // Figure out what our time window should be.
+ time_t BeforeT = now_realtime_sec() - 1;
+ time_t AfterT = BeforeT - (MaxN * updateEvery());
+
+ BeforeT -= (BeforeT % updateEvery());
+ AfterT -= (AfterT % updateEvery());
+
+ BeforeT = std::min(BeforeT, latestTime());
+ AfterT = std::max(AfterT, oldestTime());
+
+ if (AfterT >= BeforeT)
+ return { nullptr, 0 };
+
+ CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)]();
+
+ // Start the query.
+ unsigned Idx = 0;
+ unsigned CollectedValues = 0;
+ unsigned TotalValues = 0;
+
+ CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN();
+ Query Q = Query(getRD());
+
+ Q.init(AfterT, BeforeT);
+ while (!Q.isFinished()) {
+ if (Idx == MaxN)
+ break;
+
+ auto P = Q.nextMetric();
+ storage_number SN = P.second;
+
+ if (does_storage_number_exist(SN)) {
+ CNs[Idx] = unpack_storage_number_dbl(SN);
+ LastValue = CNs[Idx];
+ CollectedValues++;
+ } else
+ CNs[Idx] = LastValue;
+
+ Idx++;
+ }
+ TotalValues = Idx;
+
+ if (CollectedValues < MinN) {
+ delete[] CNs;
+ return { nullptr, 0 };
+ }
+
+ // Find first non-NaN value.
+ for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TotalValues--) { }
+
+ // Overwrite NaN values.
+ if (Idx != 0)
+ memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TotalValues);
+
+ return { CNs, TotalValues };
+}
+
+MLResult TrainableDimension::trainModel() {
+ auto P = getCalculatedNumbers();
+ CalculatedNumber *CNs = P.first;
+ unsigned N = P.second;
+
+ if (!CNs)
+ return MLResult::MissingData;
+
+ SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
+ KM.train(SB, Cfg.MaxKMeansIters);
+ Trained = true;
+
+ delete[] CNs;
+ return MLResult::Success;
+}
+
+void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) {
+ if (!Exists) {
+ CNs.clear();
+ return;
+ }
+
+ unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
+ if (CNs.size() < N) {
+ CNs.push_back(Value);
+ return;
+ }
+
+ std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
+ CNs[N - 1] = Value;
+}
+
+std::pair<MLResult, bool> PredictableDimension::predict() {
+ unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
+ if (CNs.size() != N)
+ return { MLResult::MissingData, AnomalyBit };
+
+ CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
+ std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
+
+ SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
+ AnomalyScore = computeAnomalyScore(SB);
+ delete[] TmpCNs;
+
+ if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN())
+ return { MLResult::NaN, AnomalyBit };
+
+ AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold);
+ return { MLResult::Success, AnomalyBit };
+}
diff --git a/ml/Dimension.h b/ml/Dimension.h
new file mode 100644
index 000000000..fdf923ccc
--- /dev/null
+++ b/ml/Dimension.h
@@ -0,0 +1,124 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_DIMENSION_H
+#define ML_DIMENSION_H
+
+#include "BitBufferCounter.h"
+#include "Config.h"
+
+#include "ml-private.h"
+
+namespace ml {
+
+class RrdDimension {
+public:
+ RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->state->query_ops) {
+ std::stringstream SS;
+ SS << RD->rrdset->id << "|" << RD->name;
+ ID = SS.str();
+ }
+
+ RRDDIM *getRD() const { return RD; }
+
+ time_t latestTime() { return Ops->latest_time(RD); }
+
+ time_t oldestTime() { return Ops->oldest_time(RD); }
+
+ unsigned updateEvery() const { return RD->update_every; }
+
+ const std::string getID() const { return ID; }
+
+ virtual ~RrdDimension() {}
+
+private:
+ RRDDIM *RD;
+ struct rrddim_volatile::rrddim_query_ops *Ops;
+
+ std::string ID;
+};
+
+enum class MLResult {
+ Success = 0,
+ MissingData,
+ NaN,
+};
+
+class TrainableDimension : public RrdDimension {
+public:
+ TrainableDimension(RRDDIM *RD) :
+ RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {}
+
+ MLResult trainModel();
+
+ CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) {
+ return Trained ? KM.anomalyScore(SB) : 0.0;
+ }
+
+ bool shouldTrain(const TimePoint &TP) const {
+ return (LastTrainedAt + TrainEvery) < TP;
+ }
+
+ bool isTrained() const { return Trained; }
+
+ double updateTrainingDuration(double Duration) {
+ return TrainingDuration.exchange(Duration);
+ }
+
+private:
+ std::pair<CalculatedNumber *, size_t> getCalculatedNumbers();
+
+public:
+ TimePoint LastTrainedAt{Seconds{0}};
+
+private:
+ Seconds TrainEvery;
+ KMeans KM;
+
+ std::atomic<bool> Trained{false};
+ std::atomic<double> TrainingDuration{0.0};
+};
+
+class PredictableDimension : public TrainableDimension {
+public:
+ PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {}
+
+ std::pair<MLResult, bool> predict();
+
+ void addValue(CalculatedNumber Value, bool Exists);
+
+ bool isAnomalous() { return AnomalyBit; }
+
+private:
+ CalculatedNumber AnomalyScore{0.0};
+ std::atomic<bool> AnomalyBit{false};
+
+ std::vector<CalculatedNumber> CNs;
+};
+
+class DetectableDimension : public PredictableDimension {
+public:
+ DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {}
+
+ std::pair<bool, double> detect(size_t WindowLength, bool Reset) {
+ bool AnomalyBit = isAnomalous();
+
+ if (Reset)
+ NumSetBits = BBC.numSetBits();
+
+ NumSetBits += AnomalyBit;
+ BBC.insert(AnomalyBit);
+
+ double AnomalyRate = static_cast<double>(NumSetBits) / WindowLength;
+ return { AnomalyBit, AnomalyRate };
+ }
+
+private:
+ BitBufferCounter BBC{static_cast<size_t>(Cfg.ADMinWindowSize)};
+ size_t NumSetBits{0};
+};
+
+using Dimension = DetectableDimension;
+
+} // namespace ml
+
+#endif /* ML_DIMENSION_H */
diff --git a/ml/Host.cc b/ml/Host.cc
new file mode 100644
index 000000000..d26ff2ae4
--- /dev/null
+++ b/ml/Host.cc
@@ -0,0 +1,458 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include <dlib/statistics.h>
+
+#include "Config.h"
+#include "Host.h"
+
+#include "json/single_include/nlohmann/json.hpp"
+
+using namespace ml;
+
+static void updateDimensionsChart(RRDHOST *RH,
+ collected_number NumTrainedDimensions,
+ collected_number NumNormalDimensions,
+ collected_number NumAnomalousDimensions) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *NumTotalDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumNormalDimensionsRD = nullptr;
+ static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "dimensions", // id
+ NULL, // name
+ "dimensions", // family
+ NULL, // ctx
+ "Anomaly detection dimensions", // title
+ "dimensions", // units
+ "netdata", // plugin
+ "ml", // module
+ 39183, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions);
+ rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions);
+ rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions);
+ rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions);
+
+ rrdset_done(RS);
+}
+
+static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *AnomalyRateRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "anomaly_rate", // id
+ NULL, // name
+ "anomaly_rate", // family
+ NULL, // ctx
+ "Percentage of anomalous dimensions", // title
+ "percentage", // units
+ "netdata", // plugin
+ "ml", // module
+ 39184, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL,
+ 1, 100, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate);
+
+ rrdset_done(RS);
+}
+
+static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *WindowLengthRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "detector_window", // id
+ NULL, // name
+ "detector_window", // family
+ NULL, // ctx
+ "Anomaly detector window length", // title
+ "seconds", // units
+ "netdata", // plugin
+ "ml", // module
+ 39185, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ WindowLengthRD = rrddim_add(RS, "duration", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every);
+ rrdset_done(RS);
+}
+
+static void updateEventsChart(RRDHOST *RH,
+ std::pair<BitRateWindow::Edge, size_t> P,
+ bool ResetBitCounter,
+ bool NewAnomalyEvent) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *AboveThresholdRD = nullptr;
+ static thread_local RRDDIM *ResetBitCounterRD = nullptr;
+ static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "detector_events", // id
+ NULL, // name
+ "detector_events", // family
+ NULL, // ctx
+ "Anomaly events triggered", // title
+ "boolean", // units
+ "netdata", // plugin
+ "ml", // module
+ 39186, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ BitRateWindow::Edge E = P.first;
+ bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold;
+
+ rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold);
+ rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter);
+ rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent);
+
+ rrdset_done(RS);
+}
+
+static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) {
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *PredictiobDurationRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "prediction_stats", // id
+ NULL, // name
+ "prediction_stats", // family
+ NULL, // ctx
+ "Time it took to run prediction", // title
+ "milliseconds", // units
+ "netdata", // plugin
+ "ml", // module
+ 39187, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ PredictiobDurationRD = rrddim_add(RS, "duration", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration);
+
+ rrdset_done(RS);
+}
+
+static void updateTrainingChart(RRDHOST *RH,
+ collected_number TotalTrainingDuration,
+ collected_number MaxTrainingDuration)
+{
+ static thread_local RRDSET *RS = nullptr;
+ static thread_local RRDDIM *TotalTrainingDurationRD = nullptr;
+ static thread_local RRDDIM *MaxTrainingDurationRD = nullptr;
+
+ if (!RS) {
+ RS = rrdset_create(
+ RH, // host
+ "anomaly_detection", // type
+ "training_stats", // id
+ NULL, // name
+ "training_stats", // family
+ NULL, // ctx
+ "Training step statistics", // title
+ "milliseconds", // units
+ "netdata", // plugin
+ "ml", // module
+ 39188, // priority
+ RH->rrd_update_every, // update_every
+ RRDSET_TYPE_LINE // chart_type
+ );
+
+ TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL,
+ 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ } else
+ rrdset_next(RS);
+
+ rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration);
+ rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration);
+
+ rrdset_done(RS);
+}
+
+void RrdHost::addDimension(Dimension *D) {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ DimensionsMap[D->getRD()] = D;
+
+ // Default construct mutex for dimension
+ LocksMap[D];
+}
+
+void RrdHost::removeDimension(Dimension *D) {
+ // Remove the dimension from the hosts map.
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ DimensionsMap.erase(D->getRD());
+ }
+
+ // Delete the dimension by locking the mutex that protects it.
+ {
+ std::lock_guard<std::mutex> Lock(LocksMap[D]);
+ delete D;
+ }
+
+ // Remove the lock entry for the deleted dimension.
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ LocksMap.erase(D);
+ }
+}
+
+void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
+ Json["version"] = 1;
+
+ Json["enabled"] = Cfg.EnableAnomalyDetection;
+
+ Json["min-train-samples"] = Cfg.MinTrainSamples;
+ Json["max-train-samples"] = Cfg.MaxTrainSamples;
+ Json["train-every"] = Cfg.TrainEvery;
+
+ Json["diff-n"] = Cfg.DiffN;
+ Json["smooth-n"] = Cfg.SmoothN;
+ Json["lag-n"] = Cfg.LagN;
+
+ Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
+
+ Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
+ Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
+
+ Json["min-window-size"] = Cfg.ADMinWindowSize;
+ Json["max-window-size"] = Cfg.ADMaxWindowSize;
+ Json["idle-window-size"] = Cfg.ADIdleWindowSize;
+ Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold;
+ Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold;
+}
+
+std::pair<Dimension *, Duration<double>>
+TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
+
+ for (auto &DP : DimensionsMap) {
+ Dimension *D = DP.second;
+
+ if (D->shouldTrain(NowTP)) {
+ LocksMap[D].lock();
+ return { D, AllottedDuration };
+ }
+ }
+
+ return { nullptr, AllottedDuration };
+}
+
+void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
+ if (D == nullptr)
+ return;
+
+ D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
+
+ TimePoint StartTP = SteadyClock::now();
+ D->trainModel();
+ Duration<double> Duration = SteadyClock::now() - StartTP;
+ D->updateTrainingDuration(Duration.count());
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ LocksMap[D].unlock();
+ }
+}
+
+void TrainableHost::train() {
+ Duration<double> MaxSleepFor = Seconds{updateEvery()};
+
+ while (!netdata_exit) {
+ TimePoint NowTP = SteadyClock::now();
+
+ auto P = findDimensionToTrain(NowTP);
+ trainDimension(P.first, NowTP);
+
+ Duration<double> AllottedDuration = P.second;
+ Duration<double> RealDuration = SteadyClock::now() - NowTP;
+
+ Duration<double> SleepFor;
+ if (RealDuration >= AllottedDuration)
+ continue;
+
+ SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
+ std::this_thread::sleep_for(SleepFor);
+ }
+}
+
+void DetectableHost::detectOnce() {
+ auto P = BRW.insert(AnomalyRate >= Cfg.HostAnomalyRateThreshold);
+ BitRateWindow::Edge Edge = P.first;
+ size_t WindowLength = P.second;
+
+ bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold);
+ bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) &&
+ (Edge.second == BitRateWindow::State::Idle);
+
+ std::vector<std::pair<double, std::string>> DimsOverThreshold;
+
+ size_t NumAnomalousDimensions = 0;
+ size_t NumNormalDimensions = 0;
+ size_t NumTrainedDimensions = 0;
+
+ double TotalTrainingDuration = 0.0;
+ double MaxTrainingDuration = 0.0;
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ DimsOverThreshold.reserve(DimensionsMap.size());
+
+ for (auto &DP : DimensionsMap) {
+ Dimension *D = DP.second;
+
+ auto P = D->detect(WindowLength, ResetBitCounter);
+ bool IsAnomalous = P.first;
+ double AnomalyRate = P.second;
+
+ NumTrainedDimensions += D->isTrained();
+
+ double DimTrainingDuration = D->updateTrainingDuration(0.0);
+ MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration);
+ TotalTrainingDuration += DimTrainingDuration;
+
+ if (IsAnomalous)
+ NumAnomalousDimensions += 1;
+
+ if (NewAnomalyEvent && (AnomalyRate >= Cfg.ADDimensionRateThreshold))
+ DimsOverThreshold.push_back({ AnomalyRate, D->getID() });
+ }
+
+ if (NumAnomalousDimensions)
+ AnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size();
+ else
+ AnomalyRate = 0.0;
+
+ NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions;
+ }
+
+ this->NumAnomalousDimensions = NumAnomalousDimensions;
+ this->NumNormalDimensions = NumNormalDimensions;
+ this->NumTrainedDimensions = NumTrainedDimensions;
+
+ updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
+ updateRateChart(getRH(), AnomalyRate * 10000.0);
+ updateWindowLengthChart(getRH(), WindowLength);
+ updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
+ updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0);
+
+ if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
+ return;
+
+ std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end());
+ std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end());
+
+ // Make sure the JSON response won't grow beyond a specific number
+ // of dimensions. Log an error message if this happens, because it
+ // most likely means that the user specified a very-low anomaly rate
+ // threshold.
+ size_t NumMaxDimsOverThreshold = 2000;
+ if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) {
+ error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.",
+ DimsOverThreshold.size(), NumMaxDimsOverThreshold);
+ DimsOverThreshold.resize(NumMaxDimsOverThreshold);
+ }
+
+ nlohmann::json JsonResult = DimsOverThreshold;
+
+ time_t Before = now_realtime_sec();
+ time_t After = Before - (WindowLength * updateEvery());
+ DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4));
+}
+
+void DetectableHost::detect() {
+ std::this_thread::sleep_for(Seconds{10});
+
+ while (!netdata_exit) {
+ TimePoint StartTP = SteadyClock::now();
+ detectOnce();
+ TimePoint EndTP = SteadyClock::now();
+
+ Duration<double> Dur = EndTP - StartTP;
+ updateDetectionChart(getRH(), Dur.count() * 1000);
+
+ std::this_thread::sleep_for(Seconds{updateEvery()});
+ }
+}
+
+void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
+ Json["anomalous-dimensions"] = NumAnomalousDimensions;
+ Json["normal-dimensions"] = NumNormalDimensions;
+ Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
+ Json["trained-dimensions"] = NumTrainedDimensions;
+}
+
+void DetectableHost::startAnomalyDetectionThreads() {
+ TrainingThread = std::thread(&TrainableHost::train, this);
+ DetectionThread = std::thread(&DetectableHost::detect, this);
+}
+
+void DetectableHost::stopAnomalyDetectionThreads() {
+ TrainingThread.join();
+ DetectionThread.join();
+}
diff --git a/ml/Host.h b/ml/Host.h
new file mode 100644
index 000000000..86591d7ae
--- /dev/null
+++ b/ml/Host.h
@@ -0,0 +1,104 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_HOST_H
+#define ML_HOST_H
+
+#include "BitRateWindow.h"
+#include "Config.h"
+#include "Database.h"
+#include "Dimension.h"
+
+#include "ml-private.h"
+
+namespace ml {
+
+class RrdHost {
+public:
+ RrdHost(RRDHOST *RH) : RH(RH) {}
+
+ RRDHOST *getRH() { return RH; }
+
+ unsigned updateEvery() { return RH->rrd_update_every; }
+
+ std::string getUUID() {
+ char S[UUID_STR_LEN];
+ uuid_unparse_lower(RH->host_uuid, S);
+ return S;
+ }
+
+ void addDimension(Dimension *D);
+ void removeDimension(Dimension *D);
+
+ void getConfigAsJson(nlohmann::json &Json) const;
+
+ virtual ~RrdHost() {};
+
+protected:
+ RRDHOST *RH;
+
+ // Protect dimension and lock maps
+ std::mutex Mutex;
+
+ std::map<RRDDIM *, Dimension *> DimensionsMap;
+ std::map<Dimension *, std::mutex> LocksMap;
+};
+
+class TrainableHost : public RrdHost {
+public:
+ TrainableHost(RRDHOST *RH) : RrdHost(RH) {}
+
+ void train();
+
+private:
+ std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP);
+ void trainDimension(Dimension *D, const TimePoint &NowTP);
+};
+
+class DetectableHost : public TrainableHost {
+public:
+ DetectableHost(RRDHOST *RH) : TrainableHost(RH) {}
+
+ void startAnomalyDetectionThreads();
+ void stopAnomalyDetectionThreads();
+
+ template<typename ...ArgTypes>
+ bool getAnomalyInfo(ArgTypes&&... Args) {
+ return DB.getAnomalyInfo(Args...);
+ }
+
+ template<typename ...ArgTypes>
+ bool getAnomaliesInRange(ArgTypes&&... Args) {
+ return DB.getAnomaliesInRange(Args...);
+ }
+
+ void getDetectionInfoAsJson(nlohmann::json &Json) const;
+
+private:
+ void detect();
+ void detectOnce();
+
+private:
+ std::thread TrainingThread;
+ std::thread DetectionThread;
+
+ BitRateWindow BRW{
+ static_cast<size_t>(Cfg.ADMinWindowSize),
+ static_cast<size_t>(Cfg.ADMaxWindowSize),
+ static_cast<size_t>(Cfg.ADIdleWindowSize),
+ static_cast<size_t>(Cfg.ADMinWindowSize * Cfg.ADWindowRateThreshold)
+ };
+
+ CalculatedNumber AnomalyRate{0.0};
+
+ size_t NumAnomalousDimensions{0};
+ size_t NumNormalDimensions{0};
+ size_t NumTrainedDimensions{0};
+
+ Database DB{Cfg.AnomalyDBPath};
+};
+
+using Host = DetectableHost;
+
+} // namespace ml
+
+#endif /* ML_HOST_H */
diff --git a/ml/Makefile.am b/ml/Makefile.am
new file mode 100644
index 000000000..27449d659
--- /dev/null
+++ b/ml/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+SUBDIRS = \
+ kmeans \
+ $(NULL)
diff --git a/ml/Query.h b/ml/Query.h
new file mode 100644
index 000000000..cbaf6c297
--- /dev/null
+++ b/ml/Query.h
@@ -0,0 +1,49 @@
+#ifndef QUERY_H
+#define QUERY_H
+
+#include "ml-private.h"
+
+namespace ml {
+
+class Query {
+public:
+ Query(RRDDIM *RD) : RD(RD) {
+ Ops = &RD->state->query_ops;
+ }
+
+ time_t latestTime() {
+ return Ops->latest_time(RD);
+ }
+
+ time_t oldestTime() {
+ return Ops->oldest_time(RD);
+ }
+
+ void init(time_t AfterT, time_t BeforeT) {
+ Ops->init(RD, &Handle, AfterT, BeforeT);
+ }
+
+ bool isFinished() {
+ return Ops->is_finished(&Handle);
+ }
+
+ std::pair<time_t, storage_number> nextMetric() {
+ time_t CurrT;
+ storage_number SN = Ops->next_metric(&Handle, &CurrT);
+ return { CurrT, SN };
+ }
+
+ ~Query() {
+ Ops->finalize(&Handle);
+ }
+
+private:
+ RRDDIM *RD;
+
+ struct rrddim_volatile::rrddim_query_ops *Ops;
+ struct rrddim_query_handle Handle;
+};
+
+} // namespace ml
+
+#endif /* QUERY_H */
diff --git a/ml/Tests.cc b/ml/Tests.cc
new file mode 100644
index 000000000..7d369d48d
--- /dev/null
+++ b/ml/Tests.cc
@@ -0,0 +1,301 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "BitBufferCounter.h"
+#include "BitRateWindow.h"
+
+#include "gtest/gtest.h"
+
+using namespace ml;
+
+TEST(BitBufferCounterTest, Cap_4) {
+ size_t Capacity = 4;
+ BitBufferCounter BBC(Capacity);
+
+ // No bits set
+ EXPECT_EQ(BBC.numSetBits(), 0);
+
+ // All ones
+ for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) {
+ BBC.insert(true);
+
+ EXPECT_EQ(BBC.numSetBits(), std::min(Idx + 1, Capacity));
+ }
+
+ // All zeroes
+ for (size_t Idx = 0; Idx != Capacity; Idx++) {
+ BBC.insert(false);
+
+ if (Idx < Capacity)
+ EXPECT_EQ(BBC.numSetBits(), Capacity - (Idx + 1));
+ else
+ EXPECT_EQ(BBC.numSetBits(), 0);
+ }
+
+ // Even ones/zeroes
+ for (size_t Idx = 0; Idx != (2 * Capacity); Idx++)
+ BBC.insert(Idx % 2 == 0);
+ EXPECT_EQ(BBC.numSetBits(), Capacity / 2);
+}
+
+using State = BitRateWindow::State;
+using Edge = BitRateWindow::Edge;
+using Result = std::pair<Edge, size_t>;
+
+TEST(BitRateWindowTest, Cycles) {
+ /* Test the FSM by going through its two cycles:
+ * 1) NotFilled -> AboveThreshold -> Idle -> NotFilled
+ * 2) NotFilled -> BelowThreshold -> AboveThreshold -> Idle -> NotFilled
+ *
+ * Check the window's length on every new state transition.
+ */
+
+ size_t MinLength = 4, MaxLength = 6, IdleLength = 5;
+ size_t SetBitsThreshold = 3;
+
+ Result R;
+ BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ /*
+ * 1st cycle
+ */
+
+ // NotFilled -> AboveThreshold
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // AboveThreshold -> Idle
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
+
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
+ EXPECT_EQ(R.second, MaxLength);
+
+
+ // Idle -> NotFilled
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
+ EXPECT_EQ(R.second, 1);
+
+ // NotFilled -> AboveThreshold
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ /*
+ * 2nd cycle
+ */
+
+ BRW = BitRateWindow(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ // NotFilled -> BelowThreshold
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::BelowThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // BelowThreshold -> BelowThreshold:
+ // Check the state's self loop by adding set bits that will keep the
+ // bit buffer below the specified threshold.
+ //
+ for (size_t Idx = 0; Idx != 2 * MaxLength; Idx++) {
+ R = BRW.insert(Idx % 2 == 0);
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
+ EXPECT_EQ(R.second, MinLength);
+ }
+
+ // Verify that at the end of the loop the internal bit buffer contains
+ // "1010". Do so by adding one set bit and checking that we remain below
+ // the specified threshold.
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // BelowThreshold -> AboveThreshold
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // AboveThreshold -> Idle:
+ // Do the transition without filling the max window size this time.
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
+ EXPECT_EQ(R.second, MinLength);
+
+ // Idle -> NotFilled
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
+ EXPECT_EQ(R.second, 1);
+
+ // NotFilled -> AboveThreshold
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+}
+
+TEST(BitRateWindowTest, ConsecutiveOnes) {
+ size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
+ size_t SetBitsThreshold = 30;
+
+ Result R;
+ BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ for (size_t Idx = 0; Idx != MaxLength; Idx++)
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ for (size_t Idx = 0; Idx != SetBitsThreshold; Idx++) {
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
+ R = BRW.insert(true);
+ }
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // At this point the window's buffer contains:
+ // (MinLength - SetBitsThreshold = 90) 0s, followed by
+ // (SetBitsThreshold = 30) 1s.
+ //
+ // To go below the threshold, we need to add (90 + 1) more 0s in the window's
+ // buffer. At that point, the the window's buffer will contain:
+ // (SetBitsThreshold = 29) 1s, followed by
+ // (MinLength - SetBitsThreshold = 91) 0s.
+ //
+ // Right before adding the last 0, we expect the window's length to be equal to 210,
+ // because the bit buffer has gone through these bits:
+ // (MinLength - SetBitsThreshold = 90) 0s, followed by
+ // (SetBitsThreshold = 30) 1s, followed by
+ // (MinLength - SetBitsThreshold = 90) 0s.
+
+ for (size_t Idx = 0; Idx != (MinLength - SetBitsThreshold); Idx++) {
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
+ }
+ EXPECT_EQ(R.second, 2 * MinLength - SetBitsThreshold);
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
+
+ // Continue with the Idle -> NotFilled edge.
+ for (size_t Idx = 0; Idx != IdleLength - 1; Idx++) {
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
+ }
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
+ EXPECT_EQ(R.second, 1);
+}
+
+TEST(BitRateWindowTest, WithHoles) {
+ size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
+ size_t SetBitsThreshold = 30;
+
+ Result R;
+ BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ for (size_t Idx = 0; Idx != MaxLength; Idx++)
+ R = BRW.insert(false);
+
+ for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
+ R = BRW.insert(true);
+ for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
+ R = BRW.insert(false);
+ for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
+ R = BRW.insert(true);
+ for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
+ R = BRW.insert(false);
+ for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
+ R = BRW.insert(true);
+
+ EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ // The window's bit buffer contains:
+ // 70 0s, 10 1s, 10 0s, 10 1s, 10 0s, 10 1s.
+ // Where: 70 = MinLength - (5 / 3) * SetBitsThresholds, ie. we need
+ // to add (70 + 1) more zeros to make the bit buffer go below the
+ // threshold and then the window's length should be:
+ // 70 + 50 + 70 = 190.
+
+ BitRateWindow::Edge E;
+ do {
+ R = BRW.insert(false);
+ E = R.first;
+ } while (E.first != State::AboveThreshold || E.second != State::Idle);
+ EXPECT_EQ(R.second, 2 * MinLength - (5 * SetBitsThreshold) / 3);
+}
+
+TEST(BitRateWindowTest, MinWindow) {
+ size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
+ size_t SetBitsThreshold = 30;
+
+ Result R;
+ BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ BRW.insert(true);
+ BRW.insert(false);
+ for (size_t Idx = 2; Idx != SetBitsThreshold; Idx++)
+ BRW.insert(true);
+ for (size_t Idx = SetBitsThreshold; Idx != MinLength - 1; Idx++)
+ BRW.insert(false);
+
+ R = BRW.insert(true);
+ EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
+ EXPECT_EQ(R.second, MinLength);
+
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
+}
+
+TEST(BitRateWindowTest, MaxWindow) {
+ size_t MinLength = 100, MaxLength = 200, IdleLength = 30;
+ size_t SetBitsThreshold = 50;
+
+ Result R;
+ BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
+
+ for (size_t Idx = 0; Idx != MaxLength; Idx++)
+ R = BRW.insert(Idx % 2 == 0);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
+ EXPECT_EQ(R.second, MaxLength);
+
+ R = BRW.insert(false);
+ EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
+}
diff --git a/ml/kmeans/KMeans.cc b/ml/kmeans/KMeans.cc
new file mode 100644
index 000000000..e66c66c16
--- /dev/null
+++ b/ml/kmeans/KMeans.cc
@@ -0,0 +1,55 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "KMeans.h"
+#include <dlib/clustering.h>
+
+void KMeans::train(SamplesBuffer &SB, size_t MaxIterations) {
+ std::vector<DSample> Samples = SB.preprocess();
+
+ MinDist = std::numeric_limits<CalculatedNumber>::max();
+ MaxDist = std::numeric_limits<CalculatedNumber>::min();
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ ClusterCenters.clear();
+
+ dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples);
+ dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations);
+
+ for (const auto &S : Samples) {
+ CalculatedNumber MeanDist = 0.0;
+
+ for (const auto &KMCenter : ClusterCenters)
+ MeanDist += dlib::length(KMCenter - S);
+
+ MeanDist /= NumClusters;
+
+ if (MeanDist < MinDist)
+ MinDist = MeanDist;
+
+ if (MeanDist > MaxDist)
+ MaxDist = MeanDist;
+ }
+ }
+}
+
+CalculatedNumber KMeans::anomalyScore(SamplesBuffer &SB) {
+ std::vector<DSample> DSamples = SB.preprocess();
+
+ std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
+ if (!Lock.try_lock())
+ return std::numeric_limits<CalculatedNumber>::quiet_NaN();
+
+ CalculatedNumber MeanDist = 0.0;
+ for (const auto &CC: ClusterCenters)
+ MeanDist += dlib::length(CC - DSamples.back());
+
+ MeanDist /= NumClusters;
+
+ if (MaxDist == MinDist)
+ return 0.0;
+
+ CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist));
+ return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore;
+}
diff --git a/ml/kmeans/KMeans.h b/ml/kmeans/KMeans.h
new file mode 100644
index 000000000..4ea3b6a89
--- /dev/null
+++ b/ml/kmeans/KMeans.h
@@ -0,0 +1,34 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef KMEANS_H
+#define KMEANS_H
+
+#include <atomic>
+#include <vector>
+#include <limits>
+#include <mutex>
+
+#include "SamplesBuffer.h"
+
+class KMeans {
+public:
+ KMeans(size_t NumClusters = 2) : NumClusters(NumClusters) {
+ MinDist = std::numeric_limits<CalculatedNumber>::max();
+ MaxDist = std::numeric_limits<CalculatedNumber>::min();
+ };
+
+ void train(SamplesBuffer &SB, size_t MaxIterations);
+ CalculatedNumber anomalyScore(SamplesBuffer &SB);
+
+private:
+ size_t NumClusters;
+
+ std::vector<DSample> ClusterCenters;
+
+ CalculatedNumber MinDist;
+ CalculatedNumber MaxDist;
+
+ std::mutex Mutex;
+};
+
+#endif /* KMEANS_H */
diff --git a/ml/kmeans/Makefile.am b/ml/kmeans/Makefile.am
new file mode 100644
index 000000000..babdcf0df
--- /dev/null
+++ b/ml/kmeans/Makefile.am
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
diff --git a/ml/kmeans/SamplesBuffer.cc b/ml/kmeans/SamplesBuffer.cc
new file mode 100644
index 000000000..f8211fb54
--- /dev/null
+++ b/ml/kmeans/SamplesBuffer.cc
@@ -0,0 +1,144 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+#include "SamplesBuffer.h"
+
+#include <fstream>
+#include <sstream>
+#include <string>
+
+void Sample::print(std::ostream &OS) const {
+ for (size_t Idx = 0; Idx != NumDims - 1; Idx++)
+ OS << CNs[Idx] << ", ";
+
+ OS << CNs[NumDims - 1];
+}
+
+void SamplesBuffer::print(std::ostream &OS) const {
+ for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0;
+ Idx != NumSamples; Idx++) {
+ Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx);
+ OS << S << std::endl;
+ }
+}
+
+std::vector<Sample> SamplesBuffer::getPreprocessedSamples() const {
+ std::vector<Sample> V;
+
+ for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0;
+ Idx != NumSamples; Idx++) {
+ Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx);
+ V.push_back(S);
+ }
+
+ return V;
+}
+
+void SamplesBuffer::diffSamples() {
+ // Panda's DataFrame default behaviour is to subtract each element from
+ // itself. For us `DiffN = 0` means "disable diff-ing" when preprocessing
+ // the samples buffer. This deviation will make it easier for us to test
+ // the KMeans implementation.
+ if (DiffN == 0)
+ return;
+
+ for (size_t Idx = 0; Idx != (NumSamples - DiffN); Idx++) {
+ size_t High = (NumSamples - 1) - Idx;
+ size_t Low = High - DiffN;
+
+ Sample LHS = getSample(High);
+ Sample RHS = getSample(Low);
+
+ LHS.diff(RHS);
+ }
+}
+
+void SamplesBuffer::smoothSamples() {
+ // Holds the mean value of each window
+ CalculatedNumber *AccCNs = new CalculatedNumber[NumDimsPerSample]();
+ Sample Acc(AccCNs, NumDimsPerSample);
+
+ // Used to avoid clobbering the accumulator when moving the window
+ CalculatedNumber *TmpCNs = new CalculatedNumber[NumDimsPerSample]();
+ Sample Tmp(TmpCNs, NumDimsPerSample);
+
+ CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN;
+
+ // Calculate the value of the 1st window
+ for (size_t Idx = 0; Idx != std::min(SmoothN, NumSamples); Idx++) {
+ Tmp.add(getSample(NumSamples - (Idx + 1)));
+ }
+
+ Acc.add(Tmp);
+ Acc.scale(Factor);
+
+ // Move the window and update the samples
+ for (size_t Idx = NumSamples; Idx != (DiffN + SmoothN - 1); Idx--) {
+ Sample S = getSample(Idx - 1);
+
+ // Tmp <- Next window (if any)
+ if (Idx >= (SmoothN + 1)) {
+ Tmp.diff(S);
+ Tmp.add(getSample(Idx - (SmoothN + 1)));
+ }
+
+ // S <- Acc
+ S.copy(Acc);
+
+ // Acc <- Tmp
+ Acc.copy(Tmp);
+ Acc.scale(Factor);
+ }
+
+ delete[] AccCNs;
+ delete[] TmpCNs;
+}
+
+void SamplesBuffer::lagSamples() {
+ if (LagN == 0)
+ return;
+
+ for (size_t Idx = NumSamples; Idx != LagN; Idx--) {
+ Sample PS = getPreprocessedSample(Idx - 1);
+ PS.lag(getSample(Idx - 1), LagN);
+ }
+}
+
+std::vector<DSample> SamplesBuffer::preprocess() {
+ assert(Preprocessed == false);
+
+ std::vector<DSample> DSamples;
+ size_t OutN = NumSamples;
+
+ // Diff
+ if (DiffN >= OutN)
+ return DSamples;
+ OutN -= DiffN;
+ diffSamples();
+
+ // Smooth
+ if (SmoothN == 0 || SmoothN > OutN)
+ return DSamples;
+ OutN -= (SmoothN - 1);
+ smoothSamples();
+
+ // Lag
+ if (LagN >= OutN)
+ return DSamples;
+ OutN -= LagN;
+ lagSamples();
+
+ DSamples.reserve(OutN);
+ Preprocessed = true;
+
+ for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) {
+ DSample DS;
+ DS.set_size(NumDimsPerSample * (LagN + 1));
+
+ const Sample PS = getPreprocessedSample(Idx);
+ PS.initDSample(DS);
+
+ DSamples.push_back(DS);
+ }
+
+ return DSamples;
+}
diff --git a/ml/kmeans/SamplesBuffer.h b/ml/kmeans/SamplesBuffer.h
new file mode 100644
index 000000000..fccd216d5
--- /dev/null
+++ b/ml/kmeans/SamplesBuffer.h
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SAMPLES_BUFFER_H
+#define SAMPLES_BUFFER_H
+
+#include <iostream>
+#include <vector>
+
+#include <cassert>
+#include <cstdlib>
+#include <cstring>
+
+#include <dlib/matrix.h>
+
+typedef double CalculatedNumber;
+typedef dlib::matrix<CalculatedNumber, 0, 1> DSample;
+
+class Sample {
+public:
+ Sample(CalculatedNumber *Buf, size_t N) : CNs(Buf), NumDims(N) {}
+
+ void initDSample(DSample &DS) const {
+ for (size_t Idx = 0; Idx != NumDims; Idx++)
+ DS(Idx) = CNs[Idx];
+ }
+
+ void add(const Sample &RHS) const {
+ assert(NumDims == RHS.NumDims);
+
+ for (size_t Idx = 0; Idx != NumDims; Idx++)
+ CNs[Idx] += RHS.CNs[Idx];
+ };
+
+ void diff(const Sample &RHS) const {
+ assert(NumDims == RHS.NumDims);
+
+ for (size_t Idx = 0; Idx != NumDims; Idx++)
+ CNs[Idx] -= RHS.CNs[Idx];
+ };
+
+ void copy(const Sample &RHS) const {
+ assert(NumDims == RHS.NumDims);
+
+ std::memcpy(CNs, RHS.CNs, NumDims * sizeof(CalculatedNumber));
+ }
+
+ void scale(CalculatedNumber Factor) {
+ for (size_t Idx = 0; Idx != NumDims; Idx++)
+ CNs[Idx] *= Factor;
+ }
+
+ void lag(const Sample &S, size_t LagN) {
+ size_t N = S.NumDims;
+
+ for (size_t Idx = 0; Idx != (LagN + 1); Idx++) {
+ Sample Src(S.CNs - (Idx * N), N);
+ Sample Dst(CNs + (Idx * N), N);
+ Dst.copy(Src);
+ }
+ }
+
+ const CalculatedNumber *getCalculatedNumbers() const {
+ return CNs;
+ };
+
+ void print(std::ostream &OS) const;
+
+private:
+ CalculatedNumber *CNs;
+ size_t NumDims;
+};
+
+inline std::ostream& operator<<(std::ostream &OS, const Sample &S) {
+ S.print(OS);
+ return OS;
+}
+
+class SamplesBuffer {
+public:
+ SamplesBuffer(CalculatedNumber *CNs,
+ size_t NumSamples, size_t NumDimsPerSample,
+ size_t DiffN = 1, size_t SmoothN = 3, size_t LagN = 3) :
+ CNs(CNs), NumSamples(NumSamples), NumDimsPerSample(NumDimsPerSample),
+ DiffN(DiffN), SmoothN(SmoothN), LagN(LagN),
+ BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)),
+ Preprocessed(false) {};
+
+ std::vector<DSample> preprocess();
+ std::vector<Sample> getPreprocessedSamples() const;
+
+ size_t capacity() const { return NumSamples; }
+ void print(std::ostream &OS) const;
+
+private:
+ size_t getSampleOffset(size_t Index) const {
+ assert(Index < NumSamples);
+ return Index * NumDimsPerSample;
+ }
+
+ size_t getPreprocessedSampleOffset(size_t Index) const {
+ assert(Index < NumSamples);
+ return getSampleOffset(Index) * (LagN + 1);
+ }
+
+ void setSample(size_t Index, const Sample &S) const {
+ size_t Offset = getSampleOffset(Index);
+ std::memcpy(&CNs[Offset], S.getCalculatedNumbers(), BytesPerSample);
+ }
+
+ const Sample getSample(size_t Index) const {
+ size_t Offset = getSampleOffset(Index);
+ return Sample(&CNs[Offset], NumDimsPerSample);
+ };
+
+ const Sample getPreprocessedSample(size_t Index) const {
+ size_t Offset = getPreprocessedSampleOffset(Index);
+ return Sample(&CNs[Offset], NumDimsPerSample * (LagN + 1));
+ };
+
+ void diffSamples();
+ void smoothSamples();
+ void lagSamples();
+
+private:
+ CalculatedNumber *CNs;
+ size_t NumSamples;
+ size_t NumDimsPerSample;
+ size_t DiffN;
+ size_t SmoothN;
+ size_t LagN;
+ size_t BytesPerSample;
+ bool Preprocessed;
+};
+
+inline std::ostream& operator<<(std::ostream& OS, const SamplesBuffer &SB) {
+ SB.print(OS);
+ return OS;
+}
+
+#endif /* SAMPLES_BUFFER_H */
diff --git a/ml/kmeans/Tests.cc b/ml/kmeans/Tests.cc
new file mode 100644
index 000000000..0cb595945
--- /dev/null
+++ b/ml/kmeans/Tests.cc
@@ -0,0 +1,143 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "ml/ml-private.h"
+#include <gtest/gtest.h>
+
+/*
+ * The SamplesBuffer class implements the functionality of the following python
+ * code:
+ * >> df = pd.DataFrame(data=samples)
+ * >> df = df.diff(diff_n).dropna()
+ * >> df = df.rolling(smooth_n).mean().dropna()
+ * >> df = pd.concat([df.shift(n) for n in range(lag_n + 1)], axis=1).dropna()
+ *
+ * Its correctness has been verified by automatically generating random
+ * data frames in Python and comparing them with the correspondent preprocessed
+ * SampleBuffers.
+ *
+ * The following tests are meant to catch unintended changes in the SamplesBuffer
+ * implementation. For development purposes, one should compare changes against
+ * the aforementioned python code.
+*/
+
+TEST(SamplesBufferTest, NS_8_NDPS_1_DN_1_SN_3_LN_1) {
+ size_t NumSamples = 8, NumDimsPerSample = 1;
+ size_t DiffN = 1, SmoothN = 3, LagN = 3;
+
+ size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
+ CalculatedNumber *CNs = new CalculatedNumber[N]();
+
+ CNs[0] = 0.7568336679490107;
+ CNs[1] = 0.4814406581763254;
+ CNs[2] = 0.40073555156221874;
+ CNs[3] = 0.5973257298194408;
+ CNs[4] = 0.5334727814345868;
+ CNs[5] = 0.2632477193454843;
+ CNs[6] = 0.2684839023122384;
+ CNs[7] = 0.851332948637479;
+
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ SB.preprocess();
+
+ std::vector<Sample> Samples = SB.getPreprocessedSamples();
+ EXPECT_EQ(Samples.size(), 2);
+
+ Sample S0 = Samples[0];
+ const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
+ Sample S1 = Samples[1];
+ const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
+
+ EXPECT_NEAR(S0_CNs[0], -0.109614, 0.001);
+ EXPECT_NEAR(S0_CNs[1], -0.0458293, 0.001);
+ EXPECT_NEAR(S0_CNs[2], 0.017344, 0.001);
+ EXPECT_NEAR(S0_CNs[3], -0.0531693, 0.001);
+
+ EXPECT_NEAR(S1_CNs[0], 0.105953, 0.001);
+ EXPECT_NEAR(S1_CNs[1], -0.109614, 0.001);
+ EXPECT_NEAR(S1_CNs[2], -0.0458293, 0.001);
+ EXPECT_NEAR(S1_CNs[3], 0.017344, 0.001);
+
+ delete[] CNs;
+}
+
+TEST(SamplesBufferTest, NS_8_NDPS_1_DN_2_SN_3_LN_2) {
+ size_t NumSamples = 8, NumDimsPerSample = 1;
+ size_t DiffN = 2, SmoothN = 3, LagN = 2;
+
+ size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
+ CalculatedNumber *CNs = new CalculatedNumber[N]();
+
+ CNs[0] = 0.20511885291342846;
+ CNs[1] = 0.13151717360306558;
+ CNs[2] = 0.6017085062423134;
+ CNs[3] = 0.46256882933941545;
+ CNs[4] = 0.7887758447877941;
+ CNs[5] = 0.9237989080034406;
+ CNs[6] = 0.15552559051428083;
+ CNs[7] = 0.6309750314597955;
+
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ SB.preprocess();
+
+ std::vector<Sample> Samples = SB.getPreprocessedSamples();
+ EXPECT_EQ(Samples.size(), 2);
+
+ Sample S0 = Samples[0];
+ const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
+ Sample S1 = Samples[1];
+ const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
+
+ EXPECT_NEAR(S0_CNs[0], 0.005016, 0.001);
+ EXPECT_NEAR(S0_CNs[1], 0.326450, 0.001);
+ EXPECT_NEAR(S0_CNs[2], 0.304903, 0.001);
+
+ EXPECT_NEAR(S1_CNs[0], -0.154948, 0.001);
+ EXPECT_NEAR(S1_CNs[1], 0.005016, 0.001);
+ EXPECT_NEAR(S1_CNs[2], 0.326450, 0.001);
+
+ delete[] CNs;
+}
+
+TEST(SamplesBufferTest, NS_8_NDPS_3_DN_2_SN_4_LN_1) {
+ size_t NumSamples = 8, NumDimsPerSample = 3;
+ size_t DiffN = 2, SmoothN = 4, LagN = 1;
+
+ size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
+ CalculatedNumber *CNs = new CalculatedNumber[N]();
+
+ CNs[0] = 0.34310900399667765; CNs[1] = 0.14694315994488194; CNs[2] = 0.8246677800938796;
+ CNs[3] = 0.48249504592307835; CNs[4] = 0.23241087965531182; CNs[5] = 0.9595348555892567;
+ CNs[6] = 0.44281094035598334; CNs[7] = 0.5143142171362715; CNs[8] = 0.06391303014242555;
+ CNs[9] = 0.7460491027783901; CNs[10] = 0.43887217459032923; CNs[11] = 0.2814395025355999;
+ CNs[12] = 0.9231114281214198; CNs[13] = 0.326882401786898; CNs[14] = 0.26747939220376216;
+ CNs[15] = 0.7787571209969636; CNs[16] =0.5851700001235088; CNs[17] = 0.34410728945321567;
+ CNs[18] = 0.9394494507088997; CNs[19] =0.17567223681734334; CNs[20] = 0.42732886195446984;
+ CNs[21] = 0.9460522396152958; CNs[22] =0.23462747016780894; CNs[23] = 0.35983249900892145;
+
+ SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
+ SB.preprocess();
+
+ std::vector<Sample> Samples = SB.getPreprocessedSamples();
+ EXPECT_EQ(Samples.size(), 2);
+
+ Sample S0 = Samples[0];
+ const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
+ Sample S1 = Samples[1];
+ const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
+
+ EXPECT_NEAR(S0_CNs[0], 0.198225, 0.001);
+ EXPECT_NEAR(S0_CNs[1], 0.003529, 0.001);
+ EXPECT_NEAR(S0_CNs[2], -0.063003, 0.001);
+ EXPECT_NEAR(S0_CNs[3], 0.219066, 0.001);
+ EXPECT_NEAR(S0_CNs[4], 0.133175, 0.001);
+ EXPECT_NEAR(S0_CNs[5], -0.293154, 0.001);
+
+ EXPECT_NEAR(S1_CNs[0], 0.174160, 0.001);
+ EXPECT_NEAR(S1_CNs[1], -0.135722, 0.001);
+ EXPECT_NEAR(S1_CNs[2], 0.110452, 0.001);
+ EXPECT_NEAR(S1_CNs[3], 0.198225, 0.001);
+ EXPECT_NEAR(S1_CNs[4], 0.003529, 0.001);
+ EXPECT_NEAR(S1_CNs[5], -0.063003, 0.001);
+
+ delete[] CNs;
+}
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
new file mode 100644
index 000000000..795b80c34
--- /dev/null
+++ b/ml/ml-dummy.c
@@ -0,0 +1,38 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "ml.h"
+
+#if !defined(ENABLE_ML)
+
+void ml_init(void) {}
+
+void ml_new_host(RRDHOST *RH) { (void) RH; }
+
+void ml_delete_host(RRDHOST *RH) { (void) RH; }
+
+char *ml_get_host_info(RRDHOST *RH) { (void) RH; }
+
+void ml_new_dimension(RRDDIM *RD) { (void) RD; }
+
+void ml_delete_dimension(RRDDIM *RD) { (void) RD; }
+
+bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
+ (void) RD; (void) Value; (void) Exists;
+ return false;
+}
+
+char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before) {
+ (void) RH; (void) AnomalyDetectorName;
+ (void) AnomalyDetectorVersion; (void) After; (void) Before;
+ return NULL;
+}
+
+char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before) {
+ (void) RH; (void) AnomalyDetectorName;
+ (void) AnomalyDetectorVersion; (void) After; (void) Before;
+ return NULL;
+}
+
+#endif
diff --git a/ml/ml-private.h b/ml/ml-private.h
new file mode 100644
index 000000000..7b3e00684
--- /dev/null
+++ b/ml/ml-private.h
@@ -0,0 +1,26 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ML_PRIVATE_H
+#define ML_PRIVATE_H
+
+#include "kmeans/KMeans.h"
+#include "ml/ml.h"
+
+#include <chrono>
+#include <map>
+#include <mutex>
+#include <sstream>
+
+namespace ml {
+
+using SteadyClock = std::chrono::steady_clock;
+using TimePoint = std::chrono::time_point<SteadyClock>;
+
+template<typename T>
+using Duration = std::chrono::duration<T>;
+
+using Seconds = std::chrono::seconds;
+
+} // namespace ml
+
+#endif /* ML_PRIVATE_H */
diff --git a/ml/ml.cc b/ml/ml.cc
new file mode 100644
index 000000000..857d23d33
--- /dev/null
+++ b/ml/ml.cc
@@ -0,0 +1,153 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "Config.h"
+#include "Dimension.h"
+#include "Host.h"
+
+using namespace ml;
+
+/*
+ * Assumptions:
+ * 1) hosts outlive their sets, and sets outlive their dimensions,
+ * 2) dimensions always have a set that has a host.
+ */
+
+void ml_init(void) {
+ Cfg.readMLConfig();
+}
+
+void ml_new_host(RRDHOST *RH) {
+ if (!Cfg.EnableAnomalyDetection)
+ return;
+
+ if (simple_pattern_matches(Cfg.SP_HostsToSkip, RH->hostname))
+ return;
+
+ Host *H = new Host(RH);
+ RH->ml_host = static_cast<ml_host_t>(H);
+
+ H->startAnomalyDetectionThreads();
+}
+
+void ml_delete_host(RRDHOST *RH) {
+ Host *H = static_cast<Host *>(RH->ml_host);
+ if (!H)
+ return;
+
+ H->stopAnomalyDetectionThreads();
+
+ delete H;
+ RH->ml_host = nullptr;
+}
+
+void ml_new_dimension(RRDDIM *RD) {
+ RRDSET *RS = RD->rrdset;
+
+ Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host);
+ if (!H)
+ return;
+
+ if (static_cast<unsigned>(RD->update_every) != H->updateEvery())
+ return;
+
+ if (simple_pattern_matches(Cfg.SP_ChartsToSkip, RS->name))
+ return;
+
+ Dimension *D = new Dimension(RD);
+ RD->state->ml_dimension = static_cast<ml_dimension_t>(D);
+ H->addDimension(D);
+}
+
+void ml_delete_dimension(RRDDIM *RD) {
+ Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension);
+ if (!D)
+ return;
+
+ Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host);
+ H->removeDimension(D);
+
+ RD->state->ml_dimension = nullptr;
+}
+
+char *ml_get_host_info(RRDHOST *RH) {
+ nlohmann::json ConfigJson;
+
+ if (RH && RH->ml_host) {
+ Host *H = static_cast<Host *>(RH->ml_host);
+ H->getConfigAsJson(ConfigJson);
+ H->getDetectionInfoAsJson(ConfigJson);
+ } else {
+ ConfigJson["enabled"] = false;
+ }
+
+ return strdup(ConfigJson.dump(2, '\t').c_str());
+}
+
+bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
+ Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension);
+ if (!D)
+ return false;
+
+ D->addValue(Value, Exists);
+ bool Result = D->predict().second;
+ return Result;
+}
+
+char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before) {
+ if (!RH || !RH->ml_host) {
+ error("No host");
+ return nullptr;
+ }
+
+ Host *H = static_cast<Host *>(RH->ml_host);
+ std::vector<std::pair<time_t, time_t>> TimeRanges;
+
+ bool Res = H->getAnomaliesInRange(TimeRanges, AnomalyDetectorName,
+ AnomalyDetectorVersion,
+ H->getUUID(),
+ After, Before);
+ if (!Res) {
+ error("DB result is empty");
+ return nullptr;
+ }
+
+ nlohmann::json Json = TimeRanges;
+ return strdup(Json.dump(4).c_str());
+}
+
+char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before) {
+ if (!RH || !RH->ml_host) {
+ error("No host");
+ return nullptr;
+ }
+
+ Host *H = static_cast<Host *>(RH->ml_host);
+
+ nlohmann::json Json;
+ bool Res = H->getAnomalyInfo(Json, AnomalyDetectorName,
+ AnomalyDetectorVersion,
+ H->getUUID(),
+ After, Before);
+ if (!Res) {
+ error("DB result is empty");
+ return nullptr;
+ }
+
+ return strdup(Json.dump(4, '\t').c_str());
+}
+
+#if defined(ENABLE_ML_TESTS)
+
+#include "gtest/gtest.h"
+
+int test_ml(int argc, char *argv[]) {
+ (void) argc;
+ (void) argv;
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#endif // ENABLE_ML_TESTS
diff --git a/ml/ml.h b/ml/ml.h
new file mode 100644
index 000000000..96448453c
--- /dev/null
+++ b/ml/ml.h
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_ML_H
+#define NETDATA_ML_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "daemon/common.h"
+
+typedef void* ml_host_t;
+typedef void* ml_dimension_t;
+
+void ml_init(void);
+
+void ml_new_host(RRDHOST *RH);
+void ml_delete_host(RRDHOST *RH);
+
+char *ml_get_host_info(RRDHOST *RH);
+
+void ml_new_dimension(RRDDIM *RD);
+void ml_delete_dimension(RRDDIM *RD);
+
+bool ml_is_anomalous(RRDDIM *RD, double value, bool exists);
+
+char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before);
+
+char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
+ int AnomalyDetectorVersion, time_t After, time_t Before);
+
+#if defined(ENABLE_ML_TESTS)
+int test_ml(int argc, char *argv[]);
+#endif
+
+#ifdef __cplusplus
+};
+#endif
+
+#endif /* NETDATA_ML_H */