summaryrefslogtreecommitdiffstats
path: root/src/os/bluestore/BlueFS.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/os/bluestore/BlueFS.cc
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/os/bluestore/BlueFS.cc3665
1 files changed, 3665 insertions, 0 deletions
diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc
new file mode 100644
index 00000000..f7bda939
--- /dev/null
+++ b/src/os/bluestore/BlueFS.cc
@@ -0,0 +1,3665 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "boost/algorithm/string.hpp"
+#include "BlueFS.h"
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+#include "BlockDevice.h"
+#include "Allocator.h"
+#include "include/ceph_assert.h"
+#include "common/admin_socket.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bluefs
+#undef dout_prefix
+#define dout_prefix *_dout << "bluefs "
+
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::File, bluefs_file, bluefs);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::Dir, bluefs_dir, bluefs);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileWriter, bluefs_file_writer, bluefs_file_writer);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReaderBuffer,
+ bluefs_file_reader_buffer, bluefs_file_reader);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileReader, bluefs_file_reader, bluefs_file_reader);
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueFS::FileLock, bluefs_file_lock, bluefs);
+
+static void wal_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_WAL, *tmp);
+}
+
+static void db_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_DB, *tmp);
+}
+
+static void slow_discard_cb(void *priv, void* priv2) {
+ BlueFS *bluefs = static_cast<BlueFS*>(priv);
+ interval_set<uint64_t> *tmp = static_cast<interval_set<uint64_t>*>(priv2);
+ bluefs->handle_discard(BlueFS::BDEV_SLOW, *tmp);
+}
+
+class BlueFS::SocketHook : public AdminSocketHook {
+ BlueFS* bluefs;
+public:
+ static BlueFS::SocketHook* create(BlueFS* bluefs)
+ {
+ BlueFS::SocketHook* hook = nullptr;
+ AdminSocket* admin_socket = bluefs->cct->get_admin_socket();
+ if (admin_socket) {
+ hook = new BlueFS::SocketHook(bluefs);
+ int r = admin_socket->register_command("bluestore bluefs available",
+ "bluestore bluefs available "
+ "name=alloc_size,type=CephInt,req=false",
+ hook,
+ "Report available space for bluefs. "
+ "If alloc_size set, make simulation.");
+ if (r != 0) {
+ ldout(bluefs->cct, 1) << __func__ << " cannot register SocketHook" << dendl;
+ delete hook;
+ hook = nullptr;
+ } else {
+ r = admin_socket->register_command("bluestore bluefs stats",
+ "bluestore bluefs stats",
+ hook,
+ "Dump internal statistics for bluefs.");
+ ceph_assert(r == 0);
+ r = admin_socket->register_command("bluefs debug_inject_read_zeros",
+ "bluefs debug_inject_read_zeros",
+ hook,
+ "Injects 8K zeros into next BlueFS read. Debug only.");
+ ceph_assert(r == 0);
+ }
+ }
+ return hook;
+ }
+
+ ~SocketHook() {
+ AdminSocket* admin_socket = bluefs->cct->get_admin_socket();
+ admin_socket->unregister_commands(this);
+ }
+private:
+ SocketHook(BlueFS* bluefs) :
+ bluefs(bluefs) {}
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format, bufferlist& out) override {
+ stringstream ss;
+ bool r = true;
+ if (command == "bluestore bluefs available") {
+ int64_t alloc_size = 0;
+ cmd_getval(bluefs->cct, cmdmap, "alloc_size", alloc_size);
+ if ((alloc_size & (alloc_size - 1)) != 0) {
+ ss << "Invalid allocation size:'" << alloc_size << std::endl;
+ }
+ if (alloc_size == 0)
+ alloc_size = bluefs->cct->_conf->bluefs_alloc_size;
+ Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
+ f->open_object_section("bluefs_available_space");
+ for (unsigned dev = BDEV_WAL; dev <= BDEV_SLOW; dev++) {
+ if (bluefs->bdev[dev]) {
+ f->open_object_section("dev");
+ f->dump_string("device", bluefs->get_device_name(dev));
+ ceph_assert(bluefs->alloc[dev]);
+ f->dump_int("free", bluefs->alloc[dev]->get_free());
+ f->close_section();
+ }
+ }
+ size_t extra_space = 0;
+ if (bluefs->slow_dev_expander) {
+ extra_space = bluefs->slow_dev_expander->available_freespace(alloc_size);
+ }
+ f->dump_int("available_from_bluestore", extra_space);
+ f->close_section();
+ f->flush(ss);
+ delete f;
+ } else if (command == "bluestore bluefs stats") {
+ bluefs->dump_block_extents(ss);
+ bluefs->dump_volume_selector(ss);
+ } else if (command == "bluefs debug_inject_read_zeros") {
+ bluefs->inject_read_zeros++;
+ } else {
+ ss << "Invalid command" << std::endl;
+ r = false;
+ }
+ out.append(ss);
+ return r;
+ }
+};
+
+BlueFS::BlueFS(CephContext* cct)
+ : cct(cct),
+ bdev(MAX_BDEV),
+ ioc(MAX_BDEV),
+ block_all(MAX_BDEV)
+{
+ discard_cb[BDEV_WAL] = wal_discard_cb;
+ discard_cb[BDEV_DB] = db_discard_cb;
+ discard_cb[BDEV_SLOW] = slow_discard_cb;
+ asok_hook = SocketHook::create(this);
+}
+
+BlueFS::~BlueFS()
+{
+ delete asok_hook;
+ for (auto p : ioc) {
+ if (p)
+ p->aio_wait();
+ }
+ for (auto p : bdev) {
+ if (p) {
+ p->close();
+ delete p;
+ }
+ }
+ for (auto p : ioc) {
+ delete p;
+ }
+}
+
+void BlueFS::_init_logger()
+{
+ PerfCountersBuilder b(cct, "bluefs",
+ l_bluefs_first, l_bluefs_last);
+ b.add_u64_counter(l_bluefs_gift_bytes, "gift_bytes",
+ "Bytes gifted from BlueStore", NULL, 0, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_reclaim_bytes, "reclaim_bytes",
+ "Bytes reclaimed by BlueStore", NULL, 0, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_db_total_bytes, "db_total_bytes",
+ "Total bytes (main db device)",
+ "b", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_db_used_bytes, "db_used_bytes",
+ "Used bytes (main db device)",
+ "u", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_wal_total_bytes, "wal_total_bytes",
+ "Total bytes (wal device)",
+ "walb", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_wal_used_bytes, "wal_used_bytes",
+ "Used bytes (wal device)",
+ "walu", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_slow_total_bytes, "slow_total_bytes",
+ "Total bytes (slow device)",
+ "slob", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_slow_used_bytes, "slow_used_bytes",
+ "Used bytes (slow device)",
+ "slou", PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_num_files, "num_files", "File count",
+ "f", PerfCountersBuilder::PRIO_USEFUL);
+ b.add_u64(l_bluefs_log_bytes, "log_bytes", "Size of the metadata log",
+ "jlen", PerfCountersBuilder::PRIO_INTERESTING, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_log_compactions, "log_compactions",
+ "Compactions of the metadata log");
+ b.add_u64_counter(l_bluefs_logged_bytes, "logged_bytes",
+ "Bytes written to the metadata log", "j",
+ PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_files_written_wal, "files_written_wal",
+ "Files written to WAL");
+ b.add_u64_counter(l_bluefs_files_written_sst, "files_written_sst",
+ "Files written to SSTs");
+ b.add_u64_counter(l_bluefs_bytes_written_wal, "bytes_written_wal",
+ "Bytes written to WAL", "wal",
+ PerfCountersBuilder::PRIO_CRITICAL);
+ b.add_u64_counter(l_bluefs_bytes_written_sst, "bytes_written_sst",
+ "Bytes written to SSTs", "sst",
+ PerfCountersBuilder::PRIO_CRITICAL, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_bytes_written_slow, "bytes_written_slow",
+ "Bytes written to WAL/SSTs at slow device", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_max_bytes_wal, "max_bytes_wal",
+ "Maximum bytes allocated from WAL");
+ b.add_u64_counter(l_bluefs_max_bytes_db, "max_bytes_db",
+ "Maximum bytes allocated from DB");
+ b.add_u64_counter(l_bluefs_max_bytes_slow, "max_bytes_slow",
+ "Maximum bytes allocated from SLOW");
+
+ b.add_u64_counter(l_bluefs_read_random_count, "read_random_count",
+ "random read requests processed");
+ b.add_u64_counter(l_bluefs_read_random_bytes, "read_random_bytes",
+ "Bytes requested in random read mode", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_read_random_disk_count, "read_random_disk_count",
+ "random reads requests going to disk");
+ b.add_u64_counter(l_bluefs_read_random_disk_bytes, "read_random_disk_bytes",
+ "Bytes read from disk in random read mode", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64_counter(l_bluefs_read_random_buffer_count, "read_random_buffer_count",
+ "random read requests processed using prefetch buffer");
+ b.add_u64_counter(l_bluefs_read_random_buffer_bytes, "read_random_buffer_bytes",
+ "Bytes read from prefetch buffer in random read mode", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+
+ b.add_u64_counter(l_bluefs_read_count, "read_count",
+ "buffered read requests processed");
+ b.add_u64_counter(l_bluefs_read_bytes, "read_bytes",
+ "Bytes requested in buffered read mode", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+
+ b.add_u64_counter(l_bluefs_read_prefetch_count, "read_prefetch_count",
+ "prefetch read requests processed");
+ b.add_u64_counter(l_bluefs_read_prefetch_bytes, "read_prefetch_bytes",
+ "Bytes requested in prefetch read mode", NULL,
+ PerfCountersBuilder::PRIO_USEFUL, unit_t(UNIT_BYTES));
+ b.add_u64(l_bluefs_read_zeros_candidate, "read_zeros_candidate",
+ "How many times bluefs read found page with all 0s");
+ b.add_u64(l_bluefs_read_zeros_errors, "read_zeros_errors",
+ "How many times bluefs read found transient page with all 0s");
+
+ logger = b.create_perf_counters();
+ cct->get_perfcounters_collection()->add(logger);
+}
+
+void BlueFS::_shutdown_logger()
+{
+ cct->get_perfcounters_collection()->remove(logger);
+ delete logger;
+}
+
+void BlueFS::_update_logger_stats()
+{
+ // we must be holding the lock
+ logger->set(l_bluefs_num_files, file_map.size());
+ logger->set(l_bluefs_log_bytes, log_writer->file->fnode.size);
+
+ if (alloc[BDEV_WAL]) {
+ logger->set(l_bluefs_wal_total_bytes, block_all[BDEV_WAL].size());
+ logger->set(l_bluefs_wal_used_bytes,
+ block_all[BDEV_WAL].size() - alloc[BDEV_WAL]->get_free());
+ }
+ if (alloc[BDEV_DB]) {
+ logger->set(l_bluefs_db_total_bytes, block_all[BDEV_DB].size());
+ logger->set(l_bluefs_db_used_bytes,
+ block_all[BDEV_DB].size() - alloc[BDEV_DB]->get_free());
+ }
+ if (alloc[BDEV_SLOW]) {
+ logger->set(l_bluefs_slow_total_bytes, block_all[BDEV_SLOW].size());
+ logger->set(l_bluefs_slow_used_bytes,
+ block_all[BDEV_SLOW].size() - alloc[BDEV_SLOW]->get_free());
+ }
+}
+
+int BlueFS::add_block_device(unsigned id, const string& path, bool trim,
+ bool shared_with_bluestore)
+{
+ dout(10) << __func__ << " bdev " << id << " path " << path << dendl;
+ ceph_assert(id < bdev.size());
+ ceph_assert(bdev[id] == NULL);
+ BlockDevice *b = BlockDevice::create(cct, path, NULL, NULL,
+ discard_cb[id], static_cast<void*>(this));
+ if (shared_with_bluestore) {
+ b->set_no_exclusive_lock();
+ }
+ int r = b->open(path);
+ if (r < 0) {
+ delete b;
+ return r;
+ }
+ if (trim) {
+ b->discard(0, b->get_size());
+ }
+
+ dout(1) << __func__ << " bdev " << id << " path " << path
+ << " size " << byte_u_t(b->get_size()) << dendl;
+ bdev[id] = b;
+ ioc[id] = new IOContext(cct, NULL);
+ return 0;
+}
+
+bool BlueFS::bdev_support_label(unsigned id)
+{
+ ceph_assert(id < bdev.size());
+ ceph_assert(bdev[id]);
+ return bdev[id]->supported_bdev_label();
+}
+
+uint64_t BlueFS::get_block_device_size(unsigned id)
+{
+ if (id < bdev.size() && bdev[id])
+ return bdev[id]->get_size();
+ return 0;
+}
+
+void BlueFS::_add_block_extent(unsigned id, uint64_t offset, uint64_t length,
+ bool skip)
+{
+ dout(1) << __func__ << " bdev " << id
+ << " 0x" << std::hex << offset << "~" << length << std::dec
+ << " skip " << skip
+ << dendl;
+
+ ceph_assert(id < bdev.size());
+ ceph_assert(bdev[id]);
+ ceph_assert(bdev[id]->get_size() >= offset + length);
+ block_all[id].insert(offset, length);
+
+ if (id < alloc.size() && alloc[id]) {
+ if (!skip)
+ log_t.op_alloc_add(id, offset, length);
+
+ alloc[id]->init_add_free(offset, length);
+ }
+
+ if (logger)
+ logger->inc(l_bluefs_gift_bytes, length);
+ dout(10) << __func__ << " done" << dendl;
+}
+
+int BlueFS::reclaim_blocks(unsigned id, uint64_t want,
+ PExtentVector *extents)
+{
+ std::unique_lock l(lock);
+ dout(1) << __func__ << " bdev " << id
+ << " want 0x" << std::hex << want << std::dec << dendl;
+ ceph_assert(id < alloc.size());
+ ceph_assert(alloc[id]);
+
+ int64_t got = alloc[id]->allocate(want, alloc_size[id], 0, extents);
+ ceph_assert(got != 0);
+ if (got < 0) {
+ derr << __func__ << " failed to allocate space to return to bluestore"
+ << dendl;
+ alloc[id]->dump();
+ return got;
+ }
+
+ for (auto& p : *extents) {
+ block_all[id].erase(p.offset, p.length);
+ log_t.op_alloc_rm(id, p.offset, p.length);
+ }
+
+ flush_bdev();
+ int r = _flush_and_sync_log(l);
+ ceph_assert(r == 0);
+
+ logger->inc(l_bluefs_reclaim_bytes, got);
+ dout(1) << __func__ << " bdev " << id << " want 0x" << std::hex << want
+ << " got " << *extents << dendl;
+ return 0;
+}
+
+void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
+{
+ dout(10) << __func__ << " bdev " << id << dendl;
+ ceph_assert(alloc[id]);
+ alloc[id]->release(to_release);
+}
+
+uint64_t BlueFS::get_used()
+{
+ std::lock_guard l(lock);
+ uint64_t used = 0;
+ for (unsigned id = 0; id < MAX_BDEV; ++id) {
+ if (alloc[id]) {
+ used += block_all[id].size() - alloc[id]->get_free();
+ }
+ }
+ return used;
+}
+
+uint64_t BlueFS::get_total(unsigned id)
+{
+ std::lock_guard l(lock);
+ ceph_assert(id < block_all.size());
+ return block_all[id].size();
+}
+
+uint64_t BlueFS::get_free(unsigned id)
+{
+ std::lock_guard l(lock);
+ ceph_assert(id < alloc.size());
+ return alloc[id]->get_free();
+}
+
+void BlueFS::dump_perf_counters(Formatter *f)
+{
+ f->open_object_section("bluefs_perf_counters");
+ logger->dump_formatted(f,0);
+ f->close_section();
+}
+
+void BlueFS::dump_block_extents(ostream& out)
+{
+ for (unsigned i = 0; i < MAX_BDEV; ++i) {
+ if (!bdev[i]) {
+ continue;
+ }
+ auto owned = get_total(i);
+ auto free = get_free(i);
+
+ out << i << " : device size 0x" << std::hex << bdev[i]->get_size()
+ << " : own 0x" << block_all[i]
+ << " = 0x" << owned
+ << " : using 0x" << owned - free
+ << std::dec << "(" << byte_u_t(owned - free) << ")";
+ if (i == _get_slow_device_id()) {
+ ceph_assert(slow_dev_expander);
+ ceph_assert(alloc[i]);
+ free = slow_dev_expander->available_freespace(alloc_size[i]);
+ out << std::hex
+ << " : bluestore has 0x" << free
+ << std::dec << "(" << byte_u_t(free) << ") available";
+ }
+ out << "\n";
+ }
+}
+
+void BlueFS::get_usage(vector<pair<uint64_t,uint64_t>> *usage)
+{
+ std::lock_guard l(lock);
+ usage->resize(bdev.size());
+ for (unsigned id = 0; id < bdev.size(); ++id) {
+ if (!bdev[id]) {
+ (*usage)[id] = make_pair(0, 0);
+ continue;
+ }
+ (*usage)[id].first = alloc[id]->get_free();
+ (*usage)[id].second = block_all[id].size();
+ uint64_t used =
+ (block_all[id].size() - (*usage)[id].first) * 100 / block_all[id].size();
+ dout(10) << __func__ << " bdev " << id
+ << " free " << (*usage)[id].first
+ << " (" << byte_u_t((*usage)[id].first) << ")"
+ << " / " << (*usage)[id].second
+ << " (" << byte_u_t((*usage)[id].second) << ")"
+ << ", used " << used << "%"
+ << dendl;
+ }
+}
+
+int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " bdev " << id << dendl;
+ if (id >= block_all.size())
+ return -EINVAL;
+ *extents = block_all[id];
+ return 0;
+}
+
+int BlueFS::mkfs(uuid_d osd_uuid)
+{
+ std::unique_lock l(lock);
+ dout(1) << __func__
+ << " osd_uuid " << osd_uuid
+ << dendl;
+
+ // set volume selector if not provided before/outside
+ if (vselector == nullptr) {
+ vselector.reset(
+ new OriginalVolumeSelector(
+ get_block_device_size(BlueFS::BDEV_WAL) * 95 / 100,
+ get_block_device_size(BlueFS::BDEV_DB) * 95 / 100,
+ get_block_device_size(BlueFS::BDEV_SLOW) * 95 / 100));
+ }
+
+ _init_alloc();
+ _init_logger();
+
+ super.version = 1;
+ super.block_size = bdev[BDEV_DB]->get_block_size();
+ super.osd_uuid = osd_uuid;
+ super.uuid.generate_random();
+ dout(1) << __func__ << " uuid " << super.uuid << dendl;
+
+ // init log
+ FileRef log_file = new File;
+ log_file->fnode.ino = 1;
+ log_file->vselector_hint = vselector->get_hint_by_device(BDEV_WAL);
+ int r = _allocate(
+ vselector->select_prefer_bdev(log_file->vselector_hint),
+ cct->_conf->bluefs_max_log_runway,
+ &log_file->fnode);
+ vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+ ceph_assert(r == 0);
+ log_writer = _create_writer(log_file);
+
+ // initial txn
+ log_t.op_init();
+ for (unsigned bdev = 0; bdev < MAX_BDEV; ++bdev) {
+ interval_set<uint64_t>& p = block_all[bdev];
+ if (p.empty())
+ continue;
+ for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
+ dout(20) << __func__ << " op_alloc_add " << bdev << " 0x"
+ << std::hex << q.get_start() << "~" << q.get_len() << std::dec
+ << dendl;
+ log_t.op_alloc_add(bdev, q.get_start(), q.get_len());
+ }
+ }
+ _flush_and_sync_log(l);
+
+ // write supers
+ super.log_fnode = log_file->fnode;
+ _write_super(BDEV_DB);
+ flush_bdev();
+
+ // clean up
+ super = bluefs_super_t();
+ _close_writer(log_writer);
+ log_writer = NULL;
+ block_all.clear();
+ vselector.reset(nullptr);
+ _stop_alloc();
+ _shutdown_logger();
+
+ dout(10) << __func__ << " success" << dendl;
+ return 0;
+}
+
+void BlueFS::_init_alloc()
+{
+ dout(20) << __func__ << dendl;
+ alloc.resize(MAX_BDEV);
+ alloc_size.resize(MAX_BDEV, 0);
+ pending_release.resize(MAX_BDEV);
+
+ if (bdev[BDEV_WAL]) {
+ alloc_size[BDEV_WAL] = cct->_conf->bluefs_alloc_size;
+ }
+ if (bdev[BDEV_SLOW]) {
+ alloc_size[BDEV_DB] = cct->_conf->bluefs_alloc_size;
+ alloc_size[BDEV_SLOW] = cct->_conf->bluefs_shared_alloc_size;
+ } else {
+ alloc_size[BDEV_DB] = cct->_conf->bluefs_shared_alloc_size;
+ }
+ // new wal and db devices are never shared
+ if (bdev[BDEV_NEWWAL]) {
+ alloc_size[BDEV_NEWWAL] = cct->_conf->bluefs_alloc_size;
+ }
+ if (bdev[BDEV_NEWDB]) {
+ alloc_size[BDEV_NEWDB] = cct->_conf->bluefs_alloc_size;
+ }
+
+ for (unsigned id = 0; id < bdev.size(); ++id) {
+ if (!bdev[id]) {
+ continue;
+ }
+ ceph_assert(bdev[id]->get_size());
+ std::string name = "bluefs-";
+ const char* devnames[] = {"wal","db","slow"};
+ if (id <= BDEV_SLOW)
+ name += devnames[id];
+ else
+ name += to_string(uintptr_t(this));
+ ceph_assert(alloc_size[id]);
+ dout(1) << __func__ << " id " << id
+ << " alloc_size 0x" << std::hex << alloc_size[id]
+ << " size 0x" << bdev[id]->get_size() << std::dec << dendl;
+ alloc[id] = Allocator::create(cct, cct->_conf->bluefs_allocator,
+ bdev[id]->get_size(),
+ alloc_size[id], name);
+ interval_set<uint64_t>& p = block_all[id];
+ for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
+ alloc[id]->init_add_free(q.get_start(), q.get_len());
+ }
+ }
+}
+
+void BlueFS::_stop_alloc()
+{
+ dout(20) << __func__ << dendl;
+ for (auto p : bdev) {
+ if (p)
+ p->discard_drain();
+ }
+
+ for (auto p : alloc) {
+ if (p != nullptr) {
+ p->shutdown();
+ delete p;
+ }
+ }
+ alloc.clear();
+}
+
+int BlueFS::read(uint8_t ndev, uint64_t off, uint64_t len,
+ ceph::buffer::list *pbl, IOContext *ioc, bool buffered)
+{
+ dout(10) << __func__ << " dev " << int(ndev)
+ << ": 0x" << std::hex << off << "~" << len << std::dec
+ << (buffered ? " buffered" : "")
+ << dendl;
+ int r;
+ bufferlist bl;
+ r = bdev[ndev]->read(off, len, &bl, ioc, buffered);
+ if (r != 0) {
+ return r;
+ }
+ uint64_t block_size = bdev[ndev]->get_block_size();
+ if (inject_read_zeros) {
+ if (len >= block_size * 2) {
+ derr << __func__ << " injecting error, zeros at "
+ << int(ndev) << ": 0x" << std::hex << (off + len / 2)
+ << "~" << (block_size * 2) << std::dec << dendl;
+ //use beginning, replace 8K in the middle with zeros, use tail
+ bufferlist temp;
+ bl.splice(0, len / 2 - block_size, &temp);
+ temp.append_zero(block_size * 2);
+ bl.splice(block_size * 2, len / 2 - block_size, &temp);
+ bl = temp;
+ inject_read_zeros--;
+ }
+ }
+ //make a check if there is a block with all 0
+ uint64_t to_check_len = len;
+ uint64_t skip = p2nphase(off, block_size);
+ if (skip >= to_check_len) {
+ return r;
+ }
+ auto it = bl.begin();
+ it.seek(skip);
+ to_check_len -= skip;
+ bool all_zeros = false;
+ while (all_zeros == false && to_check_len >= block_size) {
+ // checking 0s step
+ unsigned block_left = block_size;
+ unsigned avail;
+ const char* data;
+ all_zeros = true;
+ while (all_zeros && block_left > 0) {
+ avail = it.get_ptr_and_advance(block_left, &data);
+ block_left -= avail;
+ all_zeros = mem_is_zero(data, avail);
+ }
+ // skipping step
+ while (block_left > 0) {
+ avail = it.get_ptr_and_advance(block_left, &data);
+ block_left -= avail;
+ }
+ to_check_len -= block_size;
+ }
+ if (all_zeros) {
+ logger->inc(l_bluefs_read_zeros_candidate, 1);
+ bufferlist bl_reread;
+ r = bdev[ndev]->read(off, len, &bl_reread, ioc, buffered);
+ if (r != 0) {
+ return r;
+ }
+ // check if both read gave the same
+ if (!bl.contents_equal(bl_reread)) {
+ // report problems to log, but continue, maybe it will be good now...
+ derr << __func__ << " initial read of " << int(ndev)
+ << ": 0x" << std::hex << off << "~" << len
+ << std::dec << ": different then re-read " << dendl;
+ logger->inc(l_bluefs_read_zeros_errors, 1);
+ }
+ // use second read will be better if is different
+ pbl->append(bl_reread);
+ } else {
+ pbl->append(bl);
+ }
+ return r;
+}
+
+int BlueFS::read_random(uint8_t ndev, uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+ dout(10) << __func__ << " dev " << int(ndev)
+ << ": 0x" << std::hex << off << "~" << len << std::dec
+ << (buffered ? " buffered" : "")
+ << dendl;
+ int r;
+ r = bdev[ndev]->read_random(off, len, buf, buffered);
+ if (r != 0) {
+ return r;
+ }
+ uint64_t block_size = bdev[ndev]->get_block_size();
+ if (inject_read_zeros) {
+ if (len >= block_size * 2) {
+ derr << __func__ << " injecting error, zeros at "
+ << int(ndev) << ": 0x" << std::hex << (off + len / 2)
+ << "~" << (block_size * 2) << std::dec << dendl;
+ //zero middle 8K
+ memset(buf + len / 2 - block_size, 0, block_size * 2);
+ inject_read_zeros--;
+ }
+ }
+ //make a check if there is a block with all 0
+ uint64_t to_check_len = len;
+ const char* data = buf;
+ uint64_t skip = p2nphase(off, block_size);
+ if (skip >= to_check_len) {
+ return r;
+ }
+ to_check_len -= skip;
+ data += skip;
+
+ bool all_zeros = false;
+ while (all_zeros == false && to_check_len >= block_size) {
+ if (mem_is_zero(data, block_size)) {
+ // at least one block is all zeros
+ all_zeros = true;
+ break;
+ }
+ data += block_size;
+ to_check_len -= block_size;
+ }
+ if (all_zeros) {
+ logger->inc(l_bluefs_read_zeros_candidate, 1);
+ std::unique_ptr<char[]> data_reread(new char[len]);
+ r = bdev[ndev]->read_random(off, len, &data_reread[0], buffered);
+ if (r != 0) {
+ return r;
+ }
+ // check if both read gave the same
+ if (memcmp(buf, &data_reread[0], len) != 0) {
+ derr << __func__ << " initial read of " << int(ndev)
+ << ": 0x" << std::hex << off << "~" << len
+ << std::dec << ": different then re-read " << dendl;
+ logger->inc(l_bluefs_read_zeros_errors, 1);
+ // second read is probably better
+ memcpy(buf, &data_reread[0], len);
+ }
+ }
+ return r;
+}
+
+int BlueFS::mount()
+{
+ dout(1) << __func__ << dendl;
+
+ int r = _open_super();
+ if (r < 0) {
+ derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+
+ // set volume selector if not provided before/outside
+ if (vselector == nullptr) {
+ vselector.reset(
+ new OriginalVolumeSelector(
+ get_block_device_size(BlueFS::BDEV_WAL) * 95 / 100,
+ get_block_device_size(BlueFS::BDEV_DB) * 95 / 100,
+ get_block_device_size(BlueFS::BDEV_SLOW) * 95 / 100));
+ }
+
+ block_all.clear();
+ block_all.resize(MAX_BDEV);
+ _init_alloc();
+ _init_logger();
+
+ r = _replay(false, false);
+ if (r < 0) {
+ derr << __func__ << " failed to replay log: " << cpp_strerror(r) << dendl;
+ _stop_alloc();
+ goto out;
+ }
+
+ // init freelist
+ for (auto& p : file_map) {
+ dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
+ for (auto& q : p.second->fnode.extents) {
+ alloc[q.bdev]->init_rm_free(q.offset, q.length);
+ }
+ }
+
+ // set up the log for future writes
+ log_writer = _create_writer(_get_file(1));
+ ceph_assert(log_writer->file->fnode.ino == 1);
+ log_writer->pos = log_writer->file->fnode.size;
+ dout(10) << __func__ << " log write pos set to 0x"
+ << std::hex << log_writer->pos << std::dec
+ << dendl;
+
+ return 0;
+
+ out:
+ super = bluefs_super_t();
+ return r;
+}
+
+void BlueFS::umount(bool avoid_compact)
+{
+ dout(1) << __func__ << dendl;
+
+ sync_metadata(avoid_compact);
+
+ _close_writer(log_writer);
+ log_writer = NULL;
+
+ vselector.reset(nullptr);
+ _stop_alloc();
+ file_map.clear();
+ dir_map.clear();
+ super = bluefs_super_t();
+ log_t.clear();
+ _shutdown_logger();
+}
+
+int BlueFS::prepare_new_device(int id)
+{
+ dout(1) << __func__ << dendl;
+
+ if(id == BDEV_NEWDB) {
+ int new_log_dev_cur = BDEV_WAL;
+ int new_log_dev_next = BDEV_WAL;
+ if (!bdev[BDEV_WAL]) {
+ new_log_dev_cur = BDEV_NEWDB;
+ new_log_dev_next = BDEV_DB;
+ }
+ _rewrite_log_sync(false,
+ BDEV_NEWDB,
+ new_log_dev_cur,
+ new_log_dev_next,
+ RENAME_DB2SLOW);
+ //}
+ } else if(id == BDEV_NEWWAL) {
+ _rewrite_log_sync(false, BDEV_DB, BDEV_NEWWAL, BDEV_WAL, REMOVE_WAL);
+ } else {
+ assert(false);
+ }
+ return 0;
+}
+
+void BlueFS::collect_metadata(map<string,string> *pm, unsigned skip_bdev_id)
+{
+ if (skip_bdev_id != BDEV_DB && bdev[BDEV_DB])
+ bdev[BDEV_DB]->collect_metadata("bluefs_db_", pm);
+ if (bdev[BDEV_WAL])
+ bdev[BDEV_WAL]->collect_metadata("bluefs_wal_", pm);
+}
+
+void BlueFS::get_devices(set<string> *ls)
+{
+ for (unsigned i = 0; i < MAX_BDEV; ++i) {
+ if (bdev[i]) {
+ bdev[i]->get_devices(ls);
+ }
+ }
+}
+
+int BlueFS::fsck()
+{
+ std::lock_guard l(lock);
+ dout(1) << __func__ << dendl;
+ // hrm, i think we check everything on mount...
+ return 0;
+}
+
+int BlueFS::_write_super(int dev)
+{
+ // build superblock
+ bufferlist bl;
+ encode(super, bl);
+ uint32_t crc = bl.crc32c(-1);
+ encode(crc, bl);
+ dout(10) << __func__ << " super block length(encoded): " << bl.length() << dendl;
+ dout(10) << __func__ << " superblock " << super.version << dendl;
+ dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+ ceph_assert(bl.length() <= get_super_length());
+ bl.append_zero(get_super_length() - bl.length());
+
+ bdev[dev]->write(get_super_offset(), bl, false, WRITE_LIFE_SHORT);
+ dout(20) << __func__ << " v " << super.version
+ << " crc 0x" << std::hex << crc
+ << " offset 0x" << get_super_offset() << std::dec
+ << dendl;
+ return 0;
+}
+
+int BlueFS::_open_super()
+{
+ dout(10) << __func__ << dendl;
+
+ bufferlist bl;
+ uint32_t expected_crc, crc;
+ int r;
+
+ // always the second block
+ r = bdev[BDEV_DB]->read(get_super_offset(), get_super_length(),
+ &bl, ioc[BDEV_DB], false);
+ if (r < 0)
+ return r;
+
+ auto p = bl.cbegin();
+ decode(super, p);
+ {
+ bufferlist t;
+ t.substr_of(bl, 0, p.get_off());
+ crc = t.crc32c(-1);
+ }
+ decode(expected_crc, p);
+ if (crc != expected_crc) {
+ derr << __func__ << " bad crc on superblock, expected 0x"
+ << std::hex << expected_crc << " != actual 0x" << crc << std::dec
+ << dendl;
+ return -EIO;
+ }
+ dout(10) << __func__ << " superblock " << super.version << dendl;
+ dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+ return 0;
+}
+
+int BlueFS::_replay(bool noop, bool to_stdout)
+{
+ dout(10) << __func__ << (noop ? " NO-OP" : "") << dendl;
+ ino_last = 1; // by the log
+ log_seq = 0;
+
+ FileRef log_file;
+ log_file = _get_file(1);
+ if (!noop) {
+ log_file->fnode = super.log_fnode;
+ log_file->vselector_hint =
+ vselector->get_hint_by_device(BDEV_WAL);
+ } else {
+ // do not use fnode from superblock in 'noop' mode - log_file's one should
+ // be fine and up-to-date
+ ceph_assert(log_file->fnode.ino == 1);
+ ceph_assert(log_file->fnode.extents.size() != 0);
+ }
+ dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " log_fnode " << super.log_fnode << std::endl;
+ }
+
+ FileReader *log_reader = new FileReader(
+ log_file, cct->_conf->bluefs_max_prefetch,
+ false, // !random
+ true); // ignore eof
+ while (true) {
+ ceph_assert((log_reader->buf.pos & ~super.block_mask()) == 0);
+ uint64_t pos = log_reader->buf.pos;
+ uint64_t read_pos = pos;
+ bufferlist bl;
+ {
+ int r = _read(log_reader, &log_reader->buf, read_pos, super.block_size,
+ &bl, NULL);
+ if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) {
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
+ }
+ assert(r == (int)super.block_size);
+ read_pos += r;
+ }
+ uint64_t more = 0;
+ uint64_t seq;
+ uuid_d uuid;
+ {
+ auto p = bl.cbegin();
+ __u8 a, b;
+ uint32_t len;
+ decode(a, p);
+ decode(b, p);
+ decode(len, p);
+ decode(uuid, p);
+ decode(seq, p);
+ if (len + 6 > bl.length()) {
+ more = round_up_to(len + 6 - bl.length(), super.block_size);
+ }
+ }
+ if (uuid != super.uuid) {
+ dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": stop: uuid " << uuid << " != super.uuid " << super.uuid
+ << dendl;
+ break;
+ }
+ if (seq != log_seq + 1) {
+ dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": stop: seq " << seq << " != expected " << log_seq + 1
+ << dendl;
+ break;
+ }
+ if (more) {
+ dout(20) << __func__ << " need 0x" << std::hex << more << std::dec
+ << " more bytes" << dendl;
+ bufferlist t;
+ int r = _read(log_reader, &log_reader->buf, read_pos, more, &t, NULL);
+ if (r < (int)more) {
+ dout(10) << __func__ << " 0x" << std::hex << pos
+ << ": stop: len is 0x" << bl.length() + more << std::dec
+ << ", which is past eof" << dendl;
+ if (cct->_conf->bluefs_replay_recovery) {
+ //try to search for more data
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
+ if (r < (int)more) {
+ //in normal mode we must read r==more, for recovery it is too strict
+ break;
+ }
+ }
+ }
+ ceph_assert(r == (int)more);
+ bl.claim_append(t);
+ read_pos += r;
+ }
+ bluefs_transaction_t t;
+ try {
+ auto p = bl.cbegin();
+ decode(t, p);
+ }
+ catch (buffer::error& e) {
+ dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": stop: failed to decode: " << e.what()
+ << dendl;
+ delete log_reader;
+ return -EIO;
+ }
+ ceph_assert(seq == t.seq);
+ dout(10) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": " << t << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": " << t << std::endl;
+ }
+
+ auto p = t.op_bl.cbegin();
+ while (!p.end()) {
+ __u8 op;
+ decode(op, p);
+ switch (op) {
+
+ case bluefs_transaction_t::OP_INIT:
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_init" << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_init" << std::endl;
+ }
+
+ ceph_assert(t.seq == 1);
+ break;
+
+ case bluefs_transaction_t::OP_JUMP:
+ {
+ uint64_t next_seq;
+ uint64_t offset;
+ decode(next_seq, p);
+ decode(offset, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_jump seq " << next_seq
+ << " offset 0x" << std::hex << offset << std::dec << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_jump seq " << next_seq
+ << " offset 0x" << std::hex << offset << std::dec
+ << std::endl;
+ }
+
+ ceph_assert(next_seq >= log_seq);
+ log_seq = next_seq - 1; // we will increment it below
+ uint64_t skip = offset - read_pos;
+ if (skip) {
+ bufferlist junk;
+ int r = _read(log_reader, &log_reader->buf, read_pos, skip, &junk,
+ NULL);
+ if (r != (int)skip) {
+ dout(10) << __func__ << " 0x" << std::hex << read_pos
+ << ": stop: failed to skip to " << offset
+ << std::dec << dendl;
+ ceph_abort_msg("problem with op_jump");
+ }
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_JUMP_SEQ:
+ {
+ uint64_t next_seq;
+ decode(next_seq, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_jump_seq " << next_seq << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_jump_seq " << next_seq << std::endl;
+ }
+
+ ceph_assert(next_seq >= log_seq);
+ log_seq = next_seq - 1; // we will increment it below
+ }
+ break;
+
+ case bluefs_transaction_t::OP_ALLOC_ADD:
+ {
+ __u8 id;
+ uint64_t offset, length;
+ decode(id, p);
+ decode(offset, p);
+ decode(length, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_alloc_add " << " " << (int)id
+ << ":0x" << std::hex << offset << "~" << length << std::dec
+ << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_alloc_add " << " " << (int)id
+ << ":0x" << std::hex << offset << "~" << length << std::dec
+ << std::endl;
+ }
+
+ if (!noop) {
+ block_all[id].insert(offset, length);
+ alloc[id]->init_add_free(offset, length);
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_ALLOC_RM:
+ {
+ __u8 id;
+ uint64_t offset, length;
+ decode(id, p);
+ decode(offset, p);
+ decode(length, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_alloc_rm " << " " << (int)id
+ << ":0x" << std::hex << offset << "~" << length << std::dec
+ << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_alloc_rm " << " " << (int)id
+ << ":0x" << std::hex << offset << "~" << length << std::dec
+ << std::endl;
+ }
+
+ if (!noop) {
+ block_all[id].erase(offset, length);
+ alloc[id]->init_rm_free(offset, length);
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_DIR_LINK:
+ {
+ string dirname, filename;
+ uint64_t ino;
+ decode(dirname, p);
+ decode(filename, p);
+ decode(ino, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_link " << " " << dirname << "/" << filename
+ << " to " << ino
+ << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_link " << " " << dirname << "/" << filename
+ << " to " << ino
+ << std::endl;
+ }
+
+ if (!noop) {
+ FileRef file = _get_file(ino);
+ ceph_assert(file->fnode.ino);
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
+ map<string,FileRef>::iterator r = q->second->file_map.find(filename);
+ ceph_assert(r == q->second->file_map.end());
+
+ vselector->sub_usage(file->vselector_hint, file->fnode);
+ file->vselector_hint =
+ vselector->get_hint_by_dir(dirname);
+ vselector->add_usage(file->vselector_hint, file->fnode);
+
+ q->second->file_map[filename] = file;
+ ++file->refs;
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_DIR_UNLINK:
+ {
+ string dirname, filename;
+ decode(dirname, p);
+ decode(filename, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_unlink " << " " << dirname << "/" << filename
+ << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_unlink " << " " << dirname << "/" << filename
+ << std::endl;
+ }
+
+ if (!noop) {
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
+ map<string,FileRef>::iterator r = q->second->file_map.find(filename);
+ ceph_assert(r != q->second->file_map.end());
+ ceph_assert(r->second->refs > 0);
+ --r->second->refs;
+ q->second->file_map.erase(r);
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_DIR_CREATE:
+ {
+ string dirname;
+ decode(dirname, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_create " << dirname << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_create " << dirname << std::endl;
+ }
+
+ if (!noop) {
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q == dir_map.end());
+ dir_map[dirname] = new Dir;
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_DIR_REMOVE:
+ {
+ string dirname;
+ decode(dirname, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_remove " << dirname << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_dir_remove " << dirname << std::endl;
+ }
+
+ if (!noop) {
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
+ ceph_assert(q->second->file_map.empty());
+ dir_map.erase(q);
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_FILE_UPDATE:
+ {
+ bluefs_fnode_t fnode;
+ decode(fnode, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_file_update " << " " << fnode << " " << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_file_update " << " " << fnode << std::endl;
+ }
+
+ if (!noop) {
+ FileRef f = _get_file(fnode.ino);
+ if (fnode.ino != 1) {
+ vselector->sub_usage(f->vselector_hint, f->fnode);
+ }
+ f->fnode = fnode;
+ if (fnode.ino != 1) {
+ vselector->add_usage(f->vselector_hint, f->fnode);
+ }
+
+ if (fnode.ino > ino_last) {
+ ino_last = fnode.ino;
+ }
+ }
+ }
+ break;
+
+ case bluefs_transaction_t::OP_FILE_REMOVE:
+ {
+ uint64_t ino;
+ decode(ino, p);
+ dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": op_file_remove " << ino << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " 0x" << std::hex << pos << std::dec
+ << ": op_file_remove " << ino << std::endl;
+ }
+
+ if (!noop) {
+ auto p = file_map.find(ino);
+ ceph_assert(p != file_map.end());
+ vselector->sub_usage(p->second->vselector_hint, p->second->fnode);
+ file_map.erase(p);
+ }
+ }
+ break;
+
+ default:
+ derr << __func__ << " 0x" << std::hex << pos << std::dec
+ << ": stop: unrecognized op " << (int)op << dendl;
+ delete log_reader;
+ return -EIO;
+ }
+ }
+ ceph_assert(p.end());
+
+ // we successfully replayed the transaction; bump the seq and log size
+ ++log_seq;
+ log_file->fnode.size = log_reader->buf.pos;
+ }
+ vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+
+ dout(10) << __func__ << " log file size was 0x"
+ << std::hex << log_file->fnode.size << std::dec << dendl;
+ if (unlikely(to_stdout)) {
+ std::cout << " log file size was 0x"
+ << std::hex << log_file->fnode.size << std::dec << std::endl;
+ }
+
+ delete log_reader;
+
+ if (!noop) {
+ // verify file link counts are all >0
+ for (auto& p : file_map) {
+ if (p.second->refs == 0 &&
+ p.second->fnode.ino > 1) {
+ derr << __func__ << " file with link count 0: " << p.second->fnode
+ << dendl;
+ return -EIO;
+ }
+ }
+ }
+
+ dout(10) << __func__ << " done" << dendl;
+ return 0;
+}
+
+int BlueFS::log_dump()
+{
+ // only dump log file's content
+ int r = _replay(true, true);
+ if (r < 0) {
+ derr << __func__ << " failed to replay log: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+int BlueFS::device_migrate_to_existing(
+ CephContext *cct,
+ const set<int>& devs_source,
+ int dev_target)
+{
+ vector<byte> buf;
+ bool buffered = cct->_conf->bluefs_buffered_io;
+
+ dout(10) << __func__ << " devs_source " << devs_source
+ << " dev_target " << dev_target << dendl;
+ assert(dev_target < (int)MAX_BDEV);
+
+ int flags = 0;
+ flags |= devs_source.count(BDEV_DB) ?
+ (REMOVE_DB | RENAME_SLOW2DB) : 0;
+ flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
+ int dev_target_new = dev_target;
+
+ // Slow device without separate DB one is addressed via BDEV_DB
+ // Hence need renaming.
+ if ((flags & REMOVE_DB) && dev_target == BDEV_SLOW) {
+ dev_target_new = BDEV_DB;
+ dout(0) << __func__ << " super to be written to " << dev_target << dendl;
+ }
+
+ for (auto& p : file_map) {
+ //do not copy log
+ if (p.second->fnode.ino == 1) {
+ continue;
+ }
+ dout(10) << __func__ << " " << p.first << " " << p.second->fnode << dendl;
+
+ auto& fnode_extents = p.second->fnode.extents;
+
+ bool rewrite = false;
+ for (auto ext_it = fnode_extents.begin();
+ ext_it != p.second->fnode.extents.end();
+ ++ext_it) {
+ if (ext_it->bdev != dev_target && devs_source.count(ext_it->bdev)) {
+ rewrite = true;
+ break;
+ }
+ }
+ if (rewrite) {
+ dout(10) << __func__ << " migrating" << dendl;
+
+ // read entire file
+ bufferlist bl;
+ for (auto old_ext : fnode_extents) {
+ buf.resize(old_ext.length);
+ int r = bdev[old_ext.bdev]->read_random(
+ old_ext.offset,
+ old_ext.length,
+ (char*)&buf.at(0),
+ buffered);
+ if (r != 0) {
+ derr << __func__ << " failed to read 0x" << std::hex
+ << old_ext.offset << "~" << old_ext.length << std::dec
+ << " from " << (int)dev_target << dendl;
+ return -EIO;
+ }
+ bl.append((char*)&buf[0], old_ext.length);
+ }
+
+ // write entire file
+ PExtentVector extents;
+ auto l = _allocate_without_fallback(dev_target, bl.length(), &extents);
+ if (l < 0) {
+ derr << __func__ << " unable to allocate len 0x" << std::hex
+ << bl.length() << std::dec << " from " << (int)dev_target
+ << ": " << cpp_strerror(l) << dendl;
+ return -ENOSPC;
+ }
+
+ uint64_t off = 0;
+ for (auto& i : extents) {
+ bufferlist cur;
+ uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
+ ceph_assert(cur_len > 0);
+ cur.substr_of(bl, off, cur_len);
+ int r = bdev[dev_target]->write(i.offset, cur, buffered);
+ ceph_assert(r == 0);
+ off += cur_len;
+ }
+
+ // release old extents
+ for (auto old_ext : fnode_extents) {
+ PExtentVector to_release;
+ to_release.emplace_back(old_ext.offset, old_ext.length);
+ alloc[old_ext.bdev]->release(to_release);
+ }
+
+ // update fnode
+ fnode_extents.clear();
+ for (auto& i : extents) {
+ fnode_extents.emplace_back(dev_target_new, i.offset, i.length);
+ }
+ } else {
+ for (auto ext_it = fnode_extents.begin();
+ ext_it != p.second->fnode.extents.end();
+ ++ext_it) {
+ if (dev_target != dev_target_new && ext_it->bdev == dev_target) {
+ dout(20) << __func__ << " " << " ... adjusting extent 0x"
+ << std::hex << ext_it->offset << std::dec
+ << " bdev " << dev_target << " -> " << dev_target_new
+ << dendl;
+ ext_it->bdev = dev_target_new;
+ }
+ }
+ }
+ }
+ // new logging device in the current naming scheme
+ int new_log_dev_cur = bdev[BDEV_WAL] ?
+ BDEV_WAL :
+ bdev[BDEV_DB] ? BDEV_DB : BDEV_SLOW;
+
+ // new logging device in new naming scheme
+ int new_log_dev_next = new_log_dev_cur;
+
+ if (devs_source.count(new_log_dev_cur)) {
+ // SLOW device is addressed via BDEV_DB too hence either WAL or DB
+ new_log_dev_next = (flags & REMOVE_WAL) || !bdev[BDEV_WAL] ?
+ BDEV_DB :
+ BDEV_WAL;
+
+ dout(0) << __func__ << " log moved from " << new_log_dev_cur
+ << " to " << new_log_dev_next << dendl;
+
+ new_log_dev_cur =
+ (flags & REMOVE_DB) && new_log_dev_next == BDEV_DB ?
+ BDEV_SLOW :
+ new_log_dev_next;
+ }
+
+ _rewrite_log_sync(
+ false,
+ (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
+ new_log_dev_cur,
+ new_log_dev_next,
+ flags);
+ return 0;
+}
+
+int BlueFS::device_migrate_to_new(
+ CephContext *cct,
+ const set<int>& devs_source,
+ int dev_target)
+{
+ vector<byte> buf;
+ bool buffered = cct->_conf->bluefs_buffered_io;
+
+ dout(10) << __func__ << " devs_source " << devs_source
+ << " dev_target " << dev_target << dendl;
+ assert(dev_target == (int)BDEV_NEWDB || (int)BDEV_NEWWAL);
+
+ int flags = 0;
+
+ flags |= devs_source.count(BDEV_DB) ?
+ (!bdev[BDEV_SLOW] ? RENAME_DB2SLOW: REMOVE_DB) :
+ 0;
+ flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
+ int dev_target_new = dev_target; //FIXME: remove, makes no sense
+
+ for (auto& p : file_map) {
+ //do not copy log
+ if (p.second->fnode.ino == 1) {
+ continue;
+ }
+ dout(10) << __func__ << " " << p.first << " " << p.second->fnode << dendl;
+
+ auto& fnode_extents = p.second->fnode.extents;
+
+ bool rewrite = false;
+ for (auto ext_it = fnode_extents.begin();
+ ext_it != p.second->fnode.extents.end();
+ ++ext_it) {
+ if (ext_it->bdev != dev_target && devs_source.count(ext_it->bdev)) {
+ rewrite = true;
+ break;
+ }
+ }
+ if (rewrite) {
+ dout(10) << __func__ << " migrating" << dendl;
+
+ // read entire file
+ bufferlist bl;
+ for (auto old_ext : fnode_extents) {
+ buf.resize(old_ext.length);
+ int r = bdev[old_ext.bdev]->read_random(
+ old_ext.offset,
+ old_ext.length,
+ (char*)&buf.at(0),
+ buffered);
+ if (r != 0) {
+ derr << __func__ << " failed to read 0x" << std::hex
+ << old_ext.offset << "~" << old_ext.length << std::dec
+ << " from " << (int)dev_target << dendl;
+ return -EIO;
+ }
+ bl.append((char*)&buf[0], old_ext.length);
+ }
+
+ // write entire file
+ PExtentVector extents;
+ auto l = _allocate_without_fallback(dev_target, bl.length(), &extents);
+ if (l < 0) {
+ derr << __func__ << " unable to allocate len 0x" << std::hex
+ << bl.length() << std::dec << " from " << (int)dev_target
+ << ": " << cpp_strerror(l) << dendl;
+ return -ENOSPC;
+ }
+
+ uint64_t off = 0;
+ for (auto& i : extents) {
+ bufferlist cur;
+ uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
+ ceph_assert(cur_len > 0);
+ cur.substr_of(bl, off, cur_len);
+ int r = bdev[dev_target]->write(i.offset, cur, buffered);
+ ceph_assert(r == 0);
+ off += cur_len;
+ }
+
+ // release old extents
+ for (auto old_ext : fnode_extents) {
+ PExtentVector to_release;
+ to_release.emplace_back(old_ext.offset, old_ext.length);
+ alloc[old_ext.bdev]->release(to_release);
+ }
+
+ // update fnode
+ fnode_extents.clear();
+ for (auto& i : extents) {
+ fnode_extents.emplace_back(dev_target_new, i.offset, i.length);
+ }
+ }
+ }
+ // new logging device in the current naming scheme
+ int new_log_dev_cur =
+ bdev[BDEV_NEWWAL] ?
+ BDEV_NEWWAL :
+ bdev[BDEV_WAL] && !(flags & REMOVE_WAL) ?
+ BDEV_WAL :
+ bdev[BDEV_NEWDB] ?
+ BDEV_NEWDB :
+ bdev[BDEV_DB] && !(flags & REMOVE_DB)?
+ BDEV_DB :
+ BDEV_SLOW;
+
+ // new logging device in new naming scheme
+ int new_log_dev_next =
+ new_log_dev_cur == BDEV_NEWWAL ?
+ BDEV_WAL :
+ new_log_dev_cur == BDEV_NEWDB ?
+ BDEV_DB :
+ new_log_dev_cur;
+
+ int super_dev =
+ dev_target == BDEV_NEWDB ?
+ BDEV_NEWDB :
+ bdev[BDEV_DB] ?
+ BDEV_DB :
+ BDEV_SLOW;
+
+ _rewrite_log_sync(
+ false,
+ super_dev,
+ new_log_dev_cur,
+ new_log_dev_next,
+ flags);
+ return 0;
+}
+
+BlueFS::FileRef BlueFS::_get_file(uint64_t ino)
+{
+ auto p = file_map.find(ino);
+ if (p == file_map.end()) {
+ FileRef f = new File;
+ file_map[ino] = f;
+ dout(30) << __func__ << " ino " << ino << " = " << f
+ << " (new)" << dendl;
+ return f;
+ } else {
+ dout(30) << __func__ << " ino " << ino << " = " << p->second << dendl;
+ return p->second;
+ }
+}
+
+void BlueFS::_drop_link(FileRef file)
+{
+ dout(20) << __func__ << " had refs " << file->refs
+ << " on " << file->fnode << dendl;
+ ceph_assert(file->refs > 0);
+ --file->refs;
+ if (file->refs == 0) {
+ dout(20) << __func__ << " destroying " << file->fnode << dendl;
+ ceph_assert(file->num_reading.load() == 0);
+ vselector->sub_usage(file->vselector_hint, file->fnode);
+ log_t.op_file_remove(file->fnode.ino);
+ for (auto& r : file->fnode.extents) {
+ pending_release[r.bdev].insert(r.offset, r.length);
+ }
+ file_map.erase(file->fnode.ino);
+ file->deleted = true;
+
+ if (file->dirty_seq) {
+ ceph_assert(file->dirty_seq > log_seq_stable);
+ ceph_assert(dirty_files.count(file->dirty_seq));
+ auto it = dirty_files[file->dirty_seq].iterator_to(*file);
+ dirty_files[file->dirty_seq].erase(it);
+ file->dirty_seq = 0;
+ }
+ }
+}
+
+int64_t BlueFS::_read_random(
+ FileReader *h, ///< [in] read from here
+ uint64_t off, ///< [in] offset
+ size_t len, ///< [in] this many bytes
+ char *out) ///< [out] optional: or copy it here
+{
+ auto* buf = &h->buf;
+
+ int64_t ret = 0;
+ dout(10) << __func__ << " h " << h
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " from " << h->file->fnode << dendl;
+
+ ++h->file->num_reading;
+
+ if (!h->ignore_eof &&
+ off + len > h->file->fnode.size) {
+ if (off > h->file->fnode.size)
+ len = 0;
+ else
+ len = h->file->fnode.size - off;
+ dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x"
+ << std::hex << len << std::dec << dendl;
+ }
+ logger->inc(l_bluefs_read_random_count, 1);
+ logger->inc(l_bluefs_read_random_bytes, len);
+
+ std::shared_lock s_lock(h->lock);
+ buf->bl.reassign_to_mempool(mempool::mempool_bluefs_file_reader);
+ while (len > 0) {
+ if (off < buf->bl_off || off >= buf->get_buf_end()) {
+ s_lock.unlock();
+ uint64_t x_off = 0;
+ auto p = h->file->fnode.seek(off, &x_off);
+ ceph_assert(p != h->file->fnode.extents.end());
+ uint64_t l = std::min(p->length - x_off, static_cast<uint64_t>(len));
+ //hard cap to 1GB
+ l = std::min(l, uint64_t(1) << 30);
+ dout(20) << __func__ << " read random 0x"
+ << std::hex << x_off << "~" << l << std::dec
+ << " of " << *p << dendl;
+ int r;
+ if (!cct->_conf->bluefs_check_for_zeros) {
+ r = bdev[p->bdev]->read_random(p->offset + x_off, l, out,
+ cct->_conf->bluefs_buffered_io);
+ } else {
+ r = read_random(p->bdev, p->offset + x_off, l, out,
+ cct->_conf->bluefs_buffered_io);
+ }
+ ceph_assert(r == 0);
+ off += l;
+ len -= l;
+ ret += l;
+ out += l;
+
+ logger->inc(l_bluefs_read_random_disk_count, 1);
+ logger->inc(l_bluefs_read_random_disk_bytes, l);
+ if (len > 0) {
+ s_lock.lock();
+ }
+ } else {
+ auto left = buf->get_buf_remaining(off);
+ int64_t r = std::min(len, left);
+ logger->inc(l_bluefs_read_random_buffer_count, 1);
+ logger->inc(l_bluefs_read_random_buffer_bytes, r);
+ dout(20) << __func__ << " left 0x" << std::hex << left
+ << " 0x" << off << "~" << len << std::dec
+ << dendl;
+
+ if (out) {
+ // NOTE: h->bl is normally a contiguous buffer so c_str() is free.
+ memcpy(out, buf->bl.c_str() + off - buf->bl_off, r);
+ out += r;
+ }
+
+ dout(30) << __func__ << " result chunk (0x"
+ << std::hex << r << std::dec << " bytes):\n";
+ bufferlist t;
+ t.substr_of(buf->bl, off - buf->bl_off, r);
+ t.hexdump(*_dout);
+ *_dout << dendl;
+
+ off += r;
+ len -= r;
+ ret += r;
+ buf->pos += r;
+ }
+ }
+ dout(20) << __func__ << " got " << ret << dendl;
+ --h->file->num_reading;
+ return ret;
+}
+
+int64_t BlueFS::_read(
+ FileReader *h, ///< [in] read from here
+ FileReaderBuffer *buf, ///< [in] reader state
+ uint64_t off, ///< [in] offset
+ size_t len, ///< [in] this many bytes
+ bufferlist *outbl, ///< [out] optional: reference the result here
+ char *out) ///< [out] optional: or copy it here
+{
+ bool prefetch = !outbl && !out;
+ dout(10) << __func__ << " h " << h
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " from " << h->file->fnode
+ << (prefetch ? " prefetch" : "")
+ << dendl;
+
+ ++h->file->num_reading;
+
+ if (!h->ignore_eof &&
+ off + len > h->file->fnode.size) {
+ if (off > h->file->fnode.size)
+ len = 0;
+ else
+ len = h->file->fnode.size - off;
+ dout(20) << __func__ << " reaching (or past) eof, len clipped to 0x"
+ << std::hex << len << std::dec << dendl;
+ }
+ logger->inc(l_bluefs_read_count, 1);
+ logger->inc(l_bluefs_read_bytes, len);
+ if (prefetch) {
+ logger->inc(l_bluefs_read_prefetch_count, 1);
+ logger->inc(l_bluefs_read_prefetch_bytes, len);
+ }
+
+ if (outbl)
+ outbl->clear();
+
+ int64_t ret = 0;
+ std::shared_lock s_lock(h->lock);
+ while (len > 0) {
+ size_t left;
+ if (off < buf->bl_off || off >= buf->get_buf_end()) {
+ s_lock.unlock();
+ std::unique_lock u_lock(h->lock);
+ buf->bl.reassign_to_mempool(mempool::mempool_bluefs_file_reader);
+ if (off < buf->bl_off || off >= buf->get_buf_end()) {
+ // if precondition hasn't changed during locking upgrade.
+ buf->bl.clear();
+ buf->bl_off = off & super.block_mask();
+ uint64_t x_off = 0;
+ auto p = h->file->fnode.seek(buf->bl_off, &x_off);
+ if (p == h->file->fnode.extents.end()) {
+ dout(5) << __func__ << " reading less then required "
+ << ret << "<" << ret + len << dendl;
+ break;
+ }
+
+ uint64_t want = round_up_to(len + (off & ~super.block_mask()),
+ super.block_size);
+ want = std::max(want, buf->max_prefetch);
+ uint64_t l = std::min(p->length - x_off, want);
+ //hard cap to 1GB
+ l = std::min(l, uint64_t(1) << 30);
+ uint64_t eof_offset = round_up_to(h->file->fnode.size, super.block_size);
+ if (!h->ignore_eof &&
+ buf->bl_off + l > eof_offset) {
+ l = eof_offset - buf->bl_off;
+ }
+ dout(20) << __func__ << " fetching 0x"
+ << std::hex << x_off << "~" << l << std::dec
+ << " of " << *p << dendl;
+ int r;
+ if (!cct->_conf->bluefs_check_for_zeros) {
+ r = bdev[p->bdev]->read(p->offset + x_off, l, &buf->bl, ioc[p->bdev],
+ cct->_conf->bluefs_buffered_io);
+ } else {
+ r = read(p->bdev, p->offset + x_off, l, &buf->bl, ioc[p->bdev],
+ cct->_conf->bluefs_buffered_io);
+ }
+ ceph_assert(r == 0);
+ }
+ u_lock.unlock();
+ s_lock.lock();
+ // we should recheck if buffer is valid after lock downgrade
+ continue;
+ }
+ left = buf->get_buf_remaining(off);
+ dout(20) << __func__ << " left 0x" << std::hex << left
+ << " len 0x" << len << std::dec << dendl;
+
+ int64_t r = std::min(len, left);
+ if (outbl) {
+ bufferlist t;
+ t.substr_of(buf->bl, off - buf->bl_off, r);
+ outbl->claim_append(t);
+ }
+ if (out) {
+ // NOTE: h->bl is normally a contiguous buffer so c_str() is free.
+ memcpy(out, buf->bl.c_str() + off - buf->bl_off, r);
+ out += r;
+ }
+
+ dout(30) << __func__ << " result chunk (0x"
+ << std::hex << r << std::dec << " bytes):\n";
+ bufferlist t;
+ t.substr_of(buf->bl, off - buf->bl_off, r);
+ t.hexdump(*_dout);
+ *_dout << dendl;
+
+ off += r;
+ len -= r;
+ ret += r;
+ buf->pos += r;
+ }
+ dout(20) << __func__ << " got " << ret << dendl;
+ ceph_assert(!outbl || (int)outbl->length() == ret);
+ --h->file->num_reading;
+ return ret;
+}
+
+void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+{
+ dout(10) << __func__ << " file " << f->fnode
+ << " 0x" << std::hex << offset << "~" << length << std::dec
+ << dendl;
+ if (offset & ~super.block_mask()) {
+ offset &= super.block_mask();
+ length = round_up_to(length, super.block_size);
+ }
+ uint64_t x_off = 0;
+ auto p = f->fnode.seek(offset, &x_off);
+ while (length > 0 && p != f->fnode.extents.end()) {
+ uint64_t x_len = std::min(p->length - x_off, length);
+ bdev[p->bdev]->invalidate_cache(p->offset + x_off, x_len);
+ dout(20) << __func__ << " 0x" << std::hex << x_off << "~" << x_len
+ << std:: dec << " of " << *p << dendl;
+ offset += x_len;
+ length -= x_len;
+ }
+}
+
+uint64_t BlueFS::_estimate_log_size()
+{
+ int avg_dir_size = 40; // fixme
+ int avg_file_size = 12;
+ uint64_t size = 4096 * 2;
+ size += file_map.size() * (1 + sizeof(bluefs_fnode_t));
+ for (auto& p : block_all)
+ size += p.num_intervals() * (1 + 1 + sizeof(uint64_t) * 2);
+ size += dir_map.size() + (1 + avg_dir_size);
+ size += file_map.size() * (1 + avg_dir_size + avg_file_size);
+ return round_up_to(size, super.block_size);
+}
+
+void BlueFS::compact_log()
+{
+ std::unique_lock<ceph::mutex> l(lock);
+ if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
+ if (cct->_conf->bluefs_compact_log_sync) {
+ _compact_log_sync();
+ } else {
+ _compact_log_async(l);
+ }
+ }
+}
+
+bool BlueFS::_should_compact_log()
+{
+ uint64_t current = log_writer->file->fnode.size;
+ uint64_t expected = _estimate_log_size();
+ float ratio = (float)current / (float)expected;
+ dout(10) << __func__ << " current 0x" << std::hex << current
+ << " expected " << expected << std::dec
+ << " ratio " << ratio
+ << (new_log ? " (async compaction in progress)" : "")
+ << dendl;
+ if (new_log ||
+ current < cct->_conf->bluefs_log_compact_min_size ||
+ ratio < cct->_conf->bluefs_log_compact_min_ratio) {
+ return false;
+ }
+ return true;
+}
+
+void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
+ int flags)
+{
+ t->seq = 1;
+ t->uuid = super.uuid;
+ dout(20) << __func__ << " op_init" << dendl;
+
+ t->op_init();
+ for (unsigned bdev = 0; bdev < MAX_BDEV; ++bdev) {
+ interval_set<uint64_t>& p = block_all[bdev];
+ for (interval_set<uint64_t>::iterator q = p.begin(); q != p.end(); ++q) {
+ auto bdev_new = bdev;
+ if ((flags & REMOVE_WAL) && bdev == BDEV_WAL) {
+ continue;
+ }
+ if ((flags & REMOVE_DB) && bdev == BDEV_DB) {
+ continue;
+ }
+ if ((flags & RENAME_SLOW2DB) && bdev == BDEV_SLOW) {
+ bdev_new = BDEV_DB;
+ }
+ if ((flags & RENAME_DB2SLOW) && bdev == BDEV_DB) {
+ bdev_new = BDEV_SLOW;
+ }
+ if (bdev == BDEV_NEWDB) {
+ // REMOVE_DB xor RENAME_DB
+ ceph_assert(!(flags & REMOVE_DB) != !(flags & RENAME_DB2SLOW));
+ ceph_assert(!(flags & RENAME_SLOW2DB));
+ bdev_new = BDEV_DB;
+ }
+ if (bdev == BDEV_NEWWAL) {
+ ceph_assert(flags & REMOVE_WAL);
+ bdev_new = BDEV_WAL;
+ }
+ dout(20) << __func__ << " op_alloc_add " << bdev_new << " 0x"
+ << std::hex << q.get_start() << "~" << q.get_len() << std::dec
+ << dendl;
+ t->op_alloc_add(bdev_new, q.get_start(), q.get_len());
+ }
+ }
+ for (auto& p : file_map) {
+ if (p.first == 1)
+ continue;
+ ceph_assert(p.first > 1);
+
+ for(auto& e : p.second->fnode.extents) {
+ auto bdev = e.bdev;
+ auto bdev_new = bdev;
+ ceph_assert(!((flags & REMOVE_WAL) && bdev == BDEV_WAL));
+ if ((flags & RENAME_SLOW2DB) && bdev == BDEV_SLOW) {
+ bdev_new = BDEV_DB;
+ }
+ if ((flags & RENAME_DB2SLOW) && bdev == BDEV_DB) {
+ bdev_new = BDEV_SLOW;
+ }
+ if (bdev == BDEV_NEWDB) {
+ // REMOVE_DB xor RENAME_DB
+ ceph_assert(!(flags & REMOVE_DB) != !(flags & RENAME_DB2SLOW));
+ ceph_assert(!(flags & RENAME_SLOW2DB));
+ bdev_new = BDEV_DB;
+ }
+ if (bdev == BDEV_NEWWAL) {
+ ceph_assert(flags & REMOVE_WAL);
+ bdev_new = BDEV_WAL;
+ }
+ e.bdev = bdev_new;
+ }
+ dout(20) << __func__ << " op_file_update " << p.second->fnode << dendl;
+ t->op_file_update(p.second->fnode);
+ }
+ for (auto& p : dir_map) {
+ dout(20) << __func__ << " op_dir_create " << p.first << dendl;
+ t->op_dir_create(p.first);
+ for (auto& q : p.second->file_map) {
+ dout(20) << __func__ << " op_dir_link " << p.first << "/" << q.first
+ << " to " << q.second->fnode.ino << dendl;
+ t->op_dir_link(p.first, q.first, q.second->fnode.ino);
+ }
+ }
+}
+
+void BlueFS::_compact_log_sync()
+{
+ dout(10) << __func__ << dendl;
+ auto prefer_bdev =
+ vselector->select_prefer_bdev(log_writer->file->vselector_hint);
+ _rewrite_log_sync(true,
+ BDEV_DB,
+ prefer_bdev,
+ prefer_bdev,
+ 0);
+ logger->inc(l_bluefs_log_compactions);
+}
+
+void BlueFS::_rewrite_log_sync(bool allocate_with_fallback,
+ int super_dev,
+ int log_dev,
+ int log_dev_new,
+ int flags)
+{
+ File *log_file = log_writer->file.get();
+
+ // clear out log (be careful who calls us!!!)
+ log_t.clear();
+
+ dout(20) << __func__ << " super_dev:" << super_dev
+ << " log_dev:" << log_dev
+ << " log_dev_new:" << log_dev_new
+ << " flags:" << flags
+ << dendl;
+ bluefs_transaction_t t;
+ _compact_log_dump_metadata(&t, flags);
+
+ dout(20) << __func__ << " op_jump_seq " << log_seq << dendl;
+ t.op_jump_seq(log_seq);
+
+ bufferlist bl;
+ encode(t, bl);
+ _pad_bl(bl);
+
+ uint64_t need = bl.length() + cct->_conf->bluefs_max_log_runway;
+ dout(20) << __func__ << " need " << need << dendl;
+
+ bluefs_fnode_t old_fnode;
+ int r;
+ log_file->fnode.swap_extents(old_fnode);
+ if (allocate_with_fallback) {
+ r = _allocate(log_dev, need, &log_file->fnode);
+ ceph_assert(r == 0);
+ } else {
+ PExtentVector extents;
+ r = _allocate_without_fallback(log_dev,
+ need,
+ &extents);
+ ceph_assert(r == 0);
+ for (auto& p : extents) {
+ log_file->fnode.append_extent(
+ bluefs_extent_t(log_dev, p.offset, p.length));
+ }
+ }
+
+ _close_writer(log_writer);
+
+ log_file->fnode.size = bl.length();
+ vselector->sub_usage(log_file->vselector_hint, old_fnode);
+ vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+
+ log_writer = _create_writer(log_file);
+ log_writer->append(bl);
+ r = _flush(log_writer, true);
+ ceph_assert(r == 0);
+#ifdef HAVE_LIBAIO
+ if (!cct->_conf->bluefs_sync_write) {
+ list<aio_t> completed_ios;
+ _claim_completed_aios(log_writer, &completed_ios);
+ wait_for_aio(log_writer);
+ completed_ios.clear();
+ }
+#endif
+ flush_bdev();
+
+ super.log_fnode = log_file->fnode;
+ // rename device if needed
+ if (log_dev != log_dev_new) {
+ dout(10) << __func__ << " renaming log extents to " << log_dev_new << dendl;
+ for (auto& p : super.log_fnode.extents) {
+ p.bdev = log_dev_new;
+ }
+ }
+ dout(10) << __func__ << " writing super, log fnode: " << super.log_fnode << dendl;
+
+ ++super.version;
+ _write_super(super_dev);
+ flush_bdev();
+
+ dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
+ for (auto& r : old_fnode.extents) {
+ pending_release[r.bdev].insert(r.offset, r.length);
+ }
+}
+
+/*
+ * 1. Allocate a new extent to continue the log, and then log an event
+ * that jumps the log write position to the new extent. At this point, the
+ * old extent(s) won't be written to, and reflect everything to compact.
+ * New events will be written to the new region that we'll keep.
+ *
+ * 2. While still holding the lock, encode a bufferlist that dumps all of the
+ * in-memory fnodes and names. This will become the new beginning of the
+ * log. The last event will jump to the log continuation extent from #1.
+ *
+ * 3. Queue a write to a new extent for the new beginnging of the log.
+ *
+ * 4. Drop lock and wait
+ *
+ * 5. Retake the lock.
+ *
+ * 6. Update the log_fnode to splice in the new beginning.
+ *
+ * 7. Write the new superblock.
+ *
+ * 8. Release the old log space. Clean up.
+ */
+void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
+{
+ dout(10) << __func__ << dendl;
+ File *log_file = log_writer->file.get();
+ ceph_assert(!new_log);
+ ceph_assert(!new_log_writer);
+
+ // create a new log [writer] so that we know compaction is in progress
+ // (see _should_compact_log)
+ new_log = new File;
+ new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode
+
+ // 0. wait for any racing flushes to complete. (We do not want to block
+ // in _flush_sync_log with jump_to set or else a racing thread might flush
+ // our entries and our jump_to update won't be correct.)
+ while (log_flushing) {
+ dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
+ log_cond.wait(l);
+ }
+
+ vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
+
+ // 1. allocate new log space and jump to it.
+ old_log_jump_to = log_file->fnode.get_allocated();
+ dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
+ << " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
+ int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
+ cct->_conf->bluefs_max_log_runway,
+ &log_file->fnode);
+ ceph_assert(r == 0);
+ //adjust usage as flush below will need it
+ vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+ dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+
+ // update the log file change and log a jump to the offset where we want to
+ // write the new entries
+ log_t.op_file_update(log_file->fnode);
+ log_t.op_jump(log_seq, old_log_jump_to);
+
+ flush_bdev(); // FIXME?
+
+ _flush_and_sync_log(l, 0, old_log_jump_to);
+
+ // 2. prepare compacted log
+ bluefs_transaction_t t;
+ //avoid record two times in log_t and _compact_log_dump_metadata.
+ log_t.clear();
+ _compact_log_dump_metadata(&t, 0);
+
+ uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
+ std::max(alloc_size[BDEV_DB],
+ alloc_size[BDEV_SLOW]));
+
+ // conservative estimate for final encoded size
+ new_log_jump_to = round_up_to(t.op_bl.length() + super.block_size * 2,
+ max_alloc_size);
+ t.op_jump(log_seq, new_log_jump_to);
+
+ // allocate
+ //FIXME: check if we want DB here?
+ r = _allocate(BlueFS::BDEV_DB, new_log_jump_to,
+ &new_log->fnode);
+ ceph_assert(r == 0);
+
+ // we might have some more ops in log_t due to _allocate call
+ t.claim_ops(log_t);
+
+ bufferlist bl;
+ encode(t, bl);
+ _pad_bl(bl);
+
+ dout(10) << __func__ << " new_log_jump_to 0x" << std::hex << new_log_jump_to
+ << std::dec << dendl;
+
+ new_log_writer = _create_writer(new_log);
+ new_log_writer->append(bl);
+
+ // 3. flush
+ r = _flush(new_log_writer, true);
+ ceph_assert(r == 0);
+
+ // 4. wait
+ _flush_bdev_safely(new_log_writer);
+
+ // 5. update our log fnode
+ // discard first old_log_jump_to extents
+
+ dout(10) << __func__ << " remove 0x" << std::hex << old_log_jump_to << std::dec
+ << " of " << log_file->fnode.extents << dendl;
+ uint64_t discarded = 0;
+ mempool::bluefs::vector<bluefs_extent_t> old_extents;
+ while (discarded < old_log_jump_to) {
+ ceph_assert(!log_file->fnode.extents.empty());
+ bluefs_extent_t& e = log_file->fnode.extents.front();
+ bluefs_extent_t temp = e;
+ if (discarded + e.length <= old_log_jump_to) {
+ dout(10) << __func__ << " remove old log extent " << e << dendl;
+ discarded += e.length;
+ log_file->fnode.pop_front_extent();
+ } else {
+ dout(10) << __func__ << " remove front of old log extent " << e << dendl;
+ uint64_t drop = old_log_jump_to - discarded;
+ temp.length = drop;
+ e.offset += drop;
+ e.length -= drop;
+ discarded += drop;
+ dout(10) << __func__ << " kept " << e << " removed " << temp << dendl;
+ }
+ old_extents.push_back(temp);
+ }
+ auto from = log_file->fnode.extents.begin();
+ auto to = log_file->fnode.extents.end();
+ while (from != to) {
+ new_log->fnode.append_extent(*from);
+ ++from;
+ }
+
+ vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
+
+ // clear the extents from old log file, they are added to new log
+ log_file->fnode.clear_extents();
+ // swap the log files. New log file is the log file now.
+ new_log->fnode.swap_extents(log_file->fnode);
+
+ log_writer->pos = log_writer->file->fnode.size =
+ log_writer->pos - old_log_jump_to + new_log_jump_to;
+
+ vselector->add_usage(log_file->vselector_hint, log_file->fnode);
+
+ // 6. write the super block to reflect the changes
+ dout(10) << __func__ << " writing super" << dendl;
+ super.log_fnode = log_file->fnode;
+ ++super.version;
+ _write_super(BDEV_DB);
+
+ lock.unlock();
+ flush_bdev();
+ lock.lock();
+
+ // 7. release old space
+ dout(10) << __func__ << " release old log extents " << old_extents << dendl;
+ for (auto& r : old_extents) {
+ pending_release[r.bdev].insert(r.offset, r.length);
+ }
+
+ // delete the new log, remove from the dirty files list
+ _close_writer(new_log_writer);
+ if (new_log->dirty_seq) {
+ ceph_assert(dirty_files.count(new_log->dirty_seq));
+ auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
+ dirty_files[new_log->dirty_seq].erase(it);
+ }
+ new_log_writer = nullptr;
+ new_log = nullptr;
+ log_cond.notify_all();
+
+ dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+ logger->inc(l_bluefs_log_compactions);
+}
+
+void BlueFS::_pad_bl(bufferlist& bl)
+{
+ uint64_t partial = bl.length() % super.block_size;
+ if (partial) {
+ dout(10) << __func__ << " padding with 0x" << std::hex
+ << super.block_size - partial << " zeros" << std::dec << dendl;
+ bl.append_zero(super.block_size - partial);
+ }
+}
+
+
+int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+ uint64_t want_seq,
+ uint64_t jump_to)
+{
+ while (log_flushing) {
+ dout(10) << __func__ << " want_seq " << want_seq
+ << " log is currently flushing, waiting" << dendl;
+ ceph_assert(!jump_to);
+ log_cond.wait(l);
+ }
+ if (want_seq && want_seq <= log_seq_stable) {
+ dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+ << log_seq_stable << ", done" << dendl;
+ ceph_assert(!jump_to);
+ return 0;
+ }
+ if (log_t.empty() && dirty_files.empty()) {
+ dout(10) << __func__ << " want_seq " << want_seq
+ << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl;
+ ceph_assert(!jump_to);
+ return 0;
+ }
+
+ vector<interval_set<uint64_t>> to_release(pending_release.size());
+ to_release.swap(pending_release);
+
+ uint64_t seq = log_t.seq = ++log_seq;
+ ceph_assert(want_seq == 0 || want_seq <= seq);
+ log_t.uuid = super.uuid;
+
+ // log dirty files
+ auto lsi = dirty_files.find(seq);
+ if (lsi != dirty_files.end()) {
+ dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
+ for (auto &f : lsi->second) {
+ dout(20) << __func__ << " op_file_update " << f.fnode << dendl;
+ log_t.op_file_update(f.fnode);
+ }
+ }
+
+ dout(10) << __func__ << " " << log_t << dendl;
+ ceph_assert(!log_t.empty());
+
+ // allocate some more space (before we run out)?
+ int64_t runway = log_writer->file->fnode.get_allocated() -
+ log_writer->get_effective_write_pos();
+ bool just_expanded_log = false;
+ if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
+ dout(10) << __func__ << " allocating more log runway (0x"
+ << std::hex << runway << std::dec << " remaining)" << dendl;
+ while (new_log_writer) {
+ dout(10) << __func__ << " waiting for async compaction" << dendl;
+ log_cond.wait(l);
+ }
+ vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
+ int r = _allocate(
+ vselector->select_prefer_bdev(log_writer->file->vselector_hint),
+ cct->_conf->bluefs_max_log_runway,
+ &log_writer->file->fnode);
+ ceph_assert(r == 0);
+ vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
+ log_t.op_file_update(log_writer->file->fnode);
+ just_expanded_log = true;
+ }
+
+ bufferlist bl;
+ bl.reserve(super.block_size);
+ encode(log_t, bl);
+ // pad to block boundary
+ size_t realign = super.block_size - (bl.length() % super.block_size);
+ if (realign && realign != super.block_size)
+ bl.append_zero(realign);
+
+ logger->inc(l_bluefs_logged_bytes, bl.length());
+
+ if (just_expanded_log) {
+ ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
+ }
+
+ log_writer->append(bl);
+
+ log_t.clear();
+ log_t.seq = 0; // just so debug output is less confusing
+ log_flushing = true;
+
+ int r = _flush(log_writer, true);
+ ceph_assert(r == 0);
+
+ if (jump_to) {
+ dout(10) << __func__ << " jumping log offset from 0x" << std::hex
+ << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
+ log_writer->pos = jump_to;
+ vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ log_writer->file->fnode.size = jump_to;
+ vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ }
+
+ _flush_bdev_safely(log_writer);
+
+ log_flushing = false;
+ log_cond.notify_all();
+
+ // clean dirty files
+ if (seq > log_seq_stable) {
+ log_seq_stable = seq;
+ dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
+
+ auto p = dirty_files.begin();
+ while (p != dirty_files.end()) {
+ if (p->first > log_seq_stable) {
+ dout(20) << __func__ << " done cleaning up dirty files" << dendl;
+ break;
+ }
+
+ auto l = p->second.begin();
+ while (l != p->second.end()) {
+ File *file = &*l;
+ ceph_assert(file->dirty_seq > 0);
+ ceph_assert(file->dirty_seq <= log_seq_stable);
+ dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
+ file->dirty_seq = 0;
+ p->second.erase(l++);
+ }
+
+ ceph_assert(p->second.empty());
+ dirty_files.erase(p++);
+ }
+ } else {
+ dout(20) << __func__ << " log_seq_stable " << log_seq_stable
+ << " already >= out seq " << seq
+ << ", we lost a race against another log flush, done" << dendl;
+ }
+
+ for (unsigned i = 0; i < to_release.size(); ++i) {
+ if (!to_release[i].empty()) {
+ /* OK, now we have the guarantee alloc[i] won't be null. */
+ int r = 0;
+ if (cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+ r = bdev[i]->queue_discard(to_release[i]);
+ if (r == 0)
+ continue;
+ } else if (cct->_conf->bdev_enable_discard) {
+ for (auto p = to_release[i].begin(); p != to_release[i].end(); ++p) {
+ bdev[i]->discard(p.get_start(), p.get_len());
+ }
+ }
+ alloc[i]->release(to_release[i]);
+ }
+ }
+
+ _update_logger_stats();
+
+ return 0;
+}
+
+int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
+{
+ dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
+ << " 0x" << offset << "~" << length << std::dec
+ << " to " << h->file->fnode << dendl;
+ ceph_assert(!h->file->deleted);
+ ceph_assert(h->file->num_readers.load() == 0);
+
+ h->buffer_appender.flush();
+
+ bool buffered;
+ if (h->file->fnode.ino == 1)
+ buffered = false;
+ else
+ buffered = cct->_conf->bluefs_buffered_io;
+
+ if (offset + length <= h->pos)
+ return 0;
+ if (offset < h->pos) {
+ length -= h->pos - offset;
+ offset = h->pos;
+ dout(10) << " still need 0x"
+ << std::hex << offset << "~" << length << std::dec
+ << dendl;
+ }
+ ceph_assert(offset <= h->file->fnode.size);
+
+ uint64_t allocated = h->file->fnode.get_allocated();
+ vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
+ // do not bother to dirty the file if we are overwriting
+ // previously allocated extents.
+ bool must_dirty = false;
+ if (allocated < offset + length) {
+ // we should never run out of log space here; see the min runway check
+ // in _flush_and_sync_log.
+ ceph_assert(h->file->fnode.ino != 1);
+ int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint),
+ offset + length - allocated,
+ &h->file->fnode);
+ if (r < 0) {
+ derr << __func__ << " allocated: 0x" << std::hex << allocated
+ << " offset: 0x" << offset << " length: 0x" << length << std::dec
+ << dendl;
+ vselector->add_usage(h->file->vselector_hint, h->file->fnode); // undo
+ ceph_abort_msg("bluefs enospc");
+ return r;
+ }
+ if (cct->_conf->bluefs_preextend_wal_files &&
+ h->writer_type == WRITER_WAL) {
+ // NOTE: this *requires* that rocksdb also has log recycling
+ // enabled and is therefore doing robust CRCs on the log
+ // records. otherwise, we will fail to reply the rocksdb log
+ // properly due to garbage on the device.
+ h->file->fnode.size = h->file->fnode.get_allocated();
+ dout(10) << __func__ << " extending WAL size to 0x" << std::hex
+ << h->file->fnode.size << std::dec << " to include allocated"
+ << dendl;
+ }
+ must_dirty = true;
+ }
+ if (h->file->fnode.size < offset + length) {
+ h->file->fnode.size = offset + length;
+ if (h->file->fnode.ino > 1) {
+ // we do not need to dirty the log file (or it's compacting
+ // replacement) when the file size changes because replay is
+ // smart enough to discover it on its own.
+ must_dirty = true;
+ }
+ }
+ if (must_dirty) {
+ h->file->fnode.mtime = ceph_clock_now();
+ ceph_assert(h->file->fnode.ino >= 1);
+ if (h->file->dirty_seq == 0) {
+ h->file->dirty_seq = log_seq + 1;
+ dirty_files[h->file->dirty_seq].push_back(*h->file);
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
+ << " (was clean)" << dendl;
+ } else {
+ if (h->file->dirty_seq != log_seq + 1) {
+ // need re-dirty, erase from list first
+ ceph_assert(dirty_files.count(h->file->dirty_seq));
+ auto it = dirty_files[h->file->dirty_seq].iterator_to(*h->file);
+ dirty_files[h->file->dirty_seq].erase(it);
+ h->file->dirty_seq = log_seq + 1;
+ dirty_files[h->file->dirty_seq].push_back(*h->file);
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
+ << " (was " << h->file->dirty_seq << ")" << dendl;
+ } else {
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
+ << " (unchanged, do nothing) " << dendl;
+ }
+ }
+ }
+ dout(20) << __func__ << " file now " << h->file->fnode << dendl;
+
+ uint64_t x_off = 0;
+ auto p = h->file->fnode.seek(offset, &x_off);
+ ceph_assert(p != h->file->fnode.extents.end());
+ dout(20) << __func__ << " in " << *p << " x_off 0x"
+ << std::hex << x_off << std::dec << dendl;
+
+ unsigned partial = x_off & ~super.block_mask();
+ bufferlist bl;
+ if (partial) {
+ dout(20) << __func__ << " using partial tail 0x"
+ << std::hex << partial << std::dec << dendl;
+ ceph_assert(h->tail_block.length() == partial);
+ bl.claim_append_piecewise(h->tail_block);
+ x_off -= partial;
+ offset -= partial;
+ length += partial;
+ dout(20) << __func__ << " waiting for previous aio to complete" << dendl;
+ for (auto p : h->iocv) {
+ if (p) {
+ p->aio_wait();
+ }
+ }
+ }
+ if (length == partial + h->buffer.length()) {
+ bl.claim_append_piecewise(h->buffer);
+ } else {
+ bufferlist t;
+ h->buffer.splice(0, length, &t);
+ bl.claim_append_piecewise(t);
+ t.substr_of(h->buffer, length, h->buffer.length() - length);
+ h->buffer.swap(t);
+ dout(20) << " leaving 0x" << std::hex << h->buffer.length() << std::dec
+ << " unflushed" << dendl;
+ }
+ ceph_assert(bl.length() == length);
+
+ switch (h->writer_type) {
+ case WRITER_WAL:
+ logger->inc(l_bluefs_bytes_written_wal, length);
+ break;
+ case WRITER_SST:
+ logger->inc(l_bluefs_bytes_written_sst, length);
+ break;
+ }
+
+ dout(30) << "dump:\n";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ h->pos = offset + length;
+ h->tail_block.clear();
+
+ uint64_t bloff = 0;
+ uint64_t bytes_written_slow = 0;
+ while (length > 0) {
+ uint64_t x_len = std::min(p->length - x_off, length);
+ bufferlist t;
+ t.substr_of(bl, bloff, x_len);
+ unsigned tail = x_len & ~super.block_mask();
+ if (tail) {
+ size_t zlen = super.block_size - tail;
+ dout(20) << __func__ << " caching tail of 0x"
+ << std::hex << tail
+ << " and padding block with 0x" << zlen
+ << std::dec << dendl;
+ h->tail_block.substr_of(bl, bl.length() - tail, tail);
+ if (h->file->fnode.ino > 1) {
+ // we are using the page_aligned_appender, and can safely use
+ // the tail of the raw buffer.
+ const bufferptr &last = t.back();
+ if (last.unused_tail_length() < zlen) {
+ derr << " wtf, last is " << last << " from " << t << dendl;
+ ceph_assert(last.unused_tail_length() >= zlen);
+ }
+ bufferptr z = last;
+ z.set_offset(last.offset() + last.length());
+ z.set_length(zlen);
+ z.zero();
+ t.append(z, 0, zlen);
+ } else {
+ t.append_zero(zlen);
+ }
+ }
+ if (cct->_conf->bluefs_sync_write) {
+ bdev[p->bdev]->write(p->offset + x_off, t, buffered, h->write_hint);
+ } else {
+ bdev[p->bdev]->aio_write(p->offset + x_off, t, h->iocv[p->bdev], buffered, h->write_hint);
+ }
+ h->dirty_devs[p->bdev] = true;
+ if (p->bdev == BDEV_SLOW) {
+ bytes_written_slow += t.length();
+ }
+
+ bloff += x_len;
+ length -= x_len;
+ ++p;
+ x_off = 0;
+ }
+ logger->inc(l_bluefs_bytes_written_slow, bytes_written_slow);
+ for (unsigned i = 0; i < MAX_BDEV; ++i) {
+ if (bdev[i]) {
+ if (h->iocv[i] && h->iocv[i]->has_pending_aios()) {
+ bdev[i]->aio_submit(h->iocv[i]);
+ }
+ }
+ }
+ vselector->add_usage(h->file->vselector_hint, h->file->fnode);
+ dout(20) << __func__ << " h " << h << " pos now 0x"
+ << std::hex << h->pos << std::dec << dendl;
+ return 0;
+}
+
+#ifdef HAVE_LIBAIO
+// we need to retire old completed aios so they don't stick around in
+// memory indefinitely (along with their bufferlist refs).
+void BlueFS::_claim_completed_aios(FileWriter *h, list<aio_t> *ls)
+{
+ for (auto p : h->iocv) {
+ if (p) {
+ ls->splice(ls->end(), p->running_aios);
+ }
+ }
+ dout(10) << __func__ << " got " << ls->size() << " aios" << dendl;
+}
+
+void BlueFS::wait_for_aio(FileWriter *h)
+{
+ // NOTE: this is safe to call without a lock, as long as our reference is
+ // stable.
+ dout(10) << __func__ << " " << h << dendl;
+ utime_t start = ceph_clock_now();
+ for (auto p : h->iocv) {
+ if (p) {
+ p->aio_wait();
+ }
+ }
+ dout(10) << __func__ << " " << h << " done in " << (ceph_clock_now() - start) << dendl;
+}
+#endif
+
+int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
+{
+ bool flushed = false;
+ int r = _flush(h, force, &flushed);
+ if (r == 0 && flushed) {
+ _maybe_compact_log(l);
+ }
+ return r;
+}
+
+int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
+{
+ h->buffer_appender.flush();
+ uint64_t length = h->buffer.length();
+ uint64_t offset = h->pos;
+ if (flushed) {
+ *flushed = false;
+ }
+ if (!force &&
+ length < cct->_conf->bluefs_min_flush_size) {
+ dout(10) << __func__ << " " << h << " ignoring, length " << length
+ << " < min_flush_size " << cct->_conf->bluefs_min_flush_size
+ << dendl;
+ return 0;
+ }
+ if (length == 0) {
+ dout(10) << __func__ << " " << h << " no dirty data on "
+ << h->file->fnode << dendl;
+ return 0;
+ }
+ dout(10) << __func__ << " " << h << " 0x"
+ << std::hex << offset << "~" << length << std::dec
+ << " to " << h->file->fnode << dendl;
+ ceph_assert(h->pos <= h->file->fnode.size);
+ int r = _flush_range(h, offset, length);
+ if (flushed) {
+ *flushed = true;
+ }
+ return r;
+}
+
+int BlueFS::_truncate(FileWriter *h, uint64_t offset)
+{
+ dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
+ << " file " << h->file->fnode << dendl;
+ if (h->file->deleted) {
+ dout(10) << __func__ << " deleted, no-op" << dendl;
+ return 0;
+ }
+
+ // we never truncate internal log files
+ ceph_assert(h->file->fnode.ino > 1);
+
+ h->buffer_appender.flush();
+
+ // truncate off unflushed data?
+ if (h->pos < offset &&
+ h->pos + h->buffer.length() > offset) {
+ bufferlist t;
+ dout(20) << __func__ << " tossing out last " << offset - h->pos
+ << " unflushed bytes" << dendl;
+ t.substr_of(h->buffer, 0, offset - h->pos);
+ h->buffer.swap(t);
+ ceph_abort_msg("actually this shouldn't happen");
+ }
+ if (h->buffer.length()) {
+ int r = _flush(h, true);
+ if (r < 0)
+ return r;
+ }
+ if (offset == h->file->fnode.size) {
+ return 0; // no-op!
+ }
+ if (offset > h->file->fnode.size) {
+ ceph_abort_msg("truncate up not supported");
+ }
+ ceph_assert(h->file->fnode.size >= offset);
+ vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
+ h->file->fnode.size = offset;
+ vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
+ log_t.op_file_update(h->file->fnode);
+ return 0;
+}
+
+int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
+{
+ dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
+ int r = _flush(h, true);
+ if (r < 0)
+ return r;
+ uint64_t old_dirty_seq = h->file->dirty_seq;
+
+ _flush_bdev_safely(h);
+
+ if (old_dirty_seq) {
+ uint64_t s = log_seq;
+ dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
+ << ") on " << h->file->fnode << ", flushing log" << dendl;
+ _flush_and_sync_log(l, old_dirty_seq);
+ ceph_assert(h->file->dirty_seq == 0 || // cleaned
+ h->file->dirty_seq > s); // or redirtied by someone else
+ }
+ return 0;
+}
+
+void BlueFS::_flush_bdev_safely(FileWriter *h)
+{
+ std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
+ h->dirty_devs.fill(false);
+#ifdef HAVE_LIBAIO
+ if (!cct->_conf->bluefs_sync_write) {
+ list<aio_t> completed_ios;
+ _claim_completed_aios(h, &completed_ios);
+ lock.unlock();
+ wait_for_aio(h);
+ completed_ios.clear();
+ flush_bdev(flush_devs);
+ lock.lock();
+ } else
+#endif
+ {
+ lock.unlock();
+ flush_bdev(flush_devs);
+ lock.lock();
+ }
+}
+
+void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
+{
+ // NOTE: this is safe to call without a lock.
+ dout(20) << __func__ << dendl;
+ for (unsigned i = 0; i < MAX_BDEV; i++) {
+ if (dirty_bdevs[i])
+ bdev[i]->flush();
+ }
+}
+
+void BlueFS::flush_bdev()
+{
+ // NOTE: this is safe to call without a lock.
+ dout(20) << __func__ << dendl;
+ for (auto p : bdev) {
+ if (p)
+ p->flush();
+ }
+}
+
+const char* BlueFS::get_device_name(unsigned id)
+{
+ if (id >= MAX_BDEV) return "BDEV_INV";
+ const char* names[] = {"BDEV_WAL", "BDEV_DB", "BDEV_SLOW", "BDEV_NEWWAL", "BDEV_NEWDB"};
+ return names[id];
+}
+
+int BlueFS::_expand_slow_device(uint64_t need, PExtentVector& extents)
+{
+ int r = -ENOSPC;
+ if (slow_dev_expander) {
+ auto id = _get_slow_device_id();
+ auto min_alloc_size = alloc_size[id];
+ ceph_assert(id <= alloc.size() && alloc[id]);
+ auto min_need = round_up_to(need, min_alloc_size);
+ need = std::max(need,
+ slow_dev_expander->get_recommended_expansion_delta(
+ alloc[id]->get_free(), block_all[id].size()));
+
+ need = round_up_to(need, min_alloc_size);
+ dout(10) << __func__ << " expanding slow device by 0x"
+ << std::hex << need << std::dec
+ << dendl;
+ r = slow_dev_expander->allocate_freespace(min_need, need, extents);
+ }
+ return r;
+}
+
+int BlueFS::_allocate_without_fallback(uint8_t id, uint64_t len,
+ PExtentVector* extents)
+{
+ dout(10) << __func__ << " len 0x" << std::hex << len << std::dec
+ << " from " << (int)id << dendl;
+ assert(id < alloc.size());
+ if (!alloc[id]) {
+ return -ENOENT;
+ }
+ extents->reserve(4); // 4 should be (more than) enough for most allocations
+ uint64_t min_alloc_size = alloc_size[id];
+ uint64_t left = round_up_to(len, min_alloc_size);
+ int64_t alloc_len = alloc[id]->allocate(left, min_alloc_size, 0, extents);
+ if (alloc_len < 0 || alloc_len < (int64_t)left) {
+ if (alloc_len > 0) {
+ alloc[id]->release(*extents);
+ }
+ if (bdev[id])
+ derr << __func__ << " failed to allocate 0x" << std::hex << left
+ << " on bdev " << (int)id
+ << ", free 0x" << alloc[id]->get_free() << std::dec << dendl;
+ else
+ derr << __func__ << " failed to allocate 0x" << std::hex << left
+ << " on bdev " << (int)id << ", dne" << std::dec << dendl;
+ if (alloc[id])
+ alloc[id]->dump();
+ return -ENOSPC;
+ }
+
+ return 0;
+}
+
+int BlueFS::_allocate(uint8_t id, uint64_t len,
+ bluefs_fnode_t* node)
+{
+ dout(10) << __func__ << " len 0x" << std::hex << len << std::dec
+ << " from " << (int)id << dendl;
+ ceph_assert(id < alloc.size());
+ int64_t alloc_len = 0;
+ PExtentVector extents;
+ uint64_t hint = 0;
+ if (alloc[id]) {
+ if (!node->extents.empty() && node->extents.back().bdev == id) {
+ hint = node->extents.back().end();
+ }
+ extents.reserve(4); // 4 should be (more than) enough for most allocations
+ alloc_len = alloc[id]->allocate(round_up_to(len, alloc_size[id]),
+ alloc_size[id], hint, &extents);
+ }
+ if (!alloc[id] ||
+ alloc_len < 0 ||
+ alloc_len < (int64_t)round_up_to(len, alloc_size[id])) {
+ if (alloc_len > 0) {
+ alloc[id]->release(extents);
+ }
+ if (id != BDEV_SLOW) {
+ if (bdev[id]) {
+ dout(1) << __func__ << " failed to allocate 0x" << std::hex << len
+ << " on bdev " << (int)id
+ << ", free 0x" << alloc[id]->get_free()
+ << "; fallback to bdev " << (int)id + 1
+ << std::dec << dendl;
+ }
+ return _allocate(id + 1, len, node);
+ }
+ dout(1) << __func__ << " unable to allocate 0x" << std::hex << len
+ << " on bdev " << (int)id << ", free 0x"
+ << (alloc[id] ? alloc[id]->get_free() : (uint64_t)-1)
+ << "; fallback to slow device expander "
+ << std::dec << dendl;
+ extents.clear();
+ if (_expand_slow_device(len, extents) == 0) {
+ id = _get_slow_device_id();
+ for (auto& e : extents) {
+ _add_block_extent(id, e.offset, e.length);
+ }
+ extents.clear();
+ auto* last_alloc = alloc[id];
+ ceph_assert(last_alloc);
+ // try again
+ alloc_len = last_alloc->allocate(round_up_to(len, alloc_size[id]),
+ alloc_size[id], hint, &extents);
+ if (alloc_len < 0 || alloc_len < (int64_t)len) {
+ if (alloc_len > 0) {
+ last_alloc->release(extents);
+ }
+ derr << __func__ << " failed to allocate 0x" << std::hex << len
+ << " on bdev " << (int)id
+ << ", free 0x" << last_alloc->get_free() << std::dec << dendl;
+ return -ENOSPC;
+ }
+ } else {
+ derr << __func__ << " failed to expand slow device to fit +0x"
+ << std::hex << len << std::dec
+ << dendl;
+ return -ENOSPC;
+ }
+ } else {
+ uint64_t total_allocated =
+ block_all[id].size() - alloc[id]->get_free();
+ if (max_bytes[id] < total_allocated) {
+ logger->set(max_bytes_pcounters[id], total_allocated);
+ max_bytes[id] = total_allocated;
+ }
+ }
+
+ for (auto& p : extents) {
+ node->append_extent(bluefs_extent_t(id, p.offset, p.length));
+ }
+
+ return 0;
+}
+
+int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
+{
+ dout(10) << __func__ << " file " << f->fnode << " 0x"
+ << std::hex << off << "~" << len << std::dec << dendl;
+ if (f->deleted) {
+ dout(10) << __func__ << " deleted, no-op" << dendl;
+ return 0;
+ }
+ ceph_assert(f->fnode.ino > 1);
+ uint64_t allocated = f->fnode.get_allocated();
+ if (off + len > allocated) {
+ uint64_t want = off + len - allocated;
+ vselector->sub_usage(f->vselector_hint, f->fnode);
+
+ int r = _allocate(vselector->select_prefer_bdev(f->vselector_hint),
+ want,
+ &f->fnode);
+ vselector->add_usage(f->vselector_hint, f->fnode);
+ if (r < 0)
+ return r;
+ log_t.op_file_update(f->fnode);
+ }
+ return 0;
+}
+
+void BlueFS::sync_metadata(bool avoid_compact)
+{
+ std::unique_lock l(lock);
+ if (log_t.empty() && dirty_files.empty()) {
+ dout(10) << __func__ << " - no pending log events" << dendl;
+ } else {
+ dout(10) << __func__ << dendl;
+ utime_t start = ceph_clock_now();
+ flush_bdev(); // FIXME?
+ _flush_and_sync_log(l);
+ dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
+ }
+
+ if (!avoid_compact) {
+ _maybe_compact_log(l);
+ }
+}
+
+void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
+{
+ if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
+ _should_compact_log()) {
+ if (cct->_conf->bluefs_compact_log_sync) {
+ _compact_log_sync();
+ } else {
+ _compact_log_async(l);
+ }
+ }
+}
+
+int BlueFS::open_for_write(
+ const string& dirname,
+ const string& filename,
+ FileWriter **h,
+ bool overwrite)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ DirRef dir;
+ if (p == dir_map.end()) {
+ // implicitly create the dir
+ dout(20) << __func__ << " dir " << dirname
+ << " does not exist" << dendl;
+ return -ENOENT;
+ } else {
+ dir = p->second;
+ }
+
+ FileRef file;
+ bool create = false;
+ map<string,FileRef>::iterator q = dir->file_map.find(filename);
+ if (q == dir->file_map.end()) {
+ if (overwrite) {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " does not exist" << dendl;
+ return -ENOENT;
+ }
+ file = new File;
+ file->fnode.ino = ++ino_last;
+ file_map[ino_last] = file;
+ dir->file_map[filename] = file;
+ ++file->refs;
+ create = true;
+ } else {
+ // overwrite existing file?
+ file = q->second;
+ if (overwrite) {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " already exists, overwrite in place" << dendl;
+ } else {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " already exists, truncate + overwrite" << dendl;
+ vselector->sub_usage(file->vselector_hint, file->fnode);
+ file->fnode.size = 0;
+ for (auto& p : file->fnode.extents) {
+ pending_release[p.bdev].insert(p.offset, p.length);
+ }
+
+ file->fnode.clear_extents();
+ }
+ }
+ ceph_assert(file->fnode.ino > 1);
+
+ file->fnode.mtime = ceph_clock_now();
+ file->vselector_hint = vselector->get_hint_by_dir(dirname);
+
+ dout(20) << __func__ << " mapping " << dirname << "/" << filename
+ << " vsel_hint " << file->vselector_hint
+ << dendl;
+
+ log_t.op_file_update(file->fnode);
+ if (create)
+ log_t.op_dir_link(dirname, filename, file->fnode.ino);
+
+ *h = _create_writer(file);
+
+ if (boost::algorithm::ends_with(filename, ".log")) {
+ (*h)->writer_type = BlueFS::WRITER_WAL;
+ if (logger && !overwrite) {
+ logger->inc(l_bluefs_files_written_wal);
+ }
+ } else if (boost::algorithm::ends_with(filename, ".sst")) {
+ (*h)->writer_type = BlueFS::WRITER_SST;
+ if (logger) {
+ logger->inc(l_bluefs_files_written_sst);
+ }
+ }
+
+ dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
+ return 0;
+}
+
+BlueFS::FileWriter *BlueFS::_create_writer(FileRef f)
+{
+ FileWriter *w = new FileWriter(f);
+ for (unsigned i = 0; i < MAX_BDEV; ++i) {
+ if (bdev[i]) {
+ w->iocv[i] = new IOContext(cct, NULL);
+ }
+ }
+ return w;
+}
+
+void BlueFS::_close_writer(FileWriter *h)
+{
+ dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
+ h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
+ for (unsigned i=0; i<MAX_BDEV; ++i) {
+ if (bdev[i]) {
+ if (h->iocv[i]) {
+ h->iocv[i]->aio_wait();
+ bdev[i]->queue_reap_ioc(h->iocv[i]);
+ }
+ }
+ }
+ delete h;
+}
+
+int BlueFS::open_for_read(
+ const string& dirname,
+ const string& filename,
+ FileReader **h,
+ bool random)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << "/" << filename
+ << (random ? " (random)":" (sequential)") << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+
+ map<string,FileRef>::iterator q = dir->file_map.find(filename);
+ if (q == dir->file_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " not found" << dendl;
+ return -ENOENT;
+ }
+ File *file = q->second.get();
+
+ *h = new FileReader(file, random ? 4096 : cct->_conf->bluefs_max_prefetch,
+ random, false);
+ dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
+ return 0;
+}
+
+int BlueFS::rename(
+ const string& old_dirname, const string& old_filename,
+ const string& new_dirname, const string& new_filename)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << old_dirname << "/" << old_filename
+ << " -> " << new_dirname << "/" << new_filename << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(old_dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef old_dir = p->second;
+ map<string,FileRef>::iterator q = old_dir->file_map.find(old_filename);
+ if (q == old_dir->file_map.end()) {
+ dout(20) << __func__ << " dir " << old_dirname << " (" << old_dir
+ << ") file " << old_filename
+ << " not found" << dendl;
+ return -ENOENT;
+ }
+ FileRef file = q->second;
+
+ p = dir_map.find(new_dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef new_dir = p->second;
+ q = new_dir->file_map.find(new_filename);
+ if (q != new_dir->file_map.end()) {
+ dout(20) << __func__ << " dir " << new_dirname << " (" << old_dir
+ << ") file " << new_filename
+ << " already exists, unlinking" << dendl;
+ ceph_assert(q->second != file);
+ log_t.op_dir_unlink(new_dirname, new_filename);
+ _drop_link(q->second);
+ }
+
+ dout(10) << __func__ << " " << new_dirname << "/" << new_filename << " "
+ << " " << file->fnode << dendl;
+
+ new_dir->file_map[new_filename] = file;
+ old_dir->file_map.erase(old_filename);
+
+ log_t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
+ log_t.op_dir_unlink(old_dirname, old_filename);
+ return 0;
+}
+
+int BlueFS::mkdir(const string& dirname)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p != dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " exists" << dendl;
+ return -EEXIST;
+ }
+ dir_map[dirname] = new Dir;
+ log_t.op_dir_create(dirname);
+ return 0;
+}
+
+int BlueFS::rmdir(const string& dirname)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+ if (!dir->file_map.empty()) {
+ dout(20) << __func__ << " dir " << dirname << " not empty" << dendl;
+ return -ENOTEMPTY;
+ }
+ dir_map.erase(dirname);
+ log_t.op_dir_remove(dirname);
+ return 0;
+}
+
+bool BlueFS::dir_exists(const string& dirname)
+{
+ std::lock_guard l(lock);
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ bool exists = p != dir_map.end();
+ dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl;
+ return exists;
+}
+
+int BlueFS::stat(const string& dirname, const string& filename,
+ uint64_t *size, utime_t *mtime)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+ map<string,FileRef>::iterator q = dir->file_map.find(filename);
+ if (q == dir->file_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " not found" << dendl;
+ return -ENOENT;
+ }
+ File *file = q->second.get();
+ dout(10) << __func__ << " " << dirname << "/" << filename
+ << " " << file->fnode << dendl;
+ if (size)
+ *size = file->fnode.size;
+ if (mtime)
+ *mtime = file->fnode.mtime;
+ return 0;
+}
+
+int BlueFS::lock_file(const string& dirname, const string& filename,
+ FileLock **plock)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+ map<string,FileRef>::iterator q = dir->file_map.find(filename);
+ File *file;
+ if (q == dir->file_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " (" << dir
+ << ") file " << filename
+ << " not found, creating" << dendl;
+ file = new File;
+ file->fnode.ino = ++ino_last;
+ file->fnode.mtime = ceph_clock_now();
+ file_map[ino_last] = file;
+ dir->file_map[filename] = file;
+ ++file->refs;
+ log_t.op_file_update(file->fnode);
+ log_t.op_dir_link(dirname, filename, file->fnode.ino);
+ } else {
+ file = q->second.get();
+ if (file->locked) {
+ dout(10) << __func__ << " already locked" << dendl;
+ return -ENOLCK;
+ }
+ }
+ file->locked = true;
+ *plock = new FileLock(file);
+ dout(10) << __func__ << " locked " << file->fnode
+ << " with " << *plock << dendl;
+ return 0;
+}
+
+int BlueFS::unlock_file(FileLock *fl)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl;
+ ceph_assert(fl->file->locked);
+ fl->file->locked = false;
+ delete fl;
+ return 0;
+}
+
+int BlueFS::readdir(const string& dirname, vector<string> *ls)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << dendl;
+ if (dirname.empty()) {
+ // list dirs
+ ls->reserve(dir_map.size() + 2);
+ for (auto& q : dir_map) {
+ ls->push_back(q.first);
+ }
+ } else {
+ // list files in dir
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+ ls->reserve(dir->file_map.size() + 2);
+ for (auto& q : dir->file_map) {
+ ls->push_back(q.first);
+ }
+ }
+ ls->push_back(".");
+ ls->push_back("..");
+ return 0;
+}
+
+int BlueFS::unlink(const string& dirname, const string& filename)
+{
+ std::lock_guard l(lock);
+ dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
+ dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
+ return -ENOENT;
+ }
+ DirRef dir = p->second;
+ map<string,FileRef>::iterator q = dir->file_map.find(filename);
+ if (q == dir->file_map.end()) {
+ dout(20) << __func__ << " file " << dirname << "/" << filename
+ << " not found" << dendl;
+ return -ENOENT;
+ }
+ FileRef file = q->second;
+ if (file->locked) {
+ dout(20) << __func__ << " file " << dirname << "/" << filename
+ << " is locked" << dendl;
+ return -EBUSY;
+ }
+ dir->file_map.erase(filename);
+ log_t.op_dir_unlink(dirname, filename);
+ _drop_link(file);
+ return 0;
+}
+
+bool BlueFS::wal_is_rotational()
+{
+ if (bdev[BDEV_WAL]) {
+ return bdev[BDEV_WAL]->is_rotational();
+ } else if (bdev[BDEV_DB]) {
+ return bdev[BDEV_DB]->is_rotational();
+ }
+ return bdev[BDEV_SLOW]->is_rotational();
+}
+
+/*
+ Algorithm.
+ do_replay_recovery_read is used when bluefs log abruptly ends, but it seems that more data should be there.
+ Idea is to search disk for definiton of extents that will be accompanied with bluefs log in future,
+ and try if using it will produce healthy bluefs transaction.
+ We encode already known bluefs log extents and search disk for these bytes.
+ When we find it, we decode following bytes as extent.
+ We read that whole extent and then check if merged with existing log part gives a proper bluefs transaction.
+ */
+int BlueFS::do_replay_recovery_read(FileReader *log_reader,
+ size_t replay_pos,
+ size_t read_offset,
+ size_t read_len,
+ bufferlist* bl) {
+ dout(1) << __func__ << " replay_pos=0x" << std::hex << replay_pos <<
+ " needs 0x" << read_offset << "~" << read_len << std::dec << dendl;
+
+ bluefs_fnode_t& log_fnode = log_reader->file->fnode;
+ bufferlist bin_extents;
+ encode(log_fnode.extents, bin_extents);
+ dout(2) << __func__ << " log file encoded extents length = " << bin_extents.length() << dendl;
+
+ // cannot process if too small to effectively search
+ ceph_assert(bin_extents.length() >= 32);
+ bufferlist last_32;
+ last_32.substr_of(bin_extents, bin_extents.length() - 32, 32);
+
+ //read fixed part from replay_pos to end of bluefs_log extents
+ bufferlist fixed;
+ uint64_t e_off = 0;
+ auto e = log_fnode.seek(replay_pos, &e_off);
+ ceph_assert(e != log_fnode.extents.end());
+ int r = bdev[e->bdev]->read(e->offset + e_off, e->length - e_off, &fixed, ioc[e->bdev],
+ cct->_conf->bluefs_buffered_io);
+ ceph_assert(r == 0);
+ //capture dev of last good extent
+ uint8_t last_e_dev = e->bdev;
+ uint64_t last_e_off = e->offset;
+ ++e;
+ while (e != log_fnode.extents.end()) {
+ r = bdev[e->bdev]->read(e->offset, e->length, &fixed, ioc[e->bdev],
+ cct->_conf->bluefs_buffered_io);
+ ceph_assert(r == 0);
+ last_e_dev = e->bdev;
+ ++e;
+ }
+ ceph_assert(replay_pos + fixed.length() == read_offset);
+
+ dout(2) << __func__ << " valid data in log = " << fixed.length() << dendl;
+
+ struct compare {
+ bool operator()(const bluefs_extent_t& a, const bluefs_extent_t& b) const {
+ if (a.bdev < b.bdev) return true;
+ if (a.offset < b.offset) return true;
+ return a.length < b.length;
+ }
+ };
+ std::set<bluefs_extent_t, compare> extents_rejected;
+ for (int dcnt = 0; dcnt < 3; dcnt++) {
+ uint8_t dev = (last_e_dev + dcnt) % MAX_BDEV;
+ if (bdev[dev] == nullptr) continue;
+ dout(2) << __func__ << " processing " << get_device_name(dev) << dendl;
+ interval_set<uint64_t> disk_regions;
+ disk_regions.insert(0, bdev[dev]->get_size());
+ for (auto f : file_map) {
+ auto& e = f.second->fnode.extents;
+ for (auto& p : e) {
+ if (p.bdev == dev) {
+ disk_regions.erase(p.offset, p.length);
+ }
+ }
+ }
+ size_t disk_regions_count = disk_regions.num_intervals();
+ dout(5) << __func__ << " " << disk_regions_count << " regions to scan on " << get_device_name(dev) << dendl;
+
+ auto reg = disk_regions.lower_bound(last_e_off);
+ //for all except first, start from beginning
+ last_e_off = 0;
+ if (reg == disk_regions.end()) {
+ reg = disk_regions.begin();
+ }
+ const uint64_t chunk_size = 4 * 1024 * 1024;
+ const uint64_t page_size = 4096;
+ const uint64_t max_extent_size = 16;
+ uint64_t overlay_size = last_32.length() + max_extent_size;
+ for (size_t i = 0; i < disk_regions_count; reg++, i++) {
+ if (reg == disk_regions.end()) {
+ reg = disk_regions.begin();
+ }
+ uint64_t pos = reg.get_start();
+ uint64_t len = reg.get_len();
+
+ std::unique_ptr<char[]> raw_data_p{new char[page_size + chunk_size]};
+ char* raw_data = raw_data_p.get();
+ memset(raw_data, 0, page_size);
+
+ while (len > last_32.length()) {
+ uint64_t chunk_len = len > chunk_size ? chunk_size : len;
+ dout(5) << __func__ << " read "
+ << get_device_name(dev) << ":0x" << std::hex << pos << "+" << chunk_len << std::dec << dendl;
+ r = bdev[dev]->read_random(pos, chunk_len, raw_data + page_size, cct->_conf->bluefs_buffered_io);
+ ceph_assert(r == 0);
+
+ //search for fixed_last_32
+ char* chunk_b = raw_data + page_size;
+ char* chunk_e = chunk_b + chunk_len;
+
+ char* search_b = chunk_b - overlay_size;
+ char* search_e = chunk_e;
+
+ for (char* sp = search_b; ; sp += last_32.length()) {
+ sp = (char*)memmem(sp, search_e - sp, last_32.c_str(), last_32.length());
+ if (sp == nullptr) {
+ break;
+ }
+
+ char* n = sp + last_32.length();
+ dout(5) << __func__ << " checking location 0x" << std::hex << pos + (n - chunk_b) << std::dec << dendl;
+ bufferlist test;
+ test.append(n, std::min<size_t>(max_extent_size, chunk_e - n));
+ bluefs_extent_t ne;
+ try {
+ bufferlist::const_iterator p = test.begin();
+ decode(ne, p);
+ } catch (buffer::error& e) {
+ continue;
+ }
+ if (extents_rejected.count(ne) != 0) {
+ dout(5) << __func__ << " extent " << ne << " already refected" <<dendl;
+ continue;
+ }
+ //insert as rejected already. if we succeed, it wouldn't make difference.
+ extents_rejected.insert(ne);
+
+ if (ne.bdev >= MAX_BDEV ||
+ bdev[ne.bdev] == nullptr ||
+ ne.length > 16 * 1024 * 1024 ||
+ (ne.length & 4095) != 0 ||
+ ne.offset + ne.length > bdev[ne.bdev]->get_size() ||
+ (ne.offset & 4095) != 0) {
+ dout(5) << __func__ << " refusing extent " << ne << dendl;
+ continue;
+ }
+ dout(5) << __func__ << " checking extent " << ne << dendl;
+
+ //read candidate extent - whole
+ bufferlist candidate;
+ candidate.append(fixed);
+ r = bdev[ne.bdev]->read(ne.offset, ne.length, &candidate, ioc[ne.bdev],
+ cct->_conf->bluefs_buffered_io);
+ ceph_assert(r == 0);
+
+ //check if transaction & crc is ok
+ bluefs_transaction_t t;
+ try {
+ bufferlist::const_iterator p = candidate.begin();
+ decode(t, p);
+ }
+ catch (buffer::error& e) {
+ dout(5) << __func__ << " failed match" << dendl;
+ continue;
+ }
+
+ //success, it seems a probable candidate
+ uint64_t l = std::min<uint64_t>(ne.length, read_len);
+ //trim to required size
+ bufferlist requested_read;
+ requested_read.substr_of(candidate, fixed.length(), l);
+ bl->append(requested_read);
+ dout(5) << __func__ << " successful extension of log " << l << "/" << read_len << dendl;
+ log_fnode.append_extent(ne);
+ log_fnode.recalc_allocated();
+ log_reader->buf.pos += l;
+ return l;
+ }
+ //save overlay for next search
+ memcpy(search_b, chunk_e - overlay_size, overlay_size);
+ pos += chunk_len;
+ len -= chunk_len;
+ }
+ }
+ }
+ return 0;
+}
+
+// ===============================================
+// OriginalVolumeSelector
+
+void* OriginalVolumeSelector::get_hint_by_device(uint8_t dev) const {
+ return reinterpret_cast<void*>(dev);
+}
+void* OriginalVolumeSelector::get_hint_by_dir(const string& dirname) const {
+ uint8_t res = BlueFS::BDEV_DB;
+ if (dirname.length() > 5) {
+ // the "db.slow" and "db.wal" directory names are hard-coded at
+ // match up with bluestore. the slow device is always the second
+ // one (when a dedicated block.db device is present and used at
+ // bdev 0). the wal device is always last.
+ if (boost::algorithm::ends_with(dirname, ".slow")) {
+ res = BlueFS::BDEV_SLOW;
+ }
+ else if (boost::algorithm::ends_with(dirname, ".wal")) {
+ res = BlueFS::BDEV_WAL;
+ }
+ }
+ return reinterpret_cast<void*>(res);
+}
+
+uint8_t OriginalVolumeSelector::select_prefer_bdev(void* hint)
+{
+ return (uint8_t)(reinterpret_cast<uint64_t>(hint));
+}
+
+void OriginalVolumeSelector::get_paths(const std::string& base, paths& res) const
+{
+ res.emplace_back(base, db_total);
+ res.emplace_back(base + ".slow", slow_total);
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "OriginalVolumeSelector: "
+
+void OriginalVolumeSelector::dump(ostream& sout) {
+ sout<< "wal_total:" << wal_total
+ << ", db_total:" << db_total
+ << ", slow_total:" << slow_total
+ << std::endl;
+}