// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp /* * Ceph - scalable distributed file system * * Copyright (C) 2020 Red Hat, Inc * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. */ #pragma once #include #include "common/intrusive_lru.h" #include "rgw_data_sync.h" namespace rgw::bucket_sync { // per bucket-shard state cached by DataSyncShardCR struct State { // the source bucket shard to sync std::pair> key; // current sync obligation being processed by DataSyncSingleEntry std::optional obligation; // incremented with each new obligation uint32_t counter = 0; // highest timestamp applied by all sources ceph::real_time progress_timestamp; State(const std::pair>& key ) noexcept : key(key) {} State(const rgw_bucket_shard& shard, std::optional gen) noexcept : key(shard, gen) {} }; struct Entry; struct EntryToKey; class Handle; using lru_config = ceph::common::intrusive_lru_config< std::pair>, Entry, EntryToKey>; // a recyclable cache entry struct Entry : State, ceph::common::intrusive_lru_base { using State::State; }; struct EntryToKey { using type = std::pair>; const type& operator()(const Entry& e) { return e.key; } }; // use a non-atomic reference count since these aren't shared across threads template using thread_unsafe_ref_counter = boost::intrusive_ref_counter< T, boost::thread_unsafe_counter>; // a state cache for entries within a single datalog shard class Cache : public thread_unsafe_ref_counter { ceph::common::intrusive_lru cache; protected: // protected ctor to enforce the use of factory function create() explicit Cache(size_t target_size) { cache.set_target_size(target_size); } public: static boost::intrusive_ptr create(size_t target_size) { return new Cache(target_size); } // find or create a cache entry for the given key, and return a Handle that // keeps it lru-pinned until destruction Handle get(const rgw_bucket_shard& shard, std::optional gen); }; // a State handle that keeps the Cache referenced class Handle { boost::intrusive_ptr cache; boost::intrusive_ptr entry; public: Handle() noexcept = default; ~Handle() = default; Handle(boost::intrusive_ptr cache, boost::intrusive_ptr entry) noexcept : cache(std::move(cache)), entry(std::move(entry)) {} Handle(Handle&&) = default; Handle(const Handle&) = default; Handle& operator=(Handle&& o) noexcept { // move the entry first so that its cache stays referenced over destruction entry = std::move(o.entry); cache = std::move(o.cache); return *this; } Handle& operator=(const Handle& o) noexcept { // copy the entry first so that its cache stays referenced over destruction entry = o.entry; cache = o.cache; return *this; } explicit operator bool() const noexcept { return static_cast(entry); } State& operator*() const noexcept { return *entry; } State* operator->() const noexcept { return entry.get(); } }; inline Handle Cache::get(const rgw_bucket_shard& shard, std::optional gen) { auto result = cache.get_or_create({ shard, gen }); return {this, std::move(result.first)}; } } // namespace rgw::bucket_sync