summaryrefslogtreecommitdiffstats
path: root/src/tools/rbd/action/Journal.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/rbd/action/Journal.cc')
-rw-r--r--src/tools/rbd/action/Journal.cc1251
1 files changed, 1251 insertions, 0 deletions
diff --git a/src/tools/rbd/action/Journal.cc b/src/tools/rbd/action/Journal.cc
new file mode 100644
index 000000000..08606fcc3
--- /dev/null
+++ b/src/tools/rbd/action/Journal.cc
@@ -0,0 +1,1251 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "tools/rbd/ArgumentTypes.h"
+#include "tools/rbd/Shell.h"
+#include "tools/rbd/Utils.h"
+#include "common/Cond.h"
+#include "common/Formatter.h"
+#include "common/ceph_json.h"
+#include "common/errno.h"
+#include "common/safe_io.h"
+#include "include/stringify.h"
+#include <fstream>
+#include <sstream>
+#include <boost/program_options.hpp>
+#include "cls/rbd/cls_rbd_client.h"
+#include "cls/journal/cls_journal_types.h"
+#include "cls/journal/cls_journal_client.h"
+
+#include "journal/Journaler.h"
+#include "journal/ReplayEntry.h"
+#include "journal/ReplayHandler.h"
+#include "journal/Settings.h"
+#include "librbd/journal/Types.h"
+
+namespace rbd {
+namespace action {
+namespace journal {
+
+namespace at = argument_types;
+namespace po = boost::program_options;
+
+static const std::string JOURNAL_SPEC("journal-spec");
+static const std::string JOURNAL_NAME("journal");
+static const std::string DEST_JOURNAL_NAME("dest-journal");
+
+void add_journal_option(po::options_description *opt,
+ at::ArgumentModifier modifier) {
+ std::string name = JOURNAL_NAME;
+ std::string description = at::get_description_prefix(modifier) +
+ "journal name";
+ switch (modifier) {
+ case at::ARGUMENT_MODIFIER_NONE:
+ case at::ARGUMENT_MODIFIER_SOURCE:
+ break;
+ case at::ARGUMENT_MODIFIER_DEST:
+ name = DEST_JOURNAL_NAME;
+ break;
+ }
+
+ // TODO add validator
+ opt->add_options()
+ (name.c_str(), po::value<std::string>(), description.c_str());
+}
+
+void add_journal_spec_options(po::options_description *pos,
+ po::options_description *opt,
+ at::ArgumentModifier modifier) {
+
+ pos->add_options()
+ ((get_name_prefix(modifier) + JOURNAL_SPEC).c_str(),
+ (get_description_prefix(modifier) + "journal specification\n" +
+ "(example: [<pool-name>/[<namespace>/]]<journal-name>)").c_str());
+ add_pool_option(opt, modifier);
+ add_namespace_option(opt, modifier);
+ add_image_option(opt, modifier);
+ add_journal_option(opt, modifier);
+}
+
+int get_pool_journal_names(const po::variables_map &vm,
+ at::ArgumentModifier mod,
+ size_t *spec_arg_index,
+ std::string *pool_name,
+ std::string *namespace_name,
+ std::string *journal_name) {
+ std::string pool_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
+ at::DEST_POOL_NAME : at::POOL_NAME);
+ std::string namespace_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
+ at::DEST_NAMESPACE_NAME : at::NAMESPACE_NAME);
+ std::string image_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
+ at::DEST_IMAGE_NAME : at::IMAGE_NAME);
+ std::string journal_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
+ DEST_JOURNAL_NAME : JOURNAL_NAME);
+
+ if (vm.count(pool_key) && pool_name != nullptr) {
+ *pool_name = vm[pool_key].as<std::string>();
+ }
+ if (vm.count(namespace_key) && namespace_name != nullptr) {
+ *namespace_name = vm[namespace_key].as<std::string>();
+ }
+ if (vm.count(journal_key) && journal_name != nullptr) {
+ *journal_name = vm[journal_key].as<std::string>();
+ }
+
+ std::string image_name;
+ if (vm.count(image_key)) {
+ image_name = vm[image_key].as<std::string>();
+ }
+
+ int r;
+ if (journal_name != nullptr && !journal_name->empty()) {
+ // despite the separate pool option,
+ // we can also specify them via the journal option
+ std::string journal_name_copy(*journal_name);
+ r = extract_spec(journal_name_copy, pool_name, namespace_name, journal_name,
+ nullptr, utils::SPEC_VALIDATION_FULL);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ if (!image_name.empty()) {
+ // despite the separate pool option,
+ // we can also specify them via the image option
+ std::string image_name_copy(image_name);
+ r = extract_spec(image_name_copy, pool_name, namespace_name, &image_name,
+ nullptr, utils::SPEC_VALIDATION_NONE);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ if (journal_name != nullptr && spec_arg_index != nullptr &&
+ journal_name->empty()) {
+ std::string spec = utils::get_positional_argument(vm, (*spec_arg_index)++);
+ if (!spec.empty()) {
+ r = extract_spec(spec, pool_name, namespace_name, journal_name, nullptr,
+ utils::SPEC_VALIDATION_FULL);
+ if (r < 0) {
+ return r;
+ }
+ }
+ }
+
+ if (pool_name != nullptr && pool_name->empty()) {
+ *pool_name = utils::get_default_pool_name();
+ }
+
+ if (pool_name != nullptr && namespace_name != nullptr &&
+ journal_name != nullptr && journal_name->empty() && !image_name.empty()) {
+ // Try to get journal name from image info.
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ librbd::Image image;
+ int r = utils::init_and_open_image(*pool_name, *namespace_name, image_name,
+ "", "", true, &rados, &io_ctx, &image);
+ if (r < 0) {
+ std::cerr << "rbd: failed to open image " << image_name
+ << " to get journal name: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+
+ uint64_t features;
+ r = image.features(&features);
+ if (r < 0) {
+ return r;
+ }
+ if ((features & RBD_FEATURE_JOURNALING) == 0) {
+ std::cerr << "rbd: journaling is not enabled for image " << image_name
+ << std::endl;
+ return -EINVAL;
+ }
+ *journal_name = utils::image_id(image);
+ }
+
+ if (journal_name != nullptr && journal_name->empty()) {
+ std::string prefix = at::get_description_prefix(mod);
+ std::cerr << "rbd: "
+ << (mod == at::ARGUMENT_MODIFIER_DEST ? prefix : std::string())
+ << "journal was not specified" << std::endl;
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+static int do_show_journal_info(librados::Rados& rados, librados::IoCtx& io_ctx,
+ const std::string& journal_id, Formatter *f)
+{
+ int r;
+ C_SaferCond cond;
+
+ std::string header_oid = ::journal::Journaler::header_oid(journal_id);
+ std::string object_oid_prefix = ::journal::Journaler::object_oid_prefix(
+ io_ctx.get_id(), journal_id);
+ uint8_t order;
+ uint8_t splay_width;
+ int64_t pool_id;
+
+ cls::journal::client::get_immutable_metadata(io_ctx, header_oid, &order,
+ &splay_width, &pool_id, &cond);
+ r = cond.wait();
+ if (r < 0) {
+ std::cerr << "failed to get journal metadata: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+
+ std::string object_pool_name;
+ if (pool_id >= 0) {
+ r = rados.pool_reverse_lookup(pool_id, &object_pool_name);
+ if (r < 0) {
+ std::cerr << "error looking up pool name for pool_id=" << pool_id << ": "
+ << cpp_strerror(r) << std::endl;
+ }
+ }
+
+ if (f) {
+ f->open_object_section("journal");
+ f->dump_string("journal_id", journal_id);
+ f->dump_string("header_oid", header_oid);
+ f->dump_string("object_oid_prefix", object_oid_prefix);
+ f->dump_int("order", order);
+ f->dump_int("splay_width", splay_width);
+ if (!object_pool_name.empty()) {
+ f->dump_string("object_pool", object_pool_name);
+ }
+ f->close_section();
+ f->flush(std::cout);
+ } else {
+ std::cout << "rbd journal '" << journal_id << "':" << std::endl;
+ std::cout << "\theader_oid: " << header_oid << std::endl;
+ std::cout << "\tobject_oid_prefix: " << object_oid_prefix << std::endl;
+ std::cout << "\torder: " << static_cast<int>(order) << " ("
+ << byte_u_t(1ull << order) << " objects)"<< std::endl;
+ std::cout << "\tsplay_width: " << static_cast<int>(splay_width) << std::endl;
+ if (!object_pool_name.empty()) {
+ std::cout << "\tobject_pool: " << object_pool_name << std::endl;
+ }
+ }
+ return 0;
+}
+
+static int do_show_journal_status(librados::IoCtx& io_ctx,
+ const std::string& journal_id, Formatter *f)
+{
+ int r;
+
+ C_SaferCond cond;
+ uint64_t minimum_set;
+ uint64_t active_set;
+ std::set<cls::journal::Client> registered_clients;
+ std::string oid = ::journal::Journaler::header_oid(journal_id);
+
+ cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
+ &active_set, &registered_clients,
+ &cond);
+ r = cond.wait();
+ if (r < 0) {
+ std::cerr << "warning: failed to get journal metadata" << std::endl;
+ return r;
+ }
+
+ if (f) {
+ f->open_object_section("status");
+ f->dump_unsigned("minimum_set", minimum_set);
+ f->dump_unsigned("active_set", active_set);
+ f->open_array_section("registered_clients");
+ for (std::set<cls::journal::Client>::iterator c =
+ registered_clients.begin(); c != registered_clients.end(); ++c) {
+ f->open_object_section("client");
+ c->dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ f->close_section();
+ f->flush(std::cout);
+ } else {
+ std::cout << "minimum_set: " << minimum_set << std::endl;
+ std::cout << "active_set: " << active_set << std::endl;
+ std::cout << "registered clients: " << std::endl;
+ for (std::set<cls::journal::Client>::iterator c =
+ registered_clients.begin(); c != registered_clients.end(); ++c) {
+ std::cout << "\t" << *c << std::endl;
+ }
+ }
+ return 0;
+}
+
+static int do_reset_journal(librados::IoCtx& io_ctx,
+ const std::string& journal_id)
+{
+ // disable/re-enable journaling to delete/re-create the journal
+ // to properly handle mirroring constraints
+ std::string image_name;
+ int r = librbd::cls_client::dir_get_name(&io_ctx, RBD_DIRECTORY, journal_id,
+ &image_name);
+ if (r < 0) {
+ std::cerr << "failed to locate journal's image: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+
+ librbd::Image image;
+ r = utils::open_image(io_ctx, image_name, false, &image);
+ if (r < 0) {
+ std::cerr << "failed to open image: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+
+ r = image.update_features(RBD_FEATURE_JOURNALING, false);
+ if (r < 0) {
+ std::cerr << "failed to disable image journaling: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+
+ r = image.update_features(RBD_FEATURE_JOURNALING, true);
+ if (r < 0) {
+ std::cerr << "failed to re-enable image journaling: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+static int do_disconnect_journal_client(librados::IoCtx& io_ctx,
+ const std::string& journal_id,
+ const std::string& client_id)
+{
+ int r;
+
+ C_SaferCond cond;
+ uint64_t minimum_set;
+ uint64_t active_set;
+ std::set<cls::journal::Client> registered_clients;
+ std::string oid = ::journal::Journaler::header_oid(journal_id);
+
+ cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
+ &active_set, &registered_clients,
+ &cond);
+ r = cond.wait();
+ if (r < 0) {
+ std::cerr << "warning: failed to get journal metadata" << std::endl;
+ return r;
+ }
+
+ static const std::string IMAGE_CLIENT_ID("");
+
+ bool found = false;
+ for (auto &c : registered_clients) {
+ if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) {
+ continue;
+ }
+ r = cls::journal::client::client_update_state(io_ctx, oid, c.id,
+ cls::journal::CLIENT_STATE_DISCONNECTED);
+ if (r < 0) {
+ std::cerr << "warning: failed to disconnect client " << c.id << ": "
+ << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ std::cout << "client " << c.id << " disconnected" << std::endl;
+ found = true;
+ }
+
+ if (!found) {
+ if (!client_id.empty()) {
+ std::cerr << "warning: client " << client_id << " is not registered"
+ << std::endl;
+ } else {
+ std::cerr << "no registered clients to disconnect" << std::endl;
+ }
+ return -ENOENT;
+ }
+
+ bufferlist bl;
+ r = io_ctx.notify2(oid, bl, 5000, NULL);
+ if (r < 0) {
+ std::cerr << "warning: failed to notify state change:" << ": "
+ << cpp_strerror(r) << std::endl;
+ return r;
+ }
+
+ return 0;
+}
+
+class Journaler : public ::journal::Journaler {
+public:
+ Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
+ const std::string &client_id) :
+ ::journal::Journaler(io_ctx, journal_id, client_id, {}, nullptr) {
+ }
+
+ int init() {
+ int r;
+
+ // TODO register with librbd payload
+ r = register_client(bufferlist());
+ if (r < 0) {
+ std::cerr << "failed to register client: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+
+ C_SaferCond cond;
+
+ ::journal::Journaler::init(&cond);
+ r = cond.wait();
+ if (r < 0) {
+ std::cerr << "failed to initialize journal: " << cpp_strerror(r)
+ << std::endl;
+ (void) unregister_client();
+ return r;
+ }
+
+ return 0;
+ }
+
+ int shut_down() {
+ int r = unregister_client();
+ if (r < 0) {
+ std::cerr << "rbd: failed to unregister journal client: "
+ << cpp_strerror(r) << std::endl;
+ }
+ ::journal::Journaler::shut_down();
+
+ return r;
+ }
+};
+
+class JournalPlayer {
+public:
+ JournalPlayer(librados::IoCtx& io_ctx, const std::string& journal_id,
+ const std::string &client_id) :
+ m_journaler(io_ctx, journal_id, client_id),
+ m_cond(),
+ m_r(0) {
+ }
+
+ virtual ~JournalPlayer() {}
+
+ virtual int exec() {
+ int r;
+
+ r = m_journaler.init();
+ if (r < 0) {
+ return r;
+ }
+
+ ReplayHandler replay_handler(this);
+
+ m_journaler.start_replay(&replay_handler);
+
+ r = m_cond.wait();
+ if (r < 0) {
+ std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
+ << std::endl;
+ if (m_r == 0) {
+ m_r = r;
+ }
+ }
+ return m_r;
+ }
+
+ int shut_down() {
+ return m_journaler.shut_down();
+ }
+
+protected:
+ struct ReplayHandler : public ::journal::ReplayHandler {
+ JournalPlayer *journal;
+ explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {}
+
+ void handle_entries_available() override {
+ journal->handle_replay_ready();
+ }
+ void handle_complete(int r) override {
+ journal->handle_replay_complete(r);
+ }
+ };
+
+ void handle_replay_ready() {
+ int r = 0;
+ while (true) {
+ ::journal::ReplayEntry replay_entry;
+ uint64_t tag_id;
+ if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) {
+ break;
+ }
+
+ r = process_entry(replay_entry, tag_id);
+ if (r < 0) {
+ break;
+ }
+ }
+ }
+
+ virtual int process_entry(::journal::ReplayEntry replay_entry,
+ uint64_t tag_id) = 0;
+
+ void handle_replay_complete(int r) {
+ if (m_r == 0 && r < 0) {
+ m_r = r;
+ }
+ m_journaler.stop_replay(&m_cond);
+ }
+
+ Journaler m_journaler;
+ C_SaferCond m_cond;
+ int m_r;
+};
+
+static int inspect_entry(bufferlist& data,
+ librbd::journal::EventEntry& event_entry,
+ bool verbose) {
+ try {
+ auto it = data.cbegin();
+ decode(event_entry, it);
+ } catch (const buffer::error &err) {
+ std::cerr << "failed to decode event entry: " << err.what() << std::endl;
+ return -EINVAL;
+ }
+ if (verbose) {
+ JSONFormatter f(true);
+ f.open_object_section("event_entry");
+ event_entry.dump(&f);
+ f.close_section();
+ f.flush(std::cout);
+ }
+ return 0;
+}
+
+class JournalInspector : public JournalPlayer {
+public:
+ JournalInspector(librados::IoCtx& io_ctx, const std::string& journal_id,
+ bool verbose) :
+ JournalPlayer(io_ctx, journal_id, "INSPECT"),
+ m_verbose(verbose),
+ m_s() {
+ }
+
+ int exec() override {
+ int r = JournalPlayer::exec();
+ m_s.print();
+ return r;
+ }
+
+private:
+ struct Stats {
+ Stats() : total(0), error(0) {}
+
+ void print() {
+ std::cout << "Summary:" << std::endl
+ << " " << total << " entries inspected, " << error << " errors"
+ << std::endl;
+ }
+
+ int total;
+ int error;
+ };
+
+ int process_entry(::journal::ReplayEntry replay_entry,
+ uint64_t tag_id) override {
+ m_s.total++;
+ if (m_verbose) {
+ std::cout << "Entry: tag_id=" << tag_id << ", commit_tid="
+ << replay_entry.get_commit_tid() << std::endl;
+ }
+ bufferlist data = replay_entry.get_data();
+ librbd::journal::EventEntry event_entry;
+ int r = inspect_entry(data, event_entry, m_verbose);
+ if (r < 0) {
+ m_r = r;
+ m_s.error++;
+ }
+ return 0;
+ }
+
+ bool m_verbose;
+ Stats m_s;
+};
+
+static int do_inspect_journal(librados::IoCtx& io_ctx,
+ const std::string& journal_id,
+ bool verbose) {
+ JournalInspector inspector(io_ctx, journal_id, verbose);
+ int r = inspector.exec();
+ if (r < 0) {
+ inspector.shut_down();
+ return r;
+ }
+
+ r = inspector.shut_down();
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+struct ExportEntry {
+ uint64_t tag_id;
+ uint64_t commit_tid;
+ int type;
+ bufferlist entry;
+
+ ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {}
+
+ ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type,
+ const bufferlist& entry)
+ : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) {
+ }
+
+ void dump(Formatter *f) const {
+ ::encode_json("tag_id", tag_id, f);
+ ::encode_json("commit_tid", commit_tid, f);
+ ::encode_json("type", type, f);
+ ::encode_json("entry", entry, f);
+ }
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("tag_id", tag_id, obj);
+ JSONDecoder::decode_json("commit_tid", commit_tid, obj);
+ JSONDecoder::decode_json("type", type, obj);
+ JSONDecoder::decode_json("entry", entry, obj);
+ }
+};
+
+class JournalExporter : public JournalPlayer {
+public:
+ JournalExporter(librados::IoCtx& io_ctx, const std::string& journal_id,
+ int fd, bool no_error, bool verbose) :
+ JournalPlayer(io_ctx, journal_id, "EXPORT"),
+ m_journal_id(journal_id),
+ m_fd(fd),
+ m_no_error(no_error),
+ m_verbose(verbose),
+ m_s() {
+ }
+
+ int exec() override {
+ std::string header("# journal_id: " + m_journal_id + "\n");
+ int r;
+ r = safe_write(m_fd, header.c_str(), header.size());
+ if (r < 0) {
+ std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+ r = JournalPlayer::exec();
+ m_s.print();
+ return r;
+ }
+
+private:
+ struct Stats {
+ Stats() : total(0), error(0) {}
+
+ void print() {
+ std::cout << total << " entries processed, " << error << " errors"
+ << std::endl;
+ }
+
+ int total;
+ int error;
+ };
+
+ int process_entry(::journal::ReplayEntry replay_entry,
+ uint64_t tag_id) override {
+ m_s.total++;
+ int type = -1;
+ bufferlist entry = replay_entry.get_data();
+ librbd::journal::EventEntry event_entry;
+ int r = inspect_entry(entry, event_entry, m_verbose);
+ if (r < 0) {
+ m_s.error++;
+ m_r = r;
+ return m_no_error ? 0 : r;
+ } else {
+ type = event_entry.get_event_type();
+ }
+ ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type,
+ entry);
+ JSONFormatter f;
+ ::encode_json("event_entry", export_entry, &f);
+ std::ostringstream oss;
+ f.flush(oss);
+ std::string objstr = oss.str();
+ std::string header = stringify(objstr.size()) + " ";
+ r = safe_write(m_fd, header.c_str(), header.size());
+ if (r == 0) {
+ r = safe_write(m_fd, objstr.c_str(), objstr.size());
+ }
+ if (r == 0) {
+ r = safe_write(m_fd, "\n", 1);
+ }
+ if (r < 0) {
+ std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
+ << std::endl;
+ m_s.error++;
+ return r;
+ }
+ return 0;
+ }
+
+ std::string m_journal_id;
+ int m_fd;
+ bool m_no_error;
+ bool m_verbose;
+ Stats m_s;
+};
+
+static int do_export_journal(librados::IoCtx& io_ctx,
+ const std::string& journal_id,
+ const std::string& path,
+ bool no_error, bool verbose) {
+ int r;
+ int fd;
+ bool to_stdout = path == "-";
+ if (to_stdout) {
+ fd = STDOUT_FILENO;
+ } else {
+ fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0644);
+ if (fd < 0) {
+ r = -errno;
+ std::cerr << "rbd: error creating " << path << std::endl;
+ return r;
+ }
+#ifdef HAVE_POSIX_FADVISE
+ posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+ }
+
+ JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
+ r = exporter.exec();
+
+ if (!to_stdout) {
+ close(fd);
+ }
+
+ int shut_down_r = exporter.shut_down();
+ if (r == 0 && shut_down_r < 0) {
+ r = shut_down_r;
+ }
+
+ return r;
+}
+
+class JournalImporter {
+public:
+ JournalImporter(librados::IoCtx& io_ctx, const std::string& journal_id,
+ int fd, bool no_error, bool verbose) :
+ m_journaler(io_ctx, journal_id, "IMPORT"),
+ m_fd(fd),
+ m_no_error(no_error),
+ m_verbose(verbose) {
+ }
+
+ bool read_entry(bufferlist& bl, int& r) {
+ // Entries are stored in the file using the following format:
+ //
+ // # Optional comments
+ // NNN {json encoded entry}
+ // ...
+ //
+ // Where NNN is the encoded entry size.
+ bl.clear();
+ char buf[80];
+ // Skip line feed and comments (lines started with #).
+ while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
+ if (buf[0] == '\n') {
+ continue;
+ } else if (buf[0] == '#') {
+ while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
+ if (buf[0] == '\n') {
+ break;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ if (r < 0) {
+ if (r == -EDOM) {
+ r = 0;
+ }
+ return false;
+ }
+ // Read entry size to buf.
+ if (!isdigit(buf[0])) {
+ r = -EINVAL;
+ std::cerr << "rbd: import data invalid format (digit expected)"
+ << std::endl;
+ return false;
+ }
+ for (size_t i = 1; i < sizeof(buf); i++) {
+ r = safe_read_exact(m_fd, buf + i, 1);
+ if (r < 0) {
+ std::cerr << "rbd: error reading import data" << std::endl;
+ return false;
+ }
+ if (!isdigit(buf[i])) {
+ if (buf[i] != ' ') {
+ r = -EINVAL;
+ std::cerr << "rbd: import data invalid format (space expected)"
+ << std::endl;
+ return false;
+ }
+ buf[i] = '\0';
+ break;
+ }
+ }
+ int entry_size = atoi(buf);
+ if (entry_size == 0) {
+ r = -EINVAL;
+ std::cerr << "rbd: import data invalid format (zero entry size)"
+ << std::endl;
+ return false;
+ }
+ ceph_assert(entry_size > 0);
+ // Read entry.
+ r = bl.read_fd(m_fd, entry_size);
+ if (r < 0) {
+ std::cerr << "rbd: error reading from stdin: " << cpp_strerror(r)
+ << std::endl;
+ return false;
+ }
+ if (r != entry_size) {
+ std::cerr << "rbd: error reading from stdin: truncated"
+ << std::endl;
+ r = -EINVAL;
+ return false;
+ }
+ r = 0;
+ return true;
+ }
+
+ int exec() {
+ int r = m_journaler.init();
+ if (r < 0) {
+ return r;
+ }
+ m_journaler.start_append(0);
+
+ int r1 = 0;
+ bufferlist bl;
+ int n = 0;
+ int error_count = 0;
+ while (read_entry(bl, r)) {
+ n++;
+ error_count++;
+ JSONParser p;
+ if (!p.parse(bl.c_str(), bl.length())) {
+ std::cerr << "rbd: error parsing input (entry " << n << ")"
+ << std::endl;
+ r = -EINVAL;
+ if (m_no_error) {
+ r1 = r;
+ continue;
+ } else {
+ break;
+ }
+ }
+ ExportEntry e;
+ try {
+ decode_json_obj(e, &p);
+ } catch (const JSONDecoder::err& err) {
+ std::cerr << "rbd: error json decoding import data (entry " << n << "):"
+ << err.what() << std::endl;
+ r = -EINVAL;
+ if (m_no_error) {
+ r1 = r;
+ continue;
+ } else {
+ break;
+ }
+ }
+ librbd::journal::EventEntry event_entry;
+ r = inspect_entry(e.entry, event_entry, m_verbose);
+ if (r < 0) {
+ std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id
+ << ", commit_tid=" << e.commit_tid << std::endl;
+ if (m_no_error) {
+ r1 = r;
+ continue;
+ } else {
+ break;
+ }
+ }
+ m_journaler.append(e.tag_id, e.entry);
+ error_count--;
+ }
+
+ std::cout << n << " entries processed, " << error_count << " errors" << std::endl;
+
+ std::cout << "Waiting for journal append to complete..." << std::endl;
+
+ C_SaferCond cond;
+ m_journaler.stop_append(&cond);
+ r = cond.wait();
+
+ if (r < 0) {
+ std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
+ }
+
+ if (r1 < 0 && r == 0) {
+ r = r1;
+ }
+ return r;
+ }
+
+ int shut_down() {
+ return m_journaler.shut_down();
+ }
+
+private:
+ Journaler m_journaler;
+ int m_fd;
+ bool m_no_error;
+ bool m_verbose;
+};
+
+static int do_import_journal(librados::IoCtx& io_ctx,
+ const std::string& journal_id,
+ const std::string& path,
+ bool no_error, bool verbose) {
+ int r;
+
+ int fd;
+ bool from_stdin = path == "-";
+ if (from_stdin) {
+ fd = STDIN_FILENO;
+ } else {
+ if ((fd = open(path.c_str(), O_RDONLY|O_BINARY)) < 0) {
+ r = -errno;
+ std::cerr << "rbd: error opening " << path << std::endl;
+ return r;
+ }
+#ifdef HAVE_POSIX_FADVISE
+ posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+ }
+
+ JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
+ r = importer.exec();
+
+ if (!from_stdin) {
+ close(fd);
+ }
+
+ int shut_down_r = importer.shut_down();
+ if (r == 0 && shut_down_r < 0) {
+ r = shut_down_r;
+ }
+
+ return r;
+}
+
+void get_info_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
+ at::add_format_options(options);
+}
+
+int execute_info(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ at::Format::Formatter formatter;
+ r = utils::get_formatter(vm, &formatter);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_show_journal_info(rados, io_ctx, journal_name, formatter.get());
+ if (r < 0) {
+ std::cerr << "rbd: journal info: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+
+}
+
+void get_status_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
+ at::add_format_options(options);
+}
+
+int execute_status(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ at::Format::Formatter formatter;
+ r = utils::get_formatter(vm, &formatter);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_show_journal_status(io_ctx, journal_name, formatter.get());
+ if (r < 0) {
+ std::cerr << "rbd: journal status: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+void get_reset_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
+}
+
+int execute_reset(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_reset_journal(io_ctx, journal_name);
+ if (r < 0) {
+ std::cerr << "rbd: journal reset: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+void get_client_disconnect_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
+ options->add_options()
+ ("client-id", po::value<std::string>(),
+ "client ID (or leave unspecified to disconnect all)");
+}
+
+int execute_client_disconnect(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ std::string client_id;
+ if (vm.count("client-id")) {
+ client_id = vm["client-id"].as<std::string>();
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_disconnect_journal_client(io_ctx, journal_name, client_id);
+ if (r < 0) {
+ std::cerr << "rbd: journal client disconnect: " << cpp_strerror(r)
+ << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+void get_inspect_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
+ at::add_verbose_option(options);
+}
+
+int execute_inspect(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_inspect_journal(io_ctx, journal_name, vm[at::VERBOSE].as<bool>());
+ if (r < 0) {
+ std::cerr << "rbd: journal inspect: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+void get_export_arguments(po::options_description *positional,
+ po::options_description *options) {
+ add_journal_spec_options(positional, options,
+ at::ARGUMENT_MODIFIER_SOURCE);
+ at::add_path_options(positional, options,
+ "export file (or '-' for stdout)");
+ at::add_verbose_option(options);
+ at::add_no_error_option(options);
+}
+
+int execute_export(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ size_t arg_index = 0;
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_SOURCE, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ std::string path;
+ r = utils::get_path(vm, &arg_index, &path);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_export_journal(io_ctx, journal_name, path, vm[at::NO_ERR].as<bool>(),
+ vm[at::VERBOSE].as<bool>());
+ if (r < 0) {
+ std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+void get_import_arguments(po::options_description *positional,
+ po::options_description *options) {
+ at::add_path_options(positional, options,
+ "import file (or '-' for stdin)");
+ add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_DEST);
+ at::add_verbose_option(options);
+ at::add_no_error_option(options);
+}
+
+int execute_import(const po::variables_map &vm,
+ const std::vector<std::string> &ceph_global_init_args) {
+ std::string path;
+ size_t arg_index = 0;
+ int r = utils::get_path(vm, &arg_index, &path);
+ if (r < 0) {
+ return r;
+ }
+
+ std::string pool_name;
+ std::string namespace_name;
+ std::string journal_name;
+ r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_DEST, &arg_index,
+ &pool_name, &namespace_name, &journal_name);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::Rados rados;
+ librados::IoCtx io_ctx;
+ r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
+ if (r < 0) {
+ return r;
+ }
+
+ r = do_import_journal(io_ctx, journal_name, path, vm[at::NO_ERR].as<bool>(),
+ vm[at::VERBOSE].as<bool>());
+ if (r < 0) {
+ std::cerr << "rbd: journal import: " << cpp_strerror(r) << std::endl;
+ return r;
+ }
+ return 0;
+}
+
+Shell::Action action_info(
+ {"journal", "info"}, {}, "Show information about image journal.", "",
+ &get_info_arguments, &execute_info);
+
+Shell::Action action_status(
+ {"journal", "status"}, {}, "Show status of image journal.", "",
+ &get_status_arguments, &execute_status);
+
+Shell::Action action_reset(
+ {"journal", "reset"}, {}, "Reset image journal.", "",
+ &get_reset_arguments, &execute_reset);
+
+Shell::Action action_inspect(
+ {"journal", "inspect"}, {}, "Inspect image journal for structural errors.", "",
+ &get_inspect_arguments, &execute_inspect);
+
+Shell::Action action_export(
+ {"journal", "export"}, {}, "Export image journal.", "",
+ &get_export_arguments, &execute_export);
+
+Shell::Action action_import(
+ {"journal", "import"}, {}, "Import image journal.", "",
+ &get_import_arguments, &execute_import);
+
+Shell::Action action_disconnect(
+ {"journal", "client", "disconnect"}, {},
+ "Flag image journal client as disconnected.", "",
+ &get_client_disconnect_arguments, &execute_client_disconnect);
+
+} // namespace journal
+} // namespace action
+} // namespace rbd