summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_orphan.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_orphan.cc')
-rw-r--r--src/rgw/rgw_orphan.cc1523
1 files changed, 1523 insertions, 0 deletions
diff --git a/src/rgw/rgw_orphan.cc b/src/rgw/rgw_orphan.cc
new file mode 100644
index 00000000..832076b7
--- /dev/null
+++ b/src/rgw/rgw_orphan.cc
@@ -0,0 +1,1523 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <string>
+
+
+#include "common/config.h"
+#include "common/Formatter.h"
+#include "common/errno.h"
+
+#include "rgw_rados.h"
+#include "rgw_op.h"
+#include "rgw_multi.h"
+#include "rgw_orphan.h"
+#include "rgw_zone.h"
+#include "rgw_bucket.h"
+
+#include "services/svc_zone.h"
+#include "services/svc_sys_obj.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+#define DEFAULT_NUM_SHARDS 64
+
+static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
+{
+ ssize_t pos = oid.find('_');
+ if (pos < 0) {
+ cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
+ }
+
+ string obj_marker = oid.substr(0, pos);
+
+ rgw_obj_key key;
+
+ rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
+
+ if (key.ns.empty()) {
+ return oid;
+ }
+
+ string s = oid;
+
+ if (force_ns) {
+ rgw_bucket b;
+ rgw_obj new_obj(b, key);
+ s = obj_marker + "_" + new_obj.get_oid();
+ }
+
+ /* cut out suffix */
+ size_t i = s.size() - 1;
+ for (; i >= s.size() - 10; --i) {
+ char c = s[i];
+ if (!isdigit(c) && c != '.' && c != '_') {
+ break;
+ }
+ }
+
+ return s.substr(0, i + 1);
+}
+
+int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
+{
+ set<string> keys;
+ map<string, bufferlist> vals;
+ keys.insert(job_name);
+ int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
+ if (r < 0) {
+ return r;
+ }
+
+ map<string, bufferlist>::iterator iter = vals.find(job_name);
+ if (iter == vals.end()) {
+ return -ENOENT;
+ }
+
+ try {
+ bufferlist& bl = iter->second;
+ decode(state, bl);
+ } catch (buffer::error& err) {
+ lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
+{
+ map<string, bufferlist> vals;
+ bufferlist bl;
+ encode(state, bl);
+ vals[job_name] = bl;
+ int r = ioctx.omap_set(oid, vals);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::remove_job(const string& job_name)
+{
+ set<string> keys;
+ keys.insert(job_name);
+
+ int r = ioctx.omap_rm_keys(oid, keys);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
+{
+ map <string,bufferlist> vals;
+ int MAX_READ=1024;
+ string marker="";
+ int r = 0;
+
+ // loop through all the omap vals from index object, storing them to job_list,
+ // read in batches of 1024, we update the marker every iteration and exit the
+ // loop when we find that total size read out is less than batch size
+ do {
+ r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
+ if (r < 0) {
+ return r;
+ }
+ r = vals.size();
+
+ for (const auto &it : vals) {
+ marker=it.first;
+ RGWOrphanSearchState state;
+ try {
+ bufferlist bl = it.second;
+ decode(state, bl);
+ } catch (buffer::error& err) {
+ lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
+ return -EIO;
+ }
+ job_list[it.first] = state;
+ }
+ } while (r == MAX_READ);
+
+ return 0;
+}
+
+int RGWOrphanStore::init()
+{
+ const rgw_pool& log_pool = store->svc.zone->get_zone_params().log_pool;
+ int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx);
+ if (r < 0) {
+ cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
+{
+ librados::ObjectWriteOperation op;
+ op.omap_set(entries);
+ cout << "storing " << entries.size() << " entries at " << oid << std::endl;
+ ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
+ for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+ ldout(store->ctx(), 20) << " > " << iter->first << dendl;
+ }
+ int ret = ioctx.operate(oid, &op);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
+ }
+
+ return 0;
+}
+
+int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
+{
+#define MAX_OMAP_GET 100
+ int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
+ if (ret < 0 && ret != -ENOENT) {
+ cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
+ }
+
+ *truncated = (entries->size() == MAX_OMAP_GET);
+
+ return 0;
+}
+
+int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode)
+{
+ int r = orphan_store.init();
+ if (r < 0) {
+ return r;
+ }
+
+ constexpr int64_t MAX_LIST_OBJS_ENTRIES=100;
+
+ max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead,
+ MAX_LIST_OBJS_ENTRIES);
+
+ detailed_mode = _detailed_mode;
+ RGWOrphanSearchState state;
+ r = orphan_store.read_job(job_name, state);
+ if (r < 0 && r != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
+ return r;
+ }
+
+ if (r == 0) {
+ search_info = state.info;
+ search_stage = state.stage;
+ } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
+ search_info = *info;
+ search_info.job_name = job_name;
+ search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
+ search_info.start_time = ceph_clock_now();
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
+
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
+ return r;
+ }
+ } else {
+ lderr(store->ctx()) << "ERROR: job not found" << dendl;
+ return r;
+ }
+
+ index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
+ index_objs_prefix += job_name;
+
+ for (int i = 0; i < search_info.num_shards; i++) {
+ char buf[128];
+
+ snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
+ all_objs_index[i] = buf;
+
+ snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
+ buckets_instance_index[i] = buf;
+
+ snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
+ linked_objs_index[i] = buf;
+ }
+ return 0;
+}
+
+int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
+{
+ map<int, list<string> >::iterator miter = oids.begin();
+
+ list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
+
+ for (; miter != oids.end(); ++miter) {
+ log_iter_info info;
+ info.oid = log_shards[miter->first];
+ info.cur = miter->second.begin();
+ info.end = miter->second.end();
+ liters.push_back(info);
+ }
+
+ list<log_iter_info>::iterator list_iter;
+ while (!liters.empty()) {
+ list_iter = liters.begin();
+
+ while (list_iter != liters.end()) {
+ log_iter_info& cur_info = *list_iter;
+
+ list<string>::iterator& cur = cur_info.cur;
+ list<string>::iterator& end = cur_info.end;
+
+ map<string, bufferlist> entries;
+#define MAX_OMAP_SET_ENTRIES 100
+ for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
+ ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
+ entries[*cur] = bufferlist();
+ }
+
+ int ret = orphan_store.store_entries(cur_info.oid, entries);
+ if (ret < 0) {
+ return ret;
+ }
+ list<log_iter_info>::iterator tmp = list_iter;
+ ++list_iter;
+ if (cur == end) {
+ liters.erase(tmp);
+ }
+ }
+ }
+ return 0;
+}
+
+int RGWOrphanSearch::build_all_oids_index()
+{
+ librados::IoCtx ioctx;
+
+ int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ioctx.set_namespace(librados::all_nspaces);
+ librados::NObjectIterator i = ioctx.nobjects_begin();
+ librados::NObjectIterator i_end = ioctx.nobjects_end();
+
+ map<int, list<string> > oids;
+
+ int count = 0;
+ uint64_t total = 0;
+
+ cout << "logging all objects in the pool" << std::endl;
+
+ for (; i != i_end; ++i) {
+ string nspace = i->get_nspace();
+ string oid = i->get_oid();
+ string locator = i->get_locator();
+
+ ssize_t pos = oid.find('_');
+ if (pos < 0) {
+ cout << "unidentified oid: " << oid << ", skipping" << std::endl;
+ /* what is this object, oids should be in the format of <bucket marker>_<obj>,
+ * skip this entry
+ */
+ continue;
+ }
+ string stripped_oid = oid.substr(pos + 1);
+ rgw_obj_key key;
+ if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
+ cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
+ continue;
+ }
+
+ if (key.ns.empty()) {
+ /* skipping head objects, we don't want to remove these as they are mutable and
+ * cleaning them up is racy (can race with object removal and a later recreation)
+ */
+ cout << "skipping head object: oid=" << oid << std::endl;
+ continue;
+ }
+
+ string oid_fp = obj_fingerprint(oid);
+
+ ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
+
+ int shard = orphan_shard(oid_fp);
+ oids[shard].push_back(oid);
+
+#define COUNT_BEFORE_FLUSH 1000
+ ++total;
+ if (++count >= COUNT_BEFORE_FLUSH) {
+ ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
+ ret = log_oids(all_objs_index, oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+ count = 0;
+ oids.clear();
+ }
+ }
+ ret = log_oids(all_objs_index, oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::build_buckets_instance_index()
+{
+ void *handle;
+ int max = 1000;
+ string section = "bucket.instance";
+ int ret = store->meta_mgr->list_keys_init(section, &handle);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ map<int, list<string> > instances;
+
+ bool truncated;
+
+ RGWObjectCtx obj_ctx(store);
+
+ int count = 0;
+ uint64_t total = 0;
+
+ do {
+ list<string> keys;
+ ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
+ ++total;
+ ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
+ int shard = orphan_shard(*iter);
+ instances[shard].push_back(*iter);
+
+ if (++count >= COUNT_BEFORE_FLUSH) {
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ count = 0;
+ instances.clear();
+ }
+ }
+
+ } while (truncated);
+
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ store->meta_mgr->list_keys_complete(handle);
+
+ return 0;
+}
+
+int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
+{
+ set<string> obj_oids;
+ rgw_bucket& bucket = result.obj.bucket;
+ if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */
+ const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
+ obj_oids.insert(obj_fingerprint(loc));
+
+ /*
+ * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
+ * meta object, just add a "shadow" object to the mix
+ */
+ obj_oids.insert(obj_fingerprint(loc, "shadow"));
+ } else {
+ RGWObjManifest& manifest = result.manifest;
+
+ if (!detailed_mode &&
+ manifest.get_obj_size() <= manifest.get_head_size()) {
+ ldout(store->ctx(), 5) << "skipping object as it fits in a head" << dendl;
+ return 0;
+ }
+
+ RGWObjManifest::obj_iterator miter;
+ for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
+ const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
+ string s = loc.oid;
+ obj_oids.insert(obj_fingerprint(s));
+ }
+ }
+
+ for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
+ ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
+
+ int shard = orphan_shard(*iter);
+ oids[shard].push_back(*iter);
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
+{
+ RGWRados::Object::Stat& front_op = ops.front();
+
+ int ret = front_op.wait();
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+ goto done;
+ }
+ ret = handle_stat_result(oids, front_op.result);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+done:
+ ops.pop_front();
+ return ret;
+}
+
+int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
+{
+ RGWObjectCtx obj_ctx(store);
+ auto sysobj_ctx = store->svc.sysobj->init_obj_ctx();
+
+ rgw_bucket orphan_bucket;
+ int shard_id;
+ int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id,
+ &orphan_bucket, &shard_id);
+ if (ret < 0) {
+ ldout(store->ctx(),0) << __func__ << " failed to parse bucket instance: "
+ << bucket_instance_id << " skipping" << dendl;
+ return ret;
+ }
+
+ RGWBucketInfo cur_bucket_info;
+ ret = store->get_bucket_info(sysobj_ctx, orphan_bucket.tenant,
+ orphan_bucket.name, cur_bucket_info, nullptr);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ /* probably raced with bucket removal */
+ return 0;
+ }
+ lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (cur_bucket_info.bucket.bucket_id != orphan_bucket.bucket_id) {
+ ldout(store->ctx(), 0) << __func__ << ": Skipping stale bucket instance: "
+ << orphan_bucket.name << ": "
+ << orphan_bucket.bucket_id << dendl;
+ return 0;
+ }
+
+ if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS) {
+ ldout(store->ctx(), 0) << __func__ << ": reshard in progress. Skipping "
+ << orphan_bucket.name << ": "
+ << orphan_bucket.bucket_id << dendl;
+ return 0;
+ }
+
+ RGWBucketInfo bucket_info;
+ ret = store->get_bucket_instance_info(sysobj_ctx, bucket_instance_id, bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ /* probably raced with bucket removal */
+ return 0;
+ }
+ lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
+ RGWRados::Bucket target(store, bucket_info);
+ RGWRados::Bucket::List list_op(&target);
+
+ string marker;
+ list_op.params.marker = rgw_obj_key(marker);
+ list_op.params.list_versions = true;
+ list_op.params.enforce_ns = false;
+
+ bool truncated;
+
+ deque<RGWRados::Object::Stat> stat_ops;
+
+ do {
+ vector<rgw_bucket_dir_entry> result;
+
+ ret = list_op.list_objects(max_list_bucket_entries,
+ &result, nullptr, &truncated);
+ if (ret < 0) {
+ cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+
+ for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
+ rgw_bucket_dir_entry& entry = *iter;
+ if (entry.key.instance.empty()) {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+ } else {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
+ }
+
+ ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
+
+ if (!detailed_mode &&
+ entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) {
+ ldout(store->ctx(),5) << __func__ << "skipping stat as the object " << entry.key.name
+ << "fits in a head" << dendl;
+ continue;
+ }
+
+ rgw_obj obj(bucket_info.bucket, entry.key);
+
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+
+ stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+ RGWRados::Object::Stat& op = stat_ops.back();
+
+
+ ret = op.stat_async();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ if (stat_ops.size() >= max_concurrent_ios) {
+ ret = pop_and_handle_stat_op(oids, stat_ops);
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+ }
+ }
+ if (oids.size() >= COUNT_BEFORE_FLUSH) {
+ ret = log_oids(linked_objs_index, oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+ oids.clear();
+ }
+ }
+ } while (truncated);
+
+ while (!stat_ops.empty()) {
+ ret = pop_and_handle_stat_op(oids, stat_ops);
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
+ }
+ }
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::build_linked_oids_index()
+{
+ map<int, list<string> > oids;
+ map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
+ for (; iter != buckets_instance_index.end(); ++iter) {
+ ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
+ bool truncated;
+
+ string oid = iter->second;
+
+ do {
+ map<string, bufferlist> entries;
+ int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
+ if (ret == -ENOENT) {
+ truncated = false;
+ ret = 0;
+ }
+
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (entries.empty()) {
+ break;
+ }
+
+ for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
+ ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
+ ret = build_linked_oids_for_bucket(eiter->first, oids);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
+ << " returned ret=" << ret << dendl;
+ return ret;
+ }
+ }
+
+ search_stage.shard = iter->first;
+ search_stage.marker = entries.rbegin()->first; /* last entry */
+ } while (truncated);
+
+ search_stage.marker.clear();
+ }
+
+ int ret = log_oids(linked_objs_index, oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
+ }
+
+ ret = save_state();
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
+ return ret;
+ }
+
+ return 0;
+}
+
+class OMAPReader {
+ librados::IoCtx ioctx;
+ string oid;
+
+ map<string, bufferlist> entries;
+ map<string, bufferlist>::iterator iter;
+ string marker;
+ bool truncated;
+
+public:
+ OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
+ iter = entries.end();
+ }
+
+ int get_next(string *key, bufferlist *pbl, bool *done);
+};
+
+int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
+{
+ if (iter != entries.end()) {
+ *key = iter->first;
+ if (pbl) {
+ *pbl = iter->second;
+ }
+ ++iter;
+ *done = false;
+ marker = *key;
+ return 0;
+ }
+
+ if (!truncated) {
+ *done = true;
+ return 0;
+ }
+
+#define MAX_OMAP_GET_ENTRIES 100
+ int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ *done = true;
+ return 0;
+ }
+ return ret;
+ }
+
+ truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
+ iter = entries.begin();
+ return get_next(key, pbl, done);
+}
+
+int RGWOrphanSearch::compare_oid_indexes()
+{
+ ceph_assert(linked_objs_index.size() == all_objs_index.size());
+
+ librados::IoCtx& ioctx = orphan_store.get_ioctx();
+
+ librados::IoCtx data_ioctx;
+
+ int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
+
+ map<int, string>::iterator liter = linked_objs_index.begin();
+ map<int, string>::iterator aiter = all_objs_index.begin();
+
+ for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
+ OMAPReader linked_entries(ioctx, liter->second);
+ OMAPReader all_entries(ioctx, aiter->second);
+
+ bool done;
+
+ string cur_linked;
+ bool linked_done = false;
+
+
+ do {
+ string key;
+ int r = all_entries.get_next(&key, NULL, &done);
+ if (r < 0) {
+ return r;
+ }
+ if (done) {
+ break;
+ }
+
+ string key_fp = obj_fingerprint(key);
+
+ while (cur_linked < key_fp && !linked_done) {
+ r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ if (cur_linked == key_fp) {
+ ldout(store->ctx(), 20) << "linked: " << key << dendl;
+ continue;
+ }
+
+ time_t mtime;
+ r = data_ioctx.stat(key, NULL, &mtime);
+ if (r < 0) {
+ if (r != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
+ }
+ continue;
+ }
+ if (stale_secs && (uint64_t)mtime >= time_threshold) {
+ ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
+ continue;
+ }
+ ldout(store->ctx(), 20) << "leaked: " << key << dendl;
+ cout << "leaked: " << key << std::endl;
+ } while (!done);
+ }
+
+ return 0;
+}
+
+int RGWOrphanSearch::run()
+{
+ int r;
+
+ switch (search_stage.stage) {
+
+ case ORPHAN_SEARCH_STAGE_INIT:
+ ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+ case ORPHAN_SEARCH_STAGE_LSPOOL:
+ ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
+ r = build_all_oids_index();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
+ return r;
+ }
+
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+
+ case ORPHAN_SEARCH_STAGE_LSBUCKETS:
+ ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
+ r = build_buckets_instance_index();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
+ return r;
+ }
+
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+
+
+ case ORPHAN_SEARCH_STAGE_ITERATE_BI:
+ ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
+ r = build_linked_oids_index();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
+ return r;
+ }
+
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+
+ case ORPHAN_SEARCH_STAGE_COMPARE:
+ r = compare_oid_indexes();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
+ return r;
+ }
+
+ break;
+
+ default:
+ ceph_abort();
+ };
+
+ return 0;
+}
+
+
+int RGWOrphanSearch::remove_index(map<int, string>& index)
+{
+ librados::IoCtx& ioctx = orphan_store.get_ioctx();
+
+ for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
+ int r = ioctx.remove(iter->second);
+ if (r < 0) {
+ if (r != -ENOENT) {
+ ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
+ }
+ }
+ }
+ return 0;
+}
+
+int RGWOrphanSearch::finish()
+{
+ int r = remove_index(all_objs_index);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
+ }
+ r = remove_index(buckets_instance_index);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
+ }
+ r = remove_index(linked_objs_index);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
+ }
+
+ r = orphan_store.remove_job(search_info.job_name);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
+ }
+
+ return r;
+}
+
+
+int RGWRadosList::handle_stat_result(RGWRados::Object::Stat::Result& result,
+ std::set<string>& obj_oids)
+{
+ obj_oids.clear();
+
+ rgw_bucket& bucket = result.obj.bucket;
+
+ ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+ " bucket=" << bucket << ", has_manifest=" << result.has_manifest <<
+ dendl;
+
+ // iterator to store result of dlo/slo attribute find
+ decltype(result.attrs)::iterator attr_it = result.attrs.end();
+ const std::string oid = bucket.marker + "_" + result.obj.get_oid();
+ ldout(store->ctx(), 20) << "radoslist processing object=\"" <<
+ oid << "\"" << dendl;
+ if (visited_oids.find(oid) != visited_oids.end()) {
+ // apparently we hit a loop; don't continue with this oid
+ ldout(store->ctx(), 15) <<
+ "radoslist stopped loop at already visited object=\"" <<
+ oid << "\"" << dendl;
+ return 0;
+ }
+
+ if (!result.has_manifest) {
+ /* a very very old object, or part of a multipart upload during upload */
+ obj_oids.insert(oid);
+
+ /*
+ * multipart parts don't have manifest on them, it's in the meta
+ * object; we'll process them in
+ * RGWRadosList::do_incomplete_multipart
+ */
+ } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) !=
+ result.attrs.end()) {
+ // *** handle DLO object ***
+
+ obj_oids.insert(oid);
+ visited_oids.insert(oid); // prevent dlo loops
+ ldout(store->ctx(), 15) << "radoslist added to visited list DLO=\"" <<
+ oid << "\"" << dendl;
+
+ char* prefix_path_c = attr_it->second.c_str();
+ const std::string& prefix_path = prefix_path_c;
+
+ const size_t sep_pos = prefix_path.find('/');
+ if (string::npos == sep_pos) {
+ return -EINVAL;
+ }
+
+ const std::string bucket_name = prefix_path.substr(0, sep_pos);
+ const std::string prefix = prefix_path.substr(sep_pos + 1);
+
+ add_bucket_prefix(bucket_name, prefix);
+ ldout(store->ctx(), 25) << "radoslist DLO oid=\"" << oid <<
+ "\" added bucket=\"" << bucket_name << "\" prefix=\"" <<
+ prefix << "\" to process list" << dendl;
+ } else if ((attr_it = result.attrs.find(RGW_ATTR_SLO_MANIFEST)) !=
+ result.attrs.end()) {
+ // *** handle SLO object ***
+
+ obj_oids.insert(oid);
+ visited_oids.insert(oid); // prevent slo loops
+ ldout(store->ctx(), 15) << "radoslist added to visited list SLO=\"" <<
+ oid << "\"" << dendl;
+
+ RGWSLOInfo slo_info;
+ bufferlist::const_iterator bliter = attr_it->second.begin();
+ try {
+ ::decode(slo_info, bliter);
+ } catch (buffer::error& err) {
+ ldout(store->ctx(), 0) <<
+ "ERROR: failed to decode slo manifest for " << oid << dendl;
+ return -EIO;
+ }
+
+ for (const auto& iter : slo_info.entries) {
+ const string& path_str = iter.path;
+
+ const size_t sep_pos = path_str.find('/', 1 /* skip initial slash */);
+ if (string::npos == sep_pos) {
+ return -EINVAL;
+ }
+
+ std::string bucket_name;
+ std::string obj_name;
+
+ bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
+ obj_name = url_decode(path_str.substr(sep_pos + 1));
+
+ const rgw_obj_key obj_key(obj_name);
+ add_bucket_filter(bucket_name, obj_key);
+ ldout(store->ctx(), 25) << "radoslist SLO oid=\"" << oid <<
+ "\" added bucket=\"" << bucket_name << "\" obj_key=\"" <<
+ obj_key << "\" to process list" << dendl;
+ }
+ } else {
+ RGWObjManifest& manifest = result.manifest;
+
+ // in multipart, the head object contains no data and just has the
+ // manifest AND empty objects have no manifest, but they're
+ // realized as empty rados objects
+ if (0 == manifest.get_max_head_size() ||
+ manifest.obj_begin() == manifest.obj_end()) {
+ obj_oids.insert(oid);
+ // first_insert = true;
+ }
+
+ RGWObjManifest::obj_iterator miter;
+ for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
+ const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
+ string s = loc.oid;
+ obj_oids.insert(s);
+ }
+ }
+
+ return 0;
+} // RGWRadosList::handle_stat_result
+
+int RGWRadosList::pop_and_handle_stat_op(
+ RGWObjectCtx& obj_ctx,
+ std::deque<RGWRados::Object::Stat>& ops)
+{
+ std::set<string> obj_oids;
+ RGWRados::Object::Stat& front_op = ops.front();
+
+ int ret = front_op.wait();
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
+ cpp_strerror(-ret) << dendl;
+ }
+ goto done;
+ }
+
+ ret = handle_stat_result(front_op.result, obj_oids);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: handle_stat_result() returned error: " <<
+ cpp_strerror(-ret) << dendl;
+ }
+
+ // output results
+ for (const auto& o : obj_oids) {
+ std::cout << o << std::endl;
+ }
+
+done:
+
+ // invalidate object context for this object to avoid memory leak
+ // (see pr https://github.com/ceph/ceph/pull/30174)
+ obj_ctx.invalidate(front_op.result.obj);
+
+ ops.pop_front();
+ return ret;
+}
+
+
+#if 0 // code that may be the basis for expansion
+int RGWRadosList::build_buckets_instance_index()
+{
+ void *handle;
+ int max = 1000;
+ string section = "bucket.instance";
+ int ret = store->meta_mgr->list_keys_init(section, &handle);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ map<int, list<string> > instances;
+
+ bool truncated;
+
+ RGWObjectCtx obj_ctx(store);
+
+ int count = 0;
+ uint64_t total = 0;
+
+ do {
+ list<string> keys;
+ ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
+ ++total;
+ ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
+ int shard = orphan_shard(*iter);
+ instances[shard].push_back(*iter);
+
+ if (++count >= COUNT_BEFORE_FLUSH) {
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ count = 0;
+ instances.clear();
+ }
+ }
+ } while (truncated);
+
+ ret = log_oids(buckets_instance_index, instances);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
+ return ret;
+ }
+ store->meta_mgr->list_keys_complete(handle);
+
+ return 0;
+}
+#endif
+
+
+int RGWRadosList::process_bucket(
+ const std::string& bucket_instance_id,
+ const std::string& prefix,
+ const std::set<rgw_obj_key>& entries_filter)
+{
+ ldout(store->ctx(), 10) << "RGWRadosList::" << __func__ <<
+ " bucket_instance_id=" << bucket_instance_id <<
+ ", prefix=" << prefix <<
+ ", entries_filter.size=" << entries_filter.size() << dendl;
+
+ RGWBucketInfo bucket_info;
+ RGWSysObjectCtx sys_obj_ctx = store->svc.sysobj->init_obj_ctx();
+ int ret = store->get_bucket_instance_info(sys_obj_ctx, bucket_instance_id,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ if (ret == -ENOENT) {
+ // probably raced with bucket removal
+ return 0;
+ }
+ lderr(store->ctx()) << __func__ <<
+ ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" <<
+ ret << dendl;
+ return ret;
+ }
+
+ RGWRados::Bucket target(store, bucket_info);
+ RGWRados::Bucket::List list_op(&target);
+
+ std::string marker;
+ list_op.params.marker = rgw_obj_key(marker);
+ list_op.params.list_versions = true;
+ list_op.params.enforce_ns = false;
+ list_op.params.allow_unordered = false;
+ list_op.params.prefix = prefix;
+
+ bool truncated;
+
+ std::deque<RGWRados::Object::Stat> stat_ops;
+ std::string prev_versioned_key_name = "";
+
+ RGWObjectCtx obj_ctx(store);
+
+ do {
+ std::vector<rgw_bucket_dir_entry> result;
+
+ constexpr int64_t LIST_OBJS_MAX_ENTRIES = 100;
+ ret = list_op.list_objects(LIST_OBJS_MAX_ENTRIES, &result,
+ NULL, &truncated);
+ if (ret == -ENOENT) {
+ // race with bucket delete?
+ ret = 0;
+ break;
+ } else if (ret < 0) {
+ std::cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) <<
+ std::endl;
+ return ret;
+ }
+
+ for (std::vector<rgw_bucket_dir_entry>::iterator iter = result.begin();
+ iter != result.end();
+ ++iter) {
+ rgw_bucket_dir_entry& entry = *iter;
+
+ if (entry.key.instance.empty()) {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
+ } else {
+ ldout(store->ctx(), 20) << "obj entry: " << entry.key.name <<
+ " [" << entry.key.instance << "]" << dendl;
+ }
+
+ ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" <<
+ entry.key.name << " entry.key.instance=" << entry.key.instance <<
+ dendl;
+
+ // ignore entries that are not in the filter if there is a filter
+ if (!entries_filter.empty() &&
+ entries_filter.find(entry.key) == entries_filter.cend()) {
+ continue;
+ }
+
+ // we need to do this in two cases below, so use a lambda
+ auto do_stat_key =
+ [&](const rgw_obj_key& key) -> int {
+ int ret;
+
+ rgw_obj obj(bucket_info.bucket, key);
+
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
+
+ stat_ops.push_back(RGWRados::Object::Stat(&op_target));
+ RGWRados::Object::Stat& op = stat_ops.back();
+
+ ret = op.stat_async();
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ if (stat_ops.size() >= max_concurrent_ios) {
+ ret = pop_and_handle_stat_op(obj_ctx, stat_ops);
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) <<
+ "ERROR: pop_and_handle_stat_op() returned error: " <<
+ cpp_strerror(-ret) << dendl;
+ }
+
+ // clear error, so we'll continue processing directory
+ ret = 0;
+ }
+ }
+
+ return ret;
+ }; // do_stat_key lambda
+
+ // for versioned objects, make sure the head object is handled
+ // as well by ignoring the instance identifier
+ if (!entry.key.instance.empty() &&
+ entry.key.name != prev_versioned_key_name) {
+ // don't do the same key twice; even though out bucket index
+ // listing allows unordered, since all versions of an object
+ // use the same bucket index key, they'll all end up together
+ // and sorted
+ prev_versioned_key_name = entry.key.name;
+
+ rgw_obj_key uninstanced(entry.key.name);
+
+ ret = do_stat_key(uninstanced);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ ret = do_stat_key(entry.key);
+ if (ret < 0) {
+ return ret;
+ }
+ } // for iter loop
+ } while (truncated);
+
+ while (!stat_ops.empty()) {
+ ret = pop_and_handle_stat_op(obj_ctx, stat_ops);
+ if (ret < 0) {
+ if (ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
+ cpp_strerror(-ret) << dendl;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+int RGWRadosList::run()
+{
+ int ret;
+ void* handle = nullptr;
+
+ ret = store->meta_mgr->list_keys_init("bucket", &handle);
+ if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ " ERROR: list_keys_init returned " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ const int max_keys = 1000;
+ bool truncated = true;
+
+ do {
+ std::list<std::string> buckets;
+ ret = store->meta_mgr->list_keys_next(handle, max_keys, buckets, &truncated);
+
+ for (std::string& bucket_id : buckets) {
+ ret = run(bucket_id);
+ if (ret == -ENOENT) {
+ continue;
+ } else if (ret < 0) {
+ return ret;
+ }
+ }
+ } while (truncated);
+
+ return 0;
+} // RGWRadosList::run()
+
+
+int RGWRadosList::run(const std::string& start_bucket_name)
+{
+ RGWSysObjectCtx sys_obj_ctx = store->svc.sysobj->init_obj_ctx();
+ RGWObjectCtx obj_ctx(store);
+ int ret;
+
+ add_bucket_entire(start_bucket_name);
+
+ while (! bucket_process_map.empty()) {
+ // pop item from map and capture its key data
+ auto front = bucket_process_map.begin();
+ std::string bucket_name = front->first;
+ process_t process;
+ std::swap(process, front->second);
+ bucket_process_map.erase(front);
+
+ RGWBucketInfo bucket_info;
+ ret = store->get_bucket_info(sys_obj_ctx,
+ tenant_name, bucket_name, bucket_info,
+ nullptr, nullptr);
+ if (ret == -ENOENT) {
+ std::cerr << "WARNING: bucket " << bucket_name <<
+ " does not exist; could it have been deleted very recently?" <<
+ std::endl;
+ continue;
+ } else if (ret < 0) {
+ std::cerr << "ERROR: could not get info for bucket " << bucket_name <<
+ " -- " << cpp_strerror(-ret) << std::endl;
+ return ret;
+ }
+
+ const std::string bucket_id = bucket_info.bucket.get_key();
+
+ static const std::set<rgw_obj_key> empty_filter;
+ static const std::string empty_prefix;
+
+ auto do_process_bucket =
+ [&bucket_id, this]
+ (const std::string& prefix,
+ const std::set<rgw_obj_key>& entries_filter) -> int {
+ int ret = process_bucket(bucket_id, prefix, entries_filter);
+ if (ret == -ENOENT) {
+ // bucket deletion race?
+ return 0;
+ } if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ ": ERROR: process_bucket(); bucket_id=" <<
+ bucket_id << " returned ret=" << ret << dendl;
+ }
+
+ return ret;
+ };
+
+ // either process the whole bucket *or* process the filters and/or
+ // the prefixes
+ if (process.entire_container) {
+ ret = do_process_bucket(empty_prefix, empty_filter);
+ if (ret < 0) {
+ return ret;
+ }
+ } else {
+ if (! process.filter_keys.empty()) {
+ ret = do_process_bucket(empty_prefix, process.filter_keys);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ for (const auto& p : process.prefixes) {
+ ret = do_process_bucket(p, empty_filter);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ }
+ } // while (! bucket_process_map.empty())
+
+ // now handle incomplete multipart uploads by going back to the
+ // initial bucket
+
+ RGWBucketInfo bucket_info;
+ ret = store->get_bucket_info(sys_obj_ctx,
+ tenant_name, start_bucket_name, bucket_info,
+ nullptr, nullptr);
+ if (ret == -ENOENT) {
+ // bucket deletion race?
+ return 0;
+ } else if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ ": ERROR: get_bucket_info returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ ret = do_incomplete_multipart(store, bucket_info);
+ if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ ": ERROR: do_incomplete_multipart returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+} // RGWRadosList::run(string)
+
+
+int RGWRadosList::do_incomplete_multipart(
+ RGWRados* store,
+ RGWBucketInfo& bucket_info)
+{
+ constexpr int max_uploads = 1000;
+ constexpr int max_parts = 1000;
+ static const std::string mp_ns = RGW_OBJ_NS_MULTIPART;
+ static MultipartMetaFilter mp_filter;
+
+ int ret;
+
+ RGWRados::Bucket target(store, bucket_info);
+ RGWRados::Bucket::List list_op(&target);
+ list_op.params.ns = mp_ns;
+ list_op.params.filter = &mp_filter;
+ // use empty string for initial list_op.params.marker
+ // use empty strings for list_op.params.{prefix,delim}
+
+ bool is_listing_truncated;
+
+ do {
+ std::vector<rgw_bucket_dir_entry> objs;
+ std::map<string, bool> common_prefixes;
+ ret = list_op.list_objects(max_uploads, &objs, &common_prefixes,
+ &is_listing_truncated);
+ if (ret == -ENOENT) {
+ // could bucket have been removed while this is running?
+ ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+ ": WARNING: call to list_objects of multipart namespace got ENOENT; "
+ "assuming bucket removal race" << dendl;
+ break;
+ } else if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ ": ERROR: list_objects op returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (!objs.empty()) {
+ std::vector<RGWMultipartUploadEntry> uploads;
+ RGWMultipartUploadEntry entry;
+ for (const rgw_bucket_dir_entry& obj : objs) {
+ const rgw_obj_key& key = obj.key;
+ if (!entry.mp.from_meta(key.name)) {
+ // we only want the meta objects, so skip all the components
+ continue;
+ }
+ entry.obj = obj;
+ uploads.push_back(entry);
+ ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
+ " processing incomplete multipart entry " <<
+ entry << dendl;
+ }
+
+ // now process the uploads vector
+ int parts_marker = 0;
+ bool is_parts_truncated = false;
+ do {
+ map<uint32_t, RGWUploadPartInfo> parts;
+
+ for (const auto& upload : uploads) {
+ const RGWMPObj& mp = upload.mp;
+ ret = list_multipart_parts(store, bucket_info, store->ctx(),
+ mp.get_upload_id(), mp.get_meta(),
+ max_parts,
+ parts_marker, parts, NULL, &is_parts_truncated);
+ if (ret == -ENOENT) {
+ continue;
+ } else if (ret < 0) {
+ lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
+ ": ERROR: list_multipart_parts returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ for (auto& p : parts) {
+ RGWObjManifest& manifest = p.second.manifest;
+ for (auto obj_it = manifest.obj_begin();
+ obj_it != manifest.obj_end();
+ ++obj_it) {
+ const rgw_raw_obj& loc = obj_it.get_location().get_raw_obj(store);
+ std::cout << loc.oid << std::endl;
+ }
+ }
+ }
+ } while (is_parts_truncated);
+ } // if objs not empty
+ } while (is_listing_truncated);
+
+ return 0;
+} // RGWRadosList::do_incomplete_multipart