diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/librbd/migration | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
30 files changed, 6277 insertions, 0 deletions
diff --git a/src/librbd/migration/FileStream.cc b/src/librbd/migration/FileStream.cc new file mode 100644 index 000000000..63cd722dd --- /dev/null +++ b/src/librbd/migration/FileStream.cc @@ -0,0 +1,232 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif // _LARGEFILE64_SOURCE + +#include "librbd/migration/FileStream.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/asio/Utils.h" +#include <boost/asio/buffer.hpp> +#include <boost/asio/post.hpp> +#include <boost/asio/read.hpp> +#include <fcntl.h> +#include <unistd.h> + +namespace librbd { +namespace migration { + +namespace { + +const std::string FILE_PATH {"file_path"}; + +} // anonymous namespace + +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::FileStream::ReadRequest " \ + << this << " " << __func__ << ": " + +template <typename I> +struct FileStream<I>::ReadRequest { + FileStream* file_stream; + io::Extents byte_extents; + bufferlist* data; + Context* on_finish; + size_t index = 0; + + ReadRequest(FileStream* file_stream, io::Extents&& byte_extents, + bufferlist* data, Context* on_finish) + : file_stream(file_stream), byte_extents(std::move(byte_extents)), + data(data), on_finish(on_finish) { + auto cct = file_stream->m_cct; + ldout(cct, 20) << dendl; + } + + void send() { + data->clear(); + read(); + } + + void read() { + auto cct = file_stream->m_cct; + if (index >= byte_extents.size()) { + finish(0); + return; + } + + auto& byte_extent = byte_extents[index++]; + ldout(cct, 20) << "byte_extent=" << byte_extent << dendl; + + auto ptr = buffer::ptr_node::create(buffer::create_small_page_aligned( + byte_extent.second)); + auto buffer = boost::asio::mutable_buffer( + ptr->c_str(), byte_extent.second); + data->push_back(std::move(ptr)); + + int r; + auto offset = lseek64(file_stream->m_file_no, byte_extent.first, SEEK_SET); + if (offset == -1) { + r = -errno; + lderr(cct) << "failed to seek file stream: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + boost::system::error_code ec; + size_t bytes_read = boost::asio::read( + *file_stream->m_stream_descriptor, std::move(buffer), ec); + r = -ec.value(); + if (r < 0 && r != -ENOENT) { + lderr(cct) << "failed to read from file stream: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } else if (bytes_read < byte_extent.second) { + lderr(cct) << "failed to read " << byte_extent.second << " bytes from " + << "file stream" << dendl; + finish(-ERANGE); + return; + } + + // re-queue the remainder of the read requests + boost::asio::post(file_stream->m_strand, [this]() { read(); }); + } + + void finish(int r) { + auto cct = file_stream->m_cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + data->clear(); + } + + on_finish->complete(r); + delete this; + } +}; + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::FileStream: " << this \ + << " " << __func__ << ": " + +template <typename I> +FileStream<I>::FileStream(I* image_ctx, const json_spirit::mObject& json_object) + : m_cct(image_ctx->cct), m_asio_engine(image_ctx->asio_engine), + m_json_object(json_object), + m_strand(boost::asio::make_strand(*m_asio_engine)) { +} + +template <typename I> +FileStream<I>::~FileStream() { + if (m_file_no != -1) { + ::close(m_file_no); + } +} + +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +template <typename I> +void FileStream<I>::open(Context* on_finish) { + auto& file_path_value = m_json_object[FILE_PATH]; + if (file_path_value.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << FILE_PATH << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& file_path = file_path_value.get_str(); + ldout(m_cct, 10) << "file_path=" << file_path << dendl; + + m_file_no = ::open(file_path.c_str(), O_RDONLY); + if (m_file_no < 0) { + int r = -errno; + lderr(m_cct) << "failed to open file stream '" << file_path << "': " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + m_stream_descriptor = std::make_optional< + boost::asio::posix::stream_descriptor>(m_strand, m_file_no); + on_finish->complete(0); +} + +template <typename I> +void FileStream<I>::close(Context* on_finish) { + ldout(m_cct, 10) << dendl; + + m_stream_descriptor.reset(); + on_finish->complete(0); +} + +template <typename I> +void FileStream<I>::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + // execute IO operations in a single strand to prevent seek races + boost::asio::post( + m_strand, [this, size, on_finish]() { + auto offset = lseek64(m_file_no, 0, SEEK_END); + if (offset == -1) { + int r = -errno; + lderr(m_cct) << "failed to seek to file end: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + ldout(m_cct, 10) << "size=" << offset << dendl; + *size = offset; + on_finish->complete(0); + }); +} + +template <typename I> +void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << byte_extents << dendl; + + auto ctx = new ReadRequest(this, std::move(byte_extents), data, on_finish); + + // execute IO operations in a single strand to prevent seek races + boost::asio::post(m_strand, [ctx]() { ctx->send(); }); +} + +#else // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +template <typename I> +void FileStream<I>::open(Context* on_finish) { + on_finish->complete(-EIO); +} + +template <typename I> +void FileStream<I>::close(Context* on_finish) { + on_finish->complete(-EIO); +} + +template <typename I> +void FileStream<I>::get_size(uint64_t* size, Context* on_finish) { + on_finish->complete(-EIO); +} + +template <typename I> +void FileStream<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + on_finish->complete(-EIO); +} + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + +} // namespace migration +} // namespace librbd + +template class librbd::migration::FileStream<librbd::ImageCtx>; diff --git a/src/librbd/migration/FileStream.h b/src/librbd/migration/FileStream.h new file mode 100644 index 000000000..32face71e --- /dev/null +++ b/src/librbd/migration/FileStream.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_FILE_STREAM_H +#define CEPH_LIBRBD_MIGRATION_FILE_STREAM_H + +#include "include/int_types.h" +#include "librbd/migration/StreamInterface.h" +#include <boost/asio/io_context.hpp> +#include <boost/asio/strand.hpp> +#include <boost/asio/posix/basic_stream_descriptor.hpp> +#include <json_spirit/json_spirit.h> +#include <memory> +#include <string> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename ImageCtxT> +class FileStream : public StreamInterface { +public: + static FileStream* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object) { + return new FileStream(image_ctx, json_object); + } + + FileStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object); + ~FileStream() override; + + FileStream(const FileStream&) = delete; + FileStream& operator=(const FileStream&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_size(uint64_t* size, Context* on_finish) override; + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) override; + +private: + CephContext* m_cct; + std::shared_ptr<AsioEngine> m_asio_engine; + json_spirit::mObject m_json_object; + + boost::asio::strand<boost::asio::io_context::executor_type> m_strand; +#ifdef BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + std::optional<boost::asio::posix::stream_descriptor> m_stream_descriptor; + + struct ReadRequest; + +#endif // BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR + + int m_file_no = -1; +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::FileStream<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_FILE_STREAM_H diff --git a/src/librbd/migration/FormatInterface.h b/src/librbd/migration/FormatInterface.h new file mode 100644 index 000000000..d13521d11 --- /dev/null +++ b/src/librbd/migration/FormatInterface.h @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H +#define CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H + +#include "include/buffer_fwd.h" +#include "include/int_types.h" +#include "common/zipkin_trace.h" +#include "librbd/Types.h" +#include "librbd/io/Types.h" +#include <map> + +struct Context; + +namespace librbd { + +namespace io { +struct AioCompletion; +struct ReadResult; +} // namespace io + +namespace migration { + +struct FormatInterface { + typedef std::map<uint64_t, SnapInfo> SnapInfos; + + virtual ~FormatInterface() { + } + + virtual void open(Context* on_finish) = 0; + virtual void close(Context* on_finish) = 0; + + virtual void get_snapshots(SnapInfos* snap_infos, Context* on_finish) = 0; + virtual void get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) = 0; + + virtual bool read(io::AioCompletion* aio_comp, uint64_t snap_id, + io::Extents&& image_extents, io::ReadResult&& read_result, + int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) = 0; + + virtual void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids, + int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) = 0; +}; + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_FORMAT_INTERFACE_H diff --git a/src/librbd/migration/HttpClient.cc b/src/librbd/migration/HttpClient.cc new file mode 100644 index 000000000..679c2bb07 --- /dev/null +++ b/src/librbd/migration/HttpClient.cc @@ -0,0 +1,946 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/HttpClient.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/Utils.h" +#include <boost/asio/buffer.hpp> +#include <boost/asio/post.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/read.hpp> +#include <boost/asio/ssl.hpp> +#include <boost/beast/core.hpp> +#include <boost/beast/http/read.hpp> +#include <boost/lexical_cast.hpp> +#include <deque> + +namespace librbd { +namespace migration { + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "HttpSession " << this << " " << __func__ \ + << ": " + +/** + * boost::beast utilizes non-inheriting template classes for handling plain vs + * encrypted TCP streams. Utilize a base-class for handling the majority of the + * logic for handling connecting, disconnecting, reseting, and sending requests. + */ + +template <typename I> +template <typename D> +class HttpClient<I>::HttpSession : public HttpSessionInterface { +public: + void init(Context* on_finish) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + ceph_assert(m_state == STATE_UNINITIALIZED); + m_state = STATE_CONNECTING; + + resolve_host(on_finish); + } + + void shut_down(Context* on_finish) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + ceph_assert(on_finish != nullptr); + ceph_assert(m_on_shutdown == nullptr); + m_on_shutdown = on_finish; + + auto current_state = m_state; + if (current_state == STATE_UNINITIALIZED) { + // never initialized or resolve/connect failed + on_finish->complete(0); + return; + } + + m_state = STATE_SHUTTING_DOWN; + if (current_state != STATE_READY) { + // delay shutdown until current state transition completes + return; + } + + disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); + } + + void issue(std::shared_ptr<Work>&& work) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << dendl; + + if (is_shutdown()) { + lderr(cct) << "cannot issue HTTP request, client is shutdown" + << dendl; + work->complete(-ESHUTDOWN, {}); + return; + } + + bool first_issue = m_issue_queue.empty(); + m_issue_queue.emplace_back(work); + if (m_state == STATE_READY && first_issue) { + ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; + finalize_issue(std::move(work)); + } else if (m_state == STATE_UNINITIALIZED) { + ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl; + m_state = STATE_RESET_CONNECTING; + resolve_host(nullptr); + } else { + ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl; + } + } + + void finalize_issue(std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << dendl; + + ++m_in_flight_requests; + (*work)(derived().stream()); + } + + void handle_issue(boost::system::error_code ec, + std::shared_ptr<Work>&& work) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl; + + ceph_assert(m_in_flight_requests > 0); + --m_in_flight_requests; + if (maybe_finalize_reset()) { + // previous request is attempting reset to this request will be resent + return; + } + + ceph_assert(!m_issue_queue.empty()); + m_issue_queue.pop_front(); + + if (is_shutdown()) { + lderr(cct) << "client shutdown during in-flight request" << dendl; + work->complete(-ESHUTDOWN, {}); + + maybe_finalize_shutdown(); + return; + } + + if (ec) { + if (ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::broken_pipe || + ec == boost::asio::error::connection_reset || + ec == boost::asio::error::operation_aborted || + ec == boost::asio::ssl::error::stream_truncated || + ec == boost::beast::http::error::end_of_stream || + ec == boost::beast::http::error::partial_message) { + ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; + m_issue_queue.push_front(work); + } else if (ec == boost::beast::error::timeout) { + lderr(cct) << "timed-out while issuing request" << dendl; + work->complete(-ETIMEDOUT, {}); + } else { + lderr(cct) << "failed to issue request: " << ec.message() << dendl; + work->complete(-ec.value(), {}); + } + + // attempt to recover the connection + reset(); + return; + } + + bool first_receive = m_receive_queue.empty(); + m_receive_queue.push_back(work); + if (first_receive) { + receive(std::move(work)); + } + + // TODO disable pipelining for non-idempotent requests + + // pipeline the next request into the stream + if (!m_issue_queue.empty()) { + work = m_issue_queue.front(); + ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; + finalize_issue(std::move(work)); + } + } + +protected: + HttpClient* m_http_client; + + HttpSession(HttpClient* http_client) + : m_http_client(http_client), m_resolver(http_client->m_strand) { + } + + virtual void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) = 0; + virtual void disconnect(Context* on_finish) = 0; + + void close_socket() { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::system::error_code ec; + boost::beast::get_lowest_layer(derived().stream()).socket().close(ec); + } + +private: + enum State { + STATE_UNINITIALIZED, + STATE_CONNECTING, + STATE_READY, + STATE_RESET_PENDING, + STATE_RESET_DISCONNECTING, + STATE_RESET_CONNECTING, + STATE_SHUTTING_DOWN, + STATE_SHUTDOWN, + }; + + State m_state = STATE_UNINITIALIZED; + boost::asio::ip::tcp::resolver m_resolver; + + Context* m_on_shutdown = nullptr; + + uint64_t m_in_flight_requests = 0; + std::deque<std::shared_ptr<Work>> m_issue_queue; + std::deque<std::shared_ptr<Work>> m_receive_queue; + + boost::beast::flat_buffer m_buffer; + std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser; + std::optional<boost::beast::http::parser<false, StringBody>> m_parser; + + D& derived() { + return static_cast<D&>(*this); + } + + void resolve_host(Context* on_finish) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + shutdown_socket(); + m_resolver.async_resolve( + m_http_client->m_url_spec.host, m_http_client->m_url_spec.port, + [this, on_finish](boost::system::error_code ec, auto results) { + handle_resolve_host(ec, results, on_finish); }); + } + + void handle_resolve_host( + boost::system::error_code ec, + boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) { + auto cct = m_http_client->m_cct; + int r = -ec.value(); + ldout(cct, 15) << "r=" << r << dendl; + + if (ec) { + if (ec == boost::asio::error::host_not_found) { + r = -ENOENT; + } else if (ec == boost::asio::error::host_not_found_try_again) { + // TODO: add retry throttle + r = -EAGAIN; + } + + lderr(cct) << "failed to resolve host '" + << m_http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + advance_state(STATE_UNINITIALIZED, r, on_finish); + return; + } + + connect(results, new LambdaContext([this, on_finish](int r) { + handle_connect(r, on_finish); })); + } + + void handle_connect(int r, Context* on_finish) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to connect to host '" + << m_http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + advance_state(STATE_UNINITIALIZED, r, on_finish); + return; + } + + advance_state(STATE_READY, 0, on_finish); + } + + void handle_shut_down(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) + << dendl; + } + + // cancel all in-flight send/receives (if any) + shutdown_socket(); + + maybe_finalize_shutdown(); + } + + void maybe_finalize_shutdown() { + if (m_in_flight_requests > 0) { + return; + } + + // cancel any queued IOs + fail_queued_work(-ESHUTDOWN); + + advance_state(STATE_SHUTDOWN, 0, nullptr); + } + + bool is_shutdown() const { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN); + } + + void reset() { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + ceph_assert(m_state == STATE_READY); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + m_state = STATE_RESET_PENDING; + maybe_finalize_reset(); + } + + bool maybe_finalize_reset() { + if (m_state != STATE_RESET_PENDING) { + return false; + } + + if (m_in_flight_requests > 0) { + return true; + } + + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + m_buffer.clear(); + + // move in-flight request back to the front of the issue queue + m_issue_queue.insert(m_issue_queue.begin(), + m_receive_queue.begin(), m_receive_queue.end()); + m_receive_queue.clear(); + + m_state = STATE_RESET_DISCONNECTING; + disconnect(new LambdaContext([this](int r) { handle_reset(r); })); + return true; + } + + void handle_reset(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) + << dendl; + } + + advance_state(STATE_RESET_CONNECTING, r, nullptr); + } + + int shutdown_socket() { + if (!boost::beast::get_lowest_layer( + derived().stream()).socket().is_open()) { + return 0; + } + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::system::error_code ec; + boost::beast::get_lowest_layer(derived().stream()).socket().shutdown( + boost::asio::ip::tcp::socket::shutdown_both, ec); + + if (ec && ec != boost::beast::errc::not_connected) { + lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl; + return -ec.value(); + } + + close_socket(); + return 0; + } + + void receive(std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "work=" << work.get() << dendl; + + ceph_assert(!m_receive_queue.empty()); + ++m_in_flight_requests; + + // receive the response for this request + m_parser.emplace(); + if (work->header_only()) { + // HEAD requests don't trasfer data but the parser still cares about max + // content-length + m_header_parser.emplace(); + m_header_parser->body_limit(std::numeric_limits<uint64_t>::max()); + + boost::beast::http::async_read_header( + derived().stream(), m_buffer, *m_header_parser, + [this, work=std::move(work)] + (boost::beast::error_code ec, std::size_t) mutable { + handle_receive(ec, std::move(work)); + }); + } else { + m_parser->body_limit(1 << 25); // max RBD object size + boost::beast::http::async_read( + derived().stream(), m_buffer, *m_parser, + [this, work=std::move(work)] + (boost::beast::error_code ec, std::size_t) mutable { + handle_receive(ec, std::move(work)); + }); + } + } + + void handle_receive(boost::system::error_code ec, + std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl; + + ceph_assert(m_in_flight_requests > 0); + --m_in_flight_requests; + if (maybe_finalize_reset()) { + // previous request is attempting reset to this request will be resent + return; + } + + ceph_assert(!m_receive_queue.empty()); + m_receive_queue.pop_front(); + + if (is_shutdown()) { + lderr(cct) << "client shutdown with in-flight request" << dendl; + work->complete(-ESHUTDOWN, {}); + + maybe_finalize_shutdown(); + return; + } + + if (ec) { + if (ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::broken_pipe || + ec == boost::asio::error::connection_reset || + ec == boost::asio::error::operation_aborted || + ec == boost::asio::ssl::error::stream_truncated || + ec == boost::beast::http::error::end_of_stream || + ec == boost::beast::http::error::partial_message) { + ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; + m_receive_queue.push_front(work); + } else if (ec == boost::beast::error::timeout) { + lderr(cct) << "timed-out while issuing request" << dendl; + work->complete(-ETIMEDOUT, {}); + } else { + lderr(cct) << "failed to issue request: " << ec.message() << dendl; + work->complete(-ec.value(), {}); + } + + reset(); + return; + } + + Response response; + if (work->header_only()) { + m_parser.emplace(std::move(*m_header_parser)); + } + response = m_parser->release(); + + // basic response code handling in a common location + int r = 0; + auto result = response.result(); + if (result == boost::beast::http::status::not_found) { + lderr(cct) << "requested resource does not exist" << dendl; + r = -ENOENT; + } else if (result == boost::beast::http::status::forbidden) { + lderr(cct) << "permission denied attempting to access resource" << dendl; + r = -EACCES; + } else if (boost::beast::http::to_status_class(result) != + boost::beast::http::status_class::successful) { + lderr(cct) << "failed to retrieve size: HTTP " << result << dendl; + r = -EIO; + } + + bool need_eof = response.need_eof(); + if (r < 0) { + work->complete(r, {}); + } else { + work->complete(0, std::move(response)); + } + + if (need_eof) { + ldout(cct, 20) << "reset required for non-pipelined response: " + << "work=" << work.get() << dendl; + reset(); + } else if (!m_receive_queue.empty()) { + auto work = m_receive_queue.front(); + receive(std::move(work)); + } + } + + void advance_state(State next_state, int r, Context* on_finish) { + auto cct = m_http_client->m_cct; + auto current_state = m_state; + ldout(cct, 15) << "current_state=" << current_state << ", " + << "next_state=" << next_state << ", " + << "r=" << r << dendl; + + m_state = next_state; + if (current_state == STATE_CONNECTING) { + if (next_state == STATE_UNINITIALIZED) { + shutdown_socket(); + on_finish->complete(r); + return; + } else if (next_state == STATE_READY) { + on_finish->complete(r); + return; + } + } else if (current_state == STATE_SHUTTING_DOWN) { + if (next_state == STATE_READY) { + // shut down requested while connecting/resetting + disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); + return; + } else if (next_state == STATE_UNINITIALIZED || + next_state == STATE_SHUTDOWN || + next_state == STATE_RESET_CONNECTING) { + ceph_assert(m_on_shutdown != nullptr); + m_on_shutdown->complete(r); + return; + } + } else if (current_state == STATE_RESET_DISCONNECTING) { + // disconnected from peer -- ignore errors and reconnect + ceph_assert(next_state == STATE_RESET_CONNECTING); + ceph_assert(on_finish == nullptr); + shutdown_socket(); + resolve_host(nullptr); + return; + } else if (current_state == STATE_RESET_CONNECTING) { + ceph_assert(on_finish == nullptr); + if (next_state == STATE_READY) { + // restart queued IO + if (!m_issue_queue.empty()) { + auto& work = m_issue_queue.front(); + finalize_issue(std::move(work)); + } + return; + } else if (next_state == STATE_UNINITIALIZED) { + shutdown_socket(); + + // fail all queued IO + fail_queued_work(r); + return; + } + } + + lderr(cct) << "unexpected state transition: " + << "current_state=" << current_state << ", " + << "next_state=" << next_state << dendl; + ceph_assert(false); + } + + void complete_work(std::shared_ptr<Work> work, int r, Response&& response) { + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl; + + work->complete(r, std::move(response)); + } + + void fail_queued_work(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 10) << "r=" << r << dendl; + + for (auto& work : m_issue_queue) { + complete_work(work, r, {}); + } + m_issue_queue.clear(); + ceph_assert(m_receive_queue.empty()); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "PlainHttpSession " << this << " " << __func__ \ + << ": " + +template <typename I> +class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> { +public: + PlainHttpSession(HttpClient* http_client) + : HttpSession<PlainHttpSession>(http_client), + m_stream(http_client->m_strand) { + } + ~PlainHttpSession() override { + this->close_socket(); + } + + inline boost::beast::tcp_stream& + stream() { + return m_stream; + } + +protected: + void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + m_stream.async_connect( + results, + asio::util::get_callback_adapter( + [on_finish](int r, auto endpoint) { on_finish->complete(r); })); + } + + void disconnect(Context* on_finish) override { + on_finish->complete(0); + } + +private: + boost::beast::tcp_stream m_stream; + +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "SslHttpSession " << this << " " << __func__ \ + << ": " + +template <typename I> +class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> { +public: + SslHttpSession(HttpClient* http_client) + : HttpSession<SslHttpSession>(http_client), + m_stream(http_client->m_strand, http_client->m_ssl_context) { + } + ~SslHttpSession() override { + this->close_socket(); + } + + inline boost::beast::ssl_stream<boost::beast::tcp_stream>& + stream() { + return m_stream; + } + +protected: + void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::beast::get_lowest_layer(m_stream).async_connect( + results, + asio::util::get_callback_adapter( + [this, on_finish](int r, auto endpoint) { + handle_connect(r, on_finish); })); + } + + void disconnect(Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + if (!m_ssl_enabled) { + on_finish->complete(0); + return; + } + + m_stream.async_shutdown( + asio::util::get_callback_adapter([this, on_finish](int r) { + shutdown(r, on_finish); })); + } + +private: + boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream; + bool m_ssl_enabled = false; + + void handle_connect(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + if (r < 0) { + lderr(cct) << "failed to connect to host '" + << http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + handshake(on_finish); + } + + void handshake(Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + auto& host = http_client->m_url_spec.host; + m_stream.set_verify_mode( + boost::asio::ssl::verify_peer | + boost::asio::ssl::verify_fail_if_no_peer_cert); + m_stream.set_verify_callback( + [host, next=boost::asio::ssl::host_name_verification(host), + ignore_self_signed=http_client->m_ignore_self_signed_cert] + (bool preverified, boost::asio::ssl::verify_context& ctx) { + if (!preverified && ignore_self_signed) { + auto ec = X509_STORE_CTX_get_error(ctx.native_handle()); + switch (ec) { + case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT: + case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN: + // ignore self-signed cert issues + preverified = true; + break; + default: + break; + } + } + return next(preverified, ctx); + }); + + // Set SNI Hostname (many hosts need this to handshake successfully) + if(!SSL_set_tlsext_host_name(m_stream.native_handle(), + http_client->m_url_spec.host.c_str())) { + int r = -::ERR_get_error(); + lderr(cct) << "failed to initialize SNI hostname: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + // Perform the SSL/TLS handshake + m_stream.async_handshake( + boost::asio::ssl::stream_base::client, + asio::util::get_callback_adapter( + [this, on_finish](int r) { handle_handshake(r, on_finish); })); + } + + void handle_handshake(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to complete handshake: " << cpp_strerror(r) + << dendl; + disconnect(new LambdaContext([r, on_finish](int) { + on_finish->complete(r); })); + return; + } + + m_ssl_enabled = true; + on_finish->complete(0); + } + + void shutdown(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + on_finish->complete(r); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient: " << this \ + << " " << __func__ << ": " + +template <typename I> +HttpClient<I>::HttpClient(I* image_ctx, const std::string& url) + : m_cct(image_ctx->cct), m_image_ctx(image_ctx), + m_asio_engine(image_ctx->asio_engine), m_url(url), + m_strand(boost::asio::make_strand(*m_asio_engine)), + m_ssl_context(boost::asio::ssl::context::sslv23_client) { + m_ssl_context.set_default_verify_paths(); +} + +template <typename I> +void HttpClient<I>::open(Context* on_finish) { + ldout(m_cct, 10) << "url=" << m_url << dendl; + + int r = util::parse_url(m_cct, m_url, &m_url_spec); + if (r < 0) { + lderr(m_cct) << "failed to parse url '" << m_url << "': " << cpp_strerror(r) + << dendl; + on_finish->complete(-EINVAL); + return; + } + + boost::asio::post(m_strand, [this, on_finish]() mutable { + create_http_session(on_finish); }); +} + +template <typename I> +void HttpClient<I>::close(Context* on_finish) { + boost::asio::post(m_strand, [this, on_finish]() mutable { + shut_down_http_session(on_finish); }); +} + +template <typename I> +void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + Request req; + req.method(boost::beast::http::verb::head); + + issue( + std::move(req), [this, size, on_finish](int r, Response&& response) { + handle_get_size(r, std::move(response), size, on_finish); + }); +} + +template <typename I> +void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size, + Context* on_finish) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } else if (!response.has_content_length()) { + lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto content_length = response[boost::beast::http::field::content_length]; + try { + *size = boost::lexical_cast<uint64_t>(content_length); + } catch (boost::bad_lexical_cast&) { + lderr(m_cct) << "invalid content-length in response" << dendl; + on_finish->complete(-EBADMSG); + return; + } + + on_finish->complete(0); +} + +template <typename I> +void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << dendl; + + auto aio_comp = io::AioCompletion::create_and_start( + on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ); + aio_comp->set_request_count(byte_extents.size()); + + // utilize ReadResult to assemble multiple byte extents into a single bl + // since boost::beast doesn't support multipart responses out-of-the-box + io::ReadResult read_result{data}; + aio_comp->read_result = std::move(read_result); + aio_comp->read_result.set_image_extents(byte_extents); + + // issue a range get request for each extent + uint64_t buffer_offset = 0; + for (auto [byte_offset, byte_length] : byte_extents) { + auto ctx = new io::ReadResult::C_ImageReadRequest( + aio_comp, buffer_offset, {{byte_offset, byte_length}}); + buffer_offset += byte_length; + + Request req; + req.method(boost::beast::http::verb::get); + + std::stringstream range; + ceph_assert(byte_length > 0); + range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1); + req.set(boost::beast::http::field::range, range.str()); + + issue( + std::move(req), + [this, byte_offset=byte_offset, byte_length=byte_length, ctx] + (int r, Response&& response) { + handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl, + ctx); + }); + } +} + +template <typename I> +void HttpClient<I>::handle_read(int r, Response&& response, + uint64_t byte_offset, uint64_t byte_length, + bufferlist* data, Context* on_finish) { + ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", " + << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to read requested byte range: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } else if (response.result() != boost::beast::http::status::partial_content) { + lderr(m_cct) << "failed to retrieve requested byte range: HTTP " + << response.result() << dendl; + on_finish->complete(-EIO); + return; + } else if (byte_length != response.body().size()) { + lderr(m_cct) << "unexpected short range read: " + << "wanted=" << byte_length << ", " + << "received=" << response.body().size() << dendl; + on_finish->complete(-EINVAL); + return; + } + + data->clear(); + data->append(response.body()); + on_finish->complete(data->length()); +} + +template <typename I> +void HttpClient<I>::issue(std::shared_ptr<Work>&& work) { + boost::asio::post(m_strand, [this, work=std::move(work)]() mutable { + m_http_session->issue(std::move(work)); }); +} + +template <typename I> +void HttpClient<I>::create_http_session(Context* on_finish) { + ldout(m_cct, 15) << dendl; + + ceph_assert(m_http_session == nullptr); + switch (m_url_spec.scheme) { + case URL_SCHEME_HTTP: + m_http_session = std::make_unique<PlainHttpSession>(this); + break; + case URL_SCHEME_HTTPS: + m_http_session = std::make_unique<SslHttpSession>(this); + break; + default: + ceph_assert(false); + break; + } + + m_http_session->init(on_finish); +} + +template <typename I> +void HttpClient<I>::shut_down_http_session(Context* on_finish) { + ldout(m_cct, 15) << dendl; + + if (m_http_session == nullptr) { + on_finish->complete(0); + return; + } + + m_http_session->shut_down(on_finish); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::HttpClient<librbd::ImageCtx>; diff --git a/src/librbd/migration/HttpClient.h b/src/librbd/migration/HttpClient.h new file mode 100644 index 000000000..3997e6159 --- /dev/null +++ b/src/librbd/migration/HttpClient.h @@ -0,0 +1,205 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H +#define CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H + +#include "include/common_fwd.h" +#include "include/int_types.h" +#include "librbd/io/Types.h" +#include "librbd/migration/HttpProcessorInterface.h" +#include "librbd/migration/Types.h" +#include <boost/asio/io_context.hpp> +#include <boost/asio/strand.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/ssl/context.hpp> +#include <boost/beast/version.hpp> +#include <boost/beast/core/tcp_stream.hpp> +#include <boost/beast/http/empty_body.hpp> +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> +#include <boost/beast/http/write.hpp> +#include <boost/beast/ssl/ssl_stream.hpp> +#include <functional> +#include <memory> +#include <string> +#include <utility> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename ImageCtxT> +class HttpClient { +public: + using EmptyBody = boost::beast::http::empty_body; + using StringBody = boost::beast::http::string_body; + using Request = boost::beast::http::request<EmptyBody>; + using Response = boost::beast::http::response<StringBody>; + + using RequestPreprocessor = std::function<void(Request&)>; + + static HttpClient* create(ImageCtxT* image_ctx, const std::string& url) { + return new HttpClient(image_ctx, url); + } + + HttpClient(ImageCtxT* image_ctx, const std::string& url); + HttpClient(const HttpClient&) = delete; + HttpClient& operator=(const HttpClient&) = delete; + + void open(Context* on_finish); + void close(Context* on_finish); + + void get_size(uint64_t* size, Context* on_finish); + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish); + + void set_ignore_self_signed_cert(bool ignore) { + m_ignore_self_signed_cert = ignore; + } + + void set_http_processor(HttpProcessorInterface* http_processor) { + m_http_processor = http_processor; + } + + template <class Body, typename Completion> + void issue(boost::beast::http::request<Body>&& request, + Completion&& completion) { + struct WorkImpl : Work { + HttpClient* http_client; + boost::beast::http::request<Body> request; + Completion completion; + + WorkImpl(HttpClient* http_client, + boost::beast::http::request<Body>&& request, + Completion&& completion) + : http_client(http_client), request(std::move(request)), + completion(std::move(completion)) { + } + WorkImpl(const WorkImpl&) = delete; + WorkImpl& operator=(const WorkImpl&) = delete; + + bool need_eof() const override { + return request.need_eof(); + } + + bool header_only() const override { + return (request.method() == boost::beast::http::verb::head); + } + + void complete(int r, Response&& response) override { + completion(r, std::move(response)); + } + + void operator()(boost::beast::tcp_stream& stream) override { + preprocess_request(); + + boost::beast::http::async_write( + stream, request, + [http_session=http_client->m_http_session.get(), + work=this->shared_from_this()] + (boost::beast::error_code ec, std::size_t) mutable { + http_session->handle_issue(ec, std::move(work)); + }); + } + + void operator()( + boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) override { + preprocess_request(); + + boost::beast::http::async_write( + stream, request, + [http_session=http_client->m_http_session.get(), + work=this->shared_from_this()] + (boost::beast::error_code ec, std::size_t) mutable { + http_session->handle_issue(ec, std::move(work)); + }); + } + + void preprocess_request() { + if (http_client->m_http_processor) { + http_client->m_http_processor->process_request(request); + } + } + }; + + initialize_default_fields(request); + issue(std::make_shared<WorkImpl>(this, std::move(request), + std::move(completion))); + } + +private: + struct Work; + struct HttpSessionInterface { + virtual ~HttpSessionInterface() {} + + virtual void init(Context* on_finish) = 0; + virtual void shut_down(Context* on_finish) = 0; + + virtual void issue(std::shared_ptr<Work>&& work) = 0; + virtual void handle_issue(boost::system::error_code ec, + std::shared_ptr<Work>&& work) = 0; + }; + + struct Work : public std::enable_shared_from_this<Work> { + virtual ~Work() {} + virtual bool need_eof() const = 0; + virtual bool header_only() const = 0; + virtual void complete(int r, Response&&) = 0; + virtual void operator()(boost::beast::tcp_stream& stream) = 0; + virtual void operator()( + boost::beast::ssl_stream<boost::beast::tcp_stream>& stream) = 0; + }; + + template <typename D> struct HttpSession; + struct PlainHttpSession; + struct SslHttpSession; + + CephContext* m_cct; + ImageCtxT* m_image_ctx; + std::shared_ptr<AsioEngine> m_asio_engine; + std::string m_url; + + UrlSpec m_url_spec; + + bool m_ignore_self_signed_cert = false; + + HttpProcessorInterface* m_http_processor = nullptr; + + boost::asio::strand<boost::asio::io_context::executor_type> m_strand; + + boost::asio::ssl::context m_ssl_context; + std::unique_ptr<HttpSessionInterface> m_http_session; + + template <typename Fields> + void initialize_default_fields(Fields& fields) const { + fields.target(m_url_spec.path); + fields.set(boost::beast::http::field::host, m_url_spec.host); + fields.set(boost::beast::http::field::user_agent, + BOOST_BEAST_VERSION_STRING); + } + + void handle_get_size(int r, Response&& response, uint64_t* size, + Context* on_finish); + + void handle_read(int r, Response&& response, uint64_t byte_offset, + uint64_t byte_length, bufferlist* data, Context* on_finish); + + void issue(std::shared_ptr<Work>&& work); + + void create_http_session(Context* on_finish); + void shut_down_http_session(Context* on_finish); +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::HttpClient<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_HTTP_CLIENT_H diff --git a/src/librbd/migration/HttpProcessorInterface.h b/src/librbd/migration/HttpProcessorInterface.h new file mode 100644 index 000000000..3d9af88bd --- /dev/null +++ b/src/librbd/migration/HttpProcessorInterface.h @@ -0,0 +1,27 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H +#define CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H + +#include <boost/beast/http/empty_body.hpp> +#include <boost/beast/http/message.hpp> + +namespace librbd { +namespace migration { + +struct HttpProcessorInterface { + using EmptyBody = boost::beast::http::empty_body; + using EmptyRequest = boost::beast::http::request<EmptyBody>; + + virtual ~HttpProcessorInterface() { + } + + virtual void process_request(EmptyRequest& request) = 0; + +}; + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_HTTP_PROCESSOR_INTERFACE_H diff --git a/src/librbd/migration/HttpStream.cc b/src/librbd/migration/HttpStream.cc new file mode 100644 index 000000000..fa3cc0032 --- /dev/null +++ b/src/librbd/migration/HttpStream.cc @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/HttpStream.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/asio/Utils.h" +#include "librbd/migration/HttpClient.h" +#include <boost/beast/http.hpp> + +namespace librbd { +namespace migration { + +namespace { + +const std::string URL_KEY {"url"}; + +} // anonymous namespace + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpStream: " << this \ + << " " << __func__ << ": " + +template <typename I> +HttpStream<I>::HttpStream(I* image_ctx, const json_spirit::mObject& json_object) + : m_image_ctx(image_ctx), m_cct(image_ctx->cct), + m_asio_engine(image_ctx->asio_engine), m_json_object(json_object) { +} + +template <typename I> +HttpStream<I>::~HttpStream() { +} + +template <typename I> +void HttpStream<I>::open(Context* on_finish) { + auto& url_value = m_json_object[URL_KEY]; + if (url_value.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_url = url_value.get_str(); + ldout(m_cct, 10) << "url=" << m_url << dendl; + + m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url)); + m_http_client->open(on_finish); +} + +template <typename I> +void HttpStream<I>::close(Context* on_finish) { + ldout(m_cct, 10) << dendl; + + if (!m_http_client) { + on_finish->complete(0); + return; + } + + m_http_client->close(on_finish); +} + +template <typename I> +void HttpStream<I>::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + m_http_client->get_size(size, on_finish); +} + +template <typename I> +void HttpStream<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl; + + m_http_client->read(std::move(byte_extents), data, on_finish); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::HttpStream<librbd::ImageCtx>; diff --git a/src/librbd/migration/HttpStream.h b/src/librbd/migration/HttpStream.h new file mode 100644 index 000000000..01a583714 --- /dev/null +++ b/src/librbd/migration/HttpStream.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H +#define CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H + +#include "include/int_types.h" +#include "librbd/migration/StreamInterface.h" +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> +#include <json_spirit/json_spirit.h> +#include <memory> +#include <string> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename> class HttpClient; + +template <typename ImageCtxT> +class HttpStream : public StreamInterface { +public: + static HttpStream* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object) { + return new HttpStream(image_ctx, json_object); + } + + HttpStream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object); + ~HttpStream() override; + + HttpStream(const HttpStream&) = delete; + HttpStream& operator=(const HttpStream&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_size(uint64_t* size, Context* on_finish) override; + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) override; + +private: + using HttpResponse = boost::beast::http::response< + boost::beast::http::string_body>; + + ImageCtxT* m_image_ctx; + CephContext* m_cct; + std::shared_ptr<AsioEngine> m_asio_engine; + json_spirit::mObject m_json_object; + + std::string m_url; + + std::unique_ptr<HttpClient<ImageCtxT>> m_http_client; + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::HttpStream<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_HTTP_STREAM_H diff --git a/src/librbd/migration/ImageDispatch.cc b/src/librbd/migration/ImageDispatch.cc new file mode 100644 index 000000000..3aa2eeb0b --- /dev/null +++ b/src/librbd/migration/ImageDispatch.cc @@ -0,0 +1,157 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/ImageDispatch.h" +#include "include/neorados/RADOS.hpp" +#include "common/dout.h" +#include "librbd/ImageCtx.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/migration/FormatInterface.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::ImageDispatch: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace migration { + +template <typename I> +ImageDispatch<I>::ImageDispatch(I* image_ctx, + std::unique_ptr<FormatInterface> format) + : m_image_ctx(image_ctx), m_format(std::move(format)) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "ictx=" << image_ctx << dendl; +} + +template <typename I> +void ImageDispatch<I>::shut_down(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + on_finish->complete(0); +} + +template <typename I> +bool ImageDispatch<I>::read( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + io::ReadResult &&read_result, IOContext io_context, int op_flags, + int read_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + return m_format->read(aio_comp, io_context->read_snap().value_or(CEPH_NOSNAP), + std::move(image_extents), std::move(read_result), + op_flags, read_flags, parent_trace); +} + +template <typename I> +bool ImageDispatch<I>::write( + io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + lderr(cct) << dendl; + + fail_io(-EROFS, aio_comp, dispatch_result); + return true; +} + +template <typename I> +bool ImageDispatch<I>::discard( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + uint32_t discard_granularity_bytes, + IOContext io_context, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + lderr(cct) << dendl; + + fail_io(-EROFS, aio_comp, dispatch_result); + return true; +} + +template <typename I> +bool ImageDispatch<I>::write_same( + io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + lderr(cct) << dendl; + + fail_io(-EROFS, aio_comp, dispatch_result); + return true; +} + +template <typename I> +bool ImageDispatch<I>::compare_and_write( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + lderr(cct) << dendl; + + fail_io(-EROFS, aio_comp, dispatch_result); + return true; +} + +template <typename I> +bool ImageDispatch<I>::flush( + io::AioCompletion* aio_comp, io::FlushSource flush_source, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + aio_comp->set_request_count(0); + return true; +} + +template <typename I> +bool ImageDispatch<I>::list_snaps( + io::AioCompletion* aio_comp, io::Extents&& image_extents, + io::SnapIds&& snap_ids, int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + + aio_comp->set_request_count(1); + auto ctx = new io::C_AioRequest(aio_comp); + + m_format->list_snaps(std::move(image_extents), std::move(snap_ids), + list_snaps_flags, snapshot_delta, parent_trace, + ctx); + return true; +} + +template <typename I> +void ImageDispatch<I>::fail_io(int r, io::AioCompletion* aio_comp, + io::DispatchResult* dispatch_result) { + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + aio_comp->fail(r); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::ImageDispatch<librbd::ImageCtx>; diff --git a/src/librbd/migration/ImageDispatch.h b/src/librbd/migration/ImageDispatch.h new file mode 100644 index 000000000..03bb3aa52 --- /dev/null +++ b/src/librbd/migration/ImageDispatch.h @@ -0,0 +1,102 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H +#define CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H + +#include "librbd/io/ImageDispatchInterface.h" +#include <memory> + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace migration { + +struct FormatInterface; + +template <typename ImageCtxT> +class ImageDispatch : public io::ImageDispatchInterface { +public: + static ImageDispatch* create(ImageCtxT* image_ctx, + std::unique_ptr<FormatInterface> source) { + return new ImageDispatch(image_ctx, std::move(source)); + } + + ImageDispatch(ImageCtxT* image_ctx, std::unique_ptr<FormatInterface> source); + + void shut_down(Context* on_finish) override; + + io::ImageDispatchLayer get_dispatch_layer() const override { + return io::IMAGE_DISPATCH_LAYER_MIGRATION; + } + + bool read( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + io::ReadResult &&read_result, IOContext io_context, int op_flags, + int read_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + bool write( + io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + bool discard( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + uint32_t discard_granularity_bytes, + IOContext io_context, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + bool write_same( + io::AioCompletion* aio_comp, io::Extents &&image_extents, bufferlist &&bl, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + bool compare_and_write( + io::AioCompletion* aio_comp, io::Extents &&image_extents, + bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset, + IOContext io_context, int op_flags, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + bool flush( + io::AioCompletion* aio_comp, io::FlushSource flush_source, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + + bool list_snaps( + io::AioCompletion* aio_comp, io::Extents&& image_extents, + io::SnapIds&& snap_ids, int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) override; + + bool invalidate_cache(Context* on_finish) override { + return false; + } + +private: + ImageCtxT* m_image_ctx; + std::unique_ptr<FormatInterface> m_format; + + void fail_io(int r, io::AioCompletion* aio_comp, + io::DispatchResult* dispatch_result); + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::ImageDispatch<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_IMAGE_DISPATCH_H diff --git a/src/librbd/migration/NativeFormat.cc b/src/librbd/migration/NativeFormat.cc new file mode 100644 index 000000000..a7682619c --- /dev/null +++ b/src/librbd/migration/NativeFormat.cc @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/NativeFormat.h" +#include "include/neorados/RADOS.hpp" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/io/ImageDispatchSpec.h" +#include "json_spirit/json_spirit.h" +#include "boost/lexical_cast.hpp" +#include <sstream> + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::NativeFormat: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace migration { + +namespace { + +const std::string TYPE_KEY{"type"}; +const std::string POOL_ID_KEY{"pool_id"}; +const std::string POOL_NAME_KEY{"pool_name"}; +const std::string POOL_NAMESPACE_KEY{"pool_namespace"}; +const std::string IMAGE_NAME_KEY{"image_name"}; +const std::string IMAGE_ID_KEY{"image_id"}; +const std::string SNAP_NAME_KEY{"snap_name"}; +const std::string SNAP_ID_KEY{"snap_id"}; + +} // anonymous namespace + +template <typename I> +std::string NativeFormat<I>::build_source_spec( + int64_t pool_id, const std::string& pool_namespace, + const std::string& image_name, const std::string& image_id) { + json_spirit::mObject source_spec; + source_spec[TYPE_KEY] = "native"; + source_spec[POOL_ID_KEY] = pool_id; + source_spec[POOL_NAMESPACE_KEY] = pool_namespace; + source_spec[IMAGE_NAME_KEY] = image_name; + if (!image_id.empty()) { + source_spec[IMAGE_ID_KEY] = image_id; + } + return json_spirit::write(source_spec); +} + +template <typename I> +NativeFormat<I>::NativeFormat( + I* image_ctx, const json_spirit::mObject& json_object, bool import_only) + : m_image_ctx(image_ctx), m_json_object(json_object), + m_import_only(import_only) { +} + +template <typename I> +void NativeFormat<I>::open(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto& pool_name_val = m_json_object[POOL_NAME_KEY]; + if (pool_name_val.type() == json_spirit::str_type) { + librados::Rados rados(m_image_ctx->md_ctx); + librados::IoCtx io_ctx; + int r = rados.ioctx_create(pool_name_val.get_str().c_str(), io_ctx); + if (r < 0 ) { + lderr(cct) << "invalid pool name" << dendl; + on_finish->complete(r); + return; + } + + m_pool_id = io_ctx.get_id(); + } else if (pool_name_val.type() != json_spirit::null_type) { + lderr(cct) << "invalid pool name" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& pool_id_val = m_json_object[POOL_ID_KEY]; + if (m_pool_id != -1 && pool_id_val.type() != json_spirit::null_type) { + lderr(cct) << "cannot specify both pool name and pool id" << dendl; + on_finish->complete(-EINVAL); + return; + } else if (pool_id_val.type() == json_spirit::int_type) { + m_pool_id = pool_id_val.get_int64(); + } else if (pool_id_val.type() == json_spirit::str_type) { + try { + m_pool_id = boost::lexical_cast<int64_t>(pool_id_val.get_str()); + } catch (boost::bad_lexical_cast &) { + } + } + + if (m_pool_id == -1) { + lderr(cct) << "missing or invalid pool id" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& pool_namespace_val = m_json_object[POOL_NAMESPACE_KEY]; + if (pool_namespace_val.type() == json_spirit::str_type) { + m_pool_namespace = pool_namespace_val.get_str(); + } else if (pool_namespace_val.type() != json_spirit::null_type) { + lderr(cct) << "invalid pool namespace" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& image_name_val = m_json_object[IMAGE_NAME_KEY]; + if (image_name_val.type() != json_spirit::str_type) { + lderr(cct) << "missing or invalid image name" << dendl; + on_finish->complete(-EINVAL); + return; + } + m_image_name = image_name_val.get_str(); + + auto& image_id_val = m_json_object[IMAGE_ID_KEY]; + if (image_id_val.type() == json_spirit::str_type) { + m_image_id = image_id_val.get_str(); + } else if (image_id_val.type() != json_spirit::null_type) { + lderr(cct) << "invalid image id" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& snap_name_val = m_json_object[SNAP_NAME_KEY]; + if (snap_name_val.type() == json_spirit::str_type) { + m_snap_name = snap_name_val.get_str(); + } else if (snap_name_val.type() != json_spirit::null_type) { + lderr(cct) << "invalid snap name" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& snap_id_val = m_json_object[SNAP_ID_KEY]; + if (!m_snap_name.empty() && snap_id_val.type() != json_spirit::null_type) { + lderr(cct) << "cannot specify both snap name and snap id" << dendl; + on_finish->complete(-EINVAL); + return; + } else if (snap_id_val.type() == json_spirit::str_type) { + try { + m_snap_id = boost::lexical_cast<uint64_t>(snap_id_val.get_str()); + } catch (boost::bad_lexical_cast &) { + } + } else if (snap_id_val.type() == json_spirit::int_type) { + m_snap_id = snap_id_val.get_uint64(); + } + + if (snap_id_val.type() != json_spirit::null_type && + m_snap_id == CEPH_NOSNAP) { + lderr(cct) << "invalid snap id" << dendl; + on_finish->complete(-EINVAL); + return; + } + + // snapshot is required for import to keep source read-only + if (m_import_only && m_snap_name.empty() && m_snap_id == CEPH_NOSNAP) { + lderr(cct) << "snapshot required for import" << dendl; + on_finish->complete(-EINVAL); + return; + } + + // TODO add support for external clusters + librados::IoCtx io_ctx; + int r = util::create_ioctx(m_image_ctx->md_ctx, "source image", + m_pool_id, m_pool_namespace, &io_ctx); + if (r < 0) { + on_finish->complete(r); + return; + } + + m_image_ctx->md_ctx.dup(io_ctx); + m_image_ctx->data_ctx.dup(io_ctx); + m_image_ctx->name = m_image_name; + + uint64_t flags = 0; + if (m_image_id.empty() && !m_import_only) { + flags |= OPEN_FLAG_OLD_FORMAT; + } else { + m_image_ctx->id = m_image_id; + } + + if (m_image_ctx->child != nullptr) { + // set rados flags for reading the parent image + if (m_image_ctx->child->config.template get_val<bool>("rbd_balance_parent_reads")) { + m_image_ctx->set_read_flag(librados::OPERATION_BALANCE_READS); + } else if (m_image_ctx->child->config.template get_val<bool>("rbd_localize_parent_reads")) { + m_image_ctx->set_read_flag(librados::OPERATION_LOCALIZE_READS); + } + } + + // open the source RBD image + on_finish = new LambdaContext([this, on_finish](int r) { + handle_open(r, on_finish); }); + m_image_ctx->state->open(flags, on_finish); +} + +template <typename I> +void NativeFormat<I>::handle_open(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to open image: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + if (m_snap_id == CEPH_NOSNAP && m_snap_name.empty()) { + on_finish->complete(0); + return; + } + + if (!m_snap_name.empty()) { + std::shared_lock image_locker{m_image_ctx->image_lock}; + m_snap_id = m_image_ctx->get_snap_id(cls::rbd::UserSnapshotNamespace{}, + m_snap_name); + } + + if (m_snap_id == CEPH_NOSNAP) { + lderr(cct) << "failed to locate snapshot " << m_snap_name << dendl; + on_finish = new LambdaContext([on_finish](int) { + on_finish->complete(-ENOENT); }); + m_image_ctx->state->close(on_finish); + return; + } + + on_finish = new LambdaContext([this, on_finish](int r) { + handle_snap_set(r, on_finish); }); + m_image_ctx->state->snap_set(m_snap_id, on_finish); +} + +template <typename I> +void NativeFormat<I>::handle_snap_set(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to set snapshot " << m_snap_id << ": " + << cpp_strerror(r) << dendl; + on_finish = new LambdaContext([r, on_finish](int) { + on_finish->complete(r); }); + m_image_ctx->state->close(on_finish); + return; + } + + on_finish->complete(0); +} + +template <typename I> +void NativeFormat<I>::close(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + // the native librbd::image::CloseRequest handles all cleanup + on_finish->complete(0); +} + +template <typename I> +void NativeFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + m_image_ctx->image_lock.lock_shared(); + *snap_infos = m_image_ctx->snap_info; + m_image_ctx->image_lock.unlock_shared(); + + on_finish->complete(0); +} + +template <typename I> +void NativeFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + m_image_ctx->image_lock.lock_shared(); + *size = m_image_ctx->get_image_size(snap_id); + m_image_ctx->image_lock.unlock_shared(); + + + on_finish->complete(0); +} + +template <typename I> +void NativeFormat<I>::list_snaps(io::Extents&& image_extents, + io::SnapIds&& snap_ids, int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "image_extents=" << image_extents << dendl; + + auto aio_comp = io::AioCompletion::create_and_start( + on_finish, util::get_image_ctx(m_image_ctx), io::AIO_TYPE_GENERIC); + auto req = io::ImageDispatchSpec::create_list_snaps( + *m_image_ctx, io::IMAGE_DISPATCH_LAYER_MIGRATION, aio_comp, + std::move(image_extents), std::move(snap_ids), list_snaps_flags, + snapshot_delta, {}); + req->send(); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::NativeFormat<librbd::ImageCtx>; diff --git a/src/librbd/migration/NativeFormat.h b/src/librbd/migration/NativeFormat.h new file mode 100644 index 000000000..e58c04121 --- /dev/null +++ b/src/librbd/migration/NativeFormat.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H +#define CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H + +#include "include/int_types.h" +#include "librbd/Types.h" +#include "librbd/migration/FormatInterface.h" +#include "json_spirit/json_spirit.h" +#include <memory> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename ImageCtxT> +class NativeFormat : public FormatInterface { +public: + static std::string build_source_spec(int64_t pool_id, + const std::string& pool_namespace, + const std::string& image_name, + const std::string& image_id); + + static NativeFormat* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object, + bool import_only) { + return new NativeFormat(image_ctx, json_object, import_only); + } + + NativeFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + bool import_only); + NativeFormat(const NativeFormat&) = delete; + NativeFormat& operator=(const NativeFormat&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override; + void get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) override; + + bool read(io::AioCompletion* aio_comp, uint64_t snap_id, + io::Extents&& image_extents, io::ReadResult&& read_result, + int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) override { + return false; + } + + void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids, + int list_snaps_flags, io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) override; + +private: + ImageCtxT* m_image_ctx; + json_spirit::mObject m_json_object; + bool m_import_only; + + int64_t m_pool_id = -1; + std::string m_pool_namespace; + std::string m_image_name; + std::string m_image_id; + std::string m_snap_name; + uint64_t m_snap_id = CEPH_NOSNAP; + + void handle_open(int r, Context* on_finish); + void handle_snap_set(int r, Context* on_finish); + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::NativeFormat<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_NATIVE_FORMAT_H diff --git a/src/librbd/migration/OpenSourceImageRequest.cc b/src/librbd/migration/OpenSourceImageRequest.cc new file mode 100644 index 000000000..8abdedf33 --- /dev/null +++ b/src/librbd/migration/OpenSourceImageRequest.cc @@ -0,0 +1,249 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/OpenSourceImageRequest.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/Utils.h" +#include "librbd/io/ImageDispatcher.h" +#include "librbd/migration/ImageDispatch.h" +#include "librbd/migration/NativeFormat.h" +#include "librbd/migration/SourceSpecBuilder.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::OpenSourceImageRequest: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace migration { + +template <typename I> +OpenSourceImageRequest<I>::OpenSourceImageRequest( + librados::IoCtx& io_ctx, I* dst_image_ctx, uint64_t src_snap_id, + const MigrationInfo &migration_info, I** src_image_ctx, Context* on_finish) + : m_cct(reinterpret_cast<CephContext*>(io_ctx.cct())), m_io_ctx(io_ctx), + m_dst_image_ctx(dst_image_ctx), m_src_snap_id(src_snap_id), + m_migration_info(migration_info), m_src_image_ctx(src_image_ctx), + m_on_finish(on_finish) { + ldout(m_cct, 10) << dendl; +} + +template <typename I> +void OpenSourceImageRequest<I>::send() { + open_source(); +} + +template <typename I> +void OpenSourceImageRequest<I>::open_source() { + ldout(m_cct, 10) << dendl; + + // note that all source image ctx properties are placeholders + *m_src_image_ctx = I::create("", "", CEPH_NOSNAP, m_io_ctx, true); + auto src_image_ctx = *m_src_image_ctx; + src_image_ctx->child = m_dst_image_ctx; + + // use default layout values (can be overridden by source layers later) + src_image_ctx->order = 22; + src_image_ctx->layout = file_layout_t(); + src_image_ctx->layout.stripe_count = 1; + src_image_ctx->layout.stripe_unit = 1ULL << src_image_ctx->order; + src_image_ctx->layout.object_size = 1Ull << src_image_ctx->order; + src_image_ctx->layout.pool_id = -1; + + bool import_only = true; + auto source_spec = m_migration_info.source_spec; + if (source_spec.empty()) { + // implies legacy migration from RBD image in same cluster + source_spec = NativeFormat<I>::build_source_spec( + m_migration_info.pool_id, m_migration_info.pool_namespace, + m_migration_info.image_name, m_migration_info.image_id); + import_only = false; + } + + ldout(m_cct, 15) << "source_spec=" << source_spec << ", " + << "source_snap_id=" << m_src_snap_id << ", " + << "import_only=" << import_only << dendl; + + SourceSpecBuilder<I> source_spec_builder{src_image_ctx}; + json_spirit::mObject source_spec_object; + int r = source_spec_builder.parse_source_spec(source_spec, + &source_spec_object); + if (r < 0) { + lderr(m_cct) << "failed to parse migration source-spec:" << cpp_strerror(r) + << dendl; + (*m_src_image_ctx)->state->close(); + finish(r); + return; + } + + r = source_spec_builder.build_format(source_spec_object, import_only, + &m_format); + if (r < 0) { + lderr(m_cct) << "failed to build migration format handler: " + << cpp_strerror(r) << dendl; + (*m_src_image_ctx)->state->close(); + finish(r); + return; + } + + auto ctx = util::create_context_callback< + OpenSourceImageRequest<I>, + &OpenSourceImageRequest<I>::handle_open_source>(this); + m_format->open(ctx); +} + +template <typename I> +void OpenSourceImageRequest<I>::handle_open_source(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to open migration source: " << cpp_strerror(r) + << dendl; + finish(r); + return; + } + + get_image_size(); +} + +template <typename I> +void OpenSourceImageRequest<I>::get_image_size() { + ldout(m_cct, 10) << dendl; + + auto ctx = util::create_context_callback< + OpenSourceImageRequest<I>, + &OpenSourceImageRequest<I>::handle_get_image_size>(this); + m_format->get_image_size(CEPH_NOSNAP, &m_image_size, ctx); +} + +template <typename I> +void OpenSourceImageRequest<I>::handle_get_image_size(int r) { + ldout(m_cct, 10) << "r=" << r << ", " + << "image_size=" << m_image_size << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to retrieve image size: " << cpp_strerror(r) + << dendl; + close_image(r); + return; + } + + auto src_image_ctx = *m_src_image_ctx; + src_image_ctx->image_lock.lock(); + src_image_ctx->size = m_image_size; + src_image_ctx->image_lock.unlock(); + + get_snapshots(); +} + +template <typename I> +void OpenSourceImageRequest<I>::get_snapshots() { + ldout(m_cct, 10) << dendl; + + auto ctx = util::create_context_callback< + OpenSourceImageRequest<I>, + &OpenSourceImageRequest<I>::handle_get_snapshots>(this); + m_format->get_snapshots(&m_snap_infos, ctx); +} + +template <typename I> +void OpenSourceImageRequest<I>::handle_get_snapshots(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to retrieve snapshots: " << cpp_strerror(r) + << dendl; + close_image(r); + return; + } + + // copy snapshot metadata to image ctx + auto src_image_ctx = *m_src_image_ctx; + src_image_ctx->image_lock.lock(); + + src_image_ctx->snaps.clear(); + src_image_ctx->snap_info.clear(); + src_image_ctx->snap_ids.clear(); + + ::SnapContext snapc; + for (auto it = m_snap_infos.rbegin(); it != m_snap_infos.rend(); ++it) { + auto& [snap_id, snap_info] = *it; + snapc.snaps.push_back(snap_id); + + ldout(m_cct, 10) << "adding snap: ns=" << snap_info.snap_namespace << ", " + << "name=" << snap_info.name << ", " + << "id=" << snap_id << dendl; + src_image_ctx->add_snap( + snap_info.snap_namespace, snap_info.name, snap_id, + snap_info.size, snap_info.parent, snap_info.protection_status, + snap_info.flags, snap_info.timestamp); + } + if (!snapc.snaps.empty()) { + snapc.seq = snapc.snaps[0]; + } + src_image_ctx->snapc = snapc; + + ldout(m_cct, 15) << "read snap id: " << m_src_snap_id << ", " + << "write snapc={" + << "seq=" << snapc.seq << ", " + << "snaps=" << snapc.snaps << "}" << dendl; + + // ensure data_ctx and data_io_context are pointing to correct snapshot + if (m_src_snap_id != CEPH_NOSNAP) { + int r = src_image_ctx->snap_set(m_src_snap_id); + if (r < 0) { + src_image_ctx->image_lock.unlock(); + + lderr(m_cct) << "error setting source image snap id: " + << cpp_strerror(r) << dendl; + finish(r); + return; + } + } + + src_image_ctx->image_lock.unlock(); + + finish(0); +} + +template <typename I> +void OpenSourceImageRequest<I>::close_image(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + auto ctx = new LambdaContext([this, r](int) { + finish(r); + }); + (*m_src_image_ctx)->state->close(ctx); +} + +template <typename I> +void OpenSourceImageRequest<I>::register_image_dispatch() { + ldout(m_cct, 10) << dendl; + + // intercept any IO requests to the source image + auto io_image_dispatch = ImageDispatch<I>::create( + *m_src_image_ctx, std::move(m_format)); + (*m_src_image_ctx)->io_image_dispatcher->register_dispatch(io_image_dispatch); +} + +template <typename I> +void OpenSourceImageRequest<I>::finish(int r) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + *m_src_image_ctx = nullptr; + } else { + register_image_dispatch(); + } + + m_on_finish->complete(r); + delete this; +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::OpenSourceImageRequest<librbd::ImageCtx>; diff --git a/src/librbd/migration/OpenSourceImageRequest.h b/src/librbd/migration/OpenSourceImageRequest.h new file mode 100644 index 000000000..f0dab3ad9 --- /dev/null +++ b/src/librbd/migration/OpenSourceImageRequest.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H +#define CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H + +#include "include/rados/librados_fwd.hpp" +#include "librbd/Types.h" +#include <map> +#include <memory> + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace migration { + +struct FormatInterface; + +template <typename ImageCtxT> +class OpenSourceImageRequest { +public: + static OpenSourceImageRequest* create(librados::IoCtx& io_ctx, + ImageCtxT* destination_image_ctx, + uint64_t src_snap_id, + const MigrationInfo &migration_info, + ImageCtxT** source_image_ctx, + Context* on_finish) { + return new OpenSourceImageRequest(io_ctx, destination_image_ctx, + src_snap_id, migration_info, + source_image_ctx, on_finish); + } + + OpenSourceImageRequest(librados::IoCtx& io_ctx, + ImageCtxT* destination_image_ctx, + uint64_t src_snap_id, + const MigrationInfo &migration_info, + ImageCtxT** source_image_ctx, + Context* on_finish); + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * OPEN_SOURCE + * | + * v + * GET_IMAGE_SIZE * * * * * * * + * | * + * v v + * GET_SNAPSHOTS * * * * > CLOSE_IMAGE + * | | + * v | + * <finish> <------------------/ + * + * @endverbatim + */ + + typedef std::map<uint64_t, SnapInfo> SnapInfos; + + CephContext* m_cct; + librados::IoCtx& m_io_ctx; + ImageCtxT* m_dst_image_ctx; + uint64_t m_src_snap_id; + MigrationInfo m_migration_info; + ImageCtxT** m_src_image_ctx; + Context* m_on_finish; + + std::unique_ptr<FormatInterface> m_format; + + uint64_t m_image_size = 0; + SnapInfos m_snap_infos; + + void open_source(); + void handle_open_source(int r); + + void get_image_size(); + void handle_get_image_size(int r); + + void get_snapshots(); + void handle_get_snapshots(int r); + + void close_image(int r); + + void register_image_dispatch(); + + void finish(int r); + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::OpenSourceImageRequest<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_OPEN_SOURCE_IMAGE_REQUEST_H diff --git a/src/librbd/migration/QCOW.h b/src/librbd/migration/QCOW.h new file mode 100644 index 000000000..23401e515 --- /dev/null +++ b/src/librbd/migration/QCOW.h @@ -0,0 +1,466 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* Based on QEMU block/qcow.cc and block/qcow2.h, which has this license: */ + +/* + * Block driver for the QCOW version 2 format + * + * Copyright (c) 2004-2006 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef CEPH_LIBRBD_MIGRATION_QCOW2_H +#define CEPH_LIBRBD_MIGRATION_QCOW2_H + +#include "include/ceph_assert.h" +#include "include/int_types.h" +#include "librbd/migration/QCOW.h" + +#define QCOW_MAGIC (('Q' << 24) | ('F' << 16) | ('I' << 8) | 0xfb) + +#define QCOW_CRYPT_NONE 0 +#define QCOW_CRYPT_AES 1 +#define QCOW_CRYPT_LUKS 2 + +#define QCOW_MAX_CRYPT_CLUSTERS 32 +#define QCOW_MAX_SNAPSHOTS 65536 + +/* Field widths in qcow2 mean normal cluster offsets cannot reach + * 64PB; depending on cluster size, compressed clusters can have a + * smaller limit (64PB for up to 16k clusters, then ramps down to + * 512TB for 2M clusters). */ +#define QCOW_MAX_CLUSTER_OFFSET ((1ULL << 56) - 1) + +/* 8 MB refcount table is enough for 2 PB images at 64k cluster size + * (128 GB for 512 byte clusters, 2 EB for 2 MB clusters) */ +#define QCOW_MAX_REFTABLE_SIZE (1ULL << 23) + +/* 32 MB L1 table is enough for 2 PB images at 64k cluster size + * (128 GB for 512 byte clusters, 2 EB for 2 MB clusters) */ +#define QCOW_MAX_L1_SIZE (1ULL << 25) + +/* Allow for an average of 1k per snapshot table entry, should be plenty of + * space for snapshot names and IDs */ +#define QCOW_MAX_SNAPSHOTS_SIZE (1024 * QCOW_MAX_SNAPSHOTS) + +/* Maximum amount of extra data per snapshot table entry to accept */ +#define QCOW_MAX_SNAPSHOT_EXTRA_DATA 1024 + +/* Bitmap header extension constraints */ +#define QCOW2_MAX_BITMAPS 65535 +#define QCOW2_MAX_BITMAP_DIRECTORY_SIZE (1024 * QCOW2_MAX_BITMAPS) + +/* Maximum of parallel sub-request per guest request */ +#define QCOW2_MAX_WORKERS 8 + +/* indicate that the refcount of the referenced cluster is exactly one. */ +#define QCOW_OFLAG_COPIED (1ULL << 63) +/* indicate that the cluster is compressed (they never have the copied flag) */ +#define QCOW_OFLAG_COMPRESSED (1ULL << 62) +/* The cluster reads as all zeros */ +#define QCOW_OFLAG_ZERO (1ULL << 0) + +#define QCOW_EXTL2_SUBCLUSTERS_PER_CLUSTER 32 + +/* The subcluster X [0..31] is allocated */ +#define QCOW_OFLAG_SUB_ALLOC(X) (1ULL << (X)) +/* The subcluster X [0..31] reads as zeroes */ +#define QCOW_OFLAG_SUB_ZERO(X) (QCOW_OFLAG_SUB_ALLOC(X) << 32) +/* Subclusters [X, Y) (0 <= X <= Y <= 32) are allocated */ +#define QCOW_OFLAG_SUB_ALLOC_RANGE(X, Y) \ + (QCOW_OFLAG_SUB_ALLOC(Y) - QCOW_OFLAG_SUB_ALLOC(X)) +/* Subclusters [X, Y) (0 <= X <= Y <= 32) read as zeroes */ +#define QCOW_OFLAG_SUB_ZERO_RANGE(X, Y) \ + (QCOW_OFLAG_SUB_ALLOC_RANGE(X, Y) << 32) +/* L2 entry bitmap with all allocation bits set */ +#define QCOW_L2_BITMAP_ALL_ALLOC (QCOW_OFLAG_SUB_ALLOC_RANGE(0, 32)) +/* L2 entry bitmap with all "read as zeroes" bits set */ +#define QCOW_L2_BITMAP_ALL_ZEROES (QCOW_OFLAG_SUB_ZERO_RANGE(0, 32)) + +/* Size of normal and extended L2 entries */ +#define QCOW_L2E_SIZE_NORMAL (sizeof(uint64_t)) +#define QCOW_L2E_SIZE_EXTENDED (sizeof(uint64_t) * 2) + +/* Size of L1 table entries */ +#define QCOW_L1E_SIZE (sizeof(uint64_t)) + +/* Size of reftable entries */ +#define QCOW_REFTABLE_ENTRY_SIZE (sizeof(uint64_t)) + +#define QCOW_MIN_CLUSTER_BITS 9 +#define QCOW_MAX_CLUSTER_BITS 21 + +/* Defined in the qcow2 spec (compressed cluster descriptor) */ +#define QCOW2_COMPRESSED_SECTOR_SIZE 512U +#define QCOW2_COMPRESSED_SECTOR_MASK (~(QCOW2_COMPRESSED_SECTOR_SIZE - 1ULL)) + +#define QCOW_L2_CACHE_SIZE 16 + +/* Must be at least 2 to cover COW */ +#define QCOW_MIN_L2_CACHE_SIZE 2 /* cache entries */ + +/* Must be at least 4 to cover all cases of refcount table growth */ +#define QCOW_MIN_REFCOUNT_CACHE_SIZE 4 /* clusters */ + +#define QCOW_DEFAULT_L2_CACHE_MAX_SIZE (1ULL << 25) +#define QCOW_DEFAULT_CACHE_CLEAN_INTERVAL 600 /* seconds */ + +#define QCOW_DEFAULT_CLUSTER_SIZE 65536 + +#define QCOW2_OPT_DATA_FILE "data-file" +#define QCOW2_OPT_LAZY_REFCOUNTS "lazy-refcounts" +#define QCOW2_OPT_DISCARD_REQUEST "pass-discard-request" +#define QCOW2_OPT_DISCARD_SNAPSHOT "pass-discard-snapshot" +#define QCOW2_OPT_DISCARD_OTHER "pass-discard-other" +#define QCOW2_OPT_OVERLAP "overlap-check" +#define QCOW2_OPT_OVERLAP_TEMPLATE "overlap-check.template" +#define QCOW2_OPT_OVERLAP_MAIN_HEADER "overlap-check.main-header" +#define QCOW2_OPT_OVERLAP_ACTIVE_L1 "overlap-check.active-l1" +#define QCOW2_OPT_OVERLAP_ACTIVE_L2 "overlap-check.active-l2" +#define QCOW2_OPT_OVERLAP_REFCOUNT_TABLE "overlap-check.refcount-table" +#define QCOW2_OPT_OVERLAP_REFCOUNT_BLOCK "overlap-check.refcount-block" +#define QCOW2_OPT_OVERLAP_SNAPSHOT_TABLE "overlap-check.snapshot-table" +#define QCOW2_OPT_OVERLAP_INACTIVE_L1 "overlap-check.inactive-l1" +#define QCOW2_OPT_OVERLAP_INACTIVE_L2 "overlap-check.inactive-l2" +#define QCOW2_OPT_OVERLAP_BITMAP_DIRECTORY "overlap-check.bitmap-directory" +#define QCOW2_OPT_CACHE_SIZE "cache-size" +#define QCOW2_OPT_L2_CACHE_SIZE "l2-cache-size" +#define QCOW2_OPT_L2_CACHE_ENTRY_SIZE "l2-cache-entry-size" +#define QCOW2_OPT_REFCOUNT_CACHE_SIZE "refcount-cache-size" +#define QCOW2_OPT_CACHE_CLEAN_INTERVAL "cache-clean-interval" + +typedef struct QCowHeaderProbe { + uint32_t magic; + uint32_t version; +} __attribute__((__packed__)) QCowHeaderProbe; + +typedef struct QCowHeaderV1 +{ + uint32_t magic; + uint32_t version; + uint64_t backing_file_offset; + uint32_t backing_file_size; + uint32_t mtime; + uint64_t size; /* in bytes */ + uint8_t cluster_bits; + uint8_t l2_bits; + uint16_t padding; + uint32_t crypt_method; + uint64_t l1_table_offset; +} __attribute__((__packed__)) QCowHeaderV1; + +typedef struct QCowHeader { + uint32_t magic; + uint32_t version; + uint64_t backing_file_offset; + uint32_t backing_file_size; + uint32_t cluster_bits; + uint64_t size; /* in bytes */ + uint32_t crypt_method; + uint32_t l1_size; /* XXX: save number of clusters instead ? */ + uint64_t l1_table_offset; + uint64_t refcount_table_offset; + uint32_t refcount_table_clusters; + uint32_t nb_snapshots; + uint64_t snapshots_offset; + + /* The following fields are only valid for version >= 3 */ + uint64_t incompatible_features; + uint64_t compatible_features; + uint64_t autoclear_features; + + uint32_t refcount_order; + uint32_t header_length; + + /* Additional fields */ + uint8_t compression_type; + + /* header must be a multiple of 8 */ + uint8_t padding[7]; +} __attribute__((__packed__)) QCowHeader; + +typedef struct QCowSnapshotHeader { + /* header is 8 byte aligned */ + uint64_t l1_table_offset; + + uint32_t l1_size; + uint16_t id_str_size; + uint16_t name_size; + + uint32_t date_sec; + uint32_t date_nsec; + + uint64_t vm_clock_nsec; + + uint32_t vm_state_size; + uint32_t extra_data_size; /* for extension */ + /* extra data follows */ + /* id_str follows */ + /* name follows */ +} __attribute__((__packed__)) QCowSnapshotHeader; + +typedef struct QCowSnapshotExtraData { + uint64_t vm_state_size_large; + uint64_t disk_size; + uint64_t icount; +} __attribute__((__packed__)) QCowSnapshotExtraData; + + +typedef struct QCowSnapshot { + uint64_t l1_table_offset; + uint32_t l1_size; + char *id_str; + char *name; + uint64_t disk_size; + uint64_t vm_state_size; + uint32_t date_sec; + uint32_t date_nsec; + uint64_t vm_clock_nsec; + /* icount value for the moment when snapshot was taken */ + uint64_t icount; + /* Size of all extra data, including QCowSnapshotExtraData if available */ + uint32_t extra_data_size; + /* Data beyond QCowSnapshotExtraData, if any */ + void *unknown_extra_data; +} QCowSnapshot; + +typedef struct Qcow2CryptoHeaderExtension { + uint64_t offset; + uint64_t length; +} __attribute__((__packed__)) Qcow2CryptoHeaderExtension; + +typedef struct Qcow2UnknownHeaderExtension { + uint32_t magic; + uint32_t len; + uint8_t data[]; +} Qcow2UnknownHeaderExtension; + +enum { + QCOW2_FEAT_TYPE_INCOMPATIBLE = 0, + QCOW2_FEAT_TYPE_COMPATIBLE = 1, + QCOW2_FEAT_TYPE_AUTOCLEAR = 2, +}; + +/* Incompatible feature bits */ +enum { + QCOW2_INCOMPAT_DIRTY_BITNR = 0, + QCOW2_INCOMPAT_CORRUPT_BITNR = 1, + QCOW2_INCOMPAT_DATA_FILE_BITNR = 2, + QCOW2_INCOMPAT_COMPRESSION_BITNR = 3, + QCOW2_INCOMPAT_EXTL2_BITNR = 4, + QCOW2_INCOMPAT_DIRTY = 1 << QCOW2_INCOMPAT_DIRTY_BITNR, + QCOW2_INCOMPAT_CORRUPT = 1 << QCOW2_INCOMPAT_CORRUPT_BITNR, + QCOW2_INCOMPAT_DATA_FILE = 1 << QCOW2_INCOMPAT_DATA_FILE_BITNR, + QCOW2_INCOMPAT_COMPRESSION = 1 << QCOW2_INCOMPAT_COMPRESSION_BITNR, + QCOW2_INCOMPAT_EXTL2 = 1 << QCOW2_INCOMPAT_EXTL2_BITNR, + + QCOW2_INCOMPAT_MASK = QCOW2_INCOMPAT_DIRTY + | QCOW2_INCOMPAT_CORRUPT + | QCOW2_INCOMPAT_DATA_FILE + | QCOW2_INCOMPAT_COMPRESSION + | QCOW2_INCOMPAT_EXTL2, +}; + +/* Compatible feature bits */ +enum { + QCOW2_COMPAT_LAZY_REFCOUNTS_BITNR = 0, + QCOW2_COMPAT_LAZY_REFCOUNTS = 1 << QCOW2_COMPAT_LAZY_REFCOUNTS_BITNR, + + QCOW2_COMPAT_FEAT_MASK = QCOW2_COMPAT_LAZY_REFCOUNTS, +}; + +/* Autoclear feature bits */ +enum { + QCOW2_AUTOCLEAR_BITMAPS_BITNR = 0, + QCOW2_AUTOCLEAR_DATA_FILE_RAW_BITNR = 1, + QCOW2_AUTOCLEAR_BITMAPS = 1 << QCOW2_AUTOCLEAR_BITMAPS_BITNR, + QCOW2_AUTOCLEAR_DATA_FILE_RAW = 1 << QCOW2_AUTOCLEAR_DATA_FILE_RAW_BITNR, + + QCOW2_AUTOCLEAR_MASK = QCOW2_AUTOCLEAR_BITMAPS + | QCOW2_AUTOCLEAR_DATA_FILE_RAW, +}; + +enum qcow2_discard_type { + QCOW2_DISCARD_NEVER = 0, + QCOW2_DISCARD_ALWAYS, + QCOW2_DISCARD_REQUEST, + QCOW2_DISCARD_SNAPSHOT, + QCOW2_DISCARD_OTHER, + QCOW2_DISCARD_MAX +}; + +typedef struct Qcow2Feature { + uint8_t type; + uint8_t bit; + char name[46]; +} __attribute__((__packed__)) Qcow2Feature; + +typedef struct Qcow2DiscardRegion { + uint64_t offset; + uint64_t bytes; +} Qcow2DiscardRegion; + +typedef uint64_t Qcow2GetRefcountFunc(const void *refcount_array, + uint64_t index); +typedef void Qcow2SetRefcountFunc(void *refcount_array, + uint64_t index, uint64_t value); + +typedef struct Qcow2BitmapHeaderExt { + uint32_t nb_bitmaps; + uint32_t reserved32; + uint64_t bitmap_directory_size; + uint64_t bitmap_directory_offset; +} __attribute__((__packed__)) Qcow2BitmapHeaderExt; + +#define QCOW_RC_CACHE_SIZE QCOW_L2_CACHE_SIZE; + +typedef struct Qcow2COWRegion { + /** + * Offset of the COW region in bytes from the start of the first cluster + * touched by the request. + */ + unsigned offset; + + /** Number of bytes to copy */ + unsigned nb_bytes; +} Qcow2COWRegion; + +/** + * Describes an in-flight (part of a) write request that writes to clusters + * that are not referenced in their L2 table yet. + */ +typedef struct QCowL2Meta +{ + /** Guest offset of the first newly allocated cluster */ + uint64_t offset; + + /** Host offset of the first newly allocated cluster */ + uint64_t alloc_offset; + + /** Number of newly allocated clusters */ + int nb_clusters; + + /** Do not free the old clusters */ + bool keep_old_clusters; + + /** + * The COW Region between the start of the first allocated cluster and the + * area the guest actually writes to. + */ + Qcow2COWRegion cow_start; + + /** + * The COW Region between the area the guest actually writes to and the + * end of the last allocated cluster. + */ + Qcow2COWRegion cow_end; + + /* + * Indicates that COW regions are already handled and do not require + * any more processing. + */ + bool skip_cow; + + /** + * Indicates that this is not a normal write request but a preallocation. + * If the image has extended L2 entries this means that no new individual + * subclusters will be marked as allocated in the L2 bitmap (but any + * existing contents of that bitmap will be kept). + */ + bool prealloc; + + /** Pointer to next L2Meta of the same write request */ + struct QCowL2Meta *next; +} QCowL2Meta; + +typedef enum QCow2ClusterType { + QCOW2_CLUSTER_UNALLOCATED, + QCOW2_CLUSTER_ZERO_PLAIN, + QCOW2_CLUSTER_ZERO_ALLOC, + QCOW2_CLUSTER_NORMAL, + QCOW2_CLUSTER_COMPRESSED, +} QCow2ClusterType; + +typedef enum QCow2MetadataOverlap { + QCOW2_OL_MAIN_HEADER_BITNR = 0, + QCOW2_OL_ACTIVE_L1_BITNR = 1, + QCOW2_OL_ACTIVE_L2_BITNR = 2, + QCOW2_OL_REFCOUNT_TABLE_BITNR = 3, + QCOW2_OL_REFCOUNT_BLOCK_BITNR = 4, + QCOW2_OL_SNAPSHOT_TABLE_BITNR = 5, + QCOW2_OL_INACTIVE_L1_BITNR = 6, + QCOW2_OL_INACTIVE_L2_BITNR = 7, + QCOW2_OL_BITMAP_DIRECTORY_BITNR = 8, + + QCOW2_OL_MAX_BITNR = 9, + + QCOW2_OL_NONE = 0, + QCOW2_OL_MAIN_HEADER = (1 << QCOW2_OL_MAIN_HEADER_BITNR), + QCOW2_OL_ACTIVE_L1 = (1 << QCOW2_OL_ACTIVE_L1_BITNR), + QCOW2_OL_ACTIVE_L2 = (1 << QCOW2_OL_ACTIVE_L2_BITNR), + QCOW2_OL_REFCOUNT_TABLE = (1 << QCOW2_OL_REFCOUNT_TABLE_BITNR), + QCOW2_OL_REFCOUNT_BLOCK = (1 << QCOW2_OL_REFCOUNT_BLOCK_BITNR), + QCOW2_OL_SNAPSHOT_TABLE = (1 << QCOW2_OL_SNAPSHOT_TABLE_BITNR), + QCOW2_OL_INACTIVE_L1 = (1 << QCOW2_OL_INACTIVE_L1_BITNR), + /* NOTE: Checking overlaps with inactive L2 tables will result in bdrv + * reads. */ + QCOW2_OL_INACTIVE_L2 = (1 << QCOW2_OL_INACTIVE_L2_BITNR), + QCOW2_OL_BITMAP_DIRECTORY = (1 << QCOW2_OL_BITMAP_DIRECTORY_BITNR), +} QCow2MetadataOverlap; + +/* Perform all overlap checks which can be done in constant time */ +#define QCOW2_OL_CONSTANT \ + (QCOW2_OL_MAIN_HEADER | QCOW2_OL_ACTIVE_L1 | QCOW2_OL_REFCOUNT_TABLE | \ + QCOW2_OL_SNAPSHOT_TABLE | QCOW2_OL_BITMAP_DIRECTORY) + +/* Perform all overlap checks which don't require disk access */ +#define QCOW2_OL_CACHED \ + (QCOW2_OL_CONSTANT | QCOW2_OL_ACTIVE_L2 | QCOW2_OL_REFCOUNT_BLOCK | \ + QCOW2_OL_INACTIVE_L1) + +/* Perform all overlap checks */ +#define QCOW2_OL_ALL \ + (QCOW2_OL_CACHED | QCOW2_OL_INACTIVE_L2) + +#define QCOW_L1E_OFFSET_MASK 0x00fffffffffffe00ULL +#define QCOW_L2E_OFFSET_MASK 0x00fffffffffffe00ULL +#define QCOW_L2E_COMPRESSED_OFFSET_SIZE_MASK 0x3fffffffffffffffULL + +#define REFT_OFFSET_MASK 0xfffffffffffffe00ULL + +#define INV_OFFSET (-1ULL) + +static inline uint64_t l2meta_cow_start(QCowL2Meta *m) +{ + return m->offset + m->cow_start.offset; +} + +static inline uint64_t l2meta_cow_end(QCowL2Meta *m) +{ + return m->offset + m->cow_end.offset + m->cow_end.nb_bytes; +} + +static inline uint64_t refcount_diff(uint64_t r1, uint64_t r2) +{ + return r1 > r2 ? r1 - r2 : r2 - r1; +} + +#endif // CEPH_LIBRBD_MIGRATION_QCOW2_H diff --git a/src/librbd/migration/QCOWFormat.cc b/src/librbd/migration/QCOWFormat.cc new file mode 100644 index 000000000..7bd4a5ef7 --- /dev/null +++ b/src/librbd/migration/QCOWFormat.cc @@ -0,0 +1,1542 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/QCOWFormat.h" +#include "common/Clock.h" +#include "common/dout.h" +#include "common/errno.h" +#include "include/intarith.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/SnapshotInterface.h" +#include "librbd/migration/SourceSpecBuilder.h" +#include "librbd/migration/StreamInterface.h" +#include "librbd/migration/Utils.h" +#include <boost/asio/dispatch.hpp> +#include <boost/asio/post.hpp> +#include <deque> +#include <tuple> +#include <unordered_map> +#include <vector> + +#define dout_subsys ceph_subsys_rbd + +namespace librbd { +namespace migration { + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat: " \ + << __func__ << ": " + +namespace qcow_format { + +struct ClusterExtent { + uint64_t cluster_offset; + uint64_t cluster_length; + uint64_t intra_cluster_offset; + uint64_t image_offset; + uint64_t buffer_offset; + + ClusterExtent(uint64_t cluster_offset, uint64_t cluster_length, + uint64_t intra_cluster_offset, uint64_t image_offset, + uint64_t buffer_offset) + : cluster_offset(cluster_offset), cluster_length(cluster_length), + intra_cluster_offset(intra_cluster_offset), image_offset(image_offset), + buffer_offset(buffer_offset) { + } +}; + +typedef std::vector<ClusterExtent> ClusterExtents; + +void LookupTable::init() { + if (cluster_offsets == nullptr) { + cluster_offsets = reinterpret_cast<uint64_t*>(bl.c_str()); + } +} + +void LookupTable::decode() { + init(); + + // L2 tables are selectively byte-swapped on demand if only requesting a + // single cluster offset + if (decoded) { + return; + } + + // translate the lookup table (big-endian -> CPU endianess) + for (auto idx = 0UL; idx < size; ++idx) { + cluster_offsets[idx] = be64toh(cluster_offsets[idx]); + } + + decoded = true; +} + +void populate_cluster_extents(CephContext* cct, uint64_t cluster_size, + const io::Extents& image_extents, + ClusterExtents* cluster_extents) { + uint64_t buffer_offset = 0; + for (auto [image_offset, image_length] : image_extents) { + while (image_length > 0) { + auto intra_cluster_offset = image_offset & (cluster_size - 1); + auto intra_cluster_length = cluster_size - intra_cluster_offset; + auto cluster_length = std::min(image_length, intra_cluster_length); + + ldout(cct, 20) << "image_offset=" << image_offset << ", " + << "image_length=" << image_length << ", " + << "cluster_length=" << cluster_length << dendl; + + + cluster_extents->emplace_back(0, cluster_length, intra_cluster_offset, + image_offset, buffer_offset); + + image_offset += cluster_length; + image_length -= cluster_length; + buffer_offset += cluster_length; + } + } +} + +} // namespace qcow_format + +using namespace qcow_format; + +template <typename I> +struct QCOWFormat<I>::Cluster { + const uint64_t cluster_offset; + bufferlist cluster_data_bl; + + Cluster(uint64_t cluster_offset) : cluster_offset(cluster_offset) { + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat::ClusterCache: " \ + << this << " " << __func__ << ": " + +template <typename I> +class QCOWFormat<I>::ClusterCache { +public: + ClusterCache(QCOWFormat* qcow_format) + : qcow_format(qcow_format), + m_strand(*qcow_format->m_image_ctx->asio_engine) { + } + + void get_cluster(uint64_t cluster_offset, uint64_t cluster_length, + uint64_t intra_cluster_offset, bufferlist* bl, + Context* on_finish) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "cluster_offset=" << cluster_offset << dendl; + + // cache state machine runs in a single strand thread + boost::asio::dispatch( + m_strand, + [this, cluster_offset, cluster_length, intra_cluster_offset, bl, + on_finish]() { + execute_get_cluster(cluster_offset, cluster_length, + intra_cluster_offset, bl, on_finish); + }); + } + +private: + typedef std::tuple<uint64_t, uint64_t, bufferlist*, Context*> Completion; + typedef std::list<Completion> Completions; + + QCOWFormat* qcow_format; + boost::asio::io_context::strand m_strand; + + std::shared_ptr<Cluster> cluster; + std::unordered_map<uint64_t, Completions> cluster_completions; + + void execute_get_cluster(uint64_t cluster_offset, uint64_t cluster_length, + uint64_t intra_cluster_offset, bufferlist* bl, + Context* on_finish) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "cluster_offset=" << cluster_offset << dendl; + + if (cluster && cluster->cluster_offset == cluster_offset) { + // most-recent cluster matches + bl->substr_of(cluster->cluster_data_bl, intra_cluster_offset, + cluster_length); + boost::asio::post(*qcow_format->m_image_ctx->asio_engine, + [on_finish]() { on_finish->complete(0); }); + return; + } + + // record callback for cluster + bool new_request = (cluster_completions.count(cluster_offset) == 0); + cluster_completions[cluster_offset].emplace_back( + intra_cluster_offset, cluster_length, bl, on_finish); + if (new_request) { + // start the new read request + read_cluster(std::make_shared<Cluster>(cluster_offset)); + } + } + + void read_cluster(std::shared_ptr<Cluster> cluster) { + auto cct = qcow_format->m_image_ctx->cct; + + uint64_t stream_offset = cluster->cluster_offset; + uint64_t stream_length = qcow_format->m_cluster_size; + if ((cluster->cluster_offset & QCOW_OFLAG_COMPRESSED) != 0) { + // compressed clusters encode the compressed length in the lower bits + stream_offset = cluster->cluster_offset & + qcow_format->m_cluster_offset_mask; + stream_length = (cluster->cluster_offset >> + (63 - qcow_format->m_cluster_bits)) & + (qcow_format->m_cluster_size - 1); + } + + ldout(cct, 20) << "cluster_offset=" << cluster->cluster_offset << ", " + << "stream_offset=" << stream_offset << ", " + << "stream_length=" << stream_length << dendl; + + // read the cluster into the cache entry + auto ctx = new LambdaContext([this, cluster](int r) { + boost::asio::post(m_strand, [this, cluster, r]() { + handle_read_cluster(r, cluster); }); }); + qcow_format->m_stream->read({{stream_offset, stream_length}}, + &cluster->cluster_data_bl, ctx); + } + + void handle_read_cluster(int r, std::shared_ptr<Cluster> cluster) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "cluster_offset=" << cluster->cluster_offset << dendl; + + auto completions = std::move(cluster_completions[cluster->cluster_offset]); + cluster_completions.erase(cluster->cluster_offset); + + if (r < 0) { + lderr(cct) << "failed to read cluster offset " << cluster->cluster_offset + << ": " << cpp_strerror(r) << dendl; + } else { + if ((cluster->cluster_offset & QCOW_OFLAG_COMPRESSED) != 0) { + bufferlist compressed_bl{std::move(cluster->cluster_data_bl)}; + cluster->cluster_data_bl.clear(); + + // TODO + lderr(cct) << "support for compressed clusters is not available" + << dendl; + r = -EINVAL; + } else { + // cache the MRU cluster in case of sequential IO + this->cluster = cluster; + } + } + + // complete the IO back to caller + boost::asio::post(*qcow_format->m_image_ctx->asio_engine, + [r, cluster, completions=std::move(completions)]() { + for (auto completion : completions) { + if (r >= 0) { + std::get<2>(completion)->substr_of( + cluster->cluster_data_bl, + std::get<0>(completion), + std::get<1>(completion)); + } + std::get<3>(completion)->complete(r); + } + }); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat::L2TableCache: " \ + << this << " " << __func__ << ": " + +template <typename I> +class QCOWFormat<I>::L2TableCache { +public: + L2TableCache(QCOWFormat* qcow_format) + : qcow_format(qcow_format), + m_strand(*qcow_format->m_image_ctx->asio_engine), + l2_cache_entries(QCOW_L2_CACHE_SIZE) { + } + + void get_l2_table(const LookupTable* l1_table, uint64_t l2_table_offset, + std::shared_ptr<const LookupTable>* l2_table, + Context* on_finish) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "l2_table_offset=" << l2_table_offset << dendl; + + // cache state machine runs in a single strand thread + Request request{l1_table, l2_table_offset, l2_table, on_finish}; + boost::asio::dispatch( + m_strand, [this, request=std::move(request)]() { + requests.push_back(std::move(request)); + }); + dispatch_request(); + } + + void get_cluster_offset(const LookupTable* l1_table, + uint64_t image_offset, uint64_t* cluster_offset, + Context* on_finish) { + auto cct = qcow_format->m_image_ctx->cct; + uint32_t l1_table_index = image_offset >> qcow_format->m_l1_shift; + uint64_t l2_table_offset = l1_table->cluster_offsets[std::min<uint32_t>( + l1_table_index, l1_table->size - 1)] & + qcow_format->m_cluster_mask; + uint32_t l2_table_index = (image_offset >> qcow_format->m_cluster_bits) & + (qcow_format->m_l2_size - 1); + ldout(cct, 20) << "image_offset=" << image_offset << ", " + << "l1_table_index=" << l1_table_index << ", " + << "l2_table_offset=" << l2_table_offset << ", " + << "l2_table_index=" << l2_table_index << dendl; + + if (l1_table_index >= l1_table->size) { + lderr(cct) << "L1 index " << l1_table_index << " out-of-bounds" << dendl; + on_finish->complete(-ERANGE); + return; + } else if (l2_table_offset == 0) { + // L2 table has not been allocated for specified offset + ldout(cct, 20) << "image_offset=" << image_offset << ", " + << "cluster_offset=DNE" << dendl; + *cluster_offset = 0; + on_finish->complete(-ENOENT); + return; + } + + // cache state machine runs in a single strand thread + Request request{l1_table, l2_table_offset, l2_table_index, cluster_offset, + on_finish}; + boost::asio::dispatch( + m_strand, [this, request=std::move(request)]() { + requests.push_back(std::move(request)); + }); + dispatch_request(); + } + +private: + QCOWFormat* qcow_format; + + boost::asio::io_context::strand m_strand; + + struct Request { + const LookupTable* l1_table; + + uint64_t l2_table_offset; + + // get_cluster_offset request + uint32_t l2_table_index; + uint64_t* cluster_offset = nullptr; + + // get_l2_table request + std::shared_ptr<const LookupTable>* l2_table; + + Context* on_finish; + + Request(const LookupTable* l1_table, uint64_t l2_table_offset, + uint32_t l2_table_index, uint64_t* cluster_offset, + Context* on_finish) + : l1_table(l1_table), l2_table_offset(l2_table_offset), + l2_table_index(l2_table_index), cluster_offset(cluster_offset), + on_finish(on_finish) { + } + Request(const LookupTable* l1_table, uint64_t l2_table_offset, + std::shared_ptr<const LookupTable>* l2_table, Context* on_finish) + : l1_table(l1_table), l2_table_offset(l2_table_offset), + l2_table(l2_table), on_finish(on_finish) { + } + }; + + typedef std::deque<Request> Requests; + + struct L2Cache { + uint64_t l2_offset = 0; + std::shared_ptr<LookupTable> l2_table; + + utime_t timestamp; + uint32_t count = 0; + bool in_flight = false; + + int ret_val = 0; + }; + std::vector<L2Cache> l2_cache_entries; + + Requests requests; + + void dispatch_request() { + boost::asio::dispatch(m_strand, [this]() { execute_request(); }); + } + + void execute_request() { + auto cct = qcow_format->m_image_ctx->cct; + if (requests.empty()) { + return; + } + + auto request = requests.front(); + ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << dendl; + + std::shared_ptr<LookupTable> l2_table; + int r = l2_table_lookup(request.l2_table_offset, &l2_table); + if (r < 0) { + lderr(cct) << "failed to load L2 table: l2_table_offset=" + << request.l2_table_offset << ": " + << cpp_strerror(r) << dendl; + } else if (l2_table == nullptr) { + // table not in cache -- will restart once its loaded + return; + } else if (request.cluster_offset != nullptr) { + auto cluster_offset = l2_table->cluster_offsets[request.l2_table_index]; + if (!l2_table->decoded) { + // table hasn't been byte-swapped + cluster_offset = be64toh(cluster_offset); + } + + *request.cluster_offset = cluster_offset & qcow_format->m_cluster_mask; + if (*request.cluster_offset == QCOW_OFLAG_ZERO) { + ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << ", " + << "l2_table_index=" << request.l2_table_index << ", " + << "cluster_offset=zeroed" << dendl; + } else { + ldout(cct, 20) << "l2_table_offset=" << request.l2_table_offset << ", " + << "l2_table_index=" << request.l2_table_index << ", " + << "cluster_offset=" << *request.cluster_offset + << dendl; + } + } else if (request.l2_table != nullptr) { + // ensure it's in the correct byte-order + l2_table->decode(); + *request.l2_table = l2_table; + } else { + ceph_assert(false); + } + + // complete the L2 cache request + boost::asio::post(*qcow_format->m_image_ctx->asio_engine, + [r, ctx=request.on_finish]() { ctx->complete(r); }); + requests.pop_front(); + + // process next request (if any) + dispatch_request(); + } + + int l2_table_lookup(uint64_t l2_offset, + std::shared_ptr<LookupTable>* l2_table) { + auto cct = qcow_format->m_image_ctx->cct; + + l2_table->reset(); + + // find a match in the existing cache + for (auto idx = 0U; idx < l2_cache_entries.size(); ++idx) { + auto& l2_cache = l2_cache_entries[idx]; + if (l2_cache.l2_offset == l2_offset) { + if (l2_cache.in_flight) { + ldout(cct, 20) << "l2_offset=" << l2_offset << ", " + << "index=" << idx << " (in-flight)" << dendl; + return 0; + } + + if (l2_cache.ret_val < 0) { + ldout(cct, 20) << "l2_offset=" << l2_offset << ", " + << "index=" << idx << " (error): " + << cpp_strerror(l2_cache.ret_val) << dendl; + int r = l2_cache.ret_val; + l2_cache = L2Cache{}; + + return r; + } + + ++l2_cache.count; + if (l2_cache.count == std::numeric_limits<uint32_t>::max()) { + for (auto& entry : l2_cache_entries) { + entry.count >>= 1; + } + } + + ldout(cct, 20) << "l2_offset=" << l2_offset << ", " << "index=" << idx + << dendl; + *l2_table = l2_cache.l2_table; + return 0; + } + } + + // find the least used entry + int32_t min_idx = -1; + uint32_t min_count = std::numeric_limits<uint32_t>::max(); + utime_t min_timestamp; + for (uint32_t idx = 0U; idx < l2_cache_entries.size(); ++idx) { + auto& l2_cache = l2_cache_entries[idx]; + if (l2_cache.in_flight) { + continue; + } + + if (l2_cache.count > 0) { + --l2_cache.count; + } + + if (l2_cache.count <= min_count) { + if (min_idx == -1 || l2_cache.timestamp < min_timestamp) { + min_timestamp = l2_cache.timestamp; + min_count = l2_cache.count; + min_idx = idx; + } + } + } + + if (min_idx == -1) { + // no space in the cache due to in-flight requests + ldout(cct, 20) << "l2_offset=" << l2_offset << ", " + << "index=DNE (cache busy)" << dendl; + return 0; + } + + ldout(cct, 20) << "l2_offset=" << l2_offset << ", " + << "index=" << min_idx << " (loading)" << dendl; + auto& l2_cache = l2_cache_entries[min_idx]; + l2_cache.l2_table = std::make_shared<LookupTable>(qcow_format->m_l2_size); + l2_cache.l2_offset = l2_offset; + l2_cache.timestamp = ceph_clock_now(); + l2_cache.count = 1; + l2_cache.in_flight = true; + + // read the L2 table into the L2 cache entry + auto ctx = new LambdaContext([this, index=min_idx, l2_offset](int r) { + boost::asio::post(m_strand, [this, index, l2_offset, r]() { + handle_l2_table_lookup(r, index, l2_offset); }); }); + qcow_format->m_stream->read( + {{l2_offset, qcow_format->m_l2_size * sizeof(uint64_t)}}, + &l2_cache.l2_table->bl, ctx); + return 0; + } + + void handle_l2_table_lookup(int r, uint32_t index, uint64_t l2_offset) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "l2_offset=" << l2_offset << ", " + << "index=" << index << dendl; + + auto& l2_cache = l2_cache_entries[index]; + ceph_assert(l2_cache.in_flight); + l2_cache.in_flight = false; + + if (r < 0) { + lderr(cct) << "failed to load L2 table: " + << "l2_offset=" << l2_cache.l2_offset << ": " + << cpp_strerror(r) << dendl; + l2_cache.ret_val = r; + } else { + // keep the L2 table in big-endian byte-order until the full table + // is requested + l2_cache.l2_table->init(); + } + + // restart the state machine + dispatch_request(); + } + +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat::ReadRequest: " \ + << this << " " << __func__ << ": " + +template <typename I> +class QCOWFormat<I>::ReadRequest { +public: + ReadRequest(QCOWFormat* qcow_format, io::AioCompletion* aio_comp, + const LookupTable* l1_table, io::Extents&& image_extents) + : qcow_format(qcow_format), aio_comp(aio_comp), l1_table(l1_table), + image_extents(std::move(image_extents)) { + } + + void send() { + get_cluster_offsets(); + } + +private: + QCOWFormat* qcow_format; + io::AioCompletion* aio_comp; + + const LookupTable* l1_table; + io::Extents image_extents; + + size_t image_extents_idx = 0; + uint32_t image_extent_offset = 0; + + ClusterExtents cluster_extents; + + void get_cluster_offsets() { + auto cct = qcow_format->m_image_ctx->cct; + populate_cluster_extents(cct, qcow_format->m_cluster_size, image_extents, + &cluster_extents); + + ldout(cct, 20) << dendl; + auto ctx = new LambdaContext([this](int r) { + handle_get_cluster_offsets(r); }); + auto gather_ctx = new C_Gather(cct, ctx); + + for (auto& cluster_extent : cluster_extents) { + auto sub_ctx = new LambdaContext( + [this, &cluster_extent, on_finish=gather_ctx->new_sub()](int r) { + handle_get_cluster_offset(r, cluster_extent, on_finish); }); + qcow_format->m_l2_table_cache->get_cluster_offset( + l1_table, cluster_extent.image_offset, + &cluster_extent.cluster_offset, sub_ctx); + } + + gather_ctx->activate(); + } + + void handle_get_cluster_offset(int r, const ClusterExtent& cluster_extent, + Context* on_finish) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "image_offset=" << cluster_extent.image_offset << ", " + << "cluster_offset=" << cluster_extent.cluster_offset + << dendl; + + if (r == -ENOENT) { + ldout(cct, 20) << "image offset DNE in QCOW image" << dendl; + r = 0; + } else if (r < 0) { + lderr(cct) << "failed to map image offset " << cluster_extent.image_offset + << ": " << cpp_strerror(r) << dendl; + } + + on_finish->complete(r); + } + + void handle_get_cluster_offsets(int r) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to retrieve cluster extents: " << cpp_strerror(r) + << dendl; + aio_comp->fail(r); + delete this; + return; + } + + read_clusters(); + } + + void read_clusters() { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << dendl; + + aio_comp->set_request_count(cluster_extents.size()); + for (auto& cluster_extent : cluster_extents) { + auto read_ctx = new io::ReadResult::C_ImageReadRequest( + aio_comp, cluster_extent.buffer_offset, + {{cluster_extent.image_offset, cluster_extent.cluster_length}}); + read_ctx->ignore_enoent = true; + + auto log_ctx = new LambdaContext( + [this, cct=qcow_format->m_image_ctx->cct, + image_offset=cluster_extent.image_offset, + image_length=cluster_extent.cluster_length, ctx=read_ctx](int r) { + handle_read_cluster(cct, r, image_offset, image_length, ctx); + }); + + if (cluster_extent.cluster_offset == 0) { + // QCOW header is at offset 0, implies cluster DNE + log_ctx->complete(-ENOENT); + } else if (cluster_extent.cluster_offset == QCOW_OFLAG_ZERO) { + // explicitly zeroed section + read_ctx->bl.append_zero(cluster_extent.cluster_length); + log_ctx->complete(0); + } else { + // request the (sub)cluster from the cluster cache + qcow_format->m_cluster_cache->get_cluster( + cluster_extent.cluster_offset, cluster_extent.cluster_length, + cluster_extent.intra_cluster_offset, &read_ctx->bl, log_ctx); + } + } + + delete this; + } + + void handle_read_cluster(CephContext* cct, int r, uint64_t image_offset, + uint64_t image_length, Context* on_finish) const { + // NOTE: treat as static function, expect object has been deleted + + ldout(cct, 20) << "r=" << r << ", " + << "image_offset=" << image_offset << ", " + << "image_length=" << image_length << dendl; + + if (r != -ENOENT && r < 0) { + lderr(cct) << "failed to read image extent " << image_offset << "~" + << image_length << ": " << cpp_strerror(r) << dendl; + } + + on_finish->complete(r); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat::" \ + << "ListSnapsRequest: " << this << " " \ + << __func__ << ": " + +template <typename I> +class QCOWFormat<I>::ListSnapsRequest { +public: + ListSnapsRequest( + QCOWFormat* qcow_format, uint32_t l1_table_index, + ClusterExtents&& cluster_extents, + const std::map<uint64_t, const LookupTable*>& snap_id_to_l1_table, + io::SnapshotDelta* snapshot_delta, Context* on_finish) + : qcow_format(qcow_format), l1_table_index(l1_table_index), + cluster_extents(std::move(cluster_extents)), + snap_id_to_l1_table(snap_id_to_l1_table), snapshot_delta(snapshot_delta), + on_finish(on_finish) { + } + + void send() { + get_l2_table(); + } + +private: + QCOWFormat* qcow_format; + uint32_t l1_table_index; + ClusterExtents cluster_extents; + std::map<uint64_t, const LookupTable*> snap_id_to_l1_table; + io::SnapshotDelta* snapshot_delta; + Context* on_finish; + + std::shared_ptr<const LookupTable> previous_l2_table; + std::shared_ptr<const LookupTable> l2_table; + + void get_l2_table() { + auto cct = qcow_format->m_image_ctx->cct; + if (snap_id_to_l1_table.empty()) { + finish(0); + return; + } + + auto it = snap_id_to_l1_table.begin(); + auto [snap_id, l1_table] = *it; + snap_id_to_l1_table.erase(it); + + previous_l2_table = l2_table; + l2_table.reset(); + + auto ctx = new LambdaContext([this, snap_id = snap_id](int r) { + boost::asio::post(qcow_format->m_strand, [this, snap_id, r]() { + handle_get_l2_table(r, snap_id); + }); + }); + + if (l1_table_index >= l1_table->size || + l1_table->cluster_offsets[l1_table_index] == 0) { + ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", " + << "snap_id=" << snap_id << ": DNE" << dendl; + ctx->complete(-ENOENT); + return; + } + + uint64_t l2_table_offset = l1_table->cluster_offsets[l1_table_index] & + qcow_format->m_cluster_mask; + + ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", " + << "snap_id=" << snap_id << ", " + << "l2_table_offset=" << l2_table_offset << dendl; + qcow_format->m_l2_table_cache->get_l2_table(l1_table, l2_table_offset, + &l2_table, ctx); + } + + void handle_get_l2_table(int r, uint64_t snap_id) { + ceph_assert(qcow_format->m_strand.running_in_this_thread()); + + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "snap_id=" << snap_id << dendl; + + if (r == -ENOENT) { + l2_table.reset(); + } else if (r < 0) { + lderr(cct) << "failed to retrieve L2 table for snapshot " << snap_id + << ": " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + // compare the cluster offsets at each requested L2 offset between + // the previous snapshot's L2 table and the current L2 table. + auto& sparse_extents = (*snapshot_delta)[{snap_id, snap_id}]; + for (auto& cluster_extent : cluster_extents) { + uint32_t l2_table_index = + (cluster_extent.image_offset >> qcow_format->m_cluster_bits) & + (qcow_format->m_l2_size - 1); + + std::optional<uint64_t> cluster_offset; + if (l2_table && l2_table_index < l2_table->size) { + cluster_offset = l2_table->cluster_offsets[l2_table_index] & + qcow_format->m_cluster_offset_mask; + } + + std::optional<uint64_t> prev_cluster_offset; + if (previous_l2_table && l2_table_index < previous_l2_table->size) { + prev_cluster_offset = + previous_l2_table->cluster_offsets[l2_table_index] & + qcow_format->m_cluster_offset_mask; + } + + ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", " + << "snap_id=" << snap_id << ", " + << "image_offset=" << cluster_extent.image_offset << ", " + << "l2_table_index=" << l2_table_index << ", " + << "cluster_offset=" << cluster_offset << ", " + << "prev_cluster_offset=" << prev_cluster_offset << dendl; + + auto state = io::SPARSE_EXTENT_STATE_DATA; + if (cluster_offset == prev_cluster_offset) { + continue; + } else if ((prev_cluster_offset && !cluster_offset) || + *cluster_offset == QCOW_OFLAG_ZERO) { + // explicitly zeroed or deallocated + state = io::SPARSE_EXTENT_STATE_ZEROED; + } + + sparse_extents.insert( + cluster_extent.image_offset, cluster_extent.cluster_length, + {state, cluster_extent.cluster_length}); + } + + ldout(cct, 20) << "l1_table_index=" << l1_table_index << ", " + << "snap_id=" << snap_id << ", " + << "sparse_extents=" << sparse_extents << dendl; + + // continue processing the L2 table at this index for all snapshots + boost::asio::post(*qcow_format->m_image_ctx->asio_engine, + [this]() { get_l2_table(); }); + } + + + void finish(int r) { + auto cct = qcow_format->m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << dendl; + + on_finish->complete(r); + delete this; + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::QCOWFormat: " << this \ + << " " << __func__ << ": " + +template <typename I> +QCOWFormat<I>::QCOWFormat( + I* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<I>* source_spec_builder) + : m_image_ctx(image_ctx), m_json_object(json_object), + m_source_spec_builder(source_spec_builder), + m_strand(*image_ctx->asio_engine) { +} + +template <typename I> +void QCOWFormat<I>::open(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + int r = m_source_spec_builder->build_stream(m_json_object, &m_stream); + if (r < 0) { + lderr(cct) << "failed to build migration stream handler" << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_open(r, on_finish); }); + m_stream->open(ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_open(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to open QCOW image: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + probe(on_finish); +} + +template <typename I> +void QCOWFormat<I>::probe(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_probe(r, on_finish); }); + m_bl.clear(); + m_stream->read({{0, 8}}, &m_bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_probe(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to probe QCOW image: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + auto header_probe = *reinterpret_cast<QCowHeaderProbe*>( + m_bl.c_str()); + header_probe.magic = be32toh(header_probe.magic); + header_probe.version = be32toh(header_probe.version); + + if (header_probe.magic != QCOW_MAGIC) { + lderr(cct) << "invalid QCOW header magic" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_bl.clear(); + if (header_probe.version == 1) { +#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + read_v1_header(on_finish); +#else // WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + lderr(cct) << "QCOW is not supported" << dendl; + on_finish->complete(-ENOTSUP); +#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + return; + } else if (header_probe.version >= 2 && header_probe.version <= 3) { + read_v2_header(on_finish); + return; + } else { + lderr(cct) << "invalid QCOW header version " << header_probe.version + << dendl; + on_finish->complete(-EINVAL); + return; + } +} + +#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + +template <typename I> +void QCOWFormat<I>::read_v1_header(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_v1_header(r, on_finish); }); + m_bl.clear(); + m_stream->read({{0, sizeof(QCowHeaderV1)}}, &m_bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_v1_header(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to read QCOW header: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + auto header = *reinterpret_cast<QCowHeaderV1*>(m_bl.c_str()); + + // byte-swap important fields + header.magic = be32toh(header.magic); + header.version = be32toh(header.version); + header.backing_file_offset = be64toh(header.backing_file_offset); + header.backing_file_size = be32toh(header.backing_file_size); + header.size = be64toh(header.size); + header.crypt_method = be32toh(header.crypt_method); + header.l1_table_offset = be64toh(header.l1_table_offset); + + if (header.magic != QCOW_MAGIC || header.version != 1) { + // honestly shouldn't happen since we've already validated it + lderr(cct) << "header is not QCOW" << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.cluster_bits < QCOW_MIN_CLUSTER_BITS || + header.cluster_bits > QCOW_MAX_CLUSTER_BITS) { + lderr(cct) << "invalid cluster bits: " << header.cluster_bits << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.l2_bits < (QCOW_MIN_CLUSTER_BITS - 3) || + header.l2_bits > (QCOW_MAX_CLUSTER_BITS - 3)) { + lderr(cct) << "invalid L2 bits: " << header.l2_bits << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.crypt_method != QCOW_CRYPT_NONE) { + lderr(cct) << "invalid or unsupported encryption method" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_size = header.size; + if (p2roundup(m_size, static_cast<uint64_t>(512)) != m_size) { + lderr(cct) << "image size is not a multiple of block size" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_backing_file_offset = header.backing_file_offset; + m_backing_file_size = header.backing_file_size; + + m_cluster_bits = header.cluster_bits; + m_cluster_size = 1UL << header.cluster_bits; + m_cluster_offset_mask = (1ULL << (63 - header.cluster_bits)) - 1; + m_cluster_mask = ~QCOW_OFLAG_COMPRESSED; + + m_l2_bits = header.l2_bits; + m_l2_size = (1UL << m_l2_bits); + + m_l1_shift = m_cluster_bits + m_l2_bits; + m_l1_table.size = (m_size + (1LL << m_l1_shift) - 1) >> m_l1_shift; + m_l1_table_offset = header.l1_table_offset; + if (m_size > (std::numeric_limits<uint64_t>::max() - (1ULL << m_l1_shift)) || + m_l1_table.size > + (std::numeric_limits<int32_t>::max() / sizeof(uint64_t))) { + lderr(cct) << "image size too big: " << m_size << dendl; + on_finish->complete(-EINVAL); + return; + } + + ldout(cct, 15) << "size=" << m_size << ", " + << "cluster_bits=" << m_cluster_bits << ", " + << "l2_bits=" << m_l2_bits << dendl; + + // allocate memory for L1 table and L2 + cluster caches + m_l2_table_cache = std::make_unique<L2TableCache>(this); + m_cluster_cache = std::make_unique<ClusterCache>(this); + + read_l1_table(on_finish); +} + +#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + +template <typename I> +void QCOWFormat<I>::read_v2_header(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_v2_header(r, on_finish); }); + m_bl.clear(); + m_stream->read({{0, sizeof(QCowHeader)}}, &m_bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_v2_header(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to read QCOW2 header: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + auto header = *reinterpret_cast<QCowHeader*>(m_bl.c_str()); + + // byte-swap important fields + header.magic = be32toh(header.magic); + header.version = be32toh(header.version); + header.backing_file_offset = be64toh(header.backing_file_offset); + header.backing_file_size = be32toh(header.backing_file_size); + header.cluster_bits = be32toh(header.cluster_bits); + header.size = be64toh(header.size); + header.crypt_method = be32toh(header.crypt_method); + header.l1_size = be32toh(header.l1_size); + header.l1_table_offset = be64toh(header.l1_table_offset); + header.nb_snapshots = be32toh(header.nb_snapshots); + header.snapshots_offset = be64toh(header.snapshots_offset); + + if (header.version == 2) { + // valid only for version >= 3 + header.incompatible_features = 0; + header.compatible_features = 0; + header.autoclear_features = 0; + header.header_length = 72; + header.compression_type = 0; + } else { + header.incompatible_features = be64toh(header.incompatible_features); + header.compatible_features = be64toh(header.compatible_features); + header.autoclear_features = be64toh(header.autoclear_features); + header.header_length = be32toh(header.header_length); + } + + if (header.magic != QCOW_MAGIC || header.version < 2 || header.version > 3) { + // honestly shouldn't happen since we've already validated it + lderr(cct) << "header is not QCOW2" << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.cluster_bits < QCOW_MIN_CLUSTER_BITS || + header.cluster_bits > QCOW_MAX_CLUSTER_BITS) { + lderr(cct) << "invalid cluster bits: " << header.cluster_bits << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.crypt_method != QCOW_CRYPT_NONE) { + lderr(cct) << "invalid or unsupported encryption method" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_size = header.size; + if (p2roundup(m_size, static_cast<uint64_t>(512)) != m_size) { + lderr(cct) << "image size is not a multiple of block size" << dendl; + on_finish->complete(-EINVAL); + return; + } + + if (header.header_length <= offsetof(QCowHeader, compression_type)) { + header.compression_type = 0; + } + + if ((header.compression_type != 0) || + ((header.incompatible_features & QCOW2_INCOMPAT_COMPRESSION) != 0)) { + lderr(cct) << "invalid or unsupported compression type" << dendl; + on_finish->complete(-EINVAL); + return; + } + + if ((header.incompatible_features & QCOW2_INCOMPAT_DATA_FILE) != 0) { + lderr(cct) << "external data file feature not supported" << dendl; + on_finish->complete(-ENOTSUP); + } + + if ((header.incompatible_features & QCOW2_INCOMPAT_EXTL2) != 0) { + lderr(cct) << "extended L2 table feature not supported" << dendl; + on_finish->complete(-ENOTSUP); + return; + } + + header.incompatible_features &= ~QCOW2_INCOMPAT_MASK; + if (header.incompatible_features != 0) { + lderr(cct) << "unknown incompatible feature enabled" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_backing_file_offset = header.backing_file_offset; + m_backing_file_size = header.backing_file_size; + + m_cluster_bits = header.cluster_bits; + m_cluster_size = 1UL << header.cluster_bits; + m_cluster_offset_mask = (1ULL << (63 - header.cluster_bits)) - 1; + m_cluster_mask = ~(QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_COPIED); + + // L2 table is fixed a (1) cluster block to hold 8-byte (3 bit) offsets + m_l2_bits = m_cluster_bits - 3; + m_l2_size = (1UL << m_l2_bits); + + m_l1_shift = m_cluster_bits + m_l2_bits; + m_l1_table.size = (m_size + (1LL << m_l1_shift) - 1) >> m_l1_shift; + m_l1_table_offset = header.l1_table_offset; + if (m_size > (std::numeric_limits<uint64_t>::max() - (1ULL << m_l1_shift)) || + m_l1_table.size > + (std::numeric_limits<int32_t>::max() / sizeof(uint64_t))) { + lderr(cct) << "image size too big: " << m_size << dendl; + on_finish->complete(-EINVAL); + return; + } else if (m_l1_table.size > header.l1_size) { + lderr(cct) << "invalid L1 table size in header (" << header.l1_size + << " < " << m_l1_table.size << ")" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_snapshot_count = header.nb_snapshots; + m_snapshots_offset = header.snapshots_offset; + + ldout(cct, 15) << "size=" << m_size << ", " + << "cluster_bits=" << m_cluster_bits << ", " + << "l1_table_offset=" << m_l1_table_offset << ", " + << "snapshot_count=" << m_snapshot_count << ", " + << "snapshots_offset=" << m_snapshots_offset << dendl; + + // allocate memory for L1 table and L2 + cluster caches + m_l2_table_cache = std::make_unique<L2TableCache>(this); + m_cluster_cache = std::make_unique<ClusterCache>(this); + + read_snapshot(on_finish); +} + +template <typename I> +void QCOWFormat<I>::read_snapshot(Context* on_finish) { + if (m_snapshots_offset == 0 || m_snapshots.size() == m_snapshot_count) { + read_l1_table(on_finish); + return; + } + + // header is always aligned on 8 byte boundary + m_snapshots_offset = p2roundup(m_snapshots_offset, static_cast<uint64_t>(8)); + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "snap_id=" << (m_snapshots.size() + 1) << ", " + << "offset=" << m_snapshots_offset << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_snapshot(r, on_finish); }); + m_bl.clear(); + m_stream->read({{m_snapshots_offset, sizeof(QCowSnapshotHeader)}}, &m_bl, + ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_snapshot(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << ", " + << "index=" << m_snapshots.size() << dendl; + + if (r < 0) { + lderr(cct) << "failed to read QCOW2 snapshot header: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + m_snapshots_offset += m_bl.length(); + auto header = *reinterpret_cast<QCowSnapshotHeader*>(m_bl.c_str()); + + auto& snapshot = m_snapshots[m_snapshots.size() + 1]; + snapshot.id.resize(be16toh(header.id_str_size)); + snapshot.name.resize(be16toh(header.name_size)); + snapshot.l1_table_offset = be64toh(header.l1_table_offset); + snapshot.l1_table.size = be32toh(header.l1_size); + snapshot.timestamp.sec_ref() = be32toh(header.date_sec); + snapshot.timestamp.nsec_ref() = be32toh(header.date_nsec); + snapshot.extra_data_size = be32toh(header.extra_data_size); + + ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", " + << "id_str_len=" << snapshot.id.size() << ", " + << "name_str_len=" << snapshot.name.size() << ", " + << "l1_table_offset=" << snapshot.l1_table_offset << ", " + << "l1_size=" << snapshot.l1_table.size << ", " + << "extra_data_size=" << snapshot.extra_data_size << dendl; + + read_snapshot_extra(on_finish); +} + +template <typename I> +void QCOWFormat<I>::read_snapshot_extra(Context* on_finish) { + ceph_assert(!m_snapshots.empty()); + auto& snapshot = m_snapshots.rbegin()->second; + + uint32_t length = snapshot.extra_data_size + + snapshot.id.size() + + snapshot.name.size(); + if (length == 0) { + uuid_d uuid_gen; + uuid_gen.generate_random(); + snapshot.name = uuid_gen.to_string(); + + read_snapshot(on_finish); + return; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", " + << "offset=" << m_snapshots_offset << ", " + << "length=" << length << dendl; + + auto offset = m_snapshots_offset; + m_snapshots_offset += length; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_snapshot_extra(r, on_finish); }); + m_bl.clear(); + m_stream->read({{offset, length}}, &m_bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_snapshot_extra(int r, Context* on_finish) { + ceph_assert(!m_snapshots.empty()); + auto& snapshot = m_snapshots.rbegin()->second; + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << ", " + << "snap_id=" << m_snapshots.size() << dendl; + + if (r < 0) { + lderr(cct) << "failed to read QCOW2 snapshot header extra: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + if (snapshot.extra_data_size >= + offsetof(QCowSnapshotExtraData, disk_size) + sizeof(uint64_t)) { + auto extra = reinterpret_cast<const QCowSnapshotExtraData*>(m_bl.c_str()); + snapshot.size = be64toh(extra->disk_size); + } else { + snapshot.size = m_size; + } + + auto data = reinterpret_cast<const char*>(m_bl.c_str()); + data += snapshot.extra_data_size; + + if (!snapshot.id.empty()) { + snapshot.id = std::string(data, snapshot.id.size()); + data += snapshot.id.size(); + } + + if (!snapshot.name.empty()) { + snapshot.name = std::string(data, snapshot.name.size()); + data += snapshot.name.size(); + } else { + uuid_d uuid_gen; + uuid_gen.generate_random(); + snapshot.name = uuid_gen.to_string(); + } + + ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", " + << "name=" << snapshot.name << ", " + << "size=" << snapshot.size << dendl; + read_snapshot_l1_table(on_finish); +} + +template <typename I> +void QCOWFormat<I>::read_snapshot_l1_table(Context* on_finish) { + ceph_assert(!m_snapshots.empty()); + auto& snapshot = m_snapshots.rbegin()->second; + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "snap_id=" << m_snapshots.size() << ", " + << "l1_table_offset=" << snapshot.l1_table_offset + << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_snapshot_l1_table(r, on_finish); }); + m_stream->read({{snapshot.l1_table_offset, + snapshot.l1_table.size * sizeof(uint64_t)}}, + &snapshot.l1_table.bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_snapshot_l1_table(int r, Context* on_finish) { + ceph_assert(!m_snapshots.empty()); + auto& snapshot = m_snapshots.rbegin()->second; + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << ", " + << "snap_id=" << m_snapshots.size() << dendl; + + if (r < 0) { + lderr(cct) << "failed to read snapshot L1 table: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + snapshot.l1_table.decode(); + read_snapshot(on_finish); +} + +template <typename I> +void QCOWFormat<I>::read_l1_table(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = new LambdaContext([this, on_finish](int r) { + handle_read_l1_table(r, on_finish); }); + m_stream->read({{m_l1_table_offset, + m_l1_table.size * sizeof(uint64_t)}}, + &m_l1_table.bl, ctx); +} + +template <typename I> +void QCOWFormat<I>::handle_read_l1_table(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to read L1 table: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + m_l1_table.decode(); + read_backing_file(on_finish); +} + +template <typename I> +void QCOWFormat<I>::read_backing_file(Context* on_finish) { + if (m_backing_file_offset == 0 || m_backing_file_size == 0) { + // all data is within the specified file + on_finish->complete(0); + return; + } + + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + // TODO add support for backing files + on_finish->complete(-ENOTSUP); +} + +template <typename I> +void QCOWFormat<I>::close(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + m_stream->close(on_finish); +} + +template <typename I> +void QCOWFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + snap_infos->clear(); + for (auto& [snap_id, snapshot] : m_snapshots) { + SnapInfo snap_info(snapshot.name, cls::rbd::UserSnapshotNamespace{}, + snapshot.size, {}, 0, 0, snapshot.timestamp); + snap_infos->emplace(snap_id, snap_info); + } + + on_finish->complete(0); +} + +template <typename I> +void QCOWFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "snap_id=" << snap_id << dendl; + + if (snap_id == CEPH_NOSNAP) { + *size = m_size; + } else { + auto snapshot_it = m_snapshots.find(snap_id); + if (snapshot_it == m_snapshots.end()) { + on_finish->complete(-ENOENT); + return; + } + + auto& snapshot = snapshot_it->second; + *size = snapshot.size; + } + + on_finish->complete(0); +} + +template <typename I> +bool QCOWFormat<I>::read( + io::AioCompletion* aio_comp, uint64_t snap_id, io::Extents&& image_extents, + io::ReadResult&& read_result, int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "snap_id=" << snap_id << ", " + << "image_extents=" << image_extents << dendl; + + const LookupTable* l1_table = nullptr; + if (snap_id == CEPH_NOSNAP) { + l1_table = &m_l1_table; + } else { + auto snapshot_it = m_snapshots.find(snap_id); + if (snapshot_it == m_snapshots.end()) { + aio_comp->fail(-ENOENT); + return true; + } + + auto& snapshot = snapshot_it->second; + l1_table = &snapshot.l1_table; + } + + aio_comp->read_result = std::move(read_result); + aio_comp->read_result.set_image_extents(image_extents); + + auto read_request = new ReadRequest(this, aio_comp, l1_table, + std::move(image_extents)); + read_request->send(); + + return true; +} + +template <typename I> +void QCOWFormat<I>::list_snaps(io::Extents&& image_extents, + io::SnapIds&& snap_ids, int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "image_extents=" << image_extents << dendl; + + ClusterExtents cluster_extents; + populate_cluster_extents(cct, m_cluster_size, image_extents, + &cluster_extents); + + // map L1 table indexes to cluster extents + std::map<uint64_t, ClusterExtents> l1_cluster_extents; + for (auto& cluster_extent : cluster_extents) { + uint32_t l1_table_index = cluster_extent.image_offset >> m_l1_shift; + auto& l1_cluster_extent = l1_cluster_extents[l1_table_index]; + l1_cluster_extent.reserve(cluster_extents.size()); + l1_cluster_extent.push_back(cluster_extent); + } + + std::map<uint64_t, const LookupTable*> snap_id_to_l1_table; + for (auto& [snap_id, snapshot] : m_snapshots) { + snap_id_to_l1_table[snap_id] = &snapshot.l1_table; + } + snap_id_to_l1_table[CEPH_NOSNAP] = &m_l1_table; + + on_finish = new LambdaContext([this, image_extents, + snap_ids=std::move(snap_ids), + snapshot_delta, on_finish](int r) mutable { + handle_list_snaps(r, std::move(image_extents), std::move(snap_ids), + snapshot_delta, on_finish); + }); + + auto gather_ctx = new C_Gather(cct, on_finish); + + for (auto& [l1_table_index, cluster_extents] : l1_cluster_extents) { + auto list_snaps_request = new ListSnapsRequest( + this, l1_table_index, std::move(cluster_extents), snap_id_to_l1_table, + snapshot_delta, gather_ctx->new_sub()); + list_snaps_request->send(); + } + + gather_ctx->activate(); +} + +template <typename I> +void QCOWFormat<I>::handle_list_snaps(int r, io::Extents&& image_extents, + io::SnapIds&& snap_ids, + io::SnapshotDelta* snapshot_delta, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "snapshot_delta=" << *snapshot_delta << dendl; + + std::optional<uint64_t> previous_size = std::nullopt; + for (auto& [snap_id, snapshot] : m_snapshots) { + auto sparse_extents = &(*snapshot_delta)[{snap_id, snap_id}]; + util::zero_shrunk_snapshot(cct, image_extents, snap_id, snapshot.size, + &previous_size, sparse_extents); + } + + auto sparse_extents = &(*snapshot_delta)[{CEPH_NOSNAP, CEPH_NOSNAP}]; + util::zero_shrunk_snapshot(cct, image_extents, CEPH_NOSNAP, m_size, + &previous_size, sparse_extents); + + util::merge_snapshot_delta(snap_ids, snapshot_delta); + on_finish->complete(r); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::QCOWFormat<librbd::ImageCtx>; diff --git a/src/librbd/migration/QCOWFormat.h b/src/librbd/migration/QCOWFormat.h new file mode 100644 index 000000000..b36506716 --- /dev/null +++ b/src/librbd/migration/QCOWFormat.h @@ -0,0 +1,211 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H +#define CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H + +#include "include/int_types.h" +#include "librbd/Types.h" +#include "librbd/migration/FormatInterface.h" +#include "librbd/migration/QCOW.h" +#include "acconfig.h" +#include "json_spirit/json_spirit.h" +#include <boost/asio/io_context_strand.hpp> +#include <boost/iostreams/filter/zlib.hpp> +#include <deque> +#include <vector> +#include <memory> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename> struct SourceSpecBuilder; +struct StreamInterface; + +namespace qcow_format { + +struct LookupTable { + LookupTable() {} + LookupTable(uint32_t size) : size(size) {} + + bufferlist bl; + uint64_t* cluster_offsets = nullptr; + uint32_t size = 0; + bool decoded = false; + + void init(); + void decode(); +}; + +} // namespace qcow_format + +template <typename ImageCtxT> +class QCOWFormat : public FormatInterface { +public: + static QCOWFormat* create( + ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder) { + return new QCOWFormat(image_ctx, json_object, source_spec_builder); + } + + QCOWFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder); + QCOWFormat(const QCOWFormat&) = delete; + QCOWFormat& operator=(const QCOWFormat&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override; + void get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) override; + + bool read(io::AioCompletion* aio_comp, uint64_t snap_id, + io::Extents&& image_extents, io::ReadResult&& read_result, + int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) override; + + void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids, + int list_snaps_flags, io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) override; + +private: + /** + * @verbatim + * + * <start> + * | + * v + * OPEN + * | + * v + * PROBE + * | + * |\---> READ V1 HEADER ----------\ + * | | + * \----> READ V2 HEADER | + * | | + * | /----------\ | + * | | | | + * v v | | + * READ SNAPSHOT | | + * | | | + * v | | + * READ SNAPSHOT EXTRA | | + * | | | + * v | | + * READ SNAPSHOT L1 TABLE | + * | | + * \--------------------\| + * | + * v + * READ L1 TABLE + * | + * v + * READ BACKING FILE + * | + * /-------------------------------/ + * | + * v + * <opened> + * + * @endverbatim + */ + + struct Cluster; + struct ClusterCache; + struct L2TableCache; + struct ReadRequest; + struct ListSnapsRequest; + + struct Snapshot { + std::string id; + std::string name; + + utime_t timestamp; + uint64_t size = 0; + + uint64_t l1_table_offset = 0; + qcow_format::LookupTable l1_table; + + uint32_t extra_data_size = 0; + }; + + ImageCtxT* m_image_ctx; + json_spirit::mObject m_json_object; + const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder; + + boost::asio::io_context::strand m_strand; + std::shared_ptr<StreamInterface> m_stream; + + bufferlist m_bl; + + uint64_t m_size = 0; + + uint64_t m_backing_file_offset = 0; + uint32_t m_backing_file_size = 0; + + uint32_t m_cluster_bits = 0; + uint32_t m_cluster_size = 0; + uint64_t m_cluster_offset_mask = 0; + uint64_t m_cluster_mask = 0; + + uint32_t m_l1_shift = 0; + uint64_t m_l1_table_offset = 0; + qcow_format::LookupTable m_l1_table; + + uint32_t m_l2_bits = 0; + uint32_t m_l2_size = 0; + + uint32_t m_snapshot_count = 0; + uint64_t m_snapshots_offset = 0; + std::map<uint64_t, Snapshot> m_snapshots; + + std::unique_ptr<L2TableCache> m_l2_table_cache; + std::unique_ptr<ClusterCache> m_cluster_cache; + + void handle_open(int r, Context* on_finish); + + void probe(Context* on_finish); + void handle_probe(int r, Context* on_finish); + +#ifdef WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + void read_v1_header(Context* on_finish); + void handle_read_v1_header(int r, Context* on_finish); +#endif // WITH_RBD_MIGRATION_FORMAT_QCOW_V1 + + void read_v2_header(Context* on_finish); + void handle_read_v2_header(int r, Context* on_finish); + + void read_snapshot(Context* on_finish); + void handle_read_snapshot(int r, Context* on_finish); + + void read_snapshot_extra(Context* on_finish); + void handle_read_snapshot_extra(int r, Context* on_finish); + + void read_snapshot_l1_table(Context* on_finish); + void handle_read_snapshot_l1_table(int r, Context* on_finish); + + void read_l1_table(Context* on_finish); + void handle_read_l1_table(int r, Context* on_finish); + + void read_backing_file(Context* on_finish); + + void handle_list_snaps(int r, io::Extents&& image_extents, + io::SnapIds&& snap_ids, + io::SnapshotDelta* snapshot_delta, Context* on_finish); +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::QCOWFormat<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_QCOW_FORMAT_H diff --git a/src/librbd/migration/RawFormat.cc b/src/librbd/migration/RawFormat.cc new file mode 100644 index 000000000..0b655d368 --- /dev/null +++ b/src/librbd/migration/RawFormat.cc @@ -0,0 +1,235 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/RawFormat.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "librbd/ImageState.h" +#include "librbd/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/SnapshotInterface.h" +#include "librbd/migration/SourceSpecBuilder.h" +#include "librbd/migration/Utils.h" + +namespace librbd { +namespace migration { + +namespace { + +static const std::string SNAPSHOTS_KEY {"snapshots"}; + + +} // anonymous namespace + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::RawFormat: " << this \ + << " " << __func__ << ": " + +template <typename I> +RawFormat<I>::RawFormat( + I* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<I>* source_spec_builder) + : m_image_ctx(image_ctx), m_json_object(json_object), + m_source_spec_builder(source_spec_builder) { +} + +template <typename I> +void RawFormat<I>::open(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + on_finish = new LambdaContext([this, on_finish](int r) { + handle_open(r, on_finish); }); + + // treat the base image as a HEAD-revision snapshot + Snapshots snapshots; + int r = m_source_spec_builder->build_snapshot(m_json_object, CEPH_NOSNAP, + &snapshots[CEPH_NOSNAP]); + if (r < 0) { + lderr(cct) << "failed to build HEAD revision handler: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + auto& snapshots_val = m_json_object[SNAPSHOTS_KEY]; + if (snapshots_val.type() == json_spirit::array_type) { + auto& snapshots_arr = snapshots_val.get_array(); + for (auto& snapshot_val : snapshots_arr) { + uint64_t index = snapshots.size(); + if (snapshot_val.type() != json_spirit::obj_type) { + lderr(cct) << "invalid snapshot " << index << " JSON: " + << cpp_strerror(r) << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& snapshot_obj = snapshot_val.get_obj(); + r = m_source_spec_builder->build_snapshot(snapshot_obj, index, + &snapshots[index]); + if (r < 0) { + lderr(cct) << "failed to build snapshot " << index << " handler: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + } + } else if (snapshots_val.type() != json_spirit::null_type) { + lderr(cct) << "invalid snapshots array" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_snapshots = std::move(snapshots); + + auto gather_ctx = new C_Gather(cct, on_finish); + SnapshotInterface* previous_snapshot = nullptr; + for (auto& [_, snapshot] : m_snapshots) { + snapshot->open(previous_snapshot, gather_ctx->new_sub()); + previous_snapshot = snapshot.get(); + } + gather_ctx->activate(); +} + +template <typename I> +void RawFormat<I>::handle_open(int r, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to open raw image: " << cpp_strerror(r) + << dendl; + + auto gather_ctx = new C_Gather(cct, on_finish); + for (auto& [_, snapshot] : m_snapshots) { + snapshot->close(gather_ctx->new_sub()); + } + + m_image_ctx->state->close(new LambdaContext( + [r, on_finish=gather_ctx->new_sub()](int _) { on_finish->complete(r); })); + + gather_ctx->activate(); + return; + } + + on_finish->complete(0); +} + +template <typename I> +void RawFormat<I>::close(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto gather_ctx = new C_Gather(cct, on_finish); + for (auto& [snap_id, snapshot] : m_snapshots) { + snapshot->close(gather_ctx->new_sub()); + } + + gather_ctx->activate(); +} + +template <typename I> +void RawFormat<I>::get_snapshots(SnapInfos* snap_infos, Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + snap_infos->clear(); + for (auto& [snap_id, snapshot] : m_snapshots) { + if (snap_id == CEPH_NOSNAP) { + continue; + } + snap_infos->emplace(snap_id, snapshot->get_snap_info()); + } + on_finish->complete(0); +} + +template <typename I> +void RawFormat<I>::get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto snapshot_it = m_snapshots.find(snap_id); + if (snapshot_it == m_snapshots.end()) { + on_finish->complete(-ENOENT); + return; + } + + *size = snapshot_it->second->get_snap_info().size; + on_finish->complete(0); +} + +template <typename I> +bool RawFormat<I>::read( + io::AioCompletion* aio_comp, uint64_t snap_id, io::Extents&& image_extents, + io::ReadResult&& read_result, int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "snap_id=" << snap_id << ", " + << "image_extents=" << image_extents << dendl; + + auto snapshot_it = m_snapshots.find(snap_id); + if (snapshot_it == m_snapshots.end()) { + aio_comp->fail(-ENOENT); + return true; + } + + snapshot_it->second->read(aio_comp, std::move(image_extents), + std::move(read_result), op_flags, read_flags, + parent_trace); + return true; +} + +template <typename I> +void RawFormat<I>::list_snaps(io::Extents&& image_extents, + io::SnapIds&& snap_ids, int list_snaps_flags, + io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "image_extents=" << image_extents << dendl; + + on_finish = new LambdaContext([this, snap_ids=std::move(snap_ids), + snapshot_delta, on_finish](int r) mutable { + handle_list_snaps(r, std::move(snap_ids), snapshot_delta, on_finish); + }); + + auto gather_ctx = new C_Gather(cct, on_finish); + + std::optional<uint64_t> previous_size = std::nullopt; + for (auto& [snap_id, snapshot] : m_snapshots) { + auto& sparse_extents = (*snapshot_delta)[{snap_id, snap_id}]; + + // zero out any space between the previous snapshot end and this + // snapshot's end + auto& snap_info = snapshot->get_snap_info(); + util::zero_shrunk_snapshot(cct, image_extents, snap_id, snap_info.size, + &previous_size, &sparse_extents); + + // build set of data/zeroed extents for the current snapshot + snapshot->list_snap(io::Extents{image_extents}, list_snaps_flags, + &sparse_extents, parent_trace, gather_ctx->new_sub()); + } + + gather_ctx->activate(); +} + +template <typename I> +void RawFormat<I>::handle_list_snaps(int r, io::SnapIds&& snap_ids, + io::SnapshotDelta* snapshot_delta, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", " + << "snapshot_delta=" << snapshot_delta << dendl; + + util::merge_snapshot_delta(snap_ids, snapshot_delta); + on_finish->complete(r); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::RawFormat<librbd::ImageCtx>; diff --git a/src/librbd/migration/RawFormat.h b/src/librbd/migration/RawFormat.h new file mode 100644 index 000000000..a20c0814f --- /dev/null +++ b/src/librbd/migration/RawFormat.h @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H +#define CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H + +#include "include/int_types.h" +#include "librbd/Types.h" +#include "librbd/migration/FormatInterface.h" +#include "json_spirit/json_spirit.h" +#include <map> +#include <memory> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename> struct SourceSpecBuilder; +struct SnapshotInterface; + +template <typename ImageCtxT> +class RawFormat : public FormatInterface { +public: + static RawFormat* create( + ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder) { + return new RawFormat(image_ctx, json_object, source_spec_builder); + } + + RawFormat(ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder); + RawFormat(const RawFormat&) = delete; + RawFormat& operator=(const RawFormat&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_snapshots(SnapInfos* snap_infos, Context* on_finish) override; + void get_image_size(uint64_t snap_id, uint64_t* size, + Context* on_finish) override; + + bool read(io::AioCompletion* aio_comp, uint64_t snap_id, + io::Extents&& image_extents, io::ReadResult&& read_result, + int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) override; + + void list_snaps(io::Extents&& image_extents, io::SnapIds&& snap_ids, + int list_snaps_flags, io::SnapshotDelta* snapshot_delta, + const ZTracer::Trace &parent_trace, + Context* on_finish) override; + +private: + typedef std::shared_ptr<SnapshotInterface> Snapshot; + typedef std::map<uint64_t, Snapshot> Snapshots; + + ImageCtxT* m_image_ctx; + json_spirit::mObject m_json_object; + const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder; + + Snapshots m_snapshots; + + void handle_open(int r, Context* on_finish); + + void handle_list_snaps(int r, io::SnapIds&& snap_ids, + io::SnapshotDelta* snapshot_delta, Context* on_finish); +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::RawFormat<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_RAW_FORMAT_H diff --git a/src/librbd/migration/RawSnapshot.cc b/src/librbd/migration/RawSnapshot.cc new file mode 100644 index 000000000..4a83fd8ad --- /dev/null +++ b/src/librbd/migration/RawSnapshot.cc @@ -0,0 +1,220 @@ +// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/RawSnapshot.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/SourceSpecBuilder.h" +#include "librbd/migration/StreamInterface.h" + +namespace librbd { +namespace migration { + +namespace { + +const std::string NAME_KEY{"name"}; + +} // anonymous namespace + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::RawSnapshot::OpenRequest " \ + << this << " " << __func__ << ": " + +template <typename I> +struct RawSnapshot<I>::OpenRequest { + RawSnapshot* raw_snapshot; + Context* on_finish; + + OpenRequest(RawSnapshot* raw_snapshot, Context* on_finish) + : raw_snapshot(raw_snapshot), on_finish(on_finish) { + } + + void send() { + open_stream(); + } + + void open_stream() { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = util::create_context_callback< + OpenRequest, &OpenRequest::handle_open_stream>(this); + raw_snapshot->m_stream->open(ctx); + } + + void handle_open_stream(int r) { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to open stream: " << cpp_strerror(r) << dendl; + finish(r); + return; + } + + get_image_size(); + } + + void get_image_size() { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = util::create_context_callback< + OpenRequest, &OpenRequest::handle_get_image_size>(this); + raw_snapshot->m_stream->get_size(&raw_snapshot->m_snap_info.size, ctx); + } + + void handle_get_image_size(int r) { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << ", " + << "image_size=" << raw_snapshot->m_snap_info.size << dendl; + + if (r < 0) { + lderr(cct) << "failed to open stream: " << cpp_strerror(r) << dendl; + close_stream(r); + return; + } + + finish(0); + } + + void close_stream(int r) { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto ctx = new LambdaContext([this, r](int) { + handle_close_stream(r); + }); + raw_snapshot->m_stream->close(ctx); + } + + void handle_close_stream(int r) { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + raw_snapshot->m_stream.reset(); + + finish(r); + } + + void finish(int r) { + auto cct = raw_snapshot->m_image_ctx->cct; + ldout(cct, 10) << "r=" << r << dendl; + + on_finish->complete(r); + delete this; + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::RawSnapshot: " << this \ + << " " << __func__ << ": " + +template <typename I> +RawSnapshot<I>::RawSnapshot(I* image_ctx, + const json_spirit::mObject& json_object, + const SourceSpecBuilder<I>* source_spec_builder, + uint64_t index) + : m_image_ctx(image_ctx), m_json_object(json_object), + m_source_spec_builder(source_spec_builder), m_index(index), + m_snap_info({}, {}, 0, {}, 0, 0, {}) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; +} + +template <typename I> +void RawSnapshot<I>::open(SnapshotInterface* previous_snapshot, + Context* on_finish) { + auto cct = m_image_ctx->cct; + + // special-case for treating the HEAD revision as a snapshot + if (m_index != CEPH_NOSNAP) { + auto& name_val = m_json_object[NAME_KEY]; + if (name_val.type() == json_spirit::str_type) { + m_snap_info.name = name_val.get_str(); + } else if (name_val.type() == json_spirit::null_type) { + uuid_d uuid_gen; + uuid_gen.generate_random(); + + m_snap_info.name = uuid_gen.to_string(); + } else { + lderr(cct) << "invalid snapshot name" << dendl; + on_finish->complete(-EINVAL); + return; + } + } + + ldout(cct, 10) << "name=" << m_snap_info.name << dendl; + + int r = m_source_spec_builder->build_stream(m_json_object, &m_stream); + if (r < 0) { + lderr(cct) << "failed to build migration stream handler" << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + auto req = new OpenRequest(this, on_finish); + req->send(); +} + +template <typename I> +void RawSnapshot<I>::close(Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + if (!m_stream) { + on_finish->complete(0); + return; + } + + m_stream->close(on_finish); +} + +template <typename I> +void RawSnapshot<I>::read(io::AioCompletion* aio_comp, + io::Extents&& image_extents, + io::ReadResult&& read_result, int op_flags, + int read_flags, + const ZTracer::Trace &parent_trace) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "image_extents=" << image_extents << dendl; + + aio_comp->read_result = std::move(read_result); + aio_comp->read_result.set_image_extents(image_extents); + + aio_comp->set_request_count(1); + auto ctx = new io::ReadResult::C_ImageReadRequest(aio_comp, + 0, image_extents); + + // raw directly maps the image-extent IO down to a byte IO extent + m_stream->read(std::move(image_extents), &ctx->bl, ctx); +} + +template <typename I> +void RawSnapshot<I>::list_snap(io::Extents&& image_extents, + int list_snaps_flags, + io::SparseExtents* sparse_extents, + const ZTracer::Trace &parent_trace, + Context* on_finish) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "image_extents=" << image_extents << dendl; + + // raw does support sparse extents so list the full IO extent as a delta + for (auto& [image_offset, image_length] : image_extents) { + sparse_extents->insert(image_offset, image_length, + {io::SPARSE_EXTENT_STATE_DATA, image_length}); + } + + on_finish->complete(0); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::RawSnapshot<librbd::ImageCtx>; diff --git a/src/librbd/migration/RawSnapshot.h b/src/librbd/migration/RawSnapshot.h new file mode 100644 index 000000000..9f76d6878 --- /dev/null +++ b/src/librbd/migration/RawSnapshot.h @@ -0,0 +1,75 @@ +// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H +#define CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H + +#include "include/buffer_fwd.h" +#include "include/int_types.h" +#include "common/zipkin_trace.h" +#include "librbd/Types.h" +#include "librbd/io/Types.h" +#include "librbd/migration/SnapshotInterface.h" +#include "json_spirit/json_spirit.h" +#include <memory> + +namespace librbd { + +struct ImageCtx; + +namespace migration { + +template <typename> struct SourceSpecBuilder; +struct StreamInterface; + +template <typename ImageCtxT> +class RawSnapshot : public SnapshotInterface { +public: + static RawSnapshot* create( + ImageCtx* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder, uint64_t index) { + return new RawSnapshot(image_ctx, json_object, source_spec_builder, index); + } + + RawSnapshot(ImageCtxT* image_ctx, const json_spirit::mObject& json_object, + const SourceSpecBuilder<ImageCtxT>* source_spec_builder, + uint64_t index); + RawSnapshot(const RawSnapshot&) = delete; + RawSnapshot& operator=(const RawSnapshot&) = delete; + + void open(SnapshotInterface* previous_snapshot, Context* on_finish) override; + void close(Context* on_finish) override; + + const SnapInfo& get_snap_info() const override { + return m_snap_info; + } + + void read(io::AioCompletion* aio_comp, io::Extents&& image_extents, + io::ReadResult&& read_result, int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) override; + + void list_snap(io::Extents&& image_extents, int list_snaps_flags, + io::SparseExtents* sparse_extents, + const ZTracer::Trace &parent_trace, + Context* on_finish) override; + +private: + struct OpenRequest; + + ImageCtxT* m_image_ctx; + json_spirit::mObject m_json_object; + const SourceSpecBuilder<ImageCtxT>* m_source_spec_builder; + uint64_t m_index = 0; + + SnapInfo m_snap_info; + + std::shared_ptr<StreamInterface> m_stream; + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::RawSnapshot<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_RAW_SNAPSHOT_H diff --git a/src/librbd/migration/S3Stream.cc b/src/librbd/migration/S3Stream.cc new file mode 100644 index 000000000..3b4db0cef --- /dev/null +++ b/src/librbd/migration/S3Stream.cc @@ -0,0 +1,202 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/S3Stream.h" +#include "common/armor.h" +#include "common/ceph_crypto.h" +#include "common/ceph_time.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/HttpClient.h" +#include "librbd/migration/HttpProcessorInterface.h" +#include <boost/beast/http.hpp> + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include <fmt/chrono.h> +#include <fmt/format.h> + +#include <time.h> + +namespace librbd { +namespace migration { + +using HttpRequest = boost::beast::http::request<boost::beast::http::empty_body>; + +namespace { + +const std::string URL_KEY {"url"}; +const std::string ACCESS_KEY {"access_key"}; +const std::string SECRET_KEY {"secret_key"}; + +} // anonymous namespace + +template <typename I> +struct S3Stream<I>::HttpProcessor : public HttpProcessorInterface { + S3Stream* s3stream; + + HttpProcessor(S3Stream* s3stream) : s3stream(s3stream) { + } + + void process_request(EmptyRequest& request) override { + s3stream->process_request(request); + } +}; + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::S3Stream: " << this \ + << " " << __func__ << ": " + +template <typename I> +S3Stream<I>::S3Stream(I* image_ctx, const json_spirit::mObject& json_object) + : m_image_ctx(image_ctx), m_cct(image_ctx->cct), + m_asio_engine(image_ctx->asio_engine), m_json_object(json_object), + m_http_processor(std::make_unique<HttpProcessor>(this)) { +} + +template <typename I> +S3Stream<I>::~S3Stream() { +} + +template <typename I> +void S3Stream<I>::open(Context* on_finish) { + auto& url_value = m_json_object[URL_KEY]; + if (url_value.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << URL_KEY << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& access_key = m_json_object[ACCESS_KEY]; + if (access_key.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << ACCESS_KEY << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto& secret_key = m_json_object[SECRET_KEY]; + if (secret_key.type() != json_spirit::str_type) { + lderr(m_cct) << "failed to locate '" << SECRET_KEY << "' key" << dendl; + on_finish->complete(-EINVAL); + return; + } + + m_url = url_value.get_str(); + + librados::Rados rados(m_image_ctx->md_ctx); + int r = 0; + m_access_key = access_key.get_str(); + if (util::is_config_key_uri(m_access_key)) { + r = util::get_config_key(rados, m_access_key, &m_access_key); + if (r < 0) { + lderr(m_cct) << "failed to retrieve access key from config: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + } + + m_secret_key = secret_key.get_str(); + if (util::is_config_key_uri(m_secret_key)) { + r = util::get_config_key(rados, m_secret_key, &m_secret_key); + if (r < 0) { + lderr(m_cct) << "failed to retrieve secret key from config: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + } + + ldout(m_cct, 10) << "url=" << m_url << ", " + << "access_key=" << m_access_key << dendl; + + m_http_client.reset(HttpClient<I>::create(m_image_ctx, m_url)); + m_http_client->set_http_processor(m_http_processor.get()); + m_http_client->open(on_finish); +} + +template <typename I> +void S3Stream<I>::close(Context* on_finish) { + ldout(m_cct, 10) << dendl; + + if (!m_http_client) { + on_finish->complete(0); + return; + } + + m_http_client->close(on_finish); +} + +template <typename I> +void S3Stream<I>::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + m_http_client->get_size(size, on_finish); +} + +template <typename I> +void S3Stream<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << "byte_extents=" << byte_extents << dendl; + + m_http_client->read(std::move(byte_extents), data, on_finish); +} + +template <typename I> +void S3Stream<I>::process_request(HttpRequest& http_request) { + ldout(m_cct, 20) << dendl; + + // format RFC 1123 date/time + auto time = ceph::real_clock::to_time_t(ceph::real_clock::now()); + struct tm timeInfo; + gmtime_r(&time, &timeInfo); + + std::string date = fmt::format("{:%a, %d %b %Y %H:%M:%S %z}", timeInfo); + http_request.set(boost::beast::http::field::date, date); + + // note: we don't support S3 subresources + std::string canonicalized_resource = std::string(http_request.target()); + + std::string string_to_sign = fmt::format( + "{}\n\n\n{}\n{}", + std::string(boost::beast::http::to_string(http_request.method())), + date, canonicalized_resource); + + // create HMAC-SHA1 signature from secret key + string-to-sign + sha1_digest_t digest; + ceph::crypto::HMACSHA1 hmac( + reinterpret_cast<const unsigned char*>(m_secret_key.data()), + m_secret_key.size()); + hmac.Update(reinterpret_cast<const unsigned char*>(string_to_sign.data()), + string_to_sign.size()); + hmac.Final(reinterpret_cast<unsigned char*>(digest.v)); + + // base64 encode the result + char buf[64]; + int r = ceph_armor(std::begin(buf), std::begin(buf) + sizeof(buf), + reinterpret_cast<const char *>(digest.v), + reinterpret_cast<const char *>(digest.v + digest.SIZE)); + if (r < 0) { + ceph_abort("ceph_armor failed"); + } + + // store the access-key + signature in the HTTP authorization header + std::string signature = std::string(std::begin(buf), std::begin(buf) + r); + std::string authorization = fmt::format("AWS {}:{}", m_access_key, signature); + http_request.set(boost::beast::http::field::authorization, authorization); + + ldout(m_cct, 20) << "string_to_sign=" << string_to_sign << ", " + << "authorization=" << authorization << dendl; +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::S3Stream<librbd::ImageCtx>; diff --git a/src/librbd/migration/S3Stream.h b/src/librbd/migration/S3Stream.h new file mode 100644 index 000000000..586b21787 --- /dev/null +++ b/src/librbd/migration/S3Stream.h @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_S3_STREAM_H +#define CEPH_LIBRBD_MIGRATION_S3_STREAM_H + +#include "include/int_types.h" +#include "librbd/migration/StreamInterface.h" +#include <boost/beast/http/empty_body.hpp> +#include <boost/beast/http/message.hpp> +#include <boost/beast/http/string_body.hpp> +#include <json_spirit/json_spirit.h> +#include <memory> +#include <string> + +struct Context; + +namespace librbd { + +struct AsioEngine; +struct ImageCtx; + +namespace migration { + +template <typename> class HttpClient; + +template <typename ImageCtxT> +class S3Stream : public StreamInterface { +public: + static S3Stream* create(ImageCtxT* image_ctx, + const json_spirit::mObject& json_object) { + return new S3Stream(image_ctx, json_object); + } + + S3Stream(ImageCtxT* image_ctx, const json_spirit::mObject& json_object); + ~S3Stream() override; + + S3Stream(const S3Stream&) = delete; + S3Stream& operator=(const S3Stream&) = delete; + + void open(Context* on_finish) override; + void close(Context* on_finish) override; + + void get_size(uint64_t* size, Context* on_finish) override; + + void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) override; + +private: + using HttpRequest = boost::beast::http::request< + boost::beast::http::empty_body>; + using HttpResponse = boost::beast::http::response< + boost::beast::http::string_body>; + + struct HttpProcessor; + + ImageCtxT* m_image_ctx; + CephContext* m_cct; + std::shared_ptr<AsioEngine> m_asio_engine; + json_spirit::mObject m_json_object; + + std::string m_url; + std::string m_access_key; + std::string m_secret_key; + + std::unique_ptr<HttpProcessor> m_http_processor; + std::unique_ptr<HttpClient<ImageCtxT>> m_http_client; + + void process_request(HttpRequest& http_request); + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::S3Stream<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_S3_STREAM_H diff --git a/src/librbd/migration/SnapshotInterface.h b/src/librbd/migration/SnapshotInterface.h new file mode 100644 index 000000000..9990802c5 --- /dev/null +++ b/src/librbd/migration/SnapshotInterface.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H +#define CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H + +#include "include/buffer_fwd.h" +#include "include/int_types.h" +#include "common/zipkin_trace.h" +#include "librbd/Types.h" +#include "librbd/io/Types.h" +#include <string> + +struct Context; + +namespace librbd { + +namespace io { +struct AioCompletion; +struct ReadResult; +} // namespace io + +namespace migration { + +struct SnapshotInterface { + virtual ~SnapshotInterface() { + } + + virtual void open(SnapshotInterface* previous_snapshot, + Context* on_finish) = 0; + virtual void close(Context* on_finish) = 0; + + virtual const SnapInfo& get_snap_info() const = 0; + + virtual void read(io::AioCompletion* aio_comp, io::Extents&& image_extents, + io::ReadResult&& read_result, int op_flags, int read_flags, + const ZTracer::Trace &parent_trace) = 0; + + virtual void list_snap(io::Extents&& image_extents, int list_snaps_flags, + io::SparseExtents* sparse_extents, + const ZTracer::Trace &parent_trace, + Context* on_finish) = 0; +}; + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_SNAPSHOT_INTERFACE_H diff --git a/src/librbd/migration/SourceSpecBuilder.cc b/src/librbd/migration/SourceSpecBuilder.cc new file mode 100644 index 000000000..214d7ce0e --- /dev/null +++ b/src/librbd/migration/SourceSpecBuilder.cc @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/SourceSpecBuilder.h" +#include "common/dout.h" +#include "librbd/ImageCtx.h" +#include "librbd/migration/FileStream.h" +#include "librbd/migration/HttpStream.h" +#include "librbd/migration/S3Stream.h" +#include "librbd/migration/NativeFormat.h" +#include "librbd/migration/QCOWFormat.h" +#include "librbd/migration/RawFormat.h" +#include "librbd/migration/RawSnapshot.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::SourceSpecBuilder: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace migration { + +namespace { + +const std::string STREAM_KEY{"stream"}; +const std::string TYPE_KEY{"type"}; + +} // anonymous namespace + +template <typename I> +int SourceSpecBuilder<I>::parse_source_spec( + const std::string& source_spec, + json_spirit::mObject* source_spec_object) const { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + json_spirit::mValue json_root; + if(json_spirit::read(source_spec, json_root)) { + try { + *source_spec_object = json_root.get_obj(); + return 0; + } catch (std::runtime_error&) { + } + } + + lderr(cct) << "invalid source-spec JSON" << dendl; + return -EBADMSG; +} + +template <typename I> +int SourceSpecBuilder<I>::build_format( + const json_spirit::mObject& source_spec_object, bool import_only, + std::unique_ptr<FormatInterface>* format) const { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto type_value_it = source_spec_object.find(TYPE_KEY); + if (type_value_it == source_spec_object.end() || + type_value_it->second.type() != json_spirit::str_type) { + lderr(cct) << "failed to locate format type value" << dendl; + return -EINVAL; + } + + auto& type = type_value_it->second.get_str(); + if (type == "native") { + format->reset(NativeFormat<I>::create(m_image_ctx, source_spec_object, + import_only)); + } else if (type == "qcow") { + format->reset(QCOWFormat<I>::create(m_image_ctx, source_spec_object, this)); + } else if (type == "raw") { + format->reset(RawFormat<I>::create(m_image_ctx, source_spec_object, this)); + } else { + lderr(cct) << "unknown or unsupported format type '" << type << "'" + << dendl; + return -ENOSYS; + } + return 0; +} + +template <typename I> +int SourceSpecBuilder<I>::build_snapshot( + const json_spirit::mObject& source_spec_object, uint64_t index, + std::shared_ptr<SnapshotInterface>* snapshot) const { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto type_value_it = source_spec_object.find(TYPE_KEY); + if (type_value_it == source_spec_object.end() || + type_value_it->second.type() != json_spirit::str_type) { + lderr(cct) << "failed to locate snapshot type value" << dendl; + return -EINVAL; + } + + auto& type = type_value_it->second.get_str(); + if (type == "raw") { + snapshot->reset(RawSnapshot<I>::create(m_image_ctx, source_spec_object, + this, index)); + } else { + lderr(cct) << "unknown or unsupported format type '" << type << "'" + << dendl; + return -ENOSYS; + } + return 0; +} + +template <typename I> +int SourceSpecBuilder<I>::build_stream( + const json_spirit::mObject& source_spec_object, + std::shared_ptr<StreamInterface>* stream) const { + auto cct = m_image_ctx->cct; + ldout(cct, 10) << dendl; + + auto stream_value_it = source_spec_object.find(STREAM_KEY); + if (stream_value_it == source_spec_object.end() || + stream_value_it->second.type() != json_spirit::obj_type) { + lderr(cct) << "failed to locate stream object" << dendl; + return -EINVAL; + } + + auto& stream_obj = stream_value_it->second.get_obj(); + auto type_value_it = stream_obj.find(TYPE_KEY); + if (type_value_it == stream_obj.end() || + type_value_it->second.type() != json_spirit::str_type) { + lderr(cct) << "failed to locate stream type value" << dendl; + return -EINVAL; + } + + auto& type = type_value_it->second.get_str(); + if (type == "file") { + stream->reset(FileStream<I>::create(m_image_ctx, stream_obj)); + } else if (type == "http") { + stream->reset(HttpStream<I>::create(m_image_ctx, stream_obj)); + } else if (type == "s3") { + stream->reset(S3Stream<I>::create(m_image_ctx, stream_obj)); + } else { + lderr(cct) << "unknown or unsupported stream type '" << type << "'" + << dendl; + return -ENOSYS; + } + + return 0; +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::SourceSpecBuilder<librbd::ImageCtx>; diff --git a/src/librbd/migration/SourceSpecBuilder.h b/src/librbd/migration/SourceSpecBuilder.h new file mode 100644 index 000000000..191cb1cbd --- /dev/null +++ b/src/librbd/migration/SourceSpecBuilder.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H +#define CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H + +#include "include/int_types.h" +#include <json_spirit/json_spirit.h> +#include <memory> +#include <optional> +#include <string> + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace migration { + +struct FormatInterface; +struct SnapshotInterface; +struct StreamInterface; + +template <typename ImageCtxT> +class SourceSpecBuilder { +public: + SourceSpecBuilder(ImageCtxT* image_ctx) : m_image_ctx(image_ctx) { + } + + int parse_source_spec(const std::string& source_spec, + json_spirit::mObject* source_spec_object) const; + + int build_format(const json_spirit::mObject& format_object, bool import_only, + std::unique_ptr<FormatInterface>* format) const; + + int build_snapshot(const json_spirit::mObject& source_spec_object, + uint64_t index, + std::shared_ptr<SnapshotInterface>* snapshot) const; + + int build_stream(const json_spirit::mObject& source_spec_object, + std::shared_ptr<StreamInterface>* stream) const; + +private: + ImageCtxT* m_image_ctx; + +}; + +} // namespace migration +} // namespace librbd + +extern template class librbd::migration::SourceSpecBuilder<librbd::ImageCtx>; + +#endif // CEPH_LIBRBD_MIGRATION_SOURCE_SPEC_BUILDER_H diff --git a/src/librbd/migration/StreamInterface.h b/src/librbd/migration/StreamInterface.h new file mode 100644 index 000000000..782a9a5f8 --- /dev/null +++ b/src/librbd/migration/StreamInterface.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H +#define CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H + +#include "include/buffer_fwd.h" +#include "include/int_types.h" +#include "librbd/io/Types.h" + +struct Context; + +namespace librbd { +namespace migration { + +struct StreamInterface { + virtual ~StreamInterface() { + } + + virtual void open(Context* on_finish) = 0; + virtual void close(Context* on_finish) = 0; + + virtual void get_size(uint64_t* size, Context* on_finish) = 0; + + virtual void read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) = 0; +}; + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_STREAM_INTERFACE_H diff --git a/src/librbd/migration/Types.h b/src/librbd/migration/Types.h new file mode 100644 index 000000000..244dc28b7 --- /dev/null +++ b/src/librbd/migration/Types.h @@ -0,0 +1,42 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_TYPES_H +#define CEPH_LIBRBD_MIGRATION_TYPES_H + +#include <string> +#include <utility> + +namespace librbd { +namespace migration { + +enum UrlScheme { + URL_SCHEME_HTTP, + URL_SCHEME_HTTPS, +}; + +struct UrlSpec { + UrlSpec() {} + UrlSpec(UrlScheme scheme, const std::string& host, const std::string& port, + const std::string& path) + : scheme(scheme), host(host), port(port), path(path) { + } + + UrlScheme scheme = URL_SCHEME_HTTP; + std::string host; + std::string port = "80"; + std::string path = "/"; + +}; + +inline bool operator==(const UrlSpec& lhs, const UrlSpec& rhs) { + return (lhs.scheme == rhs.scheme && + lhs.host == rhs.host && + lhs.port == rhs.port && + lhs.path == rhs.path); +} + +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_TYPES_H diff --git a/src/librbd/migration/Utils.cc b/src/librbd/migration/Utils.cc new file mode 100644 index 000000000..c5c1279d8 --- /dev/null +++ b/src/librbd/migration/Utils.cc @@ -0,0 +1,133 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/Utils.h" +#include "common/dout.h" +#include "common/errno.h" +#include <boost/lexical_cast.hpp> +#include <regex> + +namespace librbd { +namespace migration { +namespace util { + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::util::" << __func__ << ": " + +int parse_url(CephContext* cct, const std::string& url, UrlSpec* url_spec) { + ldout(cct, 10) << "url=" << url << dendl; + *url_spec = UrlSpec{}; + + // parse the provided URL (scheme, user, password, host, port, path, + // parameters, query, and fragment) + std::regex url_regex( + R"(^(?:([^:/]*)://)?(?:(\w+)(?::(\w+))?@)?([^/;\?:#]+)(?::([^/;\?#]+))?)" + R"((?:/([^;\?#]*))?(?:;([^\?#]+))?(?:\?([^#]+))?(?:#(\w+))?$)"); + std::smatch match; + if(!std::regex_match(url, match, url_regex)) { + lderr(cct) << "invalid url: '" << url << "'" << dendl; + return -EINVAL; + } + + auto& scheme = match[1]; + if (scheme == "http" || scheme == "") { + url_spec->scheme = URL_SCHEME_HTTP; + } else if (scheme == "https") { + url_spec->scheme = URL_SCHEME_HTTPS; + url_spec->port = "443"; + } else { + lderr(cct) << "invalid url scheme: '" << url << "'" << dendl; + return -EINVAL; + } + + url_spec->host = match[4]; + auto& port = match[5]; + if (port.matched) { + try { + boost::lexical_cast<uint16_t>(port); + } catch (boost::bad_lexical_cast&) { + lderr(cct) << "invalid url port: '" << url << "'" << dendl; + return -EINVAL; + } + url_spec->port = port; + } + + auto& path = match[6]; + if (path.matched) { + url_spec->path += path; + } + return 0; +} + +void zero_shrunk_snapshot(CephContext* cct, const io::Extents& image_extents, + uint64_t snap_id, uint64_t new_size, + std::optional<uint64_t> *previous_size, + io::SparseExtents* sparse_extents) { + if (*previous_size && **previous_size > new_size) { + ldout(cct, 20) << "snapshot resize " << **previous_size << " -> " + << new_size << dendl; + interval_set<uint64_t> zero_interval; + zero_interval.insert(new_size, **previous_size - new_size); + + for (auto& image_extent : image_extents) { + interval_set<uint64_t> image_interval; + image_interval.insert(image_extent.first, image_extent.second); + + image_interval.intersection_of(zero_interval); + for (auto [image_offset, image_length] : image_interval) { + ldout(cct, 20) << "zeroing extent " << image_offset << "~" + << image_length << " at snapshot " << snap_id << dendl; + sparse_extents->insert(image_offset, image_length, + {io::SPARSE_EXTENT_STATE_ZEROED, image_length}); + } + } + } + *previous_size = new_size; +} + +void merge_snapshot_delta(const io::SnapIds& snap_ids, + io::SnapshotDelta* snapshot_delta) { + io::SnapshotDelta orig_snapshot_delta = std::move(*snapshot_delta); + snapshot_delta->clear(); + + auto snap_id_it = snap_ids.begin(); + ceph_assert(snap_id_it != snap_ids.end()); + + // merge any snapshot intervals that were not requested + std::list<io::SparseExtents*> pending_sparse_extents; + for (auto& [snap_key, sparse_extents] : orig_snapshot_delta) { + // advance to next valid requested snap id + while (snap_id_it != snap_ids.end() && *snap_id_it < snap_key.first) { + ++snap_id_it; + } + if (snap_id_it == snap_ids.end()) { + break; + } + + // loop through older write/read snapshot sparse extents to remove any + // overlaps with the current sparse extent + for (auto prev_sparse_extents : pending_sparse_extents) { + for (auto& sparse_extent : sparse_extents) { + prev_sparse_extents->erase(sparse_extent.get_off(), + sparse_extent.get_len()); + } + } + + auto write_read_snap_ids = std::make_pair(*snap_id_it, snap_key.second); + (*snapshot_delta)[write_read_snap_ids] = std::move(sparse_extents); + + if (write_read_snap_ids.first > snap_key.first) { + // the current snapshot wasn't requested so it might need to get + // merged with a later snapshot + pending_sparse_extents.push_back(&(*snapshot_delta)[write_read_snap_ids]); + } else { + // we don't merge results passed a valid requested snapshot + pending_sparse_extents.clear(); + } + } +} + +} // namespace util +} // namespace migration +} // namespace librbd diff --git a/src/librbd/migration/Utils.h b/src/librbd/migration/Utils.h new file mode 100644 index 000000000..afbadde7d --- /dev/null +++ b/src/librbd/migration/Utils.h @@ -0,0 +1,30 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_MIGRATION_UTILS_H +#define CEPH_LIBRBD_MIGRATION_UTILS_H + +#include "include/common_fwd.h" +#include "librbd/io/Types.h" +#include "librbd/migration/Types.h" +#include <optional> +#include <string> + +namespace librbd { +namespace migration { +namespace util { + +int parse_url(CephContext* cct, const std::string& url, UrlSpec* url_spec); + +void zero_shrunk_snapshot(CephContext* cct, const io::Extents& image_extents, + uint64_t snap_id, uint64_t new_size, + std::optional<uint64_t> *previous_size, + io::SparseExtents* sparse_extents); +void merge_snapshot_delta(const io::SnapIds& snap_ids, + io::SnapshotDelta* snapshot_delta); + +} // namespace util +} // namespace migration +} // namespace librbd + +#endif // CEPH_LIBRBD_MIGRATION_UTILS_H |