Diffstat (limited to 'collectors/python.d.plugin/anomalies')
-rw-r--r-- | collectors/python.d.plugin/anomalies/README.md | 21
-rw-r--r-- | collectors/python.d.plugin/anomalies/anomalies.chart.py | 125
-rw-r--r-- | collectors/python.d.plugin/anomalies/anomalies.conf | 3
3 files changed, 120 insertions, 29 deletions
diff --git a/collectors/python.d.plugin/anomalies/README.md b/collectors/python.d.plugin/anomalies/README.md
index bcbfdbcd7..9d24e8685 100644
--- a/collectors/python.d.plugin/anomalies/README.md
+++ b/collectors/python.d.plugin/anomalies/README.md
@@ -35,18 +35,26 @@ Then, as the issue passes, the anomaly probabilities should settle back down int
 ## Requirements
 
 - This collector will only work with Python 3 and requires the packages below be installed.
+- Typically you will not need to do this, but, if needed, to ensure Python 3 is used you can add the below line to the `[plugin:python.d]` section of `netdata.conf`
+
+```conf
+[plugin:python.d]
+    # update every = 1
+    command options = -ppython3
+```
+
+Install the required python libraries.
 
 ```bash
 # become netdata user
 sudo su -s /bin/bash netdata
 # install required packages for the netdata user
-pip3 install --user netdata-pandas==0.0.32 numba==0.50.1 scikit-learn==0.23.2 pyod==0.8.3
+pip3 install --user netdata-pandas==0.0.38 numba==0.50.1 scikit-learn==0.23.2 pyod==0.8.3
 ```
 
 ## Configuration
 
-Install the Python requirements above, enable the collector and [restart
-Netdata](/docs/configure/start-stop-restart.md).
+Install the Python requirements above, enable the collector and restart Netdata.
 
 ```bash
 cd /etc/netdata/
@@ -69,7 +77,7 @@ sudo ./edit-config python.d/anomalies.conf
 
 The default configuration should look something like this. Here you can see each parameter (with sane defaults) and some information about each one and what it does.
 
-```yaml
+```conf
 # ----------------------------------------------------------------------
 # JOBS (data collection sources)
 
@@ -87,6 +95,9 @@ local:
     # Use http or https to pull data
     protocol: 'http'
 
+    # SSL verify parameter for requests.get() calls
+    tls_verify: true
+
     # What charts to pull data for - A regex like 'system\..*|' or 'system\..*|apps.cpu|apps.mem' etc.
     charts_regex: 'system\..*'
@@ -229,4 +240,4 @@ If you would like to go deeper on what exactly the anomalies collector is doing
 - Good [blog post](https://www.anodot.com/blog/what-is-anomaly-detection/) from Anodot on time series anomaly detection. Anodot also have some great whitepapers in this space too that some may find useful.
 - Novelty and outlier detection in the [scikit-learn documentation](https://scikit-learn.org/stable/modules/outlier_detection.html).
 
-[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcollectors%2Fpython.d.plugin%2Fanomalies%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
+[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcollectors%2Fpython.d.plugin%2Fanomalies%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)]()
\ No newline at end of file
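For context on the `charts_regex` and `charts_to_exclude` options documented above: the collector filters the charts it pulls from the agent's `/api/v1/charts` endpoint (see `charts_init()` in the chart.py diff below). A minimal standalone sketch of that filtering, assuming a local agent on the default port; the exclusion value is illustrative, not from the patch:

```python
import re

import requests

# Illustrative values mirroring the defaults in anomalies.conf.
host = '127.0.0.1:19999'
charts_regex = re.compile(r'system\..*')
charts_to_exclude = ['system.uptime']  # hypothetical exclusion

# Ask the agent which charts exist, then keep only regex matches
# that are not explicitly excluded.
charts_available = list(requests.get(f'http://{host}/api/v1/charts').json().get('charts', {}).keys())
charts_in_scope = [c for c in filter(charts_regex.match, charts_available) if c not in charts_to_exclude]
print(charts_in_scope)
```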
diff --git a/collectors/python.d.plugin/anomalies/anomalies.chart.py b/collectors/python.d.plugin/anomalies/anomalies.chart.py
index 97dbb1d1e..61b51d9c0 100644
--- a/collectors/python.d.plugin/anomalies/anomalies.chart.py
+++ b/collectors/python.d.plugin/anomalies/anomalies.chart.py
@@ -3,6 +3,7 @@
 # Author: andrewm4894
 # SPDX-License-Identifier: GPL-3.0-or-later
 
+import sys
 import time
 from datetime import datetime
 import re
@@ -51,14 +52,17 @@ class Service(SimpleService):
         self.basic_init()
         self.charts_init()
         self.custom_models_init()
+        self.data_init()
         self.model_params_init()
         self.models_init()
+        self.collected_dims = {'probability': set(), 'anomaly': set()}
 
     def check(self):
-        _ = get_allmetrics_async(
-            host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', wide=True, sort_cols=True,
-            protocol=self.protocol, numeric_only=True, float_size='float32', user=self.username, pwd=self.password
-        )
+        python_version = float('{}.{}'.format(sys.version_info[0], sys.version_info[1]))
+        if python_version < 3.6:
+            self.error("anomalies collector only works with Python>=3.6")
+        if len(self.host_charts_dict[self.host]) > 0:
+            _ = get_allmetrics_async(host_charts_dict=self.host_charts_dict, protocol=self.protocol, user=self.username, pwd=self.password)
         return True
 
     def basic_init(self):
@@ -70,17 +74,18 @@ class Service(SimpleService):
         self.host = self.configuration.get('host', '127.0.0.1:19999')
         self.username = self.configuration.get('username', None)
         self.password = self.configuration.get('password', None)
+        self.tls_verify = self.configuration.get('tls_verify', True)
         self.fitted_at = {}
         self.df_allmetrics = pd.DataFrame()
-        self.data_latest = {}
         self.last_train_at = 0
         self.include_average_prob = bool(self.configuration.get('include_average_prob', True))
+        self.reinitialize_at_every_step = bool(self.configuration.get('reinitialize_at_every_step', False))
 
     def charts_init(self):
         """Do some initialisation of charts in scope related variables.
         """
         self.charts_regex = re.compile(self.configuration.get('charts_regex','None'))
-        self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts').json().get('charts', {}).keys())]
+        self.charts_available = [c for c in list(requests.get(f'{self.protocol}://{self.host}/api/v1/charts', verify=self.tls_verify).json().get('charts', {}).keys())]
         self.charts_in_scope = list(filter(self.charts_regex.match, self.charts_available))
         self.charts_to_exclude = self.configuration.get('charts_to_exclude', '').split(',')
         if len(self.charts_to_exclude) > 0:
@@ -115,6 +120,14 @@ class Service(SimpleService):
         self.models_in_scope = [f'{self.host}::{c}' for c in self.charts_in_scope]
         self.host_charts_dict = {self.host: self.charts_in_scope}
         self.model_display_names = {model: model.split('::')[1] if '::' in model else model for model in self.models_in_scope}
+        #self.info(f'self.host_charts_dict (len={len(self.host_charts_dict[self.host])}): {self.host_charts_dict}')
+
+    def data_init(self):
+        """Initialize some empty data objects.
+        """
+        self.data_probability_latest = {f'{m}_prob': 0 for m in self.charts_in_scope}
+        self.data_anomaly_latest = {f'{m}_anomaly': 0 for m in self.charts_in_scope}
+        self.data_latest = {**self.data_probability_latest, **self.data_anomaly_latest}
 
     def model_params_init(self):
         """Model parameters initialisation.
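One caveat on the new `check()` guard above: building a float from `sys.version_info` works for 3.6 through 3.9 but parses Python 3.10 as `3.1`, which would wrongly fail the `>= 3.6` test. A tuple comparison avoids this; a minimal sketch, not part of the patch:

```python
import sys

# Tuples compare element-wise, so (3, 10) >= (3, 6) is True,
# whereas float('3.10') == 3.1 would wrongly fail a >= 3.6 check.
if sys.version_info < (3, 6):
    raise RuntimeError('anomalies collector only works with Python>=3.6')
```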
@@ -153,12 +166,55 @@ class Service(SimpleService):
             self.models = {model: HBOS(contamination=self.contamination) for model in self.models_in_scope}
         self.custom_model_scalers = {model: MinMaxScaler() for model in self.models_in_scope}
 
-    def validate_charts(self, name, data, algorithm='absolute', multiplier=1, divisor=1):
+    def model_init(self, model):
+        """Model initialisation of a single model.
+        """
+        if self.model == 'pca':
+            self.models[model] = PCA(contamination=self.contamination)
+        elif self.model == 'loda':
+            self.models[model] = LODA(contamination=self.contamination)
+        elif self.model == 'iforest':
+            self.models[model] = IForest(n_estimators=50, bootstrap=True, behaviour='new', contamination=self.contamination)
+        elif self.model == 'cblof':
+            self.models[model] = CBLOF(n_clusters=3, contamination=self.contamination)
+        elif self.model == 'feature_bagging':
+            self.models[model] = FeatureBagging(base_estimator=PCA(contamination=self.contamination), contamination=self.contamination)
+        elif self.model == 'copod':
+            self.models[model] = COPOD(contamination=self.contamination)
+        elif self.model == 'hbos':
+            self.models[model] = HBOS(contamination=self.contamination)
+        else:
+            self.models[model] = HBOS(contamination=self.contamination)
+        self.custom_model_scalers[model] = MinMaxScaler()
+
+    def reinitialize(self):
+        """Reinitialize charts, models and data to a beginning state.
+        """
+        self.charts_init()
+        self.custom_models_init()
+        self.data_init()
+        self.model_params_init()
+        self.models_init()
+
+    def save_data_latest(self, data, data_probability, data_anomaly):
+        """Save the most recent data objects to be used if needed in the future.
+        """
+        self.data_latest = data
+        self.data_probability_latest = data_probability
+        self.data_anomaly_latest = data_anomaly
+
+    def validate_charts(self, chart, data, algorithm='absolute', multiplier=1, divisor=1):
         """If dimension not in chart then add it.
         """
         for dim in data:
-            if dim not in self.charts[name]:
-                self.charts[name].add_dimension([dim, dim, algorithm, multiplier, divisor])
+            if dim not in self.collected_dims[chart]:
+                self.collected_dims[chart].add(dim)
+                self.charts[chart].add_dimension([dim, dim, algorithm, multiplier, divisor])
+
+        for dim in list(self.collected_dims[chart]):
+            if dim not in data:
+                self.collected_dims[chart].remove(dim)
+                self.charts[chart].del_dimension(dim, hide=False)
 
     def add_custom_models_dims(self, df):
         """Given a df, select columns used by custom models, add custom model name as prefix, and append to df.
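The reworked `validate_charts()` above pairs each chart with a set in `self.collected_dims`, adding dimensions when new metrics appear and deleting them when metrics vanish. A self-contained sketch of the same bookkeeping, with a plain function (`sync_dimensions`, a hypothetical name) standing in for the plugin's chart objects:

```python
# Mirrors self.collected_dims = {'probability': set(), 'anomaly': set()}.
collected_dims = {'probability': set(), 'anomaly': set()}

def sync_dimensions(chart, data):
    """Return (added, removed) dimensions after syncing with data keys."""
    added = [dim for dim in data if dim not in collected_dims[chart]]
    for dim in added:
        collected_dims[chart].add(dim)  # collector calls self.charts[chart].add_dimension(...)
    removed = [dim for dim in list(collected_dims[chart]) if dim not in data]
    for dim in removed:
        collected_dims[chart].remove(dim)  # collector calls self.charts[chart].del_dimension(dim, hide=False)
    return added, removed

print(sync_dimensions('probability', {'system.cpu_prob': 1234}))  # (['system.cpu_prob'], [])
print(sync_dimensions('probability', {'system.net_prob': 99}))    # (['system.net_prob'], ['system.cpu_prob'])
```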
@@ -242,8 +298,9 @@ class Service(SimpleService):
         # get training data
         df_train = get_data(
             host_charts_dict=self.host_charts_dict, host_prefix=True, host_sep='::', after=after, before=before,
-            sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username, pwd=self.password
-        ).ffill()
+            sort_cols=True, numeric_only=True, protocol=self.protocol, float_size='float32', user=self.username, pwd=self.password,
+            verify=self.tls_verify
+        ).ffill()
 
         if self.custom_models:
             df_train = self.add_custom_models_dims(df_train)
@@ -262,6 +319,8 @@ class Service(SimpleService):
         models_to_train = list(self.models.keys())
         self.n_fit_fail, self.n_fit_success = 0, 0
         for model in models_to_train:
+            if model not in self.models:
+                self.model_init(model)
             X_train = self.make_features(
                 df_train[df_train.columns[df_train.columns.str.startswith(f'{model}|')]].values,
                 train=True, model=model)
@@ -303,13 +362,16 @@ class Service(SimpleService):
         data_probability, data_anomaly = {}, {}
         for model in self.fitted_at.keys():
             model_display_name = self.model_display_names[model]
-            X_model = np.nan_to_num(self.make_features(
-                self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
-                model=model)[-1,:].reshape(1, -1))
             try:
+                X_model = np.nan_to_num(
+                    self.make_features(
+                        self.df_allmetrics[self.df_allmetrics.columns[self.df_allmetrics.columns.str.startswith(f'{model}|')]].values,
+                        model=model
+                    )[-1,:].reshape(1, -1)
+                )
                 data_probability[model_display_name + '_prob'] = np.nan_to_num(self.models[model].predict_proba(X_model)[-1][1]) * 10000
                 data_anomaly[model_display_name + '_anomaly'] = self.models[model].predict(X_model)[-1]
-            except Exception:
+            except Exception as _:
                 #self.info(e)
                 if model_display_name + '_prob' in self.data_latest:
                     #self.info(f'prediction failed for {model} at run_counter {self.runs_counter}, using last prediction instead.')
@@ -323,27 +385,42 @@
 
     def get_data(self):
 
+        # initialize to what's available right now
+        if self.reinitialize_at_every_step or len(self.host_charts_dict[self.host]) == 0:
+            self.charts_init()
+            self.custom_models_init()
+            self.model_params_init()
+
         # if not all models have been trained then train those we need to
-        if len(self.fitted_at) < len(self.models):
+        if len(self.fitted_at) < len(self.models_in_scope):
             self.train(
-                models_to_train=[m for m in self.models if m not in self.fitted_at],
+                models_to_train=[m for m in self.models_in_scope if m not in self.fitted_at],
                 train_data_after=self.initial_train_data_after,
-                train_data_before=self.initial_train_data_before)
+                train_data_before=self.initial_train_data_before
+            )
         # retrain all models as per schedule from config
         elif self.train_every_n > 0 and self.runs_counter % self.train_every_n == 0:
+            self.reinitialize()
             self.train()
 
         # roll forward previous predictions around a training step to avoid the possibility of having the training itself trigger an anomaly
         if (self.runs_counter - self.last_train_at) <= self.train_no_prediction_n:
-            data = self.data_latest
+            data_probability = self.data_probability_latest
+            data_anomaly = self.data_anomaly_latest
         else:
             data_probability, data_anomaly = self.predict()
             if self.include_average_prob:
-                data_probability['average_prob'] = np.mean(list(data_probability.values()))
-        data = {**data_probability, **data_anomaly}
-        self.validate_charts('probability', data_probability, divisor=100)
-        self.validate_charts('anomaly', data_anomaly)
+                average_prob = np.mean(list(data_probability.values()))
+                data_probability['average_prob'] = 0 if np.isnan(average_prob) else average_prob
+
+        data = {**data_probability, **data_anomaly}
 
-        self.data_latest = data
+        self.validate_charts('probability', data_probability, divisor=100)
+        self.validate_charts('anomaly', data_anomaly)
+
+        self.save_data_latest(data, data_probability, data_anomaly)
+
+        #self.info(f'len(data)={len(data)}')
+        #self.info(f'data')
 
         return data
diff --git a/collectors/python.d.plugin/anomalies/anomalies.conf b/collectors/python.d.plugin/anomalies/anomalies.conf
index 9950534aa..0dc40ef2c 100644
--- a/collectors/python.d.plugin/anomalies/anomalies.conf
+++ b/collectors/python.d.plugin/anomalies/anomalies.conf
@@ -44,6 +44,9 @@ local:
     # Use http or https to pull data
     protocol: 'http'
 
+    # SSL verify parameter for requests.get() calls
+    tls_verify: true
+
     # What charts to pull data for - A regex like 'system\..*|' or 'system\..*|apps.cpu|apps.mem' etc.
     charts_regex: 'system\..*'
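In both the README and `anomalies.conf` hunks above, the new `tls_verify` value is passed straight through as the `verify` argument of `requests.get()`, so `tls_verify: false` allows pulling data from an agent serving HTTPS with a self-signed certificate. A minimal sketch of the resulting call, assuming a local HTTPS-enabled agent:

```python
import requests

tls_verify = False  # as set by 'tls_verify: false' in anomalies.conf

# verify=False skips certificate validation (requests will emit an
# InsecureRequestWarning); keep it true for properly signed certs.
r = requests.get('https://127.0.0.1:19999/api/v1/charts', verify=tls_verify)
print(r.status_code)
```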