summaryrefslogtreecommitdiffstats
path: root/src/librbd/migration
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/librbd/migration
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 16.2.11+ds. (refs: upstream/16.2.11+ds, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/librbd/migration/FileStream.cc232
-rw-r--r--src/librbd/migration/FileStream.h68
-rw-r--r--src/librbd/migration/FormatInterface.h53
-rw-r--r--src/librbd/migration/HttpClient.cc946
-rw-r--r--src/librbd/migration/HttpClient.h205
-rw-r--r--src/librbd/migration/HttpProcessorInterface.h27
-rw-r--r--src/librbd/migration/HttpStream.cc83
-rw-r--r--src/librbd/migration/HttpStream.h68
-rw-r--r--src/librbd/migration/ImageDispatch.cc157
-rw-r--r--src/librbd/migration/ImageDispatch.h102
-rw-r--r--src/librbd/migration/NativeFormat.cc309
-rw-r--r--src/librbd/migration/NativeFormat.h82
-rw-r--r--src/librbd/migration/OpenSourceImageRequest.cc249
-rw-r--r--src/librbd/migration/OpenSourceImageRequest.h103
-rw-r--r--src/librbd/migration/QCOW.h466
-rw-r--r--src/librbd/migration/QCOWFormat.cc1542
-rw-r--r--src/librbd/migration/QCOWFormat.h211
-rw-r--r--src/librbd/migration/RawFormat.cc235
-rw-r--r--src/librbd/migration/RawFormat.h78
-rw-r--r--src/librbd/migration/RawSnapshot.cc220
-rw-r--r--src/librbd/migration/RawSnapshot.h75
-rw-r--r--src/librbd/migration/S3Stream.cc202
-rw-r--r--src/librbd/migration/S3Stream.h78
-rw-r--r--src/librbd/migration/SnapshotInterface.h48
-rw-r--r--src/librbd/migration/SourceSpecBuilder.cc147
-rw-r--r--src/librbd/migration/SourceSpecBuilder.h54
-rw-r--r--src/librbd/migration/StreamInterface.h32
-rw-r--r--src/librbd/migration/Types.h42
-rw-r--r--src/librbd/migration/Utils.cc133
-rw-r--r--src/librbd/migration/Utils.h30
30 files changed, 6277 insertions, 0 deletions
diff --git a/src/librbd/migration/FileStream.cc b/src/librbd/migration/FileStream.cc
new file mode 100644
index 000000000..63cd722dd
--- /dev/null
+++ b/src/librbd/migration/FileStream.cc
@@ -0,0 +1,232 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef _LARGEFILE64_SOURCE
+#define _LARGEFILE64_SOURCE
+#endif // _LARGEFILE64_SOURCE
+
+#include "librbd/migration/FileStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/Utils.h"
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/read.hpp>
+#include <fcntl.h>
+#include <unistd.h>
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+const std::string FILE_PATH {"file_path"};
+
+} // anonymous namespace
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream::ReadRequest " \
+ << this << " " << __func__ << ": "
+
+/**
+ * Self-deleting helper that reads a list of byte extents from the file, one
+ * extent per strand dispatch, appending each extent's bytes to the output
+ * bufferlist.
+ *
+ * FileStream::read() posts send() to the FileStream strand; every step
+ * repositions the file offset via lseek64() before reading. on_finish is
+ * completed exactly once from finish(), which also deletes the request.
+ */
+template <typename I>
+struct FileStream<I>::ReadRequest {
+  FileStream* file_stream;
+  io::Extents byte_extents;
+  bufferlist* data;
+  Context* on_finish;
+  size_t index = 0;  // next entry of byte_extents to read
+
+  ReadRequest(FileStream* file_stream, io::Extents&& byte_extents,
+              bufferlist* data, Context* on_finish)
+    : file_stream(file_stream), byte_extents(std::move(byte_extents)),
+      data(data), on_finish(on_finish) {
+    auto cct = file_stream->m_cct;
+    ldout(cct, 20) << dendl;
+  }
+
+  // Discards any stale output and starts reading the first extent.
+  void send() {
+    data->clear();
+    read();
+  }
+
+  // Reads the extent at `index`, then re-posts itself to the strand for the
+  // next one. Finishes with 0 once all extents were read, -errno on a seek or
+  // read failure and -ERANGE if the file ended before the extent was filled.
+  void read() {
+    auto cct = file_stream->m_cct;
+    if (index >= byte_extents.size()) {
+      finish(0);
+      return;
+    }
+
+    auto& byte_extent = byte_extents[index++];
+    ldout(cct, 20) << "byte_extent=" << byte_extent << dendl;
+
+    // the bufferlist takes ownership of the buffer; asio fills it in place
+    auto ptr = buffer::ptr_node::create(buffer::create_small_page_aligned(
+      byte_extent.second));
+    auto buffer = boost::asio::mutable_buffer(
+      ptr->c_str(), byte_extent.second);
+    data->push_back(std::move(ptr));
+
+    auto offset = lseek64(file_stream->m_file_no, byte_extent.first, SEEK_SET);
+    if (offset == -1) {
+      int r = -errno;
+      lderr(cct) << "failed to seek file stream: " << cpp_strerror(r) << dendl;
+      finish(r);
+      return;
+    }
+
+    boost::system::error_code ec;
+    size_t bytes_read = boost::asio::read(
+      *file_stream->m_stream_descriptor, std::move(buffer), ec);
+
+    // End-of-file is reported through the asio "misc" error category. Test
+    // for it explicitly instead of relying on its numeric value matching
+    // ENOENT; a premature EOF is then caught by the short-read check below.
+    if (ec && ec != boost::asio::error::eof) {
+      int r = -ec.value();
+      lderr(cct) << "failed to read from file stream: " << cpp_strerror(r)
+                 << dendl;
+      finish(r);
+      return;
+    } else if (bytes_read < byte_extent.second) {
+      lderr(cct) << "failed to read " << byte_extent.second << " bytes from "
+                 << "file stream" << dendl;
+      finish(-ERANGE);
+      return;
+    }
+
+    // re-queue the remainder of the read requests
+    boost::asio::post(file_stream->m_strand, [this]() { read(); });
+  }
+
+  // Completes the caller's context (dropping partial data on error) and
+  // destroys this request.
+  void finish(int r) {
+    auto cct = file_stream->m_cct;
+    ldout(cct, 20) << "r=" << r << dendl;
+
+    if (r < 0) {
+      data->clear();
+    }
+
+    on_finish->complete(r);
+    delete this;
+  }
+};
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::FileStream: " << this \
+ << " " << __func__ << ": "
+
+// Captures the CephContext and asio engine from the image context, keeps a
+// copy of the JSON source-spec object and creates a strand on the engine.
+// No file is opened here.
+template <typename I>
+FileStream<I>::FileStream(I* image_ctx, const json_spirit::mObject& json_object)
+ : m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine),
+ m_json_object(json_object),
+ m_strand(boost::asio::make_strand(*m_asio_engine)) {
+}
+
+template <typename I>
+FileStream<I>::~FileStream() {
+ if (m_file_no != -1) {
+ ::close(m_file_no);
+ }
+}
+
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+// Opens the file named by the "file_path" key of the JSON source spec in
+// read-only mode and wraps the descriptor in an asio stream descriptor bound
+// to the strand. Completes on_finish with -EINVAL if the key is not a string
+// value, with -errno if ::open() fails, and with 0 on success.
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+ auto& file_path_value = m_json_object[FILE_PATH];
+ if (file_path_value.type() != json_spirit::str_type) {
+ lderr(m_cct) << "failed to locate '" << FILE_PATH << "' key" << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+
+ auto& file_path = file_path_value.get_str();
+ ldout(m_cct, 10) << "file_path=" << file_path << dendl;
+
+ m_file_no = ::open(file_path.c_str(), O_RDONLY);
+ if (m_file_no < 0) {
+ // capture errno before any logging can overwrite it
+ int r = -errno;
+ lderr(m_cct) << "failed to open file stream '" << file_path << "': "
+ << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ // the stream descriptor wraps the same fd that m_file_no refers to
+ m_stream_descriptor = std::make_optional<
+ boost::asio::posix::stream_descriptor>(m_strand, m_file_no);
+ on_finish->complete(0);
+}
+
+// Closes the stream and completes on_finish with 0.
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  if (m_stream_descriptor) {
+    // destroying the stream descriptor closes the fd it wraps; forget the fd
+    // number so that the destructor does not close it a second time (by then
+    // the number may already have been reused for an unrelated descriptor)
+    m_stream_descriptor.reset();
+    m_file_no = -1;
+  }
+  on_finish->complete(0);
+}
+
+// Reports the file size (the offset of its end) through *size. The seek is
+// performed on the strand; on_finish receives 0 on success or -errno when
+// the seek fails.
+template <typename I>
+void FileStream<I>::get_size(uint64_t* size, Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  auto query_size = [this, size, on_finish]() {
+    auto end_offset = lseek64(m_file_no, 0, SEEK_END);
+    if (end_offset != -1) {
+      ldout(m_cct, 10) << "size=" << end_offset << dendl;
+      *size = end_offset;
+      on_finish->complete(0);
+      return;
+    }
+
+    int r = -errno;
+    lderr(m_cct) << "failed to seek to file end: " << cpp_strerror(r)
+                 << dendl;
+    on_finish->complete(r);
+  };
+
+  // execute IO operations in a single strand to prevent seek races
+  boost::asio::post(m_strand, std::move(query_size));
+}
+
+// Queues a read of the given byte extents into *data. The work is carried
+// out by a heap-allocated ReadRequest that runs on the strand.
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+                         Context* on_finish) {
+  ldout(m_cct, 20) << byte_extents << dendl;
+
+  auto* request = new ReadRequest(this, std::move(byte_extents), data,
+                                  on_finish);
+
+  // execute IO operations in a single strand to prevent seek races
+  boost::asio::post(m_strand, [request]() { request->send(); });
+}
+
+#else // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+// Fallback when asio POSIX stream descriptors are unavailable: always fails
+// with -EIO.
+template <typename I>
+void FileStream<I>::open(Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+// Fallback when asio POSIX stream descriptors are unavailable: always fails
+// with -EIO.
+template <typename I>
+void FileStream<I>::close(Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+// Fallback when asio POSIX stream descriptors are unavailable: *size is left
+// untouched and on_finish always receives -EIO.
+template <typename I>
+void FileStream<I>::get_size(uint64_t* size, Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+// Fallback when asio POSIX stream descriptors are unavailable: *data is left
+// untouched and on_finish always receives -EIO.
+template <typename I>
+void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ on_finish->complete(-EIO);
+}
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::FileStream<librbd::ImageCtx>;
diff --git a/src/librbd/migration/FileStream.h b/src/librbd/migration/FileStream.h
new file mode 100644
index 000000000..32face71e
--- /dev/null
+++ b/src/librbd/migration/FileStream.h
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/posix/basic_stream_descriptor.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+// StreamInterface implementation that serves reads from a local file whose
+// path is supplied by the JSON source spec. Non-copyable.
+template <typename ImageCtxT>
+class FileStream : public StreamInterface {
+public:
+ // Heap-allocates a new stream; the caller owns the returned pointer.
+ static FileStream* create(ImageCtxT* image_ctx,
+ const json_spirit::mObject& json_object) {
+ return new FileStream(image_ctx, json_object);
+ }
+
+ FileStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+ ~FileStream() override;
+
+ FileStream(const FileStream&) = delete;
+ FileStream& operator=(const FileStream&) = delete;
+
+ // All operations report their result through on_finish (0 or -errno).
+ void open(Context* on_finish) override;
+ void close(Context* on_finish) override;
+
+ void get_size(uint64_t* size, Context* on_finish) override;
+
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) override;
+
+private:
+ CephContext* m_cct;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ json_spirit::mObject m_json_object;
+
+ // serializes seek + read sequences on the shared file descriptor
+ boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
+#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+ // NOTE(review): std::optional is used here but <optional> is not among this
+ // header's direct includes -- presumably pulled in transitively; confirm.
+ std::optional<boost::asio::posix::stream_descriptor> m_stream_descriptor;
+
+ struct ReadRequest;
+
+#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR
+
+ // raw descriptor returned by ::open(); -1 while no file is open
+ int m_file_no = -1;
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::FileStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_FILE_STREAM_H
diff --git a/src/librbd/migration/FormatInterface.h b/src/librbd/migration/FormatInterface.h
new file mode 100644
index 000000000..d13521d11
--- /dev/null
+++ b/src/librbd/migration/FormatInterface.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/Types.h"
+#include "librbd/io/Types.h"
+#include <map>
+
+struct Context;
+
+namespace librbd {
+
+namespace io {
+struct AioCompletion;
+struct ReadResult;
+} // namespace io
+
+namespace migration {
+
+// Abstract interface implemented by every migration source image format.
+// All asynchronous operations report their result through on_finish.
+struct FormatInterface {
+  // snapshot id -> snapshot description
+  using SnapInfos = std::map<uint64_t, SnapInfo>;
+
+  virtual ~FormatInterface() = default;
+
+  // lifecycle
+  virtual void open(Context* on_finish) = 0;
+  virtual void close(Context* on_finish) = 0;
+
+  // metadata queries
+  virtual void get_snapshots(SnapInfos* snap_infos, Context* on_finish) = 0;
+  virtual void get_image_size(uint64_t snap_id, uint64_t* size,
+                              Context* on_finish) = 0;
+
+  // data access
+  virtual bool read(io::AioCompletion* aio_comp, uint64_t snap_id,
+                    io::Extents&& image_extents, io::ReadResult&& read_result,
+                    int op_flags, int read_flags,
+                    const ZTracer::Trace &parent_trace) = 0;
+
+  virtual void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids,
+                          int list_snaps_flags,
+                          io::SnapshotDelta* snapshot_delta,
+                          const ZTracer::Trace &parent_trace,
+                          Context* on_finish) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H
diff --git a/src/librbd/migration/HttpClient.cc b/src/librbd/migration/HttpClient.cc
new file mode 100644
index 000000000..679c2bb07
--- /dev/null
+++ b/src/librbd/migration/HttpClient.cc
@@ -0,0 +1,946 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/HttpClient.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/Utils.h"
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/post.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/read.hpp>
+#include <boost/asio/ssl.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http/read.hpp>
+#include <boost/lexical_cast.hpp>
+#include <deque>
+
+namespace librbd {
+namespace migration {
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
+ << "HttpSession " << this << " " << __func__ \
+ << ": "
+
+/**
+ * boost::beast utilizes non-inheriting template classes for handling plain vs
+ * encrypted TCP streams. Utilize a base class to hold the majority of the
+ * logic for connecting, disconnecting, resetting, and sending requests.
+ */
+
+template <typename I>
+template <typename D>
+class HttpClient<I>::HttpSession : public HttpSessionInterface {
+public:
+ // Starts the initial connection attempt: moves UNINITIALIZED -> CONNECTING
+ // and resolves the host. Must run on the client's strand. on_finish is
+ // handed down the resolve/connect chain.
+ void init(Context* on_finish) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ ceph_assert(m_state == STATE_UNINITIALIZED);
+ m_state = STATE_CONNECTING;
+
+ resolve_host(on_finish);
+ }
+
+ // Requests shutdown of the session. Must run on the client's strand and may
+ // be requested only once (m_on_shutdown must still be unset).
+ void shut_down(Context* on_finish) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ ceph_assert(on_finish != nullptr);
+ ceph_assert(m_on_shutdown == nullptr);
+ m_on_shutdown = on_finish;
+
+ auto current_state = m_state;
+ if (current_state == STATE_UNINITIALIZED) {
+ // never initialized or resolve/connect failed
+ on_finish->complete(0);
+ return;
+ }
+
+ m_state = STATE_SHUTTING_DOWN;
+ if (current_state != STATE_READY) {
+ // delay shutdown until current state transition completes
+ // (advance_state() handles the SHUTTING_DOWN case when it does)
+ return;
+ }
+
+ disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+ }
+
+ // Queues a unit of work. It is sent immediately if the session is READY and
+ // nothing else is queued, triggers a reconnect if the session is
+ // UNINITIALIZED, and otherwise waits in the issue queue. Work submitted
+ // during or after shutdown is completed with -ESHUTDOWN.
+ void issue(std::shared_ptr<Work>&& work) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << dendl;
+
+ if (is_shutdown()) {
+ lderr(cct) << "cannot issue HTTP request, client is shutdown"
+ << dendl;
+ work->complete(-ESHUTDOWN, {});
+ return;
+ }
+
+ // sample emptiness before enqueueing: only the first entry is sent directly
+ bool first_issue = m_issue_queue.empty();
+ m_issue_queue.emplace_back(work);
+ if (m_state == STATE_READY && first_issue) {
+ ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+ finalize_issue(std::move(work));
+ } else if (m_state == STATE_UNINITIALIZED) {
+ ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl;
+ m_state = STATE_RESET_CONNECTING;
+ resolve_host(nullptr);
+ } else {
+ ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl;
+ }
+ }
+
+ // Invokes the work item against the derived session's stream and counts it
+ // as in-flight. The shared_ptr is only dereferenced here, not moved from.
+ void finalize_issue(std::shared_ptr<Work>&& work) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << dendl;
+
+ ++m_in_flight_requests;
+ (*work)(derived().stream());
+ }
+
+ // Completion handler for a request write started by finalize_issue().
+ // On success the work moves from the issue queue to the receive queue and
+ // the next queued request (if any) is pipelined. On failure the work is
+ // either re-queued for retry (peer closed the stream) or completed with an
+ // error, and the connection is reset.
+ void handle_issue(boost::system::error_code ec,
+ std::shared_ptr<Work>&& work) override {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+ ceph_assert(m_in_flight_requests > 0);
+ --m_in_flight_requests;
+ if (maybe_finalize_reset()) {
+ // previous request is attempting reset so this request will be resent
+ // (it is still at the front of the issue queue)
+ return;
+ }
+
+ ceph_assert(!m_issue_queue.empty());
+ m_issue_queue.pop_front();
+
+ if (is_shutdown()) {
+ lderr(cct) << "client shutdown during in-flight request" << dendl;
+ work->complete(-ESHUTDOWN, {});
+
+ maybe_finalize_shutdown();
+ return;
+ }
+
+ if (ec) {
+ if (ec == boost::asio::error::bad_descriptor ||
+ ec == boost::asio::error::broken_pipe ||
+ ec == boost::asio::error::connection_reset ||
+ ec == boost::asio::error::operation_aborted ||
+ ec == boost::asio::ssl::error::stream_truncated ||
+ ec == boost::beast::http::error::end_of_stream ||
+ ec == boost::beast::http::error::partial_message) {
+ ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+ m_issue_queue.push_front(work);
+ } else if (ec == boost::beast::error::timeout) {
+ lderr(cct) << "timed-out while issuing request" << dendl;
+ work->complete(-ETIMEDOUT, {});
+ } else {
+ lderr(cct) << "failed to issue request: " << ec.message() << dendl;
+ work->complete(-ec.value(), {});
+ }
+
+ // attempt to recover the connection
+ reset();
+ return;
+ }
+
+ // only one response read is outstanding at a time: start it if idle
+ bool first_receive = m_receive_queue.empty();
+ m_receive_queue.push_back(work);
+ if (first_receive) {
+ receive(std::move(work));
+ }
+
+ // TODO disable pipelining for non-idempotent requests
+
+ // pipeline the next request into the stream
+ if (!m_issue_queue.empty()) {
+ work = m_issue_queue.front();
+ ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
+ finalize_issue(std::move(work));
+ }
+ }
+
+protected:
+ HttpClient* m_http_client;
+
+ // Binds the session to its owning client; the resolver runs on the client's
+ // strand.
+ HttpSession(HttpClient* http_client)
+ : m_http_client(http_client), m_resolver(http_client->m_strand) {
+ }
+
+ virtual void connect(boost::asio::ip::tcp::resolver::results_type results,
+ Context* on_finish) = 0;
+ virtual void disconnect(Context* on_finish) = 0;
+
+ void close_socket() {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ boost::system::error_code ec;
+ boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
+ }
+
+private:
+ // Connection state machine; transitions are applied by advance_state() and
+ // by the entry points that set m_state directly.
+ enum State {
+ STATE_UNINITIALIZED, // initial state; also re-entered on resolve/connect failure
+ STATE_CONNECTING, // init() in progress
+ STATE_READY, // connected, requests can be sent
+ STATE_RESET_PENDING, // reset requested, waiting for in-flight requests
+ STATE_RESET_DISCONNECTING, // reset: disconnect in progress
+ STATE_RESET_CONNECTING, // reset: re-resolving / reconnecting
+ STATE_SHUTTING_DOWN, // shut_down() requested
+ STATE_SHUTDOWN, // shutdown finished
+ };
+
+ State m_state = STATE_UNINITIALIZED;
+ boost::asio::ip::tcp::resolver m_resolver;
+
+ Context* m_on_shutdown = nullptr;
+
+ uint64_t m_in_flight_requests = 0;
+ std::deque<std::shared_ptr<Work>> m_issue_queue;
+ std::deque<std::shared_ptr<Work>> m_receive_queue;
+
+ boost::beast::flat_buffer m_buffer;
+ std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser;
+ std::optional<boost::beast::http::parser<false, StringBody>> m_parser;
+
+ // Downcast to the concrete session type D, which provides stream().
+ D& derived() {
+ return static_cast<D&>(*this);
+ }
+
+ // Tears down any open socket and asynchronously resolves the URL's
+ // host/port. on_finish may be nullptr (reset path); it is forwarded to
+ // handle_resolve_host().
+ void resolve_host(Context* on_finish) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ shutdown_socket();
+ m_resolver.async_resolve(
+ m_http_client->m_url_spec.host, m_http_client->m_url_spec.port,
+ [this, on_finish](boost::system::error_code ec, auto results) {
+ handle_resolve_host(ec, results, on_finish); });
+ }
+
+ // Resolver completion: on failure returns the session to UNINITIALIZED
+ // (host_not_found maps to -ENOENT, host_not_found_try_again to -EAGAIN, any
+ // other error to the negated error value); on success connects to the
+ // resolved endpoints.
+ void handle_resolve_host(
+ boost::system::error_code ec,
+ boost::asio::ip::tcp::resolver::results_type results,
+ Context* on_finish) {
+ auto cct = m_http_client->m_cct;
+ int r = -ec.value();
+ ldout(cct, 15) << "r=" << r << dendl;
+
+ if (ec) {
+ if (ec == boost::asio::error::host_not_found) {
+ r = -ENOENT;
+ } else if (ec == boost::asio::error::host_not_found_try_again) {
+ // TODO: add retry throttle
+ r = -EAGAIN;
+ }
+
+ lderr(cct) << "failed to resolve host '"
+ << m_http_client->m_url_spec.host << "': "
+ << cpp_strerror(r) << dendl;
+ advance_state(STATE_UNINITIALIZED, r, on_finish);
+ return;
+ }
+
+ connect(results, new LambdaContext([this, on_finish](int r) {
+ handle_connect(r, on_finish); }));
+ }
+
+ // Connect completion: a successful connect advances the session to READY,
+ // a failure drops it back to UNINITIALIZED and propagates the error.
+ void handle_connect(int r, Context* on_finish) {
+   auto cct = m_http_client->m_cct;
+   ldout(cct, 15) << "r=" << r << dendl;
+
+   if (r >= 0) {
+     advance_state(STATE_READY, 0, on_finish);
+     return;
+   }
+
+   const auto& host = m_http_client->m_url_spec.host;
+   lderr(cct) << "failed to connect to host '" << host << "': "
+              << cpp_strerror(r) << dendl;
+   advance_state(STATE_UNINITIALIZED, r, on_finish);
+ }
+
+ // Disconnect completion on the shutdown path. A disconnect failure is only
+ // logged; the socket is torn down regardless and shutdown is finalized once
+ // no requests remain in flight.
+ void handle_shut_down(int r) {
+   auto cct = m_http_client->m_cct;
+   ldout(cct, 15) << "r=" << r << dendl;
+
+   if (r < 0) {
+     // (message previously carried a stray, unbalanced quote)
+     lderr(cct) << "failed to disconnect stream: " << cpp_strerror(r)
+                << dendl;
+   }
+
+   // cancel all in-flight send/receives (if any)
+   shutdown_socket();
+
+   maybe_finalize_shutdown();
+ }
+
+ // Completes the shutdown once no request is in flight: fails everything
+ // still queued with -ESHUTDOWN and advances to STATE_SHUTDOWN. While
+ // requests are in flight this is a no-op; their completion handlers call it
+ // again.
+ void maybe_finalize_shutdown() {
+ if (m_in_flight_requests > 0) {
+ return;
+ }
+
+ // cancel any queued IOs
+ fail_queued_work(-ESHUTDOWN);
+
+ advance_state(STATE_SHUTDOWN, 0, nullptr);
+ }
+
+ // True once shutdown has been requested or has finished. Must be called
+ // from the client's strand.
+ bool is_shutdown() const {
+   ceph_assert(m_http_client->m_strand.running_in_this_thread());
+   switch (m_state) {
+   case STATE_SHUTTING_DOWN:
+   case STATE_SHUTDOWN:
+     return true;
+   default:
+     return false;
+   }
+ }
+
+ // Requests a reconnect of a READY session. The actual disconnect is
+ // deferred by maybe_finalize_reset() until no request is in flight.
+ void reset() {
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+ ceph_assert(m_state == STATE_READY);
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ m_state = STATE_RESET_PENDING;
+ maybe_finalize_reset();
+ }
+
+ // Returns false when no reset is pending (the caller continues normally).
+ // Returns true when a reset is pending: either it is still waiting for
+ // in-flight requests to drain, or it has just been started here by
+ // re-queueing the un-answered requests and disconnecting.
+ bool maybe_finalize_reset() {
+ if (m_state != STATE_RESET_PENDING) {
+ return false;
+ }
+
+ if (m_in_flight_requests > 0) {
+ return true;
+ }
+
+ ceph_assert(m_http_client->m_strand.running_in_this_thread());
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ // drop any partially received response data
+ m_buffer.clear();
+
+ // move in-flight request back to the front of the issue queue
+ m_issue_queue.insert(m_issue_queue.begin(),
+ m_receive_queue.begin(), m_receive_queue.end());
+ m_receive_queue.clear();
+
+ m_state = STATE_RESET_DISCONNECTING;
+ disconnect(new LambdaContext([this](int r) { handle_reset(r); }));
+ return true;
+ }
+
+ // Disconnect completion on the reset path. A disconnect failure is only
+ // logged; the state machine then proceeds to reconnect.
+ void handle_reset(int r) {
+   auto cct = m_http_client->m_cct;
+   ldout(cct, 15) << "r=" << r << dendl;
+
+   if (r < 0) {
+     // (message previously carried a stray, unbalanced quote)
+     lderr(cct) << "failed to disconnect stream: " << cpp_strerror(r)
+                << dendl;
+   }
+
+   advance_state(STATE_RESET_CONNECTING, r, nullptr);
+ }
+
+ // Shuts down both directions of the TCP socket and closes it. Returns 0 if
+ // the socket was not open or was closed successfully, otherwise the negated
+ // error value of the failed shutdown() (not_connected is tolerated).
+ // NOTE(review): on that failure path the socket is left open, and the
+ // callers visible in this class ignore the return value -- confirm this is
+ // intended.
+ int shutdown_socket() {
+ if (!boost::beast::get_lowest_layer(
+ derived().stream()).socket().is_open()) {
+ return 0;
+ }
+
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ boost::system::error_code ec;
+ boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(
+ boost::asio::ip::tcp::socket::shutdown_both, ec);
+
+ if (ec && ec != boost::beast::errc::not_connected) {
+ lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl;
+ return -ec.value();
+ }
+
+ close_socket();
+ return 0;
+ }
+
+ // Starts the asynchronous read of the response belonging to `work`, which
+ // must already be in the receive queue. Header-only work reads just the
+ // response header; everything else reads the full message with a 32 MiB
+ // (1 << 25) body limit.
+ void receive(std::shared_ptr<Work>&& work) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 15) << "work=" << work.get() << dendl;
+
+ ceph_assert(!m_receive_queue.empty());
+ ++m_in_flight_requests;
+
+ // receive the response for this request
+ m_parser.emplace();
+ if (work->header_only()) {
+ // HEAD requests don't transfer data but the parser still cares about max
+ // content-length
+ m_header_parser.emplace();
+ m_header_parser->body_limit(std::numeric_limits<uint64_t>::max());
+
+ boost::beast::http::async_read_header(
+ derived().stream(), m_buffer, *m_header_parser,
+ [this, work=std::move(work)]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ handle_receive(ec, std::move(work));
+ });
+ } else {
+ m_parser->body_limit(1 << 25); // max RBD object size
+ boost::beast::http::async_read(
+ derived().stream(), m_buffer, *m_parser,
+ [this, work=std::move(work)]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ handle_receive(ec, std::move(work));
+ });
+ }
+ }
+
+ // Completion handler for the response read started by receive().
+ //
+ // - If a reset is pending the work stays queued and will be resent.
+ // - If the session is shutting down the work fails with -ESHUTDOWN.
+ // - On a stream error the work is either re-queued (peer closed the
+ //   connection) or completed with an error, and the connection is reset.
+ // - Otherwise the HTTP status is mapped to a result (404 -> -ENOENT,
+ //   403 -> -EACCES, any other non-2xx -> -EIO) and the work is completed.
+ //   The next queued response is then read, unless the response requires the
+ //   connection to be closed, in which case the session is reset.
+ void handle_receive(boost::system::error_code ec,
+                     std::shared_ptr<Work>&& work) {
+   auto cct = m_http_client->m_cct;
+   ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
+
+   ceph_assert(m_in_flight_requests > 0);
+   --m_in_flight_requests;
+   if (maybe_finalize_reset()) {
+     // previous request is attempting reset so this request will be resent
+     return;
+   }
+
+   ceph_assert(!m_receive_queue.empty());
+   m_receive_queue.pop_front();
+
+   if (is_shutdown()) {
+     lderr(cct) << "client shutdown with in-flight request" << dendl;
+     work->complete(-ESHUTDOWN, {});
+
+     maybe_finalize_shutdown();
+     return;
+   }
+
+   if (ec) {
+     if (ec == boost::asio::error::bad_descriptor ||
+         ec == boost::asio::error::broken_pipe ||
+         ec == boost::asio::error::connection_reset ||
+         ec == boost::asio::error::operation_aborted ||
+         ec == boost::asio::ssl::error::stream_truncated ||
+         ec == boost::beast::http::error::end_of_stream ||
+         ec == boost::beast::http::error::partial_message) {
+       ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
+       m_receive_queue.push_front(work);
+     } else if (ec == boost::beast::error::timeout) {
+       // this is the receive path: do not report it as an issue failure
+       lderr(cct) << "timed-out while receiving response" << dendl;
+       work->complete(-ETIMEDOUT, {});
+     } else {
+       lderr(cct) << "failed to receive response: " << ec.message() << dendl;
+       work->complete(-ec.value(), {});
+     }
+
+     // attempt to recover the connection
+     reset();
+     return;
+   }
+
+   Response response;
+   if (work->header_only()) {
+     // convert the header-only parser so the message can be released below
+     m_parser.emplace(std::move(*m_header_parser));
+   }
+   response = m_parser->release();
+
+   // basic response code handling in a common location
+   int r = 0;
+   auto result = response.result();
+   if (result == boost::beast::http::status::not_found) {
+     lderr(cct) << "requested resource does not exist" << dendl;
+     r = -ENOENT;
+   } else if (result == boost::beast::http::status::forbidden) {
+     lderr(cct) << "permission denied attempting to access resource" << dendl;
+     r = -EACCES;
+   } else if (boost::beast::http::to_status_class(result) !=
+                boost::beast::http::status_class::successful) {
+     // shared by every request type, not just size queries
+     lderr(cct) << "request failed: HTTP " << result << dendl;
+     r = -EIO;
+   }
+
+   // query before the response is moved into the completion
+   bool need_eof = response.need_eof();
+   if (r < 0) {
+     work->complete(r, {});
+   } else {
+     work->complete(0, std::move(response));
+   }
+
+   if (need_eof) {
+     ldout(cct, 20) << "reset required for non-pipelined response: "
+                    << "work=" << work.get() << dendl;
+     reset();
+   } else if (!m_receive_queue.empty()) {
+     auto next_work = m_receive_queue.front();
+     receive(std::move(next_work));
+   }
+ }
+
+ // Central state transition handler. Records next_state in m_state and then
+ // performs the action that belongs to the (current_state, next_state) pair.
+ // Any pair not handled below is treated as a programming error and asserts.
+ //
+ // on_finish is only dereferenced when leaving STATE_CONNECTING; the reset
+ // paths assert that it is nullptr.
+ // NOTE(review): when a shutdown was requested while connecting
+ // (current_state == STATE_SHUTTING_DOWN) the on_finish passed in is not
+ // invoked on any of those branches -- confirm callers never shut down while
+ // init() is outstanding.
+ void advance_state(State next_state, int r, Context* on_finish) {
+ auto cct = m_http_client->m_cct;
+ auto current_state = m_state;
+ ldout(cct, 15) << "current_state=" << current_state << ", "
+ << "next_state=" << next_state << ", "
+ << "r=" << r << dendl;
+
+ m_state = next_state;
+ if (current_state == STATE_CONNECTING) {
+ if (next_state == STATE_UNINITIALIZED) {
+ // initial connect failed
+ shutdown_socket();
+ on_finish->complete(r);
+ return;
+ } else if (next_state == STATE_READY) {
+ on_finish->complete(r);
+ return;
+ }
+ } else if (current_state == STATE_SHUTTING_DOWN) {
+ if (next_state == STATE_READY) {
+ // shut down requested while connecting/resetting
+ disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
+ return;
+ } else if (next_state == STATE_UNINITIALIZED ||
+ next_state == STATE_SHUTDOWN ||
+ next_state == STATE_RESET_CONNECTING) {
+ ceph_assert(m_on_shutdown != nullptr);
+ m_on_shutdown->complete(r);
+ return;
+ }
+ } else if (current_state == STATE_RESET_DISCONNECTING) {
+ // disconnected from peer -- ignore errors and reconnect
+ ceph_assert(next_state == STATE_RESET_CONNECTING);
+ ceph_assert(on_finish == nullptr);
+ shutdown_socket();
+ resolve_host(nullptr);
+ return;
+ } else if (current_state == STATE_RESET_CONNECTING) {
+ ceph_assert(on_finish == nullptr);
+ if (next_state == STATE_READY) {
+ // restart queued IO
+ if (!m_issue_queue.empty()) {
+ // finalize_issue() only dereferences the pointer, so the queue entry
+ // stays intact despite the std::move
+ auto& work = m_issue_queue.front();
+ finalize_issue(std::move(work));
+ }
+ return;
+ } else if (next_state == STATE_UNINITIALIZED) {
+ shutdown_socket();
+
+ // fail all queued IO
+ fail_queued_work(r);
+ return;
+ }
+ }
+
+ lderr(cct) << "unexpected state transition: "
+ << "current_state=" << current_state << ", "
+ << "next_state=" << next_state << dendl;
+ ceph_assert(false);
+ }
+
+ // Logs and completes a single work item with the given result/response.
+ void complete_work(std::shared_ptr<Work> work, int r, Response&& response) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl;
+
+ work->complete(r, std::move(response));
+ }
+
+ // Completes every work item in the issue queue with result r and an empty
+ // response, then empties the queue. The receive queue must already be
+ // empty.
+ // NOTE(review): the queue is iterated while completions run -- presumably
+ // completions never call issue() synchronously; confirm.
+ void fail_queued_work(int r) {
+ auto cct = m_http_client->m_cct;
+ ldout(cct, 10) << "r=" << r << dendl;
+
+ for (auto& work : m_issue_queue) {
+ complete_work(work, r, {});
+ }
+ m_issue_queue.clear();
+ ceph_assert(m_receive_queue.empty());
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
+ << "PlainHttpSession " << this << " " << __func__ \
+ << ": "
+
+// Session over an unencrypted TCP stream. Connecting is a plain TCP connect
+// and there is no protocol-level disconnect step.
+template <typename I>
+class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> {
+public:
+ PlainHttpSession(HttpClient* http_client)
+ : HttpSession<PlainHttpSession>(http_client),
+ m_stream(http_client->m_strand) {
+ }
+ ~PlainHttpSession() override {
+ this->close_socket();
+ }
+
+ // stream accessor used by the HttpSession base via derived()
+ inline boost::beast::tcp_stream&
+ stream() {
+ return m_stream;
+ }
+
+protected:
+ // Connects to one of the resolved endpoints and completes on_finish with
+ // the result code delivered by the callback adapter.
+ void connect(boost::asio::ip::tcp::resolver::results_type results,
+ Context* on_finish) override {
+ auto http_client = this->m_http_client;
+ auto cct = http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ m_stream.async_connect(
+ results,
+ asio::util::get_callback_adapter(
+ [on_finish](int r, auto endpoint) { on_finish->complete(r); }));
+ }
+
+ // Nothing to negotiate for plain TCP: completes immediately with 0.
+ void disconnect(Context* on_finish) override {
+ on_finish->complete(0);
+ }
+
+private:
+ boost::beast::tcp_stream m_stream;
+
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
+ << "SslHttpSession " << this << " " << __func__ \
+ << ": "
+
+template <typename I>
+class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> {
+public:
+ SslHttpSession(HttpClient* http_client)
+ : HttpSession<SslHttpSession>(http_client),
+ m_stream(http_client->m_strand, http_client->m_ssl_context) {
+ }
+ ~SslHttpSession() override {
+ this->close_socket();
+ }
+
+ inline boost::beast::ssl_stream<boost::beast::tcp_stream>&
+ stream() {
+ return m_stream;
+ }
+
+protected:
+ void connect(boost::asio::ip::tcp::resolver::results_type results,
+ Context* on_finish) override {
+ auto http_client = this->m_http_client;
+ auto cct = http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ boost::beast::get_lowest_layer(m_stream).async_connect(
+ results,
+ asio::util::get_callback_adapter(
+ [this, on_finish](int r, auto endpoint) {
+ handle_connect(r, on_finish); }));
+ }
+
+ void disconnect(Context* on_finish) override {
+ auto http_client = this->m_http_client;
+ auto cct = http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ if (!m_ssl_enabled) {
+ on_finish->complete(0);
+ return;
+ }
+
+ m_stream.async_shutdown(
+ asio::util::get_callback_adapter([this, on_finish](int r) {
+ shutdown(r, on_finish); }));
+ }
+
+private:
+ boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream;
+ bool m_ssl_enabled = false;
+
+ void handle_connect(int r, Context* on_finish) {
+ auto http_client = this->m_http_client;
+ auto cct = http_client->m_cct;
+ ldout(cct, 15) << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to connect to host '"
+ << http_client->m_url_spec.host << "': "
+ << cpp_strerror(r) << dendl;
+ on_finish->complete(r);
+ return;
+ }
+
+ handshake(on_finish);
+ }
+
+  // Configures peer verification and the SNI hostname on the stream and then
+  // starts the asynchronous client-side TLS handshake; handle_handshake()
+  // receives the result. If the SNI hostname cannot be set, on_finish is
+  // completed directly with -EINVAL.
+  void handshake(Context* on_finish) {
+    auto http_client = this->m_http_client;
+    auto cct = http_client->m_cct;
+    ldout(cct, 15) << dendl;
+
+    auto& host = http_client->m_url_spec.host;
+    m_stream.set_verify_mode(
+      boost::asio::ssl::verify_peer |
+      boost::asio::ssl::verify_fail_if_no_peer_cert);
+    m_stream.set_verify_callback(
+      [host, next=boost::asio::ssl::host_name_verification(host),
+       ignore_self_signed=http_client->m_ignore_self_signed_cert]
+      (bool preverified, boost::asio::ssl::verify_context& ctx) {
+        if (!preverified && ignore_self_signed) {
+          auto ec = X509_STORE_CTX_get_error(ctx.native_handle());
+          switch (ec) {
+          case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT:
+          case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN:
+            // ignore self-signed cert issues
+            preverified = true;
+            break;
+          default:
+            break;
+          }
+        }
+        // the hostname check always runs, even for tolerated self-signed certs
+        return next(preverified, ctx);
+      });
+
+    // Set SNI Hostname (many hosts need this to handshake successfully)
+    if (!SSL_set_tlsext_host_name(m_stream.native_handle(), host.c_str())) {
+      // ERR_get_error() yields an OpenSSL error code, not an errno: it must
+      // not be negated and handed to cpp_strerror(), and it may be 0 when the
+      // error queue is empty, which would wrongly report success.
+      char ssl_error[256];
+      ::ERR_error_string_n(::ERR_get_error(), ssl_error, sizeof(ssl_error));
+      lderr(cct) << "failed to initialize SNI hostname: " << ssl_error
+                 << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+
+    // Perform the SSL/TLS handshake
+    m_stream.async_handshake(
+      boost::asio::ssl::stream_base::client,
+      asio::util::get_callback_adapter(
+        [this, on_finish](int r) { handle_handshake(r, on_finish); }));
+  }
+
+  // Completion of the TLS handshake. Success marks the session as SSL-enabled
+  // and completes on_finish with 0; failure runs disconnect() and then
+  // completes on_finish with the original handshake error.
+  void handle_handshake(int r, Context* on_finish) {
+    auto http_client = this->m_http_client;
+    auto cct = http_client->m_cct;
+    ldout(cct, 15) << "r=" << r << dendl;
+
+    if (r >= 0) {
+      m_ssl_enabled = true;
+      on_finish->complete(0);
+      return;
+    }
+
+    lderr(cct) << "failed to complete handshake: " << cpp_strerror(r)
+               << dendl;
+
+    // the disconnect result is ignored; report the handshake error instead
+    auto on_disconnected = new LambdaContext([r, on_finish](int) {
+      on_finish->complete(r);
+    });
+    disconnect(on_disconnected);
+  }
+
+ // Completion handler for the asynchronous TLS shutdown started by
+ // disconnect(): logs the result and passes it to on_finish unchanged.
+ void shutdown(int r, Context* on_finish) {
+ auto http_client = this->m_http_client;
+ auto cct = http_client->m_cct;
+ ldout(cct, 15) << "r=" << r << dendl;
+
+ on_finish->complete(r);
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpClient: " << this \
+ << " " << __func__ << ": "
+
+// Stores the image context, its CephContext and asio engine, and the raw URL
+// (parsed later by open()). Builds a strand on the asio engine and an SSL
+// client context that uses the default certificate verify paths.
+// m_strand is built from *m_asio_engine, so m_asio_engine must be
+// initialized before it.
+template <typename I>
+HttpClient<I>::HttpClient(I* image_ctx, const std::string& url)
+ : m_cct(image_ctx->cct), m_image_ctx(image_ctx),
+ m_asio_engine(image_ctx->asio_engine), m_url(url),
+ m_strand(boost::asio::make_strand(*m_asio_engine)),
+ m_ssl_context(boost::asio::ssl::context::sslv23_client) {
+ m_ssl_context.set_default_verify_paths();
+}
+
+// Parses the configured URL and, on success, schedules creation of the HTTP
+// session on the strand. A URL that fails to parse completes on_finish with
+// -EINVAL.
+template <typename I>
+void HttpClient<I>::open(Context* on_finish) {
+  ldout(m_cct, 10) << "url=" << m_url << dendl;
+
+  const int r = util::parse_url(m_cct, m_url, &m_url_spec);
+  if (r >= 0) {
+    boost::asio::post(m_strand, [this, on_finish]() mutable {
+      create_http_session(on_finish);
+    });
+    return;
+  }
+
+  lderr(m_cct) << "failed to parse url '" << m_url << "': " << cpp_strerror(r)
+               << dendl;
+  on_finish->complete(-EINVAL);
+}
+
+// Schedules shut_down_http_session() on the strand; on_finish is completed
+// from there.
+template <typename I>
+void HttpClient<I>::close(Context* on_finish) {
+ boost::asio::post(m_strand, [this, on_finish]() mutable {
+ shut_down_http_session(on_finish); });
+}
+
+// Issues an HTTP HEAD request; handle_get_size() extracts the size from the
+// response and completes on_finish.
+template <typename I>
+void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  Request head_request;
+  head_request.method(boost::beast::http::verb::head);
+
+  auto on_response = [this, size, on_finish](int r, Response&& response) {
+    handle_get_size(r, std::move(response), size, on_finish);
+  };
+  issue(std::move(head_request), std::move(on_response));
+}
+
+// Completion of the HEAD request issued by get_size(). Stores the parsed
+// Content-Length in *size and completes on_finish with 0, or completes with
+// the request error, -EINVAL (no content-length) or -EBADMSG (unparsable
+// content-length).
+template <typename I>
+void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size,
+                                    Context* on_finish) {
+  ldout(m_cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  if (!response.has_content_length()) {
+    lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  auto length_field = response[boost::beast::http::field::content_length];
+  try {
+    *size = boost::lexical_cast<uint64_t>(length_field);
+  } catch (boost::bad_lexical_cast&) {
+    lderr(m_cct) << "invalid content-length in response" << dendl;
+    on_finish->complete(-EBADMSG);
+    return;
+  }
+
+  on_finish->complete(0);
+}
+
+// Reads the given byte extents by issuing one HTTP range GET per extent. The
+// per-extent results are assembled into `data` through an AioCompletion /
+// ReadResult pair, and on_finish is completed by that AioCompletion.
+template <typename I>
+void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data,
+                         Context* on_finish) {
+  ldout(m_cct, 20) << dendl;
+
+  auto aio_comp = io::AioCompletion::create_and_start(
+    on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
+  aio_comp->set_request_count(byte_extents.size());
+
+  // utilize ReadResult to assemble multiple byte extents into a single bl
+  // since boost::beast doesn't support multipart responses out-of-the-box
+  aio_comp->read_result = io::ReadResult{data};
+  aio_comp->read_result.set_image_extents(byte_extents);
+
+  // issue a range get request for each extent
+  uint64_t assembled_offset = 0;
+  for (auto [byte_offset, byte_length] : byte_extents) {
+    auto extent_ctx = new io::ReadResult::C_ImageReadRequest(
+      aio_comp, assembled_offset, {{byte_offset, byte_length}});
+    assembled_offset += byte_length;
+
+    Request range_request;
+    range_request.method(boost::beast::http::verb::get);
+
+    // HTTP byte ranges are inclusive on both ends
+    std::stringstream range_value;
+    ceph_assert(byte_length > 0);
+    range_value << "bytes=" << byte_offset << "-"
+                << (byte_offset + byte_length - 1);
+    range_request.set(boost::beast::http::field::range, range_value.str());
+
+    auto on_response =
+      [this, byte_offset=byte_offset, byte_length=byte_length, extent_ctx]
+      (int r, Response&& response) {
+        handle_read(r, std::move(response), byte_offset, byte_length,
+                    &extent_ctx->bl, extent_ctx);
+      };
+    issue(std::move(range_request), std::move(on_response));
+  }
+}
+
+// Completion of a single range GET issued by read(). A response is accepted
+// only when it is "206 Partial Content" and its body length equals the
+// requested length; the body then replaces the contents of `data` and
+// on_finish is completed with the number of bytes. Failures complete with
+// the request error, -EIO (unexpected HTTP status) or -EINVAL (length
+// mismatch).
+template <typename I>
+void HttpClient<I>::handle_read(int r, Response&& response,
+                                uint64_t byte_offset, uint64_t byte_length,
+                                bufferlist* data, Context* on_finish) {
+  ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
+                   << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << "failed to read requested byte range: "
+                 << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  if (response.result() != boost::beast::http::status::partial_content) {
+    lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
+                 << response.result() << dendl;
+    on_finish->complete(-EIO);
+    return;
+  }
+
+  const auto& payload = response.body();
+  if (byte_length != payload.size()) {
+    lderr(m_cct) << "unexpected short range read: "
+                 << "wanted=" << byte_length << ", "
+                 << "received=" << payload.size() << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  data->clear();
+  data->append(payload);
+  on_finish->complete(data->length());
+}
+
+// Hands the unit of work to the HTTP session from within the strand.
+// NOTE(review): m_http_session is dereferenced without a null check, so this
+// presumably must only be called after open() created the session - confirm.
+template <typename I>
+void HttpClient<I>::issue(std::shared_ptr<Work>&& work) {
+ boost::asio::post(m_strand, [this, work=std::move(work)]() mutable {
+ m_http_session->issue(std::move(work)); });
+}
+
+// Instantiates the plain or SSL session that matches the parsed URL scheme
+// and starts its initialization; on_finish is handed to the session's init().
+template <typename I>
+void HttpClient<I>::create_http_session(Context* on_finish) {
+  ldout(m_cct, 15) << dendl;
+
+  ceph_assert(m_http_session == nullptr);
+
+  const auto scheme = m_url_spec.scheme;
+  if (scheme == URL_SCHEME_HTTP) {
+    m_http_session = std::make_unique<PlainHttpSession>(this);
+  } else if (scheme == URL_SCHEME_HTTPS) {
+    m_http_session = std::make_unique<SslHttpSession>(this);
+  } else {
+    // any other scheme is not expected to reach this point
+    ceph_assert(false);
+  }
+
+  m_http_session->init(on_finish);
+}
+
+// Shuts down the HTTP session if one was created; otherwise completes
+// on_finish immediately with 0.
+template <typename I>
+void HttpClient<I>::shut_down_http_session(Context* on_finish) {
+  ldout(m_cct, 15) << dendl;
+
+  if (m_http_session != nullptr) {
+    m_http_session->shut_down(on_finish);
+    return;
+  }
+
+  // no session was ever created, nothing to tear down
+  on_finish->complete(0);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::HttpClient<librbd::ImageCtx>;
diff --git a/src/librbd/migration/HttpClient.h b/src/librbd/migration/HttpClient.h
new file mode 100644
index 000000000..3997e6159
--- /dev/null
+++ b/src/librbd/migration/HttpClient.h
@@ -0,0 +1,205 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H
+#define CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H
+
+#include "include/common_fwd.h"
+#include "include/int_types.h"
+#include "librbd/io/Types.h"
+#include "librbd/migration/HttpProcessorInterface.h"
+#include "librbd/migration/Types.h"
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/version.hpp>
+#include <boost/beast/core/tcp_stream.hpp>
+#include <boost/beast/http/empty_body.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <boost/beast/http/write.hpp>
+#include <boost/beast/ssl/ssl_stream.hpp>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+// Asynchronous HTTP/HTTPS client for a single URL. It can query the size of
+// the remote resource (HEAD request) and read byte extents from it (range GET
+// requests). Work is posted to a strand on the image's asio engine and
+// executed by a plain or SSL session chosen from the URL scheme.
+template <typename ImageCtxT>
+class HttpClient {
+public:
+ using EmptyBody = boost::beast::http::empty_body;
+ using StringBody = boost::beast::http::string_body;
+ using Request = boost::beast::http::request<EmptyBody>;
+ using Response = boost::beast::http::response<StringBody>;
+
+ using RequestPreprocessor = std::function<void(Request&)>;
+
+ // returns a heap-allocated client; the caller is responsible for deleting it
+ static HttpClient* create(ImageCtxT* image_ctx, const std::string& url) {
+ return new HttpClient(image_ctx, url);
+ }
+
+ HttpClient(ImageCtxT* image_ctx, const std::string& url);
+ HttpClient(const HttpClient&) = delete;
+ HttpClient& operator=(const HttpClient&) = delete;
+
+ // open: parse the URL and create the HTTP session
+ // close: shut down the HTTP session (if any)
+ void open(Context* on_finish);
+ void close(Context* on_finish);
+
+ // stores the Content-Length reported for a HEAD request in *size
+ void get_size(uint64_t* size, Context* on_finish);
+
+ // issues one range GET per extent and assembles the results into data
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish);
+
+ // when true, the SSL session's verify callback tolerates self-signed
+ // certificate errors
+ void set_ignore_self_signed_cert(bool ignore) {
+ m_ignore_self_signed_cert = ignore;
+ }
+
+ // optional hook invoked on every request right before it is written to the
+ // stream; the pointer is stored as-is and not owned by the client
+ void set_http_processor(HttpProcessorInterface* http_processor) {
+ m_http_processor = http_processor;
+ }
+
+ // Wraps the request and completion in a Work object, fills in the default
+ // request fields (target, host, user-agent) and queues it for the session.
+ // completion is invoked as completion(int r, Response&& response).
+ // NOTE(review): completion is a forwarding reference that is always
+ // std::move'd; passing an lvalue would make WorkImpl::completion a
+ // reference member - callers in this file always pass rvalues.
+ template <class Body, typename Completion>
+ void issue(boost::beast::http::request<Body>&& request,
+ Completion&& completion) {
+ struct WorkImpl : Work {
+ HttpClient* http_client;
+ boost::beast::http::request<Body> request;
+ Completion completion;
+
+ WorkImpl(HttpClient* http_client,
+ boost::beast::http::request<Body>&& request,
+ Completion&& completion)
+ : http_client(http_client), request(std::move(request)),
+ completion(std::move(completion)) {
+ }
+ WorkImpl(const WorkImpl&) = delete;
+ WorkImpl& operator=(const WorkImpl&) = delete;
+
+ bool need_eof() const override {
+ return request.need_eof();
+ }
+
+ // HEAD requests only expect response headers
+ bool header_only() const override {
+ return (request.method() == boost::beast::http::verb::head);
+ }
+
+ void complete(int r, Response&& response) override {
+ completion(r, std::move(response));
+ }
+
+ // write the request to a plain TCP stream; the session's handle_issue()
+ // receives the write result, and the captured shared_ptr keeps this
+ // work object alive until then
+ void operator()(boost::beast::tcp_stream& stream) override {
+ preprocess_request();
+
+ boost::beast::http::async_write(
+ stream, request,
+ [http_session=http_client->m_http_session.get(),
+ work=this->shared_from_this()]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ http_session->handle_issue(ec, std::move(work));
+ });
+ }
+
+ // same as above for an SSL stream
+ void operator()(
+ boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override {
+ preprocess_request();
+
+ boost::beast::http::async_write(
+ stream, request,
+ [http_session=http_client->m_http_session.get(),
+ work=this->shared_from_this()]
+ (boost::beast::error_code ec, std::size_t) mutable {
+ http_session->handle_issue(ec, std::move(work));
+ });
+ }
+
+ // give the optional HTTP processor a chance to modify the request
+ void preprocess_request() {
+ if (http_client->m_http_processor) {
+ http_client->m_http_processor->process_request(request);
+ }
+ }
+ };
+
+ initialize_default_fields(request);
+ issue(std::make_shared<WorkImpl>(this, std::move(request),
+ std::move(completion)));
+ }
+
+private:
+ struct Work;
+ // interface implemented by the plain and SSL HTTP sessions
+ struct HttpSessionInterface {
+ virtual ~HttpSessionInterface() {}
+
+ virtual void init(Context* on_finish) = 0;
+ virtual void shut_down(Context* on_finish) = 0;
+
+ virtual void issue(std::shared_ptr<Work>&& work) = 0;
+ virtual void handle_issue(boost::system::error_code ec,
+ std::shared_ptr<Work>&& work) = 0;
+ };
+
+ // a single queued HTTP request together with its completion callback
+ struct Work : public std::enable_shared_from_this<Work> {
+ virtual ~Work() {}
+ virtual bool need_eof() const = 0;
+ virtual bool header_only() const = 0;
+ virtual void complete(int r, Response&&) = 0;
+ virtual void operator()(boost::beast::tcp_stream& stream) = 0;
+ virtual void operator()(
+ boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0;
+ };
+
+ template <typename D> struct HttpSession;
+ struct PlainHttpSession;
+ struct SslHttpSession;
+
+ CephContext* m_cct;
+ ImageCtxT* m_image_ctx;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ std::string m_url;
+
+ // filled in by open() from m_url
+ UrlSpec m_url_spec;
+
+ bool m_ignore_self_signed_cert = false;
+
+ HttpProcessorInterface* m_http_processor = nullptr;
+
+ // must be declared after m_asio_engine: the constructor builds the strand
+ // from it
+ boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
+
+ boost::asio::ssl::context m_ssl_context;
+ // created by create_http_session(); null until then
+ std::unique_ptr<HttpSessionInterface> m_http_session;
+
+ // sets the request target, Host and User-Agent from the parsed URL
+ template <typename Fields>
+ void initialize_default_fields(Fields& fields) const {
+ fields.target(m_url_spec.path);
+ fields.set(boost::beast::http::field::host, m_url_spec.host);
+ fields.set(boost::beast::http::field::user_agent,
+ BOOST_BEAST_VERSION_STRING);
+ }
+
+ void handle_get_size(int r, Response&& response, uint64_t* size,
+ Context* on_finish);
+
+ void handle_read(int r, Response&& response, uint64_t byte_offset,
+ uint64_t byte_length, bufferlist* data, Context* on_finish);
+
+ void issue(std::shared_ptr<Work>&& work);
+
+ void create_http_session(Context* on_finish);
+ void shut_down_http_session(Context* on_finish);
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::HttpClient<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H
diff --git a/src/librbd/migration/HttpProcessorInterface.h b/src/librbd/migration/HttpProcessorInterface.h
new file mode 100644
index 000000000..3d9af88bd
--- /dev/null
+++ b/src/librbd/migration/HttpProcessorInterface.h
@@ -0,0 +1,27 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H
+
+#include <boost/beast/http/empty_body.hpp>
+#include <boost/beast/http/message.hpp>
+
+namespace librbd {
+namespace migration {
+
+// Hook interface that can modify an outgoing, body-less HTTP request.
+struct HttpProcessorInterface {
+  using EmptyBody = boost::beast::http::empty_body;
+  using EmptyRequest = boost::beast::http::request<EmptyBody>;
+
+  virtual ~HttpProcessorInterface() = default;
+
+  // may modify the request in place
+  virtual void process_request(EmptyRequest& request) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H
diff --git a/src/librbd/migration/HttpStream.cc b/src/librbd/migration/HttpStream.cc
new file mode 100644
index 000000000..fa3cc0032
--- /dev/null
+++ b/src/librbd/migration/HttpStream.cc
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/HttpStream.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/asio/Utils.h"
+#include "librbd/migration/HttpClient.h"
+#include <boost/beast/http.hpp>
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+// JSON key that HttpStream::open() reads the source URL from
+const std::string URL_KEY {"url"};
+
+} // anonymous namespace
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::HttpStream: " << this \
+ << " " << __func__ << ": "
+
+// Stores the image context, its CephContext and asio engine, and a copy of
+// the JSON stream description; the URL is extracted later by open().
+template <typename I>
+HttpStream<I>::HttpStream(I* image_ctx, const json_spirit::mObject& json_object)
+ : m_image_ctx(image_ctx), m_cct(image_ctx->cct),
+ m_asio_engine(image_ctx->asio_engine), m_json_object(json_object) {
+}
+
+// Defined out-of-line in this translation unit, where HttpClient.h is
+// included, while the header only forward-declares HttpClient.
+template <typename I>
+HttpStream<I>::~HttpStream() = default;
+
+// Extracts the "url" string from the JSON stream description, creates the
+// HTTP client for it and opens the client. Completes on_finish with -EINVAL
+// if the key is missing or is not a string; otherwise on_finish is completed
+// by HttpClient::open().
+template <typename I>
+void HttpStream<I>::open(Context* on_finish) {
+  // use find() rather than operator[] so that a missing key does not insert
+  // a null entry into the stored JSON object
+  auto url_it = m_json_object.find(URL_KEY);
+  if (url_it == m_json_object.end() ||
+      url_it->second.type() != json_spirit::str_type) {
+    lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_url = url_it->second.get_str();
+  ldout(m_cct, 10) << "url=" << m_url << dendl;
+
+  m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url));
+  m_http_client->open(on_finish);
+}
+
+// Closes the HTTP client if open() created one; otherwise completes
+// on_finish immediately with 0.
+template <typename I>
+void HttpStream<I>::close(Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  if (m_http_client) {
+    m_http_client->close(on_finish);
+    return;
+  }
+
+  // no client was created, nothing to close
+  on_finish->complete(0);
+}
+
+// Forwards the size query to the HTTP client.
+// NOTE(review): m_http_client is dereferenced without a null check, so this
+// presumably requires a prior successful open() - confirm against callers.
+template <typename I>
+void HttpStream<I>::get_size(uint64_t* size, Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+
+ m_http_client->get_size(size, on_finish);
+}
+
+// Forwards the read of the given byte extents to the HTTP client.
+// NOTE(review): m_http_client is dereferenced without a null check, so this
+// presumably requires a prior successful open() - confirm against callers.
+template <typename I>
+void HttpStream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl;
+
+ m_http_client->read(std::move(byte_extents), data, on_finish);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::HttpStream<librbd::ImageCtx>;
diff --git a/src/librbd/migration/HttpStream.h b/src/librbd/migration/HttpStream.h
new file mode 100644
index 000000000..01a583714
--- /dev/null
+++ b/src/librbd/migration/HttpStream.h
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> class HttpClient;
+
+// StreamInterface implementation backed by an HttpClient. The URL is taken
+// from the "url" key of the JSON object supplied at construction.
+template <typename ImageCtxT>
+class HttpStream : public StreamInterface {
+public:
+ // returns a heap-allocated stream; the caller is responsible for deleting it
+ static HttpStream* create(ImageCtxT* image_ctx,
+ const json_spirit::mObject& json_object) {
+ return new HttpStream(image_ctx, json_object);
+ }
+
+ HttpStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+ ~HttpStream() override;
+
+ HttpStream(const HttpStream&) = delete;
+ HttpStream& operator=(const HttpStream&) = delete;
+
+ // open: read the URL from the JSON object and open an HTTP client for it
+ // close: close the HTTP client (if one was created)
+ void open(Context* on_finish) override;
+ void close(Context* on_finish) override;
+
+ // forwarded to the HTTP client
+ void get_size(uint64_t* size, Context* on_finish) override;
+
+ // forwarded to the HTTP client
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) override;
+
+private:
+ using HttpResponse = boost::beast::http::response<
+ boost::beast::http::string_body>;
+
+ ImageCtxT* m_image_ctx;
+ CephContext* m_cct;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ // copy of the JSON stream description passed to the constructor
+ json_spirit::mObject m_json_object;
+
+ // set by open()
+ std::string m_url;
+
+ // created by open(); null until then
+ std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::HttpStream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H
diff --git a/src/librbd/migration/ImageDispatch.cc b/src/librbd/migration/ImageDispatch.cc
new file mode 100644
index 000000000..3aa2eeb0b
--- /dev/null
+++ b/src/librbd/migration/ImageDispatch.cc
@@ -0,0 +1,157 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/ImageDispatch.h"
+#include "include/neorados/RADOS.hpp"
+#include "common/dout.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/migration/FormatInterface.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::ImageDispatch: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace migration {
+
+// Stores the image context and takes ownership of the source format handler.
+template <typename I>
+ImageDispatch<I>::ImageDispatch(I* image_ctx,
+ std::unique_ptr<FormatInterface> format)
+ : m_image_ctx(image_ctx), m_format(std::move(format)) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 10) << "ictx=" << image_ctx << dendl;
+}
+
+// Nothing to tear down in this layer: completes on_finish immediately with 0.
+template <typename I>
+void ImageDispatch<I>::shut_down(Context* on_finish) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 10) << dendl;
+
+ on_finish->complete(0);
+}
+
+// Marks the dispatch as complete and hands the read to the source format
+// handler; returns whatever the format's read() returns.
+template <typename I>
+bool ImageDispatch<I>::read(
+    io::AioCompletion* aio_comp, io::Extents &&image_extents,
+    io::ReadResult &&read_result, IOContext io_context, int op_flags,
+    int read_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    io::DispatchResult* dispatch_result, Context** on_finish,
+    Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+
+  // fall back to CEPH_NOSNAP when the IO context carries no read snapshot
+  const auto snap_id = io_context->read_snap().value_or(CEPH_NOSNAP);
+  return m_format->read(aio_comp, snap_id, std::move(image_extents),
+                        std::move(read_result), op_flags, read_flags,
+                        parent_trace);
+}
+
+// Writes are rejected by this layer: logs an error and fails the IO with
+// -EROFS.
+template <typename I>
+bool ImageDispatch<I>::write(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ lderr(cct) << dendl;
+
+ fail_io(-EROFS, aio_comp, dispatch_result);
+ return true;
+}
+
+// Discards are rejected by this layer: logs an error and fails the IO with
+// -EROFS.
+template <typename I>
+bool ImageDispatch<I>::discard(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents,
+ uint32_t discard_granularity_bytes,
+ IOContext io_context, const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ lderr(cct) << dendl;
+
+ fail_io(-EROFS, aio_comp, dispatch_result);
+ return true;
+}
+
+// Write-same requests are rejected by this layer: logs an error and fails
+// the IO with -EROFS.
+template <typename I>
+bool ImageDispatch<I>::write_same(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ lderr(cct) << dendl;
+
+ fail_io(-EROFS, aio_comp, dispatch_result);
+ return true;
+}
+
+// Compare-and-write requests are rejected by this layer: logs an error and
+// fails the IO with -EROFS.
+template <typename I>
+bool ImageDispatch<I>::compare_and_write(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents,
+ bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ lderr(cct) << dendl;
+
+ fail_io(-EROFS, aio_comp, dispatch_result);
+ return true;
+}
+
+// Flushes require no work in this layer: the dispatch is marked complete and
+// the completion's request count is set to zero.
+template <typename I>
+bool ImageDispatch<I>::flush(
+ io::AioCompletion* aio_comp, io::FlushSource flush_source,
+ const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+ aio_comp->set_request_count(0);
+ return true;
+}
+
+// Marks the dispatch as complete and forwards the snapshot listing to the
+// source format handler, which completes the AioCompletion through a single
+// C_AioRequest.
+template <typename I>
+bool ImageDispatch<I>::list_snaps(
+    io::AioCompletion* aio_comp, io::Extents&& image_extents,
+    io::SnapIds&& snap_ids, int list_snaps_flags,
+    io::SnapshotDelta* snapshot_delta, const ZTracer::Trace &parent_trace,
+    uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+    io::DispatchResult* dispatch_result, Context** on_finish,
+    Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+
+  // exactly one sub-request: the format's list_snaps call
+  aio_comp->set_request_count(1);
+  auto on_snaps_listed = new io::C_AioRequest(aio_comp);
+
+  m_format->list_snaps(
+    std::move(image_extents), std::move(snap_ids), list_snaps_flags,
+    snapshot_delta, parent_trace, on_snaps_listed);
+  return true;
+}
+
+// Marks the dispatch as complete and fails the completion with error r.
+template <typename I>
+void ImageDispatch<I>::fail_io(int r, io::AioCompletion* aio_comp,
+ io::DispatchResult* dispatch_result) {
+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+ aio_comp->fail(r);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::ImageDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/migration/ImageDispatch.h b/src/librbd/migration/ImageDispatch.h
new file mode 100644
index 000000000..03bb3aa52
--- /dev/null
+++ b/src/librbd/migration/ImageDispatch.h
@@ -0,0 +1,102 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H
+#define CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H
+
+#include "librbd/io/ImageDispatchInterface.h"
+#include <memory>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace migration {
+
+struct FormatInterface;
+
+// Image dispatch layer for the migration source. Reads and snapshot listings
+// are served by the wrapped FormatInterface; every modifying operation is
+// failed with -EROFS.
+template <typename ImageCtxT>
+class ImageDispatch : public io::ImageDispatchInterface {
+public:
+ // returns a heap-allocated dispatch layer that takes ownership of source
+ static ImageDispatch* create(ImageCtxT* image_ctx,
+ std::unique_ptr<FormatInterface> source) {
+ return new ImageDispatch(image_ctx, std::move(source));
+ }
+
+ ImageDispatch(ImageCtxT* image_ctx, std::unique_ptr<FormatInterface> source);
+
+ // completes on_finish immediately with 0
+ void shut_down(Context* on_finish) override;
+
+ io::ImageDispatchLayer get_dispatch_layer() const override {
+ return io::IMAGE_DISPATCH_LAYER_MIGRATION;
+ }
+
+ // forwarded to the format handler's read()
+ bool read(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents,
+ io::ReadResult &&read_result, IOContext io_context, int op_flags,
+ int read_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+ // write, discard, write_same and compare_and_write all fail with -EROFS
+ bool write(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+ bool discard(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents,
+ uint32_t discard_granularity_bytes,
+ IOContext io_context, const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+ bool write_same(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+ bool compare_and_write(
+ io::AioCompletion* aio_comp, io::Extents &&image_extents,
+ bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
+ IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+ // completes without issuing any sub-request
+ bool flush(
+ io::AioCompletion* aio_comp, io::FlushSource flush_source,
+ const ZTracer::Trace &parent_trace, uint64_t tid,
+ std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+
+ // forwarded to the format handler's list_snaps()
+ bool list_snaps(
+ io::AioCompletion* aio_comp, io::Extents&& image_extents,
+ io::SnapIds&& snap_ids, int list_snaps_flags,
+ io::SnapshotDelta* snapshot_delta, const ZTracer::Trace &parent_trace,
+ uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) override;
+
+ // always returns false without using on_finish
+ bool invalidate_cache(Context* on_finish) override {
+ return false;
+ }
+
+private:
+ ImageCtxT* m_image_ctx;
+ // source format handler that serves reads and snapshot listings
+ std::unique_ptr<FormatInterface> m_format;
+
+ // marks the dispatch complete and fails aio_comp with r
+ void fail_io(int r, io::AioCompletion* aio_comp,
+ io::DispatchResult* dispatch_result);
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::ImageDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H
diff --git a/src/librbd/migration/NativeFormat.cc b/src/librbd/migration/NativeFormat.cc
new file mode 100644
index 000000000..a7682619c
--- /dev/null
+++ b/src/librbd/migration/NativeFormat.cc
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/NativeFormat.h"
+#include "include/neorados/RADOS.hpp"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/io/ImageDispatchSpec.h"
+#include "json_spirit/json_spirit.h"
+#include "boost/lexical_cast.hpp"
+#include <sstream>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::NativeFormat: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+// keys looked up in the JSON source-spec object of a "native" source
+const std::string TYPE_KEY = "type";
+const std::string POOL_ID_KEY = "pool_id";
+const std::string POOL_NAME_KEY = "pool_name";
+const std::string POOL_NAMESPACE_KEY = "pool_namespace";
+const std::string IMAGE_NAME_KEY = "image_name";
+const std::string IMAGE_ID_KEY = "image_id";
+const std::string SNAP_NAME_KEY = "snap_name";
+const std::string SNAP_ID_KEY = "snap_id";
+
+} // anonymous namespace
+
+template <typename I>
+std::string NativeFormat<I>::build_source_spec(
+    int64_t pool_id, const std::string& pool_namespace,
+    const std::string& image_name, const std::string& image_id) {
+  // Serialize the given image location as a JSON source-spec of type
+  // "native". The image id key is only emitted for a non-empty id.
+  json_spirit::mObject spec;
+  spec[TYPE_KEY] = "native";
+  spec[IMAGE_NAME_KEY] = image_name;
+  spec[POOL_NAMESPACE_KEY] = pool_namespace;
+  spec[POOL_ID_KEY] = pool_id;
+  if (image_id.empty()) {
+    return json_spirit::write(spec);
+  }
+
+  spec[IMAGE_ID_KEY] = image_id;
+  return json_spirit::write(spec);
+}
+
+template <typename I>
+NativeFormat<I>::NativeFormat(
+    I* image_ctx, const json_spirit::mObject& json_object, bool import_only)
+  // the source-spec is copied: open() reads its keys through the
+  // non-const operator[] of the stored object
+  : m_image_ctx(image_ctx),
+    m_json_object(json_object),
+    m_import_only(import_only) {
+}
+
+template <typename I>
+void NativeFormat<I>::open(Context* on_finish) {
+  // Validate the source-spec stored in m_json_object, resolve the source
+  // pool / image / snapshot and start an asynchronous open of the source
+  // image. Any failure detected here completes on_finish directly with a
+  // negative errno; otherwise the result is delivered via handle_open().
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  // pool name (optional): translated into a pool id
+  auto& pool_name = m_json_object[POOL_NAME_KEY];
+  const auto pool_name_type = pool_name.type();
+  if (pool_name_type == json_spirit::str_type) {
+    librados::Rados rados(m_image_ctx->md_ctx);
+    librados::IoCtx pool_io_ctx;
+    int r = rados.ioctx_create(pool_name.get_str().c_str(), pool_io_ctx);
+    if (r < 0) {
+      lderr(cct) << "invalid pool name" << dendl;
+      on_finish->complete(r);
+      return;
+    }
+
+    m_pool_id = pool_io_ctx.get_id();
+  } else if (pool_name_type != json_spirit::null_type) {
+    lderr(cct) << "invalid pool name" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // pool id: mutually exclusive with the pool name, integer or numeric string
+  auto& pool_id = m_json_object[POOL_ID_KEY];
+  const auto pool_id_type = pool_id.type();
+  if (m_pool_id != -1 && pool_id_type != json_spirit::null_type) {
+    lderr(cct) << "cannot specify both pool name and pool id" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (pool_id_type == json_spirit::int_type) {
+    m_pool_id = pool_id.get_int64();
+  } else if (pool_id_type == json_spirit::str_type) {
+    try {
+      m_pool_id = boost::lexical_cast<int64_t>(pool_id.get_str());
+    } catch (boost::bad_lexical_cast &) {
+      // m_pool_id keeps its previous value; reported by the check below
+    }
+  }
+
+  if (m_pool_id == -1) {
+    lderr(cct) << "missing or invalid pool id" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // pool namespace (optional)
+  auto& pool_namespace = m_json_object[POOL_NAMESPACE_KEY];
+  const auto pool_namespace_type = pool_namespace.type();
+  if (pool_namespace_type == json_spirit::str_type) {
+    m_pool_namespace = pool_namespace.get_str();
+  } else if (pool_namespace_type != json_spirit::null_type) {
+    lderr(cct) << "invalid pool namespace" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // image name (required)
+  auto& image_name = m_json_object[IMAGE_NAME_KEY];
+  if (image_name.type() != json_spirit::str_type) {
+    lderr(cct) << "missing or invalid image name" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+  m_image_name = image_name.get_str();
+
+  // image id (optional)
+  auto& image_id = m_json_object[IMAGE_ID_KEY];
+  const auto image_id_type = image_id.type();
+  if (image_id_type == json_spirit::str_type) {
+    m_image_id = image_id.get_str();
+  } else if (image_id_type != json_spirit::null_type) {
+    lderr(cct) << "invalid image id" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // snapshot name (optional)
+  auto& snap_name = m_json_object[SNAP_NAME_KEY];
+  const auto snap_name_type = snap_name.type();
+  if (snap_name_type == json_spirit::str_type) {
+    m_snap_name = snap_name.get_str();
+  } else if (snap_name_type != json_spirit::null_type) {
+    lderr(cct) << "invalid snap name" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // snapshot id: mutually exclusive with the snapshot name
+  auto& snap_id = m_json_object[SNAP_ID_KEY];
+  const auto snap_id_type = snap_id.type();
+  if (!m_snap_name.empty() && snap_id_type != json_spirit::null_type) {
+    lderr(cct) << "cannot specify both snap name and snap id" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (snap_id_type == json_spirit::int_type) {
+    m_snap_id = snap_id.get_uint64();
+  } else if (snap_id_type == json_spirit::str_type) {
+    try {
+      m_snap_id = boost::lexical_cast<uint64_t>(snap_id.get_str());
+    } catch (boost::bad_lexical_cast &) {
+      // m_snap_id stays CEPH_NOSNAP; reported by the check below
+    }
+  }
+
+  if (snap_id_type != json_spirit::null_type && m_snap_id == CEPH_NOSNAP) {
+    lderr(cct) << "invalid snap id" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // snapshot is required for import to keep source read-only
+  const bool snapshot_given = !m_snap_name.empty() || m_snap_id != CEPH_NOSNAP;
+  if (m_import_only && !snapshot_given) {
+    lderr(cct) << "snapshot required for import" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // TODO add support for external clusters
+  librados::IoCtx io_ctx;
+  int r = util::create_ioctx(m_image_ctx->md_ctx, "source image",
+                             m_pool_id, m_pool_namespace, &io_ctx);
+  if (r < 0) {
+    on_finish->complete(r);
+    return;
+  }
+
+  // point the placeholder image ctx at the resolved pool and image
+  m_image_ctx->md_ctx.dup(io_ctx);
+  m_image_ctx->data_ctx.dup(io_ctx);
+  m_image_ctx->name = m_image_name;
+
+  uint64_t flags = 0;
+  if (!m_image_id.empty() || m_import_only) {
+    m_image_ctx->id = m_image_id;
+  } else {
+    flags |= OPEN_FLAG_OLD_FORMAT;
+  }
+
+  auto child = m_image_ctx->child;
+  if (child != nullptr) {
+    // set rados flags for reading the parent image
+    if (child->config.template get_val<bool>("rbd_balance_parent_reads")) {
+      m_image_ctx->set_read_flag(librados::OPERATION_BALANCE_READS);
+    } else if (child->config.template get_val<bool>("rbd_localize_parent_reads")) {
+      m_image_ctx->set_read_flag(librados::OPERATION_LOCALIZE_READS);
+    }
+  }
+
+  // open the source RBD image
+  Context* open_ctx = new LambdaContext([this, on_finish](int r) {
+      handle_open(r, on_finish); });
+  m_image_ctx->state->open(flags, open_ctx);
+}
+
+template <typename I>
+void NativeFormat<I>::handle_open(int r, Context* on_finish) {
+  // Completion of the image open started by open(): optionally resolve the
+  // requested snapshot name and switch the image to that snapshot.
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to open image: " << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  const bool snap_by_name = !m_snap_name.empty();
+  if (!snap_by_name && m_snap_id == CEPH_NOSNAP) {
+    // no snapshot was requested
+    on_finish->complete(0);
+    return;
+  }
+
+  if (snap_by_name) {
+    // translate the user snapshot name into its id under the image lock
+    std::shared_lock image_locker{m_image_ctx->image_lock};
+    m_snap_id = m_image_ctx->get_snap_id(cls::rbd::UserSnapshotNamespace{},
+                                         m_snap_name);
+  }
+
+  if (m_snap_id == CEPH_NOSNAP) {
+    // unknown snapshot: close the image again, then report -ENOENT
+    lderr(cct) << "failed to locate snapshot " << m_snap_name << dendl;
+    Context* close_ctx = new LambdaContext([on_finish](int) {
+        on_finish->complete(-ENOENT); });
+    m_image_ctx->state->close(close_ctx);
+    return;
+  }
+
+  Context* snap_set_ctx = new LambdaContext([this, on_finish](int r) {
+      handle_snap_set(r, on_finish); });
+  m_image_ctx->state->snap_set(m_snap_id, snap_set_ctx);
+}
+
+template <typename I>
+void NativeFormat<I>::handle_snap_set(int r, Context* on_finish) {
+  // Completion of the snap_set issued by handle_open().
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    on_finish->complete(0);
+    return;
+  }
+
+  // close the image first, then report the original snap_set error
+  lderr(cct) << "failed to set snapshot " << m_snap_id << ": "
+             << cpp_strerror(r) << dendl;
+  Context* close_ctx = new LambdaContext([r, on_finish](int) {
+      on_finish->complete(r); });
+  m_image_ctx->state->close(close_ctx);
+}
+
+template <typename I>
+void NativeFormat<I>::close(Context* on_finish) {
+  ldout(m_image_ctx->cct, 10) << dendl;
+
+  // nothing to release here: the native librbd::image::CloseRequest
+  // handles all cleanup
+  on_finish->complete(0);
+}
+
+template <typename I>
+void NativeFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) {
+  // Copy the image's snapshot table into *snap_infos and complete
+  // on_finish with 0.
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  {
+    // scoped guard (same idiom as handle_open): the shared lock is dropped
+    // even if the map copy throws, and always before on_finish runs
+    std::shared_lock image_locker{m_image_ctx->image_lock};
+    *snap_infos = m_image_ctx->snap_info;
+  }
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void NativeFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size,
+                                     Context* on_finish) {
+  // Store the image size for snap_id in *size and complete on_finish
+  // with 0.
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  {
+    // scoped guard (same idiom as handle_open): the shared lock is always
+    // released before on_finish runs
+    std::shared_lock image_locker{m_image_ctx->image_lock};
+    *size = m_image_ctx->get_image_size(snap_id);
+  }
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void NativeFormat<I>::list_snaps(io::Extents&& image_extents,
+                                 io::SnapIds&& snap_ids, int list_snaps_flags,
+                                 io::SnapshotDelta* snapshot_delta,
+                                 const ZTracer::Trace &parent_trace,
+                                 Context* on_finish) {
+  // Re-issue the request as a list-snaps image dispatch spec at the
+  // migration dispatch layer; on_finish fires through the AioCompletion.
+  // parent_trace is not forwarded (an empty trace is passed instead).
+  ldout(m_image_ctx->cct, 20) << "image_extents=" << image_extents << dendl;
+
+  auto completion = io::AioCompletion::create_and_start(
+    on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_GENERIC);
+  auto dispatch_spec = io::ImageDispatchSpec::create_list_snaps(
+    *m_image_ctx, io::IMAGE_DISPATCH_LAYER_MIGRATION, completion,
+    std::move(image_extents), std::move(snap_ids), list_snaps_flags,
+    snapshot_delta, {});
+  dispatch_spec->send();
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::NativeFormat<librbd::ImageCtx>;
diff --git a/src/librbd/migration/NativeFormat.h b/src/librbd/migration/NativeFormat.h
new file mode 100644
index 000000000..e58c04121
--- /dev/null
+++ b/src/librbd/migration/NativeFormat.h
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H
+#define CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H
+
+#include "include/int_types.h"
+#include "librbd/Types.h"
+#include "librbd/migration/FormatInterface.h"
+#include "json_spirit/json_spirit.h"
+#include <memory>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename ImageCtxT>
+class NativeFormat : public FormatInterface { // FormatInterface implementation for the "native" source-spec type
+public:
+ static std::string build_source_spec(int64_t pool_id, // returns a JSON source-spec string with type "native"
+ const std::string& pool_namespace,
+ const std::string& image_name,
+ const std::string& image_id); // image id key is omitted from the spec when this is empty
+
+ static NativeFormat* create(ImageCtxT* image_ctx, // heap-allocating factory; caller owns the returned object
+ const json_spirit::mObject& json_object,
+ bool import_only) {
+ return new NativeFormat(image_ctx, json_object, import_only);
+ }
+
+ NativeFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+ bool import_only);
+ NativeFormat(const NativeFormat&) = delete; // non-copyable
+ NativeFormat& operator=(const NativeFormat&) = delete;
+
+ void open(Context* on_finish) override; // validates the source-spec, then opens the image asynchronously
+ void close(Context* on_finish) override; // completes on_finish with 0 without further work
+
+ void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override; // copies the image ctx snap_info map
+ void get_image_size(uint64_t snap_id, uint64_t* size,
+ Context* on_finish) override;
+
+ bool read(io::AioCompletion* aio_comp, uint64_t snap_id,
+ io::Extents&& image_extents, io::ReadResult&& read_result,
+ int op_flags, int read_flags,
+ const ZTracer::Trace &parent_trace) override {
+ return false; // NOTE(review): presumably "not handled here" so the read proceeds down the normal IO path -- confirm against FormatInterface
+ }
+
+ void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids,
+ int list_snaps_flags, io::SnapshotDelta* snapshot_delta,
+ const ZTracer::Trace &parent_trace,
+ Context* on_finish) override;
+
+private:
+ ImageCtxT* m_image_ctx; // not owned
+ json_spirit::mObject m_json_object; // private copy of the source-spec object
+ bool m_import_only; // when true, open() rejects a spec without a snapshot
+
+ int64_t m_pool_id = -1; // -1 == not (yet) resolved
+ std::string m_pool_namespace;
+ std::string m_image_name;
+ std::string m_image_id;
+ std::string m_snap_name;
+ uint64_t m_snap_id = CEPH_NOSNAP; // CEPH_NOSNAP == no snapshot id given/resolved
+
+ void handle_open(int r, Context* on_finish); // continuation of open()
+ void handle_snap_set(int r, Context* on_finish); // continuation of handle_open()
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::NativeFormat<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H
diff --git a/src/librbd/migration/OpenSourceImageRequest.cc b/src/librbd/migration/OpenSourceImageRequest.cc
new file mode 100644
index 000000000..8abdedf33
--- /dev/null
+++ b/src/librbd/migration/OpenSourceImageRequest.cc
@@ -0,0 +1,249 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/OpenSourceImageRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Utils.h"
+#include "librbd/io/ImageDispatcher.h"
+#include "librbd/migration/ImageDispatch.h"
+#include "librbd/migration/NativeFormat.h"
+#include "librbd/migration/SourceSpecBuilder.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::OpenSourceImageRequest: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace migration {
+
+template <typename I>
+OpenSourceImageRequest<I>::OpenSourceImageRequest(
+    librados::IoCtx& io_ctx, I* dst_image_ctx, uint64_t src_snap_id,
+    const MigrationInfo &migration_info, I** src_image_ctx, Context* on_finish)
+  // io_ctx is kept by reference; migration_info is copied
+  : m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
+    m_io_ctx(io_ctx),
+    m_dst_image_ctx(dst_image_ctx),
+    m_src_snap_id(src_snap_id),
+    m_migration_info(migration_info),
+    m_src_image_ctx(src_image_ctx),
+    m_on_finish(on_finish) {
+  ldout(m_cct, 10) << dendl;
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::send() {
+  // entry point of the request: the first step is opening the source
+  open_source();
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::open_source() {
+  // Create the placeholder source image ctx, parse the migration
+  // source-spec into a format handler and ask that handler to open the
+  // source. Parse/build failures close the placeholder ctx and finish the
+  // request immediately.
+  ldout(m_cct, 10) << dendl;
+
+  // note that all source image ctx properties are placeholders
+  *m_src_image_ctx = I::create("", "", CEPH_NOSNAP, m_io_ctx, true);
+  auto src_image_ctx = *m_src_image_ctx;
+  src_image_ctx->child = m_dst_image_ctx;
+
+  // use default layout values (can be overridden by source layers later)
+  src_image_ctx->order = 22;
+  src_image_ctx->layout = file_layout_t();
+  src_image_ctx->layout.stripe_count = 1;
+  src_image_ctx->layout.stripe_unit = 1ULL << src_image_ctx->order;
+  src_image_ctx->layout.object_size = 1ULL << src_image_ctx->order;
+  src_image_ctx->layout.pool_id = -1;
+
+  bool import_only = true;
+  auto source_spec = m_migration_info.source_spec;
+  if (source_spec.empty()) {
+    // implies legacy migration from RBD image in same cluster
+    source_spec = NativeFormat<I>::build_source_spec(
+      m_migration_info.pool_id, m_migration_info.pool_namespace,
+      m_migration_info.image_name, m_migration_info.image_id);
+    import_only = false;
+  }
+
+  ldout(m_cct, 15) << "source_spec=" << source_spec << ", "
+                   << "source_snap_id=" << m_src_snap_id << ", "
+                   << "import_only=" << import_only << dendl;
+
+  SourceSpecBuilder<I> source_spec_builder{src_image_ctx};
+  json_spirit::mObject source_spec_object;
+  int r = source_spec_builder.parse_source_spec(source_spec,
+                                                &source_spec_object);
+  if (r < 0) {
+    // message now carries the separating space before the errno text,
+    // matching the other error messages in this file
+    lderr(m_cct) << "failed to parse migration source-spec: "
+                 << cpp_strerror(r) << dendl;
+    (*m_src_image_ctx)->state->close();
+    finish(r);
+    return;
+  }
+
+  r = source_spec_builder.build_format(source_spec_object, import_only,
+                                       &m_format);
+  if (r < 0) {
+    lderr(m_cct) << "failed to build migration format handler: "
+                 << cpp_strerror(r) << dendl;
+    (*m_src_image_ctx)->state->close();
+    finish(r);
+    return;
+  }
+
+  auto ctx = util::create_context_callback<
+    OpenSourceImageRequest<I>,
+    &OpenSourceImageRequest<I>::handle_open_source>(this);
+  m_format->open(ctx);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::handle_open_source(int r) {
+  // Completion of m_format->open(): proceed to GET_IMAGE_SIZE on success.
+  ldout(m_cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    get_image_size();
+    return;
+  }
+
+  lderr(m_cct) << "failed to open migration source: " << cpp_strerror(r)
+               << dendl;
+  finish(r);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::get_image_size() {
+  // Ask the format handler for the size at CEPH_NOSNAP; the answer lands
+  // in m_image_size and is processed by handle_get_image_size().
+  ldout(m_cct, 10) << dendl;
+
+  using Request = OpenSourceImageRequest<I>;
+  auto on_size = util::create_context_callback<
+    Request, &Request::handle_get_image_size>(this);
+  m_format->get_image_size(CEPH_NOSNAP, &m_image_size, on_size);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::handle_get_image_size(int r) {
+  // Completion of GET_IMAGE_SIZE: record the size in the source image ctx
+  // and continue with GET_SNAPSHOTS, or close the image on error.
+  ldout(m_cct, 10) << "r=" << r << ", "
+                   << "image_size=" << m_image_size << dendl;
+
+  if (r >= 0) {
+    // size is written under the exclusive image lock
+    auto src_image_ctx = *m_src_image_ctx;
+    src_image_ctx->image_lock.lock();
+    src_image_ctx->size = m_image_size;
+    src_image_ctx->image_lock.unlock();
+
+    get_snapshots();
+    return;
+  }
+
+  lderr(m_cct) << "failed to retrieve image size: " << cpp_strerror(r)
+               << dendl;
+  close_image(r);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::get_snapshots() {
+  // Ask the format handler for the snapshot table; it is stored in
+  // m_snap_infos and processed by handle_get_snapshots().
+  ldout(m_cct, 10) << dendl;
+
+  using Request = OpenSourceImageRequest<I>;
+  auto on_snapshots = util::create_context_callback<
+    Request, &Request::handle_get_snapshots>(this);
+  m_format->get_snapshots(&m_snap_infos, on_snapshots);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::handle_get_snapshots(int r) {
+  // Completion of GET_SNAPSHOTS: mirror the retrieved snapshot metadata
+  // into the source image ctx, rebuild its snap context and select the
+  // requested read snapshot. Every failure goes through close_image() so
+  // the opened source image is closed before the request finishes.
+  ldout(m_cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(m_cct) << "failed to retrieve snapshots: " << cpp_strerror(r)
+                 << dendl;
+    close_image(r);
+    return;
+  }
+
+  // copy snapshot metadata to image ctx
+  auto src_image_ctx = *m_src_image_ctx;
+  src_image_ctx->image_lock.lock();
+
+  src_image_ctx->snaps.clear();
+  src_image_ctx->snap_info.clear();
+  src_image_ctx->snap_ids.clear();
+
+  // walk the id-ordered map backwards so snapc.snaps is in descending
+  // snap id order and snaps[0] (used as seq below) is the highest id
+  ::SnapContext snapc;
+  for (auto it = m_snap_infos.rbegin(); it != m_snap_infos.rend(); ++it) {
+    auto& [snap_id, snap_info] = *it;
+    snapc.snaps.push_back(snap_id);
+
+    ldout(m_cct, 10) << "adding snap: ns=" << snap_info.snap_namespace << ", "
+                     << "name=" << snap_info.name << ", "
+                     << "id=" << snap_id << dendl;
+    src_image_ctx->add_snap(
+      snap_info.snap_namespace, snap_info.name, snap_id,
+      snap_info.size, snap_info.parent, snap_info.protection_status,
+      snap_info.flags, snap_info.timestamp);
+  }
+  if (!snapc.snaps.empty()) {
+    snapc.seq = snapc.snaps[0];
+  }
+  src_image_ctx->snapc = snapc;
+
+  ldout(m_cct, 15) << "read snap id: " << m_src_snap_id << ", "
+                   << "write snapc={"
+                   << "seq=" << snapc.seq << ", "
+                   << "snaps=" << snapc.snaps << "}" << dendl;
+
+  // ensure data_ctx and data_io_context are pointing to correct snapshot
+  if (m_src_snap_id != CEPH_NOSNAP) {
+    r = src_image_ctx->snap_set(m_src_snap_id);
+    if (r < 0) {
+      src_image_ctx->image_lock.unlock();
+
+      lderr(m_cct) << "error setting source image snap id: "
+                   << cpp_strerror(r) << dendl;
+      // the source image is open at this point: close it instead of just
+      // dropping the pointer in finish()
+      close_image(r);
+      return;
+    }
+  }
+
+  src_image_ctx->image_lock.unlock();
+
+  finish(0);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::close_image(int r) {
+  // Close the source image, then finish the request with the saved error
+  // code r (the result of the close itself is ignored).
+  ldout(m_cct, 10) << "r=" << r << dendl;
+
+  Context* on_closed = new LambdaContext([this, r](int) { finish(r); });
+  (*m_src_image_ctx)->state->close(on_closed);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::register_image_dispatch() {
+  ldout(m_cct, 10) << dendl;
+
+  // intercept any IO requests to the source image; ownership of the
+  // format handler moves into the dispatch layer
+  auto src_image_ctx = *m_src_image_ctx;
+  auto dispatch = ImageDispatch<I>::create(src_image_ctx,
+                                           std::move(m_format));
+  src_image_ctx->io_image_dispatcher->register_dispatch(dispatch);
+}
+
+template <typename I>
+void OpenSourceImageRequest<I>::finish(int r) {
+  // Final step: on success install the migration dispatch layer, on
+  // failure clear the caller's source image pointer; then complete the
+  // caller's context and destroy this request.
+  ldout(m_cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    register_image_dispatch();
+  } else {
+    *m_src_image_ctx = nullptr;
+  }
+
+  m_on_finish->complete(r);
+  delete this;
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::OpenSourceImageRequest<librbd::ImageCtx>;
diff --git a/src/librbd/migration/OpenSourceImageRequest.h b/src/librbd/migration/OpenSourceImageRequest.h
new file mode 100644
index 000000000..f0dab3ad9
--- /dev/null
+++ b/src/librbd/migration/OpenSourceImageRequest.h
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H
+#define CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H
+
+#include "include/rados/librados_fwd.hpp"
+#include "librbd/Types.h"
+#include <map>
+#include <memory>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace migration {
+
+struct FormatInterface;
+
+template <typename ImageCtxT>
+class OpenSourceImageRequest { // self-deleting async request that opens a migration source image
+public:
+ static OpenSourceImageRequest* create(librados::IoCtx& io_ctx, // heap-allocating factory; the request deletes itself when it finishes
+ ImageCtxT* destination_image_ctx,
+ uint64_t src_snap_id,
+ const MigrationInfo &migration_info,
+ ImageCtxT** source_image_ctx, // out: created source image ctx, reset to nullptr on failure
+ Context* on_finish) {
+ return new OpenSourceImageRequest(io_ctx, destination_image_ctx,
+ src_snap_id, migration_info,
+ source_image_ctx, on_finish);
+ }
+
+ OpenSourceImageRequest(librados::IoCtx& io_ctx,
+ ImageCtxT* destination_image_ctx,
+ uint64_t src_snap_id,
+ const MigrationInfo &migration_info,
+ ImageCtxT** source_image_ctx,
+ Context* on_finish);
+
+ void send(); // starts the state machine below
+
+private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ *    |
+ *    v
+ * OPEN_SOURCE
+ *    |
+ *    v
+ * GET_IMAGE_SIZE * * * * * * *
+ *    |                       *
+ *    v                       v
+ * GET_SNAPSHOTS * * * * > CLOSE_IMAGE
+ *    |                       |
+ *    v                       |
+ * <finish> <------------------/
+ *
+ * @endverbatim
+ */
+
+ typedef std::map<uint64_t, SnapInfo> SnapInfos; // snap id -> snapshot metadata
+
+ CephContext* m_cct;
+ librados::IoCtx& m_io_ctx; // held by reference: the caller's IoCtx must outlive this request
+ ImageCtxT* m_dst_image_ctx; // becomes the "child" of the source image ctx
+ uint64_t m_src_snap_id; // snapshot selected for reads; CEPH_NOSNAP leaves the selection untouched
+ MigrationInfo m_migration_info; // copied at construction
+ ImageCtxT** m_src_image_ctx;
+ Context* m_on_finish;
+
+ std::unique_ptr<FormatInterface> m_format; // moved into the image dispatch layer on success
+
+ uint64_t m_image_size = 0;
+ SnapInfos m_snap_infos;
+
+ void open_source();
+ void handle_open_source(int r);
+
+ void get_image_size();
+ void handle_get_image_size(int r);
+
+ void get_snapshots();
+ void handle_get_snapshots(int r);
+
+ void close_image(int r); // closes the source image, then finish(r)
+
+ void register_image_dispatch();
+
+ void finish(int r); // completes m_on_finish and deletes this
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::OpenSourceImageRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H
diff --git a/src/librbd/migration/QCOW.h b/src/librbd/migration/QCOW.h
new file mode 100644
index 000000000..23401e515
--- /dev/null
+++ b/src/librbd/migration/QCOW.h
@@ -0,0 +1,466 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/* Based on QEMU block/qcow.cc and block/qcow2.h, which has this license: */
+
+/*
+ * Block driver for the QCOW version 2 format
+ *
+ * Copyright (c) 2004-2006 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef CEPH_LIBRBD_MIGRATION_QCOW2_H
+#define CEPH_LIBRBD_MIGRATION_QCOW2_H
+
+#include "include/ceph_assert.h"
+#include "include/int_types.h"
+#include "librbd/migration/QCOW.h"
+
+#define QCOW_MAGIC (('Q' << 24) | ('F' << 16) | ('I' << 8) | 0xfb) /* 'Q' 'F' 'I' 0xfb with 'Q' in the most significant byte */
+
+#define QCOW_CRYPT_NONE 0 /* QCOW_CRYPT_*: presumably the values of the header crypt_method field -- confirm */
+#define QCOW_CRYPT_AES 1
+#define QCOW_CRYPT_LUKS 2
+
+#define QCOW_MAX_CRYPT_CLUSTERS 32
+#define QCOW_MAX_SNAPSHOTS 65536 /* also scales QCOW_MAX_SNAPSHOTS_SIZE below */
+
+/* Field widths in qcow2 mean normal cluster offsets cannot reach
+ * 64PB; depending on cluster size, compressed clusters can have a
+ * smaller limit (64PB for up to 16k clusters, then ramps down to
+ * 512TB for 2M clusters). */
+#define QCOW_MAX_CLUSTER_OFFSET ((1ULL << 56) - 1)
+
+/* 8 MB refcount table is enough for 2 PB images at 64k cluster size
+ * (128 GB for 512 byte clusters, 2 EB for 2 MB clusters) */
+#define QCOW_MAX_REFTABLE_SIZE (1ULL << 23)
+
+/* 32 MB L1 table is enough for 2 PB images at 64k cluster size
+ * (128 GB for 512 byte clusters, 2 EB for 2 MB clusters) */
+#define QCOW_MAX_L1_SIZE (1ULL << 25)
+
+/* Allow for an average of 1k per snapshot table entry, should be plenty of
+ * space for snapshot names and IDs */
+#define QCOW_MAX_SNAPSHOTS_SIZE (1024 * QCOW_MAX_SNAPSHOTS)
+
+/* Maximum amount of extra data per snapshot table entry to accept */
+#define QCOW_MAX_SNAPSHOT_EXTRA_DATA 1024
+
+/* Bitmap header extension constraints */
+#define QCOW2_MAX_BITMAPS 65535
+#define QCOW2_MAX_BITMAP_DIRECTORY_SIZE (1024 * QCOW2_MAX_BITMAPS)
+
+/* Maximum of parallel sub-request per guest request */
+#define QCOW2_MAX_WORKERS 8
+
+/* indicate that the refcount of the referenced cluster is exactly one. */
+#define QCOW_OFLAG_COPIED (1ULL << 63)
+/* indicate that the cluster is compressed (they never have the copied flag) */
+#define QCOW_OFLAG_COMPRESSED (1ULL << 62)
+/* The cluster reads as all zeros */
+#define QCOW_OFLAG_ZERO (1ULL << 0)
+
+#define QCOW_EXTL2_SUBCLUSTERS_PER_CLUSTER 32 /* matches the [0..31] subcluster index range of the macros below */
+
+/* The subcluster X [0..31] is allocated */
+#define QCOW_OFLAG_SUB_ALLOC(X) (1ULL << (X))
+/* The subcluster X [0..31] reads as zeroes */
+#define QCOW_OFLAG_SUB_ZERO(X) (QCOW_OFLAG_SUB_ALLOC(X) << 32)
+/* Subclusters [X, Y) (0 <= X <= Y <= 32) are allocated */
+#define QCOW_OFLAG_SUB_ALLOC_RANGE(X, Y) \
+ (QCOW_OFLAG_SUB_ALLOC(Y) - QCOW_OFLAG_SUB_ALLOC(X))
+/* Subclusters [X, Y) (0 <= X <= Y <= 32) read as zeroes */
+#define QCOW_OFLAG_SUB_ZERO_RANGE(X, Y) \
+ (QCOW_OFLAG_SUB_ALLOC_RANGE(X, Y) << 32)
+/* L2 entry bitmap with all allocation bits set */
+#define QCOW_L2_BITMAP_ALL_ALLOC (QCOW_OFLAG_SUB_ALLOC_RANGE(0, 32)) /* low 32 bits set */
+/* L2 entry bitmap with all "read as zeroes" bits set */
+#define QCOW_L2_BITMAP_ALL_ZEROES (QCOW_OFLAG_SUB_ZERO_RANGE(0, 32)) /* high 32 bits set */
+
+/* Size of normal and extended L2 entries */
+#define QCOW_L2E_SIZE_NORMAL (sizeof(uint64_t))
+#define QCOW_L2E_SIZE_EXTENDED (sizeof(uint64_t) * 2)
+
+/* Size of L1 table entries */
+#define QCOW_L1E_SIZE (sizeof(uint64_t))
+
+/* Size of reftable entries */
+#define QCOW_REFTABLE_ENTRY_SIZE (sizeof(uint64_t))
+
+#define QCOW_MIN_CLUSTER_BITS 9 /* 1 << 9 == 512 byte clusters */
+#define QCOW_MAX_CLUSTER_BITS 21 /* 1 << 21 == 2 MB clusters */
+
+/* Defined in the qcow2 spec (compressed cluster descriptor) */
+#define QCOW2_COMPRESSED_SECTOR_SIZE 512U
+#define QCOW2_COMPRESSED_SECTOR_MASK (~(QCOW2_COMPRESSED_SECTOR_SIZE - 1ULL))
+
+#define QCOW_L2_CACHE_SIZE 16
+
+/* Must be at least 2 to cover COW */
+#define QCOW_MIN_L2_CACHE_SIZE 2 /* cache entries */
+
+/* Must be at least 4 to cover all cases of refcount table growth */
+#define QCOW_MIN_REFCOUNT_CACHE_SIZE 4 /* clusters */
+
+#define QCOW_DEFAULT_L2_CACHE_MAX_SIZE (1ULL << 25) /* 32 MB */
+#define QCOW_DEFAULT_CACHE_CLEAN_INTERVAL 600 /* seconds */
+
+#define QCOW_DEFAULT_CLUSTER_SIZE 65536 /* 64 KB == 1 << 16 */
+
+#define QCOW2_OPT_DATA_FILE "data-file" /* QCOW2_OPT_*: option-name string constants */
+#define QCOW2_OPT_LAZY_REFCOUNTS "lazy-refcounts"
+#define QCOW2_OPT_DISCARD_REQUEST "pass-discard-request"
+#define QCOW2_OPT_DISCARD_SNAPSHOT "pass-discard-snapshot"
+#define QCOW2_OPT_DISCARD_OTHER "pass-discard-other"
+#define QCOW2_OPT_OVERLAP "overlap-check"
+#define QCOW2_OPT_OVERLAP_TEMPLATE "overlap-check.template"
+#define QCOW2_OPT_OVERLAP_MAIN_HEADER "overlap-check.main-header"
+#define QCOW2_OPT_OVERLAP_ACTIVE_L1 "overlap-check.active-l1"
+#define QCOW2_OPT_OVERLAP_ACTIVE_L2 "overlap-check.active-l2"
+#define QCOW2_OPT_OVERLAP_REFCOUNT_TABLE "overlap-check.refcount-table"
+#define QCOW2_OPT_OVERLAP_REFCOUNT_BLOCK "overlap-check.refcount-block"
+#define QCOW2_OPT_OVERLAP_SNAPSHOT_TABLE "overlap-check.snapshot-table"
+#define QCOW2_OPT_OVERLAP_INACTIVE_L1 "overlap-check.inactive-l1"
+#define QCOW2_OPT_OVERLAP_INACTIVE_L2 "overlap-check.inactive-l2"
+#define QCOW2_OPT_OVERLAP_BITMAP_DIRECTORY "overlap-check.bitmap-directory"
+#define QCOW2_OPT_CACHE_SIZE "cache-size"
+#define QCOW2_OPT_L2_CACHE_SIZE "l2-cache-size"
+#define QCOW2_OPT_L2_CACHE_ENTRY_SIZE "l2-cache-entry-size"
+#define QCOW2_OPT_REFCOUNT_CACHE_SIZE "refcount-cache-size"
+#define QCOW2_OPT_CACHE_CLEAN_INTERVAL "cache-clean-interval"
+
+typedef struct QCowHeaderProbe { /* the two leading fields shared by QCowHeaderV1 and QCowHeader; 8 bytes packed */
+ uint32_t magic;
+ uint32_t version;
+} __attribute__((__packed__)) QCowHeaderProbe;
+
+typedef struct QCowHeaderV1 /* version 1 header layout; 48 bytes packed */
+{
+ uint32_t magic;
+ uint32_t version;
+ uint64_t backing_file_offset;
+ uint32_t backing_file_size;
+ uint32_t mtime;
+ uint64_t size; /* in bytes */
+ uint8_t cluster_bits; /* single byte here, 32 bits wide in QCowHeader */
+ uint8_t l2_bits;
+ uint16_t padding;
+ uint32_t crypt_method;
+ uint64_t l1_table_offset;
+} __attribute__((__packed__)) QCowHeaderV1;
+
+typedef struct QCowHeader { /* header layout for versions after v1; 112 bytes packed */
+ uint32_t magic;
+ uint32_t version;
+ uint64_t backing_file_offset;
+ uint32_t backing_file_size;
+ uint32_t cluster_bits;
+ uint64_t size; /* in bytes */
+ uint32_t crypt_method;
+ uint32_t l1_size; /* XXX: save number of clusters instead ? */
+ uint64_t l1_table_offset;
+ uint64_t refcount_table_offset;
+ uint32_t refcount_table_clusters;
+ uint32_t nb_snapshots;
+ uint64_t snapshots_offset; /* last field before the version >= 3 part; 72 bytes up to here */
+
+ /* The following fields are only valid for version >= 3 */
+ uint64_t incompatible_features;
+ uint64_t compatible_features;
+ uint64_t autoclear_features;
+
+ uint32_t refcount_order;
+ uint32_t header_length;
+
+ /* Additional fields */
+ uint8_t compression_type;
+
+ /* header must be a multiple of 8 */
+ uint8_t padding[7];
+} __attribute__((__packed__)) QCowHeader;
+
+typedef struct QCowSnapshotHeader { /* fixed part of a snapshot table entry; 40 bytes packed */
+ /* header is 8 byte aligned */
+ uint64_t l1_table_offset;
+
+ uint32_t l1_size;
+ uint16_t id_str_size; /* length of the id_str that follows the extra data */
+ uint16_t name_size; /* length of the name that follows the id_str */
+
+ uint32_t date_sec;
+ uint32_t date_nsec;
+
+ uint64_t vm_clock_nsec;
+
+ uint32_t vm_state_size;
+ uint32_t extra_data_size; /* for extension */
+ /* extra data follows */
+ /* id_str follows */
+ /* name follows */
+} __attribute__((__packed__)) QCowSnapshotHeader;
+
+typedef struct QCowSnapshotExtraData { /* known part of a snapshot entry's extra data; 24 bytes packed */
+ uint64_t vm_state_size_large;
+ uint64_t disk_size;
+ uint64_t icount;
+} __attribute__((__packed__)) QCowSnapshotExtraData;
+
+
+typedef struct QCowSnapshot { /* not packed: carries pointers, unlike the header structs above */
+ uint64_t l1_table_offset;
+ uint32_t l1_size;
+ char *id_str; /* NOTE(review): ownership of the pointed-to strings is not visible here */
+ char *name;
+ uint64_t disk_size;
+ uint64_t vm_state_size;
+ uint32_t date_sec;
+ uint32_t date_nsec;
+ uint64_t vm_clock_nsec;
+ /* icount value for the moment when snapshot was taken */
+ uint64_t icount;
+ /* Size of all extra data, including QCowSnapshotExtraData if available */
+ uint32_t extra_data_size;
+ /* Data beyond QCowSnapshotExtraData, if any */
+ void *unknown_extra_data;
+} QCowSnapshot;
+
+typedef struct Qcow2CryptoHeaderExtension { /* offset/length pair; 16 bytes packed */
+ uint64_t offset;
+ uint64_t length;
+} __attribute__((__packed__)) Qcow2CryptoHeaderExtension;
+
+typedef struct Qcow2UnknownHeaderExtension { /* not declared packed */
+ uint32_t magic;
+ uint32_t len; /* presumably the byte count of data[] -- confirm */
+ uint8_t data[]; /* flexible array member: payload follows the struct */
+} Qcow2UnknownHeaderExtension;
+
+enum { /* feature classes; names parallel the three *_features fields of QCowHeader */
+ QCOW2_FEAT_TYPE_INCOMPATIBLE = 0,
+ QCOW2_FEAT_TYPE_COMPATIBLE = 1,
+ QCOW2_FEAT_TYPE_AUTOCLEAR = 2,
+};
+
+/* Incompatible feature bits */
+enum { /* *_BITNR are bit positions, the unsuffixed names are the matching single-bit masks */
+ QCOW2_INCOMPAT_DIRTY_BITNR = 0,
+ QCOW2_INCOMPAT_CORRUPT_BITNR = 1,
+ QCOW2_INCOMPAT_DATA_FILE_BITNR = 2,
+ QCOW2_INCOMPAT_COMPRESSION_BITNR = 3,
+ QCOW2_INCOMPAT_EXTL2_BITNR = 4,
+ QCOW2_INCOMPAT_DIRTY = 1 << QCOW2_INCOMPAT_DIRTY_BITNR,
+ QCOW2_INCOMPAT_CORRUPT = 1 << QCOW2_INCOMPAT_CORRUPT_BITNR,
+ QCOW2_INCOMPAT_DATA_FILE = 1 << QCOW2_INCOMPAT_DATA_FILE_BITNR,
+ QCOW2_INCOMPAT_COMPRESSION = 1 << QCOW2_INCOMPAT_COMPRESSION_BITNR,
+ QCOW2_INCOMPAT_EXTL2 = 1 << QCOW2_INCOMPAT_EXTL2_BITNR,
+
+ QCOW2_INCOMPAT_MASK = QCOW2_INCOMPAT_DIRTY /* union of all bits defined above (0x1f) */
+ | QCOW2_INCOMPAT_CORRUPT
+ | QCOW2_INCOMPAT_DATA_FILE
+ | QCOW2_INCOMPAT_COMPRESSION
+ | QCOW2_INCOMPAT_EXTL2,
+};
+
+/*
+ * Compatible feature bits (*_BITNR is the bit position, the unsuffixed name
+ * is the single-bit mask). QCOW2_COMPAT_FEAT_MASK is the union of all bits
+ * declared here.
+ */
+enum {
+ QCOW2_COMPAT_LAZY_REFCOUNTS_BITNR = 0,
+ QCOW2_COMPAT_LAZY_REFCOUNTS = 1 << QCOW2_COMPAT_LAZY_REFCOUNTS_BITNR,
+
+ QCOW2_COMPAT_FEAT_MASK = QCOW2_COMPAT_LAZY_REFCOUNTS,
+};
+
+/*
+ * Autoclear feature bits (*_BITNR is the bit position, the unsuffixed name
+ * is the single-bit mask). QCOW2_AUTOCLEAR_MASK is the union of all bits
+ * declared here.
+ */
+enum {
+ QCOW2_AUTOCLEAR_BITMAPS_BITNR = 0,
+ QCOW2_AUTOCLEAR_DATA_FILE_RAW_BITNR = 1,
+ QCOW2_AUTOCLEAR_BITMAPS = 1 << QCOW2_AUTOCLEAR_BITMAPS_BITNR,
+ QCOW2_AUTOCLEAR_DATA_FILE_RAW = 1 << QCOW2_AUTOCLEAR_DATA_FILE_RAW_BITNR,
+
+ QCOW2_AUTOCLEAR_MASK = QCOW2_AUTOCLEAR_BITMAPS
+ | QCOW2_AUTOCLEAR_DATA_FILE_RAW,
+};
+
+/*
+ * Discard categories. Values are consecutive starting at 0, with
+ * QCOW2_DISCARD_MAX acting as the element count (e.g. for sizing arrays
+ * indexed by this enum).
+ */
+enum qcow2_discard_type {
+ QCOW2_DISCARD_NEVER = 0,
+ QCOW2_DISCARD_ALWAYS,
+ QCOW2_DISCARD_REQUEST,
+ QCOW2_DISCARD_SNAPSHOT,
+ QCOW2_DISCARD_OTHER,
+ QCOW2_DISCARD_MAX
+};
+
+/*
+ * Packed 48-byte record (1 + 1 + 46) naming a single feature bit.
+ * NOTE(review): 'type' presumably holds one of the QCOW2_FEAT_TYPE_* values
+ * and 'name' may not be NUL-terminated when all 46 bytes are used -- confirm.
+ */
+typedef struct Qcow2Feature {
+ uint8_t type;
+ uint8_t bit;
+ char name[46];
+} __attribute__((__packed__)) Qcow2Feature;
+
+/* A region to discard, described by its starting offset and byte count. */
+typedef struct Qcow2DiscardRegion {
+ uint64_t offset;
+ uint64_t bytes;
+} Qcow2DiscardRegion;
+
+/*
+ * Function types for reading / writing the refcount stored at 'index' of an
+ * opaque refcount array.
+ * NOTE(review): presumably one implementation exists per refcount width --
+ * confirm where these are instantiated.
+ */
+typedef uint64_t Qcow2GetRefcountFunc(const void *refcount_array,
+ uint64_t index);
+typedef void Qcow2SetRefcountFunc(void *refcount_array,
+ uint64_t index, uint64_t value);
+
+/*
+ * Packed bitmap header extension: number of bitmaps plus the size and
+ * offset of the bitmap directory. 'reserved32' keeps the 64-bit fields
+ * naturally aligned within the packed layout.
+ */
+typedef struct Qcow2BitmapHeaderExt {
+ uint32_t nb_bitmaps;
+ uint32_t reserved32;
+ uint64_t bitmap_directory_size;
+ uint64_t bitmap_directory_offset;
+} __attribute__((__packed__)) Qcow2BitmapHeaderExt;
+
+/*
+ * Alias for QCOW_L2_CACHE_SIZE. The replacement text deliberately has no
+ * trailing ';' so the macro can be used inside expressions and argument
+ * lists.
+ */
+#define QCOW_RC_CACHE_SIZE QCOW_L2_CACHE_SIZE
+
+/** A copy-on-write region, expressed relative to the first touched cluster. */
+typedef struct Qcow2COWRegion {
+ /**
+ * Offset of the COW region in bytes from the start of the first cluster
+ * touched by the request.
+ */
+ unsigned offset;
+
+ /** Number of bytes to copy */
+ unsigned nb_bytes;
+} Qcow2COWRegion;
+
+/**
+ * Describes an in-flight (part of a) write request that writes to clusters
+ * that are not referenced in their L2 table yet.
+ *
+ * Several of these records can belong to one write request; they are chained
+ * through the 'next' pointer.
+ */
+typedef struct QCowL2Meta
+{
+ /** Guest offset of the first newly allocated cluster */
+ uint64_t offset;
+
+ /** Host offset of the first newly allocated cluster */
+ uint64_t alloc_offset;
+
+ /** Number of newly allocated clusters */
+ int nb_clusters;
+
+ /** Do not free the old clusters */
+ bool keep_old_clusters;
+
+ /**
+ * The COW Region between the start of the first allocated cluster and the
+ * area the guest actually writes to.
+ */
+ Qcow2COWRegion cow_start;
+
+ /**
+ * The COW Region between the area the guest actually writes to and the
+ * end of the last allocated cluster.
+ */
+ Qcow2COWRegion cow_end;
+
+ /**
+ * Indicates that COW regions are already handled and do not require
+ * any more processing.
+ */
+ bool skip_cow;
+
+ /**
+ * Indicates that this is not a normal write request but a preallocation.
+ * If the image has extended L2 entries this means that no new individual
+ * subclusters will be marked as allocated in the L2 bitmap (but any
+ * existing contents of that bitmap will be kept).
+ */
+ bool prealloc;
+
+ /** Pointer to next L2Meta of the same write request */
+ struct QCowL2Meta *next;
+} QCowL2Meta;
+
+/*
+ * Classification of a cluster. Enumerators are implicitly numbered from 0
+ * in declaration order.
+ * NOTE(review): presumably derived from the flags of an L2 entry -- confirm
+ * against the code that computes it.
+ */
+typedef enum QCow2ClusterType {
+ QCOW2_CLUSTER_UNALLOCATED,
+ QCOW2_CLUSTER_ZERO_PLAIN,
+ QCOW2_CLUSTER_ZERO_ALLOC,
+ QCOW2_CLUSTER_NORMAL,
+ QCOW2_CLUSTER_COMPRESSED,
+} QCow2ClusterType;
+
+/*
+ * Metadata overlap check selectors. Each metadata structure has a bit
+ * position (*_BITNR) and a matching single-bit mask (1 << BITNR);
+ * QCOW2_OL_MAX_BITNR is the number of bit positions declared and
+ * QCOW2_OL_NONE is the empty mask.
+ */
+typedef enum QCow2MetadataOverlap {
+ QCOW2_OL_MAIN_HEADER_BITNR = 0,
+ QCOW2_OL_ACTIVE_L1_BITNR = 1,
+ QCOW2_OL_ACTIVE_L2_BITNR = 2,
+ QCOW2_OL_REFCOUNT_TABLE_BITNR = 3,
+ QCOW2_OL_REFCOUNT_BLOCK_BITNR = 4,
+ QCOW2_OL_SNAPSHOT_TABLE_BITNR = 5,
+ QCOW2_OL_INACTIVE_L1_BITNR = 6,
+ QCOW2_OL_INACTIVE_L2_BITNR = 7,
+ QCOW2_OL_BITMAP_DIRECTORY_BITNR = 8,
+
+ QCOW2_OL_MAX_BITNR = 9,
+
+ QCOW2_OL_NONE = 0,
+ QCOW2_OL_MAIN_HEADER = (1 << QCOW2_OL_MAIN_HEADER_BITNR),
+ QCOW2_OL_ACTIVE_L1 = (1 << QCOW2_OL_ACTIVE_L1_BITNR),
+ QCOW2_OL_ACTIVE_L2 = (1 << QCOW2_OL_ACTIVE_L2_BITNR),
+ QCOW2_OL_REFCOUNT_TABLE = (1 << QCOW2_OL_REFCOUNT_TABLE_BITNR),
+ QCOW2_OL_REFCOUNT_BLOCK = (1 << QCOW2_OL_REFCOUNT_BLOCK_BITNR),
+ QCOW2_OL_SNAPSHOT_TABLE = (1 << QCOW2_OL_SNAPSHOT_TABLE_BITNR),
+ QCOW2_OL_INACTIVE_L1 = (1 << QCOW2_OL_INACTIVE_L1_BITNR),
+ /* NOTE: Checking overlaps with inactive L2 tables will result in bdrv
+ * reads. */
+ QCOW2_OL_INACTIVE_L2 = (1 << QCOW2_OL_INACTIVE_L2_BITNR),
+ QCOW2_OL_BITMAP_DIRECTORY = (1 << QCOW2_OL_BITMAP_DIRECTORY_BITNR),
+} QCow2MetadataOverlap;
+
+/*
+ * Composite overlap-check masks. Each level is a superset of the previous
+ * one: QCOW2_OL_CONSTANT < QCOW2_OL_CACHED < QCOW2_OL_ALL.
+ */
+
+/* Perform all overlap checks which can be done in constant time */
+#define QCOW2_OL_CONSTANT \
+ (QCOW2_OL_MAIN_HEADER | QCOW2_OL_ACTIVE_L1 | QCOW2_OL_REFCOUNT_TABLE | \
+ QCOW2_OL_SNAPSHOT_TABLE | QCOW2_OL_BITMAP_DIRECTORY)
+
+/* Perform all overlap checks which don't require disk access */
+#define QCOW2_OL_CACHED \
+ (QCOW2_OL_CONSTANT | QCOW2_OL_ACTIVE_L2 | QCOW2_OL_REFCOUNT_BLOCK | \
+ QCOW2_OL_INACTIVE_L1)
+
+/* Perform all overlap checks (adds the inactive L2 tables) */
+#define QCOW2_OL_ALL \
+ (QCOW2_OL_CACHED | QCOW2_OL_INACTIVE_L2)
+
+/*
+ * Masks selecting the offset portion of L1 / L2 table entries: bits 9..55
+ * are kept, i.e. the low 9 bits and the top byte are cleared.
+ */
+#define QCOW_L1E_OFFSET_MASK 0x00fffffffffffe00ULL
+#define QCOW_L2E_OFFSET_MASK 0x00fffffffffffe00ULL
+/* Keeps the low 62 bits of a compressed L2 entry (clears the two top bits) */
+#define QCOW_L2E_COMPRESSED_OFFSET_SIZE_MASK 0x3fffffffffffffffULL
+
+/* Clears the low 9 bits of a refcount table entry */
+#define REFT_OFFSET_MASK 0xfffffffffffffe00ULL
+
+/* All-ones 64-bit value used as an "invalid offset" sentinel */
+#define INV_OFFSET (-1ULL)
+
+/**
+ * Returns the guest offset at which the leading COW region of @m begins,
+ * i.e. the request's base offset plus the offset of cow_start.
+ */
+static inline uint64_t l2meta_cow_start(QCowL2Meta *m)
+{
+  const uint64_t base_offset = m->offset;
+  return base_offset + m->cow_start.offset;
+}
+
+/**
+ * Returns the guest offset just past the trailing COW region of @m, i.e.
+ * the request's base offset plus the offset and length of cow_end.
+ */
+static inline uint64_t l2meta_cow_end(QCowL2Meta *m)
+{
+  /* accumulate in 64 bits so the 'unsigned' fields cannot wrap early */
+  uint64_t end_offset = m->offset;
+  end_offset += m->cow_end.offset;
+  end_offset += m->cow_end.nb_bytes;
+  return end_offset;
+}
+
+/** Returns the absolute difference |r1 - r2| without unsigned wrap-around. */
+static inline uint64_t refcount_diff(uint64_t r1, uint64_t r2)
+{
+  if (r1 > r2) {
+    return r1 - r2;
+  }
+  return r2 - r1;
+}
+
+#endif // CEPH_LIBRBD_MIGRATION_QCOW2_H
diff --git a/src/librbd/migration/QCOWFormat.cc b/src/librbd/migration/QCOWFormat.cc
new file mode 100644
index 000000000..7bd4a5ef7
--- /dev/null
+++ b/src/librbd/migration/QCOWFormat.cc
@@ -0,0 +1,1542 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/QCOWFormat.h"
+#include "common/Clock.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "include/intarith.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/SnapshotInterface.h"
+#include "librbd/migration/SourceSpecBuilder.h"
+#include "librbd/migration/StreamInterface.h"
+#include "librbd/migration/Utils.h"
+#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
+#include <deque>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#define dout_subsys ceph_subsys_rbd
+
+namespace librbd {
+namespace migration {
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat: " \
+ << __func__ << ": "
+
+namespace qcow_format {
+
+// Describes the part of one cluster that is covered by an image request.
+struct ClusterExtent {
+  uint64_t cluster_offset;        // offset of the cluster as resolved via L2
+  uint64_t cluster_length;        // number of bytes requested in the cluster
+  uint64_t intra_cluster_offset;  // starting byte within the cluster
+  uint64_t image_offset;          // matching offset within the image
+  uint64_t buffer_offset;         // matching offset within the result buffer
+
+  ClusterExtent(uint64_t cluster_offset, uint64_t cluster_length,
+                uint64_t intra_cluster_offset, uint64_t image_offset,
+                uint64_t buffer_offset)
+    : cluster_offset(cluster_offset),
+      cluster_length(cluster_length),
+      intra_cluster_offset(intra_cluster_offset),
+      image_offset(image_offset),
+      buffer_offset(buffer_offset) {
+  }
+};
+
+using ClusterExtents = std::vector<ClusterExtent>;
+
+// Point cluster_offsets at the raw table bytes held by the bufferlist. Only
+// the first call has an effect; later calls leave the pointer untouched.
+void LookupTable::init() {
+  if (cluster_offsets != nullptr) {
+    // already initialized
+    return;
+  }
+
+  cluster_offsets = reinterpret_cast<uint64_t*>(bl.c_str());
+}
+
+// Convert every table entry from big-endian to CPU byte order, in place.
+// The conversion is performed at most once per table.
+void LookupTable::decode() {
+  init();
+
+  // L2 tables are selectively byte-swapped on demand if only requesting a
+  // single cluster offset, so the full table may not have been decoded yet
+  if (!decoded) {
+    // translate the lookup table (big-endian -> CPU endianness)
+    for (auto entry_idx = 0UL; entry_idx < size; ++entry_idx) {
+      auto& entry = cluster_offsets[entry_idx];
+      entry = be64toh(entry);
+    }
+
+    decoded = true;
+  }
+}
+
+// Split each image extent on cluster boundaries and append one ClusterExtent
+// per piece. The cluster_offset of every appended entry is left at 0 and
+// buffer_offset is the running byte count over all pieces.
+// NOTE: the bit-mask arithmetic assumes cluster_size is a power of two.
+void populate_cluster_extents(CephContext* cct, uint64_t cluster_size,
+                              const io::Extents& image_extents,
+                              ClusterExtents* cluster_extents) {
+  uint64_t buffer_offset = 0;
+  for (auto [image_offset, image_length] : image_extents) {
+    // image_offset / image_length are per-extent copies consumed below
+    for (uint64_t cluster_length = 0; image_length > 0;
+         image_offset += cluster_length,
+         image_length -= cluster_length,
+         buffer_offset += cluster_length) {
+      // bytes remaining between the current position and the cluster's end
+      auto intra_cluster_offset = image_offset & (cluster_size - 1);
+      auto intra_cluster_length = cluster_size - intra_cluster_offset;
+      cluster_length = std::min(image_length, intra_cluster_length);
+
+      ldout(cct, 20) << "image_offset=" << image_offset << ", "
+                     << "image_length=" << image_length << ", "
+                     << "cluster_length=" << cluster_length << dendl;
+
+      cluster_extents->emplace_back(0, cluster_length, intra_cluster_offset,
+                                    image_offset, buffer_offset);
+    }
+  }
+}
+
+} // namespace qcow_format
+
+using namespace qcow_format;
+
+// A cluster read from the stream: its (immutable) cluster offset together
+// with the bytes read for it.
+template <typename I>
+struct QCOWFormat<I>::Cluster {
+  const uint64_t cluster_offset;
+  bufferlist cluster_data_bl;
+
+  Cluster(uint64_t cluster_offset)
+    : cluster_offset{cluster_offset} {
+  }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat::ClusterCache: " \
+ << this << " " << __func__ << ": "
+
+// Cache holding the single most-recently read cluster. Concurrent requests
+// for the same cluster offset are coalesced onto one stream read. All cache
+// state is accessed from handlers executed through m_strand; user callbacks
+// are completed via the image context's asio engine.
+template <typename I>
+class QCOWFormat<I>::ClusterCache {
+public:
+ ClusterCache(QCOWFormat* qcow_format)
+ : qcow_format(qcow_format),
+ m_strand(*qcow_format->m_image_ctx->asio_engine) {
+ }
+
+ // Retrieve cluster_length bytes starting at intra_cluster_offset within the
+ // cluster identified by cluster_offset. On success the bytes are placed in
+ // *bl and on_finish is completed with 0; otherwise on_finish receives a
+ // negative error code.
+ void get_cluster(uint64_t cluster_offset, uint64_t cluster_length,
+ uint64_t intra_cluster_offset, bufferlist* bl,
+ Context* on_finish) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "cluster_offset=" << cluster_offset << dendl;
+
+ // cache state machine runs in a single strand thread
+ boost::asio::dispatch(
+ m_strand,
+ [this, cluster_offset, cluster_length, intra_cluster_offset, bl,
+ on_finish]() {
+ execute_get_cluster(cluster_offset, cluster_length,
+ intra_cluster_offset, bl, on_finish);
+ });
+ }
+
+private:
+ // (intra-cluster offset, length, destination bufferlist, callback)
+ typedef std::tuple<uint64_t, uint64_t, bufferlist*, Context*> Completion;
+ typedef std::list<Completion> Completions;
+
+ QCOWFormat* qcow_format;
+ boost::asio::io_context::strand m_strand;
+
+ // most-recently read cluster (if any)
+ std::shared_ptr<Cluster> cluster;
+ // callers waiting on an in-flight read, keyed by cluster offset
+ std::unordered_map<uint64_t, Completions> cluster_completions;
+
+ // Serve the request from the cached cluster when it matches; otherwise
+ // queue the caller and start a stream read if none is in flight for this
+ // cluster offset.
+ void execute_get_cluster(uint64_t cluster_offset, uint64_t cluster_length,
+ uint64_t intra_cluster_offset, bufferlist* bl,
+ Context* on_finish) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "cluster_offset=" << cluster_offset << dendl;
+
+ if (cluster && cluster->cluster_offset == cluster_offset) {
+ // most-recent cluster matches
+ bl->substr_of(cluster->cluster_data_bl, intra_cluster_offset,
+ cluster_length);
+ boost::asio::post(*qcow_format->m_image_ctx->asio_engine,
+ [on_finish]() { on_finish->complete(0); });
+ return;
+ }
+
+ // record callback for cluster
+ bool new_request = (cluster_completions.count(cluster_offset) == 0);
+ cluster_completions[cluster_offset].emplace_back(
+ intra_cluster_offset, cluster_length, bl, on_finish);
+ if (new_request) {
+ // start the new read request
+ read_cluster(std::make_shared<Cluster>(cluster_offset));
+ }
+ }
+
+ // Issue the stream read for the cluster; the result is handled by
+ // handle_read_cluster() on the strand.
+ void read_cluster(std::shared_ptr<Cluster> cluster) {
+ auto cct = qcow_format->m_image_ctx->cct;
+
+ // by default read one full cluster located at the cluster offset
+ uint64_t stream_offset = cluster->cluster_offset;
+ uint64_t stream_length = qcow_format->m_cluster_size;
+ if ((cluster->cluster_offset & QCOW_OFLAG_COMPRESSED) != 0) {
+ // compressed clusters encode the compressed length in the lower bits
+ stream_offset = cluster->cluster_offset &
+ qcow_format->m_cluster_offset_mask;
+ stream_length = (cluster->cluster_offset >>
+ (63 - qcow_format->m_cluster_bits)) &
+ (qcow_format->m_cluster_size - 1);
+ }
+
+ ldout(cct, 20) << "cluster_offset=" << cluster->cluster_offset << ", "
+ << "stream_offset=" << stream_offset << ", "
+ << "stream_length=" << stream_length << dendl;
+
+ // read the cluster into the cache entry
+ auto ctx = new LambdaContext([this, cluster](int r) {
+ boost::asio::post(m_strand, [this, cluster, r]() {
+ handle_read_cluster(r, cluster); }); });
+ qcow_format->m_stream->read({{stream_offset, stream_length}},
+ &cluster->cluster_data_bl, ctx);
+ }
+
+ // Complete every caller that was waiting on this cluster. Compressed
+ // clusters are rejected with -EINVAL since decompression is not implemented.
+ void handle_read_cluster(int r, std::shared_ptr<Cluster> cluster) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << ", "
+ << "cluster_offset=" << cluster->cluster_offset << dendl;
+
+ // detach the waiters so that later requests start a fresh read
+ auto completions = std::move(cluster_completions[cluster->cluster_offset]);
+ cluster_completions.erase(cluster->cluster_offset);
+
+ if (r < 0) {
+ lderr(cct) << "failed to read cluster offset " << cluster->cluster_offset
+ << ": " << cpp_strerror(r) << dendl;
+ } else {
+ if ((cluster->cluster_offset & QCOW_OFLAG_COMPRESSED) != 0) {
+ bufferlist compressed_bl{std::move(cluster->cluster_data_bl)};
+ cluster->cluster_data_bl.clear();
+
+ // TODO
+ lderr(cct) << "support for compressed clusters is not available"
+ << dendl;
+ r = -EINVAL;
+ } else {
+ // cache the MRU cluster in case of sequential IO
+ this->cluster = cluster;
+ }
+ }
+
+ // complete the IO back to caller
+ boost::asio::post(*qcow_format->m_image_ctx->asio_engine,
+ [r, cluster, completions=std::move(completions)]() {
+ for (auto completion : completions) {
+ if (r >= 0) {
+ // copy the requested (offset, length) window of the cluster
+ std::get<2>(completion)->substr_of(
+ cluster->cluster_data_bl,
+ std::get<0>(completion),
+ std::get<1>(completion));
+ }
+ std::get<3>(completion)->complete(r);
+ }
+ });
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat::L2TableCache: " \
+ << this << " " << __func__ << ": "
+
+// Cache of L2 lookup tables with QCOW_L2_CACHE_SIZE entries. Requests are
+// queued and processed strictly in order from handlers executed through
+// m_strand; a request whose L2 table is not yet available stays at the head
+// of the queue until the table load completes. User callbacks are completed
+// via the image context's asio engine (or inline for the early-out paths of
+// get_cluster_offset()).
+template <typename I>
+class QCOWFormat<I>::L2TableCache {
+public:
+  L2TableCache(QCOWFormat* qcow_format)
+    : qcow_format(qcow_format),
+      m_strand(*qcow_format->m_image_ctx->asio_engine),
+      l2_cache_entries(QCOW_L2_CACHE_SIZE) {
+  }
+
+  // Retrieve the full (byte-swapped) L2 table stored at l2_table_offset. On
+  // success *l2_table references the table and on_finish is completed with
+  // 0; otherwise on_finish receives a negative error code.
+  void get_l2_table(const LookupTable* l1_table, uint64_t l2_table_offset,
+                    std::shared_ptr<const LookupTable>* l2_table,
+                    Context* on_finish) {
+    auto cct = qcow_format->m_image_ctx->cct;
+    ldout(cct, 20) << "l2_table_offset=" << l2_table_offset << dendl;
+
+    // cache state machine runs in a single strand thread
+    Request request{l1_table, l2_table_offset, l2_table, on_finish};
+    boost::asio::dispatch(
+      m_strand, [this, request=std::move(request)]() {
+        requests.push_back(std::move(request));
+      });
+    dispatch_request();
+  }
+
+  // Map image_offset to its cluster offset via the L1 / L2 tables.
+  // on_finish is completed with:
+  //   0        *cluster_offset holds the (masked) L2 entry
+  //   -ENOENT  no L2 table is allocated (*cluster_offset is set to 0)
+  //   -ERANGE  image_offset lies beyond the L1 table
+  //   < 0      the L2 table could not be loaded
+  void get_cluster_offset(const LookupTable* l1_table,
+                          uint64_t image_offset, uint64_t* cluster_offset,
+                          Context* on_finish) {
+    auto cct = qcow_format->m_image_ctx->cct;
+
+    // keep the index 64 bits wide so that large image offsets cannot wrap
+    // back into the valid range
+    uint64_t l1_table_index = image_offset >> qcow_format->m_l1_shift;
+    bool l1_index_valid = (l1_table_index < l1_table->size);
+
+    // only dereference the L1 table for an in-bounds index (this also
+    // covers an empty L1 table)
+    uint64_t l2_table_offset = 0;
+    if (l1_index_valid) {
+      l2_table_offset = l1_table->cluster_offsets[l1_table_index] &
+                        qcow_format->m_cluster_mask;
+    }
+
+    uint32_t l2_table_index = (image_offset >> qcow_format->m_cluster_bits) &
+                              (qcow_format->m_l2_size - 1);
+    ldout(cct, 20) << "image_offset=" << image_offset << ", "
+                   << "l1_table_index=" << l1_table_index << ", "
+                   << "l2_table_offset=" << l2_table_offset << ", "
+                   << "l2_table_index=" << l2_table_index << dendl;
+
+    if (!l1_index_valid) {
+      lderr(cct) << "L1 index " << l1_table_index << " out-of-bounds" << dendl;
+      on_finish->complete(-ERANGE);
+      return;
+    } else if (l2_table_offset == 0) {
+      // L2 table has not been allocated for specified offset
+      ldout(cct, 20) << "image_offset=" << image_offset << ", "
+                     << "cluster_offset=DNE" << dendl;
+      *cluster_offset = 0;
+      on_finish->complete(-ENOENT);
+      return;
+    }
+
+    // cache state machine runs in a single strand thread
+    Request request{l1_table, l2_table_offset, l2_table_index, cluster_offset,
+                    on_finish};
+    boost::asio::dispatch(
+      m_strand, [this, request=std::move(request)]() {
+        requests.push_back(std::move(request));
+      });
+    dispatch_request();
+  }
+
+private:
+  QCOWFormat* qcow_format;
+
+  boost::asio::io_context::strand m_strand;
+
+  // A queued lookup. Exactly one of cluster_offset / l2_table is non-null
+  // and selects the kind of request; the fields of the other kind keep
+  // their (well-defined) defaults.
+  struct Request {
+    const LookupTable* l1_table;
+
+    uint64_t l2_table_offset;
+
+    // get_cluster_offset request
+    uint32_t l2_table_index = 0;
+    uint64_t* cluster_offset = nullptr;
+
+    // get_l2_table request
+    std::shared_ptr<const LookupTable>* l2_table = nullptr;
+
+    Context* on_finish;
+
+    Request(const LookupTable* l1_table, uint64_t l2_table_offset,
+            uint32_t l2_table_index, uint64_t* cluster_offset,
+            Context* on_finish)
+      : l1_table(l1_table), l2_table_offset(l2_table_offset),
+        l2_table_index(l2_table_index), cluster_offset(cluster_offset),
+        on_finish(on_finish) {
+    }
+    Request(const LookupTable* l1_table, uint64_t l2_table_offset,
+            std::shared_ptr<const LookupTable>* l2_table, Context* on_finish)
+      : l1_table(l1_table), l2_table_offset(l2_table_offset),
+        l2_table(l2_table), on_finish(on_finish) {
+    }
+  };
+
+  typedef std::deque<Request> Requests;
+
+  // A cache slot
+  struct L2Cache {
+    uint64_t l2_offset = 0;
+    std::shared_ptr<LookupTable> l2_table;
+
+    // time at which the load of this entry was started
+    utime_t timestamp;
+    // usage counter consulted when selecting an entry to replace
+    uint32_t count = 0;
+    // true while the table is being read from the stream
+    bool in_flight = false;
+
+    // result of the load (negative on failure)
+    int ret_val = 0;
+  };
+  std::vector<L2Cache> l2_cache_entries;
+
+  Requests requests;
+
+  void dispatch_request() {
+    boost::asio::dispatch(m_strand, [this]() { execute_request(); });
+  }
+
+  // Process the request at the head of the queue. If its L2 table is not
+  // available yet the request stays queued and processing resumes from
+  // handle_l2_table_lookup().
+  void execute_request() {
+    auto cct = qcow_format->m_image_ctx->cct;
+    if (requests.empty()) {
+      return;
+    }
+
+    auto request = requests.front();
+    ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << dendl;
+
+    std::shared_ptr<LookupTable> l2_table;
+    int r = l2_table_lookup(request.l2_table_offset, &l2_table);
+    if (r < 0) {
+      lderr(cct) << "failed to load L2 table: l2_table_offset="
+                 << request.l2_table_offset << ": "
+                 << cpp_strerror(r) << dendl;
+    } else if (l2_table == nullptr) {
+      // table not in cache -- will restart once it's loaded
+      return;
+    } else if (request.cluster_offset != nullptr) {
+      auto cluster_offset = l2_table->cluster_offsets[request.l2_table_index];
+      if (!l2_table->decoded) {
+        // table hasn't been byte-swapped
+        cluster_offset = be64toh(cluster_offset);
+      }
+
+      *request.cluster_offset = cluster_offset & qcow_format->m_cluster_mask;
+      if (*request.cluster_offset == QCOW_OFLAG_ZERO) {
+        ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << ", "
+                       << "l2_table_index=" << request.l2_table_index << ", "
+                       << "cluster_offset=zeroed" << dendl;
+      } else {
+        ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << ", "
+                       << "l2_table_index=" << request.l2_table_index << ", "
+                       << "cluster_offset=" << *request.cluster_offset
+                       << dendl;
+      }
+    } else if (request.l2_table != nullptr) {
+      // ensure it's in the correct byte-order
+      l2_table->decode();
+      *request.l2_table = l2_table;
+    } else {
+      ceph_assert(false);
+    }
+
+    // complete the L2 cache request
+    boost::asio::post(*qcow_format->m_image_ctx->asio_engine,
+                      [r, ctx=request.on_finish]() { ctx->complete(r); });
+    requests.pop_front();
+
+    // process next request (if any)
+    dispatch_request();
+  }
+
+  // Look up the L2 table stored at l2_offset.
+  //
+  // Returns a negative error code if a previous load of the table failed
+  // (the failed entry is reset so that a later lookup retries the load).
+  // Otherwise returns 0 with *l2_table set to the cached table, or left
+  // empty when the table is still loading, a load was just started, or every
+  // cache entry is busy.
+  int l2_table_lookup(uint64_t l2_offset,
+                      std::shared_ptr<LookupTable>* l2_table) {
+    auto cct = qcow_format->m_image_ctx->cct;
+
+    l2_table->reset();
+
+    // find a match in the existing cache
+    for (auto idx = 0U; idx < l2_cache_entries.size(); ++idx) {
+      auto& l2_cache = l2_cache_entries[idx];
+      if (l2_cache.l2_offset == l2_offset) {
+        if (l2_cache.in_flight) {
+          ldout(cct, 20) << "l2_offset=" << l2_offset << ", "
+                         << "index=" << idx << " (in-flight)" << dendl;
+          return 0;
+        }
+
+        if (l2_cache.ret_val < 0) {
+          ldout(cct, 20) << "l2_offset=" << l2_offset << ", "
+                         << "index=" << idx << " (error): "
+                         << cpp_strerror(l2_cache.ret_val) << dendl;
+          int r = l2_cache.ret_val;
+          l2_cache = L2Cache{};
+
+          return r;
+        }
+
+        ++l2_cache.count;
+        if (l2_cache.count == std::numeric_limits<uint32_t>::max()) {
+          // halve all counters to avoid overflow while preserving ordering
+          for (auto& entry : l2_cache_entries) {
+            entry.count >>= 1;
+          }
+        }
+
+        ldout(cct, 20) << "l2_offset=" << l2_offset << ", " << "index=" << idx
+                       << dendl;
+        *l2_table = l2_cache.l2_table;
+        return 0;
+      }
+    }
+
+    // find the least used entry
+    int32_t min_idx = -1;
+    uint32_t min_count = std::numeric_limits<uint32_t>::max();
+    utime_t min_timestamp;
+    for (uint32_t idx = 0U; idx < l2_cache_entries.size(); ++idx) {
+      auto& l2_cache = l2_cache_entries[idx];
+      if (l2_cache.in_flight) {
+        continue;
+      }
+
+      // age the entry on every cache miss
+      if (l2_cache.count > 0) {
+        --l2_cache.count;
+      }
+
+      if (l2_cache.count <= min_count) {
+        if (min_idx == -1 || l2_cache.timestamp < min_timestamp) {
+          min_timestamp = l2_cache.timestamp;
+          min_count = l2_cache.count;
+          min_idx = idx;
+        }
+      }
+    }
+
+    if (min_idx == -1) {
+      // no space in the cache due to in-flight requests
+      ldout(cct, 20) << "l2_offset=" << l2_offset << ", "
+                     << "index=DNE (cache busy)" << dendl;
+      return 0;
+    }
+
+    ldout(cct, 20) << "l2_offset=" << l2_offset << ", "
+                   << "index=" << min_idx << " (loading)" << dendl;
+    auto& l2_cache = l2_cache_entries[min_idx];
+    l2_cache.l2_table = std::make_shared<LookupTable>(qcow_format->m_l2_size);
+    l2_cache.l2_offset = l2_offset;
+    l2_cache.timestamp = ceph_clock_now();
+    l2_cache.count = 1;
+    l2_cache.in_flight = true;
+
+    // read the L2 table into the L2 cache entry
+    auto ctx = new LambdaContext([this, index=min_idx, l2_offset](int r) {
+      boost::asio::post(m_strand, [this, index, l2_offset, r]() {
+        handle_l2_table_lookup(r, index, l2_offset); }); });
+    qcow_format->m_stream->read(
+      {{l2_offset, qcow_format->m_l2_size * sizeof(uint64_t)}},
+      &l2_cache.l2_table->bl, ctx);
+    return 0;
+  }
+
+  // Completion of an L2 table read: record the outcome in the cache entry
+  // and resume processing of the queued requests.
+  void handle_l2_table_lookup(int r, uint32_t index, uint64_t l2_offset) {
+    auto cct = qcow_format->m_image_ctx->cct;
+    ldout(cct, 20) << "r=" << r << ", "
+                   << "l2_offset=" << l2_offset << ", "
+                   << "index=" << index << dendl;
+
+    auto& l2_cache = l2_cache_entries[index];
+    ceph_assert(l2_cache.in_flight);
+    l2_cache.in_flight = false;
+
+    if (r < 0) {
+      lderr(cct) << "failed to load L2 table: "
+                 << "l2_offset=" << l2_cache.l2_offset << ": "
+                 << cpp_strerror(r) << dendl;
+      l2_cache.ret_val = r;
+    } else {
+      // keep the L2 table in big-endian byte-order until the full table
+      // is requested
+      l2_cache.l2_table->init();
+    }
+
+    // restart the state machine
+    dispatch_request();
+  }
+
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat::ReadRequest: " \
+ << this << " " << __func__ << ": "
+
+// Self-deleting state machine servicing one image read:
+//   1. split the image extents on cluster boundaries,
+//   2. resolve the cluster offset of every piece via the L2 table cache,
+//   3. read each piece through the cluster cache (or synthesize it when the
+//      cluster does not exist / is zeroed).
+// The object deletes itself once all cluster reads have been issued or when
+// the cluster offset lookup fails.
+template <typename I>
+class QCOWFormat<I>::ReadRequest {
+public:
+ ReadRequest(QCOWFormat* qcow_format, io::AioCompletion* aio_comp,
+ const LookupTable* l1_table, io::Extents&& image_extents)
+ : qcow_format(qcow_format), aio_comp(aio_comp), l1_table(l1_table),
+ image_extents(std::move(image_extents)) {
+ }
+
+ // Start the request; the outcome is reported through aio_comp.
+ void send() {
+ get_cluster_offsets();
+ }
+
+private:
+ QCOWFormat* qcow_format;
+ io::AioCompletion* aio_comp;
+
+ const LookupTable* l1_table;
+ io::Extents image_extents;
+
+ size_t image_extents_idx = 0;
+ uint32_t image_extent_offset = 0;
+
+ ClusterExtents cluster_extents;
+
+ // Look up the cluster offset of every cluster extent in parallel and
+ // continue in handle_get_cluster_offsets() once all lookups completed.
+ void get_cluster_offsets() {
+ auto cct = qcow_format->m_image_ctx->cct;
+ populate_cluster_extents(cct, qcow_format->m_cluster_size, image_extents,
+ &cluster_extents);
+
+ ldout(cct, 20) << dendl;
+ auto ctx = new LambdaContext([this](int r) {
+ handle_get_cluster_offsets(r); });
+ auto gather_ctx = new C_Gather(cct, ctx);
+
+ // NOTE: the callbacks keep references to elements of cluster_extents, so
+ // the vector must not be resized while lookups are outstanding
+ for (auto& cluster_extent : cluster_extents) {
+ auto sub_ctx = new LambdaContext(
+ [this, &cluster_extent, on_finish=gather_ctx->new_sub()](int r) {
+ handle_get_cluster_offset(r, cluster_extent, on_finish); });
+ qcow_format->m_l2_table_cache->get_cluster_offset(
+ l1_table, cluster_extent.image_offset,
+ &cluster_extent.cluster_offset, sub_ctx);
+ }
+
+ gather_ctx->activate();
+ }
+
+ // Per-extent lookup result; -ENOENT (no such cluster) is not an error.
+ void handle_get_cluster_offset(int r, const ClusterExtent& cluster_extent,
+ Context* on_finish) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << ", "
+ << "image_offset=" << cluster_extent.image_offset << ", "
+ << "cluster_offset=" << cluster_extent.cluster_offset
+ << dendl;
+
+ if (r == -ENOENT) {
+ ldout(cct, 20) << "image offset DNE in QCOW image" << dendl;
+ r = 0;
+ } else if (r < 0) {
+ lderr(cct) << "failed to map image offset " << cluster_extent.image_offset
+ << ": " << cpp_strerror(r) << dendl;
+ }
+
+ on_finish->complete(r);
+ }
+
+ // Aggregate lookup result: fail the AIO completion on error, otherwise
+ // start reading the clusters.
+ void handle_get_cluster_offsets(int r) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve cluster extents: " << cpp_strerror(r)
+ << dendl;
+ aio_comp->fail(r);
+ delete this;
+ return;
+ }
+
+ read_clusters();
+ }
+
+ // Issue one sub-request per cluster extent against aio_comp and then
+ // delete this object; the sub-requests complete independently.
+ void read_clusters() {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ aio_comp->set_request_count(cluster_extents.size());
+ for (auto& cluster_extent : cluster_extents) {
+ auto read_ctx = new io::ReadResult::C_ImageReadRequest(
+ aio_comp, cluster_extent.buffer_offset,
+ {{cluster_extent.image_offset, cluster_extent.cluster_length}});
+ read_ctx->ignore_enoent = true;
+
+ // everything the callback needs is captured by value since this object
+ // is deleted before the callback is guaranteed to have run
+ auto log_ctx = new LambdaContext(
+ [this, cct=qcow_format->m_image_ctx->cct,
+ image_offset=cluster_extent.image_offset,
+ image_length=cluster_extent.cluster_length, ctx=read_ctx](int r) {
+ handle_read_cluster(cct, r, image_offset, image_length, ctx);
+ });
+
+ if (cluster_extent.cluster_offset == 0) {
+ // QCOW header is at offset 0, implies cluster DNE
+ log_ctx->complete(-ENOENT);
+ } else if (cluster_extent.cluster_offset == QCOW_OFLAG_ZERO) {
+ // explicitly zeroed section
+ read_ctx->bl.append_zero(cluster_extent.cluster_length);
+ log_ctx->complete(0);
+ } else {
+ // request the (sub)cluster from the cluster cache
+ qcow_format->m_cluster_cache->get_cluster(
+ cluster_extent.cluster_offset, cluster_extent.cluster_length,
+ cluster_extent.intra_cluster_offset, &read_ctx->bl, log_ctx);
+ }
+ }
+
+ delete this;
+ }
+
+ // Log the outcome of one cluster read and forward it to on_finish. Must
+ // not access any member: the object may already have been deleted.
+ void handle_read_cluster(CephContext* cct, int r, uint64_t image_offset,
+ uint64_t image_length, Context* on_finish) const {
+ // NOTE: treat as static function, expect object has been deleted
+
+ ldout(cct, 20) << "r=" << r << ", "
+ << "image_offset=" << image_offset << ", "
+ << "image_length=" << image_length << dendl;
+
+ if (r != -ENOENT && r < 0) {
+ lderr(cct) << "failed to read image extent " << image_offset << "~"
+ << image_length << ": " << cpp_strerror(r) << dendl;
+ }
+
+ on_finish->complete(r);
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat::" \
+ << "ListSnapsRequest: " << this << " " \
+ << __func__ << ": "
+
+// Self-deleting state machine computing the snapshot delta for the cluster
+// extents that share a single L1 table index. The snapshots are visited in
+// ascending snap id order (iteration order of the std::map) and the L2
+// entries of each snapshot are compared with those of the previously visited
+// snapshot. Differences are recorded in *snapshot_delta and on_finish is
+// completed once every snapshot has been processed or an error occurred.
+template <typename I>
+class QCOWFormat<I>::ListSnapsRequest {
+public:
+ ListSnapsRequest(
+ QCOWFormat* qcow_format, uint32_t l1_table_index,
+ ClusterExtents&& cluster_extents,
+ const std::map<uint64_t, const LookupTable*>& snap_id_to_l1_table,
+ io::SnapshotDelta* snapshot_delta, Context* on_finish)
+ : qcow_format(qcow_format), l1_table_index(l1_table_index),
+ cluster_extents(std::move(cluster_extents)),
+ snap_id_to_l1_table(snap_id_to_l1_table), snapshot_delta(snapshot_delta),
+ on_finish(on_finish) {
+ }
+
+ void send() {
+ get_l2_table();
+ }
+
+private:
+ QCOWFormat* qcow_format;
+ uint32_t l1_table_index;
+ ClusterExtents cluster_extents;
+ // private copy; entries are consumed as the snapshots are processed
+ std::map<uint64_t, const LookupTable*> snap_id_to_l1_table;
+ io::SnapshotDelta* snapshot_delta;
+ Context* on_finish;
+
+ // L2 table of the previously processed snapshot (empty if none / DNE)
+ std::shared_ptr<const LookupTable> previous_l2_table;
+ // L2 table of the snapshot currently being processed (empty if DNE)
+ std::shared_ptr<const LookupTable> l2_table;
+
+ // Pop the next snapshot and fetch its L2 table for l1_table_index; finishes
+ // the request when no snapshots remain.
+ void get_l2_table() {
+ auto cct = qcow_format->m_image_ctx->cct;
+ if (snap_id_to_l1_table.empty()) {
+ finish(0);
+ return;
+ }
+
+ auto it = snap_id_to_l1_table.begin();
+ auto [snap_id, l1_table] = *it;
+ snap_id_to_l1_table.erase(it);
+
+ previous_l2_table = l2_table;
+ l2_table.reset();
+
+ // the result is always handled on the format's strand
+ auto ctx = new LambdaContext([this, snap_id = snap_id](int r) {
+ boost::asio::post(qcow_format->m_strand, [this, snap_id, r]() {
+ handle_get_l2_table(r, snap_id);
+ });
+ });
+
+ if (l1_table_index >= l1_table->size ||
+ l1_table->cluster_offsets[l1_table_index] == 0) {
+ // the snapshot has no L2 table at this L1 index
+ ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", "
+ << "snap_id=" << snap_id << ": DNE" << dendl;
+ ctx->complete(-ENOENT);
+ return;
+ }
+
+ uint64_t l2_table_offset = l1_table->cluster_offsets[l1_table_index] &
+ qcow_format->m_cluster_mask;
+
+ ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", "
+ << "snap_id=" << snap_id << ", "
+ << "l2_table_offset=" << l2_table_offset << dendl;
+ qcow_format->m_l2_table_cache->get_l2_table(l1_table, l2_table_offset,
+ &l2_table, ctx);
+ }
+
+ // Compare the current snapshot's L2 entries with the previous snapshot's
+ // and record the differing cluster extents under the {snap_id, snap_id} key.
+ void handle_get_l2_table(int r, uint64_t snap_id) {
+ ceph_assert(qcow_format->m_strand.running_in_this_thread());
+
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << ", "
+ << "snap_id=" << snap_id << dendl;
+
+ if (r == -ENOENT) {
+ // no L2 table for this snapshot: treated as "no clusters allocated"
+ l2_table.reset();
+ } else if (r < 0) {
+ lderr(cct) << "failed to retrieve L2 table for snapshot " << snap_id
+ << ": " << cpp_strerror(r) << dendl;
+ finish(r);
+ return;
+ }
+
+ // compare the cluster offsets at each requested L2 offset between
+ // the previous snapshot's L2 table and the current L2 table.
+ auto& sparse_extents = (*snapshot_delta)[{snap_id, snap_id}];
+ for (auto& cluster_extent : cluster_extents) {
+ uint32_t l2_table_index =
+ (cluster_extent.image_offset >> qcow_format->m_cluster_bits) &
+ (qcow_format->m_l2_size - 1);
+
+ // an empty optional means "no L2 entry available"
+ std::optional<uint64_t> cluster_offset;
+ if (l2_table && l2_table_index < l2_table->size) {
+ cluster_offset = l2_table->cluster_offsets[l2_table_index] &
+ qcow_format->m_cluster_offset_mask;
+ }
+
+ std::optional<uint64_t> prev_cluster_offset;
+ if (previous_l2_table && l2_table_index < previous_l2_table->size) {
+ prev_cluster_offset =
+ previous_l2_table->cluster_offsets[l2_table_index] &
+ qcow_format->m_cluster_offset_mask;
+ }
+
+ ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", "
+ << "snap_id=" << snap_id << ", "
+ << "image_offset=" << cluster_extent.image_offset << ", "
+ << "l2_table_index=" << l2_table_index << ", "
+ << "cluster_offset=" << cluster_offset << ", "
+ << "prev_cluster_offset=" << prev_cluster_offset << dendl;
+
+ auto state = io::SPARSE_EXTENT_STATE_DATA;
+ if (cluster_offset == prev_cluster_offset) {
+ // unchanged since the previous snapshot
+ continue;
+ } else if ((prev_cluster_offset && !cluster_offset) ||
+ *cluster_offset == QCOW_OFLAG_ZERO) {
+ // NOTE: cluster_offset is guaranteed to hold a value when it is
+ // dereferenced: both-empty was handled above and the first operand
+ // short-circuits the "previous set, current empty" case
+ // explicitly zeroed or deallocated
+ state = io::SPARSE_EXTENT_STATE_ZEROED;
+ }
+
+ sparse_extents.insert(
+ cluster_extent.image_offset, cluster_extent.cluster_length,
+ {state, cluster_extent.cluster_length});
+ }
+
+ ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", "
+ << "snap_id=" << snap_id << ", "
+ << "sparse_extents=" << sparse_extents << dendl;
+
+ // continue processing the L2 table at this index for all snapshots
+ boost::asio::post(*qcow_format->m_image_ctx->asio_engine,
+ [this]() { get_l2_table(); });
+ }
+
+
+ // Complete the caller's context with r and destroy this request.
+ void finish(int r) {
+ auto cct = qcow_format->m_image_ctx->cct;
+ ldout(cct, 20) << "r=" << r << dendl;
+
+ on_finish->complete(r);
+ delete this;
+ }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::QCOWFormat: " << this \
+ << " " << __func__ << ": "
+
+// Stores the construction arguments (the JSON source-spec object is copied)
+// and binds m_strand to the image context's asio engine. No I/O is issued
+// by the constructor itself.
+template <typename I>
+QCOWFormat<I>::QCOWFormat(
+ I* image_ctx, const json_spirit::mObject& json_object,
+ const SourceSpecBuilder<I>* source_spec_builder)
+ : m_image_ctx(image_ctx), m_json_object(json_object),
+ m_source_spec_builder(source_spec_builder),
+ m_strand(*image_ctx->asio_engine) {
+}
+
+// Build the stream handler described by the JSON source spec and open it.
+// on_finish is completed with a negative error code if the stream cannot be
+// built; otherwise processing continues in handle_open().
+template <typename I>
+void QCOWFormat<I>::open(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  int r = m_source_spec_builder->build_stream(m_json_object, &m_stream);
+  if (r < 0) {
+    // separate the message from the error text (was previously fused)
+    lderr(cct) << "failed to build migration stream handler: "
+               << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  auto ctx = new LambdaContext([this, on_finish](int r) {
+    handle_open(r, on_finish); });
+  m_stream->open(ctx);
+}
+
+// Completion of the stream open: on success continue by probing the QCOW
+// header, otherwise report the error to on_finish.
+template <typename I>
+void QCOWFormat<I>::handle_open(int r, Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    // the stream is available -- identify the image format version next
+    probe(on_finish);
+    return;
+  }
+
+  lderr(cct) << "failed to open QCOW image: " << cpp_strerror(r)
+             << dendl;
+  on_finish->complete(r);
+}
+
+// Read the first 8 bytes of the stream into m_bl; the result is evaluated
+// by handle_probe().
+template <typename I>
+void QCOWFormat<I>::probe(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  // drop any stale data before the buffer is reused for the probe
+  m_bl.clear();
+
+  auto probe_ctx = new LambdaContext([this, on_finish](int r) {
+    handle_probe(r, on_finish); });
+  m_stream->read({{0, 8}}, &m_bl, probe_ctx);
+}
+
+// Evaluate the probed header: validate the magic and dispatch to the header
+// reader matching the image version.
+//
+// on_finish is completed with:
+//   r         if the probe read failed
+//   -EINVAL   short probe data, bad magic or unknown version
+//   -ENOTSUP  QCOW v1 image while v1 support is compiled out
+// otherwise processing continues in read_v1_header() / read_v2_header().
+template <typename I>
+void QCOWFormat<I>::handle_probe(int r, Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to probe QCOW image: " << cpp_strerror(r)
+               << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  // never interpret more bytes than the stream actually returned
+  if (m_bl.length() < sizeof(QCowHeaderProbe)) {
+    lderr(cct) << "truncated QCOW header probe: " << m_bl.length()
+               << " bytes" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // header fields are stored big-endian
+  auto header_probe = *reinterpret_cast<QCowHeaderProbe*>(
+    m_bl.c_str());
+  header_probe.magic = be32toh(header_probe.magic);
+  header_probe.version = be32toh(header_probe.version);
+
+  if (header_probe.magic != QCOW_MAGIC) {
+    lderr(cct) << "invalid QCOW header magic" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_bl.clear();
+  if (header_probe.version == 1) {
+#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+    read_v1_header(on_finish);
+#else // WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+    lderr(cct) << "QCOW v1 is not supported" << dendl;
+    on_finish->complete(-ENOTSUP);
+#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+    return;
+  } else if (header_probe.version >= 2 && header_probe.version <= 3) {
+    read_v2_header(on_finish);
+    return;
+  } else {
+    lderr(cct) << "invalid QCOW header version " << header_probe.version
+               << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+}
+
+#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+
+// Reads sizeof(QCowHeaderV1) bytes from offset 0 into m_bl;
+// handle_read_v1_header() parses the result.
+template <typename I>
+void QCOWFormat<I>::read_v1_header(Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  m_bl.clear();
+
+  auto header_ctx = new LambdaContext(
+    [this, on_finish](int r) { handle_read_v1_header(r, on_finish); });
+  m_stream->read({{0, sizeof(QCowHeaderV1)}}, &m_bl, header_ctx);
+}
+
+// Completion callback for the QCOW (v1) header read.
+//
+// Byte-swaps and validates the header, derives the cluster / L2 / L1
+// geometry from it, allocates the L2 table and cluster caches and continues
+// with read_l1_table(). A failed read completes on_finish with r; every
+// validation failure completes it with -EINVAL.
+template <typename I>
+void QCOWFormat<I>::handle_read_v1_header(int r, Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to read QCOW header: " << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  auto header = *reinterpret_cast<QCowHeaderV1*>(m_bl.c_str());
+
+  // byte-swap important fields
+  header.magic = be32toh(header.magic);
+  header.version = be32toh(header.version);
+  header.backing_file_offset = be64toh(header.backing_file_offset);
+  header.backing_file_size = be32toh(header.backing_file_size);
+  header.size = be64toh(header.size);
+  header.crypt_method = be32toh(header.crypt_method);
+  header.l1_table_offset = be64toh(header.l1_table_offset);
+
+  if (header.magic != QCOW_MAGIC || header.version != 1) {
+    // honestly shouldn't happen since we've already validated it
+    lderr(cct) << "header is not QCOW" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (header.cluster_bits < QCOW_MIN_CLUSTER_BITS ||
+      header.cluster_bits > QCOW_MAX_CLUSTER_BITS) {
+    lderr(cct) << "invalid cluster bits: " << header.cluster_bits << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (header.l2_bits < (QCOW_MIN_CLUSTER_BITS - 3) ||
+      header.l2_bits > (QCOW_MAX_CLUSTER_BITS - 3)) {
+    lderr(cct) << "invalid L2 bits: " << header.l2_bits << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (header.crypt_method != QCOW_CRYPT_NONE) {
+    lderr(cct) << "invalid or unsupported encryption method" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_size = header.size;
+  if (p2roundup(m_size, static_cast<uint64_t>(512)) != m_size) {
+    lderr(cct) << "image size is not a multiple of block size" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_backing_file_offset = header.backing_file_offset;
+  m_backing_file_size = header.backing_file_size;
+
+  m_cluster_bits = header.cluster_bits;
+  m_cluster_size = 1UL << header.cluster_bits;
+  m_cluster_offset_mask = (1ULL << (63 - header.cluster_bits)) - 1;
+  m_cluster_mask = ~QCOW_OFLAG_COMPRESSED;
+
+  m_l2_bits = header.l2_bits;
+  m_l2_size = (1UL << m_l2_bits);
+
+  m_l1_shift = m_cluster_bits + m_l2_bits;
+  m_l1_table_offset = header.l1_table_offset;
+
+  // Validate the number of L1 entries in 64 bits *before* narrowing it into
+  // the 32-bit table size: otherwise an oversized image could wrap around
+  // to a small count and slip past the limit check below.
+  const uint64_t l1_span = 1ULL << m_l1_shift;
+  const uint64_t max_l1_entries =
+    std::numeric_limits<int32_t>::max() / sizeof(uint64_t);
+  const bool size_overflow =
+    m_size > (std::numeric_limits<uint64_t>::max() - l1_span);
+  const uint64_t l1_entries =
+    size_overflow ? 0 : ((m_size + l1_span - 1) >> m_l1_shift);
+  if (size_overflow || l1_entries > max_l1_entries) {
+    lderr(cct) << "image size too big: " << m_size << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+  m_l1_table.size = static_cast<uint32_t>(l1_entries);
+
+  ldout(cct, 15) << "size=" << m_size << ", "
+                 << "cluster_bits=" << m_cluster_bits << ", "
+                 << "l2_bits=" << m_l2_bits << dendl;
+
+  // allocate memory for L1 table and L2 + cluster caches
+  m_l2_table_cache = std::make_unique<L2TableCache>(this);
+  m_cluster_cache = std::make_unique<ClusterCache>(this);
+
+  read_l1_table(on_finish);
+}
+
+#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+
+// Reads sizeof(QCowHeader) bytes from offset 0 into m_bl;
+// handle_read_v2_header() parses the result.
+template <typename I>
+void QCOWFormat<I>::read_v2_header(Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  m_bl.clear();
+
+  auto header_ctx = new LambdaContext(
+    [this, on_finish](int r) { handle_read_v2_header(r, on_finish); });
+  m_stream->read({{0, sizeof(QCowHeader)}}, &m_bl, header_ctx);
+}
+
+// Completion callback for the QCOW2 (version 2 or 3) header read.
+//
+// Byte-swaps and validates the header, rejects unsupported features
+// (encryption, compression types, external data file, extended L2 tables,
+// unknown incompatible bits), derives the cluster / L2 / L1 geometry,
+// allocates the L2 table and cluster caches and continues with
+// read_snapshot(). on_finish is completed exactly once on every error path:
+// with r for a failed read, -ENOTSUP for the external data file and extended
+// L2 features, -EINVAL otherwise.
+template <typename I>
+void QCOWFormat<I>::handle_read_v2_header(int r, Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to read QCOW2 header: " << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  auto header = *reinterpret_cast<QCowHeader*>(m_bl.c_str());
+
+  // byte-swap important fields
+  header.magic = be32toh(header.magic);
+  header.version = be32toh(header.version);
+  header.backing_file_offset = be64toh(header.backing_file_offset);
+  header.backing_file_size = be32toh(header.backing_file_size);
+  header.cluster_bits = be32toh(header.cluster_bits);
+  header.size = be64toh(header.size);
+  header.crypt_method = be32toh(header.crypt_method);
+  header.l1_size = be32toh(header.l1_size);
+  header.l1_table_offset = be64toh(header.l1_table_offset);
+  header.nb_snapshots = be32toh(header.nb_snapshots);
+  header.snapshots_offset = be64toh(header.snapshots_offset);
+
+  if (header.version == 2) {
+    // valid only for version >= 3
+    header.incompatible_features = 0;
+    header.compatible_features = 0;
+    header.autoclear_features = 0;
+    header.header_length = 72;
+    header.compression_type = 0;
+  } else {
+    header.incompatible_features = be64toh(header.incompatible_features);
+    header.compatible_features = be64toh(header.compatible_features);
+    header.autoclear_features = be64toh(header.autoclear_features);
+    header.header_length = be32toh(header.header_length);
+  }
+
+  if (header.magic != QCOW_MAGIC || header.version < 2 || header.version > 3) {
+    // honestly shouldn't happen since we've already validated it
+    lderr(cct) << "header is not QCOW2" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (header.cluster_bits < QCOW_MIN_CLUSTER_BITS ||
+      header.cluster_bits > QCOW_MAX_CLUSTER_BITS) {
+    lderr(cct) << "invalid cluster bits: " << header.cluster_bits << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if (header.crypt_method != QCOW_CRYPT_NONE) {
+    lderr(cct) << "invalid or unsupported encryption method" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_size = header.size;
+  if (p2roundup(m_size, static_cast<uint64_t>(512)) != m_size) {
+    lderr(cct) << "image size is not a multiple of block size" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  // headers too short to contain the compression type field use the default
+  if (header.header_length <= offsetof(QCowHeader, compression_type)) {
+    header.compression_type = 0;
+  }
+
+  if ((header.compression_type != 0) ||
+      ((header.incompatible_features & QCOW2_INCOMPAT_COMPRESSION) != 0)) {
+    lderr(cct) << "invalid or unsupported compression type" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  if ((header.incompatible_features & QCOW2_INCOMPAT_DATA_FILE) != 0) {
+    lderr(cct) << "external data file feature not supported" << dendl;
+    on_finish->complete(-ENOTSUP);
+    // stop here: falling through would keep parsing and could complete
+    // on_finish a second time
+    return;
+  }
+
+  if ((header.incompatible_features & QCOW2_INCOMPAT_EXTL2) != 0) {
+    lderr(cct) << "extended L2 table feature not supported" << dendl;
+    on_finish->complete(-ENOTSUP);
+    return;
+  }
+
+  header.incompatible_features &= ~QCOW2_INCOMPAT_MASK;
+  if (header.incompatible_features != 0) {
+    lderr(cct) << "unknown incompatible feature enabled" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_backing_file_offset = header.backing_file_offset;
+  m_backing_file_size = header.backing_file_size;
+
+  m_cluster_bits = header.cluster_bits;
+  m_cluster_size = 1UL << header.cluster_bits;
+  m_cluster_offset_mask = (1ULL << (63 - header.cluster_bits)) - 1;
+  m_cluster_mask = ~(QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_COPIED);
+
+  // L2 table is fixed a (1) cluster block to hold 8-byte (3 bit) offsets
+  m_l2_bits = m_cluster_bits - 3;
+  m_l2_size = (1UL << m_l2_bits);
+
+  m_l1_shift = m_cluster_bits + m_l2_bits;
+  m_l1_table_offset = header.l1_table_offset;
+
+  // Validate the number of L1 entries in 64 bits *before* narrowing it into
+  // the 32-bit table size: otherwise an oversized image could wrap around
+  // to a small count and slip past the checks below.
+  const uint64_t l1_span = 1ULL << m_l1_shift;
+  const uint64_t max_l1_entries =
+    std::numeric_limits<int32_t>::max() / sizeof(uint64_t);
+  const bool size_overflow =
+    m_size > (std::numeric_limits<uint64_t>::max() - l1_span);
+  const uint64_t l1_entries =
+    size_overflow ? 0 : ((m_size + l1_span - 1) >> m_l1_shift);
+  if (size_overflow || l1_entries > max_l1_entries) {
+    lderr(cct) << "image size too big: " << m_size << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  } else if (l1_entries > header.l1_size) {
+    lderr(cct) << "invalid L1 table size in header (" << header.l1_size
+               << " < " << l1_entries << ")" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+  m_l1_table.size = static_cast<uint32_t>(l1_entries);
+
+  m_snapshot_count = header.nb_snapshots;
+  m_snapshots_offset = header.snapshots_offset;
+
+  ldout(cct, 15) << "size=" << m_size << ", "
+                 << "cluster_bits=" << m_cluster_bits << ", "
+                 << "l1_table_offset=" << m_l1_table_offset << ", "
+                 << "snapshot_count=" << m_snapshot_count << ", "
+                 << "snapshots_offset=" << m_snapshots_offset << dendl;
+
+  // allocate memory for L1 table and L2 + cluster caches
+  m_l2_table_cache = std::make_unique<L2TableCache>(this);
+  m_cluster_cache = std::make_unique<ClusterCache>(this);
+
+  read_snapshot(on_finish);
+}
+
+// Reads the next snapshot table record header, or moves on to the image's
+// own L1 table once there is no snapshot table or every record was loaded.
+template <typename I>
+void QCOWFormat<I>::read_snapshot(Context* on_finish) {
+  const bool no_snapshot_table = (m_snapshots_offset == 0);
+  const bool all_loaded = (m_snapshots.size() == m_snapshot_count);
+  if (no_snapshot_table || all_loaded) {
+    read_l1_table(on_finish);
+    return;
+  }
+
+  // header is always aligned on 8 byte boundary
+  m_snapshots_offset = p2roundup(m_snapshots_offset, static_cast<uint64_t>(8));
+
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "snap_id=" << (m_snapshots.size() + 1) << ", "
+                 << "offset=" << m_snapshots_offset << dendl;
+
+  m_bl.clear();
+
+  auto snapshot_ctx = new LambdaContext(
+    [this, on_finish](int r) { handle_read_snapshot(r, on_finish); });
+  m_stream->read({{m_snapshots_offset, sizeof(QCowSnapshotHeader)}}, &m_bl,
+                 snapshot_ctx);
+}
+
+// Completion callback for a snapshot record header read: registers a new
+// snapshot under the next 1-based id, fills it from the byte-swapped header
+// fields and continues with the variable-length part of the record.
+template <typename I>
+void QCOWFormat<I>::handle_read_snapshot(int r, Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << ", "
+                 << "index=" << m_snapshots.size() << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to read QCOW2 snapshot header: " << cpp_strerror(r)
+               << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  // advance past the fixed-size header that was just consumed
+  m_snapshots_offset += m_bl.length();
+  auto header = *reinterpret_cast<QCowSnapshotHeader*>(m_bl.c_str());
+
+  const uint64_t snap_id = m_snapshots.size() + 1;
+  auto& snapshot = m_snapshots[snap_id];
+
+  // the strings are only sized here; their contents are read afterwards
+  snapshot.id.resize(be16toh(header.id_str_size));
+  snapshot.name.resize(be16toh(header.name_size));
+  snapshot.extra_data_size = be32toh(header.extra_data_size);
+
+  snapshot.l1_table_offset = be64toh(header.l1_table_offset);
+  snapshot.l1_table.size = be32toh(header.l1_size);
+
+  snapshot.timestamp.sec_ref() = be32toh(header.date_sec);
+  snapshot.timestamp.nsec_ref() = be32toh(header.date_nsec);
+
+  ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", "
+                 << "id_str_len=" << snapshot.id.size() << ", "
+                 << "name_str_len=" << snapshot.name.size() << ", "
+                 << "l1_table_offset=" << snapshot.l1_table_offset << ", "
+                 << "l1_size=" << snapshot.l1_table.size << ", "
+                 << "extra_data_size=" << snapshot.extra_data_size << dendl;
+
+  read_snapshot_extra(on_finish);
+}
+
+// Reads the variable-length tail of the most recently added snapshot record
+// (extra data, then id string, then name string) into m_bl.
+//
+// When the record has no tail at all, the snapshot receives a random UUID
+// name and the image size, and the state machine proceeds directly to
+// reading the snapshot's L1 table.
+template <typename I>
+void QCOWFormat<I>::read_snapshot_extra(Context* on_finish) {
+  ceph_assert(!m_snapshots.empty());
+  auto& snapshot = m_snapshots.rbegin()->second;
+
+  // sum in 64 bits so a huge extra_data_size cannot wrap around to a small
+  // (or zero) length
+  uint64_t length = static_cast<uint64_t>(snapshot.extra_data_size) +
+                    snapshot.id.size() +
+                    snapshot.name.size();
+  if (length == 0) {
+    // Nothing more to read for the record itself. Apply the same defaults
+    // handle_read_snapshot_extra() uses for a missing name / disk size and
+    // still load the snapshot's L1 table; skipping that step would leave
+    // the snapshot with a zero size and an undecoded lookup table.
+    uuid_d uuid_gen;
+    uuid_gen.generate_random();
+    snapshot.name = uuid_gen.to_string();
+    snapshot.size = m_size;
+
+    read_snapshot_l1_table(on_finish);
+    return;
+  }
+
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", "
+                 << "offset=" << m_snapshots_offset << ", "
+                 << "length=" << length << dendl;
+
+  // remember where this tail starts and advance to the next record
+  auto offset = m_snapshots_offset;
+  m_snapshots_offset += length;
+
+  auto ctx = new LambdaContext([this, on_finish](int r) {
+    handle_read_snapshot_extra(r, on_finish); });
+  m_bl.clear();
+  m_stream->read({{offset, length}}, &m_bl, ctx);
+}
+
+// Completion callback for the snapshot record tail: extracts the snapshot's
+// disk size (falling back to the image size), id and name (falling back to
+// a random UUID) from m_bl, then continues with the snapshot's L1 table.
+template <typename I>
+void QCOWFormat<I>::handle_read_snapshot_extra(int r, Context* on_finish) {
+  ceph_assert(!m_snapshots.empty());
+  auto& snapshot = m_snapshots.rbegin()->second;
+
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << ", "
+                 << "snap_id=" << m_snapshots.size() << dendl;
+
+  if (r < 0) {
+    lderr(cct) << "failed to read QCOW2 snapshot header extra: "
+               << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  // the extra-data area only carries a disk size when it is long enough
+  const bool has_disk_size =
+    snapshot.extra_data_size >=
+      offsetof(QCowSnapshotExtraData, disk_size) + sizeof(uint64_t);
+  if (has_disk_size) {
+    auto extra = reinterpret_cast<const QCowSnapshotExtraData*>(m_bl.c_str());
+    snapshot.size = be64toh(extra->disk_size);
+  } else {
+    snapshot.size = m_size;
+  }
+
+  // the id and name strings follow the extra-data area back to back
+  auto cursor = reinterpret_cast<const char*>(m_bl.c_str());
+  cursor += snapshot.extra_data_size;
+
+  const auto id_len = snapshot.id.size();
+  if (id_len != 0) {
+    snapshot.id = std::string(cursor, id_len);
+    cursor += id_len;
+  }
+
+  const auto name_len = snapshot.name.size();
+  if (name_len == 0) {
+    uuid_d uuid_gen;
+    uuid_gen.generate_random();
+    snapshot.name = uuid_gen.to_string();
+  } else {
+    snapshot.name = std::string(cursor, name_len);
+  }
+
+  ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", "
+                 << "name=" << snapshot.name << ", "
+                 << "size=" << snapshot.size << dendl;
+  read_snapshot_l1_table(on_finish);
+}
+
+// Reads the L1 table of the most recently added snapshot (l1_table.size
+// 8-byte entries at l1_table_offset) into the snapshot's lookup table.
+template <typename I>
+void QCOWFormat<I>::read_snapshot_l1_table(Context* on_finish) {
+  ceph_assert(!m_snapshots.empty());
+  auto& snapshot = m_snapshots.rbegin()->second;
+
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", "
+                 << "l1_table_offset=" << snapshot.l1_table_offset
+                 << dendl;
+
+  const auto l1_bytes = snapshot.l1_table.size * sizeof(uint64_t);
+  auto l1_ctx = new LambdaContext(
+    [this, on_finish](int r) { handle_read_snapshot_l1_table(r, on_finish); });
+  m_stream->read({{snapshot.l1_table_offset, l1_bytes}},
+                 &snapshot.l1_table.bl, l1_ctx);
+}
+
+// Completion callback for a snapshot L1 table read: decodes the table and
+// loops back to read_snapshot() for the next record.
+template <typename I>
+void QCOWFormat<I>::handle_read_snapshot_l1_table(int r, Context* on_finish) {
+  ceph_assert(!m_snapshots.empty());
+  auto& snapshot = m_snapshots.rbegin()->second;
+
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << ", "
+                 << "snap_id=" << m_snapshots.size() << dendl;
+
+  if (r >= 0) {
+    snapshot.l1_table.decode();
+    read_snapshot(on_finish);
+    return;
+  }
+
+  lderr(cct) << "failed to read snapshot L1 table: " << cpp_strerror(r)
+             << dendl;
+  on_finish->complete(r);
+}
+
+// Reads the image's own L1 table (m_l1_table.size 8-byte entries at
+// m_l1_table_offset) into m_l1_table.bl.
+template <typename I>
+void QCOWFormat<I>::read_l1_table(Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  const auto l1_bytes = m_l1_table.size * sizeof(uint64_t);
+  auto l1_ctx = new LambdaContext(
+    [this, on_finish](int r) { handle_read_l1_table(r, on_finish); });
+  m_stream->read({{m_l1_table_offset, l1_bytes}}, &m_l1_table.bl, l1_ctx);
+}
+
+// Completion callback for the image L1 table read: decodes the table and
+// continues with the backing file check.
+template <typename I>
+void QCOWFormat<I>::handle_read_l1_table(int r, Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    m_l1_table.decode();
+    read_backing_file(on_finish);
+    return;
+  }
+
+  lderr(cct) << "failed to read L1 table: " << cpp_strerror(r) << dendl;
+  on_finish->complete(r);
+}
+
+// Final open step: succeeds when the header references no backing file and
+// fails with -ENOTSUP when it does.
+template <typename I>
+void QCOWFormat<I>::read_backing_file(Context* on_finish) {
+  const bool has_backing_file =
+    (m_backing_file_offset != 0 && m_backing_file_size != 0);
+  if (!has_backing_file) {
+    // all data is within the specified file
+    on_finish->complete(0);
+    return;
+  }
+
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  // TODO add support for backing files
+  on_finish->complete(-ENOTSUP);
+}
+
+// Closes the underlying stream; on_finish is handed to the stream's close().
+template <typename I>
+void QCOWFormat<I>::close(Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  m_stream->close(on_finish);
+}
+
+// Replaces the contents of snap_infos with one user-namespace SnapInfo per
+// loaded QCOW snapshot (keyed by snapshot id) and completes on_finish with 0.
+template <typename I>
+void QCOWFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  snap_infos->clear();
+  for (const auto& entry : m_snapshots) {
+    const auto& snapshot = entry.second;
+    SnapInfo snap_info(snapshot.name, cls::rbd::UserSnapshotNamespace{},
+                       snapshot.size, {}, 0, 0, snapshot.timestamp);
+    snap_infos->emplace(entry.first, snap_info);
+  }
+
+  on_finish->complete(0);
+}
+
+// Stores the size of the HEAD image (snap_id == CEPH_NOSNAP) or of the given
+// snapshot in *size. Completes on_finish with 0, or with -ENOENT (leaving
+// *size untouched) when the snapshot id is unknown.
+template <typename I>
+void QCOWFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size,
+                                   Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "snap_id=" << snap_id << dendl;
+
+  if (snap_id == CEPH_NOSNAP) {
+    *size = m_size;
+    on_finish->complete(0);
+    return;
+  }
+
+  const auto snapshot_it = m_snapshots.find(snap_id);
+  if (snapshot_it == m_snapshots.end()) {
+    on_finish->complete(-ENOENT);
+    return;
+  }
+
+  *size = snapshot_it->second.size;
+  on_finish->complete(0);
+}
+
+// Starts an asynchronous read of image_extents from the HEAD image or the
+// given snapshot by handing a ReadRequest the matching L1 table. An unknown
+// snapshot id fails aio_comp with -ENOENT. Always returns true.
+template <typename I>
+bool QCOWFormat<I>::read(
+    io::AioCompletion* aio_comp, uint64_t snap_id, io::Extents&& image_extents,
+    io::ReadResult&& read_result, int op_flags, int read_flags,
+    const ZTracer::Trace &parent_trace) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "snap_id=" << snap_id << ", "
+                 << "image_extents=" << image_extents << dendl;
+
+  // default to the HEAD table, switch to the snapshot's one if requested
+  const LookupTable* l1_table = &m_l1_table;
+  if (snap_id != CEPH_NOSNAP) {
+    const auto snapshot_it = m_snapshots.find(snap_id);
+    if (snapshot_it == m_snapshots.end()) {
+      aio_comp->fail(-ENOENT);
+      return true;
+    }
+
+    l1_table = &snapshot_it->second.l1_table;
+  }
+
+  aio_comp->read_result = std::move(read_result);
+  aio_comp->read_result.set_image_extents(image_extents);
+
+  auto request = new ReadRequest(this, aio_comp, l1_table,
+                                 std::move(image_extents));
+  request->send();
+
+  return true;
+}
+
+// Computes snapshot_delta for image_extents: the extents are split into
+// cluster extents, grouped by L1 table index, and one ListSnapsRequest per
+// index is issued against the L1 tables of every snapshot plus HEAD. Once
+// all requests complete, handle_list_snaps() post-processes the delta and
+// completes on_finish.
+template <typename I>
+void QCOWFormat<I>::list_snaps(io::Extents&& image_extents,
+                               io::SnapIds&& snap_ids, int list_snaps_flags,
+                               io::SnapshotDelta* snapshot_delta,
+                               const ZTracer::Trace &parent_trace,
+                               Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "image_extents=" << image_extents << dendl;
+
+  ClusterExtents cluster_extents;
+  populate_cluster_extents(cct, m_cluster_size, image_extents,
+                           &cluster_extents);
+
+  // map L1 table indexes to cluster extents
+  std::map<uint64_t, ClusterExtents> l1_cluster_extents;
+  const auto total_extents = cluster_extents.size();
+  for (const auto& extent : cluster_extents) {
+    const uint32_t l1_index = extent.image_offset >> m_l1_shift;
+    auto& bucket = l1_cluster_extents[l1_index];
+    bucket.reserve(total_extents);
+    bucket.push_back(extent);
+  }
+
+  // every snapshot contributes its own L1 table; HEAD uses the image's
+  std::map<uint64_t, const LookupTable*> snap_id_to_l1_table;
+  for (auto& [snap_id, snapshot] : m_snapshots) {
+    snap_id_to_l1_table[snap_id] = &snapshot.l1_table;
+  }
+  snap_id_to_l1_table[CEPH_NOSNAP] = &m_l1_table;
+
+  // runs after all per-index requests have finished
+  auto finish_ctx = new LambdaContext(
+    [this, image_extents, snap_ids=std::move(snap_ids), snapshot_delta,
+     on_finish](int r) mutable {
+      handle_list_snaps(r, std::move(image_extents), std::move(snap_ids),
+                        snapshot_delta, on_finish);
+    });
+
+  auto gather_ctx = new C_Gather(cct, finish_ctx);
+
+  for (auto& [l1_index, extents] : l1_cluster_extents) {
+    auto request = new ListSnapsRequest(
+      this, l1_index, std::move(extents), snap_id_to_l1_table,
+      snapshot_delta, gather_ctx->new_sub());
+    request->send();
+  }
+
+  gather_ctx->activate();
+}
+
+// Completion callback for list_snaps(): applies zero_shrunk_snapshot() to
+// every snapshot in id order and finally to HEAD, merges the delta for the
+// requested snap ids and completes on_finish with r.
+template <typename I>
+void QCOWFormat<I>::handle_list_snaps(int r, io::Extents&& image_extents,
+                                      io::SnapIds&& snap_ids,
+                                      io::SnapshotDelta* snapshot_delta,
+                                      Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "r=" << r << ", "
+                 << "snapshot_delta=" << *snapshot_delta << dendl;
+
+  // carries the size of the previously visited snapshot between calls
+  std::optional<uint64_t> previous_size = std::nullopt;
+  for (auto& [id, snap] : m_snapshots) {
+    auto& snap_extents = (*snapshot_delta)[{id, id}];
+    util::zero_shrunk_snapshot(cct, image_extents, id, snap.size,
+                               &previous_size, &snap_extents);
+  }
+
+  auto& head_extents = (*snapshot_delta)[{CEPH_NOSNAP, CEPH_NOSNAP}];
+  util::zero_shrunk_snapshot(cct, image_extents, CEPH_NOSNAP, m_size,
+                             &previous_size, &head_extents);
+
+  util::merge_snapshot_delta(snap_ids, snapshot_delta);
+  on_finish->complete(r);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::QCOWFormat<librbd::ImageCtx>;
diff --git a/src/librbd/migration/QCOWFormat.h b/src/librbd/migration/QCOWFormat.h
new file mode 100644
index 000000000..b36506716
--- /dev/null
+++ b/src/librbd/migration/QCOWFormat.h
@@ -0,0 +1,211 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H
+#define CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H
+
+#include "include/int_types.h"
+#include "librbd/Types.h"
+#include "librbd/migration/FormatInterface.h"
+#include "librbd/migration/QCOW.h"
+#include "acconfig.h"
+#include "json_spirit/json_spirit.h"
+#include <boost/asio/io_context_strand.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+#include <deque>
+#include <vector>
+#include <memory>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> struct SourceSpecBuilder;
+struct StreamInterface;
+
+namespace qcow_format {
+
+struct LookupTable { // lookup table state; QCOWFormat uses it for the image and snapshot L1 tables
+ LookupTable() {}
+ LookupTable(uint32_t size) : size(size) {}
+ // raw table bytes; QCOWFormat reads size * sizeof(uint64_t) bytes into it
+ bufferlist bl;
+ uint64_t* cluster_offsets = nullptr; // NOTE(review): presumably set up by init()/decode() -- confirm in the .cc
+ uint32_t size = 0; // number of 8-byte table entries
+ bool decoded = false; // NOTE(review): presumably set once decode() has run -- confirm
+
+ void init();
+ void decode(); // called by QCOWFormat after a table read completes successfully
+};
+
+} // namespace qcow_format
+
+template <typename ImageCtxT> // FormatInterface implementation that reads QCOW / QCOW2 images through a StreamInterface
+class QCOWFormat : public FormatInterface {
+public:
+ static QCOWFormat* create(
+ ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+ const SourceSpecBuilder<ImageCtxT>* source_spec_builder) {
+ return new QCOWFormat(image_ctx, json_object, source_spec_builder);
+ }
+
+ QCOWFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+ const SourceSpecBuilder<ImageCtxT>* source_spec_builder);
+ QCOWFormat(const QCOWFormat&) = delete;
+ QCOWFormat& operator=(const QCOWFormat&) = delete;
+ // open() builds and opens the stream, then parses the header(s), snapshot table and L1 table(s)
+ void open(Context* on_finish) override;
+ void close(Context* on_finish) override;
+ // snapshots are reported with 1-based ids in the order they appear in the snapshot table
+ void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override;
+ void get_image_size(uint64_t snap_id, uint64_t* size,
+ Context* on_finish) override;
+ // always returns true; unknown snapshot ids fail aio_comp with -ENOENT
+ bool read(io::AioCompletion* aio_comp, uint64_t snap_id,
+ io::Extents&& image_extents, io::ReadResult&& read_result,
+ int op_flags, int read_flags,
+ const ZTracer::Trace &parent_trace) override;
+
+ void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids,
+ int list_snaps_flags, io::SnapshotDelta* snapshot_delta,
+ const ZTracer::Trace &parent_trace,
+ Context* on_finish) override;
+
+private:
+ /**
+ * State machine driven by open(); each state below corresponds to the
+ * similarly named private method declared at the end of this class.
+ *
+ * @verbatim
+ *
+ * <start>
+ * |
+ * v
+ * OPEN
+ * |
+ * v
+ * PROBE
+ * |
+ * |\---> READ V1 HEADER ----------\
+ * | |
+ * \----> READ V2 HEADER |
+ * | |
+ * | /----------\ |
+ * | | | |
+ * v v | |
+ * READ SNAPSHOT | |
+ * | | |
+ * v | |
+ * READ SNAPSHOT EXTRA | |
+ * | | |
+ * v | |
+ * READ SNAPSHOT L1 TABLE |
+ * | |
+ * \--------------------\|
+ * |
+ * v
+ * READ L1 TABLE
+ * |
+ * v
+ * READ BACKING FILE
+ * |
+ * /-------------------------------/
+ * |
+ * v
+ * <opened>
+ *
+ * @endverbatim
+ */
+
+ struct Cluster;
+ struct ClusterCache;
+ struct L2TableCache;
+ struct ReadRequest;
+ struct ListSnapsRequest;
+
+ struct Snapshot { // one record of the QCOW2 snapshot table
+ std::string id;
+ std::string name; // replaced by a random UUID when the record carries no name
+
+ utime_t timestamp;
+ uint64_t size = 0; // disk size from the record's extra data, else the image size
+
+ uint64_t l1_table_offset = 0;
+ qcow_format::LookupTable l1_table;
+
+ uint32_t extra_data_size = 0; // length of the extra data preceding the id/name strings
+ };
+
+ ImageCtxT* m_image_ctx;
+ json_spirit::mObject m_json_object; // passed to SourceSpecBuilder::build_stream() by open()
+ const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder;
+
+ boost::asio::io_context::strand m_strand; // constructed from the image context's asio_engine
+ std::shared_ptr<StreamInterface> m_stream; // built by open(); all image reads go through it
+
+ bufferlist m_bl; // scratch buffer reused for probe, header and snapshot record reads
+
+ uint64_t m_size = 0; // image size from the header (validated to be a multiple of 512)
+
+ uint64_t m_backing_file_offset = 0; // both non-zero => open fails with -ENOTSUP
+ uint32_t m_backing_file_size = 0;
+
+ uint32_t m_cluster_bits = 0;
+ uint32_t m_cluster_size = 0; // 1 << m_cluster_bits
+ uint64_t m_cluster_offset_mask = 0; // (1 << (63 - cluster_bits)) - 1
+ uint64_t m_cluster_mask = 0; // complement of the QCOW_OFLAG_* bits set by the header handlers
+
+ uint32_t m_l1_shift = 0; // m_cluster_bits + m_l2_bits
+ uint64_t m_l1_table_offset = 0;
+ qcow_format::LookupTable m_l1_table; // L1 table of the HEAD image
+
+ uint32_t m_l2_bits = 0; // from the v1 header, or cluster_bits - 3 for v2/v3
+ uint32_t m_l2_size = 0; // 1 << m_l2_bits
+
+ uint32_t m_snapshot_count = 0; // nb_snapshots from the v2/v3 header
+ uint64_t m_snapshots_offset = 0; // read cursor, advanced as snapshot records are consumed
+ std::map<uint64_t, Snapshot> m_snapshots; // keyed by 1-based snapshot id
+
+ std::unique_ptr<L2TableCache> m_l2_table_cache; // allocated once the header has been validated
+ std::unique_ptr<ClusterCache> m_cluster_cache;
+
+ void handle_open(int r, Context* on_finish);
+
+ void probe(Context* on_finish);
+ void handle_probe(int r, Context* on_finish);
+
+#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+ void read_v1_header(Context* on_finish);
+ void handle_read_v1_header(int r, Context* on_finish);
+#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1
+
+ void read_v2_header(Context* on_finish);
+ void handle_read_v2_header(int r, Context* on_finish);
+
+ void read_snapshot(Context* on_finish);
+ void handle_read_snapshot(int r, Context* on_finish);
+
+ void read_snapshot_extra(Context* on_finish);
+ void handle_read_snapshot_extra(int r, Context* on_finish);
+
+ void read_snapshot_l1_table(Context* on_finish);
+ void handle_read_snapshot_l1_table(int r, Context* on_finish);
+
+ void read_l1_table(Context* on_finish);
+ void handle_read_l1_table(int r, Context* on_finish);
+
+ void read_backing_file(Context* on_finish);
+
+ void handle_list_snaps(int r, io::Extents&& image_extents,
+ io::SnapIds&& snap_ids,
+ io::SnapshotDelta* snapshot_delta, Context* on_finish);
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::QCOWFormat<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H
diff --git a/src/librbd/migration/RawFormat.cc b/src/librbd/migration/RawFormat.cc
new file mode 100644
index 000000000..0b655d368
--- /dev/null
+++ b/src/librbd/migration/RawFormat.cc
@@ -0,0 +1,235 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/RawFormat.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/SnapshotInterface.h"
+#include "librbd/migration/SourceSpecBuilder.h"
+#include "librbd/migration/Utils.h"
+
+namespace librbd {
+namespace migration {
+
+namespace {
+// key of the optional snapshot array inside the JSON object given to RawFormat
+static const std::string SNAPSHOTS_KEY {"snapshots"};
+
+
+} // anonymous namespace
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::RawFormat: " << this \
+ << " " << __func__ << ": "
+
+// Records the image context, a copy of the JSON object and the source-spec
+// builder; nothing is opened until open() is called.
+template <typename I>
+RawFormat<I>::RawFormat(
+    I* image_ctx,
+    const json_spirit::mObject& json_object,
+    const SourceSpecBuilder<I>* source_spec_builder)
+  : m_image_ctx(image_ctx),
+    m_json_object(json_object),
+    m_source_spec_builder(source_spec_builder) {
+}
+
+// Opens the raw source image.
+//
+// Builds a snapshot handler for the base image (stored under CEPH_NOSNAP)
+// and one for every entry of the optional "snapshots" JSON array (ids are
+// assigned 1, 2, ... in array order), then opens all of them, passing each
+// handler a pointer to the one opened before it. Every completion path goes
+// through handle_open(); malformed JSON completes with -EINVAL and builder
+// failures with the builder's error code.
+template <typename I>
+void RawFormat<I>::open(Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  on_finish = new LambdaContext([this, on_finish](int r) {
+    handle_open(r, on_finish); });
+
+  // treat the base image as a HEAD-revision snapshot
+  Snapshots snapshots;
+  int r = m_source_spec_builder->build_snapshot(m_json_object, CEPH_NOSNAP,
+                                                &snapshots[CEPH_NOSNAP]);
+  if (r < 0) {
+    lderr(cct) << "failed to build HEAD revision handler: " << cpp_strerror(r)
+               << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  auto& snapshots_val = m_json_object[SNAPSHOTS_KEY];
+  if (snapshots_val.type() == json_spirit::array_type) {
+    auto& snapshots_arr = snapshots_val.get_array();
+    for (auto& snapshot_val : snapshots_arr) {
+      // the HEAD entry is already in the map, so ids start at 1
+      uint64_t index = snapshots.size();
+      if (snapshot_val.type() != json_spirit::obj_type) {
+        // r still holds the last successful result here, so report the
+        // error code this path actually completes with
+        lderr(cct) << "invalid snapshot " << index << " JSON: "
+                   << cpp_strerror(-EINVAL) << dendl;
+        on_finish->complete(-EINVAL);
+        return;
+      }
+
+      auto& snapshot_obj = snapshot_val.get_obj();
+      r = m_source_spec_builder->build_snapshot(snapshot_obj, index,
+                                                &snapshots[index]);
+      if (r < 0) {
+        lderr(cct) << "failed to build snapshot " << index << " handler: "
+                   << cpp_strerror(r) << dendl;
+        on_finish->complete(r);
+        return;
+      }
+    }
+  } else if (snapshots_val.type() != json_spirit::null_type) {
+    lderr(cct) << "invalid snapshots array" << dendl;
+    on_finish->complete(-EINVAL);
+    return;
+  }
+
+  m_snapshots = std::move(snapshots);
+
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  SnapshotInterface* previous_snapshot = nullptr;
+  for (auto& [_, snapshot] : m_snapshots) {
+    snapshot->open(previous_snapshot, gather_ctx->new_sub());
+    previous_snapshot = snapshot.get();
+  }
+  gather_ctx->activate();
+}
+
+// Completion callback for open(). On failure every snapshot handler and the
+// image state are closed before the original error is reported to
+// on_finish; on success on_finish is completed with 0.
+template <typename I>
+void RawFormat<I>::handle_open(int r, Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << "r=" << r << dendl;
+
+  if (r >= 0) {
+    on_finish->complete(0);
+    return;
+  }
+
+  lderr(cct) << "failed to open raw image: " << cpp_strerror(r)
+             << dendl;
+
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  for (auto& entry : m_snapshots) {
+    entry.second->close(gather_ctx->new_sub());
+  }
+
+  // report the open error rather than the result of the close
+  auto close_ctx = new LambdaContext(
+    [r, sub_ctx=gather_ctx->new_sub()](int _) { sub_ctx->complete(r); });
+  m_image_ctx->state->close(close_ctx);
+
+  gather_ctx->activate();
+}
+
+// Closes every snapshot handler (including the HEAD entry); on_finish is
+// completed once all of them have finished.
+template <typename I>
+void RawFormat<I>::close(Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  auto gather_ctx = new C_Gather(cct, on_finish);
+  for (auto& entry : m_snapshots) {
+    entry.second->close(gather_ctx->new_sub());
+  }
+
+  gather_ctx->activate();
+}
+
+// Replaces the contents of snap_infos with the SnapInfo of every snapshot
+// handler except the HEAD entry and completes on_finish with 0.
+template <typename I>
+void RawFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  snap_infos->clear();
+  for (auto& [snap_id, snapshot] : m_snapshots) {
+    // the base image is tracked under CEPH_NOSNAP and is not a snapshot
+    if (snap_id != CEPH_NOSNAP) {
+      snap_infos->emplace(snap_id, snapshot->get_snap_info());
+    }
+  }
+  on_finish->complete(0);
+}
+
+// Stores the size reported by the handler registered for snap_id (HEAD is
+// registered under CEPH_NOSNAP) in *size. Completes on_finish with 0, or
+// with -ENOENT (leaving *size untouched) when the id is unknown.
+template <typename I>
+void RawFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size,
+                                  Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+
+  const auto snapshot_it = m_snapshots.find(snap_id);
+  if (snapshot_it != m_snapshots.end()) {
+    *size = snapshot_it->second->get_snap_info().size;
+    on_finish->complete(0);
+    return;
+  }
+
+  on_finish->complete(-ENOENT);
+}
+
+// Forwards the read to the handler registered for snap_id; an unknown id
+// fails aio_comp with -ENOENT. Always returns true.
+template <typename I>
+bool RawFormat<I>::read(
+    io::AioCompletion* aio_comp, uint64_t snap_id, io::Extents&& image_extents,
+    io::ReadResult&& read_result, int op_flags, int read_flags,
+    const ZTracer::Trace &parent_trace) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "snap_id=" << snap_id << ", "
+                 << "image_extents=" << image_extents << dendl;
+
+  const auto snapshot_it = m_snapshots.find(snap_id);
+  if (snapshot_it == m_snapshots.end()) {
+    aio_comp->fail(-ENOENT);
+  } else {
+    snapshot_it->second->read(aio_comp, std::move(image_extents),
+                              std::move(read_result), op_flags, read_flags,
+                              parent_trace);
+  }
+  return true;
+}
+
+// Builds snapshot_delta for image_extents: for every handler (in map order)
+// the shrunk region relative to the previous one is zeroed, then the handler
+// lists its own data/zeroed extents. handle_list_snaps() runs once all
+// handlers have completed.
+template <typename I>
+void RawFormat<I>::list_snaps(io::Extents&& image_extents,
+                              io::SnapIds&& snap_ids, int list_snaps_flags,
+                              io::SnapshotDelta* snapshot_delta,
+                              const ZTracer::Trace &parent_trace,
+                              Context* on_finish) {
+  const auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "image_extents=" << image_extents << dendl;
+
+  auto finish_ctx = new LambdaContext(
+    [this, snap_ids=std::move(snap_ids), snapshot_delta,
+     on_finish](int r) mutable {
+      handle_list_snaps(r, std::move(snap_ids), snapshot_delta, on_finish);
+    });
+
+  auto gather_ctx = new C_Gather(cct, finish_ctx);
+
+  // carries the size of the previously visited handler between iterations
+  std::optional<uint64_t> previous_size = std::nullopt;
+  for (auto& [id, handler] : m_snapshots) {
+    auto& extents = (*snapshot_delta)[{id, id}];
+
+    // zero out any space between the previous snapshot end and this
+    // snapshot's end
+    auto& info = handler->get_snap_info();
+    util::zero_shrunk_snapshot(cct, image_extents, id, info.size,
+                               &previous_size, &extents);
+
+    // build set of data/zeroed extents for the current snapshot
+    handler->list_snap(io::Extents{image_extents}, list_snaps_flags,
+                       &extents, parent_trace, gather_ctx->new_sub());
+  }
+
+  gather_ctx->activate();
+}
+
+// Completion callback for list_snaps(): merges the collected delta for the
+// requested snap ids and completes on_finish with r.
+template <typename I>
+void RawFormat<I>::handle_list_snaps(int r, io::SnapIds&& snap_ids,
+                                     io::SnapshotDelta* snapshot_delta,
+                                     Context* on_finish) {
+  auto cct = m_image_ctx->cct;
+  // log the delta's contents (as QCOWFormat does), not the pointer value
+  ldout(cct, 20) << "r=" << r << ", "
+                 << "snapshot_delta=" << *snapshot_delta << dendl;
+
+  util::merge_snapshot_delta(snap_ids, snapshot_delta);
+  on_finish->complete(r);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::RawFormat<librbd::ImageCtx>;
diff --git a/src/librbd/migration/RawFormat.h b/src/librbd/migration/RawFormat.h
new file mode 100644
index 000000000..a20c0814f
--- /dev/null
+++ b/src/librbd/migration/RawFormat.h
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H
+#define CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H
+
+#include "include/int_types.h"
+#include "librbd/Types.h"
+#include "librbd/migration/FormatInterface.h"
+#include "json_spirit/json_spirit.h"
+#include <map>
+#include <memory>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> struct SourceSpecBuilder;
+struct SnapshotInterface;
+
+// Migration format handler for raw images: keeps one SnapshotInterface
+// per snapshot id and implements FormatInterface on top of them.
+template <typename ImageCtxT>
+class RawFormat : public FormatInterface {
+public:
+  // heap-allocates a new instance; the caller takes ownership
+  static RawFormat* create(
+      ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+      const SourceSpecBuilder<ImageCtxT>* source_spec_builder) {
+    return new RawFormat(image_ctx, json_object, source_spec_builder);
+  }
+
+  RawFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+            const SourceSpecBuilder<ImageCtxT>* source_spec_builder);
+  RawFormat(const RawFormat&) = delete;
+  RawFormat& operator=(const RawFormat&) = delete;
+
+  // FormatInterface implementation
+  void open(Context* on_finish) override;
+  void close(Context* on_finish) override;
+  void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override;
+  void get_image_size(uint64_t snap_id, uint64_t* size,
+                      Context* on_finish) override;
+  bool read(io::AioCompletion* aio_comp, uint64_t snap_id,
+            io::Extents&& image_extents, io::ReadResult&& read_result,
+            int op_flags, int read_flags,
+            const ZTracer::Trace &parent_trace) override;
+  void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids,
+                  int list_snaps_flags, io::SnapshotDelta* snapshot_delta,
+                  const ZTracer::Trace &parent_trace,
+                  Context* on_finish) override;
+
+private:
+  using Snapshot = std::shared_ptr<SnapshotInterface>;
+  using Snapshots = std::map<uint64_t, Snapshot>;  // keyed by snapshot id
+
+  ImageCtxT* m_image_ctx;
+  json_spirit::mObject m_json_object;
+  const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder;
+  Snapshots m_snapshots;
+
+  // completion callbacks
+  void handle_open(int r, Context* on_finish);
+  void handle_list_snaps(int r, io::SnapIds&& snap_ids,
+                         io::SnapshotDelta* snapshot_delta, Context* on_finish);
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::RawFormat<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H
diff --git a/src/librbd/migration/RawSnapshot.cc b/src/librbd/migration/RawSnapshot.cc
new file mode 100644
index 000000000..4a83fd8ad
--- /dev/null
+++ b/src/librbd/migration/RawSnapshot.cc
@@ -0,0 +1,220 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/RawSnapshot.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/SourceSpecBuilder.h"
+#include "librbd/migration/StreamInterface.h"
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+const std::string NAME_KEY{"name"};
+
+} // anonymous namespace
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::RawSnapshot::OpenRequest " \
+ << this << " " << __func__ << ": "
+
+// Asynchronous helper behind RawSnapshot::open(): opens the stream and
+// queries its size; the request deletes itself once it has finished.
+template <typename I>
+struct RawSnapshot<I>::OpenRequest {
+  RawSnapshot* raw_snapshot;
+  Context* on_finish;
+
+  OpenRequest(RawSnapshot* raw_snapshot, Context* on_finish)
+    : raw_snapshot(raw_snapshot), on_finish(on_finish) {
+  }
+
+  void send() { open_stream(); }
+
+  void open_stream() {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << dendl;
+
+    auto ctx = util::create_context_callback<
+      OpenRequest, &OpenRequest::handle_open_stream>(this);
+    raw_snapshot->m_stream->open(ctx);
+  }
+
+  void handle_open_stream(int r) {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << "r=" << r << dendl;
+
+    if (r < 0) {
+      lderr(cct) << "failed to open stream: " << cpp_strerror(r) << dendl;
+      finish(r);
+      return;
+    }
+
+    get_image_size();
+  }
+
+  void get_image_size() {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << dendl;
+    // the stream size is written straight into the snapshot's info
+    auto ctx = util::create_context_callback<
+      OpenRequest, &OpenRequest::handle_get_image_size>(this);
+    raw_snapshot->m_stream->get_size(&raw_snapshot->m_snap_info.size, ctx);
+  }
+
+  void handle_get_image_size(int r) {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << "r=" << r << ", "
+                   << "image_size=" << raw_snapshot->m_snap_info.size << dendl;
+    // the stream is open at this point, so close it before reporting
+    if (r < 0) {
+      lderr(cct) << "failed to get image size: " << cpp_strerror(r) << dendl;
+      close_stream(r);
+      return;
+    }
+
+    finish(0);
+  }
+
+  void close_stream(int r) {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << dendl;
+    // report the original error r; the result of close() is ignored
+    auto ctx = new LambdaContext([this, r](int) {
+      handle_close_stream(r);
+    });
+    raw_snapshot->m_stream->close(ctx);
+  }
+
+  void handle_close_stream(int r) {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << "r=" << r << dendl;
+    // with the stream dropped, RawSnapshot::close() completes immediately
+    raw_snapshot->m_stream.reset();
+
+    finish(r);
+  }
+
+  void finish(int r) {
+    auto cct = raw_snapshot->m_image_ctx->cct;
+    ldout(cct, 10) << "r=" << r << dendl;
+    // the request owns itself: it must not be touched after this point
+    on_finish->complete(r);
+    delete this;
+  }
+};
+
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::RawSnapshot: " << this \
+ << " " << __func__ << ": "
+
+template <typename I>
+RawSnapshot<I>::RawSnapshot(I* image_ctx,
+                            const json_spirit::mObject& json_object,
+                            const SourceSpecBuilder<I>* source_spec_builder,
+                            uint64_t index)
+  : m_image_ctx(image_ctx), m_json_object(json_object),
+    m_source_spec_builder(source_spec_builder), m_index(index),
+    m_snap_info({}, {}, 0, {}, 0, 0, {}) {
+  // snapshot metadata starts out empty; open() fills in the name and size
+  ldout(m_image_ctx->cct, 10) << dendl;
+}
+
+template <typename I>
+void RawSnapshot<I>::open(SnapshotInterface* previous_snapshot,
+                          Context* on_finish) {
+  // Resolves the snapshot name, builds the stream from the JSON spec and
+  // starts an OpenRequest; previous_snapshot is not used by this format.
+  auto cct = m_image_ctx->cct;
+
+  // special-case for treating the HEAD revision as a snapshot
+  if (m_index != CEPH_NOSNAP) {
+    auto& name_val = m_json_object[NAME_KEY];
+    if (name_val.type() == json_spirit::str_type) {
+      m_snap_info.name = name_val.get_str();
+    } else if (name_val.type() == json_spirit::null_type) {
+      // null name: fall back to a randomly generated UUID
+      uuid_d uuid_gen;
+      uuid_gen.generate_random();
+      m_snap_info.name = uuid_gen.to_string();
+    } else {
+      lderr(cct) << "invalid snapshot name" << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+  }
+  ldout(cct, 10) << "name=" << m_snap_info.name << dendl;
+
+  int r = m_source_spec_builder->build_stream(m_json_object, &m_stream);
+  if (r < 0) {
+    lderr(cct) << "failed to build migration stream handler: "
+               << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+  auto req = new OpenRequest(this, on_finish);  // deletes itself when done
+  req->send();
+}
+
+template <typename I>
+void RawSnapshot<I>::close(Context* on_finish) {
+  ldout(m_image_ctx->cct, 10) << dendl;
+
+  // the stream is absent if it was never built or has already been reset
+  if (m_stream) {
+    m_stream->close(on_finish);
+    return;
+  }
+
+  on_finish->complete(0);
+}
+
+template <typename I>
+void RawSnapshot<I>::read(io::AioCompletion* aio_comp,
+                          io::Extents&& image_extents,
+                          io::ReadResult&& read_result, int op_flags,
+                          int read_flags,
+                          const ZTracer::Trace &parent_trace) {
+  ldout(m_image_ctx->cct, 20) << "image_extents=" << image_extents << dendl;
+  // hand the destination over to the completion before issuing the read
+  auto& result = aio_comp->read_result;
+  result = std::move(read_result);
+  result.set_image_extents(image_extents);
+
+  // a single stream read serves the whole request
+  aio_comp->set_request_count(1);
+  auto read_ctx = new io::ReadResult::C_ImageReadRequest(
+    aio_comp, 0, image_extents);
+  // raw directly maps the image-extent IO down to a byte IO extent
+  m_stream->read(std::move(image_extents), &read_ctx->bl, read_ctx);
+}
+
+template <typename I>
+void RawSnapshot<I>::list_snap(io::Extents&& image_extents,
+                               int list_snaps_flags,
+                               io::SparseExtents* sparse_extents,
+                               const ZTracer::Trace &parent_trace,
+                               Context* on_finish) {
+  ldout(m_image_ctx->cct, 20) << "image_extents=" << image_extents << dendl;
+
+  // raw does not support sparse extents: report each full extent as data
+  for (auto& extent : image_extents) {
+    const auto offset = extent.first;
+    const auto length = extent.second;
+    sparse_extents->insert(offset, length,
+                           {io::SPARSE_EXTENT_STATE_DATA, length});
+  }
+  on_finish->complete(0);
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::RawSnapshot<librbd::ImageCtx>;
diff --git a/src/librbd/migration/RawSnapshot.h b/src/librbd/migration/RawSnapshot.h
new file mode 100644
index 000000000..9f76d6878
--- /dev/null
+++ b/src/librbd/migration/RawSnapshot.h
@@ -0,0 +1,75 @@
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H
+#define CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/Types.h"
+#include "librbd/io/Types.h"
+#include "librbd/migration/SnapshotInterface.h"
+#include "json_spirit/json_spirit.h"
+#include <memory>
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> struct SourceSpecBuilder;
+struct StreamInterface;
+
+// Raw migration snapshot: exposes a single stream as the data of one
+// snapshot (or of the HEAD revision, when index is CEPH_NOSNAP).
+template <typename ImageCtxT>
+class RawSnapshot : public SnapshotInterface {
+public:
+  // image_ctx uses the template's image context type, like the constructor
+  static RawSnapshot* create(
+      ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+      const SourceSpecBuilder<ImageCtxT>* source_spec_builder, uint64_t index) {
+    return new RawSnapshot(image_ctx, json_object, source_spec_builder, index);
+  }
+
+  RawSnapshot(ImageCtxT* image_ctx, const json_spirit::mObject& json_object,
+              const SourceSpecBuilder<ImageCtxT>* source_spec_builder,
+              uint64_t index);
+  RawSnapshot(const RawSnapshot&) = delete;
+  RawSnapshot& operator=(const RawSnapshot&) = delete;
+
+  void open(SnapshotInterface* previous_snapshot, Context* on_finish) override;
+  void close(Context* on_finish) override;
+
+  const SnapInfo& get_snap_info() const override {
+    return m_snap_info;
+  }
+
+  void read(io::AioCompletion* aio_comp, io::Extents&& image_extents,
+            io::ReadResult&& read_result, int op_flags, int read_flags,
+            const ZTracer::Trace &parent_trace) override;
+
+  void list_snap(io::Extents&& image_extents, int list_snaps_flags,
+                 io::SparseExtents* sparse_extents,
+                 const ZTracer::Trace &parent_trace,
+                 Context* on_finish) override;
+
+private:
+  struct OpenRequest;
+
+  ImageCtxT* m_image_ctx;
+  json_spirit::mObject m_json_object;
+  const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder;
+  uint64_t m_index = 0;
+  SnapInfo m_snap_info;  // name set by open(), size by OpenRequest
+  std::shared_ptr<StreamInterface> m_stream;  // built by open()
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::RawSnapshot<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H
diff --git a/src/librbd/migration/S3Stream.cc b/src/librbd/migration/S3Stream.cc
new file mode 100644
index 000000000..3b4db0cef
--- /dev/null
+++ b/src/librbd/migration/S3Stream.cc
@@ -0,0 +1,202 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/S3Stream.h"
+#include "common/armor.h"
+#include "common/ceph_crypto.h"
+#include "common/ceph_time.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "librbd/AsioEngine.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/asio/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/migration/HttpClient.h"
+#include "librbd/migration/HttpProcessorInterface.h"
+#include <boost/beast/http.hpp>
+
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include <fmt/chrono.h>
+#include <fmt/format.h>
+
+#include <time.h>
+
+namespace librbd {
+namespace migration {
+
+using HttpRequest = boost::beast::http::request<boost::beast::http::empty_body>;
+
+namespace {
+
+const std::string URL_KEY {"url"};
+const std::string ACCESS_KEY {"access_key"};
+const std::string SECRET_KEY {"secret_key"};
+
+} // anonymous namespace
+
+// forwards each outgoing HTTP request to the owning stream for signing
+template <typename I>
+struct S3Stream<I>::HttpProcessor : public HttpProcessorInterface {
+  S3Stream* s3stream;
+
+  HttpProcessor(S3Stream* owner) : s3stream(owner) {}
+
+  void process_request(EmptyRequest& request) override {
+    s3stream->process_request(request);
+  }
+};
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::S3Stream: " << this \
+ << " " << __func__ << ": "
+
+// the HTTP processor keeps a back-pointer to this stream for request signing
+template <typename I>
+S3Stream<I>::S3Stream(I* image_ctx, const json_spirit::mObject& json_object)
+  : m_image_ctx(image_ctx), m_cct(image_ctx->cct),
+    m_asio_engine(image_ctx->asio_engine), m_json_object(json_object),
+    m_http_processor(std::make_unique<HttpProcessor>(this)) {}
+
+// defined out-of-line, where HttpProcessor is a complete type
+template <typename I>
+S3Stream<I>::~S3Stream() = default;
+
+template <typename I>
+void S3Stream<I>::open(Context* on_finish) {
+  // Validates the stream spec, resolves both credentials and then opens
+  // the HTTP client; failures complete on_finish with a negative errno.
+
+  // every key must hold a JSON string (checked in this order)
+  for (auto key : {&URL_KEY, &ACCESS_KEY, &SECRET_KEY}) {
+    if (m_json_object[*key].type() != json_spirit::str_type) {
+      lderr(m_cct) << "failed to locate '" << *key << "' key" << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+  }
+
+  // the spec is well-formed: copy out the URL
+  m_url = m_json_object[URL_KEY].get_str();
+
+  // handle used for the config-key lookups below
+  librados::Rados rados(m_image_ctx->md_ctx);
+
+  // a credential is either given inline or as a config-key URI; a URI is
+  // replaced in place by whatever get_config_key() stores for it
+  auto resolve = [&rados](std::string* credential) -> int {
+    if (!util::is_config_key_uri(*credential)) {
+      return 0;
+    }
+    return util::get_config_key(rados, *credential, credential);
+  };
+
+  m_access_key = m_json_object[ACCESS_KEY].get_str();
+  int r = resolve(&m_access_key);
+  if (r < 0) {
+    lderr(m_cct) << "failed to retrieve access key from config: "
+                 << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  m_secret_key = m_json_object[SECRET_KEY].get_str();
+  r = resolve(&m_secret_key);
+  if (r < 0) {
+    lderr(m_cct) << "failed to retrieve secret key from config: "
+                 << cpp_strerror(r) << dendl;
+    on_finish->complete(r);
+    return;
+  }
+
+  // only the access key is logged here, not the secret key
+  ldout(m_cct, 10) << "url=" << m_url << ", "
+                   << "access_key=" << m_access_key << dendl;
+
+  // hook up request signing, then open the client
+  m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url));
+  m_http_client->set_http_processor(m_http_processor.get());
+  m_http_client->open(on_finish);
+}
+
+template <typename I>
+void S3Stream<I>::close(Context* on_finish) {
+  ldout(m_cct, 10) << dendl;
+
+  // the client is only created near the end of open()
+  if (m_http_client) {
+    m_http_client->close(on_finish);
+    return;
+  }
+  on_finish->complete(0);
+}
+
+template <typename I>
+void S3Stream<I>::get_size(uint64_t* size, Context* on_finish) {
+ ldout(m_cct, 10) << dendl;
+ // forwarded as-is; m_http_client is used without a null check
+ m_http_client->get_size(size, on_finish);
+}
+
+template <typename I>
+void S3Stream<I>::read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) {
+ ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl;
+ // forwarded unchanged to the HTTP client created by open()
+ m_http_client->read(std::move(byte_extents), data, on_finish);
+}
+
+template <typename I>
+void S3Stream<I>::process_request(HttpRequest& http_request) {
+ ldout(m_cct, 20) << dendl;
+ // signs the request in place by setting its Date and Authorization headers
+ // format RFC 1123 date/time
+ auto time = ceph::real_clock::to_time_t(ceph::real_clock::now());
+ struct tm timeInfo;
+ gmtime_r(&time, &timeInfo);
+
+ std::string date = fmt::format("{:%a, %d %b %Y %H:%M:%S %z}", timeInfo);
+ http_request.set(boost::beast::http::field::date, date);
+
+ // note: we don't support S3 subresources
+ std::string canonicalized_resource = std::string(http_request.target());
+ // layout: method, two empty lines, date, canonicalized resource
+ std::string string_to_sign = fmt::format(
+ "{}\n\n\n{}\n{}",
+ std::string(boost::beast::http::to_string(http_request.method())),
+ date, canonicalized_resource);
+
+ // create HMAC-SHA1 signature from secret key + string-to-sign
+ sha1_digest_t digest;
+ ceph::crypto::HMACSHA1 hmac(
+ reinterpret_cast<const unsigned char*>(m_secret_key.data()),
+ m_secret_key.size());
+ hmac.Update(reinterpret_cast<const unsigned char*>(string_to_sign.data()),
+ string_to_sign.size());
+ hmac.Final(reinterpret_cast<unsigned char*>(digest.v));
+
+ // base64 encode the result
+ char buf[64];
+ int r = ceph_armor(std::begin(buf), std::begin(buf) + sizeof(buf),
+ reinterpret_cast<const char *>(digest.v),
+ reinterpret_cast<const char *>(digest.v + digest.SIZE));
+ if (r < 0) {
+ ceph_abort("ceph_armor failed");
+ }
+
+ // store the access-key + signature in the HTTP authorization header
+ std::string signature = std::string(std::begin(buf), std::begin(buf) + r);
+ std::string authorization = fmt::format("AWS {}:{}", m_access_key, signature);
+ http_request.set(boost::beast::http::field::authorization, authorization);
+ // NOTE(review): the complete Authorization header is logged at level 20
+ ldout(m_cct, 20) << "string_to_sign=" << string_to_sign << ", "
+ << "authorization=" << authorization << dendl;
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::S3Stream<librbd::ImageCtx>;
diff --git a/src/librbd/migration/S3Stream.h b/src/librbd/migration/S3Stream.h
new file mode 100644
index 000000000..586b21787
--- /dev/null
+++ b/src/librbd/migration/S3Stream.h
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_S3_STREAM_H
+#define CEPH_LIBRBD_MIGRATION_S3_STREAM_H
+
+#include "include/int_types.h"
+#include "librbd/migration/StreamInterface.h"
+#include <boost/beast/http/empty_body.hpp>
+#include <boost/beast/http/message.hpp>
+#include <boost/beast/http/string_body.hpp>
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct AsioEngine;
+struct ImageCtx;
+
+namespace migration {
+
+template <typename> class HttpClient;
+
+template <typename ImageCtxT>
+class S3Stream : public StreamInterface {
+public:
+ static S3Stream* create(ImageCtxT* image_ctx,
+ const json_spirit::mObject& json_object) {
+ return new S3Stream(image_ctx, json_object);
+ }
+
+ S3Stream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object);
+ ~S3Stream() override;
+
+ S3Stream(const S3Stream&) = delete;
+ S3Stream& operator=(const S3Stream&) = delete;
+ // StreamInterface implementation
+ void open(Context* on_finish) override;
+ void close(Context* on_finish) override;
+
+ void get_size(uint64_t* size, Context* on_finish) override;
+
+ void read(io::Extents&& byte_extents, bufferlist* data,
+ Context* on_finish) override;
+
+private:
+ using HttpRequest = boost::beast::http::request<
+ boost::beast::http::empty_body>;
+ using HttpResponse = boost::beast::http::response<
+ boost::beast::http::string_body>;
+ // forwards HttpProcessorInterface callbacks to process_request()
+ struct HttpProcessor;
+
+ ImageCtxT* m_image_ctx;
+ CephContext* m_cct;
+ std::shared_ptr<AsioEngine> m_asio_engine;
+ json_spirit::mObject m_json_object;
+ // populated by open() from the JSON spec
+ std::string m_url;
+ std::string m_access_key;
+ std::string m_secret_key;
+
+ std::unique_ptr<HttpProcessor> m_http_processor;
+ std::unique_ptr<HttpClient<ImageCtxT>> m_http_client;
+ // adds the Date and Authorization headers to an outgoing request
+ void process_request(HttpRequest& http_request);
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::S3Stream<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_S3_STREAM_H
diff --git a/src/librbd/migration/SnapshotInterface.h b/src/librbd/migration/SnapshotInterface.h
new file mode 100644
index 000000000..9990802c5
--- /dev/null
+++ b/src/librbd/migration/SnapshotInterface.h
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "common/zipkin_trace.h"
+#include "librbd/Types.h"
+#include "librbd/io/Types.h"
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+namespace io {
+struct AioCompletion;
+struct ReadResult;
+} // namespace io
+
+namespace migration {
+
+// abstract handler for one snapshot of a migration source
+struct SnapshotInterface {
+  virtual ~SnapshotInterface() = default;
+
+  // lifecycle hooks; the outcome is delivered through on_finish
+  virtual void open(SnapshotInterface* previous_snapshot,
+                    Context* on_finish) = 0;
+  virtual void close(Context* on_finish) = 0;
+
+  virtual const SnapInfo& get_snap_info() const = 0;
+
+  virtual void read(io::AioCompletion* aio_comp, io::Extents&& image_extents,
+                    io::ReadResult&& read_result, int op_flags, int read_flags,
+                    const ZTracer::Trace &parent_trace) = 0;
+  virtual void list_snap(io::Extents&& image_extents, int list_snaps_flags,
+                         io::SparseExtents* sparse_extents,
+                         const ZTracer::Trace &parent_trace,
+                         Context* on_finish) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H
diff --git a/src/librbd/migration/SourceSpecBuilder.cc b/src/librbd/migration/SourceSpecBuilder.cc
new file mode 100644
index 000000000..214d7ce0e
--- /dev/null
+++ b/src/librbd/migration/SourceSpecBuilder.cc
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/SourceSpecBuilder.h"
+#include "common/dout.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/migration/FileStream.h"
+#include "librbd/migration/HttpStream.h"
+#include "librbd/migration/S3Stream.h"
+#include "librbd/migration/NativeFormat.h"
+#include "librbd/migration/QCOWFormat.h"
+#include "librbd/migration/RawFormat.h"
+#include "librbd/migration/RawSnapshot.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::SourceSpecBuilder: " << this \
+ << " " << __func__ << ": "
+
+namespace librbd {
+namespace migration {
+
+namespace {
+
+const std::string STREAM_KEY{"stream"};
+const std::string TYPE_KEY{"type"};
+
+} // anonymous namespace
+
+template <typename I>
+int SourceSpecBuilder<I>::parse_source_spec(
+    const std::string& source_spec,
+    json_spirit::mObject* source_spec_object) const {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+  json_spirit::mValue root;
+  bool valid = json_spirit::read(source_spec, root);
+  try {
+    // get_obj() may throw (presumably when the root is not an object)
+    if (valid) *source_spec_object = root.get_obj();
+  } catch (std::runtime_error&) {
+    valid = false;
+  }
+  if (valid) return 0;
+
+  lderr(cct) << "invalid source-spec JSON" << dendl;
+  return -EBADMSG;
+}
+
+template <typename I>
+int SourceSpecBuilder<I>::build_format(
+    const json_spirit::mObject& source_spec_object, bool import_only,
+    std::unique_ptr<FormatInterface>* format) const {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+  // the format handler is selected by a string-typed "type" entry
+  auto it = source_spec_object.find(TYPE_KEY);
+  if (!(it != source_spec_object.end() &&
+        it->second.type() == json_spirit::str_type)) {
+    lderr(cct) << "failed to locate format type value" << dendl;
+    return -EINVAL;
+  }
+
+  const auto& type = it->second.get_str();
+  if (type == "native") {
+    format->reset(NativeFormat<I>::create(m_image_ctx, source_spec_object,
+                                          import_only));
+  } else if (type == "qcow") {
+    format->reset(QCOWFormat<I>::create(m_image_ctx, source_spec_object, this));
+  } else if (type == "raw") {
+    format->reset(RawFormat<I>::create(m_image_ctx, source_spec_object, this));
+  } else {
+    lderr(cct) << "unknown or unsupported format type '" << type << "'"
+               << dendl;
+    return -ENOSYS;
+  }
+  return 0;
+}
+
+template <typename I>
+int SourceSpecBuilder<I>::build_snapshot(
+    const json_spirit::mObject& source_spec_object, uint64_t index,
+    std::shared_ptr<SnapshotInterface>* snapshot) const {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+  // the handler is selected by a string-typed "type" entry (else -EINVAL)
+  auto type_value_it = source_spec_object.find(TYPE_KEY);
+  if (type_value_it == source_spec_object.end() ||
+      type_value_it->second.type() != json_spirit::str_type) {
+    lderr(cct) << "failed to locate snapshot type value" << dendl;
+    return -EINVAL;
+  }
+  // "raw" is the only snapshot type handled here; others yield -ENOSYS
+  auto& type = type_value_it->second.get_str();
+  if (type == "raw") {
+    snapshot->reset(RawSnapshot<I>::create(m_image_ctx, source_spec_object,
+                                           this, index));
+  } else {
+    lderr(cct) << "unknown or unsupported snapshot type '" << type << "'"
+               << dendl;
+    return -ENOSYS;
+  }
+  return 0;
+}
+
+template <typename I>
+int SourceSpecBuilder<I>::build_stream(
+    const json_spirit::mObject& source_spec_object,
+    std::shared_ptr<StreamInterface>* stream) const {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 10) << dendl;
+  // the stream is described by a nested JSON object under "stream"
+  auto stream_it = source_spec_object.find(STREAM_KEY);
+  if (!(stream_it != source_spec_object.end() &&
+        stream_it->second.type() == json_spirit::obj_type)) {
+    lderr(cct) << "failed to locate stream object" << dendl;
+    return -EINVAL;
+  }
+
+  // that object names its implementation through a string "type" entry
+  const auto& spec = stream_it->second.get_obj();
+  auto type_it = spec.find(TYPE_KEY);
+  if (!(type_it != spec.end() &&
+        type_it->second.type() == json_spirit::str_type)) {
+    lderr(cct) << "failed to locate stream type value" << dendl;
+    return -EINVAL;
+  }
+
+  const auto& type = type_it->second.get_str();
+  if (type == "file") {
+    stream->reset(FileStream<I>::create(m_image_ctx, spec));
+  } else if (type == "http") {
+    stream->reset(HttpStream<I>::create(m_image_ctx, spec));
+  } else if (type == "s3") {
+    stream->reset(S3Stream<I>::create(m_image_ctx, spec));
+  } else {
+    lderr(cct) << "unknown or unsupported stream type '" << type << "'"
+               << dendl;
+    return -ENOSYS;
+  }
+  return 0;
+}
+
+} // namespace migration
+} // namespace librbd
+
+template class librbd::migration::SourceSpecBuilder<librbd::ImageCtx>;
diff --git a/src/librbd/migration/SourceSpecBuilder.h b/src/librbd/migration/SourceSpecBuilder.h
new file mode 100644
index 000000000..191cb1cbd
--- /dev/null
+++ b/src/librbd/migration/SourceSpecBuilder.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H
+#define CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H
+
+#include "include/int_types.h"
+#include <json_spirit/json_spirit.h>
+#include <memory>
+#include <optional>
+#include <string>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace migration {
+
+struct FormatInterface;
+struct SnapshotInterface;
+struct StreamInterface;
+
+template <typename ImageCtxT>
+class SourceSpecBuilder {
+public:
+ SourceSpecBuilder(ImageCtxT* image_ctx) : m_image_ctx(image_ctx) {
+ }
+ // parses source_spec as a JSON object; returns 0 or -EBADMSG
+ int parse_source_spec(const std::string& source_spec,
+ json_spirit::mObject* source_spec_object) const;
+ // picks a format handler by the "type" value; -EINVAL / -ENOSYS on error
+ int build_format(const json_spirit::mObject& format_object, bool import_only,
+ std::unique_ptr<FormatInterface>* format) const;
+ // picks a snapshot handler by the "type" value; -EINVAL / -ENOSYS on error
+ int build_snapshot(const json_spirit::mObject& source_spec_object,
+ uint64_t index,
+ std::shared_ptr<SnapshotInterface>* snapshot) const;
+ // builds the stream handler described by the nested "stream" object
+ int build_stream(const json_spirit::mObject& source_spec_object,
+ std::shared_ptr<StreamInterface>* stream) const;
+
+private:
+ ImageCtxT* m_image_ctx;
+
+};
+
+} // namespace migration
+} // namespace librbd
+
+extern template class librbd::migration::SourceSpecBuilder<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H
diff --git a/src/librbd/migration/StreamInterface.h b/src/librbd/migration/StreamInterface.h
new file mode 100644
index 000000000..782a9a5f8
--- /dev/null
+++ b/src/librbd/migration/StreamInterface.h
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+#define CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
+
+#include "include/buffer_fwd.h"
+#include "include/int_types.h"
+#include "librbd/io/Types.h"
+
+struct Context;
+
+namespace librbd {
+namespace migration {
+
+// abstract byte stream that backs a migration source
+struct StreamInterface {
+  virtual ~StreamInterface() = default;
+
+  virtual void open(Context* on_finish) = 0;
+  virtual void close(Context* on_finish) = 0;
+  // reports the stream's size through *size
+  virtual void get_size(uint64_t* size, Context* on_finish) = 0;
+
+  virtual void read(io::Extents&& byte_extents, bufferlist* data,
+                    Context* on_finish) = 0;
+};
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H
diff --git a/src/librbd/migration/Types.h b/src/librbd/migration/Types.h
new file mode 100644
index 000000000..244dc28b7
--- /dev/null
+++ b/src/librbd/migration/Types.h
@@ -0,0 +1,42 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_TYPES_H
+#define CEPH_LIBRBD_MIGRATION_TYPES_H
+
+#include <string>
+#include <utility>
+
+namespace librbd {
+namespace migration {
+
+enum UrlScheme {
+ URL_SCHEME_HTTP, // plain "http" URLs (also the default for UrlSpec)
+ URL_SCHEME_HTTPS, // "https" URLs
+};
+
+// parsed components of a URL; the defaults are HTTP, port "80", path "/"
+struct UrlSpec {
+  UrlScheme scheme = URL_SCHEME_HTTP;
+  std::string host;
+  std::string port = "80";
+  std::string path = "/";
+
+  UrlSpec() = default;
+  UrlSpec(UrlScheme scheme, const std::string& host, const std::string& port,
+          const std::string& path)
+    : scheme(scheme), host(host), port(port), path(path) {
+  }
+};
+
+inline bool operator==(const UrlSpec& lhs, const UrlSpec& rhs) {
+  // two specs are equal when every component matches
+  const bool same_endpoint = (lhs.scheme == rhs.scheme &&
+                              lhs.host == rhs.host && lhs.port == rhs.port);
+  return same_endpoint && lhs.path == rhs.path;
+}
+
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_TYPES_H
diff --git a/src/librbd/migration/Utils.cc b/src/librbd/migration/Utils.cc
new file mode 100644
index 000000000..c5c1279d8
--- /dev/null
+++ b/src/librbd/migration/Utils.cc
@@ -0,0 +1,133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/migration/Utils.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include <boost/lexical_cast.hpp>
+#include <regex>
+
+namespace librbd {
+namespace migration {
+namespace util {
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::migration::util::" << __func__ << ": "
+
+int parse_url(CephContext* cct, const std::string& url, UrlSpec* url_spec) {
+  ldout(cct, 10) << "url=" << url << dendl;
+  *url_spec = UrlSpec{};  // start from the defaults
+  // capture groups: 1=scheme, 2=user, 3=password, 4=host, 5=port, 6=path,
+  // 7=parameters, 8=query, 9=fragment
+  const std::regex pattern(
+    R"(^(?:([^:/]*)://)?(?:(\w+)(?::(\w+))?@)?([^/;\?:#]+)(?::([^/;\?#]+))?)"
+    R"((?:/([^;\?#]*))?(?:;([^\?#]+))?(?:\?([^#]+))?(?:#(\w+))?$)");
+  std::smatch parts;
+  if (!std::regex_match(url, parts, pattern)) {
+    lderr(cct) << "invalid url: '" << url << "'" << dendl;
+    return -EINVAL;
+  }
+
+  // a missing scheme is treated as plain HTTP
+  const auto& scheme = parts[1];
+  if (scheme == "https") {
+    url_spec->scheme = URL_SCHEME_HTTPS;
+    url_spec->port = "443";
+  } else if (scheme == "http" || scheme == "") {
+    url_spec->scheme = URL_SCHEME_HTTP;
+  } else {
+    lderr(cct) << "invalid url scheme: '" << url << "'" << dendl;
+    return -EINVAL;
+  }
+  url_spec->host = parts[4];
+  const auto& port = parts[5];
+  if (port.matched) {
+    // the conversion result is discarded; it only validates the port
+    try {
+      boost::lexical_cast<uint16_t>(port);
+    } catch (boost::bad_lexical_cast&) {
+      lderr(cct) << "invalid url port: '" << url << "'" << dendl;
+      return -EINVAL;
+    }
+    url_spec->port = port;
+  }
+  // the default path is "/", so the matched remainder is appended to it
+  const auto& path = parts[6];
+  if (path.matched) {
+    url_spec->path += path;
+  }
+  return 0;
+}
+
+void zero_shrunk_snapshot(CephContext* cct, const io::Extents& image_extents,
+                          uint64_t snap_id, uint64_t new_size,
+                          std::optional<uint64_t> *previous_size,
+                          io::SparseExtents* sparse_extents) {
+  // remember the size for the next snapshot before bailing out early
+  const std::optional<uint64_t> old_size = *previous_size;
+  *previous_size = new_size;
+  if (!old_size || *old_size <= new_size) return;
+  ldout(cct, 20) << "snapshot resize " << *old_size << " -> "
+                 << new_size << dendl;
+  interval_set<uint64_t> truncated;
+  truncated.insert(new_size, *old_size - new_size);
+  for (const auto& extent : image_extents) {
+    // clip the requested extent to the truncated tail
+    interval_set<uint64_t> clipped;
+    clipped.insert(extent.first, extent.second);
+    clipped.intersection_of(truncated);
+    for (auto [offset, length] : clipped) {
+      ldout(cct, 20) << "zeroing extent " << offset << "~"
+                     << length << " at snapshot " << snap_id << dendl;
+      sparse_extents->insert(offset, length,
+                             {io::SPARSE_EXTENT_STATE_ZEROED, length});
+    }
+  }
+}
+
+void merge_snapshot_delta(const io::SnapIds& snap_ids,
+ io::SnapshotDelta* snapshot_delta) {
+ io::SnapshotDelta orig_snapshot_delta = std::move(*snapshot_delta);
+ snapshot_delta->clear();
+ // snap_ids must be non-empty; presumably sorted ascending -- TODO confirm
+ auto snap_id_it = snap_ids.begin();
+ ceph_assert(snap_id_it != snap_ids.end());
+
+ // merge any snapshot intervals that were not requested
+ std::list<io::SparseExtents*> pending_sparse_extents;
+ for (auto& [snap_key, sparse_extents] : orig_snapshot_delta) {
+ // advance to next valid requested snap id
+ while (snap_id_it != snap_ids.end() && *snap_id_it < snap_key.first) {
+ ++snap_id_it;
+ }
+ if (snap_id_it == snap_ids.end()) {
+ break;
+ }
+
+ // loop through older write/read snapshot sparse extents to remove any
+ // overlaps with the current sparse extent
+ for (auto prev_sparse_extents : pending_sparse_extents) {
+ for (auto& sparse_extent : sparse_extents) {
+ prev_sparse_extents->erase(sparse_extent.get_off(),
+ sparse_extent.get_len());
+ }
+ }
+ // re-key the entry under the requested snap id it was advanced to
+ auto write_read_snap_ids = std::make_pair(*snap_id_it, snap_key.second);
+ (*snapshot_delta)[write_read_snap_ids] = std::move(sparse_extents);
+
+ if (write_read_snap_ids.first > snap_key.first) {
+ // the current snapshot wasn't requested so it might need to get
+ // merged with a later snapshot
+ pending_sparse_extents.push_back(&(*snapshot_delta)[write_read_snap_ids]);
+ } else {
+ // we don't merge results past a valid requested snapshot
+ pending_sparse_extents.clear();
+ }
+ }
+}
+
+} // namespace util
+} // namespace migration
+} // namespace librbd
diff --git a/src/librbd/migration/Utils.h b/src/librbd/migration/Utils.h
new file mode 100644
index 000000000..afbadde7d
--- /dev/null
+++ b/src/librbd/migration/Utils.h
@@ -0,0 +1,30 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIGRATION_UTILS_H
+#define CEPH_LIBRBD_MIGRATION_UTILS_H
+
+#include "include/common_fwd.h"
+#include "librbd/io/Types.h"
+#include "librbd/migration/Types.h"
+#include <optional>
+#include <string>
+
+namespace librbd {
+namespace migration {
+namespace util {
+
+int parse_url(CephContext* cct, const std::string& url, UrlSpec* url_spec);
+
+void zero_shrunk_snapshot(CephContext* cct, const io::Extents& image_extents,
+ uint64_t snap_id, uint64_t new_size,
+ std::optional<uint64_t> *previous_size,
+ io::SparseExtents* sparse_extents);
+void merge_snapshot_delta(const io::SnapIds& snap_ids,
+ io::SnapshotDelta* snapshot_delta);
+
+} // namespace util
+} // namespace migration
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_MIGRATION_UTILS_H