diff options
Diffstat (limited to '')
-rw-r--r-- | src/rgw/services/svc_notify.cc | 515 |
1 files changed, 515 insertions, 0 deletions
diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc new file mode 100644 index 000000000..43f84ed0a --- /dev/null +++ b/src/rgw/services/svc_notify.cc @@ -0,0 +1,515 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "include/random.h" +#include "include/Context.h" +#include "common/errno.h" + +#include "rgw_cache.h" +#include "svc_notify.h" +#include "svc_finisher.h" +#include "svc_zone.h" +#include "svc_rados.h" + +#include "rgw_zone.h" + +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +static string notify_oid_prefix = "notify"; + +RGWSI_Notify::~RGWSI_Notify() +{ + shutdown(); +} + + +class RGWWatcher : public DoutPrefixProvider , public librados::WatchCtx2 { + CephContext *cct; + RGWSI_Notify *svc; + int index; + RGWSI_RADOS::Obj obj; + uint64_t watch_handle; + int register_ret{0}; + bool unregister_done{false}; + librados::AioCompletion *register_completion{nullptr}; + + class C_ReinitWatch : public Context { + RGWWatcher *watcher; + public: + explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {} + void finish(int r) override { + watcher->reinit(); + } + }; + + CephContext *get_cct() const override { return cct; } + unsigned get_subsys() const override { return dout_subsys; } + std::ostream& gen_prefix(std::ostream& out) const override { + return out << "rgw watcher librados: "; + } + +public: + RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {} + void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) override { + ldpp_dout(this, 10) << "RGWWatcher::handle_notify() " + << " notify_id " << notify_id + << " cookie " << cookie + << " notifier " << notifier_id + << " bl.length()=" << bl.length() << dendl; + + if (unlikely(svc->inject_notify_timeout_probability == 1) || + (svc->inject_notify_timeout_probability > 0 && + (svc->inject_notify_timeout_probability > + ceph::util::generate_random_number(0.0, 1.0)))) { + ldpp_dout(this, 0) + << "RGWWatcher::handle_notify() dropping notification! " + << "If this isn't what you want, set " + << "rgw_inject_notify_timeout_probability to zero!" << dendl; + return; + } + + svc->watch_cb(this, notify_id, cookie, notifier_id, bl); + + bufferlist reply_bl; // empty reply payload + obj.notify_ack(notify_id, cookie, reply_bl); + } + void handle_error(uint64_t cookie, int err) override { + ldpp_dout(this, -1) << "RGWWatcher::handle_error cookie " << cookie + << " err " << cpp_strerror(err) << dendl; + svc->remove_watcher(index); + svc->schedule_context(new C_ReinitWatch(this)); + } + + void reinit() { + if(!unregister_done) { + int ret = unregister_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; + } + } + int ret = register_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl; + svc->schedule_context(new C_ReinitWatch(this)); + return; + } + } + + int unregister_watch() { + int r = svc->unwatch(obj, watch_handle); + unregister_done = true; + if (r < 0) { + return r; + } + svc->remove_watcher(index); + return 0; + } + + int register_watch_async() { + if (register_completion) { + register_completion->release(); + register_completion = nullptr; + } + register_completion = librados::Rados::aio_create_completion(nullptr, nullptr); + register_ret = obj.aio_watch(register_completion, &watch_handle, this); + if (register_ret < 0) { + register_completion->release(); + return register_ret; + } + return 0; + } + + int register_watch_finish() { + if (register_ret < 0) { + return register_ret; + } + if (!register_completion) { + return -EINVAL; + } + register_completion->wait_for_complete(); + int r = register_completion->get_return_value(); + register_completion->release(); + register_completion = nullptr; + if (r < 0) { + return r; + } + svc->add_watcher(index); + unregister_done = false; + return 0; + } + + int register_watch() { + int r = obj.watch(&watch_handle, this); + if (r < 0) { + return r; + } + svc->add_watcher(index); + unregister_done = false; + return 0; + } +}; + + +class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB +{ + RGWSI_Notify *svc; +public: + RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {} + void call() override { + svc->shutdown(); + } +}; + +string RGWSI_Notify::get_control_oid(int i) +{ + char buf[notify_oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i); + + return string(buf); +} + +// do not call pick_obj_control before init_watch +RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key) +{ + uint32_t r = ceph_str_hash_linux(key.c_str(), key.size()); + + int i = r % num_watchers; + return notify_objs[i]; +} + +int RGWSI_Notify::init_watch(const DoutPrefixProvider *dpp, optional_yield y) +{ + num_watchers = cct->_conf->rgw_num_control_oids; + + bool compat_oid = (num_watchers == 0); + + if (num_watchers <= 0) + num_watchers = 1; + + watchers = new RGWWatcher *[num_watchers]; + + int error = 0; + + notify_objs.resize(num_watchers); + + for (int i=0; i < num_watchers; i++) { + string notify_oid; + + if (!compat_oid) { + notify_oid = get_control_oid(i); + } else { + notify_oid = notify_oid_prefix; + } + + notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid}); + auto& notify_obj = notify_objs[i]; + + int r = notify_obj.open(dpp); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + op.create(false); + r = notify_obj.operate(dpp, &op, y); + if (r < 0 && r != -EEXIST) { + ldpp_dout(dpp, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl; + return r; + } + + RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj); + watchers[i] = watcher; + + r = watcher->register_watch_async(); + if (r < 0) { + ldpp_dout(dpp, 0) << "WARNING: register_watch_aio() returned " << r << dendl; + error = r; + continue; + } + } + + for (int i = 0; i < num_watchers; ++i) { + int r = watchers[i]->register_watch_finish(); + if (r < 0) { + ldpp_dout(dpp, 0) << "WARNING: async watch returned " << r << dendl; + error = r; + } + } + + if (error < 0) { + return error; + } + + return 0; +} + +void RGWSI_Notify::finalize_watch() +{ + for (int i = 0; i < num_watchers; i++) { + RGWWatcher *watcher = watchers[i]; + if (watchers_set.find(i) != watchers_set.end()) + watcher->unregister_watch(); + delete watcher; + } + + delete[] watchers; +} + +int RGWSI_Notify::do_start(optional_yield y, const DoutPrefixProvider *dpp) +{ + int r = zone_svc->start(y, dpp); + if (r < 0) { + return r; + } + + assert(zone_svc->is_started()); /* otherwise there's an ordering problem */ + + r = rados_svc->start(y, dpp); + if (r < 0) { + return r; + } + r = finisher_svc->start(y, dpp); + if (r < 0) { + return r; + } + + inject_notify_timeout_probability = + cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability"); + max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries"); + + control_pool = zone_svc->get_zone_params().control_pool; + + int ret = init_watch(dpp, y); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl; + return ret; + } + + shutdown_cb = new RGWSI_Notify_ShutdownCB(this); + int handle; + finisher_svc->register_caller(shutdown_cb, &handle); + finisher_handle = handle; + + return 0; +} + +void RGWSI_Notify::shutdown() +{ + if (finalized) { + return; + } + + if (finisher_handle) { + finisher_svc->unregister_caller(*finisher_handle); + } + finalize_watch(); + + delete shutdown_cb; + + finalized = true; +} + +int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) +{ + int r = obj.unwatch(watch_handle); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; + return r; + } + r = rados_svc->handle().watch_flush(); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl; + return r; + } + return 0; +} + +void RGWSI_Notify::add_watcher(int i) +{ + ldout(cct, 20) << "add_watcher() i=" << i << dendl; + std::unique_lock l{watchers_lock}; + watchers_set.insert(i); + if (watchers_set.size() == (size_t)num_watchers) { + ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; + _set_enabled(true); + } +} + +void RGWSI_Notify::remove_watcher(int i) +{ + ldout(cct, 20) << "remove_watcher() i=" << i << dendl; + std::unique_lock l{watchers_lock}; + size_t orig_size = watchers_set.size(); + watchers_set.erase(i); + if (orig_size == (size_t)num_watchers && + watchers_set.size() < orig_size) { /* actually removed */ + ldout(cct, 2) << "removed watcher, disabling cache" << dendl; + _set_enabled(false); + } +} + +int RGWSI_Notify::watch_cb(const DoutPrefixProvider *dpp, + uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + std::shared_lock l{watchers_lock}; + if (cb) { + return cb->watch_cb(dpp, notify_id, cookie, notifier_id, bl); + } + return 0; +} + +void RGWSI_Notify::set_enabled(bool status) +{ + std::unique_lock l{watchers_lock}; + _set_enabled(status); +} + +void RGWSI_Notify::_set_enabled(bool status) +{ + enabled = status; + if (cb) { + cb->set_enabled(status); + } +} + +int RGWSI_Notify::distribute(const DoutPrefixProvider *dpp, const string& key, + const RGWCacheNotifyInfo& cni, + optional_yield y) +{ + /* The RGW uses the control pool to store the watch notify objects. + The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch(). + The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object. + In that case RGW will try to distribute the cache before it ran init_watch, + which will lead to division by 0 in pick_obj_control (num_watchers is 0). + */ + if (num_watchers > 0) { + RGWSI_RADOS::Obj notify_obj = pick_control_obj(key); + + ldpp_dout(dpp, 10) << "distributing notification oid=" << notify_obj.get_ref().obj + << " cni=" << cni << dendl; + return robust_notify(dpp, notify_obj, cni, y); + } + return 0; +} + +namespace librados { + +static std::ostream& operator<<(std::ostream& out, const notify_timeout_t& t) +{ + return out << t.notifier_id << ':' << t.cookie; +} + +} // namespace librados + +using timeout_vector = std::vector<librados::notify_timeout_t>; + +static timeout_vector decode_timeouts(const bufferlist& bl) +{ + using ceph::decode; + auto p = bl.begin(); + + // decode and discard the acks + uint32_t num_acks; + decode(num_acks, p); + for (auto i = 0u; i < num_acks; ++i) { + std::pair<uint64_t, uint64_t> id; + decode(id, p); + // discard the payload + uint32_t blen; + decode(blen, p); + p += blen; + } + + // decode and return the timeouts + uint32_t num_timeouts; + decode(num_timeouts, p); + + timeout_vector timeouts; + for (auto i = 0u; i < num_timeouts; ++i) { + std::pair<uint64_t, uint64_t> id; + decode(id, p); + timeouts.push_back({id.first, id.second}); + } + return timeouts; +} + +int RGWSI_Notify::robust_notify(const DoutPrefixProvider *dpp, + RGWSI_RADOS::Obj& notify_obj, + const RGWCacheNotifyInfo& cni, + optional_yield y) +{ + bufferlist bl, rbl; + encode(cni, bl); + + // First, try to send, without being fancy about it. + auto r = notify_obj.notify(dpp, bl, 0, &rbl, y); + + if (r < 0) { + timeout_vector timeouts; + try { + timeouts = decode_timeouts(rbl); + } catch (const buffer::error& e) { + ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: " + << e.what() << dendl; + } + + ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " Watchers " << timeouts << " did not respond." + << " Notify failed on object " << cni.obj << ": " + << cpp_strerror(-r) << dendl; + } + + // If we timed out, get serious. + if (r == -ETIMEDOUT) { + RGWCacheNotifyInfo info; + info.op = INVALIDATE_OBJ; + info.obj = cni.obj; + bufferlist retrybl; + encode(info, retrybl); + + for (auto tries = 0u; + r == -ETIMEDOUT && tries < max_notify_retries; + ++tries) { + ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " Invalidating obj=" << info.obj << " tries=" + << tries << dendl; + r = notify_obj.notify(dpp, retrybl, 0, &rbl, y); + if (r < 0) { + timeout_vector timeouts; + try { + timeouts = decode_timeouts(rbl); + } catch (const buffer::error& e) { + ldpp_dout(dpp, 0) << "robust_notify failed to decode notify response: " + << e.what() << dendl; + } + + ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " Watchers " << timeouts << " did not respond." + << " Invalidation attempt " << tries << " failed: " + << cpp_strerror(-r) << dendl; + } + } + } + return r; +} + +void RGWSI_Notify::register_watch_cb(CB *_cb) +{ + std::unique_lock l{watchers_lock}; + cb = _cb; + _set_enabled(enabled); +} + +void RGWSI_Notify::schedule_context(Context *c) +{ + finisher_svc->schedule_context(c); +} |