From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/crimson/osd/pg_backend.cc | 1171 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1171 insertions(+) create mode 100644 src/crimson/osd/pg_backend.cc (limited to 'src/crimson/osd/pg_backend.cc') diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc new file mode 100644 index 000000000..38dbdbf41 --- /dev/null +++ b/src/crimson/osd/pg_backend.cc @@ -0,0 +1,1171 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "pg_backend.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "messages/MOSDOp.h" +#include "os/Transaction.h" +#include "common/Checksummer.h" +#include "common/Clock.h" + +#include "crimson/common/exception.h" +#include "crimson/os/futurized_collection.h" +#include "crimson/os/futurized_store.h" +#include "crimson/osd/osd_operation.h" +#include "replicated_backend.h" +#include "replicated_recovery_backend.h" +#include "ec_backend.h" +#include "exceptions.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +using crimson::common::local_conf; + +std::unique_ptr +PGBackend::create(pg_t pgid, + const pg_shard_t pg_shard, + const pg_pool_t& pool, + crimson::os::CollectionRef coll, + crimson::osd::ShardServices& shard_services, + const ec_profile_t& ec_profile) +{ + switch (pool.type) { + case pg_pool_t::TYPE_REPLICATED: + return std::make_unique(pgid, pg_shard, + coll, shard_services); + case pg_pool_t::TYPE_ERASURE: + return std::make_unique(pg_shard.shard, coll, shard_services, + std::move(ec_profile), + pool.stripe_width); + default: + throw runtime_error(seastar::format("unsupported pool type '{}'", + pool.type)); + } +} + +PGBackend::PGBackend(shard_id_t shard, + CollectionRef coll, + crimson::os::FuturizedStore* store) + : shard{shard}, + coll{coll}, + store{store} +{} + +PGBackend::load_metadata_ertr::future +PGBackend::load_metadata(const hobject_t& oid) +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + + return store->get_attrs( + coll, + ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then( + [oid](auto &&attrs) -> load_metadata_ertr::future{ + loaded_object_md_t::ref ret(new loaded_object_md_t()); + if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) { + bufferlist bl; + bl.push_back(std::move(oiiter->second)); + ret->os = ObjectState( + object_info_t(bl), + true); + } else { + logger().error( + "load_metadata: object {} present but missing object info", + oid); + return crimson::ct_error::object_corrupted::make(); + } + + if (oid.is_head()) { + if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) { + bufferlist bl; + bl.push_back(std::move(ssiter->second)); + ret->ss = SnapSet(bl); + } else { + /* TODO: add support for writing out snapsets + logger().error( + "load_metadata: object {} present but missing snapset", + oid); + //return crimson::ct_error::object_corrupted::make(); + */ + ret->ss = SnapSet(); + } + } + + return load_metadata_ertr::make_ready_future( + std::move(ret)); + }, crimson::ct_error::enoent::handle([oid] { + logger().debug( + "load_metadata: object {} doesn't exist, returning empty metadata", + oid); + return load_metadata_ertr::make_ready_future( + new loaded_object_md_t{ + ObjectState( + object_info_t(oid), + false), + oid.is_head() ? std::optional(SnapSet()) : std::nullopt + }); + })); +} + +seastar::future +PGBackend::mutate_object( + std::set pg_shards, + crimson::osd::ObjectContextRef &&obc, + ceph::os::Transaction&& txn, + const osd_op_params_t& osd_op_p, + epoch_t min_epoch, + epoch_t map_epoch, + std::vector&& log_entries) +{ + logger().trace("mutate_object: num_ops={}", txn.get_num_ops()); + if (obc->obs.exists) { +#if 0 + obc->obs.oi.version = ctx->at_version; + obc->obs.oi.prior_version = ctx->obs->oi.version; +#endif + + auto& m = osd_op_p.req; + obc->obs.oi.prior_version = obc->obs.oi.version; + obc->obs.oi.version = osd_op_p.at_version; + if (osd_op_p.user_at_version > obc->obs.oi.user_version) + obc->obs.oi.user_version = osd_op_p.user_at_version; + obc->obs.oi.last_reqid = m->get_reqid(); + obc->obs.oi.mtime = m->get_mtime(); + obc->obs.oi.local_mtime = ceph_clock_now(); + + // object_info_t + { + ceph::bufferlist osv; + encode(obc->obs.oi, osv, CEPH_FEATURES_ALL); + // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv); + } + } else { + // reset cached ObjectState without enforcing eviction + obc->obs.oi = object_info_t(obc->obs.oi.soid); + } + return _submit_transaction( + std::move(pg_shards), obc->obs.oi.soid, std::move(txn), + std::move(osd_op_p), min_epoch, map_epoch, std::move(log_entries)); +} + +static inline bool _read_verify_data( + const object_info_t& oi, + const ceph::bufferlist& data) +{ + if (oi.is_data_digest() && oi.size == data.length()) { + // whole object? can we verify the checksum? + if (auto crc = data.crc32c(-1); crc != oi.data_digest) { + logger().error("full-object read crc {} != expected {} on {}", + crc, oi.data_digest, oi.soid); + // todo: mark soid missing, perform recovery, and retry + return false; + } + } + return true; +} + +PGBackend::read_errorator::future<> +PGBackend::read(const ObjectState& os, OSDOp& osd_op) +{ + const auto& oi = os.oi; + const ceph_osd_op& op = osd_op.op; + const uint64_t offset = op.extent.offset; + uint64_t length = op.extent.length; + logger().trace("read: {} {}~{}", oi.soid, offset, length); + + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: {} DNE", __func__, os.oi.soid); + return crimson::ct_error::enoent::make(); + } + // are we beyond truncate_size? + size_t size = oi.size; + if ((op.extent.truncate_seq > oi.truncate_seq) && + (op.extent.truncate_size < offset + length) && + (op.extent.truncate_size < size)) { + size = op.extent.truncate_size; + } + if (offset >= size) { + // read size was trimmed to zero and it is expected to do nothing, + return read_errorator::now(); + } + if (!length) { + // read the whole object if length is 0 + length = size; + } + return _read(oi.soid, offset, length, op.flags).safe_then( + [&oi, &osd_op](auto&& bl) -> read_errorator::future<> { + if (!_read_verify_data(oi, bl)) { + return crimson::ct_error::object_corrupted::make(); + } + logger().debug("read: data length: {}", bl.length()); + osd_op.rval = bl.length(); + osd_op.outdata = std::move(bl); + return read_errorator::now(); + }); +} + +PGBackend::read_errorator::future<> +PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op) +{ + const auto& op = osd_op.op; + logger().trace("sparse_read: {} {}~{}", + os.oi.soid, op.extent.offset, op.extent.length); + return store->fiemap(coll, ghobject_t{os.oi.soid}, + op.extent.offset, + op.extent.length).then([&os, &osd_op, this](auto&& m) { + return seastar::do_with(interval_set{std::move(m)}, + [&os, &osd_op, this](auto&& extents) { + return store->readv(coll, ghobject_t{os.oi.soid}, + extents, osd_op.op.flags).safe_then( + [&os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> { + if (_read_verify_data(os.oi, bl)) { + osd_op.op.extent.length = bl.length(); + // re-encode since it might be modified + ceph::encode(extents, osd_op.outdata); + encode_destructively(bl, osd_op.outdata); + logger().trace("sparse_read got {} bytes from object {}", + osd_op.op.extent.length, os.oi.soid); + return read_errorator::make_ready_future<>(); + } else { + // TODO: repair it if crc mismatches + return crimson::ct_error::object_corrupted::make(); + } + }); + }); + }); +} + +namespace { + + template + PGBackend::checksum_errorator::future<> + do_checksum(ceph::bufferlist& init_value_bl, + size_t chunk_size, + const ceph::bufferlist& buf, + ceph::bufferlist& result) + { + typename CSum::init_value_t init_value; + auto init_value_p = init_value_bl.cbegin(); + try { + decode(init_value, init_value_p); + // chop off the consumed part + init_value_bl.splice(0, init_value_p.get_off()); + } catch (const ceph::buffer::end_of_buffer&) { + logger().warn("{}: init value not provided", __func__); + return crimson::ct_error::invarg::make(); + } + const uint32_t chunk_count = buf.length() / chunk_size; + ceph::bufferptr csum_data{ + ceph::buffer::create(sizeof(typename CSum::value_t) * chunk_count)}; + Checksummer::calculate( + init_value, chunk_size, 0, buf.length(), buf, &csum_data); + encode(chunk_count, result); + result.append(std::move(csum_data)); + return PGBackend::checksum_errorator::now(); + } +} + +PGBackend::checksum_errorator::future<> +PGBackend::checksum(const ObjectState& os, OSDOp& osd_op) +{ + // sanity tests and normalize the argments + auto& checksum = osd_op.op.checksum; + if (checksum.offset == 0 && checksum.length == 0) { + // zeroed offset+length implies checksum whole object + checksum.length = os.oi.size; + } else if (checksum.offset >= os.oi.size) { + // read size was trimmed to zero, do nothing, + // see PGBackend::read() + return checksum_errorator::now(); + } + if (checksum.chunk_size > 0) { + if (checksum.length == 0) { + logger().warn("{}: length required when chunk size provided", __func__); + return crimson::ct_error::invarg::make(); + } + if (checksum.length % checksum.chunk_size != 0) { + logger().warn("{}: length not aligned to chunk size", __func__); + return crimson::ct_error::invarg::make(); + } + } else { + checksum.chunk_size = checksum.length; + } + if (checksum.length == 0) { + uint32_t count = 0; + encode(count, osd_op.outdata); + return checksum_errorator::now(); + } + + // read the chunk to be checksum'ed + return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags).safe_then( + [&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> { + auto& checksum = osd_op.op.checksum; + if (read_bl.length() != checksum.length) { + logger().warn("checksum: bytes read {} != {}", + read_bl.length(), checksum.length); + return crimson::ct_error::invarg::make(); + } + // calculate its checksum and put the result in outdata + switch (checksum.type) { + case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32: + return do_checksum(osd_op.indata, + checksum.chunk_size, + read_bl, + osd_op.outdata); + case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64: + return do_checksum(osd_op.indata, + checksum.chunk_size, + read_bl, + osd_op.outdata); + case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C: + return do_checksum(osd_op.indata, + checksum.chunk_size, + read_bl, + osd_op.outdata); + default: + logger().warn("checksum: unknown crc type ({})", + static_cast(checksum.type)); + return crimson::ct_error::invarg::make(); + } + }); +} + +PGBackend::cmp_ext_errorator::future<> +PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op) +{ + const ceph_osd_op& op = osd_op.op; + // return the index of the first unmatched byte in the payload, hence the + // strange limit and check + if (op.extent.length > MAX_ERRNO) { + return crimson::ct_error::invarg::make(); + } + uint64_t obj_size = os.oi.size; + if (os.oi.truncate_seq < op.extent.truncate_seq && + op.extent.offset + op.extent.length > op.extent.truncate_size) { + obj_size = op.extent.truncate_size; + } + uint64_t ext_len; + if (op.extent.offset >= obj_size) { + ext_len = 0; + } else if (op.extent.offset + op.extent.length > obj_size) { + ext_len = obj_size - op.extent.offset; + } else { + ext_len = op.extent.length; + } + auto read_ext = ll_read_errorator::make_ready_future(); + if (ext_len == 0) { + logger().debug("{}: zero length extent", __func__); + } else if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: {} DNE", __func__, os.oi.soid); + } else { + read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0); + } + return read_ext.safe_then([&osd_op](auto&& read_bl) { + int32_t retcode = 0; + for (unsigned index = 0; index < osd_op.indata.length(); index++) { + char byte_in_op = osd_op.indata[index]; + char byte_from_disk = (index < read_bl.length() ? read_bl[index] : 0); + if (byte_in_op != byte_from_disk) { + logger().debug("cmp_ext: mismatch at {}", index); + retcode = -MAX_ERRNO - index; + break; + } + } + logger().debug("cmp_ext: {}", retcode); + osd_op.rval = retcode; + }); +} + +PGBackend::stat_errorator::future<> PGBackend::stat( + const ObjectState& os, + OSDOp& osd_op) +{ + if (os.exists/* TODO: && !os.is_whiteout() */) { + logger().debug("stat os.oi.size={}, os.oi.mtime={}", os.oi.size, os.oi.mtime); + encode(os.oi.size, osd_op.outdata); + encode(os.oi.mtime, osd_op.outdata); + } else { + logger().debug("stat object does not exist"); + return crimson::ct_error::enoent::make(); + } + return stat_errorator::now(); + // TODO: ctx->delta_stats.num_rd++; +} + +bool PGBackend::maybe_create_new_object( + ObjectState& os, + ceph::os::Transaction& txn) +{ + if (!os.exists) { + ceph_assert(!os.oi.is_whiteout()); + os.exists = true; + os.oi.new_object(); + + txn.touch(coll->get_cid(), ghobject_t{os.oi.soid}); + // TODO: delta_stats.num_objects++ + return false; + } else if (os.oi.is_whiteout()) { + os.oi.clear_flag(object_info_t::FLAG_WHITEOUT); + // TODO: delta_stats.num_whiteouts-- + } + return true; +} + +static bool is_offset_and_length_valid( + const std::uint64_t offset, + const std::uint64_t length) +{ + if (const std::uint64_t max = local_conf()->osd_max_object_size; + offset >= max || length > max || offset + length > max) { + logger().debug("{} osd_max_object_size: {}, offset: {}, len: {}; " + "Hard limit of object size is 4GB", + __func__, max, offset, length); + return false; + } else { + return true; + } +} + +seastar::future<> PGBackend::write( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + const ceph_osd_op& op = osd_op.op; + uint64_t offset = op.extent.offset; + uint64_t length = op.extent.length; + bufferlist buf = osd_op.indata; + if (auto seq = os.oi.truncate_seq; + seq != 0 && op.extent.truncate_seq < seq) { + // old write, arrived after trimtrunc + if (offset + length > os.oi.size) { + // no-op + if (offset > os.oi.size) { + length = 0; + buf.clear(); + } else { + // truncate + auto len = os.oi.size - offset; + buf.splice(len, length); + length = len; + } + } + } else if (op.extent.truncate_seq > seq) { + // write arrives before trimtrunc + if (os.exists && !os.oi.is_whiteout()) { + txn.truncate(coll->get_cid(), + ghobject_t{os.oi.soid}, op.extent.truncate_size); + if (op.extent.truncate_size != os.oi.size) { + os.oi.size = length; + // TODO: truncate_update_size_and_usage() + if (op.extent.truncate_size > os.oi.size) { + osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size, + op.extent.truncate_size - os.oi.size); + } else { + osd_op_params.clean_regions.mark_data_region_dirty(op.extent.truncate_size, + os.oi.size - op.extent.truncate_size); + } + } + } + os.oi.truncate_seq = op.extent.truncate_seq; + os.oi.truncate_size = op.extent.truncate_size; + } + maybe_create_new_object(os, txn); + if (length == 0) { + if (offset > os.oi.size) { + txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.offset); + } else { + txn.nop(); + } + } else { + txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, + offset, length, std::move(buf), op.flags); + os.oi.size = std::max(offset + length, os.oi.size); + } + osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset, + op.extent.length); + + return seastar::now(); +} + +seastar::future<> PGBackend::write_same( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + const ceph_osd_op& op = osd_op.op; + const uint64_t len = op.writesame.length; + if (len == 0) { + return seastar::now(); + } + if (op.writesame.data_length == 0 || + len % op.writesame.data_length != 0 || + op.writesame.data_length != osd_op.indata.length()) { + throw crimson::osd::invalid_argument(); + } + ceph::bufferlist repeated_indata; + for (uint64_t size = 0; size < len; size += op.writesame.data_length) { + repeated_indata.append(osd_op.indata); + } + maybe_create_new_object(os, txn); + txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, + op.writesame.offset, len, + std::move(repeated_indata), op.flags); + os.oi.size = len; + osd_op_params.clean_regions.mark_data_region_dirty(op.writesame.offset, len); + return seastar::now(); +} + +seastar::future<> PGBackend::writefull( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + const ceph_osd_op& op = osd_op.op; + if (op.extent.length != osd_op.indata.length()) { + throw crimson::osd::invalid_argument(); + } + + const bool existing = maybe_create_new_object(os, txn); + if (existing && op.extent.length < os.oi.size) { + txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length); + osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length, + os.oi.size - op.extent.length); + } + if (op.extent.length) { + txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length, + osd_op.indata, op.flags); + os.oi.size = op.extent.length; + osd_op_params.clean_regions.mark_data_region_dirty(0, + std::max((uint64_t) op.extent.length, os.oi.size)); + } + return seastar::now(); +} + +PGBackend::append_errorator::future<> PGBackend::append( + ObjectState& os, + OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + const ceph_osd_op& op = osd_op.op; + if (op.extent.length != osd_op.indata.length()) { + return crimson::ct_error::invarg::make(); + } + maybe_create_new_object(os, txn); + if (op.extent.length) { + txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, + os.oi.size /* offset */, op.extent.length, + std::move(osd_op.indata), op.flags); + os.oi.size += op.extent.length; + osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size, + op.extent.length); + } + return seastar::now(); +} + +PGBackend::write_ertr::future<> PGBackend::truncate( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{} object dne, truncate is a no-op", __func__); + return write_ertr::now(); + } + const ceph_osd_op& op = osd_op.op; + if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) { + return crimson::ct_error::file_too_large::make(); + } + if (op.extent.truncate_seq) { + assert(op.extent.offset == op.extent.truncate_size); + if (op.extent.truncate_seq <= os.oi.truncate_seq) { + logger().debug("{} truncate seq {} <= current {}, no-op", + __func__, op.extent.truncate_seq, os.oi.truncate_seq); + return write_ertr::make_ready_future<>(); + } else { + logger().debug("{} truncate seq {} > current {}, truncating", + __func__, op.extent.truncate_seq, os.oi.truncate_seq); + os.oi.truncate_seq = op.extent.truncate_seq; + os.oi.truncate_size = op.extent.truncate_size; + } + } + maybe_create_new_object(os, txn); + if (os.oi.size != op.extent.offset) { + txn.truncate(coll->get_cid(), + ghobject_t{os.oi.soid}, op.extent.offset); + if (os.oi.size > op.extent.offset) { + // TODO: modified_ranges.union_of(trim); + osd_op_params.clean_regions.mark_data_region_dirty( + op.extent.offset, + os.oi.size - op.extent.offset); + } else { + // os.oi.size < op.extent.offset + osd_op_params.clean_regions.mark_data_region_dirty( + os.oi.size, + op.extent.offset - os.oi.size); + } + os.oi.size = op.extent.offset; + os.oi.clear_data_digest(); + } + // TODO: truncate_update_size_and_usage() + // TODO: ctx->delta_stats.num_wr++; + // ---- + // do no set exists, or we will break above DELETE -> TRUNCATE munging. + return write_ertr::now(); +} + +PGBackend::write_ertr::future<> PGBackend::zero( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{} object dne, zero is a no-op", __func__); + return write_ertr::now(); + } + const ceph_osd_op& op = osd_op.op; + if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) { + return crimson::ct_error::file_too_large::make(); + } + assert(op.extent.length); + txn.zero(coll->get_cid(), + ghobject_t{os.oi.soid}, + op.extent.offset, + op.extent.length); + // TODO: modified_ranges.union_of(zeroed); + osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset, + op.extent.length); + // TODO: ctx->delta_stats.num_wr++; + os.oi.clear_data_digest(); + return write_ertr::now(); +} + +seastar::future<> PGBackend::create( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn) +{ + if (os.exists && !os.oi.is_whiteout() && + (osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) { + // this is an exclusive create + throw crimson::osd::make_error(-EEXIST); + } + + if (osd_op.indata.length()) { + // handle the legacy. `category` is no longer implemented. + try { + auto p = osd_op.indata.cbegin(); + std::string category; + decode(category, p); + } catch (buffer::error&) { + throw crimson::osd::invalid_argument(); + } + } + maybe_create_new_object(os, txn); + txn.nop(); + return seastar::now(); +} + +seastar::future<> PGBackend::remove(ObjectState& os, + ceph::os::Transaction& txn) +{ + // todo: snapset + txn.remove(coll->get_cid(), + ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard}); + os.oi.size = 0; + os.oi.new_object(); + os.exists = false; + // todo: update watchers + if (os.oi.is_whiteout()) { + os.oi.clear_flag(object_info_t::FLAG_WHITEOUT); + } + return seastar::now(); +} + +seastar::future, hobject_t>> +PGBackend::list_objects(const hobject_t& start, uint64_t limit) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + + auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard}; + return store->list_objects(coll, + gstart, + ghobject_t::get_max(), + limit) + .then([](auto ret) { + auto& [gobjects, next] = ret; + std::vector objects; + boost::copy(gobjects | + boost::adaptors::filtered([](const ghobject_t& o) { + if (o.is_pgmeta()) { + return false; + } else if (o.hobj.is_temp()) { + return false; + } else { + return o.is_no_gen(); + } + }) | + boost::adaptors::transformed([](const ghobject_t& o) { + return o.hobj; + }), + std::back_inserter(objects)); + return seastar::make_ready_future, hobject_t>>( + std::make_tuple(objects, next.hobj)); + }); +} + +seastar::future<> PGBackend::setxattr( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn) +{ + if (local_conf()->osd_max_attr_size > 0 && + osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) { + throw crimson::osd::make_error(-EFBIG); + } + + const auto max_name_len = std::min( + store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len); + if (osd_op.op.xattr.name_len > max_name_len) { + throw crimson::osd::make_error(-ENAMETOOLONG); + } + + maybe_create_new_object(os, txn); + + std::string name{"_"}; + ceph::bufferlist val; + { + auto bp = osd_op.indata.cbegin(); + bp.copy(osd_op.op.xattr.name_len, name); + bp.copy(osd_op.op.xattr.value_len, val); + } + logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name); + + txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val); + return seastar::now(); + //ctx->delta_stats.num_wr++; +} + +PGBackend::get_attr_errorator::future<> PGBackend::getxattr( + const ObjectState& os, + OSDOp& osd_op) const +{ + std::string name; + ceph::bufferlist val; + { + auto bp = osd_op.indata.cbegin(); + std::string aname; + bp.copy(osd_op.op.xattr.name_len, aname); + name = "_" + aname; + } + logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name); + return getxattr(os.oi.soid, name).safe_then([&osd_op] (ceph::bufferptr val) { + osd_op.outdata.clear(); + osd_op.outdata.push_back(std::move(val)); + osd_op.op.xattr.value_len = osd_op.outdata.length(); + return get_attr_errorator::now(); + //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + }); + //ctx->delta_stats.num_rd++; +} + +PGBackend::get_attr_errorator::future PGBackend::getxattr( + const hobject_t& soid, + std::string_view key) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + + return store->get_attr(coll, ghobject_t{soid}, key); +} + +PGBackend::get_attr_errorator::future<> PGBackend::get_xattrs( + const ObjectState& os, + OSDOp& osd_op) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then( + [&osd_op](auto&& attrs) { + std::vector> user_xattrs; + for (auto& [key, val] : attrs) { + if (key.size() > 1 && key[0] == '_') { + ceph::bufferlist bl; + bl.append(std::move(val)); + user_xattrs.emplace_back(key.substr(1), std::move(bl)); + } + } + ceph::encode(user_xattrs, osd_op.outdata); + return get_attr_errorator::now(); + }); +} + +PGBackend::rm_xattr_ertr::future<> PGBackend::rm_xattr( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn) +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: {} DNE", __func__, os.oi.soid); + return crimson::ct_error::enoent::make(); + } + auto bp = osd_op.indata.cbegin(); + string attr_name{"_"}; + bp.copy(osd_op.op.xattr.name_len, attr_name); + txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name); + return rm_xattr_ertr::now(); +} + +using get_omap_ertr = + crimson::os::FuturizedStore::read_errorator::extend< + crimson::ct_error::enodata>; +static +get_omap_ertr::future< + crimson::os::FuturizedStore::omap_values_t> +maybe_get_omap_vals_by_keys( + crimson::os::FuturizedStore* store, + const crimson::os::CollectionRef& coll, + const object_info_t& oi, + const std::set& keys_to_get) +{ + if (oi.is_omap()) { + return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get); + } else { + return crimson::ct_error::enodata::make(); + } +} + +static +get_omap_ertr::future< + std::tuple> +maybe_get_omap_vals( + crimson::os::FuturizedStore* store, + const crimson::os::CollectionRef& coll, + const object_info_t& oi, + const std::string& start_after) +{ + if (oi.is_omap()) { + return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after); + } else { + return crimson::ct_error::enodata::make(); + } +} + +PGBackend::ll_read_errorator::future +PGBackend::omap_get_header( + const crimson::os::CollectionRef& c, + const ghobject_t& oid) const +{ + return store->omap_get_header(c, oid); +} + +PGBackend::ll_read_errorator::future<> +PGBackend::omap_get_header( + const ObjectState& os, + OSDOp& osd_op) const +{ + return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then( + [&osd_op] (ceph::bufferlist&& header) { + osd_op.outdata = std::move(header); + return seastar::now(); + }); +} + +PGBackend::ll_read_errorator::future<> +PGBackend::omap_get_keys( + const ObjectState& os, + OSDOp& osd_op) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: object does not exist: {}", os.oi.soid); + return crimson::ct_error::enoent::make(); + } + std::string start_after; + uint64_t max_return; + try { + auto p = osd_op.indata.cbegin(); + decode(start_after, p); + decode(max_return, p); + } catch (buffer::error&) { + throw crimson::osd::invalid_argument{}; + } + max_return = + std::min(max_return, local_conf()->osd_max_omap_entries_per_request); + + // TODO: truly chunk the reading + return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then( + [=, &osd_op](auto ret) { + ceph::bufferlist result; + bool truncated = false; + uint32_t num = 0; + for (auto &[key, val] : std::get<1>(ret)) { + if (num >= max_return || + result.length() >= local_conf()->osd_max_omap_bytes_per_request) { + truncated = true; + break; + } + encode(key, result); + ++num; + } + encode(num, osd_op.outdata); + osd_op.outdata.claim_append(result); + encode(truncated, osd_op.outdata); + return seastar::now(); + }).handle_error( + crimson::ct_error::enodata::handle([&osd_op] { + uint32_t num = 0; + bool truncated = false; + encode(num, osd_op.outdata); + encode(truncated, osd_op.outdata); + return seastar::now(); + }), + ll_read_errorator::pass_further{} + ); + // TODO: + //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + //ctx->delta_stats.num_rd++; +} + +PGBackend::ll_read_errorator::future<> +PGBackend::omap_get_vals( + const ObjectState& os, + OSDOp& osd_op) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + + std::string start_after; + uint64_t max_return; + std::string filter_prefix; + try { + auto p = osd_op.indata.cbegin(); + decode(start_after, p); + decode(max_return, p); + decode(filter_prefix, p); + } catch (buffer::error&) { + throw crimson::osd::invalid_argument{}; + } + + max_return = \ + std::min(max_return, local_conf()->osd_max_omap_entries_per_request); + + // TODO: truly chunk the reading + return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then( + [=, &osd_op] (auto&& ret) { + auto [done, vals] = std::move(ret); + assert(done); + ceph::bufferlist result; + bool truncated = false; + uint32_t num = 0; + auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix) + : std::begin(vals); + for (; iter != std::end(vals); ++iter) { + const auto& [key, value] = *iter; + if (key.substr(0, filter_prefix.size()) != filter_prefix) { + break; + } else if (num >= max_return || + result.length() >= local_conf()->osd_max_omap_bytes_per_request) { + truncated = true; + break; + } + encode(key, result); + encode(value, result); + ++num; + } + encode(num, osd_op.outdata); + osd_op.outdata.claim_append(result); + encode(truncated, osd_op.outdata); + return ll_read_errorator::now(); + }).handle_error( + crimson::ct_error::enodata::handle([&osd_op] { + encode(uint32_t{0} /* num */, osd_op.outdata); + encode(bool{false} /* truncated */, osd_op.outdata); + return ll_read_errorator::now(); + }), + ll_read_errorator::pass_further{} + ); + + // TODO: + //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + //ctx->delta_stats.num_rd++; +} + +PGBackend::ll_read_errorator::future<> +PGBackend::omap_get_vals_by_keys( + const ObjectState& os, + OSDOp& osd_op) const +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: object does not exist: {}", os.oi.soid); + return crimson::ct_error::enoent::make(); + } + + std::set keys_to_get; + try { + auto p = osd_op.indata.cbegin(); + decode(keys_to_get, p); + } catch (buffer::error&) { + throw crimson::osd::invalid_argument(); + } + return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get).safe_then( + [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) { + encode(vals, osd_op.outdata); + return ll_read_errorator::now(); + }).handle_error( + crimson::ct_error::enodata::handle([&osd_op] { + uint32_t num = 0; + encode(num, osd_op.outdata); + return ll_read_errorator::now(); + }), + ll_read_errorator::pass_further{} + ); + + // TODO: + //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10); + //ctx->delta_stats.num_rd++; +} + +seastar::future<> PGBackend::omap_set_vals( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + maybe_create_new_object(os, txn); + + ceph::bufferlist to_set_bl; + try { + auto p = osd_op.indata.cbegin(); + decode_str_str_map_to_bl(p, &to_set_bl); + } catch (buffer::error&) { + throw crimson::osd::invalid_argument{}; + } + + txn.omap_setkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_set_bl); + + // TODO: + //ctx->clean_regions.mark_omap_dirty(); + + // TODO: + //ctx->delta_stats.num_wr++; + //ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10); + os.oi.set_flag(object_info_t::FLAG_OMAP); + os.oi.clear_omap_digest(); + osd_op_params.clean_regions.mark_omap_dirty(); + return seastar::now(); +} + +seastar::future<> PGBackend::omap_set_header( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn) +{ + maybe_create_new_object(os, txn); + txn.omap_setheader(coll->get_cid(), ghobject_t{os.oi.soid}, osd_op.indata); + //TODO: + //ctx->clean_regions.mark_omap_dirty(); + //ctx->delta_stats.num_wr++; + os.oi.set_flag(object_info_t::FLAG_OMAP); + os.oi.clear_omap_digest(); + return seastar::now(); +} + +seastar::future<> PGBackend::omap_remove_range( + ObjectState& os, + const OSDOp& osd_op, + ceph::os::Transaction& txn) +{ + std::string key_begin, key_end; + try { + auto p = osd_op.indata.cbegin(); + decode(key_begin, p); + decode(key_end, p); + } catch (buffer::error& e) { + throw crimson::osd::invalid_argument{}; + } + txn.omap_rmkeyrange(coll->get_cid(), ghobject_t{os.oi.soid}, key_begin, key_end); + //TODO: + //ctx->delta_stats.num_wr++; + os.oi.clear_omap_digest(); + return seastar::now(); +} + +PGBackend::omap_clear_ertr::future<> +PGBackend::omap_clear( + ObjectState& os, + OSDOp& osd_op, + ceph::os::Transaction& txn, + osd_op_params_t& osd_op_params) +{ + if (__builtin_expect(stopping, false)) { + throw crimson::common::system_shutdown_exception(); + } + if (!os.exists || os.oi.is_whiteout()) { + logger().debug("{}: object does not exist: {}", os.oi.soid); + return crimson::ct_error::enoent::make(); + } + if (!os.oi.is_omap()) { + return omap_clear_ertr::now(); + } + txn.omap_clear(coll->get_cid(), ghobject_t{os.oi.soid}); + osd_op_params.clean_regions.mark_omap_dirty(); + os.oi.clear_omap_digest(); + os.oi.clear_flag(object_info_t::FLAG_OMAP); + return omap_clear_ertr::now(); +} + +seastar::future PGBackend::stat( + CollectionRef c, + const ghobject_t& oid) const +{ + return store->stat(c, oid); +} + +seastar::future> +PGBackend::fiemap( + CollectionRef c, + const ghobject_t& oid, + uint64_t off, + uint64_t len) +{ + return store->fiemap(c, oid, off, len); +} + +void PGBackend::on_activate_complete() { + peering.reset(); +} + -- cgit v1.2.3