summaryrefslogtreecommitdiffstats
path: root/src/blk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/blk
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/blk')
-rw-r--r--src/blk/BlockDevice.cc211
-rw-r--r--src/blk/BlockDevice.h304
-rw-r--r--src/blk/CMakeLists.txt67
-rw-r--r--src/blk/aio/aio.cc124
-rw-r--r--src/blk/aio/aio.h159
-rw-r--r--src/blk/kernel/KernelDevice.cc1449
-rw-r--r--src/blk/kernel/KernelDevice.h156
-rw-r--r--src/blk/kernel/io_uring.cc264
-rw-r--r--src/blk/kernel/io_uring.h33
-rw-r--r--src/blk/pmem/PMEMDevice.cc378
-rw-r--r--src/blk/pmem/PMEMDevice.h78
-rw-r--r--src/blk/spdk/NVMEDevice.cc992
-rw-r--r--src/blk/spdk/NVMEDevice.h84
-rw-r--r--src/blk/zoned/HMSMRDevice.cc131
-rw-r--r--src/blk/zoned/HMSMRDevice.h52
15 files changed, 4482 insertions, 0 deletions
diff --git a/src/blk/BlockDevice.cc b/src/blk/BlockDevice.cc
new file mode 100644
index 000000000..fd07e443c
--- /dev/null
+++ b/src/blk/BlockDevice.cc
@@ -0,0 +1,211 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <libgen.h>
+#include <unistd.h>
+
+#include "BlockDevice.h"
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+#include "kernel/KernelDevice.h"
+#endif
+
+#if defined(HAVE_SPDK)
+#include "spdk/NVMEDevice.h"
+#endif
+
+#if defined(HAVE_BLUESTORE_PMEM)
+#include "pmem/PMEMDevice.h"
+#endif
+
+#if defined(HAVE_LIBZBD)
+#include "zoned/HMSMRDevice.h"
+#endif
+
+#include "common/debug.h"
+#include "common/EventTrace.h"
+#include "common/errno.h"
+#include "include/compat.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+using std::string;
+
+
+blk_access_mode_t buffermode(bool buffered)
+{
+ return buffered ? blk_access_mode_t::BUFFERED : blk_access_mode_t::DIRECT;
+}
+
+std::ostream& operator<<(std::ostream& os, const blk_access_mode_t buffered)
+{
+ os << (buffered == blk_access_mode_t::BUFFERED ? "(buffered)" : "(direct)");
+ return os;
+}
+
+
+
+void IOContext::aio_wait()
+{
+ std::unique_lock l(lock);
+ // see _aio_thread for waker logic
+ while (num_running.load() > 0) {
+ dout(10) << __func__ << " " << this
+ << " waiting for " << num_running.load() << " aios to complete"
+ << dendl;
+ cond.wait(l);
+ }
+ dout(20) << __func__ << " " << this << " done" << dendl;
+}
+
+uint64_t IOContext::get_num_ios() const
+{
+ // this is about the simplest model for transaction cost you can
+ // imagine. there is some fixed overhead cost by saying there is a
+ // minimum of one "io". and then we have some cost per "io" that is
+ // a configurable (with different hdd and ssd defaults), and add
+ // that to the bytes value.
+ uint64_t ios = 0;
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ ios += pending_aios.size();
+#endif
+#ifdef HAVE_SPDK
+ ios += total_nseg;
+#endif
+ return ios;
+}
+
+void IOContext::release_running_aios()
+{
+ ceph_assert(!num_running);
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ // release aio contexts (including pinned buffers).
+ running_aios.clear();
+#endif
+}
+
+BlockDevice::block_device_t
+BlockDevice::detect_device_type(const std::string& path)
+{
+#if defined(HAVE_SPDK)
+ if (NVMEDevice::support(path)) {
+ return block_device_t::spdk;
+ }
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ if (PMEMDevice::support(path)) {
+ return block_device_t::pmem;
+ }
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ if (HMSMRDevice::support(path)) {
+ return block_device_t::hm_smr;
+ }
+#endif
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ return block_device_t::aio;
+#else
+ return block_device_t::unknown;
+#endif
+}
+
+BlockDevice::block_device_t
+BlockDevice::device_type_from_name(const std::string& blk_dev_name)
+{
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ if (blk_dev_name == "aio") {
+ return block_device_t::aio;
+ }
+#endif
+#if defined(HAVE_SPDK)
+ if (blk_dev_name == "spdk") {
+ return block_device_t::spdk;
+ }
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ if (blk_dev_name == "pmem") {
+ return block_device_t::pmem;
+ }
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ if (blk_dev_name == "hm_smr") {
+ return block_device_t::hm_smr;
+ }
+#endif
+ return block_device_t::unknown;
+}
+
+BlockDevice* BlockDevice::create_with_type(block_device_t device_type,
+ CephContext* cct, const std::string& path, aio_callback_t cb,
+ void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+{
+
+ switch (device_type) {
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ case block_device_t::aio:
+ return new KernelDevice(cct, cb, cbpriv, d_cb, d_cbpriv);
+#endif
+#if defined(HAVE_SPDK)
+ case block_device_t::spdk:
+ return new NVMEDevice(cct, cb, cbpriv);
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ case block_device_t::pmem:
+ return new PMEMDevice(cct, cb, cbpriv);
+#endif
+#if (defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)) && defined(HAVE_LIBZBD)
+ case block_device_t::hm_smr:
+ return new HMSMRDevice(cct, cb, cbpriv, d_cb, d_cbpriv);
+#endif
+ default:
+ ceph_abort_msg("unsupported device");
+ return nullptr;
+ }
+}
+
+BlockDevice *BlockDevice::create(
+ CephContext* cct, const string& path, aio_callback_t cb,
+ void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+{
+ const string blk_dev_name = cct->_conf.get_val<string>("bdev_type");
+ block_device_t device_type = block_device_t::unknown;
+ if (blk_dev_name.empty()) {
+ device_type = detect_device_type(path);
+ } else {
+ device_type = device_type_from_name(blk_dev_name);
+ }
+ return create_with_type(device_type, cct, path, cb, cbpriv, d_cb, d_cbpriv);
+}
+
+bool BlockDevice::is_valid_io(uint64_t off, uint64_t len) const {
+ bool ret = (off % block_size == 0 &&
+ len % block_size == 0 &&
+ len > 0 &&
+ off < size &&
+ off + len <= size);
+
+ if (!ret) {
+ derr << __func__ << " " << std::hex
+ << off << "~" << len
+ << " block_size " << block_size
+ << " size " << size
+ << std::dec << dendl;
+ }
+ return ret;
+}
diff --git a/src/blk/BlockDevice.h b/src/blk/BlockDevice.h
new file mode 100644
index 000000000..440faf3d4
--- /dev/null
+++ b/src/blk/BlockDevice.h
@@ -0,0 +1,304 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_BLOCKDEVICE_H
+#define CEPH_BLK_BLOCKDEVICE_H
+
+#include <atomic>
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "acconfig.h"
+#include "common/ceph_mutex.h"
+#include "include/common_fwd.h"
+#include "extblkdev/ExtBlkDevInterface.h"
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+#include "aio/aio.h"
+#endif
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/interval_set.h"
+#define SPDK_PREFIX "spdk:"
+
+#if defined(__linux__)
+#if !defined(F_SET_FILE_RW_HINT)
+#define F_LINUX_SPECIFIC_BASE 1024
+#define F_SET_FILE_RW_HINT (F_LINUX_SPECIFIC_BASE + 14)
+#endif
+// These values match Linux definition
+// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fcntl.h#n56
+#define WRITE_LIFE_NOT_SET 0 // No hint information set
+#define WRITE_LIFE_NONE 1 // No hints about write life time
+#define WRITE_LIFE_SHORT 2 // Data written has a short life time
+#define WRITE_LIFE_MEDIUM 3 // Data written has a medium life time
+#define WRITE_LIFE_LONG 4 // Data written has a long life time
+#define WRITE_LIFE_EXTREME 5 // Data written has an extremely long life time
+#define WRITE_LIFE_MAX 6
+#else
+// On systems don't have WRITE_LIFE_* only use one FD
+// And all files are created equal
+#define WRITE_LIFE_NOT_SET 0 // No hint information set
+#define WRITE_LIFE_NONE 0 // No hints about write life time
+#define WRITE_LIFE_SHORT 0 // Data written has a short life time
+#define WRITE_LIFE_MEDIUM 0 // Data written has a medium life time
+#define WRITE_LIFE_LONG 0 // Data written has a long life time
+#define WRITE_LIFE_EXTREME 0 // Data written has an extremely long life time
+#define WRITE_LIFE_MAX 1
+#endif
+
+enum struct blk_access_mode_t {
+ DIRECT,
+ BUFFERED
+};
+blk_access_mode_t buffermode(bool buffered);
+std::ostream& operator<<(std::ostream& os, const blk_access_mode_t buffered);
+
+/// track in-flight io
+struct IOContext {
+ enum {
+ FLAG_DONT_CACHE = 1
+ };
+
+private:
+ ceph::mutex lock = ceph::make_mutex("IOContext::lock");
+ ceph::condition_variable cond;
+ int r = 0;
+
+public:
+ CephContext* cct;
+ void *priv;
+#ifdef HAVE_SPDK
+ void *nvme_task_first = nullptr;
+ void *nvme_task_last = nullptr;
+ std::atomic_int total_nseg = {0};
+#endif
+
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ std::list<aio_t> pending_aios; ///< not yet submitted
+ std::list<aio_t> running_aios; ///< submitting or submitted
+#endif
+ std::atomic_int num_pending = {0};
+ std::atomic_int num_running = {0};
+ bool allow_eio;
+ uint32_t flags = 0; // FLAG_*
+
+ explicit IOContext(CephContext* cct, void *p, bool allow_eio = false)
+ : cct(cct), priv(p), allow_eio(allow_eio)
+ {}
+
+ // no copying
+ IOContext(const IOContext& other) = delete;
+ IOContext &operator=(const IOContext& other) = delete;
+
+ bool has_pending_aios() {
+ return num_pending.load();
+ }
+ void release_running_aios();
+ void aio_wait();
+ uint64_t get_num_ios() const;
+
+ void try_aio_wake() {
+ assert(num_running >= 1);
+
+ std::lock_guard l(lock);
+ if (num_running.fetch_sub(1) == 1) {
+
+ // we might have some pending IOs submitted after the check
+ // as there is no lock protection for aio_submit.
+ // Hence we might have false conditional trigger.
+ // aio_wait has to handle that hence do not care here.
+ cond.notify_all();
+ }
+ }
+
+ void set_return_value(int _r) {
+ r = _r;
+ }
+
+ int get_return_value() const {
+ return r;
+ }
+
+ bool skip_cache() const {
+ return flags & FLAG_DONT_CACHE;
+ }
+};
+
+
+class BlockDevice {
+public:
+ CephContext* cct;
+ typedef void (*aio_callback_t)(void *handle, void *aio);
+private:
+ ceph::mutex ioc_reap_lock = ceph::make_mutex("BlockDevice::ioc_reap_lock");
+ std::vector<IOContext*> ioc_reap_queue;
+ std::atomic_int ioc_reap_count = {0};
+ enum class block_device_t {
+ unknown,
+#if defined(HAVE_LIBAIO) || defined(HAVE_POSIXAIO)
+ aio,
+#if defined(HAVE_LIBZBD)
+ hm_smr,
+#endif
+#endif
+#if defined(HAVE_SPDK)
+ spdk,
+#endif
+#if defined(HAVE_BLUESTORE_PMEM)
+ pmem,
+#endif
+ };
+ static block_device_t detect_device_type(const std::string& path);
+ static block_device_t device_type_from_name(const std::string& blk_dev_name);
+ static BlockDevice *create_with_type(block_device_t device_type,
+ CephContext* cct, const std::string& path, aio_callback_t cb,
+ void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+
+protected:
+ uint64_t size = 0;
+ uint64_t block_size = 0;
+ uint64_t optimal_io_size = 0;
+ bool support_discard = false;
+ bool rotational = true;
+ bool lock_exclusive = true;
+
+ // HM-SMR specific properties. In HM-SMR drives the LBA space is divided into
+ // fixed-size zones. Typically, the first few zones are randomly writable;
+ // they form a conventional region of the drive. The remaining zones must be
+ // written sequentially and they must be reset before rewritten. For example,
+ // a 14 TB HGST HSH721414AL drive has 52156 zones each of size is 256 MiB.
+ // The zones 0-523 are randomly writable and they form the conventional region
+ // of the drive. The zones 524-52155 are sequential zones.
+ uint64_t conventional_region_size = 0;
+ uint64_t zone_size = 0;
+
+public:
+ aio_callback_t aio_callback;
+ void *aio_callback_priv;
+ BlockDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+ : cct(cct),
+ aio_callback(cb),
+ aio_callback_priv(cbpriv)
+ {}
+ virtual ~BlockDevice() = default;
+
+ static BlockDevice *create(
+ CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+ virtual bool supported_bdev_label() { return true; }
+ virtual bool is_rotational() { return rotational; }
+
+ // HM-SMR-specific calls
+ virtual bool is_smr() const { return false; }
+ virtual uint64_t get_zone_size() const {
+ ceph_assert(is_smr());
+ return zone_size;
+ }
+ virtual uint64_t get_conventional_region_size() const {
+ ceph_assert(is_smr());
+ return conventional_region_size;
+ }
+ virtual void reset_all_zones() {
+ ceph_assert(is_smr());
+ }
+ virtual void reset_zone(uint64_t zone) {
+ ceph_assert(is_smr());
+ }
+ virtual std::vector<uint64_t> get_zones() {
+ ceph_assert(is_smr());
+ return std::vector<uint64_t>();
+ }
+
+ virtual void aio_submit(IOContext *ioc) = 0;
+
+ void set_no_exclusive_lock() {
+ lock_exclusive = false;
+ }
+
+ uint64_t get_size() const { return size; }
+ uint64_t get_block_size() const { return block_size; }
+ uint64_t get_optimal_io_size() const { return optimal_io_size; }
+
+ /// hook to provide utilization of thinly-provisioned device
+ virtual int get_ebd_state(ExtBlkDevState &state) const {
+ return -ENOENT;
+ }
+
+ virtual int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const = 0;
+
+ virtual int get_devname(std::string *out) const {
+ return -ENOENT;
+ }
+ virtual int get_devices(std::set<std::string> *ls) const {
+ std::string s;
+ if (get_devname(&s) == 0) {
+ ls->insert(s);
+ }
+ return 0;
+ }
+ virtual int get_numa_node(int *node) const {
+ return -EOPNOTSUPP;
+ }
+
+ virtual int read(
+ uint64_t off,
+ uint64_t len,
+ ceph::buffer::list *pbl,
+ IOContext *ioc,
+ bool buffered) = 0;
+ virtual int read_random(
+ uint64_t off,
+ uint64_t len,
+ char *buf,
+ bool buffered) = 0;
+ virtual int write(
+ uint64_t off,
+ ceph::buffer::list& bl,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) = 0;
+
+ virtual int aio_read(
+ uint64_t off,
+ uint64_t len,
+ ceph::buffer::list *pbl,
+ IOContext *ioc) = 0;
+ virtual int aio_write(
+ uint64_t off,
+ ceph::buffer::list& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) = 0;
+ virtual int flush() = 0;
+ virtual bool try_discard(interval_set<uint64_t> &to_release, bool async=true) { return false; }
+ virtual void discard_drain() { return; }
+
+ // for managing buffered readers/writers
+ virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
+ virtual int open(const std::string& path) = 0;
+ virtual void close() = 0;
+
+ struct hugepaged_raw_marker_t {};
+
+protected:
+ bool is_valid_io(uint64_t off, uint64_t len) const;
+};
+
+#endif //CEPH_BLK_BLOCKDEVICE_H
diff --git a/src/blk/CMakeLists.txt b/src/blk/CMakeLists.txt
new file mode 100644
index 000000000..288955dd0
--- /dev/null
+++ b/src/blk/CMakeLists.txt
@@ -0,0 +1,67 @@
+if(WITH_BLUESTORE OR WITH_RBD_SSD_CACHE)
+list(APPEND libblk_srcs
+ BlockDevice.cc)
+endif()
+
+if(HAVE_LIBAIO OR HAVE_POSIXAIO)
+ list(APPEND libblk_srcs
+ kernel/KernelDevice.cc
+ kernel/io_uring.cc
+ aio/aio.cc)
+endif()
+
+if(WITH_BLUESTORE_PMEM)
+ list(APPEND libblk_srcs
+ pmem/PMEMDevice.cc)
+endif()
+
+if(WITH_SPDK)
+ list(APPEND libblk_srcs
+ spdk/NVMEDevice.cc)
+endif()
+
+if(WITH_ZBD)
+ list(APPEND libblk_srcs
+ zoned/HMSMRDevice.cc)
+endif()
+
+if(libblk_srcs)
+ add_library(blk STATIC ${libblk_srcs})
+ target_include_directories(blk PRIVATE "./")
+endif()
+
+if(HAVE_LIBAIO)
+ target_link_libraries(blk PUBLIC ${AIO_LIBRARIES} extblkdev)
+endif(HAVE_LIBAIO)
+
+if(WITH_SPDK)
+ target_link_libraries(blk
+ PRIVATE spdk::spdk)
+endif()
+
+if(WITH_ZBD)
+ target_link_libraries(blk PRIVATE ${ZBD_LIBRARIES})
+endif()
+
+if(WITH_BLUESTORE_PMEM)
+ if(HAVE_LIBDML)
+ target_link_libraries(blk PRIVATE dml::dml dml::dmlhl)
+ endif()
+
+ target_link_libraries(blk
+ PRIVATE pmdk::pmem)
+endif()
+
+if(WITH_EVENTTRACE)
+ add_dependencies(blk eventtrace_tp)
+endif()
+
+if(WITH_LIBURING)
+ if(WITH_SYSTEM_LIBURING)
+ find_package(uring REQUIRED)
+ else()
+ include(Builduring)
+ build_uring()
+ endif()
+ target_link_libraries(blk PRIVATE uring::uring)
+endif()
diff --git a/src/blk/aio/aio.cc b/src/blk/aio/aio.cc
new file mode 100644
index 000000000..00a12bfd1
--- /dev/null
+++ b/src/blk/aio/aio.cc
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include "aio.h"
+
+std::ostream& operator<<(std::ostream& os, const aio_t& aio)
+{
+ unsigned i = 0;
+ os << "aio: ";
+ for (auto& iov : aio.iov) {
+ os << "\n [" << i++ << "] 0x"
+ << std::hex << iov.iov_base << "~" << iov.iov_len << std::dec;
+ }
+ return os;
+}
+
+int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
+ uint16_t aios_size, void *priv,
+ int *retries)
+{
+ // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
+ int attempts = 16;
+ int delay = 125;
+ int r;
+
+ aio_iter cur = begin;
+ struct aio_t *piocb[aios_size];
+ int left = 0;
+ while (cur != end) {
+ cur->priv = priv;
+ *(piocb+left) = &(*cur);
+ ++left;
+ ++cur;
+ }
+ ceph_assert(aios_size >= left);
+ int done = 0;
+ while (left > 0) {
+#if defined(HAVE_LIBAIO)
+ r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done));
+#elif defined(HAVE_POSIXAIO)
+ if (piocb[done]->n_aiocb == 1) {
+ // TODO: consider batching multiple reads together with lio_listio
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done];
+ r = aio_read(&piocb[done]->aio.aiocb);
+ } else {
+ struct sigevent sev;
+ sev.sigev_notify = SIGEV_KEVENT;
+ sev.sigev_notify_kqueue = ctx;
+ sev.sigev_value.sival_ptr = piocb[done];
+ r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev);
+ }
+#endif
+ if (r < 0) {
+ if (r == -EAGAIN && attempts-- > 0) {
+ usleep(delay);
+ delay *= 2;
+ (*retries)++;
+ continue;
+ }
+ return r;
+ }
+ ceph_assert(r > 0);
+ done += r;
+ left -= r;
+ attempts = 16;
+ delay = 125;
+ }
+ return done;
+}
+
+int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+#if defined(HAVE_LIBAIO)
+ io_event events[max];
+#elif defined(HAVE_POSIXAIO)
+ struct kevent events[max];
+#endif
+ struct timespec t = {
+ timeout_ms / 1000,
+ (timeout_ms % 1000) * 1000 * 1000
+ };
+
+ int r = 0;
+ do {
+#if defined(HAVE_LIBAIO)
+ r = io_getevents(ctx, 1, max, events, &t);
+#elif defined(HAVE_POSIXAIO)
+ r = kevent(ctx, NULL, 0, events, max, &t);
+ if (r < 0)
+ r = -errno;
+#endif
+ } while (r == -EINTR);
+
+ for (int i=0; i<r; ++i) {
+#if defined(HAVE_LIBAIO)
+ paio[i] = (aio_t *)events[i].obj;
+ paio[i]->rval = events[i].res;
+#else
+ paio[i] = (aio_t*)events[i].udata;
+ if (paio[i]->n_aiocb == 1) {
+ paio[i]->rval = aio_return(&paio[i]->aio.aiocb);
+ } else {
+ // Emulate the return value of pwritev. I can't find any documentation
+ // for what the value of io_event.res is supposed to be. I'm going to
+ // assume that it's just like pwritev/preadv/pwrite/pread.
+ paio[i]->rval = 0;
+ for (int j = 0; j < paio[i]->n_aiocb; j++) {
+ int res = aio_return(&paio[i]->aio.aiocbp[j]);
+ if (res < 0) {
+ paio[i]->rval = res;
+ break;
+ } else {
+ paio[i]->rval += res;
+ }
+ }
+ free(paio[i]->aio.aiocbp);
+ }
+#endif
+ }
+ return r;
+}
diff --git a/src/blk/aio/aio.h b/src/blk/aio/aio.h
new file mode 100644
index 000000000..14b89784b
--- /dev/null
+++ b/src/blk/aio/aio.h
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#if defined(HAVE_LIBAIO)
+#include <libaio.h>
+#elif defined(HAVE_POSIXAIO)
+#include <aio.h>
+#include <sys/event.h>
+#endif
+
+#include <boost/intrusive/list.hpp>
+#include <boost/container/small_vector.hpp>
+
+#include "include/buffer.h"
+#include "include/types.h"
+
+struct aio_t {
+#if defined(HAVE_LIBAIO)
+ struct iocb iocb{}; // must be first element; see shenanigans in aio_queue_t
+#elif defined(HAVE_POSIXAIO)
+ // static long aio_listio_max = -1;
+ union {
+ struct aiocb aiocb;
+ struct aiocb *aiocbp;
+ } aio;
+ int n_aiocb;
+#endif
+ void *priv;
+ int fd;
+ boost::container::small_vector<iovec,4> iov;
+ uint64_t offset, length;
+ long rval;
+ ceph::buffer::list bl; ///< write payload (so that it remains stable for duration)
+
+ boost::intrusive::list_member_hook<> queue_item;
+
+ aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) {
+ }
+
+ void pwritev(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+#if defined(HAVE_LIBAIO)
+ io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+ n_aiocb = iov.size();
+ aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+ for (int i = 0; i < iov.size(); i++) {
+ aio.aiocbp[i].aio_fildes = fd;
+ aio.aiocbp[i].aio_offset = offset;
+ aio.aiocbp[i].aio_buf = iov[i].iov_base;
+ aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+ aio.aiocbp[i].aio_lio_opcode = LIO_WRITE;
+ offset += iov[i].iov_len;
+ }
+#endif
+ }
+
+ void preadv(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+#if defined(HAVE_LIBAIO)
+ io_prep_preadv(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+ n_aiocb = iov.size();
+ aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+ for (size_t i = 0; i < iov.size(); i++) {
+ aio.aiocbp[i].aio_fildes = fd;
+ aio.aiocbp[i].aio_buf = iov[i].iov_base;
+ aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+ aio.aiocbp[i].aio_offset = offset;
+ aio.aiocbp[i].aio_lio_opcode = LIO_READ;
+ offset += iov[i].iov_len;
+ }
+#endif
+ }
+
+ long get_return_value() {
+ return rval;
+ }
+};
+
+std::ostream& operator<<(std::ostream& os, const aio_t& aio);
+
+typedef boost::intrusive::list<
+ aio_t,
+ boost::intrusive::member_hook<
+ aio_t,
+ boost::intrusive::list_member_hook<>,
+ &aio_t::queue_item> > aio_list_t;
+
+struct io_queue_t {
+ typedef std::list<aio_t>::iterator aio_iter;
+
+ virtual ~io_queue_t() {};
+
+ virtual int init(std::vector<int> &fds) = 0;
+ virtual void shutdown() = 0;
+ virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) = 0;
+ virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
+};
+
+struct aio_queue_t final : public io_queue_t {
+ int max_iodepth;
+#if defined(HAVE_LIBAIO)
+ io_context_t ctx;
+#elif defined(HAVE_POSIXAIO)
+ int ctx;
+#endif
+
+ explicit aio_queue_t(unsigned max_iodepth)
+ : max_iodepth(max_iodepth),
+ ctx(0) {
+ }
+ ~aio_queue_t() final {
+ ceph_assert(ctx == 0);
+ }
+
+ int init(std::vector<int> &fds) final {
+ (void)fds;
+ ceph_assert(ctx == 0);
+#if defined(HAVE_LIBAIO)
+ int r = io_setup(max_iodepth, &ctx);
+ if (r < 0) {
+ if (ctx) {
+ io_destroy(ctx);
+ ctx = 0;
+ }
+ }
+ return r;
+#elif defined(HAVE_POSIXAIO)
+ ctx = kqueue();
+ if (ctx < 0)
+ return -errno;
+ else
+ return 0;
+#endif
+ }
+ void shutdown() final {
+ if (ctx) {
+#if defined(HAVE_LIBAIO)
+ int r = io_destroy(ctx);
+#elif defined(HAVE_POSIXAIO)
+ int r = close(ctx);
+#endif
+ ceph_assert(r == 0);
+ ctx = 0;
+ }
+ }
+
+ int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) final;
+ int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
+};
diff --git a/src/blk/kernel/KernelDevice.cc b/src/blk/kernel/KernelDevice.cc
new file mode 100644
index 000000000..754b44d32
--- /dev/null
+++ b/src/blk/kernel/KernelDevice.cc
@@ -0,0 +1,1449 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <limits>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/file.h>
+#include <sys/mman.h>
+
+#include <boost/container/flat_map.hpp>
+#include <boost/lockfree/queue.hpp>
+
+#include "KernelDevice.h"
+#include "include/buffer_raw.h"
+#include "include/intarith.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "include/stringify.h"
+#include "include/str_map.h"
+#include "common/blkdev.h"
+#include "common/buffer_instrumentation.h"
+#include "common/errno.h"
+#if defined(__FreeBSD__)
+#include "bsm/audit_errno.h"
+#endif
+#include "common/debug.h"
+#include "common/numa.h"
+
+#include "global/global_context.h"
+#include "io_uring.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::make_timespan;
+using ceph::mono_clock;
+using ceph::operator <<;
+
+KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ aio(false), dio(false),
+ discard_callback(d_cb),
+ discard_callback_priv(d_cbpriv),
+ aio_stop(false),
+ discard_started(false),
+ discard_stop(false),
+ aio_thread(this),
+ discard_thread(this),
+ injecting_crash(0)
+{
+ fd_directs.resize(WRITE_LIFE_MAX, -1);
+ fd_buffereds.resize(WRITE_LIFE_MAX, -1);
+
+ bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
+ unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
+
+ if (use_ioring && ioring_queue_t::supported()) {
+ bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
+ bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
+ io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll);
+ } else {
+ static bool once;
+ if (use_ioring && !once) {
+ derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
+ << dendl;
+ once = true;
+ }
+ io_queue = std::make_unique<aio_queue_t>(iodepth);
+ }
+}
+
+int KernelDevice::_lock()
+{
+ // When the block changes, systemd-udevd will open the block,
+ // read some information and close it. Then a failure occurs here.
+ // So we need to try again here.
+ int fd = fd_directs[WRITE_LIFE_NOT_SET];
+ dout(10) << __func__ << " fd=" << fd << dendl;
+ uint64_t nr_tries = 0;
+ for (;;) {
+ struct flock fl = { .l_type = F_WRLCK,
+ .l_whence = SEEK_SET };
+ int r = ::fcntl(fd, F_OFD_SETLK, &fl);
+ if (r < 0) {
+ if (errno == EINVAL) {
+ r = ::flock(fd, LOCK_EX | LOCK_NB);
+ }
+ }
+ if (r == 0) {
+ return 0;
+ }
+ if (errno != EAGAIN) {
+ return -errno;
+ }
+ dout(1) << __func__ << " flock busy on " << path << dendl;
+ if (const uint64_t max_retry =
+ cct->_conf.get_val<uint64_t>("bdev_flock_retry");
+ max_retry > 0 && nr_tries++ == max_retry) {
+ return -EAGAIN;
+ }
+ double retry_interval =
+ cct->_conf.get_val<double>("bdev_flock_retry_interval");
+ std::this_thread::sleep_for(ceph::make_timespan(retry_interval));
+ }
+}
+
+int KernelDevice::open(const string& p)
+{
+ path = p;
+ int r = 0, i = 0;
+ dout(1) << __func__ << " path " << path << dendl;
+
+ struct stat statbuf;
+ bool is_block;
+ r = stat(path.c_str(), &statbuf);
+ if (r != 0) {
+ derr << __func__ << " stat got: " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+ is_block = (statbuf.st_mode & S_IFMT) == S_IFBLK;
+ for (i = 0; i < WRITE_LIFE_MAX; i++) {
+ int flags = 0;
+ if (lock_exclusive && is_block && (i == 0)) {
+ // If opening block device use O_EXCL flag. It gives us best protection,
+ // as no other process can overwrite the data for as long as we are running.
+ // For block devices ::flock is not enough,
+ // since 2 different inodes with same major/minor can be locked.
+ // Exclusion by O_EXCL works in containers too.
+ flags |= O_EXCL;
+ }
+ int fd = ::open(path.c_str(), O_RDWR | O_DIRECT | flags);
+ if (fd < 0) {
+ r = -errno;
+ break;
+ }
+ fd_directs[i] = fd;
+
+ fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
+ if (fd < 0) {
+ r = -errno;
+ break;
+ }
+ fd_buffereds[i] = fd;
+ }
+
+ if (i != WRITE_LIFE_MAX) {
+ derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+#if defined(F_SET_FILE_RW_HINT)
+ for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
+ if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
+ r = -errno;
+ break;
+ }
+ if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
+ r = -errno;
+ break;
+ }
+ }
+ if (i != WRITE_LIFE_MAX) {
+ enable_wrt = false;
+ dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
+ }
+#endif
+
+ dio = true;
+ aio = cct->_conf->bdev_aio;
+ if (!aio) {
+ ceph_abort_msg("non-aio not supported");
+ }
+
+ // disable readahead as it will wreak havoc on our mix of
+ // directio/aio and buffered io.
+ r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
+ if (r) {
+ r = -r;
+ derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ if (lock_exclusive) {
+ // We need to keep soft locking (via flock()) because O_EXCL does not work for regular files.
+ // This is as good as we can get. Other processes can still overwrite the data,
+ // but at least we are protected from mounting same device twice in ceph processes.
+ // We also apply soft locking for block devices, as it populates /proc/locks. (see lslocks)
+ r = _lock();
+ if (r < 0) {
+ derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
+ << dendl;
+ goto out_fail;
+ }
+ }
+
+ struct stat st;
+ r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ // Operate as though the block size is 4 KB. The backing file
+ // blksize doesn't strictly matter except that some file systems may
+ // require a read/modify/write if we write something smaller than
+ // it.
+ block_size = cct->_conf->bdev_block_size;
+ if (block_size != (unsigned)st.st_blksize) {
+ dout(1) << __func__ << " backing device/file reports st_blksize "
+ << st.st_blksize << ", using bdev_block_size "
+ << block_size << " anyway" << dendl;
+ }
+
+
+ {
+ BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
+ BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);
+
+ if (S_ISBLK(st.st_mode)) {
+ int64_t s;
+ r = blkdev_direct.get_size(&s);
+ if (r < 0) {
+ goto out_fail;
+ }
+ size = s;
+ } else {
+ size = st.st_size;
+ }
+
+ char partition[PATH_MAX], devname[PATH_MAX];
+ if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
+ (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
+ derr << "unable to get device name for " << path << ": "
+ << cpp_strerror(r) << dendl;
+ rotational = true;
+ } else {
+ dout(20) << __func__ << " devname " << devname << dendl;
+ rotational = blkdev_buffered.is_rotational();
+ support_discard = blkdev_buffered.support_discard();
+ optimal_io_size = blkdev_buffered.get_optimal_io_size();
+ this->devname = devname;
+ // check if any extended block device plugin recognizes this device
+ // detect_vdo has moved into the VDO plugin
+ int rc = extblkdev::detect_device(cct, devname, ebd_impl);
+ if (rc != 0) {
+ dout(20) << __func__ << " no plugin volume maps to " << devname << dendl;
+ }
+ }
+ }
+
+ r = _post_open();
+ if (r < 0) {
+ goto out_fail;
+ }
+
+ r = _aio_start();
+ if (r < 0) {
+ goto out_fail;
+ }
+ if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
+ _discard_start();
+ }
+
+ // round size down to an even block
+ size &= ~(block_size - 1);
+
+ dout(1) << __func__
+ << " size " << size
+ << " (0x" << std::hex << size << std::dec << ", "
+ << byte_u_t(size) << ")"
+ << " block_size " << block_size
+ << " (" << byte_u_t(block_size) << ")"
+ << " " << (rotational ? "rotational device," : "non-rotational device,")
+ << " discard " << (support_discard ? "supported" : "not supported")
+ << dendl;
+ return 0;
+
+out_fail:
+ for (i = 0; i < WRITE_LIFE_MAX; i++) {
+ if (fd_directs[i] >= 0) {
+ VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+ fd_directs[i] = -1;
+ } else {
+ break;
+ }
+ if (fd_buffereds[i] >= 0) {
+ VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+ fd_buffereds[i] = -1;
+ } else {
+ break;
+ }
+ }
+ return r;
+}
+
+int KernelDevice::get_devices(std::set<std::string> *ls) const
+{
+ if (devname.empty()) {
+ return 0;
+ }
+ get_raw_devices(devname, ls);
+ return 0;
+}
+
+void KernelDevice::close()
+{
+ dout(1) << __func__ << dendl;
+ _aio_stop();
+ if (discard_thread.is_started()) {
+ _discard_stop();
+ }
+ _pre_close();
+
+ extblkdev::release_device(ebd_impl);
+
+ for (int i = 0; i < WRITE_LIFE_MAX; i++) {
+ assert(fd_directs[i] >= 0);
+ VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
+ fd_directs[i] = -1;
+
+ assert(fd_buffereds[i] >= 0);
+ VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
+ fd_buffereds[i] = -1;
+ }
+ path.clear();
+}
+
+int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+ (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
+ (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
+ (*pm)[prefix + "size"] = stringify(get_size());
+ (*pm)[prefix + "block_size"] = stringify(get_block_size());
+ (*pm)[prefix + "optimal_io_size"] = stringify(get_optimal_io_size());
+ (*pm)[prefix + "driver"] = "KernelDevice";
+ if (rotational) {
+ (*pm)[prefix + "type"] = "hdd";
+ } else {
+ (*pm)[prefix + "type"] = "ssd";
+ }
+ // if compression device detected, collect meta data for device
+ // VDO specific meta data has moved into VDO plugin
+ if (ebd_impl) {
+ ebd_impl->collect_metadata(prefix, pm);
+ }
+
+ {
+ string res_names;
+ std::set<std::string> devnames;
+ if (get_devices(&devnames) == 0) {
+ for (auto& dev : devnames) {
+ if (!res_names.empty()) {
+ res_names += ",";
+ }
+ res_names += dev;
+ }
+ if (res_names.size()) {
+ (*pm)[prefix + "devices"] = res_names;
+ }
+ }
+ }
+
+ struct stat st;
+ int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
+ if (r < 0)
+ return -errno;
+ if (S_ISBLK(st.st_mode)) {
+ (*pm)[prefix + "access_mode"] = "blk";
+
+ char buffer[1024] = {0};
+ BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
+ if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
+ (*pm)[prefix + "partition_path"] = "unknown";
+ } else {
+ (*pm)[prefix + "partition_path"] = buffer;
+ }
+ buffer[0] = '\0';
+ if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
+ (*pm)[prefix + "dev_node"] = "unknown";
+ } else {
+ (*pm)[prefix + "dev_node"] = buffer;
+ }
+ if (!r) {
+ return 0;
+ }
+ buffer[0] = '\0';
+ blkdev.model(buffer, sizeof(buffer));
+ (*pm)[prefix + "model"] = buffer;
+
+ buffer[0] = '\0';
+ blkdev.dev(buffer, sizeof(buffer));
+ (*pm)[prefix + "dev"] = buffer;
+
+ // nvme exposes a serial number
+ buffer[0] = '\0';
+ blkdev.serial(buffer, sizeof(buffer));
+ (*pm)[prefix + "serial"] = buffer;
+
+ // numa
+ int node;
+ r = blkdev.get_numa_node(&node);
+ if (r >= 0) {
+ (*pm)[prefix + "numa_node"] = stringify(node);
+ }
+ } else {
+ (*pm)[prefix + "access_mode"] = "file";
+ (*pm)[prefix + "path"] = path;
+ }
+ return 0;
+}
+
+int KernelDevice::get_ebd_state(ExtBlkDevState &state) const
+{
+ // use compression driver plugin to determine physical size and availability
+ // VDO specific get_thin_utilization has moved into VDO plugin
+ if (ebd_impl) {
+ return ebd_impl->get_state(state);
+ }
+ return -ENOENT;
+}
+
+int KernelDevice::choose_fd(bool buffered, int write_hint) const
+{
+#if defined(F_SET_FILE_RW_HINT)
+ if (!enable_wrt)
+ write_hint = WRITE_LIFE_NOT_SET;
+#else
+ // Without WRITE_LIFE capabilities, only one file is used.
+ // And rocksdb sets this value also to > 0, so we need to catch this here
+ // instead of trusting rocksdb to set write_hint.
+ write_hint = WRITE_LIFE_NOT_SET;
+#endif
+ return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
+}
+
+int KernelDevice::flush()
+{
+ // protect flush with a mutex. note that we are not really protecting
+ // data here. instead, we're ensuring that if any flush() caller
+ // sees that io_since_flush is true, they block any racing callers
+ // until the flush is observed. that allows racing threads to be
+ // calling flush while still ensuring that *any* of them that got an
+ // aio completion notification will not return before that aio is
+ // stable on disk: whichever thread sees the flag first will block
+ // followers until the aio is stable.
+ std::lock_guard l(flush_mutex);
+
+ bool expect = true;
+ if (!io_since_flush.compare_exchange_strong(expect, false)) {
+ dout(10) << __func__ << " no-op (no ios since last flush), flag is "
+ << (int)io_since_flush.load() << dendl;
+ return 0;
+ }
+
+ dout(10) << __func__ << " start" << dendl;
+ if (cct->_conf->bdev_inject_crash) {
+ ++injecting_crash;
+ // sleep for a moment to give other threads a chance to submit or
+ // wait on io that races with a flush.
+ derr << __func__ << " injecting crash. first we sleep..." << dendl;
+ sleep(cct->_conf->bdev_inject_crash_flush_delay);
+ derr << __func__ << " and now we die" << dendl;
+ cct->_log->flush();
+ _exit(1);
+ }
+ utime_t start = ceph_clock_now();
+ int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
+ utime_t end = ceph_clock_now();
+ utime_t dur = end - start;
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
+ ceph_abort();
+ }
+ dout(5) << __func__ << " in " << dur << dendl;;
+ return r;
+}
+
+int KernelDevice::_aio_start()
+{
+ if (aio) {
+ dout(10) << __func__ << dendl;
+ int r = io_queue->init(fd_directs);
+ if (r < 0) {
+ if (r == -EAGAIN) {
+ derr << __func__ << " io_setup(2) failed with EAGAIN; "
+ << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
+ } else {
+ derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
+ }
+ return r;
+ }
+ aio_thread.create("bstore_aio");
+ }
+ return 0;
+}
+
+void KernelDevice::_aio_stop()
+{
+ if (aio) {
+ dout(10) << __func__ << dendl;
+ aio_stop = true;
+ aio_thread.join();
+ aio_stop = false;
+ io_queue->shutdown();
+ }
+}
+
+void KernelDevice::_discard_start()
+{
+ discard_thread.create("bstore_discard");
+}
+
+void KernelDevice::_discard_stop()
+{
+ dout(10) << __func__ << dendl;
+ {
+ std::unique_lock l(discard_lock);
+ while (!discard_started) {
+ discard_cond.wait(l);
+ }
+ discard_stop = true;
+ discard_cond.notify_all();
+ }
+ discard_thread.join();
+ {
+ std::lock_guard l(discard_lock);
+ discard_stop = false;
+ }
+ dout(10) << __func__ << " stopped" << dendl;
+}
+
+void KernelDevice::discard_drain()
+{
+ dout(10) << __func__ << dendl;
+ std::unique_lock l(discard_lock);
+ while (!discard_queued.empty() || discard_running) {
+ discard_cond.wait(l);
+ }
+}
+
+static bool is_expected_ioerr(const int r)
+{
+ // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
+ return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
+ r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
+ r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
+#if defined(__linux__)
+ r == -EREMCHG || r == -EBADE
+#elif defined(__FreeBSD__)
+ r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
+#endif
+ );
+}
+
+void KernelDevice::_aio_thread()
+{
+ dout(10) << __func__ << " start" << dendl;
+ int inject_crash_count = 0;
+ while (!aio_stop) {
+ dout(40) << __func__ << " polling" << dendl;
+ int max = cct->_conf->bdev_aio_reap_max;
+ aio_t *aio[max];
+ int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
+ aio, max);
+ if (r < 0) {
+ derr << __func__ << " got " << cpp_strerror(r) << dendl;
+ ceph_abort_msg("got unexpected error from io_getevents");
+ }
+ if (r > 0) {
+ dout(30) << __func__ << " got " << r << " completed aios" << dendl;
+ for (int i = 0; i < r; ++i) {
+ IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
+ _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
+ if (aio[i]->queue_item.is_linked()) {
+ std::lock_guard l(debug_queue_lock);
+ debug_aio_unlink(*aio[i]);
+ }
+
+ // set flag indicating new ios have completed. we do this *before*
+ // any completion or notifications so that any user flush() that
+ // follows the observed io completion will include this io. Note
+ // that an earlier, racing flush() could observe and clear this
+ // flag, but that also ensures that the IO will be stable before the
+ // later flush() occurs.
+ io_since_flush.store(true);
+
+ long r = aio[i]->get_return_value();
+ if (r < 0) {
+ derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
+ << dendl;
+ if (ioc->allow_eio && is_expected_ioerr(r)) {
+ derr << __func__ << " translating the error to EIO for upper layer"
+ << dendl;
+ ioc->set_return_value(-EIO);
+ } else {
+ if (is_expected_ioerr(r)) {
+ note_io_error_event(
+ devname.c_str(),
+ path.c_str(),
+ r,
+#if defined(HAVE_POSIXAIO)
+ aio[i]->aio.aiocb.aio_lio_opcode,
+#else
+ aio[i]->iocb.aio_lio_opcode,
+#endif
+ aio[i]->offset,
+ aio[i]->length);
+ ceph_abort_msg(
+ "Unexpected IO error. "
+ "This may suggest a hardware issue. "
+ "Please check your kernel log!");
+ }
+ ceph_abort_msg(
+ "Unexpected IO error. "
+ "This may suggest HW issue. Please check your dmesg!");
+ }
+ } else if (aio[i]->length != (uint64_t)r) {
+ derr << "aio to 0x" << std::hex << aio[i]->offset
+ << "~" << aio[i]->length << std::dec
+ << " but returned: " << r << dendl;
+ ceph_abort_msg("unexpected aio return value: does not match length");
+ }
+
+ dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
+ << " ioc " << ioc
+ << " with " << (ioc->num_running.load() - 1)
+ << " aios left" << dendl;
+
+ // NOTE: once num_running and we either call the callback or
+ // call aio_wake we cannot touch ioc or aio[] as the caller
+ // may free it.
+ if (ioc->priv) {
+ if (--ioc->num_running == 0) {
+ aio_callback(aio_callback_priv, ioc->priv);
+ }
+ } else {
+ ioc->try_aio_wake();
+ }
+ }
+ }
+ if (cct->_conf->bdev_debug_aio) {
+ utime_t now = ceph_clock_now();
+ std::lock_guard l(debug_queue_lock);
+ if (debug_oldest) {
+ if (debug_stall_since == utime_t()) {
+ debug_stall_since = now;
+ } else {
+ if (cct->_conf->bdev_debug_aio_suicide_timeout) {
+ utime_t cutoff = now;
+ cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
+ if (debug_stall_since < cutoff) {
+ derr << __func__ << " stalled aio " << debug_oldest
+ << " since " << debug_stall_since << ", timeout is "
+ << cct->_conf->bdev_debug_aio_suicide_timeout
+ << "s, suicide" << dendl;
+ ceph_abort_msg("stalled aio... buggy kernel or bad device?");
+ }
+ }
+ }
+ }
+ }
+ if (cct->_conf->bdev_inject_crash) {
+ ++inject_crash_count;
+ if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
+ cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
+ derr << __func__ << " bdev_inject_crash trigger from aio thread"
+ << dendl;
+ cct->_log->flush();
+ _exit(1);
+ }
+ }
+ }
+ dout(10) << __func__ << " end" << dendl;
+}
+
+void KernelDevice::_discard_thread()
+{
+ std::unique_lock l(discard_lock);
+ ceph_assert(!discard_started);
+ discard_started = true;
+ discard_cond.notify_all();
+ while (true) {
+ ceph_assert(discard_finishing.empty());
+ if (discard_queued.empty()) {
+ if (discard_stop)
+ break;
+ dout(20) << __func__ << " sleep" << dendl;
+ discard_cond.notify_all(); // for the thread trying to drain...
+ discard_cond.wait(l);
+ dout(20) << __func__ << " wake" << dendl;
+ } else {
+ discard_finishing.swap(discard_queued);
+ discard_running = true;
+ l.unlock();
+ dout(20) << __func__ << " finishing" << dendl;
+ for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
+ _discard(p.get_start(), p.get_len());
+ }
+
+ discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
+ discard_finishing.clear();
+ l.lock();
+ discard_running = false;
+ }
+ }
+ dout(10) << __func__ << " finish" << dendl;
+ discard_started = false;
+}
+
+int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
+{
+ // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
+ if (!discard_thread.is_started())
+ return -1;
+
+ if (to_release.empty())
+ return 0;
+
+ std::lock_guard l(discard_lock);
+ discard_queued.insert(to_release);
+ discard_cond.notify_all();
+ return 0;
+}
+
+// return true only if _queue_discard succeeded, so caller won't have to do alloc->release
+// otherwise false
+bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async)
+{
+ if (!support_discard || !cct->_conf->bdev_enable_discard)
+ return false;
+
+ if (async && discard_thread.is_started()) {
+ return 0 == _queue_discard(to_release);
+ } else {
+ for (auto p = to_release.begin(); p != to_release.end(); ++p) {
+ _discard(p.get_start(), p.get_len());
+ }
+ }
+ return false;
+}
+
+void KernelDevice::_aio_log_start(
+ IOContext *ioc,
+ uint64_t offset,
+ uint64_t length)
+{
+ dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+ << std::dec << dendl;
+ if (cct->_conf->bdev_debug_inflight_ios) {
+ std::lock_guard l(debug_lock);
+ if (debug_inflight.intersects(offset, length)) {
+ derr << __func__ << " inflight overlap of 0x"
+ << std::hex
+ << offset << "~" << length << std::dec
+ << " with " << debug_inflight << dendl;
+ ceph_abort();
+ }
+ debug_inflight.insert(offset, length);
+ }
+}
+
+void KernelDevice::debug_aio_link(aio_t& aio)
+{
+ if (debug_queue.empty()) {
+ debug_oldest = &aio;
+ }
+ debug_queue.push_back(aio);
+}
+
+void KernelDevice::debug_aio_unlink(aio_t& aio)
+{
+ if (aio.queue_item.is_linked()) {
+ debug_queue.erase(debug_queue.iterator_to(aio));
+ if (debug_oldest == &aio) {
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+ if (age && debug_stall_since != utime_t()) {
+ utime_t cutoff = ceph_clock_now();
+ cutoff -= age;
+ if (debug_stall_since < cutoff) {
+ derr << __func__ << " stalled aio " << debug_oldest
+ << " since " << debug_stall_since << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ }
+
+ if (debug_queue.empty()) {
+ debug_oldest = nullptr;
+ } else {
+ debug_oldest = &debug_queue.front();
+ }
+ debug_stall_since = utime_t();
+ }
+ }
+}
+
+void KernelDevice::_aio_log_finish(
+ IOContext *ioc,
+ uint64_t offset,
+ uint64_t length)
+{
+ dout(20) << __func__ << " " << aio << " 0x"
+ << std::hex << offset << "~" << length << std::dec << dendl;
+ if (cct->_conf->bdev_debug_inflight_ios) {
+ std::lock_guard l(debug_lock);
+ debug_inflight.erase(offset, length);
+ }
+}
+
+void KernelDevice::aio_submit(IOContext *ioc)
+{
+ dout(20) << __func__ << " ioc " << ioc
+ << " pending " << ioc->num_pending.load()
+ << " running " << ioc->num_running.load()
+ << dendl;
+
+ if (ioc->num_pending.load() == 0) {
+ return;
+ }
+
+ // move these aside, and get our end iterator position now, as the
+ // aios might complete as soon as they are submitted and queue more
+ // wal aio's.
+ list<aio_t>::iterator e = ioc->running_aios.begin();
+ ioc->running_aios.splice(e, ioc->pending_aios);
+
+ int pending = ioc->num_pending.load();
+ ioc->num_running += pending;
+ ioc->num_pending -= pending;
+ ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+ ceph_assert(ioc->pending_aios.size() == 0);
+
+ if (cct->_conf->bdev_debug_aio) {
+ list<aio_t>::iterator p = ioc->running_aios.begin();
+ while (p != e) {
+ dout(30) << __func__ << " " << *p << dendl;
+ std::lock_guard l(debug_queue_lock);
+ debug_aio_link(*p++);
+ }
+ }
+
+ void *priv = static_cast<void*>(ioc);
+ int r, retries = 0;
+ // num of pending aios should not overflow when passed to submit_batch()
+ assert(pending <= std::numeric_limits<uint16_t>::max());
+ r = io_queue->submit_batch(ioc->running_aios.begin(), e,
+ pending, priv, &retries);
+
+ if (retries)
+ derr << __func__ << " retries " << retries << dendl;
+ if (r < 0) {
+ derr << " aio submit got " << cpp_strerror(r) << dendl;
+ ceph_assert(r == 0);
+ }
+}
+
+int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+ << std::dec << " " << buffermode(buffered) << dendl;
+ if (cct->_conf->bdev_inject_crash &&
+ rand() % cct->_conf->bdev_inject_crash == 0) {
+ derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
+ << off << "~" << len << std::dec << dendl;
+ ++injecting_crash;
+ return 0;
+ }
+ vector<iovec> iov;
+ bl.prepare_iov(&iov);
+
+ auto left = len;
+ auto o = off;
+ size_t idx = 0;
+ do {
+ auto r = ::pwritev(choose_fd(buffered, write_hint),
+ &iov[idx], iov.size() - idx, o);
+
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ o += r;
+ left -= r;
+ if (left) {
+ // skip fully processed IOVs
+ while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
+ r -= iov[idx++].iov_len;
+ }
+ // update partially processed one if any
+ if (r) {
+ ceph_assert(idx < iov.size());
+ ceph_assert((size_t)r < iov[idx].iov_len);
+ iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
+ iov[idx].iov_len -= r;
+ r = 0;
+ }
+ ceph_assert(r == 0);
+ }
+ } while (left);
+
+#ifdef HAVE_SYNC_FILE_RANGE
+ if (buffered) {
+ // initiate IO and wait till it completes
+ auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ }
+#endif
+
+ io_since_flush.store(true);
+
+ return 0;
+}
+
+int KernelDevice::write(
+ uint64_t off,
+ bufferlist &bl,
+ bool buffered,
+ int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " " << buffermode(buffered)
+ << dendl;
+ ceph_assert(is_valid_io(off, len));
+ if (cct->_conf->objectstore_blackhole) {
+ lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+ << dendl;
+ return 0;
+ }
+
+ if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
+ bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
+ dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
+ }
+ dout(40) << "data:\n";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ return _sync_write(off, bl, buffered, write_hint);
+}
+
+int KernelDevice::aio_write(
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " " << buffermode(buffered)
+ << dendl;
+ ceph_assert(is_valid_io(off, len));
+ if (cct->_conf->objectstore_blackhole) {
+ lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+ << dendl;
+ return 0;
+ }
+
+ if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
+ bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
+ dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
+ }
+ dout(40) << "data:\n";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ _aio_log_start(ioc, off, len);
+
+#ifdef HAVE_LIBAIO
+ if (aio && dio && !buffered) {
+ if (cct->_conf->bdev_inject_crash &&
+ rand() % cct->_conf->bdev_inject_crash == 0) {
+ derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
+ << off << "~" << len << std::dec
+ << dendl;
+ // generate a real io so that aio_wait behaves properly, but make it
+ // a read instead of write, and toss the result.
+ ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+ ++ioc->num_pending;
+ auto& aio = ioc->pending_aios.back();
+ aio.bl.push_back(
+ ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len)));
+ aio.bl.prepare_iov(&aio.iov);
+ aio.preadv(off, len);
+ ++injecting_crash;
+ } else {
+ if (bl.length() <= RW_IO_MAX) {
+ // fast path (non-huge write)
+ ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+ ++ioc->num_pending;
+ auto& aio = ioc->pending_aios.back();
+ bl.prepare_iov(&aio.iov);
+ aio.bl.claim_append(bl);
+ aio.pwritev(off, len);
+ dout(30) << aio << dendl;
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+ << std::dec << " aio " << &aio << dendl;
+ } else {
+ // write in RW_IO_MAX-sized chunks
+ uint64_t prev_len = 0;
+ while (prev_len < bl.length()) {
+ bufferlist tmp;
+ if (prev_len + RW_IO_MAX < bl.length()) {
+ tmp.substr_of(bl, prev_len, RW_IO_MAX);
+ } else {
+ tmp.substr_of(bl, prev_len, bl.length() - prev_len);
+ }
+ auto len = tmp.length();
+ ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
+ ++ioc->num_pending;
+ auto& aio = ioc->pending_aios.back();
+ tmp.prepare_iov(&aio.iov);
+ aio.bl.claim_append(tmp);
+ aio.pwritev(off + prev_len, len);
+ dout(30) << aio << dendl;
+ dout(5) << __func__ << " 0x" << std::hex << off + prev_len
+ << "~" << len
+ << std::dec << " aio " << &aio << " (piece)" << dendl;
+ prev_len += len;
+ }
+ }
+ }
+ } else
+#endif
+ {
+ int r = _sync_write(off, bl, buffered, write_hint);
+ _aio_log_finish(ioc, off, len);
+ if (r < 0)
+ return r;
+ }
+ return 0;
+}
+
+int KernelDevice::_discard(uint64_t offset, uint64_t len)
+{
+ int r = 0;
+ if (cct->_conf->objectstore_blackhole) {
+ lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
+ << dendl;
+ return 0;
+ }
+ dout(10) << __func__
+ << " 0x" << std::hex << offset << "~" << len << std::dec
+ << dendl;
+ r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
+ return r;
+}
+
+struct ExplicitHugePagePool {
+ using region_queue_t = boost::lockfree::queue<void*>;
+ using instrumented_raw = ceph::buffer_instrumentation::instrumented_raw<
+ BlockDevice::hugepaged_raw_marker_t>;
+
+ struct mmaped_buffer_raw : public instrumented_raw {
+ region_queue_t& region_q; // for recycling
+
+ mmaped_buffer_raw(void* mmaped_region, ExplicitHugePagePool& parent)
+ : instrumented_raw(static_cast<char*>(mmaped_region), parent.buffer_size),
+ region_q(parent.region_q) {
+ // the `mmaped_region` has been passed to `raw` as the buffer's `data`
+ }
+ ~mmaped_buffer_raw() override {
+ // don't delete nor unmmap; recycle the region instead
+ region_q.push(data);
+ }
+ };
+
+ ExplicitHugePagePool(const size_t buffer_size, size_t buffers_in_pool)
+ : buffer_size(buffer_size), region_q(buffers_in_pool) {
+ while (buffers_in_pool--) {
+ void* const mmaped_region = ::mmap(
+ nullptr,
+ buffer_size,
+ PROT_READ | PROT_WRITE,
+#if defined(__FreeBSD__)
+ // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has
+ // a different, more automated / implicit mechanisms. However,
+ // we want to mimic the Linux behavior as closely as possible
+ // also in the matter of error handling which is the reason
+ // behind MAP_ALIGNED_SUPER.
+ // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html
+ MAP_PRIVATE | MAP_ANONYMOUS | MAP_PREFAULT_READ | MAP_ALIGNED_SUPER,
+#else
+ MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
+#endif // __FreeBSD__
+ -1,
+ 0);
+ if (mmaped_region == MAP_FAILED) {
+ ceph_abort("can't allocate huge buffer;"
+ " /proc/sys/vm/nr_hugepages misconfigured?");
+ } else {
+ region_q.push(mmaped_region);
+ }
+ }
+ }
+ ~ExplicitHugePagePool() {
+ void* mmaped_region;
+ while (region_q.pop(mmaped_region)) {
+ ::munmap(mmaped_region, buffer_size);
+ }
+ }
+
+ ceph::unique_leakable_ptr<buffer::raw> try_create() {
+ if (void* mmaped_region; region_q.pop(mmaped_region)) {
+ return ceph::unique_leakable_ptr<buffer::raw> {
+ new mmaped_buffer_raw(mmaped_region, *this)
+ };
+ } else {
+ // oops, empty queue.
+ return nullptr;
+ }
+ }
+
+ size_t get_buffer_size() const {
+ return buffer_size;
+ }
+
+private:
+ const size_t buffer_size;
+ region_queue_t region_q;
+};
+
+struct HugePagePoolOfPools {
+ HugePagePoolOfPools(const std::map<size_t, size_t> conf)
+ : pools(conf.size(), [conf] (size_t index, auto emplacer) {
+ ceph_assert(index < conf.size());
+ // it could be replaced with a state-mutating lambda and
+ // `conf::erase()` but performance is not a concern here.
+ const auto [buffer_size, buffers_in_pool] =
+ *std::next(std::begin(conf), index);
+ emplacer.emplace(buffer_size, buffers_in_pool);
+ }) {
+ }
+
+ ceph::unique_leakable_ptr<buffer::raw> try_create(const size_t size) {
+ // thankfully to `conf` being a `std::map` we store the pools
+ // sorted by buffer sizes. this would allow to clamp to log(n)
+ // but I doubt admins want to have dozens of accelerated buffer
+ // size. let's keep this simple for now.
+ if (auto iter = std::find_if(std::begin(pools), std::end(pools),
+ [size] (const auto& pool) {
+ return size == pool.get_buffer_size();
+ });
+ iter != std::end(pools)) {
+ return iter->try_create();
+ }
+ return nullptr;
+ }
+
+ static HugePagePoolOfPools from_desc(const std::string& conf);
+
+private:
+ // let's have some space inside (for 2 MB and 4 MB perhaps?)
+ // NOTE: we need tiny_vector as the boost::lockfree queue inside
+ // pool is not-movable.
+ ceph::containers::tiny_vector<ExplicitHugePagePool, 2> pools;
+};
+
+
+HugePagePoolOfPools HugePagePoolOfPools::from_desc(const std::string& desc) {
+ std::map<size_t, size_t> conf; // buffer_size -> buffers_in_pool
+ std::map<std::string, std::string> exploded_str_conf;
+ get_str_map(desc, &exploded_str_conf);
+ for (const auto& [buffer_size_s, buffers_in_pool_s] : exploded_str_conf) {
+ size_t buffer_size, buffers_in_pool;
+ if (sscanf(buffer_size_s.c_str(), "%zu", &buffer_size) != 1) {
+ ceph_abort("can't parse a key in the configuration");
+ }
+ if (sscanf(buffers_in_pool_s.c_str(), "%zu", &buffers_in_pool) != 1) {
+ ceph_abort("can't parse a value in the configuration");
+ }
+ conf[buffer_size] = buffers_in_pool;
+ }
+ return HugePagePoolOfPools{std::move(conf)};
+}
+
+// create a buffer basing on user-configurable. it's intended to make
+// our buffers THP-able.
+ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned(
+ const size_t len,
+ IOContext* const ioc) const
+{
+ // just to preserve the logic of create_small_page_aligned().
+ if (len < CEPH_PAGE_SIZE) {
+ return ceph::buffer::create_small_page_aligned(len);
+ } else {
+ static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc(
+ cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
+ );
+ if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) {
+ dout(20) << __func__ << " allocated from huge pool"
+ << " lucky_raw.data=" << (void*)lucky_raw->get_data()
+ << " bdev_read_preallocated_huge_buffers="
+ << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
+ << dendl;
+ ioc->flags |= IOContext::FLAG_DONT_CACHE;
+ return lucky_raw;
+ } else {
+ // fallthrough due to empty buffer pool. this can happen also
+ // when the configurable was explicitly set to 0.
+ dout(20) << __func__ << " cannot allocate from huge pool"
+ << dendl;
+ }
+ }
+ const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment;
+ dout(20) << __func__ << " with the custom alignment;"
+ << " len=" << len
+ << " custom_alignment=" << custom_alignment
+ << dendl;
+ return ceph::buffer::create_aligned(len, custom_alignment);
+}
+
+int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " " << buffermode(buffered)
+ << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ _aio_log_start(ioc, off, len);
+
+ auto start1 = mono_clock::now();
+
+ auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc));
+ int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET),
+ p->c_str(), len, off);
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " " << buffermode(buffered)
+ << " since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ if (r < 0) {
+ if (ioc->allow_eio && is_expected_ioerr(-errno)) {
+ r = -EIO;
+ } else {
+ r = -errno;
+ }
+ derr << __func__ << " 0x" << std::hex << off << "~" << std::left
+ << std::dec << " error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+ ceph_assert((uint64_t)r == len);
+ pbl->push_back(std::move(p));
+
+ dout(40) << "data:\n";
+ pbl->hexdump(*_dout);
+ *_dout << dendl;
+
+ out:
+ _aio_log_finish(ioc, off, len);
+ return r < 0 ? r : 0;
+}
+
+int KernelDevice::aio_read(
+ uint64_t off,
+ uint64_t len,
+ bufferlist *pbl,
+ IOContext *ioc)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << dendl;
+
+ int r = 0;
+#ifdef HAVE_LIBAIO
+ if (aio && dio) {
+ ceph_assert(is_valid_io(off, len));
+ _aio_log_start(ioc, off, len);
+ ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
+ ++ioc->num_pending;
+ aio_t& aio = ioc->pending_aios.back();
+ aio.bl.push_back(
+ ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)));
+ aio.bl.prepare_iov(&aio.iov);
+ aio.preadv(off, len);
+ dout(30) << aio << dendl;
+ pbl->append(aio.bl);
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
+ << std::dec << " aio " << &aio << dendl;
+ } else
+#endif
+ {
+ r = read(off, len, pbl, ioc, false);
+ }
+
+ return r;
+}
+
+int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
+{
+ uint64_t aligned_off = p2align(off, block_size);
+ uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
+ bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
+ int r = 0;
+
+ auto start1 = mono_clock::now();
+ r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+ ceph_assert((uint64_t)r == aligned_len);
+ memcpy(buf, p.c_str() + (off - aligned_off), len);
+
+ dout(40) << __func__ << " data:\n";
+ bufferlist bl;
+ bl.append(buf, len);
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ out:
+ return r < 0 ? r : 0;
+}
+
+int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
+ bool buffered)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << "buffered " << buffered
+ << dendl;
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+ int r = 0;
+ auto age = cct->_conf->bdev_debug_aio_log_age;
+
+ //if it's direct io and unaligned, we have to use a internal buffer
+ if (!buffered && ((off % block_size != 0)
+ || (len % block_size != 0)
+ || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
+ return direct_read_unaligned(off, len, buf);
+
+ auto start1 = mono_clock::now();
+ if (buffered) {
+ //buffered read
+ auto off0 = off;
+ char *t = buf;
+ uint64_t left = len;
+ while (left > 0) {
+ r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " 0x" << std::hex << off << "~" << left
+ << std::dec << " error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+ off += r;
+ t += r;
+ left -= r;
+ }
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off0 << "~" << len << std::dec
+ << " (buffered) since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ } else {
+ //direct and aligned read
+ r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
+ if (mono_clock::now() - start1 >= make_timespan(age)) {
+ derr << __func__ << " stalled read "
+ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " (direct) since " << start1 << ", timeout is "
+ << age
+ << "s" << dendl;
+ }
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
+ << off << "~" << std::left << std::dec << " error: " << cpp_strerror(r)
+ << dendl;
+ goto out;
+ }
+ ceph_assert((uint64_t)r == len);
+ }
+
+ dout(40) << __func__ << " data:\n";
+ bufferlist bl;
+ bl.append(buf, len);
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ out:
+ return r < 0 ? r : 0;
+}
+
+int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+ dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << dendl;
+ ceph_assert(off % block_size == 0);
+ ceph_assert(len % block_size == 0);
+ int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
+ if (r) {
+ r = -r;
+ derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
+ << " error: " << cpp_strerror(r) << dendl;
+ }
+ return r;
+}
diff --git a/src/blk/kernel/KernelDevice.h b/src/blk/kernel/KernelDevice.h
new file mode 100644
index 000000000..e00e31f10
--- /dev/null
+++ b/src/blk/kernel/KernelDevice.h
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_KERNELDEVICE_H
+#define CEPH_BLK_KERNELDEVICE_H
+
+#include <atomic>
+
+#include "include/types.h"
+#include "include/interval_set.h"
+#include "common/Thread.h"
+#include "include/utime.h"
+
+#include "aio/aio.h"
+#include "BlockDevice.h"
+#include "extblkdev/ExtBlkDevPlugin.h"
+
+#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK)
+
+class KernelDevice : public BlockDevice {
+protected:
+ std::string path;
+private:
+ std::vector<int> fd_directs, fd_buffereds;
+ bool enable_wrt = true;
+ bool aio, dio;
+
+ ExtBlkDevInterfaceRef ebd_impl; // structure for retrieving compression state from extended block device
+
+ std::string devname; ///< kernel dev name (/sys/block/$devname), if any
+
+ ceph::mutex debug_lock = ceph::make_mutex("KernelDevice::debug_lock");
+ interval_set<uint64_t> debug_inflight;
+
+ std::atomic<bool> io_since_flush = {false};
+ ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex");
+
+ std::unique_ptr<io_queue_t> io_queue;
+ aio_callback_t discard_callback;
+ void *discard_callback_priv;
+ bool aio_stop;
+ bool discard_started;
+ bool discard_stop;
+
+ ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
+ ceph::condition_variable discard_cond;
+ bool discard_running = false;
+ interval_set<uint64_t> discard_queued;
+ interval_set<uint64_t> discard_finishing;
+
+ struct AioCompletionThread : public Thread {
+ KernelDevice *bdev;
+ explicit AioCompletionThread(KernelDevice *b) : bdev(b) {}
+ void *entry() override {
+ bdev->_aio_thread();
+ return NULL;
+ }
+ } aio_thread;
+
+ struct DiscardThread : public Thread {
+ KernelDevice *bdev;
+ explicit DiscardThread(KernelDevice *b) : bdev(b) {}
+ void *entry() override {
+ bdev->_discard_thread();
+ return NULL;
+ }
+ } discard_thread;
+
+ std::atomic_int injecting_crash;
+
+ virtual int _post_open() { return 0; } // hook for child implementations
+ virtual void _pre_close() { } // hook for child implementations
+
+ void _aio_thread();
+ void _discard_thread();
+ int _queue_discard(interval_set<uint64_t> &to_release);
+ bool try_discard(interval_set<uint64_t> &to_release, bool async = true) override;
+
+ int _aio_start();
+ void _aio_stop();
+
+ void _discard_start();
+ void _discard_stop();
+
+ void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
+ void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
+
+ int _sync_write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint);
+
+ int _lock();
+
+ int direct_read_unaligned(uint64_t off, uint64_t len, char *buf);
+
+ // stalled aio debugging
+ aio_list_t debug_queue;
+ ceph::mutex debug_queue_lock = ceph::make_mutex("KernelDevice::debug_queue_lock");
+ aio_t *debug_oldest = nullptr;
+ utime_t debug_stall_since;
+ void debug_aio_link(aio_t& aio);
+ void debug_aio_unlink(aio_t& aio);
+
+ int choose_fd(bool buffered, int write_hint) const;
+
+ ceph::unique_leakable_ptr<buffer::raw> create_custom_aligned(size_t len, IOContext* ioc) const;
+
+public:
+ KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv);
+
+ void aio_submit(IOContext *ioc) override;
+ void discard_drain() override;
+
+ int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
+ int get_devname(std::string *s) const override {
+ if (devname.empty()) {
+ return -ENOENT;
+ }
+ *s = devname;
+ return 0;
+ }
+ int get_devices(std::set<std::string> *ls) const override;
+
+ int get_ebd_state(ExtBlkDevState &state) const override;
+
+ int read(uint64_t off, uint64_t len, ceph::buffer::list *pbl,
+ IOContext *ioc,
+ bool buffered) override;
+ int aio_read(uint64_t off, uint64_t len, ceph::buffer::list *pbl,
+ IOContext *ioc) override;
+ int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+
+ int write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+ int aio_write(uint64_t off, ceph::buffer::list& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) override;
+ int flush() override;
+ int _discard(uint64_t offset, uint64_t len);
+
+ // for managing buffered readers/writers
+ int invalidate_cache(uint64_t off, uint64_t len) override;
+ int open(const std::string& path) override;
+ void close() override;
+};
+
+#endif
diff --git a/src/blk/kernel/io_uring.cc b/src/blk/kernel/io_uring.cc
new file mode 100644
index 000000000..5e7fd1227
--- /dev/null
+++ b/src/blk/kernel/io_uring.cc
@@ -0,0 +1,264 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "io_uring.h"
+
+#if defined(HAVE_LIBURING)
+
+#include "liburing.h"
+#include <sys/epoll.h>
+
+using std::list;
+using std::make_unique;
+
+struct ioring_data {
+ struct io_uring io_uring;
+ pthread_mutex_t cq_mutex;
+ pthread_mutex_t sq_mutex;
+ int epoll_fd = -1;
+ std::map<int, int> fixed_fds_map;
+};
+
+static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
+ struct aio_t **paio)
+{
+ struct io_uring *ring = &d->io_uring;
+ struct io_uring_cqe *cqe;
+
+ unsigned nr = 0;
+ unsigned head;
+ io_uring_for_each_cqe(ring, head, cqe) {
+ struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);
+ io->rval = cqe->res;
+
+ paio[nr++] = io;
+
+ if (nr == max)
+ break;
+ }
+ io_uring_cq_advance(ring, nr);
+
+ return nr;
+}
+
+static int find_fixed_fd(struct ioring_data *d, int real_fd)
+{
+ auto it = d->fixed_fds_map.find(real_fd);
+ if (it == d->fixed_fds_map.end())
+ return -1;
+
+ return it->second;
+}
+
+static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
+ struct aio_t *io)
+{
+ int fixed_fd = find_fixed_fd(d, io->fd);
+
+ ceph_assert(fixed_fd != -1);
+
+ if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV)
+ io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
+ io->iov.size(), io->offset);
+ else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV)
+ io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
+ io->iov.size(), io->offset);
+ else
+ ceph_assert(0);
+
+ io_uring_sqe_set_data(sqe, io);
+ io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
+}
+
+static int ioring_queue(struct ioring_data *d, void *priv,
+ list<aio_t>::iterator beg, list<aio_t>::iterator end)
+{
+ struct io_uring *ring = &d->io_uring;
+ struct aio_t *io = nullptr;
+
+ ceph_assert(beg != end);
+
+ do {
+ struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+ if (!sqe)
+ break;
+
+ io = &*beg;
+ io->priv = priv;
+
+ init_sqe(d, sqe, io);
+
+ } while (++beg != end);
+
+ if (!io)
+ /* Queue is full, go and reap something first */
+ return 0;
+
+ return io_uring_submit(ring);
+}
+
+static void build_fixed_fds_map(struct ioring_data *d,
+ std::vector<int> &fds)
+{
+ int fixed_fd = 0;
+ for (int real_fd : fds) {
+ d->fixed_fds_map[real_fd] = fixed_fd++;
+ }
+}
+
+ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) :
+ d(make_unique<ioring_data>()),
+ iodepth(iodepth_),
+ hipri(hipri_),
+ sq_thread(sq_thread_)
+{
+}
+
+ioring_queue_t::~ioring_queue_t()
+{
+}
+
+int ioring_queue_t::init(std::vector<int> &fds)
+{
+ unsigned flags = 0;
+
+ pthread_mutex_init(&d->cq_mutex, NULL);
+ pthread_mutex_init(&d->sq_mutex, NULL);
+
+ if (hipri)
+ flags |= IORING_SETUP_IOPOLL;
+ if (sq_thread)
+ flags |= IORING_SETUP_SQPOLL;
+
+ int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
+ if (ret < 0)
+ return ret;
+
+ ret = io_uring_register_files(&d->io_uring,
+ &fds[0], fds.size());
+ if (ret < 0) {
+ ret = -errno;
+ goto close_ring_fd;
+ }
+
+ build_fixed_fds_map(d.get(), fds);
+
+ d->epoll_fd = epoll_create1(0);
+ if (d->epoll_fd < 0) {
+ ret = -errno;
+ goto close_ring_fd;
+ }
+
+ struct epoll_event ev;
+ ev.events = EPOLLIN;
+ ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
+ if (ret < 0) {
+ ret = -errno;
+ goto close_epoll_fd;
+ }
+
+ return 0;
+
+close_epoll_fd:
+ close(d->epoll_fd);
+close_ring_fd:
+ io_uring_queue_exit(&d->io_uring);
+
+ return ret;
+}
+
+void ioring_queue_t::shutdown()
+{
+ d->fixed_fds_map.clear();
+ close(d->epoll_fd);
+ d->epoll_fd = -1;
+ io_uring_queue_exit(&d->io_uring);
+}
+
+int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
+ uint16_t aios_size, void *priv,
+ int *retries)
+{
+ (void)aios_size;
+ (void)retries;
+
+ pthread_mutex_lock(&d->sq_mutex);
+ int rc = ioring_queue(d.get(), priv, beg, end);
+ pthread_mutex_unlock(&d->sq_mutex);
+
+ return rc;
+}
+
+int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+get_cqe:
+ pthread_mutex_lock(&d->cq_mutex);
+ int events = ioring_get_cqe(d.get(), max, paio);
+ pthread_mutex_unlock(&d->cq_mutex);
+
+ if (events == 0) {
+ struct epoll_event ev;
+ int ret = TEMP_FAILURE_RETRY(epoll_wait(d->epoll_fd, &ev, 1, timeout_ms));
+ if (ret < 0)
+ events = -errno;
+ else if (ret > 0)
+ /* Time to reap */
+ goto get_cqe;
+ }
+
+ return events;
+}
+
+bool ioring_queue_t::supported()
+{
+ struct io_uring ring;
+ int ret = io_uring_queue_init(16, &ring, 0);
+ if (ret) {
+ return false;
+ }
+ io_uring_queue_exit(&ring);
+ return true;
+}
+
+#else // #if defined(HAVE_LIBURING)
+
+struct ioring_data {};
+
+ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_)
+{
+ ceph_assert(0);
+}
+
+ioring_queue_t::~ioring_queue_t()
+{
+ ceph_assert(0);
+}
+
+int ioring_queue_t::init(std::vector<int> &fds)
+{
+ ceph_assert(0);
+}
+
+void ioring_queue_t::shutdown()
+{
+ ceph_assert(0);
+}
+
+int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
+ uint16_t aios_size, void *priv,
+ int *retries)
+{
+ ceph_assert(0);
+}
+
+int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+ ceph_assert(0);
+}
+
+bool ioring_queue_t::supported()
+{
+ return false;
+}
+
+#endif // #if defined(HAVE_LIBURING)
diff --git a/src/blk/kernel/io_uring.h b/src/blk/kernel/io_uring.h
new file mode 100644
index 000000000..e7d0acde0
--- /dev/null
+++ b/src/blk/kernel/io_uring.h
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#include "include/types.h"
+#include "aio/aio.h"
+
+struct ioring_data;
+
+struct ioring_queue_t final : public io_queue_t {
+ std::unique_ptr<ioring_data> d;
+ unsigned iodepth = 0;
+ bool hipri = false;
+ bool sq_thread = false;
+
+ typedef std::list<aio_t>::iterator aio_iter;
+
+ // Returns true if arch is x86-64 and kernel supports io_uring
+ static bool supported();
+
+ ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_);
+ ~ioring_queue_t() final;
+
+ int init(std::vector<int> &fds) final;
+ void shutdown() final;
+
+ int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) final;
+ int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
+};
diff --git a/src/blk/pmem/PMEMDevice.cc b/src/blk/pmem/PMEMDevice.cc
new file mode 100644
index 000000000..728c79a19
--- /dev/null
+++ b/src/blk/pmem/PMEMDevice.cc
@@ -0,0 +1,378 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Intel <jianpeng.ma@intel.com>
+ *
+ * Author: Jianpeng Ma <jianpeng.ma@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/sysmacros.h>
+#include <stdio.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <filesystem>
+#include <fstream>
+
+#include <fmt/format.h>
+
+#include "PMEMDevice.h"
+#include "libpmem.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/blkdev.h"
+
+#if defined(HAVE_LIBDML)
+#include <dml/dml.hpp>
+using execution_path = dml::automatic;
+#endif
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev-PMEM(" << path << ") "
+
+PMEMDevice::PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ fd(-1), addr(0),
+ injecting_crash(0)
+{
+}
+
+int PMEMDevice::_lock()
+{
+ struct flock l;
+ memset(&l, 0, sizeof(l));
+ l.l_type = F_WRLCK;
+ l.l_whence = SEEK_SET;
+ l.l_start = 0;
+ l.l_len = 0;
+ int r = ::fcntl(fd, F_SETLK, &l);
+ if (r < 0)
+ return -errno;
+ return 0;
+}
+
+static int pmem_check_file_type(int fd, const char *pmem_file, uint64_t *total_size)
+{
+ namespace fs = std::filesystem;
+ if (!fs::is_character_file(pmem_file)) {
+ return -EINVAL;
+ }
+ struct stat file_stat;
+ if (::fstat(fd, &file_stat)) {
+ return -EINVAL;
+ }
+ fs::path char_dir = fmt::format("/sys/dev/char/{}:{}",
+ major(file_stat.st_rdev),
+ minor(file_stat.st_rdev));
+ // Need to check if it is a DAX device
+ if (auto subsys_path = char_dir / "subsystem";
+ fs::read_symlink(subsys_path).filename().string() != "dax") {
+ return -EINVAL;
+ }
+ if (total_size == nullptr) {
+ return 0;
+ }
+ if (std::ifstream size_file(char_dir / "size"); size_file) {
+ size_file >> *total_size;
+ return size_file ? 0 : -EINVAL;
+ } else {
+ return -EINVAL;
+ }
+}
+
+int PMEMDevice::open(const std::string& p)
+{
+ path = p;
+ int r = 0;
+ dout(1) << __func__ << " path " << path << dendl;
+
+ fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
+ if (fd < 0) {
+ r = -errno;
+ derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = pmem_check_file_type(fd, path.c_str(), &size);
+ if (!r) {
+ dout(1) << __func__ << " This path " << path << " is a devdax dev " << dendl;
+ devdax_device = true;
+ // If using devdax char device, set it to not rotational device.
+ rotational = false;
+ }
+
+ r = _lock();
+ if (r < 0) {
+ derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
+ << dendl;
+ goto out_fail;
+ }
+
+ struct stat st;
+ r = ::fstat(fd, &st);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
+ goto out_fail;
+ }
+
+ size_t map_len;
+ addr = (char *)pmem_map_file(path.c_str(), 0,
+ devdax_device ? 0: PMEM_FILE_EXCL, O_RDWR,
+ &map_len, NULL);
+ if (addr == NULL) {
+ derr << __func__ << " pmem_map_file failed: " << pmem_errormsg() << dendl;
+ goto out_fail;
+ }
+ size = map_len;
+
+ // Operate as though the block size is 4 KB. The backing file
+ // blksize doesn't strictly matter except that some file systems may
+ // require a read/modify/write if we write something smaller than
+ // it.
+ block_size = g_conf()->bdev_block_size;
+ if (block_size != (unsigned)st.st_blksize) {
+ dout(1) << __func__ << " backing device/file reports st_blksize "
+ << st.st_blksize << ", using bdev_block_size "
+ << block_size << " anyway" << dendl;
+ }
+
+ dout(1) << __func__
+ << " size " << size
+ << " (" << byte_u_t(size) << ")"
+ << " block_size " << block_size
+ << " (" << byte_u_t(block_size) << ")"
+ << dendl;
+ return 0;
+
+ out_fail:
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ fd = -1;
+ return r;
+}
+
+void PMEMDevice::close()
+{
+ dout(1) << __func__ << dendl;
+
+ ceph_assert(addr != NULL);
+ if (devdax_device) {
+ devdax_device = false;
+ }
+ pmem_unmap(addr, size);
+
+ ceph_assert(fd >= 0);
+ VOID_TEMP_FAILURE_RETRY(::close(fd));
+ fd = -1;
+
+ path.clear();
+}
+
+int PMEMDevice::collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const
+{
+ (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
+ (*pm)[prefix + "size"] = stringify(get_size());
+ (*pm)[prefix + "block_size"] = stringify(get_block_size());
+ (*pm)[prefix + "driver"] = "PMEMDevice";
+ (*pm)[prefix + "type"] = "ssd";
+
+ struct stat st;
+ int r = ::fstat(fd, &st);
+ if (r < 0)
+ return -errno;
+ if (S_ISBLK(st.st_mode)) {
+ (*pm)[prefix + "access_mode"] = "blk";
+ char buffer[1024] = {0};
+ BlkDev blkdev(fd);
+
+ blkdev.model(buffer, sizeof(buffer));
+ (*pm)[prefix + "model"] = buffer;
+
+ buffer[0] = '\0';
+ blkdev.dev(buffer, sizeof(buffer));
+ (*pm)[prefix + "dev"] = buffer;
+
+ // nvme exposes a serial number
+ buffer[0] = '\0';
+ blkdev.serial(buffer, sizeof(buffer));
+ (*pm)[prefix + "serial"] = buffer;
+
+ } else if (S_ISCHR(st.st_mode)) {
+ (*pm)[prefix + "access_mode"] = "chardevice";
+ (*pm)[prefix + "path"] = path;
+
+ } else {
+ (*pm)[prefix + "access_mode"] = "file";
+ (*pm)[prefix + "path"] = path;
+ }
+ return 0;
+}
+
+bool PMEMDevice::support(const std::string &path)
+{
+ int is_pmem = 0;
+ size_t map_len = 0;
+ int r = 0;
+ int local_fd;
+
+ local_fd = ::open(path.c_str(), O_RDWR);
+ if (local_fd < 0) {
+ return false;
+ }
+
+ r = pmem_check_file_type(local_fd, path.c_str(), NULL);
+ VOID_TEMP_FAILURE_RETRY(::close(local_fd));
+ int flags = PMEM_FILE_EXCL;
+ if (r == 0) {
+ flags = 0;
+ }
+
+ void *addr = pmem_map_file(path.c_str(), 0, flags, O_RDONLY, &map_len, &is_pmem);
+ if (addr != NULL) {
+ pmem_unmap(addr, map_len);
+ if (is_pmem) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+int PMEMDevice::flush()
+{
+ //Because all write is persist. So no need
+ return 0;
+}
+
+
+void PMEMDevice::aio_submit(IOContext *ioc)
+{
+ if (ioc->priv) {
+ ceph_assert(ioc->num_running == 0);
+ aio_callback(aio_callback_priv, ioc->priv);
+ } else {
+ ioc->try_aio_wake();
+ }
+ return;
+}
+
+int PMEMDevice::write(uint64_t off, bufferlist& bl, bool buffered, int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ dout(40) << "data:\n";
+ bl.hexdump(*_dout);
+ *_dout << dendl;
+
+ if (g_conf()->bdev_inject_crash &&
+ rand() % g_conf()->bdev_inject_crash == 0) {
+ derr << __func__ << " bdev_inject_crash: dropping io " << off << "~" << len
+ << dendl;
+ ++injecting_crash;
+ return 0;
+ }
+
+ bufferlist::iterator p = bl.begin();
+ uint64_t off1 = off;
+ while (len) {
+ const char *data;
+ uint32_t l = p.get_ptr_and_advance(len, &data);
+
+#if defined(HAVE_LIBDML)
+ // Take care of the persistency issue
+ auto result = dml::execute<execution_path>(dml::mem_move, dml::make_view(data, l), dml::make_view(addr + off1, l));
+ ceph_assert(result.status == dml::status_code::ok);
+#else
+ pmem_memcpy_persist(addr + off1, data, l);
+#endif
+ len -= l;
+ off1 += l;
+ }
+ return 0;
+}
+
+int PMEMDevice::aio_write(
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint)
+{
+ return write(off, bl, buffered);
+}
+
+
+int PMEMDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered)
+{
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ bufferptr p = buffer::create_small_page_aligned(len);
+
+#if defined(HAVE_LIBDML)
+ auto result = dml::execute<execution_path>(dml::mem_move, dml::make_view(addr + off, len), dml::make_view(p.c_str(), len));
+ ceph_assert(result.status == dml::status_code::ok);
+#else
+ memcpy(p.c_str(), addr + off, len);
+#endif
+
+ pbl->clear();
+ pbl->push_back(std::move(p));
+
+ dout(40) << "data:\n";
+ pbl->hexdump(*_dout);
+ *_dout << dendl;
+
+ return 0;
+}
+
+int PMEMDevice::aio_read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc)
+{
+ return read(off, len, pbl, ioc, false);
+}
+
+int PMEMDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+
+#if defined(HAVE_LIBDML)
+ auto result = dml::execute<execution_path>(dml::mem_move, dml::make_view(addr + off, len), dml::make_view(buf, len));
+ ceph_assert(result.status == dml::status_code::ok);
+#else
+ memcpy(buf, addr + off, len);
+#endif
+ return 0;
+}
+
+
+int PMEMDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ return 0;
+}
+
+
diff --git a/src/blk/pmem/PMEMDevice.h b/src/blk/pmem/PMEMDevice.h
new file mode 100644
index 000000000..af42eb656
--- /dev/null
+++ b/src/blk/pmem/PMEMDevice.h
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Intel <jianpeng.ma@intel.com>
+ *
+ * Author: Jianpeng Ma <jianpeng.ma@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_PMEMDEVICE_H
+#define CEPH_BLK_PMEMDEVICE_H
+
+#include <atomic>
+#include <map>
+#include <string>
+
+#include "os/fs/FS.h"
+#include "include/interval_set.h"
+#include "aio/aio.h"
+#include "BlockDevice.h"
+
+class PMEMDevice : public BlockDevice {
+ int fd;
+ char *addr; //the address of mmap
+ std::string path;
+ bool devdax_device = false;
+
+ ceph::mutex debug_lock = ceph::make_mutex("PMEMDevice::debug_lock");
+ interval_set<uint64_t> debug_inflight;
+
+ std::atomic_int injecting_crash;
+ int _lock();
+
+public:
+ PMEMDevice(CephContext *cct, aio_callback_t cb, void *cbpriv);
+
+ bool supported_bdev_label() override { return !devdax_device; }
+ void aio_submit(IOContext *ioc) override;
+
+ int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
+
+ static bool support(const std::string& path);
+
+ int read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered) override;
+ int aio_read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc) override;
+
+ int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+ int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+ int aio_write(uint64_t off, bufferlist& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) override;
+ int flush() override;
+
+ // for managing buffered readers/writers
+ int invalidate_cache(uint64_t off, uint64_t len) override;
+ int open(const std::string &path) override;
+ void close() override;
+
+private:
+ bool is_valid_io(uint64_t off, uint64_t len) const {
+ return (len > 0 &&
+ off < size &&
+ off + len <= size);
+ }
+};
+
+#endif
diff --git a/src/blk/spdk/NVMEDevice.cc b/src/blk/spdk/NVMEDevice.cc
new file mode 100644
index 000000000..4461f6a07
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.cc
@@ -0,0 +1,992 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+//
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <fstream>
+#include <functional>
+#include <map>
+#include <thread>
+#include <boost/intrusive/slist.hpp>
+
+#include <spdk/nvme.h>
+
+#include "include/intarith.h"
+#include "include/stringify.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/perf_counters.h"
+
+#include "NVMEDevice.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << sn << ") "
+
+using namespace std;
+
+static constexpr uint16_t data_buffer_default_num = 1024;
+
+static constexpr uint32_t data_buffer_size = 8192;
+
+static constexpr uint16_t inline_segment_num = 32;
+
+/* Default to 10 seconds for the keep alive value. This value is arbitrary. */
+static constexpr uint32_t nvme_ctrlr_keep_alive_timeout_in_ms = 10000;
+
+static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
+
+struct IORequest {
+ uint16_t cur_seg_idx = 0;
+ uint16_t nseg;
+ uint32_t cur_seg_left = 0;
+ void *inline_segs[inline_segment_num];
+ void **extra_segs = nullptr;
+};
+
+namespace bi = boost::intrusive;
+struct data_cache_buf : public bi::slist_base_hook<bi::link_mode<bi::normal_link>>
+{};
+
+struct Task;
+
+class SharedDriverData {
+ unsigned id;
+ spdk_nvme_transport_id trid;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ uint32_t block_size = 0;
+ uint64_t size = 0;
+ std::thread admin_thread;
+
+ public:
+ std::vector<NVMEDevice*> registered_devices;
+ friend class SharedDriverQueueData;
+ SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
+ : id(id_),
+ trid(trid_),
+ ctrlr(c),
+ ns(ns_) {
+ block_size = spdk_nvme_ns_get_extended_sector_size(ns);
+ size = spdk_nvme_ns_get_size(ns);
+ if (trid.trtype == SPDK_NVME_TRANSPORT_PCIE) {
+ return;
+ }
+
+ // For Non-PCIe transport, we need to send keep-alive periodically.
+ admin_thread = std::thread(
+ [this]() {
+ int rc;
+ while (true) {
+ rc = spdk_nvme_ctrlr_process_admin_completions(ctrlr);
+ ceph_assert(rc >= 0);
+ sleep(1);
+ }
+ }
+ );
+ }
+
+ bool is_equal(const spdk_nvme_transport_id& trid2) const {
+ return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
+ }
+ ~SharedDriverData() {
+ if (admin_thread.joinable()) {
+ admin_thread.join();
+ }
+ }
+
+ void register_device(NVMEDevice *device) {
+ registered_devices.push_back(device);
+ }
+
+ void remove_device(NVMEDevice *device) {
+ std::vector<NVMEDevice*> new_devices;
+ for (auto &&it : registered_devices) {
+ if (it != device)
+ new_devices.push_back(it);
+ }
+ registered_devices.swap(new_devices);
+ }
+
+ uint32_t get_block_size() {
+ return block_size;
+ }
+ uint64_t get_size() {
+ return size;
+ }
+};
+
+class SharedDriverQueueData {
+ NVMEDevice *bdev;
+ SharedDriverData *driver;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ std::string sn;
+ uint32_t block_size;
+ uint32_t max_queue_depth;
+ struct spdk_nvme_qpair *qpair;
+ int alloc_buf_from_pool(Task *t, bool write);
+
+ public:
+ uint32_t current_queue_depth = 0;
+ std::atomic_ulong completed_op_seq, queue_op_seq;
+ bi::slist<data_cache_buf, bi::constant_time_size<true>> data_buf_list;
+ void _aio_handle(Task *t, IOContext *ioc);
+
+ SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+ : bdev(bdev),
+ driver(driver) {
+ ctrlr = driver->ctrlr;
+ ns = driver->ns;
+ block_size = driver->block_size;
+
+ struct spdk_nvme_io_qpair_opts opts = {};
+ spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
+ opts.qprio = SPDK_NVME_QPRIO_URGENT;
+ // usable queue depth should minus 1 to avoid overflow.
+ max_queue_depth = opts.io_queue_size - 1;
+ qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
+ ceph_assert(qpair != NULL);
+
+ // allocate spdk dma memory
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+ if (!b) {
+ derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+ ceph_assert(b);
+ }
+ data_buf_list.push_front(*reinterpret_cast<data_cache_buf *>(b));
+ }
+ }
+
+ ~SharedDriverQueueData() {
+ if (qpair) {
+ spdk_nvme_ctrlr_free_io_qpair(qpair);
+ }
+
+ data_buf_list.clear_and_dispose(spdk_dma_free);
+ }
+};
+
+struct Task {
+ NVMEDevice *device;
+ IOContext *ctx = nullptr;
+ IOCommand command;
+ uint64_t offset;
+ uint64_t len;
+ bufferlist bl;
+ std::function<void()> fill_cb;
+ Task *next = nullptr;
+ int64_t return_code;
+ Task *primary = nullptr;
+ IORequest io_request = {};
+ SharedDriverQueueData *queue = nullptr;
+ // reference count by subtasks.
+ int ref = 0;
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+ Task *p = nullptr)
+ : device(dev), command(c), offset(off), len(l),
+ return_code(rc), primary(p) {
+ if (primary) {
+ primary->ref++;
+ return_code = primary->return_code;
+ }
+ }
+ ~Task() {
+ if (primary)
+ primary->ref--;
+ ceph_assert(!io_request.nseg);
+ }
+ void release_segs(SharedDriverQueueData *queue_data) {
+ if (io_request.extra_segs) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.extra_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ delete io_request.extra_segs;
+ } else if (io_request.nseg) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.inline_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ }
+ ctx->total_nseg -= io_request.nseg;
+ io_request.nseg = 0;
+ }
+
+ void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+ uint64_t copied = 0;
+ uint64_t left = len;
+ void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+ uint16_t i = 0;
+ while (left > 0) {
+ char *src = static_cast<char*>(segs[i++]);
+ uint64_t need_copy = std::min(left, data_buffer_size-off);
+ memcpy(buf+copied, src+off, need_copy);
+ off = 0;
+ left -= need_copy;
+ copied += need_copy;
+ }
+ }
+};
+
+static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
+{
+ Task *t = static_cast<Task*>(cb_arg);
+ uint32_t i = sgl_offset / data_buffer_size;
+ uint32_t offset = i * data_buffer_size;
+ ceph_assert(i <= t->io_request.nseg);
+
+ for (; i < t->io_request.nseg; i++) {
+ offset += data_buffer_size;
+ if (offset > sgl_offset) {
+ if (offset > t->len)
+ offset = t->len;
+ break;
+ }
+ }
+
+ t->io_request.cur_seg_idx = i;
+ t->io_request.cur_seg_left = offset - sgl_offset;
+ return ;
+}
+
+static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
+{
+ uint32_t size;
+ void *addr;
+ Task *t = static_cast<Task*>(cb_arg);
+ if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
+ *length = 0;
+ *address = 0;
+ return 0;
+ }
+
+ addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];
+
+ size = data_buffer_size;
+ if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
+ uint64_t tail = t->len % data_buffer_size;
+ if (tail) {
+ size = (uint32_t) tail;
+ }
+ }
+
+ if (t->io_request.cur_seg_left) {
+ *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
+ *length = t->io_request.cur_seg_left;
+ t->io_request.cur_seg_left = 0;
+ } else {
+ *address = addr;
+ *length = size;
+ }
+
+ t->io_request.cur_seg_idx++;
+ return 0;
+}
+
+int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
+{
+ uint64_t count = t->len / data_buffer_size;
+ if (t->len % data_buffer_size)
+ ++count;
+ void **segs;
+ if (count > data_buf_list.size())
+ return -ENOMEM;
+ if (count <= inline_segment_num) {
+ segs = t->io_request.inline_segs;
+ } else {
+ t->io_request.extra_segs = new void*[count];
+ segs = t->io_request.extra_segs;
+ }
+ for (uint16_t i = 0; i < count; i++) {
+ ceph_assert(!data_buf_list.empty());
+ segs[i] = &data_buf_list.front();
+ ceph_assert(segs[i] != nullptr);
+ data_buf_list.pop_front();
+ }
+ t->io_request.nseg = count;
+ t->ctx->total_nseg += count;
+ if (write) {
+ auto blp = t->bl.begin();
+ uint32_t len = 0;
+ uint16_t i = 0;
+ for (; i < count - 1; ++i) {
+ blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
+ len += data_buffer_size;
+ }
+ blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
+ }
+
+ return 0;
+}
+
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
+{
+ dout(20) << __func__ << " start" << dendl;
+
+ int r = 0;
+ uint64_t lba_off, lba_count;
+ uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
+ uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");
+
+ while (ioc->num_running) {
+ again:
+ dout(40) << __func__ << " polling" << dendl;
+ if (current_queue_depth) {
+ r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
+ if (r < 0) {
+ ceph_abort();
+ } else if (r == 0) {
+ usleep(io_sleep_in_us);
+ }
+ }
+
+ for (; t; t = t->next) {
+ if (current_queue_depth == max_queue_depth) {
+ // no slots
+ goto again;
+ }
+
+ t->queue = this;
+ lba_off = t->offset / block_size;
+ lba_count = t->len / block_size;
+ switch (t->command) {
+ case IOCommand::WRITE_COMMAND:
+ {
+ dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
+ r = alloc_buf_from_pool(t, true);
+ if (r < 0) {
+ goto again;
+ }
+
+ r = spdk_nvme_ns_cmd_writev(
+ ns, qpair, lba_off, lba_count, io_complete, t, 0,
+ data_buf_reset_sgl, data_buf_next_sge);
+ if (r < 0) {
+ derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
+ t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ case IOCommand::READ_COMMAND:
+ {
+ dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
+ r = alloc_buf_from_pool(t, false);
+ if (r < 0) {
+ goto again;
+ }
+
+ r = spdk_nvme_ns_cmd_readv(
+ ns, qpair, lba_off, lba_count, io_complete, t, 0,
+ data_buf_reset_sgl, data_buf_next_sge);
+ if (r < 0) {
+ derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ case IOCommand::FLUSH_COMMAND:
+ {
+ dout(20) << __func__ << " flush command issueed " << dendl;
+ r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
+ if (r < 0) {
+ derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ }
+ current_queue_depth++;
+ }
+ }
+
+ dout(20) << __func__ << " end" << dendl;
+}
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+class NVMEManager {
+ public:
+ struct ProbeContext {
+ spdk_nvme_transport_id trid;
+ NVMEManager *manager;
+ SharedDriverData *driver;
+ bool done;
+ };
+
+ private:
+ ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
+ bool stopping = false;
+ std::vector<SharedDriverData*> shared_driver_datas;
+ std::thread dpdk_thread;
+ ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
+ ceph::condition_variable probe_queue_cond;
+ std::list<ProbeContext*> probe_queue;
+
+ public:
+ NVMEManager() {}
+ ~NVMEManager() {
+ if (!dpdk_thread.joinable())
+ return;
+ {
+ std::lock_guard guard(probe_queue_lock);
+ stopping = true;
+ probe_queue_cond.notify_all();
+ }
+ dpdk_thread.join();
+ }
+
+ int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
+ void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+ spdk_nvme_ns *ns;
+ int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
+ ceph_assert(num_ns >= 1);
+ if (num_ns > 1) {
+ dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
+ }
+ ns = spdk_nvme_ctrlr_get_ns(c, 1);
+ if (!ns) {
+ derr << __func__ << " failed to get namespace at 1" << dendl;
+ ceph_abort();
+ }
+ dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;
+
+ // only support one device per osd now!
+ ceph_assert(shared_driver_datas.empty());
+ // index 0 is occurred by master thread
+ shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
+ *driver = shared_driver_datas.back();
+ }
+};
+
+static NVMEManager manager;
+
+static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
+{
+ NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+ bool do_attach = false;
+
+ if (trid->trtype == SPDK_NVME_TRANSPORT_PCIE) {
+ do_attach = spdk_nvme_transport_id_compare(&ctx->trid, trid) == 0;
+ if (!do_attach) {
+ dout(0) << __func__ << " device traddr (" << ctx->trid.traddr
+ << ") not match " << trid->traddr << dendl;
+ }
+ } else {
+ // for non-pcie devices, should always match the specified trid
+ assert(!spdk_nvme_transport_id_compare(&ctx->trid, trid));
+ do_attach = true;
+ }
+
+ if (do_attach) {
+ dout(0) << __func__ << " found device at: "
+ << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
+ << "traddr=" << trid->traddr << dendl;
+
+ opts->io_queue_size = UINT16_MAX;
+ opts->io_queue_requests = UINT16_MAX;
+ opts->keep_alive_timeout_ms = nvme_ctrlr_keep_alive_timeout_in_ms;
+ }
+
+ return do_attach;
+}
+
+static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
+ struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
+{
+ auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+ ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
+}
+
+static int hex2dec(unsigned char c)
+{
+ if (isdigit(c))
+ return c - '0';
+ else if (isupper(c))
+ return c - 'A' + 10;
+ else
+ return c - 'a' + 10;
+}
+
+static int find_first_bitset(const string& s)
+{
+ auto e = s.rend();
+ if (s.compare(0, 2, "0x") == 0 ||
+ s.compare(0, 2, "0X") == 0) {
+ advance(e, -2);
+ }
+ auto p = s.rbegin();
+ for (int pos = 0; p != e; ++p, pos += 4) {
+ if (!isxdigit(*p)) {
+ return -EINVAL;
+ }
+ if (int val = hex2dec(*p); val != 0) {
+ return pos + ffs(val);
+ }
+ }
+ return 0;
+}
+
+int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
+{
+ std::lock_guard l(lock);
+ for (auto &&it : shared_driver_datas) {
+ if (it->is_equal(trid)) {
+ *driver = it;
+ return 0;
+ }
+ }
+
+ auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
+ int m_core_arg = find_first_bitset(coremask_arg);
+ // at least one core is needed for using spdk
+ if (m_core_arg <= 0) {
+ derr << __func__ << " invalid bluestore_spdk_coremask, "
+ << "at least one core is needed" << dendl;
+ return -ENOENT;
+ }
+ m_core_arg -= 1;
+
+ uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");
+
+ if (!dpdk_thread.joinable()) {
+ dpdk_thread = std::thread(
+ [this, coremask_arg, m_core_arg, mem_size_arg, trid]() {
+ struct spdk_env_opts opts;
+ struct spdk_pci_addr addr;
+ int r;
+
+ bool local_pci_device = false;
+ int rc = spdk_pci_addr_parse(&addr, trid.traddr);
+ if (!rc) {
+ local_pci_device = true;
+ opts.pci_whitelist = &addr;
+ opts.num_pci_addr = 1;
+ }
+
+ spdk_env_opts_init(&opts);
+ opts.name = "nvme-device-manager";
+ opts.core_mask = coremask_arg.c_str();
+ opts.master_core = m_core_arg;
+ opts.mem_size = mem_size_arg;
+ spdk_env_init(&opts);
+ spdk_unaffinitize_thread();
+
+ std::unique_lock l(probe_queue_lock);
+ while (!stopping) {
+ if (!probe_queue.empty()) {
+ ProbeContext* ctxt = probe_queue.front();
+ probe_queue.pop_front();
+ r = spdk_nvme_probe(local_pci_device ? NULL : &trid, ctxt, probe_cb, attach_cb, NULL);
+ if (r < 0) {
+ ceph_assert(!ctxt->driver);
+ derr << __func__ << " device probe nvme failed" << dendl;
+ }
+ ctxt->done = true;
+ probe_queue_cond.notify_all();
+ } else {
+ probe_queue_cond.wait(l);
+ }
+ }
+ for (auto p : probe_queue)
+ p->done = true;
+ probe_queue_cond.notify_all();
+ }
+ );
+ }
+
+ ProbeContext ctx{trid, this, nullptr, false};
+ {
+ std::unique_lock l(probe_queue_lock);
+ probe_queue.push_back(&ctx);
+ while (!ctx.done)
+ probe_queue_cond.wait(l);
+ }
+ if (!ctx.driver)
+ return -1;
+ *driver = ctx.driver;
+
+ return 0;
+}
+
+void io_complete(void *t, const struct spdk_nvme_cpl *completion)
+{
+ Task *task = static_cast<Task*>(t);
+ IOContext *ctx = task->ctx;
+ SharedDriverQueueData *queue = task->queue;
+
+ ceph_assert(queue != NULL);
+ ceph_assert(ctx != NULL);
+ --queue->current_queue_depth;
+ if (task->command == IOCommand::WRITE_COMMAND) {
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " write/zero op successfully, left "
+ << queue->queue_op_seq - queue->completed_op_seq << dendl;
+ // check waiting count before doing callback (which may
+ // destroy this ioc).
+ if (ctx->priv) {
+ if (!--ctx->num_running) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ } else {
+ ctx->try_aio_wake();
+ }
+ task->release_segs(queue);
+ delete task;
+ } else if (task->command == IOCommand::READ_COMMAND) {
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " read op successfully" << dendl;
+ task->fill_cb();
+ task->release_segs(queue);
+ // read submitted by AIO
+ if (!task->return_code) {
+ if (ctx->priv) {
+ if (!--ctx->num_running) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ } else {
+ ctx->try_aio_wake();
+ }
+ delete task;
+ } else {
+ if (Task* primary = task->primary; primary != nullptr) {
+ delete task;
+ if (!primary->ref)
+ primary->return_code = 0;
+ } else {
+ task->return_code = 0;
+ }
+ --ctx->num_running;
+ }
+ } else {
+ ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " flush op successfully" << dendl;
+ task->return_code = 0;
+ }
+}
+
+// ----------------
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << name << ") "
+
+NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ driver(nullptr)
+{
+}
+
+bool NVMEDevice::support(const std::string& path)
+{
+ char buf[PATH_MAX + 1];
+ int r = ::readlink(path.c_str(), buf, sizeof(buf) - 1);
+ if (r >= 0) {
+ buf[r] = '\0';
+ char *bname = ::basename(buf);
+ if (strncmp(bname, SPDK_PREFIX, sizeof(SPDK_PREFIX)-1) == 0) {
+ return true;
+ }
+ }
+ return false;
+}
+
+int NVMEDevice::open(const string& p)
+{
+ dout(1) << __func__ << " path " << p << dendl;
+
+ std::ifstream ifs(p);
+ if (!ifs) {
+ derr << __func__ << " unable to open " << p << dendl;
+ return -1;
+ }
+ string val;
+ std::getline(ifs, val);
+ spdk_nvme_transport_id trid;
+ if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
+ derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ if (int r = manager.try_get(trid, &driver); r < 0) {
+ derr << __func__ << " failed to get nvme device with transport address "
+ << trid.traddr << " type " << trid.trtype << dendl;
+ return r;
+ }
+
+ driver->register_device(this);
+ block_size = driver->get_block_size();
+ size = driver->get_size();
+ name = trid.traddr;
+
+ //nvme is non-rotational device.
+ rotational = false;
+
+ // round size down to an even block
+ size &= ~(block_size - 1);
+
+ dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")"
+ << " block_size " << block_size << " (" << byte_u_t(block_size)
+ << ")" << dendl;
+
+
+ return 0;
+}
+
+void NVMEDevice::close()
+{
+ dout(1) << __func__ << dendl;
+
+ name.clear();
+ driver->remove_device(this);
+
+ dout(1) << __func__ << " end" << dendl;
+}
+
+int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+ (*pm)[prefix + "rotational"] = "0";
+ (*pm)[prefix + "size"] = stringify(get_size());
+ (*pm)[prefix + "block_size"] = stringify(get_block_size());
+ (*pm)[prefix + "driver"] = "NVMEDevice";
+ (*pm)[prefix + "type"] = "nvme";
+ (*pm)[prefix + "access_mode"] = "spdk";
+ (*pm)[prefix + "nvme_serial_number"] = name;
+
+ return 0;
+}
+
+int NVMEDevice::flush()
+{
+ return 0;
+}
+
+void NVMEDevice::aio_submit(IOContext *ioc)
+{
+ dout(20) << __func__ << " ioc " << ioc << " pending "
+ << ioc->num_pending.load() << " running "
+ << ioc->num_running.load() << dendl;
+ int pending = ioc->num_pending.load();
+ Task *t = static_cast<Task*>(ioc->nvme_task_first);
+ if (pending && t) {
+ ioc->num_running += pending;
+ ioc->num_pending -= pending;
+ ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+ // Only need to push the first entry
+ ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+
+ thread_local SharedDriverQueueData queue_t = SharedDriverQueueData(this, driver);
+ queue_t._aio_handle(t, ioc);
+ }
+}
+
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+ Task *first, *last;
+
+ first = static_cast<Task*>(ioc->nvme_task_first);
+ last = static_cast<Task*>(ioc->nvme_task_last);
+ if (last)
+ last->next = t;
+ if (!first)
+ ioc->nvme_task_first = t;
+ ioc->nvme_task_last = t;
+ ++ioc->num_pending;
+}
+
+static void write_split(
+ NVMEDevice *dev,
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc)
+{
+ uint64_t remain_len = bl.length(), begin = 0, write_size;
+ Task *t;
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+
+ while (remain_len > 0) {
+ write_size = std::min(remain_len, split_size);
+ t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ bl.splice(0, write_size, &t->bl);
+ remain_len -= write_size;
+ t->ctx = ioc;
+ ioc_append_task(ioc, t);
+ begin += write_size;
+ }
+}
+
+static void make_read_tasks(
+ NVMEDevice *dev,
+ uint64_t aligned_off,
+ IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+ uint64_t orig_off, uint64_t orig_len)
+{
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+ uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+ auto begin = aligned_off;
+ const auto aligned_end = begin + aligned_len;
+
+ for (; begin < aligned_end; begin += split_size) {
+ auto read_size = std::min(aligned_end - begin, split_size);
+ auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+ Task *t = nullptr;
+
+ if (primary && (aligned_len <= split_size)) {
+ t = primary;
+ } else {
+ t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+ }
+
+ t->ctx = ioc;
+
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ t->fill_cb = [buf, t, tmp_off, tmp_len] {
+ t->copy_to_buf(buf, tmp_off, tmp_len);
+ };
+
+ ioc_append_task(ioc, t);
+ remain_orig_len -= tmp_len;
+ buf += tmp_len;
+ tmp_off = 0;
+ }
+}
+
+int NVMEDevice::aio_write(
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+ << " buffered " << buffered << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ write_split(this, off, bl, ioc);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+
+ return 0;
+}
+
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << " buffered "
+ << buffered << dendl;
+ ceph_assert(off % block_size == 0);
+ ceph_assert(len % block_size == 0);
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+
+ IOContext ioc(cct, NULL);
+ write_split(this, off, bl, &ioc);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(&ioc);
+ ioc.aio_wait();
+ return 0;
+}
+
+int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered)
+{
+ dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ Task t(this, IOCommand::READ_COMMAND, off, len, 1);
+ bufferptr p = buffer::create_small_page_aligned(len);
+ char *buf = p.c_str();
+
+ // for sync read, need to control IOContext in itself
+ IOContext read_ioc(cct, nullptr);
+ make_read_tasks(this, off, &read_ioc, buf, len, &t, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(&read_ioc);
+
+ pbl->push_back(std::move(p));
+ return t.return_code;
+}
+
+int NVMEDevice::aio_read(
+ uint64_t off,
+ uint64_t len,
+ bufferlist *pbl,
+ IOContext *ioc)
+{
+ dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+ ceph_assert(is_valid_io(off, len));
+ bufferptr p = buffer::create_small_page_aligned(len);
+ pbl->append(p);
+ char* buf = p.c_str();
+
+ make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ return 0;
+}
+
+int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+
+ uint64_t aligned_off = p2align(off, block_size);
+ uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
+ dout(5) << __func__ << " " << off << "~" << len
+ << " aligned " << aligned_off << "~" << aligned_len << dendl;
+ IOContext ioc(g_ceph_context, nullptr);
+ Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
+
+ make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
+ aio_submit(&ioc);
+
+ return t.return_code;
+}
+
+int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ return 0;
+}
diff --git a/src/blk/spdk/NVMEDevice.h b/src/blk/spdk/NVMEDevice.h
new file mode 100644
index 000000000..323d7281f
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.h
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_NVMEDEVICE
+#define CEPH_BLK_NVMEDEVICE
+
+#include <queue>
+#include <map>
+#include <limits>
+
+// since _Static_assert introduced in c11
+#define _Static_assert static_assert
+
+
+#include "include/interval_set.h"
+#include "common/ceph_time.h"
+#include "BlockDevice.h"
+
+enum class IOCommand {
+ READ_COMMAND,
+ WRITE_COMMAND,
+ FLUSH_COMMAND
+};
+
+class SharedDriverData;
+class SharedDriverQueueData;
+
+class NVMEDevice : public BlockDevice {
+ /**
+ * points to pinned, physically contiguous memory region;
+ * contains 4KB IDENTIFY structure for controller which is
+ * target for CONTROLLER IDENTIFY command during initialization
+ */
+ SharedDriverData *driver;
+ std::string name;
+
+ public:
+ SharedDriverData *get_driver() { return driver; }
+
+ NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
+
+ bool supported_bdev_label() override { return false; }
+
+ static bool support(const std::string& path);
+
+ void aio_submit(IOContext *ioc) override;
+
+ int read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered) override;
+ int aio_read(
+ uint64_t off,
+ uint64_t len,
+ bufferlist *pbl,
+ IOContext *ioc) override;
+ int aio_write(uint64_t off, bufferlist& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) override;
+ int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+ int flush() override;
+ int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+
+ // for managing buffered readers/writers
+ int invalidate_cache(uint64_t off, uint64_t len) override;
+ int open(const std::string& path) override;
+ void close() override;
+ int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
+};
+
+#endif
diff --git a/src/blk/zoned/HMSMRDevice.cc b/src/blk/zoned/HMSMRDevice.cc
new file mode 100644
index 000000000..416eae4e4
--- /dev/null
+++ b/src/blk/zoned/HMSMRDevice.cc
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ * Copyright (C) 2020 Abutalib Aghayev
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "HMSMRDevice.h"
+extern "C" {
+#include <libzbd/zbd.h>
+}
+#include "common/debug.h"
+#include "common/errno.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "smrbdev(" << this << " " << path << ") "
+
+using namespace std;
+
+HMSMRDevice::HMSMRDevice(CephContext* cct,
+ aio_callback_t cb,
+ void *cbpriv,
+ aio_callback_t d_cb,
+ void *d_cbpriv)
+ : KernelDevice(cct, cb, cbpriv, d_cb, d_cbpriv)
+{
+}
+
+bool HMSMRDevice::support(const std::string& path)
+{
+ return zbd_device_is_zoned(path.c_str()) == 1;
+}
+
+int HMSMRDevice::_post_open()
+{
+ dout(10) << __func__ << dendl;
+
+ zbd_fd = zbd_open(path.c_str(), O_RDWR | O_DIRECT | O_LARGEFILE, nullptr);
+ int r;
+ if (zbd_fd < 0) {
+ r = errno;
+ derr << __func__ << " zbd_open failed on " << path << ": "
+ << cpp_strerror(r) << dendl;
+ return -r;
+ }
+
+ unsigned int nr_zones = 0;
+ std::vector<zbd_zone> zones;
+ if (zbd_report_nr_zones(zbd_fd, 0, 0, ZBD_RO_NOT_WP, &nr_zones) != 0) {
+ r = -errno;
+ derr << __func__ << " zbd_report_nr_zones failed on " << path << ": "
+ << cpp_strerror(r) << dendl;
+ goto fail;
+ }
+
+ zones.resize(nr_zones);
+ if (zbd_report_zones(zbd_fd, 0, 0, ZBD_RO_NOT_WP, zones.data(), &nr_zones) != 0) {
+ r = -errno;
+ derr << __func__ << " zbd_report_zones failed on " << path << dendl;
+ goto fail;
+ }
+
+ zone_size = zbd_zone_len(&zones[0]);
+ conventional_region_size = nr_zones * zone_size;
+
+ dout(10) << __func__ << " setting zone size to " << zone_size
+ << " and conventional region size to " << conventional_region_size
+ << dendl;
+
+ return 0;
+
+fail:
+ zbd_close(zbd_fd);
+ zbd_fd = -1;
+ return r;
+}
+
+
+void HMSMRDevice::_pre_close()
+{
+ if (zbd_fd >= 0) {
+ zbd_close(zbd_fd);
+ zbd_fd = -1;
+ }
+}
+
+void HMSMRDevice::reset_all_zones()
+{
+ dout(10) << __func__ << dendl;
+ zbd_reset_zones(zbd_fd, conventional_region_size, 0);
+}
+
+void HMSMRDevice::reset_zone(uint64_t zone)
+{
+ dout(10) << __func__ << " zone 0x" << std::hex << zone << std::dec << dendl;
+ if (zbd_reset_zones(zbd_fd, zone * zone_size, zone_size) != 0) {
+ derr << __func__ << " resetting zone failed for zone 0x" << std::hex
+ << zone << std::dec << dendl;
+ ceph_abort("zbd_reset_zones failed");
+ }
+}
+
+std::vector<uint64_t> HMSMRDevice::get_zones()
+{
+ std::vector<zbd_zone> zones;
+ unsigned int num_zones = size / zone_size;
+ zones.resize(num_zones);
+
+ int r = zbd_report_zones(zbd_fd, 0, 0, ZBD_RO_ALL, zones.data(), &num_zones);
+ if (r != 0) {
+ derr << __func__ << " zbd_report_zones failed on " << path << ": "
+ << cpp_strerror(errno) << dendl;
+ ceph_abort("zbd_report_zones failed");
+ }
+
+ std::vector<uint64_t> wp(num_zones);
+ for (unsigned i = 0; i < num_zones; ++i) {
+ wp[i] = zones[i].wp;
+ }
+ return wp;
+}
diff --git a/src/blk/zoned/HMSMRDevice.h b/src/blk/zoned/HMSMRDevice.h
new file mode 100644
index 000000000..edf18b5f0
--- /dev/null
+++ b/src/blk/zoned/HMSMRDevice.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Red Hat
+ * Copyright (C) 2020 Abutalib Aghayev
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_HMSMRDEVICE_H
+#define CEPH_BLK_HMSMRDEVICE_H
+
+#include <atomic>
+
+#include "include/types.h"
+#include "include/interval_set.h"
+#include "common/Thread.h"
+#include "include/utime.h"
+
+#include "aio/aio.h"
+#include "BlockDevice.h"
+#include "../kernel/KernelDevice.h"
+
+
+class HMSMRDevice final : public KernelDevice {
+ int zbd_fd = -1; ///< fd for the zoned block device
+
+public:
+ HMSMRDevice(CephContext* cct, aio_callback_t cb, void *cbpriv,
+ aio_callback_t d_cb, void *d_cbpriv);
+
+ static bool support(const std::string& path);
+
+ // open/close hooks for libzbd
+ int _post_open() override;
+ void _pre_close() override;
+
+ // smr-specific methods
+ bool is_smr() const final { return true; }
+ void reset_all_zones() override;
+ void reset_zone(uint64_t zone) override;
+ std::vector<uint64_t> get_zones() override;
+
+};
+
+#endif //CEPH_BLK_HMSMRDEVICE_H