summaryrefslogtreecommitdiffstats
path: root/src/librbd/cache/pwl/rwl
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/cache/pwl/rwl/Builder.h107
-rw-r--r--src/librbd/cache/pwl/rwl/LogEntry.cc106
-rw-r--r--src/librbd/cache/pwl/rwl/LogEntry.h68
-rw-r--r--src/librbd/cache/pwl/rwl/LogOperation.cc39
-rw-r--r--src/librbd/cache/pwl/rwl/LogOperation.h55
-rw-r--r--src/librbd/cache/pwl/rwl/ReadRequest.cc70
-rw-r--r--src/librbd/cache/pwl/rwl/ReadRequest.h34
-rw-r--r--src/librbd/cache/pwl/rwl/Request.cc86
-rw-r--r--src/librbd/cache/pwl/rwl/Request.h90
-rw-r--r--src/librbd/cache/pwl/rwl/WriteLog.cc1011
-rw-r--r--src/librbd/cache/pwl/rwl/WriteLog.h124
11 files changed, 1790 insertions, 0 deletions
diff --git a/src/librbd/cache/pwl/rwl/Builder.h b/src/librbd/cache/pwl/rwl/Builder.h
new file mode 100644
index 000000000..c13c7b5ae
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/Builder.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_RWL_BUILDER_H
+#define CEPH_LIBRBD_CACHE_PWL_RWL_BUILDER_H
+
+#include <iostream>
+#include "LogEntry.h"
+#include "ReadRequest.h"
+#include "Request.h"
+#include "LogOperation.h"
+
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/cache/pwl/Builder.h"
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+template <typename T>
+class Builder : public pwl::Builder<T> {
+public:
+ std::shared_ptr<pwl::WriteLogEntry> create_write_log_entry(
+ uint64_t image_offset_bytes, uint64_t write_bytes) override {
+ return std::make_shared<WriteLogEntry>(image_offset_bytes, write_bytes);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_write_log_entry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes) override {
+ return std::make_shared<WriteLogEntry>(
+ sync_point_entry, image_offset_bytes, write_bytes);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_writesame_log_entry(
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length) override {
+ return std::make_shared<WriteSameLogEntry>(
+ image_offset_bytes, write_bytes, data_length);
+ }
+ std::shared_ptr<pwl::WriteLogEntry> create_writesame_log_entry(
+ std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length) override {
+ return std::make_shared<WriteSameLogEntry>(
+ sync_point_entry, image_offset_bytes, write_bytes, data_length);
+ }
+ pwl::C_WriteRequest<T> *create_write_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req);
+ }
+ pwl::C_WriteSameRequest<T> *create_writesame_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new C_WriteSameRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req);
+ }
+ pwl::C_WriteRequest<T> *create_comp_and_write_request(
+ T &pwl, utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req) override {
+ return new rwl::C_CompAndWriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset, fadvise_flags,
+ lock, perfcounter, user_req);
+ }
+ std::shared_ptr<pwl::WriteLogOperation> create_write_log_operation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> write_log_entry) {
+ return std::make_shared<WriteLogOperation>(
+ set, image_offset_bytes, write_bytes, cct, write_log_entry);
+ }
+ std::shared_ptr<pwl::WriteLogOperation> create_write_log_operation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t data_len, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> writesame_log_entry) {
+ return std::make_shared<WriteLogOperation>(
+ set, image_offset_bytes, write_bytes, data_len, cct,
+ writesame_log_entry);
+ }
+ std::shared_ptr<pwl::DiscardLogOperation> create_discard_log_operation(
+ std::shared_ptr<SyncPoint> sync_point, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t discard_granularity_bytes,
+ utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct) {
+ return std::make_shared<DiscardLogOperation>(
+ sync_point, image_offset_bytes, write_bytes, discard_granularity_bytes,
+ dispatch_time, perfcounter, cct);
+ }
+ C_ReadRequest *create_read_request(CephContext *cct, utime_t arrived,
+ PerfCounters *perfcounter, ceph::bufferlist *bl, Context *on_finish) {
+ return new C_ReadRequest(cct, arrived, perfcounter, bl, on_finish);
+ }
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_RWL_BUILDER_H
diff --git a/src/librbd/cache/pwl/rwl/LogEntry.cc b/src/librbd/cache/pwl/rwl/LogEntry.cc
new file mode 100644
index 000000000..38e09c22a
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/LogEntry.cc
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/cache/ImageWriteback.h"
+#include "LogEntry.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::rwl::WriteLogEntry: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+void WriteLogEntry::writeback(
+ librbd::cache::ImageWritebackInterface &image_writeback, Context *ctx) {
+ /* Pass a copy of the pmem buffer to ImageWriteback (which may hang on to the
+ * bl even after flush()). */
+ bufferlist entry_bl;
+ buffer::list entry_bl_copy;
+ copy_cache_bl(&entry_bl_copy);
+ entry_bl_copy.begin(0).copy(write_bytes(), entry_bl);
+ image_writeback.aio_write({{ram_entry.image_offset_bytes,
+ ram_entry.write_bytes}},
+ std::move(entry_bl), 0, ctx);
+}
+
+void WriteLogEntry::init_cache_bp() {
+ ceph_assert(!this->cache_bp.have_raw());
+ cache_bp = buffer::ptr(buffer::create_static(this->write_bytes(),
+ (char*)this->cache_buffer));
+}
+
+void WriteLogEntry::init_bl(buffer::ptr &bp, buffer::list &bl) {
+ if(!is_writesame) {
+ bl.append(bp);
+ return;
+ }
+ for (uint64_t i = 0; i < ram_entry.write_bytes / ram_entry.ws_datalen; i++) {
+ bl.append(bp);
+ }
+ int trailing_partial = ram_entry.write_bytes % ram_entry.ws_datalen;
+ if (trailing_partial) {
+ bl.append(bp, 0, trailing_partial);
+ }
+}
+
+void WriteLogEntry::init_cache_buffer(
+ std::vector<WriteBufferAllocation>::iterator allocation) {
+ this->ram_entry.write_data = allocation->buffer_oid;
+ ceph_assert(!TOID_IS_NULL(this->ram_entry.write_data));
+ cache_buffer = D_RW(this->ram_entry.write_data);
+}
+
+buffer::list& WriteLogEntry::get_cache_bl() {
+ if (0 == bl_refs) {
+ std::lock_guard locker(m_entry_bl_lock);
+ if (0 == bl_refs) {
+ //init pmem bufferlist
+ cache_bl.clear();
+ init_cache_bp();
+ ceph_assert(cache_bp.have_raw());
+ int before_bl = cache_bp.raw_nref();
+ this->init_bl(cache_bp, cache_bl);
+ int after_bl = cache_bp.raw_nref();
+ bl_refs = after_bl - before_bl;
+ }
+ ceph_assert(0 != bl_refs);
+ }
+ return cache_bl;
+}
+
+void WriteLogEntry::copy_cache_bl(bufferlist *out_bl) {
+ this->get_cache_bl();
+ // cache_bp is now initialized
+ ceph_assert(cache_bp.length() == cache_bp.raw_length());
+ buffer::ptr cloned_bp = cache_bp.begin_deep().get_ptr(cache_bp.length());
+ out_bl->clear();
+ this->init_bl(cloned_bp, *out_bl);
+}
+
+unsigned int WriteLogEntry::reader_count() const {
+ if (cache_bp.have_raw()) {
+ return (cache_bp.raw_nref() - bl_refs - 1);
+ } else {
+ return 0;
+ }
+}
+
+void WriteSameLogEntry::writeback(
+ librbd::cache::ImageWritebackInterface &image_writeback, Context *ctx) {
+ bufferlist entry_bl;
+ buffer::list entry_bl_copy;
+ copy_cache_bl(&entry_bl_copy);
+ entry_bl_copy.begin(0).copy(write_bytes(), entry_bl);
+ image_writeback.aio_writesame(ram_entry.image_offset_bytes,
+ ram_entry.write_bytes,
+ std::move(entry_bl), 0, ctx);
+}
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/rwl/LogEntry.h b/src/librbd/cache/pwl/rwl/LogEntry.h
new file mode 100644
index 000000000..a4675c5fb
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/LogEntry.h
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_RWL_LOG_ENTRY_H
+#define CEPH_LIBRBD_CACHE_PWL_RWL_LOG_ENTRY_H
+
+#include "librbd/cache/pwl/LogEntry.h"
+
+namespace librbd {
+namespace cache {
+class ImageWritebackInterface;
+namespace pwl {
+namespace rwl {
+
+class WriteLogEntry : public pwl::WriteLogEntry {
+public:
+ WriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes)
+ : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) {}
+ WriteLogEntry(uint64_t image_offset_bytes, uint64_t write_bytes)
+ : pwl::WriteLogEntry(image_offset_bytes, write_bytes) {}
+ WriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes,
+ data_length) {}
+ WriteLogEntry(uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : pwl::WriteLogEntry(image_offset_bytes, write_bytes, data_length) {}
+ ~WriteLogEntry() {}
+ WriteLogEntry(const WriteLogEntry&) = delete;
+ WriteLogEntry &operator=(const WriteLogEntry&) = delete;
+
+ void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx) override;
+ void init_cache_bp() override;
+ void init_bl(buffer::ptr &bp, buffer::list &bl) override;
+ void init_cache_buffer(
+ std::vector<WriteBufferAllocation>::iterator allocation) override;
+ buffer::list &get_cache_bl() override;
+ void copy_cache_bl(bufferlist *out_bl) override;
+ unsigned int reader_count() const override;
+};
+
+class WriteSameLogEntry : public WriteLogEntry {
+public:
+ WriteSameLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+ uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : WriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes,
+ data_length) {}
+ WriteSameLogEntry(uint64_t image_offset_bytes, uint64_t write_bytes,
+ uint32_t data_length)
+ : WriteLogEntry(image_offset_bytes, write_bytes, data_length) {}
+ ~WriteSameLogEntry() {}
+ WriteSameLogEntry(const WriteSameLogEntry&) = delete;
+ WriteSameLogEntry &operator=(const WriteSameLogEntry&) = delete;
+
+ void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+ Context *ctx) override;
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_RWL_LOG_ENTRY_H
diff --git a/src/librbd/cache/pwl/rwl/LogOperation.cc b/src/librbd/cache/pwl/rwl/LogOperation.cc
new file mode 100644
index 000000000..53fb917b2
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/LogOperation.cc
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "LogOperation.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::rwl::LogOperation: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+void WriteLogOperation::copy_bl_to_cache_buffer(
+ std::vector<WriteBufferAllocation>::iterator allocation) {
+ /* operation is a shared_ptr, so write_op is only good as long as operation is
+ * in scope */
+ bufferlist::iterator i(&bl);
+ m_perfcounter->inc(l_librbd_pwl_log_op_bytes, log_entry->write_bytes());
+ ldout(m_cct, 20) << bl << dendl;
+ log_entry->init_cache_buffer(allocation);
+ i.copy((unsigned)log_entry->write_bytes(), (char*)log_entry->cache_buffer);
+}
+
+void DiscardLogOperation::init_op(
+ uint64_t current_sync_gen, bool persist_on_flush,
+ uint64_t last_op_sequence_num, Context *write_persist,
+ Context *write_append) {
+ log_entry->init(current_sync_gen, persist_on_flush, last_op_sequence_num);
+ this->on_write_append = write_append;
+ this->on_write_persist = write_persist;
+}
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/rwl/LogOperation.h b/src/librbd/cache/pwl/rwl/LogOperation.h
new file mode 100644
index 000000000..874ac77fb
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/LogOperation.h
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_RWL_LOG_OPERATION_H
+#define CEPH_LIBRBD_CACHE_PWL_RWL_LOG_OPERATION_H
+
+#include "librbd/cache/pwl/LogOperation.h"
+
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+class WriteLogOperation : public pwl::WriteLogOperation {
+public:
+ WriteLogOperation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> write_log_entry)
+ : pwl::WriteLogOperation(set, image_offset_bytes, write_bytes, cct,
+ write_log_entry) {}
+
+ WriteLogOperation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t data_len, CephContext *cct,
+ std::shared_ptr<pwl::WriteLogEntry> writesame_log_entry)
+ : pwl::WriteLogOperation(set, image_offset_bytes, write_bytes, cct,
+ writesame_log_entry) {}
+
+ void copy_bl_to_cache_buffer(
+ std::vector<WriteBufferAllocation>::iterator allocation) override;
+};
+
+class DiscardLogOperation : public pwl::DiscardLogOperation {
+public:
+ DiscardLogOperation(
+ std::shared_ptr<SyncPoint> sync_point, uint64_t image_offset_bytes,
+ uint64_t write_bytes, uint32_t discard_granularity_bytes,
+ utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct)
+ : pwl::DiscardLogOperation(sync_point, image_offset_bytes, write_bytes,
+ discard_granularity_bytes, dispatch_time,
+ perfcounter, cct) {}
+ void init_op(
+ uint64_t current_sync_gen, bool persist_on_flush,
+ uint64_t last_op_sequence_num, Context *write_persist,
+ Context *write_append) override;
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_RWL_LOG_OPERATION_H
diff --git a/src/librbd/cache/pwl/rwl/ReadRequest.cc b/src/librbd/cache/pwl/rwl/ReadRequest.cc
new file mode 100644
index 000000000..f91f8e5a7
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/ReadRequest.cc
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ReadRequest.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::rwl::ReadRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+void C_ReadRequest::finish(int r) {
+ ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << dendl;
+ int hits = 0;
+ int misses = 0;
+ int hit_bytes = 0;
+ int miss_bytes = 0;
+ if (r >= 0) {
+ /*
+ * At this point the miss read has completed. We'll iterate through
+ * read_extents and produce *m_out_bl by assembling pieces of miss_bl
+ * and the individual hit extent bufs in the read extents that represent
+ * hits.
+ */
+ uint64_t miss_bl_offset = 0;
+ for (auto extent : read_extents) {
+ if (extent->m_bl.length()) {
+ /* This was a hit */
+ ceph_assert(extent->second == extent->m_bl.length());
+ ++hits;
+ hit_bytes += extent->second;
+ m_out_bl->claim_append(extent->m_bl);
+ } else {
+ /* This was a miss. */
+ ++misses;
+ miss_bytes += extent->second;
+ bufferlist miss_extent_bl;
+ miss_extent_bl.substr_of(miss_bl, miss_bl_offset, extent->second);
+ /* Add this read miss bufferlist to the output bufferlist */
+ m_out_bl->claim_append(miss_extent_bl);
+ /* Consume these bytes in the read miss bufferlist */
+ miss_bl_offset += extent->second;
+ }
+ }
+ }
+ ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << " bl=" << *m_out_bl << dendl;
+ utime_t now = ceph_clock_now();
+ ceph_assert((int)m_out_bl->length() == hit_bytes + miss_bytes);
+ m_on_finish->complete(r);
+ m_perfcounter->inc(l_librbd_pwl_rd_bytes, hit_bytes + miss_bytes);
+ m_perfcounter->inc(l_librbd_pwl_rd_hit_bytes, hit_bytes);
+ m_perfcounter->tinc(l_librbd_pwl_rd_latency, now - m_arrived_time);
+ if (!misses) {
+ m_perfcounter->inc(l_librbd_pwl_rd_hit_req, 1);
+ m_perfcounter->tinc(l_librbd_pwl_rd_hit_latency, now - m_arrived_time);
+ } else {
+ if (hits) {
+ m_perfcounter->inc(l_librbd_pwl_rd_part_hit_req, 1);
+ }
+ }
+}
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
diff --git a/src/librbd/cache/pwl/rwl/ReadRequest.h b/src/librbd/cache/pwl/rwl/ReadRequest.h
new file mode 100644
index 000000000..25168e83b
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/ReadRequest.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PWL_RWL_READ_REQUEST_H
+#define CEPH_LIBRBD_CACHE_PWL_RWL_READ_REQUEST_H
+
+#include "librbd/cache/pwl/ReadRequest.h"
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+typedef std::vector<pwl::ImageExtentBuf> ImageExtentBufs;
+
+class C_ReadRequest : public pwl::C_ReadRequest {
+protected:
+ using pwl::C_ReadRequest::m_cct;
+ using pwl::C_ReadRequest::m_on_finish;
+ using pwl::C_ReadRequest::m_out_bl;
+ using pwl::C_ReadRequest::m_arrived_time;
+ using pwl::C_ReadRequest::m_perfcounter;
+public:
+ C_ReadRequest(CephContext *cct, utime_t arrived, PerfCounters *perfcounter, bufferlist *out_bl, Context *on_finish)
+ : pwl::C_ReadRequest(cct, arrived, perfcounter, out_bl, on_finish) {}
+ void finish(int r) override;
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_PWL_RWL_READ_REQUEST_H
diff --git a/src/librbd/cache/pwl/rwl/Request.cc b/src/librbd/cache/pwl/rwl/Request.cc
new file mode 100644
index 000000000..a6b81d55b
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/Request.cc
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Request.h"
+#include "librbd/cache/pwl/AbstractWriteLog.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::rwl::Request: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+template <typename T>
+void C_WriteRequest<T>::setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
+ uint64_t *number_lanes, uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) {
+
+ ceph_assert(!this->m_resources.allocated);
+
+ auto image_extents_size = this->image_extents.size();
+ this->m_resources.buffers.reserve(image_extents_size);
+
+ *bytes_cached = 0;
+ *bytes_allocated = 0;
+ *number_lanes = image_extents_size;
+ *number_log_entries = image_extents_size;
+ *number_unpublished_reserves = image_extents_size;
+
+ for (auto &extent : this->image_extents) {
+ this->m_resources.buffers.emplace_back();
+ struct WriteBufferAllocation &buffer = this->m_resources.buffers.back();
+ buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
+ buffer.allocated = false;
+ *bytes_cached += extent.second;
+ if (extent.second > buffer.allocation_size) {
+ buffer.allocation_size = extent.second;
+ }
+ *bytes_allocated += buffer.allocation_size;
+ }
+ *bytes_dirtied = *bytes_cached;
+}
+
+template <typename T>
+std::ostream &operator<<(std::ostream &os,
+ const C_CompAndWriteRequest<T> &req) {
+ os << (C_WriteRequest<T>&)req
+ << " cmp_bl=" << req.cmp_bl
+ << ", read_bl=" << req.read_bl
+ << ", compare_succeeded=" << req.compare_succeeded
+ << ", mismatch_offset=" << req.mismatch_offset;
+ return os;
+}
+
+template <typename T>
+void C_WriteSameRequest<T>::setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
+ uint64_t *number_lanes, uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) {
+ ceph_assert(this->image_extents.size() == 1);
+ *number_log_entries = 1;
+ *bytes_dirtied += this->image_extents[0].second;
+ auto pattern_length = this->bl.length();
+ this->m_resources.buffers.emplace_back();
+ struct WriteBufferAllocation &buffer = this->m_resources.buffers.back();
+ buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
+ buffer.allocated = false;
+ *bytes_cached += pattern_length;
+ if (pattern_length > buffer.allocation_size) {
+ buffer.allocation_size = pattern_length;
+ }
+ *bytes_allocated += buffer.allocation_size;
+}
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::pwl::rwl::C_WriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
+template class librbd::cache::pwl::rwl::C_WriteSameRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
+template class librbd::cache::pwl::rwl::C_CompAndWriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
diff --git a/src/librbd/cache/pwl/rwl/Request.h b/src/librbd/cache/pwl/rwl/Request.h
new file mode 100644
index 000000000..0a5c610d6
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/Request.h
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_RWL_REQUEST_H
+#define CEPH_LIBRBD_CACHE_RWL_REQUEST_H
+
+#include "librbd/cache/pwl/Request.h"
+
+namespace librbd {
+class BlockGuardCell;
+
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+template <typename T>
+class C_WriteRequest : public pwl::C_WriteRequest<T> {
+public:
+ C_WriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset, fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ C_WriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl),
+ fadvise_flags, lock, perfcounter, user_req) {}
+protected:
+ //Plain writes will allocate one buffer per request extent
+ void setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied,
+ uint64_t *bytes_allocated, uint64_t *number_lanes,
+ uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) override;
+};
+
+template <typename T>
+class C_CompAndWriteRequest : public C_WriteRequest<T> {
+public:
+ C_CompAndWriteRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
+ const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : C_WriteRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(cmp_bl),
+ std::move(bl), mismatch_offset, fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ const char *get_name() const override {
+ return "C_CompAndWriteRequest";
+ }
+ template <typename U>
+ friend std::ostream &operator<<(std::ostream &os,
+ const C_CompAndWriteRequest<U> &req);
+};
+
+template <typename T>
+class C_WriteSameRequest : public pwl::C_WriteSameRequest<T> {
+public:
+ C_WriteSameRequest(
+ T &pwl, const utime_t arrived, io::Extents &&image_extents,
+ bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+ PerfCounters *perfcounter, Context *user_req)
+ : pwl::C_WriteSameRequest<T>(
+ pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags,
+ lock, perfcounter, user_req) {}
+
+ void setup_buffer_resources(
+ uint64_t *bytes_cached, uint64_t *bytes_dirtied,
+ uint64_t *bytes_allocated, uint64_t *number_lanes,
+ uint64_t *number_log_entries,
+ uint64_t *number_unpublished_reserves) override;
+
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H
diff --git a/src/librbd/cache/pwl/rwl/WriteLog.cc b/src/librbd/cache/pwl/rwl/WriteLog.cc
new file mode 100644
index 000000000..e922ba543
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/WriteLog.cc
@@ -0,0 +1,1011 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "WriteLog.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/ceph_assert.h"
+#include "common/deleter.h"
+#include "common/dout.h"
+#include "common/environment.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "common/Timer.h"
+#include "common/perf_counters.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/cache/pwl/ImageCacheState.h"
+#include "librbd/cache/pwl/LogEntry.h"
+#include "librbd/plugin/Api.h"
+#include <map>
+#include <vector>
+
+#undef dout_subsys
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::rwl::WriteLog: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+using namespace std;
+using namespace librbd::cache::pwl;
+namespace rwl {
+
+const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
+
+template <typename I>
+Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
+ m_builderobj = new Builder<This>();
+ return m_builderobj;
+}
+
+template <typename I>
+WriteLog<I>::WriteLog(
+ I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
+ ImageWritebackInterface& image_writeback,
+ plugin::Api<I>& plugin_api)
+: AbstractWriteLog<I>(image_ctx, cache_state, create_builder(), image_writeback,
+ plugin_api),
+ m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl))
+{
+}
+
+template <typename I>
+WriteLog<I>::~WriteLog() {
+ m_log_pool = nullptr;
+ delete m_builderobj;
+}
+
+template <typename I>
+void WriteLog<I>::collect_read_extents(
+ uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
+ Extent hit_extent, pwl::C_ReadRequest *read_ctx) {
+ /* Make a bl for this hit extent. This will add references to the
+ * write_entry->pmem_bp */
+ buffer::list hit_bl;
+
+ /* Create buffer object referring to pmem pool for this read hit */
+ auto write_entry = map_entry.log_entry;
+
+ buffer::list entry_bl_copy;
+ write_entry->copy_cache_bl(&entry_bl_copy);
+ entry_bl_copy.begin(read_buffer_offset).copy(entry_hit_length, hit_bl);
+ ceph_assert(hit_bl.length() == entry_hit_length);
+
+ /* Add hit extent to read extents */
+ auto hit_extent_buf = std::make_shared<ImageExtentBuf>(hit_extent, hit_bl);
+ read_ctx->read_extents.push_back(hit_extent_buf);
+}
+
+template <typename I>
+void WriteLog<I>::complete_read(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, Context *ctx) {
+ ctx->complete(0);
+}
+
+/*
+ * Allocate the (already reserved) write log entries for a set of operations.
+ *
+ * Locking:
+ * Acquires lock
+ */
+template <typename I>
+void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+
+ ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+
+ /* Allocate the (already reserved) log entries */
+ std::unique_lock locker(m_lock);
+
+ for (auto &operation : ops) {
+ uint32_t entry_index = this->m_first_free_entry;
+ this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
+ auto &log_entry = operation->get_log_entry();
+ log_entry->log_entry_index = entry_index;
+ log_entry->ram_entry.entry_index = entry_index;
+ log_entry->cache_entry = &pmem_log_entries[entry_index];
+ log_entry->ram_entry.set_entry_valid(true);
+ m_log_entries.push_back(log_entry);
+ ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+ }
+ if (m_cache_state->empty && !m_log_entries.empty()) {
+ m_cache_state->empty = false;
+ this->update_image_cache_state();
+ this->write_image_cache_state(locker);
+ }
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+int WriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
+{
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogOperationsVector entries_to_flush;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ int ret = 0;
+
+ ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+
+ if (ops.empty()) {
+ return 0;
+ }
+ entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
+
+ /* Write log entries to ring and persist */
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (!entries_to_flush.empty()) {
+ /* Flush these and reset the list if the current entry wraps to the
+ * tail of the ring */
+ if (entries_to_flush.back()->get_log_entry()->log_entry_index >
+ operation->get_log_entry()->log_entry_index) {
+ ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+ << "operation=[" << *operation << "]" << dendl;
+ flush_op_log_entries(entries_to_flush);
+ entries_to_flush.clear();
+ now = ceph_clock_now();
+ }
+ }
+ ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+ << operation->get_log_entry()->log_entry_index
+ << " from " << &operation->get_log_entry()->ram_entry
+ << " to " << operation->get_log_entry()->cache_entry
+ << " operation=[" << *operation << "]" << dendl;
+ operation->log_append_start_time = now;
+ *operation->get_log_entry()->cache_entry = operation->get_log_entry()->ram_entry;
+ ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+ << operation->get_log_entry()->log_entry_index
+ << " pmem_entry=[" << *operation->get_log_entry()->cache_entry
+ << "]" << dendl;
+ entries_to_flush.push_back(operation);
+ }
+ flush_op_log_entries(entries_to_flush);
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ /*
+ * Atomically advance the log head pointer and publish the
+ * allocations for all the data buffers they refer to.
+ */
+ utime_t tx_start = ceph_clock_now();
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
+ pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit " << ops.size()
+ << " log entries (" << this->m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ ret = -EIO;
+ } TX_FINALLY {
+ } TX_END;
+
+ utime_t tx_end = ceph_clock_now();
+ m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(
+ l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+ for (auto &operation : ops) {
+ operation->log_append_comp_time = tx_end;
+ }
+
+ return ret;
+}
+
+/*
+ * Flush the persistent write log entries set of ops. The entries must
+ * be contiguous in persistent memory.
+ */
+template <typename I>
+void WriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
+{
+ if (ops.empty()) {
+ return;
+ }
+
+ if (ops.size() > 1) {
+ ceph_assert(ops.front()->get_log_entry()->cache_entry < ops.back()->get_log_entry()->cache_entry);
+ }
+
+ ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size()
+ << " start address="
+ << ops.front()->get_log_entry()->cache_entry
+ << " bytes="
+ << ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry))
+ << dendl;
+ pmemobj_flush(m_log_pool,
+ ops.front()->get_log_entry()->cache_entry,
+ ops.size() * sizeof(*(ops.front()->get_log_entry()->cache_entry)));
+}
+
+template <typename I>
+void WriteLog<I>::remove_pool_file() {
+ if (m_log_pool) {
+ ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+ pmemobj_close(m_log_pool);
+ }
+ if (m_cache_state->clean) {
+ ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
+ if (remove(this->m_log_pool_name.c_str()) != 0) {
+ lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
+ << pmemobj_errormsg() << dendl;
+ } else {
+ m_cache_state->present = false;
+ }
+ } else {
+ ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
+ }
+}
+
+template <typename I>
+bool WriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
+ CephContext *cct = m_image_ctx.cct;
+ int r = -EINVAL;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+ if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
+ if ((m_log_pool =
+ pmemobj_create(this->m_log_pool_name.c_str(),
+ this->m_pwl_pool_layout_name,
+ this->m_log_pool_size,
+ (S_IWUSR | S_IRUSR))) == NULL) {
+ lderr(cct) << "failed to create pool: " << this->m_log_pool_name
+ << ". error: " << pmemobj_errormsg() << dendl;
+ m_cache_state->present = false;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ /* TODO: filter/replace errnos that are meaningless to the caller */
+ on_finish->complete(-errno);
+ return false;
+ }
+ m_cache_state->present = true;
+ m_cache_state->clean = true;
+ m_cache_state->empty = true;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+ /* new pool, calculate and store metadata */
+ size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
+ size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogCacheEntry);
+ uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
+ if (num_small_writes > MAX_LOG_ENTRIES) {
+ num_small_writes = MAX_LOG_ENTRIES;
+ }
+ if (num_small_writes <= 2) {
+ lderr(cct) << "num_small_writes needs to > 2" << dendl;
+ goto err_close_pool;
+ }
+ this->m_bytes_allocated_cap = effective_pool_size;
+ /* Log ring empty */
+ m_first_free_entry = 0;
+ m_first_valid_entry = 0;
+ TX_BEGIN(m_log_pool) {
+ TX_ADD(pool_root);
+ D_RW(pool_root)->header.layout_version = RWL_LAYOUT_VERSION;
+ D_RW(pool_root)->log_entries =
+ TX_ZALLOC(struct WriteLogCacheEntry,
+ sizeof(struct WriteLogCacheEntry) * num_small_writes);
+ D_RW(pool_root)->pool_size = this->m_log_pool_size;
+ D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
+ D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
+ D_RW(pool_root)->num_log_entries = num_small_writes;
+ D_RW(pool_root)->first_free_entry = m_first_free_entry;
+ D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
+ } TX_ONCOMMIT {
+ this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+ this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
+ } TX_ONABORT {
+ this->m_total_log_entries = 0;
+ this->m_free_log_entries = 0;
+ lderr(cct) << "failed to initialize pool: " << this->m_log_pool_name
+ << ". pmemobj TX errno: " << pmemobj_tx_errno() << dendl;
+ r = -pmemobj_tx_errno();
+ goto err_close_pool;
+ } TX_FINALLY {
+ } TX_END;
+ } else {
+ ceph_assert(m_cache_state->present);
+ /* Open existing pool */
+ if ((m_log_pool =
+ pmemobj_open(this->m_log_pool_name.c_str(),
+ this->m_pwl_pool_layout_name)) == NULL) {
+ lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
+ << pmemobj_errormsg() << dendl;
+ on_finish->complete(-errno);
+ return false;
+ }
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ if (D_RO(pool_root)->header.layout_version != RWL_LAYOUT_VERSION) {
+ // TODO: will handle upgrading version in the future
+ lderr(cct) << "pool layout version is "
+ << D_RO(pool_root)->header.layout_version
+ << " expected " << RWL_LAYOUT_VERSION << dendl;
+ goto err_close_pool;
+ }
+ if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
+ lderr(cct) << "pool block size is " << D_RO(pool_root)->block_size
+ << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
+ goto err_close_pool;
+ }
+ this->m_log_pool_size = D_RO(pool_root)->pool_size;
+ this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
+ this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+ m_first_free_entry = D_RO(pool_root)->first_free_entry;
+ m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
+ if (m_first_free_entry < m_first_valid_entry) {
+ /* Valid entries wrap around the end of the ring, so first_free is lower
+ * than first_valid. If first_valid was == first_free+1, the entry at
+ * first_free would be empty. The last entry is never used, so in
+ * that case there would be zero free log entries. */
+ this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
+ } else {
+ /* first_valid is <= first_free. If they are == we have zero valid log
+ * entries, and n-1 free log entries */
+ this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
+ }
+ size_t effective_pool_size = (size_t)(this->m_log_pool_size * USABLE_SIZE);
+ this->m_bytes_allocated_cap = effective_pool_size;
+ load_existing_entries(later);
+ m_cache_state->clean = this->m_dirty_log_entries.empty();
+ m_cache_state->empty = m_log_entries.empty();
+ }
+ return true;
+
+err_close_pool:
+ pmemobj_close(m_log_pool);
+ on_finish->complete(r);
+ return false;
+}
+
+/*
+ * Loads the log entries from an existing log.
+ *
+ * Creates the in-memory structures to represent the state of the
+ * re-opened log.
+ *
+ * Finds the last appended sync point, and any sync points referred to
+ * in log entries, but missing from the log. These missing sync points
+ * are created and scheduled for append. Some rudimentary consistency
+ * checking is done.
+ *
+ * Rebuilds the m_blocks_to_log_entries map, to make log entries
+ * readable.
+ *
+ * Places all writes on the dirty entries list, which causes them all
+ * to be flushed.
+ *
+ */
+
+template <typename I>
+void WriteLog<I>::load_existing_entries(DeferredContexts &later) {
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogCacheEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+ uint64_t entry_index = m_first_valid_entry;
+ /* The map below allows us to find sync point log entries by sync
+ * gen number, which is necessary so write entries can be linked to
+ * their sync points. */
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+ /* The map below tracks sync points referred to in writes but not
+ * appearing in the sync_point_entries map. We'll use this to
+ * determine which sync points are missing and need to be
+ * created. */
+ std::map<uint64_t, bool> missing_sync_points;
+
+ /*
+ * Read the existing log entries. Construct an in-memory log entry
+ * object of the appropriate type for each. Add these to the global
+ * log entries list.
+ *
+ * Write entries will not link to their sync points yet. We'll do
+ * that in the next pass. Here we'll accumulate a map of sync point
+ * gen numbers that are referred to in writes but do not appearing in
+ * the log.
+ */
+ while (entry_index != m_first_free_entry) {
+ WriteLogCacheEntry *pmem_entry = &pmem_log_entries[entry_index];
+ std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+ ceph_assert(pmem_entry->entry_index == entry_index);
+
+ this->update_entries(&log_entry, pmem_entry, missing_sync_points,
+ sync_point_entries, entry_index);
+
+ log_entry->ram_entry = *pmem_entry;
+ log_entry->cache_entry = pmem_entry;
+ log_entry->log_entry_index = entry_index;
+ log_entry->completed = true;
+
+ m_log_entries.push_back(log_entry);
+
+ entry_index = (entry_index + 1) % this->m_total_log_entries;
+ }
+
+ this->update_sync_points(missing_sync_points, sync_point_entries, later);
+}
+
+template <typename I>
+void WriteLog<I>::inc_allocated_cached_bytes(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) {
+ if (log_entry->is_write_entry()) {
+ this->m_bytes_allocated += std::max(log_entry->write_bytes(), MIN_WRITE_ALLOC_SIZE);
+ this->m_bytes_cached += log_entry->write_bytes();
+ }
+}
+
+template <typename I>
+void WriteLog<I>::write_data_to_buffer(
+ std::shared_ptr<pwl::WriteLogEntry> ws_entry,
+ WriteLogCacheEntry *pmem_entry) {
+ ws_entry->cache_buffer = D_RW(pmem_entry->write_data);
+}
+
+/**
+ * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
+ * that are eligible to be retired. Returns true if anything was
+ * retired.
+ */
+template <typename I>
+bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+ CephContext *cct = m_image_ctx.cct;
+ GenericLogEntriesVector retiring_entries;
+ uint32_t initial_first_valid_entry;
+ uint32_t first_valid_entry;
+
+ std::lock_guard retire_locker(this->m_log_retire_lock);
+ ldout(cct, 20) << "Look for entries to retire" << dendl;
+ {
+ /* Entry readers can't be added while we hold m_entry_reader_lock */
+ RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
+ std::lock_guard locker(m_lock);
+ initial_first_valid_entry = this->m_first_valid_entry;
+ first_valid_entry = this->m_first_valid_entry;
+ while (!m_log_entries.empty() && retiring_entries.size() < frees_per_tx &&
+ this->can_retire_entry(m_log_entries.front())) {
+ auto entry = m_log_entries.front();
+ if (entry->log_entry_index != first_valid_entry) {
+ lderr(cct) << "retiring entry index (" << entry->log_entry_index
+ << ") and first valid log entry index (" << first_valid_entry
+ << ") must be ==." << dendl;
+ }
+ ceph_assert(entry->log_entry_index == first_valid_entry);
+ first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
+ m_log_entries.pop_front();
+ retiring_entries.push_back(entry);
+ /* Remove entry from map so there will be no more readers */
+ if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
+ auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
+ if (gen_write_entry) {
+ this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+ }
+ }
+ }
+ }
+
+ if (retiring_entries.size()) {
+ ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+ utime_t tx_start;
+ utime_t tx_end;
+ /* Advance first valid entry and release buffers */
+ {
+ uint64_t flushed_sync_gen;
+ std::lock_guard append_locker(this->m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = this->m_flushed_sync_gen;
+ }
+
+ tx_start = ceph_clock_now();
+ TX_BEGIN(m_log_pool) {
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ }
+ D_RW(pool_root)->first_valid_entry = first_valid_entry;
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
+ << "." << entry->ram_entry.write_data.oid.off << dendl;
+ TX_FREE(entry->ram_entry.write_data);
+ } else {
+ ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
+ }
+ }
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(cct) << "failed to commit free of" << retiring_entries.size()
+ << " log entries (" << this->m_log_pool_name << ")" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ tx_end = ceph_clock_now();
+ }
+ m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
+ m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
+ retiring_entries.size());
+
+ bool need_update_state = false;
+ /* Update runtime copy of first_valid, and free entries counts */
+ {
+ std::lock_guard locker(m_lock);
+
+ ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
+ this->m_first_valid_entry = first_valid_entry;
+ this->m_free_log_entries += retiring_entries.size();
+ if (!m_cache_state->empty && m_log_entries.empty()) {
+ m_cache_state->empty = true;
+ this->update_image_cache_state();
+ need_update_state = true;
+ }
+ for (auto &entry: retiring_entries) {
+ if (entry->write_bytes()) {
+ ceph_assert(this->m_bytes_cached >= entry->write_bytes());
+ this->m_bytes_cached -= entry->write_bytes();
+ uint64_t entry_allocation_size = entry->write_bytes();
+ if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
+ entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
+ }
+ ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
+ this->m_bytes_allocated -= entry_allocation_size;
+ }
+ }
+ this->m_alloc_failed_since_retire = false;
+ this->wake_up();
+ }
+ if (need_update_state) {
+ std::unique_lock locker(m_lock);
+ this->write_image_cache_state(locker);
+ }
+ } else {
+ ldout(cct, 20) << "Nothing to retire" << dendl;
+ return false;
+ }
+ return true;
+}
+
+template <typename I>
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) {
+ bool invalidating = this->m_invalidating; // snapshot so we behave consistently
+
+ for (auto &log_entry : entries_to_flush) {
+ GuardedRequestFunctionContext *guarded_ctx =
+ new GuardedRequestFunctionContext([this, log_entry, invalidating]
+ (GuardedRequestFunctionContext &guard_ctx) {
+ log_entry->m_cell = guard_ctx.cell;
+ Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+ if (!invalidating) {
+ ctx = new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext(
+ [this, log_entry, ctx](int r) {
+ ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+ << " " << *log_entry << dendl;
+ log_entry->writeback(this->m_image_writeback, ctx);
+ }), 0);
+ });
+ }
+
+ ctx->complete(0);
+ });
+ this->detain_flush_guard_request(log_entry, guarded_ctx);
+ }
+}
+
+const unsigned long int ops_flushed_together = 4;
+/*
+ * Performs the pmem buffer flush on all scheduled ops, then schedules
+ * the log event append operation for all of them.
+ */
+template <typename I>
+void WriteLog<I>::flush_then_append_scheduled_ops(void)
+{
+ GenericLogOperations ops;
+ bool ops_remain = false;
+ ldout(m_image_ctx.cct, 20) << dendl;
+ do {
+ {
+ ops.clear();
+ std::lock_guard locker(m_lock);
+ if (m_ops_to_flush.size()) {
+ auto last_in_batch = m_ops_to_flush.begin();
+ unsigned int ops_to_flush = m_ops_to_flush.size();
+ if (ops_to_flush > ops_flushed_together) {
+ ops_to_flush = ops_flushed_together;
+ }
+ ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
+ std::advance(last_in_batch, ops_to_flush);
+ ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
+ ops_remain = !m_ops_to_flush.empty();
+ ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", remain "
+ << m_ops_to_flush.size() << dendl;
+ } else {
+ ops_remain = false;
+ }
+ }
+ if (ops_remain) {
+ enlist_op_flusher();
+ }
+
+ /* Ops subsequently scheduled for flush may finish before these,
+ * which is fine. We're unconcerned with completion order until we
+ * get to the log message append step. */
+ if (ops.size()) {
+ flush_pmem_buffer(ops);
+ schedule_append_ops(ops, nullptr);
+ }
+ } while (ops_remain);
+ append_scheduled_ops();
+}
+
+/*
+ * Performs the log event append operation for all of the scheduled
+ * events.
+ */
+template <typename I>
+void WriteLog<I>::append_scheduled_ops(void) {
+ GenericLogOperations ops;
+ int append_result = 0;
+ bool ops_remain = false;
+ bool appending = false; /* true if we set m_appending */
+ ldout(m_image_ctx.cct, 20) << dendl;
+ do {
+ ops.clear();
+ this->append_scheduled(ops, ops_remain, appending, true);
+
+ if (ops.size()) {
+ std::lock_guard locker(this->m_log_append_lock);
+ alloc_op_log_entries(ops);
+ append_result = append_op_log_entries(ops);
+ }
+
+ int num_ops = ops.size();
+ if (num_ops) {
+ /* New entries may be flushable. Completion will wake up flusher. */
+ this->complete_op_log_entries(std::move(ops), append_result);
+ }
+ } while (ops_remain);
+}
+
+template <typename I>
+void WriteLog<I>::enlist_op_flusher()
+{
+ this->m_async_flush_ops++;
+ this->m_async_op_tracker.start_op();
+ Context *flush_ctx = new LambdaContext([this](int r) {
+ flush_then_append_scheduled_ops();
+ this->m_async_flush_ops--;
+ this->m_async_op_tracker.finish_op();
+ });
+ this->m_work_queue.queue(flush_ctx);
+}
+
+template <typename I>
+void WriteLog<I>::setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush,
+ C_BlockIORequestT *req) {
+ if (do_early_flush) {
+ /* This caller is waiting for persist, so we'll use their thread to
+ * expedite it */
+ flush_pmem_buffer(ops);
+ this->schedule_append(ops);
+ } else {
+ /* This is probably not still the caller's thread, so do the payload
+ * flushing/replicating later. */
+ schedule_flush_and_append(ops);
+ }
+}
+
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template <typename I>
+void WriteLog<I>::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req)
+{
+ bool need_finisher;
+ GenericLogOperationsVector appending;
+
+ std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
+ this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
+ }
+
+ if (need_finisher) {
+ //enlist op appender
+ this->m_async_append_ops++;
+ this->m_async_op_tracker.start_op();
+ Context *append_ctx = new LambdaContext([this](int r) {
+ append_scheduled_ops();
+ this->m_async_append_ops--;
+ this->m_async_op_tracker.finish_op();
+ });
+ this->m_work_queue.queue(append_ctx);
+ }
+
+ for (auto &op : appending) {
+ op->appending();
+ }
+}
+
+/*
+ * Takes custody of ops. They'll all get their pmem blocks flushed,
+ * then get their log entries appended.
+ */
+template <typename I>
+void WriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
+{
+ GenericLogOperations to_flush(ops.begin(), ops.end());
+ bool need_finisher;
+ ldout(m_image_ctx.cct, 20) << dendl;
+ {
+ std::lock_guard locker(m_lock);
+
+ need_finisher = m_ops_to_flush.empty();
+ m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
+ }
+
+ if (need_finisher) {
+ enlist_op_flusher();
+ }
+}
+
+template <typename I>
+void WriteLog<I>::process_work() {
+ CephContext *cct = m_image_ctx.cct;
+ int max_iterations = 4;
+ bool wake_up_requested = false;
+ uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+ uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
+ uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
+ uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
+ uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;
+
+ ldout(cct, 20) << dendl;
+
+ do {
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_requested = false;
+ }
+ if (this->m_alloc_failed_since_retire || this->m_invalidating ||
+ this->m_bytes_allocated > high_water_bytes ||
+ (m_log_entries.size() > high_water_entries)) {
+ int retired = 0;
+ utime_t started = ceph_clock_now();
+ ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
+ << ", allocated > high_water="
+ << (this->m_bytes_allocated > high_water_bytes)
+ << ", allocated_entries > high_water="
+ << (m_log_entries.size() > high_water_entries)
+ << dendl;
+ while (this->m_alloc_failed_since_retire || this->m_invalidating ||
+ (this->m_bytes_allocated > high_water_bytes) ||
+ (m_log_entries.size() > high_water_entries) ||
+ (((this->m_bytes_allocated > low_water_bytes) ||
+ (m_log_entries.size() > low_water_entries)) &&
+ (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
+ if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
+ (this->m_bytes_allocated > aggressive_high_water_bytes) ||
+ (m_log_entries.size() > aggressive_high_water_entries) ||
+ this->m_alloc_failed_since_retire)
+ ? MAX_ALLOC_PER_TRANSACTION
+ : MAX_FREE_PER_TRANSACTION)) {
+ break;
+ }
+ retired++;
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+ }
+ ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
+ }
+ this->dispatch_deferred_writes();
+ this->process_writeback_dirty_entries();
+
+ {
+ std::lock_guard locker(m_lock);
+ wake_up_requested = this->m_wake_up_requested;
+ }
+ } while (wake_up_requested && --max_iterations > 0);
+
+ {
+ std::lock_guard locker(m_lock);
+ this->m_wake_up_scheduled = false;
+ /* Reschedule if it's still requested */
+ if (this->m_wake_up_requested) {
+ this->wake_up();
+ }
+ }
+}
+
+/*
+ * Flush the pmem regions for the data blocks of a set of operations
+ *
+ * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
+ */
+template <typename I>
+template <typename V>
+void WriteLog<I>::flush_pmem_buffer(V& ops)
+{
+ utime_t now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ operation->buf_persist_start_time = now;
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
+ << *operation << dendl;
+ }
+ }
+
+ for (auto &operation : ops) {
+ if(operation->is_writing_op()) {
+ auto log_entry = static_pointer_cast<WriteLogEntry>(operation->get_log_entry());
+ pmemobj_flush(m_log_pool, log_entry->cache_buffer, log_entry->write_bytes());
+ }
+ }
+
+ /* Drain once for all */
+ pmemobj_drain(m_log_pool);
+
+ now = ceph_clock_now();
+ for (auto &operation : ops) {
+ if (operation->reserved_allocated()) {
+ operation->buf_persist_comp_time = now;
+ } else {
+ ldout(m_image_ctx.cct, 20) << "skipping non-write op: "
+ << *operation << dendl;
+ }
+ }
+}
+
+/**
+ * Update/persist the last flushed sync point in the log
+ */
+template <typename I>
+void WriteLog<I>::persist_last_flushed_sync_gen()
+{
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ uint64_t flushed_sync_gen;
+
+ std::lock_guard append_locker(this->m_log_append_lock);
+ {
+ std::lock_guard locker(m_lock);
+ flushed_sync_gen = this->m_flushed_sync_gen;
+ }
+
+ if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+ ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
+ << D_RO(pool_root)->flushed_sync_gen << " to "
+ << flushed_sync_gen << dendl;
+ TX_BEGIN(m_log_pool) {
+ D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+ } TX_ONCOMMIT {
+ } TX_ONABORT {
+ lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
+ ceph_assert(false);
+ } TX_FINALLY {
+ } TX_END;
+ }
+}
+
+template <typename I>
+void WriteLog<I>::reserve_cache(C_BlockIORequestT *req,
+ bool &alloc_succeeds, bool &no_space) {
+ std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+ for (auto &buffer : buffers) {
+ utime_t before_reserve = ceph_clock_now();
+ buffer.buffer_oid = pmemobj_reserve(m_log_pool,
+ &buffer.buffer_alloc_action,
+ buffer.allocation_size,
+ 0 /* Object type */);
+ buffer.allocation_lat = ceph_clock_now() - before_reserve;
+ if (TOID_IS_NULL(buffer.buffer_oid)) {
+ ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+ << pmemobj_errormsg() << ". "
+ << *req << dendl;
+ alloc_succeeds = false;
+ no_space = true; /* Entries need to be retired */
+
+ if (this->m_free_log_entries == this->m_total_log_entries - 1) {
+ /* When the cache is empty, there is still no space to allocate.
+ * Defragment. */
+ pmemobj_defrag(m_log_pool, NULL, 0, NULL);
+ }
+ break;
+ } else {
+ buffer.allocated = true;
+ }
+ ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+ << "." << buffer.buffer_oid.oid.off
+ << ", size=" << buffer.allocation_size << dendl;
+ }
+}
+
+template<typename I>
+void WriteLog<I>::copy_bl_to_buffer(
+ WriteRequestResources *resources, std::unique_ptr<WriteLogOperationSet> &op_set) {
+ auto allocation = resources->buffers.begin();
+ for (auto &operation : op_set->operations) {
+ operation->copy_bl_to_cache_buffer(allocation);
+ allocation++;
+ }
+}
+
+template <typename I>
+bool WriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+ bool alloc_succeeds = true;
+ uint64_t bytes_allocated = 0;
+ uint64_t bytes_cached = 0;
+ uint64_t bytes_dirtied = 0;
+ uint64_t num_lanes = 0;
+ uint64_t num_unpublished_reserves = 0;
+ uint64_t num_log_entries = 0;
+
+ ldout(m_image_ctx.cct, 20) << dendl;
+ // Setup buffer, and get all the number of required resources
+ req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated,
+ &num_lanes, &num_log_entries, &num_unpublished_reserves);
+
+ alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied,
+ bytes_allocated, num_lanes, num_log_entries,
+ num_unpublished_reserves);
+
+ std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+ if (!alloc_succeeds) {
+ /* On alloc failure, free any buffers we did allocate */
+ for (auto &buffer : buffers) {
+ if (buffer.allocated) {
+ pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
+ }
+ }
+ }
+
+ req->set_allocated(alloc_succeeds);
+ return alloc_succeeds;
+}
+
+template <typename I>
+void WriteLog<I>::complete_user_request(Context *&user_req, int r) {
+ user_req->complete(r);
+ // Set user_req as null as it is deleted
+ user_req = nullptr;
+}
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::pwl::rwl::WriteLog<librbd::ImageCtx>;
diff --git a/src/librbd/cache/pwl/rwl/WriteLog.h b/src/librbd/cache/pwl/rwl/WriteLog.h
new file mode 100644
index 000000000..5083a2568
--- /dev/null
+++ b/src/librbd/cache/pwl/rwl/WriteLog.h
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_REPLICATED_WRITE_LOG
+#define CEPH_LIBRBD_CACHE_REPLICATED_WRITE_LOG
+
+#include <functional>
+#include <libpmemobj.h>
+#include <list>
+#include "common/Timer.h"
+#include "common/RWLock.h"
+#include "common/WorkQueue.h"
+#include "common/AsyncOpTracker.h"
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/Utils.h"
+#include "librbd/BlockGuard.h"
+#include "librbd/cache/Types.h"
+#include "librbd/cache/pwl/AbstractWriteLog.h"
+#include "librbd/cache/pwl/LogMap.h"
+#include "librbd/cache/pwl/LogOperation.h"
+#include "librbd/cache/pwl/Request.h"
+#include "librbd/cache/pwl/rwl/Builder.h"
+
+class Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace cache {
+namespace pwl {
+namespace rwl {
+
+template <typename ImageCtxT>
+class WriteLog : public AbstractWriteLog<ImageCtxT> {
+public:
+ WriteLog(
+ ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state,
+ ImageWritebackInterface& image_writeback,
+ plugin::Api<ImageCtxT>& plugin_api);
+ ~WriteLog();
+ WriteLog(const WriteLog&) = delete;
+ WriteLog &operator=(const WriteLog&) = delete;
+
+ typedef io::Extent Extent;
+ using This = AbstractWriteLog<ImageCtxT>;
+ using C_WriteRequestT = pwl::C_WriteRequest<This>;
+ using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
+
+ void copy_bl_to_buffer(
+ WriteRequestResources *resources, std::unique_ptr<WriteLogOperationSet> &op_set) override;
+ void complete_user_request(Context *&user_req, int r) override;
+private:
+ using C_BlockIORequestT = pwl::C_BlockIORequest<This>;
+ using C_FlushRequestT = pwl::C_FlushRequest<This>;
+ using C_DiscardRequestT = pwl::C_DiscardRequest<This>;
+
+ PMEMobjpool *m_log_pool = nullptr;
+ Builder<This> *m_builderobj;
+ const char* m_pwl_pool_layout_name;
+ const uint64_t MAX_EXTENT_SIZE = 1048576;
+
+ Builder<This>* create_builder();
+ void remove_pool_file();
+ void load_existing_entries(pwl::DeferredContexts &later);
+ void alloc_op_log_entries(pwl::GenericLogOperations &ops);
+ int append_op_log_entries(pwl::GenericLogOperations &ops);
+ void flush_then_append_scheduled_ops(void);
+ void enlist_op_flusher();
+ void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
+ template <typename V>
+ void flush_pmem_buffer(V& ops);
+ void inc_allocated_cached_bytes(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+protected:
+ using AbstractWriteLog<ImageCtxT>::m_lock;
+ using AbstractWriteLog<ImageCtxT>::m_log_entries;
+ using AbstractWriteLog<ImageCtxT>::m_image_ctx;
+ using AbstractWriteLog<ImageCtxT>::m_perfcounter;
+ using AbstractWriteLog<ImageCtxT>::m_ops_to_flush;
+ using AbstractWriteLog<ImageCtxT>::m_cache_state;
+ using AbstractWriteLog<ImageCtxT>::m_first_free_entry;
+ using AbstractWriteLog<ImageCtxT>::m_first_valid_entry;
+
+ void process_work() override;
+ void schedule_append_ops(pwl::GenericLogOperations &ops, C_BlockIORequestT *req) override;
+ void append_scheduled_ops(void) override;
+ void reserve_cache(C_BlockIORequestT *req,
+ bool &alloc_succeeds, bool &no_space) override;
+ void collect_read_extents(
+ uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
+ Extent hit_extent, pwl::C_ReadRequest *read_ctx) override;
+ void complete_read(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, Context *ctx) override;
+ bool retire_entries(const unsigned long int frees_per_tx) override;
+ void persist_last_flushed_sync_gen() override;
+ bool alloc_resources(C_BlockIORequestT *req) override;
+ void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) override;
+ void setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush,
+ C_BlockIORequestT *req) override;
+ void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) override;
+ bool initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
+ void write_data_to_buffer(
+ std::shared_ptr<pwl::WriteLogEntry> ws_entry,
+ pwl::WriteLogCacheEntry *pmem_entry) override;
+ uint64_t get_max_extent() override {
+ return MAX_EXTENT_SIZE;
+ }
+};
+
+} // namespace rwl
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+extern template class librbd::cache::pwl::rwl::WriteLog<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_CACHE_REPLICATED_WRITE_LOG