diff options
Diffstat (limited to 'src/pybind/mgr/diskprediction_local/predictor.py')
-rw-r--r-- | src/pybind/mgr/diskprediction_local/predictor.py | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/src/pybind/mgr/diskprediction_local/predictor.py b/src/pybind/mgr/diskprediction_local/predictor.py new file mode 100644 index 000000000..3bbe7a4b7 --- /dev/null +++ b/src/pybind/mgr/diskprediction_local/predictor.py @@ -0,0 +1,484 @@ +"""Machine learning model for disk failure prediction. + +This classes defined here provide the disk failure prediction module. +RHDiskFailurePredictor uses the models developed at the AICoE in the +Office of the CTO at Red Hat. These models were built using the open +source Backblaze SMART metrics dataset. +PSDiskFailurePredictor uses the models developed by ProphetStor as an +example. + +An instance of the predictor is initialized by providing the path to trained +models. Then, to predict hard drive health and deduce time to failure, the +predict function is called with 6 days worth of SMART data from the hard drive. +It will return a string to indicate disk failure status: "Good", "Warning", +"Bad", or "Unknown". + +An example code is as follows: + +>>> model = RHDiskFailurePredictor() +>>> model.initialize(get_diskfailurepredictor_path() + "/models/redhat") +>>> vendor = list(RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.keys())[0] +>>> disk_days = [{'vendor': vendor}] +>>> model.predict(disk_days) +'Unknown' +""" +import os +import json +import pickle +import logging +from typing import Any, Dict, List, Optional, Sequence, Tuple + +import numpy as np + + +def get_diskfailurepredictor_path() -> str: + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + return dir_path + + +DevSmartT = Dict[str, Any] +AttrNamesT = List[str] +AttrDiffsT = List[Dict[str, int]] + + +class Predictor: + @classmethod + def create(cls, name: str) -> Optional['Predictor']: + if name == 'prophetstor': + return PSDiskFailurePredictor() + elif name == 'redhat': + return RHDiskFailurePredictor() + else: + return None + + def initialize(self, model_dir: str) -> None: + raise NotImplementedError() + + def predict(self, dataset: Sequence[DevSmartT]) -> str: + raise NotImplementedError() + + +class RHDiskFailurePredictor(Predictor): + """Disk failure prediction module developed at Red Hat + + This class implements a disk failure prediction module. + """ + + # json with manufacturer names as keys + # and features used for prediction as values + CONFIG_FILE = "config.json" + PREDICTION_CLASSES = {-1: "Unknown", 0: "Good", 1: "Warning", 2: "Bad"} + + # model name prefixes to identify vendor + MANUFACTURER_MODELNAME_PREFIXES = { + "WDC": "WDC", + "Toshiba": "Toshiba", # for cases like "Toshiba xxx" + "TOSHIBA": "Toshiba", # for cases like "TOSHIBA xxx" + "toshiba": "Toshiba", # for cases like "toshiba xxx" + "S": "Seagate", # for cases like "STxxxx" and "Seagate BarraCuda ZAxxx" + "ZA": "Seagate", # for cases like "ZAxxxx" + "Hitachi": "Hitachi", + "HGST": "HGST", + } + + LOGGER = logging.getLogger() + + def __init__(self) -> None: + """ + This function may throw exception due to wrong file operation. + """ + self.model_dirpath = "" + self.model_context: Dict[str, List[str]] = {} + + def initialize(self, model_dirpath: str) -> None: + """Initialize all models. Save paths of all trained model files to list + + Arguments: + model_dirpath {str} -- path to directory of trained models + + Returns: + str -- Error message. If all goes well, return None + """ + # read config file as json, if it exists + config_path = os.path.join(model_dirpath, self.CONFIG_FILE) + if not os.path.isfile(config_path): + raise Exception("Missing config file: " + config_path) + with open(config_path) as f_conf: + self.model_context = json.load(f_conf) + + # ensure all manufacturers whose context is defined in config file + # have models and scalers saved inside model_dirpath + for manufacturer in self.model_context: + scaler_path = os.path.join(model_dirpath, manufacturer + "_scaler.pkl") + if not os.path.isfile(scaler_path): + raise Exception(f"Missing scaler file: {scaler_path}") + model_path = os.path.join(model_dirpath, manufacturer + "_predictor.pkl") + if not os.path.isfile(model_path): + raise Exception(f"Missing model file: {model_path}") + + self.model_dirpath = model_dirpath + + def __preprocess(self, disk_days: Sequence[DevSmartT], manufacturer: str) -> Optional[np.ndarray]: + """Scales and transforms input dataframe to feed it to prediction model + + Arguments: + disk_days {list} -- list in which each element is a dictionary with key,val + as feature name,value respectively. + e.g.[{'smart_1_raw': 0, 'user_capacity': 512 ...}, ...] + manufacturer {str} -- manufacturer of the hard drive + + Returns: + numpy.ndarray -- (n, d) shaped array of n days worth of data and d + features, scaled + """ + # get the attributes that were used to train model for current manufacturer + try: + model_smart_attr = self.model_context[manufacturer] + except KeyError: + RHDiskFailurePredictor.LOGGER.debug( + "No context (SMART attributes on which model has been trained) found for manufacturer: {}".format( + manufacturer + ) + ) + return None + + # convert to structured array, keeping only the required features + # assumes all data is in float64 dtype + try: + struc_dtypes = [(attr, np.float64) for attr in model_smart_attr] + values = [tuple(day[attr] for attr in model_smart_attr) for day in disk_days] + disk_days_sa = np.array(values, dtype=struc_dtypes) + except KeyError: + RHDiskFailurePredictor.LOGGER.debug( + "Mismatch in SMART attributes used to train model and SMART attributes available" + ) + return None + + # view structured array as 2d array for applying rolling window transforms + # do not include capacity_bytes in this. only use smart_attrs + disk_days_attrs = disk_days_sa[[attr for attr in model_smart_attr if 'smart_' in attr]]\ + .view(np.float64).reshape(disk_days_sa.shape + (-1,)) + + # featurize n (6 to 12) days data - mean,std,coefficient of variation + # current model is trained on 6 days of data because that is what will be + # available at runtime + + # rolling time window interval size in days + roll_window_size = 6 + + # rolling means generator + dataset_size = disk_days_attrs.shape[0] - roll_window_size + 1 + gen = (disk_days_attrs[i: i + roll_window_size, ...].mean(axis=0) + for i in range(dataset_size)) + means = np.vstack(gen) # type: ignore + + # rolling stds generator + gen = (disk_days_attrs[i: i + roll_window_size, ...].std(axis=0, ddof=1) + for i in range(dataset_size)) + stds = np.vstack(gen) # type: ignore + + # coefficient of variation + cvs = stds / means + cvs[np.isnan(cvs)] = 0 + featurized = np.hstack((means, + stds, + cvs, + disk_days_sa['user_capacity'][: dataset_size].reshape(-1, 1))) + + # scale features + scaler_path = os.path.join(self.model_dirpath, manufacturer + "_scaler.pkl") + with open(scaler_path, 'rb') as f: + scaler = pickle.load(f) + featurized = scaler.transform(featurized) + return featurized + + @staticmethod + def __get_manufacturer(model_name: str) -> Optional[str]: + """Returns the manufacturer name for a given hard drive model name + + Arguments: + model_name {str} -- hard drive model name + + Returns: + str -- manufacturer name + """ + for prefix, manufacturer in RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.items(): + if model_name.startswith(prefix): + return manufacturer.lower() + # print error message + RHDiskFailurePredictor.LOGGER.debug( + f"Could not infer manufacturer from model name {model_name}") + return None + + def predict(self, disk_days: Sequence[DevSmartT]) -> str: + # get manufacturer preferably as a smartctl attribute + # if not available then infer using model name + manufacturer = disk_days[0].get("vendor") + if manufacturer is None: + RHDiskFailurePredictor.LOGGER.debug( + '"vendor" field not found in smartctl output. Will try to infer manufacturer from model name.' + ) + manufacturer = RHDiskFailurePredictor.__get_manufacturer( + disk_days[0].get("model_name", "")) + + # print error message, return Unknown, and continue execution + if manufacturer is None: + RHDiskFailurePredictor.LOGGER.debug( + "Manufacturer could not be determiend. This may be because \ + DiskPredictor has never encountered this manufacturer before, \ + or the model name is not according to the manufacturer's \ + naming conventions known to DiskPredictor" + ) + return RHDiskFailurePredictor.PREDICTION_CLASSES[-1] + + # preprocess for feeding to model + preprocessed_data = self.__preprocess(disk_days, manufacturer) + if preprocessed_data is None: + return RHDiskFailurePredictor.PREDICTION_CLASSES[-1] + + # get model for current manufacturer + model_path = os.path.join( + self.model_dirpath, manufacturer + "_predictor.pkl" + ) + with open(model_path, 'rb') as f: + model = pickle.load(f) + + # use prediction for most recent day + # TODO: ensure that most recent day is last element and most previous day + # is first element in input disk_days + pred_class_id = model.predict(preprocessed_data)[-1] + return RHDiskFailurePredictor.PREDICTION_CLASSES[pred_class_id] + + +class PSDiskFailurePredictor(Predictor): + """Disk failure prediction developed at ProphetStor + + This class implements a disk failure prediction module. + """ + + CONFIG_FILE = "config.json" + EXCLUDED_ATTRS = ["smart_9_raw", "smart_241_raw", "smart_242_raw"] + + def __init__(self) -> None: + """ + This function may throw exception due to wrong file operation. + """ + + self.model_dirpath = "" + self.model_context: Dict[str, List[str]] = {} + + def initialize(self, model_dirpath: str) -> None: + """ + Initialize all models. + + Args: None + + Returns: + Error message. If all goes well, return an empty string. + + Raises: + """ + + config_path = os.path.join(model_dirpath, self.CONFIG_FILE) + if not os.path.isfile(config_path): + raise Exception(f"Missing config file: {config_path}") + with open(config_path) as f_conf: + self.model_context = json.load(f_conf) + + for model_name in self.model_context: + model_path = os.path.join(model_dirpath, model_name) + + if not os.path.isfile(model_path): + raise Exception(f"Missing model file: {model_path}") + + self.model_dirpath = model_dirpath + + def __preprocess(self, disk_days: Sequence[DevSmartT]) -> Sequence[DevSmartT]: + """ + Preprocess disk attributes. + + Args: + disk_days: Refer to function predict(...). + + Returns: + new_disk_days: Processed disk days. + """ + + req_attrs = [] + new_disk_days = [] + + attr_list = set.intersection(*[set(disk_day.keys()) for disk_day in disk_days]) + for attr in attr_list: + if ( + attr.startswith("smart_") and attr.endswith("_raw") + ) and attr not in self.EXCLUDED_ATTRS: + req_attrs.append(attr) + + for disk_day in disk_days: + new_disk_day = {} + for attr in req_attrs: + if float(disk_day[attr]) >= 0.0: + new_disk_day[attr] = disk_day[attr] + + new_disk_days.append(new_disk_day) + + return new_disk_days + + @staticmethod + def __get_diff_attrs(disk_days: Sequence[DevSmartT]) -> Tuple[AttrNamesT, AttrDiffsT]: + """ + Get 5 days differential attributes. + + Args: + disk_days: Refer to function predict(...). + + Returns: + attr_list: All S.M.A.R.T. attributes used in given disk. Here we + use intersection set of all disk days. + + diff_disk_days: A list struct comprises 5 dictionaries, each + dictionary contains differential attributes. + + Raises: + Exceptions of wrong list/dict operations. + """ + + all_attrs = [set(disk_day.keys()) for disk_day in disk_days] + attr_list = list(set.intersection(*all_attrs)) + prev_days = disk_days[:-1] + curr_days = disk_days[1:] + diff_disk_days = [] + # TODO: ensure that this ordering is correct + for prev, cur in zip(prev_days, curr_days): + diff_disk_days.append( + {attr: (int(cur[attr]) - int(prev[attr])) for attr in attr_list} + ) + + return attr_list, diff_disk_days + + def __get_best_models(self, attr_list: AttrNamesT) -> Optional[Dict[str, List[str]]]: + """ + Find the best model from model list according to given attribute list. + + Args: + attr_list: All S.M.A.R.T. attributes used in given disk. + + Returns: + modelpath: The best model for the given attribute list. + model_attrlist: 'Ordered' attribute list of the returned model. + Must be aware that SMART attributes is in order. + + Raises: + """ + + models = self.model_context.keys() + + scores = [] + for model_name in models: + scores.append( + sum(attr in attr_list for attr in self.model_context[model_name]) + ) + max_score = max(scores) + + # Skip if too few matched attributes. + if max_score < 3: + print("Too few matched attributes") + return None + + best_models: Dict[str, List[str]] = {} + best_model_indices = [ + idx for idx, score in enumerate(scores) if score > max_score - 2 + ] + for model_idx in best_model_indices: + model_name = list(models)[model_idx] + model_path = os.path.join(self.model_dirpath, model_name) + model_attrlist = self.model_context[model_name] + best_models[model_path] = model_attrlist + + return best_models + # return os.path.join(self.model_dirpath, model_name), model_attrlist + + @staticmethod + def __get_ordered_attrs(disk_days: Sequence[DevSmartT], model_attrlist: List[str]) -> List[List[float]]: + """ + Return ordered attributes of given disk days. + + Args: + disk_days: Unordered disk days. + model_attrlist: Model's ordered attribute list. + + Returns: + ordered_attrs: Ordered disk days. + + Raises: None + """ + + ordered_attrs = [] + + for one_day in disk_days: + one_day_attrs = [] + + for attr in model_attrlist: + if attr in one_day: + one_day_attrs.append(one_day[attr]) + else: + one_day_attrs.append(0) + + ordered_attrs.append(one_day_attrs) + + return ordered_attrs + + def predict(self, disk_days: Sequence[DevSmartT]) -> str: + """ + Predict using given 6-days disk S.M.A.R.T. attributes. + + Args: + disk_days: A list struct comprises 6 dictionaries. These + dictionaries store 'consecutive' days of disk SMART + attributes. + Returns: + A string indicates prediction result. One of following four strings + will be returned according to disk failure status: + (1) Good : Disk is health + (2) Warning : Disk has some symptoms but may not fail immediately + (3) Bad : Disk is in danger and data backup is highly recommended + (4) Unknown : Not enough data for prediction. + + Raises: + Pickle exceptions + """ + + all_pred = [] + + proc_disk_days = self.__preprocess(disk_days) + attr_list, diff_data = PSDiskFailurePredictor.__get_diff_attrs(proc_disk_days) + modellist = self.__get_best_models(attr_list) + if modellist is None: + return "Unknown" + + for modelpath in modellist: + model_attrlist = modellist[modelpath] + ordered_data = PSDiskFailurePredictor.__get_ordered_attrs( + diff_data, model_attrlist + ) + + try: + with open(modelpath, "rb") as f_model: + clf = pickle.load(f_model) + + except UnicodeDecodeError: + # Compatibility for python3 + with open(modelpath, "rb") as f_model: + clf = pickle.load(f_model, encoding="latin1") + + pred = clf.predict(ordered_data) + + all_pred.append(1 if any(pred) else 0) + + score = 2 ** sum(all_pred) - len(modellist) + if score > 10: + return "Bad" + if score > 4: + return "Warning" + return "Good" |