diff options
Diffstat (limited to 'src/librbd/journal')
25 files changed, 5509 insertions, 0 deletions
diff --git a/src/librbd/journal/CreateRequest.cc b/src/librbd/journal/CreateRequest.cc new file mode 100644 index 000000000..4f7a0f5be --- /dev/null +++ b/src/librbd/journal/CreateRequest.cc @@ -0,0 +1,234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/dout.h" +#include "common/errno.h" +#include "include/ceph_assert.h" +#include "librbd/Utils.h" +#include "common/Timer.h" +#include "journal/Settings.h" +#include "librbd/journal/CreateRequest.h" +#include "librbd/journal/RemoveRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::Journal::CreateRequest: " + +namespace librbd { + +using util::create_context_callback; + +namespace journal { + +template<typename I> +CreateRequest<I>::CreateRequest(IoCtx &ioctx, const std::string &imageid, + uint8_t order, uint8_t splay_width, + const std::string &object_pool, + uint64_t tag_class, TagData &tag_data, + const std::string &client_id, + ContextWQ *op_work_queue, + Context *on_finish) + : m_ioctx(ioctx), m_image_id(imageid), m_order(order), + m_splay_width(splay_width), m_object_pool(object_pool), + m_tag_class(tag_class), m_tag_data(tag_data), m_image_client_id(client_id), + m_op_work_queue(op_work_queue), m_on_finish(on_finish) { + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); +} + +template<typename I> +void CreateRequest<I>::send() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + if (m_order > 64 || m_order < 12) { + lderr(m_cct) << "order must be in the range [12, 64]" << dendl; + complete(-EDOM); + return; + } + if (m_splay_width == 0) { + complete(-EINVAL); + return; + } + + get_pool_id(); +} + +template<typename I> +void CreateRequest<I>::get_pool_id() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + if (m_object_pool.empty()) { + create_journal(); + return; + } + + librados::Rados rados(m_ioctx); + IoCtx data_ioctx; + int r = rados.ioctx_create(m_object_pool.c_str(), data_ioctx); + if (r != 0) { + lderr(m_cct) << "failed to create journal: " + << "error opening journal object pool '" << m_object_pool + << "': " << cpp_strerror(r) << dendl; + complete(r); + return; + } + data_ioctx.set_namespace(m_ioctx.get_namespace()); + + m_pool_id = data_ioctx.get_id(); + create_journal(); +} + +template<typename I> +void CreateRequest<I>::create_journal() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock); + m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock, m_ioctx, + m_image_id, m_image_client_id, {}, nullptr); + + using klass = CreateRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_create_journal>(this); + + m_journaler->create(m_order, m_splay_width, m_pool_id, ctx); +} + +template<typename I> +Context *CreateRequest<I>::handle_create_journal(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to create journal: " << cpp_strerror(*result) << dendl; + shut_down_journaler(*result); + return nullptr; + } + + allocate_journal_tag(); + return nullptr; +} + +template<typename I> +void CreateRequest<I>::allocate_journal_tag() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + using klass = CreateRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_journal_tag>(this); + + encode(m_tag_data, m_bl); + m_journaler->allocate_tag(m_tag_class, m_bl, &m_tag, ctx); +} + +template<typename I> +Context *CreateRequest<I>::handle_journal_tag(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to allocate tag: " << cpp_strerror(*result) << dendl; + shut_down_journaler(*result); + return nullptr; + } + + register_client(); + return nullptr; +} + +template<typename I> +void CreateRequest<I>::register_client() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + m_bl.clear(); + encode(ClientData{ImageClientMeta{m_tag.tag_class}}, m_bl); + + using klass = CreateRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_register_client>(this); + + m_journaler->register_client(m_bl, ctx); +} + +template<typename I> +Context *CreateRequest<I>::handle_register_client(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to register client: " << cpp_strerror(*result) << dendl; + } + + shut_down_journaler(*result); + return nullptr; +} + +template<typename I> +void CreateRequest<I>::shut_down_journaler(int r) { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + m_r_saved = r; + + using klass = CreateRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_journaler_shutdown>(this); + + m_journaler->shut_down(ctx); +} + +template<typename I> +Context *CreateRequest<I>::handle_journaler_shutdown(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(*result) << dendl; + } + + delete m_journaler; + + if (!m_r_saved) { + complete(0); + return nullptr; + } + + // there was an error during journal creation, so we rollback + // what ever was done. the easiest way to do this is to invoke + // journal remove state machine, although it's not the most + // cleanest approach when it comes to redundancy, but that's + // ok during the failure path. + remove_journal(); + return nullptr; +} + +template<typename I> +void CreateRequest<I>::remove_journal() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + using klass = CreateRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_remove_journal>(this); + + RemoveRequest<I> *req = RemoveRequest<I>::create( + m_ioctx, m_image_id, m_image_client_id, m_op_work_queue, ctx); + req->send(); +} + +template<typename I> +Context *CreateRequest<I>::handle_remove_journal(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "error cleaning up journal after creation failed: " + << cpp_strerror(*result) << dendl; + } + + complete(m_r_saved); + return nullptr; +} + +template<typename I> +void CreateRequest<I>::complete(int r) { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + if (r == 0) { + ldout(m_cct, 20) << "done." << dendl; + } + + m_on_finish->complete(r); + delete this; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::CreateRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/CreateRequest.h b/src/librbd/journal/CreateRequest.h new file mode 100644 index 000000000..6fab409c4 --- /dev/null +++ b/src/librbd/journal/CreateRequest.h @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "include/rbd/librbd.hpp" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "librbd/ImageCtx.h" +#include "journal/Journaler.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" +#include "cls/journal/cls_journal_types.h" + +using librados::IoCtx; +using journal::Journaler; + +class Context; +class ContextWQ; + +namespace journal { + class Journaler; +} + +namespace librbd { + +class ImageCtx; + +namespace journal { + +template<typename ImageCtxT = ImageCtx> +class CreateRequest { +public: + static CreateRequest *create(IoCtx &ioctx, const std::string &imageid, + uint8_t order, uint8_t splay_width, + const std::string &object_pool, + uint64_t tag_class, TagData &tag_data, + const std::string &client_id, + ContextWQ *op_work_queue, Context *on_finish) { + return new CreateRequest(ioctx, imageid, order, splay_width, object_pool, + tag_class, tag_data, client_id, op_work_queue, + on_finish); + } + + void send(); + +private: + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + + CreateRequest(IoCtx &ioctx, const std::string &imageid, uint8_t order, + uint8_t splay_width, const std::string &object_pool, + uint64_t tag_class, TagData &tag_data, + const std::string &client_id, ContextWQ *op_work_queue, + Context *on_finish); + + IoCtx &m_ioctx; + std::string m_image_id; + uint8_t m_order; + uint8_t m_splay_width; + std::string m_object_pool; + uint64_t m_tag_class; + TagData m_tag_data; + std::string m_image_client_id; + ContextWQ *m_op_work_queue; + Context *m_on_finish; + + CephContext *m_cct; + cls::journal::Tag m_tag; + bufferlist m_bl; + Journaler *m_journaler; + SafeTimer *m_timer; + ceph::mutex *m_timer_lock; + int m_r_saved; + + int64_t m_pool_id = -1; + + void get_pool_id(); + + void create_journal(); + Context *handle_create_journal(int *result); + + void allocate_journal_tag(); + Context *handle_journal_tag(int *result); + + void register_client(); + Context *handle_register_client(int *result); + + void shut_down_journaler(int r); + Context *handle_journaler_shutdown(int *result); + + void remove_journal(); + Context *handle_remove_journal(int *result); + + void complete(int r); +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::CreateRequest<librbd::ImageCtx>; + +#endif /* CEPH_LIBRBD_JOURNAL_CREATE_REQUEST_H */ diff --git a/src/librbd/journal/DemoteRequest.cc b/src/librbd/journal/DemoteRequest.cc new file mode 100644 index 000000000..564391978 --- /dev/null +++ b/src/librbd/journal/DemoteRequest.cc @@ -0,0 +1,255 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/DemoteRequest.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "journal/Settings.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/journal/OpenRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::DemoteRequest: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace journal { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template <typename I> +DemoteRequest<I>::DemoteRequest(I &image_ctx, Context *on_finish) + : m_image_ctx(image_ctx), m_on_finish(on_finish), + m_lock(ceph::make_mutex("DemoteRequest::m_lock")) { +} + +template <typename I> +DemoteRequest<I>::~DemoteRequest() { + ceph_assert(m_journaler == nullptr); +} + +template <typename I> +void DemoteRequest<I>::send() { + open_journaler(); +} + +template <typename I> +void DemoteRequest<I>::open_journaler() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + m_journaler = new Journaler(m_image_ctx.md_ctx, m_image_ctx.id, + Journal<>::IMAGE_CLIENT_ID, {}, nullptr); + auto ctx = create_async_context_callback( + m_image_ctx, create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_open_journaler>(this)); + auto req = OpenRequest<I>::create(&m_image_ctx, m_journaler, &m_lock, + &m_client_meta, &m_tag_tid, &m_tag_data, + ctx); + req->send(); +} + +template <typename I> +void DemoteRequest<I>::handle_open_journaler(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to open journal: " << cpp_strerror(r) << dendl; + shut_down_journaler(); + return; + } else if (m_tag_data.mirror_uuid != Journal<>::LOCAL_MIRROR_UUID) { + m_ret_val = -EINVAL; + lderr(cct) << "image is not currently the primary" << dendl; + shut_down_journaler(); + return; + } + + allocate_tag(); +} + +template <typename I> +void DemoteRequest<I>::allocate_tag() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + cls::journal::Client client; + int r = m_journaler->get_cached_client(Journal<>::IMAGE_CLIENT_ID, &client); + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl; + shut_down_journaler(); + return; + } + + TagPredecessor predecessor; + predecessor.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID; + if (!client.commit_position.object_positions.empty()) { + auto position = client.commit_position.object_positions.front(); + predecessor.commit_valid = true; + predecessor.tag_tid = position.tag_tid; + predecessor.entry_tid = position.entry_tid; + } + + TagData tag_data; + tag_data.mirror_uuid = Journal<>::ORPHAN_MIRROR_UUID; + tag_data.predecessor = std::move(predecessor); + + bufferlist tag_bl; + encode(tag_data, tag_bl); + + auto ctx = create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_allocate_tag>(this); + m_journaler->allocate_tag(m_client_meta.tag_class, tag_bl, &m_tag, ctx); +} + +template <typename I> +void DemoteRequest<I>::handle_allocate_tag(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl; + shut_down_journaler(); + return; + } + + m_tag_tid = m_tag.tid; + append_event(); +} + +template <typename I> +void DemoteRequest<I>::append_event() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + EventEntry event_entry{DemotePromoteEvent{}, {}}; + bufferlist event_entry_bl; + encode(event_entry, event_entry_bl); + + m_journaler->start_append(0); + m_future = m_journaler->append(m_tag_tid, event_entry_bl); + + auto ctx = create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_append_event>(this); + m_future.flush(ctx); + +} + +template <typename I> +void DemoteRequest<I>::handle_append_event(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to append demotion journal event: " << cpp_strerror(r) + << dendl; + stop_append(); + return; + } + + commit_event(); +} + +template <typename I> +void DemoteRequest<I>::commit_event() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + m_journaler->committed(m_future); + + auto ctx = create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_commit_event>(this); + m_journaler->flush_commit_position(ctx); +} + +template <typename I> +void DemoteRequest<I>::handle_commit_event(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to flush demotion commit position: " + << cpp_strerror(r) << dendl; + } + + stop_append(); +} + +template <typename I> +void DemoteRequest<I>::stop_append() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + auto ctx = create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_stop_append>(this); + m_journaler->stop_append(ctx); +} + +template <typename I> +void DemoteRequest<I>::handle_stop_append(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + if (m_ret_val == 0) { + m_ret_val = r; + } + lderr(cct) << "failed to stop journal append: " << cpp_strerror(r) << dendl; + } + + shut_down_journaler(); +} + +template <typename I> +void DemoteRequest<I>::shut_down_journaler() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + Context *ctx = create_async_context_callback( + m_image_ctx, create_context_callback< + DemoteRequest<I>, &DemoteRequest<I>::handle_shut_down_journaler>(this)); + m_journaler->shut_down(ctx); +} + +template <typename I> +void DemoteRequest<I>::handle_shut_down_journaler(int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to shut down journal: " << cpp_strerror(r) << dendl; + } + + delete m_journaler; + m_journaler = nullptr; + finish(r); +} + +template <typename I> +void DemoteRequest<I>::finish(int r) { + if (m_ret_val < 0) { + r = m_ret_val; + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::DemoteRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/DemoteRequest.h b/src/librbd/journal/DemoteRequest.h new file mode 100644 index 000000000..6aba6cc8f --- /dev/null +++ b/src/librbd/journal/DemoteRequest.h @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H + +#include "common/ceph_mutex.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/Future.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace journal { + +template <typename ImageCtxT = librbd::ImageCtx> +class DemoteRequest { +public: + static DemoteRequest *create(ImageCtxT &image_ctx, Context *on_finish) { + return new DemoteRequest(image_ctx, on_finish); + } + + DemoteRequest(ImageCtxT &image_ctx, Context *on_finish); + ~DemoteRequest(); + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * OPEN_JOURNALER * * * * * + * | * + * v * + * ALLOCATE_TAG * * * * * * + * | * + * v * + * APPEND_EVENT * * * * + * | * * + * v * * + * COMMIT_EVENT * * + * | * * + * v * * + * STOP_APPEND <* * * * + * | * + * v * + * SHUT_DOWN_JOURNALER <* * + * | + * v + * <finish> + * + * @endverbatim + */ + + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + typedef typename TypeTraits<ImageCtxT>::Future Future; + + ImageCtxT &m_image_ctx; + Context *m_on_finish; + + Journaler *m_journaler = nullptr; + int m_ret_val = 0; + + ceph::mutex m_lock; + ImageClientMeta m_client_meta; + uint64_t m_tag_tid = 0; + TagData m_tag_data; + + cls::journal::Tag m_tag; + Future m_future; + + void open_journaler(); + void handle_open_journaler(int r); + + void allocate_tag(); + void handle_allocate_tag(int r); + + void append_event(); + void handle_append_event(int r); + + void commit_event(); + void handle_commit_event(int r); + + void stop_append(); + void handle_stop_append(int r); + + void shut_down_journaler(); + void handle_shut_down_journaler(int r); + + void finish(int r); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::DemoteRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_DEMOTE_REQUEST_H diff --git a/src/librbd/journal/DisabledPolicy.h b/src/librbd/journal/DisabledPolicy.h new file mode 100644 index 000000000..27d69a50d --- /dev/null +++ b/src/librbd/journal/DisabledPolicy.h @@ -0,0 +1,31 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H +#define CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H + +#include "librbd/journal/Policy.h" + +namespace librbd { + +struct ImageCtx; + +namespace journal { + +class DisabledPolicy : public Policy { +public: + bool append_disabled() const override { + return true; + } + bool journal_disabled() const override { + return true; + } + void allocate_tag_on_lock(Context *on_finish) override { + ceph_abort(); + } +}; + +} // namespace journal +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_DISABLED_POLICY_H diff --git a/src/librbd/journal/ObjectDispatch.cc b/src/librbd/journal/ObjectDispatch.cc new file mode 100644 index 000000000..5623c635d --- /dev/null +++ b/src/librbd/journal/ObjectDispatch.cc @@ -0,0 +1,258 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/ObjectDispatch.h" +#include "common/dout.h" +#include "osdc/Striper.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/io/ObjectDispatchSpec.h" +#include "librbd/io/ObjectDispatcherInterface.h" +#include "librbd/io/Utils.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::ObjectDispatch: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace journal { + +using librbd::util::data_object_name; +using util::create_context_callback; + +namespace { + +template <typename I> +struct C_CommitIOEvent : public Context { + I* image_ctx; + Journal<I>* journal; + uint64_t object_no; + uint64_t object_off; + uint64_t object_len; + uint64_t journal_tid; + int object_dispatch_flags; + Context* on_finish; + + C_CommitIOEvent(I* image_ctx, Journal<I>* journal, uint64_t object_no, + uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, int object_dispatch_flags, + Context* on_finish) + : image_ctx(image_ctx), journal(journal), object_no(object_no), + object_off(object_off), object_len(object_len), journal_tid(journal_tid), + object_dispatch_flags(object_dispatch_flags), on_finish(on_finish) { + } + + void finish(int r) override { + // don't commit the IO extent if a previous dispatch handler will just + // retry the failed IO + if (r >= 0 || + (object_dispatch_flags & + io::OBJECT_DISPATCH_FLAG_WILL_RETRY_ON_ERROR) == 0) { + io::Extents file_extents; + io::util::extent_to_file(image_ctx, object_no, object_off, object_len, + file_extents); + for (auto& extent : file_extents) { + journal->commit_io_event_extent(journal_tid, extent.first, + extent.second, r); + } + } + + if (on_finish != nullptr) { + on_finish->complete(r); + } + } +}; + +} // anonymous namespace + +template <typename I> +ObjectDispatch<I>::ObjectDispatch(I* image_ctx, Journal<I>* journal) + : m_image_ctx(image_ctx), m_journal(journal) { +} + +template <typename I> +void ObjectDispatch<I>::shut_down(Context* on_finish) { + m_image_ctx->op_work_queue->queue(on_finish, 0); +} + +template <typename I> +bool ObjectDispatch<I>::discard( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + IOContext io_context, int discard_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + if (*journal_tid == 0) { + // non-journaled IO + return false; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " + << object_off << "~" << object_len << dendl; + + *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no, + object_off, object_len, *journal_tid, + *object_dispatch_flags, *on_finish); + *on_finish = create_context_callback< + Context, &Context::complete>(*on_finish, m_journal); + + *dispatch_result = io::DISPATCH_RESULT_CONTINUE; + wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::write( + uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data, + IOContext io_context, int op_flags, int write_flags, + std::optional<uint64_t> assert_version, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + if (*journal_tid == 0) { + // non-journaled IO + return false; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " + << object_off << "~" << data.length() << dendl; + + *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no, + object_off, data.length(), *journal_tid, + *object_dispatch_flags, *on_finish); + *on_finish = create_context_callback< + Context, &Context::complete>(*on_finish, m_journal); + + *dispatch_result = io::DISPATCH_RESULT_CONTINUE; + wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::write_same( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data, + IOContext io_context, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + if (*journal_tid == 0) { + // non-journaled IO + return false; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " + << object_off << "~" << object_len << dendl; + + *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no, + object_off, object_len, *journal_tid, + *object_dispatch_flags, *on_finish); + *on_finish = create_context_callback< + Context, &Context::complete>(*on_finish, m_journal); + + *dispatch_result = io::DISPATCH_RESULT_CONTINUE; + wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::compare_and_write( + uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data, + ceph::bufferlist&& write_data, IOContext io_context, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, + int* object_dispatch_flags, uint64_t* journal_tid, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + if (*journal_tid == 0) { + // non-journaled IO + return false; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << data_object_name(m_image_ctx, object_no) << " " + << object_off << "~" << write_data.length() + << dendl; + + *on_finish = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no, + object_off, write_data.length(), + *journal_tid, *object_dispatch_flags, + *on_finish); + *on_finish = create_context_callback< + Context, &Context::complete>(*on_finish, m_journal); + + *dispatch_result = io::DISPATCH_RESULT_CONTINUE; + wait_or_flush_event(*journal_tid, *object_dispatch_flags, on_dispatched); + return true; +} + +template <typename I> +bool ObjectDispatch<I>::flush( + io::FlushSource flush_source, const ZTracer::Trace &parent_trace, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) { + if (*journal_tid == 0) { + // non-journaled IO + return false; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + auto ctx = *on_finish; + *on_finish = new LambdaContext( + [image_ctx=m_image_ctx, ctx, journal_tid=*journal_tid](int r) { + image_ctx->journal->commit_io_event(journal_tid, r); + ctx->complete(r); + }); + + *dispatch_result = io::DISPATCH_RESULT_CONTINUE; + wait_or_flush_event(*journal_tid, io::OBJECT_DISPATCH_FLAG_FLUSH, + on_dispatched); + return true; +} + +template <typename I> +void ObjectDispatch<I>::extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << object_no << " " << object_off << "~" << object_len + << dendl; + + Context *ctx = new C_CommitIOEvent<I>(m_image_ctx, m_journal, object_no, + object_off, object_len, journal_tid, false, + nullptr); + if (new_journal_tid != 0) { + // ensure new journal event is safely committed to disk before + // committing old event + m_journal->flush_event(new_journal_tid, ctx); + } else { + ctx = create_context_callback< + Context, &Context::complete>(ctx, m_journal); + ctx->complete(0); + } +} + +template <typename I> +void ObjectDispatch<I>::wait_or_flush_event( + uint64_t journal_tid, int object_dispatch_flags, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "journal_tid=" << journal_tid << dendl; + + if ((object_dispatch_flags & io::OBJECT_DISPATCH_FLAG_FLUSH) != 0) { + m_journal->flush_event(journal_tid, on_dispatched); + } else { + m_journal->wait_event(journal_tid, on_dispatched); + } +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::ObjectDispatch<librbd::ImageCtx>; diff --git a/src/librbd/journal/ObjectDispatch.h b/src/librbd/journal/ObjectDispatch.h new file mode 100644 index 000000000..45e4773cc --- /dev/null +++ b/src/librbd/journal/ObjectDispatch.h @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H +#define CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "common/zipkin_trace.h" +#include "librbd/io/Types.h" +#include "librbd/io/ObjectDispatchInterface.h" + +struct Context; + +namespace librbd { + +struct ImageCtx; +template <typename> class Journal; + +namespace journal { + +template <typename ImageCtxT = librbd::ImageCtx> +class ObjectDispatch : public io::ObjectDispatchInterface { +public: + static ObjectDispatch* create(ImageCtxT* image_ctx, + Journal<ImageCtxT>* journal) { + return new ObjectDispatch(image_ctx, journal); + } + + ObjectDispatch(ImageCtxT* image_ctx, Journal<ImageCtxT>* journal); + + io::ObjectDispatchLayer get_dispatch_layer() const override { + return io::OBJECT_DISPATCH_LAYER_JOURNAL; + } + + void shut_down(Context* on_finish) override; + + bool read( + uint64_t object_no, io::ReadExtents* extents, IOContext io_context, + int op_flags, int read_flags, const ZTracer::Trace &parent_trace, + uint64_t* version, int* object_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + return false; + } + + bool discard( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + IOContext io_context, int discard_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool write( + uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data, + IOContext io_context, int op_flags, int write_flags, + std::optional<uint64_t> assert_version, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool write_same( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + io::LightweightBufferExtents&& buffer_extents, ceph::bufferlist&& data, + IOContext io_context, int op_flags, + const ZTracer::Trace &parent_trace, int* object_dispatch_flags, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool compare_and_write( + uint64_t object_no, uint64_t object_off, ceph::bufferlist&& cmp_data, + ceph::bufferlist&& write_data, IOContext io_context, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, + int* object_dispatch_flags, uint64_t* journal_tid, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + + bool flush( + io::FlushSource flush_source, const ZTracer::Trace &parent_trace, + uint64_t* journal_tid, io::DispatchResult* dispatch_result, + Context** on_finish, Context* on_dispatched) override; + + bool list_snaps( + uint64_t object_no, io::Extents&& extents, io::SnapIds&& snap_ids, + int list_snap_flags, const ZTracer::Trace &parent_trace, + io::SnapshotDelta* snapshot_delta, int* object_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override { + return false; + } + + bool invalidate_cache(Context* on_finish) override { + return false; + } + bool reset_existence_cache(Context* on_finish) override { + return false; + } + + void extent_overwritten( + uint64_t object_no, uint64_t object_off, uint64_t object_len, + uint64_t journal_tid, uint64_t new_journal_tid) override; + + int prepare_copyup( + uint64_t object_no, + io::SnapshotSparseBufferlist* snapshot_sparse_bufferlist) override { + return 0; + } + +private: + ImageCtxT* m_image_ctx; + Journal<ImageCtxT>* m_journal; + + void wait_or_flush_event(uint64_t journal_tid, int object_dispatch_flags, + Context* on_dispatched); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::ObjectDispatch<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_OBJECT_DISPATCH_H diff --git a/src/librbd/journal/OpenRequest.cc b/src/librbd/journal/OpenRequest.cc new file mode 100644 index 000000000..eb01aa35a --- /dev/null +++ b/src/librbd/journal/OpenRequest.cc @@ -0,0 +1,144 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/OpenRequest.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/Utils.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::OpenRequest: " << this << " " \ + << __func__ << ": " + +namespace librbd { +namespace journal { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; +using util::C_DecodeTags; + +template <typename I> +OpenRequest<I>::OpenRequest(I *image_ctx, Journaler *journaler, ceph::mutex *lock, + journal::ImageClientMeta *client_meta, + uint64_t *tag_tid, journal::TagData *tag_data, + Context *on_finish) + : m_image_ctx(image_ctx), m_journaler(journaler), m_lock(lock), + m_client_meta(client_meta), m_tag_tid(tag_tid), m_tag_data(tag_data), + m_on_finish(on_finish) { +} + +template <typename I> +void OpenRequest<I>::send() { + send_init(); +} + +template <typename I> +void OpenRequest<I>::send_init() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + m_journaler->init(create_async_context_callback( + *m_image_ctx, create_context_callback< + OpenRequest<I>, &OpenRequest<I>::handle_init>(this))); +} + +template <typename I> +void OpenRequest<I>::handle_init(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to initialize journal: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + // locate the master image client record + cls::journal::Client client; + r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, + &client); + if (r < 0) { + lderr(cct) << "failed to locate master image client" << dendl; + finish(r); + return; + } + + librbd::journal::ClientData client_data; + auto bl = client.data.cbegin(); + try { + decode(client_data, bl); + } catch (const buffer::error &err) { + lderr(cct) << "failed to decode client meta data: " << err.what() + << dendl; + finish(-EINVAL); + return; + } + + journal::ImageClientMeta *image_client_meta = + boost::get<journal::ImageClientMeta>(&client_data.client_meta); + if (image_client_meta == nullptr) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to extract client meta data" << dendl; + finish(-EINVAL); + return; + } + + ldout(cct, 20) << this << " " << __func__ << ": " + << "client: " << client << ", " + << "image meta: " << *image_client_meta << dendl; + + m_tag_class = image_client_meta->tag_class; + { + std::lock_guard locker{*m_lock}; + *m_client_meta = *image_client_meta; + } + + send_get_tags(); +} + +template <typename I> +void OpenRequest<I>::send_get_tags() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + C_DecodeTags *tags_ctx = new C_DecodeTags( + cct, m_lock, m_tag_tid, m_tag_data, create_async_context_callback( + *m_image_ctx, create_context_callback< + OpenRequest<I>, &OpenRequest<I>::handle_get_tags>(this))); + m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx); +} + +template <typename I> +void OpenRequest<I>::handle_get_tags(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to decode journal tags: " << cpp_strerror(r) << dendl; + } + + finish(r); +} + +template <typename I> +void OpenRequest<I>::finish(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::OpenRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/OpenRequest.h b/src/librbd/journal/OpenRequest.h new file mode 100644 index 000000000..0f10bccf1 --- /dev/null +++ b/src/librbd/journal/OpenRequest.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H + +#include "common/ceph_mutex.h" +#include "include/int_types.h" +#include "librbd/journal/TypeTraits.h" + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace journal { + +struct ImageClientMeta; +struct TagData; + +template <typename ImageCtxT = ImageCtx> +class OpenRequest { +public: + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + + static OpenRequest* create(ImageCtxT *image_ctx, Journaler *journaler, + ceph::mutex *lock, journal::ImageClientMeta *client_meta, + uint64_t *tag_tid, journal::TagData *tag_data, + Context *on_finish) { + return new OpenRequest(image_ctx, journaler, lock, client_meta, tag_tid, + tag_data, on_finish); + } + + OpenRequest(ImageCtxT *image_ctx, Journaler *journaler, ceph::mutex *lock, + journal::ImageClientMeta *client_meta, uint64_t *tag_tid, + journal::TagData *tag_data, Context *on_finish); + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * INIT + * | + * v + * GET_TAGS + * | + * v + * <finish> + * + * @endverbatim + */ + + + ImageCtxT *m_image_ctx; + Journaler *m_journaler; + ceph::mutex *m_lock; + journal::ImageClientMeta *m_client_meta; + uint64_t *m_tag_tid; + journal::TagData *m_tag_data; + Context *m_on_finish; + + uint64_t m_tag_class = 0; + + void send_init(); + void handle_init(int r); + + void send_get_tags(); + void handle_get_tags(int r); + + void finish(int r); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::OpenRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_OPEN_REQUEST_H diff --git a/src/librbd/journal/Policy.h b/src/librbd/journal/Policy.h new file mode 100644 index 000000000..1ced3c53e --- /dev/null +++ b/src/librbd/journal/Policy.h @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_POLICY_H +#define CEPH_LIBRBD_JOURNAL_POLICY_H + +class Context; + +namespace librbd { + +namespace journal { + +struct Policy { + virtual ~Policy() { + } + + virtual bool append_disabled() const = 0; + virtual bool journal_disabled() const = 0; + virtual void allocate_tag_on_lock(Context *on_finish) = 0; +}; + +} // namespace journal +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_POLICY_H diff --git a/src/librbd/journal/PromoteRequest.cc b/src/librbd/journal/PromoteRequest.cc new file mode 100644 index 000000000..f7ae45a92 --- /dev/null +++ b/src/librbd/journal/PromoteRequest.cc @@ -0,0 +1,237 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/PromoteRequest.h" +#include "common/dout.h" +#include "common/errno.h" +#include "journal/Journaler.h" +#include "journal/Settings.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/journal/OpenRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::PromoteRequest: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace journal { + +using librbd::util::create_async_context_callback; +using librbd::util::create_context_callback; + +template <typename I> +PromoteRequest<I>::PromoteRequest(I *image_ctx, bool force, Context *on_finish) + : m_image_ctx(image_ctx), m_force(force), m_on_finish(on_finish), + m_lock(ceph::make_mutex("PromoteRequest::m_lock")) { +} + +template <typename I> +void PromoteRequest<I>::send() { + send_open(); +} + +template <typename I> +void PromoteRequest<I>::send_open() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + m_journaler = new Journaler(m_image_ctx->md_ctx, m_image_ctx->id, + Journal<>::IMAGE_CLIENT_ID, {}, nullptr); + Context *ctx = create_async_context_callback( + *m_image_ctx, create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_open>(this)); + auto open_req = OpenRequest<I>::create(m_image_ctx, m_journaler, + &m_lock, &m_client_meta, + &m_tag_tid, &m_tag_data, ctx); + open_req->send(); +} + +template <typename I> +void PromoteRequest<I>::handle_open(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to open journal: " << cpp_strerror(r) << dendl; + shut_down(); + return; + } + + allocate_tag(); +} + +template <typename I> +void PromoteRequest<I>::allocate_tag() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + journal::TagPredecessor predecessor; + if (!m_force && m_tag_data.mirror_uuid == Journal<>::ORPHAN_MIRROR_UUID) { + // orderly promotion -- demotion epoch will have a single entry + // so link to our predecessor (demotion) epoch + predecessor = TagPredecessor{Journal<>::ORPHAN_MIRROR_UUID, true, m_tag_tid, + 1}; + } else { + // forced promotion -- create an epoch no peers can link against + predecessor = TagPredecessor{Journal<>::LOCAL_MIRROR_UUID, true, m_tag_tid, + 0}; + } + + TagData tag_data; + tag_data.mirror_uuid = Journal<>::LOCAL_MIRROR_UUID; + tag_data.predecessor = predecessor; + + bufferlist tag_bl; + encode(tag_data, tag_bl); + + Context *ctx = create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_allocate_tag>(this); + m_journaler->allocate_tag(m_client_meta.tag_class, tag_bl, &m_tag, ctx); +} + +template <typename I> +void PromoteRequest<I>::handle_allocate_tag(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to allocate tag: " << cpp_strerror(r) << dendl; + shut_down(); + return; + } + + m_tag_tid = m_tag.tid; + append_event(); +} + +template <typename I> +void PromoteRequest<I>::append_event() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + EventEntry event_entry{DemotePromoteEvent{}, {}}; + bufferlist event_entry_bl; + encode(event_entry, event_entry_bl); + + m_journaler->start_append(0); + m_future = m_journaler->append(m_tag_tid, event_entry_bl); + + auto ctx = create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_append_event>(this); + m_future.flush(ctx); +} + +template <typename I> +void PromoteRequest<I>::handle_append_event(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to append promotion journal event: " + << cpp_strerror(r) << dendl; + stop_append(); + return; + } + + commit_event(); +} + +template <typename I> +void PromoteRequest<I>::commit_event() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + m_journaler->committed(m_future); + + auto ctx = create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_commit_event>(this); + m_journaler->flush_commit_position(ctx); +} + +template <typename I> +void PromoteRequest<I>::handle_commit_event(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + m_ret_val = r; + lderr(cct) << "failed to flush promote commit position: " + << cpp_strerror(r) << dendl; + } + + stop_append(); +} + +template <typename I> +void PromoteRequest<I>::stop_append() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + auto ctx = create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_stop_append>(this); + m_journaler->stop_append(ctx); +} + +template <typename I> +void PromoteRequest<I>::handle_stop_append(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + if (m_ret_val == 0) { + m_ret_val = r; + } + lderr(cct) << "failed to stop journal append: " << cpp_strerror(r) << dendl; + } + + shut_down(); +} + +template <typename I> +void PromoteRequest<I>::shut_down() { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + Context *ctx = create_async_context_callback( + *m_image_ctx, create_context_callback< + PromoteRequest<I>, &PromoteRequest<I>::handle_shut_down>(this)); + m_journaler->shut_down(ctx); +} + +template <typename I> +void PromoteRequest<I>::handle_shut_down(int r) { + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to shut down journal: " << cpp_strerror(r) << dendl; + } + + delete m_journaler; + finish(r); +} + +template <typename I> +void PromoteRequest<I>::finish(int r) { + if (m_ret_val < 0) { + r = m_ret_val; + } + + CephContext *cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::PromoteRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/PromoteRequest.h b/src/librbd/journal/PromoteRequest.h new file mode 100644 index 000000000..f6258066e --- /dev/null +++ b/src/librbd/journal/PromoteRequest.h @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H + +#include "include/int_types.h" +#include "common/ceph_mutex.h" +#include "cls/journal/cls_journal_types.h" +#include "journal/Future.h" +#include "librbd/journal/Types.h" +#include "librbd/journal/TypeTraits.h" + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace journal { + +template <typename ImageCtxT = ImageCtx> +class PromoteRequest { +public: + static PromoteRequest* create(ImageCtxT *image_ctx, bool force, + Context *on_finish) { + return new PromoteRequest(image_ctx, force, on_finish); + } + + PromoteRequest(ImageCtxT *image_ctx, bool force, Context *on_finish); + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * OPEN * * * * * * * * * * + * | * + * v * + * ALLOCATE_TAG * * * * * * + * | * + * v * + * APPEND_EVENT * * * * + * | * * + * v * * + * COMMIT_EVENT * * + * | * * + * v * * + * STOP_APPEND <* * * * + * | * + * v * + * SHUT_DOWN <* * * * * * * + * | + * v + * <finish> + * + * @endverbatim + */ + + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + typedef typename TypeTraits<ImageCtxT>::Future Future; + + ImageCtxT *m_image_ctx; + bool m_force; + Context *m_on_finish; + + Journaler *m_journaler = nullptr; + int m_ret_val = 0; + + ceph::mutex m_lock; + ImageClientMeta m_client_meta; + uint64_t m_tag_tid = 0; + TagData m_tag_data; + + cls::journal::Tag m_tag; + Future m_future; + + void send_open(); + void handle_open(int r); + + void allocate_tag(); + void handle_allocate_tag(int r); + + void append_event(); + void handle_append_event(int r); + + void commit_event(); + void handle_commit_event(int r); + + void stop_append(); + void handle_stop_append(int r); + + void shut_down(); + void handle_shut_down(int r); + + void finish(int r); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::PromoteRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_PROMOTE_REQUEST_H diff --git a/src/librbd/journal/RemoveRequest.cc b/src/librbd/journal/RemoveRequest.cc new file mode 100644 index 000000000..0f73a31ba --- /dev/null +++ b/src/librbd/journal/RemoveRequest.cc @@ -0,0 +1,153 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/dout.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "journal/Settings.h" +#include "include/ceph_assert.h" +#include "librbd/Utils.h" +#include "librbd/journal/RemoveRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::Journal::RemoveRequest: " + +namespace librbd { + +using util::create_context_callback; + +namespace journal { + +template<typename I> +RemoveRequest<I>::RemoveRequest(IoCtx &ioctx, const std::string &image_id, + const std::string &client_id, + ContextWQ *op_work_queue, + Context *on_finish) + : m_ioctx(ioctx), m_image_id(image_id), m_image_client_id(client_id), + m_op_work_queue(op_work_queue), m_on_finish(on_finish) { + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); +} + +template<typename I> +void RemoveRequest<I>::send() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + stat_journal(); +} + +template<typename I> +void RemoveRequest<I>::stat_journal() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + ImageCtx::get_timer_instance(m_cct, &m_timer, &m_timer_lock); + m_journaler = new Journaler(m_op_work_queue, m_timer, m_timer_lock, m_ioctx, + m_image_id, m_image_client_id, {}, nullptr); + + using klass = RemoveRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_stat_journal>(this); + + m_journaler->exists(ctx); +} + +template<typename I> +Context *RemoveRequest<I>::handle_stat_journal(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if ((*result < 0) && (*result != -ENOENT)) { + lderr(m_cct) << "failed to stat journal header: " << cpp_strerror(*result) << dendl; + shut_down_journaler(*result); + return nullptr; + } + + if (*result == -ENOENT) { + shut_down_journaler(0); + return nullptr; + } + + init_journaler(); + return nullptr; +} + +template<typename I> +void RemoveRequest<I>::init_journaler() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + using klass = RemoveRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_init_journaler>(this); + + m_journaler->init(ctx); +} + +template<typename I> +Context *RemoveRequest<I>::handle_init_journaler(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if ((*result < 0) && (*result != -ENOENT)) { + lderr(m_cct) << "failed to init journaler: " << cpp_strerror(*result) << dendl; + shut_down_journaler(*result); + return nullptr; + } + + remove_journal(); + return nullptr; +} + +template<typename I> +void RemoveRequest<I>::remove_journal() { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + using klass = RemoveRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_remove_journal>(this); + + m_journaler->remove(true, ctx); +} + +template<typename I> +Context *RemoveRequest<I>::handle_remove_journal(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to remove journal: " << cpp_strerror(*result) << dendl; + } + + shut_down_journaler(*result); + return nullptr; +} + +template<typename I> +void RemoveRequest<I>::shut_down_journaler(int r) { + ldout(m_cct, 20) << this << " " << __func__ << dendl; + + m_r_saved = r; + + using klass = RemoveRequest<I>; + Context *ctx = create_context_callback<klass, &klass::handle_journaler_shutdown>(this); + + m_journaler->shut_down(ctx); +} + +template<typename I> +Context *RemoveRequest<I>::handle_journaler_shutdown(int *result) { + ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl; + + if (*result < 0) { + lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(*result) << dendl; + } + + delete m_journaler; + + if (m_r_saved == 0) { + ldout(m_cct, 20) << "done." << dendl; + } + + m_on_finish->complete(m_r_saved); + delete this; + + return nullptr; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::RemoveRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/RemoveRequest.h b/src/librbd/journal/RemoveRequest.h new file mode 100644 index 000000000..14b1c4dc5 --- /dev/null +++ b/src/librbd/journal/RemoveRequest.h @@ -0,0 +1,81 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "include/rbd/librbd.hpp" +#include "librbd/ImageCtx.h" +#include "journal/Journaler.h" +#include "librbd/journal/TypeTraits.h" +#include "common/Timer.h" + +using librados::IoCtx; +using journal::Journaler; + +class Context; +class ContextWQ; + +namespace journal { + class Journaler; +} + +namespace librbd { + +class ImageCtx; + +namespace journal { + +template<typename ImageCtxT = ImageCtx> +class RemoveRequest { +public: + static RemoveRequest *create(IoCtx &ioctx, const std::string &image_id, + const std::string &client_id, + ContextWQ *op_work_queue, Context *on_finish) { + return new RemoveRequest(ioctx, image_id, client_id, + op_work_queue, on_finish); + } + + void send(); + +private: + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + + RemoveRequest(IoCtx &ioctx, const std::string &image_id, + const std::string &client_id, + ContextWQ *op_work_queue, Context *on_finish); + + IoCtx &m_ioctx; + std::string m_image_id; + std::string m_image_client_id; + ContextWQ *m_op_work_queue; + Context *m_on_finish; + + CephContext *m_cct; + Journaler *m_journaler; + SafeTimer *m_timer; + ceph::mutex *m_timer_lock; + int m_r_saved; + + void stat_journal(); + Context *handle_stat_journal(int *result); + + void init_journaler(); + Context *handle_init_journaler(int *result); + + void remove_journal(); + Context *handle_remove_journal(int *result); + + void shut_down_journaler(int r); + Context *handle_journaler_shutdown(int *result); +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::RemoveRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H diff --git a/src/librbd/journal/Replay.cc b/src/librbd/journal/Replay.cc new file mode 100644 index 000000000..db73edb61 --- /dev/null +++ b/src/librbd/journal/Replay.cc @@ -0,0 +1,1177 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/Replay.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/ExclusiveLock.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/internal.h" +#include "librbd/Operations.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " + +namespace librbd { +namespace journal { + +namespace { + +static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32); +static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64); + +static NoOpProgressContext no_op_progress_callback; + +template <typename I, typename E> +struct ExecuteOp : public Context { + I &image_ctx; + E event; + Context *on_op_complete; + + ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete) + : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) { + } + + void execute(const journal::SnapCreateEvent &_) { + image_ctx.operations->execute_snap_create(event.snap_namespace, + event.snap_name, + on_op_complete, + event.op_tid, + SNAP_CREATE_FLAG_SKIP_NOTIFY_QUIESCE, + no_op_progress_callback); + } + + void execute(const journal::SnapRemoveEvent &_) { + image_ctx.operations->execute_snap_remove(event.snap_namespace, + event.snap_name, + on_op_complete); + } + + void execute(const journal::SnapRenameEvent &_) { + image_ctx.operations->execute_snap_rename(event.snap_id, + event.dst_snap_name, + on_op_complete); + } + + void execute(const journal::SnapProtectEvent &_) { + image_ctx.operations->execute_snap_protect(event.snap_namespace, + event.snap_name, + on_op_complete); + } + + void execute(const journal::SnapUnprotectEvent &_) { + image_ctx.operations->execute_snap_unprotect(event.snap_namespace, + event.snap_name, + on_op_complete); + } + + void execute(const journal::SnapRollbackEvent &_) { + image_ctx.operations->execute_snap_rollback(event.snap_namespace, + event.snap_name, + no_op_progress_callback, + on_op_complete); + } + + void execute(const journal::RenameEvent &_) { + image_ctx.operations->execute_rename(event.image_name, + on_op_complete); + } + + void execute(const journal::ResizeEvent &_) { + image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback, + on_op_complete, event.op_tid); + } + + void execute(const journal::FlattenEvent &_) { + image_ctx.operations->execute_flatten(no_op_progress_callback, + on_op_complete); + } + + void execute(const journal::SnapLimitEvent &_) { + image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete); + } + + void execute(const journal::UpdateFeaturesEvent &_) { + image_ctx.operations->execute_update_features(event.features, event.enabled, + on_op_complete, event.op_tid); + } + + void execute(const journal::MetadataSetEvent &_) { + image_ctx.operations->execute_metadata_set(event.key, event.value, + on_op_complete); + } + + void execute(const journal::MetadataRemoveEvent &_) { + image_ctx.operations->execute_metadata_remove(event.key, on_op_complete); + } + + void finish(int r) override { + CephContext *cct = image_ctx.cct; + if (r < 0) { + lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl; + on_op_complete->complete(r); + return; + } + + ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl; + std::shared_lock owner_locker{image_ctx.owner_lock}; + + if (image_ctx.exclusive_lock == nullptr || + !image_ctx.exclusive_lock->accept_ops()) { + ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl; + on_op_complete->complete(-ECANCELED); + return; + } + + execute(event); + } +}; + +template <typename I> +struct C_RefreshIfRequired : public Context { + I &image_ctx; + Context *on_finish; + + C_RefreshIfRequired(I &image_ctx, Context *on_finish) + : image_ctx(image_ctx), on_finish(on_finish) { + } + ~C_RefreshIfRequired() override { + delete on_finish; + } + + void finish(int r) override { + CephContext *cct = image_ctx.cct; + Context *ctx = on_finish; + on_finish = nullptr; + + if (r < 0) { + lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl; + image_ctx.op_work_queue->queue(ctx, r); + return; + } + + if (image_ctx.state->is_refresh_required()) { + ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": " + << "refresh required" << dendl; + image_ctx.state->refresh(ctx); + return; + } + + image_ctx.op_work_queue->queue(ctx, 0); + } +}; + +} // anonymous namespace + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \ + << __func__ + +template <typename I> +Replay<I>::Replay(I &image_ctx) + : m_image_ctx(image_ctx) { +} + +template <typename I> +Replay<I>::~Replay() { + std::lock_guard locker{m_lock}; + ceph_assert(m_in_flight_aio_flush == 0); + ceph_assert(m_in_flight_aio_modify == 0); + ceph_assert(m_aio_modify_unsafe_contexts.empty()); + ceph_assert(m_aio_modify_safe_contexts.empty()); + ceph_assert(m_op_events.empty()); + ceph_assert(m_in_flight_op_events == 0); +} + +template <typename I> +int Replay<I>::decode(bufferlist::const_iterator *it, EventEntry *event_entry) { + try { + using ceph::decode; + decode(*event_entry, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +template <typename I> +void Replay<I>::process(const EventEntry &event_entry, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe + << dendl; + + on_ready = util::create_async_context_callback(m_image_ctx, on_ready); + + std::shared_lock owner_lock{m_image_ctx.owner_lock}; + if (m_image_ctx.exclusive_lock == nullptr || + !m_image_ctx.exclusive_lock->accept_ops()) { + ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl; + m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED); + on_ready->complete(0); + return; + } + + boost::apply_visitor(EventVisitor(this, on_ready, on_safe), + event_entry.event); +} + +template <typename I> +void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << dendl; + + io::AioCompletion *flush_comp = nullptr; + on_finish = util::create_async_context_callback( + m_image_ctx, on_finish); + + { + std::lock_guard locker{m_lock}; + + // safely commit any remaining AIO modify operations + if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) { + flush_comp = create_aio_flush_completion(nullptr); + ceph_assert(flush_comp != nullptr); + } + + for (auto &op_event_pair : m_op_events) { + OpEvent &op_event = op_event_pair.second; + if (cancel_ops) { + // cancel ops that are waiting to start (waiting for + // OpFinishEvent or waiting for ready) + if (op_event.on_start_ready == nullptr && + op_event.on_op_finish_event != nullptr) { + Context *on_op_finish_event = nullptr; + std::swap(on_op_finish_event, op_event.on_op_finish_event); + m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART); + } + } else if (op_event.on_op_finish_event != nullptr) { + // start ops waiting for OpFinishEvent + Context *on_op_finish_event = nullptr; + std::swap(on_op_finish_event, op_event.on_op_finish_event); + m_image_ctx.op_work_queue->queue(on_op_finish_event, 0); + } else if (op_event.on_start_ready != nullptr) { + // waiting for op ready + op_event_pair.second.finish_on_ready = true; + } + } + + ceph_assert(!m_shut_down); + m_shut_down = true; + + ceph_assert(m_flush_ctx == nullptr); + if (m_in_flight_op_events > 0 || flush_comp != nullptr) { + std::swap(m_flush_ctx, on_finish); + } + } + + // execute the following outside of lock scope + if (flush_comp != nullptr) { + std::shared_lock owner_locker{m_image_ctx.owner_lock}; + io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } + if (on_finish != nullptr) { + on_finish->complete(0); + } +} + +template <typename I> +void Replay<I>::flush(Context *on_finish) { + io::AioCompletion *aio_comp; + { + std::lock_guard locker{m_lock}; + aio_comp = create_aio_flush_completion( + util::create_async_context_callback(m_image_ctx, on_finish)); + if (aio_comp == nullptr) { + return; + } + } + + std::shared_lock owner_locker{m_image_ctx.owner_lock}; + io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, + io::FLUSH_SOURCE_INTERNAL, {}); +} + +template <typename I> +void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": op_tid=" << op_tid << dendl; + + std::lock_guard locker{m_lock}; + auto op_it = m_op_events.find(op_tid); + ceph_assert(op_it != m_op_events.end()); + + OpEvent &op_event = op_it->second; + ceph_assert(op_event.op_in_progress && + op_event.on_op_finish_event == nullptr && + op_event.on_finish_ready == nullptr && + op_event.on_finish_safe == nullptr); + + // resume processing replay events + Context *on_start_ready = nullptr; + std::swap(on_start_ready, op_event.on_start_ready); + on_start_ready->complete(0); + + // cancel has been requested -- send error to paused state machine + if (!op_event.finish_on_ready && m_flush_ctx != nullptr) { + m_image_ctx.op_work_queue->queue(on_resume, -ERESTART); + return; + } + + // resume the op state machine once the associated OpFinishEvent + // is processed + op_event.on_op_finish_event = new LambdaContext( + [on_resume](int r) { + on_resume->complete(r); + }); + + // shut down request -- don't expect OpFinishEvent + if (op_event.finish_on_ready) { + m_image_ctx.op_work_queue->queue(on_resume, 0); + } +} + +template <typename I> +void Replay<I>::handle_event(const journal::AioDiscardEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": AIO discard event" << dendl; + + bool flush_required; + auto aio_comp = create_aio_modify_completion(on_ready, on_safe, + io::AIO_TYPE_DISCARD, + &flush_required, + {}); + if (aio_comp == nullptr) { + return; + } + + if (!clipped_io(event.offset, aio_comp)) { + io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, + {{event.offset, event.length}}, + event.discard_granularity_bytes, + m_image_ctx.get_data_io_context(), {}); + } + + if (flush_required) { + m_lock.lock(); + auto flush_comp = create_aio_flush_completion(nullptr); + m_lock.unlock(); + + if (flush_comp != nullptr) { + io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } + } +} + +template <typename I> +void Replay<I>::handle_event(const journal::AioWriteEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": AIO write event" << dendl; + + bufferlist data = event.data; + bool flush_required; + auto aio_comp = create_aio_modify_completion(on_ready, on_safe, + io::AIO_TYPE_WRITE, + &flush_required, + {}); + if (aio_comp == nullptr) { + return; + } + + if (!clipped_io(event.offset, aio_comp)) { + io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp, + {{event.offset, event.length}}, + std::move(data), + m_image_ctx.get_data_io_context(), 0, {}); + } + + if (flush_required) { + m_lock.lock(); + auto flush_comp = create_aio_flush_completion(nullptr); + m_lock.unlock(); + + if (flush_comp != nullptr) { + io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } + } +} + +template <typename I> +void Replay<I>::handle_event(const journal::AioFlushEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": AIO flush event" << dendl; + + io::AioCompletion *aio_comp; + { + std::lock_guard locker{m_lock}; + aio_comp = create_aio_flush_completion(on_safe); + } + + if (aio_comp != nullptr) { + io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::AioWriteSameEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": AIO writesame event" << dendl; + + bufferlist data = event.data; + bool flush_required; + auto aio_comp = create_aio_modify_completion(on_ready, on_safe, + io::AIO_TYPE_WRITESAME, + &flush_required, + {}); + if (aio_comp == nullptr) { + return; + } + + if (!clipped_io(event.offset, aio_comp)) { + io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, + {{event.offset, event.length}}, + std::move(data), + m_image_ctx.get_data_io_context(), 0, + {}); + } + + if (flush_required) { + m_lock.lock(); + auto flush_comp = create_aio_flush_completion(nullptr); + m_lock.unlock(); + + if (flush_comp != nullptr) { + io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } + } +} + + template <typename I> + void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl; + + bufferlist cmp_data = event.cmp_data; + bufferlist write_data = event.write_data; + bool flush_required; + auto aio_comp = create_aio_modify_completion(on_ready, on_safe, + io::AIO_TYPE_COMPARE_AND_WRITE, + &flush_required, + {-EILSEQ}); + + if (!clipped_io(event.offset, aio_comp)) { + io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp, + {{event.offset, event.length}}, + std::move(cmp_data), + std::move(write_data), + nullptr, + m_image_ctx.get_data_io_context(), + 0, {}); + } + + if (flush_required) { + m_lock.lock(); + auto flush_comp = create_aio_flush_completion(nullptr); + m_lock.unlock(); + + io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, + io::FLUSH_SOURCE_INTERNAL, {}); + } +} + +template <typename I> +void Replay<I>::handle_event(const journal::OpFinishEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Op finish event: " + << "op_tid=" << event.op_tid << dendl; + + bool op_in_progress; + bool filter_ret_val; + Context *on_op_complete = nullptr; + Context *on_op_finish_event = nullptr; + { + std::lock_guard locker{m_lock}; + auto op_it = m_op_events.find(event.op_tid); + if (op_it == m_op_events.end()) { + ldout(cct, 10) << ": unable to locate associated op: assuming previously " + << "committed." << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, 0); + return; + } + + OpEvent &op_event = op_it->second; + ceph_assert(op_event.on_finish_safe == nullptr); + op_event.on_finish_ready = on_ready; + op_event.on_finish_safe = on_safe; + op_in_progress = op_event.op_in_progress; + std::swap(on_op_complete, op_event.on_op_complete); + std::swap(on_op_finish_event, op_event.on_op_finish_event); + + // special errors which indicate op never started but was recorded + // as failed in the journal + filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0); + } + + if (event.r < 0) { + if (op_in_progress) { + // bubble the error up to the in-progress op to cancel it + on_op_finish_event->complete(event.r); + } else { + // op hasn't been started -- bubble the error up since + // our image is now potentially in an inconsistent state + // since simple errors should have been caught before + // creating the op event + delete on_op_complete; + delete on_op_finish_event; + handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r); + } + return; + } + + // journal recorded success -- apply the op now + on_op_finish_event->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapCreateEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap create event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EEXIST}; + + // avoid lock cycles + m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event, + on_op_complete)), + 0); + + // do not process more events until the state machine is ready + // since it will affect IO + op_event->op_in_progress = true; + op_event->on_start_ready = on_ready; +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapRemoveEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap remove event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event, + on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-ENOENT}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapRenameEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap rename event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event, + on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EEXIST}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapProtectEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap protect event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event, + on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EBUSY}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap unprotect event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx, + event, + on_op_complete)); + + // ignore errors recorded in the journal + op_event->op_finish_error_codes = {-EBUSY}; + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EINVAL}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapRollbackEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap rollback start event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx, + event, + on_op_complete)); + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::RenameEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Rename event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event, + on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EEXIST}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::ResizeEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Resize start event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + // avoid lock cycles + m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event, + on_op_complete)), 0); + + // do not process more events until the state machine is ready + // since it will affect IO + op_event->op_in_progress = true; + op_event->on_start_ready = on_ready; +} + +template <typename I> +void Replay<I>::handle_event(const journal::FlattenEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Flatten start event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event, + on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-EINVAL}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::DemotePromoteEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Demote/Promote event" << dendl; + on_ready->complete(0); + on_safe->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::SnapLimitEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Snap limit event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + op_event->on_op_finish_event = new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx, + event, + on_op_complete)); + + op_event->ignore_error_codes = {-ERANGE}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Update features event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + // avoid lock cycles + m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>( + m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>( + m_image_ctx, event, on_op_complete)), 0); + + // do not process more events until the state machine is ready + // since it will affect IO + op_event->op_in_progress = true; + op_event->on_start_ready = on_ready; +} + +template <typename I> +void Replay<I>::handle_event(const journal::MetadataSetEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Metadata set event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete); + op_event->on_op_finish_event = util::create_async_context_callback( + m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>( + m_image_ctx, event, on_op_complete)); + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": Metadata remove event" << dendl; + + std::lock_guard locker{m_lock}; + OpEvent *op_event; + Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready, + on_safe, &op_event); + if (on_op_complete == nullptr) { + return; + } + + on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete); + op_event->on_op_finish_event = util::create_async_context_callback( + m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>( + m_image_ctx, event, on_op_complete)); + + // ignore errors caused due to replay + op_event->ignore_error_codes = {-ENOENT}; + + on_ready->complete(0); +} + +template <typename I> +void Replay<I>::handle_event(const journal::UnknownEvent &event, + Context *on_ready, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": unknown event" << dendl; + on_ready->complete(0); + on_safe->complete(0); +} + +template <typename I> +void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe, + int r, std::set<int> &filters) { + std::lock_guard locker{m_lock}; + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": on_ready=" << on_ready << ", " + << "on_safe=" << on_safe << ", r=" << r << dendl; + + if (on_ready != nullptr) { + on_ready->complete(0); + } + + if (filters.find(r) != filters.end()) + r = 0; + + if (r < 0) { + lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl; + m_image_ctx.op_work_queue->queue(on_safe, r); + return; + } + + // will be completed after next flush operation completes + m_aio_modify_safe_contexts.insert(on_safe); +} + +template <typename I> +void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe, + Contexts &on_safe_ctxs, int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": r=" << r << dendl; + + if (r < 0) { + lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl; + } + + Context *on_aio_ready = nullptr; + Context *on_flush = nullptr; + { + std::lock_guard locker{m_lock}; + ceph_assert(m_in_flight_aio_flush > 0); + ceph_assert(m_in_flight_aio_modify >= on_safe_ctxs.size()); + --m_in_flight_aio_flush; + m_in_flight_aio_modify -= on_safe_ctxs.size(); + + std::swap(on_aio_ready, m_on_aio_ready); + if (m_in_flight_op_events == 0 && + (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) { + on_flush = m_flush_ctx; + } + + // strip out previously failed on_safe contexts + for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) { + if (m_aio_modify_safe_contexts.erase(*it)) { + ++it; + } else { + it = on_safe_ctxs.erase(it); + } + } + } + + if (on_aio_ready != nullptr) { + ldout(cct, 10) << ": resuming paused AIO" << dendl; + on_aio_ready->complete(0); + } + + if (on_flush_safe != nullptr) { + on_safe_ctxs.push_back(on_flush_safe); + } + for (auto ctx : on_safe_ctxs) { + ldout(cct, 20) << ": completing safe context: " << ctx << dendl; + ctx->complete(r); + } + + if (on_flush != nullptr) { + ldout(cct, 20) << ": completing flush context: " << on_flush << dendl; + on_flush->complete(r); + } +} + +template <typename I> +Context *Replay<I>::create_op_context_callback(uint64_t op_tid, + Context *on_ready, + Context *on_safe, + OpEvent **op_event) { + CephContext *cct = m_image_ctx.cct; + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + return nullptr; + } + + ceph_assert(ceph_mutex_is_locked(m_lock)); + if (m_op_events.count(op_tid) != 0) { + lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl; + + // on_ready is already async but on failure invoke on_safe async + // as well + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, -EINVAL); + return nullptr; + } + + ++m_in_flight_op_events; + *op_event = &m_op_events[op_tid]; + (*op_event)->on_start_safe = on_safe; + + Context *on_op_complete = new C_OpOnComplete(this, op_tid); + (*op_event)->on_op_complete = on_op_complete; + return on_op_complete; +} + +template <typename I> +void Replay<I>::handle_op_complete(uint64_t op_tid, int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << ": op_tid=" << op_tid << ", " + << "r=" << r << dendl; + + OpEvent op_event; + bool shutting_down = false; + { + std::lock_guard locker{m_lock}; + auto op_it = m_op_events.find(op_tid); + ceph_assert(op_it != m_op_events.end()); + + op_event = std::move(op_it->second); + m_op_events.erase(op_it); + + if (m_shut_down) { + ceph_assert(m_flush_ctx != nullptr); + shutting_down = true; + } + } + + ceph_assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART)); + if (op_event.on_start_ready != nullptr) { + // blocking op event failed before it became ready + ceph_assert(op_event.on_finish_ready == nullptr && + op_event.on_finish_safe == nullptr); + + op_event.on_start_ready->complete(0); + } else { + // event kicked off by OpFinishEvent + ceph_assert((op_event.on_finish_ready != nullptr && + op_event.on_finish_safe != nullptr) || shutting_down); + } + + if (op_event.on_op_finish_event != nullptr) { + op_event.on_op_finish_event->complete(r); + } + + if (op_event.on_finish_ready != nullptr) { + op_event.on_finish_ready->complete(0); + } + + // filter out errors caused by replay of the same op + if (r < 0 && op_event.ignore_error_codes.count(r) != 0) { + r = 0; + } + + op_event.on_start_safe->complete(r); + if (op_event.on_finish_safe != nullptr) { + op_event.on_finish_safe->complete(r); + } + + // shut down request might have occurred while lock was + // dropped -- handle if pending + Context *on_flush = nullptr; + { + std::lock_guard locker{m_lock}; + ceph_assert(m_in_flight_op_events > 0); + --m_in_flight_op_events; + if (m_in_flight_op_events == 0 && + (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) { + on_flush = m_flush_ctx; + } + } + if (on_flush != nullptr) { + m_image_ctx.op_work_queue->queue(on_flush, 0); + } +} + +template <typename I> +io::AioCompletion * +Replay<I>::create_aio_modify_completion(Context *on_ready, + Context *on_safe, + io::aio_type_t aio_type, + bool *flush_required, + std::set<int> &&filters) { + std::lock_guard locker{m_lock}; + CephContext *cct = m_image_ctx.cct; + ceph_assert(m_on_aio_ready == nullptr); + + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + on_ready->complete(0); + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + return nullptr; + } + + ++m_in_flight_aio_modify; + m_aio_modify_unsafe_contexts.push_back(on_safe); + + // FLUSH if we hit the low-water mark -- on_safe contexts are + // completed by flushes-only so that we don't move the journal + // commit position until safely on-disk + + *flush_required = (m_aio_modify_unsafe_contexts.size() == + IN_FLIGHT_IO_LOW_WATER_MARK); + if (*flush_required) { + ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush" + << dendl; + } + + // READY for more events if: + // * not at high-water mark for IO + // * in-flight ops are at a consistent point (snap create has IO flushed, + // shrink has adjusted clip boundary, etc) -- should have already been + // flagged not-ready + if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) { + ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay" + << dendl; + ceph_assert(m_on_aio_ready == nullptr); + std::swap(m_on_aio_ready, on_ready); + } + + // when the modification is ACKed by librbd, we can process the next + // event. when flushed, the completion of the next flush will fire the + // on_safe callback + auto aio_comp = io::AioCompletion::create_and_start<Context>( + new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)), + util::get_image_ctx(&m_image_ctx), aio_type); + return aio_comp; +} + +template <typename I> +io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + CephContext *cct = m_image_ctx.cct; + if (m_shut_down) { + ldout(cct, 5) << ": ignoring event after shut down" << dendl; + if (on_safe != nullptr) { + m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN); + } + return nullptr; + } + + ++m_in_flight_aio_flush; + + // associate all prior write/discard ops to this flush request + auto aio_comp = io::AioCompletion::create_and_start<Context>( + new C_AioFlushComplete(this, on_safe, + std::move(m_aio_modify_unsafe_contexts)), + util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH); + m_aio_modify_unsafe_contexts.clear(); + return aio_comp; +} + +template <typename I> +bool Replay<I>::clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp) { + CephContext *cct = m_image_ctx.cct; + + m_image_ctx.image_lock.lock_shared(); + size_t image_size = m_image_ctx.size; + m_image_ctx.image_lock.unlock_shared(); + + if (image_offset >= image_size) { + // rbd-mirror image sync might race an IO event w/ associated resize between + // the point the peer is registered and the sync point is created, so no-op + // IO events beyond the current image extents since under normal conditions + // it wouldn't have been recorded in the journal + ldout(cct, 5) << ": no-op IO event beyond image size" << dendl; + aio_comp->get(); + aio_comp->set_request_count(0); + aio_comp->put(); + return true; + } + + return false; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::Replay<librbd::ImageCtx>; diff --git a/src/librbd/journal/Replay.h b/src/librbd/journal/Replay.h new file mode 100644 index 000000000..038601833 --- /dev/null +++ b/src/librbd/journal/Replay.h @@ -0,0 +1,205 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_REPLAY_H +#define CEPH_LIBRBD_JOURNAL_REPLAY_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "common/ceph_mutex.h" +#include "librbd/io/Types.h" +#include "librbd/journal/Types.h" +#include <boost/variant.hpp> +#include <list> +#include <unordered_set> +#include <unordered_map> + +namespace librbd { + +class ImageCtx; +namespace io { struct AioCompletion; } + +namespace journal { + +template <typename ImageCtxT = ImageCtx> +class Replay { +public: + static Replay *create(ImageCtxT &image_ctx) { + return new Replay(image_ctx); + } + + Replay(ImageCtxT &image_ctx); + ~Replay(); + + int decode(bufferlist::const_iterator *it, EventEntry *event_entry); + void process(const EventEntry &event_entry, + Context *on_ready, Context *on_safe); + + void shut_down(bool cancel_ops, Context *on_finish); + void flush(Context *on_finish); + + void replay_op_ready(uint64_t op_tid, Context *on_resume); + +private: + typedef std::unordered_set<int> ReturnValues; + + struct OpEvent { + bool op_in_progress = false; + bool finish_on_ready = false; + Context *on_op_finish_event = nullptr; + Context *on_start_ready = nullptr; + Context *on_start_safe = nullptr; + Context *on_finish_ready = nullptr; + Context *on_finish_safe = nullptr; + Context *on_op_complete = nullptr; + ReturnValues op_finish_error_codes; + ReturnValues ignore_error_codes; + }; + + typedef std::list<uint64_t> OpTids; + typedef std::list<Context *> Contexts; + typedef std::unordered_set<Context *> ContextSet; + typedef std::unordered_map<uint64_t, OpEvent> OpEvents; + + struct C_OpOnComplete : public Context { + Replay *replay; + uint64_t op_tid; + C_OpOnComplete(Replay *replay, uint64_t op_tid) + : replay(replay), op_tid(op_tid) { + } + void finish(int r) override { + replay->handle_op_complete(op_tid, r); + } + }; + + struct C_AioModifyComplete : public Context { + Replay *replay; + Context *on_ready; + Context *on_safe; + std::set<int> filters; + C_AioModifyComplete(Replay *replay, Context *on_ready, + Context *on_safe, std::set<int> &&filters) + : replay(replay), on_ready(on_ready), on_safe(on_safe), + filters(std::move(filters)) { + } + void finish(int r) override { + replay->handle_aio_modify_complete(on_ready, on_safe, r, filters); + } + }; + + struct C_AioFlushComplete : public Context { + Replay *replay; + Context *on_flush_safe; + Contexts on_safe_ctxs; + C_AioFlushComplete(Replay *replay, Context *on_flush_safe, + Contexts &&on_safe_ctxs) + : replay(replay), on_flush_safe(on_flush_safe), + on_safe_ctxs(on_safe_ctxs) { + } + void finish(int r) override { + replay->handle_aio_flush_complete(on_flush_safe, on_safe_ctxs, r); + } + }; + + struct EventVisitor : public boost::static_visitor<void> { + Replay *replay; + Context *on_ready; + Context *on_safe; + + EventVisitor(Replay *_replay, Context *_on_ready, Context *_on_safe) + : replay(_replay), on_ready(_on_ready), on_safe(_on_safe) { + } + + template <typename Event> + inline void operator()(const Event &event) const { + replay->handle_event(event, on_ready, on_safe); + } + }; + + ImageCtxT &m_image_ctx; + + ceph::mutex m_lock = ceph::make_mutex("Replay<I>::m_lock"); + + uint64_t m_in_flight_aio_flush = 0; + uint64_t m_in_flight_aio_modify = 0; + Contexts m_aio_modify_unsafe_contexts; + ContextSet m_aio_modify_safe_contexts; + + OpEvents m_op_events; + uint64_t m_in_flight_op_events = 0; + + bool m_shut_down = false; + Context *m_flush_ctx = nullptr; + Context *m_on_aio_ready = nullptr; + + void handle_event(const AioDiscardEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioWriteEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioWriteSameEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioCompareAndWriteEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const AioFlushEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const OpFinishEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapCreateEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRemoveEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRenameEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapProtectEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapUnprotectEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapRollbackEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const RenameEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const ResizeEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const FlattenEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const DemotePromoteEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const SnapLimitEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const UpdateFeaturesEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const MetadataSetEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const MetadataRemoveEvent &event, Context *on_ready, + Context *on_safe); + void handle_event(const UnknownEvent &event, Context *on_ready, + Context *on_safe); + + void handle_aio_modify_complete(Context *on_ready, Context *on_safe, + int r, std::set<int> &filters); + void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs, + int r); + + Context *create_op_context_callback(uint64_t op_tid, Context *on_ready, + Context *on_safe, OpEvent **op_event); + void handle_op_complete(uint64_t op_tid, int r); + + io::AioCompletion *create_aio_modify_completion(Context *on_ready, + Context *on_safe, + io::aio_type_t aio_type, + bool *flush_required, + std::set<int> &&filters); + io::AioCompletion *create_aio_flush_completion(Context *on_safe); + void handle_aio_completion(io::AioCompletion *aio_comp); + + bool clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::Replay<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_REPLAY_H diff --git a/src/librbd/journal/ResetRequest.cc b/src/librbd/journal/ResetRequest.cc new file mode 100644 index 000000000..895d0046e --- /dev/null +++ b/src/librbd/journal/ResetRequest.cc @@ -0,0 +1,162 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/ResetRequest.h" +#include "common/dout.h" +#include "common/errno.h" +#include "common/Timer.h" +#include "common/WorkQueue.h" +#include "journal/Journaler.h" +#include "journal/Settings.h" +#include "include/ceph_assert.h" +#include "librbd/Journal.h" +#include "librbd/Utils.h" +#include "librbd/journal/CreateRequest.h" +#include "librbd/journal/RemoveRequest.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::ResetRequest: " << this << " " \ + << __func__ << ": " + +namespace librbd { +namespace journal { + +using util::create_async_context_callback; +using util::create_context_callback; + +template<typename I> +void ResetRequest<I>::send() { + init_journaler(); +} + +template<typename I> +void ResetRequest<I>::init_journaler() { + ldout(m_cct, 10) << dendl; + + m_journaler = new Journaler(m_io_ctx, m_image_id, m_client_id, {}, nullptr); + Context *ctx = create_context_callback< + ResetRequest<I>, &ResetRequest<I>::handle_init_journaler>(this); + m_journaler->init(ctx); +} + +template<typename I> +void ResetRequest<I>::handle_init_journaler(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r == -ENOENT) { + ldout(m_cct, 5) << "journal does not exist" << dendl; + m_ret_val = r; + } else if (r < 0) { + lderr(m_cct) << "failed to init journaler: " << cpp_strerror(r) << dendl; + m_ret_val = r; + } else { + int64_t pool_id; + m_journaler->get_metadata(&m_order, &m_splay_width, &pool_id); + + if (pool_id != -1) { + librados::Rados rados(m_io_ctx); + r = rados.pool_reverse_lookup(pool_id, &m_object_pool_name); + if (r < 0) { + lderr(m_cct) << "failed to lookup data pool: " << cpp_strerror(r) + << dendl; + m_ret_val = r; + } + } + } + + shut_down_journaler(); +} + +template<typename I> +void ResetRequest<I>::shut_down_journaler() { + ldout(m_cct, 10) << dendl; + + Context *ctx = create_async_context_callback( + m_op_work_queue, create_context_callback< + ResetRequest<I>, &ResetRequest<I>::handle_journaler_shutdown>(this)); + m_journaler->shut_down(ctx); +} + +template<typename I> +void ResetRequest<I>::handle_journaler_shutdown(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + delete m_journaler; + if (r < 0) { + lderr(m_cct) << "failed to shut down journaler: " << cpp_strerror(r) + << dendl; + if (m_ret_val == 0) { + m_ret_val = r; + } + } + + if (m_ret_val < 0) { + finish(m_ret_val); + return; + } + + remove_journal(); +} + +template<typename I> +void ResetRequest<I>::remove_journal() { + ldout(m_cct, 10) << dendl; + + Context *ctx = create_context_callback< + ResetRequest<I>, &ResetRequest<I>::handle_remove_journal>(this); + auto req = RemoveRequest<I>::create(m_io_ctx, m_image_id, m_client_id, + m_op_work_queue, ctx); + req->send(); +} + +template<typename I> +void ResetRequest<I>::handle_remove_journal(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to remove journal: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + create_journal(); +} + +template<typename I> +void ResetRequest<I>::create_journal() { + ldout(m_cct, 10) << dendl; + + Context *ctx = create_context_callback< + ResetRequest<I>, &ResetRequest<I>::handle_create_journal>(this); + journal::TagData tag_data(m_mirror_uuid); + auto req = CreateRequest<I>::create(m_io_ctx, m_image_id, m_order, + m_splay_width, m_object_pool_name, + cls::journal::Tag::TAG_CLASS_NEW, + tag_data, m_client_id, m_op_work_queue, + ctx); + req->send(); +} + +template<typename I> +void ResetRequest<I>::handle_create_journal(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl; + } + finish(r); +} + +template<typename I> +void ResetRequest<I>::finish(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::ResetRequest<librbd::ImageCtx>; diff --git a/src/librbd/journal/ResetRequest.h b/src/librbd/journal/ResetRequest.h new file mode 100644 index 000000000..f9331f644 --- /dev/null +++ b/src/librbd/journal/ResetRequest.h @@ -0,0 +1,110 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_RESET_REQUEST_H +#define CEPH_LIBRBD_JOURNAL_RESET_REQUEST_H + +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/rados/librados.hpp" +#include "include/rbd/librbd.hpp" +#include "librbd/journal/TypeTraits.h" +#include "common/Timer.h" +#include <string> + +class Context; +class ContextWQ; + +namespace journal { class Journaler; } + +namespace librbd { + +class ImageCtx; + +namespace journal { + +template<typename ImageCtxT = ImageCtx> +class ResetRequest { +public: + static ResetRequest *create(librados::IoCtx &io_ctx, + const std::string &image_id, + const std::string &client_id, + const std::string &mirror_uuid, + ContextWQ *op_work_queue, Context *on_finish) { + return new ResetRequest(io_ctx, image_id, client_id, mirror_uuid, + op_work_queue, on_finish); + } + + ResetRequest(librados::IoCtx &io_ctx, const std::string &image_id, + const std::string &client_id, const std::string &mirror_uuid, + ContextWQ *op_work_queue, Context *on_finish) + : m_io_ctx(io_ctx), m_image_id(image_id), m_client_id(client_id), + m_mirror_uuid(mirror_uuid), m_op_work_queue(op_work_queue), + m_on_finish(on_finish), + m_cct(reinterpret_cast<CephContext *>(m_io_ctx.cct())) { + } + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * INIT_JOURNALER + * | + * v + * SHUT_DOWN_JOURNALER + * | + * v + * REMOVE_JOURNAL + * | + * v + * CREATE_JOURNAL + * | + * v + * <finish> + * + * @endverbatim + */ + typedef typename TypeTraits<ImageCtxT>::Journaler Journaler; + + librados::IoCtx &m_io_ctx; + std::string m_image_id; + std::string m_client_id; + std::string m_mirror_uuid; + ContextWQ *m_op_work_queue; + Context *m_on_finish; + + CephContext *m_cct; + Journaler *m_journaler = nullptr; + int m_ret_val = 0; + + uint8_t m_order = 0; + uint8_t m_splay_width = 0; + std::string m_object_pool_name; + + void init_journaler(); + void handle_init_journaler(int r); + + void shut_down_journaler(); + void handle_journaler_shutdown(int r); + + void remove_journal(); + void handle_remove_journal(int r); + + void create_journal(); + void handle_create_journal(int r); + + void finish(int r); + +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::ResetRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_REMOVE_REQUEST_H diff --git a/src/librbd/journal/StandardPolicy.cc b/src/librbd/journal/StandardPolicy.cc new file mode 100644 index 000000000..7f124aeef --- /dev/null +++ b/src/librbd/journal/StandardPolicy.cc @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/StandardPolicy.h" +#include "librbd/ImageCtx.h" +#include "librbd/Journal.h" +#include "librbd/asio/ContextWQ.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::StandardPolicy: " + +namespace librbd { +namespace journal { + +template<typename I> +void StandardPolicy<I>::allocate_tag_on_lock(Context *on_finish) { + ceph_assert(m_image_ctx->journal != nullptr); + + if (!m_image_ctx->journal->is_tag_owner()) { + lderr(m_image_ctx->cct) << "local image not promoted" << dendl; + m_image_ctx->op_work_queue->queue(on_finish, -EPERM); + return; + } + + m_image_ctx->journal->allocate_local_tag(on_finish); +} + +} // namespace journal +} // namespace librbd + +template class librbd::journal::StandardPolicy<librbd::ImageCtx>; diff --git a/src/librbd/journal/StandardPolicy.h b/src/librbd/journal/StandardPolicy.h new file mode 100644 index 000000000..ec8d0148f --- /dev/null +++ b/src/librbd/journal/StandardPolicy.h @@ -0,0 +1,38 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H +#define CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H + +#include "librbd/journal/Policy.h" + +namespace librbd { + +struct ImageCtx; + +namespace journal { + +template<typename ImageCtxT = ImageCtx> +class StandardPolicy : public Policy { +public: + StandardPolicy(ImageCtxT *image_ctx) : m_image_ctx(image_ctx) { + } + + bool append_disabled() const override { + return false; + } + bool journal_disabled() const override { + return false; + } + void allocate_tag_on_lock(Context *on_finish) override; + +private: + ImageCtxT *m_image_ctx; +}; + +} // namespace journal +} // namespace librbd + +extern template class librbd::journal::StandardPolicy<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_JOURNAL_STANDARD_POLICY_H diff --git a/src/librbd/journal/TypeTraits.h b/src/librbd/journal/TypeTraits.h new file mode 100644 index 000000000..51b025f6d --- /dev/null +++ b/src/librbd/journal/TypeTraits.h @@ -0,0 +1,29 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H +#define CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H + +struct ContextWQ; + +namespace journal { +class Future; +class Journaler; +class ReplayEntry; +} + +namespace librbd { +namespace journal { + +template <typename ImageCtxT> +struct TypeTraits { + typedef ::journal::Journaler Journaler; + typedef ::journal::Future Future; + typedef ::journal::ReplayEntry ReplayEntry; + typedef ::ContextWQ ContextWQ; +}; + +} // namespace journal +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_TYPE_TRAITS_H diff --git a/src/librbd/journal/Types.cc b/src/librbd/journal/Types.cc new file mode 100644 index 000000000..d76a15e55 --- /dev/null +++ b/src/librbd/journal/Types.cc @@ -0,0 +1,956 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/Types.h" +#include "include/ceph_assert.h" +#include "include/stringify.h" +#include "include/types.h" +#include "common/Formatter.h" + +namespace librbd { +namespace journal { + +using ceph::encode; +using ceph::decode; + +namespace { + +template <typename E> +class GetTypeVisitor : public boost::static_visitor<E> { +public: + template <typename T> + inline E operator()(const T&) const { + return T::TYPE; + } +}; + +class EncodeVisitor : public boost::static_visitor<void> { +public: + explicit EncodeVisitor(bufferlist &bl) : m_bl(bl) { + } + + template <typename T> + inline void operator()(const T& t) const { + encode(static_cast<uint32_t>(T::TYPE), m_bl); + t.encode(m_bl); + } +private: + bufferlist &m_bl; +}; + +class DecodeVisitor : public boost::static_visitor<void> { +public: + DecodeVisitor(__u8 version, bufferlist::const_iterator &iter) + : m_version(version), m_iter(iter) { + } + + template <typename T> + inline void operator()(T& t) const { + t.decode(m_version, m_iter); + } +private: + __u8 m_version; + bufferlist::const_iterator &m_iter; +}; + +class DumpVisitor : public boost::static_visitor<void> { +public: + explicit DumpVisitor(Formatter *formatter, const std::string &key) + : m_formatter(formatter), m_key(key) {} + + template <typename T> + inline void operator()(const T& t) const { + auto type = T::TYPE; + m_formatter->dump_string(m_key.c_str(), stringify(type)); + t.dump(m_formatter); + } +private: + ceph::Formatter *m_formatter; + std::string m_key; +}; + +} // anonymous namespace + +void AioDiscardEvent::encode(bufferlist& bl) const { + using ceph::encode; + encode(offset, bl); + encode(length, bl); + bool skip_partial_discard = (discard_granularity_bytes > 0); + encode(skip_partial_discard, bl); + encode(discard_granularity_bytes, bl); +} + +void AioDiscardEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(offset, it); + decode(length, it); + + bool skip_partial_discard = false; + if (version >= 4) { + decode(skip_partial_discard, it); + } + + if (version >= 5) { + decode(discard_granularity_bytes, it); + } else { + if (skip_partial_discard) { + // use a size larger than the maximum object size which will + // truncated down to object size during IO processing + discard_granularity_bytes = std::numeric_limits<uint32_t>::max(); + } else { + discard_granularity_bytes = 0; + } + } +} + +void AioDiscardEvent::dump(Formatter *f) const { + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); + f->dump_unsigned("discard_granularity_bytes", discard_granularity_bytes); +} + +uint32_t AioWriteEvent::get_fixed_size() { + return EventEntry::get_fixed_size() + 16 /* offset, length */; +} + +void AioWriteEvent::encode(bufferlist& bl) const { + using ceph::encode; + encode(offset, bl); + encode(length, bl); + encode(data, bl); +} + +void AioWriteEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(offset, it); + decode(length, it); + decode(data, it); +} + +void AioWriteEvent::dump(Formatter *f) const { + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); +} + +void AioWriteSameEvent::encode(bufferlist& bl) const { + using ceph::encode; + encode(offset, bl); + encode(length, bl); + encode(data, bl); +} + +void AioWriteSameEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(offset, it); + decode(length, it); + decode(data, it); +} + +void AioWriteSameEvent::dump(Formatter *f) const { + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); +} + +uint32_t AioCompareAndWriteEvent::get_fixed_size() { + return EventEntry::get_fixed_size() + 32 /* offset, length */; +} + +void AioCompareAndWriteEvent::encode(bufferlist& bl) const { + using ceph::encode; + encode(offset, bl); + encode(length, bl); + encode(cmp_data, bl); + encode(write_data, bl); +} + +void AioCompareAndWriteEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(offset, it); + decode(length, it); + decode(cmp_data, it); + decode(write_data, it); +} + +void AioCompareAndWriteEvent::dump(Formatter *f) const { + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); +} + +void AioFlushEvent::encode(bufferlist& bl) const { +} + +void AioFlushEvent::decode(__u8 version, bufferlist::const_iterator& it) { +} + +void AioFlushEvent::dump(Formatter *f) const { +} + +void OpEventBase::encode(bufferlist& bl) const { + using ceph::encode; + encode(op_tid, bl); +} + +void OpEventBase::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(op_tid, it); +} + +void OpEventBase::dump(Formatter *f) const { + f->dump_unsigned("op_tid", op_tid); +} + +void OpFinishEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(op_tid, bl); + encode(r, bl); +} + +void OpFinishEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(op_tid, it); + decode(r, it); +} + +void OpFinishEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_unsigned("op_tid", op_tid); + f->dump_int("result", r); +} + +void SnapEventBase::encode(bufferlist& bl) const { + using ceph::encode; + OpEventBase::encode(bl); + encode(snap_name, bl); + encode(snap_namespace, bl); +} + +void SnapEventBase::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + OpEventBase::decode(version, it); + using ceph::decode; + decode(snap_name, it); + if (version >= 4) { + decode(snap_namespace, it); + } +} + +void SnapEventBase::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_string("snap_name", snap_name); + snap_namespace.dump(f); +} + +void SnapCreateEvent::encode(bufferlist &bl) const { + SnapEventBase::encode(bl); +} + +void SnapCreateEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + SnapEventBase::decode(version, it); + if (version == 3) { + decode(snap_namespace, it); + } +} + +void SnapCreateEvent::dump(Formatter *f) const { + SnapEventBase::dump(f); +} + +void SnapLimitEvent::encode(bufferlist &bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(limit, bl); +} + +void SnapLimitEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(limit, it); +} + +void SnapLimitEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_unsigned("limit", limit); +} + +void SnapRenameEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(dst_snap_name, bl); + encode(snap_id, bl); + encode(src_snap_name, bl); +} + +void SnapRenameEvent::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + OpEventBase::decode(version, it); + decode(dst_snap_name, it); + decode(snap_id, it); + if (version >= 2) { + decode(src_snap_name, it); + } +} + +void SnapRenameEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_unsigned("src_snap_id", snap_id); + f->dump_string("src_snap_name", src_snap_name); + f->dump_string("dest_snap_name", dst_snap_name); +} + +void RenameEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(image_name, bl); +} + +void RenameEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(image_name, it); +} + +void RenameEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_string("image_name", image_name); +} + +void ResizeEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(size, bl); +} + +void ResizeEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(size, it); +} + +void ResizeEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_unsigned("size", size); +} + +void DemotePromoteEvent::encode(bufferlist& bl) const { +} + +void DemotePromoteEvent::decode(__u8 version, bufferlist::const_iterator& it) { +} + +void DemotePromoteEvent::dump(Formatter *f) const { +} + +void UpdateFeaturesEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(features, bl); + encode(enabled, bl); +} + +void UpdateFeaturesEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(features, it); + decode(enabled, it); +} + +void UpdateFeaturesEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_unsigned("features", features); + f->dump_bool("enabled", enabled); +} + +void MetadataSetEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(key, bl); + encode(value, bl); +} + +void MetadataSetEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(key, it); + decode(value, it); +} + +void MetadataSetEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_string("key", key); + f->dump_string("value", value); +} + +void MetadataRemoveEvent::encode(bufferlist& bl) const { + OpEventBase::encode(bl); + using ceph::encode; + encode(key, bl); +} + +void MetadataRemoveEvent::decode(__u8 version, bufferlist::const_iterator& it) { + OpEventBase::decode(version, it); + using ceph::decode; + decode(key, it); +} + +void MetadataRemoveEvent::dump(Formatter *f) const { + OpEventBase::dump(f); + f->dump_string("key", key); +} + +void UnknownEvent::encode(bufferlist& bl) const { + ceph_abort(); +} + +void UnknownEvent::decode(__u8 version, bufferlist::const_iterator& it) { +} + +void UnknownEvent::dump(Formatter *f) const { +} + +EventType EventEntry::get_event_type() const { + return boost::apply_visitor(GetTypeVisitor<EventType>(), event); +} + +void EventEntry::encode(bufferlist& bl) const { + ENCODE_START(5, 1, bl); + boost::apply_visitor(EncodeVisitor(bl), event); + ENCODE_FINISH(bl); + encode_metadata(bl); +} + +void EventEntry::decode(bufferlist::const_iterator& it) { + DECODE_START(1, it); + + uint32_t event_type; + decode(event_type, it); + + // select the correct payload variant based upon the encoded op + switch (event_type) { + case EVENT_TYPE_AIO_DISCARD: + event = AioDiscardEvent(); + break; + case EVENT_TYPE_AIO_WRITE: + event = AioWriteEvent(); + break; + case EVENT_TYPE_AIO_FLUSH: + event = AioFlushEvent(); + break; + case EVENT_TYPE_OP_FINISH: + event = OpFinishEvent(); + break; + case EVENT_TYPE_SNAP_CREATE: + event = SnapCreateEvent(); + break; + case EVENT_TYPE_SNAP_REMOVE: + event = SnapRemoveEvent(); + break; + case EVENT_TYPE_SNAP_RENAME: + event = SnapRenameEvent(); + break; + case EVENT_TYPE_SNAP_PROTECT: + event = SnapProtectEvent(); + break; + case EVENT_TYPE_SNAP_UNPROTECT: + event = SnapUnprotectEvent(); + break; + case EVENT_TYPE_SNAP_ROLLBACK: + event = SnapRollbackEvent(); + break; + case EVENT_TYPE_RENAME: + event = RenameEvent(); + break; + case EVENT_TYPE_RESIZE: + event = ResizeEvent(); + break; + case EVENT_TYPE_FLATTEN: + event = FlattenEvent(); + break; + case EVENT_TYPE_DEMOTE_PROMOTE: + event = DemotePromoteEvent(); + break; + case EVENT_TYPE_SNAP_LIMIT: + event = SnapLimitEvent(); + break; + case EVENT_TYPE_UPDATE_FEATURES: + event = UpdateFeaturesEvent(); + break; + case EVENT_TYPE_METADATA_SET: + event = MetadataSetEvent(); + break; + case EVENT_TYPE_METADATA_REMOVE: + event = MetadataRemoveEvent(); + break; + case EVENT_TYPE_AIO_WRITESAME: + event = AioWriteSameEvent(); + break; + case EVENT_TYPE_AIO_COMPARE_AND_WRITE: + event = AioCompareAndWriteEvent(); + break; + default: + event = UnknownEvent(); + break; + } + + boost::apply_visitor(DecodeVisitor(struct_v, it), event); + DECODE_FINISH(it); + if (struct_v >= 4) { + decode_metadata(it); + } +} + +void EventEntry::dump(Formatter *f) const { + boost::apply_visitor(DumpVisitor(f, "event_type"), event); + f->dump_stream("timestamp") << timestamp; +} + +void EventEntry::encode_metadata(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(timestamp, bl); + ENCODE_FINISH(bl); +} + +void EventEntry::decode_metadata(bufferlist::const_iterator& it) { + DECODE_START(1, it); + decode(timestamp, it); + DECODE_FINISH(it); +} + +void EventEntry::generate_test_instances(std::list<EventEntry *> &o) { + o.push_back(new EventEntry(AioDiscardEvent())); + o.push_back(new EventEntry(AioDiscardEvent(123, 345, 4096), utime_t(1, 1))); + + bufferlist bl; + bl.append(std::string(32, '1')); + o.push_back(new EventEntry(AioWriteEvent())); + o.push_back(new EventEntry(AioWriteEvent(123, 456, bl), utime_t(1, 1))); + + o.push_back(new EventEntry(AioFlushEvent())); + + o.push_back(new EventEntry(OpFinishEvent(123, -1), utime_t(1, 1))); + + o.push_back(new EventEntry(SnapCreateEvent(), utime_t(1, 1))); + o.push_back(new EventEntry(SnapCreateEvent(234, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1))); + + o.push_back(new EventEntry(SnapRemoveEvent())); + o.push_back(new EventEntry(SnapRemoveEvent(345, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1))); + + o.push_back(new EventEntry(SnapRenameEvent())); + o.push_back(new EventEntry(SnapRenameEvent(456, 1, "src snap", "dest snap"), + utime_t(1, 1))); + + o.push_back(new EventEntry(SnapProtectEvent())); + o.push_back(new EventEntry(SnapProtectEvent(567, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1))); + + o.push_back(new EventEntry(SnapUnprotectEvent())); + o.push_back(new EventEntry(SnapUnprotectEvent(678, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1))); + + o.push_back(new EventEntry(SnapRollbackEvent())); + o.push_back(new EventEntry(SnapRollbackEvent(789, cls::rbd::UserSnapshotNamespace(), "snap"), utime_t(1, 1))); + + o.push_back(new EventEntry(RenameEvent())); + o.push_back(new EventEntry(RenameEvent(890, "image name"), utime_t(1, 1))); + + o.push_back(new EventEntry(ResizeEvent())); + o.push_back(new EventEntry(ResizeEvent(901, 1234), utime_t(1, 1))); + + o.push_back(new EventEntry(FlattenEvent(123), utime_t(1, 1))); + + o.push_back(new EventEntry(DemotePromoteEvent())); + + o.push_back(new EventEntry(UpdateFeaturesEvent())); + o.push_back(new EventEntry(UpdateFeaturesEvent(123, 127, true), utime_t(1, 1))); + + o.push_back(new EventEntry(MetadataSetEvent())); + o.push_back(new EventEntry(MetadataSetEvent(123, "key", "value"), utime_t(1, 1))); + + o.push_back(new EventEntry(MetadataRemoveEvent())); + o.push_back(new EventEntry(MetadataRemoveEvent(123, "key"), utime_t(1, 1))); +} + +// Journal Client + +void ImageClientMeta::encode(bufferlist& bl) const { + using ceph::encode; + encode(tag_class, bl); + encode(resync_requested, bl); +} + +void ImageClientMeta::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(tag_class, it); + decode(resync_requested, it); +} + +void ImageClientMeta::dump(Formatter *f) const { + f->dump_unsigned("tag_class", tag_class); + f->dump_bool("resync_requested", resync_requested); +} + +void MirrorPeerSyncPoint::encode(bufferlist& bl) const { + using ceph::encode; + encode(snap_name, bl); + encode(from_snap_name, bl); + encode(object_number, bl); + encode(snap_namespace, bl); +} + +void MirrorPeerSyncPoint::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(snap_name, it); + decode(from_snap_name, it); + decode(object_number, it); + if (version >= 2) { + decode(snap_namespace, it); + } +} + +void MirrorPeerSyncPoint::dump(Formatter *f) const { + f->dump_string("snap_name", snap_name); + f->dump_string("from_snap_name", from_snap_name); + if (object_number) { + f->dump_unsigned("object_number", *object_number); + } + snap_namespace.dump(f); +} + +void MirrorPeerClientMeta::encode(bufferlist& bl) const { + using ceph::encode; + encode(image_id, bl); + encode(static_cast<uint32_t>(state), bl); + encode(sync_object_count, bl); + encode(static_cast<uint32_t>(sync_points.size()), bl); + for (auto &sync_point : sync_points) { + sync_point.encode(bl); + } + encode(snap_seqs, bl); +} + +void MirrorPeerClientMeta::decode(__u8 version, bufferlist::const_iterator& it) { + using ceph::decode; + decode(image_id, it); + + uint32_t decode_state; + decode(decode_state, it); + state = static_cast<MirrorPeerState>(decode_state); + + decode(sync_object_count, it); + + uint32_t sync_point_count; + decode(sync_point_count, it); + sync_points.resize(sync_point_count); + for (auto &sync_point : sync_points) { + sync_point.decode(version, it); + } + + decode(snap_seqs, it); +} + +void MirrorPeerClientMeta::dump(Formatter *f) const { + f->dump_string("image_id", image_id); + f->dump_stream("state") << state; + f->dump_unsigned("sync_object_count", sync_object_count); + f->open_array_section("sync_points"); + for (auto &sync_point : sync_points) { + f->open_object_section("sync_point"); + sync_point.dump(f); + f->close_section(); + } + f->close_section(); + f->open_array_section("snap_seqs"); + for (auto &pair : snap_seqs) { + f->open_object_section("snap_seq"); + f->dump_unsigned("local_snap_seq", pair.first); + f->dump_unsigned("peer_snap_seq", pair.second); + f->close_section(); + } + f->close_section(); +} + +void CliClientMeta::encode(bufferlist& bl) const { +} + +void CliClientMeta::decode(__u8 version, bufferlist::const_iterator& it) { +} + +void CliClientMeta::dump(Formatter *f) const { +} + +void UnknownClientMeta::encode(bufferlist& bl) const { + ceph_abort(); +} + +void UnknownClientMeta::decode(__u8 version, bufferlist::const_iterator& it) { +} + +void UnknownClientMeta::dump(Formatter *f) const { +} + +ClientMetaType ClientData::get_client_meta_type() const { + return boost::apply_visitor(GetTypeVisitor<ClientMetaType>(), client_meta); +} + +void ClientData::encode(bufferlist& bl) const { + ENCODE_START(2, 1, bl); + boost::apply_visitor(EncodeVisitor(bl), client_meta); + ENCODE_FINISH(bl); +} + +void ClientData::decode(bufferlist::const_iterator& it) { + DECODE_START(1, it); + + uint32_t client_meta_type; + decode(client_meta_type, it); + + // select the correct payload variant based upon the encoded op + switch (client_meta_type) { + case IMAGE_CLIENT_META_TYPE: + client_meta = ImageClientMeta(); + break; + case MIRROR_PEER_CLIENT_META_TYPE: + client_meta = MirrorPeerClientMeta(); + break; + case CLI_CLIENT_META_TYPE: + client_meta = CliClientMeta(); + break; + default: + client_meta = UnknownClientMeta(); + break; + } + + boost::apply_visitor(DecodeVisitor(struct_v, it), client_meta); + DECODE_FINISH(it); +} + +void ClientData::dump(Formatter *f) const { + boost::apply_visitor(DumpVisitor(f, "client_meta_type"), client_meta); +} + +void ClientData::generate_test_instances(std::list<ClientData *> &o) { + o.push_back(new ClientData(ImageClientMeta())); + o.push_back(new ClientData(ImageClientMeta(123))); + o.push_back(new ClientData(MirrorPeerClientMeta())); + o.push_back(new ClientData(MirrorPeerClientMeta("image_id", + {{{}, "snap 2", "snap 1", 123}}, + {{1, 2}, {3, 4}}))); + o.push_back(new ClientData(CliClientMeta())); +} + +// Journal Tag + +void TagPredecessor::encode(bufferlist& bl) const { + using ceph::encode; + encode(mirror_uuid, bl); + encode(commit_valid, bl); + encode(tag_tid, bl); + encode(entry_tid, bl); +} + +void TagPredecessor::decode(bufferlist::const_iterator& it) { + using ceph::decode; + decode(mirror_uuid, it); + decode(commit_valid, it); + decode(tag_tid, it); + decode(entry_tid, it); +} + +void TagPredecessor::dump(Formatter *f) const { + f->dump_string("mirror_uuid", mirror_uuid); + f->dump_string("commit_valid", commit_valid ? "true" : "false"); + f->dump_unsigned("tag_tid", tag_tid); + f->dump_unsigned("entry_tid", entry_tid); +} + +void TagData::encode(bufferlist& bl) const { + using ceph::encode; + encode(mirror_uuid, bl); + predecessor.encode(bl); +} + +void TagData::decode(bufferlist::const_iterator& it) { + using ceph::decode; + decode(mirror_uuid, it); + predecessor.decode(it); +} + +void TagData::dump(Formatter *f) const { + f->dump_string("mirror_uuid", mirror_uuid); + f->open_object_section("predecessor"); + predecessor.dump(f); + f->close_section(); +} + +void TagData::generate_test_instances(std::list<TagData *> &o) { + o.push_back(new TagData()); + o.push_back(new TagData("mirror-uuid")); + o.push_back(new TagData("mirror-uuid", "remote-mirror-uuid", true, 123, 234)); +} + +std::ostream &operator<<(std::ostream &out, const EventType &type) { + using namespace librbd::journal; + + switch (type) { + case EVENT_TYPE_AIO_DISCARD: + out << "AioDiscard"; + break; + case EVENT_TYPE_AIO_WRITE: + out << "AioWrite"; + break; + case EVENT_TYPE_AIO_FLUSH: + out << "AioFlush"; + break; + case EVENT_TYPE_OP_FINISH: + out << "OpFinish"; + break; + case EVENT_TYPE_SNAP_CREATE: + out << "SnapCreate"; + break; + case EVENT_TYPE_SNAP_REMOVE: + out << "SnapRemove"; + break; + case EVENT_TYPE_SNAP_RENAME: + out << "SnapRename"; + break; + case EVENT_TYPE_SNAP_PROTECT: + out << "SnapProtect"; + break; + case EVENT_TYPE_SNAP_UNPROTECT: + out << "SnapUnprotect"; + break; + case EVENT_TYPE_SNAP_ROLLBACK: + out << "SnapRollback"; + break; + case EVENT_TYPE_RENAME: + out << "Rename"; + break; + case EVENT_TYPE_RESIZE: + out << "Resize"; + break; + case EVENT_TYPE_FLATTEN: + out << "Flatten"; + break; + case EVENT_TYPE_DEMOTE_PROMOTE: + out << "Demote/Promote"; + break; + case EVENT_TYPE_SNAP_LIMIT: + out << "SnapLimit"; + break; + case EVENT_TYPE_UPDATE_FEATURES: + out << "UpdateFeatures"; + break; + case EVENT_TYPE_METADATA_SET: + out << "MetadataSet"; + break; + case EVENT_TYPE_METADATA_REMOVE: + out << "MetadataRemove"; + break; + case EVENT_TYPE_AIO_WRITESAME: + out << "AioWriteSame"; + break; + case EVENT_TYPE_AIO_COMPARE_AND_WRITE: + out << "AioCompareAndWrite"; + break; + default: + out << "Unknown (" << static_cast<uint32_t>(type) << ")"; + break; + } + return out; +} + +std::ostream &operator<<(std::ostream &out, const ClientMetaType &type) { + using namespace librbd::journal; + + switch (type) { + case IMAGE_CLIENT_META_TYPE: + out << "Master Image"; + break; + case MIRROR_PEER_CLIENT_META_TYPE: + out << "Mirror Peer"; + break; + case CLI_CLIENT_META_TYPE: + out << "CLI Tool"; + break; + default: + out << "Unknown (" << static_cast<uint32_t>(type) << ")"; + break; + } + return out; +} + +std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta) { + out << "[tag_class=" << meta.tag_class << "]"; + return out; +} + +std::ostream &operator<<(std::ostream &out, const MirrorPeerSyncPoint &sync) { + out << "[snap_name=" << sync.snap_name << ", " + << "from_snap_name=" << sync.from_snap_name; + if (sync.object_number) { + out << ", " << *sync.object_number; + } + out << "]"; + return out; +} + +std::ostream &operator<<(std::ostream &out, const MirrorPeerState &state) { + switch (state) { + case MIRROR_PEER_STATE_SYNCING: + out << "Syncing"; + break; + case MIRROR_PEER_STATE_REPLAYING: + out << "Replaying"; + break; + default: + out << "Unknown (" << static_cast<uint32_t>(state) << ")"; + break; + } + return out; +} + +std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta) { + out << "[image_id=" << meta.image_id << ", " + << "state=" << meta.state << ", " + << "sync_object_count=" << meta.sync_object_count << ", " + << "sync_points=["; + std::string delimiter; + for (auto &sync_point : meta.sync_points) { + out << delimiter << "[" << sync_point << "]"; + delimiter = ", "; + } + out << "], snap_seqs=["; + delimiter = ""; + for (auto &pair : meta.snap_seqs) { + out << delimiter << "[" + << "local_snap_seq=" << pair.first << ", " + << "peer_snap_seq" << pair.second << "]"; + delimiter = ", "; + } + out << "]"; + return out; +} + +std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor) { + out << "[" + << "mirror_uuid=" << predecessor.mirror_uuid; + if (predecessor.commit_valid) { + out << ", " + << "tag_tid=" << predecessor.tag_tid << ", " + << "entry_tid=" << predecessor.entry_tid; + } + out << "]"; + return out; +} + +std::ostream &operator<<(std::ostream &out, const TagData &tag_data) { + out << "[" + << "mirror_uuid=" << tag_data.mirror_uuid << ", " + << "predecessor=" << tag_data.predecessor + << "]"; + return out; +} + +} // namespace journal +} // namespace librbd + diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h new file mode 100644 index 000000000..ae5681ade --- /dev/null +++ b/src/librbd/journal/Types.h @@ -0,0 +1,685 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_TYPES_H +#define CEPH_LIBRBD_JOURNAL_TYPES_H + +#include "cls/rbd/cls_rbd_types.h" +#include "include/int_types.h" +#include "include/buffer.h" +#include "include/encoding.h" +#include "include/types.h" +#include "include/utime.h" +#include "librbd/Types.h" +#include <iosfwd> +#include <list> +#include <boost/none.hpp> +#include <boost/optional.hpp> +#include <boost/variant.hpp> +#include <boost/mpl/vector.hpp> + +namespace ceph { +class Formatter; +} + +namespace librbd { +namespace journal { + +enum EventType { + EVENT_TYPE_AIO_DISCARD = 0, + EVENT_TYPE_AIO_WRITE = 1, + EVENT_TYPE_AIO_FLUSH = 2, + EVENT_TYPE_OP_FINISH = 3, + EVENT_TYPE_SNAP_CREATE = 4, + EVENT_TYPE_SNAP_REMOVE = 5, + EVENT_TYPE_SNAP_RENAME = 6, + EVENT_TYPE_SNAP_PROTECT = 7, + EVENT_TYPE_SNAP_UNPROTECT = 8, + EVENT_TYPE_SNAP_ROLLBACK = 9, + EVENT_TYPE_RENAME = 10, + EVENT_TYPE_RESIZE = 11, + EVENT_TYPE_FLATTEN = 12, + EVENT_TYPE_DEMOTE_PROMOTE = 13, + EVENT_TYPE_SNAP_LIMIT = 14, + EVENT_TYPE_UPDATE_FEATURES = 15, + EVENT_TYPE_METADATA_SET = 16, + EVENT_TYPE_METADATA_REMOVE = 17, + EVENT_TYPE_AIO_WRITESAME = 18, + EVENT_TYPE_AIO_COMPARE_AND_WRITE = 19, +}; + +struct AioDiscardEvent { + static const EventType TYPE = EVENT_TYPE_AIO_DISCARD; + + uint64_t offset = 0; + uint64_t length = 0; + uint32_t discard_granularity_bytes = 0; + + AioDiscardEvent() { + } + AioDiscardEvent(uint64_t _offset, uint64_t _length, + uint32_t discard_granularity_bytes) + : offset(_offset), length(_length), + discard_granularity_bytes(discard_granularity_bytes) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct AioWriteEvent { + static const EventType TYPE = EVENT_TYPE_AIO_WRITE; + + uint64_t offset; + uint64_t length; + bufferlist data; + + static uint32_t get_fixed_size(); + + AioWriteEvent() : offset(0), length(0) { + } + AioWriteEvent(uint64_t _offset, uint64_t _length, const bufferlist &_data) + : offset(_offset), length(_length), data(_data) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct AioWriteSameEvent { + static const EventType TYPE = EVENT_TYPE_AIO_WRITESAME; + + uint64_t offset; + uint64_t length; + bufferlist data; + + AioWriteSameEvent() : offset(0), length(0) { + } + AioWriteSameEvent(uint64_t _offset, uint64_t _length, + const bufferlist &_data) + : offset(_offset), length(_length), data(_data) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct AioCompareAndWriteEvent { + static const EventType TYPE = EVENT_TYPE_AIO_COMPARE_AND_WRITE; + + uint64_t offset; + uint64_t length; + bufferlist cmp_data; + bufferlist write_data; + + static uint32_t get_fixed_size(); + + AioCompareAndWriteEvent() : offset(0), length(0) { + } + AioCompareAndWriteEvent(uint64_t _offset, uint64_t _length, + const bufferlist &_cmp_data, const bufferlist &_write_data) + : offset(_offset), length(_length), cmp_data(_cmp_data), write_data(_write_data) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct AioFlushEvent { + static const EventType TYPE = EVENT_TYPE_AIO_FLUSH; + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct OpEventBase { + uint64_t op_tid; + +protected: + OpEventBase() : op_tid(0) { + } + OpEventBase(uint64_t op_tid) : op_tid(op_tid) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct OpFinishEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_OP_FINISH; + + int r; + + OpFinishEvent() : r(0) { + } + OpFinishEvent(uint64_t op_tid, int r) : OpEventBase(op_tid), r(r) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct SnapEventBase : public OpEventBase { + cls::rbd::SnapshotNamespace snap_namespace; + std::string snap_name; + +protected: + SnapEventBase() { + } + SnapEventBase(uint64_t op_tid, const cls::rbd::SnapshotNamespace& _snap_namespace, + const std::string &_snap_name) + : OpEventBase(op_tid), + snap_namespace(_snap_namespace), + snap_name(_snap_name) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct SnapCreateEvent : public SnapEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_CREATE; + + SnapCreateEvent() { + } + SnapCreateEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name) + : SnapEventBase(op_tid, snap_namespace, snap_name) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct SnapRemoveEvent : public SnapEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_REMOVE; + + SnapRemoveEvent() { + } + SnapRemoveEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name) + : SnapEventBase(op_tid, snap_namespace, snap_name) { + } + + using SnapEventBase::encode; + using SnapEventBase::decode; + using SnapEventBase::dump; +}; + +struct SnapRenameEvent : public OpEventBase{ + static const EventType TYPE = EVENT_TYPE_SNAP_RENAME; + + uint64_t snap_id; + std::string src_snap_name; + std::string dst_snap_name; + + SnapRenameEvent() : snap_id(CEPH_NOSNAP) { + } + SnapRenameEvent(uint64_t op_tid, uint64_t src_snap_id, + const std::string &src_snap_name, + const std::string &dest_snap_name) + : OpEventBase(op_tid), + snap_id(src_snap_id), + src_snap_name(src_snap_name), + dst_snap_name(dest_snap_name) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct SnapProtectEvent : public SnapEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_PROTECT; + + SnapProtectEvent() { + } + SnapProtectEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name) + : SnapEventBase(op_tid, snap_namespace, snap_name) { + } + + using SnapEventBase::encode; + using SnapEventBase::decode; + using SnapEventBase::dump; +}; + +struct SnapUnprotectEvent : public SnapEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_UNPROTECT; + + SnapUnprotectEvent() { + } + SnapUnprotectEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace &snap_namespace, + const std::string &snap_name) + : SnapEventBase(op_tid, snap_namespace, snap_name) { + } + + using SnapEventBase::encode; + using SnapEventBase::decode; + using SnapEventBase::dump; +}; + +struct SnapLimitEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_LIMIT; + uint64_t limit; + + SnapLimitEvent() { + } + SnapLimitEvent(uint64_t op_tid, const uint64_t _limit) + : OpEventBase(op_tid), limit(_limit) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct SnapRollbackEvent : public SnapEventBase { + static const EventType TYPE = EVENT_TYPE_SNAP_ROLLBACK; + + SnapRollbackEvent() { + } + SnapRollbackEvent(uint64_t op_tid, const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name) + : SnapEventBase(op_tid, snap_namespace, snap_name) { + } + + using SnapEventBase::encode; + using SnapEventBase::decode; + using SnapEventBase::dump; +}; + +struct RenameEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_RENAME; + + std::string image_name; + + RenameEvent() { + } + RenameEvent(uint64_t op_tid, const std::string &_image_name) + : OpEventBase(op_tid), image_name(_image_name) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct ResizeEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_RESIZE; + + uint64_t size; + + ResizeEvent() : size(0) { + } + ResizeEvent(uint64_t op_tid, uint64_t _size) + : OpEventBase(op_tid), size(_size) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct FlattenEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_FLATTEN; + + FlattenEvent() { + } + FlattenEvent(uint64_t op_tid) : OpEventBase(op_tid) { + } + + using OpEventBase::encode; + using OpEventBase::decode; + using OpEventBase::dump; +}; + +struct DemotePromoteEvent { + static const EventType TYPE = static_cast<EventType>( + EVENT_TYPE_DEMOTE_PROMOTE); + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct UpdateFeaturesEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_UPDATE_FEATURES; + + uint64_t features; + bool enabled; + + UpdateFeaturesEvent() : features(0), enabled(false) { + } + UpdateFeaturesEvent(uint64_t op_tid, uint64_t _features, bool _enabled) + : OpEventBase(op_tid), features(_features), enabled(_enabled) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct MetadataSetEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_METADATA_SET; + + string key; + string value; + + MetadataSetEvent() { + } + MetadataSetEvent(uint64_t op_tid, const string &_key, const string &_value) + : OpEventBase(op_tid), key(_key), value(_value) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct MetadataRemoveEvent : public OpEventBase { + static const EventType TYPE = EVENT_TYPE_METADATA_REMOVE; + + string key; + + MetadataRemoveEvent() { + } + MetadataRemoveEvent(uint64_t op_tid, const string &_key) + : OpEventBase(op_tid), key(_key) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct UnknownEvent { + static const EventType TYPE = static_cast<EventType>(-1); + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +typedef boost::mpl::vector<AioDiscardEvent, + AioWriteEvent, + AioFlushEvent, + OpFinishEvent, + SnapCreateEvent, + SnapRemoveEvent, + SnapRenameEvent, + SnapProtectEvent, + SnapUnprotectEvent, + SnapRollbackEvent, + RenameEvent, + ResizeEvent, + FlattenEvent, + DemotePromoteEvent, + SnapLimitEvent, + UpdateFeaturesEvent, + MetadataSetEvent, + MetadataRemoveEvent, + AioWriteSameEvent, + AioCompareAndWriteEvent, + UnknownEvent> EventVector; +typedef boost::make_variant_over<EventVector>::type Event; + +struct EventEntry { + static uint32_t get_fixed_size() { + return EVENT_FIXED_SIZE + METADATA_FIXED_SIZE; + } + + EventEntry() : event(UnknownEvent()) { + } + EventEntry(const Event &_event, const utime_t &_timestamp = utime_t()) + : event(_event), timestamp(_timestamp) { + } + + Event event; + utime_t timestamp; + + EventType get_event_type() const; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list<EventEntry *> &o); + +private: + static const uint32_t EVENT_FIXED_SIZE = 14; /// version encoding, type + static const uint32_t METADATA_FIXED_SIZE = 14; /// version encoding, timestamp + + void encode_metadata(bufferlist& bl) const; + void decode_metadata(bufferlist::const_iterator& it); +}; + +// Journal Client data structures + +enum ClientMetaType { + IMAGE_CLIENT_META_TYPE = 0, + MIRROR_PEER_CLIENT_META_TYPE = 1, + CLI_CLIENT_META_TYPE = 2 +}; + +struct ImageClientMeta { + static const ClientMetaType TYPE = IMAGE_CLIENT_META_TYPE; + + uint64_t tag_class = 0; + bool resync_requested = false; + + ImageClientMeta() { + } + ImageClientMeta(uint64_t tag_class) : tag_class(tag_class) { + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct MirrorPeerSyncPoint { + typedef boost::optional<uint64_t> ObjectNumber; + + cls::rbd::SnapshotNamespace snap_namespace; + std::string snap_name; + std::string from_snap_name; + ObjectNumber object_number; + + MirrorPeerSyncPoint() : MirrorPeerSyncPoint({}, "", "", boost::none) { + } + MirrorPeerSyncPoint(const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name, + const ObjectNumber &object_number) + : MirrorPeerSyncPoint(snap_namespace, snap_name, "", object_number) { + } + MirrorPeerSyncPoint(const cls::rbd::SnapshotNamespace& snap_namespace, + const std::string &snap_name, + const std::string &from_snap_name, + const ObjectNumber &object_number) + : snap_namespace(snap_namespace), snap_name(snap_name), + from_snap_name(from_snap_name), object_number(object_number) { + } + + inline bool operator==(const MirrorPeerSyncPoint &sync) const { + return (snap_name == sync.snap_name && + from_snap_name == sync.from_snap_name && + object_number == sync.object_number && + snap_namespace == sync.snap_namespace); + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +enum MirrorPeerState { + MIRROR_PEER_STATE_SYNCING, + MIRROR_PEER_STATE_REPLAYING +}; + +struct MirrorPeerClientMeta { + typedef std::list<MirrorPeerSyncPoint> SyncPoints; + + static const ClientMetaType TYPE = MIRROR_PEER_CLIENT_META_TYPE; + + std::string image_id; + MirrorPeerState state = MIRROR_PEER_STATE_SYNCING; ///< replay state + uint64_t sync_object_count = 0; ///< maximum number of objects ever sync'ed + SyncPoints sync_points; ///< max two in-use snapshots for sync + SnapSeqs snap_seqs; ///< local to peer snap seq mapping + + MirrorPeerClientMeta() { + } + MirrorPeerClientMeta(const std::string &image_id, + const SyncPoints &sync_points = SyncPoints(), + const SnapSeqs &snap_seqs = SnapSeqs()) + : image_id(image_id), sync_points(sync_points), snap_seqs(snap_seqs) { + } + + inline bool operator==(const MirrorPeerClientMeta &meta) const { + return (image_id == meta.image_id && + state == meta.state && + sync_object_count == meta.sync_object_count && + sync_points == meta.sync_points && + snap_seqs == meta.snap_seqs); + } + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct CliClientMeta { + static const ClientMetaType TYPE = CLI_CLIENT_META_TYPE; + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct UnknownClientMeta { + static const ClientMetaType TYPE = static_cast<ClientMetaType>(-1); + + void encode(bufferlist& bl) const; + void decode(__u8 version, bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +typedef boost::variant<ImageClientMeta, + MirrorPeerClientMeta, + CliClientMeta, + UnknownClientMeta> ClientMeta; + +struct ClientData { + ClientData() { + } + ClientData(const ClientMeta &client_meta) : client_meta(client_meta) { + } + + ClientMeta client_meta; + + ClientMetaType get_client_meta_type() const; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list<ClientData *> &o); +}; + +// Journal Tag data structures + +struct TagPredecessor { + std::string mirror_uuid; // empty if local + bool commit_valid = false; + uint64_t tag_tid = 0; + uint64_t entry_tid = 0; + + TagPredecessor() { + } + TagPredecessor(const std::string &mirror_uuid, bool commit_valid, + uint64_t tag_tid, uint64_t entry_tid) + : mirror_uuid(mirror_uuid), commit_valid(commit_valid), tag_tid(tag_tid), + entry_tid(entry_tid) { + } + + inline bool operator==(const TagPredecessor &rhs) const { + return (mirror_uuid == rhs.mirror_uuid && + commit_valid == rhs.commit_valid && + tag_tid == rhs.tag_tid && + entry_tid == rhs.entry_tid); + } + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); + void dump(Formatter *f) const; +}; + +struct TagData { + // owner of the tag (exclusive lock epoch) + std::string mirror_uuid; // empty if local + + // mapping to last committed record of previous tag + TagPredecessor predecessor; + + TagData() { + } + TagData(const std::string &mirror_uuid) : mirror_uuid(mirror_uuid) { + } + TagData(const std::string &mirror_uuid, + const std::string &predecessor_mirror_uuid, + bool predecessor_commit_valid, + uint64_t predecessor_tag_tid, uint64_t predecessor_entry_tid) + : mirror_uuid(mirror_uuid), + predecessor(predecessor_mirror_uuid, predecessor_commit_valid, + predecessor_tag_tid, predecessor_entry_tid) { + } + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list<TagData *> &o); +}; + +std::ostream &operator<<(std::ostream &out, const EventType &type); +std::ostream &operator<<(std::ostream &out, const ClientMetaType &type); +std::ostream &operator<<(std::ostream &out, const ImageClientMeta &meta); +std::ostream &operator<<(std::ostream &out, const MirrorPeerSyncPoint &sync); +std::ostream &operator<<(std::ostream &out, const MirrorPeerState &meta); +std::ostream &operator<<(std::ostream &out, const MirrorPeerClientMeta &meta); +std::ostream &operator<<(std::ostream &out, const TagPredecessor &predecessor); +std::ostream &operator<<(std::ostream &out, const TagData &tag_data); + +struct Listener { + virtual ~Listener() { + } + + /// invoked when journal close is requested + virtual void handle_close() = 0; + + /// invoked when journal is promoted to primary + virtual void handle_promoted() = 0; + + /// invoked when journal resync is requested + virtual void handle_resync() = 0; +}; + +WRITE_CLASS_ENCODER(EventEntry); +WRITE_CLASS_ENCODER(ClientData); +WRITE_CLASS_ENCODER(TagData); + +} // namespace journal +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_TYPES_H diff --git a/src/librbd/journal/Utils.cc b/src/librbd/journal/Utils.cc new file mode 100644 index 000000000..231bcae2d --- /dev/null +++ b/src/librbd/journal/Utils.cc @@ -0,0 +1,86 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/journal/Utils.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/journal/Types.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::journal::" + +namespace librbd { +namespace journal { +namespace util { + +int C_DecodeTag::decode(bufferlist::const_iterator *it, TagData *tag_data) { + try { + using ceph::decode; + decode(*tag_data, *it); + } catch (const buffer::error &err) { + return -EBADMSG; + } + return 0; +} + +int C_DecodeTag::process(int r) { + if (r < 0) { + lderr(cct) << "C_DecodeTag: " << this << " " << __func__ << ": " + << "failed to allocate tag: " << cpp_strerror(r) + << dendl; + return r; + } + + std::lock_guard locker{*lock}; + *tag_tid = tag.tid; + + auto data_it = tag.data.cbegin(); + r = decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "C_DecodeTag: " << this << " " << __func__ << ": " + << "failed to decode allocated tag" << dendl; + return r; + } + + ldout(cct, 20) << "C_DecodeTag: " << this << " " << __func__ << ": " + << "allocated journal tag: " + << "tid=" << tag.tid << ", " + << "data=" << *tag_data << dendl; + return 0; +} + +int C_DecodeTags::process(int r) { + if (r < 0) { + lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": " + << "failed to retrieve journal tags: " << cpp_strerror(r) + << dendl; + return r; + } + + if (tags.empty()) { + lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": " + << "no journal tags retrieved" << dendl; + return -ENOENT; + } + + std::lock_guard locker{*lock}; + *tag_tid = tags.back().tid; + auto data_it = tags.back().data.cbegin(); + r = C_DecodeTag::decode(&data_it, tag_data); + if (r < 0) { + lderr(cct) << "C_DecodeTags: " << this << " " << __func__ << ": " + << "failed to decode journal tag" << dendl; + return r; + } + + ldout(cct, 20) << "C_DecodeTags: " << this << " " << __func__ << ": " + << "most recent journal tag: " + << "tid=" << *tag_tid << ", " + << "data=" << *tag_data << dendl; + return 0; +} + +} // namespace util +} // namespace journal +} // namespace librbd diff --git a/src/librbd/journal/Utils.h b/src/librbd/journal/Utils.h new file mode 100644 index 000000000..93643f9f9 --- /dev/null +++ b/src/librbd/journal/Utils.h @@ -0,0 +1,80 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_UTILS_H +#define CEPH_LIBRBD_JOURNAL_UTILS_H + +#include "include/common_fwd.h" +#include "include/int_types.h" +#include "include/Context.h" +#include "cls/journal/cls_journal_types.h" +#include <list> + + +namespace librbd { +namespace journal { + +struct TagData; + +namespace util { + +struct C_DecodeTag : public Context { + CephContext *cct; + ceph::mutex *lock; + uint64_t *tag_tid; + TagData *tag_data; + Context *on_finish; + + cls::journal::Tag tag; + + C_DecodeTag(CephContext *cct, ceph::mutex *lock, uint64_t *tag_tid, + TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + void complete(int r) override { + on_finish->complete(process(r)); + Context::complete(0); + } + void finish(int r) override { + } + + int process(int r); + + static int decode(bufferlist::const_iterator *it, TagData *tag_data); + +}; + +struct C_DecodeTags : public Context { + typedef std::list<cls::journal::Tag> Tags; + + CephContext *cct; + ceph::mutex *lock; + uint64_t *tag_tid; + TagData *tag_data; + Context *on_finish; + + Tags tags; + + C_DecodeTags(CephContext *cct, ceph::mutex *lock, uint64_t *tag_tid, + TagData *tag_data, Context *on_finish) + : cct(cct), lock(lock), tag_tid(tag_tid), tag_data(tag_data), + on_finish(on_finish) { + } + + void complete(int r) override { + on_finish->complete(process(r)); + Context::complete(0); + } + void finish(int r) override { + } + + int process(int r); +}; + +} // namespace util +} // namespace journal +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_UTILS_H |