diff options
Diffstat (limited to 'src/rgw/rgw_period_pusher.cc')
-rw-r--r-- | src/rgw/rgw_period_pusher.cc | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/src/rgw/rgw_period_pusher.cc b/src/rgw/rgw_period_pusher.cc new file mode 100644 index 00000000..e3db85df --- /dev/null +++ b/src/rgw/rgw_period_pusher.cc @@ -0,0 +1,307 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <map> +#include <thread> + +#include "rgw_period_pusher.h" +#include "rgw_cr_rest.h" +#include "rgw_zone.h" + +#include "services/svc_zone.h" + +#include "common/errno.h" + +#include <boost/asio/yield.hpp> + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "rgw period pusher: ") + +/// A coroutine to post the period over the given connection. +using PushCR = RGWPostRESTResourceCR<RGWPeriod, int>; + +/// A coroutine that calls PushCR, and retries with backoff until success. +class PushAndRetryCR : public RGWCoroutine { + const std::string& zone; + RGWRESTConn *const conn; + RGWHTTPManager *const http; + RGWPeriod& period; + const std::string epoch; //< epoch string for params + double timeout; //< current interval between retries + const double timeout_max; //< maximum interval between retries + uint32_t counter; //< number of failures since backoff increased + + public: + PushAndRetryCR(CephContext* cct, const std::string& zone, RGWRESTConn* conn, + RGWHTTPManager* http, RGWPeriod& period) + : RGWCoroutine(cct), zone(zone), conn(conn), http(http), period(period), + epoch(std::to_string(period.get_epoch())), + timeout(cct->_conf->rgw_period_push_interval), + timeout_max(cct->_conf->rgw_period_push_interval_max), + counter(0) + {} + + int operate() override; +}; + +int PushAndRetryCR::operate() +{ + reenter(this) { + for (;;) { + yield { + ldout(cct, 10) << "pushing period " << period.get_id() + << " to " << zone << dendl; + // initialize the http params + rgw_http_param_pair params[] = { + { "period", period.get_id().c_str() }, + { "epoch", epoch.c_str() }, + { nullptr, nullptr } + }; + call(new PushCR(cct, conn, http, "/admin/realm/period", + params, period, nullptr)); + } + + // stop on success + if (get_ret_status() == 0) { + ldout(cct, 10) << "push to " << zone << " succeeded" << dendl; + return set_cr_done(); + } + + // try each endpoint in the connection before waiting + if (++counter < conn->get_endpoint_count()) + continue; + counter = 0; + + // wait with exponential backoff up to timeout_max + yield { + utime_t dur; + dur.set_from_double(timeout); + + ldout(cct, 10) << "waiting " << dur << "s for retry.." << dendl; + wait(dur); + + timeout *= 2; + if (timeout > timeout_max) + timeout = timeout_max; + } + } + } + return 0; +} + +/** + * PushAllCR is a coroutine that sends the period over all of the given + * connections, retrying until they are all marked as completed. + */ +class PushAllCR : public RGWCoroutine { + RGWHTTPManager *const http; + RGWPeriod period; //< period object to push + std::map<std::string, RGWRESTConn> conns; //< zones that need the period + + public: + PushAllCR(CephContext* cct, RGWHTTPManager* http, RGWPeriod&& period, + std::map<std::string, RGWRESTConn>&& conns) + : RGWCoroutine(cct), http(http), + period(std::move(period)), + conns(std::move(conns)) + {} + + int operate() override; +}; + +int PushAllCR::operate() +{ + reenter(this) { + // spawn a coroutine to push the period over each connection + yield { + ldout(cct, 4) << "sending " << conns.size() << " periods" << dendl; + for (auto& c : conns) + spawn(new PushAndRetryCR(cct, c.first, &c.second, http, period), false); + } + // wait for all to complete + drain_all(); + return set_cr_done(); + } + return 0; +} + +/// A background thread to run the PushAllCR coroutine and exit. +class RGWPeriodPusher::CRThread { + RGWCoroutinesManager coroutines; + RGWHTTPManager http; + boost::intrusive_ptr<PushAllCR> push_all; + std::thread thread; + + public: + CRThread(CephContext* cct, RGWPeriod&& period, + std::map<std::string, RGWRESTConn>&& conns) + : coroutines(cct, NULL), + http(cct, coroutines.get_completion_mgr()), + push_all(new PushAllCR(cct, &http, std::move(period), std::move(conns))) + { + http.start(); + // must spawn the CR thread after start + thread = std::thread([this] { coroutines.run(push_all.get()); }); + } + ~CRThread() + { + push_all.reset(); + coroutines.stop(); + http.stop(); + if (thread.joinable()) + thread.join(); + } +}; + + +RGWPeriodPusher::RGWPeriodPusher(RGWRados* store) + : cct(store->ctx()), store(store) +{ + const auto& realm = store->svc.zone->get_realm(); + auto& realm_id = realm.get_id(); + if (realm_id.empty()) // no realm configuration + return; + + // always send out the current period on startup + RGWPeriod period; + int r = period.init(cct, store->svc.sysobj, realm_id, realm.get_name()); + if (r < 0) { + lderr(cct) << "failed to load period for realm " << realm_id << dendl; + return; + } + + std::lock_guard<std::mutex> lock(mutex); + handle_notify(std::move(period)); +} + +// destructor is here because CRThread is incomplete in the header +RGWPeriodPusher::~RGWPeriodPusher() = default; + +void RGWPeriodPusher::handle_notify(RGWRealmNotify type, + bufferlist::const_iterator& p) +{ + // decode the period + RGWZonesNeedPeriod info; + try { + decode(info, p); + } catch (buffer::error& e) { + lderr(cct) << "Failed to decode the period: " << e.what() << dendl; + return; + } + + std::lock_guard<std::mutex> lock(mutex); + + // we can't process this notification without access to our current realm + // configuration. queue it until resume() + if (store == nullptr) { + pending_periods.emplace_back(std::move(info)); + return; + } + + handle_notify(std::move(info)); +} + +// expects the caller to hold a lock on mutex +void RGWPeriodPusher::handle_notify(RGWZonesNeedPeriod&& period) +{ + if (period.get_realm_epoch() < realm_epoch) { + ldout(cct, 10) << "period's realm epoch " << period.get_realm_epoch() + << " is not newer than current realm epoch " << realm_epoch + << ", discarding update" << dendl; + return; + } + if (period.get_realm_epoch() == realm_epoch && + period.get_epoch() <= period_epoch) { + ldout(cct, 10) << "period epoch " << period.get_epoch() << " is not newer " + "than current epoch " << period_epoch << ", discarding update" << dendl; + return; + } + + // find our zonegroup in the new period + auto& zonegroups = period.get_map().zonegroups; + auto i = zonegroups.find(store->svc.zone->get_zonegroup().get_id()); + if (i == zonegroups.end()) { + lderr(cct) << "The new period does not contain my zonegroup!" << dendl; + return; + } + auto& my_zonegroup = i->second; + + // if we're not a master zone, we're not responsible for pushing any updates + if (my_zonegroup.master_zone != store->svc.zone->get_zone_params().get_id()) + return; + + // construct a map of the zones that need this period. the map uses the same + // keys/ordering as the zone[group] map, so we can use a hint for insertions + std::map<std::string, RGWRESTConn> conns; + auto hint = conns.end(); + + // are we the master zonegroup in this period? + if (period.get_map().master_zonegroup == store->svc.zone->get_zonegroup().get_id()) { + // update other zonegroup endpoints + for (auto& zg : zonegroups) { + auto& zonegroup = zg.second; + if (zonegroup.get_id() == store->svc.zone->get_zonegroup().get_id()) + continue; + if (zonegroup.endpoints.empty()) + continue; + + hint = conns.emplace_hint( + hint, std::piecewise_construct, + std::forward_as_tuple(zonegroup.get_id()), + std::forward_as_tuple(cct, store->svc.zone, zonegroup.get_id(), zonegroup.endpoints)); + } + } + + // update other zone endpoints + for (auto& z : my_zonegroup.zones) { + auto& zone = z.second; + if (zone.id == store->svc.zone->get_zone_params().get_id()) + continue; + if (zone.endpoints.empty()) + continue; + + hint = conns.emplace_hint( + hint, std::piecewise_construct, + std::forward_as_tuple(zone.id), + std::forward_as_tuple(cct, store->svc.zone, zone.id, zone.endpoints)); + } + + if (conns.empty()) { + ldout(cct, 4) << "No zones to update" << dendl; + return; + } + + realm_epoch = period.get_realm_epoch(); + period_epoch = period.get_epoch(); + + ldout(cct, 4) << "Zone master pushing period " << period.get_id() + << " epoch " << period_epoch << " to " + << conns.size() << " other zones" << dendl; + + // spawn a new coroutine thread, destroying the previous one + cr_thread.reset(new CRThread(cct, std::move(period), std::move(conns))); +} + +void RGWPeriodPusher::pause() +{ + ldout(cct, 4) << "paused for realm update" << dendl; + std::lock_guard<std::mutex> lock(mutex); + store = nullptr; +} + +void RGWPeriodPusher::resume(RGWRados* store) +{ + std::lock_guard<std::mutex> lock(mutex); + this->store = store; + + ldout(cct, 4) << "resume with " << pending_periods.size() + << " periods pending" << dendl; + + // process notification queue + for (auto& info : pending_periods) { + handle_notify(std::move(info)); + } + pending_periods.clear(); +} |