summaryrefslogtreecommitdiffstats
path: root/src/librbd/cache/pwl/ssd
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/cache/pwl/ssd/Builder.h108
-rw-r--r--src/librbd/cache/pwl/ssd/LogEntry.cc63
-rw-r--r--src/librbd/cache/pwl/ssd/LogEntry.h75
-rw-r--r--src/librbd/cache/pwl/ssd/LogOperation.cc36
-rw-r--r--src/librbd/cache/pwl/ssd/LogOperation.h35
-rw-r--r--src/librbd/cache/pwl/ssd/ReadRequest.cc92
-rw-r--r--src/librbd/cache/pwl/ssd/ReadRequest.h34
-rw-r--r--src/librbd/cache/pwl/ssd/Request.cc63
-rw-r--r--src/librbd/cache/pwl/ssd/Request.h92
-rw-r--r--src/librbd/cache/pwl/ssd/Types.h51
-rw-r--r--src/librbd/cache/pwl/ssd/WriteLog.cc1158
-rw-r--r--src/librbd/cache/pwl/ssd/WriteLog.h156
12 files changed, 1963 insertions, 0 deletions
diff --git a/src/librbd/cache/pwl/ssd/Builder.h b/src/librbd/cache/pwl/ssd/Builder.h
new file mode 100644
index 000000000..07b3fb869
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/Builder.h
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H
+#define CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H
+
+#include <iostream>
+#include "LogEntry.h"
+#include "ReadRequest.h"
+#include "Request.h"
+#include "LogOperation.h"
+
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/cache/pwl/Builder.h"
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+template <typename T>
+class Builder : public pwl::Builder<T> {
+public:
+ std::shared_ptr<pwl::WriteLogEntry> create_write_log_entry(
+ uint64_t image_offset_bytes, uint64_t write_bytes) override {
+ return std::make_shared<WriteLogEntry>(image_offset_bytes, write_bytes);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_write_log_entry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes) override {
+ return std::make_shared<WriteLogEntry>(
+ sync_point_entry, image_offset_bytes, write_bytes);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_writesame_log_entry(
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length) override {
+ return std::make_shared<WriteSameLogEntry>(
+ image_offset_bytes, write_bytes, data_length);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_writesame_log_entry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length) override {
+ return std::make_shared<WriteSameLogEntry>(
+ sync_point_entry, image_offset_bytes, write_bytes, data_length);
+ }
+ pwl::C_WriteRequest<T> *create_write_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req);
+ }
+ pwl::C_WriteSameRequest<T> *create_writesame_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new C_WriteSameRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req);
+ }
+ pwl::C_WriteRequest<T> *create_comp_and_write_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new C_CompAndWriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset, fadvise_flags,
+ lock, perfcounter, user_req);
+ }
+ std::shared_ptr<pwl::WriteLogOperation> create_write_log_operation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> write_log_entry) {
+ return std::make_shared<WriteLogOperation>(
+ set, image_offset_bytes, write_bytes, cct, write_log_entry);
+ }
+ std::shared_ptr<pwl::WriteLogOperation> create_write_log_operation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t data_len, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> writesame_log_entry) {
+ return std::make_shared<WriteLogOperation>(
+ set, image_offset_bytes, write_bytes, data_len, cct,
+ writesame_log_entry);
+ }
+ std::shared_ptr<pwl::DiscardLogOperation> create_discard_log_operation(
+ std::shared_ptr<SyncPoint> sync_point, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t discard_granularity_bytes,
+ utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct) {
+ return std::make_shared<DiscardLogOperation>(
+ sync_point, image_offset_bytes, write_bytes, discard_granularity_bytes,
+ dispatch_time, perfcounter, cct);
+ }
+ C_ReadRequest *create_read_request(CephContext *cct, utime_t arrived,
+ PerfCounters *perfcounter, ceph::bufferlist *bl, Context *on_finish) {
+ return new C_ReadRequest(cct, arrived, perfcounter, bl, on_finish);
+ }
+};
+
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H
diff --git a/src/librbd/cache/pwl/ssd/LogEntry.cc b/src/librbd/cache/pwl/ssd/LogEntry.cc
new file mode 100644
index 000000000..0e6edd87b
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/LogEntry.cc
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/cache/pwl/ssd/LogEntry.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLogEntry: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+void WriteLogEntry::init_cache_bl(
+ bufferlist &src_bl, uint64_t off, uint64_t len) {
+ cache_bl.clear();
+ cache_bl.substr_of(src_bl, off, len);
+}
+
+buffer::list& WriteLogEntry::get_cache_bl() {
+ return cache_bl;
+}
+
+void WriteLogEntry::copy_cache_bl(bufferlist *out) {
+ std::lock_guard locker(m_entry_bl_lock);
+ *out = cache_bl;
+}
+
+void WriteLogEntry::remove_cache_bl() {
+ std::lock_guard locker(m_entry_bl_lock);
+ cache_bl.clear();
+}
+
+unsigned int WriteLogEntry::get_aligned_data_size() const {
+ if (cache_bl.length()) {
+ return round_up_to(cache_bl.length(), MIN_WRITE_ALLOC_SSD_SIZE);
+ }
+ return round_up_to(write_bytes(), MIN_WRITE_ALLOC_SSD_SIZE);
+}
+
+void WriteLogEntry::writeback_bl(
+ librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx, ceph::bufferlist&& bl) {
+ image_writeback.aio_write({{ram_entry.image_offset_bytes,
+ ram_entry.write_bytes}},
+ std::move(bl), 0, ctx);
+}
+
+void WriteSameLogEntry::writeback_bl(
+ librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx, ceph::bufferlist &&bl) {
+ image_writeback.aio_writesame(ram_entry.image_offset_bytes,
+ ram_entry.write_bytes,
+ std::move(bl), 0, ctx);
+}
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/ssd/LogEntry.h b/src/librbd/cache/pwl/ssd/LogEntry.h
new file mode 100644
index 000000000..8e26f661f
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/LogEntry.h
@@ -0,0 +1,75 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// // vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H
+#define CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H
+
+#include "librbd/cache/pwl/LogEntry.h"
+
+namespace librbd {
+namespace cache {
+class ImageWritebackInterface;
+namespace pwl {
+namespace ssd {
+
+class WriteLogEntry : public pwl::WriteLogEntry {
+public:
+ WriteLogEntry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes)
+ : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) {}
+ WriteLogEntry(
+ uint64_t image_offset_bytes, uint64_t write_bytes)
+ : pwl::WriteLogEntry(image_offset_bytes, write_bytes) {}
+ WriteLogEntry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes,
+ write_bytes, data_length) {}
+ WriteLogEntry(
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : pwl::WriteLogEntry(image_offset_bytes, write_bytes, data_length) {}
+ ~WriteLogEntry() {}
+ WriteLogEntry(const WriteLogEntry&) = delete;
+ WriteLogEntry &operator=(const WriteLogEntry&) = delete;
+ void writeback_bl(librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx, ceph::bufferlist &&bl) override;
+ void init_cache_bl(bufferlist &src_bl, uint64_t off, uint64_t len) override;
+ buffer::list &get_cache_bl() override;
+ void copy_cache_bl(bufferlist *out) override;
+ void remove_cache_bl() override;
+ unsigned int get_aligned_data_size() const override;
+ void inc_bl_refs() { bl_refs++; };
+ void dec_bl_refs() { bl_refs--; };
+ unsigned int reader_count() const override {
+ return bl_refs;
+ }
+};
+
+class WriteSameLogEntry : public WriteLogEntry {
+public:
+ WriteSameLogEntry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : WriteLogEntry(sync_point_entry, image_offset_bytes,
+ write_bytes, data_length) {}
+ WriteSameLogEntry(
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : WriteLogEntry(image_offset_bytes, write_bytes, data_length) {}
+ ~WriteSameLogEntry() {}
+ WriteSameLogEntry(const WriteSameLogEntry&) = delete;
+ WriteSameLogEntry &operator=(const WriteSameLogEntry&) = delete;
+ void writeback_bl(librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx, ceph::bufferlist &&bl) override;
+};
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H
diff --git a/src/librbd/cache/pwl/ssd/LogOperation.cc b/src/librbd/cache/pwl/ssd/LogOperation.cc
new file mode 100644
index 000000000..c8080e37d
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/LogOperation.cc
@@ -0,0 +1,36 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "LogOperation.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::ssd::LogOperation: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+void DiscardLogOperation::init_op(
+ uint64_t current_sync_gen, bool persist_on_flush,
+ uint64_t last_op_sequence_num, Context *write_persist,
+ Context *write_append) {
+ log_entry->init(current_sync_gen, persist_on_flush, last_op_sequence_num);
+ if (persist_on_flush) {
+ this->on_write_append = new LambdaContext(
+ [write_persist, write_append] (int r) {
+ write_append->complete(r);
+ write_persist->complete(r);
+ });
+ } else {
+ this->on_write_append = write_append;
+ this->on_write_persist = write_persist;
+ }
+}
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/ssd/LogOperation.h b/src/librbd/cache/pwl/ssd/LogOperation.h
new file mode 100644
index 000000000..dbc89aa73
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/LogOperation.h
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H
+#define CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H
+
+#include "librbd/cache/pwl/LogOperation.h"
+
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+class DiscardLogOperation : public pwl::DiscardLogOperation {
+public:
+ DiscardLogOperation(
+ std::shared_ptr<SyncPoint> sync_point, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t discard_granularity_bytes,
+ utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct)
+ : pwl::DiscardLogOperation(sync_point, image_offset_bytes, write_bytes,
+ discard_granularity_bytes, dispatch_time,
+ perfcounter, cct) {}
+ void init_op(
+ uint64_t current_sync_gen, bool persist_on_flush,
+ uint64_t last_op_sequence_num, Context *write_persist,
+ Context *write_append) override;
+};
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H
diff --git a/src/librbd/cache/pwl/ssd/ReadRequest.cc b/src/librbd/cache/pwl/ssd/ReadRequest.cc
new file mode 100644
index 000000000..1a80a8d8c
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/ReadRequest.cc
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReadRequest.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::ssd::ReadRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+void C_ReadRequest::finish(int r) {
+ ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << dendl;
+ int hits = 0;
+ int misses = 0;
+ int hit_bytes = 0;
+ int miss_bytes = 0;
+ if (r >= 0) {
+ /*
+ * At this point the miss read has completed. We'll iterate through
+ * m_read_extents and produce *m_out_bl by assembling pieces of m_miss_bl
+ * and the individual hit extent bufs in the read extents that represent
+ * hits.
+ */
+ uint64_t miss_bl_offset = 0;
+ for (auto extent : read_extents) {
+ if (extent->m_bl.length()) {
+ /* This was a hit */
+ bufferlist data_bl;
+ if (extent->writesame) {
+ int data_len = extent->m_bl.length();
+ int read_buffer_offset = extent->truncate_offset;
+ if (extent->need_to_truncate && extent->truncate_offset >= data_len) {
+ read_buffer_offset = (extent->truncate_offset) % data_len;
+ }
+ // build data and truncate
+ bufferlist temp_bl;
+ uint64_t total_left_bytes = read_buffer_offset + extent->second;
+ while (total_left_bytes > 0) {
+ temp_bl.append(extent->m_bl);
+ total_left_bytes = total_left_bytes - data_len;
+ }
+ data_bl.substr_of(temp_bl, read_buffer_offset, extent->second);
+ m_out_bl->claim_append(data_bl);
+ } else if (extent->need_to_truncate) {
+ assert(extent->m_bl.length() >= extent->truncate_offset + extent->second);
+ data_bl.substr_of(extent->m_bl, extent->truncate_offset, extent->second);
+ m_out_bl->claim_append(data_bl);
+ } else {
+ assert(extent->second == extent->m_bl.length());
+ m_out_bl->claim_append(extent->m_bl);
+ }
+ ++hits;
+ hit_bytes += extent->second;
+ } else {
+ /* This was a miss. */
+ ++misses;
+ miss_bytes += extent->second;
+ bufferlist miss_extent_bl;
+ miss_extent_bl.substr_of(miss_bl, miss_bl_offset, extent->second);
+ /* Add this read miss bufferlist to the output bufferlist */
+ m_out_bl->claim_append(miss_extent_bl);
+ /* Consume these bytes in the read miss bufferlist */
+ miss_bl_offset += extent->second;
+ }
+ }
+ }
+ ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << " bl=" << *m_out_bl << dendl;
+ utime_t now = ceph_clock_now();
+ ceph_assert((int)m_out_bl->length() == hit_bytes + miss_bytes);
+ m_on_finish->complete(r);
+ m_perfcounter->inc(l_librbd_pwl_rd_bytes, hit_bytes + miss_bytes);
+ m_perfcounter->inc(l_librbd_pwl_rd_hit_bytes, hit_bytes);
+ m_perfcounter->tinc(l_librbd_pwl_rd_latency, now - m_arrived_time);
+ if (!misses) {
+ m_perfcounter->inc(l_librbd_pwl_rd_hit_req, 1);
+ m_perfcounter->tinc(l_librbd_pwl_rd_hit_latency, now - m_arrived_time);
+ } else {
+ if (hits) {
+ m_perfcounter->inc(l_librbd_pwl_rd_part_hit_req, 1);
+ }
+ }
+}
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/ssd/ReadRequest.h b/src/librbd/cache/pwl/ssd/ReadRequest.h
new file mode 100644
index 000000000..345c4aa65
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/ReadRequest.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H
+#define CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H
+
+#include "librbd/cache/pwl/ReadRequest.h"
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+typedef std::vector<pwl::ImageExtentBuf> ImageExtentBufs;
+
+class C_ReadRequest : public pwl::C_ReadRequest {
+protected:
+ using pwl::C_ReadRequest::m_cct;
+ using pwl::C_ReadRequest::m_on_finish;
+ using pwl::C_ReadRequest::m_out_bl;
+ using pwl::C_ReadRequest::m_arrived_time;
+ using pwl::C_ReadRequest::m_perfcounter;
+public:
+ C_ReadRequest(CephContext *cct, utime_t arrived, PerfCounters *perfcounter, bufferlist *out_bl, Context *on_finish)
+ : pwl::C_ReadRequest(cct, arrived, perfcounter, out_bl, on_finish) {}
+ void finish(int r) override;
+};
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H
diff --git a/src/librbd/cache/pwl/ssd/Request.cc b/src/librbd/cache/pwl/ssd/Request.cc
new file mode 100644
index 000000000..e92e547c8
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/Request.cc
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Request.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::ssd::Request: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+template <typename T>
+void C_WriteRequest<T>::setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
+ uint64_t *number_lanes, uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) {
+
+ *bytes_cached = 0;
+ *bytes_allocated = 0;
+ *number_log_entries = this->image_extents.size();
+
+ for (auto &extent : this->image_extents) {
+ *bytes_cached += extent.second;
+ *bytes_allocated += round_up_to(extent.second, MIN_WRITE_ALLOC_SSD_SIZE);
+ }
+ *bytes_dirtied = *bytes_cached;
+}
+
+template <typename T>
+std::ostream &operator<<(std::ostream &os,
+ const C_CompAndWriteRequest<T> &req) {
+ os << (C_WriteRequest<T>&)req
+ << "cmp_bl=" << req.cmp_bl << ", "
+ << "read_bl=" << req.read_bl << ", "
+ << "compare_succeeded=" << req.compare_succeeded << ", "
+ << "mismatch_offset=" << req.mismatch_offset;
+ return os;
+}
+
+template <typename T>
+void C_WriteSameRequest<T>::setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
+ uint64_t *number_lanes, uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) {
+ ceph_assert(this->image_extents.size() == 1);
+ *number_log_entries = 1;
+ *bytes_dirtied = this->image_extents[0].second;
+ *bytes_cached = this->bl.length();
+ *bytes_allocated = round_up_to(*bytes_cached, MIN_WRITE_ALLOC_SSD_SIZE);
+}
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::pwl::ssd::C_WriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
+template class librbd::cache::pwl::ssd::C_WriteSameRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
+template class librbd::cache::pwl::ssd::C_CompAndWriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
diff --git a/src/librbd/cache/pwl/ssd/Request.h b/src/librbd/cache/pwl/ssd/Request.h
new file mode 100644
index 000000000..9bb3e85b9
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/Request.h
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_SSD_REQUEST_H
+#define CEPH_LIBRBD_CACHE_SSD_REQUEST_H
+
+#include "librbd/cache/pwl/Request.h"
+
+namespace librbd {
+class BlockGuardCell;
+
+namespace cache {
+namespace pwl {
+
+template<typename T>
+class AbstractWriteLog;
+
+namespace ssd {
+
+template <typename T>
+class C_WriteRequest : public pwl::C_WriteRequest<T> {
+public:
+ C_WriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset, fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ C_WriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req) {}
+protected:
+ void setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied,
+ uint64_t *bytes_allocated, uint64_t *number_lanes,
+ uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) override;
+};
+
+template <typename T>
+class C_CompAndWriteRequest : public C_WriteRequest<T> {
+public:
+ C_CompAndWriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset,fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ const char *get_name() const override {
+ return "C_CompAndWriteRequest";
+ }
+ template <typename U>
+ friend std::ostream &operator<<(std::ostream &os,
+ const C_CompAndWriteRequest<U> &req);
+};
+
+template <typename T>
+class C_WriteSameRequest : public pwl::C_WriteSameRequest<T> {
+public:
+ C_WriteSameRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteSameRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ void setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied,
+ uint64_t *bytes_allocated, uint64_t *number_lanes,
+ uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) override;
+};
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_SSD_REQUEST_H
diff --git a/src/librbd/cache/pwl/ssd/Types.h b/src/librbd/cache/pwl/ssd/Types.h
new file mode 100644
index 000000000..3ebad1fd9
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/Types.h
@@ -0,0 +1,51 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_SSD_TYPES_H
+#define CEPH_LIBRBD_CACHE_SSD_TYPES_H
+
+#include "acconfig.h"
+
+#include "librbd/io/Types.h"
+#include "librbd/cache/pwl/Types.h"
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+struct SuperBlock{
+ WriteLogPoolRoot root;
+
+ DENC(SuperBlock, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.root, p);
+ DENC_FINISH(p);
+ }
+
+ void dump(Formatter *f) const {
+ f->dump_object("super", root);
+ }
+
+ static void generate_test_instances(list<SuperBlock*>& ls) {
+ ls.push_back(new SuperBlock());
+ ls.push_back(new SuperBlock);
+ ls.back()->root.layout_version = 3;
+ ls.back()->root.cur_sync_gen = 1;
+ ls.back()->root.pool_size = 10737418240;
+ ls.back()->root.flushed_sync_gen = 1;
+ ls.back()->root.block_size = 4096;
+ ls.back()->root.num_log_entries = 0;
+ ls.back()->root.first_free_entry = 30601;
+ ls.back()->root.first_valid_entry = 2;
+ }
+};
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+WRITE_CLASS_DENC(librbd::cache::pwl::ssd::SuperBlock)
+
+#endif // CEPH_LIBRBD_CACHE_SSD_TYPES_H
diff --git a/src/librbd/cache/pwl/ssd/WriteLog.cc b/src/librbd/cache/pwl/ssd/WriteLog.cc
new file mode 100644
index 000000000..00626506a
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/WriteLog.cc
@@ -0,0 +1,1158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "WriteLog.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/ceph_assert.h"
+#include "common/deleter.h"
+#include "common/dout.h"
+#include "common/environment.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "common/Timer.h"
+#include "common/perf_counters.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/cache/pwl/ImageCacheState.h"
+#include "librbd/cache/pwl/LogEntry.h"
+#include <map>
+#include <vector>
+
+#undef dout_subsys
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+using namespace librbd::cache::pwl;
+
+static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
+ return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
+ root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
+ root.first_valid_entry < root.pool_size &&
+ root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
+ root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
+ root.first_free_entry < root.pool_size &&
+ root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
+}
+
+template <typename I>
+Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
+ m_builderobj = new Builder<This>();
+ return m_builderobj;
+}
+
+template <typename I>
+WriteLog<I>::WriteLog(
+ I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
+ cache::ImageWritebackInterface& image_writeback,
+ plugin::Api<I>& plugin_api)
+ : AbstractWriteLog<I>(image_ctx, cache_state, create_builder(),
+ image_writeback, plugin_api)
+{
+}
+
+template <typename I>
+WriteLog<I>::~WriteLog() {
+ delete m_builderobj;
+}
+
+template <typename I>
+void WriteLog<I>::collect_read_extents(
+ uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read,
+ uint64_t entry_hit_length, Extent hit_extent,
+ pwl::C_ReadRequest *read_ctx) {
+ // Make a bl for this hit extent. This will add references to the
+ // write_entry->cache_bl */
+ ldout(m_image_ctx.cct, 5) << dendl;
+ auto write_entry = static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
+ buffer::list hit_bl;
+ write_entry->copy_cache_bl(&hit_bl);
+ bool writesame = write_entry->is_writesame_entry();
+ auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
+ hit_extent, hit_bl, true, read_buffer_offset, writesame);
+ read_ctx->read_extents.push_back(hit_extent_buf);
+
+ if (!hit_bl.length()) {
+ ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl;
+ auto read_extent = read_ctx->read_extents.back();
+ write_entry->inc_bl_refs();
+ log_entries_to_read.push_back(std::move(write_entry));
+ bls_to_read.push_back(&read_extent->m_bl);
+ }
+}
+
+template <typename I>
+void WriteLog<I>::complete_read(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read,
+ Context *ctx) {
+ if (!log_entries_to_read.empty()) {
+ aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx);
+ } else {
+ ctx->complete(0);
+ }
+}
+
+template <typename I>
+int WriteLog<I>::create_and_open_bdev() {
+ CephContext *cct = m_image_ctx.cct;
+
+ bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb,
+ nullptr, nullptr, nullptr);
+ int r = bdev->open(this->m_log_pool_name);
+ if (r < 0) {
+ lderr(cct) << "failed to open bdev" << dendl;
+ delete bdev;
+ return r;
+ }
+
+ ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ if (bdev->get_size() != this->m_log_pool_size) {
+ lderr(cct) << "size mismatch: bdev size " << bdev->get_size()
+ << " (block size " << bdev->get_block_size()
+ << ") != pool size " << this->m_log_pool_size << dendl;
+ bdev->close();
+ delete bdev;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+template <typename I>
+bool WriteLog<I>::initialize_pool(Context *on_finish,
+ pwl::DeferredContexts &later) {
+ int r;
+ CephContext *cct = m_image_ctx.cct;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
+ int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644);
+ bool succeed = true;
+ if (fd >= 0) {
+ if (truncate(this->m_log_pool_name.c_str(),
+ this->m_log_pool_size) != 0) {
+ succeed = false;
+ }
+ ::close(fd);
+ } else {
+ succeed = false;
+ }
+ if (!succeed) {
+ m_cache_state->present = false;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ /* TODO: filter/replace errnos that are meaningless to the caller */
+ on_finish->complete(-errno);
+ return false;
+ }
+
+ r = create_and_open_bdev();
+ if (r < 0) {
+ on_finish->complete(r);
+ return false;
+ }
+ m_cache_state->present = true;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ /* new pool, calculate and store metadata */
+
+ /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free.
+ * In this way, when all ring buffer spaces are allocated,
+ * m_first_free_entry and m_first_valid_entry will not be equal.
+ * Equal only means the cache is empty. */
+ this->m_bytes_allocated_cap = this->m_log_pool_size -
+ DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE;
+ /* Log ring empty */
+ m_first_free_entry = DATA_RING_BUFFER_OFFSET;
+ m_first_valid_entry = DATA_RING_BUFFER_OFFSET;
+
+ auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
+ new_root->layout_version = SSD_LAYOUT_VERSION;
+ new_root->pool_size = this->m_log_pool_size;
+ new_root->flushed_sync_gen = this->m_flushed_sync_gen;
+ new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE;
+ new_root->first_free_entry = m_first_free_entry;
+ new_root->first_valid_entry = m_first_valid_entry;
+ new_root->num_log_entries = 0;
+ pool_root = *new_root;
+
+ r = update_pool_root_sync(new_root);
+ if (r != 0) {
+ lderr(cct) << "failed to initialize pool ("
+ << this->m_log_pool_name << ")" << dendl;
+ bdev->close();
+ delete bdev;
+ on_finish->complete(r);
+ return false;
+ }
+ } else {
+ ceph_assert(m_cache_state->present);
+ r = create_and_open_bdev();
+ if (r < 0) {
+ on_finish->complete(r);
+ return false;
+ }
+
+ bufferlist bl;
+ SuperBlock superblock;
+ ::IOContext ioctx(cct, nullptr);
+ r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
+ if (r < 0) {
+ lderr(cct) << "Read ssd cache superblock failed " << dendl;
+ goto error_handle;
+ }
+ auto p = bl.cbegin();
+ decode(superblock, p);
+ pool_root = superblock.root;
+ ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
+ << " first_valid_entry=" << pool_root.first_valid_entry
+ << " first_free_entry=" << pool_root.first_free_entry
+ << " flushed_sync_gen=" << pool_root.flushed_sync_gen
+ << dendl;
+ ceph_assert(is_valid_pool_root(pool_root));
+ if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
+ lderr(cct) << "Pool layout version is "
+ << pool_root.layout_version
+ << " expected " << SSD_LAYOUT_VERSION
+ << dendl;
+ goto error_handle;
+ }
+ if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
+ lderr(cct) << "Pool block size is " << pool_root.block_size
+ << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
+ << dendl;
+ goto error_handle;
+ }
+
+ this->m_log_pool_size = pool_root.pool_size;
+ this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
+ this->m_first_valid_entry = pool_root.first_valid_entry;
+ this->m_first_free_entry = pool_root.first_free_entry;
+ this->m_bytes_allocated_cap = this->m_log_pool_size -
+ DATA_RING_BUFFER_OFFSET -
+ MIN_WRITE_ALLOC_SSD_SIZE;
+
+ load_existing_entries(later);
+ m_cache_state->clean = this->m_dirty_log_entries.empty();
+ m_cache_state->empty = m_log_entries.empty();
+ }
+ return true;
+
+error_handle:
+ bdev->close();
+ delete bdev;
+ on_finish->complete(-EINVAL);
+ return false;
+}
+
+template <typename I>
+void WriteLog<I>::remove_pool_file() {
+ ceph_assert(bdev);
+ bdev->close();
+ delete bdev;
+ bdev = nullptr;
+ ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl;
+
+ if (m_cache_state->clean) {
+ ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
+ << this->m_log_pool_name << dendl;
+ if (remove(this->m_log_pool_name.c_str()) != 0) {
+ lderr(m_image_ctx.cct) << "failed to remove empty pool \""
+ << this->m_log_pool_name << "\": " << dendl;
+ } else {
+ m_cache_state->present = false;
+ }
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Not removing pool file: "
+ << this->m_log_pool_name << dendl;
+ }
+}
+
+template <typename I>
+void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+ std::map<uint64_t, bool> missing_sync_points;
+
+ // Iterate through the log_entries and append all the write_bytes
+ // of each entry to fetch the pos of next 4k of log_entries. Iterate
+ // through the log entries and append them to the in-memory vector
+ for (uint64_t next_log_pos = this->m_first_valid_entry;
+ next_log_pos != this->m_first_free_entry; ) {
+ // read the entries from SSD cache and decode
+ bufferlist bl_entries;
+ ::IOContext ioctx_entry(cct, nullptr);
+ bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries,
+ &ioctx_entry, false);
+ std::vector<WriteLogCacheEntry> ssd_log_entries;
+ auto pl = bl_entries.cbegin();
+ decode(ssd_log_entries, pl);
+ ldout(cct, 5) << "decoded ssd log entries" << dendl;
+ uint64_t curr_log_pos = next_log_pos;
+ std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+
+ for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) {
+ this->update_entries(&log_entry, &*it, missing_sync_points,
+ sync_point_entries, curr_log_pos);
+ log_entry->ram_entry = *it;
+ log_entry->log_entry_index = curr_log_pos;
+ log_entry->completed = true;
+ m_log_entries.push_back(log_entry);
+ next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE);
+ }
+ // along with the write_bytes, add control block size too
+ next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE;
+ if (next_log_pos >= this->m_log_pool_size) {
+ next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
+ }
+ }
+ this->update_sync_points(missing_sync_points, sync_point_entries, later);
+ if (m_first_valid_entry > m_first_free_entry) {
+ m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry +
+ m_first_free_entry - DATA_RING_BUFFER_OFFSET;
+ } else {
+ m_bytes_allocated = m_first_free_entry - m_first_valid_entry;
+ }
+}
+
+// For SSD we don't calc m_bytes_allocated in this
+template <typename I>
+void WriteLog<I>::inc_allocated_cached_bytes(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) {
+ if (log_entry->is_write_entry()) {
+ this->m_bytes_cached += log_entry->write_bytes();
+ }
+}
+
+template <typename I>
+bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+ bool alloc_succeeds = true;
+ uint64_t bytes_allocated = 0;
+ uint64_t bytes_cached = 0;
+ uint64_t bytes_dirtied = 0;
+ uint64_t num_lanes = 0;
+ uint64_t num_unpublished_reserves = 0;
+ uint64_t num_log_entries = 0;
+
+ // Setup buffer, and get all the number of required resources
+ req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
+ &num_lanes, &num_log_entries,
+ &num_unpublished_reserves);
+
+ ceph_assert(!num_lanes);
+ if (num_log_entries) {
+ bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE;
+ num_log_entries = 0;
+ }
+ ceph_assert(!num_unpublished_reserves);
+
+ alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
+ bytes_allocated, num_lanes,
+ num_log_entries,
+ num_unpublished_reserves);
+ req->set_allocated(alloc_succeeds);
+ return alloc_succeeds;
+}
+
+template <typename I>
+bool WriteLog<I>::has_sync_point_logs(GenericLogOperations &ops) {
+ for (auto &op : ops) {
+ if (op->get_log_entry()->is_sync_point()) {
+ return true;
+ break;
+ }
+ }
+ return false;
+}
+
+template<typename I>
+void WriteLog<I>::enlist_op_appender() {
+ this->m_async_append_ops++;
+ this->m_async_op_tracker.start_op();
+ Context *append_ctx = new LambdaContext([this](int r) {
+ append_scheduled_ops();
+ });
+ this->m_work_queue.queue(append_ctx);
+}
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template<typename I>
+void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) {
+ bool need_finisher = false;
+ GenericLogOperationsVector appending;
+
+ std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+ {
+ std::lock_guard locker(m_lock);
+
+ bool persist_on_flush = this->get_persist_on_flush();
+ need_finisher = !this->m_appending &&
+ ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
+ !persist_on_flush);
+
+ // Only flush logs into SSD when there is internal/external flush request
+ if (!need_finisher) {
+ need_finisher = has_sync_point_logs(ops);
+ }
+ this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
+
+ // To preserve the order of overlapping IOs, release_cell() may be
+ // called only after the ops are added to m_ops_to_append.
+ // As soon as m_lock is released, the appended ops can be picked up
+ // by append_scheduled_ops() in another thread and req can be freed.
+ if (req != nullptr) {
+ if (persist_on_flush) {
+ req->complete_user_request(0);
+ }
+ req->release_cell();
+ }
+ }
+
+ if (need_finisher) {
+ this->enlist_op_appender();
+ }
+
+ for (auto &op : appending) {
+ op->appending();
+ }
+}
+
+template <typename I>
+void WriteLog<I>::setup_schedule_append(pwl::GenericLogOperationsVector &ops,
+ bool do_early_flush,
+ C_BlockIORequestT *req) {
+ this->schedule_append(ops, req);
+}
+
+template <typename I>
+void WriteLog<I>::append_scheduled_ops(void) {
+ GenericLogOperations ops;
+ ldout(m_image_ctx.cct, 20) << dendl;
+
+ bool ops_remain = false; //no-op variable for SSD
+ bool appending = false; //no-op variable for SSD
+ this->append_scheduled(ops, ops_remain, appending);
+
+ if (ops.size()) {
+ alloc_op_log_entries(ops);
+ append_op_log_entries(ops);
+ } else {
+ this->m_async_append_ops--;
+ this->m_async_op_tracker.finish_op();
+ }
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
+ ceph_assert(!ops.empty());
+ ldout(m_image_ctx.cct, 20) << dendl;
+ Context *ctx = new LambdaContext([this, ops](int r) {
+ assert(r == 0);
+ ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl;
+
+ auto captured_ops = std::move(ops);
+ this->complete_op_log_entries(std::move(captured_ops), r);
+
+ bool need_finisher = false;
+ {
+ std::lock_guard locker1(m_lock);
+ bool persist_on_flush = this->get_persist_on_flush();
+ need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) ||
+ !persist_on_flush);
+
+ if (!need_finisher) {
+ need_finisher = has_sync_point_logs(this->m_ops_to_append);
+ }
+ }
+
+ if (need_finisher) {
+ this->enlist_op_appender();
+ }
+ this->m_async_update_superblock--;
+ this->m_async_op_tracker.finish_op();
+ });
+ uint64_t *new_first_free_entry = new(uint64_t);
+ Context *append_ctx = new LambdaContext(
+ [this, new_first_free_entry, ops, ctx](int r) {
+ std::shared_ptr<WriteLogPoolRoot> new_root;
+ {
+ ldout(m_image_ctx.cct, 20) << "Finished appending at "
+ << *new_first_free_entry << dendl;
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ operation->log_append_comp_time = now;
+ }
+
+ std::lock_guard locker(this->m_log_append_lock);
+ std::lock_guard locker1(m_lock);
+ assert(this->m_appending);
+ this->m_appending = false;
+ new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
+ pool_root.first_free_entry = *new_first_free_entry;
+ new_root->first_free_entry = *new_first_free_entry;
+ delete new_first_free_entry;
+ schedule_update_root(new_root, ctx);
+ }
+ this->m_async_append_ops--;
+ this->m_async_op_tracker.finish_op();
+ });
+ // Append logs and update first_free_update
+ append_ops(ops, append_ctx, new_first_free_entry);
+
+ if (ops.size()) {
+ this->dispatch_deferred_writes();
+ }
+}
+
+template <typename I>
+void WriteLog<I>::release_ram(std::shared_ptr<GenericLogEntry> log_entry) {
+ log_entry->remove_cache_bl();
+}
+
+template <typename I>
+void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
+ std::unique_lock locker(m_lock);
+
+ for (auto &operation : ops) {
+ auto &log_entry = operation->get_log_entry();
+ log_entry->ram_entry.set_entry_valid(true);
+ m_log_entries.push_back(log_entry);
+ ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+ }
+ if (m_cache_state->empty && !m_log_entries.empty()) {
+ m_cache_state->empty = false;
+ this->update_image_cache_state();
+ this->write_image_cache_state(locker);
+ }
+}
+
+template <typename I>
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) {
+ // snapshot so we behave consistently
+ bool invalidating = this->m_invalidating;
+
+ if (invalidating || !has_write_entry) {
+ for (auto &log_entry : entries_to_flush) {
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, log_entry, invalidating]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext([this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+ ctx->complete(0);
+ });
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
+ }
+ } else {
+ int count = entries_to_flush.size();
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
+ std::vector<bufferlist *> read_bls;
+
+ write_entries.reserve(count);
+ read_bls.reserve(count);
+
+ for (auto &log_entry : entries_to_flush) {
+ if (log_entry->is_write_entry()) {
+ bufferlist *bl = new bufferlist;
+ auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+ write_entry->inc_bl_refs();
+ write_entries.push_back(write_entry);
+ read_bls.push_back(bl);
+ }
+ }
+
+ Context *ctx = new LambdaContext(
+ [this, entries_to_flush, read_bls](int r) {
+ int i = 0;
+ GuardedRequestFunctionContext *guarded_ctx = nullptr;
+
+ for (auto &log_entry : entries_to_flush) {
+ if (log_entry->is_write_entry()) {
+ bufferlist captured_entry_bl;
+ captured_entry_bl.claim_append(*read_bls[i]);
+ delete read_bls[i++];
+
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+ auto captured_entry_bl = std::move(entry_bl);
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback_bl(this->m_image_writeback, ctx,
+ std::move(captured_entry_bl));
+ }), 0);
+ });
+ } else {
+ guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, false);
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
+ }
+ });
+
+ aio_read_data_blocks(write_entries, read_bls, ctx);
+ }
+}
+
+template <typename I>
+void WriteLog<I>::process_work() {
+ CephContext *cct = m_image_ctx.cct;
+ int max_iterations = 4;
+ bool wake_up_requested = false;
+ uint64_t aggressive_high_water_bytes =
+ this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+
+ ldout(cct, 20) << dendl;
+
+ do {
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_requested = false;
+ }
+ if (this->m_alloc_failed_since_retire || (this->m_shutting_down) ||
+ this->m_invalidating || m_bytes_allocated > high_water_bytes) {
+ ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
+ << ", allocated > high_water="
+ << (m_bytes_allocated > high_water_bytes)
+ << dendl;
+ retire_entries((this->m_shutting_down || this->m_invalidating ||
+ m_bytes_allocated > aggressive_high_water_bytes)
+ ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION);
+ }
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+ {
+ std::lock_guard locker(m_lock);
+ wake_up_requested = this->m_wake_up_requested;
+ }
+ } while (wake_up_requested && --max_iterations > 0);
+
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_scheduled = false;
+ // Reschedule if it's still requested
+ if (this->m_wake_up_requested) {
+ this->wake_up();
+ }
+ }
+}
+
+/**
+ * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
+ * that are eligible to be retired. Returns true if anything was
+ * retired.
+ *
+*/
+template <typename I>
+bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogEntriesVector retiring_entries;
+ uint64_t initial_first_valid_entry;
+ uint64_t first_valid_entry;
+
+ std::lock_guard retire_locker(this->m_log_retire_lock);
+ ldout(cct, 20) << "Look for entries to retire" << dendl;
+ {
+ // Entry readers can't be added while we hold m_entry_reader_lock
+ RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
+ initial_first_valid_entry = m_first_valid_entry;
+ first_valid_entry = m_first_valid_entry;
+ while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) {
+ GenericLogEntriesVector retiring_subentries;
+ uint64_t control_block_pos = m_log_entries.front()->log_entry_index;
+ uint64_t data_length = 0;
+ for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) {
+ if (this->can_retire_entry(*it)) {
+ // log_entry_index is valid after appending to SSD
+ if ((*it)->log_entry_index != control_block_pos) {
+ ldout(cct, 20) << "Old log_entry_index is " << control_block_pos
+ << ",New log_entry_index is "
+ << (*it)->log_entry_index
+ << ",data length is " << data_length << dendl;
+ ldout(cct, 20) << "The log entry is " << *(*it) << dendl;
+ if ((*it)->log_entry_index < control_block_pos) {
+ ceph_assert((*it)->log_entry_index ==
+ (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) %
+ this->m_log_pool_size + DATA_RING_BUFFER_OFFSET);
+ } else {
+ ceph_assert((*it)->log_entry_index == control_block_pos +
+ data_length + MIN_WRITE_ALLOC_SSD_SIZE);
+ }
+ break;
+ } else {
+ retiring_subentries.push_back(*it);
+ if ((*it)->is_write_entry()) {
+ data_length += (*it)->get_aligned_data_size();
+ }
+ }
+ } else {
+ retiring_subentries.clear();
+ break;
+ }
+ }
+ // SSD: retiring_subentries in a span
+ if (!retiring_subentries.empty()) {
+ for (auto it = retiring_subentries.begin();
+ it != retiring_subentries.end(); it++) {
+ ceph_assert(m_log_entries.front() == *it);
+ m_log_entries.pop_front();
+ if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) {
+ auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(*it);
+ if (gen_write_entry) {
+ this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+ }
+ }
+ }
+
+ ldout(cct, 20) << "span with " << retiring_subentries.size()
+ << " entries: control_block_pos=" << control_block_pos
+ << " data_length=" << data_length
+ << dendl;
+ retiring_entries.insert(
+ retiring_entries.end(), retiring_subentries.begin(),
+ retiring_subentries.end());
+
+ first_valid_entry = control_block_pos + data_length +
+ MIN_WRITE_ALLOC_SSD_SIZE;
+ if (first_valid_entry >= this->m_log_pool_size) {
+ first_valid_entry = first_valid_entry % this->m_log_pool_size +
+ DATA_RING_BUFFER_OFFSET;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ if (retiring_entries.size()) {
+ ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries"
+ << dendl;
+
+ // Advance first valid entry and release buffers
+ uint64_t flushed_sync_gen;
+ std::lock_guard append_locker(this->m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = this->m_flushed_sync_gen;
+ }
+
+ ceph_assert(first_valid_entry != initial_first_valid_entry);
+ auto new_root = std::make_shared<WriteLogPoolRoot>(pool_root);
+ new_root->flushed_sync_gen = flushed_sync_gen;
+ new_root->first_valid_entry = first_valid_entry;
+ pool_root.flushed_sync_gen = flushed_sync_gen;
+ pool_root.first_valid_entry = first_valid_entry;
+
+ Context *ctx = new LambdaContext(
+ [this, first_valid_entry, initial_first_valid_entry,
+ retiring_entries](int r) {
+ uint64_t allocated_bytes = 0;
+ uint64_t cached_bytes = 0;
+ uint64_t former_log_pos = 0;
+ for (auto &entry : retiring_entries) {
+ ceph_assert(entry->log_entry_index != 0);
+ if (entry->log_entry_index != former_log_pos ) {
+ // Space for control blocks
+ allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE;
+ former_log_pos = entry->log_entry_index;
+ }
+ if (entry->is_write_entry()) {
+ cached_bytes += entry->write_bytes();
+ // space for userdata
+ allocated_bytes += entry->get_aligned_data_size();
+ }
+ }
+ bool need_update_state = false;
+ {
+ std::lock_guard locker(m_lock);
+ m_first_valid_entry = first_valid_entry;
+ ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ ceph_assert(this->m_bytes_allocated >= allocated_bytes);
+ this->m_bytes_allocated -= allocated_bytes;
+ ceph_assert(this->m_bytes_cached >= cached_bytes);
+ this->m_bytes_cached -= cached_bytes;
+ if (!m_cache_state->empty && m_log_entries.empty()) {
+ m_cache_state->empty = true;
+ this->update_image_cache_state();
+ need_update_state = true;
+ }
+
+ ldout(m_image_ctx.cct, 20)
+ << "Finished root update: " << "initial_first_valid_entry="
+ << initial_first_valid_entry << ", " << "m_first_valid_entry="
+ << m_first_valid_entry << "," << "release space = "
+ << allocated_bytes << "," << "m_bytes_allocated="
+ << m_bytes_allocated << "," << "release cached space="
+ << cached_bytes << "," << "m_bytes_cached="
+ << this->m_bytes_cached << dendl;
+
+ this->m_alloc_failed_since_retire = false;
+ this->wake_up();
+ }
+ if (need_update_state) {
+ std::unique_lock locker(m_lock);
+ this->write_image_cache_state(locker);
+ }
+
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+ m_async_update_superblock--;
+ this->m_async_op_tracker.finish_op();
+ });
+
+ std::lock_guard locker(m_lock);
+ schedule_update_root(new_root, ctx);
+ } else {
+ ldout(cct, 20) << "Nothing to retire" << dendl;
+ return false;
+ }
+ return true;
+}
+
+template <typename I>
+void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
+ uint64_t* new_first_free_entry) {
+ GenericLogEntriesVector log_entries;
+ CephContext *cct = m_image_ctx.cct;
+ uint64_t span_payload_len = 0;
+ uint64_t bytes_to_free = 0;
+ ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;
+
+ *new_first_free_entry = pool_root.first_free_entry;
+ AioTransContext* aio = new AioTransContext(cct, ctx);
+
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ operation->log_append_start_time = now;
+ auto log_entry = operation->get_log_entry();
+
+ if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
+ span_payload_len >= SPAN_MAX_DATA_LEN) {
+ if (log_entries.size() > 1) {
+ bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
+ }
+ write_log_entries(log_entries, aio, new_first_free_entry);
+ log_entries.clear();
+ span_payload_len = 0;
+ }
+ log_entries.push_back(log_entry);
+ span_payload_len += log_entry->write_bytes();
+ }
+ if (!span_payload_len || !log_entries.empty()) {
+ if (log_entries.size() > 1) {
+ bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
+ }
+ write_log_entries(log_entries, aio, new_first_free_entry);
+ }
+
+ {
+ std::lock_guard locker1(m_lock);
+ m_first_free_entry = *new_first_free_entry;
+ m_bytes_allocated -= bytes_to_free;
+ }
+
+ bdev->aio_submit(&aio->ioc);
+}
+
+template <typename I>
+void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
+ AioTransContext *aio, uint64_t *pos) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
+ ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
+ *pos < this->m_log_pool_size &&
+ *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+
+ // The first block is for log entries
+ uint64_t control_block_pos = *pos;
+ *pos += MIN_WRITE_ALLOC_SSD_SIZE;
+ if (*pos == this->m_log_pool_size) {
+ *pos = DATA_RING_BUFFER_OFFSET;
+ }
+
+ std::vector<WriteLogCacheEntry> persist_log_entries;
+ bufferlist data_bl;
+ for (auto &log_entry : log_entries) {
+ log_entry->log_entry_index = control_block_pos;
+ // Append data buffer for write operations
+ if (log_entry->is_write_entry()) {
+ auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+ auto cache_bl = write_entry->get_cache_bl();
+ auto align_size = write_entry->get_aligned_data_size();
+ data_bl.append(cache_bl);
+ data_bl.append_zero(align_size - cache_bl.length());
+
+ write_entry->ram_entry.write_data_pos = *pos;
+ *pos += align_size;
+ if (*pos >= this->m_log_pool_size) {
+ *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
+ }
+ }
+ // push_back _after_ setting write_data_pos
+ persist_log_entries.push_back(log_entry->ram_entry);
+ }
+
+ //aio write
+ bufferlist bl;
+ encode(persist_log_entries, bl);
+ ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE);
+ bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
+ bl.append(data_bl);
+ ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ if (control_block_pos + bl.length() > this->m_log_pool_size) {
+ //exceeds border, need to split
+ uint64_t size = bl.length();
+ bufferlist bl1;
+ bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
+ ceph_assert(bl.length() == (size - bl1.length()));
+
+ ldout(cct, 20) << "write " << control_block_pos << "~"
+ << size << " spans boundary, split into "
+ << control_block_pos << "~" << bl1.length()
+ << " and " << DATA_RING_BUFFER_OFFSET << "~"
+ << bl.length() << dendl;
+ bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
+ WRITE_LIFE_NOT_SET);
+ bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
+ WRITE_LIFE_NOT_SET);
+ } else {
+ ldout(cct, 20) << "write " << control_block_pos << "~"
+ << bl.length() << dendl;
+ bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
+ WRITE_LIFE_NOT_SET);
+ }
+}
+
+template <typename I>
+void WriteLog<I>::schedule_update_root(
+ std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 15) << "New root: pool_size=" << root->pool_size
+ << " first_valid_entry=" << root->first_valid_entry
+ << " first_free_entry=" << root->first_free_entry
+ << " flushed_sync_gen=" << root->flushed_sync_gen
+ << dendl;
+ ceph_assert(is_valid_pool_root(*root));
+
+ bool need_finisher;
+ {
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root;
+ std::shared_ptr<WriteLogPoolRootUpdate> entry =
+ std::make_shared<WriteLogPoolRootUpdate>(root, ctx);
+ this->m_async_update_superblock++;
+ this->m_async_op_tracker.start_op();
+ m_poolroot_to_update.emplace_back(entry);
+ }
+ if (need_finisher) {
+ enlist_op_update_root();
+ }
+}
+
+template <typename I>
+void WriteLog<I>::enlist_op_update_root() {
+ Context *append_ctx = new LambdaContext([this](int r) {
+ update_root_scheduled_ops();
+ });
+ this->m_work_queue.queue(append_ctx);
+}
+
+template <typename I>
+void WriteLog<I>::update_root_scheduled_ops() {
+ ldout(m_image_ctx.cct, 20) << dendl;
+
+ std::shared_ptr<WriteLogPoolRoot> root;
+ WriteLogPoolRootUpdateList root_updates;
+ Context *ctx = nullptr;
+ {
+ std::lock_guard locker(m_lock);
+ if (m_updating_pool_root) {
+ /* Another thread is appending */
+ ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root"
+ << dendl;
+ return;
+ }
+ if (m_poolroot_to_update.size()) {
+ m_updating_pool_root = true;
+ root_updates.swap(m_poolroot_to_update);
+ }
+ }
+ ceph_assert(!root_updates.empty());
+ ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size()
+ << dendl;
+ // We just update the last one, and call all the completions.
+ auto entry = root_updates.back();
+ root = entry->root;
+
+ ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) {
+ ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl;
+ for (auto it = updates.begin(); it != updates.end(); it++) {
+ Context *it_ctx = (*it)->ctx;
+ it_ctx->complete(r);
+ }
+ });
+ Context *append_ctx = new LambdaContext([this, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
+ bool need_finisher = false;;
+ assert(r == 0);
+ {
+ std::lock_guard locker(m_lock);
+ m_updating_pool_root = false;
+ need_finisher = !m_poolroot_to_update.empty();
+ }
+ if (need_finisher) {
+ enlist_op_update_root();
+ }
+ ctx->complete(r);
+ });
+ AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx);
+ update_pool_root(root, aio);
+}
+
+template <typename I>
+void WriteLog<I>::update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
+ AioTransContext *aio) {
+ bufferlist bl;
+ SuperBlock superblock;
+ superblock.root = *root;
+ encode(superblock, bl);
+ bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
+ ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET);
+ bdev->aio_submit(&aio->ioc);
+}
+
+template <typename I>
+int WriteLog<I>::update_pool_root_sync(
+ std::shared_ptr<WriteLogPoolRoot> root) {
+ bufferlist bl;
+ SuperBlock superblock;
+ superblock.root = *root;
+ encode(superblock, bl);
+ bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
+ ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
+ return bdev->write(0, bl, false);
+}
+
+template <typename I>
+void WriteLog<I>::aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
+ bufferlist *bl, Context *ctx) {
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries = {std::move(log_entry)};
+ std::vector<bufferlist *> bls {bl};
+ aio_read_data_blocks(log_entries, bls, ctx);
+}
+
+template <typename I>
+void WriteLog<I>::aio_read_data_blocks(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
+ std::vector<bufferlist *> &bls, Context *ctx) {
+ ceph_assert(log_entries.size() == bls.size());
+
+ //get the valid part
+ Context *read_ctx = new LambdaContext(
+ [log_entries, bls, ctx](int r) {
+ for (unsigned int i = 0; i < log_entries.size(); i++) {
+ bufferlist valid_data_bl;
+ auto write_entry = static_pointer_cast<WriteLogEntry>(log_entries[i]);
+ auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes
+ : write_entry->ram_entry.ws_datalen;
+
+ valid_data_bl.substr_of(*bls[i], 0, length);
+ bls[i]->clear();
+ bls[i]->append(valid_data_bl);
+ write_entry->dec_bl_refs();
+ }
+ ctx->complete(r);
+ });
+
+ CephContext *cct = m_image_ctx.cct;
+ AioTransContext *aio = new AioTransContext(cct, read_ctx);
+ for (unsigned int i = 0; i < log_entries.size(); i++) {
+ WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry;
+
+ ceph_assert(log_entry->is_write() || log_entry->is_writesame());
+ uint64_t len = log_entry->is_write() ? log_entry->write_bytes :
+ log_entry->ws_datalen;
+ uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE);
+
+ ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos
+ << "~" << len << dendl;
+ ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET &&
+ log_entry->write_data_pos < pool_root.pool_size);
+ ceph_assert(align_len);
+ if (log_entry->write_data_pos + align_len > pool_root.pool_size) {
+ // spans boundary, need to split
+ uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos;
+ uint64_t len2 = align_len - len1;
+
+ ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
+ << align_len << " spans boundary, split into "
+ << log_entry->write_data_pos << "~" << len1
+ << " and " << DATA_RING_BUFFER_OFFSET << "~"
+ << len2 << dendl;
+ bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc);
+ bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc);
+ } else {
+ ldout(cct, 20) << "read " << log_entry->write_data_pos << "~"
+ << align_len << dendl;
+ bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc);
+ }
+ }
+ bdev->aio_submit(&aio->ioc);
+}
+
+template <typename I>
+void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
+ m_image_ctx.op_work_queue->queue(user_req, r);
+}
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;
diff --git a/src/librbd/cache/pwl/ssd/WriteLog.h b/src/librbd/cache/pwl/ssd/WriteLog.h
new file mode 100644
index 000000000..69cc36662
--- /dev/null
+++ b/src/librbd/cache/pwl/ssd/WriteLog.h
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG
+#define CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG
+
+#include "blk/BlockDevice.h"
+#include "common/AsyncOpTracker.h"
+#include "common/Checksummer.h"
+#include "common/environment.h"
+#include "common/RWLock.h"
+#include "common/WorkQueue.h"
+#include "librbd/BlockGuard.h"
+#include "librbd/Utils.h"
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/cache/Types.h"
+#include "librbd/cache/pwl/AbstractWriteLog.h"
+#include "librbd/cache/pwl/LogMap.h"
+#include "librbd/cache/pwl/LogOperation.h"
+#include "librbd/cache/pwl/Request.h"
+#include "librbd/cache/pwl/ssd/Builder.h"
+#include "librbd/cache/pwl/ssd/Types.h"
+#include <functional>
+#include <list>
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace cache {
+namespace pwl {
+namespace ssd {
+
+template <typename ImageCtxT>
+class WriteLog : public AbstractWriteLog<ImageCtxT> {
+public:
+ WriteLog(ImageCtxT &image_ctx,
+ librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state,
+ cache::ImageWritebackInterface& image_writeback,
+ plugin::Api<ImageCtxT>& plugin_api);
+ ~WriteLog();
+ WriteLog(const WriteLog&) = delete;
+ WriteLog &operator=(const WriteLog&) = delete;
+
+ typedef io::Extent Extent;
+ using This = AbstractWriteLog<ImageCtxT>;
+ using C_BlockIORequestT = pwl::C_BlockIORequest<This>;
+ using C_WriteRequestT = pwl::C_WriteRequest<This>;
+ using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
+
+ bool alloc_resources(C_BlockIORequestT *req) override;
+ void setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush,
+ C_BlockIORequestT *req) override;
+ void complete_user_request(Context *&user_req, int r) override;
+
+protected:
+ using AbstractWriteLog<ImageCtxT>::m_lock;
+ using AbstractWriteLog<ImageCtxT>::m_log_entries;
+ using AbstractWriteLog<ImageCtxT>::m_image_ctx;
+ using AbstractWriteLog<ImageCtxT>::m_cache_state;
+ using AbstractWriteLog<ImageCtxT>::m_first_free_entry;
+ using AbstractWriteLog<ImageCtxT>::m_first_valid_entry;
+ using AbstractWriteLog<ImageCtxT>::m_bytes_allocated;
+
+ bool initialize_pool(Context *on_finish,
+ pwl::DeferredContexts &later) override;
+ void process_work() override;
+ void append_scheduled_ops(void) override;
+ void schedule_append_ops(pwl::GenericLogOperations &ops, C_BlockIORequestT *req) override;
+ void remove_pool_file() override;
+ void release_ram(std::shared_ptr<GenericLogEntry> log_entry) override;
+
+private:
+ class AioTransContext {
+ public:
+ Context *on_finish;
+ ::IOContext ioc;
+ explicit AioTransContext(CephContext* cct, Context *cb)
+ : on_finish(cb), ioc(cct, this) {}
+
+ ~AioTransContext(){}
+
+ void aio_finish() {
+ on_finish->complete(ioc.get_return_value());
+ delete this;
+ }
+ }; //class AioTransContext
+
+ struct WriteLogPoolRootUpdate {
+ std::shared_ptr<pwl::WriteLogPoolRoot> root;
+ Context *ctx;
+ WriteLogPoolRootUpdate(std::shared_ptr<pwl::WriteLogPoolRoot> r,
+ Context* c)
+ : root(r), ctx(c) {}
+ };
+
+ using WriteLogPoolRootUpdateList = std::list<std::shared_ptr<WriteLogPoolRootUpdate>>;
+ WriteLogPoolRootUpdateList m_poolroot_to_update; /* pool root list to update to SSD */
+ bool m_updating_pool_root = false;
+
+ std::atomic<int> m_async_update_superblock = {0};
+ BlockDevice *bdev = nullptr;
+ pwl::WriteLogPoolRoot pool_root;
+ Builder<This> *m_builderobj;
+
+ Builder<This>* create_builder();
+ int create_and_open_bdev();
+ void load_existing_entries(pwl::DeferredContexts &later);
+ void inc_allocated_cached_bytes(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+ void collect_read_extents(
+ uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
+ Extent hit_extent, pwl::C_ReadRequest *read_ctx) override;
+ void complete_read(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, Context *ctx) override;
+ void enlist_op_appender();
+ bool retire_entries(const unsigned long int frees_per_tx);
+ bool has_sync_point_logs(GenericLogOperations &ops);
+ void append_op_log_entries(GenericLogOperations &ops);
+ void alloc_op_log_entries(GenericLogOperations &ops);
+ void construct_flush_entries(pwl::GenericLogEntries entires_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) override;
+ void append_ops(GenericLogOperations &ops, Context *ctx,
+ uint64_t* new_first_free_entry);
+ void write_log_entries(GenericLogEntriesVector log_entries,
+ AioTransContext *aio, uint64_t *pos);
+ void schedule_update_root(std::shared_ptr<WriteLogPoolRoot> root,
+ Context *ctx);
+ void enlist_op_update_root();
+ void update_root_scheduled_ops();
+ int update_pool_root_sync(std::shared_ptr<pwl::WriteLogPoolRoot> root);
+ void update_pool_root(std::shared_ptr<WriteLogPoolRoot> root,
+ AioTransContext *aio);
+ void aio_read_data_block(std::shared_ptr<GenericWriteLogEntry> log_entry,
+ bufferlist *bl, Context *ctx);
+ void aio_read_data_blocks(std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries,
+ std::vector<bufferlist *> &bls, Context *ctx);
+ static void aio_cache_cb(void *priv, void *priv2) {
+ AioTransContext *c = static_cast<AioTransContext*>(priv2);
+ c->aio_finish();
+ }
+};//class WriteLog
+
+} // namespace ssd
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+extern template class librbd::cache::pwl::ssd::WriteLog<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG