summaryrefslogtreecommitdiffstats
path: root/src/crimson/os/seastore/onode_manager/staged-fltree/stages/sub_items_stage.h
blob: 8ef5f747263b04a6589a9248b929ac62776f5634 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <variant>

#include "crimson/os/seastore/onode_manager/staged-fltree/node_types.h"
#include "key_layout.h"
#include "stage_types.h"

namespace crimson::os::seastore::onode {

class NodeExtentMutable;

struct internal_sub_item_t {
  const snap_gen_t& get_key() const { return key; }
  const laddr_packed_t* get_p_value() const { return &value; }

  snap_gen_t key;
  laddr_packed_t value;
} __attribute__((packed));

/**
 * internal_sub_items_t
 *
 * The STAGE_RIGHT implementation for internal node N0/N1/N2, implements staged
 * contract as an indexable container to index snap-gen to child node
 * addresses.
 *
 * The layout of the contaner storing n sub-items:
 *
 * # <--------- container range -----------> #
 * #<~># sub-items [2, n)                    #
 * #   # <- sub-item 1 -> # <- sub-item 0 -> #
 * #...# snap-gen | laddr # snap-gen | laddr #
 *                        ^
 *                        |
 *           p_first_item +
 */
class internal_sub_items_t {
 public:
  using num_keys_t = index_t;

  internal_sub_items_t(const memory_range_t& range) {
    assert(range.p_start < range.p_end);
    assert((range.p_end - range.p_start) % sizeof(internal_sub_item_t) == 0);
    num_items = (range.p_end - range.p_start) / sizeof(internal_sub_item_t);
    assert(num_items > 0);
    auto _p_first_item = range.p_end - sizeof(internal_sub_item_t);
    p_first_item = reinterpret_cast<const internal_sub_item_t*>(_p_first_item);
  }

  // container type system
  using key_get_type = const snap_gen_t&;
  static constexpr auto CONTAINER_TYPE = ContainerType::INDEXABLE;
  num_keys_t keys() const { return num_items; }
  key_get_type operator[](index_t index) const {
    assert(index < num_items);
    return (p_first_item - index)->get_key();
  }
  node_offset_t size_before(index_t index) const {
    size_t ret = index * sizeof(internal_sub_item_t);
    assert(ret < NODE_BLOCK_SIZE);
    return ret;
  }
  const laddr_packed_t* get_p_value(index_t index) const {
    assert(index < num_items);
    return (p_first_item - index)->get_p_value();
  }
  node_offset_t size_overhead_at(index_t index) const { return 0u; }
  void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
    auto p_end = reinterpret_cast<const char*>(p_first_item) +
                 sizeof(internal_sub_item_t);
    auto p_start = p_end - num_items * sizeof(internal_sub_item_t);
    int start_offset = p_start - p_node_start;
    int end_offset = p_end - p_node_start;
    assert(start_offset > 0 &&
           start_offset < end_offset &&
           end_offset < NODE_BLOCK_SIZE);
    ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
    ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
  }

  static internal_sub_items_t decode(
      const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
    node_offset_t start_offset;
    ceph::decode(start_offset, delta);
    node_offset_t end_offset;
    ceph::decode(end_offset, delta);
    assert(start_offset < end_offset);
    assert(end_offset <= NODE_BLOCK_SIZE);
    return internal_sub_items_t({p_node_start + start_offset,
                                 p_node_start + end_offset});
  }

  static node_offset_t header_size() { return 0u; }

  template <KeyT KT>
  static node_offset_t estimate_insert(
      const full_key_t<KT>&, const laddr_packed_t&) {
    return sizeof(internal_sub_item_t);
  }

  template <KeyT KT>
  static const laddr_packed_t* insert_at(
      NodeExtentMutable&, const internal_sub_items_t&,
      const full_key_t<KT>&, const laddr_packed_t&,
      index_t index, node_offset_t size, const char* p_left_bound);

  static node_offset_t trim_until(NodeExtentMutable&, internal_sub_items_t&, index_t);

  template <KeyT KT>
  class Appender;

 private:
  index_t num_items;
  const internal_sub_item_t* p_first_item;
};

template <KeyT KT>
class internal_sub_items_t::Appender {
 public:
  Appender(NodeExtentMutable* p_mut, char* p_append)
    : p_mut{p_mut}, p_append{p_append} {}
  void append(const internal_sub_items_t& src, index_t from, index_t items);
  void append(const full_key_t<KT>&, const laddr_packed_t&, const laddr_packed_t*&);
  char* wrap() { return p_append; }
 private:
  NodeExtentMutable* p_mut;
  char* p_append;
};

/**
 * leaf_sub_items_t
 *
 * The STAGE_RIGHT implementation for leaf node N0/N1/N2, implements staged
 * contract as an indexable container to index snap-gen to onode_t.
 *
 * The layout of the contaner storing n sub-items:
 *
 * # <------------------------ container range -------------------------------> #
 * # <---------- sub-items ----------------> # <--- offsets ---------#          #
 * #<~># sub-items [2, n)                    #<~>| offsets [2, n)    #          #
 * #   # <- sub-item 1 -> # <- sub-item 0 -> #   |                   #          #
 * #...# snap-gen | onode # snap-gen | onode #...| offset1 | offset0 # num_keys #
 *                                           ^             ^         ^
 *                                           |             |         |
 *                               p_items_end +   p_offsets +         |
 *                                                        p_num_keys +
 */
class leaf_sub_items_t {
 public:
  // TODO: decide by NODE_BLOCK_SIZE, sizeof(snap_gen_t),
  //       and the minimal size of onode_t
  using num_keys_t = uint8_t;

  leaf_sub_items_t(const memory_range_t& range) {
    assert(range.p_start < range.p_end);
    auto _p_num_keys = range.p_end - sizeof(num_keys_t);
    assert(range.p_start < _p_num_keys);
    p_num_keys = reinterpret_cast<const num_keys_t*>(_p_num_keys);
    assert(keys());
    auto _p_offsets = _p_num_keys - sizeof(node_offset_t);
    assert(range.p_start < _p_offsets);
    p_offsets = reinterpret_cast<const node_offset_packed_t*>(_p_offsets);
    p_items_end = reinterpret_cast<const char*>(&get_offset(keys() - 1));
    assert(range.p_start < p_items_end);
    assert(range.p_start == p_start());
  }

  bool operator==(const leaf_sub_items_t& x) {
    return (p_num_keys == x.p_num_keys &&
            p_offsets == x.p_offsets &&
            p_items_end == x.p_items_end);
  }

  const char* p_start() const { return get_item_end(keys()); }

  const node_offset_packed_t& get_offset(index_t index) const {
    assert(index < keys());
    return *(p_offsets - index);
  }

  const node_offset_t get_offset_to_end(index_t index) const {
    assert(index <= keys());
    return index == 0 ? 0 : get_offset(index - 1).value;
  }

  const char* get_item_start(index_t index) const {
    return p_items_end - get_offset(index).value;
  }

  const char* get_item_end(index_t index) const {
    return p_items_end - get_offset_to_end(index);
  }

  // container type system
  using key_get_type = const snap_gen_t&;
  static constexpr auto CONTAINER_TYPE = ContainerType::INDEXABLE;
  num_keys_t keys() const { return *p_num_keys; }
  key_get_type operator[](index_t index) const {
    assert(index < keys());
    auto pointer = get_item_end(index);
    assert(get_item_start(index) < pointer);
    pointer -= sizeof(snap_gen_t);
    assert(get_item_start(index) < pointer);
    return *reinterpret_cast<const snap_gen_t*>(pointer);
  }
  node_offset_t size_before(index_t index) const {
    assert(index <= keys());
    size_t ret;
    if (index == 0) {
      ret = sizeof(num_keys_t);
    } else {
      --index;
      ret = sizeof(num_keys_t) +
            (index + 1) * sizeof(node_offset_t) +
            get_offset(index).value;
    }
    assert(ret < NODE_BLOCK_SIZE);
    return ret;
  }
  node_offset_t size_overhead_at(index_t index) const { return sizeof(node_offset_t); }
  const onode_t* get_p_value(index_t index) const {
    assert(index < keys());
    auto pointer = get_item_start(index);
    auto value = reinterpret_cast<const onode_t*>(pointer);
    assert(pointer + value->size + sizeof(snap_gen_t) == get_item_end(index));
    return value;
  }
  void encode(const char* p_node_start, ceph::bufferlist& encoded) const {
    auto p_end = reinterpret_cast<const char*>(p_num_keys) +
                  sizeof(num_keys_t);
    int start_offset = p_start() - p_node_start;
    int end_offset = p_end - p_node_start;
    assert(start_offset > 0 &&
           start_offset < end_offset &&
           end_offset < NODE_BLOCK_SIZE);
    ceph::encode(static_cast<node_offset_t>(start_offset), encoded);
    ceph::encode(static_cast<node_offset_t>(end_offset), encoded);
  }

  static leaf_sub_items_t decode(
      const char* p_node_start, ceph::bufferlist::const_iterator& delta) {
    node_offset_t start_offset;
    ceph::decode(start_offset, delta);
    node_offset_t end_offset;
    ceph::decode(end_offset, delta);
    assert(start_offset < end_offset);
    assert(end_offset <= NODE_BLOCK_SIZE);
    return leaf_sub_items_t({p_node_start + start_offset,
                             p_node_start + end_offset});
  }

  static node_offset_t header_size() { return sizeof(num_keys_t); }

  template <KeyT KT>
  static node_offset_t estimate_insert(const full_key_t<KT>&, const onode_t& value) {
    return value.size + sizeof(snap_gen_t) + sizeof(node_offset_t);
  }

  template <KeyT KT>
  static const onode_t* insert_at(
      NodeExtentMutable&, const leaf_sub_items_t&,
      const full_key_t<KT>&, const onode_t&,
      index_t index, node_offset_t size, const char* p_left_bound);

  static node_offset_t trim_until(NodeExtentMutable&, leaf_sub_items_t&, index_t index);

  template <KeyT KT>
  class Appender;

 private:
  // TODO: support unaligned access
  const num_keys_t* p_num_keys;
  const node_offset_packed_t* p_offsets;
  const char* p_items_end;
};

constexpr index_t APPENDER_LIMIT = 3u;

template <KeyT KT>
class leaf_sub_items_t::Appender {
  struct range_items_t {
    index_t from;
    index_t items;
  };
  struct kv_item_t {
    const full_key_t<KT>* p_key;
    const onode_t* p_value;
  };
  using var_t = std::variant<range_items_t, kv_item_t>;

 public:
  Appender(NodeExtentMutable* p_mut, char* p_append)
    : p_mut{p_mut}, p_append{p_append} {
  }

  void append(const leaf_sub_items_t& src, index_t from, index_t items) {
    assert(cnt <= APPENDER_LIMIT);
    assert(from <= src.keys());
    if (items == 0) {
      return;
    }
    if (op_src) {
      assert(*op_src == src);
    } else {
      op_src = src;
    }
    assert(from < src.keys());
    assert(from + items <= src.keys());
    appends[cnt] = range_items_t{from, items};
    ++cnt;
  }
  void append(const full_key_t<KT>& key,
              const onode_t& value, const onode_t*& p_value) {
    assert(pp_value == nullptr);
    assert(cnt <= APPENDER_LIMIT);
    appends[cnt] = kv_item_t{&key, &value};
    ++cnt;
    pp_value = &p_value;
  }
  char* wrap();

 private:
  std::optional<leaf_sub_items_t> op_src;
  const onode_t** pp_value = nullptr;
  NodeExtentMutable* p_mut;
  char* p_append;
  var_t appends[APPENDER_LIMIT];
  index_t cnt = 0;
};

template <node_type_t> struct _sub_items_t;
template<> struct _sub_items_t<node_type_t::INTERNAL> { using type = internal_sub_items_t; };
template<> struct _sub_items_t<node_type_t::LEAF> { using type = leaf_sub_items_t; };
template <node_type_t NODE_TYPE>
using sub_items_t = typename _sub_items_t<NODE_TYPE>::type;

}