1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2020 Red Hat, Inc
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*/
#pragma once
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include "common/intrusive_lru.h"
#include "rgw_data_sync.h"
namespace rgw::bucket_sync {
// per bucket-shard state cached by DataSyncShardCR
struct State {
// the source bucket shard to sync
std::pair<rgw_bucket_shard, std::optional<uint64_t>> key;
// current sync obligation being processed by DataSyncSingleEntry
std::optional<rgw_data_sync_obligation> obligation;
// incremented with each new obligation
uint32_t counter = 0;
// highest timestamp applied by all sources
ceph::real_time progress_timestamp;
State(const std::pair<rgw_bucket_shard, std::optional<uint64_t>>& key ) noexcept
: key(key) {}
State(const rgw_bucket_shard& shard, std::optional<uint64_t> gen) noexcept
: key(shard, gen) {}
};
struct Entry;
struct EntryToKey;
class Handle;
using lru_config = ceph::common::intrusive_lru_config<
std::pair<rgw_bucket_shard, std::optional<uint64_t>>, Entry, EntryToKey>;
// a recyclable cache entry
struct Entry : State, ceph::common::intrusive_lru_base<lru_config> {
using State::State;
};
struct EntryToKey {
using type = std::pair<rgw_bucket_shard, std::optional<uint64_t>>;
const type& operator()(const Entry& e) { return e.key; }
};
// use a non-atomic reference count since these aren't shared across threads
template <typename T>
using thread_unsafe_ref_counter = boost::intrusive_ref_counter<
T, boost::thread_unsafe_counter>;
// a state cache for entries within a single datalog shard
class Cache : public thread_unsafe_ref_counter<Cache> {
ceph::common::intrusive_lru<lru_config> cache;
protected:
// protected ctor to enforce the use of factory function create()
explicit Cache(size_t target_size) {
cache.set_target_size(target_size);
}
public:
static boost::intrusive_ptr<Cache> create(size_t target_size) {
return new Cache(target_size);
}
// find or create a cache entry for the given key, and return a Handle that
// keeps it lru-pinned until destruction
Handle get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen);
};
// a State handle that keeps the Cache referenced
class Handle {
boost::intrusive_ptr<Cache> cache;
boost::intrusive_ptr<Entry> entry;
public:
Handle() noexcept = default;
~Handle() = default;
Handle(boost::intrusive_ptr<Cache> cache,
boost::intrusive_ptr<Entry> entry) noexcept
: cache(std::move(cache)), entry(std::move(entry)) {}
Handle(Handle&&) = default;
Handle(const Handle&) = default;
Handle& operator=(Handle&& o) noexcept {
// move the entry first so that its cache stays referenced over destruction
entry = std::move(o.entry);
cache = std::move(o.cache);
return *this;
}
Handle& operator=(const Handle& o) noexcept {
// copy the entry first so that its cache stays referenced over destruction
entry = o.entry;
cache = o.cache;
return *this;
}
explicit operator bool() const noexcept { return static_cast<bool>(entry); }
State& operator*() const noexcept { return *entry; }
State* operator->() const noexcept { return entry.get(); }
};
inline Handle Cache::get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen)
{
auto result = cache.get_or_create({ shard, gen });
return {this, std::move(result.first)};
}
} // namespace rgw::bucket_sync
|