From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/librbd/cache/pwl/ssd/Builder.h | 108 +++ src/librbd/cache/pwl/ssd/LogEntry.cc | 63 ++ src/librbd/cache/pwl/ssd/LogEntry.h | 75 ++ src/librbd/cache/pwl/ssd/LogOperation.cc | 36 + src/librbd/cache/pwl/ssd/LogOperation.h | 35 + src/librbd/cache/pwl/ssd/ReadRequest.cc | 92 +++ src/librbd/cache/pwl/ssd/ReadRequest.h | 34 + src/librbd/cache/pwl/ssd/Request.cc | 63 ++ src/librbd/cache/pwl/ssd/Request.h | 92 +++ src/librbd/cache/pwl/ssd/Types.h | 51 ++ src/librbd/cache/pwl/ssd/WriteLog.cc | 1158 ++++++++++++++++++++++++++++++ src/librbd/cache/pwl/ssd/WriteLog.h | 156 ++++ 12 files changed, 1963 insertions(+) create mode 100644 src/librbd/cache/pwl/ssd/Builder.h create mode 100644 src/librbd/cache/pwl/ssd/LogEntry.cc create mode 100644 src/librbd/cache/pwl/ssd/LogEntry.h create mode 100644 src/librbd/cache/pwl/ssd/LogOperation.cc create mode 100644 src/librbd/cache/pwl/ssd/LogOperation.h create mode 100644 src/librbd/cache/pwl/ssd/ReadRequest.cc create mode 100644 src/librbd/cache/pwl/ssd/ReadRequest.h create mode 100644 src/librbd/cache/pwl/ssd/Request.cc create mode 100644 src/librbd/cache/pwl/ssd/Request.h create mode 100644 src/librbd/cache/pwl/ssd/Types.h create mode 100644 src/librbd/cache/pwl/ssd/WriteLog.cc create mode 100644 src/librbd/cache/pwl/ssd/WriteLog.h (limited to 'src/librbd/cache/pwl/ssd') diff --git a/src/librbd/cache/pwl/ssd/Builder.h b/src/librbd/cache/pwl/ssd/Builder.h new file mode 100644 index 000000000..07b3fb869 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/Builder.h @@ -0,0 +1,108 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H +#define CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H + +#include +#include "LogEntry.h" +#include "ReadRequest.h" +#include "Request.h" +#include "LogOperation.h" + +#include "librbd/cache/ImageWriteback.h" +#include "librbd/cache/pwl/Builder.h" + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +template +class Builder : public pwl::Builder { +public: + std::shared_ptr create_write_log_entry( + uint64_t image_offset_bytes, uint64_t write_bytes) override { + return std::make_shared(image_offset_bytes, write_bytes); + } + std::shared_ptr create_write_log_entry( + std::shared_ptr sync_point_entry, + uint64_t image_offset_bytes, uint64_t write_bytes) override { + return std::make_shared( + sync_point_entry, image_offset_bytes, write_bytes); + } + std::shared_ptr create_writesame_log_entry( + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) override { + return std::make_shared( + image_offset_bytes, write_bytes, data_length); + } + std::shared_ptr create_writesame_log_entry( + std::shared_ptr sync_point_entry, + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) override { + return std::make_shared( + sync_point_entry, image_offset_bytes, write_bytes, data_length); + } + pwl::C_WriteRequest *create_write_request( + T &pwl, utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) override { + return new C_WriteRequest( + pwl, arrived, std::move(image_extents), std::move(bl), + fadvise_flags, lock, perfcounter, user_req); + } + pwl::C_WriteSameRequest *create_writesame_request( + T &pwl, utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) override { + return new C_WriteSameRequest( + pwl, arrived, std::move(image_extents), std::move(bl), + fadvise_flags, lock, perfcounter, user_req); + } + pwl::C_WriteRequest *create_comp_and_write_request( + T &pwl, utime_t arrived, io::Extents &&image_extents, + bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset, + const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) override { + return new C_CompAndWriteRequest( + pwl, arrived, std::move(image_extents), std::move(cmp_bl), + std::move(bl), mismatch_offset, fadvise_flags, + lock, perfcounter, user_req); + } + std::shared_ptr create_write_log_operation( + WriteLogOperationSet &set, uint64_t image_offset_bytes, + uint64_t write_bytes, CephContext *cct, + std::shared_ptr write_log_entry) { + return std::make_shared( + set, image_offset_bytes, write_bytes, cct, write_log_entry); + } + std::shared_ptr create_write_log_operation( + WriteLogOperationSet &set, uint64_t image_offset_bytes, + uint64_t write_bytes, uint32_t data_len, CephContext *cct, + std::shared_ptr writesame_log_entry) { + return std::make_shared( + set, image_offset_bytes, write_bytes, data_len, cct, + writesame_log_entry); + } + std::shared_ptr create_discard_log_operation( + std::shared_ptr sync_point, uint64_t image_offset_bytes, + uint64_t write_bytes, uint32_t discard_granularity_bytes, + utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct) { + return std::make_shared( + sync_point, image_offset_bytes, write_bytes, discard_granularity_bytes, + dispatch_time, perfcounter, cct); + } + C_ReadRequest *create_read_request(CephContext *cct, utime_t arrived, + PerfCounters *perfcounter, ceph::bufferlist *bl, Context *on_finish) { + return new C_ReadRequest(cct, arrived, perfcounter, bl, on_finish); + } +}; + + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_PWL_SSD_BUILDER_H diff --git a/src/librbd/cache/pwl/ssd/LogEntry.cc b/src/librbd/cache/pwl/ssd/LogEntry.cc new file mode 100644 index 000000000..0e6edd87b --- /dev/null +++ b/src/librbd/cache/pwl/ssd/LogEntry.cc @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/cache/ImageWriteback.h" +#include "librbd/cache/pwl/ssd/LogEntry.h" + +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLogEntry: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +void WriteLogEntry::init_cache_bl( + bufferlist &src_bl, uint64_t off, uint64_t len) { + cache_bl.clear(); + cache_bl.substr_of(src_bl, off, len); +} + +buffer::list& WriteLogEntry::get_cache_bl() { + return cache_bl; +} + +void WriteLogEntry::copy_cache_bl(bufferlist *out) { + std::lock_guard locker(m_entry_bl_lock); + *out = cache_bl; +} + +void WriteLogEntry::remove_cache_bl() { + std::lock_guard locker(m_entry_bl_lock); + cache_bl.clear(); +} + +unsigned int WriteLogEntry::get_aligned_data_size() const { + if (cache_bl.length()) { + return round_up_to(cache_bl.length(), MIN_WRITE_ALLOC_SSD_SIZE); + } + return round_up_to(write_bytes(), MIN_WRITE_ALLOC_SSD_SIZE); +} + +void WriteLogEntry::writeback_bl( + librbd::cache::ImageWritebackInterface &image_writeback, + Context *ctx, ceph::bufferlist&& bl) { + image_writeback.aio_write({{ram_entry.image_offset_bytes, + ram_entry.write_bytes}}, + std::move(bl), 0, ctx); +} + +void WriteSameLogEntry::writeback_bl( + librbd::cache::ImageWritebackInterface &image_writeback, + Context *ctx, ceph::bufferlist &&bl) { + image_writeback.aio_writesame(ram_entry.image_offset_bytes, + ram_entry.write_bytes, + std::move(bl), 0, ctx); +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd diff --git a/src/librbd/cache/pwl/ssd/LogEntry.h b/src/librbd/cache/pwl/ssd/LogEntry.h new file mode 100644 index 000000000..8e26f661f --- /dev/null +++ b/src/librbd/cache/pwl/ssd/LogEntry.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// // vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H +#define CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H + +#include "librbd/cache/pwl/LogEntry.h" + +namespace librbd { +namespace cache { +class ImageWritebackInterface; +namespace pwl { +namespace ssd { + +class WriteLogEntry : public pwl::WriteLogEntry { +public: + WriteLogEntry( + std::shared_ptr sync_point_entry, + uint64_t image_offset_bytes, uint64_t write_bytes) + : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) {} + WriteLogEntry( + uint64_t image_offset_bytes, uint64_t write_bytes) + : pwl::WriteLogEntry(image_offset_bytes, write_bytes) {} + WriteLogEntry( + std::shared_ptr sync_point_entry, + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) + : pwl::WriteLogEntry(sync_point_entry, image_offset_bytes, + write_bytes, data_length) {} + WriteLogEntry( + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) + : pwl::WriteLogEntry(image_offset_bytes, write_bytes, data_length) {} + ~WriteLogEntry() {} + WriteLogEntry(const WriteLogEntry&) = delete; + WriteLogEntry &operator=(const WriteLogEntry&) = delete; + void writeback_bl(librbd::cache::ImageWritebackInterface &image_writeback, + Context *ctx, ceph::bufferlist &&bl) override; + void init_cache_bl(bufferlist &src_bl, uint64_t off, uint64_t len) override; + buffer::list &get_cache_bl() override; + void copy_cache_bl(bufferlist *out) override; + void remove_cache_bl() override; + unsigned int get_aligned_data_size() const override; + void inc_bl_refs() { bl_refs++; }; + void dec_bl_refs() { bl_refs--; }; + unsigned int reader_count() const override { + return bl_refs; + } +}; + +class WriteSameLogEntry : public WriteLogEntry { +public: + WriteSameLogEntry( + std::shared_ptr sync_point_entry, + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) + : WriteLogEntry(sync_point_entry, image_offset_bytes, + write_bytes, data_length) {} + WriteSameLogEntry( + uint64_t image_offset_bytes, uint64_t write_bytes, + uint32_t data_length) + : WriteLogEntry(image_offset_bytes, write_bytes, data_length) {} + ~WriteSameLogEntry() {} + WriteSameLogEntry(const WriteSameLogEntry&) = delete; + WriteSameLogEntry &operator=(const WriteSameLogEntry&) = delete; + void writeback_bl(librbd::cache::ImageWritebackInterface &image_writeback, + Context *ctx, ceph::bufferlist &&bl) override; +}; + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_PWL_SSD_LOG_ENTRY_H diff --git a/src/librbd/cache/pwl/ssd/LogOperation.cc b/src/librbd/cache/pwl/ssd/LogOperation.cc new file mode 100644 index 000000000..c8080e37d --- /dev/null +++ b/src/librbd/cache/pwl/ssd/LogOperation.cc @@ -0,0 +1,36 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "LogOperation.h" + +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::LogOperation: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +void DiscardLogOperation::init_op( + uint64_t current_sync_gen, bool persist_on_flush, + uint64_t last_op_sequence_num, Context *write_persist, + Context *write_append) { + log_entry->init(current_sync_gen, persist_on_flush, last_op_sequence_num); + if (persist_on_flush) { + this->on_write_append = new LambdaContext( + [write_persist, write_append] (int r) { + write_append->complete(r); + write_persist->complete(r); + }); + } else { + this->on_write_append = write_append; + this->on_write_persist = write_persist; + } +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd diff --git a/src/librbd/cache/pwl/ssd/LogOperation.h b/src/librbd/cache/pwl/ssd/LogOperation.h new file mode 100644 index 000000000..dbc89aa73 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/LogOperation.h @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H +#define CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H + +#include "librbd/cache/pwl/LogOperation.h" + + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +class DiscardLogOperation : public pwl::DiscardLogOperation { +public: + DiscardLogOperation( + std::shared_ptr sync_point, uint64_t image_offset_bytes, + uint64_t write_bytes, uint32_t discard_granularity_bytes, + utime_t dispatch_time, PerfCounters *perfcounter, CephContext *cct) + : pwl::DiscardLogOperation(sync_point, image_offset_bytes, write_bytes, + discard_granularity_bytes, dispatch_time, + perfcounter, cct) {} + void init_op( + uint64_t current_sync_gen, bool persist_on_flush, + uint64_t last_op_sequence_num, Context *write_persist, + Context *write_append) override; +}; + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_PWL_SSD_LOG_OPERATION_H diff --git a/src/librbd/cache/pwl/ssd/ReadRequest.cc b/src/librbd/cache/pwl/ssd/ReadRequest.cc new file mode 100644 index 000000000..1a80a8d8c --- /dev/null +++ b/src/librbd/cache/pwl/ssd/ReadRequest.cc @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ReadRequest.h" + +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::ReadRequest: " << this << " " \ + << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +void C_ReadRequest::finish(int r) { + ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << dendl; + int hits = 0; + int misses = 0; + int hit_bytes = 0; + int miss_bytes = 0; + if (r >= 0) { + /* + * At this point the miss read has completed. We'll iterate through + * m_read_extents and produce *m_out_bl by assembling pieces of m_miss_bl + * and the individual hit extent bufs in the read extents that represent + * hits. + */ + uint64_t miss_bl_offset = 0; + for (auto extent : read_extents) { + if (extent->m_bl.length()) { + /* This was a hit */ + bufferlist data_bl; + if (extent->writesame) { + int data_len = extent->m_bl.length(); + int read_buffer_offset = extent->truncate_offset; + if (extent->need_to_truncate && extent->truncate_offset >= data_len) { + read_buffer_offset = (extent->truncate_offset) % data_len; + } + // build data and truncate + bufferlist temp_bl; + uint64_t total_left_bytes = read_buffer_offset + extent->second; + while (total_left_bytes > 0) { + temp_bl.append(extent->m_bl); + total_left_bytes = total_left_bytes - data_len; + } + data_bl.substr_of(temp_bl, read_buffer_offset, extent->second); + m_out_bl->claim_append(data_bl); + } else if (extent->need_to_truncate) { + assert(extent->m_bl.length() >= extent->truncate_offset + extent->second); + data_bl.substr_of(extent->m_bl, extent->truncate_offset, extent->second); + m_out_bl->claim_append(data_bl); + } else { + assert(extent->second == extent->m_bl.length()); + m_out_bl->claim_append(extent->m_bl); + } + ++hits; + hit_bytes += extent->second; + } else { + /* This was a miss. */ + ++misses; + miss_bytes += extent->second; + bufferlist miss_extent_bl; + miss_extent_bl.substr_of(miss_bl, miss_bl_offset, extent->second); + /* Add this read miss bufferlist to the output bufferlist */ + m_out_bl->claim_append(miss_extent_bl); + /* Consume these bytes in the read miss bufferlist */ + miss_bl_offset += extent->second; + } + } + } + ldout(m_cct, 20) << "(" << get_name() << "): r=" << r << " bl=" << *m_out_bl << dendl; + utime_t now = ceph_clock_now(); + ceph_assert((int)m_out_bl->length() == hit_bytes + miss_bytes); + m_on_finish->complete(r); + m_perfcounter->inc(l_librbd_pwl_rd_bytes, hit_bytes + miss_bytes); + m_perfcounter->inc(l_librbd_pwl_rd_hit_bytes, hit_bytes); + m_perfcounter->tinc(l_librbd_pwl_rd_latency, now - m_arrived_time); + if (!misses) { + m_perfcounter->inc(l_librbd_pwl_rd_hit_req, 1); + m_perfcounter->tinc(l_librbd_pwl_rd_hit_latency, now - m_arrived_time); + } else { + if (hits) { + m_perfcounter->inc(l_librbd_pwl_rd_part_hit_req, 1); + } + } +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd diff --git a/src/librbd/cache/pwl/ssd/ReadRequest.h b/src/librbd/cache/pwl/ssd/ReadRequest.h new file mode 100644 index 000000000..345c4aa65 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/ReadRequest.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H +#define CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H + +#include "librbd/cache/pwl/ReadRequest.h" + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +typedef std::vector ImageExtentBufs; + +class C_ReadRequest : public pwl::C_ReadRequest { +protected: + using pwl::C_ReadRequest::m_cct; + using pwl::C_ReadRequest::m_on_finish; + using pwl::C_ReadRequest::m_out_bl; + using pwl::C_ReadRequest::m_arrived_time; + using pwl::C_ReadRequest::m_perfcounter; +public: + C_ReadRequest(CephContext *cct, utime_t arrived, PerfCounters *perfcounter, bufferlist *out_bl, Context *on_finish) + : pwl::C_ReadRequest(cct, arrived, perfcounter, out_bl, on_finish) {} + void finish(int r) override; +}; + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_PWL_SSD_READ_REQUEST_H diff --git a/src/librbd/cache/pwl/ssd/Request.cc b/src/librbd/cache/pwl/ssd/Request.cc new file mode 100644 index 000000000..e92e547c8 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/Request.cc @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Request.h" + +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::Request: " << this << " " \ + << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +template +void C_WriteRequest::setup_buffer_resources( + uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated, + uint64_t *number_lanes, uint64_t *number_log_entries, + uint64_t *number_unpublished_reserves) { + + *bytes_cached = 0; + *bytes_allocated = 0; + *number_log_entries = this->image_extents.size(); + + for (auto &extent : this->image_extents) { + *bytes_cached += extent.second; + *bytes_allocated += round_up_to(extent.second, MIN_WRITE_ALLOC_SSD_SIZE); + } + *bytes_dirtied = *bytes_cached; +} + +template +std::ostream &operator<<(std::ostream &os, + const C_CompAndWriteRequest &req) { + os << (C_WriteRequest&)req + << "cmp_bl=" << req.cmp_bl << ", " + << "read_bl=" << req.read_bl << ", " + << "compare_succeeded=" << req.compare_succeeded << ", " + << "mismatch_offset=" << req.mismatch_offset; + return os; +} + +template +void C_WriteSameRequest::setup_buffer_resources( + uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated, + uint64_t *number_lanes, uint64_t *number_log_entries, + uint64_t *number_unpublished_reserves) { + ceph_assert(this->image_extents.size() == 1); + *number_log_entries = 1; + *bytes_dirtied = this->image_extents[0].second; + *bytes_cached = this->bl.length(); + *bytes_allocated = round_up_to(*bytes_cached, MIN_WRITE_ALLOC_SSD_SIZE); +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +template class librbd::cache::pwl::ssd::C_WriteRequest >; +template class librbd::cache::pwl::ssd::C_WriteSameRequest >; +template class librbd::cache::pwl::ssd::C_CompAndWriteRequest >; diff --git a/src/librbd/cache/pwl/ssd/Request.h b/src/librbd/cache/pwl/ssd/Request.h new file mode 100644 index 000000000..9bb3e85b9 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/Request.h @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_SSD_REQUEST_H +#define CEPH_LIBRBD_CACHE_SSD_REQUEST_H + +#include "librbd/cache/pwl/Request.h" + +namespace librbd { +class BlockGuardCell; + +namespace cache { +namespace pwl { + +template +class AbstractWriteLog; + +namespace ssd { + +template +class C_WriteRequest : public pwl::C_WriteRequest { +public: + C_WriteRequest( + T &pwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset, + const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) + : pwl::C_WriteRequest( + pwl, arrived, std::move(image_extents), std::move(cmp_bl), + std::move(bl), mismatch_offset, fadvise_flags, + lock, perfcounter, user_req) {} + + C_WriteRequest( + T &pwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) + : pwl::C_WriteRequest( + pwl, arrived, std::move(image_extents), std::move(bl), + fadvise_flags, lock, perfcounter, user_req) {} +protected: + void setup_buffer_resources( + uint64_t *bytes_cached, uint64_t *bytes_dirtied, + uint64_t *bytes_allocated, uint64_t *number_lanes, + uint64_t *number_log_entries, + uint64_t *number_unpublished_reserves) override; +}; + +template +class C_CompAndWriteRequest : public C_WriteRequest { +public: + C_CompAndWriteRequest( + T &pwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset, + const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) + : C_WriteRequest( + pwl, arrived, std::move(image_extents), std::move(cmp_bl), + std::move(bl), mismatch_offset,fadvise_flags, + lock, perfcounter, user_req) {} + + const char *get_name() const override { + return "C_CompAndWriteRequest"; + } + template + friend std::ostream &operator<<(std::ostream &os, + const C_CompAndWriteRequest &req); +}; + +template +class C_WriteSameRequest : public pwl::C_WriteSameRequest { +public: + C_WriteSameRequest( + T &pwl, const utime_t arrived, io::Extents &&image_extents, + bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock, + PerfCounters *perfcounter, Context *user_req) + : pwl::C_WriteSameRequest( + pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, + lock, perfcounter, user_req) {} + + void setup_buffer_resources( + uint64_t *bytes_cached, uint64_t *bytes_dirtied, + uint64_t *bytes_allocated, uint64_t *number_lanes, + uint64_t *number_log_entries, + uint64_t *number_unpublished_reserves) override; +}; + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +#endif // CEPH_LIBRBD_CACHE_SSD_REQUEST_H diff --git a/src/librbd/cache/pwl/ssd/Types.h b/src/librbd/cache/pwl/ssd/Types.h new file mode 100644 index 000000000..3ebad1fd9 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/Types.h @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_SSD_TYPES_H +#define CEPH_LIBRBD_CACHE_SSD_TYPES_H + +#include "acconfig.h" + +#include "librbd/io/Types.h" +#include "librbd/cache/pwl/Types.h" + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +struct SuperBlock{ + WriteLogPoolRoot root; + + DENC(SuperBlock, v, p) { + DENC_START(1, 1, p); + denc(v.root, p); + DENC_FINISH(p); + } + + void dump(Formatter *f) const { + f->dump_object("super", root); + } + + static void generate_test_instances(list& ls) { + ls.push_back(new SuperBlock()); + ls.push_back(new SuperBlock); + ls.back()->root.layout_version = 3; + ls.back()->root.cur_sync_gen = 1; + ls.back()->root.pool_size = 10737418240; + ls.back()->root.flushed_sync_gen = 1; + ls.back()->root.block_size = 4096; + ls.back()->root.num_log_entries = 0; + ls.back()->root.first_free_entry = 30601; + ls.back()->root.first_valid_entry = 2; + } +}; + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +WRITE_CLASS_DENC(librbd::cache::pwl::ssd::SuperBlock) + +#endif // CEPH_LIBRBD_CACHE_SSD_TYPES_H diff --git a/src/librbd/cache/pwl/ssd/WriteLog.cc b/src/librbd/cache/pwl/ssd/WriteLog.cc new file mode 100644 index 000000000..00626506a --- /dev/null +++ b/src/librbd/cache/pwl/ssd/WriteLog.cc @@ -0,0 +1,1158 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "WriteLog.h" +#include "include/buffer.h" +#include "include/Context.h" +#include "include/ceph_assert.h" +#include "common/deleter.h" +#include "common/dout.h" +#include "common/environment.h" +#include "common/errno.h" +#include "common/WorkQueue.h" +#include "common/Timer.h" +#include "common/perf_counters.h" +#include "librbd/ImageCtx.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/cache/pwl/ImageCacheState.h" +#include "librbd/cache/pwl/LogEntry.h" +#include +#include + +#undef dout_subsys +#define dout_subsys ceph_subsys_rbd_pwl +#undef dout_prefix +#define dout_prefix *_dout << "librbd::cache::pwl::ssd::WriteLog: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace cache { +namespace pwl { +namespace ssd { + +using namespace librbd::cache::pwl; + +static bool is_valid_pool_root(const WriteLogPoolRoot& root) { + return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 && + root.first_valid_entry >= DATA_RING_BUFFER_OFFSET && + root.first_valid_entry < root.pool_size && + root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 && + root.first_free_entry >= DATA_RING_BUFFER_OFFSET && + root.first_free_entry < root.pool_size && + root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0; +} + +template +Builder>* WriteLog::create_builder() { + m_builderobj = new Builder(); + return m_builderobj; +} + +template +WriteLog::WriteLog( + I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state, + cache::ImageWritebackInterface& image_writeback, + plugin::Api& plugin_api) + : AbstractWriteLog(image_ctx, cache_state, create_builder(), + image_writeback, plugin_api) +{ +} + +template +WriteLog::~WriteLog() { + delete m_builderobj; +} + +template +void WriteLog::collect_read_extents( + uint64_t read_buffer_offset, LogMapEntry map_entry, + std::vector> &log_entries_to_read, + std::vector &bls_to_read, + uint64_t entry_hit_length, Extent hit_extent, + pwl::C_ReadRequest *read_ctx) { + // Make a bl for this hit extent. This will add references to the + // write_entry->cache_bl */ + ldout(m_image_ctx.cct, 5) << dendl; + auto write_entry = static_pointer_cast(map_entry.log_entry); + buffer::list hit_bl; + write_entry->copy_cache_bl(&hit_bl); + bool writesame = write_entry->is_writesame_entry(); + auto hit_extent_buf = std::make_shared( + hit_extent, hit_bl, true, read_buffer_offset, writesame); + read_ctx->read_extents.push_back(hit_extent_buf); + + if (!hit_bl.length()) { + ldout(m_image_ctx.cct, 5) << "didn't hit RAM" << dendl; + auto read_extent = read_ctx->read_extents.back(); + write_entry->inc_bl_refs(); + log_entries_to_read.push_back(std::move(write_entry)); + bls_to_read.push_back(&read_extent->m_bl); + } +} + +template +void WriteLog::complete_read( + std::vector> &log_entries_to_read, + std::vector &bls_to_read, + Context *ctx) { + if (!log_entries_to_read.empty()) { + aio_read_data_blocks(log_entries_to_read, bls_to_read, ctx); + } else { + ctx->complete(0); + } +} + +template +int WriteLog::create_and_open_bdev() { + CephContext *cct = m_image_ctx.cct; + + bdev = BlockDevice::create(cct, this->m_log_pool_name, aio_cache_cb, + nullptr, nullptr, nullptr); + int r = bdev->open(this->m_log_pool_name); + if (r < 0) { + lderr(cct) << "failed to open bdev" << dendl; + delete bdev; + return r; + } + + ceph_assert(this->m_log_pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0); + if (bdev->get_size() != this->m_log_pool_size) { + lderr(cct) << "size mismatch: bdev size " << bdev->get_size() + << " (block size " << bdev->get_block_size() + << ") != pool size " << this->m_log_pool_size << dendl; + bdev->close(); + delete bdev; + return -EINVAL; + } + + return 0; +} + +template +bool WriteLog::initialize_pool(Context *on_finish, + pwl::DeferredContexts &later) { + int r; + CephContext *cct = m_image_ctx.cct; + + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (access(this->m_log_pool_name.c_str(), F_OK) != 0) { + int fd = ::open(this->m_log_pool_name.c_str(), O_RDWR|O_CREAT, 0644); + bool succeed = true; + if (fd >= 0) { + if (truncate(this->m_log_pool_name.c_str(), + this->m_log_pool_size) != 0) { + succeed = false; + } + ::close(fd); + } else { + succeed = false; + } + if (!succeed) { + m_cache_state->present = false; + m_cache_state->clean = true; + m_cache_state->empty = true; + /* TODO: filter/replace errnos that are meaningless to the caller */ + on_finish->complete(-errno); + return false; + } + + r = create_and_open_bdev(); + if (r < 0) { + on_finish->complete(r); + return false; + } + m_cache_state->present = true; + m_cache_state->clean = true; + m_cache_state->empty = true; + /* new pool, calculate and store metadata */ + + /* Keep ring buffer at least MIN_WRITE_ALLOC_SSD_SIZE bytes free. + * In this way, when all ring buffer spaces are allocated, + * m_first_free_entry and m_first_valid_entry will not be equal. + * Equal only means the cache is empty. */ + this->m_bytes_allocated_cap = this->m_log_pool_size - + DATA_RING_BUFFER_OFFSET - MIN_WRITE_ALLOC_SSD_SIZE; + /* Log ring empty */ + m_first_free_entry = DATA_RING_BUFFER_OFFSET; + m_first_valid_entry = DATA_RING_BUFFER_OFFSET; + + auto new_root = std::make_shared(pool_root); + new_root->layout_version = SSD_LAYOUT_VERSION; + new_root->pool_size = this->m_log_pool_size; + new_root->flushed_sync_gen = this->m_flushed_sync_gen; + new_root->block_size = MIN_WRITE_ALLOC_SSD_SIZE; + new_root->first_free_entry = m_first_free_entry; + new_root->first_valid_entry = m_first_valid_entry; + new_root->num_log_entries = 0; + pool_root = *new_root; + + r = update_pool_root_sync(new_root); + if (r != 0) { + lderr(cct) << "failed to initialize pool (" + << this->m_log_pool_name << ")" << dendl; + bdev->close(); + delete bdev; + on_finish->complete(r); + return false; + } + } else { + ceph_assert(m_cache_state->present); + r = create_and_open_bdev(); + if (r < 0) { + on_finish->complete(r); + return false; + } + + bufferlist bl; + SuperBlock superblock; + ::IOContext ioctx(cct, nullptr); + r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false); + if (r < 0) { + lderr(cct) << "Read ssd cache superblock failed " << dendl; + goto error_handle; + } + auto p = bl.cbegin(); + decode(superblock, p); + pool_root = superblock.root; + ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size + << " first_valid_entry=" << pool_root.first_valid_entry + << " first_free_entry=" << pool_root.first_free_entry + << " flushed_sync_gen=" << pool_root.flushed_sync_gen + << dendl; + ceph_assert(is_valid_pool_root(pool_root)); + if (pool_root.layout_version != SSD_LAYOUT_VERSION) { + lderr(cct) << "Pool layout version is " + << pool_root.layout_version + << " expected " << SSD_LAYOUT_VERSION + << dendl; + goto error_handle; + } + if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) { + lderr(cct) << "Pool block size is " << pool_root.block_size + << " expected " << MIN_WRITE_ALLOC_SSD_SIZE + << dendl; + goto error_handle; + } + + this->m_log_pool_size = pool_root.pool_size; + this->m_flushed_sync_gen = pool_root.flushed_sync_gen; + this->m_first_valid_entry = pool_root.first_valid_entry; + this->m_first_free_entry = pool_root.first_free_entry; + this->m_bytes_allocated_cap = this->m_log_pool_size - + DATA_RING_BUFFER_OFFSET - + MIN_WRITE_ALLOC_SSD_SIZE; + + load_existing_entries(later); + m_cache_state->clean = this->m_dirty_log_entries.empty(); + m_cache_state->empty = m_log_entries.empty(); + } + return true; + +error_handle: + bdev->close(); + delete bdev; + on_finish->complete(-EINVAL); + return false; +} + +template +void WriteLog::remove_pool_file() { + ceph_assert(bdev); + bdev->close(); + delete bdev; + bdev = nullptr; + ldout(m_image_ctx.cct, 5) << "block device is closed" << dendl; + + if (m_cache_state->clean) { + ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " + << this->m_log_pool_name << dendl; + if (remove(this->m_log_pool_name.c_str()) != 0) { + lderr(m_image_ctx.cct) << "failed to remove empty pool \"" + << this->m_log_pool_name << "\": " << dendl; + } else { + m_cache_state->present = false; + } + } else { + ldout(m_image_ctx.cct, 5) << "Not removing pool file: " + << this->m_log_pool_name << dendl; + } +} + +template +void WriteLog::load_existing_entries(pwl::DeferredContexts &later) { + CephContext *cct = m_image_ctx.cct; + std::map> sync_point_entries; + std::map missing_sync_points; + + // Iterate through the log_entries and append all the write_bytes + // of each entry to fetch the pos of next 4k of log_entries. Iterate + // through the log entries and append them to the in-memory vector + for (uint64_t next_log_pos = this->m_first_valid_entry; + next_log_pos != this->m_first_free_entry; ) { + // read the entries from SSD cache and decode + bufferlist bl_entries; + ::IOContext ioctx_entry(cct, nullptr); + bdev->read(next_log_pos, MIN_WRITE_ALLOC_SSD_SIZE, &bl_entries, + &ioctx_entry, false); + std::vector ssd_log_entries; + auto pl = bl_entries.cbegin(); + decode(ssd_log_entries, pl); + ldout(cct, 5) << "decoded ssd log entries" << dendl; + uint64_t curr_log_pos = next_log_pos; + std::shared_ptr log_entry = nullptr; + + for (auto it = ssd_log_entries.begin(); it != ssd_log_entries.end(); ++it) { + this->update_entries(&log_entry, &*it, missing_sync_points, + sync_point_entries, curr_log_pos); + log_entry->ram_entry = *it; + log_entry->log_entry_index = curr_log_pos; + log_entry->completed = true; + m_log_entries.push_back(log_entry); + next_log_pos += round_up_to(it->write_bytes, MIN_WRITE_ALLOC_SSD_SIZE); + } + // along with the write_bytes, add control block size too + next_log_pos += MIN_WRITE_ALLOC_SSD_SIZE; + if (next_log_pos >= this->m_log_pool_size) { + next_log_pos = next_log_pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET; + } + } + this->update_sync_points(missing_sync_points, sync_point_entries, later); + if (m_first_valid_entry > m_first_free_entry) { + m_bytes_allocated = this->m_log_pool_size - m_first_valid_entry + + m_first_free_entry - DATA_RING_BUFFER_OFFSET; + } else { + m_bytes_allocated = m_first_free_entry - m_first_valid_entry; + } +} + +// For SSD we don't calc m_bytes_allocated in this +template +void WriteLog::inc_allocated_cached_bytes( + std::shared_ptr log_entry) { + if (log_entry->is_write_entry()) { + this->m_bytes_cached += log_entry->write_bytes(); + } +} + +template +bool WriteLog::alloc_resources(C_BlockIORequestT *req) { + bool alloc_succeeds = true; + uint64_t bytes_allocated = 0; + uint64_t bytes_cached = 0; + uint64_t bytes_dirtied = 0; + uint64_t num_lanes = 0; + uint64_t num_unpublished_reserves = 0; + uint64_t num_log_entries = 0; + + // Setup buffer, and get all the number of required resources + req->setup_buffer_resources(&bytes_cached, &bytes_dirtied, &bytes_allocated, + &num_lanes, &num_log_entries, + &num_unpublished_reserves); + + ceph_assert(!num_lanes); + if (num_log_entries) { + bytes_allocated += num_log_entries * MIN_WRITE_ALLOC_SSD_SIZE; + num_log_entries = 0; + } + ceph_assert(!num_unpublished_reserves); + + alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, + bytes_allocated, num_lanes, + num_log_entries, + num_unpublished_reserves); + req->set_allocated(alloc_succeeds); + return alloc_succeeds; +} + +template +bool WriteLog::has_sync_point_logs(GenericLogOperations &ops) { + for (auto &op : ops) { + if (op->get_log_entry()->is_sync_point()) { + return true; + break; + } + } + return false; +} + +template +void WriteLog::enlist_op_appender() { + this->m_async_append_ops++; + this->m_async_op_tracker.start_op(); + Context *append_ctx = new LambdaContext([this](int r) { + append_scheduled_ops(); + }); + this->m_work_queue.queue(append_ctx); +} +/* + * Takes custody of ops. They'll all get their log entries appended, + * and have their on_write_persist contexts completed once they and + * all prior log entries are persisted everywhere. + */ +template +void WriteLog::schedule_append_ops(GenericLogOperations &ops, C_BlockIORequestT *req) { + bool need_finisher = false; + GenericLogOperationsVector appending; + + std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); + { + std::lock_guard locker(m_lock); + + bool persist_on_flush = this->get_persist_on_flush(); + need_finisher = !this->m_appending && + ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) || + !persist_on_flush); + + // Only flush logs into SSD when there is internal/external flush request + if (!need_finisher) { + need_finisher = has_sync_point_logs(ops); + } + this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops); + + // To preserve the order of overlapping IOs, release_cell() may be + // called only after the ops are added to m_ops_to_append. + // As soon as m_lock is released, the appended ops can be picked up + // by append_scheduled_ops() in another thread and req can be freed. + if (req != nullptr) { + if (persist_on_flush) { + req->complete_user_request(0); + } + req->release_cell(); + } + } + + if (need_finisher) { + this->enlist_op_appender(); + } + + for (auto &op : appending) { + op->appending(); + } +} + +template +void WriteLog::setup_schedule_append(pwl::GenericLogOperationsVector &ops, + bool do_early_flush, + C_BlockIORequestT *req) { + this->schedule_append(ops, req); +} + +template +void WriteLog::append_scheduled_ops(void) { + GenericLogOperations ops; + ldout(m_image_ctx.cct, 20) << dendl; + + bool ops_remain = false; //no-op variable for SSD + bool appending = false; //no-op variable for SSD + this->append_scheduled(ops, ops_remain, appending); + + if (ops.size()) { + alloc_op_log_entries(ops); + append_op_log_entries(ops); + } else { + this->m_async_append_ops--; + this->m_async_op_tracker.finish_op(); + } +} + +/* + * Write and persist the (already allocated) write log entries and + * data buffer allocations for a set of ops. The data buffer for each + * of these must already have been persisted to its reserved area. + */ +template +void WriteLog::append_op_log_entries(GenericLogOperations &ops) { + ceph_assert(!ops.empty()); + ldout(m_image_ctx.cct, 20) << dendl; + Context *ctx = new LambdaContext([this, ops](int r) { + assert(r == 0); + ldout(m_image_ctx.cct, 20) << "Finished root update " << dendl; + + auto captured_ops = std::move(ops); + this->complete_op_log_entries(std::move(captured_ops), r); + + bool need_finisher = false; + { + std::lock_guard locker1(m_lock); + bool persist_on_flush = this->get_persist_on_flush(); + need_finisher = ((this->m_ops_to_append.size() >= CONTROL_BLOCK_MAX_LOG_ENTRIES) || + !persist_on_flush); + + if (!need_finisher) { + need_finisher = has_sync_point_logs(this->m_ops_to_append); + } + } + + if (need_finisher) { + this->enlist_op_appender(); + } + this->m_async_update_superblock--; + this->m_async_op_tracker.finish_op(); + }); + uint64_t *new_first_free_entry = new(uint64_t); + Context *append_ctx = new LambdaContext( + [this, new_first_free_entry, ops, ctx](int r) { + std::shared_ptr new_root; + { + ldout(m_image_ctx.cct, 20) << "Finished appending at " + << *new_first_free_entry << dendl; + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + operation->log_append_comp_time = now; + } + + std::lock_guard locker(this->m_log_append_lock); + std::lock_guard locker1(m_lock); + assert(this->m_appending); + this->m_appending = false; + new_root = std::make_shared(pool_root); + pool_root.first_free_entry = *new_first_free_entry; + new_root->first_free_entry = *new_first_free_entry; + delete new_first_free_entry; + schedule_update_root(new_root, ctx); + } + this->m_async_append_ops--; + this->m_async_op_tracker.finish_op(); + }); + // Append logs and update first_free_update + append_ops(ops, append_ctx, new_first_free_entry); + + if (ops.size()) { + this->dispatch_deferred_writes(); + } +} + +template +void WriteLog::release_ram(std::shared_ptr log_entry) { + log_entry->remove_cache_bl(); +} + +template +void WriteLog::alloc_op_log_entries(GenericLogOperations &ops) { + std::unique_lock locker(m_lock); + + for (auto &operation : ops) { + auto &log_entry = operation->get_log_entry(); + log_entry->ram_entry.set_entry_valid(true); + m_log_entries.push_back(log_entry); + ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; + } + if (m_cache_state->empty && !m_log_entries.empty()) { + m_cache_state->empty = false; + this->update_image_cache_state(); + this->write_image_cache_state(locker); + } +} + +template +void WriteLog::construct_flush_entries(pwl::GenericLogEntries entries_to_flush, + DeferredContexts &post_unlock, + bool has_write_entry) { + // snapshot so we behave consistently + bool invalidating = this->m_invalidating; + + if (invalidating || !has_write_entry) { + for (auto &log_entry : entries_to_flush) { + GuardedRequestFunctionContext *guarded_ctx = + new GuardedRequestFunctionContext([this, log_entry, invalidating] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, invalidating); + + if (!invalidating) { + ctx = new LambdaContext([this, log_entry, ctx](int r) { + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + }), 0); + }); + } + ctx->complete(0); + }); + this->detain_flush_guard_request(log_entry, guarded_ctx); + } + } else { + int count = entries_to_flush.size(); + std::vector> write_entries; + std::vector read_bls; + + write_entries.reserve(count); + read_bls.reserve(count); + + for (auto &log_entry : entries_to_flush) { + if (log_entry->is_write_entry()) { + bufferlist *bl = new bufferlist; + auto write_entry = static_pointer_cast(log_entry); + write_entry->inc_bl_refs(); + write_entries.push_back(write_entry); + read_bls.push_back(bl); + } + } + + Context *ctx = new LambdaContext( + [this, entries_to_flush, read_bls](int r) { + int i = 0; + GuardedRequestFunctionContext *guarded_ctx = nullptr; + + for (auto &log_entry : entries_to_flush) { + if (log_entry->is_write_entry()) { + bufferlist captured_entry_bl; + captured_entry_bl.claim_append(*read_bls[i]); + delete read_bls[i++]; + + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) { + auto captured_entry_bl = std::move(entry_bl); + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback_bl(this->m_image_writeback, ctx, + std::move(captured_entry_bl)); + }), 0); + }); + } else { + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + }), 0); + }); + } + this->detain_flush_guard_request(log_entry, guarded_ctx); + } + }); + + aio_read_data_blocks(write_entries, read_bls, ctx); + } +} + +template +void WriteLog::process_work() { + CephContext *cct = m_image_ctx.cct; + int max_iterations = 4; + bool wake_up_requested = false; + uint64_t aggressive_high_water_bytes = + this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER; + + ldout(cct, 20) << dendl; + + do { + { + std::lock_guard locker(m_lock); + this->m_wake_up_requested = false; + } + if (this->m_alloc_failed_since_retire || (this->m_shutting_down) || + this->m_invalidating || m_bytes_allocated > high_water_bytes) { + ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire + << ", allocated > high_water=" + << (m_bytes_allocated > high_water_bytes) + << dendl; + retire_entries((this->m_shutting_down || this->m_invalidating || + m_bytes_allocated > aggressive_high_water_bytes) + ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION); + } + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + { + std::lock_guard locker(m_lock); + wake_up_requested = this->m_wake_up_requested; + } + } while (wake_up_requested && --max_iterations > 0); + + { + std::lock_guard locker(m_lock); + this->m_wake_up_scheduled = false; + // Reschedule if it's still requested + if (this->m_wake_up_requested) { + this->wake_up(); + } + } +} + +/** + * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries + * that are eligible to be retired. Returns true if anything was + * retired. + * +*/ +template +bool WriteLog::retire_entries(const unsigned long int frees_per_tx) { + CephContext *cct = m_image_ctx.cct; + GenericLogEntriesVector retiring_entries; + uint64_t initial_first_valid_entry; + uint64_t first_valid_entry; + + std::lock_guard retire_locker(this->m_log_retire_lock); + ldout(cct, 20) << "Look for entries to retire" << dendl; + { + // Entry readers can't be added while we hold m_entry_reader_lock + RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock); + std::lock_guard locker(m_lock); + initial_first_valid_entry = m_first_valid_entry; + first_valid_entry = m_first_valid_entry; + while (retiring_entries.size() < frees_per_tx && !m_log_entries.empty()) { + GenericLogEntriesVector retiring_subentries; + uint64_t control_block_pos = m_log_entries.front()->log_entry_index; + uint64_t data_length = 0; + for (auto it = m_log_entries.begin(); it != m_log_entries.end(); ++it) { + if (this->can_retire_entry(*it)) { + // log_entry_index is valid after appending to SSD + if ((*it)->log_entry_index != control_block_pos) { + ldout(cct, 20) << "Old log_entry_index is " << control_block_pos + << ",New log_entry_index is " + << (*it)->log_entry_index + << ",data length is " << data_length << dendl; + ldout(cct, 20) << "The log entry is " << *(*it) << dendl; + if ((*it)->log_entry_index < control_block_pos) { + ceph_assert((*it)->log_entry_index == + (control_block_pos + data_length + MIN_WRITE_ALLOC_SSD_SIZE) % + this->m_log_pool_size + DATA_RING_BUFFER_OFFSET); + } else { + ceph_assert((*it)->log_entry_index == control_block_pos + + data_length + MIN_WRITE_ALLOC_SSD_SIZE); + } + break; + } else { + retiring_subentries.push_back(*it); + if ((*it)->is_write_entry()) { + data_length += (*it)->get_aligned_data_size(); + } + } + } else { + retiring_subentries.clear(); + break; + } + } + // SSD: retiring_subentries in a span + if (!retiring_subentries.empty()) { + for (auto it = retiring_subentries.begin(); + it != retiring_subentries.end(); it++) { + ceph_assert(m_log_entries.front() == *it); + m_log_entries.pop_front(); + if ((*it)->write_bytes() > 0 || (*it)->bytes_dirty() > 0) { + auto gen_write_entry = static_pointer_cast(*it); + if (gen_write_entry) { + this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry); + } + } + } + + ldout(cct, 20) << "span with " << retiring_subentries.size() + << " entries: control_block_pos=" << control_block_pos + << " data_length=" << data_length + << dendl; + retiring_entries.insert( + retiring_entries.end(), retiring_subentries.begin(), + retiring_subentries.end()); + + first_valid_entry = control_block_pos + data_length + + MIN_WRITE_ALLOC_SSD_SIZE; + if (first_valid_entry >= this->m_log_pool_size) { + first_valid_entry = first_valid_entry % this->m_log_pool_size + + DATA_RING_BUFFER_OFFSET; + } + } else { + break; + } + } + } + if (retiring_entries.size()) { + ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" + << dendl; + + // Advance first valid entry and release buffers + uint64_t flushed_sync_gen; + std::lock_guard append_locker(this->m_log_append_lock); + { + std::lock_guard locker(m_lock); + flushed_sync_gen = this->m_flushed_sync_gen; + } + + ceph_assert(first_valid_entry != initial_first_valid_entry); + auto new_root = std::make_shared(pool_root); + new_root->flushed_sync_gen = flushed_sync_gen; + new_root->first_valid_entry = first_valid_entry; + pool_root.flushed_sync_gen = flushed_sync_gen; + pool_root.first_valid_entry = first_valid_entry; + + Context *ctx = new LambdaContext( + [this, first_valid_entry, initial_first_valid_entry, + retiring_entries](int r) { + uint64_t allocated_bytes = 0; + uint64_t cached_bytes = 0; + uint64_t former_log_pos = 0; + for (auto &entry : retiring_entries) { + ceph_assert(entry->log_entry_index != 0); + if (entry->log_entry_index != former_log_pos ) { + // Space for control blocks + allocated_bytes += MIN_WRITE_ALLOC_SSD_SIZE; + former_log_pos = entry->log_entry_index; + } + if (entry->is_write_entry()) { + cached_bytes += entry->write_bytes(); + // space for userdata + allocated_bytes += entry->get_aligned_data_size(); + } + } + bool need_update_state = false; + { + std::lock_guard locker(m_lock); + m_first_valid_entry = first_valid_entry; + ceph_assert(m_first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0); + ceph_assert(this->m_bytes_allocated >= allocated_bytes); + this->m_bytes_allocated -= allocated_bytes; + ceph_assert(this->m_bytes_cached >= cached_bytes); + this->m_bytes_cached -= cached_bytes; + if (!m_cache_state->empty && m_log_entries.empty()) { + m_cache_state->empty = true; + this->update_image_cache_state(); + need_update_state = true; + } + + ldout(m_image_ctx.cct, 20) + << "Finished root update: " << "initial_first_valid_entry=" + << initial_first_valid_entry << ", " << "m_first_valid_entry=" + << m_first_valid_entry << "," << "release space = " + << allocated_bytes << "," << "m_bytes_allocated=" + << m_bytes_allocated << "," << "release cached space=" + << cached_bytes << "," << "m_bytes_cached=" + << this->m_bytes_cached << dendl; + + this->m_alloc_failed_since_retire = false; + this->wake_up(); + } + if (need_update_state) { + std::unique_lock locker(m_lock); + this->write_image_cache_state(locker); + } + + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + m_async_update_superblock--; + this->m_async_op_tracker.finish_op(); + }); + + std::lock_guard locker(m_lock); + schedule_update_root(new_root, ctx); + } else { + ldout(cct, 20) << "Nothing to retire" << dendl; + return false; + } + return true; +} + +template +void WriteLog::append_ops(GenericLogOperations &ops, Context *ctx, + uint64_t* new_first_free_entry) { + GenericLogEntriesVector log_entries; + CephContext *cct = m_image_ctx.cct; + uint64_t span_payload_len = 0; + uint64_t bytes_to_free = 0; + ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl; + + *new_first_free_entry = pool_root.first_free_entry; + AioTransContext* aio = new AioTransContext(cct, ctx); + + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + operation->log_append_start_time = now; + auto log_entry = operation->get_log_entry(); + + if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES || + span_payload_len >= SPAN_MAX_DATA_LEN) { + if (log_entries.size() > 1) { + bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE; + } + write_log_entries(log_entries, aio, new_first_free_entry); + log_entries.clear(); + span_payload_len = 0; + } + log_entries.push_back(log_entry); + span_payload_len += log_entry->write_bytes(); + } + if (!span_payload_len || !log_entries.empty()) { + if (log_entries.size() > 1) { + bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE; + } + write_log_entries(log_entries, aio, new_first_free_entry); + } + + { + std::lock_guard locker1(m_lock); + m_first_free_entry = *new_first_free_entry; + m_bytes_allocated -= bytes_to_free; + } + + bdev->aio_submit(&aio->ioc); +} + +template +void WriteLog::write_log_entries(GenericLogEntriesVector log_entries, + AioTransContext *aio, uint64_t *pos) { + CephContext *cct = m_image_ctx.cct; + ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl; + ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET && + *pos < this->m_log_pool_size && + *pos % MIN_WRITE_ALLOC_SSD_SIZE == 0); + + // The first block is for log entries + uint64_t control_block_pos = *pos; + *pos += MIN_WRITE_ALLOC_SSD_SIZE; + if (*pos == this->m_log_pool_size) { + *pos = DATA_RING_BUFFER_OFFSET; + } + + std::vector persist_log_entries; + bufferlist data_bl; + for (auto &log_entry : log_entries) { + log_entry->log_entry_index = control_block_pos; + // Append data buffer for write operations + if (log_entry->is_write_entry()) { + auto write_entry = static_pointer_cast(log_entry); + auto cache_bl = write_entry->get_cache_bl(); + auto align_size = write_entry->get_aligned_data_size(); + data_bl.append(cache_bl); + data_bl.append_zero(align_size - cache_bl.length()); + + write_entry->ram_entry.write_data_pos = *pos; + *pos += align_size; + if (*pos >= this->m_log_pool_size) { + *pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET; + } + } + // push_back _after_ setting write_data_pos + persist_log_entries.push_back(log_entry->ram_entry); + } + + //aio write + bufferlist bl; + encode(persist_log_entries, bl); + ceph_assert(bl.length() <= MIN_WRITE_ALLOC_SSD_SIZE); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + bl.append(data_bl); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + if (control_block_pos + bl.length() > this->m_log_pool_size) { + //exceeds border, need to split + uint64_t size = bl.length(); + bufferlist bl1; + bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1); + ceph_assert(bl.length() == (size - bl1.length())); + + ldout(cct, 20) << "write " << control_block_pos << "~" + << size << " spans boundary, split into " + << control_block_pos << "~" << bl1.length() + << " and " << DATA_RING_BUFFER_OFFSET << "~" + << bl.length() << dendl; + bdev->aio_write(control_block_pos, bl1, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + } else { + ldout(cct, 20) << "write " << control_block_pos << "~" + << bl.length() << dendl; + bdev->aio_write(control_block_pos, bl, &aio->ioc, false, + WRITE_LIFE_NOT_SET); + } +} + +template +void WriteLog::schedule_update_root( + std::shared_ptr root, Context *ctx) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 15) << "New root: pool_size=" << root->pool_size + << " first_valid_entry=" << root->first_valid_entry + << " first_free_entry=" << root->first_free_entry + << " flushed_sync_gen=" << root->flushed_sync_gen + << dendl; + ceph_assert(is_valid_pool_root(*root)); + + bool need_finisher; + { + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + need_finisher = m_poolroot_to_update.empty() && !m_updating_pool_root; + std::shared_ptr entry = + std::make_shared(root, ctx); + this->m_async_update_superblock++; + this->m_async_op_tracker.start_op(); + m_poolroot_to_update.emplace_back(entry); + } + if (need_finisher) { + enlist_op_update_root(); + } +} + +template +void WriteLog::enlist_op_update_root() { + Context *append_ctx = new LambdaContext([this](int r) { + update_root_scheduled_ops(); + }); + this->m_work_queue.queue(append_ctx); +} + +template +void WriteLog::update_root_scheduled_ops() { + ldout(m_image_ctx.cct, 20) << dendl; + + std::shared_ptr root; + WriteLogPoolRootUpdateList root_updates; + Context *ctx = nullptr; + { + std::lock_guard locker(m_lock); + if (m_updating_pool_root) { + /* Another thread is appending */ + ldout(m_image_ctx.cct, 15) << "Another thread is updating pool root" + << dendl; + return; + } + if (m_poolroot_to_update.size()) { + m_updating_pool_root = true; + root_updates.swap(m_poolroot_to_update); + } + } + ceph_assert(!root_updates.empty()); + ldout(m_image_ctx.cct, 15) << "Update root number: " << root_updates.size() + << dendl; + // We just update the last one, and call all the completions. + auto entry = root_updates.back(); + root = entry->root; + + ctx = new LambdaContext([this, updates = std::move(root_updates)](int r) { + ldout(m_image_ctx.cct, 15) << "Start to callback." << dendl; + for (auto it = updates.begin(); it != updates.end(); it++) { + Context *it_ctx = (*it)->ctx; + it_ctx->complete(r); + } + }); + Context *append_ctx = new LambdaContext([this, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl; + bool need_finisher = false;; + assert(r == 0); + { + std::lock_guard locker(m_lock); + m_updating_pool_root = false; + need_finisher = !m_poolroot_to_update.empty(); + } + if (need_finisher) { + enlist_op_update_root(); + } + ctx->complete(r); + }); + AioTransContext* aio = new AioTransContext(m_image_ctx.cct, append_ctx); + update_pool_root(root, aio); +} + +template +void WriteLog::update_pool_root(std::shared_ptr root, + AioTransContext *aio) { + bufferlist bl; + SuperBlock superblock; + superblock.root = *root; + encode(superblock, bl); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + bdev->aio_write(0, bl, &aio->ioc, false, WRITE_LIFE_NOT_SET); + bdev->aio_submit(&aio->ioc); +} + +template +int WriteLog::update_pool_root_sync( + std::shared_ptr root) { + bufferlist bl; + SuperBlock superblock; + superblock.root = *root; + encode(superblock, bl); + bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length()); + ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0); + return bdev->write(0, bl, false); +} + +template +void WriteLog::aio_read_data_block(std::shared_ptr log_entry, + bufferlist *bl, Context *ctx) { + std::vector> log_entries = {std::move(log_entry)}; + std::vector bls {bl}; + aio_read_data_blocks(log_entries, bls, ctx); +} + +template +void WriteLog::aio_read_data_blocks( + std::vector> &log_entries, + std::vector &bls, Context *ctx) { + ceph_assert(log_entries.size() == bls.size()); + + //get the valid part + Context *read_ctx = new LambdaContext( + [log_entries, bls, ctx](int r) { + for (unsigned int i = 0; i < log_entries.size(); i++) { + bufferlist valid_data_bl; + auto write_entry = static_pointer_cast(log_entries[i]); + auto length = write_entry->ram_entry.is_write() ? write_entry->ram_entry.write_bytes + : write_entry->ram_entry.ws_datalen; + + valid_data_bl.substr_of(*bls[i], 0, length); + bls[i]->clear(); + bls[i]->append(valid_data_bl); + write_entry->dec_bl_refs(); + } + ctx->complete(r); + }); + + CephContext *cct = m_image_ctx.cct; + AioTransContext *aio = new AioTransContext(cct, read_ctx); + for (unsigned int i = 0; i < log_entries.size(); i++) { + WriteLogCacheEntry *log_entry = &log_entries[i]->ram_entry; + + ceph_assert(log_entry->is_write() || log_entry->is_writesame()); + uint64_t len = log_entry->is_write() ? log_entry->write_bytes : + log_entry->ws_datalen; + uint64_t align_len = round_up_to(len, MIN_WRITE_ALLOC_SSD_SIZE); + + ldout(cct, 20) << "entry i=" << i << " " << log_entry->write_data_pos + << "~" << len << dendl; + ceph_assert(log_entry->write_data_pos >= DATA_RING_BUFFER_OFFSET && + log_entry->write_data_pos < pool_root.pool_size); + ceph_assert(align_len); + if (log_entry->write_data_pos + align_len > pool_root.pool_size) { + // spans boundary, need to split + uint64_t len1 = pool_root.pool_size - log_entry->write_data_pos; + uint64_t len2 = align_len - len1; + + ldout(cct, 20) << "read " << log_entry->write_data_pos << "~" + << align_len << " spans boundary, split into " + << log_entry->write_data_pos << "~" << len1 + << " and " << DATA_RING_BUFFER_OFFSET << "~" + << len2 << dendl; + bdev->aio_read(log_entry->write_data_pos, len1, bls[i], &aio->ioc); + bdev->aio_read(DATA_RING_BUFFER_OFFSET, len2, bls[i], &aio->ioc); + } else { + ldout(cct, 20) << "read " << log_entry->write_data_pos << "~" + << align_len << dendl; + bdev->aio_read(log_entry->write_data_pos, align_len, bls[i], &aio->ioc); + } + } + bdev->aio_submit(&aio->ioc); +} + +template +void WriteLog::complete_user_request(Context *&user_req, int r) { + m_image_ctx.op_work_queue->queue(user_req, r); +} + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +template class librbd::cache::pwl::ssd::WriteLog; diff --git a/src/librbd/cache/pwl/ssd/WriteLog.h b/src/librbd/cache/pwl/ssd/WriteLog.h new file mode 100644 index 000000000..69cc36662 --- /dev/null +++ b/src/librbd/cache/pwl/ssd/WriteLog.h @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG +#define CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG + +#include "blk/BlockDevice.h" +#include "common/AsyncOpTracker.h" +#include "common/Checksummer.h" +#include "common/environment.h" +#include "common/RWLock.h" +#include "common/WorkQueue.h" +#include "librbd/BlockGuard.h" +#include "librbd/Utils.h" +#include "librbd/cache/ImageWriteback.h" +#include "librbd/cache/Types.h" +#include "librbd/cache/pwl/AbstractWriteLog.h" +#include "librbd/cache/pwl/LogMap.h" +#include "librbd/cache/pwl/LogOperation.h" +#include "librbd/cache/pwl/Request.h" +#include "librbd/cache/pwl/ssd/Builder.h" +#include "librbd/cache/pwl/ssd/Types.h" +#include +#include + +namespace librbd { + +struct ImageCtx; + +namespace cache { +namespace pwl { +namespace ssd { + +template +class WriteLog : public AbstractWriteLog { +public: + WriteLog(ImageCtxT &image_ctx, + librbd::cache::pwl::ImageCacheState* cache_state, + cache::ImageWritebackInterface& image_writeback, + plugin::Api& plugin_api); + ~WriteLog(); + WriteLog(const WriteLog&) = delete; + WriteLog &operator=(const WriteLog&) = delete; + + typedef io::Extent Extent; + using This = AbstractWriteLog; + using C_BlockIORequestT = pwl::C_BlockIORequest; + using C_WriteRequestT = pwl::C_WriteRequest; + using C_WriteSameRequestT = pwl::C_WriteSameRequest; + + bool alloc_resources(C_BlockIORequestT *req) override; + void setup_schedule_append( + pwl::GenericLogOperationsVector &ops, bool do_early_flush, + C_BlockIORequestT *req) override; + void complete_user_request(Context *&user_req, int r) override; + +protected: + using AbstractWriteLog::m_lock; + using AbstractWriteLog::m_log_entries; + using AbstractWriteLog::m_image_ctx; + using AbstractWriteLog::m_cache_state; + using AbstractWriteLog::m_first_free_entry; + using AbstractWriteLog::m_first_valid_entry; + using AbstractWriteLog::m_bytes_allocated; + + bool initialize_pool(Context *on_finish, + pwl::DeferredContexts &later) override; + void process_work() override; + void append_scheduled_ops(void) override; + void schedule_append_ops(pwl::GenericLogOperations &ops, C_BlockIORequestT *req) override; + void remove_pool_file() override; + void release_ram(std::shared_ptr log_entry) override; + +private: + class AioTransContext { + public: + Context *on_finish; + ::IOContext ioc; + explicit AioTransContext(CephContext* cct, Context *cb) + : on_finish(cb), ioc(cct, this) {} + + ~AioTransContext(){} + + void aio_finish() { + on_finish->complete(ioc.get_return_value()); + delete this; + } + }; //class AioTransContext + + struct WriteLogPoolRootUpdate { + std::shared_ptr root; + Context *ctx; + WriteLogPoolRootUpdate(std::shared_ptr r, + Context* c) + : root(r), ctx(c) {} + }; + + using WriteLogPoolRootUpdateList = std::list>; + WriteLogPoolRootUpdateList m_poolroot_to_update; /* pool root list to update to SSD */ + bool m_updating_pool_root = false; + + std::atomic m_async_update_superblock = {0}; + BlockDevice *bdev = nullptr; + pwl::WriteLogPoolRoot pool_root; + Builder *m_builderobj; + + Builder* create_builder(); + int create_and_open_bdev(); + void load_existing_entries(pwl::DeferredContexts &later); + void inc_allocated_cached_bytes( + std::shared_ptr log_entry) override; + void collect_read_extents( + uint64_t read_buffer_offset, LogMapEntry map_entry, + std::vector> &log_entries_to_read, + std::vector &bls_to_read, uint64_t entry_hit_length, + Extent hit_extent, pwl::C_ReadRequest *read_ctx) override; + void complete_read( + std::vector> &log_entries_to_read, + std::vector &bls_to_read, Context *ctx) override; + void enlist_op_appender(); + bool retire_entries(const unsigned long int frees_per_tx); + bool has_sync_point_logs(GenericLogOperations &ops); + void append_op_log_entries(GenericLogOperations &ops); + void alloc_op_log_entries(GenericLogOperations &ops); + void construct_flush_entries(pwl::GenericLogEntries entires_to_flush, + DeferredContexts &post_unlock, + bool has_write_entry) override; + void append_ops(GenericLogOperations &ops, Context *ctx, + uint64_t* new_first_free_entry); + void write_log_entries(GenericLogEntriesVector log_entries, + AioTransContext *aio, uint64_t *pos); + void schedule_update_root(std::shared_ptr root, + Context *ctx); + void enlist_op_update_root(); + void update_root_scheduled_ops(); + int update_pool_root_sync(std::shared_ptr root); + void update_pool_root(std::shared_ptr root, + AioTransContext *aio); + void aio_read_data_block(std::shared_ptr log_entry, + bufferlist *bl, Context *ctx); + void aio_read_data_blocks(std::vector> &log_entries, + std::vector &bls, Context *ctx); + static void aio_cache_cb(void *priv, void *priv2) { + AioTransContext *c = static_cast(priv2); + c->aio_finish(); + } +};//class WriteLog + +} // namespace ssd +} // namespace pwl +} // namespace cache +} // namespace librbd + +extern template class librbd::cache::pwl::ssd::WriteLog; + +#endif // CEPH_LIBRBD_CACHE_PWL_SSD_WRITE_LOG -- cgit v1.2.3