// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "AbstractWriteLog.h" #include "include/buffer.h" #include "include/Context.h" #include "include/ceph_assert.h" #include "common/deleter.h" #include "common/dout.h" #include "common/environment.h" #include "common/errno.h" #include "common/hostname.h" #include "common/WorkQueue.h" #include "common/Timer.h" #include "common/perf_counters.h" #include "librbd/ImageCtx.h" #include "librbd/asio/ContextWQ.h" #include "librbd/cache/pwl/ImageCacheState.h" #include "librbd/cache/pwl/LogEntry.h" #include "librbd/plugin/Api.h" #include #include #undef dout_subsys #define dout_subsys ceph_subsys_rbd_pwl #undef dout_prefix #define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \ << " " << __func__ << ": " namespace librbd { namespace cache { namespace pwl { using namespace std; using namespace librbd::cache::pwl; typedef AbstractWriteLog::Extent Extent; typedef AbstractWriteLog::Extents Extents; template AbstractWriteLog::AbstractWriteLog( I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state, Builder *builder, cache::ImageWritebackInterface& image_writeback, plugin::Api& plugin_api) : m_builder(builder), m_write_log_guard(image_ctx.cct), m_flush_guard(image_ctx.cct), m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))), m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), m_thread_pool( image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl", 4, ""), m_cache_state(cache_state), m_image_ctx(image_ctx), m_log_pool_size(DEFAULT_POOL_SIZE), m_image_writeback(image_writeback), m_plugin_api(plugin_api), 
m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))), m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"), m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))), m_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_lock", this))), m_blocks_to_log_entries(image_ctx.cct), m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue", ceph::make_timespan( image_ctx.config.template get_val( "rbd_op_thread_timeout")), &m_thread_pool) { CephContext *cct = m_image_ctx.cct; m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock); } template AbstractWriteLog::~AbstractWriteLog() { ldout(m_image_ctx.cct, 15) << "enter" << dendl; { std::lock_guard timer_locker(*m_timer_lock); std::lock_guard locker(m_lock); m_timer->cancel_event(m_timer_ctx); m_thread_pool.stop(); ceph_assert(m_deferred_ios.size() == 0); ceph_assert(m_ops_to_flush.size() == 0); ceph_assert(m_ops_to_append.size() == 0); ceph_assert(m_flush_ops_in_flight == 0); delete m_cache_state; m_cache_state = nullptr; } ldout(m_image_ctx.cct, 15) << "exit" << dendl; } template void AbstractWriteLog::perf_start(std::string name) { PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first, l_librbd_pwl_last); // Latency axis configuration for op histograms, values are in nanoseconds PerfHistogramCommon::axis_config_d op_hist_x_axis_config{ "Latency (nsec)", PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale 0, ///< Start at 0 5000, ///< Quantization unit is 5usec 16, ///< Ranges into the mS }; // Syncpoint logentry number x-axis configuration for op histograms PerfHistogramCommon::axis_config_d sp_logentry_number_config{ "logentry number", PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale 0, // Start at 0 1, // Quantization unit is 1 260, // Up to 260 > 
(MAX_WRITES_PER_SYNC_POINT) }; // Syncpoint bytes number y-axis configuration for op histogram PerfHistogramCommon::axis_config_d sp_bytes_number_config{ "Number of SyncPoint", PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale 0, // Start at 0 512, // Quantization unit is 512 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT }; // Op size axis configuration for op histogram y axis, values are in bytes PerfHistogramCommon::axis_config_d op_hist_y_axis_config{ "Request size (bytes)", PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale 0, ///< Start at 0 512, ///< Quantization unit is 512 bytes 16, ///< Writes up to >32k }; // Num items configuration for op histogram y axis, values are in items PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{ "Number of items", PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale 0, ///< Start at 0 1, ///< Quantization unit is 1 32, ///< Writes up to >32k }; plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads"); plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads"); plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads"); plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL"); plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL"); plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits"); plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL"); plb.add_u64_counter_histogram( l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram", sp_logentry_number_config, sp_bytes_number_config, "Histogram of syncpoint's logentry numbers vs bytes number"); plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes"); plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes"); plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources"); 
plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes"); plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries"); plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers"); plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes"); plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)"); plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends"); plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes"); plb.add_time_avg( l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t", "Average arrival to allocation time (time deferred for overlap)"); plb.add_time_avg( l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t", "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); plb.add_time_avg( l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t", "Average allocation to dispatch time (time deferred for log resources)"); plb.add_time_avg( l_librbd_pwl_wr_latency, "wr_latency", "Latency of writes (persistent completion)"); plb.add_u64_counter_histogram( l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write request latency (nanoseconds) vs. 
bytes written"); plb.add_time_avg( l_librbd_pwl_wr_caller_latency, "caller_wr_latency", "Latency of write completion to caller"); plb.add_time_avg( l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t", "Average arrival to allocation time (time deferred for overlap)"); plb.add_time_avg( l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t", "Average arrival to dispatch time (includes time deferred for overlaps and allocation)"); plb.add_time_avg( l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t", "Average allocation to dispatch time (time deferred for log resources)"); plb.add_time_avg( l_librbd_pwl_nowait_wr_latency, "wr_latency_nw", "Latency of writes (persistent completion) not deferred for free space"); plb.add_u64_counter_histogram( l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space"); plb.add_time_avg( l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw", "Latency of write completion to callerfor writes not deferred for free space"); plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time"); plb.add_u64_counter_histogram( l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. 
bytes written"); plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time"); plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time"); plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time"); plb.add_u64_counter_histogram( l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written"); plb.add_time_avg( l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t", "Average buffer persist to log append time (write data persist/replicate + wait for append time)"); plb.add_time_avg( l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t", "Average buffer persist time (write data persist/replicate time)"); plb.add_u64_counter_histogram( l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of write buffer persist time (nanoseconds) vs. bytes written"); plb.add_time_avg( l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t", "Average log append to persist complete time (log entry append/replicate + wait for complete time)"); plb.add_time_avg( l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t", "Average log append to persist complete time (log entry append/replicate time)"); plb.add_u64_counter_histogram( l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram", op_hist_x_axis_config, op_hist_y_axis_config, "Histogram of log append persist time (nanoseconds) (vs. 
op bytes)"); plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards"); plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded"); plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency"); plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)"); plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources"); plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency"); plb.add_u64_counter(l_librbd_pwl_ws,"ws", "Write Sames"); plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image"); plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency"); plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests"); plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written"); plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latecy"); plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails"); plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)"); plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency"); plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL"); plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL"); plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency"); plb.add_u64_counter_histogram( l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram", op_hist_x_axis_config, op_hist_y_axis_count_config, "Histogram of log append transaction time (nanoseconds) vs. 
entries appended"); plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency"); plb.add_u64_counter_histogram( l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram", op_hist_x_axis_config, op_hist_y_axis_count_config, "Histogram of log retire transaction time (nanoseconds) vs. entries retired"); m_perfcounter = plb.create_perf_counters(); m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter); } template void AbstractWriteLog::perf_stop() { ceph_assert(m_perfcounter); m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter); delete m_perfcounter; } template void AbstractWriteLog::log_perf() { bufferlist bl; Formatter *f = Formatter::create("json-pretty"); bl.append("Perf dump follows\n--- Begin perf dump ---\n"); bl.append("{\n"); stringstream ss; utime_t now = ceph_clock_now(); ss << "\"test_time\": \"" << now << "\","; ss << "\"image\": \"" << m_image_ctx.name << "\","; bl.append(ss); bl.append("\"stats\": "); m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, false, false); f->flush(bl); bl.append(",\n\"histograms\": "); m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0); f->flush(bl); delete f; bl.append("}\n--- End perf dump ---\n"); bl.append('\0'); ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl; } template void AbstractWriteLog::periodic_stats() { std::unique_lock locker(m_lock); ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size() << ", m_dirty_log_entries=" << m_dirty_log_entries.size() << ", m_free_log_entries=" << m_free_log_entries << ", m_bytes_allocated=" << m_bytes_allocated << ", m_bytes_cached=" << m_bytes_cached << ", m_bytes_dirty=" << m_bytes_dirty << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated << ", m_first_valid_entry=" << m_first_valid_entry << ", m_first_free_entry=" << m_first_free_entry << ", m_current_sync_gen=" << m_current_sync_gen << ", m_flushed_sync_gen=" << m_flushed_sync_gen << 
dendl; update_image_cache_state(); write_image_cache_state(locker); } template void AbstractWriteLog::arm_periodic_stats() { ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); m_timer_ctx = new LambdaContext([this](int r) { /* m_timer_lock is held */ periodic_stats(); arm_periodic_stats(); }); m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx); } template void AbstractWriteLog::update_entries(std::shared_ptr *log_entry, WriteLogCacheEntry *cache_entry, std::map &missing_sync_points, std::map> &sync_point_entries, uint64_t entry_index) { bool writer = cache_entry->is_writer(); if (cache_entry->is_sync_point()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl; auto sync_point_entry = std::make_shared(cache_entry->sync_gen_number); *log_entry = sync_point_entry; sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry; missing_sync_points.erase(cache_entry->sync_gen_number); m_current_sync_gen = cache_entry->sync_gen_number; } else if (cache_entry->is_write()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " is a write. cache_entry=[" << *cache_entry << "]" << dendl; auto write_entry = m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes); write_data_to_buffer(write_entry, cache_entry); *log_entry = write_entry; } else if (cache_entry->is_writesame()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl; auto ws_entry = m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes, cache_entry->ws_datalen); write_data_to_buffer(ws_entry, cache_entry); *log_entry = ws_entry; } else if (cache_entry->is_discard()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " is a discard. 
cache_entry=[" << *cache_entry << "]" << dendl; auto discard_entry = std::make_shared(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes, m_discard_granularity_bytes); *log_entry = discard_entry; } else { lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index << ", cache_entry=[" << *cache_entry << "]" << dendl; } if (writer) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " writes. cache_entry=[" << *cache_entry << "]" << dendl; if (!sync_point_entries[cache_entry->sync_gen_number]) { missing_sync_points[cache_entry->sync_gen_number] = true; } } } template void AbstractWriteLog::update_sync_points(std::map &missing_sync_points, std::map> &sync_point_entries, DeferredContexts &later) { /* Create missing sync points. These must not be appended until the * entry reload is complete and the write map is up to * date. Currently this is handled by the deferred contexts object * passed to new_sync_point(). These contexts won't be completed * until this function returns. */ for (auto &kv : missing_sync_points) { ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl; if (0 == m_current_sync_gen) { /* The unlikely case where the log contains writing entries, but no sync * points (e.g. because they were all retired) */ m_current_sync_gen = kv.first-1; } ceph_assert(kv.first == m_current_sync_gen+1); init_flush_new_sync_point(later); ceph_assert(kv.first == m_current_sync_gen); sync_point_entries[kv.first] = m_current_sync_point->log_entry; } /* * Iterate over the log entries again (this time via the global * entries list), connecting write entries to their sync points and * updating the sync point stats. * * Add writes to the write log map. 
*/ std::shared_ptr previous_sync_point_entry = nullptr; for (auto &log_entry : m_log_entries) { if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) { /* This entry is one of the types that write */ auto gen_write_entry = static_pointer_cast(log_entry); if (gen_write_entry) { auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number]; if (!sync_point_entry) { lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl; ceph_assert(false); } else { gen_write_entry->sync_point_entry = sync_point_entry; sync_point_entry->writes++; sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes; sync_point_entry->writes_completed++; m_blocks_to_log_entries.add_log_entry(gen_write_entry); /* This entry is only dirty if its sync gen number is > the flushed * sync gen number from the root object. */ if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { m_dirty_log_entries.push_back(log_entry); m_bytes_dirty += gen_write_entry->bytes_dirty(); } else { gen_write_entry->set_flushed(true); sync_point_entry->writes_flushed++; } /* calc m_bytes_allocated & m_bytes_cached */ inc_allocated_cached_bytes(log_entry); } } } else { /* This entry is sync point entry */ auto sync_point_entry = static_pointer_cast(log_entry); if (sync_point_entry) { if (previous_sync_point_entry) { previous_sync_point_entry->next_sync_point_entry = sync_point_entry; if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { sync_point_entry->prior_sync_point_flushed = false; ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed || (0 == previous_sync_point_entry->writes) || (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed)); } else { sync_point_entry->prior_sync_point_flushed = true; ceph_assert(previous_sync_point_entry->prior_sync_point_flushed); ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed); } } 
else { /* There are no previous sync points, so we'll consider them flushed */ sync_point_entry->prior_sync_point_flushed = true; } previous_sync_point_entry = sync_point_entry; ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl; } } } if (0 == m_current_sync_gen) { /* If a re-opened log was completely flushed, we'll have found no sync point entries here, * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync * point recorded in the log. */ m_current_sync_gen = m_flushed_sync_gen; } } template void AbstractWriteLog::pwl_init(Context *on_finish, DeferredContexts &later) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; ceph_assert(m_cache_state); std::lock_guard locker(m_lock); ceph_assert(!m_initialized); ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; if (!m_cache_state->present) { m_cache_state->host = ceph_get_short_hostname(); m_cache_state->size = m_image_ctx.config.template get_val( "rbd_persistent_cache_size"); string path = m_image_ctx.config.template get_val( "rbd_persistent_cache_path"); std::string pool_name = m_image_ctx.md_ctx.get_pool_name(); m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool"; } ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl; ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl; m_log_pool_name = m_cache_state->path; m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE); m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN); ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size << " (adjusted from " << m_cache_state->size << ")" << dendl; if ((!m_cache_state->present) && (access(m_log_pool_name.c_str(), F_OK) == 0)) { ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name << ", While there's no cache in the image metatata." 
<< dendl; if (remove(m_log_pool_name.c_str()) != 0) { lderr(cct) << "failed to remove the pool file " << m_log_pool_name << dendl; on_finish->complete(-errno); return; } else { ldout(cct, 5) << "Removed the existing pool file." << dendl; } } else if ((m_cache_state->present) && (access(m_log_pool_name.c_str(), F_OK) != 0)) { lderr(cct) << "can't find the existed pool file: " << m_log_pool_name << ". error: " << cpp_strerror(-errno) << dendl; on_finish->complete(-errno); return; } bool succeeded = initialize_pool(on_finish, later); if (!succeeded) { return ; } ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries << " log entries, " << m_free_log_entries << " of which are free." << " first_valid=" << m_first_valid_entry << ", first_free=" << m_first_free_entry << ", flushed_sync_gen=" << m_flushed_sync_gen << ", m_current_sync_gen=" << m_current_sync_gen << dendl; if (m_first_free_entry == m_first_valid_entry) { ldout(cct,1) << "write log is empty" << dendl; m_cache_state->empty = true; } /* Start the sync point following the last one seen in the * log. Flush the last sync point created during the loading of the * existing log entries. 
*/ init_flush_new_sync_point(later); ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl; m_initialized = true; // Start the thread m_thread_pool.start(); /* Do these after we drop lock */ later.add(new LambdaContext([this](int r) { /* Log stats for the first time */ periodic_stats(); /* Arm periodic stats logging for the first time */ std::lock_guard timer_locker(*m_timer_lock); arm_periodic_stats(); })); m_image_ctx.op_work_queue->queue(on_finish, 0); } template void AbstractWriteLog::write_image_cache_state(std::unique_lock& locker) { using klass = AbstractWriteLog; Context *ctx = util::create_context_callback< klass, &klass::handle_write_image_cache_state>(this); m_cache_state->write_image_cache_state(locker, ctx); } template void AbstractWriteLog::update_image_cache_state() { ldout(m_image_ctx.cct, 10) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); m_cache_state->allocated_bytes = m_bytes_allocated; m_cache_state->cached_bytes = m_bytes_cached; m_cache_state->dirty_bytes = m_bytes_dirty; m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated; m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req); m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req); m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) - m_cache_state->hits_full - m_cache_state->hits_partial; m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes); m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) - m_cache_state->hit_bytes; } template void AbstractWriteLog::handle_write_image_cache_state(int r) { CephContext *cct = m_image_ctx.cct; ldout(cct, 10) << "r=" << r << dendl; if (r < 0) { lderr(cct) << "failed to update image cache state: " << cpp_strerror(r) << dendl; return; } } template void AbstractWriteLog::init(Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; auto pname = std::string("librbd-pwl-") + m_image_ctx.id + 
std::string("-") + m_image_ctx.md_ctx.get_pool_name() + std::string("-") + m_image_ctx.name; perf_start(pname); ceph_assert(!m_initialized); Context *ctx = new LambdaContext( [this, on_finish](int r) { if (r >= 0) { std::unique_lock locker(m_lock); update_image_cache_state(); m_cache_state->write_image_cache_state(locker, on_finish); } else { on_finish->complete(r); } }); DeferredContexts later; pwl_init(ctx, later); } template void AbstractWriteLog::shut_down(Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl; Context *ctx = new LambdaContext( [this, on_finish](int r) { if (m_perfcounter) { perf_stop(); } ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl; m_image_ctx.op_work_queue->queue(on_finish, r); }); ctx = new LambdaContext( [this, ctx](int r) { ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl; Context *next_ctx = override_ctx(r, ctx); periodic_stats(); std::unique_lock locker(m_lock); check_image_cache_state_clean(); m_wake_up_enabled = false; m_log_entries.clear(); m_cache_state->clean = true; m_cache_state->empty = true; remove_pool_file(); update_image_cache_state(); m_cache_state->write_image_cache_state(locker, next_ctx); }); ctx = new LambdaContext( [this, ctx](int r) { Context *next_ctx = override_ctx(r, ctx); ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl; // Wait for in progress IOs to complete next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx); m_async_op_tracker.wait_for_ops(next_ctx); }); ctx = new LambdaContext( [this, ctx](int r) { Context *next_ctx = override_ctx(r, ctx); { /* Sync with process_writeback_dirty_entries() */ RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock); m_shutting_down = true; /* Flush all writes to OSDs (unless disabled) and wait for all in-progress flush writes to complete */ ldout(m_image_ctx.cct, 6) << "flushing" << dendl; 
periodic_stats(); } flush_dirty_entries(next_ctx); }); ctx = new LambdaContext( [this, ctx](int r) { ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl; m_work_queue.queue(ctx, r); }); /* Complete all in-flight writes before shutting down */ ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl; internal_flush(false, ctx); } template void AbstractWriteLog::read(Extents&& image_extents, ceph::bufferlist* bl, int fadvise_flags, Context *on_finish) { CephContext *cct = m_image_ctx.cct; utime_t now = ceph_clock_now(); on_finish = new LambdaContext( [this, on_finish](int r) { m_async_op_tracker.finish_op(); on_finish->complete(r); }); C_ReadRequest *read_ctx = m_builder->create_read_request( cct, now, m_perfcounter, bl, on_finish); ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id << "image_extents=" << image_extents << ", bl=" << bl << ", on_finish=" << on_finish << dendl; ceph_assert(m_initialized); bl->clear(); m_perfcounter->inc(l_librbd_pwl_rd_req, 1); std::vector> log_entries_to_read; std::vector bls_to_read; m_async_op_tracker.start_op(); Context *ctx = new LambdaContext( [this, read_ctx, fadvise_flags](int r) { if (read_ctx->miss_extents.empty()) { /* All of this read comes from RWL */ read_ctx->complete(0); } else { /* Pass the read misses on to the layer below RWL */ m_image_writeback.aio_read( std::move(read_ctx->miss_extents), &read_ctx->miss_bl, fadvise_flags, read_ctx); } }); /* * The strategy here is to look up all the WriteLogMapEntries that overlap * this read, and iterate through those to separate this read into hits and * misses. A new Extents object is produced here with Extents for each miss * region. The miss Extents is then passed on to the read cache below RWL. We * also produce an ImageExtentBufs for all the extents (hit or miss) in this * read. 
When the read from the lower cache layer completes, we iterate * through the ImageExtentBufs and insert buffers for each cache hit at the * appropriate spot in the bufferlist returned from below for the miss * read. The buffers we insert here refer directly to regions of various * write log entry data buffers. * * Locking: These buffer objects hold a reference on the write log entries * they refer to. Log entries can't be retired until there are no references. * The GenericWriteLogEntry references are released by the buffer destructor. */ for (auto &extent : image_extents) { uint64_t extent_offset = 0; RWLock::RLocker entry_reader_locker(m_entry_reader_lock); WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries( block_extent(extent)); for (auto &map_entry : map_entries) { Extent entry_image_extent(pwl::image_extent(map_entry.block_extent)); /* If this map entry starts after the current image extent offset ... */ if (entry_image_extent.first > extent.first + extent_offset) { /* ... add range before map_entry to miss extents */ uint64_t miss_extent_start = extent.first + extent_offset; uint64_t miss_extent_length = entry_image_extent.first - miss_extent_start; Extent miss_extent(miss_extent_start, miss_extent_length); read_ctx->miss_extents.push_back(miss_extent); /* Add miss range to read extents */ auto miss_extent_buf = std::make_shared(miss_extent); read_ctx->read_extents.push_back(miss_extent_buf); extent_offset += miss_extent_length; } ceph_assert(entry_image_extent.first <= extent.first + extent_offset); uint64_t entry_offset = 0; /* If this map entry starts before the current image extent offset ... */ if (entry_image_extent.first < extent.first + extent_offset) { /* ... compute offset into log entry for this read extent */ entry_offset = (extent.first + extent_offset) - entry_image_extent.first; } /* This read hit ends at the end of the extent or the end of the log entry, whichever is less. 
*/ uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset, extent.second - extent_offset); Extent hit_extent(entry_image_extent.first, entry_hit_length); if (0 == map_entry.log_entry->write_bytes() && 0 < map_entry.log_entry->bytes_dirty()) { /* discard log entry */ ldout(cct, 20) << "discard log entry" << dendl; auto discard_entry = map_entry.log_entry; ldout(cct, 20) << "read hit on discard entry: log_entry=" << *discard_entry << dendl; /* Discards read as zero, so we'll construct a bufferlist of zeros */ bufferlist zero_bl; zero_bl.append_zero(entry_hit_length); /* Add hit extent to read extents */ auto hit_extent_buf = std::make_shared( hit_extent, zero_bl); read_ctx->read_extents.push_back(hit_extent_buf); } else { ldout(cct, 20) << "write or writesame log entry" << dendl; /* write and writesame log entry */ /* Offset of the map entry into the log entry's buffer */ uint64_t map_entry_buffer_offset = entry_image_extent.first - map_entry.log_entry->ram_entry.image_offset_bytes; /* Offset into the log entry buffer of this read hit */ uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset; /* Create buffer object referring to pmem pool for this read hit */ collect_read_extents( read_buffer_offset, map_entry, log_entries_to_read, bls_to_read, entry_hit_length, hit_extent, read_ctx); } /* Exclude RWL hit range from buffer and extent */ extent_offset += entry_hit_length; ldout(cct, 20) << map_entry << dendl; } /* If the last map entry didn't consume the entire image extent ... */ if (extent.second > extent_offset) { /* ... 
add the rest of this extent to miss extents */ uint64_t miss_extent_start = extent.first + extent_offset; uint64_t miss_extent_length = extent.second - extent_offset; Extent miss_extent(miss_extent_start, miss_extent_length); read_ctx->miss_extents.push_back(miss_extent); /* Add miss range to read extents */ auto miss_extent_buf = std::make_shared(miss_extent); read_ctx->read_extents.push_back(miss_extent_buf); extent_offset += miss_extent_length; } } ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents << ", miss_bl=" << read_ctx->miss_bl << dendl; complete_read(log_entries_to_read, bls_to_read, ctx); } template void AbstractWriteLog::write(Extents &&image_extents, bufferlist&& bl, int fadvise_flags, Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "aio_write" << dendl; utime_t now = ceph_clock_now(); m_perfcounter->inc(l_librbd_pwl_wr_req, 1); ceph_assert(m_initialized); /* Split image extents larger than 1M. This isn't strictly necessary but * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost. * We plan to manage pmem space and allocation by ourselves in the future. 
*/ Extents split_image_extents; uint64_t max_extent_size = get_max_extent(); if (max_extent_size != 0) { for (auto extent : image_extents) { if (extent.second > max_extent_size) { uint64_t off = extent.first; uint64_t extent_bytes = extent.second; for (int i = 0; extent_bytes != 0; ++i) { Extent _ext; _ext.first = off + i * max_extent_size; _ext.second = std::min(max_extent_size, extent_bytes); extent_bytes = extent_bytes - _ext.second ; split_image_extents.emplace_back(_ext); } } else { split_image_extents.emplace_back(extent); } } } else { split_image_extents = image_extents; } C_WriteRequestT *write_req = m_builder->create_write_request(*this, now, std::move(split_image_extents), std::move(bl), fadvise_flags, m_lock, m_perfcounter, on_finish); m_perfcounter->inc(l_librbd_pwl_wr_bytes, write_req->image_extents_summary.total_bytes); /* The lambda below will be called when the block guard for all * blocks affected by this write is obtained */ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext([this, write_req](GuardedRequestFunctionContext &guard_ctx) { write_req->blockguard_acquired(guard_ctx); alloc_and_dispatch_io_req(write_req); }); detain_guarded_request(write_req, guarded_ctx, false); } template void AbstractWriteLog::discard(uint64_t offset, uint64_t length, uint32_t discard_granularity_bytes, Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; utime_t now = ceph_clock_now(); m_perfcounter->inc(l_librbd_pwl_discard, 1); Extents discard_extents = {{offset, length}}; m_discard_granularity_bytes = discard_granularity_bytes; ceph_assert(m_initialized); auto *discard_req = new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes, m_lock, m_perfcounter, on_finish); /* The lambda below will be called when the block guard for all * blocks affected by this write is obtained */ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext([this, 
discard_req](GuardedRequestFunctionContext &guard_ctx) { discard_req->blockguard_acquired(guard_ctx); alloc_and_dispatch_io_req(discard_req); }); detain_guarded_request(discard_req, guarded_ctx, false); } /** * Aio_flush completes when all previously completed writes are * flushed to persistent cache. We make a best-effort attempt to also * defer until all in-progress writes complete, but we may not know * about all of the writes the application considers in-progress yet, * due to uncertainty in the IO submission workq (multiple WQ threads * may allow out-of-order submission). * * This flush operation will not wait for writes deferred for overlap * in the block guard. */ template void AbstractWriteLog::flush(io::FlushSource flush_source, Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl; if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source || io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) { internal_flush(false, on_finish); return; } m_perfcounter->inc(l_librbd_pwl_aio_flush, 1); /* May be called even if initialization fails */ if (!m_initialized) { ldout(cct, 05) << "never initialized" << dendl; /* Deadlock if completed here */ m_image_ctx.op_work_queue->queue(on_finish, 0); return; } { std::shared_lock image_locker(m_image_ctx.image_lock); if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) { on_finish->complete(-EROFS); return; } } auto flush_req = make_flush_req(on_finish); GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) { ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl; ceph_assert(guard_ctx.cell); flush_req->detained = guard_ctx.state.detained; /* We don't call flush_req->set_cell(), because the block guard will be released here */ { DeferredContexts post_unlock; /* Do these when 
the lock below is released */ std::lock_guard locker(m_lock); if (!m_persist_on_flush && m_persist_on_write_until_flush) { m_persist_on_flush = true; ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl; } /* * Create a new sync point if there have been writes since the last * one. * * We do not flush the caches below the RWL here. */ flush_new_sync_point_if_needed(flush_req, post_unlock); } release_guarded_request(guard_ctx.cell); }); detain_guarded_request(flush_req, guarded_ctx, true); } template void AbstractWriteLog::writesame(uint64_t offset, uint64_t length, bufferlist&& bl, int fadvise_flags, Context *on_finish) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "aio_writesame" << dendl; utime_t now = ceph_clock_now(); Extents ws_extents = {{offset, length}}; m_perfcounter->inc(l_librbd_pwl_ws, 1); ceph_assert(m_initialized); /* A write same request is also a write request. The key difference is the * write same data buffer is shorter than the extent of the request. The full * extent will be used in the block guard, and appear in * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only * as long as the length of the bl here, which is the pattern that's repeated * in the image for the entire length of this WS. Read hits and flushing of * write sames are different than normal writes. 
*/ C_WriteSameRequestT *ws_req = m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl), fadvise_flags, m_lock, m_perfcounter, on_finish); m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes); /* The lambda below will be called when the block guard for all * blocks affected by this write is obtained */ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) { ws_req->blockguard_acquired(guard_ctx); alloc_and_dispatch_io_req(ws_req); }); detain_guarded_request(ws_req, guarded_ctx, false); } template void AbstractWriteLog::compare_and_write(Extents &&image_extents, bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset, int fadvise_flags, Context *on_finish) { ldout(m_image_ctx.cct, 20) << dendl; utime_t now = ceph_clock_now(); m_perfcounter->inc(l_librbd_pwl_cmp, 1); ceph_assert(m_initialized); /* A compare and write request is also a write request. We only allocate * resources and dispatch this write request if the compare phase * succeeds. 
*/ C_WriteRequestT *cw_req = m_builder->create_comp_and_write_request( *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl), mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish); m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes); /* The lambda below will be called when the block guard for all * blocks affected by this write is obtained */ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) { cw_req->blockguard_acquired(guard_ctx); auto read_complete_ctx = new LambdaContext( [this, cw_req](int r) { ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id << "cw_req=" << cw_req << dendl; /* Compare read_bl to cmp_bl to determine if this will produce a write */ ceph_assert(cw_req->read_bl.length() <= cw_req->cmp_bl.length()); ceph_assert(cw_req->read_bl.length() == cw_req->image_extents_summary.total_bytes); bufferlist sub_cmp_bl; sub_cmp_bl.substr_of(cw_req->cmp_bl, 0, cw_req->read_bl.length()); if (sub_cmp_bl.contents_equal(cw_req->read_bl)) { /* Compare phase succeeds. Begin write */ ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl; cw_req->compare_succeeded = true; *cw_req->mismatch_offset = 0; /* Continue with this request as a write. Blockguard release and * user request completion handled as if this were a plain * write. */ alloc_and_dispatch_io_req(cw_req); } else { /* Compare phase fails. Comp-and write ends now. 
*/ ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl; /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */ uint64_t bl_index = 0; for (bl_index = 0; bl_index < sub_cmp_bl.length(); bl_index++) { if (sub_cmp_bl[bl_index] != cw_req->read_bl[bl_index]) { ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl; break; } } cw_req->compare_succeeded = false; *cw_req->mismatch_offset = bl_index; cw_req->complete_user_request(-EILSEQ); cw_req->release_cell(); cw_req->complete(0); } }); /* Read phase of comp-and-write must read through RWL */ Extents image_extents_copy = cw_req->image_extents; read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx); }); detain_guarded_request(cw_req, guarded_ctx, false); } template void AbstractWriteLog::flush(Context *on_finish) { internal_flush(false, on_finish); } template void AbstractWriteLog::invalidate(Context *on_finish) { internal_flush(true, on_finish); } template CephContext *AbstractWriteLog::get_context() { return m_image_ctx.cct; } template BlockGuardCell* AbstractWriteLog::detain_guarded_request_helper(GuardedRequest &req) { CephContext *cct = m_image_ctx.cct; BlockGuardCell *cell; ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); ldout(cct, 20) << dendl; int r = m_write_log_guard.detain(req.block_extent, &req, &cell); ceph_assert(r>=0); if (r > 0) { ldout(cct, 20) << "detaining guarded request due to in-flight requests: " << "req=" << req << dendl; return nullptr; } ldout(cct, 20) << "in-flight request cell: " << cell << dendl; return cell; } template BlockGuardCell* AbstractWriteLog::detain_guarded_request_barrier_helper( GuardedRequest &req) { BlockGuardCell *cell = nullptr; ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock)); ldout(m_image_ctx.cct, 20) << dendl; if (m_barrier_in_progress) { req.guard_ctx->state.queued = true; m_awaiting_barrier.push_back(req); } 
else { bool barrier = req.guard_ctx->state.barrier; if (barrier) { m_barrier_in_progress = true; req.guard_ctx->state.current_barrier = true; } cell = detain_guarded_request_helper(req); if (barrier) { /* Only non-null if the barrier acquires the guard now */ m_barrier_cell = cell; } } return cell; } template void AbstractWriteLog::detain_guarded_request( C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx, bool is_barrier) { BlockExtent extent; if (request) { extent = request->image_extents_summary.block_extent(); } else { extent = block_extent(whole_volume_extent()); } auto req = GuardedRequest(extent, guarded_ctx, is_barrier); BlockGuardCell *cell = nullptr; ldout(m_image_ctx.cct, 20) << dendl; { std::lock_guard locker(m_blockguard_lock); cell = detain_guarded_request_barrier_helper(req); } if (cell) { req.guard_ctx->cell = cell; req.guard_ctx->complete(0); } } template void AbstractWriteLog::release_guarded_request(BlockGuardCell *released_cell) { CephContext *cct = m_image_ctx.cct; WriteLogGuard::BlockOperations block_reqs; ldout(cct, 20) << "released_cell=" << released_cell << dendl; { std::lock_guard locker(m_blockguard_lock); m_write_log_guard.release(released_cell, &block_reqs); for (auto &req : block_reqs) { req.guard_ctx->state.detained = true; BlockGuardCell *detained_cell = detain_guarded_request_helper(req); if (detained_cell) { if (req.guard_ctx->state.current_barrier) { /* The current barrier is acquiring the block guard, so now we know its cell */ m_barrier_cell = detained_cell; /* detained_cell could be == released_cell here */ ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl; } req.guard_ctx->cell = detained_cell; m_work_queue.queue(req.guard_ctx); } } if (m_barrier_in_progress && (released_cell == m_barrier_cell)) { ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl; /* The released cell is the current barrier request */ m_barrier_in_progress = false; 
m_barrier_cell = nullptr; /* Move waiting requests into the blockguard. Stop if there's another barrier */ while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) { auto &req = m_awaiting_barrier.front(); ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl; BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req); if (detained_cell) { req.guard_ctx->cell = detained_cell; m_work_queue.queue(req.guard_ctx); } m_awaiting_barrier.pop_front(); } } } ldout(cct, 20) << "exit" << dendl; } template void AbstractWriteLog::append_scheduled(GenericLogOperations &ops, bool &ops_remain, bool &appending, bool isRWL) { const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION : MAX_WRITES_PER_SYNC_POINT; { std::lock_guard locker(m_lock); if (!appending && m_appending) { /* Another thread is appending */ ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; return; } if (m_ops_to_append.size()) { appending = true; m_appending = true; auto last_in_batch = m_ops_to_append.begin(); unsigned int ops_to_append = m_ops_to_append.size(); if (ops_to_append > OPS_APPENDED) { ops_to_append = OPS_APPENDED; } std::advance(last_in_batch, ops_to_append); ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); ops_remain = true; /* Always check again before leaving */ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain " << m_ops_to_append.size() << dendl; } else if (isRWL) { ops_remain = false; if (appending) { appending = false; m_appending = false; } } } } template void AbstractWriteLog::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req) { GenericLogOperations to_append(ops.begin(), ops.end()); schedule_append_ops(to_append, req); } template void AbstractWriteLog::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req) { GenericLogOperations to_append { op }; schedule_append_ops(to_append, req); } /* * Complete a set of write ops with 
the result of append_op_entries. */ template void AbstractWriteLog::complete_op_log_entries(GenericLogOperations &&ops, const int result) { GenericLogEntries dirty_entries; int published_reserves = 0; ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl; for (auto &op : ops) { utime_t now = ceph_clock_now(); auto log_entry = op->get_log_entry(); log_entry->completed = true; if (op->is_writing_op()) { op->mark_log_entry_completed(); dirty_entries.push_back(log_entry); } if (log_entry->is_write_entry()) { release_ram(log_entry); } if (op->reserved_allocated()) { published_reserves++; } { std::lock_guard locker(m_lock); m_unpublished_reserves -= published_reserves; m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries); } op->complete(result); m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t, op->log_append_start_time - op->dispatch_time); m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time); m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(), log_entry->ram_entry.write_bytes); utime_t app_lat = op->log_append_comp_time - op->log_append_start_time; m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat); m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(), log_entry->ram_entry.write_bytes); m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time); } // New entries may be flushable { std::lock_guard locker(m_lock); wake_up(); } } /** * Dispatch as many deferred writes as possible */ template void AbstractWriteLog::dispatch_deferred_writes(void) { C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */ C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */ bool allocated = false; /* front_req allocate succeeded */ bool cleared_dispatching_flag = false; /* If we can't become the dispatcher, we'll exit */ { std::lock_guard 
locker(m_lock); if (m_dispatching_deferred_ops || !m_deferred_ios.size()) { return; } m_dispatching_deferred_ops = true; } /* There are ops to dispatch, and this should be the only thread dispatching them */ { std::lock_guard deferred_dispatch(m_deferred_dispatch_lock); do { { std::lock_guard locker(m_lock); ceph_assert(m_dispatching_deferred_ops); if (allocated) { /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will * have succeeded, and we'll need to pop it off the deferred ops list * here. */ ceph_assert(front_req); ceph_assert(!allocated_req); m_deferred_ios.pop_front(); allocated_req = front_req; front_req = nullptr; allocated = false; } ceph_assert(!allocated); if (!allocated && front_req) { /* front_req->alloc_resources() failed on the last iteration. * We'll stop dispatching. */ wake_up(); front_req = nullptr; ceph_assert(!cleared_dispatching_flag); m_dispatching_deferred_ops = false; cleared_dispatching_flag = true; } else { ceph_assert(!front_req); if (m_deferred_ios.size()) { /* New allocation candidate */ front_req = m_deferred_ios.front(); } else { ceph_assert(!cleared_dispatching_flag); m_dispatching_deferred_ops = false; cleared_dispatching_flag = true; } } } /* Try allocating for front_req before we decide what to do with allocated_req * (if any) */ if (front_req) { ceph_assert(!cleared_dispatching_flag); allocated = front_req->alloc_resources(); } if (allocated_req && front_req && allocated) { /* Push dispatch of the first allocated req to a wq */ m_work_queue.queue(new LambdaContext( [allocated_req](int r) { allocated_req->dispatch(); }), 0); allocated_req = nullptr; } ceph_assert(!(allocated_req && front_req && allocated)); /* Continue while we're still considering the front of the deferred ops list */ } while (front_req); ceph_assert(!allocated); } ceph_assert(cleared_dispatching_flag); /* If any deferred requests were allocated, the last one will still be in allocated_req */ if (allocated_req) { allocated_req->dispatch(); } } 
/** * Returns the lanes used by this write, and attempts to dispatch the next * deferred write */ template void AbstractWriteLog::release_write_lanes(C_BlockIORequestT *req) { { std::lock_guard locker(m_lock); m_free_lanes += req->image_extents.size(); } dispatch_deferred_writes(); } /** * Attempts to allocate log resources for a write. Write is dispatched if * resources are available, or queued if they aren't. */ template void AbstractWriteLog::alloc_and_dispatch_io_req(C_BlockIORequestT *req) { bool dispatch_here = false; { /* If there are already deferred writes, queue behind them for resources */ { std::lock_guard locker(m_lock); dispatch_here = m_deferred_ios.empty(); // Only flush req's total_bytes is the max uint64 if (req->image_extents_summary.total_bytes == std::numeric_limits::max() && static_cast(req)->internal == true) { dispatch_here = true; } } if (dispatch_here) { dispatch_here = req->alloc_resources(); } if (dispatch_here) { ldout(m_image_ctx.cct, 20) << "dispatching" << dendl; req->dispatch(); } else { req->deferred(); { std::lock_guard locker(m_lock); m_deferred_ios.push_back(req); } ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl; dispatch_deferred_writes(); } } } template bool AbstractWriteLog::check_allocation( C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied, uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries, uint32_t num_unpublished_reserves) { bool alloc_succeeds = true; bool no_space = false; { std::lock_guard locker(m_lock); if (m_free_lanes < num_lanes) { ldout(m_image_ctx.cct, 20) << "not enough free lanes (need " << num_lanes << ", have " << m_free_lanes << ") " << *req << dendl; alloc_succeeds = false; /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. 
*/ } if (m_free_log_entries < num_log_entries) { ldout(m_image_ctx.cct, 20) << "not enough free entries (need " << num_log_entries << ", have " << m_free_log_entries << ") " << *req << dendl; alloc_succeeds = false; no_space = true; /* Entries must be retired */ } /* Don't attempt buffer allocate if we've exceeded the "full" threshold */ if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) { ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap=" << m_bytes_allocated_cap << ", allocated=" << m_bytes_allocated << ") in write [" << *req << "]" << dendl; alloc_succeeds = false; no_space = true; /* Entries must be retired */ } } if (alloc_succeeds) { reserve_cache(req, alloc_succeeds, no_space); } if (alloc_succeeds) { std::unique_lock locker(m_lock); /* We need one free log entry per extent (each is a separate entry), and * one free "lane" for remote replication. */ if ((m_free_lanes >= num_lanes) && (m_free_log_entries >= num_log_entries) && (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) { m_free_lanes -= num_lanes; m_free_log_entries -= num_log_entries; m_unpublished_reserves += num_unpublished_reserves; m_bytes_allocated += bytes_allocated; m_bytes_cached += bytes_cached; m_bytes_dirty += bytes_dirtied; if (m_cache_state->clean && bytes_dirtied > 0) { m_cache_state->clean = false; update_image_cache_state(); write_image_cache_state(locker); } } else { alloc_succeeds = false; } } if (!alloc_succeeds && no_space) { /* Expedite flushing and/or retiring */ std::lock_guard locker(m_lock); m_alloc_failed_since_retire = true; m_last_alloc_fail = ceph_clock_now(); } return alloc_succeeds; } template C_FlushRequest>* AbstractWriteLog::make_flush_req(Context *on_finish) { utime_t flush_begins = ceph_clock_now(); bufferlist bl; auto *flush_req = new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}), std::move(bl), 0, m_lock, m_perfcounter, on_finish); return flush_req; } template void AbstractWriteLog::wake_up() { 
CephContext *cct = m_image_ctx.cct; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (!m_wake_up_enabled) { // wake_up is disabled during shutdown after flushing completes ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl; return; } if (m_wake_up_requested && m_wake_up_scheduled) { return; } ldout(cct, 20) << dendl; /* Wake-up can be requested while it's already scheduled */ m_wake_up_requested = true; /* Wake-up cannot be scheduled if it's already scheduled */ if (m_wake_up_scheduled) { return; } m_wake_up_scheduled = true; m_async_process_work++; m_async_op_tracker.start_op(); m_work_queue.queue(new LambdaContext( [this](int r) { process_work(); m_async_op_tracker.finish_op(); m_async_process_work--; }), 0); } template bool AbstractWriteLog::can_flush_entry(std::shared_ptr log_entry) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << "" << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (m_invalidating) { return true; } /* For OWB we can flush entries with the same sync gen number (write between * aio_flush() calls) concurrently. Here we'll consider an entry flushable if * its sync gen number is <= the lowest sync gen number carried by all the * entries currently flushing. * * If the entry considered here bears a sync gen number lower than a * previously flushed entry, the application had to have submitted the write * bearing the higher gen number before the write with the lower gen number * completed. So, flushing these concurrently is OK. * * If the entry considered here bears a sync gen number higher than a * currently flushing entry, the write with the lower gen number may have * completed to the application before the write with the higher sync gen * number was submitted, and the application may rely on that completion * order for volume consistency. In this case the entry will not be * considered flushable until all the entries bearing lower sync gen numbers * finish flushing. 
*/ if (m_flush_ops_in_flight && (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) { return false; } return (log_entry->can_writeback() && (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) && (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT)); } template void AbstractWriteLog::detain_flush_guard_request(std::shared_ptr log_entry, GuardedRequestFunctionContext *guarded_ctx) { ldout(m_image_ctx.cct, 20) << dendl; BlockExtent extent; if (log_entry->is_sync_point()) { extent = block_extent(whole_volume_extent()); } else { extent = log_entry->ram_entry.block_extent(); } auto req = GuardedRequest(extent, guarded_ctx, false); BlockGuardCell *cell = nullptr; { std::lock_guard locker(m_flush_guard_lock); m_flush_guard.detain(req.block_extent, &req, &cell); } if (cell) { req.guard_ctx->cell = cell; m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); } } template Context* AbstractWriteLog::construct_flush_entry(std::shared_ptr log_entry, bool invalidating) { ldout(m_image_ctx.cct, 20) << "" << dendl; /* Flush write completion action */ utime_t writeback_start_time = ceph_clock_now(); Context *ctx = new LambdaContext( [this, log_entry, writeback_start_time, invalidating](int r) { utime_t writeback_comp_time = ceph_clock_now(); m_perfcounter->tinc(l_librbd_pwl_writeback_latency, writeback_comp_time - writeback_start_time); { std::lock_guard locker(m_lock); if (r < 0) { lderr(m_image_ctx.cct) << "failed to flush log entry" << cpp_strerror(r) << dendl; m_dirty_log_entries.push_front(log_entry); } else { ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty()); log_entry->set_flushed(true); m_bytes_dirty -= log_entry->bytes_dirty(); sync_point_writer_flushed(log_entry->get_sync_point_entry()); ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry << " invalidating=" << invalidating << dendl; } m_flush_ops_in_flight -= 1; m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes; wake_up(); } }); /* Flush through lower cache before completing */ ctx = 
new LambdaContext( [this, ctx, log_entry](int r) { { WriteLogGuard::BlockOperations block_reqs; BlockGuardCell *detained_cell = nullptr; std::lock_guard locker{m_flush_guard_lock}; m_flush_guard.release(log_entry->m_cell, &block_reqs); for (auto &req : block_reqs) { m_flush_guard.detain(req.block_extent, &req, &detained_cell); if (detained_cell) { req.guard_ctx->cell = detained_cell; m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); } } } if (r < 0) { lderr(m_image_ctx.cct) << "failed to flush log entry" << cpp_strerror(r) << dendl; ctx->complete(r); } else { m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx); } }); return ctx; } template void AbstractWriteLog::process_writeback_dirty_entries() { CephContext *cct = m_image_ctx.cct; bool all_clean = false; int flushed = 0; bool has_write_entry = false; bool need_update_state = false; ldout(cct, 20) << "Look for dirty entries" << dendl; { DeferredContexts post_unlock; GenericLogEntries entries_to_flush; std::shared_lock entry_reader_locker(m_entry_reader_lock); std::lock_guard locker(m_lock); while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) { if (m_shutting_down) { ldout(cct, 5) << "Flush during shutdown supressed" << dendl; /* Do flush complete only when all flush ops are finished */ all_clean = !m_flush_ops_in_flight; break; } if (m_dirty_log_entries.empty()) { ldout(cct, 20) << "Nothing new to flush" << dendl; /* Do flush complete only when all flush ops are finished */ all_clean = !m_flush_ops_in_flight; if (!m_cache_state->clean && all_clean) { m_cache_state->clean = true; update_image_cache_state(); need_update_state = true; } break; } auto candidate = m_dirty_log_entries.front(); bool flushable = can_flush_entry(candidate); if (flushable) { entries_to_flush.push_back(candidate); flushed++; if (!has_write_entry) has_write_entry = candidate->is_write_entry(); m_dirty_log_entries.pop_front(); // To track candidate, we should add m_flush_ops_in_flight in here { if (!m_flush_ops_in_flight || 
(candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) { m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number; } m_flush_ops_in_flight += 1; /* For write same this is the bytes affected by the flush op, not the bytes transferred */ m_flush_bytes_in_flight += candidate->ram_entry.write_bytes; } } else { ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl; break; } } construct_flush_entries(entries_to_flush, post_unlock, has_write_entry); } if (need_update_state) { std::unique_lock locker(m_lock); write_image_cache_state(locker); } if (all_clean) { /* All flushing complete, drain outside lock */ Contexts flush_contexts; { std::lock_guard locker(m_lock); flush_contexts.swap(m_flush_complete_contexts); } finish_contexts(m_image_ctx.cct, flush_contexts, 0); } } /* Returns true if the specified SyncPointLogEntry is considered flushed, and * the log will be updated to reflect this. * * The caller must hold m_lock and log_entry must not be null. The entry * counts as flushed only when all of its writes are flushed, it is * completed, its prior sync point is flushed and it has a next sync point * entry; otherwise false is returned and no state is changed. */ template bool AbstractWriteLog::handle_flushed_sync_point(std::shared_ptr log_entry) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(log_entry); if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed && log_entry->prior_sync_point_flushed && log_entry->next_sync_point_entry) { ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point=" << *log_entry << dendl; log_entry->next_sync_point_entry->prior_sync_point_flushed = true; /* Don't move the flushed sync gen num backwards. 
*/ if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) { m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number; } /* Continue with the next sync point from the work queue; the queued * context takes m_lock itself. If the next sync point is not flushed * yet, the last flushed sync gen is persisted instead. The async op * tracker is held from here until the queued context has run. * NOTE(review): the lambda is not declared mutable, so std::move(next) * inside it presumably copies instead of moving - confirm intent. */ m_async_op_tracker.start_op(); m_work_queue.queue(new LambdaContext( [this, next = std::move(log_entry->next_sync_point_entry)](int r) { bool handled_by_next; { std::lock_guard locker(m_lock); handled_by_next = handle_flushed_sync_point(std::move(next)); } if (!handled_by_next) { persist_last_flushed_sync_gen(); } m_async_op_tracker.finish_op(); })); return true; } return false; } /* Records one more flushed write against the given sync point log entry. * The caller must hold m_lock and log_entry must not be null. Once every * write is flushed and the entry is completed, handle_flushed_sync_point() * decides whether the sync point itself is flushed. */ template void AbstractWriteLog::sync_point_writer_flushed(std::shared_ptr log_entry) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(log_entry); log_entry->writes_flushed++; /* If this entry might be completely flushed, look closer */ if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) { ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point=" << *log_entry << dendl; handle_flushed_sync_point(log_entry); } } /* Make a new sync point and flush the previous during initialization, when there may or may * not be a previous sync point. The caller must hold m_lock. */ template void AbstractWriteLog::init_flush_new_sync_point(DeferredContexts &later) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(!m_initialized); /* Don't use this after init */ if (!m_current_sync_point) { /* First sync point since start */ new_sync_point(later); } else { flush_new_sync_point(nullptr, later); } } /** * Begin a new sync point. * * The caller must hold m_lock. Advances m_current_sync_gen and makes the * newly created sync point current. When a previous sync point exists, * setup_earlier_sync_point() is called on the new one, the syncpoint * histogram counter is updated from the previous one, and activation of the * previous one's prior-persisted gather is added to later. */ template void AbstractWriteLog::new_sync_point(DeferredContexts &later) { CephContext *cct = m_image_ctx.cct; std::shared_ptr old_sync_point = m_current_sync_point; std::shared_ptr new_sync_point; ldout(cct, 20) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); /* The first time this is called, if this is a newly created log, * this makes the first sync gen number we'll use 1. 
On the first * call for a re-opened log m_current_sync_gen will be the highest * gen number from all the sync point entries found in the re-opened * log, and this advances to the next sync gen number. */ ++m_current_sync_gen; new_sync_point = std::make_shared(m_current_sync_gen, cct); m_current_sync_point = new_sync_point; /* If this log has been re-opened, old_sync_point will initially be * nullptr, but m_current_sync_gen may not be zero. */ if (old_sync_point) { new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num); m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist, old_sync_point->log_entry->writes, old_sync_point->log_entry->bytes); /* This sync point will acquire no more sub-ops. Activation needs * to acquire m_lock, so defer to later */ later.add(new LambdaContext( [old_sync_point](int r) { old_sync_point->prior_persisted_gather_activate(); })); } new_sync_point->prior_persisted_gather_set_finisher(); if (old_sync_point) { ldout(cct,6) << "new sync point = [" << *m_current_sync_point << "], prior = [" << *old_sync_point << "]" << dendl; } else { ldout(cct,6) << "first sync point = [" << *m_current_sync_point << "]" << dendl; } } /* Starts a new sync point and arranges for flush_req to append/persist the * (then) previous one. The caller must hold m_lock. When flush_req is * nullptr an internal flush request is created whose completion only * decrements m_async_null_flush_finish and finishes the async op started * here. */ template void AbstractWriteLog::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); if (!flush_req) { m_async_null_flush_finish++; m_async_op_tracker.start_op(); Context *flush_ctx = new LambdaContext([this](int r) { m_async_null_flush_finish--; m_async_op_tracker.finish_op(); }); flush_req = make_flush_req(flush_ctx); flush_req->internal = true; } /* Add a new sync point. */ new_sync_point(later); std::shared_ptr to_append = m_current_sync_point->earlier_sync_point; ceph_assert(to_append); /* This flush request will append/persist the (now) previous sync point */ flush_req->to_append = to_append; /* When the m_sync_point_persist Gather completes this sync point can be * appended. 
The only sub for this Gather is the finisher Context for * m_prior_log_entries_persisted, which records the result of the Gather in * the sync point, and completes. TODO: Do we still need both of these * Gathers? */ Context * ctx = new LambdaContext([this, flush_req](int r) { ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req << " sync point =" << flush_req->to_append << ". Ready to persist." << dendl; alloc_and_dispatch_io_req(flush_req); }); to_append->persist_gather_set_finisher(ctx); /* The m_sync_point_persist Gather has all the subs it will ever have, and * now has its finisher. If the sub is already complete, activation will * complete the Gather. The finisher will acquire m_lock, so we'll activate * this when we release m_lock. */ later.add(new LambdaContext([to_append](int r) { to_append->persist_gather_activate(); })); /* The flush request completes when the sync point persists */ to_append->add_in_on_persisted_ctxs(flush_req); } /* Creates a new sync point for flush_req only when the current sync point * has writes. Otherwise flush_req is attached to the earlier sync point if * there is one, or added to later directly. The caller must hold m_lock. */ template void AbstractWriteLog::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, DeferredContexts &later) { ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); /* If there have been writes since the last sync point ... */ if (m_current_sync_point->log_entry->writes) { flush_new_sync_point(flush_req, later); } else { /* There have been no writes to the current sync point. */ if (m_current_sync_point->earlier_sync_point) { /* If previous sync point hasn't completed, complete this flush * with the earlier sync point. No alloc or dispatch needed. */ m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req); } else { /* The previous sync point has already completed and been * appended. The current sync point has no writes, so this flush * has nothing to wait for. This flush completes now. */ later.add(flush_req); } } } /* * RWL internal flush - will actually flush the RWL. * * User flushes should arrive at aio_flush(), and only flush prior * writes to all log replicas. 
* * Librbd internal flushes will arrive at flush(invalidate=false, * discard=false), and traverse the block guard to ensure in-flight writes are * flushed. * * NOTE(review): the entry point visible in this file is * internal_flush(invalidate, on_finish); the flush(invalidate, discard) * signature named above may be stale - confirm. * * on_finish is completed with 0 once no flush ops are in flight and either * there are no dirty log entries or m_shutting_down is set. Otherwise a * context that re-runs this function is appended to * m_flush_complete_contexts and wake_up() is called. */ template void AbstractWriteLog::flush_dirty_entries(Context *on_finish) { CephContext *cct = m_image_ctx.cct; bool all_clean; bool flushing; bool stop_flushing; { std::unique_lock locker(m_lock); flushing = (0 != m_flush_ops_in_flight); all_clean = m_dirty_log_entries.empty(); stop_flushing = (m_shutting_down); /* First time the cache is observed fully clean: record and write out * that state while still holding the lock */ if (!m_cache_state->clean && all_clean && !flushing) { m_cache_state->clean = true; update_image_cache_state(); write_image_cache_state(locker); } } if (!flushing && (all_clean || stop_flushing)) { /* Complete without holding m_lock */ if (all_clean) { ldout(cct, 20) << "no dirty entries" << dendl; } else { ldout(cct, 5) << "flush during shutdown suppressed" << dendl; } on_finish->complete(0); } else { if (all_clean) { ldout(cct, 5) << "flush ops still in progress" << dendl; } else { ldout(cct, 20) << "dirty entries remain" << dendl; } std::lock_guard locker(m_lock); /* on_finish can't be completed yet */ m_flush_complete_contexts.push_back(new LambdaContext( [this, on_finish](int r) { flush_dirty_entries(on_finish); })); wake_up(); } } /* Flushes all dirty entries and then either invalidates the log (retiring * entries until retire_entries() returns false) or issues a writeback flush * on m_image_writeback. on_finish is queued on the image's op_work_queue * with the resulting status. If the log was never initialized, on_finish is * queued with 0 right away. */ template void AbstractWriteLog::internal_flush(bool invalidate, Context *on_finish) { ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl; if (m_perfcounter) { if (invalidate) { m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1); } else { m_perfcounter->inc(l_librbd_pwl_internal_flush, 1); } } /* May be called even if initialization fails */ if (!m_initialized) { /* NOTE(review): the log level 05 below is an octal literal; its value * is 5, same as the plain 5 used elsewhere */ ldout(m_image_ctx.cct, 05) << "never initialized" << dendl; /* Deadlock if completed here */ m_image_ctx.op_work_queue->queue(on_finish, 0); return; } /* Flush/invalidate must pass through block guard to ensure all layers of * cache are consistently flush/invalidated. This ensures no in-flight write leaves * some layers with valid regions, which may later produce inconsistent read * results. 
*/ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext( [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) { DeferredContexts on_exit; ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl; ceph_assert(guard_ctx.cell); /* The three contexts below are created in reverse order of * execution. This one runs last: it clears m_invalidating, queues * on_finish with the final status and releases the guard cell. */ Context *ctx = new LambdaContext( [this, cell=guard_ctx.cell, invalidate, on_finish](int r) { std::lock_guard locker(m_lock); m_invalidating = false; ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate=" << invalidate << ")" << dendl; if (m_log_entries.size()) { ldout(m_image_ctx.cct, 1) << "m_log_entries.size()=" << m_log_entries.size() << ", front()=" << *m_log_entries.front() << dendl; } if (invalidate) { ceph_assert(m_log_entries.size() == 0); } ceph_assert(m_dirty_log_entries.size() == 0); m_image_ctx.op_work_queue->queue(on_finish, r); release_guarded_request(cell); }); /* Runs second, after flush_dirty_entries() has finished: either * retires all entries (invalidate) or flushes the writeback layer. */ ctx = new LambdaContext( [this, ctx, invalidate](int r) { Context *next_ctx = ctx; ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl; if (r < 0) { /* Override on_finish status with this error */ next_ctx = new LambdaContext([r, ctx](int _r) { ctx->complete(r); }); } if (invalidate) { { std::lock_guard locker(m_lock); ceph_assert(m_dirty_log_entries.size() == 0); ceph_assert(!m_invalidating); ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl; m_invalidating = true; } /* Discards all RWL entries */ while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { } next_ctx->complete(0); } else { { std::lock_guard locker(m_lock); ceph_assert(m_dirty_log_entries.size() == 0); ceph_assert(!m_invalidating); } m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx); } }); /* Runs first, as the completion of the flush request made below: * start flushing the dirty entries. */ ctx = new LambdaContext( [this, ctx](int r) { flush_dirty_entries(ctx); }); std::lock_guard locker(m_lock); /* Even if we're throwing everything away, we want the last entry to * be a sync point so we can cleanly resume. * * Also, the blockguard only guarantees the replication of this op * can't overlap with prior ops. 
It doesn't guarantee those are all * completed and eligible for flush & retire, which we require here. */ auto flush_req = make_flush_req(ctx); flush_new_sync_point_if_needed(flush_req, on_exit); }); detain_guarded_request(nullptr, guarded_ctx, true); } /* Calls req->copy_cache() and then adds log_entries to * m_blocks_to_log_entries. */ template void AbstractWriteLog::add_into_log_map(GenericWriteLogEntries &log_entries, C_BlockIORequestT *req) { req->copy_cache(); m_blocks_to_log_entries.add_log_entries(log_entries); } /* Returns log_entry->can_retire(). The caller must hold m_lock and * log_entry must not be null. */ template bool AbstractWriteLog::can_retire_entry(std::shared_ptr log_entry) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); ceph_assert(log_entry); return log_entry->can_retire(); } /* Asserts that nothing is pending: no deferred IOs, no ops waiting to be * appended or flushed, no async append/flush ops, no flush ops or flush * bytes in flight, no dirty entries or dirty bytes, and an empty work * queue. */ template void AbstractWriteLog::check_image_cache_state_clean() { ceph_assert(m_deferred_ios.empty()); ceph_assert(m_ops_to_append.empty()); ceph_assert(m_async_flush_ops == 0); ceph_assert(m_async_append_ops == 0); ceph_assert(m_dirty_log_entries.empty()); ceph_assert(m_ops_to_flush.empty()); ceph_assert(m_flush_ops_in_flight == 0); ceph_assert(m_flush_bytes_in_flight == 0); ceph_assert(m_bytes_dirty == 0); ceph_assert(m_work_queue.empty()); } } // namespace pwl } // namespace cache } // namespace librbd template class librbd::cache::pwl::AbstractWriteLog;