summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_lc.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rgw/rgw_lc.h640
1 files changed, 640 insertions, 0 deletions
diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h
new file mode 100644
index 000000000..bd8efd9b6
--- /dev/null
+++ b/src/rgw/rgw_lc.h
@@ -0,0 +1,640 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <map>
+#include <array>
+#include <string>
+#include <iostream>
+
+#include "common/debug.h"
+
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
+#include "common/iso_8601.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "rgw_tag.h"
+#include "rgw_sal.h"
+
+#include <atomic>
+#include <tuple>
+
+#define HASH_PRIME 7877
+#define MAX_ID_LEN 255
+static std::string lc_oid_prefix = "lc";
+static std::string lc_index_lock_name = "lc_process";
+
+extern const char* LC_STATUS[];
+
+typedef enum {
+ lc_uninitial = 0,
+ lc_processing,
+ lc_failed,
+ lc_complete,
+} LC_BUCKET_STATUS;
+
+class LCExpiration
+{
+protected:
+ std::string days;
+ //At present only current object has expiration date
+ std::string date;
+public:
+ LCExpiration() {}
+ LCExpiration(const std::string& _days, const std::string& _date) : days(_days), date(_date) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(3, 2, bl);
+ encode(days, bl);
+ encode(date, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+ decode(days, bl);
+ if (struct_v >= 3) {
+ decode(date, bl);
+ }
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+// static void generate_test_instances(list<ACLOwner*>& o);
+ void set_days(const std::string& _days) { days = _days; }
+ std::string get_days_str() const {
+ return days;
+ }
+ int get_days() const {return atoi(days.c_str()); }
+ bool has_days() const {
+ return !days.empty();
+ }
+ void set_date(const std::string& _date) { date = _date; }
+ std::string get_date() const {
+ return date;
+ }
+ bool has_date() const {
+ return !date.empty();
+ }
+ bool empty() const {
+ return days.empty() && date.empty();
+ }
+ bool valid() const {
+ if (!days.empty() && !date.empty()) {
+ return false;
+ } else if (!days.empty() && get_days() <= 0) {
+ return false;
+ }
+ //We've checked date in xml parsing
+ return true;
+ }
+};
+WRITE_CLASS_ENCODER(LCExpiration)
+
+class LCTransition
+{
+protected:
+ std::string days;
+ std::string date;
+ std::string storage_class;
+
+public:
+ int get_days() const {
+ return atoi(days.c_str());
+ }
+
+ std::string get_date() const {
+ return date;
+ }
+
+ std::string get_storage_class() const {
+ return storage_class;
+ }
+
+ bool has_days() const {
+ return !days.empty();
+ }
+
+ bool has_date() const {
+ return !date.empty();
+ }
+
+ bool empty() const {
+ return days.empty() && date.empty();
+ }
+
+ bool valid() const {
+ if (!days.empty() && !date.empty()) {
+ return false;
+ } else if (!days.empty() && get_days() < 0) {
+ return false;
+ }
+ //We've checked date in xml parsing
+ return true;
+ }
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(days, bl);
+ encode(date, bl);
+ encode(storage_class, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(days, bl);
+ decode(date, bl);
+ decode(storage_class, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const {
+ f->dump_string("days", days);
+ f->dump_string("date", date);
+ f->dump_string("storage_class", storage_class);
+ }
+};
+WRITE_CLASS_ENCODER(LCTransition)
+
+enum class LCFlagType : uint16_t
+{
+ none = 0,
+ ArchiveZone,
+};
+
+class LCFlag {
+public:
+ LCFlagType bit;
+ const char* name;
+
+ constexpr LCFlag(LCFlagType ord, const char* name) : bit(ord), name(name)
+ {}
+};
+
+class LCFilter
+{
+ public:
+
+ static constexpr uint32_t make_flag(LCFlagType type) {
+ switch (type) {
+ case LCFlagType::none:
+ return 0;
+ break;
+ default:
+ return 1 << (uint32_t(type) - 1);
+ }
+ }
+
+ static constexpr std::array<LCFlag, 2> filter_flags =
+ {
+ LCFlag(LCFlagType::none, "none"),
+ LCFlag(LCFlagType::ArchiveZone, "ArchiveZone"),
+ };
+
+protected:
+ std::string prefix;
+ RGWObjTags obj_tags;
+ uint32_t flags;
+
+public:
+
+ LCFilter() : flags(make_flag(LCFlagType::none))
+ {}
+
+ const std::string& get_prefix() const {
+ return prefix;
+ }
+
+ const RGWObjTags& get_tags() const {
+ return obj_tags;
+ }
+
+ const uint32_t get_flags() const {
+ return flags;
+ }
+
+ bool empty() const {
+ return !(has_prefix() || has_tags() || has_flags());
+ }
+
+ // Determine if we need AND tag when creating xml
+ bool has_multi_condition() const {
+ if (obj_tags.count() + int(has_prefix()) + int(has_flags()) > 1) // Prefix is a member of Filter
+ return true;
+ return false;
+ }
+
+ bool has_prefix() const {
+ return !prefix.empty();
+ }
+
+ bool has_tags() const {
+ return !obj_tags.empty();
+ }
+
+ bool has_flags() const {
+ return !(flags == uint32_t(LCFlagType::none));
+ }
+
+ bool have_flag(LCFlagType flag) const {
+ return flags & make_flag(flag);
+ }
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(3, 1, bl);
+ encode(prefix, bl);
+ encode(obj_tags, bl);
+ encode(flags, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(3, bl);
+ decode(prefix, bl);
+ if (struct_v >= 2) {
+ decode(obj_tags, bl);
+ if (struct_v >= 3) {
+ decode(flags, bl);
+ }
+ }
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(LCFilter)
+
+class LCRule
+{
+protected:
+ std::string id;
+ std::string prefix;
+ std::string status;
+ LCExpiration expiration;
+ LCExpiration noncur_expiration;
+ LCExpiration mp_expiration;
+ LCFilter filter;
+ std::map<std::string, LCTransition> transitions;
+ std::map<std::string, LCTransition> noncur_transitions;
+ bool dm_expiration = false;
+
+public:
+
+ LCRule(){};
+ virtual ~LCRule() {}
+
+ const std::string& get_id() const {
+ return id;
+ }
+
+ const std::string& get_status() const {
+ return status;
+ }
+
+ bool is_enabled() const {
+ return status == "Enabled";
+ }
+
+ void set_enabled(bool flag) {
+ status = (flag ? "Enabled" : "Disabled");
+ }
+
+ const std::string& get_prefix() const {
+ return prefix;
+ }
+
+ const LCFilter& get_filter() const {
+ return filter;
+ }
+
+ const LCExpiration& get_expiration() const {
+ return expiration;
+ }
+
+ const LCExpiration& get_noncur_expiration() const {
+ return noncur_expiration;
+ }
+
+ const LCExpiration& get_mp_expiration() const {
+ return mp_expiration;
+ }
+
+ bool get_dm_expiration() const {
+ return dm_expiration;
+ }
+
+ const std::map<std::string, LCTransition>& get_transitions() const {
+ return transitions;
+ }
+
+ const std::map<std::string, LCTransition>& get_noncur_transitions() const {
+ return noncur_transitions;
+ }
+
+ void set_id(const std::string& _id) {
+ id = _id;
+ }
+
+ void set_prefix(const std::string& _prefix) {
+ prefix = _prefix;
+ }
+
+ void set_status(const std::string& _status) {
+ status = _status;
+ }
+
+ void set_expiration(const LCExpiration& _expiration) {
+ expiration = _expiration;
+ }
+
+ void set_noncur_expiration(const LCExpiration& _noncur_expiration) {
+ noncur_expiration = _noncur_expiration;
+ }
+
+ void set_mp_expiration(const LCExpiration& _mp_expiration) {
+ mp_expiration = _mp_expiration;
+ }
+
+ void set_dm_expiration(bool _dm_expiration) {
+ dm_expiration = _dm_expiration;
+ }
+
+ bool add_transition(const LCTransition& _transition) {
+ auto ret = transitions.emplace(_transition.get_storage_class(), _transition);
+ return ret.second;
+ }
+
+ bool add_noncur_transition(const LCTransition& _noncur_transition) {
+ auto ret = noncur_transitions.emplace(_noncur_transition.get_storage_class(), _noncur_transition);
+ return ret.second;
+ }
+
+ bool valid() const;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(6, 1, bl);
+ encode(id, bl);
+ encode(prefix, bl);
+ encode(status, bl);
+ encode(expiration, bl);
+ encode(noncur_expiration, bl);
+ encode(mp_expiration, bl);
+ encode(dm_expiration, bl);
+ encode(filter, bl);
+ encode(transitions, bl);
+ encode(noncur_transitions, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(6, 1, 1, bl);
+ decode(id, bl);
+ decode(prefix, bl);
+ decode(status, bl);
+ decode(expiration, bl);
+ if (struct_v >=2) {
+ decode(noncur_expiration, bl);
+ }
+ if (struct_v >= 3) {
+ decode(mp_expiration, bl);
+ }
+ if (struct_v >= 4) {
+ decode(dm_expiration, bl);
+ }
+ if (struct_v >= 5) {
+ decode(filter, bl);
+ }
+ if (struct_v >= 6) {
+ decode(transitions, bl);
+ decode(noncur_transitions, bl);
+ }
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+
+ void init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days);
+};
+WRITE_CLASS_ENCODER(LCRule)
+
+struct transition_action
+{
+ int days;
+ boost::optional<ceph::real_time> date;
+ std::string storage_class;
+ transition_action() : days(0) {}
+ void dump(Formatter *f) const {
+ if (!date) {
+ f->dump_int("days", days);
+ } else {
+ utime_t ut(*date);
+ f->dump_stream("date") << ut;
+ }
+ }
+};
+
+/* XXX why not LCRule? */
+struct lc_op
+{
+ std::string id;
+ bool status{false};
+ bool dm_expiration{false};
+ int expiration{0};
+ int noncur_expiration{0};
+ int mp_expiration{0};
+ boost::optional<ceph::real_time> expiration_date;
+ boost::optional<RGWObjTags> obj_tags;
+ std::map<std::string, transition_action> transitions;
+ std::map<std::string, transition_action> noncur_transitions;
+ uint32_t rule_flags;
+
+ /* ctors are nice */
+ lc_op() = delete;
+
+ lc_op(const std::string id) : id(id)
+ {}
+
+ void dump(Formatter *f) const;
+};
+
+class RGWLifecycleConfiguration
+{
+protected:
+ CephContext *cct;
+ std::multimap<std::string, lc_op> prefix_map;
+ std::multimap<std::string, LCRule> rule_map;
+ bool _add_rule(const LCRule& rule);
+ bool has_same_action(const lc_op& first, const lc_op& second);
+public:
+ explicit RGWLifecycleConfiguration(CephContext *_cct) : cct(_cct) {}
+ RGWLifecycleConfiguration() : cct(NULL) {}
+
+ void set_ctx(CephContext *ctx) {
+ cct = ctx;
+ }
+
+ virtual ~RGWLifecycleConfiguration() {}
+
+// int get_perm(std::string& id, int perm_mask);
+// int get_group_perm(ACLGroupTypeEnum group, int perm_mask);
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(rule_map, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);
+ decode(rule_map, bl);
+ std::multimap<std::string, LCRule>::iterator iter;
+ for (iter = rule_map.begin(); iter != rule_map.end(); ++iter) {
+ LCRule& rule = iter->second;
+ _add_rule(rule);
+ }
+ DECODE_FINISH(bl);
+ }
+ void dump(Formatter *f) const;
+ static void generate_test_instances(std::list<RGWLifecycleConfiguration*>& o);
+
+ void add_rule(const LCRule& rule);
+
+ int check_and_add_rule(const LCRule& rule);
+
+ bool valid();
+
+ std::multimap<std::string, LCRule>& get_rule_map() { return rule_map; }
+ std::multimap<std::string, lc_op>& get_prefix_map() { return prefix_map; }
+/*
+ void create_default(std::string id, std::string name) {
+ ACLGrant grant;
+ grant.set_canon(id, name, RGW_PERM_FULL_CONTROL);
+ add_grant(&grant);
+ }
+*/
+};
+WRITE_CLASS_ENCODER(RGWLifecycleConfiguration)
+
+class RGWLC : public DoutPrefixProvider {
+ CephContext *cct;
+ rgw::sal::Driver* driver;
+ std::unique_ptr<rgw::sal::Lifecycle> sal_lc;
+ int max_objs{0};
+ std::string *obj_names{nullptr};
+ std::atomic<bool> down_flag = { false };
+ std::string cookie;
+
+public:
+
+ class WorkPool;
+
+ class LCWorker : public Thread
+ {
+ const DoutPrefixProvider *dpp;
+ CephContext *cct;
+ RGWLC *lc;
+ int ix;
+ std::mutex lock;
+ std::condition_variable cond;
+ WorkPool* workpool{nullptr};
+ /* save the target bucket names created as part of object transition
+ * to cloud. This list is maintained for the duration of each RGWLC::process()
+ * post which it is discarded. */
+ std::set<std::string> cloud_targets;
+
+ public:
+
+ using lock_guard = std::lock_guard<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
+
+ LCWorker(const DoutPrefixProvider* dpp, CephContext *_cct, RGWLC *_lc,
+ int ix);
+ RGWLC* get_lc() { return lc; }
+
+ std::string thr_name() {
+ return std::string{"lc_thrd: "} + std::to_string(ix);
+ }
+
+ void *entry() override;
+ void stop();
+ bool should_work(utime_t& now);
+ int schedule_next_start_time(utime_t& start, utime_t& now);
+ std::set<std::string>& get_cloud_targets() { return cloud_targets; }
+ virtual ~LCWorker() override;
+
+ friend class RGWRados;
+ friend class RGWLC;
+ friend class WorkQ;
+ }; /* LCWorker */
+
+ friend class RGWRados;
+
+ std::vector<std::unique_ptr<RGWLC::LCWorker>> workers;
+
+ RGWLC() : cct(nullptr), driver(nullptr) {}
+ virtual ~RGWLC() override;
+
+ void initialize(CephContext *_cct, rgw::sal::Driver* _driver);
+ void finalize();
+
+ int process(LCWorker* worker,
+ const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
+ bool once);
+ int advance_head(const std::string& lc_shard,
+ rgw::sal::Lifecycle::LCHead& head,
+ rgw::sal::Lifecycle::LCEntry& entry,
+ time_t start_date);
+ int process(int index, int max_lock_secs, LCWorker* worker, bool once);
+ int process_bucket(int index, int max_lock_secs, LCWorker* worker,
+ const std::string& bucket_entry_marker, bool once);
+ bool expired_session(time_t started);
+ time_t thread_stop_at();
+ int list_lc_progress(std::string& marker, uint32_t max_entries,
+ std::vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>>&,
+ int& index);
+ int bucket_lc_process(std::string& shard_id, LCWorker* worker, time_t stop_at,
+ bool once);
+ int bucket_lc_post(int index, int max_lock_sec,
+ rgw::sal::Lifecycle::LCEntry& entry, int& result, LCWorker* worker);
+ bool going_down();
+ void start_processor();
+ void stop_processor();
+ int set_bucket_config(rgw::sal::Bucket* bucket,
+ const rgw::sal::Attrs& bucket_attrs,
+ RGWLifecycleConfiguration *config);
+ int remove_bucket_config(rgw::sal::Bucket* bucket,
+ const rgw::sal::Attrs& bucket_attrs,
+ bool merge_attrs = true);
+
+ CephContext *get_cct() const override { return cct; }
+ rgw::sal::Lifecycle* get_lc() const { return sal_lc.get(); }
+ unsigned get_subsys() const;
+ std::ostream& gen_prefix(std::ostream& out) const;
+
+ private:
+
+ int handle_multipart_expiration(rgw::sal::Bucket* target,
+ const std::multimap<std::string, lc_op>& prefix_map,
+ LCWorker* worker, time_t stop_at, bool once);
+};
+
+namespace rgw::lc {
+
+int fix_lc_shard_entry(const DoutPrefixProvider *dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Lifecycle* sal_lc,
+ rgw::sal::Bucket* bucket);
+
+std::string s3_expiration_header(
+ DoutPrefixProvider* dpp,
+ const rgw_obj_key& obj_key,
+ const RGWObjTags& obj_tagset,
+ const ceph::real_time& mtime,
+ const std::map<std::string, buffer::list>& bucket_attrs);
+
+bool s3_multipart_abort_header(
+ DoutPrefixProvider* dpp,
+ const rgw_obj_key& obj_key,
+ const ceph::real_time& mtime,
+ const std::map<std::string, buffer::list>& bucket_attrs,
+ ceph::real_time& abort_date,
+ std::string& rule_id);
+
+} // namespace rgw::lc