summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/diskprediction_local/predictor.py
blob: 3bbe7a4b7f232f227d1ed0735d4f16296bd9c52b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
"""Machine learning model for disk failure prediction.

The classes defined here provide the disk failure prediction module.
RHDiskFailurePredictor uses the models developed at the AICoE in the
Office of the CTO at Red Hat. These models were built using the open
source Backblaze SMART metrics dataset.
PSDiskFailurePredictor uses the models developed by ProphetStor as an
example.

An instance of the predictor is initialized by providing the path to trained
models. Then, to predict hard drive health and deduce time to failure, the
predict function is called with 6 days worth of SMART data from the hard drive.
It will return a string to indicate disk failure status: "Good", "Warning",
"Bad", or "Unknown".

An example code is as follows:

>>> model = RHDiskFailurePredictor()
>>> model.initialize(get_diskfailurepredictor_path() + "/models/redhat")
>>> vendor = list(RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.keys())[0]
>>> disk_days = [{'vendor': vendor}]
>>> model.predict(disk_days)
'Unknown'
"""
import os
import json
import pickle
import logging
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np


def get_diskfailurepredictor_path() -> str:
    """Return the absolute path of the directory that contains this module."""
    return os.path.dirname(os.path.abspath(__file__))


# One day's worth of SMART data for a device, keyed by attribute name.
DevSmartT = Dict[str, Any]
# Names of SMART attributes.
AttrNamesT = List[str]
# Day-over-day attribute differences, one dict per pair of consecutive days.
AttrDiffsT = List[Dict[str, int]]


class Predictor:
    """Common interface of the disk failure predictors.

    Subclasses are expected to override ``initialize`` and ``predict``;
    the versions defined here only raise ``NotImplementedError``.
    """

    @classmethod
    def create(cls, name: str) -> Optional['Predictor']:
        """Build the predictor registered under *name*.

        Arguments:
            name {str} -- 'redhat' or 'prophetstor'

        Returns:
            Predictor -- a new predictor instance, or None when *name* is
                         not one of the known predictor names
        """
        if name == 'redhat':
            return RHDiskFailurePredictor()
        if name == 'prophetstor':
            return PSDiskFailurePredictor()
        return None

    def initialize(self, model_dir: str) -> None:
        """Prepare the predictor from the models stored in *model_dir*."""
        raise NotImplementedError()

    def predict(self, dataset: Sequence[DevSmartT]) -> str:
        """Return the predicted health status for the given SMART data."""
        raise NotImplementedError()


class RHDiskFailurePredictor(Predictor):
    """Disk failure prediction module developed at Red Hat

    This class implements a disk failure prediction module. For every
    manufacturer listed in the config file, the model directory must hold
    a ``<manufacturer>_scaler.pkl`` and a ``<manufacturer>_predictor.pkl``
    pickle file.
    """

    # json with manufacturer names as keys
    # and features used for prediction as values
    CONFIG_FILE = "config.json"
    PREDICTION_CLASSES = {-1: "Unknown", 0: "Good", 1: "Warning", 2: "Bad"}

    # rolling time window interval size in days.
    # current model is trained on 6 days of data because that is what will be
    # available at runtime
    ROLL_WINDOW_SIZE = 6

    # model name prefixes to identify vendor
    MANUFACTURER_MODELNAME_PREFIXES = {
        "WDC": "WDC",
        "Toshiba": "Toshiba",  # for cases like "Toshiba xxx"
        "TOSHIBA": "Toshiba",  # for cases like "TOSHIBA xxx"
        "toshiba": "Toshiba",  # for cases like "toshiba xxx"
        "S": "Seagate",        # for cases like "STxxxx" and "Seagate BarraCuda ZAxxx"
        "ZA": "Seagate",       # for cases like "ZAxxxx"
        "Hitachi": "Hitachi",
        "HGST": "HGST",
    }

    LOGGER = logging.getLogger()

    def __init__(self) -> None:
        """Create an uninitialized predictor; call initialize() before predict()."""
        self.model_dirpath = ""
        self.model_context: Dict[str, List[str]] = {}

    def initialize(self, model_dirpath: str) -> None:
        """Load the config file and check that all model files are present

        Arguments:
            model_dirpath {str} -- path to directory of trained models

        Raises:
            Exception -- if the config file, or the scaler or model file of
                         any manufacturer named in the config, is missing
        """
        # read config file as json, if it exists
        config_path = os.path.join(model_dirpath, self.CONFIG_FILE)
        if not os.path.isfile(config_path):
            raise Exception("Missing config file: " + config_path)
        with open(config_path) as f_conf:
            self.model_context = json.load(f_conf)

        # ensure all manufacturers whose context is defined in config file
        # have models and scalers saved inside model_dirpath
        for manufacturer in self.model_context:
            scaler_path = os.path.join(model_dirpath, manufacturer + "_scaler.pkl")
            if not os.path.isfile(scaler_path):
                raise Exception(f"Missing scaler file: {scaler_path}")
            model_path = os.path.join(model_dirpath, manufacturer + "_predictor.pkl")
            if not os.path.isfile(model_path):
                raise Exception(f"Missing model file: {model_path}")

        self.model_dirpath = model_dirpath

    def __preprocess(self, disk_days: Sequence[DevSmartT], manufacturer: str) -> Optional[np.ndarray]:
        """Scales and transforms input data to feed it to prediction model

        Arguments:
            disk_days {list} -- list in which each element is a dictionary with key,val
                                as feature name,value respectively.
                                e.g.[{'smart_1_raw': 0, 'user_capacity': 512 ...}, ...]
            manufacturer {str} -- manufacturer of the hard drive

        Returns:
            numpy.ndarray -- (n, d) shaped array of n rolling windows and d
                             features, scaled. None if there is no model
                             context for the manufacturer, if a required
                             attribute is missing from the input, or if fewer
                             than ROLL_WINDOW_SIZE days of data are given.

        Raises:
            ValueError -- if the model context of the manufacturer does not
                          list 'user_capacity'
        """
        # get the attributes that were used to train model for current manufacturer
        try:
            model_smart_attr = self.model_context[manufacturer]
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "No context (SMART attributes on which model has been trained) found for manufacturer: {}".format(
                    manufacturer
                )
            )
            return None

        # a rolling window needs at least ROLL_WINDOW_SIZE days; with fewer
        # days there is nothing to featurize, so report "no result" instead
        # of failing while stacking an empty list of windows
        roll_window_size = self.ROLL_WINDOW_SIZE
        dataset_size = len(disk_days) - roll_window_size + 1
        if dataset_size < 1:
            RHDiskFailurePredictor.LOGGER.debug(
                "Not enough data for prediction: got {} days, need at least {}".format(
                    len(disk_days), roll_window_size
                )
            )
            return None

        # convert to a 2d array with one row per day and one column per
        # required feature. assumes all data is convertible to float64
        try:
            values = [[day[attr] for attr in model_smart_attr] for day in disk_days]
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "Mismatch in SMART attributes used to train model and SMART attributes available"
            )
            return None
        all_attrs = np.array(values, dtype=np.float64)

        # do not include user_capacity in the rolling window transforms. only
        # use smart_attrs. Columns are selected by index from a plain 2d
        # array: selecting several fields of a structured array and viewing
        # the result as float64 would (NumPy >= 1.16) also expose the fields
        # that were meant to be left out.
        smart_cols = [idx for idx, attr in enumerate(model_smart_attr) if 'smart_' in attr]
        disk_days_attrs = all_attrs[:, smart_cols]
        # list.index raises ValueError when the config lacks 'user_capacity'
        capacity = all_attrs[:, model_smart_attr.index('user_capacity')]

        # featurize n (6 to 12) days data - mean,std,coefficient of variation.
        # np.vstack requires a sequence, so build lists rather than generators
        windows = [disk_days_attrs[i: i + roll_window_size] for i in range(dataset_size)]
        means = np.vstack([window.mean(axis=0) for window in windows])
        stds = np.vstack([window.std(axis=0, ddof=1) for window in windows])

        # coefficient of variation; 0/0 yields nan, which is mapped to 0 below
        with np.errstate(divide='ignore', invalid='ignore'):
            cvs = stds / means
        cvs[np.isnan(cvs)] = 0
        featurized = np.hstack((means,
                                stds,
                                cvs,
                                capacity[: dataset_size].reshape(-1, 1)))

        # scale features
        # NOTE: pickle can execute arbitrary code while loading; the model
        # directory must only contain trusted files.
        scaler_path = os.path.join(self.model_dirpath, manufacturer + "_scaler.pkl")
        with open(scaler_path, 'rb') as f:
            scaler = pickle.load(f)
        featurized = scaler.transform(featurized)
        return featurized

    @staticmethod
    def __get_manufacturer(model_name: str) -> Optional[str]:
        """Returns the manufacturer name for a given hard drive model name

        Arguments:
            model_name {str} -- hard drive model name

        Returns:
            str -- lower-cased manufacturer name, or None if no known prefix
                   matches the model name
        """
        for prefix, manufacturer in RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.items():
            if model_name.startswith(prefix):
                return manufacturer.lower()
        # print error message
        RHDiskFailurePredictor.LOGGER.debug(
            f"Could not infer manufacturer from model name {model_name}")
        return None

    def predict(self, disk_days: Sequence[DevSmartT]) -> str:
        """Predict the health of a disk from consecutive days of SMART data

        Arguments:
            disk_days {list} -- one dictionary of SMART data per day; at
                                least ROLL_WINDOW_SIZE days are needed

        Returns:
            str -- one of the values of PREDICTION_CLASSES. "Unknown" is
                   returned when no data is given, when the manufacturer
                   cannot be determined, or when the data cannot be
                   preprocessed for the manufacturer's model.
        """
        unknown = RHDiskFailurePredictor.PREDICTION_CLASSES[-1]
        if not disk_days:
            RHDiskFailurePredictor.LOGGER.debug("No SMART data available for prediction")
            return unknown

        # get manufacturer preferably as a smartctl attribute
        # if not available then infer using model name
        # NOTE(review): the "vendor" value is used verbatim as model context
        # key and file name prefix, while inferred names are lower-cased --
        # confirm that callers already pass a normalized vendor.
        manufacturer = disk_days[0].get("vendor")
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                '"vendor" field not found in smartctl output. Will try to infer manufacturer from model name.'
            )
            # tolerate an explicit None as model name
            manufacturer = RHDiskFailurePredictor.__get_manufacturer(
                disk_days[0].get("model_name") or "")

        # print error message, return Unknown, and continue execution
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                "Manufacturer could not be determined. This may be because "
                "DiskPredictor has never encountered this manufacturer before, "
                "or the model name is not according to the manufacturer's "
                "naming conventions known to DiskPredictor"
            )
            return unknown

        # preprocess for feeding to model
        preprocessed_data = self.__preprocess(disk_days, manufacturer)
        if preprocessed_data is None:
            return unknown

        # get model for current manufacturer
        # NOTE: pickle can execute arbitrary code while loading; the model
        # directory must only contain trusted files.
        model_path = os.path.join(
            self.model_dirpath, manufacturer + "_predictor.pkl"
        )
        with open(model_path, 'rb') as f:
            model = pickle.load(f)

        # use prediction for most recent day
        # TODO: ensure that most recent day is last element and most previous day
        # is first element in input disk_days
        pred_class_id = model.predict(preprocessed_data)[-1]
        return RHDiskFailurePredictor.PREDICTION_CLASSES[pred_class_id]


class PSDiskFailurePredictor(Predictor):
    """Disk failure prediction developed at ProphetStor

    This class implements a disk failure prediction module. The config file
    maps the file name of each pickled model, relative to the model
    directory, to the ordered list of attributes that model consumes.
    """

    CONFIG_FILE = "config.json"
    EXCLUDED_ATTRS = ["smart_9_raw", "smart_241_raw", "smart_242_raw"]

    LOGGER = logging.getLogger()

    def __init__(self) -> None:
        """Create an uninitialized predictor; call initialize() before predict()."""
        self.model_dirpath = ""
        self.model_context: Dict[str, List[str]] = {}

    def initialize(self, model_dirpath: str) -> None:
        """
        Load the config file and check that all model files are present.

        Args:
            model_dirpath: Path to the directory of trained models.

        Raises:
            Exception: If the config file, or any model file named in the
                       config, is missing.
        """
        config_path = os.path.join(model_dirpath, self.CONFIG_FILE)
        if not os.path.isfile(config_path):
            raise Exception(f"Missing config file: {config_path}")
        with open(config_path) as f_conf:
            self.model_context = json.load(f_conf)

        for model_name in self.model_context:
            model_path = os.path.join(model_dirpath, model_name)

            if not os.path.isfile(model_path):
                raise Exception(f"Missing model file: {model_path}")

        self.model_dirpath = model_dirpath

    def __preprocess(self, disk_days: Sequence[DevSmartT]) -> Sequence[DevSmartT]:
        """
        Preprocess disk attributes.

        Keeps only the "smart_*_raw" attributes that are present in every
        day and are not listed in EXCLUDED_ATTRS, and drops negative values.

        Args:
            disk_days: Refer to function predict(...).

        Returns:
            new_disk_days: Processed disk days; empty if disk_days is empty.
        """
        # set.intersection() needs at least one set to work with
        if not disk_days:
            return []

        common_attrs = set.intersection(*(set(disk_day.keys()) for disk_day in disk_days))
        req_attrs = [
            attr for attr in common_attrs
            if attr.startswith("smart_") and attr.endswith("_raw")
            and attr not in self.EXCLUDED_ATTRS
        ]

        return [
            {attr: disk_day[attr] for attr in req_attrs if float(disk_day[attr]) >= 0.0}
            for disk_day in disk_days
        ]

    @staticmethod
    def __get_diff_attrs(disk_days: Sequence[DevSmartT]) -> Tuple[AttrNamesT, AttrDiffsT]:
        """
        Get differential attributes of consecutive days.

        Args:
            disk_days: Refer to function predict(...).

        Returns:
            attr_list: All S.M.A.R.T. attributes used in given disk. Here we
                       use intersection set of all disk days.

            diff_disk_days: A list with one dictionary per pair of
                            consecutive days (5 dictionaries for 6 days),
                            each containing the differential attributes.
                            Both results are empty if disk_days is empty.

        Raises:
            Exceptions of wrong list/dict operations.
        """
        # set.intersection() needs at least one set to work with
        if not disk_days:
            return [], []

        attr_list = list(set.intersection(*(set(disk_day.keys()) for disk_day in disk_days)))
        # TODO: ensure that this ordering is correct
        diff_disk_days = [
            {attr: (int(cur[attr]) - int(prev[attr])) for attr in attr_list}
            for prev, cur in zip(disk_days[:-1], disk_days[1:])
        ]

        return attr_list, diff_disk_days

    def __get_best_models(self, attr_list: AttrNamesT) -> Optional[Dict[str, List[str]]]:
        """
        Find the best models from model list according to given attribute list.

        A model's score is the number of its attributes found in attr_list.
        All models whose score is within one of the best score are selected.

        Args:
            attr_list: All S.M.A.R.T. attributes used in given disk.

        Returns:
            A dictionary mapping the path of each selected model to its
            'ordered' attribute list (SMART attributes must be fed to the
            model in this order), or None if no model is configured or the
            best model matches fewer than 3 attributes.
        """
        # max() of an empty sequence would raise
        if not self.model_context:
            PSDiskFailurePredictor.LOGGER.debug("No models available")
            return None

        available_attrs = set(attr_list)
        scores = {
            model_name: sum(attr in available_attrs for attr in model_attrlist)
            for model_name, model_attrlist in self.model_context.items()
        }
        max_score = max(scores.values())

        # Skip if too few matched attributes.
        if max_score < 3:
            PSDiskFailurePredictor.LOGGER.debug("Too few matched attributes")
            return None

        return {
            os.path.join(self.model_dirpath, model_name): self.model_context[model_name]
            for model_name, score in scores.items()
            if score > max_score - 2
        }

    @staticmethod
    def __get_ordered_attrs(disk_days: Sequence[DevSmartT], model_attrlist: List[str]) -> List[List[float]]:
        """
        Return ordered attributes of given disk days.

        Args:
            disk_days: Unordered disk days.
            model_attrlist: Model's ordered attribute list.

        Returns:
            ordered_attrs: One list per day holding the values in the order
                           of model_attrlist; attributes missing from a day
                           are filled with 0.
        """
        return [
            [one_day.get(attr, 0) for attr in model_attrlist]
            for one_day in disk_days
        ]

    @staticmethod
    def __load_model(modelpath: str) -> Any:
        """Unpickle the model stored at modelpath."""
        # NOTE: pickle can execute arbitrary code while loading; the model
        # directory must only contain trusted files.
        try:
            with open(modelpath, "rb") as f_model:
                return pickle.load(f_model)
        except UnicodeDecodeError:
            # Compatibility for python3
            with open(modelpath, "rb") as f_model:
                return pickle.load(f_model, encoding="latin1")

    def predict(self, disk_days: Sequence[DevSmartT]) -> str:
        """
        Predict using given 6-days disk S.M.A.R.T. attributes.

        Args:
            disk_days: A list struct comprises 6 dictionaries. These
                       dictionaries store 'consecutive' days of disk SMART
                       attributes.
        Returns:
            A string indicates prediction result. One of following four strings
            will be returned according to disk failure status:
            (1) Good : Disk is healthy
            (2) Warning : Disk has some symptoms but may not fail immediately
            (3) Bad : Disk is in danger and data backup is highly recommended
            (4) Unknown : Not enough data for prediction.

        Raises:
            Pickle exceptions
        """
        proc_disk_days = self.__preprocess(disk_days)
        attr_list, diff_data = PSDiskFailurePredictor.__get_diff_attrs(proc_disk_days)
        # fewer than two days of data leave nothing to feed to the models
        if not diff_data:
            return "Unknown"

        modellist = self.__get_best_models(attr_list)
        if modellist is None:
            return "Unknown"

        all_pred = []
        for modelpath, model_attrlist in modellist.items():
            ordered_data = PSDiskFailurePredictor.__get_ordered_attrs(
                diff_data, model_attrlist
            )
            clf = PSDiskFailurePredictor.__load_model(modelpath)
            pred = clf.predict(ordered_data)

            # a model votes "failing" if it flags any of the day-to-day diffs
            all_pred.append(1 if any(pred) else 0)

        score = 2 ** sum(all_pred) - len(modellist)
        if score > 10:
            return "Bad"
        if score > 4:
            return "Warning"
        return "Good"