// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "dbstore.h" using namespace std; namespace rgw { namespace store { map DB::objectmap = {}; map DB::getObjectMap() { return DB::objectmap; } int DB::Initialize(string logfile, int loglevel) { int ret = -1; const DoutPrefixProvider *dpp = get_def_dpp(); if (!cct) { cout << "Failed to Initialize. No ceph Context \n"; return -1; } if (loglevel > 0) { cct->_conf->subsys.set_log_level(ceph_subsys_rgw, loglevel); } if (!logfile.empty()) { cct->_log->set_log_file(logfile); cct->_log->reopen_log_file(); } db = openDB(dpp); if (!db) { ldpp_dout(dpp, 0) <<"Failed to open database " << dendl; return ret; } ret = InitializeDBOps(dpp); if (ret) { ldpp_dout(dpp, 0) <<"InitializeDBOps failed " << dendl; closeDB(dpp); db = NULL; return ret; } ldpp_dout(dpp, 0) << "DB successfully initialized - name:" \ << db_name << "" << dendl; return ret; } int DB::createGC(const DoutPrefixProvider *dpp) { int ret = 0; /* create gc thread */ gc_worker = std::make_unique(dpp, this); gc_worker->create("db_gc"); return ret; } int DB::stopGC() { if (gc_worker) { gc_worker->signal_stop(); gc_worker->join(); } return 0; } int DB::Destroy(const DoutPrefixProvider *dpp) { if (!db) return 0; stopGC(); closeDB(dpp); ldpp_dout(dpp, 20)<<"DB successfully destroyed - name:" \ < DB::getDBOp(const DoutPrefixProvider *dpp, std::string_view Op, const DBOpParams *params) { if (!Op.compare("InsertUser")) return dbops.InsertUser; if (!Op.compare("RemoveUser")) return dbops.RemoveUser; if (!Op.compare("GetUser")) return dbops.GetUser; if (!Op.compare("InsertBucket")) return dbops.InsertBucket; if (!Op.compare("UpdateBucket")) return dbops.UpdateBucket; if (!Op.compare("RemoveBucket")) return dbops.RemoveBucket; if (!Op.compare("GetBucket")) return dbops.GetBucket; if (!Op.compare("ListUserBuckets")) return dbops.ListUserBuckets; if (!Op.compare("InsertLCEntry")) return dbops.InsertLCEntry; if (!Op.compare("RemoveLCEntry")) return dbops.RemoveLCEntry; if (!Op.compare("GetLCEntry")) return dbops.GetLCEntry; if (!Op.compare("ListLCEntries")) return dbops.ListLCEntries; if (!Op.compare("InsertLCHead")) return dbops.InsertLCHead; if (!Op.compare("RemoveLCHead")) return dbops.RemoveLCHead; if (!Op.compare("GetLCHead")) return dbops.GetLCHead; /* Object Operations */ map::iterator iter; class ObjectOp* Ob; { const std::lock_guard lk(mtx); iter = DB::objectmap.find(params->op.bucket.info.bucket.name); } if (iter == DB::objectmap.end()) { ldpp_dout(dpp, 30)<<"No objectmap found for bucket: " \ <op.bucket.info.bucket.name << dendl; /* not found */ return nullptr; } Ob = iter->second; if (!Op.compare("PutObject")) return Ob->PutObject; if (!Op.compare("DeleteObject")) return Ob->DeleteObject; if (!Op.compare("GetObject")) return Ob->GetObject; if (!Op.compare("UpdateObject")) return Ob->UpdateObject; if (!Op.compare("ListBucketObjects")) return Ob->ListBucketObjects; if (!Op.compare("ListVersionedObjects")) return Ob->ListVersionedObjects; if (!Op.compare("PutObjectData")) return Ob->PutObjectData; if (!Op.compare("UpdateObjectData")) return Ob->UpdateObjectData; if (!Op.compare("GetObjectData")) return Ob->GetObjectData; if (!Op.compare("DeleteObjectData")) return Ob->DeleteObjectData; if (!Op.compare("DeleteStaleObjectData")) return Ob->DeleteStaleObjectData; return nullptr; } int DB::objectmapInsert(const DoutPrefixProvider *dpp, string bucket, class ObjectOp* ptr) { map::iterator iter; class ObjectOp *Ob; const std::lock_guard lk(mtx); iter = DB::objectmap.find(bucket); if (iter != DB::objectmap.end()) { // entry already exists // return success or replace it or // return error ? // // return success for now & delete the newly allocated ptr ldpp_dout(dpp, 30)<<"Objectmap entry already exists for bucket("\ <InitializeObjectOps(getDBname(), dpp); DB::objectmap.insert(pair(bucket, Ob)); return 0; } int DB::objectmapDelete(const DoutPrefixProvider *dpp, string bucket) { map::iterator iter; const std::lock_guard lk(mtx); iter = DB::objectmap.find(bucket); if (iter == DB::objectmap.end()) { // entry doesn't exist // return success or return error ? // return success for now ldpp_dout(dpp, 20)<<"Objectmap entry for bucket("<cct = cct; //reset params here params->user_table = user_table; params->bucket_table = bucket_table; params->quota_table = quota_table; params->lc_entry_table = lc_entry_table; params->lc_head_table = lc_head_table; ret = 0; out: return ret; } int DB::ProcessOp(const DoutPrefixProvider *dpp, std::string_view Op, DBOpParams *params) { int ret = -1; shared_ptr db_op; db_op = getDBOp(dpp, Op, params); if (!db_op) { ldpp_dout(dpp, 0)<<"No db_op found for Op("<Execute(dpp, params); if (ret) { ldpp_dout(dpp, 0)<<"In Process op Execute failed for fop(" << Op << ")" << dendl; } else { ldpp_dout(dpp, 20)<<"Successfully processed fop(" << Op << ")" << dendl; } return ret; } int DB::get_user(const DoutPrefixProvider *dpp, const std::string& query_str, const std::string& query_str_val, RGWUserInfo& uinfo, map *pattrs, RGWObjVersionTracker *pobjv_tracker) { int ret = 0; if (query_str.empty() || query_str_val.empty()) { ldpp_dout(dpp, 0)<<"In GetUser - Invalid query(" << query_str <<"), query_str_val(" << query_str_val <<")" << dendl; return -1; } DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.query_str = query_str; // validate query_str with UserTable entries names if (query_str == "username") { params.op.user.uinfo.display_name = query_str_val; } else if (query_str == "email") { params.op.user.uinfo.user_email = query_str_val; } else if (query_str == "access_key") { RGWAccessKey k(query_str_val, ""); map keys; keys[query_str_val] = k; params.op.user.uinfo.access_keys = keys; } else if (query_str == "user_id") { params.op.user.uinfo.user_id = uinfo.user_id; } else { ldpp_dout(dpp, 0)<<"In GetUser Invalid query string :" <read_version = params.op.user.user_version; } out: return ret; } int DB::store_user(const DoutPrefixProvider *dpp, RGWUserInfo& uinfo, bool exclusive, map *pattrs, RGWObjVersionTracker *pobjv, RGWUserInfo* pold_info) { DBOpParams params = {}; InitializeParams(dpp, ¶ms); int ret = 0; /* Check if the user already exists and return the old info, caller will have a use for it */ RGWUserInfo orig_info; RGWObjVersionTracker objv_tracker = {}; obj_version& obj_ver = objv_tracker.read_version; orig_info.user_id = uinfo.user_id; ret = get_user(dpp, string("user_id"), uinfo.user_id.id, orig_info, nullptr, &objv_tracker); if (!ret && obj_ver.ver) { /* already exists. */ if (pold_info) { *pold_info = orig_info; } if (pobjv && (pobjv->read_version.ver != obj_ver.ver)) { /* Object version mismatch.. return ECANCELED */ ret = -ECANCELED; ldpp_dout(dpp, 0)<<"User Read version mismatch err:(" <read_version = obj_ver; pobjv->write_version = obj_ver; } out: return ret; } int DB::remove_user(const DoutPrefixProvider *dpp, RGWUserInfo& uinfo, RGWObjVersionTracker *pobjv) { DBOpParams params = {}; InitializeParams(dpp, ¶ms); int ret = 0; RGWUserInfo orig_info; RGWObjVersionTracker objv_tracker = {}; orig_info.user_id = uinfo.user_id; ret = get_user(dpp, string("user_id"), uinfo.user_id.id, orig_info, nullptr, &objv_tracker); if (ret) { return ret; } if (!ret && objv_tracker.read_version.ver) { /* already exists. */ if (pobjv && (pobjv->read_version.ver != objv_tracker.read_version.ver)) { /* Object version mismatch.. return ECANCELED */ ret = -ECANCELED; ldpp_dout(dpp, 0)<<"User Read version mismatch err:(" <& attrs, RGWBucketInfo& info, obj_version *pobjv, obj_version *pep_objv, real_time creation_time, rgw_bucket *pmaster_bucket, uint32_t *pmaster_num_shards, optional_yield y, bool exclusive) { /* * XXX: Simple creation for now. * * Referring to RGWRados::create_bucket(), * Check if bucket already exists, select_bucket_placement, * is explicit put/remove instance info needed? - should not be ideally */ DBOpParams params = {}; InitializeParams(dpp, ¶ms); int ret = 0; /* Check if the bucket already exists and return the old info, caller will have a use for it */ RGWBucketInfo orig_info; orig_info.bucket.name = bucket.name; ret = get_bucket_info(dpp, string("name"), "", orig_info, nullptr, nullptr, nullptr); if (!ret && !orig_info.owner.id.empty() && exclusive) { /* already exists. Return the old info */ info = std::move(orig_info); return ret; } RGWObjVersionTracker& objv_tracker = info.objv_tracker; objv_tracker.read_version.clear(); if (pobjv) { objv_tracker.write_version = *pobjv; } else { objv_tracker.generate_new_write_ver(cct); } params.op.bucket.bucket_version = objv_tracker.write_version; objv_tracker.read_version = params.op.bucket.bucket_version; uint64_t bid = next_bucket_id(); string s = getDBname() + "." + std::to_string(bid); bucket.marker = bucket.bucket_id = s; info.bucket = bucket; info.owner = owner.user_id; info.zonegroup = zonegroup_id; info.placement_rule = placement_rule; info.swift_ver_location = swift_ver_location; info.swift_versioning = (!swift_ver_location.empty()); info.requester_pays = false; if (real_clock::is_zero(creation_time)) { info.creation_time = ceph::real_clock::now(); } else { info.creation_time = creation_time; } if (pquota_info) { info.quota = *pquota_info; } params.op.bucket.info = info; params.op.bucket.bucket_attrs = attrs; params.op.bucket.mtime = ceph::real_time(); params.op.user.uinfo.user_id.id = owner.user_id.id; ret = ProcessOp(dpp, "InsertBucket", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"create_bucket failed with err:(" <add(std::move(entry)); } if (query_str == "all") { // userID/OwnerID may have changed. Update it. user.id = params.op.bucket.info.owner.id; } out: return ret; } int DB::update_bucket(const DoutPrefixProvider *dpp, const std::string& query_str, RGWBucketInfo& info, bool exclusive, const rgw_user* powner_id, map* pattrs, ceph::real_time* pmtime, RGWObjVersionTracker* pobjv) { int ret = 0; DBOpParams params = {}; obj_version bucket_version; RGWBucketInfo orig_info; /* Check if the bucket already exists and return the old info, caller will have a use for it */ orig_info.bucket.name = info.bucket.name; params.op.bucket.info.bucket.name = info.bucket.name; ret = get_bucket_info(dpp, string("name"), "", orig_info, nullptr, nullptr, &bucket_version); if (ret) { ldpp_dout(dpp, 0)<<"Failed to read bucket info err:(" <read_version.ver != bucket_version.ver) { ldpp_dout(dpp, 0)<<"Read version mismatch err:(" <id; } else { params.op.user.uinfo.user_id.id = orig_info.owner.id; } /* Update version & mtime */ params.op.bucket.bucket_version.ver = ++(bucket_version.ver); if (pmtime) { params.op.bucket.mtime = *pmtime;; } else { params.op.bucket.mtime = ceph::real_time(); } if (query_str == "attrs") { params.op.query_str = "attrs"; params.op.bucket.bucket_attrs = *pattrs; } else if (query_str == "owner") { /* Update only owner i.e, chown. * Update creation_time too */ params.op.query_str = "owner"; params.op.bucket.info.creation_time = params.op.bucket.mtime; } else if (query_str == "info") { params.op.query_str = "info"; params.op.bucket.info = info; } else { ret = -1; ldpp_dout(dpp, 0)<<"In UpdateBucket Invalid query_str : " << query_str << dendl; goto out; } ret = ProcessOp(dpp, "UpdateBucket", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In UpdateBucket failed err:(" <read_version = params.op.bucket.bucket_version; pobjv->write_version = params.op.bucket.bucket_version; } out: return ret; } /** * Get ordered listing of the objects in a bucket. * * max_p: maximum number of results to return * bucket: bucket to list contents of * prefix: only return results that match this prefix * delim: do not include results that match this string. * Any skipped results will have the matching portion of their name * inserted in common_prefixes with a "true" mark. * marker: if filled in, begin the listing with this object. * end_marker: if filled in, end the listing with this object. * result: the objects are put in here. * common_prefixes: if delim is filled in, any matching prefixes are * placed here. * is_truncated: if number of objects in the bucket is bigger than * max, then truncated. */ int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max, vector *result, map *common_prefixes, bool *is_truncated) { int ret = 0; DB *store = target->get_store(); int64_t count = 0; std::string prev_obj; DBOpParams db_params = {}; store->InitializeParams(dpp, &db_params); db_params.op.bucket.info = target->get_bucket_info(); /* XXX: Handle whole marker? key -> name, instance, ns? */ db_params.op.obj.min_marker = params.marker.name; db_params.op.obj.max_marker = params.end_marker.name; db_params.op.obj.prefix = params.prefix + "%"; db_params.op.list_max_count = max + 1; /* +1 for next_marker */ ret = store->ProcessOp(dpp, "ListBucketObjects", &db_params); if (ret) { ldpp_dout(dpp, 0)<<"In ListBucketObjects failed err:(" <= max) { *is_truncated = true; next_marker.name = entry.key.name; next_marker.instance = entry.key.instance; break; } if (!params.delim.empty()) { const std::string& objname = entry.key.name; const int delim_pos = objname.find(params.delim, params.prefix.size()); if (delim_pos >= 0) { /* extract key -with trailing delimiter- for CommonPrefix */ const std::string& prefix_key = objname.substr(0, delim_pos + params.delim.length()); if (common_prefixes && common_prefixes->find(prefix_key) == common_prefixes->end()) { next_marker = prefix_key; (*common_prefixes)[prefix_key] = true; count++; } continue; } } if (!params.end_marker.name.empty() && params.end_marker.name.compare(entry.key.name) <= 0) { // should not include end_marker *is_truncated = false; break; } count++; result->push_back(std::move(entry)); } out: return ret; } int DB::raw_obj::InitializeParamsfromRawObj(const DoutPrefixProvider *dpp, DBOpParams* params) { int ret = 0; if (!params) return -1; params->op.bucket.info.bucket.name = bucket_name; params->op.obj.state.obj.key.name = obj_name; params->op.obj.state.obj.key.instance = obj_instance; params->op.obj.state.obj.key.ns = obj_ns; params->op.obj.obj_id = obj_id; if (multipart_part_str != "0.0") { params->op.obj.is_multipart = true; } else { params->op.obj.is_multipart = false; } params->op.obj_data.multipart_part_str = multipart_part_str; params->op.obj_data.part_num = part_num; return ret; } int DB::Object::InitializeParamsfromObject(const DoutPrefixProvider *dpp, DBOpParams* params) { int ret = 0; string bucket = bucket_info.bucket.name; if (!params) return -1; params->op.bucket.info.bucket.name = bucket; params->op.obj.state.obj = obj; params->op.obj.obj_id = obj_id; return ret; } int DB::Object::get_object_impl(const DoutPrefixProvider *dpp, DBOpParams& params) { int ret = 0; if (params.op.obj.state.obj.key.name.empty()) { /* Initialize */ store->InitializeParams(dpp, ¶ms); InitializeParamsfromObject(dpp, ¶ms); } ret = store->ProcessOp(dpp, "GetObject", ¶ms); /* pick one field check if object exists */ if (!ret && !params.op.obj.state.exists) { ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl; ret = -ENOENT; } return ret; } int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, bool must_exist) { int ret = 0; DBOpParams params = {}; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <ProcessOp(dpp, "UpdateObject", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <& keys, std::map* vals) { int ret = 0; DBOpParams params = {}; std::map omap; if (!vals) return -1; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <ProcessOp(dpp, "UpdateObject", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <& info) { int ret = 0; DBOpParams params = {}; std::map omap; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <set_instance(buf); } int DB::Object::obj_omap_get_all(const DoutPrefixProvider *dpp, std::map *m) { int ret = 0; DBOpParams params = {}; std::map omap; if (!m) return -1; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" < *m, bool* pmore) { int ret = 0; DBOpParams params = {}; std::map omap; map::iterator iter; uint64_t count = 0; if (!m) return -1; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl failed err:(" <first < marker) continue; if ((++count) > max_count) { *pmore = true; break; } (*m)[iter->first] = iter->second; } out: return ret; } int DB::Object::set_attrs(const DoutPrefixProvider *dpp, map& setattrs, map* rmattrs) { int ret = 0; DBOpParams params = {}; rgw::sal::Attrs *attrs; map::iterator iter; RGWObjState* state; store->InitializeParams(dpp, ¶ms); InitializeParamsfromObject(dpp, ¶ms); ret = get_state(dpp, &state, true); if (ret && !state->exists) { ldpp_dout(dpp, 0) <<"get_state failed err:(" <begin(); iter != rmattrs->end(); ++iter) { (*attrs).erase(iter->first); } } for (iter = setattrs.begin(); iter != setattrs.end(); ++iter) { (*attrs)[iter->first] = iter->second; } params.op.query_str = "attrs"; /* As per https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html, * the only way for users to modify object metadata is to make a copy of the object and * set the metadata. * Hence do not update mtime for any other attr changes */ ret = store->ProcessOp(dpp, "UpdateObject", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" < *attrset; store->InitializeParams(dpp, ¶ms); InitializeParamsfromObject(dpp, ¶ms); ret = store->ProcessOp(dpp, "GetObject", ¶ms); if (ret) { ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <ProcessOp(dpp, "UpdateObject", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <InitializeParams(dpp, ¶ms); InitializeParamsfromRawObj(dpp, ¶ms); ret = db->ProcessOp(dpp, "GetObjectData", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In GetObjectData failed err:(" <InitializeParams(dpp, ¶ms); InitializeParamsfromRawObj(dpp, ¶ms); /* XXX: Check for chunk_size ?? */ params.op.obj_data.offset = ofs; unsigned write_len = std::min((uint64_t)bl.length() - write_ofs, len); bl.begin(write_ofs).copy(write_len, params.op.obj_data.data); params.op.obj_data.size = params.op.obj_data.data.length(); params.op.obj.state.mtime = real_clock::now(); ret = db->ProcessOp(dpp, "PutObjectData", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In PutObjectData failed err:(" <& list_entries) { int ret = 0; store = get_store(); DBOpParams db_params = {}; store->InitializeParams(dpp, &db_params); InitializeParamsfromObject(dpp, &db_params); db_params.op.list_max_count = MAX_VERSIONED_OBJECTS; ret = store->ProcessOp(dpp, "ListVersionedObjects", &db_params); if (ret) { ldpp_dout(dpp, 0)<<"In ListVersionedObjects failed err:(" <InitializeParams(dpp, ¶ms); InitializeParamsfromObject(dpp, ¶ms); params.op.obj.state.obj.key = ent.key; ret = get_object_impl(dpp, params); if (ret) { ldpp_dout(dpp, 0) <<"get_object_impl of versioned object failed err:(" <shadow_obj to store ObjectID string */ s->shadow_obj = params.op.obj.obj_id; *state = &obj_state; **state = *s; out: return ret; } int DB::Object::get_state(const DoutPrefixProvider *dpp, RGWObjState** pstate, bool follow_olh) { return get_obj_state(dpp, bucket_info, obj, follow_olh, pstate); } int DB::Object::Read::get_attr(const DoutPrefixProvider *dpp, const char *name, bufferlist& dest) { RGWObjState* state; int r = source->get_state(dpp, &state, true); if (r < 0) return r; if (!state->exists) return -ENOENT; if (!state->get_attr(name, dest)) return -ENODATA; return 0; } int DB::Object::Read::prepare(const DoutPrefixProvider *dpp) { DB *store = source->get_store(); CephContext *cct = store->ctx(); bufferlist etag; map::iterator iter; RGWObjState* astate; int r = source->get_state(dpp, &astate, true); if (r < 0) return r; if (!astate->exists) { return -ENOENT; } state.obj = astate->obj; source->obj_id = astate->shadow_obj; if (params.target_obj) { *params.target_obj = state.obj; } if (params.attrs) { *params.attrs = astate->attrset; if (cct->_conf->subsys.should_gather()) { for (iter = params.attrs->begin(); iter != params.attrs->end(); ++iter) { ldpp_dout(dpp, 20) << "Read xattr rgw_rados: " << iter->first << dendl; } } } if (conds.if_match || conds.if_nomatch) { r = get_attr(dpp, RGW_ATTR_ETAG, etag); if (r < 0) return r; if (conds.if_match) { string if_match_str = rgw_string_unquote(conds.if_match); ldpp_dout(dpp, 10) << "ETag: " << string(etag.c_str(), etag.length()) << " " << " If-Match: " << if_match_str << dendl; if (if_match_str.compare(0, etag.length(), etag.c_str(), etag.length()) != 0) { return -ERR_PRECONDITION_FAILED; } } if (conds.if_nomatch) { string if_nomatch_str = rgw_string_unquote(conds.if_nomatch); ldpp_dout(dpp, 10) << "ETag: " << string(etag.c_str(), etag.length()) << " " << " If-NoMatch: " << if_nomatch_str << dendl; if (if_nomatch_str.compare(0, etag.length(), etag.c_str(), etag.length()) == 0) { return -ERR_NOT_MODIFIED; } } } if (params.obj_size) *params.obj_size = astate->size; if (params.lastmod) *params.lastmod = astate->mtime; return 0; } int DB::Object::Read::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end) { if (ofs < 0) { ofs += obj_size; if (ofs < 0) ofs = 0; end = obj_size - 1; } else if (end < 0) { end = obj_size - 1; } if (obj_size > 0) { if (ofs >= (off_t)obj_size) { return -ERANGE; } if (end >= (off_t)obj_size) { end = obj_size - 1; } } return 0; } int DB::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl, const DoutPrefixProvider *dpp) { DB *store = source->get_store(); uint64_t read_ofs = ofs; uint64_t len, read_len; bufferlist read_bl; uint64_t max_chunk_size = store->get_max_chunk_size(); RGWObjState* astate; int r = source->get_state(dpp, &astate, true); if (r < 0) return r; if (!astate->exists) { return -ENOENT; } if (astate->size == 0) { end = 0; } else if (end >= (int64_t)astate->size) { end = astate->size - 1; } if (end < 0) len = 0; else len = end - ofs + 1; if (len > max_chunk_size) { len = max_chunk_size; } int head_data_size = astate->data.length(); bool reading_from_head = (ofs < head_data_size); if (reading_from_head) { if (astate) { // && astate->prefetch_data)? if (!ofs && astate->data.length() >= len) { bl = astate->data; return bl.length(); } if (ofs < astate->data.length()) { unsigned copy_len = std::min((uint64_t)head_data_size - ofs, len); astate->data.begin(ofs).copy(copy_len, bl); return bl.length(); } } } /* tail object */ int part_num = (ofs / max_chunk_size); /* XXX: Handle multipart_str */ raw_obj read_obj(store, source->get_bucket_info().bucket.name, astate->obj.key.name, astate->obj.key.instance, astate->obj.key.ns, source->obj_id, "0.0", part_num); read_len = len; ldpp_dout(dpp, 20) << "dbstore->read obj-ofs=" << ofs << " read_ofs=" << read_ofs << " read_len=" << read_len << dendl; // read from non head object r = read_obj.read(dpp, read_ofs, read_len, bl); if (r < 0) { return r; } return bl.length(); } static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp, const DB::raw_obj& read_obj, off_t obj_ofs, off_t len, bool is_head_obj, RGWObjState* astate, void *arg) { struct db_get_obj_data* d = static_cast(arg); return d->store->get_obj_iterate_cb(dpp, read_obj, obj_ofs, len, is_head_obj, astate, arg); } int DB::get_obj_iterate_cb(const DoutPrefixProvider *dpp, const raw_obj& read_obj, off_t obj_ofs, off_t len, bool is_head_obj, RGWObjState* astate, void *arg) { struct db_get_obj_data* d = static_cast(arg); bufferlist bl; int r = 0; if (is_head_obj) { bl = astate->data; } else { // read from non head object raw_obj robj = read_obj; /* read entire data. So pass offset as '0' & len as '-1' */ r = robj.read(dpp, 0, -1, bl); if (r <= 0) { return r; } } unsigned read_ofs = 0, read_len = 0; while (read_ofs < bl.length()) { unsigned chunk_len = std::min((uint64_t)bl.length() - read_ofs, (uint64_t)len); r = d->client_cb->handle_data(bl, read_ofs, chunk_len); if (r < 0) return r; read_ofs += chunk_len; read_len += chunk_len; ldpp_dout(dpp, 20) << "dbstore->get_obj_iterate_cb obj-ofs=" << obj_ofs << " len=" << len << " chunk_len = " << chunk_len << " read_len = " << read_len << dendl; } d->offset += read_len; return read_len; } int DB::Object::Read::iterate(const DoutPrefixProvider *dpp, int64_t ofs, int64_t end, RGWGetDataCB *cb) { DB *store = source->get_store(); const uint64_t chunk_size = store->get_max_chunk_size(); db_get_obj_data data(store, cb, ofs); int r = source->iterate_obj(dpp, source->get_bucket_info(), state.obj, ofs, end, chunk_size, _get_obj_iterate_cb, &data); if (r < 0) { ldpp_dout(dpp, 0) << "iterate_obj() failed with " << r << dendl; return r; } return 0; } int DB::Object::iterate_obj(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, off_t ofs, off_t end, uint64_t max_chunk_size, iterate_obj_cb cb, void *arg) { DB *store = get_store(); uint64_t len; RGWObjState* astate; int r = get_state(dpp, &astate, true); if (r < 0) { return r; } if (!astate->exists) { return -ENOENT; } if (end < 0) len = 0; else len = end - ofs + 1; /* XXX: Will it really help to store all parts info in astate like manifest in Rados? */ int part_num = 0; int head_data_size = astate->data.length(); while (ofs <= end && (uint64_t)ofs < astate->size) { part_num = (ofs / max_chunk_size); uint64_t read_len = std::min(len, max_chunk_size); /* XXX: Handle multipart_str */ raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name, astate->obj.key.instance, astate->obj.key.ns, obj_id, "0.0", part_num); bool reading_from_head = (ofs < head_data_size); r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg); if (r <= 0) { return r; } /* r refers to chunk_len (no. of bytes) handled in cb */ len -= r; ofs += r; } return 0; } int DB::Object::Write::prepare(const DoutPrefixProvider* dpp) { DB *store = target->get_store(); int ret = -1; /* XXX: handle assume_noent */ obj_state.obj = target->obj; if (target->obj_id.empty()) { if (!target->obj.key.instance.empty() && (target->obj.key.instance != "null")) { /* versioned object. Set obj_id same as versionID/instance */ target->obj_id = target->obj.key.instance; } else { // generate obj_id char buf[33]; gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); target->obj_id = buf; } } ret = 0; return ret; } /* writes tail objects */ int DB::Object::Write::write_data(const DoutPrefixProvider* dpp, bufferlist& data, uint64_t ofs) { DB *store = target->get_store(); /* tail objects */ /* XXX: Split into parts each of max_chunk_size. But later make tail * object chunk size limit to sqlite blob limit */ int part_num = 0; uint64_t max_chunk_size = store->get_max_chunk_size(); /* tail_obj ofs should be greater than max_head_size */ if (mp_part_str == "0.0") { // ensure not multipart meta object if (ofs < store->get_max_head_size()) { return -1; } } uint64_t end = data.length(); uint64_t write_ofs = 0; /* as we are writing max_chunk_size at a time in sal_dbstore DBAtomicWriter::process(), * maybe this while loop is not needed */ while (write_ofs < end) { part_num = (ofs / max_chunk_size); uint64_t len = std::min(end, max_chunk_size); /* XXX: Handle multipart_str */ raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name, obj_state.obj.key.instance, obj_state.obj.key.ns, target->obj_id, mp_part_str, part_num); ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl; // write into non head object int r = write_obj.write(dpp, ofs, write_ofs, len, data); if (r < 0) { return r; } /* r refers to chunk_len (no. of bytes) handled in raw_obj::write */ len -= r; ofs += r; write_ofs += r; } return 0; } /* Write metadata & head object data */ int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size, map& attrs, bool assume_noent, bool modify_tail) { DB *store = target->get_store(); RGWObjState* state = &obj_state; map *attrset; DBOpParams params = {}; int ret = 0; string etag; string content_type; bufferlist acl_bl; string storage_class; map::iterator iter; store->InitializeParams(dpp, ¶ms); target->InitializeParamsfromObject(dpp, ¶ms); obj_state = params.op.obj.state; if (real_clock::is_zero(meta.set_mtime)) { meta.set_mtime = real_clock::now(); } attrset = &state->attrset; if (target->bucket_info.obj_lock_enabled() && target->bucket_info.obj_lock.has_rule()) { // && meta.flags == PUT_OBJ_CREATE) { auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); if (iter == attrs.end()) { real_time lock_until_date = target->bucket_info.obj_lock.get_lock_until_date(meta.set_mtime); string mode = target->bucket_info.obj_lock.get_mode(); RGWObjectRetention obj_retention(mode, lock_until_date); bufferlist bl; obj_retention.encode(bl); (*attrset)[RGW_ATTR_OBJECT_RETENTION] = bl; } } state->mtime = meta.set_mtime; if (meta.data) { /* if we want to overwrite the data, we also want to overwrite the xattrs, so just remove the object */ params.op.obj.head_data = *meta.data; } if (meta.rmattrs) { for (iter = meta.rmattrs->begin(); iter != meta.rmattrs->end(); ++iter) { const string& name = iter->first; (*attrset).erase(name.c_str()); } } if (meta.manifest) { storage_class = meta.manifest->get_tail_placement().placement_rule.storage_class; /* remove existing manifest attr */ iter = attrs.find(RGW_ATTR_MANIFEST); if (iter != attrs.end()) attrs.erase(iter); bufferlist bl; encode(*meta.manifest, bl); (*attrset)[RGW_ATTR_MANIFEST] = bl; } for (iter = attrs.begin(); iter != attrs.end(); ++iter) { const string& name = iter->first; bufferlist& bl = iter->second; if (!bl.length()) continue; (*attrset)[name.c_str()] = bl; if (name.compare(RGW_ATTR_ETAG) == 0) { etag = rgw_bl_str(bl); params.op.obj.etag = etag; } else if (name.compare(RGW_ATTR_CONTENT_TYPE) == 0) { content_type = rgw_bl_str(bl); } else if (name.compare(RGW_ATTR_ACL) == 0) { acl_bl = bl; } } if (!storage_class.empty()) { bufferlist bl; bl.append(storage_class); (*attrset)[RGW_ATTR_STORAGE_CLASS] = bl; } params.op.obj.state = *state ; params.op.obj.state.exists = true; params.op.obj.state.size = size; params.op.obj.state.accounted_size = accounted_size; params.op.obj.owner = target->get_bucket_info().owner.id; params.op.obj.category = meta.category; if (meta.mtime) { *meta.mtime = meta.set_mtime; } params.op.query_str = "meta"; params.op.obj.obj_id = target->obj_id; /* Check if versioned */ bool is_versioned = !target->obj.key.instance.empty() && (target->obj.key.instance != "null"); params.op.obj.is_versioned = is_versioned; if (is_versioned && (params.op.obj.category == RGWObjCategory::Main)) { /* versioned object */ params.op.obj.flags |= rgw_bucket_dir_entry::FLAG_VER; } ret = store->ProcessOp(dpp, "PutObject", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <list_versioned_objects(dpp, del_params.op.obj.list_entries); if (ret) { ldpp_dout(dpp, 0)<<"ListVersionedObjects failed err:(" <get_store(); ret = store->ProcessOp(dpp, "DeleteObject", &del_params); if (ret) { ldpp_dout(dpp, 0) << "In DeleteObject failed err:(" <ProcessOp(dpp, "UpdateObjectData", &update_params); if (ret) { ldpp_dout(dpp, 0) << "Updating tail objects mtime failed err:(" <get_store(); bool versioning_suspended = ((params.versioning_status & BUCKET_VERSIONS_SUSPENDED) == BUCKET_VERSIONS_SUSPENDED); int ret = -1; DBOpParams olh_params = {}; std::string version_id; DBOpParams next_params = del_params; version_id = del_params.op.obj.state.obj.key.instance; DBOpParams dm_params = del_params; // create delete marker store->InitializeParams(dpp, &dm_params); target->InitializeParamsfromObject(dpp, &dm_params); dm_params.op.obj.category = RGWObjCategory::None; if (versioning_suspended) { dm_params.op.obj.state.obj.key.instance = "null"; } else { store->gen_rand_obj_instance_name(&dm_params.op.obj.state.obj.key); dm_params.op.obj.obj_id = dm_params.op.obj.state.obj.key.instance; } dm_params.op.obj.flags |= (rgw_bucket_dir_entry::FLAG_DELETE_MARKER); ret = store->ProcessOp(dpp, "PutObject", &dm_params); if (ret) { ldpp_dout(dpp, 0) << "delete_olh: failed to create delete marker - err:(" <* entry) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_entry.index = oid; params.op.lc_entry.entry.set_bucket(marker); params.op.query_str = "get_entry"; ret = ProcessOp(dpp, "GetLCEntry", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In GetLCEntry failed err:(" <reset(e); } out: return ret; } int DB::get_next_entry(const std::string& oid, const std::string& marker, std::unique_ptr* entry) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_entry.index = oid; params.op.lc_entry.entry.set_bucket(marker); params.op.query_str = "get_next_entry"; ret = ProcessOp(dpp, "GetLCEntry", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In GetLCEntry failed err:(" <reset(e); } out: return ret; } int DB::set_entry(const std::string& oid, rgw::sal::Lifecycle::LCEntry& entry) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_entry.index = oid; params.op.lc_entry.entry = entry; ret = ProcessOp(dpp, "InsertLCEntry", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In InsertLCEntry failed err:(" <>& entries) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); entries.clear(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_entry.index = oid; params.op.lc_entry.min_marker = marker; params.op.list_max_count = max_entries; ret = ProcessOp(dpp, "ListLCEntries", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In ListLCEntries failed err:(" <(std::move(entry))); } out: return ret; } int DB::rm_entry(const std::string& oid, rgw::sal::Lifecycle::LCEntry& entry) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_entry.index = oid; params.op.lc_entry.entry = entry; ret = ProcessOp(dpp, "RemoveLCEntry", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In RemoveLCEntry failed err:(" <* head) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_head.index = oid; ret = ProcessOp(dpp, "GetLCHead", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In GetLCHead failed err:(" <(params.op.lc_head.head); out: return ret; } int DB::put_head(const std::string& oid, rgw::sal::Lifecycle::LCHead& head) { int ret = 0; const DoutPrefixProvider *dpp = get_def_dpp(); DBOpParams params = {}; InitializeParams(dpp, ¶ms); params.op.lc_head.index = oid; params.op.lc_head.head = head; ret = ProcessOp(dpp, "InsertLCHead", ¶ms); if (ret) { ldpp_dout(dpp, 0)<<"In InsertLCHead failed err:(" < lk(mtx); ldpp_dout(dpp, 2) << " DB GC started " << dendl; int max = 100; RGWUserBuckets buckets; bool is_truncated = false; do { std::string& marker = bucket_marker; rgw_user user; user.id = user_marker; buckets.clear(); is_truncated = false; int r = db->list_buckets(dpp, "all", user, marker, string(), max, false, &buckets, &is_truncated); if (r < 0) { //do nothing? retry later ? break; } for (const auto& ent : buckets.get_buckets()) { const std::string &bname = ent.first; r = db->delete_stale_objs(dpp, bname, gc_obj_min_wait); if (r < 0) { //do nothing? skip to next entry? ldpp_dout(dpp, 2) << " delete_stale_objs failed for bucket( " << bname <<")" << dendl; } bucket_marker = bname; user_marker = user.id; /* XXX: If using locks, unlock here and reacquire in the next iteration */ cv.wait_for(lk, std::chrono::milliseconds(100)); if (stop_signalled) { goto done; } } } while(is_truncated); bucket_marker.clear(); cv.wait_for(lk, std::chrono::milliseconds(gc_interval*10)); } while(! stop_signalled); done: return nullptr; } } } // namespace rgw::store