From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rbd_replay/ActionTypes.cc | 431 ++++++++++++++++++++++++++ src/rbd_replay/ActionTypes.h | 339 +++++++++++++++++++++ src/rbd_replay/BoundedBuffer.hpp | 71 +++++ src/rbd_replay/BufferReader.cc | 37 +++ src/rbd_replay/BufferReader.h | 34 +++ src/rbd_replay/CMakeLists.txt | 44 +++ src/rbd_replay/ImageNameMap.cc | 69 +++++ src/rbd_replay/ImageNameMap.hpp | 54 ++++ src/rbd_replay/PendingIO.cc | 44 +++ src/rbd_replay/PendingIO.hpp | 64 ++++ src/rbd_replay/Replayer.cc | 406 +++++++++++++++++++++++++ src/rbd_replay/Replayer.hpp | 167 +++++++++++ src/rbd_replay/actions.cc | 249 +++++++++++++++ src/rbd_replay/actions.hpp | 344 +++++++++++++++++++++ src/rbd_replay/ios.cc | 220 ++++++++++++++ src/rbd_replay/ios.hpp | 401 +++++++++++++++++++++++++ src/rbd_replay/rbd-replay-prep.cc | 584 ++++++++++++++++++++++++++++++++++++ src/rbd_replay/rbd-replay.cc | 131 ++++++++ src/rbd_replay/rbd_loc.cc | 130 ++++++++ src/rbd_replay/rbd_loc.hpp | 90 ++++++ src/rbd_replay/rbd_replay_debug.hpp | 34 +++ 21 files changed, 3943 insertions(+) create mode 100644 src/rbd_replay/ActionTypes.cc create mode 100644 src/rbd_replay/ActionTypes.h create mode 100644 src/rbd_replay/BoundedBuffer.hpp create mode 100644 src/rbd_replay/BufferReader.cc create mode 100644 src/rbd_replay/BufferReader.h create mode 100644 src/rbd_replay/CMakeLists.txt create mode 100644 src/rbd_replay/ImageNameMap.cc create mode 100644 src/rbd_replay/ImageNameMap.hpp create mode 100644 src/rbd_replay/PendingIO.cc create mode 100644 src/rbd_replay/PendingIO.hpp create mode 100644 src/rbd_replay/Replayer.cc create mode 100644 src/rbd_replay/Replayer.hpp create mode 100644 src/rbd_replay/actions.cc create mode 100644 src/rbd_replay/actions.hpp create mode 100644 src/rbd_replay/ios.cc create mode 100644 src/rbd_replay/ios.hpp 
create mode 100644 src/rbd_replay/rbd-replay-prep.cc create mode 100644 src/rbd_replay/rbd-replay.cc create mode 100644 src/rbd_replay/rbd_loc.cc create mode 100644 src/rbd_replay/rbd_loc.hpp create mode 100644 src/rbd_replay/rbd_replay_debug.hpp (limited to 'src/rbd_replay') diff --git a/src/rbd_replay/ActionTypes.cc b/src/rbd_replay/ActionTypes.cc new file mode 100644 index 000000000..e86aa6479 --- /dev/null +++ b/src/rbd_replay/ActionTypes.cc @@ -0,0 +1,431 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rbd_replay/ActionTypes.h" +#include "include/ceph_assert.h" +#include "include/byteorder.h" +#include "include/stringify.h" +#include "common/Formatter.h" +#include +#include + +namespace rbd_replay { +namespace action { + +namespace { + +bool byte_swap_required(__u8 version) { +#if defined(CEPH_LITTLE_ENDIAN) + return (version == 0); +#else + return false; +#endif +} + +void decode_big_endian_string(std::string &str, bufferlist::const_iterator &it) { + using ceph::decode; +#if defined(CEPH_LITTLE_ENDIAN) + uint32_t length; + decode(length, it); + length = swab(length); + str.clear(); + it.copy(length, str); +#else + ceph_abort(); +#endif +} + +class EncodeVisitor : public boost::static_visitor { +public: + explicit EncodeVisitor(bufferlist &bl) : m_bl(bl) { + } + + template + inline void operator()(const Action &action) const { + using ceph::encode; + encode(static_cast(Action::ACTION_TYPE), m_bl); + action.encode(m_bl); + } +private: + bufferlist &m_bl; +}; + +class DecodeVisitor : public boost::static_visitor { +public: + DecodeVisitor(__u8 version, bufferlist::const_iterator &iter) + : m_version(version), m_iter(iter) { + } + + template + inline void operator()(Action &action) const { + action.decode(m_version, m_iter); + } +private: + __u8 m_version; + bufferlist::const_iterator &m_iter; +}; + +class DumpVisitor : public boost::static_visitor { +public: + explicit DumpVisitor(Formatter 
*formatter) : m_formatter(formatter) {} + + template + inline void operator()(const Action &action) const { + ActionType action_type = Action::ACTION_TYPE; + m_formatter->dump_string("action_type", stringify(action_type)); + action.dump(m_formatter); + } +private: + ceph::Formatter *m_formatter; +}; + +} // anonymous namespace + +void Dependency::encode(bufferlist &bl) const { + using ceph::encode; + encode(id, bl); + encode(time_delta, bl); +} + +void Dependency::decode(bufferlist::const_iterator &it) { + decode(1, it); +} + +void Dependency::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + decode(id, it); + decode(time_delta, it); + if (byte_swap_required(version)) { + id = swab(id); + time_delta = swab(time_delta); + } +} + +void Dependency::dump(Formatter *f) const { + f->dump_unsigned("id", id); + f->dump_unsigned("time_delta", time_delta); +} + +void Dependency::generate_test_instances(std::list &o) { + o.push_back(new Dependency()); + o.push_back(new Dependency(1, 123456789)); +} + +void ActionBase::encode(bufferlist &bl) const { + using ceph::encode; + encode(id, bl); + encode(thread_id, bl); + encode(dependencies, bl); +} + +void ActionBase::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + decode(id, it); + decode(thread_id, it); + if (version == 0) { + uint32_t num_successors; + decode(num_successors, it); + + uint32_t num_completion_successors; + decode(num_completion_successors, it); + } + + if (byte_swap_required(version)) { + id = swab(id); + thread_id = swab(thread_id); + + uint32_t dep_count; + decode(dep_count, it); + dep_count = swab(dep_count); + dependencies.resize(dep_count); + for (uint32_t i = 0; i < dep_count; ++i) { + dependencies[i].decode(0, it); + } + } else { + decode(dependencies, it); + } +} + +void ActionBase::dump(Formatter *f) const { + f->dump_unsigned("id", id); + f->dump_unsigned("thread_id", thread_id); + f->open_array_section("dependencies"); + for (size_t i = 0; i < 
dependencies.size(); ++i) { + f->open_object_section("dependency"); + dependencies[i].dump(f); + f->close_section(); + } + f->close_section(); +} + +void ImageActionBase::encode(bufferlist &bl) const { + using ceph::encode; + ActionBase::encode(bl); + encode(imagectx_id, bl); +} + +void ImageActionBase::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + ActionBase::decode(version, it); + decode(imagectx_id, it); + if (byte_swap_required(version)) { + imagectx_id = swab(imagectx_id); + } +} + +void ImageActionBase::dump(Formatter *f) const { + ActionBase::dump(f); + f->dump_unsigned("imagectx_id", imagectx_id); +} + +void IoActionBase::encode(bufferlist &bl) const { + using ceph::encode; + ImageActionBase::encode(bl); + encode(offset, bl); + encode(length, bl); +} + +void IoActionBase::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + ImageActionBase::decode(version, it); + decode(offset, it); + decode(length, it); + if (byte_swap_required(version)) { + offset = swab(offset); + length = swab(length); + } +} + +void IoActionBase::dump(Formatter *f) const { + ImageActionBase::dump(f); + f->dump_unsigned("offset", offset); + f->dump_unsigned("length", length); +} + +void OpenImageAction::encode(bufferlist &bl) const { + using ceph::encode; + ImageActionBase::encode(bl); + encode(name, bl); + encode(snap_name, bl); + encode(read_only, bl); +} + +void OpenImageAction::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + ImageActionBase::decode(version, it); + if (byte_swap_required(version)) { + decode_big_endian_string(name, it); + decode_big_endian_string(snap_name, it); + } else { + decode(name, it); + decode(snap_name, it); + } + decode(read_only, it); +} + +void OpenImageAction::dump(Formatter *f) const { + ImageActionBase::dump(f); + f->dump_string("name", name); + f->dump_string("snap_name", snap_name); + f->dump_bool("read_only", read_only); +} + +void 
AioOpenImageAction::encode(bufferlist &bl) const { + using ceph::encode; + ImageActionBase::encode(bl); + encode(name, bl); + encode(snap_name, bl); + encode(read_only, bl); +} + +void AioOpenImageAction::decode(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + ImageActionBase::decode(version, it); + if (byte_swap_required(version)) { + decode_big_endian_string(name, it); + decode_big_endian_string(snap_name, it); + } else { + decode(name, it); + decode(snap_name, it); + } + decode(read_only, it); +} + +void AioOpenImageAction::dump(Formatter *f) const { + ImageActionBase::dump(f); + f->dump_string("name", name); + f->dump_string("snap_name", snap_name); + f->dump_bool("read_only", read_only); +} + +void UnknownAction::encode(bufferlist &bl) const { + ceph_abort(); +} + +void UnknownAction::decode(__u8 version, bufferlist::const_iterator &it) { +} + +void UnknownAction::dump(Formatter *f) const { +} + +void ActionEntry::encode(bufferlist &bl) const { + ENCODE_START(1, 1, bl); + boost::apply_visitor(EncodeVisitor(bl), action); + ENCODE_FINISH(bl); +} + +void ActionEntry::decode(bufferlist::const_iterator &it) { + DECODE_START(1, it); + decode_versioned(struct_v, it); + DECODE_FINISH(it); +} + +void ActionEntry::decode_unversioned(bufferlist::const_iterator &it) { + decode_versioned(0, it); +} + +void ActionEntry::decode_versioned(__u8 version, bufferlist::const_iterator &it) { + using ceph::decode; + uint8_t action_type; + decode(action_type, it); + + // select the correct action variant based upon the action_type + switch (action_type) { + case ACTION_TYPE_START_THREAD: + action = StartThreadAction(); + break; + case ACTION_TYPE_STOP_THREAD: + action = StopThreadAction(); + break; + case ACTION_TYPE_READ: + action = ReadAction(); + break; + case ACTION_TYPE_WRITE: + action = WriteAction(); + break; + case ACTION_TYPE_DISCARD: + action = DiscardAction(); + break; + case ACTION_TYPE_AIO_READ: + action = AioReadAction(); + break; + case 
ACTION_TYPE_AIO_WRITE: + action = AioWriteAction(); + break; + case ACTION_TYPE_AIO_DISCARD: + action = AioDiscardAction(); + break; + case ACTION_TYPE_OPEN_IMAGE: + action = OpenImageAction(); + break; + case ACTION_TYPE_CLOSE_IMAGE: + action = CloseImageAction(); + break; + case ACTION_TYPE_AIO_OPEN_IMAGE: + action = AioOpenImageAction(); + break; + case ACTION_TYPE_AIO_CLOSE_IMAGE: + action = AioCloseImageAction(); + break; + } + + boost::apply_visitor(DecodeVisitor(version, it), action); +} + +void ActionEntry::dump(Formatter *f) const { + boost::apply_visitor(DumpVisitor(f), action); +} + +void ActionEntry::generate_test_instances(std::list &o) { + Dependencies dependencies; + dependencies.push_back(Dependency(3, 123456789)); + dependencies.push_back(Dependency(4, 234567890)); + + o.push_back(new ActionEntry(StartThreadAction())); + o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies))); + o.push_back(new ActionEntry(StopThreadAction())); + o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies))); + + o.push_back(new ActionEntry(ReadAction())); + o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5))); + o.push_back(new ActionEntry(WriteAction())); + o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(DiscardAction())); + o.push_back(new ActionEntry(DiscardAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(AioReadAction())); + o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(AioWriteAction())); + o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4, + 5))); + o.push_back(new ActionEntry(AioDiscardAction())); + o.push_back(new ActionEntry(AioDiscardAction(1, 123456789, dependencies, 3, 4, + 5))); + + o.push_back(new ActionEntry(OpenImageAction())); + o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3, + 
"image_name", "snap_name", + true))); + o.push_back(new ActionEntry(CloseImageAction())); + o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3))); + + o.push_back(new ActionEntry(AioOpenImageAction())); + o.push_back(new ActionEntry(AioOpenImageAction(1, 123456789, dependencies, 3, + "image_name", "snap_name", + true))); + o.push_back(new ActionEntry(AioCloseImageAction())); + o.push_back(new ActionEntry(AioCloseImageAction(1, 123456789, dependencies, 3))); +} + +std::ostream &operator<<(std::ostream &out, + const ActionType &type) { + using namespace rbd_replay::action; + + switch (type) { + case ACTION_TYPE_START_THREAD: + out << "StartThread"; + break; + case ACTION_TYPE_STOP_THREAD: + out << "StopThread"; + break; + case ACTION_TYPE_READ: + out << "Read"; + break; + case ACTION_TYPE_WRITE: + out << "Write"; + break; + case ACTION_TYPE_DISCARD: + out << "Discard"; + break; + case ACTION_TYPE_AIO_READ: + out << "AioRead"; + break; + case ACTION_TYPE_AIO_WRITE: + out << "AioWrite"; + break; + case ACTION_TYPE_AIO_DISCARD: + out << "AioDiscard"; + break; + case ACTION_TYPE_OPEN_IMAGE: + out << "OpenImage"; + break; + case ACTION_TYPE_CLOSE_IMAGE: + out << "CloseImage"; + break; + case ACTION_TYPE_AIO_OPEN_IMAGE: + out << "AioOpenImage"; + break; + case ACTION_TYPE_AIO_CLOSE_IMAGE: + out << "AioCloseImage"; + break; + default: + out << "Unknown (" << static_cast(type) << ")"; + break; + } + return out; +} + +} // namespace action +} // namespace rbd_replay diff --git a/src/rbd_replay/ActionTypes.h b/src/rbd_replay/ActionTypes.h new file mode 100644 index 000000000..19e1bb8fa --- /dev/null +++ b/src/rbd_replay/ActionTypes.h @@ -0,0 +1,339 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H +#define CEPH_RBD_REPLAY_ACTION_TYPES_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include +#include 
+#include +#include +#include + +namespace ceph { class Formatter; } + +namespace rbd_replay { +namespace action { + +typedef uint64_t imagectx_id_t; +typedef uint64_t thread_id_t; + +/// Even IDs are normal actions, odd IDs are completions. +typedef uint32_t action_id_t; + +static const std::string BANNER("rbd-replay-trace"); + +/** + * Dependencies link actions to earlier actions or completions. + * If an action has a dependency \c d then it waits until \c d.time_delta + * nanoseconds after the action or completion with ID \c d.id has fired. + */ +struct Dependency { + /// ID of the action or completion to wait for. + action_id_t id; + + /// Nanoseconds of delay to wait until after the action or completion fires. + uint64_t time_delta; + + /** + * @param id ID of the action or completion to wait for. + * @param time_delta Nanoseconds of delay to wait after the action or + * completion fires. + */ + Dependency() : id(0), time_delta(0) { + } + Dependency(action_id_t id, uint64_t time_delta) + : id(id), time_delta(time_delta) { + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::const_iterator &it); + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list &o); +}; + +WRITE_CLASS_ENCODER(Dependency); + +typedef std::vector Dependencies; + +enum ActionType { + ACTION_TYPE_START_THREAD = 0, + ACTION_TYPE_STOP_THREAD = 1, + ACTION_TYPE_READ = 2, + ACTION_TYPE_WRITE = 3, + ACTION_TYPE_AIO_READ = 4, + ACTION_TYPE_AIO_WRITE = 5, + ACTION_TYPE_OPEN_IMAGE = 6, + ACTION_TYPE_CLOSE_IMAGE = 7, + ACTION_TYPE_AIO_OPEN_IMAGE = 8, + ACTION_TYPE_AIO_CLOSE_IMAGE = 9, + ACTION_TYPE_DISCARD = 10, + ACTION_TYPE_AIO_DISCARD = 11 +}; + +struct ActionBase { + action_id_t id; + thread_id_t thread_id; + Dependencies dependencies; + + ActionBase() : id(0), thread_id(0) { + } + ActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : id(id), 
thread_id(thread_id), dependencies(dependencies) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +struct StartThreadAction : public ActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD; + + StartThreadAction() { + } + StartThreadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : ActionBase(id, thread_id, dependencies) { + } +}; + +struct StopThreadAction : public ActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD; + + StopThreadAction() { + } + StopThreadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies) + : ActionBase(id, thread_id, dependencies) { + } +}; + +struct ImageActionBase : public ActionBase { + imagectx_id_t imagectx_id; + + ImageActionBase() : imagectx_id(0) { + } + ImageActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id) + : ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +struct IoActionBase : public ImageActionBase { + uint64_t offset; + uint64_t length; + + IoActionBase() : offset(0), length(0) { + } + IoActionBase(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : ImageActionBase(id, thread_id, dependencies, imagectx_id), + offset(offset), length(length) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +struct ReadAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_READ; + + ReadAction() { + } + ReadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t 
imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct WriteAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE; + + WriteAction() { + } + WriteAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct DiscardAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_DISCARD; + + DiscardAction() { + } + DiscardAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct AioReadAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ; + + AioReadAction() { + } + AioReadAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct AioWriteAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE; + + AioWriteAction() { + } + AioWriteAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) { + } +}; + +struct AioDiscardAction : public IoActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_DISCARD; + + AioDiscardAction() { + } + AioDiscardAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + uint64_t offset, uint64_t length) + : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, 
length) { + } +}; + +struct OpenImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE; + + std::string name; + std::string snap_name; + bool read_only; + + OpenImageAction() : read_only(false) { + } + OpenImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + const std::string &name, const std::string &snap_name, + bool read_only) + : ImageActionBase(id, thread_id, dependencies, imagectx_id), + name(name), snap_name(snap_name), read_only(read_only) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +struct CloseImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE; + + CloseImageAction() { + } + CloseImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id) + : ImageActionBase(id, thread_id, dependencies, imagectx_id) { + } +}; + +struct AioOpenImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_OPEN_IMAGE; + + std::string name; + std::string snap_name; + bool read_only; + + AioOpenImageAction() : read_only(false) { + } + AioOpenImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies &dependencies, imagectx_id_t imagectx_id, + const std::string &name, const std::string &snap_name, + bool read_only) + : ImageActionBase(id, thread_id, dependencies, imagectx_id), + name(name), snap_name(snap_name), read_only(read_only) { + } + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +struct AioCloseImageAction : public ImageActionBase { + static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_CLOSE_IMAGE; + + AioCloseImageAction() { + } + AioCloseImageAction(action_id_t id, thread_id_t thread_id, + const Dependencies 
&dependencies, imagectx_id_t imagectx_id) + : ImageActionBase(id, thread_id, dependencies, imagectx_id) { + } +}; + +struct UnknownAction { + static const ActionType ACTION_TYPE = static_cast(-1); + + void encode(bufferlist &bl) const; + void decode(__u8 version, bufferlist::const_iterator &it); + void dump(Formatter *f) const; +}; + +typedef boost::variant Action; + +class ActionEntry { +public: + Action action; + + ActionEntry() : action(UnknownAction()) { + } + ActionEntry(const Action &action) : action(action) { + } + + void encode(bufferlist &bl) const; + void decode(bufferlist::const_iterator &it); + void decode_unversioned(bufferlist::const_iterator &it); + void dump(Formatter *f) const; + + static void generate_test_instances(std::list &o); + +private: + void decode_versioned(__u8 version, bufferlist::const_iterator &it); +}; + +WRITE_CLASS_ENCODER(ActionEntry); + +std::ostream &operator<<(std::ostream &out, + const rbd_replay::action::ActionType &type); + +} // namespace action +} // namespace rbd_replay + +#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H diff --git a/src/rbd_replay/BoundedBuffer.hpp b/src/rbd_replay/BoundedBuffer.hpp new file mode 100644 index 000000000..00fb1cb32 --- /dev/null +++ b/src/rbd_replay/BoundedBuffer.hpp @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef _INCLUDED_BOUNDED_BUFFER_HPP +#define _INCLUDED_BOUNDED_BUFFER_HPP + +#include +#include +#include +#include + +/** + Blocking, fixed-capacity, thread-safe FIFO queue useful for communicating between threads. 
+ This code was taken from the Boost docs: http://www.boost.org/doc/libs/1_55_0/libs/circular_buffer/example/circular_buffer_bound_example.cpp + */ +template +class BoundedBuffer { +public: + typedef boost::circular_buffer container_type; + typedef typename container_type::size_type size_type; + typedef typename container_type::value_type value_type; + typedef typename boost::call_traits::param_type param_type; + + explicit BoundedBuffer(size_type capacity) : m_unread(0), m_container(capacity) { + } + + /** + Inserts an element into the queue. + Blocks if the queue is full. + */ + void push_front(typename boost::call_traits::param_type item) { + // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method. + boost::mutex::scoped_lock lock(m_mutex); + m_not_full.wait(lock, boost::bind(&BoundedBuffer::is_not_full, this)); + m_container.push_front(item); + ++m_unread; + lock.unlock(); + m_not_empty.notify_one(); + } + + /** + Removes an element from the queue. + Blocks if the queue is empty. + */ + void pop_back(value_type* pItem) { + boost::mutex::scoped_lock lock(m_mutex); + m_not_empty.wait(lock, boost::bind(&BoundedBuffer::is_not_empty, this)); + *pItem = m_container[--m_unread]; + lock.unlock(); + m_not_full.notify_one(); + } + +private: + BoundedBuffer(const BoundedBuffer&); // Disabled copy constructor. + BoundedBuffer& operator= (const BoundedBuffer&); // Disabled assign operator. 
+ + bool is_not_empty() const { + return m_unread > 0; + } + bool is_not_full() const { + return m_unread < m_container.capacity(); + } + + size_type m_unread; + container_type m_container; + boost::mutex m_mutex; + boost::condition m_not_empty; + boost::condition m_not_full; +}; + +#endif diff --git a/src/rbd_replay/BufferReader.cc b/src/rbd_replay/BufferReader.cc new file mode 100644 index 000000000..b4dce6515 --- /dev/null +++ b/src/rbd_replay/BufferReader.cc @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "rbd_replay/BufferReader.h" +#include "include/ceph_assert.h" +#include "include/intarith.h" + +namespace rbd_replay { + +BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes) + : m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes), + m_bl_it(m_bl.begin()), m_eof_reached(false) { + ceph_assert(m_min_bytes <= m_max_bytes); +} + +int BufferReader::fetch(bufferlist::const_iterator **it) { + if (m_bl_it.get_remaining() < m_min_bytes) { + ssize_t bytes_to_read = round_up_to(m_max_bytes - m_bl_it.get_remaining(), + CEPH_PAGE_SIZE); + while (!m_eof_reached && bytes_to_read > 0) { + int r = m_bl.read_fd(m_fd, CEPH_PAGE_SIZE); + if (r < 0) { + return r; + } + if (r == 0) { + m_eof_reached = true; + } + ceph_assert(r <= bytes_to_read); + bytes_to_read -= r; + } + } + + *it = &m_bl_it; + return 0; +} + +} // namespace rbd_replay diff --git a/src/rbd_replay/BufferReader.h b/src/rbd_replay/BufferReader.h new file mode 100644 index 000000000..38a105774 --- /dev/null +++ b/src/rbd_replay/BufferReader.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H +#define CEPH_RBD_REPLAY_BUFFER_READER_H + +#include "include/int_types.h" +#include "include/buffer.h" + +namespace rbd_replay { + +class BufferReader { +public: + static const size_t DEFAULT_MIN_BYTES = 1<<20; + 
static const size_t DEFAULT_MAX_BYTES = 1<<22; + + BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES, + size_t max_bytes = DEFAULT_MAX_BYTES); + + int fetch(bufferlist::const_iterator **it); + +private: + int m_fd; + size_t m_min_bytes; + size_t m_max_bytes; + bufferlist m_bl; + bufferlist::const_iterator m_bl_it; + bool m_eof_reached; + +}; + +} // namespace rbd_replay + +#endif // CEPH_RBD_REPLAY_BUFFER_READER_H diff --git a/src/rbd_replay/CMakeLists.txt b/src/rbd_replay/CMakeLists.txt new file mode 100644 index 000000000..63446626a --- /dev/null +++ b/src/rbd_replay/CMakeLists.txt @@ -0,0 +1,44 @@ +set(librbd_replay_types_srcs + ActionTypes.cc) +add_library(rbd_replay_types STATIC ${librbd_replay_types_srcs}) + +set(librbd_replay_srcs + actions.cc + BufferReader.cc + ImageNameMap.cc + PendingIO.cc + rbd_loc.cc + Replayer.cc) +add_library(rbd_replay STATIC ${librbd_replay_srcs}) +target_link_libraries(rbd_replay + PUBLIC rbd_replay_types + PRIVATE librbd librados global) + +add_executable(rbd-replay + rbd-replay.cc) +target_link_libraries(rbd-replay + librbd librados global rbd_replay ceph-common) +install(TARGETS rbd-replay DESTINATION bin) + +set(librbd_replay_ios_srcs + ios.cc) +add_library(rbd_replay_ios STATIC ${librbd_replay_ios_srcs}) +target_link_libraries(rbd_replay_ios librbd librados global) + +if(HAVE_BABELTRACE) + add_executable(rbd-replay-prep + rbd-replay-prep.cc) + target_link_libraries(rbd-replay-prep + rbd_replay + rbd_replay_ios + librbd + librados + ceph-common + global + babeltrace + babeltrace-ctf + Boost::date_time + ) + install(TARGETS rbd-replay-prep DESTINATION bin) +endif(HAVE_BABELTRACE) + diff --git a/src/rbd_replay/ImageNameMap.cc b/src/rbd_replay/ImageNameMap.cc new file mode 100644 index 000000000..f54265718 --- /dev/null +++ b/src/rbd_replay/ImageNameMap.cc @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * 
+ * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "ImageNameMap.hpp" + + +using namespace std; +using namespace rbd_replay; + + +bool ImageNameMap::parse_mapping(string mapping_string, Mapping *mapping) const { + string fields[2]; + int field = 0; + for (size_t i = 0, n = mapping_string.length(); i < n; i++) { + char c = mapping_string[i]; + switch (c) { + case '\\': + if (i != n - 1 && mapping_string[i + 1] == '=') { + i++; + fields[field].push_back('='); + } else { + fields[field].push_back('\\'); + } + break; + case '=': + if (field == 1) { + return false; + } + field = 1; + break; + default: + fields[field].push_back(c); + } + } + if (field == 0) { + return false; + } + if (!mapping->first.parse(fields[0])) { + return false; + } + if (!mapping->second.parse(fields[1])) { + return false; + } + return true; +} + +void ImageNameMap::add_mapping(const Mapping& mapping) { + m_map.insert(mapping); +} + +rbd_loc ImageNameMap::map(const rbd_loc& name) const { + std::map::const_iterator p(m_map.find(name)); + if (p == m_map.end()) { + return name; + } else { + return p->second; + } +} diff --git a/src/rbd_replay/ImageNameMap.hpp b/src/rbd_replay/ImageNameMap.hpp new file mode 100644 index 000000000..45cdaf686 --- /dev/null +++ b/src/rbd_replay/ImageNameMap.hpp @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef _INCLUDED_RBD_REPLAY_IMAGENAMEMAP_HPP +#define _INCLUDED_RBD_REPLAY_IMAGENAMEMAP_HPP + +#include +#include +#include "rbd_loc.hpp" + +namespace rbd_replay { + +/** + Maps image names. + */ +class ImageNameMap { +public: + typedef std::pair Mapping; + + /** + Parses a mapping. + If parsing fails, the contents of \c mapping are undefined. + @param[in] mapping_string string representation of the mapping + @param[out] mapping stores the parsed mapping + @retval true parsing was successful + */ + bool parse_mapping(std::string mapping_string, Mapping *mapping) const; + + void add_mapping(const Mapping& mapping); + + /** + Maps an image name. + If no mapping matches the name, it is returned unmodified. + */ + rbd_loc map(const rbd_loc& name) const; + +private: + std::map m_map; +}; + +} + +#endif diff --git a/src/rbd_replay/PendingIO.cc b/src/rbd_replay/PendingIO.cc new file mode 100644 index 000000000..089a60aa5 --- /dev/null +++ b/src/rbd_replay/PendingIO.cc @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "PendingIO.hpp" +#include "rbd_replay_debug.hpp" + +#define dout_context g_ceph_context + +using namespace rbd_replay; + +extern "C" +void rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg) { + PendingIO *io = static_cast(arg); + io->completed(cb); +} + +PendingIO::PendingIO(action_id_t id, + ActionCtx &worker) + : m_id(id), + m_completion(new librbd::RBD::AioCompletion(this, rbd_replay_pending_io_callback)), + m_worker(worker) { + } + +PendingIO::~PendingIO() { + m_completion->release(); +} + +void PendingIO::completed(librbd::completion_t cb) { + dout(ACTION_LEVEL) << "Completed pending IO #" << m_id << dendl; + ssize_t r = m_completion->get_return_value(); + assertf(r >= 0, "id = %d, r = %d", m_id, r); + m_worker.remove_pending(shared_from_this()); +} diff --git a/src/rbd_replay/PendingIO.hpp b/src/rbd_replay/PendingIO.hpp new file mode 100644 index 000000000..3942d5f6f --- /dev/null +++ b/src/rbd_replay/PendingIO.hpp @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_PENDINGIO_HPP +#define _INCLUDED_RBD_REPLAY_PENDINGIO_HPP + +#include +#include "actions.hpp" + +/// Do not call outside of rbd_replay::PendingIO. +extern "C" +void rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg); + +namespace rbd_replay { + +/** + A PendingIO is an I/O operation that has been started but not completed. 
+*/ +class PendingIO : public boost::enable_shared_from_this { +public: + typedef boost::shared_ptr ptr; + + PendingIO(action_id_t id, + ActionCtx &worker); + + ~PendingIO(); + + action_id_t id() const { + return m_id; + } + + ceph::bufferlist &bufferlist() { + return m_bl; + } + + librbd::RBD::AioCompletion &completion() { + return *m_completion; + } + +private: + void completed(librbd::completion_t cb); + + friend void ::rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg); + + const action_id_t m_id; + ceph::bufferlist m_bl; + librbd::RBD::AioCompletion *m_completion; + ActionCtx &m_worker; +}; + +} + +#endif diff --git a/src/rbd_replay/Replayer.cc b/src/rbd_replay/Replayer.cc new file mode 100644 index 000000000..11e84900d --- /dev/null +++ b/src/rbd_replay/Replayer.cc @@ -0,0 +1,406 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "Replayer.hpp" +#include "common/errno.h" +#include "include/scope_guard.h" +#include "rbd_replay/ActionTypes.h" +#include "rbd_replay/BufferReader.h" +#include +#include +#include +#include +#include +#include "global/global_context.h" +#include "rbd_replay_debug.hpp" + +#define dout_context g_ceph_context + +using namespace rbd_replay; + +namespace { + +bool is_versioned_replay(BufferReader &buffer_reader) { + bufferlist::const_iterator *it; + int r = buffer_reader.fetch(&it); + if (r < 0) { + return false; + } + + if (it->get_remaining() < action::BANNER.size()) { + return false; + } + + std::string banner; + it->copy(action::BANNER.size(), banner); + bool versioned = (banner == action::BANNER); + if (!versioned) { + it->seek(0); + } + return versioned; +} + +} // anonymous namespace + +Worker::Worker(Replayer &replayer) + : m_replayer(replayer), + m_buffer(100), + m_done(false) { +} + +void Worker::start() { + m_thread = std::make_shared(&Worker::run, this); +} + +// Should only be called by StopThreadAction +void Worker::stop() { + m_done = true; +} + +void Worker::join() { + m_thread->join(); +} + +void Worker::send(Action::ptr action) { + ceph_assert(action); + m_buffer.push_front(action); +} + +void Worker::add_pending(PendingIO::ptr io) { + ceph_assert(io); + std::scoped_lock lock{m_pending_ios_mutex}; + assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id()); + m_pending_ios[io->id()] = io; +} + +void Worker::run() { + dout(THREAD_LEVEL) << "Worker thread started" << dendl; + while (!m_done) { + Action::ptr action; + m_buffer.pop_back(&action); + m_replayer.wait_for_actions(action->predecessors()); + action->perform(*this); + m_replayer.set_action_complete(action->id()); + } + { + std::unique_lock lock{m_pending_ios_mutex}; + bool first_time = true; + while (!m_pending_ios.empty()) { + if (!first_time) { + dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to 
complete:" << dendl; + pair p; + BOOST_FOREACH(p, m_pending_ios) { + dout(THREAD_LEVEL) << "> " << p.first << dendl; + } + } + m_pending_ios_empty.wait_for(lock, std::chrono::seconds(1)); + first_time = false; + } + } + dout(THREAD_LEVEL) << "Worker thread stopped" << dendl; +} + + +void Worker::remove_pending(PendingIO::ptr io) { + ceph_assert(io); + m_replayer.set_action_complete(io->id()); + std::scoped_lock lock{m_pending_ios_mutex}; + size_t num_erased = m_pending_ios.erase(io->id()); + assertf(num_erased == 1, "id = %d", io->id()); + if (m_pending_ios.empty()) { + m_pending_ios_empty.notify_all(); + } +} + + +librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) { + return m_replayer.get_image(imagectx_id); +} + + +void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) { + ceph_assert(image); + m_replayer.put_image(imagectx_id, image); +} + + +void Worker::erase_image(imagectx_id_t imagectx_id) { + m_replayer.erase_image(imagectx_id); +} + + +librbd::RBD* Worker::rbd() { + return m_replayer.get_rbd(); +} + + +librados::IoCtx* Worker::ioctx() { + return m_replayer.get_ioctx(); +} + +void Worker::set_action_complete(action_id_t id) { + m_replayer.set_action_complete(id); +} + +bool Worker::readonly() const { + return m_replayer.readonly(); +} + +rbd_loc Worker::map_image_name(string image_name, string snap_name) const { + return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name)); +} + + +Replayer::Replayer(int num_action_trackers) + : m_rbd(NULL), m_ioctx(0), + m_latency_multiplier(1.0), + m_readonly(false), m_dump_perf_counters(false), + m_num_action_trackers(num_action_trackers), + m_action_trackers(new action_tracker_d[m_num_action_trackers]) { + assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers); +} + +Replayer::~Replayer() { + delete[] m_action_trackers; +} + +Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) { + return m_action_trackers[id % m_num_action_trackers]; +} + 
+void Replayer::run(const std::string& replay_file) { + { + librados::Rados rados; + rados.init(NULL); + int r = rados.init_with_context(g_ceph_context); + if (r) { + cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl; + goto out; + } + r = rados.connect(); + if (r) { + cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl; + goto out; + } + + if (m_pool_name.empty()) { + r = rados.conf_get("rbd_default_pool", m_pool_name); + if (r < 0) { + cerr << "Failed to retrieve default pool: " << cpp_strerror(r) + << std::endl; + goto out; + } + } + + m_ioctx = new librados::IoCtx(); + { + r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx); + if (r) { + cerr << "Failed to open pool " << m_pool_name << ": " + << cpp_strerror(r) << std::endl; + goto out2; + } + m_rbd = new librbd::RBD(); + map workers; + + int fd = open(replay_file.c_str(), O_RDONLY|O_BINARY); + if (fd < 0) { + std::cerr << "Failed to open " << replay_file << ": " + << cpp_strerror(errno) << std::endl; + exit(1); + } + auto close_fd = make_scope_guard([fd] { close(fd); }); + + BufferReader buffer_reader(fd); + bool versioned = is_versioned_replay(buffer_reader); + while (true) { + action::ActionEntry action_entry; + try { + bufferlist::const_iterator *it; + int r = buffer_reader.fetch(&it); + if (r < 0) { + std::cerr << "Failed to read from trace file: " << cpp_strerror(r) + << std::endl; + exit(-r); + } + if (it->get_remaining() == 0) { + break; + } + + if (versioned) { + action_entry.decode(*it); + } else { + action_entry.decode_unversioned(*it); + } + } catch (const buffer::error &err) { + std::cerr << "Failed to decode trace action: " << err.what() << std::endl; + exit(1); + } + + Action::ptr action = Action::construct(action_entry); + if (!action) { + // unknown / unsupported action + continue; + } + + if (action->is_start_thread()) { + Worker *worker = new Worker(*this); + workers[action->thread_id()] = worker; + worker->start(); + } else { + 
workers[action->thread_id()]->send(action); + } + } + + dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl; + pair w; + BOOST_FOREACH(w, workers) { + w.second->join(); + delete w.second; + } + clear_images(); + delete m_rbd; + m_rbd = NULL; + } + out2: + delete m_ioctx; + m_ioctx = NULL; + } + out: + ; +} + + +librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) { + std::scoped_lock lock{m_images_mutex}; + return m_images[imagectx_id]; +} + +void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) { + ceph_assert(image); + std::unique_lock lock{m_images_mutex}; + ceph_assert(m_images.count(imagectx_id) == 0); + m_images[imagectx_id] = image; +} + +void Replayer::erase_image(imagectx_id_t imagectx_id) { + std::unique_lock lock{m_images_mutex}; + librbd::Image* image = m_images[imagectx_id]; + if (m_dump_perf_counters) { + string command = "perf dump"; + cmdmap_t cmdmap; + JSONFormatter jf(true); + bufferlist out; + stringstream ss; + g_ceph_context->do_command(command, cmdmap, &jf, ss, &out); + jf.flush(cout); + cout << std::endl; + cout.flush(); + } + delete image; + m_images.erase(imagectx_id); +} + +void Replayer::set_action_complete(action_id_t id) { + dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl; + auto now = std::chrono::system_clock::now(); + action_tracker_d &tracker = tracker_for(id); + std::unique_lock lock{tracker.mutex}; + ceph_assert(tracker.actions.count(id) == 0); + tracker.actions[id] = now; + tracker.condition.notify_all(); +} + +bool Replayer::is_action_complete(action_id_t id) { + action_tracker_d &tracker = tracker_for(id); + std::shared_lock lock{tracker.mutex}; + return tracker.actions.count(id) > 0; +} + +void Replayer::wait_for_actions(const action::Dependencies &deps) { + auto release_time = std::chrono::time_point::min(); + for(auto& dep : deps) { + dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl; + auto start_time = std::chrono::system_clock::now(); + 
action_tracker_d &tracker = tracker_for(dep.id); + std::unique_lock lock{tracker.mutex}; + bool first_time = true; + while (tracker.actions.count(dep.id) == 0) { + if (!first_time) { + dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl; + } + tracker.condition.wait_for(lock, std::chrono::seconds(1)); + first_time = false; + } + auto action_completed_time(tracker.actions[dep.id]); + lock.unlock(); + auto end_time = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast(end_time - start_time).count(); + dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl; + // Apparently the nanoseconds constructor is optional: + // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options + auto sub_release_time{action_completed_time + + std::chrono::microseconds{static_cast(dep.time_delta * m_latency_multiplier / 1000)}}; + if (sub_release_time > release_time) { + release_time = sub_release_time; + } + } + if (release_time > std::chrono::system_clock::now()) { + auto sleep_for = release_time - std::chrono::system_clock::now(); + dout(SLEEP_LEVEL) << "Sleeping for " + << std::chrono::duration_cast(sleep_for).count() + << " microseconds" << dendl; + std::this_thread::sleep_until(release_time); + } +} + +void Replayer::clear_images() { + std::shared_lock lock{m_images_mutex}; + if (m_dump_perf_counters && !m_images.empty()) { + string command = "perf dump"; + cmdmap_t cmdmap; + JSONFormatter jf(true); + bufferlist out; + stringstream ss; + g_ceph_context->do_command(command, cmdmap, &jf, ss, &out); + jf.flush(cout); + cout << std::endl; + cout.flush(); + } + for (auto& p : m_images) { + delete p.second; + } + m_images.clear(); +} + +void Replayer::set_latency_multiplier(float f) { + assertf(f >= 0, "f = %f", f); + m_latency_multiplier = f; +} + +bool Replayer::readonly() const { + return m_readonly; +} + +void Replayer::set_readonly(bool readonly) { + m_readonly = 
readonly; +} + +string Replayer::pool_name() const { + return m_pool_name; +} + +void Replayer::set_pool_name(string pool_name) { + m_pool_name = pool_name; +} diff --git a/src/rbd_replay/Replayer.hpp b/src/rbd_replay/Replayer.hpp new file mode 100644 index 000000000..ddbd05743 --- /dev/null +++ b/src/rbd_replay/Replayer.hpp @@ -0,0 +1,167 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP +#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP + +#include +#include +#include +#include +#include "rbd_replay/ActionTypes.h" +#include "BoundedBuffer.hpp" +#include "ImageNameMap.hpp" +#include "PendingIO.hpp" + +namespace rbd_replay { + +class Replayer; + +/** + Performs Actions within a single thread. 
+ */ +class Worker : public ActionCtx { +public: + explicit Worker(Replayer &replayer); + + void start(); + + /// Should only be called by StopThreadAction + void stop() override; + + void join(); + + void send(Action::ptr action); + + void add_pending(PendingIO::ptr io) override; + + void remove_pending(PendingIO::ptr io) override; + + librbd::Image* get_image(imagectx_id_t imagectx_id) override; + + void put_image(imagectx_id_t imagectx_id, librbd::Image* image) override; + + void erase_image(imagectx_id_t imagectx_id) override; + + librbd::RBD* rbd() override; + + librados::IoCtx* ioctx() override; + + void set_action_complete(action_id_t id) override; + + bool readonly() const override; + + rbd_loc map_image_name(std::string image_name, std::string snap_name) const override; + +private: + void run(); + + Replayer &m_replayer; + BoundedBuffer m_buffer; + std::shared_ptr m_thread; + std::map m_pending_ios; + std::mutex m_pending_ios_mutex; + std::condition_variable_any m_pending_ios_empty; + bool m_done; +}; + + +class Replayer { +public: + explicit Replayer(int num_action_trackers); + + ~Replayer(); + + void run(const std::string &replay_file); + + librbd::RBD* get_rbd() { + return m_rbd; + } + + librados::IoCtx* get_ioctx() { + return m_ioctx; + } + + librbd::Image* get_image(imagectx_id_t imagectx_id); + + void put_image(imagectx_id_t imagectx_id, librbd::Image *image); + + void erase_image(imagectx_id_t imagectx_id); + + void set_action_complete(action_id_t id); + + bool is_action_complete(action_id_t id); + + void wait_for_actions(const action::Dependencies &deps); + + std::string pool_name() const; + + void set_pool_name(std::string pool_name); + + void set_latency_multiplier(float f); + + bool readonly() const; + + void set_readonly(bool readonly); + + void set_image_name_map(const ImageNameMap &map) { + m_image_name_map = map; + } + + void set_dump_perf_counters(bool dump_perf_counters) { + m_dump_perf_counters = dump_perf_counters; + } + + const 
ImageNameMap &image_name_map() const { + return m_image_name_map; + } + +private: + struct action_tracker_d { + /// Maps an action ID to the time the action completed + std::map actions; + std::shared_mutex mutex; + std::condition_variable_any condition; + }; + + void clear_images(); + + action_tracker_d &tracker_for(action_id_t id); + + /// Disallow copying + Replayer(const Replayer& rhs); + /// Disallow assignment + const Replayer& operator=(const Replayer& rhs); + + librbd::RBD* m_rbd; + librados::IoCtx* m_ioctx; + std::string m_pool_name; + float m_latency_multiplier; + bool m_readonly; + ImageNameMap m_image_name_map; + bool m_dump_perf_counters; + + std::map m_images; + std::shared_mutex m_images_mutex; + + /// Actions are hashed across the trackers by ID. + /// Number of trackers should probably be larger than the number of cores and prime. + /// Should definitely be odd. + const int m_num_action_trackers; + action_tracker_d* m_action_trackers; +}; + +} + +#endif diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc new file mode 100644 index 000000000..3a95c399f --- /dev/null +++ b/src/rbd_replay/actions.cc @@ -0,0 +1,249 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
/// Builds the 1 MiB payload used for replayed asynchronous writes.
/// Byte i holds the low 8 bits of i, i.e. the pattern 0x00..0xFF repeated.
/// The buffer is filled in place inside the string, so no 1 MiB array is
/// placed on the calling thread's stack.
std::string create_fake_data() {
  constexpr std::size_t kFakeDataSize = 1 << 20; // 1 MiB
  std::string data(kFakeDataSize, '\0');
  for (std::size_t i = 0; i < data.size(); ++i) {
    data[i] = static_cast<char>(i);
  }
  return data;
}
&action) const { + return Action::ptr(new AioCloseImageAction(action)); + } + + inline Action::ptr operator()(const action::UnknownAction &action) const { + return Action::ptr(); + } +}; + +} // anonymous namespace + +std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) { + return a.dump(o); +} + +Action::ptr Action::construct(const action::ActionEntry &action_entry) { + return boost::apply_visitor(ConstructVisitor(), action_entry.action); +} + +void StartThreadAction::perform(ActionCtx &ctx) { + cerr << "StartThreadAction should never actually be performed" << std::endl; + exit(1); +} + +void StopThreadAction::perform(ActionCtx &ctx) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + ctx.stop(); +} + +void AioReadAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + ceph_assert(image); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion()); + assertf(r >= 0, "id = %d, r = %d", id(), r); +} + +void ReadAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist()); + assertf(r >= 0, "id = %d, r = %d", id(), r); + worker.remove_pending(io); +} + +void AioWriteAction::perform(ActionCtx &worker) { + static const std::string fake_data(create_fake_data()); + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + uint64_t remaining = m_action.length; + while (remaining > 0) { + uint64_t n = std::min(remaining, 
(uint64_t)fake_data.length()); + io->bufferlist().append(fake_data.data(), n); + remaining -= n; + } + worker.add_pending(io); + if (worker.readonly()) { + worker.remove_pending(io); + } else { + int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion()); + assertf(r >= 0, "id = %d, r = %d", id(), r); + } +} + +void WriteAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + io->bufferlist().append_zero(m_action.length); + if (!worker.readonly()) { + ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist()); + assertf(r >= 0, "id = %d, r = %d", id(), r); + } + worker.remove_pending(io); +} + +void AioDiscardAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + if (worker.readonly()) { + worker.remove_pending(io); + } else { + int r = image->aio_discard(m_action.offset, m_action.length, &io->completion()); + assertf(r >= 0, "id = %d, r = %d", id(), r); + } +} + +void DiscardAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + librbd::Image *image = worker.get_image(m_action.imagectx_id); + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + if (!worker.readonly()) { + ssize_t r = image->discard(m_action.offset, m_action.length); + assertf(r >= 0, "id = %d, r = %d", id(), r); + } + worker.remove_pending(io); +} + +void OpenImageAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + librbd::Image *image = new librbd::Image(); + 
librbd::RBD *rbd = worker.rbd(); + rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name)); + int r; + if (m_action.read_only || worker.readonly()) { + r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); + } else { + r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); + } + if (r) { + cerr << "Unable to open image '" << m_action.name + << "' with snap '" << m_action.snap_name + << "' (mapped to '" << name.str() + << "') and readonly " << m_action.read_only + << ": (" << -r << ") " << strerror(-r) << std::endl; + exit(1); + } + worker.put_image(m_action.imagectx_id, image); + worker.remove_pending(io); +} + +void CloseImageAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + worker.erase_image(m_action.imagectx_id); + worker.set_action_complete(pending_io_id()); +} + +void AioOpenImageAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + // TODO: Make it async + PendingIO::ptr io(new PendingIO(pending_io_id(), worker)); + worker.add_pending(io); + librbd::Image *image = new librbd::Image(); + librbd::RBD *rbd = worker.rbd(); + rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name)); + int r; + if (m_action.read_only || worker.readonly()) { + r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); + } else { + r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str()); + } + if (r) { + cerr << "Unable to open image '" << m_action.name + << "' with snap '" << m_action.snap_name + << "' (mapped to '" << name.str() + << "') and readonly " << m_action.read_only + << ": (" << -r << ") " << strerror(-r) << std::endl; + exit(1); + } + worker.put_image(m_action.imagectx_id, image); + worker.remove_pending(io); +} + +void AioCloseImageAction::perform(ActionCtx &worker) { + dout(ACTION_LEVEL) << "Performing " << *this << dendl; + // TODO: Make it 
async + worker.erase_image(m_action.imagectx_id); + worker.set_action_complete(pending_io_id()); +} diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp new file mode 100644 index 000000000..89e483158 --- /dev/null +++ b/src/rbd_replay/actions.hpp @@ -0,0 +1,344 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_ACTIONS_HPP +#define _INCLUDED_RBD_REPLAY_ACTIONS_HPP + +#include +#include "include/rbd/librbd.hpp" +#include "common/Formatter.h" +#include "rbd_replay/ActionTypes.h" +#include "rbd_loc.hpp" +#include + +// Stupid Doxygen requires this or else the typedef docs don't appear anywhere. +/// @file rbd_replay/actions.hpp + +namespace rbd_replay { + +typedef uint64_t imagectx_id_t; +typedef uint64_t thread_id_t; + +/// Even IDs are normal actions, odd IDs are completions. +typedef uint32_t action_id_t; + +class PendingIO; + +/** + %Context through which an Action interacts with its environment. + */ +class ActionCtx { +public: + virtual ~ActionCtx() { + } + + /** + Returns the image with the given ID. + The image must have been previously tracked with put_image(imagectx_id_t,librbd::Image*). + */ + virtual librbd::Image* get_image(imagectx_id_t imagectx_id) = 0; + + /** + Tracks an image. + put_image(imagectx_id_t,librbd::Image*) must not have been called previously with the same ID, + and the image must not be NULL. + */ + virtual void put_image(imagectx_id_t imagectx_id, librbd::Image* image) = 0; + + /** + Stops tracking an Image and release it. + This deletes the C++ object, not the image itself. 
+ The image must have been previously tracked with put_image(imagectx_id_t,librbd::Image*). + */ + virtual void erase_image(imagectx_id_t imagectx_id) = 0; + + virtual librbd::RBD* rbd() = 0; + + virtual librados::IoCtx* ioctx() = 0; + + virtual void add_pending(boost::shared_ptr io) = 0; + + virtual bool readonly() const = 0; + + virtual void remove_pending(boost::shared_ptr io) = 0; + + virtual void set_action_complete(action_id_t id) = 0; + + virtual void stop() = 0; + + /** + Maps an image name from the name in the original trace to the name that should be used when replaying. + @param image_name name of the image in the original trace + @param snap_name name of the snap in the original trace + @return image name to replay against + */ + virtual rbd_loc map_image_name(std::string image_name, std::string snap_name) const = 0; +}; + + +/** + Performs an %IO or a maintenance action such as starting or stopping a thread. + Actions are read from a replay file and scheduled by Replayer. + Corresponds to the IO class, except that Actions are executed by rbd-replay, + and IOs are used by rbd-replay-prep for processing the raw trace. + */ +class Action { +public: + typedef boost::shared_ptr ptr; + + virtual ~Action() { + } + + virtual void perform(ActionCtx &ctx) = 0; + + /// Returns the ID of the completion corresponding to this action. + action_id_t pending_io_id() { + return id() + 1; + } + + // There's probably a better way to do this, but oh well. 
+ virtual bool is_start_thread() { + return false; + } + + virtual action_id_t id() const = 0; + virtual thread_id_t thread_id() const = 0; + virtual const action::Dependencies& predecessors() const = 0; + + virtual std::ostream& dump(std::ostream& o) const = 0; + + static ptr construct(const action::ActionEntry &action_entry); +}; + +template +class TypedAction : public Action { +public: + explicit TypedAction(const ActionType &action) : m_action(action) { + } + + action_id_t id() const override { + return m_action.id; + } + + thread_id_t thread_id() const override { + return m_action.thread_id; + } + + const action::Dependencies& predecessors() const override { + return m_action.dependencies; + } + + std::ostream& dump(std::ostream& o) const override { + o << get_action_name() << ": "; + ceph::JSONFormatter formatter(false); + formatter.open_object_section(""); + m_action.dump(&formatter); + formatter.close_section(); + formatter.flush(o); + return o; + } + +protected: + const ActionType m_action; + + virtual const char *get_action_name() const = 0; +}; + +/// Writes human-readable debug information about the action to the stream. 
+/// @related Action
+std::ostream& operator<<(std::ostream& o, const Action& a);
+
+/// Action for an action::StartThreadAction record.
+/// Overrides is_start_thread() to return true.
+class StartThreadAction : public TypedAction<action::StartThreadAction> {
+public:
+  explicit StartThreadAction(const action::StartThreadAction &action)
+    : TypedAction<action::StartThreadAction>(action) {
+  }
+
+  bool is_start_thread() override {
+    return true;
+  }
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "StartThreadAction";
+  }
+};
+
+/// Action for an action::StopThreadAction record.
+class StopThreadAction : public TypedAction<action::StopThreadAction> {
+public:
+  explicit StopThreadAction(const action::StopThreadAction &action)
+    : TypedAction<action::StopThreadAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  // The name is the prefix emitted by TypedAction::dump(); it must identify
+  // this class and not StartThreadAction.
+  const char *get_action_name() const override {
+    return "StopThreadAction";
+  }
+};
+
+
+/// Action for an action::AioReadAction record.
+class AioReadAction : public TypedAction<action::AioReadAction> {
+public:
+  explicit AioReadAction(const action::AioReadAction &action)
+    : TypedAction<action::AioReadAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "AioReadAction";
+  }
+};
+
+
+/// Action for an action::ReadAction record.
+class ReadAction : public TypedAction<action::ReadAction> {
+public:
+  explicit ReadAction(const action::ReadAction &action)
+    : TypedAction<action::ReadAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "ReadAction";
+  }
+};
+
+
+/// Action for an action::AioWriteAction record.
+class AioWriteAction : public TypedAction<action::AioWriteAction> {
+public:
+  explicit AioWriteAction(const action::AioWriteAction &action)
+    : TypedAction<action::AioWriteAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "AioWriteAction";
+  }
+};
+
+
+/// Action for an action::WriteAction record.
+class WriteAction : public TypedAction<action::WriteAction> {
+public:
+  explicit WriteAction(const action::WriteAction &action)
+    : TypedAction<action::WriteAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "WriteAction";
+  }
+};
+
+
+/// Action for an action::AioDiscardAction record.
+class AioDiscardAction : public TypedAction<action::AioDiscardAction> {
+public:
+  explicit AioDiscardAction(const action::AioDiscardAction &action)
+    : TypedAction<action::AioDiscardAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "AioDiscardAction";
+  }
+};
+
+
+/// Action for an action::DiscardAction record.
+class DiscardAction : public TypedAction<action::DiscardAction> {
+public:
+  explicit DiscardAction(const action::DiscardAction &action)
+    : TypedAction<action::DiscardAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "DiscardAction";
+  }
+};
+
+
+/// Action for an action::OpenImageAction record.
+class OpenImageAction : public TypedAction<action::OpenImageAction> {
+public:
+  explicit OpenImageAction(const action::OpenImageAction &action)
+    : TypedAction<action::OpenImageAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "OpenImageAction";
+  }
+};
+
+
+/// Action for an action::CloseImageAction record.
+class CloseImageAction : public TypedAction<action::CloseImageAction> {
+public:
+  explicit CloseImageAction(const action::CloseImageAction &action)
+    : TypedAction<action::CloseImageAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "CloseImageAction";
+  }
+};
+
+/// Action for an action::AioOpenImageAction record.
+class AioOpenImageAction : public TypedAction<action::AioOpenImageAction> {
+public:
+  explicit AioOpenImageAction(const action::AioOpenImageAction &action)
+    : TypedAction<action::AioOpenImageAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "AioOpenImageAction";
+  }
+};
+
+
+/// Action for an action::AioCloseImageAction record.
+class AioCloseImageAction : public TypedAction<action::AioCloseImageAction> {
+public:
+  explicit AioCloseImageAction(const action::AioCloseImageAction &action)
+    : TypedAction<action::AioCloseImageAction>(action) {
+  }
+
+  void perform(ActionCtx &ctx) override;
+
+protected:
+  const char *get_action_name() const override {
+    return "AioCloseImageAction";
+  }
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc
new file mode 100644
index 000000000..77b4f4858
--- /dev/null
+++ b/src/rbd_replay/ios.cc
@@ -0,0 +1,220 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable 
distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. + +#include "ios.hpp" +#include "rbd_replay/ActionTypes.h" + +using namespace std; +using namespace rbd_replay; + +namespace { + +bool compare_dependencies_by_start_time(const action::Dependency &lhs, + const action::Dependency &rhs) { + return lhs.time_delta < rhs.time_delta; +} + +action::Dependencies convert_dependencies(uint64_t start_time, + const io_set_t &deps) { + action::Dependencies action_deps; + action_deps.reserve(deps.size()); + for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) { + boost::shared_ptr io = *it; + uint64_t time_delta = 0; + if (start_time >= io->start_time()) { + time_delta = start_time - io->start_time(); + } + action_deps.push_back(action::Dependency(io->ionum(), time_delta)); + } + std::sort(action_deps.begin(), action_deps.end(), + compare_dependencies_by_start_time); + return action_deps; +} + +} // anonymous namespace + +void IO::write_debug_base(ostream& out, const string &type) const { + out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; + bool first = true; + for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { + if (first) { + first = false; + } else { + out << ", "; + } + out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time; + } + out << "}"; +} + + +ostream& operator<<(ostream &out, const IO::ptr &io) { + io->write_debug(out); + return out; +} + +void StartThreadIO::encode(bufferlist &bl) const { + using ceph::encode; + 
action::Action action((action::StartThreadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies())))); + encode(action, bl); +} + +void StartThreadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "start thread"); +} + +void StopThreadIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::StopThreadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies())))); + encode(action, bl); +} + +void StopThreadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "stop thread"); +} + +void ReadIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::ReadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void ReadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "read"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void WriteIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::WriteAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void WriteIO::write_debug(std::ostream& out) const { + write_debug_base(out, "write"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void DiscardIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::DiscardAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void DiscardIO::write_debug(std::ostream& out) const { + write_debug_base(out, "discard"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void AioReadIO::encode(bufferlist &bl) const { + using 
ceph::encode; + action::Action action((action::AioReadAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void AioReadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio read"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void AioWriteIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::AioWriteAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void AioWriteIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio write"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void AioDiscardIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::AioDiscardAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_offset, m_length))); + encode(action, bl); +} + +void AioDiscardIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio discard"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void OpenImageIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::OpenImageAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_name, m_snap_name, m_readonly))); + encode(action, bl); +} + +void OpenImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "open image"); + out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; +} + +void CloseImageIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::CloseImageAction( + ionum(), thread_id(), 
convert_dependencies(start_time(), dependencies()), + m_imagectx))); + encode(action, bl); +} + +void CloseImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "close image"); + out << ", imagectx=" << m_imagectx; +} + +void AioOpenImageIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::AioOpenImageAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx, m_name, m_snap_name, m_readonly))); + encode(action, bl); +} + +void AioOpenImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio open image"); + out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; +} + +void AioCloseImageIO::encode(bufferlist &bl) const { + using ceph::encode; + action::Action action((action::AioCloseImageAction( + ionum(), thread_id(), convert_dependencies(start_time(), dependencies()), + m_imagectx))); + encode(action, bl); +} + +void AioCloseImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio close image"); + out << ", imagectx=" << m_imagectx; +} diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp new file mode 100644 index 000000000..8a105afd2 --- /dev/null +++ b/src/rbd_replay/ios.hpp @@ -0,0 +1,401 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_IOS_HPP +#define _INCLUDED_RBD_REPLAY_IOS_HPP + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. 
+ +#include "include/buffer_fwd.h" +#include +#include +#include +#include +#include +#include +#include "actions.hpp" + + +namespace rbd_replay { + +class IO; + +typedef std::set > io_set_t; + +typedef std::map > io_map_t; + +/** + Used by rbd-replay-prep for processing the raw trace. + Corresponds to the Action class, except that Actions are executed by rbd-replay, + and IOs are used by rbd-replay-prep for processing the raw trace. + */ +class IO : public boost::enable_shared_from_this { +public: + typedef boost::shared_ptr ptr; + typedef std::vector ptrs; + + /** + @param ionum ID of this %IO + @param start_time time the %IO started, in nanoseconds + @param thread_id ID of the thread that issued the %IO + */ + IO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps) + : m_ionum(ionum), + m_start_time(start_time), + m_dependencies(deps), + m_thread_id(thread_id), + m_completed(false) { + } + + virtual ~IO() { + } + + uint64_t start_time() const { + return m_start_time; + } + + io_set_t& dependencies() { + return m_dependencies; + } + + const io_set_t& dependencies() const { + return m_dependencies; + } + + virtual void encode(bufferlist &bl) const = 0; + + void set_ionum(action_id_t ionum) { + m_ionum = ionum; + } + + action_id_t ionum() const { + return m_ionum; + } + + thread_id_t thread_id() const { + return m_thread_id; + } + + virtual void write_debug(std::ostream& out) const = 0; + +protected: + void write_debug_base(std::ostream& out, const std::string &iotype) const; + +private: + action_id_t m_ionum; + uint64_t m_start_time; + io_set_t m_dependencies; + thread_id_t m_thread_id; + bool m_completed; +}; + +/// Used for dumping debug info. 
+/// @related IO +std::ostream& operator<<(std::ostream &out, const IO::ptr &io); + + +class StartThreadIO : public IO { +public: + StartThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, io_set_t()) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; +}; + +class StopThreadIO : public IO { +public: + StopThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps) + : IO(ionum, start_time, thread_id, deps) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; +}; + +class ReadIO : public IO { +public: + ReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class WriteIO : public IO { +public: + WriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class DiscardIO : public IO { +public: + DiscardIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + 
m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class AioReadIO : public IO { +public: + AioReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class AioWriteIO : public IO { +public: + AioWriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class AioDiscardIO : public IO { +public: + AioDiscardIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void encode(bufferlist &bl) const override; + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class OpenImageIO : public IO { +public: + OpenImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + const std::string& name, + const std::string& snap_name, + bool 
readonly) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_name(name), + m_snap_name(snap_name), + m_readonly(readonly) { + } + + void encode(bufferlist &bl) const override; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + std::string m_name; + std::string m_snap_name; + bool m_readonly; +}; + +class CloseImageIO : public IO { +public: + CloseImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx) { + } + + void encode(bufferlist &bl) const override; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; +}; + +class AioOpenImageIO : public IO { +public: + AioOpenImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx, + const std::string& name, + const std::string& snap_name, + bool readonly) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx), + m_name(name), + m_snap_name(snap_name), + m_readonly(readonly) { + } + + void encode(bufferlist &bl) const override; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const override; + +private: + imagectx_id_t m_imagectx; + std::string m_name; + std::string m_snap_name; + bool m_readonly; +}; + +class AioCloseImageIO : public IO { +public: + AioCloseImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + const io_set_t& deps, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, deps), + m_imagectx(imagectx) { + } + + void encode(bufferlist &bl) const override; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const override; + +private: + 
imagectx_id_t m_imagectx; +}; + +} + +#endif diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc new file mode 100644 index 000000000..977c7c19e --- /dev/null +++ b/src/rbd_replay/rbd-replay-prep.cc @@ -0,0 +1,584 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. + +#include "include/compat.h" +#include "common/errno.h" +#include "rbd_replay/ActionTypes.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "ios.hpp" + +using namespace std; +using namespace rbd_replay; + +#define ASSERT_EXIT(check, str) \ + if (!(check)) { \ + std::cerr << str << std::endl; \ + exit(1); \ + } + +class Thread { +public: + typedef boost::shared_ptr ptr; + + Thread(thread_id_t id, + uint64_t window) + : m_id(id), + m_window(window), + m_latest_io(IO::ptr()), + m_max_ts(0) { + } + + void insert_ts(uint64_t ts) { + if (m_max_ts == 0 || ts > m_max_ts) { + m_max_ts = ts; + } + } + + uint64_t max_ts() const { + return m_max_ts; + } + + void issued_io(IO::ptr io, std::set *latest_ios) { + ceph_assert(io); + if (m_latest_io.get() != NULL) { + latest_ios->erase(m_latest_io); + } + m_latest_io = io; + latest_ios->insert(io); + } + + thread_id_t id() const { + return m_id; + } + + IO::ptr latest_io() { + return m_latest_io; + } + +private: + thread_id_t m_id; + uint64_t m_window; + IO::ptr m_latest_io; + uint64_t m_max_ts; +}; + +class AnonymizedImage { +public: + 
void init(const string &image_name, int index) {
+    // May only be initialised once: the original name must still be empty.
+    ceph_assert(m_image_name == "");
+    m_image_name = image_name;
+    ostringstream oss;
+    oss << "image" << index;
+    m_anonymized_image_name = oss.str();
+  }
+
+  string image_name() const {
+    return m_image_name;
+  }
+
+  /// Returns (anonymized image name, anonymized snap name) for snap_name.
+  /// An empty snap name maps to an empty snap name.  A snap name seen for
+  /// the first time is assigned "snap<N>", where N is the size of the snap
+  /// map after the new entry has been inserted.
+  pair<string, string> anonymize(string snap_name) {
+    if (snap_name == "") {
+      return pair<string, string>(m_anonymized_image_name, "");
+    }
+    // operator[] inserts an empty entry for an unseen snap name.
+    string& anonymized_snap_name(m_snaps[snap_name]);
+    if (anonymized_snap_name == "") {
+      ostringstream oss;
+      oss << "snap" << m_snaps.size();
+      anonymized_snap_name = oss.str();
+    }
+    return pair<string, string>(m_anonymized_image_name, anonymized_snap_name);
+  }
+
+private:
+  string m_image_name;
+  string m_anonymized_image_name;
+  map<string, string> m_snaps;
+};
+
+// Prints the command-line synopsis to stdout.
+// NOTE(review): the option value / positional placeholders in these string
+// literals look like they were lost (e.g. "--window" with no value name and an
+// empty second line) -- confirm the intended text before changing it.
+static void usage(const string &prog) {
+  std::stringstream str;
+  str << "Usage: " << prog << " ";
+  std::cout << str.str() << "[ --window ] [ --anonymize ] [ --verbose ]" << std::endl
+            << std::string(str.str().size(), ' ') << " " << endl;
+}
+
+// Prints msg to stderr followed by the synopsis, then exits with status 1.
+__attribute__((noreturn)) static void usage_exit(const string &prog, const string &msg) {
+  cerr << msg << endl;
+  usage(prog);
+  exit(1);
+}
+
+class Processor {
+public:
+  Processor()
+    : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
+      m_io_count(0),
+      m_anonymize(false),
+      m_verbose(false) {
+  }
+
+  /**
+     Parses the command line, reads the CTF trace and writes the replay file.
+
+     Recognised arguments: "--window <seconds>" / "--window=<seconds>"
+     (stored in nanoseconds), "--anonymize", "--verbose", "-h" / "--help",
+     followed by the input trace path and the output file path.
+     Any usage error, trace error or write error terminates the process.
+     The output file is created with O_EXCL, so an existing file is an error.
+
+     @param args full argument vector; args[0] is the program name
+   */
+  void run(vector<string> args) {
+    string input_file_name;
+    string output_file_name;
+    bool got_input = false;
+    bool got_output = false;
+    for (int i = 1, nargs = args.size(); i < nargs; i++) {
+      const string& arg(args[i]);
+      if (arg == "--window") {
+	if (i == nargs - 1) {
+	  usage_exit(args[0], "--window requires an argument");
+	}
+	m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
+      } else if (arg.compare(0, 9, "--window=") == 0) {
+	// sizeof() of a string literal counts its terminating NUL, so subtract
+	// one to land on the first character of the value rather than the second.
+	m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=") - 1));
+      } else if (arg == "--anonymize") {
+	m_anonymize = true;
+      } else if (arg == "--verbose") {
+	m_verbose = true;
+      } else if (arg == "-h" || arg == "--help") {
+	usage(args[0]);
+	exit(0);
+      } else if (arg.compare(0, 1, "-") == 0) {
+	usage_exit(args[0], "Unrecognized argument: " + arg);
+      } else if (!got_input) {
+	input_file_name = arg;
+	got_input = true;
+      } else if (!got_output) {
+	output_file_name = arg;
+	got_output = true;
+      } else {
+	usage_exit(args[0], "Too many arguments");
+      }
+    }
+    if (!got_output) {
+      usage_exit(args[0], "Not enough arguments");
+    }
+
+    struct bt_context *ctx = bt_context_create();
+    int trace_handle = bt_context_add_trace(ctx,
+					    input_file_name.c_str(), // path
+					    "ctf", // format
+					    NULL, // packet_seek
+					    NULL, // stream_list
+					    NULL); // metadata
+    ASSERT_EXIT(trace_handle >= 0, "Error loading trace file");
+
+    uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
+    ASSERT_EXIT(start_time_ns != -1ULL,
+                "Error extracting creation time from trace");
+
+    struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
+						 NULL, // begin_pos
+						 NULL); // end_pos
+    ceph_assert(itr);
+
+    struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
+
+    int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0644);
+    ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
+                         ": " << cpp_strerror(errno));
+    BOOST_SCOPE_EXIT( (fd) ) {
+      close(fd);
+    } BOOST_SCOPE_EXIT_END;
+
+    write_banner(fd);
+
+    uint64_t trace_start = 0;
+    bool first = true;
+    while(true) {
+      struct bt_ctf_event *evt = bt_ctf_iter_read_event(itr);
+      if(!evt) {
+	break;
+      }
+      uint64_t ts = bt_ctf_get_timestamp(evt);
+      ASSERT_EXIT(ts != -1ULL, "Error extracting event timestamp");
+
+      // Timestamps are rebased so that the first event of the trace is time 0.
+      if (first) {
+	trace_start = ts;
+	first = false;
+      }
+      ts -= trace_start;
+      ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
+
+      IO::ptrs ptrs;
+      process_event(ts, evt, &ptrs);
+      serialize_events(fd, ptrs);
+
+      int r = bt_iter_next(bt_itr);
+      ASSERT_EXIT(r == 0, "Error advancing event iterator");
+    }
+
+    bt_ctf_iter_destroy(itr);
+
+    insert_thread_stops(fd);
+  }
+
+private:
+  // Writes rbd_replay::action::BANNER to the output file.
+  void write_banner(int fd) {
+    bufferlist bl;
+    bl.append(rbd_replay::action::BANNER);
+    int r = bl.write_fd(fd);
+    ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+  }
+
+  // Encodes every IO in ptrs and writes it to fd, in order.
+  // With --verbose each IO's debug description is also printed to stdout.
+  void serialize_events(int fd, const IO::ptrs &ptrs) {
+    for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
+      IO::ptr io(*it);
+
+      bufferlist bl;
+      io->encode(bl);
+
+      int r = bl.write_fd(fd);
+      ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+
+      if (m_verbose) {
+        io->write_debug(std::cout);
+        std::cout << std::endl;
+      }
+    }
+  }
+
+  // Emits one StopThreadIO per known thread, stamped with that thread's
+  // largest recorded timestamp and depending on m_recent_completions.
+  void insert_thread_stops(int fd) {
+    IO::ptrs ios;
+    for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
+	   end = m_threads.end(); itr != end; ++itr) {
+      Thread::ptr thread(itr->second);
+      ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
+                                             thread->id(),
+                                             m_recent_completions)));
+    }
+    serialize_events(fd, ios);
+  }
+
+  void process_event(uint64_t ts, struct bt_ctf_event *evt,
+                     IO::ptrs *ios) {
+    const char *event_name = bt_ctf_event_name(evt);
+    const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
+									   BT_STREAM_EVENT_CONTEXT);
+    ASSERT_EXIT(scope_context != NULL, "Error retrieving event context");
+
+    const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
+									  BT_EVENT_FIELDS);
+    ASSERT_EXIT(scope_fields != NULL, "Error retrieving event fields");
+
+    const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
+    ASSERT_EXIT(pthread_id_field != NULL, "Error retrieving thread id");
+
+    thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
+    Thread::ptr &thread(m_threads[threadID]);
+    if (!thread) {
+      thread.reset(new Thread(threadID, m_window));
+
IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID)); + ios->push_back(io); + } + thread->insert_ts(ts); + + class FieldLookup { + public: + FieldLookup(struct bt_ctf_event *evt, + const struct bt_definition *scope) + : m_evt(evt), + m_scope(scope) { + } + + const char* string(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'"); + + const char* c = bt_ctf_get_string(field); + int err = bt_ctf_field_get_error(); + ASSERT_EXIT(c && err == 0, "Error retrieving field value '" << name << + "': error=" << err); + return c; + } + + int64_t int64(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'"); + + int64_t val = bt_ctf_get_int64(field); + int err = bt_ctf_field_get_error(); + ASSERT_EXIT(err == 0, "Error retrieving field value '" << name << + "': error=" << err); + return val; + } + + uint64_t uint64(const char* name) { + const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name); + ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'"); + + uint64_t val = bt_ctf_get_uint64(field); + int err = bt_ctf_field_get_error(); + ASSERT_EXIT(err == 0, "Error retrieving field value '" << name << + "': error=" << err); + return val; + } + + private: + struct bt_ctf_event *m_evt; + const struct bt_definition *m_scope; + } fields(evt, scope_fields); + + if (strcmp(event_name, "librbd:open_image_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.uint64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + action_id_t ionum = next_id(); + pair aname(map_image_snap(name, snap_name)); + IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions, + imagectx, aname.first, aname.second, + readonly)); + 
thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + } else if (strcmp(event_name, "librbd:open_image_exit") == 0) { + completed(thread->latest_io()); + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->latest_io())); + ceph_assert(io); + m_open_images.insert(io->imagectx()); + } else if (strcmp(event_name, "librbd:close_image_enter") == 0) { + imagectx_id_t imagectx = fields.uint64("imagectx"); + action_id_t ionum = next_id(); + IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions, + imagectx)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(thread->latest_io()); + } else if (strcmp(event_name, "librbd:close_image_exit") == 0) { + completed(thread->latest_io()); + boost::shared_ptr io(boost::dynamic_pointer_cast(thread->latest_io())); + ceph_assert(io); + m_open_images.erase(io->imagectx()); + } else if (strcmp(event_name, "librbd:aio_open_image_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.uint64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + uint64_t completion = fields.uint64("completion"); + action_id_t ionum = next_id(); + pair aname(map_image_snap(name, snap_name)); + IO::ptr io(new AioOpenImageIO(ionum, ts, threadID, m_recent_completions, + imagectx, aname.first, aname.second, + readonly)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_close_image_enter") == 0) { + imagectx_id_t imagectx = fields.uint64("imagectx"); + uint64_t completion = fields.uint64("completion"); + action_id_t ionum = next_id(); + IO::ptr io(new AioCloseImageIO(ionum, ts, threadID, m_recent_completions, + imagectx)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(thread->latest_io()); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:read_enter") == 0 || + strcmp(event_name, "librbd:read2_enter") == 0) { + string 
name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + uint64_t offset = fields.uint64("offset"); + uint64_t length = fields.uint64("length"); + require_image(ts, thread, imagectx, name, snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx, + offset, length)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + } else if (strcmp(event_name, "librbd:read_exit") == 0) { + completed(thread->latest_io()); + } else if (strcmp(event_name, "librbd:write_enter") == 0 || + strcmp(event_name, "librbd:write2_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("buf_len"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + } else if (strcmp(event_name, "librbd:write_exit") == 0) { + completed(thread->latest_io()); + } else if (strcmp(event_name, "librbd:discard_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("len"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new DiscardIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + } else if (strcmp(event_name, 
"librbd:discard_exit") == 0) { + completed(thread->latest_io()); + } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 || + strcmp(event_name, "librbd:aio_read2_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t completion = fields.uint64("completion"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + uint64_t offset = fields.uint64("offset"); + uint64_t length = fields.uint64("length"); + require_image(ts, thread, imagectx, name, snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + ios->push_back(io); + thread->issued_io(io, &m_latest_ios); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 || + strcmp(event_name, "librbd:aio_write2_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("len"); + uint64_t completion = fields.uint64("completion"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_discard_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + uint64_t offset = fields.uint64("off"); + uint64_t length = fields.uint64("len"); + uint64_t completion = fields.uint64("completion"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + require_image(ts, thread, imagectx, name, 
snap_name, readonly, ios); + action_id_t ionum = next_id(); + IO::ptr io(new AioDiscardIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + m_pending_ios[completion] = io; + } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) { + uint64_t completion = fields.uint64("completion"); + map::iterator itr = m_pending_ios.find(completion); + if (itr != m_pending_ios.end()) { + IO::ptr completedIO(itr->second); + m_pending_ios.erase(itr); + completed(completedIO); + } + } + } + + action_id_t next_id() { + action_id_t id = m_io_count; + m_io_count += 2; + return id; + } + + void completed(IO::ptr io) { + uint64_t limit = (io->start_time() < m_window ? + 0 : io->start_time() - m_window); + for (io_set_t::iterator itr = m_recent_completions.begin(); + itr != m_recent_completions.end(); ) { + IO::ptr recent_comp(*itr); + if ((recent_comp->start_time() < limit || + io->dependencies().count(recent_comp) != 0) && + m_latest_ios.count(recent_comp) == 0) { + m_recent_completions.erase(itr++); + } else { + ++itr; + } + } + m_recent_completions.insert(io); + } + + pair map_image_snap(string image_name, string snap_name) { + if (!m_anonymize) { + return pair(image_name, snap_name); + } + AnonymizedImage& m(m_anonymized_images[image_name]); + if (m.image_name() == "") { + m.init(image_name, m_anonymized_images.size()); + } + return m.anonymize(snap_name); + } + + void require_image(uint64_t ts, + Thread::ptr thread, + imagectx_id_t imagectx, + const string& name, + const string& snap_name, + bool readonly, + IO::ptrs *ios) { + ceph_assert(thread); + if (m_open_images.count(imagectx) > 0) { + return; + } + action_id_t ionum = next_id(); + pair aname(map_image_snap(name, snap_name)); + IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), + m_recent_completions, imagectx, aname.first, + aname.second, readonly)); + thread->issued_io(io, &m_latest_ios); + ios->push_back(io); + 
completed(io); + m_open_images.insert(imagectx); + } + + uint64_t m_window; + map m_threads; + uint32_t m_io_count; + io_set_t m_recent_completions; + set m_open_images; + + // keyed by completion + map m_pending_ios; + std::set m_latest_ios; + + bool m_anonymize; + map m_anonymized_images; + + bool m_verbose; +}; + +int main(int argc, char** argv) { + vector args; + for (int i = 0; i < argc; i++) { + args.push_back(string(argv[i])); + } + + Processor p; + p.run(args); +} diff --git a/src/rbd_replay/rbd-replay.cc b/src/rbd_replay/rbd-replay.cc new file mode 100644 index 000000000..f52e75fd9 --- /dev/null +++ b/src/rbd_replay/rbd-replay.cc @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include "common/ceph_argparse.h" +#include "global/global_init.h" +#include "Replayer.hpp" +#include "rbd_replay_debug.hpp" +#include "ImageNameMap.hpp" + + +using namespace rbd_replay; + + +static const char* get_remainder(const char *string, const char *prefix) { + while (*prefix) { + if (*prefix++ != *string++) { + return NULL; + } + } + return string; +} + +static void usage(const char* program) { + cout << "Usage: " << program << " --conf= " << std::endl; + cout << "Options:" << std::endl; + cout << " -p, --pool-name Name of the pool to use. Default: rbd" << std::endl; + cout << " --latency-multiplier Multiplies inter-request latencies. Default: 1" << std::endl; + cout << " --read-only Only perform non-destructive operations." << std::endl; + cout << " --map-image Add a rule to map image names in the trace to" << std::endl; + cout << " image names in the replay cluster." 
<< std::endl; + cout << " --dump-perf-counters *Experimental*" << std::endl; + cout << " Dump performance counters to standard out before" << std::endl; + cout << " an image is closed. Performance counters may be dumped" << std::endl; + cout << " multiple times if multiple images are closed, or if" << std::endl; + cout << " the same image is opened and closed multiple times." << std::endl; + cout << " Performance counters and their meaning may change between" << std::endl; + cout << " versions." << std::endl; + cout << std::endl; + cout << "Image mapping rules:" << std::endl; + cout << "A rule of image1@snap1=image2@snap2 would map snap1 of image1 to snap2 of" << std::endl; + cout << "image2." << std::endl; +} + +int main(int argc, const char **argv) { + vector args; + + argv_to_vec(argc, argv, args); + if (args.empty()) { + cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(argv[0]); + exit(0); + } + auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, 0); + + std::vector::iterator i; + string pool_name; + float latency_multiplier = 1; + bool readonly = false; + ImageNameMap image_name_map; + std::string val; + std::ostringstream err; + bool dump_perf_counters = false; + for (i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "-p", "--pool", (char*)NULL)) { + pool_name = val; + } else if (ceph_argparse_witharg(args, i, &latency_multiplier, err, "--latency-multiplier", + (char*)NULL)) { + if (!err.str().empty()) { + cerr << err.str() << std::endl; + return 1; + } + } else if (ceph_argparse_flag(args, i, "--read-only", (char*)NULL)) { + readonly = true; + } else if (ceph_argparse_witharg(args, i, &val, "--map-image", (char*)NULL)) { + ImageNameMap::Mapping mapping; + if (image_name_map.parse_mapping(val, &mapping)) { + image_name_map.add_mapping(mapping); + } else { + 
cerr << "Unable to parse mapping string: '" << val << "'" << std::endl; + return 1; + } + } else if (ceph_argparse_flag(args, i, "--dump-perf-counters", (char*)NULL)) { + dump_perf_counters = true; + } else if (get_remainder(*i, "-")) { + cerr << "Unrecognized argument: " << *i << std::endl; + return 1; + } else { + ++i; + } + } + + common_init_finish(g_ceph_context); + + string replay_file; + if (!args.empty()) { + replay_file = args[0]; + } + + if (replay_file.empty()) { + cerr << "No replay file specified." << std::endl; + return 1; + } + + unsigned int nthreads = boost::thread::hardware_concurrency(); + Replayer replayer(2 * nthreads + 1); + replayer.set_latency_multiplier(latency_multiplier); + replayer.set_pool_name(pool_name); + replayer.set_readonly(readonly); + replayer.set_image_name_map(image_name_map); + replayer.set_dump_perf_counters(dump_perf_counters); + replayer.run(replay_file); +} diff --git a/src/rbd_replay/rbd_loc.cc b/src/rbd_replay/rbd_loc.cc new file mode 100644 index 000000000..ce6e8e6ed --- /dev/null +++ b/src/rbd_replay/rbd_loc.cc @@ -0,0 +1,130 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ *
+ */
+
+#include "rbd_loc.hpp"
+#include "include/ceph_assert.h"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+rbd_loc::rbd_loc() {
+}
+
+rbd_loc::rbd_loc(const string &pool, const string &image, const string &snap)
+  : pool(pool),
+    image(image),
+    snap(snap) {
+}
+
+// Splits name_string into up to three fields at an unescaped '/' (pool
+// separator) and an unescaped '@' (snap separator).  A backslash copies the
+// following character literally.  Parsing works on local buffers, so the
+// members are only assigned once the whole string has been accepted.
+bool rbd_loc::parse(string name_string) {
+  int field = 0;
+  string fields[3];
+  bool read_slash = false;
+  bool read_at = false;
+  for (size_t i = 0, n = name_string.length(); i < n; i++) {
+    char c = name_string[i];
+    switch (c) {
+    case '/':
+      // At most one '/', and it must come before any '@'.
+      if (read_slash || read_at) {
+	return false;
+      }
+      ceph_assert(field == 0);
+      field++;
+      read_slash = true;
+      break;
+    case '@':
+      if (read_at) {
+	return false;
+      }
+      ceph_assert(field < 2);
+      field++;
+      read_at = true;
+      break;
+    case '\\':
+      // A trailing backslash has nothing to escape.
+      if (i == n - 1) {
+	return false;
+      }
+      fields[field].push_back(name_string[++i]);
+      break;
+    default:
+      fields[field].push_back(c);
+    }
+  }
+
+  if (read_slash) {
+    pool = fields[0];
+    image = fields[1];
+    // note that if read_at is false, then fields[2] is the empty string,
+    // so this is still correct
+    snap = fields[2];
+  } else {
+    pool = "";
+    image = fields[0];
+    // note that if read_at is false, then fields[1] is the empty string,
+    // so this is still correct
+    snap = fields[1];
+  }
+  return true;
+}
+
+
+// Appends in to *out, putting a backslash before every '@', '/' and '\\' so
+// that parse() reads the characters back literally.
+static void write(const string &in, string *out) {
+  for (size_t i = 0, n = in.length(); i < n; i++) {
+    char c = in[i];
+    if (c == '@' || c == '/' || c == '\\') {
+      out->push_back('\\');
+    }
+    out->push_back(c);
+  }
+}
+
+// Formats as [pool/]image[@snap]; the pool and snap parts (with their
+// separators) are left out when empty.
+string rbd_loc::str() const {
+  string out;
+  if (!pool.empty()) {
+    write(pool, &out);
+    out.push_back('/');
+  }
+  write(image, &out);
+  if (!snap.empty()) {
+    out.push_back('@');
+    write(snap, &out);
+  }
+  return out;
+}
+
+// Three-way comparison by pool, then image, then snap.
+int rbd_loc::compare(const rbd_loc& rhs) const {
+  int c = pool.compare(rhs.pool);
+  if (c) {
+    return c;
+  }
+  c = image.compare(rhs.image);
+  if (c) {
+    return c;
+  }
+  return snap.compare(rhs.snap);
+}
+
+bool rbd_loc::operator==(const rbd_loc& rhs) const {
+  return compare(rhs) == 0;
+}
+
+bool rbd_loc::operator<(const rbd_loc& rhs) const {
+  return compare(rhs) < 0;
+}
diff --git a/src/rbd_replay/rbd_loc.hpp b/src/rbd_replay/rbd_loc.hpp
new file mode 100644
index 000000000..8b7bfac1f
--- /dev/null
+++ b/src/rbd_replay/rbd_loc.hpp
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_RBD_LOC_HPP
+#define _INCLUDED_RBD_REPLAY_RBD_LOC_HPP
+
+#include <string>
+
+namespace rbd_replay {
+
+/**
+   Stores a pool, image name, and snap name triple.
+   rbd_locs can be converted to/from strings with the format pool/image\@snap.
+   The slash and at signs can be omitted if the pool and snap are empty, respectively.
+   Backslashes can be used to escape slashes and at signs in names.
+   Examples:
+
+   |Pool  | Image | Snap | String             |
+   |------|-------|------|--------------------|
+   |rbd   | vm    | 1    | rbd/vm\@1          |
+   |rbd   | vm    |      | rbd/vm             |
+   |      | vm    | 1    | vm\@1              |
+   |      | vm    |      | vm                 |
+   |rbd   |       | 1    | rbd/\@1            |
+   |rbd\@x| vm/y  | 1    | rbd\\\@x/vm\\/y\@1 |
+
+   (The empty string should obviously be avoided as the image name.)
+
+   Note that the non-canonical forms /vm\@1 and rbd/vm\@ can also be parsed,
+   although they will be formatted as vm\@1 and rbd/vm.
+ */
+struct rbd_loc {
+  /**
+     Constructs an rbd_loc with the empty string for the pool, image, and snap.
+   */
+  rbd_loc();
+
+  /**
+     Constructs an rbd_loc with the given pool, image, and snap.
+   */
+  rbd_loc(const std::string &pool, const std::string &image, const std::string &snap);
+
+  /**
+     Parses an rbd_loc from the given string.
+     If parsing fails, the contents are unmodified.
+     @retval true if parsing succeeded
+   */
+  bool parse(std::string name_string);
+
+  /**
+     Returns the string representation of the locator.
+   */
+  std::string str() const;
+
+  /**
+     Compares the locators lexicographically by pool, then image, then snap.
+   */
+  int compare(const rbd_loc& rhs) const;
+
+  /**
+     Returns true if the locators have identical pool, image, and snap.
+   */
+  bool operator==(const rbd_loc& rhs) const;
+
+  /**
+     Compares the locators lexicographically by pool, then image, then snap.
+   */
+  bool operator<(const rbd_loc& rhs) const;
+
+  std::string pool;
+
+  std::string image;
+
+  std::string snap;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/rbd_replay_debug.hpp b/src/rbd_replay/rbd_replay_debug.hpp
new file mode 100644
index 000000000..4b44588b4
--- /dev/null
+++ b/src/rbd_replay/rbd_replay_debug.hpp
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_DEBUG_H
+#define _INCLUDED_RBD_REPLAY_DEBUG_H
+
+#include "common/debug.h"
+#include "include/ceph_assert.h"
+
+namespace rbd_replay {
+
+static const int ACTION_LEVEL = 11;
+static const int DEPGRAPH_LEVEL = 12;
+static const int SLEEP_LEVEL = 13;
+static const int THREAD_LEVEL = 10;
+
+}
+
+#define dout_subsys ceph_subsys_rbd_replay
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd_replay: "
+
+#endif
-- 
cgit v1.2.3