summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_trim_mdlog.cc
blob: f940f5c6e18d32546b207d3a65baeb2faf386606 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "common/errno.h"

#include "rgw_trim_mdlog.h"
#include "rgw_sync.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_zone.h"
#include "services/svc_zone.h"
#include "services/svc_meta.h"
#include "services/svc_mdlog.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta trim: ")

/// Collect coroutine that removes every shard object of one mdlog. The
/// max_concurrent constant is handed to the RGWShardCollectCR base.
class PurgeLogShardsCR : public RGWShardCollectCR {
  static constexpr int max_concurrent = 16;

  rgw::sal::RGWRadosStore *const store;
  const RGWMetadataLog* mdlog;
  const int num_shards;
  rgw_raw_obj obj; //< pool is fixed at construction, oid is rewritten per shard
  int next_shard = 0; //< index of the next shard to remove

 public:
  PurgeLogShardsCR(rgw::sal::RGWRadosStore *store, const RGWMetadataLog* mdlog,
                   const rgw_pool& pool, int num_shards)
    : RGWShardCollectCR(store->ctx(), max_concurrent),
      store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
  {}

  /// Spawn a removal for the next shard object. Returns false once
  /// next_shard has reached num_shards, true after spawning.
  bool spawn_next() override {
    if (next_shard != num_shards) {
      const int shard = next_shard++;
      // fill in the oid for this shard before handing obj to the remove cr
      mdlog->get_shard_oid(shard, obj.oid);
      spawn(new RGWRadosRemoveCR(store, obj), false);
      return true;
    }
    return false;
  }
};

using Cursor = RGWPeriodHistory::Cursor;

/// Coroutine that purges mdlogs starting from the oldest log period and
/// stopping before the given realm_epoch (the period at realm_epoch is kept).
class PurgePeriodLogsCR : public RGWCoroutine {
  /// service handles looked up from the store at construction
  struct Svc {
    RGWSI_Zone *zone;
    RGWSI_MDLog *mdlog;
  } svc;
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *const store;
  RGWMetadataManager *const metadata;
  RGWObjVersionTracker objv; //< passed to the mdlog history read and trim calls
  Cursor cursor;             //< period currently being purged
  epoch_t realm_epoch;       //< purge stops when the cursor reaches this epoch
  epoch_t *last_trim_epoch;  //< update last trim on success

 public:
  PurgePeriodLogsCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, epoch_t realm_epoch, epoch_t *last_trim)
    : RGWCoroutine(store->ctx()),
      svc{store->svc()->zone, store->svc()->mdlog},
      dpp(dpp),
      store(store),
      metadata(store->ctl()->meta.mgr),
      realm_epoch(realm_epoch),
      last_trim_epoch(last_trim)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

/// Coroutine body. Reads the oldest log period into `cursor`, then for each
/// period whose epoch is below `realm_epoch`: removes that period's log
/// shards, records the trim in the mdlog history, bumps *last_trim_epoch and
/// advances the cursor.
///
/// Finishes with set_cr_done() when the loop completes or when
/// trim_log_period_cr reports -ENOENT; any other negative retcode finishes
/// the coroutine with set_cr_error(retcode).
int PurgePeriodLogsCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read our current oldest log period
    yield call(svc.mdlog->read_oldest_log_period_cr(dpp, &cursor, &objv));
    if (retcode < 0) {
      return set_cr_error(retcode);
    }
    // a successful read is expected to leave a valid cursor
    ceph_assert(cursor);
    ldpp_dout(dpp, 20) << "oldest log realm_epoch=" << cursor.get_epoch()
        << " period=" << cursor.get_period().get_id() << dendl;

    // trim -up to- the given realm_epoch
    while (cursor.get_epoch() < realm_epoch) {
      ldpp_dout(dpp, 4) << "purging log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;
      yield {
        // look up this period's mdlog and remove all of its shard objects
        // from the zone's log pool; the shard count comes from the
        // rgw_md_log_max_shards config value
        const auto mdlog = svc.mdlog->get_log(cursor.get_period().get_id());
        const auto& pool = svc.zone->get_zone_params().log_pool;
        auto num_shards = cct->_conf->rgw_md_log_max_shards;
        call(new PurgeLogShardsCR(store, mdlog, pool, num_shards));
      }
      if (retcode < 0) {
        ldpp_dout(dpp, 1) << "failed to remove log shards: "
            << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      ldpp_dout(dpp, 10) << "removed log shards for realm_epoch=" << cursor.get_epoch()
          << " period=" << cursor.get_period().get_id() << dendl;

      // update our mdlog history
      yield call(svc.mdlog->trim_log_period_cr(dpp, cursor, &objv));
      if (retcode == -ENOENT) {
        // must have raced to update mdlog history. return success and allow the
        // winner to continue purging
        ldpp_dout(dpp, 10) << "already removed log shards for realm_epoch=" << cursor.get_epoch()
            << " period=" << cursor.get_period().get_id() << dendl;
        return set_cr_done();
      } else if (retcode < 0) {
        ldpp_dout(dpp, 1) << "failed to remove log shards for realm_epoch="
            << cursor.get_epoch() << " period=" << cursor.get_period().get_id()
            << " with: " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      // only ever move the caller's last-trim epoch forward
      if (*last_trim_epoch < cursor.get_epoch()) {
        *last_trim_epoch = cursor.get_epoch();
      }

      ceph_assert(cursor.has_next()); // get_current() should always come after
      cursor.next();
    }
    return set_cr_done();
  }
  return 0;
}

namespace {

using connection_map = std::map<std::string, std::unique_ptr<RGWRESTConn>>;

/// Build a connection_map holding one RGWRESTConn per zone found in the
/// given zonegroups, keyed by the zone's id string.
template <typename Zonegroups>
connection_map make_peer_connections(rgw::sal::RGWRadosStore *store,
                                     const Zonegroups& zonegroups)
{
  connection_map result;
  for (const auto& group : zonegroups) {
    const auto& zones = group.second.zones;
    for (const auto& zone : zones) {
      const auto& id = zone.first.id;
      std::unique_ptr<RGWRESTConn> peer(
        new RGWRESTConn(store->ctx(), store->svc()->zone, id, zone.second.endpoints));
      result.emplace(id, std::move(peer));
    }
  }
  return result;
}

/// Return the marker that is safe to trim up to: next_step_marker while the
/// shard's state is FullSync, otherwise marker.
const std::string& get_stable_marker(const rgw_meta_sync_marker& m)
{
  if (m.state == m.FullSync) {
    return m.next_step_marker;
  }
  return m.marker;
}

/// Ordering used by take_min_status(): markers compare by their stable
/// marker strings.
bool operator<(const rgw_meta_sync_marker& lhs, const rgw_meta_sync_marker& rhs)
{
  const std::string& left = get_stable_marker(lhs);
  const std::string& right = get_stable_marker(rhs);
  return left.compare(right) < 0;
}

/// Fold the peer statuses in [first, last) into *status: the result carries
/// the lowest realm_epoch seen and, per shard, the lowest stable marker among
/// the peers at that epoch. Elements of the input range may be moved from.
///
/// Returns 0 on success, or -EINVAL when the range is empty or a peer's
/// shard count differs from the rgw_md_log_max_shards config value.
template <typename Iter>
int take_min_status(CephContext *cct, Iter first, Iter last,
                    rgw_meta_sync_status *status)
{
  if (first == last) {
    return -EINVAL;
  }
  const size_t expected_shards = cct->_conf->rgw_md_log_max_shards;

  // start from the largest epoch so the first valid peer always replaces it
  status->sync_info.realm_epoch = std::numeric_limits<epoch_t>::max();
  for (Iter peer = first; peer != last; ++peer) {
    // validate peer's shard count
    const auto peer_shards = peer->sync_markers.size();
    if (peer_shards != expected_shards) {
      ldout(cct, 1) << "take_min_status got peer status with "
          << peer_shards << " shards, expected "
          << expected_shards << dendl;
      return -EINVAL;
    }

    const auto peer_epoch = peer->sync_info.realm_epoch;
    const auto min_epoch = status->sync_info.realm_epoch;
    if (peer_epoch < min_epoch) {
      // earlier epoch, take its entire status
      *status = std::move(*peer);
      continue;
    }
    if (peer_epoch != min_epoch) {
      continue; // later epoch, nothing to take
    }

    // same epoch, take any earlier markers; both maps are walked in step
    auto dst = status->sync_markers.begin();
    for (auto src = peer->sync_markers.begin();
         src != peer->sync_markers.end(); ++src, ++dst) {
      if (src->second < dst->second) {
        dst->second = std::move(src->second);
      }
    }
  }
  return 0;
}

/// State common to the master and peer trim coroutines.
struct TrimEnv {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *const store;
  RGWHTTPManager *const http;
  int num_shards;
  const rgw_zone_id& zone; //< bound to the zone service's zone_id()
  Cursor current; //< cursor to current period
  epoch_t last_trim_epoch = 0; //< epoch of last mdlog that was purged

  TrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : dpp{dpp},
      store{store},
      http{http},
      num_shards{num_shards},
      zone(store->svc()->zone->zone_id()),
      current(store->svc()->mdlog->get_period_history()->get_current())
  {}
};

struct MasterTrimEnv : public TrimEnv {
  connection_map connections; //< peer connections
  std::vector<rgw_meta_sync_status> peer_status; //< sync status for each peer
  /// last trim marker for each shard, only applies to current period's mdlog
  std::vector<std::string> last_trim_markers;

  MasterTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_markers(num_shards)
  {
    auto& period = current.get_period();
    connections = make_peer_connections(store, period.get_map().zonegroups);
    connections.erase(zone.id);
    peer_status.resize(connections.size());
  }
};

struct PeerTrimEnv : public TrimEnv {
  /// last trim timestamp for each shard, only applies to current period's mdlog
  std::vector<ceph::real_time> last_trim_timestamps;

  PeerTrimEnv(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : TrimEnv(dpp, store, http, num_shards),
      last_trim_timestamps(num_shards)
  {}

  void set_num_shards(int num_shards) {
    this->num_shards = num_shards;
    last_trim_timestamps.resize(num_shards);
  }
};

} // anonymous namespace


/// spawn a trim cr for each shard that needs it, while limiting the number
/// of concurrent shards
class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
 private:
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  RGWMetadataLog *mdlog;
  int shard_id{0};
  std::string oid;
  const rgw_meta_sync_status& sync_status;

 public:
  MetaMasterTrimShardCollectCR(MasterTrimEnv& env, RGWMetadataLog *mdlog,
                               const rgw_meta_sync_status& sync_status)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), sync_status(sync_status)
  {}

  bool spawn_next() override;
};

/// Advance to the next shard that needs trimming and spawn a trim cr for it.
/// Shards without an entry in sync_status.sync_markers, or whose stable
/// marker is not past the recorded last trim marker, are skipped. Returns
/// true after spawning, false once shard_id has reached env.num_shards.
bool MetaMasterTrimShardCollectCR::spawn_next()
{
  const auto& markers = sync_status.sync_markers;
  for (; shard_id < env.num_shards; ++shard_id) {
    const auto found = markers.find(shard_id);
    if (found == markers.end()) {
      continue; // no sync marker for this shard
    }
    auto& stable = get_stable_marker(found->second);
    auto& last_trim = env.last_trim_markers[shard_id];

    if (stable <= last_trim) {
      // already trimmed
      ldpp_dout(env.dpp, 20) << "skipping log shard " << shard_id
          << " at marker=" << stable
          << " last_trim=" << last_trim
          << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
      continue;
    }

    mdlog->get_shard_oid(shard_id, oid);

    ldpp_dout(env.dpp, 10) << "trimming log shard " << shard_id
        << " at marker=" << stable
        << " last_trim=" << last_trim
        << " realm_epoch=" << sync_status.sync_info.realm_epoch << dendl;
    spawn(new RGWSyncLogTrimCR(env.dpp, env.store, oid, stable, &last_trim), false);
    // step past this shard ourselves; the loop increment is skipped on return
    ++shard_id;
    return true;
  }
  return false;
}

/// spawn rest requests to read each peer's sync status
class MetaMasterStatusCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  MasterTrimEnv& env;
  connection_map::iterator c;
  std::vector<rgw_meta_sync_status>::iterator s;
 public:
  explicit MetaMasterStatusCollectCR(MasterTrimEnv& env)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), c(env.connections.begin()), s(env.peer_status.begin())
  {}

  bool spawn_next() override {
    if (c == env.connections.end()) {
      return false;
    }
    static rgw_http_param_pair params[] = {
      { "type", "metadata" },
      { "status", nullptr },
      { nullptr, nullptr }
    };

    ldout(cct, 20) << "query sync status from " << c->first << dendl;
    auto conn = c->second.get();
    using StatusCR = RGWReadRESTResourceCR<rgw_meta_sync_status>;
    spawn(new StatusCR(cct, conn, env.http, "/admin/log/", params, &*s),
          false);
    ++c;
    ++s;
    return true;
  }
};

class MetaMasterTrimCR : public RGWCoroutine {
  MasterTrimEnv& env;
  rgw_meta_sync_status min_status; //< minimum sync status of all peers
  int ret{0};

 public:
  explicit MetaMasterTrimCR(MasterTrimEnv& env)
    : RGWCoroutine(env.store->ctx()), env(env)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

/// Coroutine body for one trim pass on the metadata master:
///  1. finish immediately if there are no peer connections
///  2. read every peer's metadata sync status
///  3. reduce those to a minimum status with take_min_status()
///  4. spawn a purge of older mdlog periods and, when the minimum epoch is
///     the current one, a per-shard trim of the current mdlog
///
/// Finishes with set_cr_error() when the peer status fetch or the minimum
/// calculation fails; errors from the spawned purge/trim are not propagated.
int MetaMasterTrimCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // TODO: detect this and fail before we spawn the trim thread?
    if (env.connections.empty()) {
      ldpp_dout(dpp, 4) << "no peers, exiting" << dendl;
      return set_cr_done();
    }

    ldpp_dout(dpp, 10) << "fetching sync status for zone " << env.zone << dendl;
    // query mdlog sync status from peers
    yield call(new MetaMasterStatusCollectCR(env));

    // must get a successful reply from all peers to consider trimming. the
    // result of call() arrives in 'retcode'; 'ret' has not been assigned yet
    // at this point, so testing it here would never detect a failure
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(retcode);
    }

    // determine the minimum epoch and markers
    ret = take_min_status(env.store->ctx(), env.peer_status.begin(),
                          env.peer_status.end(), &min_status);
    if (ret < 0) {
      ldpp_dout(dpp, 4) << "failed to calculate min sync status from peers" << dendl;
      return set_cr_error(ret);
    }
    yield {
      auto store = env.store;
      auto epoch = min_status.sync_info.realm_epoch;
      ldpp_dout(dpp, 4) << "realm epoch min=" << epoch
          << " current=" << env.current.get_epoch()<< dendl;
      if (epoch > env.last_trim_epoch + 1) {
        // delete any prior mdlog periods
        spawn(new PurgePeriodLogsCR(dpp, store, epoch, &env.last_trim_epoch), true);
      } else {
        ldpp_dout(dpp, 10) << "mdlogs already purged up to realm_epoch "
            << env.last_trim_epoch << dendl;
      }

      // if realm_epoch == current, trim mdlog based on markers
      if (epoch == env.current.get_epoch()) {
        auto mdlog = store->svc()->mdlog->get_log(env.current.get_period().get_id());
        spawn(new MetaMasterTrimShardCollectCR(env, mdlog, min_status), true);
      }
    }
    // ignore any errors during purge/trim because we want to hold the lock open
    return set_cr_done();
  }
  return 0;
}


/// read the first entry of the master's mdlog shard and trim to that position
class MetaPeerTrimShardCR : public RGWCoroutine {
  RGWMetaSyncEnv& env;
  RGWMetadataLog *mdlog;
  const std::string& period_id;
  const int shard_id;
  RGWMetadataLogInfo info;
  ceph::real_time stable; //< safe timestamp to trim, according to master
  ceph::real_time *last_trim; //< last trimmed timestamp, updated on trim
  rgw_mdlog_shard_data result; //< result from master's mdlog listing

 public:
  MetaPeerTrimShardCR(RGWMetaSyncEnv& env, RGWMetadataLog *mdlog,
                      const std::string& period_id, int shard_id,
                      ceph::real_time *last_trim)
    : RGWCoroutine(env.store->ctx()), env(env), mdlog(mdlog),
      period_id(period_id), shard_id(shard_id), last_trim(last_trim)
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

/// Coroutine body. Chooses a `stable` timestamp from the master's mdlog
/// shard and trims the local shard up to it:
///  - if the master's listing has an entry, stable is that entry's timestamp
///    minus one second
///  - if the listing is empty, the shard info's last_update is read and the
///    listing is repeated; last_update is used only if the second listing is
///    still empty
/// Nothing is trimmed when last_update is zero or when stable is not later
/// than *last_trim. On a trim result of success or -ENODATA, *last_trim is
/// set to stable.
int MetaPeerTrimShardCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // query master's first mdlog entry for this shard
    yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                 "", 1, &result));
    if (retcode < 0) {
      ldpp_dout(dpp, 5) << "failed to read first entry from master's mdlog shard "
          << shard_id << " for period " << period_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    if (result.entries.empty()) {
      // if there are no mdlog entries, we don't have a timestamp to compare. we
      // can't just trim everything, because there could be racing updates since
      // this empty reply. query the mdlog shard info to read its max timestamp,
      // then retry the listing to make sure it's still empty before trimming to
      // that
      ldpp_dout(dpp, 10) << "empty master mdlog shard " << shard_id
          << ", reading last timestamp from shard info" << dendl;
      // read the mdlog shard info for the last timestamp
      yield call(create_read_remote_mdlog_shard_info_cr(&env, period_id, shard_id, &info));
      if (retcode < 0) {
        ldpp_dout(dpp, 5) << "failed to read info from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      if (ceph::real_clock::is_zero(info.last_update)) {
        return set_cr_done(); // nothing to trim
      }
      ldpp_dout(dpp, 10) << "got mdlog shard info with last update="
          << info.last_update << dendl;
      // re-read the master's first mdlog entry to make sure it hasn't changed
      yield call(create_list_remote_mdlog_shard_cr(&env, period_id, shard_id,
                                                   "", 1, &result));
      if (retcode < 0) {
        ldpp_dout(dpp, 5) << "failed to read first entry from master's mdlog shard "
            << shard_id << " for period " << period_id
            << ": " << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }
      // if the mdlog is still empty, trim to max marker
      if (result.entries.empty()) {
        stable = info.last_update;
      } else {
        stable = result.entries.front().timestamp;

        // can only trim -up to- master's first timestamp, so subtract a second.
        // (this is why we use timestamps instead of markers for the peers)
        stable -= std::chrono::seconds(1);
      }
    } else {
      // the first listing had an entry: same rule, stop one second short of it
      stable = result.entries.front().timestamp;
      stable -= std::chrono::seconds(1);
    }

    // nothing to do unless the stable timestamp moved past the last trim
    if (stable <= *last_trim) {
      ldpp_dout(dpp, 10) << "skipping log shard " << shard_id
          << " at timestamp=" << stable
          << " last_trim=" << *last_trim << dendl;
      return set_cr_done();
    }

    ldpp_dout(dpp, 10) << "trimming log shard " << shard_id
        << " at timestamp=" << stable
        << " last_trim=" << *last_trim << dendl;
    yield {
      // trim the local shard object by time range: from a default-constructed
      // real_time up to stable, with empty markers
      std::string oid;
      mdlog->get_shard_oid(shard_id, oid);
      call(new RGWRadosTimelogTrimCR(dpp, env.store, oid, real_time{}, stable, "", ""));
    }
    // -ENODATA is not treated as a failure
    if (retcode < 0 && retcode != -ENODATA) {
      ldpp_dout(dpp, 1) << "failed to trim mdlog shard " << shard_id
          << ": " << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    *last_trim = stable;
    return set_cr_done();
  }
  return 0;
}

class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  PeerTrimEnv& env;
  RGWMetadataLog *mdlog;
  const std::string& period_id;
  RGWMetaSyncEnv meta_env; //< for RGWListRemoteMDLogShardCR
  int shard_id{0};

 public:
  MetaPeerTrimShardCollectCR(PeerTrimEnv& env, RGWMetadataLog *mdlog)
    : RGWShardCollectCR(env.store->ctx(), MAX_CONCURRENT_SHARDS),
      env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
  {
    meta_env.init(env.dpp, cct, env.store, env.store->svc()->zone->get_master_conn(),
                  env.store->svc()->rados->get_async_processor(), env.http, nullptr,
                  env.store->getRados()->get_sync_tracer());
  }

  bool spawn_next() override;
};

bool MetaPeerTrimShardCollectCR::spawn_next()
{
  if (shard_id >= env.num_shards) {
    return false;
  }
  auto& last_trim = env.last_trim_timestamps[shard_id];
  spawn(new MetaPeerTrimShardCR(meta_env, mdlog, period_id, shard_id, &last_trim),
        false);
  shard_id++;
  return true;
}

class MetaPeerTrimCR : public RGWCoroutine {
  PeerTrimEnv& env;
  rgw_mdlog_info mdlog_info; //< master's mdlog info

 public:
  explicit MetaPeerTrimCR(PeerTrimEnv& env) : RGWCoroutine(env.store->ctx()), env(env) {}

  int operate(const DoutPrefixProvider *dpp) override;
};

/// Coroutine body for one trim pass on a peer zone:
///  1. read the master's mdlog info over REST
///  2. adopt the master's shard count
///  3. purge mdlog periods older than the master's realm_epoch, unless
///     last_trim_epoch shows that was already done
///  4. if the master's realm_epoch is the current period's epoch, trim the
///     current mdlog shard by shard
/// Only a failure to read the mdlog info ends the coroutine with an error;
/// the results of the purge and trim calls are not checked.
int MetaPeerTrimCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    ldpp_dout(dpp, 10) << "fetching master mdlog info" << dendl;
    yield {
      // query mdlog_info from master for oldest_log_period
      rgw_http_param_pair params[] = {
        { "type", "metadata" },
        { nullptr, nullptr }
      };

      using LogInfoCR = RGWReadRESTResourceCR<rgw_mdlog_info>;
      call(new LogInfoCR(cct, env.store->svc()->zone->get_master_conn(), env.http,
                         "/admin/log/", params, &mdlog_info));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read mdlog info from master" << dendl;
      return set_cr_error(retcode);
    }
    // use master's shard count instead
    env.set_num_shards(mdlog_info.num_shards);

    // purge only when the master's epoch is more than one past the last
    // epoch we purged
    if (mdlog_info.realm_epoch > env.last_trim_epoch + 1) {
      // delete any prior mdlog periods
      yield call(new PurgePeriodLogsCR(dpp, env.store, mdlog_info.realm_epoch,
                                       &env.last_trim_epoch));
    } else {
      ldpp_dout(dpp, 10) << "mdlogs already purged through realm_epoch "
          << env.last_trim_epoch << dendl;
    }

    // if realm_epoch == current, trim mdlog based on master's markers
    if (mdlog_info.realm_epoch == env.current.get_epoch()) {
      yield {
        auto mdlog = env.store->svc()->mdlog->get_log(env.current.get_period().get_id());
        call(new MetaPeerTrimShardCollectCR(env, mdlog));
        // ignore any errors during purge/trim because we want to hold the lock open
      }
    }
    return set_cr_done();
  }
  return 0;
}

class MetaTrimPollCR : public RGWCoroutine {
  rgw::sal::RGWRadosStore *const store;
  const utime_t interval; //< polling interval
  const rgw_raw_obj obj;
  const std::string name{"meta_trim"}; //< lock name
  const std::string cookie;

 protected:
  /// allocate the coroutine to run within the lease
  virtual RGWCoroutine* alloc_cr() = 0;

 public:
  MetaTrimPollCR(rgw::sal::RGWRadosStore *store, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), interval(interval),
      obj(store->svc()->zone->get_zone_params().log_pool, RGWMetadataLogHistory::oid),
      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
  {}

  int operate(const DoutPrefixProvider *dpp) override;
};

/// Coroutine body: an endless loop that waits for `interval`, tries to take
/// the "meta_trim" lock on `obj` for interval.sec() seconds, and on success
/// runs the coroutine returned by alloc_cr(). The lock is released
/// explicitly only when that coroutine fails; after a successful run no
/// unlock is issued here.
int MetaTrimPollCR::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      // NOTE(review): wait() is called without a yield; presumably the sleep
      // takes effect at the next yield below — confirm against RGWCoroutine
      wait(interval);

      // prevent others from trimming for our entire wait interval
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
                                          obj, name, cookie, interval.sec()));
      if (retcode < 0) {
        // could not take the lock; go back to sleeping and retry next round
        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
        continue;
      }

      set_status("trimming");
      yield call(alloc_cr());

      if (retcode < 0) {
        // on errors, unlock so other gateways can try
        set_status("unlocking");
        yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
                                              obj, name, cookie));
      }
    }
  }
  return 0;
}

class MetaMasterTrimPollCR : public MetaTrimPollCR  {
  MasterTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaMasterTrimCR(env);
  }
 public:
  MetaMasterTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                       int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(dpp, store, http, num_shards)
  {}
};

class MetaPeerTrimPollCR : public MetaTrimPollCR {
  PeerTrimEnv env; //< trim state to share between calls
  RGWCoroutine* alloc_cr() override {
    return new MetaPeerTrimCR(env);
  }
 public:
  MetaPeerTrimPollCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                     int num_shards, utime_t interval)
    : MetaTrimPollCR(store, interval),
      env(dpp, store, http, num_shards)
  {}
};

/// Allocate the polling trim coroutine for this zone: the master variant
/// when the zone service reports is_meta_master(), the peer variant
/// otherwise. The coroutine is returned as a raw pointer from new.
RGWCoroutine* create_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  const bool is_master = store->svc()->zone->is_meta_master();
  if (!is_master) {
    return new MetaPeerTrimPollCR(dpp, store, http, num_shards, interval);
  }
  return new MetaMasterTrimPollCR(dpp, store, http, num_shards, interval);
}


struct MetaMasterAdminTrimCR : private MasterTrimEnv, public MetaMasterTrimCR {
  MetaMasterAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : MasterTrimEnv(dpp, store, http, num_shards),
      MetaMasterTrimCR(*static_cast<MasterTrimEnv*>(this))
  {}
};

struct MetaPeerAdminTrimCR : private PeerTrimEnv, public MetaPeerTrimCR {
  MetaPeerAdminTrimCR(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWHTTPManager *http, int num_shards)
    : PeerTrimEnv(dpp, store, http, num_shards),
      MetaPeerTrimCR(*static_cast<PeerTrimEnv*>(this))
  {}
};

/// Allocate a single-pass trim coroutine for this zone: the master variant
/// when the zone service reports is_meta_master(), the peer variant
/// otherwise. The coroutine is returned as a raw pointer from new.
RGWCoroutine* create_admin_meta_log_trim_cr(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store,
                                            RGWHTTPManager *http,
                                            int num_shards)
{
  const bool is_master = store->svc()->zone->is_meta_master();
  if (!is_master) {
    return new MetaPeerAdminTrimCR(dpp, store, http, num_shards);
  }
  return new MetaMasterAdminTrimCR(dpp, store, http, num_shards);
}