author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_quota.cc
parent    Initial commit. (diff)
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
 -rw-r--r--  src/rgw/rgw_quota.cc  1049
 1 file changed, 1049 insertions, 0 deletions
diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc
new file mode 100644
index 000000000..f1ae34f93
--- /dev/null
+++ b/src/rgw/rgw_quota.cc
@@ -0,0 +1,1049 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "include/utime.h"
+#include "common/lru_map.h"
+#include "common/RefCountedObj.h"
+#include "common/Thread.h"
+#include "common/ceph_mutex.h"
+
+#include "rgw_common.h"
+#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
+#include "rgw_quota.h"
+#include "rgw_bucket.h"
+#include "rgw_user.h"
+
+#include "services/svc_sys_obj.h"
+#include "services/svc_meta.h"
+
+#include <atomic>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+
+struct RGWQuotaCacheStats {
+ RGWStorageStats stats; /* cached stats for this user or bucket */
+ utime_t expiration; /* when the cached entry becomes stale */
+ utime_t async_refresh_time; /* when to start a background refresh */
+};
+
+template<class T>
+class RGWQuotaCache {
+protected:
+ rgw::sal::Driver* driver;
+ lru_map<T, RGWQuotaCacheStats> stats_map;
+ RefCountedWaitObject *async_refcount;
+
+ class StatsAsyncTestSet : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
+ int objs_delta;
+ uint64_t added_bytes;
+ uint64_t removed_bytes;
+ public:
+ StatsAsyncTestSet() : objs_delta(0), added_bytes(0), removed_bytes(0) {}
+ bool update(RGWQuotaCacheStats *entry) override {
+ if (entry->async_refresh_time.sec() == 0)
+ return false;
+
+ entry->async_refresh_time = utime_t(0, 0);
+
+ return true;
+ }
+ };
+
+ virtual int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) = 0;
+
+ virtual bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+
+ virtual bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
+ virtual void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+
+ virtual void data_modified(const rgw_user& user, rgw_bucket& bucket) {}
+public:
+ RGWQuotaCache(rgw::sal::Driver* _driver, int size) : driver(_driver), stats_map(size) {
+ async_refcount = new RefCountedWaitObject;
+ }
+ virtual ~RGWQuotaCache() {
+ async_refcount->put_wait(); /* wait for all pending async requests to complete */
+ }
+
+ int get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y,
+ const DoutPrefixProvider* dpp);
+ void adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
+
+ void set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
+ int async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs);
+ void async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats);
+ void async_refresh_fail(const rgw_user& user, rgw_bucket& bucket);
+
+ class AsyncRefreshHandler {
+ protected:
+ rgw::sal::Driver* driver;
+ RGWQuotaCache<T> *cache;
+ public:
+ AsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<T> *_cache) : driver(_driver), cache(_cache) {}
+ virtual ~AsyncRefreshHandler() {}
+
+ virtual int init_fetch() = 0;
+ virtual void drop_reference() = 0;
+ };
+
+ virtual AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) = 0;
+};
+
+template<class T>
+int RGWQuotaCache<T>::async_refresh(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs)
+{
+ /* protect against multiple updates */
+ StatsAsyncTestSet test_update;
+ if (!map_find_and_update(user, bucket, &test_update)) {
+ /* most likely we just raced with another update */
+ return 0;
+ }
+
+ async_refcount->get();
+
+ AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
+
+ int ret = handler->init_fetch();
+ if (ret < 0) {
+ async_refcount->put();
+ handler->drop_reference();
+ return ret;
+ }
+
+ return 0;
+}
+
+template<class T>
+void RGWQuotaCache<T>::async_refresh_fail(const rgw_user& user, rgw_bucket& bucket)
+{
+ ldout(driver->ctx(), 20) << "async stats refresh failed for bucket=" << bucket << dendl;
+
+ async_refcount->put();
+}
+
+template<class T>
+void RGWQuotaCache<T>::async_refresh_response(const rgw_user& user, rgw_bucket& bucket, RGWStorageStats& stats)
+{
+ ldout(driver->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+
+ RGWQuotaCacheStats qs;
+
+ map_find(user, bucket, qs);
+
+ set_stats(user, bucket, qs, stats);
+
+ async_refcount->put();
+}
+
+template<class T>
+void RGWQuotaCache<T>::set_stats(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
+{
+ qs.stats = stats;
+ qs.expiration = ceph_clock_now();
+ qs.async_refresh_time = qs.expiration;
+ qs.expiration += driver->ctx()->_conf->rgw_bucket_quota_ttl;
+ qs.async_refresh_time += driver->ctx()->_conf->rgw_bucket_quota_ttl / 2;
+
+ map_add(user, bucket, qs);
+}
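+
+/*
+ * Example timeline: with rgw_bucket_quota_ttl at, say, 600 seconds, an entry
+ * cached at t=0 gets async_refresh_time = t+300 and expiration = t+600, so a
+ * steadily queried entry is refreshed in the background before callers ever
+ * fall through to a synchronous fetch in get_stats().
+ */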
+
+template<class T>
+int RGWQuotaCache<T>::get_stats(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider* dpp) {
+ RGWQuotaCacheStats qs;
+ utime_t now = ceph_clock_now();
+ if (map_find(user, bucket, qs)) {
+ if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
+ int r = async_refresh(user, bucket, qs);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
+
+ /* continue processing, might be a transient error, async refresh is just optimization */
+ }
+ }
+
+ if (qs.expiration > ceph_clock_now()) {
+ stats = qs.stats;
+ return 0;
+ }
+ }
+
+ int ret = fetch_stats_from_storage(user, bucket, stats, y, dpp);
+ if (ret < 0 && ret != -ENOENT)
+ return ret;
+
+ set_stats(user, bucket, qs, stats);
+
+ return 0;
+}
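+
+/*
+ * Read path summary: a fresh hit returns the cached stats immediately; a hit
+ * that is past async_refresh_time also kicks off a background refresh; a miss
+ * or an expired entry falls through to a synchronous
+ * fetch_stats_from_storage() and re-primes the cache via set_stats().
+ */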
+
+
+template<class T>
+class RGWQuotaStatsUpdate : public lru_map<T, RGWQuotaCacheStats>::UpdateContext {
+ const int objs_delta;
+ const uint64_t added_bytes;
+ const uint64_t removed_bytes;
+public:
+ RGWQuotaStatsUpdate(const int objs_delta,
+ const uint64_t added_bytes,
+ const uint64_t removed_bytes)
+ : objs_delta(objs_delta),
+ added_bytes(added_bytes),
+ removed_bytes(removed_bytes) {
+ }
+
+ bool update(RGWQuotaCacheStats * const entry) override {
+ const uint64_t rounded_added = rgw_rounded_objsize(added_bytes);
+ const uint64_t rounded_removed = rgw_rounded_objsize(removed_bytes);
+
+ if (((int64_t)(entry->stats.size + added_bytes - removed_bytes)) >= 0) {
+ entry->stats.size += added_bytes - removed_bytes;
+ } else {
+ entry->stats.size = 0;
+ }
+
+ if (((int64_t)(entry->stats.size_rounded + rounded_added - rounded_removed)) >= 0) {
+ entry->stats.size_rounded += rounded_added - rounded_removed;
+ } else {
+ entry->stats.size_rounded = 0;
+ }
+
+ if (((int64_t)(entry->stats.num_objects + objs_delta)) >= 0) {
+ entry->stats.num_objects += objs_delta;
+ } else {
+ entry->stats.num_objects = 0;
+ }
+
+ return true;
+ }
+};
+
+
+template<class T>
+void RGWQuotaCache<T>::adjust_stats(const rgw_user& user, rgw_bucket& bucket, int objs_delta,
+ uint64_t added_bytes, uint64_t removed_bytes)
+{
+ RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
+ map_find_and_update(user, bucket, &update);
+
+ data_modified(user, bucket);
+}
+
+class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
+ public RGWGetBucketStats_CB {
+ rgw_user user;
+public:
+ BucketAsyncRefreshHandler(rgw::sal::Driver* _driver, RGWQuotaCache<rgw_bucket> *_cache,
+ const rgw_user& _user, const rgw_bucket& _bucket) :
+ RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_driver, _cache),
+ RGWGetBucketStats_CB(_bucket), user(_user) {}
+
+ void drop_reference() override { put(); }
+ void handle_response(int r) override;
+ int init_fetch() override;
+};
+
+int BucketAsyncRefreshHandler::init_fetch()
+{
+ std::unique_ptr<rgw::sal::Bucket> rbucket;
+
+ const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw bucket async refresh handler: ");
+ int r = driver->get_bucket(&dp, nullptr, bucket, &rbucket, null_yield);
+ if (r < 0) {
+ ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket << " r=" << r << dendl;
+ return r;
+ }
+
+ ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
+
+ const auto& index = rbucket->get_info().get_current_index();
+ if (is_layout_indexless(index)) {
+ return 0;
+ }
+
+ r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
+ if (r < 0) {
+ ldpp_dout(&dp, 0) << "could not read stats for bucket=" << bucket.name << dendl;
+
+ /* read_stats_async() dropped our reference already */
+ return r;
+ }
+
+ return 0;
+}
+
+void BucketAsyncRefreshHandler::handle_response(const int r)
+{
+ if (r < 0) {
+ ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+ cache->async_refresh_fail(user, bucket);
+ return;
+ }
+
+ RGWStorageStats bs;
+
+ for (const auto& pair : *stats) {
+ const RGWStorageStats& s = pair.second;
+
+ bs.size += s.size;
+ bs.size_rounded += s.size_rounded;
+ bs.num_objects += s.num_objects;
+ }
+
+ cache->async_refresh_response(user, bucket, bs);
+}
+
+class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
+protected:
+ bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ return stats_map.find(bucket, qs);
+ }
+
+ bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) override {
+ return stats_map.find_and_update(bucket, NULL, ctx);
+ }
+
+ void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ stats_map.add(bucket, qs);
+ }
+
+ int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
+
+public:
+ explicit RGWBucketStatsCache(rgw::sal::Driver* _driver) : RGWQuotaCache<rgw_bucket>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size) {
+ }
+
+ AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
+ return new BucketAsyncRefreshHandler(driver, this, user, bucket);
+ }
+};
+
+int RGWBucketStatsCache::fetch_stats_from_storage(const rgw_user& _u, const rgw_bucket& _b, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp)
+{
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+
+ int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
+ return r;
+ }
+
+ stats = RGWStorageStats();
+
+ const auto& index = bucket->get_info().get_current_index();
+ if (is_layout_indexless(index)) {
+ return 0;
+ }
+
+ string bucket_ver;
+ string master_ver;
+
+ map<RGWObjCategory, RGWStorageStats> bucket_stats;
+ r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
+ &master_ver, bucket_stats, nullptr);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
+ << _b.name << dendl;
+ return r;
+ }
+
+ for (const auto& pair : bucket_stats) {
+ const RGWStorageStats& s = pair.second;
+
+ stats.size += s.size;
+ stats.size_rounded += s.size_rounded;
+ stats.num_objects += s.num_objects;
+ }
+
+ return 0;
+}
+
+class UserAsyncRefreshHandler : public RGWQuotaCache<rgw_user>::AsyncRefreshHandler,
+ public RGWGetUserStats_CB {
+ const DoutPrefixProvider *dpp;
+ rgw_bucket bucket;
+public:
+ UserAsyncRefreshHandler(const DoutPrefixProvider *_dpp, rgw::sal::Driver* _driver, RGWQuotaCache<rgw_user> *_cache,
+ const rgw_user& _user, const rgw_bucket& _bucket) :
+ RGWQuotaCache<rgw_user>::AsyncRefreshHandler(_driver, _cache),
+ RGWGetUserStats_CB(_user),
+ dpp(_dpp),
+ bucket(_bucket) {}
+
+ void drop_reference() override { put(); }
+ int init_fetch() override;
+ void handle_response(int r) override;
+};
+
+int UserAsyncRefreshHandler::init_fetch()
+{
+ std::unique_ptr<rgw::sal::User> ruser = driver->get_user(user);
+
+ ldpp_dout(dpp, 20) << "initiating async quota refresh for user=" << user << dendl;
+ int r = ruser->read_stats_async(dpp, this);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not read stats for user=" << user << dendl;
+
+ /* read_stats_async() dropped our reference already */
+ return r;
+ }
+
+ return 0;
+}
+
+void UserAsyncRefreshHandler::handle_response(int r)
+{
+ if (r < 0) {
+ ldout(driver->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+ cache->async_refresh_fail(user, bucket);
+ return;
+ }
+
+ cache->async_refresh_response(user, bucket, stats);
+}
+
+class RGWUserStatsCache : public RGWQuotaCache<rgw_user> {
+ const DoutPrefixProvider *dpp;
+ std::atomic<bool> down_flag = { false };
+ ceph::shared_mutex mutex = ceph::make_shared_mutex("RGWUserStatsCache");
+ map<rgw_bucket, rgw_user> modified_buckets;
+
+ /* thread, sync recent modified buckets info */
+ class BucketsSyncThread : public Thread {
+ CephContext *cct;
+ RGWUserStatsCache *stats;
+
+ ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::BucketsSyncThread");
+ ceph::condition_variable cond;
+ public:
+
+ BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
+
+ void *entry() override {
+ ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
+ do {
+ map<rgw_bucket, rgw_user> buckets;
+
+ stats->swap_modified_buckets(buckets);
+
+ for (map<rgw_bucket, rgw_user>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
+ rgw_bucket bucket = iter->first;
+ rgw_user& user = iter->second;
+ ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
+ const DoutPrefix dp(cct, dout_subsys, "rgw bucket sync thread: ");
+ int r = stats->sync_bucket(user, bucket, null_yield, &dp);
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
+ }
+ }
+
+ if (stats->going_down())
+ break;
+
+ std::unique_lock locker{lock};
+ cond.wait_for(
+ locker,
+ std::chrono::seconds(cct->_conf->rgw_user_quota_bucket_sync_interval));
+ } while (!stats->going_down());
+ ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
+
+ return NULL;
+ }
+
+ void stop() {
+ std::lock_guard l{lock};
+ cond.notify_all();
+ }
+ };
+
+ /*
+ * thread, full sync all users stats periodically
+ *
+ * only sync users that are non-idle or that were never synced before. This
+ * is needed so that users that didn't have quota turned on before (or that
+ * existed before the user objclass tracked stats) get their backend stats
+ * brought up to date.
+ */
+ class UserSyncThread : public Thread {
+ CephContext *cct;
+ RGWUserStatsCache *stats;
+
+ ceph::mutex lock = ceph::make_mutex("RGWUserStatsCache::UserSyncThread");
+ ceph::condition_variable cond;
+ public:
+
+ UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s) {}
+
+ void *entry() override {
+ ldout(cct, 20) << "UserSyncThread: start" << dendl;
+ do {
+ const DoutPrefix dp(cct, dout_subsys, "rgw user sync thread: ");
+ int ret = stats->sync_all_users(&dp, null_yield);
+ if (ret < 0) {
+ ldout(cct, 5) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
+ }
+
+ if (stats->going_down())
+ break;
+
+ std::unique_lock l{lock};
+ cond.wait_for(l, std::chrono::seconds(cct->_conf->rgw_user_quota_sync_interval));
+ } while (!stats->going_down());
+ ldout(cct, 20) << "UserSyncThread: done" << dendl;
+
+ return NULL;
+ }
+
+ void stop() {
+ std::lock_guard l{lock};
+ cond.notify_all();
+ }
+ };
+
+ BucketsSyncThread *buckets_sync_thread;
+ UserSyncThread *user_sync_thread;
+protected:
+ bool map_find(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ return stats_map.find(user, qs);
+ }
+
+ bool map_find_and_update(const rgw_user& user, const rgw_bucket& bucket, lru_map<rgw_user, RGWQuotaCacheStats>::UpdateContext *ctx) override {
+ return stats_map.find_and_update(user, NULL, ctx);
+ }
+
+ void map_add(const rgw_user& user, const rgw_bucket& bucket, RGWQuotaCacheStats& qs) override {
+ stats_map.add(user, qs);
+ }
+
+ int fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats, optional_yield y, const DoutPrefixProvider *dpp) override;
+ int sync_bucket(const rgw_user& user, rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp);
+ int sync_user(const DoutPrefixProvider *dpp, const rgw_user& user, optional_yield y);
+ int sync_all_users(const DoutPrefixProvider *dpp, optional_yield y);
+
+ void data_modified(const rgw_user& user, rgw_bucket& bucket) override;
+
+ void swap_modified_buckets(map<rgw_bucket, rgw_user>& out) {
+ std::unique_lock lock{mutex};
+ modified_buckets.swap(out);
+ }
+
+ template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
+ void stop_thread(T **pthr) {
+ T *thread = *pthr;
+ if (!thread)
+ return;
+
+ thread->stop();
+ thread->join();
+ delete thread;
+ *pthr = NULL;
+ }
+
+public:
+ RGWUserStatsCache(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads)
+ : RGWQuotaCache<rgw_user>(_driver, _driver->ctx()->_conf->rgw_bucket_quota_cache_size), dpp(dpp)
+ {
+ if (quota_threads) {
+ buckets_sync_thread = new BucketsSyncThread(driver->ctx(), this);
+ buckets_sync_thread->create("rgw_buck_st_syn");
+ user_sync_thread = new UserSyncThread(driver->ctx(), this);
+ user_sync_thread->create("rgw_user_st_syn");
+ } else {
+ buckets_sync_thread = NULL;
+ user_sync_thread = NULL;
+ }
+ }
+ ~RGWUserStatsCache() override {
+ stop();
+ }
+
+ AsyncRefreshHandler *allocate_refresh_handler(const rgw_user& user, const rgw_bucket& bucket) override {
+ return new UserAsyncRefreshHandler(dpp, driver, this, user, bucket);
+ }
+
+ bool going_down() {
+ return down_flag;
+ }
+
+ void stop() {
+ down_flag = true;
+ {
+ std::unique_lock lock{mutex};
+ stop_thread(&buckets_sync_thread);
+ }
+ stop_thread(&user_sync_thread);
+ }
+};
+
+int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& _u,
+ const rgw_bucket& _b,
+ RGWStorageStats& stats,
+ optional_yield y,
+ const DoutPrefixProvider *dpp)
+{
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
+ int r = user->read_stats(dpp, y, &stats);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not get user stats for user=" << user << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWUserStatsCache::sync_bucket(const rgw_user& _u, rgw_bucket& _b, optional_yield y, const DoutPrefixProvider *dpp)
+{
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(_u);
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+
+ int r = driver->get_bucket(dpp, user.get(), _b, &bucket, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "could not get bucket info for bucket=" << _b << " r=" << r << dendl;
+ return r;
+ }
+
+ r = bucket->sync_user_stats(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: sync_user_stats() for user=" << _u << ", bucket=" << bucket << " returned " << r << dendl;
+ return r;
+ }
+
+ return bucket->check_bucket_shards(dpp);
+}
+
+int RGWUserStatsCache::sync_user(const DoutPrefixProvider *dpp, const rgw_user& _u, optional_yield y)
+{
+ RGWStorageStats stats;
+ ceph::real_time last_stats_sync;
+ ceph::real_time last_stats_update;
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(rgw_user(_u.to_str()));
+
+ int ret = user->read_stats(dpp, y, &stats, &last_stats_sync, &last_stats_update);
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << "ERROR: can't read user header: ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (!driver->ctx()->_conf->rgw_user_quota_sync_idle_users &&
+ last_stats_update < last_stats_sync) {
+ ldpp_dout(dpp, 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
+ return 0;
+ }
+
+ real_time when_need_full_sync = last_stats_sync;
+ when_need_full_sync += make_timespan(driver->ctx()->_conf->rgw_user_quota_sync_wait_time);
+
+ // check if enough time passed since last full sync
+ /* FIXME: missing check? */
+
+ ret = rgw_user_sync_all_stats(dpp, driver, user.get(), y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+int RGWUserStatsCache::sync_all_users(const DoutPrefixProvider *dpp, optional_yield y)
+{
+ string key = "user";
+ void *handle;
+
+ int ret = driver->meta_list_keys_init(dpp, key, string(), &handle);
+ if (ret < 0) {
+ ldpp_dout(dpp, 10) << "ERROR: meta_list_keys_init() failed: ret=" << ret << dendl;
+ return ret;
+ }
+
+ bool truncated;
+ int max = 1000;
+
+ do {
+ list<string> keys;
+ ret = driver->meta_list_keys_next(dpp, handle, max, keys, &truncated);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
+ goto done;
+ }
+ for (list<string>::iterator iter = keys.begin();
+ iter != keys.end() && !going_down();
+ ++iter) {
+ rgw_user user(*iter);
+ ldpp_dout(dpp, 20) << "RGWUserStatsCache: sync user=" << user << dendl;
+ int ret = sync_user(dpp, user, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
+
+ /* continuing to next user */
+ continue;
+ }
+ }
+ } while (truncated);
+
+ ret = 0;
+done:
+ driver->meta_list_keys_complete(handle);
+ return ret;
+}
+
+void RGWUserStatsCache::data_modified(const rgw_user& user, rgw_bucket& bucket)
+{
+ /* racy, but it's ok: if we lose the race, the worst case is taking the
+ * exclusive lock below just to store an entry that is already present */
+ mutex.lock_shared();
+ bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
+ mutex.unlock_shared();
+
+ if (need_update) {
+ std::unique_lock lock{mutex};
+ modified_buckets[bucket] = user;
+ }
+}
+
+
+class RGWQuotaInfoApplier {
+ /* NOTE: no non-static fields allowed, as instances are supposed to live
+ * in static memory only. */
+protected:
+ RGWQuotaInfoApplier() = default;
+
+public:
+ virtual ~RGWQuotaInfoApplier() {}
+
+ virtual bool is_size_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t size) const = 0;
+
+ virtual bool is_num_objs_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs) const = 0;
+
+ static const RGWQuotaInfoApplier& get_instance(const RGWQuotaInfo& qinfo);
+};
+
+class RGWQuotaInfoDefApplier : public RGWQuotaInfoApplier {
+public:
+ bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t size) const override;
+
+ bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs) const override;
+};
+
+class RGWQuotaInfoRawApplier : public RGWQuotaInfoApplier {
+public:
+ bool is_size_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t size) const override;
+
+ bool is_num_objs_exceeded(const DoutPrefixProvider *dpp, const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs) const override;
+};
+
+
+bool RGWQuotaInfoDefApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t size) const
+{
+ if (qinfo.max_size < 0) {
+ /* The limit is not enabled. */
+ return false;
+ }
+
+ const uint64_t cur_size = stats.size_rounded;
+ const uint64_t new_size = rgw_rounded_objsize(size);
+
+ if (std::cmp_greater(cur_size + new_size, qinfo.max_size)) {
+ ldpp_dout(dpp, 10) << "quota exceeded: stats.size_rounded=" << stats.size_rounded
+ << " size=" << new_size << " "
+ << entity << "_quota.max_size=" << qinfo.max_size << dendl;
+ return true;
+ }
+
+ return false;
+}
+
+bool RGWQuotaInfoDefApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs) const
+{
+ if (qinfo.max_objects < 0) {
+ /* The limit is not enabled. */
+ return false;
+ }
+
+ if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
+ ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
+ << " " << entity << "_quota.max_objects=" << qinfo.max_objects
+ << dendl;
+ return true;
+ }
+
+ return false;
+}
+
+bool RGWQuotaInfoRawApplier::is_size_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t size) const
+{
+ if (qinfo.max_size < 0) {
+ /* The limit is not enabled. */
+ return false;
+ }
+
+ const uint64_t cur_size = stats.size;
+
+ if (std::cmp_greater(cur_size + size, qinfo.max_size)) {
+ ldpp_dout(dpp, 10) << "quota exceeded: stats.size=" << stats.size
+ << " size=" << size << " "
+ << entity << "_quota.max_size=" << qinfo.max_size << dendl;
+ return true;
+ }
+
+ return false;
+}
+
+bool RGWQuotaInfoRawApplier::is_num_objs_exceeded(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& qinfo,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs) const
+{
+ if (qinfo.max_objects < 0) {
+ /* The limit is not enabled. */
+ return false;
+ }
+
+ if (std::cmp_greater(stats.num_objects + num_objs, qinfo.max_objects)) {
+ ldpp_dout(dpp, 10) << "quota exceeded: stats.num_objects=" << stats.num_objects
+ << " " << entity << "_quota.max_objects=" << qinfo.max_objects
+ << dendl;
+ return true;
+ }
+
+ return false;
+}
+
+const RGWQuotaInfoApplier& RGWQuotaInfoApplier::get_instance(
+ const RGWQuotaInfo& qinfo)
+{
+ static RGWQuotaInfoDefApplier default_qapplier;
+ static RGWQuotaInfoRawApplier raw_qapplier;
+
+ if (qinfo.check_on_raw) {
+ return raw_qapplier;
+ } else {
+ return default_qapplier;
+ }
+}
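+
+/*
+ * The practical difference between the two appliers: the default applier
+ * charges rgw_rounded_objsize() bytes per object (size_rounded, i.e. size
+ * rounded up to the 4 KiB allocation unit), while the raw applier
+ * (check_on_raw) charges the exact byte count. A 1-byte object therefore
+ * consumes 4 KiB of a default quota but only 1 byte of a raw one.
+ */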
+
+
+class RGWQuotaHandlerImpl : public RGWQuotaHandler {
+ rgw::sal::Driver* driver;
+ RGWBucketStatsCache bucket_stats_cache;
+ RGWUserStatsCache user_stats_cache;
+
+ int check_quota(const DoutPrefixProvider *dpp,
+ const char * const entity,
+ const RGWQuotaInfo& quota,
+ const RGWStorageStats& stats,
+ const uint64_t num_objs,
+ const uint64_t size) {
+ if (!quota.enabled) {
+ return 0;
+ }
+
+ const auto& quota_applier = RGWQuotaInfoApplier::get_instance(quota);
+
+ ldpp_dout(dpp, 20) << entity
+ << " quota: max_objects=" << quota.max_objects
+ << " max_size=" << quota.max_size << dendl;
+
+
+ if (quota_applier.is_num_objs_exceeded(dpp, entity, quota, stats, num_objs)) {
+ return -ERR_QUOTA_EXCEEDED;
+ }
+
+ if (quota_applier.is_size_exceeded(dpp, entity, quota, stats, size)) {
+ return -ERR_QUOTA_EXCEEDED;
+ }
+
+ ldpp_dout(dpp, 20) << entity << " quota OK:"
+ << " stats.num_objects=" << stats.num_objects
+ << " stats.size=" << stats.size << dendl;
+ return 0;
+ }
+public:
+ RGWQuotaHandlerImpl(const DoutPrefixProvider *dpp, rgw::sal::Driver* _driver, bool quota_threads) : driver(_driver),
+ bucket_stats_cache(_driver),
+ user_stats_cache(dpp, _driver, quota_threads) {}
+
+ int check_quota(const DoutPrefixProvider *dpp,
+ const rgw_user& user,
+ rgw_bucket& bucket,
+ RGWQuota& quota,
+ uint64_t num_objs,
+ uint64_t size, optional_yield y) override {
+
+ if (!quota.bucket_quota.enabled && !quota.user_quota.enabled) {
+ return 0;
+ }
+
+ /*
+ * we need to fetch bucket stats if the user quota is enabled, because
+ * the whole system relies on us periodically updating the user's bucket
+ * stats in the user's header. This happens in get_stats() when we
+ * actually fetch that info instead of relying on cached data.
+ */
+
+ const DoutPrefix dp(driver->ctx(), dout_subsys, "rgw quota handler: ");
+ if (quota.bucket_quota.enabled) {
+ RGWStorageStats bucket_stats;
+ int ret = bucket_stats_cache.get_stats(user, bucket, bucket_stats, y, &dp);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = check_quota(dpp, "bucket", quota.bucket_quota, bucket_stats, num_objs, size);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ if (quota.user_quota.enabled) {
+ RGWStorageStats user_stats;
+ int ret = user_stats_cache.get_stats(user, bucket, user_stats, y, &dp);
+ if (ret < 0) {
+ return ret;
+ }
+ ret = check_quota(dpp, "user", quota.user_quota, user_stats, num_objs, size);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ return 0;
+ }
+
+ void update_stats(const rgw_user& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) override {
+ bucket_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
+ user_stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
+ }
+
+ void check_bucket_shards(const DoutPrefixProvider *dpp, uint64_t max_objs_per_shard,
+ uint64_t num_shards, uint64_t num_objs, bool is_multisite,
+ bool& need_resharding, uint32_t *suggested_num_shards) override
+ {
+ if (num_objs > num_shards * max_objs_per_shard) {
+ ldpp_dout(dpp, 0) << __func__ << ": resharding needed: stats.num_objects=" << num_objs
+ << " shard max_objects=" << max_objs_per_shard * num_shards << dendl;
+ need_resharding = true;
+ if (suggested_num_shards) {
+ uint32_t obj_multiplier = 2;
+ if (is_multisite) {
+ // if we're maintaining bilogs for multisite, reshards are significantly
+ // more expensive. scale up the shard count much faster to minimize the
+ // number of reshard events during a write workload
+ obj_multiplier = 8;
+ }
+ *suggested_num_shards = num_objs * obj_multiplier / max_objs_per_shard;
+ }
+ } else {
+ need_resharding = false;
+ }
+ }
+};
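+
+/*
+ * Worked example for check_bucket_shards(): with max_objs_per_shard = 100000
+ * and num_shards = 11, resharding is suggested once num_objs exceeds
+ * 1100000; a single-site deployment then gets suggested_num_shards =
+ * num_objs * 2 / 100000 (22 or more), while multisite scales by 8 instead
+ * of 2 to reduce the number of future reshard events under a write workload.
+ */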
+
+
+RGWQuotaHandler *RGWQuotaHandler::generate_handler(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, bool quota_threads)
+{
+ return new RGWQuotaHandlerImpl(dpp, driver, quota_threads);
+}
+
+void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
+{
+ delete handler;
+}
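+
+/*
+ * A minimal sketch of the intended call sequence (caller-side variable names
+ * are illustrative):
+ *
+ *   RGWQuotaHandler *qh = RGWQuotaHandler::generate_handler(dpp, driver, true);
+ *   int r = qh->check_quota(dpp, owner, bucket, quota, 1, obj_size, y);
+ *   if (r == -ERR_QUOTA_EXCEEDED) {
+ *     // reject the write
+ *   }
+ *   // after the write commits:
+ *   qh->update_stats(owner, bucket, 1, obj_size, 0);
+ *   ...
+ *   RGWQuotaHandler::free_handler(qh);
+ */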
+
+
+void rgw_apply_default_bucket_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
+{
+ if (conf->rgw_bucket_default_quota_max_objects >= 0) {
+ quota.max_objects = conf->rgw_bucket_default_quota_max_objects;
+ quota.enabled = true;
+ }
+ if (conf->rgw_bucket_default_quota_max_size >= 0) {
+ quota.max_size = conf->rgw_bucket_default_quota_max_size;
+ quota.enabled = true;
+ }
+}
+
+void rgw_apply_default_user_quota(RGWQuotaInfo& quota, const ConfigProxy& conf)
+{
+ if (conf->rgw_user_default_quota_max_objects >= 0) {
+ quota.max_objects = conf->rgw_user_default_quota_max_objects;
+ quota.enabled = true;
+ }
+ if (conf->rgw_user_default_quota_max_size >= 0) {
+ quota.max_size = conf->rgw_user_default_quota_max_size;
+ quota.enabled = true;
+ }
+}
+
+void RGWQuotaInfo::dump(Formatter *f) const
+{
+ f->dump_bool("enabled", enabled);
+ f->dump_bool("check_on_raw", check_on_raw);
+
+ f->dump_int("max_size", max_size);
+ f->dump_int("max_size_kb", rgw_rounded_kb(max_size));
+ f->dump_int("max_objects", max_objects);
+}
+
+void RGWQuotaInfo::decode_json(JSONObj *obj)
+{
+ if (false == JSONDecoder::decode_json("max_size", max_size, obj)) {
+ /* We're parsing an older version of the struct. */
+ int64_t max_size_kb = 0;
+
+ JSONDecoder::decode_json("max_size_kb", max_size_kb, obj);
+ max_size = max_size_kb * 1024;
+ }
+ JSONDecoder::decode_json("max_objects", max_objects, obj);
+
+ JSONDecoder::decode_json("check_on_raw", check_on_raw, obj);
+ JSONDecoder::decode_json("enabled", enabled, obj);
+}
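+
+/*
+ * Example: both of these JSON snippets decode to max_size = 1048576; the
+ * second exercises the legacy max_size_kb fallback above:
+ *   {"enabled": true, "check_on_raw": false, "max_size": 1048576, "max_objects": -1}
+ *   {"enabled": true, "check_on_raw": false, "max_size_kb": 1024, "max_objects": -1}
+ */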
+