diff options
Diffstat (limited to '')
-rw-r--r-- | src/rgw/rgw_bucket.h | 575 |
1 files changed, 575 insertions, 0 deletions
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h new file mode 100644 index 00000000..11623b85 --- /dev/null +++ b/src/rgw/rgw_bucket.h @@ -0,0 +1,575 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RGW_BUCKET_H +#define CEPH_RGW_BUCKET_H + +#include <string> +#include <memory> + +#include "include/types.h" +#include "rgw_common.h" +#include "rgw_tools.h" + +#include "rgw_rados.h" + +#include "rgw_string.h" + +#include "common/Formatter.h" +#include "common/lru_map.h" +#include "common/ceph_time.h" +#include "rgw_formats.h" + + +// define as static when RGWBucket implementation completes +extern void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id); + +extern int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive, + map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, + real_time mtime); +extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferlist& bl, bool exclusive, + map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, + real_time mtime); + +extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id); +extern int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key, + rgw_bucket* bucket, int *shard_id); + +extern int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry, + RGWObjVersionTracker *objv_tracker); +extern void rgw_bucket_instance_key_to_oid(string& key); +extern void rgw_bucket_instance_oid_to_key(string& oid); + +extern int rgw_bucket_delete_bucket_obj(RGWRados *store, + const string& tenant_name, + const string& bucket_name, + RGWObjVersionTracker& objv_tracker); + +extern int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info); +extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name); + +extern std::string rgw_make_bucket_entry_name(const std::string& tenant_name, + const std::string& bucket_name); +static inline void rgw_make_bucket_entry_name(const string& tenant_name, + const string& bucket_name, + std::string& bucket_entry) { + bucket_entry = rgw_make_bucket_entry_name(tenant_name, bucket_name); +} + +extern void rgw_parse_url_bucket(const string& bucket, + const string& auth_tenant, + string &tenant_name, string &bucket_name); + +struct RGWBucketCompleteInfo { + RGWBucketInfo info; + map<string, bufferlist> attrs; + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; + +class RGWBucketEntryMetadataObject : public RGWMetadataObject { + RGWBucketEntryPoint ep; +public: + RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, real_time m) : ep(_ep) { + objv = v; + mtime = m; + } + + void dump(Formatter *f) const override { + ep.dump(f); + } +}; + +class RGWBucketInstanceMetadataObject : public RGWMetadataObject { + RGWBucketCompleteInfo info; +public: + RGWBucketInstanceMetadataObject() {} + RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, real_time m) : info(i) { + objv = v; + mtime = m; + } + + void dump(Formatter *f) const override { + info.dump(f); + } + + void decode_json(JSONObj *obj) { + info.decode_json(obj); + } + + RGWBucketInfo& get_bucket_info() { return info.info; } +}; + +/** + * Store a list of the user's buckets, with associated functinos. + */ +class RGWUserBuckets +{ + std::map<std::string, RGWBucketEnt> buckets; + +public: + RGWUserBuckets() = default; + RGWUserBuckets(RGWUserBuckets&&) = default; + + RGWUserBuckets& operator=(const RGWUserBuckets&) = default; + + void encode(bufferlist& bl) const { + using ceph::encode; + encode(buckets, bl); + } + void decode(bufferlist::const_iterator& bl) { + using ceph::decode; + decode(buckets, bl); + } + /** + * Check if the user owns a bucket by the given name. + */ + bool owns(string& name) { + map<string, RGWBucketEnt>::iterator iter; + iter = buckets.find(name); + return (iter != buckets.end()); + } + + /** + * Add a (created) bucket to the user's bucket list. + */ + void add(const RGWBucketEnt& bucket) { + buckets[bucket.bucket.name] = bucket; + } + + /** + * Remove a bucket from the user's list by name. + */ + void remove(const string& name) { + map<string, RGWBucketEnt>::iterator iter; + iter = buckets.find(name); + if (iter != buckets.end()) { + buckets.erase(iter); + } + } + + /** + * Get the user's buckets as a map. + */ + map<string, RGWBucketEnt>& get_buckets() { return buckets; } + + /** + * Cleanup data structure + */ + void clear() { buckets.clear(); } + + size_t count() { return buckets.size(); } +}; +WRITE_CLASS_ENCODER(RGWUserBuckets) + +class RGWMetadataManager; +class RGWMetadataHandler; + +class RGWBucketMetaHandlerAllocator { +public: + static RGWMetadataHandler *alloc(); +}; + +class RGWBucketInstanceMetaHandlerAllocator { +public: + static RGWMetadataHandler *alloc(); +}; + +class RGWArchiveBucketMetaHandlerAllocator { +public: + static RGWMetadataHandler *alloc(); +}; + +class RGWArchiveBucketInstanceMetaHandlerAllocator { +public: + static RGWMetadataHandler *alloc(); +}; + +extern void rgw_bucket_init(RGWMetadataManager *mm); +/** + * Get all the buckets owned by a user and fill up an RGWUserBuckets with them. + * Returns: 0 on success, -ERR# on failure. + */ +extern int rgw_read_user_buckets(RGWRados *store, + const rgw_user& user_id, + RGWUserBuckets& buckets, + const string& marker, + const string& end_marker, + uint64_t max, + bool need_stats, + bool* is_truncated, + uint64_t default_amount = 1000); + +extern int rgw_link_bucket(RGWRados* store, + const rgw_user& user_id, + rgw_bucket& bucket, + ceph::real_time creation_time, + bool update_entrypoint = true); +extern int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, + const string& tenant_name, const string& bucket_name, bool update_entrypoint = true); + +extern int rgw_remove_object(RGWRados *store, const RGWBucketInfo& bucket_info, const rgw_bucket& bucket, rgw_obj_key& key); +extern int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children); +extern int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket, int concurrent_max); + +extern int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info, + map<string, bufferlist>& attrs, + RGWObjVersionTracker *objv_tracker); +extern int rgw_object_get_attr(RGWRados* store, const RGWBucketInfo& bucket_info, + const rgw_obj& obj, const char* attr_name, + bufferlist& out_bl); + +extern void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id, bool fix); + +struct RGWBucketAdminOpState { + rgw_user uid; + std::string display_name; + std::string bucket_name; + std::string bucket_id; + std::string object_name; + + bool list_buckets; + bool stat_buckets; + bool check_objects; + bool fix_index; + bool delete_child_objects; + bool bucket_stored; + int max_aio = 0; + + rgw_bucket bucket; + + RGWQuotaInfo quota; + + void set_fetch_stats(bool value) { stat_buckets = value; } + void set_check_objects(bool value) { check_objects = value; } + void set_fix_index(bool value) { fix_index = value; } + void set_delete_children(bool value) { delete_child_objects = value; } + + void set_max_aio(int value) { max_aio = value; } + + void set_user_id(const rgw_user& user_id) { + if (!user_id.empty()) + uid = user_id; + } + void set_tenant(const std::string& tenant_str) { + uid.tenant = tenant_str; + } + void set_bucket_name(const std::string& bucket_str) { + bucket_name = bucket_str; + } + void set_object(std::string& object_str) { + object_name = object_str; + } + void set_quota(RGWQuotaInfo& value) { + quota = value; + } + + + rgw_user& get_user_id() { return uid; } + std::string& get_user_display_name() { return display_name; } + std::string& get_bucket_name() { return bucket_name; } + std::string& get_object_name() { return object_name; } + std::string& get_tenant() { return uid.tenant; } + + rgw_bucket& get_bucket() { return bucket; } + void set_bucket(rgw_bucket& _bucket) { + bucket = _bucket; + bucket_stored = true; + } + + void set_bucket_id(const string& bi) { + bucket_id = bi; + } + const string& get_bucket_id() { return bucket_id; } + + bool will_fetch_stats() { return stat_buckets; } + bool will_fix_index() { return fix_index; } + bool will_delete_children() { return delete_child_objects; } + bool will_check_objects() { return check_objects; } + bool is_user_op() { return !uid.empty(); } + bool is_system_op() { return uid.empty(); } + bool has_bucket_stored() { return bucket_stored; } + int get_max_aio() { return max_aio; } + + RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false), + fix_index(false), delete_child_objects(false), + bucket_stored(false) {} +}; + +/* + * A simple wrapper class for administrative bucket operations + */ + +class RGWBucket +{ + RGWUserBuckets buckets; + RGWRados *store; + RGWAccessHandle handle; + + RGWUserInfo user_info; + std::string tenant; + std::string bucket_name; + + bool failure; + + RGWBucketInfo bucket_info; + +public: + RGWBucket() : store(NULL), handle(NULL), failure(false) {} + int init(RGWRados *storage, RGWBucketAdminOpState& op_state); + + int check_bad_index_multipart(RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, std::string *err_msg = NULL); + + int check_object_index(RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, + std::string *err_msg = NULL); + + int check_index(RGWBucketAdminOpState& op_state, + map<RGWObjCategory, RGWStorageStats>& existing_stats, + map<RGWObjCategory, RGWStorageStats>& calculated_stats, + std::string *err_msg = NULL); + + int remove(RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true, std::string *err_msg = NULL); + int link(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); + int unlink(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); + int set_quota(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); + + int remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg = NULL); + int policy_bl_to_stream(bufferlist& bl, ostream& o); + int get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy); + + void clear_failure() { failure = false; } + + const RGWBucketInfo& get_bucket_info() const { return bucket_info; } +}; + +class RGWBucketAdminOp +{ +public: + static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + static int get_policy(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWAccessControlPolicy& policy); + static int dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state, + ostream& os); + + static int unlink(RGWRados *store, RGWBucketAdminOpState& op_state); + static int link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err_msg = NULL); + + static int check_index(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + + static int remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state, bool bypass_gc = false, bool keep_index_consistent = true); + static int remove_object(RGWRados *store, RGWBucketAdminOpState& op_state); + static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher); + static int limit_check(RGWRados *store, RGWBucketAdminOpState& op_state, + const std::list<std::string>& user_ids, + RGWFormatterFlusher& flusher, + bool warnings_only = false); + static int set_quota(RGWRados *store, RGWBucketAdminOpState& op_state); + + static int list_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + + static int clear_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + static int fix_lc_shards(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher); + static int fix_obj_expiry(RGWRados *store, RGWBucketAdminOpState& op_state, + RGWFormatterFlusher& flusher, bool dry_run = false); +}; + + +enum DataLogEntityType { + ENTITY_TYPE_UNKNOWN = 0, + ENTITY_TYPE_BUCKET = 1, +}; + +struct rgw_data_change { + DataLogEntityType entity_type; + string key; + real_time timestamp; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + uint8_t t = (uint8_t)entity_type; + encode(t, bl); + encode(key, bl); + encode(timestamp, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + uint8_t t; + decode(t, bl); + entity_type = (DataLogEntityType)t; + decode(key, bl); + decode(timestamp, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(rgw_data_change) + +struct rgw_data_change_log_entry { + string log_id; + real_time log_timestamp; + rgw_data_change entry; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(log_id, bl); + encode(log_timestamp, bl); + encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(log_id, bl); + decode(log_timestamp, bl); + decode(entry, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(rgw_data_change_log_entry) + +struct RGWDataChangesLogInfo { + string marker; + real_time last_update; + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; + +namespace rgw { +struct BucketChangeObserver; +} + +class RGWDataChangesLog { + CephContext *cct; + RGWRados *store; + rgw::BucketChangeObserver *observer = nullptr; + + int num_shards; + string *oids; + + Mutex lock; + RWLock modified_lock; + map<int, set<string> > modified_shards; + + std::atomic<bool> down_flag = { false }; + + struct ChangeStatus { + real_time cur_expiration; + real_time cur_sent; + bool pending; + RefCountedCond *cond; + Mutex *lock; + + ChangeStatus() : pending(false), cond(NULL) { + lock = new Mutex("RGWDataChangesLog::ChangeStatus"); + } + + ~ChangeStatus() { + delete lock; + } + }; + + typedef std::shared_ptr<ChangeStatus> ChangeStatusPtr; + + lru_map<rgw_bucket_shard, ChangeStatusPtr> changes; + + map<rgw_bucket_shard, bool> cur_cycle; + + void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); + void register_renew(rgw_bucket_shard& bs); + void update_renewed(rgw_bucket_shard& bs, real_time& expiration); + + class ChangesRenewThread : public Thread { + CephContext *cct; + RGWDataChangesLog *log; + Mutex lock; + Cond cond; + + public: + ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread::lock") {} + void *entry() override; + void stop(); + }; + + ChangesRenewThread *renew_thread; + +public: + + RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), + lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"), + changes(cct->_conf->rgw_data_log_changes_size) { + num_shards = cct->_conf->rgw_data_log_num_shards; + + oids = new string[num_shards]; + + string prefix = cct->_conf->rgw_data_log_obj_prefix; + + if (prefix.empty()) { + prefix = "data_log"; + } + + for (int i = 0; i < num_shards; i++) { + char buf[16]; + snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i); + oids[i] = buf; + } + + renew_thread = new ChangesRenewThread(cct, this); + renew_thread->create("rgw_dt_lg_renew"); + } + + ~RGWDataChangesLog(); + + int choose_oid(const rgw_bucket_shard& bs); + const std::string& get_oid(int shard_id) const { return oids[shard_id]; } + int add_entry(rgw_bucket& bucket, int shard_id); + int get_log_shard_id(rgw_bucket& bucket, int shard_id); + int renew_entries(); + int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, + list<rgw_data_change_log_entry>& entries, + const string& marker, + string *out_marker, + bool *truncated); + int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time, + const string& start_marker, const string& end_marker); + int get_info(int shard_id, RGWDataChangesLogInfo *info); + int lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id); + int unlock(int shard_id, string& zone_id, string& owner_id); + struct LogMarker { + int shard; + string marker; + + LogMarker() : shard(0) {} + }; + int list_entries(const real_time& start_time, const real_time& end_time, int max_entries, + list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated); + + void mark_modified(int shard_id, const rgw_bucket_shard& bs); + void read_clear_modified(map<int, set<string> > &modified); + + void set_observer(rgw::BucketChangeObserver *observer) { + this->observer = observer; + } + + bool going_down(); +}; + +bool rgw_find_bucket_by_id(CephContext *cct, RGWMetadataManager *mgr, const string& marker, + const string& bucket_id, rgw_bucket* bucket_out); + +#endif |