summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_bucket_sync_cache.h
blob: de81d26c9bc31fc446a8429fc70e64371a85d38d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat, Inc
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#pragma once

#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include "common/intrusive_lru.h"
#include "rgw_data_sync.h"

namespace rgw::bucket_sync {

// per bucket-shard state cached by DataSyncShardCR
struct State {
  // the source bucket shard to sync
  rgw_bucket_shard key;
  // current sync obligation being processed by DataSyncSingleEntry
  std::optional<rgw_data_sync_obligation> obligation;
  // incremented with each new obligation
  uint32_t counter = 0;
  // highest timestamp applied by all sources
  ceph::real_time progress_timestamp;

  // construct with the key only; all other members start at their defaults
  State(const rgw_bucket_shard& key) noexcept : key(key) {}
};

// forward declarations for the lru cache machinery defined below
struct Entry;
struct EntryToKey;
class Handle;

// lru keyed by rgw_bucket_shard, storing Entry nodes; EntryToKey projects
// a node to its key for lookup
using lru_config = ceph::common::intrusive_lru_config<
    rgw_bucket_shard, Entry, EntryToKey>;

// a recyclable cache entry
struct Entry : State, ceph::common::intrusive_lru_base<lru_config> {
  using State::State;  // inherit State's key-only constructor
};

// key-projection functor for intrusive_lru: maps an Entry to its bucket
// shard key
struct EntryToKey {
  using type = rgw_bucket_shard;
  // const-qualified and noexcept: the functor is stateless, only reads the
  // entry, and must be invocable in const contexts; returns a reference to
  // avoid copying the key
  const type& operator()(const Entry& e) const noexcept { return e.key; }
};

// use a non-atomic reference count since these aren't shared across threads
// (boost::thread_unsafe_counter avoids atomic operations on add_ref/release)
template <typename T>
using thread_unsafe_ref_counter = boost::intrusive_ref_counter<
    T, boost::thread_unsafe_counter>;

// a state cache for entries within a single datalog shard
class Cache : public thread_unsafe_ref_counter<Cache> {
 public:
  // factory function; Cache is intrusively ref-counted, so instances are
  // always heap-allocated and returned via intrusive_ptr
  static boost::intrusive_ptr<Cache> create(size_t target_size) {
    return new Cache(target_size);
  }

  // find or create a cache entry for the given key, and return a Handle that
  // keeps it lru-pinned until destruction
  Handle get(const rgw_bucket_shard& key);

 protected:
  // protected ctor to enforce the use of factory function create()
  explicit Cache(size_t target_size) {
    cache.set_target_size(target_size);
  }

 private:
  ceph::common::intrusive_lru<lru_config> cache;
};

// a State handle that keeps the Cache referenced
// NOTE: member declaration order is load-bearing - 'entry' is declared after
// 'cache', so on destruction 'entry' is released first while the owning
// Cache is still referenced
class Handle {
  boost::intrusive_ptr<Cache> cache;
  boost::intrusive_ptr<Entry> entry;
 public:
  Handle() noexcept = default;
  ~Handle() = default;
  // bind to an entry, taking a reference on its owning cache
  Handle(boost::intrusive_ptr<Cache> cache,
         boost::intrusive_ptr<Entry> entry) noexcept
    : cache(std::move(cache)), entry(std::move(entry)) {}
  Handle(Handle&&) = default;
  Handle(const Handle&) = default;
  Handle& operator=(Handle&& o) noexcept {
    // move the entry first so that its cache stays referenced over destruction
    entry = std::move(o.entry);
    cache = std::move(o.cache);
    return *this;
  }
  Handle& operator=(const Handle& o) noexcept {
    // copy the entry first so that its cache stays referenced over destruction
    entry = o.entry;
    cache = o.cache;
    return *this;
  }

  // true if the handle refers to a cache entry
  explicit operator bool() const noexcept { return static_cast<bool>(entry); }
  // dereference to the entry's State; precondition: operator bool() is true
  State& operator*() const noexcept { return *entry; }
  State* operator->() const noexcept { return entry.get(); }
};

inline Handle Cache::get(const rgw_bucket_shard& key)
{
  // get_or_create() yields {entry ref, created flag}; only the ref matters
  // here - the Handle pins both the entry and this Cache until destruction
  auto found = cache.get_or_create(key);
  return Handle{this, std::move(found.first)};
}

} // namespace rgw::bucket_sync