diff options
Diffstat (limited to 'src/spdk/lib/reduce/reduce.c')
-rw-r--r-- | src/spdk/lib/reduce/reduce.c | 1625 |
1 files changed, 1625 insertions, 0 deletions
diff --git a/src/spdk/lib/reduce/reduce.c b/src/spdk/lib/reduce/reduce.c new file mode 100644 index 000000000..6188f6c6c --- /dev/null +++ b/src/spdk/lib/reduce/reduce.c @@ -0,0 +1,1625 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" + +#include "spdk/reduce.h" +#include "spdk/env.h" +#include "spdk/string.h" +#include "spdk/bit_array.h" +#include "spdk/util.h" +#include "spdk_internal/log.h" + +#include "libpmem.h" + +/* Always round up the size of the PM region to the nearest cacheline. */ +#define REDUCE_PM_SIZE_ALIGNMENT 64 + +/* Offset into the backing device where the persistent memory file's path is stored. */ +#define REDUCE_BACKING_DEV_PATH_OFFSET 4096 + +#define REDUCE_EMPTY_MAP_ENTRY -1ULL + +#define REDUCE_NUM_VOL_REQUESTS 256 + +/* Structure written to offset 0 of both the pm file and the backing device. */ +struct spdk_reduce_vol_superblock { + uint8_t signature[8]; + struct spdk_reduce_vol_params params; + uint8_t reserved[4048]; +}; +SPDK_STATIC_ASSERT(sizeof(struct spdk_reduce_vol_superblock) == 4096, "size incorrect"); + +#define SPDK_REDUCE_SIGNATURE "SPDKREDU" +/* null terminator counts one */ +SPDK_STATIC_ASSERT(sizeof(SPDK_REDUCE_SIGNATURE) - 1 == + sizeof(((struct spdk_reduce_vol_superblock *)0)->signature), "size incorrect"); + +#define REDUCE_PATH_MAX 4096 + +#define REDUCE_ZERO_BUF_SIZE 0x100000 + +/** + * Describes a persistent memory file used to hold metadata associated with a + * compressed volume. + */ +struct spdk_reduce_pm_file { + char path[REDUCE_PATH_MAX]; + void *pm_buf; + int pm_is_pmem; + uint64_t size; +}; + +#define REDUCE_IO_READV 1 +#define REDUCE_IO_WRITEV 2 + +struct spdk_reduce_chunk_map { + uint32_t compressed_size; + uint32_t reserved; + uint64_t io_unit_index[0]; +}; + +struct spdk_reduce_vol_request { + /** + * Scratch buffer used for uncompressed chunk. This is used for: + * 1) source buffer for compression operations + * 2) destination buffer for decompression operations + * 3) data buffer when writing uncompressed chunk to disk + * 4) data buffer when reading uncompressed chunk from disk + */ + uint8_t *decomp_buf; + struct iovec *decomp_buf_iov; + + /** + * These are used to construct the iovecs that are sent to + * the decomp engine, they point to a mix of the scratch buffer + * and user buffer + */ + struct iovec decomp_iov[REDUCE_MAX_IOVECS + 2]; + int decomp_iovcnt; + + /** + * Scratch buffer used for compressed chunk. This is used for: + * 1) destination buffer for compression operations + * 2) source buffer for decompression operations + * 3) data buffer when writing compressed chunk to disk + * 4) data buffer when reading compressed chunk from disk + */ + uint8_t *comp_buf; + struct iovec *comp_buf_iov; + struct iovec *iov; + bool rmw; + struct spdk_reduce_vol *vol; + int type; + int reduce_errno; + int iovcnt; + int num_backing_ops; + uint32_t num_io_units; + bool chunk_is_compressed; + uint64_t offset; + uint64_t logical_map_index; + uint64_t length; + uint64_t chunk_map_index; + struct spdk_reduce_chunk_map *chunk; + spdk_reduce_vol_op_complete cb_fn; + void *cb_arg; + TAILQ_ENTRY(spdk_reduce_vol_request) tailq; + struct spdk_reduce_vol_cb_args backing_cb_args; +}; + +struct spdk_reduce_vol { + struct spdk_reduce_vol_params params; + uint32_t backing_io_units_per_chunk; + uint32_t backing_lba_per_io_unit; + uint32_t logical_blocks_per_chunk; + struct spdk_reduce_pm_file pm_file; + struct spdk_reduce_backing_dev *backing_dev; + struct spdk_reduce_vol_superblock *backing_super; + struct spdk_reduce_vol_superblock *pm_super; + uint64_t *pm_logical_map; + uint64_t *pm_chunk_maps; + + struct spdk_bit_array *allocated_chunk_maps; + struct spdk_bit_array *allocated_backing_io_units; + + struct spdk_reduce_vol_request *request_mem; + TAILQ_HEAD(, spdk_reduce_vol_request) free_requests; + TAILQ_HEAD(, spdk_reduce_vol_request) executing_requests; + TAILQ_HEAD(, spdk_reduce_vol_request) queued_requests; + + /* Single contiguous buffer used for all request buffers for this volume. */ + uint8_t *buf_mem; + struct iovec *buf_iov_mem; +}; + +static void _start_readv_request(struct spdk_reduce_vol_request *req); +static void _start_writev_request(struct spdk_reduce_vol_request *req); +static uint8_t *g_zero_buf; +static int g_vol_count = 0; + +/* + * Allocate extra metadata chunks and corresponding backing io units to account for + * outstanding IO in worst case scenario where logical map is completely allocated + * and no data can be compressed. We need extra chunks in this case to handle + * in-flight writes since reduce never writes data in place. + */ +#define REDUCE_NUM_EXTRA_CHUNKS 128 + +static void +_reduce_persist(struct spdk_reduce_vol *vol, const void *addr, size_t len) +{ + if (vol->pm_file.pm_is_pmem) { + pmem_persist(addr, len); + } else { + pmem_msync(addr, len); + } +} + +static uint64_t +_get_pm_logical_map_size(uint64_t vol_size, uint64_t chunk_size) +{ + uint64_t chunks_in_logical_map, logical_map_size; + + chunks_in_logical_map = vol_size / chunk_size; + logical_map_size = chunks_in_logical_map * sizeof(uint64_t); + + /* Round up to next cacheline. */ + return spdk_divide_round_up(logical_map_size, REDUCE_PM_SIZE_ALIGNMENT) * + REDUCE_PM_SIZE_ALIGNMENT; +} + +static uint64_t +_get_total_chunks(uint64_t vol_size, uint64_t chunk_size) +{ + uint64_t num_chunks; + + num_chunks = vol_size / chunk_size; + num_chunks += REDUCE_NUM_EXTRA_CHUNKS; + + return num_chunks; +} + +static inline uint32_t +_reduce_vol_get_chunk_struct_size(uint64_t backing_io_units_per_chunk) +{ + return sizeof(struct spdk_reduce_chunk_map) + sizeof(uint64_t) * backing_io_units_per_chunk; +} + +static uint64_t +_get_pm_total_chunks_size(uint64_t vol_size, uint64_t chunk_size, uint64_t backing_io_unit_size) +{ + uint64_t io_units_per_chunk, num_chunks, total_chunks_size; + + num_chunks = _get_total_chunks(vol_size, chunk_size); + io_units_per_chunk = chunk_size / backing_io_unit_size; + + total_chunks_size = num_chunks * _reduce_vol_get_chunk_struct_size(io_units_per_chunk); + + return spdk_divide_round_up(total_chunks_size, REDUCE_PM_SIZE_ALIGNMENT) * + REDUCE_PM_SIZE_ALIGNMENT; +} + +static struct spdk_reduce_chunk_map * +_reduce_vol_get_chunk_map(struct spdk_reduce_vol *vol, uint64_t chunk_map_index) +{ + uintptr_t chunk_map_addr; + + assert(chunk_map_index < _get_total_chunks(vol->params.vol_size, vol->params.chunk_size)); + + chunk_map_addr = (uintptr_t)vol->pm_chunk_maps; + chunk_map_addr += chunk_map_index * + _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); + + return (struct spdk_reduce_chunk_map *)chunk_map_addr; +} + +static int +_validate_vol_params(struct spdk_reduce_vol_params *params) +{ + if (params->vol_size > 0) { + /** + * User does not pass in the vol size - it gets calculated by libreduce from + * values in this structure plus the size of the backing device. + */ + return -EINVAL; + } + + if (params->chunk_size == 0 || params->backing_io_unit_size == 0 || + params->logical_block_size == 0) { + return -EINVAL; + } + + /* Chunk size must be an even multiple of the backing io unit size. */ + if ((params->chunk_size % params->backing_io_unit_size) != 0) { + return -EINVAL; + } + + /* Chunk size must be an even multiple of the logical block size. */ + if ((params->chunk_size % params->logical_block_size) != 0) { + return -1; + } + + return 0; +} + +static uint64_t +_get_vol_size(uint64_t chunk_size, uint64_t backing_dev_size) +{ + uint64_t num_chunks; + + num_chunks = backing_dev_size / chunk_size; + if (num_chunks <= REDUCE_NUM_EXTRA_CHUNKS) { + return 0; + } + + num_chunks -= REDUCE_NUM_EXTRA_CHUNKS; + return num_chunks * chunk_size; +} + +static uint64_t +_get_pm_file_size(struct spdk_reduce_vol_params *params) +{ + uint64_t total_pm_size; + + total_pm_size = sizeof(struct spdk_reduce_vol_superblock); + total_pm_size += _get_pm_logical_map_size(params->vol_size, params->chunk_size); + total_pm_size += _get_pm_total_chunks_size(params->vol_size, params->chunk_size, + params->backing_io_unit_size); + return total_pm_size; +} + +const struct spdk_uuid * +spdk_reduce_vol_get_uuid(struct spdk_reduce_vol *vol) +{ + return &vol->params.uuid; +} + +static void +_initialize_vol_pm_pointers(struct spdk_reduce_vol *vol) +{ + uint64_t logical_map_size; + + /* Superblock is at the beginning of the pm file. */ + vol->pm_super = (struct spdk_reduce_vol_superblock *)vol->pm_file.pm_buf; + + /* Logical map immediately follows the super block. */ + vol->pm_logical_map = (uint64_t *)(vol->pm_super + 1); + + /* Chunks maps follow the logical map. */ + logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, vol->params.chunk_size); + vol->pm_chunk_maps = (uint64_t *)((uint8_t *)vol->pm_logical_map + logical_map_size); +} + +/* We need 2 iovs during load - one for the superblock, another for the path */ +#define LOAD_IOV_COUNT 2 + +struct reduce_init_load_ctx { + struct spdk_reduce_vol *vol; + struct spdk_reduce_vol_cb_args backing_cb_args; + spdk_reduce_vol_op_with_handle_complete cb_fn; + void *cb_arg; + struct iovec iov[LOAD_IOV_COUNT]; + void *path; +}; + +static int +_allocate_vol_requests(struct spdk_reduce_vol *vol) +{ + struct spdk_reduce_vol_request *req; + int i; + + /* Allocate 2x since we need buffers for both read/write and compress/decompress + * intermediate buffers. + */ + vol->buf_mem = spdk_malloc(2 * REDUCE_NUM_VOL_REQUESTS * vol->params.chunk_size, + 64, NULL, SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (vol->buf_mem == NULL) { + return -ENOMEM; + } + + vol->request_mem = calloc(REDUCE_NUM_VOL_REQUESTS, sizeof(*req)); + if (vol->request_mem == NULL) { + spdk_free(vol->buf_mem); + vol->buf_mem = NULL; + return -ENOMEM; + } + + /* Allocate 2x since we need iovs for both read/write and compress/decompress intermediate + * buffers. + */ + vol->buf_iov_mem = calloc(REDUCE_NUM_VOL_REQUESTS, + 2 * sizeof(struct iovec) * vol->backing_io_units_per_chunk); + if (vol->buf_iov_mem == NULL) { + free(vol->request_mem); + spdk_free(vol->buf_mem); + vol->request_mem = NULL; + vol->buf_mem = NULL; + return -ENOMEM; + } + + for (i = 0; i < REDUCE_NUM_VOL_REQUESTS; i++) { + req = &vol->request_mem[i]; + TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); + req->decomp_buf_iov = &vol->buf_iov_mem[(2 * i) * vol->backing_io_units_per_chunk]; + req->decomp_buf = vol->buf_mem + (2 * i) * vol->params.chunk_size; + req->comp_buf_iov = &vol->buf_iov_mem[(2 * i + 1) * vol->backing_io_units_per_chunk]; + req->comp_buf = vol->buf_mem + (2 * i + 1) * vol->params.chunk_size; + } + + return 0; +} + +static void +_init_load_cleanup(struct spdk_reduce_vol *vol, struct reduce_init_load_ctx *ctx) +{ + if (ctx != NULL) { + spdk_free(ctx->path); + free(ctx); + } + + if (vol != NULL) { + if (vol->pm_file.pm_buf != NULL) { + pmem_unmap(vol->pm_file.pm_buf, vol->pm_file.size); + } + + spdk_free(vol->backing_super); + spdk_bit_array_free(&vol->allocated_chunk_maps); + spdk_bit_array_free(&vol->allocated_backing_io_units); + free(vol->request_mem); + free(vol->buf_iov_mem); + spdk_free(vol->buf_mem); + free(vol); + } +} + +static int +_alloc_zero_buff(void) +{ + int rc = 0; + + /* The zero buffer is shared between all volumnes and just used + * for reads so allocate one global instance here if not already + * allocated when another vol init'd or loaded. + */ + if (g_vol_count++ == 0) { + g_zero_buf = spdk_zmalloc(REDUCE_ZERO_BUF_SIZE, + 64, NULL, SPDK_ENV_LCORE_ID_ANY, + SPDK_MALLOC_DMA); + if (g_zero_buf == NULL) { + rc = -ENOMEM; + } + } + return rc; +} + +static void +_init_write_super_cpl(void *cb_arg, int reduce_errno) +{ + struct reduce_init_load_ctx *init_ctx = cb_arg; + int rc; + + rc = _allocate_vol_requests(init_ctx->vol); + if (rc != 0) { + init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); + _init_load_cleanup(init_ctx->vol, init_ctx); + return; + } + + rc = _alloc_zero_buff(); + if (rc != 0) { + init_ctx->cb_fn(init_ctx->cb_arg, NULL, rc); + _init_load_cleanup(init_ctx->vol, init_ctx); + return; + } + + init_ctx->cb_fn(init_ctx->cb_arg, init_ctx->vol, reduce_errno); + /* Only clean up the ctx - the vol has been passed to the application + * for use now that initialization was successful. + */ + _init_load_cleanup(NULL, init_ctx); +} + +static void +_init_write_path_cpl(void *cb_arg, int reduce_errno) +{ + struct reduce_init_load_ctx *init_ctx = cb_arg; + struct spdk_reduce_vol *vol = init_ctx->vol; + + init_ctx->iov[0].iov_base = vol->backing_super; + init_ctx->iov[0].iov_len = sizeof(*vol->backing_super); + init_ctx->backing_cb_args.cb_fn = _init_write_super_cpl; + init_ctx->backing_cb_args.cb_arg = init_ctx; + vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, + 0, sizeof(*vol->backing_super) / vol->backing_dev->blocklen, + &init_ctx->backing_cb_args); +} + +static int +_allocate_bit_arrays(struct spdk_reduce_vol *vol) +{ + uint64_t total_chunks, total_backing_io_units; + uint32_t i, num_metadata_io_units; + + total_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); + vol->allocated_chunk_maps = spdk_bit_array_create(total_chunks); + total_backing_io_units = total_chunks * (vol->params.chunk_size / vol->params.backing_io_unit_size); + vol->allocated_backing_io_units = spdk_bit_array_create(total_backing_io_units); + + if (vol->allocated_chunk_maps == NULL || vol->allocated_backing_io_units == NULL) { + return -ENOMEM; + } + + /* Set backing io unit bits associated with metadata. */ + num_metadata_io_units = (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / + vol->backing_dev->blocklen; + for (i = 0; i < num_metadata_io_units; i++) { + spdk_bit_array_set(vol->allocated_backing_io_units, i); + } + + return 0; +} + +void +spdk_reduce_vol_init(struct spdk_reduce_vol_params *params, + struct spdk_reduce_backing_dev *backing_dev, + const char *pm_file_dir, + spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) +{ + struct spdk_reduce_vol *vol; + struct reduce_init_load_ctx *init_ctx; + uint64_t backing_dev_size; + size_t mapped_len; + int dir_len, max_dir_len, rc; + + /* We need to append a path separator and the UUID to the supplied + * path. + */ + max_dir_len = REDUCE_PATH_MAX - SPDK_UUID_STRING_LEN - 1; + dir_len = strnlen(pm_file_dir, max_dir_len); + /* Strip trailing slash if the user provided one - we will add it back + * later when appending the filename. + */ + if (pm_file_dir[dir_len - 1] == '/') { + dir_len--; + } + if (dir_len == max_dir_len) { + SPDK_ERRLOG("pm_file_dir (%s) too long\n", pm_file_dir); + cb_fn(cb_arg, NULL, -EINVAL); + return; + } + + rc = _validate_vol_params(params); + if (rc != 0) { + SPDK_ERRLOG("invalid vol params\n"); + cb_fn(cb_arg, NULL, rc); + return; + } + + backing_dev_size = backing_dev->blockcnt * backing_dev->blocklen; + params->vol_size = _get_vol_size(params->chunk_size, backing_dev_size); + if (params->vol_size == 0) { + SPDK_ERRLOG("backing device is too small\n"); + cb_fn(cb_arg, NULL, -EINVAL); + return; + } + + if (backing_dev->readv == NULL || backing_dev->writev == NULL || + backing_dev->unmap == NULL) { + SPDK_ERRLOG("backing_dev function pointer not specified\n"); + cb_fn(cb_arg, NULL, -EINVAL); + return; + } + + vol = calloc(1, sizeof(*vol)); + if (vol == NULL) { + cb_fn(cb_arg, NULL, -ENOMEM); + return; + } + + TAILQ_INIT(&vol->free_requests); + TAILQ_INIT(&vol->executing_requests); + TAILQ_INIT(&vol->queued_requests); + + vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 0, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (vol->backing_super == NULL) { + cb_fn(cb_arg, NULL, -ENOMEM); + _init_load_cleanup(vol, NULL); + return; + } + + init_ctx = calloc(1, sizeof(*init_ctx)); + if (init_ctx == NULL) { + cb_fn(cb_arg, NULL, -ENOMEM); + _init_load_cleanup(vol, NULL); + return; + } + + init_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 0, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (init_ctx->path == NULL) { + cb_fn(cb_arg, NULL, -ENOMEM); + _init_load_cleanup(vol, init_ctx); + return; + } + + if (spdk_mem_all_zero(¶ms->uuid, sizeof(params->uuid))) { + spdk_uuid_generate(¶ms->uuid); + } + + memcpy(vol->pm_file.path, pm_file_dir, dir_len); + vol->pm_file.path[dir_len] = '/'; + spdk_uuid_fmt_lower(&vol->pm_file.path[dir_len + 1], SPDK_UUID_STRING_LEN, + ¶ms->uuid); + vol->pm_file.size = _get_pm_file_size(params); + vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, vol->pm_file.size, + PMEM_FILE_CREATE | PMEM_FILE_EXCL, 0600, + &mapped_len, &vol->pm_file.pm_is_pmem); + if (vol->pm_file.pm_buf == NULL) { + SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", + vol->pm_file.path, strerror(errno)); + cb_fn(cb_arg, NULL, -errno); + _init_load_cleanup(vol, init_ctx); + return; + } + + if (vol->pm_file.size != mapped_len) { + SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", + vol->pm_file.size, mapped_len); + cb_fn(cb_arg, NULL, -ENOMEM); + _init_load_cleanup(vol, init_ctx); + return; + } + + vol->backing_io_units_per_chunk = params->chunk_size / params->backing_io_unit_size; + vol->logical_blocks_per_chunk = params->chunk_size / params->logical_block_size; + vol->backing_lba_per_io_unit = params->backing_io_unit_size / backing_dev->blocklen; + memcpy(&vol->params, params, sizeof(*params)); + + vol->backing_dev = backing_dev; + + rc = _allocate_bit_arrays(vol); + if (rc != 0) { + cb_fn(cb_arg, NULL, rc); + _init_load_cleanup(vol, init_ctx); + return; + } + + memcpy(vol->backing_super->signature, SPDK_REDUCE_SIGNATURE, + sizeof(vol->backing_super->signature)); + memcpy(&vol->backing_super->params, params, sizeof(*params)); + + _initialize_vol_pm_pointers(vol); + + memcpy(vol->pm_super, vol->backing_super, sizeof(*vol->backing_super)); + /* Writing 0xFF's is equivalent of filling it all with SPDK_EMPTY_MAP_ENTRY. + * Note that this writes 0xFF to not just the logical map but the chunk maps as well. + */ + memset(vol->pm_logical_map, 0xFF, vol->pm_file.size - sizeof(*vol->backing_super)); + _reduce_persist(vol, vol->pm_file.pm_buf, vol->pm_file.size); + + init_ctx->vol = vol; + init_ctx->cb_fn = cb_fn; + init_ctx->cb_arg = cb_arg; + + memcpy(init_ctx->path, vol->pm_file.path, REDUCE_PATH_MAX); + init_ctx->iov[0].iov_base = init_ctx->path; + init_ctx->iov[0].iov_len = REDUCE_PATH_MAX; + init_ctx->backing_cb_args.cb_fn = _init_write_path_cpl; + init_ctx->backing_cb_args.cb_arg = init_ctx; + /* Write path to offset 4K on backing device - just after where the super + * block will be written. We wait until this is committed before writing the + * super block to guarantee we don't get the super block written without the + * the path if the system crashed in the middle of a write operation. + */ + vol->backing_dev->writev(vol->backing_dev, init_ctx->iov, 1, + REDUCE_BACKING_DEV_PATH_OFFSET / vol->backing_dev->blocklen, + REDUCE_PATH_MAX / vol->backing_dev->blocklen, + &init_ctx->backing_cb_args); +} + +static void destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno); + +static void +_load_read_super_and_path_cpl(void *cb_arg, int reduce_errno) +{ + struct reduce_init_load_ctx *load_ctx = cb_arg; + struct spdk_reduce_vol *vol = load_ctx->vol; + uint64_t backing_dev_size; + uint64_t i, num_chunks, logical_map_index; + struct spdk_reduce_chunk_map *chunk; + size_t mapped_len; + uint32_t j; + int rc; + + rc = _alloc_zero_buff(); + if (rc) { + goto error; + } + + if (memcmp(vol->backing_super->signature, + SPDK_REDUCE_SIGNATURE, + sizeof(vol->backing_super->signature)) != 0) { + /* This backing device isn't a libreduce backing device. */ + rc = -EILSEQ; + goto error; + } + + /* If the cb_fn is destroy_load_cb, it means we are wanting to destroy this compress bdev. + * So don't bother getting the volume ready to use - invoke the callback immediately + * so destroy_load_cb can delete the metadata off of the block device and delete the + * persistent memory file if it exists. + */ + memcpy(vol->pm_file.path, load_ctx->path, sizeof(vol->pm_file.path)); + if (load_ctx->cb_fn == (*destroy_load_cb)) { + load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); + _init_load_cleanup(NULL, load_ctx); + return; + } + + memcpy(&vol->params, &vol->backing_super->params, sizeof(vol->params)); + vol->backing_io_units_per_chunk = vol->params.chunk_size / vol->params.backing_io_unit_size; + vol->logical_blocks_per_chunk = vol->params.chunk_size / vol->params.logical_block_size; + vol->backing_lba_per_io_unit = vol->params.backing_io_unit_size / vol->backing_dev->blocklen; + + rc = _allocate_bit_arrays(vol); + if (rc != 0) { + goto error; + } + + backing_dev_size = vol->backing_dev->blockcnt * vol->backing_dev->blocklen; + if (_get_vol_size(vol->params.chunk_size, backing_dev_size) < vol->params.vol_size) { + SPDK_ERRLOG("backing device size %" PRIi64 " smaller than expected\n", + backing_dev_size); + rc = -EILSEQ; + goto error; + } + + vol->pm_file.size = _get_pm_file_size(&vol->params); + vol->pm_file.pm_buf = pmem_map_file(vol->pm_file.path, 0, 0, 0, &mapped_len, + &vol->pm_file.pm_is_pmem); + if (vol->pm_file.pm_buf == NULL) { + SPDK_ERRLOG("could not pmem_map_file(%s): %s\n", vol->pm_file.path, strerror(errno)); + rc = -errno; + goto error; + } + + if (vol->pm_file.size != mapped_len) { + SPDK_ERRLOG("could not map entire pmem file (size=%" PRIu64 " mapped=%" PRIu64 ")\n", + vol->pm_file.size, mapped_len); + rc = -ENOMEM; + goto error; + } + + rc = _allocate_vol_requests(vol); + if (rc != 0) { + goto error; + } + + _initialize_vol_pm_pointers(vol); + + num_chunks = vol->params.vol_size / vol->params.chunk_size; + for (i = 0; i < num_chunks; i++) { + logical_map_index = vol->pm_logical_map[i]; + if (logical_map_index == REDUCE_EMPTY_MAP_ENTRY) { + continue; + } + spdk_bit_array_set(vol->allocated_chunk_maps, logical_map_index); + chunk = _reduce_vol_get_chunk_map(vol, logical_map_index); + for (j = 0; j < vol->backing_io_units_per_chunk; j++) { + if (chunk->io_unit_index[j] != REDUCE_EMPTY_MAP_ENTRY) { + spdk_bit_array_set(vol->allocated_backing_io_units, chunk->io_unit_index[j]); + } + } + } + + load_ctx->cb_fn(load_ctx->cb_arg, vol, 0); + /* Only clean up the ctx - the vol has been passed to the application + * for use now that volume load was successful. + */ + _init_load_cleanup(NULL, load_ctx); + return; + +error: + load_ctx->cb_fn(load_ctx->cb_arg, NULL, rc); + _init_load_cleanup(vol, load_ctx); +} + +void +spdk_reduce_vol_load(struct spdk_reduce_backing_dev *backing_dev, + spdk_reduce_vol_op_with_handle_complete cb_fn, void *cb_arg) +{ + struct spdk_reduce_vol *vol; + struct reduce_init_load_ctx *load_ctx; + + if (backing_dev->readv == NULL || backing_dev->writev == NULL || + backing_dev->unmap == NULL) { + SPDK_ERRLOG("backing_dev function pointer not specified\n"); + cb_fn(cb_arg, NULL, -EINVAL); + return; + } + + vol = calloc(1, sizeof(*vol)); + if (vol == NULL) { + cb_fn(cb_arg, NULL, -ENOMEM); + return; + } + + TAILQ_INIT(&vol->free_requests); + TAILQ_INIT(&vol->executing_requests); + TAILQ_INIT(&vol->queued_requests); + + vol->backing_super = spdk_zmalloc(sizeof(*vol->backing_super), 64, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (vol->backing_super == NULL) { + _init_load_cleanup(vol, NULL); + cb_fn(cb_arg, NULL, -ENOMEM); + return; + } + + vol->backing_dev = backing_dev; + + load_ctx = calloc(1, sizeof(*load_ctx)); + if (load_ctx == NULL) { + _init_load_cleanup(vol, NULL); + cb_fn(cb_arg, NULL, -ENOMEM); + return; + } + + load_ctx->path = spdk_zmalloc(REDUCE_PATH_MAX, 64, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (load_ctx->path == NULL) { + _init_load_cleanup(vol, load_ctx); + cb_fn(cb_arg, NULL, -ENOMEM); + return; + } + + load_ctx->vol = vol; + load_ctx->cb_fn = cb_fn; + load_ctx->cb_arg = cb_arg; + + load_ctx->iov[0].iov_base = vol->backing_super; + load_ctx->iov[0].iov_len = sizeof(*vol->backing_super); + load_ctx->iov[1].iov_base = load_ctx->path; + load_ctx->iov[1].iov_len = REDUCE_PATH_MAX; + load_ctx->backing_cb_args.cb_fn = _load_read_super_and_path_cpl; + load_ctx->backing_cb_args.cb_arg = load_ctx; + vol->backing_dev->readv(vol->backing_dev, load_ctx->iov, LOAD_IOV_COUNT, 0, + (sizeof(*vol->backing_super) + REDUCE_PATH_MAX) / + vol->backing_dev->blocklen, + &load_ctx->backing_cb_args); +} + +void +spdk_reduce_vol_unload(struct spdk_reduce_vol *vol, + spdk_reduce_vol_op_complete cb_fn, void *cb_arg) +{ + if (vol == NULL) { + /* This indicates a programming error. */ + assert(false); + cb_fn(cb_arg, -EINVAL); + return; + } + + if (--g_vol_count == 0) { + spdk_free(g_zero_buf); + } + assert(g_vol_count >= 0); + _init_load_cleanup(vol, NULL); + cb_fn(cb_arg, 0); +} + +struct reduce_destroy_ctx { + spdk_reduce_vol_op_complete cb_fn; + void *cb_arg; + struct spdk_reduce_vol *vol; + struct spdk_reduce_vol_superblock *super; + struct iovec iov; + struct spdk_reduce_vol_cb_args backing_cb_args; + int reduce_errno; + char pm_path[REDUCE_PATH_MAX]; +}; + +static void +destroy_unload_cpl(void *cb_arg, int reduce_errno) +{ + struct reduce_destroy_ctx *destroy_ctx = cb_arg; + + if (destroy_ctx->reduce_errno == 0) { + if (unlink(destroy_ctx->pm_path)) { + SPDK_ERRLOG("%s could not be unlinked: %s\n", + destroy_ctx->pm_path, strerror(errno)); + } + } + + /* Even if the unload somehow failed, we still pass the destroy_ctx + * reduce_errno since that indicates whether or not the volume was + * actually destroyed. + */ + destroy_ctx->cb_fn(destroy_ctx->cb_arg, destroy_ctx->reduce_errno); + spdk_free(destroy_ctx->super); + free(destroy_ctx); +} + +static void +_destroy_zero_super_cpl(void *cb_arg, int reduce_errno) +{ + struct reduce_destroy_ctx *destroy_ctx = cb_arg; + struct spdk_reduce_vol *vol = destroy_ctx->vol; + + destroy_ctx->reduce_errno = reduce_errno; + spdk_reduce_vol_unload(vol, destroy_unload_cpl, destroy_ctx); +} + +static void +destroy_load_cb(void *cb_arg, struct spdk_reduce_vol *vol, int reduce_errno) +{ + struct reduce_destroy_ctx *destroy_ctx = cb_arg; + + if (reduce_errno != 0) { + destroy_ctx->cb_fn(destroy_ctx->cb_arg, reduce_errno); + spdk_free(destroy_ctx->super); + free(destroy_ctx); + return; + } + + destroy_ctx->vol = vol; + memcpy(destroy_ctx->pm_path, vol->pm_file.path, sizeof(destroy_ctx->pm_path)); + destroy_ctx->iov.iov_base = destroy_ctx->super; + destroy_ctx->iov.iov_len = sizeof(*destroy_ctx->super); + destroy_ctx->backing_cb_args.cb_fn = _destroy_zero_super_cpl; + destroy_ctx->backing_cb_args.cb_arg = destroy_ctx; + vol->backing_dev->writev(vol->backing_dev, &destroy_ctx->iov, 1, 0, + sizeof(*destroy_ctx->super) / vol->backing_dev->blocklen, + &destroy_ctx->backing_cb_args); +} + +void +spdk_reduce_vol_destroy(struct spdk_reduce_backing_dev *backing_dev, + spdk_reduce_vol_op_complete cb_fn, void *cb_arg) +{ + struct reduce_destroy_ctx *destroy_ctx; + + destroy_ctx = calloc(1, sizeof(*destroy_ctx)); + if (destroy_ctx == NULL) { + cb_fn(cb_arg, -ENOMEM); + return; + } + + destroy_ctx->super = spdk_zmalloc(sizeof(*destroy_ctx->super), 64, NULL, + SPDK_ENV_LCORE_ID_ANY, SPDK_MALLOC_DMA); + if (destroy_ctx->super == NULL) { + free(destroy_ctx); + cb_fn(cb_arg, -ENOMEM); + return; + } + destroy_ctx->cb_fn = cb_fn; + destroy_ctx->cb_arg = cb_arg; + spdk_reduce_vol_load(backing_dev, destroy_load_cb, destroy_ctx); +} + +static bool +_request_spans_chunk_boundary(struct spdk_reduce_vol *vol, uint64_t offset, uint64_t length) +{ + uint64_t start_chunk, end_chunk; + + start_chunk = offset / vol->logical_blocks_per_chunk; + end_chunk = (offset + length - 1) / vol->logical_blocks_per_chunk; + + return (start_chunk != end_chunk); +} + +typedef void (*reduce_request_fn)(void *_req, int reduce_errno); + +static void +_reduce_vol_complete_req(struct spdk_reduce_vol_request *req, int reduce_errno) +{ + struct spdk_reduce_vol_request *next_req; + struct spdk_reduce_vol *vol = req->vol; + + req->cb_fn(req->cb_arg, reduce_errno); + TAILQ_REMOVE(&vol->executing_requests, req, tailq); + + TAILQ_FOREACH(next_req, &vol->queued_requests, tailq) { + if (next_req->logical_map_index == req->logical_map_index) { + TAILQ_REMOVE(&vol->queued_requests, next_req, tailq); + if (next_req->type == REDUCE_IO_READV) { + _start_readv_request(next_req); + } else { + assert(next_req->type == REDUCE_IO_WRITEV); + _start_writev_request(next_req); + } + break; + } + } + + TAILQ_INSERT_HEAD(&vol->free_requests, req, tailq); +} + +static void +_write_write_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + struct spdk_reduce_vol *vol = req->vol; + uint64_t old_chunk_map_index; + struct spdk_reduce_chunk_map *old_chunk; + uint32_t i; + + if (reduce_errno != 0) { + req->reduce_errno = reduce_errno; + } + + assert(req->num_backing_ops > 0); + if (--req->num_backing_ops > 0) { + return; + } + + if (req->reduce_errno != 0) { + _reduce_vol_complete_req(req, req->reduce_errno); + return; + } + + old_chunk_map_index = vol->pm_logical_map[req->logical_map_index]; + if (old_chunk_map_index != REDUCE_EMPTY_MAP_ENTRY) { + old_chunk = _reduce_vol_get_chunk_map(vol, old_chunk_map_index); + for (i = 0; i < vol->backing_io_units_per_chunk; i++) { + if (old_chunk->io_unit_index[i] == REDUCE_EMPTY_MAP_ENTRY) { + break; + } + assert(spdk_bit_array_get(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]) == true); + spdk_bit_array_clear(vol->allocated_backing_io_units, old_chunk->io_unit_index[i]); + old_chunk->io_unit_index[i] = REDUCE_EMPTY_MAP_ENTRY; + } + spdk_bit_array_clear(vol->allocated_chunk_maps, old_chunk_map_index); + } + + /* + * We don't need to persist the clearing of the old chunk map here. The old chunk map + * becomes invalid after we update the logical map, since the old chunk map will no + * longer have a reference to it in the logical map. + */ + + /* Persist the new chunk map. This must be persisted before we update the logical map. */ + _reduce_persist(vol, req->chunk, + _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk)); + + vol->pm_logical_map[req->logical_map_index] = req->chunk_map_index; + + _reduce_persist(vol, &vol->pm_logical_map[req->logical_map_index], sizeof(uint64_t)); + + _reduce_vol_complete_req(req, 0); +} + +static void +_issue_backing_ops(struct spdk_reduce_vol_request *req, struct spdk_reduce_vol *vol, + reduce_request_fn next_fn, bool is_write) +{ + struct iovec *iov; + uint8_t *buf; + uint32_t i; + + if (req->chunk_is_compressed) { + iov = req->comp_buf_iov; + buf = req->comp_buf; + } else { + iov = req->decomp_buf_iov; + buf = req->decomp_buf; + } + + req->num_backing_ops = req->num_io_units; + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + for (i = 0; i < req->num_io_units; i++) { + iov[i].iov_base = buf + i * vol->params.backing_io_unit_size; + iov[i].iov_len = vol->params.backing_io_unit_size; + if (is_write) { + vol->backing_dev->writev(vol->backing_dev, &iov[i], 1, + req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, + vol->backing_lba_per_io_unit, &req->backing_cb_args); + } else { + vol->backing_dev->readv(vol->backing_dev, &iov[i], 1, + req->chunk->io_unit_index[i] * vol->backing_lba_per_io_unit, + vol->backing_lba_per_io_unit, &req->backing_cb_args); + } + } +} + +static void +_reduce_vol_write_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn, + uint32_t compressed_size) +{ + struct spdk_reduce_vol *vol = req->vol; + uint32_t i; + uint64_t chunk_offset, remainder, total_len = 0; + uint8_t *buf; + int j; + + req->chunk_map_index = spdk_bit_array_find_first_clear(vol->allocated_chunk_maps, 0); + + /* TODO: fail if no chunk map found - but really this should not happen if we + * size the number of requests similarly to number of extra chunk maps + */ + assert(req->chunk_map_index != UINT32_MAX); + spdk_bit_array_set(vol->allocated_chunk_maps, req->chunk_map_index); + + req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); + req->num_io_units = spdk_divide_round_up(compressed_size, + vol->params.backing_io_unit_size); + req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); + req->chunk->compressed_size = + req->chunk_is_compressed ? compressed_size : vol->params.chunk_size; + + /* if the chunk is uncompressed we need to copy the data from the host buffers. */ + if (req->chunk_is_compressed == false) { + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + buf = req->decomp_buf; + total_len = chunk_offset * vol->params.logical_block_size; + + /* zero any offset into chunk */ + if (req->rmw == false && chunk_offset) { + memset(buf, 0, total_len); + } + buf += total_len; + + /* copy the data */ + for (j = 0; j < req->iovcnt; j++) { + memcpy(buf, req->iov[j].iov_base, req->iov[j].iov_len); + buf += req->iov[j].iov_len; + total_len += req->iov[j].iov_len; + } + + /* zero any remainder */ + remainder = vol->params.chunk_size - total_len; + total_len += remainder; + if (req->rmw == false && remainder) { + memset(buf, 0, remainder); + } + assert(total_len == vol->params.chunk_size); + } + + for (i = 0; i < req->num_io_units; i++) { + req->chunk->io_unit_index[i] = spdk_bit_array_find_first_clear(vol->allocated_backing_io_units, 0); + /* TODO: fail if no backing block found - but really this should also not + * happen (see comment above). + */ + assert(req->chunk->io_unit_index[i] != UINT32_MAX); + spdk_bit_array_set(vol->allocated_backing_io_units, req->chunk->io_unit_index[i]); + } + + _issue_backing_ops(req, vol, next_fn, true /* write */); +} + +static void +_write_compress_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + + /* Negative reduce_errno indicates failure for compression operations. + * Just write the uncompressed data instead. Force this to happen + * by just passing the full chunk size to _reduce_vol_write_chunk. + * When it sees the data couldn't be compressed, it will just write + * the uncompressed buffer to disk. + */ + if (reduce_errno < 0) { + reduce_errno = req->vol->params.chunk_size; + } + + /* Positive reduce_errno indicates number of bytes in compressed buffer. */ + _reduce_vol_write_chunk(req, _write_write_done, (uint32_t)reduce_errno); +} + +static void +_reduce_vol_compress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + req->comp_buf_iov[0].iov_base = req->comp_buf; + req->comp_buf_iov[0].iov_len = vol->params.chunk_size; + vol->backing_dev->compress(vol->backing_dev, + &req->decomp_iov[0], req->decomp_iovcnt, req->comp_buf_iov, 1, + &req->backing_cb_args); +} + +static void +_reduce_vol_decompress_chunk_scratch(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + req->comp_buf_iov[0].iov_base = req->comp_buf; + req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; + req->decomp_buf_iov[0].iov_base = req->decomp_buf; + req->decomp_buf_iov[0].iov_len = vol->params.chunk_size; + vol->backing_dev->decompress(vol->backing_dev, + req->comp_buf_iov, 1, req->decomp_buf_iov, 1, + &req->backing_cb_args); +} + +static void +_reduce_vol_decompress_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + uint64_t chunk_offset, remainder = 0; + uint64_t ttl_len = 0; + int i; + + req->decomp_iovcnt = 0; + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + + if (chunk_offset) { + /* first iov point to our scratch buffer for any offset into the chunk */ + req->decomp_iov[0].iov_base = req->decomp_buf; + req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; + ttl_len += req->decomp_iov[0].iov_len; + req->decomp_iovcnt = 1; + } + + /* now the user data iov, direct to the user buffer */ + for (i = 0; i < req->iovcnt; i++) { + req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; + req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; + ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; + } + req->decomp_iovcnt += req->iovcnt; + + /* send the rest of the chunk to our scratch buffer */ + remainder = vol->params.chunk_size - ttl_len; + if (remainder) { + req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; + req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; + ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; + req->decomp_iovcnt++; + } + assert(ttl_len == vol->params.chunk_size); + + req->backing_cb_args.cb_fn = next_fn; + req->backing_cb_args.cb_arg = req; + req->comp_buf_iov[0].iov_base = req->comp_buf; + req->comp_buf_iov[0].iov_len = req->chunk->compressed_size; + vol->backing_dev->decompress(vol->backing_dev, + req->comp_buf_iov, 1, &req->decomp_iov[0], req->decomp_iovcnt, + &req->backing_cb_args); +} + +static void +_write_decompress_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + struct spdk_reduce_vol *vol = req->vol; + uint64_t chunk_offset, remainder, ttl_len = 0; + int i; + + /* Negative reduce_errno indicates failure for compression operations. */ + if (reduce_errno < 0) { + _reduce_vol_complete_req(req, reduce_errno); + return; + } + + /* Positive reduce_errno indicates number of bytes in decompressed + * buffer. This should equal the chunk size - otherwise that's another + * type of failure. + */ + if ((uint32_t)reduce_errno != vol->params.chunk_size) { + _reduce_vol_complete_req(req, -EIO); + return; + } + + req->decomp_iovcnt = 0; + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + + if (chunk_offset) { + req->decomp_iov[0].iov_base = req->decomp_buf; + req->decomp_iov[0].iov_len = chunk_offset * vol->params.logical_block_size; + ttl_len += req->decomp_iov[0].iov_len; + req->decomp_iovcnt = 1; + } + + for (i = 0; i < req->iovcnt; i++) { + req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; + req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; + ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; + } + req->decomp_iovcnt += req->iovcnt; + + remainder = vol->params.chunk_size - ttl_len; + if (remainder) { + req->decomp_iov[req->decomp_iovcnt].iov_base = req->decomp_buf + ttl_len; + req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; + ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; + req->decomp_iovcnt++; + } + assert(ttl_len == vol->params.chunk_size); + + _reduce_vol_compress_chunk(req, _write_compress_done); +} + +static void +_write_read_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + + if (reduce_errno != 0) { + req->reduce_errno = reduce_errno; + } + + assert(req->num_backing_ops > 0); + if (--req->num_backing_ops > 0) { + return; + } + + if (req->reduce_errno != 0) { + _reduce_vol_complete_req(req, req->reduce_errno); + return; + } + + if (req->chunk_is_compressed) { + _reduce_vol_decompress_chunk_scratch(req, _write_decompress_done); + } else { + _write_decompress_done(req, req->chunk->compressed_size); + } +} + +static void +_read_decompress_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + struct spdk_reduce_vol *vol = req->vol; + + /* Negative reduce_errno indicates failure for compression operations. */ + if (reduce_errno < 0) { + _reduce_vol_complete_req(req, reduce_errno); + return; + } + + /* Positive reduce_errno indicates number of bytes in decompressed + * buffer. This should equal the chunk size - otherwise that's another + * type of failure. + */ + if ((uint32_t)reduce_errno != vol->params.chunk_size) { + _reduce_vol_complete_req(req, -EIO); + return; + } + + _reduce_vol_complete_req(req, 0); +} + +static void +_read_read_done(void *_req, int reduce_errno) +{ + struct spdk_reduce_vol_request *req = _req; + uint64_t chunk_offset; + uint8_t *buf; + int i; + + if (reduce_errno != 0) { + req->reduce_errno = reduce_errno; + } + + assert(req->num_backing_ops > 0); + if (--req->num_backing_ops > 0) { + return; + } + + if (req->reduce_errno != 0) { + _reduce_vol_complete_req(req, req->reduce_errno); + return; + } + + if (req->chunk_is_compressed) { + _reduce_vol_decompress_chunk(req, _read_decompress_done); + } else { + + /* If the chunk was compressed, the data would have been sent to the + * host buffers by the decompression operation, if not we need to memcpy here. + */ + chunk_offset = req->offset % req->vol->logical_blocks_per_chunk; + buf = req->decomp_buf + chunk_offset * req->vol->params.logical_block_size; + for (i = 0; i < req->iovcnt; i++) { + memcpy(req->iov[i].iov_base, buf, req->iov[i].iov_len); + buf += req->iov[i].iov_len; + } + + _read_decompress_done(req, req->chunk->compressed_size); + } +} + +static void +_reduce_vol_read_chunk(struct spdk_reduce_vol_request *req, reduce_request_fn next_fn) +{ + struct spdk_reduce_vol *vol = req->vol; + + req->chunk_map_index = vol->pm_logical_map[req->logical_map_index]; + assert(req->chunk_map_index != UINT32_MAX); + + req->chunk = _reduce_vol_get_chunk_map(vol, req->chunk_map_index); + req->num_io_units = spdk_divide_round_up(req->chunk->compressed_size, + vol->params.backing_io_unit_size); + req->chunk_is_compressed = (req->num_io_units != vol->backing_io_units_per_chunk); + + _issue_backing_ops(req, vol, next_fn, false /* read */); +} + +static bool +_iov_array_is_valid(struct spdk_reduce_vol *vol, struct iovec *iov, int iovcnt, + uint64_t length) +{ + uint64_t size = 0; + int i; + + if (iovcnt > REDUCE_MAX_IOVECS) { + return false; + } + + for (i = 0; i < iovcnt; i++) { + size += iov[i].iov_len; + } + + return size == (length * vol->params.logical_block_size); +} + +static bool +_check_overlap(struct spdk_reduce_vol *vol, uint64_t logical_map_index) +{ + struct spdk_reduce_vol_request *req; + + TAILQ_FOREACH(req, &vol->executing_requests, tailq) { + if (logical_map_index == req->logical_map_index) { + return true; + } + } + + return false; +} + +static void +_start_readv_request(struct spdk_reduce_vol_request *req) +{ + TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); + _reduce_vol_read_chunk(req, _read_read_done); +} + +void +spdk_reduce_vol_readv(struct spdk_reduce_vol *vol, + struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, + spdk_reduce_vol_op_complete cb_fn, void *cb_arg) +{ + struct spdk_reduce_vol_request *req; + uint64_t logical_map_index; + bool overlapped; + int i; + + if (length == 0) { + cb_fn(cb_arg, 0); + return; + } + + if (_request_spans_chunk_boundary(vol, offset, length)) { + cb_fn(cb_arg, -EINVAL); + return; + } + + if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { + cb_fn(cb_arg, -EINVAL); + return; + } + + logical_map_index = offset / vol->logical_blocks_per_chunk; + overlapped = _check_overlap(vol, logical_map_index); + + if (!overlapped && vol->pm_logical_map[logical_map_index] == REDUCE_EMPTY_MAP_ENTRY) { + /* + * This chunk hasn't been allocated. So treat the data as all + * zeroes for this chunk - do the memset and immediately complete + * the operation. + */ + for (i = 0; i < iovcnt; i++) { + memset(iov[i].iov_base, 0, iov[i].iov_len); + } + cb_fn(cb_arg, 0); + return; + } + + req = TAILQ_FIRST(&vol->free_requests); + if (req == NULL) { + cb_fn(cb_arg, -ENOMEM); + return; + } + + TAILQ_REMOVE(&vol->free_requests, req, tailq); + req->type = REDUCE_IO_READV; + req->vol = vol; + req->iov = iov; + req->iovcnt = iovcnt; + req->offset = offset; + req->logical_map_index = logical_map_index; + req->length = length; + req->cb_fn = cb_fn; + req->cb_arg = cb_arg; + + if (!overlapped) { + _start_readv_request(req); + } else { + TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); + } +} + +static void +_start_writev_request(struct spdk_reduce_vol_request *req) +{ + struct spdk_reduce_vol *vol = req->vol; + uint64_t chunk_offset, ttl_len = 0; + uint64_t remainder = 0; + uint32_t lbsize; + int i; + + TAILQ_INSERT_TAIL(&req->vol->executing_requests, req, tailq); + if (vol->pm_logical_map[req->logical_map_index] != REDUCE_EMPTY_MAP_ENTRY) { + if ((req->length * vol->params.logical_block_size) < vol->params.chunk_size) { + /* Read old chunk, then overwrite with data from this write + * operation. + */ + req->rmw = true; + _reduce_vol_read_chunk(req, _write_read_done); + return; + } + } + + lbsize = vol->params.logical_block_size; + req->decomp_iovcnt = 0; + req->rmw = false; + + /* Note: point to our zero buf for offset into the chunk. */ + chunk_offset = req->offset % vol->logical_blocks_per_chunk; + if (chunk_offset != 0) { + ttl_len += chunk_offset * lbsize; + req->decomp_iov[0].iov_base = g_zero_buf; + req->decomp_iov[0].iov_len = ttl_len; + req->decomp_iovcnt = 1; + } + + /* now the user data iov, direct from the user buffer */ + for (i = 0; i < req->iovcnt; i++) { + req->decomp_iov[i + req->decomp_iovcnt].iov_base = req->iov[i].iov_base; + req->decomp_iov[i + req->decomp_iovcnt].iov_len = req->iov[i].iov_len; + ttl_len += req->decomp_iov[i + req->decomp_iovcnt].iov_len; + } + req->decomp_iovcnt += req->iovcnt; + + remainder = vol->params.chunk_size - ttl_len; + if (remainder) { + req->decomp_iov[req->decomp_iovcnt].iov_base = g_zero_buf; + req->decomp_iov[req->decomp_iovcnt].iov_len = remainder; + ttl_len += req->decomp_iov[req->decomp_iovcnt].iov_len; + req->decomp_iovcnt++; + } + assert(ttl_len == req->vol->params.chunk_size); + + _reduce_vol_compress_chunk(req, _write_compress_done); +} + +void +spdk_reduce_vol_writev(struct spdk_reduce_vol *vol, + struct iovec *iov, int iovcnt, uint64_t offset, uint64_t length, + spdk_reduce_vol_op_complete cb_fn, void *cb_arg) +{ + struct spdk_reduce_vol_request *req; + uint64_t logical_map_index; + bool overlapped; + + if (length == 0) { + cb_fn(cb_arg, 0); + return; + } + + if (_request_spans_chunk_boundary(vol, offset, length)) { + cb_fn(cb_arg, -EINVAL); + return; + } + + if (!_iov_array_is_valid(vol, iov, iovcnt, length)) { + cb_fn(cb_arg, -EINVAL); + return; + } + + logical_map_index = offset / vol->logical_blocks_per_chunk; + overlapped = _check_overlap(vol, logical_map_index); + + req = TAILQ_FIRST(&vol->free_requests); + if (req == NULL) { + cb_fn(cb_arg, -ENOMEM); + return; + } + + TAILQ_REMOVE(&vol->free_requests, req, tailq); + req->type = REDUCE_IO_WRITEV; + req->vol = vol; + req->iov = iov; + req->iovcnt = iovcnt; + req->offset = offset; + req->logical_map_index = logical_map_index; + req->length = length; + req->cb_fn = cb_fn; + req->cb_arg = cb_arg; + + if (!overlapped) { + _start_writev_request(req); + } else { + TAILQ_INSERT_TAIL(&vol->queued_requests, req, tailq); + } +} + +const struct spdk_reduce_vol_params * +spdk_reduce_vol_get_params(struct spdk_reduce_vol *vol) +{ + return &vol->params; +} + +void spdk_reduce_vol_print_info(struct spdk_reduce_vol *vol) +{ + uint64_t logical_map_size, num_chunks, ttl_chunk_sz; + uint32_t struct_size; + uint64_t chunk_map_size; + + SPDK_NOTICELOG("vol info:\n"); + SPDK_NOTICELOG("\tvol->params.backing_io_unit_size = 0x%x\n", vol->params.backing_io_unit_size); + SPDK_NOTICELOG("\tvol->params.logical_block_size = 0x%x\n", vol->params.logical_block_size); + SPDK_NOTICELOG("\tvol->params.chunk_size = 0x%x\n", vol->params.chunk_size); + SPDK_NOTICELOG("\tvol->params.vol_size = 0x%" PRIx64 "\n", vol->params.vol_size); + num_chunks = _get_total_chunks(vol->params.vol_size, vol->params.chunk_size); + SPDK_NOTICELOG("\ttotal chunks (including extra) = 0x%" PRIx64 "\n", num_chunks); + SPDK_NOTICELOG("\ttotal chunks (excluding extra) = 0x%" PRIx64 "\n", + vol->params.vol_size / vol->params.chunk_size); + ttl_chunk_sz = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, + vol->params.backing_io_unit_size); + SPDK_NOTICELOG("\ttotal_chunks_size = 0x%" PRIx64 "\n", ttl_chunk_sz); + struct_size = _reduce_vol_get_chunk_struct_size(vol->backing_io_units_per_chunk); + SPDK_NOTICELOG("\tchunk_struct_size = 0x%x\n", struct_size); + + SPDK_NOTICELOG("pmem info:\n"); + SPDK_NOTICELOG("\tvol->pm_file.size = 0x%" PRIx64 "\n", vol->pm_file.size); + SPDK_NOTICELOG("\tvol->pm_file.pm_buf = %p\n", (void *)vol->pm_file.pm_buf); + SPDK_NOTICELOG("\tvol->pm_super = %p\n", (void *)vol->pm_super); + SPDK_NOTICELOG("\tvol->pm_logical_map = %p\n", (void *)vol->pm_logical_map); + logical_map_size = _get_pm_logical_map_size(vol->params.vol_size, + vol->params.chunk_size); + SPDK_NOTICELOG("\tlogical_map_size = 0x%" PRIx64 "\n", logical_map_size); + SPDK_NOTICELOG("\tvol->pm_chunk_maps = %p\n", (void *)vol->pm_chunk_maps); + chunk_map_size = _get_pm_total_chunks_size(vol->params.vol_size, vol->params.chunk_size, + vol->params.backing_io_unit_size); + SPDK_NOTICELOG("\tchunk_map_size = 0x%" PRIx64 "\n", chunk_map_size); +} + +SPDK_LOG_REGISTER_COMPONENT("reduce", SPDK_LOG_REDUCE) |