summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/tools/advisor/advisor/db_timeseries_parser.py
blob: 308eb139ae4da642022886a7dc6b2c829944b44d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
#  This source code is licensed under both the GPLv2 (found in the
#  COPYING file in the root directory) and Apache 2.0 License
#  (found in the LICENSE.Apache file in the root directory).

from abc import abstractmethod
from advisor.db_log_parser import DataSource
from enum import Enum
import math


NO_ENTITY = 'ENTITY_PLACEHOLDER'


class TimeSeriesData(DataSource):
    class Behavior(Enum):
        bursty = 1
        evaluate_expression = 2

    class AggregationOperator(Enum):
        avg = 1
        max = 2
        min = 3
        latest = 4
        oldest = 5

    def __init__(self):
        super().__init__(DataSource.Type.TIME_SERIES)
        self.keys_ts = None  # Dict[entity, Dict[key, Dict[timestamp, value]]]
        self.stats_freq_sec = None

    @abstractmethod
    def get_keys_from_conditions(self, conditions):
        # This method takes in a list of time-series conditions; for each
        # condition it manipulates the 'keys' in the way that is supported by
        # the subclass implementing this method
        pass

    @abstractmethod
    def fetch_timeseries(self, required_statistics):
        # this method takes in a list of statistics and fetches the timeseries
        # for each of them and populates the 'keys_ts' dictionary
        pass

    def fetch_burst_epochs(
        self, entities, statistic, window_sec, threshold, percent
    ):
        # type: (str, int, float, bool) -> Dict[str, Dict[int, float]]
        # this method calculates the (percent) rate change in the 'statistic'
        # for each entity (over 'window_sec' seconds) and returns the epochs
        # where this rate change is greater than or equal to the 'threshold'
        # value
        if self.stats_freq_sec == 0:
            # not time series data, cannot check for bursty behavior
            return
        if window_sec < self.stats_freq_sec:
            window_sec = self.stats_freq_sec
        # 'window_samples' is the number of windows to go back to
        # compare the current window with, while calculating rate change.
        window_samples = math.ceil(window_sec / self.stats_freq_sec)
        burst_epochs = {}
        # if percent = False:
        # curr_val = value at window for which rate change is being calculated
        # prev_val = value at window that is window_samples behind curr_window
        # Then rate_without_percent =
        # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
        # if percent = True:
        # rate_with_percent = (rate_without_percent * 100) / prev_val
        # These calculations are in line with the rate() transform supported
        # by ODS
        for entity in entities:
            if statistic not in self.keys_ts[entity]:
                continue
            timestamps = sorted(list(self.keys_ts[entity][statistic].keys()))
            for ix in range(window_samples, len(timestamps), 1):
                first_ts = timestamps[ix - window_samples]
                last_ts = timestamps[ix]
                first_val = self.keys_ts[entity][statistic][first_ts]
                last_val = self.keys_ts[entity][statistic][last_ts]
                diff = last_val - first_val
                if percent:
                    diff = diff * 100 / first_val
                rate = (diff * self.duration_sec) / (last_ts - first_ts)
                # if the rate change is greater than the provided threshold,
                # then the condition is triggered for entity at time 'last_ts'
                if rate >= threshold:
                    if entity not in burst_epochs:
                        burst_epochs[entity] = {}
                    burst_epochs[entity][last_ts] = rate
        return burst_epochs

    def fetch_aggregated_values(self, entity, statistics, aggregation_op):
        # type: (str, AggregationOperator) -> Dict[str, float]
        # this method performs the aggregation specified by 'aggregation_op'
        # on the timeseries of 'statistics' for 'entity' and returns:
        # Dict[statistic, aggregated_value]
        result = {}
        for stat in statistics:
            if stat not in self.keys_ts[entity]:
                continue
            agg_val = None
            if aggregation_op is self.AggregationOperator.latest:
                latest_timestamp = max(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][latest_timestamp]
            elif aggregation_op is self.AggregationOperator.oldest:
                oldest_timestamp = min(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][oldest_timestamp]
            elif aggregation_op is self.AggregationOperator.max:
                agg_val = max(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.min:
                agg_val = min(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.avg:
                values = list(self.keys_ts[entity][stat].values())
                agg_val = sum(values) / len(values)
            result[stat] = agg_val
        return result

    def check_and_trigger_conditions(self, conditions):
        # get the list of statistics that need to be fetched
        reqd_keys = self.get_keys_from_conditions(conditions)
        # fetch the required statistics and populate the map 'keys_ts'
        self.fetch_timeseries(reqd_keys)
        # Trigger the appropriate conditions
        for cond in conditions:
            complete_keys = self.get_keys_from_conditions([cond])
            # Get the entities that have all statistics required by 'cond':
            # an entity is checked for a given condition only if we possess all
            # of the condition's 'keys' for that entity
            entities_with_stats = []
            for entity in self.keys_ts:
                stat_missing = False
                for stat in complete_keys:
                    if stat not in self.keys_ts[entity]:
                        stat_missing = True
                        break
                if not stat_missing:
                    entities_with_stats.append(entity)
            if not entities_with_stats:
                continue
            if cond.behavior is self.Behavior.bursty:
                # for a condition that checks for bursty behavior, only one key
                # should be present in the condition's 'keys' field
                result = self.fetch_burst_epochs(
                    entities_with_stats,
                    complete_keys[0],  # there should be only one key
                    cond.window_sec,
                    cond.rate_threshold,
                    True
                )
                # Trigger in this case is:
                # Dict[entity_name, Dict[timestamp, rate_change]]
                # where the inner dictionary contains rate_change values when
                # the rate_change >= threshold provided, with the
                # corresponding timestamps
                if result:
                    cond.set_trigger(result)
            elif cond.behavior is self.Behavior.evaluate_expression:
                self.handle_evaluate_expression(
                    cond,
                    complete_keys,
                    entities_with_stats
                )

    def handle_evaluate_expression(self, condition, statistics, entities):
        trigger = {}
        # check 'condition' for each of these entities
        for entity in entities:
            if hasattr(condition, 'aggregation_op'):
                # in this case, the aggregation operation is performed on each
                # of the condition's 'keys' and then with aggregated values
                # condition's 'expression' is evaluated; if it evaluates to
                # True, then list of the keys values is added to the
                # condition's trigger: Dict[entity_name, List[stats]]
                result = self.fetch_aggregated_values(
                        entity, statistics, condition.aggregation_op
                )
                keys = [result[key] for key in statistics]
                try:
                    if eval(condition.expression):
                        trigger[entity] = keys
                except Exception as e:
                    print(
                        'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
                    )
            else:
                # assumption: all stats have same series of timestamps
                # this is similar to the above but 'expression' is evaluated at
                # each timestamp, since there is no aggregation, and all the
                # epochs are added to the trigger when the condition's
                # 'expression' evaluated to true; so trigger is:
                # Dict[entity, Dict[timestamp, List[stats]]]
                for epoch in self.keys_ts[entity][statistics[0]].keys():
                    keys = [
                        self.keys_ts[entity][key][epoch]
                        for key in statistics
                    ]
                    try:
                        if eval(condition.expression):
                            if entity not in trigger:
                                trigger[entity] = {}
                            trigger[entity][epoch] = keys
                    except Exception as e:
                        print(
                            'WARNING(TimeSeriesData) check_and_trigger: ' +
                            str(e)
                        )
        if trigger:
            condition.set_trigger(trigger)