Diffstat
9 files changed, 7056 insertions, 0 deletions
diff --git a/src/rocksdb/tools/block_cache_analyzer/__init__.py b/src/rocksdb/tools/block_cache_analyzer/__init__.py new file mode 100644 index 000000000..8dbe96a78 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py new file mode 100644 index 000000000..67307df53 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py @@ -0,0 +1,2000 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + +import gc +import heapq +import random +import sys +import time +from collections import OrderedDict +from os import path + +import numpy as np + + +kSampleSize = 64 # The sample size used when performing eviction. +kMicrosInSecond = 1000000 +kSecondsInMinute = 60 +kSecondsInHour = 3600 + + +class TraceRecord: + """ + A trace record represents a block access. + It holds the same struct as BlockCacheTraceRecord in + trace_replay/block_cache_tracer.h + """ + + def __init__( + self, + access_time, + block_id, + block_type, + block_size, + cf_id, + cf_name, + level, + fd, + caller, + no_insert, + get_id, + key_id, + kv_size, + is_hit, + referenced_key_exist_in_block, + num_keys_in_block, + table_id, + seq_number, + block_key_size, + key_size, + block_offset_in_file, + next_access_seq_no, + ): + self.access_time = access_time + self.block_id = block_id + self.block_type = block_type + self.block_size = block_size + block_key_size + self.cf_id = cf_id + self.cf_name = cf_name + self.level = level + self.fd = fd + self.caller = caller + if no_insert == 1: + self.no_insert = True + else: + self.no_insert = False + self.get_id = get_id + self.key_id = key_id + self.kv_size = kv_size + if is_hit == 1: + self.is_hit = True + else: + self.is_hit = False + if referenced_key_exist_in_block == 1: + self.referenced_key_exist_in_block = True + else: + self.referenced_key_exist_in_block = False + self.num_keys_in_block = num_keys_in_block + self.table_id = table_id + self.seq_number = seq_number + self.block_key_size = block_key_size + self.key_size = key_size + self.block_offset_in_file = block_offset_in_file + self.next_access_seq_no = next_access_seq_no + + +class CacheEntry: + """A cache entry stored in the cache.""" + + def __init__( + self, + value_size, + cf_id, + level, + block_type, + table_id, + access_number, + time_s, + num_hits=0, + ): + self.value_size = value_size + self.last_access_number = access_number + self.num_hits = num_hits + self.cf_id = 0 + self.level = level + self.block_type = block_type + self.last_access_time = time_s + self.insertion_time = time_s + self.table_id = table_id + + def __repr__(self): + """Debug string.""" + return "(s={},last={},hits={},cf={},l={},bt={})\n".format( + self.value_size, + self.last_access_number, + self.num_hits, + self.cf_id, + self.level, + self.block_type, + ) + + def cost_class(self, cost_class_label): + if cost_class_label == "table_bt": + return "{}-{}".format(self.table_id, self.block_type) + elif cost_class_label == "table": + return "{}".format(self.table_id) + elif cost_class_label == "bt": + return "{}".format(self.block_type) + elif cost_class_label == "cf": + return "{}".format(self.cf_id) + elif cost_class_label == "cf_bt": + return "{}-{}".format(self.cf_id, self.block_type) + elif cost_class_label == "table_level_bt": 
+ return "{}-{}-{}".format(self.table_id, self.level, self.block_type) + assert False, "Unknown cost class label {}".format(cost_class_label) + return None + + +class HashEntry: + """A hash entry stored in a hash table.""" + + def __init__(self, key, hash, value): + self.key = key + self.hash = hash + self.value = value + + def __repr__(self): + return "k={},h={},v=[{}]".format(self.key, self.hash, self.value) + + +class HashTable: + """ + A custom implementation of hash table to support fast random sampling. + It is closed hashing and uses chaining to resolve hash conflicts. + It grows/shrinks the hash table upon insertion/deletion to support + fast lookups and random samplings. + """ + + def __init__(self): + self.initial_size = 32 + self.table = [None] * self.initial_size + self.elements = 0 + + def random_sample(self, sample_size): + """Randomly sample 'sample_size' hash entries from the table.""" + samples = [] + index = random.randint(0, len(self.table) - 1) + pos = index + # Starting from index, adding hash entries to the sample list until + # sample_size is met or we ran out of entries. + while True: + if self.table[pos] is not None: + for i in range(len(self.table[pos])): + if self.table[pos][i] is None: + continue + samples.append(self.table[pos][i]) + if len(samples) == sample_size: + break + pos += 1 + pos = pos % len(self.table) + if pos == index or len(samples) == sample_size: + break + assert len(samples) <= sample_size + return samples + + def __repr__(self): + all_entries = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_entries.append(self.table[i][j]) + return "{}".format(all_entries) + + def values(self): + all_values = [] + for i in range(len(self.table)): + if self.table[i] is None: + continue + for j in range(len(self.table[i])): + if self.table[i][j] is not None: + all_values.append(self.table[i][j].value) + return all_values + + def __len__(self): + return self.elements + + def insert(self, key, hash, value): + """ + Insert a hash entry in the table. Replace the old entry if it already + exists. + """ + self.grow() + inserted = False + index = hash % len(self.table) + if self.table[index] is None: + self.table[index] = [] + # Search for the entry first. + for i in range(len(self.table[index])): + if self.table[index][i] is None: + continue + if self.table[index][i].hash == hash and self.table[index][i].key == key: + # The entry already exists in the table. + self.table[index][i] = HashEntry(key, hash, value) + return + + # Find an empty slot. + for i in range(len(self.table[index])): + if self.table[index][i] is None: + self.table[index][i] = HashEntry(key, hash, value) + inserted = True + break + if not inserted: + self.table[index].append(HashEntry(key, hash, value)) + self.elements += 1 + + def resize(self, new_size): + if new_size == len(self.table): + return + if new_size < self.initial_size: + return + if self.elements < 100: + return + new_table = [None] * new_size + # Copy 'self.table' to new_table. + for i in range(len(self.table)): + entries = self.table[i] + if entries is None: + continue + for j in range(len(entries)): + if entries[j] is None: + continue + index = entries[j].hash % new_size + if new_table[index] is None: + new_table[index] = [] + new_table[index].append(entries[j]) + self.table = new_table + del new_table + # Manually call python gc here to free the memory as 'self.table' + # might be very large. 
+ gc.collect() + + def grow(self): + if self.elements < 4 * len(self.table): + return + new_size = int(len(self.table) * 1.5) + self.resize(new_size) + + def delete(self, key, hash): + index = hash % len(self.table) + deleted = False + deleted_entry = None + if self.table[index] is None: + return + for i in range(len(self.table[index])): + if ( + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + deleted_entry = self.table[index][i] + self.table[index][i] = None + self.elements -= 1 + deleted = True + break + if deleted: + self.shrink() + return deleted_entry + + def shrink(self): + if self.elements * 2 >= len(self.table): + return + new_size = int(len(self.table) * 0.7) + self.resize(new_size) + + def lookup(self, key, hash): + index = hash % len(self.table) + if self.table[index] is None: + return None + for i in range(len(self.table[index])): + if ( + self.table[index][i] is not None + and self.table[index][i].hash == hash + and self.table[index][i].key == key + ): + return self.table[index][i].value + return None + + +class MissRatioStats: + def __init__(self, time_unit): + self.num_misses = 0 + self.num_accesses = 0 + self.time_unit = time_unit + self.time_misses = {} + self.time_miss_bytes = {} + self.time_accesses = {} + + def update_metrics(self, access_time, is_hit, miss_bytes): + access_time /= kMicrosInSecond * self.time_unit + self.num_accesses += 1 + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if not is_hit: + self.num_misses += 1 + if access_time not in self.time_misses: + self.time_misses[access_time] = 0 + self.time_miss_bytes[access_time] = 0 + self.time_misses[access_time] += 1 + self.time_miss_bytes[access_time] += miss_bytes + + def reset_counter(self): + self.num_misses = 0 + self.num_accesses = 0 + self.time_miss_bytes.clear() + self.time_misses.clear() + self.time_accesses.clear() + + def compute_miss_bytes(self): + miss_bytes = [] + for at in self.time_miss_bytes: + miss_bytes.append(self.time_miss_bytes[at]) + miss_bytes = sorted(miss_bytes) + avg_miss_bytes = 0 + p95_miss_bytes = 0 + for i in range(len(miss_bytes)): + avg_miss_bytes += float(miss_bytes[i]) / float(len(miss_bytes)) + + p95_index = min(int(0.95 * float(len(miss_bytes))), len(miss_bytes) - 1) + p95_miss_bytes = miss_bytes[p95_index] + return avg_miss_bytes, p95_miss_bytes + + def miss_ratio(self): + return float(self.num_misses) * 100.0 / float(self.num_accesses) + + def write_miss_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + row += ",{}".format(self.time_misses.get(trace_time, 0)) + file.write(row + "\n") + + def write_miss_ratio_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= 
kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + row = "{}".format(cache_type) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + miss_ratio = 0 + if naccesses > 0: + miss_ratio = float( + self.time_misses.get(trace_time, 0) * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(miss_ratio) + file.write(row + "\n") + + +class PolicyStats: + def __init__(self, time_unit, policies): + self.time_selected_polices = {} + self.time_accesses = {} + self.policy_names = {} + self.time_unit = time_unit + for i in range(len(policies)): + self.policy_names[i] = policies[i].policy_name() + + def update_metrics(self, access_time, selected_policy): + access_time /= kMicrosInSecond * self.time_unit + if access_time not in self.time_accesses: + self.time_accesses[access_time] = 0 + self.time_accesses[access_time] += 1 + if access_time not in self.time_selected_polices: + self.time_selected_polices[access_time] = {} + policy_name = self.policy_names[selected_policy] + if policy_name not in self.time_selected_polices[access_time]: + self.time_selected_polices[access_time][policy_name] = 0 + self.time_selected_polices[access_time][policy_name] += 1 + + def write_policy_timeline( + self, cache_type, cache_size, target_cf_name, result_dir, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + for policy in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + row += ",{}".format( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + ) + file.write(row + "\n") + + def write_policy_ratio_timeline( + self, cache_type, cache_size, target_cf_name, file_path, start, end + ): + start /= kMicrosInSecond * self.time_unit + end /= kMicrosInSecond * self.time_unit + header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + if not path.exists(header_file_path): + with open(header_file_path, "w+") as header_file: + header = "time" + for trace_time in range(start, end): + header += ",{}".format(trace_time) + header_file.write(header + "\n") + file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}-{}".format( + result_dir, self.time_unit, cache_type, cache_size, target_cf_name + ) + with open(file_path, "w+") as file: + for policy 
in self.policy_names: + policy_name = self.policy_names[policy] + row = "{}-{}".format(cache_type, policy_name) + for trace_time in range(start, end): + naccesses = self.time_accesses.get(trace_time, 0) + ratio = 0 + if naccesses > 0: + ratio = float( + self.time_selected_polices.get(trace_time, {}).get( + policy_name, 0 + ) + * 100.0 + ) / float(naccesses) + row += ",{0:.2f}".format(ratio) + file.write(row + "\n") + + +class Policy(object): + """ + A policy maintains a set of evicted keys. It returns a reward of one to + itself if it has not evicted a missing key. Otherwise, it gives itself 0 + reward. + """ + + def __init__(self): + self.evicted_keys = {} + + def evict(self, key, max_size): + self.evicted_keys[key] = 0 + + def delete(self, key): + self.evicted_keys.pop(key, None) + + def prioritize_samples(self, samples, auxilliary_info): + raise NotImplementedError + + def policy_name(self): + raise NotImplementedError + + def generate_reward(self, key): + if key in self.evicted_keys: + return 0 + return 1 + + +class LRUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted( + samples, + cmp=lambda e1, e2: e1.value.last_access_number + - e2.value.last_access_number, + ) + + def policy_name(self): + return "lru" + + +class MRUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted( + samples, + cmp=lambda e1, e2: e2.value.last_access_number + - e1.value.last_access_number, + ) + + def policy_name(self): + return "mru" + + +class LFUPolicy(Policy): + def prioritize_samples(self, samples, auxilliary_info): + return sorted(samples, cmp=lambda e1, e2: e1.value.num_hits - e2.value.num_hits) + + def policy_name(self): + return "lfu" + + +class HyperbolicPolicy(Policy): + """ + An implementation of Hyperbolic caching. + + Aaron Blankstein, Siddhartha Sen, and Michael J. Freedman. 2017. + Hyperbolic caching: flexible caching for web applications. In Proceedings + of the 2017 USENIX Conference on Usenix Annual Technical Conference + (USENIX ATC '17). USENIX Association, Berkeley, CA, USA, 499-511. + """ + + def compare(self, e1, e2, now): + e1_duration = max(0, (now - e1.value.insertion_time) / kMicrosInSecond) * float( + e1.value.value_size + ) + e2_duration = max(0, (now - e2.value.insertion_time) / kMicrosInSecond) * float( + e2.value.value_size + ) + if e1_duration == e2_duration: + return e1.value.num_hits - e2.value.num_hits + if e1_duration == 0: + return 1 + if e2_duration == 0: + return 1 + diff = (float(e1.value.num_hits) / (float(e1_duration))) - ( + float(e2.value.num_hits) / float(e2_duration) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + return sorted(samples, cmp=lambda e1, e2: self.compare(e1, e2, now)) + + def policy_name(self): + return "hb" + + +class CostClassPolicy(Policy): + """ + We calculate the hit density of a cost class as + number of hits / total size in cache * average duration in the cache. + + An entry has a higher priority if its class's hit density is higher. 
+ """ + + def compare(self, e1, e2, now, cost_classes, cost_class_label): + e1_class = e1.value.cost_class(cost_class_label) + e2_class = e2.value.cost_class(cost_class_label) + + assert e1_class in cost_classes + assert e2_class in cost_classes + + e1_entry = cost_classes[e1_class] + e2_entry = cost_classes[e2_class] + e1_density = e1_entry.density(now) + e2_density = e2_entry.density(now) + e1_hits = cost_classes[e1_class].hits + e2_hits = cost_classes[e2_class].hits + + if e1_density == e2_density: + return e1_hits - e2_hits + + if e1_entry.num_entries_in_cache == 0: + return -1 + if e2_entry.num_entries_in_cache == 0: + return 1 + + if e1_density == 0: + return 1 + if e2_density == 0: + return -1 + diff = (float(e1_hits) / float(e1_density)) - ( + float(e2_hits) / float(e2_density) + ) + if diff == 0: + return 0 + elif diff > 0: + return 1 + else: + return -1 + + def prioritize_samples(self, samples, auxilliary_info): + assert len(auxilliary_info) == 3 + now = auxilliary_info[0] + cost_classes = auxilliary_info[1] + cost_class_label = auxilliary_info[2] + return sorted( + samples, + cmp=lambda e1, e2: self.compare( + e1, e2, now, cost_classes, cost_class_label + ), + ) + + def policy_name(self): + return "cc" + + +class Cache(object): + """ + This is the base class for the implementations of alternative cache + replacement policies. + """ + + def __init__(self, cache_size, enable_cache_row_key): + self.cache_size = cache_size + self.used_size = 0 + self.per_second_miss_ratio_stats = MissRatioStats(1) + self.miss_ratio_stats = MissRatioStats(kSecondsInMinute) + self.per_hour_miss_ratio_stats = MissRatioStats(kSecondsInHour) + # 0: disabled. 1: enabled. Insert both row and the refereneced data block. + # 2: enabled. Insert only the row but NOT the referenced data block. + self.enable_cache_row_key = enable_cache_row_key + self.get_id_row_key_map = {} + self.max_seen_get_id = 0 + self.retain_get_id_range = 100000 + + def block_key(self, trace_record): + return "b{}".format(trace_record.block_id) + + def row_key(self, trace_record): + return "g{}-{}".format(trace_record.fd, trace_record.key_id) + + def _lookup(self, trace_record, key, hash): + """ + Look up the key in the cache. + Returns true upon a cache hit, false otherwise. + """ + raise NotImplementedError + + def _evict(self, trace_record, key, hash, value_size): + """ + Evict entries in the cache until there is enough room to insert the new + entry with 'value_size'. + """ + raise NotImplementedError + + def _insert(self, trace_record, key, hash, value_size): + """ + Insert the new entry into the cache. + """ + raise NotImplementedError + + def _should_admit(self, trace_record, key, hash, value_size): + """ + A custom admission policy to decide whether we should admit the new + entry upon a cache miss. + Returns true if the new entry should be admitted, false otherwise. + """ + raise NotImplementedError + + def cache_name(self): + """ + The name of the replacement policy. + """ + raise NotImplementedError + + def is_ml_cache(self): + return False + + def _update_stats(self, access_time, is_hit, miss_bytes): + self.per_second_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes) + + def access(self, trace_record): + """ + Access a trace record. The simulator calls this function to access a + trace record. 
+ """ + assert self.used_size <= self.cache_size + if ( + self.enable_cache_row_key > 0 + and trace_record.caller == 1 + and trace_record.key_id != 0 + and trace_record.get_id != 0 + ): + # This is a get request. + self._access_row(trace_record) + return + is_hit = self._access_kv( + trace_record, + self.block_key(trace_record), + trace_record.block_id, + trace_record.block_size, + trace_record.no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit=is_hit, miss_bytes=trace_record.block_size + ) + + def _access_row(self, trace_record): + row_key = self.row_key(trace_record) + self.max_seen_get_id = max(self.max_seen_get_id, trace_record.get_id) + self.get_id_row_key_map.pop( + self.max_seen_get_id - self.retain_get_id_range, None + ) + if trace_record.get_id not in self.get_id_row_key_map: + self.get_id_row_key_map[trace_record.get_id] = {} + self.get_id_row_key_map[trace_record.get_id]["h"] = False + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 1") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) + return + if row_key not in self.get_id_row_key_map[trace_record.get_id]: + # First time seen this key. + is_hit = self._access_kv( + trace_record, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + inserted = False + if trace_record.kv_size > 0: + inserted = True + self.get_id_row_key_map[trace_record.get_id][row_key] = inserted + self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit + if self.get_id_row_key_map[trace_record.get_id]["h"]: + # We treat future accesses as hits since this get request + # completes. + # print("row hit 2") + self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0) + return + # Access its blocks. + no_insert = trace_record.no_insert + if ( + self.enable_cache_row_key == 2 + and trace_record.kv_size > 0 + and trace_record.block_type == 9 + ): + no_insert = True + is_hit = self._access_kv( + trace_record, + key=self.block_key(trace_record), + hash=trace_record.block_id, + value_size=trace_record.block_size, + no_insert=no_insert, + ) + self._update_stats( + trace_record.access_time, is_hit, miss_bytes=trace_record.block_size + ) + if ( + trace_record.kv_size > 0 + and not self.get_id_row_key_map[trace_record.get_id][row_key] + ): + # Insert the row key-value pair. + self._access_kv( + trace_record, + key=row_key, + hash=trace_record.key_id, + value_size=trace_record.kv_size, + no_insert=False, + ) + # Mark as inserted. + self.get_id_row_key_map[trace_record.get_id][row_key] = True + + def _access_kv(self, trace_record, key, hash, value_size, no_insert): + # Sanity checks. + assert self.used_size <= self.cache_size + if self._lookup(trace_record, key, hash): + # A cache hit. + return True + if no_insert or value_size <= 0: + return False + # A cache miss. + if value_size > self.cache_size: + # The block is too large to fit into the cache. + return False + self._evict(trace_record, key, hash, value_size) + if self._should_admit(trace_record, key, hash, value_size): + self._insert(trace_record, key, hash, value_size) + self.used_size += value_size + return False + + +class CostClassEntry: + """ + A cost class maintains aggregated statistics of cached entries in a class. + For example, we may define block type as a class. Then, cached blocks of the + same type will share one cost class entry. 
+ """ + + def __init__(self): + self.hits = 0 + self.num_entries_in_cache = 0 + self.size_in_cache = 0 + self.sum_insertion_times = 0 + self.sum_last_access_time = 0 + + def insert(self, trace_record, key, value_size): + self.size_in_cache += value_size + self.num_entries_in_cache += 1 + self.sum_insertion_times += trace_record.access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def remove(self, insertion_time, last_access_time, key, value_size, num_hits): + self.hits -= num_hits + self.num_entries_in_cache -= 1 + self.sum_insertion_times -= insertion_time / kMicrosInSecond + self.size_in_cache -= value_size + self.sum_last_access_time -= last_access_time / kMicrosInSecond + + def update_on_hit(self, trace_record, last_access_time): + self.hits += 1 + self.sum_last_access_time -= last_access_time / kMicrosInSecond + self.sum_last_access_time += trace_record.access_time / kMicrosInSecond + + def avg_lifetime_in_cache(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + return now / kMicrosInSecond - avg_insertion_time + + def avg_last_access_time(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def avg_size(self): + if self.num_entries_in_cache == 0: + return 0 + return float(self.sum_last_access_time) / float(self.num_entries_in_cache) + + def density(self, now): + avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache + in_cache_duration = now / kMicrosInSecond - avg_insertion_time + return self.size_in_cache * in_cache_duration + + +class MLCache(Cache): + """ + MLCache is the base class for implementations of alternative replacement + policies using reinforcement learning. + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(MLCache, self).__init__(cache_size, enable_cache_row_key) + self.table = HashTable() + self.policy_stats = PolicyStats(kSecondsInMinute, policies) + self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies) + self.policies = policies + self.cost_classes = {} + self.cost_class_label = cost_class_label + + def is_ml_cache(self): + return True + + def _lookup(self, trace_record, key, hash): + value = self.table.lookup(key, hash) + if value is not None: + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].update_on_hit( + trace_record, value.last_access_time + ) + # Update the entry's last access time. + self.table.insert( + key, + hash, + CacheEntry( + value_size=value.value_size, + cf_id=value.cf_id, + level=value.level, + block_type=value.block_type, + table_id=value.table_id, + access_number=self.miss_ratio_stats.num_accesses, + time_s=trace_record.access_time, + num_hits=value.num_hits + 1, + ), + ) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Select a policy, random sample kSampleSize keys from the cache, then + # evict keys in the sample set until we have enough room for the new + # entry. 
+ policy_index = self._select_policy(trace_record, key) + assert policy_index < len(self.policies) and policy_index >= 0 + self.policies[policy_index].delete(key) + self.policy_stats.update_metrics(trace_record.access_time, policy_index) + self.per_hour_policy_stats.update_metrics( + trace_record.access_time, policy_index + ) + while self.used_size + value_size > self.cache_size: + # Randomly sample n entries. + samples = self.table.random_sample(kSampleSize) + samples = self.policies[policy_index].prioritize_samples( + samples, + [trace_record.access_time, self.cost_classes, self.cost_class_label], + ) + for hash_entry in samples: + assert self.table.delete(hash_entry.key, hash_entry.hash) is not None + self.used_size -= hash_entry.value.value_size + self.policies[policy_index].evict( + key=hash_entry.key, max_size=self.table.elements + ) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = hash_entry.value.cost_class(self.cost_class_label) + assert cost_class in self.cost_classes + self.cost_classes[cost_class].remove( + hash_entry.value.insertion_time, + hash_entry.value.last_access_time, + key, + hash_entry.value.value_size, + hash_entry.value.num_hits, + ) + if self.used_size + value_size <= self.cache_size: + break + + def _insert(self, trace_record, key, hash, value_size): + assert self.used_size + value_size <= self.cache_size + entry = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + self.miss_ratio_stats.num_accesses, + trace_record.access_time, + ) + # Update the entry's cost class statistics. + if self.cost_class_label is not None: + cost_class = entry.cost_class(self.cost_class_label) + if cost_class not in self.cost_classes: + self.cost_classes[cost_class] = CostClassEntry() + self.cost_classes[cost_class].insert(trace_record, key, value_size) + self.table.insert(key, hash, entry) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def _select_policy(self, trace_record, key): + raise NotImplementedError + + +class ThompsonSamplingCache(MLCache): + """ + An implementation of Thompson Sampling for the Bernoulli Bandit. + + Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband, + and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found. + Trends Mach. Learn. 11, 1 (July 2018), 1-96. 
+ DOI: https://doi.org/10.1561/2200000070 + """ + + def __init__( + self, + cache_size, + enable_cache_row_key, + policies, + cost_class_label, + init_a=1, + init_b=1, + ): + super(ThompsonSamplingCache, self).__init__( + cache_size, enable_cache_row_key, policies, cost_class_label + ) + self._as = {} + self._bs = {} + for _i in range(len(policies)): + self._as = [init_a] * len(self.policies) + self._bs = [init_b] * len(self.policies) + + def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 + samples = [ + np.random.beta(self._as[x], self._bs[x]) for x in range(len(self.policies)) + ] + selected_policy = max(range(len(self.policies)), key=lambda x: samples[x]) + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self._as[selected_policy] += reward + self._bs[selected_policy] += 1 - reward + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid ThompsonSampling with cost class {} (ts_hybrid)".format( + self.cost_class_label + ) + return "ThompsonSampling with cost class {} (ts)".format(self.cost_class_label) + + +class LinUCBCache(MLCache): + """ + An implementation of LinUCB with disjoint linear models. + + Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010. + A contextual-bandit approach to personalized news article recommendation. + In Proceedings of the 19th international conference on World wide web + (WWW '10). ACM, New York, NY, USA, 661-670. + DOI=http://dx.doi.org/10.1145/1772690.1772758 + """ + + def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label): + super(LinUCBCache, self).__init__( + cache_size, enable_cache_row_key, policies, cost_class_label + ) + self.nfeatures = 4 # Block type, level, cf. + self.th = np.zeros((len(self.policies), self.nfeatures)) + self.eps = 0.2 + self.b = np.zeros_like(self.th) + self.A = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + self.A_inv = np.zeros((len(self.policies), self.nfeatures, self.nfeatures)) + for i in range(len(self.policies)): + self.A[i] = np.identity(self.nfeatures) + self.th_hat = np.zeros_like(self.th) + self.p = np.zeros(len(self.policies)) + self.alph = 0.2 + + def _select_policy(self, trace_record, key): + if len(self.policies) == 1: + return 0 + x_i = np.zeros(self.nfeatures) # The current context vector + x_i[0] = trace_record.block_type + x_i[1] = trace_record.level + x_i[2] = trace_record.cf_id + p = np.zeros(len(self.policies)) + for a in range(len(self.policies)): + self.th_hat[a] = self.A_inv[a].dot(self.b[a]) + ta = x_i.dot(self.A_inv[a]).dot(x_i) + a_upper_ci = self.alph * np.sqrt(ta) + a_mean = self.th_hat[a].dot(x_i) + p[a] = a_mean + a_upper_ci + p = p + (np.random.random(len(p)) * 0.000001) + selected_policy = p.argmax() + reward = self.policies[selected_policy].generate_reward(key) + assert reward <= 1 and reward >= 0 + self.A[selected_policy] += np.outer(x_i, x_i) + self.b[selected_policy] += reward * x_i + self.A_inv[selected_policy] = np.linalg.inv(self.A[selected_policy]) + del x_i + return selected_policy + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LinUCB with cost class {} (linucb_hybrid)".format( + self.cost_class_label + ) + return "LinUCB with cost class {} (linucb)".format(self.cost_class_label) + + +class OPTCacheEntry: + """ + A cache entry for the OPT algorithm. 
The entries are sorted based on its + next access sequence number in reverse order, i.e., the entry which next + access is the furthest in the future is ordered before other entries. + """ + + def __init__(self, key, next_access_seq_no, value_size): + self.key = key + self.next_access_seq_no = next_access_seq_no + self.value_size = value_size + self.is_removed = False + + def __cmp__(self, other): + if other.next_access_seq_no != self.next_access_seq_no: + return other.next_access_seq_no - self.next_access_seq_no + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class PQTable: + """ + A hash table with a priority queue. + """ + + def __init__(self): + # A list of entries arranged in a heap sorted based on the entry custom + # implementation of __cmp__ + self.pq = [] + self.table = {} + + def pqinsert(self, entry): + "Add a new key or update the priority of an existing key" + # Remove the entry from the table first. + removed_entry = self.table.pop(entry.key, None) + if removed_entry: + # Mark as removed since there is no 'remove' API in heappq. + # Instead, an entry in pq is removed lazily when calling pop. + removed_entry.is_removed = True + self.table[entry.key] = entry + heapq.heappush(self.pq, entry) + return removed_entry + + def pqpop(self): + while self.pq: + entry = heapq.heappop(self.pq) + if not entry.is_removed: + del self.table[entry.key] + return entry + return None + + def pqpeek(self): + while self.pq: + entry = self.pq[0] + if not entry.is_removed: + return entry + heapq.heappop(self.pq) + return + + def __contains__(self, k): + return k in self.table + + def __getitem__(self, k): + return self.table[k] + + def __len__(self): + return len(self.table) + + def values(self): + return self.table.values() + + +class OPTCache(Cache): + """ + An implementation of the Belady MIN algorithm. OPTCache evicts an entry + in the cache whose next access occurs furthest in the future. + + Note that Belady MIN algorithm is optimal assuming all blocks having the + same size and a missing entry will be inserted in the cache. + These are NOT true for the block cache trace since blocks have different + sizes and we may not insert a block into the cache upon a cache miss. + However, it is still useful to serve as a "theoretical upper bound" on the + lowest miss ratio we can achieve given a cache size. + + L. A. Belady. 1966. A Study of Replacement Algorithms for a + Virtual-storage Computer. IBM Syst. J. 5, 2 (June 1966), 78-101. + DOI=http://dx.doi.org/10.1147/sj.52.0078 + """ + + def __init__(self, cache_size): + super(OPTCache, self).__init__(cache_size, enable_cache_row_key=0) + self.table = PQTable() + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its next access time. 
+ assert ( + self.table.pqinsert( + OPTCacheEntry( + key, trace_record.next_access_seq_no, self.table[key].value_size + ) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert( + OPTCacheEntry(key, trace_record.next_access_seq_no, value_size) + ) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + return "Belady MIN (opt)" + + +class GDSizeEntry: + """ + A cache entry for the greedy dual size replacement policy. + """ + + def __init__(self, key, value_size, priority): + self.key = key + self.value_size = value_size + self.priority = priority + self.is_removed = False + + def __cmp__(self, other): + if other.priority != self.priority: + return self.priority - other.priority + return self.value_size - other.value_size + + def __repr__(self): + return "({} {} {} {})".format( + self.key, self.next_access_seq_no, self.value_size, self.is_removed + ) + + +class GDSizeCache(Cache): + """ + An implementation of the greedy dual size algorithm. + We define cost as an entry's size. + + See https://www.usenix.org/legacy/publications/library/proceedings/usits97/full_papers/cao/cao_html/node8.html + and N. Young. The k-server dual and loose competitiveness for paging. + Algorithmica,June 1994, vol. 11,(no.6):525-41. + Rewritten version of ''On-line caching as cache size varies'', + in The 2nd Annual ACM-SIAM Symposium on Discrete Algorithms, 241-250, 1991. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(GDSizeCache, self).__init__(cache_size, enable_cache_row_key) + self.table = PQTable() + self.L = 0.0 + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid GreedyDualSize (gdsize_hybrid)" + return "GreedyDualSize (gdsize)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update its priority. + entry = self.table[key] + assert ( + self.table.pqinsert( + GDSizeEntry(key, entry.value_size, self.L + entry.value_size) + ) + is not None + ) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_entry = self.table.pqpop() + assert evict_entry is not None + self.L = evict_entry.priority + self.used_size -= evict_entry.value_size + + def _insert(self, trace_record, key, hash, value_size): + assert ( + self.table.pqinsert(GDSizeEntry(key, value_size, self.L + value_size)) + is None + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class Deque(object): + """A Deque class facilitates the implementation of LRU and ARC.""" + + def __init__(self): + self.od = OrderedDict() + + def appendleft(self, k): + if k in self.od: + del self.od[k] + self.od[k] = None + + def pop(self): + item = self.od.popitem(last=False) if self.od else None + if item is not None: + return item[0] + return None + + def remove(self, k): + del self.od[k] + + def __len__(self): + return len(self.od) + + def __contains__(self, k): + return k in self.od + + def __iter__(self): + return reversed(self.od) + + def __repr__(self): + return "Deque(%r)" % (list(self),) + + +class ARCCache(Cache): + """ + An implementation of ARC. 
ARC assumes that all blocks are having the + same size. The size of index and filter blocks are variable. To accommodate + this, we modified ARC as follows: + 1) We use 16 KB as the average block size and calculate the number of blocks + (c) in the cache. + 2) When we insert an entry, the cache evicts entries in both t1 and t2 + queues until it has enough space for the new entry. This also requires + modification of the algorithm to maintain a maximum of 2*c blocks. + + Nimrod Megiddo and Dharmendra S. Modha. 2003. ARC: A Self-Tuning, Low + Overhead Replacement Cache. In Proceedings of the 2nd USENIX Conference on + File and Storage Technologies (FAST '03). USENIX Association, Berkeley, CA, + USA, 115-130. + """ + + def __init__(self, cache_size, enable_cache_row_key): + super(ARCCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.c = cache_size / 16 * 1024 # Number of elements in the cache. + self.p = 0 # Target size for the list T1 + # L1: only once recently + self.t1 = Deque() # T1: recent cache entries + self.b1 = Deque() # B1: ghost entries recently evicted from the T1 cache + # L2: at least twice recently + self.t2 = Deque() # T2: frequent entries + self.b2 = Deque() # B2: ghost entries recently evicted from the T2 cache + + def _replace(self, key, value_size): + while self.used_size + value_size > self.cache_size: + if self.t1 and ((key in self.b2) or (len(self.t1) > self.p)): + old = self.t1.pop() + self.b1.appendleft(old) + else: + if self.t2: + old = self.t2.pop() + self.b2.appendleft(old) + else: + old = self.t1.pop() + self.b1.appendleft(old) + self.used_size -= self.table[old].value_size + del self.table[old] + + def _lookup(self, trace_record, key, hash): + # Case I: key is in T1 or T2. + # Move key to MRU position in T2. + if key in self.t1: + self.t1.remove(key) + self.t2.appendleft(key) + return True + + if key in self.t2: + self.t2.remove(key) + self.t2.appendleft(key) + return True + return False + + def _evict(self, trace_record, key, hash, value_size): + # Case II: key is in B1 + # Move x from B1 to the MRU position in T2 (also fetch x to the cache). + if key in self.b1: + self.p = min(self.c, self.p + max(len(self.b2) / len(self.b1), 1)) + self._replace(key, value_size) + self.b1.remove(key) + self.t2.appendleft(key) + return + + # Case III: key is in B2 + # Move x from B2 to the MRU position in T2 (also fetch x to the cache). + if key in self.b2: + self.p = max(0, self.p - max(len(self.b1) / len(self.b2), 1)) + self._replace(key, value_size) + self.b2.remove(key) + self.t2.appendleft(key) + return + + # Case IV: key is not in (T1 u B1 u T2 u B2) + self._replace(key, value_size) + while len(self.t1) + len(self.b1) >= self.c and self.b1: + self.b1.pop() + + total = len(self.t1) + len(self.b1) + len(self.t2) + len(self.b2) + while total >= (2 * self.c) and self.b2: + self.b2.pop() + total -= 1 + # Finally, move it to MRU position in T1. + self.t1.appendleft(key) + return + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid Adaptive Replacement Cache (arc_hybrid)" + return "Adaptive Replacement Cache (arc)" + + +class LRUCache(Cache): + """ + A strict LRU queue. 
+ """ + + def __init__(self, cache_size, enable_cache_row_key): + super(LRUCache, self).__init__(cache_size, enable_cache_row_key) + self.table = {} + self.lru = Deque() + + def cache_name(self): + if self.enable_cache_row_key: + return "Hybrid LRU (lru_hybrid)" + return "LRU (lru)" + + def _lookup(self, trace_record, key, hash): + if key not in self.table: + return False + # A cache hit. Update LRU queue. + self.lru.remove(key) + self.lru.appendleft(key) + return True + + def _evict(self, trace_record, key, hash, value_size): + while self.used_size + value_size > self.cache_size: + evict_key = self.lru.pop() + self.used_size -= self.table[evict_key].value_size + del self.table[evict_key] + + def _insert(self, trace_record, key, hash, value_size): + self.table[key] = CacheEntry( + value_size, + trace_record.cf_id, + trace_record.level, + trace_record.block_type, + trace_record.table_id, + 0, + trace_record.access_time, + ) + self.lru.appendleft(key) + + def _should_admit(self, trace_record, key, hash, value_size): + return True + + +class TraceCache(Cache): + """ + A trace cache. Lookup returns true if the trace observes a cache hit. + It is used to maintain cache hits observed in the trace. + """ + + def __init__(self, cache_size): + super(TraceCache, self).__init__(cache_size, enable_cache_row_key=0) + + def _lookup(self, trace_record, key, hash): + return trace_record.is_hit + + def _evict(self, trace_record, key, hash, value_size): + pass + + def _insert(self, trace_record, key, hash, value_size): + pass + + def _should_admit(self, trace_record, key, hash, value_size): + return False + + def cache_name(self): + return "Trace" + + +def parse_cache_size(cs): + cs = cs.replace("\n", "") + if cs[-1] == "M": + return int(cs[: len(cs) - 1]) * 1024 * 1024 + if cs[-1] == "G": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 + if cs[-1] == "T": + return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 * 1024 + return int(cs) + + +def create_cache(cache_type, cache_size, downsample_size): + cache_size = cache_size / downsample_size + enable_cache_row_key = 0 + if "hybridn" in cache_type: + enable_cache_row_key = 2 + cache_type = cache_type[:-8] + if "hybrid" in cache_type: + enable_cache_row_key = 1 + cache_type = cache_type[:-7] + if cache_type == "ts": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "linucb": + return LinUCBCache( + cache_size, + enable_cache_row_key, + [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pylru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LRUPolicy()], cost_class_label=None + ) + elif cache_type == "pymru": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [MRUPolicy()], cost_class_label=None + ) + elif cache_type == "pylfu": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [LFUPolicy()], cost_class_label=None + ) + elif cache_type == "pyhb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [HyperbolicPolicy()], + cost_class_label=None, + ) + elif cache_type == "pycctbbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_bt", + ) + elif cache_type == "pycccf": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="cf" + ) + elif cache_type == "pycctblevelbt": + return ThompsonSamplingCache( + 
cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table_level_bt", + ) + elif cache_type == "pycccfbt": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="cf_bt", + ) + elif cache_type == "pycctb": + return ThompsonSamplingCache( + cache_size, + enable_cache_row_key, + [CostClassPolicy()], + cost_class_label="table", + ) + elif cache_type == "pyccbt": + return ThompsonSamplingCache( + cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="bt" + ) + elif cache_type == "opt": + if enable_cache_row_key: + print("opt does not support hybrid mode.") + assert False + return OPTCache(cache_size) + elif cache_type == "trace": + if enable_cache_row_key: + print("trace does not support hybrid mode.") + assert False + return TraceCache(cache_size) + elif cache_type == "lru": + return LRUCache(cache_size, enable_cache_row_key) + elif cache_type == "arc": + return ARCCache(cache_size, enable_cache_row_key) + elif cache_type == "gdsize": + return GDSizeCache(cache_size, enable_cache_row_key) + else: + print("Unknown cache type {}".format(cache_type)) + assert False + return None + + +class BlockAccessTimeline: + """ + BlockAccessTimeline stores all accesses of a block. + """ + + def __init__(self): + self.accesses = [] + self.current_access_index = 1 + + def get_next_access(self): + if self.current_access_index == len(self.accesses): + return sys.maxsize + next_access_seq_no = self.accesses[self.current_access_index] + self.current_access_index += 1 + return next_access_seq_no + + +def percent(e1, e2): + if e2 == 0: + return -1 + return float(e1) * 100.0 / float(e2) + + +def is_target_cf(access_cf, target_cf_name): + if target_cf_name == "all": + return True + return access_cf == target_cf_name + + +def run( + trace_file_path, + cache_type, + cache, + warmup_seconds, + max_accesses_to_process, + target_cf_name, +): + warmup_complete = False + trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute) + access_seq_no = 0 + time_interval = 1 + start_time = time.time() + trace_start_time = 0 + trace_duration = 0 + is_opt_cache = False + if cache.cache_name() == "Belady MIN (opt)": + is_opt_cache = True + + block_access_timelines = {} + num_no_inserts = 0 + num_blocks_with_no_size = 0 + num_inserts_block_with_no_size = 0 + + if is_opt_cache: + # Read all blocks in memory and stores their access times so that OPT + # can use this information to evict the cached key which next access is + # the furthest in the future. 
+ print("Preprocessing block traces.") + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + ts = line.split(",") + timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + block_id = int(ts[1]) + block_size = int(ts[3]) + no_insert = int(ts[9]) + if block_id not in block_access_timelines: + block_access_timelines[block_id] = BlockAccessTimeline() + if block_size == 0: + num_blocks_with_no_size += 1 + block_access_timelines[block_id].accesses.append(access_seq_no) + access_seq_no += 1 + if no_insert == 1: + num_no_inserts += 1 + if no_insert == 0 and block_size == 0: + num_inserts_block_with_no_size += 1 + if access_seq_no % 100 != 0: + continue + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second.".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + ) + ) + time_interval += 1 + print( + "Trace contains {0} blocks, {1}({2:.2f}%) blocks with no size." + "{3} accesses, {4}({5:.2f}%) accesses with no_insert," + "{6}({7:.2f}%) accesses that want to insert but block size is 0.".format( + len(block_access_timelines), + num_blocks_with_no_size, + percent(num_blocks_with_no_size, len(block_access_timelines)), + access_seq_no, + num_no_inserts, + percent(num_no_inserts, access_seq_no), + num_inserts_block_with_no_size, + percent(num_inserts_block_with_no_size, access_seq_no), + ) + ) + + access_seq_no = 0 + time_interval = 1 + start_time = time.time() + trace_start_time = 0 + trace_duration = 0 + print("Running simulated {} cache on block traces.".format(cache.cache_name())) + with open(trace_file_path, "r") as trace_file: + for line in trace_file: + if ( + max_accesses_to_process != -1 + and access_seq_no > max_accesses_to_process + ): + break + if access_seq_no % 1000000 == 0: + # Force a python gc periodically to reduce memory usage. 
+ gc.collect() + ts = line.split(",") + timestamp = int(ts[0]) + cf_name = ts[5] + if not is_target_cf(cf_name, target_cf_name): + continue + if trace_start_time == 0: + trace_start_time = timestamp + trace_duration = timestamp - trace_start_time + if ( + not warmup_complete + and warmup_seconds > 0 + and trace_duration > warmup_seconds * 1000000 + ): + cache.miss_ratio_stats.reset_counter() + warmup_complete = True + next_access_seq_no = 0 + block_id = int(ts[1]) + if is_opt_cache: + next_access_seq_no = block_access_timelines[block_id].get_next_access() + record = TraceRecord( + access_time=int(ts[0]), + block_id=int(ts[1]), + block_type=int(ts[2]), + block_size=int(ts[3]), + cf_id=int(ts[4]), + cf_name=ts[5], + level=int(ts[6]), + fd=int(ts[7]), + caller=int(ts[8]), + no_insert=int(ts[9]), + get_id=int(ts[10]), + key_id=int(ts[11]), + kv_size=int(ts[12]), + is_hit=int(ts[13]), + referenced_key_exist_in_block=int(ts[14]), + num_keys_in_block=int(ts[15]), + table_id=int(ts[16]), + seq_number=int(ts[17]), + block_key_size=int(ts[18]), + key_size=int(ts[19]), + block_offset_in_file=int(ts[20]), + next_access_seq_no=next_access_seq_no, + ) + trace_miss_ratio_stats.update_metrics( + record.access_time, is_hit=record.is_hit, miss_bytes=record.block_size + ) + cache.access(record) + access_seq_no += 1 + del record + del ts + if access_seq_no % 100 != 0: + continue + # Report progress every 10 seconds. + now = time.time() + if now - start_time > time_interval * 10: + print( + "Take {} seconds to process {} trace records with trace " + "duration of {} seconds. Throughput: {} records/second. " + "Trace miss ratio {}".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + time_interval += 1 + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + now = time.time() + print( + "Take {} seconds to process {} trace records with trace duration of {} " + "seconds. Throughput: {} records/second. 
Trace miss ratio {}".format( + now - start_time, + access_seq_no, + trace_duration / 1000000, + access_seq_no / (now - start_time), + trace_miss_ratio_stats.miss_ratio(), + ) + ) + print( + "{},0,0,{},{},{}".format( + cache_type, + cache.cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + return trace_start_time, trace_duration + + +def report_stats( + cache, + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, +): + cache_label = "{}-{}-{}".format(cache_type, cache_size, target_cf_name) + with open("{}/data-ml-mrc-{}".format(result_dir, cache_label), "w+") as mrc_file: + mrc_file.write( + "{},0,0,{},{},{}\n".format( + cache_type, + cache_size, + cache.miss_ratio_stats.miss_ratio(), + cache.miss_ratio_stats.num_accesses, + ) + ) + + cache_stats = [ + cache.per_second_miss_ratio_stats, + cache.miss_ratio_stats, + cache.per_hour_miss_ratio_stats, + ] + for i in range(len(cache_stats)): + avg_miss_bytes, p95_miss_bytes = cache_stats[i].compute_miss_bytes() + + with open( + "{}/data-ml-avgmb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, avg_miss_bytes) + ) + + with open( + "{}/data-ml-p95mb-{}-{}".format( + result_dir, cache_stats[i].time_unit, cache_label + ), + "w+", + ) as mb_file: + mb_file.write( + "{},0,0,{},{}\n".format(cache_type, cache_size, p95_miss_bytes) + ) + + cache_stats[i].write_miss_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + cache_stats[i].write_miss_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + + if not cache.is_ml_cache(): + return + + policy_stats = [cache.policy_stats, cache.per_hour_policy_stats] + for i in range(len(policy_stats)): + policy_stats[i].write_policy_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + policy_stats[i].write_policy_ratio_timeline( + cache_type, + cache_size, + target_cf_name, + result_dir, + trace_start_time, + trace_end_time, + ) + + +if __name__ == "__main__": + if len(sys.argv) <= 8: + print( + "Must provide 8 arguments.\n" + "1) Cache type (ts, linucb, arc, lru, opt, pylru, pymru, pylfu, " + "pyhb, gdsize, trace). One may evaluate the hybrid row_block cache " + "by appending '_hybrid' to a cache_type, e.g., ts_hybrid. " + "Note that hybrid is not supported with opt and trace. \n" + "2) Cache size (xM, xG, xT).\n" + "3) The sampling frequency used to collect the trace. (The " + "simulation scales down the cache size by the sampling frequency).\n" + "4) Warmup seconds (The number of seconds used for warmup).\n" + "5) Trace file path.\n" + "6) Result directory (A directory that saves generated results)\n" + "7) Max number of accesses to process\n" + "8) The target column family. (The simulation will only run " + "accesses on the target column family. 
If it is set to all, "
+            "it will run against all accesses.)"
+        )
+        exit(1)
+    print("Arguments: {}".format(sys.argv))
+    cache_type = sys.argv[1]
+    cache_size = parse_cache_size(sys.argv[2])
+    downsample_size = int(sys.argv[3])
+    warmup_seconds = int(sys.argv[4])
+    trace_file_path = sys.argv[5]
+    result_dir = sys.argv[6]
+    max_accesses_to_process = int(sys.argv[7])
+    target_cf_name = sys.argv[8]
+    cache = create_cache(cache_type, cache_size, downsample_size)
+    trace_start_time, trace_duration = run(
+        trace_file_path,
+        cache_type,
+        cache,
+        warmup_seconds,
+        max_accesses_to_process,
+        target_cf_name,
+    )
+    trace_end_time = trace_start_time + trace_duration
+    report_stats(
+        cache,
+        cache_type,
+        cache_size,
+        target_cf_name,
+        result_dir,
+        trace_start_time,
+        trace_end_time,
+    )
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh
new file mode 100644
index 000000000..295f734aa
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh
@@ -0,0 +1,156 @@
+#!/usr/bin/env bash
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+#
+# A shell script to run a batch of pysims and combine individual pysim output files.
+#
+# Usage: bash block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs
+# trace_file_path: The file path that stores the traces.
+# result_dir: The directory to store pysim results. The output files from a pysim are stored in result_dir/ml
+# downsample_size: The downsample size used to collect the trace.
+# warmup_seconds: The number of seconds used for warmup.
+# max_jobs: The max number of concurrent pysims to run.
+
+# Install required packages to run simulations.
+# sudo dnf install -y numpy scipy python-matplotlib ipython python-pandas sympy python-nose atlas-devel
+ulimit -c 0
+
+if [ $# -ne 5 ]; then
+  echo "Usage: ./block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs"
+  exit 0
+fi
+
+trace_file="$1"
+result_dir="$2"
+downsample_size="$3"
+warmup_seconds="$4"
+max_jobs="$5"
+max_num_accesses=100000000
+current_jobs=1
+
+ml_tmp_result_dir="$result_dir/ml"
+rm -rf "$ml_tmp_result_dir"
+mkdir -p "$result_dir"
+mkdir -p "$ml_tmp_result_dir"
+
+# Report miss ratio in the trace.
+current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+for cf_name in "all"
+do
+for cache_size in "1G" "2G" "4G" "8G" "16G" #"12G" "16G" "1T"
+do
+for cache_type in "opt" "lru" "pylru" "pycctbbt" "pyhb" "ts" "trace" "lru_hybrid" #"pycctblevelbt" #"lru_hybridn" "opt" #"pylru" "pylru_hybrid" "pycctbbt" "pycccfbt" "trace"
+do
+  if [[ $cache_type == "trace" && $cache_size != "16G" ]]; then
+    # We only need to collect miss ratios observed in the trace once.
+    continue
+  fi
+  while [ "$current_jobs" -ge "$max_jobs" ]
+  do
+    sleep 10
+    echo "Waiting for jobs to complete. Number of running jobs: $current_jobs"
+    current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+    echo "Waiting for jobs to complete. Number of running jobs: $current_jobs"
+  done
+  output="log-ml-$cache_type-$cache_size-$cf_name"
+  echo "Running simulation for $cache_type, cache size $cache_size, and cf_name $cf_name. Number of running jobs: $current_jobs."
+  nohup python block_cache_pysim.py "$cache_type" "$cache_size" "$downsample_size" "$warmup_seconds" "$trace_file" "$ml_tmp_result_dir" "$max_num_accesses" "$cf_name" >& "$ml_tmp_result_dir/$output" &
+  current_jobs=$((current_jobs+1))
+done
+done
+done
+
+# Wait for all jobs to complete.
+while [ $current_jobs -gt 0 ]
+do
+  sleep 10
+  echo "Waiting for jobs to complete. Number of running jobs: $current_jobs"
+  current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+  echo "Waiting for jobs to complete. Number of running jobs: $current_jobs"
+done
+
+echo "Combine individual pysim output files"
+
+rm -rf "$result_dir"/ml_*
+for header in "header-" "data-"
+do
+for fn in "$ml_tmp_result_dir"/*
+do
+  sum_file=""
+  time_unit=""
+  capacity=""
+  target_cf_name=""
+  if [[ $fn == *"timeline"* ]]; then
+    tmpfn="$fn"
+    IFS='-' read -ra elements <<< "$tmpfn"
+    time_unit_index=0
+    capacity_index=0
+    for i in "${elements[@]}"
+    do
+      if [[ $i == "timeline" ]]; then
+        break
+      fi
+      time_unit_index=$((time_unit_index+1))
+    done
+    time_unit_index=$((time_unit_index+1))
+    capacity_index=$((time_unit_index+2))
+    target_cf_name_index=$((time_unit_index+3))
+    time_unit="${elements[$time_unit_index]}_"
+    capacity="${elements[$capacity_index]}_"
+    target_cf_name="${elements[$target_cf_name_index]}_"
+  fi
+
+  if [[ $fn == *"${header}ml-policy-timeline"* ]]; then
+    sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_timeline"
+  fi
+  if [[ $fn == *"${header}ml-policy-ratio-timeline"* ]]; then
+    sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_ratio_timeline"
+  fi
+  if [[ $fn == *"${header}ml-miss-timeline"* ]]; then
+    sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_timeline"
+  fi
+  if [[ $fn == *"${header}ml-miss-ratio-timeline"* ]]; then
+    sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_ratio_timeline"
+  fi
+  if [[ $fn == *"${header}ml-mrc"* ]]; then
+    tmpfn="$fn"
+    IFS='-' read -ra elements <<< "$tmpfn"
+    target_cf_name=${elements[-1]}
+    sum_file="${result_dir}/ml_${target_cf_name}_mrc"
+  fi
+  if [[ $fn == *"${header}ml-avgmb"* ]]; then
+    tmpfn="$fn"
+    IFS='-' read -ra elements <<< "$tmpfn"
+    time_unit=${elements[3]}
+    target_cf_name=${elements[-1]}
+    sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_avgmb"
+  fi
+  if [[ $fn == *"${header}ml-p95mb"* ]]; then
+    tmpfn="$fn"
+    IFS='-' read -ra elements <<< "$tmpfn"
+    time_unit=${elements[3]}
+    target_cf_name=${elements[-1]}
+    sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_p95mb"
+  fi
+  if [[ $sum_file == "" ]]; then
+    continue
+  fi
+  if [[ $header == "header-" ]]; then
+    if [ -e "$sum_file" ]; then
+      continue
+    fi
+  fi
+  cat "$fn" >> "$sum_file"
+done
+done

+echo "Done"
+for fn in $result_dir/*
+do
+  if [[ $fn == *"_mrc" || $fn == *"_avgmb" || $fn == *"_p95mb" ]]; then
+    # Sort MRC file by cache_type and cache_size.
+    tmp_file="$result_dir/tmp_mrc"
+    cat "$fn" | sort -t ',' -k1,1 -k4,4n > "$tmp_file"
+    cat "$tmp_file" > "$fn"
+    rm -rf "$tmp_file"
+  fi
+done
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py
new file mode 100644
index 000000000..4b2bdeba6
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py
@@ -0,0 +1,734 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+
+import os
+import random
+import sys
+
+from block_cache_pysim import (
+    ARCCache,
+    CacheEntry,
+    GDSizeCache,
+    HashTable,
+    HyperbolicPolicy,
+    LFUPolicy,
+    LinUCBCache,
+    LRUCache,
+    LRUPolicy,
+    MRUPolicy,
+    OPTCache,
+    OPTCacheEntry,
+    ThompsonSamplingCache,
+    TraceCache,
+    TraceRecord,
+    create_cache,
+    kMicrosInSecond,
+    kSampleSize,
+    run,
+)
+
+
+def test_hash_table():
+    print("Test hash table")
+    table = HashTable()
+    data_size = 10000
+    for i in range(data_size):
+        table.insert("k{}".format(i), i, "v{}".format(i))
+    for i in range(data_size):
+        assert table.lookup("k{}".format(i), i) is not None
+    for i in range(data_size):
+        table.delete("k{}".format(i), i)
+    for i in range(data_size):
+        assert table.lookup("k{}".format(i), i) is None
+
+    truth_map = {}
+    n = 1000000
+    records = 100
+    for i in range(n):
+        key_id = random.randint(0, records)
+        v = random.randint(0, records)
+        key = "k{}".format(key_id)
+        value = CacheEntry(v, v, v, v, v, v, v)
+        action = random.randint(0, 10)
+        assert len(truth_map) == table.elements, "{} {} {}".format(
+            len(truth_map), table.elements, i
+        )
+        if action <= 8:
+            if key in truth_map:
+                assert table.lookup(key, key_id) is not None
+                assert truth_map[key].value_size == table.lookup(key, key_id).value_size
+            else:
+                assert table.lookup(key, key_id) is None
+            table.insert(key, key_id, value)
+            truth_map[key] = value
+        else:
+            deleted = table.delete(key, key_id)
+            if deleted:
+                assert key in truth_map
+            if key in truth_map:
+                del truth_map[key]
+
+    # Check all keys are unique in the sample set.
+    for _i in range(10):
+        samples = table.random_sample(kSampleSize)
+        unique_keys = {}
+        for sample in samples:
+            unique_keys[sample.key] = True
+        assert len(samples) == len(unique_keys)
+
+    assert len(table) == len(truth_map)
+    for key in truth_map:
+        assert table.lookup(key, int(key[1:])) is not None
+        assert truth_map[key].value_size == table.lookup(key, int(key[1:])).value_size
+    print("Test hash table: Success")
+
+
+def assert_metrics(cache, expected_value, expected_value_size=1, custom_hashtable=True):
+    assert cache.used_size == expected_value[0], "Expected {}, Actual {}".format(
+        expected_value[0], cache.used_size
+    )
+    assert (
+        cache.miss_ratio_stats.num_accesses == expected_value[1]
+    ), "Expected {}, Actual {}".format(
+        expected_value[1], cache.miss_ratio_stats.num_accesses
+    )
+    assert (
+        cache.miss_ratio_stats.num_misses == expected_value[2]
+    ), "Expected {}, Actual {}".format(
+        expected_value[2], cache.miss_ratio_stats.num_misses
+    )
+    assert len(cache.table) == len(expected_value[3]) + len(
+        expected_value[4]
+    ), "Expected {}, Actual {}".format(
+        len(expected_value[3]) + len(expected_value[4]), cache.table.elements
+    )
+    for expected_k in expected_value[3]:
+        if custom_hashtable:
+            val = cache.table.lookup("b{}".format(expected_k), expected_k)
+        else:
+            val = cache.table["b{}".format(expected_k)]
+        assert val is not None, "Expected {} Actual: Not Exist {}, Table: {}".format(
+            expected_k, expected_value, cache.table
+        )
+        assert val.value_size == expected_value_size
+    for expected_k in expected_value[4]:
+        if custom_hashtable:
+            val = cache.table.lookup("g0-{}".format(expected_k), expected_k)
+        else:
+            val = cache.table["g0-{}".format(expected_k)]
+        assert val is not None
+        assert val.value_size == expected_value_size
+
+
+# Access k1, k1, k2, k3, k3, k3, k4
+# When k4 is inserted,
+# LRU should evict k1.
+# LFU should evict k2.
+# MRU should evict k3.
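+# A sketch of the reasoning behind the three expectations above (not extra
+# test logic): after the accesses k1, k1, k2, k3, k3, k3, the recency order
+# is k3 > k2 > k1 and the access counts are k3:3, k1:2, k2:1. Inserting k4
+# into a full cache of size 3 therefore makes LRU drop the least recently
+# used block (k1), LFU the least frequently used (k2), and MRU the most
+# recently used (k3).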
+def test_cache(cache, expected_value, custom_hashtable=True): + k1 = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, + ) + k2 = TraceRecord( + access_time=1, + block_id=2, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, + ) + k3 = TraceRecord( + access_time=2, + block_id=3, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, + ) + k4 = TraceRecord( + access_time=3, + block_id=4, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=5, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=0, + ) + sequence = [k1, k1, k2, k3, k3, k3] + index = 0 + expected_values = [] + # Access k1, miss. + expected_values.append([1, 1, 1, [1], []]) + # Access k1, hit. + expected_values.append([1, 2, 1, [1], []]) + # Access k2, miss. + expected_values.append([2, 3, 2, [1, 2], []]) + # Access k3, miss. + expected_values.append([3, 4, 3, [1, 2, 3], []]) + # Access k3, hit. + expected_values.append([3, 5, 3, [1, 2, 3], []]) + # Access k3, hit. + expected_values.append([3, 6, 3, [1, 2, 3], []]) + access_time = 0 + for access in sequence: + access.access_time = access_time + cache.access(access) + assert_metrics( + cache, + expected_values[index], + expected_value_size=1, + custom_hashtable=custom_hashtable, + ) + access_time += 1 + index += 1 + k4.access_time = access_time + cache.access(k4) + assert_metrics( + cache, expected_value, expected_value_size=1, custom_hashtable=custom_hashtable + ) + + +def test_lru_cache(cache, custom_hashtable): + print("Test LRU cache") + # Access k4, miss. evict k1 + test_cache(cache, [3, 7, 4, [2, 3, 4], []], custom_hashtable) + print("Test LRU cache: Success") + + +def test_mru_cache(): + print("Test MRU cache") + policies = [] + policies.append(MRUPolicy()) + # Access k4, miss. evict k3 + test_cache( + ThompsonSamplingCache(3, False, policies, cost_class_label=None), + [3, 7, 4, [1, 2, 4], []], + ) + print("Test MRU cache: Success") + + +def test_lfu_cache(): + print("Test LFU cache") + policies = [] + policies.append(LFUPolicy()) + # Access k4, miss. 
evict k2
+    test_cache(
+        ThompsonSamplingCache(3, False, policies, cost_class_label=None),
+        [3, 7, 4, [1, 3, 4], []],
+    )
+    print("Test LFU cache: Success")
+
+
+def test_mix(cache):
+    print("Test Mix {} cache".format(cache.cache_name()))
+    n = 100000
+    records = 100
+    block_size_table = {}
+    trace_num_misses = 0
+    for i in range(n):
+        key_id = random.randint(0, records)
+        vs = random.randint(0, 10)
+        now = i * kMicrosInSecond
+        block_size = vs
+        if key_id in block_size_table:
+            block_size = block_size_table[key_id]
+        else:
+            block_size_table[key_id] = block_size
+        is_hit = key_id % 2
+        if is_hit == 0:
+            trace_num_misses += 1
+        k = TraceRecord(
+            access_time=now,
+            block_id=key_id,
+            block_type=1,
+            block_size=block_size,
+            cf_id=0,
+            cf_name="",
+            level=0,
+            fd=0,
+            caller=1,
+            no_insert=0,
+            get_id=key_id,
+            key_id=key_id,
+            kv_size=5,
+            is_hit=is_hit,
+            referenced_key_exist_in_block=1,
+            num_keys_in_block=0,
+            table_id=0,
+            seq_number=0,
+            block_key_size=0,
+            key_size=0,
+            block_offset_in_file=0,
+            next_access_seq_no=vs,
+        )
+        cache.access(k)
+    assert cache.miss_ratio_stats.miss_ratio() > 0
+    if cache.cache_name() == "Trace":
+        assert cache.miss_ratio_stats.num_accesses == n
+        assert cache.miss_ratio_stats.num_misses == trace_num_misses
+    else:
+        assert cache.used_size <= cache.cache_size
+        all_values = cache.table.values()
+        cached_size = 0
+        for value in all_values:
+            cached_size += value.value_size
+        assert cached_size == cache.used_size, "Expected {} Actual {}".format(
+            cache.used_size, cached_size
+        )
+    print("Test Mix {} cache: Success".format(cache.cache_name()))
+
+
+def test_end_to_end():
+    print("Test All caches")
+    n = 100000
+    nblocks = 1000
+    block_size = 16 * 1024
+    ncfs = 7
+    nlevels = 6
+    nfds = 100000
+    trace_file_path = "test_trace"
+    # All blocks are of the same size so that OPT must achieve the lowest miss
+    # ratio.
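+    # The 21 comma-separated fields written below follow the positional
+    # layout parsed by run() (ts[0] = access_time ... ts[20] =
+    # block_offset_in_file); next_access_seq_no is computed by the simulator
+    # rather than stored in the trace.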
+    with open(trace_file_path, "w+") as trace_file:
+        access_records = ""
+        for i in range(n):
+            key_id = random.randint(0, nblocks)
+            cf_id = random.randint(0, ncfs)
+            level = random.randint(0, nlevels)
+            fd = random.randint(0, nfds)
+            now = i * kMicrosInSecond
+            access_record = ""
+            access_record += "{},".format(now)
+            access_record += "{},".format(key_id)
+            access_record += "{},".format(9)  # block type
+            access_record += "{},".format(block_size)  # block size
+            access_record += "{},".format(cf_id)
+            access_record += "cf_{},".format(cf_id)
+            access_record += "{},".format(level)
+            access_record += "{},".format(fd)
+            access_record += "{},".format(key_id % 3)  # caller
+            access_record += "{},".format(0)  # no insert
+            access_record += "{},".format(i)  # get_id
+            access_record += "{},".format(i)  # key_id
+            access_record += "{},".format(100)  # kv_size
+            access_record += "{},".format(1)  # is_hit
+            access_record += "{},".format(1)  # referenced_key_exist_in_block
+            access_record += "{},".format(10)  # num_keys_in_block
+            access_record += "{},".format(1)  # table_id
+            access_record += "{},".format(0)  # seq_number
+            access_record += "{},".format(10)  # block key size
+            access_record += "{},".format(20)  # key size
+            access_record += "{},".format(0)  # block offset
+            access_record = access_record[:-1]
+            access_records += access_record + "\n"
+        trace_file.write(access_records)
+
+    print("Test All caches: Start testing caches")
+    cache_size = block_size * nblocks / 10
+    downsample_size = 1
+    cache_ms = {}
+    for cache_type in [
+        "ts",
+        "opt",
+        "lru",
+        "pylru",
+        "linucb",
+        "gdsize",
+        "pyccbt",
+        "pycctbbt",
+    ]:
+        cache = create_cache(cache_type, cache_size, downsample_size)
+        run(trace_file_path, cache_type, cache, 0, -1, "all")
+        cache_ms[cache_type] = cache
+        assert cache.miss_ratio_stats.num_accesses == n
+
+    for cache_type in cache_ms:
+        cache = cache_ms[cache_type]
+        ms = cache.miss_ratio_stats.miss_ratio()
+        assert ms <= 100.0 and ms >= 0.0
+        # OPT should perform the best.
+        assert cache_ms["opt"].miss_ratio_stats.miss_ratio() <= ms
+        assert cache.used_size <= cache.cache_size
+        all_values = cache.table.values()
+        cached_size = 0
+        for value in all_values:
+            cached_size += value.value_size
+        assert cached_size == cache.used_size, "Expected {} Actual {}".format(
+            cache.used_size, cached_size
+        )
+        print("Test All {}: Success".format(cache.cache_name()))
+
+    os.remove(trace_file_path)
+    print("Test All: Success")
+
+
+def test_hybrid(cache):
+    print("Test {} cache".format(cache.cache_name()))
+    k = TraceRecord(
+        access_time=0,
+        block_id=1,
+        block_type=1,
+        block_size=1,
+        cf_id=0,
+        cf_name="",
+        level=0,
+        fd=0,
+        caller=1,
+        no_insert=0,
+        get_id=1,  # the first get request.
+        key_id=1,
+        kv_size=0,  # no size.
+        is_hit=1,
+        referenced_key_exist_in_block=1,
+        num_keys_in_block=0,
+        table_id=0,
+        seq_number=0,
+        block_key_size=0,
+        key_size=0,
+        block_offset_in_file=0,
+        next_access_seq_no=0,
+    )
+    cache.access(k)  # Expect a miss.
+    # used size, num accesses, num misses, hash table size, blocks, get keys.
+    assert_metrics(cache, [1, 1, 1, [1], []])
+    k.access_time += 1
+    k.kv_size = 1
+    k.block_id = 2
+    cache.access(k)  # k should be inserted.
+    assert_metrics(cache, [3, 2, 2, [1, 2], [1]])
+    k.access_time += 1
+    k.block_id = 3
+    cache.access(k)  # k should not be inserted again.
+    assert_metrics(cache, [4, 3, 3, [1, 2, 3], [1]])
+    # A second get request referencing the same key.
+    k.access_time += 1
+    k.get_id = 2
+    k.block_id = 4
+    k.kv_size = 0
+    cache.access(k)  # k should observe a hit. No block access.
+    assert_metrics(cache, [4, 4, 3, [1, 2, 3], [1]])
+
+    # A third get request searches three files, three different keys.
+    # And the second key observes a hit.
+    k.access_time += 1
+    k.kv_size = 1
+    k.get_id = 3
+    k.block_id = 3
+    k.key_id = 2
+    cache.access(k)  # k should observe a miss. block 3 observes a hit.
+    assert_metrics(cache, [5, 5, 3, [1, 2, 3], [1, 2]])
+
+    k.access_time += 1
+    k.kv_size = 1
+    k.get_id = 3
+    k.block_id = 4
+    k.kv_size = 1
+    k.key_id = 1
+    cache.access(k)  # k1 should observe a hit.
+    assert_metrics(cache, [5, 6, 3, [1, 2, 3], [1, 2]])
+
+    k.access_time += 1
+    k.kv_size = 1
+    k.get_id = 3
+    k.block_id = 4
+    k.kv_size = 1
+    k.key_id = 3
+    # k3 should observe a miss.
+    # However, as the get has already completed, we should not access k3 any more.
+    cache.access(k)
+    assert_metrics(cache, [5, 7, 3, [1, 2, 3], [1, 2]])
+
+    # A fourth get request searches one file and two blocks. One row key.
+    k.access_time += 1
+    k.get_id = 4
+    k.block_id = 5
+    k.key_id = 4
+    k.kv_size = 1
+    cache.access(k)
+    assert_metrics(cache, [7, 8, 4, [1, 2, 3, 5], [1, 2, 4]])
+
+    # A bunch of insertions which evict cached row keys.
+    for i in range(6, 100):
+        k.access_time += 1
+        k.get_id = 0
+        k.block_id = i
+        cache.access(k)
+
+    k.get_id = 4
+    k.block_id = 100  # A different block.
+    k.key_id = 4  # Same row key and should not be inserted again.
+    k.kv_size = 1
+    cache.access(k)
+    assert_metrics(
+        cache, [kSampleSize, 103, 99, [i for i in range(101 - kSampleSize, 101)], []]
+    )
+    print("Test {} cache: Success".format(cache.cache_name()))
+
+
+def test_opt_cache():
+    print("Test OPT cache")
+    cache = OPTCache(3)
+    # seq:         0,  1,  2,  3, 4, 5,  6,  7,  8
+    # key:         k1, k2, k3, k4, k5, k6, k7, k1, k8
+    # next_access: 7,  19, 18, M,  M,  17, 16, 25, M
+    k = TraceRecord(
+        access_time=0,
+        block_id=1,
+        block_type=1,
+        block_size=1,
+        cf_id=0,
+        cf_name="",
+        level=0,
+        fd=0,
+        caller=1,
+        no_insert=0,
+        get_id=1,  # the first get request.
+        key_id=1,
+        kv_size=0,  # no size.
+        is_hit=1,
+        referenced_key_exist_in_block=1,
+        num_keys_in_block=0,
+        table_id=0,
+        seq_number=0,
+        block_key_size=0,
+        key_size=0,
+        block_offset_in_file=0,
+        next_access_seq_no=7,
+    )
+    cache.access(k)
+    assert_metrics(
+        cache, [1, 1, 1, [1], []], expected_value_size=1, custom_hashtable=False
+    )
+    k.access_time += 1
+    k.block_id = 2
+    k.next_access_seq_no = 19
+    cache.access(k)
+    assert_metrics(
+        cache, [2, 2, 2, [1, 2], []], expected_value_size=1, custom_hashtable=False
+    )
+    k.access_time += 1
+    k.block_id = 3
+    k.next_access_seq_no = 18
+    cache.access(k)
+    assert_metrics(
+        cache, [3, 3, 3, [1, 2, 3], []], expected_value_size=1, custom_hashtable=False
+    )
+    k.access_time += 1
+    k.block_id = 4
+    k.next_access_seq_no = sys.maxsize  # Never accessed again.
+    cache.access(k)
+    # Evict 2 since its next access 19 is the furthest in the future.
+    assert_metrics(
+        cache, [3, 4, 4, [1, 3, 4], []], expected_value_size=1, custom_hashtable=False
+    )
+    k.access_time += 1
+    k.block_id = 5
+    k.next_access_seq_no = sys.maxsize  # Never accessed again.
+    cache.access(k)
+    # Evict 4 since its next access MAXINT is the furthest in the future.
+    assert_metrics(
+        cache, [3, 5, 5, [1, 3, 5], []], expected_value_size=1, custom_hashtable=False
+    )
+    k.access_time += 1
+    k.block_id = 6
+    k.next_access_seq_no = 17
+    cache.access(k)
+    # Evict 5 since its next access MAXINT is the furthest in the future.
+ assert_metrics( + cache, [3, 6, 6, [1, 3, 6], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 7 + k.next_access_seq_no = 16 + cache.access(k) + # Evict 3 since its next access 18 is the furthest in the future. + assert_metrics( + cache, [3, 7, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 1 + k.next_access_seq_no = 25 + cache.access(k) + assert_metrics( + cache, [3, 8, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False + ) + k.access_time += 1 + k.block_id = 8 + k.next_access_seq_no = sys.maxsize + cache.access(k) + # Evict 1 since its next access 25 is the furthest in the future. + assert_metrics( + cache, [3, 9, 8, [6, 7, 8], []], expected_value_size=1, custom_hashtable=False + ) + + # Insert a large kv pair to evict all keys. + k.access_time += 1 + k.block_id = 10 + k.block_size = 3 + k.next_access_seq_no = sys.maxsize + cache.access(k) + assert_metrics( + cache, [3, 10, 9, [10], []], expected_value_size=3, custom_hashtable=False + ) + print("Test OPT cache: Success") + + +def test_trace_cache(): + print("Test trace cache") + cache = TraceCache(0) + k = TraceRecord( + access_time=0, + block_id=1, + block_type=1, + block_size=1, + cf_id=0, + cf_name="", + level=0, + fd=0, + caller=1, + no_insert=0, + get_id=1, + key_id=1, + kv_size=0, + is_hit=1, + referenced_key_exist_in_block=1, + num_keys_in_block=0, + table_id=0, + seq_number=0, + block_key_size=0, + key_size=0, + block_offset_in_file=0, + next_access_seq_no=7, + ) + cache.access(k) + assert cache.miss_ratio_stats.num_accesses == 1 + assert cache.miss_ratio_stats.num_misses == 0 + k.is_hit = 0 + cache.access(k) + assert cache.miss_ratio_stats.num_accesses == 2 + assert cache.miss_ratio_stats.num_misses == 1 + print("Test trace cache: Success") + + +if __name__ == "__main__": + test_hash_table() + test_trace_cache() + test_opt_cache() + test_lru_cache( + ThompsonSamplingCache( + 3, enable_cache_row_key=0, policies=[LRUPolicy()], cost_class_label=None + ), + custom_hashtable=True, + ) + test_lru_cache(LRUCache(3, enable_cache_row_key=0), custom_hashtable=False) + test_mru_cache() + test_lfu_cache() + test_hybrid( + ThompsonSamplingCache( + kSampleSize, + enable_cache_row_key=1, + policies=[LRUPolicy()], + cost_class_label=None, + ) + ) + test_hybrid( + LinUCBCache( + kSampleSize, + enable_cache_row_key=1, + policies=[LRUPolicy()], + cost_class_label=None, + ) + ) + for cache_type in [ + "ts", + "opt", + "arc", + "pylfu", + "pymru", + "trace", + "pyhb", + "lru", + "pylru", + "linucb", + "gdsize", + "pycctbbt", + "pycctb", + "pyccbt", + ]: + for enable_row_cache in [0, 1, 2]: + cache_type_str = cache_type + if cache_type != "opt" and cache_type != "trace": + if enable_row_cache == 1: + cache_type_str += "_hybrid" + elif enable_row_cache == 2: + cache_type_str += "_hybridn" + test_mix(create_cache(cache_type_str, cache_size=100, downsample_size=1)) + test_end_to_end() diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc new file mode 100644 index 000000000..f90cb794b --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc @@ -0,0 +1,2308 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#ifdef GFLAGS
+#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <sstream>
+
+#include "monitoring/histogram.h"
+#include "util/gflags_compat.h"
+#include "util/string_util.h"
+
+using GFLAGS_NAMESPACE::ParseCommandLineFlags;
+
+DEFINE_string(block_cache_trace_path, "", "The trace file path.");
+DEFINE_bool(is_block_cache_human_readable_trace, false,
+            "Is the trace file provided for analysis generated by running "
+            "block_cache_trace_analyzer with "
+            "FLAGS_human_readable_trace_file_path specified.");
+DEFINE_string(
+    block_cache_sim_config_path, "",
+    "The config file path. One cache configuration per line. The format of a "
+    "cache configuration is "
+    "cache_name,num_shard_bits,ghost_capacity,cache_capacity_1,...,cache_"
+    "capacity_N. Supported cache names are lru, lru_priority, lru_hybrid, and "
+    "lru_hybrid_no_insert_on_row_miss. User may also add a prefix 'ghost_' to "
+    "a cache_name to add a ghost cache in front of the real cache. "
+    "ghost_capacity and cache_capacity can be xK, xM or xG where x is a "
+    "positive number.");
+DEFINE_int32(block_cache_trace_downsample_ratio, 1,
+             "The trace was collected by sampling accesses on one in every "
+             "block_cache_trace_downsample_ratio blocks. We scale "
+             "down the simulated cache size by this ratio.");
+DEFINE_bool(print_block_size_stats, false,
+            "Print block size distribution and the distribution broken down "
+            "by block type and column family.");
+DEFINE_bool(print_access_count_stats, false,
+            "Print access count distribution and the distribution broken "
+            "down by block type and column family.");
+DEFINE_bool(print_data_block_access_count_stats, false,
+            "Print data block accesses by user Get and Multi-Get.");
+DEFINE_int32(cache_sim_warmup_seconds, 0,
+             "The number of seconds to warmup simulated caches. The hit/miss "
+             "counters are reset after the warmup completes.");
+DEFINE_int32(analyze_bottom_k_access_count_blocks, 0,
+             "Print out detailed access information for blocks whose number "
+             "of accesses is among the bottom k of all blocks.");
+DEFINE_int32(analyze_top_k_access_count_blocks, 0,
+             "Print out detailed access information for blocks whose number "
+             "of accesses is among the top k of all blocks.");
+DEFINE_string(block_cache_analysis_result_dir, "",
+              "The directory that saves block cache analysis results.");
+DEFINE_string(
+    timeline_labels, "",
+    "Group the number of accesses per block per second using these labels. "
+    "Possible labels are a combination of the following: cf (column family), "
+    "sst, level, bt (block type), caller, block. For example, label \"cf_bt\" "
+    "means the number of accesses per second is grouped by unique pairs of "
+    "\"cf_bt\". A label \"all\" contains the aggregated number of accesses per "
+    "second across all possible labels.");
+DEFINE_string(reuse_distance_labels, "",
+              "Group the reuse distance of a block using these labels. Reuse "
+              "distance is defined as the cumulative size of unique blocks "
+              "read between two consecutive accesses on the same block.");
+DEFINE_string(
+    reuse_distance_buckets, "",
+    "Group blocks by their reuse distances given these buckets. For "
+    "example, if 'reuse_distance_buckets' is '1K,1M,1G', we will "
+    "create four buckets. The first three buckets contain the number of "
+    "blocks with reuse distance less than 1KB, between 1K and 1M, between 1M "
+    "and 1G, respectively. The last bucket contains the number of blocks with "
+    "reuse distance larger than 1G. ");
+DEFINE_string(
+    reuse_interval_labels, "",
+    "Group the reuse interval of a block using these labels. Reuse "
+    "interval is defined as the time between two consecutive accesses "
+    "on the same block.");
+DEFINE_string(
+    reuse_interval_buckets, "",
+    "Group blocks by their reuse interval given these buckets. For "
+    "example, if 'reuse_interval_buckets' is '1,10,100', we will "
+    "create four buckets. The first three buckets contain the number of "
+    "blocks with reuse interval less than 1 second, between 1 second and 10 "
+    "seconds, between 10 seconds and 100 seconds, respectively. The last "
+    "bucket contains the number of blocks with reuse interval longer than 100 "
+    "seconds.");
+DEFINE_string(
+    reuse_lifetime_labels, "",
+    "Group the reuse lifetime of a block using these labels. Reuse "
+    "lifetime is defined as the time interval between the first access on a "
+    "block and the last access on the same block. For blocks that are only "
+    "accessed once, their lifetime is set to kMaxUint64.");
+DEFINE_string(
+    reuse_lifetime_buckets, "",
+    "Group blocks by their reuse lifetime given these buckets. For "
+    "example, if 'reuse_lifetime_buckets' is '1,10,100', we will "
+    "create four buckets. The first three buckets contain the number of "
+    "blocks with reuse lifetime less than 1 second, between 1 second and 10 "
+    "seconds, between 10 seconds and 100 seconds, respectively. The last "
+    "bucket contains the number of blocks with reuse lifetime longer than 100 "
+    "seconds.");
+DEFINE_string(
+    analyze_callers, "",
+    "The list of callers to perform a detailed analysis on. If specified, the "
+    "analyzer will output a detailed percentage of accesses for each caller "
+    "broken down by column family, level, and block type. The available "
+    "callers are: Get, MultiGet, Iterator, ApproximateSize, VerifyChecksum, "
+    "SSTDumpTool, ExternalSSTIngestion, Repair, Prefetch, Compaction, "
+    "CompactionRefill, Flush, SSTFileReader, Uncategorized.");
+DEFINE_string(access_count_buckets, "",
+              "Group number of blocks by their access count given these "
+              "buckets. If specified, the analyzer will output a detailed "
+              "analysis on the number of blocks grouped by their access count "
+              "broken down by block type and column family.");
+DEFINE_int32(analyze_blocks_reuse_k_reuse_window, 0,
+             "Analyze the percentage of blocks accessed in the [k, 2*k] "
+             "seconds window that are accessed again in each of the next "
+             "[2*k, 3*k], [3*k, 4*k],...,[k*(n-1), k*n] windows. ");
+DEFINE_string(analyze_get_spatial_locality_labels, "",
+              "Group data blocks using these labels.");
+DEFINE_string(analyze_get_spatial_locality_buckets, "",
+              "Group data blocks by their statistics using these buckets.");
+DEFINE_string(skew_labels, "",
+              "Group the access count of a block using these labels.");
+DEFINE_string(skew_buckets, "", "Group the skew labels using these buckets.");
+DEFINE_bool(mrc_only, false,
+            "Evaluate alternative cache policies only. When this flag is "
+            "true, the analyzer does NOT maintain states of each block in "
+            "memory for analysis. It only feeds the accesses into the cache "
+            "simulators.");
+DEFINE_string(
+    analyze_correlation_coefficients_labels, "",
+    "Analyze the correlation coefficients of features such as number of past "
+    "accesses with regard to the number of accesses till the next access.");
+DEFINE_int32(analyze_correlation_coefficients_max_number_of_values, 1000000,
+             "The maximum number of values for a feature. If the number of "
+             "values for a feature is larger than this max, it randomly "
+             "selects 'max' number of values.");
+DEFINE_string(human_readable_trace_file_path, "",
+              "The file path that saves human readable access records.");
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+
+const std::string kMissRatioCurveFileName = "mrc";
+const std::string kGroupbyBlock = "block";
+const std::string kGroupbyTable = "table";
+const std::string kGroupbyColumnFamily = "cf";
+const std::string kGroupbySSTFile = "sst";
+const std::string kGroupbyBlockType = "bt";
+const std::string kGroupbyCaller = "caller";
+const std::string kGroupbyLevel = "level";
+const std::string kGroupbyAll = "all";
+const std::set<std::string> kGroupbyLabels{
+    kGroupbyBlock, kGroupbyColumnFamily, kGroupbySSTFile, kGroupbyLevel,
+    kGroupbyBlockType, kGroupbyCaller, kGroupbyAll};
+const std::string kSupportedCacheNames =
+    " lru ghost_lru lru_priority ghost_lru_priority lru_hybrid "
+    "ghost_lru_hybrid lru_hybrid_no_insert_on_row_miss "
+    "ghost_lru_hybrid_no_insert_on_row_miss ";
+
+// The suffix for the generated csv files.
+const std::string kFileNameSuffixMissRatioTimeline = "miss_ratio_timeline";
+const std::string kFileNameSuffixMissTimeline = "miss_timeline";
+const std::string kFileNameSuffixSkew = "skewness";
+const std::string kFileNameSuffixAccessTimeline = "access_timeline";
+const std::string kFileNameSuffixCorrelation = "correlation_input";
+const std::string kFileNameSuffixAvgReuseIntervalNaccesses =
+    "avg_reuse_interval_naccesses";
+const std::string kFileNameSuffixAvgReuseInterval = "avg_reuse_interval";
+const std::string kFileNameSuffixReuseInterval = "access_reuse_interval";
+const std::string kFileNameSuffixReuseLifetime = "reuse_lifetime";
+const std::string kFileNameSuffixAccessReuseBlocksTimeline =
+    "reuse_blocks_timeline";
+const std::string kFileNameSuffixPercentOfAccessSummary =
+    "percentage_of_accesses_summary";
+const std::string kFileNameSuffixPercentRefKeys = "percent_ref_keys";
+const std::string kFileNameSuffixPercentDataSizeOnRefKeys =
+    "percent_data_size_on_ref_keys";
+const std::string kFileNameSuffixPercentAccessesOnRefKeys =
+    "percent_accesses_on_ref_keys";
+const std::string kFileNameSuffixAccessCountSummary = "access_count_summary";
+
+std::string block_type_to_string(TraceType type) {
+  switch (type) {
+    case kBlockTraceFilterBlock:
+      return "Filter";
+    case kBlockTraceDataBlock:
+      return "Data";
+    case kBlockTraceIndexBlock:
+      return "Index";
+    case kBlockTraceRangeDeletionBlock:
+      return "RangeDeletion";
+    case kBlockTraceUncompressionDictBlock:
+      return "UncompressionDict";
+    default:
+      break;
+  }
+  // This cannot happen.
+  return "InvalidType";
+}
+
+std::string caller_to_string(TableReaderCaller caller) {
+  switch (caller) {
+    case kUserGet:
+      return "Get";
+    case kUserMultiGet:
+      return "MultiGet";
+    case kUserIterator:
+      return "Iterator";
+    case kUserApproximateSize:
+      return "ApproximateSize";
+    case kUserVerifyChecksum:
+      return "VerifyChecksum";
+    case kSSTDumpTool:
+      return "SSTDumpTool";
+    case kExternalSSTIngestion:
+      return "ExternalSSTIngestion";
+    case kRepair:
+      return "Repair";
+    case kPrefetch:
+      return "Prefetch";
+    case kCompaction:
+      return "Compaction";
+    case kCompactionRefill:
+      return "CompactionRefill";
+    case kFlush:
+      return "Flush";
+    case kSSTFileReader:
+      return "SSTFileReader";
+    case kUncategorized:
+      return "Uncategorized";
+    default:
+      break;
+  }
+  // This cannot happen.
+  return "InvalidCaller";
+}
+
+TableReaderCaller string_to_caller(std::string caller_str) {
+  if (caller_str == "Get") {
+    return kUserGet;
+  } else if (caller_str == "MultiGet") {
+    return kUserMultiGet;
+  } else if (caller_str == "Iterator") {
+    return kUserIterator;
+  } else if (caller_str == "ApproximateSize") {
+    return kUserApproximateSize;
+  } else if (caller_str == "VerifyChecksum") {
+    return kUserVerifyChecksum;
+  } else if (caller_str == "SSTDumpTool") {
+    return kSSTDumpTool;
+  } else if (caller_str == "ExternalSSTIngestion") {
+    return kExternalSSTIngestion;
+  } else if (caller_str == "Repair") {
+    return kRepair;
+  } else if (caller_str == "Prefetch") {
+    return kPrefetch;
+  } else if (caller_str == "Compaction") {
+    return kCompaction;
+  } else if (caller_str == "CompactionRefill") {
+    return kCompactionRefill;
+  } else if (caller_str == "Flush") {
+    return kFlush;
+  } else if (caller_str == "SSTFileReader") {
+    return kSSTFileReader;
+  } else if (caller_str == "Uncategorized") {
+    return kUncategorized;
+  }
+  return TableReaderCaller::kMaxBlockCacheLookupCaller;
+}
+
+bool is_user_access(TableReaderCaller caller) {
+  switch (caller) {
+    case kUserGet:
+    case kUserMultiGet:
+    case kUserIterator:
+    case kUserApproximateSize:
+    case kUserVerifyChecksum:
+      return true;
+    default:
+      break;
+  }
+  return false;
+}
+
+const char kBreakLine[] =
+    "***************************************************************\n";
+
+void print_break_lines(uint32_t num_break_lines) {
+  for (uint32_t i = 0; i < num_break_lines; i++) {
+    fprintf(stdout, kBreakLine);
+  }
+}
+
+double percent(uint64_t numerator, uint64_t denominator) {
+  if (denominator == 0) {
+    return -1;
+  }
+  return static_cast<double>(numerator * 100.0 / denominator);
+}
+
+std::map<uint64_t, uint64_t> adjust_time_unit(
+    const std::map<uint64_t, uint64_t>& time_stats, uint64_t time_unit) {
+  if (time_unit == 1) {
+    return time_stats;
+  }
+  std::map<uint64_t, uint64_t> adjusted_time_stats;
+  for (auto const& time : time_stats) {
+    adjusted_time_stats[static_cast<uint64_t>(time.first / time_unit)] +=
+        time.second;
+  }
+  return adjusted_time_stats;
+}
+}  // namespace
+
+void BlockCacheTraceAnalyzer::WriteMissRatioCurves() const {
+  if (!cache_simulator_) {
+    return;
+  }
+  if (output_dir_.empty()) {
+    return;
+  }
+  uint64_t trace_duration =
+      trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
+  uint64_t total_accesses = access_sequence_number_;
+  const std::string output_miss_ratio_curve_path =
+      output_dir_ + "/" + std::to_string(trace_duration) + "_" +
+      std::to_string(total_accesses) + "_" + kMissRatioCurveFileName;
+  std::ofstream out(output_miss_ratio_curve_path);
+  if (!out.is_open()) {
+    return;
+  }
+  // Write header.
+ const std::string header = + "cache_name,num_shard_bits,ghost_capacity,capacity,miss_ratio,total_" + "accesses"; + out << header << std::endl; + for (auto const& config_caches : cache_simulator_->sim_caches()) { + const CacheConfiguration& config = config_caches.first; + for (uint32_t i = 0; i < config.cache_capacities.size(); i++) { + double miss_ratio = + config_caches.second[i]->miss_ratio_stats().miss_ratio(); + // Write the body. + out << config.cache_name; + out << ","; + out << config.num_shard_bits; + out << ","; + out << config.ghost_cache_capacity; + out << ","; + out << config.cache_capacities[i]; + out << ","; + out << std::fixed << std::setprecision(4) << miss_ratio; + out << ","; + out << config_caches.second[i]->miss_ratio_stats().total_accesses(); + out << std::endl; + } + } + out.close(); +} + +void BlockCacheTraceAnalyzer::UpdateFeatureVectors( + const std::vector<uint64_t>& access_sequence_number_timeline, + const std::vector<uint64_t>& access_timeline, const std::string& label, + std::map<std::string, Features>* label_features, + std::map<std::string, Predictions>* label_predictions) const { + if (access_sequence_number_timeline.empty() || access_timeline.empty()) { + return; + } + assert(access_timeline.size() == access_sequence_number_timeline.size()); + uint64_t prev_access_sequence_number = access_sequence_number_timeline[0]; + uint64_t prev_access_timestamp = access_timeline[0]; + for (uint32_t i = 0; i < access_sequence_number_timeline.size(); i++) { + uint64_t num_accesses_since_last_access = + access_sequence_number_timeline[i] - prev_access_sequence_number; + uint64_t elapsed_time_since_last_access = + access_timeline[i] - prev_access_timestamp; + prev_access_sequence_number = access_sequence_number_timeline[i]; + prev_access_timestamp = access_timeline[i]; + if (i < access_sequence_number_timeline.size() - 1) { + (*label_features)[label].num_accesses_since_last_access.push_back( + num_accesses_since_last_access); + (*label_features)[label].num_past_accesses.push_back(i); + (*label_features)[label].elapsed_time_since_last_access.push_back( + elapsed_time_since_last_access); + } + if (i >= 1) { + (*label_predictions)[label].num_accesses_till_next_access.push_back( + num_accesses_since_last_access); + (*label_predictions)[label].elapsed_time_till_next_access.push_back( + elapsed_time_since_last_access); + } + } +} + +void BlockCacheTraceAnalyzer::WriteMissRatioTimeline(uint64_t time_unit) const { + if (!cache_simulator_ || output_dir_.empty()) { + return; + } + std::map<uint64_t, std::map<std::string, std::map<uint64_t, double>>> + cs_name_timeline; + uint64_t start_time = port::kMaxUint64; + uint64_t end_time = 0; + const std::map<uint64_t, uint64_t>& trace_num_misses = + adjust_time_unit(miss_ratio_stats_.num_misses_timeline(), time_unit); + const std::map<uint64_t, uint64_t>& trace_num_accesses = + adjust_time_unit(miss_ratio_stats_.num_accesses_timeline(), time_unit); + assert(trace_num_misses.size() == trace_num_accesses.size()); + for (auto const& num_miss : trace_num_misses) { + uint64_t time = num_miss.first; + start_time = std::min(start_time, time); + end_time = std::max(end_time, time); + uint64_t miss = num_miss.second; + auto it = trace_num_accesses.find(time); + assert(it != trace_num_accesses.end()); + uint64_t access = it->second; + cs_name_timeline[port::kMaxUint64]["trace"][time] = percent(miss, access); + } + for (auto const& config_caches : cache_simulator_->sim_caches()) { + const CacheConfiguration& config = config_caches.first; + 
std::string cache_label = config.cache_name + "-" + + std::to_string(config.num_shard_bits) + "-" + + std::to_string(config.ghost_cache_capacity); + for (uint32_t i = 0; i < config.cache_capacities.size(); i++) { + const std::map<uint64_t, uint64_t>& num_misses = adjust_time_unit( + config_caches.second[i]->miss_ratio_stats().num_misses_timeline(), + time_unit); + const std::map<uint64_t, uint64_t>& num_accesses = adjust_time_unit( + config_caches.second[i]->miss_ratio_stats().num_accesses_timeline(), + time_unit); + assert(num_misses.size() == num_accesses.size()); + for (auto const& num_miss : num_misses) { + uint64_t time = num_miss.first; + start_time = std::min(start_time, time); + end_time = std::max(end_time, time); + uint64_t miss = num_miss.second; + auto it = num_accesses.find(time); + assert(it != num_accesses.end()); + uint64_t access = it->second; + cs_name_timeline[config.cache_capacities[i]][cache_label][time] = + percent(miss, access); + } + } + } + for (auto const& it : cs_name_timeline) { + const std::string output_miss_ratio_timeline_path = + output_dir_ + "/" + std::to_string(it.first) + "_" + + std::to_string(time_unit) + "_" + kFileNameSuffixMissRatioTimeline; + std::ofstream out(output_miss_ratio_timeline_path); + if (!out.is_open()) { + return; + } + std::string header("time"); + for (uint64_t now = start_time; now <= end_time; now++) { + header += ","; + header += std::to_string(now); + } + out << header << std::endl; + for (auto const& label : it.second) { + std::string row(label.first); + for (uint64_t now = start_time; now <= end_time; now++) { + auto misses = label.second.find(now); + row += ","; + if (misses != label.second.end()) { + row += std::to_string(misses->second); + } else { + row += "0"; + } + } + out << row << std::endl; + } + out.close(); + } +} + +void BlockCacheTraceAnalyzer::WriteMissTimeline(uint64_t time_unit) const { + if (!cache_simulator_ || output_dir_.empty()) { + return; + } + std::map<uint64_t, std::map<std::string, std::map<uint64_t, uint64_t>>> + cs_name_timeline; + uint64_t start_time = port::kMaxUint64; + uint64_t end_time = 0; + const std::map<uint64_t, uint64_t>& trace_num_misses = + adjust_time_unit(miss_ratio_stats_.num_misses_timeline(), time_unit); + for (auto const& num_miss : trace_num_misses) { + uint64_t time = num_miss.first; + start_time = std::min(start_time, time); + end_time = std::max(end_time, time); + uint64_t miss = num_miss.second; + cs_name_timeline[port::kMaxUint64]["trace"][time] = miss; + } + for (auto const& config_caches : cache_simulator_->sim_caches()) { + const CacheConfiguration& config = config_caches.first; + std::string cache_label = config.cache_name + "-" + + std::to_string(config.num_shard_bits) + "-" + + std::to_string(config.ghost_cache_capacity); + for (uint32_t i = 0; i < config.cache_capacities.size(); i++) { + const std::map<uint64_t, uint64_t>& num_misses = adjust_time_unit( + config_caches.second[i]->miss_ratio_stats().num_misses_timeline(), + time_unit); + for (auto const& num_miss : num_misses) { + uint64_t time = num_miss.first; + start_time = std::min(start_time, time); + end_time = std::max(end_time, time); + uint64_t miss = num_miss.second; + cs_name_timeline[config.cache_capacities[i]][cache_label][time] = miss; + } + } + } + for (auto const& it : cs_name_timeline) { + const std::string output_miss_ratio_timeline_path = + output_dir_ + "/" + std::to_string(it.first) + "_" + + std::to_string(time_unit) + "_" + kFileNameSuffixMissTimeline; + std::ofstream 
out(output_miss_ratio_timeline_path); + if (!out.is_open()) { + return; + } + std::string header("time"); + for (uint64_t now = start_time; now <= end_time; now++) { + header += ","; + header += std::to_string(now); + } + out << header << std::endl; + for (auto const& label : it.second) { + std::string row(label.first); + for (uint64_t now = start_time; now <= end_time; now++) { + auto misses = label.second.find(now); + row += ","; + if (misses != label.second.end()) { + row += std::to_string(misses->second); + } else { + row += "0"; + } + } + out << row << std::endl; + } + out.close(); + } +} + +void BlockCacheTraceAnalyzer::WriteSkewness( + const std::string& label_str, const std::vector<uint64_t>& percent_buckets, + TraceType target_block_type) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, uint64_t> label_naccesses; + uint64_t total_naccesses = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + if (target_block_type != TraceType::kTraceMax && + target_block_type != type) { + return; + } + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); + label_naccesses[label] += block.num_accesses; + total_naccesses += block.num_accesses; + }; + TraverseBlocks(block_callback, &labels); + std::map<std::string, std::map<uint64_t, uint64_t>> label_bucket_naccesses; + std::vector<std::pair<std::string, uint64_t>> pairs; + for (auto const& itr : label_naccesses) { + pairs.push_back(itr); + } + // Sort in descending order. + sort(pairs.begin(), pairs.end(), + [=](const std::pair<std::string, uint64_t>& a, + const std::pair<std::string, uint64_t>& b) { + return b.second < a.second; + }); + + size_t prev_start_index = 0; + for (auto const& percent : percent_buckets) { + label_bucket_naccesses[label_str][percent] = 0; + size_t end_index = 0; + if (percent == port::kMaxUint64) { + end_index = label_naccesses.size(); + } else { + end_index = percent * label_naccesses.size() / 100; + } + for (size_t i = prev_start_index; i < end_index; i++) { + label_bucket_naccesses[label_str][percent] += pairs[i].second; + } + prev_start_index = end_index; + } + std::string filename_suffix; + if (target_block_type != TraceType::kTraceMax) { + filename_suffix = block_type_to_string(target_block_type); + filename_suffix += "_"; + } + filename_suffix += kFileNameSuffixSkew; + WriteStatsToFile(label_str, percent_buckets, filename_suffix, + label_bucket_naccesses, total_naccesses); +} + +void BlockCacheTraceAnalyzer::WriteCorrelationFeatures( + const std::string& label_str, uint32_t max_number_of_values) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, Features> label_features; + std::map<std::string, Predictions> label_predictions; + auto block_callback = + [&](const std::string& cf_name, uint64_t fd, uint32_t level, + TraceType block_type, const std::string& /*block_key*/, + uint64_t /*block_key_id*/, const BlockAccessInfo& block) { + if (block.table_id == 0 && labels.find(kGroupbyTable) != labels.end()) { + // We only know table id information for get requests. + return; + } + if (labels.find(kGroupbyCaller) != labels.end()) { + // Group by caller. 
+ for (auto const& caller_map : block.caller_access_timeline) { + const std::string label = + BuildLabel(labels, cf_name, fd, level, block_type, + caller_map.first, /*block_id=*/0, block); + auto it = block.caller_access_sequence__number_timeline.find( + caller_map.first); + assert(it != block.caller_access_sequence__number_timeline.end()); + UpdateFeatureVectors(it->second, caller_map.second, label, + &label_features, &label_predictions); + } + return; + } + const std::string label = + BuildLabel(labels, cf_name, fd, level, block_type, + TableReaderCaller::kMaxBlockCacheLookupCaller, + /*block_id=*/0, block); + UpdateFeatureVectors(block.access_sequence_number_timeline, + block.access_timeline, label, &label_features, + &label_predictions); + }; + TraverseBlocks(block_callback, &labels); + WriteCorrelationFeaturesToFile(label_str, label_features, label_predictions, + max_number_of_values); +} + +void BlockCacheTraceAnalyzer::WriteCorrelationFeaturesToFile( + const std::string& label, + const std::map<std::string, Features>& label_features, + const std::map<std::string, Predictions>& label_predictions, + uint32_t max_number_of_values) const { + std::default_random_engine rand_engine(static_cast<std::default_random_engine::result_type>(env_->NowMicros())); + for (auto const& label_feature_vectors : label_features) { + const Features& past = label_feature_vectors.second; + auto it = label_predictions.find(label_feature_vectors.first); + assert(it != label_predictions.end()); + const Predictions& future = it->second; + const std::string output_path = output_dir_ + "/" + label + "_" + + label_feature_vectors.first + "_" + + kFileNameSuffixCorrelation; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header( + "num_accesses_since_last_access,elapsed_time_since_last_access,num_" + "past_accesses,num_accesses_till_next_access,elapsed_time_till_next_" + "access"); + out << header << std::endl; + std::vector<uint32_t> indexes; + for (uint32_t i = 0; i < past.num_accesses_since_last_access.size(); i++) { + indexes.push_back(i); + } + std::shuffle(indexes.begin(), indexes.end(), rand_engine); + for (uint32_t i = 0; i < max_number_of_values && i < indexes.size(); i++) { + uint32_t rand_index = indexes[i]; + out << std::to_string(past.num_accesses_since_last_access[rand_index]) + << ","; + out << std::to_string(past.elapsed_time_since_last_access[rand_index]) + << ","; + out << std::to_string(past.num_past_accesses[rand_index]) << ","; + out << std::to_string(future.num_accesses_till_next_access[rand_index]) + << ","; + out << std::to_string(future.elapsed_time_till_next_access[rand_index]) + << std::endl; + } + out.close(); + } +} + +void BlockCacheTraceAnalyzer::WriteCorrelationFeaturesForGet( + uint32_t max_number_of_values) const { + std::string label = "GetKeyInfo"; + std::map<std::string, Features> label_features; + std::map<std::string, Predictions> label_predictions; + for (auto const& get_info : get_key_info_map_) { + const GetKeyInfo& info = get_info.second; + UpdateFeatureVectors(info.access_sequence_number_timeline, + info.access_timeline, label, &label_features, + &label_predictions); + } + WriteCorrelationFeaturesToFile(label, label_features, label_predictions, + max_number_of_values); +} + +std::set<std::string> BlockCacheTraceAnalyzer::ParseLabelStr( + const std::string& label_str) const { + std::stringstream ss(label_str); + std::set<std::string> labels; + // label_str is in the form of "label1_label2_label3", e.g., cf_bt. 
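+  // For example, ParseLabelStr("cf_bt") yields the set {"bt", "cf"}.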
+ while (ss.good()) { + std::string label_name; + getline(ss, label_name, '_'); + if (kGroupbyLabels.find(label_name) == kGroupbyLabels.end()) { + // Unknown label name. + fprintf(stderr, "Unknown label name %s, label string %s\n", + label_name.c_str(), label_str.c_str()); + return {}; + } + labels.insert(label_name); + } + return labels; +} + +std::string BlockCacheTraceAnalyzer::BuildLabel( + const std::set<std::string>& labels, const std::string& cf_name, + uint64_t fd, uint32_t level, TraceType type, TableReaderCaller caller, + uint64_t block_key, const BlockAccessInfo& block) const { + std::map<std::string, std::string> label_value_map; + label_value_map[kGroupbyAll] = kGroupbyAll; + label_value_map[kGroupbyLevel] = std::to_string(level); + label_value_map[kGroupbyCaller] = caller_to_string(caller); + label_value_map[kGroupbySSTFile] = std::to_string(fd); + label_value_map[kGroupbyBlockType] = block_type_to_string(type); + label_value_map[kGroupbyColumnFamily] = cf_name; + label_value_map[kGroupbyBlock] = std::to_string(block_key); + label_value_map[kGroupbyTable] = std::to_string(block.table_id); + // Concatenate the label values. + std::string label; + for (auto const& l : labels) { + label += label_value_map[l]; + label += "-"; + } + if (!label.empty()) { + label.pop_back(); + } + return label; +} + +void BlockCacheTraceAnalyzer::TraverseBlocks( + std::function<void(const std::string& /*cf_name*/, uint64_t /*fd*/, + uint32_t /*level*/, TraceType /*block_type*/, + const std::string& /*block_key*/, + uint64_t /*block_key_id*/, + const BlockAccessInfo& /*block_access_info*/)> + block_callback, + std::set<std::string>* labels) const { + for (auto const& cf_aggregates : cf_aggregates_map_) { + // Stats per column family. + const std::string& cf_name = cf_aggregates.first; + for (auto const& file_aggregates : cf_aggregates.second.fd_aggregates_map) { + // Stats per SST file. + const uint64_t fd = file_aggregates.first; + const uint32_t level = file_aggregates.second.level; + for (auto const& block_type_aggregates : + file_aggregates.second.block_type_aggregates_map) { + // Stats per block type. + const TraceType type = block_type_aggregates.first; + for (auto const& block_access_info : + block_type_aggregates.second.block_access_info_map) { + // Stats per block. + if (labels && block_access_info.second.table_id == 0 && + labels->find(kGroupbyTable) != labels->end()) { + // We only know table id information for get requests. 
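+            // Note that returning here aborts the entire traversal rather
+            // than skipping this block, presumably because the grouping is
+            // not meaningful when table ids are unknown.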
+ return; + } + block_callback(cf_name, fd, level, type, block_access_info.first, + block_access_info.second.block_id, + block_access_info.second); + } + } + } + } +} + +void BlockCacheTraceAnalyzer::WriteGetSpatialLocality( + const std::string& label_str, + const std::vector<uint64_t>& percent_buckets) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, std::map<uint64_t, uint64_t>> label_pnrefkeys_nblocks; + std::map<std::string, std::map<uint64_t, uint64_t>> label_pnrefs_nblocks; + std::map<std::string, std::map<uint64_t, uint64_t>> label_pndatasize_nblocks; + uint64_t nblocks = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType /*block_type*/, + const std::string& /*block_key*/, + uint64_t /*block_key_id*/, + const BlockAccessInfo& block) { + if (block.num_keys == 0) { + return; + } + uint64_t naccesses = 0; + for (auto const& key_access : block.key_num_access_map) { + for (auto const& caller_access : key_access.second) { + if (caller_access.first == TableReaderCaller::kUserGet) { + naccesses += caller_access.second; + } + } + } + const std::string label = + BuildLabel(labels, cf_name, fd, level, TraceType::kBlockTraceDataBlock, + TableReaderCaller::kUserGet, /*block_id=*/0, block); + + const uint64_t percent_referenced_for_existing_keys = + static_cast<uint64_t>(std::max( + percent(block.key_num_access_map.size(), block.num_keys), 0.0)); + const uint64_t percent_accesses_for_existing_keys = + static_cast<uint64_t>(std::max( + percent(block.num_referenced_key_exist_in_block, naccesses), 0.0)); + const uint64_t percent_referenced_data_size = static_cast<uint64_t>( + std::max(percent(block.referenced_data_size, block.block_size), 0.0)); + if (label_pnrefkeys_nblocks.find(label) == label_pnrefkeys_nblocks.end()) { + for (auto const& percent_bucket : percent_buckets) { + label_pnrefkeys_nblocks[label][percent_bucket] = 0; + label_pnrefs_nblocks[label][percent_bucket] = 0; + label_pndatasize_nblocks[label][percent_bucket] = 0; + } + } + label_pnrefkeys_nblocks[label] + .upper_bound(percent_referenced_for_existing_keys) + ->second += 1; + label_pnrefs_nblocks[label] + .upper_bound(percent_accesses_for_existing_keys) + ->second += 1; + label_pndatasize_nblocks[label] + .upper_bound(percent_referenced_data_size) + ->second += 1; + nblocks += 1; + }; + TraverseBlocks(block_callback, &labels); + WriteStatsToFile(label_str, percent_buckets, kFileNameSuffixPercentRefKeys, + label_pnrefkeys_nblocks, nblocks); + WriteStatsToFile(label_str, percent_buckets, + kFileNameSuffixPercentAccessesOnRefKeys, + label_pnrefs_nblocks, nblocks); + WriteStatsToFile(label_str, percent_buckets, + kFileNameSuffixPercentDataSizeOnRefKeys, + label_pndatasize_nblocks, nblocks); +} + +void BlockCacheTraceAnalyzer::WriteAccessTimeline(const std::string& label_str, + uint64_t time_unit, + bool user_access_only) const { + std::set<std::string> labels = ParseLabelStr(label_str); + uint64_t start_time = port::kMaxUint64; + uint64_t end_time = 0; + std::map<std::string, std::map<uint64_t, uint64_t>> label_access_timeline; + std::map<uint64_t, std::vector<std::string>> access_count_block_id_map; + + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + uint64_t naccesses = 0; + for (auto const& timeline : block.caller_num_accesses_timeline) { + const TableReaderCaller caller = timeline.first; + if 
(user_access_only && !is_user_access(caller)) { + continue; + } + const std::string label = + BuildLabel(labels, cf_name, fd, level, type, caller, block_id, block); + for (auto const& naccess : timeline.second) { + const uint64_t timestamp = naccess.first / time_unit; + const uint64_t num = naccess.second; + label_access_timeline[label][timestamp] += num; + start_time = std::min(start_time, timestamp); + end_time = std::max(end_time, timestamp); + naccesses += num; + } + } + if (naccesses > 0) { + access_count_block_id_map[naccesses].push_back(std::to_string(block_id)); + } + }; + TraverseBlocks(block_callback, &labels); + + // We have label_access_timeline now. Write them into a file. + const std::string user_access_prefix = + user_access_only ? "user_access_only_" : "all_access_"; + const std::string output_path = output_dir_ + "/" + user_access_prefix + + label_str + "_" + std::to_string(time_unit) + + "_" + kFileNameSuffixAccessTimeline; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("time"); + if (labels.find("block") != labels.end()) { + for (uint64_t now = start_time; now <= end_time; now++) { + header += ","; + header += std::to_string(now); + } + out << header << std::endl; + // Write the most frequently accessed blocks first. + for (auto naccess_it = access_count_block_id_map.rbegin(); + naccess_it != access_count_block_id_map.rend(); naccess_it++) { + for (auto& block_id_it : naccess_it->second) { + std::string row(block_id_it); + for (uint64_t now = start_time; now <= end_time; now++) { + auto it = label_access_timeline[block_id_it].find(now); + row += ","; + if (it != label_access_timeline[block_id_it].end()) { + row += std::to_string(it->second); + } else { + row += "0"; + } + } + out << row << std::endl; + } + } + out.close(); + return; + } + for (uint64_t now = start_time; now <= end_time; now++) { + header += ","; + header += std::to_string(now); + } + out << header << std::endl; + for (auto const& label : label_access_timeline) { + std::string row(label.first); + for (uint64_t now = start_time; now <= end_time; now++) { + auto it = label.second.find(now); + row += ","; + if (it != label.second.end()) { + row += std::to_string(it->second); + } else { + row += "0"; + } + } + out << row << std::endl; + } + + out.close(); +} + +void BlockCacheTraceAnalyzer::WriteReuseDistance( + const std::string& label_str, + const std::vector<uint64_t>& distance_buckets) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, std::map<uint64_t, uint64_t>> label_distance_num_reuses; + uint64_t total_num_reuses = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); + if (label_distance_num_reuses.find(label) == + label_distance_num_reuses.end()) { + // The first time we encounter this label. 
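+    // Each label keeps a histogram keyed by the bucket upper bounds; every
+    // bucket is pre-initialized to 0 so the CSV rows written below stay
+    // rectangular. A value is then binned with std::map::upper_bound(), i.e.
+    // it lands in the first bucket whose bound exceeds it. A minimal sketch
+    // of the idiom, with hypothetical buckets {1, 10, 100, kMaxUint64}:
+    //
+    //   std::map<uint64_t, uint64_t> hist;
+    //   for (uint64_t b : std::vector<uint64_t>{1, 10, 100,
+    //                                           port::kMaxUint64}) {
+    //     hist[b] = 0;
+    //   }
+    //   hist.upper_bound(42)->second += 1;  // Counted in the 100 bucket.
+    //
+    // parse_buckets() appends kMaxUint64 as a catch-all bucket, so
+    // upper_bound() always finds one.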
+    for (auto const& distance_bucket : distance_buckets) {
+      label_distance_num_reuses[label][distance_bucket] = 0;
+    }
+    }
+    for (auto const& reuse_distance : block.reuse_distance_count) {
+      label_distance_num_reuses[label]
+          .upper_bound(reuse_distance.first)
+          ->second += reuse_distance.second;
+      total_num_reuses += reuse_distance.second;
+    }
+  };
+  TraverseBlocks(block_callback, &labels);
+  // We have label_distance_num_reuses now. Write it into a file.
+  const std::string output_path =
+      output_dir_ + "/" + label_str + "_reuse_distance";
+  std::ofstream out(output_path);
+  if (!out.is_open()) {
+    return;
+  }
+  std::string header("bucket");
+  for (auto const& label_it : label_distance_num_reuses) {
+    header += ",";
+    header += label_it.first;
+  }
+  out << header << std::endl;
+  for (auto const& bucket : distance_buckets) {
+    std::string row(std::to_string(bucket));
+    for (auto const& label_it : label_distance_num_reuses) {
+      auto const& it = label_it.second.find(bucket);
+      assert(it != label_it.second.end());
+      row += ",";
+      row += std::to_string(percent(it->second, total_num_reuses));
+    }
+    out << row << std::endl;
+  }
+  out.close();
+}
+
+void BlockCacheTraceAnalyzer::UpdateReuseIntervalStats(
+    const std::string& label, const std::vector<uint64_t>& time_buckets,
+    const std::map<uint64_t, uint64_t> timeline,
+    std::map<std::string, std::map<uint64_t, uint64_t>>* label_time_num_reuses,
+    uint64_t* total_num_reuses) const {
+  assert(label_time_num_reuses);
+  assert(total_num_reuses);
+  if (label_time_num_reuses->find(label) == label_time_num_reuses->end()) {
+    // The first time we encounter this label.
+    for (auto const& time_bucket : time_buckets) {
+      (*label_time_num_reuses)[label][time_bucket] = 0;
+    }
+  }
+  auto it = timeline.begin();
+  uint64_t prev_timestamp = it->first;
+  const uint64_t prev_num = it->second;
+  it++;
+  // Reused within one second.
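+  // The timeline has one entry per second (see caller_num_accesses_timeline),
+  // so n accesses within the same second count as n - 1 zero-interval reuses.
+  // For a hypothetical timeline {100: 3, 103: 1}: the three accesses at
+  // t=100 contribute two reuses to the smallest bucket, and the access at
+  // t=103 contributes one reuse with an interval of 3 seconds.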
+ if (prev_num > 1) { + (*label_time_num_reuses)[label].upper_bound(0)->second += prev_num - 1; + *total_num_reuses += prev_num - 1; + } + while (it != timeline.end()) { + const uint64_t timestamp = it->first; + const uint64_t num = it->second; + const uint64_t reuse_interval = timestamp - prev_timestamp; + (*label_time_num_reuses)[label].upper_bound(reuse_interval)->second += 1; + if (num > 1) { + (*label_time_num_reuses)[label].upper_bound(0)->second += num - 1; + } + prev_timestamp = timestamp; + *total_num_reuses += num; + it++; + } +} + +void BlockCacheTraceAnalyzer::WriteStatsToFile( + const std::string& label_str, const std::vector<uint64_t>& time_buckets, + const std::string& filename_suffix, + const std::map<std::string, std::map<uint64_t, uint64_t>>& label_data, + uint64_t ntotal) const { + const std::string output_path = + output_dir_ + "/" + label_str + "_" + filename_suffix; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("bucket"); + for (auto const& label_it : label_data) { + header += ","; + header += label_it.first; + } + out << header << std::endl; + for (auto const& bucket : time_buckets) { + std::string row(std::to_string(bucket)); + for (auto const& label_it : label_data) { + auto const& it = label_it.second.find(bucket); + assert(it != label_it.second.end()); + row += ","; + row += std::to_string(percent(it->second, ntotal)); + } + out << row << std::endl; + } + out.close(); +} + +void BlockCacheTraceAnalyzer::WriteReuseInterval( + const std::string& label_str, + const std::vector<uint64_t>& time_buckets) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, std::map<uint64_t, uint64_t>> label_time_num_reuses; + std::map<std::string, std::map<uint64_t, uint64_t>> label_avg_reuse_nblocks; + std::map<std::string, std::map<uint64_t, uint64_t>> label_avg_reuse_naccesses; + + uint64_t total_num_reuses = 0; + uint64_t total_nblocks = 0; + uint64_t total_accesses = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + total_nblocks++; + total_accesses += block.num_accesses; + uint64_t avg_reuse_interval = 0; + if (block.num_accesses > 1) { + avg_reuse_interval = ((block.last_access_time - block.first_access_time) / + kMicrosInSecond) / + block.num_accesses; + } else { + avg_reuse_interval = port::kMaxUint64 - 1; + } + if (labels.find(kGroupbyCaller) != labels.end()) { + for (auto const& timeline : block.caller_num_accesses_timeline) { + const TableReaderCaller caller = timeline.first; + const std::string label = BuildLabel(labels, cf_name, fd, level, type, + caller, block_id, block); + UpdateReuseIntervalStats(label, time_buckets, timeline.second, + &label_time_num_reuses, &total_num_reuses); + } + return; + } + // Does not group by caller so we need to flatten the access timeline. 
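+    // For example, hypothetical per-caller timelines Get: {t1: 2} and
+    // Compaction: {t1: 1, t2: 1} collapse into the single timeline
+    // {t1: 3, t2: 1} before the reuse intervals are computed.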
+ const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); + std::map<uint64_t, uint64_t> timeline; + for (auto const& caller_timeline : block.caller_num_accesses_timeline) { + for (auto const& time_naccess : caller_timeline.second) { + timeline[time_naccess.first] += time_naccess.second; + } + } + UpdateReuseIntervalStats(label, time_buckets, timeline, + &label_time_num_reuses, &total_num_reuses); + if (label_avg_reuse_nblocks.find(label) == label_avg_reuse_nblocks.end()) { + for (auto const& time_bucket : time_buckets) { + label_avg_reuse_nblocks[label][time_bucket] = 0; + label_avg_reuse_naccesses[label][time_bucket] = 0; + } + } + label_avg_reuse_nblocks[label].upper_bound(avg_reuse_interval)->second += 1; + label_avg_reuse_naccesses[label].upper_bound(avg_reuse_interval)->second += + block.num_accesses; + }; + TraverseBlocks(block_callback, &labels); + + // Write the stats into files. + WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseInterval, + label_time_num_reuses, total_num_reuses); + WriteStatsToFile(label_str, time_buckets, kFileNameSuffixAvgReuseInterval, + label_avg_reuse_nblocks, total_nblocks); + WriteStatsToFile(label_str, time_buckets, + kFileNameSuffixAvgReuseIntervalNaccesses, + label_avg_reuse_naccesses, total_accesses); +} + +void BlockCacheTraceAnalyzer::WriteReuseLifetime( + const std::string& label_str, + const std::vector<uint64_t>& time_buckets) const { + std::set<std::string> labels = ParseLabelStr(label_str); + std::map<std::string, std::map<uint64_t, uint64_t>> label_lifetime_nblocks; + uint64_t total_nblocks = 0; + auto block_callback = [&](const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + uint64_t lifetime = 0; + if (block.num_accesses > 1) { + lifetime = + (block.last_access_time - block.first_access_time) / kMicrosInSecond; + } else { + lifetime = port::kMaxUint64 - 1; + } + const std::string label = BuildLabel( + labels, cf_name, fd, level, type, + TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block); + + if (label_lifetime_nblocks.find(label) == label_lifetime_nblocks.end()) { + // The first time we encounter this label. + for (auto const& time_bucket : time_buckets) { + label_lifetime_nblocks[label][time_bucket] = 0; + } + } + label_lifetime_nblocks[label].upper_bound(lifetime)->second += 1; + total_nblocks += 1; + }; + TraverseBlocks(block_callback, &labels); + WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseLifetime, + label_lifetime_nblocks, total_nblocks); +} + +void BlockCacheTraceAnalyzer::WriteBlockReuseTimeline( + const uint64_t reuse_window, bool user_access_only, TraceType block_type) const { + // A map from block key to an array of bools that states whether a block is + // accessed in a time window. + std::map<uint64_t, std::vector<bool>> block_accessed; + const uint64_t trace_duration = + trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_; + const uint64_t reuse_vector_size = (trace_duration / reuse_window); + if (reuse_vector_size < 2) { + // The reuse window is less than 2. We cannot calculate the reused + // percentage of blocks. 
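+    // More precisely, trace_duration / reuse_window yields fewer than two
+    // windows. For example (hypothetical numbers), a 3600-second trace with
+    // reuse_window = 600 gives 6 windows, while a 600-second trace with the
+    // same window gives only 1, leaving no later window in which a reuse
+    // could be observed.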
+ return; + } + auto block_callback = [&](const std::string& /*cf_name*/, uint64_t /*fd*/, + uint32_t /*level*/, TraceType /*type*/, + const std::string& /*block_key*/, uint64_t block_id, + const BlockAccessInfo& block) { + if (block_accessed.find(block_id) == block_accessed.end()) { + block_accessed[block_id].resize(reuse_vector_size); + for (uint64_t i = 0; i < reuse_vector_size; i++) { + block_accessed[block_id][i] = false; + } + } + for (auto const& caller_num : block.caller_num_accesses_timeline) { + const TableReaderCaller caller = caller_num.first; + for (auto const& timeline : caller_num.second) { + const uint64_t timestamp = timeline.first; + const uint64_t elapsed_time = + timestamp - trace_start_timestamp_in_seconds_; + if (!user_access_only || is_user_access(caller)) { + uint64_t index = + std::min(elapsed_time / reuse_window, reuse_vector_size - 1); + block_accessed[block_id][index] = true; + } + } + } + }; + TraverseBlocks(block_callback); + + // A cell is the number of blocks accessed in a reuse window. + std::unique_ptr<uint64_t[]> reuse_table(new uint64_t[reuse_vector_size * reuse_vector_size]); + for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) { + // Initialize the reuse_table. + for (uint64_t i = 0; i < reuse_vector_size; i++) { + reuse_table[start_time * reuse_vector_size + i] = 0; + } + // Examine all blocks. + for (auto const& block : block_accessed) { + for (uint64_t i = start_time; i < reuse_vector_size; i++) { + if (block.second[start_time] && block.second[i]) { + // This block is accessed at start time and at the current time. We + // increment reuse_table[start_time][i] since it is reused at the ith + // window. + reuse_table[start_time * reuse_vector_size + i]++; + } + } + } + } + const std::string user_access_prefix = + user_access_only ? 
"_user_access_only_" : "_all_access_"; + const std::string output_path = + output_dir_ + "/" + block_type_to_string(block_type) + + user_access_prefix + std::to_string(reuse_window) + "_" + + kFileNameSuffixAccessReuseBlocksTimeline; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("start_time"); + for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) { + header += ","; + header += std::to_string(start_time); + } + out << header << std::endl; + for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) { + std::string row(std::to_string(start_time * reuse_window)); + for (uint64_t j = 0; j < reuse_vector_size; j++) { + row += ","; + if (j < start_time) { + row += "100.0"; + } else { + row += std::to_string(percent(reuse_table[start_time * reuse_vector_size + j], + reuse_table[start_time * reuse_vector_size + start_time])); + } + } + out << row << std::endl; + } + out.close(); +} + +std::string BlockCacheTraceAnalyzer::OutputPercentAccessStats( + uint64_t total_accesses, + const std::map<std::string, uint64_t>& cf_access_count) const { + std::string row; + for (auto const& cf_aggregates : cf_aggregates_map_) { + const std::string& cf_name = cf_aggregates.first; + const auto& naccess = cf_access_count.find(cf_name); + row += ","; + if (naccess != cf_access_count.end()) { + row += std::to_string(percent(naccess->second, total_accesses)); + } else { + row += "0"; + } + } + return row; +} + +void BlockCacheTraceAnalyzer::WritePercentAccessSummaryStats() const { + std::map<TableReaderCaller, std::map<std::string, uint64_t>> + caller_cf_accesses; + uint64_t total_accesses = 0; + auto block_callback = + [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/, + TraceType /*type*/, const std::string& /*block_key*/, + uint64_t /*block_id*/, const BlockAccessInfo& block) { + for (auto const& caller_num : block.caller_num_access_map) { + const TableReaderCaller caller = caller_num.first; + const uint64_t naccess = caller_num.second; + caller_cf_accesses[caller][cf_name] += naccess; + total_accesses += naccess; + } + }; + TraverseBlocks(block_callback); + + const std::string output_path = + output_dir_ + "/" + kFileNameSuffixPercentOfAccessSummary; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("caller"); + for (auto const& cf_name : cf_aggregates_map_) { + header += ","; + header += cf_name.first; + } + out << header << std::endl; + for (auto const& cf_naccess_it : caller_cf_accesses) { + const TableReaderCaller caller = cf_naccess_it.first; + std::string row; + row += caller_to_string(caller); + row += OutputPercentAccessStats(total_accesses, cf_naccess_it.second); + out << row << std::endl; + } + out.close(); +} + +void BlockCacheTraceAnalyzer::WriteDetailedPercentAccessSummaryStats( + TableReaderCaller analyzing_caller) const { + std::map<uint32_t, std::map<std::string, uint64_t>> level_cf_accesses; + std::map<TraceType, std::map<std::string, uint64_t>> bt_cf_accesses; + uint64_t total_accesses = 0; + auto block_callback = + [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t level, + TraceType type, const std::string& /*block_key*/, + uint64_t /*block_id*/, const BlockAccessInfo& block) { + for (auto const& caller_num : block.caller_num_access_map) { + const TableReaderCaller caller = caller_num.first; + if (caller == analyzing_caller) { + const uint64_t naccess = caller_num.second; + level_cf_accesses[level][cf_name] += naccess; + 
bt_cf_accesses[type][cf_name] += naccess; + total_accesses += naccess; + } + } + }; + TraverseBlocks(block_callback); + { + const std::string output_path = + output_dir_ + "/" + caller_to_string(analyzing_caller) + "_level_" + + kFileNameSuffixPercentOfAccessSummary; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("level"); + for (auto const& cf_name : cf_aggregates_map_) { + header += ","; + header += cf_name.first; + } + out << header << std::endl; + for (auto const& level_naccess_it : level_cf_accesses) { + const uint32_t level = level_naccess_it.first; + std::string row; + row += std::to_string(level); + row += OutputPercentAccessStats(total_accesses, level_naccess_it.second); + out << row << std::endl; + } + out.close(); + } + { + const std::string output_path = + output_dir_ + "/" + caller_to_string(analyzing_caller) + "_bt_" + + kFileNameSuffixPercentOfAccessSummary; + std::ofstream out(output_path); + if (!out.is_open()) { + return; + } + std::string header("bt"); + for (auto const& cf_name : cf_aggregates_map_) { + header += ","; + header += cf_name.first; + } + out << header << std::endl; + for (auto const& bt_naccess_it : bt_cf_accesses) { + const TraceType bt = bt_naccess_it.first; + std::string row; + row += block_type_to_string(bt); + row += OutputPercentAccessStats(total_accesses, bt_naccess_it.second); + out << row << std::endl; + } + out.close(); + } +} + +void BlockCacheTraceAnalyzer::WriteAccessCountSummaryStats( + const std::vector<uint64_t>& access_count_buckets, + bool user_access_only) const { + // x: buckets. + // y: # of accesses. + std::map<std::string, std::map<uint64_t, uint64_t>> bt_access_nblocks; + std::map<std::string, std::map<uint64_t, uint64_t>> cf_access_nblocks; + uint64_t total_nblocks = 0; + auto block_callback = + [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/, + TraceType type, const std::string& /*block_key*/, + uint64_t /*block_id*/, const BlockAccessInfo& block) { + const std::string type_str = block_type_to_string(type); + if (cf_access_nblocks.find(cf_name) == cf_access_nblocks.end()) { + // initialize. + for (auto& access : access_count_buckets) { + cf_access_nblocks[cf_name][access] = 0; + } + } + if (bt_access_nblocks.find(type_str) == bt_access_nblocks.end()) { + // initialize. + for (auto& access : access_count_buckets) { + bt_access_nblocks[type_str][access] = 0; + } + } + uint64_t naccesses = 0; + for (auto const& caller_access : block.caller_num_access_map) { + if (!user_access_only || is_user_access(caller_access.first)) { + naccesses += caller_access.second; + } + } + if (naccesses == 0) { + return; + } + total_nblocks += 1; + bt_access_nblocks[type_str].upper_bound(naccesses)->second += 1; + cf_access_nblocks[cf_name].upper_bound(naccesses)->second += 1; + }; + TraverseBlocks(block_callback); + const std::string user_access_prefix = + user_access_only ? 
"user_access_only_" : "all_access_"; + WriteStatsToFile("cf", access_count_buckets, + user_access_prefix + kFileNameSuffixAccessCountSummary, + cf_access_nblocks, total_nblocks); + WriteStatsToFile("bt", access_count_buckets, + user_access_prefix + kFileNameSuffixAccessCountSummary, + bt_access_nblocks, total_nblocks); +} + +BlockCacheTraceAnalyzer::BlockCacheTraceAnalyzer( + const std::string& trace_file_path, const std::string& output_dir, + const std::string& human_readable_trace_file_path, + bool compute_reuse_distance, bool mrc_only, + bool is_human_readable_trace_file, + std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator) + : env_(ROCKSDB_NAMESPACE::Env::Default()), + trace_file_path_(trace_file_path), + output_dir_(output_dir), + human_readable_trace_file_path_(human_readable_trace_file_path), + compute_reuse_distance_(compute_reuse_distance), + mrc_only_(mrc_only), + is_human_readable_trace_file_(is_human_readable_trace_file), + cache_simulator_(std::move(cache_simulator)) {} + +void BlockCacheTraceAnalyzer::ComputeReuseDistance( + BlockAccessInfo* info) const { + assert(info); + if (info->num_accesses == 0) { + return; + } + uint64_t reuse_distance = 0; + for (auto const& block_key : info->unique_blocks_since_last_access) { + auto const& it = block_info_map_.find(block_key); + // This block must exist. + assert(it != block_info_map_.end()); + reuse_distance += it->second->block_size; + } + info->reuse_distance_count[reuse_distance] += 1; + // We clear this hash set since this is the second access on this block. + info->unique_blocks_since_last_access.clear(); +} + +Status BlockCacheTraceAnalyzer::RecordAccess( + const BlockCacheTraceRecord& access) { + ColumnFamilyAccessInfoAggregate& cf_aggr = cf_aggregates_map_[access.cf_name]; + SSTFileAccessInfoAggregate& file_aggr = + cf_aggr.fd_aggregates_map[access.sst_fd_number]; + file_aggr.level = access.level; + BlockTypeAccessInfoAggregate& block_type_aggr = + file_aggr.block_type_aggregates_map[access.block_type]; + if (block_type_aggr.block_access_info_map.find(access.block_key) == + block_type_aggr.block_access_info_map.end()) { + block_type_aggr.block_access_info_map[access.block_key].block_id = + unique_block_id_; + unique_block_id_++; + } + BlockAccessInfo& block_access_info = + block_type_aggr.block_access_info_map[access.block_key]; + if (compute_reuse_distance_) { + ComputeReuseDistance(&block_access_info); + } + block_access_info.AddAccess(access, access_sequence_number_); + block_info_map_[access.block_key] = &block_access_info; + uint64_t get_key_id = 0; + if (access.caller == TableReaderCaller::kUserGet && + access.get_id != BlockCacheTraceHelper::kReservedGetId) { + std::string user_key = ExtractUserKey(access.referenced_key).ToString(); + if (get_key_info_map_.find(user_key) == get_key_info_map_.end()) { + get_key_info_map_[user_key].key_id = unique_get_key_id_; + unique_get_key_id_++; + } + get_key_id = get_key_info_map_[user_key].key_id; + get_key_info_map_[user_key].AddAccess(access, access_sequence_number_); + } + + if (compute_reuse_distance_) { + // Add this block to all existing blocks. 
+ for (auto& cf_aggregates : cf_aggregates_map_) { + for (auto& file_aggregates : cf_aggregates.second.fd_aggregates_map) { + for (auto& block_type_aggregates : + file_aggregates.second.block_type_aggregates_map) { + for (auto& existing_block : + block_type_aggregates.second.block_access_info_map) { + existing_block.second.unique_blocks_since_last_access.insert( + access.block_key); + } + } + } + } + } + return human_readable_trace_writer_.WriteHumanReadableTraceRecord( + access, block_access_info.block_id, get_key_id); +} + +Status BlockCacheTraceAnalyzer::Analyze() { + std::unique_ptr<BlockCacheTraceReader> reader; + Status s = Status::OK(); + if (is_human_readable_trace_file_) { + reader.reset(new BlockCacheHumanReadableTraceReader(trace_file_path_)); + } else { + std::unique_ptr<TraceReader> trace_reader; + s = NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader); + if (!s.ok()) { + return s; + } + reader.reset(new BlockCacheTraceReader(std::move(trace_reader))); + s = reader->ReadHeader(&header_); + if (!s.ok()) { + return s; + } + } + if (!human_readable_trace_file_path_.empty()) { + s = human_readable_trace_writer_.NewWritableFile( + human_readable_trace_file_path_, env_); + if (!s.ok()) { + return s; + } + } + uint64_t start = env_->NowMicros(); + uint64_t time_interval = 0; + while (s.ok()) { + BlockCacheTraceRecord access; + s = reader->ReadAccess(&access); + if (!s.ok()) { + break; + } + if (!mrc_only_) { + s = RecordAccess(access); + if (!s.ok()) { + break; + } + } + if (trace_start_timestamp_in_seconds_ == 0) { + trace_start_timestamp_in_seconds_ = + access.access_timestamp / kMicrosInSecond; + } + trace_end_timestamp_in_seconds_ = access.access_timestamp / kMicrosInSecond; + miss_ratio_stats_.UpdateMetrics(access.access_timestamp, + is_user_access(access.caller), + access.is_cache_hit == Boolean::kFalse); + if (cache_simulator_) { + cache_simulator_->Access(access); + } + access_sequence_number_++; + uint64_t now = env_->NowMicros(); + uint64_t duration = (now - start) / kMicrosInSecond; + if (duration > 10 * time_interval) { + uint64_t trace_duration = + trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_; + fprintf(stdout, + "Running for %" PRIu64 " seconds: Processed %" PRIu64 + " records/second. Trace duration %" PRIu64 + " seconds. Observed miss ratio %.2f\n", + duration, duration > 0 ? access_sequence_number_ / duration : 0, + trace_duration, miss_ratio_stats_.miss_ratio()); + time_interval++; + } + } + uint64_t now = env_->NowMicros(); + uint64_t duration = (now - start) / kMicrosInSecond; + uint64_t trace_duration = + trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_; + fprintf(stdout, + "Running for %" PRIu64 " seconds: Processed %" PRIu64 + " records/second. Trace duration %" PRIu64 + " seconds. Observed miss ratio %.2f\n", + duration, duration > 0 ? 
access_sequence_number_ / duration : 0, + trace_duration, miss_ratio_stats_.miss_ratio()); + return s; +} + +void BlockCacheTraceAnalyzer::PrintBlockSizeStats() const { + HistogramStat bs_stats; + std::map<TraceType, HistogramStat> bt_stats_map; + std::map<std::string, std::map<TraceType, HistogramStat>> cf_bt_stats_map; + auto block_callback = + [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/, + TraceType type, const std::string& /*block_key*/, + uint64_t /*block_id*/, const BlockAccessInfo& block) { + if (block.block_size == 0) { + // Block size may be 0 when 1) compaction observes a cache miss and + // does not insert the missing block into the cache again. 2) + // fetching filter blocks in SST files at the last level. + return; + } + bs_stats.Add(block.block_size); + bt_stats_map[type].Add(block.block_size); + cf_bt_stats_map[cf_name][type].Add(block.block_size); + }; + TraverseBlocks(block_callback); + fprintf(stdout, "Block size stats: \n%s", bs_stats.ToString().c_str()); + for (auto const& bt_stats : bt_stats_map) { + print_break_lines(/*num_break_lines=*/1); + fprintf(stdout, "Block size stats for block type %s: \n%s", + block_type_to_string(bt_stats.first).c_str(), + bt_stats.second.ToString().c_str()); + } + for (auto const& cf_bt_stats : cf_bt_stats_map) { + const std::string& cf_name = cf_bt_stats.first; + for (auto const& bt_stats : cf_bt_stats.second) { + print_break_lines(/*num_break_lines=*/1); + fprintf(stdout, + "Block size stats for column family %s and block type %s: \n%s", + cf_name.c_str(), block_type_to_string(bt_stats.first).c_str(), + bt_stats.second.ToString().c_str()); + } + } +} + +void BlockCacheTraceAnalyzer::PrintAccessCountStats(bool user_access_only, + uint32_t bottom_k, + uint32_t top_k) const { + HistogramStat access_stats; + std::map<TraceType, HistogramStat> bt_stats_map; + std::map<std::string, std::map<TraceType, HistogramStat>> cf_bt_stats_map; + std::map<uint64_t, std::vector<std::string>> access_count_blocks; + auto block_callback = [&](const std::string& cf_name, uint64_t /*fd*/, + uint32_t /*level*/, TraceType type, + const std::string& block_key, uint64_t /*block_id*/, + const BlockAccessInfo& block) { + uint64_t naccesses = 0; + for (auto const& caller_access : block.caller_num_access_map) { + if (!user_access_only || is_user_access(caller_access.first)) { + naccesses += caller_access.second; + } + } + if (naccesses == 0) { + return; + } + if (type == TraceType::kBlockTraceDataBlock) { + access_count_blocks[naccesses].push_back(block_key); + } + access_stats.Add(naccesses); + bt_stats_map[type].Add(naccesses); + cf_bt_stats_map[cf_name][type].Add(naccesses); + }; + TraverseBlocks(block_callback); + fprintf(stdout, + "Block access count stats: The number of accesses per block. %s\n%s", + user_access_only ? 
"User accesses only" : "All accesses", + access_stats.ToString().c_str()); + uint32_t bottom_k_index = 0; + for (auto naccess_it = access_count_blocks.begin(); + naccess_it != access_count_blocks.end(); naccess_it++) { + bottom_k_index++; + if (bottom_k_index >= bottom_k) { + break; + } + std::map<TableReaderCaller, uint64_t> caller_naccesses; + uint64_t naccesses = 0; + for (auto const& block_id : naccess_it->second) { + BlockAccessInfo* block = block_info_map_.find(block_id)->second; + for (auto const& caller_access : block->caller_num_access_map) { + if (!user_access_only || is_user_access(caller_access.first)) { + caller_naccesses[caller_access.first] += caller_access.second; + naccesses += caller_access.second; + } + } + } + std::string statistics("Caller:"); + for (auto const& caller_naccessess_it : caller_naccesses) { + statistics += caller_to_string(caller_naccessess_it.first); + statistics += ":"; + statistics += + std::to_string(percent(caller_naccessess_it.second, naccesses)); + statistics += ","; + } + fprintf(stdout, + "Bottom %" PRIu32 " access count. Access count=%" PRIu64 + " nblocks=%" ROCKSDB_PRIszt " %s\n", + bottom_k, naccess_it->first, naccess_it->second.size(), + statistics.c_str()); + } + + uint32_t top_k_index = 0; + for (auto naccess_it = access_count_blocks.rbegin(); + naccess_it != access_count_blocks.rend(); naccess_it++) { + top_k_index++; + if (top_k_index >= top_k) { + break; + } + for (auto const& block_id : naccess_it->second) { + BlockAccessInfo* block = block_info_map_.find(block_id)->second; + std::string statistics("Caller:"); + uint64_t naccesses = 0; + for (auto const& caller_access : block->caller_num_access_map) { + if (!user_access_only || is_user_access(caller_access.first)) { + naccesses += caller_access.second; + } + } + assert(naccesses > 0); + for (auto const& caller_access : block->caller_num_access_map) { + if (!user_access_only || is_user_access(caller_access.first)) { + statistics += ","; + statistics += caller_to_string(caller_access.first); + statistics += ":"; + statistics += + std::to_string(percent(caller_access.second, naccesses)); + } + } + uint64_t ref_keys_accesses = 0; + uint64_t ref_keys_does_not_exist_accesses = 0; + for (auto const& ref_key_caller_access : block->key_num_access_map) { + for (auto const& caller_access : ref_key_caller_access.second) { + if (!user_access_only || is_user_access(caller_access.first)) { + ref_keys_accesses += caller_access.second; + } + } + } + for (auto const& ref_key_caller_access : + block->non_exist_key_num_access_map) { + for (auto const& caller_access : ref_key_caller_access.second) { + if (!user_access_only || is_user_access(caller_access.first)) { + ref_keys_does_not_exist_accesses += caller_access.second; + } + } + } + statistics += ",nkeys="; + statistics += std::to_string(block->num_keys); + statistics += ",block_size="; + statistics += std::to_string(block->block_size); + statistics += ",num_ref_keys="; + statistics += std::to_string(block->key_num_access_map.size()); + statistics += ",percent_access_ref_keys="; + statistics += std::to_string(percent(ref_keys_accesses, naccesses)); + statistics += ",num_ref_keys_does_not_exist="; + statistics += std::to_string(block->non_exist_key_num_access_map.size()); + statistics += ",percent_access_ref_keys_does_not_exist="; + statistics += + std::to_string(percent(ref_keys_does_not_exist_accesses, naccesses)); + statistics += ",ref_data_size="; + statistics += std::to_string(block->referenced_data_size); + fprintf(stdout, + "Top %" PRIu32 " access 
count blocks access_count=%" PRIu64 + " %s\n", + top_k, naccess_it->first, statistics.c_str()); + } + } + + for (auto const& bt_stats : bt_stats_map) { + print_break_lines(/*num_break_lines=*/1); + fprintf(stdout, "Break down by block type %s: \n%s", + block_type_to_string(bt_stats.first).c_str(), + bt_stats.second.ToString().c_str()); + } + for (auto const& cf_bt_stats : cf_bt_stats_map) { + const std::string& cf_name = cf_bt_stats.first; + for (auto const& bt_stats : cf_bt_stats.second) { + print_break_lines(/*num_break_lines=*/1); + fprintf(stdout, + "Break down by column family %s and block type " + "%s: \n%s", + cf_name.c_str(), block_type_to_string(bt_stats.first).c_str(), + bt_stats.second.ToString().c_str()); + } + } +} + +void BlockCacheTraceAnalyzer::PrintDataBlockAccessStats() const { + HistogramStat existing_keys_stats; + std::map<std::string, HistogramStat> cf_existing_keys_stats_map; + HistogramStat non_existing_keys_stats; + std::map<std::string, HistogramStat> cf_non_existing_keys_stats_map; + HistogramStat block_access_stats; + std::map<std::string, HistogramStat> cf_block_access_info; + HistogramStat percent_referenced_bytes; + std::map<std::string, HistogramStat> cf_percent_referenced_bytes; + // Total number of accesses in a data block / number of keys in a data block. + HistogramStat avg_naccesses_per_key_in_a_data_block; + std::map<std::string, HistogramStat> cf_avg_naccesses_per_key_in_a_data_block; + // The standard deviation on the number of accesses of a key in a data block. + HistogramStat stdev_naccesses_per_key_in_a_data_block; + std::map<std::string, HistogramStat> + cf_stdev_naccesses_per_key_in_a_data_block; + auto block_callback = + [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/, + TraceType /*type*/, const std::string& /*block_key*/, + uint64_t /*block_id*/, const BlockAccessInfo& block) { + if (block.num_keys == 0) { + return; + } + // Use four decimal points. 
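+        // That is, the ratios below are stored as integers scaled by 10000,
+        // preserving two decimal places of the percentage: e.g., 123
+        // referenced keys out of 1000 -> 0.123 * 10000 = 1230, read back as
+        // 12.30%.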
+        uint64_t percent_referenced_for_existing_keys = (uint64_t)(
+            ((double)block.key_num_access_map.size() / (double)block.num_keys) *
+            10000.0);
+        uint64_t percent_referenced_for_non_existing_keys =
+            (uint64_t)(((double)block.non_exist_key_num_access_map.size() /
+                        (double)block.num_keys) *
+                       10000.0);
+        uint64_t percent_accesses_for_existing_keys =
+            (uint64_t)(((double)block.num_referenced_key_exist_in_block /
+                        (double)block.num_accesses) *
+                       10000.0);
+
+        HistogramStat hist_naccess_per_key;
+        for (auto const& key_access : block.key_num_access_map) {
+          for (auto const& caller_access : key_access.second) {
+            hist_naccess_per_key.Add(caller_access.second);
+          }
+        }
+        uint64_t avg_accesses =
+            static_cast<uint64_t>(hist_naccess_per_key.Average());
+        uint64_t stdev_accesses =
+            static_cast<uint64_t>(hist_naccess_per_key.StandardDeviation());
+        avg_naccesses_per_key_in_a_data_block.Add(avg_accesses);
+        cf_avg_naccesses_per_key_in_a_data_block[cf_name].Add(avg_accesses);
+        stdev_naccesses_per_key_in_a_data_block.Add(stdev_accesses);
+        cf_stdev_naccesses_per_key_in_a_data_block[cf_name].Add(stdev_accesses);
+
+        existing_keys_stats.Add(percent_referenced_for_existing_keys);
+        cf_existing_keys_stats_map[cf_name].Add(
+            percent_referenced_for_existing_keys);
+        non_existing_keys_stats.Add(percent_referenced_for_non_existing_keys);
+        cf_non_existing_keys_stats_map[cf_name].Add(
+            percent_referenced_for_non_existing_keys);
+        block_access_stats.Add(percent_accesses_for_existing_keys);
+        cf_block_access_info[cf_name].Add(percent_accesses_for_existing_keys);
+      };
+  TraverseBlocks(block_callback);
+  fprintf(stdout,
+          "Histogram on the number of referenced keys existing in a block over "
+          "the total number of keys in a block: \n%s",
+          existing_keys_stats.ToString().c_str());
+  for (auto const& cf_stats : cf_existing_keys_stats_map) {
+    print_break_lines(/*num_break_lines=*/1);
+    fprintf(stdout, "Break down by column family %s: \n%s",
+            cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+  }
+  print_break_lines(/*num_break_lines=*/1);
+  fprintf(
+      stdout,
+      "Histogram on the number of referenced keys that DO NOT exist in a "
+      "block over the total number of keys in a block: \n%s",
+      non_existing_keys_stats.ToString().c_str());
+  for (auto const& cf_stats : cf_non_existing_keys_stats_map) {
+    print_break_lines(/*num_break_lines=*/1);
+    fprintf(stdout, "Break down by column family %s: \n%s",
+            cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+  }
+  print_break_lines(/*num_break_lines=*/1);
+  fprintf(stdout,
+          "Histogram on the number of accesses on keys that exist in a block "
+          "over the total number of accesses in a block: \n%s",
+          block_access_stats.ToString().c_str());
+  for (auto const& cf_stats : cf_block_access_info) {
+    print_break_lines(/*num_break_lines=*/1);
+    fprintf(stdout, "Break down by column family %s: \n%s",
+            cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+  }
+  print_break_lines(/*num_break_lines=*/1);
+  fprintf(
+      stdout,
+      "Histogram on the average number of accesses per key in a block: \n%s",
+      avg_naccesses_per_key_in_a_data_block.ToString().c_str());
+  for (auto const& cf_stats : cf_avg_naccesses_per_key_in_a_data_block) {
+    fprintf(stdout, "Break down by column family %s: \n%s",
+            cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+  }
+  print_break_lines(/*num_break_lines=*/1);
+  fprintf(stdout,
+          "Histogram on the standard deviation of the number of accesses per "
+          "key in a block: \n%s",
+          
stdev_naccesses_per_key_in_a_data_block.ToString().c_str()); + for (auto const& cf_stats : cf_stdev_naccesses_per_key_in_a_data_block) { + fprintf(stdout, "Break down by column family %s: \n%s", + cf_stats.first.c_str(), cf_stats.second.ToString().c_str()); + } +} + +void BlockCacheTraceAnalyzer::PrintStatsSummary() const { + uint64_t total_num_files = 0; + uint64_t total_num_blocks = 0; + uint64_t total_num_accesses = 0; + std::map<TraceType, uint64_t> bt_num_blocks_map; + std::map<TableReaderCaller, uint64_t> caller_num_access_map; + std::map<TableReaderCaller, std::map<TraceType, uint64_t>> + caller_bt_num_access_map; + std::map<TableReaderCaller, std::map<uint32_t, uint64_t>> + caller_level_num_access_map; + for (auto const& cf_aggregates : cf_aggregates_map_) { + // Stats per column family. + const std::string& cf_name = cf_aggregates.first; + uint64_t cf_num_files = 0; + uint64_t cf_num_blocks = 0; + std::map<TraceType, uint64_t> cf_bt_blocks; + uint64_t cf_num_accesses = 0; + std::map<TableReaderCaller, uint64_t> cf_caller_num_accesses_map; + std::map<TableReaderCaller, std::map<uint64_t, uint64_t>> + cf_caller_level_num_accesses_map; + std::map<TableReaderCaller, std::map<uint64_t, uint64_t>> + cf_caller_file_num_accesses_map; + std::map<TableReaderCaller, std::map<TraceType, uint64_t>> + cf_caller_bt_num_accesses_map; + total_num_files += cf_aggregates.second.fd_aggregates_map.size(); + for (auto const& file_aggregates : cf_aggregates.second.fd_aggregates_map) { + // Stats per SST file. + const uint64_t fd = file_aggregates.first; + const uint32_t level = file_aggregates.second.level; + cf_num_files++; + for (auto const& block_type_aggregates : + file_aggregates.second.block_type_aggregates_map) { + // Stats per block type. + const TraceType type = block_type_aggregates.first; + cf_bt_blocks[type] += + block_type_aggregates.second.block_access_info_map.size(); + total_num_blocks += + block_type_aggregates.second.block_access_info_map.size(); + bt_num_blocks_map[type] += + block_type_aggregates.second.block_access_info_map.size(); + for (auto const& block_access_info : + block_type_aggregates.second.block_access_info_map) { + // Stats per block. + cf_num_blocks++; + for (auto const& stats : + block_access_info.second.caller_num_access_map) { + // Stats per caller. + const TableReaderCaller caller = stats.first; + const uint64_t num_accesses = stats.second; + // Overall stats. + total_num_accesses += num_accesses; + caller_num_access_map[caller] += num_accesses; + caller_bt_num_access_map[caller][type] += num_accesses; + caller_level_num_access_map[caller][level] += num_accesses; + // Column Family stats. + cf_num_accesses += num_accesses; + cf_caller_num_accesses_map[caller] += num_accesses; + cf_caller_level_num_accesses_map[caller][level] += num_accesses; + cf_caller_file_num_accesses_map[caller][fd] += num_accesses; + cf_caller_bt_num_accesses_map[caller][type] += num_accesses; + } + } + } + } + + // Print stats. 
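+    // For each column family, this emits a header with the file, block, and
+    // access counts, per-block-type block counts, and then, per caller,
+    // access breakdowns by level, file, and block type.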
+ print_break_lines(/*num_break_lines=*/3); + fprintf(stdout, "Statistics for column family %s:\n", cf_name.c_str()); + fprintf(stdout, + " Number of files:%" PRIu64 " Number of blocks: %" PRIu64 + " Number of accesses: %" PRIu64 "\n", + cf_num_files, cf_num_blocks, cf_num_accesses); + for (auto block_type : cf_bt_blocks) { + fprintf(stdout, "Number of %s blocks: %" PRIu64 " Percent: %.2f\n", + block_type_to_string(block_type.first).c_str(), block_type.second, + percent(block_type.second, cf_num_blocks)); + } + for (auto caller : cf_caller_num_accesses_map) { + const uint64_t naccesses = caller.second; + print_break_lines(/*num_break_lines=*/1); + fprintf(stdout, + "Caller %s: Number of accesses %" PRIu64 " Percent: %.2f\n", + caller_to_string(caller.first).c_str(), naccesses, + percent(naccesses, cf_num_accesses)); + fprintf(stdout, "Caller %s: Number of accesses per level break down\n", + caller_to_string(caller.first).c_str()); + for (auto naccess_level : + cf_caller_level_num_accesses_map[caller.first]) { + fprintf(stdout, + "\t Level %" PRIu64 ": Number of accesses: %" PRIu64 + " Percent: %.2f\n", + naccess_level.first, naccess_level.second, + percent(naccess_level.second, naccesses)); + } + fprintf(stdout, "Caller %s: Number of accesses per file break down\n", + caller_to_string(caller.first).c_str()); + for (auto naccess_file : cf_caller_file_num_accesses_map[caller.first]) { + fprintf(stdout, + "\t File %" PRIu64 ": Number of accesses: %" PRIu64 + " Percent: %.2f\n", + naccess_file.first, naccess_file.second, + percent(naccess_file.second, naccesses)); + } + fprintf(stdout, + "Caller %s: Number of accesses per block type break down\n", + caller_to_string(caller.first).c_str()); + for (auto naccess_type : cf_caller_bt_num_accesses_map[caller.first]) { + fprintf(stdout, + "\t Block Type %s: Number of accesses: %" PRIu64 + " Percent: %.2f\n", + block_type_to_string(naccess_type.first).c_str(), + naccess_type.second, percent(naccess_type.second, naccesses)); + } + } + } + print_break_lines(/*num_break_lines=*/3); + fprintf(stdout, "Overall statistics:\n"); + fprintf(stdout, + "Number of files: %" PRIu64 " Number of blocks: %" PRIu64 + " Number of accesses: %" PRIu64 "\n", + total_num_files, total_num_blocks, total_num_accesses); + for (auto block_type : bt_num_blocks_map) { + fprintf(stdout, "Number of %s blocks: %" PRIu64 " Percent: %.2f\n", + block_type_to_string(block_type.first).c_str(), block_type.second, + percent(block_type.second, total_num_blocks)); + } + for (auto caller : caller_num_access_map) { + print_break_lines(/*num_break_lines=*/1); + uint64_t naccesses = caller.second; + fprintf(stdout, "Caller %s: Number of accesses %" PRIu64 " Percent: %.2f\n", + caller_to_string(caller.first).c_str(), naccesses, + percent(naccesses, total_num_accesses)); + fprintf(stdout, "Caller %s: Number of accesses per level break down\n", + caller_to_string(caller.first).c_str()); + for (auto naccess_level : caller_level_num_access_map[caller.first]) { + fprintf(stdout, + "\t Level %d: Number of accesses: %" PRIu64 " Percent: %.2f\n", + naccess_level.first, naccess_level.second, + percent(naccess_level.second, naccesses)); + } + fprintf(stdout, "Caller %s: Number of accesses per block type break down\n", + caller_to_string(caller.first).c_str()); + for (auto naccess_type : caller_bt_num_access_map[caller.first]) { + fprintf(stdout, + "\t Block Type %s: Number of accesses: %" PRIu64 + " Percent: %.2f\n", + block_type_to_string(naccess_type.first).c_str(), + naccess_type.second, 
percent(naccess_type.second, naccesses)); + } + } +} + +std::vector<CacheConfiguration> parse_cache_config_file( + const std::string& config_path) { + std::ifstream file(config_path); + if (!file.is_open()) { + return {}; + } + std::vector<CacheConfiguration> configs; + std::string line; + while (getline(file, line)) { + CacheConfiguration cache_config; + std::stringstream ss(line); + std::vector<std::string> config_strs; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + config_strs.push_back(substr); + } + // Sanity checks. + if (config_strs.size() < 4) { + fprintf(stderr, "Invalid cache simulator configuration %s\n", + line.c_str()); + exit(1); + } + if (kSupportedCacheNames.find(" " + config_strs[0] + " ") == + std::string::npos) { + fprintf(stderr, "Invalid cache name %s. Supported cache names are %s\n", + line.c_str(), kSupportedCacheNames.c_str()); + exit(1); + } + cache_config.cache_name = config_strs[0]; + cache_config.num_shard_bits = ParseUint32(config_strs[1]); + cache_config.ghost_cache_capacity = ParseUint64(config_strs[2]); + for (uint32_t i = 3; i < config_strs.size(); i++) { + uint64_t capacity = ParseUint64(config_strs[i]); + if (capacity == 0) { + fprintf(stderr, "Invalid cache capacity %s, %s\n", + config_strs[i].c_str(), line.c_str()); + exit(1); + } + cache_config.cache_capacities.push_back(capacity); + } + configs.push_back(cache_config); + } + file.close(); + return configs; +} + +std::vector<uint64_t> parse_buckets(const std::string& bucket_str) { + std::vector<uint64_t> buckets; + std::stringstream ss(bucket_str); + while (ss.good()) { + std::string bucket; + getline(ss, bucket, ','); + buckets.push_back(ParseUint64(bucket)); + } + buckets.push_back(port::kMaxUint64); + return buckets; +} + +int block_cache_trace_analyzer_tool(int argc, char** argv) { + ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_block_cache_trace_path.empty()) { + fprintf(stderr, "block cache trace path is empty\n"); + exit(1); + } + uint64_t warmup_seconds = + FLAGS_cache_sim_warmup_seconds > 0 ? FLAGS_cache_sim_warmup_seconds : 0; + uint32_t downsample_ratio = FLAGS_block_cache_trace_downsample_ratio > 0 + ? FLAGS_block_cache_trace_downsample_ratio + : 0; + std::vector<CacheConfiguration> cache_configs = + parse_cache_config_file(FLAGS_block_cache_sim_config_path); + std::unique_ptr<BlockCacheTraceSimulator> cache_simulator; + if (!cache_configs.empty()) { + cache_simulator.reset(new BlockCacheTraceSimulator( + warmup_seconds, downsample_ratio, cache_configs)); + Status s = cache_simulator->InitializeCaches(); + if (!s.ok()) { + fprintf(stderr, "Cannot initialize cache simulators %s\n", + s.ToString().c_str()); + exit(1); + } + } + BlockCacheTraceAnalyzer analyzer( + FLAGS_block_cache_trace_path, FLAGS_block_cache_analysis_result_dir, + FLAGS_human_readable_trace_file_path, + !FLAGS_reuse_distance_labels.empty(), FLAGS_mrc_only, + FLAGS_is_block_cache_human_readable_trace, std::move(cache_simulator)); + Status s = analyzer.Analyze(); + if (!s.IsIncomplete() && !s.ok()) { + // Read all traces. 
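+    // Status::Incomplete means the reader simply reached the end of the
+    // trace file, so it is not treated as a failure; any other non-OK
+    // status aborts the tool.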
+ fprintf(stderr, "Cannot process the trace %s\n", s.ToString().c_str()); + exit(1); + } + fprintf(stdout, "Status: %s\n", s.ToString().c_str()); + analyzer.WriteMissRatioCurves(); + analyzer.WriteMissRatioTimeline(1); + analyzer.WriteMissRatioTimeline(kSecondInMinute); + analyzer.WriteMissRatioTimeline(kSecondInHour); + analyzer.WriteMissTimeline(1); + analyzer.WriteMissTimeline(kSecondInMinute); + analyzer.WriteMissTimeline(kSecondInHour); + + if (FLAGS_mrc_only) { + fprintf(stdout, + "Skipping the analysis statistics since the user wants to compute " + "MRC only"); + return 0; + } + + analyzer.PrintStatsSummary(); + if (FLAGS_print_access_count_stats) { + print_break_lines(/*num_break_lines=*/3); + analyzer.PrintAccessCountStats( + /*user_access_only=*/false, FLAGS_analyze_bottom_k_access_count_blocks, + FLAGS_analyze_top_k_access_count_blocks); + print_break_lines(/*num_break_lines=*/3); + analyzer.PrintAccessCountStats( + /*user_access_only=*/true, FLAGS_analyze_bottom_k_access_count_blocks, + FLAGS_analyze_top_k_access_count_blocks); + } + if (FLAGS_print_block_size_stats) { + print_break_lines(/*num_break_lines=*/3); + analyzer.PrintBlockSizeStats(); + } + if (FLAGS_print_data_block_access_count_stats) { + print_break_lines(/*num_break_lines=*/3); + analyzer.PrintDataBlockAccessStats(); + } + print_break_lines(/*num_break_lines=*/3); + + if (!FLAGS_timeline_labels.empty()) { + std::stringstream ss(FLAGS_timeline_labels); + while (ss.good()) { + std::string label; + getline(ss, label, ','); + if (label.find("block") != std::string::npos) { + analyzer.WriteAccessTimeline(label, kSecondInMinute, true); + analyzer.WriteAccessTimeline(label, kSecondInMinute, false); + analyzer.WriteAccessTimeline(label, kSecondInHour, true); + analyzer.WriteAccessTimeline(label, kSecondInHour, false); + } else { + analyzer.WriteAccessTimeline(label, kSecondInMinute, false); + analyzer.WriteAccessTimeline(label, kSecondInHour, false); + } + } + } + + if (!FLAGS_analyze_callers.empty()) { + analyzer.WritePercentAccessSummaryStats(); + std::stringstream ss(FLAGS_analyze_callers); + while (ss.good()) { + std::string caller; + getline(ss, caller, ','); + analyzer.WriteDetailedPercentAccessSummaryStats(string_to_caller(caller)); + } + } + + if (!FLAGS_access_count_buckets.empty()) { + std::vector<uint64_t> buckets = parse_buckets(FLAGS_access_count_buckets); + analyzer.WriteAccessCountSummaryStats(buckets, /*user_access_only=*/true); + analyzer.WriteAccessCountSummaryStats(buckets, /*user_access_only=*/false); + } + + if (!FLAGS_reuse_distance_labels.empty() && + !FLAGS_reuse_distance_buckets.empty()) { + std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_distance_buckets); + std::stringstream ss(FLAGS_reuse_distance_labels); + while (ss.good()) { + std::string label; + getline(ss, label, ','); + analyzer.WriteReuseDistance(label, buckets); + } + } + + if (!FLAGS_reuse_interval_labels.empty() && + !FLAGS_reuse_interval_buckets.empty()) { + std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_interval_buckets); + std::stringstream ss(FLAGS_reuse_interval_labels); + while (ss.good()) { + std::string label; + getline(ss, label, ','); + analyzer.WriteReuseInterval(label, buckets); + } + } + + if (!FLAGS_reuse_lifetime_labels.empty() && + !FLAGS_reuse_lifetime_buckets.empty()) { + std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_lifetime_buckets); + std::stringstream ss(FLAGS_reuse_lifetime_labels); + while (ss.good()) { + std::string label; + getline(ss, label, ','); + 
analyzer.WriteReuseLifetime(label, buckets);
+    }
+  }
+
+  if (FLAGS_analyze_blocks_reuse_k_reuse_window != 0) {
+    std::vector<TraceType> block_types{TraceType::kBlockTraceIndexBlock,
+                                       TraceType::kBlockTraceDataBlock,
+                                       TraceType::kBlockTraceFilterBlock};
+    for (auto block_type : block_types) {
+      analyzer.WriteBlockReuseTimeline(
+          FLAGS_analyze_blocks_reuse_k_reuse_window,
+          /*user_access_only=*/true, block_type);
+      analyzer.WriteBlockReuseTimeline(
+          FLAGS_analyze_blocks_reuse_k_reuse_window,
+          /*user_access_only=*/false, block_type);
+    }
+  }
+
+  if (!FLAGS_analyze_get_spatial_locality_labels.empty() &&
+      !FLAGS_analyze_get_spatial_locality_buckets.empty()) {
+    std::vector<uint64_t> buckets =
+        parse_buckets(FLAGS_analyze_get_spatial_locality_buckets);
+    std::stringstream ss(FLAGS_analyze_get_spatial_locality_labels);
+    while (ss.good()) {
+      std::string label;
+      getline(ss, label, ',');
+      analyzer.WriteGetSpatialLocality(label, buckets);
+    }
+  }
+
+  if (!FLAGS_analyze_correlation_coefficients_labels.empty()) {
+    std::stringstream ss(FLAGS_analyze_correlation_coefficients_labels);
+    while (ss.good()) {
+      std::string label;
+      getline(ss, label, ',');
+      analyzer.WriteCorrelationFeatures(
+          label, FLAGS_analyze_correlation_coefficients_max_number_of_values);
+    }
+    analyzer.WriteCorrelationFeaturesForGet(
+        FLAGS_analyze_correlation_coefficients_max_number_of_values);
+  }
+
+  if (!FLAGS_skew_labels.empty() && !FLAGS_skew_buckets.empty()) {
+    std::vector<uint64_t> buckets = parse_buckets(FLAGS_skew_buckets);
+    std::stringstream ss(FLAGS_skew_labels);
+    while (ss.good()) {
+      std::string label;
+      getline(ss, label, ',');
+      if (label.find("block") != std::string::npos) {
+        analyzer.WriteSkewness(label, buckets,
+                               TraceType::kBlockTraceIndexBlock);
+        analyzer.WriteSkewness(label, buckets,
+                               TraceType::kBlockTraceFilterBlock);
+        analyzer.WriteSkewness(label, buckets, TraceType::kBlockTraceDataBlock);
+        analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax);
+      } else {
+        analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax);
+      }
+    }
+  }
+  return 0;
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+#endif  // GFLAGS
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h
new file mode 100644
index 000000000..48a544813
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h
@@ -0,0 +1,393 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "rocksdb/env.h"
+#include "rocksdb/utilities/sim_cache.h"
+#include "trace_replay/block_cache_tracer.h"
+#include "utilities/simulator_cache/cache_simulator.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Statistics of a key referenced by a Get.
+struct GetKeyInfo {
+  uint64_t key_id = 0;
+  std::vector<uint64_t> access_sequence_number_timeline;
+  std::vector<uint64_t> access_timeline;
+
+  void AddAccess(const BlockCacheTraceRecord& access,
+                 uint64_t access_sequence_number) {
+    access_sequence_number_timeline.push_back(access_sequence_number);
+    access_timeline.push_back(access.access_timestamp);
+  }
+};
+
+// Statistics of a block.
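+//
+// A BlockAccessInfo accumulates per-block counters and timelines as trace
+// records stream through AddAccess(). A rough usage sketch (with a
+// hypothetical BlockCacheTraceRecord `record`), mirroring how RecordAccess()
+// in the .cc file drives it:
+//
+//   BlockAccessInfo info;
+//   info.AddAccess(record, /*access_sequence_number=*/1);
+//   // info.num_accesses == 1 and info.first_access_time holds the record's
+//   // timestamp; the key-level maps are only filled for Get/MultiGet
+//   // accesses on data blocks.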
+struct BlockAccessInfo {
+  uint64_t block_id = 0;
+  uint64_t table_id = 0;
+  uint64_t block_offset = 0;
+  uint64_t num_accesses = 0;
+  uint64_t block_size = 0;
+  uint64_t first_access_time = 0;
+  uint64_t last_access_time = 0;
+  uint64_t num_keys = 0;
+  std::map<std::string, std::map<TableReaderCaller, uint64_t>>
+      key_num_access_map;  // for keys that exist in this block.
+  std::map<std::string, std::map<TableReaderCaller, uint64_t>>
+      non_exist_key_num_access_map;  // for keys that do not exist in this block.
+  uint64_t num_referenced_key_exist_in_block = 0;
+  uint64_t referenced_data_size = 0;
+  std::map<TableReaderCaller, uint64_t> caller_num_access_map;
+  // caller:timestamp:number_of_accesses. The granularity of the timestamp is
+  // seconds.
+  std::map<TableReaderCaller, std::map<uint64_t, uint64_t>>
+      caller_num_accesses_timeline;
+  // Unique blocks since the last access.
+  std::set<std::string> unique_blocks_since_last_access;
+  // Number of reuses grouped by reuse distance.
+  std::map<uint64_t, uint64_t> reuse_distance_count;
+
+  // The access sequence numbers of this block.
+  std::vector<uint64_t> access_sequence_number_timeline;
+  std::map<TableReaderCaller, std::vector<uint64_t>>
+      caller_access_sequence__number_timeline;
+  // The access timestamps in microseconds of this block.
+  std::vector<uint64_t> access_timeline;
+  std::map<TableReaderCaller, std::vector<uint64_t>> caller_access_timeline;
+
+  void AddAccess(const BlockCacheTraceRecord& access,
+                 uint64_t access_sequence_number) {
+    if (block_size != 0 && access.block_size != 0) {
+      assert(block_size == access.block_size);
+    }
+    if (num_keys != 0 && access.num_keys_in_block != 0) {
+      assert(num_keys == access.num_keys_in_block);
+    }
+    if (first_access_time == 0) {
+      first_access_time = access.access_timestamp;
+    }
+    table_id = BlockCacheTraceHelper::GetTableId(access);
+    block_offset = BlockCacheTraceHelper::GetBlockOffsetInFile(access);
+    last_access_time = access.access_timestamp;
+    block_size = access.block_size;
+    caller_num_access_map[access.caller]++;
+    num_accesses++;
+    // access.access_timestamp is in microseconds.
+    const uint64_t timestamp_in_seconds =
+        access.access_timestamp / kMicrosInSecond;
+    caller_num_accesses_timeline[access.caller][timestamp_in_seconds] += 1;
+    // Populate the feature vectors.
+    access_sequence_number_timeline.push_back(access_sequence_number);
+    caller_access_sequence__number_timeline[access.caller].push_back(
+        access_sequence_number);
+    access_timeline.push_back(access.access_timestamp);
+    caller_access_timeline[access.caller].push_back(access.access_timestamp);
+    if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(access.block_type,
+                                                          access.caller)) {
+      num_keys = access.num_keys_in_block;
+      if (access.referenced_key_exist_in_block == Boolean::kTrue) {
+        if (key_num_access_map.find(access.referenced_key) ==
+            key_num_access_map.end()) {
+          referenced_data_size += access.referenced_data_size;
+        }
+        key_num_access_map[access.referenced_key][access.caller]++;
+        num_referenced_key_exist_in_block++;
+        if (referenced_data_size > block_size && block_size != 0) {
+          ParsedInternalKey internal_key;
+          ParseInternalKey(access.referenced_key, &internal_key);
+        }
+      } else {
+        non_exist_key_num_access_map[access.referenced_key][access.caller]++;
+      }
+    }
+  }
+};
+
+// Aggregates stats of a block given a block type.
+struct BlockTypeAccessInfoAggregate {
+  std::map<std::string, BlockAccessInfo> block_access_info_map;
+};
+
+// Aggregates BlockTypeAccessInfoAggregate given an SST file.
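+//
+// Together, the aggregate structs nest into the hierarchy that
+// TraverseBlocks() in the .cc file walks:
+//
+//   cf_name   -> ColumnFamilyAccessInfoAggregate
+//     fd      -> SSTFileAccessInfoAggregate
+//       type  -> BlockTypeAccessInfoAggregate
+//         key -> BlockAccessInfo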
+struct SSTFileAccessInfoAggregate { + uint32_t level; + std::map<TraceType, BlockTypeAccessInfoAggregate> block_type_aggregates_map; +}; + +// Aggregates SSTFileAccessInfoAggregate given a column family. +struct ColumnFamilyAccessInfoAggregate { + std::map<uint64_t, SSTFileAccessInfoAggregate> fd_aggregates_map; +}; + +struct Features { + std::vector<uint64_t> elapsed_time_since_last_access; + std::vector<uint64_t> num_accesses_since_last_access; + std::vector<uint64_t> num_past_accesses; +}; + +struct Predictions { + std::vector<uint64_t> elapsed_time_till_next_access; + std::vector<uint64_t> num_accesses_till_next_access; +}; + +class BlockCacheTraceAnalyzer { + public: + BlockCacheTraceAnalyzer( + const std::string& trace_file_path, const std::string& output_dir, + const std::string& human_readable_trace_file_path, + bool compute_reuse_distance, bool mrc_only, + bool is_human_readable_trace_file, + std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator); + ~BlockCacheTraceAnalyzer() = default; + // No copy and move. + BlockCacheTraceAnalyzer(const BlockCacheTraceAnalyzer&) = delete; + BlockCacheTraceAnalyzer& operator=(const BlockCacheTraceAnalyzer&) = delete; + BlockCacheTraceAnalyzer(BlockCacheTraceAnalyzer&&) = delete; + BlockCacheTraceAnalyzer& operator=(BlockCacheTraceAnalyzer&&) = delete; + + // Reads all access records in the given trace_file, maintains the stats of + // each block, and aggregates the information by block type, SST file, and + // column family. Subsequently, the caller may call Print* functions to print + // statistics. + Status Analyze(); + + // Print a summary of statistics of the trace, e.g., + // Number of files: 2 Number of blocks: 50 Number of accesses: 50 + // Number of Index blocks: 10 + // Number of Filter blocks: 10 + // Number of Data blocks: 10 + // Number of UncompressionDict blocks: 10 + // Number of RangeDeletion blocks: 10 + // *************************************************************** + // Caller Get: Number of accesses 10 + // Caller Get: Number of accesses per level break down + // Level 0: Number of accesses: 10 + // Caller Get: Number of accesses per block type break down + // Block Type Index: Number of accesses: 2 + // Block Type Filter: Number of accesses: 2 + // Block Type Data: Number of accesses: 2 + // Block Type UncompressionDict: Number of accesses: 2 + // Block Type RangeDeletion: Number of accesses: 2 + void PrintStatsSummary() const; + + // Print the block size distribution and the distribution broken down by + // block type and column family. + void PrintBlockSizeStats() const; + + // Print the access count distribution and the distribution broken down by + // block type and column family. + void PrintAccessCountStats(bool user_access_only, uint32_t bottom_k, + uint32_t top_k) const; + + // Print data block accesses by user Get and Multi-Get. + // It prints out 1) a histogram on the percentage of keys accessed in a data + // block, broken down by whether a referenced key exists in the data block, + // and the same histogram broken down by column family; 2) a histogram on the + // percentage of accesses on keys that exist in a data block and its break + // down by column family. + void PrintDataBlockAccessStats() const; + + // Write the percentage of accesses broken down by column family into a csv + // file saved in 'output_dir'. + // + // The file is named "percentage_of_accesses_summary". The file format is + // caller,cf_0,cf_1,...,cf_n where cf_i is the column family name found in + // the trace.
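+ // For example, with a single column family in which Get accounts for 20% of + // all accesses, the file would contain the row "Get,20.0".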
+ void WritePercentAccessSummaryStats() const; + + // Write the percentage of accesses for the given caller broken down by + // column family, level, and block type into a csv file saved in 'output_dir'. + // + // It generates two files: 1) caller_level_percentage_of_accesses_summary and + // 2) caller_bt_percentage_of_accesses_summary which break down the accesses + // by level and block type, respectively. The file format is + // level/bt,cf_0,cf_1,...,cf_n where cf_i is the column family name found in + // the trace. + void WriteDetailedPercentAccessSummaryStats(TableReaderCaller caller) const; + + // Write the access count summary into a csv file saved in 'output_dir'. + // It groups blocks by their access count. + // + // It generates two files: 1) cf_access_count_summary and 2) + // bt_access_count_summary which break down the access count by column family + // and block type, respectively. The file format is + // cf/bt,bucket_0,bucket_1,...,bucket_N. + void WriteAccessCountSummaryStats( + const std::vector<uint64_t>& access_count_buckets, + bool user_access_only) const; + + // Write miss ratio curves of simulated cache configurations into a csv file + // named "mrc" saved in 'output_dir'. + // + // The file format is + // "cache_name,num_shard_bits,capacity,miss_ratio,total_accesses". + void WriteMissRatioCurves() const; + + // Write miss ratio timeline of simulated cache configurations into several + // csv files, one per cache capacity saved in 'output_dir'. + // + // The file format is + // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second" + // where N is the number of unique cache names + // (cache_name+num_shard_bits+ghost_capacity). + void WriteMissRatioTimeline(uint64_t time_unit) const; + + // Write misses timeline of simulated cache configurations into several + // csv files, one per cache capacity saved in 'output_dir'. + // + // The file format is + // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second" + // where N is the number of unique cache names + // (cache_name+num_shard_bits+ghost_capacity). + void WriteMissTimeline(uint64_t time_unit) const; + + // Write the access timeline into a csv file saved in 'output_dir'. + // + // The file is named "label_access_timeline". The file format is + // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second" + // where N is the number of unique labels found in the trace. + void WriteAccessTimeline(const std::string& label, uint64_t time_unit, + bool user_access_only) const; + + // Write the reuse distance into a csv file saved in 'output_dir'. Reuse + // distance is defined as the cumulative size of unique blocks read between + // two consecutive accesses on the same block. + // + // The file is named "label_reuse_distance". The file format is + // bucket,label_1,label_2,...,label_N. + void WriteReuseDistance(const std::string& label_str, + const std::vector<uint64_t>& distance_buckets) const; + + // Write the reuse interval into a csv file saved in 'output_dir'. Reuse + // interval is defined as the time between two consecutive accesses on the + // same block. + // + // The file is named "label_reuse_interval". The file format is + // bucket,label_1,label_2,...,label_N. + void WriteReuseInterval(const std::string& label_str, + const std::vector<uint64_t>& time_buckets) const; + + // Write the reuse lifetime into a csv file saved in 'output_dir'.
Reuse + // lifetime is defined as the time interval between the first access of a + // block and its last access. + // + // The file is named "label_reuse_lifetime". The file format is + // bucket,label_1,label_2,...,label_N. + void WriteReuseLifetime(const std::string& label_str, + const std::vector<uint64_t>& time_buckets) const; + + // Write the reuse timeline into a csv file saved in 'output_dir'. + // + // The file is named + // "block_type_user_access_only_reuse_window_reuse_timeline". The file format + // is start_time,0,1,...,N where N equals trace_duration / reuse_window. + void WriteBlockReuseTimeline(const uint64_t reuse_window, bool user_access_only, + TraceType block_type) const; + + // Write the Get spatial locality into csv files saved in 'output_dir'. + // + // It generates three csv files: label_percent_ref_keys, + // label_percent_accesses_on_ref_keys, and + // label_percent_data_size_on_ref_keys. + void WriteGetSpatialLocality( + const std::string& label_str, + const std::vector<uint64_t>& percent_buckets) const; + + void WriteCorrelationFeatures(const std::string& label_str, + uint32_t max_number_of_values) const; + + void WriteCorrelationFeaturesForGet(uint32_t max_number_of_values) const; + + void WriteSkewness(const std::string& label_str, + const std::vector<uint64_t>& percent_buckets, + TraceType target_block_type) const; + + const std::map<std::string, ColumnFamilyAccessInfoAggregate>& + TEST_cf_aggregates_map() const { + return cf_aggregates_map_; + } + + private: + std::set<std::string> ParseLabelStr(const std::string& label_str) const; + + std::string BuildLabel(const std::set<std::string>& labels, + const std::string& cf_name, uint64_t fd, + uint32_t level, TraceType type, + TableReaderCaller caller, uint64_t block_key, + const BlockAccessInfo& block) const; + + void ComputeReuseDistance(BlockAccessInfo* info) const; + + Status RecordAccess(const BlockCacheTraceRecord& access); + + void UpdateReuseIntervalStats( + const std::string& label, const std::vector<uint64_t>& time_buckets, + const std::map<uint64_t, uint64_t> timeline, + std::map<std::string, std::map<uint64_t, uint64_t>>* + label_time_num_reuses, + uint64_t* total_num_reuses) const; + + std::string OutputPercentAccessStats( + uint64_t total_accesses, + const std::map<std::string, uint64_t>& cf_access_count) const; + + void WriteStatsToFile( + const std::string& label_str, const std::vector<uint64_t>& time_buckets, + const std::string& filename_suffix, + const std::map<std::string, std::map<uint64_t, uint64_t>>& label_data, + uint64_t ntotal) const; + + void TraverseBlocks( + std::function<void(const std::string& /*cf_name*/, uint64_t /*fd*/, + uint32_t /*level*/, TraceType /*block_type*/, + const std::string& /*block_key*/, + uint64_t /*block_key_id*/, + const BlockAccessInfo& /*block_access_info*/)> + block_callback, + std::set<std::string>* labels = nullptr) const; + + void UpdateFeatureVectors( + const std::vector<uint64_t>& access_sequence_number_timeline, + const std::vector<uint64_t>& access_timeline, const std::string& label, + std::map<std::string, Features>* label_features, + std::map<std::string, Predictions>* label_predictions) const; + + void WriteCorrelationFeaturesToFile( + const std::string& label, + const std::map<std::string, Features>& label_features, + const std::map<std::string, Predictions>& label_predictions, + uint32_t max_number_of_values) const; + + ROCKSDB_NAMESPACE::Env* env_; + const std::string trace_file_path_; + const std::string output_dir_; + std::string
human_readable_trace_file_path_; + const bool compute_reuse_distance_; + const bool mrc_only_; + const bool is_human_readable_trace_file_; + + BlockCacheTraceHeader header_; + std::unique_ptr<BlockCacheTraceSimulator> cache_simulator_; + std::map<std::string, ColumnFamilyAccessInfoAggregate> cf_aggregates_map_; + std::map<std::string, BlockAccessInfo*> block_info_map_; + std::unordered_map<std::string, GetKeyInfo> get_key_info_map_; + uint64_t access_sequence_number_ = 0; + uint64_t trace_start_timestamp_in_seconds_ = 0; + uint64_t trace_end_timestamp_in_seconds_ = 0; + MissRatioStats miss_ratio_stats_; + uint64_t unique_block_id_ = 1; + uint64_t unique_get_key_id_ = 1; + BlockCacheHumanReadableTraceWriter human_readable_trace_writer_; +}; + +int block_cache_trace_analyzer_tool(int argc, char** argv); + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py new file mode 100644 index 000000000..0fdaa4158 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py @@ -0,0 +1,721 @@ +#!/usr/bin/env python3 +import csv +import math +import os +import random +import sys + +import matplotlib +matplotlib.use("Agg") +import matplotlib.backends.backend_pdf +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import seaborn as sns + + +# Make sure a legend has the same color across all generated graphs. +def get_cmap(n, name="hsv"): + """Returns a function that maps each index in 0, 1, ..., n-1 to a distinct + RGB color; the keyword argument name must be a standard mpl colormap name.""" + return plt.cm.get_cmap(name, n) + + +color_index = 0 +bar_color_maps = {} +colors = [] +n_colors = 360 +linear_colors = get_cmap(n_colors) +for i in range(n_colors): + colors.append(linear_colors(i)) +# Shuffle the colors so that adjacent bars in a graph are obvious to differentiate. +random.shuffle(colors) + + +def num_to_gb(n): + one_gb = 1024 * 1024 * 1024 + if float(n) % one_gb == 0: + return "{}".format(n / one_gb) + # Keep two decimal points. 
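+ # e.g., num_to_gb(1024 ** 3) returns "1.0" via the branch above, while + # num_to_gb(1.5 * 1024 ** 3) falls through and returns "1.50" (illustrative + # values only).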
+ return "{0:.2f}".format(float(n) / one_gb) + + +def plot_miss_stats_graphs( + csv_result_dir, output_result_dir, file_prefix, file_suffix, ylabel, pdf_file_name +): + miss_ratios = {} + for file in os.listdir(csv_result_dir): + if not file.startswith(file_prefix): + continue + if not file.endswith(file_suffix): + continue + print("Processing file {}/{}".format(csv_result_dir, file)) + mrc_file_path = csv_result_dir + "/" + file + with open(mrc_file_path, "r") as csvfile: + rows = csv.reader(csvfile, delimiter=",") + for row in rows: + cache_name = row[0] + num_shard_bits = int(row[1]) + ghost_capacity = int(row[2]) + capacity = int(row[3]) + miss_ratio = float(row[4]) + config = "{}-{}-{}".format(cache_name, num_shard_bits, ghost_capacity) + if config not in miss_ratios: + miss_ratios[config] = {} + miss_ratios[config]["x"] = [] + miss_ratios[config]["y"] = [] + miss_ratios[config]["x"].append(capacity) + miss_ratios[config]["y"].append(miss_ratio) + fig = plt.figure() + for config in miss_ratios: + plt.plot( + miss_ratios[config]["x"], miss_ratios[config]["y"], label=config + ) + plt.xlabel("Cache capacity") + plt.ylabel(ylabel) + plt.xscale("log", basex=2) + plt.ylim(ymin=0) + plt.title("{}".format(file)) + plt.legend() + fig.savefig( + output_result_dir + "/{}.pdf".format(pdf_file_name), bbox_inches="tight" + ) + + +def plot_miss_stats_diff_lru_graphs( + csv_result_dir, output_result_dir, file_prefix, file_suffix, ylabel, pdf_file_name +): + miss_ratios = {} + for file in os.listdir(csv_result_dir): + if not file.startswith(file_prefix): + continue + if not file.endswith(file_suffix): + continue + print("Processing file {}/{}".format(csv_result_dir, file)) + mrc_file_path = csv_result_dir + "/" + file + with open(mrc_file_path, "r") as csvfile: + rows = csv.reader(csvfile, delimiter=",") + for row in rows: + cache_name = row[0] + num_shard_bits = int(row[1]) + ghost_capacity = int(row[2]) + capacity = int(row[3]) + miss_ratio = float(row[4]) + config = "{}-{}-{}".format(cache_name, num_shard_bits, ghost_capacity) + if config not in miss_ratios: + miss_ratios[config] = {} + miss_ratios[config]["x"] = [] + miss_ratios[config]["y"] = [] + miss_ratios[config]["x"].append(capacity) + miss_ratios[config]["y"].append(miss_ratio) + if "lru-0-0" not in miss_ratios: + return + fig = plt.figure() + for config in miss_ratios: + diffs = [0] * len(miss_ratios["lru-0-0"]["x"]) + for i in range(len(miss_ratios["lru-0-0"]["x"])): + for j in range(len(miss_ratios[config]["x"])): + if miss_ratios["lru-0-0"]["x"][i] == miss_ratios[config]["x"][j]: + diffs[i] = ( + miss_ratios[config]["y"][j] - miss_ratios["lru-0-0"]["y"][i] + ) + break + plt.plot(miss_ratios["lru-0-0"]["x"], diffs, label=config) + plt.xlabel("Cache capacity") + plt.ylabel(ylabel) + plt.xscale("log", basex=2) + plt.title("{}".format(file)) + plt.legend() + fig.savefig( + output_result_dir + "/{}.pdf".format(pdf_file_name), bbox_inches="tight" + ) + + +def sanitize(label): + # matplotlib cannot plot legends that are prefixed with "_", + # so we need to remove them here. + index = 0 + for i in range(len(label)): + if label[i] == "_": + index += 1 + else: + break + data = label[index:] + # The value of uint64_t max in C++. + if "18446744073709551615" in data: + return "max" + return data + + +# Read the csv file vertically, i.e., group the data by columns.
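+# For example, a hypothetical two-column csv with the rows +# bucket,cf1,cf2 +# 10,30.0,70.0 +# read vertically yields x=["10"], labels=["cf1", "cf2"], and +# label_stats={0: [30.0], 1: [70.0]}.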
+def read_data_for_plot_vertical(csvfile): + x = [] + labels = [] + label_stats = {} + csv_rows = csv.reader(csvfile, delimiter=",") + data_rows = [] + for row in csv_rows: + data_rows.append(row) + # header + for i in range(1, len(data_rows[0])): + labels.append(sanitize(data_rows[0][i])) + label_stats[i - 1] = [] + for i in range(1, len(data_rows)): + for j in range(len(data_rows[i])): + if j == 0: + x.append(sanitize(data_rows[i][j])) + continue + label_stats[j - 1].append(float(data_rows[i][j])) + return x, labels, label_stats + + +# Read the csv file horizontally, i.e., group the data by rows. +def read_data_for_plot_horizontal(csvfile): + x = [] + labels = [] + label_stats = {} + csv_rows = csv.reader(csvfile, delimiter=",") + data_rows = [] + for row in csv_rows: + data_rows.append(row) + # header + for i in range(1, len(data_rows)): + labels.append(sanitize(data_rows[i][0])) + label_stats[i - 1] = [] + for i in range(1, len(data_rows[0])): + x.append(sanitize(data_rows[0][i])) + for i in range(1, len(data_rows)): + for j in range(len(data_rows[i])): + if j == 0: + # label + continue + label_stats[i - 1].append(float(data_rows[i][j])) + return x, labels, label_stats + + +def read_data_for_plot(csvfile, vertical): + if vertical: + return read_data_for_plot_vertical(csvfile) + return read_data_for_plot_horizontal(csvfile) + + +def plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix, + filename_suffix, + pdf_name, + xlabel, + ylabel, + title, + vertical, + legend, +): + global color_index, bar_color_maps, colors + pdf = matplotlib.backends.backend_pdf.PdfPages(output_result_dir + "/" + pdf_name) + for file in os.listdir(csv_result_dir): + if not file.endswith(filename_suffix): + continue + if not file.startswith(filename_prefix): + continue + print("Processing file {}/{}".format(csv_result_dir, file)) + with open(csv_result_dir + "/" + file, "r") as csvfile: + x, labels, label_stats = read_data_for_plot(csvfile, vertical) + if len(x) == 0 or len(labels) == 0: + continue + # plot figure + fig = plt.figure() + for label_index in label_stats: + # Assign a unique color to this label. + if labels[label_index] not in bar_color_maps: + bar_color_maps[labels[label_index]] = colors[color_index] + color_index += 1 + plt.plot( + [int(x[i]) for i in range(len(x) - 1)], + label_stats[label_index][:-1], + label=labels[label_index], + color=bar_color_maps[labels[label_index]], + ) + + # Translate time unit into x labels. 
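+ # Files aggregated at 60-second granularity carry "_60" in their names and + # are labeled in minutes; "_3600" files are labeled in hours. Files with + # neither suffix keep an unlabeled x axis.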
+ if "_60" in file: + plt.xlabel("{} (Minute)".format(xlabel)) + if "_3600" in file: + plt.xlabel("{} (Hour)".format(xlabel)) + plt.ylabel(ylabel) + plt.title("{} {}".format(title, file)) + if legend: + plt.legend() + pdf.savefig(fig) + pdf.close() + + +def plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix, + pdf_name, + xlabel, + ylabel, + title, + vertical, + x_prefix, +): + global color_index, bar_color_maps, colors + pdf = matplotlib.backends.backend_pdf.PdfPages( + "{}/{}".format(output_result_dir, pdf_name) + ) + for file in os.listdir(csv_result_dir): + if not file.endswith(filename_suffix): + continue + with open(csv_result_dir + "/" + file, "r") as csvfile: + print("Processing file {}/{}".format(csv_result_dir, file)) + x, labels, label_stats = read_data_for_plot(csvfile, vertical) + if len(x) == 0 or len(label_stats) == 0: + continue + # Plot figure + fig = plt.figure() + ind = np.arange(len(x)) # the x locations for the groups + width = 0.5 # the width of the bars: can also be len(x) sequence + bars = [] + bottom_bars = [] + for _i in label_stats[0]: + bottom_bars.append(0) + for i in range(0, len(label_stats)): + # Assign a unique color to this label. + if labels[i] not in bar_color_maps: + bar_color_maps[labels[i]] = colors[color_index] + color_index += 1 + p = plt.bar( + ind, + label_stats[i], + width, + bottom=bottom_bars, + color=bar_color_maps[labels[i]], + ) + bars.append(p[0]) + for j in range(len(label_stats[i])): + bottom_bars[j] += label_stats[i][j] + plt.xlabel(xlabel) + plt.ylabel(ylabel) + plt.xticks( + ind, [x_prefix + x[i] for i in range(len(x))], rotation=20, fontsize=8 + ) + plt.legend(bars, labels) + plt.title("{} filename:{}".format(title, file)) + pdf.savefig(fig) + pdf.close() + + +def plot_heatmap(csv_result_dir, output_result_dir, filename_suffix, pdf_name, title): + pdf = matplotlib.backends.backend_pdf.PdfPages( + "{}/{}".format(output_result_dir, pdf_name) + ) + for file in os.listdir(csv_result_dir): + if not file.endswith(filename_suffix): + continue + csv_file_name = "{}/{}".format(csv_result_dir, file) + print("Processing file {}/{}".format(csv_result_dir, file)) + corr_table = pd.read_csv(csv_file_name) + corr_table = corr_table.pivot("label", "corr", "value") + fig = plt.figure() + sns.heatmap(corr_table, annot=True, linewidths=0.5, fmt=".2") + plt.title("{} filename:{}".format(title, file)) + pdf.savefig(fig) + pdf.close() + + +def plot_timeline(csv_result_dir, output_result_dir): + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="access_timeline", + pdf_name="access_time.pdf", + xlabel="Time", + ylabel="Throughput", + title="Access timeline with group by label", + vertical=False, + legend=True, + ) + + +def convert_to_0_if_nan(n): + if math.isnan(n): + return 0.0 + return n + + +def plot_correlation(csv_result_dir, output_result_dir): + # Processing the correlation input first. 
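+ # Input files follow the pattern "<label_str>_<label>_correlation_input"; + # e.g., a hypothetical "cf_default_correlation_input" yields label_str "cf" + # and label "default". All labels sharing a label_str are merged into a + # single "<label_str>_correlation_output" file that feeds the heatmap below.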
+ label_str_file = {} + for file in os.listdir(csv_result_dir): + if not file.endswith("correlation_input"): + continue + csv_file_name = "{}/{}".format(csv_result_dir, file) + print("Processing file {}/{}".format(csv_result_dir, file)) + corr_table = pd.read_csv(csv_file_name) + label_str = file.split("_")[0] + label = file[len(label_str) + 1 :] + label = label[: len(label) - len("_correlation_input")] + + output_file = "{}/{}_correlation_output".format(csv_result_dir, label_str) + if output_file not in label_str_file: + f = open("{}/{}_correlation_output".format(csv_result_dir, label_str), "w+") + label_str_file[output_file] = f + f.write("label,corr,value\n") + f = label_str_file[output_file] + f.write( + "{},{},{}\n".format( + label, + "LA+A", + convert_to_0_if_nan( + corr_table["num_accesses_since_last_access"].corr( + corr_table["num_accesses_till_next_access"], method="spearman" + ) + ), + ) + ) + f.write( + "{},{},{}\n".format( + label, + "PA+A", + convert_to_0_if_nan( + corr_table["num_past_accesses"].corr( + corr_table["num_accesses_till_next_access"], method="spearman" + ) + ), + ) + ) + f.write( + "{},{},{}\n".format( + label, + "LT+A", + convert_to_0_if_nan( + corr_table["elapsed_time_since_last_access"].corr( + corr_table["num_accesses_till_next_access"], method="spearman" + ) + ), + ) + ) + f.write( + "{},{},{}\n".format( + label, + "LA+T", + convert_to_0_if_nan( + corr_table["num_accesses_since_last_access"].corr( + corr_table["elapsed_time_till_next_access"], method="spearman" + ) + ), + ) + ) + f.write( + "{},{},{}\n".format( + label, + "LT+T", + convert_to_0_if_nan( + corr_table["elapsed_time_since_last_access"].corr( + corr_table["elapsed_time_till_next_access"], method="spearman" + ) + ), + ) + ) + f.write( + "{},{},{}\n".format( + label, + "PA+T", + convert_to_0_if_nan( + corr_table["num_past_accesses"].corr( + corr_table["elapsed_time_till_next_access"], method="spearman" + ) + ), + ) + ) + for label_str in label_str_file: + label_str_file[label_str].close() + + plot_heatmap( + csv_result_dir, + output_result_dir, + "correlation_output", + "correlation.pdf", + "Correlation", + ) + + +def plot_reuse_graphs(csv_result_dir, output_result_dir): + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="avg_reuse_interval_naccesses", + pdf_name="avg_reuse_interval_naccesses.pdf", + xlabel="", + ylabel="Percentage of accesses", + title="Average reuse interval", + vertical=True, + x_prefix="< ", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="avg_reuse_interval", + pdf_name="avg_reuse_interval.pdf", + xlabel="", + ylabel="Percentage of blocks", + title="Average reuse interval", + vertical=True, + x_prefix="< ", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="access_reuse_interval", + pdf_name="reuse_interval.pdf", + xlabel="Seconds", + ylabel="Percentage of accesses", + title="Reuse interval", + vertical=True, + x_prefix="< ", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="reuse_lifetime", + pdf_name="reuse_lifetime.pdf", + xlabel="Seconds", + ylabel="Percentage of blocks", + title="Reuse lifetime", + vertical=True, + x_prefix="< ", + ) + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="reuse_blocks_timeline", + pdf_name="reuse_blocks_timeline.pdf", + xlabel="", + ylabel="Percentage of blocks", + title="Reuse blocks timeline", + vertical=False, + legend=False, + ) + + +def 
plot_percentage_access_summary(csv_result_dir, output_result_dir): + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="percentage_of_accesses_summary", + pdf_name="percentage_access.pdf", + xlabel="", + ylabel="Percentage of accesses", + title="", + vertical=True, + x_prefix="", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="percent_ref_keys", + pdf_name="percent_ref_keys.pdf", + xlabel="", + ylabel="Percentage of blocks", + title="", + vertical=True, + x_prefix="", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="percent_data_size_on_ref_keys", + pdf_name="percent_data_size_on_ref_keys.pdf", + xlabel="", + ylabel="Percentage of blocks", + title="", + vertical=True, + x_prefix="", + ) + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="percent_accesses_on_ref_keys", + pdf_name="percent_accesses_on_ref_keys.pdf", + xlabel="", + ylabel="Percentage of blocks", + title="", + vertical=True, + x_prefix="", + ) + + +def plot_access_count_summary(csv_result_dir, output_result_dir): + plot_stacked_bar_charts( + csv_result_dir, + output_result_dir, + filename_suffix="access_count_summary", + pdf_name="access_count_summary.pdf", + xlabel="Access count", + ylabel="Percentage of blocks", + title="", + vertical=True, + x_prefix="< ", + ) + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="skewness", + pdf_name="skew.pdf", + xlabel="", + ylabel="Percentage of accesses", + title="Skewness", + vertical=True, + legend=False, + ) + + +def plot_miss_ratio_timeline(csv_result_dir, output_result_dir): + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="3600_miss_ratio_timeline", + pdf_name="miss_ratio_timeline.pdf", + xlabel="Time", + ylabel="Miss Ratio (%)", + title="Miss ratio timeline", + vertical=False, + legend=True, + ) + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="3600_miss_timeline", + pdf_name="miss_timeline.pdf", + xlabel="Time", + ylabel="# of misses ", + title="Miss timeline", + vertical=False, + legend=True, + ) + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="3600_policy_timeline", + pdf_name="policy_timeline.pdf", + xlabel="Time", + ylabel="# of times a policy is selected ", + title="Policy timeline", + vertical=False, + legend=True, + ) + plot_line_charts( + csv_result_dir, + output_result_dir, + filename_prefix="", + filename_suffix="3600_policy_ratio_timeline", + pdf_name="policy_ratio_timeline.pdf", + xlabel="Time", + ylabel="Percentage of times a policy is selected ", + title="Policy timeline", + vertical=False, + legend=True, + ) + + +if __name__ == "__main__": + if len(sys.argv) < 3: + print( + "Must provide two arguments: \n" + "1) The directory that saves a list of " + "directories which contain block cache trace analyzer result files. \n" + "2) The directory to save plotted graphs.
\n" + ) + exit(1) + csv_result_dir = sys.argv[1] + output_result_dir = sys.argv[2] + print( + "Processing directory {} and saving graphs to {}.".format( + csv_result_dir, output_result_dir + ) + ) + for csv_relative_dir in os.listdir(csv_result_dir): + csv_abs_dir = csv_result_dir + "/" + csv_relative_dir + result_dir = output_result_dir + "/" + csv_relative_dir + if not os.path.isdir(csv_abs_dir): + print("{} is not a directory".format(csv_abs_dir)) + continue + print("Processing experiment dir: {}".format(csv_relative_dir)) + if not os.path.exists(result_dir): + os.makedirs(result_dir) + plot_access_count_summary(csv_abs_dir, result_dir) + plot_timeline(csv_abs_dir, result_dir) + plot_miss_ratio_timeline(csv_abs_dir, result_dir) + plot_correlation(csv_abs_dir, result_dir) + plot_reuse_graphs(csv_abs_dir, result_dir) + plot_percentage_access_summary(csv_abs_dir, result_dir) + plot_miss_stats_graphs( + csv_abs_dir, + result_dir, + file_prefix="", + file_suffix="mrc", + ylabel="Miss ratio (%)", + pdf_file_name="mrc", + ) + plot_miss_stats_diff_lru_graphs( + csv_abs_dir, + result_dir, + file_prefix="", + file_suffix="mrc", + ylabel="Miss ratio (%)", + pdf_file_name="mrc_diff_lru", + ) + # The following stats are only available in pysim. + for time_unit in ["1", "60", "3600"]: + plot_miss_stats_graphs( + csv_abs_dir, + result_dir, + file_prefix="ml_{}_".format(time_unit), + file_suffix="p95mb", + ylabel="p95 number of bytes missed per {} seconds".format(time_unit), + pdf_file_name="p95mb_per{}_seconds".format(time_unit), + ) + plot_miss_stats_graphs( + csv_abs_dir, + result_dir, + file_prefix="ml_{}_".format(time_unit), + file_suffix="avgmb", + ylabel="Average number of bytes missed per {} seconds".format(time_unit), + pdf_file_name="avgmb_per{}_seconds".format(time_unit), + ) + plot_miss_stats_diff_lru_graphs( + csv_abs_dir, + result_dir, + file_prefix="ml_{}_".format(time_unit), + file_suffix="p95mb", + ylabel="p95 number of bytes missed per {} seconds".format(time_unit), + pdf_file_name="p95mb_per{}_seconds_diff_lru".format(time_unit), + ) + plot_miss_stats_diff_lru_graphs( + csv_abs_dir, + result_dir, + file_prefix="ml_{}_".format(time_unit), + file_suffix="avgmb", + ylabel="Average number of bytes missed per {} seconds".format(time_unit), + pdf_file_name="avgmb_per{}_seconds_diff_lru".format(time_unit), + ) diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc new file mode 100644 index 000000000..1dc723629 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc @@ -0,0 +1,717 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+ +#ifndef ROCKSDB_LITE +#ifndef GFLAGS +#include <cstdio> +int main() { + fprintf(stderr, + "Please install gflags to run block_cache_trace_analyzer_test\n"); + return 1; +} +#else + +#include <fstream> +#include <iostream> +#include <map> +#include <vector> + +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "rocksdb/trace_reader_writer.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" +#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" +#include "trace_replay/block_cache_tracer.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +const uint64_t kBlockSize = 1024; +const std::string kBlockKeyPrefix = "test-block-"; +const uint32_t kCFId = 0; +const uint32_t kLevel = 1; +const uint64_t kSSTStoringEvenKeys = 100; +const uint64_t kSSTStoringOddKeys = 101; +const std::string kRefKeyPrefix = "test-get-"; +const uint64_t kNumKeysInBlock = 1024; +const int kMaxArgCount = 100; +const size_t kArgBufferSize = 100000; +} // namespace + +class BlockCacheTracerTest : public testing::Test { + public: + BlockCacheTracerTest() { + test_path_ = test::PerThreadDBPath("block_cache_tracer_test"); + env_ = ROCKSDB_NAMESPACE::Env::Default(); + EXPECT_OK(env_->CreateDir(test_path_)); + trace_file_path_ = test_path_ + "/block_cache_trace"; + block_cache_sim_config_path_ = test_path_ + "/block_cache_sim_config"; + timeline_labels_ = + "block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller"; + reuse_distance_labels_ = + "block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller"; + reuse_distance_buckets_ = "1,1K,1M,1G"; + reuse_interval_labels_ = "block,all,cf,sst,level,bt,cf_sst,cf_level,cf_bt"; + reuse_interval_buckets_ = "1,10,100,1000"; + reuse_lifetime_labels_ = "block,all,cf,sst,level,bt,cf_sst,cf_level,cf_bt"; + reuse_lifetime_buckets_ = "1,10,100,1000"; + analyzing_callers_ = "Get,Iterator"; + access_count_buckets_ = "2,3,4,5,10"; + analyze_get_spatial_locality_labels_ = "all"; + analyze_get_spatial_locality_buckets_ = "10,20,30,40,50,60,70,80,90,100"; + } + + ~BlockCacheTracerTest() override { + if (getenv("KEEP_DB")) { + printf("The trace file is still at %s\n", trace_file_path_.c_str()); + return; + } + EXPECT_OK(env_->DeleteFile(trace_file_path_)); + EXPECT_OK(env_->DeleteDir(test_path_)); + } + + TableReaderCaller GetCaller(uint32_t key_id) { + uint32_t n = key_id % 5; + switch (n) { + case 0: + return TableReaderCaller::kPrefetch; + case 1: + return TableReaderCaller::kCompaction; + case 2: + return TableReaderCaller::kUserGet; + case 3: + return TableReaderCaller::kUserMultiGet; + case 4: + return TableReaderCaller::kUserIterator; + } + // This cannot happen.
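+ // (key_id % 5 always falls in [0, 4]. The even spread across the five + // callers is also what the 20%-per-caller assertions further below rely + // on.)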
+ assert(false); + return TableReaderCaller::kMaxBlockCacheLookupCaller; + } + + void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id, + TraceType block_type, uint32_t nblocks) { + assert(writer); + for (uint32_t i = 0; i < nblocks; i++) { + uint32_t key_id = from_key_id + i; + uint64_t timestamp = (key_id + 1) * kMicrosInSecond; + BlockCacheTraceRecord record; + record.block_type = block_type; + record.block_size = kBlockSize + key_id; + record.block_key = kBlockKeyPrefix + std::to_string(key_id); + record.access_timestamp = timestamp; + record.cf_id = kCFId; + record.cf_name = kDefaultColumnFamilyName; + record.caller = GetCaller(key_id); + record.level = kLevel; + if (key_id % 2 == 0) { + record.sst_fd_number = kSSTStoringEvenKeys; + } else { + record.sst_fd_number = kSSTStoringOddKeys; + } + record.is_cache_hit = Boolean::kFalse; + record.no_insert = Boolean::kFalse; + // Provide these fields for all block types. + // The writer should only write these fields for data blocks and the + // caller is either GET or MGET. + record.referenced_key = + kRefKeyPrefix + std::to_string(key_id) + std::string(8, 0); + record.referenced_key_exist_in_block = Boolean::kTrue; + record.num_keys_in_block = kNumKeysInBlock; + ASSERT_OK(writer->WriteBlockAccess( + record, record.block_key, record.cf_name, record.referenced_key)); + } + } + + void AssertBlockAccessInfo( + uint32_t key_id, TraceType type, + const std::map<std::string, BlockAccessInfo>& block_access_info_map) { + auto key_id_str = kBlockKeyPrefix + std::to_string(key_id); + ASSERT_TRUE(block_access_info_map.find(key_id_str) != + block_access_info_map.end()); + auto& block_access_info = block_access_info_map.find(key_id_str)->second; + ASSERT_EQ(1, block_access_info.num_accesses); + ASSERT_EQ(kBlockSize + key_id, block_access_info.block_size); + ASSERT_GT(block_access_info.first_access_time, 0); + ASSERT_GT(block_access_info.last_access_time, 0); + ASSERT_EQ(1, block_access_info.caller_num_access_map.size()); + TableReaderCaller expected_caller = GetCaller(key_id); + ASSERT_TRUE(block_access_info.caller_num_access_map.find(expected_caller) != + block_access_info.caller_num_access_map.end()); + ASSERT_EQ( + 1, + block_access_info.caller_num_access_map.find(expected_caller)->second); + + if ((expected_caller == TableReaderCaller::kUserGet || + expected_caller == TableReaderCaller::kUserMultiGet) && + type == TraceType::kBlockTraceDataBlock) { + ASSERT_EQ(kNumKeysInBlock, block_access_info.num_keys); + ASSERT_EQ(1, block_access_info.key_num_access_map.size()); + ASSERT_EQ(0, block_access_info.non_exist_key_num_access_map.size()); + ASSERT_EQ(1, block_access_info.num_referenced_key_exist_in_block); + } + } + + void RunBlockCacheTraceAnalyzer() { + std::vector<std::string> params = { + "./block_cache_trace_analyzer", + "-block_cache_trace_path=" + trace_file_path_, + "-block_cache_sim_config_path=" + block_cache_sim_config_path_, + "-block_cache_analysis_result_dir=" + test_path_, + "-print_block_size_stats", + "-print_access_count_stats", + "-print_data_block_access_count_stats", + "-cache_sim_warmup_seconds=0", + "-analyze_bottom_k_access_count_blocks=5", + "-analyze_top_k_access_count_blocks=5", + "-analyze_blocks_reuse_k_reuse_window=5", + "-timeline_labels=" + timeline_labels_, + "-reuse_distance_labels=" + reuse_distance_labels_, + "-reuse_distance_buckets=" + reuse_distance_buckets_, + "-reuse_interval_labels=" + reuse_interval_labels_, + "-reuse_interval_buckets=" + reuse_interval_buckets_, + "-reuse_lifetime_labels=" + 
reuse_lifetime_labels_, + "-reuse_lifetime_buckets=" + reuse_lifetime_buckets_, + "-analyze_callers=" + analyzing_callers_, + "-access_count_buckets=" + access_count_buckets_, + "-analyze_get_spatial_locality_labels=" + + analyze_get_spatial_locality_labels_, + "-analyze_get_spatial_locality_buckets=" + + analyze_get_spatial_locality_buckets_, + "-analyze_correlation_coefficients_labels=all", + "-skew_labels=all", + "-skew_buckets=10,50,100"}; + char arg_buffer[kArgBufferSize]; + char* argv[kMaxArgCount]; + int argc = 0; + int cursor = 0; + for (const auto& arg : params) { + ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); + ASSERT_LE(argc + 1, kMaxArgCount); + snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); + + argv[argc++] = arg_buffer + cursor; + cursor += static_cast<int>(arg.size()) + 1; + } + ASSERT_EQ(0, + ROCKSDB_NAMESPACE::block_cache_trace_analyzer_tool(argc, argv)); + } + + Env* env_; + EnvOptions env_options_; + std::string block_cache_sim_config_path_; + std::string trace_file_path_; + std::string test_path_; + std::string timeline_labels_; + std::string reuse_distance_labels_; + std::string reuse_distance_buckets_; + std::string reuse_interval_labels_; + std::string reuse_interval_buckets_; + std::string reuse_lifetime_labels_; + std::string reuse_lifetime_buckets_; + std::string analyzing_callers_; + std::string access_count_buckets_; + std::string analyze_get_spatial_locality_labels_; + std::string analyze_get_spatial_locality_buckets_; +}; + +TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) { + { + // Generate a trace file. + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer)); + ASSERT_OK(writer.WriteHeader()); + WriteBlockAccess(&writer, 0, TraceType::kBlockTraceDataBlock, 50); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + { + // Generate a cache sim config. + std::string config = "lru,1,0,1K,1M,1G"; + std::ofstream out(block_cache_sim_config_path_); + ASSERT_TRUE(out.is_open()); + out << config << std::endl; + out.close(); + } + RunBlockCacheTraceAnalyzer(); + { + // Validate the cache miss ratios. + std::vector<uint64_t> expected_capacities{1024, 1024 * 1024, + 1024 * 1024 * 1024}; + const std::string mrc_path = test_path_ + "/49_50_mrc"; + std::ifstream infile(mrc_path); + uint32_t config_index = 0; + std::string line; + // Read header. + ASSERT_TRUE(getline(infile, line)); + while (getline(infile, line)) { + std::stringstream ss(line); + std::vector<std::string> result_strs; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + result_strs.push_back(substr); + } + ASSERT_EQ(6, result_strs.size()); + ASSERT_LT(config_index, expected_capacities.size()); + ASSERT_EQ("lru", result_strs[0]); // cache_name + ASSERT_EQ("1", result_strs[1]); // num_shard_bits + ASSERT_EQ("0", result_strs[2]); // ghost_cache_capacity + ASSERT_EQ(std::to_string(expected_capacities[config_index]), + result_strs[3]); // cache_capacity + ASSERT_EQ("100.0000", result_strs[4]); // miss_ratio + ASSERT_EQ("50", result_strs[5]); // number of accesses. 
+ config_index++; + } + ASSERT_EQ(expected_capacities.size(), config_index); + infile.close(); + ASSERT_OK(env_->DeleteFile(mrc_path)); + + const std::vector<std::string> time_units{"1", "60", "3600"}; + expected_capacities.push_back(port::kMaxUint64); + for (auto const& expected_capacity : expected_capacities) { + for (auto const& time_unit : time_units) { + const std::string miss_ratio_timeline_path = + test_path_ + "/" + std::to_string(expected_capacity) + "_" + + time_unit + "_miss_ratio_timeline"; + std::ifstream mrt_file(miss_ratio_timeline_path); + // Read header. + ASSERT_TRUE(getline(mrt_file, line)); + ASSERT_TRUE(getline(mrt_file, line)); + std::stringstream ss(line); + bool read_header = false; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + if (!read_header) { + if (expected_capacity == port::kMaxUint64) { + ASSERT_EQ("trace", substr); + } else { + ASSERT_EQ("lru-1-0", substr); + } + read_header = true; + continue; + } + ASSERT_DOUBLE_EQ(100.0, ParseDouble(substr)); + } + ASSERT_FALSE(getline(mrt_file, line)); + mrt_file.close(); + ASSERT_OK(env_->DeleteFile(miss_ratio_timeline_path)); + } + for (auto const& time_unit : time_units) { + const std::string miss_timeline_path = + test_path_ + "/" + std::to_string(expected_capacity) + "_" + + time_unit + "_miss_timeline"; + std::ifstream mt_file(miss_timeline_path); + // Read header. + ASSERT_TRUE(getline(mt_file, line)); + ASSERT_TRUE(getline(mt_file, line)); + std::stringstream ss(line); + uint32_t num_misses = 0; + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + if (num_misses == 0) { + if (expected_capacity == port::kMaxUint64) { + ASSERT_EQ("trace", substr); + } else { + ASSERT_EQ("lru-1-0", substr); + } + num_misses++; + continue; + } + num_misses += ParseInt(substr); + } + ASSERT_EQ(51, num_misses); + ASSERT_FALSE(getline(mt_file, line)); + mt_file.close(); + ASSERT_OK(env_->DeleteFile(miss_timeline_path)); + } + } + } + { + // Validate the skewness csv file. + const std::string skewness_file_path = test_path_ + "/all_skewness"; + std::ifstream skew_file(skewness_file_path); + // Read header. + std::string line; + ASSERT_TRUE(getline(skew_file, line)); + std::stringstream ss(line); + double sum_percent = 0; + while (getline(skew_file, line)) { + std::stringstream ss_naccess(line); + std::string substr; + bool read_label = false; + while (ss_naccess.good()) { + ASSERT_TRUE(getline(ss_naccess, substr, ',')); + if (!read_label) { + read_label = true; + continue; + } + sum_percent += ParseDouble(substr); + } + } + ASSERT_EQ(100.0, sum_percent); + ASSERT_FALSE(getline(skew_file, line)); + skew_file.close(); + ASSERT_OK(env_->DeleteFile(skewness_file_path)); + } + { + // Validate the timeline csv files. 
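+ // These files are produced by WriteAccessTimeline and named + // "<user_access_only_|all_access_><label><_60|_3600>_access_timeline".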
+ const std::vector<std::string> time_units{"_60", "_3600"}; + const std::vector<std::string> user_access_only_flags{"user_access_only_", + "all_access_"}; + for (auto const& user_access_only : user_access_only_flags) { + for (auto const& unit : time_units) { + std::stringstream ss(timeline_labels_); + while (ss.good()) { + std::string l; + ASSERT_TRUE(getline(ss, l, ',')); + if (l.find("block") == std::string::npos) { + if (user_access_only != "all_access_") { + continue; + } + } + const std::string timeline_file = test_path_ + "/" + + user_access_only + l + unit + + "_access_timeline"; + std::ifstream infile(timeline_file); + std::string line; + const uint64_t expected_naccesses = 50; + const uint64_t expected_user_accesses = 30; + ASSERT_TRUE(getline(infile, line)) << timeline_file; + uint32_t naccesses = 0; + while (getline(infile, line)) { + std::stringstream ss_naccess(line); + std::string substr; + bool read_label = false; + while (ss_naccess.good()) { + ASSERT_TRUE(getline(ss_naccess, substr, ',')); + if (!read_label) { + read_label = true; + continue; + } + naccesses += ParseUint32(substr); + } + } + if (user_access_only == "user_access_only_") { + ASSERT_EQ(expected_user_accesses, naccesses) << timeline_file; + } else { + ASSERT_EQ(expected_naccesses, naccesses) << timeline_file; + } + ASSERT_OK(env_->DeleteFile(timeline_file)); + } + } + } + } + { + // Validate the reuse_interval and reuse_distance csv files. + std::map<std::string, std::string> test_reuse_csv_files; + test_reuse_csv_files["_access_reuse_interval"] = reuse_interval_labels_; + test_reuse_csv_files["_reuse_distance"] = reuse_distance_labels_; + test_reuse_csv_files["_reuse_lifetime"] = reuse_lifetime_labels_; + test_reuse_csv_files["_avg_reuse_interval"] = reuse_interval_labels_; + test_reuse_csv_files["_avg_reuse_interval_naccesses"] = + reuse_interval_labels_; + for (auto const& test : test_reuse_csv_files) { + const std::string& file_suffix = test.first; + const std::string& labels = test.second; + const uint32_t expected_num_rows = 5; + std::stringstream ss(labels); + while (ss.good()) { + std::string l; + ASSERT_TRUE(getline(ss, l, ',')); + const std::string reuse_csv_file = test_path_ + "/" + l + file_suffix; + std::ifstream infile(reuse_csv_file); + std::string line; + ASSERT_TRUE(getline(infile, line)); + double npercentage = 0; + uint32_t nrows = 0; + while (getline(infile, line)) { + std::stringstream ss_naccess(line); + bool label_read = false; + nrows++; + while (ss_naccess.good()) { + std::string substr; + ASSERT_TRUE(getline(ss_naccess, substr, ',')); + if (!label_read) { + label_read = true; + continue; + } + npercentage += ParseDouble(substr); + } + } + ASSERT_EQ(expected_num_rows, nrows); + if ("_reuse_lifetime" == test.first || + "_avg_reuse_interval" == test.first || + "_avg_reuse_interval_naccesses" == test.first) { + ASSERT_EQ(100, npercentage) << reuse_csv_file; + } else { + ASSERT_LT(npercentage, 0); + } + ASSERT_OK(env_->DeleteFile(reuse_csv_file)); + } + } + } + + { + // Validate the percentage of accesses summary. 
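+ // The trace contains 50 accesses spread evenly across the five callers + // (see GetCaller), so each caller row should report exactly 20%.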
+ const std::string percent_access_summary_file = + test_path_ + "/percentage_of_accesses_summary"; + std::ifstream infile(percent_access_summary_file); + std::string line; + ASSERT_TRUE(getline(infile, line)); + std::set<std::string> callers; + std::set<std::string> expected_callers{"Get", "MultiGet", "Iterator", + "Prefetch", "Compaction"}; + while (getline(infile, line)) { + std::stringstream caller_percent(line); + std::string caller; + ASSERT_TRUE(getline(caller_percent, caller, ',')); + std::string percent; + ASSERT_TRUE(getline(caller_percent, percent, ',')); + ASSERT_FALSE(caller_percent.good()); + callers.insert(caller); + ASSERT_EQ(20, ParseDouble(percent)); + } + ASSERT_EQ(expected_callers.size(), callers.size()); + for (auto caller : callers) { + ASSERT_TRUE(expected_callers.find(caller) != expected_callers.end()); + } + ASSERT_OK(env_->DeleteFile(percent_access_summary_file)); + } + { + // Validate the percentage of accesses summary by analyzing callers. + std::stringstream analyzing_callers(analyzing_callers_); + while (analyzing_callers.good()) { + std::string caller; + ASSERT_TRUE(getline(analyzing_callers, caller, ',')); + std::vector<std::string> breakdowns{"level", "bt"}; + for (auto breakdown : breakdowns) { + const std::string file_name = test_path_ + "/" + caller + "_" + + breakdown + + "_percentage_of_accesses_summary"; + std::ifstream infile(file_name); + std::string line; + ASSERT_TRUE(getline(infile, line)); + double sum = 0; + while (getline(infile, line)) { + std::stringstream label_percent(line); + std::string label; + ASSERT_TRUE(getline(label_percent, label, ',')); + std::string percent; + ASSERT_TRUE(getline(label_percent, percent, ',')); + ASSERT_FALSE(label_percent.good()); + sum += ParseDouble(percent); + } + ASSERT_EQ(100, sum); + ASSERT_OK(env_->DeleteFile(file_name)); + } + } + } + const std::vector<std::string> access_types{"user_access_only", "all_access"}; + const std::vector<std::string> prefix{"bt", "cf"}; + for (auto const& pre : prefix) { + for (auto const& access_type : access_types) { + { + // Validate the access count summary. + const std::string bt_access_count_summary = test_path_ + "/" + pre + + "_" + access_type + + "_access_count_summary"; + std::ifstream infile(bt_access_count_summary); + std::string line; + ASSERT_TRUE(getline(infile, line)); + double sum_percent = 0; + while (getline(infile, line)) { + std::stringstream bt_percent(line); + std::string bt; + ASSERT_TRUE(getline(bt_percent, bt, ',')); + std::string percent; + ASSERT_TRUE(getline(bt_percent, percent, ',')); + sum_percent += ParseDouble(percent); + } + ASSERT_EQ(100.0, sum_percent); + ASSERT_OK(env_->DeleteFile(bt_access_count_summary)); + } + } + } + for (auto const& access_type : access_types) { + std::vector<std::string> block_types{"Index", "Data", "Filter"}; + for (auto block_type : block_types) { + // Validate reuse block timeline. 
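+ // Files follow + // "<block_type>_<access_type>_<reuse_window>_reuse_blocks_timeline"; the + // analyzer ran with -analyze_blocks_reuse_k_reuse_window=5.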
+ const std::string reuse_blocks_timeline = test_path_ + "/" + block_type + + "_" + access_type + + "_5_reuse_blocks_timeline"; + std::ifstream infile(reuse_blocks_timeline); + std::string line; + ASSERT_TRUE(getline(infile, line)) << reuse_blocks_timeline; + uint32_t index = 0; + while (getline(infile, line)) { + std::stringstream timeline(line); + bool start_time = false; + double sum = 0; + while (timeline.good()) { + std::string value; + ASSERT_TRUE(getline(timeline, value, ',')); + if (!start_time) { + start_time = true; + continue; + } + sum += ParseDouble(value); + } + index++; + ASSERT_LT(sum, 100.0 * index + 1) << reuse_blocks_timeline; + } + ASSERT_OK(env_->DeleteFile(reuse_blocks_timeline)); + } + } + + std::stringstream ss(analyze_get_spatial_locality_labels_); + while (ss.good()) { + std::string l; + ASSERT_TRUE(getline(ss, l, ',')); + const std::vector<std::string> spatial_locality_files{ + "_percent_ref_keys", "_percent_accesses_on_ref_keys", + "_percent_data_size_on_ref_keys"}; + for (auto const& spatial_locality_file : spatial_locality_files) { + const std::string filename = test_path_ + "/" + l + spatial_locality_file; + std::ifstream infile(filename); + std::string line; + ASSERT_TRUE(getline(infile, line)); + double sum_percent = 0; + uint32_t nrows = 0; + while (getline(infile, line)) { + std::stringstream bt_percent(line); + std::string bt; + ASSERT_TRUE(getline(bt_percent, bt, ',')); + std::string percent; + ASSERT_TRUE(getline(bt_percent, percent, ',')); + sum_percent += ParseDouble(percent); + nrows++; + } + ASSERT_EQ(11, nrows); + ASSERT_EQ(100.0, sum_percent); + ASSERT_OK(env_->DeleteFile(filename)); + } + } + ASSERT_OK(env_->DeleteFile(block_cache_sim_config_path_)); +} + +TEST_F(BlockCacheTracerTest, MixedBlocks) { + { + // Generate a trace file containing a mix of blocks. + // It contains two SST files with 25 blocks of odd numbered block_key in + // kSSTStoringOddKeys and 25 blocks of even numbered blocks_key in + // kSSTStoringEvenKeys. + TraceOptions trace_opt; + std::unique_ptr<TraceWriter> trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_, + &trace_writer)); + BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer)); + ASSERT_OK(writer.WriteHeader()); + // Write blocks of different types. + WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock, + 10); + WriteBlockAccess(&writer, 10, TraceType::kBlockTraceDataBlock, 10); + WriteBlockAccess(&writer, 20, TraceType::kBlockTraceFilterBlock, 10); + WriteBlockAccess(&writer, 30, TraceType::kBlockTraceIndexBlock, 10); + WriteBlockAccess(&writer, 40, TraceType::kBlockTraceRangeDeletionBlock, 10); + ASSERT_OK(env_->FileExists(trace_file_path_)); + } + + { + // Verify trace file is generated correctly. + std::unique_ptr<TraceReader> trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_, + &trace_reader)); + BlockCacheTraceReader reader(std::move(trace_reader)); + BlockCacheTraceHeader header; + ASSERT_OK(reader.ReadHeader(&header)); + ASSERT_EQ(kMajorVersion, header.rocksdb_major_version); + ASSERT_EQ(kMinorVersion, header.rocksdb_minor_version); + // Read blocks. + BlockCacheTraceAnalyzer analyzer( + trace_file_path_, + /*output_miss_ratio_curve_path=*/"", + /*human_readable_trace_file_path=*/"", + /*compute_reuse_distance=*/true, + /*mrc_only=*/false, + /*is_block_cache_human_readable_trace=*/false, + /*simulator=*/nullptr); + // The analyzer ends when it detects an incomplete access record. 
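+ // (reaching the end of the trace file surfaces Status::Incomplete, so + // that is the expected return value of Analyze() here)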
+ ASSERT_EQ(Status::Incomplete(""), analyzer.Analyze()); + const uint64_t expected_num_cfs = 1; + std::vector<uint64_t> expected_fds{kSSTStoringOddKeys, kSSTStoringEvenKeys}; + const std::vector<TraceType> expected_types{ + TraceType::kBlockTraceUncompressionDictBlock, + TraceType::kBlockTraceDataBlock, TraceType::kBlockTraceFilterBlock, + TraceType::kBlockTraceIndexBlock, + TraceType::kBlockTraceRangeDeletionBlock}; + const uint64_t expected_num_keys_per_type = 5; + + auto& stats = analyzer.TEST_cf_aggregates_map(); + ASSERT_EQ(expected_num_cfs, stats.size()); + ASSERT_TRUE(stats.find(kDefaultColumnFamilyName) != stats.end()); + auto& cf_stats = stats.find(kDefaultColumnFamilyName)->second; + ASSERT_EQ(expected_fds.size(), cf_stats.fd_aggregates_map.size()); + for (auto fd_id : expected_fds) { + ASSERT_TRUE(cf_stats.fd_aggregates_map.find(fd_id) != + cf_stats.fd_aggregates_map.end()); + ASSERT_EQ(kLevel, cf_stats.fd_aggregates_map.find(fd_id)->second.level); + auto& block_type_aggregates_map = cf_stats.fd_aggregates_map.find(fd_id) + ->second.block_type_aggregates_map; + ASSERT_EQ(expected_types.size(), block_type_aggregates_map.size()); + uint32_t key_id = 0; + for (auto type : expected_types) { + ASSERT_TRUE(block_type_aggregates_map.find(type) != + block_type_aggregates_map.end()); + auto& block_access_info_map = + block_type_aggregates_map.find(type)->second.block_access_info_map; + // Each block type has 5 blocks. + ASSERT_EQ(expected_num_keys_per_type, block_access_info_map.size()); + for (uint32_t i = 0; i < 10; i++) { + // Verify that odd numbered blocks are stored in kSSTStoringOddKeys + // and even numbered blocks are stored in kSSTStoringEvenKeys. + auto key_id_str = kBlockKeyPrefix + std::to_string(key_id); + if (fd_id == kSSTStoringOddKeys) { + if (key_id % 2 == 1) { + AssertBlockAccessInfo(key_id, type, block_access_info_map); + } else { + ASSERT_TRUE(block_access_info_map.find(key_id_str) == + block_access_info_map.end()); + } + } else { + if (key_id % 2 == 1) { + ASSERT_TRUE(block_access_info_map.find(key_id_str) == + block_access_info_map.end()); + } else { + AssertBlockAccessInfo(key_id, type, block_access_info_map); + } + } + key_id++; + } + } + } + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#endif // GFLAGS +#else +#include <stdio.h> +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "block_cache_trace_analyzer_test is not supported in ROCKSDB_LITE\n"); + return 0; +} +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc new file mode 100644 index 000000000..44fec5598 --- /dev/null +++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc @@ -0,0 +1,25 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+// +#ifndef ROCKSDB_LITE +#ifndef GFLAGS +#include <cstdio> +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else // GFLAGS +#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h" +int main(int argc, char** argv) { + return ROCKSDB_NAMESPACE::block_cache_trace_analyzer_tool(argc, argv); +} +#endif // GFLAGS +#else // ROCKSDB_LITE +#include <stdio.h> +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "Not supported in lite mode.\n"); + return 1; +} +#endif // ROCKSDB_LITE |