// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #include #include #include #include #include #include #include #include #include #include #include "include/scope_guard.h" #include "include/function2.hpp" #include "common/Formatter.h" #include "common/containers.h" #include "common/split.h" #include #include "include/random.h" #include "cls/lock/cls_lock_client.h" #include "rgw_perf_counters.h" #include "rgw_common.h" #include "rgw_bucket.h" #include "rgw_lc.h" #include "rgw_zone.h" #include "rgw_string.h" #include "rgw_multi.h" #include "rgw_sal.h" #include "rgw_lc_tier.h" #include "rgw_notify.h" #include "fmt/format.h" #include "services/svc_sys_obj.h" #include "services/svc_zone.h" #include "services/svc_tier_rados.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw using namespace std; const char* LC_STATUS[] = { "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE" }; using namespace librados; bool LCRule::valid() const { if (id.length() > MAX_ID_LEN) { return false; } else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration && transitions.empty() && noncur_transitions.empty()) { return false; } else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) { return false; } if (!transitions.empty()) { bool using_days = expiration.has_days(); bool using_date = expiration.has_date(); for (const auto& elem : transitions) { if (!elem.second.valid()) { return false; } using_days = using_days || elem.second.has_days(); using_date = using_date || elem.second.has_date(); if (using_days && using_date) { return false; } } } for (const auto& elem : noncur_transitions) { if (!elem.second.valid()) { return false; } } return true; } void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days) { id = _id; prefix = _prefix; char buf[32]; snprintf(buf, sizeof(buf), "%d", num_days); 
expiration.set_days(buf); set_enabled(true); } void RGWLifecycleConfiguration::add_rule(const LCRule& rule) { auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups rule_map.insert(pair(id, rule)); } bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule) { lc_op op(rule.get_id()); op.status = rule.is_enabled(); if (rule.get_expiration().has_days()) { op.expiration = rule.get_expiration().get_days(); } if (rule.get_expiration().has_date()) { op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date()); } if (rule.get_noncur_expiration().has_days()) { op.noncur_expiration = rule.get_noncur_expiration().get_days(); } if (rule.get_mp_expiration().has_days()) { op.mp_expiration = rule.get_mp_expiration().get_days(); } op.dm_expiration = rule.get_dm_expiration(); for (const auto &elem : rule.get_transitions()) { transition_action action; if (elem.second.has_days()) { action.days = elem.second.get_days(); } else { action.date = ceph::from_iso_8601(elem.second.get_date()); } action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first); op.transitions.emplace(elem.first, std::move(action)); } for (const auto &elem : rule.get_noncur_transitions()) { transition_action action; action.days = elem.second.get_days(); action.date = ceph::from_iso_8601(elem.second.get_date()); action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first); op.noncur_transitions.emplace(elem.first, std::move(action)); } std::string prefix; if (rule.get_filter().has_prefix()){ prefix = rule.get_filter().get_prefix(); } else { prefix = rule.get_prefix(); } if (rule.get_filter().has_tags()){ op.obj_tags = rule.get_filter().get_tags(); } op.rule_flags = rule.get_filter().get_flags(); prefix_map.emplace(std::move(prefix), std::move(op)); return true; } int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule) { if (!rule.valid()) { return -EINVAL; } auto& id = rule.get_id(); 
if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same return -EINVAL; } if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || !rule.get_mp_expiration().empty())) { return -ERR_INVALID_REQUEST; } rule_map.insert(pair(id, rule)); if (!_add_rule(rule)) { return -ERR_INVALID_REQUEST; } return 0; } bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) { if ((first.expiration > 0 || first.expiration_date != boost::none) && (second.expiration > 0 || second.expiration_date != boost::none)) { return true; } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) { return true; } else if (first.mp_expiration > 0 && second.mp_expiration > 0) { return true; } else if (!first.transitions.empty() && !second.transitions.empty()) { for (auto &elem : first.transitions) { if (second.transitions.find(elem.first) != second.transitions.end()) { return true; } } } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) { for (auto &elem : first.noncur_transitions) { if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) { return true; } } } return false; } /* Formerly, this method checked for duplicate rules using an invalid * method (prefix uniqueness). 
*/ bool RGWLifecycleConfiguration::valid() { return true; } void *RGWLC::LCWorker::entry() { do { std::unique_ptr all_buckets; // empty restriction utime_t start = ceph_clock_now(); if (should_work(start)) { ldpp_dout(dpp, 2) << "life cycle: start" << dendl; int r = lc->process(this, all_buckets, false /* once */); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl; } ldpp_dout(dpp, 2) << "life cycle: stop" << dendl; cloud_targets.clear(); // clear cloud targets } if (lc->going_down()) break; utime_t end = ceph_clock_now(); int secs = schedule_next_start_time(start, end); utime_t next; next.set_from_double(end + secs); ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl; std::unique_lock l{lock}; cond.wait_for(l, std::chrono::seconds(secs)); } while (!lc->going_down()); return NULL; } void RGWLC::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { cct = _cct; driver = _driver; sal_lc = driver->get_lifecycle(); max_objs = cct->_conf->rgw_lc_max_objs; if (max_objs > HASH_PRIME) max_objs = HASH_PRIME; obj_names = new string[max_objs]; for (int i = 0; i < max_objs; i++) { obj_names[i] = lc_oid_prefix; char buf[32]; snprintf(buf, 32, ".%d", i); obj_names[i].append(buf); } #define COOKIE_LEN 16 char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1); cookie = cookie_buf; } void RGWLC::finalize() { delete[] obj_names; } static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) { os << ""; return os; } static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr) { double timediff, cmp; utime_t base_time; if (cct->_conf->rgw_lc_debug_interval <= 0) { /* Normal case, run properly */ cmp = double(days)*24*60*60; base_time = ceph_clock_now().round_to_day(); } else { /* We're in debug mode; Treat each rgw_lc_debug_interval 
seconds as a day */ cmp = double(days)*cct->_conf->rgw_lc_debug_interval; base_time = ceph_clock_now(); } auto tt_mtime = ceph::real_clock::to_time_t(mtime); timediff = base_time - tt_mtime; if (expire_time) { *expire_time = mtime + make_timespan(cmp); } ldpp_dout(dpp, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << " is_expired=" << (timediff >= cmp) << dendl; return (timediff >= cmp); } static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp) { if (!obj->get_bucket()->get_info().obj_lock_enabled()) { return true; } std::unique_ptr read_op = obj->get_read_op(); int ret = read_op->prepare(null_yield, dpp); if (ret < 0) { if (ret == -ENOENT) { return true; } else { return false; } } else { auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION); if (iter != obj->get_attrs().end()) { RGWObjectRetention retention; try { decode(retention, iter->second); } catch (buffer::error& err) { ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectRetention" << dendl; return false; } if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > ceph_clock_now()) { return false; } } iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD); if (iter != obj->get_attrs().end()) { RGWObjectLegalHold obj_legal_hold; try { decode(obj_legal_hold, iter->second); } catch (buffer::error& err) { ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl; return false; } if (obj_legal_hold.is_enabled()) { return false; } } return true; } } class LCObjsLister { rgw::sal::Driver* driver; rgw::sal::Bucket* bucket; rgw::sal::Bucket::ListParams list_params; rgw::sal::Bucket::ListResults list_results; string prefix; vector::iterator obj_iter; rgw_bucket_dir_entry pre_obj; int64_t delay_ms; public: LCObjsLister(rgw::sal::Driver* _driver, rgw::sal::Bucket* _bucket) : driver(_driver), bucket(_bucket) { 
list_params.list_versions = bucket->versioned(); list_params.allow_unordered = true; delay_ms = driver->ctx()->_conf.get_val("rgw_lc_thread_delay"); } void set_prefix(const string& p) { prefix = p; list_params.prefix = prefix; } int init(const DoutPrefixProvider *dpp) { return fetch(dpp); } int fetch(const DoutPrefixProvider *dpp) { int ret = bucket->list(dpp, list_params, 1000, list_results, null_yield); if (ret < 0) { return ret; } obj_iter = list_results.objs.begin(); return 0; } void delay() { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj, std::function fetch_barrier = []() { /* nada */}) { if (obj_iter == list_results.objs.end()) { if (!list_results.is_truncated) { delay(); return false; } else { fetch_barrier(); list_params.marker = pre_obj.key; int ret = fetch(dpp); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret << dendl; return false; } } delay(); } /* returning address of entry in objs */ *obj = &(*obj_iter); return obj_iter != list_results.objs.end(); } rgw_bucket_dir_entry get_prev_obj() { return pre_obj; } void next() { pre_obj = *obj_iter; ++obj_iter; } boost::optional next_key_name() { if (obj_iter == list_results.objs.end() || (obj_iter + 1) == list_results.objs.end()) { /* this should have been called after get_obj() was called, so this should * only happen if is_truncated is false */ return boost::none; } return ((obj_iter + 1)->key.name); } }; /* LCObjsLister */ struct op_env { using LCWorker = RGWLC::LCWorker; lc_op op; rgw::sal::Driver* driver; LCWorker* worker; rgw::sal::Bucket* bucket; LCObjsLister& ol; op_env(lc_op& _op, rgw::sal::Driver* _driver, LCWorker* _worker, rgw::sal::Bucket* _bucket, LCObjsLister& _ol) : op(_op), driver(_driver), worker(_worker), bucket(_bucket), ol(_ol) {} }; /* op_env */ class LCRuleOp; class WorkQ; struct lc_op_ctx { CephContext *cct; op_env env; rgw_bucket_dir_entry o; boost::optional 
next_key_name; ceph::real_time effective_mtime; rgw::sal::Driver* driver; rgw::sal::Bucket* bucket; lc_op& op; // ok--refers to expanded env.op LCObjsLister& ol; std::unique_ptr obj; RGWObjectCtx rctx; const DoutPrefixProvider *dpp; WorkQ* wq; std::unique_ptr tier; lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o, boost::optional next_key_name, ceph::real_time effective_mtime, const DoutPrefixProvider *dpp, WorkQ* wq) : cct(env.driver->ctx()), env(env), o(o), next_key_name(next_key_name), effective_mtime(effective_mtime), driver(env.driver), bucket(env.bucket), op(env.op), ol(env.ol), rctx(env.driver), dpp(dpp), wq(wq) { obj = bucket->get_object(o.key); } bool next_has_same_name(const std::string& key_name) { return (next_key_name && key_name.compare( boost::get(next_key_name)) == 0); } }; /* lc_op_ctx */ static std::string lc_id = "rgw lifecycle"; static std::string lc_req_id = "0"; static int remove_expired_obj( const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed, rgw::notify::EventType event_type) { auto& driver = oc.driver; auto& bucket_info = oc.bucket->get_info(); auto& o = oc.o; auto obj_key = o.key; auto& meta = o.meta; int ret; std::string version_id; std::unique_ptr notify; if (!remove_indeed) { obj_key.instance.clear(); } else if (obj_key.instance.empty()) { obj_key.instance = "null"; } std::unique_ptr bucket; std::unique_ptr obj; ret = driver->get_bucket(nullptr, bucket_info, &bucket); if (ret < 0) { return ret; } // XXXX currently, rgw::sal::Bucket.owner is always null here std::unique_ptr user; if (! 
bucket->get_owner()) { auto& bucket_info = bucket->get_info(); user = driver->get_user(bucket_info.owner); // forgive me, lord if (user) { bucket->set_owner(user.get()); } } obj = bucket->get_object(obj_key); RGWObjState* obj_state{nullptr}; ret = obj->get_obj_state(dpp, &obj_state, null_yield, true); if (ret < 0) { return ret; } std::unique_ptr del_op = obj->get_delete_op(); del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status(); del_op->params.obj_owner.set_id(rgw_user {meta.owner}); del_op->params.obj_owner.set_name(meta.owner_display_name); del_op->params.bucket_owner.set_id(bucket_info.owner); del_op->params.unmod_since = meta.mtime; del_op->params.marker_version_id = version_id; // notification supported only for RADOS driver for now notify = driver->get_notification(dpp, obj.get(), nullptr, event_type, bucket.get(), lc_id, const_cast(oc.bucket->get_tenant()), lc_req_id, null_yield); ret = notify->publish_reserve(dpp, nullptr); if ( ret < 0) { ldpp_dout(dpp, 1) << "ERROR: notify reservation failed, deferring delete of object k=" << o.key << dendl; return ret; } ret = del_op->delete_obj(dpp, null_yield); if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; } else { // send request to notification manager (void) notify->publish_commit(dpp, obj_state->size, ceph::real_clock::now(), obj_state->attrset[RGW_ATTR_ETAG].to_str(), version_id); } return ret; } /* remove_expired_obj */ class LCOpAction { public: virtual ~LCOpAction() {} virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) { return false; } /* called after check(). Check should tell us whether this action * is applicable. If there are multiple actions, we'll end up executing * the latest applicable action * For example: * one action after 10 days, another after 20, third after 40. 
* After 10 days, the latest applicable action would be the first one, * after 20 days it will be the second one. After 21 days it will still be the * second one. So check() should return true for the second action at that point, * but should_process() if the action has already been applied. In object removal * it doesn't matter, but in object transition it does. */ virtual bool should_process() { return true; } virtual int process(lc_op_ctx& oc) { return 0; } friend class LCOpRule; }; /* LCOpAction */ class LCOpFilter { public: virtual ~LCOpFilter() {} virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) { return false; } }; /* LCOpFilter */ class LCOpRule { friend class LCOpAction; op_env env; boost::optional next_key_name; ceph::real_time effective_mtime; std::vector > filters; // n.b., sharing ovhd std::vector > actions; public: LCOpRule(op_env& _env) : env(_env) {} boost::optional get_next_key_name() { return next_key_name; } std::vector>& get_actions() { return actions; } void build(); void update(); int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, WorkQ* wq); }; /* LCOpRule */ using WorkItem = boost::variant, /* uncompleted MPU expiration */ std::tuple, rgw_bucket_dir_entry>; class WorkQ : public Thread { public: using unique_lock = std::unique_lock; using work_f = std::function; using dequeue_result = boost::variant; static constexpr uint32_t FLAG_NONE = 0x0000; static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001; static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002; static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004; private: const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {}; RGWLC::LCWorker* wk; uint32_t qmax; int ix; std::mutex mtx; std::condition_variable cv; uint32_t flags; vector items; work_f f; public: WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax) : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf) { create(thr_name().c_str()); } std::string thr_name() { return std::string{"wp_thrd: "} + 
std::to_string(wk->ix) + ", " + std::to_string(ix); } void setf(work_f _f) { f = _f; } void enqueue(WorkItem&& item) { unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && (items.size() > qmax)) { flags |= FLAG_EWAIT_SYNC; cv.wait_for(uniq, 200ms); } items.push_back(item); if (flags & FLAG_DWAIT_SYNC) { flags &= ~FLAG_DWAIT_SYNC; cv.notify_one(); } } void drain() { unique_lock uniq(mtx); flags |= FLAG_EDRAIN_SYNC; while (flags & FLAG_EDRAIN_SYNC) { cv.wait_for(uniq, 200ms); } } private: dequeue_result dequeue() { unique_lock uniq(mtx); while ((!wk->get_lc()->going_down()) && (items.size() == 0)) { /* clear drain state, as we are NOT doing work and qlen==0 */ if (flags & FLAG_EDRAIN_SYNC) { flags &= ~FLAG_EDRAIN_SYNC; } flags |= FLAG_DWAIT_SYNC; cv.wait_for(uniq, 200ms); } if (items.size() > 0) { auto item = items.back(); items.pop_back(); if (flags & FLAG_EWAIT_SYNC) { flags &= ~FLAG_EWAIT_SYNC; cv.notify_one(); } return {item}; } return nullptr; } void* entry() override { while (!wk->get_lc()->going_down()) { auto item = dequeue(); if (item.which() == 0) { /* going down */ break; } f(wk, this, boost::get(item)); } return nullptr; } }; /* WorkQ */ class RGWLC::WorkPool { using TVector = ceph::containers::tiny_vector; TVector wqs; uint64_t ix; public: WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax) : wqs(TVector{ n_threads, [&](const size_t ix, auto emplacer) { emplacer.emplace(wk, ix, qmax); }}), ix(0) {} ~WorkPool() { for (auto& wq : wqs) { wq.join(); } } void setf(WorkQ::work_f _f) { for (auto& wq : wqs) { wq.setf(_f); } } void enqueue(WorkItem item) { const auto tix = ix; ix = (ix+1) % wqs.size(); (wqs[tix]).enqueue(std::move(item)); } void drain() { for (auto& wq : wqs) { wq.drain(); } } }; /* WorkPool */ RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct, RGWLC *lc, int ix) : dpp(dpp), cct(cct), lc(lc), ix(ix) { auto wpw = cct->_conf.get_val("rgw_lc_max_wp_worker"); workpool = new WorkPool(this, wpw, 512); } 
/* Returns true when the run is not one-shot (`once` is false) and `stop_at` is already earlier than the current time. */ static inline bool worker_should_stop(time_t stop_at, bool once) { return !once && stop_at < time(nullptr); } /* Abort expired incomplete multipart uploads in `target`: for every enabled prefix rule with mp_expiration > 0, list the multipart namespace under that prefix and queue (rule, entry) pairs on the worker's pool, whose work function aborts uploads that obj_has_expired() reports as expired; stops early when the time budget is exhausted or the lifecycle is going down. NOTE(review): part of the listing loop further below (between `<` and `>`) appears to have been lost when this file was extracted -- restore from the pristine source. */ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target, const multimap& prefix_map, LCWorker* worker, time_t stop_at, bool once) { MultipartMetaFilter mp_filter; int ret; rgw::sal::Bucket::ListParams params; rgw::sal::Bucket::ListResults results; auto delay_ms = cct->_conf.get_val("rgw_lc_thread_delay"); params.list_versions = false; /* lifecycle processing does not depend on total order, so can * take advantage of unordered listing optimizations--such as * operating on one shard at a time */ params.allow_unordered = true; params.ns = RGW_OBJ_NS_MULTIPART; params.access_list_filter = &mp_filter; auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); auto& [rule, obj] = wt; if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) { rgw_obj_key key(obj.key); std::unique_ptr mpu = target->get_multipart_upload(key.name); int ret = mpu->abort(this, cct); if (ret == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_abort_mpu, 1); } } else { if (ret == -ERR_NO_SUCH_UPLOAD) { ldpp_dout(wk->get_lc(), 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", thread:" << wq->thr_name() << ", meta:" << obj.key << dendl; } else { ldpp_dout(wk->get_lc(), 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", thread:" << wq->thr_name() << ", meta:" << obj.key << dendl; } } /* abort failed */ } /* expired */ }; worker->workpool->setf(pf); for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { if (worker_should_stop(stop_at, once)) { ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " << worker->ix << dendl; return 0; } if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) { continue; } params.prefix = prefix_iter->first; do { auto offset = 0; results.objs.clear(); ret = target->list(this, params, 1000, results, null_yield); if 
(ret < 0) { if (ret == (-ENOENT)) return 0; ldpp_dout(this, 0) << "ERROR: driver->list_objects():" < t1 = {prefix_iter->second, *obj_iter}; worker->workpool->enqueue(WorkItem{t1}); if (going_down()) { return 0; } } /* for objs */ if ((offset % 100) == 0) { if (worker_should_stop(stop_at, once)) { ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " << worker->ix << dendl; return 0; } } std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); } while(results.is_truncated); } /* for prefix_map */ worker->workpool->drain(); return 0; } /* RGWLC::handle_multipart_expiration */ /* Read the RGW_ATTR_TAGS attribute of `obj` into `tags_bl`; returns the result of get_attr(). */ static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl) { std::unique_ptr rop = obj->get_read_op(); return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield); } /* True when the op is enabled (status) and defines at least one of: expiration days or date, noncurrent expiration, delete-marker expiration, transitions, noncurrent transitions. */ static bool is_valid_op(const lc_op& op) { return (op.status && (op.expiration > 0 || op.expiration_date != boost::none || op.noncur_expiration > 0 || op.dm_expiration || !op.transitions.empty() || !op.noncur_transitions.empty())); } /* On a zone whose tier type is "archive" only ops flagged ArchiveZone apply; on any other zone only ops without that flag apply. */ static bool zone_check(const lc_op& op, rgw::sal::Zone* zone) { if (zone->get_tier_type() == "archive") { return (op.rule_flags & uint32_t(LCFlagType::ArchiveZone)); } else { return (! (op.rule_flags & uint32_t(LCFlagType::ArchiveZone))); } } /* True when the rule has tags and every one of them (key and value) is present among the object's tags: matching object tags are counted and the count is compared with the number of rule tags. */ static inline bool has_all_tags(const lc_op& rule_action, const RGWObjTags& object_tags) { if(! 
rule_action.obj_tags) return false; if(object_tags.count() < rule_action.obj_tags->count()) return false; size_t tag_count = 0; for (const auto& tag : object_tags.get_tags()) { const auto& rule_tags = rule_action.obj_tags->get_tags(); const auto& iter = rule_tags.find(tag.first); if(iter == rule_tags.end()) continue; if(iter->second == tag.second) { tag_count++; } /* all tags in the rule appear in obj tags */ } return tag_count == rule_action.obj_tags->count(); } /* Apply the rule's tag filter to the object in `oc`. *skip is set to false when the rule has no tags or the object carries all of them, and to true when the tags cannot be read or do not match. Returns -EIO when the stored tag set cannot be decoded, otherwise 0 (read errors are logged, except -ENODATA, but not returned). */ static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip) { auto& op = oc.op; if (op.obj_tags != boost::none) { *skip = true; bufferlist tags_bl; int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl); if (ret < 0) { if (ret != -ENODATA) { ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r=" << ret << " " << oc.wq->thr_name() << dendl; } return 0; } RGWObjTags dest_obj_tags; try { auto iter = tags_bl.cbegin(); dest_obj_tags.decode(iter); } catch (buffer::error& err) { ldpp_dout(oc.dpp,0) << "ERROR: caught buffer::error, couldn't decode TagSet " << oc.wq->thr_name() << dendl; return -EIO; } if (! 
has_all_tags(op, dest_obj_tags)) { ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << " " << oc.wq->thr_name() << dendl; return 0; } } *skip = false; return 0; } /* Tag filter: delete markers always pass; any other entry passes only when check_tags() succeeds and does not ask to skip it. */ class LCOpFilter_Tags : public LCOpFilter { public: bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override { auto& o = oc.o; if (o.is_delete_marker()) { return true; } bool skip; int ret = check_tags(dpp, oc, &skip); if (ret < 0) { if (ret == -ENOENT) { return false; } ldpp_dout(oc.dpp, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << " " << oc.wq->thr_name() << dendl; return false; } return !skip; }; }; /* Expiration of current versions. check() applies only to current entries: a current delete marker qualifies when a next key is known and has a different name; any other entry qualifies by the rule's expiration date or by age in days. process() removes the entry via remove_expired_obj(). NOTE(review): process() in this and the following action classes is not marked `override`. */ class LCOpAction_CurrentExpiration : public LCOpAction { public: LCOpAction_CurrentExpiration(op_env& env) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { auto& o = oc.o; if (!o.is_current()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": not current, skipping " << oc.wq->thr_name() << dendl; return false; } if (o.is_delete_marker()) { if (oc.next_key_name) { std::string nkn = *oc.next_key_name; if (oc.next_has_same_name(o.key.name)) { ldpp_dout(dpp, 7) << __func__ << "(): dm-check SAME: key=" << o.key << " next_key_name: %%" << nkn << "%% " << oc.wq->thr_name() << dendl; return false; } else { ldpp_dout(dpp, 7) << __func__ << "(): dm-check DELE: key=" << o.key << " next_key_name: %%" << nkn << "%% " << oc.wq->thr_name() << dendl; *exp_time = real_clock::now(); return true; } } return false; } auto& mtime = o.meta.mtime; bool is_expired; auto& op = oc.op; if (op.expiration <= 0) { if (op.expiration_date == boost::none) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping " << oc.wq->thr_name() << dendl; return false; } is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date); *exp_time = *op.expiration_date; } else { is_expired = obj_has_expired(dpp, oc.cct, mtime, 
op.expiration, exp_time); } ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << " " << oc.wq->thr_name() << dendl; return is_expired; } int process(lc_op_ctx& oc) { auto& o = oc.o; int r; if (o.is_delete_marker()) { r = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpirationDeleteMarker); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj " << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } ldpp_dout(oc.dpp, 2) << "DELETED: current is-dm " << oc.bucket << ":" << o.key << " " << oc.wq->thr_name() << dendl; } else { /* ! o.is_delete_marker() */ r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(), rgw::notify::ObjectExpirationCurrent); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj " << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_current, 1); } ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key << " " << oc.wq->thr_name() << dendl; } return 0; } }; /* Expiration of noncurrent versions. check() applies only to non-current entries, ages them from oc.effective_mtime against noncur_expiration, and additionally requires pass_object_lock_check(); process() removes the specific version. */ class LCOpAction_NonCurrentExpiration : public LCOpAction { protected: public: LCOpAction_NonCurrentExpiration(op_env& env) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { auto& o = oc.o; if (o.is_current()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": current version, skipping " << oc.wq->thr_name() << dendl; return false; } int expiration = oc.op.noncur_expiration; bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration, exp_time); ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << " " << oc.wq->thr_name() << dendl; return is_expired && pass_object_lock_check(oc.driver, oc.obj.get(), dpp); } int process(lc_op_ctx& oc) { auto& o = oc.o; int r = remove_expired_obj(oc.dpp, oc, true, 
rgw::notify::ObjectExpirationNoncurrent); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) " << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_noncurrent, 1); } ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key << " (non-current expiration) " << oc.wq->thr_name() << dendl; return 0; } }; /* Expiration of delete markers. check() accepts a delete marker unless the next listed key has the same name; process() removes the marker. */ class LCOpAction_DMExpiration : public LCOpAction { public: LCOpAction_DMExpiration(op_env& env) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override { auto& o = oc.o; if (!o.is_delete_marker()) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping " << oc.wq->thr_name() << dendl; return false; } if (oc.next_has_same_name(o.key.name)) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping " << oc.wq->thr_name() << dendl; return false; } *exp_time = real_clock::now(); return true; } int process(lc_op_ctx& oc) { auto& o = oc.o; int r = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpirationDeleteMarker); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) " << oc.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } if (perfcounter) { perfcounter->inc(l_rgw_lc_expire_dm, 1); } ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key << " (delete marker expiration) " << oc.wq->thr_name() << dendl; return 0; } }; /* Common base of the storage-class transition actions. Subclasses choose whether the action applies to current or noncurrent entries and which mtime ages the entry. The class keeps a reference to the transition_action it was built from. check() also records in need_to_process whether the entry's canonical storage class differs from the target class; should_process() reports that flag. */ class LCOpAction_Transition : public LCOpAction { const transition_action& transition; bool need_to_process{false}; protected: virtual bool check_current_state(bool is_current) = 0; virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0; public: LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {} bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const 
DoutPrefixProvider *dpp) override { auto& o = oc.o; if (o.is_delete_marker()) { return false; } if (!check_current_state(o.is_current())) { return false; } auto mtime = get_effective_mtime(oc); bool is_expired; if (transition.days < 0) { if (transition.date == boost::none) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping " << oc.wq->thr_name() << dendl; return false; } is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date); *exp_time = *transition.date; } else { is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time); } ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << " " << oc.wq->thr_name() << dendl; need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class); return is_expired; } bool should_process() override { return need_to_process; } /* Remove the local copy after a cloud transition: for a current, non-delete-marker entry in a versioned bucket remove_expired_obj() is called with remove_indeed=false, otherwise with remove_indeed=true. */ int delete_tier_obj(lc_op_ctx& oc) { int ret = 0; /* If bucket is versioned, create delete_marker for current version */ if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration); ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } else { ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration); ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } return ret; } /* Transition the object to the cloud tier in oc.tier, then delete the local object when the tier does not retain head objects or the entry is the current version in a versioned bucket. */ int transition_obj_to_cloud(lc_op_ctx& oc) { /* If CurrentVersion object, remove it & create delete marker */ bool delete_object = (!oc.tier->retain_head_object() || (oc.o.is_current() && oc.bucket->versioned())); int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o, 
oc.env.worker->get_cloud_targets(), oc.cct, !delete_object, oc.dpp, null_yield); if (ret < 0) { return ret; } if (delete_object) { ret = delete_tier_obj(oc); if (ret < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl; return ret; } } return 0; } /* Perform the transition: entries already in the CloudTiered category are skipped; a "cloud-s3" placement tier goes through transition_obj_to_cloud() (noncurrent entries that fail the object-lock check are skipped); any other target must be a valid placement and is handled by the object's transition(). Returns 0 on success or skip, negative on error. */ int process(lc_op_ctx& oc) { auto& o = oc.o; int r; if (oc.o.meta.category == RGWObjCategory::CloudTiered) { /* Skip objects which are already cloud tiered. */ ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl; return 0; } std::string tier_type = ""; rgw::sal::ZoneGroup& zonegroup = oc.driver->get_zone()->get_zonegroup(); rgw_placement_rule target_placement; target_placement.inherit_from(oc.bucket->get_placement_rule()); target_placement.storage_class = transition.storage_class; r = zonegroup.get_placement_tier(target_placement, &oc.tier); if (!r && oc.tier->get_tier_type() == "cloud-s3") { ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl; if (!oc.o.is_current() && !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) { /* Skip objects which has object lock enabled. */ ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. 
Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl; return 0; } r = transition_obj_to_cloud(oc); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")" << dendl; return r; } } else { if (!oc.driver->valid_placement(target_placement)) { ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement << " bucket="<< oc.bucket << " rule_id=" << oc.op.id << " " << oc.wq->thr_name() << dendl; return -EINVAL; } int r = oc.obj->transition(oc.bucket, target_placement, o.meta.mtime, o.versioned_epoch, oc.dpp, null_yield); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj " << oc.bucket << ":" << o.key << " -> " << transition.storage_class << " " << cpp_strerror(r) << " " << oc.wq->thr_name() << dendl; return r; } } ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket << ":" << o.key << " -> " << transition.storage_class << " " << oc.wq->thr_name() << dendl; return 0; } }; /* Transition of current versions: applies to current entries, aged from the entry's own mtime; counts successful transitions in l_rgw_lc_transition_current. */ class LCOpAction_CurrentTransition : public LCOpAction_Transition { protected: bool check_current_state(bool is_current) override { return is_current; } ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { return oc.o.meta.mtime; } public: LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {} int process(lc_op_ctx& oc) { int r = LCOpAction_Transition::process(oc); if (r == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_transition_current, 1); } } return r; } }; /* Transition of noncurrent versions: applies to non-current entries, aged from oc.effective_mtime; counts successful transitions in l_rgw_lc_transition_noncurrent. */ class LCOpAction_NonCurrentTransition : public LCOpAction_Transition { protected: bool check_current_state(bool is_current) override { return !is_current; } ceph::real_time get_effective_mtime(lc_op_ctx& oc) override { return oc.effective_mtime; } public: LCOpAction_NonCurrentTransition(op_env& env, const transition_action& _transition) : LCOpAction_Transition(_transition) {} int process(lc_op_ctx& oc) { int r = LCOpAction_Transition::process(oc); if 
(r == 0) { if (perfcounter) { perfcounter->inc(l_rgw_lc_transition_noncurrent, 1); } } return r; } }; /* Populate this rule's filters (always the tag filter) and its actions, one per action kind that env.op defines. */ void LCOpRule::build() { filters.emplace_back(new LCOpFilter_Tags); auto& op = env.op; if (op.expiration > 0 || op.expiration_date != boost::none) { actions.emplace_back(new LCOpAction_CurrentExpiration(env)); } if (op.dm_expiration) { actions.emplace_back(new LCOpAction_DMExpiration(env)); } if (op.noncur_expiration > 0) { actions.emplace_back(new LCOpAction_NonCurrentExpiration(env)); } for (auto& iter : op.transitions) { actions.emplace_back(new LCOpAction_CurrentTransition(iter.second)); } for (auto& iter : op.noncur_transitions) { actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second)); } } /* Refresh per-entry state from the lister: the name of the next key and the mtime of the previously visited entry. */ void LCOpRule::update() { next_key_name = env.ol.next_key_name(); effective_mtime = env.ol.get_prev_obj().meta.mtime; } int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp, WorkQ* wq) { lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq); shared_ptr *selected = nullptr; // n.b., req'd by sharing real_time exp; for (auto& a : actions) { real_time action_exp; if (a->check(ctx, &action_exp, dpp)) { if (action_exp > exp) { exp = action_exp; selected = &a; } } } if (selected && (*selected)->should_process()) { /* * Calling filter checks after action checks because * all action checks (as they are implemented now) do * not access the objects themselves, but return result * from info from bucket index listing. The current tags filter * check does access the objects, so we avoid unnecessary rados calls * having filters check later in the process. 
*/ bool cont = false; for (auto& f : filters) { if (f->check(dpp, ctx)) { cont = true; break; } } if (!cont) { ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping " << wq->thr_name() << dendl; return 0; } int r = (*selected)->process(ctx); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj " << env.bucket << ":" << o.key << " " << cpp_strerror(r) << " " << wq->thr_name() << dendl; return r; } ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":" << o.key << " " << wq->thr_name() << dendl; } return 0; } int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker, time_t stop_at, bool once) { RGWLifecycleConfiguration config(cct); std::unique_ptr bucket; string no_ns, list_versions; vector objs; vector result; boost::split(result, shard_id, boost::is_any_of(":")); string bucket_tenant = result[0]; string bucket_name = result[1]; string bucket_marker = result[2]; ldpp_dout(this, 5) << "RGWLC::bucket_lc_process ENTER " << bucket_name << dendl; if (unlikely(cct->_conf->rgwlc_skip_bucket_step)) { return 0; } int ret = driver->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield); if (ret < 0) { ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name << " failed" << dendl; return ret; } ret = bucket->load_bucket(this, null_yield); if (ret < 0) { ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name << " failed" << dendl; return ret; } auto stack_guard = make_scope_guard( [&worker] { worker->workpool->drain(); } ); if (bucket->get_marker() != bucket_marker) { ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant << ":" << bucket_name << " cur_marker=" << bucket->get_marker() << " orig_marker=" << bucket_marker << dendl; return -ENOENT; } map::iterator aiter = bucket->get_attrs().find(RGW_ATTR_LC); if (aiter == bucket->get_attrs().end()) { ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for " << bucket_name << " (terminates bucket_lc_process(...))" 
<< dendl; return 0; } bufferlist::const_iterator iter{&aiter->second}; try { config.decode(iter); } catch (const buffer::error& e) { ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl; return -1; } /* fetch information for zone checks */ rgw::sal::Zone* zone = driver->get_zone(); auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) { auto wt = boost::get>(wi); auto& [op_rule, o] = wt; ldpp_dout(wk->get_lc(), 20) << __func__ << "(): key=" << o.key << wq->thr_name() << dendl; int ret = op_rule.process(o, wk->dpp, wq); if (ret < 0) { ldpp_dout(wk->get_lc(), 20) << "ERROR: orule.process() returned ret=" << ret << "thread:" << wq->thr_name() << dendl; } }; worker->workpool->setf(pf); multimap& prefix_map = config.get_prefix_map(); ldpp_dout(this, 10) << __func__ << "() prefix_map size=" << prefix_map.size() << dendl; rgw_obj_key pre_marker; rgw_obj_key next_marker; for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) { if (worker_should_stop(stop_at, once)) { ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " << worker->ix << dendl; return 0; } auto& op = prefix_iter->second; if (!is_valid_op(op)) { continue; } ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl; if (prefix_iter != prefix_map.begin() && (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) { next_marker = pre_marker; } else { pre_marker = next_marker; } LCObjsLister ol(driver, bucket.get()); ol.set_prefix(prefix_iter->first); if (! zone_check(op, zone)) { ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type() << " zone, skipping" << dendl; continue; } ret = ol.init(this); if (ret < 0) { if (ret == (-ENOENT)) return 0; ldpp_dout(this, 0) << "ERROR: driver->list_objects():" << dendl; return ret; } op_env oenv(op, driver, worker, bucket.get(), ol); LCOpRule orule(oenv); orule.build(); // why can't ctor do it? 
rgw_bucket_dir_entry* o{nullptr}; for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) { orule.update(); std::tuple t1 = {orule, *o}; worker->workpool->enqueue(WorkItem{t1}); if ((offset % 100) == 0) { if (worker_should_stop(stop_at, once)) { ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker " << worker->ix << dendl; return 0; } } } worker->workpool->drain(); } ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once); return ret; } class SimpleBackoff { const int max_retries; std::chrono::milliseconds sleep_ms; int retries{0}; public: SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms) : max_retries(max_retries), sleep_ms(initial_sleep_ms) {} SimpleBackoff(const SimpleBackoff&) = delete; SimpleBackoff& operator=(const SimpleBackoff&) = delete; int get_retries() const { return retries; } void reset() { retries = 0; } bool wait_backoff(const fu2::unique_function& barrier) { reset(); while (retries < max_retries) { auto r = barrier(); if (r) { return r; } std::this_thread::sleep_for(sleep_ms * 2 * retries++); } return false; } }; int RGWLC::bucket_lc_post(int index, int max_lock_sec, rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker) { utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0); std::unique_ptr lock = sal_lc->get_serializer(lc_index_lock_name, obj_names[index], cookie); ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; do { int ret = lock->try_lock(this, lock_duration, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on " << obj_names[index] << ", sleep 5, try again " << dendl; sleep(5); continue; } if (ret < 0) return 0; ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl; if (result == -ENOENT) { /* XXXX are we 
SURE the only way result could == ENOENT is when * there is no such bucket? It is currently the value returned * from bucket_lc_process(...) */ ret = sal_lc->rm_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl; } goto clean; } else if (result < 0) { entry.set_status(lc_failed); } else { entry.set_status(lc_complete); } ret = sal_lc->set_entry(obj_names[index], entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " << obj_names[index] << dendl; } clean: lock->unlock(); ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl; return 0; } while (true); } /* RGWLC::bucket_lc_post */ int RGWLC::list_lc_progress(string& marker, uint32_t max_entries, vector>& progress_map, int& index) { progress_map.clear(); for(; index < max_objs; index++, marker="") { vector> entries; int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries); if (ret < 0) { if (ret == -ENOENT) { ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object=" << obj_names[index] << dendl; continue; } else { return ret; } } progress_map.reserve(progress_map.size() + entries.size()); std::move(begin(entries), end(entries), std::back_inserter(progress_map)); //progress_map.insert(progress_map.end(), entries.begin(), entries.end()); /* update index, marker tuple */ if (progress_map.size() > 0) marker = progress_map.back()->get_bucket(); if (progress_map.size() >= max_entries) break; } return 0; } static inline vector random_sequence(uint32_t n) { vector v(n, 0); std::generate(v.begin(), v.end(), [ix = 0]() mutable { return ix++; }); std::random_device rd; std::default_random_engine rng{rd()}; std::shuffle(v.begin(), v.end(), rng); return v; } static inline int get_lc_index(CephContext *cct, const std::string& shard_id) { int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? 
HASH_PRIME : cct->_conf->rgw_lc_max_objs); /* n.b. review hash algo */ int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs; return index; } static inline void get_lc_oid(CephContext *cct, const std::string& shard_id, string *oid) { /* n.b. review hash algo */ int index = get_lc_index(cct, shard_id); *oid = lc_oid_prefix; char buf[32]; snprintf(buf, 32, ".%d", index); oid->append(buf); return; } static std::string get_bucket_lc_key(const rgw_bucket& bucket){ return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker); } int RGWLC::process(LCWorker* worker, const std::unique_ptr& optional_bucket, bool once = false) { int ret = 0; int max_secs = cct->_conf->rgw_lc_lock_max_time; if (optional_bucket) { /* if a bucket is provided, this is a single-bucket run, and * can be processed without traversing any state entries (we * do need the entry {pro,epi}logue which update the state entry * for this bucket) */ auto bucket_lc_key = get_bucket_lc_key(optional_bucket->get_key()); auto index = get_lc_index(driver->ctx(), bucket_lc_key); ret = process_bucket(index, max_secs, worker, bucket_lc_key, once); return ret; } else { /* generate an index-shard sequence unrelated to any other * that might be running in parallel */ std::string all_buckets{""}; vector shard_seq = random_sequence(max_objs); for (auto index : shard_seq) { ret = process(index, max_secs, worker, once); if (ret < 0) return ret; } } return 0; } bool RGWLC::expired_session(time_t started) { if (! cct->_conf->rgwlc_auto_session_clear) { return false; } time_t interval = (cct->_conf->rgw_lc_debug_interval > 0) ? 
cct->_conf->rgw_lc_debug_interval : 24*60*60; auto now = time(nullptr); ldpp_dout(this, 16) << "RGWLC::expired_session" << " started: " << started << " interval: " << interval << "(*2==" << 2*interval << ")" << " now: " << now << dendl; return (started + 2*interval < now); } time_t RGWLC::thread_stop_at() { uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0) ? cct->_conf->rgw_lc_debug_interval : 24*60*60; return time(nullptr) + interval; } int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker, const std::string& bucket_entry_marker, bool once = false) { ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: " << "index: " << index << " worker ix: " << worker->ix << dendl; int ret = 0; std::unique_ptr serializer = sal_lc->get_serializer(lc_index_lock_name, obj_names[index], worker->thr_name()); std::unique_ptr entry; if (max_lock_secs <= 0) { return -EAGAIN; } utime_t time(max_lock_secs, 0); ret = serializer->try_lock(this, time, null_yield); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on " << obj_names[index] << dendl; return -EBUSY; } if (ret < 0) return 0; std::unique_lock lock( *(serializer.get()), std::adopt_lock); ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, &entry); if (ret >= 0) { if (entry->get_status() == lc_processing) { if (expired_session(entry->get_start_time())) { ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry << " index: " << index << " worker ix: " << worker->ix << " (clearing)" << dendl; } else { ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; return ret; } } } /* do nothing if no bucket */ if (entry->get_bucket().empty()) { return ret; } ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; 
entry->set_status(lc_processing); ret = sal_lc->set_entry(obj_names[index], *entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry " << obj_names[index] << entry->get_bucket() << entry->get_status() << dendl; return ret; } ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; lock.unlock(); ret = bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once); bucket_lc_post(index, max_lock_secs, *entry, ret, worker); return ret; } /* RGWLC::process_bucket */ static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t shard_rollover_date) { /* return true iff: * - non-debug scheduling is in effect, and * - the current shard has not rolled over in the last 24 hours */ if (((shard_rollover_date < now) && (now - shard_rollover_date > 24*60*60)) || (! shard_rollover_date /* no rollover date stored */) || (cct->_conf->rgw_lc_debug_interval > 0 /* defaults to -1 == disabled */)) { return true; } return false; } /* allow_shard_rollover */ static inline bool already_run_today(CephContext* cct, time_t start_date) { struct tm bdt; time_t begin_of_day; utime_t now = ceph_clock_now(); localtime_r(&start_date, &bdt); if (cct->_conf->rgw_lc_debug_interval > 0) { if (now - start_date < cct->_conf->rgw_lc_debug_interval) return true; else return false; } bdt.tm_hour = 0; bdt.tm_min = 0; bdt.tm_sec = 0; begin_of_day = mktime(&bdt); if (now - begin_of_day < 24*60*60) return true; else return false; } /* already_run_today */ inline int RGWLC::advance_head(const std::string& lc_shard, rgw::sal::Lifecycle::LCHead& head, rgw::sal::Lifecycle::LCEntry& entry, time_t start_date) { int ret{0}; std::unique_ptr next_entry; ret = sal_lc->get_next_entry(lc_shard, entry.get_bucket(), &next_entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry " << lc_shard << dendl; goto exit; } /* save the next position */ 
head.set_marker(next_entry->get_bucket()); head.set_start_date(start_date); ret = sal_lc->put_head(lc_shard, head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << lc_shard << dendl; goto exit; } exit: return ret; } /* advance head */ int RGWLC::process(int index, int max_lock_secs, LCWorker* worker, bool once = false) { int ret{0}; const auto& lc_shard = obj_names[index]; std::unique_ptr head; std::unique_ptr entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS ldpp_dout(this, 5) << "RGWLC::process(): ENTER: " << "index: " << index << " worker ix: " << worker->ix << dendl; std::unique_ptr lock = sal_lc->get_serializer(lc_index_lock_name, lc_shard, worker->thr_name()); utime_t lock_for_s(max_lock_secs, 0); const auto& lock_lambda = [&]() { ret = lock->try_lock(this, lock_for_s, null_yield); if (ret == 0) { return true; } if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ return false; } return false; }; SimpleBackoff shard_lock(5 /* max retries */, 50ms); if (! shard_lock.wait_backoff(lock_lambda)) { ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on " << lc_shard << " after " << shard_lock.get_retries() << dendl; return 0; } do { utime_t now = ceph_clock_now(); /* preamble: find an inital bucket/marker */ ret = sal_lc->get_head(lc_shard, &head); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head " << lc_shard << ", ret=" << ret << dendl; goto exit; } /* if there is nothing at head, try to reinitialize head.marker with the * first entry in the queue */ if (head->get_marker().empty() && allow_shard_rollover(cct, now, head->get_shard_rollover_date()) /* prevent multiple passes by diff. 
* rgws,in same cycle */) { ldpp_dout(this, 5) << "RGWLC::process() process shard rollover lc_shard=" << lc_shard << " head.marker=" << head->get_marker() << " head.shard_rollover_date=" << head->get_shard_rollover_date() << dendl; vector> entries; int ret = sal_lc->list_entries(lc_shard, head->get_marker(), 1, entries); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, " << "entries) returned error ret==" << ret << dendl; goto exit; } if (entries.size() > 0) { entry = std::move(entries.front()); head->set_marker(entry->get_bucket()); head->set_start_date(now); head->set_shard_rollover_date(0); } } else { ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard==" << lc_shard << " head last stored at " << rgw_to_asctime(utime_t(time_t(head->get_start_date()), 0)) << dendl; /* fetches the entry pointed to by head.bucket */ ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) " << "returned error ret==" << ret << dendl; goto exit; } } if (entry && !entry->get_bucket().empty()) { if (entry->get_status() == lc_processing) { if (expired_session(entry->get_start_time())) { ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry << " index: " << index << " worker ix: " << worker->ix << " (clearing)" << dendl; } else { ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; /* skip to next entry */ if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) { goto exit; } /* done with this shard */ if (head->get_marker().empty()) { ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard=" << lc_shard << dendl; head->set_shard_rollover_date(ceph_clock_now()); ret = sal_lc->put_head(lc_shard, *head.get()); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " 
<< lc_shard << dendl; } goto exit; } continue; } } else { if ((entry->get_status() == lc_complete) && already_run_today(cct, entry->get_start_time())) { /* skip to next entry */ if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) { goto exit; } ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker->ix << " SKIP processing for already-processed bucket " << entry->get_bucket() << dendl; /* done with this shard */ if (head->get_marker().empty()) { ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard=" << lc_shard << dendl; head->set_shard_rollover_date(ceph_clock_now()); ret = sal_lc->put_head(lc_shard, *head.get()); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << lc_shard << dendl; } goto exit; } continue; } } } else { ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1" << " (this is possible mainly before any lc policy has been stored" << " or after removal of an lc_shard object)" << dendl; goto exit; } /* When there are no more entries to process, entry will be * equivalent to an empty marker and so the following resets the * processing for the shard automatically when processing is * finished for the shard */ ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; entry->set_status(lc_processing); entry->set_start_time(now); ret = sal_lc->set_entry(lc_shard, *entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << lc_shard << entry->get_bucket() << entry->get_status() << dendl; goto exit; } /* advance head for next waiter, then process */ if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) { goto exit; } ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry << " index: " << index << " worker ix: " << worker->ix << dendl; /* drop lock so other instances can make progress while this * bucket is being processed */ lock->unlock(); ret = 
bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once); /* postamble */ //bucket_lc_post(index, max_lock_secs, entry, ret, worker); if (! shard_lock.wait_backoff(lock_lambda)) { ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on " << lc_shard << " after " << shard_lock.get_retries() << dendl; return 0; } if (ret == -ENOENT) { /* XXXX are we SURE the only way result could == ENOENT is when * there is no such bucket? It is currently the value returned * from bucket_lc_process(...) */ ret = sal_lc->rm_entry(lc_shard, *entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry " << lc_shard << " (nonfatal)" << dendl; /* not fatal, could result from a race */ } } else { if (ret < 0) { entry->set_status(lc_failed); } else { entry->set_status(lc_complete); } ret = sal_lc->set_entry(lc_shard, *entry); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on " << lc_shard << dendl; /* fatal, locked */ goto exit; } } /* done with this shard */ if (head->get_marker().empty()) { ldpp_dout(this, 5) << "RGWLC::process() cycle finished lc_shard=" << lc_shard << dendl; head->set_shard_rollover_date(ceph_clock_now()); ret = sal_lc->put_head(lc_shard, *head.get()); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << lc_shard << dendl; } goto exit; } } while(1 && !once && !going_down()); exit: lock->unlock(); return 0; } void RGWLC::start_processor() { auto maxw = cct->_conf->rgw_lc_max_worker; workers.reserve(maxw); for (int ix = 0; ix < maxw; ++ix) { auto worker = std::make_unique(this /* dpp */, cct, this, ix); worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str()); workers.emplace_back(std::move(worker)); } } void RGWLC::stop_processor() { down_flag = true; for (auto& worker : workers) { worker->stop(); worker->join(); } workers.clear(); } unsigned RGWLC::get_subsys() const { return dout_subsys; } std::ostream& RGWLC::gen_prefix(std::ostream& out) const { return 
out << "lifecycle: "; } void RGWLC::LCWorker::stop() { std::lock_guard l{lock}; cond.notify_all(); } bool RGWLC::going_down() { return down_flag; } bool RGWLC::LCWorker::should_work(utime_t& now) { int start_hour; int start_minute; int end_hour; int end_minute; string worktime = cct->_conf->rgw_lifecycle_work_time; sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); struct tm bdt; time_t tt = now.sec(); localtime_r(&tt, &bdt); if (cct->_conf->rgw_lc_debug_interval > 0) { /* We're debugging, so say we can run */ return true; } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) && (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) { return true; } else { return false; } } int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now) { int secs; if (cct->_conf->rgw_lc_debug_interval > 0) { secs = start + cct->_conf->rgw_lc_debug_interval - now; if (secs < 0) secs = 0; return (secs); } int start_hour; int start_minute; int end_hour; int end_minute; string worktime = cct->_conf->rgw_lifecycle_work_time; sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute); struct tm bdt; time_t tt = now.sec(); time_t nt; localtime_r(&tt, &bdt); bdt.tm_hour = start_hour; bdt.tm_min = start_minute; bdt.tm_sec = 0; nt = mktime(&bdt); secs = nt - tt; return secs>0 ? secs : secs+24*60*60; } RGWLC::LCWorker::~LCWorker() { delete workpool; } /* ~LCWorker */ void RGWLifecycleConfiguration::generate_test_instances( list& o) { o.push_back(new RGWLifecycleConfiguration); } template static int guard_lc_modify(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Lifecycle* sal_lc, const rgw_bucket& bucket, const string& cookie, const F& f) { CephContext *cct = driver->ctx(); auto bucket_lc_key = get_bucket_lc_key(bucket); string oid; get_lc_oid(cct, bucket_lc_key, &oid); /* XXX it makes sense to take shard_id for a bucket_id? 
*/ std::unique_ptr entry = sal_lc->get_entry(); entry->set_bucket(bucket_lc_key); entry->set_status(lc_uninitial); int max_lock_secs = cct->_conf->rgw_lc_lock_max_time; std::unique_ptr lock = sal_lc->get_serializer(lc_index_lock_name, oid, cookie); utime_t time(max_lock_secs, 0); int ret; uint16_t retries{0}; // due to reports of starvation trying to save lifecycle policy, try hard do { ret = lock->try_lock(dpp, time, null_yield); if (ret == -EBUSY || ret == -EEXIST) { ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " << oid << ", retry in 100ms, ret=" << ret << dendl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); // the typical S3 client will time out in 60s if(retries++ < 500) { continue; } } if (ret < 0) { ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on " << oid << ", ret=" << ret << dendl; break; } ret = f(sal_lc, oid, *entry.get()); if (ret < 0) { ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to set entry on " << oid << ", ret=" << ret << dendl; } break; } while(true); lock->unlock(); return ret; } int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket, const rgw::sal::Attrs& bucket_attrs, RGWLifecycleConfiguration *config) { int ret{0}; rgw::sal::Attrs attrs = bucket_attrs; if (config) { /* if no RGWLifecycleconfiguration provided, it means * RGW_ATTR_LC is already valid and present */ bufferlist lc_bl; config->encode(lc_bl); attrs[RGW_ATTR_LC] = std::move(lc_bl); ret = bucket->merge_and_store_attrs(this, attrs, null_yield); if (ret < 0) { return ret; } } rgw_bucket& b = bucket->get_key(); ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie, [&](rgw::sal::Lifecycle* sal_lc, const string& oid, rgw::sal::Lifecycle::LCEntry& entry) { return sal_lc->set_entry(oid, entry); }); return ret; } int RGWLC::remove_bucket_config(rgw::sal::Bucket* bucket, const rgw::sal::Attrs& bucket_attrs, bool merge_attrs) { rgw::sal::Attrs attrs = bucket_attrs; rgw_bucket& b = bucket->get_key(); int ret{0}; if (merge_attrs) { 
attrs.erase(RGW_ATTR_LC); ret = bucket->merge_and_store_attrs(this, attrs, null_yield); if (ret < 0) { ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket=" << b.name << " returned err=" << ret << dendl; return ret; } } ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie, [&](rgw::sal::Lifecycle* sal_lc, const string& oid, rgw::sal::Lifecycle::LCEntry& entry) { return sal_lc->rm_entry(oid, entry); }); return ret; } /* RGWLC::remove_bucket_config */ RGWLC::~RGWLC() { stop_processor(); finalize(); } /* ~RGWLC() */ namespace rgw::lc { int fix_lc_shard_entry(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Lifecycle* sal_lc, rgw::sal::Bucket* bucket) { if (auto aiter = bucket->get_attrs().find(RGW_ATTR_LC); aiter == bucket->get_attrs().end()) { return 0; // No entry, nothing to fix } auto bucket_lc_key = get_bucket_lc_key(bucket->get_key()); std::string lc_oid; get_lc_oid(driver->ctx(), bucket_lc_key, &lc_oid); std::unique_ptr entry; // There are multiple cases we need to encounter here // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker) // We are not dropping the old marker here as that would be caught by the next LC process update int ret = sal_lc->get_entry(lc_oid, bucket_lc_key, &entry); if (ret == 0) { ldpp_dout(dpp, 5) << "Entry already exists, nothing to do" << dendl; return ret; // entry is already existing correctly set to marker } ldpp_dout(dpp, 5) << "lc_get_entry errored ret code=" << ret << dendl; if (ret == -ENOENT) { ldpp_dout(dpp, 1) << "No entry for bucket=" << bucket << " creating " << dendl; // TODO: we have too many ppl making cookies like this! 
char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(driver->ctx(), cookie_buf, sizeof(cookie_buf) - 1); std::string cookie = cookie_buf; ret = guard_lc_modify(dpp, driver, sal_lc, bucket->get_key(), cookie, [&lc_oid](rgw::sal::Lifecycle* slc, const string& oid, rgw::sal::Lifecycle::LCEntry& entry) { return slc->set_entry(lc_oid, entry); }); } return ret; } std::string s3_expiration_header( DoutPrefixProvider* dpp, const rgw_obj_key& obj_key, const RGWObjTags& obj_tagset, const ceph::real_time& mtime, const std::map& bucket_attrs) { CephContext* cct = dpp->get_cct(); RGWLifecycleConfiguration config(cct); std::string hdr{""}; const auto& aiter = bucket_attrs.find(RGW_ATTR_LC); if (aiter == bucket_attrs.end()) return hdr; bufferlist::const_iterator iter{&aiter->second}; try { config.decode(iter); } catch (const buffer::error& e) { ldpp_dout(dpp, 0) << __func__ << "() decode life cycle config failed" << dendl; return hdr; } /* catch */ /* dump tags at debug level 16 */ RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags(); if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) { for (const auto& elt : obj_tag_map) { ldpp_dout(dpp, 16) << __func__ << "() key=" << elt.first << " val=" << elt.second << dendl; } } boost::optional expiration_date; boost::optional rule_id; const auto& rule_map = config.get_rule_map(); for (const auto& ri : rule_map) { const auto& rule = ri.second; auto& id = rule.get_id(); auto& filter = rule.get_filter(); auto& prefix = filter.has_prefix() ? 
filter.get_prefix(): rule.get_prefix(); auto& expiration = rule.get_expiration(); auto& noncur_expiration = rule.get_noncur_expiration(); ldpp_dout(dpp, 10) << "rule: " << ri.first << " prefix: " << prefix << " expiration: " << " date: " << expiration.get_date() << " days: " << expiration.get_days() << " noncur_expiration: " << " date: " << noncur_expiration.get_date() << " days: " << noncur_expiration.get_days() << dendl; /* skip if rule !enabled * if rule has prefix, skip iff object !match prefix * if rule has tags, skip iff object !match tags * note if object is current or non-current, compare accordingly * if rule has days, construct date expression and save iff older * than last saved * if rule has date, convert date expression and save iff older * than last saved * if the date accum has a value, format it into hdr */ if (! rule.is_enabled()) continue; if(! prefix.empty()) { if (! boost::starts_with(obj_key.name, prefix)) continue; } if (filter.has_tags()) { bool tag_match = false; const RGWObjTags& rule_tagset = filter.get_tags(); for (auto& tag : rule_tagset.get_tags()) { /* remember, S3 tags are {key,value} tuples */ tag_match = true; auto obj_tag = obj_tag_map.find(tag.first); if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) { ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key << " rule_id=" << id << " tag=" << tag << dendl; tag_match = false; break; } } if (! tag_match) continue; } // compute a uniform expiration date boost::optional rule_expiration_date; const LCExpiration& rule_expiration = (obj_key.instance.empty()) ? 
expiration : noncur_expiration; if (rule_expiration.has_date()) { rule_expiration_date = boost::optional( ceph::from_iso_8601(rule.get_expiration().get_date())); } else { if (rule_expiration.has_days()) { rule_expiration_date = boost::optional( mtime + make_timespan(double(rule_expiration.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60)); } } // update earliest expiration if (rule_expiration_date) { if ((! expiration_date) || (*expiration_date > *rule_expiration_date)) { expiration_date = boost::optional(rule_expiration_date); rule_id = boost::optional(id); } } } // cond format header if (expiration_date && rule_id) { // Fri, 23 Dec 2012 00:00:00 GMT char exp_buf[100]; time_t exp = ceph::real_clock::to_time_t(*expiration_date); if (std::strftime(exp_buf, sizeof(exp_buf), "%a, %d %b %Y %T %Z", std::gmtime(&exp))) { hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf, *rule_id); } else { ldpp_dout(dpp, 0) << __func__ << "() strftime of life cycle expiration header failed" << dendl; } } return hdr; } /* rgwlc_s3_expiration_header */ bool s3_multipart_abort_header( DoutPrefixProvider* dpp, const rgw_obj_key& obj_key, const ceph::real_time& mtime, const std::map& bucket_attrs, ceph::real_time& abort_date, std::string& rule_id) { CephContext* cct = dpp->get_cct(); RGWLifecycleConfiguration config(cct); const auto& aiter = bucket_attrs.find(RGW_ATTR_LC); if (aiter == bucket_attrs.end()) return false; bufferlist::const_iterator iter{&aiter->second}; try { config.decode(iter); } catch (const buffer::error& e) { ldpp_dout(dpp, 0) << __func__ << "() decode life cycle config failed" << dendl; return false; } /* catch */ std::optional abort_date_tmp; std::optional rule_id_tmp; const auto& rule_map = config.get_rule_map(); for (const auto& ri : rule_map) { const auto& rule = ri.second; const auto& id = rule.get_id(); const auto& filter = rule.get_filter(); const auto& prefix = 
filter.has_prefix()?filter.get_prefix():rule.get_prefix(); const auto& mp_expiration = rule.get_mp_expiration(); if (!rule.is_enabled()) { continue; } if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) { continue; } std::optional rule_abort_date; if (mp_expiration.has_days()) { rule_abort_date = std::optional( mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60)); } // update earliest abort date if (rule_abort_date) { if ((! abort_date_tmp) || (*abort_date_tmp > *rule_abort_date)) { abort_date_tmp = std::optional(rule_abort_date); rule_id_tmp = std::optional(id); } } } if (abort_date_tmp && rule_id_tmp) { abort_date = *abort_date_tmp; rule_id = *rule_id_tmp; return true; } else { return false; } } } /* namespace rgw::lc */ void lc_op::dump(Formatter *f) const { f->dump_bool("status", status); f->dump_bool("dm_expiration", dm_expiration); f->dump_int("expiration", expiration); f->dump_int("noncur_expiration", noncur_expiration); f->dump_int("mp_expiration", mp_expiration); if (expiration_date) { utime_t ut(*expiration_date); f->dump_stream("expiration_date") << ut; } if (obj_tags) { f->dump_object("obj_tags", *obj_tags); } f->open_object_section("transitions"); for(auto& [storage_class, transition] : transitions) { f->dump_object(storage_class, transition); } f->close_section(); f->open_object_section("noncur_transitions"); for (auto& [storage_class, transition] : noncur_transitions) { f->dump_object(storage_class, transition); } f->close_section(); } void LCFilter::dump(Formatter *f) const { f->dump_string("prefix", prefix); f->dump_object("obj_tags", obj_tags); if (have_flag(LCFlagType::ArchiveZone)) { f->dump_string("archivezone", ""); } } void LCExpiration::dump(Formatter *f) const { f->dump_string("days", days); f->dump_string("date", date); } void LCRule::dump(Formatter *f) const { f->dump_string("id", id); f->dump_string("prefix", prefix); f->dump_string("status", status); 
f->dump_object("expiration", expiration); f->dump_object("noncur_expiration", noncur_expiration); f->dump_object("mp_expiration", mp_expiration); f->dump_object("filter", filter); f->open_object_section("transitions"); for (auto& [storage_class, transition] : transitions) { f->dump_object(storage_class, transition); } f->close_section(); f->open_object_section("noncur_transitions"); for (auto& [storage_class, transition] : noncur_transitions) { f->dump_object(storage_class, transition); } f->close_section(); f->dump_bool("dm_expiration", dm_expiration); } void RGWLifecycleConfiguration::dump(Formatter *f) const { f->open_object_section("prefix_map"); for (auto& prefix : prefix_map) { f->dump_object(prefix.first.c_str(), prefix.second); } f->close_section(); f->open_array_section("rule_map"); for (auto& rule : rule_map) { f->open_object_section("entry"); f->dump_string("id", rule.first); f->open_object_section("rule"); rule.second.dump(f); f->close_section(); f->close_section(); } f->close_section(); }