summaryrefslogtreecommitdiffstats
path: root/src/rgw/services/svc_notify.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rgw/services/svc_notify.cc
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/services/svc_notify.cc')
-rw-r--r--src/rgw/services/svc_notify.cc484
1 files changed, 484 insertions, 0 deletions
diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc
new file mode 100644
index 00000000..9ee7f295
--- /dev/null
+++ b/src/rgw/services/svc_notify.cc
@@ -0,0 +1,484 @@
+#include "include/random.h"
+#include "common/errno.h"
+
+#include "svc_notify.h"
+#include "svc_finisher.h"
+#include "svc_zone.h"
+#include "svc_rados.h"
+
+#include "rgw/rgw_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static string notify_oid_prefix = "notify";
+
+class RGWWatcher : public librados::WatchCtx2 {
+ CephContext *cct;
+ RGWSI_Notify *svc;
+ int index;
+ RGWSI_RADOS::Obj obj;
+ uint64_t watch_handle;
+ int register_ret{0};
+ librados::AioCompletion *register_completion{nullptr};
+
+ class C_ReinitWatch : public Context {
+ RGWWatcher *watcher;
+ public:
+ explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
+ void finish(int r) override {
+ watcher->reinit();
+ }
+ };
+public:
+ RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
+ void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) override {
+ ldout(cct, 10) << "RGWWatcher::handle_notify() "
+ << " notify_id " << notify_id
+ << " cookie " << cookie
+ << " notifier " << notifier_id
+ << " bl.length()=" << bl.length() << dendl;
+
+ if (unlikely(svc->inject_notify_timeout_probability == 1) ||
+ (svc->inject_notify_timeout_probability > 0 &&
+ (svc->inject_notify_timeout_probability >
+ ceph::util::generate_random_number(0.0, 1.0)))) {
+ ldout(cct, 0)
+ << "RGWWatcher::handle_notify() dropping notification! "
+ << "If this isn't what you want, set "
+ << "rgw_inject_notify_timeout_probability to zero!" << dendl;
+ return;
+ }
+
+ svc->watch_cb(notify_id, cookie, notifier_id, bl);
+
+ bufferlist reply_bl; // empty reply payload
+ obj.notify_ack(notify_id, cookie, reply_bl);
+ }
+ void handle_error(uint64_t cookie, int err) override {
+ lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
+ << " err " << cpp_strerror(err) << dendl;
+ svc->remove_watcher(index);
+ svc->schedule_context(new C_ReinitWatch(this));
+ }
+
+ void reinit() {
+ int ret = unregister_watch();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
+ return;
+ }
+ ret = register_watch();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
+ return;
+ }
+ }
+
+ int unregister_watch() {
+ int r = svc->unwatch(obj, watch_handle);
+ if (r < 0) {
+ return r;
+ }
+ svc->remove_watcher(index);
+ return 0;
+ }
+
+ int register_watch_async() {
+ if (register_completion) {
+ register_completion->release();
+ register_completion = nullptr;
+ }
+ register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+ register_ret = obj.aio_watch(register_completion, &watch_handle, this);
+ if (register_ret < 0) {
+ register_completion->release();
+ return register_ret;
+ }
+ return 0;
+ }
+
+ int register_watch_finish() {
+ if (register_ret < 0) {
+ return register_ret;
+ }
+ if (!register_completion) {
+ return -EINVAL;
+ }
+ register_completion->wait_for_safe();
+ int r = register_completion->get_return_value();
+ register_completion->release();
+ register_completion = nullptr;
+ if (r < 0) {
+ return r;
+ }
+ svc->add_watcher(index);
+ return 0;
+ }
+
+ int register_watch() {
+ int r = obj.watch(&watch_handle, this);
+ if (r < 0) {
+ return r;
+ }
+ svc->add_watcher(index);
+ return 0;
+ }
+};
+
+
+class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB
+{
+ RGWSI_Notify *svc;
+public:
+ RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {}
+ void call() override {
+ svc->shutdown();
+ }
+};
+
+string RGWSI_Notify::get_control_oid(int i)
+{
+ char buf[notify_oid_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
+
+ return string(buf);
+}
+
+// do not call pick_obj_control before init_watch
+RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
+{
+ uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
+
+ int i = r % num_watchers;
+ return notify_objs[i];
+}
+
+int RGWSI_Notify::init_watch()
+{
+ num_watchers = cct->_conf->rgw_num_control_oids;
+
+ bool compat_oid = (num_watchers == 0);
+
+ if (num_watchers <= 0)
+ num_watchers = 1;
+
+ watchers = new RGWWatcher *[num_watchers];
+
+ int error = 0;
+
+ notify_objs.resize(num_watchers);
+
+ for (int i=0; i < num_watchers; i++) {
+ string notify_oid;
+
+ if (!compat_oid) {
+ notify_oid = get_control_oid(i);
+ } else {
+ notify_oid = notify_oid_prefix;
+ }
+
+ notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid});
+ auto& notify_obj = notify_objs[i];
+
+ int r = notify_obj.open();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl;
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ op.create(false);
+ r = notify_obj.operate(&op, null_yield);
+ if (r < 0 && r != -EEXIST) {
+ ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl;
+ return r;
+ }
+
+ RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
+ watchers[i] = watcher;
+
+ r = watcher->register_watch_async();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
+ error = r;
+ continue;
+ }
+ }
+
+ for (int i = 0; i < num_watchers; ++i) {
+ int r = watchers[i]->register_watch_finish();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
+ error = r;
+ }
+ }
+
+ if (error < 0) {
+ return error;
+ }
+
+ return 0;
+}
+
+void RGWSI_Notify::finalize_watch()
+{
+ for (int i = 0; i < num_watchers; i++) {
+ RGWWatcher *watcher = watchers[i];
+ watcher->unregister_watch();
+ delete watcher;
+ }
+
+ delete[] watchers;
+}
+
+int RGWSI_Notify::do_start()
+{
+ int r = zone_svc->start();
+ if (r < 0) {
+ return r;
+ }
+
+ assert(zone_svc->is_started()); /* otherwise there's an ordering problem */
+
+ r = rados_svc->start();
+ if (r < 0) {
+ return r;
+ }
+ r = finisher_svc->start();
+ if (r < 0) {
+ return r;
+ }
+
+ control_pool = zone_svc->get_zone_params().control_pool;
+
+ int ret = init_watch();
+ if (ret < 0) {
+ lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ shutdown_cb = new RGWSI_Notify_ShutdownCB(this);
+ int handle;
+ finisher_svc->register_caller(shutdown_cb, &handle);
+ finisher_handle = handle;
+
+ return 0;
+}
+
+void RGWSI_Notify::shutdown()
+{
+ if (finalized) {
+ return;
+ }
+
+ if (finisher_handle) {
+ finisher_svc->unregister_caller(*finisher_handle);
+ }
+ finalize_watch();
+
+ delete shutdown_cb;
+
+ finalized = true;
+}
+
+RGWSI_Notify::~RGWSI_Notify()
+{
+ shutdown();
+}
+
+int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
+{
+ int r = obj.unwatch(watch_handle);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
+ return r;
+ }
+ r = rados_svc->handle().watch_flush();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
+ return r;
+ }
+ return 0;
+}
+
+void RGWSI_Notify::add_watcher(int i)
+{
+ ldout(cct, 20) << "add_watcher() i=" << i << dendl;
+ RWLock::WLocker l(watchers_lock);
+ watchers_set.insert(i);
+ if (watchers_set.size() == (size_t)num_watchers) {
+ ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
+ _set_enabled(true);
+ }
+}
+
+void RGWSI_Notify::remove_watcher(int i)
+{
+ ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
+ RWLock::WLocker l(watchers_lock);
+ size_t orig_size = watchers_set.size();
+ watchers_set.erase(i);
+ if (orig_size == (size_t)num_watchers &&
+ watchers_set.size() < orig_size) { /* actually removed */
+ ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
+ _set_enabled(false);
+ }
+}
+
+int RGWSI_Notify::watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl)
+{
+ RWLock::RLocker l(watchers_lock);
+ if (cb) {
+ return cb->watch_cb(notify_id, cookie, notifier_id, bl);
+ }
+ return 0;
+}
+
+void RGWSI_Notify::set_enabled(bool status)
+{
+ RWLock::WLocker l(watchers_lock);
+ _set_enabled(status);
+}
+
+void RGWSI_Notify::_set_enabled(bool status)
+{
+ enabled = status;
+ if (cb) {
+ cb->set_enabled(status);
+ }
+}
+
+int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
+{
+ /* The RGW uses the control pool to store the watch notify objects.
+ The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch().
+ The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object.
+ In that case RGW will try to distribute the cache before it ran init_watch,
+ which will lead to division by 0 in pick_obj_control (num_watchers is 0).
+ */
+ if (num_watchers > 0) {
+ RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
+
+ ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
+ << " bl.length()=" << bl.length() << dendl;
+ return robust_notify(notify_obj, bl);
+ }
+ return 0;
+}
+
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
+{
+ // The reply of every machine that acks goes in here.
+ boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
+ bufferlist rbl;
+
+ // First, try to send, without being fancy about it.
+ auto r = notify_obj.notify(bl, 0, &rbl);
+
+ // If that doesn't work, get serious.
+ if (r < 0) {
+ ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+ << cpp_strerror(-r) << dendl;
+
+
+ auto p = rbl.cbegin();
+ // Gather up the replies to the first attempt.
+ try {
+ uint32_t num_acks;
+ decode(num_acks, p);
+ // Doing this ourselves since we don't care about the payload;
+ for (auto i = 0u; i < num_acks; ++i) {
+ std::pair<uint64_t, uint64_t> id;
+ decode(id, p);
+ acks.insert(id);
+ ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+ uint32_t blen;
+ decode(blen, p);
+ p.advance(blen);
+ }
+ } catch (const buffer::error& e) {
+ ldout(cct, 0) << "robust_notify: notify response parse failed: "
+ << e.what() << dendl;
+ acks.clear(); // Throw away junk on failed parse.
+ }
+
+
+ // Every machine that fails to reply and hasn't acked a previous
+ // attempt goes in here.
+ boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
+
+ auto tries = 1u;
+ while (r < 0 && tries < max_notify_retries) {
+ ++tries;
+ rbl.clear();
+ // Reset the timeouts, we're only concerned with new ones.
+ timeouts.clear();
+ r = notify_obj.notify(bl, 0, &rbl);
+ if (r < 0) {
+ ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+ << cpp_strerror(-r) << dendl;
+ p = rbl.begin();
+ try {
+ uint32_t num_acks;
+ decode(num_acks, p);
+ // Not only do we not care about the payload, but we don't
+ // want to empty the container; we just want to augment it
+ // with any new members.
+ for (auto i = 0u; i < num_acks; ++i) {
+ std::pair<uint64_t, uint64_t> id;
+ decode(id, p);
+ auto ir = acks.insert(id);
+ if (ir.second) {
+ ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+ }
+ uint32_t blen;
+ decode(blen, p);
+ p.advance(blen);
+ }
+
+ uint32_t num_timeouts;
+ decode(num_timeouts, p);
+ for (auto i = 0u; i < num_timeouts; ++i) {
+ std::pair<uint64_t, uint64_t> id;
+ decode(id, p);
+ // Only track timeouts from hosts that haven't acked previously.
+ if (acks.find(id) != acks.cend()) {
+ ldout(cct, 20) << "robust_notify: " << id << " timed out."
+ << dendl;
+ timeouts.insert(id);
+ }
+ }
+ } catch (const buffer::error& e) {
+ ldout(cct, 0) << "robust_notify: notify response parse failed: "
+ << e.what() << dendl;
+ continue;
+ }
+ // If we got a good parse and timeouts is empty, that means
+ // everyone who timed out in one call received the update in a
+ // previous one.
+ if (timeouts.empty()) {
+ r = 0;
+ }
+ }
+ }
+ }
+ return r;
+}
+
+void RGWSI_Notify::register_watch_cb(CB *_cb)
+{
+ RWLock::WLocker l(watchers_lock);
+ cb = _cb;
+ _set_enabled(enabled);
+}
+
+void RGWSI_Notify::schedule_context(Context *c)
+{
+ finisher_svc->schedule_context(c);
+}