From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/librbd/cache/pwl/ssd/WriteLog.cc | 1160 ++++++++++++++++++++++++++++++++++ 1 file changed, 1160 insertions(+) create mode 100644 src/librbd/cache/pwl/ssd/WriteLog.cc (limited to 'src/librbd/cache/pwl/ssd/WriteLog.cc') diff --git a/src/librbd/cache/pwl/ssd/WriteLog.cc b/src/librbd/cache/pwl/ssd/WriteLog.cc new file mode 100644 index 000000000..753b15b69 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/WriteLog.cc @@ -0,0 +1,1160 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "WriteLog.h" +#include "include/buffer.h" +#include "include/Context.h" +#include "include/ceph_assert.h" +#include "common/deleter.h" +#include "common/dout.h" +#include "common/environment.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "common/Timer.h" +#include "common/perf_counters.h" +#include "librbd/ImageCtx.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/cache/pwl/ImageCacheState.h" +#include "librbd/cache/pwl/LogEntry.h" +#include +#include + +#undef dout_subsys +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +using namespace std; +using namespace librbd::cache::pwl; + +static bool is_valid_pool_root(const WriteLogPoolRoot& root) { + return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 && + root.first_valid_entry >= DATA_RING_BUFFER_OFFSET && + root.first_valid_entry < root.pool_size && + root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 && + root.first_free_entry >= DATA_RING_BUFFER_OFFSET && + root.first_free_entry < root.pool_size && + root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0; +} + +template +Builder>* WriteLog::create_builder() { + m_builderobj = new Builder(); + return m_builderobj; +} + +template +WriteLog::WriteLog( + I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state, + cache::ImageWritebackInterface& image_writeback, + plugin::Api& plugin_api) + : AbstractWriteLog(image_ctx, cache_state, create_builder(), + image_writeback, plugin_api) +{ +} + +template +WriteLog::~WriteLog() { + delete m_builderobj; +} + +template +void WriteLog::collect_read_extents( + uint64_t read_buffer_offset, LogMapEntry map_entry, + std::vector> &log_entries_to_read, + std::vector &bls_to_read, + uint64_t entry_hit_length, Extent hit_extent, + pwl::C_ReadRequest *read_ctx) { + // Make a bl for this hit extent. This will add references to the + // write_entry->cache_bl */ + ldout(m_image_ctx.cct, 5) << dendl; + auto write_entry = std::static_pointer_cast(map_entry.log_entry); + buffer::list hit_bl; + write_entry->copy_cache_bl(&hit_bl); + bool writesame = write_entry->is_writesame_entry(); + auto hit_extent_buf = std::make_shared( + hit_extent, hit_bl, true, read_buffer_offset, writesame); + read_ctx->read_extents.push_back(hit_extent_buf); + + if (!hit_bl.length()) { + ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl; + auto read_extent = read_ctx->read_extents.back(); + write_entry->inc_bl_refs(); + log_entries_to_read.push_back(std::move(write_entry)); + bls_to_read.push_back(&read_extent->m_bl); + } +} + +template +void WriteLog::complete_read( + std::vector> &log_entries_to_read, + std::vector &bls_to_read, + Context *ctx) { + if (!log_entries_to_read.empty()) { + aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx); + } else { + ctx->complete(0); + } +} + +template +int WriteLog::create_and_open_bdev() { + CephContext *cct = m_image_ctx.cct; + + bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb, + nullptr, nullptr, nullptr); + int r = bdev->open(this->m_log_pool_name); + if (r < 0) { + lderr(cct) << "failed to open bdev" << dendl; + delete bdev; + return r; + } + + ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0); + if (bdev->get_size() != this->m_log_pool_size) { + lderr(cct) << "size mismatch: bdev size " << bdev->get_size() + << " (block size " << bdev->get_block_size() + << ") != pool size " << this->m_log_pool_size << dendl; + bdev->close(); + delete bdev; + return -EINVAL; + } + + return 0; +} + +template +bool WriteLog::initialize_pool(Context *on_finish, + pwl::DeferredContexts &later) { + int r; + CephContext *cct = m_image_ctx.cct; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (access(this->m_log_pool_name.c_str(), F_OK) != 0) { + int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644); + bool succeed = true; + if (fd >= 0) { + if (truncate(this->m_log_pool_name.c_str(), + this->m_log_pool_size) != 0) { + succeed = false; + } + ::close(fd); + } else { + succeed = false; + } + if (!succeed) { + m_cache_state->present = false; + m_cache_state->clean = true; + m_cache_state->empty = true; + /* TODO: filter/replace errnos that are meaningless to the caller */ + on_finish->complete(-errno); + return false; + } + + r = create_and_open_bdev(); + if (r < 0) { + on_finish->complete(r); + return false; + } + m_cache_state->present = true; + m_cache_state->clean = true; + m_cache_state->empty = true; + /* new pool, calculate and store metadata */ + + /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free. + * In this way, when all ring buffer spaces are allocated, + * m_first_free_entry and m_first_valid_entry will not be equal. + * Equal only means the cache is empty. */ + this->m_bytes_allocated_cap = this->m_log_pool_size - + DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE; + /* Log ring empty */ + m_first_free_entry = DATA_RING_BUFFER_OFFSET; + m_first_valid_entry = DATA_RING_BUFFER_OFFSET; + + auto new_root = std::make_shared(pool_root); + new_root->layout_version = SSD_LAYOUT_VERSION; + new_root->pool_size = this->m_log_pool_size; + new_root->flushed_sync_gen = this->m_flushed_sync_gen; + new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE; + new_root->first_free_entry = m_first_free_entry; + new_root->first_valid_entry = m_first_valid_entry; + new_root->num_log_entries = 0; + pool_root = *new_root; + + r = update_pool_root_sync(new_root); + if (r != 0) { + lderr(cct) << "failed to initialize pool (" + << this->m_log_pool_name << ")" << dendl; + bdev->close(); + delete bdev; + on_finish->complete(r); + return false; + } + } else { + ceph_assert(m_cache_state->present); + r = create_and_open_bdev(); + if (r < 0) { + on_finish->complete(r); + return false; + } + + bufferlist bl; + SuperBlock superblock; + ::IOContext ioctx(cct, nullptr); + r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false); + if (r < 0) { + lderr(cct) << "read ssd cache superblock failed " << dendl; + goto err_close_bdev; + } + auto p = bl.cbegin(); + decode(superblock, p); + pool_root = superblock.root; + ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size + << " first_valid_entry=" << pool_root.first_valid_entry + << " first_free_entry=" << pool_root.first_free_entry + << " flushed_sync_gen=" << pool_root.flushed_sync_gen + << dendl; + ceph_assert(is_valid_pool_root(pool_root)); + if (pool_root.layout_version != SSD_LAYOUT_VERSION) { + lderr(cct) << "pool layout version is " + << pool_root.layout_version + << " expected " << SSD_LAYOUT_VERSION + << dendl; + goto err_close_bdev; + } + if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) { + lderr(cct) << "pool block size is " << pool_root.block_size + << " expected " << MIN_WRITE_ALLOC_SSD_SIZE + << dendl; + goto err_close_bdev; + } + + this->m_log_pool_size = pool_root.pool_size; + this->m_flushed_sync_gen = pool_root.flushed_sync_gen; + this->m_first_valid_entry = pool_root.first_valid_entry; + this->m_first_free_entry = pool_root.first_free_entry; + this->m_bytes_allocated_cap = this->m_log_pool_size - + DATA_RING_BUFFER_OFFSET - + MIN_WRITE_ALLOC_SSD_SIZE; + + load_existing_entries(later); + m_cache_state->clean = this->m_dirty_log_entries.empty(); + m_cache_state->empty = m_log_entries.empty(); + } + return true; + +err_close_bdev: + bdev->close(); + delete bdev; + on_finish->complete(-EINVAL); + return false; +} + +template +void WriteLog::remove_pool_file() { + ceph_assert(bdev); + bdev->close(); + delete bdev; + bdev = nullptr; + ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl; + + if (m_cache_state->clean) { + ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " + << this->m_log_pool_name << dendl; + if (remove(this->m_log_pool_name.c_str()) != 0) { + lderr(m_image_ctx.cct) << "failed to remove empty pool \"" + << this->m_log_pool_name << "\": " << dendl; + } else { + m_cache_state->present = false; + } + } else { + ldout(m_image_ctx.cct, 5) << "Not removing pool file: " + << this->m_log_pool_name << dendl; + } +} + +template +void WriteLog::load_existing_entries(pwl::DeferredContexts &later) { + CephContext *cct = m_image_ctx.cct; + std::map> sync_point_entries; + std::map missing_sync_points; + + // Iterate through the log_entries and append all the write_bytes + // of each entry to fetch the pos of next 4k of log_entries. Iterate + // through the log entries and append them to the in-memory vector + for (uint64_t next_log_pos = this->m_first_valid_entry; + next_log_pos != this->m_first_free_entry; ) { + // read the entries from SSD cache and decode + bufferlist bl_entries; + ::IOContext ioctx_entry(cct, nullptr); + bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries, + &ioctx_entry, false); + std::vector ssd_log_entries; + auto pl = bl_entries.cbegin(); + decode(ssd_log_entries, pl); + ldout(cct, 5) << "decoded ssd log entries" << dendl; + uint64_t curr_log_pos = next_log_pos; + std::shared_ptr log_entry = nullptr; + + for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) { + this->update_entries(&log_entry, &*it, missing_sync_points, + sync_point_entries, curr_log_pos); + log_entry->ram_entry = *it; + log_entry->log_entry_index = curr_log_pos; + log_entry->completed = true; + m_log_entries.push_back(log_entry); + next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE); + } + // along with the write_bytes, add control block size too + next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE; + if (next_log_pos >= this->m_log_pool_size) { + next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET; + } + } + this->update_sync_points(missing_sync_points, sync_point_entries, later); + if (m_first_valid_entry > m_first_free_entry) { + m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry + + m_first_free_entry - DATA_RING_BUFFER_OFFSET; + } else { + m_bytes_allocated = m_first_free_entry - m_first_valid_entry; + } +} + +// For SSD we don't calc m_bytes_allocated in this +template +void WriteLog::inc_allocated_cached_bytes( + std::shared_ptr log_entry) { + if (log_entry->is_write_entry()) { + this->m_bytes_cached += log_entry->write_bytes(); + } +} + +template +bool WriteLog::alloc_resources(C_BlockIORequestT *req) { + bool alloc_succeeds = true; + uint64_t bytes_allocated = 0; + uint64_t bytes_cached = 0; + uint64_t bytes_dirtied = 0; + uint64_t num_lanes = 0; + uint64_t num_unpublished_reserves = 0; + uint64_t num_log_entries = 0; + + // Setup buffer, and get all the number of required resources + req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated, + &num_lanes, &num_log_entries, + &num_unpublished_reserves); + + ceph_assert(!num_lanes); + if (num_log_entries) { + bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE; + num_log_entries = 0; + } + ceph_assert(!num_unpublished_reserves); + + alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, + bytes_allocated, num_lanes, + num_log_entries, + num_unpublished_reserves); + req->set_allocated(alloc_succeeds); + return alloc_succeeds; +} + +template +bool WriteLog::has_sync_point_logs(GenericLogOperations &ops) { + for (auto &op : ops) { + if (op->get_log_entry()->is_sync_point()) { + return true; + break; + } + } + return false; +} + +template +void WriteLog::enlist_op_appender() { + this->m_async_append_ops++; + this->m_async_op_tracker.start_op(); + Context *append_ctx = new LambdaContext([this](int r) { + append_scheduled_ops(); + }); + this->m_work_queue.queue(append_ctx); +} + +/* + * Takes custody of ops. They'll all get their log entries appended, + * and have their on_write_persist contexts completed once they and + * all prior log entries are persisted everywhere. + */ +template +void WriteLog::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) { + bool need_finisher = false; + GenericLogOperationsVector appending; + + std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); + { + std::lock_guard locker(m_lock); + + bool persist_on_flush = this->get_persist_on_flush(); + need_finisher = !this->m_appending && + ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) || + !persist_on_flush); + + // Only flush logs into SSD when there is internal/external flush request + if (!need_finisher) { + need_finisher = has_sync_point_logs(ops); + } + this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops); + + // To preserve the order of overlapping IOs, release_cell() may be + // called only after the ops are added to m_ops_to_append. + // As soon as m_lock is released, the appended ops can be picked up + // by append_scheduled_ops() in another thread and req can be freed. + if (req != nullptr) { + if (persist_on_flush) { + req->complete_user_request(0); + } + req->release_cell(); + } + } + + if (need_finisher) { + this->enlist_op_appender(); + } + + for (auto &op : appending) { + op->appending(); + } +} + +template +void WriteLog::setup_schedule_append(pwl::GenericLogOperationsVector &ops, + bool do_early_flush, + C_BlockIORequestT *req) { + this->schedule_append(ops, req); +} + +template +void WriteLog::append_scheduled_ops(void) { + GenericLogOperations ops; + ldout(m_image_ctx.cct, 20) << dendl; + + bool ops_remain = false; // unused, no-op variable for SSD + bool appending = false; // unused, no-op variable for SSD + this->append_scheduled(ops, ops_remain, appending); + + if (ops.size()) { + alloc_op_log_entries(ops); + append_op_log_entries(ops); + } else { + this->m_async_append_ops--; + this->m_async_op_tracker.finish_op(); + } +} + +/* + * Write and persist the (already allocated) write log entries and + * data buffer allocations for a set of ops. The data buffer for each + * of these must already have been persisted to its reserved area. + */ +template +void WriteLog::append_op_log_entries(GenericLogOperations &ops) { + ceph_assert(!ops.empty()); + ldout(m_image_ctx.cct, 20) << dendl; + Context *ctx = new LambdaContext([this, ops](int r) { + assert(r == 0); + ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl; + + auto captured_ops = std::move(ops); + this->complete_op_log_entries(std::move(captured_ops), r); + + bool need_finisher = false; + { + std::lock_guard locker1(m_lock); + bool persist_on_flush = this->get_persist_on_flush(); + need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) || + !persist_on_flush); + + if (!need_finisher) { + need_finisher = has_sync_point_logs(this->m_ops_to_append); + } + } + + if (need_finisher) { + this->enlist_op_appender(); + } + this->m_async_update_superblock--; + this->m_async_op_tracker.finish_op(); + }); + uint64_t *new_first_free_entry = new(uint64_t); + Context *append_ctx = new LambdaContext( + [this, new_first_free_entry, ops, ctx](int r) { + std::shared_ptr new_root; + { + ldout(m_image_ctx.cct, 20) << "Finished appending at " + << *new_first_free_entry << dendl; + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + operation->log_append_comp_time = now; + } + + std::lock_guard locker(this->m_log_append_lock); + std::lock_guard locker1(m_lock); + assert(this->m_appending); + this->m_appending = false; + new_root = std::make_shared(pool_root); + pool_root.first_free_entry = *new_first_free_entry; + new_root->first_free_entry = *new_first_free_entry; + delete new_first_free_entry; + schedule_update_root(new_root, ctx); + } + this->m_async_append_ops--; + this->m_async_op_tracker.finish_op(); + }); + // Append logs and update first_free_update + append_ops(ops, append_ctx, new_first_free_entry); + + if (ops.size()) { + this->dispatch_deferred_writes(); + } +} + +template +void WriteLog::release_ram(std::shared_ptr log_entry) { + log_entry->remove_cache_bl(); +} + +template +void WriteLog::alloc_op_log_entries(GenericLogOperations &ops) { + std::unique_lock locker(m_lock); + + for (auto &operation : ops) { + auto &log_entry = operation->get_log_entry(); + log_entry->ram_entry.set_entry_valid(true); + m_log_entries.push_back(log_entry); + ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; + } + if (m_cache_state->empty && !m_log_entries.empty()) { + m_cache_state->empty = false; + this->update_image_cache_state(); + this->write_image_cache_state(locker); + } +} + +template +void WriteLog::construct_flush_entries(pwl::GenericLogEntries entries_to_flush, + DeferredContexts &post_unlock, + bool has_write_entry) { + // snapshot so we behave consistently + bool invalidating = this->m_invalidating; + + if (invalidating || !has_write_entry) { + for (auto &log_entry : entries_to_flush) { + GuardedRequestFunctionContext *guarded_ctx = + new GuardedRequestFunctionContext([this, log_entry, invalidating] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, invalidating); + + if (!invalidating) { + ctx = new LambdaContext([this, log_entry, ctx](int r) { + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + }), 0); + }); + } + ctx->complete(0); + }); + this->detain_flush_guard_request(log_entry, guarded_ctx); + } + } else { + int count = entries_to_flush.size(); + std::vector> write_entries; + std::vector read_bls; + + write_entries.reserve(count); + read_bls.reserve(count); + + for (auto &log_entry : entries_to_flush) { + if (log_entry->is_write_entry()) { + bufferlist *bl = new bufferlist; + auto write_entry = static_pointer_cast(log_entry); + write_entry->inc_bl_refs(); + write_entries.push_back(write_entry); + read_bls.push_back(bl); + } + } + + Context *ctx = new LambdaContext( + [this, entries_to_flush, read_bls](int r) { + int i = 0; + GuardedRequestFunctionContext *guarded_ctx = nullptr; + + for (auto &log_entry : entries_to_flush) { + if (log_entry->is_write_entry()) { + bufferlist captured_entry_bl; + captured_entry_bl.claim_append(*read_bls[i]); + delete read_bls[i++]; + + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) { + auto captured_entry_bl = std::move(entry_bl); + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback_bl(this->m_image_writeback, ctx, + std::move(captured_entry_bl)); + }), 0); + }); + } else { + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + }), 0); + }); + } + this->detain_flush_guard_request(log_entry, guarded_ctx); + } + }); + + aio_read_data_blocks(write_entries, read_bls, ctx); + } +} + +template +void WriteLog::process_work() { + CephContext *cct = m_image_ctx.cct; + int max_iterations = 4; + bool wake_up_requested = false; + uint64_t aggressive_high_water_bytes = + this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER; + + ldout(cct, 20) << dendl; + + do { + { + std::lock_guard locker(m_lock); + this->m_wake_up_requested = false; + } + if (this->m_alloc_failed_since_retire || (this->m_shutting_down) || + this->m_invalidating || m_bytes_allocated > high_water_bytes) { + ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire + << ", allocated > high_water=" + << (m_bytes_allocated > high_water_bytes) + << dendl; + retire_entries((this->m_shutting_down || this->m_invalidating || + m_bytes_allocated > aggressive_high_water_bytes) + ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION); + } + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + { + std::lock_guard locker(m_lock); + wake_up_requested = this->m_wake_up_requested; + } + } while (wake_up_requested && --max_iterations > 0); + + { + std::lock_guard locker(m_lock); + this->m_wake_up_scheduled = false; + // Reschedule if it's still requested + if (this->m_wake_up_requested) { + this->wake_up(); + } + } +} + +/** + * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries + * that are eligible to be retired. Returns true if anything was + * retired. + * +*/ +template +bool WriteLog::retire_entries(const unsigned long int frees_per_tx) { + CephContext *cct = m_image_ctx.cct; + GenericLogEntriesVector retiring_entries; + uint64_t initial_first_valid_entry; + uint64_t first_valid_entry; + + std::lock_guard retire_locker(this->m_log_retire_lock); + ldout(cct, 20) << "Look for entries to retire" << dendl; + { + // Entry readers can't be added while we hold m_entry_reader_lock + RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock); + std::lock_guard locker(m_lock); + initial_first_valid_entry = m_first_valid_entry; + first_valid_entry = m_first_valid_entry; + while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) { + GenericLogEntriesVector retiring_subentries; + uint64_t control_block_pos = m_log_entries.front()->log_entry_index; + uint64_t data_length = 0; + for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) { + if (this->can_retire_entry(*it)) { + // log_entry_index is valid after appending to SSD + if ((*it)->log_entry_index != control_block_pos) { + ldout(cct, 20) << "Old log_entry_index is " << control_block_pos + << ",New log_entry_index is " + << (*it)->log_entry_index + << ",data length is " << data_length << dendl; + ldout(cct, 20) << "The log entry is " << *(*it) << dendl; + if ((*it)->log_entry_index < control_block_pos) { + ceph_assert((*it)->log_entry_index == + (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) % + this->m_log_pool_size + DATA_RING_BUFFER_OFFSET); + } else { + ceph_assert((*it)->log_entry_index == control_block_pos + + data_length + MIN_WRITE_ALLOC_SSD_SIZE); + } + break; + } else { + retiring_subentries.push_back(*it); + if ((*it)->is_write_entry()) { + data_length += (*it)->get_aligned_data_size(); + } + } + } else { + retiring_subentries.clear(); + break; + } + } + // SSD: retiring_subentries in a span + if (!retiring_subentries.empty()) { + for (auto it = retiring_subentries.begin(); + it != retiring_subentries.end(); it++) { + ceph_assert(m_log_entries.front() == *it); + m_log_entries.pop_front(); + if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) { + auto gen_write_entry = static_pointer_cast(*it); + if (gen_write_entry) { + this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry); + } + } + } + + ldout(cct, 20) << "span with " << retiring_subentries.size() + << " entries: control_block_pos=" << control_block_pos + << " data_length=" << data_length + << dendl; + retiring_entries.insert( + retiring_entries.end(), retiring_subentries.begin(), + retiring_subentries.end()); + + first_valid_entry = control_block_pos + data_length + + MIN_WRITE_ALLOC_SSD_SIZE; + if (first_valid_entry >= this->m_log_pool_size) { + first_valid_entry = first_valid_entry % this->m_log_pool_size + + DATA_RING_BUFFER_OFFSET; + } + } else { + break; + } + } + } + if (retiring_entries.size()) { + ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" + << dendl; + + // Advance first valid entry and release buffers + uint64_t flushed_sync_gen; + std::lock_guard append_locker(this->m_log_append_lock); + { + std::lock_guard locker(m_lock); + flushed_sync_gen = this->m_flushed_sync_gen; + } + + ceph_assert(first_valid_entry != initial_first_valid_entry); + auto new_root = std::make_shared(pool_root); + new_root->flushed_sync_gen = flushed_sync_gen; + new_root->first_valid_entry = first_valid_entry; + pool_root.flushed_sync_gen = flushed_sync_gen; + pool_root.first_valid_entry = first_valid_entry; + + Context *ctx = new LambdaContext( + [this, first_valid_entry, initial_first_valid_entry, + retiring_entries](int r) { + uint64_t allocated_bytes = 0; + uint64_t cached_bytes = 0; + uint64_t former_log_pos = 0; + for (auto &entry : retiring_entries) { + ceph_assert(entry->log_entry_index != 0); + if (entry->log_entry_index != former_log_pos ) { + // Space for control blocks + allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE; + former_log_pos = entry->log_entry_index; + } + if (entry->is_write_entry()) { + cached_bytes += entry->write_bytes(); + // space for userdata + allocated_bytes += entry->get_aligned_data_size(); + } + } + bool need_update_state = false; + { + std::lock_guard locker(m_lock); + m_first_valid_entry = first_valid_entry; + ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0); + ceph_assert(this->m_bytes_allocated >= allocated_bytes); + this->m_bytes_allocated -= allocated_bytes; + ceph_assert(this->m_bytes_cached >= cached_bytes); + this->m_bytes_cached -= cached_bytes; + if (!m_cache_state->empty && m_log_entries.empty()) { + m_cache_state->empty = true; + this->update_image_cache_state(); + need_update_state = true; + } + + ldout(m_image_ctx.cct, 20) + << "Finished root update: initial_first_valid_entry=" + << initial_first_valid_entry << ", m_first_valid_entry=" + << m_first_valid_entry << ", release space = " + << allocated_bytes << ", m_bytes_allocated=" + << m_bytes_allocated << ", release cached space=" + << cached_bytes << ", m_bytes_cached=" + << this->m_bytes_cached << dendl; + + this->m_alloc_failed_since_retire = false; + this->wake_up(); + } + if (need_update_state) { + std::unique_lock locker(m_lock); + this->write_image_cache_state(locker); + } + + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + m_async_update_superblock--; + this->m_async_op_tracker.finish_op(); + }); + + std::lock_guard locker(m_lock); + schedule_update_root(new_root, ctx); + } else { + ldout(cct, 20) << "Nothing to retire" << dendl; + return false; + } + return true; +} + +template +void WriteLog::append_ops(GenericLogOperations &ops, Context *ctx, + uint64_t* new_first_free_entry) { + GenericLogEntriesVector log_entries; + CephContext *cct = m_image_ctx.cct; + uint64_t span_payload_len = 0; + uint64_t bytes_to_free = 0; + ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl; + + *new_first_free_entry = pool_root.first_free_entry; + AioTransContext* aio = new AioTransContext(cct, ctx); + + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + operation->log_append_start_time = now; + auto log_entry = operation->get_log_entry(); + + if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES || + span_payload_len >= SPAN_MAX_DATA_LEN) { + if (log_entries.size() > 1) { + bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE; + } + write_log_entries(log_entries, aio, new_first_free_entry); + log_entries.clear(); + span_payload_len = 0; + } + log_entries.push_back(log_entry); + span_payload_len += log_entry->write_bytes(); + } + if (!span_payload_len || !log_entries.empty()) { + if (log_entries.size() > 1) { + bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE; + } + write_log_entries(log_entries, aio, new_first_free_entry); + } + + { + std::lock_guard locker1(m_lock); + m_first_free_entry = *new_first_free_entry; + m_bytes_allocated -= bytes_to_free; + } + + bdev->aio_submit(&aio->ioc); +} + +template +void WriteLog::write_log_entries(GenericLogEntriesVector log_entries, + AioTransContext *aio, uint64_t *pos) { + CephContext *cct = m_image_ctx.cct; + ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl; + ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET && + *pos < this->m_log_pool_size && + *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0); + + // The first block is for log entries + uint64_t control_block_pos = *pos; + *pos += MIN_WRITE_ALLOC_SSD_SIZE; + if (*pos == this->m_log_pool_size) { + *pos = DATA_RING_BUFFER_OFFSET; + } + + std::vector persist_log_entries; + bufferlist data_bl; + for (auto &log_entry : log_entries) { + log_entry->log_entry_index = control_block_pos; + // Append data buffer for write operations + if (log_entry->is_write_entry()) { + auto write_entry = static_pointer_cast(log_entry); + auto cache_bl = write_entry->get_cache_bl(); + auto align_size = write_entry->get_aligned_data_size(); + data_bl.append(cache_bl); + data_bl.append_zero(align_size - cache_bl.length()); + + write_entry->ram_entry.write_data_pos = *pos; + *pos += align_size; + if (*pos >= this->m_log_pool_size) { + *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET; + } + } + // push_back _after_ setting write_data_pos + persist_log_entries.push_back(log_entry->ram_entry); + } + + //aio write + bufferlist bl; + encode(persist_log_entries, bl); + ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + bl.append(data_bl); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + if (control_block_pos + bl.length() > this->m_log_pool_size) { + //exceeds border, need to split + uint64_t size = bl.length(); + bufferlist bl1; + bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1); + ceph_assert(bl.length() == (size - bl1.length())); + + ldout(cct, 20) << "write " << control_block_pos << "~" + << size << " spans boundary, split into " + << control_block_pos << "~" << bl1.length() + << " and " << DATA_RING_BUFFER_OFFSET << "~" + << bl.length() << dendl; + bdev->aio_write(control_block_pos, bl1, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + } else { + ldout(cct, 20) << "write " << control_block_pos << "~" + << bl.length() << dendl; + bdev->aio_write(control_block_pos, bl, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + } +} + +template +void WriteLog::schedule_update_root( + std::shared_ptr root, Context *ctx) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 15) << "New root: pool_size=" << root->pool_size + << " first_valid_entry=" << root->first_valid_entry + << " first_free_entry=" << root->first_free_entry + << " flushed_sync_gen=" << root->flushed_sync_gen + << dendl; + ceph_assert(is_valid_pool_root(*root)); + + bool need_finisher; + { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root; + std::shared_ptr entry = + std::make_shared(root, ctx); + this->m_async_update_superblock++; + this->m_async_op_tracker.start_op(); + m_poolroot_to_update.emplace_back(entry); + } + if (need_finisher) { + enlist_op_update_root(); + } +} + +template +void WriteLog::enlist_op_update_root() { + Context *append_ctx = new LambdaContext([this](int r) { + update_root_scheduled_ops(); + }); + this->m_work_queue.queue(append_ctx); +} + +template +void WriteLog::update_root_scheduled_ops() { + ldout(m_image_ctx.cct, 20) << dendl; + + std::shared_ptr root; + WriteLogPoolRootUpdateList root_updates; + Context *ctx = nullptr; + { + std::lock_guard locker(m_lock); + if (m_updating_pool_root) { + /* Another thread is appending */ + ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root" + << dendl; + return; + } + if (m_poolroot_to_update.size()) { + m_updating_pool_root = true; + root_updates.swap(m_poolroot_to_update); + } + } + ceph_assert(!root_updates.empty()); + ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size() + << dendl; + // We just update the last one, and call all the completions. + auto entry = root_updates.back(); + root = entry->root; + + ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) { + ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl; + for (auto it = updates.begin(); it != updates.end(); it++) { + Context *it_ctx = (*it)->ctx; + it_ctx->complete(r); + } + }); + Context *append_ctx = new LambdaContext([this, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl; + bool need_finisher = false; + assert(r == 0); + { + std::lock_guard locker(m_lock); + m_updating_pool_root = false; + need_finisher = !m_poolroot_to_update.empty(); + } + if (need_finisher) { + enlist_op_update_root(); + } + ctx->complete(r); + }); + AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx); + update_pool_root(root, aio); +} + +template +void WriteLog::update_pool_root(std::shared_ptr root, + AioTransContext *aio) { + bufferlist bl; + SuperBlock superblock; + superblock.root = *root; + encode(superblock, bl); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET); + bdev->aio_submit(&aio->ioc); +} + +template +int WriteLog::update_pool_root_sync( + std::shared_ptr root) { + bufferlist bl; + SuperBlock superblock; + superblock.root = *root; + encode(superblock, bl); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + return bdev->write(0, bl, false); +} + +template +void WriteLog::aio_read_data_block(std::shared_ptr log_entry, + bufferlist *bl, Context *ctx) { + std::vector> log_entries = {std::move(log_entry)}; + std::vector bls {bl}; + aio_read_data_blocks(log_entries, bls, ctx); +} + +template +void WriteLog::aio_read_data_blocks( + std::vector> &log_entries, + std::vector &bls, Context *ctx) { + ceph_assert(log_entries.size() == bls.size()); + + //get the valid part + Context *read_ctx = new LambdaContext( + [log_entries, bls, ctx](int r) { + for (unsigned int i = 0; i < log_entries.size(); i++) { + bufferlist valid_data_bl; + auto write_entry = static_pointer_cast(log_entries[i]); + auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes + : write_entry->ram_entry.ws_datalen; + + valid_data_bl.substr_of(*bls[i], 0, length); + bls[i]->clear(); + bls[i]->append(valid_data_bl); + write_entry->dec_bl_refs(); + } + ctx->complete(r); + }); + + CephContext *cct = m_image_ctx.cct; + AioTransContext *aio = new AioTransContext(cct, read_ctx); + for (unsigned int i = 0; i < log_entries.size(); i++) { + WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry; + + ceph_assert(log_entry->is_write() || log_entry->is_writesame()); + uint64_t len = log_entry->is_write() ? log_entry->write_bytes : + log_entry->ws_datalen; + uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE); + + ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos + << "~" << len << dendl; + ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET && + log_entry->write_data_pos < pool_root.pool_size); + ceph_assert(align_len); + if (log_entry->write_data_pos + align_len > pool_root.pool_size) { + // spans boundary, need to split + uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos; + uint64_t len2 = align_len - len1; + + ldout(cct, 20) << "read " << log_entry->write_data_pos << "~" + << align_len << " spans boundary, split into " + << log_entry->write_data_pos << "~" << len1 + << " and " << DATA_RING_BUFFER_OFFSET << "~" + << len2 << dendl; + bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc); + bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc); + } else { + ldout(cct, 20) << "read " << log_entry->write_data_pos << "~" + << align_len << dendl; + bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc); + } + } + bdev->aio_submit(&aio->ioc); +} + +template +void WriteLog::complete_user_request(Context *&user_req, int r) { + m_image_ctx.op_work_queue->queue(user_req, r); +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +template class librbd::cache::pwl::ssd::WriteLog; -- cgit v1.2.3