summaryrefslogtreecommitdiffstats
path: root/src/rbd_replay
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rbd_replay
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rbd_replay')
-rw-r--r--src/rbd_replay/ActionTypes.cc431
-rw-r--r--src/rbd_replay/ActionTypes.h339
-rw-r--r--src/rbd_replay/BoundedBuffer.hpp71
-rw-r--r--src/rbd_replay/BufferReader.cc37
-rw-r--r--src/rbd_replay/BufferReader.h34
-rw-r--r--src/rbd_replay/CMakeLists.txt44
-rw-r--r--src/rbd_replay/ImageNameMap.cc69
-rw-r--r--src/rbd_replay/ImageNameMap.hpp54
-rw-r--r--src/rbd_replay/PendingIO.cc44
-rw-r--r--src/rbd_replay/PendingIO.hpp64
-rw-r--r--src/rbd_replay/Replayer.cc406
-rw-r--r--src/rbd_replay/Replayer.hpp167
-rw-r--r--src/rbd_replay/actions.cc249
-rw-r--r--src/rbd_replay/actions.hpp344
-rw-r--r--src/rbd_replay/ios.cc220
-rw-r--r--src/rbd_replay/ios.hpp401
-rw-r--r--src/rbd_replay/rbd-replay-prep.cc584
-rw-r--r--src/rbd_replay/rbd-replay.cc131
-rw-r--r--src/rbd_replay/rbd_loc.cc130
-rw-r--r--src/rbd_replay/rbd_loc.hpp90
-rw-r--r--src/rbd_replay/rbd_replay_debug.hpp34
21 files changed, 3943 insertions, 0 deletions
diff --git a/src/rbd_replay/ActionTypes.cc b/src/rbd_replay/ActionTypes.cc
new file mode 100644
index 000000000..e86aa6479
--- /dev/null
+++ b/src/rbd_replay/ActionTypes.cc
@@ -0,0 +1,431 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/ActionTypes.h"
+#include "include/ceph_assert.h"
+#include "include/byteorder.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include <boost/variant.hpp>
+
+namespace rbd_replay {
+namespace action {
+
+namespace {
+
+bool byte_swap_required(__u8 version) {
+#if defined(CEPH_LITTLE_ENDIAN)
+ return (version == 0);
+#else
+ return false;
+#endif
+}
+
+void decode_big_endian_string(std::string &str, bufferlist::const_iterator &it) {
+ using ceph::decode;
+#if defined(CEPH_LITTLE_ENDIAN)
+ uint32_t length;
+ decode(length, it);
+ length = swab(length);
+ str.clear();
+ it.copy(length, str);
+#else
+ ceph_abort();
+#endif
+}
+
+class EncodeVisitor : public boost::static_visitor<void> {
+public:
+ explicit EncodeVisitor(bufferlist &bl) : m_bl(bl) {
+ }
+
+ template <typename Action>
+ inline void operator()(const Action &action) const {
+ using ceph::encode;
+ encode(static_cast<uint8_t>(Action::ACTION_TYPE), m_bl);
+ action.encode(m_bl);
+ }
+private:
+ bufferlist &m_bl;
+};
+
+class DecodeVisitor : public boost::static_visitor<void> {
+public:
+ DecodeVisitor(__u8 version, bufferlist::const_iterator &iter)
+ : m_version(version), m_iter(iter) {
+ }
+
+ template <typename Action>
+ inline void operator()(Action &action) const {
+ action.decode(m_version, m_iter);
+ }
+private:
+ __u8 m_version;
+ bufferlist::const_iterator &m_iter;
+};
+
+class DumpVisitor : public boost::static_visitor<void> {
+public:
+ explicit DumpVisitor(Formatter *formatter) : m_formatter(formatter) {}
+
+ template <typename Action>
+ inline void operator()(const Action &action) const {
+ ActionType action_type = Action::ACTION_TYPE;
+ m_formatter->dump_string("action_type", stringify(action_type));
+ action.dump(m_formatter);
+ }
+private:
+ ceph::Formatter *m_formatter;
+};
+
+} // anonymous namespace
+
+void Dependency::encode(bufferlist &bl) const {
+ using ceph::encode;
+ encode(id, bl);
+ encode(time_delta, bl);
+}
+
+void Dependency::decode(bufferlist::const_iterator &it) {
+ decode(1, it);
+}
+
+void Dependency::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ decode(id, it);
+ decode(time_delta, it);
+ if (byte_swap_required(version)) {
+ id = swab(id);
+ time_delta = swab(time_delta);
+ }
+}
+
+void Dependency::dump(Formatter *f) const {
+ f->dump_unsigned("id", id);
+ f->dump_unsigned("time_delta", time_delta);
+}
+
+void Dependency::generate_test_instances(std::list<Dependency *> &o) {
+ o.push_back(new Dependency());
+ o.push_back(new Dependency(1, 123456789));
+}
+
+void ActionBase::encode(bufferlist &bl) const {
+ using ceph::encode;
+ encode(id, bl);
+ encode(thread_id, bl);
+ encode(dependencies, bl);
+}
+
+void ActionBase::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ decode(id, it);
+ decode(thread_id, it);
+ if (version == 0) {
+ uint32_t num_successors;
+ decode(num_successors, it);
+
+ uint32_t num_completion_successors;
+ decode(num_completion_successors, it);
+ }
+
+ if (byte_swap_required(version)) {
+ id = swab(id);
+ thread_id = swab(thread_id);
+
+ uint32_t dep_count;
+ decode(dep_count, it);
+ dep_count = swab(dep_count);
+ dependencies.resize(dep_count);
+ for (uint32_t i = 0; i < dep_count; ++i) {
+ dependencies[i].decode(0, it);
+ }
+ } else {
+ decode(dependencies, it);
+ }
+}
+
+void ActionBase::dump(Formatter *f) const {
+ f->dump_unsigned("id", id);
+ f->dump_unsigned("thread_id", thread_id);
+ f->open_array_section("dependencies");
+ for (size_t i = 0; i < dependencies.size(); ++i) {
+ f->open_object_section("dependency");
+ dependencies[i].dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void ImageActionBase::encode(bufferlist &bl) const {
+ using ceph::encode;
+ ActionBase::encode(bl);
+ encode(imagectx_id, bl);
+}
+
+void ImageActionBase::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ ActionBase::decode(version, it);
+ decode(imagectx_id, it);
+ if (byte_swap_required(version)) {
+ imagectx_id = swab(imagectx_id);
+ }
+}
+
+void ImageActionBase::dump(Formatter *f) const {
+ ActionBase::dump(f);
+ f->dump_unsigned("imagectx_id", imagectx_id);
+}
+
+void IoActionBase::encode(bufferlist &bl) const {
+ using ceph::encode;
+ ImageActionBase::encode(bl);
+ encode(offset, bl);
+ encode(length, bl);
+}
+
+void IoActionBase::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ ImageActionBase::decode(version, it);
+ decode(offset, it);
+ decode(length, it);
+ if (byte_swap_required(version)) {
+ offset = swab(offset);
+ length = swab(length);
+ }
+}
+
+void IoActionBase::dump(Formatter *f) const {
+ ImageActionBase::dump(f);
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+}
+
+void OpenImageAction::encode(bufferlist &bl) const {
+ using ceph::encode;
+ ImageActionBase::encode(bl);
+ encode(name, bl);
+ encode(snap_name, bl);
+ encode(read_only, bl);
+}
+
+void OpenImageAction::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ ImageActionBase::decode(version, it);
+ if (byte_swap_required(version)) {
+ decode_big_endian_string(name, it);
+ decode_big_endian_string(snap_name, it);
+ } else {
+ decode(name, it);
+ decode(snap_name, it);
+ }
+ decode(read_only, it);
+}
+
+void OpenImageAction::dump(Formatter *f) const {
+ ImageActionBase::dump(f);
+ f->dump_string("name", name);
+ f->dump_string("snap_name", snap_name);
+ f->dump_bool("read_only", read_only);
+}
+
+void AioOpenImageAction::encode(bufferlist &bl) const {
+ using ceph::encode;
+ ImageActionBase::encode(bl);
+ encode(name, bl);
+ encode(snap_name, bl);
+ encode(read_only, bl);
+}
+
+void AioOpenImageAction::decode(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ ImageActionBase::decode(version, it);
+ if (byte_swap_required(version)) {
+ decode_big_endian_string(name, it);
+ decode_big_endian_string(snap_name, it);
+ } else {
+ decode(name, it);
+ decode(snap_name, it);
+ }
+ decode(read_only, it);
+}
+
+void AioOpenImageAction::dump(Formatter *f) const {
+ ImageActionBase::dump(f);
+ f->dump_string("name", name);
+ f->dump_string("snap_name", snap_name);
+ f->dump_bool("read_only", read_only);
+}
+
+void UnknownAction::encode(bufferlist &bl) const {
+ ceph_abort();
+}
+
+void UnknownAction::decode(__u8 version, bufferlist::const_iterator &it) {
+}
+
+void UnknownAction::dump(Formatter *f) const {
+}
+
+void ActionEntry::encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ boost::apply_visitor(EncodeVisitor(bl), action);
+ ENCODE_FINISH(bl);
+}
+
+void ActionEntry::decode(bufferlist::const_iterator &it) {
+ DECODE_START(1, it);
+ decode_versioned(struct_v, it);
+ DECODE_FINISH(it);
+}
+
+void ActionEntry::decode_unversioned(bufferlist::const_iterator &it) {
+ decode_versioned(0, it);
+}
+
+void ActionEntry::decode_versioned(__u8 version, bufferlist::const_iterator &it) {
+ using ceph::decode;
+ uint8_t action_type;
+ decode(action_type, it);
+
+ // select the correct action variant based upon the action_type
+ switch (action_type) {
+ case ACTION_TYPE_START_THREAD:
+ action = StartThreadAction();
+ break;
+ case ACTION_TYPE_STOP_THREAD:
+ action = StopThreadAction();
+ break;
+ case ACTION_TYPE_READ:
+ action = ReadAction();
+ break;
+ case ACTION_TYPE_WRITE:
+ action = WriteAction();
+ break;
+ case ACTION_TYPE_DISCARD:
+ action = DiscardAction();
+ break;
+ case ACTION_TYPE_AIO_READ:
+ action = AioReadAction();
+ break;
+ case ACTION_TYPE_AIO_WRITE:
+ action = AioWriteAction();
+ break;
+ case ACTION_TYPE_AIO_DISCARD:
+ action = AioDiscardAction();
+ break;
+ case ACTION_TYPE_OPEN_IMAGE:
+ action = OpenImageAction();
+ break;
+ case ACTION_TYPE_CLOSE_IMAGE:
+ action = CloseImageAction();
+ break;
+ case ACTION_TYPE_AIO_OPEN_IMAGE:
+ action = AioOpenImageAction();
+ break;
+ case ACTION_TYPE_AIO_CLOSE_IMAGE:
+ action = AioCloseImageAction();
+ break;
+ }
+
+ boost::apply_visitor(DecodeVisitor(version, it), action);
+}
+
+void ActionEntry::dump(Formatter *f) const {
+ boost::apply_visitor(DumpVisitor(f), action);
+}
+
+void ActionEntry::generate_test_instances(std::list<ActionEntry *> &o) {
+ Dependencies dependencies;
+ dependencies.push_back(Dependency(3, 123456789));
+ dependencies.push_back(Dependency(4, 234567890));
+
+ o.push_back(new ActionEntry(StartThreadAction()));
+ o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies)));
+ o.push_back(new ActionEntry(StopThreadAction()));
+ o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies)));
+
+ o.push_back(new ActionEntry(ReadAction()));
+ o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5)));
+ o.push_back(new ActionEntry(WriteAction()));
+ o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(DiscardAction()));
+ o.push_back(new ActionEntry(DiscardAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(AioReadAction()));
+ o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(AioWriteAction()));
+ o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(AioDiscardAction()));
+ o.push_back(new ActionEntry(AioDiscardAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+
+ o.push_back(new ActionEntry(OpenImageAction()));
+ o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3,
+ "image_name", "snap_name",
+ true)));
+ o.push_back(new ActionEntry(CloseImageAction()));
+ o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3)));
+
+ o.push_back(new ActionEntry(AioOpenImageAction()));
+ o.push_back(new ActionEntry(AioOpenImageAction(1, 123456789, dependencies, 3,
+ "image_name", "snap_name",
+ true)));
+ o.push_back(new ActionEntry(AioCloseImageAction()));
+ o.push_back(new ActionEntry(AioCloseImageAction(1, 123456789, dependencies, 3)));
+}
+
+std::ostream &operator<<(std::ostream &out,
+ const ActionType &type) {
+ using namespace rbd_replay::action;
+
+ switch (type) {
+ case ACTION_TYPE_START_THREAD:
+ out << "StartThread";
+ break;
+ case ACTION_TYPE_STOP_THREAD:
+ out << "StopThread";
+ break;
+ case ACTION_TYPE_READ:
+ out << "Read";
+ break;
+ case ACTION_TYPE_WRITE:
+ out << "Write";
+ break;
+ case ACTION_TYPE_DISCARD:
+ out << "Discard";
+ break;
+ case ACTION_TYPE_AIO_READ:
+ out << "AioRead";
+ break;
+ case ACTION_TYPE_AIO_WRITE:
+ out << "AioWrite";
+ break;
+ case ACTION_TYPE_AIO_DISCARD:
+ out << "AioDiscard";
+ break;
+ case ACTION_TYPE_OPEN_IMAGE:
+ out << "OpenImage";
+ break;
+ case ACTION_TYPE_CLOSE_IMAGE:
+ out << "CloseImage";
+ break;
+ case ACTION_TYPE_AIO_OPEN_IMAGE:
+ out << "AioOpenImage";
+ break;
+ case ACTION_TYPE_AIO_CLOSE_IMAGE:
+ out << "AioCloseImage";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(type) << ")";
+ break;
+ }
+ return out;
+}
+
+} // namespace action
+} // namespace rbd_replay
diff --git a/src/rbd_replay/ActionTypes.h b/src/rbd_replay/ActionTypes.h
new file mode 100644
index 000000000..19e1bb8fa
--- /dev/null
+++ b/src/rbd_replay/ActionTypes.h
@@ -0,0 +1,339 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H
+#define CEPH_RBD_REPLAY_ACTION_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <list>
+#include <string>
+#include <vector>
+#include <boost/variant/variant.hpp>
+
+namespace ceph { class Formatter; }
+
+namespace rbd_replay {
+namespace action {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+/// Even IDs are normal actions, odd IDs are completions.
+typedef uint32_t action_id_t;
+
+static const std::string BANNER("rbd-replay-trace");
+
+/**
+ * Dependencies link actions to earlier actions or completions.
+ * If an action has a dependency \c d then it waits until \c d.time_delta
+ * nanoseconds after the action or completion with ID \c d.id has fired.
+ */
+struct Dependency {
+ /// ID of the action or completion to wait for.
+ action_id_t id;
+
+ /// Nanoseconds of delay to wait until after the action or completion fires.
+ uint64_t time_delta;
+
+ /**
+ * @param id ID of the action or completion to wait for.
+ * @param time_delta Nanoseconds of delay to wait after the action or
+ * completion fires.
+ */
+ Dependency() : id(0), time_delta(0) {
+ }
+ Dependency(action_id_t id, uint64_t time_delta)
+ : id(id), time_delta(time_delta) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::const_iterator &it);
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<Dependency *> &o);
+};
+
+WRITE_CLASS_ENCODER(Dependency);
+
+typedef std::vector<Dependency> Dependencies;
+
+enum ActionType {
+ ACTION_TYPE_START_THREAD = 0,
+ ACTION_TYPE_STOP_THREAD = 1,
+ ACTION_TYPE_READ = 2,
+ ACTION_TYPE_WRITE = 3,
+ ACTION_TYPE_AIO_READ = 4,
+ ACTION_TYPE_AIO_WRITE = 5,
+ ACTION_TYPE_OPEN_IMAGE = 6,
+ ACTION_TYPE_CLOSE_IMAGE = 7,
+ ACTION_TYPE_AIO_OPEN_IMAGE = 8,
+ ACTION_TYPE_AIO_CLOSE_IMAGE = 9,
+ ACTION_TYPE_DISCARD = 10,
+ ACTION_TYPE_AIO_DISCARD = 11
+};
+
+struct ActionBase {
+ action_id_t id;
+ thread_id_t thread_id;
+ Dependencies dependencies;
+
+ ActionBase() : id(0), thread_id(0) {
+ }
+ ActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : id(id), thread_id(thread_id), dependencies(dependencies) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct StartThreadAction : public ActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD;
+
+ StartThreadAction() {
+ }
+ StartThreadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : ActionBase(id, thread_id, dependencies) {
+ }
+};
+
+struct StopThreadAction : public ActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD;
+
+ StopThreadAction() {
+ }
+ StopThreadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : ActionBase(id, thread_id, dependencies) {
+ }
+};
+
+struct ImageActionBase : public ActionBase {
+ imagectx_id_t imagectx_id;
+
+ ImageActionBase() : imagectx_id(0) {
+ }
+ ImageActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id)
+ : ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct IoActionBase : public ImageActionBase {
+ uint64_t offset;
+ uint64_t length;
+
+ IoActionBase() : offset(0), length(0) {
+ }
+ IoActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+ offset(offset), length(length) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct ReadAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_READ;
+
+ ReadAction() {
+ }
+ ReadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct WriteAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE;
+
+ WriteAction() {
+ }
+ WriteAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct DiscardAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_DISCARD;
+
+ DiscardAction() {
+ }
+ DiscardAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct AioReadAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ;
+
+ AioReadAction() {
+ }
+ AioReadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct AioWriteAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE;
+
+ AioWriteAction() {
+ }
+ AioWriteAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct AioDiscardAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_DISCARD;
+
+ AioDiscardAction() {
+ }
+ AioDiscardAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct OpenImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE;
+
+ std::string name;
+ std::string snap_name;
+ bool read_only;
+
+ OpenImageAction() : read_only(false) {
+ }
+ OpenImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ const std::string &name, const std::string &snap_name,
+ bool read_only)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+ name(name), snap_name(snap_name), read_only(read_only) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct CloseImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE;
+
+ CloseImageAction() {
+ }
+ CloseImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id) {
+ }
+};
+
+struct AioOpenImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_OPEN_IMAGE;
+
+ std::string name;
+ std::string snap_name;
+ bool read_only;
+
+ AioOpenImageAction() : read_only(false) {
+ }
+ AioOpenImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ const std::string &name, const std::string &snap_name,
+ bool read_only)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+ name(name), snap_name(snap_name), read_only(read_only) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct AioCloseImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_CLOSE_IMAGE;
+
+ AioCloseImageAction() {
+ }
+ AioCloseImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id) {
+ }
+};
+
+struct UnknownAction {
+ static const ActionType ACTION_TYPE = static_cast<ActionType>(-1);
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+};
+
+typedef boost::variant<StartThreadAction,
+ StopThreadAction,
+ ReadAction,
+ WriteAction,
+ DiscardAction,
+ AioReadAction,
+ AioWriteAction,
+ AioDiscardAction,
+ OpenImageAction,
+ CloseImageAction,
+ AioOpenImageAction,
+ AioCloseImageAction,
+ UnknownAction> Action;
+
+class ActionEntry {
+public:
+ Action action;
+
+ ActionEntry() : action(UnknownAction()) {
+ }
+ ActionEntry(const Action &action) : action(action) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::const_iterator &it);
+ void decode_unversioned(bufferlist::const_iterator &it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<ActionEntry *> &o);
+
+private:
+ void decode_versioned(__u8 version, bufferlist::const_iterator &it);
+};
+
+WRITE_CLASS_ENCODER(ActionEntry);
+
+std::ostream &operator<<(std::ostream &out,
+ const rbd_replay::action::ActionType &type);
+
+} // namespace action
+} // namespace rbd_replay
+
+#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H
diff --git a/src/rbd_replay/BoundedBuffer.hpp b/src/rbd_replay/BoundedBuffer.hpp
new file mode 100644
index 000000000..00fb1cb32
--- /dev/null
+++ b/src/rbd_replay/BoundedBuffer.hpp
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _INCLUDED_BOUNDED_BUFFER_HPP
+#define _INCLUDED_BOUNDED_BUFFER_HPP
+
+#include <boost/bind/bind.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/thread/mutex.hpp>
+
+/**
+ Blocking, fixed-capacity, thread-safe FIFO queue useful for communicating between threads.
+ This code was taken from the Boost docs: http://www.boost.org/doc/libs/1_55_0/libs/circular_buffer/example/circular_buffer_bound_example.cpp
+ */
+template <class T>
+class BoundedBuffer {
+public:
+ typedef boost::circular_buffer<T> container_type;
+ typedef typename container_type::size_type size_type;
+ typedef typename container_type::value_type value_type;
+ typedef typename boost::call_traits<value_type>::param_type param_type;
+
+ explicit BoundedBuffer(size_type capacity) : m_unread(0), m_container(capacity) {
+ }
+
+ /**
+ Inserts an element into the queue.
+ Blocks if the queue is full.
+ */
+ void push_front(typename boost::call_traits<value_type>::param_type item) {
+ // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
+ boost::mutex::scoped_lock lock(m_mutex);
+ m_not_full.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_full, this));
+ m_container.push_front(item);
+ ++m_unread;
+ lock.unlock();
+ m_not_empty.notify_one();
+ }
+
+ /**
+ Removes an element from the queue.
+ Blocks if the queue is empty.
+ */
+ void pop_back(value_type* pItem) {
+ boost::mutex::scoped_lock lock(m_mutex);
+ m_not_empty.wait(lock, boost::bind(&BoundedBuffer<value_type>::is_not_empty, this));
+ *pItem = m_container[--m_unread];
+ lock.unlock();
+ m_not_full.notify_one();
+ }
+
+private:
+ BoundedBuffer(const BoundedBuffer&); // Disabled copy constructor.
+ BoundedBuffer& operator= (const BoundedBuffer&); // Disabled assign operator.
+
+ bool is_not_empty() const {
+ return m_unread > 0;
+ }
+ bool is_not_full() const {
+ return m_unread < m_container.capacity();
+ }
+
+ size_type m_unread;
+ container_type m_container;
+ boost::mutex m_mutex;
+ boost::condition m_not_empty;
+ boost::condition m_not_full;
+};
+
+#endif
diff --git a/src/rbd_replay/BufferReader.cc b/src/rbd_replay/BufferReader.cc
new file mode 100644
index 000000000..b4dce6515
--- /dev/null
+++ b/src/rbd_replay/BufferReader.cc
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/BufferReader.h"
+#include "include/ceph_assert.h"
+#include "include/intarith.h"
+
+namespace rbd_replay {
+
+BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes)
+ : m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes),
+ m_bl_it(m_bl.begin()), m_eof_reached(false) {
+ ceph_assert(m_min_bytes <= m_max_bytes);
+}
+
+int BufferReader::fetch(bufferlist::const_iterator **it) {
+ if (m_bl_it.get_remaining() < m_min_bytes) {
+ ssize_t bytes_to_read = round_up_to(m_max_bytes - m_bl_it.get_remaining(),
+ CEPH_PAGE_SIZE);
+ while (!m_eof_reached && bytes_to_read > 0) {
+ int r = m_bl.read_fd(m_fd, CEPH_PAGE_SIZE);
+ if (r < 0) {
+ return r;
+ }
+ if (r == 0) {
+ m_eof_reached = true;
+ }
+ ceph_assert(r <= bytes_to_read);
+ bytes_to_read -= r;
+ }
+ }
+
+ *it = &m_bl_it;
+ return 0;
+}
+
+} // namespace rbd_replay
diff --git a/src/rbd_replay/BufferReader.h b/src/rbd_replay/BufferReader.h
new file mode 100644
index 000000000..38a105774
--- /dev/null
+++ b/src/rbd_replay/BufferReader.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H
+#define CEPH_RBD_REPLAY_BUFFER_READER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace rbd_replay {
+
+class BufferReader {
+public:
+ static const size_t DEFAULT_MIN_BYTES = 1<<20;
+ static const size_t DEFAULT_MAX_BYTES = 1<<22;
+
+ BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES,
+ size_t max_bytes = DEFAULT_MAX_BYTES);
+
+ int fetch(bufferlist::const_iterator **it);
+
+private:
+ int m_fd;
+ size_t m_min_bytes;
+ size_t m_max_bytes;
+ bufferlist m_bl;
+ bufferlist::const_iterator m_bl_it;
+ bool m_eof_reached;
+
+};
+
+} // namespace rbd_replay
+
+#endif // CEPH_RBD_REPLAY_BUFFER_READER_H
diff --git a/src/rbd_replay/CMakeLists.txt b/src/rbd_replay/CMakeLists.txt
new file mode 100644
index 000000000..63446626a
--- /dev/null
+++ b/src/rbd_replay/CMakeLists.txt
@@ -0,0 +1,44 @@
+set(librbd_replay_types_srcs
+ ActionTypes.cc)
+add_library(rbd_replay_types STATIC ${librbd_replay_types_srcs})
+
+set(librbd_replay_srcs
+ actions.cc
+ BufferReader.cc
+ ImageNameMap.cc
+ PendingIO.cc
+ rbd_loc.cc
+ Replayer.cc)
+add_library(rbd_replay STATIC ${librbd_replay_srcs})
+target_link_libraries(rbd_replay
+ PUBLIC rbd_replay_types
+ PRIVATE librbd librados global)
+
+add_executable(rbd-replay
+ rbd-replay.cc)
+target_link_libraries(rbd-replay
+ librbd librados global rbd_replay ceph-common)
+install(TARGETS rbd-replay DESTINATION bin)
+
+set(librbd_replay_ios_srcs
+ ios.cc)
+add_library(rbd_replay_ios STATIC ${librbd_replay_ios_srcs})
+target_link_libraries(rbd_replay_ios librbd librados global)
+
+if(HAVE_BABELTRACE)
+ add_executable(rbd-replay-prep
+ rbd-replay-prep.cc)
+ target_link_libraries(rbd-replay-prep
+ rbd_replay
+ rbd_replay_ios
+ librbd
+ librados
+ ceph-common
+ global
+ babeltrace
+ babeltrace-ctf
+ Boost::date_time
+ )
+ install(TARGETS rbd-replay-prep DESTINATION bin)
+endif(HAVE_BABELTRACE)
+
diff --git a/src/rbd_replay/ImageNameMap.cc b/src/rbd_replay/ImageNameMap.cc
new file mode 100644
index 000000000..f54265718
--- /dev/null
+++ b/src/rbd_replay/ImageNameMap.cc
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "ImageNameMap.hpp"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+bool ImageNameMap::parse_mapping(string mapping_string, Mapping *mapping) const {
+ string fields[2];
+ int field = 0;
+ for (size_t i = 0, n = mapping_string.length(); i < n; i++) {
+ char c = mapping_string[i];
+ switch (c) {
+ case '\\':
+ if (i != n - 1 && mapping_string[i + 1] == '=') {
+ i++;
+ fields[field].push_back('=');
+ } else {
+ fields[field].push_back('\\');
+ }
+ break;
+ case '=':
+ if (field == 1) {
+ return false;
+ }
+ field = 1;
+ break;
+ default:
+ fields[field].push_back(c);
+ }
+ }
+ if (field == 0) {
+ return false;
+ }
+ if (!mapping->first.parse(fields[0])) {
+ return false;
+ }
+ if (!mapping->second.parse(fields[1])) {
+ return false;
+ }
+ return true;
+}
+
+void ImageNameMap::add_mapping(const Mapping& mapping) {
+ m_map.insert(mapping);
+}
+
+rbd_loc ImageNameMap::map(const rbd_loc& name) const {
+ std::map<rbd_loc, rbd_loc>::const_iterator p(m_map.find(name));
+ if (p == m_map.end()) {
+ return name;
+ } else {
+ return p->second;
+ }
+}
diff --git a/src/rbd_replay/ImageNameMap.hpp b/src/rbd_replay/ImageNameMap.hpp
new file mode 100644
index 000000000..45cdaf686
--- /dev/null
+++ b/src/rbd_replay/ImageNameMap.hpp
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_IMAGENAMEMAP_HPP
+#define _INCLUDED_RBD_REPLAY_IMAGENAMEMAP_HPP
+
+#include <map>
+#include <string>
+#include "rbd_loc.hpp"
+
+namespace rbd_replay {
+
+/**
+ Maps image names.
+ */
+class ImageNameMap {
+public:
+ typedef std::pair<rbd_loc, rbd_loc> Mapping;
+
+ /**
+ Parses a mapping.
+ If parsing fails, the contents of \c mapping are undefined.
+ @param[in] mapping_string string representation of the mapping
+ @param[out] mapping stores the parsed mapping
+ @retval true parsing was successful
+ */
+ bool parse_mapping(std::string mapping_string, Mapping *mapping) const;
+
+ void add_mapping(const Mapping& mapping);
+
+ /**
+ Maps an image name.
+ If no mapping matches the name, it is returned unmodified.
+ */
+ rbd_loc map(const rbd_loc& name) const;
+
+private:
+ std::map<rbd_loc, rbd_loc> m_map;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/PendingIO.cc b/src/rbd_replay/PendingIO.cc
new file mode 100644
index 000000000..089a60aa5
--- /dev/null
+++ b/src/rbd_replay/PendingIO.cc
@@ -0,0 +1,44 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "PendingIO.hpp"
+#include "rbd_replay_debug.hpp"
+
+#define dout_context g_ceph_context
+
+using namespace rbd_replay;
+
+extern "C"
+void rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg) {
+ PendingIO *io = static_cast<PendingIO*>(arg);
+ io->completed(cb);
+}
+
+PendingIO::PendingIO(action_id_t id,
+ ActionCtx &worker)
+ : m_id(id),
+ m_completion(new librbd::RBD::AioCompletion(this, rbd_replay_pending_io_callback)),
+ m_worker(worker) {
+ }
+
+PendingIO::~PendingIO() {
+ m_completion->release();
+}
+
+void PendingIO::completed(librbd::completion_t cb) {
+ dout(ACTION_LEVEL) << "Completed pending IO #" << m_id << dendl;
+ ssize_t r = m_completion->get_return_value();
+ assertf(r >= 0, "id = %d, r = %d", m_id, r);
+ m_worker.remove_pending(shared_from_this());
+}
diff --git a/src/rbd_replay/PendingIO.hpp b/src/rbd_replay/PendingIO.hpp
new file mode 100644
index 000000000..3942d5f6f
--- /dev/null
+++ b/src/rbd_replay/PendingIO.hpp
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+#define _INCLUDED_RBD_REPLAY_PENDINGIO_HPP
+
+#include <boost/enable_shared_from_this.hpp>
+#include "actions.hpp"
+
+/// Do not call outside of rbd_replay::PendingIO.
+extern "C"
+void rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg);
+
+namespace rbd_replay {
+
+/**
+ A PendingIO is an I/O operation that has been started but not completed.
+*/
+class PendingIO : public boost::enable_shared_from_this<PendingIO> {
+public:
+ typedef boost::shared_ptr<PendingIO> ptr;
+
+ PendingIO(action_id_t id,
+ ActionCtx &worker);
+
+ ~PendingIO();
+
+ action_id_t id() const {
+ return m_id;
+ }
+
+ ceph::bufferlist &bufferlist() {
+ return m_bl;
+ }
+
+ librbd::RBD::AioCompletion &completion() {
+ return *m_completion;
+ }
+
+private:
+ void completed(librbd::completion_t cb);
+
+ friend void ::rbd_replay_pending_io_callback(librbd::completion_t cb, void *arg);
+
+ const action_id_t m_id;
+ ceph::bufferlist m_bl;
+ librbd::RBD::AioCompletion *m_completion;
+ ActionCtx &m_worker;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/Replayer.cc b/src/rbd_replay/Replayer.cc
new file mode 100644
index 000000000..11e84900d
--- /dev/null
+++ b/src/rbd_replay/Replayer.cc
@@ -0,0 +1,406 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "Replayer.hpp"
+#include "common/errno.h"
+#include "include/scope_guard.h"
+#include "rbd_replay/ActionTypes.h"
+#include "rbd_replay/BufferReader.h"
+#include <boost/foreach.hpp>
+#include <chrono>
+#include <condition_variable>
+#include <thread>
+#include <fstream>
+#include "global/global_context.h"
+#include "rbd_replay_debug.hpp"
+
+#define dout_context g_ceph_context
+
+using namespace rbd_replay;
+
+namespace {
+
+bool is_versioned_replay(BufferReader &buffer_reader) {
+ bufferlist::const_iterator *it;
+ int r = buffer_reader.fetch(&it);
+ if (r < 0) {
+ return false;
+ }
+
+ if (it->get_remaining() < action::BANNER.size()) {
+ return false;
+ }
+
+ std::string banner;
+ it->copy(action::BANNER.size(), banner);
+ bool versioned = (banner == action::BANNER);
+ if (!versioned) {
+ it->seek(0);
+ }
+ return versioned;
+}
+
+} // anonymous namespace
+
+Worker::Worker(Replayer &replayer)
+ : m_replayer(replayer),
+ m_buffer(100),
+ m_done(false) {
+}
+
+void Worker::start() {
+ m_thread = std::make_shared<std::thread>(&Worker::run, this);
+}
+
+// Should only be called by StopThreadAction
+void Worker::stop() {
+ m_done = true;
+}
+
+void Worker::join() {
+ m_thread->join();
+}
+
+void Worker::send(Action::ptr action) {
+ ceph_assert(action);
+ m_buffer.push_front(action);
+}
+
+void Worker::add_pending(PendingIO::ptr io) {
+ ceph_assert(io);
+ std::scoped_lock lock{m_pending_ios_mutex};
+ assertf(m_pending_ios.count(io->id()) == 0, "id = %d", io->id());
+ m_pending_ios[io->id()] = io;
+}
+
+void Worker::run() {
+ dout(THREAD_LEVEL) << "Worker thread started" << dendl;
+ while (!m_done) {
+ Action::ptr action;
+ m_buffer.pop_back(&action);
+ m_replayer.wait_for_actions(action->predecessors());
+ action->perform(*this);
+ m_replayer.set_action_complete(action->id());
+ }
+ {
+ std::unique_lock lock{m_pending_ios_mutex};
+ bool first_time = true;
+ while (!m_pending_ios.empty()) {
+ if (!first_time) {
+ dout(THREAD_LEVEL) << "Worker thread trying to stop, still waiting for " << m_pending_ios.size() << " pending IOs to complete:" << dendl;
+ pair<action_id_t, PendingIO::ptr> p;
+ BOOST_FOREACH(p, m_pending_ios) {
+ dout(THREAD_LEVEL) << "> " << p.first << dendl;
+ }
+ }
+ m_pending_ios_empty.wait_for(lock, std::chrono::seconds(1));
+ first_time = false;
+ }
+ }
+ dout(THREAD_LEVEL) << "Worker thread stopped" << dendl;
+}
+
+
+void Worker::remove_pending(PendingIO::ptr io) {
+ ceph_assert(io);
+ m_replayer.set_action_complete(io->id());
+ std::scoped_lock lock{m_pending_ios_mutex};
+ size_t num_erased = m_pending_ios.erase(io->id());
+ assertf(num_erased == 1, "id = %d", io->id());
+ if (m_pending_ios.empty()) {
+ m_pending_ios_empty.notify_all();
+ }
+}
+
+
+librbd::Image* Worker::get_image(imagectx_id_t imagectx_id) {
+ return m_replayer.get_image(imagectx_id);
+}
+
+
+void Worker::put_image(imagectx_id_t imagectx_id, librbd::Image* image) {
+ ceph_assert(image);
+ m_replayer.put_image(imagectx_id, image);
+}
+
+
+void Worker::erase_image(imagectx_id_t imagectx_id) {
+ m_replayer.erase_image(imagectx_id);
+}
+
+
+librbd::RBD* Worker::rbd() {
+ return m_replayer.get_rbd();
+}
+
+
+librados::IoCtx* Worker::ioctx() {
+ return m_replayer.get_ioctx();
+}
+
+void Worker::set_action_complete(action_id_t id) {
+ m_replayer.set_action_complete(id);
+}
+
+bool Worker::readonly() const {
+ return m_replayer.readonly();
+}
+
+rbd_loc Worker::map_image_name(string image_name, string snap_name) const {
+ return m_replayer.image_name_map().map(rbd_loc("", image_name, snap_name));
+}
+
+
+Replayer::Replayer(int num_action_trackers)
+ : m_rbd(NULL), m_ioctx(0),
+ m_latency_multiplier(1.0),
+ m_readonly(false), m_dump_perf_counters(false),
+ m_num_action_trackers(num_action_trackers),
+ m_action_trackers(new action_tracker_d[m_num_action_trackers]) {
+ assertf(num_action_trackers > 0, "num_action_trackers = %d", num_action_trackers);
+}
+
+Replayer::~Replayer() {
+ delete[] m_action_trackers;
+}
+
+Replayer::action_tracker_d &Replayer::tracker_for(action_id_t id) {
+ return m_action_trackers[id % m_num_action_trackers];
+}
+
+void Replayer::run(const std::string& replay_file) {
+ {
+ librados::Rados rados;
+ rados.init(NULL);
+ int r = rados.init_with_context(g_ceph_context);
+ if (r) {
+ cerr << "Failed to initialize RADOS: " << cpp_strerror(r) << std::endl;
+ goto out;
+ }
+ r = rados.connect();
+ if (r) {
+ cerr << "Failed to connect to cluster: " << cpp_strerror(r) << std::endl;
+ goto out;
+ }
+
+ if (m_pool_name.empty()) {
+ r = rados.conf_get("rbd_default_pool", m_pool_name);
+ if (r < 0) {
+ cerr << "Failed to retrieve default pool: " << cpp_strerror(r)
+ << std::endl;
+ goto out;
+ }
+ }
+
+ m_ioctx = new librados::IoCtx();
+ {
+ r = rados.ioctx_create(m_pool_name.c_str(), *m_ioctx);
+ if (r) {
+ cerr << "Failed to open pool " << m_pool_name << ": "
+ << cpp_strerror(r) << std::endl;
+ goto out2;
+ }
+ m_rbd = new librbd::RBD();
+ map<thread_id_t, Worker*> workers;
+
+ int fd = open(replay_file.c_str(), O_RDONLY|O_BINARY);
+ if (fd < 0) {
+ std::cerr << "Failed to open " << replay_file << ": "
+ << cpp_strerror(errno) << std::endl;
+ exit(1);
+ }
+ auto close_fd = make_scope_guard([fd] { close(fd); });
+
+ BufferReader buffer_reader(fd);
+ bool versioned = is_versioned_replay(buffer_reader);
+ while (true) {
+ action::ActionEntry action_entry;
+ try {
+ bufferlist::const_iterator *it;
+ int r = buffer_reader.fetch(&it);
+ if (r < 0) {
+ std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
+ << std::endl;
+ exit(-r);
+ }
+ if (it->get_remaining() == 0) {
+ break;
+ }
+
+ if (versioned) {
+ action_entry.decode(*it);
+ } else {
+ action_entry.decode_unversioned(*it);
+ }
+ } catch (const buffer::error &err) {
+ std::cerr << "Failed to decode trace action: " << err.what() << std::endl;
+ exit(1);
+ }
+
+ Action::ptr action = Action::construct(action_entry);
+ if (!action) {
+ // unknown / unsupported action
+ continue;
+ }
+
+ if (action->is_start_thread()) {
+ Worker *worker = new Worker(*this);
+ workers[action->thread_id()] = worker;
+ worker->start();
+ } else {
+ workers[action->thread_id()]->send(action);
+ }
+ }
+
+ dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl;
+ pair<thread_id_t, Worker*> w;
+ BOOST_FOREACH(w, workers) {
+ w.second->join();
+ delete w.second;
+ }
+ clear_images();
+ delete m_rbd;
+ m_rbd = NULL;
+ }
+ out2:
+ delete m_ioctx;
+ m_ioctx = NULL;
+ }
+ out:
+ ;
+}
+
+
+librbd::Image* Replayer::get_image(imagectx_id_t imagectx_id) {
+ std::scoped_lock lock{m_images_mutex};
+ return m_images[imagectx_id];
+}
+
+void Replayer::put_image(imagectx_id_t imagectx_id, librbd::Image *image) {
+ ceph_assert(image);
+ std::unique_lock lock{m_images_mutex};
+ ceph_assert(m_images.count(imagectx_id) == 0);
+ m_images[imagectx_id] = image;
+}
+
+void Replayer::erase_image(imagectx_id_t imagectx_id) {
+ std::unique_lock lock{m_images_mutex};
+ librbd::Image* image = m_images[imagectx_id];
+ if (m_dump_perf_counters) {
+ string command = "perf dump";
+ cmdmap_t cmdmap;
+ JSONFormatter jf(true);
+ bufferlist out;
+ stringstream ss;
+ g_ceph_context->do_command(command, cmdmap, &jf, ss, &out);
+ jf.flush(cout);
+ cout << std::endl;
+ cout.flush();
+ }
+ delete image;
+ m_images.erase(imagectx_id);
+}
+
+void Replayer::set_action_complete(action_id_t id) {
+ dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
+ auto now = std::chrono::system_clock::now();
+ action_tracker_d &tracker = tracker_for(id);
+ std::unique_lock lock{tracker.mutex};
+ ceph_assert(tracker.actions.count(id) == 0);
+ tracker.actions[id] = now;
+ tracker.condition.notify_all();
+}
+
+bool Replayer::is_action_complete(action_id_t id) {
+ action_tracker_d &tracker = tracker_for(id);
+ std::shared_lock lock{tracker.mutex};
+ return tracker.actions.count(id) > 0;
+}
+
+void Replayer::wait_for_actions(const action::Dependencies &deps) {
+ auto release_time = std::chrono::time_point<std::chrono::system_clock>::min();
+ for(auto& dep : deps) {
+ dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
+ auto start_time = std::chrono::system_clock::now();
+ action_tracker_d &tracker = tracker_for(dep.id);
+ std::unique_lock lock{tracker.mutex};
+ bool first_time = true;
+ while (tracker.actions.count(dep.id) == 0) {
+ if (!first_time) {
+ dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
+ }
+ tracker.condition.wait_for(lock, std::chrono::seconds(1));
+ first_time = false;
+ }
+ auto action_completed_time(tracker.actions[dep.id]);
+ lock.unlock();
+ auto end_time = std::chrono::system_clock::now();
+ auto micros = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
+ dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
+ // Apparently the nanoseconds constructor is optional:
+ // http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
+ auto sub_release_time{action_completed_time +
+ std::chrono::microseconds{static_cast<long long>(dep.time_delta * m_latency_multiplier / 1000)}};
+ if (sub_release_time > release_time) {
+ release_time = sub_release_time;
+ }
+ }
+ if (release_time > std::chrono::system_clock::now()) {
+ auto sleep_for = release_time - std::chrono::system_clock::now();
+ dout(SLEEP_LEVEL) << "Sleeping for "
+ << std::chrono::duration_cast<std::chrono::microseconds>(sleep_for).count()
+ << " microseconds" << dendl;
+ std::this_thread::sleep_until(release_time);
+ }
+}
+
+void Replayer::clear_images() {
+ std::shared_lock lock{m_images_mutex};
+ if (m_dump_perf_counters && !m_images.empty()) {
+ string command = "perf dump";
+ cmdmap_t cmdmap;
+ JSONFormatter jf(true);
+ bufferlist out;
+ stringstream ss;
+ g_ceph_context->do_command(command, cmdmap, &jf, ss, &out);
+ jf.flush(cout);
+ cout << std::endl;
+ cout.flush();
+ }
+ for (auto& p : m_images) {
+ delete p.second;
+ }
+ m_images.clear();
+}
+
+void Replayer::set_latency_multiplier(float f) {
+ assertf(f >= 0, "f = %f", f);
+ m_latency_multiplier = f;
+}
+
+bool Replayer::readonly() const {
+ return m_readonly;
+}
+
+void Replayer::set_readonly(bool readonly) {
+ m_readonly = readonly;
+}
+
+string Replayer::pool_name() const {
+ return m_pool_name;
+}
+
+void Replayer::set_pool_name(string pool_name) {
+ m_pool_name = pool_name;
+}
diff --git a/src/rbd_replay/Replayer.hpp b/src/rbd_replay/Replayer.hpp
new file mode 100644
index 000000000..ddbd05743
--- /dev/null
+++ b/src/rbd_replay/Replayer.hpp
@@ -0,0 +1,167 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+#define _INCLUDED_RBD_REPLAY_REPLAYER_HPP
+
+#include <chrono>
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include "rbd_replay/ActionTypes.h"
+#include "BoundedBuffer.hpp"
+#include "ImageNameMap.hpp"
+#include "PendingIO.hpp"
+
+namespace rbd_replay {
+
+class Replayer;
+
+/**
+ Performs Actions within a single thread.
+ */
+class Worker : public ActionCtx {
+public:
+ explicit Worker(Replayer &replayer);
+
+ void start();
+
+ /// Should only be called by StopThreadAction
+ void stop() override;
+
+ void join();
+
+ void send(Action::ptr action);
+
+ void add_pending(PendingIO::ptr io) override;
+
+ void remove_pending(PendingIO::ptr io) override;
+
+ librbd::Image* get_image(imagectx_id_t imagectx_id) override;
+
+ void put_image(imagectx_id_t imagectx_id, librbd::Image* image) override;
+
+ void erase_image(imagectx_id_t imagectx_id) override;
+
+ librbd::RBD* rbd() override;
+
+ librados::IoCtx* ioctx() override;
+
+ void set_action_complete(action_id_t id) override;
+
+ bool readonly() const override;
+
+ rbd_loc map_image_name(std::string image_name, std::string snap_name) const override;
+
+private:
+ void run();
+
+ Replayer &m_replayer;
+ BoundedBuffer<Action::ptr> m_buffer;
+ std::shared_ptr<std::thread> m_thread;
+ std::map<action_id_t, PendingIO::ptr> m_pending_ios;
+ std::mutex m_pending_ios_mutex;
+ std::condition_variable_any m_pending_ios_empty;
+ bool m_done;
+};
+
+
+class Replayer {
+public:
+ explicit Replayer(int num_action_trackers);
+
+ ~Replayer();
+
+ void run(const std::string &replay_file);
+
+ librbd::RBD* get_rbd() {
+ return m_rbd;
+ }
+
+ librados::IoCtx* get_ioctx() {
+ return m_ioctx;
+ }
+
+ librbd::Image* get_image(imagectx_id_t imagectx_id);
+
+ void put_image(imagectx_id_t imagectx_id, librbd::Image *image);
+
+ void erase_image(imagectx_id_t imagectx_id);
+
+ void set_action_complete(action_id_t id);
+
+ bool is_action_complete(action_id_t id);
+
+ void wait_for_actions(const action::Dependencies &deps);
+
+ std::string pool_name() const;
+
+ void set_pool_name(std::string pool_name);
+
+ void set_latency_multiplier(float f);
+
+ bool readonly() const;
+
+ void set_readonly(bool readonly);
+
+ void set_image_name_map(const ImageNameMap &map) {
+ m_image_name_map = map;
+ }
+
+ void set_dump_perf_counters(bool dump_perf_counters) {
+ m_dump_perf_counters = dump_perf_counters;
+ }
+
+ const ImageNameMap &image_name_map() const {
+ return m_image_name_map;
+ }
+
+private:
+ struct action_tracker_d {
+ /// Maps an action ID to the time the action completed
+ std::map<action_id_t, std::chrono::system_clock::time_point> actions;
+ std::shared_mutex mutex;
+ std::condition_variable_any condition;
+ };
+
+ void clear_images();
+
+ action_tracker_d &tracker_for(action_id_t id);
+
+ /// Disallow copying
+ Replayer(const Replayer& rhs);
+ /// Disallow assignment
+ const Replayer& operator=(const Replayer& rhs);
+
+ librbd::RBD* m_rbd;
+ librados::IoCtx* m_ioctx;
+ std::string m_pool_name;
+ float m_latency_multiplier;
+ bool m_readonly;
+ ImageNameMap m_image_name_map;
+ bool m_dump_perf_counters;
+
+ std::map<imagectx_id_t, librbd::Image*> m_images;
+ std::shared_mutex m_images_mutex;
+
+ /// Actions are hashed across the trackers by ID.
+ /// Number of trackers should probably be larger than the number of cores and prime.
+ /// Should definitely be odd.
+ const int m_num_action_trackers;
+ action_tracker_d* m_action_trackers;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc
new file mode 100644
index 000000000..3a95c399f
--- /dev/null
+++ b/src/rbd_replay/actions.cc
@@ -0,0 +1,249 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "actions.hpp"
+#include <boost/foreach.hpp>
+#include <cstdlib>
+#include "PendingIO.hpp"
+#include "rbd_replay_debug.hpp"
+
+#define dout_context g_ceph_context
+
+using namespace rbd_replay;
+
+namespace {
+
+std::string create_fake_data() {
+ char data[1 << 20]; // 1 MB
+ for (unsigned int i = 0; i < sizeof(data); i++) {
+ data[i] = (char) i;
+ }
+ return std::string(data, sizeof(data));
+}
+
+struct ConstructVisitor : public boost::static_visitor<Action::ptr> {
+ inline Action::ptr operator()(const action::StartThreadAction &action) const {
+ return Action::ptr(new StartThreadAction(action));
+ }
+
+ inline Action::ptr operator()(const action::StopThreadAction &action) const{
+ return Action::ptr(new StopThreadAction(action));
+ }
+
+ inline Action::ptr operator()(const action::ReadAction &action) const {
+ return Action::ptr(new ReadAction(action));
+ }
+
+ inline Action::ptr operator()(const action::AioReadAction &action) const {
+ return Action::ptr(new AioReadAction(action));
+ }
+
+ inline Action::ptr operator()(const action::WriteAction &action) const {
+ return Action::ptr(new WriteAction(action));
+ }
+
+ inline Action::ptr operator()(const action::AioWriteAction &action) const {
+ return Action::ptr(new AioWriteAction(action));
+ }
+
+ inline Action::ptr operator()(const action::DiscardAction &action) const {
+ return Action::ptr(new DiscardAction(action));
+ }
+
+ inline Action::ptr operator()(const action::AioDiscardAction &action) const {
+ return Action::ptr(new AioDiscardAction(action));
+ }
+
+ inline Action::ptr operator()(const action::OpenImageAction &action) const {
+ return Action::ptr(new OpenImageAction(action));
+ }
+
+ inline Action::ptr operator()(const action::CloseImageAction &action) const {
+ return Action::ptr(new CloseImageAction(action));
+ }
+
+ inline Action::ptr operator()(const action::AioOpenImageAction &action) const {
+ return Action::ptr(new AioOpenImageAction(action));
+ }
+
+ inline Action::ptr operator()(const action::AioCloseImageAction &action) const {
+ return Action::ptr(new AioCloseImageAction(action));
+ }
+
+ inline Action::ptr operator()(const action::UnknownAction &action) const {
+ return Action::ptr();
+ }
+};
+
+} // anonymous namespace
+
+std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
+ return a.dump(o);
+}
+
+Action::ptr Action::construct(const action::ActionEntry &action_entry) {
+ return boost::apply_visitor(ConstructVisitor(), action_entry.action);
+}
+
+void StartThreadAction::perform(ActionCtx &ctx) {
+ cerr << "StartThreadAction should never actually be performed" << std::endl;
+ exit(1);
+}
+
+void StopThreadAction::perform(ActionCtx &ctx) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ ctx.stop();
+}
+
+void AioReadAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ ceph_assert(image);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+}
+
+void ReadAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist());
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+ worker.remove_pending(io);
+}
+
+void AioWriteAction::perform(ActionCtx &worker) {
+ static const std::string fake_data(create_fake_data());
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ uint64_t remaining = m_action.length;
+ while (remaining > 0) {
+ uint64_t n = std::min(remaining, (uint64_t)fake_data.length());
+ io->bufferlist().append(fake_data.data(), n);
+ remaining -= n;
+ }
+ worker.add_pending(io);
+ if (worker.readonly()) {
+ worker.remove_pending(io);
+ } else {
+ int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+ }
+}
+
+void WriteAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ io->bufferlist().append_zero(m_action.length);
+ if (!worker.readonly()) {
+ ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist());
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+ }
+ worker.remove_pending(io);
+}
+
+void AioDiscardAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ if (worker.readonly()) {
+ worker.remove_pending(io);
+ } else {
+ int r = image->aio_discard(m_action.offset, m_action.length, &io->completion());
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+ }
+}
+
+void DiscardAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ if (!worker.readonly()) {
+ ssize_t r = image->discard(m_action.offset, m_action.length);
+ assertf(r >= 0, "id = %d, r = %d", id(), r);
+ }
+ worker.remove_pending(io);
+}
+
+void OpenImageAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ librbd::Image *image = new librbd::Image();
+ librbd::RBD *rbd = worker.rbd();
+ rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name));
+ int r;
+ if (m_action.read_only || worker.readonly()) {
+ r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
+ } else {
+ r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
+ }
+ if (r) {
+ cerr << "Unable to open image '" << m_action.name
+ << "' with snap '" << m_action.snap_name
+ << "' (mapped to '" << name.str()
+ << "') and readonly " << m_action.read_only
+ << ": (" << -r << ") " << strerror(-r) << std::endl;
+ exit(1);
+ }
+ worker.put_image(m_action.imagectx_id, image);
+ worker.remove_pending(io);
+}
+
+void CloseImageAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ worker.erase_image(m_action.imagectx_id);
+ worker.set_action_complete(pending_io_id());
+}
+
+void AioOpenImageAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ // TODO: Make it async
+ PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
+ worker.add_pending(io);
+ librbd::Image *image = new librbd::Image();
+ librbd::RBD *rbd = worker.rbd();
+ rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name));
+ int r;
+ if (m_action.read_only || worker.readonly()) {
+ r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
+ } else {
+ r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
+ }
+ if (r) {
+ cerr << "Unable to open image '" << m_action.name
+ << "' with snap '" << m_action.snap_name
+ << "' (mapped to '" << name.str()
+ << "') and readonly " << m_action.read_only
+ << ": (" << -r << ") " << strerror(-r) << std::endl;
+ exit(1);
+ }
+ worker.put_image(m_action.imagectx_id, image);
+ worker.remove_pending(io);
+}
+
+void AioCloseImageAction::perform(ActionCtx &worker) {
+ dout(ACTION_LEVEL) << "Performing " << *this << dendl;
+ // TODO: Make it async
+ worker.erase_image(m_action.imagectx_id);
+ worker.set_action_complete(pending_io_id());
+}
diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp
new file mode 100644
index 000000000..89e483158
--- /dev/null
+++ b/src/rbd_replay/actions.hpp
@@ -0,0 +1,344 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+#define _INCLUDED_RBD_REPLAY_ACTIONS_HPP
+
+#include <boost/shared_ptr.hpp>
+#include "include/rbd/librbd.hpp"
+#include "common/Formatter.h"
+#include "rbd_replay/ActionTypes.h"
+#include "rbd_loc.hpp"
+#include <iostream>
+
+// Stupid Doxygen requires this or else the typedef docs don't appear anywhere.
+/// @file rbd_replay/actions.hpp
+
+namespace rbd_replay {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+/// Even IDs are normal actions, odd IDs are completions.
+typedef uint32_t action_id_t;
+
+class PendingIO;
+
+/**
+ %Context through which an Action interacts with its environment.
+ */
+class ActionCtx {
+public:
+ virtual ~ActionCtx() {
+ }
+
+ /**
+ Returns the image with the given ID.
+ The image must have been previously tracked with put_image(imagectx_id_t,librbd::Image*).
+ */
+ virtual librbd::Image* get_image(imagectx_id_t imagectx_id) = 0;
+
+ /**
+ Tracks an image.
+ put_image(imagectx_id_t,librbd::Image*) must not have been called previously with the same ID,
+ and the image must not be NULL.
+ */
+ virtual void put_image(imagectx_id_t imagectx_id, librbd::Image* image) = 0;
+
+ /**
+ Stops tracking an Image and release it.
+ This deletes the C++ object, not the image itself.
+ The image must have been previously tracked with put_image(imagectx_id_t,librbd::Image*).
+ */
+ virtual void erase_image(imagectx_id_t imagectx_id) = 0;
+
+ virtual librbd::RBD* rbd() = 0;
+
+ virtual librados::IoCtx* ioctx() = 0;
+
+ virtual void add_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+ virtual bool readonly() const = 0;
+
+ virtual void remove_pending(boost::shared_ptr<PendingIO> io) = 0;
+
+ virtual void set_action_complete(action_id_t id) = 0;
+
+ virtual void stop() = 0;
+
+ /**
+ Maps an image name from the name in the original trace to the name that should be used when replaying.
+ @param image_name name of the image in the original trace
+ @param snap_name name of the snap in the original trace
+ @return image name to replay against
+ */
+ virtual rbd_loc map_image_name(std::string image_name, std::string snap_name) const = 0;
+};
+
+
+/**
+ Performs an %IO or a maintenance action such as starting or stopping a thread.
+ Actions are read from a replay file and scheduled by Replayer.
+ Corresponds to the IO class, except that Actions are executed by rbd-replay,
+ and IOs are used by rbd-replay-prep for processing the raw trace.
+ */
+class Action {
+public:
+ typedef boost::shared_ptr<Action> ptr;
+
+ virtual ~Action() {
+ }
+
+ virtual void perform(ActionCtx &ctx) = 0;
+
+ /// Returns the ID of the completion corresponding to this action.
+ action_id_t pending_io_id() {
+ return id() + 1;
+ }
+
+ // There's probably a better way to do this, but oh well.
+ virtual bool is_start_thread() {
+ return false;
+ }
+
+ virtual action_id_t id() const = 0;
+ virtual thread_id_t thread_id() const = 0;
+ virtual const action::Dependencies& predecessors() const = 0;
+
+ virtual std::ostream& dump(std::ostream& o) const = 0;
+
+ static ptr construct(const action::ActionEntry &action_entry);
+};
+
+template <typename ActionType>
+class TypedAction : public Action {
+public:
+ explicit TypedAction(const ActionType &action) : m_action(action) {
+ }
+
+ action_id_t id() const override {
+ return m_action.id;
+ }
+
+ thread_id_t thread_id() const override {
+ return m_action.thread_id;
+ }
+
+ const action::Dependencies& predecessors() const override {
+ return m_action.dependencies;
+ }
+
+ std::ostream& dump(std::ostream& o) const override {
+ o << get_action_name() << ": ";
+ ceph::JSONFormatter formatter(false);
+ formatter.open_object_section("");
+ m_action.dump(&formatter);
+ formatter.close_section();
+ formatter.flush(o);
+ return o;
+ }
+
+protected:
+ const ActionType m_action;
+
+ virtual const char *get_action_name() const = 0;
+};
+
+/// Writes human-readable debug information about the action to the stream.
+/// @related Action
+std::ostream& operator<<(std::ostream& o, const Action& a);
+
+class StartThreadAction : public TypedAction<action::StartThreadAction> {
+public:
+ explicit StartThreadAction(const action::StartThreadAction &action)
+ : TypedAction<action::StartThreadAction>(action) {
+ }
+
+ bool is_start_thread() override {
+ return true;
+ }
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "StartThreadAction";
+ }
+};
+
+class StopThreadAction : public TypedAction<action::StopThreadAction> {
+public:
+ explicit StopThreadAction(const action::StopThreadAction &action)
+ : TypedAction<action::StopThreadAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "StartThreadAction";
+ }
+};
+
+
+class AioReadAction : public TypedAction<action::AioReadAction> {
+public:
+ explicit AioReadAction(const action::AioReadAction &action)
+ : TypedAction<action::AioReadAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "AioReadAction";
+ }
+};
+
+
+class ReadAction : public TypedAction<action::ReadAction> {
+public:
+ explicit ReadAction(const action::ReadAction &action)
+ : TypedAction<action::ReadAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "ReadAction";
+ }
+};
+
+
+class AioWriteAction : public TypedAction<action::AioWriteAction> {
+public:
+ explicit AioWriteAction(const action::AioWriteAction &action)
+ : TypedAction<action::AioWriteAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "AioWriteAction";
+ }
+};
+
+
+class WriteAction : public TypedAction<action::WriteAction> {
+public:
+ explicit WriteAction(const action::WriteAction &action)
+ : TypedAction<action::WriteAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "WriteAction";
+ }
+};
+
+
+class AioDiscardAction : public TypedAction<action::AioDiscardAction> {
+public:
+ explicit AioDiscardAction(const action::AioDiscardAction &action)
+ : TypedAction<action::AioDiscardAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "AioDiscardAction";
+ }
+};
+
+
+class DiscardAction : public TypedAction<action::DiscardAction> {
+public:
+ explicit DiscardAction(const action::DiscardAction &action)
+ : TypedAction<action::DiscardAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "DiscardAction";
+ }
+};
+
+
+class OpenImageAction : public TypedAction<action::OpenImageAction> {
+public:
+ explicit OpenImageAction(const action::OpenImageAction &action)
+ : TypedAction<action::OpenImageAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "OpenImageAction";
+ }
+};
+
+
+class CloseImageAction : public TypedAction<action::CloseImageAction> {
+public:
+ explicit CloseImageAction(const action::CloseImageAction &action)
+ : TypedAction<action::CloseImageAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "CloseImageAction";
+ }
+};
+
+class AioOpenImageAction : public TypedAction<action::AioOpenImageAction> {
+public:
+ explicit AioOpenImageAction(const action::AioOpenImageAction &action)
+ : TypedAction<action::AioOpenImageAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "AioOpenImageAction";
+ }
+};
+
+
+class AioCloseImageAction : public TypedAction<action::AioCloseImageAction> {
+public:
+ explicit AioCloseImageAction(const action::AioCloseImageAction &action)
+ : TypedAction<action::AioCloseImageAction>(action) {
+ }
+
+ void perform(ActionCtx &ctx) override;
+
+protected:
+ const char *get_action_name() const override {
+ return "AioCloseImageAction";
+ }
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc
new file mode 100644
index 000000000..77b4f4858
--- /dev/null
+++ b/src/rbd_replay/ios.cc
@@ -0,0 +1,220 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include "ios.hpp"
+#include "rbd_replay/ActionTypes.h"
+
+using namespace std;
+using namespace rbd_replay;
+
+namespace {
+
+bool compare_dependencies_by_start_time(const action::Dependency &lhs,
+ const action::Dependency &rhs) {
+ return lhs.time_delta < rhs.time_delta;
+}
+
+action::Dependencies convert_dependencies(uint64_t start_time,
+ const io_set_t &deps) {
+ action::Dependencies action_deps;
+ action_deps.reserve(deps.size());
+ for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) {
+ boost::shared_ptr<IO> io = *it;
+ uint64_t time_delta = 0;
+ if (start_time >= io->start_time()) {
+ time_delta = start_time - io->start_time();
+ }
+ action_deps.push_back(action::Dependency(io->ionum(), time_delta));
+ }
+ std::sort(action_deps.begin(), action_deps.end(),
+ compare_dependencies_by_start_time);
+ return action_deps;
+}
+
+} // anonymous namespace
+
+void IO::write_debug_base(ostream& out, const string &type) const {
+ out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
+ bool first = true;
+ for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
+ if (first) {
+ first = false;
+ } else {
+ out << ", ";
+ }
+ out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
+ }
+ out << "}";
+}
+
+
+ostream& operator<<(ostream &out, const IO::ptr &io) {
+ io->write_debug(out);
+ return out;
+}
+
+void StartThreadIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::StartThreadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+ encode(action, bl);
+}
+
+void StartThreadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "start thread");
+}
+
+void StopThreadIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::StopThreadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+ encode(action, bl);
+}
+
+void StopThreadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "stop thread");
+}
+
+void ReadIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::ReadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void ReadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "read");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void WriteIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::WriteAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void WriteIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "write");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void DiscardIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::DiscardAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void DiscardIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "discard");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void AioReadIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::AioReadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void AioReadIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio read");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void AioWriteIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::AioWriteAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void AioWriteIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio write");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void AioDiscardIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::AioDiscardAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ encode(action, bl);
+}
+
+void AioDiscardIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio discard");
+ out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
+}
+
+void OpenImageIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::OpenImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_name, m_snap_name, m_readonly)));
+ encode(action, bl);
+}
+
+void OpenImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "open image");
+ out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
+}
+
+void CloseImageIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::CloseImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx)));
+ encode(action, bl);
+}
+
+void CloseImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "close image");
+ out << ", imagectx=" << m_imagectx;
+}
+
+void AioOpenImageIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::AioOpenImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_name, m_snap_name, m_readonly)));
+ encode(action, bl);
+}
+
+void AioOpenImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio open image");
+ out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
+}
+
+void AioCloseImageIO::encode(bufferlist &bl) const {
+ using ceph::encode;
+ action::Action action((action::AioCloseImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx)));
+ encode(action, bl);
+}
+
+void AioCloseImageIO::write_debug(std::ostream& out) const {
+ write_debug_base(out, "aio close image");
+ out << ", imagectx=" << m_imagectx;
+}
diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp
new file mode 100644
index 000000000..8a105afd2
--- /dev/null
+++ b/src/rbd_replay/ios.hpp
@@ -0,0 +1,401 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_IOS_HPP
+#define _INCLUDED_RBD_REPLAY_IOS_HPP
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include "include/buffer_fwd.h"
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+#include <set>
+#include <vector>
+#include "actions.hpp"
+
+
+namespace rbd_replay {
+
+class IO;
+
+typedef std::set<boost::shared_ptr<IO> > io_set_t;
+
+typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
+
+/**
+ Used by rbd-replay-prep for processing the raw trace.
+ Corresponds to the Action class, except that Actions are executed by rbd-replay,
+ and IOs are used by rbd-replay-prep for processing the raw trace.
+ */
+class IO : public boost::enable_shared_from_this<IO> {
+public:
+ typedef boost::shared_ptr<IO> ptr;
+ typedef std::vector<ptr> ptrs;
+
+ /**
+ @param ionum ID of this %IO
+ @param start_time time the %IO started, in nanoseconds
+ @param thread_id ID of the thread that issued the %IO
+ */
+ IO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps)
+ : m_ionum(ionum),
+ m_start_time(start_time),
+ m_dependencies(deps),
+ m_thread_id(thread_id),
+ m_completed(false) {
+ }
+
+ virtual ~IO() {
+ }
+
+ uint64_t start_time() const {
+ return m_start_time;
+ }
+
+ io_set_t& dependencies() {
+ return m_dependencies;
+ }
+
+ const io_set_t& dependencies() const {
+ return m_dependencies;
+ }
+
+ virtual void encode(bufferlist &bl) const = 0;
+
+ void set_ionum(action_id_t ionum) {
+ m_ionum = ionum;
+ }
+
+ action_id_t ionum() const {
+ return m_ionum;
+ }
+
+ thread_id_t thread_id() const {
+ return m_thread_id;
+ }
+
+ virtual void write_debug(std::ostream& out) const = 0;
+
+protected:
+ void write_debug_base(std::ostream& out, const std::string &iotype) const;
+
+private:
+ action_id_t m_ionum;
+ uint64_t m_start_time;
+ io_set_t m_dependencies;
+ thread_id_t m_thread_id;
+ bool m_completed;
+};
+
+/// Used for dumping debug info.
+/// @related IO
+std::ostream& operator<<(std::ostream &out, const IO::ptr &io);
+
+
+class StartThreadIO : public IO {
+public:
+ StartThreadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id)
+ : IO(ionum, start_time, thread_id, io_set_t()) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+};
+
+class StopThreadIO : public IO {
+public:
+ StopThreadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps)
+ : IO(ionum, start_time, thread_id, deps) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+};
+
+class ReadIO : public IO {
+public:
+ ReadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class WriteIO : public IO {
+public:
+ WriteIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class DiscardIO : public IO {
+public:
+ DiscardIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class AioReadIO : public IO {
+public:
+ AioReadIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class AioWriteIO : public IO {
+public:
+ AioWriteIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class AioDiscardIO : public IO {
+public:
+ AioDiscardIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ uint64_t offset,
+ uint64_t length)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_offset(offset),
+ m_length(length) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ uint64_t m_offset;
+ uint64_t m_length;
+};
+
+class OpenImageIO : public IO {
+public:
+ OpenImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ const std::string& name,
+ const std::string& snap_name,
+ bool readonly)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_name(name),
+ m_snap_name(snap_name),
+ m_readonly(readonly) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ std::string m_name;
+ std::string m_snap_name;
+ bool m_readonly;
+};
+
+class CloseImageIO : public IO {
+public:
+ CloseImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+};
+
+class AioOpenImageIO : public IO {
+public:
+ AioOpenImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx,
+ const std::string& name,
+ const std::string& snap_name,
+ bool readonly)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx),
+ m_name(name),
+ m_snap_name(snap_name),
+ m_readonly(readonly) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+ std::string m_name;
+ std::string m_snap_name;
+ bool m_readonly;
+};
+
+class AioCloseImageIO : public IO {
+public:
+ AioCloseImageIO(action_id_t ionum,
+ uint64_t start_time,
+ thread_id_t thread_id,
+ const io_set_t& deps,
+ imagectx_id_t imagectx)
+ : IO(ionum, start_time, thread_id, deps),
+ m_imagectx(imagectx) {
+ }
+
+ void encode(bufferlist &bl) const override;
+
+ imagectx_id_t imagectx() const {
+ return m_imagectx;
+ }
+
+ void write_debug(std::ostream& out) const override;
+
+private:
+ imagectx_id_t m_imagectx;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc
new file mode 100644
index 000000000..977c7c19e
--- /dev/null
+++ b/src/rbd_replay/rbd-replay-prep.cc
@@ -0,0 +1,584 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// This code assumes that IO IDs and timestamps are related monotonically.
+// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+
+#include "include/compat.h"
+#include "common/errno.h"
+#include "rbd_replay/ActionTypes.h"
+#include <babeltrace/babeltrace.h>
+#include <babeltrace/ctf/events.h>
+#include <babeltrace/ctf/iterator.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <cstdlib>
+#include <string>
+#include <assert.h>
+#include <fstream>
+#include <set>
+#include <boost/thread/thread.hpp>
+#include <boost/scope_exit.hpp>
+#include "ios.hpp"
+
+using namespace std;
+using namespace rbd_replay;
+
+#define ASSERT_EXIT(check, str) \
+ if (!(check)) { \
+ std::cerr << str << std::endl; \
+ exit(1); \
+ }
+
+class Thread {
+public:
+ typedef boost::shared_ptr<Thread> ptr;
+
+ Thread(thread_id_t id,
+ uint64_t window)
+ : m_id(id),
+ m_window(window),
+ m_latest_io(IO::ptr()),
+ m_max_ts(0) {
+ }
+
+ void insert_ts(uint64_t ts) {
+ if (m_max_ts == 0 || ts > m_max_ts) {
+ m_max_ts = ts;
+ }
+ }
+
+ uint64_t max_ts() const {
+ return m_max_ts;
+ }
+
+ void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
+ ceph_assert(io);
+ if (m_latest_io.get() != NULL) {
+ latest_ios->erase(m_latest_io);
+ }
+ m_latest_io = io;
+ latest_ios->insert(io);
+ }
+
+ thread_id_t id() const {
+ return m_id;
+ }
+
+ IO::ptr latest_io() {
+ return m_latest_io;
+ }
+
+private:
+ thread_id_t m_id;
+ uint64_t m_window;
+ IO::ptr m_latest_io;
+ uint64_t m_max_ts;
+};
+
+class AnonymizedImage {
+public:
+ void init(const string &image_name, int index) {
+ ceph_assert(m_image_name == "");
+ m_image_name = image_name;
+ ostringstream oss;
+ oss << "image" << index;
+ m_anonymized_image_name = oss.str();
+ }
+
+ string image_name() const {
+ return m_image_name;
+ }
+
+ pair<string, string> anonymize(string snap_name) {
+ if (snap_name == "") {
+ return pair<string, string>(m_anonymized_image_name, "");
+ }
+ string& anonymized_snap_name(m_snaps[snap_name]);
+ if (anonymized_snap_name == "") {
+ ostringstream oss;
+ oss << "snap" << m_snaps.size();
+ anonymized_snap_name = oss.str();
+ }
+ return pair<string, string>(m_anonymized_image_name, anonymized_snap_name);
+ }
+
+private:
+ string m_image_name;
+ string m_anonymized_image_name;
+ map<string, string> m_snaps;
+};
+
+static void usage(const string &prog) {
+ std::stringstream str;
+ str << "Usage: " << prog << " ";
+ std::cout << str.str() << "[ --window <seconds> ] [ --anonymize ] [ --verbose ]" << std::endl
+ << std::string(str.str().size(), ' ') << "<trace-input> <replay-output>" << endl;
+}
+
+__attribute__((noreturn)) static void usage_exit(const string &prog, const string &msg) {
+ cerr << msg << endl;
+ usage(prog);
+ exit(1);
+}
+
+class Processor {
+public:
+ Processor()
+ : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
+ m_io_count(0),
+ m_anonymize(false),
+ m_verbose(false) {
+ }
+
+ void run(vector<string> args) {
+ string input_file_name;
+ string output_file_name;
+ bool got_input = false;
+ bool got_output = false;
+ for (int i = 1, nargs = args.size(); i < nargs; i++) {
+ const string& arg(args[i]);
+ if (arg == "--window") {
+ if (i == nargs - 1) {
+ usage_exit(args[0], "--window requires an argument");
+ }
+ m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
+ } else if (arg.compare(0, 9, "--window=") == 0) {
+ m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
+ } else if (arg == "--anonymize") {
+ m_anonymize = true;
+ } else if (arg == "--verbose") {
+ m_verbose = true;
+ } else if (arg == "-h" || arg == "--help") {
+ usage(args[0]);
+ exit(0);
+ } else if (arg.compare(0, 1, "-") == 0) {
+ usage_exit(args[0], "Unrecognized argument: " + arg);
+ } else if (!got_input) {
+ input_file_name = arg;
+ got_input = true;
+ } else if (!got_output) {
+ output_file_name = arg;
+ got_output = true;
+ } else {
+ usage_exit(args[0], "Too many arguments");
+ }
+ }
+ if (!got_output) {
+ usage_exit(args[0], "Not enough arguments");
+ }
+
+ struct bt_context *ctx = bt_context_create();
+ int trace_handle = bt_context_add_trace(ctx,
+ input_file_name.c_str(), // path
+ "ctf", // format
+ NULL, // packet_seek
+ NULL, // stream_list
+ NULL); // metadata
+ ASSERT_EXIT(trace_handle >= 0, "Error loading trace file");
+
+ uint64_t start_time_ns = bt_trace_handle_get_timestamp_begin(ctx, trace_handle, BT_CLOCK_REAL);
+ ASSERT_EXIT(start_time_ns != -1ULL,
+ "Error extracting creation time from trace");
+
+ struct bt_ctf_iter *itr = bt_ctf_iter_create(ctx,
+ NULL, // begin_pos
+ NULL); // end_pos
+ ceph_assert(itr);
+
+ struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
+
+ int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0644);
+ ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
+ ": " << cpp_strerror(errno));
+ BOOST_SCOPE_EXIT( (fd) ) {
+ close(fd);
+ } BOOST_SCOPE_EXIT_END;
+
+ write_banner(fd);
+
+ uint64_t trace_start = 0;
+ bool first = true;
+ while(true) {
+ struct bt_ctf_event *evt = bt_ctf_iter_read_event(itr);
+ if(!evt) {
+ break;
+ }
+ uint64_t ts = bt_ctf_get_timestamp(evt);
+ ASSERT_EXIT(ts != -1ULL, "Error extracting event timestamp");
+
+ if (first) {
+ trace_start = ts;
+ first = false;
+ }
+ ts -= trace_start;
+ ts += 4; // This is so we have room to insert two events (thread start and open image) at unique timestamps before whatever the first event is.
+
+ IO::ptrs ptrs;
+ process_event(ts, evt, &ptrs);
+ serialize_events(fd, ptrs);
+
+ int r = bt_iter_next(bt_itr);
+ ASSERT_EXIT(r == 0, "Error advancing event iterator");
+ }
+
+ bt_ctf_iter_destroy(itr);
+
+ insert_thread_stops(fd);
+ }
+
+private:
+ void write_banner(int fd) {
+ bufferlist bl;
+ bl.append(rbd_replay::action::BANNER);
+ int r = bl.write_fd(fd);
+ ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+ }
+
+ void serialize_events(int fd, const IO::ptrs &ptrs) {
+ for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
+ IO::ptr io(*it);
+
+ bufferlist bl;
+ io->encode(bl);
+
+ int r = bl.write_fd(fd);
+ ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+
+ if (m_verbose) {
+ io->write_debug(std::cout);
+ std::cout << std::endl;
+ }
+ }
+ }
+
+ void insert_thread_stops(int fd) {
+ IO::ptrs ios;
+ for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
+ end = m_threads.end(); itr != end; ++itr) {
+ Thread::ptr thread(itr->second);
+ ios.push_back(IO::ptr(new StopThreadIO(next_id(), thread->max_ts(),
+ thread->id(),
+ m_recent_completions)));
+ }
+ serialize_events(fd, ios);
+ }
+
+ void process_event(uint64_t ts, struct bt_ctf_event *evt,
+ IO::ptrs *ios) {
+ const char *event_name = bt_ctf_event_name(evt);
+ const struct bt_definition *scope_context = bt_ctf_get_top_level_scope(evt,
+ BT_STREAM_EVENT_CONTEXT);
+ ASSERT_EXIT(scope_context != NULL, "Error retrieving event context");
+
+ const struct bt_definition *scope_fields = bt_ctf_get_top_level_scope(evt,
+ BT_EVENT_FIELDS);
+ ASSERT_EXIT(scope_fields != NULL, "Error retrieving event fields");
+
+ const struct bt_definition *pthread_id_field = bt_ctf_get_field(evt, scope_context, "pthread_id");
+ ASSERT_EXIT(pthread_id_field != NULL, "Error retrieving thread id");
+
+ thread_id_t threadID = bt_ctf_get_uint64(pthread_id_field);
+ Thread::ptr &thread(m_threads[threadID]);
+ if (!thread) {
+ thread.reset(new Thread(threadID, m_window));
+ IO::ptr io(new StartThreadIO(next_id(), ts - 4, threadID));
+ ios->push_back(io);
+ }
+ thread->insert_ts(ts);
+
+ class FieldLookup {
+ public:
+ FieldLookup(struct bt_ctf_event *evt,
+ const struct bt_definition *scope)
+ : m_evt(evt),
+ m_scope(scope) {
+ }
+
+ const char* string(const char* name) {
+ const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+ ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
+
+ const char* c = bt_ctf_get_string(field);
+ int err = bt_ctf_field_get_error();
+ ASSERT_EXIT(c && err == 0, "Error retrieving field value '" << name <<
+ "': error=" << err);
+ return c;
+ }
+
+ int64_t int64(const char* name) {
+ const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+ ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
+
+ int64_t val = bt_ctf_get_int64(field);
+ int err = bt_ctf_field_get_error();
+ ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
+ "': error=" << err);
+ return val;
+ }
+
+ uint64_t uint64(const char* name) {
+ const struct bt_definition *field = bt_ctf_get_field(m_evt, m_scope, name);
+ ASSERT_EXIT(field != NULL, "Error retrieving field '" << name << "'");
+
+ uint64_t val = bt_ctf_get_uint64(field);
+ int err = bt_ctf_field_get_error();
+ ASSERT_EXIT(err == 0, "Error retrieving field value '" << name <<
+ "': error=" << err);
+ return val;
+ }
+
+ private:
+ struct bt_ctf_event *m_evt;
+ const struct bt_definition *m_scope;
+ } fields(evt, scope_fields);
+
+ if (strcmp(event_name, "librbd:open_image_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.uint64("read_only");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ action_id_t ionum = next_id();
+ pair<string, string> aname(map_image_snap(name, snap_name));
+ IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, aname.first, aname.second,
+ readonly));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ } else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
+ completed(thread->latest_io());
+ boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
+ ceph_assert(io);
+ m_open_images.insert(io->imagectx());
+ } else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ action_id_t ionum = next_id();
+ IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(thread->latest_io());
+ } else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
+ completed(thread->latest_io());
+ boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
+ ceph_assert(io);
+ m_open_images.erase(io->imagectx());
+ } else if (strcmp(event_name, "librbd:aio_open_image_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.uint64("read_only");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ uint64_t completion = fields.uint64("completion");
+ action_id_t ionum = next_id();
+ pair<string, string> aname(map_image_snap(name, snap_name));
+ IO::ptr io(new AioOpenImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, aname.first, aname.second,
+ readonly));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ m_pending_ios[completion] = io;
+ } else if (strcmp(event_name, "librbd:aio_close_image_enter") == 0) {
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ uint64_t completion = fields.uint64("completion");
+ action_id_t ionum = next_id();
+ IO::ptr io(new AioCloseImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(thread->latest_io());
+ m_pending_ios[completion] = io;
+ } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
+ strcmp(event_name, "librbd:read2_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ uint64_t offset = fields.uint64("offset");
+ uint64_t length = fields.uint64("length");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
+ offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ } else if (strcmp(event_name, "librbd:read_exit") == 0) {
+ completed(thread->latest_io());
+ } else if (strcmp(event_name, "librbd:write_enter") == 0 ||
+ strcmp(event_name, "librbd:write2_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ uint64_t offset = fields.uint64("off");
+ uint64_t length = fields.uint64("buf_len");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ } else if (strcmp(event_name, "librbd:write_exit") == 0) {
+ completed(thread->latest_io());
+ } else if (strcmp(event_name, "librbd:discard_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ uint64_t offset = fields.uint64("off");
+ uint64_t length = fields.uint64("len");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new DiscardIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ } else if (strcmp(event_name, "librbd:discard_exit") == 0) {
+ completed(thread->latest_io());
+ } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
+ strcmp(event_name, "librbd:aio_read2_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ uint64_t completion = fields.uint64("completion");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ uint64_t offset = fields.uint64("offset");
+ uint64_t length = fields.uint64("length");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ ios->push_back(io);
+ thread->issued_io(io, &m_latest_ios);
+ m_pending_ios[completion] = io;
+ } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
+ strcmp(event_name, "librbd:aio_write2_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ uint64_t offset = fields.uint64("off");
+ uint64_t length = fields.uint64("len");
+ uint64_t completion = fields.uint64("completion");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ m_pending_ios[completion] = io;
+ } else if (strcmp(event_name, "librbd:aio_discard_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ uint64_t offset = fields.uint64("off");
+ uint64_t length = fields.uint64("len");
+ uint64_t completion = fields.uint64("completion");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ require_image(ts, thread, imagectx, name, snap_name, readonly, ios);
+ action_id_t ionum = next_id();
+ IO::ptr io(new AioDiscardIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ m_pending_ios[completion] = io;
+ } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
+ uint64_t completion = fields.uint64("completion");
+ map<uint64_t, IO::ptr>::iterator itr = m_pending_ios.find(completion);
+ if (itr != m_pending_ios.end()) {
+ IO::ptr completedIO(itr->second);
+ m_pending_ios.erase(itr);
+ completed(completedIO);
+ }
+ }
+ }
+
+ action_id_t next_id() {
+ action_id_t id = m_io_count;
+ m_io_count += 2;
+ return id;
+ }
+
+ void completed(IO::ptr io) {
+ uint64_t limit = (io->start_time() < m_window ?
+ 0 : io->start_time() - m_window);
+ for (io_set_t::iterator itr = m_recent_completions.begin();
+ itr != m_recent_completions.end(); ) {
+ IO::ptr recent_comp(*itr);
+ if ((recent_comp->start_time() < limit ||
+ io->dependencies().count(recent_comp) != 0) &&
+ m_latest_ios.count(recent_comp) == 0) {
+ m_recent_completions.erase(itr++);
+ } else {
+ ++itr;
+ }
+ }
+ m_recent_completions.insert(io);
+ }
+
+ pair<string, string> map_image_snap(string image_name, string snap_name) {
+ if (!m_anonymize) {
+ return pair<string, string>(image_name, snap_name);
+ }
+ AnonymizedImage& m(m_anonymized_images[image_name]);
+ if (m.image_name() == "") {
+ m.init(image_name, m_anonymized_images.size());
+ }
+ return m.anonymize(snap_name);
+ }
+
+ void require_image(uint64_t ts,
+ Thread::ptr thread,
+ imagectx_id_t imagectx,
+ const string& name,
+ const string& snap_name,
+ bool readonly,
+ IO::ptrs *ios) {
+ ceph_assert(thread);
+ if (m_open_images.count(imagectx) > 0) {
+ return;
+ }
+ action_id_t ionum = next_id();
+ pair<string, string> aname(map_image_snap(name, snap_name));
+ IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
+ m_recent_completions, imagectx, aname.first,
+ aname.second, readonly));
+ thread->issued_io(io, &m_latest_ios);
+ ios->push_back(io);
+ completed(io);
+ m_open_images.insert(imagectx);
+ }
+
+ uint64_t m_window;
+ map<thread_id_t, Thread::ptr> m_threads;
+ uint32_t m_io_count;
+ io_set_t m_recent_completions;
+ set<imagectx_id_t> m_open_images;
+
+ // keyed by completion
+ map<uint64_t, IO::ptr> m_pending_ios;
+ std::set<IO::ptr> m_latest_ios;
+
+ bool m_anonymize;
+ map<string, AnonymizedImage> m_anonymized_images;
+
+ bool m_verbose;
+};
+
+int main(int argc, char** argv) {
+ vector<string> args;
+ for (int i = 0; i < argc; i++) {
+ args.push_back(string(argv[i]));
+ }
+
+ Processor p;
+ p.run(args);
+}
diff --git a/src/rbd_replay/rbd-replay.cc b/src/rbd_replay/rbd-replay.cc
new file mode 100644
index 000000000..f52e75fd9
--- /dev/null
+++ b/src/rbd_replay/rbd-replay.cc
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <vector>
+#include <boost/thread.hpp>
+#include "common/ceph_argparse.h"
+#include "global/global_init.h"
+#include "Replayer.hpp"
+#include "rbd_replay_debug.hpp"
+#include "ImageNameMap.hpp"
+
+
+using namespace rbd_replay;
+
+
+static const char* get_remainder(const char *string, const char *prefix) {
+ while (*prefix) {
+ if (*prefix++ != *string++) {
+ return NULL;
+ }
+ }
+ return string;
+}
+
+static void usage(const char* program) {
+ cout << "Usage: " << program << " --conf=<config_file> <replay_file>" << std::endl;
+ cout << "Options:" << std::endl;
+ cout << " -p, --pool-name <pool> Name of the pool to use. Default: rbd" << std::endl;
+ cout << " --latency-multiplier <float> Multiplies inter-request latencies. Default: 1" << std::endl;
+ cout << " --read-only Only perform non-destructive operations." << std::endl;
+ cout << " --map-image <rule> Add a rule to map image names in the trace to" << std::endl;
+ cout << " image names in the replay cluster." << std::endl;
+ cout << " --dump-perf-counters *Experimental*" << std::endl;
+ cout << " Dump performance counters to standard out before" << std::endl;
+ cout << " an image is closed. Performance counters may be dumped" << std::endl;
+ cout << " multiple times if multiple images are closed, or if" << std::endl;
+ cout << " the same image is opened and closed multiple times." << std::endl;
+ cout << " Performance counters and their meaning may change between" << std::endl;
+ cout << " versions." << std::endl;
+ cout << std::endl;
+ cout << "Image mapping rules:" << std::endl;
+ cout << "A rule of image1@snap1=image2@snap2 would map snap1 of image1 to snap2 of" << std::endl;
+ cout << "image2." << std::endl;
+}
+
+int main(int argc, const char **argv) {
+ vector<const char*> args;
+
+ argv_to_vec(argc, argv, args);
+ if (args.empty()) {
+ cerr << argv[0] << ": -h or --help for usage" << std::endl;
+ exit(1);
+ }
+ if (ceph_argparse_need_usage(args)) {
+ usage(argv[0]);
+ exit(0);
+ }
+ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY, 0);
+
+ std::vector<const char*>::iterator i;
+ string pool_name;
+ float latency_multiplier = 1;
+ bool readonly = false;
+ ImageNameMap image_name_map;
+ std::string val;
+ std::ostringstream err;
+ bool dump_perf_counters = false;
+ for (i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_double_dash(args, i)) {
+ break;
+ } else if (ceph_argparse_witharg(args, i, &val, "-p", "--pool", (char*)NULL)) {
+ pool_name = val;
+ } else if (ceph_argparse_witharg(args, i, &latency_multiplier, err, "--latency-multiplier",
+ (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << err.str() << std::endl;
+ return 1;
+ }
+ } else if (ceph_argparse_flag(args, i, "--read-only", (char*)NULL)) {
+ readonly = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--map-image", (char*)NULL)) {
+ ImageNameMap::Mapping mapping;
+ if (image_name_map.parse_mapping(val, &mapping)) {
+ image_name_map.add_mapping(mapping);
+ } else {
+ cerr << "Unable to parse mapping string: '" << val << "'" << std::endl;
+ return 1;
+ }
+ } else if (ceph_argparse_flag(args, i, "--dump-perf-counters", (char*)NULL)) {
+ dump_perf_counters = true;
+ } else if (get_remainder(*i, "-")) {
+ cerr << "Unrecognized argument: " << *i << std::endl;
+ return 1;
+ } else {
+ ++i;
+ }
+ }
+
+ common_init_finish(g_ceph_context);
+
+ string replay_file;
+ if (!args.empty()) {
+ replay_file = args[0];
+ }
+
+ if (replay_file.empty()) {
+ cerr << "No replay file specified." << std::endl;
+ return 1;
+ }
+
+ unsigned int nthreads = boost::thread::hardware_concurrency();
+ Replayer replayer(2 * nthreads + 1);
+ replayer.set_latency_multiplier(latency_multiplier);
+ replayer.set_pool_name(pool_name);
+ replayer.set_readonly(readonly);
+ replayer.set_image_name_map(image_name_map);
+ replayer.set_dump_perf_counters(dump_perf_counters);
+ replayer.run(replay_file);
+}
diff --git a/src/rbd_replay/rbd_loc.cc b/src/rbd_replay/rbd_loc.cc
new file mode 100644
index 000000000..ce6e8e6ed
--- /dev/null
+++ b/src/rbd_replay/rbd_loc.cc
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rbd_loc.hpp"
+#include "include/ceph_assert.h"
+
+
+using namespace std;
+using namespace rbd_replay;
+
+
+rbd_loc::rbd_loc() {
+}
+
+rbd_loc::rbd_loc(const string &pool, const string &image, const string &snap)
+ : pool(pool),
+ image(image),
+ snap(snap) {
+}
+
+bool rbd_loc::parse(string name_string) {
+ int field = 0;
+ string fields[3];
+ bool read_slash = false;
+ bool read_at = false;
+ for (size_t i = 0, n = name_string.length(); i < n; i++) {
+ char c = name_string[i];
+ switch (c) {
+ case '/':
+ if (read_slash || read_at) {
+ return false;
+ }
+ ceph_assert(field == 0);
+ field++;
+ read_slash = true;
+ break;
+ case '@':
+ if (read_at) {
+ return false;
+ }
+ ceph_assert(field < 2);
+ field++;
+ read_at = true;
+ break;
+ case '\\':
+ if (i == n - 1) {
+ return false;
+ }
+ fields[field].push_back(name_string[++i]);
+ break;
+ default:
+ fields[field].push_back(c);
+ }
+ }
+
+ if (read_slash) {
+ pool = fields[0];
+ image = fields[1];
+ // note that if read_at is false, then fields[2] is the empty string,
+ // so this is still correct
+ snap = fields[2];
+ } else {
+ pool = "";
+ image = fields[0];
+ // note that if read_at is false, then fields[1] is the empty string,
+ // so this is still correct
+ snap = fields[1];
+ }
+ return true;
+}
+
+
+static void write(const string &in, string *out) {
+ for (size_t i = 0, n = in.length(); i < n; i++) {
+ char c = in[i];
+ if (c == '@' || c == '/' || c == '\\') {
+ out->push_back('\\');
+ }
+ out->push_back(c);
+ }
+}
+
+string rbd_loc::str() const {
+ string out;
+ if (!pool.empty()) {
+ write(pool, &out);
+ out.push_back('/');
+ }
+ write(image, &out);
+ if (!snap.empty()) {
+ out.push_back('@');
+ write(snap, &out);
+ }
+ return out;
+}
+
+int rbd_loc::compare(const rbd_loc& rhs) const {
+ int c = pool.compare(rhs.pool);
+ if (c) {
+ return c;
+ }
+ c = image.compare(rhs.image);
+ if (c) {
+ return c;
+ }
+ c = snap.compare(rhs.snap);
+ if (c) {
+ return c;
+ }
+ return 0;
+}
+
+bool rbd_loc::operator==(const rbd_loc& rhs) const {
+ return compare(rhs) == 0;
+}
+
+bool rbd_loc::operator<(const rbd_loc& rhs) const {
+ return compare(rhs) < 0;
+}
diff --git a/src/rbd_replay/rbd_loc.hpp b/src/rbd_replay/rbd_loc.hpp
new file mode 100644
index 000000000..8b7bfac1f
--- /dev/null
+++ b/src/rbd_replay/rbd_loc.hpp
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_RBD_LOC_HPP
+#define _INCLUDED_RBD_REPLAY_RBD_LOC_HPP
+
+#include <string>
+
+namespace rbd_replay {
+
+/**
+ Stores a pool, image name, and snap name triple.
+ rbd_locs can be converted to/from strings with the format pool/image\@snap.
+ The slash and at signs can be omitted if the pool and snap are empty, respectively.
+ Backslashes can be used to escape slashes and at signs in names.
+ Examples:
+
+ |Pool | Image | Snap | String |
+ |------|-------|------|--------------------|
+ |rbd | vm | 1 | rbd/vm\@1 |
+ |rbd | vm | | rbd/vm |
+ | | vm | 1 | vm\@1 |
+ | | vm | | vm |
+ |rbd | | 1 | rbd/\@1 |
+ |rbd\@x| vm/y | 1 | rbd\\\@x/vm\\/y\@1 |
+
+ (The empty string should obviously be avoided as the image name.)
+
+ Note that the non-canonical forms /vm\@1 and rbd/vm\@ can also be parsed,
+ although they will be formatted as vm\@1 and rbd/vm.
+ */
+struct rbd_loc {
+ /**
+ Constructs an rbd_loc with the empty string for the pool, image, and snap.
+ */
+ rbd_loc();
+
+ /**
+ Constructs an rbd_loc with the given pool, image, and snap.
+ */
+ rbd_loc(const std::string &pool, const std::string &image, const std::string &snap);
+
+ /**
+ Parses an rbd_loc from the given string.
+ If parsing fails, the contents are unmodified.
+ @retval true if parsing succeeded
+ */
+ bool parse(std::string name_string);
+
+ /**
+ Returns the string representation of the locator.
+ */
+ std::string str() const;
+
+ /**
+ Compares the locators lexicographically by pool, then image, then snap.
+ */
+ int compare(const rbd_loc& rhs) const;
+
+ /**
+ Returns true if the locators have identical pool, image, and snap.
+ */
+ bool operator==(const rbd_loc& rhs) const;
+
+ /**
+ Compares the locators lexicographically by pool, then image, then snap.
+ */
+ bool operator<(const rbd_loc& rhs) const;
+
+ std::string pool;
+
+ std::string image;
+
+ std::string snap;
+};
+
+}
+
+#endif
diff --git a/src/rbd_replay/rbd_replay_debug.hpp b/src/rbd_replay/rbd_replay_debug.hpp
new file mode 100644
index 000000000..4b44588b4
--- /dev/null
+++ b/src/rbd_replay/rbd_replay_debug.hpp
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_DEBUG_H
+#define _INCLUDED_RBD_REPLAY_DEBUG_H
+
+#include "common/debug.h"
+#include "include/ceph_assert.h"
+
+namespace rbd_replay {
+
+static const int ACTION_LEVEL = 11;
+static const int DEPGRAPH_LEVEL = 12;
+static const int SLEEP_LEVEL = 13;
+static const int THREAD_LEVEL = 10;
+
+}
+
+#define dout_subsys ceph_subsys_rbd_replay
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd_replay: "
+
+#endif