author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_lc.cc
parent     Initial commit.
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_lc.cc')
-rw-r--r--  src/rgw/rgw_lc.cc  2869
1 file changed, 2869 insertions, 0 deletions
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc new file mode 100644 index 000000000..7f4a79501 --- /dev/null +++ b/src/rgw/rgw_lc.cc @@ -0,0 +1,2869 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include <string.h> +#include <iostream> +#include <map> +#include <algorithm> +#include <tuple> +#include <functional> + +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/predicate.hpp> +#include <boost/variant.hpp> + +#include "include/scope_guard.h" +#include "include/function2.hpp" +#include "common/Formatter.h" +#include "common/containers.h" +#include "common/split.h" +#include <common/errno.h> +#include "include/random.h" +#include "cls/lock/cls_lock_client.h" +#include "rgw_perf_counters.h" +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_lc.h" +#include "rgw_zone.h" +#include "rgw_string.h" +#include "rgw_multi.h" +#include "rgw_sal.h" +#include "rgw_lc_tier.h" +#include "rgw_notify.h" + +#include "fmt/format.h" + +#include "services/svc_sys_obj.h" +#include "services/svc_zone.h" +#include "services/svc_tier_rados.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +using namespace std; + +const char* LC_STATUS[] = { + "UNINITIAL", + "PROCESSING", + "FAILED", + "COMPLETE" +}; + +using namespace librados; + +bool LCRule::valid() const +{ + if (id.length() > MAX_ID_LEN) { + return false; + } + else if(expiration.empty() && noncur_expiration.empty() && + mp_expiration.empty() && !dm_expiration && + transitions.empty() && noncur_transitions.empty()) { + return false; + } + else if (!expiration.valid() || !noncur_expiration.valid() || + !mp_expiration.valid()) { + return false; + } + if (!transitions.empty()) { + bool using_days = expiration.has_days(); + bool using_date = expiration.has_date(); + for (const auto& elem : transitions) { + if (!elem.second.valid()) { + return false; + } + using_days = using_days || elem.second.has_days(); + using_date = using_date || elem.second.has_date(); + if (using_days && using_date) { + return false; + } + } + } + for (const auto& elem : noncur_transitions) { + if (!elem.second.valid()) { + return false; + } + } + + return true; +} + +void LCRule::init_simple_days_rule(std::string_view _id, + std::string_view _prefix, int num_days) +{ + id = _id; + prefix = _prefix; + char buf[32]; + snprintf(buf, sizeof(buf), "%d", num_days); + expiration.set_days(buf); + set_enabled(true); +} + +void RGWLifecycleConfiguration::add_rule(const LCRule& rule) +{ + auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups + rule_map.insert(pair<string, LCRule>(id, rule)); +} + +bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule) +{ + lc_op op(rule.get_id()); + op.status = rule.is_enabled(); + if (rule.get_expiration().has_days()) { + op.expiration = rule.get_expiration().get_days(); + } + if (rule.get_expiration().has_date()) { + op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date()); + } + if (rule.get_noncur_expiration().has_days()) { + op.noncur_expiration = rule.get_noncur_expiration().get_days(); + } + if (rule.get_mp_expiration().has_days()) { + op.mp_expiration = rule.get_mp_expiration().get_days(); + } + op.dm_expiration = rule.get_dm_expiration(); + for (const auto &elem : rule.get_transitions()) { + transition_action action; + if (elem.second.has_days()) { + action.days = elem.second.get_days(); + } else 
{ + action.date = ceph::from_iso_8601(elem.second.get_date()); + } + action.storage_class + = rgw_placement_rule::get_canonical_storage_class(elem.first); + op.transitions.emplace(elem.first, std::move(action)); + } + for (const auto &elem : rule.get_noncur_transitions()) { + transition_action action; + action.days = elem.second.get_days(); + action.date = ceph::from_iso_8601(elem.second.get_date()); + action.storage_class + = rgw_placement_rule::get_canonical_storage_class(elem.first); + op.noncur_transitions.emplace(elem.first, std::move(action)); + } + std::string prefix; + if (rule.get_filter().has_prefix()){ + prefix = rule.get_filter().get_prefix(); + } else { + prefix = rule.get_prefix(); + } + if (rule.get_filter().has_tags()){ + op.obj_tags = rule.get_filter().get_tags(); + } + op.rule_flags = rule.get_filter().get_flags(); + prefix_map.emplace(std::move(prefix), std::move(op)); + return true; +} + +int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule) +{ + if (!rule.valid()) { + return -EINVAL; + } + auto& id = rule.get_id(); + if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same + return -EINVAL; + } + if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || + !rule.get_mp_expiration().empty())) { + return -ERR_INVALID_REQUEST; + } + rule_map.insert(pair<string, LCRule>(id, rule)); + + if (!_add_rule(rule)) { + return -ERR_INVALID_REQUEST; + } + return 0; +} + +bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, + const lc_op& second) { + if ((first.expiration > 0 || first.expiration_date != boost::none) && + (second.expiration > 0 || second.expiration_date != boost::none)) { + return true; + } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) { + return true; + } else if (first.mp_expiration > 0 && second.mp_expiration > 0) { + return true; + } else if (!first.transitions.empty() && !second.transitions.empty()) { + for (auto &elem : first.transitions) { + if (second.transitions.find(elem.first) != second.transitions.end()) { + return true; + } + } + } else if (!first.noncur_transitions.empty() && + !second.noncur_transitions.empty()) { + for (auto &elem : first.noncur_transitions) { + if (second.noncur_transitions.find(elem.first) != + second.noncur_transitions.end()) { + return true; + } + } + } + return false; +} + +/* Formerly, this method checked for duplicate rules using an invalid + * method (prefix uniqueness). 
*/ +bool RGWLifecycleConfiguration::valid() +{ + return true; +} + +void *RGWLC::LCWorker::entry() { + do { + std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction + utime_t start = ceph_clock_now(); + if (should_work(start)) { + ldpp_dout(dpp, 2) << "life cycle: start" << dendl; + int r = lc->process(this, all_buckets, false /* once */); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" + << r << dendl; + } + ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; + cloud_targets.clear(); // clear cloud targets + } + if (lc->going_down()) + break; + + utime_t end = ceph_clock_now(); + int secs = schedule_next_start_time(start, end); + utime_t next; + next.set_from_double(end + secs); + + ldpp_dout(dpp, 5) << "schedule life cycle next start time: " + << rgw_to_asctime(next) << dendl; + + std::unique_lock l{lock}; + cond.wait_for(l, std::chrono::seconds(secs)); + } while (!lc->going_down()); + + return NULL; +} + +void RGWLC::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { + cct = _cct; + driver = _driver; + sal_lc = driver->get_lifecycle(); + max_objs = cct->_conf->rgw_lc_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + obj_names[i].append(buf); + } + +#define COOKIE_LEN 16 + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1); + cookie = cookie_buf; +} + +void RGWLC::finalize() +{ + delete[] obj_names; +} + +static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) { + os << "<ent: bucket="; + os << ent.get_bucket(); + os << "; start_time="; + os << rgw_to_asctime(utime_t(time_t(ent.get_start_time()), 0)); + os << "; status="; + os << LC_STATUS[ent.get_status()]; + os << ">"; + return os; +} + +static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days, + ceph::real_time *expire_time = nullptr) +{ + double timediff, cmp; + utime_t base_time; + if (cct->_conf->rgw_lc_debug_interval <= 0) { + /* Normal case, run properly */ + cmp = double(days)*24*60*60; + base_time = ceph_clock_now().round_to_day(); + } else { + /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */ + cmp = double(days)*cct->_conf->rgw_lc_debug_interval; + base_time = ceph_clock_now(); + } + auto tt_mtime = ceph::real_clock::to_time_t(mtime); + timediff = base_time - tt_mtime; + + if (expire_time) { + *expire_time = mtime + make_timespan(cmp); + } + + ldpp_dout(dpp, 20) << __func__ + << "(): mtime=" << mtime << " days=" << days + << " base_time=" << base_time << " timediff=" << timediff + << " cmp=" << cmp + << " is_expired=" << (timediff >= cmp) + << dendl; + + return (timediff >= cmp); +} + +static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp) +{ + if (!obj->get_bucket()->get_info().obj_lock_enabled()) { + return true; + } + std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op(); + int ret = read_op->prepare(null_yield, dpp); + if (ret < 0) { + if (ret == -ENOENT) { + return true; + } else { + return false; + } + } else { + auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION); + if (iter != obj->get_attrs().end()) { + RGWObjectRetention retention; + try { + decode(retention, iter->second); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: failed 
to decode RGWObjectRetention" + << dendl; + return false; + } + if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > + ceph_clock_now()) { + return false; + } + } + iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD); + if (iter != obj->get_attrs().end()) { + RGWObjectLegalHold obj_legal_hold; + try { + decode(obj_legal_hold, iter->second); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectLegalHold" + << dendl; + return false; + } + if (obj_legal_hold.is_enabled()) { + return false; + } + } + return true; + } +} + +class LCObjsLister { + rgw::sal::Driver* driver; + rgw::sal::Bucket* bucket; + rgw::sal::Bucket::ListParams list_params; + rgw::sal::Bucket::ListResults list_results; + string prefix; + vector<rgw_bucket_dir_entry>::iterator obj_iter; + rgw_bucket_dir_entry pre_obj; + int64_t delay_ms; + +public: + LCObjsLister(rgw::sal::Driver* _driver, rgw::sal::Bucket* _bucket) : + driver(_driver), bucket(_bucket) { + list_params.list_versions = bucket->versioned(); + list_params.allow_unordered = true; + delay_ms = driver->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay"); + } + + void set_prefix(const string& p) { + prefix = p; + list_params.prefix = prefix; + } + + int init(const DoutPrefixProvider *dpp) { + return fetch(dpp); + } + + int fetch(const DoutPrefixProvider *dpp) { + int ret = bucket->list(dpp, list_params, 1000, list_results, null_yield); + if (ret < 0) { + return ret; + } + + obj_iter = list_results.objs.begin(); + + return 0; + } + + void delay() { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } + + bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj, + std::function<void(void)> fetch_barrier + = []() { /* nada */}) { + if (obj_iter == list_results.objs.end()) { + if (!list_results.is_truncated) { + delay(); + return false; + } else { + fetch_barrier(); + list_params.marker = pre_obj.key; + int ret = fetch(dpp); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret + << dendl; + return false; + } + } + delay(); + } + /* returning address of entry in objs */ + *obj = &(*obj_iter); + return obj_iter != list_results.objs.end(); + } + + rgw_bucket_dir_entry get_prev_obj() { + return pre_obj; + } + + void next() { + pre_obj = *obj_iter; + ++obj_iter; + } + + boost::optional<std::string> next_key_name() { + if (obj_iter == list_results.objs.end() || + (obj_iter + 1) == list_results.objs.end()) { + /* this should have been called after get_obj() was called, so this should + * only happen if is_truncated is false */ + return boost::none; + } + + return ((obj_iter + 1)->key.name); + } + +}; /* LCObjsLister */ + +struct op_env { + + using LCWorker = RGWLC::LCWorker; + + lc_op op; + rgw::sal::Driver* driver; + LCWorker* worker; + rgw::sal::Bucket* bucket; + LCObjsLister& ol; + + op_env(lc_op& _op, rgw::sal::Driver* _driver, LCWorker* _worker, + rgw::sal::Bucket* _bucket, LCObjsLister& _ol) + : op(_op), driver(_driver), worker(_worker), bucket(_bucket), + ol(_ol) {} +}; /* op_env */ + +class LCRuleOp; +class WorkQ; + +struct lc_op_ctx { + CephContext *cct; + op_env env; + rgw_bucket_dir_entry o; + boost::optional<std::string> next_key_name; + ceph::real_time effective_mtime; + + rgw::sal::Driver* driver; + rgw::sal::Bucket* bucket; + lc_op& op; // ok--refers to expanded env.op + LCObjsLister& ol; + + std::unique_ptr<rgw::sal::Object> obj; + RGWObjectCtx rctx; + const DoutPrefixProvider *dpp; + WorkQ* wq; + + std::unique_ptr<rgw::sal::PlacementTier> tier; + 
+ lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, + boost::optional<std::string> next_key_name, + ceph::real_time effective_mtime, + const DoutPrefixProvider *dpp, WorkQ* wq) + : cct(env.driver->ctx()), env(env), o(o), next_key_name(next_key_name), + effective_mtime(effective_mtime), + driver(env.driver), bucket(env.bucket), op(env.op), ol(env.ol), + rctx(env.driver), dpp(dpp), wq(wq) + { + obj = bucket->get_object(o.key); + } + + bool next_has_same_name(const std::string& key_name) { + return (next_key_name && key_name.compare( + boost::get<std::string>(next_key_name)) == 0); + } + +}; /* lc_op_ctx */ + + +static std::string lc_id = "rgw lifecycle"; +static std::string lc_req_id = "0"; + +static int remove_expired_obj( + const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed, + rgw::notify::EventType event_type) +{ + auto& driver = oc.driver; + auto& bucket_info = oc.bucket->get_info(); + auto& o = oc.o; + auto obj_key = o.key; + auto& meta = o.meta; + int ret; + std::string version_id; + std::unique_ptr<rgw::sal::Notification> notify; + + if (!remove_indeed) { + obj_key.instance.clear(); + } else if (obj_key.instance.empty()) { + obj_key.instance = "null"; + } + + std::unique_ptr<rgw::sal::Bucket> bucket; + std::unique_ptr<rgw::sal::Object> obj; + + ret = driver->get_bucket(nullptr, bucket_info, &bucket); + if (ret < 0) { + return ret; + } + + // XXXX currently, rgw::sal::Bucket.owner is always null here + std::unique_ptr<rgw::sal::User> user; + if (! bucket->get_owner()) { + auto& bucket_info = bucket->get_info(); + user = driver->get_user(bucket_info.owner); + // forgive me, lord + if (user) { + bucket->set_owner(user.get()); + } + } + + obj = bucket->get_object(obj_key); + + RGWObjState* obj_state{nullptr}; + ret = obj->get_obj_state(dpp, &obj_state, null_yield, true); + if (ret < 0) { + return ret; + } + + std::unique_ptr<rgw::sal::Object::DeleteOp> del_op + = obj->get_delete_op(); + del_op->params.versioning_status + = obj->get_bucket()->get_info().versioning_status(); + del_op->params.obj_owner.set_id(rgw_user {meta.owner}); + del_op->params.obj_owner.set_name(meta.owner_display_name); + del_op->params.bucket_owner.set_id(bucket_info.owner); + del_op->params.unmod_since = meta.mtime; + del_op->params.marker_version_id = version_id; + + // notification supported only for RADOS driver for now + notify = driver->get_notification(dpp, obj.get(), nullptr, event_type, + bucket.get(), lc_id, + const_cast<std::string&>(oc.bucket->get_tenant()), + lc_req_id, null_yield); + + ret = notify->publish_reserve(dpp, nullptr); + if ( ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: notify reservation failed, deferring delete of object k=" + << o.key + << dendl; + return ret; + } + ret = del_op->delete_obj(dpp, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 1) << + "ERROR: publishing notification failed, with error: " << ret << dendl; + } else { + // send request to notification manager + (void) notify->publish_commit(dpp, obj_state->size, + ceph::real_clock::now(), + obj_state->attrset[RGW_ATTR_ETAG].to_str(), + version_id); + } + + return ret; + +} /* remove_expired_obj */ + +class LCOpAction { +public: + virtual ~LCOpAction() {} + + virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) { + return false; + } + + /* called after check(). Check should tell us whether this action + * is applicable. 
If there are multiple actions, we'll end up executing + * the latest applicable action + * For example: + * one action after 10 days, another after 20, third after 40. + * After 10 days, the latest applicable action would be the first one, + * after 20 days it will be the second one. After 21 days it will still be the + * second one. So check() should return true for the second action at that point, + * but should_process() if the action has already been applied. In object removal + * it doesn't matter, but in object transition it does. + */ + virtual bool should_process() { + return true; + } + + virtual int process(lc_op_ctx& oc) { + return 0; + } + + friend class LCOpRule; +}; /* LCOpAction */ + +class LCOpFilter { +public: +virtual ~LCOpFilter() {} + virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) { + return false; + } +}; /* LCOpFilter */ + +class LCOpRule { + friend class LCOpAction; + + op_env env; + boost::optional<std::string> next_key_name; + ceph::real_time effective_mtime; + + std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd + std::vector<shared_ptr<LCOpAction> > actions; + +public: + LCOpRule(op_env& _env) : env(_env) {} + + boost::optional<std::string> get_next_key_name() { + return next_key_name; + } + + std::vector<shared_ptr<LCOpAction>>& get_actions() { + return actions; + } + + void build(); + void update(); + int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, + WorkQ* wq); +}; /* LCOpRule */ + +using WorkItem = + boost::variant<void*, + /* out-of-line delete */ + std::tuple<LCOpRule, rgw_bucket_dir_entry>, + /* uncompleted MPU expiration */ + std::tuple<lc_op, rgw_bucket_dir_entry>, + rgw_bucket_dir_entry>; + +class WorkQ : public Thread +{ +public: + using unique_lock = std::unique_lock<std::mutex>; + using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>; + using dequeue_result = boost::variant<void*, WorkItem>; + + static constexpr uint32_t FLAG_NONE = 0x0000; + static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; + static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; + static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; + +private: + const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; + RGWLC::LCWorker* wk; + uint32_t qmax; + int ix; + std::mutex mtx; + std::condition_variable cv; + uint32_t flags; + vector<WorkItem> items; + work_f f; + +public: + WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) + : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) + { + create(thr_name().c_str()); + } + + std::string thr_name() { + return std::string{"wp_thrd: "} + + std::to_string(wk->ix) + ", " + std::to_string(ix); + } + + void setf(work_f _f) { + f = _f; + } + + void enqueue(WorkItem&& item) { + unique_lock uniq(mtx); + while ((!wk->get_lc()->going_down()) && + (items.size() > qmax)) { + flags |= FLAG_EWAIT_SYNC; + cv.wait_for(uniq, 200ms); + } + items.push_back(item); + if (flags & FLAG_DWAIT_SYNC) { + flags &= ~FLAG_DWAIT_SYNC; + cv.notify_one(); + } + } + + void drain() { + unique_lock uniq(mtx); + flags |= FLAG_EDRAIN_SYNC; + while (flags & FLAG_EDRAIN_SYNC) { + cv.wait_for(uniq, 200ms); + } + } + +private: + dequeue_result dequeue() { + unique_lock uniq(mtx); + while ((!wk->get_lc()->going_down()) && + (items.size() == 0)) { + /* clear drain state, as we are NOT doing work and qlen==0 */ + if (flags & FLAG_EDRAIN_SYNC) { + flags &= ~FLAG_EDRAIN_SYNC; + } + flags |= FLAG_DWAIT_SYNC; + cv.wait_for(uniq, 200ms); + } + if (items.size() > 0) { + auto item = items.back(); + 
items.pop_back(); + if (flags & FLAG_EWAIT_SYNC) { + flags &= ~FLAG_EWAIT_SYNC; + cv.notify_one(); + } + return {item}; + } + return nullptr; + } + + void* entry() override { + while (!wk->get_lc()->going_down()) { + auto item = dequeue(); + if (item.which() == 0) { + /* going down */ + break; + } + f(wk, this, boost::get<WorkItem>(item)); + } + return nullptr; + } +}; /* WorkQ */ + +class RGWLC::WorkPool +{ + using TVector = ceph::containers::tiny_vector<WorkQ, 3>; + TVector wqs; + uint64_t ix; + +public: + WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax) + : wqs(TVector{ + n_threads, + [&](const size_t ix, auto emplacer) { + emplacer.emplace(wk, ix, qmax); + }}), + ix(0) + {} + + ~WorkPool() { + for (auto& wq : wqs) { + wq.join(); + } + } + + void setf(WorkQ::work_f _f) { + for (auto& wq : wqs) { + wq.setf(_f); + } + } + + void enqueue(WorkItem item) { + const auto tix = ix; + ix = (ix+1) % wqs.size(); + (wqs[tix]).enqueue(std::move(item)); + } + + void drain() { + for (auto& wq : wqs) { + wq.drain(); + } + } +}; /* WorkPool */ + +RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct, + RGWLC *lc, int ix) + : dpp(dpp), cct(cct), lc(lc), ix(ix) +{ + auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker"); + workpool = new WorkPool(this, wpw, 512); +} + +static inline bool worker_should_stop(time_t stop_at, bool once) +{ + return !once && stop_at < time(nullptr); +} + +int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, + const multimap<string, lc_op>& prefix_map, + LCWorker* worker, time_t stop_at, bool once) +{ + MultipartMetaFilter mp_filter; + int ret; + rgw::sal::Bucket::ListParams params; + rgw::sal::Bucket::ListResults results; + auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay"); + params.list_versions = false; + /* lifecycle processing does not depend on total order, so can + * take advantage of unordered listing optimizations--such as + * operating on one shard at a time */ + params.allow_unordered = true; + params.ns = RGW_OBJ_NS_MULTIPART; + params.access_list_filter = &mp_filter; + + auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { + auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi); + auto& [rule, obj] = wt; + if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) { + rgw_obj_key key(obj.key); + std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name); + int ret = mpu->abort(this, cct); + if (ret == 0) { + if (perfcounter) { + perfcounter->inc(l_rgw_lc_abort_mpu, 1); + } + } else { + if (ret == -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(wk->get_lc(), 5) + << "ERROR: abort_multipart_upload failed, ret=" << ret + << ", thread:" << wq->thr_name() + << ", meta:" << obj.key + << dendl; + } else { + ldpp_dout(wk->get_lc(), 0) + << "ERROR: abort_multipart_upload failed, ret=" << ret + << ", thread:" << wq->thr_name() + << ", meta:" << obj.key + << dendl; + } + } /* abort failed */ + } /* expired */ + }; + + worker->workpool->setf(pf); + + for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); + ++prefix_iter) { + + if (worker_should_stop(stop_at, once)) { + ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + + if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { + continue; + } + params.prefix = prefix_iter->first; + do { + auto offset = 0; + results.objs.clear(); + ret = target->list(this, params, 1000, results, null_yield); + if (ret < 0) 
{ + if (ret == (-ENOENT)) + return 0; + ldpp_dout(this, 0) << "ERROR: driver->list_objects():" <<dendl; + return ret; + } + + for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter, ++offset) { + std::tuple<lc_op, rgw_bucket_dir_entry> t1 = + {prefix_iter->second, *obj_iter}; + worker->workpool->enqueue(WorkItem{t1}); + if (going_down()) { + return 0; + } + } /* for objs */ + + if ((offset % 100) == 0) { + if (worker_should_stop(stop_at, once)) { + ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } while(results.is_truncated); + } /* for prefix_map */ + + worker->workpool->drain(); + return 0; +} /* RGWLC::handle_multipart_expiration */ + +static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl) +{ + std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op(); + + return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield); +} + +static bool is_valid_op(const lc_op& op) +{ + return (op.status && + (op.expiration > 0 + || op.expiration_date != boost::none + || op.noncur_expiration > 0 + || op.dm_expiration + || !op.transitions.empty() + || !op.noncur_transitions.empty())); +} + +static bool zone_check(const lc_op& op, rgw::sal::Zone* zone) +{ + + if (zone->get_tier_type() == "archive") { + return (op.rule_flags & uint32_t(LCFlagType::ArchiveZone)); + } else { + return (! (op.rule_flags & uint32_t(LCFlagType::ArchiveZone))); + } +} + +static inline bool has_all_tags(const lc_op& rule_action, + const RGWObjTags& object_tags) +{ + if(! rule_action.obj_tags) + return false; + if(object_tags.count() < rule_action.obj_tags->count()) + return false; + size_t tag_count = 0; + for (const auto& tag : object_tags.get_tags()) { + const auto& rule_tags = rule_action.obj_tags->get_tags(); + const auto& iter = rule_tags.find(tag.first); + if(iter == rule_tags.end()) + continue; + if(iter->second == tag.second) + { + tag_count++; + } + /* all tags in the rule appear in obj tags */ + } + return tag_count == rule_action.obj_tags->count(); +} + +static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip) +{ + auto& op = oc.op; + + if (op.obj_tags != boost::none) { + *skip = true; + + bufferlist tags_bl; + int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl); + if (ret < 0) { + if (ret != -ENODATA) { + ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r=" + << ret << " " << oc.wq->thr_name() << dendl; + } + return 0; + } + RGWObjTags dest_obj_tags; + try { + auto iter = tags_bl.cbegin(); + dest_obj_tags.decode(iter); + } catch (buffer::error& err) { + ldpp_dout(oc.dpp,0) << "ERROR: caught buffer::error, couldn't decode TagSet " + << oc.wq->thr_name() << dendl; + return -EIO; + } + + if (! 
has_all_tags(op, dest_obj_tags)) { + ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj + << " as tags do not match in rule: " + << op.id << " " + << oc.wq->thr_name() << dendl; + return 0; + } + } + *skip = false; + return 0; +} + +class LCOpFilter_Tags : public LCOpFilter { +public: + bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override { + auto& o = oc.o; + + if (o.is_delete_marker()) { + return true; + } + + bool skip; + + int ret = check_tags(dpp, oc, &skip); + if (ret < 0) { + if (ret == -ENOENT) { + return false; + } + ldpp_dout(oc.dpp, 0) << "ERROR: check_tags on obj=" << oc.obj + << " returned ret=" << ret << " " + << oc.wq->thr_name() << dendl; + return false; + } + + return !skip; + }; +}; + +class LCOpAction_CurrentExpiration : public LCOpAction { +public: + LCOpAction_CurrentExpiration(op_env& env) {} + + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + auto& o = oc.o; + if (!o.is_current()) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": not current, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + if (o.is_delete_marker()) { + if (oc.next_key_name) { + std::string nkn = *oc.next_key_name; + if (oc.next_has_same_name(o.key.name)) { + ldpp_dout(dpp, 7) << __func__ << "(): dm-check SAME: key=" << o.key + << " next_key_name: %%" << nkn << "%% " + << oc.wq->thr_name() << dendl; + return false; + } else { + ldpp_dout(dpp, 7) << __func__ << "(): dm-check DELE: key=" << o.key + << " next_key_name: %%" << nkn << "%% " + << oc.wq->thr_name() << dendl; + *exp_time = real_clock::now(); + return true; + } + } + return false; + } + + auto& mtime = o.meta.mtime; + bool is_expired; + auto& op = oc.op; + if (op.expiration <= 0) { + if (op.expiration_date == boost::none) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": no expiration set in rule, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + is_expired = ceph_clock_now() >= + ceph::real_clock::to_time_t(*op.expiration_date); + *exp_time = *op.expiration_date; + } else { + is_expired = obj_has_expired(dpp, oc.cct, mtime, op.expiration, exp_time); + } + + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << (int)is_expired << " " + << oc.wq->thr_name() << dendl; + return is_expired; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r; + if (o.is_delete_marker()) { + r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectExpirationDeleteMarker); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj " + << oc.bucket << ":" << o.key + << " " << cpp_strerror(r) << " " + << oc.wq->thr_name() << dendl; + return r; + } + ldpp_dout(oc.dpp, 2) << "DELETED: current is-dm " + << oc.bucket << ":" << o.key + << " " << oc.wq->thr_name() << dendl; + } else { + /* ! 
o.is_delete_marker() */ + r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(), + rgw::notify::ObjectExpirationCurrent); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj " + << oc.bucket << ":" << o.key + << " " << cpp_strerror(r) << " " + << oc.wq->thr_name() << dendl; + return r; + } + if (perfcounter) { + perfcounter->inc(l_rgw_lc_expire_current, 1); + } + ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key + << " " << oc.wq->thr_name() << dendl; + } + return 0; + } +}; + +class LCOpAction_NonCurrentExpiration : public LCOpAction { +protected: +public: + LCOpAction_NonCurrentExpiration(op_env& env) + {} + + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + auto& o = oc.o; + if (o.is_current()) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": current version, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + + int expiration = oc.op.noncur_expiration; + bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration, + exp_time); + + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << is_expired << " " + << oc.wq->thr_name() << dendl; + + return is_expired && + pass_object_lock_check(oc.driver, oc.obj.get(), dpp); + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectExpirationNoncurrent); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) " + << oc.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() << dendl; + return r; + } + if (perfcounter) { + perfcounter->inc(l_rgw_lc_expire_noncurrent, 1); + } + ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key + << " (non-current expiration) " + << oc.wq->thr_name() << dendl; + return 0; + } +}; + +class LCOpAction_DMExpiration : public LCOpAction { +public: + LCOpAction_DMExpiration(op_env& env) {} + + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { + auto& o = oc.o; + if (!o.is_delete_marker()) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": not a delete marker, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + if (oc.next_has_same_name(o.key.name)) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": next is same object, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + + *exp_time = real_clock::now(); + + return true; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectExpirationDeleteMarker); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) " + << oc.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() + << dendl; + return r; + } + if (perfcounter) { + perfcounter->inc(l_rgw_lc_expire_dm, 1); + } + ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key + << " (delete marker expiration) " + << oc.wq->thr_name() << dendl; + return 0; + } +}; + +class LCOpAction_Transition : public LCOpAction { + const transition_action& transition; + bool need_to_process{false}; + +protected: + virtual bool check_current_state(bool is_current) = 0; + virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0; +public: + LCOpAction_Transition(const transition_action& _transition) + : transition(_transition) {} + + bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider 
*dpp) override { + auto& o = oc.o; + + if (o.is_delete_marker()) { + return false; + } + + if (!check_current_state(o.is_current())) { + return false; + } + + auto mtime = get_effective_mtime(oc); + bool is_expired; + if (transition.days < 0) { + if (transition.date == boost::none) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": no transition day/date set in rule, skipping " + << oc.wq->thr_name() << dendl; + return false; + } + is_expired = ceph_clock_now() >= + ceph::real_clock::to_time_t(*transition.date); + *exp_time = *transition.date; + } else { + is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time); + } + + ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" + << is_expired << " " + << oc.wq->thr_name() << dendl; + + need_to_process = + (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != + transition.storage_class); + + return is_expired; + } + + bool should_process() override { + return need_to_process; + } + + int delete_tier_obj(lc_op_ctx& oc) { + int ret = 0; + + /* If bucket is versioned, create delete_marker for current version + */ + if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { + ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + } else { + ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + } + return ret; + } + + int transition_obj_to_cloud(lc_op_ctx& oc) { + /* If CurrentVersion object, remove it & create delete marker */ + bool delete_object = (!oc.tier->retain_head_object() || + (oc.o.is_current() && oc.bucket->versioned())); + + int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, + oc.env.worker->get_cloud_targets(), oc.cct, + !delete_object, oc.dpp, null_yield); + if (ret < 0) { + return ret; + } + + if (delete_object) { + ret = delete_tier_obj(oc); + if (ret < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl; + return ret; + } + } + + return 0; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r; + + if (oc.o.meta.category == RGWObjCategory::CloudTiered) { + /* Skip objects which are already cloud tiered. */ + ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl; + return 0; + } + + std::string tier_type = ""; + rgw::sal::ZoneGroup& zonegroup = oc.driver->get_zone()->get_zonegroup(); + + rgw_placement_rule target_placement; + target_placement.inherit_from(oc.bucket->get_placement_rule()); + target_placement.storage_class = transition.storage_class; + + r = zonegroup.get_placement_tier(target_placement, &oc.tier); + + if (!r && oc.tier->get_tier_type() == "cloud-s3") { + ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl; + if (!oc.o.is_current() && + !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) { + /* Skip objects which has object lock enabled. */ + ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. 
Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl; + return 0; + } + + r = transition_obj_to_cloud(oc); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")" + << dendl; + return r; + } + } else { + if (!oc.driver->valid_placement(target_placement)) { + ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " + << target_placement + << " bucket="<< oc.bucket + << " rule_id=" << oc.op.id + << " " << oc.wq->thr_name() << dendl; + return -EINVAL; + } + + int r = oc.obj->transition(oc.bucket, target_placement, o.meta.mtime, + o.versioned_epoch, oc.dpp, null_yield); + if (r < 0) { + ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " + << oc.bucket << ":" << o.key + << " -> " << transition.storage_class + << " " << cpp_strerror(r) + << " " << oc.wq->thr_name() << dendl; + return r; + } + } + ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket + << ":" << o.key << " -> " + << transition.storage_class + << " " << oc.wq->thr_name() << dendl; + return 0; + } +}; + +class LCOpAction_CurrentTransition : public LCOpAction_Transition { +protected: + bool check_current_state(bool is_current) override { + return is_current; + } + + ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { + return oc.o.meta.mtime; + } +public: + LCOpAction_CurrentTransition(const transition_action& _transition) + : LCOpAction_Transition(_transition) {} + int process(lc_op_ctx& oc) { + int r = LCOpAction_Transition::process(oc); + if (r == 0) { + if (perfcounter) { + perfcounter->inc(l_rgw_lc_transition_current, 1); + } + } + return r; + } +}; + +class LCOpAction_NonCurrentTransition : public LCOpAction_Transition { +protected: + bool check_current_state(bool is_current) override { + return !is_current; + } + + ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { + return oc.effective_mtime; + } +public: + LCOpAction_NonCurrentTransition(op_env& env, + const transition_action& _transition) + : LCOpAction_Transition(_transition) + {} + int process(lc_op_ctx& oc) { + int r = LCOpAction_Transition::process(oc); + if (r == 0) { + if (perfcounter) { + perfcounter->inc(l_rgw_lc_transition_noncurrent, 1); + } + } + return r; + } +}; + +void LCOpRule::build() +{ + filters.emplace_back(new LCOpFilter_Tags); + + auto& op = env.op; + + if (op.expiration > 0 || + op.expiration_date != boost::none) { + actions.emplace_back(new LCOpAction_CurrentExpiration(env)); + } + + if (op.dm_expiration) { + actions.emplace_back(new LCOpAction_DMExpiration(env)); + } + + if (op.noncur_expiration > 0) { + actions.emplace_back(new LCOpAction_NonCurrentExpiration(env)); + } + + for (auto& iter : op.transitions) { + actions.emplace_back(new LCOpAction_CurrentTransition(iter.second)); + } + + for (auto& iter : op.noncur_transitions) { + actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second)); + } +} + +void LCOpRule::update() +{ + next_key_name = env.ol.next_key_name(); + effective_mtime = env.ol.get_prev_obj().meta.mtime; +} + +int LCOpRule::process(rgw_bucket_dir_entry& o, + const DoutPrefixProvider *dpp, + WorkQ* wq) +{ + lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq); + shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing + real_time exp; + + for (auto& a : actions) { + real_time action_exp; + + if (a->check(ctx, &action_exp, dpp)) { + if (action_exp > exp) { + exp = action_exp; + selected = &a; + } + } + } + + if (selected && + (*selected)->should_process()) { + + /* 
+ * Calling filter checks after action checks because + * all action checks (as they are implemented now) do + * not access the objects themselves, but return result + * from info from bucket index listing. The current tags filter + * check does access the objects, so we avoid unnecessary rados calls + * having filters check later in the process. + */ + + bool cont = false; + for (auto& f : filters) { + if (f->check(dpp, ctx)) { + cont = true; + break; + } + } + + if (!cont) { + ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key + << ": no rule match, skipping " + << wq->thr_name() << dendl; + return 0; + } + + int r = (*selected)->process(ctx); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " + << env.bucket << ":" << o.key + << " " << cpp_strerror(r) + << " " << wq->thr_name() << dendl; + return r; + } + ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":" + << o.key << " " << wq->thr_name() << dendl; + } + + return 0; + +} + +int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, + time_t stop_at, bool once) +{ + RGWLifecycleConfiguration config(cct); + std::unique_ptr<rgw::sal::Bucket> bucket; + string no_ns, list_versions; + vector<rgw_bucket_dir_entry> objs; + vector<std::string> result; + boost::split(result, shard_id, boost::is_any_of(":")); + string bucket_tenant = result[0]; + string bucket_name = result[1]; + string bucket_marker = result[2]; + + ldpp_dout(this, 5) << "RGWLC::bucket_lc_process ENTER " << bucket_name << dendl; + if (unlikely(cct->_conf->rgwlc_skip_bucket_step)) { + return 0; + } + + int ret = driver->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield); + if (ret < 0) { + ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name + << " failed" << dendl; + return ret; + } + + ret = bucket->load_bucket(this, null_yield); + if (ret < 0) { + ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name + << " failed" << dendl; + return ret; + } + + auto stack_guard = make_scope_guard( + [&worker] + { + worker->workpool->drain(); + } + ); + + if (bucket->get_marker() != bucket_marker) { + ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" + << bucket_tenant << ":" << bucket_name + << " cur_marker=" << bucket->get_marker() + << " orig_marker=" << bucket_marker << dendl; + return -ENOENT; + } + + map<string, bufferlist>::iterator aiter + = bucket->get_attrs().find(RGW_ATTR_LC); + if (aiter == bucket->get_attrs().end()) { + ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for " + << bucket_name << " (terminates bucket_lc_process(...))" + << dendl; + return 0; + } + + bufferlist::const_iterator iter{&aiter->second}; + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" + << dendl; + return -1; + } + + /* fetch information for zone checks */ + rgw::sal::Zone* zone = driver->get_zone(); + + auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { + auto wt = + boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi); + auto& [op_rule, o] = wt; + + ldpp_dout(wk->get_lc(), 20) + << __func__ << "(): key=" << o.key << wq->thr_name() + << dendl; + int ret = op_rule.process(o, wk->dpp, wq); + if (ret < 0) { + ldpp_dout(wk->get_lc(), 20) + << "ERROR: orule.process() returned ret=" << ret + << "thread:" << wq->thr_name() + << dendl; + } + }; + worker->workpool->setf(pf); + + multimap<string, lc_op>& prefix_map = config.get_prefix_map(); + ldpp_dout(this, 10) << __func__ << "() prefix_map size=" + << 
prefix_map.size() + << dendl; + + rgw_obj_key pre_marker; + rgw_obj_key next_marker; + for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); + ++prefix_iter) { + + if (worker_should_stop(stop_at, once)) { + ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + + auto& op = prefix_iter->second; + if (!is_valid_op(op)) { + continue; + } + ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first + << dendl; + if (prefix_iter != prefix_map.begin() && + (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), + prev(prefix_iter)->first) == 0)) { + next_marker = pre_marker; + } else { + pre_marker = next_marker; + } + + LCObjsLister ol(driver, bucket.get()); + ol.set_prefix(prefix_iter->first); + + if (! zone_check(op, zone)) { + ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type() + << " zone, skipping" << dendl; + continue; + } + + ret = ol.init(this); + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldpp_dout(this, 0) << "ERROR: driver->list_objects():" << dendl; + return ret; + } + + op_env oenv(op, driver, worker, bucket.get(), ol); + LCOpRule orule(oenv); + orule.build(); // why can't ctor do it? + rgw_bucket_dir_entry* o{nullptr}; + for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) { + orule.update(); + std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o}; + worker->workpool->enqueue(WorkItem{t1}); + if ((offset % 100) == 0) { + if (worker_should_stop(stop_at, once)) { + ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " + << worker->ix + << dendl; + return 0; + } + } + } + worker->workpool->drain(); + } + + ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once); + return ret; +} + +class SimpleBackoff +{ + const int max_retries; + std::chrono::milliseconds sleep_ms; + int retries{0}; +public: + SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms) + : max_retries(max_retries), sleep_ms(initial_sleep_ms) + {} + SimpleBackoff(const SimpleBackoff&) = delete; + SimpleBackoff& operator=(const SimpleBackoff&) = delete; + + int get_retries() const { + return retries; + } + + void reset() { + retries = 0; + } + + bool wait_backoff(const fu2::unique_function<bool(void) const>& barrier) { + reset(); + while (retries < max_retries) { + auto r = barrier(); + if (r) { + return r; + } + std::this_thread::sleep_for(sleep_ms * 2 * retries++); + } + return false; + } +}; + +int RGWLC::bucket_lc_post(int index, int max_lock_sec, + rgw::sal::Lifecycle::LCEntry& entry, int& result, + LCWorker* worker) +{ + utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); + + std::unique_ptr<rgw::sal::LCSerializer> lock = + sal_lc->get_serializer(lc_index_lock_name, obj_names[index], cookie); + + ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry + << " index: " << index << " worker ix: " << worker->ix + << dendl; + + do { + int ret = lock->try_lock(this, lock_duration, null_yield); + if (ret == -EBUSY || ret == -EEXIST) { + /* already locked by another lc processor */ + ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " + << obj_names[index] << ", sleep 5, try again " << dendl; + sleep(5); + continue; + } + + if (ret < 0) + return 0; + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] + << dendl; + + if (result == -ENOENT) { + /* XXXX are we SURE the only way result could == ENOENT is when + * there is no such 
bucket? It is currently the value returned + * from bucket_lc_process(...) */ + ret = sal_lc->rm_entry(obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " + << obj_names[index] << dendl; + } + goto clean; + } else if (result < 0) { + entry.set_status(lc_failed); + } else { + entry.set_status(lc_complete); + } + + ret = sal_lc->set_entry(obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " + << obj_names[index] << dendl; + } +clean: + lock->unlock(); + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " + << obj_names[index] << dendl; + return 0; + } while (true); +} /* RGWLC::bucket_lc_post */ + +int RGWLC::list_lc_progress(string& marker, uint32_t max_entries, + vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>>& progress_map, + int& index) +{ + progress_map.clear(); + for(; index < max_objs; index++, marker="") { + vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>> entries; + int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries); + if (ret < 0) { + if (ret == -ENOENT) { + ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" + << obj_names[index] << dendl; + continue; + } else { + return ret; + } + } + progress_map.reserve(progress_map.size() + entries.size()); + std::move(begin(entries), end(entries), std::back_inserter(progress_map)); + //progress_map.insert(progress_map.end(), entries.begin(), entries.end()); + + /* update index, marker tuple */ + if (progress_map.size() > 0) + marker = progress_map.back()->get_bucket(); + + if (progress_map.size() >= max_entries) + break; + } + return 0; +} + +static inline vector<int> random_sequence(uint32_t n) +{ + vector<int> v(n, 0); + std::generate(v.begin(), v.end(), + [ix = 0]() mutable { + return ix++; + }); + std::random_device rd; + std::default_random_engine rng{rd()}; + std::shuffle(v.begin(), v.end(), rng); + return v; +} + +static inline int get_lc_index(CephContext *cct, + const std::string& shard_id) +{ + int max_objs = + (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : + cct->_conf->rgw_lc_max_objs); + /* n.b. review hash algo */ + int index = ceph_str_hash_linux(shard_id.c_str(), + shard_id.size()) % HASH_PRIME % max_objs; + return index; +} + +static inline void get_lc_oid(CephContext *cct, + const std::string& shard_id, string *oid) +{ + /* n.b. 
review hash algo */ + int index = get_lc_index(cct, shard_id); + *oid = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", index); + oid->append(buf); + return; +} + +static std::string get_bucket_lc_key(const rgw_bucket& bucket){ + return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); +} + +int RGWLC::process(LCWorker* worker, + const std::unique_ptr<rgw::sal::Bucket>& optional_bucket, + bool once = false) +{ + int ret = 0; + int max_secs = cct->_conf->rgw_lc_lock_max_time; + + if (optional_bucket) { + /* if a bucket is provided, this is a single-bucket run, and + * can be processed without traversing any state entries (we + * do need the entry {pro,epi}logue which update the state entry + * for this bucket) */ + auto bucket_lc_key = get_bucket_lc_key(optional_bucket->get_key()); + auto index = get_lc_index(driver->ctx(), bucket_lc_key); + ret = process_bucket(index, max_secs, worker, bucket_lc_key, once); + return ret; + } else { + /* generate an index-shard sequence unrelated to any other + * that might be running in parallel */ + std::string all_buckets{""}; + vector<int> shard_seq = random_sequence(max_objs); + for (auto index : shard_seq) { + ret = process(index, max_secs, worker, once); + if (ret < 0) + return ret; + } + } + + return 0; +} + +bool RGWLC::expired_session(time_t started) +{ + if (! cct->_conf->rgwlc_auto_session_clear) { + return false; + } + + time_t interval = (cct->_conf->rgw_lc_debug_interval > 0) + ? cct->_conf->rgw_lc_debug_interval + : 24*60*60; + + auto now = time(nullptr); + + ldpp_dout(this, 16) << "RGWLC::expired_session" + << " started: " << started + << " interval: " << interval << "(*2==" << 2*interval << ")" + << " now: " << now + << dendl; + + return (started + 2*interval < now); +} + +time_t RGWLC::thread_stop_at() +{ + uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0) + ? 
cct->_conf->rgw_lc_debug_interval
+    : 24*60*60;
+
+  return time(nullptr) + interval;
+}
+
+int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
+                          const std::string& bucket_entry_marker,
+                          bool once = false)
+{
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
+                     << "index: " << index << " worker ix: " << worker->ix
+                     << dendl;
+
+  int ret = 0;
+  std::unique_ptr<rgw::sal::LCSerializer> serializer =
+    sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
+                           worker->thr_name());
+  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry;
+  if (max_lock_secs <= 0) {
+    return -EAGAIN;
+  }
+
+  utime_t time(max_lock_secs, 0);
+  ret = serializer->try_lock(this, time, null_yield);
+  if (ret == -EBUSY || ret == -EEXIST) {
+    /* already locked by another lc processor */
+    ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to acquire lock on "
+                       << obj_names[index] << dendl;
+    return -EBUSY;
+  }
+  if (ret < 0)
+    return 0;
+
+  std::unique_lock<rgw::sal::LCSerializer> lock(
+    *(serializer.get()), std::adopt_lock);
+
+  ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, &entry);
+  if (ret >= 0) {
+    if (entry->get_status() == lc_processing) {
+      if (expired_session(entry->get_start_time())) {
+        ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
+                           << " index: " << index << " worker ix: " << worker->ix
+                           << " (clearing)"
+                           << dendl;
+      } else {
+        ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
+                           << entry
+                           << " index: " << index
+                           << " worker ix: " << worker->ix
+                           << dendl;
+        return ret;
+      }
+    }
+  }
+
+  /* do nothing if no bucket */
+  if (entry->get_bucket().empty()) {
+    return ret;
+  }
+
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
+                     << " index: " << index << " worker ix: " << worker->ix
+                     << dendl;
+
+  entry->set_status(lc_processing);
+  ret = sal_lc->set_entry(obj_names[index], *entry);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
+                       << obj_names[index] << entry->get_bucket() << entry->get_status()
+                       << dendl;
+    return ret;
+  }
+
+  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
+                     << " index: " << index << " worker ix: " << worker->ix
+                     << dendl;
+
+  lock.unlock();
+  ret = bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once);
+  bucket_lc_post(index, max_lock_secs, *entry, ret, worker);
+
+  return ret;
+} /* RGWLC::process_bucket */
+
+static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t shard_rollover_date)
+{
+  /* return true iff:
+   * - non-debug scheduling is in effect, and
+   * - the current shard has not rolled over in the last 24 hours
+   */
+  if (((shard_rollover_date < now) &&
+       (now - shard_rollover_date > 24*60*60)) ||
+      (! shard_rollover_date /* no rollover date stored */) ||
+      (cct->_conf->rgw_lc_debug_interval > 0 /* defaults to -1 == disabled */)) {
+    return true;
+  }
+  return false;
+} /* allow_shard_rollover */
+
+static inline bool already_run_today(CephContext* cct, time_t start_date)
+{
+  struct tm bdt;
+  time_t begin_of_day;
+  utime_t now = ceph_clock_now();
+  localtime_r(&start_date, &bdt);
+
+  if (cct->_conf->rgw_lc_debug_interval > 0) {
+    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
+      return true;
+    else
+      return false;
+  }
+
+  bdt.tm_hour = 0;
+  bdt.tm_min = 0;
+  bdt.tm_sec = 0;
+  begin_of_day = mktime(&bdt);
+  if (now - begin_of_day < 24*60*60)
+    return true;
+  else
+    return false;
+} /* already_run_today */
+
+inline int RGWLC::advance_head(const std::string& lc_shard,
+                               rgw::sal::Lifecycle::LCHead& head,
+                               rgw::sal::Lifecycle::LCEntry& entry,
+                               time_t start_date)
+{
+  int ret{0};
+  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> next_entry;
+
+  ret = sal_lc->get_next_entry(lc_shard, entry.get_bucket(), &next_entry);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
+                       << lc_shard << dendl;
+    goto exit;
+  }
+
+  /* save the next position */
+  head.set_marker(next_entry->get_bucket());
+  head.set_start_date(start_date);
+
+  ret = sal_lc->put_head(lc_shard, head);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                       << lc_shard
+                       << dendl;
+    goto exit;
+  }
+exit:
+  return ret;
+} /* advance head */
+
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
+                   bool once = false)
+{
+  int ret{0};
+  const auto& lc_shard = obj_names[index];
+
+  std::unique_ptr<rgw::sal::Lifecycle::LCHead> head;
+  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry; // string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+
+  ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
+                     << "index: " << index << " worker ix: " << worker->ix
+                     << dendl;
+
+  std::unique_ptr<rgw::sal::LCSerializer> lock =
+    sal_lc->get_serializer(lc_index_lock_name, lc_shard, worker->thr_name());
+
+  utime_t lock_for_s(max_lock_secs, 0);
+  const auto& lock_lambda = [&]() {
+    ret = lock->try_lock(this, lock_for_s, null_yield);
+    if (ret == 0) {
+      return true;
+    }
+    if (ret == -EBUSY || ret == -EEXIST) {
+      /* already locked by another lc processor */
+      return false;
+    }
+    return false;
+  };
+
+  SimpleBackoff shard_lock(5 /* max retries */, 50ms);
+  if (! shard_lock.wait_backoff(lock_lambda)) {
+    ldpp_dout(this, 0) << "RGWLC::process(): failed to acquire lock on "
+                       << lc_shard << " after " << shard_lock.get_retries()
+                       << dendl;
+    return 0;
+  }
+
+  do {
+    utime_t now = ceph_clock_now();
+
+    /* preamble: find an initial bucket/marker */
+    ret = sal_lc->get_head(lc_shard, &head);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
+                         << lc_shard << ", ret=" << ret << dendl;
+      goto exit;
+    }
+
+    /* if there is nothing at head, try to reinitialize head.marker with the
+     * first entry in the queue */
+    if (head->get_marker().empty() &&
+        allow_shard_rollover(cct, now, head->get_shard_rollover_date()) /* prevent multiple passes by diff.
+                                                                         * rgws, in same cycle */) {
+
+      ldpp_dout(this, 5) << "RGWLC::process() process shard rollover lc_shard=" << lc_shard
+                         << " head.marker=" << head->get_marker()
+                         << " head.shard_rollover_date=" << head->get_shard_rollover_date()
+                         << dendl;
+
+      vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>> entries;
+      int ret = sal_lc->list_entries(lc_shard, head->get_marker(), 1, entries);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, "
+                           << "entries) returned error ret==" << ret << dendl;
+        goto exit;
+      }
+      if (entries.size() > 0) {
+        entry = std::move(entries.front());
+        head->set_marker(entry->get_bucket());
+        head->set_start_date(now);
+        head->set_shard_rollover_date(0);
+      }
+    } else {
+      ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
+                         << lc_shard << " head last stored at "
+                         << rgw_to_asctime(utime_t(time_t(head->get_start_date()), 0))
+                         << dendl;
+
+      /* fetches the entry pointed to by head.bucket */
+      ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
+                           << "returned error ret==" << ret << dendl;
+        goto exit;
+      }
+    }
+
+    if (entry && !entry->get_bucket().empty()) {
+      if (entry->get_status() == lc_processing) {
+        if (expired_session(entry->get_start_time())) {
+          ldpp_dout(this, 5)
+            << "RGWLC::process(): STALE lc session found for: " << entry
+            << " index: " << index << " worker ix: " << worker->ix
+            << " (clearing)" << dendl;
+        } else {
+          ldpp_dout(this, 5)
+            << "RGWLC::process(): ACTIVE entry: " << entry
+            << " index: " << index << " worker ix: " << worker->ix << dendl;
+          /* skip to next entry */
+          if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
+            goto exit;
+          }
+          /* done with this shard */
+          if (head->get_marker().empty()) {
+            ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard="
+                               << lc_shard
+                               << dendl;
+            head->set_shard_rollover_date(ceph_clock_now());
+            ret = sal_lc->put_head(lc_shard, *head.get());
+            if (ret < 0) {
+              ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                                 << lc_shard
+                                 << dendl;
+            }
+            goto exit;
+          }
+          continue;
+        }
+      } else {
+        if ((entry->get_status() == lc_complete) &&
+            already_run_today(cct, entry->get_start_time())) {
+          /* skip to next entry */
+          if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
+            goto exit;
+          }
+          ldpp_dout(this, 5) << "RGWLC::process() worker ix: " << worker->ix
+                             << " SKIP processing for already-processed bucket " << entry->get_bucket()
+                             << dendl;
+          /* done with this shard */
+          if (head->get_marker().empty()) {
+            ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard="
+                               << lc_shard
+                               << dendl;
+            head->set_shard_rollover_date(ceph_clock_now());
+            ret = sal_lc->put_head(lc_shard, *head.get());
+            if (ret < 0) {
+              ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                                 << lc_shard
+                                 << dendl;
+            }
+            goto exit;
+          }
+          continue;
+        }
+      }
+    } else {
+      ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
+                         << " (this is possible mainly before any lc policy has been stored"
+                         << " or after removal of an lc_shard object)"
+                         << dendl;
+      goto exit;
+    }
+
+    /* When there are no more entries to process, entry will be
+     * equivalent to an empty marker and so the following resets the
+     * processing for the shard automatically when processing is
+     * finished for the shard */
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
+                       << " index: " << index << " worker ix: " << worker->ix
+                       << dendl;
+
+    entry->set_status(lc_processing);
+    entry->set_start_time(now);
+
+    ret = sal_lc->set_entry(lc_shard, *entry);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
+                         << lc_shard << entry->get_bucket() << entry->get_status() << dendl;
+      goto exit;
+    }
+
+    /* advance head for next waiter, then process */
+    if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
+      goto exit;
+    }
+
+    ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
+                       << " index: " << index << " worker ix: " << worker->ix
+                       << dendl;
+
+    /* drop lock so other instances can make progress while this
+     * bucket is being processed */
+    lock->unlock();
+    ret = bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once);
+
+    /* postamble */
+    //bucket_lc_post(index, max_lock_secs, entry, ret, worker);
+    if (! shard_lock.wait_backoff(lock_lambda)) {
+      ldpp_dout(this, 0) << "RGWLC::process(): failed to acquire lock on "
+                         << lc_shard << " after " << shard_lock.get_retries()
+                         << dendl;
+      return 0;
+    }
+
+    if (ret == -ENOENT) {
+      /* XXXX are we SURE the only way result could == ENOENT is when
+       * there is no such bucket? It is currently the value returned
+       * from bucket_lc_process(...) */
+      ret = sal_lc->rm_entry(lc_shard, *entry);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry "
+                           << lc_shard << " (nonfatal)"
+                           << dendl;
+        /* not fatal, could result from a race */
+      }
+    } else {
+      if (ret < 0) {
+        entry->set_status(lc_failed);
+      } else {
+        entry->set_status(lc_complete);
+      }
+      ret = sal_lc->set_entry(lc_shard, *entry);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
+                           << lc_shard
+                           << dendl;
+        /* fatal, locked */
+        goto exit;
+      }
+    }
+
+    /* done with this shard */
+    if (head->get_marker().empty()) {
+      ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard="
+                         << lc_shard
+                         << dendl;
+      head->set_shard_rollover_date(ceph_clock_now());
+      ret = sal_lc->put_head(lc_shard, *head.get());
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
+                           << lc_shard
+                           << dendl;
+      }
+      goto exit;
+    }
+  } while (!once && !going_down());
+
+exit:
+  lock->unlock();
+  return 0;
+}
+
+void RGWLC::start_processor()
+{
+  auto maxw = cct->_conf->rgw_lc_max_worker;
+  workers.reserve(maxw);
+  for (int ix = 0; ix < maxw; ++ix) {
+    auto worker =
+      std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
+    worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
+    workers.emplace_back(std::move(worker));
+  }
+}
+
+void RGWLC::stop_processor()
+{
+  down_flag = true;
+  for (auto& worker : workers) {
+    worker->stop();
+    worker->join();
+  }
+  workers.clear();
+}
+
+unsigned RGWLC::get_subsys() const
+{
+  return dout_subsys;
+}
+
+std::ostream& RGWLC::gen_prefix(std::ostream& out) const
+{
+  return out << "lifecycle: ";
+}
+
+void RGWLC::LCWorker::stop()
+{
+  std::lock_guard l{lock};
+  cond.notify_all();
+}
+
+bool RGWLC::going_down()
+{
+  return down_flag;
+}
+
+bool RGWLC::LCWorker::should_work(utime_t& now)
+{
+  int start_hour;
+  int start_minute;
+  int end_hour;
+  int end_minute;
+  string worktime = cct->_conf->rgw_lifecycle_work_time;
+  sscanf(worktime.c_str(), "%d:%d-%d:%d", &start_hour, &start_minute,
+         &end_hour, &end_minute);
+  struct tm bdt;
+  time_t tt = now.sec();
+  localtime_r(&tt, &bdt);
+
+  if (cct->_conf->rgw_lc_debug_interval > 0) {
+    /* We're debugging, so say we can run */
+    return true;
+  } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
+             (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
+    return true;
+  } else {
+    return false;
+  }
+
+}
+
+int RGWLC::LCWorker::schedule_next_start_time(utime_t& start, utime_t& now)
+{
+  int secs;
+
+  if (cct->_conf->rgw_lc_debug_interval > 0) {
+    secs = start + cct->_conf->rgw_lc_debug_interval - now;
+    if (secs < 0)
+      secs = 0;
+    return (secs);
+  }
+
+  int start_hour;
+  int start_minute;
+  int end_hour;
+  int end_minute;
+  string worktime = cct->_conf->rgw_lifecycle_work_time;
+  sscanf(worktime.c_str(), "%d:%d-%d:%d", &start_hour, &start_minute, &end_hour,
+         &end_minute);
+  struct tm bdt;
+  time_t tt = now.sec();
+  time_t nt;
+  localtime_r(&tt, &bdt);
+  bdt.tm_hour = start_hour;
+  bdt.tm_min = start_minute;
+  bdt.tm_sec = 0;
+  nt = mktime(&bdt);
+  secs = nt - tt;
+
+  return secs > 0 ? secs : secs + 24*60*60;
+}
+
+RGWLC::LCWorker::~LCWorker()
+{
+  delete workpool;
+} /* ~LCWorker */
+
+void RGWLifecycleConfiguration::generate_test_instances(
+  list<RGWLifecycleConfiguration*>& o)
+{
+  o.push_back(new RGWLifecycleConfiguration);
+}
+
+template<typename F>
+static int guard_lc_modify(const DoutPrefixProvider *dpp,
+                           rgw::sal::Driver* driver,
+                           rgw::sal::Lifecycle* sal_lc,
+                           const rgw_bucket& bucket, const string& cookie,
+                           const F& f) {
+  CephContext *cct = driver->ctx();
+
+  auto bucket_lc_key = get_bucket_lc_key(bucket);
+  string oid;
+  get_lc_oid(cct, bucket_lc_key, &oid);
+
+  /* XXX it makes sense to take shard_id for a bucket_id? */
+  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry = sal_lc->get_entry();
+  entry->set_bucket(bucket_lc_key);
+  entry->set_status(lc_uninitial);
+  int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
+
+  std::unique_ptr<rgw::sal::LCSerializer> lock =
+    sal_lc->get_serializer(lc_index_lock_name, oid, cookie);
+  utime_t time(max_lock_secs, 0);
+
+  int ret;
+  uint16_t retries{0};
+
+  // due to reports of starvation trying to save lifecycle policy, try hard
+  do {
+    ret = lock->try_lock(dpp, time, null_yield);
+    if (ret == -EBUSY || ret == -EEXIST) {
+      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
+                        << oid << ", retry in 100ms, ret=" << ret << dendl;
+      std::this_thread::sleep_for(std::chrono::milliseconds(100));
+      // the typical S3 client will time out in 60s
+      if (retries++ < 500) {
+        continue;
+      }
+    }
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
+                        << oid << ", ret=" << ret << dendl;
+      break;
+    }
+    ret = f(sal_lc, oid, *entry.get());
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to set entry on "
+                        << oid << ", ret=" << ret << dendl;
+    }
+    break;
+  } while(true);
+  lock->unlock();
+  return ret;
+}
+
+int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket,
+                             const rgw::sal::Attrs& bucket_attrs,
+                             RGWLifecycleConfiguration *config)
+{
+  int ret{0};
+  rgw::sal::Attrs attrs = bucket_attrs;
+  if (config) {
+    /* if no RGWLifecycleConfiguration is provided, RGW_ATTR_LC is
+     * assumed to be valid and present already */
+    bufferlist lc_bl;
+    config->encode(lc_bl);
+    attrs[RGW_ATTR_LC] = std::move(lc_bl);
+
+    ret =
+      bucket->merge_and_store_attrs(this, attrs, null_yield);
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
+  rgw_bucket& b = bucket->get_key();
+
+  ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie,
+                        [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
+                            rgw::sal::Lifecycle::LCEntry& entry) {
+                          return sal_lc->set_entry(oid, entry);
+                        });
+
+  return ret;
+}
+
+int RGWLC::remove_bucket_config(rgw::sal::Bucket* bucket,
+                                const rgw::sal::Attrs& bucket_attrs,
+                                bool merge_attrs)
+{
+  rgw::sal::Attrs attrs = bucket_attrs;
+  rgw_bucket& b = bucket->get_key();
+  int ret{0};
+
+  if (merge_attrs) {
+    attrs.erase(RGW_ATTR_LC);
+    ret = bucket->merge_and_store_attrs(this, attrs, null_yield);
+
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
+                         << b.name << " returned err=" << ret << dendl;
+      return ret;
+    }
+  }
+
+  ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie,
+                        [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
+                            rgw::sal::Lifecycle::LCEntry& entry) {
+                          return sal_lc->rm_entry(oid, entry);
+                        });
+
+  return ret;
+} /* RGWLC::remove_bucket_config */
+
+RGWLC::~RGWLC()
+{
+  stop_processor();
+  finalize();
+} /* ~RGWLC() */
+
+namespace rgw::lc {
+
+int fix_lc_shard_entry(const DoutPrefixProvider *dpp,
+                       rgw::sal::Driver* driver,
+                       rgw::sal::Lifecycle* sal_lc,
+                       rgw::sal::Bucket* bucket)
+{
+  if (auto aiter = bucket->get_attrs().find(RGW_ATTR_LC);
+      aiter == bucket->get_attrs().end()) {
+    return 0; // No entry, nothing to fix
+  }
+
+  auto bucket_lc_key = get_bucket_lc_key(bucket->get_key());
+  std::string lc_oid;
+  get_lc_oid(driver->ctx(), bucket_lc_key, &lc_oid);
+
+  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry;
+  // There are multiple cases we need to handle here
+  // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
+  // 2. entry doesn't exist, which usually happens when a reshard happened prior to the update and the next LC process has already dropped the update
+  // 3. entry exists matching the current bucket id, which was set after a reshard (needs to be updated to the marker)
+  // We are not dropping the old marker here as that would be caught by the next LC process update
+  int ret = sal_lc->get_entry(lc_oid, bucket_lc_key, &entry);
+  if (ret == 0) {
+    ldpp_dout(dpp, 5) << "Entry already exists, nothing to do" << dendl;
+    return ret; // entry is already existing correctly set to marker
+  }
+  ldpp_dout(dpp, 5) << "lc_get_entry errored ret code=" << ret << dendl;
+  if (ret == -ENOENT) {
+    ldpp_dout(dpp, 1) << "No entry for bucket=" << bucket
+                      << " creating " << dendl;
+    // TODO: we have too many people making cookies like this!
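+    // the random cookie generated below identifies this caller as the
+    // lock owner; guard_lc_modify() hands it to the lock serializer when
+    // taking the per-shard lock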
+    char cookie_buf[COOKIE_LEN + 1];
+    gen_rand_alphanumeric(driver->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+    std::string cookie = cookie_buf;
+
+    ret = guard_lc_modify(dpp,
+                          driver, sal_lc, bucket->get_key(), cookie,
+                          [&lc_oid](rgw::sal::Lifecycle* slc,
+                                    const string& oid,
+                                    rgw::sal::Lifecycle::LCEntry& entry) {
+                            return slc->set_entry(lc_oid, entry);
+                          });
+
+  }
+
+  return ret;
+}
+
+std::string s3_expiration_header(
+  DoutPrefixProvider* dpp,
+  const rgw_obj_key& obj_key,
+  const RGWObjTags& obj_tagset,
+  const ceph::real_time& mtime,
+  const std::map<std::string, buffer::list>& bucket_attrs)
+{
+  CephContext* cct = dpp->get_cct();
+  RGWLifecycleConfiguration config(cct);
+  std::string hdr{""};
+
+  const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
+  if (aiter == bucket_attrs.end())
+    return hdr;
+
+  bufferlist::const_iterator iter{&aiter->second};
+  try {
+    config.decode(iter);
+  } catch (const buffer::error& e) {
+    ldpp_dout(dpp, 0) << __func__
+                      << "() decode life cycle config failed"
+                      << dendl;
+    return hdr;
+  } /* catch */
+
+  /* dump tags at debug level 16 */
+  RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
+  if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
+    for (const auto& elt : obj_tag_map) {
+      ldpp_dout(dpp, 16) << __func__
+                         << "() key=" << elt.first << " val=" << elt.second
+                         << dendl;
+    }
+  }
+
+  boost::optional<ceph::real_time> expiration_date;
+  boost::optional<std::string> rule_id;
+
+  const auto& rule_map = config.get_rule_map();
+  for (const auto& ri : rule_map) {
+    const auto& rule = ri.second;
+    auto& id = rule.get_id();
+    auto& filter = rule.get_filter();
+    auto& prefix = filter.has_prefix() ? filter.get_prefix() : rule.get_prefix();
+    auto& expiration = rule.get_expiration();
+    auto& noncur_expiration = rule.get_noncur_expiration();
+
+    ldpp_dout(dpp, 10) << "rule: " << ri.first
+                       << " prefix: " << prefix
+                       << " expiration: "
+                       << " date: " << expiration.get_date()
+                       << " days: " << expiration.get_days()
+                       << " noncur_expiration: "
+                       << " date: " << noncur_expiration.get_date()
+                       << " days: " << noncur_expiration.get_days()
+                       << dendl;
+
+    /* skip if rule !enabled
+     * if rule has prefix, skip iff object !match prefix
+     * if rule has tags, skip iff object !match tags
+     * note if object is current or non-current, compare accordingly
+     * if rule has days, construct date expression and save iff older
+     * than last saved
+     * if rule has date, convert date expression and save iff older
+     * than last saved
+     * if the date accum has a value, format it into hdr
+     */
+
+    if (! rule.is_enabled())
+      continue;
+
+    if (! prefix.empty()) {
+      if (! boost::starts_with(obj_key.name, prefix))
+        continue;
+    }
+
+    if (filter.has_tags()) {
+      bool tag_match = false;
+      const RGWObjTags& rule_tagset = filter.get_tags();
+      for (auto& tag : rule_tagset.get_tags()) {
+        /* remember, S3 tags are {key,value} tuples */
+        tag_match = true;
+        auto obj_tag = obj_tag_map.find(tag.first);
+        if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) {
+          ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key
+                             << " rule_id=" << id
+                             << " tag=" << tag
+                             << dendl;
+          tag_match = false;
+          break;
+        }
+      }
+      if (! tag_match)
+        continue;
+    }
+
+    // compute a uniform expiration date
+    boost::optional<ceph::real_time> rule_expiration_date;
+    const LCExpiration& rule_expiration =
+      (obj_key.instance.empty()) ?
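/* a key carrying an instance id is treated as a noncurrent version, so the noncurrent-version expiration applies */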
expiration : noncur_expiration;
+
+    if (rule_expiration.has_date()) {
+      /* read the date from whichever expiration action was selected above */
+      rule_expiration_date =
+        boost::optional<ceph::real_time>(
+          ceph::from_iso_8601(rule_expiration.get_date()));
+    } else {
+      if (rule_expiration.has_days()) {
+        rule_expiration_date =
+          boost::optional<ceph::real_time>(
+            mtime + make_timespan(double(rule_expiration.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
+      }
+    }
+
+    // update earliest expiration
+    if (rule_expiration_date) {
+      if ((! expiration_date) ||
+          (*expiration_date > *rule_expiration_date)) {
+        expiration_date =
+          boost::optional<ceph::real_time>(rule_expiration_date);
+        rule_id = boost::optional<std::string>(id);
+      }
+    }
+  }
+
+  // cond format header
+  if (expiration_date && rule_id) {
+    // Fri, 23 Dec 2012 00:00:00 GMT
+    char exp_buf[100];
+    time_t exp = ceph::real_clock::to_time_t(*expiration_date);
+    if (std::strftime(exp_buf, sizeof(exp_buf),
+                      "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
+      hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
+                        *rule_id);
+    } else {
+      ldpp_dout(dpp, 0) << __func__ <<
+        "() strftime of life cycle expiration header failed"
+                        << dendl;
+    }
+  }
+
+  return hdr;
+
+} /* s3_expiration_header */
+
+bool s3_multipart_abort_header(
+  DoutPrefixProvider* dpp,
+  const rgw_obj_key& obj_key,
+  const ceph::real_time& mtime,
+  const std::map<std::string, buffer::list>& bucket_attrs,
+  ceph::real_time& abort_date,
+  std::string& rule_id)
+{
+  CephContext* cct = dpp->get_cct();
+  RGWLifecycleConfiguration config(cct);
+
+  const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
+  if (aiter == bucket_attrs.end())
+    return false;
+
+  bufferlist::const_iterator iter{&aiter->second};
+  try {
+    config.decode(iter);
+  } catch (const buffer::error& e) {
+    ldpp_dout(dpp, 0) << __func__
+                      << "() decode life cycle config failed"
+                      << dendl;
+    return false;
+  } /* catch */
+
+  std::optional<ceph::real_time> abort_date_tmp;
+  std::optional<std::string_view> rule_id_tmp;
+  const auto& rule_map = config.get_rule_map();
+  for (const auto& ri : rule_map) {
+    const auto& rule = ri.second;
+    const auto& id = rule.get_id();
+    const auto& filter = rule.get_filter();
+    const auto& prefix = filter.has_prefix() ? filter.get_prefix() : rule.get_prefix();
+    const auto& mp_expiration = rule.get_mp_expiration();
+    if (!rule.is_enabled()) {
+      continue;
+    }
+    if (!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
+      continue;
+    }
+
+    std::optional<ceph::real_time> rule_abort_date;
+    if (mp_expiration.has_days()) {
+      rule_abort_date = std::optional<ceph::real_time>(
+        mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
+    }
+
+    // update earliest abort date
+    if (rule_abort_date) {
+      if ((!
abort_date_tmp) ||
+          (*abort_date_tmp > *rule_abort_date)) {
+        abort_date_tmp =
+          std::optional<ceph::real_time>(rule_abort_date);
+        rule_id_tmp = std::optional<std::string_view>(id);
+      }
+    }
+  }
+  if (abort_date_tmp && rule_id_tmp) {
+    abort_date = *abort_date_tmp;
+    rule_id = *rule_id_tmp;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+} /* namespace rgw::lc */
+
+void lc_op::dump(Formatter *f) const
+{
+  f->dump_bool("status", status);
+  f->dump_bool("dm_expiration", dm_expiration);
+
+  f->dump_int("expiration", expiration);
+  f->dump_int("noncur_expiration", noncur_expiration);
+  f->dump_int("mp_expiration", mp_expiration);
+  if (expiration_date) {
+    utime_t ut(*expiration_date);
+    f->dump_stream("expiration_date") << ut;
+  }
+  if (obj_tags) {
+    f->dump_object("obj_tags", *obj_tags);
+  }
+  f->open_object_section("transitions");
+  for (auto& [storage_class, transition] : transitions) {
+    f->dump_object(storage_class, transition);
+  }
+  f->close_section();
+
+  f->open_object_section("noncur_transitions");
+  for (auto& [storage_class, transition] : noncur_transitions) {
+    f->dump_object(storage_class, transition);
+  }
+  f->close_section();
+}
+
+void LCFilter::dump(Formatter *f) const
+{
+  f->dump_string("prefix", prefix);
+  f->dump_object("obj_tags", obj_tags);
+  if (have_flag(LCFlagType::ArchiveZone)) {
+    f->dump_string("archivezone", "");
+  }
+}
+
+void LCExpiration::dump(Formatter *f) const
+{
+  f->dump_string("days", days);
+  f->dump_string("date", date);
+}
+
+void LCRule::dump(Formatter *f) const
+{
+  f->dump_string("id", id);
+  f->dump_string("prefix", prefix);
+  f->dump_string("status", status);
+  f->dump_object("expiration", expiration);
+  f->dump_object("noncur_expiration", noncur_expiration);
+  f->dump_object("mp_expiration", mp_expiration);
+  f->dump_object("filter", filter);
+  f->open_object_section("transitions");
+  for (auto& [storage_class, transition] : transitions) {
+    f->dump_object(storage_class, transition);
+  }
+  f->close_section();
+
+  f->open_object_section("noncur_transitions");
+  for (auto& [storage_class, transition] : noncur_transitions) {
+    f->dump_object(storage_class, transition);
+  }
+  f->close_section();
+  f->dump_bool("dm_expiration", dm_expiration);
+}
+
+void RGWLifecycleConfiguration::dump(Formatter *f) const
+{
+  f->open_object_section("prefix_map");
+  for (auto& prefix : prefix_map) {
+    f->dump_object(prefix.first.c_str(), prefix.second);
+  }
+  f->close_section();
+
+  f->open_array_section("rule_map");
+  for (auto& rule : rule_map) {
+    f->open_object_section("entry");
+    f->dump_string("id", rule.first);
+    f->open_object_section("rule");
+    rule.second.dump(f);
+    f->close_section();
+    f->close_section();
+  }
+  f->close_section();
+}
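+
+/* For reference, RGWLifecycleConfiguration::dump() above emits JSON shaped
+ * roughly like the following for a one-rule configuration (an illustrative
+ * sketch only; "rule-1", the "logs/" prefix and the values shown are
+ * hypothetical):
+ *
+ *   { "prefix_map": { "logs/": { "status": true, "dm_expiration": false,
+ *                                "expiration": 30, ... } },
+ *     "rule_map": [ { "id": "rule-1",
+ *                     "rule": { "id": "rule-1", "prefix": "logs/",
+ *                               "status": "Enabled",
+ *                               "expiration": { "days": "30", "date": "" },
+ *                               ... } } ] }
+ */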