summaryrefslogtreecommitdiffstats
path: root/src/librbd/cache/pwl/LogOperation.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/cache/pwl/LogOperation.cc316
1 files changed, 316 insertions, 0 deletions
diff --git a/src/librbd/cache/pwl/LogOperation.cc b/src/librbd/cache/pwl/LogOperation.cc
new file mode 100644
index 000000000..4fc13a91a
--- /dev/null
+++ b/src/librbd/cache/pwl/LogOperation.cc
@@ -0,0 +1,316 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+#include "LogOperation.h"
+#include "librbd/cache/pwl/Types.h"
+
+#define dout_subsys ceph_subsys_rbd_pwl
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::pwl::LogOperation: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace pwl {
+
+GenericLogOperation::GenericLogOperation(utime_t dispatch_time,
+ PerfCounters *perfcounter)
+ : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
+}
+
+std::ostream& GenericLogOperation::format(std::ostream &os) const {
+ os << "dispatch_time=[" << dispatch_time << "], "
+ << "buf_persist_start_time=[" << buf_persist_start_time << "], "
+ << "buf_persist_comp_time=[" << buf_persist_comp_time << "], "
+ << "log_append_start_time=[" << log_append_start_time << "], "
+ << "log_append_comp_time=[" << log_append_comp_time << "], ";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const GenericLogOperation &op) {
+ return op.format(os);
+}
+
+SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
+ std::shared_ptr<SyncPoint> sync_point,
+ utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct)
+ : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock),
+ sync_point(sync_point) {
+}
+
+SyncPointLogOperation::~SyncPointLogOperation() { }
+
+std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
+ os << "(Sync Point) ";
+ GenericLogOperation::format(os);
+ os << ", "
+ << "sync_point=[" << *sync_point << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const SyncPointLogOperation &op) {
+ return op.format(os);
+}
+
+std::vector<Context*> SyncPointLogOperation::append_sync_point() {
+ std::vector<Context*> appending_contexts;
+ std::lock_guard locker(m_lock);
+ if (!sync_point->appending) {
+ sync_point->appending = true;
+ }
+ appending_contexts.swap(sync_point->on_sync_point_appending);
+ return appending_contexts;
+}
+
+void SyncPointLogOperation::clear_earlier_sync_point() {
+ std::lock_guard locker(m_lock);
+ ceph_assert(sync_point->later_sync_point);
+ ceph_assert(sync_point->later_sync_point->earlier_sync_point == sync_point);
+ sync_point->later_sync_point->earlier_sync_point = nullptr;
+ sync_point->later_sync_point = nullptr;
+}
+
+std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
+ std::lock_guard locker(m_lock);
+ std::vector<Context*> persisted_contexts;
+ persisted_contexts.swap(sync_point->on_sync_point_persisted);
+ return persisted_contexts;
+}
+
+void SyncPointLogOperation::appending() {
+ ceph_assert(sync_point);
+ ldout(m_cct, 20) << "Sync point op=[" << *this
+ << "] appending" << dendl;
+ auto appending_contexts = append_sync_point();
+ for (auto &ctx : appending_contexts) {
+ ctx->complete(0);
+ }
+}
+
+void SyncPointLogOperation::complete(int result) {
+ ceph_assert(sync_point);
+ ldout(m_cct, 20) << "Sync point op =[" << *this
+ << "] completed" << dendl;
+ clear_earlier_sync_point();
+
+ /* Do append now in case completion occurred before the
+ * normal append callback executed, and to handle
+ * on_append work that was queued after the sync point
+ * entered the appending state. */
+ appending();
+ auto persisted_contexts = swap_on_sync_point_persisted();
+ for (auto &ctx : persisted_contexts) {
+ ctx->complete(result);
+ }
+}
+
+GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+ utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct)
+ : GenericLogOperation(dispatch_time, perfcounter),
+ m_lock(ceph::make_mutex(pwl::unique_lock_name(
+ "librbd::cache::pwl::GenericWriteLogOperation::m_lock", this))),
+ m_cct(cct),
+ sync_point(sync_point) {
+}
+
+GenericWriteLogOperation::~GenericWriteLogOperation() { }
+
+std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
+ GenericLogOperation::format(os);
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const GenericWriteLogOperation &op) {
+ return op.format(os);
+}
+
+/* Called when the write log operation is appending and its log position is guaranteed */
+void GenericWriteLogOperation::appending() {
+ Context *on_append = nullptr;
+ ldout(m_cct, 20) << __func__ << " " << this << dendl;
+ {
+ std::lock_guard locker(m_lock);
+ on_append = on_write_append;
+ on_write_append = nullptr;
+ }
+ if (on_append) {
+ ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
+ on_append->complete(0);
+ }
+}
+
+/* Called when the write log operation is completed in all log replicas */
+void GenericWriteLogOperation::complete(int result) {
+ appending();
+ Context *on_persist = nullptr;
+ ldout(m_cct, 20) << __func__ << " " << this << dendl;
+ {
+ std::lock_guard locker(m_lock);
+ on_persist = on_write_persist;
+ on_write_persist = nullptr;
+ }
+ if (on_persist) {
+ ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist
+ << dendl;
+ on_persist->complete(result);
+ }
+}
+
+WriteLogOperation::WriteLogOperation(
+ WriteLogOperationSet &set, uint64_t image_offset_bytes,
+ uint64_t write_bytes, CephContext *cct,
+ std::shared_ptr<WriteLogEntry> write_log_entry)
+ : GenericWriteLogOperation(set.sync_point, set.dispatch_time,
+ set.perfcounter, cct),
+ log_entry(write_log_entry) {
+ on_write_append = set.extent_ops_appending->new_sub();
+ on_write_persist = set.extent_ops_persist->new_sub();
+ log_entry->sync_point_entry->writes++;
+ log_entry->sync_point_entry->bytes += write_bytes;
+}
+
+WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
+ uint64_t image_offset_bytes,
+ uint64_t write_bytes,
+ uint32_t data_len,
+ CephContext *cct,
+ std::shared_ptr<WriteLogEntry> writesame_log_entry)
+ : WriteLogOperation(set, image_offset_bytes, write_bytes, cct,
+ writesame_log_entry) {
+ is_writesame = true;
+}
+
+WriteLogOperation::~WriteLogOperation() { }
+
+void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
+ uint64_t current_sync_gen,
+ uint64_t last_op_sequence_num,
+ bufferlist &write_req_bl, uint64_t buffer_offset,
+ bool persist_on_flush) {
+ log_entry->init(has_data, current_sync_gen, last_op_sequence_num,
+ persist_on_flush);
+ buffer_alloc = &(*allocation);
+ bl.substr_of(write_req_bl, buffer_offset, log_entry->write_bytes());
+ log_entry->init_cache_bl(write_req_bl, buffer_offset,
+ log_entry->write_bytes());
+}
+
+std::ostream &WriteLogOperation::format(std::ostream &os) const {
+ string op_name = is_writesame ? "(Write Same) " : "(Write) ";
+ os << op_name;
+ GenericWriteLogOperation::format(os);
+ os << ", ";
+ if (log_entry) {
+ os << "log_entry=[" << *log_entry << "], ";
+ } else {
+ os << "log_entry=nullptr, ";
+ }
+ os << "bl=[" << bl << "],"
+ << "buffer_alloc=" << buffer_alloc;
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const WriteLogOperation &op) {
+ return op.format(os);
+}
+
+
+void WriteLogOperation::complete(int result) {
+ GenericWriteLogOperation::complete(result);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_buf_t,
+ buf_persist_start_time - dispatch_time);
+ utime_t buf_persist_lat = buf_persist_comp_time - buf_persist_start_time;
+ m_perfcounter->tinc(l_librbd_pwl_log_op_buf_to_bufc_t, buf_persist_lat);
+ m_perfcounter->hinc(l_librbd_pwl_log_op_buf_to_bufc_t_hist,
+ buf_persist_lat.to_nsec(),
+ log_entry->ram_entry.write_bytes);
+ m_perfcounter->tinc(l_librbd_pwl_log_op_buf_to_app_t,
+ log_append_start_time - buf_persist_start_time);
+}
+
+WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+ bool persist_on_flush, CephContext *cct, Context *on_finish)
+ : m_cct(cct), m_on_finish(on_finish),
+ persist_on_flush(persist_on_flush),
+ dispatch_time(dispatched),
+ perfcounter(perfcounter),
+ sync_point(sync_point) {
+ on_ops_appending = sync_point->prior_persisted_gather_new_sub();
+ on_ops_persist = nullptr;
+ extent_ops_persist =
+ new C_Gather(m_cct,
+ new LambdaContext( [this](int r) {
+ ldout(this->m_cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
+ if (on_ops_persist) {
+ on_ops_persist->complete(r);
+ }
+ m_on_finish->complete(r);
+ }));
+ auto appending_persist_sub = extent_ops_persist->new_sub();
+ extent_ops_appending =
+ new C_Gather(m_cct,
+ new LambdaContext( [this, appending_persist_sub](int r) {
+ ldout(this->m_cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
+ on_ops_appending->complete(r);
+ appending_persist_sub->complete(r);
+ }));
+}
+
+WriteLogOperationSet::~WriteLogOperationSet() { }
+
+std::ostream &operator<<(std::ostream &os,
+ const WriteLogOperationSet &s) {
+ os << "cell=" << (void*)s.cell << ", "
+ << "extent_ops_appending=[" << s.extent_ops_appending << ", "
+ << "extent_ops_persist=[" << s.extent_ops_persist << "]";
+ return os;
+}
+
+DiscardLogOperation::DiscardLogOperation(std::shared_ptr<SyncPoint> sync_point,
+ uint64_t image_offset_bytes,
+ uint64_t write_bytes,
+ uint32_t discard_granularity_bytes,
+ utime_t dispatch_time,
+ PerfCounters *perfcounter,
+ CephContext *cct)
+ : GenericWriteLogOperation(sync_point, dispatch_time, perfcounter, cct),
+ log_entry(std::make_shared<DiscardLogEntry>(sync_point->log_entry,
+ image_offset_bytes,
+ write_bytes,
+ discard_granularity_bytes)) {
+ on_write_persist = nullptr;
+ log_entry->sync_point_entry->writes++;
+ log_entry->sync_point_entry->bytes += write_bytes;
+}
+
+DiscardLogOperation::~DiscardLogOperation() { }
+
+std::ostream &DiscardLogOperation::format(std::ostream &os) const {
+ os << "(Discard) ";
+ GenericWriteLogOperation::format(os);
+ os << ", ";
+ if (log_entry) {
+ os << "log_entry=[" << *log_entry << "], ";
+ } else {
+ os << "log_entry=nullptr, ";
+ }
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const DiscardLogOperation &op) {
+ return op.format(os);
+}
+
+} // namespace pwl
+} // namespace cache
+} // namespace librbd