diff options
Diffstat (limited to 'src/rgw/services')
-rw-r--r-- | src/rgw/services/svc_finisher.cc | 53 | ||||
-rw-r--r-- | src/rgw/services/svc_finisher.h | 45 | ||||
-rw-r--r-- | src/rgw/services/svc_notify.cc | 484 | ||||
-rw-r--r-- | src/rgw/services/svc_notify.h | 100 | ||||
-rw-r--r-- | src/rgw/services/svc_quota.cc | 15 | ||||
-rw-r--r-- | src/rgw/services/svc_quota.h | 23 | ||||
-rw-r--r-- | src/rgw/services/svc_rados.cc | 308 | ||||
-rw-r--r-- | src/rgw/services/svc_rados.h | 178 | ||||
-rw-r--r-- | src/rgw/services/svc_sync_modules.cc | 15 | ||||
-rw-r--r-- | src/rgw/services/svc_sync_modules.h | 26 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj.cc | 192 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj.h | 275 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj_cache.cc | 506 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj_cache.h | 176 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj_core.cc | 595 | ||||
-rw-r--r-- | src/rgw/services/svc_sys_obj_core.h | 201 | ||||
-rw-r--r-- | src/rgw/services/svc_zone.cc | 1250 | ||||
-rw-r--r-- | src/rgw/services/svc_zone.h | 134 | ||||
-rw-r--r-- | src/rgw/services/svc_zone_utils.cc | 59 | ||||
-rw-r--r-- | src/rgw/services/svc_zone_utils.h | 39 |
20 files changed, 4674 insertions, 0 deletions
diff --git a/src/rgw/services/svc_finisher.cc b/src/rgw/services/svc_finisher.cc new file mode 100644 index 00000000..d239ff3c --- /dev/null +++ b/src/rgw/services/svc_finisher.cc @@ -0,0 +1,53 @@ +#include "common/Finisher.h" + +#include "svc_finisher.h" + +int RGWSI_Finisher::do_start() +{ + finisher = new Finisher(cct); + finisher->start(); + + return 0; +} + +void RGWSI_Finisher::shutdown() +{ + if (finalized) { + return; + } + + if (finisher) { + finisher->stop(); + + map<int, ShutdownCB *> cbs; + cbs.swap(shutdown_cbs); /* move cbs out, in case caller unregisetrs */ + for (auto& iter : cbs) { + iter.second->call(); + } + delete finisher; + } + + finalized = true; +} + +RGWSI_Finisher::~RGWSI_Finisher() +{ + shutdown(); +} + +void RGWSI_Finisher::register_caller(ShutdownCB *cb, int *phandle) +{ + *phandle = ++handles_counter; + shutdown_cbs[*phandle] = cb; +} + +void RGWSI_Finisher::unregister_caller(int handle) +{ + shutdown_cbs.erase(handle); +} + +void RGWSI_Finisher::schedule_context(Context *c) +{ + finisher->queue(c); +} + diff --git a/src/rgw/services/svc_finisher.h b/src/rgw/services/svc_finisher.h new file mode 100644 index 00000000..116fd8fd --- /dev/null +++ b/src/rgw/services/svc_finisher.h @@ -0,0 +1,45 @@ +#ifndef CEPH_RGW_SERVICES_FINISHER_H +#define CEPH_RGW_SERVICES_FINISHER_H + + +#include "rgw/rgw_service.h" + +class Context; +class Finisher; + +class RGWSI_Finisher : public RGWServiceInstance +{ + friend struct RGWServices_Def; +public: + class ShutdownCB; + +private: + Finisher *finisher{nullptr}; + bool finalized{false}; + + void shutdown() override; + + std::map<int, ShutdownCB *> shutdown_cbs; + std::atomic<int> handles_counter{0}; + +protected: + void init() {} + int do_start() override; + +public: + RGWSI_Finisher(CephContext *cct): RGWServiceInstance(cct) {} + ~RGWSI_Finisher(); + + class ShutdownCB { + public: + virtual ~ShutdownCB() {} + virtual void call() = 0; + }; + + void register_caller(ShutdownCB *cb, int *phandle); + void 
unregister_caller(int handle); + + void schedule_context(Context *c); +}; + +#endif diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc new file mode 100644 index 00000000..9ee7f295 --- /dev/null +++ b/src/rgw/services/svc_notify.cc @@ -0,0 +1,484 @@ +#include "include/random.h" +#include "common/errno.h" + +#include "svc_notify.h" +#include "svc_finisher.h" +#include "svc_zone.h" +#include "svc_rados.h" + +#include "rgw/rgw_zone.h" + +#define dout_subsys ceph_subsys_rgw + +static string notify_oid_prefix = "notify"; + +class RGWWatcher : public librados::WatchCtx2 { + CephContext *cct; + RGWSI_Notify *svc; + int index; + RGWSI_RADOS::Obj obj; + uint64_t watch_handle; + int register_ret{0}; + librados::AioCompletion *register_completion{nullptr}; + + class C_ReinitWatch : public Context { + RGWWatcher *watcher; + public: + explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {} + void finish(int r) override { + watcher->reinit(); + } + }; +public: + RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {} + void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) override { + ldout(cct, 10) << "RGWWatcher::handle_notify() " + << " notify_id " << notify_id + << " cookie " << cookie + << " notifier " << notifier_id + << " bl.length()=" << bl.length() << dendl; + + if (unlikely(svc->inject_notify_timeout_probability == 1) || + (svc->inject_notify_timeout_probability > 0 && + (svc->inject_notify_timeout_probability > + ceph::util::generate_random_number(0.0, 1.0)))) { + ldout(cct, 0) + << "RGWWatcher::handle_notify() dropping notification! " + << "If this isn't what you want, set " + << "rgw_inject_notify_timeout_probability to zero!" 
<< dendl; + return; + } + + svc->watch_cb(notify_id, cookie, notifier_id, bl); + + bufferlist reply_bl; // empty reply payload + obj.notify_ack(notify_id, cookie, reply_bl); + } + void handle_error(uint64_t cookie, int err) override { + lderr(cct) << "RGWWatcher::handle_error cookie " << cookie + << " err " << cpp_strerror(err) << dendl; + svc->remove_watcher(index); + svc->schedule_context(new C_ReinitWatch(this)); + } + + void reinit() { + int ret = unregister_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; + return; + } + ret = register_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl; + return; + } + } + + int unregister_watch() { + int r = svc->unwatch(obj, watch_handle); + if (r < 0) { + return r; + } + svc->remove_watcher(index); + return 0; + } + + int register_watch_async() { + if (register_completion) { + register_completion->release(); + register_completion = nullptr; + } + register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + register_ret = obj.aio_watch(register_completion, &watch_handle, this); + if (register_ret < 0) { + register_completion->release(); + return register_ret; + } + return 0; + } + + int register_watch_finish() { + if (register_ret < 0) { + return register_ret; + } + if (!register_completion) { + return -EINVAL; + } + register_completion->wait_for_safe(); + int r = register_completion->get_return_value(); + register_completion->release(); + register_completion = nullptr; + if (r < 0) { + return r; + } + svc->add_watcher(index); + return 0; + } + + int register_watch() { + int r = obj.watch(&watch_handle, this); + if (r < 0) { + return r; + } + svc->add_watcher(index); + return 0; + } +}; + + +class RGWSI_Notify_ShutdownCB : public RGWSI_Finisher::ShutdownCB +{ + RGWSI_Notify *svc; +public: + RGWSI_Notify_ShutdownCB(RGWSI_Notify *_svc) : svc(_svc) {} + void call() override { + svc->shutdown(); 
+ } +}; + +string RGWSI_Notify::get_control_oid(int i) +{ + char buf[notify_oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i); + + return string(buf); +} + +// do not call pick_obj_control before init_watch +RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key) +{ + uint32_t r = ceph_str_hash_linux(key.c_str(), key.size()); + + int i = r % num_watchers; + return notify_objs[i]; +} + +int RGWSI_Notify::init_watch() +{ + num_watchers = cct->_conf->rgw_num_control_oids; + + bool compat_oid = (num_watchers == 0); + + if (num_watchers <= 0) + num_watchers = 1; + + watchers = new RGWWatcher *[num_watchers]; + + int error = 0; + + notify_objs.resize(num_watchers); + + for (int i=0; i < num_watchers; i++) { + string notify_oid; + + if (!compat_oid) { + notify_oid = get_control_oid(i); + } else { + notify_oid = notify_oid_prefix; + } + + notify_objs[i] = rados_svc->handle().obj({control_pool, notify_oid}); + auto& notify_obj = notify_objs[i]; + + int r = notify_obj.open(); + if (r < 0) { + ldout(cct, 0) << "ERROR: notify_obj.open() returned r=" << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + op.create(false); + r = notify_obj.operate(&op, null_yield); + if (r < 0 && r != -EEXIST) { + ldout(cct, 0) << "ERROR: notify_obj.operate() returned r=" << r << dendl; + return r; + } + + RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj); + watchers[i] = watcher; + + r = watcher->register_watch_async(); + if (r < 0) { + ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl; + error = r; + continue; + } + } + + for (int i = 0; i < num_watchers; ++i) { + int r = watchers[i]->register_watch_finish(); + if (r < 0) { + ldout(cct, 0) << "WARNING: async watch returned " << r << dendl; + error = r; + } + } + + if (error < 0) { + return error; + } + + return 0; +} + +void RGWSI_Notify::finalize_watch() +{ + for (int i = 0; i < num_watchers; i++) { + RGWWatcher *watcher = watchers[i]; + 
watcher->unregister_watch(); + delete watcher; + } + + delete[] watchers; +} + +int RGWSI_Notify::do_start() +{ + int r = zone_svc->start(); + if (r < 0) { + return r; + } + + assert(zone_svc->is_started()); /* otherwise there's an ordering problem */ + + r = rados_svc->start(); + if (r < 0) { + return r; + } + r = finisher_svc->start(); + if (r < 0) { + return r; + } + + control_pool = zone_svc->get_zone_params().control_pool; + + int ret = init_watch(); + if (ret < 0) { + lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl; + return ret; + } + + shutdown_cb = new RGWSI_Notify_ShutdownCB(this); + int handle; + finisher_svc->register_caller(shutdown_cb, &handle); + finisher_handle = handle; + + return 0; +} + +void RGWSI_Notify::shutdown() +{ + if (finalized) { + return; + } + + if (finisher_handle) { + finisher_svc->unregister_caller(*finisher_handle); + } + finalize_watch(); + + delete shutdown_cb; + + finalized = true; +} + +RGWSI_Notify::~RGWSI_Notify() +{ + shutdown(); +} + +int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) +{ + int r = obj.unwatch(watch_handle); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; + return r; + } + r = rados_svc->handle().watch_flush(); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl; + return r; + } + return 0; +} + +void RGWSI_Notify::add_watcher(int i) +{ + ldout(cct, 20) << "add_watcher() i=" << i << dendl; + RWLock::WLocker l(watchers_lock); + watchers_set.insert(i); + if (watchers_set.size() == (size_t)num_watchers) { + ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; + _set_enabled(true); + } +} + +void RGWSI_Notify::remove_watcher(int i) +{ + ldout(cct, 20) << "remove_watcher() i=" << i << dendl; + RWLock::WLocker l(watchers_lock); + size_t orig_size = watchers_set.size(); + watchers_set.erase(i); + if (orig_size == (size_t)num_watchers && + 
watchers_set.size() < orig_size) { /* actually removed */ + ldout(cct, 2) << "removed watcher, disabling cache" << dendl; + _set_enabled(false); + } +} + +int RGWSI_Notify::watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + RWLock::RLocker l(watchers_lock); + if (cb) { + return cb->watch_cb(notify_id, cookie, notifier_id, bl); + } + return 0; +} + +void RGWSI_Notify::set_enabled(bool status) +{ + RWLock::WLocker l(watchers_lock); + _set_enabled(status); +} + +void RGWSI_Notify::_set_enabled(bool status) +{ + enabled = status; + if (cb) { + cb->set_enabled(status); + } +} + +int RGWSI_Notify::distribute(const string& key, bufferlist& bl) +{ + /* The RGW uses the control pool to store the watch notify objects. + The precedence in RGWSI_Notify::do_start is to call to zone_svc->start and later to init_watch(). + The first time, RGW starts in the cluster, the RGW will try to create zone and zonegroup system object. + In that case RGW will try to distribute the cache before it ran init_watch, + which will lead to division by 0 in pick_obj_control (num_watchers is 0). + */ + if (num_watchers > 0) { + RGWSI_RADOS::Obj notify_obj = pick_control_obj(key); + + ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj + << " bl.length()=" << bl.length() << dendl; + return robust_notify(notify_obj, bl); + } + return 0; +} + +int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl) +{ + // The reply of every machine that acks goes in here. + boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks; + bufferlist rbl; + + // First, try to send, without being fancy about it. + auto r = notify_obj.notify(bl, 0, &rbl); + + // If that doesn't work, get serious. + if (r < 0) { + ldout(cct, 1) << "robust_notify: If at first you don't succeed: " + << cpp_strerror(-r) << dendl; + + + auto p = rbl.cbegin(); + // Gather up the replies to the first attempt. 
+ try { + uint32_t num_acks; + decode(num_acks, p); + // Doing this ourselves since we don't care about the payload; + for (auto i = 0u; i < num_acks; ++i) { + std::pair<uint64_t, uint64_t> id; + decode(id, p); + acks.insert(id); + ldout(cct, 20) << "robust_notify: acked by " << id << dendl; + uint32_t blen; + decode(blen, p); + p.advance(blen); + } + } catch (const buffer::error& e) { + ldout(cct, 0) << "robust_notify: notify response parse failed: " + << e.what() << dendl; + acks.clear(); // Throw away junk on failed parse. + } + + + // Every machine that fails to reply and hasn't acked a previous + // attempt goes in here. + boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts; + + auto tries = 1u; + while (r < 0 && tries < max_notify_retries) { + ++tries; + rbl.clear(); + // Reset the timeouts, we're only concerned with new ones. + timeouts.clear(); + r = notify_obj.notify(bl, 0, &rbl); + if (r < 0) { + ldout(cct, 1) << "robust_notify: retry " << tries << " failed: " + << cpp_strerror(-r) << dendl; + p = rbl.begin(); + try { + uint32_t num_acks; + decode(num_acks, p); + // Not only do we not care about the payload, but we don't + // want to empty the container; we just want to augment it + // with any new members. + for (auto i = 0u; i < num_acks; ++i) { + std::pair<uint64_t, uint64_t> id; + decode(id, p); + auto ir = acks.insert(id); + if (ir.second) { + ldout(cct, 20) << "robust_notify: acked by " << id << dendl; + } + uint32_t blen; + decode(blen, p); + p.advance(blen); + } + + uint32_t num_timeouts; + decode(num_timeouts, p); + for (auto i = 0u; i < num_timeouts; ++i) { + std::pair<uint64_t, uint64_t> id; + decode(id, p); + // Only track timeouts from hosts that haven't acked previously. + if (acks.find(id) != acks.cend()) { + ldout(cct, 20) << "robust_notify: " << id << " timed out." 
+ << dendl; + timeouts.insert(id); + } + } + } catch (const buffer::error& e) { + ldout(cct, 0) << "robust_notify: notify response parse failed: " + << e.what() << dendl; + continue; + } + // If we got a good parse and timeouts is empty, that means + // everyone who timed out in one call received the update in a + // previous one. + if (timeouts.empty()) { + r = 0; + } + } + } + } + return r; +} + +void RGWSI_Notify::register_watch_cb(CB *_cb) +{ + RWLock::WLocker l(watchers_lock); + cb = _cb; + _set_enabled(enabled); +} + +void RGWSI_Notify::schedule_context(Context *c) +{ + finisher_svc->schedule_context(c); +} diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h new file mode 100644 index 00000000..cd9d9eb8 --- /dev/null +++ b/src/rgw/services/svc_notify.h @@ -0,0 +1,100 @@ +#ifndef CEPH_RGW_SERVICES_NOTIFY_H +#define CEPH_RGW_SERVICES_NOTIFY_H + + +#include "rgw/rgw_service.h" + +#include "svc_rados.h" + + +class RGWSI_Zone; +class RGWSI_Finisher; + +class RGWWatcher; +class RGWSI_Notify_ShutdownCB; + +class RGWSI_Notify : public RGWServiceInstance +{ + friend class RGWWatcher; + friend class RGWSI_Notify_ShutdownCB; + friend class RGWServices_Def; + +public: + class CB; + +private: + RGWSI_Zone *zone_svc{nullptr}; + RGWSI_RADOS *rados_svc{nullptr}; + RGWSI_Finisher *finisher_svc{nullptr}; + + RWLock watchers_lock{"watchers_lock"}; + rgw_pool control_pool; + + int num_watchers{0}; + RGWWatcher **watchers{nullptr}; + std::set<int> watchers_set; + vector<RGWSI_RADOS::Obj> notify_objs; + + bool enabled{false}; + + double inject_notify_timeout_probability{0}; + unsigned max_notify_retries{0}; + + string get_control_oid(int i); + RGWSI_RADOS::Obj pick_control_obj(const string& key); + + CB *cb{nullptr}; + + std::optional<int> finisher_handle; + RGWSI_Notify_ShutdownCB *shutdown_cb{nullptr}; + + bool finalized{false}; + + int init_watch(); + void finalize_watch(); + + void init(RGWSI_Zone *_zone_svc, + RGWSI_RADOS *_rados_svc, + RGWSI_Finisher 
*_finisher_svc) { + zone_svc = _zone_svc; + rados_svc = _rados_svc; + finisher_svc = _finisher_svc; + } + int do_start() override; + void shutdown() override; + + int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle); + void add_watcher(int i); + void remove_watcher(int i); + + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl); + void _set_enabled(bool status); + void set_enabled(bool status); + + int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl); + + void schedule_context(Context *c); +public: + RGWSI_Notify(CephContext *cct): RGWServiceInstance(cct) {} + ~RGWSI_Notify(); + + class CB { + public: + virtual ~CB() {} + virtual int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) = 0; + virtual void set_enabled(bool status) = 0; + }; + + int distribute(const string& key, bufferlist& bl); + + void register_watch_cb(CB *cb); +}; + +#endif + diff --git a/src/rgw/services/svc_quota.cc b/src/rgw/services/svc_quota.cc new file mode 100644 index 00000000..f2baac36 --- /dev/null +++ b/src/rgw/services/svc_quota.cc @@ -0,0 +1,15 @@ +#include "svc_quota.h" +#include "svc_zone.h" + +#include "rgw/rgw_zone.h" + +const RGWQuotaInfo& RGWSI_Quota::get_bucket_quota() const +{ + return zone_svc->get_current_period().get_config().bucket_quota; +} + +const RGWQuotaInfo& RGWSI_Quota::get_user_quota() const +{ + return zone_svc->get_current_period().get_config().user_quota; +} + diff --git a/src/rgw/services/svc_quota.h b/src/rgw/services/svc_quota.h new file mode 100644 index 00000000..7dfbf19b --- /dev/null +++ b/src/rgw/services/svc_quota.h @@ -0,0 +1,23 @@ +#ifndef CEPH_RGW_SERVICES_QUOTA_H +#define CEPH_RGW_SERVICES_QUOTA_H + + +#include "rgw/rgw_service.h" + + +class RGWSI_Quota : public RGWServiceInstance +{ + RGWSI_Zone *zone_svc{nullptr}; + +public: + RGWSI_Quota(CephContext *cct): RGWServiceInstance(cct) {} + + void init(RGWSI_Zone *_zone_svc) { + zone_svc = _zone_svc; + } + + 
const RGWQuotaInfo& get_bucket_quota() const; + const RGWQuotaInfo& get_user_quota() const; +}; + +#endif diff --git a/src/rgw/services/svc_rados.cc b/src/rgw/services/svc_rados.cc new file mode 100644 index 00000000..408d25d9 --- /dev/null +++ b/src/rgw/services/svc_rados.cc @@ -0,0 +1,308 @@ +#include "svc_rados.h" + +#include "include/rados/librados.hpp" +#include "common/errno.h" +#include "osd/osd_types.h" +#include "rgw/rgw_tools.h" + +#define dout_subsys ceph_subsys_rgw + +int RGWSI_RADOS::do_start() +{ + int ret = rados.init_with_context(cct); + if (ret < 0) { + return ret; + } + ret = rados.connect(); + if (ret < 0) { + return ret; + } + return 0; +} + +librados::Rados* RGWSI_RADOS::get_rados_handle() +{ + return &rados; +} + +uint64_t RGWSI_RADOS::instance_id() +{ + return get_rados_handle()->get_instance_id(); +} + +int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx) +{ + constexpr bool create = true; // create the pool if it doesn't exist + return rgw_init_ioctx(get_rados_handle(), pool, io_ctx, create); +} + +int RGWSI_RADOS::pool_iterate(librados::IoCtx& io_ctx, + librados::NObjectIterator& iter, + uint32_t num, vector<rgw_bucket_dir_entry>& objs, + RGWAccessListFilter *filter, + bool *is_truncated) +{ + if (iter == io_ctx.nobjects_end()) + return -ENOENT; + + uint32_t i; + + for (i = 0; i < num && iter != io_ctx.nobjects_end(); ++i, ++iter) { + rgw_bucket_dir_entry e; + + string oid = iter->get_oid(); + ldout(cct, 20) << "RGWRados::pool_iterate: got " << oid << dendl; + + // fill it in with initial values; we may correct later + if (filter && !filter->filter(oid, oid)) + continue; + + e.key = oid; + objs.push_back(e); + } + + if (is_truncated) + *is_truncated = (iter != io_ctx.nobjects_end()); + + return objs.size(); +} + +void RGWSI_RADOS::Obj::init(const rgw_raw_obj& obj) +{ + ref.obj = obj; +} + +int RGWSI_RADOS::Obj::open() +{ + int r = rados_svc->open_pool_ctx(ref.obj.pool, ref.ioctx); + if (r < 0) { + return r; + } + + 
ref.ioctx.locator_set_key(ref.obj.loc); + + return 0; +} + +int RGWSI_RADOS::Obj::operate(librados::ObjectWriteOperation *op, + optional_yield y) +{ + return rgw_rados_operate(ref.ioctx, ref.obj.oid, op, y); +} + +int RGWSI_RADOS::Obj::operate(librados::ObjectReadOperation *op, bufferlist *pbl, + optional_yield y) +{ + return rgw_rados_operate(ref.ioctx, ref.obj.oid, op, pbl, y); +} + +int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op) +{ + return ref.ioctx.aio_operate(ref.obj.oid, c, op); +} + +int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectReadOperation *op, + bufferlist *pbl) +{ + return ref.ioctx.aio_operate(ref.obj.oid, c, op, pbl); +} + +int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx) +{ + return ref.ioctx.watch2(ref.obj.oid, handle, ctx); +} + +int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx) +{ + return ref.ioctx.aio_watch(ref.obj.oid, c, handle, ctx); +} + +int RGWSI_RADOS::Obj::unwatch(uint64_t handle) +{ + return ref.ioctx.unwatch2(handle); +} + +int RGWSI_RADOS::Obj::notify(bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl) +{ + return ref.ioctx.notify2(ref.obj.oid, bl, timeout_ms, pbl); +} + +void RGWSI_RADOS::Obj::notify_ack(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl) +{ + ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, bl); +} + +uint64_t RGWSI_RADOS::Obj::get_last_version() +{ + return ref.ioctx.get_last_version(); +} + +int RGWSI_RADOS::Pool::create() +{ + librados::Rados *rad = rados_svc->get_rados_handle(); + int r = rad->pool_create(pool.name.c_str()); + if (r < 0) { + ldout(rados_svc->cct, 0) << "WARNING: pool_create returned " << r << dendl; + return r; + } + librados::IoCtx io_ctx; + r = rad->ioctx_create(pool.name.c_str(), io_ctx); + if (r < 0) { + ldout(rados_svc->cct, 0) << "WARNING: ioctx_create returned " << r << dendl; + return r; + } + r = 
io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false); + if (r < 0) { + ldout(rados_svc->cct, 0) << "WARNING: application_enable returned " << r << dendl; + return r; + } + return 0; +} + +int RGWSI_RADOS::Pool::create(const vector<rgw_pool>& pools, vector<int> *retcodes) +{ + vector<librados::PoolAsyncCompletion *> completions; + vector<int> rets; + + librados::Rados *rad = rados_svc->get_rados_handle(); + for (auto iter = pools.begin(); iter != pools.end(); ++iter) { + librados::PoolAsyncCompletion *c = librados::Rados::pool_async_create_completion(); + completions.push_back(c); + auto& pool = *iter; + int ret = rad->pool_create_async(pool.name.c_str(), c); + rets.push_back(ret); + } + + vector<int>::iterator riter; + vector<librados::PoolAsyncCompletion *>::iterator citer; + + bool error = false; + ceph_assert(rets.size() == completions.size()); + for (riter = rets.begin(), citer = completions.begin(); riter != rets.end(); ++riter, ++citer) { + int r = *riter; + librados::PoolAsyncCompletion *c = *citer; + if (r == 0) { + c->wait(); + r = c->get_return_value(); + if (r < 0) { + ldout(rados_svc->cct, 0) << "WARNING: async pool_create returned " << r << dendl; + error = true; + } + } + c->release(); + retcodes->push_back(r); + } + if (error) { + return 0; + } + + std::vector<librados::IoCtx> io_ctxs; + retcodes->clear(); + for (auto pool : pools) { + io_ctxs.emplace_back(); + int ret = rad->ioctx_create(pool.name.c_str(), io_ctxs.back()); + if (ret < 0) { + ldout(rados_svc->cct, 0) << "WARNING: ioctx_create returned " << ret << dendl; + error = true; + } + retcodes->push_back(ret); + } + if (error) { + return 0; + } + + completions.clear(); + for (auto &io_ctx : io_ctxs) { + librados::PoolAsyncCompletion *c = + librados::Rados::pool_async_create_completion(); + completions.push_back(c); + int ret = io_ctx.application_enable_async(pg_pool_t::APPLICATION_NAME_RGW, + false, c); + ceph_assert(ret == 0); + } + + retcodes->clear(); + for (auto c : 
completions) { + c->wait(); + int ret = c->get_return_value(); + if (ret == -EOPNOTSUPP) { + ret = 0; + } else if (ret < 0) { + ldout(rados_svc->cct, 0) << "WARNING: async application_enable returned " << ret + << dendl; + error = true; + } + c->release(); + retcodes->push_back(ret); + } + return 0; +} + +int RGWSI_RADOS::Pool::lookup() +{ + librados::Rados *rad = rados_svc->get_rados_handle(); + int ret = rad->pool_lookup(pool.name.c_str()); + if (ret < 0) { + return ret; + } + + return 0; +} + +int RGWSI_RADOS::Pool::List::init(const string& marker, RGWAccessListFilter *filter) +{ + if (ctx.initialized) { + return -EINVAL; + } + + int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx); + if (r < 0) { + return r; + } + + librados::ObjectCursor oc; + if (!oc.from_str(marker)) { + ldout(pool.rados_svc->cct, 10) << "failed to parse cursor: " << marker << dendl; + return -EINVAL; + } + + ctx.iter = ctx.ioctx.nobjects_begin(oc); + ctx.filter = filter; + ctx.initialized = true; + + return 0; +} + +int RGWSI_RADOS::Pool::List::get_next(int max, + std::list<string> *oids, + bool *is_truncated) +{ + if (!ctx.initialized) { + return -EINVAL; + } + vector<rgw_bucket_dir_entry> objs; + int r = pool.rados_svc->pool_iterate(ctx.ioctx, ctx.iter, max, objs, ctx.filter, is_truncated); + if (r < 0) { + if(r != -ENOENT) { + ldout(pool.rados_svc->cct, 10) << "failed to list objects pool_iterate returned r=" << r << dendl; + } + return r; + } + + vector<rgw_bucket_dir_entry>::iterator iter; + for (auto& o : objs) { + oids->push_back(o.key.name); + } + + return oids->size(); +} + +int RGWSI_RADOS::Handle::watch_flush() +{ + librados::Rados *rad = rados_svc->get_rados_handle(); + return rad->watch_flush(); +} diff --git a/src/rgw/services/svc_rados.h b/src/rgw/services/svc_rados.h new file mode 100644 index 00000000..0453eb0c --- /dev/null +++ b/src/rgw/services/svc_rados.h @@ -0,0 +1,178 @@ +#ifndef CEPH_RGW_SERVICES_RADOS_H +#define CEPH_RGW_SERVICES_RADOS_H + + +#include 
"rgw/rgw_service.h" + +#include "include/rados/librados.hpp" +#include "common/async/yield_context.h" + +class RGWAccessListFilter { +public: + virtual ~RGWAccessListFilter() {} + virtual bool filter(const string& name, string& key) = 0; +}; + +struct RGWAccessListFilterPrefix : public RGWAccessListFilter { + string prefix; + + explicit RGWAccessListFilterPrefix(const string& _prefix) : prefix(_prefix) {} + bool filter(const string& name, string& key) override { + return (prefix.compare(key.substr(0, prefix.size())) == 0); + } +}; + +struct rgw_rados_ref { + rgw_raw_obj obj; + librados::IoCtx ioctx; +}; + +class RGWSI_RADOS : public RGWServiceInstance +{ + librados::Rados rados; + + int do_start() override; + + librados::Rados* get_rados_handle(); + int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx); + int pool_iterate(librados::IoCtx& ioctx, + librados::NObjectIterator& iter, + uint32_t num, vector<rgw_bucket_dir_entry>& objs, + RGWAccessListFilter *filter, + bool *is_truncated); + +public: + RGWSI_RADOS(CephContext *cct) : RGWServiceInstance(cct) {} + + void init() {} + + uint64_t instance_id(); + + class Handle; + + class Obj { + friend class RGWSI_RADOS; + friend Handle; + + RGWSI_RADOS *rados_svc{nullptr}; + rgw_rados_ref ref; + + void init(const rgw_raw_obj& obj); + + Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj) + : rados_svc(_rados_svc) { + init(_obj); + } + + public: + Obj() {} + + int open(); + + int operate(librados::ObjectWriteOperation *op, optional_yield y); + int operate(librados::ObjectReadOperation *op, bufferlist *pbl, + optional_yield y); + int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op); + int aio_operate(librados::AioCompletion *c, librados::ObjectReadOperation *op, + bufferlist *pbl); + + int watch(uint64_t *handle, librados::WatchCtx2 *ctx); + int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx); + int unwatch(uint64_t handle); + int notify(bufferlist& bl, 
+ uint64_t timeout_ms, + bufferlist *pbl); + void notify_ack(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl); + + uint64_t get_last_version(); + + rgw_rados_ref& get_ref() { return ref; } + const rgw_rados_ref& get_ref() const { return ref; } + }; + + class Pool { + friend class RGWSI_RADOS; + friend Handle; + + RGWSI_RADOS *rados_svc{nullptr}; + rgw_pool pool; + + Pool(RGWSI_RADOS *_rados_svc, + const rgw_pool& _pool) : rados_svc(_rados_svc), + pool(_pool) {} + + Pool(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {} + public: + Pool() {} + + int create(); + int create(const std::vector<rgw_pool>& pools, std::vector<int> *retcodes); + int lookup(); + + struct List { + Pool& pool; + + struct Ctx { + bool initialized{false}; + librados::IoCtx ioctx; + librados::NObjectIterator iter; + RGWAccessListFilter *filter{nullptr}; + } ctx; + + List(Pool& _pool) : pool(_pool) {} + + int init(const string& marker, RGWAccessListFilter *filter = nullptr); + int get_next(int max, + std::list<string> *oids, + bool *is_truncated); + }; + + List op() { + return List(*this); + } + + friend List; + }; + + class Handle { + friend class RGWSI_RADOS; + + RGWSI_RADOS *rados_svc{nullptr}; + + Handle(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {} + public: + Obj obj(const rgw_raw_obj& o) { + return Obj(rados_svc, o); + } + + Pool pool(const rgw_pool& p) { + return Pool(rados_svc, p); + } + + int watch_flush(); + }; + + Handle handle() { + return Handle(this); + } + + Obj obj(const rgw_raw_obj& o) { + return Obj(this, o); + } + + Pool pool() { + return Pool(this); + } + + Pool pool(const rgw_pool& p) { + return Pool(this, p); + } + + friend Obj; + friend Pool; + friend Pool::List; +}; + +#endif diff --git a/src/rgw/services/svc_sync_modules.cc b/src/rgw/services/svc_sync_modules.cc new file mode 100644 index 00000000..ca6a7a30 --- /dev/null +++ b/src/rgw/services/svc_sync_modules.cc @@ -0,0 +1,15 @@ +#include "svc_sync_modules.h" + +#include "rgw/rgw_sync_module.h" + +void 
RGWSI_SyncModules::init() +{ + sync_modules_manager = new RGWSyncModulesManager(); + rgw_register_sync_modules(sync_modules_manager); +} + +RGWSI_SyncModules::~RGWSI_SyncModules() +{ + delete sync_modules_manager; +} + diff --git a/src/rgw/services/svc_sync_modules.h b/src/rgw/services/svc_sync_modules.h new file mode 100644 index 00000000..19c4ec57 --- /dev/null +++ b/src/rgw/services/svc_sync_modules.h @@ -0,0 +1,26 @@ +#ifndef CEPH_RGW_SERVICES_SYNC_MODULES_H +#define CEPH_RGW_SERVICES_SYNC_MODULES_H + + +#include "rgw/rgw_service.h" + + +class RGWSyncModulesManager; + +class RGWSI_SyncModules : public RGWServiceInstance +{ + RGWSyncModulesManager *sync_modules_manager{nullptr}; + +public: + RGWSI_SyncModules(CephContext *cct): RGWServiceInstance(cct) {} + ~RGWSI_SyncModules(); + + RGWSyncModulesManager *get_manager() { + return sync_modules_manager; + } + + void init(); +}; + +#endif + diff --git a/src/rgw/services/svc_sys_obj.cc b/src/rgw/services/svc_sys_obj.cc new file mode 100644 index 00000000..1eda37f8 --- /dev/null +++ b/src/rgw/services/svc_sys_obj.cc @@ -0,0 +1,192 @@ +#include "svc_sys_obj.h" +#include "svc_sys_obj_core.h" +#include "svc_rados.h" +#include "svc_zone.h" + +#include "rgw/rgw_zone.h" + +#define dout_subsys ceph_subsys_rgw + +RGWSysObjectCtx RGWSI_SysObj::init_obj_ctx() +{ + return RGWSysObjectCtx(this); +} + +RGWSI_SysObj::Obj RGWSI_SysObj::get_obj(RGWSysObjectCtx& obj_ctx, const rgw_raw_obj& obj) +{ + return Obj(core_svc, obj_ctx, obj); +} + +void RGWSI_SysObj::Obj::invalidate() +{ + ctx.invalidate(obj); +} + +int RGWSI_SysObj::Obj::ROp::stat() +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->stat(source.get_ctx(), state, obj, + attrs, raw_attrs, + lastmod, obj_size, + objv_tracker); +} + +int RGWSI_SysObj::Obj::ROp::read(int64_t ofs, int64_t end, bufferlist *bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->read(source.get_ctx(), 
state, + objv_tracker, + obj, bl, ofs, end, + attrs, + raw_attrs, + cache_info, + refresh_version); +} + +int RGWSI_SysObj::Obj::ROp::get_attr(const char *name, bufferlist *dest) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->get_attr(obj, name, dest); +} + +int RGWSI_SysObj::Obj::WOp::remove() +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->remove(source.get_ctx(), + objv_tracker, + obj); +} + +int RGWSI_SysObj::Obj::WOp::write(bufferlist& bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->write(obj, pmtime, attrs, exclusive, + bl, objv_tracker, mtime); +} + +int RGWSI_SysObj::Obj::WOp::write_data(bufferlist& bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->write_data(obj, bl, exclusive, objv_tracker); +} + +int RGWSI_SysObj::Obj::WOp::write_attrs() +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->set_attrs(obj, attrs, nullptr, objv_tracker); +} + +int RGWSI_SysObj::Obj::WOp::write_attr(const char *name, bufferlist& bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + map<string, bufferlist> m; + m[name] = bl; + + return svc->set_attrs(obj, m, nullptr, objv_tracker); +} + +int RGWSI_SysObj::Pool::Op::list_prefixed_objs(const string& prefix, list<string> *result) +{ + bool is_truncated; + + auto rados_pool = source.rados_svc->pool(source.pool); + + auto op = rados_pool.op(); + + RGWAccessListFilterPrefix filter(prefix); + + int r = op.init(string(), &filter); + if (r < 0) { + return r; + } + + do { + list<string> oids; +#define MAX_OBJS_DEFAULT 1000 + int r = op.get_next(MAX_OBJS_DEFAULT, &oids, &is_truncated); + if (r < 0) { + return r; + } + for (auto& val : oids) { + if (val.size() > prefix.size()) { + result->push_back(val.substr(prefix.size())); + } + 
} + } while (is_truncated); + + return 0; +} + +int RGWSI_SysObj::Obj::OmapOp::get_all(std::map<string, bufferlist> *m) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->omap_get_all(obj, m); +} + +int RGWSI_SysObj::Obj::OmapOp::get_vals(const string& marker, + uint64_t count, + std::map<string, bufferlist> *m, + bool *pmore) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->omap_get_vals(obj, marker, count, m, pmore); +} + +int RGWSI_SysObj::Obj::OmapOp::set(const std::string& key, bufferlist& bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->omap_set(obj, key, bl, must_exist); +} + +int RGWSI_SysObj::Obj::OmapOp::set(const map<std::string, bufferlist>& m) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->omap_set(obj, m, must_exist); +} + +int RGWSI_SysObj::Obj::OmapOp::del(const std::string& key) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->omap_del(obj, key); +} + +int RGWSI_SysObj::Obj::WNOp::notify(bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.obj; + + return svc->notify(obj, bl, timeout_ms, pbl); +} + +RGWSI_Zone *RGWSI_SysObj::get_zone_svc() +{ + return core_svc->get_zone_svc(); +} diff --git a/src/rgw/services/svc_sys_obj.h b/src/rgw/services/svc_sys_obj.h new file mode 100644 index 00000000..f6cd77ce --- /dev/null +++ b/src/rgw/services/svc_sys_obj.h @@ -0,0 +1,275 @@ +#ifndef CEPH_RGW_SERVICES_SYS_OBJ_H +#define CEPH_RGW_SERVICES_SYS_OBJ_H + + +#include "rgw/rgw_service.h" + +#include "svc_rados.h" +#include "svc_sys_obj_core.h" + + +class RGWSI_Zone; +class RGWSI_SysObj; +class RGWSysObjectCtx; + +struct rgw_cache_entry_info; + +class RGWSI_SysObj : public RGWServiceInstance +{ + friend struct RGWServices_Def; + +public: + class Obj 
{ + friend class ROp; + + RGWSI_SysObj_Core *core_svc; + RGWSysObjectCtx& ctx; + rgw_raw_obj obj; + + public: + Obj(RGWSI_SysObj_Core *_core_svc, + RGWSysObjectCtx& _ctx, + const rgw_raw_obj& _obj) : core_svc(_core_svc), + ctx(_ctx), + obj(_obj) {} + + void invalidate(); + + RGWSysObjectCtx& get_ctx() { + return ctx; + } + + rgw_raw_obj& get_obj() { + return obj; + } + + struct ROp { + Obj& source; + + RGWSI_SysObj_Core::GetObjState state; + + RGWObjVersionTracker *objv_tracker{nullptr}; + map<string, bufferlist> *attrs{nullptr}; + bool raw_attrs{false}; + boost::optional<obj_version> refresh_version{boost::none}; + ceph::real_time *lastmod{nullptr}; + uint64_t *obj_size{nullptr}; + rgw_cache_entry_info *cache_info{nullptr}; + + ROp& set_objv_tracker(RGWObjVersionTracker *_objv_tracker) { + objv_tracker = _objv_tracker; + return *this; + } + + ROp& set_last_mod(ceph::real_time *_lastmod) { + lastmod = _lastmod; + return *this; + } + + ROp& set_obj_size(uint64_t *_obj_size) { + obj_size = _obj_size; + return *this; + } + + ROp& set_attrs(map<string, bufferlist> *_attrs) { + attrs = _attrs; + return *this; + } + + ROp& set_raw_attrs(bool ra) { + raw_attrs = ra; + return *this; + } + + ROp& set_refresh_version(boost::optional<obj_version>& rf) { + refresh_version = rf; + return *this; + } + + ROp& set_cache_info(rgw_cache_entry_info *ci) { + cache_info = ci; + return *this; + } + + ROp(Obj& _source) : source(_source) {} + + int stat(); + int read(int64_t ofs, int64_t end, bufferlist *pbl); + int read(bufferlist *pbl) { + return read(0, -1, pbl); + } + int get_attr(const char *name, bufferlist *dest); + }; + + struct WOp { + Obj& source; + + RGWObjVersionTracker *objv_tracker{nullptr}; + map<string, bufferlist> attrs; + ceph::real_time mtime; + ceph::real_time *pmtime{nullptr}; + bool exclusive{false}; + + WOp& set_objv_tracker(RGWObjVersionTracker *_objv_tracker) { + objv_tracker = _objv_tracker; + return *this; + } + + WOp& set_attrs(map<string, bufferlist>& _attrs) 
{ + attrs = _attrs; + return *this; + } + + WOp& set_attrs(map<string, bufferlist>&& _attrs) { + attrs = _attrs; + return *this; + } + + WOp& set_mtime(const ceph::real_time& _mtime) { + mtime = _mtime; + return *this; + } + + WOp& set_pmtime(ceph::real_time *_pmtime) { + pmtime = _pmtime; + return *this; + } + + WOp& set_exclusive(bool _exclusive = true) { + exclusive = _exclusive; + return *this; + } + + WOp(Obj& _source) : source(_source) {} + + int remove(); + int write(bufferlist& bl); + + int write_data(bufferlist& bl); /* write data only */ + int write_attrs(); /* write attrs only */ + int write_attr(const char *name, bufferlist& bl); /* write attrs only */ + }; + + struct OmapOp { + Obj& source; + + bool must_exist{false}; + + OmapOp& set_must_exist(bool _must_exist = true) { + must_exist = _must_exist; + return *this; + } + + OmapOp(Obj& _source) : source(_source) {} + + int get_all(std::map<string, bufferlist> *m); + int get_vals(const string& marker, + uint64_t count, + std::map<string, bufferlist> *m, + bool *pmore); + int set(const std::string& key, bufferlist& bl); + int set(const map<std::string, bufferlist>& m); + int del(const std::string& key); + }; + + struct WNOp { + Obj& source; + + WNOp(Obj& _source) : source(_source) {} + + int notify(bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl); + }; + ROp rop() { + return ROp(*this); + } + + WOp wop() { + return WOp(*this); + } + + OmapOp omap() { + return OmapOp(*this); + } + + WNOp wn() { + return WNOp(*this); + } + }; + + class Pool { + friend class Op; + + RGWSI_RADOS *rados_svc; + RGWSI_SysObj_Core *core_svc; + rgw_pool pool; + + public: + Pool(RGWSI_RADOS *_rados_svc, + RGWSI_SysObj_Core *_core_svc, + const rgw_pool& _pool) : rados_svc(_rados_svc), + core_svc(_core_svc), + pool(_pool) {} + + rgw_pool& get_pool() { + return pool; + } + + struct Op { + Pool& source; + + Op(Pool& _source) : source(_source) {} + + int list_prefixed_objs(const std::string& prefix, std::list<std::string> 
*result); + }; + + Op op() { + return Op(*this); + } + }; + + friend class Obj; + friend class Obj::ROp; + friend class Obj::WOp; + friend class Pool; + friend class Pool::Op; + +protected: + RGWSI_RADOS *rados_svc{nullptr}; + RGWSI_SysObj_Core *core_svc{nullptr}; + + void init(RGWSI_RADOS *_rados_svc, + RGWSI_SysObj_Core *_core_svc) { + rados_svc = _rados_svc; + core_svc = _core_svc; + } + +public: + RGWSI_SysObj(CephContext *cct): RGWServiceInstance(cct) {} + + RGWSysObjectCtx init_obj_ctx(); + Obj get_obj(RGWSysObjectCtx& obj_ctx, const rgw_raw_obj& obj); + + Pool get_pool(const rgw_pool& pool) { + return Pool(rados_svc, core_svc, pool); + } + + RGWSI_Zone *get_zone_svc(); +}; + +using RGWSysObj = RGWSI_SysObj::Obj; + +class RGWSysObjectCtx : public RGWSysObjectCtxBase +{ + RGWSI_SysObj *sysobj_svc; +public: + RGWSysObjectCtx(RGWSI_SysObj *_sysobj_svc) : sysobj_svc(_sysobj_svc) {} + + RGWSI_SysObj::Obj get_obj(const rgw_raw_obj& obj) { + return sysobj_svc->get_obj(*this, obj); + } +}; + +#endif + diff --git a/src/rgw/services/svc_sys_obj_cache.cc b/src/rgw/services/svc_sys_obj_cache.cc new file mode 100644 index 00000000..9130e054 --- /dev/null +++ b/src/rgw/services/svc_sys_obj_cache.cc @@ -0,0 +1,506 @@ +#include "svc_sys_obj_cache.h" +#include "svc_zone.h" +#include "svc_notify.h" + +#include "rgw/rgw_zone.h" +#include "rgw/rgw_tools.h" + +#define dout_subsys ceph_subsys_rgw + +class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB +{ + RGWSI_SysObj_Cache *svc; +public: + RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {} + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { + return svc->watch_cb(notify_id, cookie, notifier_id, bl); + } + + void set_enabled(bool status) { + svc->set_enabled(status); + } +}; + +int RGWSI_SysObj_Cache::do_start() +{ + int r = RGWSI_SysObj_Core::do_start(); + if (r < 0) { + return r; + } + + r = notify_svc->start(); + if (r < 0) { + return r; + } + + 
assert(notify_svc->is_started()); + + cb.reset(new RGWSI_SysObj_Cache_CB(this)); + + notify_svc->register_watch_cb(cb.get()); + + return 0; +} + +static string normal_name(rgw_pool& pool, const std::string& oid) { + std::string buf; + buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); + buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid); + return buf; +} + +void RGWSI_SysObj_Cache::normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj) +{ + if (src_obj.size()) { + dst_pool = src_pool; + dst_obj = src_obj; + } else { + dst_pool = zone_svc->get_zone_params().domain_root; + dst_obj = src_pool.name; + } +} + + +int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj) + +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(pool, oid); + cache.remove(name); + + ObjectCacheInfo info; + int r = distribute_cache(name, obj, info, REMOVE_OBJ); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl; + } + + return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj); +} + +int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj, + bufferlist *obl, off_t ofs, off_t end, + map<string, bufferlist> *attrs, + bool raw_attrs, + rgw_cache_entry_info *cache_info, + boost::optional<obj_version> refresh_version) +{ + rgw_pool pool; + string oid; + if (ofs != 0) { + return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, + obj, obl, ofs, end, attrs, raw_attrs, + cache_info, refresh_version); + } + + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + string name = normal_name(pool, oid); + + ObjectCacheInfo info; + + uint32_t flags = (end != 0 ? 
CACHE_FLAG_DATA : 0); + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + if (attrs) + flags |= CACHE_FLAG_XATTRS; + + int r = cache.get(name, info, flags, cache_info); + if (r == 0 && + (!refresh_version || !info.version.compare(&(*refresh_version)))) { + if (info.status < 0) + return info.status; + + bufferlist& bl = info.data; + + bufferlist::iterator i = bl.begin(); + + obl->clear(); + + i.copy_all(*obl); + if (objv_tracker) + objv_tracker->read_version = info.version; + if (attrs) { + if (raw_attrs) { + *attrs = info.xattrs; + } else { + rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs); + } + } + return obl->length(); + } + if(r == -ENODATA) + return -ENOENT; + + map<string, bufferlist> unfiltered_attrset; + r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, + obj, obl, ofs, end, + (attrs ? &unfiltered_attrset : nullptr), + true, /* cache unfiltered attrs */ + cache_info, + refresh_version); + if (r < 0) { + if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors + info.status = r; + cache.put(name, info, cache_info); + } + return r; + } + + if (obl->length() == end + 1) { + /* in this case, most likely object contains more data, we can't cache it */ + flags &= ~CACHE_FLAG_DATA; + } else { + bufferptr p(r); + bufferlist& bl = info.data; + bl.clear(); + bufferlist::iterator o = obl->begin(); + o.copy_all(bl); + } + + info.status = 0; + info.flags = flags; + if (objv_tracker) { + info.version = objv_tracker->read_version; + } + if (attrs) { + info.xattrs = std::move(unfiltered_attrset); + if (raw_attrs) { + *attrs = info.xattrs; + } else { + rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs); + } + } + cache.put(name, info, cache_info); + return r; +} + +int RGWSI_SysObj_Cache::get_attr(const rgw_raw_obj& obj, + const char *attr_name, + bufferlist *dest) +{ + rgw_pool pool; + string oid; + + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + string name = normal_name(pool, oid); + + ObjectCacheInfo info; + + uint32_t 
flags = CACHE_FLAG_XATTRS; + + int r = cache.get(name, info, flags, nullptr); + if (r == 0) { + if (info.status < 0) + return info.status; + + auto iter = info.xattrs.find(attr_name); + if (iter == info.xattrs.end()) { + return -ENODATA; + } + + *dest = iter->second; + return dest->length(); + } else if (r == -ENODATA) { + return -ENOENT; + } + /* don't try to cache this one */ + return RGWSI_SysObj_Core::get_attr(obj, attr_name, dest); +} + +int RGWSI_SysObj_Cache::set_attrs(const rgw_raw_obj& obj, + map<string, bufferlist>& attrs, + map<string, bufferlist> *rmattrs, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + if (rmattrs) { + info.rm_xattrs = *rmattrs; + } + info.status = 0; + info.flags = CACHE_FLAG_MODIFY_XATTRS; + int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker); + string name = normal_name(pool, oid); + if (ret >= 0) { + if (objv_tracker && objv_tracker->read_version.ver) { + info.version = objv_tracker->read_version; + info.flags |= CACHE_FLAG_OBJV; + } + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObj_Cache::write(const rgw_raw_obj& obj, + real_time *pmtime, + map<std::string, bufferlist>& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + info.status = 0; + info.data = data; + info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META; + ceph::real_time result_mtime; + int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs, + exclusive, data, + objv_tracker, set_mtime); + if (pmtime) { + 
*pmtime = result_mtime; + } + if (objv_tracker && objv_tracker->read_version.ver) { + info.version = objv_tracker->read_version; + info.flags |= CACHE_FLAG_OBJV; + } + info.meta.mtime = result_mtime; + info.meta.size = data.length(); + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObj_Cache::write_data(const rgw_raw_obj& obj, + const bufferlist& data, + bool exclusive, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + ObjectCacheInfo info; + info.data = data; + info.meta.size = data.length(); + info.status = 0; + info.flags = CACHE_FLAG_DATA; + + int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker); + string name = normal_name(pool, oid); + if (ret >= 0) { + if (objv_tracker && objv_tracker->read_version.ver) { + info.version = objv_tracker->read_version; + info.flags |= CACHE_FLAG_OBJV; + } + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObj_Cache::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch, + map<string, bufferlist> *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(pool, oid); + + uint64_t size; + real_time mtime; + uint64_t epoch; + + ObjectCacheInfo info; + uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + int r = cache.get(name, info, flags, NULL); + if (r == 0) { + if 
(info.status < 0) + return info.status; + + size = info.meta.size; + mtime = info.meta.mtime; + epoch = info.epoch; + if (objv_tracker) + objv_tracker->read_version = info.version; + goto done; + } + if (r == -ENODATA) { + return -ENOENT; + } + r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker); + if (r < 0) { + if (r == -ENOENT) { + info.status = r; + cache.put(name, info, NULL); + } + return r; + } + info.status = 0; + info.epoch = epoch; + info.meta.mtime = mtime; + info.meta.size = size; + info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) { + info.flags |= CACHE_FLAG_OBJV; + info.version = objv_tracker->read_version; + } + cache.put(name, info, NULL); +done: + if (psize) + *psize = size; + if (pmtime) + *pmtime = mtime; + if (pepoch) + *pepoch = epoch; + if (attrs) + *attrs = info.xattrs; + return 0; +} + +int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op) +{ + RGWCacheNotifyInfo info; + + info.op = op; + + info.obj_info = obj_info; + info.obj = obj; + bufferlist bl; + encode(info, bl); + return notify_svc->distribute(normal_name, bl); +} + +int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + RGWCacheNotifyInfo info; + + try { + auto iter = bl.cbegin(); + decode(info, iter); + } catch (buffer::end_of_buffer& err) { + ldout(cct, 0) << "ERROR: got bad notification" << dendl; + return -EIO; + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: buffer::error" << dendl; + return -EIO; + } + + rgw_pool pool; + string oid; + normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid); + string name = normal_name(pool, oid); + + switch (info.op) { + case UPDATE_OBJ: + cache.put(name, info.obj_info, NULL); + break; + case REMOVE_OBJ: + cache.remove(name); + break; + default: + ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << 
dendl; + return -EINVAL; + } + + return 0; +} + +void RGWSI_SysObj_Cache::set_enabled(bool status) +{ + cache.set_enabled(status); +} + +bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, + RGWChainedCache::Entry *chained_entry) +{ + return cache.chain_cache_entry(cache_info_entries, chained_entry); +} + +void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc) +{ + cache.chain_cache(cc); +} + +void RGWSI_SysObj_Cache::unregister_chained_cache(RGWChainedCache *cc) +{ + cache.unchain_cache(cc); +} + +static void cache_list_dump_helper(Formatter* f, + const std::string& name, + const ceph::real_time mtime, + const std::uint64_t size) +{ + f->dump_string("name", name); + f->dump_string("mtime", ceph::to_iso_8601(mtime)); + f->dump_unsigned("size", size); +} + +void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f) +{ + cache.for_each( + [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { + if (!filter || name.find(*filter) != name.npos) { + cache_list_dump_helper(f, name, entry.info.meta.mtime, + entry.info.meta.size); + } + }); +} + +int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f) +{ + if (const auto entry = cache.get(target)) { + f->open_object_section("cache_entry"); + f->dump_string("name", target.c_str()); + entry->dump(f); + f->close_section(); + return true; + } else { + return false; + } +} + +int RGWSI_SysObj_Cache::call_erase(const std::string& target) +{ + return cache.remove(target); +} + +int RGWSI_SysObj_Cache::call_zap() +{ + cache.invalidate_all(); + return 0; +} diff --git a/src/rgw/services/svc_sys_obj_cache.h b/src/rgw/services/svc_sys_obj_cache.h new file mode 100644 index 00000000..e48b64f2 --- /dev/null +++ b/src/rgw/services/svc_sys_obj_cache.h @@ -0,0 +1,176 @@ + +#ifndef CEPH_RGW_SERVICES_SYS_OBJ_CACHE_H +#define CEPH_RGW_SERVICES_SYS_OBJ_CACHE_H + + +#include "rgw/rgw_service.h" +#include 
"rgw/rgw_cache.h" + +#include "svc_sys_obj_core.h" + +class RGWSI_Notify; + +class RGWSI_SysObj_Cache_CB; + +class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core +{ + friend class RGWSI_SysObj_Cache_CB; + friend class RGWServices_Def; + + RGWSI_Notify *notify_svc{nullptr}; + ObjectCache cache; + + std::shared_ptr<RGWSI_SysObj_Cache_CB> cb; + + void normalize_pool_and_obj(const rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj); +protected: + void init(RGWSI_RADOS *_rados_svc, + RGWSI_Zone *_zone_svc, + RGWSI_Notify *_notify_svc) { + core_init(_rados_svc, _zone_svc); + notify_svc = _notify_svc; + } + + int do_start() override; + + int raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, + map<string, bufferlist> *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker) override; + + int read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj, + bufferlist *bl, off_t ofs, off_t end, + map<string, bufferlist> *attrs, + bool raw_attrs, + rgw_cache_entry_info *cache_info, + boost::optional<obj_version>) override; + + int get_attr(const rgw_raw_obj& obj, const char *name, bufferlist *dest) override; + + int set_attrs(const rgw_raw_obj& obj, + map<string, bufferlist>& attrs, + map<string, bufferlist> *rmattrs, + RGWObjVersionTracker *objv_tracker); + + int remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj) override; + + int write(const rgw_raw_obj& obj, + real_time *pmtime, + map<std::string, bufferlist>& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) override; + + int write_data(const rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker); + + int distribute_cache(const string& normal_name, const rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op); + + int watch_cb(uint64_t 
notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl); + + void set_enabled(bool status); + +public: + RGWSI_SysObj_Cache(CephContext *cct) : RGWSI_SysObj_Core(cct) { + cache.set_ctx(cct); + } + + bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, + RGWChainedCache::Entry *chained_entry); + void register_chained_cache(RGWChainedCache *cc); + void unregister_chained_cache(RGWChainedCache *cc); + + void call_list(const std::optional<std::string>& filter, Formatter* f); + int call_inspect(const std::string& target, Formatter* f); + int call_erase(const std::string& target); + int call_zap(); +}; + +template <class T> +class RGWChainedCacheImpl : public RGWChainedCache { + RGWSI_SysObj_Cache *svc{nullptr}; + ceph::timespan expiry; + RWLock lock; + + std::unordered_map<std::string, std::pair<T, ceph::coarse_mono_time>> entries; + +public: + RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {} + ~RGWChainedCacheImpl() { + if (!svc) { + return; + } + svc->unregister_chained_cache(this); + } + + void unregistered() override { + svc = nullptr; + } + + void init(RGWSI_SysObj_Cache *_svc) { + if (!_svc) { + return; + } + svc = _svc; + svc->register_chained_cache(this); + expiry = std::chrono::seconds(svc->ctx()->_conf.get_val<uint64_t>( + "rgw_cache_expiry_interval")); + } + + boost::optional<T> find(const string& key) { + RWLock::RLocker rl(lock); + auto iter = entries.find(key); + if (iter == entries.end()) { + return boost::none; + } + if (expiry.count() && + (ceph::coarse_mono_clock::now() - iter->second.second) > expiry) { + return boost::none; + } + + return iter->second.first; + } + + bool put(RGWSI_SysObj_Cache *svc, const string& key, T *entry, + std::initializer_list<rgw_cache_entry_info *> cache_info_entries) { + if (!svc) { + return false; + } + + Entry chain_entry(this, key, entry); + + /* we need the svc cache to call us under its lock to maintain lock ordering */ + return 
svc->chain_cache_entry(cache_info_entries, &chain_entry); + } + + void chain_cb(const string& key, void *data) override { + T *entry = static_cast<T *>(data); + RWLock::WLocker wl(lock); + entries[key].first = *entry; + if (expiry.count() > 0) { + entries[key].second = ceph::coarse_mono_clock::now(); + } + } + + void invalidate(const string& key) override { + RWLock::WLocker wl(lock); + entries.erase(key); + } + + void invalidate_all() override { + RWLock::WLocker wl(lock); + entries.clear(); + } +}; /* RGWChainedCacheImpl */ + +#endif diff --git a/src/rgw/services/svc_sys_obj_core.cc b/src/rgw/services/svc_sys_obj_core.cc new file mode 100644 index 00000000..ead6aebd --- /dev/null +++ b/src/rgw/services/svc_sys_obj_core.cc @@ -0,0 +1,595 @@ +#include "svc_sys_obj_core.h" +#include "svc_rados.h" +#include "svc_zone.h" + +#include "rgw/rgw_tools.h" + +#define dout_subsys ceph_subsys_rgw + +int RGWSI_SysObj_Core::GetObjState::get_rados_obj(RGWSI_RADOS *rados_svc, + RGWSI_Zone *zone_svc, + const rgw_raw_obj& obj, + RGWSI_RADOS::Obj **pobj) +{ + if (!has_rados_obj) { + if (obj.oid.empty()) { + ldout(rados_svc->ctx(), 0) << "ERROR: obj.oid is empty" << dendl; + return -EINVAL; + } + + rados_obj = rados_svc->obj(obj); + int r = rados_obj.open(); + if (r < 0) { + return r; + } + has_rados_obj = true; + } + *pobj = &rados_obj; + return 0; +} + +int RGWSI_SysObj_Core::get_rados_obj(RGWSI_Zone *zone_svc, + const rgw_raw_obj& obj, + RGWSI_RADOS::Obj *pobj) +{ + if (obj.oid.empty()) { + ldout(rados_svc->ctx(), 0) << "ERROR: obj.oid is empty" << dendl; + return -EINVAL; + } + + *pobj = std::move(rados_svc->obj(obj)); + int r = pobj->open(); + if (r < 0) { + return r; + } + + return 0; +} + +int RGWSI_SysObj_Core::get_system_obj_state_impl(RGWSysObjectCtxBase *rctx, const rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker) +{ + if (obj.empty()) { + return -EINVAL; + } + + RGWSysObjState *s = rctx->get_state(obj); + ldout(cct, 20) << 
"get_system_obj_state: rctx=" << (void *)rctx << " obj=" << obj << " state=" << (void *)s << " s->prefetch_data=" << s->prefetch_data << dendl; + *state = s; + if (s->has_attrs) { + return 0; + } + + s->obj = obj; + + int r = raw_stat(obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : nullptr), objv_tracker); + if (r == -ENOENT) { + s->exists = false; + s->has_attrs = true; + s->mtime = real_time(); + return 0; + } + if (r < 0) + return r; + + s->exists = true; + s->has_attrs = true; + s->obj_tag = s->attrset[RGW_ATTR_ID_TAG]; + + if (s->obj_tag.length()) + ldout(cct, 20) << "get_system_obj_state: setting s->obj_tag to " + << s->obj_tag.c_str() << dendl; + else + ldout(cct, 20) << "get_system_obj_state: s->obj_tag was set empty" << dendl; + + return 0; +} + +int RGWSI_SysObj_Core::get_system_obj_state(RGWSysObjectCtxBase *rctx, const rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker) +{ + int ret; + + do { + ret = get_system_obj_state_impl(rctx, obj, state, objv_tracker); + } while (ret == -EAGAIN); + + return ret; +} + +int RGWSI_SysObj_Core::raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, + map<string, bufferlist> *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + return r; + } + + uint64_t size = 0; + struct timespec mtime_ts; + + librados::ObjectReadOperation op; + if (objv_tracker) { + objv_tracker->prepare_op_for_read(&op); + } + op.getxattrs(attrs, nullptr); + if (psize || pmtime) { + op.stat2(&size, &mtime_ts, nullptr); + } + if (first_chunk) { + op.read(0, cct->_conf->rgw_max_chunk_size, first_chunk, nullptr); + } + bufferlist outbl; + r = rados_obj.operate(&op, &outbl, null_yield); + + if (epoch) { + *epoch = rados_obj.get_last_version(); + } + + if (r < 0) + return r; + + if (psize) + *psize = size; + if (pmtime) + *pmtime = 
ceph::real_clock::from_timespec(mtime_ts); + + return 0; +} + +int RGWSI_SysObj_Core::stat(RGWSysObjectCtxBase& obj_ctx, + GetObjState& state, + const rgw_raw_obj& obj, + map<string, bufferlist> *attrs, + bool raw_attrs, + real_time *lastmod, + uint64_t *obj_size, + RGWObjVersionTracker *objv_tracker) +{ + RGWSysObjState *astate = nullptr; + + int r = get_system_obj_state(&obj_ctx, obj, &astate, objv_tracker); + if (r < 0) + return r; + + if (!astate->exists) { + return -ENOENT; + } + + if (attrs) { + if (raw_attrs) { + *attrs = astate->attrset; + } else { + rgw_filter_attrset(astate->attrset, RGW_ATTR_PREFIX, attrs); + } + if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) { + map<string, bufferlist>::iterator iter; + for (iter = attrs->begin(); iter != attrs->end(); ++iter) { + ldout(cct, 20) << "Read xattr: " << iter->first << dendl; + } + } + } + + if (obj_size) + *obj_size = astate->size; + if (lastmod) + *lastmod = astate->mtime; + + return 0; +} + +int RGWSI_SysObj_Core::read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj, + bufferlist *bl, off_t ofs, off_t end, + map<string, bufferlist> *attrs, + bool raw_attrs, + rgw_cache_entry_info *cache_info, + boost::optional<obj_version>) +{ + uint64_t len; + librados::ObjectReadOperation op; + + if (end < 0) + len = 0; + else + len = end - ofs + 1; + + if (objv_tracker) { + objv_tracker->prepare_op_for_read(&op); + } + + ldout(cct, 20) << "rados->read ofs=" << ofs << " len=" << len << dendl; + op.read(ofs, len, bl, nullptr); + + map<string, bufferlist> unfiltered_attrset; + + if (attrs) { + if (raw_attrs) { + op.getxattrs(attrs, nullptr); + } else { + op.getxattrs(&unfiltered_attrset, nullptr); + } + } + + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + r = rados_obj.operate(&op, nullptr, 
null_yield); + if (r < 0) { + ldout(cct, 20) << "rados_obj.operate() r=" << r << " bl.length=" << bl->length() << dendl; + return r; + } + ldout(cct, 20) << "rados_obj.operate() r=" << r << " bl.length=" << bl->length() << dendl; + + uint64_t op_ver = rados_obj.get_last_version(); + + if (read_state.last_ver > 0 && + read_state.last_ver != op_ver) { + ldout(cct, 5) << "raced with an object write, abort" << dendl; + return -ECANCELED; + } + + if (attrs && !raw_attrs) { + rgw_filter_attrset(unfiltered_attrset, RGW_ATTR_PREFIX, attrs); + } + + read_state.last_ver = op_ver; + + return bl->length(); +} + +/** + * Get an attribute for a system object. + * obj: the object to get attr + * name: name of the attr to retrieve + * dest: bufferlist to store the result in + * Returns: 0 on success, -ERR# otherwise. + */ +int RGWSI_SysObj_Core::get_attr(const rgw_raw_obj& obj, + const char *name, + bufferlist *dest) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectReadOperation op; + + int rval; + op.getxattr(name, dest, &rval); + + r = rados_obj.operate(&op, nullptr, null_yield); + if (r < 0) + return r; + + return 0; +} + +int RGWSI_SysObj_Core::set_attrs(const rgw_raw_obj& obj, + map<string, bufferlist>& attrs, + map<string, bufferlist> *rmattrs, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + + map<string, bufferlist>::iterator iter; + if (rmattrs) { + for (iter = rmattrs->begin(); iter != rmattrs->end(); ++iter) { + const string& name = iter->first; + op.rmxattr(name.c_str()); + } + } + + for (iter = 
attrs.begin(); iter != attrs.end(); ++iter) { + const string& name = iter->first; + bufferlist& bl = iter->second; + + if (!bl.length()) + continue; + + op.setxattr(name.c_str(), bl); + } + + if (!op.size()) + return 0; + + bufferlist bl; + + r = rados_obj.operate(&op, null_yield); + if (r < 0) + return r; + + return 0; +} + +int RGWSI_SysObj_Core::omap_get_vals(const rgw_raw_obj& obj, + const string& marker, + uint64_t count, + std::map<string, bufferlist> *m, + bool *pmore) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + string start_after = marker; + bool more; + + do { + librados::ObjectReadOperation op; + + std::map<string, bufferlist> t; + int rval; + op.omap_get_vals2(start_after, count, &t, &more, &rval); + + r = rados_obj.operate(&op, nullptr, null_yield); + if (r < 0) { + return r; + } + if (t.empty()) { + break; + } + count -= t.size(); + start_after = t.rbegin()->first; + m->insert(t.begin(), t.end()); + } while (more && count > 0); + + if (pmore) { + *pmore = more; + } + return 0; +} + +int RGWSI_SysObj_Core::omap_get_all(const rgw_raw_obj& obj, std::map<string, bufferlist> *m) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + +#define MAX_OMAP_GET_ENTRIES 1024 + const int count = MAX_OMAP_GET_ENTRIES; + string start_after; + bool more; + + do { + librados::ObjectReadOperation op; + + std::map<string, bufferlist> t; + int rval; + op.omap_get_vals2(start_after, count, &t, &more, &rval); + + r = rados_obj.operate(&op, nullptr, null_yield); + if (r < 0) { + return r; + } + if (t.empty()) { + break; + } + start_after = t.rbegin()->first; + m->insert(t.begin(), t.end()); + } while (more); + return 0; +} + +int RGWSI_SysObj_Core::omap_set(const 
rgw_raw_obj& obj, const std::string& key, bufferlist& bl, bool must_exist) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + ldout(cct, 15) << "omap_set obj=" << obj << " key=" << key << dendl; + + map<string, bufferlist> m; + m[key] = bl; + librados::ObjectWriteOperation op; + if (must_exist) + op.assert_exists(); + op.omap_set(m); + r = rados_obj.operate(&op, null_yield); + return r; +} + +int RGWSI_SysObj_Core::omap_set(const rgw_raw_obj& obj, const std::map<std::string, bufferlist>& m, bool must_exist) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + if (must_exist) + op.assert_exists(); + op.omap_set(m); + r = rados_obj.operate(&op, null_yield); + return r; +} + +int RGWSI_SysObj_Core::omap_del(const rgw_raw_obj& obj, const std::string& key) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + set<string> k; + k.insert(key); + + librados::ObjectWriteOperation op; + + op.omap_rm_keys(k); + + r = rados_obj.operate(&op, null_yield); + return r; +} + +int RGWSI_SysObj_Core::notify(const rgw_raw_obj& obj, + bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + r = rados_obj.notify(bl, timeout_ms, pbl); + return r; +} + +int RGWSI_SysObj_Core::remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj) +{ + RGWSI_RADOS::Obj rados_obj; 
+ int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + + op.remove(); + r = rados_obj.operate(&op, null_yield); + if (r < 0) + return r; + + return 0; +} + +int RGWSI_SysObj_Core::write(const rgw_raw_obj& obj, + real_time *pmtime, + map<std::string, bufferlist>& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + + if (exclusive) { + op.create(true); // exclusive create + } else { + op.remove(); + op.set_op_flags2(LIBRADOS_OP_FLAG_FAILOK); + op.create(false); + } + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + + if (real_clock::is_zero(set_mtime)) { + set_mtime = real_clock::now(); + } + + struct timespec mtime_ts = real_clock::to_timespec(set_mtime); + op.mtime2(&mtime_ts); + op.write_full(data); + + bufferlist acl_bl; + + for (map<string, bufferlist>::iterator iter = attrs.begin(); iter != attrs.end(); ++iter) { + const string& name = iter->first; + bufferlist& bl = iter->second; + + if (!bl.length()) + continue; + + op.setxattr(name.c_str(), bl); + } + + r = rados_obj.operate(&op, null_yield); + if (r < 0) { + return r; + } + + if (objv_tracker) { + objv_tracker->apply_write(); + } + + if (pmtime) { + *pmtime = set_mtime; + } + + return 0; +} + + +int RGWSI_SysObj_Core::write_data(const rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc, obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on 
obj=" << obj << " returned " << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + + if (exclusive) { + op.create(true); + } + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + op.write_full(bl); + r = rados_obj.operate(&op, null_yield); + if (r < 0) + return r; + + if (objv_tracker) { + objv_tracker->apply_write(); + } + return 0; +} + diff --git a/src/rgw/services/svc_sys_obj_core.h b/src/rgw/services/svc_sys_obj_core.h new file mode 100644 index 00000000..d033267e --- /dev/null +++ b/src/rgw/services/svc_sys_obj_core.h @@ -0,0 +1,201 @@ +#ifndef CEPH_RGW_SERVICES_SYS_OBJ_CORE_H +#define CEPH_RGW_SERVICES_SYS_OBJ_CORE_H + + +#include "rgw/rgw_service.h" + +#include "svc_rados.h" + + +class RGWSI_Zone; + +struct rgw_cache_entry_info; + +struct RGWSysObjState { + rgw_raw_obj obj; + bool has_attrs{false}; + bool exists{false}; + uint64_t size{0}; + ceph::real_time mtime; + uint64_t epoch{0}; + bufferlist obj_tag; + bool has_data{false}; + bufferlist data; + bool prefetch_data{false}; + uint64_t pg_ver{0}; + + /* important! 
don't forget to update copy constructor */ + + RGWObjVersionTracker objv_tracker; + + map<string, bufferlist> attrset; + RGWSysObjState() {} + RGWSysObjState(const RGWSysObjState& rhs) : obj (rhs.obj) { + has_attrs = rhs.has_attrs; + exists = rhs.exists; + size = rhs.size; + mtime = rhs.mtime; + epoch = rhs.epoch; + if (rhs.obj_tag.length()) { + obj_tag = rhs.obj_tag; + } + has_data = rhs.has_data; + if (rhs.data.length()) { + data = rhs.data; + } + prefetch_data = rhs.prefetch_data; + pg_ver = rhs.pg_ver; + objv_tracker = rhs.objv_tracker; + } +}; + +class RGWSysObjectCtxBase { + std::map<rgw_raw_obj, RGWSysObjState> objs_state; + RWLock lock; + +public: + explicit RGWSysObjectCtxBase() : lock("RGWSysObjectCtxBase") {} + + RGWSysObjectCtxBase(const RGWSysObjectCtxBase& rhs) : objs_state(rhs.objs_state), + lock("RGWSysObjectCtxBase") {} + RGWSysObjectCtxBase(const RGWSysObjectCtxBase&& rhs) : objs_state(std::move(rhs.objs_state)), + lock("RGWSysObjectCtxBase") {} + + RGWSysObjState *get_state(const rgw_raw_obj& obj) { + RGWSysObjState *result; + std::map<rgw_raw_obj, RGWSysObjState>::iterator iter; + lock.get_read(); + assert (!obj.empty()); + iter = objs_state.find(obj); + if (iter != objs_state.end()) { + result = &iter->second; + lock.unlock(); + } else { + lock.unlock(); + lock.get_write(); + result = &objs_state[obj]; + lock.unlock(); + } + return result; + } + + void set_prefetch_data(rgw_raw_obj& obj) { + RWLock::WLocker wl(lock); + assert (!obj.empty()); + objs_state[obj].prefetch_data = true; + } + void invalidate(rgw_raw_obj& obj) { + RWLock::WLocker wl(lock); + auto iter = objs_state.find(obj); + if (iter == objs_state.end()) { + return; + } + objs_state.erase(iter); + } +}; + +class RGWSI_SysObj_Core : public RGWServiceInstance +{ + friend class RGWServices_Def; + friend class RGWSI_SysObj; + +protected: + RGWSI_RADOS *rados_svc{nullptr}; + RGWSI_Zone *zone_svc{nullptr}; + + struct GetObjState { + RGWSI_RADOS::Obj rados_obj; + bool has_rados_obj{false}; 
+ uint64_t last_ver{0}; + + GetObjState() {} + + int get_rados_obj(RGWSI_RADOS *rados_svc, + RGWSI_Zone *zone_svc, + const rgw_raw_obj& obj, + RGWSI_RADOS::Obj **pobj); + }; + + + void core_init(RGWSI_RADOS *_rados_svc, + RGWSI_Zone *_zone_svc) { + rados_svc = _rados_svc; + zone_svc = _zone_svc; + } + int get_rados_obj(RGWSI_Zone *zone_svc, const rgw_raw_obj& obj, RGWSI_RADOS::Obj *pobj); + + virtual int raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, + map<string, bufferlist> *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker); + + virtual int read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj, + bufferlist *bl, off_t ofs, off_t end, + map<string, bufferlist> *attrs, + bool raw_attrs, + rgw_cache_entry_info *cache_info, + boost::optional<obj_version>); + + virtual int remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + const rgw_raw_obj& obj); + + virtual int write(const rgw_raw_obj& obj, + real_time *pmtime, + map<std::string, bufferlist>& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime); + + virtual int write_data(const rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker); + + virtual int get_attr(const rgw_raw_obj& obj, const char *name, bufferlist *dest); + + virtual int set_attrs(const rgw_raw_obj& obj, + map<string, bufferlist>& attrs, + map<string, bufferlist> *rmattrs, + RGWObjVersionTracker *objv_tracker); + + virtual int omap_get_all(const rgw_raw_obj& obj, std::map<string, bufferlist> *m); + virtual int omap_get_vals(const rgw_raw_obj& obj, + const string& marker, + uint64_t count, + std::map<string, bufferlist> *m, + bool *pmore); + virtual int omap_set(const rgw_raw_obj& obj, const std::string& key, bufferlist& bl, bool must_exist = false); + virtual int omap_set(const rgw_raw_obj& obj, const 
/**
 * Wire up the services this zone service depends on and allocate the
 * (still empty) realm / zonegroup / zone / period objects.
 *
 * The objects allocated here are only default-constructed; do_start() is
 * what initializes them via their init() calls. They are released in
 * ~RGWSI_Zone().
 */
void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc,
                      RGWSI_RADOS * _rados_svc,
                      RGWSI_SyncModules * _sync_modules_svc)
{
  /* dependencies, started later from do_start() */
  sysobj_svc = _sysobj_svc;
  rados_svc = _rados_svc;
  sync_modules_svc = _sync_modules_svc;

  /* owned configuration objects; deleted in the destructor */
  realm = new RGWRealm();
  zonegroup = new RGWZoneGroup();
  zone_public_config = new RGWZone();
  zone_params = new RGWZoneParams();
  current_period = new RGWPeriod();
}
current_period; +} + +bool RGWSI_Zone::zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const +{ + return target_zone.syncs_from(source_zone.name) && + sync_modules_svc->get_manager()->supports_data_export(source_zone.tier_type); +} + +int RGWSI_Zone::do_start() +{ + int ret = sysobj_svc->start(); + if (ret < 0) { + return ret; + } + + assert(sysobj_svc->is_started()); /* if not then there's ordering issue */ + + ret = rados_svc->start(); + if (ret < 0) { + return ret; + } + ret = sync_modules_svc->start(); + if (ret < 0) { + return ret; + } + ret = realm->init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "failed reading realm info: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } else if (ret != -ENOENT) { + ldout(cct, 20) << "realm " << realm->get_name() << " " << realm->get_id() << dendl; + ret = current_period->init(cct, sysobj_svc, realm->get_id(), realm->get_name()); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "failed reading current period info: " << " " << cpp_strerror(-ret) << dendl; + return ret; + } + ldout(cct, 20) << "current period " << current_period->get_id() << dendl; + } + + ret = replace_region_with_zonegroup(); + if (ret < 0) { + lderr(cct) << "failed converting region to zonegroup : ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + + ret = convert_regionmap(); + if (ret < 0) { + lderr(cct) << "failed converting regionmap: " << cpp_strerror(-ret) << dendl; + return ret; + } + + bool zg_initialized = false; + + if (!current_period->get_id().empty()) { + ret = init_zg_from_period(&zg_initialized); + if (ret < 0) { + return ret; + } + } + + bool creating_defaults = false; + bool using_local = (!zg_initialized); + if (using_local) { + ldout(cct, 10) << " cannot find current period zonegroup using local zonegroup" << dendl; + ret = init_zg_from_local(&creating_defaults); + if (ret < 0) { + return ret; + } + // read period_config into current_period + 
auto& period_config = current_period->get_config(); + ret = period_config.read(sysobj_svc, zonegroup->realm_id); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "ERROR: failed to read period config: " + << cpp_strerror(ret) << dendl; + return ret; + } + } + + ldout(cct, 10) << "Cannot find current period zone using local zone" << dendl; + if (creating_defaults && cct->_conf->rgw_zone.empty()) { + ldout(cct, 10) << " Using default name "<< default_zone_name << dendl; + zone_params->set_name(default_zone_name); + } + + ret = zone_params->init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + lderr(cct) << "failed reading zone info: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + auto zone_iter = zonegroup->zones.find(zone_params->get_id()); + if (zone_iter == zonegroup->zones.end()) { + if (using_local) { + lderr(cct) << "Cannot find zone id=" << zone_params->get_id() << " (name=" << zone_params->get_name() << ")" << dendl; + return -EINVAL; + } + ldout(cct, 1) << "Cannot find zone id=" << zone_params->get_id() << " (name=" << zone_params->get_name() << "), switching to local zonegroup configuration" << dendl; + ret = init_zg_from_local(&creating_defaults); + if (ret < 0) { + return ret; + } + zone_iter = zonegroup->zones.find(zone_params->get_id()); + } + if (zone_iter != zonegroup->zones.end()) { + *zone_public_config = zone_iter->second; + ldout(cct, 20) << "zone " << zone_params->get_name() << dendl; + } else { + lderr(cct) << "Cannot find zone id=" << zone_params->get_id() << " (name=" << zone_params->get_name() << ")" << dendl; + return -EINVAL; + } + + zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id()); + + RGWSyncModuleRef sm; + if (!sync_modules_svc->get_manager()->get_module(zone_public_config->tier_type, &sm)) { + lderr(cct) << "ERROR: tier type not found: " << zone_public_config->tier_type << dendl; + return -EINVAL; + } + + writeable_zone = sm->supports_writes(); + + /* first build all 
zones index */ + for (auto ziter : zonegroup->zones) { + const string& id = ziter.first; + RGWZone& z = ziter.second; + zone_id_by_name[z.name] = id; + zone_by_id[id] = z; + } + + if (zone_by_id.find(zone_id()) == zone_by_id.end()) { + ldout(cct, 0) << "WARNING: could not find zone config in zonegroup for local zone (" << zone_id() << "), will use defaults" << dendl; + } + *zone_public_config = zone_by_id[zone_id()]; + for (const auto& ziter : zonegroup->zones) { + const string& id = ziter.first; + const RGWZone& z = ziter.second; + if (id == zone_id()) { + continue; + } + if (z.endpoints.empty()) { + ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl; + continue; + } + ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl; + RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints); + zone_conn_map[id] = conn; + if (zone_syncs_from(*zone_public_config, z) || + zone_syncs_from(z, *zone_public_config)) { + if (zone_syncs_from(*zone_public_config, z)) { + data_sync_source_zones.push_back(&z); + } + if (zone_syncs_from(z, *zone_public_config)) { + zone_data_notify_to_map[id] = conn; + } + } else { + ldout(cct, 20) << "NOTICE: not syncing to/from zone " << z.name << " id " << z.id << dendl; + } + } + + return 0; +} + +void RGWSI_Zone::shutdown() +{ + delete rest_master_conn; + + map<string, RGWRESTConn *>::iterator iter; + for (iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) { + RGWRESTConn *conn = iter->second; + delete conn; + } + + for (iter = zonegroup_conn_map.begin(); iter != zonegroup_conn_map.end(); ++iter) { + RGWRESTConn *conn = iter->second; + delete conn; + } +} + +int RGWSI_Zone::list_regions(list<string>& regions) +{ + RGWZoneGroup zonegroup; + RGWSI_SysObj::Pool syspool = sysobj_svc->get_pool(zonegroup.get_pool(cct)); + + return syspool.op().list_prefixed_objs(region_info_oid_prefix, ®ions); +} + +int 
RGWSI_Zone::list_zonegroups(list<string>& zonegroups) +{ + RGWZoneGroup zonegroup; + RGWSI_SysObj::Pool syspool = sysobj_svc->get_pool(zonegroup.get_pool(cct)); + + return syspool.op().list_prefixed_objs(zonegroup_names_oid_prefix, &zonegroups); +} + +int RGWSI_Zone::list_zones(list<string>& zones) +{ + RGWZoneParams zoneparams; + RGWSI_SysObj::Pool syspool = sysobj_svc->get_pool(zoneparams.get_pool(cct)); + + return syspool.op().list_prefixed_objs(zone_names_oid_prefix, &zones); +} + +int RGWSI_Zone::list_realms(list<string>& realms) +{ + RGWRealm realm(cct, sysobj_svc); + RGWSI_SysObj::Pool syspool = sysobj_svc->get_pool(realm.get_pool(cct)); + + return syspool.op().list_prefixed_objs(realm_names_oid_prefix, &realms); +} + +int RGWSI_Zone::list_periods(list<string>& periods) +{ + RGWPeriod period; + list<string> raw_periods; + RGWSI_SysObj::Pool syspool = sysobj_svc->get_pool(period.get_pool(cct)); + int ret = syspool.op().list_prefixed_objs(period.get_info_oid_prefix(), &raw_periods); + if (ret < 0) { + return ret; + } + for (const auto& oid : raw_periods) { + size_t pos = oid.find("."); + if (pos != std::string::npos) { + periods.push_back(oid.substr(0, pos)); + } else { + periods.push_back(oid); + } + } + periods.sort(); // unique() only detects duplicates if they're adjacent + periods.unique(); + return 0; +} + + +int RGWSI_Zone::list_periods(const string& current_period, list<string>& periods) +{ + int ret = 0; + string period_id = current_period; + while(!period_id.empty()) { + RGWPeriod period(period_id); + ret = period.init(cct, sysobj_svc); + if (ret < 0) { + return ret; + } + periods.push_back(period.get_id()); + period_id = period.get_predecessor(); + } + + return ret; +} + +/** + * Replace all region configuration with zonegroup for + * backward compatability + * Returns 0 on success, -ERR# on failure. 
+ */ +int RGWSI_Zone::replace_region_with_zonegroup() +{ + /* copy default region */ + /* convert default region to default zonegroup */ + string default_oid = cct->_conf->rgw_default_region_info_oid; + if (default_oid.empty()) { + default_oid = default_region_info_oid; + } + + RGWZoneGroup default_zonegroup; + rgw_pool pool{default_zonegroup.get_pool(cct)}; + string oid = "converted"; + bufferlist bl; + + RGWSysObjectCtx obj_ctx = sysobj_svc->init_obj_ctx(); + RGWSysObj sysobj = sysobj_svc->get_obj(obj_ctx, rgw_raw_obj(pool, oid)); + + int ret = sysobj.rop().read(&bl); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << " failed to read converted: ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } else if (ret != -ENOENT) { + ldout(cct, 20) << "System already converted " << dendl; + return 0; + } + + string default_region; + ret = default_zonegroup.init(cct, sysobj_svc, false, true); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed init default region: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = default_zonegroup.read_default_id(default_region, true); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << " failed reading old default region: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + + /* convert regions to zonegroups */ + list<string> regions; + ret = list_regions(regions); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << " failed to list regions: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } else if (ret == -ENOENT || regions.empty()) { + RGWZoneParams zoneparams(default_zone_name); + int ret = zoneparams.init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << ": error initializing default zone params: " << cpp_strerror(-ret) << dendl; + return ret; + } + /* update master zone */ + RGWZoneGroup default_zg(default_zonegroup_name); + ret = default_zg.init(cct, sysobj_svc); + if 
(ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << ": error in initializing default zonegroup: " << cpp_strerror(-ret) << dendl; + return ret; + } + if (ret != -ENOENT && default_zg.master_zone.empty()) { + default_zg.master_zone = zoneparams.get_id(); + return default_zg.update(); + } + return 0; + } + + string master_region, master_zone; + for (list<string>::iterator iter = regions.begin(); iter != regions.end(); ++iter) { + if (*iter != default_zonegroup_name){ + RGWZoneGroup region(*iter); + int ret = region.init(cct, sysobj_svc, true, true); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed init region "<< *iter << ": " << cpp_strerror(-ret) << dendl; + return ret; + } + if (region.is_master_zonegroup()) { + master_region = region.get_id(); + master_zone = region.master_zone; + } + } + } + + /* create realm if there is none. + The realm name will be the region and zone concatenated + realm id will be mds of its name */ + if (realm->get_id().empty() && !master_region.empty() && !master_zone.empty()) { + string new_realm_name = master_region + "." 
+ master_zone; + unsigned char md5[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char md5_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + MD5 hash; + hash.Update((const unsigned char *)new_realm_name.c_str(), new_realm_name.length()); + hash.Final(md5); + buf_to_hex(md5, CEPH_CRYPTO_MD5_DIGESTSIZE, md5_str); + string new_realm_id(md5_str); + RGWRealm new_realm(new_realm_id,new_realm_name); + ret = new_realm.init(cct, sysobj_svc, false); + if (ret < 0) { + ldout(cct, 0) << __func__ << " Error initing new realm: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = new_realm.create(); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 0) << __func__ << " Error creating new realm: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = new_realm.set_as_default(); + if (ret < 0) { + ldout(cct, 0) << __func__ << " Error setting realm as default: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = realm->init(cct, sysobj_svc); + if (ret < 0) { + ldout(cct, 0) << __func__ << " Error initing realm: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = current_period->init(cct, sysobj_svc, realm->get_id(), realm->get_name()); + if (ret < 0) { + ldout(cct, 0) << __func__ << " Error initing current period: " << cpp_strerror(-ret) << dendl; + return ret; + } + } + + list<string>::iterator iter; + /* create zonegroups */ + for (iter = regions.begin(); iter != regions.end(); ++iter) + { + ldout(cct, 0) << __func__ << " Converting " << *iter << dendl; + /* check to see if we don't have already a zonegroup with this name */ + RGWZoneGroup new_zonegroup(*iter); + ret = new_zonegroup.init(cct , sysobj_svc); + if (ret == 0 && new_zonegroup.get_id() != *iter) { + ldout(cct, 0) << __func__ << " zonegroup "<< *iter << " already exists id " << new_zonegroup.get_id () << + " skipping conversion " << dendl; + continue; + } + RGWZoneGroup zonegroup(*iter); + zonegroup.set_id(*iter); + int ret = zonegroup.init(cct, sysobj_svc, true, true); + if (ret < 0) { + ldout(cct, 0) << __func__ << " 
failed init zonegroup: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + zonegroup.realm_id = realm->get_id(); + /* fix default region master zone */ + if (*iter == default_zonegroup_name && zonegroup.master_zone.empty()) { + ldout(cct, 0) << __func__ << " Setting default zone as master for default region" << dendl; + zonegroup.master_zone = default_zone_name; + } + ret = zonegroup.update(); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 0) << __func__ << " failed to update zonegroup " << *iter << ": ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + ret = zonegroup.update_name(); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 0) << __func__ << " failed to update_name for zonegroup " << *iter << ": ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + if (zonegroup.get_name() == default_region) { + ret = zonegroup.set_as_default(); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed to set_as_default " << *iter << ": ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + } + for (map<string, RGWZone>::const_iterator iter = zonegroup.zones.begin(); iter != zonegroup.zones.end(); + ++iter) { + ldout(cct, 0) << __func__ << " Converting zone" << iter->first << dendl; + RGWZoneParams zoneparams(iter->first, iter->first); + zoneparams.set_id(iter->first); + zoneparams.realm_id = realm->get_id(); + ret = zoneparams.init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << " failed to init zoneparams " << iter->first << ": " << cpp_strerror(-ret) << dendl; + return ret; + } else if (ret == -ENOENT) { + ldout(cct, 0) << __func__ << " zone is part of another cluster " << iter->first << " skipping " << dendl; + continue; + } + zonegroup.realm_id = realm->get_id(); + ret = zoneparams.update(); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 0) << __func__ << " failed to update zoneparams " << iter->first << ": " << cpp_strerror(-ret) << dendl; + return ret; + } 
+ ret = zoneparams.update_name(); + if (ret < 0 && ret != -EEXIST) { + ldout(cct, 0) << __func__ << " failed to init zoneparams " << iter->first << ": " << cpp_strerror(-ret) << dendl; + return ret; + } + } + + if (!current_period->get_id().empty()) { + ret = current_period->add_zonegroup(zonegroup); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed to add zonegroup to current_period: " << cpp_strerror(-ret) << dendl; + return ret; + } + } + } + + if (!current_period->get_id().empty()) { + ret = current_period->update(); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed to update new period: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = current_period->store_info(false); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed to store new period: " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = current_period->reflect(); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed to update local objects: " << cpp_strerror(-ret) << dendl; + return ret; + } + } + + for (auto const& iter : regions) { + RGWZoneGroup zonegroup(iter); + int ret = zonegroup.init(cct, sysobj_svc, true, true); + if (ret < 0) { + ldout(cct, 0) << __func__ << " failed init zonegroup" << iter << ": ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = zonegroup.delete_obj(true); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << __func__ << " failed to delete region " << iter << ": ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + } + + /* mark as converted */ + ret = sysobj.wop() + .set_exclusive(true) + .write(bl); + if (ret < 0 ) { + ldout(cct, 0) << __func__ << " failed to mark cluster as converted: ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + + return 0; +} + +/** + * Add new connection to connections map + * @param zonegroup_conn_map map which new connection will be added to + * @param zonegroup zonegroup which new connection will connect to + * @param new_connection pointer 
to new connection instance + */ +static void add_new_connection_to_map(map<string, RGWRESTConn *> &zonegroup_conn_map, + const RGWZoneGroup &zonegroup, RGWRESTConn *new_connection) +{ + // Delete if connection is already exists + map<string, RGWRESTConn *>::iterator iterZoneGroup = zonegroup_conn_map.find(zonegroup.get_id()); + if (iterZoneGroup != zonegroup_conn_map.end()) { + delete iterZoneGroup->second; + } + + // Add new connection to connections map + zonegroup_conn_map[zonegroup.get_id()] = new_connection; +} + +int RGWSI_Zone::init_zg_from_period(bool *initialized) +{ + *initialized = false; + + if (current_period->get_id().empty()) { + return 0; + } + + int ret = zonegroup->init(cct, sysobj_svc); + ldout(cct, 20) << "period zonegroup init ret " << ret << dendl; + if (ret == -ENOENT) { + return 0; + } + if (ret < 0) { + ldout(cct, 0) << "failed reading zonegroup info: " << cpp_strerror(-ret) << dendl; + return ret; + } + ldout(cct, 20) << "period zonegroup name " << zonegroup->get_name() << dendl; + + map<string, RGWZoneGroup>::const_iterator iter = + current_period->get_map().zonegroups.find(zonegroup->get_id()); + + if (iter != current_period->get_map().zonegroups.end()) { + ldout(cct, 20) << "using current period zonegroup " << zonegroup->get_name() << dendl; + *zonegroup = iter->second; + ret = zonegroup->init(cct, sysobj_svc, false); + if (ret < 0) { + ldout(cct, 0) << "failed init zonegroup: " << " " << cpp_strerror(-ret) << dendl; + return ret; + } + ret = zone_params->init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "failed reading zone params info: " << " " << cpp_strerror(-ret) << dendl; + return ret; + } if (ret ==-ENOENT && zonegroup->get_name() == default_zonegroup_name) { + ldout(cct, 10) << " Using default name "<< default_zone_name << dendl; + zone_params->set_name(default_zone_name); + ret = zone_params->init(cct, sysobj_svc); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "failed reading zone params info: 
" << " " << cpp_strerror(-ret) << dendl; + return ret; + } + } + } + for (iter = current_period->get_map().zonegroups.begin(); + iter != current_period->get_map().zonegroups.end(); ++iter){ + const RGWZoneGroup& zg = iter->second; + // use endpoints from the zonegroup's master zone + auto master = zg.zones.find(zg.master_zone); + if (master == zg.zones.end()) { + // Check for empty zonegroup which can happen if zone was deleted before removal + if (zg.zones.size() == 0) + continue; + // fix missing master zone for a single zone zonegroup + if (zg.master_zone.empty() && zg.zones.size() == 1) { + master = zg.zones.begin(); + ldout(cct, 0) << "zonegroup " << zg.get_name() << " missing master_zone, setting zone " << + master->second.name << " id:" << master->second.id << " as master" << dendl; + if (zonegroup->get_id() == zg.get_id()) { + zonegroup->master_zone = master->second.id; + ret = zonegroup->update(); + if (ret < 0) { + ldout(cct, 0) << "error updating zonegroup : " << cpp_strerror(-ret) << dendl; + return ret; + } + } else { + RGWZoneGroup fixed_zg(zg.get_id(),zg.get_name()); + ret = fixed_zg.init(cct, sysobj_svc); + if (ret < 0) { + ldout(cct, 0) << "error initializing zonegroup : " << cpp_strerror(-ret) << dendl; + return ret; + } + fixed_zg.master_zone = master->second.id; + ret = fixed_zg.update(); + if (ret < 0) { + ldout(cct, 0) << "error initializing zonegroup : " << cpp_strerror(-ret) << dendl; + return ret; + } + } + } else { + ldout(cct, 0) << "zonegroup " << zg.get_name() << " missing zone for master_zone=" << + zg.master_zone << dendl; + return -EINVAL; + } + } + const auto& endpoints = master->second.endpoints; + add_new_connection_to_map(zonegroup_conn_map, zg, new RGWRESTConn(cct, this, zg.get_id(), endpoints)); + if (!current_period->get_master_zonegroup().empty() && + zg.get_id() == current_period->get_master_zonegroup()) { + rest_master_conn = new RGWRESTConn(cct, this, zg.get_id(), endpoints); + } + } + + *initialized = true; + + return 0; 
+} + +int RGWSI_Zone::init_zg_from_local(bool *creating_defaults) +{ + int ret = zonegroup->init(cct, sysobj_svc); + if ( (ret < 0 && ret != -ENOENT) || (ret == -ENOENT && !cct->_conf->rgw_zonegroup.empty())) { + ldout(cct, 0) << "failed reading zonegroup info: ret "<< ret << " " << cpp_strerror(-ret) << dendl; + return ret; + } else if (ret == -ENOENT) { + *creating_defaults = true; + ldout(cct, 10) << "Creating default zonegroup " << dendl; + ret = zonegroup->create_default(); + if (ret < 0) { + ldout(cct, 0) << "failure in zonegroup create_default: ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + ret = zonegroup->init(cct, sysobj_svc); + if (ret < 0) { + ldout(cct, 0) << "failure in zonegroup create_default: ret "<< ret << " " << cpp_strerror(-ret) + << dendl; + return ret; + } + } + ldout(cct, 20) << "zonegroup " << zonegroup->get_name() << dendl; + if (zonegroup->is_master_zonegroup()) { + // use endpoints from the zonegroup's master zone + auto master = zonegroup->zones.find(zonegroup->master_zone); + if (master == zonegroup->zones.end()) { + // fix missing master zone for a single zone zonegroup + if (zonegroup->master_zone.empty() && zonegroup->zones.size() == 1) { + master = zonegroup->zones.begin(); + ldout(cct, 0) << "zonegroup " << zonegroup->get_name() << " missing master_zone, setting zone " << + master->second.name << " id:" << master->second.id << " as master" << dendl; + zonegroup->master_zone = master->second.id; + ret = zonegroup->update(); + if (ret < 0) { + ldout(cct, 0) << "error initializing zonegroup : " << cpp_strerror(-ret) << dendl; + return ret; + } + } else { + ldout(cct, 0) << "zonegroup " << zonegroup->get_name() << " missing zone for " + "master_zone=" << zonegroup->master_zone << dendl; + return -EINVAL; + } + } + const auto& endpoints = master->second.endpoints; + rest_master_conn = new RGWRESTConn(cct, this, zonegroup->get_id(), endpoints); + } + + return 0; +} + +int RGWSI_Zone::convert_regionmap() +{ + 
RGWZoneGroupMap zonegroupmap; + + string pool_name = cct->_conf->rgw_zone_root_pool; + if (pool_name.empty()) { + pool_name = RGW_DEFAULT_ZONE_ROOT_POOL; + } + string oid = region_map_oid; + + rgw_pool pool(pool_name); + bufferlist bl; + + RGWSysObjectCtx obj_ctx = sysobj_svc->init_obj_ctx(); + RGWSysObj sysobj = sysobj_svc->get_obj(obj_ctx, rgw_raw_obj(pool, oid)); + + int ret = sysobj.rop().read(&bl); + if (ret < 0 && ret != -ENOENT) { + return ret; + } else if (ret == -ENOENT) { + return 0; + } + + try { + auto iter = bl.cbegin(); + decode(zonegroupmap, iter); + } catch (buffer::error& err) { + ldout(cct, 0) << "error decoding regionmap from " << pool << ":" << oid << dendl; + return -EIO; + } + + for (map<string, RGWZoneGroup>::iterator iter = zonegroupmap.zonegroups.begin(); + iter != zonegroupmap.zonegroups.end(); ++iter) { + RGWZoneGroup& zonegroup = iter->second; + ret = zonegroup.init(cct, sysobj_svc, false); + ret = zonegroup.update(); + if (ret < 0 && ret != -ENOENT) { + ldout(cct, 0) << "Error could not update zonegroup " << zonegroup.get_name() << ": " << + cpp_strerror(-ret) << dendl; + return ret; + } else if (ret == -ENOENT) { + ret = zonegroup.create(); + if (ret < 0) { + ldout(cct, 0) << "Error could not create " << zonegroup.get_name() << ": " << + cpp_strerror(-ret) << dendl; + return ret; + } + } + } + + current_period->set_user_quota(zonegroupmap.user_quota); + current_period->set_bucket_quota(zonegroupmap.bucket_quota); + + // remove the region_map so we don't try to convert again + ret = sysobj.wop().remove(); + if (ret < 0) { + ldout(cct, 0) << "Error could not remove " << sysobj.get_obj() + << " after upgrading to zonegroup map: " << cpp_strerror(ret) << dendl; + return ret; + } + + return 0; +} + +const RGWZoneParams& RGWSI_Zone::get_zone_params() const +{ + return *zone_params; +} + +const RGWZone& RGWSI_Zone::get_zone() const +{ + return *zone_public_config; +} + +const RGWZoneGroup& RGWSI_Zone::get_zonegroup() const +{ + return 
*zonegroup; +} + +int RGWSI_Zone::get_zonegroup(const string& id, RGWZoneGroup& zg) const +{ + int ret = 0; + if (id == zonegroup->get_id()) { + zg = *zonegroup; + } else if (!current_period->get_id().empty()) { + ret = current_period->get_zonegroup(zg, id); + } + return ret; +} + +const RGWRealm& RGWSI_Zone::get_realm() const +{ + return *realm; +} + +const RGWPeriod& RGWSI_Zone::get_current_period() const +{ + return *current_period; +} + +const string& RGWSI_Zone::get_current_period_id() +{ + return current_period->get_id(); +} + +bool RGWSI_Zone::has_zonegroup_api(const std::string& api) const +{ + if (!current_period->get_id().empty()) { + const auto& zonegroups_by_api = current_period->get_map().zonegroups_by_api; + if (zonegroups_by_api.find(api) != zonegroups_by_api.end()) + return true; + } else if (zonegroup->api_name == api) { + return true; + } + return false; +} + +bool RGWSI_Zone::zone_is_writeable() +{ + return writeable_zone && !get_zone().is_read_only(); +} + +uint32_t RGWSI_Zone::get_zone_short_id() const +{ + return zone_short_id; +} + +const string& RGWSI_Zone::zone_name() +{ + return get_zone_params().get_name(); +} +const string& RGWSI_Zone::zone_id() +{ + return get_zone_params().get_id(); +} + +bool RGWSI_Zone::find_zone_by_id(const string& id, RGWZone **zone) +{ + auto iter = zone_by_id.find(id); + if (iter == zone_by_id.end()) { + return false; + } + *zone = &(iter->second); + return true; +} + +RGWRESTConn *RGWSI_Zone::get_zone_conn_by_id(const string& id) { + auto citer = zone_conn_map.find(id); + if (citer == zone_conn_map.end()) { + return NULL; + } + + return citer->second; +} + +RGWRESTConn *RGWSI_Zone::get_zone_conn_by_name(const string& name) { + auto i = zone_id_by_name.find(name); + if (i == zone_id_by_name.end()) { + return NULL; + } + + return get_zone_conn_by_id(i->second); +} + +bool RGWSI_Zone::find_zone_id_by_name(const string& name, string *id) { + auto i = zone_id_by_name.find(name); + if (i == zone_id_by_name.end()) { + 
return false; + } + *id = i->second; + return true; +} + +bool RGWSI_Zone::need_to_log_data() const +{ + return zone_public_config->log_data; +} + +bool RGWSI_Zone::is_meta_master() const +{ + if (!zonegroup->is_master_zonegroup()) { + return false; + } + + return (zonegroup->master_zone == zone_public_config->id); +} + +bool RGWSI_Zone::need_to_log_metadata() const +{ + return is_meta_master() && + (zonegroup->zones.size() > 1 || current_period->is_multi_zonegroups_with_zones()); +} + +bool RGWSI_Zone::can_reshard() const +{ + return current_period->get_id().empty() || + (zonegroup->zones.size() == 1 && current_period->is_single_zonegroup()); +} + +/** + * Check to see if the bucket metadata could be synced + * bucket: the bucket to check + * Returns false is the bucket is not synced + */ +bool RGWSI_Zone::is_syncing_bucket_meta(const rgw_bucket& bucket) +{ + + /* no current period */ + if (current_period->get_id().empty()) { + return false; + } + + /* zonegroup is not master zonegroup */ + if (!zonegroup->is_master_zonegroup()) { + return false; + } + + /* single zonegroup and a single zone */ + if (current_period->is_single_zonegroup() && zonegroup->zones.size() == 1) { + return false; + } + + /* zone is not master */ + if (zonegroup->master_zone.compare(zone_public_config->id) != 0) { + return false; + } + + return true; +} + + +int RGWSI_Zone::select_new_bucket_location(const RGWUserInfo& user_info, const string& zonegroup_id, + const rgw_placement_rule& request_rule, + rgw_placement_rule *pselected_rule_name, RGWZonePlacementInfo *rule_info) +{ + /* first check that zonegroup exists within current period. */ + RGWZoneGroup zonegroup; + int ret = get_zonegroup(zonegroup_id, zonegroup); + if (ret < 0) { + ldout(cct, 0) << "could not find zonegroup " << zonegroup_id << " in current period" << dendl; + return ret; + } + + const rgw_placement_rule *used_rule; + + /* find placement rule. 
Hierarchy: request rule > user default rule > zonegroup default rule */ + std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer; + + if (!request_rule.name.empty()) { + used_rule = &request_rule; + titer = zonegroup.placement_targets.find(request_rule.name); + if (titer == zonegroup.placement_targets.end()) { + ldout(cct, 0) << "could not find requested placement id " << request_rule + << " within zonegroup " << dendl; + return -ERR_INVALID_LOCATION_CONSTRAINT; + } + } else if (!user_info.default_placement.name.empty()) { + used_rule = &user_info.default_placement; + titer = zonegroup.placement_targets.find(user_info.default_placement.name); + if (titer == zonegroup.placement_targets.end()) { + ldout(cct, 0) << "could not find user default placement id " << user_info.default_placement + << " within zonegroup " << dendl; + return -ERR_INVALID_LOCATION_CONSTRAINT; + } + } else { + if (zonegroup.default_placement.name.empty()) { // zonegroup default rule as fallback, it should not be empty. + ldout(cct, 0) << "misconfiguration, zonegroup default placement id should not be empty." 
<< dendl; + return -ERR_ZONEGROUP_DEFAULT_PLACEMENT_MISCONFIGURATION; + } else { + used_rule = &zonegroup.default_placement; + titer = zonegroup.placement_targets.find(zonegroup.default_placement.name); + if (titer == zonegroup.placement_targets.end()) { + ldout(cct, 0) << "could not find zonegroup default placement id " << zonegroup.default_placement + << " within zonegroup " << dendl; + return -ERR_INVALID_LOCATION_CONSTRAINT; + } + } + } + + /* now check tag for the rule, whether user is permitted to use rule */ + const auto& target_rule = titer->second; + if (!target_rule.user_permitted(user_info.placement_tags)) { + ldout(cct, 0) << "user not permitted to use placement rule " << titer->first << dendl; + return -EPERM; + } + + const string *storage_class = &request_rule.storage_class; + + if (storage_class->empty()) { + storage_class = &used_rule->storage_class; + } + + rgw_placement_rule rule(titer->first, *storage_class); + + if (pselected_rule_name) { + *pselected_rule_name = rule; + } + + return select_bucket_location_by_rule(rule, rule_info); +} + +int RGWSI_Zone::select_bucket_location_by_rule(const rgw_placement_rule& location_rule, RGWZonePlacementInfo *rule_info) +{ + if (location_rule.name.empty()) { + /* we can only reach here if we're trying to set a bucket location from a bucket + * created on a different zone, using a legacy / default pool configuration + */ + if (rule_info) { + return select_legacy_bucket_placement(rule_info); + } + + return 0; + } + + /* + * make sure that zone has this rule configured. We're + * checking it for the local zone, because that's where this bucket object is going to + * reside. + */ + auto piter = zone_params->placement_pools.find(location_rule.name); + if (piter == zone_params->placement_pools.end()) { + /* couldn't find, means we cannot really place data for this bucket in this zone */ + ldout(cct, 0) << "ERROR: This zone does not contain placement rule " + << location_rule << " present in the zonegroup!" 
<< dendl; + return -EINVAL; + } + + auto storage_class = location_rule.get_storage_class(); + if (!piter->second.storage_class_exists(storage_class)) { + ldout(cct, 5) << "requested storage class does not exist: " << storage_class << dendl; + return -EINVAL; + } + + + RGWZonePlacementInfo& placement_info = piter->second; + + if (rule_info) { + *rule_info = placement_info; + } + + return 0; +} + +int RGWSI_Zone::select_bucket_placement(const RGWUserInfo& user_info, const string& zonegroup_id, + const rgw_placement_rule& placement_rule, + rgw_placement_rule *pselected_rule, RGWZonePlacementInfo *rule_info) +{ + if (!zone_params->placement_pools.empty()) { + return select_new_bucket_location(user_info, zonegroup_id, placement_rule, + pselected_rule, rule_info); + } + + if (pselected_rule) { + pselected_rule->clear(); + } + + if (rule_info) { + return select_legacy_bucket_placement(rule_info); + } + + return 0; +} + +int RGWSI_Zone::select_legacy_bucket_placement(RGWZonePlacementInfo *rule_info) +{ + bufferlist map_bl; + map<string, bufferlist> m; + string pool_name; + bool write_map = false; + + rgw_raw_obj obj(zone_params->domain_root, avail_pools); + + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(obj); + + int ret = sysobj.rop().read(&map_bl); + if (ret < 0) { + goto read_omap; + } + + try { + auto iter = map_bl.cbegin(); + decode(m, iter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl; + } + +read_omap: + if (m.empty()) { + ret = sysobj.omap().get_all(&m); + + write_map = true; + } + + if (ret < 0 || m.empty()) { + vector<rgw_pool> pools; + string s = string("default.") + default_storage_pool_suffix; + pools.push_back(rgw_pool(s)); + vector<int> retcodes; + bufferlist bl; + ret = rados_svc->pool().create(pools, &retcodes); + if (ret < 0) + return ret; + ret = sysobj.omap().set(s, bl); + if (ret < 0) + return ret; + m[s] = bl; + } + + if (write_map) { + bufferlist new_bl; + encode(m, 
new_bl); + ret = sysobj.wop().write(new_bl); + if (ret < 0) { + ldout(cct, 0) << "WARNING: could not save avail pools map info ret=" << ret << dendl; + } + } + + auto miter = m.begin(); + if (m.size() > 1) { + // choose a pool at random + auto r = ceph::util::generate_random_number<size_t>(0, m.size() - 1); + std::advance(miter, r); + } + pool_name = miter->first; + + rgw_pool pool = pool_name; + + rule_info->storage_classes.set_storage_class(RGW_STORAGE_CLASS_STANDARD, &pool, nullptr); + rule_info->data_extra_pool = pool_name; + rule_info->index_pool = pool_name; + rule_info->index_type = RGWBIType_Normal; + + return 0; +} + +int RGWSI_Zone::update_placement_map() +{ + bufferlist header; + map<string, bufferlist> m; + rgw_raw_obj obj(zone_params->domain_root, avail_pools); + + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(obj); + + int ret = sysobj.omap().get_all(&m); + if (ret < 0) + return ret; + + bufferlist new_bl; + encode(m, new_bl); + ret = sysobj.wop().write(new_bl); + if (ret < 0) { + ldout(cct, 0) << "WARNING: could not save avail pools map info ret=" << ret << dendl; + } + + return ret; +} + +int RGWSI_Zone::add_bucket_placement(const rgw_pool& new_pool) +{ + int ret = rados_svc->pool(new_pool).lookup(); + if (ret < 0) { // DNE, or something + return ret; + } + + rgw_raw_obj obj(zone_params->domain_root, avail_pools); + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(obj); + + bufferlist empty_bl; + ret = sysobj.omap().set(new_pool.to_str(), empty_bl); + + // don't care about return value + update_placement_map(); + + return ret; +} + +int RGWSI_Zone::remove_bucket_placement(const rgw_pool& old_pool) +{ + rgw_raw_obj obj(zone_params->domain_root, avail_pools); + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(obj); + + int ret = sysobj.omap().del(old_pool.to_str()); + + // don't care about return value + update_placement_map(); + + return ret; +} + +int 
RGWSI_Zone::list_placement_set(set<rgw_pool>& names) +{ + bufferlist header; + map<string, bufferlist> m; + + rgw_raw_obj obj(zone_params->domain_root, avail_pools); + auto obj_ctx = sysobj_svc->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(obj); + int ret = sysobj.omap().get_all(&m); + if (ret < 0) + return ret; + + names.clear(); + map<string, bufferlist>::iterator miter; + for (miter = m.begin(); miter != m.end(); ++miter) { + names.insert(rgw_pool(miter->first)); + } + + return names.size(); +} + +bool RGWSI_Zone::get_redirect_zone_endpoint(string *endpoint) +{ + if (zone_public_config->redirect_zone.empty()) { + return false; + } + + auto iter = zone_conn_map.find(zone_public_config->redirect_zone); + if (iter == zone_conn_map.end()) { + ldout(cct, 0) << "ERROR: cannot find entry for redirect zone: " << zone_public_config->redirect_zone << dendl; + return false; + } + + RGWRESTConn *conn = iter->second; + + int ret = conn->get_url(*endpoint); + if (ret < 0) { + ldout(cct, 0) << "ERROR: redirect zone, conn->get_endpoint() returned ret=" << ret << dendl; + return false; + } + + return true; +} + diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h new file mode 100644 index 00000000..8c8dbeba --- /dev/null +++ b/src/rgw/services/svc_zone.h @@ -0,0 +1,134 @@ +#ifndef CEPH_RGW_SERVICES_ZONE_H +#define CEPH_RGW_SERVICES_ZONE_H + + +#include "rgw/rgw_service.h" + + +class RGWSI_RADOS; +class RGWSI_SysObj; +class RGWSI_SyncModules; + +class RGWRealm; +class RGWZoneGroup; +class RGWZone; +class RGWZoneParams; +class RGWPeriod; +class RGWZonePlacementInfo; + +class RGWRESTConn; + +class RGWSI_Zone : public RGWServiceInstance +{ + friend struct RGWServices_Def; + + RGWSI_SysObj *sysobj_svc{nullptr}; + RGWSI_RADOS *rados_svc{nullptr}; + RGWSI_SyncModules *sync_modules_svc{nullptr}; + + RGWRealm *realm{nullptr}; + RGWZoneGroup *zonegroup{nullptr}; + RGWZone *zone_public_config{nullptr}; /* external zone params, e.g., entrypoints, log flags, etc. 
*/ + RGWZoneParams *zone_params{nullptr}; /* internal zone params, e.g., rados pools */ + RGWPeriod *current_period{nullptr}; + uint32_t zone_short_id{0}; + bool writeable_zone{false}; + + RGWRESTConn *rest_master_conn{nullptr}; + map<string, RGWRESTConn *> zone_conn_map; + std::vector<const RGWZone*> data_sync_source_zones; + map<string, RGWRESTConn *> zone_data_notify_to_map; + map<string, RGWRESTConn *> zonegroup_conn_map; + + map<string, string> zone_id_by_name; + map<string, RGWZone> zone_by_id; + + void init(RGWSI_SysObj *_sysobj_svc, + RGWSI_RADOS *_rados_svc, + RGWSI_SyncModules *_sync_modules_svc); + int do_start() override; + void shutdown() override; + + int replace_region_with_zonegroup(); + int init_zg_from_period(bool *initialized); + int init_zg_from_local(bool *creating_defaults); + int convert_regionmap(); + + int update_placement_map(); +public: + RGWSI_Zone(CephContext *cct); + ~RGWSI_Zone(); + + const RGWZoneParams& get_zone_params() const; + const RGWPeriod& get_current_period() const; + const RGWRealm& get_realm() const; + const RGWZoneGroup& get_zonegroup() const; + int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const; + const RGWZone& get_zone() const; + + const string& zone_name(); + const string& zone_id(); + uint32_t get_zone_short_id() const; + + const string& get_current_period_id(); + bool has_zonegroup_api(const std::string& api) const; + + bool zone_is_writeable(); + bool zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const; + bool get_redirect_zone_endpoint(string *endpoint); + + RGWRESTConn *get_master_conn() { + return rest_master_conn; + } + + map<string, RGWRESTConn *>& get_zonegroup_conn_map() { + return zonegroup_conn_map; + } + + map<string, RGWRESTConn *>& get_zone_conn_map() { + return zone_conn_map; + } + + std::vector<const RGWZone*>& get_data_sync_source_zones() { + return data_sync_source_zones; + } + + map<string, RGWRESTConn *>& get_zone_data_notify_to_map() { + return 
zone_data_notify_to_map; + } + + bool find_zone_by_id(const string& id, RGWZone **zone); + + RGWRESTConn *get_zone_conn_by_id(const string& id); + RGWRESTConn *get_zone_conn_by_name(const string& name); + bool find_zone_id_by_name(const string& name, string *id); + + int select_bucket_placement(const RGWUserInfo& user_info, const string& zonegroup_id, + const rgw_placement_rule& rule, + rgw_placement_rule *pselected_rule, RGWZonePlacementInfo *rule_info); + int select_legacy_bucket_placement(RGWZonePlacementInfo *rule_info); + int select_new_bucket_location(const RGWUserInfo& user_info, const string& zonegroup_id, + const rgw_placement_rule& rule, + rgw_placement_rule *pselected_rule_name, RGWZonePlacementInfo *rule_info); + int select_bucket_location_by_rule(const rgw_placement_rule& location_rule, RGWZonePlacementInfo *rule_info); + + int add_bucket_placement(const rgw_pool& new_pool); + int remove_bucket_placement(const rgw_pool& old_pool); + int list_placement_set(set<rgw_pool>& names); + + bool is_meta_master() const; + + bool need_to_log_data() const; + bool need_to_log_metadata() const; + bool can_reshard() const; + bool is_syncing_bucket_meta(const rgw_bucket& bucket); + + int list_zonegroups(list<string>& zonegroups); + int list_regions(list<string>& regions); + int list_zones(list<string>& zones); + int list_realms(list<string>& realms); + int list_periods(list<string>& periods); + int list_periods(const string& current_period, list<string>& periods); +}; + +#endif diff --git a/src/rgw/services/svc_zone_utils.cc b/src/rgw/services/svc_zone_utils.cc new file mode 100644 index 00000000..ef9c9c88 --- /dev/null +++ b/src/rgw/services/svc_zone_utils.cc @@ -0,0 +1,59 @@ +#include "svc_zone_utils.h" +#include "svc_rados.h" +#include "svc_zone.h" + +#include "rgw/rgw_zone.h" + +int RGWSI_ZoneUtils::do_start() +{ + init_unique_trans_id_deps(); + + return 0; +} + +string RGWSI_ZoneUtils::gen_host_id() { + /* uint64_t needs 16, two '-' separators and a trailing null 
*/ + const string& zone_name = zone_svc->get_zone().name; + const string& zonegroup_name = zone_svc->get_zonegroup().get_name(); + char charbuf[16 + zone_name.size() + zonegroup_name.size() + 2 + 1]; + snprintf(charbuf, sizeof(charbuf), "%llx-%s-%s", (unsigned long long)rados_svc->instance_id(), zone_name.c_str(), zonegroup_name.c_str()); + return string(charbuf); +} + +string RGWSI_ZoneUtils::unique_id(uint64_t unique_num) +{ + char buf[32]; + snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)rados_svc->instance_id(), (unsigned long long)unique_num); + string s = zone_svc->get_zone_params().get_id() + buf; + return s; +} + +void RGWSI_ZoneUtils::init_unique_trans_id_deps() { + char buf[16 + 2 + 1]; /* uint64_t needs 16, 2 hyphens add further 2 */ + + snprintf(buf, sizeof(buf), "-%llx-", (unsigned long long)rados_svc->instance_id()); + url_encode(string(buf) + zone_svc->get_zone().name, trans_id_suffix); +} + +/* In order to preserve compatibility with Swift API, transaction ID + * should contain at least 32 characters satisfying following spec: + * - first 21 chars must be in range [0-9a-f]. Swift uses this + * space for storing fragment of UUID obtained through a call to + * uuid4() function of Python's uuid module; + * - char no. 22 must be a hyphen; + * - at least 10 next characters constitute hex-formatted timestamp + * padded with zeroes if necessary. All bytes must be in [0-9a-f] + * range; + * - last, optional part of transaction ID is any url-encoded string + * without restriction on length. 
*/ +string RGWSI_ZoneUtils::unique_trans_id(const uint64_t unique_num) { + char buf[41]; /* 2 + 21 + 1 + 16 (timestamp can consume up to 16) + 1 */ + time_t timestamp = time(NULL); + + snprintf(buf, sizeof(buf), "tx%021llx-%010llx", + (unsigned long long)unique_num, + (unsigned long long)timestamp); + + return string(buf) + trans_id_suffix; +} + diff --git a/src/rgw/services/svc_zone_utils.h b/src/rgw/services/svc_zone_utils.h new file mode 100644 index 00000000..158d2a92 --- /dev/null +++ b/src/rgw/services/svc_zone_utils.h @@ -0,0 +1,39 @@ +#ifndef CEPH_RGW_SERVICES_ZONEUTILS_H +#define CEPH_RGW_SERVICES_ZONEUTILS_H + + +#include "rgw/rgw_service.h" + + +class RGWSI_RADOS; +class RGWSI_Zone; + +class RGWSI_ZoneUtils : public RGWServiceInstance +{ + friend struct RGWServices_Def; + + RGWSI_RADOS *rados_svc{nullptr}; + RGWSI_Zone *zone_svc{nullptr}; + + string trans_id_suffix; + + void init(RGWSI_RADOS *_rados_svc, + RGWSI_Zone *_zone_svc) { + rados_svc = _rados_svc; + zone_svc = _zone_svc; + } + + int do_start() override; + + void init_unique_trans_id_deps(); + +public: + RGWSI_ZoneUtils(CephContext *cct): RGWServiceInstance(cct) {} + + string gen_host_id(); + string unique_id(uint64_t unique_num); + + string unique_trans_id(const uint64_t unique_num); +}; + +#endif |