1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <algorithm>
#include <functional>
#include <map>

#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>

#include "include/types.h"
#include "crimson/common/type_helpers.h"
#include "crimson/common/smp_helpers.h"
#include "crimson/osd/osd_operation.h"
#include "osd/osd_types.h"
namespace crimson::osd {
class PG;
/**
 * PGShardMapping
 *
 * Maintains a mapping from spg_t to the core containing that PG. Internally,
 * each core has a local copy of the mapping to enable core-local lookups.
 * Updates are proxied to core 0, and then broadcast back out to all other
 * cores -- see maybe_create_pg and remove_pg.
 *
 * Only core 0's instance keeps meaningful per-core PG counts
 * (core_to_num_pgs); they are used to place new PGs on the least loaded core.
 */
class PGShardMapping : public seastar::peering_sharded_service<PGShardMapping> {
public:
  /// Returns mapping if present, NULL_CORE otherwise
  core_id_t get_pg_mapping(spg_t pgid) const {
    auto iter = pg_to_core.find(pgid);
    ceph_assert_always(iter == pg_to_core.end() || iter->second != NULL_CORE);
    return iter == pg_to_core.end() ? NULL_CORE : iter->second;
  }

  /**
   * Returns the core pgid is mapped to, creating the mapping if it does not
   * already exist.
   *
   * @param pgid PG to look up or map.
   * @param core Core pgid must live on, or NULL_CORE to let core 0 choose the
   *             core with the fewest PGs. Asserts if an existing mapping
   *             names a different core.
   * @return Future resolving to pgid's core once the mapping has been
   *         broadcast to every shard.
   *
   * Concurrent calls for the same pgid (from the same or different shards)
   * are safe: core 0 reuses a mapping that another call already created
   * instead of re-inserting it, and shards that already hold the identical
   * mapping accept the repeated broadcast.
   */
  seastar::future<core_id_t> maybe_create_pg(
    spg_t pgid,
    core_id_t core = NULL_CORE) {
    if (auto found = pg_to_core.find(pgid); found != pg_to_core.end()) {
      ceph_assert_always(found->second != NULL_CORE);
      ceph_assert_always(core == NULL_CORE || found->second == core);
      return seastar::make_ready_future<core_id_t>(found->second);
    }
    return container().invoke_on(0, [pgid, core](auto &primary_mapping) {
      const core_id_t mapped_core = primary_mapping.assign_core(pgid, core);
      return primary_mapping.container().invoke_on_others(
        [pgid, mapped_core](auto &other_mapping) {
          other_mapping.install_mapping(pgid, mapped_core);
        });
    }).then([this, pgid, core] {
      // The broadcast above has completed, so this shard's copy is populated.
      auto found = pg_to_core.find(pgid);
      ceph_assert_always(found != pg_to_core.end());
      ceph_assert_always(core == NULL_CORE || found->second == core);
      return seastar::make_ready_future<core_id_t>(found->second);
    });
  }

  /// Remove pgid: core 0 drops it and decrements its core's PG count, then
  /// every other shard drops its copy. Asserts if pgid is not mapped.
  seastar::future<> remove_pg(spg_t pgid) {
    return container().invoke_on(0, [pgid](auto &primary_mapping) {
      auto iter = primary_mapping.pg_to_core.find(pgid);
      ceph_assert_always(iter != primary_mapping.pg_to_core.end());
      ceph_assert_always(iter->second != NULL_CORE);
      auto count_iter = primary_mapping.core_to_num_pgs.find(iter->second);
      ceph_assert_always(count_iter != primary_mapping.core_to_num_pgs.end());
      ceph_assert_always(count_iter->second > 0);
      --(count_iter->second);
      primary_mapping.pg_to_core.erase(iter);
      return primary_mapping.container().invoke_on_others(
        [pgid](auto &other_mapping) {
          auto iter = other_mapping.pg_to_core.find(pgid);
          ceph_assert_always(iter != other_mapping.pg_to_core.end());
          ceph_assert_always(iter->second != NULL_CORE);
          other_mapping.pg_to_core.erase(iter);
        });
    });
  }

  /// Number of PGs in this core's copy of the mapping.
  size_t get_num_pgs() const { return pg_to_core.size(); }

  /// Map to cores in [min_core_mapping, core_mapping_limit)
  PGShardMapping(core_id_t min_core_mapping, core_id_t core_mapping_limit) {
    ceph_assert_always(min_core_mapping < core_mapping_limit);
    for (auto i = min_core_mapping; i != core_mapping_limit; ++i) {
      core_to_num_pgs.emplace(i, 0);
    }
  }

  /// Invokes f(pgid) for every pgid in this core's copy, in spg_t order.
  template <typename F>
  void for_each_pgid(F &&f) const {
    for (const auto &i: pg_to_core) {
      std::invoke(f, i.first);
    }
  }

private:
  /**
   * Core-0 step of maybe_create_pg: returns the core pgid is mapped to.
   * The mapping is added, and that core's PG count bumped, only if pgid is
   * not already mapped -- a concurrent maybe_create_pg may have mapped it
   * first, in which case the existing mapping is reused.
   */
  core_id_t assign_core(spg_t pgid, core_id_t core) {
    if (auto found = pg_to_core.find(pgid); found != pg_to_core.end()) {
      ceph_assert_always(found->second != NULL_CORE);
      ceph_assert_always(core == NULL_CORE || found->second == core);
      return found->second;
    }
    ceph_assert_always(core_to_num_pgs.size() > 0);
    std::map<core_id_t, unsigned>::iterator count_iter;
    if (core == NULL_CORE) {
      // No core requested: balance by picking the least loaded core.
      count_iter = std::min_element(
        core_to_num_pgs.begin(),
        core_to_num_pgs.end(),
        [](const auto &left, const auto &right) {
          return left.second < right.second;
        });
    } else {
      count_iter = core_to_num_pgs.find(core);
    }
    ceph_assert_always(count_iter != core_to_num_pgs.end());
    ++(count_iter->second);
    auto [insert_iter, inserted] = pg_to_core.emplace(pgid, count_iter->first);
    ceph_assert_always(inserted);
    return insert_iter->second;
  }

  /// Non-primary step of maybe_create_pg: records pgid -> core locally,
  /// accepting a repeat of an identical mapping from a concurrent broadcast.
  void install_mapping(spg_t pgid, core_id_t core) {
    ceph_assert_always(core != NULL_CORE);
    auto [iter, inserted] = pg_to_core.emplace(pgid, core);
    ceph_assert_always(inserted || iter->second == core);
  }

  /// Per-core PG counts; only core 0's copy is ever updated.
  std::map<core_id_t, unsigned> core_to_num_pgs;
  /// This core's copy of the pgid -> core mapping.
  std::map<spg_t, core_id_t> pg_to_core;
};
/**
 * PGMap
 *
 * Maps spg_t to the PG instance within a single shard, and tracks PGs whose
 * creation is still pending so that callers can wait on them (see
 * wait_for_pg).
 */
class PGMap {
/// Per-pgid record for a PG whose creation is awaited or in progress. Also
/// serves as the blocker reported while an operation waits on PG creation
/// (BlockerT with type_name "PGCreation").
struct PGCreationState : BlockerT<PGCreationState> {
static constexpr const char * type_name = "PGCreation";
void dump_detail(Formatter *f) const final;
spg_t pgid;
// Resolved with the PG once it is available; a shared_promise so that
// several waiters can each obtain a future for the same PG.
seastar::shared_promise<Ref<PG>> promise;
// NOTE(review): presumably set by set_creating() once creation has actually
// started -- confirm against the .cc.
bool creating = false;
PGCreationState(spg_t pgid);
// Neither copyable nor movable: instances are constructed in place as the
// mapped values of pgs_creating (std::map never relocates its elements).
PGCreationState(const PGCreationState &) = delete;
PGCreationState(PGCreationState &&) = delete;
PGCreationState &operator=(const PGCreationState &) = delete;
PGCreationState &operator=(PGCreationState &&) = delete;
~PGCreationState();
};
// Creation state for PGs that are awaited or being created, keyed by pgid.
std::map<spg_t, PGCreationState> pgs_creating;
using pgs_t = std::map<spg_t, Ref<PG>>;
// PG instances present on this shard, keyed by pgid.
pgs_t pgs;
public:
using PGCreationBlocker = PGCreationState;
using PGCreationBlockingEvent = PGCreationBlocker::BlockingEvent;
// A wait_for_pg future can fail only with ecanceled.
// NOTE(review): presumably raised when pg_creation_canceled() is called for
// the awaited pgid -- confirm against the .cc.
using wait_for_pg_ertr = crimson::errorator<
crimson::ct_error::ecanceled>;
using wait_for_pg_fut = wait_for_pg_ertr::future<Ref<PG>>;
/// (future for the PG, whether it is already being created)
using wait_for_pg_ret = std::pair<wait_for_pg_fut, bool>;
/**
 * Get future for pg with a bool indicating whether it's already being
 * created. The first (unnamed) argument is the blocking-event trigger
 * through which the wait on PGCreationBlocker is reported.
 */
wait_for_pg_ret wait_for_pg(PGCreationBlockingEvent::TriggerI&&, spg_t pgid);
/**
 * Get PG in non-blocking manner.
 * NOTE(review): presumably returns a null Ref when pgid is not in pgs --
 * confirm against the .cc.
 */
Ref<PG> get_pg(spg_t pgid);
/**
 * Mark pgid as being created.
 */
void set_creating(spg_t pgid);
/**
 * Set newly created pg for pgid.
 * NOTE(review): presumably also fulfills any pending waiters' promise --
 * confirm against the .cc.
 */
void pg_created(spg_t pgid, Ref<PG> pg);
/**
 * Add newly loaded pg for pgid.
 */
void pg_loaded(spg_t pgid, Ref<PG> pg);
/**
 * Cancel pending creation of pgid.
 */
void pg_creation_canceled(spg_t pgid);
/// Remove pgid from this shard's map.
void remove_pg(spg_t pgid);
/// Direct access to the pgid -> PG map (excludes pending creations).
pgs_t& get_pgs() { return pgs; }
const pgs_t& get_pgs() const { return pgs; }
/// Number of entries in pgs (pending creations are not counted).
auto get_pg_count() const { return pgs.size(); }
PGMap() = default;
~PGMap();
};
}
|