summaryrefslogtreecommitdiffstats
path: root/ml/ml.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-05-08 16:27:04 +0000
commita836a244a3d2bdd4da1ee2641e3e957850668cea (patch)
treecb87c75b3677fab7144f868435243f864048a1e6 /ml/ml.cc
parentAdding upstream version 1.38.1. (diff)
downloadnetdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz
netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ml/ml.cc')
-rw-r--r--ml/ml.cc1657
1 files changed, 1538 insertions, 119 deletions
diff --git a/ml/ml.cc b/ml/ml.cc
index 461c83baa..1f49f4bf1 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -1,202 +1,1621 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include "Config.h"
-#include "Dimension.h"
-#include "Chart.h"
-#include "Host.h"
+#include <dlib/clustering.h>
+
+#include "ml-private.h"
#include <random>
-using namespace ml;
+#include "ad_charts.h"
+#include "database/sqlite/sqlite3.h"
+
+#define WORKER_TRAIN_QUEUE_POP 0
+#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
+#define WORKER_TRAIN_QUERY 2
+#define WORKER_TRAIN_KMEANS 3
+#define WORKER_TRAIN_UPDATE_MODELS 4
+#define WORKER_TRAIN_RELEASE_DIMENSION 5
+#define WORKER_TRAIN_UPDATE_HOST 6
+#define WORKER_TRAIN_FLUSH_MODELS 7
+
+static sqlite3 *db = NULL;
+static netdata_mutex_t db_mutex = NETDATA_MUTEX_INITIALIZER;
+
+/*
+ * Functions to convert enums to strings
+*/
+
+__attribute__((unused)) static const char *
+ml_machine_learning_status_to_string(enum ml_machine_learning_status mls)
+{
+ switch (mls) {
+ case MACHINE_LEARNING_STATUS_ENABLED:
+ return "enabled";
+ case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
+ return "disabled-sp";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_metric_type_to_string(enum ml_metric_type mt)
+{
+ switch (mt) {
+ case METRIC_TYPE_CONSTANT:
+ return "constant";
+ case METRIC_TYPE_VARIABLE:
+ return "variable";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_status_to_string(enum ml_training_status ts)
+{
+ switch (ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ return "pending-with-model";
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ return "pending-without-model";
+ case TRAINING_STATUS_TRAINED:
+ return "trained";
+ case TRAINING_STATUS_UNTRAINED:
+ return "untrained";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_result_to_string(enum ml_training_result tr)
+{
+ switch (tr) {
+ case TRAINING_RESULT_OK:
+ return "ok";
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ return "invalid-query";
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ return "missing-values";
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ return "null-acquired-dim";
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ return "chart-under-replication";
+ default:
+ return "unknown";
+ }
+}
+
+/*
+ * Features
+*/
+
+// subtract elements that are `diff_n` positions apart
+static void
+ml_features_diff(ml_features_t *features)
+{
+ if (features->diff_n == 0)
+ return;
+
+ for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) {
+ size_t high = (features->src_n - 1) - idx;
+ size_t low = high - features->diff_n;
+
+ features->dst[low] = features->src[high] - features->src[low];
+ }
+
+ size_t n = features->src_n - features->diff_n;
+ memcpy(features->src, features->dst, n * sizeof(calculated_number_t));
+
+ for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++)
+ features->src[idx] = 0.0;
+}
+
+// a function that computes the window average of an array inplace
+static void
+ml_features_smooth(ml_features_t *features)
+{
+ calculated_number_t sum = 0.0;
+
+ size_t idx = 0;
+ for (; idx != features->smooth_n - 1; idx++)
+ sum += features->src[idx];
+
+ for (; idx != (features->src_n - features->diff_n); idx++) {
+ sum += features->src[idx];
+ calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)];
+ features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n;
+ sum -= prev_cn;
+ }
+
+ for (idx = 0; idx != features->smooth_n; idx++)
+ features->src[(features->src_n - 1) - idx] = 0.0;
+}
+
+// create lag'd vectors out of the preprocessed buffer
+static void
+ml_features_lag(ml_features_t *features)
+{
+ size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n;
+ features->preprocessed_features.resize(n);
+
+ unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio;
+ double sampling_ratio = std::min(static_cast<double>(target_num_samples) / n, 1.0);
+
+ uint32_t max_mt = std::numeric_limits<uint32_t>::max();
+ uint32_t cutoff = static_cast<double>(max_mt) * sampling_ratio;
+
+ size_t sample_idx = 0;
+
+ for (size_t idx = 0; idx != n; idx++) {
+ DSample &DS = features->preprocessed_features[sample_idx++];
+ DS.set_size(features->lag_n);
+
+ if (Cfg.random_nums[idx] > cutoff) {
+ sample_idx--;
+ continue;
+ }
+
+ for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++)
+ DS(feature_idx) = features->src[idx + feature_idx];
+ }
+
+ features->preprocessed_features.resize(sample_idx);
+}
+
+static void
+ml_features_preprocess(ml_features_t *features)
+{
+ ml_features_diff(features);
+ ml_features_smooth(features);
+ ml_features_lag(features);
+}
+
+/*
+ * KMeans
+*/
+
+static void
+ml_kmeans_init(ml_kmeans_t *kmeans)
+{
+ kmeans->cluster_centers.reserve(2);
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+}
+
+static void
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
+{
+ kmeans->after = (uint32_t) after;
+ kmeans->before = (uint32_t) before;
+
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+
+ kmeans->cluster_centers.clear();
+
+ dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
+ dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);
+
+ for (const auto &preprocessed_feature : features->preprocessed_features) {
+ calculated_number_t mean_dist = 0.0;
+
+ for (const auto &cluster_center : kmeans->cluster_centers) {
+ mean_dist += dlib::length(cluster_center - preprocessed_feature);
+ }
+
+ mean_dist /= kmeans->cluster_centers.size();
+
+ if (mean_dist < kmeans->min_dist)
+ kmeans->min_dist = mean_dist;
+
+ if (mean_dist > kmeans->max_dist)
+ kmeans->max_dist = mean_dist;
+ }
+}
+
+static calculated_number_t
+ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
+{
+ calculated_number_t mean_dist = 0.0;
+ for (const auto &CC: kmeans->cluster_centers)
+ mean_dist += dlib::length(CC - DS);
+
+ mean_dist /= kmeans->cluster_centers.size();
+
+ if (kmeans->max_dist == kmeans->min_dist)
+ return 0.0;
+
+ calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist));
+ return (anomaly_score > 100.0) ? 100.0 : anomaly_score;
+}
+
+/*
+ * Queue
+*/
+
+static ml_queue_t *
+ml_queue_init()
+{
+ ml_queue_t *q = new ml_queue_t();
+
+ netdata_mutex_init(&q->mutex);
+ pthread_cond_init(&q->cond_var, NULL);
+ q->exit = false;
+ return q;
+}
+
+static void
+ml_queue_destroy(ml_queue_t *q)
+{
+ netdata_mutex_destroy(&q->mutex);
+ pthread_cond_destroy(&q->cond_var);
+ delete q;
+}
+
+static void
+ml_queue_push(ml_queue_t *q, const ml_training_request_t req)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->internal.push(req);
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+static ml_training_request_t
+ml_queue_pop(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+
+ ml_training_request_t req = {
+ {'\0'}, // machine_guid
+ NULL, // chart id
+ NULL, // dimension id
+ 0, // current time
+ 0, // first entry
+ 0 // last entry
+ };
+
+ while (q->internal.empty()) {
+ pthread_cond_wait(&q->cond_var, &q->mutex);
+
+ if (q->exit) {
+ netdata_mutex_unlock(&q->mutex);
+
+ // We return a dummy request because the queue has been signaled
+ return req;
+ }
+ }
+
+ req = q->internal.front();
+ q->internal.pop();
+
+ netdata_mutex_unlock(&q->mutex);
+ return req;
+}
+
+static size_t
+ml_queue_size(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ size_t size = q->internal.size();
+ netdata_mutex_unlock(&q->mutex);
+ return size;
+}
+
+static void
+ml_queue_signal(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->exit = true;
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+/*
+ * Dimension
+*/
+
+static std::pair<calculated_number_t *, ml_training_response_t>
+ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ ml_training_response_t training_response = {};
+
+ training_response.request_time = training_request.request_time;
+ training_response.first_entry_on_request = training_request.first_entry_on_request;
+ training_response.last_entry_on_request = training_request.last_entry_on_request;
+
+ training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0);
+ training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0);
+
+ size_t min_n = Cfg.min_train_samples;
+ size_t max_n = Cfg.max_train_samples;
+
+ // Figure out what our time window should be.
+ training_response.query_before_t = training_response.last_entry_on_response;
+ training_response.query_after_t = std::max(
+ training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every),
+ training_response.first_entry_on_response
+ );
+
+ if (training_response.query_after_t >= training_response.query_before_t) {
+ training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE;
+ return { NULL, training_response };
+ }
+
+ if (rrdset_is_replicating(dim->rd->rrdset)) {
+ training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION;
+ return { NULL, training_response };
+ }
+
+ /*
+ * Execute the query
+ */
+ struct storage_engine_query_handle handle;
+
+ storage_engine_query_init(dim->rd->tiers[0].backend, dim->rd->tiers[0].db_metric_handle, &handle,
+ training_response.query_after_t, training_response.query_before_t,
+ STORAGE_PRIORITY_BEST_EFFORT);
+
+ size_t idx = 0;
+ memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
+
+ while (!storage_engine_query_is_finished(&handle)) {
+ if (idx == max_n)
+ break;
+
+ STORAGE_POINT sp = storage_engine_query_next_metric(&handle);
+
+ time_t timestamp = sp.end_time_s;
+ calculated_number_t value = sp.sum / sp.count;
+
+ if (netdata_double_isnumber(value)) {
+ if (!training_response.db_after_t)
+ training_response.db_after_t = timestamp;
+ training_response.db_before_t = timestamp;
+
+ training_thread->training_cns[idx] = value;
+ last_value = training_thread->training_cns[idx];
+ training_response.collected_values++;
+ } else
+ training_thread->training_cns[idx] = last_value;
+
+ idx++;
+ }
+ storage_engine_query_finalize(&handle);
+
+ global_statistics_ml_query_completed(/* points_read */ idx);
+
+ training_response.total_values = idx;
+ if (training_response.collected_values < min_n) {
+ training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES;
+ return { NULL, training_response };
+ }
+
+ // Find first non-NaN value.
+ for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
+
+ // Overwrite NaN values.
+ if (idx != 0)
+ memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+
+ training_response.result = TRAINING_RESULT_OK;
+ return { training_thread->training_cns, training_response };
+}
+
+const char *db_models_create_table =
+ "CREATE TABLE IF NOT EXISTS models("
+ " dim_id BLOB, after INT, before INT,"
+ " min_dist REAL, max_dist REAL,"
+ " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
+ " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
+ " PRIMARY KEY(dim_id, after)"
+ ");";
+
+const char *db_models_add_model =
+ "INSERT OR REPLACE INTO models("
+ " dim_id, after, before,"
+ " min_dist, max_dist,"
+ " c00, c01, c02, c03, c04, c05,"
+ " c10, c11, c12, c13, c14, c15)"
+ "VALUES("
+ " @dim_id, @after, @before,"
+ " @min_dist, @max_dist,"
+ " @c00, @c01, @c02, @c03, @c04, @c05,"
+ " @c10, @c11, @c12, @c13, @c14, @c15);";
+
+const char *db_models_load =
+ "SELECT * FROM models "
+ "WHERE dim_id = @dim_id AND after >= @after ORDER BY before ASC;";
+
+const char *db_models_delete =
+ "DELETE FROM models "
+ "WHERE dim_id = @dim_id AND before < @before;";
+
+static int
+ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int param = 0;
+ int rc = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_add_model, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to store model, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) km->after);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) km->before);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, km->min_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, km->max_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ if (km->cluster_centers.size() != 2)
+ fatal("Expected 2 cluster centers, got %zu", km->cluster_centers.size());
+
+ for (const DSample &ds : km->cluster_centers) {
+ if (ds.size() != 6)
+ fatal("Expected dsample with 6 dimensions, got %ld", ds.size());
+
+ for (long idx = 0; idx != ds.size(); idx++) {
+ calculated_number_t cn = ds(idx);
+ int rc = sqlite3_bind_double(res, ++param, cn);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+ }
+ }
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to store model, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when storing model, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to store model, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_delete, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to delete models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, metric_uuid, sizeof(*metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) before);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to delete models, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when deleting models, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to delete models, rc = %d", rc);
+ return 1;
+}
+
+int ml_dimension_load_models(RRDDIM *rd) {
+ ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+ if (!dim)
+ return 0;
+
+ netdata_mutex_lock(&dim->mutex);
+ bool is_empty = dim->km_contexts.empty();
+ netdata_mutex_unlock(&dim->mutex);
+
+ if (!is_empty)
+ return 0;
+
+ std::vector<ml_kmeans_t> V;
+
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_load, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to load models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ netdata_mutex_lock(&dim->mutex);
+
+ dim->km_contexts.reserve(Cfg.num_models_to_use);
+ while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
+ ml_kmeans_t km;
+
+ km.after = sqlite3_column_int(res, 2);
+ km.before = sqlite3_column_int(res, 3);
+
+ km.min_dist = sqlite3_column_int(res, 4);
+ km.max_dist = sqlite3_column_int(res, 5);
+
+ km.cluster_centers.resize(2);
+
+ km.cluster_centers[0].set_size(Cfg.lag_n + 1);
+ km.cluster_centers[0](0) = sqlite3_column_double(res, 6);
+ km.cluster_centers[0](1) = sqlite3_column_double(res, 7);
+ km.cluster_centers[0](2) = sqlite3_column_double(res, 8);
+ km.cluster_centers[0](3) = sqlite3_column_double(res, 9);
+ km.cluster_centers[0](4) = sqlite3_column_double(res, 10);
+ km.cluster_centers[0](5) = sqlite3_column_double(res, 11);
+
+ km.cluster_centers[1].set_size(Cfg.lag_n + 1);
+ km.cluster_centers[1](0) = sqlite3_column_double(res, 12);
+ km.cluster_centers[1](1) = sqlite3_column_double(res, 13);
+ km.cluster_centers[1](2) = sqlite3_column_double(res, 14);
+ km.cluster_centers[1](3) = sqlite3_column_double(res, 15);
+ km.cluster_centers[1](4) = sqlite3_column_double(res, 16);
+ km.cluster_centers[1](5) = sqlite3_column_double(res, 17);
+
+ dim->km_contexts.push_back(km);
+ }
+
+ if (!dim->km_contexts.empty()) {
+ dim->ts = TRAINING_STATUS_TRAINED;
+ }
+
+ netdata_mutex_unlock(&dim->mutex);
+
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to load models, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when loading models, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to load models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to load models, rc = %d", rc);
+ return 1;
+}
+
+static enum ml_training_result
+ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ worker_is_busy(WORKER_TRAIN_QUERY);
+ auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
+ ml_training_response_t training_response = P.second;
+
+ if (training_response.result != TRAINING_RESULT_OK) {
+ netdata_mutex_lock(&dim->mutex);
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ dim->ts = TRAINING_STATUS_TRAINED;
+ break;
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ dim->ts = TRAINING_STATUS_UNTRAINED;
+ break;
+ default:
+ break;
+ }
+
+ dim->tr = training_response;
+
+ dim->last_training_time = training_response.last_entry_on_response;
+ enum ml_training_result result = training_response.result;
+ netdata_mutex_unlock(&dim->mutex);
+
+ return result;
+ }
+
+ // compute kmeans
+ worker_is_busy(WORKER_TRAIN_KMEANS);
+ {
+ memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
+ training_response.total_values * sizeof(calculated_number_t));
+
+ ml_features_t features = {
+ Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
+ training_thread->scratch_training_cns, training_response.total_values,
+ training_thread->training_cns, training_response.total_values,
+ training_thread->training_samples
+ };
+ ml_features_preprocess(&features);
+
+ ml_kmeans_init(&dim->kmeans);
+ ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t);
+ }
+
+ // update models
+ worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
+ {
+ netdata_mutex_lock(&dim->mutex);
+
+ if (dim->km_contexts.size() < Cfg.num_models_to_use) {
+ dim->km_contexts.push_back(std::move(dim->kmeans));
+ } else {
+ bool can_drop_middle_km = false;
+
+ if (Cfg.num_models_to_use > 2) {
+ const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1];
+ const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2];
+ const ml_kmeans_t *new_km = &dim->kmeans;
+
+ can_drop_middle_km = (middle_km->after < old_km->before) &&
+ (middle_km->before > new_km->after);
+ }
+
+ if (can_drop_middle_km) {
+ dim->km_contexts.back() = dim->kmeans;
+ } else {
+ std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
+ dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+ }
+ }
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+ dim->ts = TRAINING_STATUS_TRAINED;
+ dim->tr = training_response;
+ dim->last_training_time = rrddim_last_entry_s(dim->rd);
+
+ // Add the newly generated model to the list of pending models to flush
+ ml_model_info_t model_info;
+ uuid_copy(model_info.metric_uuid, dim->rd->metric_uuid);
+ model_info.kmeans = dim->km_contexts.back();
+ training_thread->pending_model_info.push_back(model_info);
+
+ netdata_mutex_unlock(&dim->mutex);
+ }
+
+ return training_response.result;
+}
+
+static void
+ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
+{
+ switch (dim->mt) {
+ case METRIC_TYPE_CONSTANT:
+ return;
+ default:
+ break;
+ }
+
+ bool schedule_for_training = false;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ schedule_for_training = false;
+ break;
+ case TRAINING_STATUS_UNTRAINED:
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL;
+ break;
+ case TRAINING_STATUS_TRAINED:
+ if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) {
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
+ }
+ break;
+ }
+
+ if (schedule_for_training) {
+ ml_training_request_t req;
+
+ memcpy(req.machine_guid, dim->rd->rrdset->rrdhost->machine_guid, GUID_LEN + 1);
+ req.chart_id = string_dup(dim->rd->rrdset->id);
+ req.dimension_id = string_dup(dim->rd->id);
+ req.request_time = curr_time;
+ req.first_entry_on_request = rrddim_first_entry_s(dim->rd);
+ req.last_entry_on_request = rrddim_last_entry_s(dim->rd);
+
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
+ ml_queue_push(host->training_queue, req);
+ }
+}
+
+static bool
+ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists)
+{
+ // Nothing to do if ML is disabled for this dimension
+ if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED)
+ return false;
+
+ // Don't treat values that don't exist as anomalous
+ if (!exists) {
+ dim->cns.clear();
+ return false;
+ }
+
+ // Save the value and return if we don't have enough values for a sample
+ unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n;
+ if (dim->cns.size() < n) {
+ dim->cns.push_back(value);
+ return false;
+ }
+
+ // Push the value and check if it's different from the last one
+ bool same_value = true;
+ std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns));
+ if (dim->cns[n - 1] != value)
+ same_value = false;
+ dim->cns[n - 1] = value;
+
+ // Create the sample
+ assert((n * (Cfg.lag_n + 1) <= 128) &&
+ "Static buffers too small to perform prediction. "
+ "This should not be possible with the default clamping of feature extraction options");
+ calculated_number_t src_cns[128];
+ calculated_number_t dst_cns[128];
+
+ memset(src_cns, 0, n * (Cfg.lag_n + 1) * sizeof(calculated_number_t));
+ memcpy(src_cns, dim->cns.data(), n * sizeof(calculated_number_t));
+ memcpy(dst_cns, dim->cns.data(), n * sizeof(calculated_number_t));
+
+ ml_features_t features = {
+ Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
+ dst_cns, n, src_cns, n,
+ dim->feature
+ };
+ ml_features_preprocess(&features);
+
+ /*
+ * Lock to predict and possibly schedule the dimension for training
+ */
+ if (netdata_mutex_trylock(&dim->mutex) != 0)
+ return false;
+
+ // Mark the metric time as variable if we received different values
+ if (!same_value)
+ dim->mt = METRIC_TYPE_VARIABLE;
+
+ // Decide if the dimension needs to be scheduled for training
+ ml_dimension_schedule_for_training(dim, curr_time);
+
+ // Nothing to do if we don't have a model
+ switch (dim->ts) {
+ case TRAINING_STATUS_UNTRAINED:
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL: {
+ netdata_mutex_unlock(&dim->mutex);
+ return false;
+ }
+ default:
+ break;
+ }
+
+ /*
+ * Use the KMeans models to check if the value is anomalous
+ */
+
+ size_t sum = 0;
+ size_t models_consulted = 0;
+
+ for (const auto &km_ctx : dim->km_contexts) {
+ models_consulted++;
+
+ calculated_number_t anomaly_score = ml_kmeans_anomaly_score(&km_ctx, features.preprocessed_features[0]);
+ if (anomaly_score == std::numeric_limits<calculated_number_t>::quiet_NaN())
+ continue;
+
+ if (anomaly_score < (100 * Cfg.dimension_anomaly_score_threshold)) {
+ global_statistics_ml_models_consulted(models_consulted);
+ netdata_mutex_unlock(&dim->mutex);
+ return false;
+ }
+
+ sum += 1;
+ }
+
+ netdata_mutex_unlock(&dim->mutex);
+
+ global_statistics_ml_models_consulted(models_consulted);
+ return sum;
+}
+
+/*
+ * Chart
+*/
+
+static bool
+ml_chart_is_available_for_ml(ml_chart_t *chart)
+{
+ return rrdset_is_available_for_exporting_and_alarms(chart->rs);
+}
+
+void
+ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous)
+{
+ switch (dim->mls) {
+ case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
+ chart->mls.num_machine_learning_status_disabled_sp++;
+ return;
+ case MACHINE_LEARNING_STATUS_ENABLED: {
+ chart->mls.num_machine_learning_status_enabled++;
+
+ switch (dim->mt) {
+ case METRIC_TYPE_CONSTANT:
+ chart->mls.num_metric_type_constant++;
+ chart->mls.num_training_status_trained++;
+ chart->mls.num_normal_dimensions++;
+ return;
+ case METRIC_TYPE_VARIABLE:
+ chart->mls.num_metric_type_variable++;
+ break;
+ }
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_UNTRAINED:
+ chart->mls.num_training_status_untrained++;
+ return;
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ chart->mls.num_training_status_pending_without_model++;
+ return;
+ case TRAINING_STATUS_TRAINED:
+ chart->mls.num_training_status_trained++;
+
+ chart->mls.num_anomalous_dimensions += is_anomalous;
+ chart->mls.num_normal_dimensions += !is_anomalous;
+ return;
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ chart->mls.num_training_status_pending_with_model++;
+
+ chart->mls.num_anomalous_dimensions += is_anomalous;
+ chart->mls.num_normal_dimensions += !is_anomalous;
+ return;
+ }
+
+ return;
+ }
+ }
+}
+
+/*
+ * Host detection & training functions
+*/
+
+#define WORKER_JOB_DETECTION_COLLECT_STATS 0
+#define WORKER_JOB_DETECTION_DIM_CHART 1
+#define WORKER_JOB_DETECTION_HOST_CHART 2
+#define WORKER_JOB_DETECTION_STATS 3
+
+static void
+ml_host_detect_once(ml_host_t *host)
+{
+ worker_is_busy(WORKER_JOB_DETECTION_COLLECT_STATS);
+
+ host->mls = {};
+ ml_machine_learning_stats_t mls_copy = {};
+
+ {
+ netdata_mutex_lock(&host->mutex);
+
+ /*
+ * prediction/detection stats
+ */
+ void *rsp = NULL;
+ rrdset_foreach_read(rsp, host->rh) {
+ RRDSET *rs = static_cast<RRDSET *>(rsp);
-bool ml_capable() {
+ ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+ if (!chart)
+ continue;
+
+ if (!ml_chart_is_available_for_ml(chart))
+ continue;
+
+ ml_machine_learning_stats_t chart_mls = chart->mls;
+
+ host->mls.num_machine_learning_status_enabled += chart_mls.num_machine_learning_status_enabled;
+ host->mls.num_machine_learning_status_disabled_sp += chart_mls.num_machine_learning_status_disabled_sp;
+
+ host->mls.num_metric_type_constant += chart_mls.num_metric_type_constant;
+ host->mls.num_metric_type_variable += chart_mls.num_metric_type_variable;
+
+ host->mls.num_training_status_untrained += chart_mls.num_training_status_untrained;
+ host->mls.num_training_status_pending_without_model += chart_mls.num_training_status_pending_without_model;
+ host->mls.num_training_status_trained += chart_mls.num_training_status_trained;
+ host->mls.num_training_status_pending_with_model += chart_mls.num_training_status_pending_with_model;
+
+ host->mls.num_anomalous_dimensions += chart_mls.num_anomalous_dimensions;
+ host->mls.num_normal_dimensions += chart_mls.num_normal_dimensions;
+ }
+ rrdset_foreach_done(rsp);
+
+ host->host_anomaly_rate = 0.0;
+ size_t NumActiveDimensions = host->mls.num_anomalous_dimensions + host->mls.num_normal_dimensions;
+ if (NumActiveDimensions)
+ host->host_anomaly_rate = static_cast<double>(host->mls.num_anomalous_dimensions) / NumActiveDimensions;
+
+ mls_copy = host->mls;
+
+ netdata_mutex_unlock(&host->mutex);
+ }
+
+ worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
+ ml_update_dimensions_chart(host, mls_copy);
+
+ worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
+ ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
+}
+
+typedef struct {
+ RRDHOST_ACQUIRED *acq_rh;
+ RRDSET_ACQUIRED *acq_rs;
+ RRDDIM_ACQUIRED *acq_rd;
+ ml_dimension_t *dim;
+} ml_acquired_dimension_t;
+
+static ml_acquired_dimension_t
+ml_acquired_dimension_get(char *machine_guid, STRING *chart_id, STRING *dimension_id)
+{
+ RRDHOST_ACQUIRED *acq_rh = NULL;
+ RRDSET_ACQUIRED *acq_rs = NULL;
+ RRDDIM_ACQUIRED *acq_rd = NULL;
+ ml_dimension_t *dim = NULL;
+
+ rrd_rdlock();
+
+ acq_rh = rrdhost_find_and_acquire(machine_guid);
+ if (acq_rh) {
+ RRDHOST *rh = rrdhost_acquired_to_rrdhost(acq_rh);
+ if (rh && !rrdhost_flag_check(rh, RRDHOST_FLAG_ORPHAN | RRDHOST_FLAG_ARCHIVED)) {
+ acq_rs = rrdset_find_and_acquire(rh, string2str(chart_id));
+ if (acq_rs) {
+ RRDSET *rs = rrdset_acquired_to_rrdset(acq_rs);
+ if (rs && !rrdset_flag_check(rs, RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)) {
+ acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+ if (acq_rd) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+ if (rd)
+ dim = (ml_dimension_t *) rd->ml_dimension;
+ }
+ }
+ }
+ }
+ }
+
+ rrd_unlock();
+
+ ml_acquired_dimension_t acq_dim = {
+ acq_rh, acq_rs, acq_rd, dim
+ };
+
+ return acq_dim;
+}
+
+static void
+ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
+{
+ if (acq_dim.acq_rd)
+ rrddim_acquired_release(acq_dim.acq_rd);
+
+ if (acq_dim.acq_rs)
+ rrdset_acquired_release(acq_dim.acq_rs);
+
+ if (acq_dim.acq_rh)
+ rrdhost_acquired_release(acq_dim.acq_rh);
+}
+
+static enum ml_training_result
+ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
+{
+ if (!acq_dim.dim)
+ return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;
+
+ return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
+}
+
+static void *
+ml_detect_main(void *arg)
+{
+ UNUSED(arg);
+
+ worker_register("MLDETECT");
+ worker_register_job_name(WORKER_JOB_DETECTION_COLLECT_STATS, "collect stats");
+ worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");
+
+ heartbeat_t hb;
+ heartbeat_init(&hb);
+
+ while (!Cfg.detection_stop) {
+ worker_is_idle();
+ heartbeat_next(&hb, USEC_PER_SEC);
+
+ RRDHOST *rh;
+ rrd_rdlock();
+ rrdhost_foreach_read(rh) {
+ if (!rh->ml_host)
+ continue;
+
+ ml_host_detect_once((ml_host_t *) rh->ml_host);
+ }
+ rrd_unlock();
+
+ if (Cfg.enable_statistics_charts) {
+ // collect and update training thread stats
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_mutex_lock(&training_thread->nd_mutex);
+ ml_training_stats_t training_stats = training_thread->training_stats;
+ training_thread->training_stats = {};
+ netdata_mutex_unlock(&training_thread->nd_mutex);
+
+ // calc the avg values
+ if (training_stats.num_popped_items) {
+ training_stats.queue_size /= training_stats.num_popped_items;
+ training_stats.allotted_ut /= training_stats.num_popped_items;
+ training_stats.consumed_ut /= training_stats.num_popped_items;
+ training_stats.remaining_ut /= training_stats.num_popped_items;
+ } else {
+ training_stats.queue_size = ml_queue_size(training_thread->training_queue);
+ training_stats.consumed_ut = 0;
+ training_stats.remaining_ut = training_stats.allotted_ut;
+
+ training_stats.training_result_ok = 0;
+ training_stats.training_result_invalid_query_time_range = 0;
+ training_stats.training_result_not_enough_collected_values = 0;
+ training_stats.training_result_null_acquired_dimension = 0;
+ training_stats.training_result_chart_under_replication = 0;
+ }
+
+ ml_update_training_statistics_chart(training_thread, training_stats);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+/*
+ * Public API
+*/
+
+bool ml_capable()
+{
return true;
}
-bool ml_enabled(RRDHOST *RH) {
- if (!Cfg.EnableAnomalyDetection)
+bool ml_enabled(RRDHOST *rh)
+{
+ if (!rh)
+ return false;
+
+ if (!Cfg.enable_anomaly_detection)
return false;
- if (simple_pattern_matches(Cfg.SP_HostsToSkip, rrdhost_hostname(RH)))
+ if (simple_pattern_matches(Cfg.sp_host_to_skip, rrdhost_hostname(rh)))
return false;
return true;
}
-/*
- * Assumptions:
- * 1) hosts outlive their sets, and sets outlive their dimensions,
- * 2) dimensions always have a set that has a host.
- */
+bool ml_streaming_enabled()
+{
+ return Cfg.stream_anomaly_detection_charts;
+}
-void ml_init(void) {
- // Read config values
- Cfg.readMLConfig();
+void ml_host_new(RRDHOST *rh)
+{
+ if (!ml_enabled(rh))
+ return;
+
+ ml_host_t *host = new ml_host_t();
- if (!Cfg.EnableAnomalyDetection)
+ host->rh = rh;
+ host->mls = ml_machine_learning_stats_t();
+ //host->ts = ml_training_stats_t();
+
+ static std::atomic<size_t> times_called(0);
+ host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
+
+ host->host_anomaly_rate = 0.0;
+
+ netdata_mutex_init(&host->mutex);
+
+ rh->ml_host = (rrd_ml_host_t *) host;
+}
+
+void ml_host_delete(RRDHOST *rh)
+{
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host)
return;
- // Generate random numbers to efficiently sample the features we need
- // for KMeans clustering.
- std::random_device RD;
- std::mt19937 Gen(RD());
+ netdata_mutex_destroy(&host->mutex);
- Cfg.RandomNums.reserve(Cfg.MaxTrainSamples);
- for (size_t Idx = 0; Idx != Cfg.MaxTrainSamples; Idx++)
- Cfg.RandomNums.push_back(Gen());
+ delete host;
+ rh->ml_host = NULL;
}
-void ml_host_new(RRDHOST *RH) {
- if (!ml_enabled(RH))
+void ml_host_get_info(RRDHOST *rh, BUFFER *wb)
+{
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host) {
+ buffer_json_member_add_boolean(wb, "enabled", false);
return;
+ }
+
+ buffer_json_member_add_uint64(wb, "version", 1);
+
+ buffer_json_member_add_boolean(wb, "enabled", Cfg.enable_anomaly_detection);
- Host *H = new Host(RH);
- RH->ml_host = reinterpret_cast<ml_host_t *>(H);
+ buffer_json_member_add_uint64(wb, "min-train-samples", Cfg.min_train_samples);
+ buffer_json_member_add_uint64(wb, "max-train-samples", Cfg.max_train_samples);
+ buffer_json_member_add_uint64(wb, "train-every", Cfg.train_every);
+
+ buffer_json_member_add_uint64(wb, "diff-n", Cfg.diff_n);
+ buffer_json_member_add_uint64(wb, "smooth-n", Cfg.smooth_n);
+ buffer_json_member_add_uint64(wb, "lag-n", Cfg.lag_n);
+
+ buffer_json_member_add_double(wb, "random-sampling-ratio", Cfg.random_sampling_ratio);
+ buffer_json_member_add_uint64(wb, "max-kmeans-iters", Cfg.random_sampling_ratio);
+
+ buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.dimension_anomaly_score_threshold);
+
+ buffer_json_member_add_string(wb, "anomaly-detection-grouping-method",
+ time_grouping_method2string(Cfg.anomaly_detection_grouping_method));
+
+ buffer_json_member_add_int64(wb, "anomaly-detection-query-duration", Cfg.anomaly_detection_query_duration);
+
+ buffer_json_member_add_string(wb, "hosts-to-skip", Cfg.hosts_to_skip.c_str());
+ buffer_json_member_add_string(wb, "charts-to-skip", Cfg.charts_to_skip.c_str());
}
-void ml_host_delete(RRDHOST *RH) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- if (!H)
+void ml_host_get_detection_info(RRDHOST *rh, BUFFER *wb)
+{
+ ml_host_t *host = (ml_host_t *) rh->ml_host;
+ if (!host)
return;
- delete H;
- RH->ml_host = nullptr;
+ netdata_mutex_lock(&host->mutex);
+
+ buffer_json_member_add_uint64(wb, "version", 1);
+ buffer_json_member_add_uint64(wb, "anomalous-dimensions", host->mls.num_anomalous_dimensions);
+ buffer_json_member_add_uint64(wb, "normal-dimensions", host->mls.num_normal_dimensions);
+ buffer_json_member_add_uint64(wb, "total-dimensions", host->mls.num_anomalous_dimensions +
+ host->mls.num_normal_dimensions);
+ buffer_json_member_add_uint64(wb, "trained-dimensions", host->mls.num_training_status_trained +
+ host->mls.num_training_status_pending_with_model);
+ netdata_mutex_unlock(&host->mutex);
}
-void ml_chart_new(RRDSET *RS) {
- Host *H = reinterpret_cast<Host *>(RS->rrdhost->ml_host);
- if (!H)
+void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
+{
+ UNUSED(rh);
+ UNUSED(wb);
+
+ // TODO: To be implemented
+ error("Fetching KMeans models is not supported yet");
+}
+
+void ml_chart_new(RRDSET *rs)
+{
+ ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
+ if (!host)
return;
- Chart *C = new Chart(RS);
- RS->ml_chart = reinterpret_cast<ml_chart_t *>(C);
+ ml_chart_t *chart = new ml_chart_t();
- H->addChart(C);
+ chart->rs = rs;
+ chart->mls = ml_machine_learning_stats_t();
+
+ netdata_mutex_init(&chart->mutex);
+
+ rs->ml_chart = (rrd_ml_chart_t *) chart;
}
-void ml_chart_delete(RRDSET *RS) {
- Host *H = reinterpret_cast<Host *>(RS->rrdhost->ml_host);
- if (!H)
+void ml_chart_delete(RRDSET *rs)
+{
+ ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
+ if (!host)
return;
- Chart *C = reinterpret_cast<Chart *>(RS->ml_chart);
- H->removeChart(C);
+ ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+
+ netdata_mutex_destroy(&chart->mutex);
- delete C;
- RS->ml_chart = nullptr;
+ delete chart;
+ rs->ml_chart = NULL;
}
-void ml_dimension_new(RRDDIM *RD) {
- Chart *C = reinterpret_cast<Chart *>(RD->rrdset->ml_chart);
- if (!C)
+bool ml_chart_update_begin(RRDSET *rs)
+{
+ ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+ if (!chart)
+ return false;
+
+ netdata_mutex_lock(&chart->mutex);
+ chart->mls = {};
+ return true;
+}
+
+void ml_chart_update_end(RRDSET *rs)
+{
+ ml_chart_t *chart = (ml_chart_t *) rs->ml_chart;
+ if (!chart)
return;
- Dimension *D = new Dimension(RD);
- RD->ml_dimension = reinterpret_cast<ml_dimension_t *>(D);
- C->addDimension(D);
+ netdata_mutex_unlock(&chart->mutex);
}
-void ml_dimension_delete(RRDDIM *RD) {
- Dimension *D = reinterpret_cast<Dimension *>(RD->ml_dimension);
- if (!D)
+void ml_dimension_new(RRDDIM *rd)
+{
+ ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
+ if (!chart)
return;
- Chart *C = reinterpret_cast<Chart *>(RD->rrdset->ml_chart);
- C->removeDimension(D);
+ ml_dimension_t *dim = new ml_dimension_t();
+
+ dim->rd = rd;
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+ dim->ts = TRAINING_STATUS_UNTRAINED;
+
+ dim->last_training_time = 0;
+
+ ml_kmeans_init(&dim->kmeans);
+
+ if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset)))
+ dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART;
+ else
+ dim->mls = MACHINE_LEARNING_STATUS_ENABLED;
+
+ netdata_mutex_init(&dim->mutex);
+
+ dim->km_contexts.reserve(Cfg.num_models_to_use);
+
+ rd->ml_dimension = (rrd_ml_dimension_t *) dim;
- delete D;
- RD->ml_dimension = nullptr;
+ metaqueue_ml_load_models(rd);
}
-char *ml_get_host_info(RRDHOST *RH) {
- nlohmann::json ConfigJson;
+void ml_dimension_delete(RRDDIM *rd)
+{
+ ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+ if (!dim)
+ return;
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->getConfigAsJson(ConfigJson);
- } else {
- ConfigJson["enabled"] = false;
- }
+ netdata_mutex_destroy(&dim->mutex);
- return strdupz(ConfigJson.dump(2, '\t').c_str());
+ delete dim;
+ rd->ml_dimension = NULL;
}
-char *ml_get_host_runtime_info(RRDHOST *RH) {
- nlohmann::json ConfigJson;
+bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists)
+{
+ ml_dimension_t *dim = (ml_dimension_t *) rd->ml_dimension;
+ if (!dim)
+ return false;
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->getDetectionInfoAsJson(ConfigJson);
- } else {
- return nullptr;
- }
+ ml_chart_t *chart = (ml_chart_t *) rd->rrdset->ml_chart;
- return strdup(ConfigJson.dump(1, '\t').c_str());
+ bool is_anomalous = ml_dimension_predict(dim, curr_time, value, exists);
+ ml_chart_update_dimension(chart, dim, is_anomalous);
+
+ return is_anomalous;
}
-char *ml_get_host_models(RRDHOST *RH) {
- nlohmann::json ModelsJson;
+static int ml_flush_pending_models(ml_training_thread_t *training_thread) {
+ (void) db_execute(db, "BEGIN TRANSACTION;");
+
+ for (const auto &pending_model: training_thread->pending_model_info) {
+ int rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);
+ if (rc)
+ return rc;
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->getModelsAsJson(ModelsJson);
- return strdup(ModelsJson.dump(2, '\t').c_str());
+ rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+ if (rc)
+ return rc;
}
- return nullptr;
+ (void) db_execute(db, "COMMIT TRANSACTION;");
+
+ training_thread->pending_model_info.clear();
+ return 0;
}
-void ml_start_anomaly_detection_threads(RRDHOST *RH) {
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->startAnomalyDetectionThreads();
+static void *ml_train_main(void *arg) {
+ ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;
+
+ char worker_name[1024];
+ snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id);
+ worker_register("MLTRAIN");
+
+ worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
+ worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
+ worker_register_job_name(WORKER_TRAIN_QUERY, "query");
+ worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
+ worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
+ worker_register_job_name(WORKER_TRAIN_FLUSH_MODELS, "flush models");
+
+ while (!Cfg.training_stop) {
+ worker_is_busy(WORKER_TRAIN_QUEUE_POP);
+
+ ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);
+
+ // we know this thread has been cancelled, when the queue starts
+ // returning "null" requests without blocking on queue's pop().
+ if (training_req.chart_id == NULL)
+ break;
+
+ size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;
+
+ usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size;
+ if (allotted_ut > USEC_PER_SEC)
+ allotted_ut = USEC_PER_SEC;
+
+ usec_t start_ut = now_monotonic_usec();
+
+ enum ml_training_result training_res;
+ {
+ worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
+ ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
+ training_req.machine_guid,
+ training_req.chart_id,
+ training_req.dimension_id);
+
+ training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);
+
+ string_freez(training_req.chart_id);
+ string_freez(training_req.dimension_id);
+
+ worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION);
+ ml_acquired_dimension_release(acq_dim);
+ }
+
+ usec_t consumed_ut = now_monotonic_usec() - start_ut;
+
+ usec_t remaining_ut = 0;
+ if (consumed_ut < allotted_ut)
+ remaining_ut = allotted_ut - consumed_ut;
+
+ if (Cfg.enable_statistics_charts) {
+ worker_is_busy(WORKER_TRAIN_UPDATE_HOST);
+
+ netdata_mutex_lock(&training_thread->nd_mutex);
+
+ training_thread->training_stats.queue_size += queue_size;
+ training_thread->training_stats.num_popped_items += 1;
+
+ training_thread->training_stats.allotted_ut += allotted_ut;
+ training_thread->training_stats.consumed_ut += consumed_ut;
+ training_thread->training_stats.remaining_ut += remaining_ut;
+
+ switch (training_res) {
+ case TRAINING_RESULT_OK:
+ training_thread->training_stats.training_result_ok += 1;
+ break;
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ training_thread->training_stats.training_result_invalid_query_time_range += 1;
+ break;
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ training_thread->training_stats.training_result_not_enough_collected_values += 1;
+ break;
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ training_thread->training_stats.training_result_null_acquired_dimension += 1;
+ break;
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ training_thread->training_stats.training_result_chart_under_replication += 1;
+ break;
+ }
+
+ netdata_mutex_unlock(&training_thread->nd_mutex);
+ }
+
+ if (training_thread->pending_model_info.size() >= Cfg.flush_models_batch_size) {
+ worker_is_busy(WORKER_TRAIN_FLUSH_MODELS);
+ netdata_mutex_lock(&db_mutex);
+ ml_flush_pending_models(training_thread);
+ netdata_mutex_unlock(&db_mutex);
+ continue;
+ }
+
+ worker_is_idle();
+ std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
}
+
+ return NULL;
}
-void ml_stop_anomaly_detection_threads(RRDHOST *RH) {
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->stopAnomalyDetectionThreads(true);
+void ml_init()
+{
+ // Read config values
+ ml_config_load(&Cfg);
+
+ if (!Cfg.enable_anomaly_detection)
+ return;
+
+ // Generate random numbers to efficiently sample the features we need
+ // for KMeans clustering.
+ std::random_device RD;
+ std::mt19937 Gen(RD());
+
+ Cfg.random_nums.reserve(Cfg.max_train_samples);
+ for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
+ Cfg.random_nums.push_back(Gen());
+
+ // init training thread-specific data
+ Cfg.training_threads.resize(Cfg.num_training_threads);
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
+ training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training]();
+ training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
+
+ training_thread->id = idx;
+ training_thread->training_queue = ml_queue_init();
+ training_thread->pending_model_info.reserve(Cfg.flush_models_batch_size);
+ netdata_mutex_init(&training_thread->nd_mutex);
}
-}
-void ml_cancel_anomaly_detection_threads(RRDHOST *RH) {
- if (RH && RH->ml_host) {
- Host *H = reinterpret_cast<Host *>(RH->ml_host);
- H->stopAnomalyDetectionThreads(false);
+ // open sqlite db
+ char path[FILENAME_MAX];
+ snprintfz(path, FILENAME_MAX - 1, "%s/%s", netdata_configured_cache_dir, "ml.db");
+ int rc = sqlite3_open(path, &db);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to initialize database at %s, due to \"%s\"", path, sqlite3_errstr(rc));
+ sqlite3_close(db);
+ db = NULL;
+ }
+
+ if (db) {
+ char *err = NULL;
+ int rc = sqlite3_exec(db, db_models_create_table, NULL, NULL, &err);
+ if (rc != SQLITE_OK) {
+ error_report("Failed to create models table (%s, %s)", sqlite3_errstr(rc), err ? err : "");
+ sqlite3_close(db);
+ sqlite3_free(err);
+ db = NULL;
+ }
}
}
-void ml_chart_update_begin(RRDSET *RS) {
- Chart *C = reinterpret_cast<Chart *>(RS->ml_chart);
- if (!C)
+void ml_fini() {
+ if (!Cfg.enable_anomaly_detection)
return;
- C->updateBegin();
+ int rc = sqlite3_close_v2(db);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Error %d while closing the SQLite database, %s", rc, sqlite3_errstr(rc));
}
-void ml_chart_update_end(RRDSET *RS) {
- Chart *C = reinterpret_cast<Chart *>(RS->ml_chart);
- if (!C)
+void ml_start_threads() {
+ if (!Cfg.enable_anomaly_detection)
return;
- C->updateEnd();
-}
+ // start detection & training threads
+ Cfg.detection_stop = false;
+ Cfg.training_stop = false;
-bool ml_is_anomalous(RRDDIM *RD, time_t CurrT, double Value, bool Exists) {
- Dimension *D = reinterpret_cast<Dimension *>(RD->ml_dimension);
- if (!D)
- return false;
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
- Chart *C = reinterpret_cast<Chart *>(RD->rrdset->ml_chart);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
+ netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
- bool IsAnomalous = D->predict(CurrT, Value, Exists);
- C->updateDimension(D, IsAnomalous);
- return IsAnomalous;
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
+ netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
+ }
}
-bool ml_streaming_enabled() {
- return Cfg.StreamADCharts;
-}
+void ml_stop_threads()
+{
+ if (!Cfg.enable_anomaly_detection)
+ return;
-#include "ml-private.h"
+ Cfg.detection_stop = true;
+ Cfg.training_stop = true;
+
+ netdata_thread_cancel(Cfg.detection_thread);
+ netdata_thread_join(Cfg.detection_thread, NULL);
+
+ // signal the training queue of each thread
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ ml_queue_signal(training_thread->training_queue);
+ }
+
+ // cancel training threads
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_thread_cancel(training_thread->nd_thread);
+ }
+
+ // join training threads
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_thread_join(training_thread->nd_thread, NULL);
+ }
+
+ // clear training thread data
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ delete[] training_thread->training_cns;
+ delete[] training_thread->scratch_training_cns;
+ ml_queue_destroy(training_thread->training_queue);
+ netdata_mutex_destroy(&training_thread->nd_mutex);
+ }
+}