diff options
Diffstat (limited to '')
-rw-r--r-- | src/rgw/rgw_multi.cc | 381 |
1 files changed, 381 insertions, 0 deletions
diff --git a/src/rgw/rgw_multi.cc b/src/rgw/rgw_multi.cc new file mode 100644 index 000000000..989d17a14 --- /dev/null +++ b/src/rgw/rgw_multi.cc @@ -0,0 +1,381 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include <string.h> + +#include <iostream> +#include <map> + +#include "include/types.h" + +#include "rgw_xml.h" +#include "rgw_multi.h" +#include "rgw_op.h" +#include "rgw_sal.h" +#include "rgw_sal_rados.h" + +#include "services/svc_sys_obj.h" +#include "services/svc_tier_rados.h" + +#define dout_subsys ceph_subsys_rgw + + + +bool RGWMultiPart::xml_end(const char *el) +{ + RGWMultiPartNumber *num_obj = static_cast<RGWMultiPartNumber *>(find_first("PartNumber")); + RGWMultiETag *etag_obj = static_cast<RGWMultiETag *>(find_first("ETag")); + + if (!num_obj || !etag_obj) + return false; + + string s = num_obj->get_data(); + if (s.empty()) + return false; + + num = atoi(s.c_str()); + + s = etag_obj->get_data(); + etag = s; + + return true; +} + +bool RGWMultiCompleteUpload::xml_end(const char *el) { + XMLObjIter iter = find("Part"); + RGWMultiPart *part = static_cast<RGWMultiPart *>(iter.get_next()); + while (part) { + int num = part->get_num(); + string etag = part->get_etag(); + parts[num] = etag; + part = static_cast<RGWMultiPart *>(iter.get_next()); + } + return true; +} + + +XMLObj *RGWMultiXMLParser::alloc_obj(const char *el) { + XMLObj *obj = NULL; + if (strcmp(el, "CompleteMultipartUpload") == 0 || + strcmp(el, "MultipartUpload") == 0) { + obj = new RGWMultiCompleteUpload(); + } else if (strcmp(el, "Part") == 0) { + obj = new RGWMultiPart(); + } else if (strcmp(el, "PartNumber") == 0) { + obj = new RGWMultiPartNumber(); + } else if (strcmp(el, "ETag") == 0) { + obj = new RGWMultiETag(); + } + + return obj; +} + +bool is_v2_upload_id(const string& upload_id) +{ + const char *uid = upload_id.c_str(); + + return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0) || + (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0); +} + +int list_multipart_parts(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info, + CephContext *cct, + const string& upload_id, + const string& meta_oid, int num_parts, + int marker, map<uint32_t, RGWUploadPartInfo>& parts, + int *next_marker, bool *truncated, + bool assume_unsorted) +{ + map<string, bufferlist> parts_map; + map<string, bufferlist>::iterator iter; + + rgw_obj obj; + obj.init_ns(bucket_info.bucket, meta_oid, RGW_OBJ_NS_MULTIPART); + obj.set_in_extra_data(true); + + rgw_raw_obj raw_obj; + store->getRados()->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj); + + bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted; + + parts.clear(); + + auto obj_ctx = store->svc()->sysobj->init_obj_ctx(); + auto sysobj = obj_ctx.get_obj(raw_obj); + int ret; + if (sorted_omap) { + string p; + p = "part."; + char buf[32]; + + snprintf(buf, sizeof(buf), "%08d", marker); + p.append(buf); + + ret = sysobj.omap().get_vals(dpp, p, num_parts + 1, &parts_map, + nullptr, null_yield); + } else { + ret = sysobj.omap().get_all(dpp, &parts_map, null_yield); + } + if (ret < 0) { + return ret; + } + + int i; + int last_num = 0; + + uint32_t expected_next = marker + 1; + + for (i = 0, iter = parts_map.begin(); + (i < num_parts || !sorted_omap) && iter != parts_map.end(); + ++iter, ++i) { + bufferlist& bl = iter->second; + auto bli = bl.cbegin(); + RGWUploadPartInfo info; + try { + decode(info, bli); + } catch (buffer::error& err) { + ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" << + dendl; + return -EIO; + } + if (sorted_omap) { + if (info.num != expected_next) { + /* ouch, we expected a specific part num here, but we got a + * different one. Either a part is missing, or it could be a + * case of mixed rgw versions working on the same upload, + * where one gateway doesn't support correctly sorted omap + * keys for multipart upload just assume data is unsorted. + */ + return list_multipart_parts(dpp, store, bucket_info, cct, upload_id, + meta_oid, num_parts, marker, parts, + next_marker, truncated, true); + } + expected_next++; + } + if (sorted_omap || + (int)info.num > marker) { + parts[info.num] = info; + last_num = info.num; + } + } + + if (sorted_omap) { + if (truncated) { + *truncated = (iter != parts_map.end()); + } + } else { + /* rebuild a map with only num_parts entries */ + map<uint32_t, RGWUploadPartInfo> new_parts; + map<uint32_t, RGWUploadPartInfo>::iterator piter; + for (i = 0, piter = parts.begin(); + i < num_parts && piter != parts.end(); + ++i, ++piter) { + new_parts[piter->first] = piter->second; + last_num = piter->first; + } + + if (truncated) { + *truncated = (piter != parts.end()); + } + + parts.swap(new_parts); + } + + if (next_marker) { + *next_marker = last_num; + } + + return 0; +} + +int list_multipart_parts(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore *store, struct req_state *s, + const string& upload_id, + const string& meta_oid, int num_parts, + int marker, map<uint32_t, RGWUploadPartInfo>& parts, + int *next_marker, bool *truncated, + bool assume_unsorted) +{ + return list_multipart_parts(dpp, store, s->bucket->get_info(), s->cct, upload_id, + meta_oid, num_parts, marker, parts, + next_marker, truncated, assume_unsorted); +} + +int abort_multipart_upload(const DoutPrefixProvider *dpp, + rgw::sal::RGWRadosStore *store, CephContext *cct, + RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info, + RGWMPObj& mp_obj) +{ + rgw_obj meta_obj; + meta_obj.init_ns(bucket_info.bucket, mp_obj.get_meta(), RGW_OBJ_NS_MULTIPART); + meta_obj.set_in_extra_data(true); + meta_obj.index_hash_source = mp_obj.get_key(); + cls_rgw_obj_chain chain; + list<rgw_obj_index_key> remove_objs; + map<uint32_t, RGWUploadPartInfo> obj_parts; + bool truncated; + int marker = 0; + int ret; + uint64_t parts_accounted_size = 0; + + do { + ret = list_multipart_parts(dpp, store, bucket_info, cct, + mp_obj.get_upload_id(), mp_obj.get_meta(), + 1000, marker, obj_parts, &marker, &truncated); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << ": list_multipart_parts returned " << + ret << dendl; + return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; + } + + for (auto obj_iter = obj_parts.begin(); + obj_iter != obj_parts.end(); + ++obj_iter) { + RGWUploadPartInfo& obj_part = obj_iter->second; + rgw_obj obj; + if (obj_part.manifest.empty()) { + string oid = mp_obj.get_part(obj_iter->second.num); + obj.init_ns(bucket_info.bucket, oid, RGW_OBJ_NS_MULTIPART); + obj.index_hash_source = mp_obj.get_key(); + ret = store->getRados()->delete_obj(dpp, *obj_ctx, bucket_info, obj, 0); + if (ret < 0 && ret != -ENOENT) + return ret; + } else { + store->getRados()->update_gc_chain(dpp, meta_obj, obj_part.manifest, &chain); + RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin(dpp); + if (oiter != obj_part.manifest.obj_end(dpp)) { + rgw_obj head; + rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store); + RGWSI_Tier_RADOS::raw_obj_to_obj(bucket_info.bucket, raw_head, &head); + + rgw_obj_index_key key; + head.key.get_index_key(&key); + remove_objs.push_back(key); + } + } + parts_accounted_size += obj_part.accounted_size; + } + } while (truncated); + + /* use upload id as tag and do it synchronously */ + auto [r, leftover_chain] = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id()); + if (r < 0 && leftover_chain) { + ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << r << dendl; + if (r == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + //Delete objects inline if send chain to gc fails + store->getRados()->delete_objs_inline(dpp, *leftover_chain, mp_obj.get_upload_id()); + } + + RGWRados::Object del_target(store->getRados(), bucket_info, *obj_ctx, meta_obj); + RGWRados::Object::Delete del_op(&del_target); + del_op.params.bucket_owner = bucket_info.owner; + del_op.params.versioning_status = 0; + if (!remove_objs.empty()) { + del_op.params.remove_objs = &remove_objs; + } + + del_op.params.abortmp = true; + del_op.params.parts_accounted_size = parts_accounted_size; + + // and also remove the metadata obj + ret = del_op.delete_obj(null_yield, dpp); + if (ret < 0) { + ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " << + ret << dendl; + } + return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret; +} + +int list_bucket_multiparts(const DoutPrefixProvider* dpp, + rgw::sal::RGWRadosStore* store, + RGWBucketInfo& bucket_info, + const std::string& prefix, + std::string& marker, // in/out + const std::string& delim, + const int& max_uploads, + std::vector<rgw_bucket_dir_entry>* objs, + std::map<std::string, bool>* common_prefixes, + bool* is_truncated) +{ + RGWRados::Bucket target(store->getRados(), bucket_info); + RGWRados::Bucket::List list_op(&target); + MultipartMetaFilter mp_filter; // filter out all but ".meta" entries + + list_op.params.prefix = prefix; + list_op.params.delim = delim; + list_op.params.marker = marker; + list_op.params.ns = RGW_OBJ_NS_MULTIPART; + list_op.params.filter = &mp_filter; + + int ret = list_op.list_objects(dpp, max_uploads, objs, common_prefixes, is_truncated, null_yield); + if (ret >= 0) { + marker = list_op.params.marker.name; + } + return ret; +} + +int abort_bucket_multiparts(const DoutPrefixProvider* dpp, + rgw::sal::RGWRadosStore* store, + CephContext* cct, + RGWBucketInfo& bucket_info) +{ + constexpr int max = 1000; + int ret, num_deleted = 0; + vector<rgw_bucket_dir_entry> objs; + RGWObjectCtx obj_ctx(store); + string marker; + bool is_truncated; + + const std::string empty_delim; + const std::string empty_prefix; + + do { + ret = list_bucket_multiparts(dpp, store, bucket_info, empty_prefix, marker, empty_delim, + max, &objs, nullptr, &is_truncated); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR : calling list_bucket_multiparts; ret=" << ret << + "; bucket=\"" << bucket_info.bucket << "\"" << dendl; + return ret; + } + ldpp_dout(dpp, 20) << __func__ << + " INFO: aborting and cleaning up multipart upload(s); bucket=\"" << + bucket_info.bucket << "\"; objs.size()=" << objs.size() << + "; is_truncated=" << is_truncated << dendl; + + if (!objs.empty()) { + RGWMPObj mp; + for (const auto& obj : objs) { + rgw_obj_key key(obj.key); + if (!mp.from_meta(key.name)) + continue; + ret = abort_multipart_upload(dpp, store, cct, &obj_ctx, bucket_info, mp); + if (ret < 0) { + // we're doing a best-effort; if something cannot be found, + // log it and keep moving forward + if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) { + ldpp_dout(dpp, 0) << __func__ << + " ERROR : failed to abort and clean-up multipart upload \"" << + key.get_oid() << "\"" << dendl; + return ret; + } else { + ldpp_dout(dpp, 10) << __func__ << + " NOTE : unable to find part(s) of " + "aborted multipart upload of \"" << key.get_oid() << + "\" for cleaning up" << dendl; + } + } + num_deleted++; + } + if (num_deleted) { + ldpp_dout(dpp, 0) << __func__ << + " WARNING : aborted " << num_deleted << + " incomplete multipart uploads" << dendl; + } + } + } while (is_truncated); + + return 0; +} |