From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/SimpleRADOSStriper.cc | 775 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 775 insertions(+) create mode 100644 src/SimpleRADOSStriper.cc (limited to 'src/SimpleRADOSStriper.cc') diff --git a/src/SimpleRADOSStriper.cc b/src/SimpleRADOSStriper.cc new file mode 100644 index 000000000..66bc653fb --- /dev/null +++ b/src/SimpleRADOSStriper.cc @@ -0,0 +1,775 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2021 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License version 2.1, as published by + * the Free Software Foundation. See file COPYING. + * + */ + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "include/ceph_assert.h" +#include "include/rados/librados.hpp" + +#include "cls/lock/cls_lock_client.h" + +#include "common/ceph_argparse.h" +#include "common/ceph_mutex.h" +#include "common/common_init.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/errno.h" +#include "common/version.h" + +#include "SimpleRADOSStriper.h" + +using ceph::bufferlist; + +#define dout_subsys ceph_subsys_cephsqlite +#undef dout_prefix +#define dout_prefix *_dout << "client." << ioctx.get_instance_id() << ": SimpleRADOSStriper: " << __func__ << ": " << oid << ": " +#define d(lvl) ldout((CephContext*)ioctx.cct(), (lvl)) + +enum { + P_FIRST = 0xe0000, + P_UPDATE_METADATA, + P_UPDATE_ALLOCATED, + P_UPDATE_SIZE, + P_UPDATE_VERSION, + P_SHRINK, + P_SHRINK_BYTES, + P_LOCK, + P_UNLOCK, + P_LAST, +}; + +int SimpleRADOSStriper::config_logger(CephContext* cct, std::string_view name, std::shared_ptr* l) +{ + PerfCountersBuilder plb(cct, name.data(), P_FIRST, P_LAST); + plb.add_u64_counter(P_UPDATE_METADATA, "update_metadata", "Number of metadata updates"); + plb.add_u64_counter(P_UPDATE_ALLOCATED, "update_allocated", "Number of allocated updates"); + plb.add_u64_counter(P_UPDATE_SIZE, "update_size", "Number of size updates"); + plb.add_u64_counter(P_UPDATE_VERSION, "update_version", "Number of version updates"); + plb.add_u64_counter(P_SHRINK, "shrink", "Number of allocation shrinks"); + plb.add_u64_counter(P_SHRINK_BYTES, "shrink_bytes", "Bytes shrunk"); + plb.add_u64_counter(P_LOCK, "lock", "Number of locks"); + plb.add_u64_counter(P_UNLOCK, "unlock", "Number of unlocks"); + l->reset(plb.create_perf_counters()); + return 0; +} + +SimpleRADOSStriper::~SimpleRADOSStriper() +{ + if (lock_keeper.joinable()) { + shutdown = true; + lock_keeper_cvar.notify_all(); + lock_keeper.join(); + } + + if (ioctx.is_valid()) { + d(5) << dendl; + + if (is_locked()) { + unlock(); + } + } +} + +SimpleRADOSStriper::extent SimpleRADOSStriper::get_next_extent(uint64_t off, size_t len) const +{ + extent e; + { + uint64_t stripe = (off>>object_size); + CachedStackStringStream css; + *css << oid; + *css << "."; + *css << std::setw(16) << std::setfill('0') << std::hex << stripe; + e.soid = css->str(); + } + e.off = off & ((1<(len, (1<wait_for_complete(); + } else { + if (aiocp->is_complete()) { + rc = aiocp->get_return_value(); + } else { + return 0; + } + } + if (rc) { + d(5) << " aio failed: " << cpp_strerror(rc) << dendl; + if (aios_failure == 0) { + aios_failure = rc; + } + } + aios.pop(); + } + return aios_failure; +} + +int SimpleRADOSStriper::flush() +{ + d(5) << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + if (size_dirty) { + if (int rc = set_metadata(size, true); rc < 0) { + return rc; + } + } + + if (int rc = wait_for_aios(true); rc < 0) { + aios_failure = 0; + return rc; + } + + return 0; +} + +int SimpleRADOSStriper::stat(uint64_t* s) +{ + d(5) << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + *s = size; + return 0; +} + +int SimpleRADOSStriper::create() +{ + d(5) << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + auto ext = get_first_extent(); + auto op = librados::ObjectWriteOperation(); + /* exclusive create ensures we do none of these setxattrs happen if it fails */ + op.create(1); + op.setxattr(XATTR_VERSION, uint2bl(0)); + op.setxattr(XATTR_EXCL, bufferlist()); + op.setxattr(XATTR_SIZE, uint2bl(0)); + op.setxattr(XATTR_ALLOCATED, uint2bl(0)); + op.setxattr(XATTR_LAYOUT_STRIPE_UNIT, uint2bl(1)); + op.setxattr(XATTR_LAYOUT_STRIPE_COUNT, uint2bl(1)); + op.setxattr(XATTR_LAYOUT_OBJECT_SIZE, uint2bl(1< removes; + + ceph_assert(a <= allocated); + uint64_t prune = std::max(a, (1u << object_size)); /* never delete first extent here */ + uint64_t len = allocated - prune; + const uint64_t bytes_removed = len; + uint64_t offset = prune; + while (len > 0) { + auto ext = get_next_extent(offset, len); + auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion()); + if (int rc = ioctx.aio_remove(ext.soid, aiocp.get()); rc < 0) { + d(5) << " aio_remove failed: " << cpp_strerror(rc) << dendl; + return rc; + } + removes.emplace_back(std::move(aiocp)); + len -= ext.len; + offset += ext.len; + } + + for (auto& aiocp : removes) { + if (int rc = aiocp->wait_for_complete(); rc < 0 && rc != -ENOENT) { + d(5) << " aio_remove failed: " << cpp_strerror(rc) << dendl; + return rc; + } + } + + auto ext = get_first_extent(); + auto op = librados::ObjectWriteOperation(); + auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion()); + op.setxattr(XATTR_ALLOCATED, uint2bl(a)); + d(15) << " updating allocated to " << a << dendl; + op.setxattr(XATTR_VERSION, uint2bl(version+1)); + d(15) << " updating version to " << (version+1) << dendl; + if (int rc = ioctx.aio_operate(ext.soid, aiocp.get(), &op); rc < 0) { + d(5) << " update failed: " << cpp_strerror(rc) << dendl; + return rc; + } + /* we need to wait so we don't have dangling extents */ + d(10) << " waiting for allocated update" << dendl; + if (int rc = aiocp->wait_for_complete(); rc < 0) { + d(1) << " update failure: " << cpp_strerror(rc) << dendl; + return rc; + } + if (logger) { + logger->inc(P_UPDATE_METADATA); + logger->inc(P_UPDATE_ALLOCATED); + logger->inc(P_UPDATE_VERSION); + logger->inc(P_SHRINK); + logger->inc(P_SHRINK_BYTES, bytes_removed); + } + + version += 1; + allocated = a; + return 0; +} + +int SimpleRADOSStriper::maybe_shrink_alloc() +{ + d(15) << dendl; + + if (size == 0) { + if (allocated > 0) { + d(10) << "allocation shrink to 0" << dendl; + return shrink_alloc(0); + } else { + return 0; + } + } + + uint64_t mask = (1< new_allocated && ((allocated-new_allocated) > min_growth)) { + d(10) << "allocation shrink to " << new_allocated << dendl; + return shrink_alloc(new_allocated); + } + + return 0; +} + +bufferlist SimpleRADOSStriper::str2bl(std::string_view sv) +{ + bufferlist bl; + bl.append(sv); + return bl; +} + +bufferlist SimpleRADOSStriper::uint2bl(uint64_t v) +{ + CachedStackStringStream css; + *css << std::dec << std::setw(16) << std::setfill('0') << v; + bufferlist bl; + bl.append(css->strv()); + return bl; +} + +int SimpleRADOSStriper::set_metadata(uint64_t new_size, bool update_size) +{ + d(10) << " new_size: " << new_size + << " update_size: " << update_size + << " allocated: " << allocated + << " size: " << size + << " version: " << version + << dendl; + + bool do_op = false; + auto new_allocated = allocated; + auto ext = get_first_extent(); + auto op = librados::ObjectWriteOperation(); + if (new_size > allocated) { + uint64_t mask = (1<inc(P_UPDATE_ALLOCATED); + d(15) << " updating allocated to " << new_allocated << dendl; + } + if (update_size) { + op.setxattr(XATTR_SIZE, uint2bl(new_size)); + do_op = true; + if (logger) logger->inc(P_UPDATE_SIZE); + d(15) << " updating size to " << new_size << dendl; + } + if (do_op) { + if (logger) logger->inc(P_UPDATE_METADATA); + if (logger) logger->inc(P_UPDATE_VERSION); + op.setxattr(XATTR_VERSION, uint2bl(version+1)); + d(15) << " updating version to " << (version+1) << dendl; + auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion()); + if (int rc = ioctx.aio_operate(ext.soid, aiocp.get(), &op); rc < 0) { + d(1) << " update failure: " << cpp_strerror(rc) << dendl; + return rc; + } + version += 1; + if (allocated != new_allocated) { + /* we need to wait so we don't have dangling extents */ + d(10) << "waiting for allocated update" << dendl; + if (int rc = aiocp->wait_for_complete(); rc < 0) { + d(1) << " update failure: " << cpp_strerror(rc) << dendl; + return rc; + } + aiocp.reset(); + allocated = new_allocated; + } + if (aiocp) { + aios.emplace(std::move(aiocp)); + } + if (update_size) { + size = new_size; + size_dirty = false; + return maybe_shrink_alloc(); + } + } + return 0; +} + +ssize_t SimpleRADOSStriper::write(const void* data, size_t len, uint64_t off) +{ + d(5) << off << "~" << len << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + if (allocated < (len+off)) { + if (int rc = set_metadata(len+off, false); rc < 0) { + return rc; + } + } + + size_t w = 0; + while ((len-w) > 0) { + auto ext = get_next_extent(off+w, len-w); + auto aiocp = aiocompletionptr(librados::Rados::aio_create_completion()); + bufferlist bl; + bl.append((const char*)data+w, ext.len); + if (int rc = ioctx.aio_write(ext.soid, aiocp.get(), bl, ext.len, ext.off); rc < 0) { + break; + } + aios.emplace(std::move(aiocp)); + w += ext.len; + } + + wait_for_aios(false); // clean up finished completions + + if (size < (len+off)) { + size = len+off; + size_dirty = true; + d(10) << " dirty size: " << size << dendl; + } + + return (ssize_t)w; +} + +ssize_t SimpleRADOSStriper::read(void* data, size_t len, uint64_t off) +{ + d(5) << off << "~" << len << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + size_t r = 0; + // Don't use std::vector to store bufferlists (e.g for parallelizing aio_reads), + // as they are being moved whenever the vector resizes + // and will cause invalidated references. + std::deque> reads; + while ((len-r) > 0) { + auto ext = get_next_extent(off+r, len-r); + auto& [bl, aiocp] = reads.emplace_back(); + aiocp = aiocompletionptr(librados::Rados::aio_create_completion()); + if (int rc = ioctx.aio_read(ext.soid, aiocp.get(), &bl, ext.len, ext.off); rc < 0) { + d(1) << " read failure: " << cpp_strerror(rc) << dendl; + return rc; + } + r += ext.len; + } + + r = 0; + for (auto& [bl, aiocp] : reads) { + if (int rc = aiocp->wait_for_complete(); rc < 0) { + d(1) << " read failure: " << cpp_strerror(rc) << dendl; + return rc; + } + bl.begin().copy(bl.length(), ((char*)data)+r); + r += bl.length(); + } + ceph_assert(r <= len); + + return r; +} + +int SimpleRADOSStriper::print_lockers(std::ostream& out) +{ + int exclusive; + std::string tag; + std::list lockers; + auto ext = get_first_extent(); + if (int rc = ioctx.list_lockers(ext.soid, biglock, &exclusive, &tag, &lockers); rc < 0) { + d(1) << " list_lockers failure: " << cpp_strerror(rc) << dendl; + return rc; + } + if (lockers.empty()) { + out << " lockers none"; + } else { + out << " lockers exclusive=" << exclusive << " tag=" << tag << " lockers=["; + bool first = true; + for (const auto& l : lockers) { + if (!first) out << ","; + out << l.client << ":" << l.cookie << ":" << l.address; + } + out << "]"; + } + return 0; +} + +/* Do lock renewal in a separate thread: while it's unlikely sqlite chews on + * something for multiple seconds without calling into the VFS (where we could + * initiate a lock renewal), it's not impossible with complex queries. Also, we + * want to allow "PRAGMA locking_mode = exclusive" where the application may + * not use the sqlite3 database connection for an indeterminate amount of time. + */ +void SimpleRADOSStriper::lock_keeper_main(void) +{ + d(20) << dendl; + const auto ext = get_first_extent(); + while (!shutdown) { + d(20) << "tick" << dendl; + std::unique_lock lock(lock_keeper_mutex); + auto now = clock::now(); + auto since = now-last_renewal; + + if (since >= lock_keeper_interval && locked) { + d(10) << "renewing lock" << dendl; + auto tv = ceph::to_timeval(lock_keeper_timeout); + int rc = ioctx.lock_exclusive(ext.soid, biglock, cookie.to_string(), lockdesc, &tv, LIBRADOS_LOCK_FLAG_MUST_RENEW); + if (rc) { + /* If lock renewal fails, we cannot continue the application. Return + * -EBLOCKLISTED for all calls into the striper for this instance, even + * if we're not actually blocklisted. + */ + d(-1) << "lock renewal failed: " << cpp_strerror(rc) << dendl; + blocklisted = true; + break; + } + last_renewal = clock::now(); + } + + lock_keeper_cvar.wait_for(lock, lock_keeper_interval); + } +} + +int SimpleRADOSStriper::recover_lock() +{ + d(5) << "attempting to recover lock" << dendl; + + std::string addrs; + const auto ext = get_first_extent(); + + { + auto tv = ceph::to_timeval(lock_keeper_timeout); + if (int rc = ioctx.lock_exclusive(ext.soid, biglock, cookie.to_string(), lockdesc, &tv, 0); rc < 0) { + return rc; + } + locked = true; + last_renewal = clock::now(); + } + + d(5) << "acquired lock, fetching last owner" << dendl; + + { + bufferlist bl_excl; + if (int rc = ioctx.getxattr(ext.soid, XATTR_EXCL, bl_excl); rc < 0) { + if (rc == -ENOENT) { + /* someone removed it? ok... */ + goto setowner; + } else { + d(-1) << "could not recover exclusive locker" << dendl; + locked = false; /* it will drop eventually */ + return -EIO; + } + } + addrs = bl_excl.to_str(); + } + + if (addrs.empty()) { + d(5) << "someone else cleaned up" << dendl; + goto setowner; + } else { + d(5) << "exclusive lock holder was " << addrs << dendl; + } + + if (blocklist_the_dead) { + entity_addrvec_t addrv; + addrv.parse(addrs.c_str()); + auto R = librados::Rados(ioctx); + auto b = "blocklist"sv; +retry: + for (auto& a : addrv.v) { + CachedStackStringStream css; + *css << "{\"prefix\":\"osd " << b << "\", \"" << b << "op\":\"add\","; + *css << "\"addr\":\""; + *css << a; + *css << "\"}"; + std::vector cmd = {css->str()}; + d(5) << "sending blocklist command: " << cmd << dendl; + std::string out; + if (int rc = R.mon_command(css->str(), bufferlist(), nullptr, &out); rc < 0) { + if (rc == -EINVAL && b == "blocklist"sv) { + b = "blacklist"sv; + goto retry; + } + d(-1) << "Cannot proceed with recovery because I have failed to blocklist the old client: " << cpp_strerror(rc) << ", out = " << out << dendl; + locked = false; /* it will drop eventually */ + return -EIO; + } + } + /* Ensure our osd_op requests have the latest epoch. */ + R.wait_for_latest_osdmap(); + } + +setowner: + d(5) << "setting new owner to myself, " << myaddrs << dendl; + { + auto myaddrbl = str2bl(myaddrs); + if (int rc = ioctx.setxattr(ext.soid, XATTR_EXCL, myaddrbl); rc < 0) { + d(-1) << "could not set lock owner" << dendl; + locked = false; /* it will drop eventually */ + return -EIO; + } + } + return 0; +} + +int SimpleRADOSStriper::lock(uint64_t timeoutms) +{ + /* XXX: timeoutms is unused */ + d(5) << "timeout=" << timeoutms << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + std::scoped_lock lock(lock_keeper_mutex); + + ceph_assert(!is_locked()); + + /* We're going to be very lazy here in implementation: only exclusive locks + * are allowed. That even ensures a single reader. + */ + uint64_t slept = 0; + + auto ext = get_first_extent(); + while (true) { + /* The general fast path in one compound operation: obtain the lock, + * confirm the past locker cleaned up after themselves (set XATTR_EXCL to + * ""), then finally set XATTR_EXCL to our address vector as the new + * exclusive locker. + */ + + auto op = librados::ObjectWriteOperation(); + auto tv = ceph::to_timeval(lock_keeper_timeout); + utime_t duration; + duration.set_from_timeval(&tv); + rados::cls::lock::lock(&op, biglock, ClsLockType::EXCLUSIVE, cookie.to_string(), "", lockdesc, duration, 0); + op.cmpxattr(XATTR_EXCL, LIBRADOS_CMPXATTR_OP_EQ, bufferlist()); + op.setxattr(XATTR_EXCL, str2bl(myaddrs)); + int rc = ioctx.operate(ext.soid, &op); + if (rc == 0) { + locked = true; + last_renewal = clock::now(); + break; + } else if (rc == -EBUSY) { + if ((slept % 500000) == 0) { + d(-1) << "waiting for locks: "; + print_lockers(*_dout); + *_dout << dendl; + } + usleep(5000); + slept += 5000; + continue; + } else if (rc == -ECANCELED) { + /* CMPXATTR failed, a locker didn't cleanup. Try to recover! */ + if (rc = recover_lock(); rc < 0) { + if (rc == -EBUSY) { + continue; /* try again */ + } + return rc; + } + break; + } else { + d(-1) << " lock failed: " << cpp_strerror(rc) << dendl; + return rc; + } + } + + if (!lock_keeper.joinable()) { + lock_keeper = std::thread(&SimpleRADOSStriper::lock_keeper_main, this); + } + + if (int rc = open(); rc < 0) { + d(5) << " open failed: " << cpp_strerror(rc) << dendl; + return rc; + } + + d(5) << " = 0" << dendl; + if (logger) { + logger->inc(P_LOCK); + } + + return 0; +} + +int SimpleRADOSStriper::unlock() +{ + d(5) << dendl; + + if (blocklisted.load()) { + return -EBLOCKLISTED; + } + + std::scoped_lock lock(lock_keeper_mutex); + + ceph_assert(is_locked()); + + /* wait for flush of metadata */ + if (int rc = flush(); rc < 0) { + return rc; + } + + const auto ext = get_first_extent(); + auto op = librados::ObjectWriteOperation(); + op.cmpxattr(XATTR_EXCL, LIBRADOS_CMPXATTR_OP_EQ, str2bl(myaddrs)); + op.setxattr(XATTR_EXCL, bufferlist()); + rados::cls::lock::unlock(&op, biglock, cookie.to_string()); + if (int rc = ioctx.operate(ext.soid, &op); rc < 0) { + d(-1) << " unlock failed: " << cpp_strerror(rc) << dendl; + return rc; + } + locked = false; + + d(5) << " = 0" << dendl; + if (logger) { + logger->inc(P_UNLOCK); + } + + return 0; +} -- cgit v1.2.3