diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rgw/rgw_lc.cc | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_lc.cc')
-rw-r--r-- | src/rgw/rgw_lc.cc | 1678 |
1 files changed, 1678 insertions, 0 deletions
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc new file mode 100644 index 00000000..eeb14be1 --- /dev/null +++ b/src/rgw/rgw_lc.cc @@ -0,0 +1,1678 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <string.h> +#include <iostream> +#include <map> + +#include <boost/algorithm/string/split.hpp> +#include <boost/algorithm/string.hpp> +#include <boost/algorithm/string/predicate.hpp> + +#include "common/Formatter.h" +#include <common/errno.h> +#include "include/random.h" +#include "cls/rgw/cls_rgw_client.h" +#include "cls/lock/cls_lock_client.h" +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_lc.h" +#include "rgw_zone.h" +#include "rgw_string.h" + +// this seems safe to use, at least for now--arguably, we should +// prefer header-only fmt, in general +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include "seastar/fmt/include/fmt/format.h" + +#include "services/svc_sys_obj.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +const char* LC_STATUS[] = { + "UNINITIAL", + "PROCESSING", + "FAILED", + "COMPLETE" +}; + +using namespace librados; + +bool LCRule::valid() const +{ + if (id.length() > MAX_ID_LEN) { + return false; + } + else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration && + transitions.empty() && noncur_transitions.empty()) { + return false; + } + else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) { + return false; + } + if (!transitions.empty()) { + bool using_days = expiration.has_days(); + bool using_date = expiration.has_date(); + for (const auto& elem : transitions) { + if (!elem.second.valid()) { + return false; + } + using_days = using_days || elem.second.has_days(); + using_date = using_date || elem.second.has_date(); + if (using_days && using_date) { + return false; + } + } + } + for (const auto& elem : noncur_transitions) { + if (!elem.second.valid()) { + return false; + } + } + + return true; +} + +void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days) +{ + id = _id; + prefix = _prefix; + char buf[32]; + snprintf(buf, sizeof(buf), "%d", num_days); + expiration.set_days(buf); + set_enabled(true); +} + +void RGWLifecycleConfiguration::add_rule(const LCRule& rule) +{ + auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups + rule_map.insert(pair<string, LCRule>(id, rule)); +} + +bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule) +{ + lc_op op(rule.get_id()); + op.status = rule.is_enabled(); + if (rule.get_expiration().has_days()) { + op.expiration = rule.get_expiration().get_days(); + } + if (rule.get_expiration().has_date()) { + op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date()); + } + if (rule.get_noncur_expiration().has_days()) { + op.noncur_expiration = rule.get_noncur_expiration().get_days(); + } + if (rule.get_mp_expiration().has_days()) { + op.mp_expiration = rule.get_mp_expiration().get_days(); + } + op.dm_expiration = rule.get_dm_expiration(); + for (const auto &elem : rule.get_transitions()) { + transition_action action; + if (elem.second.has_days()) { + action.days = elem.second.get_days(); + } else { + action.date = ceph::from_iso_8601(elem.second.get_date()); + } + action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first); + op.transitions.emplace(elem.first, std::move(action)); + } + for (const auto &elem : rule.get_noncur_transitions()) { + transition_action action; + action.days = elem.second.get_days(); + action.date = ceph::from_iso_8601(elem.second.get_date()); + action.storage_class = elem.first; + op.noncur_transitions.emplace(elem.first, std::move(action)); + } + std::string prefix; + if (rule.get_filter().has_prefix()){ + prefix = rule.get_filter().get_prefix(); + } else { + prefix = rule.get_prefix(); + } + + if (rule.get_filter().has_tags()){ + op.obj_tags = rule.get_filter().get_tags(); + } + prefix_map.emplace(std::move(prefix), std::move(op)); + return true; +} + +int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule) +{ + if (!rule.valid()) { + return -EINVAL; + } + auto& id = rule.get_id(); + if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same + return -EINVAL; + } + rule_map.insert(pair<string, LCRule>(id, rule)); + + if (!_add_rule(rule)) { + return -ERR_INVALID_REQUEST; + } + return 0; +} + +bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) { + if ((first.expiration > 0 || first.expiration_date != boost::none) && + (second.expiration > 0 || second.expiration_date != boost::none)) { + return true; + } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) { + return true; + } else if (first.mp_expiration > 0 && second.mp_expiration > 0) { + return true; + } else if (!first.transitions.empty() && !second.transitions.empty()) { + for (auto &elem : first.transitions) { + if (second.transitions.find(elem.first) != second.transitions.end()) { + return true; + } + } + } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) { + for (auto &elem : first.noncur_transitions) { + if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) { + return true; + } + } + } + return false; +} + +/* Formerly, this method checked for duplicate rules using an invalid + * method (prefix uniqueness). */ +bool RGWLifecycleConfiguration::valid() +{ + return true; +} + +void *RGWLC::LCWorker::entry() { + do { + utime_t start = ceph_clock_now(); + if (should_work(start)) { + ldpp_dout(dpp, 2) << "life cycle: start" << dendl; + int r = lc->process(); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl; + } + ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; + } + if (lc->going_down()) + break; + + utime_t end = ceph_clock_now(); + int secs = schedule_next_start_time(start, end); + utime_t next; + next.set_from_double(end + secs); + + ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl; + + lock.Lock(); + cond.WaitInterval(lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!lc->going_down()); + + return NULL; +} + +void RGWLC::initialize(CephContext *_cct, RGWRados *_store) { + cct = _cct; + store = _store; + max_objs = cct->_conf->rgw_lc_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + obj_names = new string[max_objs]; + + for (int i = 0; i < max_objs; i++) { + obj_names[i] = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", i); + obj_names[i].append(buf); + } + +#define COOKIE_LEN 16 + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1); + cookie = cookie_buf; +} + +void RGWLC::finalize() +{ + delete[] obj_names; +} + +bool RGWLC::if_already_run_today(time_t& start_date) +{ + struct tm bdt; + time_t begin_of_day; + utime_t now = ceph_clock_now(); + localtime_r(&start_date, &bdt); + + if (cct->_conf->rgw_lc_debug_interval > 0) { + if (now - start_date < cct->_conf->rgw_lc_debug_interval) + return true; + else + return false; + } + + bdt.tm_hour = 0; + bdt.tm_min = 0; + bdt.tm_sec = 0; + begin_of_day = mktime(&bdt); + if (now - begin_of_day < 24*60*60) + return true; + else + return false; +} + +int RGWLC::bucket_lc_prepare(int index) +{ + map<string, int > entries; + + string marker; + +#define MAX_LC_LIST_ENTRIES 100 + do { + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries); + if (ret < 0) + return ret; + map<string, int>::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + pair<string, int > entry(iter->first, lc_uninitial); + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on " + << obj_names[index] << dendl; + return ret; + } + } + + if (!entries.empty()) { + marker = std::move(entries.rbegin()->first); + } + } while (!entries.empty()); + + return 0; +} + +static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr) +{ + double timediff, cmp; + utime_t base_time; + if (cct->_conf->rgw_lc_debug_interval <= 0) { + /* Normal case, run properly */ + cmp = double(days)*24*60*60; + base_time = ceph_clock_now().round_to_day(); + } else { + /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */ + cmp = double(days)*cct->_conf->rgw_lc_debug_interval; + base_time = ceph_clock_now(); + } + timediff = base_time - ceph::real_clock::to_time_t(mtime); + + if (expire_time) { + *expire_time = mtime + make_timespan(cmp); + } + ldout(cct, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << dendl; + + return (timediff >= cmp); +} + +static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx) +{ + if (!bucket_info.obj_lock_enabled()) { + return true; + } + RGWRados::Object op_target(store, bucket_info, ctx, obj); + RGWRados::Object::Read read_op(&op_target); + map<string, bufferlist> attrs; + read_op.params.attrs = &attrs; + int ret = read_op.prepare(); + if (ret < 0) { + if (ret == -ENOENT) { + return true; + } else { + return false; + } + } else { + auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); + if (iter != attrs.end()) { + RGWObjectRetention retention; + try { + decode(retention, iter->second); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl; + return false; + } + if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > ceph_clock_now()) { + return false; + } + } + iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD); + if (iter != attrs.end()) { + RGWObjectLegalHold obj_legal_hold; + try { + decode(obj_legal_hold, iter->second); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl; + return false; + } + if (obj_legal_hold.is_enabled()) { + return false; + } + } + return true; + } +} + +int RGWLC::handle_multipart_expiration( + RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map) +{ + MultipartMetaFilter mp_filter; + vector<rgw_bucket_dir_entry> objs; + RGWMPObj mp_obj; + bool is_truncated; + int ret; + RGWBucketInfo& bucket_info = target->get_bucket_info(); + RGWRados::Bucket::List list_op(target); + auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay"); + list_op.params.list_versions = false; + /* lifecycle processing does not depend on total order, so can + * take advantage of unorderd listing optimizations--such as + * operating on one shard at a time */ + list_op.params.allow_unordered = true; + list_op.params.ns = RGW_OBJ_NS_MULTIPART; + list_op.params.filter = &mp_filter; + for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { + continue; + } + list_op.params.prefix = prefix_iter->first; + do { + objs.clear(); + list_op.params.marker = list_op.get_next_marker(); + ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl; + return ret; + } + + for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) { + if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) { + rgw_obj_key key(obj_iter->key); + if (!mp_obj.from_meta(key.name)) { + continue; + } + RGWObjectCtx rctx(store); + ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj); + if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl; + } else if (ret == -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl; + } + if (going_down()) + return 0; + } + } /* for objs */ + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } while(is_truncated); + } + return 0; +} + +static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl) +{ + RGWRados::Object op_target(store, bucket_info, ctx, obj); + RGWRados::Object::Read read_op(&op_target); + + return read_op.get_attr(RGW_ATTR_TAGS, tags_bl); +} + +static bool is_valid_op(const lc_op& op) +{ + return (op.status && + (op.expiration > 0 + || op.expiration_date != boost::none + || op.noncur_expiration > 0 + || op.dm_expiration + || !op.transitions.empty() + || !op.noncur_transitions.empty())); +} + +static inline bool has_all_tags(const lc_op& rule_action, + const RGWObjTags& object_tags) +{ + if(! rule_action.obj_tags) + return false; + if(object_tags.count() < rule_action.obj_tags->count()) + return false; + size_t tag_count = 0; + for (const auto& tag : object_tags.get_tags()) { + const auto& rule_tags = rule_action.obj_tags->get_tags(); + const auto& iter = rule_tags.find(tag.first); + if(iter == rule_tags.end()) + continue; + if(iter->second == tag.second) + { + tag_count++; + } + /* all tags in the rule appear in obj tags */ + } + return tag_count == rule_action.obj_tags->count(); +} + +class LCObjsLister { + RGWRados *store; + RGWBucketInfo& bucket_info; + RGWRados::Bucket target; + RGWRados::Bucket::List list_op; + bool is_truncated{false}; + rgw_obj_key next_marker; + string prefix; + vector<rgw_bucket_dir_entry> objs; + vector<rgw_bucket_dir_entry>::iterator obj_iter; + rgw_bucket_dir_entry pre_obj; + int64_t delay_ms; + +public: + LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) : + store(_store), bucket_info(_bucket_info), + target(store, bucket_info), list_op(&target) { + list_op.params.list_versions = bucket_info.versioned(); + list_op.params.allow_unordered = true; + delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay"); + } + + void set_prefix(const string& p) { + prefix = p; + list_op.params.prefix = prefix; + } + + int init() { + return fetch(); + } + + int fetch() { + int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated); + if (ret < 0) { + return ret; + } + + obj_iter = objs.begin(); + + return 0; + } + + void delay() { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } + + bool get_obj(rgw_bucket_dir_entry *obj) { + if (obj_iter == objs.end()) { + delay(); + return false; + } + if (is_truncated && (obj_iter + 1)==objs.end()) { + list_op.params.marker = obj_iter->key; + + int ret = fetch(); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl; + return ret; + } else { + obj_iter = objs.begin(); + } + delay(); + } + *obj = *obj_iter; + return true; + } + + rgw_bucket_dir_entry get_prev_obj() { + return pre_obj; + } + + void next() { + pre_obj = *obj_iter; + ++obj_iter; + } + + bool next_has_same_name() + { + if ((obj_iter + 1) == objs.end()) { + /* this should have been called after get_obj() was called, so this should + * only happen if is_truncated is false */ + return false; + } + return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0); + } +}; + + +struct op_env { + lc_op& op; + RGWRados *store; + RGWLC *lc; + RGWBucketInfo& bucket_info; + LCObjsLister& ol; + + op_env(lc_op& _op, RGWRados *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info, + LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {} +}; + +class LCRuleOp; + +struct lc_op_ctx { + CephContext *cct; + op_env& env; + rgw_bucket_dir_entry& o; + + RGWRados *store; + RGWBucketInfo& bucket_info; + lc_op& op; + LCObjsLister& ol; + + rgw_obj obj; + RGWObjectCtx rctx; + + lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o) : cct(_env.store->ctx()), env(_env), o(_o), + store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol), + obj(env.bucket_info.bucket, o.key), rctx(env.store) {} +}; + +static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed) +{ + auto& store = oc.store; + auto& bucket_info = oc.bucket_info; + auto& o = oc.o; + auto obj_key = o.key; + auto& meta = o.meta; + + if (!remove_indeed) { + obj_key.instance.clear(); + } else if (obj_key.instance.empty()) { + obj_key.instance = "null"; + } + + rgw_obj obj(bucket_info.bucket, obj_key); + ACLOwner obj_owner; + obj_owner.set_id(rgw_user {meta.owner}); + obj_owner.set_name(meta.owner_display_name); + + RGWRados::Object del_target(store, bucket_info, oc.rctx, obj); + RGWRados::Object::Delete del_op(&del_target); + + del_op.params.bucket_owner = bucket_info.owner; + del_op.params.versioning_status = bucket_info.versioning_status(); + del_op.params.obj_owner = obj_owner; + del_op.params.unmod_since = meta.mtime; + + return del_op.delete_obj(); +} + +class LCOpAction { +public: + virtual ~LCOpAction() {} + + virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) { + return false; + }; + + /* called after check(). Check should tell us whether this action + * is applicable. If there are multiple actions, we'll end up executing + * the latest applicable action + * For example: + * one action after 10 days, another after 20, third after 40. + * After 10 days, the latest applicable action would be the first one, + * after 20 days it will be the second one. After 21 days it will still be the + * second one. So check() should return true for the second action at that point, + * but should_process() if the action has already been applied. In object removal + * it doesn't matter, but in object transition it does. + */ + virtual bool should_process() { + return true; + } + + virtual int process(lc_op_ctx& oc) { + return 0; + } +}; + +class LCOpFilter { +public: +virtual ~LCOpFilter() {} + virtual bool check(lc_op_ctx& oc) { + return false; + } +}; + +class LCOpRule { + friend class LCOpAction; + + op_env& env; + + std::vector<unique_ptr<LCOpFilter> > filters; + std::vector<unique_ptr<LCOpAction> > actions; + +public: + LCOpRule(op_env& _env) : env(_env) {} + + void build(); + int process(rgw_bucket_dir_entry& o); +}; + +static int check_tags(lc_op_ctx& oc, bool *skip) +{ + auto& op = oc.op; + + if (op.obj_tags != boost::none) { + *skip = true; + + bufferlist tags_bl; + int ret = read_obj_tags(oc.store, oc.bucket_info, oc.obj, oc.rctx, tags_bl); + if (ret < 0) { + if (ret != -ENODATA) { + ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl; + } + return 0; + } + RGWObjTags dest_obj_tags; + try { + auto iter = tags_bl.cbegin(); + dest_obj_tags.decode(iter); + } catch (buffer::error& err) { + ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl; + return -EIO; + } + + if (! has_all_tags(op, dest_obj_tags)) { + ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl; + return 0; + } + } + *skip = false; + return 0; +} + +class LCOpFilter_Tags : public LCOpFilter { +public: + bool check(lc_op_ctx& oc) override { + auto& o = oc.o; + + if (o.is_delete_marker()) { + return true; + } + + bool skip; + + int ret = check_tags(oc, &skip); + if (ret < 0) { + if (ret == -ENOENT) { + return false; + } + ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl; + return false; + } + + return !skip; + }; +}; + +class LCOpAction_CurrentExpiration : public LCOpAction { +public: + bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { + auto& o = oc.o; + if (!o.is_current()) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl; + return false; + } + if (o.is_delete_marker()) { + if (oc.ol.next_has_same_name()) { + return false; + } else { + *exp_time = real_clock::now(); + return true; + } + } + + auto& mtime = o.meta.mtime; + bool is_expired; + auto& op = oc.op; + if (op.expiration <= 0) { + if (op.expiration_date == boost::none) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl; + return false; + } + is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date); + *exp_time = *op.expiration_date; + } else { + is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time); + } + + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl; + return is_expired; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r; + if (o.is_delete_marker()) { + r = remove_expired_obj(oc, true); + } else { + r = remove_expired_obj(oc, !oc.bucket_info.versioned()); + } + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl; + return r; + } + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl; + return 0; + } +}; + +class LCOpAction_NonCurrentExpiration : public LCOpAction { +public: + bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { + auto& o = oc.o; + if (o.is_current()) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl; + return false; + } + + auto mtime = oc.ol.get_prev_obj().meta.mtime; + int expiration = oc.op.noncur_expiration; + bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time); + + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl; + return is_expired && pass_object_lock_check(oc.store, oc.bucket_info, oc.obj, oc.rctx); + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r = remove_expired_obj(oc, true); + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl; + return r; + } + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl; + return 0; + } +}; + +class LCOpAction_DMExpiration : public LCOpAction { +public: + bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { + auto& o = oc.o; + if (!o.is_delete_marker()) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl; + return false; + } + + if (oc.ol.next_has_same_name()) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl; + return false; + } + + *exp_time = real_clock::now(); + + return true; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + int r = remove_expired_obj(oc, true); + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl; + return r; + } + ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl; + return 0; + } +}; + +class LCOpAction_Transition : public LCOpAction { + const transition_action& transition; + bool need_to_process{false}; + +protected: + virtual bool check_current_state(bool is_current) = 0; + virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0; +public: + LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {} + + bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override { + auto& o = oc.o; + + if (o.is_delete_marker()) { + return false; + } + + if (!check_current_state(o.is_current())) { + return false; + } + + auto mtime = get_effective_mtime(oc); + bool is_expired; + if (transition.days < 0) { + if (transition.date == boost::none) { + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl; + return false; + } + is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date); + *exp_time = *transition.date; + } else { + is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time); + } + + ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl; + + need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class); + + return is_expired; + } + + bool should_process() override { + return need_to_process; + } + + int process(lc_op_ctx& oc) { + auto& o = oc.o; + + rgw_placement_rule target_placement; + target_placement.inherit_from(oc.bucket_info.placement_rule); + target_placement.storage_class = transition.storage_class; + + if (!oc.store->svc.zone->get_zone_params().valid_placement(target_placement)) { + ldout(oc.cct, 0) << "ERROR: non existent dest placement: " << target_placement + << " bucket="<< oc.bucket_info.bucket + << " rule_id=" << oc.op.id << dendl; + return -EINVAL; + } + + int r = oc.store->transition_obj(oc.rctx, oc.bucket_info, oc.obj, + target_placement, o.meta.mtime, o.versioned_epoch); + if (r < 0) { + ldout(oc.cct, 0) << "ERROR: failed to transition obj (r=" << r << ")" << dendl; + return r; + } + ldout(oc.cct, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl; + return 0; + } +}; + +class LCOpAction_CurrentTransition : public LCOpAction_Transition { +protected: + bool check_current_state(bool is_current) override { + return is_current; + } + + ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { + return oc.o.meta.mtime; + } +public: + LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {} +}; + +class LCOpAction_NonCurrentTransition : public LCOpAction_Transition { +protected: + bool check_current_state(bool is_current) override { + return !is_current; + } + + ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { + return oc.ol.get_prev_obj().meta.mtime; + } +public: + LCOpAction_NonCurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {} +}; + +void LCOpRule::build() +{ + filters.emplace_back(new LCOpFilter_Tags); + + auto& op = env.op; + + if (op.expiration > 0 || + op.expiration_date != boost::none) { + actions.emplace_back(new LCOpAction_CurrentExpiration); + } + + if (op.dm_expiration) { + actions.emplace_back(new LCOpAction_DMExpiration); + } + + if (op.noncur_expiration > 0) { + actions.emplace_back(new LCOpAction_NonCurrentExpiration); + } + + for (auto& iter : op.transitions) { + actions.emplace_back(new LCOpAction_CurrentTransition(iter.second)); + } + + for (auto& iter : op.noncur_transitions) { + actions.emplace_back(new LCOpAction_NonCurrentTransition(iter.second)); + } +} + +int LCOpRule::process(rgw_bucket_dir_entry& o) +{ + lc_op_ctx ctx(env, o); + + unique_ptr<LCOpAction> *selected = nullptr; + real_time exp; + + for (auto& a : actions) { + real_time action_exp; + + if (a->check(ctx, &action_exp)) { + if (action_exp > exp) { + exp = action_exp; + selected = &a; + } + } + } + + if (selected && + (*selected)->should_process()) { + + /* + * Calling filter checks after action checks because + * all action checks (as they are implemented now) do + * not access the objects themselves, but return result + * from info from bucket index listing. The current tags filter + * check does access the objects, so we avoid unnecessary rados calls + * having filters check later in the process. + */ + + bool cont = false; + for (auto& f : filters) { + if (f->check(ctx)) { + cont = true; + break; + } + } + + if (!cont) { + ldout(env.store->ctx(), 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl; + return 0; + } + + int r = (*selected)->process(ctx); + if (r < 0) { + ldout(ctx.cct, 0) << "ERROR: remove_expired_obj " << dendl; + return r; + } + ldout(ctx.cct, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl; + } + + return 0; + +} + +int RGWLC::bucket_lc_process(string& shard_id) +{ + RGWLifecycleConfiguration config(cct); + RGWBucketInfo bucket_info; + map<string, bufferlist> bucket_attrs; + string no_ns, list_versions; + vector<rgw_bucket_dir_entry> objs; + auto obj_ctx = store->svc.sysobj->init_obj_ctx(); + vector<std::string> result; + boost::split(result, shard_id, boost::is_any_of(":")); + string bucket_tenant = result[0]; + string bucket_name = result[1]; + string bucket_marker = result[2]; + int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs); + if (ret < 0) { + ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl; + return ret; + } + + if (bucket_info.bucket.marker != bucket_marker) { + ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant + << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker + << " orig_marker=" << bucket_marker << dendl; + return -ENOENT; + } + + RGWRados::Bucket target(store, bucket_info); + + map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC); + if (aiter == bucket_attrs.end()) + return 0; + + bufferlist::const_iterator iter{&aiter->second}; + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl; + return -1; + } + + multimap<string, lc_op>& prefix_map = config.get_prefix_map(); + + ldpp_dout(this, 10) << __func__ << "() prefix_map size=" + << prefix_map.size() + << dendl; + + rgw_obj_key pre_marker; + rgw_obj_key next_marker; + for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { + auto& op = prefix_iter->second; + if (!is_valid_op(op)) { + continue; + } + ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl; + if (prefix_iter != prefix_map.begin() && + (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { + next_marker = pre_marker; + } else { + pre_marker = next_marker; + } + + LCObjsLister ol(store, bucket_info); + ol.set_prefix(prefix_iter->first); + + ret = ol.init(); + + if (ret < 0) { + if (ret == (-ENOENT)) + return 0; + ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl; + return ret; + } + + op_env oenv(op, store, this, bucket_info, ol); + + LCOpRule orule(oenv); + + orule.build(); + + ceph::real_time mtime; + rgw_bucket_dir_entry o; + for (; ol.get_obj(&o); ol.next()) { + ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl; + int ret = orule.process(o); + if (ret < 0) { + ldpp_dout(this, 20) << "ERROR: orule.process() returned ret=" + << ret + << dendl; + } + + if (going_down()) { + return 0; + } + } + } + + ret = handle_multipart_expiration(&target, prefix_map); + + return ret; +} + +int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result) +{ + utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); + + rados::cls::lock::Lock l(lc_index_lock_name); + l.set_cookie(cookie); + l.set_duration(lock_duration); + + do { + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ + ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " + << obj_names[index] << ", sleep 5, try again" << dendl; + sleep(5); + continue; + } + if (ret < 0) + return 0; + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; + if (result == -ENOENT) { + ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " + << obj_names[index] << dendl; + } + goto clean; + } else if (result < 0) { + entry.second = lc_failed; + } else { + entry.second = lc_complete; + } + + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " + << obj_names[index] << dendl; + } +clean: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl; + return 0; + } while (true); +} + +int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map) +{ + int index = 0; + progress_map->clear(); + for(; index <max_objs; index++) { + map<string, int > entries; + int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries); + if (ret < 0) { + if (ret == -ENOENT) { + ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" + << obj_names[index] << dendl; + continue; + } else { + return ret; + } + } + map<string, int>::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + progress_map->insert(*iter); + } + } + return 0; +} + +int RGWLC::process() +{ + int max_secs = cct->_conf->rgw_lc_lock_max_time; + + const int start = ceph::util::generate_random_number(0, max_objs - 1); + + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + int ret = process(index, max_secs); + if (ret < 0) + return ret; + } + + return 0; +} + +int RGWLC::process(int index, int max_lock_secs) +{ + rados::cls::lock::Lock l(lc_index_lock_name); + do { + utime_t now = ceph_clock_now(); + pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS + if (max_lock_secs <= 0) + return -EAGAIN; + + utime_t time(max_lock_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]); + if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ + ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " + << obj_names[index] << ", sleep 5, try again" << dendl; + sleep(5); + continue; + } + if (ret < 0) + return 0; + + cls_rgw_lc_obj_head head; + ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head " + << obj_names[index] << ", ret=" << ret << dendl; + goto exit; + } + + if(!if_already_run_today(head.start_date)) { + head.start_date = now; + head.marker.clear(); + ret = bucket_lc_prepare(index); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object " + << obj_names[index] << ", ret=" << ret << dendl; + goto exit; + } + } + + ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " + << obj_names[index] << dendl; + goto exit; + } + + if (entry.first.empty()) + goto exit; + + entry.second = lc_processing; + ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index] + << " (" << entry.first << "," << entry.second << ")" << dendl; + goto exit; + } + + head.marker = entry.first; + ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head); + if (ret < 0) { + ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl; + goto exit; + } + l.unlock(&store->lc_pool_ctx, obj_names[index]); + ret = bucket_lc_process(entry.first); + bucket_lc_post(index, max_lock_secs, entry, ret); + }while(1); + +exit: + l.unlock(&store->lc_pool_ctx, obj_names[index]); + return 0; +} + +void RGWLC::start_processor() +{ + worker = new LCWorker(this, cct, this); + worker->create("lifecycle_thr"); +} + +void RGWLC::stop_processor() +{ + down_flag = true; + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + + +unsigned RGWLC::get_subsys() const +{ + return dout_subsys; +} + +std::ostream& RGWLC::gen_prefix(std::ostream& out) const +{ + return out << "lifecycle: "; +} + +void RGWLC::LCWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + +bool RGWLC::going_down() +{ + return down_flag; +} + +bool RGWLC::LCWorker::should_work(utime_t& now) +{ + int start_hour; + int start_minute; + int end_hour; + int end_minute; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); + struct tm bdt; + time_t tt = now.sec(); + localtime_r(&tt, &bdt); + + if (cct->_conf->rgw_lc_debug_interval > 0) { + /* We're debugging, so say we can run */ + return true; + } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) && + (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) { + return true; + } else { + return false; + } + +} + +int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now) +{ + int secs; + + if (cct->_conf->rgw_lc_debug_interval > 0) { + secs = start + cct->_conf->rgw_lc_debug_interval - now; + if (secs < 0) + secs = 0; + return (secs); + } + + int start_hour; + int start_minute; + int end_hour; + int end_minute; + string worktime = cct->_conf->rgw_lifecycle_work_time; + sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); + struct tm bdt; + time_t tt = now.sec(); + time_t nt; + localtime_r(&tt, &bdt); + bdt.tm_hour = start_hour; + bdt.tm_min = start_minute; + bdt.tm_sec = 0; + nt = mktime(&bdt); + secs = nt - tt; + + return secs>0 ? secs : secs+24*60*60; +} + +void RGWLifecycleConfiguration::generate_test_instances(list<RGWLifecycleConfiguration*>& o) +{ + o.push_back(new RGWLifecycleConfiguration); +} + +void get_lc_oid(CephContext *cct, const string& shard_id, string *oid) +{ + int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs); + int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs; + *oid = lc_oid_prefix; + char buf[32]; + snprintf(buf, 32, ".%d", index); + oid->append(buf); + return; +} + + + +static std::string get_lc_shard_name(const rgw_bucket& bucket){ + return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); +} + +template<typename F> +static int guard_lc_modify(RGWRados* store, const rgw_bucket& bucket, const string& cookie, const F& f) { + CephContext *cct = store->ctx(); + + string shard_id = get_lc_shard_name(bucket); + + string oid; + get_lc_oid(cct, shard_id, &oid); + + pair<string, int> entry(shard_id, lc_uninitial); + int max_lock_secs = cct->_conf->rgw_lc_lock_max_time; + + rados::cls::lock::Lock l(lc_index_lock_name); + utime_t time(max_lock_secs, 0); + l.set_duration(time); + l.set_cookie(cookie); + + librados::IoCtx *ctx = store->get_lc_pool_ctx(); + int ret; + + do { + ret = l.lock_exclusive(ctx, oid); + if (ret == -EBUSY || ret == -EEXIST) { + ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " + << oid << ", sleep 5, try again" << dendl; + sleep(5); // XXX: return retryable error + continue; + } + if (ret < 0) { + ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " + << oid << ", ret=" << ret << dendl; + break; + } + ret = f(ctx, oid, entry); + if (ret < 0) { + ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on " + << oid << ", ret=" << ret << dendl; + } + break; + } while(true); + l.unlock(ctx, oid); + return ret; +} + +int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info, + const map<string, bufferlist>& bucket_attrs, + RGWLifecycleConfiguration *config) +{ + map<string, bufferlist> attrs = bucket_attrs; + bufferlist lc_bl; + config->encode(lc_bl); + + attrs[RGW_ATTR_LC] = std::move(lc_bl); + + int ret = rgw_bucket_set_attrs(store, bucket_info, attrs, &bucket_info.objv_tracker); + if (ret < 0) + return ret; + + rgw_bucket& bucket = bucket_info.bucket; + + + ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid, + const pair<string, int>& entry) { + return cls_rgw_lc_set_entry(*ctx, oid, entry); + }); + + return ret; +} + +int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info, + const map<string, bufferlist>& bucket_attrs) +{ + map<string, bufferlist> attrs = bucket_attrs; + attrs.erase(RGW_ATTR_LC); + int ret = rgw_bucket_set_attrs(store, bucket_info, attrs, + &bucket_info.objv_tracker); + + rgw_bucket& bucket = bucket_info.bucket; + + if (ret < 0) { + ldout(cct, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket=" + << bucket.name << " returned err=" << ret << dendl; + return ret; + } + + + ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid, + const pair<string, int>& entry) { + return cls_rgw_lc_rm_entry(*ctx, oid, entry); + }); + + return ret; +} + +namespace rgw::lc { + +int fix_lc_shard_entry(RGWRados* store, const RGWBucketInfo& bucket_info, + const map<std::string,bufferlist>& battrs) +{ + if (auto aiter = battrs.find(RGW_ATTR_LC); + aiter == battrs.end()) { + return 0; // No entry, nothing to fix + } + + auto shard_name = get_lc_shard_name(bucket_info.bucket); + std::string lc_oid; + get_lc_oid(store->ctx(), shard_name, &lc_oid); + + rgw_lc_entry_t entry; + // There are multiple cases we need to encounter here + // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets + // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update + // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker) + // We are not dropping the old marker here as that would be caught by the next LC process update + auto lc_pool_ctx = store->get_lc_pool_ctx(); + int ret = cls_rgw_lc_get_entry(*lc_pool_ctx, + lc_oid, shard_name, entry); + if (ret == 0) { + ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl; + return ret; // entry is already existing correctly set to marker + } + ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl; + if (ret == -ENOENT) { + ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name + << " creating " << dendl; + // TODO: we have too many ppl making cookies like this! + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); + std::string cookie = cookie_buf; + + ret = guard_lc_modify(store, bucket_info.bucket, cookie, + [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid, + const pair<string, int>& entry) { + return cls_rgw_lc_set_entry(*lc_pool_ctx, + lc_oid, entry); + }); + + } + + return ret; +} + +std::string s3_expiration_header( + DoutPrefixProvider* dpp, + const rgw_obj_key& obj_key, + const RGWObjTags& obj_tagset, + const ceph::real_time& mtime, + const std::map<std::string, buffer::list>& bucket_attrs) +{ + CephContext* cct = dpp->get_cct(); + RGWLifecycleConfiguration config(cct); + std::string hdr{""}; + + const auto& aiter = bucket_attrs.find(RGW_ATTR_LC); + if (aiter == bucket_attrs.end()) + return hdr; + + bufferlist::const_iterator iter{&aiter->second}; + try { + config.decode(iter); + } catch (const buffer::error& e) { + ldpp_dout(dpp, 0) << __func__ + << "() decode life cycle config failed" + << dendl; + return hdr; + } /* catch */ + + /* dump tags at debug level 16 */ + RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags(); + if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) { + for (const auto& elt : obj_tag_map) { + ldout(cct, 16) << __func__ + << "() key=" << elt.first << " val=" << elt.second + << dendl; + } + } + + boost::optional<ceph::real_time> expiration_date; + boost::optional<std::string> rule_id; + + const auto& rule_map = config.get_rule_map(); + for (const auto& ri : rule_map) { + const auto& rule = ri.second; + auto& id = rule.get_id(); + auto& prefix = rule.get_prefix(); + auto& filter = rule.get_filter(); + auto& expiration = rule.get_expiration(); + auto& noncur_expiration = rule.get_noncur_expiration(); + + ldpp_dout(dpp, 10) << "rule: " << ri.first + << " prefix: " << prefix + << " expiration: " + << " date: " << expiration.get_date() + << " days: " << expiration.get_days() + << " noncur_expiration: " + << " date: " << noncur_expiration.get_date() + << " days: " << noncur_expiration.get_days() + << dendl; + + /* skip if rule !enabled + * if rule has prefix, skip iff object !match prefix + * if rule has tags, skip iff object !match tags + * note if object is current or non-current, compare accordingly + * if rule has days, construct date expression and save iff older + * than last saved + * if rule has date, convert date expression and save iff older + * than last saved + * if the date accum has a value, format it into hdr + */ + + if (!rule.is_enabled()) + continue; + + if(!prefix.empty()) { + if (!boost::starts_with(obj_key.name, prefix)) + continue; + } + + if (filter.has_tags()) { + bool tag_match = false; + const RGWObjTags& rule_tagset = filter.get_tags(); + for (auto& tag : rule_tagset.get_tags()) { + /* remember, S3 tags are {key,value} tuples */ + tag_match = true; + auto obj_tag = obj_tag_map.find(tag.first); + if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) { + ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key + << " rule_id=" << id + << " tag=" << tag + << dendl; + tag_match = false; + break; + } + } + if (! tag_match) + continue; + } + + // compute a uniform expiration date + boost::optional<ceph::real_time> rule_expiration_date; + const LCExpiration& rule_expiration = + (obj_key.instance.empty()) ? expiration : noncur_expiration; + + if (rule_expiration.has_date()) { + rule_expiration_date = + boost::optional<ceph::real_time>( + ceph::from_iso_8601(rule.get_expiration().get_date())); + rule_id = id; + } else { + if (rule_expiration.has_days()) { + rule_expiration_date = + boost::optional<ceph::real_time>( + mtime + make_timespan(double(rule_expiration.get_days())*24*60*60)); + rule_id = id; + } + } + + // update earliest expiration + if (rule_expiration_date) { + if ((! expiration_date) || + (*expiration_date < *rule_expiration_date)) { + expiration_date = + boost::optional<ceph::real_time>(rule_expiration_date); + } + } + } + + // cond format header + if (expiration_date && rule_id) { + // Fri, 23 Dec 2012 00:00:00 GMT + char exp_buf[100]; + time_t exp = ceph::real_clock::to_time_t(*expiration_date); + if (std::strftime(exp_buf, sizeof(exp_buf), + "%a, %d %b %Y %T %Z", std::gmtime(&exp))) { + hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf, + *rule_id); + } else { + ldpp_dout(dpp, 0) << __func__ << + "() strftime of life cycle expiration header failed" + << dendl; + } + } + + return hdr; + +} /* rgwlc_s3_expiration_header */ + +} /* namespace rgw::lc */ |