summaryrefslogtreecommitdiffstats
path: root/src/librbd/journal
diff options
context:
space:
mode:
Diffstat (limited to 'src/librbd/journal')
-rw-r--r--src/librbd/journal/CreateRequest.cc235
-rw-r--r--src/librbd/journal/CreateRequest.h106
-rw-r--r--src/librbd/journal/DemoteRequest.cc255
-rw-r--r--src/librbd/journal/DemoteRequest.h107
-rw-r--r--src/librbd/journal/DisabledPolicy.h31
-rw-r--r--src/librbd/journal/ObjectDispatch.cc213
-rw-r--r--src/librbd/journal/ObjectDispatch.h113
-rw-r--r--src/librbd/journal/OpenRequest.cc144
-rw-r--r--src/librbd/journal/OpenRequest.h85
-rw-r--r--src/librbd/journal/Policy.h25
-rw-r--r--src/librbd/journal/PromoteRequest.cc237
-rw-r--r--src/librbd/journal/PromoteRequest.h109
-rw-r--r--src/librbd/journal/RemoveRequest.cc154
-rw-r--r--src/librbd/journal/RemoveRequest.h82
-rw-r--r--src/librbd/journal/Replay.cc1190
-rw-r--r--src/librbd/journal/Replay.h210
-rw-r--r--src/librbd/journal/ResetRequest.cc162
-rw-r--r--src/librbd/journal/ResetRequest.h111
-rw-r--r--src/librbd/journal/StandardPolicy.cc32
-rw-r--r--src/librbd/journal/StandardPolicy.h38
-rw-r--r--src/librbd/journal/TypeTraits.h26
-rw-r--r--src/librbd/journal/Types.cc950
-rw-r--r--src/librbd/journal/Types.h685
-rw-r--r--src/librbd/journal/Utils.cc86
-rw-r--r--src/librbd/journal/Utils.h81
25 files changed, 5467 insertions, 0 deletions
diff --git a/src/librbd/journal/CreateRequest.cc b/src/librbd/journal/CreateRequest.cc
new file mode 100644
index 00000000..4f10ec65
--- /dev/null
+++ b/src/librbd/journal/CreateRequest.cc
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/dout.h"
+#include "common/errno.h"
+#include "include/ceph_assert.h"
+#include "librbd/Utils.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "journal/Settings.h"
+#include "librbd/journal/CreateRequest.h"
+#include "librbd/journal/RemoveRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Journal::CreateRequest: "
+
+namespace librbd {
+
+using util::create_context_callback;
+
+namespace journal {
+
+template<typename I>
+CreateRequest<I>::CreateRequest(IoCtx &ioctx, const std::string &imageid,
+ uint8_t order, uint8_t splay_width,
+ const std::string &object_pool,
+ uint64_t tag_class, TagData &tag_data,
+ const std::string &client_id,
+ ContextWQ *op_work_queue,
+ Context *on_finish)
+ : m_ioctx(ioctx), m_image_id(imageid), m_order(order),
+ m_splay_width(splay_width), m_object_pool(object_pool),
+ m_tag_class(tag_class), m_tag_data(tag_data), m_image_client_id(client_id),
+ m_op_work_queue(op_work_queue), m_on_finish(on_finish) {
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+template<typename I>
+void CreateRequest<I>::send() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ if (m_order > 64 || m_order < 12) {
+ lderr(m_cct) << "order must be in the range [12, 64]" << dendl;
+ complete(-EDOM);
+ return;
+ }
+ if (m_splay_width == 0) {
+ complete(-EINVAL);
+ return;
+ }
+
+ get_pool_id();
+}
+
+template<typename I>
+void CreateRequest<I>::get_pool_id() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ if (m_object_pool.empty()) {
+ create_journal();
+ return;
+ }
+
+ librados::Rados rados(m_ioctx);
+ IoCtx data_ioctx;
+ int r = rados.ioctx_create(m_object_pool.c_str(), data_ioctx);
+ if (r != 0) {
+ lderr(m_cct) << "failed to create journal: "
+ << "error opening journal object pool '" << m_object_pool
+ << "': " << cpp_strerror(r) << dendl;
+ complete(r);
+ return;
+ }
+ data_ioctx.set_namespace(m_ioctx.get_namespace());
+
+ m_pool_id = data_ioctx.get_id();
+ create_journal();
+}
+
+template<typename I>
+void CreateRequest<I>::create_journal() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
+ m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
+ m_ioctx, m_image_id, m_image_client_id, {});
+
+ using klass = CreateRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_create_journal>(this);
+
+ m_journaler->create(m_order, m_splay_width, m_pool_id, ctx);
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_create_journal(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to create journal: " << cpp_strerror(*result) << dendl;
+ shut_down_journaler(*result);
+ return nullptr;
+ }
+
+ allocate_journal_tag();
+ return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::allocate_journal_tag() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ using klass = CreateRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_journal_tag>(this);
+
+ encode(m_tag_data, m_bl);
+ m_journaler->allocate_tag(m_tag_class, m_bl, &m_tag, ctx);
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_journal_tag(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to allocate tag: " << cpp_strerror(*result) << dendl;
+ shut_down_journaler(*result);
+ return nullptr;
+ }
+
+ register_client();
+ return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::register_client() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ m_bl.clear();
+ encode(ClientData{ImageClientMeta{m_tag.tag_class}}, m_bl);
+
+ using klass = CreateRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_register_client>(this);
+
+ m_journaler->register_client(m_bl, ctx);
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_register_client(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to register client: " << cpp_strerror(*result) << dendl;
+ }
+
+ shut_down_journaler(*result);
+ return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::shut_down_journaler(int r) {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ m_r_saved = r;
+
+ using klass = CreateRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_journaler_shutdown>(this);
+
+ m_journaler->shut_down(ctx);
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_journaler_shutdown(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(*result) << dendl;
+ }
+
+ delete m_journaler;
+
+ if (!m_r_saved) {
+ complete(0);
+ return nullptr;
+ }
+
+ // there was an error during journal creation, so we rollback
+ // what ever was done. the easiest way to do this is to invoke
+ // journal remove state machine, although it's not the most
+ // cleanest approach when it comes to redundancy, but that's
+ // ok during the failure path.
+ remove_journal();
+ return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::remove_journal() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ using klass = CreateRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_remove_journal>(this);
+
+ RemoveRequest<I> *req = RemoveRequest<I>::create(
+ m_ioctx, m_image_id, m_image_client_id, m_op_work_queue, ctx);
+ req->send();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_remove_journal(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "error cleaning up journal after creation failed: "
+ << cpp_strerror(*result) << dendl;
+ }
+
+ complete(m_r_saved);
+ return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::complete(int r) {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ if (r == 0) {
+ ldout(m_cct, 20) << "done." << dendl;
+ }
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::CreateRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/CreateRequest.h b/src/librbd/journal/CreateRequest.h
new file mode 100644
index 00000000..c2b8cd9c
--- /dev/null
+++ b/src/librbd/journal/CreateRequest.h
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.hpp"
+#include "common/Mutex.h"
+#include "librbd/ImageCtx.h"
+#include "journal/Journaler.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+#include "cls/journal/cls_journal_types.h"
+
+using librados::IoCtx;
+using journal::Journaler;
+
+class Context;
+class ContextWQ;
+class SafeTimer;
+
+namespace journal {
+ class Journaler;
+}
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace journal {
+
+template<typename ImageCtxT = ImageCtx>
+class CreateRequest {
+public:
+ static CreateRequest *create(IoCtx &ioctx, const std::string &imageid,
+ uint8_t order, uint8_t splay_width,
+ const std::string &object_pool,
+ uint64_t tag_class, TagData &tag_data,
+ const std::string &client_id,
+ ContextWQ *op_work_queue, Context *on_finish) {
+ return new CreateRequest(ioctx, imageid, order, splay_width, object_pool,
+ tag_class, tag_data, client_id, op_work_queue,
+ on_finish);
+ }
+
+ void send();
+
+private:
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ CreateRequest(IoCtx &ioctx, const std::string &imageid, uint8_t order,
+ uint8_t splay_width, const std::string &object_pool,
+ uint64_t tag_class, TagData &tag_data,
+ const std::string &client_id, ContextWQ *op_work_queue,
+ Context *on_finish);
+
+ IoCtx &m_ioctx;
+ std::string m_image_id;
+ uint8_t m_order;
+ uint8_t m_splay_width;
+ std::string m_object_pool;
+ uint64_t m_tag_class;
+ TagData m_tag_data;
+ std::string m_image_client_id;
+ ContextWQ *m_op_work_queue;
+ Context *m_on_finish;
+
+ CephContext *m_cct;
+ cls::journal::Tag m_tag;
+ bufferlist m_bl;
+ Journaler *m_journaler;
+ SafeTimer *m_timer;
+ Mutex *m_timer_lock;
+ int m_r_saved;
+
+ int64_t m_pool_id = -1;
+
+ void get_pool_id();
+
+ void create_journal();
+ Context *handle_create_journal(int *result);
+
+ void allocate_journal_tag();
+ Context *handle_journal_tag(int *result);
+
+ void register_client();
+ Context *handle_register_client(int *result);
+
+ void shut_down_journaler(int r);
+ Context *handle_journaler_shutdown(int *result);
+
+ void remove_journal();
+ Context *handle_remove_journal(int *result);
+
+ void complete(int r);
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::CreateRequest<librbd::ImageCtx>;
+
+#endif /* CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H */
diff --git a/src/librbd/journal/DemoteRequest.cc b/src/librbd/journal/DemoteRequest.cc
new file mode 100644
index 00000000..66a1d19e
--- /dev/null
+++ b/src/librbd/journal/DemoteRequest.cc
@@ -0,0 +1,255 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/DemoteRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "journal/Journaler.h"
+#include "journal/Settings.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/OpenRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::DemoteRequest: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+DemoteRequest<I>::DemoteRequest(I &image_ctx, Context *on_finish)
+ : m_image_ctx(image_ctx), m_on_finish(on_finish),
+ m_lock("DemoteRequest::m_lock") {
+}
+
+template <typename I>
+DemoteRequest<I>::~DemoteRequest() {
+ ceph_assert(m_journaler == nullptr);
+}
+
+template <typename I>
+void DemoteRequest<I>::send() {
+ open_journaler();
+}
+
+template <typename I>
+void DemoteRequest<I>::open_journaler() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id,
+ Journal<>::IMAGE_CLIENT_ID, {});
+ auto ctx = create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_open_journaler>(this));
+ auto req = OpenRequest<I>::create(&m_image_ctx, m_journaler, &m_lock,
+ &m_client_meta, &m_tag_tid, &m_tag_data,
+ ctx);
+ req->send();
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_open_journaler(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to open journal: " << cpp_strerror(r) << dendl;
+ shut_down_journaler();
+ return;
+ } else if (m_tag_data.mirror_uuid != Journal<>::LOCAL_MIRROR_UUID) {
+ m_ret_val = -EINVAL;
+ lderr(cct) << "image is not currently the primary" << dendl;
+ shut_down_journaler();
+ return;
+ }
+
+ allocate_tag();
+}
+
+template <typename I>
+void DemoteRequest<I>::allocate_tag() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ cls::journal::Client client;
+ int r = m_journaler->get_cached_client(Journal<>::IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ shut_down_journaler();
+ return;
+ }
+
+ TagPredecessor predecessor;
+ predecessor.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID;
+ if (!client.commit_position.object_positions.empty()) {
+ auto position = client.commit_position.object_positions.front();
+ predecessor.commit_valid = true;
+ predecessor.tag_tid = position.tag_tid;
+ predecessor.entry_tid = position.entry_tid;
+ }
+
+ TagData tag_data;
+ tag_data.mirror_uuid = Journal<>::ORPHAN_MIRROR_UUID;
+ tag_data.predecessor = std::move(predecessor);
+
+ bufferlist tag_bl;
+ encode(tag_data, tag_bl);
+
+ auto ctx = create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_allocate_tag>(this);
+ m_journaler->allocate_tag(m_client_meta.tag_class, tag_bl, &m_tag, ctx);
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_allocate_tag(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+ shut_down_journaler();
+ return;
+ }
+
+ m_tag_tid = m_tag.tid;
+ append_event();
+}
+
+template <typename I>
+void DemoteRequest<I>::append_event() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ EventEntry event_entry{DemotePromoteEvent{}, {}};
+ bufferlist event_entry_bl;
+ encode(event_entry, event_entry_bl);
+
+ m_journaler->start_append(0);
+ m_future = m_journaler->append(m_tag_tid, event_entry_bl);
+
+ auto ctx = create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_append_event>(this);
+ m_future.flush(ctx);
+
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_append_event(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to append demotion journal event: " << cpp_strerror(r)
+ << dendl;
+ stop_append();
+ return;
+ }
+
+ commit_event();
+}
+
+template <typename I>
+void DemoteRequest<I>::commit_event() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ m_journaler->committed(m_future);
+
+ auto ctx = create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_commit_event>(this);
+ m_journaler->flush_commit_position(ctx);
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_commit_event(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to flush demotion commit position: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ stop_append();
+}
+
+template <typename I>
+void DemoteRequest<I>::stop_append() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ auto ctx = create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_stop_append>(this);
+ m_journaler->stop_append(ctx);
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_stop_append(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ if (m_ret_val == 0) {
+ m_ret_val = r;
+ }
+ lderr(cct) << "failed to stop journal append: " << cpp_strerror(r) << dendl;
+ }
+
+ shut_down_journaler();
+}
+
+template <typename I>
+void DemoteRequest<I>::shut_down_journaler() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ Context *ctx = create_async_context_callback(
+ m_image_ctx, create_context_callback<
+ DemoteRequest<I>, &DemoteRequest<I>::handle_shut_down_journaler>(this));
+ m_journaler->shut_down(ctx);
+}
+
+template <typename I>
+void DemoteRequest<I>::handle_shut_down_journaler(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to shut down journal: " << cpp_strerror(r) << dendl;
+ }
+
+ delete m_journaler;
+ m_journaler = nullptr;
+ finish(r);
+}
+
+template <typename I>
+void DemoteRequest<I>::finish(int r) {
+ if (m_ret_val < 0) {
+ r = m_ret_val;
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::DemoteRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/DemoteRequest.h b/src/librbd/journal/DemoteRequest.h
new file mode 100644
index 00000000..5fea7f47
--- /dev/null
+++ b/src/librbd/journal/DemoteRequest.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H
+
+#include "common/Mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/Future.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace journal {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class DemoteRequest {
+public:
+ static DemoteRequest *create(ImageCtxT &image_ctx, Context *on_finish) {
+ return new DemoteRequest(image_ctx, on_finish);
+ }
+
+ DemoteRequest(ImageCtxT &image_ctx, Context *on_finish);
+ ~DemoteRequest();
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * OPEN_JOURNALER * * * * *
+ * | *
+ * v *
+ * ALLOCATE_TAG * * * * * *
+ * | *
+ * v *
+ * APPEND_EVENT * * * *
+ * | * *
+ * v * *
+ * COMMIT_EVENT * *
+ * | * *
+ * v * *
+ * STOP_APPEND <* * * *
+ * | *
+ * v *
+ * SHUT_DOWN_JOURNALER <* *
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+ typedef typename TypeTraits<ImageCtxT>::Future Future;
+
+ ImageCtxT &m_image_ctx;
+ Context *m_on_finish;
+
+ Journaler *m_journaler = nullptr;
+ int m_ret_val = 0;
+
+ Mutex m_lock;
+ ImageClientMeta m_client_meta;
+ uint64_t m_tag_tid = 0;
+ TagData m_tag_data;
+
+ cls::journal::Tag m_tag;
+ Future m_future;
+
+ void open_journaler();
+ void handle_open_journaler(int r);
+
+ void allocate_tag();
+ void handle_allocate_tag(int r);
+
+ void append_event();
+ void handle_append_event(int r);
+
+ void commit_event();
+ void handle_commit_event(int r);
+
+ void stop_append();
+ void handle_stop_append(int r);
+
+ void shut_down_journaler();
+ void handle_shut_down_journaler(int r);
+
+ void finish(int r);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::DemoteRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H
diff --git a/src/librbd/journal/DisabledPolicy.h b/src/librbd/journal/DisabledPolicy.h
new file mode 100644
index 00000000..27d69a50
--- /dev/null
+++ b/src/librbd/journal/DisabledPolicy.h
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H
+#define CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H
+
+#include "librbd/journal/Policy.h"
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace journal {
+
+class DisabledPolicy : public Policy {
+public:
+ bool append_disabled() const override {
+ return true;
+ }
+ bool journal_disabled() const override {
+ return true;
+ }
+ void allocate_tag_on_lock(Context *on_finish) override {
+ ceph_abort();
+ }
+};
+
+} // namespace journal
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H
diff --git a/src/librbd/journal/ObjectDispatch.cc b/src/librbd/journal/ObjectDispatch.cc
new file mode 100644
index 00000000..737f5efe
--- /dev/null
+++ b/src/librbd/journal/ObjectDispatch.cc
@@ -0,0 +1,213 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/ObjectDispatch.h"
+#include "common/dout.h"
+#include "common/WorkQueue.h"
+#include "osdc/Striper.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::ObjectDispatch: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+namespace {
+
+template <typename I>
+struct C_CommitIOEvent : public Context {
+ I* image_ctx;
+ Journal<I>* journal;
+ uint64_t object_no;
+ uint64_t object_off;
+ uint64_t object_len;
+ uint64_t journal_tid;
+ int object_dispatch_flags;
+ Context* on_finish;
+
+ C_CommitIOEvent(I* image_ctx, Journal<I>* journal, uint64_t object_no,
+ uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, int object_dispatch_flags,
+ Context* on_finish)
+ : image_ctx(image_ctx), journal(journal), object_no(object_no),
+ object_off(object_off), object_len(object_len), journal_tid(journal_tid),
+ object_dispatch_flags(object_dispatch_flags), on_finish(on_finish) {
+ }
+
+ void finish(int r) override {
+ // don't commit the IO extent if a previous dispatch handler will just
+ // retry the failed IO
+ if (r >= 0 ||
+ (object_dispatch_flags &
+ io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR) == 0) {
+ io::Extents file_extents;
+ Striper::extent_to_file(image_ctx->cct, &image_ctx->layout, object_no,
+ object_off, object_len, file_extents);
+ for (auto& extent : file_extents) {
+ journal->commit_io_event_extent(journal_tid, extent.first,
+ extent.second, r);
+ }
+ }
+
+ if (on_finish != nullptr) {
+ on_finish->complete(r);
+ }
+ }
+};
+
+} // anonymous namespace
+
+template <typename I>
+ObjectDispatch<I>::ObjectDispatch(I* image_ctx, Journal<I>* journal)
+ : m_image_ctx(image_ctx), m_journal(journal) {
+}
+
+template <typename I>
+void ObjectDispatch<I>::shut_down(Context* on_finish) {
+ m_image_ctx->op_work_queue->queue(on_finish, 0);
+}
+
+template <typename I>
+bool ObjectDispatch<I>::discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, *journal_tid,
+ *object_dispatch_flags, *on_finish);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << data.length() << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, data.length(), *journal_tid,
+ *object_dispatch_flags, *on_finish);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, io::Extents&& buffer_extents, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << object_len << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, *journal_tid,
+ *object_dispatch_flags, *on_finish);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+bool ObjectDispatch<I>::compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ if (*journal_tid == 0) {
+ // non-journaled IO
+ return false;
+ }
+
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << oid << " " << object_off << "~" << write_data.length()
+ << dendl;
+
+ *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, write_data.length(),
+ *journal_tid, *object_dispatch_flags,
+ *on_finish);
+
+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+ wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched);
+ return true;
+}
+
+template <typename I>
+void ObjectDispatch<I>::extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << object_no << " " << object_off << "~" << object_len
+ << dendl;
+
+ auto ctx = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no,
+ object_off, object_len, journal_tid, false,
+ nullptr);
+ if (new_journal_tid != 0) {
+ // ensure new journal event is safely committed to disk before
+ // committing old event
+ m_journal->flush_event(new_journal_tid, ctx);
+ } else {
+ ctx->complete(0);
+ }
+}
+
+template <typename I>
+void ObjectDispatch<I>::wait_or_flush_event(
+ uint64_t journal_tid, int object_dispatch_flags, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "journal_tid=" << journal_tid << dendl;
+
+ if ((object_dispatch_flags & io::OBJECT_DISPATCH_FLAG_FLUSH) != 0) {
+ m_journal->flush_event(journal_tid, on_dispatched);
+ } else {
+ m_journal->wait_event(journal_tid, on_dispatched);
+ }
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::ObjectDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/journal/ObjectDispatch.h b/src/librbd/journal/ObjectDispatch.h
new file mode 100644
index 00000000..b5ae747e
--- /dev/null
+++ b/src/librbd/journal/ObjectDispatch.h
@@ -0,0 +1,113 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
+#define CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "common/snap_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/io/Types.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+template <typename> class Journal;
+
+namespace journal {
+
+template <typename ImageCtxT = librbd::ImageCtx>
+class ObjectDispatch : public io::ObjectDispatchInterface {
+public:
+ static ObjectDispatch* create(ImageCtxT* image_ctx,
+ Journal<ImageCtxT>* journal) {
+ return new ObjectDispatch(image_ctx, journal);
+ }
+
+ ObjectDispatch(ImageCtxT* image_ctx, Journal<ImageCtxT>* journal);
+
+ io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+ return io::OBJECT_DISPATCH_LAYER_JOURNAL;
+ }
+
+ void shut_down(Context* on_finish) override;
+
+ bool read(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+ io::ExtentMap* extent_map, int* object_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ return false;
+ }
+
+ bool discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, io::Extents&& buffer_extents,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) override;
+
+ bool compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+
+ bool flush(
+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override {
+ return false;
+ }
+
+ bool invalidate_cache(Context* on_finish) override {
+ return false;
+ }
+ bool reset_existence_cache(Context* on_finish) override {
+ return false;
+ }
+
+ void extent_overwritten(
+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
+ uint64_t journal_tid, uint64_t new_journal_tid) override;
+
+private:
+ ImageCtxT* m_image_ctx;
+ Journal<ImageCtxT>* m_journal;
+
+ void wait_or_flush_event(uint64_t journal_tid, int object_dispatch_flags,
+ Context* on_dispatched);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::ObjectDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H
diff --git a/src/librbd/journal/OpenRequest.cc b/src/librbd/journal/OpenRequest.cc
new file mode 100644
index 00000000..a5178de8
--- /dev/null
+++ b/src/librbd/journal/OpenRequest.cc
@@ -0,0 +1,144 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/OpenRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::OpenRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+using util::C_DecodeTags;
+
+template <typename I>
+OpenRequest<I>::OpenRequest(I *image_ctx, Journaler *journaler, Mutex *lock,
+ journal::ImageClientMeta *client_meta,
+ uint64_t *tag_tid, journal::TagData *tag_data,
+ Context *on_finish)
+ : m_image_ctx(image_ctx), m_journaler(journaler), m_lock(lock),
+ m_client_meta(client_meta), m_tag_tid(tag_tid), m_tag_data(tag_data),
+ m_on_finish(on_finish) {
+}
+
+template <typename I>
+void OpenRequest<I>::send() {
+ send_init();
+}
+
+template <typename I>
+void OpenRequest<I>::send_init() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ m_journaler->init(create_async_context_callback(
+ *m_image_ctx, create_context_callback<
+ OpenRequest<I>, &OpenRequest<I>::handle_init>(this)));
+}
+
+template <typename I>
+void OpenRequest<I>::handle_init(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to initialize journal: " << cpp_strerror(r)
+ << dendl;
+ finish(r);
+ return;
+ }
+
+ // locate the master image client record
+ cls::journal::Client client;
+ r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
+ &client);
+ if (r < 0) {
+ lderr(cct) << "failed to locate master image client" << dendl;
+ finish(r);
+ return;
+ }
+
+ librbd::journal::ClientData client_data;
+ auto bl = client.data.cbegin();
+ try {
+ decode(client_data, bl);
+ } catch (const buffer::error &err) {
+ lderr(cct) << "failed to decode client meta data: " << err.what()
+ << dendl;
+ finish(-EINVAL);
+ return;
+ }
+
+ journal::ImageClientMeta *image_client_meta =
+ boost::get<journal::ImageClientMeta>(&client_data.client_meta);
+ if (image_client_meta == nullptr) {
+ lderr(cct) << this << " " << __func__ << ": "
+ << "failed to extract client meta data" << dendl;
+ finish(-EINVAL);
+ return;
+ }
+
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "client: " << client << ", "
+ << "image meta: " << *image_client_meta << dendl;
+
+ m_tag_class = image_client_meta->tag_class;
+ {
+ Mutex::Locker locker(*m_lock);
+ *m_client_meta = *image_client_meta;
+ }
+
+ send_get_tags();
+}
+
+template <typename I>
+void OpenRequest<I>::send_get_tags() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ C_DecodeTags *tags_ctx = new C_DecodeTags(
+ cct, m_lock, m_tag_tid, m_tag_data, create_async_context_callback(
+ *m_image_ctx, create_context_callback<
+ OpenRequest<I>, &OpenRequest<I>::handle_get_tags>(this)));
+ m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
+}
+
+template <typename I>
+void OpenRequest<I>::handle_get_tags(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": "
+ << "failed to decode journal tags: " << cpp_strerror(r) << dendl;
+ }
+
+ finish(r);
+}
+
+template <typename I>
+void OpenRequest<I>::finish(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::OpenRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/OpenRequest.h b/src/librbd/journal/OpenRequest.h
new file mode 100644
index 00000000..f71d8c53
--- /dev/null
+++ b/src/librbd/journal/OpenRequest.h
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H
+
+#include "include/int_types.h"
+#include "librbd/journal/TypeTraits.h"
+
+struct Context;
+struct Mutex;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace journal {
+
+struct ImageClientMeta;
+struct TagData;
+
+template <typename ImageCtxT = ImageCtx>
+class OpenRequest {
+public:
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ static OpenRequest* create(ImageCtxT *image_ctx, Journaler *journaler,
+ Mutex *lock, journal::ImageClientMeta *client_meta,
+ uint64_t *tag_tid, journal::TagData *tag_data,
+ Context *on_finish) {
+ return new OpenRequest(image_ctx, journaler, lock, client_meta, tag_tid,
+ tag_data, on_finish);
+ }
+
+ OpenRequest(ImageCtxT *image_ctx, Journaler *journaler, Mutex *lock,
+ journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
+ journal::TagData *tag_data, Context *on_finish);
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * INIT
+ * |
+ * v
+ * GET_TAGS
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+
+ ImageCtxT *m_image_ctx;
+ Journaler *m_journaler;
+ Mutex *m_lock;
+ journal::ImageClientMeta *m_client_meta;
+ uint64_t *m_tag_tid;
+ journal::TagData *m_tag_data;
+ Context *m_on_finish;
+
+ uint64_t m_tag_class = 0;
+
+ void send_init();
+ void handle_init(int r);
+
+ void send_get_tags();
+ void handle_get_tags(int r);
+
+ void finish(int r);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::OpenRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H
diff --git a/src/librbd/journal/Policy.h b/src/librbd/journal/Policy.h
new file mode 100644
index 00000000..1ced3c53
--- /dev/null
+++ b/src/librbd/journal/Policy.h
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_POLICY_H
+#define CEPH_LIBRBD_JOURNAL_POLICY_H
+
+class Context;
+
+namespace librbd {
+
+namespace journal {
+
+struct Policy {
+ virtual ~Policy() {
+ }
+
+ virtual bool append_disabled() const = 0;
+ virtual bool journal_disabled() const = 0;
+ virtual void allocate_tag_on_lock(Context *on_finish) = 0;
+};
+
+} // namespace journal
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_POLICY_H
diff --git a/src/librbd/journal/PromoteRequest.cc b/src/librbd/journal/PromoteRequest.cc
new file mode 100644
index 00000000..17f2957e
--- /dev/null
+++ b/src/librbd/journal/PromoteRequest.cc
@@ -0,0 +1,237 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/PromoteRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "journal/Journaler.h"
+#include "journal/Settings.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/OpenRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::PromoteRequest: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+using librbd::util::create_async_context_callback;
+using librbd::util::create_context_callback;
+
+template <typename I>
+PromoteRequest<I>::PromoteRequest(I *image_ctx, bool force, Context *on_finish)
+ : m_image_ctx(image_ctx), m_force(force), m_on_finish(on_finish),
+ m_lock("PromoteRequest::m_lock") {
+}
+
+template <typename I>
+void PromoteRequest<I>::send() {
+ send_open();
+}
+
+template <typename I>
+void PromoteRequest<I>::send_open() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ m_journaler = new Journaler(m_image_ctx->md_ctx, m_image_ctx->id,
+ Journal<>::IMAGE_CLIENT_ID, {});
+ Context *ctx = create_async_context_callback(
+ *m_image_ctx, create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_open>(this));
+ auto open_req = OpenRequest<I>::create(m_image_ctx, m_journaler,
+ &m_lock, &m_client_meta,
+ &m_tag_tid, &m_tag_data, ctx);
+ open_req->send();
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_open(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to open journal: " << cpp_strerror(r) << dendl;
+ shut_down();
+ return;
+ }
+
+ allocate_tag();
+}
+
+template <typename I>
+void PromoteRequest<I>::allocate_tag() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ journal::TagPredecessor predecessor;
+ if (!m_force && m_tag_data.mirror_uuid == Journal<>::ORPHAN_MIRROR_UUID) {
+ // orderly promotion -- demotion epoch will have a single entry
+ // so link to our predecessor (demotion) epoch
+ predecessor = TagPredecessor{Journal<>::ORPHAN_MIRROR_UUID, true, m_tag_tid,
+ 1};
+ } else {
+ // forced promotion -- create an epoch no peers can link against
+ predecessor = TagPredecessor{Journal<>::LOCAL_MIRROR_UUID, true, m_tag_tid,
+ 0};
+ }
+
+ TagData tag_data;
+ tag_data.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID;
+ tag_data.predecessor = predecessor;
+
+ bufferlist tag_bl;
+ encode(tag_data, tag_bl);
+
+ Context *ctx = create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_allocate_tag>(this);
+ m_journaler->allocate_tag(m_client_meta.tag_class, tag_bl, &m_tag, ctx);
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_allocate_tag(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl;
+ shut_down();
+ return;
+ }
+
+ m_tag_tid = m_tag.tid;
+ append_event();
+}
+
+template <typename I>
+void PromoteRequest<I>::append_event() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ EventEntry event_entry{DemotePromoteEvent{}, {}};
+ bufferlist event_entry_bl;
+ encode(event_entry, event_entry_bl);
+
+ m_journaler->start_append(0);
+ m_future = m_journaler->append(m_tag_tid, event_entry_bl);
+
+ auto ctx = create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_append_event>(this);
+ m_future.flush(ctx);
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_append_event(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to append promotion journal event: "
+ << cpp_strerror(r) << dendl;
+ stop_append();
+ return;
+ }
+
+ commit_event();
+}
+
+template <typename I>
+void PromoteRequest<I>::commit_event() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ m_journaler->committed(m_future);
+
+ auto ctx = create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_commit_event>(this);
+ m_journaler->flush_commit_position(ctx);
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_commit_event(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ m_ret_val = r;
+ lderr(cct) << "failed to flush promote commit position: "
+ << cpp_strerror(r) << dendl;
+ }
+
+ stop_append();
+}
+
+template <typename I>
+void PromoteRequest<I>::stop_append() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ auto ctx = create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_stop_append>(this);
+ m_journaler->stop_append(ctx);
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_stop_append(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ if (m_ret_val == 0) {
+ m_ret_val = r;
+ }
+ lderr(cct) << "failed to stop journal append: " << cpp_strerror(r) << dendl;
+ }
+
+ shut_down();
+}
+
+template <typename I>
+void PromoteRequest<I>::shut_down() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ Context *ctx = create_async_context_callback(
+ *m_image_ctx, create_context_callback<
+ PromoteRequest<I>, &PromoteRequest<I>::handle_shut_down>(this));
+ m_journaler->shut_down(ctx);
+}
+
+template <typename I>
+void PromoteRequest<I>::handle_shut_down(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to shut down journal: " << cpp_strerror(r) << dendl;
+ }
+
+ delete m_journaler;
+ finish(r);
+}
+
+template <typename I>
+void PromoteRequest<I>::finish(int r) {
+ if (m_ret_val < 0) {
+ r = m_ret_val;
+ }
+
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::PromoteRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/PromoteRequest.h b/src/librbd/journal/PromoteRequest.h
new file mode 100644
index 00000000..0d01f596
--- /dev/null
+++ b/src/librbd/journal/PromoteRequest.h
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H
+
+#include "include/int_types.h"
+#include "common/Mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/Future.h"
+#include "librbd/journal/Types.h"
+#include "librbd/journal/TypeTraits.h"
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace journal {
+
+template <typename ImageCtxT = ImageCtx>
+class PromoteRequest {
+public:
+ static PromoteRequest* create(ImageCtxT *image_ctx, bool force,
+ Context *on_finish) {
+ return new PromoteRequest(image_ctx, force, on_finish);
+ }
+
+ PromoteRequest(ImageCtxT *image_ctx, bool force, Context *on_finish);
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * OPEN * * * * * * * * * *
+ * | *
+ * v *
+ * ALLOCATE_TAG * * * * * *
+ * | *
+ * v *
+ * APPEND_EVENT * * * *
+ * | * *
+ * v * *
+ * COMMIT_EVENT * *
+ * | * *
+ * v * *
+ * STOP_APPEND <* * * *
+ * | *
+ * v *
+ * SHUT_DOWN <* * * * * * *
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+ typedef typename TypeTraits<ImageCtxT>::Future Future;
+
+ ImageCtxT *m_image_ctx;
+ bool m_force;
+ Context *m_on_finish;
+
+ Journaler *m_journaler = nullptr;
+ int m_ret_val = 0;
+
+ Mutex m_lock;
+ ImageClientMeta m_client_meta;
+ uint64_t m_tag_tid = 0;
+ TagData m_tag_data;
+
+ cls::journal::Tag m_tag;
+ Future m_future;
+
+ void send_open();
+ void handle_open(int r);
+
+ void allocate_tag();
+ void handle_allocate_tag(int r);
+
+ void append_event();
+ void handle_append_event(int r);
+
+ void commit_event();
+ void handle_commit_event(int r);
+
+ void stop_append();
+ void handle_stop_append(int r);
+
+ void shut_down();
+ void handle_shut_down(int r);
+
+ void finish(int r);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::PromoteRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H
diff --git a/src/librbd/journal/RemoveRequest.cc b/src/librbd/journal/RemoveRequest.cc
new file mode 100644
index 00000000..6ed5f54c
--- /dev/null
+++ b/src/librbd/journal/RemoveRequest.cc
@@ -0,0 +1,154 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "journal/Settings.h"
+#include "include/ceph_assert.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/RemoveRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Journal::RemoveRequest: "
+
+namespace librbd {
+
+using util::create_context_callback;
+
+namespace journal {
+
+template<typename I>
+RemoveRequest<I>::RemoveRequest(IoCtx &ioctx, const std::string &image_id,
+ const std::string &client_id,
+ ContextWQ *op_work_queue,
+ Context *on_finish)
+ : m_ioctx(ioctx), m_image_id(image_id), m_image_client_id(client_id),
+ m_op_work_queue(op_work_queue), m_on_finish(on_finish) {
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+}
+
+template<typename I>
+void RemoveRequest<I>::send() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ stat_journal();
+}
+
+template<typename I>
+void RemoveRequest<I>::stat_journal() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock);
+ m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock,
+ m_ioctx, m_image_id, m_image_client_id, {});
+
+ using klass = RemoveRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_stat_journal>(this);
+
+ m_journaler->exists(ctx);
+}
+
+template<typename I>
+Context *RemoveRequest<I>::handle_stat_journal(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if ((*result < 0) && (*result != -ENOENT)) {
+ lderr(m_cct) << "failed to stat journal header: " << cpp_strerror(*result) << dendl;
+ shut_down_journaler(*result);
+ return nullptr;
+ }
+
+ if (*result == -ENOENT) {
+ shut_down_journaler(0);
+ return nullptr;
+ }
+
+ init_journaler();
+ return nullptr;
+}
+
+template<typename I>
+void RemoveRequest<I>::init_journaler() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ using klass = RemoveRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_init_journaler>(this);
+
+ m_journaler->init(ctx);
+}
+
+template<typename I>
+Context *RemoveRequest<I>::handle_init_journaler(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if ((*result < 0) && (*result != -ENOENT)) {
+ lderr(m_cct) << "failed to init journaler: " << cpp_strerror(*result) << dendl;
+ shut_down_journaler(*result);
+ return nullptr;
+ }
+
+ remove_journal();
+ return nullptr;
+}
+
+template<typename I>
+void RemoveRequest<I>::remove_journal() {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ using klass = RemoveRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_remove_journal>(this);
+
+ m_journaler->remove(true, ctx);
+}
+
+template<typename I>
+Context *RemoveRequest<I>::handle_remove_journal(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to remove journal: " << cpp_strerror(*result) << dendl;
+ }
+
+ shut_down_journaler(*result);
+ return nullptr;
+}
+
+template<typename I>
+void RemoveRequest<I>::shut_down_journaler(int r) {
+ ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+ m_r_saved = r;
+
+ using klass = RemoveRequest<I>;
+ Context *ctx = create_context_callback<klass, &klass::handle_journaler_shutdown>(this);
+
+ m_journaler->shut_down(ctx);
+}
+
+template<typename I>
+Context *RemoveRequest<I>::handle_journaler_shutdown(int *result) {
+ ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+ if (*result < 0) {
+ lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(*result) << dendl;
+ }
+
+ delete m_journaler;
+
+ if (m_r_saved == 0) {
+ ldout(m_cct, 20) << "done." << dendl;
+ }
+
+ m_on_finish->complete(m_r_saved);
+ delete this;
+
+ return nullptr;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::RemoveRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/RemoveRequest.h b/src/librbd/journal/RemoveRequest.h
new file mode 100644
index 00000000..594e62d5
--- /dev/null
+++ b/src/librbd/journal/RemoveRequest.h
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.hpp"
+#include "common/Mutex.h"
+#include "librbd/ImageCtx.h"
+#include "journal/Journaler.h"
+#include "librbd/journal/TypeTraits.h"
+
+using librados::IoCtx;
+using journal::Journaler;
+
+class Context;
+class ContextWQ;
+class SafeTimer;
+
+namespace journal {
+ class Journaler;
+}
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace journal {
+
+template<typename ImageCtxT = ImageCtx>
+class RemoveRequest {
+public:
+ static RemoveRequest *create(IoCtx &ioctx, const std::string &image_id,
+ const std::string &client_id,
+ ContextWQ *op_work_queue, Context *on_finish) {
+ return new RemoveRequest(ioctx, image_id, client_id,
+ op_work_queue, on_finish);
+ }
+
+ void send();
+
+private:
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ RemoveRequest(IoCtx &ioctx, const std::string &image_id,
+ const std::string &client_id,
+ ContextWQ *op_work_queue, Context *on_finish);
+
+ IoCtx &m_ioctx;
+ std::string m_image_id;
+ std::string m_image_client_id;
+ ContextWQ *m_op_work_queue;
+ Context *m_on_finish;
+
+ CephContext *m_cct;
+ Journaler *m_journaler;
+ SafeTimer *m_timer;
+ Mutex *m_timer_lock;
+ int m_r_saved;
+
+ void stat_journal();
+ Context *handle_stat_journal(int *result);
+
+ void init_journaler();
+ Context *handle_init_journaler(int *result);
+
+ void remove_journal();
+ Context *handle_remove_journal(int *result);
+
+ void shut_down_journaler(int r);
+ Context *handle_journaler_shutdown(int *result);
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::RemoveRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H
diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc
new file mode 100644
index 00000000..f96153e7
--- /dev/null
+++ b/src/librbd/journal/Replay.cc
@@ -0,0 +1,1190 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/Replay.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/WorkQueue.h"
+#include "librbd/ExclusiveLock.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/internal.h"
+#include "librbd/Operations.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
+
+namespace librbd {
+namespace journal {
+
+namespace {
+
+static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
+static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
+
+static NoOpProgressContext no_op_progress_callback;
+
+template <typename I, typename E>
+struct ExecuteOp : public Context {
+ I &image_ctx;
+ E event;
+ Context *on_op_complete;
+
+ ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
+ : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
+ }
+
+ void execute(const journal::SnapCreateEvent &_) {
+ image_ctx.operations->execute_snap_create(event.snap_namespace,
+ event.snap_name,
+ on_op_complete,
+ event.op_tid, false);
+ }
+
+ void execute(const journal::SnapRemoveEvent &_) {
+ image_ctx.operations->execute_snap_remove(event.snap_namespace,
+ event.snap_name,
+ on_op_complete);
+ }
+
+ void execute(const journal::SnapRenameEvent &_) {
+ image_ctx.operations->execute_snap_rename(event.snap_id,
+ event.dst_snap_name,
+ on_op_complete);
+ }
+
+ void execute(const journal::SnapProtectEvent &_) {
+ image_ctx.operations->execute_snap_protect(event.snap_namespace,
+ event.snap_name,
+ on_op_complete);
+ }
+
+ void execute(const journal::SnapUnprotectEvent &_) {
+ image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
+ event.snap_name,
+ on_op_complete);
+ }
+
+ void execute(const journal::SnapRollbackEvent &_) {
+ image_ctx.operations->execute_snap_rollback(event.snap_namespace,
+ event.snap_name,
+ no_op_progress_callback,
+ on_op_complete);
+ }
+
+ void execute(const journal::RenameEvent &_) {
+ image_ctx.operations->execute_rename(event.image_name,
+ on_op_complete);
+ }
+
+ void execute(const journal::ResizeEvent &_) {
+ image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
+ on_op_complete, event.op_tid);
+ }
+
+ void execute(const journal::FlattenEvent &_) {
+ image_ctx.operations->execute_flatten(no_op_progress_callback,
+ on_op_complete);
+ }
+
+ void execute(const journal::SnapLimitEvent &_) {
+ image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
+ }
+
+ void execute(const journal::UpdateFeaturesEvent &_) {
+ image_ctx.operations->execute_update_features(event.features, event.enabled,
+ on_op_complete, event.op_tid);
+ }
+
+ void execute(const journal::MetadataSetEvent &_) {
+ image_ctx.operations->execute_metadata_set(event.key, event.value,
+ on_op_complete);
+ }
+
+ void execute(const journal::MetadataRemoveEvent &_) {
+ image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
+ }
+
+ void finish(int r) override {
+ CephContext *cct = image_ctx.cct;
+ if (r < 0) {
+ lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
+ on_op_complete->complete(r);
+ return;
+ }
+
+ ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
+ RWLock::RLocker owner_locker(image_ctx.owner_lock);
+
+ if (image_ctx.exclusive_lock == nullptr ||
+ !image_ctx.exclusive_lock->accept_ops()) {
+ ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
+ on_op_complete->complete(-ECANCELED);
+ return;
+ }
+
+ execute(event);
+ }
+};
+
+template <typename I>
+struct C_RefreshIfRequired : public Context {
+ I &image_ctx;
+ Context *on_finish;
+
+ C_RefreshIfRequired(I &image_ctx, Context *on_finish)
+ : image_ctx(image_ctx), on_finish(on_finish) {
+ }
+ ~C_RefreshIfRequired() override {
+ delete on_finish;
+ }
+
+ void finish(int r) override {
+ CephContext *cct = image_ctx.cct;
+ Context *ctx = on_finish;
+ on_finish = nullptr;
+
+ if (r < 0) {
+ lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
+ image_ctx.op_work_queue->queue(ctx, r);
+ return;
+ }
+
+ if (image_ctx.state->is_refresh_required()) {
+ ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
+ << "refresh required" << dendl;
+ image_ctx.state->refresh(ctx);
+ return;
+ }
+
+ image_ctx.op_work_queue->queue(ctx, 0);
+ }
+};
+
+} // anonymous namespace
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
+ << __func__
+
+template <typename I>
+Replay<I>::Replay(I &image_ctx)
+ : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
+}
+
+template <typename I>
+Replay<I>::~Replay() {
+ ceph_assert(m_in_flight_aio_flush == 0);
+ ceph_assert(m_in_flight_aio_modify == 0);
+ ceph_assert(m_aio_modify_unsafe_contexts.empty());
+ ceph_assert(m_aio_modify_safe_contexts.empty());
+ ceph_assert(m_op_events.empty());
+ ceph_assert(m_in_flight_op_events == 0);
+}
+
+template <typename I>
+int Replay<I>::decode(bufferlist::const_iterator *it, EventEntry *event_entry) {
+ try {
+ using ceph::decode;
+ decode(*event_entry, *it);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+template <typename I>
+void Replay<I>::process(const EventEntry &event_entry,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
+ << dendl;
+
+ on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
+
+ RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+ if (m_image_ctx.exclusive_lock == nullptr ||
+ !m_image_ctx.exclusive_lock->accept_ops()) {
+ ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
+ m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
+ on_ready->complete(0);
+ return;
+ }
+
+ boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
+ event_entry.event);
+}
+
+template <typename I>
+void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << dendl;
+
+ io::AioCompletion *flush_comp = nullptr;
+ on_finish = util::create_async_context_callback(
+ m_image_ctx, on_finish);
+
+ {
+ Mutex::Locker locker(m_lock);
+
+ // safely commit any remaining AIO modify operations
+ if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
+ flush_comp = create_aio_flush_completion(nullptr);
+ ceph_assert(flush_comp != nullptr);
+ }
+
+ for (auto &op_event_pair : m_op_events) {
+ OpEvent &op_event = op_event_pair.second;
+ if (cancel_ops) {
+ // cancel ops that are waiting to start (waiting for
+ // OpFinishEvent or waiting for ready)
+ if (op_event.on_start_ready == nullptr &&
+ op_event.on_op_finish_event != nullptr) {
+ Context *on_op_finish_event = nullptr;
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+ m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
+ }
+ } else if (op_event.on_op_finish_event != nullptr) {
+ // start ops waiting for OpFinishEvent
+ Context *on_op_finish_event = nullptr;
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+ m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
+ } else if (op_event.on_start_ready != nullptr) {
+ // waiting for op ready
+ op_event_pair.second.finish_on_ready = true;
+ }
+ }
+
+ ceph_assert(!m_shut_down);
+ m_shut_down = true;
+
+ ceph_assert(m_flush_ctx == nullptr);
+ if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
+ std::swap(m_flush_ctx, on_finish);
+ }
+ }
+
+ // execute the following outside of lock scope
+ if (flush_comp != nullptr) {
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
+ }
+}
+
+template <typename I>
+void Replay<I>::flush(Context *on_finish) {
+ io::AioCompletion *aio_comp;
+ {
+ Mutex::Locker locker(m_lock);
+ aio_comp = create_aio_flush_completion(
+ util::create_async_context_callback(m_image_ctx, on_finish));
+ if (aio_comp == nullptr) {
+ return;
+ }
+ }
+
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+}
+
+template <typename I>
+void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": op_tid=" << op_tid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ auto op_it = m_op_events.find(op_tid);
+ ceph_assert(op_it != m_op_events.end());
+
+ OpEvent &op_event = op_it->second;
+ ceph_assert(op_event.op_in_progress &&
+ op_event.on_op_finish_event == nullptr &&
+ op_event.on_finish_ready == nullptr &&
+ op_event.on_finish_safe == nullptr);
+
+ // resume processing replay events
+ Context *on_start_ready = nullptr;
+ std::swap(on_start_ready, op_event.on_start_ready);
+ on_start_ready->complete(0);
+
+ // cancel has been requested -- send error to paused state machine
+ if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
+ m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
+ return;
+ }
+
+ // resume the op state machine once the associated OpFinishEvent
+ // is processed
+ op_event.on_op_finish_event = new FunctionContext(
+ [on_resume](int r) {
+ on_resume->complete(r);
+ });
+
+ // shut down request -- don't expect OpFinishEvent
+ if (op_event.finish_on_ready) {
+ m_image_ctx.op_work_queue->queue(on_resume, 0);
+ }
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": AIO discard event" << dendl;
+
+ bool flush_required;
+ auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ io::AIO_TYPE_DISCARD,
+ &flush_required,
+ {});
+ if (aio_comp == nullptr) {
+ return;
+ }
+
+ if (!clipped_io(event.offset, aio_comp)) {
+ io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp,
+ {{event.offset, event.length}},
+ event.discard_granularity_bytes, {});
+ }
+
+ if (flush_required) {
+ m_lock.Lock();
+ auto flush_comp = create_aio_flush_completion(nullptr);
+ m_lock.Unlock();
+
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+ }
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::AioWriteEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": AIO write event" << dendl;
+
+ bufferlist data = event.data;
+ bool flush_required;
+ auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ io::AIO_TYPE_WRITE,
+ &flush_required,
+ {});
+ if (aio_comp == nullptr) {
+ return;
+ }
+
+ if (!clipped_io(event.offset, aio_comp)) {
+ io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
+ {{event.offset, event.length}},
+ std::move(data), 0, {});
+ }
+
+ if (flush_required) {
+ m_lock.Lock();
+ auto flush_comp = create_aio_flush_completion(nullptr);
+ m_lock.Unlock();
+
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+ }
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::AioFlushEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": AIO flush event" << dendl;
+
+ io::AioCompletion *aio_comp;
+ {
+ Mutex::Locker locker(m_lock);
+ aio_comp = create_aio_flush_completion(on_safe);
+ }
+
+ if (aio_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": AIO writesame event" << dendl;
+
+ bufferlist data = event.data;
+ bool flush_required;
+ auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ io::AIO_TYPE_WRITESAME,
+ &flush_required,
+ {});
+ if (aio_comp == nullptr) {
+ return;
+ }
+
+ if (!clipped_io(event.offset, aio_comp)) {
+ io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp,
+ {{event.offset, event.length}},
+ std::move(data), 0, {});
+ }
+
+ if (flush_required) {
+ m_lock.Lock();
+ auto flush_comp = create_aio_flush_completion(nullptr);
+ m_lock.Unlock();
+
+ if (flush_comp != nullptr) {
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+ }
+}
+
+ template <typename I>
+ void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;
+
+ bufferlist cmp_data = event.cmp_data;
+ bufferlist write_data = event.write_data;
+ bool flush_required;
+ auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
+ io::AIO_TYPE_COMPARE_AND_WRITE,
+ &flush_required,
+ {-EILSEQ});
+
+ if (!clipped_io(event.offset, aio_comp)) {
+ io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
+ {{event.offset, event.length}},
+ std::move(cmp_data),
+ std::move(write_data),
+ nullptr, 0, {});
+ }
+
+ if (flush_required) {
+ m_lock.Lock();
+ auto flush_comp = create_aio_flush_completion(nullptr);
+ m_lock.Unlock();
+
+ io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
+ io::FLUSH_SOURCE_INTERNAL, {});
+ }
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::OpFinishEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Op finish event: "
+ << "op_tid=" << event.op_tid << dendl;
+
+ bool op_in_progress;
+ bool filter_ret_val;
+ Context *on_op_complete = nullptr;
+ Context *on_op_finish_event = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ auto op_it = m_op_events.find(event.op_tid);
+ if (op_it == m_op_events.end()) {
+ ldout(cct, 10) << ": unable to locate associated op: assuming previously "
+ << "committed." << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, 0);
+ return;
+ }
+
+ OpEvent &op_event = op_it->second;
+ ceph_assert(op_event.on_finish_safe == nullptr);
+ op_event.on_finish_ready = on_ready;
+ op_event.on_finish_safe = on_safe;
+ op_in_progress = op_event.op_in_progress;
+ std::swap(on_op_complete, op_event.on_op_complete);
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+
+ // special errors which indicate op never started but was recorded
+ // as failed in the journal
+ filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
+ }
+
+ if (event.r < 0) {
+ if (op_in_progress) {
+ // bubble the error up to the in-progress op to cancel it
+ on_op_finish_event->complete(event.r);
+ } else {
+ // op hasn't been started -- bubble the error up since
+ // our image is now potentially in an inconsistent state
+ // since simple errors should have been caught before
+ // creating the op event
+ delete on_op_complete;
+ delete on_op_finish_event;
+ handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
+ }
+ return;
+ }
+
+ // journal recorded success -- apply the op now
+ on_op_finish_event->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap create event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EEXIST};
+
+ // avoid lock cycles
+ m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
+ on_op_complete)),
+ 0);
+
+ // do not process more events until the state machine is ready
+ // since it will affect IO
+ op_event->op_in_progress = true;
+ op_event->on_start_ready = on_ready;
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap remove event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
+ on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-ENOENT};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap rename event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
+ on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EEXIST};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap protect event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
+ on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EBUSY};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap unprotect event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
+ event,
+ on_op_complete));
+
+ // ignore errors recorded in the journal
+ op_event->op_finish_error_codes = {-EBUSY};
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EINVAL};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap rollback start event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
+ event,
+ on_op_complete));
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::RenameEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Rename event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
+ on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EEXIST};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::ResizeEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Resize start event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ // avoid lock cycles
+ m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
+ on_op_complete)), 0);
+
+ // do not process more events until the state machine is ready
+ // since it will affect IO
+ op_event->op_in_progress = true;
+ op_event->on_start_ready = on_ready;
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::FlattenEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Flatten start event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
+ on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-EINVAL};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Demote/Promote event" << dendl;
+ on_ready->complete(0);
+ on_safe->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Snap limit event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
+ event,
+ on_op_complete));
+
+ op_event->ignore_error_codes = {-ERANGE};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Update features event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ // avoid lock cycles
+ m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
+ m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
+ m_image_ctx, event, on_op_complete)), 0);
+
+ // do not process more events until the state machine is ready
+ // since it will affect IO
+ op_event->op_in_progress = true;
+ op_event->on_start_ready = on_ready;
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Metadata set event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
+ op_event->on_op_finish_event = util::create_async_context_callback(
+ m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
+ m_image_ctx, event, on_op_complete));
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": Metadata remove event" << dendl;
+
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
+ on_safe, &op_event);
+ if (on_op_complete == nullptr) {
+ return;
+ }
+
+ on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
+ op_event->on_op_finish_event = util::create_async_context_callback(
+ m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
+ m_image_ctx, event, on_op_complete));
+
+ // ignore errors caused due to replay
+ op_event->ignore_error_codes = {-ENOENT};
+
+ on_ready->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_event(const journal::UnknownEvent &event,
+ Context *on_ready, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": unknown event" << dendl;
+ on_ready->complete(0);
+ on_safe->complete(0);
+}
+
+template <typename I>
+void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
+ int r, std::set<int> &filters,
+ bool writeback_cache_enabled) {
+ Mutex::Locker locker(m_lock);
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": on_ready=" << on_ready << ", "
+ << "on_safe=" << on_safe << ", r=" << r << dendl;
+
+ if (on_ready != nullptr) {
+ on_ready->complete(0);
+ }
+
+ if (filters.find(r) != filters.end())
+ r = 0;
+
+ if (r < 0) {
+ lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
+ m_image_ctx.op_work_queue->queue(on_safe, r);
+ return;
+ }
+
+ if (writeback_cache_enabled) {
+ // will be completed after next flush operation completes
+ m_aio_modify_safe_contexts.insert(on_safe);
+ } else {
+ // IO is safely stored on disk
+ ceph_assert(m_in_flight_aio_modify > 0);
+ --m_in_flight_aio_modify;
+
+ if (m_on_aio_ready != nullptr) {
+ ldout(cct, 10) << ": resuming paused AIO" << dendl;
+ m_on_aio_ready->complete(0);
+ m_on_aio_ready = nullptr;
+ }
+
+ ldout(cct, 20) << ": completing safe context: " << on_safe << dendl;
+ m_image_ctx.op_work_queue->queue(on_safe, 0);
+ }
+}
+
+template <typename I>
+void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
+ Contexts &on_safe_ctxs, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
+ }
+
+ Context *on_aio_ready = nullptr;
+ Context *on_flush = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_in_flight_aio_flush > 0);
+ ceph_assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
+ --m_in_flight_aio_flush;
+ m_in_flight_aio_modify -= on_safe_ctxs.size();
+
+ std::swap(on_aio_ready, m_on_aio_ready);
+ if (m_in_flight_op_events == 0 &&
+ (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
+ on_flush = m_flush_ctx;
+ }
+
+ // strip out previously failed on_safe contexts
+ for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
+ if (m_aio_modify_safe_contexts.erase(*it)) {
+ ++it;
+ } else {
+ it = on_safe_ctxs.erase(it);
+ }
+ }
+ }
+
+ if (on_aio_ready != nullptr) {
+ ldout(cct, 10) << ": resuming paused AIO" << dendl;
+ on_aio_ready->complete(0);
+ }
+
+ if (on_flush_safe != nullptr) {
+ on_safe_ctxs.push_back(on_flush_safe);
+ }
+ for (auto ctx : on_safe_ctxs) {
+ ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
+ ctx->complete(r);
+ }
+
+ if (on_flush != nullptr) {
+ ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
+ on_flush->complete(r);
+ }
+}
+
+template <typename I>
+Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
+ Context *on_ready,
+ Context *on_safe,
+ OpEvent **op_event) {
+ CephContext *cct = m_image_ctx.cct;
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ return nullptr;
+ }
+
+ ceph_assert(m_lock.is_locked());
+ if (m_op_events.count(op_tid) != 0) {
+ lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;
+
+ // on_ready is already async but on failure invoke on_safe async
+ // as well
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
+ return nullptr;
+ }
+
+ ++m_in_flight_op_events;
+ *op_event = &m_op_events[op_tid];
+ (*op_event)->on_start_safe = on_safe;
+
+ Context *on_op_complete = new C_OpOnComplete(this, op_tid);
+ (*op_event)->on_op_complete = on_op_complete;
+ return on_op_complete;
+}
+
+template <typename I>
+void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << ": op_tid=" << op_tid << ", "
+ << "r=" << r << dendl;
+
+ OpEvent op_event;
+ bool shutting_down = false;
+ {
+ Mutex::Locker locker(m_lock);
+ auto op_it = m_op_events.find(op_tid);
+ ceph_assert(op_it != m_op_events.end());
+
+ op_event = std::move(op_it->second);
+ m_op_events.erase(op_it);
+
+ if (m_shut_down) {
+ ceph_assert(m_flush_ctx != nullptr);
+ shutting_down = true;
+ }
+ }
+
+ ceph_assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
+ if (op_event.on_start_ready != nullptr) {
+ // blocking op event failed before it became ready
+ ceph_assert(op_event.on_finish_ready == nullptr &&
+ op_event.on_finish_safe == nullptr);
+
+ op_event.on_start_ready->complete(0);
+ } else {
+ // event kicked off by OpFinishEvent
+ ceph_assert((op_event.on_finish_ready != nullptr &&
+ op_event.on_finish_safe != nullptr) || shutting_down);
+ }
+
+ if (op_event.on_op_finish_event != nullptr) {
+ op_event.on_op_finish_event->complete(r);
+ }
+
+ if (op_event.on_finish_ready != nullptr) {
+ op_event.on_finish_ready->complete(0);
+ }
+
+ // filter out errors caused by replay of the same op
+ if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
+ r = 0;
+ }
+
+ op_event.on_start_safe->complete(r);
+ if (op_event.on_finish_safe != nullptr) {
+ op_event.on_finish_safe->complete(r);
+ }
+
+ // shut down request might have occurred while lock was
+ // dropped -- handle if pending
+ Context *on_flush = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_in_flight_op_events > 0);
+ --m_in_flight_op_events;
+ if (m_in_flight_op_events == 0 &&
+ (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
+ on_flush = m_flush_ctx;
+ }
+ }
+ if (on_flush != nullptr) {
+ m_image_ctx.op_work_queue->queue(on_flush, 0);
+ }
+}
+
+template <typename I>
+io::AioCompletion *
+Replay<I>::create_aio_modify_completion(Context *on_ready,
+ Context *on_safe,
+ io::aio_type_t aio_type,
+ bool *flush_required,
+ std::set<int> &&filters) {
+ Mutex::Locker locker(m_lock);
+ CephContext *cct = m_image_ctx.cct;
+ ceph_assert(m_on_aio_ready == nullptr);
+
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ return nullptr;
+ }
+
+ ++m_in_flight_aio_modify;
+
+ bool writeback_cache_enabled = m_image_ctx.is_writeback_cache_enabled();
+ if (writeback_cache_enabled) {
+ m_aio_modify_unsafe_contexts.push_back(on_safe);
+ }
+
+ // FLUSH if we hit the low-water mark -- on_safe contexts are
+ // completed by flushes-only so that we don't move the journal
+ // commit position until safely on-disk
+
+ *flush_required = (writeback_cache_enabled &&
+ m_aio_modify_unsafe_contexts.size() ==
+ IN_FLIGHT_IO_LOW_WATER_MARK);
+ if (*flush_required) {
+ ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
+ << dendl;
+ }
+
+ // READY for more events if:
+ // * not at high-water mark for IO
+ // * in-flight ops are at a consistent point (snap create has IO flushed,
+ // shrink has adjusted clip boundary, etc) -- should have already been
+ // flagged not-ready
+ if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
+ ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
+ << dendl;
+ ceph_assert(m_on_aio_ready == nullptr);
+ std::swap(m_on_aio_ready, on_ready);
+ }
+
+ // when the modification is ACKed by librbd, we can process the next
+ // event. when flushed, the completion of the next flush will fire the
+ // on_safe callback
+ auto aio_comp = io::AioCompletion::create_and_start<Context>(
+ new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters),
+ writeback_cache_enabled),
+ util::get_image_ctx(&m_image_ctx), aio_type);
+ return aio_comp;
+}
+
+template <typename I>
+io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
+ ceph_assert(m_lock.is_locked());
+
+ CephContext *cct = m_image_ctx.cct;
+ if (m_shut_down) {
+ ldout(cct, 5) << ": ignoring event after shut down" << dendl;
+ if (on_safe != nullptr) {
+ m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
+ }
+ return nullptr;
+ }
+
+ ++m_in_flight_aio_flush;
+
+ // associate all prior write/discard ops to this flush request
+ auto aio_comp = io::AioCompletion::create_and_start<Context>(
+ new C_AioFlushComplete(this, on_safe,
+ std::move(m_aio_modify_unsafe_contexts)),
+ util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
+ m_aio_modify_unsafe_contexts.clear();
+ return aio_comp;
+}
+
+template <typename I>
+bool Replay<I>::clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp) {
+ CephContext *cct = m_image_ctx.cct;
+
+ m_image_ctx.snap_lock.get_read();
+ size_t image_size = m_image_ctx.size;
+ m_image_ctx.snap_lock.put_read();
+
+ if (image_offset >= image_size) {
+ // rbd-mirror image sync might race an IO event w/ associated resize between
+ // the point the peer is registered and the sync point is created, so no-op
+ // IO events beyond the current image extents since under normal conditions
+ // it wouldn't have been recorded in the journal
+ ldout(cct, 5) << ": no-op IO event beyond image size" << dendl;
+ aio_comp->get();
+ aio_comp->set_request_count(0);
+ aio_comp->put();
+ return true;
+ }
+
+ return false;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::Replay<librbd::ImageCtx>;
diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h
new file mode 100644
index 00000000..206d7db4
--- /dev/null
+++ b/src/librbd/journal/Replay.h
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_REPLAY_H
+#define CEPH_LIBRBD_JOURNAL_REPLAY_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+#include "librbd/io/Types.h"
+#include "librbd/journal/Types.h"
+#include <boost/variant.hpp>
+#include <list>
+#include <unordered_set>
+#include <unordered_map>
+
+namespace librbd {
+
+class ImageCtx;
+namespace io { struct AioCompletion; }
+
+namespace journal {
+
+template <typename ImageCtxT = ImageCtx>
+class Replay {
+public:
+ static Replay *create(ImageCtxT &image_ctx) {
+ return new Replay(image_ctx);
+ }
+
+ Replay(ImageCtxT &image_ctx);
+ ~Replay();
+
+ int decode(bufferlist::const_iterator *it, EventEntry *event_entry);
+ void process(const EventEntry &event_entry,
+ Context *on_ready, Context *on_safe);
+
+ void shut_down(bool cancel_ops, Context *on_finish);
+ void flush(Context *on_finish);
+
+ void replay_op_ready(uint64_t op_tid, Context *on_resume);
+
+private:
+ typedef std::unordered_set<int> ReturnValues;
+
+ struct OpEvent {
+ bool op_in_progress = false;
+ bool finish_on_ready = false;
+ Context *on_op_finish_event = nullptr;
+ Context *on_start_ready = nullptr;
+ Context *on_start_safe = nullptr;
+ Context *on_finish_ready = nullptr;
+ Context *on_finish_safe = nullptr;
+ Context *on_op_complete = nullptr;
+ ReturnValues op_finish_error_codes;
+ ReturnValues ignore_error_codes;
+ };
+
+ typedef std::list<uint64_t> OpTids;
+ typedef std::list<Context *> Contexts;
+ typedef std::unordered_set<Context *> ContextSet;
+ typedef std::unordered_map<uint64_t, OpEvent> OpEvents;
+
+ struct C_OpOnComplete : public Context {
+ Replay *replay;
+ uint64_t op_tid;
+ C_OpOnComplete(Replay *replay, uint64_t op_tid)
+ : replay(replay), op_tid(op_tid) {
+ }
+ void finish(int r) override {
+ replay->handle_op_complete(op_tid, r);
+ }
+ };
+
+ struct C_AioModifyComplete : public Context {
+ Replay *replay;
+ Context *on_ready;
+ Context *on_safe;
+ std::set<int> filters;
+ bool writeback_cache_enabled;
+ C_AioModifyComplete(Replay *replay, Context *on_ready,
+ Context *on_safe, std::set<int> &&filters,
+ bool writeback_cache_enabled)
+ : replay(replay), on_ready(on_ready), on_safe(on_safe),
+ filters(std::move(filters)),
+ writeback_cache_enabled(writeback_cache_enabled) {
+ }
+ void finish(int r) override {
+ replay->handle_aio_modify_complete(on_ready, on_safe, r, filters,
+ writeback_cache_enabled);
+ }
+ };
+
+ struct C_AioFlushComplete : public Context {
+ Replay *replay;
+ Context *on_flush_safe;
+ Contexts on_safe_ctxs;
+ C_AioFlushComplete(Replay *replay, Context *on_flush_safe,
+ Contexts &&on_safe_ctxs)
+ : replay(replay), on_flush_safe(on_flush_safe),
+ on_safe_ctxs(on_safe_ctxs) {
+ }
+ void finish(int r) override {
+ replay->handle_aio_flush_complete(on_flush_safe, on_safe_ctxs, r);
+ }
+ };
+
+ struct EventVisitor : public boost::static_visitor<void> {
+ Replay *replay;
+ Context *on_ready;
+ Context *on_safe;
+
+ EventVisitor(Replay *_replay, Context *_on_ready, Context *_on_safe)
+ : replay(_replay), on_ready(_on_ready), on_safe(_on_safe) {
+ }
+
+ template <typename Event>
+ inline void operator()(const Event &event) const {
+ replay->handle_event(event, on_ready, on_safe);
+ }
+ };
+
+ ImageCtxT &m_image_ctx;
+
+ Mutex m_lock;
+
+ uint64_t m_in_flight_aio_flush = 0;
+ uint64_t m_in_flight_aio_modify = 0;
+ Contexts m_aio_modify_unsafe_contexts;
+ ContextSet m_aio_modify_safe_contexts;
+
+ OpEvents m_op_events;
+ uint64_t m_in_flight_op_events = 0;
+
+ bool m_shut_down = false;
+ Context *m_flush_ctx = nullptr;
+ Context *m_on_aio_ready = nullptr;
+
+ void handle_event(const AioDiscardEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioWriteEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioWriteSameEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioCompareAndWriteEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const AioFlushEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const OpFinishEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapCreateEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRemoveEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRenameEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapProtectEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapUnprotectEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapRollbackEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const RenameEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const ResizeEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const FlattenEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const DemotePromoteEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const SnapLimitEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const UpdateFeaturesEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const MetadataSetEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const MetadataRemoveEvent &event, Context *on_ready,
+ Context *on_safe);
+ void handle_event(const UnknownEvent &event, Context *on_ready,
+ Context *on_safe);
+
+ void handle_aio_modify_complete(Context *on_ready, Context *on_safe,
+ int r, std::set<int> &filters,
+ bool writeback_cache_enabled);
+ void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
+ int r);
+
+ Context *create_op_context_callback(uint64_t op_tid, Context *on_ready,
+ Context *on_safe, OpEvent **op_event);
+ void handle_op_complete(uint64_t op_tid, int r);
+
+ io::AioCompletion *create_aio_modify_completion(Context *on_ready,
+ Context *on_safe,
+ io::aio_type_t aio_type,
+ bool *flush_required,
+ std::set<int> &&filters);
+ io::AioCompletion *create_aio_flush_completion(Context *on_safe);
+ void handle_aio_completion(io::AioCompletion *aio_comp);
+
+ bool clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::Replay<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_REPLAY_H
diff --git a/src/librbd/journal/ResetRequest.cc b/src/librbd/journal/ResetRequest.cc
new file mode 100644
index 00000000..9b67f3e4
--- /dev/null
+++ b/src/librbd/journal/ResetRequest.cc
@@ -0,0 +1,162 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/ResetRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "journal/Journaler.h"
+#include "journal/Settings.h"
+#include "include/ceph_assert.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/journal/CreateRequest.h"
+#include "librbd/journal/RemoveRequest.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::ResetRequest: " << this << " " \
+ << __func__ << ": "
+
+namespace librbd {
+namespace journal {
+
+using util::create_async_context_callback;
+using util::create_context_callback;
+
+template<typename I>
+void ResetRequest<I>::send() {
+ init_journaler();
+}
+
+template<typename I>
+void ResetRequest<I>::init_journaler() {
+ ldout(m_cct, 10) << dendl;
+
+ m_journaler = new Journaler(m_io_ctx, m_image_id, m_client_id, {});
+ Context *ctx = create_context_callback<
+ ResetRequest<I>, &ResetRequest<I>::handle_init_journaler>(this);
+ m_journaler->init(ctx);
+}
+
+template<typename I>
+void ResetRequest<I>::handle_init_journaler(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ if (r == -ENOENT) {
+ ldout(m_cct, 5) << "journal does not exist" << dendl;
+ m_ret_val = r;
+ } else if (r < 0) {
+ lderr(m_cct) << "failed to init journaler: " << cpp_strerror(r) << dendl;
+ m_ret_val = r;
+ } else {
+ int64_t pool_id;
+ m_journaler->get_metadata(&m_order, &m_splay_width, &pool_id);
+
+ if (pool_id != -1) {
+ librados::Rados rados(m_io_ctx);
+ r = rados.pool_reverse_lookup(pool_id, &m_object_pool_name);
+ if (r < 0) {
+ lderr(m_cct) << "failed to lookup data pool: " << cpp_strerror(r)
+ << dendl;
+ m_ret_val = r;
+ }
+ }
+ }
+
+ shut_down_journaler();
+}
+
+template<typename I>
+void ResetRequest<I>::shut_down_journaler() {
+ ldout(m_cct, 10) << dendl;
+
+ Context *ctx = create_async_context_callback(
+ m_op_work_queue, create_context_callback<
+ ResetRequest<I>, &ResetRequest<I>::handle_journaler_shutdown>(this));
+ m_journaler->shut_down(ctx);
+}
+
+template<typename I>
+void ResetRequest<I>::handle_journaler_shutdown(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ delete m_journaler;
+ if (r < 0) {
+ lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(r)
+ << dendl;
+ if (m_ret_val == 0) {
+ m_ret_val = r;
+ }
+ }
+
+ if (m_ret_val < 0) {
+ finish(m_ret_val);
+ return;
+ }
+
+ remove_journal();
+}
+
+template<typename I>
+void ResetRequest<I>::remove_journal() {
+ ldout(m_cct, 10) << dendl;
+
+ Context *ctx = create_context_callback<
+ ResetRequest<I>, &ResetRequest<I>::handle_remove_journal>(this);
+ auto req = RemoveRequest<I>::create(m_io_ctx, m_image_id, m_client_id,
+ m_op_work_queue, ctx);
+ req->send();
+}
+
+template<typename I>
+void ResetRequest<I>::handle_remove_journal(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(m_cct) << "failed to remove journal: " << cpp_strerror(r) << dendl;
+ finish(r);
+ return;
+ }
+
+ create_journal();
+}
+
+template<typename I>
+void ResetRequest<I>::create_journal() {
+ ldout(m_cct, 10) << dendl;
+
+ Context *ctx = create_context_callback<
+ ResetRequest<I>, &ResetRequest<I>::handle_create_journal>(this);
+ journal::TagData tag_data(m_mirror_uuid);
+ auto req = CreateRequest<I>::create(m_io_ctx, m_image_id, m_order,
+ m_splay_width, m_object_pool_name,
+ cls::journal::Tag::TAG_CLASS_NEW,
+ tag_data, m_client_id, m_op_work_queue,
+ ctx);
+ req->send();
+}
+
+template<typename I>
+void ResetRequest<I>::handle_create_journal(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl;
+ }
+ finish(r);
+}
+
+template<typename I>
+void ResetRequest<I>::finish(int r) {
+ ldout(m_cct, 10) << "r=" << r << dendl;
+
+ m_on_finish->complete(r);
+ delete this;
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::ResetRequest<librbd::ImageCtx>;
diff --git a/src/librbd/journal/ResetRequest.h b/src/librbd/journal/ResetRequest.h
new file mode 100644
index 00000000..9cc8e2e4
--- /dev/null
+++ b/src/librbd/journal/ResetRequest.h
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_RESET_REQUEST_H
+#define CEPH_LIBRBD_JOURNAL_RESET_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.hpp"
+#include "common/Mutex.h"
+#include "librbd/journal/TypeTraits.h"
+#include <string>
+
+class Context;
+class ContextWQ;
+class SafeTimer;
+
+namespace journal { class Journaler; }
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace journal {
+
+template<typename ImageCtxT = ImageCtx>
+class ResetRequest {
+public:
+ static ResetRequest *create(librados::IoCtx &io_ctx,
+ const std::string &image_id,
+ const std::string &client_id,
+ const std::string &mirror_uuid,
+ ContextWQ *op_work_queue, Context *on_finish) {
+ return new ResetRequest(io_ctx, image_id, client_id, mirror_uuid,
+ op_work_queue, on_finish);
+ }
+
+ ResetRequest(librados::IoCtx &io_ctx, const std::string &image_id,
+ const std::string &client_id, const std::string &mirror_uuid,
+ ContextWQ *op_work_queue, Context *on_finish)
+ : m_io_ctx(io_ctx), m_image_id(image_id), m_client_id(client_id),
+ m_mirror_uuid(mirror_uuid), m_op_work_queue(op_work_queue),
+ m_on_finish(on_finish),
+ m_cct(reinterpret_cast<CephContext *>(m_io_ctx.cct())) {
+ }
+
+ void send();
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * INIT_JOURNALER
+ * |
+ * v
+ * SHUT_DOWN_JOURNALER
+ * |
+ * v
+ * REMOVE_JOURNAL
+ * |
+ * v
+ * CREATE_JOURNAL
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+ typedef typename TypeTraits<ImageCtxT>::Journaler Journaler;
+
+ librados::IoCtx &m_io_ctx;
+ std::string m_image_id;
+ std::string m_client_id;
+ std::string m_mirror_uuid;
+ ContextWQ *m_op_work_queue;
+ Context *m_on_finish;
+
+ CephContext *m_cct;
+ Journaler *m_journaler = nullptr;
+ int m_ret_val = 0;
+
+ uint8_t m_order = 0;
+ uint8_t m_splay_width = 0;
+ std::string m_object_pool_name;
+
+ void init_journaler();
+ void handle_init_journaler(int r);
+
+ void shut_down_journaler();
+ void handle_journaler_shutdown(int r);
+
+ void remove_journal();
+ void handle_remove_journal(int r);
+
+ void create_journal();
+ void handle_create_journal(int r);
+
+ void finish(int r);
+
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::ResetRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H
diff --git a/src/librbd/journal/StandardPolicy.cc b/src/librbd/journal/StandardPolicy.cc
new file mode 100644
index 00000000..58631801
--- /dev/null
+++ b/src/librbd/journal/StandardPolicy.cc
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/StandardPolicy.h"
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::StandardPolicy: "
+
+namespace librbd {
+namespace journal {
+
+template<typename I>
+void StandardPolicy<I>::allocate_tag_on_lock(Context *on_finish) {
+ ceph_assert(m_image_ctx->journal != nullptr);
+
+ if (!m_image_ctx->journal->is_tag_owner()) {
+ lderr(m_image_ctx->cct) << "local image not promoted" << dendl;
+ m_image_ctx->op_work_queue->queue(on_finish, -EPERM);
+ return;
+ }
+
+ m_image_ctx->journal->allocate_local_tag(on_finish);
+}
+
+} // namespace journal
+} // namespace librbd
+
+template class librbd::journal::StandardPolicy<librbd::ImageCtx>;
diff --git a/src/librbd/journal/StandardPolicy.h b/src/librbd/journal/StandardPolicy.h
new file mode 100644
index 00000000..ec8d0148
--- /dev/null
+++ b/src/librbd/journal/StandardPolicy.h
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H
+#define CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H
+
+#include "librbd/journal/Policy.h"
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace journal {
+
+template<typename ImageCtxT = ImageCtx>
+class StandardPolicy : public Policy {
+public:
+ StandardPolicy(ImageCtxT *image_ctx) : m_image_ctx(image_ctx) {
+ }
+
+ bool append_disabled() const override {
+ return false;
+ }
+ bool journal_disabled() const override {
+ return false;
+ }
+ void allocate_tag_on_lock(Context *on_finish) override;
+
+private:
+ ImageCtxT *m_image_ctx;
+};
+
+} // namespace journal
+} // namespace librbd
+
+extern template class librbd::journal::StandardPolicy<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H
diff --git a/src/librbd/journal/TypeTraits.h b/src/librbd/journal/TypeTraits.h
new file mode 100644
index 00000000..d6dde690
--- /dev/null
+++ b/src/librbd/journal/TypeTraits.h
@@ -0,0 +1,26 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H
+#define CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H
+
+namespace journal {
+class Future;
+class Journaler;
+class ReplayEntry;
+}
+
+namespace librbd {
+namespace journal {
+
+template <typename ImageCtxT>
+struct TypeTraits {
+ typedef ::journal::Journaler Journaler;
+ typedef ::journal::Future Future;
+ typedef ::journal::ReplayEntry ReplayEntry;
+};
+
+} // namespace journal
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H
diff --git a/src/librbd/journal/Types.cc b/src/librbd/journal/Types.cc
new file mode 100644
index 00000000..ee919d35
--- /dev/null
+++ b/src/librbd/journal/Types.cc
@@ -0,0 +1,950 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/Types.h"
+#include "include/ceph_assert.h"
+#include "include/stringify.h"
+#include "include/types.h"
+#include "common/Formatter.h"
+
+namespace librbd {
+namespace journal {
+
+using ceph::encode;
+using ceph::decode;
+
+namespace {
+
+template <typename E>
+class GetTypeVisitor : public boost::static_visitor<E> {
+public:
+ template <typename T>
+ inline E operator()(const T&) const {
+ return T::TYPE;
+ }
+};
+
+class EncodeVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodeVisitor(bufferlist &bl) : m_bl(bl) {
+ }
+
+ template <typename T>
+ inline void operator()(const T& t) const {
+ encode(static_cast<uint32_t>(T::TYPE), m_bl);
+ t.encode(m_bl);
+ }
+private:
+ bufferlist &m_bl;
+};
+
+class DecodeVisitor : public boost::static_visitor<void> {
+public:
+ DecodeVisitor(__u8 version, bufferlist::const_iterator &iter)
+ : m_version(version), m_iter(iter) {
+ }
+
+ template <typename T>
+ inline void operator()(T& t) const {
+ t.decode(m_version, m_iter);
+ }
+private:
+ __u8 m_version;
+ bufferlist::const_iterator &m_iter;
+};
+
+class DumpVisitor : public boost::static_visitor<void> {
+public:
+ explicit DumpVisitor(Formatter *formatter, const std::string &key)
+ : m_formatter(formatter), m_key(key) {}
+
+ template <typename T>
+ inline void operator()(const T& t) const {
+ auto type = T::TYPE;
+ m_formatter->dump_string(m_key.c_str(), stringify(type));
+ t.dump(m_formatter);
+ }
+private:
+ ceph::Formatter *m_formatter;
+ std::string m_key;
+};
+
+} // anonymous namespace
+
+void AioDiscardEvent::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(offset, bl);
+ encode(length, bl);
+ bool skip_partial_discard = (discard_granularity_bytes > 0);
+ encode(skip_partial_discard, bl);
+ encode(discard_granularity_bytes, bl);
+}
+
+void AioDiscardEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(offset, it);
+ decode(length, it);
+
+ bool skip_partial_discard = false;
+ if (version >= 4) {
+ decode(skip_partial_discard, it);
+ }
+
+ if (version >= 5) {
+ decode(discard_granularity_bytes, it);
+ } else {
+ if (skip_partial_discard) {
+ // use a size larger than the maximum object size which will
+ // truncated down to object size during IO processing
+ discard_granularity_bytes = std::numeric_limits<uint32_t>::max();
+ } else {
+ discard_granularity_bytes = 0;
+ }
+ }
+}
+
+void AioDiscardEvent::dump(Formatter *f) const {
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+ f->dump_unsigned("discard_granularity_bytes", discard_granularity_bytes);
+}
+
+uint32_t AioWriteEvent::get_fixed_size() {
+ return EventEntry::get_fixed_size() + 16 /* offset, length */;
+}
+
+void AioWriteEvent::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(offset, bl);
+ encode(length, bl);
+ encode(data, bl);
+}
+
+void AioWriteEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(offset, it);
+ decode(length, it);
+ decode(data, it);
+}
+
+void AioWriteEvent::dump(Formatter *f) const {
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+}
+
+void AioWriteSameEvent::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(offset, bl);
+ encode(length, bl);
+ encode(data, bl);
+}
+
+void AioWriteSameEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(offset, it);
+ decode(length, it);
+ decode(data, it);
+}
+
+void AioWriteSameEvent::dump(Formatter *f) const {
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+}
+
+uint32_t AioCompareAndWriteEvent::get_fixed_size() {
+ return EventEntry::get_fixed_size() + 32 /* offset, length */;
+}
+
+void AioCompareAndWriteEvent::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(offset, bl);
+ encode(length, bl);
+ encode(cmp_data, bl);
+ encode(write_data, bl);
+}
+
+void AioCompareAndWriteEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(offset, it);
+ decode(length, it);
+ decode(cmp_data, it);
+ decode(write_data, it);
+}
+
+void AioCompareAndWriteEvent::dump(Formatter *f) const {
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+}
+
+void AioFlushEvent::encode(bufferlist& bl) const {
+}
+
+void AioFlushEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+}
+
+void AioFlushEvent::dump(Formatter *f) const {
+}
+
+void OpEventBase::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(op_tid, bl);
+}
+
+void OpEventBase::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(op_tid, it);
+}
+
+void OpEventBase::dump(Formatter *f) const {
+ f->dump_unsigned("op_tid", op_tid);
+}
+
+void OpFinishEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(op_tid, bl);
+ encode(r, bl);
+}
+
+void OpFinishEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(op_tid, it);
+ decode(r, it);
+}
+
+void OpFinishEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_unsigned("op_tid", op_tid);
+ f->dump_int("result", r);
+}
+
+void SnapEventBase::encode(bufferlist& bl) const {
+ using ceph::encode;
+ OpEventBase::encode(bl);
+ encode(snap_name, bl);
+ encode(snap_namespace, bl);
+}
+
+void SnapEventBase::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(snap_name, it);
+ if (version >= 4) {
+ decode(snap_namespace, it);
+ }
+}
+
+void SnapEventBase::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_string("snap_name", snap_name);
+ snap_namespace.dump(f);
+}
+
+void SnapCreateEvent::encode(bufferlist &bl) const {
+ SnapEventBase::encode(bl);
+}
+
+void SnapCreateEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ SnapEventBase::decode(version, it);
+ if (version == 3) {
+ decode(snap_namespace, it);
+ }
+}
+
+void SnapCreateEvent::dump(Formatter *f) const {
+ SnapEventBase::dump(f);
+}
+
+void SnapLimitEvent::encode(bufferlist &bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(limit, bl);
+}
+
+void SnapLimitEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(limit, it);
+}
+
+void SnapLimitEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_unsigned("limit", limit);
+}
+
+void SnapRenameEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(dst_snap_name, bl);
+ encode(snap_id, bl);
+ encode(src_snap_name, bl);
+}
+
+void SnapRenameEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ OpEventBase::decode(version, it);
+ decode(dst_snap_name, it);
+ decode(snap_id, it);
+ if (version >= 2) {
+ decode(src_snap_name, it);
+ }
+}
+
+void SnapRenameEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_unsigned("src_snap_id", snap_id);
+ f->dump_string("src_snap_name", src_snap_name);
+ f->dump_string("dest_snap_name", dst_snap_name);
+}
+
+void RenameEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(image_name, bl);
+}
+
+void RenameEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(image_name, it);
+}
+
+void RenameEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_string("image_name", image_name);
+}
+
+void ResizeEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(size, bl);
+}
+
+void ResizeEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(size, it);
+}
+
+void ResizeEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_unsigned("size", size);
+}
+
+void DemotePromoteEvent::encode(bufferlist& bl) const {
+}
+
+void DemotePromoteEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+}
+
+void DemotePromoteEvent::dump(Formatter *f) const {
+}
+
+void UpdateFeaturesEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(features, bl);
+ encode(enabled, bl);
+}
+
+void UpdateFeaturesEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(features, it);
+ decode(enabled, it);
+}
+
+void UpdateFeaturesEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_unsigned("features", features);
+ f->dump_bool("enabled", enabled);
+}
+
+void MetadataSetEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(key, bl);
+ encode(value, bl);
+}
+
+void MetadataSetEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(key, it);
+ decode(value, it);
+}
+
+void MetadataSetEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_string("key", key);
+ f->dump_string("value", value);
+}
+
+void MetadataRemoveEvent::encode(bufferlist& bl) const {
+ OpEventBase::encode(bl);
+ using ceph::encode;
+ encode(key, bl);
+}
+
+void MetadataRemoveEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+ OpEventBase::decode(version, it);
+ using ceph::decode;
+ decode(key, it);
+}
+
+void MetadataRemoveEvent::dump(Formatter *f) const {
+ OpEventBase::dump(f);
+ f->dump_string("key", key);
+}
+
+void UnknownEvent::encode(bufferlist& bl) const {
+ ceph_abort();
+}
+
+void UnknownEvent::decode(__u8 version, bufferlist::const_iterator& it) {
+}
+
+void UnknownEvent::dump(Formatter *f) const {
+}
+
+EventType EventEntry::get_event_type() const {
+ return boost::apply_visitor(GetTypeVisitor<EventType>(), event);
+}
+
+void EventEntry::encode(bufferlist& bl) const {
+ ENCODE_START(5, 1, bl);
+ boost::apply_visitor(EncodeVisitor(bl), event);
+ ENCODE_FINISH(bl);
+ encode_metadata(bl);
+}
+
+void EventEntry::decode(bufferlist::const_iterator& it) {
+ DECODE_START(1, it);
+
+ uint32_t event_type;
+ decode(event_type, it);
+
+ // select the correct payload variant based upon the encoded op
+ switch (event_type) {
+ case EVENT_TYPE_AIO_DISCARD:
+ event = AioDiscardEvent();
+ break;
+ case EVENT_TYPE_AIO_WRITE:
+ event = AioWriteEvent();
+ break;
+ case EVENT_TYPE_AIO_FLUSH:
+ event = AioFlushEvent();
+ break;
+ case EVENT_TYPE_OP_FINISH:
+ event = OpFinishEvent();
+ break;
+ case EVENT_TYPE_SNAP_CREATE:
+ event = SnapCreateEvent();
+ break;
+ case EVENT_TYPE_SNAP_REMOVE:
+ event = SnapRemoveEvent();
+ break;
+ case EVENT_TYPE_SNAP_RENAME:
+ event = SnapRenameEvent();
+ break;
+ case EVENT_TYPE_SNAP_PROTECT:
+ event = SnapProtectEvent();
+ break;
+ case EVENT_TYPE_SNAP_UNPROTECT:
+ event = SnapUnprotectEvent();
+ break;
+ case EVENT_TYPE_SNAP_ROLLBACK:
+ event = SnapRollbackEvent();
+ break;
+ case EVENT_TYPE_RENAME:
+ event = RenameEvent();
+ break;
+ case EVENT_TYPE_RESIZE:
+ event = ResizeEvent();
+ break;
+ case EVENT_TYPE_FLATTEN:
+ event = FlattenEvent();
+ break;
+ case EVENT_TYPE_DEMOTE_PROMOTE:
+ event = DemotePromoteEvent();
+ break;
+ case EVENT_TYPE_UPDATE_FEATURES:
+ event = UpdateFeaturesEvent();
+ break;
+ case EVENT_TYPE_METADATA_SET:
+ event = MetadataSetEvent();
+ break;
+ case EVENT_TYPE_METADATA_REMOVE:
+ event = MetadataRemoveEvent();
+ break;
+ case EVENT_TYPE_AIO_WRITESAME:
+ event = AioWriteSameEvent();
+ break;
+ case EVENT_TYPE_AIO_COMPARE_AND_WRITE:
+ event = AioCompareAndWriteEvent();
+ break;
+ default:
+ event = UnknownEvent();
+ break;
+ }
+
+ boost::apply_visitor(DecodeVisitor(struct_v, it), event);
+ DECODE_FINISH(it);
+ if (struct_v >= 4) {
+ decode_metadata(it);
+ }
+}
+
+void EventEntry::dump(Formatter *f) const {
+ boost::apply_visitor(DumpVisitor(f, "event_type"), event);
+ f->dump_stream("timestamp") << timestamp;
+}
+
+void EventEntry::encode_metadata(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(timestamp, bl);
+ ENCODE_FINISH(bl);
+}
+
+void EventEntry::decode_metadata(bufferlist::const_iterator& it) {
+ DECODE_START(1, it);
+ decode(timestamp, it);
+ DECODE_FINISH(it);
+}
+
+void EventEntry::generate_test_instances(std::list<EventEntry *> &o) {
+ o.push_back(new EventEntry(AioDiscardEvent()));
+ o.push_back(new EventEntry(AioDiscardEvent(123, 345, 4096), utime_t(1, 1)));
+
+ bufferlist bl;
+ bl.append(std::string(32, '1'));
+ o.push_back(new EventEntry(AioWriteEvent()));
+ o.push_back(new EventEntry(AioWriteEvent(123, 456, bl), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(AioFlushEvent()));
+
+ o.push_back(new EventEntry(OpFinishEvent(123, -1), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapCreateEvent(), utime_t(1, 1)));
+ o.push_back(new EventEntry(SnapCreateEvent(234, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapRemoveEvent()));
+ o.push_back(new EventEntry(SnapRemoveEvent(345, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapRenameEvent()));
+ o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap"),
+ utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapProtectEvent()));
+ o.push_back(new EventEntry(SnapProtectEvent(567, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapUnprotectEvent()));
+ o.push_back(new EventEntry(SnapUnprotectEvent(678, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(SnapRollbackEvent()));
+ o.push_back(new EventEntry(SnapRollbackEvent(789, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(RenameEvent()));
+ o.push_back(new EventEntry(RenameEvent(890, "image name"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(ResizeEvent()));
+ o.push_back(new EventEntry(ResizeEvent(901, 1234), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(FlattenEvent(123), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(DemotePromoteEvent()));
+
+ o.push_back(new EventEntry(UpdateFeaturesEvent()));
+ o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(MetadataSetEvent()));
+ o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value"), utime_t(1, 1)));
+
+ o.push_back(new EventEntry(MetadataRemoveEvent()));
+ o.push_back(new EventEntry(MetadataRemoveEvent(123, "key"), utime_t(1, 1)));
+}
+
+// Journal Client
+
+void ImageClientMeta::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(tag_class, bl);
+ encode(resync_requested, bl);
+}
+
+void ImageClientMeta::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(tag_class, it);
+ decode(resync_requested, it);
+}
+
+void ImageClientMeta::dump(Formatter *f) const {
+ f->dump_unsigned("tag_class", tag_class);
+ f->dump_bool("resync_requested", resync_requested);
+}
+
+void MirrorPeerSyncPoint::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(snap_name, bl);
+ encode(from_snap_name, bl);
+ encode(object_number, bl);
+ encode(snap_namespace, bl);
+}
+
+void MirrorPeerSyncPoint::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(snap_name, it);
+ decode(from_snap_name, it);
+ decode(object_number, it);
+ if (version >= 2) {
+ decode(snap_namespace, it);
+ }
+}
+
+void MirrorPeerSyncPoint::dump(Formatter *f) const {
+ f->dump_string("snap_name", snap_name);
+ f->dump_string("from_snap_name", from_snap_name);
+ if (object_number) {
+ f->dump_unsigned("object_number", *object_number);
+ }
+ snap_namespace.dump(f);
+}
+
+void MirrorPeerClientMeta::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(image_id, bl);
+ encode(static_cast<uint32_t>(state), bl);
+ encode(sync_object_count, bl);
+ encode(static_cast<uint32_t>(sync_points.size()), bl);
+ for (auto &sync_point : sync_points) {
+ sync_point.encode(bl);
+ }
+ encode(snap_seqs, bl);
+}
+
+void MirrorPeerClientMeta::decode(__u8 version, bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(image_id, it);
+
+ uint32_t decode_state;
+ decode(decode_state, it);
+ state = static_cast<MirrorPeerState>(decode_state);
+
+ decode(sync_object_count, it);
+
+ uint32_t sync_point_count;
+ decode(sync_point_count, it);
+ sync_points.resize(sync_point_count);
+ for (auto &sync_point : sync_points) {
+ sync_point.decode(version, it);
+ }
+
+ decode(snap_seqs, it);
+}
+
+void MirrorPeerClientMeta::dump(Formatter *f) const {
+ f->dump_string("image_id", image_id);
+ f->dump_stream("state") << state;
+ f->dump_unsigned("sync_object_count", sync_object_count);
+ f->open_array_section("sync_points");
+ for (auto &sync_point : sync_points) {
+ f->open_object_section("sync_point");
+ sync_point.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ f->open_array_section("snap_seqs");
+ for (auto &pair : snap_seqs) {
+ f->open_object_section("snap_seq");
+ f->dump_unsigned("local_snap_seq", pair.first);
+ f->dump_unsigned("peer_snap_seq", pair.second);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void CliClientMeta::encode(bufferlist& bl) const {
+}
+
+void CliClientMeta::decode(__u8 version, bufferlist::const_iterator& it) {
+}
+
+void CliClientMeta::dump(Formatter *f) const {
+}
+
+void UnknownClientMeta::encode(bufferlist& bl) const {
+ ceph_abort();
+}
+
+void UnknownClientMeta::decode(__u8 version, bufferlist::const_iterator& it) {
+}
+
+void UnknownClientMeta::dump(Formatter *f) const {
+}
+
+ClientMetaType ClientData::get_client_meta_type() const {
+ return boost::apply_visitor(GetTypeVisitor<ClientMetaType>(), client_meta);
+}
+
+void ClientData::encode(bufferlist& bl) const {
+ ENCODE_START(2, 1, bl);
+ boost::apply_visitor(EncodeVisitor(bl), client_meta);
+ ENCODE_FINISH(bl);
+}
+
+void ClientData::decode(bufferlist::const_iterator& it) {
+ DECODE_START(1, it);
+
+ uint32_t client_meta_type;
+ decode(client_meta_type, it);
+
+ // select the correct payload variant based upon the encoded op
+ switch (client_meta_type) {
+ case IMAGE_CLIENT_META_TYPE:
+ client_meta = ImageClientMeta();
+ break;
+ case MIRROR_PEER_CLIENT_META_TYPE:
+ client_meta = MirrorPeerClientMeta();
+ break;
+ case CLI_CLIENT_META_TYPE:
+ client_meta = CliClientMeta();
+ break;
+ default:
+ client_meta = UnknownClientMeta();
+ break;
+ }
+
+ boost::apply_visitor(DecodeVisitor(struct_v, it), client_meta);
+ DECODE_FINISH(it);
+}
+
+void ClientData::dump(Formatter *f) const {
+ boost::apply_visitor(DumpVisitor(f, "client_meta_type"), client_meta);
+}
+
+void ClientData::generate_test_instances(std::list<ClientData *> &o) {
+ o.push_back(new ClientData(ImageClientMeta()));
+ o.push_back(new ClientData(ImageClientMeta(123)));
+ o.push_back(new ClientData(MirrorPeerClientMeta()));
+ o.push_back(new ClientData(MirrorPeerClientMeta("image_id",
+ {{{}, "snap 2", "snap 1", 123}},
+ {{1, 2}, {3, 4}})));
+ o.push_back(new ClientData(CliClientMeta()));
+}
+
+// Journal Tag
+
+void TagPredecessor::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(mirror_uuid, bl);
+ encode(commit_valid, bl);
+ encode(tag_tid, bl);
+ encode(entry_tid, bl);
+}
+
+void TagPredecessor::decode(bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(mirror_uuid, it);
+ decode(commit_valid, it);
+ decode(tag_tid, it);
+ decode(entry_tid, it);
+}
+
+void TagPredecessor::dump(Formatter *f) const {
+ f->dump_string("mirror_uuid", mirror_uuid);
+ f->dump_string("commit_valid", commit_valid ? "true" : "false");
+ f->dump_unsigned("tag_tid", tag_tid);
+ f->dump_unsigned("entry_tid", entry_tid);
+}
+
+void TagData::encode(bufferlist& bl) const {
+ using ceph::encode;
+ encode(mirror_uuid, bl);
+ predecessor.encode(bl);
+}
+
+void TagData::decode(bufferlist::const_iterator& it) {
+ using ceph::decode;
+ decode(mirror_uuid, it);
+ predecessor.decode(it);
+}
+
+void TagData::dump(Formatter *f) const {
+ f->dump_string("mirror_uuid", mirror_uuid);
+ f->open_object_section("predecessor");
+ predecessor.dump(f);
+ f->close_section();
+}
+
+void TagData::generate_test_instances(std::list<TagData *> &o) {
+ o.push_back(new TagData());
+ o.push_back(new TagData("mirror-uuid"));
+ o.push_back(new TagData("mirror-uuid", "remote-mirror-uuid", true, 123, 234));
+}
+
+std::ostream &operator<<(std::ostream &out, const EventType &type) {
+ using namespace librbd::journal;
+
+ switch (type) {
+ case EVENT_TYPE_AIO_DISCARD:
+ out << "AioDiscard";
+ break;
+ case EVENT_TYPE_AIO_WRITE:
+ out << "AioWrite";
+ break;
+ case EVENT_TYPE_AIO_FLUSH:
+ out << "AioFlush";
+ break;
+ case EVENT_TYPE_OP_FINISH:
+ out << "OpFinish";
+ break;
+ case EVENT_TYPE_SNAP_CREATE:
+ out << "SnapCreate";
+ break;
+ case EVENT_TYPE_SNAP_REMOVE:
+ out << "SnapRemove";
+ break;
+ case EVENT_TYPE_SNAP_RENAME:
+ out << "SnapRename";
+ break;
+ case EVENT_TYPE_SNAP_PROTECT:
+ out << "SnapProtect";
+ break;
+ case EVENT_TYPE_SNAP_UNPROTECT:
+ out << "SnapUnprotect";
+ break;
+ case EVENT_TYPE_SNAP_ROLLBACK:
+ out << "SnapRollback";
+ break;
+ case EVENT_TYPE_RENAME:
+ out << "Rename";
+ break;
+ case EVENT_TYPE_RESIZE:
+ out << "Resize";
+ break;
+ case EVENT_TYPE_FLATTEN:
+ out << "Flatten";
+ break;
+ case EVENT_TYPE_DEMOTE_PROMOTE:
+ out << "Demote/Promote";
+ break;
+ case EVENT_TYPE_UPDATE_FEATURES:
+ out << "UpdateFeatures";
+ break;
+ case EVENT_TYPE_METADATA_SET:
+ out << "MetadataSet";
+ break;
+ case EVENT_TYPE_METADATA_REMOVE:
+ out << "MetadataRemove";
+ break;
+ case EVENT_TYPE_AIO_WRITESAME:
+ out << "AioWriteSame";
+ break;
+ case EVENT_TYPE_AIO_COMPARE_AND_WRITE:
+ out << "AioCompareAndWrite";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(type) << ")";
+ break;
+ }
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const ClientMetaType &type) {
+ using namespace librbd::journal;
+
+ switch (type) {
+ case IMAGE_CLIENT_META_TYPE:
+ out << "Master Image";
+ break;
+ case MIRROR_PEER_CLIENT_META_TYPE:
+ out << "Mirror Peer";
+ break;
+ case CLI_CLIENT_META_TYPE:
+ out << "CLI Tool";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(type) << ")";
+ break;
+ }
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta) {
+ out << "[tag_class=" << meta.tag_class << "]";
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const MirrorPeerSyncPoint &sync) {
+ out << "[snap_name=" << sync.snap_name << ", "
+ << "from_snap_name=" << sync.from_snap_name;
+ if (sync.object_number) {
+ out << ", " << *sync.object_number;
+ }
+ out << "]";
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const MirrorPeerState &state) {
+ switch (state) {
+ case MIRROR_PEER_STATE_SYNCING:
+ out << "Syncing";
+ break;
+ case MIRROR_PEER_STATE_REPLAYING:
+ out << "Replaying";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(state) << ")";
+ break;
+ }
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta) {
+ out << "[image_id=" << meta.image_id << ", "
+ << "state=" << meta.state << ", "
+ << "sync_object_count=" << meta.sync_object_count << ", "
+ << "sync_points=[";
+ std::string delimiter;
+ for (auto &sync_point : meta.sync_points) {
+ out << delimiter << "[" << sync_point << "]";
+ delimiter = ", ";
+ }
+ out << "], snap_seqs=[";
+ delimiter = "";
+ for (auto &pair : meta.snap_seqs) {
+ out << delimiter << "["
+ << "local_snap_seq=" << pair.first << ", "
+ << "peer_snap_seq" << pair.second << "]";
+ delimiter = ", ";
+ }
+ out << "]";
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor) {
+ out << "["
+ << "mirror_uuid=" << predecessor.mirror_uuid;
+ if (predecessor.commit_valid) {
+ out << ", "
+ << "tag_tid=" << predecessor.tag_tid << ", "
+ << "entry_tid=" << predecessor.entry_tid;
+ }
+ out << "]";
+ return out;
+}
+
+std::ostream &operator<<(std::ostream &out, const TagData &tag_data) {
+ out << "["
+ << "mirror_uuid=" << tag_data.mirror_uuid << ", "
+ << "predecessor=" << tag_data.predecessor
+ << "]";
+ return out;
+}
+
+} // namespace journal
+} // namespace librbd
+
diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h
new file mode 100644
index 00000000..ae5681ad
--- /dev/null
+++ b/src/librbd/journal/Types.h
@@ -0,0 +1,685 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_TYPES_H
+#define CEPH_LIBRBD_JOURNAL_TYPES_H
+
+#include "cls/rbd/cls_rbd_types.h"
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "include/types.h"
+#include "include/utime.h"
+#include "librbd/Types.h"
+#include <iosfwd>
+#include <list>
+#include <boost/none.hpp>
+#include <boost/optional.hpp>
+#include <boost/variant.hpp>
+#include <boost/mpl/vector.hpp>
+
+namespace ceph {
+class Formatter;
+}
+
+namespace librbd {
+namespace journal {
+
+enum EventType {
+ EVENT_TYPE_AIO_DISCARD = 0,
+ EVENT_TYPE_AIO_WRITE = 1,
+ EVENT_TYPE_AIO_FLUSH = 2,
+ EVENT_TYPE_OP_FINISH = 3,
+ EVENT_TYPE_SNAP_CREATE = 4,
+ EVENT_TYPE_SNAP_REMOVE = 5,
+ EVENT_TYPE_SNAP_RENAME = 6,
+ EVENT_TYPE_SNAP_PROTECT = 7,
+ EVENT_TYPE_SNAP_UNPROTECT = 8,
+ EVENT_TYPE_SNAP_ROLLBACK = 9,
+ EVENT_TYPE_RENAME = 10,
+ EVENT_TYPE_RESIZE = 11,
+ EVENT_TYPE_FLATTEN = 12,
+ EVENT_TYPE_DEMOTE_PROMOTE = 13,
+ EVENT_TYPE_SNAP_LIMIT = 14,
+ EVENT_TYPE_UPDATE_FEATURES = 15,
+ EVENT_TYPE_METADATA_SET = 16,
+ EVENT_TYPE_METADATA_REMOVE = 17,
+ EVENT_TYPE_AIO_WRITESAME = 18,
+ EVENT_TYPE_AIO_COMPARE_AND_WRITE = 19,
+};
+
+struct AioDiscardEvent {
+ static const EventType TYPE = EVENT_TYPE_AIO_DISCARD;
+
+ uint64_t offset = 0;
+ uint64_t length = 0;
+ uint32_t discard_granularity_bytes = 0;
+
+ AioDiscardEvent() {
+ }
+ AioDiscardEvent(uint64_t _offset, uint64_t _length,
+ uint32_t discard_granularity_bytes)
+ : offset(_offset), length(_length),
+ discard_granularity_bytes(discard_granularity_bytes) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct AioWriteEvent {
+ static const EventType TYPE = EVENT_TYPE_AIO_WRITE;
+
+ uint64_t offset;
+ uint64_t length;
+ bufferlist data;
+
+ static uint32_t get_fixed_size();
+
+ AioWriteEvent() : offset(0), length(0) {
+ }
+ AioWriteEvent(uint64_t _offset, uint64_t _length, const bufferlist &_data)
+ : offset(_offset), length(_length), data(_data) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct AioWriteSameEvent {
+ static const EventType TYPE = EVENT_TYPE_AIO_WRITESAME;
+
+ uint64_t offset;
+ uint64_t length;
+ bufferlist data;
+
+ AioWriteSameEvent() : offset(0), length(0) {
+ }
+ AioWriteSameEvent(uint64_t _offset, uint64_t _length,
+ const bufferlist &_data)
+ : offset(_offset), length(_length), data(_data) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct AioCompareAndWriteEvent {
+ static const EventType TYPE = EVENT_TYPE_AIO_COMPARE_AND_WRITE;
+
+ uint64_t offset;
+ uint64_t length;
+ bufferlist cmp_data;
+ bufferlist write_data;
+
+ static uint32_t get_fixed_size();
+
+ AioCompareAndWriteEvent() : offset(0), length(0) {
+ }
+ AioCompareAndWriteEvent(uint64_t _offset, uint64_t _length,
+ const bufferlist &_cmp_data, const bufferlist &_write_data)
+ : offset(_offset), length(_length), cmp_data(_cmp_data), write_data(_write_data) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct AioFlushEvent {
+ static const EventType TYPE = EVENT_TYPE_AIO_FLUSH;
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct OpEventBase {
+ uint64_t op_tid;
+
+protected:
+ OpEventBase() : op_tid(0) {
+ }
+ OpEventBase(uint64_t op_tid) : op_tid(op_tid) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct OpFinishEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_OP_FINISH;
+
+ int r;
+
+ OpFinishEvent() : r(0) {
+ }
+ OpFinishEvent(uint64_t op_tid, int r) : OpEventBase(op_tid), r(r) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct SnapEventBase : public OpEventBase {
+ cls::rbd::SnapshotNamespace snap_namespace;
+ std::string snap_name;
+
+protected:
+ SnapEventBase() {
+ }
+ SnapEventBase(uint64_t op_tid, const cls::rbd::SnapshotNamespace& _snap_namespace,
+ const std::string &_snap_name)
+ : OpEventBase(op_tid),
+ snap_namespace(_snap_namespace),
+ snap_name(_snap_name) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct SnapCreateEvent : public SnapEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_CREATE;
+
+ SnapCreateEvent() {
+ }
+ SnapCreateEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name)
+ : SnapEventBase(op_tid, snap_namespace, snap_name) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct SnapRemoveEvent : public SnapEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_REMOVE;
+
+ SnapRemoveEvent() {
+ }
+ SnapRemoveEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name)
+ : SnapEventBase(op_tid, snap_namespace, snap_name) {
+ }
+
+ using SnapEventBase::encode;
+ using SnapEventBase::decode;
+ using SnapEventBase::dump;
+};
+
+struct SnapRenameEvent : public OpEventBase{
+ static const EventType TYPE = EVENT_TYPE_SNAP_RENAME;
+
+ uint64_t snap_id;
+ std::string src_snap_name;
+ std::string dst_snap_name;
+
+ SnapRenameEvent() : snap_id(CEPH_NOSNAP) {
+ }
+ SnapRenameEvent(uint64_t op_tid, uint64_t src_snap_id,
+ const std::string &src_snap_name,
+ const std::string &dest_snap_name)
+ : OpEventBase(op_tid),
+ snap_id(src_snap_id),
+ src_snap_name(src_snap_name),
+ dst_snap_name(dest_snap_name) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct SnapProtectEvent : public SnapEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_PROTECT;
+
+ SnapProtectEvent() {
+ }
+ SnapProtectEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name)
+ : SnapEventBase(op_tid, snap_namespace, snap_name) {
+ }
+
+ using SnapEventBase::encode;
+ using SnapEventBase::decode;
+ using SnapEventBase::dump;
+};
+
+struct SnapUnprotectEvent : public SnapEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_UNPROTECT;
+
+ SnapUnprotectEvent() {
+ }
+ SnapUnprotectEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace &snap_namespace,
+ const std::string &snap_name)
+ : SnapEventBase(op_tid, snap_namespace, snap_name) {
+ }
+
+ using SnapEventBase::encode;
+ using SnapEventBase::decode;
+ using SnapEventBase::dump;
+};
+
+struct SnapLimitEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_LIMIT;
+ uint64_t limit;
+
+ SnapLimitEvent() {
+ }
+ SnapLimitEvent(uint64_t op_tid, const uint64_t _limit)
+ : OpEventBase(op_tid), limit(_limit) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct SnapRollbackEvent : public SnapEventBase {
+ static const EventType TYPE = EVENT_TYPE_SNAP_ROLLBACK;
+
+ SnapRollbackEvent() {
+ }
+ SnapRollbackEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name)
+ : SnapEventBase(op_tid, snap_namespace, snap_name) {
+ }
+
+ using SnapEventBase::encode;
+ using SnapEventBase::decode;
+ using SnapEventBase::dump;
+};
+
+struct RenameEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_RENAME;
+
+ std::string image_name;
+
+ RenameEvent() {
+ }
+ RenameEvent(uint64_t op_tid, const std::string &_image_name)
+ : OpEventBase(op_tid), image_name(_image_name) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct ResizeEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_RESIZE;
+
+ uint64_t size;
+
+ ResizeEvent() : size(0) {
+ }
+ ResizeEvent(uint64_t op_tid, uint64_t _size)
+ : OpEventBase(op_tid), size(_size) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct FlattenEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_FLATTEN;
+
+ FlattenEvent() {
+ }
+ FlattenEvent(uint64_t op_tid) : OpEventBase(op_tid) {
+ }
+
+ using OpEventBase::encode;
+ using OpEventBase::decode;
+ using OpEventBase::dump;
+};
+
+struct DemotePromoteEvent {
+ static const EventType TYPE = static_cast<EventType>(
+ EVENT_TYPE_DEMOTE_PROMOTE);
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct UpdateFeaturesEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_UPDATE_FEATURES;
+
+ uint64_t features;
+ bool enabled;
+
+ UpdateFeaturesEvent() : features(0), enabled(false) {
+ }
+ UpdateFeaturesEvent(uint64_t op_tid, uint64_t _features, bool _enabled)
+ : OpEventBase(op_tid), features(_features), enabled(_enabled) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct MetadataSetEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_METADATA_SET;
+
+ string key;
+ string value;
+
+ MetadataSetEvent() {
+ }
+ MetadataSetEvent(uint64_t op_tid, const string &_key, const string &_value)
+ : OpEventBase(op_tid), key(_key), value(_value) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct MetadataRemoveEvent : public OpEventBase {
+ static const EventType TYPE = EVENT_TYPE_METADATA_REMOVE;
+
+ string key;
+
+ MetadataRemoveEvent() {
+ }
+ MetadataRemoveEvent(uint64_t op_tid, const string &_key)
+ : OpEventBase(op_tid), key(_key) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct UnknownEvent {
+ static const EventType TYPE = static_cast<EventType>(-1);
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+typedef boost::mpl::vector<AioDiscardEvent,
+ AioWriteEvent,
+ AioFlushEvent,
+ OpFinishEvent,
+ SnapCreateEvent,
+ SnapRemoveEvent,
+ SnapRenameEvent,
+ SnapProtectEvent,
+ SnapUnprotectEvent,
+ SnapRollbackEvent,
+ RenameEvent,
+ ResizeEvent,
+ FlattenEvent,
+ DemotePromoteEvent,
+ SnapLimitEvent,
+ UpdateFeaturesEvent,
+ MetadataSetEvent,
+ MetadataRemoveEvent,
+ AioWriteSameEvent,
+ AioCompareAndWriteEvent,
+ UnknownEvent> EventVector;
+typedef boost::make_variant_over<EventVector>::type Event;
+
+struct EventEntry {
+ static uint32_t get_fixed_size() {
+ return EVENT_FIXED_SIZE + METADATA_FIXED_SIZE;
+ }
+
+ EventEntry() : event(UnknownEvent()) {
+ }
+ EventEntry(const Event &_event, const utime_t &_timestamp = utime_t())
+ : event(_event), timestamp(_timestamp) {
+ }
+
+ Event event;
+ utime_t timestamp;
+
+ EventType get_event_type() const;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<EventEntry *> &o);
+
+private:
+ static const uint32_t EVENT_FIXED_SIZE = 14; /// version encoding, type
+ static const uint32_t METADATA_FIXED_SIZE = 14; /// version encoding, timestamp
+
+ void encode_metadata(bufferlist& bl) const;
+ void decode_metadata(bufferlist::const_iterator& it);
+};
+
+// Journal Client data structures
+
+enum ClientMetaType {
+ IMAGE_CLIENT_META_TYPE = 0,
+ MIRROR_PEER_CLIENT_META_TYPE = 1,
+ CLI_CLIENT_META_TYPE = 2
+};
+
+struct ImageClientMeta {
+ static const ClientMetaType TYPE = IMAGE_CLIENT_META_TYPE;
+
+ uint64_t tag_class = 0;
+ bool resync_requested = false;
+
+ ImageClientMeta() {
+ }
+ ImageClientMeta(uint64_t tag_class) : tag_class(tag_class) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct MirrorPeerSyncPoint {
+ typedef boost::optional<uint64_t> ObjectNumber;
+
+ cls::rbd::SnapshotNamespace snap_namespace;
+ std::string snap_name;
+ std::string from_snap_name;
+ ObjectNumber object_number;
+
+ MirrorPeerSyncPoint() : MirrorPeerSyncPoint({}, "", "", boost::none) {
+ }
+ MirrorPeerSyncPoint(const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name,
+ const ObjectNumber &object_number)
+ : MirrorPeerSyncPoint(snap_namespace, snap_name, "", object_number) {
+ }
+ MirrorPeerSyncPoint(const cls::rbd::SnapshotNamespace& snap_namespace,
+ const std::string &snap_name,
+ const std::string &from_snap_name,
+ const ObjectNumber &object_number)
+ : snap_namespace(snap_namespace), snap_name(snap_name),
+ from_snap_name(from_snap_name), object_number(object_number) {
+ }
+
+ inline bool operator==(const MirrorPeerSyncPoint &sync) const {
+ return (snap_name == sync.snap_name &&
+ from_snap_name == sync.from_snap_name &&
+ object_number == sync.object_number &&
+ snap_namespace == sync.snap_namespace);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+enum MirrorPeerState {
+ MIRROR_PEER_STATE_SYNCING,
+ MIRROR_PEER_STATE_REPLAYING
+};
+
+struct MirrorPeerClientMeta {
+ typedef std::list<MirrorPeerSyncPoint> SyncPoints;
+
+ static const ClientMetaType TYPE = MIRROR_PEER_CLIENT_META_TYPE;
+
+ std::string image_id;
+ MirrorPeerState state = MIRROR_PEER_STATE_SYNCING; ///< replay state
+ uint64_t sync_object_count = 0; ///< maximum number of objects ever sync'ed
+ SyncPoints sync_points; ///< max two in-use snapshots for sync
+ SnapSeqs snap_seqs; ///< local to peer snap seq mapping
+
+ MirrorPeerClientMeta() {
+ }
+ MirrorPeerClientMeta(const std::string &image_id,
+ const SyncPoints &sync_points = SyncPoints(),
+ const SnapSeqs &snap_seqs = SnapSeqs())
+ : image_id(image_id), sync_points(sync_points), snap_seqs(snap_seqs) {
+ }
+
+ inline bool operator==(const MirrorPeerClientMeta &meta) const {
+ return (image_id == meta.image_id &&
+ state == meta.state &&
+ sync_object_count == meta.sync_object_count &&
+ sync_points == meta.sync_points &&
+ snap_seqs == meta.snap_seqs);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct CliClientMeta {
+ static const ClientMetaType TYPE = CLI_CLIENT_META_TYPE;
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct UnknownClientMeta {
+ static const ClientMetaType TYPE = static_cast<ClientMetaType>(-1);
+
+ void encode(bufferlist& bl) const;
+ void decode(__u8 version, bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+typedef boost::variant<ImageClientMeta,
+ MirrorPeerClientMeta,
+ CliClientMeta,
+ UnknownClientMeta> ClientMeta;
+
+struct ClientData {
+ ClientData() {
+ }
+ ClientData(const ClientMeta &client_meta) : client_meta(client_meta) {
+ }
+
+ ClientMeta client_meta;
+
+ ClientMetaType get_client_meta_type() const;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<ClientData *> &o);
+};
+
+// Journal Tag data structures
+
+struct TagPredecessor {
+ std::string mirror_uuid; // empty if local
+ bool commit_valid = false;
+ uint64_t tag_tid = 0;
+ uint64_t entry_tid = 0;
+
+ TagPredecessor() {
+ }
+ TagPredecessor(const std::string &mirror_uuid, bool commit_valid,
+ uint64_t tag_tid, uint64_t entry_tid)
+ : mirror_uuid(mirror_uuid), commit_valid(commit_valid), tag_tid(tag_tid),
+ entry_tid(entry_tid) {
+ }
+
+ inline bool operator==(const TagPredecessor &rhs) const {
+ return (mirror_uuid == rhs.mirror_uuid &&
+ commit_valid == rhs.commit_valid &&
+ tag_tid == rhs.tag_tid &&
+ entry_tid == rhs.entry_tid);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+};
+
+struct TagData {
+ // owner of the tag (exclusive lock epoch)
+ std::string mirror_uuid; // empty if local
+
+ // mapping to last committed record of previous tag
+ TagPredecessor predecessor;
+
+ TagData() {
+ }
+ TagData(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) {
+ }
+ TagData(const std::string &mirror_uuid,
+ const std::string &predecessor_mirror_uuid,
+ bool predecessor_commit_valid,
+ uint64_t predecessor_tag_tid, uint64_t predecessor_entry_tid)
+ : mirror_uuid(mirror_uuid),
+ predecessor(predecessor_mirror_uuid, predecessor_commit_valid,
+ predecessor_tag_tid, predecessor_entry_tid) {
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::const_iterator& it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<TagData *> &o);
+};
+
+std::ostream &operator<<(std::ostream &out, const EventType &type);
+std::ostream &operator<<(std::ostream &out, const ClientMetaType &type);
+std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta);
+std::ostream &operator<<(std::ostream &out, const MirrorPeerSyncPoint &sync);
+std::ostream &operator<<(std::ostream &out, const MirrorPeerState &meta);
+std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta);
+std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor);
+std::ostream &operator<<(std::ostream &out, const TagData &tag_data);
+
+struct Listener {
+ virtual ~Listener() {
+ }
+
+ /// invoked when journal close is requested
+ virtual void handle_close() = 0;
+
+ /// invoked when journal is promoted to primary
+ virtual void handle_promoted() = 0;
+
+ /// invoked when journal resync is requested
+ virtual void handle_resync() = 0;
+};
+
+WRITE_CLASS_ENCODER(EventEntry);
+WRITE_CLASS_ENCODER(ClientData);
+WRITE_CLASS_ENCODER(TagData);
+
+} // namespace journal
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_TYPES_H
diff --git a/src/librbd/journal/Utils.cc b/src/librbd/journal/Utils.cc
new file mode 100644
index 00000000..1721a9b2
--- /dev/null
+++ b/src/librbd/journal/Utils.cc
@@ -0,0 +1,86 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/journal/Utils.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/journal/Types.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::journal::"
+
+namespace librbd {
+namespace journal {
+namespace util {
+
+int C_DecodeTag::decode(bufferlist::const_iterator *it, TagData *tag_data) {
+ try {
+ using ceph::decode;
+ decode(*tag_data, *it);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int C_DecodeTag::process(int r) {
+ if (r < 0) {
+ lderr(cct) << "C_DecodeTag: " << this << " " << __func__ << ": "
+ << "failed to allocate tag: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ Mutex::Locker locker(*lock);
+ *tag_tid = tag.tid;
+
+ auto data_it = tag.data.cbegin();
+ r = decode(&data_it, tag_data);
+ if (r < 0) {
+ lderr(cct) << "C_DecodeTag: " << this << " " << __func__ << ": "
+ << "failed to decode allocated tag" << dendl;
+ return r;
+ }
+
+ ldout(cct, 20) << "C_DecodeTag: " << this << " " << __func__ << ": "
+ << "allocated journal tag: "
+ << "tid=" << tag.tid << ", "
+ << "data=" << *tag_data << dendl;
+ return 0;
+}
+
+int C_DecodeTags::process(int r) {
+ if (r < 0) {
+ lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": "
+ << "failed to retrieve journal tags: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ if (tags.empty()) {
+ lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": "
+ << "no journal tags retrieved" << dendl;
+ return -ENOENT;
+ }
+
+ Mutex::Locker locker(*lock);
+ *tag_tid = tags.back().tid;
+ auto data_it = tags.back().data.cbegin();
+ r = C_DecodeTag::decode(&data_it, tag_data);
+ if (r < 0) {
+ lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": "
+ << "failed to decode journal tag" << dendl;
+ return r;
+ }
+
+ ldout(cct, 20) << "C_DecodeTags: " << this << " " << __func__ << ": "
+ << "most recent journal tag: "
+ << "tid=" << *tag_tid << ", "
+ << "data=" << *tag_data << dendl;
+ return 0;
+}
+
+} // namespace util
+} // namespace journal
+} // namespace librbd
diff --git a/src/librbd/journal/Utils.h b/src/librbd/journal/Utils.h
new file mode 100644
index 00000000..63d37c03
--- /dev/null
+++ b/src/librbd/journal/Utils.h
@@ -0,0 +1,81 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_UTILS_H
+#define CEPH_LIBRBD_JOURNAL_UTILS_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "cls/journal/cls_journal_types.h"
+#include <list>
+
+struct CephContext;
+struct Mutex;
+
+namespace librbd {
+namespace journal {
+
+struct TagData;
+
+namespace util {
+
+struct C_DecodeTag : public Context {
+ CephContext *cct;
+ Mutex *lock;
+ uint64_t *tag_tid;
+ TagData *tag_data;
+ Context *on_finish;
+
+ cls::journal::Tag tag;
+
+ C_DecodeTag(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+ TagData *tag_data, Context *on_finish)
+ : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+ on_finish(on_finish) {
+ }
+
+ void complete(int r) override {
+ on_finish->complete(process(r));
+ Context::complete(0);
+ }
+ void finish(int r) override {
+ }
+
+ int process(int r);
+
+ static int decode(bufferlist::const_iterator *it, TagData *tag_data);
+
+};
+
+struct C_DecodeTags : public Context {
+ typedef std::list<cls::journal::Tag> Tags;
+
+ CephContext *cct;
+ Mutex *lock;
+ uint64_t *tag_tid;
+ TagData *tag_data;
+ Context *on_finish;
+
+ Tags tags;
+
+ C_DecodeTags(CephContext *cct, Mutex *lock, uint64_t *tag_tid,
+ TagData *tag_data, Context *on_finish)
+ : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data),
+ on_finish(on_finish) {
+ }
+
+ void complete(int r) override {
+ on_finish->complete(process(r));
+ Context::complete(0);
+ }
+ void finish(int r) override {
+ }
+
+ int process(int r);
+};
+
+} // namespace util
+} // namespace journal
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_UTILS_H