summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/tools/block_cache_analyzer
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/tools/block_cache_analyzer
parentInitial commit. (diff)
downloadceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz
ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/tools/block_cache_analyzer')
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/__init__.py2
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py2000
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh156
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py734
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc2316
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h397
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py729
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc735
-rw-r--r--src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc25
9 files changed, 7094 insertions, 0 deletions
diff --git a/src/rocksdb/tools/block_cache_analyzer/__init__.py b/src/rocksdb/tools/block_cache_analyzer/__init__.py
new file mode 100644
index 000000000..8dbe96a78
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/__init__.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py
new file mode 100644
index 000000000..67307df53
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.py
@@ -0,0 +1,2000 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+
+import gc
+import heapq
+import random
+import sys
+import time
+from collections import OrderedDict
+from os import path
+
+import numpy as np
+
+
+kSampleSize = 64 # The sample size used when performing eviction.
+kMicrosInSecond = 1000000
+kSecondsInMinute = 60
+kSecondsInHour = 3600
+
+
+class TraceRecord:
+ """
+ A trace record represents a block access.
+ It holds the same struct as BlockCacheTraceRecord in
+ trace_replay/block_cache_tracer.h
+ """
+
+ def __init__(
+ self,
+ access_time,
+ block_id,
+ block_type,
+ block_size,
+ cf_id,
+ cf_name,
+ level,
+ fd,
+ caller,
+ no_insert,
+ get_id,
+ key_id,
+ kv_size,
+ is_hit,
+ referenced_key_exist_in_block,
+ num_keys_in_block,
+ table_id,
+ seq_number,
+ block_key_size,
+ key_size,
+ block_offset_in_file,
+ next_access_seq_no,
+ ):
+ self.access_time = access_time
+ self.block_id = block_id
+ self.block_type = block_type
+ self.block_size = block_size + block_key_size
+ self.cf_id = cf_id
+ self.cf_name = cf_name
+ self.level = level
+ self.fd = fd
+ self.caller = caller
+ if no_insert == 1:
+ self.no_insert = True
+ else:
+ self.no_insert = False
+ self.get_id = get_id
+ self.key_id = key_id
+ self.kv_size = kv_size
+ if is_hit == 1:
+ self.is_hit = True
+ else:
+ self.is_hit = False
+ if referenced_key_exist_in_block == 1:
+ self.referenced_key_exist_in_block = True
+ else:
+ self.referenced_key_exist_in_block = False
+ self.num_keys_in_block = num_keys_in_block
+ self.table_id = table_id
+ self.seq_number = seq_number
+ self.block_key_size = block_key_size
+ self.key_size = key_size
+ self.block_offset_in_file = block_offset_in_file
+ self.next_access_seq_no = next_access_seq_no
+
+
+class CacheEntry:
+ """A cache entry stored in the cache."""
+
+ def __init__(
+ self,
+ value_size,
+ cf_id,
+ level,
+ block_type,
+ table_id,
+ access_number,
+ time_s,
+ num_hits=0,
+ ):
+ self.value_size = value_size
+ self.last_access_number = access_number
+ self.num_hits = num_hits
+ self.cf_id = 0
+ self.level = level
+ self.block_type = block_type
+ self.last_access_time = time_s
+ self.insertion_time = time_s
+ self.table_id = table_id
+
+ def __repr__(self):
+ """Debug string."""
+ return "(s={},last={},hits={},cf={},l={},bt={})\n".format(
+ self.value_size,
+ self.last_access_number,
+ self.num_hits,
+ self.cf_id,
+ self.level,
+ self.block_type,
+ )
+
+ def cost_class(self, cost_class_label):
+ if cost_class_label == "table_bt":
+ return "{}-{}".format(self.table_id, self.block_type)
+ elif cost_class_label == "table":
+ return "{}".format(self.table_id)
+ elif cost_class_label == "bt":
+ return "{}".format(self.block_type)
+ elif cost_class_label == "cf":
+ return "{}".format(self.cf_id)
+ elif cost_class_label == "cf_bt":
+ return "{}-{}".format(self.cf_id, self.block_type)
+ elif cost_class_label == "table_level_bt":
+ return "{}-{}-{}".format(self.table_id, self.level, self.block_type)
+ assert False, "Unknown cost class label {}".format(cost_class_label)
+ return None
+
+
+class HashEntry:
+ """A hash entry stored in a hash table."""
+
+ def __init__(self, key, hash, value):
+ self.key = key
+ self.hash = hash
+ self.value = value
+
+ def __repr__(self):
+ return "k={},h={},v=[{}]".format(self.key, self.hash, self.value)
+
+
+class HashTable:
+ """
+ A custom implementation of hash table to support fast random sampling.
+ It is closed hashing and uses chaining to resolve hash conflicts.
+ It grows/shrinks the hash table upon insertion/deletion to support
+ fast lookups and random samplings.
+ """
+
+ def __init__(self):
+ self.initial_size = 32
+ self.table = [None] * self.initial_size
+ self.elements = 0
+
+ def random_sample(self, sample_size):
+ """Randomly sample 'sample_size' hash entries from the table."""
+ samples = []
+ index = random.randint(0, len(self.table) - 1)
+ pos = index
+ # Starting from index, adding hash entries to the sample list until
+ # sample_size is met or we ran out of entries.
+ while True:
+ if self.table[pos] is not None:
+ for i in range(len(self.table[pos])):
+ if self.table[pos][i] is None:
+ continue
+ samples.append(self.table[pos][i])
+ if len(samples) == sample_size:
+ break
+ pos += 1
+ pos = pos % len(self.table)
+ if pos == index or len(samples) == sample_size:
+ break
+ assert len(samples) <= sample_size
+ return samples
+
+ def __repr__(self):
+ all_entries = []
+ for i in range(len(self.table)):
+ if self.table[i] is None:
+ continue
+ for j in range(len(self.table[i])):
+ if self.table[i][j] is not None:
+ all_entries.append(self.table[i][j])
+ return "{}".format(all_entries)
+
+ def values(self):
+ all_values = []
+ for i in range(len(self.table)):
+ if self.table[i] is None:
+ continue
+ for j in range(len(self.table[i])):
+ if self.table[i][j] is not None:
+ all_values.append(self.table[i][j].value)
+ return all_values
+
+ def __len__(self):
+ return self.elements
+
+ def insert(self, key, hash, value):
+ """
+ Insert a hash entry in the table. Replace the old entry if it already
+ exists.
+ """
+ self.grow()
+ inserted = False
+ index = hash % len(self.table)
+ if self.table[index] is None:
+ self.table[index] = []
+ # Search for the entry first.
+ for i in range(len(self.table[index])):
+ if self.table[index][i] is None:
+ continue
+ if self.table[index][i].hash == hash and self.table[index][i].key == key:
+ # The entry already exists in the table.
+ self.table[index][i] = HashEntry(key, hash, value)
+ return
+
+ # Find an empty slot.
+ for i in range(len(self.table[index])):
+ if self.table[index][i] is None:
+ self.table[index][i] = HashEntry(key, hash, value)
+ inserted = True
+ break
+ if not inserted:
+ self.table[index].append(HashEntry(key, hash, value))
+ self.elements += 1
+
+ def resize(self, new_size):
+ if new_size == len(self.table):
+ return
+ if new_size < self.initial_size:
+ return
+ if self.elements < 100:
+ return
+ new_table = [None] * new_size
+ # Copy 'self.table' to new_table.
+ for i in range(len(self.table)):
+ entries = self.table[i]
+ if entries is None:
+ continue
+ for j in range(len(entries)):
+ if entries[j] is None:
+ continue
+ index = entries[j].hash % new_size
+ if new_table[index] is None:
+ new_table[index] = []
+ new_table[index].append(entries[j])
+ self.table = new_table
+ del new_table
+ # Manually call python gc here to free the memory as 'self.table'
+ # might be very large.
+ gc.collect()
+
+ def grow(self):
+ if self.elements < 4 * len(self.table):
+ return
+ new_size = int(len(self.table) * 1.5)
+ self.resize(new_size)
+
+ def delete(self, key, hash):
+ index = hash % len(self.table)
+ deleted = False
+ deleted_entry = None
+ if self.table[index] is None:
+ return
+ for i in range(len(self.table[index])):
+ if (
+ self.table[index][i] is not None
+ and self.table[index][i].hash == hash
+ and self.table[index][i].key == key
+ ):
+ deleted_entry = self.table[index][i]
+ self.table[index][i] = None
+ self.elements -= 1
+ deleted = True
+ break
+ if deleted:
+ self.shrink()
+ return deleted_entry
+
+ def shrink(self):
+ if self.elements * 2 >= len(self.table):
+ return
+ new_size = int(len(self.table) * 0.7)
+ self.resize(new_size)
+
+ def lookup(self, key, hash):
+ index = hash % len(self.table)
+ if self.table[index] is None:
+ return None
+ for i in range(len(self.table[index])):
+ if (
+ self.table[index][i] is not None
+ and self.table[index][i].hash == hash
+ and self.table[index][i].key == key
+ ):
+ return self.table[index][i].value
+ return None
+
+
+class MissRatioStats:
+ def __init__(self, time_unit):
+ self.num_misses = 0
+ self.num_accesses = 0
+ self.time_unit = time_unit
+ self.time_misses = {}
+ self.time_miss_bytes = {}
+ self.time_accesses = {}
+
+ def update_metrics(self, access_time, is_hit, miss_bytes):
+ access_time /= kMicrosInSecond * self.time_unit
+ self.num_accesses += 1
+ if access_time not in self.time_accesses:
+ self.time_accesses[access_time] = 0
+ self.time_accesses[access_time] += 1
+ if not is_hit:
+ self.num_misses += 1
+ if access_time not in self.time_misses:
+ self.time_misses[access_time] = 0
+ self.time_miss_bytes[access_time] = 0
+ self.time_misses[access_time] += 1
+ self.time_miss_bytes[access_time] += miss_bytes
+
+ def reset_counter(self):
+ self.num_misses = 0
+ self.num_accesses = 0
+ self.time_miss_bytes.clear()
+ self.time_misses.clear()
+ self.time_accesses.clear()
+
+ def compute_miss_bytes(self):
+ miss_bytes = []
+ for at in self.time_miss_bytes:
+ miss_bytes.append(self.time_miss_bytes[at])
+ miss_bytes = sorted(miss_bytes)
+ avg_miss_bytes = 0
+ p95_miss_bytes = 0
+ for i in range(len(miss_bytes)):
+ avg_miss_bytes += float(miss_bytes[i]) / float(len(miss_bytes))
+
+ p95_index = min(int(0.95 * float(len(miss_bytes))), len(miss_bytes) - 1)
+ p95_miss_bytes = miss_bytes[p95_index]
+ return avg_miss_bytes, p95_miss_bytes
+
+ def miss_ratio(self):
+ return float(self.num_misses) * 100.0 / float(self.num_accesses)
+
+ def write_miss_timeline(
+ self, cache_type, cache_size, target_cf_name, result_dir, start, end
+ ):
+ start /= kMicrosInSecond * self.time_unit
+ end /= kMicrosInSecond * self.time_unit
+ header_file_path = "{}/header-ml-miss-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ if not path.exists(header_file_path):
+ with open(header_file_path, "w+") as header_file:
+ header = "time"
+ for trace_time in range(start, end):
+ header += ",{}".format(trace_time)
+ header_file.write(header + "\n")
+ file_path = "{}/data-ml-miss-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ with open(file_path, "w+") as file:
+ row = "{}".format(cache_type)
+ for trace_time in range(start, end):
+ row += ",{}".format(self.time_misses.get(trace_time, 0))
+ file.write(row + "\n")
+
+ def write_miss_ratio_timeline(
+ self, cache_type, cache_size, target_cf_name, result_dir, start, end
+ ):
+ start /= kMicrosInSecond * self.time_unit
+ end /= kMicrosInSecond * self.time_unit
+ header_file_path = "{}/header-ml-miss-ratio-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ if not path.exists(header_file_path):
+ with open(header_file_path, "w+") as header_file:
+ header = "time"
+ for trace_time in range(start, end):
+ header += ",{}".format(trace_time)
+ header_file.write(header + "\n")
+ file_path = "{}/data-ml-miss-ratio-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ with open(file_path, "w+") as file:
+ row = "{}".format(cache_type)
+ for trace_time in range(start, end):
+ naccesses = self.time_accesses.get(trace_time, 0)
+ miss_ratio = 0
+ if naccesses > 0:
+ miss_ratio = float(
+ self.time_misses.get(trace_time, 0) * 100.0
+ ) / float(naccesses)
+ row += ",{0:.2f}".format(miss_ratio)
+ file.write(row + "\n")
+
+
+class PolicyStats:
+ def __init__(self, time_unit, policies):
+ self.time_selected_polices = {}
+ self.time_accesses = {}
+ self.policy_names = {}
+ self.time_unit = time_unit
+ for i in range(len(policies)):
+ self.policy_names[i] = policies[i].policy_name()
+
+ def update_metrics(self, access_time, selected_policy):
+ access_time /= kMicrosInSecond * self.time_unit
+ if access_time not in self.time_accesses:
+ self.time_accesses[access_time] = 0
+ self.time_accesses[access_time] += 1
+ if access_time not in self.time_selected_polices:
+ self.time_selected_polices[access_time] = {}
+ policy_name = self.policy_names[selected_policy]
+ if policy_name not in self.time_selected_polices[access_time]:
+ self.time_selected_polices[access_time][policy_name] = 0
+ self.time_selected_polices[access_time][policy_name] += 1
+
+ def write_policy_timeline(
+ self, cache_type, cache_size, target_cf_name, result_dir, start, end
+ ):
+ start /= kMicrosInSecond * self.time_unit
+ end /= kMicrosInSecond * self.time_unit
+ header_file_path = "{}/header-ml-policy-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ if not path.exists(header_file_path):
+ with open(header_file_path, "w+") as header_file:
+ header = "time"
+ for trace_time in range(start, end):
+ header += ",{}".format(trace_time)
+ header_file.write(header + "\n")
+ file_path = "{}/data-ml-policy-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ with open(file_path, "w+") as file:
+ for policy in self.policy_names:
+ policy_name = self.policy_names[policy]
+ row = "{}-{}".format(cache_type, policy_name)
+ for trace_time in range(start, end):
+ row += ",{}".format(
+ self.time_selected_polices.get(trace_time, {}).get(
+ policy_name, 0
+ )
+ )
+ file.write(row + "\n")
+
+ def write_policy_ratio_timeline(
+ self, cache_type, cache_size, target_cf_name, file_path, start, end
+ ):
+ start /= kMicrosInSecond * self.time_unit
+ end /= kMicrosInSecond * self.time_unit
+ header_file_path = "{}/header-ml-policy-ratio-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ if not path.exists(header_file_path):
+ with open(header_file_path, "w+") as header_file:
+ header = "time"
+ for trace_time in range(start, end):
+ header += ",{}".format(trace_time)
+ header_file.write(header + "\n")
+ file_path = "{}/data-ml-policy-ratio-timeline-{}-{}-{}-{}".format(
+ result_dir, self.time_unit, cache_type, cache_size, target_cf_name
+ )
+ with open(file_path, "w+") as file:
+ for policy in self.policy_names:
+ policy_name = self.policy_names[policy]
+ row = "{}-{}".format(cache_type, policy_name)
+ for trace_time in range(start, end):
+ naccesses = self.time_accesses.get(trace_time, 0)
+ ratio = 0
+ if naccesses > 0:
+ ratio = float(
+ self.time_selected_polices.get(trace_time, {}).get(
+ policy_name, 0
+ )
+ * 100.0
+ ) / float(naccesses)
+ row += ",{0:.2f}".format(ratio)
+ file.write(row + "\n")
+
+
+class Policy(object):
+ """
+ A policy maintains a set of evicted keys. It returns a reward of one to
+ itself if it has not evicted a missing key. Otherwise, it gives itself 0
+ reward.
+ """
+
+ def __init__(self):
+ self.evicted_keys = {}
+
+ def evict(self, key, max_size):
+ self.evicted_keys[key] = 0
+
+ def delete(self, key):
+ self.evicted_keys.pop(key, None)
+
+ def prioritize_samples(self, samples, auxilliary_info):
+ raise NotImplementedError
+
+ def policy_name(self):
+ raise NotImplementedError
+
+ def generate_reward(self, key):
+ if key in self.evicted_keys:
+ return 0
+ return 1
+
+
+class LRUPolicy(Policy):
+ def prioritize_samples(self, samples, auxilliary_info):
+ return sorted(
+ samples,
+ cmp=lambda e1, e2: e1.value.last_access_number
+ - e2.value.last_access_number,
+ )
+
+ def policy_name(self):
+ return "lru"
+
+
+class MRUPolicy(Policy):
+ def prioritize_samples(self, samples, auxilliary_info):
+ return sorted(
+ samples,
+ cmp=lambda e1, e2: e2.value.last_access_number
+ - e1.value.last_access_number,
+ )
+
+ def policy_name(self):
+ return "mru"
+
+
+class LFUPolicy(Policy):
+ def prioritize_samples(self, samples, auxilliary_info):
+ return sorted(samples, cmp=lambda e1, e2: e1.value.num_hits - e2.value.num_hits)
+
+ def policy_name(self):
+ return "lfu"
+
+
+class HyperbolicPolicy(Policy):
+ """
+ An implementation of Hyperbolic caching.
+
+ Aaron Blankstein, Siddhartha Sen, and Michael J. Freedman. 2017.
+ Hyperbolic caching: flexible caching for web applications. In Proceedings
+ of the 2017 USENIX Conference on Usenix Annual Technical Conference
+ (USENIX ATC '17). USENIX Association, Berkeley, CA, USA, 499-511.
+ """
+
+ def compare(self, e1, e2, now):
+ e1_duration = max(0, (now - e1.value.insertion_time) / kMicrosInSecond) * float(
+ e1.value.value_size
+ )
+ e2_duration = max(0, (now - e2.value.insertion_time) / kMicrosInSecond) * float(
+ e2.value.value_size
+ )
+ if e1_duration == e2_duration:
+ return e1.value.num_hits - e2.value.num_hits
+ if e1_duration == 0:
+ return 1
+ if e2_duration == 0:
+ return 1
+ diff = (float(e1.value.num_hits) / (float(e1_duration))) - (
+ float(e2.value.num_hits) / float(e2_duration)
+ )
+ if diff == 0:
+ return 0
+ elif diff > 0:
+ return 1
+ else:
+ return -1
+
+ def prioritize_samples(self, samples, auxilliary_info):
+ assert len(auxilliary_info) == 3
+ now = auxilliary_info[0]
+ return sorted(samples, cmp=lambda e1, e2: self.compare(e1, e2, now))
+
+ def policy_name(self):
+ return "hb"
+
+
+class CostClassPolicy(Policy):
+ """
+ We calculate the hit density of a cost class as
+ number of hits / total size in cache * average duration in the cache.
+
+ An entry has a higher priority if its class's hit density is higher.
+ """
+
+ def compare(self, e1, e2, now, cost_classes, cost_class_label):
+ e1_class = e1.value.cost_class(cost_class_label)
+ e2_class = e2.value.cost_class(cost_class_label)
+
+ assert e1_class in cost_classes
+ assert e2_class in cost_classes
+
+ e1_entry = cost_classes[e1_class]
+ e2_entry = cost_classes[e2_class]
+ e1_density = e1_entry.density(now)
+ e2_density = e2_entry.density(now)
+ e1_hits = cost_classes[e1_class].hits
+ e2_hits = cost_classes[e2_class].hits
+
+ if e1_density == e2_density:
+ return e1_hits - e2_hits
+
+ if e1_entry.num_entries_in_cache == 0:
+ return -1
+ if e2_entry.num_entries_in_cache == 0:
+ return 1
+
+ if e1_density == 0:
+ return 1
+ if e2_density == 0:
+ return -1
+ diff = (float(e1_hits) / float(e1_density)) - (
+ float(e2_hits) / float(e2_density)
+ )
+ if diff == 0:
+ return 0
+ elif diff > 0:
+ return 1
+ else:
+ return -1
+
+ def prioritize_samples(self, samples, auxilliary_info):
+ assert len(auxilliary_info) == 3
+ now = auxilliary_info[0]
+ cost_classes = auxilliary_info[1]
+ cost_class_label = auxilliary_info[2]
+ return sorted(
+ samples,
+ cmp=lambda e1, e2: self.compare(
+ e1, e2, now, cost_classes, cost_class_label
+ ),
+ )
+
+ def policy_name(self):
+ return "cc"
+
+
+class Cache(object):
+ """
+ This is the base class for the implementations of alternative cache
+ replacement policies.
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key):
+ self.cache_size = cache_size
+ self.used_size = 0
+ self.per_second_miss_ratio_stats = MissRatioStats(1)
+ self.miss_ratio_stats = MissRatioStats(kSecondsInMinute)
+ self.per_hour_miss_ratio_stats = MissRatioStats(kSecondsInHour)
+ # 0: disabled. 1: enabled. Insert both row and the refereneced data block.
+ # 2: enabled. Insert only the row but NOT the referenced data block.
+ self.enable_cache_row_key = enable_cache_row_key
+ self.get_id_row_key_map = {}
+ self.max_seen_get_id = 0
+ self.retain_get_id_range = 100000
+
+ def block_key(self, trace_record):
+ return "b{}".format(trace_record.block_id)
+
+ def row_key(self, trace_record):
+ return "g{}-{}".format(trace_record.fd, trace_record.key_id)
+
+ def _lookup(self, trace_record, key, hash):
+ """
+ Look up the key in the cache.
+ Returns true upon a cache hit, false otherwise.
+ """
+ raise NotImplementedError
+
+ def _evict(self, trace_record, key, hash, value_size):
+ """
+ Evict entries in the cache until there is enough room to insert the new
+ entry with 'value_size'.
+ """
+ raise NotImplementedError
+
+ def _insert(self, trace_record, key, hash, value_size):
+ """
+ Insert the new entry into the cache.
+ """
+ raise NotImplementedError
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ """
+ A custom admission policy to decide whether we should admit the new
+ entry upon a cache miss.
+ Returns true if the new entry should be admitted, false otherwise.
+ """
+ raise NotImplementedError
+
+ def cache_name(self):
+ """
+ The name of the replacement policy.
+ """
+ raise NotImplementedError
+
+ def is_ml_cache(self):
+ return False
+
+ def _update_stats(self, access_time, is_hit, miss_bytes):
+ self.per_second_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes)
+ self.miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes)
+ self.per_hour_miss_ratio_stats.update_metrics(access_time, is_hit, miss_bytes)
+
+ def access(self, trace_record):
+ """
+ Access a trace record. The simulator calls this function to access a
+ trace record.
+ """
+ assert self.used_size <= self.cache_size
+ if (
+ self.enable_cache_row_key > 0
+ and trace_record.caller == 1
+ and trace_record.key_id != 0
+ and trace_record.get_id != 0
+ ):
+ # This is a get request.
+ self._access_row(trace_record)
+ return
+ is_hit = self._access_kv(
+ trace_record,
+ self.block_key(trace_record),
+ trace_record.block_id,
+ trace_record.block_size,
+ trace_record.no_insert,
+ )
+ self._update_stats(
+ trace_record.access_time, is_hit=is_hit, miss_bytes=trace_record.block_size
+ )
+
+ def _access_row(self, trace_record):
+ row_key = self.row_key(trace_record)
+ self.max_seen_get_id = max(self.max_seen_get_id, trace_record.get_id)
+ self.get_id_row_key_map.pop(
+ self.max_seen_get_id - self.retain_get_id_range, None
+ )
+ if trace_record.get_id not in self.get_id_row_key_map:
+ self.get_id_row_key_map[trace_record.get_id] = {}
+ self.get_id_row_key_map[trace_record.get_id]["h"] = False
+ if self.get_id_row_key_map[trace_record.get_id]["h"]:
+ # We treat future accesses as hits since this get request
+ # completes.
+ # print("row hit 1")
+ self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0)
+ return
+ if row_key not in self.get_id_row_key_map[trace_record.get_id]:
+ # First time seen this key.
+ is_hit = self._access_kv(
+ trace_record,
+ key=row_key,
+ hash=trace_record.key_id,
+ value_size=trace_record.kv_size,
+ no_insert=False,
+ )
+ inserted = False
+ if trace_record.kv_size > 0:
+ inserted = True
+ self.get_id_row_key_map[trace_record.get_id][row_key] = inserted
+ self.get_id_row_key_map[trace_record.get_id]["h"] = is_hit
+ if self.get_id_row_key_map[trace_record.get_id]["h"]:
+ # We treat future accesses as hits since this get request
+ # completes.
+ # print("row hit 2")
+ self._update_stats(trace_record.access_time, is_hit=True, miss_bytes=0)
+ return
+ # Access its blocks.
+ no_insert = trace_record.no_insert
+ if (
+ self.enable_cache_row_key == 2
+ and trace_record.kv_size > 0
+ and trace_record.block_type == 9
+ ):
+ no_insert = True
+ is_hit = self._access_kv(
+ trace_record,
+ key=self.block_key(trace_record),
+ hash=trace_record.block_id,
+ value_size=trace_record.block_size,
+ no_insert=no_insert,
+ )
+ self._update_stats(
+ trace_record.access_time, is_hit, miss_bytes=trace_record.block_size
+ )
+ if (
+ trace_record.kv_size > 0
+ and not self.get_id_row_key_map[trace_record.get_id][row_key]
+ ):
+ # Insert the row key-value pair.
+ self._access_kv(
+ trace_record,
+ key=row_key,
+ hash=trace_record.key_id,
+ value_size=trace_record.kv_size,
+ no_insert=False,
+ )
+ # Mark as inserted.
+ self.get_id_row_key_map[trace_record.get_id][row_key] = True
+
+ def _access_kv(self, trace_record, key, hash, value_size, no_insert):
+ # Sanity checks.
+ assert self.used_size <= self.cache_size
+ if self._lookup(trace_record, key, hash):
+ # A cache hit.
+ return True
+ if no_insert or value_size <= 0:
+ return False
+ # A cache miss.
+ if value_size > self.cache_size:
+ # The block is too large to fit into the cache.
+ return False
+ self._evict(trace_record, key, hash, value_size)
+ if self._should_admit(trace_record, key, hash, value_size):
+ self._insert(trace_record, key, hash, value_size)
+ self.used_size += value_size
+ return False
+
+
+class CostClassEntry:
+ """
+ A cost class maintains aggregated statistics of cached entries in a class.
+ For example, we may define block type as a class. Then, cached blocks of the
+ same type will share one cost class entry.
+ """
+
+ def __init__(self):
+ self.hits = 0
+ self.num_entries_in_cache = 0
+ self.size_in_cache = 0
+ self.sum_insertion_times = 0
+ self.sum_last_access_time = 0
+
+ def insert(self, trace_record, key, value_size):
+ self.size_in_cache += value_size
+ self.num_entries_in_cache += 1
+ self.sum_insertion_times += trace_record.access_time / kMicrosInSecond
+ self.sum_last_access_time += trace_record.access_time / kMicrosInSecond
+
+ def remove(self, insertion_time, last_access_time, key, value_size, num_hits):
+ self.hits -= num_hits
+ self.num_entries_in_cache -= 1
+ self.sum_insertion_times -= insertion_time / kMicrosInSecond
+ self.size_in_cache -= value_size
+ self.sum_last_access_time -= last_access_time / kMicrosInSecond
+
+ def update_on_hit(self, trace_record, last_access_time):
+ self.hits += 1
+ self.sum_last_access_time -= last_access_time / kMicrosInSecond
+ self.sum_last_access_time += trace_record.access_time / kMicrosInSecond
+
+ def avg_lifetime_in_cache(self, now):
+ avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache
+ return now / kMicrosInSecond - avg_insertion_time
+
+ def avg_last_access_time(self):
+ if self.num_entries_in_cache == 0:
+ return 0
+ return float(self.sum_last_access_time) / float(self.num_entries_in_cache)
+
+ def avg_size(self):
+ if self.num_entries_in_cache == 0:
+ return 0
+ return float(self.sum_last_access_time) / float(self.num_entries_in_cache)
+
+ def density(self, now):
+ avg_insertion_time = self.sum_insertion_times / self.num_entries_in_cache
+ in_cache_duration = now / kMicrosInSecond - avg_insertion_time
+ return self.size_in_cache * in_cache_duration
+
+
+class MLCache(Cache):
+ """
+ MLCache is the base class for implementations of alternative replacement
+ policies using reinforcement learning.
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label):
+ super(MLCache, self).__init__(cache_size, enable_cache_row_key)
+ self.table = HashTable()
+ self.policy_stats = PolicyStats(kSecondsInMinute, policies)
+ self.per_hour_policy_stats = PolicyStats(kSecondsInHour, policies)
+ self.policies = policies
+ self.cost_classes = {}
+ self.cost_class_label = cost_class_label
+
+ def is_ml_cache(self):
+ return True
+
+ def _lookup(self, trace_record, key, hash):
+ value = self.table.lookup(key, hash)
+ if value is not None:
+ # Update the entry's cost class statistics.
+ if self.cost_class_label is not None:
+ cost_class = value.cost_class(self.cost_class_label)
+ assert cost_class in self.cost_classes
+ self.cost_classes[cost_class].update_on_hit(
+ trace_record, value.last_access_time
+ )
+ # Update the entry's last access time.
+ self.table.insert(
+ key,
+ hash,
+ CacheEntry(
+ value_size=value.value_size,
+ cf_id=value.cf_id,
+ level=value.level,
+ block_type=value.block_type,
+ table_id=value.table_id,
+ access_number=self.miss_ratio_stats.num_accesses,
+ time_s=trace_record.access_time,
+ num_hits=value.num_hits + 1,
+ ),
+ )
+ return True
+ return False
+
+ def _evict(self, trace_record, key, hash, value_size):
+ # Select a policy, random sample kSampleSize keys from the cache, then
+ # evict keys in the sample set until we have enough room for the new
+ # entry.
+ policy_index = self._select_policy(trace_record, key)
+ assert policy_index < len(self.policies) and policy_index >= 0
+ self.policies[policy_index].delete(key)
+ self.policy_stats.update_metrics(trace_record.access_time, policy_index)
+ self.per_hour_policy_stats.update_metrics(
+ trace_record.access_time, policy_index
+ )
+ while self.used_size + value_size > self.cache_size:
+ # Randomly sample n entries.
+ samples = self.table.random_sample(kSampleSize)
+ samples = self.policies[policy_index].prioritize_samples(
+ samples,
+ [trace_record.access_time, self.cost_classes, self.cost_class_label],
+ )
+ for hash_entry in samples:
+ assert self.table.delete(hash_entry.key, hash_entry.hash) is not None
+ self.used_size -= hash_entry.value.value_size
+ self.policies[policy_index].evict(
+ key=hash_entry.key, max_size=self.table.elements
+ )
+ # Update the entry's cost class statistics.
+ if self.cost_class_label is not None:
+ cost_class = hash_entry.value.cost_class(self.cost_class_label)
+ assert cost_class in self.cost_classes
+ self.cost_classes[cost_class].remove(
+ hash_entry.value.insertion_time,
+ hash_entry.value.last_access_time,
+ key,
+ hash_entry.value.value_size,
+ hash_entry.value.num_hits,
+ )
+ if self.used_size + value_size <= self.cache_size:
+ break
+
+ def _insert(self, trace_record, key, hash, value_size):
+ assert self.used_size + value_size <= self.cache_size
+ entry = CacheEntry(
+ value_size,
+ trace_record.cf_id,
+ trace_record.level,
+ trace_record.block_type,
+ trace_record.table_id,
+ self.miss_ratio_stats.num_accesses,
+ trace_record.access_time,
+ )
+ # Update the entry's cost class statistics.
+ if self.cost_class_label is not None:
+ cost_class = entry.cost_class(self.cost_class_label)
+ if cost_class not in self.cost_classes:
+ self.cost_classes[cost_class] = CostClassEntry()
+ self.cost_classes[cost_class].insert(trace_record, key, value_size)
+ self.table.insert(key, hash, entry)
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return True
+
+ def _select_policy(self, trace_record, key):
+ raise NotImplementedError
+
+
+class ThompsonSamplingCache(MLCache):
+ """
+ An implementation of Thompson Sampling for the Bernoulli Bandit.
+
+ Daniel J. Russo, Benjamin Van Roy, Abbas Kazerouni, Ian Osband,
+ and Zheng Wen. 2018. A Tutorial on Thompson Sampling. Found.
+ Trends Mach. Learn. 11, 1 (July 2018), 1-96.
+ DOI: https://doi.org/10.1561/2200000070
+ """
+
+ def __init__(
+ self,
+ cache_size,
+ enable_cache_row_key,
+ policies,
+ cost_class_label,
+ init_a=1,
+ init_b=1,
+ ):
+ super(ThompsonSamplingCache, self).__init__(
+ cache_size, enable_cache_row_key, policies, cost_class_label
+ )
+ self._as = {}
+ self._bs = {}
+ for _i in range(len(policies)):
+ self._as = [init_a] * len(self.policies)
+ self._bs = [init_b] * len(self.policies)
+
+ def _select_policy(self, trace_record, key):
+ if len(self.policies) == 1:
+ return 0
+ samples = [
+ np.random.beta(self._as[x], self._bs[x]) for x in range(len(self.policies))
+ ]
+ selected_policy = max(range(len(self.policies)), key=lambda x: samples[x])
+ reward = self.policies[selected_policy].generate_reward(key)
+ assert reward <= 1 and reward >= 0
+ self._as[selected_policy] += reward
+ self._bs[selected_policy] += 1 - reward
+ return selected_policy
+
+ def cache_name(self):
+ if self.enable_cache_row_key:
+ return "Hybrid ThompsonSampling with cost class {} (ts_hybrid)".format(
+ self.cost_class_label
+ )
+ return "ThompsonSampling with cost class {} (ts)".format(self.cost_class_label)
+
+
+class LinUCBCache(MLCache):
+ """
+ An implementation of LinUCB with disjoint linear models.
+
+ Lihong Li, Wei Chu, John Langford, and Robert E. Schapire. 2010.
+ A contextual-bandit approach to personalized news article recommendation.
+ In Proceedings of the 19th international conference on World wide web
+ (WWW '10). ACM, New York, NY, USA, 661-670.
+ DOI=http://dx.doi.org/10.1145/1772690.1772758
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key, policies, cost_class_label):
+ super(LinUCBCache, self).__init__(
+ cache_size, enable_cache_row_key, policies, cost_class_label
+ )
+ self.nfeatures = 4 # Block type, level, cf.
+ self.th = np.zeros((len(self.policies), self.nfeatures))
+ self.eps = 0.2
+ self.b = np.zeros_like(self.th)
+ self.A = np.zeros((len(self.policies), self.nfeatures, self.nfeatures))
+ self.A_inv = np.zeros((len(self.policies), self.nfeatures, self.nfeatures))
+ for i in range(len(self.policies)):
+ self.A[i] = np.identity(self.nfeatures)
+ self.th_hat = np.zeros_like(self.th)
+ self.p = np.zeros(len(self.policies))
+ self.alph = 0.2
+
+ def _select_policy(self, trace_record, key):
+ if len(self.policies) == 1:
+ return 0
+ x_i = np.zeros(self.nfeatures) # The current context vector
+ x_i[0] = trace_record.block_type
+ x_i[1] = trace_record.level
+ x_i[2] = trace_record.cf_id
+ p = np.zeros(len(self.policies))
+ for a in range(len(self.policies)):
+ self.th_hat[a] = self.A_inv[a].dot(self.b[a])
+ ta = x_i.dot(self.A_inv[a]).dot(x_i)
+ a_upper_ci = self.alph * np.sqrt(ta)
+ a_mean = self.th_hat[a].dot(x_i)
+ p[a] = a_mean + a_upper_ci
+ p = p + (np.random.random(len(p)) * 0.000001)
+ selected_policy = p.argmax()
+ reward = self.policies[selected_policy].generate_reward(key)
+ assert reward <= 1 and reward >= 0
+ self.A[selected_policy] += np.outer(x_i, x_i)
+ self.b[selected_policy] += reward * x_i
+ self.A_inv[selected_policy] = np.linalg.inv(self.A[selected_policy])
+ del x_i
+ return selected_policy
+
+ def cache_name(self):
+ if self.enable_cache_row_key:
+ return "Hybrid LinUCB with cost class {} (linucb_hybrid)".format(
+ self.cost_class_label
+ )
+ return "LinUCB with cost class {} (linucb)".format(self.cost_class_label)
+
+
+class OPTCacheEntry:
+ """
+ A cache entry for the OPT algorithm. The entries are sorted based on its
+ next access sequence number in reverse order, i.e., the entry which next
+ access is the furthest in the future is ordered before other entries.
+ """
+
+ def __init__(self, key, next_access_seq_no, value_size):
+ self.key = key
+ self.next_access_seq_no = next_access_seq_no
+ self.value_size = value_size
+ self.is_removed = False
+
+ def __cmp__(self, other):
+ if other.next_access_seq_no != self.next_access_seq_no:
+ return other.next_access_seq_no - self.next_access_seq_no
+ return self.value_size - other.value_size
+
+ def __repr__(self):
+ return "({} {} {} {})".format(
+ self.key, self.next_access_seq_no, self.value_size, self.is_removed
+ )
+
+
+class PQTable:
+ """
+ A hash table with a priority queue.
+ """
+
+ def __init__(self):
+ # A list of entries arranged in a heap sorted based on the entry custom
+ # implementation of __cmp__
+ self.pq = []
+ self.table = {}
+
+ def pqinsert(self, entry):
+ "Add a new key or update the priority of an existing key"
+ # Remove the entry from the table first.
+ removed_entry = self.table.pop(entry.key, None)
+ if removed_entry:
+ # Mark as removed since there is no 'remove' API in heappq.
+ # Instead, an entry in pq is removed lazily when calling pop.
+ removed_entry.is_removed = True
+ self.table[entry.key] = entry
+ heapq.heappush(self.pq, entry)
+ return removed_entry
+
+ def pqpop(self):
+ while self.pq:
+ entry = heapq.heappop(self.pq)
+ if not entry.is_removed:
+ del self.table[entry.key]
+ return entry
+ return None
+
+ def pqpeek(self):
+ while self.pq:
+ entry = self.pq[0]
+ if not entry.is_removed:
+ return entry
+ heapq.heappop(self.pq)
+ return
+
+ def __contains__(self, k):
+ return k in self.table
+
+ def __getitem__(self, k):
+ return self.table[k]
+
+ def __len__(self):
+ return len(self.table)
+
+ def values(self):
+ return self.table.values()
+
+
+class OPTCache(Cache):
+ """
+ An implementation of the Belady MIN algorithm. OPTCache evicts an entry
+ in the cache whose next access occurs furthest in the future.
+
+ Note that Belady MIN algorithm is optimal assuming all blocks having the
+ same size and a missing entry will be inserted in the cache.
+ These are NOT true for the block cache trace since blocks have different
+ sizes and we may not insert a block into the cache upon a cache miss.
+ However, it is still useful to serve as a "theoretical upper bound" on the
+ lowest miss ratio we can achieve given a cache size.
+
+ L. A. Belady. 1966. A Study of Replacement Algorithms for a
+ Virtual-storage Computer. IBM Syst. J. 5, 2 (June 1966), 78-101.
+ DOI=http://dx.doi.org/10.1147/sj.52.0078
+ """
+
+ def __init__(self, cache_size):
+ super(OPTCache, self).__init__(cache_size, enable_cache_row_key=0)
+ self.table = PQTable()
+
+ def _lookup(self, trace_record, key, hash):
+ if key not in self.table:
+ return False
+ # A cache hit. Update its next access time.
+ assert (
+ self.table.pqinsert(
+ OPTCacheEntry(
+ key, trace_record.next_access_seq_no, self.table[key].value_size
+ )
+ )
+ is not None
+ )
+ return True
+
+ def _evict(self, trace_record, key, hash, value_size):
+ while self.used_size + value_size > self.cache_size:
+ evict_entry = self.table.pqpop()
+ assert evict_entry is not None
+ self.used_size -= evict_entry.value_size
+
+ def _insert(self, trace_record, key, hash, value_size):
+ assert (
+ self.table.pqinsert(
+ OPTCacheEntry(key, trace_record.next_access_seq_no, value_size)
+ )
+ is None
+ )
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return True
+
+ def cache_name(self):
+ return "Belady MIN (opt)"
+
+
+class GDSizeEntry:
+ """
+ A cache entry for the greedy dual size replacement policy.
+ """
+
+ def __init__(self, key, value_size, priority):
+ self.key = key
+ self.value_size = value_size
+ self.priority = priority
+ self.is_removed = False
+
+ def __cmp__(self, other):
+ if other.priority != self.priority:
+ return self.priority - other.priority
+ return self.value_size - other.value_size
+
+ def __repr__(self):
+ return "({} {} {} {})".format(
+ self.key, self.next_access_seq_no, self.value_size, self.is_removed
+ )
+
+
+class GDSizeCache(Cache):
+ """
+ An implementation of the greedy dual size algorithm.
+ We define cost as an entry's size.
+
+ See https://www.usenix.org/legacy/publications/library/proceedings/usits97/full_papers/cao/cao_html/node8.html
+ and N. Young. The k-server dual and loose competitiveness for paging.
+ Algorithmica,June 1994, vol. 11,(no.6):525-41.
+ Rewritten version of ''On-line caching as cache size varies'',
+ in The 2nd Annual ACM-SIAM Symposium on Discrete Algorithms, 241-250, 1991.
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key):
+ super(GDSizeCache, self).__init__(cache_size, enable_cache_row_key)
+ self.table = PQTable()
+ self.L = 0.0
+
+ def cache_name(self):
+ if self.enable_cache_row_key:
+ return "Hybrid GreedyDualSize (gdsize_hybrid)"
+ return "GreedyDualSize (gdsize)"
+
+ def _lookup(self, trace_record, key, hash):
+ if key not in self.table:
+ return False
+ # A cache hit. Update its priority.
+ entry = self.table[key]
+ assert (
+ self.table.pqinsert(
+ GDSizeEntry(key, entry.value_size, self.L + entry.value_size)
+ )
+ is not None
+ )
+ return True
+
+ def _evict(self, trace_record, key, hash, value_size):
+ while self.used_size + value_size > self.cache_size:
+ evict_entry = self.table.pqpop()
+ assert evict_entry is not None
+ self.L = evict_entry.priority
+ self.used_size -= evict_entry.value_size
+
+ def _insert(self, trace_record, key, hash, value_size):
+ assert (
+ self.table.pqinsert(GDSizeEntry(key, value_size, self.L + value_size))
+ is None
+ )
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return True
+
+
+class Deque(object):
+ """A Deque class facilitates the implementation of LRU and ARC."""
+
+ def __init__(self):
+ self.od = OrderedDict()
+
+ def appendleft(self, k):
+ if k in self.od:
+ del self.od[k]
+ self.od[k] = None
+
+ def pop(self):
+ item = self.od.popitem(last=False) if self.od else None
+ if item is not None:
+ return item[0]
+ return None
+
+ def remove(self, k):
+ del self.od[k]
+
+ def __len__(self):
+ return len(self.od)
+
+ def __contains__(self, k):
+ return k in self.od
+
+ def __iter__(self):
+ return reversed(self.od)
+
+ def __repr__(self):
+ return "Deque(%r)" % (list(self),)
+
+
+class ARCCache(Cache):
+ """
+ An implementation of ARC. ARC assumes that all blocks are having the
+ same size. The size of index and filter blocks are variable. To accommodate
+ this, we modified ARC as follows:
+ 1) We use 16 KB as the average block size and calculate the number of blocks
+ (c) in the cache.
+ 2) When we insert an entry, the cache evicts entries in both t1 and t2
+ queues until it has enough space for the new entry. This also requires
+ modification of the algorithm to maintain a maximum of 2*c blocks.
+
+ Nimrod Megiddo and Dharmendra S. Modha. 2003. ARC: A Self-Tuning, Low
+ Overhead Replacement Cache. In Proceedings of the 2nd USENIX Conference on
+ File and Storage Technologies (FAST '03). USENIX Association, Berkeley, CA,
+ USA, 115-130.
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key):
+ super(ARCCache, self).__init__(cache_size, enable_cache_row_key)
+ self.table = {}
+ self.c = cache_size / 16 * 1024 # Number of elements in the cache.
+ self.p = 0 # Target size for the list T1
+ # L1: only once recently
+ self.t1 = Deque() # T1: recent cache entries
+ self.b1 = Deque() # B1: ghost entries recently evicted from the T1 cache
+ # L2: at least twice recently
+ self.t2 = Deque() # T2: frequent entries
+ self.b2 = Deque() # B2: ghost entries recently evicted from the T2 cache
+
+ def _replace(self, key, value_size):
+ while self.used_size + value_size > self.cache_size:
+ if self.t1 and ((key in self.b2) or (len(self.t1) > self.p)):
+ old = self.t1.pop()
+ self.b1.appendleft(old)
+ else:
+ if self.t2:
+ old = self.t2.pop()
+ self.b2.appendleft(old)
+ else:
+ old = self.t1.pop()
+ self.b1.appendleft(old)
+ self.used_size -= self.table[old].value_size
+ del self.table[old]
+
+ def _lookup(self, trace_record, key, hash):
+ # Case I: key is in T1 or T2.
+ # Move key to MRU position in T2.
+ if key in self.t1:
+ self.t1.remove(key)
+ self.t2.appendleft(key)
+ return True
+
+ if key in self.t2:
+ self.t2.remove(key)
+ self.t2.appendleft(key)
+ return True
+ return False
+
+ def _evict(self, trace_record, key, hash, value_size):
+ # Case II: key is in B1
+ # Move x from B1 to the MRU position in T2 (also fetch x to the cache).
+ if key in self.b1:
+ self.p = min(self.c, self.p + max(len(self.b2) / len(self.b1), 1))
+ self._replace(key, value_size)
+ self.b1.remove(key)
+ self.t2.appendleft(key)
+ return
+
+ # Case III: key is in B2
+ # Move x from B2 to the MRU position in T2 (also fetch x to the cache).
+ if key in self.b2:
+ self.p = max(0, self.p - max(len(self.b1) / len(self.b2), 1))
+ self._replace(key, value_size)
+ self.b2.remove(key)
+ self.t2.appendleft(key)
+ return
+
+ # Case IV: key is not in (T1 u B1 u T2 u B2)
+ self._replace(key, value_size)
+ while len(self.t1) + len(self.b1) >= self.c and self.b1:
+ self.b1.pop()
+
+ total = len(self.t1) + len(self.b1) + len(self.t2) + len(self.b2)
+ while total >= (2 * self.c) and self.b2:
+ self.b2.pop()
+ total -= 1
+ # Finally, move it to MRU position in T1.
+ self.t1.appendleft(key)
+ return
+
+ def _insert(self, trace_record, key, hash, value_size):
+ self.table[key] = CacheEntry(
+ value_size,
+ trace_record.cf_id,
+ trace_record.level,
+ trace_record.block_type,
+ trace_record.table_id,
+ 0,
+ trace_record.access_time,
+ )
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return True
+
+ def cache_name(self):
+ if self.enable_cache_row_key:
+ return "Hybrid Adaptive Replacement Cache (arc_hybrid)"
+ return "Adaptive Replacement Cache (arc)"
+
+
+class LRUCache(Cache):
+ """
+ A strict LRU queue.
+ """
+
+ def __init__(self, cache_size, enable_cache_row_key):
+ super(LRUCache, self).__init__(cache_size, enable_cache_row_key)
+ self.table = {}
+ self.lru = Deque()
+
+ def cache_name(self):
+ if self.enable_cache_row_key:
+ return "Hybrid LRU (lru_hybrid)"
+ return "LRU (lru)"
+
+ def _lookup(self, trace_record, key, hash):
+ if key not in self.table:
+ return False
+ # A cache hit. Update LRU queue.
+ self.lru.remove(key)
+ self.lru.appendleft(key)
+ return True
+
+ def _evict(self, trace_record, key, hash, value_size):
+ while self.used_size + value_size > self.cache_size:
+ evict_key = self.lru.pop()
+ self.used_size -= self.table[evict_key].value_size
+ del self.table[evict_key]
+
+ def _insert(self, trace_record, key, hash, value_size):
+ self.table[key] = CacheEntry(
+ value_size,
+ trace_record.cf_id,
+ trace_record.level,
+ trace_record.block_type,
+ trace_record.table_id,
+ 0,
+ trace_record.access_time,
+ )
+ self.lru.appendleft(key)
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return True
+
+
+class TraceCache(Cache):
+ """
+ A trace cache. Lookup returns true if the trace observes a cache hit.
+ It is used to maintain cache hits observed in the trace.
+ """
+
+ def __init__(self, cache_size):
+ super(TraceCache, self).__init__(cache_size, enable_cache_row_key=0)
+
+ def _lookup(self, trace_record, key, hash):
+ return trace_record.is_hit
+
+ def _evict(self, trace_record, key, hash, value_size):
+ pass
+
+ def _insert(self, trace_record, key, hash, value_size):
+ pass
+
+ def _should_admit(self, trace_record, key, hash, value_size):
+ return False
+
+ def cache_name(self):
+ return "Trace"
+
+
+def parse_cache_size(cs):
+ cs = cs.replace("\n", "")
+ if cs[-1] == "M":
+ return int(cs[: len(cs) - 1]) * 1024 * 1024
+ if cs[-1] == "G":
+ return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024
+ if cs[-1] == "T":
+ return int(cs[: len(cs) - 1]) * 1024 * 1024 * 1024 * 1024
+ return int(cs)
+
+
+def create_cache(cache_type, cache_size, downsample_size):
+ cache_size = cache_size / downsample_size
+ enable_cache_row_key = 0
+ if "hybridn" in cache_type:
+ enable_cache_row_key = 2
+ cache_type = cache_type[:-8]
+ if "hybrid" in cache_type:
+ enable_cache_row_key = 1
+ cache_type = cache_type[:-7]
+ if cache_type == "ts":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()],
+ cost_class_label=None,
+ )
+ elif cache_type == "linucb":
+ return LinUCBCache(
+ cache_size,
+ enable_cache_row_key,
+ [LRUPolicy(), LFUPolicy(), HyperbolicPolicy()],
+ cost_class_label=None,
+ )
+ elif cache_type == "pylru":
+ return ThompsonSamplingCache(
+ cache_size, enable_cache_row_key, [LRUPolicy()], cost_class_label=None
+ )
+ elif cache_type == "pymru":
+ return ThompsonSamplingCache(
+ cache_size, enable_cache_row_key, [MRUPolicy()], cost_class_label=None
+ )
+ elif cache_type == "pylfu":
+ return ThompsonSamplingCache(
+ cache_size, enable_cache_row_key, [LFUPolicy()], cost_class_label=None
+ )
+ elif cache_type == "pyhb":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [HyperbolicPolicy()],
+ cost_class_label=None,
+ )
+ elif cache_type == "pycctbbt":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [CostClassPolicy()],
+ cost_class_label="table_bt",
+ )
+ elif cache_type == "pycccf":
+ return ThompsonSamplingCache(
+ cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="cf"
+ )
+ elif cache_type == "pycctblevelbt":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [CostClassPolicy()],
+ cost_class_label="table_level_bt",
+ )
+ elif cache_type == "pycccfbt":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [CostClassPolicy()],
+ cost_class_label="cf_bt",
+ )
+ elif cache_type == "pycctb":
+ return ThompsonSamplingCache(
+ cache_size,
+ enable_cache_row_key,
+ [CostClassPolicy()],
+ cost_class_label="table",
+ )
+ elif cache_type == "pyccbt":
+ return ThompsonSamplingCache(
+ cache_size, enable_cache_row_key, [CostClassPolicy()], cost_class_label="bt"
+ )
+ elif cache_type == "opt":
+ if enable_cache_row_key:
+ print("opt does not support hybrid mode.")
+ assert False
+ return OPTCache(cache_size)
+ elif cache_type == "trace":
+ if enable_cache_row_key:
+ print("trace does not support hybrid mode.")
+ assert False
+ return TraceCache(cache_size)
+ elif cache_type == "lru":
+ return LRUCache(cache_size, enable_cache_row_key)
+ elif cache_type == "arc":
+ return ARCCache(cache_size, enable_cache_row_key)
+ elif cache_type == "gdsize":
+ return GDSizeCache(cache_size, enable_cache_row_key)
+ else:
+ print("Unknown cache type {}".format(cache_type))
+ assert False
+ return None
+
+
+class BlockAccessTimeline:
+ """
+ BlockAccessTimeline stores all accesses of a block.
+ """
+
+ def __init__(self):
+ self.accesses = []
+ self.current_access_index = 1
+
+ def get_next_access(self):
+ if self.current_access_index == len(self.accesses):
+ return sys.maxsize
+ next_access_seq_no = self.accesses[self.current_access_index]
+ self.current_access_index += 1
+ return next_access_seq_no
+
+
+def percent(e1, e2):
+ if e2 == 0:
+ return -1
+ return float(e1) * 100.0 / float(e2)
+
+
+def is_target_cf(access_cf, target_cf_name):
+ if target_cf_name == "all":
+ return True
+ return access_cf == target_cf_name
+
+
+def run(
+ trace_file_path,
+ cache_type,
+ cache,
+ warmup_seconds,
+ max_accesses_to_process,
+ target_cf_name,
+):
+ warmup_complete = False
+ trace_miss_ratio_stats = MissRatioStats(kSecondsInMinute)
+ access_seq_no = 0
+ time_interval = 1
+ start_time = time.time()
+ trace_start_time = 0
+ trace_duration = 0
+ is_opt_cache = False
+ if cache.cache_name() == "Belady MIN (opt)":
+ is_opt_cache = True
+
+ block_access_timelines = {}
+ num_no_inserts = 0
+ num_blocks_with_no_size = 0
+ num_inserts_block_with_no_size = 0
+
+ if is_opt_cache:
+ # Read all blocks in memory and stores their access times so that OPT
+ # can use this information to evict the cached key which next access is
+ # the furthest in the future.
+ print("Preprocessing block traces.")
+ with open(trace_file_path, "r") as trace_file:
+ for line in trace_file:
+ if (
+ max_accesses_to_process != -1
+ and access_seq_no > max_accesses_to_process
+ ):
+ break
+ ts = line.split(",")
+ timestamp = int(ts[0])
+ cf_name = ts[5]
+ if not is_target_cf(cf_name, target_cf_name):
+ continue
+ if trace_start_time == 0:
+ trace_start_time = timestamp
+ trace_duration = timestamp - trace_start_time
+ block_id = int(ts[1])
+ block_size = int(ts[3])
+ no_insert = int(ts[9])
+ if block_id not in block_access_timelines:
+ block_access_timelines[block_id] = BlockAccessTimeline()
+ if block_size == 0:
+ num_blocks_with_no_size += 1
+ block_access_timelines[block_id].accesses.append(access_seq_no)
+ access_seq_no += 1
+ if no_insert == 1:
+ num_no_inserts += 1
+ if no_insert == 0 and block_size == 0:
+ num_inserts_block_with_no_size += 1
+ if access_seq_no % 100 != 0:
+ continue
+ now = time.time()
+ if now - start_time > time_interval * 10:
+ print(
+ "Take {} seconds to process {} trace records with trace "
+ "duration of {} seconds. Throughput: {} records/second.".format(
+ now - start_time,
+ access_seq_no,
+ trace_duration / 1000000,
+ access_seq_no / (now - start_time),
+ )
+ )
+ time_interval += 1
+ print(
+ "Trace contains {0} blocks, {1}({2:.2f}%) blocks with no size."
+ "{3} accesses, {4}({5:.2f}%) accesses with no_insert,"
+ "{6}({7:.2f}%) accesses that want to insert but block size is 0.".format(
+ len(block_access_timelines),
+ num_blocks_with_no_size,
+ percent(num_blocks_with_no_size, len(block_access_timelines)),
+ access_seq_no,
+ num_no_inserts,
+ percent(num_no_inserts, access_seq_no),
+ num_inserts_block_with_no_size,
+ percent(num_inserts_block_with_no_size, access_seq_no),
+ )
+ )
+
+ access_seq_no = 0
+ time_interval = 1
+ start_time = time.time()
+ trace_start_time = 0
+ trace_duration = 0
+ print("Running simulated {} cache on block traces.".format(cache.cache_name()))
+ with open(trace_file_path, "r") as trace_file:
+ for line in trace_file:
+ if (
+ max_accesses_to_process != -1
+ and access_seq_no > max_accesses_to_process
+ ):
+ break
+ if access_seq_no % 1000000 == 0:
+ # Force a python gc periodically to reduce memory usage.
+ gc.collect()
+ ts = line.split(",")
+ timestamp = int(ts[0])
+ cf_name = ts[5]
+ if not is_target_cf(cf_name, target_cf_name):
+ continue
+ if trace_start_time == 0:
+ trace_start_time = timestamp
+ trace_duration = timestamp - trace_start_time
+ if (
+ not warmup_complete
+ and warmup_seconds > 0
+ and trace_duration > warmup_seconds * 1000000
+ ):
+ cache.miss_ratio_stats.reset_counter()
+ warmup_complete = True
+ next_access_seq_no = 0
+ block_id = int(ts[1])
+ if is_opt_cache:
+ next_access_seq_no = block_access_timelines[block_id].get_next_access()
+ record = TraceRecord(
+ access_time=int(ts[0]),
+ block_id=int(ts[1]),
+ block_type=int(ts[2]),
+ block_size=int(ts[3]),
+ cf_id=int(ts[4]),
+ cf_name=ts[5],
+ level=int(ts[6]),
+ fd=int(ts[7]),
+ caller=int(ts[8]),
+ no_insert=int(ts[9]),
+ get_id=int(ts[10]),
+ key_id=int(ts[11]),
+ kv_size=int(ts[12]),
+ is_hit=int(ts[13]),
+ referenced_key_exist_in_block=int(ts[14]),
+ num_keys_in_block=int(ts[15]),
+ table_id=int(ts[16]),
+ seq_number=int(ts[17]),
+ block_key_size=int(ts[18]),
+ key_size=int(ts[19]),
+ block_offset_in_file=int(ts[20]),
+ next_access_seq_no=next_access_seq_no,
+ )
+ trace_miss_ratio_stats.update_metrics(
+ record.access_time, is_hit=record.is_hit, miss_bytes=record.block_size
+ )
+ cache.access(record)
+ access_seq_no += 1
+ del record
+ del ts
+ if access_seq_no % 100 != 0:
+ continue
+ # Report progress every 10 seconds.
+ now = time.time()
+ if now - start_time > time_interval * 10:
+ print(
+ "Take {} seconds to process {} trace records with trace "
+ "duration of {} seconds. Throughput: {} records/second. "
+ "Trace miss ratio {}".format(
+ now - start_time,
+ access_seq_no,
+ trace_duration / 1000000,
+ access_seq_no / (now - start_time),
+ trace_miss_ratio_stats.miss_ratio(),
+ )
+ )
+ time_interval += 1
+ print(
+ "{},0,0,{},{},{}".format(
+ cache_type,
+ cache.cache_size,
+ cache.miss_ratio_stats.miss_ratio(),
+ cache.miss_ratio_stats.num_accesses,
+ )
+ )
+ now = time.time()
+ print(
+ "Take {} seconds to process {} trace records with trace duration of {} "
+ "seconds. Throughput: {} records/second. Trace miss ratio {}".format(
+ now - start_time,
+ access_seq_no,
+ trace_duration / 1000000,
+ access_seq_no / (now - start_time),
+ trace_miss_ratio_stats.miss_ratio(),
+ )
+ )
+ print(
+ "{},0,0,{},{},{}".format(
+ cache_type,
+ cache.cache_size,
+ cache.miss_ratio_stats.miss_ratio(),
+ cache.miss_ratio_stats.num_accesses,
+ )
+ )
+ return trace_start_time, trace_duration
+
+
+def report_stats(
+ cache,
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+):
+ cache_label = "{}-{}-{}".format(cache_type, cache_size, target_cf_name)
+ with open("{}/data-ml-mrc-{}".format(result_dir, cache_label), "w+") as mrc_file:
+ mrc_file.write(
+ "{},0,0,{},{},{}\n".format(
+ cache_type,
+ cache_size,
+ cache.miss_ratio_stats.miss_ratio(),
+ cache.miss_ratio_stats.num_accesses,
+ )
+ )
+
+ cache_stats = [
+ cache.per_second_miss_ratio_stats,
+ cache.miss_ratio_stats,
+ cache.per_hour_miss_ratio_stats,
+ ]
+ for i in range(len(cache_stats)):
+ avg_miss_bytes, p95_miss_bytes = cache_stats[i].compute_miss_bytes()
+
+ with open(
+ "{}/data-ml-avgmb-{}-{}".format(
+ result_dir, cache_stats[i].time_unit, cache_label
+ ),
+ "w+",
+ ) as mb_file:
+ mb_file.write(
+ "{},0,0,{},{}\n".format(cache_type, cache_size, avg_miss_bytes)
+ )
+
+ with open(
+ "{}/data-ml-p95mb-{}-{}".format(
+ result_dir, cache_stats[i].time_unit, cache_label
+ ),
+ "w+",
+ ) as mb_file:
+ mb_file.write(
+ "{},0,0,{},{}\n".format(cache_type, cache_size, p95_miss_bytes)
+ )
+
+ cache_stats[i].write_miss_timeline(
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+ )
+ cache_stats[i].write_miss_ratio_timeline(
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+ )
+
+ if not cache.is_ml_cache():
+ return
+
+ policy_stats = [cache.policy_stats, cache.per_hour_policy_stats]
+ for i in range(len(policy_stats)):
+ policy_stats[i].write_policy_timeline(
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+ )
+ policy_stats[i].write_policy_ratio_timeline(
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+ )
+
+
+if __name__ == "__main__":
+ if len(sys.argv) <= 8:
+ print(
+ "Must provide 8 arguments.\n"
+ "1) Cache type (ts, linucb, arc, lru, opt, pylru, pymru, pylfu, "
+ "pyhb, gdsize, trace). One may evaluate the hybrid row_block cache "
+ "by appending '_hybrid' to a cache_type, e.g., ts_hybrid. "
+ "Note that hybrid is not supported with opt and trace. \n"
+ "2) Cache size (xM, xG, xT).\n"
+ "3) The sampling frequency used to collect the trace. (The "
+ "simulation scales down the cache size by the sampling frequency).\n"
+ "4) Warmup seconds (The number of seconds used for warmup).\n"
+ "5) Trace file path.\n"
+ "6) Result directory (A directory that saves generated results)\n"
+ "7) Max number of accesses to process\n"
+ "8) The target column family. (The simulation will only run "
+ "accesses on the target column family. If it is set to all, "
+ "it will run against all accesses.)"
+ )
+ exit(1)
+ print("Arguments: {}".format(sys.argv))
+ cache_type = sys.argv[1]
+ cache_size = parse_cache_size(sys.argv[2])
+ downsample_size = int(sys.argv[3])
+ warmup_seconds = int(sys.argv[4])
+ trace_file_path = sys.argv[5]
+ result_dir = sys.argv[6]
+ max_accesses_to_process = int(sys.argv[7])
+ target_cf_name = sys.argv[8]
+ cache = create_cache(cache_type, cache_size, downsample_size)
+ trace_start_time, trace_duration = run(
+ trace_file_path,
+ cache_type,
+ cache,
+ warmup_seconds,
+ max_accesses_to_process,
+ target_cf_name,
+ )
+ trace_end_time = trace_start_time + trace_duration
+ report_stats(
+ cache,
+ cache_type,
+ cache_size,
+ target_cf_name,
+ result_dir,
+ trace_start_time,
+ trace_end_time,
+ )
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh
new file mode 100644
index 000000000..295f734aa
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim.sh
@@ -0,0 +1,156 @@
+#!/usr/bin/env bash
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+#
+# A shell script to run a batch of pysims and combine individual pysim output files.
+#
+# Usage: bash block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs
+# trace_file_path: The file path that stores the traces.
+# result_dir: The directory to store pysim results. The output files from a pysim is stores in result_dir/ml
+# downsample_size: The downsample size used to collect the trace.
+# warmup_seconds: The number of seconds used for warmup.
+# max_jobs: The max number of concurrent pysims to run.
+
+# Install required packages to run simulations.
+# sudo dnf install -y numpy scipy python-matplotlib ipython python-pandas sympy python-nose atlas-devel
+ulimit -c 0
+
+if [ $# -ne 5 ]; then
+ echo "Usage: ./block_cache_pysim.sh trace_file_path result_dir downsample_size warmup_seconds max_jobs"
+ exit 0
+fi
+
+trace_file="$1"
+result_dir="$2"
+downsample_size="$3"
+warmup_seconds="$4"
+max_jobs="$5"
+max_num_accesses=100000000
+current_jobs=1
+
+ml_tmp_result_dir="$result_dir/ml"
+rm -rf "$ml_tmp_result_dir"
+mkdir -p "$result_dir"
+mkdir -p "$ml_tmp_result_dir"
+
+# Report miss ratio in the trace.
+current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+for cf_name in "all"
+do
+for cache_size in "1G" "2G" "4G" "8G" "16G" #"12G" "16G" "1T"
+do
+for cache_type in "opt" "lru" "pylru" "pycctbbt" "pyhb" "ts" "trace" "lru_hybrid" #"pycctblevelbt" #"lru_hybridn" "opt" #"pylru" "pylru_hybrid" "pycctbbt" "pycccfbt" "trace"
+do
+ if [[ $cache_type == "trace" && $cache_size != "16G" ]]; then
+ # We only need to collect miss ratios observed in the trace once.
+ continue
+ fi
+ while [ "$current_jobs" -ge "$max_jobs" ]
+ do
+ sleep 10
+ echo "Waiting jobs to complete. Number of running jobs: $current_jobs"
+ current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+ echo "Waiting jobs to complete. Number of running jobs: $current_jobs"
+ done
+ output="log-ml-$cache_type-$cache_size-$cf_name"
+ echo "Running simulation for $cache_type, cache size $cache_size, and cf_name $cf_name. Number of running jobs: $current_jobs. "
+ nohup python block_cache_pysim.py "$cache_type" "$cache_size" "$downsample_size" "$warmup_seconds" "$trace_file" "$ml_tmp_result_dir" "$max_num_accesses" "$cf_name" >& "$ml_tmp_result_dir/$output" &
+ current_jobs=$((current_jobs+1))
+done
+done
+done
+
+# Wait for all jobs to complete.
+while [ $current_jobs -gt 0 ]
+do
+ sleep 10
+ echo "Waiting jobs to complete. Number of running jobs: $current_jobs"
+ current_jobs=$(ps aux | grep pysim | grep python | grep -cv grep)
+ echo "Waiting jobs to complete. Number of running jobs: $current_jobs"
+done
+
+echo "Combine individual pysim output files"
+
+rm -rf "$result_dir/ml_*"
+for header in "header-" "data-"
+do
+for fn in "$ml_tmp_result_dir"/*
+do
+ sum_file=""
+ time_unit=""
+ capacity=""
+ target_cf_name=""
+ if [[ $fn == *"timeline"* ]]; then
+ tmpfn="$fn"
+ IFS='-' read -ra elements <<< "$tmpfn"
+ time_unit_index=0
+ capacity_index=0
+ for i in "${elements[@]}"
+ do
+ if [[ $i == "timeline" ]]; then
+ break
+ fi
+ time_unit_index=$((time_unit_index+1))
+ done
+ time_unit_index=$((time_unit_index+1))
+ capacity_index=$((time_unit_index+2))
+ target_cf_name_index=$((time_unit_index+3))
+ time_unit="${elements[$time_unit_index]}_"
+ capacity="${elements[$capacity_index]}_"
+ target_cf_name="${elements[$target_cf_name_index]}_"
+ fi
+
+ if [[ $fn == *"${header}ml-policy-timeline"* ]]; then
+ sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_timeline"
+ fi
+ if [[ $fn == *"${header}ml-policy-ratio-timeline"* ]]; then
+ sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}policy_ratio_timeline"
+ fi
+ if [[ $fn == *"${header}ml-miss-timeline"* ]]; then
+ sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_timeline"
+ fi
+ if [[ $fn == *"${header}ml-miss-ratio-timeline"* ]]; then
+ sum_file="$result_dir/ml_${target_cf_name}${capacity}${time_unit}miss_ratio_timeline"
+ fi
+ if [[ $fn == *"${header}ml-mrc"* ]]; then
+ tmpfn="$fn"
+ IFS='-' read -ra elements <<< "$tmpfn"
+ target_cf_name=${elements[-1]}
+ sum_file="${result_dir}/ml_${target_cf_name}_mrc"
+ fi
+ if [[ $fn == *"${header}ml-avgmb"* ]]; then
+ tmpfn="$fn"
+ IFS='-' read -ra elements <<< "$tmpfn"
+ time_unit=${elements[3]}
+ target_cf_name=${elements[-1]}
+ sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_avgmb"
+ fi
+ if [[ $fn == *"${header}ml-p95mb"* ]]; then
+ tmpfn="$fn"
+ IFS='-' read -ra elements <<< "$tmpfn"
+ time_unit=${elements[3]}
+ target_cf_name=${elements[-1]}
+ sum_file="${result_dir}/ml_${time_unit}_${target_cf_name}_p95mb"
+ fi
+ if [[ $sum_file == "" ]]; then
+ continue
+ fi
+ if [[ $header == "header-" ]]; then
+ if [ -e "$sum_file" ]; then
+ continue
+ fi
+ fi
+ cat "$fn" >> "$sum_file"
+done
+done
+
+echo "Done"
+for fn in $result_dir/*
+do
+ if [[ $fn == *"_mrc" || $fn == *"_avgmb" || $fn == *"_p95mb" ]]; then
+ # Sort MRC file by cache_type and cache_size.
+ tmp_file="$result_dir/tmp_mrc"
+ cat "$fn" | sort -t ',' -k1,1 -k4,4n > "$tmp_file"
+ cat "$tmp_file" > "$fn"
+ rm -rf "$tmp_file"
+ fi
+done
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py
new file mode 100644
index 000000000..eed1b94af
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_pysim_test.py
@@ -0,0 +1,734 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+
+import os
+import random
+import sys
+
+from block_cache_pysim import (
+ ARCCache,
+ CacheEntry,
+ create_cache,
+ GDSizeCache,
+ HashTable,
+ HyperbolicPolicy,
+ kMicrosInSecond,
+ kSampleSize,
+ LFUPolicy,
+ LinUCBCache,
+ LRUCache,
+ LRUPolicy,
+ MRUPolicy,
+ OPTCache,
+ OPTCacheEntry,
+ run,
+ ThompsonSamplingCache,
+ TraceCache,
+ TraceRecord,
+)
+
+
+def test_hash_table():
+ print("Test hash table")
+ table = HashTable()
+ data_size = 10000
+ for i in range(data_size):
+ table.insert("k{}".format(i), i, "v{}".format(i))
+ for i in range(data_size):
+ assert table.lookup("k{}".format(i), i) is not None
+ for i in range(data_size):
+ table.delete("k{}".format(i), i)
+ for i in range(data_size):
+ assert table.lookup("k{}".format(i), i) is None
+
+ truth_map = {}
+ n = 1000000
+ records = 100
+ for i in range(n):
+ key_id = random.randint(0, records)
+ v = random.randint(0, records)
+ key = "k{}".format(key_id)
+ value = CacheEntry(v, v, v, v, v, v, v)
+ action = random.randint(0, 10)
+ assert len(truth_map) == table.elements, "{} {} {}".format(
+ len(truth_map), table.elements, i
+ )
+ if action <= 8:
+ if key in truth_map:
+ assert table.lookup(key, key_id) is not None
+ assert truth_map[key].value_size == table.lookup(key, key_id).value_size
+ else:
+ assert table.lookup(key, key_id) is None
+ table.insert(key, key_id, value)
+ truth_map[key] = value
+ else:
+ deleted = table.delete(key, key_id)
+ if deleted:
+ assert key in truth_map
+ if key in truth_map:
+ del truth_map[key]
+
+ # Check all keys are unique in the sample set.
+ for _i in range(10):
+ samples = table.random_sample(kSampleSize)
+ unique_keys = {}
+ for sample in samples:
+ unique_keys[sample.key] = True
+ assert len(samples) == len(unique_keys)
+
+ assert len(table) == len(truth_map)
+ for key in truth_map:
+ assert table.lookup(key, int(key[1:])) is not None
+ assert truth_map[key].value_size == table.lookup(key, int(key[1:])).value_size
+ print("Test hash table: Success")
+
+
+def assert_metrics(cache, expected_value, expected_value_size=1, custom_hashtable=True):
+ assert cache.used_size == expected_value[0], "Expected {}, Actual {}".format(
+ expected_value[0], cache.used_size
+ )
+ assert (
+ cache.miss_ratio_stats.num_accesses == expected_value[1]
+ ), "Expected {}, Actual {}".format(
+ expected_value[1], cache.miss_ratio_stats.num_accesses
+ )
+ assert (
+ cache.miss_ratio_stats.num_misses == expected_value[2]
+ ), "Expected {}, Actual {}".format(
+ expected_value[2], cache.miss_ratio_stats.num_misses
+ )
+ assert len(cache.table) == len(expected_value[3]) + len(
+ expected_value[4]
+ ), "Expected {}, Actual {}".format(
+ len(expected_value[3]) + len(expected_value[4]), cache.table.elements
+ )
+ for expeceted_k in expected_value[3]:
+ if custom_hashtable:
+ val = cache.table.lookup("b{}".format(expeceted_k), expeceted_k)
+ else:
+ val = cache.table["b{}".format(expeceted_k)]
+ assert val is not None, "Expected {} Actual: Not Exist {}, Table: {}".format(
+ expeceted_k, expected_value, cache.table
+ )
+ assert val.value_size == expected_value_size
+ for expeceted_k in expected_value[4]:
+ if custom_hashtable:
+ val = cache.table.lookup("g0-{}".format(expeceted_k), expeceted_k)
+ else:
+ val = cache.table["g0-{}".format(expeceted_k)]
+ assert val is not None
+ assert val.value_size == expected_value_size
+
+
+# Access k1, k1, k2, k3, k3, k3, k4
+# When k4 is inserted,
+# LRU should evict k1.
+# LFU should evict k2.
+# MRU should evict k3.
+def test_cache(cache, expected_value, custom_hashtable=True):
+ k1 = TraceRecord(
+ access_time=0,
+ block_id=1,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1,
+ key_id=1,
+ kv_size=5,
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=0,
+ )
+ k2 = TraceRecord(
+ access_time=1,
+ block_id=2,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1,
+ key_id=1,
+ kv_size=5,
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=0,
+ )
+ k3 = TraceRecord(
+ access_time=2,
+ block_id=3,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1,
+ key_id=1,
+ kv_size=5,
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=0,
+ )
+ k4 = TraceRecord(
+ access_time=3,
+ block_id=4,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1,
+ key_id=1,
+ kv_size=5,
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=0,
+ )
+ sequence = [k1, k1, k2, k3, k3, k3]
+ index = 0
+ expected_values = []
+ # Access k1, miss.
+ expected_values.append([1, 1, 1, [1], []])
+ # Access k1, hit.
+ expected_values.append([1, 2, 1, [1], []])
+ # Access k2, miss.
+ expected_values.append([2, 3, 2, [1, 2], []])
+ # Access k3, miss.
+ expected_values.append([3, 4, 3, [1, 2, 3], []])
+ # Access k3, hit.
+ expected_values.append([3, 5, 3, [1, 2, 3], []])
+ # Access k3, hit.
+ expected_values.append([3, 6, 3, [1, 2, 3], []])
+ access_time = 0
+ for access in sequence:
+ access.access_time = access_time
+ cache.access(access)
+ assert_metrics(
+ cache,
+ expected_values[index],
+ expected_value_size=1,
+ custom_hashtable=custom_hashtable,
+ )
+ access_time += 1
+ index += 1
+ k4.access_time = access_time
+ cache.access(k4)
+ assert_metrics(
+ cache, expected_value, expected_value_size=1, custom_hashtable=custom_hashtable
+ )
+
+
+def test_lru_cache(cache, custom_hashtable):
+ print("Test LRU cache")
+ # Access k4, miss. evict k1
+ test_cache(cache, [3, 7, 4, [2, 3, 4], []], custom_hashtable)
+ print("Test LRU cache: Success")
+
+
+def test_mru_cache():
+ print("Test MRU cache")
+ policies = []
+ policies.append(MRUPolicy())
+ # Access k4, miss. evict k3
+ test_cache(
+ ThompsonSamplingCache(3, False, policies, cost_class_label=None),
+ [3, 7, 4, [1, 2, 4], []],
+ )
+ print("Test MRU cache: Success")
+
+
+def test_lfu_cache():
+ print("Test LFU cache")
+ policies = []
+ policies.append(LFUPolicy())
+ # Access k4, miss. evict k2
+ test_cache(
+ ThompsonSamplingCache(3, False, policies, cost_class_label=None),
+ [3, 7, 4, [1, 3, 4], []],
+ )
+ print("Test LFU cache: Success")
+
+
+def test_mix(cache):
+ print("Test Mix {} cache".format(cache.cache_name()))
+ n = 100000
+ records = 100
+ block_size_table = {}
+ trace_num_misses = 0
+ for i in range(n):
+ key_id = random.randint(0, records)
+ vs = random.randint(0, 10)
+ now = i * kMicrosInSecond
+ block_size = vs
+ if key_id in block_size_table:
+ block_size = block_size_table[key_id]
+ else:
+ block_size_table[key_id] = block_size
+ is_hit = key_id % 2
+ if is_hit == 0:
+ trace_num_misses += 1
+ k = TraceRecord(
+ access_time=now,
+ block_id=key_id,
+ block_type=1,
+ block_size=block_size,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=key_id,
+ key_id=key_id,
+ kv_size=5,
+ is_hit=is_hit,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=vs,
+ )
+ cache.access(k)
+ assert cache.miss_ratio_stats.miss_ratio() > 0
+ if cache.cache_name() == "Trace":
+ assert cache.miss_ratio_stats.num_accesses == n
+ assert cache.miss_ratio_stats.num_misses == trace_num_misses
+ else:
+ assert cache.used_size <= cache.cache_size
+ all_values = cache.table.values()
+ cached_size = 0
+ for value in all_values:
+ cached_size += value.value_size
+ assert cached_size == cache.used_size, "Expeced {} Actual {}".format(
+ cache.used_size, cached_size
+ )
+ print("Test Mix {} cache: Success".format(cache.cache_name()))
+
+
+def test_end_to_end():
+ print("Test All caches")
+ n = 100000
+ nblocks = 1000
+ block_size = 16 * 1024
+ ncfs = 7
+ nlevels = 6
+ nfds = 100000
+ trace_file_path = "test_trace"
+ # All blocks are of the same size so that OPT must achieve the lowest miss
+ # ratio.
+ with open(trace_file_path, "w+") as trace_file:
+ access_records = ""
+ for i in range(n):
+ key_id = random.randint(0, nblocks)
+ cf_id = random.randint(0, ncfs)
+ level = random.randint(0, nlevels)
+ fd = random.randint(0, nfds)
+ now = i * kMicrosInSecond
+ access_record = ""
+ access_record += "{},".format(now)
+ access_record += "{},".format(key_id)
+ access_record += "{},".format(9) # block type
+ access_record += "{},".format(block_size) # block size
+ access_record += "{},".format(cf_id)
+ access_record += "cf_{},".format(cf_id)
+ access_record += "{},".format(level)
+ access_record += "{},".format(fd)
+ access_record += "{},".format(key_id % 3) # caller
+ access_record += "{},".format(0) # no insert
+ access_record += "{},".format(i) # get_id
+ access_record += "{},".format(i) # key_id
+ access_record += "{},".format(100) # kv_size
+ access_record += "{},".format(1) # is_hit
+ access_record += "{},".format(1) # referenced_key_exist_in_block
+ access_record += "{},".format(10) # num_keys_in_block
+ access_record += "{},".format(1) # table_id
+ access_record += "{},".format(0) # seq_number
+ access_record += "{},".format(10) # block key size
+ access_record += "{},".format(20) # key size
+ access_record += "{},".format(0) # block offset
+ access_record = access_record[:-1]
+ access_records += access_record + "\n"
+ trace_file.write(access_records)
+
+ print("Test All caches: Start testing caches")
+ cache_size = block_size * nblocks / 10
+ downsample_size = 1
+ cache_ms = {}
+ for cache_type in [
+ "ts",
+ "opt",
+ "lru",
+ "pylru",
+ "linucb",
+ "gdsize",
+ "pyccbt",
+ "pycctbbt",
+ ]:
+ cache = create_cache(cache_type, cache_size, downsample_size)
+ run(trace_file_path, cache_type, cache, 0, -1, "all")
+ cache_ms[cache_type] = cache
+ assert cache.miss_ratio_stats.num_accesses == n
+
+ for cache_type in cache_ms:
+ cache = cache_ms[cache_type]
+ ms = cache.miss_ratio_stats.miss_ratio()
+ assert ms <= 100.0 and ms >= 0.0
+ # OPT should perform the best.
+ assert cache_ms["opt"].miss_ratio_stats.miss_ratio() <= ms
+ assert cache.used_size <= cache.cache_size
+ all_values = cache.table.values()
+ cached_size = 0
+ for value in all_values:
+ cached_size += value.value_size
+ assert cached_size == cache.used_size, "Expeced {} Actual {}".format(
+ cache.used_size, cached_size
+ )
+ print("Test All {}: Success".format(cache.cache_name()))
+
+ os.remove(trace_file_path)
+ print("Test All: Success")
+
+
+def test_hybrid(cache):
+ print("Test {} cache".format(cache.cache_name()))
+ k = TraceRecord(
+ access_time=0,
+ block_id=1,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1, # the first get request.
+ key_id=1,
+ kv_size=0, # no size.
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=0,
+ )
+ cache.access(k) # Expect a miss.
+ # used size, num accesses, num misses, hash table size, blocks, get keys.
+ assert_metrics(cache, [1, 1, 1, [1], []])
+ k.access_time += 1
+ k.kv_size = 1
+ k.block_id = 2
+ cache.access(k) # k should be inserted.
+ assert_metrics(cache, [3, 2, 2, [1, 2], [1]])
+ k.access_time += 1
+ k.block_id = 3
+ cache.access(k) # k should not be inserted again.
+ assert_metrics(cache, [4, 3, 3, [1, 2, 3], [1]])
+ # A second get request referencing the same key.
+ k.access_time += 1
+ k.get_id = 2
+ k.block_id = 4
+ k.kv_size = 0
+ cache.access(k) # k should observe a hit. No block access.
+ assert_metrics(cache, [4, 4, 3, [1, 2, 3], [1]])
+
+ # A third get request searches three files, three different keys.
+ # And the second key observes a hit.
+ k.access_time += 1
+ k.kv_size = 1
+ k.get_id = 3
+ k.block_id = 3
+ k.key_id = 2
+ cache.access(k) # k should observe a miss. block 3 observes a hit.
+ assert_metrics(cache, [5, 5, 3, [1, 2, 3], [1, 2]])
+
+ k.access_time += 1
+ k.kv_size = 1
+ k.get_id = 3
+ k.block_id = 4
+ k.kv_size = 1
+ k.key_id = 1
+ cache.access(k) # k1 should observe a hit.
+ assert_metrics(cache, [5, 6, 3, [1, 2, 3], [1, 2]])
+
+ k.access_time += 1
+ k.kv_size = 1
+ k.get_id = 3
+ k.block_id = 4
+ k.kv_size = 1
+ k.key_id = 3
+ # k3 should observe a miss.
+ # However, as the get already complete, we should not access k3 any more.
+ cache.access(k)
+ assert_metrics(cache, [5, 7, 3, [1, 2, 3], [1, 2]])
+
+ # A fourth get request searches one file and two blocks. One row key.
+ k.access_time += 1
+ k.get_id = 4
+ k.block_id = 5
+ k.key_id = 4
+ k.kv_size = 1
+ cache.access(k)
+ assert_metrics(cache, [7, 8, 4, [1, 2, 3, 5], [1, 2, 4]])
+
+ # A bunch of insertions which evict cached row keys.
+ for i in range(6, 100):
+ k.access_time += 1
+ k.get_id = 0
+ k.block_id = i
+ cache.access(k)
+
+ k.get_id = 4
+ k.block_id = 100 # A different block.
+ k.key_id = 4 # Same row key and should not be inserted again.
+ k.kv_size = 1
+ cache.access(k)
+ assert_metrics(
+ cache, [kSampleSize, 103, 99, [i for i in range(101 - kSampleSize, 101)], []]
+ )
+ print("Test {} cache: Success".format(cache.cache_name()))
+
+
+def test_opt_cache():
+ print("Test OPT cache")
+ cache = OPTCache(3)
+ # seq: 0, 1, 2, 3, 4, 5, 6, 7, 8
+ # key: k1, k2, k3, k4, k5, k6, k7, k1, k8
+ # next_access: 7, 19, 18, M, M, 17, 16, 25, M
+ k = TraceRecord(
+ access_time=0,
+ block_id=1,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1, # the first get request.
+ key_id=1,
+ kv_size=0, # no size.
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=7,
+ )
+ cache.access(k)
+ assert_metrics(
+ cache, [1, 1, 1, [1], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 2
+ k.next_access_seq_no = 19
+ cache.access(k)
+ assert_metrics(
+ cache, [2, 2, 2, [1, 2], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 3
+ k.next_access_seq_no = 18
+ cache.access(k)
+ assert_metrics(
+ cache, [3, 3, 3, [1, 2, 3], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 4
+ k.next_access_seq_no = sys.maxsize # Never accessed again.
+ cache.access(k)
+ # Evict 2 since its next access 19 is the furthest in the future.
+ assert_metrics(
+ cache, [3, 4, 4, [1, 3, 4], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 5
+ k.next_access_seq_no = sys.maxsize # Never accessed again.
+ cache.access(k)
+ # Evict 4 since its next access MAXINT is the furthest in the future.
+ assert_metrics(
+ cache, [3, 5, 5, [1, 3, 5], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 6
+ k.next_access_seq_no = 17
+ cache.access(k)
+ # Evict 5 since its next access MAXINT is the furthest in the future.
+ assert_metrics(
+ cache, [3, 6, 6, [1, 3, 6], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 7
+ k.next_access_seq_no = 16
+ cache.access(k)
+ # Evict 3 since its next access 18 is the furthest in the future.
+ assert_metrics(
+ cache, [3, 7, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 1
+ k.next_access_seq_no = 25
+ cache.access(k)
+ assert_metrics(
+ cache, [3, 8, 7, [1, 6, 7], []], expected_value_size=1, custom_hashtable=False
+ )
+ k.access_time += 1
+ k.block_id = 8
+ k.next_access_seq_no = sys.maxsize
+ cache.access(k)
+ # Evict 1 since its next access 25 is the furthest in the future.
+ assert_metrics(
+ cache, [3, 9, 8, [6, 7, 8], []], expected_value_size=1, custom_hashtable=False
+ )
+
+ # Insert a large kv pair to evict all keys.
+ k.access_time += 1
+ k.block_id = 10
+ k.block_size = 3
+ k.next_access_seq_no = sys.maxsize
+ cache.access(k)
+ assert_metrics(
+ cache, [3, 10, 9, [10], []], expected_value_size=3, custom_hashtable=False
+ )
+ print("Test OPT cache: Success")
+
+
+def test_trace_cache():
+ print("Test trace cache")
+ cache = TraceCache(0)
+ k = TraceRecord(
+ access_time=0,
+ block_id=1,
+ block_type=1,
+ block_size=1,
+ cf_id=0,
+ cf_name="",
+ level=0,
+ fd=0,
+ caller=1,
+ no_insert=0,
+ get_id=1,
+ key_id=1,
+ kv_size=0,
+ is_hit=1,
+ referenced_key_exist_in_block=1,
+ num_keys_in_block=0,
+ table_id=0,
+ seq_number=0,
+ block_key_size=0,
+ key_size=0,
+ block_offset_in_file=0,
+ next_access_seq_no=7,
+ )
+ cache.access(k)
+ assert cache.miss_ratio_stats.num_accesses == 1
+ assert cache.miss_ratio_stats.num_misses == 0
+ k.is_hit = 0
+ cache.access(k)
+ assert cache.miss_ratio_stats.num_accesses == 2
+ assert cache.miss_ratio_stats.num_misses == 1
+ print("Test trace cache: Success")
+
+
+if __name__ == "__main__":
+ test_hash_table()
+ test_trace_cache()
+ test_opt_cache()
+ test_lru_cache(
+ ThompsonSamplingCache(
+ 3, enable_cache_row_key=0, policies=[LRUPolicy()], cost_class_label=None
+ ),
+ custom_hashtable=True,
+ )
+ test_lru_cache(LRUCache(3, enable_cache_row_key=0), custom_hashtable=False)
+ test_mru_cache()
+ test_lfu_cache()
+ test_hybrid(
+ ThompsonSamplingCache(
+ kSampleSize,
+ enable_cache_row_key=1,
+ policies=[LRUPolicy()],
+ cost_class_label=None,
+ )
+ )
+ test_hybrid(
+ LinUCBCache(
+ kSampleSize,
+ enable_cache_row_key=1,
+ policies=[LRUPolicy()],
+ cost_class_label=None,
+ )
+ )
+ for cache_type in [
+ "ts",
+ "opt",
+ "arc",
+ "pylfu",
+ "pymru",
+ "trace",
+ "pyhb",
+ "lru",
+ "pylru",
+ "linucb",
+ "gdsize",
+ "pycctbbt",
+ "pycctb",
+ "pyccbt",
+ ]:
+ for enable_row_cache in [0, 1, 2]:
+ cache_type_str = cache_type
+ if cache_type != "opt" and cache_type != "trace":
+ if enable_row_cache == 1:
+ cache_type_str += "_hybrid"
+ elif enable_row_cache == 2:
+ cache_type_str += "_hybridn"
+ test_mix(create_cache(cache_type_str, cache_size=100, downsample_size=1))
+ test_end_to_end()
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
new file mode 100644
index 000000000..f0bb6975b
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
@@ -0,0 +1,2316 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#ifdef GFLAGS
+#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <cstdio>
+#include <cstdlib>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <sstream>
+
+#include "monitoring/histogram.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/trace_record.h"
+#include "util/gflags_compat.h"
+#include "util/string_util.h"
+
+using GFLAGS_NAMESPACE::ParseCommandLineFlags;
+
+DEFINE_string(block_cache_trace_path, "", "The trace file path.");
+DEFINE_bool(is_block_cache_human_readable_trace, false,
+ "Is the trace file provided for analysis generated by running "
+ "block_cache_trace_analyzer with "
+ "FLAGS_human_readable_trace_file_path is specified.");
+DEFINE_string(
+ block_cache_sim_config_path, "",
+ "The config file path. One cache configuration per line. The format of a "
+ "cache configuration is "
+ "cache_name,num_shard_bits,ghost_capacity,cache_capacity_1,...,cache_"
+ "capacity_N. Supported cache names are lru, lru_priority, lru_hybrid, and "
+ "lru_hybrid_no_insert_on_row_miss. User may also add a prefix 'ghost_' to "
+ "a cache_name to add a ghost cache in front of the real cache. "
+ "ghost_capacity and cache_capacity can be xK, xM or xG where x is a "
+ "positive number.");
+DEFINE_int32(block_cache_trace_downsample_ratio, 1,
+ "The trace collected accesses on one in every "
+ "block_cache_trace_downsample_ratio blocks. We scale "
+ "down the simulated cache size by this ratio.");
+DEFINE_bool(print_block_size_stats, false,
+ "Print block size distribution and the distribution break down by "
+ "block type and column family.");
+DEFINE_bool(print_access_count_stats, false,
+ "Print access count distribution and the distribution break down "
+ "by block type and column family.");
+DEFINE_bool(print_data_block_access_count_stats, false,
+ "Print data block accesses by user Get and Multi-Get.");
+DEFINE_int32(cache_sim_warmup_seconds, 0,
+ "The number of seconds to warmup simulated caches. The hit/miss "
+ "counters are reset after the warmup completes.");
+DEFINE_int32(analyze_bottom_k_access_count_blocks, 0,
+ "Print out detailed access information for blocks with their "
+ "number of accesses are the bottom k among all blocks.");
+DEFINE_int32(analyze_top_k_access_count_blocks, 0,
+ "Print out detailed access information for blocks with their "
+ "number of accesses are the top k among all blocks.");
+DEFINE_string(block_cache_analysis_result_dir, "",
+ "The directory that saves block cache analysis results.");
+DEFINE_string(
+ timeline_labels, "",
+ "Group the number of accesses per block per second using these labels. "
+ "Possible labels are a combination of the following: cf (column family), "
+ "sst, level, bt (block type), caller, block. For example, label \"cf_bt\" "
+ "means the number of access per second is grouped by unique pairs of "
+ "\"cf_bt\". A label \"all\" contains the aggregated number of accesses per "
+ "second across all possible labels.");
+DEFINE_string(reuse_distance_labels, "",
+ "Group the reuse distance of a block using these labels. Reuse "
+ "distance is defined as the cumulated size of unique blocks read "
+ "between two consecutive accesses on the same block.");
+DEFINE_string(
+ reuse_distance_buckets, "",
+ "Group blocks by their reuse distances given these buckets. For "
+ "example, if 'reuse_distance_buckets' is '1K,1M,1G', we will "
+ "create four buckets. The first three buckets contain the number of "
+ "blocks with reuse distance less than 1KB, between 1K and 1M, between 1M "
+ "and 1G, respectively. The last bucket contains the number of blocks with "
+ "reuse distance larger than 1G. ");
+DEFINE_string(
+ reuse_interval_labels, "",
+ "Group the reuse interval of a block using these labels. Reuse "
+ "interval is defined as the time between two consecutive accesses "
+ "on the same block.");
+DEFINE_string(
+ reuse_interval_buckets, "",
+ "Group blocks by their reuse interval given these buckets. For "
+ "example, if 'reuse_distance_buckets' is '1,10,100', we will "
+ "create four buckets. The first three buckets contain the number of "
+ "blocks with reuse interval less than 1 second, between 1 second and 10 "
+ "seconds, between 10 seconds and 100 seconds, respectively. The last "
+ "bucket contains the number of blocks with reuse interval longer than 100 "
+ "seconds.");
+DEFINE_string(
+ reuse_lifetime_labels, "",
+ "Group the reuse lifetime of a block using these labels. Reuse "
+ "lifetime is defined as the time interval between the first access on a "
+ "block and the last access on the same block. For blocks that are only "
+ "accessed once, its lifetime is set to kMaxUint64.");
+DEFINE_string(
+ reuse_lifetime_buckets, "",
+ "Group blocks by their reuse lifetime given these buckets. For "
+ "example, if 'reuse_lifetime_buckets' is '1,10,100', we will "
+ "create four buckets. The first three buckets contain the number of "
+ "blocks with reuse lifetime less than 1 second, between 1 second and 10 "
+ "seconds, between 10 seconds and 100 seconds, respectively. The last "
+ "bucket contains the number of blocks with reuse lifetime longer than 100 "
+ "seconds.");
+DEFINE_string(
+ analyze_callers, "",
+ "The list of callers to perform a detailed analysis on. If speicfied, the "
+ "analyzer will output a detailed percentage of accesses for each caller "
+ "break down by column family, level, and block type. A list of available "
+ "callers are: Get, MultiGet, Iterator, ApproximateSize, VerifyChecksum, "
+ "SSTDumpTool, ExternalSSTIngestion, Repair, Prefetch, Compaction, "
+ "CompactionRefill, Flush, SSTFileReader, Uncategorized.");
+DEFINE_string(access_count_buckets, "",
+ "Group number of blocks by their access count given these "
+ "buckets. If specified, the analyzer will output a detailed "
+ "analysis on the number of blocks grouped by their access count "
+ "break down by block type and column family.");
+DEFINE_int32(analyze_blocks_reuse_k_reuse_window, 0,
+ "Analyze the percentage of blocks that are accessed in the "
+ "[k, 2*k] seconds are accessed again in the next [2*k, 3*k], "
+ "[3*k, 4*k],...,[k*(n-1), k*n] seconds. ");
+DEFINE_string(analyze_get_spatial_locality_labels, "",
+ "Group data blocks using these labels.");
+DEFINE_string(analyze_get_spatial_locality_buckets, "",
+ "Group data blocks by their statistics using these buckets.");
+DEFINE_string(skew_labels, "",
+ "Group the access count of a block using these labels.");
+DEFINE_string(skew_buckets, "", "Group the skew labels using these buckets.");
+DEFINE_bool(mrc_only, false,
+ "Evaluate alternative cache policies only. When this flag is true, "
+ "the analyzer does NOT maintain states of each block in memory for "
+ "analysis. It only feeds the accesses into the cache simulators.");
+DEFINE_string(
+ analyze_correlation_coefficients_labels, "",
+ "Analyze the correlation coefficients of features such as number of past "
+ "accesses with regard to the number of accesses till the next access.");
+DEFINE_int32(analyze_correlation_coefficients_max_number_of_values, 1000000,
+ "The maximum number of values for a feature. If the number of "
+ "values for a feature is larger than this max, it randomly "
+ "selects 'max' number of values.");
+DEFINE_string(human_readable_trace_file_path, "",
+ "The filt path that saves human readable access records.");
+
+namespace ROCKSDB_NAMESPACE {
+namespace {
+
+const std::string kMissRatioCurveFileName = "mrc";
+const std::string kGroupbyBlock = "block";
+const std::string kGroupbyTable = "table";
+const std::string kGroupbyColumnFamily = "cf";
+const std::string kGroupbySSTFile = "sst";
+const std::string kGroupbyBlockType = "bt";
+const std::string kGroupbyCaller = "caller";
+const std::string kGroupbyLevel = "level";
+const std::string kGroupbyAll = "all";
+const std::set<std::string> kGroupbyLabels{
+ kGroupbyBlock, kGroupbyColumnFamily, kGroupbySSTFile, kGroupbyLevel,
+ kGroupbyBlockType, kGroupbyCaller, kGroupbyAll};
+const std::string kSupportedCacheNames =
+ " lru ghost_lru lru_priority ghost_lru_priority lru_hybrid "
+ "ghost_lru_hybrid lru_hybrid_no_insert_on_row_miss "
+ "ghost_lru_hybrid_no_insert_on_row_miss ";
+
+// The suffix for the generated csv files.
+const std::string kFileNameSuffixMissRatioTimeline = "miss_ratio_timeline";
+const std::string kFileNameSuffixMissTimeline = "miss_timeline";
+const std::string kFileNameSuffixSkew = "skewness";
+const std::string kFileNameSuffixAccessTimeline = "access_timeline";
+const std::string kFileNameSuffixCorrelation = "correlation_input";
+const std::string kFileNameSuffixAvgReuseIntervalNaccesses =
+ "avg_reuse_interval_naccesses";
+const std::string kFileNameSuffixAvgReuseInterval = "avg_reuse_interval";
+const std::string kFileNameSuffixReuseInterval = "access_reuse_interval";
+const std::string kFileNameSuffixReuseLifetime = "reuse_lifetime";
+const std::string kFileNameSuffixAccessReuseBlocksTimeline =
+ "reuse_blocks_timeline";
+const std::string kFileNameSuffixPercentOfAccessSummary =
+ "percentage_of_accesses_summary";
+const std::string kFileNameSuffixPercentRefKeys = "percent_ref_keys";
+const std::string kFileNameSuffixPercentDataSizeOnRefKeys =
+ "percent_data_size_on_ref_keys";
+const std::string kFileNameSuffixPercentAccessesOnRefKeys =
+ "percent_accesses_on_ref_keys";
+const std::string kFileNameSuffixAccessCountSummary = "access_count_summary";
+
+std::string block_type_to_string(TraceType type) {
+ switch (type) {
+ case kBlockTraceFilterBlock:
+ return "Filter";
+ case kBlockTraceDataBlock:
+ return "Data";
+ case kBlockTraceIndexBlock:
+ return "Index";
+ case kBlockTraceRangeDeletionBlock:
+ return "RangeDeletion";
+ case kBlockTraceUncompressionDictBlock:
+ return "UncompressionDict";
+ default:
+ break;
+ }
+ // This cannot happen.
+ return "InvalidType";
+}
+
+std::string caller_to_string(TableReaderCaller caller) {
+ switch (caller) {
+ case kUserGet:
+ return "Get";
+ case kUserMultiGet:
+ return "MultiGet";
+ case kUserIterator:
+ return "Iterator";
+ case kUserApproximateSize:
+ return "ApproximateSize";
+ case kUserVerifyChecksum:
+ return "VerifyChecksum";
+ case kSSTDumpTool:
+ return "SSTDumpTool";
+ case kExternalSSTIngestion:
+ return "ExternalSSTIngestion";
+ case kRepair:
+ return "Repair";
+ case kPrefetch:
+ return "Prefetch";
+ case kCompaction:
+ return "Compaction";
+ case kCompactionRefill:
+ return "CompactionRefill";
+ case kFlush:
+ return "Flush";
+ case kSSTFileReader:
+ return "SSTFileReader";
+ case kUncategorized:
+ return "Uncategorized";
+ default:
+ break;
+ }
+ // This cannot happen.
+ return "InvalidCaller";
+}
+
+TableReaderCaller string_to_caller(std::string caller_str) {
+ if (caller_str == "Get") {
+ return kUserGet;
+ } else if (caller_str == "MultiGet") {
+ return kUserMultiGet;
+ } else if (caller_str == "Iterator") {
+ return kUserIterator;
+ } else if (caller_str == "ApproximateSize") {
+ return kUserApproximateSize;
+ } else if (caller_str == "VerifyChecksum") {
+ return kUserVerifyChecksum;
+ } else if (caller_str == "SSTDumpTool") {
+ return kSSTDumpTool;
+ } else if (caller_str == "ExternalSSTIngestion") {
+ return kExternalSSTIngestion;
+ } else if (caller_str == "Repair") {
+ return kRepair;
+ } else if (caller_str == "Prefetch") {
+ return kPrefetch;
+ } else if (caller_str == "Compaction") {
+ return kCompaction;
+ } else if (caller_str == "CompactionRefill") {
+ return kCompactionRefill;
+ } else if (caller_str == "Flush") {
+ return kFlush;
+ } else if (caller_str == "SSTFileReader") {
+ return kSSTFileReader;
+ } else if (caller_str == "Uncategorized") {
+ return kUncategorized;
+ }
+ return TableReaderCaller::kMaxBlockCacheLookupCaller;
+}
+
+bool is_user_access(TableReaderCaller caller) {
+ switch (caller) {
+ case kUserGet:
+ case kUserMultiGet:
+ case kUserIterator:
+ case kUserApproximateSize:
+ case kUserVerifyChecksum:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
+const char kBreakLine[] =
+ "***************************************************************\n";
+
+void print_break_lines(uint32_t num_break_lines) {
+ for (uint32_t i = 0; i < num_break_lines; i++) {
+ fprintf(stdout, kBreakLine);
+ }
+}
+
+double percent(uint64_t numerator, uint64_t denomenator) {
+ if (denomenator == 0) {
+ return -1;
+ }
+ return static_cast<double>(numerator * 100.0 / denomenator);
+}
+
+std::map<uint64_t, uint64_t> adjust_time_unit(
+ const std::map<uint64_t, uint64_t>& time_stats, uint64_t time_unit) {
+ if (time_unit == 1) {
+ return time_stats;
+ }
+ std::map<uint64_t, uint64_t> adjusted_time_stats;
+ for (auto const& time : time_stats) {
+ adjusted_time_stats[static_cast<uint64_t>(time.first / time_unit)] +=
+ time.second;
+ }
+ return adjusted_time_stats;
+}
+} // namespace
+
+void BlockCacheTraceAnalyzer::WriteMissRatioCurves() const {
+ if (!cache_simulator_) {
+ return;
+ }
+ if (output_dir_.empty()) {
+ return;
+ }
+ uint64_t trace_duration =
+ trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
+ uint64_t total_accesses = access_sequence_number_;
+ const std::string output_miss_ratio_curve_path =
+ output_dir_ + "/" + std::to_string(trace_duration) + "_" +
+ std::to_string(total_accesses) + "_" + kMissRatioCurveFileName;
+ std::ofstream out(output_miss_ratio_curve_path);
+ if (!out.is_open()) {
+ return;
+ }
+ // Write header.
+ const std::string header =
+ "cache_name,num_shard_bits,ghost_capacity,capacity,miss_ratio,total_"
+ "accesses";
+ out << header << std::endl;
+ for (auto const& config_caches : cache_simulator_->sim_caches()) {
+ const CacheConfiguration& config = config_caches.first;
+ for (uint32_t i = 0; i < config.cache_capacities.size(); i++) {
+ double miss_ratio =
+ config_caches.second[i]->miss_ratio_stats().miss_ratio();
+ // Write the body.
+ out << config.cache_name;
+ out << ",";
+ out << config.num_shard_bits;
+ out << ",";
+ out << config.ghost_cache_capacity;
+ out << ",";
+ out << config.cache_capacities[i];
+ out << ",";
+ out << std::fixed << std::setprecision(4) << miss_ratio;
+ out << ",";
+ out << config_caches.second[i]->miss_ratio_stats().total_accesses();
+ out << std::endl;
+ }
+ }
+ out.close();
+}
+
+void BlockCacheTraceAnalyzer::UpdateFeatureVectors(
+ const std::vector<uint64_t>& access_sequence_number_timeline,
+ const std::vector<uint64_t>& access_timeline, const std::string& label,
+ std::map<std::string, Features>* label_features,
+ std::map<std::string, Predictions>* label_predictions) const {
+ if (access_sequence_number_timeline.empty() || access_timeline.empty()) {
+ return;
+ }
+ assert(access_timeline.size() == access_sequence_number_timeline.size());
+ uint64_t prev_access_sequence_number = access_sequence_number_timeline[0];
+ uint64_t prev_access_timestamp = access_timeline[0];
+ for (uint32_t i = 0; i < access_sequence_number_timeline.size(); i++) {
+ uint64_t num_accesses_since_last_access =
+ access_sequence_number_timeline[i] - prev_access_sequence_number;
+ uint64_t elapsed_time_since_last_access =
+ access_timeline[i] - prev_access_timestamp;
+ prev_access_sequence_number = access_sequence_number_timeline[i];
+ prev_access_timestamp = access_timeline[i];
+ if (i < access_sequence_number_timeline.size() - 1) {
+ (*label_features)[label].num_accesses_since_last_access.push_back(
+ num_accesses_since_last_access);
+ (*label_features)[label].num_past_accesses.push_back(i);
+ (*label_features)[label].elapsed_time_since_last_access.push_back(
+ elapsed_time_since_last_access);
+ }
+ if (i >= 1) {
+ (*label_predictions)[label].num_accesses_till_next_access.push_back(
+ num_accesses_since_last_access);
+ (*label_predictions)[label].elapsed_time_till_next_access.push_back(
+ elapsed_time_since_last_access);
+ }
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteMissRatioTimeline(uint64_t time_unit) const {
+ if (!cache_simulator_ || output_dir_.empty()) {
+ return;
+ }
+ std::map<uint64_t, std::map<std::string, std::map<uint64_t, double>>>
+ cs_name_timeline;
+ uint64_t start_time = std::numeric_limits<uint64_t>::max();
+ uint64_t end_time = 0;
+ const std::map<uint64_t, uint64_t>& trace_num_misses =
+ adjust_time_unit(miss_ratio_stats_.num_misses_timeline(), time_unit);
+ const std::map<uint64_t, uint64_t>& trace_num_accesses =
+ adjust_time_unit(miss_ratio_stats_.num_accesses_timeline(), time_unit);
+ assert(trace_num_misses.size() == trace_num_accesses.size());
+ for (auto const& num_miss : trace_num_misses) {
+ uint64_t time = num_miss.first;
+ start_time = std::min(start_time, time);
+ end_time = std::max(end_time, time);
+ uint64_t miss = num_miss.second;
+ auto it = trace_num_accesses.find(time);
+ assert(it != trace_num_accesses.end());
+ uint64_t access = it->second;
+ cs_name_timeline[std::numeric_limits<uint64_t>::max()]["trace"][time] =
+ percent(miss, access);
+ }
+ for (auto const& config_caches : cache_simulator_->sim_caches()) {
+ const CacheConfiguration& config = config_caches.first;
+ std::string cache_label = config.cache_name + "-" +
+ std::to_string(config.num_shard_bits) + "-" +
+ std::to_string(config.ghost_cache_capacity);
+ for (uint32_t i = 0; i < config.cache_capacities.size(); i++) {
+ const std::map<uint64_t, uint64_t>& num_misses = adjust_time_unit(
+ config_caches.second[i]->miss_ratio_stats().num_misses_timeline(),
+ time_unit);
+ const std::map<uint64_t, uint64_t>& num_accesses = adjust_time_unit(
+ config_caches.second[i]->miss_ratio_stats().num_accesses_timeline(),
+ time_unit);
+ assert(num_misses.size() == num_accesses.size());
+ for (auto const& num_miss : num_misses) {
+ uint64_t time = num_miss.first;
+ start_time = std::min(start_time, time);
+ end_time = std::max(end_time, time);
+ uint64_t miss = num_miss.second;
+ auto it = num_accesses.find(time);
+ assert(it != num_accesses.end());
+ uint64_t access = it->second;
+ cs_name_timeline[config.cache_capacities[i]][cache_label][time] =
+ percent(miss, access);
+ }
+ }
+ }
+ for (auto const& it : cs_name_timeline) {
+ const std::string output_miss_ratio_timeline_path =
+ output_dir_ + "/" + std::to_string(it.first) + "_" +
+ std::to_string(time_unit) + "_" + kFileNameSuffixMissRatioTimeline;
+ std::ofstream out(output_miss_ratio_timeline_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("time");
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ header += ",";
+ header += std::to_string(now);
+ }
+ out << header << std::endl;
+ for (auto const& label : it.second) {
+ std::string row(label.first);
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ auto misses = label.second.find(now);
+ row += ",";
+ if (misses != label.second.end()) {
+ row += std::to_string(misses->second);
+ } else {
+ row += "0";
+ }
+ }
+ out << row << std::endl;
+ }
+ out.close();
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteMissTimeline(uint64_t time_unit) const {
+ if (!cache_simulator_ || output_dir_.empty()) {
+ return;
+ }
+ std::map<uint64_t, std::map<std::string, std::map<uint64_t, uint64_t>>>
+ cs_name_timeline;
+ uint64_t start_time = std::numeric_limits<uint64_t>::max();
+ uint64_t end_time = 0;
+ const std::map<uint64_t, uint64_t>& trace_num_misses =
+ adjust_time_unit(miss_ratio_stats_.num_misses_timeline(), time_unit);
+ for (auto const& num_miss : trace_num_misses) {
+ uint64_t time = num_miss.first;
+ start_time = std::min(start_time, time);
+ end_time = std::max(end_time, time);
+ uint64_t miss = num_miss.second;
+ cs_name_timeline[std::numeric_limits<uint64_t>::max()]["trace"][time] =
+ miss;
+ }
+ for (auto const& config_caches : cache_simulator_->sim_caches()) {
+ const CacheConfiguration& config = config_caches.first;
+ std::string cache_label = config.cache_name + "-" +
+ std::to_string(config.num_shard_bits) + "-" +
+ std::to_string(config.ghost_cache_capacity);
+ for (uint32_t i = 0; i < config.cache_capacities.size(); i++) {
+ const std::map<uint64_t, uint64_t>& num_misses = adjust_time_unit(
+ config_caches.second[i]->miss_ratio_stats().num_misses_timeline(),
+ time_unit);
+ for (auto const& num_miss : num_misses) {
+ uint64_t time = num_miss.first;
+ start_time = std::min(start_time, time);
+ end_time = std::max(end_time, time);
+ uint64_t miss = num_miss.second;
+ cs_name_timeline[config.cache_capacities[i]][cache_label][time] = miss;
+ }
+ }
+ }
+ for (auto const& it : cs_name_timeline) {
+ const std::string output_miss_ratio_timeline_path =
+ output_dir_ + "/" + std::to_string(it.first) + "_" +
+ std::to_string(time_unit) + "_" + kFileNameSuffixMissTimeline;
+ std::ofstream out(output_miss_ratio_timeline_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("time");
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ header += ",";
+ header += std::to_string(now);
+ }
+ out << header << std::endl;
+ for (auto const& label : it.second) {
+ std::string row(label.first);
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ auto misses = label.second.find(now);
+ row += ",";
+ if (misses != label.second.end()) {
+ row += std::to_string(misses->second);
+ } else {
+ row += "0";
+ }
+ }
+ out << row << std::endl;
+ }
+ out.close();
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteSkewness(
+ const std::string& label_str, const std::vector<uint64_t>& percent_buckets,
+ TraceType target_block_type) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, uint64_t> label_naccesses;
+ uint64_t total_naccesses = 0;
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ if (target_block_type != TraceType::kTraceMax &&
+ target_block_type != type) {
+ return;
+ }
+ const std::string label = BuildLabel(
+ labels, cf_name, fd, level, type,
+ TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block);
+ label_naccesses[label] += block.num_accesses;
+ total_naccesses += block.num_accesses;
+ };
+ TraverseBlocks(block_callback, &labels);
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_bucket_naccesses;
+ std::vector<std::pair<std::string, uint64_t>> pairs;
+ for (auto const& itr : label_naccesses) {
+ pairs.push_back(itr);
+ }
+ // Sort in descending order.
+ sort(pairs.begin(), pairs.end(),
+ [](const std::pair<std::string, uint64_t>& a,
+ const std::pair<std::string, uint64_t>& b) {
+ return b.second < a.second;
+ });
+
+ size_t prev_start_index = 0;
+ for (auto const& percent : percent_buckets) {
+ label_bucket_naccesses[label_str][percent] = 0;
+ size_t end_index = 0;
+ if (percent == std::numeric_limits<uint64_t>::max()) {
+ end_index = label_naccesses.size();
+ } else {
+ end_index = percent * label_naccesses.size() / 100;
+ }
+ for (size_t i = prev_start_index; i < end_index; i++) {
+ label_bucket_naccesses[label_str][percent] += pairs[i].second;
+ }
+ prev_start_index = end_index;
+ }
+ std::string filename_suffix;
+ if (target_block_type != TraceType::kTraceMax) {
+ filename_suffix = block_type_to_string(target_block_type);
+ filename_suffix += "_";
+ }
+ filename_suffix += kFileNameSuffixSkew;
+ WriteStatsToFile(label_str, percent_buckets, filename_suffix,
+ label_bucket_naccesses, total_naccesses);
+}
+
+void BlockCacheTraceAnalyzer::WriteCorrelationFeatures(
+ const std::string& label_str, uint32_t max_number_of_values) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, Features> label_features;
+ std::map<std::string, Predictions> label_predictions;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t fd, uint32_t level,
+ TraceType block_type, const std::string& /*block_key*/,
+ uint64_t /*block_key_id*/, const BlockAccessInfo& block) {
+ if (block.table_id == 0 && labels.find(kGroupbyTable) != labels.end()) {
+ // We only know table id information for get requests.
+ return;
+ }
+ if (labels.find(kGroupbyCaller) != labels.end()) {
+ // Group by caller.
+ for (auto const& caller_map : block.caller_access_timeline) {
+ const std::string label =
+ BuildLabel(labels, cf_name, fd, level, block_type,
+ caller_map.first, /*block_id=*/0, block);
+ auto it = block.caller_access_sequence__number_timeline.find(
+ caller_map.first);
+ assert(it != block.caller_access_sequence__number_timeline.end());
+ UpdateFeatureVectors(it->second, caller_map.second, label,
+ &label_features, &label_predictions);
+ }
+ return;
+ }
+ const std::string label =
+ BuildLabel(labels, cf_name, fd, level, block_type,
+ TableReaderCaller::kMaxBlockCacheLookupCaller,
+ /*block_id=*/0, block);
+ UpdateFeatureVectors(block.access_sequence_number_timeline,
+ block.access_timeline, label, &label_features,
+ &label_predictions);
+ };
+ TraverseBlocks(block_callback, &labels);
+ WriteCorrelationFeaturesToFile(label_str, label_features, label_predictions,
+ max_number_of_values);
+}
+
+void BlockCacheTraceAnalyzer::WriteCorrelationFeaturesToFile(
+ const std::string& label,
+ const std::map<std::string, Features>& label_features,
+ const std::map<std::string, Predictions>& label_predictions,
+ uint32_t max_number_of_values) const {
+ for (auto const& label_feature_vectors : label_features) {
+ const Features& past = label_feature_vectors.second;
+ auto it = label_predictions.find(label_feature_vectors.first);
+ assert(it != label_predictions.end());
+ const Predictions& future = it->second;
+ const std::string output_path = output_dir_ + "/" + label + "_" +
+ label_feature_vectors.first + "_" +
+ kFileNameSuffixCorrelation;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header(
+ "num_accesses_since_last_access,elapsed_time_since_last_access,num_"
+ "past_accesses,num_accesses_till_next_access,elapsed_time_till_next_"
+ "access");
+ out << header << std::endl;
+ std::vector<uint32_t> indexes;
+ for (uint32_t i = 0; i < past.num_accesses_since_last_access.size(); i++) {
+ indexes.push_back(i);
+ }
+ RandomShuffle(indexes.begin(), indexes.end());
+ for (uint32_t i = 0; i < max_number_of_values && i < indexes.size(); i++) {
+ uint32_t rand_index = indexes[i];
+ out << std::to_string(past.num_accesses_since_last_access[rand_index])
+ << ",";
+ out << std::to_string(past.elapsed_time_since_last_access[rand_index])
+ << ",";
+ out << std::to_string(past.num_past_accesses[rand_index]) << ",";
+ out << std::to_string(future.num_accesses_till_next_access[rand_index])
+ << ",";
+ out << std::to_string(future.elapsed_time_till_next_access[rand_index])
+ << std::endl;
+ }
+ out.close();
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteCorrelationFeaturesForGet(
+ uint32_t max_number_of_values) const {
+ std::string label = "GetKeyInfo";
+ std::map<std::string, Features> label_features;
+ std::map<std::string, Predictions> label_predictions;
+ for (auto const& get_info : get_key_info_map_) {
+ const GetKeyInfo& info = get_info.second;
+ UpdateFeatureVectors(info.access_sequence_number_timeline,
+ info.access_timeline, label, &label_features,
+ &label_predictions);
+ }
+ WriteCorrelationFeaturesToFile(label, label_features, label_predictions,
+ max_number_of_values);
+}
+
+std::set<std::string> BlockCacheTraceAnalyzer::ParseLabelStr(
+ const std::string& label_str) const {
+ std::stringstream ss(label_str);
+ std::set<std::string> labels;
+ // label_str is in the form of "label1_label2_label3", e.g., cf_bt.
+ while (ss.good()) {
+ std::string label_name;
+ getline(ss, label_name, '_');
+ if (kGroupbyLabels.find(label_name) == kGroupbyLabels.end()) {
+ // Unknown label name.
+ fprintf(stderr, "Unknown label name %s, label string %s\n",
+ label_name.c_str(), label_str.c_str());
+ return {};
+ }
+ labels.insert(label_name);
+ }
+ return labels;
+}
+
+std::string BlockCacheTraceAnalyzer::BuildLabel(
+ const std::set<std::string>& labels, const std::string& cf_name,
+ uint64_t fd, uint32_t level, TraceType type, TableReaderCaller caller,
+ uint64_t block_key, const BlockAccessInfo& block) const {
+ std::map<std::string, std::string> label_value_map;
+ label_value_map[kGroupbyAll] = kGroupbyAll;
+ label_value_map[kGroupbyLevel] = std::to_string(level);
+ label_value_map[kGroupbyCaller] = caller_to_string(caller);
+ label_value_map[kGroupbySSTFile] = std::to_string(fd);
+ label_value_map[kGroupbyBlockType] = block_type_to_string(type);
+ label_value_map[kGroupbyColumnFamily] = cf_name;
+ label_value_map[kGroupbyBlock] = std::to_string(block_key);
+ label_value_map[kGroupbyTable] = std::to_string(block.table_id);
+ // Concatenate the label values.
+ std::string label;
+ for (auto const& l : labels) {
+ label += label_value_map[l];
+ label += "-";
+ }
+ if (!label.empty()) {
+ label.pop_back();
+ }
+ return label;
+}
+
+void BlockCacheTraceAnalyzer::TraverseBlocks(
+ std::function<void(const std::string& /*cf_name*/, uint64_t /*fd*/,
+ uint32_t /*level*/, TraceType /*block_type*/,
+ const std::string& /*block_key*/,
+ uint64_t /*block_key_id*/,
+ const BlockAccessInfo& /*block_access_info*/)>
+ block_callback,
+ std::set<std::string>* labels) const {
+ for (auto const& cf_aggregates : cf_aggregates_map_) {
+ // Stats per column family.
+ const std::string& cf_name = cf_aggregates.first;
+ for (auto const& file_aggregates : cf_aggregates.second.fd_aggregates_map) {
+ // Stats per SST file.
+ const uint64_t fd = file_aggregates.first;
+ const uint32_t level = file_aggregates.second.level;
+ for (auto const& block_type_aggregates :
+ file_aggregates.second.block_type_aggregates_map) {
+ // Stats per block type.
+ const TraceType type = block_type_aggregates.first;
+ for (auto const& block_access_info :
+ block_type_aggregates.second.block_access_info_map) {
+ // Stats per block.
+ if (labels && block_access_info.second.table_id == 0 &&
+ labels->find(kGroupbyTable) != labels->end()) {
+ // We only know table id information for get requests.
+ return;
+ }
+ block_callback(cf_name, fd, level, type, block_access_info.first,
+ block_access_info.second.block_id,
+ block_access_info.second);
+ }
+ }
+ }
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteGetSpatialLocality(
+ const std::string& label_str,
+ const std::vector<uint64_t>& percent_buckets) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_pnrefkeys_nblocks;
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_pnrefs_nblocks;
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_pndatasize_nblocks;
+ uint64_t nblocks = 0;
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType /*block_type*/,
+ const std::string& /*block_key*/,
+ uint64_t /*block_key_id*/,
+ const BlockAccessInfo& block) {
+ if (block.num_keys == 0) {
+ return;
+ }
+ uint64_t naccesses = 0;
+ for (auto const& key_access : block.key_num_access_map) {
+ for (auto const& caller_access : key_access.second) {
+ if (caller_access.first == TableReaderCaller::kUserGet) {
+ naccesses += caller_access.second;
+ }
+ }
+ }
+ const std::string label =
+ BuildLabel(labels, cf_name, fd, level, TraceType::kBlockTraceDataBlock,
+ TableReaderCaller::kUserGet, /*block_id=*/0, block);
+
+ const uint64_t percent_referenced_for_existing_keys =
+ static_cast<uint64_t>(std::max(
+ percent(block.key_num_access_map.size(), block.num_keys), 0.0));
+ const uint64_t percent_accesses_for_existing_keys =
+ static_cast<uint64_t>(std::max(
+ percent(block.num_referenced_key_exist_in_block, naccesses), 0.0));
+ const uint64_t percent_referenced_data_size = static_cast<uint64_t>(
+ std::max(percent(block.referenced_data_size, block.block_size), 0.0));
+ if (label_pnrefkeys_nblocks.find(label) == label_pnrefkeys_nblocks.end()) {
+ for (auto const& percent_bucket : percent_buckets) {
+ label_pnrefkeys_nblocks[label][percent_bucket] = 0;
+ label_pnrefs_nblocks[label][percent_bucket] = 0;
+ label_pndatasize_nblocks[label][percent_bucket] = 0;
+ }
+ }
+ label_pnrefkeys_nblocks[label]
+ .upper_bound(percent_referenced_for_existing_keys)
+ ->second += 1;
+ label_pnrefs_nblocks[label]
+ .upper_bound(percent_accesses_for_existing_keys)
+ ->second += 1;
+ label_pndatasize_nblocks[label]
+ .upper_bound(percent_referenced_data_size)
+ ->second += 1;
+ nblocks += 1;
+ };
+ TraverseBlocks(block_callback, &labels);
+ WriteStatsToFile(label_str, percent_buckets, kFileNameSuffixPercentRefKeys,
+ label_pnrefkeys_nblocks, nblocks);
+ WriteStatsToFile(label_str, percent_buckets,
+ kFileNameSuffixPercentAccessesOnRefKeys,
+ label_pnrefs_nblocks, nblocks);
+ WriteStatsToFile(label_str, percent_buckets,
+ kFileNameSuffixPercentDataSizeOnRefKeys,
+ label_pndatasize_nblocks, nblocks);
+}
+
+void BlockCacheTraceAnalyzer::WriteAccessTimeline(const std::string& label_str,
+ uint64_t time_unit,
+ bool user_access_only) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ uint64_t start_time = std::numeric_limits<uint64_t>::max();
+ uint64_t end_time = 0;
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_access_timeline;
+ std::map<uint64_t, std::vector<std::string>> access_count_block_id_map;
+
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ uint64_t naccesses = 0;
+ for (auto const& timeline : block.caller_num_accesses_timeline) {
+ const TableReaderCaller caller = timeline.first;
+ if (user_access_only && !is_user_access(caller)) {
+ continue;
+ }
+ const std::string label =
+ BuildLabel(labels, cf_name, fd, level, type, caller, block_id, block);
+ for (auto const& naccess : timeline.second) {
+ const uint64_t timestamp = naccess.first / time_unit;
+ const uint64_t num = naccess.second;
+ label_access_timeline[label][timestamp] += num;
+ start_time = std::min(start_time, timestamp);
+ end_time = std::max(end_time, timestamp);
+ naccesses += num;
+ }
+ }
+ if (naccesses > 0) {
+ access_count_block_id_map[naccesses].push_back(std::to_string(block_id));
+ }
+ };
+ TraverseBlocks(block_callback, &labels);
+
+ // We have label_access_timeline now. Write them into a file.
+ const std::string user_access_prefix =
+ user_access_only ? "user_access_only_" : "all_access_";
+ const std::string output_path = output_dir_ + "/" + user_access_prefix +
+ label_str + "_" + std::to_string(time_unit) +
+ "_" + kFileNameSuffixAccessTimeline;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("time");
+ if (labels.find("block") != labels.end()) {
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ header += ",";
+ header += std::to_string(now);
+ }
+ out << header << std::endl;
+ // Write the most frequently accessed blocks first.
+ for (auto naccess_it = access_count_block_id_map.rbegin();
+ naccess_it != access_count_block_id_map.rend(); naccess_it++) {
+ for (auto& block_id_it : naccess_it->second) {
+ std::string row(block_id_it);
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ auto it = label_access_timeline[block_id_it].find(now);
+ row += ",";
+ if (it != label_access_timeline[block_id_it].end()) {
+ row += std::to_string(it->second);
+ } else {
+ row += "0";
+ }
+ }
+ out << row << std::endl;
+ }
+ }
+ out.close();
+ return;
+ }
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ header += ",";
+ header += std::to_string(now);
+ }
+ out << header << std::endl;
+ for (auto const& label : label_access_timeline) {
+ std::string row(label.first);
+ for (uint64_t now = start_time; now <= end_time; now++) {
+ auto it = label.second.find(now);
+ row += ",";
+ if (it != label.second.end()) {
+ row += std::to_string(it->second);
+ } else {
+ row += "0";
+ }
+ }
+ out << row << std::endl;
+ }
+
+ out.close();
+}
+
+void BlockCacheTraceAnalyzer::WriteReuseDistance(
+ const std::string& label_str,
+ const std::vector<uint64_t>& distance_buckets) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_distance_num_reuses;
+ uint64_t total_num_reuses = 0;
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ const std::string label = BuildLabel(
+ labels, cf_name, fd, level, type,
+ TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block);
+ if (label_distance_num_reuses.find(label) ==
+ label_distance_num_reuses.end()) {
+ // The first time we encounter this label.
+ for (auto const& distance_bucket : distance_buckets) {
+ label_distance_num_reuses[label][distance_bucket] = 0;
+ }
+ }
+ for (auto const& reuse_distance : block.reuse_distance_count) {
+ label_distance_num_reuses[label]
+ .upper_bound(reuse_distance.first)
+ ->second += reuse_distance.second;
+ total_num_reuses += reuse_distance.second;
+ }
+ };
+ TraverseBlocks(block_callback, &labels);
+ // We have label_naccesses and label_distance_num_reuses now. Write them into
+ // a file.
+ const std::string output_path =
+ output_dir_ + "/" + label_str + "_reuse_distance";
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("bucket");
+ for (auto const& label_it : label_distance_num_reuses) {
+ header += ",";
+ header += label_it.first;
+ }
+ out << header << std::endl;
+ for (auto const& bucket : distance_buckets) {
+ std::string row(std::to_string(bucket));
+ for (auto const& label_it : label_distance_num_reuses) {
+ auto const& it = label_it.second.find(bucket);
+ assert(it != label_it.second.end());
+ row += ",";
+ row += std::to_string(percent(it->second, total_num_reuses));
+ }
+ out << row << std::endl;
+ }
+ out.close();
+}
+
+void BlockCacheTraceAnalyzer::UpdateReuseIntervalStats(
+ const std::string& label, const std::vector<uint64_t>& time_buckets,
+ const std::map<uint64_t, uint64_t> timeline,
+ std::map<std::string, std::map<uint64_t, uint64_t>>* label_time_num_reuses,
+ uint64_t* total_num_reuses) const {
+ assert(label_time_num_reuses);
+ assert(total_num_reuses);
+ if (label_time_num_reuses->find(label) == label_time_num_reuses->end()) {
+ // The first time we encounter this label.
+ for (auto const& time_bucket : time_buckets) {
+ (*label_time_num_reuses)[label][time_bucket] = 0;
+ }
+ }
+ auto it = timeline.begin();
+ uint64_t prev_timestamp = it->first;
+ const uint64_t prev_num = it->second;
+ it++;
+ // Reused within one second.
+ if (prev_num > 1) {
+ (*label_time_num_reuses)[label].upper_bound(0)->second += prev_num - 1;
+ *total_num_reuses += prev_num - 1;
+ }
+ while (it != timeline.end()) {
+ const uint64_t timestamp = it->first;
+ const uint64_t num = it->second;
+ const uint64_t reuse_interval = timestamp - prev_timestamp;
+ (*label_time_num_reuses)[label].upper_bound(reuse_interval)->second += 1;
+ if (num > 1) {
+ (*label_time_num_reuses)[label].upper_bound(0)->second += num - 1;
+ }
+ prev_timestamp = timestamp;
+ *total_num_reuses += num;
+ it++;
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteStatsToFile(
+ const std::string& label_str, const std::vector<uint64_t>& time_buckets,
+ const std::string& filename_suffix,
+ const std::map<std::string, std::map<uint64_t, uint64_t>>& label_data,
+ uint64_t ntotal) const {
+ const std::string output_path =
+ output_dir_ + "/" + label_str + "_" + filename_suffix;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("bucket");
+ for (auto const& label_it : label_data) {
+ header += ",";
+ header += label_it.first;
+ }
+ out << header << std::endl;
+ for (auto const& bucket : time_buckets) {
+ std::string row(std::to_string(bucket));
+ for (auto const& label_it : label_data) {
+ auto const& it = label_it.second.find(bucket);
+ assert(it != label_it.second.end());
+ row += ",";
+ row += std::to_string(percent(it->second, ntotal));
+ }
+ out << row << std::endl;
+ }
+ out.close();
+}
+
+void BlockCacheTraceAnalyzer::WriteReuseInterval(
+ const std::string& label_str,
+ const std::vector<uint64_t>& time_buckets) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_time_num_reuses;
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_avg_reuse_nblocks;
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_avg_reuse_naccesses;
+
+ uint64_t total_num_reuses = 0;
+ uint64_t total_nblocks = 0;
+ uint64_t total_accesses = 0;
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ total_nblocks++;
+ total_accesses += block.num_accesses;
+ uint64_t avg_reuse_interval = 0;
+ if (block.num_accesses > 1) {
+ avg_reuse_interval = ((block.last_access_time - block.first_access_time) /
+ kMicrosInSecond) /
+ block.num_accesses;
+ } else {
+ avg_reuse_interval = std::numeric_limits<uint64_t>::max() - 1;
+ }
+ if (labels.find(kGroupbyCaller) != labels.end()) {
+ for (auto const& timeline : block.caller_num_accesses_timeline) {
+ const TableReaderCaller caller = timeline.first;
+ const std::string label = BuildLabel(labels, cf_name, fd, level, type,
+ caller, block_id, block);
+ UpdateReuseIntervalStats(label, time_buckets, timeline.second,
+ &label_time_num_reuses, &total_num_reuses);
+ }
+ return;
+ }
+ // Does not group by caller so we need to flatten the access timeline.
+ const std::string label = BuildLabel(
+ labels, cf_name, fd, level, type,
+ TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block);
+ std::map<uint64_t, uint64_t> timeline;
+ for (auto const& caller_timeline : block.caller_num_accesses_timeline) {
+ for (auto const& time_naccess : caller_timeline.second) {
+ timeline[time_naccess.first] += time_naccess.second;
+ }
+ }
+ UpdateReuseIntervalStats(label, time_buckets, timeline,
+ &label_time_num_reuses, &total_num_reuses);
+ if (label_avg_reuse_nblocks.find(label) == label_avg_reuse_nblocks.end()) {
+ for (auto const& time_bucket : time_buckets) {
+ label_avg_reuse_nblocks[label][time_bucket] = 0;
+ label_avg_reuse_naccesses[label][time_bucket] = 0;
+ }
+ }
+ label_avg_reuse_nblocks[label].upper_bound(avg_reuse_interval)->second += 1;
+ label_avg_reuse_naccesses[label].upper_bound(avg_reuse_interval)->second +=
+ block.num_accesses;
+ };
+ TraverseBlocks(block_callback, &labels);
+
+ // Write the stats into files.
+ WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseInterval,
+ label_time_num_reuses, total_num_reuses);
+ WriteStatsToFile(label_str, time_buckets, kFileNameSuffixAvgReuseInterval,
+ label_avg_reuse_nblocks, total_nblocks);
+ WriteStatsToFile(label_str, time_buckets,
+ kFileNameSuffixAvgReuseIntervalNaccesses,
+ label_avg_reuse_naccesses, total_accesses);
+}
+
+void BlockCacheTraceAnalyzer::WriteReuseLifetime(
+ const std::string& label_str,
+ const std::vector<uint64_t>& time_buckets) const {
+ std::set<std::string> labels = ParseLabelStr(label_str);
+ std::map<std::string, std::map<uint64_t, uint64_t>> label_lifetime_nblocks;
+ uint64_t total_nblocks = 0;
+ auto block_callback = [&](const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ uint64_t lifetime = 0;
+ if (block.num_accesses > 1) {
+ lifetime =
+ (block.last_access_time - block.first_access_time) / kMicrosInSecond;
+ } else {
+ lifetime = std::numeric_limits<uint64_t>::max() - 1;
+ }
+ const std::string label = BuildLabel(
+ labels, cf_name, fd, level, type,
+ TableReaderCaller::kMaxBlockCacheLookupCaller, block_id, block);
+
+ if (label_lifetime_nblocks.find(label) == label_lifetime_nblocks.end()) {
+ // The first time we encounter this label.
+ for (auto const& time_bucket : time_buckets) {
+ label_lifetime_nblocks[label][time_bucket] = 0;
+ }
+ }
+ label_lifetime_nblocks[label].upper_bound(lifetime)->second += 1;
+ total_nblocks += 1;
+ };
+ TraverseBlocks(block_callback, &labels);
+ WriteStatsToFile(label_str, time_buckets, kFileNameSuffixReuseLifetime,
+ label_lifetime_nblocks, total_nblocks);
+}
+
+void BlockCacheTraceAnalyzer::WriteBlockReuseTimeline(
+ const uint64_t reuse_window, bool user_access_only,
+ TraceType block_type) const {
+ // A map from block key to an array of bools that states whether a block is
+ // accessed in a time window.
+ std::map<uint64_t, std::vector<bool>> block_accessed;
+ const uint64_t trace_duration =
+ trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
+ const uint64_t reuse_vector_size = (trace_duration / reuse_window);
+ if (reuse_vector_size < 2) {
+ // The reuse window is less than 2. We cannot calculate the reused
+ // percentage of blocks.
+ return;
+ }
+ auto block_callback = [&](const std::string& /*cf_name*/, uint64_t /*fd*/,
+ uint32_t /*level*/, TraceType /*type*/,
+ const std::string& /*block_key*/, uint64_t block_id,
+ const BlockAccessInfo& block) {
+ if (block_accessed.find(block_id) == block_accessed.end()) {
+ block_accessed[block_id].resize(reuse_vector_size);
+ for (uint64_t i = 0; i < reuse_vector_size; i++) {
+ block_accessed[block_id][i] = false;
+ }
+ }
+ for (auto const& caller_num : block.caller_num_accesses_timeline) {
+ const TableReaderCaller caller = caller_num.first;
+ for (auto const& timeline : caller_num.second) {
+ const uint64_t timestamp = timeline.first;
+ const uint64_t elapsed_time =
+ timestamp - trace_start_timestamp_in_seconds_;
+ if (!user_access_only || is_user_access(caller)) {
+ uint64_t index =
+ std::min(elapsed_time / reuse_window, reuse_vector_size - 1);
+ block_accessed[block_id][index] = true;
+ }
+ }
+ }
+ };
+ TraverseBlocks(block_callback);
+
+ // A cell is the number of blocks accessed in a reuse window.
+ std::unique_ptr<uint64_t[]> reuse_table(
+ new uint64_t[reuse_vector_size * reuse_vector_size]);
+ for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) {
+ // Initialize the reuse_table.
+ for (uint64_t i = 0; i < reuse_vector_size; i++) {
+ reuse_table[start_time * reuse_vector_size + i] = 0;
+ }
+ // Examine all blocks.
+ for (auto const& block : block_accessed) {
+ for (uint64_t i = start_time; i < reuse_vector_size; i++) {
+ if (block.second[start_time] && block.second[i]) {
+ // This block is accessed at start time and at the current time. We
+ // increment reuse_table[start_time][i] since it is reused at the ith
+ // window.
+ reuse_table[start_time * reuse_vector_size + i]++;
+ }
+ }
+ }
+ }
+ const std::string user_access_prefix =
+ user_access_only ? "_user_access_only_" : "_all_access_";
+ const std::string output_path =
+ output_dir_ + "/" + block_type_to_string(block_type) +
+ user_access_prefix + std::to_string(reuse_window) + "_" +
+ kFileNameSuffixAccessReuseBlocksTimeline;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("start_time");
+ for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) {
+ header += ",";
+ header += std::to_string(start_time);
+ }
+ out << header << std::endl;
+ for (uint64_t start_time = 0; start_time < reuse_vector_size; start_time++) {
+ std::string row(std::to_string(start_time * reuse_window));
+ for (uint64_t j = 0; j < reuse_vector_size; j++) {
+ row += ",";
+ if (j < start_time) {
+ row += "100.0";
+ } else {
+ row += std::to_string(
+ percent(reuse_table[start_time * reuse_vector_size + j],
+ reuse_table[start_time * reuse_vector_size + start_time]));
+ }
+ }
+ out << row << std::endl;
+ }
+ out.close();
+}
+
+std::string BlockCacheTraceAnalyzer::OutputPercentAccessStats(
+ uint64_t total_accesses,
+ const std::map<std::string, uint64_t>& cf_access_count) const {
+ std::string row;
+ for (auto const& cf_aggregates : cf_aggregates_map_) {
+ const std::string& cf_name = cf_aggregates.first;
+ const auto& naccess = cf_access_count.find(cf_name);
+ row += ",";
+ if (naccess != cf_access_count.end()) {
+ row += std::to_string(percent(naccess->second, total_accesses));
+ } else {
+ row += "0";
+ }
+ }
+ return row;
+}
+
+void BlockCacheTraceAnalyzer::WritePercentAccessSummaryStats() const {
+ std::map<TableReaderCaller, std::map<std::string, uint64_t>>
+ caller_cf_accesses;
+ uint64_t total_accesses = 0;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/,
+ TraceType /*type*/, const std::string& /*block_key*/,
+ uint64_t /*block_id*/, const BlockAccessInfo& block) {
+ for (auto const& caller_num : block.caller_num_access_map) {
+ const TableReaderCaller caller = caller_num.first;
+ const uint64_t naccess = caller_num.second;
+ caller_cf_accesses[caller][cf_name] += naccess;
+ total_accesses += naccess;
+ }
+ };
+ TraverseBlocks(block_callback);
+
+ const std::string output_path =
+ output_dir_ + "/" + kFileNameSuffixPercentOfAccessSummary;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("caller");
+ for (auto const& cf_name : cf_aggregates_map_) {
+ header += ",";
+ header += cf_name.first;
+ }
+ out << header << std::endl;
+ for (auto const& cf_naccess_it : caller_cf_accesses) {
+ const TableReaderCaller caller = cf_naccess_it.first;
+ std::string row;
+ row += caller_to_string(caller);
+ row += OutputPercentAccessStats(total_accesses, cf_naccess_it.second);
+ out << row << std::endl;
+ }
+ out.close();
+}
+
+void BlockCacheTraceAnalyzer::WriteDetailedPercentAccessSummaryStats(
+ TableReaderCaller analyzing_caller) const {
+ std::map<uint32_t, std::map<std::string, uint64_t>> level_cf_accesses;
+ std::map<TraceType, std::map<std::string, uint64_t>> bt_cf_accesses;
+ uint64_t total_accesses = 0;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t level,
+ TraceType type, const std::string& /*block_key*/,
+ uint64_t /*block_id*/, const BlockAccessInfo& block) {
+ for (auto const& caller_num : block.caller_num_access_map) {
+ const TableReaderCaller caller = caller_num.first;
+ if (caller == analyzing_caller) {
+ const uint64_t naccess = caller_num.second;
+ level_cf_accesses[level][cf_name] += naccess;
+ bt_cf_accesses[type][cf_name] += naccess;
+ total_accesses += naccess;
+ }
+ }
+ };
+ TraverseBlocks(block_callback);
+ {
+ const std::string output_path =
+ output_dir_ + "/" + caller_to_string(analyzing_caller) + "_level_" +
+ kFileNameSuffixPercentOfAccessSummary;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("level");
+ for (auto const& cf_name : cf_aggregates_map_) {
+ header += ",";
+ header += cf_name.first;
+ }
+ out << header << std::endl;
+ for (auto const& level_naccess_it : level_cf_accesses) {
+ const uint32_t level = level_naccess_it.first;
+ std::string row;
+ row += std::to_string(level);
+ row += OutputPercentAccessStats(total_accesses, level_naccess_it.second);
+ out << row << std::endl;
+ }
+ out.close();
+ }
+ {
+ const std::string output_path =
+ output_dir_ + "/" + caller_to_string(analyzing_caller) + "_bt_" +
+ kFileNameSuffixPercentOfAccessSummary;
+ std::ofstream out(output_path);
+ if (!out.is_open()) {
+ return;
+ }
+ std::string header("bt");
+ for (auto const& cf_name : cf_aggregates_map_) {
+ header += ",";
+ header += cf_name.first;
+ }
+ out << header << std::endl;
+ for (auto const& bt_naccess_it : bt_cf_accesses) {
+ const TraceType bt = bt_naccess_it.first;
+ std::string row;
+ row += block_type_to_string(bt);
+ row += OutputPercentAccessStats(total_accesses, bt_naccess_it.second);
+ out << row << std::endl;
+ }
+ out.close();
+ }
+}
+
+void BlockCacheTraceAnalyzer::WriteAccessCountSummaryStats(
+ const std::vector<uint64_t>& access_count_buckets,
+ bool user_access_only) const {
+ // x: buckets.
+ // y: # of accesses.
+ std::map<std::string, std::map<uint64_t, uint64_t>> bt_access_nblocks;
+ std::map<std::string, std::map<uint64_t, uint64_t>> cf_access_nblocks;
+ uint64_t total_nblocks = 0;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/,
+ TraceType type, const std::string& /*block_key*/,
+ uint64_t /*block_id*/, const BlockAccessInfo& block) {
+ const std::string type_str = block_type_to_string(type);
+ if (cf_access_nblocks.find(cf_name) == cf_access_nblocks.end()) {
+ // initialize.
+ for (auto& access : access_count_buckets) {
+ cf_access_nblocks[cf_name][access] = 0;
+ }
+ }
+ if (bt_access_nblocks.find(type_str) == bt_access_nblocks.end()) {
+ // initialize.
+ for (auto& access : access_count_buckets) {
+ bt_access_nblocks[type_str][access] = 0;
+ }
+ }
+ uint64_t naccesses = 0;
+ for (auto const& caller_access : block.caller_num_access_map) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ naccesses += caller_access.second;
+ }
+ }
+ if (naccesses == 0) {
+ return;
+ }
+ total_nblocks += 1;
+ bt_access_nblocks[type_str].upper_bound(naccesses)->second += 1;
+ cf_access_nblocks[cf_name].upper_bound(naccesses)->second += 1;
+ };
+ TraverseBlocks(block_callback);
+ const std::string user_access_prefix =
+ user_access_only ? "user_access_only_" : "all_access_";
+ WriteStatsToFile("cf", access_count_buckets,
+ user_access_prefix + kFileNameSuffixAccessCountSummary,
+ cf_access_nblocks, total_nblocks);
+ WriteStatsToFile("bt", access_count_buckets,
+ user_access_prefix + kFileNameSuffixAccessCountSummary,
+ bt_access_nblocks, total_nblocks);
+}
+
+BlockCacheTraceAnalyzer::BlockCacheTraceAnalyzer(
+ const std::string& trace_file_path, const std::string& output_dir,
+ const std::string& human_readable_trace_file_path,
+ bool compute_reuse_distance, bool mrc_only,
+ bool is_human_readable_trace_file,
+ std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator)
+ : env_(ROCKSDB_NAMESPACE::Env::Default()),
+ trace_file_path_(trace_file_path),
+ output_dir_(output_dir),
+ human_readable_trace_file_path_(human_readable_trace_file_path),
+ compute_reuse_distance_(compute_reuse_distance),
+ mrc_only_(mrc_only),
+ is_human_readable_trace_file_(is_human_readable_trace_file),
+ cache_simulator_(std::move(cache_simulator)) {}
+
+void BlockCacheTraceAnalyzer::ComputeReuseDistance(
+ BlockAccessInfo* info) const {
+ assert(info);
+ if (info->num_accesses == 0) {
+ return;
+ }
+ uint64_t reuse_distance = 0;
+ for (auto const& block_key : info->unique_blocks_since_last_access) {
+ auto const& it = block_info_map_.find(block_key);
+ // This block must exist.
+ assert(it != block_info_map_.end());
+ reuse_distance += it->second->block_size;
+ }
+ info->reuse_distance_count[reuse_distance] += 1;
+ // We clear this hash set since this is the second access on this block.
+ info->unique_blocks_since_last_access.clear();
+}
+
+Status BlockCacheTraceAnalyzer::RecordAccess(
+ const BlockCacheTraceRecord& access) {
+ ColumnFamilyAccessInfoAggregate& cf_aggr = cf_aggregates_map_[access.cf_name];
+ SSTFileAccessInfoAggregate& file_aggr =
+ cf_aggr.fd_aggregates_map[access.sst_fd_number];
+ file_aggr.level = access.level;
+ BlockTypeAccessInfoAggregate& block_type_aggr =
+ file_aggr.block_type_aggregates_map[access.block_type];
+ if (block_type_aggr.block_access_info_map.find(access.block_key) ==
+ block_type_aggr.block_access_info_map.end()) {
+ block_type_aggr.block_access_info_map[access.block_key].block_id =
+ unique_block_id_;
+ unique_block_id_++;
+ }
+ BlockAccessInfo& block_access_info =
+ block_type_aggr.block_access_info_map[access.block_key];
+ if (compute_reuse_distance_) {
+ ComputeReuseDistance(&block_access_info);
+ }
+ block_access_info.AddAccess(access, access_sequence_number_);
+ block_info_map_[access.block_key] = &block_access_info;
+ uint64_t get_key_id = 0;
+ if (access.caller == TableReaderCaller::kUserGet &&
+ access.get_id != BlockCacheTraceHelper::kReservedGetId) {
+ std::string user_key = ExtractUserKey(access.referenced_key).ToString();
+ if (get_key_info_map_.find(user_key) == get_key_info_map_.end()) {
+ get_key_info_map_[user_key].key_id = unique_get_key_id_;
+ unique_get_key_id_++;
+ }
+ get_key_id = get_key_info_map_[user_key].key_id;
+ get_key_info_map_[user_key].AddAccess(access, access_sequence_number_);
+ }
+
+ if (compute_reuse_distance_) {
+ // Add this block to all existing blocks.
+ for (auto& cf_aggregates : cf_aggregates_map_) {
+ for (auto& file_aggregates : cf_aggregates.second.fd_aggregates_map) {
+ for (auto& block_type_aggregates :
+ file_aggregates.second.block_type_aggregates_map) {
+ for (auto& existing_block :
+ block_type_aggregates.second.block_access_info_map) {
+ existing_block.second.unique_blocks_since_last_access.insert(
+ access.block_key);
+ }
+ }
+ }
+ }
+ }
+ return human_readable_trace_writer_.WriteHumanReadableTraceRecord(
+ access, block_access_info.block_id, get_key_id);
+}
+
+Status BlockCacheTraceAnalyzer::Analyze() {
+ SystemClock* clock = env_->GetSystemClock().get();
+ std::unique_ptr<BlockCacheTraceReader> reader;
+ Status s = Status::OK();
+ if (is_human_readable_trace_file_) {
+ reader.reset(new BlockCacheHumanReadableTraceReader(trace_file_path_));
+ } else {
+ std::unique_ptr<TraceReader> trace_reader;
+ s = NewFileTraceReader(env_, EnvOptions(), trace_file_path_, &trace_reader);
+ if (!s.ok()) {
+ return s;
+ }
+ reader.reset(new BlockCacheTraceReader(std::move(trace_reader)));
+ s = reader->ReadHeader(&header_);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ if (!human_readable_trace_file_path_.empty()) {
+ s = human_readable_trace_writer_.NewWritableFile(
+ human_readable_trace_file_path_, env_);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ uint64_t start = clock->NowMicros();
+ uint64_t time_interval = 0;
+ while (s.ok()) {
+ BlockCacheTraceRecord access;
+ s = reader->ReadAccess(&access);
+ if (!s.ok()) {
+ break;
+ }
+ if (!mrc_only_) {
+ s = RecordAccess(access);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ if (trace_start_timestamp_in_seconds_ == 0) {
+ trace_start_timestamp_in_seconds_ =
+ access.access_timestamp / kMicrosInSecond;
+ }
+ trace_end_timestamp_in_seconds_ = access.access_timestamp / kMicrosInSecond;
+ miss_ratio_stats_.UpdateMetrics(access.access_timestamp,
+ is_user_access(access.caller),
+ !access.is_cache_hit);
+ if (cache_simulator_) {
+ cache_simulator_->Access(access);
+ }
+ access_sequence_number_++;
+ uint64_t now = clock->NowMicros();
+ uint64_t duration = (now - start) / kMicrosInSecond;
+ if (duration > 10 * time_interval) {
+ uint64_t trace_duration =
+ trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
+ fprintf(stdout,
+ "Running for %" PRIu64 " seconds: Processed %" PRIu64
+ " records/second. Trace duration %" PRIu64
+ " seconds. Observed miss ratio %.2f\n",
+ duration, duration > 0 ? access_sequence_number_ / duration : 0,
+ trace_duration, miss_ratio_stats_.miss_ratio());
+ time_interval++;
+ }
+ }
+ uint64_t now = clock->NowMicros();
+ uint64_t duration = (now - start) / kMicrosInSecond;
+ uint64_t trace_duration =
+ trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
+ fprintf(stdout,
+ "Running for %" PRIu64 " seconds: Processed %" PRIu64
+ " records/second. Trace duration %" PRIu64
+ " seconds. Observed miss ratio %.2f\n",
+ duration, duration > 0 ? access_sequence_number_ / duration : 0,
+ trace_duration, miss_ratio_stats_.miss_ratio());
+ return s;
+}
+
+void BlockCacheTraceAnalyzer::PrintBlockSizeStats() const {
+ HistogramStat bs_stats;
+ std::map<TraceType, HistogramStat> bt_stats_map;
+ std::map<std::string, std::map<TraceType, HistogramStat>> cf_bt_stats_map;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/,
+ TraceType type, const std::string& /*block_key*/,
+ uint64_t /*block_id*/, const BlockAccessInfo& block) {
+ if (block.block_size == 0) {
+ // Block size may be 0 when 1) compaction observes a cache miss and
+ // does not insert the missing block into the cache again. 2)
+ // fetching filter blocks in SST files at the last level.
+ return;
+ }
+ bs_stats.Add(block.block_size);
+ bt_stats_map[type].Add(block.block_size);
+ cf_bt_stats_map[cf_name][type].Add(block.block_size);
+ };
+ TraverseBlocks(block_callback);
+ fprintf(stdout, "Block size stats: \n%s", bs_stats.ToString().c_str());
+ for (auto const& bt_stats : bt_stats_map) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout, "Block size stats for block type %s: \n%s",
+ block_type_to_string(bt_stats.first).c_str(),
+ bt_stats.second.ToString().c_str());
+ }
+ for (auto const& cf_bt_stats : cf_bt_stats_map) {
+ const std::string& cf_name = cf_bt_stats.first;
+ for (auto const& bt_stats : cf_bt_stats.second) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout,
+ "Block size stats for column family %s and block type %s: \n%s",
+ cf_name.c_str(), block_type_to_string(bt_stats.first).c_str(),
+ bt_stats.second.ToString().c_str());
+ }
+ }
+}
+
+void BlockCacheTraceAnalyzer::PrintAccessCountStats(bool user_access_only,
+ uint32_t bottom_k,
+ uint32_t top_k) const {
+ HistogramStat access_stats;
+ std::map<TraceType, HistogramStat> bt_stats_map;
+ std::map<std::string, std::map<TraceType, HistogramStat>> cf_bt_stats_map;
+ std::map<uint64_t, std::vector<std::string>> access_count_blocks;
+ auto block_callback = [&](const std::string& cf_name, uint64_t /*fd*/,
+ uint32_t /*level*/, TraceType type,
+ const std::string& block_key, uint64_t /*block_id*/,
+ const BlockAccessInfo& block) {
+ uint64_t naccesses = 0;
+ for (auto const& caller_access : block.caller_num_access_map) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ naccesses += caller_access.second;
+ }
+ }
+ if (naccesses == 0) {
+ return;
+ }
+ if (type == TraceType::kBlockTraceDataBlock) {
+ access_count_blocks[naccesses].push_back(block_key);
+ }
+ access_stats.Add(naccesses);
+ bt_stats_map[type].Add(naccesses);
+ cf_bt_stats_map[cf_name][type].Add(naccesses);
+ };
+ TraverseBlocks(block_callback);
+ fprintf(stdout,
+ "Block access count stats: The number of accesses per block. %s\n%s",
+ user_access_only ? "User accesses only" : "All accesses",
+ access_stats.ToString().c_str());
+ uint32_t bottom_k_index = 0;
+ for (auto naccess_it = access_count_blocks.begin();
+ naccess_it != access_count_blocks.end(); naccess_it++) {
+ bottom_k_index++;
+ if (bottom_k_index >= bottom_k) {
+ break;
+ }
+ std::map<TableReaderCaller, uint64_t> caller_naccesses;
+ uint64_t naccesses = 0;
+ for (auto const& block_id : naccess_it->second) {
+ BlockAccessInfo* block = block_info_map_.find(block_id)->second;
+ for (auto const& caller_access : block->caller_num_access_map) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ caller_naccesses[caller_access.first] += caller_access.second;
+ naccesses += caller_access.second;
+ }
+ }
+ }
+ std::string statistics("Caller:");
+ for (auto const& caller_naccessess_it : caller_naccesses) {
+ statistics += caller_to_string(caller_naccessess_it.first);
+ statistics += ":";
+ statistics +=
+ std::to_string(percent(caller_naccessess_it.second, naccesses));
+ statistics += ",";
+ }
+ fprintf(stdout,
+ "Bottom %" PRIu32 " access count. Access count=%" PRIu64
+ " nblocks=%" ROCKSDB_PRIszt " %s\n",
+ bottom_k, naccess_it->first, naccess_it->second.size(),
+ statistics.c_str());
+ }
+
+ uint32_t top_k_index = 0;
+ for (auto naccess_it = access_count_blocks.rbegin();
+ naccess_it != access_count_blocks.rend(); naccess_it++) {
+ top_k_index++;
+ if (top_k_index >= top_k) {
+ break;
+ }
+ for (auto const& block_id : naccess_it->second) {
+ BlockAccessInfo* block = block_info_map_.find(block_id)->second;
+ std::string statistics("Caller:");
+ uint64_t naccesses = 0;
+ for (auto const& caller_access : block->caller_num_access_map) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ naccesses += caller_access.second;
+ }
+ }
+ assert(naccesses > 0);
+ for (auto const& caller_access : block->caller_num_access_map) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ statistics += ",";
+ statistics += caller_to_string(caller_access.first);
+ statistics += ":";
+ statistics +=
+ std::to_string(percent(caller_access.second, naccesses));
+ }
+ }
+ uint64_t ref_keys_accesses = 0;
+ uint64_t ref_keys_does_not_exist_accesses = 0;
+ for (auto const& ref_key_caller_access : block->key_num_access_map) {
+ for (auto const& caller_access : ref_key_caller_access.second) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ ref_keys_accesses += caller_access.second;
+ }
+ }
+ }
+ for (auto const& ref_key_caller_access :
+ block->non_exist_key_num_access_map) {
+ for (auto const& caller_access : ref_key_caller_access.second) {
+ if (!user_access_only || is_user_access(caller_access.first)) {
+ ref_keys_does_not_exist_accesses += caller_access.second;
+ }
+ }
+ }
+ statistics += ",nkeys=";
+ statistics += std::to_string(block->num_keys);
+ statistics += ",block_size=";
+ statistics += std::to_string(block->block_size);
+ statistics += ",num_ref_keys=";
+ statistics += std::to_string(block->key_num_access_map.size());
+ statistics += ",percent_access_ref_keys=";
+ statistics += std::to_string(percent(ref_keys_accesses, naccesses));
+ statistics += ",num_ref_keys_does_not_exist=";
+ statistics += std::to_string(block->non_exist_key_num_access_map.size());
+ statistics += ",percent_access_ref_keys_does_not_exist=";
+ statistics +=
+ std::to_string(percent(ref_keys_does_not_exist_accesses, naccesses));
+ statistics += ",ref_data_size=";
+ statistics += std::to_string(block->referenced_data_size);
+ fprintf(stdout,
+ "Top %" PRIu32 " access count blocks access_count=%" PRIu64
+ " %s\n",
+ top_k, naccess_it->first, statistics.c_str());
+ }
+ }
+
+ for (auto const& bt_stats : bt_stats_map) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout, "Break down by block type %s: \n%s",
+ block_type_to_string(bt_stats.first).c_str(),
+ bt_stats.second.ToString().c_str());
+ }
+ for (auto const& cf_bt_stats : cf_bt_stats_map) {
+ const std::string& cf_name = cf_bt_stats.first;
+ for (auto const& bt_stats : cf_bt_stats.second) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout,
+ "Break down by column family %s and block type "
+ "%s: \n%s",
+ cf_name.c_str(), block_type_to_string(bt_stats.first).c_str(),
+ bt_stats.second.ToString().c_str());
+ }
+ }
+}
+
+void BlockCacheTraceAnalyzer::PrintDataBlockAccessStats() const {
+ HistogramStat existing_keys_stats;
+ std::map<std::string, HistogramStat> cf_existing_keys_stats_map;
+ HistogramStat non_existing_keys_stats;
+ std::map<std::string, HistogramStat> cf_non_existing_keys_stats_map;
+ HistogramStat block_access_stats;
+ std::map<std::string, HistogramStat> cf_block_access_info;
+ HistogramStat percent_referenced_bytes;
+ std::map<std::string, HistogramStat> cf_percent_referenced_bytes;
+ // Total number of accesses in a data block / number of keys in a data block.
+ HistogramStat avg_naccesses_per_key_in_a_data_block;
+ std::map<std::string, HistogramStat> cf_avg_naccesses_per_key_in_a_data_block;
+ // The standard deviation on the number of accesses of a key in a data block.
+ HistogramStat stdev_naccesses_per_key_in_a_data_block;
+ std::map<std::string, HistogramStat>
+ cf_stdev_naccesses_per_key_in_a_data_block;
+ auto block_callback =
+ [&](const std::string& cf_name, uint64_t /*fd*/, uint32_t /*level*/,
+ TraceType /*type*/, const std::string& /*block_key*/,
+ uint64_t /*block_id*/, const BlockAccessInfo& block) {
+ if (block.num_keys == 0) {
+ return;
+ }
+ // Use four decimal points.
+ uint64_t percent_referenced_for_existing_keys =
+ (uint64_t)(((double)block.key_num_access_map.size() /
+ (double)block.num_keys) *
+ 10000.0);
+ uint64_t percent_referenced_for_non_existing_keys =
+ (uint64_t)(((double)block.non_exist_key_num_access_map.size() /
+ (double)block.num_keys) *
+ 10000.0);
+ uint64_t percent_accesses_for_existing_keys =
+ (uint64_t)(((double)block.num_referenced_key_exist_in_block /
+ (double)block.num_accesses) *
+ 10000.0);
+
+ HistogramStat hist_naccess_per_key;
+ for (auto const& key_access : block.key_num_access_map) {
+ for (auto const& caller_access : key_access.second) {
+ hist_naccess_per_key.Add(caller_access.second);
+ }
+ }
+ uint64_t avg_accesses =
+ static_cast<uint64_t>(hist_naccess_per_key.Average());
+ uint64_t stdev_accesses =
+ static_cast<uint64_t>(hist_naccess_per_key.StandardDeviation());
+ avg_naccesses_per_key_in_a_data_block.Add(avg_accesses);
+ cf_avg_naccesses_per_key_in_a_data_block[cf_name].Add(avg_accesses);
+ stdev_naccesses_per_key_in_a_data_block.Add(stdev_accesses);
+ cf_stdev_naccesses_per_key_in_a_data_block[cf_name].Add(stdev_accesses);
+
+ existing_keys_stats.Add(percent_referenced_for_existing_keys);
+ cf_existing_keys_stats_map[cf_name].Add(
+ percent_referenced_for_existing_keys);
+ non_existing_keys_stats.Add(percent_referenced_for_non_existing_keys);
+ cf_non_existing_keys_stats_map[cf_name].Add(
+ percent_referenced_for_non_existing_keys);
+ block_access_stats.Add(percent_accesses_for_existing_keys);
+ cf_block_access_info[cf_name].Add(percent_accesses_for_existing_keys);
+ };
+ TraverseBlocks(block_callback);
+ fprintf(stdout,
+ "Histogram on the number of referenced keys existing in a block over "
+ "the total number of keys in a block: \n%s",
+ existing_keys_stats.ToString().c_str());
+ for (auto const& cf_stats : cf_existing_keys_stats_map) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout, "Break down by column family %s: \n%s",
+ cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+ }
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(
+ stdout,
+ "Histogram on the number of referenced keys DO NOT exist in a block over "
+ "the total number of keys in a block: \n%s",
+ non_existing_keys_stats.ToString().c_str());
+ for (auto const& cf_stats : cf_non_existing_keys_stats_map) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout, "Break down by column family %s: \n%s",
+ cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+ }
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout,
+ "Histogram on the number of accesses on keys exist in a block over "
+ "the total number of accesses in a block: \n%s",
+ block_access_stats.ToString().c_str());
+ for (auto const& cf_stats : cf_block_access_info) {
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout, "Break down by column family %s: \n%s",
+ cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+ }
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(
+ stdout,
+ "Histogram on the average number of accesses per key in a block: \n%s",
+ avg_naccesses_per_key_in_a_data_block.ToString().c_str());
+ for (auto const& cf_stats : cf_avg_naccesses_per_key_in_a_data_block) {
+ fprintf(stdout, "Break down by column family %s: \n%s",
+ cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+ }
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout,
+ "Histogram on the standard deviation of the number of accesses per "
+ "key in a block: \n%s",
+ stdev_naccesses_per_key_in_a_data_block.ToString().c_str());
+ for (auto const& cf_stats : cf_stdev_naccesses_per_key_in_a_data_block) {
+ fprintf(stdout, "Break down by column family %s: \n%s",
+ cf_stats.first.c_str(), cf_stats.second.ToString().c_str());
+ }
+}
+
+void BlockCacheTraceAnalyzer::PrintStatsSummary() const {
+ uint64_t total_num_files = 0;
+ uint64_t total_num_blocks = 0;
+ uint64_t total_num_accesses = 0;
+ std::map<TraceType, uint64_t> bt_num_blocks_map;
+ std::map<TableReaderCaller, uint64_t> caller_num_access_map;
+ std::map<TableReaderCaller, std::map<TraceType, uint64_t>>
+ caller_bt_num_access_map;
+ std::map<TableReaderCaller, std::map<uint32_t, uint64_t>>
+ caller_level_num_access_map;
+ for (auto const& cf_aggregates : cf_aggregates_map_) {
+ // Stats per column family.
+ const std::string& cf_name = cf_aggregates.first;
+ uint64_t cf_num_files = 0;
+ uint64_t cf_num_blocks = 0;
+ std::map<TraceType, uint64_t> cf_bt_blocks;
+ uint64_t cf_num_accesses = 0;
+ std::map<TableReaderCaller, uint64_t> cf_caller_num_accesses_map;
+ std::map<TableReaderCaller, std::map<uint64_t, uint64_t>>
+ cf_caller_level_num_accesses_map;
+ std::map<TableReaderCaller, std::map<uint64_t, uint64_t>>
+ cf_caller_file_num_accesses_map;
+ std::map<TableReaderCaller, std::map<TraceType, uint64_t>>
+ cf_caller_bt_num_accesses_map;
+ total_num_files += cf_aggregates.second.fd_aggregates_map.size();
+ for (auto const& file_aggregates : cf_aggregates.second.fd_aggregates_map) {
+ // Stats per SST file.
+ const uint64_t fd = file_aggregates.first;
+ const uint32_t level = file_aggregates.second.level;
+ cf_num_files++;
+ for (auto const& block_type_aggregates :
+ file_aggregates.second.block_type_aggregates_map) {
+ // Stats per block type.
+ const TraceType type = block_type_aggregates.first;
+ cf_bt_blocks[type] +=
+ block_type_aggregates.second.block_access_info_map.size();
+ total_num_blocks +=
+ block_type_aggregates.second.block_access_info_map.size();
+ bt_num_blocks_map[type] +=
+ block_type_aggregates.second.block_access_info_map.size();
+ for (auto const& block_access_info :
+ block_type_aggregates.second.block_access_info_map) {
+ // Stats per block.
+ cf_num_blocks++;
+ for (auto const& stats :
+ block_access_info.second.caller_num_access_map) {
+ // Stats per caller.
+ const TableReaderCaller caller = stats.first;
+ const uint64_t num_accesses = stats.second;
+ // Overall stats.
+ total_num_accesses += num_accesses;
+ caller_num_access_map[caller] += num_accesses;
+ caller_bt_num_access_map[caller][type] += num_accesses;
+ caller_level_num_access_map[caller][level] += num_accesses;
+ // Column Family stats.
+ cf_num_accesses += num_accesses;
+ cf_caller_num_accesses_map[caller] += num_accesses;
+ cf_caller_level_num_accesses_map[caller][level] += num_accesses;
+ cf_caller_file_num_accesses_map[caller][fd] += num_accesses;
+ cf_caller_bt_num_accesses_map[caller][type] += num_accesses;
+ }
+ }
+ }
+ }
+
+ // Print stats.
+ print_break_lines(/*num_break_lines=*/3);
+ fprintf(stdout, "Statistics for column family %s:\n", cf_name.c_str());
+ fprintf(stdout,
+ " Number of files:%" PRIu64 " Number of blocks: %" PRIu64
+ " Number of accesses: %" PRIu64 "\n",
+ cf_num_files, cf_num_blocks, cf_num_accesses);
+ for (auto block_type : cf_bt_blocks) {
+ fprintf(stdout, "Number of %s blocks: %" PRIu64 " Percent: %.2f\n",
+ block_type_to_string(block_type.first).c_str(), block_type.second,
+ percent(block_type.second, cf_num_blocks));
+ }
+ for (auto caller : cf_caller_num_accesses_map) {
+ const uint64_t naccesses = caller.second;
+ print_break_lines(/*num_break_lines=*/1);
+ fprintf(stdout,
+ "Caller %s: Number of accesses %" PRIu64 " Percent: %.2f\n",
+ caller_to_string(caller.first).c_str(), naccesses,
+ percent(naccesses, cf_num_accesses));
+ fprintf(stdout, "Caller %s: Number of accesses per level break down\n",
+ caller_to_string(caller.first).c_str());
+ for (auto naccess_level :
+ cf_caller_level_num_accesses_map[caller.first]) {
+ fprintf(stdout,
+ "\t Level %" PRIu64 ": Number of accesses: %" PRIu64
+ " Percent: %.2f\n",
+ naccess_level.first, naccess_level.second,
+ percent(naccess_level.second, naccesses));
+ }
+ fprintf(stdout, "Caller %s: Number of accesses per file break down\n",
+ caller_to_string(caller.first).c_str());
+ for (auto naccess_file : cf_caller_file_num_accesses_map[caller.first]) {
+ fprintf(stdout,
+ "\t File %" PRIu64 ": Number of accesses: %" PRIu64
+ " Percent: %.2f\n",
+ naccess_file.first, naccess_file.second,
+ percent(naccess_file.second, naccesses));
+ }
+ fprintf(stdout,
+ "Caller %s: Number of accesses per block type break down\n",
+ caller_to_string(caller.first).c_str());
+ for (auto naccess_type : cf_caller_bt_num_accesses_map[caller.first]) {
+ fprintf(stdout,
+ "\t Block Type %s: Number of accesses: %" PRIu64
+ " Percent: %.2f\n",
+ block_type_to_string(naccess_type.first).c_str(),
+ naccess_type.second, percent(naccess_type.second, naccesses));
+ }
+ }
+ }
+ print_break_lines(/*num_break_lines=*/3);
+ fprintf(stdout, "Overall statistics:\n");
+ fprintf(stdout,
+ "Number of files: %" PRIu64 " Number of blocks: %" PRIu64
+ " Number of accesses: %" PRIu64 "\n",
+ total_num_files, total_num_blocks, total_num_accesses);
+ for (auto block_type : bt_num_blocks_map) {
+ fprintf(stdout, "Number of %s blocks: %" PRIu64 " Percent: %.2f\n",
+ block_type_to_string(block_type.first).c_str(), block_type.second,
+ percent(block_type.second, total_num_blocks));
+ }
+ for (auto caller : caller_num_access_map) {
+ print_break_lines(/*num_break_lines=*/1);
+ uint64_t naccesses = caller.second;
+ fprintf(stdout, "Caller %s: Number of accesses %" PRIu64 " Percent: %.2f\n",
+ caller_to_string(caller.first).c_str(), naccesses,
+ percent(naccesses, total_num_accesses));
+ fprintf(stdout, "Caller %s: Number of accesses per level break down\n",
+ caller_to_string(caller.first).c_str());
+ for (auto naccess_level : caller_level_num_access_map[caller.first]) {
+ fprintf(stdout,
+ "\t Level %d: Number of accesses: %" PRIu64 " Percent: %.2f\n",
+ naccess_level.first, naccess_level.second,
+ percent(naccess_level.second, naccesses));
+ }
+ fprintf(stdout, "Caller %s: Number of accesses per block type break down\n",
+ caller_to_string(caller.first).c_str());
+ for (auto naccess_type : caller_bt_num_access_map[caller.first]) {
+ fprintf(stdout,
+ "\t Block Type %s: Number of accesses: %" PRIu64
+ " Percent: %.2f\n",
+ block_type_to_string(naccess_type.first).c_str(),
+ naccess_type.second, percent(naccess_type.second, naccesses));
+ }
+ }
+}
+
+std::vector<CacheConfiguration> parse_cache_config_file(
+ const std::string& config_path) {
+ std::ifstream file(config_path);
+ if (!file.is_open()) {
+ return {};
+ }
+ std::vector<CacheConfiguration> configs;
+ std::string line;
+ while (getline(file, line)) {
+ CacheConfiguration cache_config;
+ std::stringstream ss(line);
+ std::vector<std::string> config_strs;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ config_strs.push_back(substr);
+ }
+ // Sanity checks.
+ if (config_strs.size() < 4) {
+ fprintf(stderr, "Invalid cache simulator configuration %s\n",
+ line.c_str());
+ exit(1);
+ }
+ if (kSupportedCacheNames.find(" " + config_strs[0] + " ") ==
+ std::string::npos) {
+ fprintf(stderr, "Invalid cache name %s. Supported cache names are %s\n",
+ line.c_str(), kSupportedCacheNames.c_str());
+ exit(1);
+ }
+ cache_config.cache_name = config_strs[0];
+ cache_config.num_shard_bits = ParseUint32(config_strs[1]);
+ cache_config.ghost_cache_capacity = ParseUint64(config_strs[2]);
+ for (uint32_t i = 3; i < config_strs.size(); i++) {
+ uint64_t capacity = ParseUint64(config_strs[i]);
+ if (capacity == 0) {
+ fprintf(stderr, "Invalid cache capacity %s, %s\n",
+ config_strs[i].c_str(), line.c_str());
+ exit(1);
+ }
+ cache_config.cache_capacities.push_back(capacity);
+ }
+ configs.push_back(cache_config);
+ }
+ file.close();
+ return configs;
+}
+
+std::vector<uint64_t> parse_buckets(const std::string& bucket_str) {
+ std::vector<uint64_t> buckets;
+ std::stringstream ss(bucket_str);
+ while (ss.good()) {
+ std::string bucket;
+ getline(ss, bucket, ',');
+ buckets.push_back(ParseUint64(bucket));
+ }
+ buckets.push_back(std::numeric_limits<uint64_t>::max());
+ return buckets;
+}
+
+int block_cache_trace_analyzer_tool(int argc, char** argv) {
+ ParseCommandLineFlags(&argc, &argv, true);
+ if (FLAGS_block_cache_trace_path.empty()) {
+ fprintf(stderr, "block cache trace path is empty\n");
+ exit(1);
+ }
+ uint64_t warmup_seconds =
+ FLAGS_cache_sim_warmup_seconds > 0 ? FLAGS_cache_sim_warmup_seconds : 0;
+ uint32_t downsample_ratio = FLAGS_block_cache_trace_downsample_ratio > 0
+ ? FLAGS_block_cache_trace_downsample_ratio
+ : 0;
+ std::vector<CacheConfiguration> cache_configs =
+ parse_cache_config_file(FLAGS_block_cache_sim_config_path);
+ std::unique_ptr<BlockCacheTraceSimulator> cache_simulator;
+ if (!cache_configs.empty()) {
+ cache_simulator.reset(new BlockCacheTraceSimulator(
+ warmup_seconds, downsample_ratio, cache_configs));
+ Status s = cache_simulator->InitializeCaches();
+ if (!s.ok()) {
+ fprintf(stderr, "Cannot initialize cache simulators %s\n",
+ s.ToString().c_str());
+ exit(1);
+ }
+ }
+ BlockCacheTraceAnalyzer analyzer(
+ FLAGS_block_cache_trace_path, FLAGS_block_cache_analysis_result_dir,
+ FLAGS_human_readable_trace_file_path,
+ !FLAGS_reuse_distance_labels.empty(), FLAGS_mrc_only,
+ FLAGS_is_block_cache_human_readable_trace, std::move(cache_simulator));
+ Status s = analyzer.Analyze();
+ if (!s.IsIncomplete() && !s.ok()) {
+ // Read all traces.
+ fprintf(stderr, "Cannot process the trace %s\n", s.ToString().c_str());
+ exit(1);
+ }
+ fprintf(stdout, "Status: %s\n", s.ToString().c_str());
+ analyzer.WriteMissRatioCurves();
+ analyzer.WriteMissRatioTimeline(1);
+ analyzer.WriteMissRatioTimeline(kSecondInMinute);
+ analyzer.WriteMissRatioTimeline(kSecondInHour);
+ analyzer.WriteMissTimeline(1);
+ analyzer.WriteMissTimeline(kSecondInMinute);
+ analyzer.WriteMissTimeline(kSecondInHour);
+
+ if (FLAGS_mrc_only) {
+ fprintf(stdout,
+ "Skipping the analysis statistics since the user wants to compute "
+ "MRC only");
+ return 0;
+ }
+
+ analyzer.PrintStatsSummary();
+ if (FLAGS_print_access_count_stats) {
+ print_break_lines(/*num_break_lines=*/3);
+ analyzer.PrintAccessCountStats(
+ /*user_access_only=*/false, FLAGS_analyze_bottom_k_access_count_blocks,
+ FLAGS_analyze_top_k_access_count_blocks);
+ print_break_lines(/*num_break_lines=*/3);
+ analyzer.PrintAccessCountStats(
+ /*user_access_only=*/true, FLAGS_analyze_bottom_k_access_count_blocks,
+ FLAGS_analyze_top_k_access_count_blocks);
+ }
+ if (FLAGS_print_block_size_stats) {
+ print_break_lines(/*num_break_lines=*/3);
+ analyzer.PrintBlockSizeStats();
+ }
+ if (FLAGS_print_data_block_access_count_stats) {
+ print_break_lines(/*num_break_lines=*/3);
+ analyzer.PrintDataBlockAccessStats();
+ }
+ print_break_lines(/*num_break_lines=*/3);
+
+ if (!FLAGS_timeline_labels.empty()) {
+ std::stringstream ss(FLAGS_timeline_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ if (label.find("block") != std::string::npos) {
+ analyzer.WriteAccessTimeline(label, kSecondInMinute, true);
+ analyzer.WriteAccessTimeline(label, kSecondInMinute, false);
+ analyzer.WriteAccessTimeline(label, kSecondInHour, true);
+ analyzer.WriteAccessTimeline(label, kSecondInHour, false);
+ } else {
+ analyzer.WriteAccessTimeline(label, kSecondInMinute, false);
+ analyzer.WriteAccessTimeline(label, kSecondInHour, false);
+ }
+ }
+ }
+
+ if (!FLAGS_analyze_callers.empty()) {
+ analyzer.WritePercentAccessSummaryStats();
+ std::stringstream ss(FLAGS_analyze_callers);
+ while (ss.good()) {
+ std::string caller;
+ getline(ss, caller, ',');
+ analyzer.WriteDetailedPercentAccessSummaryStats(string_to_caller(caller));
+ }
+ }
+
+ if (!FLAGS_access_count_buckets.empty()) {
+ std::vector<uint64_t> buckets = parse_buckets(FLAGS_access_count_buckets);
+ analyzer.WriteAccessCountSummaryStats(buckets, /*user_access_only=*/true);
+ analyzer.WriteAccessCountSummaryStats(buckets, /*user_access_only=*/false);
+ }
+
+ if (!FLAGS_reuse_distance_labels.empty() &&
+ !FLAGS_reuse_distance_buckets.empty()) {
+ std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_distance_buckets);
+ std::stringstream ss(FLAGS_reuse_distance_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ analyzer.WriteReuseDistance(label, buckets);
+ }
+ }
+
+ if (!FLAGS_reuse_interval_labels.empty() &&
+ !FLAGS_reuse_interval_buckets.empty()) {
+ std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_interval_buckets);
+ std::stringstream ss(FLAGS_reuse_interval_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ analyzer.WriteReuseInterval(label, buckets);
+ }
+ }
+
+ if (!FLAGS_reuse_lifetime_labels.empty() &&
+ !FLAGS_reuse_lifetime_buckets.empty()) {
+ std::vector<uint64_t> buckets = parse_buckets(FLAGS_reuse_lifetime_buckets);
+ std::stringstream ss(FLAGS_reuse_lifetime_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ analyzer.WriteReuseLifetime(label, buckets);
+ }
+ }
+
+ if (FLAGS_analyze_blocks_reuse_k_reuse_window != 0) {
+ std::vector<TraceType> block_types{TraceType::kBlockTraceIndexBlock,
+ TraceType::kBlockTraceDataBlock,
+ TraceType::kBlockTraceFilterBlock};
+ for (auto block_type : block_types) {
+ analyzer.WriteBlockReuseTimeline(
+ FLAGS_analyze_blocks_reuse_k_reuse_window,
+ /*user_access_only=*/true, block_type);
+ analyzer.WriteBlockReuseTimeline(
+ FLAGS_analyze_blocks_reuse_k_reuse_window,
+ /*user_access_only=*/false, block_type);
+ }
+ }
+
+ if (!FLAGS_analyze_get_spatial_locality_labels.empty() &&
+ !FLAGS_analyze_get_spatial_locality_buckets.empty()) {
+ std::vector<uint64_t> buckets =
+ parse_buckets(FLAGS_analyze_get_spatial_locality_buckets);
+ std::stringstream ss(FLAGS_analyze_get_spatial_locality_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ analyzer.WriteGetSpatialLocality(label, buckets);
+ }
+ }
+
+ if (!FLAGS_analyze_correlation_coefficients_labels.empty()) {
+ std::stringstream ss(FLAGS_analyze_correlation_coefficients_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ analyzer.WriteCorrelationFeatures(
+ label, FLAGS_analyze_correlation_coefficients_max_number_of_values);
+ }
+ analyzer.WriteCorrelationFeaturesForGet(
+ FLAGS_analyze_correlation_coefficients_max_number_of_values);
+ }
+
+ if (!FLAGS_skew_labels.empty() && !FLAGS_skew_buckets.empty()) {
+ std::vector<uint64_t> buckets = parse_buckets(FLAGS_skew_buckets);
+ std::stringstream ss(FLAGS_skew_labels);
+ while (ss.good()) {
+ std::string label;
+ getline(ss, label, ',');
+ if (label.find("block") != std::string::npos) {
+ analyzer.WriteSkewness(label, buckets,
+ TraceType::kBlockTraceIndexBlock);
+ analyzer.WriteSkewness(label, buckets,
+ TraceType::kBlockTraceFilterBlock);
+ analyzer.WriteSkewness(label, buckets, TraceType::kBlockTraceDataBlock);
+ analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax);
+ } else {
+ analyzer.WriteSkewness(label, buckets, TraceType::kTraceMax);
+ }
+ }
+ }
+ return 0;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // GFLAGS
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h
new file mode 100644
index 000000000..2f1ebd139
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer.h
@@ -0,0 +1,397 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "db/dbformat.h"
+#include "rocksdb/env.h"
+#include "rocksdb/trace_record.h"
+#include "rocksdb/utilities/sim_cache.h"
+#include "trace_replay/block_cache_tracer.h"
+#include "utilities/simulator_cache/cache_simulator.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Statistics of a key refereneced by a Get.
+struct GetKeyInfo {
+ uint64_t key_id = 0;
+ std::vector<uint64_t> access_sequence_number_timeline;
+ std::vector<uint64_t> access_timeline;
+
+ void AddAccess(const BlockCacheTraceRecord& access,
+ uint64_t access_sequnce_number) {
+ access_sequence_number_timeline.push_back(access_sequnce_number);
+ access_timeline.push_back(access.access_timestamp);
+ }
+};
+
+// Statistics of a block.
+struct BlockAccessInfo {
+ uint64_t block_id = 0;
+ uint64_t table_id = 0;
+ uint64_t block_offset = 0;
+ uint64_t num_accesses = 0;
+ uint64_t block_size = 0;
+ uint64_t first_access_time = 0;
+ uint64_t last_access_time = 0;
+ uint64_t num_keys = 0;
+ std::map<std::string, std::map<TableReaderCaller, uint64_t>>
+ key_num_access_map; // for keys exist in this block.
+ std::map<std::string, std::map<TableReaderCaller, uint64_t>>
+ non_exist_key_num_access_map; // for keys do not exist in this block.
+ uint64_t num_referenced_key_exist_in_block = 0;
+ uint64_t referenced_data_size = 0;
+ std::map<TableReaderCaller, uint64_t> caller_num_access_map;
+ // caller:timestamp:number_of_accesses. The granularity of the timestamp is
+ // seconds.
+ std::map<TableReaderCaller, std::map<uint64_t, uint64_t>>
+ caller_num_accesses_timeline;
+ // Unique blocks since the last access.
+ std::set<std::string> unique_blocks_since_last_access;
+ // Number of reuses grouped by reuse distance.
+ std::map<uint64_t, uint64_t> reuse_distance_count;
+
+ // The access sequence numbers of this block.
+ std::vector<uint64_t> access_sequence_number_timeline;
+ std::map<TableReaderCaller, std::vector<uint64_t>>
+ caller_access_sequence__number_timeline;
+ // The access timestamp in microseconds of this block.
+ std::vector<uint64_t> access_timeline;
+ std::map<TableReaderCaller, std::vector<uint64_t>> caller_access_timeline;
+
+ void AddAccess(const BlockCacheTraceRecord& access,
+ uint64_t access_sequnce_number) {
+ if (block_size != 0 && access.block_size != 0) {
+ assert(block_size == access.block_size);
+ }
+ if (num_keys != 0 && access.num_keys_in_block != 0) {
+ assert(num_keys == access.num_keys_in_block);
+ }
+ if (first_access_time == 0) {
+ first_access_time = access.access_timestamp;
+ }
+ table_id = BlockCacheTraceHelper::GetTableId(access);
+ block_offset = BlockCacheTraceHelper::GetBlockOffsetInFile(access);
+ last_access_time = access.access_timestamp;
+ block_size = access.block_size;
+ caller_num_access_map[access.caller]++;
+ num_accesses++;
+ // access.access_timestamp is in microsecond.
+ const uint64_t timestamp_in_seconds =
+ access.access_timestamp / kMicrosInSecond;
+ caller_num_accesses_timeline[access.caller][timestamp_in_seconds] += 1;
+ // Populate the feature vectors.
+ access_sequence_number_timeline.push_back(access_sequnce_number);
+ caller_access_sequence__number_timeline[access.caller].push_back(
+ access_sequnce_number);
+ access_timeline.push_back(access.access_timestamp);
+ caller_access_timeline[access.caller].push_back(access.access_timestamp);
+ if (BlockCacheTraceHelper::IsGetOrMultiGetOnDataBlock(access.block_type,
+ access.caller)) {
+ num_keys = access.num_keys_in_block;
+ if (access.referenced_key_exist_in_block) {
+ if (key_num_access_map.find(access.referenced_key) ==
+ key_num_access_map.end()) {
+ referenced_data_size += access.referenced_data_size;
+ }
+ key_num_access_map[access.referenced_key][access.caller]++;
+ num_referenced_key_exist_in_block++;
+ if (referenced_data_size > block_size && block_size != 0) {
+ ParsedInternalKey internal_key;
+ Status s = ParseInternalKey(access.referenced_key, &internal_key,
+ false /* log_err_key */); // TODO
+ assert(s.ok()); // TODO
+ }
+ } else {
+ non_exist_key_num_access_map[access.referenced_key][access.caller]++;
+ }
+ }
+ }
+};
+
+// Aggregates stats of a block given a block type.
+struct BlockTypeAccessInfoAggregate {
+ std::map<std::string, BlockAccessInfo> block_access_info_map;
+};
+
+// Aggregates BlockTypeAggregate given a SST file.
+struct SSTFileAccessInfoAggregate {
+ uint32_t level;
+ std::map<TraceType, BlockTypeAccessInfoAggregate> block_type_aggregates_map;
+};
+
+// Aggregates SSTFileAggregate given a column family.
+struct ColumnFamilyAccessInfoAggregate {
+ std::map<uint64_t, SSTFileAccessInfoAggregate> fd_aggregates_map;
+};
+
+struct Features {
+ std::vector<uint64_t> elapsed_time_since_last_access;
+ std::vector<uint64_t> num_accesses_since_last_access;
+ std::vector<uint64_t> num_past_accesses;
+};
+
+struct Predictions {
+ std::vector<uint64_t> elapsed_time_till_next_access;
+ std::vector<uint64_t> num_accesses_till_next_access;
+};
+
+class BlockCacheTraceAnalyzer {
+ public:
+ BlockCacheTraceAnalyzer(
+ const std::string& trace_file_path, const std::string& output_dir,
+ const std::string& human_readable_trace_file_path,
+ bool compute_reuse_distance, bool mrc_only,
+ bool is_human_readable_trace_file,
+ std::unique_ptr<BlockCacheTraceSimulator>&& cache_simulator);
+ ~BlockCacheTraceAnalyzer() = default;
+ // No copy and move.
+ BlockCacheTraceAnalyzer(const BlockCacheTraceAnalyzer&) = delete;
+ BlockCacheTraceAnalyzer& operator=(const BlockCacheTraceAnalyzer&) = delete;
+ BlockCacheTraceAnalyzer(BlockCacheTraceAnalyzer&&) = delete;
+ BlockCacheTraceAnalyzer& operator=(BlockCacheTraceAnalyzer&&) = delete;
+
+ // Read all access records in the given trace_file, maintains the stats of
+ // a block, and aggregates the information by block type, sst file, and column
+ // family. Subsequently, the caller may call Print* functions to print
+ // statistics.
+ Status Analyze();
+
+ // Print a summary of statistics of the trace, e.g.,
+ // Number of files: 2 Number of blocks: 50 Number of accesses: 50
+ // Number of Index blocks: 10
+ // Number of Filter blocks: 10
+ // Number of Data blocks: 10
+ // Number of UncompressionDict blocks: 10
+ // Number of RangeDeletion blocks: 10
+ // ***************************************************************
+ // Caller Get: Number of accesses 10
+ // Caller Get: Number of accesses per level break down
+ // Level 0: Number of accesses: 10
+ // Caller Get: Number of accesses per block type break down
+ // Block Type Index: Number of accesses: 2
+ // Block Type Filter: Number of accesses: 2
+ // Block Type Data: Number of accesses: 2
+ // Block Type UncompressionDict: Number of accesses: 2
+ // Block Type RangeDeletion: Number of accesses: 2
+ void PrintStatsSummary() const;
+
+ // Print block size distribution and the distribution break down by block type
+ // and column family.
+ void PrintBlockSizeStats() const;
+
+ // Print access count distribution and the distribution break down by block
+ // type and column family.
+ void PrintAccessCountStats(bool user_access_only, uint32_t bottom_k,
+ uint32_t top_k) const;
+
+ // Print data block accesses by user Get and Multi-Get.
+ // It prints out 1) A histogram on the percentage of keys accessed in a data
+ // block break down by if a referenced key exists in the data block andthe
+ // histogram break down by column family. 2) A histogram on the percentage of
+ // accesses on keys exist in a data block and its break down by column family.
+ void PrintDataBlockAccessStats() const;
+
+ // Write the percentage of accesses break down by column family into a csv
+ // file saved in 'output_dir'.
+ //
+ // The file is named "percentage_of_accesses_summary". The file format is
+ // caller,cf_0,cf_1,...,cf_n where the cf_i is the column family name found in
+ // the trace.
+ void WritePercentAccessSummaryStats() const;
+
+ // Write the percentage of accesses for the given caller break down by column
+ // family, level, and block type into a csv file saved in 'output_dir'.
+ //
+ // It generates two files: 1) caller_level_percentage_of_accesses_summary and
+ // 2) caller_bt_percentage_of_accesses_summary which break down by the level
+ // and block type, respectively. The file format is
+ // level/bt,cf_0,cf_1,...,cf_n where cf_i is the column family name found in
+ // the trace.
+ void WriteDetailedPercentAccessSummaryStats(TableReaderCaller caller) const;
+
+ // Write the access count summary into a csv file saved in 'output_dir'.
+ // It groups blocks by their access count.
+ //
+ // It generates two files: 1) cf_access_count_summary and 2)
+ // bt_access_count_summary which break down the access count by column family
+ // and block type, respectively. The file format is
+ // cf/bt,bucket_0,bucket_1,...,bucket_N.
+ void WriteAccessCountSummaryStats(
+ const std::vector<uint64_t>& access_count_buckets,
+ bool user_access_only) const;
+
+ // Write miss ratio curves of simulated cache configurations into a csv file
+ // named "mrc" saved in 'output_dir'.
+ //
+ // The file format is
+ // "cache_name,num_shard_bits,capacity,miss_ratio,total_accesses".
+ void WriteMissRatioCurves() const;
+
+ // Write miss ratio timeline of simulated cache configurations into several
+ // csv files, one per cache capacity saved in 'output_dir'.
+ //
+ // The file format is
+ // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second"
+ // where N is the number of unique cache names
+ // (cache_name+num_shard_bits+ghost_capacity).
+ void WriteMissRatioTimeline(uint64_t time_unit) const;
+
+ // Write misses timeline of simulated cache configurations into several
+ // csv files, one per cache capacity saved in 'output_dir'.
+ //
+ // The file format is
+ // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second"
+ // where N is the number of unique cache names
+ // (cache_name+num_shard_bits+ghost_capacity).
+ void WriteMissTimeline(uint64_t time_unit) const;
+
+ // Write the access timeline into a csv file saved in 'output_dir'.
+ //
+ // The file is named "label_access_timeline".The file format is
+ // "time,label_1_access_per_second,label_2_access_per_second,...,label_N_access_per_second"
+ // where N is the number of unique labels found in the trace.
+ void WriteAccessTimeline(const std::string& label, uint64_t time_unit,
+ bool user_access_only) const;
+
+ // Write the reuse distance into a csv file saved in 'output_dir'. Reuse
+ // distance is defined as the cumulated size of unique blocks read between two
+ // consective accesses on the same block.
+ //
+ // The file is named "label_reuse_distance". The file format is
+ // bucket,label_1,label_2,...,label_N.
+ void WriteReuseDistance(const std::string& label_str,
+ const std::vector<uint64_t>& distance_buckets) const;
+
+ // Write the reuse interval into a csv file saved in 'output_dir'. Reuse
+ // interval is defined as the time between two consecutive accesses on the
+ // same block.
+ //
+ // The file is named "label_reuse_interval". The file format is
+ // bucket,label_1,label_2,...,label_N.
+ void WriteReuseInterval(const std::string& label_str,
+ const std::vector<uint64_t>& time_buckets) const;
+
+ // Write the reuse lifetime into a csv file saved in 'output_dir'. Reuse
+ // lifetime is defined as the time interval between the first access of a
+ // block and its last access.
+ //
+ // The file is named "label_reuse_lifetime". The file format is
+ // bucket,label_1,label_2,...,label_N.
+ void WriteReuseLifetime(const std::string& label_str,
+ const std::vector<uint64_t>& time_buckets) const;
+
+ // Write the reuse timeline into a csv file saved in 'output_dir'.
+ //
+ // The file is named
+ // "block_type_user_access_only_reuse_window_reuse_timeline". The file format
+ // is start_time,0,1,...,N where N equals trace_duration / reuse_window.
+ void WriteBlockReuseTimeline(const uint64_t reuse_window,
+ bool user_access_only,
+ TraceType block_type) const;
+
+ // Write the Get spatical locality into csv files saved in 'output_dir'.
+ //
+ // It generates three csv files. label_percent_ref_keys,
+ // label_percent_accesses_on_ref_keys, and
+ // label_percent_data_size_on_ref_keys.
+ void WriteGetSpatialLocality(
+ const std::string& label_str,
+ const std::vector<uint64_t>& percent_buckets) const;
+
+ void WriteCorrelationFeatures(const std::string& label_str,
+ uint32_t max_number_of_values) const;
+
+ void WriteCorrelationFeaturesForGet(uint32_t max_number_of_values) const;
+
+ void WriteSkewness(const std::string& label_str,
+ const std::vector<uint64_t>& percent_buckets,
+ TraceType target_block_type) const;
+
+ const std::map<std::string, ColumnFamilyAccessInfoAggregate>&
+ TEST_cf_aggregates_map() const {
+ return cf_aggregates_map_;
+ }
+
+ private:
+ std::set<std::string> ParseLabelStr(const std::string& label_str) const;
+
+ std::string BuildLabel(const std::set<std::string>& labels,
+ const std::string& cf_name, uint64_t fd,
+ uint32_t level, TraceType type,
+ TableReaderCaller caller, uint64_t block_key,
+ const BlockAccessInfo& block) const;
+
+ void ComputeReuseDistance(BlockAccessInfo* info) const;
+
+ Status RecordAccess(const BlockCacheTraceRecord& access);
+
+ void UpdateReuseIntervalStats(
+ const std::string& label, const std::vector<uint64_t>& time_buckets,
+ const std::map<uint64_t, uint64_t> timeline,
+ std::map<std::string, std::map<uint64_t, uint64_t>>*
+ label_time_num_reuses,
+ uint64_t* total_num_reuses) const;
+
+ std::string OutputPercentAccessStats(
+ uint64_t total_accesses,
+ const std::map<std::string, uint64_t>& cf_access_count) const;
+
+ void WriteStatsToFile(
+ const std::string& label_str, const std::vector<uint64_t>& time_buckets,
+ const std::string& filename_suffix,
+ const std::map<std::string, std::map<uint64_t, uint64_t>>& label_data,
+ uint64_t ntotal) const;
+
+ void TraverseBlocks(
+ std::function<void(const std::string& /*cf_name*/, uint64_t /*fd*/,
+ uint32_t /*level*/, TraceType /*block_type*/,
+ const std::string& /*block_key*/,
+ uint64_t /*block_key_id*/,
+ const BlockAccessInfo& /*block_access_info*/)>
+ block_callback,
+ std::set<std::string>* labels = nullptr) const;
+
+ void UpdateFeatureVectors(
+ const std::vector<uint64_t>& access_sequence_number_timeline,
+ const std::vector<uint64_t>& access_timeline, const std::string& label,
+ std::map<std::string, Features>* label_features,
+ std::map<std::string, Predictions>* label_predictions) const;
+
+ void WriteCorrelationFeaturesToFile(
+ const std::string& label,
+ const std::map<std::string, Features>& label_features,
+ const std::map<std::string, Predictions>& label_predictions,
+ uint32_t max_number_of_values) const;
+
+ ROCKSDB_NAMESPACE::Env* env_;
+ const std::string trace_file_path_;
+ const std::string output_dir_;
+ std::string human_readable_trace_file_path_;
+ const bool compute_reuse_distance_;
+ const bool mrc_only_;
+ const bool is_human_readable_trace_file_;
+
+ BlockCacheTraceHeader header_;
+ std::unique_ptr<BlockCacheTraceSimulator> cache_simulator_;
+ std::map<std::string, ColumnFamilyAccessInfoAggregate> cf_aggregates_map_;
+ std::map<std::string, BlockAccessInfo*> block_info_map_;
+ std::unordered_map<std::string, GetKeyInfo> get_key_info_map_;
+ uint64_t access_sequence_number_ = 0;
+ uint64_t trace_start_timestamp_in_seconds_ = 0;
+ uint64_t trace_end_timestamp_in_seconds_ = 0;
+ MissRatioStats miss_ratio_stats_;
+ uint64_t unique_block_id_ = 1;
+ uint64_t unique_get_key_id_ = 1;
+ BlockCacheHumanReadableTraceWriter human_readable_trace_writer_;
+};
+
+int block_cache_trace_analyzer_tool(int argc, char** argv);
+
+} // namespace ROCKSDB_NAMESPACE
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py
new file mode 100644
index 000000000..37166bcb4
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_plot.py
@@ -0,0 +1,729 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+#
+# This source code is licensed under both the GPLv2 (found in the
+# COPYING file in the root directory) and Apache 2.0 License
+# (found in the LICENSE.Apache file in the root directory).
+
+#!/usr/bin/env python3
+
+import csv
+import math
+import os
+import random
+import sys
+
+import matplotlib
+
+matplotlib.use("Agg")
+import matplotlib.backends.backend_pdf
+import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import seaborn as sns
+
+
+# Make sure a legend has the same color across all generated graphs.
+def get_cmap(n, name="hsv"):
+ """Returns a function that maps each index in 0, 1, ..., n-1 to a distinct
+ RGB color; the keyword argument name must be a standard mpl colormap name."""
+ return plt.cm.get_cmap(name, n)
+
+
+color_index = 0
+bar_color_maps = {}
+colors = []
+n_colors = 360
+linear_colors = get_cmap(n_colors)
+for i in range(n_colors):
+ colors.append(linear_colors(i))
+# Shuffle the colors so that adjacent bars in a graph are obvious to differentiate.
+random.shuffle(colors)
+
+
+def num_to_gb(n):
+ one_gb = 1024 * 1024 * 1024
+ if float(n) % one_gb == 0:
+ return "{}".format(n / one_gb)
+ # Keep two decimal points.
+ return "{0:.2f}".format(float(n) / one_gb)
+
+
+def plot_miss_stats_graphs(
+ csv_result_dir, output_result_dir, file_prefix, file_suffix, ylabel, pdf_file_name
+):
+ miss_ratios = {}
+ for file in os.listdir(csv_result_dir):
+ if not file.startswith(file_prefix):
+ continue
+ if not file.endswith(file_suffix):
+ continue
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ mrc_file_path = csv_result_dir + "/" + file
+ with open(mrc_file_path, "r") as csvfile:
+ rows = csv.reader(csvfile, delimiter=",")
+ for row in rows:
+ cache_name = row[0]
+ num_shard_bits = int(row[1])
+ ghost_capacity = int(row[2])
+ capacity = int(row[3])
+ miss_ratio = float(row[4])
+ config = "{}-{}-{}".format(cache_name, num_shard_bits, ghost_capacity)
+ if config not in miss_ratios:
+ miss_ratios[config] = {}
+ miss_ratios[config]["x"] = []
+ miss_ratios[config]["y"] = []
+ miss_ratios[config]["x"].append(capacity)
+ miss_ratios[config]["y"].append(miss_ratio)
+ fig = plt.figure()
+ for config in miss_ratios:
+ plt.plot(
+ miss_ratios[config]["x"], miss_ratios[config]["y"], label=config
+ )
+ plt.xlabel("Cache capacity")
+ plt.ylabel(ylabel)
+ plt.xscale("log", basex=2)
+ plt.ylim(ymin=0)
+ plt.title("{}".format(file))
+ plt.legend()
+ fig.savefig(
+ output_result_dir + "/{}.pdf".format(pdf_file_name), bbox_inches="tight"
+ )
+
+
+def plot_miss_stats_diff_lru_graphs(
+ csv_result_dir, output_result_dir, file_prefix, file_suffix, ylabel, pdf_file_name
+):
+ miss_ratios = {}
+ for file in os.listdir(csv_result_dir):
+ if not file.startswith(file_prefix):
+ continue
+ if not file.endswith(file_suffix):
+ continue
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ mrc_file_path = csv_result_dir + "/" + file
+ with open(mrc_file_path, "r") as csvfile:
+ rows = csv.reader(csvfile, delimiter=",")
+ for row in rows:
+ cache_name = row[0]
+ num_shard_bits = int(row[1])
+ ghost_capacity = int(row[2])
+ capacity = int(row[3])
+ miss_ratio = float(row[4])
+ config = "{}-{}-{}".format(cache_name, num_shard_bits, ghost_capacity)
+ if config not in miss_ratios:
+ miss_ratios[config] = {}
+ miss_ratios[config]["x"] = []
+ miss_ratios[config]["y"] = []
+ miss_ratios[config]["x"].append(capacity)
+ miss_ratios[config]["y"].append(miss_ratio)
+ if "lru-0-0" not in miss_ratios:
+ return
+ fig = plt.figure()
+ for config in miss_ratios:
+ diffs = [0] * len(miss_ratios["lru-0-0"]["x"])
+ for i in range(len(miss_ratios["lru-0-0"]["x"])):
+ for j in range(len(miss_ratios[config]["x"])):
+ if miss_ratios["lru-0-0"]["x"][i] == miss_ratios[config]["x"][j]:
+ diffs[i] = (
+ miss_ratios[config]["y"][j] - miss_ratios["lru-0-0"]["y"][i]
+ )
+ break
+ plt.plot(miss_ratios["lru-0-0"]["x"], diffs, label=config)
+ plt.xlabel("Cache capacity")
+ plt.ylabel(ylabel)
+ plt.xscale("log", basex=2)
+ plt.title("{}".format(file))
+ plt.legend()
+ fig.savefig(
+ output_result_dir + "/{}.pdf".format(pdf_file_name), bbox_inches="tight"
+ )
+
+
+def sanitize(label):
+ # matplotlib cannot plot legends that is prefixed with "_"
+ # so we need to remove them here.
+ index = 0
+ for i in range(len(label)):
+ if label[i] == "_":
+ index += 1
+ else:
+ break
+ data = label[index:]
+ # The value of uint64_max in c++.
+ if "18446744073709551615" in data:
+ return "max"
+ return data
+
+
+# Read the csv file vertically, i.e., group the data by columns.
+def read_data_for_plot_vertical(csvfile):
+ x = []
+ labels = []
+ label_stats = {}
+ csv_rows = csv.reader(csvfile, delimiter=",")
+ data_rows = []
+ for row in csv_rows:
+ data_rows.append(row)
+ # header
+ for i in range(1, len(data_rows[0])):
+ labels.append(sanitize(data_rows[0][i]))
+ label_stats[i - 1] = []
+ for i in range(1, len(data_rows)):
+ for j in range(len(data_rows[i])):
+ if j == 0:
+ x.append(sanitize(data_rows[i][j]))
+ continue
+ label_stats[j - 1].append(float(data_rows[i][j]))
+ return x, labels, label_stats
+
+
+# Read the csv file horizontally, i.e., group the data by rows.
+def read_data_for_plot_horizontal(csvfile):
+ x = []
+ labels = []
+ label_stats = {}
+ csv_rows = csv.reader(csvfile, delimiter=",")
+ data_rows = []
+ for row in csv_rows:
+ data_rows.append(row)
+ # header
+ for i in range(1, len(data_rows)):
+ labels.append(sanitize(data_rows[i][0]))
+ label_stats[i - 1] = []
+ for i in range(1, len(data_rows[0])):
+ x.append(sanitize(data_rows[0][i]))
+ for i in range(1, len(data_rows)):
+ for j in range(len(data_rows[i])):
+ if j == 0:
+ # label
+ continue
+ label_stats[i - 1].append(float(data_rows[i][j]))
+ return x, labels, label_stats
+
+
+def read_data_for_plot(csvfile, vertical):
+ if vertical:
+ return read_data_for_plot_vertical(csvfile)
+ return read_data_for_plot_horizontal(csvfile)
+
+
+def plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix,
+ filename_suffix,
+ pdf_name,
+ xlabel,
+ ylabel,
+ title,
+ vertical,
+ legend,
+):
+ global color_index, bar_color_maps, colors
+ pdf = matplotlib.backends.backend_pdf.PdfPages(output_result_dir + "/" + pdf_name)
+ for file in os.listdir(csv_result_dir):
+ if not file.endswith(filename_suffix):
+ continue
+ if not file.startswith(filename_prefix):
+ continue
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ with open(csv_result_dir + "/" + file, "r") as csvfile:
+ x, labels, label_stats = read_data_for_plot(csvfile, vertical)
+ if len(x) == 0 or len(labels) == 0:
+ continue
+ # plot figure
+ fig = plt.figure()
+ for label_index in label_stats:
+ # Assign a unique color to this label.
+ if labels[label_index] not in bar_color_maps:
+ bar_color_maps[labels[label_index]] = colors[color_index]
+ color_index += 1
+ plt.plot(
+ [int(x[i]) for i in range(len(x) - 1)],
+ label_stats[label_index][:-1],
+ label=labels[label_index],
+ color=bar_color_maps[labels[label_index]],
+ )
+
+ # Translate time unit into x labels.
+ if "_60" in file:
+ plt.xlabel("{} (Minute)".format(xlabel))
+ if "_3600" in file:
+ plt.xlabel("{} (Hour)".format(xlabel))
+ plt.ylabel(ylabel)
+ plt.title("{} {}".format(title, file))
+ if legend:
+ plt.legend()
+ pdf.savefig(fig)
+ pdf.close()
+
+
+def plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix,
+ pdf_name,
+ xlabel,
+ ylabel,
+ title,
+ vertical,
+ x_prefix,
+):
+ global color_index, bar_color_maps, colors
+ pdf = matplotlib.backends.backend_pdf.PdfPages(
+ "{}/{}".format(output_result_dir, pdf_name)
+ )
+ for file in os.listdir(csv_result_dir):
+ if not file.endswith(filename_suffix):
+ continue
+ with open(csv_result_dir + "/" + file, "r") as csvfile:
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ x, labels, label_stats = read_data_for_plot(csvfile, vertical)
+ if len(x) == 0 or len(label_stats) == 0:
+ continue
+ # Plot figure
+ fig = plt.figure()
+ ind = np.arange(len(x)) # the x locations for the groups
+ width = 0.5 # the width of the bars: can also be len(x) sequence
+ bars = []
+ bottom_bars = []
+ for _i in label_stats[0]:
+ bottom_bars.append(0)
+ for i in range(0, len(label_stats)):
+ # Assign a unique color to this label.
+ if labels[i] not in bar_color_maps:
+ bar_color_maps[labels[i]] = colors[color_index]
+ color_index += 1
+ p = plt.bar(
+ ind,
+ label_stats[i],
+ width,
+ bottom=bottom_bars,
+ color=bar_color_maps[labels[i]],
+ )
+ bars.append(p[0])
+ for j in range(len(label_stats[i])):
+ bottom_bars[j] += label_stats[i][j]
+ plt.xlabel(xlabel)
+ plt.ylabel(ylabel)
+ plt.xticks(
+ ind, [x_prefix + x[i] for i in range(len(x))], rotation=20, fontsize=8
+ )
+ plt.legend(bars, labels)
+ plt.title("{} filename:{}".format(title, file))
+ pdf.savefig(fig)
+ pdf.close()
+
+
+def plot_heatmap(csv_result_dir, output_result_dir, filename_suffix, pdf_name, title):
+ pdf = matplotlib.backends.backend_pdf.PdfPages(
+ "{}/{}".format(output_result_dir, pdf_name)
+ )
+ for file in os.listdir(csv_result_dir):
+ if not file.endswith(filename_suffix):
+ continue
+ csv_file_name = "{}/{}".format(csv_result_dir, file)
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ corr_table = pd.read_csv(csv_file_name)
+ corr_table = corr_table.pivot("label", "corr", "value")
+ fig = plt.figure()
+ sns.heatmap(corr_table, annot=True, linewidths=0.5, fmt=".2")
+ plt.title("{} filename:{}".format(title, file))
+ pdf.savefig(fig)
+ pdf.close()
+
+
+def plot_timeline(csv_result_dir, output_result_dir):
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="access_timeline",
+ pdf_name="access_time.pdf",
+ xlabel="Time",
+ ylabel="Throughput",
+ title="Access timeline with group by label",
+ vertical=False,
+ legend=True,
+ )
+
+
+def convert_to_0_if_nan(n):
+ if math.isnan(n):
+ return 0.0
+ return n
+
+
+def plot_correlation(csv_result_dir, output_result_dir):
+ # Processing the correlation input first.
+ label_str_file = {}
+ for file in os.listdir(csv_result_dir):
+ if not file.endswith("correlation_input"):
+ continue
+ csv_file_name = "{}/{}".format(csv_result_dir, file)
+ print("Processing file {}/{}".format(csv_result_dir, file))
+ corr_table = pd.read_csv(csv_file_name)
+ label_str = file.split("_")[0]
+ label = file[len(label_str) + 1 :]
+ label = label[: len(label) - len("_correlation_input")]
+
+ output_file = "{}/{}_correlation_output".format(csv_result_dir, label_str)
+ if output_file not in label_str_file:
+ f = open("{}/{}_correlation_output".format(csv_result_dir, label_str), "w+")
+ label_str_file[output_file] = f
+ f.write("label,corr,value\n")
+ f = label_str_file[output_file]
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "LA+A",
+ convert_to_0_if_nan(
+ corr_table["num_accesses_since_last_access"].corr(
+ corr_table["num_accesses_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "PA+A",
+ convert_to_0_if_nan(
+ corr_table["num_past_accesses"].corr(
+ corr_table["num_accesses_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "LT+A",
+ convert_to_0_if_nan(
+ corr_table["elapsed_time_since_last_access"].corr(
+ corr_table["num_accesses_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "LA+T",
+ convert_to_0_if_nan(
+ corr_table["num_accesses_since_last_access"].corr(
+ corr_table["elapsed_time_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "LT+T",
+ convert_to_0_if_nan(
+ corr_table["elapsed_time_since_last_access"].corr(
+ corr_table["elapsed_time_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ f.write(
+ "{},{},{}\n".format(
+ label,
+ "PA+T",
+ convert_to_0_if_nan(
+ corr_table["num_past_accesses"].corr(
+ corr_table["elapsed_time_till_next_access"], method="spearman"
+ )
+ ),
+ )
+ )
+ for label_str in label_str_file:
+ label_str_file[label_str].close()
+
+ plot_heatmap(
+ csv_result_dir,
+ output_result_dir,
+ "correlation_output",
+ "correlation.pdf",
+ "Correlation",
+ )
+
+
+def plot_reuse_graphs(csv_result_dir, output_result_dir):
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="avg_reuse_interval_naccesses",
+ pdf_name="avg_reuse_interval_naccesses.pdf",
+ xlabel="",
+ ylabel="Percentage of accesses",
+ title="Average reuse interval",
+ vertical=True,
+ x_prefix="< ",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="avg_reuse_interval",
+ pdf_name="avg_reuse_interval.pdf",
+ xlabel="",
+ ylabel="Percentage of blocks",
+ title="Average reuse interval",
+ vertical=True,
+ x_prefix="< ",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="access_reuse_interval",
+ pdf_name="reuse_interval.pdf",
+ xlabel="Seconds",
+ ylabel="Percentage of accesses",
+ title="Reuse interval",
+ vertical=True,
+ x_prefix="< ",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="reuse_lifetime",
+ pdf_name="reuse_lifetime.pdf",
+ xlabel="Seconds",
+ ylabel="Percentage of blocks",
+ title="Reuse lifetime",
+ vertical=True,
+ x_prefix="< ",
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="reuse_blocks_timeline",
+ pdf_name="reuse_blocks_timeline.pdf",
+ xlabel="",
+ ylabel="Percentage of blocks",
+ title="Reuse blocks timeline",
+ vertical=False,
+ legend=False,
+ )
+
+
+def plot_percentage_access_summary(csv_result_dir, output_result_dir):
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="percentage_of_accesses_summary",
+ pdf_name="percentage_access.pdf",
+ xlabel="",
+ ylabel="Percentage of accesses",
+ title="",
+ vertical=True,
+ x_prefix="",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="percent_ref_keys",
+ pdf_name="percent_ref_keys.pdf",
+ xlabel="",
+ ylabel="Percentage of blocks",
+ title="",
+ vertical=True,
+ x_prefix="",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="percent_data_size_on_ref_keys",
+ pdf_name="percent_data_size_on_ref_keys.pdf",
+ xlabel="",
+ ylabel="Percentage of blocks",
+ title="",
+ vertical=True,
+ x_prefix="",
+ )
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="percent_accesses_on_ref_keys",
+ pdf_name="percent_accesses_on_ref_keys.pdf",
+ xlabel="",
+ ylabel="Percentage of blocks",
+ title="",
+ vertical=True,
+ x_prefix="",
+ )
+
+
+def plot_access_count_summary(csv_result_dir, output_result_dir):
+ plot_stacked_bar_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_suffix="access_count_summary",
+ pdf_name="access_count_summary.pdf",
+ xlabel="Access count",
+ ylabel="Percentage of blocks",
+ title="",
+ vertical=True,
+ x_prefix="< ",
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="skewness",
+ pdf_name="skew.pdf",
+ xlabel="",
+ ylabel="Percentage of accesses",
+ title="Skewness",
+ vertical=True,
+ legend=False,
+ )
+
+
+def plot_miss_ratio_timeline(csv_result_dir, output_result_dir):
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="3600_miss_ratio_timeline",
+ pdf_name="miss_ratio_timeline.pdf",
+ xlabel="Time",
+ ylabel="Miss Ratio (%)",
+ title="Miss ratio timeline",
+ vertical=False,
+ legend=True,
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="3600_miss_timeline",
+ pdf_name="miss_timeline.pdf",
+ xlabel="Time",
+ ylabel="# of misses ",
+ title="Miss timeline",
+ vertical=False,
+ legend=True,
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="3600_miss_timeline",
+ pdf_name="miss_timeline.pdf",
+ xlabel="Time",
+ ylabel="# of misses ",
+ title="Miss timeline",
+ vertical=False,
+ legend=True,
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="3600_policy_timeline",
+ pdf_name="policy_timeline.pdf",
+ xlabel="Time",
+ ylabel="# of times a policy is selected ",
+ title="Policy timeline",
+ vertical=False,
+ legend=True,
+ )
+ plot_line_charts(
+ csv_result_dir,
+ output_result_dir,
+ filename_prefix="",
+ filename_suffix="3600_policy_ratio_timeline",
+ pdf_name="policy_ratio_timeline.pdf",
+ xlabel="Time",
+ ylabel="Percentage of times a policy is selected ",
+ title="Policy timeline",
+ vertical=False,
+ legend=True,
+ )
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 3:
+ print(
+ "Must provide two arguments: \n"
+ "1) The directory that saves a list of "
+ "directories which contain block cache trace analyzer result files. \n"
+ "2) the directory to save plotted graphs. \n"
+ )
+ exit(1)
+ csv_result_dir = sys.argv[1]
+ output_result_dir = sys.argv[2]
+ print(
+ "Processing directory {} and save graphs to {}.".format(
+ csv_result_dir, output_result_dir
+ )
+ )
+ for csv_relative_dir in os.listdir(csv_result_dir):
+ csv_abs_dir = csv_result_dir + "/" + csv_relative_dir
+ result_dir = output_result_dir + "/" + csv_relative_dir
+ if not os.path.isdir(csv_abs_dir):
+ print("{} is not a directory".format(csv_abs_dir))
+ continue
+ print("Processing experiment dir: {}".format(csv_relative_dir))
+ if not os.path.exists(result_dir):
+ os.makedirs(result_dir)
+ plot_access_count_summary(csv_abs_dir, result_dir)
+ plot_timeline(csv_abs_dir, result_dir)
+ plot_miss_ratio_timeline(csv_result_dir, output_result_dir)
+ plot_correlation(csv_abs_dir, result_dir)
+ plot_reuse_graphs(csv_abs_dir, result_dir)
+ plot_percentage_access_summary(csv_abs_dir, result_dir)
+ plot_miss_stats_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="",
+ file_suffix="mrc",
+ ylabel="Miss ratio (%)",
+ pdf_file_name="mrc",
+ )
+ plot_miss_stats_diff_lru_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="",
+ file_suffix="mrc",
+ ylabel="Miss ratio (%)",
+ pdf_file_name="mrc_diff_lru",
+ )
+ # The following stats are only available in pysim.
+ for time_unit in ["1", "60", "3600"]:
+ plot_miss_stats_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="ml_{}_".format(time_unit),
+ file_suffix="p95mb",
+ ylabel="p95 number of byte miss per {} seconds".format(time_unit),
+ pdf_file_name="p95mb_per{}_seconds".format(time_unit),
+ )
+ plot_miss_stats_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="ml_{}_".format(time_unit),
+ file_suffix="avgmb",
+ ylabel="Average number of byte miss per {} seconds".format(time_unit),
+ pdf_file_name="avgmb_per{}_seconds".format(time_unit),
+ )
+ plot_miss_stats_diff_lru_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="ml_{}_".format(time_unit),
+ file_suffix="p95mb",
+ ylabel="p95 number of byte miss per {} seconds".format(time_unit),
+ pdf_file_name="p95mb_per{}_seconds_diff_lru".format(time_unit),
+ )
+ plot_miss_stats_diff_lru_graphs(
+ csv_abs_dir,
+ result_dir,
+ file_prefix="ml_{}_".format(time_unit),
+ file_suffix="avgmb",
+ ylabel="Average number of byte miss per {} seconds".format(time_unit),
+ pdf_file_name="avgmb_per{}_seconds_diff_lru".format(time_unit),
+ )
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
new file mode 100644
index 000000000..c5d9b1452
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
@@ -0,0 +1,735 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#ifndef GFLAGS
+#include <cstdio>
+int main() {
+ fprintf(stderr,
+ "Please install gflags to run block_cache_trace_analyzer_test\n");
+ return 0;
+}
+#else
+
+#include <fstream>
+#include <iostream>
+#include <map>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "rocksdb/trace_reader_writer.h"
+#include "rocksdb/trace_record.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
+#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
+#include "trace_replay/block_cache_tracer.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+const uint64_t kBlockSize = 1024;
+const std::string kBlockKeyPrefix = "test-block-";
+const uint32_t kCFId = 0;
+const uint32_t kLevel = 1;
+const uint64_t kSSTStoringEvenKeys = 100;
+const uint64_t kSSTStoringOddKeys = 101;
+const std::string kRefKeyPrefix = "test-get-";
+const uint64_t kNumKeysInBlock = 1024;
+const int kMaxArgCount = 100;
+const size_t kArgBufferSize = 100000;
+} // namespace
+
+class BlockCacheTracerTest : public testing::Test {
+ public:
+ BlockCacheTracerTest() {
+ test_path_ = test::PerThreadDBPath("block_cache_trace_analyzer_test");
+ env_ = ROCKSDB_NAMESPACE::Env::Default();
+ EXPECT_OK(env_->CreateDir(test_path_));
+ trace_file_path_ = test_path_ + "/block_cache_trace";
+ block_cache_sim_config_path_ = test_path_ + "/block_cache_sim_config";
+ timeline_labels_ =
+ "block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller";
+ reuse_distance_labels_ =
+ "block,all,cf,sst,level,bt,caller,cf_sst,cf_level,cf_bt,cf_caller";
+ reuse_distance_buckets_ = "1,1K,1M,1G";
+ reuse_interval_labels_ = "block,all,cf,sst,level,bt,cf_sst,cf_level,cf_bt";
+ reuse_interval_buckets_ = "1,10,100,1000";
+ reuse_lifetime_labels_ = "block,all,cf,sst,level,bt,cf_sst,cf_level,cf_bt";
+ reuse_lifetime_buckets_ = "1,10,100,1000";
+ analyzing_callers_ = "Get,Iterator";
+ access_count_buckets_ = "2,3,4,5,10";
+ analyze_get_spatial_locality_labels_ = "all";
+ analyze_get_spatial_locality_buckets_ = "10,20,30,40,50,60,70,80,90,100";
+ }
+
+ ~BlockCacheTracerTest() override {
+ if (getenv("KEEP_DB")) {
+ printf("The trace file is still at %s\n", trace_file_path_.c_str());
+ return;
+ }
+ EXPECT_OK(env_->DeleteFile(trace_file_path_));
+ EXPECT_OK(env_->DeleteDir(test_path_));
+ }
+
+ TableReaderCaller GetCaller(uint32_t key_id) {
+ uint32_t n = key_id % 5;
+ switch (n) {
+ case 0:
+ return TableReaderCaller::kPrefetch;
+ case 1:
+ return TableReaderCaller::kCompaction;
+ case 2:
+ return TableReaderCaller::kUserGet;
+ case 3:
+ return TableReaderCaller::kUserMultiGet;
+ case 4:
+ return TableReaderCaller::kUserIterator;
+ }
+ // This cannot happend.
+ assert(false);
+ return TableReaderCaller::kMaxBlockCacheLookupCaller;
+ }
+
+ void WriteBlockAccess(BlockCacheTraceWriter* writer, uint32_t from_key_id,
+ TraceType block_type, uint32_t nblocks) {
+ assert(writer);
+ for (uint32_t i = 0; i < nblocks; i++) {
+ uint32_t key_id = from_key_id + i;
+ uint64_t timestamp = (key_id + 1) * kMicrosInSecond;
+ BlockCacheTraceRecord record;
+ record.block_type = block_type;
+ record.block_size = kBlockSize + key_id;
+ record.block_key = kBlockKeyPrefix + std::to_string(key_id);
+ record.access_timestamp = timestamp;
+ record.cf_id = kCFId;
+ record.cf_name = kDefaultColumnFamilyName;
+ record.caller = GetCaller(key_id);
+ record.level = kLevel;
+ if (key_id % 2 == 0) {
+ record.sst_fd_number = kSSTStoringEvenKeys;
+ } else {
+ record.sst_fd_number = kSSTStoringOddKeys;
+ }
+ record.is_cache_hit = false;
+ record.no_insert = false;
+ // Provide these fields for all block types.
+ // The writer should only write these fields for data blocks and the
+ // caller is either GET or MGET.
+ record.referenced_key =
+ kRefKeyPrefix + std::to_string(key_id) + std::string(8, 0);
+ record.referenced_key_exist_in_block = true;
+ record.num_keys_in_block = kNumKeysInBlock;
+ ASSERT_OK(writer->WriteBlockAccess(
+ record, record.block_key, record.cf_name, record.referenced_key));
+ }
+ }
+
+ void AssertBlockAccessInfo(
+ uint32_t key_id, TraceType type,
+ const std::map<std::string, BlockAccessInfo>& block_access_info_map) {
+ auto key_id_str = kBlockKeyPrefix + std::to_string(key_id);
+ ASSERT_TRUE(block_access_info_map.find(key_id_str) !=
+ block_access_info_map.end());
+ auto& block_access_info = block_access_info_map.find(key_id_str)->second;
+ ASSERT_EQ(1, block_access_info.num_accesses);
+ ASSERT_EQ(kBlockSize + key_id, block_access_info.block_size);
+ ASSERT_GT(block_access_info.first_access_time, 0);
+ ASSERT_GT(block_access_info.last_access_time, 0);
+ ASSERT_EQ(1, block_access_info.caller_num_access_map.size());
+ TableReaderCaller expected_caller = GetCaller(key_id);
+ ASSERT_TRUE(block_access_info.caller_num_access_map.find(expected_caller) !=
+ block_access_info.caller_num_access_map.end());
+ ASSERT_EQ(
+ 1,
+ block_access_info.caller_num_access_map.find(expected_caller)->second);
+
+ if ((expected_caller == TableReaderCaller::kUserGet ||
+ expected_caller == TableReaderCaller::kUserMultiGet) &&
+ type == TraceType::kBlockTraceDataBlock) {
+ ASSERT_EQ(kNumKeysInBlock, block_access_info.num_keys);
+ ASSERT_EQ(1, block_access_info.key_num_access_map.size());
+ ASSERT_EQ(0, block_access_info.non_exist_key_num_access_map.size());
+ ASSERT_EQ(1, block_access_info.num_referenced_key_exist_in_block);
+ }
+ }
+
+ void RunBlockCacheTraceAnalyzer() {
+ std::vector<std::string> params = {
+ "./block_cache_trace_analyzer",
+ "-block_cache_trace_path=" + trace_file_path_,
+ "-block_cache_sim_config_path=" + block_cache_sim_config_path_,
+ "-block_cache_analysis_result_dir=" + test_path_,
+ "-print_block_size_stats",
+ "-print_access_count_stats",
+ "-print_data_block_access_count_stats",
+ "-cache_sim_warmup_seconds=0",
+ "-analyze_bottom_k_access_count_blocks=5",
+ "-analyze_top_k_access_count_blocks=5",
+ "-analyze_blocks_reuse_k_reuse_window=5",
+ "-timeline_labels=" + timeline_labels_,
+ "-reuse_distance_labels=" + reuse_distance_labels_,
+ "-reuse_distance_buckets=" + reuse_distance_buckets_,
+ "-reuse_interval_labels=" + reuse_interval_labels_,
+ "-reuse_interval_buckets=" + reuse_interval_buckets_,
+ "-reuse_lifetime_labels=" + reuse_lifetime_labels_,
+ "-reuse_lifetime_buckets=" + reuse_lifetime_buckets_,
+ "-analyze_callers=" + analyzing_callers_,
+ "-access_count_buckets=" + access_count_buckets_,
+ "-analyze_get_spatial_locality_labels=" +
+ analyze_get_spatial_locality_labels_,
+ "-analyze_get_spatial_locality_buckets=" +
+ analyze_get_spatial_locality_buckets_,
+ "-analyze_correlation_coefficients_labels=all",
+ "-skew_labels=all",
+ "-skew_buckets=10,50,100"};
+ char arg_buffer[kArgBufferSize];
+ char* argv[kMaxArgCount];
+ int argc = 0;
+ int cursor = 0;
+ for (const auto& arg : params) {
+ ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
+ ASSERT_LE(argc + 1, kMaxArgCount);
+ snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());
+
+ argv[argc++] = arg_buffer + cursor;
+ cursor += static_cast<int>(arg.size()) + 1;
+ }
+ ASSERT_EQ(0,
+ ROCKSDB_NAMESPACE::block_cache_trace_analyzer_tool(argc, argv));
+ }
+
+ Env* env_;
+ EnvOptions env_options_;
+ std::string block_cache_sim_config_path_;
+ std::string trace_file_path_;
+ std::string test_path_;
+ std::string timeline_labels_;
+ std::string reuse_distance_labels_;
+ std::string reuse_distance_buckets_;
+ std::string reuse_interval_labels_;
+ std::string reuse_interval_buckets_;
+ std::string reuse_lifetime_labels_;
+ std::string reuse_lifetime_buckets_;
+ std::string analyzing_callers_;
+ std::string access_count_buckets_;
+ std::string analyze_get_spatial_locality_labels_;
+ std::string analyze_get_spatial_locality_buckets_;
+};
+
+TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) {
+ {
+ // Generate a trace file.
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ const auto& clock = env_->GetSystemClock();
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(clock.get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ ASSERT_OK(block_cache_trace_writer->WriteHeader());
+ WriteBlockAccess(block_cache_trace_writer.get(), 0,
+ TraceType::kBlockTraceDataBlock, 50);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+ {
+ // Generate a cache sim config.
+ std::string config = "lru,1,0,1K,1M,1G";
+ std::ofstream out(block_cache_sim_config_path_);
+ ASSERT_TRUE(out.is_open());
+ out << config << std::endl;
+ out.close();
+ }
+ RunBlockCacheTraceAnalyzer();
+ {
+ // Validate the cache miss ratios.
+ std::vector<uint64_t> expected_capacities{1024, 1024 * 1024,
+ 1024 * 1024 * 1024};
+ const std::string mrc_path = test_path_ + "/49_50_mrc";
+ std::ifstream infile(mrc_path);
+ uint32_t config_index = 0;
+ std::string line;
+ // Read header.
+ ASSERT_TRUE(getline(infile, line));
+ while (getline(infile, line)) {
+ std::stringstream ss(line);
+ std::vector<std::string> result_strs;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ result_strs.push_back(substr);
+ }
+ ASSERT_EQ(6, result_strs.size());
+ ASSERT_LT(config_index, expected_capacities.size());
+ ASSERT_EQ("lru", result_strs[0]); // cache_name
+ ASSERT_EQ("1", result_strs[1]); // num_shard_bits
+ ASSERT_EQ("0", result_strs[2]); // ghost_cache_capacity
+ ASSERT_EQ(std::to_string(expected_capacities[config_index]),
+ result_strs[3]); // cache_capacity
+ ASSERT_EQ("100.0000", result_strs[4]); // miss_ratio
+ ASSERT_EQ("50", result_strs[5]); // number of accesses.
+ config_index++;
+ }
+ ASSERT_EQ(expected_capacities.size(), config_index);
+ infile.close();
+ ASSERT_OK(env_->DeleteFile(mrc_path));
+
+ const std::vector<std::string> time_units{"1", "60", "3600"};
+ expected_capacities.push_back(std::numeric_limits<uint64_t>::max());
+ for (auto const& expected_capacity : expected_capacities) {
+ for (auto const& time_unit : time_units) {
+ const std::string miss_ratio_timeline_path =
+ test_path_ + "/" + std::to_string(expected_capacity) + "_" +
+ time_unit + "_miss_ratio_timeline";
+ std::ifstream mrt_file(miss_ratio_timeline_path);
+ // Read header.
+ ASSERT_TRUE(getline(mrt_file, line));
+ ASSERT_TRUE(getline(mrt_file, line));
+ std::stringstream ss(line);
+ bool read_header = false;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ if (!read_header) {
+ if (expected_capacity == std::numeric_limits<uint64_t>::max()) {
+ ASSERT_EQ("trace", substr);
+ } else {
+ ASSERT_EQ("lru-1-0", substr);
+ }
+ read_header = true;
+ continue;
+ }
+ ASSERT_DOUBLE_EQ(100.0, ParseDouble(substr));
+ }
+ ASSERT_FALSE(getline(mrt_file, line));
+ mrt_file.close();
+ ASSERT_OK(env_->DeleteFile(miss_ratio_timeline_path));
+ }
+ for (auto const& time_unit : time_units) {
+ const std::string miss_timeline_path =
+ test_path_ + "/" + std::to_string(expected_capacity) + "_" +
+ time_unit + "_miss_timeline";
+ std::ifstream mt_file(miss_timeline_path);
+ // Read header.
+ ASSERT_TRUE(getline(mt_file, line));
+ ASSERT_TRUE(getline(mt_file, line));
+ std::stringstream ss(line);
+ uint32_t num_misses = 0;
+ while (ss.good()) {
+ std::string substr;
+ getline(ss, substr, ',');
+ if (num_misses == 0) {
+ if (expected_capacity == std::numeric_limits<uint64_t>::max()) {
+ ASSERT_EQ("trace", substr);
+ } else {
+ ASSERT_EQ("lru-1-0", substr);
+ }
+ num_misses++;
+ continue;
+ }
+ num_misses += ParseInt(substr);
+ }
+ ASSERT_EQ(51u, num_misses);
+ ASSERT_FALSE(getline(mt_file, line));
+ mt_file.close();
+ ASSERT_OK(env_->DeleteFile(miss_timeline_path));
+ }
+ }
+ }
+ {
+ // Validate the skewness csv file.
+ const std::string skewness_file_path = test_path_ + "/all_skewness";
+ std::ifstream skew_file(skewness_file_path);
+ // Read header.
+ std::string line;
+ ASSERT_TRUE(getline(skew_file, line));
+ std::stringstream ss(line);
+ double sum_percent = 0;
+ while (getline(skew_file, line)) {
+ std::stringstream ss_naccess(line);
+ std::string substr;
+ bool read_label = false;
+ while (ss_naccess.good()) {
+ ASSERT_TRUE(getline(ss_naccess, substr, ','));
+ if (!read_label) {
+ read_label = true;
+ continue;
+ }
+ sum_percent += ParseDouble(substr);
+ }
+ }
+ ASSERT_EQ(100.0, sum_percent);
+ ASSERT_FALSE(getline(skew_file, line));
+ skew_file.close();
+ ASSERT_OK(env_->DeleteFile(skewness_file_path));
+ }
+ {
+ // Validate the timeline csv files.
+ const std::vector<std::string> time_units{"_60", "_3600"};
+ const std::vector<std::string> user_access_only_flags{"user_access_only_",
+ "all_access_"};
+ for (auto const& user_access_only : user_access_only_flags) {
+ for (auto const& unit : time_units) {
+ std::stringstream ss(timeline_labels_);
+ while (ss.good()) {
+ std::string l;
+ ASSERT_TRUE(getline(ss, l, ','));
+ if (l.find("block") == std::string::npos) {
+ if (user_access_only != "all_access_") {
+ continue;
+ }
+ }
+ const std::string timeline_file = test_path_ + "/" +
+ user_access_only + l + unit +
+ "_access_timeline";
+ std::ifstream infile(timeline_file);
+ std::string line;
+ const uint64_t expected_naccesses = 50;
+ const uint64_t expected_user_accesses = 30;
+ ASSERT_TRUE(getline(infile, line)) << timeline_file;
+ uint32_t naccesses = 0;
+ while (getline(infile, line)) {
+ std::stringstream ss_naccess(line);
+ std::string substr;
+ bool read_label = false;
+ while (ss_naccess.good()) {
+ ASSERT_TRUE(getline(ss_naccess, substr, ','));
+ if (!read_label) {
+ read_label = true;
+ continue;
+ }
+ naccesses += ParseUint32(substr);
+ }
+ }
+ if (user_access_only == "user_access_only_") {
+ ASSERT_EQ(expected_user_accesses, naccesses) << timeline_file;
+ } else {
+ ASSERT_EQ(expected_naccesses, naccesses) << timeline_file;
+ }
+ ASSERT_OK(env_->DeleteFile(timeline_file));
+ }
+ }
+ }
+ }
+ {
+ // Validate the reuse_interval and reuse_distance csv files.
+ std::map<std::string, std::string> test_reuse_csv_files;
+ test_reuse_csv_files["_access_reuse_interval"] = reuse_interval_labels_;
+ test_reuse_csv_files["_reuse_distance"] = reuse_distance_labels_;
+ test_reuse_csv_files["_reuse_lifetime"] = reuse_lifetime_labels_;
+ test_reuse_csv_files["_avg_reuse_interval"] = reuse_interval_labels_;
+ test_reuse_csv_files["_avg_reuse_interval_naccesses"] =
+ reuse_interval_labels_;
+ for (auto const& test : test_reuse_csv_files) {
+ const std::string& file_suffix = test.first;
+ const std::string& labels = test.second;
+ const uint32_t expected_num_rows = 5;
+ std::stringstream ss(labels);
+ while (ss.good()) {
+ std::string l;
+ ASSERT_TRUE(getline(ss, l, ','));
+ const std::string reuse_csv_file = test_path_ + "/" + l + file_suffix;
+ std::ifstream infile(reuse_csv_file);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line));
+ double npercentage = 0;
+ uint32_t nrows = 0;
+ while (getline(infile, line)) {
+ std::stringstream ss_naccess(line);
+ bool label_read = false;
+ nrows++;
+ while (ss_naccess.good()) {
+ std::string substr;
+ ASSERT_TRUE(getline(ss_naccess, substr, ','));
+ if (!label_read) {
+ label_read = true;
+ continue;
+ }
+ npercentage += ParseDouble(substr);
+ }
+ }
+ ASSERT_EQ(expected_num_rows, nrows);
+ if ("_reuse_lifetime" == test.first ||
+ "_avg_reuse_interval" == test.first ||
+ "_avg_reuse_interval_naccesses" == test.first) {
+ ASSERT_EQ(100, npercentage) << reuse_csv_file;
+ } else {
+ ASSERT_LT(npercentage, 0);
+ }
+ ASSERT_OK(env_->DeleteFile(reuse_csv_file));
+ }
+ }
+ }
+
+ {
+ // Validate the percentage of accesses summary.
+ const std::string percent_access_summary_file =
+ test_path_ + "/percentage_of_accesses_summary";
+ std::ifstream infile(percent_access_summary_file);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line));
+ std::set<std::string> callers;
+ std::set<std::string> expected_callers{"Get", "MultiGet", "Iterator",
+ "Prefetch", "Compaction"};
+ while (getline(infile, line)) {
+ std::stringstream caller_percent(line);
+ std::string caller;
+ ASSERT_TRUE(getline(caller_percent, caller, ','));
+ std::string percent;
+ ASSERT_TRUE(getline(caller_percent, percent, ','));
+ ASSERT_FALSE(caller_percent.good());
+ callers.insert(caller);
+ ASSERT_EQ(20, ParseDouble(percent));
+ }
+ ASSERT_EQ(expected_callers.size(), callers.size());
+ for (auto caller : callers) {
+ ASSERT_TRUE(expected_callers.find(caller) != expected_callers.end());
+ }
+ ASSERT_OK(env_->DeleteFile(percent_access_summary_file));
+ }
+ {
+ // Validate the percentage of accesses summary by analyzing callers.
+ std::stringstream analyzing_callers(analyzing_callers_);
+ while (analyzing_callers.good()) {
+ std::string caller;
+ ASSERT_TRUE(getline(analyzing_callers, caller, ','));
+ std::vector<std::string> breakdowns{"level", "bt"};
+ for (auto breakdown : breakdowns) {
+ const std::string file_name = test_path_ + "/" + caller + "_" +
+ breakdown +
+ "_percentage_of_accesses_summary";
+ std::ifstream infile(file_name);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line));
+ double sum = 0;
+ while (getline(infile, line)) {
+ std::stringstream label_percent(line);
+ std::string label;
+ ASSERT_TRUE(getline(label_percent, label, ','));
+ std::string percent;
+ ASSERT_TRUE(getline(label_percent, percent, ','));
+ ASSERT_FALSE(label_percent.good());
+ sum += ParseDouble(percent);
+ }
+ ASSERT_EQ(100, sum);
+ ASSERT_OK(env_->DeleteFile(file_name));
+ }
+ }
+ }
+ const std::vector<std::string> access_types{"user_access_only", "all_access"};
+ const std::vector<std::string> prefix{"bt", "cf"};
+ for (auto const& pre : prefix) {
+ for (auto const& access_type : access_types) {
+ {
+ // Validate the access count summary.
+ const std::string bt_access_count_summary = test_path_ + "/" + pre +
+ "_" + access_type +
+ "_access_count_summary";
+ std::ifstream infile(bt_access_count_summary);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line));
+ double sum_percent = 0;
+ while (getline(infile, line)) {
+ std::stringstream bt_percent(line);
+ std::string bt;
+ ASSERT_TRUE(getline(bt_percent, bt, ','));
+ std::string percent;
+ ASSERT_TRUE(getline(bt_percent, percent, ','));
+ sum_percent += ParseDouble(percent);
+ }
+ ASSERT_EQ(100.0, sum_percent);
+ ASSERT_OK(env_->DeleteFile(bt_access_count_summary));
+ }
+ }
+ }
+ for (auto const& access_type : access_types) {
+ std::vector<std::string> block_types{"Index", "Data", "Filter"};
+ for (auto block_type : block_types) {
+ // Validate reuse block timeline.
+ const std::string reuse_blocks_timeline = test_path_ + "/" + block_type +
+ "_" + access_type +
+ "_5_reuse_blocks_timeline";
+ std::ifstream infile(reuse_blocks_timeline);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line)) << reuse_blocks_timeline;
+ uint32_t index = 0;
+ while (getline(infile, line)) {
+ std::stringstream timeline(line);
+ bool start_time = false;
+ double sum = 0;
+ while (timeline.good()) {
+ std::string value;
+ ASSERT_TRUE(getline(timeline, value, ','));
+ if (!start_time) {
+ start_time = true;
+ continue;
+ }
+ sum += ParseDouble(value);
+ }
+ index++;
+ ASSERT_LT(sum, 100.0 * index + 1) << reuse_blocks_timeline;
+ }
+ ASSERT_OK(env_->DeleteFile(reuse_blocks_timeline));
+ }
+ }
+
+ std::stringstream ss(analyze_get_spatial_locality_labels_);
+ while (ss.good()) {
+ std::string l;
+ ASSERT_TRUE(getline(ss, l, ','));
+ const std::vector<std::string> spatial_locality_files{
+ "_percent_ref_keys", "_percent_accesses_on_ref_keys",
+ "_percent_data_size_on_ref_keys"};
+ for (auto const& spatial_locality_file : spatial_locality_files) {
+ const std::string filename = test_path_ + "/" + l + spatial_locality_file;
+ std::ifstream infile(filename);
+ std::string line;
+ ASSERT_TRUE(getline(infile, line));
+ double sum_percent = 0;
+ uint32_t nrows = 0;
+ while (getline(infile, line)) {
+ std::stringstream bt_percent(line);
+ std::string bt;
+ ASSERT_TRUE(getline(bt_percent, bt, ','));
+ std::string percent;
+ ASSERT_TRUE(getline(bt_percent, percent, ','));
+ sum_percent += ParseDouble(percent);
+ nrows++;
+ }
+ ASSERT_EQ(11u, nrows);
+ ASSERT_EQ(100.0, sum_percent);
+ ASSERT_OK(env_->DeleteFile(filename));
+ }
+ }
+ ASSERT_OK(env_->DeleteFile(block_cache_sim_config_path_));
+}
+
+TEST_F(BlockCacheTracerTest, MixedBlocks) {
+ {
+ // Generate a trace file containing a mix of blocks.
+ // It contains two SST files with 25 blocks of odd numbered block_key in
+ // kSSTStoringOddKeys and 25 blocks of even numbered blocks_key in
+ // kSSTStoringEvenKeys.
+ BlockCacheTraceWriterOptions trace_writer_opt;
+ std::unique_ptr<TraceWriter> trace_writer;
+ const auto& clock = env_->GetSystemClock();
+ ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
+ &trace_writer));
+ std::unique_ptr<BlockCacheTraceWriter> block_cache_trace_writer =
+ NewBlockCacheTraceWriter(clock.get(), trace_writer_opt,
+ std::move(trace_writer));
+ ASSERT_NE(block_cache_trace_writer, nullptr);
+ ASSERT_OK(block_cache_trace_writer->WriteHeader());
+ // Write blocks of different types.
+ WriteBlockAccess(block_cache_trace_writer.get(), 0,
+ TraceType::kBlockTraceUncompressionDictBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 10,
+ TraceType::kBlockTraceDataBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 20,
+ TraceType::kBlockTraceFilterBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 30,
+ TraceType::kBlockTraceIndexBlock, 10);
+ WriteBlockAccess(block_cache_trace_writer.get(), 40,
+ TraceType::kBlockTraceRangeDeletionBlock, 10);
+ ASSERT_OK(env_->FileExists(trace_file_path_));
+ }
+
+ {
+ // Verify trace file is generated correctly.
+ std::unique_ptr<TraceReader> trace_reader;
+ ASSERT_OK(NewFileTraceReader(env_, env_options_, trace_file_path_,
+ &trace_reader));
+ BlockCacheTraceReader reader(std::move(trace_reader));
+ BlockCacheTraceHeader header;
+ ASSERT_OK(reader.ReadHeader(&header));
+ ASSERT_EQ(static_cast<uint32_t>(kMajorVersion),
+ header.rocksdb_major_version);
+ ASSERT_EQ(static_cast<uint32_t>(kMinorVersion),
+ header.rocksdb_minor_version);
+ // Read blocks.
+ BlockCacheTraceAnalyzer analyzer(
+ trace_file_path_,
+ /*output_miss_ratio_curve_path=*/"",
+ /*human_readable_trace_file_path=*/"",
+ /*compute_reuse_distance=*/true,
+ /*mrc_only=*/false,
+ /*is_block_cache_human_readable_trace=*/false,
+ /*simulator=*/nullptr);
+ // The analyzer ends when it detects an incomplete access record.
+ ASSERT_EQ(Status::Incomplete(""), analyzer.Analyze());
+ const uint64_t expected_num_cfs = 1;
+ std::vector<uint64_t> expected_fds{kSSTStoringOddKeys, kSSTStoringEvenKeys};
+ const std::vector<TraceType> expected_types{
+ TraceType::kBlockTraceUncompressionDictBlock,
+ TraceType::kBlockTraceDataBlock, TraceType::kBlockTraceFilterBlock,
+ TraceType::kBlockTraceIndexBlock,
+ TraceType::kBlockTraceRangeDeletionBlock};
+ const uint64_t expected_num_keys_per_type = 5;
+
+ auto& stats = analyzer.TEST_cf_aggregates_map();
+ ASSERT_EQ(expected_num_cfs, stats.size());
+ ASSERT_TRUE(stats.find(kDefaultColumnFamilyName) != stats.end());
+ auto& cf_stats = stats.find(kDefaultColumnFamilyName)->second;
+ ASSERT_EQ(expected_fds.size(), cf_stats.fd_aggregates_map.size());
+ for (auto fd_id : expected_fds) {
+ ASSERT_TRUE(cf_stats.fd_aggregates_map.find(fd_id) !=
+ cf_stats.fd_aggregates_map.end());
+ ASSERT_EQ(kLevel, cf_stats.fd_aggregates_map.find(fd_id)->second.level);
+ auto& block_type_aggregates_map = cf_stats.fd_aggregates_map.find(fd_id)
+ ->second.block_type_aggregates_map;
+ ASSERT_EQ(expected_types.size(), block_type_aggregates_map.size());
+ uint32_t key_id = 0;
+ for (auto type : expected_types) {
+ ASSERT_TRUE(block_type_aggregates_map.find(type) !=
+ block_type_aggregates_map.end());
+ auto& block_access_info_map =
+ block_type_aggregates_map.find(type)->second.block_access_info_map;
+ // Each block type has 5 blocks.
+ ASSERT_EQ(expected_num_keys_per_type, block_access_info_map.size());
+ for (uint32_t i = 0; i < 10; i++) {
+ // Verify that odd numbered blocks are stored in kSSTStoringOddKeys
+ // and even numbered blocks are stored in kSSTStoringEvenKeys.
+ auto key_id_str = kBlockKeyPrefix + std::to_string(key_id);
+ if (fd_id == kSSTStoringOddKeys) {
+ if (key_id % 2 == 1) {
+ AssertBlockAccessInfo(key_id, type, block_access_info_map);
+ } else {
+ ASSERT_TRUE(block_access_info_map.find(key_id_str) ==
+ block_access_info_map.end());
+ }
+ } else {
+ if (key_id % 2 == 1) {
+ ASSERT_TRUE(block_access_info_map.find(key_id_str) ==
+ block_access_info_map.end());
+ } else {
+ AssertBlockAccessInfo(key_id, type, block_access_info_map);
+ }
+ }
+ key_id++;
+ }
+ }
+ }
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+#endif // GFLAG
+#else
+#include <stdio.h>
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr,
+ "block_cache_trace_analyzer_test is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc
new file mode 100644
index 000000000..44fec5598
--- /dev/null
+++ b/src/rocksdb/tools/block_cache_analyzer/block_cache_trace_analyzer_tool.cc
@@ -0,0 +1,25 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+#ifndef GFLAGS
+#include <cstdio>
+int main() {
+ fprintf(stderr, "Please install gflags to run rocksdb tools\n");
+ return 1;
+}
+#else // GFLAGS
+#include "tools/block_cache_analyzer/block_cache_trace_analyzer.h"
+int main(int argc, char** argv) {
+ return ROCKSDB_NAMESPACE::block_cache_trace_analyzer_tool(argc, argv);
+}
+#endif // GFLAGS
+#else // ROCKSDB_LITE
+#include <stdio.h>
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "Not supported in lite mode.\n");
+ return 1;
+}
+#endif // ROCKSDB_LITE