// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_SHAREDCACHE_H
#define CEPH_SHAREDCACHE_H

#include <map>
#include <list>
#ifdef WITH_SEASTAR
#include <boost/smart_ptr/local_shared_ptr.hpp>
#else
#include <memory>
#endif
#include "common/ceph_mutex.h"
#include "common/ceph_context.h"
#include "common/dout.h"
#include "include/unordered_map.h"

template <class K, class V>
class SharedLRU {
  CephContext *cct;
#ifdef WITH_SEASTAR
  using VPtr = boost::local_shared_ptr<V>;
  using WeakVPtr = boost::weak_ptr<V>;
#else
  using VPtr = std::shared_ptr<V>;
  using WeakVPtr = std::weak_ptr<V>;
#endif
  ceph::mutex lock;
  size_t max_size;
  ceph::condition_variable cond;
  unsigned size;
public:
  int waiting;
private:
  using C = std::less<K>;
  using H = std::hash<K>;
  // strong references: an LRU list of (key, value) pairs, plus an index
  // into it for O(1) lookups
  ceph::unordered_map<K, typename std::list<std::pair<K, VPtr> >::iterator, H> contents;
  std::list<std::pair<K, VPtr> > lru;

  // weak references: every value that may still be alive, whether pinned
  // by the LRU or only by references handed out to callers
  std::map<K, std::pair<WeakVPtr, V*>, C> weak_refs;

  void trim_cache(std::list<VPtr> *to_release) {
    while (size > max_size) {
      to_release->push_back(lru.back().second);
      lru_remove(lru.back().first);
    }
  }

  void lru_remove(const K& key) {
    auto i = contents.find(key);
    if (i == contents.end())
      return;
    lru.erase(i->second);
    --size;
    contents.erase(i);
  }

  void lru_add(const K& key, const VPtr& val, std::list<VPtr> *to_release) {
    auto i = contents.find(key);
    if (i != contents.end()) {
      lru.splice(lru.begin(), lru, i->second);
    } else {
      ++size;
      lru.push_front(make_pair(key, val));
      contents[key] = lru.begin();
      trim_cache(to_release);
    }
  }

  void remove(const K& key, V *valptr) {
    std::lock_guard l{lock};
    auto i = weak_refs.find(key);
    if (i != weak_refs.end() && i->second.second == valptr) {
      weak_refs.erase(i);
    }
    cond.notify_all();
  }

  // deleter attached to every cached value: drops the weak_refs entry
  // before freeing the object, then wakes any waiters
  class Cleanup {
  public:
    SharedLRU<K, V> *cache;
    K key;
    Cleanup(SharedLRU<K, V> *cache, K key) : cache(cache), key(key) {}
    void operator()(V *ptr) {
      cache->remove(key, ptr);
      delete ptr;
    }
  };

public:
  SharedLRU(CephContext *cct = NULL, size_t max_size = 20)
    : cct(cct),
      lock{ceph::make_mutex("SharedLRU::lock")},
      max_size(max_size),
      size(0), waiting(0) {
    contents.rehash(max_size);
  }

  ~SharedLRU() {
    contents.clear();
    lru.clear();
    if (!weak_refs.empty()) {
      lderr(cct) << "leaked refs:\n";
      dump_weak_refs(*_dout);
      *_dout << dendl;
      if (cct->_conf.get_val<bool>("debug_asserts_on_shutdown")) {
        ceph_assert(weak_refs.empty());
      }
    }
  }

  int get_count() {
    std::lock_guard locker{lock};
    return size;
  }

  void set_cct(CephContext *c) {
    cct = c;
  }

  void dump_weak_refs() {
    lderr(cct) << "leaked refs:\n";
    dump_weak_refs(*_dout);
    *_dout << dendl;
  }

  void dump_weak_refs(std::ostream& out) {
    for (const auto& [key, ref] : weak_refs) {
      out << __func__ << " " << this << " weak_refs: "
          << key << " = " << ref.second
          << " with " << ref.first.use_count() << " refs"
          << std::endl;
    }
  }
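
  /*
   * Design note: the cache keeps two views of each entry. `lru` and its
   * `contents` index pin at most `max_size` values with strong references;
   * `weak_refs` tracks every value that may still be alive through a
   * reference handed out to a caller. A value disappears completely only
   * once both its LRU slot and all outside references are gone, at which
   * point Cleanup::operator() erases the weak_refs entry and notifies
   * waiters blocked in lookup(), lower_bound(), lookup_or_create(), or
   * add().
   */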
  // clear all strong references from the lru
  void clear() {
    while (true) {
      VPtr val; // release any ref we have after we drop the lock
      std::lock_guard locker{lock};
      if (size == 0)
        break;

      val = lru.back().second;
      lru_remove(lru.back().first);
    }
  }

  void clear(const K& key) {
    VPtr val; // release any ref we have after we drop the lock
    {
      std::lock_guard l{lock};
      auto i = weak_refs.find(key);
      if (i != weak_refs.end()) {
        val = i->second.first.lock();
      }
      lru_remove(key);
    }
  }

  /* Clears weakrefs in the interval [from, to] -- note that to is inclusive.
   * Only the strong LRU references are dropped here; each weak_refs entry
   * expires via Cleanup once its last outside reference is released. */
  void clear_range(
    const K& from,
    const K& to) {
    std::list<VPtr> vals; // release any refs we have after we drop the lock
    {
      std::lock_guard l{lock};
      auto from_iter = weak_refs.lower_bound(from);
      auto to_iter = weak_refs.upper_bound(to);
      for (auto i = from_iter; i != to_iter; ) {
        vals.push_back(i->second.first.lock());
        lru_remove((i++)->first);
      }
    }
  }

  void purge(const K &key) {
    VPtr val; // release any ref we have after we drop the lock
    {
      std::lock_guard l{lock};
      auto i = weak_refs.find(key);
      if (i != weak_refs.end()) {
        val = i->second.first.lock();
        weak_refs.erase(i);
      }
      lru_remove(key);
    }
  }

  void set_size(size_t new_size) {
    std::list<VPtr> to_release;
    {
      std::lock_guard l{lock};
      max_size = new_size;
      trim_cache(&to_release);
    }
  }

  // Returns a key that compares <= every currently cached key
  K cached_key_lower_bound() {
    std::lock_guard l{lock};
    return weak_refs.begin()->first;
  }

  VPtr lower_bound(const K& key) {
    VPtr val;
    std::list<VPtr> to_release;
    {
      std::unique_lock l{lock};
      ++waiting;
      cond.wait(l, [this, &key, &val, &to_release] {
        if (weak_refs.empty()) {
          return true;
        }
        auto i = weak_refs.lower_bound(key);
        if (i == weak_refs.end()) {
          --i;
        }
        if (val = i->second.first.lock(); val) {
          lru_add(i->first, val, &to_release);
          return true;
        } else {
          // the value is expiring; wait until its deleter finishes
          return false;
        }
      });
      --waiting;
    }
    return val;
  }

  bool get_next(const K &key, std::pair<K, VPtr> *next) {
    std::pair<K, VPtr> r;
    {
      std::lock_guard l{lock};
      VPtr next_val;
      typename std::map<K, std::pair<WeakVPtr, V*>, C>::iterator i =
        weak_refs.upper_bound(key);

      // skip entries whose values have already expired
      while (i != weak_refs.end() &&
             !(next_val = i->second.first.lock()))
        ++i;

      if (i == weak_refs.end())
        return false;

      if (next)
        r = make_pair(i->first, next_val);
    }
    if (next)
      *next = r;
    return true;
  }

  bool get_next(const K &key, std::pair<K, V> *next) {
    std::pair<K, VPtr> r;
    bool found = get_next(key, &r);
    if (!found || !next)
      return found;
    next->first = r.first;
    ceph_assert(r.second);
    next->second = *(r.second);
    return found;
  }

  VPtr lookup(const K& key) {
    VPtr val;
    std::list<VPtr> to_release;
    {
      std::unique_lock l{lock};
      ++waiting;
      cond.wait(l, [this, &key, &val, &to_release] {
        if (auto i = weak_refs.find(key); i != weak_refs.end()) {
          if (val = i->second.first.lock(); val) {
            lru_add(key, val, &to_release);
            return true;
          } else {
            return false;
          }
        } else {
          return true;
        }
      });
      --waiting;
    }
    return val;
  }

  VPtr lookup_or_create(const K &key) {
    VPtr val;
    std::list<VPtr> to_release;
    {
      std::unique_lock l{lock};
      cond.wait(l, [this, &key, &val] {
        if (auto i = weak_refs.find(key); i != weak_refs.end()) {
          if (val = i->second.first.lock(); val) {
            return true;
          } else {
            return false;
          }
        } else {
          return true;
        }
      });
      if (!val) {
        val = VPtr{new V{}, Cleanup{this, key}};
        weak_refs.insert(make_pair(key, make_pair(val, val.get())));
      }
      lru_add(key, val, &to_release);
    }
    return val;
  }

  /**
   * empty()
   *
   * Returns true iff there are no live references left to anything that has
   * been in the cache.
   */
  bool empty() {
    std::lock_guard l{lock};
    return weak_refs.empty();
  }

  /***
   * Inserts a key if not present, or bumps it to the front of the LRU if
   * it is, and then gives you a reference to the value.
   * If the key already
   * existed, you are responsible for deleting the new value you tried to
   * insert.
   *
   * @param key The key to insert
   * @param value The value that goes with the key
   * @param existed Set to true if the value was already in the
   * map, false otherwise
   * @return A reference to the map's value for the given key
   *
   * A usage sketch appears at the end of this header.
   */
  VPtr add(const K& key, V *value, bool *existed = NULL) {
    VPtr val;
    std::list<VPtr> to_release;
    {
      typename std::map<K, std::pair<WeakVPtr, V*>, C>::iterator actual;
      std::unique_lock l{lock};
      cond.wait(l, [this, &key, &actual, &val] {
        actual = weak_refs.lower_bound(key);
        if (actual != weak_refs.end() && actual->first == key) {
          val = actual->second.first.lock();
          if (val) {
            return true;
          } else {
            return false;
          }
        } else {
          return true;
        }
      });

      if (val) {
        if (existed) {
          *existed = true;
        }
      } else {
        if (existed) {
          *existed = false;
        }
        val = VPtr(value, Cleanup(this, key));
        weak_refs.insert(actual, make_pair(key, make_pair(val, value)));
      }
      lru_add(key, val, &to_release);
    }
    return val;
  }

  friend class SharedLRUTest;
};

#endif
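
/*
 * A minimal usage sketch. The function below is illustrative only (it is
 * not part of Ceph); it assumes a valid CephContext* and instantiates the
 * cache with arbitrary key/value types. It is guarded by #if 0 so it never
 * compiles into consumers of this header.
 */
#if 0
#include <string>

inline void sharedlru_example(CephContext *cct)
{
  SharedLRU<int, std::string> cache(cct, 10);

  // add() takes ownership of the raw pointer; the returned shared_ptr
  // keeps the entry pinned in the LRU.
  bool existed = false;
  auto v = cache.add(1, new std::string("one"), &existed);

  // lookup() returns a null pointer if the key is absent or has expired;
  // a hit bumps the entry to the front of the LRU.
  if (auto hit = cache.lookup(1); hit) {
    // use *hit ...
  }

  // purge() makes the cache forget the key, but `v` still keeps the
  // object alive until the last outside reference drops.
  cache.purge(1);
}
#endif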