author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree       64f88b554b444a49f656b6c656111a145cbbaa28 /src/os/bluestore/BlueFS.cc
parent     Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat
 -rw-r--r--  src/os/bluestore/BlueFS.cc | 4682
 1 file changed, 4682 insertions, 0 deletions
diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc
new file mode 100644
index 000000000..710021f07
--- /dev/null
+++ b/src/os/bluestore/BlueFS.cc
@@ -0,0 +1,4682 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#include <chrono>
+#include "boost/algorithm/string.hpp"
+#include "bluestore_common.h"
+#include "BlueFS.h"
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "Allocator.h"
+#include "include/ceph_assert.h"
+#include "common/admin_socket.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bluefs
+#undef dout_prefix
+#define dout_prefix *_dout << "bluefs "
+using TOPNSPC::common::cmd_getval;
+
+using std::byte;
+using std::list;
+using std::make_pair;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::set;
+using std::string;
+using std::to_string;
+using std::vector;
+using std::chrono::duration;
+using std::chrono::seconds;
+
+using ceph::bufferlist;
+using ceph::decode;
+using ceph::encode;
+using ceph::Formatter;
+
+
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::File, bluefs_file, bluefs);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::Dir, bluefs_dir, bluefs);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileWriter, bluefs_file_writer, bluefs_file_writer);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReaderBuffer,
+    bluefs_file_reader_buffer, bluefs_file_reader);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReader, bluefs_file_reader, bluefs_file_reader);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileLock, bluefs_file_lock, bluefs);
+
+static void wal_discard_cb(void *priv, void* priv2) {
+  BlueFS *bluefs = static_cast<BlueFS*>(priv);
+  interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+  bluefs->handle_discard(BlueFS::BDEV_WAL, *tmp);
+}
+
+static void db_discard_cb(void *priv, void* priv2) {
+  BlueFS *bluefs = static_cast<BlueFS*>(priv);
+  interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+  bluefs->handle_discard(BlueFS::BDEV_DB, *tmp);
+}
+
+static void slow_discard_cb(void *priv, void* priv2) {
+  BlueFS *bluefs = static_cast<BlueFS*>(priv);
+  interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+  bluefs->handle_discard(BlueFS::BDEV_SLOW, *tmp);
+}
+
+class BlueFS::SocketHook : public AdminSocketHook {
+  BlueFS* bluefs;
+public:
+  static BlueFS::SocketHook* create(BlueFS* bluefs)
+  {
+    BlueFS::SocketHook* hook = nullptr;
+    AdminSocket* admin_socket = bluefs->cct->get_admin_socket();
+    if (admin_socket) {
+      hook = new BlueFS::SocketHook(bluefs);
+      int r = admin_socket->register_command("bluestore bluefs device info "
+          "name=alloc_size,type=CephInt,req=false",
+          hook,
+          "Shows space report for bluefs devices. "
+          "This also includes an estimation for space "
+          "available to bluefs at main device. "
+          "alloc_size, if set, specifies the custom bluefs "
+          "allocation unit size for the estimation above.");
+      if (r != 0) {
+        ldout(bluefs->cct, 1) << __func__ << " cannot register SocketHook" << dendl;
+        delete hook;
+        hook = nullptr;
+      } else {
+        r = admin_socket->register_command("bluefs stats",
+            hook,
+            "Dump internal statistics for bluefs."
+            "");
+        ceph_assert(r == 0);
+        r = admin_socket->register_command("bluefs files list", hook,
+            "print files in bluefs");
+        ceph_assert(r == 0);
+        r = admin_socket->register_command("bluefs debug_inject_read_zeros", hook,
+            "Injects 8K zeros into next BlueFS read. Debug only.");
+        ceph_assert(r == 0);
+      }
+    }
+    return hook;
+  }
+
+  ~SocketHook() {
+    AdminSocket* admin_socket = bluefs->cct->get_admin_socket();
+    admin_socket->unregister_commands(this);
+  }
+private:
+  SocketHook(BlueFS* bluefs) :
+    bluefs(bluefs) {}
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+      const bufferlist&,
+      Formatter *f,
+      std::ostream& errss,
+      bufferlist& out) override {
+    if (command == "bluestore bluefs device info") {
+      int64_t alloc_size = 0;
+      cmd_getval(cmdmap, "alloc_size", alloc_size);
+      if ((alloc_size & (alloc_size - 1)) != 0) {
+        errss << "Invalid allocation size:'" << alloc_size << std::endl;
+        return -EINVAL;
+      }
+      if (alloc_size == 0)
+        alloc_size = bluefs->cct->_conf->bluefs_shared_alloc_size;
+      f->open_object_section("bluefs_device_info");
+      for (unsigned dev = BDEV_WAL; dev <= BDEV_SLOW; dev++) {
+        if (bluefs->bdev[dev]) {
+          f->open_object_section("dev");
+          f->dump_string("device", bluefs->get_device_name(dev));
+          ceph_assert(bluefs->alloc[dev]);
+          auto total = bluefs->get_total(dev);
+          auto free = bluefs->get_free(dev);
+          auto used = bluefs->get_used(dev);
+
+          f->dump_int("total", total);
+          f->dump_int("free", free);
+          f->dump_int("bluefs_used", used);
+          if (bluefs->is_shared_alloc(dev)) {
+            size_t avail = bluefs->probe_alloc_avail(dev, alloc_size);
+            f->dump_int("bluefs max available", avail);
+          }
+          f->close_section();
+        }
+      }
+
+      f->close_section();
+    } else if (command == "bluefs stats") {
+      std::stringstream ss;
+      bluefs->dump_block_extents(ss);
+      bluefs->dump_volume_selector(ss);
+      out.append(ss);
+    } else if (command == "bluefs files list") {
+      const char* devnames[3] = {"wal","db","slow"};
+      std::lock_guard l(bluefs->nodes.lock);
+      f->open_array_section("files");
+      for (auto &d : bluefs->nodes.dir_map) {
+        std::string dir = d.first;
+        for (auto &r : d.second->file_map) {
+          f->open_object_section("file");
+          f->dump_string("name", (dir + "/" + r.first).c_str());
+          std::vector<size_t> sizes;
+          sizes.resize(bluefs->bdev.size());
+          for(auto& i : r.second->fnode.extents) {
+            sizes[i.bdev] += i.length;
+          }
+          for (size_t i = 0; i < sizes.size(); i++) {
+            if (sizes[i]>0) {
+              if (i < sizeof(devnames) / sizeof(*devnames))
+                f->dump_int(devnames[i], sizes[i]);
+              else
+                f->dump_int(("dev-"+to_string(i)).c_str(), sizes[i]);
+            }
+          }
+          f->close_section();
+        }
+      }
+      f->close_section();
+      f->flush(out);
+    } else if (command == "bluefs debug_inject_read_zeros") {
+      bluefs->inject_read_zeros++;
+    } else {
+      errss << "Invalid command" << std::endl;
+      return -ENOSYS;
+    }
+    return 0;
+  }
+};
+
+BlueFS::BlueFS(CephContext* cct)
+  : cct(cct),
+    bdev(MAX_BDEV),
+    ioc(MAX_BDEV),
+    block_reserved(MAX_BDEV),
+    alloc(MAX_BDEV),
+    alloc_size(MAX_BDEV, 0)
+{
+  dirty.pending_release.resize(MAX_BDEV);
+  discard_cb[BDEV_WAL] = wal_discard_cb;
+  discard_cb[BDEV_DB] = db_discard_cb;
+  discard_cb[BDEV_SLOW] = slow_discard_cb;
+  asok_hook = SocketHook::create(this);
+}
+
+BlueFS::~BlueFS()
+{
+  delete asok_hook;
+  for (auto p : ioc) {
+    if (p)
+      p->aio_wait();
+  }
+  for (auto p : bdev) {
+    if (p) {
+      p->close();
+      delete p;
+    }
+  }
+  for (auto p : ioc) {
+    delete p;
+  }
+}
+
+void BlueFS::_init_logger()
+{
+  PerfCountersBuilder b(cct, "bluefs",
+      l_bluefs_first, l_bluefs_last);
+  b.add_u64(l_bluefs_db_total_bytes, "db_total_bytes",
+      "Total bytes (main db device)",
+      "b", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_db_used_bytes, "db_used_bytes",
+      "Used bytes (main db device)",
+      "u", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_wal_total_bytes, "wal_total_bytes",
+      "Total bytes (wal device)",
+      "walb", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_wal_used_bytes, "wal_used_bytes",
+      "Used bytes (wal device)",
+      "walu", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_slow_total_bytes, "slow_total_bytes",
+      "Total bytes (slow device)",
+      "slob", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_slow_used_bytes, "slow_used_bytes",
+      "Used bytes (slow device)",
+      "slou", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64(l_bluefs_num_files, "num_files", "File count",
+      "f", PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64(l_bluefs_log_bytes, "log_bytes", "Size of the metadata log",
+      "jlen", PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_log_compactions, "log_compactions",
+      "Compactions of the metadata log");
+  b.add_u64_counter(l_bluefs_log_write_count, "log_write_count",
+      "Write op count to the metadata log");
+  b.add_u64_counter(l_bluefs_logged_bytes, "logged_bytes",
+      "Bytes written to the metadata log",
+      "j",
+      PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_files_written_wal, "files_written_wal",
+      "Files written to WAL");
+  b.add_u64_counter(l_bluefs_files_written_sst, "files_written_sst",
+      "Files written to SSTs");
+  b.add_u64_counter(l_bluefs_write_count_wal, "write_count_wal",
+      "Write op count to WAL");
+  b.add_u64_counter(l_bluefs_write_count_sst, "write_count_sst",
+      "Write op count to SSTs");
+  b.add_u64_counter(l_bluefs_bytes_written_wal, "bytes_written_wal",
+      "Bytes written to WAL",
+      "walb",
+      PerfCountersBuilder::PRIO_CRITICAL);
+  b.add_u64_counter(l_bluefs_bytes_written_sst, "bytes_written_sst",
+      "Bytes written to SSTs",
+      "sstb",
+      PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_bytes_written_slow, "bytes_written_slow",
+      "Bytes written to WAL/SSTs at slow device",
+      "slwb",
+      PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_max_bytes_wal, "max_bytes_wal",
+      "Maximum bytes allocated from WAL",
+      "mxwb",
+      PerfCountersBuilder::PRIO_INTERESTING,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_max_bytes_db, "max_bytes_db",
+      "Maximum bytes allocated from DB",
+      "mxdb",
+      PerfCountersBuilder::PRIO_INTERESTING,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_max_bytes_slow, "max_bytes_slow",
+      "Maximum bytes allocated from SLOW",
+      "mxwb",
+      PerfCountersBuilder::PRIO_INTERESTING,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_main_alloc_unit, "alloc_unit_main",
+      "Allocation unit size (in bytes) for primary/shared device",
+      "aumb",
+      PerfCountersBuilder::PRIO_CRITICAL,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_db_alloc_unit, "alloc_unit_db",
+      "Allocation unit size (in bytes) for standalone DB device",
+      "audb",
+      PerfCountersBuilder::PRIO_CRITICAL,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_wal_alloc_unit, "alloc_unit_wal",
+      "Allocation unit size (in bytes) for standalone WAL device",
+      "auwb",
+      PerfCountersBuilder::PRIO_CRITICAL,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_count, "read_random_count",
+      "random read requests processed",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_random_bytes, "read_random_bytes",
+      "Bytes requested in random read mode",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_disk_count, "read_random_disk_count",
+      "random reads requests going to disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_random_disk_bytes, "read_random_disk_bytes",
+      "Bytes read from disk in random read mode",
+      "rrb",
+      PerfCountersBuilder::PRIO_INTERESTING,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_disk_bytes_wal, "read_random_disk_bytes_wal",
+      "random reads requests going to WAL disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_disk_bytes_db, "read_random_disk_bytes_db",
+      "random reads requests going to DB disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_disk_bytes_slow, "read_random_disk_bytes_slow",
+      "random reads requests going to main disk",
+      "rrsb",
+      PerfCountersBuilder::PRIO_INTERESTING,
+      unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_random_buffer_count, "read_random_buffer_count",
+      "random read requests processed using prefetch buffer",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_random_buffer_bytes, "read_random_buffer_bytes",
+      "Bytes read from prefetch buffer in random read mode",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_count, "read_count",
+      "buffered read requests processed",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_bytes, "read_bytes",
+      "Bytes requested in buffered read mode",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_disk_count, "read_disk_count",
+      "buffered reads requests going to disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_disk_bytes, "read_disk_bytes",
+      "Bytes read in buffered mode from disk",
+      "rb",
+      PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_disk_bytes_wal, "read_disk_bytes_wal",
+      "reads requests going to WAL disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_disk_bytes_db, "read_disk_bytes_db",
+      "reads requests going to DB disk",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_disk_bytes_slow, "read_disk_bytes_slow",
+      "reads requests going to main disk",
+      "rsb",
+      PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_read_prefetch_count, "read_prefetch_count",
+      "prefetch read requests processed",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_read_prefetch_bytes, "read_prefetch_bytes",
+      "Bytes requested in prefetch read mode",
+      NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_u64_counter(l_bluefs_write_count, "write_count",
+      "Write requests processed");
+  b.add_u64_counter(l_bluefs_write_disk_count, "write_disk_count",
+      "Write requests sent to disk");
+  b.add_u64_counter(l_bluefs_write_bytes, "write_bytes",
+      "Bytes written", NULL,
+      PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+  b.add_time_avg (l_bluefs_compaction_lat, "compact_lat",
+      "Average bluefs log compaction latency",
+      "c__t",
+      PerfCountersBuilder::PRIO_INTERESTING);
+  b.add_time_avg (l_bluefs_compaction_lock_lat, "compact_lock_lat",
+      "Average lock duration while compacting bluefs log",
+      "c_lt",
+      PerfCountersBuilder::PRIO_INTERESTING);
+  b.add_u64_counter(l_bluefs_alloc_shared_dev_fallbacks, "alloc_slow_fallback",
+      "Amount of allocations that required fallback to "
+      " slow/shared device",
+      "asdf",
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64_counter(l_bluefs_alloc_shared_size_fallbacks, "alloc_slow_size_fallback",
+      "Amount of allocations that required fallback to shared device's "
+      "regular unit size",
+      "assf",
+      PerfCountersBuilder::PRIO_USEFUL);
+  b.add_u64(l_bluefs_read_zeros_candidate, "read_zeros_candidate",
+      "How many times bluefs read found page with all 0s");
+  b.add_u64(l_bluefs_read_zeros_errors, "read_zeros_errors",
+      "How many times bluefs read found transient page with all 0s");
+
+  logger = b.create_perf_counters();
+  cct->get_perfcounters_collection()->add(logger);
+}
+
+void BlueFS::_shutdown_logger()
+{
+  cct->get_perfcounters_collection()->remove(logger);
+  delete logger;
+}
+
+void BlueFS::_update_logger_stats()
+{
+  if (alloc[BDEV_WAL]) {
+    logger->set(l_bluefs_wal_total_bytes, _get_total(BDEV_WAL));
+    logger->set(l_bluefs_wal_used_bytes, _get_used(BDEV_WAL));
+  }
+  if (alloc[BDEV_DB]) {
+    logger->set(l_bluefs_db_total_bytes, _get_total(BDEV_DB));
+    logger->set(l_bluefs_db_used_bytes, _get_used(BDEV_DB));
+  }
+  if (alloc[BDEV_SLOW]) {
+    logger->set(l_bluefs_slow_total_bytes, _get_total(BDEV_SLOW));
+    logger->set(l_bluefs_slow_used_bytes, _get_used(BDEV_SLOW));
+  }
+}
+
+int BlueFS::add_block_device(unsigned id, const string& path, bool trim,
+    uint64_t reserved,
+    bluefs_shared_alloc_context_t* _shared_alloc)
+{
+  dout(10) << __func__ << " bdev " << id << " path " << path << " "
+      << reserved << dendl;
+  ceph_assert(id < bdev.size());
+  ceph_assert(bdev[id] == NULL);
+  BlockDevice *b = BlockDevice::create(cct, path, NULL, NULL,
+      discard_cb[id], static_cast<void*>(this));
+  block_reserved[id] = reserved;
+  if (_shared_alloc) {
+    b->set_no_exclusive_lock();
+  }
+  int r = b->open(path);
+  if (r < 0) {
+    delete b;
+    return r;
+  }
+  if (trim) {
+    interval_set<uint64_t> whole_device;
+    whole_device.insert(0, b->get_size());
+    b->try_discard(whole_device, false);
+  }
+
+  dout(1) << __func__ << " bdev " << id << " path " << path
+      << " size " << byte_u_t(b->get_size()) << dendl;
+  bdev[id] = b;
+  ioc[id] = new IOContext(cct, NULL);
+  if (_shared_alloc) {
+    ceph_assert(!shared_alloc);
+    shared_alloc = _shared_alloc;
+    alloc[id] = shared_alloc->a;
+    shared_alloc_id = id;
+  }
+  return 0;
+}
+
+bool BlueFS::bdev_support_label(unsigned id)
+{
+  ceph_assert(id < bdev.size());
+  ceph_assert(bdev[id]);
+  return bdev[id]->supported_bdev_label();
+}
+
+uint64_t BlueFS::get_block_device_size(unsigned id) const
+{
+  if (id < bdev.size() && bdev[id])
+    return bdev[id]->get_size();
+  return 0;
+}
+
+void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
+{
+  dout(10) << __func__ << " bdev " << id << dendl;
+  ceph_assert(alloc[id]);
+  alloc[id]->release(to_release);
+  if (is_shared_alloc(id)) {
+    shared_alloc->bluefs_used -= to_release.size();
+  }
+}
+
+uint64_t BlueFS::get_used()
+{
+  uint64_t used = 0;
+  for (unsigned id = 0; id < MAX_BDEV; ++id) {
+    used += _get_used(id);
+  }
+  return used;
+}
+
+uint64_t BlueFS::_get_used(unsigned id) const
+{
+  uint64_t used = 0;
+  if (!alloc[id])
+    return 0;
+
+  if (is_shared_alloc(id)) {
+    used = shared_alloc->bluefs_used;
+  } else {
+    used = _get_total(id) - alloc[id]->get_free();
+  }
+  return used;
+}
+
+uint64_t BlueFS::get_used(unsigned id)
+{
+  ceph_assert(id < alloc.size());
+  ceph_assert(alloc[id]);
+  return _get_used(id);
+}
+
+uint64_t BlueFS::_get_total(unsigned id) const
+{
+  ceph_assert(id < bdev.size());
+  ceph_assert(id < block_reserved.size());
+  return get_block_device_size(id) - block_reserved[id];
+}
+
+uint64_t BlueFS::get_total(unsigned id)
+{
+  return _get_total(id);
+}
+
+uint64_t BlueFS::get_free(unsigned id)
+{
+  ceph_assert(id < alloc.size());
+  return alloc[id]->get_free();
+}
+
+void BlueFS::dump_perf_counters(Formatter *f)
+{
+  f->open_object_section("bluefs_perf_counters");
+  logger->dump_formatted(f, false, false);
+  f->close_section();
+}
+
+void BlueFS::dump_block_extents(ostream& out)
+{
+  for (unsigned i = 0; i < MAX_BDEV; ++i) {
+    if (!bdev[i]) {
+      continue;
+    }
+    auto total = get_total(i);
+    auto free = get_free(i);
+
+    out << i << " : device size 0x" << std::hex << total
+        << " : using 0x" << total - free
+        << std::dec << "(" << byte_u_t(total - free) << ")";
+    out << "\n";
+  }
+}
+
+void BlueFS::foreach_block_extents(
+  unsigned id,
+  std::function<void(uint64_t, uint32_t)> fn)
+{
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " bdev " << id << dendl;
+  ceph_assert(id < alloc.size());
+  for (auto& p : nodes.file_map) {
+    for (auto& q : p.second->fnode.extents) {
+      if (q.bdev == id) {
+        fn(q.offset, q.length);
+      }
+    }
+  }
+}
+
+int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
+{
+  dout(1) << __func__
+      << " osd_uuid " << osd_uuid
+      << dendl;
+
+  // set volume selector if not provided before/outside
+  if (vselector == nullptr) {
+    vselector.reset(
+      new OriginalVolumeSelector(
+        get_block_device_size(BlueFS::BDEV_WAL) * 95 / 100,
+        get_block_device_size(BlueFS::BDEV_DB) * 95 / 100,
+        get_block_device_size(BlueFS::BDEV_SLOW) * 95 / 100));
+  }
+
+  _init_logger();
+  _init_alloc();
+
+  super.version = 0;
+  super.block_size = bdev[BDEV_DB]->get_block_size();
+  super.osd_uuid = osd_uuid;
+  super.uuid.generate_random();
+  dout(1) << __func__ << " uuid " << super.uuid << dendl;
+
+  // init log
+  FileRef log_file = ceph::make_ref<File>();
+  log_file->fnode.ino = 1;
+  log_file->vselector_hint = vselector->get_hint_for_log();
+  int r = _allocate(
+    vselector->select_prefer_bdev(log_file->vselector_hint),
+    cct->_conf->bluefs_max_log_runway,
+    0,
+    &log_file->fnode);
+  vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+  ceph_assert(r == 0);
+  log.writer = _create_writer(log_file);
+
+  // initial txn
+  ceph_assert(log.seq_live == 1);
+  log.t.seq = 1;
+  log.t.op_init();
+  _flush_and_sync_log_LD();
+
+  // write supers
+  super.log_fnode = log_file->fnode;
+  super.memorized_layout = layout;
+  _write_super(BDEV_DB);
+  _flush_bdev();
+
+  // clean up
+  super = bluefs_super_t();
+  _close_writer(log.writer);
+  log.writer = NULL;
+  vselector.reset(nullptr);
+  _stop_alloc();
+  _shutdown_logger();
+  if (shared_alloc) {
+    ceph_assert(shared_alloc->need_init);
+    shared_alloc->need_init = false;
+  }
+
+  dout(10) << __func__ << " success" << dendl;
+  return 0;
+}
+
+void BlueFS::_init_alloc()
+{
+  dout(20) << __func__ << dendl;
+
+  size_t wal_alloc_size = 0;
+  if (bdev[BDEV_WAL]) {
+    wal_alloc_size = cct->_conf->bluefs_alloc_size;
+    alloc_size[BDEV_WAL] = wal_alloc_size;
+  }
+  logger->set(l_bluefs_wal_alloc_unit, wal_alloc_size);
+
+
+  uint64_t shared_alloc_size = cct->_conf->bluefs_shared_alloc_size;
+  if (shared_alloc && shared_alloc->a) {
+    uint64_t unit = shared_alloc->a->get_block_size();
+    shared_alloc_size = std::max(
+      unit,
+      shared_alloc_size);
+    ceph_assert(0 == p2phase(shared_alloc_size, unit));
+  }
+  if (bdev[BDEV_SLOW]) {
+    alloc_size[BDEV_DB] = cct->_conf->bluefs_alloc_size;
+    alloc_size[BDEV_SLOW] = shared_alloc_size;
+  } else {
+    alloc_size[BDEV_DB] = shared_alloc_size;
+    alloc_size[BDEV_SLOW] = 0;
+  }
+  logger->set(l_bluefs_db_alloc_unit, alloc_size[BDEV_DB]);
+  logger->set(l_bluefs_main_alloc_unit, alloc_size[BDEV_SLOW]);
+  // new wal and db devices are never shared
+  if (bdev[BDEV_NEWWAL]) {
+    alloc_size[BDEV_NEWWAL] = cct->_conf->bluefs_alloc_size;
+  }
+  if (bdev[BDEV_NEWDB]) {
+    alloc_size[BDEV_NEWDB] = cct->_conf->bluefs_alloc_size;
+  }
+
+  for (unsigned id = 0; id < bdev.size(); ++id) {
+    if (!bdev[id]) {
+      continue;
+    }
+    ceph_assert(bdev[id]->get_size());
+    if (is_shared_alloc(id)) {
+      dout(1) << __func__ << " shared, id " << id << std::hex
+          << ", capacity 0x" << bdev[id]->get_size()
+          << ", block size 0x" << alloc_size[id]
+          << std::dec << dendl;
+    } else {
+      ceph_assert(alloc_size[id]);
+      std::string name = "bluefs-";
+      const char* devnames[] = { "wal","db","slow" };
+      if (id <= BDEV_SLOW)
+        name += devnames[id];
+      else
+        name += to_string(uintptr_t(this));
+      dout(1) << __func__ << " new, id " << id << std::hex
+          << ", allocator name " << name
+          << ", allocator type " << cct->_conf->bluefs_allocator
+          << ", capacity 0x" << bdev[id]->get_size()
+          << ", block size 0x" << alloc_size[id]
+          << std::dec << dendl;
+      alloc[id] = Allocator::create(cct, cct->_conf->bluefs_allocator,
+          bdev[id]->get_size(),
+          alloc_size[id],
+          0, 0,
+          name);
+      alloc[id]->init_add_free(
+        block_reserved[id],
+        _get_total(id));
+    }
+  }
+}
+
+void BlueFS::_stop_alloc()
+{
+  dout(20) << __func__ << dendl;
+  for (auto p : bdev) {
+    if (p)
+      p->discard_drain();
+  }
+
+  for (size_t i = 0; i < alloc.size(); ++i) {
+    if (alloc[i] && !is_shared_alloc(i)) {
+      alloc[i]->shutdown();
+      delete alloc[i];
+      alloc[i] = nullptr;
+    }
+  }
+}
+
+int BlueFS::_read_and_check(uint8_t ndev, uint64_t off, uint64_t len,
+    ceph::buffer::list *pbl, IOContext *ioc, bool buffered)
+{
+  dout(10) << __func__ << " dev " << int(ndev)
+      << ": 0x" << std::hex << off << "~" << len << std::dec
+      << (buffered ? " buffered" : "")
+      << dendl;
+  int r;
+  bufferlist bl;
+  r = _bdev_read(ndev, off, len, &bl, ioc, buffered);
+  if (r != 0) {
+    return r;
+  }
+  uint64_t block_size = bdev[ndev]->get_block_size();
+  if (inject_read_zeros) {
+    if (len >= block_size * 2) {
+      derr << __func__ << " injecting error, zeros at "
+          << int(ndev) << ": 0x" << std::hex << (off + len / 2)
+          << "~" << (block_size * 2) << std::dec << dendl;
+      //use beginning, replace 8K in the middle with zeros, use tail
+      bufferlist temp;
+      bl.splice(0, len / 2 - block_size, &temp);
+      temp.append(buffer::create(block_size * 2, 0));
+      bl.splice(block_size * 2, len / 2 - block_size, &temp);
+      bl = temp;
+      inject_read_zeros--;
+    }
+  }
+  //make a check if there is a block with all 0
+  uint64_t to_check_len = len;
+  uint64_t skip = p2nphase(off, block_size);
+  if (skip >= to_check_len) {
+    return r;
+  }
+  auto it = bl.begin(skip);
+  to_check_len -= skip;
+  bool all_zeros = false;
+  while (all_zeros == false && to_check_len >= block_size) {
+    // checking 0s step
+    unsigned block_left = block_size;
+    unsigned avail;
+    const char* data;
+    all_zeros = true;
+    while (all_zeros && block_left > 0) {
+      avail = it.get_ptr_and_advance(block_left, &data);
+      block_left -= avail;
+      all_zeros = mem_is_zero(data, avail);
+    }
+    // skipping step
+    while (block_left > 0) {
+      avail = it.get_ptr_and_advance(block_left, &data);
+      block_left -= avail;
+    }
+    to_check_len -= block_size;
+  }
+  if (all_zeros) {
+    logger->inc(l_bluefs_read_zeros_candidate, 1);
+    bufferlist bl_reread;
+    r = _bdev_read(ndev, off, len, &bl_reread, ioc, buffered);
+    if (r != 0) {
+      return r;
+    }
+    // check if both read gave the same
+    if (!bl.contents_equal(bl_reread)) {
+      // report problems to log, but continue, maybe it will be good now...
+      derr << __func__ << " initial read of " << int(ndev)
+          << ": 0x" << std::hex << off << "~" << len
+          << std::dec << ": different then re-read " << dendl;
+      logger->inc(l_bluefs_read_zeros_errors, 1);
+    }
+    // use second read will be better if is different
+    pbl->append(bl_reread);
+  } else {
+    pbl->append(bl);
+  }
+  return r;
+}
+
+int BlueFS::_read_random_and_check(
+  uint8_t ndev, uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+  dout(10) << __func__ << " dev " << int(ndev)
+      << ": 0x" << std::hex << off << "~" << len << std::dec
+      << (buffered ? " buffered" : "")
+      << dendl;
+  int r;
+  r = _bdev_read_random(ndev, off, len, buf, buffered);
+  if (r != 0) {
+    return r;
+  }
+  uint64_t block_size = bdev[ndev]->get_block_size();
+  if (inject_read_zeros) {
+    if (len >= block_size * 2) {
+      derr << __func__ << " injecting error, zeros at "
+          << int(ndev) << ": 0x" << std::hex << (off + len / 2)
+          << "~" << (block_size * 2) << std::dec << dendl;
+      //zero middle 8K
+      memset(buf + len / 2 - block_size, 0, block_size * 2);
+      inject_read_zeros--;
+    }
+  }
+  //make a check if there is a block with all 0
+  uint64_t to_check_len = len;
+  const char* data = buf;
+  uint64_t skip = p2nphase(off, block_size);
+  if (skip >= to_check_len) {
+    return r;
+  }
+  to_check_len -= skip;
+  data += skip;
+
+  bool all_zeros = false;
+  while (all_zeros == false && to_check_len >= block_size) {
+    if (mem_is_zero(data, block_size)) {
+      // at least one block is all zeros
+      all_zeros = true;
+      break;
+    }
+    data += block_size;
+    to_check_len -= block_size;
+  }
+  if (all_zeros) {
+    logger->inc(l_bluefs_read_zeros_candidate, 1);
+    std::unique_ptr<char[]> data_reread(new char[len]);
+    r = _bdev_read_random(ndev, off, len, &data_reread[0], buffered);
+    if (r != 0) {
+      return r;
+    }
+    // check if both read gave the same
+    if (memcmp(buf, &data_reread[0], len) != 0) {
+      derr << __func__ << " initial read of " << int(ndev)
+          << ": 0x" << std::hex << off << "~" << len
+          << std::dec << ": different then re-read " << dendl;
+      logger->inc(l_bluefs_read_zeros_errors, 1);
+      // second read is probably better
+      memcpy(buf, &data_reread[0], len);
+    }
+  }
+  return r;
+}
+
+int BlueFS::_bdev_read(uint8_t ndev, uint64_t off, uint64_t len,
+    ceph::buffer::list* pbl, IOContext* ioc, bool buffered)
+{
+  int cnt = 0;
+  switch (ndev) {
+    case BDEV_WAL: cnt = l_bluefs_read_disk_bytes_wal; break;
+    case BDEV_DB: cnt = l_bluefs_read_disk_bytes_db; break;
+    case BDEV_SLOW: cnt = l_bluefs_read_disk_bytes_slow; break;
+
+  }
+  if (cnt) {
+    logger->inc(cnt, len);
+  }
+  return bdev[ndev]->read(off, len, pbl, ioc, buffered);
+}
+
+int BlueFS::_bdev_read_random(uint8_t ndev, uint64_t off, uint64_t len,
+    char* buf, bool buffered)
+{
+  int cnt = 0;
+  switch (ndev) {
+    case BDEV_WAL: cnt = l_bluefs_read_random_disk_bytes_wal; break;
+    case BDEV_DB: cnt = l_bluefs_read_random_disk_bytes_db; break;
+    case BDEV_SLOW: cnt = l_bluefs_read_random_disk_bytes_slow; break;
+  }
+  if (cnt) {
+    logger->inc(cnt, len);
+  }
+  return bdev[ndev]->read_random(off, len, buf, buffered);
+}
+
+int BlueFS::mount()
+{
+  dout(1) << __func__ << dendl;
+
+  _init_logger();
+  int r = _open_super();
+  if (r < 0) {
+    derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
+    goto out;
+  }
+
+  // set volume selector if not provided before/outside
+  if (vselector == nullptr) {
+    vselector.reset(
+      new OriginalVolumeSelector(
+        get_block_device_size(BlueFS::BDEV_WAL) * 95 / 100,
+        get_block_device_size(BlueFS::BDEV_DB) * 95 / 100,
+        get_block_device_size(BlueFS::BDEV_SLOW) * 95 / 100));
+  }
+
+  _init_alloc();
+
+  r = _replay(false, false);
+  if (r < 0) {
+    derr << __func__ << " failed to replay log: " << cpp_strerror(r) << dendl;
+    _stop_alloc();
+    goto out;
+  }
+
+  // init freelist
+  for (auto& p : nodes.file_map) {
+    dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
+    for (auto& q : p.second->fnode.extents) {
+      bool is_shared = is_shared_alloc(q.bdev);
+      ceph_assert(!is_shared || (is_shared && shared_alloc));
+      if (is_shared && shared_alloc->need_init && shared_alloc->a) {
+        shared_alloc->bluefs_used += q.length;
+        alloc[q.bdev]->init_rm_free(q.offset, q.length);
+      } else if (!is_shared) {
+        alloc[q.bdev]->init_rm_free(q.offset, q.length);
+      }
+    }
+  }
+  if (shared_alloc) {
+    shared_alloc->need_init = false;
+    dout(1) << __func__ << " shared_bdev_used = "
+        << shared_alloc->bluefs_used << dendl;
+  } else {
+    dout(1) << __func__ << " shared bdev not used"
+        << dendl;
+  }
+
+  // set up the log for future writes
+  log.writer = _create_writer(_get_file(1));
+  ceph_assert(log.writer->file->fnode.ino == 1);
+  log.writer->pos = log.writer->file->fnode.size;
+  log.writer->file->fnode.reset_delta();
+  dout(10) << __func__ << " log write pos set to 0x"
+      << std::hex << log.writer->pos << std::dec
+      << dendl;
+  // update log size
+  logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size);
+  return 0;
+
+ out:
+  super = bluefs_super_t();
+  return r;
+}
+
+int BlueFS::maybe_verify_layout(const bluefs_layout_t& layout) const
+{
+  if (super.memorized_layout) {
+    if (layout == *super.memorized_layout) {
+      dout(10) << __func__ << " bluefs layout verified positively" << dendl;
+    } else {
+      derr << __func__ << " memorized layout doesn't fit current one" << dendl;
+      return -EIO;
+    }
+  } else {
+    dout(10) << __func__ << " no memorized_layout in bluefs superblock"
+        << dendl;
+  }
+
+  return 0;
+}
+
+void BlueFS::umount(bool avoid_compact)
+{
+  dout(1) << __func__ << dendl;
+
+  sync_metadata(avoid_compact);
+  if (cct->_conf->bluefs_check_volume_selector_on_umount) {
+    _check_vselector_LNF();
+  }
+  _close_writer(log.writer);
+  log.writer = NULL;
+  log.t.clear();
+
+  vselector.reset(nullptr);
+  _stop_alloc();
+  nodes.file_map.clear();
+  nodes.dir_map.clear();
+  super = bluefs_super_t();
+  _shutdown_logger();
+}
+
+int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
+{
+  dout(1) << __func__ << dendl;
+
+  if(id == BDEV_NEWDB) {
+    int new_log_dev_cur = BDEV_WAL;
+    int new_log_dev_next = BDEV_WAL;
+    if (!bdev[BDEV_WAL]) {
+      new_log_dev_cur = BDEV_NEWDB;
+      new_log_dev_next = BDEV_DB;
+    }
+    _rewrite_log_and_layout_sync_LNF_LD(false,
+        BDEV_NEWDB,
+        new_log_dev_cur,
+        new_log_dev_next,
+        RENAME_DB2SLOW,
+        layout);
+  } else if(id == BDEV_NEWWAL) {
+    _rewrite_log_and_layout_sync_LNF_LD(false,
+        BDEV_DB,
+        BDEV_NEWWAL,
+        BDEV_WAL,
+        REMOVE_WAL,
+        layout);
+  } else {
+    assert(false);
+  }
+  return 0;
+}
+
+void BlueFS::collect_metadata(map<string,string> *pm, unsigned skip_bdev_id)
+{
+  if (skip_bdev_id != BDEV_DB && bdev[BDEV_DB])
+    bdev[BDEV_DB]->collect_metadata("bluefs_db_", pm);
+  if (bdev[BDEV_WAL])
+    bdev[BDEV_WAL]->collect_metadata("bluefs_wal_", pm);
+}
+
+void BlueFS::get_devices(set<string> *ls)
+{
+  for (unsigned i = 0; i < MAX_BDEV; ++i) {
+    if (bdev[i]) {
+      bdev[i]->get_devices(ls);
+    }
+  }
+}
+
+int BlueFS::fsck()
+{
+  dout(1) << __func__ << dendl;
+  // hrm, i think we check everything on mount...
+  return 0;
+}
+
+int BlueFS::_write_super(int dev)
+{
+  ++super.version;
+  // build superblock
+  bufferlist bl;
+  encode(super, bl);
+  uint32_t crc = bl.crc32c(-1);
+  encode(crc, bl);
+  dout(10) << __func__ << " super block length(encoded): " << bl.length() << dendl;
+  dout(10) << __func__ << " superblock " << super.version << dendl;
+  dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+  ceph_assert_always(bl.length() <= get_super_length());
+  bl.append_zero(get_super_length() - bl.length());
+
+  bdev[dev]->write(get_super_offset(), bl, false, WRITE_LIFE_SHORT);
+  dout(20) << __func__ << " v " << super.version
+      << " crc 0x" << std::hex << crc
+      << " offset 0x" << get_super_offset() << std::dec
+      << dendl;
+  return 0;
+}
+
+int BlueFS::_open_super()
+{
+  dout(10) << __func__ << dendl;
+
+  bufferlist bl;
+  uint32_t expected_crc, crc;
+  int r;
+
+  // always the second block
+  r = _bdev_read(BDEV_DB, get_super_offset(), get_super_length(),
+      &bl, ioc[BDEV_DB], false);
+  if (r < 0)
+    return r;
+
+  auto p = bl.cbegin();
+  decode(super, p);
+  {
+    bufferlist t;
+    t.substr_of(bl, 0, p.get_off());
+    crc = t.crc32c(-1);
+  }
+  decode(expected_crc, p);
+  if (crc != expected_crc) {
+    derr << __func__ << " bad crc on superblock, expected 0x"
+        << std::hex << expected_crc << " != actual 0x" << crc << std::dec
+        << dendl;
+    return -EIO;
+  }
+  dout(10) << __func__ << " superblock " << super.version << dendl;
+  dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+  return 0;
+}
+
+int BlueFS::_check_allocations(const bluefs_fnode_t& fnode,
+    boost::dynamic_bitset<uint64_t>* used_blocks,
+    bool is_alloc, //true when allocating, false when deallocating
+    const char* op_name)
+{
+  auto& fnode_extents = fnode.extents;
+  for (auto e : fnode_extents) {
+    auto id = e.bdev;
+    bool fail = false;
+    ceph_assert(id < MAX_BDEV);
+    ceph_assert(bdev[id]);
+    // let's use minimal allocation unit we can have
+    auto alloc_unit = bdev[id]->get_block_size();
+
+    if (int r = _verify_alloc_granularity(id, e.offset, e.length,
+        alloc_unit,
+        op_name); r < 0) {
+      return r;
+    }
+
+    apply_for_bitset_range(e.offset, e.length, alloc_unit, used_blocks[id],
+      [&](uint64_t pos, boost::dynamic_bitset<uint64_t> &bs) {
+        if (is_alloc == bs.test(pos)) {
+          fail = true;
+        } else {
+          bs.flip(pos);
+        }
+      }
+    );
+    if (fail) {
+      derr << __func__ << " " << op_name << " invalid extent " << int(e.bdev)
+          << ": 0x" << std::hex << e.offset << "~" << e.length << std::dec
+          << (is_alloc == true ?
+              ": duplicate reference, ino " : ": double free, ino ")
+          << fnode.ino << dendl;
+      return -EFAULT;
+    }
+  }
+  return 0;
+}
+
+int BlueFS::_verify_alloc_granularity(
+  __u8 id, uint64_t offset, uint64_t length, uint64_t alloc_unit, const char *op)
+{
+  if ((offset & (alloc_unit - 1)) ||
+      (length & (alloc_unit - 1))) {
+    derr << __func__ << " " << op << " of " << (int)id
+        << ":0x" << std::hex << offset << "~" << length << std::dec
+        << " does not align to alloc_size 0x"
+        << std::hex << alloc_unit << std::dec << dendl;
+    return -EFAULT;
+  }
+  return 0;
+}
+
+int BlueFS::_replay(bool noop, bool to_stdout)
+{
+  dout(10) << __func__ << (noop ? " NO-OP" : "") << dendl;
+  ino_last = 1;  // by the log
+  uint64_t log_seq = 0;
+
+  FileRef log_file;
+  log_file = _get_file(1);
+
+  log_file->fnode = super.log_fnode;
+  if (!noop) {
+    log_file->vselector_hint =
+      vselector->get_hint_for_log();
+  }
+  dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+  if (unlikely(to_stdout)) {
+    std::cout << " log_fnode " << super.log_fnode << std::endl;
+  }
+
+  FileReader *log_reader = new FileReader(
+    log_file, cct->_conf->bluefs_max_prefetch,
+    false,  // !random
+    true);  // ignore eof
+
+  bool seen_recs = false;
+
+  boost::dynamic_bitset<uint64_t> used_blocks[MAX_BDEV];
+
+  if (!noop) {
+    if (cct->_conf->bluefs_log_replay_check_allocations) {
+      for (size_t i = 0; i < MAX_BDEV; ++i) {
+        if (bdev[i] != nullptr) {
+          // let's use minimal allocation unit we can have
+          auto au = bdev[i]->get_block_size();
+          //hmm... on 32TB/4K drive this would take 1GB RAM!!!
+          used_blocks[i].resize(round_up_to(bdev[i]->get_size(), au) / au);
+        }
+      }
+      // check initial log layout
+      int r = _check_allocations(log_file->fnode,
+          used_blocks, true, "Log from super");
+      if (r < 0) {
+        return r;
+      }
+    }
+  }
+
+  while (true) {
+    ceph_assert((log_reader->buf.pos & ~super.block_mask()) == 0);
+    uint64_t pos = log_reader->buf.pos;
+    uint64_t read_pos = pos;
+    bufferlist bl;
+    {
+      int r = _read(log_reader, read_pos, super.block_size,
+          &bl, NULL);
+      if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) {
+        r += _do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
+      }
+      assert(r == (int)super.block_size);
+      read_pos += r;
+    }
+    uint64_t more = 0;
+    uint64_t seq;
+    uuid_d uuid;
+    {
+      auto p = bl.cbegin();
+      __u8 a, b;
+      uint32_t len;
+      decode(a, p);
+      decode(b, p);
+      decode(len, p);
+      decode(uuid, p);
+      decode(seq, p);
+      if (len + 6 > bl.length()) {
+        more = round_up_to(len + 6 - bl.length(), super.block_size);
+      }
+    }
+    if (uuid != super.uuid) {
+      if (seen_recs) {
+        dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: uuid " << uuid << " != super.uuid " << super.uuid
+            << dendl;
+      } else {
+        derr << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: uuid " << uuid << " != super.uuid " << super.uuid
+            << ", block dump: \n";
+        bufferlist t;
+        t.substr_of(bl, 0, super.block_size);
+        t.hexdump(*_dout);
+        *_dout << dendl;
+      }
+      break;
+    }
+    if (seq != log_seq + 1) {
+      if (seen_recs) {
+        dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: seq " << seq << " != expected " << log_seq + 1
+            << dendl;;
+      } else {
+        derr << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: seq " << seq << " != expected " << log_seq + 1
+            << dendl;;
+      }
+      break;
+    }
+    if (more) {
+      dout(20) << __func__ << " need 0x" << std::hex << more << std::dec
+          << " more bytes" << dendl;
+      bufferlist t;
+      int r = _read(log_reader, read_pos, more, &t, NULL);
+      if (r < (int)more) {
+        dout(10) << __func__ << " 0x" << std::hex << pos
+            << ": stop: len is 0x" << bl.length() + more << std::dec
+            << ", which is past eof" << dendl;
+        if (cct->_conf->bluefs_replay_recovery) {
+          //try to search for more data
+          r += _do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
+          if (r < (int)more) {
+            //in normal mode we must read r==more, for recovery it is too strict
+            break;
+          }
+        }
+      }
+      ceph_assert(r == (int)more);
+      bl.claim_append(t);
+      read_pos += r;
+    }
+    bluefs_transaction_t t;
+    try {
+      auto p = bl.cbegin();
+      decode(t, p);
+      seen_recs = true;
+    }
+    catch (ceph::buffer::error& e) {
+      // Multi-block transactions might be incomplete due to unexpected
+      // power off. Hence let's treat that as a regular stop condition.
+      if (seen_recs && more) {
+        dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: failed to decode: " << e.what()
+            << dendl;
+      } else {
+        derr << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: failed to decode: " << e.what()
+            << dendl;
+        delete log_reader;
+        return -EIO;
+      }
+      break;
+    }
+    ceph_assert(seq == t.seq);
+    dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+        << ": " << t << dendl;
+    if (unlikely(to_stdout)) {
+      std::cout << " 0x" << std::hex << pos << std::dec
+                << ": " << t << std::endl;
+    }
+
+    auto p = t.op_bl.cbegin();
+    auto pos0 = pos;
+    while (!p.end()) {
+      pos = pos0 + p.get_off();
+      __u8 op;
+      decode(op, p);
+      switch (op) {
+
+      case bluefs_transaction_t::OP_INIT:
+        dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": op_init" << dendl;
+        if (unlikely(to_stdout)) {
+          std::cout << " 0x" << std::hex << pos << std::dec
+                    << ": op_init" << std::endl;
+        }
+
+        ceph_assert(t.seq == 1);
+        break;
+
+      case bluefs_transaction_t::OP_JUMP:
+        {
+          uint64_t next_seq;
+          uint64_t offset;
+          decode(next_seq, p);
+          decode(offset, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_jump seq " << next_seq
+              << " offset 0x" << std::hex << offset << std::dec << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_jump seq " << next_seq
+                      << " offset 0x" << std::hex << offset << std::dec
+                      << std::endl;
+          }
+
+          ceph_assert(next_seq > log_seq);
+          log_seq = next_seq - 1; // we will increment it below
+          uint64_t skip = offset - read_pos;
+          if (skip) {
+            bufferlist junk;
+            int r = _read(log_reader, read_pos, skip, &junk,
+                NULL);
+            if (r != (int)skip) {
+              dout(10) << __func__ << " 0x" << std::hex << read_pos
+                  << ": stop: failed to skip to " << offset
+                  << std::dec << dendl;
+              ceph_abort_msg("problem with op_jump");
+            }
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_JUMP_SEQ:
+        {
+          uint64_t next_seq;
+          decode(next_seq, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_jump_seq " << next_seq << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_jump_seq " << next_seq << std::endl;
+          }
+
+          ceph_assert(next_seq > log_seq);
+          log_seq = next_seq - 1; // we will increment it below
+        }
+        break;
+
+      case bluefs_transaction_t::OP_ALLOC_ADD:
+        // LEGACY, do nothing but read params
+        {
+          __u8 id;
+          uint64_t offset, length;
+          decode(id, p);
+          decode(offset, p);
+          decode(length, p);
+        }
+        break;
+
+      case bluefs_transaction_t::OP_ALLOC_RM:
+        // LEGACY, do nothing but read params
+        {
+          __u8 id;
+          uint64_t offset, length;
+          decode(id, p);
+          decode(offset, p);
+          decode(length, p);
+        }
+        break;
+
+      case bluefs_transaction_t::OP_DIR_LINK:
+        {
+          string dirname, filename;
+          uint64_t ino;
+          decode(dirname, p);
+          decode(filename, p);
+          decode(ino, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_dir_link " << " " << dirname << "/" << filename
+              << " to " << ino
+              << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_dir_link " << " " << dirname << "/" << filename
+                      << " to " << ino
+                      << std::endl;
+          }
+
+          if (!noop) {
+            FileRef file = _get_file(ino);
+            ceph_assert(file->fnode.ino);
+            map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
+            ceph_assert(q != nodes.dir_map.end());
+            map<string,FileRef>::iterator r = q->second->file_map.find(filename);
+            ceph_assert(r == q->second->file_map.end());
+
+            vselector->sub_usage(file->vselector_hint, file->fnode);
+            file->vselector_hint =
+              vselector->get_hint_by_dir(dirname);
+            vselector->add_usage(file->vselector_hint, file->fnode);
+
+            q->second->file_map[filename] = file;
+            ++file->refs;
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_DIR_UNLINK:
+        {
+          string dirname, filename;
+          decode(dirname, p);
+          decode(filename, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_dir_unlink " << " " << dirname << "/" << filename
+              << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_dir_unlink " << " " << dirname << "/" << filename
+                      << std::endl;
+          }
+
+          if (!noop) {
+            map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
+            ceph_assert(q != nodes.dir_map.end());
+            map<string,FileRef>::iterator r = q->second->file_map.find(filename);
+            ceph_assert(r != q->second->file_map.end());
+            ceph_assert(r->second->refs > 0);
+            --r->second->refs;
+            q->second->file_map.erase(r);
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_DIR_CREATE:
+        {
+          string dirname;
+          decode(dirname, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_dir_create " << dirname << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_dir_create " << dirname << std::endl;
+          }
+
+          if (!noop) {
+            map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
+            ceph_assert(q == nodes.dir_map.end());
+            nodes.dir_map[dirname] = ceph::make_ref<Dir>();
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_DIR_REMOVE:
+        {
+          string dirname;
+          decode(dirname, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_dir_remove " << dirname << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_dir_remove " << dirname << std::endl;
+          }
+
+          if (!noop) {
+            map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
+            ceph_assert(q != nodes.dir_map.end());
+            ceph_assert(q->second->file_map.empty());
+            nodes.dir_map.erase(q);
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_FILE_UPDATE:
+        {
+          bluefs_fnode_t fnode;
+          decode(fnode, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_file_update " << " " << fnode << " " << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_file_update " << " " << fnode << std::endl;
+          }
+          if (!noop) {
+            FileRef f = _get_file(fnode.ino);
+            if (cct->_conf->bluefs_log_replay_check_allocations) {
+              int r = _check_allocations(f->fnode,
+                  used_blocks, false, "OP_FILE_UPDATE");
+              if (r < 0) {
+                return r;
+              }
+            }
+            if (fnode.ino != 1) {
+              vselector->sub_usage(f->vselector_hint, f->fnode);
+            }
+            f->fnode = fnode;
+            if (fnode.ino != 1) {
+              vselector->add_usage(f->vselector_hint, f->fnode);
+            }
+
+            if (fnode.ino > ino_last) {
+              ino_last = fnode.ino;
+            }
+            if (cct->_conf->bluefs_log_replay_check_allocations) {
+              int r = _check_allocations(f->fnode,
+                  used_blocks, true, "OP_FILE_UPDATE");
+              if (r < 0) {
+                return r;
+              }
+            }
+          } else if (noop && fnode.ino == 1) {
+            FileRef f = _get_file(fnode.ino);
+            f->fnode = fnode;
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_FILE_UPDATE_INC:
+        {
+          bluefs_fnode_delta_t delta;
+          decode(delta, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_file_update_inc " << " " << delta << " " << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_file_update_inc " << " " << delta << std::endl;
+          }
+          if (!noop) {
+            FileRef f = _get_file(delta.ino);
+            bluefs_fnode_t& fnode = f->fnode;
+            if (delta.offset != fnode.allocated) {
+              derr << __func__ << " invalid op_file_update_inc, new extents miss end of file"
+                  << " fnode=" << fnode
+                  << " delta=" << delta
+                  << dendl;
+              ceph_assert(delta.offset == fnode.allocated);
+            }
+            if (cct->_conf->bluefs_log_replay_check_allocations) {
+              int r = _check_allocations(fnode,
+                  used_blocks, false, "OP_FILE_UPDATE_INC");
+              if (r < 0) {
+                return r;
+              }
+            }
+
+            fnode.ino = delta.ino;
+            fnode.mtime = delta.mtime;
+            if (fnode.ino != 1) {
+              vselector->sub_usage(f->vselector_hint, fnode);
+            }
+            fnode.size = delta.size;
+            fnode.claim_extents(delta.extents);
+            dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+                << ": op_file_update_inc produced " << " " << fnode << " " << dendl;
+
+            if (fnode.ino != 1) {
+              vselector->add_usage(f->vselector_hint, fnode);
+            }
+
+            if (fnode.ino > ino_last) {
+              ino_last = fnode.ino;
+            }
+            if (cct->_conf->bluefs_log_replay_check_allocations) {
+              int r = _check_allocations(f->fnode,
+                  used_blocks, true, "OP_FILE_UPDATE_INC");
+              if (r < 0) {
+                return r;
+              }
+            }
+          } else if (noop && delta.ino == 1) {
+            // we need to track bluefs log, even in noop mode
+            FileRef f = _get_file(1);
+            bluefs_fnode_t& fnode = f->fnode;
+            fnode.ino = delta.ino;
+            fnode.mtime = delta.mtime;
+            fnode.size = delta.size;
+            fnode.claim_extents(delta.extents);
+          }
+        }
+        break;
+
+      case bluefs_transaction_t::OP_FILE_REMOVE:
+        {
+          uint64_t ino;
+          decode(ino, p);
+          dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+              << ": op_file_remove " << ino << dendl;
+          if (unlikely(to_stdout)) {
+            std::cout << " 0x" << std::hex << pos << std::dec
+                      << ": op_file_remove " << ino << std::endl;
+          }
+
+          if (!noop) {
+            auto p = nodes.file_map.find(ino);
+            ceph_assert(p != nodes.file_map.end());
+            vselector->sub_usage(p->second->vselector_hint, p->second->fnode);
+            if (cct->_conf->bluefs_log_replay_check_allocations) {
+              int r = _check_allocations(p->second->fnode,
+                  used_blocks, false, "OP_FILE_REMOVE");
+              if (r < 0) {
+                return r;
+              }
+            }
+            nodes.file_map.erase(p);
+          }
+        }
+        break;
+
+      default:
+        derr << __func__ << " 0x" << std::hex << pos << std::dec
+            << ": stop: unrecognized op " << (int)op << dendl;
+        delete log_reader;
+        return -EIO;
+      }
+    }
+    ceph_assert(p.end());
+
+    // we successfully replayed the transaction; bump the seq and log size
+    ++log_seq;
+    log_file->fnode.size = log_reader->buf.pos;
+  }
+  if (!noop) {
+    vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+    log.seq_live = log_seq + 1;
+    dirty.seq_live = log_seq + 1;
+    log.t.seq = log.seq_live;
+    dirty.seq_stable = log_seq;
+  }
+
+  dout(10) << __func__ << " log file size was 0x"
+      << std::hex << log_file->fnode.size << std::dec << dendl;
+  if (unlikely(to_stdout)) {
+    std::cout << " log file size was 0x"
+              << std::hex << log_file->fnode.size << std::dec << std::endl;
+  }
+
+  delete log_reader;
+
+  if (!noop) {
+    // verify file link counts are all >0
+    for (auto& p : nodes.file_map) {
+      if (p.second->refs == 0 &&
+          p.second->fnode.ino > 1) {
+        derr << __func__ << " file with link count 0: " << p.second->fnode
+            << dendl;
+        return -EIO;
+      }
+    }
+  }
+  // reflect file count in logger
+  logger->set(l_bluefs_num_files, nodes.file_map.size());
+
+  dout(10) << __func__ << " done" << dendl;
+  return 0;
+}
+
+int BlueFS::log_dump()
+{
+  // only dump log file's content
+  ceph_assert(log.writer == nullptr && "cannot log_dump on mounted BlueFS");
+  _init_logger();
+  int r = _open_super();
+  if (r < 0) {
+    derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  r = _replay(true, true);
+  if (r < 0) {
+    derr << __func__ << " failed to replay log: " << cpp_strerror(r) << dendl;
+  }
+  _shutdown_logger();
+  super = bluefs_super_t();
+  return r;
+}
+
+int BlueFS::device_migrate_to_existing(
+  CephContext *cct,
+  const set<int>& devs_source,
+  int dev_target,
+  const bluefs_layout_t& layout)
+{
+  vector<byte> buf;
+  bool buffered = cct->_conf->bluefs_buffered_io;
+
+  dout(10) << __func__ << " devs_source " << devs_source
+      << " dev_target " << dev_target << dendl;
+  assert(dev_target < (int)MAX_BDEV);
+
+  int flags = 0;
+  flags |= devs_source.count(BDEV_DB) ?
+    (REMOVE_DB | RENAME_SLOW2DB) : 0;
+  flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
+  int dev_target_new = dev_target;
+
+  // Slow device without separate DB one is addressed via BDEV_DB
+  // Hence need renaming.
+  if ((flags & REMOVE_DB) && dev_target == BDEV_SLOW) {
+    dev_target_new = BDEV_DB;
+    dout(0) << __func__ << " super to be written to " << dev_target << dendl;
+  }
+
+  for (auto& [ino, file_ref] : nodes.file_map) {
+    //do not copy log
+    if (ino == 1) {
+      continue;
+    }
+    dout(10) << __func__ << " " << ino << " " << file_ref->fnode << dendl;
+
+    vselector->sub_usage(file_ref->vselector_hint, file_ref->fnode);
+
+    bool rewrite = std::any_of(
+      file_ref->fnode.extents.begin(),
+      file_ref->fnode.extents.end(),
+      [=](auto& ext) {
+        return ext.bdev != dev_target && devs_source.count(ext.bdev);
+      });
+    if (rewrite) {
+      dout(10) << __func__ << " migrating" << dendl;
+      bluefs_fnode_t old_fnode;
+      old_fnode.swap_extents(file_ref->fnode);
+      auto& old_fnode_extents = old_fnode.extents;
+      // read entire file
+      bufferlist bl;
+      for (const auto &old_ext : old_fnode_extents) {
+        buf.resize(old_ext.length);
+        int r = _bdev_read_random(old_ext.bdev,
+            old_ext.offset,
+            old_ext.length,
+            (char*)&buf.at(0),
+            buffered);
+        if (r != 0) {
+          derr << __func__ << " failed to read 0x" << std::hex
+              << old_ext.offset << "~" << old_ext.length << std::dec
+              << " from " << (int)dev_target << dendl;
+          return -EIO;
+        }
+        bl.append((char*)&buf[0], old_ext.length);
+      }
+
+      // write entire file
+      auto l = _allocate(dev_target, bl.length(), 0,
+          &file_ref->fnode, 0, false);
+      if (l < 0) {
+        derr << __func__ << " unable to allocate len 0x" << std::hex
+            << bl.length() << std::dec << " from " << (int)dev_target
+            << ": " << cpp_strerror(l) << dendl;
+        return -ENOSPC;
+      }
+
+      uint64_t off = 0;
+      for (auto& i : file_ref->fnode.extents) {
+        bufferlist cur;
+        uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
+        ceph_assert(cur_len > 0);
+        cur.substr_of(bl, off, cur_len);
+        int r = bdev[dev_target]->write(i.offset, cur, buffered);
+        ceph_assert(r == 0);
+        off += cur_len;
+      }
+
+      // release old extents
+      for (const auto &old_ext : old_fnode_extents) {
+        PExtentVector to_release;
+        to_release.emplace_back(old_ext.offset, old_ext.length);
+        alloc[old_ext.bdev]->release(to_release);
+        if (is_shared_alloc(old_ext.bdev)) {
+          shared_alloc->bluefs_used -= to_release.size();
+        }
+      }
+
+      // update fnode
+      for (auto& i : file_ref->fnode.extents) {
+        i.bdev = dev_target_new;
+      }
+    } else {
+      for (auto& ext : file_ref->fnode.extents) {
+        if (dev_target != dev_target_new && ext.bdev == dev_target) {
+          dout(20) << __func__ << " " << " ... adjusting extent 0x"
+              << std::hex << ext.offset << std::dec
+              << " bdev " << dev_target << " -> " << dev_target_new
+              << dendl;
+          ext.bdev = dev_target_new;
+        }
+      }
+    }
+    vselector->add_usage(file_ref->vselector_hint, file_ref->fnode);
+  }
+  // new logging device in the current naming scheme
+  int new_log_dev_cur = bdev[BDEV_WAL] ?
+    BDEV_WAL :
+    bdev[BDEV_DB] ? BDEV_DB : BDEV_SLOW;
+
+  // new logging device in new naming scheme
+  int new_log_dev_next = new_log_dev_cur;
+
+  if (devs_source.count(new_log_dev_cur)) {
+    // SLOW device is addressed via BDEV_DB too hence either WAL or DB
+    new_log_dev_next = (flags & REMOVE_WAL) || !bdev[BDEV_WAL] ?
+      BDEV_DB :
+      BDEV_WAL;
+
+    dout(0) << __func__ << " log moved from " << new_log_dev_cur
+        << " to " << new_log_dev_next << dendl;
+
+    new_log_dev_cur =
+      (flags & REMOVE_DB) && new_log_dev_next == BDEV_DB ?
+        BDEV_SLOW :
+        new_log_dev_next;
+  }
+
+  _rewrite_log_and_layout_sync_LNF_LD(
+    false,
+    (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
+    new_log_dev_cur,
+    new_log_dev_next,
+    flags,
+    layout);
+  return 0;
+}
+
+int BlueFS::device_migrate_to_new(
+  CephContext *cct,
+  const set<int>& devs_source,
+  int dev_target,
+  const bluefs_layout_t& layout)
+{
+  vector<byte> buf;
+  bool buffered = cct->_conf->bluefs_buffered_io;
+
+  dout(10) << __func__ << " devs_source " << devs_source
+      << " dev_target " << dev_target << dendl;
+  assert(dev_target == (int)BDEV_NEWDB || dev_target == (int)BDEV_NEWWAL);
+
+  int flags = 0;
+
+  flags |= devs_source.count(BDEV_DB) ?
+    (!bdev[BDEV_SLOW] ? RENAME_DB2SLOW: REMOVE_DB) :
+    0;
+  flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
+  int dev_target_new = dev_target; //FIXME: remove, makes no sense
+
+  for (auto& [ino, file_ref] : nodes.file_map) {
+    //do not copy log
+    if (ino == 1) {
+      continue;
+    }
+    dout(10) << __func__ << " " << ino << " " << file_ref->fnode << dendl;
+
+    vselector->sub_usage(file_ref->vselector_hint, file_ref->fnode);
+
+    bool rewrite = std::any_of(
+      file_ref->fnode.extents.begin(),
+      file_ref->fnode.extents.end(),
+      [=](auto& ext) {
+        return ext.bdev != dev_target && devs_source.count(ext.bdev);
+      });
+    if (rewrite) {
+      dout(10) << __func__ << " migrating" << dendl;
+      bluefs_fnode_t old_fnode;
+      old_fnode.swap_extents(file_ref->fnode);
+      auto& old_fnode_extents = old_fnode.extents;
+      // read entire file
+      bufferlist bl;
+      for (const auto &old_ext : old_fnode_extents) {
+        buf.resize(old_ext.length);
+        int r = _bdev_read_random(old_ext.bdev,
+            old_ext.offset,
+            old_ext.length,
+            (char*)&buf.at(0),
+            buffered);
+        if (r != 0) {
+          derr << __func__ << " failed to read 0x" << std::hex
+              << old_ext.offset << "~" << old_ext.length << std::dec
+              << " from " << (int)dev_target << dendl;
+          return -EIO;
+        }
+        bl.append((char*)&buf[0], old_ext.length);
+      }
+
+      // write entire file
+      auto l = _allocate(dev_target, bl.length(), 0,
+          &file_ref->fnode, 0, false);
+      if (l < 0) {
+        derr << __func__ << " unable to allocate len 0x" << std::hex
+            << bl.length() << std::dec << " from " << (int)dev_target
+            << ": " << cpp_strerror(l) << dendl;
+        return -ENOSPC;
+      }
+
+      uint64_t off = 0;
+      for (auto& i : file_ref->fnode.extents) {
+        bufferlist cur;
+        uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
+        ceph_assert(cur_len > 0);
+        cur.substr_of(bl, off, cur_len);
+        int r = bdev[dev_target]->write(i.offset, cur, buffered);
+        ceph_assert(r == 0);
+        off += cur_len;
+      }
+
+      // release old extents
+      for (const auto &old_ext : old_fnode_extents) {
+        PExtentVector to_release;
to_release.emplace_back(old_ext.offset, old_ext.length); + alloc[old_ext.bdev]->release(to_release); + if (is_shared_alloc(old_ext.bdev)) { + shared_alloc->bluefs_used -= to_release.size(); + } + } + + // update fnode + for (auto& i : file_ref->fnode.extents) { + i.bdev = dev_target_new; + } + } + } + // new logging device in the current naming scheme + int new_log_dev_cur = + bdev[BDEV_NEWWAL] ? + BDEV_NEWWAL : + bdev[BDEV_WAL] && !(flags & REMOVE_WAL) ? + BDEV_WAL : + bdev[BDEV_NEWDB] ? + BDEV_NEWDB : + bdev[BDEV_DB] && !(flags & REMOVE_DB)? + BDEV_DB : + BDEV_SLOW; + + // new logging device in new naming scheme + int new_log_dev_next = + new_log_dev_cur == BDEV_NEWWAL ? + BDEV_WAL : + new_log_dev_cur == BDEV_NEWDB ? + BDEV_DB : + new_log_dev_cur; + + int super_dev = + dev_target == BDEV_NEWDB ? + BDEV_NEWDB : + bdev[BDEV_DB] ? + BDEV_DB : + BDEV_SLOW; + + _rewrite_log_and_layout_sync_LNF_LD( + false, + super_dev, + new_log_dev_cur, + new_log_dev_next, + flags, + layout); + return 0; +} + +BlueFS::FileRef BlueFS::_get_file(uint64_t ino) +{ + auto p = nodes.file_map.find(ino); + if (p == nodes.file_map.end()) { + FileRef f = ceph::make_ref<File>(); + nodes.file_map[ino] = f; + // track files count in logger + logger->set(l_bluefs_num_files, nodes.file_map.size()); + dout(30) << __func__ << " ino " << ino << " = " << f + << " (new)" << dendl; + return f; + } else { + dout(30) << __func__ << " ino " << ino << " = " << p->second << dendl; + return p->second; + } +} + + +/** +To modify fnode both FileWriter::lock and File::lock must be obtained. +The special case is when we modify bluefs log (ino 1) or +we are compacting log (ino 0). + +In any case it is enough to hold File::lock to be sure fnode will not be modified. +*/ +struct lock_fnode_print { + BlueFS::FileRef file; + lock_fnode_print(BlueFS::FileRef file) : file(file) {}; +}; +std::ostream& operator<<(std::ostream& out, const lock_fnode_print& to_lock) { + std::lock_guard l(to_lock.file->lock); + out << to_lock.file->fnode; + return out; +} + +void BlueFS::_drop_link_D(FileRef file) +{ + dout(20) << __func__ << " had refs " << file->refs + << " on " << lock_fnode_print(file) << dendl; + ceph_assert(file->refs > 0); + ceph_assert(ceph_mutex_is_locked(log.lock)); + ceph_assert(ceph_mutex_is_locked(nodes.lock)); + + --file->refs; + if (file->refs == 0) { + dout(20) << __func__ << " destroying " << file->fnode << dendl; + ceph_assert(file->num_reading.load() == 0); + vselector->sub_usage(file->vselector_hint, file->fnode); + log.t.op_file_remove(file->fnode.ino); + nodes.file_map.erase(file->fnode.ino); + logger->set(l_bluefs_num_files, nodes.file_map.size()); + file->deleted = true; + + std::lock_guard dl(dirty.lock); + for (auto& r : file->fnode.extents) { + dirty.pending_release[r.bdev].insert(r.offset, r.length); + } + if (file->dirty_seq > dirty.seq_stable) { + // retract request to serialize changes + ceph_assert(dirty.files.count(file->dirty_seq)); + auto it = dirty.files[file->dirty_seq].iterator_to(*file); + dirty.files[file->dirty_seq].erase(it); + file->dirty_seq = dirty.seq_stable; + } + } +} + +int64_t BlueFS::_read_random( + FileReader *h, ///< [in] read from here + uint64_t off, ///< [in] offset + uint64_t len, ///< [in] this many bytes + char *out) ///< [out] copy it here +{ + auto* buf = &h->buf; + + int64_t ret = 0; + dout(10) << __func__ << " h " << h + << " 0x" << std::hex << off << "~" << len << std::dec + << " from " << lock_fnode_print(h->file) << dendl; + + ++h->file->num_reading; + + if (!h->ignore_eof && + off + 
len > h->file->fnode.size) { + if (off > h->file->fnode.size) + len = 0; + else + len = h->file->fnode.size - off; + dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x" + << std::hex << len << std::dec << dendl; + } + logger->inc(l_bluefs_read_random_count, 1); + logger->inc(l_bluefs_read_random_bytes, len); + + std::shared_lock s_lock(h->lock); + buf->bl.reassign_to_mempool(mempool::mempool_bluefs_file_reader); + while (len > 0) { + if (off < buf->bl_off || off >= buf->get_buf_end()) { + s_lock.unlock(); + uint64_t x_off = 0; + auto p = h->file->fnode.seek(off, &x_off); + ceph_assert(p != h->file->fnode.extents.end()); + uint64_t l = std::min(p->length - x_off, len); + //hard cap to 1GB + l = std::min(l, uint64_t(1) << 30); + dout(20) << __func__ << " read random 0x" + << std::hex << x_off << "~" << l << std::dec + << " of " << *p << dendl; + int r; + if (!cct->_conf->bluefs_check_for_zeros) { + r = _bdev_read_random(p->bdev, p->offset + x_off, l, out, + cct->_conf->bluefs_buffered_io); + } else { + r = _read_random_and_check(p->bdev, p->offset + x_off, l, out, + cct->_conf->bluefs_buffered_io); + } + ceph_assert(r == 0); + off += l; + len -= l; + ret += l; + out += l; + + logger->inc(l_bluefs_read_random_disk_count, 1); + logger->inc(l_bluefs_read_random_disk_bytes, l); + if (len > 0) { + s_lock.lock(); + } + } else { + auto left = buf->get_buf_remaining(off); + int64_t r = std::min(len, left); + logger->inc(l_bluefs_read_random_buffer_count, 1); + logger->inc(l_bluefs_read_random_buffer_bytes, r); + dout(20) << __func__ << " left 0x" << std::hex << left + << " 0x" << off << "~" << len << std::dec + << dendl; + + auto p = buf->bl.begin(); + p.seek(off - buf->bl_off); + p.copy(r, out); + out += r; + + dout(30) << __func__ << " result chunk (0x" + << std::hex << r << std::dec << " bytes):\n"; + bufferlist t; + t.substr_of(buf->bl, off - buf->bl_off, r); + t.hexdump(*_dout); + *_dout << dendl; + + off += r; + len -= r; + ret += r; + buf->pos += r; + } + } + dout(20) << __func__ << std::hex + << " got 0x" << ret + << std::dec << dendl; + --h->file->num_reading; + return ret; +} + +int64_t BlueFS::_read( + FileReader *h, ///< [in] read from here + uint64_t off, ///< [in] offset + size_t len, ///< [in] this many bytes + bufferlist *outbl, ///< [out] optional: reference the result here + char *out) ///< [out] optional: or copy it here +{ + FileReaderBuffer *buf = &(h->buf); + + bool prefetch = !outbl && !out; + dout(10) << __func__ << " h " << h + << " 0x" << std::hex << off << "~" << len << std::dec + << " from " << lock_fnode_print(h->file) + << (prefetch ? 
" prefetch" : "") + << dendl; + + ++h->file->num_reading; + + if (!h->ignore_eof && + off + len > h->file->fnode.size) { + if (off > h->file->fnode.size) + len = 0; + else + len = h->file->fnode.size - off; + dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x" + << std::hex << len << std::dec << dendl; + } + logger->inc(l_bluefs_read_count, 1); + logger->inc(l_bluefs_read_bytes, len); + if (prefetch) { + logger->inc(l_bluefs_read_prefetch_count, 1); + logger->inc(l_bluefs_read_prefetch_bytes, len); + } + + if (outbl) + outbl->clear(); + + int64_t ret = 0; + std::shared_lock s_lock(h->lock); + while (len > 0) { + size_t left; + if (off < buf->bl_off || off >= buf->get_buf_end()) { + s_lock.unlock(); + std::unique_lock u_lock(h->lock); + buf->bl.reassign_to_mempool(mempool::mempool_bluefs_file_reader); + if (off < buf->bl_off || off >= buf->get_buf_end()) { + // if precondition hasn't changed during locking upgrade. + buf->bl.clear(); + buf->bl_off = off & super.block_mask(); + uint64_t x_off = 0; + auto p = h->file->fnode.seek(buf->bl_off, &x_off); + if (p == h->file->fnode.extents.end()) { + dout(5) << __func__ << " reading less then required " + << ret << "<" << ret + len << dendl; + break; + } + + uint64_t want = round_up_to(len + (off & ~super.block_mask()), + super.block_size); + want = std::max(want, buf->max_prefetch); + uint64_t l = std::min(p->length - x_off, want); + //hard cap to 1GB + l = std::min(l, uint64_t(1) << 30); + uint64_t eof_offset = round_up_to(h->file->fnode.size, super.block_size); + if (!h->ignore_eof && + buf->bl_off + l > eof_offset) { + l = eof_offset - buf->bl_off; + } + dout(20) << __func__ << " fetching 0x" + << std::hex << x_off << "~" << l << std::dec + << " of " << *p << dendl; + int r; + // when reading BlueFS log (only happens on startup) use non-buffered io + // it makes it in sync with logic in _flush_range() + bool use_buffered_io = h->file->fnode.ino == 1 ? 
false : cct->_conf->bluefs_buffered_io; + if (!cct->_conf->bluefs_check_for_zeros) { + r = _bdev_read(p->bdev, p->offset + x_off, l, &buf->bl, ioc[p->bdev], + use_buffered_io); + } else { + r = _read_and_check( + p->bdev, p->offset + x_off, l, &buf->bl, ioc[p->bdev], + use_buffered_io); + } + logger->inc(l_bluefs_read_disk_count, 1); + logger->inc(l_bluefs_read_disk_bytes, l); + + ceph_assert(r == 0); + } + u_lock.unlock(); + s_lock.lock(); + // we should recheck if buffer is valid after lock downgrade + continue; + } + left = buf->get_buf_remaining(off); + dout(20) << __func__ << " left 0x" << std::hex << left + << " len 0x" << len << std::dec << dendl; + + int64_t r = std::min(len, left); + if (outbl) { + bufferlist t; + t.substr_of(buf->bl, off - buf->bl_off, r); + outbl->claim_append(t); + } + if (out) { + auto p = buf->bl.begin(); + p.seek(off - buf->bl_off); + p.copy(r, out); + out += r; + } + + dout(30) << __func__ << " result chunk (0x" + << std::hex << r << std::dec << " bytes):\n"; + bufferlist t; + t.substr_of(buf->bl, off - buf->bl_off, r); + t.hexdump(*_dout); + *_dout << dendl; + + off += r; + len -= r; + ret += r; + buf->pos += r; + } + + dout(20) << __func__ << std::hex + << " got 0x" << ret + << std::dec << dendl; + ceph_assert(!outbl || (int)outbl->length() == ret); + --h->file->num_reading; + return ret; +} + +void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length) +{ + std::lock_guard l(f->lock); + dout(10) << __func__ << " file " << f->fnode + << " 0x" << std::hex << offset << "~" << length << std::dec + << dendl; + if (offset & ~super.block_mask()) { + offset &= super.block_mask(); + length = round_up_to(length, super.block_size); + } + uint64_t x_off = 0; + auto p = f->fnode.seek(offset, &x_off); + while (length > 0 && p != f->fnode.extents.end()) { + uint64_t x_len = std::min(p->length - x_off, length); + bdev[p->bdev]->invalidate_cache(p->offset + x_off, x_len); + dout(20) << __func__ << " 0x" << std::hex << x_off << "~" << x_len + << std:: dec << " of " << *p << dendl; + offset += x_len; + length -= x_len; + } +} + + +uint64_t BlueFS::_estimate_transaction_size(bluefs_transaction_t* t) +{ + uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL], + std::max(alloc_size[BDEV_DB], + alloc_size[BDEV_SLOW])); + + // conservative estimate for final encoded size + return round_up_to(t->op_bl.length() + super.block_size * 2, max_alloc_size); +} + +uint64_t BlueFS::_make_initial_transaction(uint64_t start_seq, + bluefs_fnode_t& fnode, + uint64_t expected_final_size, + bufferlist* out) +{ + bluefs_transaction_t t0; + t0.seq = start_seq; + t0.uuid = super.uuid; + t0.op_init(); + t0.op_file_update_inc(fnode); + t0.op_jump(start_seq, expected_final_size); // this is a fixed size op, + // hence it's valid with fake + // params for overall txc size + // estimation + if (!out) { + return _estimate_transaction_size(&t0); + } + + ceph_assert(expected_final_size > 0); + out->reserve(expected_final_size); + encode(t0, *out); + // make sure we're not wrong aboth the size + ceph_assert(out->length() <= expected_final_size); + _pad_bl(*out, expected_final_size); + return expected_final_size; +} + +uint64_t BlueFS::_estimate_log_size_N() +{ + std::lock_guard nl(nodes.lock); + int avg_dir_size = 40; // fixme + int avg_file_size = 12; + uint64_t size = 4096 * 2; + size += nodes.file_map.size() * (1 + sizeof(bluefs_fnode_t)); + size += nodes.dir_map.size() + (1 + avg_dir_size); + size += nodes.file_map.size() * (1 + avg_dir_size + avg_file_size); + return 
round_up_to(size, super.block_size); +} + +void BlueFS::compact_log()/*_LNF_LD_NF_D*/ +{ + if (!cct->_conf->bluefs_replay_recovery_disable_compact) { + if (cct->_conf->bluefs_compact_log_sync) { + _compact_log_sync_LNF_LD(); + } else { + _compact_log_async_LD_LNF_D(); + } + } +} + +bool BlueFS::_should_start_compact_log_L_N() +{ + if (log_is_compacting.load() == true) { + // compaction is already running + return false; + } + uint64_t current; + { + std::lock_guard ll(log.lock); + current = log.writer->file->fnode.size; + } + uint64_t expected = _estimate_log_size_N(); + float ratio = (float)current / (float)expected; + dout(10) << __func__ << " current 0x" << std::hex << current + << " expected " << expected << std::dec + << " ratio " << ratio + << dendl; + if (current < cct->_conf->bluefs_log_compact_min_size || + ratio < cct->_conf->bluefs_log_compact_min_ratio) { + return false; + } + return true; +} + +void BlueFS::_compact_log_dump_metadata_NF(uint64_t start_seq, + bluefs_transaction_t *t, + int bdev_update_flags, + uint64_t capture_before_seq) +{ + dout(20) << __func__ << dendl; + t->seq = start_seq; + t->uuid = super.uuid; + + std::lock_guard nl(nodes.lock); + + for (auto& [ino, file_ref] : nodes.file_map) { + if (ino == 1) + continue; + ceph_assert(ino > 1); + std::lock_guard fl(file_ref->lock); + if (bdev_update_flags) { + for(auto& e : file_ref->fnode.extents) { + auto bdev = e.bdev; + auto bdev_new = bdev; + ceph_assert(!((bdev_update_flags & REMOVE_WAL) && bdev == BDEV_WAL)); + if ((bdev_update_flags & RENAME_SLOW2DB) && bdev == BDEV_SLOW) { + bdev_new = BDEV_DB; + } + if ((bdev_update_flags & RENAME_DB2SLOW) && bdev == BDEV_DB) { + bdev_new = BDEV_SLOW; + } + if (bdev == BDEV_NEWDB) { + // REMOVE_DB xor RENAME_DB + ceph_assert(!(bdev_update_flags & REMOVE_DB) != !(bdev_update_flags & RENAME_DB2SLOW)); + ceph_assert(!(bdev_update_flags & RENAME_SLOW2DB)); + bdev_new = BDEV_DB; + } + if (bdev == BDEV_NEWWAL) { + ceph_assert(bdev_update_flags & REMOVE_WAL); + bdev_new = BDEV_WAL; + } + e.bdev = bdev_new; + } + } + if (capture_before_seq == 0 || file_ref->dirty_seq < capture_before_seq) { + dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl; + } else { + dout(20) << __func__ << " op_file_update just modified, dirty_seq=" + << file_ref->dirty_seq << " " << file_ref->fnode << dendl; + } + t->op_file_update(file_ref->fnode); + } + for (auto& [path, dir_ref] : nodes.dir_map) { + dout(20) << __func__ << " op_dir_create " << path << dendl; + t->op_dir_create(path); + for (auto& [fname, file_ref] : dir_ref->file_map) { + dout(20) << __func__ << " op_dir_link " << path << "/" << fname + << " to " << file_ref->fnode.ino << dendl; + t->op_dir_link(path, fname, file_ref->fnode.ino); + } + } +} + +void BlueFS::_compact_log_sync_LNF_LD() +{ + dout(10) << __func__ << dendl; + uint8_t prefer_bdev; + { + std::lock_guard ll(log.lock); + prefer_bdev = + vselector->select_prefer_bdev(log.writer->file->vselector_hint); + } + _rewrite_log_and_layout_sync_LNF_LD(true, + BDEV_DB, + prefer_bdev, + prefer_bdev, + 0, + super.memorized_layout); + logger->inc(l_bluefs_log_compactions); +} + +/* + * SYNC LOG COMPACTION + * + * 0. Lock the log completely through the whole procedure + * + * 1. Build new log. It will include log's starter and compacted metadata + * body. Jump op appended to the starter will link the pieces together. + * + * 2. Write out new log's content + * + * 3. Write out new superblock. This includes relevant device layout update. + * + * 4. Finalization. 
Old space release. + */ + +void BlueFS::_rewrite_log_and_layout_sync_LNF_LD(bool permit_dev_fallback, + int super_dev, + int log_dev, + int log_dev_new, + int flags, + std::optional<bluefs_layout_t> layout) +{ + // we substitute log_dev with log_dev_new for new allocations below + // and permitting fallback allocations prevents such a substitution + ceph_assert((permit_dev_fallback && log_dev == log_dev_new) || + !permit_dev_fallback); + + dout(10) << __func__ << " super_dev:" << super_dev + << " log_dev:" << log_dev + << " log_dev_new:" << log_dev_new + << " flags:" << flags + << " seq:" << log.seq_live + << dendl; + utime_t mtime = ceph_clock_now(); + uint64_t starter_seq = 1; + + // Part 0. + // Lock the log totally till the end of the procedure + std::lock_guard ll(log.lock); + auto t0 = mono_clock::now(); + + File *log_file = log.writer->file.get(); + bluefs_fnode_t fnode_tail; + // log.t.seq is always set to current live seq + ceph_assert(log.t.seq == log.seq_live); + // Capturing entire state. Dump anything that has been stored there. + log.t.clear(); + log.t.seq = log.seq_live; + // From now on, no changes to log.t are permitted until we finish rewriting log. + // Can allow dirty to remain dirty - log.seq_live will not change. + + // + // Part 1. + // Build new log starter and compacted metadata body + // 1.1. Build full compacted meta transaction. + // Encode a bluefs transaction that dumps all of the in-memory fnodes + // and names. + // This might be pretty large and its allocation map can exceed + // superblock size. Hence instead we'll need log starter part which + // goes to superblock and refers that new meta through op_update_inc. + // 1.2. Allocate space for the above transaction + // using its size estimation. + // 1.3. Allocate the space required for the starter part of the new log. + // It should be small enough to fit into superblock. + // 1.4 Building new log persistent fnode representation which will + // finally land to disk. + // Depending on input parameters we might need to perform device ids + // rename - runtime and persistent replicas should be different when we + // are in the device migration process. + // 1.5 Store starter fnode to run-time superblock, to be written out later. + // It doesn't contain compacted meta to fit relevant alocation map into + // superblock. + // 1.6 Proceed building new log persistent fnode representation. + // Will add log tail with compacted meta extents from 1.1. + // Device rename applied as well + // + // 1.7. Encode new log fnode starter, + // It will include op_init, new log's op_update_inc + // and jump to the compacted meta transaction beginning. + // Superblock will reference this starter part + // + // 1.8. Encode compacted meta transaction, + // extend the transaction with a jump to proper sequence no + // + + + // 1.1 Build full compacted meta transaction + bluefs_transaction_t compacted_meta_t; + _compact_log_dump_metadata_NF(starter_seq + 1, &compacted_meta_t, flags, 0); + + // 1.2 Allocate the space required for the compacted meta transaction + uint64_t compacted_meta_need = + _estimate_transaction_size(&compacted_meta_t) + + cct->_conf->bluefs_max_log_runway; + + dout(20) << __func__ << " compacted_meta_need " << compacted_meta_need << dendl; + + int r = _allocate(log_dev, compacted_meta_need, 0, &fnode_tail, 0, + permit_dev_fallback); + ceph_assert(r == 0); + + + // 1.3 Allocate the space required for the starter part of the new log. 
+ // estimate new log fnode size to be referenced from superblock + // hence use dummy fnode and jump parameters + uint64_t starter_need = _make_initial_transaction(starter_seq, fnode_tail, 0, nullptr); + + bluefs_fnode_t fnode_starter(log_file->fnode.ino, 0, mtime); + r = _allocate(log_dev, starter_need, 0, &fnode_starter, 0, + permit_dev_fallback); + ceph_assert(r == 0); + + // 1.4 Building starter fnode + bluefs_fnode_t fnode_persistent(fnode_starter.ino, 0, mtime); + for (auto p : fnode_starter.extents) { + // rename device if needed - this is possible when fallback allocations + // are prohibited only. Which means every extent is targeted to the same + // device and we can unconditionally update them. + if (log_dev != log_dev_new) { + dout(10) << __func__ << " renaming log extents to " + << log_dev_new << dendl; + p.bdev = log_dev_new; + } + fnode_persistent.append_extent(p); + } + + // 1.5 Store starter fnode to run-time superblock, to be written out later + super.log_fnode = fnode_persistent; + + // 1.6 Proceed building new log persistent fnode representation + // we'll build incremental update starting from this point + fnode_persistent.reset_delta(); + for (auto p : fnode_tail.extents) { + // rename device if needed - this is possible when fallback allocations + // are prohibited only. Which means every extent is targeted to the same + // device and we can unconditionally update them. + if (log_dev != log_dev_new) { + dout(10) << __func__ << " renaming log extents to " + << log_dev_new << dendl; + p.bdev = log_dev_new; + } + fnode_persistent.append_extent(p); + } + + // 1.7 Encode new log fnode + // This will flush incremental part of fnode_persistent only. + bufferlist starter_bl; + _make_initial_transaction(starter_seq, fnode_persistent, starter_need, &starter_bl); + + // 1.8 Encode compacted meta transaction + dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl; + // hopefully "compact_meta_need" estimation provides enough extra space + // for this op, assert below if not + compacted_meta_t.op_jump_seq(log.seq_live); + + bufferlist compacted_meta_bl; + encode(compacted_meta_t, compacted_meta_bl); + _pad_bl(compacted_meta_bl); + ceph_assert(compacted_meta_bl.length() <= compacted_meta_need); + + // + // Part 2 + // Write out new log's content + // 2.1. Build the full runtime new log's fnode + // + // 2.2. Write out new log's + // + // 2.3. Do flush and wait for completion through flush_bdev() + // + // 2.4. 
Finalize log update + // Update all sequence numbers + // + + // 2.1 Build the full runtime new log's fnode + bluefs_fnode_t old_log_fnode; + old_log_fnode.swap(fnode_starter); + old_log_fnode.clone_extents(fnode_tail); + old_log_fnode.reset_delta(); + log_file->fnode.swap(old_log_fnode); + + // 2.2 Write out new log's content + // Get rid off old writer + _close_writer(log.writer); + // Make new log writer and stage new log's content writing + log.writer = _create_writer(log_file); + log.writer->append(starter_bl); + log.writer->append(compacted_meta_bl); + + // 2.3 Do flush and wait for completion through flush_bdev() + _flush_special(log.writer); +#ifdef HAVE_LIBAIO + if (!cct->_conf->bluefs_sync_write) { + list<aio_t> completed_ios; + _claim_completed_aios(log.writer, &completed_ios); + _wait_for_aio(log.writer); + completed_ios.clear(); + } +#endif + _flush_bdev(); + + // 2.4 Finalize log update + ++log.seq_live; + dirty.seq_live = log.seq_live; + log.t.seq = log.seq_live; + vselector->sub_usage(log_file->vselector_hint, old_log_fnode); + vselector->add_usage(log_file->vselector_hint, log_file->fnode); + + // Part 3. + // Write out new superblock to reflect all the changes. + // + + super.memorized_layout = layout; + _write_super(super_dev); + _flush_bdev(); + + // we're mostly done + dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; + logger->inc(l_bluefs_log_compactions); + + // Part 4 + // Finalization. Release old space. + // + { + dout(10) << __func__ + << " release old log extents " << old_log_fnode.extents + << dendl; + std::lock_guard dl(dirty.lock); + for (auto& r : old_log_fnode.extents) { + dirty.pending_release[r.bdev].insert(r.offset, r.length); + } + } + logger->tinc(l_bluefs_compaction_lock_lat, mono_clock::now() - t0); +} + +/* + * ASYNC LOG COMPACTION + * + * 0. Lock the log and forbid its extension. The former covers just + * a part of the below procedure while the latter spans over it + * completely. + * 1. Allocate a new extent to continue the log, and then log an event + * that jumps the log write position to the new extent. At this point, the + * old extent(s) won't be written to, and reflect everything to compact. + * New events will be written to the new region that we'll keep. + * The latter will finally become new log tail on compaction completion. + * + * 2. Build new log. It will include log's starter, compacted metadata + * body and the above tail. Jump ops appended to the starter and meta body + * will link the pieces togather. Log's lock is releases in the mid of the + * process to permit parallel access to it. + * + * 3. Write out new log's content. + * + * 4. Write out new superblock to reflect all the changes. + * + * 5. Apply new log fnode, log is locked for a while. + * + * 6. Finalization. Clean up, old space release and total unlocking. + */ + +void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer +{ + dout(10) << __func__ << dendl; + utime_t mtime = ceph_clock_now(); + uint64_t starter_seq = 1; + uint64_t old_log_jump_to = 0; + + // Part 0. + // Lock the log and forbid its expansion and other compactions + + // only one compaction allowed at one time + bool old_is_comp = std::atomic_exchange(&log_is_compacting, true); + if (old_is_comp) { + dout(10) << __func__ << " ongoing" <<dendl; + return; + } + // lock log's run-time structures for a while + log.lock.lock(); + auto t0 = mono_clock::now(); + + // Part 1. + // Prepare current log for jumping into it. + // 1. Allocate extent + // 2. 
Update op to log + // 3. Jump op to log + // During that, no one else can write to log, otherwise we risk jumping backwards. + // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that. + + //signal _maybe_extend_log that expansion of log is temporary inacceptable + bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true); + ceph_assert(old_forbidden == false); + + // + // Part 1. + // Prepare current log for jumping into it. + // 1.1. Allocate extent + // 1.2. Save log's fnode extents and add new extents + // 1.3. Update op to log + // 1.4. Jump op to log + // During that, no one else can write to log, otherwise we risk jumping backwards. + // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that. + + // 1.1 allocate new log extents and store them at fnode_tail + File *log_file = log.writer->file.get(); + old_log_jump_to = log_file->fnode.get_allocated(); + bluefs_fnode_t fnode_tail; + uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos(); + dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to + << " need 0x" << cct->_conf->bluefs_max_log_runway << std::dec << dendl; + int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint), + cct->_conf->bluefs_max_log_runway, + 0, + &fnode_tail); + ceph_assert(r == 0); + + // 1.2 save log's fnode extents and add new extents + bluefs_fnode_t old_log_fnode(log_file->fnode); + log_file->fnode.clone_extents(fnode_tail); + //adjust usage as flush below will need it + vselector->sub_usage(log_file->vselector_hint, old_log_fnode); + vselector->add_usage(log_file->vselector_hint, log_file->fnode); + dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; + + // 1.3 update the log file change and log a jump to the offset where we want to + // write the new entries + log.t.op_file_update_inc(log_file->fnode); + + // 1.4 jump to new position should mean next seq + log.t.op_jump(log.seq_live + 1, old_log_jump_to); + uint64_t seq_now = log.seq_live; + // we need to flush all bdev because we will be streaming all dirty files to log + // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations + // then flush_bdev() will not be necessary + _flush_bdev(); + _flush_and_sync_log_jump_D(old_log_jump_to, runway); + + // + // Part 2. + // Build new log starter and compacted metadata body + // 2.1. Build full compacted meta transaction. + // While still holding the lock, encode a bluefs transaction + // that dumps all of the in-memory fnodes and names. + // This might be pretty large and its allocation map can exceed + // superblock size. Hence instead we'll need log starter part which + // goes to superblock and refers that new meta through op_update_inc. + // 2.2. After releasing the lock allocate space for the above transaction + // using its size estimation. + // Then build tailing list of extents which consists of these + // newly allocated extents followed by ones from Part 1. + // 2.3. Allocate the space required for the starter part of the new log. + // It should be small enough to fit into superblock. + // Effectively we start building new log fnode here. + // 2.4. Store starter fnode to run-time superblock, to be written out later + // 2.5. Finalize new log's fnode building + // This will include log's starter and tailing extents built at 2.2 + // 2.6. 
Encode new log fnode starter, + // It will include op_init, new log's op_update_inc + // and jump to the compacted meta transaction beginning. + // Superblock will reference this starter part + // 2.7. Encode compacted meta transaction, + // extend the transaction with a jump to the log tail from 1.1 before + // encoding. + // + + // 2.1 Build full compacted meta transaction + bluefs_transaction_t compacted_meta_t; + _compact_log_dump_metadata_NF(starter_seq + 1, &compacted_meta_t, 0, seq_now); + + // now state is captured to compacted_meta_t, + // current log can be used to write to, + //ops in log will be continuation of captured state + logger->tinc(l_bluefs_compaction_lock_lat, mono_clock::now() - t0); + log.lock.unlock(); + + // 2.2 Allocate the space required for the compacted meta transaction + uint64_t compacted_meta_need = _estimate_transaction_size(&compacted_meta_t); + dout(20) << __func__ << " compacted_meta_need " << compacted_meta_need + << dendl; + { + bluefs_fnode_t fnode_pre_tail; + // do allocate + r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint), + compacted_meta_need, + 0, + &fnode_pre_tail); + ceph_assert(r == 0); + // build trailing list of extents in fnode_tail, + // this will include newly allocated extents for compacted meta + // and aux extents allocated at step 1.1 + fnode_pre_tail.claim_extents(fnode_tail.extents); + fnode_tail.swap_extents(fnode_pre_tail); + } + + // 2.3 Allocate the space required for the starter part of the new log. + // Start building New log fnode + FileRef new_log = nullptr; + new_log = ceph::make_ref<File>(); + new_log->fnode.ino = log_file->fnode.ino; + new_log->fnode.mtime = mtime; + // Estimate the required space + uint64_t starter_need = + _make_initial_transaction(starter_seq, fnode_tail, 0, nullptr); + // and now allocate and store at new_log_fnode + r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint), + starter_need, + 0, + &new_log->fnode); + ceph_assert(r == 0); + + // 2.4 Store starter fnode to run-time superblock, to be written out later + super.log_fnode = new_log->fnode; + + // 2.5 Finalize new log's fnode building + // start collecting new log fnode updates (to make op_update_inc later) + // since this point. This will include compacted meta from 2.2 and aux + // extents from 1.1. + new_log->fnode.reset_delta(); + new_log->fnode.claim_extents(fnode_tail.extents); + + // 2.6 Encode new log fnode + bufferlist starter_bl; + _make_initial_transaction(starter_seq, new_log->fnode, starter_need, + &starter_bl); + + // 2.7 Encode compacted meta transaction, + dout(20) << __func__ + << " new_log jump seq " << seq_now + << std::hex << " offset 0x" << starter_need + compacted_meta_need + << std::dec << dendl; + // Extent compacted_meta transaction with a just to new log tail. + // Hopefully "compact_meta_need" estimation provides enough extra space + // for this new jump, assert below if not + compacted_meta_t.op_jump(seq_now, starter_need + compacted_meta_need); + // Now do encodeing and padding + bufferlist compacted_meta_bl; + compacted_meta_bl.reserve(compacted_meta_need); + encode(compacted_meta_t, compacted_meta_bl); + ceph_assert(compacted_meta_bl.length() <= compacted_meta_need); + _pad_bl(compacted_meta_bl, compacted_meta_need); + + // + // Part 3. 
+ // Write out new log's content + // 3.1 Stage new log's content writing + // 3.2 Do flush and wait for completion through flush_bdev() + // + + // 3.1 Stage new log's content writing + // Make new log writer and append bufferlists to write out. + FileWriter *new_log_writer = _create_writer(new_log); + // And append all new log's bufferlists to write out. + new_log_writer->append(starter_bl); + new_log_writer->append(compacted_meta_bl); + + // 3.2. flush and wait + _flush_special(new_log_writer); + _flush_bdev(new_log_writer, false); // do not check log.lock is locked + + // Part 4. + // Write out new superblock to reflect all the changes. + // + + _write_super(BDEV_DB); + _flush_bdev(); + + // Part 5. + // Apply new log fnode + // + + // we need to acquire log's lock back at this point + log.lock.lock(); + // Reconstruct actual log object from the new one. + vselector->sub_usage(log_file->vselector_hint, log_file->fnode); + log_file->fnode.size = + log.writer->pos - old_log_jump_to + starter_need + compacted_meta_need; + log_file->fnode.mtime = std::max(mtime, log_file->fnode.mtime); + log_file->fnode.swap_extents(new_log->fnode); + // update log's writer + log.writer->pos = log.writer->file->fnode.size; + vselector->add_usage(log_file->vselector_hint, log_file->fnode); + // and unlock + log.lock.unlock(); + + // we're mostly done + dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; + logger->inc(l_bluefs_log_compactions); + + //Part 6. + // Finalization + // 6.1 Permit log's extension, forbidden at step 0. + // + // 6.2 Release the new log writer + // + // 6.3 Release old space + // + // 6.4. Enable other compactions + // + + // 6.1 Permit log's extension, forbidden at step 0. + old_forbidden = atomic_exchange(&log_forbidden_to_expand, false); + ceph_assert(old_forbidden == true); + //to wake up if someone was in need of expanding log + log_cond.notify_all(); + + // 6.2 Release the new log writer + _close_writer(new_log_writer); + new_log_writer = nullptr; + new_log = nullptr; + + // 6.3 Release old space + { + dout(10) << __func__ + << " release old log extents " << old_log_fnode.extents + << dendl; + std::lock_guard dl(dirty.lock); + for (auto& r : old_log_fnode.extents) { + dirty.pending_release[r.bdev].insert(r.offset, r.length); + } + } + + // 6.4. Enable other compactions + old_is_comp = atomic_exchange(&log_is_compacting, false); + ceph_assert(old_is_comp); +} + +void BlueFS::_pad_bl(bufferlist& bl, uint64_t pad_size) +{ + pad_size = std::max(pad_size, uint64_t(super.block_size)); + uint64_t partial = bl.length() % pad_size; + if (partial) { + dout(10) << __func__ << " padding with 0x" << std::hex + << pad_size - partial << " zeros" << std::dec << dendl; + bl.append_zero(pad_size - partial); + } +} + + +// Returns log seq that was live before advance. +uint64_t BlueFS::_log_advance_seq() +{ + ceph_assert(ceph_mutex_is_locked(dirty.lock)); + ceph_assert(ceph_mutex_is_locked(log.lock)); + //acquire new seq + // this will became seq_stable once we write + ceph_assert(dirty.seq_stable < dirty.seq_live); + ceph_assert(log.t.seq == log.seq_live); + uint64_t seq = log.seq_live; + log.t.uuid = super.uuid; + + ++dirty.seq_live; + ++log.seq_live; + ceph_assert(dirty.seq_live == log.seq_live); + return seq; +} + + +// Adds to log.t file modifications mentioned in `dirty.files`. +// Note: some bluefs ops may have already been stored in log.t transaction. 
+void BlueFS::_consume_dirty(uint64_t seq) +{ + ceph_assert(ceph_mutex_is_locked(dirty.lock)); + ceph_assert(ceph_mutex_is_locked(log.lock)); + + // log dirty files + // we just incremented log_seq. It is now illegal to add to dirty.files[log_seq] + auto lsi = dirty.files.find(seq); + if (lsi != dirty.files.end()) { + dout(20) << __func__ << " " << lsi->second.size() << " dirty.files" << dendl; + for (auto &f : lsi->second) { + // fnode here is protected indirectly + // the only path that adds to dirty.files goes from _fsync() + // _fsync() is executed under writer lock, + // and does not exit until syncing log is done + dout(20) << __func__ << " op_file_update_inc " << f.fnode << dendl; + log.t.op_file_update_inc(f.fnode); + } + } +} + +// Extends log if its free space is smaller then bluefs_min_log_runway. +// Returns space available *BEFORE* adding new space. Signed for additional <0 detection. +int64_t BlueFS::_maybe_extend_log() +{ + ceph_assert(ceph_mutex_is_locked(log.lock)); + // allocate some more space (before we run out)? + // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`. + int64_t runway = log.writer->file->fnode.get_allocated() - + log.writer->get_effective_write_pos(); + if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) { + dout(10) << __func__ << " allocating more log runway (0x" + << std::hex << runway << std::dec << " remaining)" << dendl; + /* + * Usually, when we are low on space in log, we just allocate new extent, + * put update op(log) to log and we are fine. + * Problem - it interferes with log compaction: + * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log. + * It is assumed that log region (anchor - end) will contain all changes made by bluefs since + * full state capture into new log. + * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with + * both logs, but old log is different then new log. 
+ * + * Possible solutions: + * - stall extending log until we finish compacting and switch log (CURRENT) + * - re-run compaction with more runway for old log + * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs + */ + if (log_forbidden_to_expand.load() == true) { + return -EWOULDBLOCK; + } + vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode); + int r = _allocate( + vselector->select_prefer_bdev(log.writer->file->vselector_hint), + cct->_conf->bluefs_max_log_runway, + 0, + &log.writer->file->fnode); + ceph_assert(r == 0); + vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode); + log.t.op_file_update_inc(log.writer->file->fnode); + } + return runway; +} + +void BlueFS::_flush_and_sync_log_core(int64_t runway) +{ + ceph_assert(ceph_mutex_is_locked(log.lock)); + dout(10) << __func__ << " " << log.t << dendl; + + bufferlist bl; + bl.reserve(super.block_size); + encode(log.t, bl); + // pad to block boundary + size_t realign = super.block_size - (bl.length() % super.block_size); + if (realign && realign != super.block_size) + bl.append_zero(realign); + + logger->inc(l_bluefs_log_write_count, 1); + logger->inc(l_bluefs_logged_bytes, bl.length()); + + if (true) { + ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss + // transaction will not fit extents before growth -> data loss on _replay + } + + log.writer->append(bl); + + // prepare log for new transactions + log.t.clear(); + log.t.seq = log.seq_live; + + uint64_t new_data = _flush_special(log.writer); + vselector->add_usage(log.writer->file->vselector_hint, new_data); +} + +// Clears dirty.files up to (including) seq_stable. +void BlueFS::_clear_dirty_set_stable_D(uint64_t seq) +{ + std::lock_guard dl(dirty.lock); + + // clean dirty files + if (seq > dirty.seq_stable) { + dirty.seq_stable = seq; + dout(20) << __func__ << " seq_stable " << dirty.seq_stable << dendl; + + // undirty all files that were already streamed to log + auto p = dirty.files.begin(); + while (p != dirty.files.end()) { + if (p->first > dirty.seq_stable) { + dout(20) << __func__ << " done cleaning up dirty files" << dendl; + break; + } + + auto l = p->second.begin(); + while (l != p->second.end()) { + File *file = &*l; + ceph_assert(file->dirty_seq <= dirty.seq_stable); + dout(20) << __func__ << " cleaned file " << file->fnode.ino << dendl; + file->dirty_seq = dirty.seq_stable; + p->second.erase(l++); + } + + ceph_assert(p->second.empty()); + dirty.files.erase(p++); + } + } else { + dout(20) << __func__ << " seq_stable " << dirty.seq_stable + << " already >= out seq " << seq + << ", we lost a race against another log flush, done" << dendl; + } +} + +void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release) +{ + for (unsigned i = 0; i < to_release.size(); ++i) { + if (to_release[i].empty()) { + continue; + } + /* OK, now we have the guarantee alloc[i] won't be null. 
*/ + + bool discard_queued = bdev[i]->try_discard(to_release[i]); + if (!discard_queued) { + alloc[i]->release(to_release[i]); + if (is_shared_alloc(i)) { + shared_alloc->bluefs_used -= to_release[i].size(); + } + } + } +} + +int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq) +{ + int64_t available_runway; + do { + log.lock.lock(); + dirty.lock.lock(); + if (want_seq && want_seq <= dirty.seq_stable) { + dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable " + << dirty.seq_stable << ", done" << dendl; + dirty.lock.unlock(); + log.lock.unlock(); + return 0; + } + + available_runway = _maybe_extend_log(); + if (available_runway == -EWOULDBLOCK) { + // we are in need of adding runway, but we are during log-switch from compaction + dirty.lock.unlock(); + //instead log.lock.unlock() do move ownership + std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock); + while (log_forbidden_to_expand.load()) { + log_cond.wait(ll); + } + } else { + ceph_assert(available_runway >= 0); + } + } while (available_runway < 0); + + ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet + uint64_t seq =_log_advance_seq(); + _consume_dirty(seq); + vector<interval_set<uint64_t>> to_release(dirty.pending_release.size()); + to_release.swap(dirty.pending_release); + dirty.lock.unlock(); + + _flush_and_sync_log_core(available_runway); + _flush_bdev(log.writer); + logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size); + //now log.lock is no longer needed + log.lock.unlock(); + + _clear_dirty_set_stable_D(seq); + _release_pending_allocations(to_release); + + _update_logger_stats(); + return 0; +} + +// Flushes log and immediately adjusts log_writer pos. +int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to, + int64_t available_runway) +{ + ceph_assert(ceph_mutex_is_locked(log.lock)); + + ceph_assert(jump_to); + // we synchronize writing to log, by lock to log.lock + + dirty.lock.lock(); + uint64_t seq =_log_advance_seq(); + _consume_dirty(seq); + vector<interval_set<uint64_t>> to_release(dirty.pending_release.size()); + to_release.swap(dirty.pending_release); + dirty.lock.unlock(); + _flush_and_sync_log_core(available_runway); + + dout(10) << __func__ << " jumping log offset from 0x" << std::hex + << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl; + log.writer->pos = jump_to; + vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size); + log.writer->file->fnode.size = jump_to; + vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size); + + _flush_bdev(log.writer); + + _clear_dirty_set_stable_D(seq); + _release_pending_allocations(to_release); + + logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size); + _update_logger_stats(); + return 0; +} + +ceph::bufferlist BlueFS::FileWriter::flush_buffer( + CephContext* const cct, + const bool partial, + const unsigned length, + const bluefs_super_t& super) +{ + ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino <= 1); + ceph::bufferlist bl; + if (partial) { + tail_block.splice(0, tail_block.length(), &bl); + } + const auto remaining_len = length - bl.length(); + buffer.splice(0, remaining_len, &bl); + if (buffer.length()) { + dout(20) << " leaving 0x" << std::hex << buffer.length() << std::dec + << " unflushed" << dendl; + } + if (const unsigned tail = bl.length() & ~super.block_mask(); tail) { + const auto padding_len = super.block_size - tail; + dout(20) << __func__ << " caching tail of 0x" + << std::hex << 
tail + << " and padding block with 0x" << padding_len + << " buffer.length() " << buffer.length() + << std::dec << dendl; + // We need to go through the `buffer_appender` to get a chance to + // preserve in-memory contiguity and not mess with the alignment. + // Otherwise a costly rebuild could happen in e.g. `KernelDevice`. + buffer_appender.append_zero(padding_len); + buffer.splice(buffer.length() - padding_len, padding_len, &bl); + // Deep copy the tail here. This allows to avoid costlier copy on + // bufferlist rebuild in e.g. `KernelDevice` and minimizes number + // of memory allocations. + // The alternative approach would be to place the entire tail and + // padding on a dedicated, 4 KB long memory chunk. This shouldn't + // trigger the rebuild while still being less expensive. + buffer_appender.substr_of(bl, bl.length() - padding_len - tail, tail); + buffer.splice(buffer.length() - tail, tail, &tail_block); + } else { + tail_block.clear(); + } + return bl; +} + +int BlueFS::_signal_dirty_to_log_D(FileWriter *h) +{ + ceph_assert(ceph_mutex_is_locked(h->lock)); + std::lock_guard dl(dirty.lock); + if (h->file->deleted) { + dout(10) << __func__ << " deleted, no-op" << dendl; + return 0; + } + + h->file->fnode.mtime = ceph_clock_now(); + ceph_assert(h->file->fnode.ino >= 1); + if (h->file->dirty_seq <= dirty.seq_stable) { + h->file->dirty_seq = dirty.seq_live; + dirty.files[h->file->dirty_seq].push_back(*h->file); + dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + << " (was clean)" << dendl; + } else { + if (h->file->dirty_seq != dirty.seq_live) { + // need re-dirty, erase from list first + ceph_assert(dirty.files.count(h->file->dirty_seq)); + auto it = dirty.files[h->file->dirty_seq].iterator_to(*h->file); + dirty.files[h->file->dirty_seq].erase(it); + h->file->dirty_seq = dirty.seq_live; + dirty.files[h->file->dirty_seq].push_back(*h->file); + dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + << " (was " << h->file->dirty_seq << ")" << dendl; + } else { + dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + << " (unchanged, do nothing) " << dendl; + } + } + return 0; +} + +void BlueFS::flush_range(FileWriter *h, uint64_t offset, uint64_t length)/*_WF*/ +{ + _maybe_check_vselector_LNF(); + std::unique_lock hl(h->lock); + _flush_range_F(h, offset, length); +} + +int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) +{ + ceph_assert(ceph_mutex_is_locked(h->lock)); + ceph_assert(h->file->num_readers.load() == 0); + ceph_assert(h->file->fnode.ino > 1); + + dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos + << " 0x" << offset << "~" << length << std::dec + << " to " << h->file->fnode << dendl; + if (h->file->deleted) { + dout(10) << __func__ << " deleted, no-op" << dendl; + return 0; + } + + bool buffered = cct->_conf->bluefs_buffered_io; + + if (offset + length <= h->pos) + return 0; + if (offset < h->pos) { + length -= h->pos - offset; + offset = h->pos; + dout(10) << " still need 0x" + << std::hex << offset << "~" << length << std::dec + << dendl; + } + std::lock_guard file_lock(h->file->lock); + ceph_assert(offset <= h->file->fnode.size); + + uint64_t allocated = h->file->fnode.get_allocated(); + vselector->sub_usage(h->file->vselector_hint, h->file->fnode); + // do not bother to dirty the file if we are overwriting + // previously allocated extents. + if (allocated < offset + length) { + // we should never run out of log space here; see the min runway check + // in _flush_and_sync_log. 
+ int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint), + offset + length - allocated, + 0, + &h->file->fnode); + if (r < 0) { + derr << __func__ << " allocated: 0x" << std::hex << allocated + << " offset: 0x" << offset << " length: 0x" << length << std::dec + << dendl; + vselector->add_usage(h->file->vselector_hint, h->file->fnode); // undo + ceph_abort_msg("bluefs enospc"); + return r; + } + h->file->is_dirty = true; + } + if (h->file->fnode.size < offset + length) { + h->file->fnode.size = offset + length; + h->file->is_dirty = true; + } + + dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl; + int res = _flush_data(h, offset, length, buffered); + vselector->add_usage(h->file->vselector_hint, h->file->fnode); + return res; +} + +int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered) +{ + if (h->file->fnode.ino > 1) { + ceph_assert(ceph_mutex_is_locked(h->lock)); + ceph_assert(ceph_mutex_is_locked(h->file->lock)); + } + uint64_t x_off = 0; + auto p = h->file->fnode.seek(offset, &x_off); + ceph_assert(p != h->file->fnode.extents.end()); + dout(20) << __func__ << " in " << *p << " x_off 0x" + << std::hex << x_off << std::dec << dendl; + + unsigned partial = x_off & ~super.block_mask(); + if (partial) { + dout(20) << __func__ << " using partial tail 0x" + << std::hex << partial << std::dec << dendl; + x_off -= partial; + offset -= partial; + length += partial; + dout(20) << __func__ << " waiting for previous aio to complete" << dendl; + for (auto p : h->iocv) { + if (p) { + p->aio_wait(); + } + } + } + + auto bl = h->flush_buffer(cct, partial, length, super); + ceph_assert(bl.length() >= length); + h->pos = offset + length; + length = bl.length(); + + logger->inc(l_bluefs_write_count, 1); + logger->inc(l_bluefs_write_bytes, length); + + switch (h->writer_type) { + case WRITER_WAL: + logger->inc(l_bluefs_write_count_wal, 1); + logger->inc(l_bluefs_bytes_written_wal, length); + break; + case WRITER_SST: + logger->inc(l_bluefs_write_count_sst, 1); + logger->inc(l_bluefs_bytes_written_sst, length); + break; + } + + dout(30) << "dump:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + + uint64_t bloff = 0; + uint64_t bytes_written_slow = 0; + while (length > 0) { + logger->inc(l_bluefs_write_disk_count, 1); + + uint64_t x_len = std::min(p->length - x_off, length); + bufferlist t; + t.substr_of(bl, bloff, x_len); + if (cct->_conf->bluefs_sync_write) { + bdev[p->bdev]->write(p->offset + x_off, t, buffered, h->write_hint); + } else { + bdev[p->bdev]->aio_write(p->offset + x_off, t, h->iocv[p->bdev], buffered, h->write_hint); + } + h->dirty_devs[p->bdev] = true; + if (p->bdev == BDEV_SLOW) { + bytes_written_slow += t.length(); + } + + bloff += x_len; + length -= x_len; + ++p; + x_off = 0; + } + if (bytes_written_slow) { + logger->inc(l_bluefs_bytes_written_slow, bytes_written_slow); + } + for (unsigned i = 0; i < MAX_BDEV; ++i) { + if (bdev[i]) { + if (h->iocv[i] && h->iocv[i]->has_pending_aios()) { + bdev[i]->aio_submit(h->iocv[i]); + } + } + } + dout(20) << __func__ << " h " << h << " pos now 0x" + << std::hex << h->pos << std::dec << dendl; + return 0; +} + +#ifdef HAVE_LIBAIO +// we need to retire old completed aios so they don't stick around in +// memory indefinitely (along with their bufferlist refs). 
+void BlueFS::_claim_completed_aios(FileWriter *h, list<aio_t> *ls) +{ + for (auto p : h->iocv) { + if (p) { + ls->splice(ls->end(), p->running_aios); + } + } + dout(10) << __func__ << " got " << ls->size() << " aios" << dendl; +} + +void BlueFS::_wait_for_aio(FileWriter *h) +{ + // NOTE: this is safe to call without a lock, as long as our reference is + // stable. + utime_t start; + lgeneric_subdout(cct, bluefs, 10) << __func__; + start = ceph_clock_now(); + *_dout << " " << h << dendl; + for (auto p : h->iocv) { + if (p) { + p->aio_wait(); + } + } + dout(10) << __func__ << " " << h << " done in " << (ceph_clock_now() - start) << dendl; +} +#endif + +void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)/*_WF_LNF_NF_LD_D*/ +{ + bool flushed_sum = false; + { + std::unique_lock hl(h->lock); + size_t max_size = 1ull << 30; // cap to 1GB + while (len > 0) { + bool need_flush = true; + auto l0 = h->get_buffer_length(); + if (l0 < max_size) { + size_t l = std::min(len, max_size - l0); + h->append(buf, l); + buf += l; + len -= l; + need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size; + } + if (need_flush) { + bool flushed = false; + int r = _flush_F(h, true, &flushed); + ceph_assert(r == 0); + flushed_sum |= flushed; + // make sure we've made any progress with flush hence the + // loop doesn't iterate forever + ceph_assert(h->get_buffer_length() < max_size); + } + } + } + if (flushed_sum) { + _maybe_compact_log_LNF_NF_LD_D(); + } +} + +void BlueFS::flush(FileWriter *h, bool force)/*_WF_LNF_NF_LD_D*/ +{ + bool flushed = false; + int r; + { + std::unique_lock hl(h->lock); + r = _flush_F(h, force, &flushed); + ceph_assert(r == 0); + } + if (r == 0 && flushed) { + _maybe_compact_log_LNF_NF_LD_D(); + } +} + +int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed) +{ + ceph_assert(ceph_mutex_is_locked(h->lock)); + uint64_t length = h->get_buffer_length(); + uint64_t offset = h->pos; + if (flushed) { + *flushed = false; + } + if (!force && + length < cct->_conf->bluefs_min_flush_size) { + dout(10) << __func__ << " " << h << " ignoring, length " << length + << " < min_flush_size " << cct->_conf->bluefs_min_flush_size + << dendl; + return 0; + } + if (length == 0) { + dout(10) << __func__ << " " << h << " no dirty data on " + << h->file->fnode << dendl; + return 0; + } + dout(10) << __func__ << " " << h << " 0x" + << std::hex << offset << "~" << length << std::dec + << " to " << h->file->fnode << dendl; + ceph_assert(h->pos <= h->file->fnode.size); + int r = _flush_range_F(h, offset, length); + if (flushed) { + *flushed = true; + } + return r; +} + +// Flush for bluefs special files. +// Does not add extents to h. +// Does not mark h as dirty. +// we do not need to dirty the log file (or it's compacting +// replacement) when the file size changes because replay is +// smart enough to discover it on its own. 
+uint64_t BlueFS::_flush_special(FileWriter *h) +{ + ceph_assert(h->file->fnode.ino <= 1); + uint64_t length = h->get_buffer_length(); + uint64_t offset = h->pos; + uint64_t new_data = 0; + ceph_assert(length + offset <= h->file->fnode.get_allocated()); + if (h->file->fnode.size < offset + length) { + new_data = offset + length - h->file->fnode.size; + h->file->fnode.size = offset + length; + } + _flush_data(h, offset, length, false); + return new_data; +} + +int BlueFS::truncate(FileWriter *h, uint64_t offset)/*_WF_L*/ +{ + std::lock_guard hl(h->lock); + dout(10) << __func__ << " 0x" << std::hex << offset << std::dec + << " file " << h->file->fnode << dendl; + if (h->file->deleted) { + dout(10) << __func__ << " deleted, no-op" << dendl; + return 0; + } + + // we never truncate internal log files + ceph_assert(h->file->fnode.ino > 1); + + // truncate off unflushed data? + if (h->pos < offset && + h->pos + h->get_buffer_length() > offset) { + dout(20) << __func__ << " tossing out last " << offset - h->pos + << " unflushed bytes" << dendl; + ceph_abort_msg("actually this shouldn't happen"); + } + if (h->get_buffer_length()) { + int r = _flush_F(h, true); + if (r < 0) + return r; + } + if (offset == h->file->fnode.size) { + return 0; // no-op! + } + if (offset > h->file->fnode.size) { + ceph_abort_msg("truncate up not supported"); + } + ceph_assert(h->file->fnode.size >= offset); + _flush_bdev(h); + + std::lock_guard ll(log.lock); + vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size); + h->file->fnode.size = offset; + h->file->is_dirty = true; + vselector->add_usage(h->file->vselector_hint, h->file->fnode.size); + log.t.op_file_update_inc(h->file->fnode); + return 0; +} + +int BlueFS::fsync(FileWriter *h)/*_WF_WD_WLD_WLNF_WNF*/ +{ + _maybe_check_vselector_LNF(); + std::unique_lock hl(h->lock); + uint64_t old_dirty_seq = 0; + { + dout(10) << __func__ << " " << h << " " << h->file->fnode + << " dirty " << h->file->is_dirty << dendl; + int r = _flush_F(h, true); + if (r < 0) + return r; + _flush_bdev(h); + if (h->file->is_dirty) { + _signal_dirty_to_log_D(h); + h->file->is_dirty = false; + } + { + std::lock_guard dl(dirty.lock); + if (dirty.seq_stable < h->file->dirty_seq) { + old_dirty_seq = h->file->dirty_seq; + dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq + << ") on " << h->file->fnode << ", flushing log" << dendl; + } + } + } + if (old_dirty_seq) { + _flush_and_sync_log_LD(old_dirty_seq); + } + _maybe_compact_log_LNF_NF_LD_D(); + + return 0; +} + +// be careful - either h->file->lock or log.lock must be taken +void BlueFS::_flush_bdev(FileWriter *h, bool check_mutext_locked) +{ + if (check_mutext_locked) { + if (h->file->fnode.ino > 1) { + ceph_assert(ceph_mutex_is_locked(h->lock)); + } else if (h->file->fnode.ino == 1) { + ceph_assert(ceph_mutex_is_locked(log.lock)); + } + } + std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs; + h->dirty_devs.fill(false); +#ifdef HAVE_LIBAIO + if (!cct->_conf->bluefs_sync_write) { + list<aio_t> completed_ios; + _claim_completed_aios(h, &completed_ios); + _wait_for_aio(h); + completed_ios.clear(); + } +#endif + _flush_bdev(flush_devs); +} + +void BlueFS::_flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs) +{ + // NOTE: this is safe to call without a lock. + dout(20) << __func__ << dendl; + for (unsigned i = 0; i < MAX_BDEV; i++) { + if (dirty_bdevs[i]) + bdev[i]->flush(); + } +} + +void BlueFS::_flush_bdev() +{ + // NOTE: this is safe to call without a lock. 
+  dout(20) << __func__ << dendl;
+  for (unsigned i = 0; i < MAX_BDEV; i++) {
+    // Allocating space from BDEV_SLOW is unexpected, so in most cases we
+    // never allocate from it and can avoid flushing that unused device.
+    if (bdev[i] && (i != BDEV_SLOW || _get_used(i))) {
+      bdev[i]->flush();
+    }
+  }
+}
+
+const char* BlueFS::get_device_name(unsigned id)
+{
+  if (id >= MAX_BDEV) return "BDEV_INV";
+  const char* names[] = {"BDEV_WAL", "BDEV_DB", "BDEV_SLOW", "BDEV_NEWWAL", "BDEV_NEWDB"};
+  return names[id];
+}
+
+int BlueFS::_allocate(uint8_t id, uint64_t len,
+                      uint64_t alloc_unit,
+                      bluefs_fnode_t* node,
+                      size_t alloc_attempts,
+                      bool permit_dev_fallback)
+{
+  dout(10) << __func__ << " len 0x" << std::hex << len
+           << " au 0x" << alloc_unit
+           << std::dec << " from " << (int)id
+           << " cooldown " << cooldown_deadline
+           << dendl;
+  ceph_assert(id < alloc.size());
+  int64_t alloc_len = 0;
+  PExtentVector extents;
+  uint64_t hint = 0;
+  int64_t need = len;
+  bool shared = is_shared_alloc(id);
+  auto shared_unit = shared_alloc ? shared_alloc->alloc_unit : 0;
+  bool was_cooldown = false;
+  if (alloc[id]) {
+    if (!alloc_unit) {
+      alloc_unit = alloc_size[id];
+    }
+    // do not attempt the shared allocator with the bluefs alloc unit
+    // while cooling down; fall back to the slow device's alloc unit instead.
+    if (shared && alloc_unit != shared_unit) {
+      if (duration_cast<seconds>(real_clock::now().time_since_epoch()).count() <
+          cooldown_deadline) {
+        logger->inc(l_bluefs_alloc_shared_size_fallbacks);
+        alloc_unit = shared_unit;
+        was_cooldown = true;
+      } else if (cooldown_deadline.fetch_and(0)) {
+        // we might get a spurious cooldown_deadline reset at this point,
+        // but that's mostly harmless.
+        dout(1) << __func__ << " shared allocation cooldown period elapsed"
+                << dendl;
+      }
+    }
+    need = round_up_to(len, alloc_unit);
+    if (!node->extents.empty() && node->extents.back().bdev == id) {
+      hint = node->extents.back().end();
+    }
+    ++alloc_attempts;
+    extents.reserve(4);  // 4 should be (more than) enough for most allocations
+    alloc_len = alloc[id]->allocate(need, alloc_unit, hint, &extents);
+  }
+  if (alloc_len < 0 || alloc_len < need) {
+    if (alloc[id]) {
+      if (alloc_len > 0) {
+        alloc[id]->release(extents);
+      }
+      if (!was_cooldown && shared) {
+        auto delay_s = cct->_conf->bluefs_failed_shared_alloc_cooldown;
+        cooldown_deadline = delay_s +
+          duration_cast<seconds>(real_clock::now().time_since_epoch()).count();
+        dout(1) << __func__ << " shared allocation cooldown set for "
+                << delay_s << "s"
+                << dendl;
+      }
+      dout(1) << __func__ << " unable to allocate 0x" << std::hex << need
+              << " on bdev " << (int)id
+              << ", allocator name " << alloc[id]->get_name()
+              << ", allocator type " << alloc[id]->get_type()
+              << ", capacity 0x" << alloc[id]->get_capacity()
+              << ", block size 0x" << alloc[id]->get_block_size()
+              << ", alloc unit 0x" << alloc_unit
+              << ", free 0x" << alloc[id]->get_free()
+              << ", fragmentation " << alloc[id]->get_fragmentation()
+              << ", allocated 0x" << (alloc_len > 0 ? alloc_len : 0)
+              << std::dec << dendl;
+    } else {
+      dout(20) << __func__ << " alloc-id not set on index=" << (int)id
+               << " unable to allocate 0x" << std::hex << need
+               << " on bdev " << (int)id << std::dec << dendl;
+    }
+    if (alloc[id] && shared && alloc_unit != shared_unit) {
+      alloc_unit = shared_unit;
+      dout(20) << __func__ << " fallback to bdev "
+               << (int)id
+               << " with alloc unit 0x" << std::hex << alloc_unit
+               << std::dec << dendl;
+      logger->inc(l_bluefs_alloc_shared_size_fallbacks);
+      return _allocate(id,
+                       len,
+                       alloc_unit,
+                       node,
+                       alloc_attempts,
+                       permit_dev_fallback);
+    } else if (permit_dev_fallback && id != BDEV_SLOW && alloc[id + 1]) {
+      dout(20) << __func__ << " fallback to bdev "
+               << (int)id + 1
+               << dendl;
+      if (alloc_attempts > 0 && is_shared_alloc(id + 1)) {
+        logger->inc(l_bluefs_alloc_shared_dev_fallbacks);
+      }
+      return _allocate(id + 1,
+                       len,
+                       0,  // back to default alloc unit
+                       node,
+                       alloc_attempts,
+                       permit_dev_fallback);
+    } else {
+      derr << __func__ << " allocation failed, needed 0x" << std::hex << need
+           << dendl;
+    }
+    return -ENOSPC;
+  } else {
+    uint64_t used = _get_used(id);
+    if (max_bytes[id] < used) {
+      logger->set(max_bytes_pcounters[id], used);
+      max_bytes[id] = used;
+    }
+    if (shared) {
+      shared_alloc->bluefs_used += alloc_len;
+    }
+  }
+
+  for (auto& p : extents) {
+    node->append_extent(bluefs_extent_t(id, p.offset, p.length));
+  }
+
+  return 0;
+}
+
+int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)/*_LF*/
+{
+  std::lock_guard ll(log.lock);
+  std::lock_guard fl(f->lock);
+  dout(10) << __func__ << " file " << f->fnode << " 0x"
+           << std::hex << off << "~" << len << std::dec << dendl;
+  if (f->deleted) {
+    dout(10) << __func__ << " deleted, no-op" << dendl;
+    return 0;
+  }
+  ceph_assert(f->fnode.ino > 1);
+  uint64_t allocated = f->fnode.get_allocated();
+  if (off + len > allocated) {
+    uint64_t want = off + len - allocated;
+
+    vselector->sub_usage(f->vselector_hint, f->fnode);
+    int r = _allocate(vselector->select_prefer_bdev(f->vselector_hint),
+                      want,
+                      0,
+                      &f->fnode);
+    vselector->add_usage(f->vselector_hint, f->fnode);
+    if (r < 0)
+      return r;
+
+    log.t.op_file_update_inc(f->fnode);
+  }
+  return 0;
+}
+
+void BlueFS::sync_metadata(bool avoid_compact)/*_LNF_NF_LD_D*/
+{
+  bool can_skip_flush;
+  {
+    std::lock_guard ll(log.lock);
+    std::lock_guard dl(dirty.lock);
+    can_skip_flush = log.t.empty() && dirty.files.empty();
+  }
+  if (can_skip_flush) {
+    dout(10) << __func__ << " - no pending log events" << dendl;
+  } else {
+    utime_t start;
+    lgeneric_subdout(cct, bluefs, 10) << __func__;
+    start = ceph_clock_now();
+    *_dout << dendl;
+    _flush_bdev();  // FIXME?
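+    // Ordering note: the device flush above must complete before the log
+    // is persisted below, so a durable log entry can never reference file
+    // data that is not itself durable yet.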
+    _flush_and_sync_log_LD();
+    dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
+  }
+
+  if (!avoid_compact) {
+    _maybe_compact_log_LNF_NF_LD_D();
+  }
+}
+
+void BlueFS::_maybe_compact_log_LNF_NF_LD_D()
+{
+  if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
+      _should_start_compact_log_L_N()) {
+    auto t0 = mono_clock::now();
+    if (cct->_conf->bluefs_compact_log_sync) {
+      _compact_log_sync_LNF_LD();
+    } else {
+      _compact_log_async_LD_LNF_D();
+    }
+    logger->tinc(l_bluefs_compaction_lat, mono_clock::now() - t0);
+  }
+}
+
+int BlueFS::open_for_write(
+  std::string_view dirname,
+  std::string_view filename,
+  FileWriter **h,
+  bool overwrite)/*_LND*/
+{
+  _maybe_check_vselector_LNF();
+  FileRef file;
+  bool create = false;
+  bool truncate = false;
+  mempool::bluefs::vector<bluefs_extent_t> pending_release_extents;
+  {
+    std::lock_guard ll(log.lock);
+    std::lock_guard nl(nodes.lock);
+    dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+    map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+    DirRef dir;
+    if (p == nodes.dir_map.end()) {
+      // we do not implicitly create the dir; it must already exist
+      dout(20) << __func__ << " dir " << dirname
+               << " does not exist" << dendl;
+      return -ENOENT;
+    } else {
+      dir = p->second;
+    }
+
+    map<string,FileRef>::iterator q = dir->file_map.find(filename);
+    if (q == dir->file_map.end()) {
+      if (overwrite) {
+        dout(20) << __func__ << " dir " << dirname << " (" << dir
+                 << ") file " << filename
+                 << " does not exist" << dendl;
+        return -ENOENT;
+      }
+      file = ceph::make_ref<File>();
+      file->fnode.ino = ++ino_last;
+      nodes.file_map[ino_last] = file;
+      dir->file_map[string{filename}] = file;
+      ++file->refs;
+      create = true;
+      logger->set(l_bluefs_num_files, nodes.file_map.size());
+    } else {
+      // overwrite existing file?
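+      // Summary of the two paths below: with overwrite=true the existing
+      // allocation is reused in place; otherwise the old extents are
+      // queued for release (once the log commits) and the file restarts
+      // at size 0.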
+      file = q->second;
+      if (overwrite) {
+        dout(20) << __func__ << " dir " << dirname << " (" << dir
+                 << ") file " << filename
+                 << " already exists, overwrite in place" << dendl;
+      } else {
+        dout(20) << __func__ << " dir " << dirname << " (" << dir
+                 << ") file " << filename
+                 << " already exists, truncate + overwrite" << dendl;
+        vselector->sub_usage(file->vselector_hint, file->fnode);
+        file->fnode.size = 0;
+        pending_release_extents.swap(file->fnode.extents);
+        truncate = true;
+
+        file->fnode.clear_extents();
+      }
+    }
+    ceph_assert(file->fnode.ino > 1);
+
+    file->fnode.mtime = ceph_clock_now();
+    file->vselector_hint = vselector->get_hint_by_dir(dirname);
+    if (create || truncate) {
+      vselector->add_usage(file->vselector_hint, file->fnode);  // update file count
+    }
+
+    dout(20) << __func__ << " mapping " << dirname << "/" << filename
+             << " vsel_hint " << file->vselector_hint
+             << dendl;
+
+    log.t.op_file_update(file->fnode);
+    if (create)
+      log.t.op_dir_link(dirname, filename, file->fnode.ino);
+
+    std::lock_guard dl(dirty.lock);
+    for (auto& p : pending_release_extents) {
+      dirty.pending_release[p.bdev].insert(p.offset, p.length);
+    }
+  }
+  *h = _create_writer(file);
+
+  if (boost::algorithm::ends_with(filename, ".log")) {
+    (*h)->writer_type = BlueFS::WRITER_WAL;
+    if (logger && !overwrite) {
+      logger->inc(l_bluefs_files_written_wal);
+    }
+  } else if (boost::algorithm::ends_with(filename, ".sst")) {
+    (*h)->writer_type = BlueFS::WRITER_SST;
+    if (logger) {
+      logger->inc(l_bluefs_files_written_sst);
+    }
+  }
+
+  dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
+  return 0;
+}
+
+BlueFS::FileWriter *BlueFS::_create_writer(FileRef f)
+{
+  FileWriter *w = new FileWriter(f);
+  for (unsigned i = 0; i < MAX_BDEV; ++i) {
+    if (bdev[i]) {
+      w->iocv[i] = new IOContext(cct, NULL);
+    }
+  }
+  return w;
+}
+
+void BlueFS::_drain_writer(FileWriter *h)
+{
+  dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
+  //h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
+  for (unsigned i = 0; i < MAX_BDEV; ++i) {
+    if (bdev[i]) {
+      if (h->iocv[i]) {
+        h->iocv[i]->aio_wait();
+        delete h->iocv[i];
+      }
+    }
+  }
+  // sanity
+  if (h->file->fnode.size >= (1ull << 30)) {
+    dout(10) << __func__ << " file is unexpectedly large: " << h->file->fnode << dendl;
+  }
+}
+
+void BlueFS::_close_writer(FileWriter *h)
+{
+  _drain_writer(h);
+  delete h;
+}
+
+void BlueFS::close_writer(FileWriter *h)
+{
+  {
+    std::lock_guard l(h->lock);
+    _drain_writer(h);
+  }
+  delete h;
+}
+
+uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
+{
+  std::lock_guard l(h->lock);
+  return h->file->dirty_seq;
+}
+
+bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
+{
+  std::lock_guard l(h->lock);
+  return h->dirty_devs[dev];
+}
+
+int BlueFS::open_for_read(
+  std::string_view dirname,
+  std::string_view filename,
+  FileReader **h,
+  bool random)/*_N*/
+{
+  _maybe_check_vselector_LNF();
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " " << dirname << "/" << filename
+           << (random ? " (random)" : " (sequential)") << dendl;
+  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+  if (p == nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+    return -ENOENT;
+  }
+  DirRef dir = p->second;
+
+  map<string,FileRef>::iterator q = dir->file_map.find(filename);
+  if (q == dir->file_map.end()) {
+    dout(20) << __func__ << " dir " << dirname << " (" << dir
+             << ") file " << filename
+             << " not found" << dendl;
+    return -ENOENT;
+  }
+  File *file = q->second.get();
+
+  *h = new FileReader(file, random ? 4096 : cct->_conf->bluefs_max_prefetch,
+                      random, false);
+  dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
+  return 0;
+}
+
+int BlueFS::rename(
+  std::string_view old_dirname, std::string_view old_filename,
+  std::string_view new_dirname, std::string_view new_filename)/*_LND*/
+{
+  std::lock_guard ll(log.lock);
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " " << old_dirname << "/" << old_filename
+           << " -> " << new_dirname << "/" << new_filename << dendl;
+  map<string,DirRef>::iterator p = nodes.dir_map.find(old_dirname);
+  if (p == nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl;
+    return -ENOENT;
+  }
+  DirRef old_dir = p->second;
+  map<string,FileRef>::iterator q = old_dir->file_map.find(old_filename);
+  if (q == old_dir->file_map.end()) {
+    dout(20) << __func__ << " dir " << old_dirname << " (" << old_dir
+             << ") file " << old_filename
+             << " not found" << dendl;
+    return -ENOENT;
+  }
+  FileRef file = q->second;
+
+  p = nodes.dir_map.find(new_dirname);
+  if (p == nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl;
+    return -ENOENT;
+  }
+  DirRef new_dir = p->second;
+  q = new_dir->file_map.find(new_filename);
+  if (q != new_dir->file_map.end()) {
+    dout(20) << __func__ << " dir " << new_dirname << " (" << new_dir
+             << ") file " << new_filename
+             << " already exists, unlinking" << dendl;
+    ceph_assert(q->second != file);
+    log.t.op_dir_unlink(new_dirname, new_filename);
+    _drop_link_D(q->second);
+  }
+
+  dout(10) << __func__ << " " << new_dirname << "/" << new_filename
+           << " " << file->fnode << dendl;
+
+  new_dir->file_map[string{new_filename}] = file;
+  old_dir->file_map.erase(string{old_filename});
+
+  log.t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
+  log.t.op_dir_unlink(old_dirname, old_filename);
+  return 0;
+}
+
+int BlueFS::mkdir(std::string_view dirname)/*_LN*/
+{
+  std::lock_guard ll(log.lock);
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " " << dirname << dendl;
+  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+  if (p != nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << dirname << " exists" << dendl;
+    return -EEXIST;
+  }
+  nodes.dir_map[string{dirname}] = ceph::make_ref<Dir>();
+  log.t.op_dir_create(dirname);
+  return 0;
+}
+
+int BlueFS::rmdir(std::string_view dirname)/*_LN*/
+{
+  std::lock_guard ll(log.lock);
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " " << dirname << dendl;
+  auto p = nodes.dir_map.find(dirname);
+  if (p == nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl;
+    return -ENOENT;
+  }
+  DirRef dir = p->second;
+  if (!dir->file_map.empty()) {
+    dout(20) << __func__ << " dir " << dirname << " not empty" << dendl;
+    return -ENOTEMPTY;
+  }
+  nodes.dir_map.erase(string{dirname});
+  log.t.op_dir_remove(dirname);
+  return 0;
+}
+
+bool
BlueFS::dir_exists(std::string_view dirname)/*_N*/ +{ + std::lock_guard nl(nodes.lock); + map<string,DirRef>::iterator p = nodes.dir_map.find(dirname); + bool exists = p != nodes.dir_map.end(); + dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl; + return exists; +} + +int BlueFS::stat(std::string_view dirname, std::string_view filename, + uint64_t *size, utime_t *mtime)/*_N*/ +{ + std::lock_guard nl(nodes.lock); + dout(10) << __func__ << " " << dirname << "/" << filename << dendl; + map<string,DirRef>::iterator p = nodes.dir_map.find(dirname); + if (p == nodes.dir_map.end()) { + dout(20) << __func__ << " dir " << dirname << " not found" << dendl; + return -ENOENT; + } + DirRef dir = p->second; + map<string,FileRef>::iterator q = dir->file_map.find(filename); + if (q == dir->file_map.end()) { + dout(20) << __func__ << " dir " << dirname << " (" << dir + << ") file " << filename + << " not found" << dendl; + return -ENOENT; + } + File *file = q->second.get(); + dout(10) << __func__ << " " << dirname << "/" << filename + << " " << file->fnode << dendl; + if (size) + *size = file->fnode.size; + if (mtime) + *mtime = file->fnode.mtime; + return 0; +} + +int BlueFS::lock_file(std::string_view dirname, std::string_view filename, + FileLock **plock)/*_LN*/ +{ + std::lock_guard ll(log.lock); + std::lock_guard nl(nodes.lock); + dout(10) << __func__ << " " << dirname << "/" << filename << dendl; + map<string,DirRef>::iterator p = nodes.dir_map.find(dirname); + if (p == nodes.dir_map.end()) { + dout(20) << __func__ << " dir " << dirname << " not found" << dendl; + return -ENOENT; + } + DirRef dir = p->second; + auto q = dir->file_map.find(filename); + FileRef file; + if (q == dir->file_map.end()) { + dout(20) << __func__ << " dir " << dirname << " (" << dir + << ") file " << filename + << " not found, creating" << dendl; + file = ceph::make_ref<File>(); + file->fnode.ino = ++ino_last; + file->fnode.mtime = ceph_clock_now(); + nodes.file_map[ino_last] = file; + dir->file_map[string{filename}] = file; + logger->set(l_bluefs_num_files, nodes.file_map.size()); + ++file->refs; + log.t.op_file_update(file->fnode); + log.t.op_dir_link(dirname, filename, file->fnode.ino); + } else { + file = q->second; + if (file->locked) { + dout(10) << __func__ << " already locked" << dendl; + return -ENOLCK; + } + } + file->locked = true; + *plock = new FileLock(file); + dout(10) << __func__ << " locked " << file->fnode + << " with " << *plock << dendl; + return 0; +} + +int BlueFS::unlock_file(FileLock *fl)/*_N*/ +{ + std::lock_guard nl(nodes.lock); + dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl; + ceph_assert(fl->file->locked); + fl->file->locked = false; + delete fl; + return 0; +} + +int BlueFS::readdir(std::string_view dirname, vector<string> *ls)/*_N*/ +{ + // dirname may contain a trailing / + if (!dirname.empty() && dirname.back() == '/') { + dirname.remove_suffix(1); + } + std::lock_guard nl(nodes.lock); + dout(10) << __func__ << " " << dirname << dendl; + if (dirname.empty()) { + // list dirs + ls->reserve(nodes.dir_map.size() + 2); + for (auto& q : nodes.dir_map) { + ls->push_back(q.first); + } + } else { + // list files in dir + map<string,DirRef>::iterator p = nodes.dir_map.find(dirname); + if (p == nodes.dir_map.end()) { + dout(20) << __func__ << " dir " << dirname << " not found" << dendl; + return -ENOENT; + } + DirRef dir = p->second; + ls->reserve(dir->file_map.size() + 2); + for (auto& q : dir->file_map) { + ls->push_back(q.first); + } + } + 
+  ls->push_back(".");
+  ls->push_back("..");
+  return 0;
+}
+
+int BlueFS::unlink(std::string_view dirname, std::string_view filename)/*_LND*/
+{
+  std::lock_guard ll(log.lock);
+  std::lock_guard nl(nodes.lock);
+  dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+  if (p == nodes.dir_map.end()) {
+    dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+    return -ENOENT;
+  }
+  DirRef dir = p->second;
+  map<string,FileRef>::iterator q = dir->file_map.find(filename);
+  if (q == dir->file_map.end()) {
+    dout(20) << __func__ << " file " << dirname << "/" << filename
+             << " not found" << dendl;
+    return -ENOENT;
+  }
+  FileRef file = q->second;
+  if (file->locked) {
+    dout(20) << __func__ << " file " << dirname << "/" << filename
+             << " is locked" << dendl;
+    return -EBUSY;
+  }
+  dir->file_map.erase(string{filename});
+  log.t.op_dir_unlink(dirname, filename);
+  _drop_link_D(file);
+  return 0;
+}
+
+bool BlueFS::wal_is_rotational()
+{
+  if (bdev[BDEV_WAL]) {
+    return bdev[BDEV_WAL]->is_rotational();
+  } else if (bdev[BDEV_DB]) {
+    return bdev[BDEV_DB]->is_rotational();
+  }
+  return bdev[BDEV_SLOW]->is_rotational();
+}
+
+bool BlueFS::db_is_rotational()
+{
+  if (bdev[BDEV_DB]) {
+    return bdev[BDEV_DB]->is_rotational();
+  }
+  return bdev[BDEV_SLOW]->is_rotational();
+}
+
+/*
+  Algorithm.
+  _do_replay_recovery_read is used when the bluefs log ends abruptly but it
+  seems that more data should follow. The idea is to search the disk for the
+  definition of an extent that would have been appended to the bluefs log,
+  and to test whether using it produces a healthy bluefs transaction.
+  We encode the already-known bluefs log extents and search the disk for
+  those bytes. When we find them, we decode the bytes that follow as an
+  extent. We read that whole extent and then check whether, merged with the
+  existing log part, it yields a proper bluefs transaction.
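+  In outline (a sketch of the code below):
+  1. Encode the known log extents and take the last 32 bytes as a needle.
+  2. Read the still-valid log tail ("fixed") starting at replay_pos.
+  3. Scan the unallocated regions of each device in 4 MB chunks with
+     memmem(), looking for the needle.
+  4. Decode the bytes that follow a match as a bluefs_extent_t and
+     sanity-check it (valid device, 4K alignment, 16 MB length bound).
+  5. Read the whole candidate extent, append it to "fixed", and try to
+     decode a bluefs_transaction_t (which also verifies the crc).
+  6. On success, append the found extent to the log fnode and return the
+     requested bytes.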
+ */
+int BlueFS::_do_replay_recovery_read(FileReader *log_reader,
+                                     size_t replay_pos,
+                                     size_t read_offset,
+                                     size_t read_len,
+                                     bufferlist* bl) {
+  dout(1) << __func__ << " replay_pos=0x" << std::hex << replay_pos <<
+    " needs 0x" << read_offset << "~" << read_len << std::dec << dendl;
+
+  bluefs_fnode_t& log_fnode = log_reader->file->fnode;
+  bufferlist bin_extents;
+  ::encode(log_fnode.extents, bin_extents);
+  dout(2) << __func__ << " log file encoded extents length = " << bin_extents.length() << dendl;
+
+  // cannot process if too small to effectively search
+  ceph_assert(bin_extents.length() >= 32);
+  bufferlist last_32;
+  last_32.substr_of(bin_extents, bin_extents.length() - 32, 32);
+
+  // read the fixed part, from replay_pos to the end of the bluefs_log extents
+  bufferlist fixed;
+  uint64_t e_off = 0;
+  auto e = log_fnode.seek(replay_pos, &e_off);
+  ceph_assert(e != log_fnode.extents.end());
+  int r = _bdev_read(e->bdev, e->offset + e_off, e->length - e_off, &fixed, ioc[e->bdev],
+                     cct->_conf->bluefs_buffered_io);
+  ceph_assert(r == 0);
+  // capture the dev of the last good extent
+  uint8_t last_e_dev = e->bdev;
+  uint64_t last_e_off = e->offset;
+  ++e;
+  while (e != log_fnode.extents.end()) {
+    r = _bdev_read(e->bdev, e->offset, e->length, &fixed, ioc[e->bdev],
+                   cct->_conf->bluefs_buffered_io);
+    ceph_assert(r == 0);
+    last_e_dev = e->bdev;
+    ++e;
+  }
+  ceph_assert(replay_pos + fixed.length() == read_offset);
+
+  dout(2) << __func__ << " valid data in log = " << fixed.length() << dendl;
+
+  struct compare {
+    bool operator()(const bluefs_extent_t& a, const bluefs_extent_t& b) const {
+      // lexicographic (bdev, offset, length) order; note that plain chained
+      // '<' checks would not form a strict weak ordering
+      if (a.bdev != b.bdev) return a.bdev < b.bdev;
+      if (a.offset != b.offset) return a.offset < b.offset;
+      return a.length < b.length;
+    }
+  };
+  std::set<bluefs_extent_t, compare> extents_rejected;
+  for (int dcnt = 0; dcnt < 3; dcnt++) {
+    uint8_t dev = (last_e_dev + dcnt) % MAX_BDEV;
+    if (bdev[dev] == nullptr) continue;
+    dout(2) << __func__ << " processing " << get_device_name(dev) << dendl;
+    interval_set<uint64_t> disk_regions;
+    disk_regions.insert(0, bdev[dev]->get_size());
+    for (auto f : nodes.file_map) {
+      auto& e = f.second->fnode.extents;
+      for (auto& p : e) {
+        if (p.bdev == dev) {
+          disk_regions.erase(p.offset, p.length);
+        }
+      }
+    }
+    size_t disk_regions_count = disk_regions.num_intervals();
+    dout(5) << __func__ << " " << disk_regions_count << " regions to scan on " << get_device_name(dev) << dendl;
+
+    auto reg = disk_regions.lower_bound(last_e_off);
+    // for all devices except the first, start from the beginning
+    last_e_off = 0;
+    if (reg == disk_regions.end()) {
+      reg = disk_regions.begin();
+    }
+    const uint64_t chunk_size = 4 * 1024 * 1024;
+    const uint64_t page_size = 4096;
+    const uint64_t max_extent_size = 16;
+    uint64_t overlay_size = last_32.length() + max_extent_size;
+    for (size_t i = 0; i < disk_regions_count; reg++, i++) {
+      if (reg == disk_regions.end()) {
+        reg = disk_regions.begin();
+      }
+      uint64_t pos = reg.get_start();
+      uint64_t len = reg.get_len();
+
+      std::unique_ptr<char[]> raw_data_p{new char[page_size + chunk_size]};
+      char* raw_data = raw_data_p.get();
+      memset(raw_data, 0, page_size);
+
+      while (len > last_32.length()) {
+        uint64_t chunk_len = len > chunk_size ? chunk_size : len;
+        dout(5) << __func__ << " read "
+                << get_device_name(dev) << ":0x" << std::hex << pos << "+" << chunk_len
+                << std::dec << dendl;
+        r = _bdev_read_random(dev, pos, chunk_len,
+                              raw_data + page_size, cct->_conf->bluefs_buffered_io);
+        ceph_assert(r == 0);
+
+        // search for fixed_last_32
+        char* chunk_b = raw_data + page_size;
+        char* chunk_e = chunk_b + chunk_len;
+
+        char* search_b = chunk_b - overlay_size;
+        char* search_e = chunk_e;
+
+        for (char* sp = search_b; ; sp += last_32.length()) {
+          sp = (char*)memmem(sp, search_e - sp, last_32.c_str(), last_32.length());
+          if (sp == nullptr) {
+            break;
+          }
+
+          char* n = sp + last_32.length();
+          dout(5) << __func__ << " checking location 0x" << std::hex << pos + (n - chunk_b) << std::dec << dendl;
+          bufferlist test;
+          test.append(n, std::min<size_t>(max_extent_size, chunk_e - n));
+          bluefs_extent_t ne;
+          try {
+            bufferlist::const_iterator p = test.begin();
+            ::decode(ne, p);
+          } catch (buffer::error& e) {
+            continue;
+          }
+          if (extents_rejected.count(ne) != 0) {
+            dout(5) << __func__ << " extent " << ne << " already rejected" << dendl;
+            continue;
+          }
+          // insert as rejected up front; if we succeed, it makes no difference.
+          extents_rejected.insert(ne);
+
+          if (ne.bdev >= MAX_BDEV ||
+              bdev[ne.bdev] == nullptr ||
+              ne.length > 16 * 1024 * 1024 ||
+              (ne.length & 4095) != 0 ||
+              ne.offset + ne.length > bdev[ne.bdev]->get_size() ||
+              (ne.offset & 4095) != 0) {
+            dout(5) << __func__ << " refusing extent " << ne << dendl;
+            continue;
+          }
+          dout(5) << __func__ << " checking extent " << ne << dendl;
+
+          // read the whole candidate extent
+          bufferlist candidate;
+          candidate.append(fixed);
+          r = _bdev_read(ne.bdev, ne.offset, ne.length, &candidate, ioc[ne.bdev],
+                         cct->_conf->bluefs_buffered_io);
+          ceph_assert(r == 0);
+
+          // check whether the transaction & crc are ok
+          bluefs_transaction_t t;
+          try {
+            bufferlist::const_iterator p = candidate.begin();
+            ::decode(t, p);
+          }
+          catch (buffer::error& e) {
+            dout(5) << __func__ << " failed match" << dendl;
+            continue;
+          }
+
+          // success; this looks like a probable candidate
+          uint64_t l = std::min<uint64_t>(ne.length, read_len);
+          // trim to the required size
+          bufferlist requested_read;
+          requested_read.substr_of(candidate, fixed.length(), l);
+          bl->append(requested_read);
+          dout(5) << __func__ << " successful extension of log " << l << "/" << read_len << dendl;
+          log_fnode.append_extent(ne);
+          log_fnode.recalc_allocated();
+          log_reader->buf.pos += l;
+          return l;
+        }
+        // save the overlay for the next search
+        memcpy(search_b, chunk_e - overlay_size, overlay_size);
+        pos += chunk_len;
+        len -= chunk_len;
+      }
+    }
+  }
+  return 0;
+}
+
+void BlueFS::_check_vselector_LNF() {
+  BlueFSVolumeSelector* vs = vselector->clone_empty();
+  if (!vs) {
+    return;
+  }
+  std::lock_guard ll(log.lock);
+  std::lock_guard nl(nodes.lock);
+  // Checking the vselector happens under the log, nodes and file locks,
+  // so any modification of the vselector must hold at least one of those locks.
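+  // Every per-file lock is taken below and held across the comparison, so
+  // the usage snapshot accumulated in 'vs' cannot race with concurrent
+  // fnode updates.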
+  for (auto& f : nodes.file_map) {
+    f.second->lock.lock();
+    vs->add_usage(f.second->vselector_hint, f.second->fnode);
+  }
+  bool res = vselector->compare(vs);
+  if (!res) {
+    dout(0) << "Current:";
+    vselector->dump(*_dout);
+    *_dout << dendl;
+    dout(0) << "Expected:";
+    vs->dump(*_dout);
+    *_dout << dendl;
+  }
+  ceph_assert(res);
+  for (auto& f : nodes.file_map) {
+    f.second->lock.unlock();
+  }
+  delete vs;
+}
+
+size_t BlueFS::probe_alloc_avail(int dev, uint64_t alloc_size)
+{
+  size_t total = 0;
+  auto iterated_allocation = [&](size_t off, size_t len) {
+    // only count the portion that is alloc_size-aligned
+    size_t dist_to_alignment;
+    size_t offset_in_block = off & (alloc_size - 1);
+    if (offset_in_block == 0)
+      dist_to_alignment = 0;
+    else
+      dist_to_alignment = alloc_size - offset_in_block;
+    if (dist_to_alignment >= len)
+      return;
+    len -= dist_to_alignment;
+    total += p2align(len, alloc_size);
+  };
+  if (alloc[dev]) {
+    alloc[dev]->foreach(iterated_allocation);
+  }
+  return total;
+}
+// ===============================================
+// OriginalVolumeSelector
+
+void* OriginalVolumeSelector::get_hint_for_log() const {
+  return reinterpret_cast<void*>(BlueFS::BDEV_WAL);
+}
+void* OriginalVolumeSelector::get_hint_by_dir(std::string_view dirname) const {
+  uint8_t res = BlueFS::BDEV_DB;
+  if (dirname.length() > 5) {
+    // the "db.slow" and "db.wal" directory names are hard-coded to
+    // match up with bluestore. the slow device is always the second
+    // one (when a dedicated block.db device is present and used at
+    // bdev 0). the wal device is always last.
+    if (boost::algorithm::ends_with(dirname, ".slow") && slow_total) {
+      res = BlueFS::BDEV_SLOW;
+    } else if (boost::algorithm::ends_with(dirname, ".wal") && wal_total) {
+      res = BlueFS::BDEV_WAL;
+    }
+  }
+  return reinterpret_cast<void*>(res);
+}
+
+uint8_t OriginalVolumeSelector::select_prefer_bdev(void* hint)
+{
+  return (uint8_t)(reinterpret_cast<uint64_t>(hint));
+}
+
+void OriginalVolumeSelector::get_paths(const std::string& base, paths& res) const
+{
+  res.emplace_back(base, db_total);
+  res.emplace_back(base + ".slow",
+                   slow_total ? slow_total : db_total);  // use a fake non-zero value if
+                                                         // needed to avoid RocksDB complaints
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "OriginalVolumeSelector: "
+
+void OriginalVolumeSelector::dump(ostream& sout) {
+  sout << "wal_total:" << wal_total
+       << ", db_total:" << db_total
+       << ", slow_total:" << slow_total
+       << std::endl;
+}
+
+// ===============================================
+// FitToFastVolumeSelector
+
+void FitToFastVolumeSelector::get_paths(const std::string& base, paths& res) const {
+  res.emplace_back(base, 1);  // size of the last db_path has no effect
+}
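+
+// A minimal usage sketch (illustrative only; the call sequence below is an
+// assumption for exposition, not part of the upstream file): how the
+// directory-name hints above drive device selection for _allocate().
+//
+//   void* hint = vselector->get_hint_by_dir("db.wal");
+//   //   -> BlueFS::BDEV_WAL when a WAL device is present (wal_total > 0)
+//   void* hint2 = vselector->get_hint_by_dir("db.slow");
+//   //   -> BlueFS::BDEV_SLOW when a slow device is present (slow_total > 0)
+//   void* hint3 = vselector->get_hint_by_dir("db");
+//   //   -> BlueFS::BDEV_DB (the default)
+//   uint8_t id = vselector->select_prefer_bdev(hint);
+//   //   'id' is what open_for_write()/preallocate() ultimately feed into
+//   //   _allocate(), which falls back to the next device on ENOSPC.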