diff options
Diffstat (limited to 'src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py')
-rw-r--r-- | src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py | 2000 |
1 files changed, 2000 insertions, 0 deletions
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py new file mode 100644 index 000000000..67307df53 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py @@ -0,0 +1,2000 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + +import gc +import heapq +import random +import sys +import time +from collections import OrderedDict +from os import path + +import numpy as np + + +kSampleSize = 64 # The sample size used when performing eviction. +kMicrosInSecond = 1000000 +kSecondsInMinute = 60 +kSecondsInHour = 3600 + + +class TraceRecord: + """ + A trace record represents a block access. + It holds the same struct as BlockCacheTraceRecord in + trace_replay/block_cache_tracer.h + """ + + def __init__( + self, + access_time, + block_id, + block_type, + block_size, + cf_id, + cf_name, + level, + fd, + caller, + no_insert, + get_id, + key_id, + kv_size, + is_hit, + referenced_key_exist_in_block, + num_keys_in_block, + table_id, + seq_number, + block_key_size, + key_size, + block_offset_in_file, + next_access_seq_no, + ): + self.access_time = access_time + self.block_id = block_id + self.block_type = block_type + self.block_size = block_size + block_key_size + self.cf_id = cf_id + self.cf_name = cf_name + self.level = level + self.fd = fd + self.caller = caller + if no_insert == 1: + self.no_insert = True + else: + self.no_insert = False + self.get_id = get_id + self.key_id = key_id + self.kv_size = kv_size + if is_hit == 1: + self.is_hit = True + else: + self.is_hit = False + if referenced_key_exist_in_block == 1: + self.referenced_key_exist_in_block = True + else: + self.referenced_key_exist_in_block = False + self.num_keys_in_block = num_keys_in_block + self.table_id = table_id + self.seq_number = seq_number + self.block_key_size = block_key_size + self.key_size = key_size + self.block_offset_in_file = block_offset_in_file + self.next_access_seq_no = next_access_seq_no + + +class CacheEntry: + """A cache entry stored in the cache.""" + + def __init__( + self, + value_size, + cf_id, + level, + block_type, + table_id, + access_number, + time_s, + num_hits=0, + ): + self.value_size = value_size + self.last_access_number = access_number + self.num_hits = num_hits + self.cf_id = 0 + self.level = level + self.block_type = block_type + self.last_access_time = time_s + self.insertion_time = time_s + self.table_id = table_id + + def __repr__(self): + """Debug string.""" + return "(s={},last={},hits={},cf={},l={},bt={})\n".format( + self.value_size, + self.last_access_number, + self.num_hits, + self.cf_id, + self.level, + self.block_type, + ) + + def cost_class(self, cost_class_label): + if cost_class_label == "table_bt": + return "{}-{}".format(self.table_id, self.block_type) + elif cost_class_label == "table": + return "{}".format(self.table_id) + elif cost_class_label == "bt": + return "{}".format(self.block_type) + elif cost_class_label == "cf": + return "{}".format(self.cf_id) + elif cost_class_label == "cf_bt": + return "{}-{}".format(self.cf_id, self.block_type) + elif cost_class_label == "table_level_bt": + return "{}-{}-{}".format(self.table_id, self.level, self.block_type) + assert False, "Unknown cost class label {}".format(cost_class_label) + return None + + +class HashEntry: + """A hash entry stored in a hash table.""" + + def __init__(self, key, hash, value): + self.key = key + self.hash = hash + self.value = value + + def __repr__(self): + return "k={},h={},v=[{}]".format(self.key, self.hash, self.value) + + +class HashTable: + """ + A custom implementation of hash table to support fast random sampling. + It is closed hashing and uses chaining to resolve hash conflicts. + It grows/shrinks the hash table upon insertion/deletion to support + fast lookups and random samplings. + """ + + def __init__(self): + self.initial_size = 32 + self.table = [None] * self.initial_size + self.elements = 0 + + def random_sample(self, sample_size): + """Randomly sample 'sample_size' hash entries from the table.""" + samples = [] + index = random.randint(0, len(self.table) - 1) + pos = index + # Starting from index, adding hash entries to the sample list until + # sample_size is met or we ran out of entries. + while True: + if self.table[pos] is not None: + for i in range(len(self.table[pos])): + if self.table[pos][i] is None: + continue + samples.append(self.table[pos][i]) + if len(samples) == sample_size: + break + pos += 1 + pos = pos % len(self.table) + if pos == index or len(samples) == sample_size: + break + assert len(samples) <= sample_size + return samples + + def __repr__(self): + all_entries = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_entries.append(self.table[i][j]) + return "{}".format(all_entries) + + def values(self): + all_values = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_values.append(self.table[i][j].value) + return all_values + + def __len__(self): + return self.elements + + def insert(self, key, hash, value): + """ + Insert a hash entry in the table. Replace the old entry if it already + exists. + """ + self.grow() + inserted = False + index = hash % len(self.table) + if self.table[index] is None: + self.table[index] = [] + # Search for the entry first. + for i in range(len(self.table[index])): + if self.table[index][i] is None: + continue + if self.table[index][i].hash == hash and self.table[index][i].key == key: + # The entry already exists in the table. + self.table[index][i] = HashEntry(key, hash, value) + return + + # Find an empty slot. + for i in range(len(self.table[index])): + if self.table[index][i] is None: + self.table[index][i] = HashEntry(key, hash, value) + inserted = True + break + if not inserted: + self.table[index].append(HashEntry(key, hash, value)) + self.elements += 1 + + def resize(self, new_size): + if new_size == len(self.table): + return + if new_size < self.initial_size: + return + if self.elements < 100: + return + new_table = [None] * new_size + # Copy 'self.table' to new_table. + for i in range(len(self.table)): + entries = self.table[i] + if entries is None: + continue + for j in range(len(entries)): + if entries[j] is None: + continue + index = entries[j].hash % new_size + if new_table[index] is None: + new_table[index] = [] + new_table[index].append(entries[j]) + self.table = new_table + del new_table + # Manually call python gc here to free the memory as 'self.table' + # might be very large. + gc.collect() + + def grow(self): + if self.elements < 4 * len(self.table): + return + new_size = int(len(self.table) * 1.5) + self.resize(new_size) + + def delete(self, key, hash): + index = hash % len(self.table) + deleted = False + deleted_entry = None + if self.table[index] is None: + return + for i in range(len(self.table[index])): + if ( + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + deleted_entry = self.table[index][i] + self.table[index][i] = None + self.elements -= 1 + deleted = True + break + if deleted: + self.shrink() + return deleted_entry + + def shrink(self): + if self.elements * 2 >= len(self.table): + return + new_size = int(len(self.table) * 0.7) + self.resize(new_size) + + def lookup(self, key, hash): + index = hash % len(self.table) + if self.table[index] is None: + return None + for i in range(len(self.table[index])): + if ( + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + return self.table[index][i].value + return None + + +class MissRatioStats: + def __init__(self, time_unit): + self.num_misses = 0 + self.num_accesses = 0 + self.time_unit = time_unit + self.time_misses = {} + self.time_miss_bytes = {} + self.time_accesses = {} + + def update_metrics(self, access_time, is_hit, miss_bytes): + access_time /= kMicrosInSecond * self.time_unit + self.num_accesses += 1 + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if not is_hit: + self.num_misses += 1 + if access_time not in self.time_misses: + self.time_misses[access_time] = 0 + self.time_miss_bytes[access_time] = 0 + self.time_misses[access_time] += 1 + self.time_miss_bytes[access_time] += miss_bytes + + def reset_counter(self): + self.num_misses = 0 + self.num_accesses = 0 + self.time_miss_bytes.clear() + self.time_misses.clear() + self.time_accesses.clear() + + def compute_miss_bytes(self): + miss_bytes = [] + for at in self.time_miss_bytes: + miss_bytes.append(self.time_miss_bytes[at]) + miss_bytes = sorted(miss_bytes) + avg_miss_bytes = 0 + p95_miss_bytes = 0 + for i in range(len(miss_bytes)): + avg_miss_bytes += float(miss_bytes[i]) / float(len(miss_bytes)) + + p95_index = min(int(0.95 * float(len(miss_bytes))), len(miss_bytes) - 1) + p95_miss_bytes = miss_bytes[p95_index] + return avg_miss_bytes, p95_miss_bytes + + def miss_ratio(self): + return float(self.num_misses) * 100.0 / float(self.num_accesses) + + def write_miss_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + row += ",{}".format(self.time_misses.get(trace_time, 0)) + file.write(row + "\n") + + def write_miss_ratio_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + miss_ratio = 0 + if naccesses > 0: + miss_ratio = float( + self.time_misses.get(trace_time, 0) * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(miss_ratio) + file.write(row + "\n") + + +class PolicyStats: + def __init__(self, time_unit, policies): + self.time_selected_polices = {} + self.time_accesses = {} + self.policy_names = {} + self.time_unit = time_unit + for i in range(len(policies)): + self.policy_names[i] = policies[i].policy_name() + + def update_metrics(self, access_time, selected_policy): + access_time /= kMicrosInSecond * self.time_unit + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if access_time not in self.time_selected_polices: + self.time_selected_polices[access_time] = {} + policy_name = self.policy_names[selected_policy] + if policy_name not in self.time_selected_polices[access_time]: + self.time_selected_polices[access_time][policy_name] = 0 + self.time_selected_polices[access_time][policy_name] += 1 + + def write_policy_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + for policy in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + row += ",{}".format( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + ) + file.write(row + "\n") + + def write_policy_ratio_timeline( + self, cache_type, cache_size, target_cf_name, file_path, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + for policy in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + ratio = 0 + if naccesses > 0: + ratio = float( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(ratio) + file.write(row + "\n") + + +class Policy(object): + """ + A policy maintains a set of evicted keys. It returns a reward of one to + itself if it has not evicted a missing key. Otherwise, it gives itself 0 + reward. + """ + + def __init__(self): + self.evicted_keys = {} + + def evict(self, key, max_size): + self.evicted_keys[key] = 0 + + def delete(self, key): + self.evicted_keys.pop(key, None) + + def prioritize_samples(self, samples, auxilliary_info): + raise NotImplementedError + + def policy_name(self): + raise NotImplementedError + + def generate_reward(self, key): + if key in self.evicted_keys: + return 0 + return 1 + + +class LRUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted( + samples, + cmp=lambda e1, e2: e1.value.last_access_number + - e2.value.last_access_number, + ) + + def policy_name(self): + return "lru" + + +class MRUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted( + samples, + cmp=lambda e1, e2: e2.value.last_access_number + - e1.value.last_access_number, + ) + + def policy_name(self): + return "mru" + + +class LFUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted(samples, cmp=lambda e1, e2: e1.value.num_hits - e2.value.num_hits) + + def policy_name(self): + return "lfu" + + +class HyperbolicPolicy(Policy): + """ + An implementation of Hyperbolic caching. + + Aaron Blankstein, Siddhartha Sen, and Michael J. Freedman. 2017. + Hyperbolic caching: flexible caching for web applications. In Proceedings + of the 2017 USENIX Conference on Usenix Annual Technical Conference + (USENIX ATC '17). USENIX Association, Berkeley, CA, USA, 499-511. + """ + + def compare(self, e1, e2, now): + e1_duration = max(0, (now - e1.value.insertion_time) / kMicrosInSecond) * float( + e1.value.value_size + ) + e2_duration = max(0, (now - e2.value.insertion_time) / kMicrosInSecond) * float( + e2.value.value_size + ) + if e1_duration == e2_duration: + return e1.value.num_hits - e2.value.num_hits + if e1_duration == 0: + return 1 + if e2_duration == 0: + return 1 + diff = (float(e1.value.num_hits) / (float(e1_duration))) - ( + float(e2.value.num_hits) / float(e2_duration) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + return sorted(samples, cmp=lambda e1, e2: self.compare(e1, e2, now)) + + def policy_name(self): + return "hb" + + +class CostClassPolicy(Policy): + """ + We calculate the hit density of a cost class as + number of hits / total size in cache * average duration in the cache. + + An entry has a higher priority if its class's hit density is higher. + """ + + def compare(self, e1, e2, now, cost_classes, cost_class_label): + e1_class = e1.value.cost_class(cost_class_label) + e2_class = e2.value.cost_class(cost_class_label) + + assert e1_class in cost_classes + assert e2_class in cost_classes + + e1_entry = cost_classes[e1_class] + e2_entry = cost_classes[e2_class] + e1_density = e1_entry.density(now) + e2_density = e2_entry.density(now) + e1_hits = cost_classes[e1_class].hits + e2_hits = cost_classes[e2_class].hits + + if e1_density == e2_density: + return e1_hits - e2_hits + + if e1_entry.num_entries_in_cache == 0: + return -1 + if e2_entry.num_entries_in_cache == 0: + return 1 + + if e1_density == 0: + return 1 + if e2_density == 0: + return -1 + diff = (float(e1_hits) / float(e1_density)) - ( + float(e2_hits) / float(e2_density) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + cost_classes = auxilliary_info[1] + cost_class_label = auxilliary_info[2] + return sorted( + samples, + cmp=lambda e1, e2: self.compare( + e1, e2, now, cost_classes, cost_class_label + ), + ) + + def policy_name(self): + return "cc" + + +class Cache(object): + """ + This is the base class for the implementations of alternative cache + replacement policies. + """ + + def __init__(self, cache_size, enable_cache_row_key): + self.cache_size = cache_size + self.used_size = 0 + self.per_second_miss_ratio_stats = MissRatioStats(1) + self.miss_ratio_stats = MissRatioStats(kSecondsInMinute) + self.per_hour_miss_ratio_stats = MissRatioStats(kSecondsInHour) + # 0: disabled. 1: enabled. Insert both row and the refereneced data block. + # 2: enabled. Insert only the row but NOT the referenced data block. + self.enable_cache_row_key = enable_cache_row_key + self.get_id_row_key_map = {} + self.max_seen_get_id = 0 + self.retain_get_id_range = 100000 + + def block_key(self, trace_record): + return "b{}".format(trace_record.block_id) + + def row_key(self, trace_record): + return "g{}-{}".format(trace_record.fd, trace_record.key_id) + + def _lookup(self, trace_record, key, hash): + """ + Look up the key in the cache. + Returns true upon a cache hit, false otherwise. + """ + raise NotImplementedError + + def _evict(self, trace_record, key, hash, value_size): + """ + Evict entries in the cache until there is enough room to insert the new + entry with 'value_size'. + """ + raise NotImplementedError + + def _insert(self, trace_record, key, hash, value_size): + """ + Insert the new entry into the cache. + """ + raise NotImplementedError + + def _should_admit(self, trace_record, key, hash, value_size): + """ + A custom admission policy to decide whether we should admit the new + entry upon a cache miss. + Returns true if the new entry should be admitted, false otherwise. + """ + raise NotImplementedError + + def cache_name(self): + """ + The name of the replacement policy. + """ + raise NotImplementedError + + def is_ml_cache(self): + return False + + def _update_stats(self, access_time, is_hit, miss_bytes): + self.per_second_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + + def access(self, trace_record): + """ + Access a trace record. The simulator calls this function to access a + trace record. + """ + assert self.used_size <= self.cache_size + if ( + self.enable_cache_row_key > 0 + and trace_record.caller == 1 + and trace_record.key_id != 0 + and trace_record.get_id != 0 + ): + # This is a get request. + self._access_row(trace_record) + return + is_hit = self._access_kv( + trace_record, + self.block_key(trace_record), + trace_record.block_id, + trace_record.block_size, + trace_record.no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit=is_hit, miss_bytes=trace_record.block_size + ) + + def _access_row(self, trace_record): + row_key = self.row_key(trace_record) + self.max_seen_get_id = max(self.max_seen_get_id, trace_record.get_id) + self.get_id_row_key_map.pop( + self.max_seen_get_id - self.retain_get_id_range, None + ) + if trace_record.get_id not in self.get_id_row_key_map: + self.get_id_row_key_map[trace_record.get_id] = {} + self.get_id_row_key_map[trace_record.get_id]["h"] = False + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 1") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) + return + if row_key not in self.get_id_row_key_map[trace_record.get_id]: + # First time seen this key. + is_hit = self._access_kv( + trace_record, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + inserted = False + if trace_record.kv_size > 0: + inserted = True + self.get_id_row_key_map[trace_record.get_id][row_key] = inserted + self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 2") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) + return + # Access its blocks. + no_insert = trace_record.no_insert + if ( + self.enable_cache_row_key == 2 + and trace_record.kv_size > 0 + and trace_record.block_type == 9 + ): + no_insert = True + is_hit = self._access_kv( + trace_record, + key=self.block_key(trace_record), + hash=trace_record.block_id, + value_size=trace_record.block_size, + no_insert=no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit, miss_bytes=trace_record.block_size + ) + if ( + trace_record.kv_size > 0 + and not self.get_id_row_key_map[trace_record.get_id][row_key] + ): + # Insert the row key-value pair. + self._access_kv( + trace_record, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + # Mark as inserted. + self.get_id_row_key_map[trace_record.get_id][row_key] = True + + def _access_kv(self, trace_record, key, hash, value_size, no_insert): + # Sanity checks. + assert self.used_size <= self.cache_size + if self._lookup(trace_record, key, hash): + # A cache hit. + return True + if no_insert or value_size <= 0: + return False + # A cache miss. + if value_size > self.cache_size: + # The block is too large to fit into the cache. + return False + self._evict(trace_record, key, hash, value_size) + if self._should_admit(trace_record, key, hash, value_size): + self._insert(trace_record, key, hash, value_size) + self.used_size += value_size + return False + + +class CostClassEntry: + """ + A cost class maintains aggregated statistics of cached entries in a class. + For example, we may define block type as a class. Then, cached blocks of the + same type will share one cost class entry. + """ + + def __init__(self): + self.hits = 0 + self.num_entries_in_cache = 0 + self.size_in_cache = 0 + self.sum_insertion_times = 0 + self.sum_last_access_time = 0 + + def insert(self, trace_record, key, value_size): + self.size_in_cache += value_size + self.num_entries_in_cache += 1 + self.sum_insertion_times += trace_record.access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def remove(self, insertion_time, last_access_time, key, value_size, num_hits): + self.hits -= num_hits + self.num_entries_in_cache -= 1 + self.sum_insertion_times -= insertion_time / kMicrosInSecond + self.size_in_cache -= value_size + self.sum_last_access_time -= last_access_time / kMicrosInSecond + + def update_on_hit(self, trace_record, last_access_time): + self.hits += 1 + self.sum_last_access_time -= last_access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def avg_lifetime_in_cache(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + return now / kMicrosInSecond - avg_insertion_time + + def avg_last_access_time(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def avg_size(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def density(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + in_cache_duration = now / kMicrosInSecond - avg_insertion_time + return self.size_in_cache * in_cache_duration + + +class MLCache(Cache): + """ + MLCache is the base class for implementations of alternative replacement + policies using reinforcement learning. + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(MLCache, self).__init__(cache_size, enable_cache_row_key) + self.table = HashTable() + self.policy_stats = PolicyStats(kSecondsInMinute, policies) + self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies) + self.policies = policies + self.cost_classes = {} + self.cost_class_label = cost_class_label + + def is_ml_cache(self): + return True + + def _lookup(self, trace_record, key, hash): + value = self.table.lookup(key, hash) + if value is not None: + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].update_on_hit( + trace_record, value.last_access_time + ) + # Update the entry's last access time. + self.table.insert( + key, + hash, + CacheEntry( + value_size=value.value_size, + cf_id=value.cf_id, + level=value.level, + block_type=value.block_type, + table_id=value.table_id, + access_number=self.miss_ratio_stats.num_accesses, + time_s=trace_record.access_time, + num_hits=value.num_hits + 1, + ), + ) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Select a policy, random sample kSampleSize keys from the cache, then + # evict keys in the sample set until we have enough room for the new + # entry. + policy_index = self._select_policy(trace_record, key) + assert policy_index < len(self.policies) and policy_index >= 0 + self.policies[policy_index].delete(key) + self.policy_stats.update_metrics(trace_record.access_time, policy_index) + self.per_hour_policy_stats.update_metrics( + trace_record.access_time, policy_index + ) + while self.used_size + value_size > self.cache_size: + # Randomly sample n entries. + samples = self.table.random_sample(kSampleSize) + samples = self.policies[policy_index].prioritize_samples( + samples, + [trace_record.access_time, self.cost_classes, self.cost_class_label], + ) + for hash_entry in samples: + assert self.table.delete(hash_entry.key, hash_entry.hash) is not None + self.used_size -= hash_entry.value.value_size + self.policies[policy_index].evict( + key=hash_entry.key, max_size=self.table.elements + ) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = hash_entry.value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].remove( + hash_entry.value.insertion_time, + hash_entry.value.last_access_time, + key, + hash_entry.value.value_size, + hash_entry.value.num_hits, + ) + if self.used_size + value_size <= self.cache_size: + break + + def _insert(self, trace_record, key, hash, value_size): + assert self.used_size + value_size <= self.cache_size + entry = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + self.miss_ratio_stats.num_accesses, + trace_record.access_time, + ) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = entry.cost_class(self.cost_class_label) + if cost_class not in self.cost_classes: + self.cost_classes[cost_class] = CostClassEntry() + self.cost_classes[cost_class].insert(trace_record, key, value_size) + self.table.insert(key, hash, entry) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def _select_policy(self, trace_record, key): + raise NotImplementedError + + +class ThompsonSamplingCache(MLCache): + """ + An implementation of Thompson Sampling for the Bernoulli Bandit. + + Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, + and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found. + Trends Mach. Learn. 11, 1 (July 2018), 1-96. + DOI: https://doi.org/10.1561/2200000070 + """ + + def __init__( + self, + cache_size, + enable_cache_row_key, + policies, + cost_class_label, + init_a=1, + init_b=1, + ): + super(ThompsonSamplingCache, self).__init__( + cache_size, enable_cache_row_key, policies, cost_class_label + ) + self._as = {} + self._bs = {} + for _i in range(len(policies)): + self._as = [init_a] * len(self.policies) + self._bs = [init_b] * len(self.policies) + + def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 + samples = [ + np.random.beta(self._as[x], self._bs[x]) for x in range(len(self.policies)) + ] + selected_policy = max(range(len(self.policies)), key=lambda x: samples[x]) + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self._as[selected_policy] += reward + self._bs[selected_policy] += 1 - reward + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid ThompsonSampling with cost class {} (ts_hybrid)".format( + self.cost_class_label + ) + return "ThompsonSampling with cost class {} (ts)".format(self.cost_class_label) + + +class LinUCBCache(MLCache): + """ + An implementation of LinUCB with disjoint linear models. + + Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. + A contextual-bandit approach to personalized news article recommendation. + In Proceedings of the 19th international conference on World wide web + (WWW '10). ACM, New York, NY, USA, 661-670. + DOI=http://dx.doi.org/10.1145/1772690.1772758 + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(LinUCBCache, self).__init__( + cache_size, enable_cache_row_key, policies, cost_class_label + ) + self.nfeatures = 4 # Block type, level, cf. + self.th = np.zeros((len(self.policies), self.nfeatures)) + self.eps = 0.2 + self.b = np.zeros_like(self.th) + self.A = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + self.A_inv = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + for i in range(len(self.policies)): + self.A[i] = np.identity(self.nfeatures) + self.th_hat = np.zeros_like(self.th) + self.p = np.zeros(len(self.policies)) + self.alph = 0.2 + + def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 + x_i = np.zeros(self.nfeatures) # The current context vector + x_i[0] = trace_record.block_type + x_i[1] = trace_record.level + x_i[2] = trace_record.cf_id + p = np.zeros(len(self.policies)) + for a in range(len(self.policies)): + self.th_hat[a] = self.A_inv[a].dot(self.b[a]) + ta = x_i.dot(self.A_inv[a]).dot(x_i) + a_upper_ci = self.alph * np.sqrt(ta) + a_mean = self.th_hat[a].dot(x_i) + p[a] = a_mean + a_upper_ci + p = p + (np.random.random(len(p)) * 0.000001) + selected_policy = p.argmax() + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self.A[selected_policy] += np.outer(x_i, x_i) + self.b[selected_policy] += reward * x_i + self.A_inv[selected_policy] = np.linalg.inv(self.A[selected_policy]) + del x_i + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LinUCB with cost class {} (linucb_hybrid)".format( + self.cost_class_label + ) + return "LinUCB with cost class {} (linucb)".format(self.cost_class_label) + + +class OPTCacheEntry: + """ + A cache entry for the OPT algorithm. The entries are sorted based on its + next access sequence number in reverse order, i.e., the entry which next + access is the furthest in the future is ordered before other entries. + """ + + def __init__(self, key, next_access_seq_no, value_size): + self.key = key + self.next_access_seq_no = next_access_seq_no + self.value_size = value_size + self.is_removed = False + + def __cmp__(self, other): + if other.next_access_seq_no != self.next_access_seq_no: + return other.next_access_seq_no - self.next_access_seq_no + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class PQTable: + """ + A hash table with a priority queue. + """ + + def __init__(self): + # A list of entries arranged in a heap sorted based on the entry custom + # implementation of __cmp__ + self.pq = [] + self.table = {} + + def pqinsert(self, entry): + "Add a new key or update the priority of an existing key" + # Remove the entry from the table first. + removed_entry = self.table.pop(entry.key, None) + if removed_entry: + # Mark as removed since there is no 'remove' API in heappq. + # Instead, an entry in pq is removed lazily when calling pop. + removed_entry.is_removed = True + self.table[entry.key] = entry + heapq.heappush(self.pq, entry) + return removed_entry + + def pqpop(self): + while self.pq: + entry = heapq.heappop(self.pq) + if not entry.is_removed: + del self.table[entry.key] + return entry + return None + + def pqpeek(self): + while self.pq: + entry = self.pq[0] + if not entry.is_removed: + return entry + heapq.heappop(self.pq) + return + + def __contains__(self, k): + return k in self.table + + def __getitem__(self, k): + return self.table[k] + + def __len__(self): + return len(self.table) + + def values(self): + return self.table.values() + + +class OPTCache(Cache): + """ + An implementation of the Belady MIN algorithm. OPTCache evicts an entry + in the cache whose next access occurs furthest in the future. + + Note that Belady MIN algorithm is optimal assuming all blocks having the + same size and a missing entry will be inserted in the cache. + These are NOT true for the block cache trace since blocks have different + sizes and we may not insert a block into the cache upon a cache miss. + However, it is still useful to serve as a "theoretical upper bound" on the + lowest miss ratio we can achieve given a cache size. + + L. A. Belady. 1966. A Study of Replacement Algorithms for a + Virtual-storage Computer. IBM Syst. J. 5, 2 (June 1966), 78-101. + DOI=http://dx.doi.org/10.1147/sj.52.0078 + """ + + def __init__(self, cache_size): + super(OPTCache, self).__init__(cache_size, enable_cache_row_key=0) + self.table = PQTable() + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its next access time. + assert ( + self.table.pqinsert( + OPTCacheEntry( + key, trace_record.next_access_seq_no, self.table[key].value_size + ) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert( + OPTCacheEntry(key, trace_record.next_access_seq_no, value_size) + ) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + return "Belady MIN (opt)" + + +class GDSizeEntry: + """ + A cache entry for the greedy dual size replacement policy. + """ + + def __init__(self, key, value_size, priority): + self.key = key + self.value_size = value_size + self.priority = priority + self.is_removed = False + + def __cmp__(self, other): + if other.priority != self.priority: + return self.priority - other.priority + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class GDSizeCache(Cache): + """ + An implementation of the greedy dual size algorithm. + We define cost as an entry's size. + + See https://www.usenix.org/legacy/publications/library/proceedings/usits97/full_papers/cao/cao_html/node8.html + and N. Young. The k-server dual and loose competitiveness for paging. + Algorithmica,June 1994, vol. 11,(no.6):525-41. + Rewritten version of ''On-line caching as cache size varies'', + in The 2nd Annual ACM-SIAM Symposium on Discrete Algorithms, 241-250, 1991. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(GDSizeCache, self).__init__(cache_size, enable_cache_row_key) + self.table = PQTable() + self.L = 0.0 + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid GreedyDualSize (gdsize_hybrid)" + return "GreedyDualSize (gdsize)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its priority. + entry = self.table[key] + assert ( + self.table.pqinsert( + GDSizeEntry(key, entry.value_size, self.L + entry.value_size) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.L = evict_entry.priority + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert(GDSizeEntry(key, value_size, self.L + value_size)) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class Deque(object): + """A Deque class facilitates the implementation of LRU and ARC.""" + + def __init__(self): + self.od = OrderedDict() + + def appendleft(self, k): + if k in self.od: + del self.od[k] + self.od[k] = None + + def pop(self): + item = self.od.popitem(last=False) if self.od else None + if item is not None: + return item[0] + return None + + def remove(self, k): + del self.od[k] + + def __len__(self): + return len(self.od) + + def __contains__(self, k): + return k in self.od + + def __iter__(self): + return reversed(self.od) + + def __repr__(self): + return "Deque(%r)" % (list(self),) + + +class ARCCache(Cache): + """ + An implementation of ARC. ARC assumes that all blocks are having the + same size. The size of index and filter blocks are variable. To accommodate + this, we modified ARC as follows: + 1) We use 16 KB as the average block size and calculate the number of blocks + (c) in the cache. + 2) When we insert an entry, the cache evicts entries in both t1 and t2 + queues until it has enough space for the new entry. This also requires + modification of the algorithm to maintain a maximum of 2*c blocks. + + Nimrod Megiddo and Dharmendra S. Modha. 2003. ARC: A Self-Tuning, Low + Overhead Replacement Cache. In Proceedings of the 2nd USENIX Conference on + File and Storage Technologies (FAST '03). USENIX Association, Berkeley, CA, + USA, 115-130. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(ARCCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.c = cache_size / 16 * 1024 # Number of elements in the cache. + self.p = 0 # Target size for the list T1 + # L1: only once recently + self.t1 = Deque() # T1: recent cache entries + self.b1 = Deque() # B1: ghost entries recently evicted from the T1 cache + # L2: at least twice recently + self.t2 = Deque() # T2: frequent entries + self.b2 = Deque() # B2: ghost entries recently evicted from the T2 cache + + def _replace(self, key, value_size): + while self.used_size + value_size > self.cache_size: + if self.t1 and ((key in self.b2) or (len(self.t1) > self.p)): + old = self.t1.pop() + self.b1.appendleft(old) + else: + if self.t2: + old = self.t2.pop() + self.b2.appendleft(old) + else: + old = self.t1.pop() + self.b1.appendleft(old) + self.used_size -= self.table[old].value_size + del self.table[old] + + def _lookup(self, trace_record, key, hash): + # Case I: key is in T1 or T2. + # Move key to MRU position in T2. + if key in self.t1: + self.t1.remove(key) + self.t2.appendleft(key) + return True + + if key in self.t2: + self.t2.remove(key) + self.t2.appendleft(key) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Case II: key is in B1 + # Move x from B1 to the MRU position in T2 (also fetch x to the cache). + if key in self.b1: + self.p = min(self.c, self.p + max(len(self.b2) / len(self.b1), 1)) + self._replace(key, value_size) + self.b1.remove(key) + self.t2.appendleft(key) + return + + # Case III: key is in B2 + # Move x from B2 to the MRU position in T2 (also fetch x to the cache). + if key in self.b2: + self.p = max(0, self.p - max(len(self.b1) / len(self.b2), 1)) + self._replace(key, value_size) + self.b2.remove(key) + self.t2.appendleft(key) + return + + # Case IV: key is not in (T1 u B1 u T2 u B2) + self._replace(key, value_size) + while len(self.t1) + len(self.b1) >= self.c and self.b1: + self.b1.pop() + + total = len(self.t1) + len(self.b1) + len(self.t2) + len(self.b2) + while total >= (2 * self.c) and self.b2: + self.b2.pop() + total -= 1 + # Finally, move it to MRU position in T1. + self.t1.appendleft(key) + return + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid Adaptive Replacement Cache (arc_hybrid)" + return "Adaptive Replacement Cache (arc)" + + +class LRUCache(Cache): + """ + A strict LRU queue. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(LRUCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.lru = Deque() + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LRU (lru_hybrid)" + return "LRU (lru)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update LRU queue. + self.lru.remove(key) + self.lru.appendleft(key) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_key = self.lru.pop() + self.used_size -= self.table[evict_key].value_size + del self.table[evict_key] + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + self.lru.appendleft(key) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class TraceCache(Cache): + """ + A trace cache. Lookup returns true if the trace observes a cache hit. + It is used to maintain cache hits observed in the trace. + """ + + def __init__(self, cache_size): + super(TraceCache, self).__init__(cache_size, enable_cache_row_key=0) + + def _lookup(self, trace_record, key, hash): + return trace_record.is_hit + + def _evict(self, trace_record, key, hash, value_size): + pass + + def _insert(self, trace_record, key, hash, value_size): + pass + + def _should_admit(self, trace_record, key, hash, value_size): + return False + + def cache_name(self): + return "Trace" + + +def parse_cache_size(cs): + cs = cs.replace("\n", "") + if cs[-1] == "M": + return int(cs[: len(cs) - 1]) * 1024 * 1024 + if cs[-1] == "G": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 + if cs[-1] == "T": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 * 1024 + return int(cs) + + +def create_cache(cache_type, cache_size, downsample_size): + cache_size = cache_size / downsample_size + enable_cache_row_key = 0 + if "hybridn" in cache_type: + enable_cache_row_key = 2 + cache_type = cache_type[:-8] + if "hybrid" in cache_type: + enable_cache_row_key = 1 + cache_type = cache_type[:-7] + if cache_type == "ts": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "linucb": + return LinUCBCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pylru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LRUPolicy()], cost_class_label=None + ) + elif cache_type == "pymru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [MRUPolicy()], cost_class_label=None + ) + elif cache_type == "pylfu": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LFUPolicy()], cost_class_label=None + ) + elif cache_type == "pyhb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pycctbbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_bt", + ) + elif cache_type == "pycccf": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="cf" + ) + elif cache_type == "pycctblevelbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_level_bt", + ) + elif cache_type == "pycccfbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="cf_bt", + ) + elif cache_type == "pycctb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table", + ) + elif cache_type == "pyccbt": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="bt" + ) + elif cache_type == "opt": + if enable_cache_row_key: + print("opt does not support hybrid mode.") + assert False + return OPTCache(cache_size) + elif cache_type == "trace": + if enable_cache_row_key: + print("trace does not support hybrid mode.") + assert False + return TraceCache(cache_size) + elif cache_type == "lru": + return LRUCache(cache_size, enable_cache_row_key) + elif cache_type == "arc": + return ARCCache(cache_size, enable_cache_row_key) + elif cache_type == "gdsize": + return GDSizeCache(cache_size, enable_cache_row_key) + else: + print("Unknown cache type {}".format(cache_type)) + assert False + return None + + +class BlockAccessTimeline: + """ + BlockAccessTimeline stores all accesses of a block. + """ + + def __init__(self): + self.accesses = [] + self.current_access_index = 1 + + def get_next_access(self): + if self.current_access_index == len(self.accesses): + return sys.maxsize + next_access_seq_no = self.accesses[self.current_access_index] + self.current_access_index += 1 + return next_access_seq_no + + +def percent(e1, e2): + if e2 == 0: + return -1 + return float(e1) * 100.0 / float(e2) + + +def is_target_cf(access_cf, target_cf_name): + if target_cf_name == "all": + return True + return access_cf == target_cf_name + + +def run( + trace_file_path, + cache_type, + cache, + warmup_seconds, + max_accesses_to_process, + target_cf_name, +): + warmup_complete = False + trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute) + access_seq_no = 0 + time_interval = 1 + start_time = time.time() + trace_start_time = 0 + trace_duration = 0 + is_opt_cache = False + if cache.cache_name() == "Belady MIN (opt)": + is_opt_cache = True + + block_access_timelines = {} + num_no_inserts = 0 + num_blocks_with_no_size = 0 + num_inserts_block_with_no_size = 0 + + if is_opt_cache: + # Read all blocks in memory and stores their access times so that OPT + # can use this information to evict the cached key which next access is + # the furthest in the future. + print("Preprocessing block traces.") + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + ts = line.split(",") + timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + block_id = int(ts[1]) + block_size = int(ts[3]) + no_insert = int(ts[9]) + if block_id not in block_access_timelines: + block_access_timelines[block_id] = BlockAccessTimeline() + if block_size == 0: + num_blocks_with_no_size += 1 + block_access_timelines[block_id].accesses.append(access_seq_no) + access_seq_no += 1 + if no_insert == 1: + num_no_inserts += 1 + if no_insert == 0 and block_size == 0: + num_inserts_block_with_no_size += 1 + if access_seq_no % 100 != 0: + continue + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second.".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + ) + ) + time_interval += 1 + print( + "Trace contains {0} blocks, {1}({2:.2f}%) blocks with no size." + "{3} accesses, {4}({5:.2f}%) accesses with no_insert," + "{6}({7:.2f}%) accesses that want to insert but block size is 0.".format( + len(block_access_timelines), + num_blocks_with_no_size, + percent(num_blocks_with_no_size, len(block_access_timelines)), + access_seq_no, + num_no_inserts, + percent(num_no_inserts, access_seq_no), + num_inserts_block_with_no_size, + percent(num_inserts_block_with_no_size, access_seq_no), + ) + ) + + access_seq_no = 0 + time_interval = 1 + start_time = time.time() + trace_start_time = 0 + trace_duration = 0 + print("Running simulated {} cache on block traces.".format(cache.cache_name())) + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + if access_seq_no % 1000000 == 0: + # Force a python gc periodically to reduce memory usage. + gc.collect() + ts = line.split(",") + timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + if ( + not warmup_complete + and warmup_seconds > 0 + and trace_duration > warmup_seconds * 1000000 + ): + cache.miss_ratio_stats.reset_counter() + warmup_complete = True + next_access_seq_no = 0 + block_id = int(ts[1]) + if is_opt_cache: + next_access_seq_no = block_access_timelines[block_id].get_next_access() + record = TraceRecord( + access_time=int(ts[0]), + block_id=int(ts[1]), + block_type=int(ts[2]), + block_size=int(ts[3]), + cf_id=int(ts[4]), + cf_name=ts[5], + level=int(ts[6]), + fd=int(ts[7]), + caller=int(ts[8]), + no_insert=int(ts[9]), + get_id=int(ts[10]), + key_id=int(ts[11]), + kv_size=int(ts[12]), + is_hit=int(ts[13]), + referenced_key_exist_in_block=int(ts[14]), + num_keys_in_block=int(ts[15]), + table_id=int(ts[16]), + seq_number=int(ts[17]), + block_key_size=int(ts[18]), + key_size=int(ts[19]), + block_offset_in_file=int(ts[20]), + next_access_seq_no=next_access_seq_no, + ) + trace_miss_ratio_stats.update_metrics( + record.access_time, is_hit=record.is_hit, miss_bytes=record.block_size + ) + cache.access(record) + access_seq_no += 1 + del record + del ts + if access_seq_no % 100 != 0: + continue + # Report progress every 10 seconds. + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second. " + "Trace miss ratio {}".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + time_interval += 1 + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + now = time.time() + print( + "Take {} seconds to process {} trace records with trace duration of {} " + "seconds. Throughput: {} records/second. Trace miss ratio {}".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + return trace_start_time, trace_duration + + +def report_stats( + cache, + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, +): + cache_label = "{}-{}-{}".format(cache_type, cache_size, target_cf_name) + with open("{}/data-ml-mrc-{}".format(result_dir, cache_label), "w+") as mrc_file: + mrc_file.write( + "{},0,0,{},{},{}\n".format( + cache_type, + cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + + cache_stats = [ + cache.per_second_miss_ratio_stats, + cache.miss_ratio_stats, + cache.per_hour_miss_ratio_stats, + ] + for i in range(len(cache_stats)): + avg_miss_bytes, p95_miss_bytes = cache_stats[i].compute_miss_bytes() + + with open( + "{}/data-ml-avgmb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, avg_miss_bytes) + ) + + with open( + "{}/data-ml-p95mb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, p95_miss_bytes) + ) + + cache_stats[i].write_miss_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + cache_stats[i].write_miss_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + + if not cache.is_ml_cache(): + return + + policy_stats = [cache.policy_stats, cache.per_hour_policy_stats] + for i in range(len(policy_stats)): + policy_stats[i].write_policy_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + policy_stats[i].write_policy_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + + +if __name__ == "__main__": + if len(sys.argv) <= 8: + print( + "Must provide 8 arguments.\n" + "1) Cache type (ts, linucb, arc, lru, opt, pylru, pymru, pylfu, " + "pyhb, gdsize, trace). One may evaluate the hybrid row_block cache " + "by appending '_hybrid' to a cache_type, e.g., ts_hybrid. " + "Note that hybrid is not supported with opt and trace. \n" + "2) Cache size (xM, xG, xT).\n" + "3) The sampling frequency used to collect the trace. (The " + "simulation scales down the cache size by the sampling frequency).\n" + "4) Warmup seconds (The number of seconds used for warmup).\n" + "5) Trace file path.\n" + "6) Result directory (A directory that saves generated results)\n" + "7) Max number of accesses to process\n" + "8) The target column family. (The simulation will only run " + "accesses on the target column family. If it is set to all, " + "it will run against all accesses.)" + ) + exit(1) + print("Arguments: {}".format(sys.argv)) + cache_type = sys.argv[1] + cache_size = parse_cache_size(sys.argv[2]) + downsample_size = int(sys.argv[3]) + warmup_seconds = int(sys.argv[4]) + trace_file_path = sys.argv[5] + result_dir = sys.argv[6] + max_accesses_to_process = int(sys.argv[7]) + target_cf_name = sys.argv[8] + cache = create_cache(cache_type, cache_size, downsample_size) + trace_start_time, trace_duration = run( + trace_file_path, + cache_type, + cache, + warmup_seconds, + max_accesses_to_process, + target_cf_name, + ) + trace_end_time = trace_start_time + trace_duration + report_stats( + cache, + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) |