summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_bucket_sync.h
blob: d1d09bbfc0722bb38483d6c74d15e2fe70a67db4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#pragma once

#include "rgw_common.h"
#include "rgw_sync_policy.h"

class RGWSI_Zone;
class RGWSI_SyncModules;
class RGWSI_Bucket_Sync;

struct rgw_sync_group_pipe_map;
struct rgw_sync_bucket_pipes;
struct rgw_sync_policy_info;

struct rgw_sync_group_pipe_map {
  rgw_zone_id zone;
  std::optional<rgw_bucket> bucket;

  rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};

  using zb_pipe_map_t = std::multimap<rgw_sync_bucket_entity, rgw_sync_bucket_pipe>;

  zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
  zb_pipe_map_t dests; /* all the pipes that pull from zone */

  std::set<rgw_zone_id> *pall_zones{nullptr};
  rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it,
                                                      used in the case of bucket sync policy, not at the
                                                      zonegroup level */

  void dump(ceph::Formatter *f) const;

  template <typename CB1, typename CB2>
  void try_add_to_pipe_map(const rgw_zone_id& source_zone,
                           const rgw_zone_id& dest_zone,
                           const std::vector<rgw_sync_bucket_pipes>& pipes,
                           zb_pipe_map_t *pipe_map,
                           CB1 filter_cb,
                           CB2 call_filter_cb);
          
  template <typename CB>
  void try_add_source(const rgw_zone_id& source_zone,
                      const rgw_zone_id& dest_zone,
                      const std::vector<rgw_sync_bucket_pipes>& pipes,
                      CB filter_cb);
          
  template <typename CB>
  void try_add_dest(const rgw_zone_id& source_zone,
                  const rgw_zone_id& dest_zone,
                  const std::vector<rgw_sync_bucket_pipes>& pipes,
                  CB filter_cb);
          
  pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
                                                                                const rgw_zone_id& zone,
                                                                                std::optional<rgw_bucket> b) const;

  template <typename CB>
  void init(CephContext *cct,
            const rgw_zone_id& _zone,
            std::optional<rgw_bucket> _bucket,
            const rgw_sync_policy_group& group,
            rgw_sync_data_flow_group *_default_flow,
            std::set<rgw_zone_id> *_pall_zones,
            CB filter_cb);

  /*
   * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
   */
  vector<rgw_sync_bucket_pipe> find_source_pipes(const rgw_zone_id& source_zone,
                                                 std::optional<rgw_bucket> source_bucket,
                                                 std::optional<rgw_bucket> dest_bucket) const;

  /*
   * find all relevant pipes in other zones that pull from a specific
   * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
   */
  vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
                                               const rgw_zone_id& dest_zone,
                                               std::optional<rgw_bucket> dest_bucket) const;

  /*
   * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
   */
  vector<rgw_sync_bucket_pipe> find_pipes(const rgw_zone_id& source_zone,
                                          std::optional<rgw_bucket> source_bucket,
                                          const rgw_zone_id& dest_zone,
                                          std::optional<rgw_bucket> dest_bucket) const;
};

/*
 * RGWSyncPolicyCompat: holder for a single static compatibility helper.
 */
class RGWSyncPolicyCompat {
public:
  /*
   * NOTE(review): only declared here. Judging by the name, it fills *ppolicy
   * from the older sync configuration using the two services -- confirm
   * against the definition.
   */
  static void convert_old_sync_config(
      RGWSI_Zone *zone_svc,
      RGWSI_SyncModules *sync_modules_svc,
      rgw_sync_policy_info *ppolicy);
};

/*
 * RGWBucketSyncFlowManager: keeps the sync flow groups (by name) for a zone
 * and, optionally, a single bucket. A manager can reference a parent manager;
 * reflect() reports the resulting pipes through two pipe_set out-parameters.
 */
class RGWBucketSyncFlowManager {
  friend class RGWBucketSyncPolicyHandler;
public:
  /*
   * endpoints_pair: the {source, dest} entities of a pipe. Ordered by source
   * first, then by dest.
   */
  struct endpoints_pair {
    rgw_sync_bucket_entity source;
    rgw_sync_bucket_entity dest;

    endpoints_pair() {}
    /* intentionally left implicit: existing callers may rely on the conversion */
    endpoints_pair(const rgw_sync_bucket_pipe& pipe) : source(pipe.source),
                                                       dest(pipe.dest) {}

    bool operator<(const endpoints_pair& e) const {
      if (source < e.source) {
        return true;
      }
      /* dest only breaks the tie when neither source orders before the other */
      return !(e.source < source) && (dest < e.dest);
    }
  };

  /*
   * pipe_rules: deal with a set of pipes that have common endpoints_pair
   */
  class pipe_rules {
    std::list<rgw_sync_bucket_pipe> pipes;

  public:
    using prefix_map_t = std::multimap<std::string, rgw_sync_bucket_pipe *>;

    /* NOTE(review): the raw pointers in both maps presumably refer to entries
     * of `pipes` -- confirm in insert() */
    std::map<std::string, rgw_sync_bucket_pipe *> tag_refs;
    prefix_map_t prefix_refs;

    void insert(const rgw_sync_bucket_pipe& pipe);

    bool find_basic_info_without_tags(const rgw_obj_key& key,
                                      std::optional<rgw_user> *user,
                                      std::optional<rgw_user> *acl_translation,
                                      std::optional<std::string> *storage_class,
                                      rgw_sync_pipe_params::Mode *mode,
                                      bool *need_more_info) const;
    bool find_obj_params(const rgw_obj_key& key,
                         const RGWObjTags::tag_map_t& tags,
                         rgw_sync_pipe_params *params) const;

    void scan_prefixes(std::vector<std::string> *prefixes) const;

    /* iteration over prefix_refs */
    prefix_map_t::const_iterator prefix_begin() const {
      return prefix_refs.begin();
    }
    prefix_map_t::const_iterator prefix_search(const std::string& s) const;
    prefix_map_t::const_iterator prefix_end() const {
      return prefix_refs.end();
    }
  };

  using pipe_rules_ref = std::shared_ptr<pipe_rules>;

  /*
   * pipe_handler: extends endpoints_pair to point at the corresponding rules handler
   */
  struct pipe_handler : public endpoints_pair {
    pipe_rules_ref rules;

    pipe_handler() {}
    pipe_handler(pipe_rules_ref& _rules,
                 const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
                                                      rules(_rules) {}

    /* true only when both endpoints report specific() */
    bool specific() const {
      return source.specific() && dest.specific();
    }

    /* forwards to rules; returns false when no rules object is attached */
    bool find_basic_info_without_tags(const rgw_obj_key& key,
                                      std::optional<rgw_user> *user,
                                      std::optional<rgw_user> *acl_translation,
                                      std::optional<std::string> *storage_class,
                                      rgw_sync_pipe_params::Mode *mode,
                                      bool *need_more_info) const {
      if (!rules) {
        return false;
      }
      return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info);
    }

    /* forwards to rules; returns false when no rules object is attached */
    bool find_obj_params(const rgw_obj_key& key,
                         const RGWObjTags::tag_map_t& tags,
                         rgw_sync_pipe_params *params) const {
      if (!rules) {
        return false;
      }
      return rules->find_obj_params(key, tags, params);
    }
  };

  /*
   * pipe_set: a collection of pipes; begin()/end() iterate over `handlers`
   */
  struct pipe_set {
    std::map<endpoints_pair, pipe_rules_ref> rules;
    std::multimap<std::string, rgw_sync_bucket_pipe> pipe_map;

    std::set<pipe_handler> handlers;

    using iterator = std::set<pipe_handler>::iterator;

    /* empties all three containers */
    void clear() {
      rules.clear();
      pipe_map.clear();
      handlers.clear();
    }

    void insert(const rgw_sync_bucket_pipe& pipe);

    iterator begin() const {
      return handlers.begin();
    }

    iterator end() const {
      return handlers.end();
    }

    void dump(ceph::Formatter *f) const;
  };

private:

  CephContext *cct;

  rgw_zone_id zone_id;
  std::optional<rgw_bucket> bucket;

  const RGWBucketSyncFlowManager *parent{nullptr};

  std::map<std::string, rgw_sync_group_pipe_map> flow_groups;

  std::set<rgw_zone_id> all_zones;

  bool allowed_data_flow(const rgw_zone_id& source_zone,
                         std::optional<rgw_bucket> source_bucket,
                         const rgw_zone_id& dest_zone,
                         std::optional<rgw_bucket> dest_bucket,
                         bool check_activated) const;

  /*
   * find all the matching flows in a flow map for a specific bucket
   */
  void update_flow_maps(const rgw_sync_bucket_pipes& pipe);

  void init(const rgw_sync_policy_info& sync_policy);

public:

  RGWBucketSyncFlowManager(CephContext *_cct,
                           const rgw_zone_id& _zone_id,
                           std::optional<rgw_bucket> _bucket,
                           const RGWBucketSyncFlowManager *_parent);

  /*
   * reports pipes through the flow_by_source / flow_by_dest out-parameters;
   * defined outside this header
   */
  void reflect(std::optional<rgw_bucket> effective_bucket,
               pipe_set *flow_by_source,
               pipe_set *flow_by_dest,
               bool only_enabled) const;

};

static inline ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
  os << e.dest << " -> " << e.source;
  return os;
}

/*
 * RGWBucketSyncPolicyHandler: holds the sync policy and the derived pipe sets
 * (overall and per zone) for a zone or, for handlers created through
 * alloc_child(), for a bucket. The accessors below expose the stored state.
 */
class RGWBucketSyncPolicyHandler {
  bool legacy_config{false};
  const RGWBucketSyncPolicyHandler *parent{nullptr};
  /* service pointers default to null, like `parent`; the constructors are
   * defined outside this header */
  RGWSI_Zone *zone_svc{nullptr};
  RGWSI_Bucket_Sync *bucket_sync_svc{nullptr};
  rgw_zone_id zone_id;
  std::optional<RGWBucketInfo> bucket_info;
  std::optional<std::map<std::string, bufferlist> > bucket_attrs;
  std::optional<rgw_bucket> bucket;
  std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
  rgw_sync_policy_info sync_policy;

  RGWBucketSyncFlowManager::pipe_set source_pipes;
  RGWBucketSyncFlowManager::pipe_set target_pipes;

  std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
  std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */

  std::set<rgw_zone_id> source_zones;
  std::set<rgw_zone_id> target_zones;

  std::set<rgw_bucket> source_hints;
  std::set<rgw_bucket> target_hints;
  std::set<rgw_sync_bucket_pipe> resolved_sources;
  std::set<rgw_sync_bucket_pipe> resolved_dests;


  /* true when there is at least one target pipe set or resolved destination */
  bool bucket_is_sync_source() const {
    return !targets.empty() || !resolved_dests.empty();
  }

  /* true when there is at least one source pipe set or resolved source */
  bool bucket_is_sync_target() const {
    return !sources.empty() || !resolved_sources.empty();
  }

  /* private: child handlers are obtained through alloc_child() */
  RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
                             const RGWBucketInfo& _bucket_info,
                             std::map<std::string, bufferlist>&& _bucket_attrs);

  RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent,
                             const rgw_bucket& _bucket,
                             std::optional<rgw_sync_policy_info> _sync_policy);
public:
  RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
                             RGWSI_SyncModules *sync_modules_svc,
                             RGWSI_Bucket_Sync *bucket_sync_svc,
                             std::optional<rgw_zone_id> effective_zone = std::nullopt);

  /* NOTE(review): both overloads return a raw pointer; presumably the caller
   * takes ownership of the new handler -- confirm against the definition */
  RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info,
                                          std::map<std::string, bufferlist>&& bucket_attrs) const;
  RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
                                          std::optional<rgw_sync_policy_info> sync_policy) const;

  int init(const DoutPrefixProvider *dpp, optional_yield y);

  /* reports results through the p* out-parameters; defined outside this header */
  void reflect(RGWBucketSyncFlowManager::pipe_set *psource_pipes,
               RGWBucketSyncFlowManager::pipe_set *ptarget_pipes,
               std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources,
               std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets,
               std::set<rgw_zone_id> *psource_zones,
               std::set<rgw_zone_id> *ptarget_zones,
               bool only_enabled) const;

  /* replaces both resolved-hint sets, moving from the arguments */
  void set_resolved_hints(std::set<rgw_sync_bucket_pipe>&& _resolved_sources,
                          std::set<rgw_sync_bucket_pipe>&& _resolved_dests) {
    resolved_sources = std::move(_resolved_sources);
    resolved_dests = std::move(_resolved_dests);
  }

  /*
   * Read-only accessors. All of them are const so that they can be used
   * through a const handler.
   */
  const std::set<rgw_sync_bucket_pipe>& get_resolved_source_hints() const {
    return resolved_sources;
  }

  const std::set<rgw_sync_bucket_pipe>& get_resolved_dest_hints() const {
    return resolved_dests;
  }

  const std::set<rgw_zone_id>& get_source_zones() const {
    return source_zones;
  }

  const std::set<rgw_zone_id>& get_target_zones() const {
    return target_zones;
  }

  const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_sources() const {
    return sources;
  }

  std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_sources() const;
  std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests() const;
  std::multimap<rgw_zone_id, rgw_sync_bucket_pipe> get_all_dests_in_zone(const rgw_zone_id& zone_id) const;

  const std::map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& get_targets() const {
    return targets;
  }

  const std::optional<RGWBucketInfo>& get_bucket_info() const {
    return bucket_info;
  }

  const std::optional<std::map<std::string, bufferlist> >& get_bucket_attrs() const {
    return bucket_attrs;
  }

  /* hands out pointers to the handler's own (mutable) pipe sets, hence non-const */
  void get_pipes(RGWBucketSyncFlowManager::pipe_set **_sources, RGWBucketSyncFlowManager::pipe_set **_targets) { /* return raw pipes (with zone name) */
    *_sources = &source_pipes;
    *_targets = &target_pipes;
  }
  void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
                 std::optional<rgw_sync_bucket_entity> filter_peer);

  const std::set<rgw_bucket>& get_source_hints() const {
    return source_hints;
  }

  const std::set<rgw_bucket>& get_target_hints() const {
    return target_hints;
  }

  bool bucket_exports_data() const;
  bool bucket_imports_data() const;

  const rgw_sync_policy_info& get_sync_policy() const {
    return sync_policy;
  }

  bool is_legacy_config() const {
    return legacy_config;
  }
};