summaryrefslogtreecommitdiffstats
path: root/src/spdk/module/bdev/compress
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/spdk/module/bdev/compress
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/module/bdev/compress')
-rw-r--r--src/spdk/module/bdev/compress/Makefile48
-rw-r--r--src/spdk/module/bdev/compress/vbdev_compress.c1865
-rw-r--r--src/spdk/module/bdev/compress/vbdev_compress.h106
-rw-r--r--src/spdk/module/bdev/compress/vbdev_compress_rpc.c252
4 files changed, 2271 insertions, 0 deletions
diff --git a/src/spdk/module/bdev/compress/Makefile b/src/spdk/module/bdev/compress/Makefile
new file mode 100644
index 000000000..e3d889e67
--- /dev/null
+++ b/src/spdk/module/bdev/compress/Makefile
@@ -0,0 +1,48 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 2
+SO_MINOR := 0
+
+CFLAGS += -I$(SPDK_ROOT_DIR)/lib/bdev/
+
+C_SRCS = vbdev_compress.c vbdev_compress_rpc.c
+LIBNAME = bdev_compress
+CFLAGS += $(ENV_CFLAGS)
+
+SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/module/bdev/compress/vbdev_compress.c b/src/spdk/module/bdev/compress/vbdev_compress.c
new file mode 100644
index 000000000..a83c97c64
--- /dev/null
+++ b/src/spdk/module/bdev/compress/vbdev_compress.c
@@ -0,0 +1,1865 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "vbdev_compress.h"
+
+#include "spdk/reduce.h"
+#include "spdk/stdinc.h"
+#include "spdk/rpc.h"
+#include "spdk/env.h"
+#include "spdk/conf.h"
+#include "spdk/endian.h"
+#include "spdk/string.h"
+#include "spdk/thread.h"
+#include "spdk/util.h"
+#include "spdk/bdev_module.h"
+
+#include "spdk_internal/log.h"
+
+#include <rte_config.h>
+#include <rte_bus_vdev.h>
+#include <rte_compressdev.h>
+#include <rte_comp.h>
+
+#define NUM_MAX_XFORMS 2
+#define NUM_MAX_INFLIGHT_OPS 128
+#define DEFAULT_WINDOW_SIZE 15
+/* We need extra mbufs per operation to accommodate host buffers that
+ * span a 2MB boundary.
+ */
+#define MAX_MBUFS_PER_OP (REDUCE_MAX_IOVECS * 2)
+#define CHUNK_SIZE (1024 * 16)
+#define COMP_BDEV_NAME "compress"
+#define BACKING_IO_SZ (4 * 1024)
+
+#define ISAL_PMD "compress_isal"
+#define QAT_PMD "compress_qat"
+#define NUM_MBUFS 8192
+#define POOL_CACHE_SIZE 256
+
+static enum compress_pmd g_opts;
+
+/* Global list of available compression devices. */
+struct compress_dev {
+ struct rte_compressdev_info cdev_info; /* includes device friendly name */
+ uint8_t cdev_id; /* identifier for the device */
+ void *comp_xform; /* shared private xform for comp on this PMD */
+ void *decomp_xform; /* shared private xform for decomp on this PMD */
+ TAILQ_ENTRY(compress_dev) link;
+};
+static TAILQ_HEAD(, compress_dev) g_compress_devs = TAILQ_HEAD_INITIALIZER(g_compress_devs);
+
+/* Although ISAL PMD reports 'unlimited' qpairs, it has an unplanned limit of 99 due to
+ * the length of the internal ring name that it creates, it breaks a limit in the generic
+ * ring code and fails the qp initialization.
+ */
+#define MAX_NUM_QP 99
+/* Global list and lock for unique device/queue pair combos */
+struct comp_device_qp {
+ struct compress_dev *device; /* ptr to compression device */
+ uint8_t qp; /* queue pair for this node */
+ struct spdk_thread *thread; /* thead that this qp is assigned to */
+ TAILQ_ENTRY(comp_device_qp) link;
+};
+static TAILQ_HEAD(, comp_device_qp) g_comp_device_qp = TAILQ_HEAD_INITIALIZER(g_comp_device_qp);
+static pthread_mutex_t g_comp_device_qp_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* For queueing up compression operations that we can't submit for some reason */
+struct vbdev_comp_op {
+ struct spdk_reduce_backing_dev *backing_dev;
+ struct iovec *src_iovs;
+ int src_iovcnt;
+ struct iovec *dst_iovs;
+ int dst_iovcnt;
+ bool compress;
+ void *cb_arg;
+ TAILQ_ENTRY(vbdev_comp_op) link;
+};
+
+struct vbdev_comp_delete_ctx {
+ spdk_delete_compress_complete cb_fn;
+ void *cb_arg;
+ int cb_rc;
+ struct spdk_thread *orig_thread;
+};
+
+/* List of virtual bdevs and associated info for each. */
+struct vbdev_compress {
+ struct spdk_bdev *base_bdev; /* the thing we're attaching to */
+ struct spdk_bdev_desc *base_desc; /* its descriptor we get from open */
+ struct spdk_io_channel *base_ch; /* IO channel of base device */
+ struct spdk_bdev comp_bdev; /* the compression virtual bdev */
+ struct comp_io_channel *comp_ch; /* channel associated with this bdev */
+ char *drv_name; /* name of the compression device driver */
+ struct comp_device_qp *device_qp;
+ struct spdk_thread *reduce_thread;
+ pthread_mutex_t reduce_lock;
+ uint32_t ch_count;
+ TAILQ_HEAD(, spdk_bdev_io) pending_comp_ios; /* outstanding operations to a comp library */
+ struct spdk_poller *poller; /* completion poller */
+ struct spdk_reduce_vol_params params; /* params for the reduce volume */
+ struct spdk_reduce_backing_dev backing_dev; /* backing device info for the reduce volume */
+ struct spdk_reduce_vol *vol; /* the reduce volume */
+ struct vbdev_comp_delete_ctx *delete_ctx;
+ bool orphaned; /* base bdev claimed but comp_bdev not registered */
+ int reduce_errno;
+ TAILQ_HEAD(, vbdev_comp_op) queued_comp_ops;
+ TAILQ_ENTRY(vbdev_compress) link;
+ struct spdk_thread *thread; /* thread where base device is opened */
+};
+static TAILQ_HEAD(, vbdev_compress) g_vbdev_comp = TAILQ_HEAD_INITIALIZER(g_vbdev_comp);
+
+/* The comp vbdev channel struct. It is allocated and freed on my behalf by the io channel code.
+ */
+struct comp_io_channel {
+ struct spdk_io_channel_iter *iter; /* used with for_each_channel in reset */
+};
+
+/* Per I/O context for the compression vbdev. */
+struct comp_bdev_io {
+ struct comp_io_channel *comp_ch; /* used in completion handling */
+ struct vbdev_compress *comp_bdev; /* vbdev associated with this IO */
+ struct spdk_bdev_io_wait_entry bdev_io_wait; /* for bdev_io_wait */
+ struct spdk_bdev_io *orig_io; /* the original IO */
+ struct spdk_io_channel *ch; /* for resubmission */
+ int status; /* save for completion on orig thread */
+};
+
+/* Shared mempools between all devices on this system */
+static struct rte_mempool *g_mbuf_mp = NULL; /* mbuf mempool */
+static struct rte_mempool *g_comp_op_mp = NULL; /* comp operations, must be rte* mempool */
+static struct rte_mbuf_ext_shared_info g_shinfo = {}; /* used by DPDK mbuf macros */
+static bool g_qat_available = false;
+static bool g_isal_available = false;
+
+/* Create shared (between all ops per PMD) compress xforms. */
+static struct rte_comp_xform g_comp_xform = {
+ .type = RTE_COMP_COMPRESS,
+ .compress = {
+ .algo = RTE_COMP_ALGO_DEFLATE,
+ .deflate.huffman = RTE_COMP_HUFFMAN_DEFAULT,
+ .level = RTE_COMP_LEVEL_MAX,
+ .window_size = DEFAULT_WINDOW_SIZE,
+ .chksum = RTE_COMP_CHECKSUM_NONE,
+ .hash_algo = RTE_COMP_HASH_ALGO_NONE
+ }
+};
+/* Create shared (between all ops per PMD) decompress xforms. */
+static struct rte_comp_xform g_decomp_xform = {
+ .type = RTE_COMP_DECOMPRESS,
+ .decompress = {
+ .algo = RTE_COMP_ALGO_DEFLATE,
+ .chksum = RTE_COMP_CHECKSUM_NONE,
+ .window_size = DEFAULT_WINDOW_SIZE,
+ .hash_algo = RTE_COMP_HASH_ALGO_NONE
+ }
+};
+
+static void vbdev_compress_examine(struct spdk_bdev *bdev);
+static void vbdev_compress_claim(struct vbdev_compress *comp_bdev);
+static void vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io);
+struct vbdev_compress *_prepare_for_load_init(struct spdk_bdev *bdev, uint32_t lb_size);
+static void vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io);
+static void comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf);
+static void vbdev_compress_delete_done(void *cb_arg, int bdeverrno);
+
+/* Dummy function used by DPDK to free ext attached buffers
+ * to mbufs, we free them ourselves but this callback has to
+ * be here.
+ */
+static void
+shinfo_free_cb(void *arg1, void *arg2)
+{
+}
+
+/* Called by vbdev_init_compress_drivers() to init each discovered compression device */
+static int
+create_compress_dev(uint8_t index)
+{
+ struct compress_dev *device;
+ uint16_t q_pairs;
+ uint8_t cdev_id;
+ int rc, i;
+ struct comp_device_qp *dev_qp;
+ struct comp_device_qp *tmp_qp;
+
+ device = calloc(1, sizeof(struct compress_dev));
+ if (!device) {
+ return -ENOMEM;
+ }
+
+ /* Get details about this device. */
+ rte_compressdev_info_get(index, &device->cdev_info);
+
+ cdev_id = device->cdev_id = index;
+
+ /* Zero means no limit so choose number of lcores. */
+ if (device->cdev_info.max_nb_queue_pairs == 0) {
+ q_pairs = MAX_NUM_QP;
+ } else {
+ q_pairs = spdk_min(device->cdev_info.max_nb_queue_pairs, MAX_NUM_QP);
+ }
+
+ /* Configure the compression device. */
+ struct rte_compressdev_config config = {
+ .socket_id = rte_socket_id(),
+ .nb_queue_pairs = q_pairs,
+ .max_nb_priv_xforms = NUM_MAX_XFORMS,
+ .max_nb_streams = 0
+ };
+ rc = rte_compressdev_configure(cdev_id, &config);
+ if (rc < 0) {
+ SPDK_ERRLOG("Failed to configure compressdev %u\n", cdev_id);
+ goto err;
+ }
+
+ /* Pre-setup all potential qpairs now and assign them in the channel
+ * callback.
+ */
+ for (i = 0; i < q_pairs; i++) {
+ rc = rte_compressdev_queue_pair_setup(cdev_id, i,
+ NUM_MAX_INFLIGHT_OPS,
+ rte_socket_id());
+ if (rc) {
+ if (i > 0) {
+ q_pairs = i;
+ SPDK_NOTICELOG("FYI failed to setup a queue pair on "
+ "compressdev %u with error %u "
+ "so limiting to %u qpairs\n",
+ cdev_id, rc, q_pairs);
+ break;
+ } else {
+ SPDK_ERRLOG("Failed to setup queue pair on "
+ "compressdev %u with error %u\n", cdev_id, rc);
+ rc = -EINVAL;
+ goto err;
+ }
+ }
+ }
+
+ rc = rte_compressdev_start(cdev_id);
+ if (rc < 0) {
+ SPDK_ERRLOG("Failed to start device %u: error %d\n",
+ cdev_id, rc);
+ goto err;
+ }
+
+ if (device->cdev_info.capabilities->comp_feature_flags & RTE_COMP_FF_SHAREABLE_PRIV_XFORM) {
+ rc = rte_compressdev_private_xform_create(cdev_id, &g_comp_xform,
+ &device->comp_xform);
+ if (rc < 0) {
+ SPDK_ERRLOG("Failed to create private comp xform device %u: error %d\n",
+ cdev_id, rc);
+ goto err;
+ }
+
+ rc = rte_compressdev_private_xform_create(cdev_id, &g_decomp_xform,
+ &device->decomp_xform);
+ if (rc) {
+ SPDK_ERRLOG("Failed to create private decomp xform device %u: error %d\n",
+ cdev_id, rc);
+ goto err;
+ }
+ } else {
+ SPDK_ERRLOG("PMD does not support shared transforms\n");
+ goto err;
+ }
+
+ /* Build up list of device/qp combinations */
+ for (i = 0; i < q_pairs; i++) {
+ dev_qp = calloc(1, sizeof(struct comp_device_qp));
+ if (!dev_qp) {
+ rc = -ENOMEM;
+ goto err;
+ }
+ dev_qp->device = device;
+ dev_qp->qp = i;
+ dev_qp->thread = NULL;
+ TAILQ_INSERT_TAIL(&g_comp_device_qp, dev_qp, link);
+ }
+
+ TAILQ_INSERT_TAIL(&g_compress_devs, device, link);
+
+ if (strcmp(device->cdev_info.driver_name, QAT_PMD) == 0) {
+ g_qat_available = true;
+ }
+ if (strcmp(device->cdev_info.driver_name, ISAL_PMD) == 0) {
+ g_isal_available = true;
+ }
+
+ return 0;
+
+err:
+ TAILQ_FOREACH_SAFE(dev_qp, &g_comp_device_qp, link, tmp_qp) {
+ TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
+ free(dev_qp);
+ }
+ free(device);
+ return rc;
+}
+
+/* Called from driver init entry point, vbdev_compress_init() */
+static int
+vbdev_init_compress_drivers(void)
+{
+ uint8_t cdev_count, i;
+ struct compress_dev *tmp_dev;
+ struct compress_dev *device;
+ int rc;
+
+ /* We always init the compress_isal PMD */
+ rc = rte_vdev_init(ISAL_PMD, NULL);
+ if (rc == 0) {
+ SPDK_NOTICELOG("created virtual PMD %s\n", ISAL_PMD);
+ } else if (rc == -EEXIST) {
+ SPDK_NOTICELOG("virtual PMD %s already exists.\n", ISAL_PMD);
+ } else {
+ SPDK_ERRLOG("creating virtual PMD %s\n", ISAL_PMD);
+ return -EINVAL;
+ }
+
+ /* If we have no compression devices, there's no reason to continue. */
+ cdev_count = rte_compressdev_count();
+ if (cdev_count == 0) {
+ return 0;
+ }
+ if (cdev_count > RTE_COMPRESS_MAX_DEVS) {
+ SPDK_ERRLOG("invalid device count from rte_compressdev_count()\n");
+ return -EINVAL;
+ }
+
+ g_mbuf_mp = rte_pktmbuf_pool_create("comp_mbuf_mp", NUM_MBUFS, POOL_CACHE_SIZE,
+ sizeof(struct rte_mbuf), 0, rte_socket_id());
+ if (g_mbuf_mp == NULL) {
+ SPDK_ERRLOG("Cannot create mbuf pool\n");
+ rc = -ENOMEM;
+ goto error_create_mbuf;
+ }
+
+ g_comp_op_mp = rte_comp_op_pool_create("comp_op_pool", NUM_MBUFS, POOL_CACHE_SIZE,
+ 0, rte_socket_id());
+ if (g_comp_op_mp == NULL) {
+ SPDK_ERRLOG("Cannot create comp op pool\n");
+ rc = -ENOMEM;
+ goto error_create_op;
+ }
+
+ /* Init all devices */
+ for (i = 0; i < cdev_count; i++) {
+ rc = create_compress_dev(i);
+ if (rc != 0) {
+ goto error_create_compress_devs;
+ }
+ }
+
+ if (g_qat_available == true) {
+ SPDK_NOTICELOG("initialized QAT PMD\n");
+ }
+
+ g_shinfo.free_cb = shinfo_free_cb;
+
+ return 0;
+
+ /* Error cleanup paths. */
+error_create_compress_devs:
+ TAILQ_FOREACH_SAFE(device, &g_compress_devs, link, tmp_dev) {
+ TAILQ_REMOVE(&g_compress_devs, device, link);
+ free(device);
+ }
+error_create_op:
+error_create_mbuf:
+ rte_mempool_free(g_mbuf_mp);
+
+ return rc;
+}
+
+/* for completing rw requests on the orig IO thread. */
+static void
+_reduce_rw_blocks_cb(void *arg)
+{
+ struct comp_bdev_io *io_ctx = arg;
+
+ if (io_ctx->status == 0) {
+ spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
+ } else {
+ SPDK_ERRLOG("status %d on operation from reduce API\n", io_ctx->status);
+ spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
+ }
+}
+
+/* Completion callback for r/w that were issued via reducelib. */
+static void
+reduce_rw_blocks_cb(void *arg, int reduce_errno)
+{
+ struct spdk_bdev_io *bdev_io = arg;
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
+ struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
+ struct spdk_thread *orig_thread;
+
+ /* TODO: need to decide which error codes are bdev_io success vs failure;
+ * example examine calls reading metadata */
+
+ io_ctx->status = reduce_errno;
+
+ /* Send this request to the orig IO thread. */
+ orig_thread = spdk_io_channel_get_thread(ch);
+ if (orig_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(orig_thread, _reduce_rw_blocks_cb, io_ctx);
+ } else {
+ _reduce_rw_blocks_cb(io_ctx);
+ }
+}
+
+static uint64_t
+_setup_compress_mbuf(struct rte_mbuf **mbufs, int *mbuf_total, uint64_t *total_length,
+ struct iovec *iovs, int iovcnt, void *reduce_cb_arg)
+{
+ uint64_t updated_length, remainder, phys_addr;
+ uint8_t *current_base = NULL;
+ int iov_index, mbuf_index;
+ int rc = 0;
+
+ /* Setup mbufs */
+ iov_index = mbuf_index = 0;
+ while (iov_index < iovcnt) {
+
+ current_base = iovs[iov_index].iov_base;
+ if (total_length) {
+ *total_length += iovs[iov_index].iov_len;
+ }
+ assert(mbufs[mbuf_index] != NULL);
+ mbufs[mbuf_index]->userdata = reduce_cb_arg;
+ updated_length = iovs[iov_index].iov_len;
+ phys_addr = spdk_vtophys((void *)current_base, &updated_length);
+
+ rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
+ current_base,
+ phys_addr,
+ updated_length,
+ &g_shinfo);
+ rte_pktmbuf_append(mbufs[mbuf_index], updated_length);
+ remainder = iovs[iov_index].iov_len - updated_length;
+
+ if (mbuf_index > 0) {
+ rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
+ }
+
+ /* If we crossed 2 2MB boundary we need another mbuf for the remainder */
+ if (remainder > 0) {
+ /* allocate an mbuf at the end of the array */
+ rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp,
+ (struct rte_mbuf **)&mbufs[*mbuf_total], 1);
+ if (rc) {
+ SPDK_ERRLOG("ERROR trying to get an extra mbuf!\n");
+ return -1;
+ }
+ (*mbuf_total)++;
+ mbuf_index++;
+ mbufs[mbuf_index]->userdata = reduce_cb_arg;
+ current_base += updated_length;
+ phys_addr = spdk_vtophys((void *)current_base, &remainder);
+ /* assert we don't cross another */
+ assert(remainder == iovs[iov_index].iov_len - updated_length);
+
+ rte_pktmbuf_attach_extbuf(mbufs[mbuf_index],
+ current_base,
+ phys_addr,
+ remainder,
+ &g_shinfo);
+ rte_pktmbuf_append(mbufs[mbuf_index], remainder);
+ rte_pktmbuf_chain(mbufs[0], mbufs[mbuf_index]);
+ }
+ iov_index++;
+ mbuf_index++;
+ }
+
+ return 0;
+}
+
+static int
+_compress_operation(struct spdk_reduce_backing_dev *backing_dev, struct iovec *src_iovs,
+ int src_iovcnt, struct iovec *dst_iovs,
+ int dst_iovcnt, bool compress, void *cb_arg)
+{
+ void *reduce_cb_arg = cb_arg;
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(backing_dev, struct vbdev_compress,
+ backing_dev);
+ struct rte_comp_op *comp_op;
+ struct rte_mbuf *src_mbufs[MAX_MBUFS_PER_OP];
+ struct rte_mbuf *dst_mbufs[MAX_MBUFS_PER_OP];
+ uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
+ uint64_t total_length = 0;
+ int rc = 0;
+ struct vbdev_comp_op *op_to_queue;
+ int i;
+ int src_mbuf_total = src_iovcnt;
+ int dst_mbuf_total = dst_iovcnt;
+ bool device_error = false;
+
+ assert(src_iovcnt < MAX_MBUFS_PER_OP);
+
+#ifdef DEBUG
+ memset(src_mbufs, 0, sizeof(src_mbufs));
+ memset(dst_mbufs, 0, sizeof(dst_mbufs));
+#endif
+
+ comp_op = rte_comp_op_alloc(g_comp_op_mp);
+ if (!comp_op) {
+ SPDK_ERRLOG("trying to get a comp op!\n");
+ goto error_get_op;
+ }
+
+ /* get an mbuf per iov, src and dst */
+ rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&src_mbufs[0], src_iovcnt);
+ if (rc) {
+ SPDK_ERRLOG("ERROR trying to get src_mbufs!\n");
+ goto error_get_src;
+ }
+
+ rc = rte_pktmbuf_alloc_bulk(g_mbuf_mp, (struct rte_mbuf **)&dst_mbufs[0], dst_iovcnt);
+ if (rc) {
+ SPDK_ERRLOG("ERROR trying to get dst_mbufs!\n");
+ goto error_get_dst;
+ }
+
+ /* There is a 1:1 mapping between a bdev_io and a compression operation, but
+ * all compression PMDs that SPDK uses support chaining so build our mbuf chain
+ * and associate with our single comp_op.
+ */
+
+ rc = _setup_compress_mbuf(&src_mbufs[0], &src_mbuf_total, &total_length,
+ src_iovs, src_iovcnt, reduce_cb_arg);
+ if (rc < 0) {
+ goto error_src_dst;
+ }
+
+ comp_op->m_src = src_mbufs[0];
+ comp_op->src.offset = 0;
+ comp_op->src.length = total_length;
+
+ /* setup dst mbufs, for the current test being used with this code there's only one vector */
+ rc = _setup_compress_mbuf(&dst_mbufs[0], &dst_mbuf_total, NULL,
+ dst_iovs, dst_iovcnt, reduce_cb_arg);
+ if (rc < 0) {
+ goto error_src_dst;
+ }
+
+ comp_op->m_dst = dst_mbufs[0];
+ comp_op->dst.offset = 0;
+
+ if (compress == true) {
+ comp_op->private_xform = comp_bdev->device_qp->device->comp_xform;
+ } else {
+ comp_op->private_xform = comp_bdev->device_qp->device->decomp_xform;
+ }
+
+ comp_op->op_type = RTE_COMP_OP_STATELESS;
+ comp_op->flush_flag = RTE_COMP_FLUSH_FINAL;
+
+ rc = rte_compressdev_enqueue_burst(cdev_id, comp_bdev->device_qp->qp, &comp_op, 1);
+ assert(rc <= 1);
+
+ /* We always expect 1 got queued, if 0 then we need to queue it up. */
+ if (rc == 1) {
+ return 0;
+ } else if (comp_op->status == RTE_COMP_OP_STATUS_NOT_PROCESSED) {
+ /* we free mbufs differently depending on whether they were chained or not */
+ rte_pktmbuf_free(comp_op->m_src);
+ rte_pktmbuf_free(comp_op->m_dst);
+ goto error_enqueue;
+ } else {
+ device_error = true;
+ goto error_src_dst;
+ }
+
+ /* Error cleanup paths. */
+error_src_dst:
+ for (i = 0; i < dst_mbuf_total; i++) {
+ rte_pktmbuf_free((struct rte_mbuf *)&dst_mbufs[i]);
+ }
+error_get_dst:
+ for (i = 0; i < src_mbuf_total; i++) {
+ rte_pktmbuf_free((struct rte_mbuf *)&src_mbufs[i]);
+ }
+error_get_src:
+error_enqueue:
+ rte_comp_op_free(comp_op);
+error_get_op:
+
+ if (device_error == true) {
+ /* There was an error sending the op to the device, most
+ * likely with the parameters.
+ */
+ SPDK_ERRLOG("Compression API returned 0x%x\n", comp_op->status);
+ return -EINVAL;
+ }
+
+ op_to_queue = calloc(1, sizeof(struct vbdev_comp_op));
+ if (op_to_queue == NULL) {
+ SPDK_ERRLOG("unable to allocate operation for queueing.\n");
+ return -ENOMEM;
+ }
+ op_to_queue->backing_dev = backing_dev;
+ op_to_queue->src_iovs = src_iovs;
+ op_to_queue->src_iovcnt = src_iovcnt;
+ op_to_queue->dst_iovs = dst_iovs;
+ op_to_queue->dst_iovcnt = dst_iovcnt;
+ op_to_queue->compress = compress;
+ op_to_queue->cb_arg = cb_arg;
+ TAILQ_INSERT_TAIL(&comp_bdev->queued_comp_ops,
+ op_to_queue,
+ link);
+ return 0;
+}
+
+/* Poller for the DPDK compression driver. */
+static int
+comp_dev_poller(void *args)
+{
+ struct vbdev_compress *comp_bdev = args;
+ uint8_t cdev_id = comp_bdev->device_qp->device->cdev_id;
+ struct rte_comp_op *deq_ops[NUM_MAX_INFLIGHT_OPS];
+ uint16_t num_deq;
+ struct spdk_reduce_vol_cb_args *reduce_args;
+ struct vbdev_comp_op *op_to_resubmit;
+ int rc, i;
+
+ num_deq = rte_compressdev_dequeue_burst(cdev_id, comp_bdev->device_qp->qp, deq_ops,
+ NUM_MAX_INFLIGHT_OPS);
+ for (i = 0; i < num_deq; i++) {
+ reduce_args = (struct spdk_reduce_vol_cb_args *)deq_ops[i]->m_src->userdata;
+
+ if (deq_ops[i]->status == RTE_COMP_OP_STATUS_SUCCESS) {
+
+ /* tell reduce this is done and what the bytecount was */
+ reduce_args->cb_fn(reduce_args->cb_arg, deq_ops[i]->produced);
+ } else {
+ SPDK_NOTICELOG("FYI storing data uncompressed due to deque status %u\n",
+ deq_ops[i]->status);
+
+ /* Reduce will simply store uncompressed on neg errno value. */
+ reduce_args->cb_fn(reduce_args->cb_arg, -EINVAL);
+ }
+
+ /* Now free both mbufs and the compress operation. The rte_pktmbuf_free()
+ * call takes care of freeing all of the mbufs in the chain back to their
+ * original pool.
+ */
+ rte_pktmbuf_free(deq_ops[i]->m_src);
+ rte_pktmbuf_free(deq_ops[i]->m_dst);
+
+ /* There is no bulk free for com ops so we have to free them one at a time
+ * here however it would be rare that we'd ever have more than 1 at a time
+ * anyways.
+ */
+ rte_comp_op_free(deq_ops[i]);
+
+ /* Check if there are any pending comp ops to process, only pull one
+ * at a time off as _compress_operation() may re-queue the op.
+ */
+ if (!TAILQ_EMPTY(&comp_bdev->queued_comp_ops)) {
+ op_to_resubmit = TAILQ_FIRST(&comp_bdev->queued_comp_ops);
+ rc = _compress_operation(op_to_resubmit->backing_dev,
+ op_to_resubmit->src_iovs,
+ op_to_resubmit->src_iovcnt,
+ op_to_resubmit->dst_iovs,
+ op_to_resubmit->dst_iovcnt,
+ op_to_resubmit->compress,
+ op_to_resubmit->cb_arg);
+ if (rc == 0) {
+ TAILQ_REMOVE(&comp_bdev->queued_comp_ops, op_to_resubmit, link);
+ free(op_to_resubmit);
+ }
+ }
+ }
+ return num_deq == 0 ? SPDK_POLLER_IDLE : SPDK_POLLER_BUSY;
+}
+
+/* Entry point for reduce lib to issue a compress operation. */
+static void
+_comp_reduce_compress(struct spdk_reduce_backing_dev *dev,
+ struct iovec *src_iovs, int src_iovcnt,
+ struct iovec *dst_iovs, int dst_iovcnt,
+ struct spdk_reduce_vol_cb_args *cb_arg)
+{
+ int rc;
+
+ rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, true, cb_arg);
+ if (rc) {
+ SPDK_ERRLOG("with compress operation code %d (%s)\n", rc, spdk_strerror(-rc));
+ cb_arg->cb_fn(cb_arg->cb_arg, rc);
+ }
+}
+
+/* Entry point for reduce lib to issue a decompress operation. */
+static void
+_comp_reduce_decompress(struct spdk_reduce_backing_dev *dev,
+ struct iovec *src_iovs, int src_iovcnt,
+ struct iovec *dst_iovs, int dst_iovcnt,
+ struct spdk_reduce_vol_cb_args *cb_arg)
+{
+ int rc;
+
+ rc = _compress_operation(dev, src_iovs, src_iovcnt, dst_iovs, dst_iovcnt, false, cb_arg);
+ if (rc) {
+ SPDK_ERRLOG("with decompress operation code %d (%s)\n", rc, spdk_strerror(-rc));
+ cb_arg->cb_fn(cb_arg->cb_arg, rc);
+ }
+}
+
+/* Callback for getting a buf from the bdev pool in the event that the caller passed
+ * in NULL, we need to own the buffer so it doesn't get freed by another vbdev module
+ * beneath us before we're done with it.
+ */
+static void
+comp_read_get_buf_cb(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io, bool success)
+{
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
+ comp_bdev);
+
+ spdk_reduce_vol_readv(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
+ bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
+ reduce_rw_blocks_cb, bdev_io);
+}
+
+/* scheduled for completion on IO thread */
+static void
+_complete_other_io(void *arg)
+{
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)arg;
+ if (io_ctx->status == 0) {
+ spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_SUCCESS);
+ } else {
+ spdk_bdev_io_complete(io_ctx->orig_io, SPDK_BDEV_IO_STATUS_FAILED);
+ }
+}
+
+/* scheduled for submission on reduce thread */
+static void
+_comp_bdev_io_submit(void *arg)
+{
+ struct spdk_bdev_io *bdev_io = arg;
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
+ struct spdk_io_channel *ch = spdk_io_channel_from_ctx(io_ctx->comp_ch);
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
+ comp_bdev);
+ struct spdk_thread *orig_thread;
+ int rc = 0;
+
+ switch (bdev_io->type) {
+ case SPDK_BDEV_IO_TYPE_READ:
+ spdk_bdev_io_get_buf(bdev_io, comp_read_get_buf_cb,
+ bdev_io->u.bdev.num_blocks * bdev_io->bdev->blocklen);
+ return;
+ case SPDK_BDEV_IO_TYPE_WRITE:
+ spdk_reduce_vol_writev(comp_bdev->vol, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt,
+ bdev_io->u.bdev.offset_blocks, bdev_io->u.bdev.num_blocks,
+ reduce_rw_blocks_cb, bdev_io);
+ return;
+ /* TODO in future patch in the series */
+ case SPDK_BDEV_IO_TYPE_RESET:
+ break;
+ case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
+ case SPDK_BDEV_IO_TYPE_UNMAP:
+ case SPDK_BDEV_IO_TYPE_FLUSH:
+ default:
+ SPDK_ERRLOG("Unknown I/O type %d\n", bdev_io->type);
+ rc = -EINVAL;
+ }
+
+ if (rc) {
+ if (rc == -ENOMEM) {
+ SPDK_ERRLOG("No memory, start to queue io for compress.\n");
+ io_ctx->ch = ch;
+ vbdev_compress_queue_io(bdev_io);
+ return;
+ } else {
+ SPDK_ERRLOG("on bdev_io submission!\n");
+ io_ctx->status = rc;
+ }
+ }
+
+ /* Complete this on the orig IO thread. */
+ orig_thread = spdk_io_channel_get_thread(ch);
+ if (orig_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(orig_thread, _complete_other_io, io_ctx);
+ } else {
+ _complete_other_io(io_ctx);
+ }
+}
+
+/* Called when someone above submits IO to this vbdev. */
+static void
+vbdev_compress_submit_request(struct spdk_io_channel *ch, struct spdk_bdev_io *bdev_io)
+{
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(bdev_io->bdev, struct vbdev_compress,
+ comp_bdev);
+ struct comp_io_channel *comp_ch = spdk_io_channel_get_ctx(ch);
+
+ memset(io_ctx, 0, sizeof(struct comp_bdev_io));
+ io_ctx->comp_bdev = comp_bdev;
+ io_ctx->comp_ch = comp_ch;
+ io_ctx->orig_io = bdev_io;
+
+ /* Send this request to the reduce_thread if that's not what we're on. */
+ if (spdk_get_thread() != comp_bdev->reduce_thread) {
+ spdk_thread_send_msg(comp_bdev->reduce_thread, _comp_bdev_io_submit, bdev_io);
+ } else {
+ _comp_bdev_io_submit(bdev_io);
+ }
+}
+
+static bool
+vbdev_compress_io_type_supported(void *ctx, enum spdk_bdev_io_type io_type)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
+
+ switch (io_type) {
+ case SPDK_BDEV_IO_TYPE_READ:
+ case SPDK_BDEV_IO_TYPE_WRITE:
+ return spdk_bdev_io_type_supported(comp_bdev->base_bdev, io_type);
+ case SPDK_BDEV_IO_TYPE_UNMAP:
+ case SPDK_BDEV_IO_TYPE_RESET:
+ case SPDK_BDEV_IO_TYPE_FLUSH:
+ case SPDK_BDEV_IO_TYPE_WRITE_ZEROES:
+ default:
+ return false;
+ }
+}
+
+/* Resubmission function used by the bdev layer when a queued IO is ready to be
+ * submitted.
+ */
+static void
+vbdev_compress_resubmit_io(void *arg)
+{
+ struct spdk_bdev_io *bdev_io = (struct spdk_bdev_io *)arg;
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
+
+ vbdev_compress_submit_request(io_ctx->ch, bdev_io);
+}
+
+/* Used to queue an IO in the event of resource issues. */
+static void
+vbdev_compress_queue_io(struct spdk_bdev_io *bdev_io)
+{
+ struct comp_bdev_io *io_ctx = (struct comp_bdev_io *)bdev_io->driver_ctx;
+ int rc;
+
+ io_ctx->bdev_io_wait.bdev = bdev_io->bdev;
+ io_ctx->bdev_io_wait.cb_fn = vbdev_compress_resubmit_io;
+ io_ctx->bdev_io_wait.cb_arg = bdev_io;
+
+ rc = spdk_bdev_queue_io_wait(bdev_io->bdev, io_ctx->comp_bdev->base_ch, &io_ctx->bdev_io_wait);
+ if (rc) {
+ SPDK_ERRLOG("Queue io failed in vbdev_compress_queue_io, rc=%d.\n", rc);
+ assert(false);
+ spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED);
+ }
+}
+
+/* Callback for unregistering the IO device. */
+static void
+_device_unregister_cb(void *io_device)
+{
+ struct vbdev_compress *comp_bdev = io_device;
+
+ /* Done with this comp_bdev. */
+ pthread_mutex_destroy(&comp_bdev->reduce_lock);
+ free(comp_bdev->comp_bdev.name);
+ free(comp_bdev);
+}
+
+static void
+_vbdev_compress_destruct_cb(void *ctx)
+{
+ struct vbdev_compress *comp_bdev = ctx;
+
+ TAILQ_REMOVE(&g_vbdev_comp, comp_bdev, link);
+ spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
+ /* Close the underlying bdev on its same opened thread. */
+ spdk_bdev_close(comp_bdev->base_desc);
+ comp_bdev->vol = NULL;
+ if (comp_bdev->orphaned == false) {
+ spdk_io_device_unregister(comp_bdev, _device_unregister_cb);
+ } else {
+ vbdev_compress_delete_done(comp_bdev->delete_ctx, 0);
+ _device_unregister_cb(comp_bdev);
+ }
+}
+
+static void
+vbdev_compress_destruct_cb(void *cb_arg, int reduce_errno)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
+
+ if (reduce_errno) {
+ SPDK_ERRLOG("number %d\n", reduce_errno);
+ } else {
+ if (comp_bdev->thread && comp_bdev->thread != spdk_get_thread()) {
+ spdk_thread_send_msg(comp_bdev->thread,
+ _vbdev_compress_destruct_cb, comp_bdev);
+ } else {
+ _vbdev_compress_destruct_cb(comp_bdev);
+ }
+ }
+}
+
+static void
+_reduce_destroy_cb(void *ctx, int reduce_errno)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
+
+ if (reduce_errno) {
+ SPDK_ERRLOG("number %d\n", reduce_errno);
+ }
+
+ comp_bdev->vol = NULL;
+ spdk_put_io_channel(comp_bdev->base_ch);
+ if (comp_bdev->orphaned == false) {
+ spdk_bdev_unregister(&comp_bdev->comp_bdev, vbdev_compress_delete_done,
+ comp_bdev->delete_ctx);
+ } else {
+ vbdev_compress_destruct_cb((void *)comp_bdev, 0);
+ }
+
+}
+
+static void
+_delete_vol_unload_cb(void *ctx)
+{
+ struct vbdev_compress *comp_bdev = ctx;
+
+ /* FIXME: Assert if these conditions are not satisified for now. */
+ assert(!comp_bdev->reduce_thread ||
+ comp_bdev->reduce_thread == spdk_get_thread());
+
+ /* reducelib needs a channel to comm with the backing device */
+ comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
+
+ /* Clean the device before we free our resources. */
+ spdk_reduce_vol_destroy(&comp_bdev->backing_dev, _reduce_destroy_cb, comp_bdev);
+}
+
+/* Called by reduceLib after performing unload vol actions */
+static void
+delete_vol_unload_cb(void *cb_arg, int reduce_errno)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
+
+ if (reduce_errno) {
+ SPDK_ERRLOG("number %d\n", reduce_errno);
+ /* FIXME: callback should be executed. */
+ return;
+ }
+
+ pthread_mutex_lock(&comp_bdev->reduce_lock);
+ if (comp_bdev->reduce_thread && comp_bdev->reduce_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(comp_bdev->reduce_thread,
+ _delete_vol_unload_cb, comp_bdev);
+ pthread_mutex_unlock(&comp_bdev->reduce_lock);
+ } else {
+ pthread_mutex_unlock(&comp_bdev->reduce_lock);
+
+ _delete_vol_unload_cb(comp_bdev);
+ }
+}
+
+const char *
+compress_get_name(const struct vbdev_compress *comp_bdev)
+{
+ return comp_bdev->comp_bdev.name;
+}
+
+struct vbdev_compress *
+compress_bdev_first(void)
+{
+ struct vbdev_compress *comp_bdev;
+
+ comp_bdev = TAILQ_FIRST(&g_vbdev_comp);
+
+ return comp_bdev;
+}
+
+struct vbdev_compress *
+compress_bdev_next(struct vbdev_compress *prev)
+{
+ struct vbdev_compress *comp_bdev;
+
+ comp_bdev = TAILQ_NEXT(prev, link);
+
+ return comp_bdev;
+}
+
+bool
+compress_has_orphan(const char *name)
+{
+ struct vbdev_compress *comp_bdev;
+
+ TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
+ if (comp_bdev->orphaned && strcmp(name, comp_bdev->comp_bdev.name) == 0) {
+ return true;
+ }
+ }
+ return false;
+}
+
+/* Called after we've unregistered following a hot remove callback.
+ * Our finish entry point will be called next.
+ */
+static int
+vbdev_compress_destruct(void *ctx)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
+
+ if (comp_bdev->vol != NULL) {
+ /* Tell reducelib that we're done with this volume. */
+ spdk_reduce_vol_unload(comp_bdev->vol, vbdev_compress_destruct_cb, comp_bdev);
+ } else {
+ vbdev_compress_destruct_cb(comp_bdev, 0);
+ }
+
+ return 0;
+}
+
+/* We supplied this as an entry point for upper layers who want to communicate to this
+ * bdev. This is how they get a channel.
+ */
+static struct spdk_io_channel *
+vbdev_compress_get_io_channel(void *ctx)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
+
+ /* The IO channel code will allocate a channel for us which consists of
+ * the SPDK channel structure plus the size of our comp_io_channel struct
+ * that we passed in when we registered our IO device. It will then call
+ * our channel create callback to populate any elements that we need to
+ * update.
+ */
+ return spdk_get_io_channel(comp_bdev);
+}
+
+/* This is the output for bdev_get_bdevs() for this vbdev */
+static int
+vbdev_compress_dump_info_json(void *ctx, struct spdk_json_write_ctx *w)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)ctx;
+
+ spdk_json_write_name(w, "compress");
+ spdk_json_write_object_begin(w);
+ spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
+ spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
+ spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
+ spdk_json_write_object_end(w);
+
+ return 0;
+}
+
+/* This is used to generate JSON that can configure this module to its current state. */
+static int
+vbdev_compress_config_json(struct spdk_json_write_ctx *w)
+{
+ struct vbdev_compress *comp_bdev;
+
+ TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
+ spdk_json_write_object_begin(w);
+ spdk_json_write_named_string(w, "method", "bdev_compress_create");
+ spdk_json_write_named_object_begin(w, "params");
+ spdk_json_write_named_string(w, "base_bdev_name", spdk_bdev_get_name(comp_bdev->base_bdev));
+ spdk_json_write_named_string(w, "name", spdk_bdev_get_name(&comp_bdev->comp_bdev));
+ spdk_json_write_named_string(w, "compression_pmd", comp_bdev->drv_name);
+ spdk_json_write_object_end(w);
+ spdk_json_write_object_end(w);
+ }
+ return 0;
+}
+
+static void
+_vbdev_reduce_init_cb(void *ctx)
+{
+ struct vbdev_compress *meta_ctx = ctx;
+
+ /* We're done with metadata operations */
+ spdk_put_io_channel(meta_ctx->base_ch);
+ /* Close the underlying bdev on its same opened thread. */
+ spdk_bdev_close(meta_ctx->base_desc);
+ meta_ctx->base_desc = NULL;
+
+ if (meta_ctx->vol) {
+ vbdev_compress_claim(meta_ctx);
+ } else {
+ free(meta_ctx);
+ }
+}
+
+/* Callback from reduce for when init is complete. We'll pass the vbdev_comp struct
+ * used for initial metadata operations to claim where it will be further filled out
+ * and added to the global list.
+ */
+static void
+vbdev_reduce_init_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
+{
+ struct vbdev_compress *meta_ctx = cb_arg;
+
+ if (reduce_errno == 0) {
+ meta_ctx->vol = vol;
+ } else {
+ SPDK_ERRLOG("for vol %s, error %u\n",
+ spdk_bdev_get_name(meta_ctx->base_bdev), reduce_errno);
+ }
+
+ if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
+ spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_init_cb, meta_ctx);
+ } else {
+ _vbdev_reduce_init_cb(meta_ctx);
+ }
+}
+
+/* Callback for the function used by reduceLib to perform IO to/from the backing device. We just
+ * call the callback provided by reduceLib when it called the read/write/unmap function and
+ * free the bdev_io.
+ */
+static void
+comp_reduce_io_cb(struct spdk_bdev_io *bdev_io, bool success, void *arg)
+{
+ struct spdk_reduce_vol_cb_args *cb_args = arg;
+ int reduce_errno;
+
+ if (success) {
+ reduce_errno = 0;
+ } else {
+ reduce_errno = -EIO;
+ }
+ spdk_bdev_free_io(bdev_io);
+ cb_args->cb_fn(cb_args->cb_arg, reduce_errno);
+}
+
+/* This is the function provided to the reduceLib for sending reads directly to
+ * the backing device.
+ */
+static void
+_comp_reduce_readv(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
+ uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
+{
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
+ backing_dev);
+ int rc;
+
+ rc = spdk_bdev_readv_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
+ iov, iovcnt, lba, lba_count,
+ comp_reduce_io_cb,
+ args);
+ if (rc) {
+ if (rc == -ENOMEM) {
+ SPDK_ERRLOG("No memory, start to queue io.\n");
+ /* TODO: there's no bdev_io to queue */
+ } else {
+ SPDK_ERRLOG("submitting readv request\n");
+ }
+ args->cb_fn(args->cb_arg, rc);
+ }
+}
+
+/* This is the function provided to the reduceLib for sending writes directly to
+ * the backing device.
+ */
+static void
+_comp_reduce_writev(struct spdk_reduce_backing_dev *dev, struct iovec *iov, int iovcnt,
+ uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
+{
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
+ backing_dev);
+ int rc;
+
+ rc = spdk_bdev_writev_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
+ iov, iovcnt, lba, lba_count,
+ comp_reduce_io_cb,
+ args);
+ if (rc) {
+ if (rc == -ENOMEM) {
+ SPDK_ERRLOG("No memory, start to queue io.\n");
+ /* TODO: there's no bdev_io to queue */
+ } else {
+ SPDK_ERRLOG("error submitting writev request\n");
+ }
+ args->cb_fn(args->cb_arg, rc);
+ }
+}
+
+/* This is the function provided to the reduceLib for sending unmaps directly to
+ * the backing device.
+ */
+static void
+_comp_reduce_unmap(struct spdk_reduce_backing_dev *dev,
+ uint64_t lba, uint32_t lba_count, struct spdk_reduce_vol_cb_args *args)
+{
+ struct vbdev_compress *comp_bdev = SPDK_CONTAINEROF(dev, struct vbdev_compress,
+ backing_dev);
+ int rc;
+
+ rc = spdk_bdev_unmap_blocks(comp_bdev->base_desc, comp_bdev->base_ch,
+ lba, lba_count,
+ comp_reduce_io_cb,
+ args);
+
+ if (rc) {
+ if (rc == -ENOMEM) {
+ SPDK_ERRLOG("No memory, start to queue io.\n");
+ /* TODO: there's no bdev_io to queue */
+ } else {
+ SPDK_ERRLOG("submitting unmap request\n");
+ }
+ args->cb_fn(args->cb_arg, rc);
+ }
+}
+
+/* Called by reduceLib after performing unload vol actions following base bdev hotremove */
+static void
+bdev_hotremove_vol_unload_cb(void *cb_arg, int reduce_errno)
+{
+ struct vbdev_compress *comp_bdev = (struct vbdev_compress *)cb_arg;
+
+ if (reduce_errno) {
+ SPDK_ERRLOG("number %d\n", reduce_errno);
+ }
+
+ comp_bdev->vol = NULL;
+ spdk_bdev_unregister(&comp_bdev->comp_bdev, NULL, NULL);
+}
+
+/* Called when the underlying base bdev goes away. */
+static void
+vbdev_compress_base_bdev_hotremove_cb(void *ctx)
+{
+ struct vbdev_compress *comp_bdev, *tmp;
+ struct spdk_bdev *bdev_find = ctx;
+
+ TAILQ_FOREACH_SAFE(comp_bdev, &g_vbdev_comp, link, tmp) {
+ if (bdev_find == comp_bdev->base_bdev) {
+ /* Tell reduceLib that we're done with this volume. */
+ spdk_reduce_vol_unload(comp_bdev->vol, bdev_hotremove_vol_unload_cb, comp_bdev);
+ }
+ }
+}
+
+/* TODO: determine which parms we want user configurable, HC for now
+ * params.vol_size
+ * params.chunk_size
+ * compression PMD, algorithm, window size, comp level, etc.
+ * DEV_MD_PATH
+ */
+
+/* Common function for init and load to allocate and populate the minimal
+ * information for reducelib to init or load.
+ */
+struct vbdev_compress *
+_prepare_for_load_init(struct spdk_bdev *bdev, uint32_t lb_size)
+{
+ struct vbdev_compress *meta_ctx;
+
+ meta_ctx = calloc(1, sizeof(struct vbdev_compress));
+ if (meta_ctx == NULL) {
+ SPDK_ERRLOG("failed to alloc init contexts\n");
+ return NULL;
+ }
+
+ meta_ctx->drv_name = "None";
+ meta_ctx->base_bdev = bdev;
+ meta_ctx->backing_dev.unmap = _comp_reduce_unmap;
+ meta_ctx->backing_dev.readv = _comp_reduce_readv;
+ meta_ctx->backing_dev.writev = _comp_reduce_writev;
+ meta_ctx->backing_dev.compress = _comp_reduce_compress;
+ meta_ctx->backing_dev.decompress = _comp_reduce_decompress;
+
+ meta_ctx->backing_dev.blocklen = bdev->blocklen;
+ meta_ctx->backing_dev.blockcnt = bdev->blockcnt;
+
+ meta_ctx->params.chunk_size = CHUNK_SIZE;
+ if (lb_size == 0) {
+ meta_ctx->params.logical_block_size = bdev->blocklen;
+ } else {
+ meta_ctx->params.logical_block_size = lb_size;
+ }
+
+ meta_ctx->params.backing_io_unit_size = BACKING_IO_SZ;
+ return meta_ctx;
+}
+
+static bool
+_set_pmd(struct vbdev_compress *comp_dev)
+{
+ if (g_opts == COMPRESS_PMD_AUTO) {
+ if (g_qat_available) {
+ comp_dev->drv_name = QAT_PMD;
+ } else {
+ comp_dev->drv_name = ISAL_PMD;
+ }
+ } else if (g_opts == COMPRESS_PMD_QAT_ONLY && g_qat_available) {
+ comp_dev->drv_name = QAT_PMD;
+ } else if (g_opts == COMPRESS_PMD_ISAL_ONLY && g_isal_available) {
+ comp_dev->drv_name = ISAL_PMD;
+ } else {
+ SPDK_ERRLOG("Requested PMD is not available.\n");
+ return false;
+ }
+ SPDK_NOTICELOG("PMD being used: %s\n", comp_dev->drv_name);
+ return true;
+}
+
+/* Call reducelib to initialize a new volume */
+static int
+vbdev_init_reduce(struct spdk_bdev *bdev, const char *pm_path, uint32_t lb_size)
+{
+ struct vbdev_compress *meta_ctx;
+ int rc;
+
+ meta_ctx = _prepare_for_load_init(bdev, lb_size);
+ if (meta_ctx == NULL) {
+ return -EINVAL;
+ }
+
+ if (_set_pmd(meta_ctx) == false) {
+ SPDK_ERRLOG("could not find required pmd\n");
+ free(meta_ctx);
+ return -EINVAL;
+ }
+
+ rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
+ meta_ctx->base_bdev, &meta_ctx->base_desc);
+ if (rc) {
+ SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
+ free(meta_ctx);
+ return -EINVAL;
+ }
+
+ /* Save the thread where the base device is opened */
+ meta_ctx->thread = spdk_get_thread();
+
+ meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
+
+ spdk_reduce_vol_init(&meta_ctx->params, &meta_ctx->backing_dev,
+ pm_path,
+ vbdev_reduce_init_cb,
+ meta_ctx);
+ return 0;
+}
+
+/* We provide this callback for the SPDK channel code to create a channel using
+ * the channel struct we provided in our module get_io_channel() entry point. Here
+ * we get and save off an underlying base channel of the device below us so that
+ * we can communicate with the base bdev on a per channel basis. If we needed
+ * our own poller for this vbdev, we'd register it here.
+ */
+static int
+comp_bdev_ch_create_cb(void *io_device, void *ctx_buf)
+{
+ struct vbdev_compress *comp_bdev = io_device;
+ struct comp_device_qp *device_qp;
+
+ /* Now set the reduce channel if it's not already set. */
+ pthread_mutex_lock(&comp_bdev->reduce_lock);
+ if (comp_bdev->ch_count == 0) {
+ /* We use this queue to track outstanding IO in our layer. */
+ TAILQ_INIT(&comp_bdev->pending_comp_ios);
+
+ /* We use this to queue up compression operations as needed. */
+ TAILQ_INIT(&comp_bdev->queued_comp_ops);
+
+ comp_bdev->base_ch = spdk_bdev_get_io_channel(comp_bdev->base_desc);
+ comp_bdev->reduce_thread = spdk_get_thread();
+ comp_bdev->poller = SPDK_POLLER_REGISTER(comp_dev_poller, comp_bdev, 0);
+ /* Now assign a q pair */
+ pthread_mutex_lock(&g_comp_device_qp_lock);
+ TAILQ_FOREACH(device_qp, &g_comp_device_qp, link) {
+ if (strcmp(device_qp->device->cdev_info.driver_name, comp_bdev->drv_name) == 0) {
+ if (device_qp->thread == spdk_get_thread()) {
+ comp_bdev->device_qp = device_qp;
+ break;
+ }
+ if (device_qp->thread == NULL) {
+ comp_bdev->device_qp = device_qp;
+ device_qp->thread = spdk_get_thread();
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock(&g_comp_device_qp_lock);
+ }
+ comp_bdev->ch_count++;
+ pthread_mutex_unlock(&comp_bdev->reduce_lock);
+
+ if (comp_bdev->device_qp != NULL) {
+ return 0;
+ } else {
+ SPDK_ERRLOG("out of qpairs, cannot assign one to comp_bdev %p\n", comp_bdev);
+ assert(false);
+ return -ENOMEM;
+ }
+}
+
+static void
+_channel_cleanup(struct vbdev_compress *comp_bdev)
+{
+ /* Note: comp_bdevs can share a device_qp if they are
+ * on the same thread so we leave the device_qp element
+ * alone for this comp_bdev and just clear the reduce thread.
+ */
+ spdk_put_io_channel(comp_bdev->base_ch);
+ comp_bdev->reduce_thread = NULL;
+ spdk_poller_unregister(&comp_bdev->poller);
+}
+
+/* Used to reroute destroy_ch to the correct thread */
+static void
+_comp_bdev_ch_destroy_cb(void *arg)
+{
+ struct vbdev_compress *comp_bdev = arg;
+
+ pthread_mutex_lock(&comp_bdev->reduce_lock);
+ _channel_cleanup(comp_bdev);
+ pthread_mutex_unlock(&comp_bdev->reduce_lock);
+}
+
+/* We provide this callback for the SPDK channel code to destroy a channel
+ * created with our create callback. We just need to undo anything we did
+ * when we created. If this bdev used its own poller, we'd unregister it here.
+ */
+static void
+comp_bdev_ch_destroy_cb(void *io_device, void *ctx_buf)
+{
+ struct vbdev_compress *comp_bdev = io_device;
+
+ pthread_mutex_lock(&comp_bdev->reduce_lock);
+ comp_bdev->ch_count--;
+ if (comp_bdev->ch_count == 0) {
+ /* Send this request to the thread where the channel was created. */
+ if (comp_bdev->reduce_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(comp_bdev->reduce_thread,
+ _comp_bdev_ch_destroy_cb, comp_bdev);
+ } else {
+ _channel_cleanup(comp_bdev);
+ }
+ }
+ pthread_mutex_unlock(&comp_bdev->reduce_lock);
+}
+
+/* RPC entry point for compression vbdev creation. */
+int
+create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size)
+{
+ struct spdk_bdev *bdev;
+
+ bdev = spdk_bdev_get_by_name(bdev_name);
+ if (!bdev) {
+ return -ENODEV;
+ }
+
+ if ((lb_size != 0) && (lb_size != LB_SIZE_4K) && (lb_size != LB_SIZE_512B)) {
+ SPDK_ERRLOG("Logical block size must be 512 or 4096\n");
+ return -EINVAL;
+ }
+
+ return vbdev_init_reduce(bdev, pm_path, lb_size);
+}
+
+/* On init, just init the compress drivers. All metadata is stored on disk. */
+static int
+vbdev_compress_init(void)
+{
+ if (vbdev_init_compress_drivers()) {
+ SPDK_ERRLOG("Error setting up compression devices\n");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+/* Called when the entire module is being torn down. */
+static void
+vbdev_compress_finish(void)
+{
+ struct comp_device_qp *dev_qp;
+ /* TODO: unload vol in a future patch */
+
+ while ((dev_qp = TAILQ_FIRST(&g_comp_device_qp))) {
+ TAILQ_REMOVE(&g_comp_device_qp, dev_qp, link);
+ free(dev_qp);
+ }
+ pthread_mutex_destroy(&g_comp_device_qp_lock);
+
+ rte_mempool_free(g_comp_op_mp);
+ rte_mempool_free(g_mbuf_mp);
+}
+
+/* During init we'll be asked how much memory we'd like passed to us
+ * in bev_io structures as context. Here's where we specify how
+ * much context we want per IO.
+ */
+static int
+vbdev_compress_get_ctx_size(void)
+{
+ return sizeof(struct comp_bdev_io);
+}
+
+/* When we register our bdev this is how we specify our entry points. */
+static const struct spdk_bdev_fn_table vbdev_compress_fn_table = {
+ .destruct = vbdev_compress_destruct,
+ .submit_request = vbdev_compress_submit_request,
+ .io_type_supported = vbdev_compress_io_type_supported,
+ .get_io_channel = vbdev_compress_get_io_channel,
+ .dump_info_json = vbdev_compress_dump_info_json,
+ .write_config_json = NULL,
+};
+
+static struct spdk_bdev_module compress_if = {
+ .name = "compress",
+ .module_init = vbdev_compress_init,
+ .config_text = NULL,
+ .get_ctx_size = vbdev_compress_get_ctx_size,
+ .examine_disk = vbdev_compress_examine,
+ .module_fini = vbdev_compress_finish,
+ .config_json = vbdev_compress_config_json
+};
+
+SPDK_BDEV_MODULE_REGISTER(compress, &compress_if)
+
+static int _set_compbdev_name(struct vbdev_compress *comp_bdev)
+{
+ struct spdk_bdev_alias *aliases;
+
+ if (!TAILQ_EMPTY(spdk_bdev_get_aliases(comp_bdev->base_bdev))) {
+ aliases = TAILQ_FIRST(spdk_bdev_get_aliases(comp_bdev->base_bdev));
+ comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", aliases->alias);
+ if (!comp_bdev->comp_bdev.name) {
+ SPDK_ERRLOG("could not allocate comp_bdev name for alias\n");
+ return -ENOMEM;
+ }
+ } else {
+ comp_bdev->comp_bdev.name = spdk_sprintf_alloc("COMP_%s", comp_bdev->base_bdev->name);
+ if (!comp_bdev->comp_bdev.name) {
+ SPDK_ERRLOG("could not allocate comp_bdev name for unique name\n");
+ return -ENOMEM;
+ }
+ }
+ return 0;
+}
+
+static void
+vbdev_compress_claim(struct vbdev_compress *comp_bdev)
+{
+ int rc;
+
+ if (_set_compbdev_name(comp_bdev)) {
+ goto error_bdev_name;
+ }
+
+ /* Note: some of the fields below will change in the future - for example,
+ * blockcnt specifically will not match (the compressed volume size will
+ * be slightly less than the base bdev size)
+ */
+ comp_bdev->comp_bdev.product_name = COMP_BDEV_NAME;
+ comp_bdev->comp_bdev.write_cache = comp_bdev->base_bdev->write_cache;
+
+ if (strcmp(comp_bdev->drv_name, QAT_PMD) == 0) {
+ comp_bdev->comp_bdev.required_alignment =
+ spdk_max(spdk_u32log2(comp_bdev->base_bdev->blocklen),
+ comp_bdev->base_bdev->required_alignment);
+ SPDK_NOTICELOG("QAT in use: Required alignment set to %u\n",
+ comp_bdev->comp_bdev.required_alignment);
+ } else {
+ comp_bdev->comp_bdev.required_alignment = comp_bdev->base_bdev->required_alignment;
+ }
+ comp_bdev->comp_bdev.optimal_io_boundary =
+ comp_bdev->params.chunk_size / comp_bdev->params.logical_block_size;
+
+ comp_bdev->comp_bdev.split_on_optimal_io_boundary = true;
+
+ comp_bdev->comp_bdev.blocklen = comp_bdev->params.logical_block_size;
+ comp_bdev->comp_bdev.blockcnt = comp_bdev->params.vol_size / comp_bdev->comp_bdev.blocklen;
+ assert(comp_bdev->comp_bdev.blockcnt > 0);
+
+ /* This is the context that is passed to us when the bdev
+ * layer calls in so we'll save our comp_bdev node here.
+ */
+ comp_bdev->comp_bdev.ctxt = comp_bdev;
+ comp_bdev->comp_bdev.fn_table = &vbdev_compress_fn_table;
+ comp_bdev->comp_bdev.module = &compress_if;
+
+ pthread_mutex_init(&comp_bdev->reduce_lock, NULL);
+
+ rc = spdk_bdev_open(comp_bdev->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
+ comp_bdev->base_bdev, &comp_bdev->base_desc);
+ if (rc) {
+ SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
+ goto error_open;
+ }
+
+ /* Save the thread where the base device is opened */
+ comp_bdev->thread = spdk_get_thread();
+
+ spdk_io_device_register(comp_bdev, comp_bdev_ch_create_cb, comp_bdev_ch_destroy_cb,
+ sizeof(struct comp_io_channel),
+ comp_bdev->comp_bdev.name);
+
+ rc = spdk_bdev_module_claim_bdev(comp_bdev->base_bdev, comp_bdev->base_desc,
+ comp_bdev->comp_bdev.module);
+ if (rc) {
+ SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(comp_bdev->base_bdev));
+ goto error_claim;
+ }
+
+ rc = spdk_bdev_register(&comp_bdev->comp_bdev);
+ if (rc < 0) {
+ SPDK_ERRLOG("trying to register bdev\n");
+ goto error_bdev_register;
+ }
+
+ TAILQ_INSERT_TAIL(&g_vbdev_comp, comp_bdev, link);
+
+ SPDK_NOTICELOG("registered io_device and virtual bdev for: %s\n", comp_bdev->comp_bdev.name);
+
+ return;
+ /* Error cleanup paths. */
+error_bdev_register:
+ spdk_bdev_module_release_bdev(comp_bdev->base_bdev);
+error_claim:
+ spdk_io_device_unregister(comp_bdev, NULL);
+ spdk_bdev_close(comp_bdev->base_desc);
+error_open:
+ free(comp_bdev->comp_bdev.name);
+error_bdev_name:
+ free(comp_bdev);
+}
+
+static void
+_vbdev_compress_delete_done(void *_ctx)
+{
+ struct vbdev_comp_delete_ctx *ctx = _ctx;
+
+ ctx->cb_fn(ctx->cb_arg, ctx->cb_rc);
+
+ free(ctx);
+}
+
+static void
+vbdev_compress_delete_done(void *cb_arg, int bdeverrno)
+{
+ struct vbdev_comp_delete_ctx *ctx = cb_arg;
+
+ ctx->cb_rc = bdeverrno;
+
+ if (ctx->orig_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(ctx->orig_thread, _vbdev_compress_delete_done, ctx);
+ } else {
+ _vbdev_compress_delete_done(ctx);
+ }
+}
+
+void
+bdev_compress_delete(const char *name, spdk_delete_compress_complete cb_fn, void *cb_arg)
+{
+ struct vbdev_compress *comp_bdev = NULL;
+ struct vbdev_comp_delete_ctx *ctx;
+
+ TAILQ_FOREACH(comp_bdev, &g_vbdev_comp, link) {
+ if (strcmp(name, comp_bdev->comp_bdev.name) == 0) {
+ break;
+ }
+ }
+
+ if (comp_bdev == NULL) {
+ cb_fn(cb_arg, -ENODEV);
+ return;
+ }
+
+ ctx = calloc(1, sizeof(*ctx));
+ if (ctx == NULL) {
+ SPDK_ERRLOG("Failed to allocate delete context\n");
+ cb_fn(cb_arg, -ENOMEM);
+ return;
+ }
+
+ /* Save these for after the vol is destroyed. */
+ ctx->cb_fn = cb_fn;
+ ctx->cb_arg = cb_arg;
+ ctx->orig_thread = spdk_get_thread();
+
+ comp_bdev->delete_ctx = ctx;
+
+ /* Tell reducelib that we're done with this volume. */
+ if (comp_bdev->orphaned == false) {
+ spdk_reduce_vol_unload(comp_bdev->vol, delete_vol_unload_cb, comp_bdev);
+ } else {
+ delete_vol_unload_cb(comp_bdev, 0);
+ }
+}
+
+static void
+_vbdev_reduce_load_cb(void *ctx)
+{
+ struct vbdev_compress *meta_ctx = ctx;
+ int rc;
+
+ /* Done with metadata operations */
+ spdk_put_io_channel(meta_ctx->base_ch);
+ /* Close the underlying bdev on its same opened thread. */
+ spdk_bdev_close(meta_ctx->base_desc);
+ meta_ctx->base_desc = NULL;
+
+ if (meta_ctx->reduce_errno == 0) {
+ if (_set_pmd(meta_ctx) == false) {
+ SPDK_ERRLOG("could not find required pmd\n");
+ goto err;
+ }
+
+ vbdev_compress_claim(meta_ctx);
+ } else if (meta_ctx->reduce_errno == -ENOENT) {
+ if (_set_compbdev_name(meta_ctx)) {
+ goto err;
+ }
+
+ /* We still want to open and claim the backing device to protect the data until
+ * either the pm metadata file is recovered or the comp bdev is deleted.
+ */
+ rc = spdk_bdev_open(meta_ctx->base_bdev, true, vbdev_compress_base_bdev_hotremove_cb,
+ meta_ctx->base_bdev, &meta_ctx->base_desc);
+ if (rc) {
+ SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
+ free(meta_ctx->comp_bdev.name);
+ goto err;
+ }
+
+ /* Save the thread where the base device is opened */
+ meta_ctx->thread = spdk_get_thread();
+
+ meta_ctx->comp_bdev.module = &compress_if;
+ pthread_mutex_init(&meta_ctx->reduce_lock, NULL);
+ rc = spdk_bdev_module_claim_bdev(meta_ctx->base_bdev, meta_ctx->base_desc,
+ meta_ctx->comp_bdev.module);
+ if (rc) {
+ SPDK_ERRLOG("could not claim bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
+ spdk_bdev_close(meta_ctx->base_desc);
+ free(meta_ctx->comp_bdev.name);
+ goto err;
+ }
+
+ meta_ctx->orphaned = true;
+ TAILQ_INSERT_TAIL(&g_vbdev_comp, meta_ctx, link);
+ } else {
+ if (meta_ctx->reduce_errno != -EILSEQ) {
+ SPDK_ERRLOG("for vol %s, error %u\n",
+ spdk_bdev_get_name(meta_ctx->base_bdev), meta_ctx->reduce_errno);
+ }
+ goto err;
+ }
+
+ spdk_bdev_module_examine_done(&compress_if);
+ return;
+
+err:
+ free(meta_ctx);
+ spdk_bdev_module_examine_done(&compress_if);
+}
+
+/* Callback from reduce for then load is complete. We'll pass the vbdev_comp struct
+ * used for initial metadata operations to claim where it will be further filled out
+ * and added to the global list.
+ */
+static void
+vbdev_reduce_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno)
+{
+ struct vbdev_compress *meta_ctx = cb_arg;
+
+ if (reduce_errno == 0) {
+ /* Update information following volume load. */
+ meta_ctx->vol = vol;
+ memcpy(&meta_ctx->params, spdk_reduce_vol_get_params(vol),
+ sizeof(struct spdk_reduce_vol_params));
+ }
+
+ meta_ctx->reduce_errno = reduce_errno;
+
+ if (meta_ctx->thread && meta_ctx->thread != spdk_get_thread()) {
+ spdk_thread_send_msg(meta_ctx->thread, _vbdev_reduce_load_cb, meta_ctx);
+ } else {
+ _vbdev_reduce_load_cb(meta_ctx);
+ }
+
+}
+
+/* Examine_disk entry point: will do a metadata load to see if this is ours,
+ * and if so will go ahead and claim it.
+ */
+static void
+vbdev_compress_examine(struct spdk_bdev *bdev)
+{
+ struct vbdev_compress *meta_ctx;
+ int rc;
+
+ if (strcmp(bdev->product_name, COMP_BDEV_NAME) == 0) {
+ spdk_bdev_module_examine_done(&compress_if);
+ return;
+ }
+
+ meta_ctx = _prepare_for_load_init(bdev, 0);
+ if (meta_ctx == NULL) {
+ spdk_bdev_module_examine_done(&compress_if);
+ return;
+ }
+
+ rc = spdk_bdev_open(meta_ctx->base_bdev, false, vbdev_compress_base_bdev_hotremove_cb,
+ meta_ctx->base_bdev, &meta_ctx->base_desc);
+ if (rc) {
+ SPDK_ERRLOG("could not open bdev %s\n", spdk_bdev_get_name(meta_ctx->base_bdev));
+ free(meta_ctx);
+ spdk_bdev_module_examine_done(&compress_if);
+ return;
+ }
+
+ /* Save the thread where the base device is opened */
+ meta_ctx->thread = spdk_get_thread();
+
+ meta_ctx->base_ch = spdk_bdev_get_io_channel(meta_ctx->base_desc);
+ spdk_reduce_vol_load(&meta_ctx->backing_dev, vbdev_reduce_load_cb, meta_ctx);
+}
+
+int
+compress_set_pmd(enum compress_pmd *opts)
+{
+ g_opts = *opts;
+
+ return 0;
+}
+
+SPDK_LOG_REGISTER_COMPONENT("vbdev_compress", SPDK_LOG_VBDEV_COMPRESS)
diff --git a/src/spdk/module/bdev/compress/vbdev_compress.h b/src/spdk/module/bdev/compress/vbdev_compress.h
new file mode 100644
index 000000000..4dcd78f60
--- /dev/null
+++ b/src/spdk/module/bdev/compress/vbdev_compress.h
@@ -0,0 +1,106 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef SPDK_VBDEV_COMPRESS_H
+#define SPDK_VBDEV_COMPRESS_H
+
+#include "spdk/stdinc.h"
+
+#include "spdk/bdev.h"
+
+#define LB_SIZE_4K 0x1000UL
+#define LB_SIZE_512B 0x200UL
+
+/**
+ * Get the first compression bdev.
+ *
+ * \return the first compression bdev.
+ */
+struct vbdev_compress *compress_bdev_first(void);
+
+/**
+ * Get the next compression bdev.
+ *
+ * \param prev previous compression bdev.
+ * \return the next compression bdev.
+ */
+struct vbdev_compress *compress_bdev_next(struct vbdev_compress *prev);
+
+/**
+ * Test to see if a compression bdev orphan exists.
+ *
+ * \param name The name of the compression bdev.
+ * \return true if found, false if not.
+ */
+bool compress_has_orphan(const char *name);
+
+/**
+ * Get the name of a compression bdev.
+ *
+ * \param comp_bdev The compression bdev.
+ * \return the name of the compression bdev.
+ */
+const char *compress_get_name(const struct vbdev_compress *comp_bdev);
+
+enum compress_pmd {
+ COMPRESS_PMD_AUTO = 0,
+ COMPRESS_PMD_QAT_ONLY,
+ COMPRESS_PMD_ISAL_ONLY,
+ COMPRESS_PMD_MAX
+};
+
+int compress_set_pmd(enum compress_pmd *opts);
+
+typedef void (*spdk_delete_compress_complete)(void *cb_arg, int bdeverrno);
+
+/**
+ * Create new compression bdev.
+ *
+ * \param bdev_name Bdev on which compression bdev will be created.
+ * \param pm_path Path to persistent memory.
+ * \param lb_size Logical block size for the compressed volume in bytes. Must be 4K or 512.
+ * \return 0 on success, other on failure.
+ */
+int create_compress_bdev(const char *bdev_name, const char *pm_path, uint32_t lb_size);
+
+/**
+ * Delete compress bdev.
+ *
+ * \param bdev_name Bdev on which compression bdev will be deleted.
+ * \param cb_fn Function to call after deletion.
+ * \param cb_arg Argument to pass to cb_fn.
+ */
+void bdev_compress_delete(const char *bdev_name, spdk_delete_compress_complete cb_fn,
+ void *cb_arg);
+
+#endif /* SPDK_VBDEV_COMPRESS_H */
diff --git a/src/spdk/module/bdev/compress/vbdev_compress_rpc.c b/src/spdk/module/bdev/compress/vbdev_compress_rpc.c
new file mode 100644
index 000000000..9eedae066
--- /dev/null
+++ b/src/spdk/module/bdev/compress/vbdev_compress_rpc.c
@@ -0,0 +1,252 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "vbdev_compress.h"
+#include "spdk/rpc.h"
+#include "spdk/util.h"
+#include "spdk/string.h"
+#include "spdk_internal/log.h"
+
+struct rpc_bdev_compress_get_orphans {
+ char *name;
+};
+
+static void
+free_rpc_bdev_compress_get_orphans(struct rpc_bdev_compress_get_orphans *r)
+{
+ free(r->name);
+}
+
+static const struct spdk_json_object_decoder rpc_bdev_compress_get_orphans_decoders[] = {
+ {"name", offsetof(struct rpc_bdev_compress_get_orphans, name), spdk_json_decode_string, true},
+};
+
+static void
+rpc_bdev_compress_get_orphans(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ struct rpc_bdev_compress_get_orphans req = {};
+ struct spdk_json_write_ctx *w;
+ struct vbdev_compress *comp_bdev;
+ bool found = false;
+
+
+ if (params && spdk_json_decode_object(params, rpc_bdev_compress_get_orphans_decoders,
+ SPDK_COUNTOF(rpc_bdev_compress_get_orphans_decoders),
+ &req)) {
+ SPDK_ERRLOG("spdk_json_decode_object failed\n");
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
+ "spdk_json_decode_object failed");
+ free_rpc_bdev_compress_get_orphans(&req);
+ return;
+ }
+
+ if (req.name) {
+ if (compress_has_orphan(req.name) == false) {
+ spdk_jsonrpc_send_error_response(request, -ENODEV, spdk_strerror(ENODEV));
+ free_rpc_bdev_compress_get_orphans(&req);
+ return;
+ }
+ found = true;
+ }
+
+ w = spdk_jsonrpc_begin_result(request);
+ spdk_json_write_array_begin(w);
+ if (found) {
+ spdk_json_write_string(w, req.name);
+ } else {
+ for (comp_bdev = compress_bdev_first(); comp_bdev != NULL;
+ comp_bdev = compress_bdev_next(comp_bdev)) {
+ if (compress_has_orphan(compress_get_name(comp_bdev))) {
+ spdk_json_write_string(w, compress_get_name(comp_bdev));
+ }
+ }
+ }
+ spdk_json_write_array_end(w);
+ spdk_jsonrpc_end_result(request, w);
+ free_rpc_bdev_compress_get_orphans(&req);
+}
+SPDK_RPC_REGISTER("bdev_compress_get_orphans", rpc_bdev_compress_get_orphans, SPDK_RPC_RUNTIME)
+
+struct rpc_compress_set_pmd {
+ enum compress_pmd pmd;
+};
+
+static const struct spdk_json_object_decoder rpc_compress_pmd_decoder[] = {
+ {"pmd", offsetof(struct rpc_compress_set_pmd, pmd), spdk_json_decode_int32},
+};
+
+static void
+rpc_compress_set_pmd(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ struct rpc_compress_set_pmd req;
+ struct spdk_json_write_ctx *w;
+ int rc = 0;
+
+ if (spdk_json_decode_object(params, rpc_compress_pmd_decoder,
+ SPDK_COUNTOF(rpc_compress_pmd_decoder),
+ &req)) {
+ SPDK_ERRLOG("spdk_json_decode_object failed\n");
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
+ "spdk_json_decode_object failed");
+ return;
+ }
+
+ if (req.pmd >= COMPRESS_PMD_MAX) {
+ spdk_jsonrpc_send_error_response_fmt(request, -EINVAL,
+ "PMD value %d should be less than %d", req.pmd, COMPRESS_PMD_MAX);
+ return;
+ }
+
+ rc = compress_set_pmd(&req.pmd);
+ if (rc) {
+ spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
+ return;
+ }
+
+ w = spdk_jsonrpc_begin_result(request);
+ if (w != NULL) {
+ spdk_json_write_bool(w, true);
+ spdk_jsonrpc_end_result(request, w);
+ }
+}
+SPDK_RPC_REGISTER("compress_set_pmd", rpc_compress_set_pmd,
+ SPDK_RPC_STARTUP | SPDK_RPC_RUNTIME)
+SPDK_RPC_REGISTER_ALIAS_DEPRECATED(compress_set_pmd, set_compress_pmd)
+
+/* Structure to hold the parameters for this RPC method. */
+struct rpc_construct_compress {
+ char *base_bdev_name;
+ char *pm_path;
+ uint32_t lb_size;
+};
+
+/* Free the allocated memory resource after the RPC handling. */
+static void
+free_rpc_construct_compress(struct rpc_construct_compress *r)
+{
+ free(r->base_bdev_name);
+ free(r->pm_path);
+}
+
+/* Structure to decode the input parameters for this RPC method. */
+static const struct spdk_json_object_decoder rpc_construct_compress_decoders[] = {
+ {"base_bdev_name", offsetof(struct rpc_construct_compress, base_bdev_name), spdk_json_decode_string},
+ {"pm_path", offsetof(struct rpc_construct_compress, pm_path), spdk_json_decode_string},
+ {"lb_size", offsetof(struct rpc_construct_compress, lb_size), spdk_json_decode_uint32},
+};
+
+/* Decode the parameters for this RPC method and properly construct the compress
+ * device. Error status returned in the failed cases.
+ */
+static void
+rpc_bdev_compress_create(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ struct rpc_construct_compress req = {NULL};
+ struct spdk_json_write_ctx *w;
+ char *name;
+ int rc;
+
+ if (spdk_json_decode_object(params, rpc_construct_compress_decoders,
+ SPDK_COUNTOF(rpc_construct_compress_decoders),
+ &req)) {
+ SPDK_DEBUGLOG(SPDK_LOG_VBDEV_COMPRESS, "spdk_json_decode_object failed\n");
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_PARSE_ERROR,
+ "spdk_json_decode_object failed");
+ goto cleanup;
+ }
+
+ rc = create_compress_bdev(req.base_bdev_name, req.pm_path, req.lb_size);
+ if (rc != 0) {
+ spdk_jsonrpc_send_error_response(request, rc, spdk_strerror(-rc));
+ goto cleanup;
+ }
+
+ w = spdk_jsonrpc_begin_result(request);
+ name = spdk_sprintf_alloc("COMP_%s", req.base_bdev_name);
+ spdk_json_write_string(w, name);
+ spdk_jsonrpc_end_result(request, w);
+ free(name);
+
+cleanup:
+ free_rpc_construct_compress(&req);
+}
+SPDK_RPC_REGISTER("bdev_compress_create", rpc_bdev_compress_create, SPDK_RPC_RUNTIME)
+SPDK_RPC_REGISTER_ALIAS_DEPRECATED(bdev_compress_create, construct_compress_bdev)
+
+struct rpc_delete_compress {
+ char *name;
+};
+
+static void
+free_rpc_delete_compress(struct rpc_delete_compress *req)
+{
+ free(req->name);
+}
+
+static const struct spdk_json_object_decoder rpc_delete_compress_decoders[] = {
+ {"name", offsetof(struct rpc_delete_compress, name), spdk_json_decode_string},
+};
+
+static void
+_rpc_bdev_compress_delete_cb(void *cb_arg, int bdeverrno)
+{
+ struct spdk_jsonrpc_request *request = cb_arg;
+ struct spdk_json_write_ctx *w;
+
+ w = spdk_jsonrpc_begin_result(request);
+ spdk_json_write_bool(w, bdeverrno == 0);
+ spdk_jsonrpc_end_result(request, w);
+}
+
+static void
+rpc_bdev_compress_delete(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ struct rpc_delete_compress req = {NULL};
+
+ if (spdk_json_decode_object(params, rpc_delete_compress_decoders,
+ SPDK_COUNTOF(rpc_delete_compress_decoders),
+ &req)) {
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
+ "spdk_json_decode_object failed");
+ } else {
+ bdev_compress_delete(req.name, _rpc_bdev_compress_delete_cb, request);
+ }
+
+ free_rpc_delete_compress(&req);
+}
+SPDK_RPC_REGISTER("bdev_compress_delete", rpc_bdev_compress_delete, SPDK_RPC_RUNTIME)
+SPDK_RPC_REGISTER_ALIAS_DEPRECATED(bdev_compress_delete, delete_compress_bdev)