summaryrefslogtreecommitdiffstats
path: root/src/blk/spdk
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/blk/spdk
parentInitial commit. (diff)
downloadceph-upstream/16.2.11+ds.tar.xz
ceph-upstream/16.2.11+ds.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/blk/spdk')
-rw-r--r--src/blk/spdk/NVMEDevice.cc971
-rw-r--r--src/blk/spdk/NVMEDevice.h85
2 files changed, 1056 insertions, 0 deletions
diff --git a/src/blk/spdk/NVMEDevice.cc b/src/blk/spdk/NVMEDevice.cc
new file mode 100644
index 000000000..935d3bbd1
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.cc
@@ -0,0 +1,971 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+//
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <fstream>
+#include <functional>
+#include <map>
+#include <thread>
+#include <boost/intrusive/slist.hpp>
+
+#include <spdk/nvme.h>
+
+#include "include/intarith.h"
+#include "include/stringify.h"
+#include "include/types.h"
+#include "include/compat.h"
+#include "common/errno.h"
+#include "common/debug.h"
+#include "common/perf_counters.h"
+
+#include "NVMEDevice.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << sn << ") "
+
+static constexpr uint16_t data_buffer_default_num = 1024;
+
+static constexpr uint32_t data_buffer_size = 8192;
+
+static constexpr uint16_t inline_segment_num = 32;
+
+static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
+
+struct IORequest {
+ uint16_t cur_seg_idx = 0;
+ uint16_t nseg;
+ uint32_t cur_seg_left = 0;
+ void *inline_segs[inline_segment_num];
+ void **extra_segs = nullptr;
+};
+
+namespace bi = boost::intrusive;
+struct data_cache_buf : public bi::slist_base_hook<bi::link_mode<bi::normal_link>>
+{};
+
+struct Task;
+
+class SharedDriverData {
+ unsigned id;
+ spdk_nvme_transport_id trid;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ uint32_t block_size = 0;
+ uint64_t size = 0;
+
+ public:
+ std::vector<NVMEDevice*> registered_devices;
+ friend class SharedDriverQueueData;
+ SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
+ : id(id_),
+ trid(trid_),
+ ctrlr(c),
+ ns(ns_) {
+ block_size = spdk_nvme_ns_get_extended_sector_size(ns);
+ size = spdk_nvme_ns_get_size(ns);
+ }
+
+ bool is_equal(const spdk_nvme_transport_id& trid2) const {
+ return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
+ }
+ ~SharedDriverData() {
+ }
+
+ void register_device(NVMEDevice *device) {
+ registered_devices.push_back(device);
+ }
+
+ void remove_device(NVMEDevice *device) {
+ std::vector<NVMEDevice*> new_devices;
+ for (auto &&it : registered_devices) {
+ if (it != device)
+ new_devices.push_back(it);
+ }
+ registered_devices.swap(new_devices);
+ }
+
+ uint32_t get_block_size() {
+ return block_size;
+ }
+ uint64_t get_size() {
+ return size;
+ }
+};
+
+class SharedDriverQueueData {
+ NVMEDevice *bdev;
+ SharedDriverData *driver;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ std::string sn;
+ uint32_t block_size;
+ uint32_t max_queue_depth;
+ struct spdk_nvme_qpair *qpair;
+ bool reap_io = false;
+ int alloc_buf_from_pool(Task *t, bool write);
+
+ public:
+ uint32_t current_queue_depth = 0;
+ std::atomic_ulong completed_op_seq, queue_op_seq;
+ bi::slist<data_cache_buf, bi::constant_time_size<true>> data_buf_list;
+ void _aio_handle(Task *t, IOContext *ioc);
+
+ SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+ : bdev(bdev),
+ driver(driver) {
+ ctrlr = driver->ctrlr;
+ ns = driver->ns;
+ block_size = driver->block_size;
+
+ struct spdk_nvme_io_qpair_opts opts = {};
+ spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
+ opts.qprio = SPDK_NVME_QPRIO_URGENT;
+ // usable queue depth should minus 1 to aovid overflow.
+ max_queue_depth = opts.io_queue_size - 1;
+ qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
+ ceph_assert(qpair != NULL);
+
+ // allocate spdk dma memory
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+ if (!b) {
+ derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+ ceph_assert(b);
+ }
+ data_buf_list.push_front(*reinterpret_cast<data_cache_buf *>(b));
+ }
+
+ bdev->queue_number++;
+ if (bdev->queue_number.load() == 1)
+ reap_io = true;
+ }
+
+ ~SharedDriverQueueData() {
+ if (qpair) {
+ spdk_nvme_ctrlr_free_io_qpair(qpair);
+ bdev->queue_number--;
+ }
+
+ data_buf_list.clear_and_dispose(spdk_dma_free);
+ }
+};
+
+struct Task {
+ NVMEDevice *device;
+ IOContext *ctx = nullptr;
+ IOCommand command;
+ uint64_t offset;
+ uint64_t len;
+ bufferlist bl;
+ std::function<void()> fill_cb;
+ Task *next = nullptr;
+ int64_t return_code;
+ Task *primary = nullptr;
+ ceph::coarse_real_clock::time_point start;
+ IORequest io_request = {};
+ ceph::mutex lock = ceph::make_mutex("Task::lock");
+ ceph::condition_variable cond;
+ SharedDriverQueueData *queue = nullptr;
+ // reference count by subtasks.
+ int ref = 0;
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+ Task *p = nullptr)
+ : device(dev), command(c), offset(off), len(l),
+ return_code(rc), primary(p),
+ start(ceph::coarse_real_clock::now()) {
+ if (primary) {
+ primary->ref++;
+ return_code = primary->return_code;
+ }
+ }
+ ~Task() {
+ if (primary)
+ primary->ref--;
+ ceph_assert(!io_request.nseg);
+ }
+ void release_segs(SharedDriverQueueData *queue_data) {
+ if (io_request.extra_segs) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.extra_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ delete io_request.extra_segs;
+ } else if (io_request.nseg) {
+ for (uint16_t i = 0; i < io_request.nseg; i++) {
+ auto buf = reinterpret_cast<data_cache_buf *>(io_request.inline_segs[i]);
+ queue_data->data_buf_list.push_front(*buf);
+ }
+ }
+ ctx->total_nseg -= io_request.nseg;
+ io_request.nseg = 0;
+ }
+
+ void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
+ uint64_t copied = 0;
+ uint64_t left = len;
+ void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
+ uint16_t i = 0;
+ while (left > 0) {
+ char *src = static_cast<char*>(segs[i++]);
+ uint64_t need_copy = std::min(left, data_buffer_size-off);
+ memcpy(buf+copied, src+off, need_copy);
+ off = 0;
+ left -= need_copy;
+ copied += need_copy;
+ }
+ }
+};
+
+static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
+{
+ Task *t = static_cast<Task*>(cb_arg);
+ uint32_t i = sgl_offset / data_buffer_size;
+ uint32_t offset = i * data_buffer_size;
+ ceph_assert(i <= t->io_request.nseg);
+
+ for (; i < t->io_request.nseg; i++) {
+ offset += data_buffer_size;
+ if (offset > sgl_offset) {
+ if (offset > t->len)
+ offset = t->len;
+ break;
+ }
+ }
+
+ t->io_request.cur_seg_idx = i;
+ t->io_request.cur_seg_left = offset - sgl_offset;
+ return ;
+}
+
+static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
+{
+ uint32_t size;
+ void *addr;
+ Task *t = static_cast<Task*>(cb_arg);
+ if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
+ *length = 0;
+ *address = 0;
+ return 0;
+ }
+
+ addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];
+
+ size = data_buffer_size;
+ if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
+ uint64_t tail = t->len % data_buffer_size;
+ if (tail) {
+ size = (uint32_t) tail;
+ }
+ }
+
+ if (t->io_request.cur_seg_left) {
+ *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
+ *length = t->io_request.cur_seg_left;
+ t->io_request.cur_seg_left = 0;
+ } else {
+ *address = addr;
+ *length = size;
+ }
+
+ t->io_request.cur_seg_idx++;
+ return 0;
+}
+
+int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
+{
+ uint64_t count = t->len / data_buffer_size;
+ if (t->len % data_buffer_size)
+ ++count;
+ void **segs;
+ if (count > data_buf_list.size())
+ return -ENOMEM;
+ if (count <= inline_segment_num) {
+ segs = t->io_request.inline_segs;
+ } else {
+ t->io_request.extra_segs = new void*[count];
+ segs = t->io_request.extra_segs;
+ }
+ for (uint16_t i = 0; i < count; i++) {
+ ceph_assert(!data_buf_list.empty());
+ segs[i] = &data_buf_list.front();
+ ceph_assert(segs[i] != nullptr);
+ data_buf_list.pop_front();
+ }
+ t->io_request.nseg = count;
+ t->ctx->total_nseg += count;
+ if (write) {
+ auto blp = t->bl.begin();
+ uint32_t len = 0;
+ uint16_t i = 0;
+ for (; i < count - 1; ++i) {
+ blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
+ len += data_buffer_size;
+ }
+ blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
+ }
+
+ return 0;
+}
+
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
+{
+ dout(20) << __func__ << " start" << dendl;
+
+ int r = 0;
+ uint64_t lba_off, lba_count;
+ uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
+ uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");
+
+ while (ioc->num_running) {
+ again:
+ dout(40) << __func__ << " polling" << dendl;
+ if (current_queue_depth) {
+ r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
+ if (r < 0) {
+ ceph_abort();
+ } else if (r == 0) {
+ usleep(io_sleep_in_us);
+ }
+ }
+
+ for (; t; t = t->next) {
+ if (current_queue_depth == max_queue_depth) {
+ // no slots
+ goto again;
+ }
+
+ t->queue = this;
+ lba_off = t->offset / block_size;
+ lba_count = t->len / block_size;
+ switch (t->command) {
+ case IOCommand::WRITE_COMMAND:
+ {
+ dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
+ r = alloc_buf_from_pool(t, true);
+ if (r < 0) {
+ goto again;
+ }
+
+ r = spdk_nvme_ns_cmd_writev(
+ ns, qpair, lba_off, lba_count, io_complete, t, 0,
+ data_buf_reset_sgl, data_buf_next_sge);
+ if (r < 0) {
+ derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
+ t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ case IOCommand::READ_COMMAND:
+ {
+ dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
+ r = alloc_buf_from_pool(t, false);
+ if (r < 0) {
+ goto again;
+ }
+
+ r = spdk_nvme_ns_cmd_readv(
+ ns, qpair, lba_off, lba_count, io_complete, t, 0,
+ data_buf_reset_sgl, data_buf_next_sge);
+ if (r < 0) {
+ derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ case IOCommand::FLUSH_COMMAND:
+ {
+ dout(20) << __func__ << " flush command issueed " << dendl;
+ r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
+ if (r < 0) {
+ derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
+ t->release_segs(this);
+ delete t;
+ ceph_abort();
+ }
+ break;
+ }
+ }
+ current_queue_depth++;
+ }
+ }
+
+ if (reap_io)
+ bdev->reap_ioc();
+ dout(20) << __func__ << " end" << dendl;
+}
+
+#define dout_subsys ceph_subsys_bdev
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev "
+
+class NVMEManager {
+ public:
+ struct ProbeContext {
+ spdk_nvme_transport_id trid;
+ NVMEManager *manager;
+ SharedDriverData *driver;
+ bool done;
+ };
+
+ private:
+ ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
+ bool stopping = false;
+ std::vector<SharedDriverData*> shared_driver_datas;
+ std::thread dpdk_thread;
+ ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
+ ceph::condition_variable probe_queue_cond;
+ std::list<ProbeContext*> probe_queue;
+
+ public:
+ NVMEManager() {}
+ ~NVMEManager() {
+ if (!dpdk_thread.joinable())
+ return;
+ {
+ std::lock_guard guard(probe_queue_lock);
+ stopping = true;
+ probe_queue_cond.notify_all();
+ }
+ dpdk_thread.join();
+ }
+
+ int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
+ void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+ spdk_nvme_ns *ns;
+ int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
+ ceph_assert(num_ns >= 1);
+ if (num_ns > 1) {
+ dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
+ }
+ ns = spdk_nvme_ctrlr_get_ns(c, 1);
+ if (!ns) {
+ derr << __func__ << " failed to get namespace at 1" << dendl;
+ ceph_abort();
+ }
+ dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;
+
+ // only support one device per osd now!
+ ceph_assert(shared_driver_datas.empty());
+ // index 0 is occurred by master thread
+ shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
+ *driver = shared_driver_datas.back();
+ }
+};
+
+static NVMEManager manager;
+
+static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
+{
+ NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+
+ if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
+ dout(0) << __func__ << " only probe local nvme device" << dendl;
+ return false;
+ }
+
+ dout(0) << __func__ << " found device at: "
+ << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
+ << "traddr=" << trid->traddr << dendl;
+ if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
+ dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl;
+ return false;
+ }
+
+ opts->io_queue_size = UINT16_MAX;
+
+ return true;
+}
+
+static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
+ struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
+{
+ auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+ ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
+}
+
+static int hex2dec(unsigned char c)
+{
+ if (isdigit(c))
+ return c - '0';
+ else if (isupper(c))
+ return c - 'A' + 10;
+ else
+ return c - 'a' + 10;
+}
+
+static int find_first_bitset(const string& s)
+{
+ auto e = s.rend();
+ if (s.compare(0, 2, "0x") == 0 ||
+ s.compare(0, 2, "0X") == 0) {
+ advance(e, -2);
+ }
+ auto p = s.rbegin();
+ for (int pos = 0; p != e; ++p, pos += 4) {
+ if (!isxdigit(*p)) {
+ return -EINVAL;
+ }
+ if (int val = hex2dec(*p); val != 0) {
+ return pos + ffs(val);
+ }
+ }
+ return 0;
+}
+
+int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
+{
+ std::lock_guard l(lock);
+ for (auto &&it : shared_driver_datas) {
+ if (it->is_equal(trid)) {
+ *driver = it;
+ return 0;
+ }
+ }
+
+ struct spdk_pci_addr pci_addr;
+ int rc = spdk_pci_addr_parse(&pci_addr, trid.traddr);
+ if (rc < 0) {
+ derr << __func__ << " invalid transport address: " << trid.traddr << dendl;
+ return -ENOENT;
+ }
+ auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
+ int m_core_arg = find_first_bitset(coremask_arg);
+ // at least one core is needed for using spdk
+ if (m_core_arg <= 0) {
+ derr << __func__ << " invalid bluestore_spdk_coremask, "
+ << "at least one core is needed" << dendl;
+ return -ENOENT;
+ }
+ m_core_arg -= 1;
+
+ uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");
+
+ if (!dpdk_thread.joinable()) {
+ dpdk_thread = std::thread(
+ [this, coremask_arg, m_core_arg, mem_size_arg, pci_addr]() {
+ struct spdk_env_opts opts;
+ struct spdk_pci_addr addr = pci_addr;
+ int r;
+
+ spdk_env_opts_init(&opts);
+ opts.name = "nvme-device-manager";
+ opts.core_mask = coremask_arg.c_str();
+ opts.master_core = m_core_arg;
+ opts.mem_size = mem_size_arg;
+ opts.pci_whitelist = &addr;
+ opts.num_pci_addr = 1;
+ spdk_env_init(&opts);
+ spdk_unaffinitize_thread();
+
+ std::unique_lock l(probe_queue_lock);
+ while (!stopping) {
+ if (!probe_queue.empty()) {
+ ProbeContext* ctxt = probe_queue.front();
+ probe_queue.pop_front();
+ r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
+ if (r < 0) {
+ ceph_assert(!ctxt->driver);
+ derr << __func__ << " device probe nvme failed" << dendl;
+ }
+ ctxt->done = true;
+ probe_queue_cond.notify_all();
+ } else {
+ probe_queue_cond.wait(l);
+ }
+ }
+ for (auto p : probe_queue)
+ p->done = true;
+ probe_queue_cond.notify_all();
+ }
+ );
+ }
+
+ ProbeContext ctx{trid, this, nullptr, false};
+ {
+ std::unique_lock l(probe_queue_lock);
+ probe_queue.push_back(&ctx);
+ while (!ctx.done)
+ probe_queue_cond.wait(l);
+ }
+ if (!ctx.driver)
+ return -1;
+ *driver = ctx.driver;
+
+ return 0;
+}
+
+void io_complete(void *t, const struct spdk_nvme_cpl *completion)
+{
+ Task *task = static_cast<Task*>(t);
+ IOContext *ctx = task->ctx;
+ SharedDriverQueueData *queue = task->queue;
+
+ ceph_assert(queue != NULL);
+ ceph_assert(ctx != NULL);
+ --queue->current_queue_depth;
+ if (task->command == IOCommand::WRITE_COMMAND) {
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " write/zero op successfully, left "
+ << queue->queue_op_seq - queue->completed_op_seq << dendl;
+ // check waiting count before doing callback (which may
+ // destroy this ioc).
+ if (ctx->priv) {
+ if (!--ctx->num_running) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ } else {
+ ctx->try_aio_wake();
+ }
+ task->release_segs(queue);
+ delete task;
+ } else if (task->command == IOCommand::READ_COMMAND) {
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " read op successfully" << dendl;
+ task->fill_cb();
+ task->release_segs(queue);
+ // read submitted by AIO
+ if (!task->return_code) {
+ if (ctx->priv) {
+ if (!--ctx->num_running) {
+ task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
+ }
+ } else {
+ ctx->try_aio_wake();
+ }
+ delete task;
+ } else {
+ if (Task* primary = task->primary; primary != nullptr) {
+ delete task;
+ if (!primary->ref)
+ primary->return_code = 0;
+ } else {
+ task->return_code = 0;
+ }
+ --ctx->num_running;
+ }
+ } else {
+ ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
+ dout(20) << __func__ << " flush op successfully" << dendl;
+ task->return_code = 0;
+ }
+}
+
+// ----------------
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << name << ") "
+
+NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ driver(nullptr)
+{
+}
+
+bool NVMEDevice::support(const std::string& path)
+{
+ char buf[PATH_MAX + 1];
+ int r = ::readlink(path.c_str(), buf, sizeof(buf) - 1);
+ if (r >= 0) {
+ buf[r] = '\0';
+ char *bname = ::basename(buf);
+ if (strncmp(bname, SPDK_PREFIX, sizeof(SPDK_PREFIX)-1) == 0) {
+ return true;
+ }
+ }
+ return false;
+}
+
+int NVMEDevice::open(const string& p)
+{
+ dout(1) << __func__ << " path " << p << dendl;
+
+ std::ifstream ifs(p);
+ if (!ifs) {
+ derr << __func__ << " unable to open " << p << dendl;
+ return -1;
+ }
+ string val;
+ std::getline(ifs, val);
+ spdk_nvme_transport_id trid;
+ if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
+ derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ if (int r = manager.try_get(trid, &driver); r < 0) {
+ derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
+ return r;
+ }
+
+ driver->register_device(this);
+ block_size = driver->get_block_size();
+ size = driver->get_size();
+ name = trid.traddr;
+
+ //nvme is non-rotational device.
+ rotational = false;
+
+ // round size down to an even block
+ size &= ~(block_size - 1);
+
+ dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")"
+ << " block_size " << block_size << " (" << byte_u_t(block_size)
+ << ")" << dendl;
+
+
+ return 0;
+}
+
+void NVMEDevice::close()
+{
+ dout(1) << __func__ << dendl;
+
+ name.clear();
+ driver->remove_device(this);
+
+ dout(1) << __func__ << " end" << dendl;
+}
+
+int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
+{
+ (*pm)[prefix + "rotational"] = "0";
+ (*pm)[prefix + "size"] = stringify(get_size());
+ (*pm)[prefix + "block_size"] = stringify(get_block_size());
+ (*pm)[prefix + "driver"] = "NVMEDevice";
+ (*pm)[prefix + "type"] = "nvme";
+ (*pm)[prefix + "access_mode"] = "spdk";
+ (*pm)[prefix + "nvme_serial_number"] = name;
+
+ return 0;
+}
+
+int NVMEDevice::flush()
+{
+ return 0;
+}
+
+void NVMEDevice::aio_submit(IOContext *ioc)
+{
+ dout(20) << __func__ << " ioc " << ioc << " pending "
+ << ioc->num_pending.load() << " running "
+ << ioc->num_running.load() << dendl;
+ int pending = ioc->num_pending.load();
+ Task *t = static_cast<Task*>(ioc->nvme_task_first);
+ if (pending && t) {
+ ioc->num_running += pending;
+ ioc->num_pending -= pending;
+ ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+ // Only need to push the first entry
+ ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+
+ thread_local SharedDriverQueueData queue_t = SharedDriverQueueData(this, driver);
+ queue_t._aio_handle(t, ioc);
+ }
+}
+
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+ Task *first, *last;
+
+ first = static_cast<Task*>(ioc->nvme_task_first);
+ last = static_cast<Task*>(ioc->nvme_task_last);
+ if (last)
+ last->next = t;
+ if (!first)
+ ioc->nvme_task_first = t;
+ ioc->nvme_task_last = t;
+ ++ioc->num_pending;
+}
+
+static void write_split(
+ NVMEDevice *dev,
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc)
+{
+ uint64_t remain_len = bl.length(), begin = 0, write_size;
+ Task *t;
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+
+ while (remain_len > 0) {
+ write_size = std::min(remain_len, split_size);
+ t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ bl.splice(0, write_size, &t->bl);
+ remain_len -= write_size;
+ t->ctx = ioc;
+ ioc_append_task(ioc, t);
+ begin += write_size;
+ }
+}
+
+static void make_read_tasks(
+ NVMEDevice *dev,
+ uint64_t aligned_off,
+ IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+ uint64_t orig_off, uint64_t orig_len)
+{
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+ uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+ auto begin = aligned_off;
+ const auto aligned_end = begin + aligned_len;
+
+ for (; begin < aligned_end; begin += split_size) {
+ auto read_size = std::min(aligned_end - begin, split_size);
+ auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+ Task *t = nullptr;
+
+ if (primary && (aligned_len <= split_size)) {
+ t = primary;
+ } else {
+ t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+ }
+
+ t->ctx = ioc;
+
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ t->fill_cb = [buf, t, tmp_off, tmp_len] {
+ t->copy_to_buf(buf, tmp_off, tmp_len);
+ };
+
+ ioc_append_task(ioc, t);
+ remain_orig_len -= tmp_len;
+ buf += tmp_len;
+ tmp_off = 0;
+ }
+}
+
+int NVMEDevice::aio_write(
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
+ << " buffered " << buffered << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ write_split(this, off, bl, ioc);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+
+ return 0;
+}
+
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
+{
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << " buffered "
+ << buffered << dendl;
+ ceph_assert(off % block_size == 0);
+ ceph_assert(len % block_size == 0);
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+
+ IOContext ioc(cct, NULL);
+ write_split(this, off, bl, &ioc);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(&ioc);
+ ioc.aio_wait();
+ return 0;
+}
+
+int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered)
+{
+ dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+ ceph_assert(is_valid_io(off, len));
+
+ Task t(this, IOCommand::READ_COMMAND, off, len, 1);
+ bufferptr p = buffer::create_small_page_aligned(len);
+ char *buf = p.c_str();
+
+ // for sync read, need to control IOContext in itself
+ IOContext read_ioc(cct, nullptr);
+ make_read_tasks(this, off, &read_ioc, buf, len, &t, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(&read_ioc);
+
+ pbl->push_back(std::move(p));
+ return t.return_code;
+}
+
+int NVMEDevice::aio_read(
+ uint64_t off,
+ uint64_t len,
+ bufferlist *pbl,
+ IOContext *ioc)
+{
+ dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
+ ceph_assert(is_valid_io(off, len));
+ bufferptr p = buffer::create_small_page_aligned(len);
+ pbl->append(p);
+ char* buf = p.c_str();
+
+ make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ return 0;
+}
+
+int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
+{
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+
+ uint64_t aligned_off = p2align(off, block_size);
+ uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
+ dout(5) << __func__ << " " << off << "~" << len
+ << " aligned " << aligned_off << "~" << aligned_len << dendl;
+ IOContext ioc(g_ceph_context, nullptr);
+ Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
+
+ make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
+ aio_submit(&ioc);
+
+ return t.return_code;
+}
+
+int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
+{
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ return 0;
+}
diff --git a/src/blk/spdk/NVMEDevice.h b/src/blk/spdk/NVMEDevice.h
new file mode 100644
index 000000000..352856093
--- /dev/null
+++ b/src/blk/spdk/NVMEDevice.h
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_BLK_NVMEDEVICE
+#define CEPH_BLK_NVMEDEVICE
+
+#include <queue>
+#include <map>
+#include <limits>
+
+// since _Static_assert introduced in c11
+#define _Static_assert static_assert
+
+
+#include "include/interval_set.h"
+#include "common/ceph_time.h"
+#include "BlockDevice.h"
+
+enum class IOCommand {
+ READ_COMMAND,
+ WRITE_COMMAND,
+ FLUSH_COMMAND
+};
+
+class SharedDriverData;
+class SharedDriverQueueData;
+
+class NVMEDevice : public BlockDevice {
+ /**
+ * points to pinned, physically contiguous memory region;
+ * contains 4KB IDENTIFY structure for controller which is
+ * target for CONTROLLER IDENTIFY command during initialization
+ */
+ SharedDriverData *driver;
+ string name;
+
+ public:
+ std::atomic_int queue_number = {0};
+ SharedDriverData *get_driver() { return driver; }
+
+ NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
+
+ bool supported_bdev_label() override { return false; }
+
+ static bool support(const std::string& path);
+
+ void aio_submit(IOContext *ioc) override;
+
+ int read(uint64_t off, uint64_t len, bufferlist *pbl,
+ IOContext *ioc,
+ bool buffered) override;
+ int aio_read(
+ uint64_t off,
+ uint64_t len,
+ bufferlist *pbl,
+ IOContext *ioc) override;
+ int aio_write(uint64_t off, bufferlist& bl,
+ IOContext *ioc,
+ bool buffered,
+ int write_hint = WRITE_LIFE_NOT_SET) override;
+ int write(uint64_t off, bufferlist& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override;
+ int flush() override;
+ int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override;
+
+ // for managing buffered readers/writers
+ int invalidate_cache(uint64_t off, uint64_t len) override;
+ int open(const string& path) override;
+ void close() override;
+ int collect_metadata(const string& prefix, map<string,string> *pm) const override;
+};
+
+#endif