author    | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000
commit    | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree      | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rgw/rgw_sync.cc
parent    | Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_sync.cc')
-rw-r--r-- | src/rgw/rgw_sync.cc | 2484
1 file changed, 2484 insertions(+), 0 deletions(-)
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
new file mode 100644
index 000000000..8eacd150f
--- /dev/null
+++ b/src/rgw/rgw_sync.cc
@@ -0,0 +1,2484 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include <boost/optional.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/admin_socket.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_metadata.h"
#include "rgw_mdlog_types.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_sync_trace.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_cls.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "meta sync: ")

static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";

RGWSyncErrorLogger::RGWSyncErrorLogger(rgw::sal::RGWRadosStore *_store, const string &oid_prefix, int _num_shards) : store(_store), num_shards(_num_shards) {
  for (int i = 0; i < num_shards; i++) {
    oids.push_back(get_shard_oid(oid_prefix, i));
  }
}

string RGWSyncErrorLogger::get_shard_oid(const string& oid_prefix, int shard_id) {
  char buf[oid_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), shard_id);
  return string(buf);
}

RGWCoroutine *RGWSyncErrorLogger::log_error_cr(const DoutPrefixProvider *dpp, const string& source_zone, const string& section, const string& name, uint32_t error_code, const string& message) {
  cls_log_entry entry;

  rgw_sync_error_info info(source_zone, error_code, message);
  bufferlist bl;
  encode(info, bl);
  store->svc()->cls->timelog.prepare_entry(entry, real_clock::now(), section, name, bl);

  uint32_t shard_id = ++counter % num_shards;

  return new RGWRadosTimelogAddCR(dpp, store, oids[shard_id], entry);
}

void RGWSyncBackoff::update_wait_time()
{
  if (cur_wait == 0) {
    cur_wait = 1;
  } else {
    cur_wait = (cur_wait << 1);
  }
  if (cur_wait >= max_secs) {
    cur_wait = max_secs;
  }
}

void RGWSyncBackoff::backoff_sleep()
{
  update_wait_time();
  sleep(cur_wait);
}

void RGWSyncBackoff::backoff(RGWCoroutine *op)
{
  update_wait_time();
  op->wait(utime_t(cur_wait, 0));
}

int RGWBackoffControlCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    // retry the operation until it succeeds
    while (true) {
      yield {
        std::lock_guard l{lock};
        cr = alloc_cr();
        cr->get();
        call(cr);
      }
      {
        std::lock_guard l{lock};
        cr->put();
        cr = NULL;
      }
      if (retcode >= 0) {
        break;
      }
      if (retcode != -EBUSY && retcode != -EAGAIN) {
        ldout(cct, 0) << "ERROR: RGWBackoffControlCR called coroutine returned " << retcode << dendl;
        if (exit_on_error) {
          return set_cr_error(retcode);
        }
      }
      if (reset_backoff) {
        backoff.reset();
      }
      yield backoff.backoff(this);
    }

    // run an optional finisher
    yield call(alloc_finisher_cr());
    if (retcode < 0) {
      ldout(cct, 0) << "ERROR: call to finisher_cr() failed: retcode=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

void rgw_mdlog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
  JSONDecoder::decode_json("period", period, obj);
  JSONDecoder::decode_json("realm_epoch", realm_epoch, obj);
}

void rgw_mdlog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("id", id, obj);
  JSONDecoder::decode_json("section", section, obj);
  JSONDecoder::decode_json("name", name, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
  JSONDecoder::decode_json("data", log_data, obj);
}

void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
};

int RGWShardCollectCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
    while (spawn_next()) {
      current_running++;

      while (current_running >= max_concurrent) {
        int child_ret;
        yield wait_for_child();
        if (collect_next(&child_ret)) {
          current_running--;
          if (child_ret < 0 && child_ret != -ENOENT) {
            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
            status = child_ret;
          }
        }
      }
    }
    while (current_running > 0) {
      int child_ret;
      yield wait_for_child();
      if (collect_next(&child_ret)) {
        current_running--;
        if (child_ret < 0 && child_ret != -ENOENT) {
          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
          status = child_ret;
        }
      }
    }
    if (status < 0) {
      return set_cr_error(status);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  int num_shards;
  map<int, RGWMetadataLogInfo> *mdlog_info;

  int shard_id;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env,
                           const std::string& period, int _num_shards,
                           map<int, RGWMetadataLogInfo> *_mdlog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env),
      period(period), num_shards(_num_shards),
      mdlog_info(_mdlog_info), shard_id(0) {}
  bool spawn_next() override;
};

class RGWListRemoteMDLogCR : public RGWShardCollectCR {
  RGWMetaSyncEnv *sync_env;

  const std::string& period;
  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_mdlog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_MDLOG_MAX_CONCURRENT 10

public:
  RGWListRemoteMDLogCR(RGWMetaSyncEnv *_sync_env,
                       const std::string& period, map<int, string>& _shards,
                       int _max_entries_per_shard,
                       map<int, rgw_mdlog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
      sync_env(_sync_env), period(period),
      max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

RGWRemoteMetaLog::~RGWRemoteMetaLog()
{
  delete error_logger;
}

int RGWRemoteMetaLog::read_log_info(const DoutPrefixProvider *dpp, rgw_mdlog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                  { NULL, NULL } };

  int ret = conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote mdlog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteMetaLog::read_master_log_shards_info(const DoutPrefixProvider *dpp, const string &master_period, map<int, RGWMetadataLogInfo> *shards_info)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  rgw_mdlog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteMDLogInfoCR(&sync_env, master_period, log_info.num_shards, shards_info));
}

int RGWRemoteMetaLog::read_master_log_shards_next(const DoutPrefixProvider *dpp, const string& period, map<int, string> shard_markers, map<int, rgw_mdlog_shard_data> *result)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  return run(dpp, new RGWListRemoteMDLogCR(&sync_env, period, shard_markers, 1, result));
}

int RGWRemoteMetaLog::init()
{
  conn = store->svc()->zone->get_master_conn();

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  init_sync_env(&sync_env);

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "meta");

  return 0;
}

void RGWRemoteMetaLog::finish()
{
  going_down = true;
  stop();
}

#define CLONE_MAX_ENTRIES 100

int RGWMetaSyncStatusManager::init(const DoutPrefixProvider *dpp)
{
  if (store->svc()->zone->is_meta_master()) {
    return 0;
  }

  if (!store->svc()->zone->get_master_conn()) {
    ldpp_dout(dpp, -1) << "no REST connection to master zone" << dendl;
    return -EIO;
  }

  int r = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), store->svc()->zone->get_zone_params().log_pool, ioctx, true);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to open log pool (" << store->svc()->zone->get_zone_params().log_pool << " ret=" << r << dendl;
    return r;
  }

  r = master_log.init();
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed to init remote log, r=" << r << dendl;
    return r;
  }

  RGWMetaSyncEnv& sync_env = master_log.get_sync_env();

  rgw_meta_sync_status sync_status;
  r = read_sync_status(dpp, &sync_status);
  if (r < 0 && r != -ENOENT) {
    ldpp_dout(dpp, -1) << "ERROR: failed to read sync status, r=" << r << dendl;
    return r;
  }

  int num_shards = sync_status.sync_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.shard_obj_name(i));
  }

  std::unique_lock wl{ts_to_shard_lock};
  for (int i = 0; i < num_shards; i++) {
    clone_markers.push_back(string());
    utime_shard ut;
    ut.shard_id = i;
    ts_to_shard[ut] = i;
  }

  return 0;
}

unsigned RGWMetaSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWMetaSyncStatusManager::gen_prefix(std::ostream& out) const
{
  return out << "meta sync: ";
}

void RGWMetaSyncEnv::init(const DoutPrefixProvider *_dpp, CephContext *_cct, rgw::sal::RGWRadosStore *_store, RGWRESTConn *_conn,
                          RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
                          RGWSyncErrorLogger *_error_logger, RGWSyncTraceManager *_sync_tracer) {
  dpp = _dpp;
  cct = _cct;
  store = _store;
  conn = _conn;
  async_rados = _async_rados;
  http_manager = _http_manager;
  error_logger = _error_logger;
  sync_tracer = _sync_tracer;
}

string RGWMetaSyncEnv::status_oid()
{
  return mdlog_sync_status_oid;
}

string RGWMetaSyncEnv::shard_obj_name(int shard_id)
{
  char buf[mdlog_sync_status_shard_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_status_shard_prefix.c_str(), shard_id);

  return string(buf);
}

class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest {
  const DoutPrefixProvider *dpp;
  rgw::sal::RGWRadosStore *store;
  RGWMetadataLog *mdlog;
  int shard_id;
  int max_entries;

protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    real_time from_time;
    real_time end_time;

    void *handle;

    mdlog->init_list_entries(shard_id, from_time, end_time, marker, &handle);

    int ret = mdlog->list_entries(dpp, handle, max_entries, entries, &marker, &truncated);

    mdlog->complete_list_entries(handle);

    return ret;
  }
public:
  string marker;
  list<cls_log_entry> entries;
  bool truncated;

  RGWAsyncReadMDLogEntries(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                           RGWMetadataLog* mdlog, int _shard_id,
                           std::string _marker, int _max_entries)
    : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), store(_store), mdlog(mdlog),
      shard_id(_shard_id), max_entries(_max_entries), marker(std::move(_marker)) {}
};

class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *const mdlog;
  int shard_id;
  string marker;
  string *pmarker;
  int max_entries;
  list<cls_log_entry> *entries;
  bool *truncated;

  RGWAsyncReadMDLogEntries *req{nullptr};

public:
  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                        int _shard_id, string *_marker, int _max_entries,
                        list<cls_log_entry> *_entries, bool *_truncated)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
      entries(_entries), truncated(_truncated) {}

  ~RGWReadMDLogEntriesCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    marker = *pmarker;
    req = new RGWAsyncReadMDLogEntries(dpp, this, stack->create_completion_notifier(),
                                       sync_env->store, mdlog, shard_id, marker,
                                       max_entries);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    *pmarker = std::move(req->marker);
    *entries = std::move(req->entries);
    *truncated = req->truncated;
    return req->get_ret_status();
  }
};


class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
  RGWMetaSyncEnv *env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  RGWMetadataLogInfo *shard_info;

public:
  RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
                                int _shard_id, RGWMetadataLogInfo *_shard_info)
    : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
      period(period), shard_id(_shard_id), shard_info(_shard_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    auto store = env->store;
    RGWRESTConn *conn = store->svc()->zone->get_master_conn();
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "metadata" },
                                        { "id", buf },
                                        { "period", period.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
                                          env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(env->dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

RGWCoroutine* create_read_remote_mdlog_shard_info_cr(RGWMetaSyncEnv *env,
                                                     const std::string& period,
                                                     int shard_id,
                                                     RGWMetadataLogInfo* info)
{
  return new RGWReadRemoteMDLogShardInfoCR(env, period, shard_id, info);
}

class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  const std::string& period;
  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_mdlog_shard_data *result;

public:
  RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
                            int _shard_id, const string& _marker, uint32_t _max_entries,
                            rgw_mdlog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                    { "id", buf },
                                    { "period", period.c_str() },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

RGWCoroutine* create_list_remote_mdlog_shard_cr(RGWMetaSyncEnv *env,
                                                const std::string& period,
                                                int shard_id,
                                                const std::string& marker,
                                                uint32_t max_entries,
                                                rgw_mdlog_shard_data *result)
{
  return new RGWListRemoteMDLogShardCR(env, period, shard_id, marker,
                                       max_entries, result);
}

bool RGWReadRemoteMDLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
  shard_id++;
  return true;
}

bool RGWListRemoteMDLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteMDLogShardCR(sync_env, period, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  rgw_meta_sync_info status;
  vector<RGWMetadataLogInfo> shards_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
public:
  RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             const rgw_meta_sync_info &status)
    : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
      status(status), shards_info(status.num_shards),
      lease_cr(nullptr), lease_stack(nullptr) {}

  ~RGWInitSyncStatusCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      yield {
        set_status("acquiring sync lock");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RGWRadosStore *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      yield {
        set_status("writing sync status");
        rgw::sal::RGWRadosStore *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }

      if (retcode < 0) {
        set_status("failed to write sync status");
        ldpp_dout(dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
        yield lease_cr->go_down();
        return set_cr_error(retcode);
      }
      /* fetch current position in logs */
      set_status("fetching remote log position");
      yield {
        for (int i = 0; i < (int)status.num_shards; i++) {
          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
                                                  &shards_info[i]), false);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield {
        set_status("updating sync status");
        for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
          RGWMetadataLogInfo& info = shards_info[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          rgw::sal::RGWRadosStore *store = sync_env->store;
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp,
                                                                sync_env->async_rados,
                                                                store->svc()->sysobj,
                                                                rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(i)),
                                                                marker), true);
        }
      }
      yield {
        set_status("changing sync state: build full sync maps");
        status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
        rgw::sal::RGWRadosStore *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                           status));
      }
      set_status("drop lock lease");
      yield lease_cr->go_down();
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      return set_cr_done();
    }
    return 0;
  }
};

class RGWReadSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWMetaSyncEnv *env;
  const int num_shards;
  int shard_id{0};
  map<uint32_t, rgw_meta_sync_marker>& markers;

 public:
  RGWReadSyncStatusMarkersCR(RGWMetaSyncEnv *env, int num_shards,
                             map<uint32_t, rgw_meta_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_meta_sync_marker>;
  rgw_raw_obj obj{env->store->svc()->zone->get_zone_params().log_pool,
                  env->shard_obj_name(shard_id)};
  spawn(new CR(env->dpp, env->async_rados, env->store->svc()->sysobj, obj, &markers[shard_id]), false);
  shard_id++;
  return true;
}

class RGWReadSyncStatusCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  rgw_meta_sync_status *sync_status;

public:
  RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                             rgw_meta_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_meta_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      rgw_raw_obj obj{sync_env->store->svc()->zone->get_zone_params().log_pool,
                      sync_env->status_oid()};
      call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj, obj,
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWFetchAllMetaCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  int num_shards;


  int ret_status;

  list<string> sections;
  list<string>::iterator sections_iter;

  struct meta_list_result {
    list<string> keys;
    string marker;
    uint64_t count{0};
    bool truncated{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("keys", keys, obj);
      JSONDecoder::decode_json("marker", marker, obj);
      JSONDecoder::decode_json("count", count, obj);
      JSONDecoder::decode_json("truncated", truncated, obj);
    }
  } result;
  list<string>::iterator iter;

  std::unique_ptr<RGWShardedOmapCRManager> entries_index;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  bool lost_lock;
  bool failed;

  string marker;

  map<uint32_t, rgw_meta_sync_marker>& markers;

  RGWSyncTraceNodeRef tn;

public:
  RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                    map<uint32_t, rgw_meta_sync_marker>& _markers,
                    RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      num_shards(_num_shards),
      ret_status(0), lease_cr(nullptr), lease_stack(nullptr),
      lost_lock(false), failed(false), markers(_markers) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "fetch_all_meta");
  }

  ~RGWFetchAllMetaCR() override {
  }

  void append_section_from_set(set<string>& all_sections, const string& name) {
    set<string>::iterator iter = all_sections.find(name);
    if (iter != all_sections.end()) {
      sections.emplace_back(std::move(*iter));
      all_sections.erase(iter);
    }
  }
  /*
   * meta sync should go in the following order: user, bucket.instance, bucket
   * then whatever other sections exist (if any)
   */
  void rearrange_sections() {
    set<string> all_sections;
    std::move(sections.begin(), sections.end(),
              std::inserter(all_sections, all_sections.end()));
    sections.clear();

    append_section_from_set(all_sections, "user");
    append_section_from_set(all_sections, "bucket.instance");
    append_section_from_set(all_sections, "bucket");

    std::move(all_sections.begin(), all_sections.end(),
              std::back_inserter(sections));
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;

    reenter(this) {
      yield {
        set_status(string("acquiring lock (") + sync_env->status_oid() + ")");
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados,
                                                sync_env->store,
                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldpp_dout(dpp, 5) << "lease cr failed, done early " << dendl;
          set_status("failed acquiring lock");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      entries_index.reset(new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
                                                      sync_env->store->svc()->zone->get_zone_params().log_pool,
                                                      mdlog_sync_full_sync_index_prefix));
      yield {
        call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
                                                      "/admin/metadata", NULL, &sections));
      }
      if (get_ret_status() < 0) {
        ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata sections" << dendl;
        yield entries_index->finish();
        yield lease_cr->go_down();
        drain_all();
        return set_cr_error(get_ret_status());
      }
      rearrange_sections();
      sections_iter = sections.begin();
      for (; sections_iter != sections.end(); ++sections_iter) {
        do {
          yield {
#define META_FULL_SYNC_CHUNK_SIZE "1000"
            string entrypoint = string("/admin/metadata/") + *sections_iter;
            rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
                                            { "marker", result.marker.c_str() },
                                            { NULL, NULL } };
            result.keys.clear();
            call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
                                                              entrypoint, pairs, &result));
          }
          ret_status = get_ret_status();
          if (ret_status == -ENOENT) {
            set_retcode(0); /* reset coroutine status so that we don't return it */
            ret_status = 0;
          }
          if (ret_status < 0) {
            tn->log(0, SSTR("ERROR: failed to fetch metadata section: " << *sections_iter));
            yield entries_index->finish();
            yield lease_cr->go_down();
            drain_all();
            return set_cr_error(ret_status);
          }
          iter = result.keys.begin();
          for (; iter != result.keys.end(); ++iter) {
            if (!lease_cr->is_locked()) {
              lost_lock = true;
              break;
            }
            yield; // allow entries_index consumer to make progress

            tn->log(20, SSTR("list metadata: section=" << *sections_iter << " key=" << *iter));
            string s = *sections_iter + ":" + *iter;
            int shard_id;
            rgw::sal::RGWRadosStore *store = sync_env->store;
            int ret = store->ctl()->meta.mgr->get_shard_id(*sections_iter, *iter, &shard_id);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: could not determine shard id for " << *sections_iter << ":" << *iter));
              ret_status = ret;
              break;
            }
            if (!entries_index->append(s, shard_id)) {
              break;
            }
          }
        } while (result.truncated);
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_meta_sync_marker>::iterator iter = markers.begin(); iter != markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_meta_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                                                rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, sync_env->shard_obj_name(shard_id)),
                                                                marker), true);
        }
      }

      drain_all_but_stack(lease_stack.get()); /* the lease cr still needs to run */

      yield lease_cr->go_down();

      int ret;
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
        yield;
      }
      drain_all();
      if (failed) {
        yield return set_cr_error(-EIO);
      }
      if (lost_lock) {
        yield return set_cr_error(-EBUSY);
      }

      if (ret_status < 0) {
        yield return set_cr_error(ret_status);
      }

      yield return set_cr_done();
    }
    return 0;
  }
};

static string full_sync_index_shard_oid(int shard_id)
{
  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
  return string(buf);
}

class RGWReadRemoteMetadataCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  string section;
  string key;

  bufferlist *pbl;

  RGWSyncTraceNodeRef tn;

public:
  RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
                          const string& _section, const string& _key, bufferlist *_pbl,
                          const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      http_op(NULL),
      section(_section),
      key(_key),
      pbl(_pbl) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "read_remote_meta",
                                         section + ":" + key);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sync_env->conn;
    reenter(this) {
      yield {
        string key_encode;
        url_encode(key, key_encode);
        rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
                                        { NULL, NULL } };

        string p = string("/admin/metadata/") + section + "/" + key_encode;

        http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          http_op->put();
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(pbl, null_yield);
        http_op->put();
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWAsyncMetaStoreEntry : public RGWAsyncRadosRequest {
  rgw::sal::RGWRadosStore *store;
  string raw_key;
  bufferlist bl;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->put(raw_key, bl, null_yield, dpp, RGWMDLogSyncType::APPLY_ALWAYS, true);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaStoreEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                         const string& _raw_key,
                         bufferlist& _bl,
                         const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), bl(_bl), dpp(dpp) {}
};


class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;
  bufferlist bl;

  RGWAsyncMetaStoreEntry *req;

public:
  RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
                      const string& _raw_key,
                      bufferlist& _bl)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), bl(_bl), req(NULL) {
  }

  ~RGWMetaStoreEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaStoreEntry(this, stack->create_completion_notifier(),
                                     sync_env->store, raw_key, bl, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    return req->get_ret_status();
  }
};

class RGWAsyncMetaRemoveEntry : public RGWAsyncRadosRequest {
  rgw::sal::RGWRadosStore *store;
  string raw_key;
  const DoutPrefixProvider *dpp;
protected:
  int _send_request(const DoutPrefixProvider *dpp) override {
    int ret = store->ctl()->meta.mgr->remove(raw_key, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: can't remove key: " << raw_key << " ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
public:
  RGWAsyncMetaRemoveEntry(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                          const string& _raw_key, const DoutPrefixProvider *dpp)
    : RGWAsyncRadosRequest(caller, cn), store(_store),
      raw_key(_raw_key), dpp(dpp) {}
};


class RGWMetaRemoveEntryCR : public RGWSimpleCoroutine {
  RGWMetaSyncEnv *sync_env;
  string raw_key;

  RGWAsyncMetaRemoveEntry *req;

public:
  RGWMetaRemoveEntryCR(RGWMetaSyncEnv *_sync_env,
                       const string& _raw_key)
    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
      raw_key(_raw_key), req(NULL) {
  }

  ~RGWMetaRemoveEntryCR() override {
    if (req) {
      req->finish();
    }
  }

  int send_request(const DoutPrefixProvider *dpp) override {
    req = new RGWAsyncMetaRemoveEntry(this, stack->create_completion_notifier(),
                                      sync_env->store, raw_key, dpp);
    sync_env->async_rados->queue(req);
    return 0;
  }

  int request_complete() override {
    int r = req->get_ret_status();
    if (r == -ENOENT) {
      r = 0;
    }
    return r;
  }
};

#define META_SYNC_UPDATE_MARKER_WINDOW 10


int RGWLastCallerWinsCR::operate(const DoutPrefixProvider *dpp) {
  RGWCoroutine *call_cr;
  reenter(this) {
    while (cr) {
      call_cr = cr;
      cr = nullptr;
      yield call(call_cr);
      /* cr might have been modified at this point */
    }
    return set_cr_done();
  }
  return 0;
}

class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWMetaSyncEnv *sync_env;

  string marker_oid;
  rgw_meta_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_meta_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    if (index_pos > 0) {
      sync_marker.pos = index_pos;
    }

    if (!real_clock::is_zero(timestamp)) {
      sync_marker.timestamp = timestamp;
    }

    ldpp_dout(sync_env->dpp, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
    tn->log(20, SSTR("new marker=" << new_marker));
    rgw::sal::RGWRadosStore *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados,
                                                           store->svc()->sysobj,
                                                           rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
                                                   const string& _raw_key, const string& _entry_marker,
                                                   const RGWMDLogStatus& _op_status,
                                                   RGWMetaSyncShardMarkerTrack *_marker_tracker, const RGWSyncTraceNodeRef& _tn_parent)
  : RGWCoroutine(_sync_env->cct),
    sync_env(_sync_env),
    raw_key(_raw_key), entry_marker(_entry_marker),
    op_status(_op_status),
    pos(0), sync_status(0),
    marker_tracker(_marker_tracker), tries(0) {
  error_injection = (sync_env->cct->_conf->rgw_sync_meta_inject_err_probability > 0);
  tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
}

int RGWMetaSyncSingleEntryCR::operate(const DoutPrefixProvider *dpp) {
  reenter(this) {
#define NUM_TRANSIENT_ERROR_RETRIES 10

    if (error_injection &&
        rand() % 10000 < cct->_conf->rgw_sync_meta_inject_err_probability * 10000.0) {
      return set_cr_error(-EIO);
    }

    if (op_status != MDLOG_STATUS_COMPLETE) {
      tn->log(20, "skipping pending operation");
      yield call(marker_tracker->finish(entry_marker));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    tn->set_flag(RGW_SNS_FLAG_ACTIVE);
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      yield {
        pos = raw_key.find(':');
        section = raw_key.substr(0, pos);
        key = raw_key.substr(pos + 1);
        tn->log(10, SSTR("fetching remote metadata entry" << (tries == 0 ? "" : " (retry)")));
        call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl, tn));
      }

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        /* FIXME: do we need to remove the entry from the local zone? */
        break;
      }

      if (sync_status < 0 && tries < NUM_TRANSIENT_ERROR_RETRIES - 1) {
        ldpp_dout(dpp, 20) << *this << ": failed to fetch remote metadata: " << section << ":" << key << ", will retry" << dendl;
        continue;
      }

      if (sync_status < 0) {
        tn->log(10, SSTR("failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status));
        log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
        yield call(sync_env->error_logger->log_error_cr(dpp, sync_env->conn->get_remote_id(), section, key, -sync_status,
                                                        string("failed to read remote metadata entry: ") + cpp_strerror(-sync_status)));
        return set_cr_error(sync_status);
      }

      break;
    }

    retcode = 0;
    for (tries = 0; tries < NUM_TRANSIENT_ERROR_RETRIES; tries++) {
      if (sync_status != -ENOENT) {
        tn->log(10, SSTR("storing local metadata entry"));
        yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
      } else {
        tn->log(10, SSTR("removing local metadata entry"));
        yield call(new RGWMetaRemoveEntryCR(sync_env, raw_key));
      }
      if (retcode < 0 && tries < NUM_TRANSIENT_ERROR_RETRIES - 1) {
        ldpp_dout(dpp, 20) << *this << ": failed to store metadata: " << section << ":" << key << ", got retcode=" << retcode << dendl;
        continue;
      }
      break;
    }

    sync_status = retcode;

    if (sync_status == 0 && marker_tracker) {
      /* update marker */
      yield call(marker_tracker->finish(entry_marker));
      sync_status = retcode;
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("failed, status=" << sync_status));
      return set_cr_error(sync_status);
    }
    tn->log(10, "success");
    return set_cr_done();
  }
  return 0;
}

class RGWCloneMetaLogCoroutine : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;
  RGWMetadataLog *mdlog;

  const std::string& period;
  int shard_id;
  string marker;
  bool truncated = false;
  string *new_marker;

  int max_entries = CLONE_MAX_ENTRIES;

  RGWRESTReadResource *http_op = nullptr;
  boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

  RGWMetadataLogInfo shard_info;
  rgw_mdlog_shard_data data;

public:
  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
                           const std::string& period, int _id,
                           const string& _marker, string *_new_marker)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
    if (new_marker) {
      *new_marker = marker;
    }
  }
  ~RGWCloneMetaLogCoroutine() override {
    if (http_op) {
      http_op->put();
    }
    if (completion) {
      completion->cancel();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override;

  int state_init();
  int state_read_shard_status();
  int state_read_shard_status_complete();
  int state_send_rest_request(const DoutPrefixProvider *dpp);
  int state_receive_rest_response();
  int state_store_mdlog_entries();
  int state_store_mdlog_entries_complete();
};

#define META_SYNC_SPAWN_WINDOW 20

class RGWMetaSyncShardCR : public RGWCoroutine {
  RGWMetaSyncEnv *sync_env;

  const rgw_pool& pool;
  const std::string& period; //< currently syncing period id
  const epoch_t realm_epoch; //< realm_epoch of period
  RGWMetadataLog* mdlog; //< log of syncing period
  uint32_t shard_id;
  rgw_meta_sync_marker& sync_marker;
  boost::optional<rgw_meta_sync_marker> temp_marker; //< for pending updates
  string marker;
  string max_marker;
  const std::string& period_marker; //< max marker stored in next period

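  // Full-sync state: 'oid' (below) names this shard's full-sync index
  // object, and omapkeys/entries/iter walk its omap keys in full_sync().
  // The mdlog_marker/log_entries/mdlog_entry members further down carry the
  // incremental-sync position through the cloned mdlog.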
  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;

  list<cls_log_entry> log_entries;
  list<cls_log_entry>::iterator log_iter;
  bool truncated = false;

  string mdlog_marker;
  string raw_key;
  rgw_mdlog_entry mdlog_entry;

  ceph::mutex inc_lock = ceph::make_mutex("RGWMetaSyncShardCR::inc_lock");
  ceph::condition_variable inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  bool lost_lock = false;

  bool *reset_backoff;

  // hold a reference to the cr stack while it's in the map
  using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
  map<StackRef, string> stack_to_pos;
  map<string, string> pos_to_prev;

  bool can_adjust_marker = false;
  bool done_with_period = false;

  int total_entries = 0;

  RGWSyncTraceNodeRef tn;
public:
  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
                     const std::string& period, epoch_t realm_epoch,
                     RGWMetadataLog* mdlog, uint32_t _shard_id,
                     rgw_meta_sync_marker& _marker,
                     const std::string& period_marker, bool *_reset_backoff,
                     RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
      period(period), realm_epoch(realm_epoch), mdlog(mdlog),
      shard_id(_shard_id), sync_marker(_marker),
      period_marker(period_marker),
      reset_backoff(_reset_backoff), tn(_tn) {
    *reset_backoff = false;
  }

  ~RGWMetaSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_meta_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_meta_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldpp_dout(dpp, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      }
    }
    /* unreachable */
    return 0;
  }

  void collect_children()
  {
    int child_ret;
    RGWCoroutinesStack *child;
    while (collect_next(&child_ret, &child)) {
      auto iter = stack_to_pos.find(child);
      if (iter == stack_to_pos.end()) {
        /* some other stack that we don't care about */
        continue;
      }

      string& pos = iter->second;

      if (child_ret < 0) {
        ldpp_dout(sync_env->dpp, 0) << *this << ": child operation stack=" << child << " entry=" << pos << " returned " << child_ret << dendl;
        // on any error code from RGWMetaSyncSingleEntryCR, we do not advance
        // the sync status marker past this entry, and set
        // can_adjust_marker=false to exit out of RGWMetaSyncShardCR.
        // RGWMetaSyncShardControlCR will rerun RGWMetaSyncShardCR from the
        // previous marker and retry
        can_adjust_marker = false;
      }

      map<string, string>::iterator prev_iter = pos_to_prev.find(pos);
      ceph_assert(prev_iter != pos_to_prev.end());

      if (pos_to_prev.size() == 1) {
        if (can_adjust_marker) {
          sync_marker.marker = pos;
        }
        pos_to_prev.erase(prev_iter);
      } else {
        ceph_assert(pos_to_prev.size() > 1);
        pos_to_prev.erase(prev_iter);
        prev_iter = pos_to_prev.begin();
        if (can_adjust_marker) {
          sync_marker.marker = prev_iter->second;
        }
      }

      ldpp_dout(sync_env->dpp, 4) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
      stack_to_pos.erase(iter);
    }
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      set_status("full_sync");
      tn->log(10, "start full sync");
      oid = full_sync_index_shard_oid(shard_id);
      can_adjust_marker = true;
      /* grab lock */
      yield {
        uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
        string lock_name = "sync_lock";
        rgw::sal::RGWRadosStore *store = sync_env->store;
        lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                                rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                                lock_name, lock_duration, this));
        lease_stack.reset(spawn(lease_cr.get(), false));
        lost_lock = false;
      }
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          drain_all();
          tn->log(5, "failed to take lease");
          return lease_cr->get_ret_status();
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");

      /* lock succeeded, a retry now should avoid previous backoff status */
      *reset_backoff = true;

      /* prepare marker tracker */
      set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                         sync_env->shard_obj_name(shard_id),
                                                         sync_marker, tn));

      marker = sync_marker.marker;

      total_entries = sync_marker.pos;

      /* sync! */
      do {
        if (!lease_cr->is_locked()) {
          tn->log(10, "lost lease");
          lost_lock = true;
          break;
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             marker, max_entries, omapkeys));
        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
        entries = std::move(omapkeys->entries);
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          marker = *iter;
          tn->log(20, SSTR("full sync: " << marker));
          total_entries++;
          if (!marker_tracker->start(marker, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << marker << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield {
              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, marker, marker, MDLOG_STATUS_COMPLETE, marker_tracker, tn), false);
              // stack_to_pos holds a reference to the stack
              stack_to_pos[stack] = marker;
              pos_to_prev[marker] = marker;
            }
            // limit spawn window
            while (num_spawned() > META_SYNC_SPAWN_WINDOW) {
              yield wait_for_child();
              collect_children();
            }
          }
        }
        collect_children();
      } while (omapkeys->more && can_adjust_marker);

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */

      while (num_spawned() > 1) {
        yield wait_for_child();
        collect_children();
      }

      if (!lost_lock) {
        /* update marker to reflect we're done with full sync */
        if (can_adjust_marker) {
          // apply updates to a temporary marker, or operate() will send us
          // to incremental_sync() after we yield
          temp_marker = sync_marker;
          temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
          temp_marker->marker = std::move(temp_marker->next_step_marker);
          temp_marker->next_step_marker.clear();
          temp_marker->realm_epoch = realm_epoch;
          ldpp_dout(sync_env->dpp, 4) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;

          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
          yield call(new WriteMarkerCR(sync_env->dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                       rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
                                       *temp_marker));
        }

        if (retcode < 0) {
          ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
          yield lease_cr->go_down();
          drain_all();
          return retcode;
        }
      }

      /*
       * if we reached here, it means that lost_lock is true, otherwise the state
       * change in the previous block will prevent us from reaching here
       */

      yield lease_cr->go_down();

      lease_cr.reset();

      drain_all();

      if (!can_adjust_marker) {
        return -EAGAIN;
      }

      if (lost_lock) {
        return -EBUSY;
      }

      tn->log(10, "full sync complete");

      // apply the sync marker update
      ceph_assert(temp_marker);
      sync_marker = std::move(*temp_marker);
      temp_marker = boost::none;
      // must not yield after this point!
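      // (operate() dispatches on sync_marker.state at every resume, so once
      // temp_marker has been moved into sync_marker a yield here could
      // re-enter through incremental_sync() before full_cr has unwound)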
+ } + return 0; + } + + + int incremental_sync() { + reenter(&incremental_cr) { + set_status("incremental_sync"); + tn->log(10, "start incremental sync"); + can_adjust_marker = true; + /* grab lock */ + if (!lease_cr) { /* could have had a lease_cr lock from previous state */ + yield { + uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; + string lock_name = "sync_lock"; + rgw::sal::RGWRadosStore *store = sync_env->store; + lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store, + rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), + lock_name, lock_duration, this)); + lease_stack.reset(spawn(lease_cr.get(), false)); + lost_lock = false; + } + while (!lease_cr->is_locked()) { + if (lease_cr->is_done()) { + drain_all(); + tn->log(10, "failed to take lease"); + return lease_cr->get_ret_status(); + } + set_sleeping(true); + yield; + } + } + tn->log(10, "took lease"); + // if the period has advanced, we can't use the existing marker + if (sync_marker.realm_epoch < realm_epoch) { + ldpp_dout(sync_env->dpp, 4) << "clearing marker=" << sync_marker.marker + << " from old realm_epoch=" << sync_marker.realm_epoch + << " (now " << realm_epoch << ')' << dendl; + sync_marker.realm_epoch = realm_epoch; + sync_marker.marker.clear(); + } + mdlog_marker = sync_marker.marker; + set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env, + sync_env->shard_obj_name(shard_id), + sync_marker, tn)); + + /* + * mdlog_marker: the remote sync marker positiion + * sync_marker: the local sync marker position + * max_marker: the max mdlog position that we fetched + * marker: the current position we try to sync + * period_marker: the last marker before the next period begins (optional) + */ + marker = max_marker = sync_marker.marker; + /* inc sync */ + do { + if (!lease_cr->is_locked()) { + lost_lock = true; + tn->log(10, "lost lease"); + break; + } +#define INCREMENTAL_MAX_ENTRIES 100 + ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl; + if (!period_marker.empty() && period_marker <= mdlog_marker) { + tn->log(10, SSTR("finished syncing current period: mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker << " period_marker=" << period_marker)); + done_with_period = true; + break; + } + if (mdlog_marker <= max_marker) { + /* we're at the tip, try to bring more entries */ + ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl; + yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog, + period, shard_id, + mdlog_marker, &mdlog_marker)); + } + if (retcode < 0) { + tn->log(10, SSTR(*this << ": failed to fetch more log entries, retcode=" << retcode)); + yield lease_cr->go_down(); + drain_all(); + *reset_backoff = false; // back off and try again later + return retcode; + } + *reset_backoff = true; /* if we got to this point, all systems function */ + if (mdlog_marker > max_marker) { + tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */ + tn->log(20, SSTR("mdlog_marker=" << mdlog_marker << " sync_marker=" << sync_marker.marker)); + marker = max_marker; + yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id, + &max_marker, INCREMENTAL_MAX_ENTRIES, + &log_entries, &truncated)); + if (retcode < 0) { + tn->log(10, SSTR("failed to list mdlog entries, retcode=" << retcode)); + yield 
lease_cr->go_down(); + drain_all(); + *reset_backoff = false; // back off and try again later + return retcode; + } + for (log_iter = log_entries.begin(); log_iter != log_entries.end() && !done_with_period; ++log_iter) { + if (!period_marker.empty() && period_marker <= log_iter->id) { + done_with_period = true; + if (period_marker < log_iter->id) { + tn->log(10, SSTR("found key=" << log_iter->id + << " past period_marker=" << period_marker)); + break; + } + ldpp_dout(sync_env->dpp, 10) << "found key at period_marker=" << period_marker << dendl; + // sync this entry, then return control to RGWMetaSyncCR + } + if (!mdlog_entry.convert_from(*log_iter)) { + tn->log(0, SSTR("ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry")); + continue; + } + tn->log(20, SSTR("log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp)); + if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp.to_real_time())) { + ldpp_dout(sync_env->dpp, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl; + } else { + raw_key = log_iter->section + ":" + log_iter->name; + yield { + RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker, tn), false); + ceph_assert(stack); + // stack_to_pos holds a reference to the stack + stack_to_pos[stack] = log_iter->id; + pos_to_prev[log_iter->id] = marker; + } + // limit spawn window + while (num_spawned() > META_SYNC_SPAWN_WINDOW) { + yield wait_for_child(); + collect_children(); + } + } + marker = log_iter->id; + } + } + collect_children(); + ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl; + if (done_with_period) { + // return control to RGWMetaSyncCR and advance to the next period + tn->log(10, SSTR(*this << ": done with period")); + break; + } + if (mdlog_marker == max_marker && can_adjust_marker) { + tn->unset_flag(RGW_SNS_FLAG_ACTIVE); +#define INCREMENTAL_INTERVAL 20 + yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); + } + } while (can_adjust_marker); + + tn->unset_flag(RGW_SNS_FLAG_ACTIVE); + + while (num_spawned() > 1) { + yield wait_for_child(); + collect_children(); + } + + yield lease_cr->go_down(); + + drain_all(); + + if (lost_lock) { + return -EBUSY; + } + + if (!can_adjust_marker) { + return -EAGAIN; + } + + return set_cr_done(); + } + /* TODO */ + return 0; + } +}; + +class RGWMetaSyncShardControlCR : public RGWBackoffControlCR +{ + RGWMetaSyncEnv *sync_env; + + const rgw_pool& pool; + const std::string& period; + epoch_t realm_epoch; + RGWMetadataLog* mdlog; + uint32_t shard_id; + rgw_meta_sync_marker sync_marker; + const std::string period_marker; + + RGWSyncTraceNodeRef tn; + + static constexpr bool exit_on_error = false; // retry on all errors +public: + RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool, + const std::string& period, epoch_t realm_epoch, + RGWMetadataLog* mdlog, uint32_t _shard_id, + const rgw_meta_sync_marker& _marker, + std::string&& period_marker, + RGWSyncTraceNodeRef& _tn_parent) + : RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env), + pool(_pool), period(period), 
realm_epoch(realm_epoch), mdlog(mdlog), + shard_id(_shard_id), sync_marker(_marker), + period_marker(std::move(period_marker)) { + tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", + std::to_string(shard_id)); + } + + RGWCoroutine *alloc_cr() override { + return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog, + shard_id, sync_marker, period_marker, backoff_ptr(), tn); + } + + RGWCoroutine *alloc_finisher_cr() override { + rgw::sal::RGWRadosStore *store = sync_env->store; + return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->dpp, sync_env->async_rados, store->svc()->sysobj, + rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)), + &sync_marker); + } +}; + +class RGWMetaSyncCR : public RGWCoroutine { + RGWMetaSyncEnv *sync_env; + const rgw_pool& pool; + RGWPeriodHistory::Cursor cursor; //< sync position in period history + RGWPeriodHistory::Cursor next; //< next period in history + rgw_meta_sync_status sync_status; + RGWSyncTraceNodeRef tn; + + std::mutex mutex; //< protect access to shard_crs + + // TODO: it should be enough to hold a reference on the stack only, as calling + // RGWCoroutinesStack::wakeup() doesn't refer to the RGWCoroutine if it has + // already completed + using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>; + using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>; + using RefPair = std::pair<ControlCRRef, StackRef>; + map<int, RefPair> shard_crs; + int ret{0}; + +public: + RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const RGWPeriodHistory::Cursor &cursor, + const rgw_meta_sync_status& _sync_status, RGWSyncTraceNodeRef& _tn) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + pool(sync_env->store->svc()->zone->get_zone_params().log_pool), + cursor(cursor), sync_status(_sync_status), tn(_tn) {} + + ~RGWMetaSyncCR() { + } + + int operate(const DoutPrefixProvider *dpp) override { + reenter(this) { + // loop through one period at a time + tn->log(1, "start"); + for (;;) { + if (cursor == sync_env->store->svc()->mdlog->get_period_history()->get_current()) { + next = RGWPeriodHistory::Cursor{}; + if (cursor) { + ldpp_dout(dpp, 10) << "RGWMetaSyncCR on current period=" + << cursor.get_period().get_id() << dendl; + } else { + ldpp_dout(dpp, 10) << "RGWMetaSyncCR with no period" << dendl; + } + } else { + next = cursor; + next.next(); + ldpp_dout(dpp, 10) << "RGWMetaSyncCR on period=" + << cursor.get_period().get_id() << ", next=" + << next.get_period().get_id() << dendl; + } + + yield { + // get the mdlog for the current period (may be empty) + auto& period_id = sync_status.sync_info.period; + auto realm_epoch = sync_status.sync_info.realm_epoch; + auto mdlog = sync_env->store->svc()->mdlog->get_log(period_id); + + tn->log(1, SSTR("realm epoch=" << realm_epoch << " period id=" << period_id)); + + // prevent wakeup() from accessing shard_crs while we're spawning them + std::lock_guard<std::mutex> lock(mutex); + + // sync this period on each shard + for (const auto& m : sync_status.sync_markers) { + uint32_t shard_id = m.first; + auto& marker = m.second; + + std::string period_marker; + if (next) { + // read the maximum marker from the next period's sync status + period_marker = next.get_period().get_sync_status()[shard_id]; + if (period_marker.empty()) { + // no metadata changes have occurred on this shard, skip it + ldpp_dout(dpp, 10) << "RGWMetaSyncCR: skipping shard " << shard_id + << " with empty period marker" << dendl; + continue; + } + } + + using ShardCR = RGWMetaSyncShardControlCR; + auto cr = new 
ShardCR(sync_env, pool, period_id, realm_epoch, + mdlog, shard_id, marker, + std::move(period_marker), tn); + auto stack = spawn(cr, false); + shard_crs[shard_id] = RefPair{cr, stack}; + } + } + // wait for each shard to complete + while (ret == 0 && num_spawned() > 0) { + yield wait_for_child(); + collect(&ret, nullptr); + } + drain_all(); + { + // drop shard cr refs under lock + std::lock_guard<std::mutex> lock(mutex); + shard_crs.clear(); + } + if (ret < 0) { + return set_cr_error(ret); + } + // advance to the next period + ceph_assert(next); + cursor = next; + + // write the updated sync info + sync_status.sync_info.period = cursor.get_period().get_id(); + sync_status.sync_info.realm_epoch = cursor.get_epoch(); + yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, sync_env->async_rados, + sync_env->store->svc()->sysobj, + rgw_raw_obj(pool, sync_env->status_oid()), + sync_status.sync_info)); + } + } + return 0; + } + + void wakeup(int shard_id) { + std::lock_guard<std::mutex> lock(mutex); + auto iter = shard_crs.find(shard_id); + if (iter == shard_crs.end()) { + return; + } + iter->second.first->wakeup(); + } +}; + +void RGWRemoteMetaLog::init_sync_env(RGWMetaSyncEnv *env) { + env->dpp = dpp; + env->cct = store->ctx(); + env->store = store; + env->conn = conn; + env->async_rados = async_rados; + env->http_manager = &http_manager; + env->error_logger = error_logger; + env->sync_tracer = store->getRados()->get_sync_tracer(); +} + +int RGWRemoteMetaLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_meta_sync_status *sync_status) +{ + if (store->svc()->zone->is_meta_master()) { + return 0; + } + // cannot run concurrently with run_sync(), so run in a separate manager + RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry()); + RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); + int ret = http_manager.start(); + if (ret < 0) { + ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl; + return ret; + } + RGWMetaSyncEnv sync_env_local = sync_env; + sync_env_local.http_manager = &http_manager; + tn->log(20, "read sync status"); + ret = crs.run(dpp, new RGWReadSyncStatusCoroutine(&sync_env_local, sync_status)); + http_manager.stop(); + return ret; +} + +int RGWRemoteMetaLog::init_sync_status(const DoutPrefixProvider *dpp) +{ + if (store->svc()->zone->is_meta_master()) { + return 0; + } + + rgw_mdlog_info mdlog_info; + int r = read_log_info(dpp, &mdlog_info); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl; + return r; + } + + rgw_meta_sync_info sync_info; + sync_info.num_shards = mdlog_info.num_shards; + auto cursor = store->svc()->mdlog->get_period_history()->get_current(); + if (cursor) { + sync_info.period = cursor.get_period().get_id(); + sync_info.realm_epoch = cursor.get_epoch(); + } + + return run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_info)); +} + +int RGWRemoteMetaLog::store_sync_info(const DoutPrefixProvider *dpp, const rgw_meta_sync_info& sync_info) +{ + tn->log(20, "store sync info"); + return run(dpp, new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, async_rados, store->svc()->sysobj, + rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env.status_oid()), + sync_info)); +} + +// return a cursor to the period at our sync position +static RGWPeriodHistory::Cursor get_period_at(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore* store, + const rgw_meta_sync_info& info, + optional_yield y) +{ + if (info.period.empty()) { + 
// return an empty cursor with error=0 + return RGWPeriodHistory::Cursor{}; + } + + // look for an existing period in our history + auto cursor = store->svc()->mdlog->get_period_history()->lookup(info.realm_epoch); + if (cursor) { + // verify that the period ids match + auto& existing = cursor.get_period().get_id(); + if (existing != info.period) { + ldpp_dout(dpp, -1) << "ERROR: sync status period=" << info.period + << " does not match period=" << existing + << " in history at realm epoch=" << info.realm_epoch << dendl; + return RGWPeriodHistory::Cursor{-EEXIST}; + } + return cursor; + } + + // read the period from rados or pull it from the master + RGWPeriod period; + int r = store->svc()->mdlog->pull_period(dpp, info.period, period, y); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to read period id " + << info.period << ": " << cpp_strerror(r) << dendl; + return RGWPeriodHistory::Cursor{r}; + } + // attach the period to our history + cursor = store->svc()->mdlog->get_period_history()->attach(dpp, std::move(period), y); + if (!cursor) { + r = cursor.get_error(); + ldpp_dout(dpp, -1) << "ERROR: failed to read period history back to " + << info.period << ": " << cpp_strerror(r) << dendl; + } + return cursor; +} + +int RGWRemoteMetaLog::run_sync(const DoutPrefixProvider *dpp, optional_yield y) +{ + if (store->svc()->zone->is_meta_master()) { + return 0; + } + + int r = 0; + + // get shard count and oldest log period from master + rgw_mdlog_info mdlog_info; + for (;;) { + if (going_down) { + ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl; + return 0; + } + r = read_log_info(dpp, &mdlog_info); + if (r == -EIO || r == -ENOENT) { + // keep retrying if master isn't alive or hasn't initialized the log + ldpp_dout(dpp, 10) << __func__ << "(): waiting for master.." 
<< dendl; + backoff.backoff_sleep(); + continue; + } + backoff.reset(); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to fetch master log info (r=" << r << ")" << dendl; + return r; + } + break; + } + + rgw_meta_sync_status sync_status; + do { + if (going_down) { + ldpp_dout(dpp, 1) << __func__ << "(): going down" << dendl; + return 0; + } + r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status)); + if (r < 0 && r != -ENOENT) { + ldpp_dout(dpp, 0) << "ERROR: failed to fetch sync status r=" << r << dendl; + return r; + } + + if (!mdlog_info.period.empty()) { + // restart sync if the remote has a period, but: + // a) our status does not, or + // b) our sync period comes before the remote's oldest log period + if (sync_status.sync_info.period.empty() || + sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) { + sync_status.sync_info.state = rgw_meta_sync_info::StateInit; + string reason; + if (sync_status.sync_info.period.empty()) { + reason = "period is empty"; + } else { + reason = SSTR("sync_info realm epoch is behind: " << sync_status.sync_info.realm_epoch << " < " << mdlog_info.realm_epoch); + } + tn->log(1, "initialize sync (reason: " + reason + ")"); + ldpp_dout(dpp, 1) << "epoch=" << sync_status.sync_info.realm_epoch + << " in sync status comes before remote's oldest mdlog epoch=" + << mdlog_info.realm_epoch << ", restarting sync" << dendl; + } + } + + if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) { + ldpp_dout(dpp, 20) << __func__ << "(): init" << dendl; + sync_status.sync_info.num_shards = mdlog_info.num_shards; + auto cursor = store->svc()->mdlog->get_period_history()->get_current(); + if (cursor) { + // run full sync, then start incremental from the current period/epoch + sync_status.sync_info.period = cursor.get_period().get_id(); + sync_status.sync_info.realm_epoch = cursor.get_epoch(); + } + r = run(dpp, new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info)); + if (r == -EBUSY) { + backoff.backoff_sleep(); + continue; + } + backoff.reset(); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to init sync status r=" << r << dendl; + return r; + } + } + } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit); + + auto num_shards = sync_status.sync_info.num_shards; + if (num_shards != mdlog_info.num_shards) { + ldpp_dout(dpp, -1) << "ERROR: can't sync, num_shards mismatch: master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl; + return -EINVAL; + } + + RGWPeriodHistory::Cursor cursor; + do { + r = run(dpp, new RGWReadSyncStatusCoroutine(&sync_env, &sync_status)); + if (r < 0 && r != -ENOENT) { + tn->log(0, SSTR("ERROR: failed to fetch sync status r=" << r)); + return r; + } + + switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) { + case rgw_meta_sync_info::StateBuildingFullSyncMaps: + tn->log(20, "building full sync maps"); + r = run(dpp, new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers, tn)); + if (r == -EBUSY || r == -EIO) { + backoff.backoff_sleep(); + continue; + } + backoff.reset(); + if (r < 0) { + tn->log(0, SSTR("ERROR: failed to fetch all metadata keys (r=" << r << ")")); + return r; + } + + sync_status.sync_info.state = rgw_meta_sync_info::StateSync; + r = store_sync_info(dpp, sync_status.sync_info); + if (r < 0) { + tn->log(0, SSTR("ERROR: failed to update sync status (r=" << r << ")")); + return r; + } + /* fall through */ + case rgw_meta_sync_info::StateSync: + tn->log(20, "sync"); + // find our position in the 
period history (if any) + cursor = get_period_at(dpp, store, sync_status.sync_info, y); + r = cursor.get_error(); + if (r < 0) { + return r; + } + meta_sync_cr = new RGWMetaSyncCR(&sync_env, cursor, sync_status, tn); + r = run(dpp, meta_sync_cr); + if (r < 0) { + tn->log(0, "ERROR: metadata sync failed"); + return r; + } + break; + default: + tn->log(0, "ERROR: bad sync state!"); + return -EIO; + } + } while (!going_down); + + return 0; +} + +void RGWRemoteMetaLog::wakeup(int shard_id) +{ + if (!meta_sync_cr) { + return; + } + meta_sync_cr->wakeup(shard_id); +} + +int RGWCloneMetaLogCoroutine::operate(const DoutPrefixProvider *dpp) +{ + reenter(this) { + do { + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl; + return state_init(); + } + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl; + return state_read_shard_status(); + } + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl; + return state_read_shard_status_complete(); + } + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl; + return state_send_rest_request(dpp); + } + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl; + return state_receive_rest_response(); + } + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl; + return state_store_mdlog_entries(); + } + } while (truncated); + yield { + ldpp_dout(dpp, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl; + return state_store_mdlog_entries_complete(); + } + } + + return 0; +} + +int RGWCloneMetaLogCoroutine::state_init() +{ + data = rgw_mdlog_shard_data(); + + return 0; +} + +int RGWCloneMetaLogCoroutine::state_read_shard_status() +{ + const bool add_ref = false; // default constructs with refs=1 + + completion.reset(new RGWMetadataLogInfoCompletion( + [this](int ret, const cls_log_header& header) { + if (ret < 0) { + if (ret != -ENOENT) { + ldpp_dout(sync_env->dpp, 1) << "ERROR: failed to read mdlog info with " + << cpp_strerror(ret) << dendl; + } + } else { + shard_info.marker = header.max_marker; + shard_info.last_update = header.max_time.to_real_time(); + } + // wake up parent stack + io_complete(); + }), add_ref); + + int ret = mdlog->get_info_async(sync_env->dpp, shard_id, completion.get()); + if (ret < 0) { + ldpp_dout(sync_env->dpp, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl; + return set_cr_error(ret); + } + + return io_block(0); +} + +int RGWCloneMetaLogCoroutine::state_read_shard_status_complete() +{ + completion.reset(); + + ldpp_dout(sync_env->dpp, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl; + + marker = shard_info.marker; + + return 0; +} + +int RGWCloneMetaLogCoroutine::state_send_rest_request(const DoutPrefixProvider *dpp) +{ + RGWRESTConn *conn = sync_env->conn; + + char buf[32]; + snprintf(buf, sizeof(buf), "%d", shard_id); + + char max_entries_buf[32]; + snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries); + + const char *marker_key = (marker.empty() ? 
"" : "marker"); + + rgw_http_param_pair pairs[] = { { "type", "metadata" }, + { "id", buf }, + { "period", period.c_str() }, + { "max-entries", max_entries_buf }, + { marker_key, marker.c_str() }, + { NULL, NULL } }; + + http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager); + + init_new_io(http_op); + + int ret = http_op->aio_read(dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to fetch mdlog data" << dendl; + log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; + http_op->put(); + http_op = NULL; + return set_cr_error(ret); + } + + return io_block(0); +} + +int RGWCloneMetaLogCoroutine::state_receive_rest_response() +{ + int ret = http_op->wait(&data, null_yield); + if (ret < 0) { + error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl; + ldpp_dout(sync_env->dpp, 5) << "failed to wait for op, ret=" << ret << dendl; + http_op->put(); + http_op = NULL; + return set_cr_error(ret); + } + http_op->put(); + http_op = NULL; + + ldpp_dout(sync_env->dpp, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; + + truncated = ((int)data.entries.size() == max_entries); + + if (data.entries.empty()) { + if (new_marker) { + *new_marker = marker; + } + return set_cr_done(); + } + + if (new_marker) { + *new_marker = data.entries.back().id; + } + + return 0; +} + + +int RGWCloneMetaLogCoroutine::state_store_mdlog_entries() +{ + list<cls_log_entry> dest_entries; + + vector<rgw_mdlog_entry>::iterator iter; + for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) { + rgw_mdlog_entry& entry = *iter; + ldpp_dout(sync_env->dpp, 20) << "entry: name=" << entry.name << dendl; + + cls_log_entry dest_entry; + dest_entry.id = entry.id; + dest_entry.section = entry.section; + dest_entry.name = entry.name; + dest_entry.timestamp = utime_t(entry.timestamp); + + encode(entry.log_data, dest_entry.data); + + dest_entries.push_back(dest_entry); + + marker = entry.id; + } + + RGWAioCompletionNotifier *cn = stack->create_completion_notifier(); + + int ret = mdlog->store_entries_in_shard(sync_env->dpp, dest_entries, shard_id, cn->completion()); + if (ret < 0) { + cn->put(); + ldpp_dout(sync_env->dpp, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl; + return set_cr_error(ret); + } + return io_block(0); +} + +int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete() +{ + return set_cr_done(); +} |