summaryrefslogtreecommitdiffstats
path: root/src/librbd/cache/pwl/AbstractWriteLog.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/librbd/cache/pwl/AbstractWriteLog.h410
1 files changed, 410 insertions, 0 deletions
diff --git a/src/librbd/cache/pwl/AbstractWriteLog.h b/src/librbd/cache/pwl/AbstractWriteLog.h
new file mode 100644
index 000000000..ffe299c37
--- /dev/null
+++ b/src/librbd/cache/pwl/AbstractWriteLog.h
@@ -0,0 +1,410 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_PARENT_WRITE_LOG
+#define CEPH_LIBRBD_CACHE_PARENT_WRITE_LOG
+
+#include "common/Timer.h"
+#include "common/RWLock.h"
+#include "common/WorkQueue.h"
+#include "common/AsyncOpTracker.h"
+#include "librbd/cache/ImageWriteback.h"
+#include "librbd/Utils.h"
+#include "librbd/BlockGuard.h"
+#include "librbd/cache/Types.h"
+#include "librbd/cache/pwl/LogOperation.h"
+#include "librbd/cache/pwl/ReadRequest.h"
+#include "librbd/cache/pwl/Request.h"
+#include "librbd/cache/pwl/LogMap.h"
+#include "librbd/cache/pwl/Builder.h"
+#include <functional>
+#include <list>
+
+class Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace plugin { template <typename> struct Api; }
+
+namespace cache {
+namespace pwl {
+
+class GenericLogEntry;
+class GenericWriteLogEntry;
+class SyncPointLogEntry;
+class WriteLogEntry;
+struct WriteLogCacheEntry;
+
+using WriteLogEntries = std::list<std::shared_ptr<WriteLogEntry>>;
+using GenericLogEntries = std::list<std::shared_ptr<GenericLogEntry>>;
+using GenericWriteLogEntries = std::list<std::shared_ptr<GenericWriteLogEntry>>;
+using GenericLogEntriesVector = std::vector<std::shared_ptr<GenericLogEntry>>;
+
+using WriteLogMapEntries = LogMapEntries<GenericWriteLogEntry>;
+using WriteLogMap = LogMap<GenericWriteLogEntry>;
+
+/* End of the log-entry container and map aliases. */
+
+using WriteLogGuard = librbd::BlockGuard<GuardedRequest>;
+
+class DeferredContexts;
+template <typename>
+class ImageCacheState;
+
+template<typename T>
+class Builder;
+
+template <typename T>
+struct C_BlockIORequest;
+
+template <typename T>
+struct C_WriteRequest;
+
+using GenericLogOperations = std::list<GenericLogOperationSharedPtr>;
+
+/* Abstract base of the librbd pwl write-log cache; subclasses implement the pure virtual hooks. */
+template <typename ImageCtxT>
+class AbstractWriteLog {
+public:
+ typedef io::Extent Extent;
+ typedef io::Extents Extents;
+ using This = AbstractWriteLog<ImageCtxT>;
+ Builder<This> *m_builder;
+
+ AbstractWriteLog(ImageCtxT &image_ctx,
+ librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state,
+ Builder<This> *builder,
+ cache::ImageWritebackInterface& image_writeback,
+ plugin::Api<ImageCtxT>& plugin_api);
+ virtual ~AbstractWriteLog();
+ AbstractWriteLog(const AbstractWriteLog&) = delete;
+ AbstractWriteLog &operator=(const AbstractWriteLog&) = delete;
+
+ /// IO methods (each reports completion through on_finish)
+ void read(
+ Extents&& image_extents, ceph::bufferlist *bl,
+ int fadvise_flags, Context *on_finish);
+ void write(
+ Extents&& image_extents, ceph::bufferlist&& bl,
+ int fadvise_flags,
+ Context *on_finish);
+ void discard(
+ uint64_t offset, uint64_t length,
+ uint32_t discard_granularity_bytes,
+ Context *on_finish);
+ void flush(
+ io::FlushSource flush_source, Context *on_finish);
+ void writesame(
+ uint64_t offset, uint64_t length,
+ ceph::bufferlist&& bl,
+ int fadvise_flags, Context *on_finish);
+ void compare_and_write(
+ Extents&& image_extents,
+ ceph::bufferlist&& cmp_bl, ceph::bufferlist&& bl,
+ uint64_t *mismatch_offset, int fadvise_flags,
+ Context *on_finish);
+
+ /// internal state methods
+ void init(Context *on_finish);
+ void shut_down(Context *on_finish);
+ void invalidate(Context *on_finish);
+ void flush(Context *on_finish); /* overload that takes no io::FlushSource */
+
+ using C_WriteRequestT = pwl::C_WriteRequest<This>;
+ using C_BlockIORequestT = pwl::C_BlockIORequest<This>;
+ using C_FlushRequestT = pwl::C_FlushRequest<This>;
+ using C_DiscardRequestT = pwl::C_DiscardRequest<This>;
+ using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
+
+ CephContext * get_context();
+ void release_guarded_request(BlockGuardCell *cell);
+ void release_write_lanes(C_BlockIORequestT *req);
+ virtual bool alloc_resources(C_BlockIORequestT *req) = 0;
+ virtual void setup_schedule_append(
+ pwl::GenericLogOperationsVector &ops, bool do_early_flush,
+ C_BlockIORequestT *req) = 0;
+ void schedule_append(pwl::GenericLogOperationsVector &ops, C_BlockIORequestT *req = nullptr);
+ void schedule_append(pwl::GenericLogOperationSharedPtr op, C_BlockIORequestT *req = nullptr);
+ void flush_new_sync_point(C_FlushRequestT *flush_req,
+ pwl::DeferredContexts &later);
+
+ std::shared_ptr<pwl::SyncPoint> get_current_sync_point() const {
+ return m_current_sync_point;
+ }
+ bool get_persist_on_flush() const {
+ return m_persist_on_flush;
+ }
+ void inc_last_op_sequence_num() { /* also counts the op in m_perfcounter */
+ m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
+ ++m_last_op_sequence_num;
+ }
+ uint64_t get_last_op_sequence_num() const {
+ return m_last_op_sequence_num;
+ }
+ uint64_t get_current_sync_gen() const {
+ return m_current_sync_gen;
+ }
+ unsigned int get_free_lanes() const {
+ return m_free_lanes;
+ }
+ uint32_t get_free_log_entries() const {
+ return m_free_log_entries;
+ }
+ void add_into_log_map(pwl::GenericWriteLogEntries &log_entries,
+ C_BlockIORequestT *req);
+ virtual void complete_user_request(Context *&user_req, int r) = 0;
+ virtual void copy_bl_to_buffer(
+ WriteRequestResources *resources,
+ std::unique_ptr<WriteLogOperationSet> &op_set) {} /* default: no-op */
+
+private:
+ typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
+ typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+ std::atomic<bool> m_initialized = {false};
+
+ uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
+ utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */
+
+ pwl::WriteLogGuard m_write_log_guard;
+
+ /* Starts at 0 for a new write log. Incremented on every flush. */
+ uint64_t m_current_sync_gen = 0;
+ /* Starts at 0 on each sync gen increase. Incremented before applied
+ to an operation */
+ uint64_t m_last_op_sequence_num = 0;
+
+ bool m_persist_on_write_until_flush = true;
+
+ pwl::WriteLogGuard m_flush_guard;
+ mutable ceph::mutex m_flush_guard_lock;
+
+ /* Debug counters for the places m_async_op_tracker is used */
+ std::atomic<int> m_async_complete_ops = {0};
+ std::atomic<int> m_async_null_flush_finish = {0};
+ std::atomic<int> m_async_process_work = {0};
+
+ /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+ mutable ceph::mutex m_deferred_dispatch_lock;
+
+ /* Used in release/detain to make BlockGuard preserve submission order */
+ mutable ceph::mutex m_blockguard_lock;
+
+ /* Use m_blockguard_lock for the next 2 members and for m_awaiting_barrier (protected section) */
+ bool m_barrier_in_progress = false;
+ BlockGuardCell *m_barrier_cell = nullptr;
+
+ bool m_wake_up_enabled = true;
+
+ Contexts m_flush_complete_contexts;
+
+ std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
+ bool m_persist_on_flush = false; /* If false, persist each write before completion */
+
+ int m_flush_ops_in_flight = 0;
+ int m_flush_bytes_in_flight = 0;
+ uint64_t m_lowest_flushing_sync_gen = 0;
+
+ /* Writes that have left the block guard, but are waiting for resources */
+ C_BlockIORequests m_deferred_ios;
+ /* Throttle writes concurrently allocating & replicating */
+ unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
+
+ SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
+ mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
+ Context *m_timer_ctx = nullptr;
+
+ ThreadPool m_thread_pool;
+
+ uint32_t m_discard_granularity_bytes = 0; /* defaults to 0 unless a constructor initializer overrides it */
+
+ BlockGuardCell* detain_guarded_request_helper(pwl::GuardedRequest &req);
+ BlockGuardCell* detain_guarded_request_barrier_helper(
+ pwl::GuardedRequest &req);
+ void detain_guarded_request(C_BlockIORequestT *request,
+ pwl::GuardedRequestFunctionContext *guarded_ctx,
+ bool is_barrier);
+ void perf_start(const std::string name);
+ void perf_stop();
+ void log_perf();
+ void periodic_stats();
+ void arm_periodic_stats();
+
+ void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
+ void check_image_cache_state_clean();
+
+ void flush_dirty_entries(Context *on_finish);
+ bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
+ bool handle_flushed_sync_point(
+ std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+ void sync_point_writer_flushed(
+ std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+
+ void init_flush_new_sync_point(pwl::DeferredContexts &later);
+ void new_sync_point(pwl::DeferredContexts &later);
+ pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(
+ Context *on_finish);
+ void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
+ pwl::DeferredContexts &later);
+
+ void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
+ void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops,
+ const int r);
+ void internal_flush(bool invalidate, Context *on_finish);
+
+protected:
+ librbd::cache::pwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
+
+ std::atomic<bool> m_shutting_down = {false};
+ std::atomic<bool> m_invalidating = {false};
+
+ ImageCtxT &m_image_ctx;
+
+ std::string m_log_pool_name;
+ uint64_t m_log_pool_size = 0; /* defaults to 0 unless a constructor initializer overrides it */
+
+ uint32_t m_total_log_entries = 0;
+ uint32_t m_free_log_entries = 0;
+
+ std::atomic<uint64_t> m_bytes_allocated = {0}; /* Total bytes allocated in write buffers */
+ uint64_t m_bytes_cached = 0; /* Total bytes used in write buffers */
+ uint64_t m_bytes_allocated_cap = 0;
+
+ std::atomic<bool> m_alloc_failed_since_retire = {false};
+
+ cache::ImageWritebackInterface& m_image_writeback;
+ plugin::Api<ImageCtxT>& m_plugin_api;
+
+ /*
+ * When m_first_free_entry == m_first_valid_entry, the log is
+ * empty. There is always at least one free entry, which can't be
+ * used.
+ */
+ uint64_t m_first_free_entry = 0; /* Entries from here to m_first_valid_entry-1 are free */
+ uint64_t m_first_valid_entry = 0; /* Entries from here to m_first_free_entry-1 are valid */
+
+ /* All writes bearing this and all prior sync gen numbers are flushed */
+ uint64_t m_flushed_sync_gen = 0;
+
+ AsyncOpTracker m_async_op_tracker;
+ /* Debug counters for the places m_async_op_tracker is used */
+ std::atomic<int> m_async_flush_ops = {0};
+ std::atomic<int> m_async_append_ops = {0};
+
+ /* Acquire locks in order declared here */
+
+ mutable ceph::mutex m_log_retire_lock;
+ /* Hold a read lock on m_entry_reader_lock to add readers to log entry
+ * bufs. Hold a write lock to prevent readers from being added (e.g. when
+ * removing log entries from the map). No lock required to remove readers. */
+ mutable RWLock m_entry_reader_lock;
+ /* Hold m_log_append_lock while appending or retiring log entries. */
+ mutable ceph::mutex m_log_append_lock;
+ /* Used for most synchronization */
+ mutable ceph::mutex m_lock;
+
+ /* Use m_blockguard_lock for this member (as for m_barrier_in_progress and m_barrier_cell) */
+ pwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
+
+ bool m_wake_up_requested = false;
+ bool m_wake_up_scheduled = false;
+ bool m_appending = false;
+ bool m_dispatching_deferred_ops = false;
+
+ pwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
+ pwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
+
+ pwl::WriteLogMap m_blocks_to_log_entries;
+
+ /* New entries are at the back. Oldest at the front */
+ pwl::GenericLogEntries m_log_entries;
+ pwl::GenericLogEntries m_dirty_log_entries;
+
+ PerfCounters *m_perfcounter = nullptr;
+
+ unsigned int m_unpublished_reserves = 0;
+
+ ContextWQ m_work_queue;
+
+ void wake_up();
+
+ void update_entries(
+ std::shared_ptr<pwl::GenericLogEntry> *log_entry,
+ pwl::WriteLogCacheEntry *cache_entry,
+ std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t,
+ std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+ uint64_t entry_index);
+ void update_sync_points(
+ std::map<uint64_t, bool> &missing_sync_points,
+ std::map<uint64_t,
+ std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+ pwl::DeferredContexts &later);
+ virtual void inc_allocated_cached_bytes(
+ std::shared_ptr<pwl::GenericLogEntry> log_entry) = 0;
+ Context *construct_flush_entry(
+ const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
+ void detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+ GuardedRequestFunctionContext *guarded_ctx);
+ void process_writeback_dirty_entries();
+ bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
+
+ void dispatch_deferred_writes(void);
+ void complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
+
+ bool check_allocation(
+ C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
+ uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
+ uint32_t num_unpublished_reserves);
+ void append_scheduled(
+ pwl::GenericLogOperations &ops, bool &ops_remain, bool &appending,
+ bool isRWL=false);
+
+ virtual void process_work() = 0;
+ virtual void append_scheduled_ops(void) = 0;
+ virtual void schedule_append_ops(pwl::GenericLogOperations &ops, C_BlockIORequestT *req) = 0;
+ virtual void remove_pool_file() = 0;
+ virtual bool initialize_pool(Context *on_finish,
+ pwl::DeferredContexts &later) = 0;
+ virtual void collect_read_extents(
+ uint64_t read_buffer_offset, LogMapEntry<GenericWriteLogEntry> map_entry,
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, uint64_t entry_hit_length,
+ Extent hit_extent, pwl::C_ReadRequest *read_ctx) = 0;
+ virtual void complete_read(
+ std::vector<std::shared_ptr<GenericWriteLogEntry>> &log_entries_to_read,
+ std::vector<bufferlist*> &bls_to_read, Context *ctx) = 0;
+ virtual void write_data_to_buffer(
+ std::shared_ptr<pwl::WriteLogEntry> ws_entry,
+ pwl::WriteLogCacheEntry *cache_entry) {} /* default: no-op */
+ virtual void release_ram(
+ const std::shared_ptr<pwl::GenericLogEntry> log_entry) {} /* default: no-op */
+ virtual void alloc_op_log_entries(pwl::GenericLogOperations &ops) {} /* default: no-op */
+ virtual bool retire_entries(const unsigned long int frees_per_tx) {
+ return false; /* default implementation always returns false */
+ }
+ virtual void schedule_flush_and_append(
+ pwl::GenericLogOperationsVector &ops) {} /* default: no-op */
+ virtual void persist_last_flushed_sync_gen() {} /* default: no-op */
+ virtual void reserve_cache(C_BlockIORequestT *req, bool &alloc_succeeds,
+ bool &no_space) {} /* default: no-op, leaves both flags untouched */
+ virtual void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+ DeferredContexts &post_unlock,
+ bool has_write_entry) = 0;
+ virtual uint64_t get_max_extent() {
+ return 0; /* default implementation always returns 0 */
+ }
+ void update_image_cache_state(void);
+ void write_image_cache_state(std::unique_lock<ceph::mutex>& locker);
+ void handle_write_image_cache_state(int r);
+};
+
+} // namespace pwl
+} // namespace cache
+} // namespace librbd
+
+extern template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_CACHE_PARENT_WRITE_LOG