Diffstat (limited to 'collectors/python.d.plugin/anomalies/anomalies.chart.py')
-rw-r--r--  collectors/python.d.plugin/anomalies/anomalies.chart.py  125
1 file changed, 101 insertions(+), 24 deletions(-)
diff --git a/collectors/python.d.plugin/anomalies/anomalies.chart.py b/collectors/python.d.plugin/anomalies/anomalies.chart.py
index 97dbb1d1e..61b51d9c0 100644
--- a/collectors/python.d.plugin/anomalies/anomalies.chart.py
+++ b/collectors/python.d.plugin/anomalies/anomalies.chart.py
@@ -3,6 +3,7 @@
# Author: andrewm4894
# SPDX-License-Identifier: GPL-3.0-or-later
+import sys
import time
from datetime import datetime
import re
@@ -51,14 +52,17 @@ class Service(SimpleService):
self.basic_init()
self.charts_init()
self.custom_models_init()
+ self.data_init()
self.model_params_init()
self.models_init()
+ self.collected_dims = {'probability': set(), 'anomaly': set()}
def check(self):
- _ = get_allmetrics_async(
- host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
- protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
- )
+ if sys.version_info < (3, 6):
+ self.error("anomalies collector only works with Python>=3.6")
+ return False
+ if len(self.host_charts_dict[self.host]) > 0:
+ _ = get_allmetrics_async(host_charts_dict=self.host_charts_dict, protocol=self.protocol, user=self.username, pwd=self.password)
return True
def basic_init(self):
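Note: check() now gates on the interpreter version and only issues the
warm-up get_allmetrics_async() call when there are charts in scope.
Comparing sys.version_info as a tuple avoids the float trap where
'{}.{}'.format(3, 10) parses to 3.1 and wrongly fails on Python 3.10.
A minimal standalone sketch of the gate (the error message mirrors the
collector's; the SystemExit is illustrative):

    import sys

    # float('{}.{}'.format(3, 10)) == 3.1, which is < 3.6 -- a false failure.
    # Tuple comparison is element-wise and handles 3.10+ correctly.
    if sys.version_info < (3, 6):
        raise SystemExit("anomalies collector only works with Python>=3.6")
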
@@ -70,17 +74,18 @@ class Service(SimpleService):
self.host = self.configuration.get('host', '127.0.0.1:19999')
self.username = self.configuration.get('username', None)
self.password = self.configuration.get('password', None)
+ self.tls_verify = self.configuration.get('tls_verify', True)
self.fitted_at = {}
self.df_allmetrics = pd.DataFrame()
- self.data_latest = {}
self.last_train_at = 0
self.include_average_prob = bool(self.configuration.get('include_average_prob', True))
+ self.reinitialize_at_every_step = bool(self.configuration.get('reinitialize_at_every_step', False))
def charts_init(self):
"""Do some initialisation of charts in scope related variables.
"""
self.charts_regex = re.compile(self.configuration.get('charts_regex','None'))
- self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts').json().get('charts', {}).keys())]
+ self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts', verify=self.tls_verify).json().get('charts', {}).keys())]
self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
self.charts_to_exclude = self.configuration.get('charts_to_exclude', '').split(',')
if len(self.charts_to_exclude) > 0:
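Note: the new tls_verify option is threaded through to requests so the
collector can talk to an agent serving HTTPS with a self-signed
certificate. A minimal sketch of what tls_verify: false maps to (host
and port are illustrative):

    import requests

    host = '127.0.0.1:19999'
    # verify=False skips certificate validation, matching tls_verify: false
    resp = requests.get(f'https://{host}/api/v1/charts', verify=False)
    charts_available = list(resp.json().get('charts', {}).keys())
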
@@ -115,6 +120,14 @@ class Service(SimpleService):
self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
self.host_charts_dict = {self.host: self.charts_in_scope}
self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
+ #self.info(f'self.host_charts_dict (len={len(self.host_charts_dict[self.host])}): {self.host_charts_dict}')
+
+ def data_init(self):
+ """Initialize some empty data objects.
+ """
+ self.data_probability_latest = {f'{m}_prob': 0 for m in self.charts_in_scope}
+ self.data_anomaly_latest = {f'{m}_anomaly': 0 for m in self.charts_in_scope}
+ self.data_latest = {**self.data_probability_latest, **self.data_anomaly_latest}
def model_params_init(self):
"""Model parameters initialisation.
@@ -153,12 +166,55 @@ class Service(SimpleService):
self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}
- def validate_charts(self, name, data, algorithm='absolute', multiplier=1, divisor=1):
+ def model_init(self, model):
+ """Model initialisation of a single model.
+ """
+ if self.model == 'pca':
+ self.models[model] = PCA(contamination=self.contamination)
+ elif self.model == 'loda':
+ self.models[model] = LODA(contamination=self.contamination)
+ elif self.model == 'iforest':
+ self.models[model] = IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination)
+ elif self.model == 'cblof':
+ self.models[model] = CBLOF(n_clusters=3, contamination=self.contamination)
+ elif self.model == 'feature_bagging':
+ self.models[model] = FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination)
+ elif self.model == 'copod':
+ self.models[model] = COPOD(contamination=self.contamination)
+ elif self.model == 'hbos':
+ self.models[model] = HBOS(contamination=self.contamination)
+ else:
+ self.models[model] = HBOS(contamination=self.contamination)
+ self.custom_model_scalers[model] = MinMaxScaler()
+
+ def reinitialize(self):
+ """Reinitialize charts, models and data to a begining state.
+ """
+ self.charts_init()
+ self.custom_models_init()
+ self.data_init()
+ self.model_params_init()
+ self.models_init()
+
+ def save_data_latest(self, data, data_probability, data_anomaly):
+ """Save the most recent data objects to be used if needed in the future.
+ """
+ self.data_latest = data
+ self.data_probability_latest = data_probability
+ self.data_anomaly_latest = data_anomaly
+
+ def validate_charts(self, chart, data, algorithm='absolute', multiplier=1, divisor=1):
"""If dimension not in chart then add it.
"""
for dim in data:
- if dim not in self.charts[name]:
- self.charts[name].add_dimension([dim, dim, algorithm, multiplier, divisor])
+ if dim not in self.collected_dims[chart]:
+ self.collected_dims[chart].add(dim)
+ self.charts[chart].add_dimension([dim, dim, algorithm, multiplier, divisor])
+
+ for dim in list(self.collected_dims[chart]):
+ if dim not in data:
+ self.collected_dims[chart].remove(dim)
+ self.charts[chart].del_dimension(dim, hide=False)
def add_custom_models_dims(self, df):
"""Given a df, select columns used by custom models, add custom model name as prefix, and append to df.
@@ -242,8 +298,9 @@ class Service(SimpleService):
# get training data
df_train = get_data(
host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
- sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username, pwd=self.password
- ).ffill()
+ sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username, pwd=self.password,
+ verify=self.tls_verify
+ ).ffill()
if self.custom_models:
df_train = self.add_custom_models_dims(df_train)
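Note: the training pull now passes verify=self.tls_verify through
get_data() and forward-fills the frame, so brief gaps in collection do
not become NaNs in the feature matrix. A minimal sketch of the ffill
behaviour (the data is illustrative):

    import numpy as np
    import pandas as pd

    df_train = pd.DataFrame({'system.cpu|user': [1.0, np.nan, 3.0]}).ffill()
    # the NaN is replaced by the previous observation: [1.0, 1.0, 3.0]
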
@@ -262,6 +319,8 @@ class Service(SimpleService):
models_to_train = list(self.models.keys())
self.n_fit_fail, self.n_fit_success = 0, 0
for model in models_to_train:
+ if model not in self.models:
+ self.model_init(model)
X_train = self.make_features(
df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
train=True, model=model)
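Note: training now lazily calls model_init() for any model that has
appeared since start-up (e.g. a chart newly matched by charts_regex
after a reinitialize). A minimal sketch of the lazy-create pattern
(HBOS is the collector's default model; the contamination value is
illustrative):

    from pyod.models.hbos import HBOS

    def get_model(models, name, contamination=0.001):
        # create the model on first use, then reuse it
        if name not in models:
            models[name] = HBOS(contamination=contamination)
        return models[name]
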
@@ -303,13 +362,16 @@ class Service(SimpleService):
data_probability, data_anomaly = {}, {}
for model in self.fitted_at.keys():
model_display_name = self.model_display_names[model]
- X_model = np.nan_to_num(self.make_features(
- self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
- model=model)[-1,:].reshape(1, -1))
try:
+ X_model = np.nan_to_num(
+ self.make_features(
+ self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
+ model=model
+ )[-1,:].reshape(1, -1)
+ )
data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
- except Exception:
+ except Exception as e:
#self.info(e)
if model_display_name + '_prob' in self.data_latest:
#self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, using last prediction instead.')
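Note: building X_model has moved inside the try block, so a chart that
momentarily stops reporting (leaving the column selection empty) is
handled by the same fallback as a failed predict: reuse the last value
emitted for that dimension. A minimal sketch of the fallback (names are
illustrative; predict_proba()[-1][1] is the pyod outlier probability
used above):

    import numpy as np

    def score_or_last(model, X, data_latest, key):
        try:
            return np.nan_to_num(model.predict_proba(X)[-1][1]) * 10000
        except Exception:
            # fall back to the last value we emitted for this dimension
            return data_latest.get(key, 0)
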
@@ -323,27 +385,42 @@ class Service(SimpleService):
def get_data(self):
+ # initialize to what's available right now
+ if self.reinitialize_at_every_step or len(self.host_charts_dict[self.host]) == 0:
+ self.charts_init()
+ self.custom_models_init()
+ self.model_params_init()
+
# if not all models have been trained then train those we need to
- if len(self.fitted_at) < len(self.models):
+ if len(self.fitted_at) < len(self.models_in_scope):
self.train(
- models_to_train=[m for m in self.models if m not in self.fitted_at],
+ models_to_train=[m for m in self.models_in_scope if m not in self.fitted_at],
train_data_after=self.initial_train_data_after,
- train_data_before=self.initial_train_data_before)
+ train_data_before=self.initial_train_data_before
+ )
# retrain all models as per schedule from config
elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
+ self.reinitialize()
self.train()
# roll forward previous predictions around a training step to avoid the possibility of having the training itself trigger an anomaly
if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
- data = self.data_latest
+ data_probability = self.data_probability_latest
+ data_anomaly = self.data_anomaly_latest
else:
data_probability, data_anomaly = self.predict()
if self.include_average_prob:
- data_probability['average_prob'] = np.mean(list(data_probability.values()))
- data = {**data_probability, **data_anomaly}
- self.validate_charts('probability', data_probability, divisor=100)
- self.validate_charts('anomaly', data_anomaly)
+ average_prob = np.mean(list(data_probability.values()))
+ data_probability['average_prob'] = 0 if np.isnan(average_prob) else average_prob
+
+ data = {**data_probability, **data_anomaly}
- self.data_latest = data
+ self.validate_charts('probability', data_probability, divisor=100)
+ self.validate_charts('anomaly', data_anomaly)
+
+ self.save_data_latest(data, data_probability, data_anomaly)
+
+ #self.info(f'len(data)={len(data)}')
+ #self.info(f'data={data}')
return data
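Note: get_data() now reinitializes chart and model state when configured
to (or when nothing is yet in scope), rolls the previous probabilities
and anomaly flags forward around a training step, and guards
average_prob against an empty or all-NaN probability dict. A minimal
sketch of the NaN guard (the values are illustrative):

    import numpy as np

    data_probability = {'system.cpu_prob': 200, 'system.load_prob': 400}
    average_prob = np.mean(list(data_probability.values()))
    # np.mean([]) returns nan; never emit NaN for the dimension
    data_probability['average_prob'] = 0 if np.isnan(average_prob) else average_prob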