Diffstat:
 src/librbd/cache/pwl/AbstractWriteLog.cc | 2187 +++++++++++++++++++++++++++++
 1 file changed, 2187 insertions(+), 0 deletions(-)
diff --git a/src/librbd/cache/pwl/AbstractWriteLog.cc b/src/librbd/cache/pwl/AbstractWriteLog.cc
new file mode 100644
index 000000000..1e784f6b5
--- /dev/null
+++ b/src/librbd/cache/pwl/AbstractWriteLog.cc
@@ -0,0 +1,2187 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "AbstractWriteLog.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/ceph_assert.h"
+#include "common/deleter.h"
+#include "common/dout.h"
+#include "common/environment.h"
+#include "common/errno.h"
+#include "common/hostname.h"
+#include "common/WorkQueue.h"
+#include "common/Timer.h"
+#include "common/perf_counters.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/cache/pwl/ImageCacheState.h"
+#include "librbd/cache/pwl/LogEntry.h"
+#include "librbd/plugin/Api.h"
+#include <map>
+#include <vector>
+
+#undef dout_subsys
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+
+using namespace std;
+using namespace librbd::cache::pwl;
+
+typedef AbstractWriteLog<ImageCtx>::Extent Extent;
+typedef AbstractWriteLog<ImageCtx>::Extents Extents;
+
+template <typename I>
+AbstractWriteLog<I>::AbstractWriteLog(
+ I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
+ Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
+ plugin::Api<I>& plugin_api)
+ : m_builder(builder),
+ m_write_log_guard(image_ctx.cct),
+ m_flush_guard(image_ctx.cct),
+ m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
+ m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
+ m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
+ m_thread_pool(
+ image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
+ "tp_pwl", 4, ""),
+ m_cache_state(cache_state),
+ m_image_ctx(image_ctx),
+ m_log_pool_size(DEFAULT_POOL_SIZE),
+ m_image_writeback(image_writeback),
+ m_plugin_api(plugin_api),
+ m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
+ m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
+ m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
+ m_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
+ m_blocks_to_log_entries(image_ctx.cct),
+ m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
+ ceph::make_timespan(
+ image_ctx.config.template get_val<uint64_t>(
+ "rbd_op_thread_timeout")),
+ &m_thread_pool)
+{
+ CephContext *cct = m_image_ctx.cct;
+ m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
+}
+
+template <typename I>
+AbstractWriteLog<I>::~AbstractWriteLog() {
+ ldout(m_image_ctx.cct, 15) << "enter" << dendl;
+ {
+ std::lock_guard timer_locker(*m_timer_lock);
+ std::lock_guard locker(m_lock);
+ m_timer->cancel_event(m_timer_ctx);
+ m_thread_pool.stop();
+ ceph_assert(m_deferred_ios.size() == 0);
+ ceph_assert(m_ops_to_flush.size() == 0);
+ ceph_assert(m_ops_to_append.size() == 0);
+ ceph_assert(m_flush_ops_in_flight == 0);
+
+ delete m_cache_state;
+ m_cache_state = nullptr;
+ }
+ ldout(m_image_ctx.cct, 15) << "exit" << dendl;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::perf_start(std::string name) {
+ PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
+ l_librbd_pwl_last);
+
+ // Latency axis configuration for op histograms, values are in nanoseconds
+ PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
+ "Latency (nsec)",
+ PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
+ 0, ///< Start at 0
+ 5000, ///< Quantization unit is 5usec
+ 16, ///< Ranges into the mS
+ };
+
+ // Syncpoint logentry number x-axis configuration for op histograms
+ PerfHistogramCommon::axis_config_d sp_logentry_number_config{
+ "logentry number",
+ PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
+ 0, // Start at 0
+ 1, // Quantization unit is 1
+ 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
+ };
+
+ // Syncpoint bytes number y-axis configuration for op histogram
+ PerfHistogramCommon::axis_config_d sp_bytes_number_config{
+ "Number of SyncPoint",
+ PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale
+ 0, // Start at 0
+ 512, // Quantization unit is 512
+ 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
+ };
+
+ // Op size axis configuration for op histogram y axis, values are in bytes
+ PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
+ "Request size (bytes)",
+ PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
+ 0, ///< Start at 0
+ 512, ///< Quantization unit is 512 bytes
+ 16, ///< Writes up to >32k
+ };
+
+ // Num items configuration for op histogram y axis, values are in items
+ PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
+ "Number of items",
+ PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale
+ 0, ///< Start at 0
+ 1, ///< Quantization unit is 1
+    32, ///< Up to 32 items
+ };
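+
+  /* Each 2D counter registered below via add_u64_counter_histogram() pairs
+   * one of the x-axis configs above (latency or log entry number) with one
+   * of the y-axis configs (bytes or item count). */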
+
+ plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
+ plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
+ plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");
+
+ plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
+ plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
+ plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");
+
+  plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "Reads partially hitting RWL");
+
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
+ sp_logentry_number_config, sp_bytes_number_config,
+ "Histogram of syncpoint's logentry numbers vs bytes number");
+
+ plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
+ plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
+ plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
+
+ plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
+ plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
+
+ plb.add_time_avg(
+ l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
+ "Average arrival to allocation time (time deferred for overlap)");
+ plb.add_time_avg(
+ l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
+ "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+ plb.add_time_avg(
+ l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
+ "Average allocation to dispatch time (time deferred for log resources)");
+ plb.add_time_avg(
+ l_librbd_pwl_wr_latency, "wr_latency",
+ "Latency of writes (persistent completion)");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of write request latency (nanoseconds) vs. bytes written");
+ plb.add_time_avg(
+ l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
+ "Latency of write completion to caller");
+ plb.add_time_avg(
+ l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
+ "Average arrival to allocation time (time deferred for overlap)");
+ plb.add_time_avg(
+ l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
+ "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+ plb.add_time_avg(
+ l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
+ "Average allocation to dispatch time (time deferred for log resources)");
+ plb.add_time_avg(
+ l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
+ "Latency of writes (persistent completion) not deferred for free space");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
+ plb.add_time_avg(
+ l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
+ "Latency of write completion to callerfor writes not deferred for free space");
+ plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");
+ plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
+ plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
+ plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
+
+ plb.add_time_avg(
+ l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
+ "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
+ plb.add_time_avg(
+ l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
+ "Average buffer persist time (write data persist/replicate time)");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
+ plb.add_time_avg(
+ l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
+ "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
+ plb.add_time_avg(
+ l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
+ "Average log append to persist complete time (log entry append/replicate time)");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_config,
+ "Histogram of log append persist time (nanoseconds) (vs. op bytes)");
+
+ plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
+ plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
+ plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");
+
+ plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
+ plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
+ plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");
+
+  plb.add_u64_counter(l_librbd_pwl_ws, "ws", "Write Sames");
+ plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
+ plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");
+
+ plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
+ plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
+  plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
+ plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
+
+ plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
+ plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
+ plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
+  plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "invalidate_discard", "Discard and invalidate RWL");
+
+ plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_count_config,
+ "Histogram of log append transaction time (nanoseconds) vs. entries appended");
+ plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
+ plb.add_u64_counter_histogram(
+ l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
+ op_hist_x_axis_config, op_hist_y_axis_count_config,
+ "Histogram of log retire transaction time (nanoseconds) vs. entries retired");
+
+ m_perfcounter = plb.create_perf_counters();
+ m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::perf_stop() {
+ ceph_assert(m_perfcounter);
+ m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
+ delete m_perfcounter;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::log_perf() {
+ bufferlist bl;
+ Formatter *f = Formatter::create("json-pretty");
+ bl.append("Perf dump follows\n--- Begin perf dump ---\n");
+ bl.append("{\n");
+ stringstream ss;
+ utime_t now = ceph_clock_now();
+ ss << "\"test_time\": \"" << now << "\",";
+ ss << "\"image\": \"" << m_image_ctx.name << "\",";
+ bl.append(ss);
+ bl.append("\"stats\": ");
+ m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, false, false);
+ f->flush(bl);
+ bl.append(",\n\"histograms\": ");
+ m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
+ f->flush(bl);
+ delete f;
+ bl.append("}\n--- End perf dump ---\n");
+ bl.append('\0');
+ ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::periodic_stats() {
+ std::unique_lock locker(m_lock);
+ ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size()
+ << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
+ << ", m_free_log_entries=" << m_free_log_entries
+ << ", m_bytes_allocated=" << m_bytes_allocated
+ << ", m_bytes_cached=" << m_bytes_cached
+ << ", m_bytes_dirty=" << m_bytes_dirty
+ << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
+ << ", m_first_valid_entry=" << m_first_valid_entry
+ << ", m_first_free_entry=" << m_first_free_entry
+ << ", m_current_sync_gen=" << m_current_sync_gen
+ << ", m_flushed_sync_gen=" << m_flushed_sync_gen
+ << dendl;
+
+ update_image_cache_state();
+ write_image_cache_state(locker);
+}
+
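+/* Schedule the next periodic stats dump. The timer callback logs stats and
+ * re-arms itself, so stats are emitted every LOG_STATS_INTERVAL_SECONDS until
+ * the event is cancelled in the destructor. */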
+template <typename I>
+void AbstractWriteLog<I>::arm_periodic_stats() {
+ ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
+ m_timer_ctx = new LambdaContext([this](int r) {
+ /* m_timer_lock is held */
+ periodic_stats();
+ arm_periodic_stats();
+ });
+ m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
+}
+
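+/* Used when reloading an existing cache: rebuild the in-memory log entry
+ * (sync point, write, writesame or discard) for one persisted cache_entry,
+ * record sync point entries as they are found, and note in missing_sync_points
+ * the sync gen numbers of writing entries whose sync point has not been seen. */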
+template <typename I>
+void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
+ WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+ uint64_t entry_index) {
+ bool writer = cache_entry->is_writer();
+ if (cache_entry->is_sync_point()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
+ auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
+ *log_entry = sync_point_entry;
+ sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
+ missing_sync_points.erase(cache_entry->sync_gen_number);
+ m_current_sync_gen = cache_entry->sync_gen_number;
+ } else if (cache_entry->is_write()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
+ auto write_entry =
+ m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
+ write_data_to_buffer(write_entry, cache_entry);
+ *log_entry = write_entry;
+ } else if (cache_entry->is_writesame()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
+ auto ws_entry =
+ m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
+ cache_entry->write_bytes, cache_entry->ws_datalen);
+ write_data_to_buffer(ws_entry, cache_entry);
+ *log_entry = ws_entry;
+ } else if (cache_entry->is_discard()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
+ auto discard_entry =
+ std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
+ m_discard_granularity_bytes);
+ *log_entry = discard_entry;
+ } else {
+ lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
+ << ", cache_entry=[" << *cache_entry << "]" << dendl;
+ }
+
+ if (writer) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
+ if (!sync_point_entries[cache_entry->sync_gen_number]) {
+ missing_sync_points[cache_entry->sync_gen_number] = true;
+ }
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+ DeferredContexts &later) {
+ /* Create missing sync points. These must not be appended until the
+ * entry reload is complete and the write map is up to
+ * date. Currently this is handled by the deferred contexts object
+ * passed to new_sync_point(). These contexts won't be completed
+ * until this function returns. */
+ for (auto &kv : missing_sync_points) {
+ ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
+ if (0 == m_current_sync_gen) {
+ /* The unlikely case where the log contains writing entries, but no sync
+ * points (e.g. because they were all retired) */
+ m_current_sync_gen = kv.first-1;
+ }
+ ceph_assert(kv.first == m_current_sync_gen+1);
+ init_flush_new_sync_point(later);
+ ceph_assert(kv.first == m_current_sync_gen);
+ sync_point_entries[kv.first] = m_current_sync_point->log_entry;
+ }
+
+ /*
+ * Iterate over the log entries again (this time via the global
+ * entries list), connecting write entries to their sync points and
+ * updating the sync point stats.
+ *
+ * Add writes to the write log map.
+ */
+ std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
+ for (auto &log_entry : m_log_entries) {
+ if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
+ /* This entry is one of the types that write */
+ auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
+ if (gen_write_entry) {
+ auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
+ if (!sync_point_entry) {
+ lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
+ ceph_assert(false);
+ } else {
+ gen_write_entry->sync_point_entry = sync_point_entry;
+ sync_point_entry->writes++;
+ sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
+ sync_point_entry->writes_completed++;
+ m_blocks_to_log_entries.add_log_entry(gen_write_entry);
+ /* This entry is only dirty if its sync gen number is > the flushed
+ * sync gen number from the root object. */
+ if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+ m_dirty_log_entries.push_back(log_entry);
+ m_bytes_dirty += gen_write_entry->bytes_dirty();
+ } else {
+ gen_write_entry->set_flushed(true);
+ sync_point_entry->writes_flushed++;
+ }
+
+ /* calc m_bytes_allocated & m_bytes_cached */
+ inc_allocated_cached_bytes(log_entry);
+ }
+ }
+ } else {
+      /* This entry is a sync point entry */
+ auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
+ if (sync_point_entry) {
+ if (previous_sync_point_entry) {
+ previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
+ if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+ sync_point_entry->prior_sync_point_flushed = false;
+ ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
+ (0 == previous_sync_point_entry->writes) ||
+ (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
+ } else {
+ sync_point_entry->prior_sync_point_flushed = true;
+ ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
+ ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
+ }
+ } else {
+ /* There are no previous sync points, so we'll consider them flushed */
+ sync_point_entry->prior_sync_point_flushed = true;
+ }
+ previous_sync_point_entry = sync_point_entry;
+        ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
+ }
+ }
+ }
+ if (0 == m_current_sync_gen) {
+ /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
+ * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
+ * point recorded in the log. */
+ m_current_sync_gen = m_flushed_sync_gen;
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+ ceph_assert(m_cache_state);
+ std::lock_guard locker(m_lock);
+ ceph_assert(!m_initialized);
+ ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
+
+ if (!m_cache_state->present) {
+ m_cache_state->host = ceph_get_short_hostname();
+ m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
+ "rbd_persistent_cache_size");
+
+ string path = m_image_ctx.config.template get_val<string>(
+ "rbd_persistent_cache_path");
+ std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
+ m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
+ }
+
+ ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
+ ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
+
+ m_log_pool_name = m_cache_state->path;
+ m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
+ m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
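+  /* Illustrative example with hypothetical constants: if MIN_POOL_SIZE were
+   * 512 MiB and POOL_SIZE_ALIGN were 256 MiB, a configured size of 1000 MiB
+   * becomes max(1000, 512) = 1000 MiB, then p2align(1000 MiB, 256 MiB) =
+   * 768 MiB. */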
+ ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
+ << " (adjusted from " << m_cache_state->size << ")" << dendl;
+
+ if ((!m_cache_state->present) &&
+ (access(m_log_pool_name.c_str(), F_OK) == 0)) {
+ ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
+ << ", While there's no cache in the image metatata." << dendl;
+ if (remove(m_log_pool_name.c_str()) != 0) {
+ lderr(cct) << "failed to remove the pool file " << m_log_pool_name
+ << dendl;
+ on_finish->complete(-errno);
+ return;
+ } else {
+ ldout(cct, 5) << "Removed the existing pool file." << dendl;
+ }
+ } else if ((m_cache_state->present) &&
+ (access(m_log_pool_name.c_str(), F_OK) != 0)) {
+ lderr(cct) << "can't find the existed pool file: " << m_log_pool_name
+ << ". error: " << cpp_strerror(-errno) << dendl;
+ on_finish->complete(-errno);
+ return;
+ }
+
+ bool succeeded = initialize_pool(on_finish, later);
+ if (!succeeded) {
+    return;
+ }
+
+ ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
+ << " log entries, " << m_free_log_entries << " of which are free."
+ << " first_valid=" << m_first_valid_entry
+ << ", first_free=" << m_first_free_entry
+ << ", flushed_sync_gen=" << m_flushed_sync_gen
+ << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
+ if (m_first_free_entry == m_first_valid_entry) {
+ ldout(cct,1) << "write log is empty" << dendl;
+ m_cache_state->empty = true;
+ }
+
+ /* Start the sync point following the last one seen in the
+ * log. Flush the last sync point created during the loading of the
+ * existing log entries. */
+ init_flush_new_sync_point(later);
+ ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
+
+ m_initialized = true;
+ // Start the thread
+ m_thread_pool.start();
+
+ /* Do these after we drop lock */
+ later.add(new LambdaContext([this](int r) {
+ /* Log stats for the first time */
+ periodic_stats();
+ /* Arm periodic stats logging for the first time */
+ std::lock_guard timer_locker(*m_timer_lock);
+ arm_periodic_stats();
+ }));
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::write_image_cache_state(std::unique_lock<ceph::mutex>& locker) {
+ using klass = AbstractWriteLog<I>;
+ Context *ctx = util::create_context_callback<
+ klass, &klass::handle_write_image_cache_state>(this);
+ m_cache_state->write_image_cache_state(locker, ctx);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::update_image_cache_state() {
+ ldout(m_image_ctx.cct, 10) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ m_cache_state->allocated_bytes = m_bytes_allocated;
+ m_cache_state->cached_bytes = m_bytes_cached;
+ m_cache_state->dirty_bytes = m_bytes_dirty;
+ m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated;
+ m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req);
+ m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req);
+ m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) -
+ m_cache_state->hits_full - m_cache_state->hits_partial;
+ m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes);
+ m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) -
+ m_cache_state->hit_bytes;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::handle_write_image_cache_state(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to update image cache state: " << cpp_strerror(r)
+ << dendl;
+ return;
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::init(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+ auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
+ std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
+ std::string("-") + m_image_ctx.name;
+ perf_start(pname);
+
+ ceph_assert(!m_initialized);
+
+ Context *ctx = new LambdaContext(
+ [this, on_finish](int r) {
+ if (r >= 0) {
+ std::unique_lock locker(m_lock);
+ update_image_cache_state();
+ m_cache_state->write_image_cache_state(locker, on_finish);
+ } else {
+ on_finish->complete(r);
+ }
+ });
+
+ DeferredContexts later;
+ pwl_init(ctx, later);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::shut_down(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
+
+ Context *ctx = new LambdaContext(
+ [this, on_finish](int r) {
+ if (m_perfcounter) {
+ perf_stop();
+ }
+ ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
+ Context *next_ctx = override_ctx(r, ctx);
+ periodic_stats();
+
+ std::unique_lock locker(m_lock);
+ check_image_cache_state_clean();
+ m_wake_up_enabled = false;
+ m_log_entries.clear();
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ remove_pool_file();
+ update_image_cache_state();
+ m_cache_state->write_image_cache_state(locker, next_ctx);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
+ // Wait for in progress IOs to complete
+ next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
+ m_async_op_tracker.wait_for_ops(next_ctx);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ Context *next_ctx = override_ctx(r, ctx);
+ {
+ /* Sync with process_writeback_dirty_entries() */
+ RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
+ m_shutting_down = true;
+ /* Flush all writes to OSDs (unless disabled) and wait for all
+ in-progress flush writes to complete */
+ ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
+ periodic_stats();
+ }
+ flush_dirty_entries(next_ctx);
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
+ m_work_queue.queue(ctx, r);
+ });
+ /* Complete all in-flight writes before shutting down */
+ ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
+ internal_flush(false, ctx);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::read(Extents&& image_extents,
+ ceph::bufferlist* bl,
+ int fadvise_flags, Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ utime_t now = ceph_clock_now();
+
+ on_finish = new LambdaContext(
+ [this, on_finish](int r) {
+ m_async_op_tracker.finish_op();
+ on_finish->complete(r);
+ });
+ C_ReadRequest *read_ctx = m_builder->create_read_request(
+ cct, now, m_perfcounter, bl, on_finish);
+ ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
+ << "image_extents=" << image_extents
+ << ", bl=" << bl
+ << ", on_finish=" << on_finish << dendl;
+
+ ceph_assert(m_initialized);
+ bl->clear();
+ m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
+
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
+ std::vector<bufferlist*> bls_to_read;
+
+ m_async_op_tracker.start_op();
+ Context *ctx = new LambdaContext(
+ [this, read_ctx, fadvise_flags](int r) {
+ if (read_ctx->miss_extents.empty()) {
+ /* All of this read comes from RWL */
+ read_ctx->complete(0);
+ } else {
+ /* Pass the read misses on to the layer below RWL */
+ m_image_writeback.aio_read(
+ std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
+ fadvise_flags, read_ctx);
+ }
+ });
+
+ /*
+ * The strategy here is to look up all the WriteLogMapEntries that overlap
+ * this read, and iterate through those to separate this read into hits and
+ * misses. A new Extents object is produced here with Extents for each miss
+ * region. The miss Extents is then passed on to the read cache below RWL. We
+ * also produce an ImageExtentBufs for all the extents (hit or miss) in this
+ * read. When the read from the lower cache layer completes, we iterate
+ * through the ImageExtentBufs and insert buffers for each cache hit at the
+ * appropriate spot in the bufferlist returned from below for the miss
+ * read. The buffers we insert here refer directly to regions of various
+ * write log entry data buffers.
+ *
+ * Locking: These buffer objects hold a reference on the write log entries
+ * they refer to. Log entries can't be retired until there are no references.
+ * The GenericWriteLogEntry references are released by the buffer destructor.
+ */
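+  /* Illustrative example with hypothetical offsets: a read of extent
+   * (0, 4096) over a single log entry covering (1024, 1024) produces a miss
+   * extent (0, 1024), a hit extent (1024, 1024) served from the log entry's
+   * buffer, and a trailing miss extent (2048, 2048). */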
+ for (auto &extent : image_extents) {
+ uint64_t extent_offset = 0;
+ RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
+ WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
+ block_extent(extent));
+ for (auto &map_entry : map_entries) {
+ Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
+ /* If this map entry starts after the current image extent offset ... */
+ if (entry_image_extent.first > extent.first + extent_offset) {
+ /* ... add range before map_entry to miss extents */
+ uint64_t miss_extent_start = extent.first + extent_offset;
+ uint64_t miss_extent_length = entry_image_extent.first -
+ miss_extent_start;
+ Extent miss_extent(miss_extent_start, miss_extent_length);
+ read_ctx->miss_extents.push_back(miss_extent);
+ /* Add miss range to read extents */
+ auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
+ read_ctx->read_extents.push_back(miss_extent_buf);
+ extent_offset += miss_extent_length;
+ }
+ ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
+ uint64_t entry_offset = 0;
+ /* If this map entry starts before the current image extent offset ... */
+ if (entry_image_extent.first < extent.first + extent_offset) {
+ /* ... compute offset into log entry for this read extent */
+ entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
+ }
+ /* This read hit ends at the end of the extent or the end of the log
+ entry, whichever is less. */
+ uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
+ extent.second - extent_offset);
+ Extent hit_extent(entry_image_extent.first, entry_hit_length);
+ if (0 == map_entry.log_entry->write_bytes() &&
+ 0 < map_entry.log_entry->bytes_dirty()) {
+ /* discard log entry */
+ ldout(cct, 20) << "discard log entry" << dendl;
+ auto discard_entry = map_entry.log_entry;
+ ldout(cct, 20) << "read hit on discard entry: log_entry="
+ << *discard_entry
+ << dendl;
+ /* Discards read as zero, so we'll construct a bufferlist of zeros */
+ bufferlist zero_bl;
+ zero_bl.append_zero(entry_hit_length);
+ /* Add hit extent to read extents */
+ auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
+ hit_extent, zero_bl);
+ read_ctx->read_extents.push_back(hit_extent_buf);
+ } else {
+ ldout(cct, 20) << "write or writesame log entry" << dendl;
+ /* write and writesame log entry */
+ /* Offset of the map entry into the log entry's buffer */
+ uint64_t map_entry_buffer_offset = entry_image_extent.first -
+ map_entry.log_entry->ram_entry.image_offset_bytes;
+ /* Offset into the log entry buffer of this read hit */
+ uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
+ /* Create buffer object referring to pmem pool for this read hit */
+ collect_read_extents(
+ read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
+ entry_hit_length, hit_extent, read_ctx);
+ }
+ /* Exclude RWL hit range from buffer and extent */
+ extent_offset += entry_hit_length;
+ ldout(cct, 20) << map_entry << dendl;
+ }
+ /* If the last map entry didn't consume the entire image extent ... */
+ if (extent.second > extent_offset) {
+ /* ... add the rest of this extent to miss extents */
+ uint64_t miss_extent_start = extent.first + extent_offset;
+ uint64_t miss_extent_length = extent.second - extent_offset;
+ Extent miss_extent(miss_extent_start, miss_extent_length);
+ read_ctx->miss_extents.push_back(miss_extent);
+ /* Add miss range to read extents */
+ auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
+ read_ctx->read_extents.push_back(miss_extent_buf);
+ extent_offset += miss_extent_length;
+ }
+ }
+
+ ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
+ << ", miss_bl=" << read_ctx->miss_bl << dendl;
+
+ complete_read(log_entries_to_read, bls_to_read, ctx);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::write(Extents &&image_extents,
+ bufferlist&& bl,
+ int fadvise_flags,
+ Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << "aio_write" << dendl;
+
+ utime_t now = ceph_clock_now();
+ m_perfcounter->inc(l_librbd_pwl_wr_req, 1);
+
+ ceph_assert(m_initialized);
+
+ /* Split image extents larger than 1M. This isn't strictly necessary but
+ * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
+ * We plan to manage pmem space and allocation by ourselves in the future.
+ */
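+  /* Illustrative example: with a 1 MiB max extent size, a 2.5 MiB write at
+   * offset 0 is split below into extents (0, 1M), (1M, 1M) and (2M, 0.5M). */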
+ Extents split_image_extents;
+ uint64_t max_extent_size = get_max_extent();
+ if (max_extent_size != 0) {
+ for (auto extent : image_extents) {
+ if (extent.second > max_extent_size) {
+ uint64_t off = extent.first;
+ uint64_t extent_bytes = extent.second;
+ for (int i = 0; extent_bytes != 0; ++i) {
+ Extent _ext;
+ _ext.first = off + i * max_extent_size;
+ _ext.second = std::min(max_extent_size, extent_bytes);
+          extent_bytes = extent_bytes - _ext.second;
+ split_image_extents.emplace_back(_ext);
+ }
+ } else {
+ split_image_extents.emplace_back(extent);
+ }
+ }
+ } else {
+ split_image_extents = image_extents;
+ }
+
+ C_WriteRequestT *write_req =
+ m_builder->create_write_request(*this, now, std::move(split_image_extents),
+ std::move(bl), fadvise_flags, m_lock,
+ m_perfcounter, on_finish);
+ m_perfcounter->inc(l_librbd_pwl_wr_bytes,
+ write_req->image_extents_summary.total_bytes);
+
+ /* The lambda below will be called when the block guard for all
+ * blocks affected by this write is obtained */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this,
+ write_req](GuardedRequestFunctionContext &guard_ctx) {
+ write_req->blockguard_acquired(guard_ctx);
+ alloc_and_dispatch_io_req(write_req);
+ });
+
+ detain_guarded_request(write_req, guarded_ctx, false);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
+ uint32_t discard_granularity_bytes,
+ Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << dendl;
+
+ utime_t now = ceph_clock_now();
+ m_perfcounter->inc(l_librbd_pwl_discard, 1);
+ Extents discard_extents = {{offset, length}};
+ m_discard_granularity_bytes = discard_granularity_bytes;
+
+ ceph_assert(m_initialized);
+
+ auto *discard_req =
+ new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
+ m_lock, m_perfcounter, on_finish);
+
+ /* The lambda below will be called when the block guard for all
+ * blocks affected by this write is obtained */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
+ discard_req->blockguard_acquired(guard_ctx);
+ alloc_and_dispatch_io_req(discard_req);
+ });
+
+ detain_guarded_request(discard_req, guarded_ctx, false);
+}
+
+/**
+ * Aio_flush completes when all previously completed writes are
+ * flushed to persistent cache. We make a best-effort attempt to also
+ * defer until all in-progress writes complete, but we may not know
+ * about all of the writes the application considers in-progress yet,
+ * due to uncertainty in the IO submission workq (multiple WQ threads
+ * may allow out-of-order submission).
+ *
+ * This flush operation will not wait for writes deferred for overlap
+ * in the block guard.
+ */
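+/* Illustrative example: if writes W1 and W2 have already completed to the
+ * caller when an aio_flush arrives, the flush will not complete until W1 and
+ * W2 are persisted in the log; a write W3 still sitting in the submission
+ * workqueue may or may not be covered by this flush. */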
+template <typename I>
+void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
+
+ if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
+ io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
+ internal_flush(false, on_finish);
+ return;
+ }
+ m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);
+
+ /* May be called even if initialization fails */
+ if (!m_initialized) {
+    ldout(cct, 5) << "never initialized" << dendl;
+ /* Deadlock if completed here */
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ {
+ std::shared_lock image_locker(m_image_ctx.image_lock);
+ if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+ on_finish->complete(-EROFS);
+ return;
+ }
+ }
+
+ auto flush_req = make_flush_req(on_finish);
+
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
+ ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
+ ceph_assert(guard_ctx.cell);
+ flush_req->detained = guard_ctx.state.detained;
+ /* We don't call flush_req->set_cell(), because the block guard will be released here */
+ {
+ DeferredContexts post_unlock; /* Do these when the lock below is released */
+ std::lock_guard locker(m_lock);
+
+ if (!m_persist_on_flush && m_persist_on_write_until_flush) {
+ m_persist_on_flush = true;
+ ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
+ }
+
+ /*
+ * Create a new sync point if there have been writes since the last
+ * one.
+ *
+ * We do not flush the caches below the RWL here.
+ */
+ flush_new_sync_point_if_needed(flush_req, post_unlock);
+ }
+
+ release_guarded_request(guard_ctx.cell);
+ });
+
+ detain_guarded_request(flush_req, guarded_ctx, true);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
+ bufferlist&& bl, int fadvise_flags,
+ Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << "aio_writesame" << dendl;
+
+ utime_t now = ceph_clock_now();
+ Extents ws_extents = {{offset, length}};
+ m_perfcounter->inc(l_librbd_pwl_ws, 1);
+ ceph_assert(m_initialized);
+
+ /* A write same request is also a write request. The key difference is the
+ * write same data buffer is shorter than the extent of the request. The full
+ * extent will be used in the block guard, and appear in
+ * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
+ * as long as the length of the bl here, which is the pattern that's repeated
+ * in the image for the entire length of this WS. Read hits and flushing of
+   * write sames are different from normal writes. */
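+  /* Illustrative example: a 512-byte pattern written as a write same over a
+   * 1 MiB extent stores only the 512-byte bl, while the full 1 MiB extent is
+   * detained in the block guard and mapped in m_blocks_to_log_entries. */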
+ C_WriteSameRequestT *ws_req =
+ m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
+ fadvise_flags, m_lock, m_perfcounter, on_finish);
+ m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);
+
+ /* The lambda below will be called when the block guard for all
+ * blocks affected by this write is obtained */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
+ ws_req->blockguard_acquired(guard_ctx);
+ alloc_and_dispatch_io_req(ws_req);
+ });
+
+ detain_guarded_request(ws_req, guarded_ctx, false);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
+ bufferlist&& cmp_bl,
+ bufferlist&& bl,
+ uint64_t *mismatch_offset,
+ int fadvise_flags,
+ Context *on_finish) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+
+ utime_t now = ceph_clock_now();
+ m_perfcounter->inc(l_librbd_pwl_cmp, 1);
+ ceph_assert(m_initialized);
+
+ /* A compare and write request is also a write request. We only allocate
+ * resources and dispatch this write request if the compare phase
+ * succeeds. */
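+  /* Outcome summary: if the data read back equals cmp_bl, the request is
+   * re-dispatched as a plain write; otherwise it completes with -EILSEQ and
+   * *mismatch_offset set to the first differing byte, without allocating any
+   * log resources. */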
+ C_WriteRequestT *cw_req =
+ m_builder->create_comp_and_write_request(
+ *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
+ mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
+ m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);
+
+ /* The lambda below will be called when the block guard for all
+ * blocks affected by this write is obtained */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
+ cw_req->blockguard_acquired(guard_ctx);
+
+ auto read_complete_ctx = new LambdaContext(
+ [this, cw_req](int r) {
+ ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
+ << "cw_req=" << cw_req << dendl;
+
+ /* Compare read_bl to cmp_bl to determine if this will produce a write */
+ ceph_assert(cw_req->read_bl.length() <= cw_req->cmp_bl.length());
+ ceph_assert(cw_req->read_bl.length() == cw_req->image_extents_summary.total_bytes);
+ bufferlist sub_cmp_bl;
+ sub_cmp_bl.substr_of(cw_req->cmp_bl, 0, cw_req->read_bl.length());
+ if (sub_cmp_bl.contents_equal(cw_req->read_bl)) {
+ /* Compare phase succeeds. Begin write */
+ ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
+ cw_req->compare_succeeded = true;
+ *cw_req->mismatch_offset = 0;
+ /* Continue with this request as a write. Blockguard release and
+ * user request completion handled as if this were a plain
+ * write. */
+ alloc_and_dispatch_io_req(cw_req);
+ } else {
+            /* Compare phase fails. Compare-and-write ends now. */
+ ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
+ /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
+ uint64_t bl_index = 0;
+ for (bl_index = 0; bl_index < sub_cmp_bl.length(); bl_index++) {
+ if (sub_cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
+ ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
+ break;
+ }
+ }
+ cw_req->compare_succeeded = false;
+ *cw_req->mismatch_offset = bl_index;
+ cw_req->complete_user_request(-EILSEQ);
+ cw_req->release_cell();
+ cw_req->complete(0);
+ }
+ });
+
+ /* Read phase of comp-and-write must read through RWL */
+ Extents image_extents_copy = cw_req->image_extents;
+ read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
+ });
+
+ detain_guarded_request(cw_req, guarded_ctx, false);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::flush(Context *on_finish) {
+ internal_flush(false, on_finish);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::invalidate(Context *on_finish) {
+ internal_flush(true, on_finish);
+}
+
+template <typename I>
+CephContext *AbstractWriteLog<I>::get_context() {
+ return m_image_ctx.cct;
+}
+
+template <typename I>
+BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
+{
+ CephContext *cct = m_image_ctx.cct;
+ BlockGuardCell *cell;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+ ldout(cct, 20) << dendl;
+
+ int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
+ ceph_assert(r>=0);
+ if (r > 0) {
+ ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
+ << "req=" << req << dendl;
+ return nullptr;
+ }
+
+ ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
+ return cell;
+}
+
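+/* Detain a request while honoring barriers: if a barrier (e.g. an aio_flush)
+ * is in progress, the request is queued on m_awaiting_barrier instead. A
+ * request flagged as a barrier becomes the new in-progress barrier, and
+ * m_barrier_cell records its cell if the guard is acquired immediately. */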
+template <typename I>
+BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
+ GuardedRequest &req)
+{
+ BlockGuardCell *cell = nullptr;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+ ldout(m_image_ctx.cct, 20) << dendl;
+
+ if (m_barrier_in_progress) {
+ req.guard_ctx->state.queued = true;
+ m_awaiting_barrier.push_back(req);
+ } else {
+ bool barrier = req.guard_ctx->state.barrier;
+ if (barrier) {
+ m_barrier_in_progress = true;
+ req.guard_ctx->state.current_barrier = true;
+ }
+ cell = detain_guarded_request_helper(req);
+ if (barrier) {
+ /* Only non-null if the barrier acquires the guard now */
+ m_barrier_cell = cell;
+ }
+ }
+
+ return cell;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::detain_guarded_request(
+ C_BlockIORequestT *request,
+ GuardedRequestFunctionContext *guarded_ctx,
+ bool is_barrier)
+{
+ BlockExtent extent;
+ if (request) {
+ extent = request->image_extents_summary.block_extent();
+ } else {
+ extent = block_extent(whole_volume_extent());
+ }
+ auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
+ BlockGuardCell *cell = nullptr;
+
+ ldout(m_image_ctx.cct, 20) << dendl;
+ {
+ std::lock_guard locker(m_blockguard_lock);
+ cell = detain_guarded_request_barrier_helper(req);
+ }
+ if (cell) {
+ req.guard_ctx->cell = cell;
+ req.guard_ctx->complete(0);
+ }
+}
+
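+/* Release a block guard cell. Requests that were blocked behind it are
+ * re-detained, and any that now hold a cell are queued for dispatch. If the
+ * released cell was the current barrier, requests waiting on that barrier are
+ * drained until the next barrier request (if any) takes over. */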
+template <typename I>
+void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
+{
+ CephContext *cct = m_image_ctx.cct;
+ WriteLogGuard::BlockOperations block_reqs;
+ ldout(cct, 20) << "released_cell=" << released_cell << dendl;
+
+ {
+ std::lock_guard locker(m_blockguard_lock);
+ m_write_log_guard.release(released_cell, &block_reqs);
+
+ for (auto &req : block_reqs) {
+ req.guard_ctx->state.detained = true;
+ BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
+ if (detained_cell) {
+ if (req.guard_ctx->state.current_barrier) {
+ /* The current barrier is acquiring the block guard, so now we know its cell */
+ m_barrier_cell = detained_cell;
+ /* detained_cell could be == released_cell here */
+ ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
+ }
+ req.guard_ctx->cell = detained_cell;
+ m_work_queue.queue(req.guard_ctx);
+ }
+ }
+
+ if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
+ ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
+ /* The released cell is the current barrier request */
+ m_barrier_in_progress = false;
+ m_barrier_cell = nullptr;
+ /* Move waiting requests into the blockguard. Stop if there's another barrier */
+ while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
+ auto &req = m_awaiting_barrier.front();
+ ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
+ BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
+ if (detained_cell) {
+ req.guard_ctx->cell = detained_cell;
+ m_work_queue.queue(req.guard_ctx);
+ }
+ m_awaiting_barrier.pop_front();
+ }
+ }
+ }
+
+ ldout(cct, 20) << "exit" << dendl;
+}
+
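+/* Move a batch of scheduled ops (up to MAX_ALLOC_PER_TRANSACTION for RWL,
+ * MAX_WRITES_PER_SYNC_POINT otherwise) from m_ops_to_append onto 'ops'. Only
+ * one thread appends at a time (m_appending); 'ops_remain' reports whether
+ * the caller should check for more scheduled ops before finishing. */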
+template <typename I>
+void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
+ bool &appending, bool isRWL)
+{
+ const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
+ : MAX_WRITES_PER_SYNC_POINT;
+ {
+ std::lock_guard locker(m_lock);
+ if (!appending && m_appending) {
+ /* Another thread is appending */
+ ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
+ return;
+ }
+ if (m_ops_to_append.size()) {
+ appending = true;
+ m_appending = true;
+ auto last_in_batch = m_ops_to_append.begin();
+ unsigned int ops_to_append = m_ops_to_append.size();
+ if (ops_to_append > OPS_APPENDED) {
+ ops_to_append = OPS_APPENDED;
+ }
+ std::advance(last_in_batch, ops_to_append);
+ ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
+ ops_remain = true; /* Always check again before leaving */
+ ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
+ << m_ops_to_append.size() << dendl;
+ } else if (isRWL) {
+ ops_remain = false;
+ if (appending) {
+ appending = false;
+ m_appending = false;
+ }
+ }
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
+{
+ GenericLogOperations to_append(ops.begin(), ops.end());
+
+ schedule_append_ops(to_append, req);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
+{
+ GenericLogOperations to_append { op };
+
+ schedule_append_ops(to_append, req);
+}
+
+/*
+ * Complete a set of write ops with the result of append_op_entries.
+ */
+template <typename I>
+void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
+ const int result)
+{
+ GenericLogEntries dirty_entries;
+ int published_reserves = 0;
+ ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
+ for (auto &op : ops) {
+ utime_t now = ceph_clock_now();
+ auto log_entry = op->get_log_entry();
+ log_entry->completed = true;
+ if (op->is_writing_op()) {
+ op->mark_log_entry_completed();
+ dirty_entries.push_back(log_entry);
+ }
+ if (log_entry->is_write_entry()) {
+ release_ram(log_entry);
+ }
+ if (op->reserved_allocated()) {
+ published_reserves++;
+ }
+ {
+ std::lock_guard locker(m_lock);
+ m_unpublished_reserves -= published_reserves;
+ m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
+ }
+ op->complete(result);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
+ op->log_append_start_time - op->dispatch_time);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
+ m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
+ utime_t(now - op->dispatch_time).to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
+ m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
+ m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
+ }
+ // New entries may be flushable
+ {
+ std::lock_guard locker(m_lock);
+ wake_up();
+ }
+}
+
+/**
+ * Dispatch as many deferred writes as possible
+ */
+template <typename I>
+void AbstractWriteLog<I>::dispatch_deferred_writes(void)
+{
+ C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
+ C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
+ bool allocated = false; /* front_req allocate succeeded */
+ bool cleared_dispatching_flag = false;
+
+ /* If we can't become the dispatcher, we'll exit */
+ {
+ std::lock_guard locker(m_lock);
+ if (m_dispatching_deferred_ops ||
+ !m_deferred_ios.size()) {
+ return;
+ }
+ m_dispatching_deferred_ops = true;
+ }
+
+ /* There are ops to dispatch, and this should be the only thread dispatching them */
+ {
+ std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
+ do {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dispatching_deferred_ops);
+ if (allocated) {
+ /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
+ * have succeeded, and we'll need to pop it off the deferred ops list
+ * here. */
+ ceph_assert(front_req);
+ ceph_assert(!allocated_req);
+ m_deferred_ios.pop_front();
+ allocated_req = front_req;
+ front_req = nullptr;
+ allocated = false;
+ }
+ ceph_assert(!allocated);
+ if (!allocated && front_req) {
+ /* front_req->alloc_resources() failed on the last iteration.
+ * We'll stop dispatching. */
+ wake_up();
+ front_req = nullptr;
+ ceph_assert(!cleared_dispatching_flag);
+ m_dispatching_deferred_ops = false;
+ cleared_dispatching_flag = true;
+ } else {
+ ceph_assert(!front_req);
+ if (m_deferred_ios.size()) {
+ /* New allocation candidate */
+ front_req = m_deferred_ios.front();
+ } else {
+ ceph_assert(!cleared_dispatching_flag);
+ m_dispatching_deferred_ops = false;
+ cleared_dispatching_flag = true;
+ }
+ }
+ }
+ /* Try allocating for front_req before we decide what to do with allocated_req
+ * (if any) */
+ if (front_req) {
+ ceph_assert(!cleared_dispatching_flag);
+ allocated = front_req->alloc_resources();
+ }
+ if (allocated_req && front_req && allocated) {
+ /* Push dispatch of the first allocated req to a wq */
+ m_work_queue.queue(new LambdaContext(
+ [allocated_req](int r) {
+ allocated_req->dispatch();
+ }), 0);
+ allocated_req = nullptr;
+ }
+ ceph_assert(!(allocated_req && front_req && allocated));
+
+ /* Continue while we're still considering the front of the deferred ops list */
+ } while (front_req);
+ ceph_assert(!allocated);
+ }
+ ceph_assert(cleared_dispatching_flag);
+
+ /* If any deferred requests were allocated, the last one will still be in allocated_req */
+ if (allocated_req) {
+ allocated_req->dispatch();
+ }
+}
+
+/**
+ * Returns the lanes used by this write, and attempts to dispatch the next
+ * deferred write
+ */
+template <typename I>
+void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
+{
+ {
+ std::lock_guard locker(m_lock);
+ m_free_lanes += req->image_extents.size();
+ }
+ dispatch_deferred_writes();
+}
+
+/**
+ * Attempts to allocate log resources for a write. Write is dispatched if
+ * resources are available, or queued if they aren't.
+ */
+template <typename I>
+void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
+{
+ bool dispatch_here = false;
+
+ {
+ /* If there are already deferred writes, queue behind them for resources */
+ {
+ std::lock_guard locker(m_lock);
+ dispatch_here = m_deferred_ios.empty();
+      // Only a flush request has total_bytes set to the max uint64 (the
+      // whole-volume extent); dispatch an internal flush immediately
+ if (req->image_extents_summary.total_bytes ==
+ std::numeric_limits<uint64_t>::max() &&
+ static_cast<C_FlushRequestT *>(req)->internal == true) {
+ dispatch_here = true;
+ }
+ }
+ if (dispatch_here) {
+ dispatch_here = req->alloc_resources();
+ }
+ if (dispatch_here) {
+ ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
+ req->dispatch();
+ } else {
+ req->deferred();
+ {
+ std::lock_guard locker(m_lock);
+ m_deferred_ios.push_back(req);
+ }
+ ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
+ dispatch_deferred_writes();
+ }
+ }
+}
+
+template <typename I>
+bool AbstractWriteLog<I>::check_allocation(
+ C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
+ uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
+ uint32_t num_unpublished_reserves) {
+ bool alloc_succeeds = true;
+ bool no_space = false;
+ {
+ std::lock_guard locker(m_lock);
+ if (m_free_lanes < num_lanes) {
+ ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
+ << num_lanes
+ << ", have " << m_free_lanes << ") "
+ << *req << dendl;
+ alloc_succeeds = false;
+ /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
+ }
+ if (m_free_log_entries < num_log_entries) {
+ ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
+ << num_log_entries
+ << ", have " << m_free_log_entries << ") "
+ << *req << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries must be retired */
+ }
+ /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
+ if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
+ ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
+ << m_bytes_allocated_cap
+ << ", allocated=" << m_bytes_allocated
+ << ") in write [" << *req << "]" << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries must be retired */
+ }
+ }
+
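+ /* Reserve data buffer space for this request; reserve_cache() updates
+ * alloc_succeeds and no_space to reflect the outcome */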
+ if (alloc_succeeds) {
+ reserve_cache(req, alloc_succeeds, no_space);
+ }
+
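+ /* Re-check and commit the reservation under m_lock; the free counts may
+ * have changed while the lock was not held */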
+ if (alloc_succeeds) {
+ std::unique_lock locker(m_lock);
+ /* We need one free log entry per extent (each is a separate entry), and
+ * one free "lane" for remote replication. */
+ if ((m_free_lanes >= num_lanes) &&
+ (m_free_log_entries >= num_log_entries) &&
+ (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
+ m_free_lanes -= num_lanes;
+ m_free_log_entries -= num_log_entries;
+ m_unpublished_reserves += num_unpublished_reserves;
+ m_bytes_allocated += bytes_allocated;
+ m_bytes_cached += bytes_cached;
+ m_bytes_dirty += bytes_dirtied;
+ if (m_cache_state->clean && bytes_dirtied > 0) {
+ m_cache_state->clean = false;
+ update_image_cache_state();
+ write_image_cache_state(locker);
+ }
+ } else {
+ alloc_succeeds = false;
+ }
+ }
+
+ if (!alloc_succeeds && no_space) {
+ /* Expedite flushing and/or retiring */
+ std::lock_guard locker(m_lock);
+ m_alloc_failed_since_retire = true;
+ m_last_alloc_fail = ceph_clock_now();
+ }
+
+ return alloc_succeeds;
+}
+
+template <typename I>
+C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
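+ /* A flush request carries no data and covers the whole volume extent */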
+ utime_t flush_begins = ceph_clock_now();
+ bufferlist bl;
+ auto *flush_req =
+ new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
+ std::move(bl), 0, m_lock, m_perfcounter, on_finish);
+
+ return flush_req;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::wake_up() {
+ CephContext *cct = m_image_ctx.cct;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (!m_wake_up_enabled) {
+ // wake_up is disabled during shutdown after flushing completes
+ ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
+ return;
+ }
+
+ if (m_wake_up_requested && m_wake_up_scheduled) {
+ return;
+ }
+
+ ldout(cct, 20) << dendl;
+
+ /* Wake-up can be requested while it's already scheduled */
+ m_wake_up_requested = true;
+
+ /* Wake-up cannot be scheduled if it's already scheduled */
+ if (m_wake_up_scheduled) {
+ return;
+ }
+ m_wake_up_scheduled = true;
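+ /* Track the queued work with the async op tracker so shutdown can wait for it */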
+ m_async_process_work++;
+ m_async_op_tracker.start_op();
+ m_work_queue.queue(new LambdaContext(
+ [this](int r) {
+ process_work();
+ m_async_op_tracker.finish_op();
+ m_async_process_work--;
+ }), 0);
+}
+
+template <typename I>
+bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << "" << dendl;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ if (m_invalidating) {
+ return true;
+ }
+
+ /* For OWB we can flush entries with the same sync gen number (write between
+ * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
+ * its sync gen number is <= the lowest sync gen number carried by all the
+ * entries currently flushing.
+ *
+ * If the entry considered here bears a sync gen number lower than a
+ * previously flushed entry, the application had to have submitted the write
+ * bearing the higher gen number before the write with the lower gen number
+ * completed. So, flushing these concurrently is OK.
+ *
+ * If the entry considered here bears a sync gen number higher than a
+ * currently flushing entry, the write with the lower gen number may have
+ * completed to the application before the write with the higher sync gen
+ * number was submitted, and the application may rely on that completion
+ * order for volume consistency. In this case the entry will not be
+ * considered flushable until all the entries bearing lower sync gen numbers
+ * finish flushing.
+ */
+
+ if (m_flush_ops_in_flight &&
+ (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
+ return false;
+ }
+
+ return (log_entry->can_writeback() &&
+ (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
+ (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
+}
+
+template <typename I>
+void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+ GuardedRequestFunctionContext *guarded_ctx) {
+ ldout(m_image_ctx.cct, 20) << dendl;
+
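+ /* Sync point entries guard the whole volume; other entries guard only
+ * their own block extent */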
+ BlockExtent extent;
+ if (log_entry->is_sync_point()) {
+ extent = block_extent(whole_volume_extent());
+ } else {
+ extent = log_entry->ram_entry.block_extent();
+ }
+
+ auto req = GuardedRequest(extent, guarded_ctx, false);
+ BlockGuardCell *cell = nullptr;
+
+ {
+ std::lock_guard locker(m_flush_guard_lock);
+ m_flush_guard.detain(req.block_extent, &req, &cell);
+ }
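+ /* A non-null cell means no overlapping flush is in flight; run the guarded
+ * context now. Otherwise it runs when the blocking cell is released. */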
+ if (cell) {
+ req.guard_ctx->cell = cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+}
+
+template <typename I>
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+ bool invalidating) {
+ ldout(m_image_ctx.cct, 20) << "" << dendl;
+
+ /* Flush write completion action */
+ utime_t writeback_start_time = ceph_clock_now();
+ Context *ctx = new LambdaContext(
+ [this, log_entry, writeback_start_time, invalidating](int r) {
+ utime_t writeback_comp_time = ceph_clock_now();
+ m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
+ writeback_comp_time - writeback_start_time);
+ {
+ std::lock_guard locker(m_lock);
+ if (r < 0) {
+ lderr(m_image_ctx.cct) << "failed to flush log entry"
+ << cpp_strerror(r) << dendl;
+ m_dirty_log_entries.push_front(log_entry);
+ } else {
+ ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+ log_entry->set_flushed(true);
+ m_bytes_dirty -= log_entry->bytes_dirty();
+ sync_point_writer_flushed(log_entry->get_sync_point_entry());
+ ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+ << " invalidating=" << invalidating
+ << dendl;
+ }
+ m_flush_ops_in_flight -= 1;
+ m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
+ wake_up();
+ }
+ });
+ /* Flush through lower cache before completing */
+ ctx = new LambdaContext(
+ [this, ctx, log_entry](int r) {
+ {
+
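+ /* Release this entry's flush guard cell and re-dispatch any guarded
+ * requests that were blocked behind it */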
+ WriteLogGuard::BlockOperations block_reqs;
+ BlockGuardCell *detained_cell = nullptr;
+
+ std::lock_guard locker{m_flush_guard_lock};
+ m_flush_guard.release(log_entry->m_cell, &block_reqs);
+
+ for (auto &req : block_reqs) {
+ m_flush_guard.detain(req.block_extent, &req, &detained_cell);
+ if (detained_cell) {
+ req.guard_ctx->cell = detained_cell;
+ m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+ }
+ }
+ }
+
+ if (r < 0) {
+ lderr(m_image_ctx.cct) << "failed to flush log entry"
+ << cpp_strerror(r) << dendl;
+ ctx->complete(r);
+ } else {
+ m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
+ }
+ });
+ return ctx;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::process_writeback_dirty_entries() {
+ CephContext *cct = m_image_ctx.cct;
+ bool all_clean = false;
+ int flushed = 0;
+ bool has_write_entry = false;
+ bool need_update_state = false;
+
+ ldout(cct, 20) << "Look for dirty entries" << dendl;
+ {
+ DeferredContexts post_unlock;
+ GenericLogEntries entries_to_flush;
+
+ std::shared_lock entry_reader_locker(m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
+ while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
+ if (m_shutting_down) {
+ ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
+ /* Do flush complete only when all flush ops are finished */
+ all_clean = !m_flush_ops_in_flight;
+ break;
+ }
+ if (m_dirty_log_entries.empty()) {
+ ldout(cct, 20) << "Nothing new to flush" << dendl;
+ /* Do flush complete only when all flush ops are finished */
+ all_clean = !m_flush_ops_in_flight;
+ if (!m_cache_state->clean && all_clean) {
+ m_cache_state->clean = true;
+ update_image_cache_state();
+ need_update_state = true;
+ }
+ break;
+ }
+
+ auto candidate = m_dirty_log_entries.front();
+ bool flushable = can_flush_entry(candidate);
+ if (flushable) {
+ entries_to_flush.push_back(candidate);
+ flushed++;
+ if (!has_write_entry)
+ has_write_entry = candidate->is_write_entry();
+ m_dirty_log_entries.pop_front();
+
+ // Account for this candidate in the in-flight flush counters here,
+ // while m_lock is still held
+ {
+ if (!m_flush_ops_in_flight ||
+ (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+ m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
+ }
+ m_flush_ops_in_flight += 1;
+ /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+ m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
+ }
+ } else {
+ ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
+ break;
+ }
+ }
+
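+ /* Build writeback contexts for the selected entries; they are issued via
+ * post_unlock once the locks above are released */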
+ construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
+ }
+ if (need_update_state) {
+ std::unique_lock locker(m_lock);
+ write_image_cache_state(locker);
+ }
+
+ if (all_clean) {
+ /* All flushing complete, drain outside lock */
+ Contexts flush_contexts;
+ {
+ std::lock_guard locker(m_lock);
+ flush_contexts.swap(m_flush_complete_contexts);
+ }
+ finish_contexts(m_image_ctx.cct, flush_contexts, 0);
+ }
+}
+
+/* Returns true if the specified SyncPointLogEntry is considered flushed, and
+ * the log will be updated to reflect this. */
+template <typename I>
+bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(log_entry);
+
+ if ((log_entry->writes_flushed == log_entry->writes) &&
+ log_entry->completed && log_entry->prior_sync_point_flushed &&
+ log_entry->next_sync_point_entry) {
+ ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
+ << *log_entry << dendl;
+ log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
+ /* Don't move the flushed sync gen num backwards. */
+ if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
+ m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
+ }
+ m_async_op_tracker.start_op();
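+ /* Check the next sync point off this thread; if it is not itself fully
+ * flushed yet, persist the advanced flushed sync gen now */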
+ m_work_queue.queue(new LambdaContext(
+ [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
+ bool handled_by_next;
+ {
+ std::lock_guard locker(m_lock);
+ handled_by_next = handle_flushed_sync_point(std::move(next));
+ }
+ if (!handled_by_next) {
+ persist_last_flushed_sync_gen();
+ }
+ m_async_op_tracker.finish_op();
+ }));
+ return true;
+ }
+ return false;
+}
+
+template <typename I>
+void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(log_entry);
+ log_entry->writes_flushed++;
+
+ /* If this entry might be completely flushed, look closer */
+ if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
+ ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
+ << *log_entry << dendl;
+ handle_flushed_sync_point(log_entry);
+ }
+}
+
+/* During initialization, make a new sync point and flush the previous one,
+ * which may or may not exist */
+template <typename I>
+void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(!m_initialized); /* Don't use this after init */
+
+ if (!m_current_sync_point) {
+ /* First sync point since start */
+ new_sync_point(later);
+ } else {
+ flush_new_sync_point(nullptr, later);
+ }
+}
+
+/**
+ * Begin a new sync point
+ */
+template <typename I>
+void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
+ std::shared_ptr<SyncPoint> new_sync_point;
+ ldout(cct, 20) << dendl;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ /* The first time this is called, if this is a newly created log,
+ * this makes the first sync gen number we'll use 1. On the first
+ * call for a re-opened log m_current_sync_gen will be the highest
+ * gen number from all the sync point entries found in the re-opened
+ * log, and this advances to the next sync gen number. */
+ ++m_current_sync_gen;
+
+ new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
+ m_current_sync_point = new_sync_point;
+
+ /* If this log has been re-opened, old_sync_point will initially be
+ * nullptr, but m_current_sync_gen may not be zero. */
+ if (old_sync_point) {
+ new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
+ m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
+ old_sync_point->log_entry->writes,
+ old_sync_point->log_entry->bytes);
+ /* This sync point will acquire no more sub-ops. Activation needs
+ * to acquire m_lock, so defer it to later. */
+ later.add(new LambdaContext(
+ [old_sync_point](int r) {
+ old_sync_point->prior_persisted_gather_activate();
+ }));
+ }
+
+ new_sync_point->prior_persisted_gather_set_finisher();
+
+ if (old_sync_point) {
+ ldout(cct,6) << "new sync point = [" << *m_current_sync_point
+ << "], prior = [" << *old_sync_point << "]" << dendl;
+ } else {
+ ldout(cct,6) << "first sync point = [" << *m_current_sync_point
+ << "]" << dendl;
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
+ DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
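+ /* If no flush request was supplied, create an internal one so the current
+ * sync point is still appended and persisted */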
+ if (!flush_req) {
+ m_async_null_flush_finish++;
+ m_async_op_tracker.start_op();
+ Context *flush_ctx = new LambdaContext([this](int r) {
+ m_async_null_flush_finish--;
+ m_async_op_tracker.finish_op();
+ });
+ flush_req = make_flush_req(flush_ctx);
+ flush_req->internal = true;
+ }
+
+ /* Add a new sync point. */
+ new_sync_point(later);
+ std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
+ ceph_assert(to_append);
+
+ /* This flush request will append/persist the (now) previous sync point */
+ flush_req->to_append = to_append;
+
+ /* When the m_sync_point_persist Gather completes this sync point can be
+ * appended. The only sub for this Gather is the finisher Context for
+ * m_prior_log_entries_persisted, which records the result of the Gather in
+ * the sync point, and completes. TODO: Do we still need both of these
+ * Gathers? */
+ Context * ctx = new LambdaContext([this, flush_req](int r) {
+ ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
+ << " sync point =" << flush_req->to_append
+ << ". Ready to persist." << dendl;
+ alloc_and_dispatch_io_req(flush_req);
+ });
+ to_append->persist_gather_set_finisher(ctx);
+
+ /* The m_sync_point_persist Gather has all the subs it will ever have, and
+ * now has its finisher. If the sub is already complete, activation will
+ * complete the Gather. The finisher will acquire m_lock, so we'll activate
+ * this when we release m_lock.*/
+ later.add(new LambdaContext([to_append](int r) {
+ to_append->persist_gather_activate();
+ }));
+
+ /* The flush request completes when the sync point persists */
+ to_append->add_in_on_persisted_ctxs(flush_req);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
+ DeferredContexts &later) {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+ /* If there have been writes since the last sync point ... */
+ if (m_current_sync_point->log_entry->writes) {
+ flush_new_sync_point(flush_req, later);
+ } else {
+ /* There have been no writes to the current sync point. */
+ if (m_current_sync_point->earlier_sync_point) {
+ /* If previous sync point hasn't completed, complete this flush
+ * with the earlier sync point. No alloc or dispatch needed. */
+ m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
+ } else {
+ /* The previous sync point has already completed and been
+ * appended. The current sync point has no writes, so this flush
+ * has nothing to wait for. This flush completes now. */
+ later.add(flush_req);
+ }
+ }
+}
+
+/*
+ * RWL internal flush - will actually flush the RWL.
+ *
+ * User flushes should arrive at aio_flush(), and only flush prior
+ * writes to all log replicas.
+ *
+ * Librbd internal flushes will arrive at flush(invalidate=false,
+ * discard=false), and traverse the block guard to ensure in-flight writes are
+ * flushed.
+ */
+template <typename I>
+void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ bool all_clean;
+ bool flushing;
+ bool stop_flushing;
+
+ {
+ std::unique_lock locker(m_lock);
+ flushing = (0 != m_flush_ops_in_flight);
+ all_clean = m_dirty_log_entries.empty();
+ stop_flushing = (m_shutting_down);
+ if (!m_cache_state->clean && all_clean && !flushing) {
+ m_cache_state->clean = true;
+ update_image_cache_state();
+ write_image_cache_state(locker);
+ }
+ }
+
+ if (!flushing && (all_clean || stop_flushing)) {
+ /* Complete without holding m_lock */
+ if (all_clean) {
+ ldout(cct, 20) << "no dirty entries" << dendl;
+ } else {
+ ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
+ }
+ on_finish->complete(0);
+ } else {
+ if (all_clean) {
+ ldout(cct, 5) << "flush ops still in progress" << dendl;
+ } else {
+ ldout(cct, 20) << "dirty entries remain" << dendl;
+ }
+ std::lock_guard locker(m_lock);
+ /* on_finish can't be completed yet */
+ m_flush_complete_contexts.push_back(new LambdaContext(
+ [this, on_finish](int r) {
+ flush_dirty_entries(on_finish);
+ }));
+ wake_up();
+ }
+}
+
+template <typename I>
+void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
+ ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
+
+ if (m_perfcounter) {
+ if (invalidate) {
+ m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
+ } else {
+ m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
+ }
+ }
+
+ /* May be called even if initialization fails */
+ if (!m_initialized) {
+ ldout(m_image_ctx.cct, 5) << "never initialized" << dendl;
+ /* Deadlock if completed here */
+ m_image_ctx.op_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ /* Flush/invalidate must pass through the block guard to ensure all layers
+ * of the cache are consistently flushed/invalidated. This ensures no
+ * in-flight write leaves some layers with valid regions, which may later
+ * produce inconsistent read results. */
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext(
+ [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
+ DeferredContexts on_exit;
+ ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
+ ceph_assert(guard_ctx.cell);
+
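+ /* The completion chain below is built in reverse order: final completion
+ * first, then the flush-or-invalidate step, then flush_dirty_entries() */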
+ Context *ctx = new LambdaContext(
+ [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
+ std::lock_guard locker(m_lock);
+ m_invalidating = false;
+ ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
+ << invalidate << ")" << dendl;
+ if (m_log_entries.size()) {
+ ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
+ << m_log_entries.size()
+ << ", front()=" << *m_log_entries.front()
+ << dendl;
+ }
+ if (invalidate) {
+ ceph_assert(m_log_entries.size() == 0);
+ }
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ m_image_ctx.op_work_queue->queue(on_finish, r);
+ release_guarded_request(cell);
+ });
+ ctx = new LambdaContext(
+ [this, ctx, invalidate](int r) {
+ Context *next_ctx = ctx;
+ ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
+ if (r < 0) {
+ /* Override on_finish status with this error */
+ next_ctx = new LambdaContext([r, ctx](int _r) {
+ ctx->complete(r);
+ });
+ }
+ if (invalidate) {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ ceph_assert(!m_invalidating);
+ ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
+ m_invalidating = true;
+ }
+ /* Discards all RWL entries */
+ while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
+ next_ctx->complete(0);
+ } else {
+ {
+ std::lock_guard locker(m_lock);
+ ceph_assert(m_dirty_log_entries.size() == 0);
+ ceph_assert(!m_invalidating);
+ }
+ m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
+ }
+ });
+ ctx = new LambdaContext(
+ [this, ctx](int r) {
+ flush_dirty_entries(ctx);
+ });
+ std::lock_guard locker(m_lock);
+ /* Even if we're throwing everything away, we want the last entry to
+ * be a sync point so we can cleanly resume.
+ *
+ * Also, the blockguard only guarantees the replication of this op
+ * can't overlap with prior ops. It doesn't guarantee those are all
+ * completed and eligible for flush & retire, which we require here.
+ */
+ auto flush_req = make_flush_req(ctx);
+ flush_new_sync_point_if_needed(flush_req, on_exit);
+ });
+ detain_guarded_request(nullptr, guarded_ctx, true);
+}
+
+template <typename I>
+void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
+ C_BlockIORequestT *req) {
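+ /* Copy the request's data into the cache buffers, then make the new
+ * entries visible to cache reads via the block-to-log-entry map */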
+ req->copy_cache();
+ m_blocks_to_log_entries.add_log_entries(log_entries);
+}
+
+template <typename I>
+bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+ CephContext *cct = m_image_ctx.cct;
+
+ ldout(cct, 20) << dendl;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ ceph_assert(log_entry);
+ return log_entry->can_retire();
+}
+
+template <typename I>
+void AbstractWriteLog<I>::check_image_cache_state_clean() {
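+ /* Assert that no deferred, in-flight, or dirty state remains */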
+ ceph_assert(m_deferred_ios.empty());
+ ceph_assert(m_ops_to_append.empty());
+ ceph_assert(m_async_flush_ops == 0);
+ ceph_assert(m_async_append_ops == 0);
+ ceph_assert(m_dirty_log_entries.empty());
+ ceph_assert(m_ops_to_flush.empty());
+ ceph_assert(m_flush_ops_in_flight == 0);
+ ceph_assert(m_flush_bytes_in_flight == 0);
+ ceph_assert(m_bytes_dirty == 0);
+ ceph_assert(m_work_queue.empty());
+}
+
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;