summaryrefslogtreecommitdiffstats
path: root/src/common/PriorityCache.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/common/PriorityCache.cc398
1 files changed, 398 insertions, 0 deletions
diff --git a/src/common/PriorityCache.cc b/src/common/PriorityCache.cc
new file mode 100644
index 00000000..bb4366b6
--- /dev/null
+++ b/src/common/PriorityCache.cc
@@ -0,0 +1,398 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PriorityCache.h"
+#include "common/dout.h"
+#include "perfglue/heap_profiler.h"
+#define dout_context cct
+#define dout_subsys ceph_subsys_prioritycache
+#undef dout_prefix
+#define dout_prefix *_dout << "prioritycache "
+
+namespace PriorityCache
+{
+ int64_t get_chunk(uint64_t usage, uint64_t total_bytes)
+ {
+ uint64_t chunk = total_bytes;
+
+ // Find the nearest power of 2
+ chunk -= 1;
+ chunk |= chunk >> 1;
+ chunk |= chunk >> 2;
+ chunk |= chunk >> 4;
+ chunk |= chunk >> 8;
+ chunk |= chunk >> 16;
+ chunk |= chunk >> 32;
+ chunk += 1;
+ // shrink it to 1/256 of the rounded up cache size
+ chunk /= 256;
+
+ // bound the chunk size to be between 4MB and 32MB
+ chunk = (chunk > 4ul*1024*1024) ? chunk : 4ul*1024*1024;
+ chunk = (chunk < 16ul*1024*1024) ? chunk : 16ul*1024*1024;
+
+ /* Add 16 chunks of headroom and round up to the near chunk. Note that
+ * if RocksDB is used, it's a good idea to have N MB of headroom where
+ * N is the target_file_size_base value. RocksDB will read SST files
+ * into the block cache during compaction which potentially can force out
+ * all existing cached data. Once compaction is finished, the SST data is
+ * released leaving an empty cache. Having enough headroom to absorb
+ * compaction reads allows the kv cache grow even during extremely heavy
+ * compaction workloads.
+ */
+ uint64_t val = usage + (16 * chunk);
+ uint64_t r = (val) % chunk;
+ if (r > 0)
+ val = val + chunk - r;
+ return val;
+ }
+
+ Manager::Manager(CephContext *c,
+ uint64_t min,
+ uint64_t max,
+ uint64_t target,
+ bool reserve_extra) :
+ cct(c),
+ caches{},
+ min_mem(min),
+ max_mem(max),
+ target_mem(target),
+ tuned_mem(min),
+ reserve_extra(reserve_extra)
+ {
+ PerfCountersBuilder b(cct, "prioritycache",
+ MallocStats::M_FIRST, MallocStats::M_LAST);
+
+ b.add_u64(MallocStats::M_TARGET_BYTES, "target_bytes",
+ "target process memory usage in bytes", "t",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(MallocStats::M_MAPPED_BYTES, "mapped_bytes",
+ "total bytes mapped by the process", "m",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(MallocStats::M_UNMAPPED_BYTES, "unmapped_bytes",
+ "unmapped bytes that the kernel has yet to reclaimed", "u",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(MallocStats::M_HEAP_BYTES, "heap_bytes",
+ "aggregate bytes in use by the heap", "h",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(MallocStats::M_CACHE_BYTES, "cache_bytes",
+ "current memory available for caches.", "c",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ logger = b.create_perf_counters();
+ cct->get_perfcounters_collection()->add(logger);
+
+ tune_memory();
+ }
+
+ Manager::~Manager()
+ {
+ clear();
+ cct->get_perfcounters_collection()->remove(logger);
+ delete logger;
+ }
+
+ void Manager::tune_memory()
+ {
+ size_t heap_size = 0;
+ size_t unmapped = 0;
+ uint64_t mapped = 0;
+
+ ceph_heap_release_free_memory();
+ ceph_heap_get_numeric_property("generic.heap_size", &heap_size);
+ ceph_heap_get_numeric_property("tcmalloc.pageheap_unmapped_bytes", &unmapped);
+ mapped = heap_size - unmapped;
+
+ uint64_t new_size = tuned_mem;
+ new_size = (new_size < max_mem) ? new_size : max_mem;
+ new_size = (new_size > min_mem) ? new_size : min_mem;
+
+ // Approach the min/max slowly, but bounce away quickly.
+ if ((uint64_t) mapped < target_mem) {
+ double ratio = 1 - ((double) mapped / target_mem);
+ new_size += ratio * (max_mem - new_size);
+ } else {
+ double ratio = 1 - ((double) target_mem / mapped);
+ new_size -= ratio * (new_size - min_mem);
+ }
+
+ ldout(cct, 5) << __func__
+ << " target: " << target_mem
+ << " mapped: " << mapped
+ << " unmapped: " << unmapped
+ << " heap: " << heap_size
+ << " old mem: " << tuned_mem
+ << " new mem: " << new_size << dendl;
+
+ tuned_mem = new_size;
+
+ logger->set(MallocStats::M_TARGET_BYTES, target_mem);
+ logger->set(MallocStats::M_MAPPED_BYTES, mapped);
+ logger->set(MallocStats::M_UNMAPPED_BYTES, unmapped);
+ logger->set(MallocStats::M_HEAP_BYTES, heap_size);
+ logger->set(MallocStats::M_CACHE_BYTES, new_size);
+ }
+
+ void Manager::insert(const std::string& name, std::shared_ptr<PriCache> c,
+ bool enable_perf_counters)
+ {
+ ceph_assert(!caches.count(name));
+ ceph_assert(!indexes.count(name));
+
+ caches.emplace(name, c);
+
+ if (!enable_perf_counters) {
+ return;
+ }
+
+ // TODO: If we ever assign more than
+ // PERF_COUNTER_MAX_BOUND - PERF_COUNTER_LOWER_BOUND perf counters for
+ // priority caching we could run out of slots. Recycle them some day?
+ // Also note that start and end are *exclusive*.
+ int start = cur_index++;
+ int end = cur_index + Extra::E_LAST + 1;
+
+ ceph_assert(end < PERF_COUNTER_MAX_BOUND);
+ indexes.emplace(name, std::vector<int>(Extra::E_LAST + 1));
+
+ PerfCountersBuilder b(cct, "prioritycache:" + name, start, end);
+
+ b.add_u64(cur_index + Priority::PRI0, "pri0_bytes",
+ "bytes allocated to pri0", "p0",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI1, "pri1_bytes",
+ "bytes allocated to pri1", "p1",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI2, "pri2_bytes",
+ "bytes allocated to pri2", "p2",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI3, "pri3_bytes",
+ "bytes allocated to pri3", "p3",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI4, "pri4_bytes",
+ "bytes allocated to pri4", "p4",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI5, "pri5_bytes",
+ "bytes allocated to pri5", "p5",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI6, "pri6_bytes",
+ "bytes allocated to pri6", "p6",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI7, "pri7_bytes",
+ "bytes allocated to pri7", "p7",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI8, "pri8_bytes",
+ "bytes allocated to pri8", "p8",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI9, "pri9_bytes",
+ "bytes allocated to pri9", "p9",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI10, "pri10_bytes",
+ "bytes allocated to pri10", "p10",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Priority::PRI11, "pri11_bytes",
+ "bytes allocated to pri11", "p11",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Extra::E_RESERVED, "reserved_bytes",
+ "bytes reserved for future growth.", "r",
+ PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+
+ b.add_u64(cur_index + Extra::E_COMMITTED, "committed_bytes",
+ "total bytes committed,", "c",
+ PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+
+ for (int i = 0; i < Extra::E_LAST+1; i++) {
+ indexes[name][i] = cur_index + i;
+ }
+
+ auto l = b.create_perf_counters();
+ loggers.emplace(name, l);
+ cct->get_perfcounters_collection()->add(l);
+
+ cur_index = end;
+ }
+
+ void Manager::erase(const std::string& name)
+ {
+ auto li = loggers.find(name);
+ if (li != loggers.end()) {
+ cct->get_perfcounters_collection()->remove(li->second);
+ delete li->second;
+ loggers.erase(li);
+ }
+ indexes.erase(name);
+ caches.erase(name);
+ }
+
+ void Manager::clear()
+ {
+ auto li = loggers.begin();
+ while (li != loggers.end()) {
+ cct->get_perfcounters_collection()->remove(li->second);
+ delete li->second;
+ li = loggers.erase(li);
+ }
+ indexes.clear();
+ caches.clear();
+ }
+
+ void Manager::balance()
+ {
+ int64_t mem_avail = tuned_mem;
+ // Each cache is going to get a little extra from get_chunk, so shrink the
+ // available memory here to compensate.
+ if (reserve_extra) {
+ mem_avail -= get_chunk(1, tuned_mem) * caches.size();
+ }
+
+ if (mem_avail < 0) {
+ // There's so little memory available that just assigning a chunk per
+ // cache pushes us over the limit. Set mem_avail to 0 and continue to
+ // ensure each priority's byte counts are zeroed in balance_priority.
+ mem_avail = 0;
+ }
+
+ // Assign memory for each priority level
+ for (int i = 0; i < Priority::LAST+1; i++) {
+ ldout(cct, 10) << __func__ << " assigning cache bytes for PRI: " << i << dendl;
+
+ auto pri = static_cast<Priority>(i);
+ balance_priority(&mem_avail, pri);
+
+ // Update the per-priority perf counters
+ for (auto &l : loggers) {
+ auto it = caches.find(l.first);
+ ceph_assert(it != caches.end());
+
+ auto bytes = it->second->get_cache_bytes(pri);
+ l.second->set(indexes[it->first][pri], bytes);
+ }
+ }
+ // assert if we assigned more memory than is available.
+ ceph_assert(mem_avail >= 0);
+
+ for (auto &l : loggers) {
+ auto it = caches.find(l.first);
+ ceph_assert(it != caches.end());
+
+ // Commit the new cache size
+ int64_t committed = it->second->commit_cache_size(tuned_mem);
+
+ // Update the perf counters
+ int64_t alloc = it->second->get_cache_bytes();
+
+ l.second->set(indexes[it->first][Extra::E_RESERVED], committed - alloc);
+ l.second->set(indexes[it->first][Extra::E_COMMITTED], committed);
+ }
+ }
+
+ void Manager::balance_priority(int64_t *mem_avail, Priority pri)
+ {
+ std::unordered_map<std::string, std::shared_ptr<PriCache>> tmp_caches = caches;
+ double cur_ratios = 0;
+ double new_ratios = 0;
+ uint64_t round = 0;
+
+ // First, zero this priority's bytes, sum the initial ratios.
+ for (auto it = caches.begin(); it != caches.end(); it++) {
+ it->second->set_cache_bytes(pri, 0);
+ cur_ratios += it->second->get_cache_ratio();
+ }
+
+ // For other priorities, loop until caches are satisified or we run out of
+ // memory (stop if we can't guarantee a full byte allocation).
+ while (!tmp_caches.empty() && *mem_avail > static_cast<int64_t>(tmp_caches.size())) {
+ uint64_t total_assigned = 0;
+ for (auto it = tmp_caches.begin(); it != tmp_caches.end();) {
+ int64_t cache_wants = it->second->request_cache_bytes(pri, tuned_mem);
+ // Usually the ratio should be set to the fraction of the current caches'
+ // assigned ratio compared to the total ratio of all caches that still
+ // want memory. There is a special case where the only caches left are
+ // all assigned 0% ratios but still want memory. In that case, give
+ // them an equal shot at the remaining memory for this priority.
+ double ratio = 1.0 / tmp_caches.size();
+ if (cur_ratios > 0) {
+ ratio = it->second->get_cache_ratio() / cur_ratios;
+ }
+ int64_t fair_share = static_cast<int64_t>(*mem_avail * ratio);
+
+ ldout(cct, 10) << __func__ << " " << it->first
+ << " pri: " << (int) pri
+ << " round: " << round
+ << " wanted: " << cache_wants
+ << " ratio: " << it->second->get_cache_ratio()
+ << " cur_ratios: " << cur_ratios
+ << " fair_share: " << fair_share
+ << " mem_avail: " << *mem_avail
+ << dendl;
+
+ if (cache_wants > fair_share) {
+ // If we want too much, take what we can get but stick around for more
+ it->second->add_cache_bytes(pri, fair_share);
+ total_assigned += fair_share;
+ new_ratios += it->second->get_cache_ratio();
+ ++it;
+ } else {
+ // Otherwise assign only what we want
+ if (cache_wants > 0) {
+ it->second->add_cache_bytes(pri, cache_wants);
+ total_assigned += cache_wants;
+ }
+ // Either the cache didn't want anything or got what it wanted, so
+ // remove it from the tmp list.
+ it = tmp_caches.erase(it);
+ }
+ }
+ // Reset the ratios
+ *mem_avail -= total_assigned;
+ cur_ratios = new_ratios;
+ new_ratios = 0;
+ ++round;
+ }
+
+ // If this is the last priority, divide up any remaining memory based
+ // solely on the ratios.
+ if (pri == Priority::LAST) {
+ uint64_t total_assigned = 0;
+ for (auto it = caches.begin(); it != caches.end(); it++) {
+ double ratio = it->second->get_cache_ratio();
+ int64_t fair_share = static_cast<int64_t>(*mem_avail * ratio);
+ it->second->set_cache_bytes(Priority::LAST, fair_share);
+ total_assigned += fair_share;
+ }
+ *mem_avail -= total_assigned;
+ return;
+ }
+ }
+
+ PriCache::~PriCache()
+ {
+ }
+}