diff options
Diffstat (limited to 'src/librbd/migration/HttpClient.cc')
-rw-r--r-- | src/librbd/migration/HttpClient.cc | 947 |
1 files changed, 947 insertions, 0 deletions
diff --git a/src/librbd/migration/HttpClient.cc b/src/librbd/migration/HttpClient.cc new file mode 100644 index 000000000..90d5723ed --- /dev/null +++ b/src/librbd/migration/HttpClient.cc @@ -0,0 +1,947 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/migration/HttpClient.h" +#include "common/dout.h" +#include "common/errno.h" +#include "librbd/AsioEngine.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ReadResult.h" +#include "librbd/migration/Utils.h" +#include <boost/asio/buffer.hpp> +#include <boost/asio/post.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/read.hpp> +#include <boost/asio/ssl.hpp> +#include <boost/beast/core.hpp> +#include <boost/beast/http/read.hpp> +#include <boost/lexical_cast.hpp> +#include <deque> + +namespace librbd { +namespace migration { + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "HttpSession " << this << " " << __func__ \ + << ": " + +/** + * boost::beast utilizes non-inheriting template classes for handling plain vs + * encrypted TCP streams. Utilize a base-class for handling the majority of the + * logic for handling connecting, disconnecting, reseting, and sending requests. + */ + +template <typename I> +template <typename D> +class HttpClient<I>::HttpSession : public HttpSessionInterface { +public: + void init(Context* on_finish) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + ceph_assert(m_state == STATE_UNINITIALIZED); + m_state = STATE_CONNECTING; + + resolve_host(on_finish); + } + + void shut_down(Context* on_finish) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + ceph_assert(on_finish != nullptr); + ceph_assert(m_on_shutdown == nullptr); + m_on_shutdown = on_finish; + + auto current_state = m_state; + if (current_state == STATE_UNINITIALIZED) { + // never initialized or resolve/connect failed + on_finish->complete(0); + return; + } + + m_state = STATE_SHUTTING_DOWN; + if (current_state != STATE_READY) { + // delay shutdown until current state transition completes + return; + } + + disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); + } + + void issue(std::shared_ptr<Work>&& work) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << dendl; + + if (is_shutdown()) { + lderr(cct) << "cannot issue HTTP request, client is shutdown" + << dendl; + work->complete(-ESHUTDOWN, {}); + return; + } + + bool first_issue = m_issue_queue.empty(); + m_issue_queue.emplace_back(work); + if (m_state == STATE_READY && first_issue) { + ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; + finalize_issue(std::move(work)); + } else if (m_state == STATE_UNINITIALIZED) { + ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl; + m_state = STATE_RESET_CONNECTING; + resolve_host(nullptr); + } else { + ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl; + } + } + + void finalize_issue(std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << dendl; + + ++m_in_flight_requests; + (*work)(derived().stream()); + } + + void handle_issue(boost::system::error_code ec, + std::shared_ptr<Work>&& work) override { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl; + + ceph_assert(m_in_flight_requests > 0); + --m_in_flight_requests; + if (maybe_finalize_reset()) { + // previous request is attempting reset to this request will be resent + return; + } + + ceph_assert(!m_issue_queue.empty()); + m_issue_queue.pop_front(); + + if (is_shutdown()) { + lderr(cct) << "client shutdown during in-flight request" << dendl; + work->complete(-ESHUTDOWN, {}); + + maybe_finalize_shutdown(); + return; + } + + if (ec) { + if (ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::broken_pipe || + ec == boost::asio::error::connection_reset || + ec == boost::asio::error::operation_aborted || + ec == boost::asio::ssl::error::stream_truncated || + ec == boost::beast::http::error::end_of_stream || + ec == boost::beast::http::error::partial_message) { + ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; + m_issue_queue.push_front(work); + } else if (ec == boost::beast::error::timeout) { + lderr(cct) << "timed-out while issuing request" << dendl; + work->complete(-ETIMEDOUT, {}); + } else { + lderr(cct) << "failed to issue request: " << ec.message() << dendl; + work->complete(-ec.value(), {}); + } + + // attempt to recover the connection + reset(); + return; + } + + bool first_receive = m_receive_queue.empty(); + m_receive_queue.push_back(work); + if (first_receive) { + receive(std::move(work)); + } + + // TODO disable pipelining for non-idempotent requests + + // pipeline the next request into the stream + if (!m_issue_queue.empty()) { + work = m_issue_queue.front(); + ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; + finalize_issue(std::move(work)); + } + } + +protected: + HttpClient* m_http_client; + + HttpSession(HttpClient* http_client) + : m_http_client(http_client), m_resolver(http_client->m_strand) { + } + + virtual void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) = 0; + virtual void disconnect(Context* on_finish) = 0; + + void close_socket() { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::system::error_code ec; + boost::beast::get_lowest_layer(derived().stream()).socket().close(ec); + } + +private: + enum State { + STATE_UNINITIALIZED, + STATE_CONNECTING, + STATE_READY, + STATE_RESET_PENDING, + STATE_RESET_DISCONNECTING, + STATE_RESET_CONNECTING, + STATE_SHUTTING_DOWN, + STATE_SHUTDOWN, + }; + + State m_state = STATE_UNINITIALIZED; + boost::asio::ip::tcp::resolver m_resolver; + + Context* m_on_shutdown = nullptr; + + uint64_t m_in_flight_requests = 0; + std::deque<std::shared_ptr<Work>> m_issue_queue; + std::deque<std::shared_ptr<Work>> m_receive_queue; + + boost::beast::flat_buffer m_buffer; + std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser; + std::optional<boost::beast::http::parser<false, StringBody>> m_parser; + + D& derived() { + return static_cast<D&>(*this); + } + + void resolve_host(Context* on_finish) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + shutdown_socket(); + m_resolver.async_resolve( + m_http_client->m_url_spec.host, m_http_client->m_url_spec.port, + [this, on_finish](boost::system::error_code ec, auto results) { + handle_resolve_host(ec, results, on_finish); }); + } + + void handle_resolve_host( + boost::system::error_code ec, + boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) { + auto cct = m_http_client->m_cct; + int r = -ec.value(); + ldout(cct, 15) << "r=" << r << dendl; + + if (ec) { + if (ec == boost::asio::error::host_not_found) { + r = -ENOENT; + } else if (ec == boost::asio::error::host_not_found_try_again) { + // TODO: add retry throttle + r = -EAGAIN; + } + + lderr(cct) << "failed to resolve host '" + << m_http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + advance_state(STATE_UNINITIALIZED, r, on_finish); + return; + } + + connect(results, new LambdaContext([this, on_finish](int r) { + handle_connect(r, on_finish); })); + } + + void handle_connect(int r, Context* on_finish) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to connect to host '" + << m_http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + advance_state(STATE_UNINITIALIZED, r, on_finish); + return; + } + + advance_state(STATE_READY, 0, on_finish); + } + + void handle_shut_down(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) + << dendl; + } + + // cancel all in-flight send/receives (if any) + shutdown_socket(); + + maybe_finalize_shutdown(); + } + + void maybe_finalize_shutdown() { + if (m_in_flight_requests > 0) { + return; + } + + // cancel any queued IOs + fail_queued_work(-ESHUTDOWN); + + advance_state(STATE_SHUTDOWN, 0, nullptr); + } + + bool is_shutdown() const { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN); + } + + void reset() { + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + ceph_assert(m_state == STATE_READY); + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + m_state = STATE_RESET_PENDING; + maybe_finalize_reset(); + } + + bool maybe_finalize_reset() { + if (m_state != STATE_RESET_PENDING) { + return false; + } + + if (m_in_flight_requests > 0) { + return true; + } + + ceph_assert(m_http_client->m_strand.running_in_this_thread()); + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + m_buffer.clear(); + + // move in-flight request back to the front of the issue queue + m_issue_queue.insert(m_issue_queue.begin(), + m_receive_queue.begin(), m_receive_queue.end()); + m_receive_queue.clear(); + + m_state = STATE_RESET_DISCONNECTING; + disconnect(new LambdaContext([this](int r) { handle_reset(r); })); + return true; + } + + void handle_reset(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) + << dendl; + } + + advance_state(STATE_RESET_CONNECTING, r, nullptr); + } + + int shutdown_socket() { + if (!boost::beast::get_lowest_layer( + derived().stream()).socket().is_open()) { + return 0; + } + + auto cct = m_http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::system::error_code ec; + boost::beast::get_lowest_layer(derived().stream()).socket().shutdown( + boost::asio::ip::tcp::socket::shutdown_both, ec); + + if (ec && ec != boost::beast::errc::not_connected) { + lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl; + return -ec.value(); + } + + close_socket(); + return 0; + } + + void receive(std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "work=" << work.get() << dendl; + + ceph_assert(!m_receive_queue.empty()); + ++m_in_flight_requests; + + // receive the response for this request + m_parser.emplace(); + if (work->header_only()) { + // HEAD requests don't trasfer data but the parser still cares about max + // content-length + m_header_parser.emplace(); + m_header_parser->body_limit(std::numeric_limits<uint64_t>::max()); + + boost::beast::http::async_read_header( + derived().stream(), m_buffer, *m_header_parser, + [this, work=std::move(work)] + (boost::beast::error_code ec, std::size_t) mutable { + handle_receive(ec, std::move(work)); + }); + } else { + m_parser->body_limit(1 << 25); // max RBD object size + boost::beast::http::async_read( + derived().stream(), m_buffer, *m_parser, + [this, work=std::move(work)] + (boost::beast::error_code ec, std::size_t) mutable { + handle_receive(ec, std::move(work)); + }); + } + } + + void handle_receive(boost::system::error_code ec, + std::shared_ptr<Work>&& work) { + auto cct = m_http_client->m_cct; + ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl; + + ceph_assert(m_in_flight_requests > 0); + --m_in_flight_requests; + if (maybe_finalize_reset()) { + // previous request is attempting reset to this request will be resent + return; + } + + ceph_assert(!m_receive_queue.empty()); + m_receive_queue.pop_front(); + + if (is_shutdown()) { + lderr(cct) << "client shutdown with in-flight request" << dendl; + work->complete(-ESHUTDOWN, {}); + + maybe_finalize_shutdown(); + return; + } + + if (ec) { + if (ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::broken_pipe || + ec == boost::asio::error::connection_reset || + ec == boost::asio::error::operation_aborted || + ec == boost::asio::ssl::error::stream_truncated || + ec == boost::beast::http::error::end_of_stream || + ec == boost::beast::http::error::partial_message) { + ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; + m_receive_queue.push_front(work); + } else if (ec == boost::beast::error::timeout) { + lderr(cct) << "timed-out while issuing request" << dendl; + work->complete(-ETIMEDOUT, {}); + } else { + lderr(cct) << "failed to issue request: " << ec.message() << dendl; + work->complete(-ec.value(), {}); + } + + reset(); + return; + } + + Response response; + if (work->header_only()) { + m_parser.emplace(std::move(*m_header_parser)); + } + response = m_parser->release(); + + // basic response code handling in a common location + int r = 0; + auto result = response.result(); + if (result == boost::beast::http::status::not_found) { + lderr(cct) << "requested resource does not exist" << dendl; + r = -ENOENT; + } else if (result == boost::beast::http::status::forbidden) { + lderr(cct) << "permission denied attempting to access resource" << dendl; + r = -EACCES; + } else if (boost::beast::http::to_status_class(result) != + boost::beast::http::status_class::successful) { + lderr(cct) << "failed to retrieve size: HTTP " << result << dendl; + r = -EIO; + } + + bool need_eof = response.need_eof(); + if (r < 0) { + work->complete(r, {}); + } else { + work->complete(0, std::move(response)); + } + + if (need_eof) { + ldout(cct, 20) << "reset required for non-pipelined response: " + << "work=" << work.get() << dendl; + reset(); + } else if (!m_receive_queue.empty()) { + auto work = m_receive_queue.front(); + receive(std::move(work)); + } + } + + void advance_state(State next_state, int r, Context* on_finish) { + auto cct = m_http_client->m_cct; + auto current_state = m_state; + ldout(cct, 15) << "current_state=" << current_state << ", " + << "next_state=" << next_state << ", " + << "r=" << r << dendl; + + m_state = next_state; + if (current_state == STATE_CONNECTING) { + if (next_state == STATE_UNINITIALIZED) { + shutdown_socket(); + on_finish->complete(r); + return; + } else if (next_state == STATE_READY) { + on_finish->complete(r); + return; + } + } else if (current_state == STATE_SHUTTING_DOWN) { + if (next_state == STATE_READY) { + // shut down requested while connecting/resetting + disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); + return; + } else if (next_state == STATE_UNINITIALIZED || + next_state == STATE_SHUTDOWN || + next_state == STATE_RESET_CONNECTING) { + ceph_assert(m_on_shutdown != nullptr); + m_on_shutdown->complete(r); + return; + } + } else if (current_state == STATE_RESET_DISCONNECTING) { + // disconnected from peer -- ignore errors and reconnect + ceph_assert(next_state == STATE_RESET_CONNECTING); + ceph_assert(on_finish == nullptr); + shutdown_socket(); + resolve_host(nullptr); + return; + } else if (current_state == STATE_RESET_CONNECTING) { + ceph_assert(on_finish == nullptr); + if (next_state == STATE_READY) { + // restart queued IO + if (!m_issue_queue.empty()) { + auto& work = m_issue_queue.front(); + finalize_issue(std::move(work)); + } + return; + } else if (next_state == STATE_UNINITIALIZED) { + shutdown_socket(); + + // fail all queued IO + fail_queued_work(r); + return; + } + } + + lderr(cct) << "unexpected state transition: " + << "current_state=" << current_state << ", " + << "next_state=" << next_state << dendl; + ceph_assert(false); + } + + void complete_work(std::shared_ptr<Work> work, int r, Response&& response) { + auto cct = m_http_client->m_cct; + ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl; + + work->complete(r, std::move(response)); + } + + void fail_queued_work(int r) { + auto cct = m_http_client->m_cct; + ldout(cct, 10) << "r=" << r << dendl; + + for (auto& work : m_issue_queue) { + complete_work(work, r, {}); + } + m_issue_queue.clear(); + ceph_assert(m_receive_queue.empty()); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "PlainHttpSession " << this << " " << __func__ \ + << ": " + +template <typename I> +class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> { +public: + PlainHttpSession(HttpClient* http_client) + : HttpSession<PlainHttpSession>(http_client), + m_stream(http_client->m_strand) { + } + ~PlainHttpSession() override { + this->close_socket(); + } + + inline boost::beast::tcp_stream& + stream() { + return m_stream; + } + +protected: + void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + m_stream.async_connect( + results, + [on_finish](boost::system::error_code ec, const auto& endpoint) { + on_finish->complete(-ec.value()); + }); + } + + void disconnect(Context* on_finish) override { + on_finish->complete(0); + } + +private: + boost::beast::tcp_stream m_stream; + +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient::" \ + << "SslHttpSession " << this << " " << __func__ \ + << ": " + +template <typename I> +class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> { +public: + SslHttpSession(HttpClient* http_client) + : HttpSession<SslHttpSession>(http_client), + m_stream(http_client->m_strand, http_client->m_ssl_context) { + } + ~SslHttpSession() override { + this->close_socket(); + } + + inline boost::beast::ssl_stream<boost::beast::tcp_stream>& + stream() { + return m_stream; + } + +protected: + void connect(boost::asio::ip::tcp::resolver::results_type results, + Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + boost::beast::get_lowest_layer(m_stream).async_connect( + results, + [this, on_finish](boost::system::error_code ec, const auto& endpoint) { + handle_connect(-ec.value(), on_finish); + }); + } + + void disconnect(Context* on_finish) override { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + if (!m_ssl_enabled) { + on_finish->complete(0); + return; + } + + m_stream.async_shutdown( + asio::util::get_callback_adapter([this, on_finish](int r) { + shutdown(r, on_finish); })); + } + +private: + boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream; + bool m_ssl_enabled = false; + + void handle_connect(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + if (r < 0) { + lderr(cct) << "failed to connect to host '" + << http_client->m_url_spec.host << "': " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + handshake(on_finish); + } + + void handshake(Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << dendl; + + auto& host = http_client->m_url_spec.host; + m_stream.set_verify_mode( + boost::asio::ssl::verify_peer | + boost::asio::ssl::verify_fail_if_no_peer_cert); + m_stream.set_verify_callback( + [host, next=boost::asio::ssl::host_name_verification(host), + ignore_self_signed=http_client->m_ignore_self_signed_cert] + (bool preverified, boost::asio::ssl::verify_context& ctx) { + if (!preverified && ignore_self_signed) { + auto ec = X509_STORE_CTX_get_error(ctx.native_handle()); + switch (ec) { + case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT: + case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN: + // ignore self-signed cert issues + preverified = true; + break; + default: + break; + } + } + return next(preverified, ctx); + }); + + // Set SNI Hostname (many hosts need this to handshake successfully) + if(!SSL_set_tlsext_host_name(m_stream.native_handle(), + http_client->m_url_spec.host.c_str())) { + int r = -::ERR_get_error(); + lderr(cct) << "failed to initialize SNI hostname: " << cpp_strerror(r) + << dendl; + on_finish->complete(r); + return; + } + + // Perform the SSL/TLS handshake + m_stream.async_handshake( + boost::asio::ssl::stream_base::client, + asio::util::get_callback_adapter( + [this, on_finish](int r) { handle_handshake(r, on_finish); })); + } + + void handle_handshake(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + if (r < 0) { + lderr(cct) << "failed to complete handshake: " << cpp_strerror(r) + << dendl; + disconnect(new LambdaContext([r, on_finish](int) { + on_finish->complete(r); })); + return; + } + + m_ssl_enabled = true; + on_finish->complete(0); + } + + void shutdown(int r, Context* on_finish) { + auto http_client = this->m_http_client; + auto cct = http_client->m_cct; + ldout(cct, 15) << "r=" << r << dendl; + + on_finish->complete(r); + } +}; + +#undef dout_prefix +#define dout_prefix *_dout << "librbd::migration::HttpClient: " << this \ + << " " << __func__ << ": " + +template <typename I> +HttpClient<I>::HttpClient(I* image_ctx, const std::string& url) + : m_cct(image_ctx->cct), m_image_ctx(image_ctx), + m_asio_engine(image_ctx->asio_engine), m_url(url), + m_strand(boost::asio::make_strand(*m_asio_engine)), + m_ssl_context(boost::asio::ssl::context::sslv23_client) { + m_ssl_context.set_default_verify_paths(); +} + +template <typename I> +void HttpClient<I>::open(Context* on_finish) { + ldout(m_cct, 10) << "url=" << m_url << dendl; + + int r = util::parse_url(m_cct, m_url, &m_url_spec); + if (r < 0) { + lderr(m_cct) << "failed to parse url '" << m_url << "': " << cpp_strerror(r) + << dendl; + on_finish->complete(-EINVAL); + return; + } + + boost::asio::post(m_strand, [this, on_finish]() mutable { + create_http_session(on_finish); }); +} + +template <typename I> +void HttpClient<I>::close(Context* on_finish) { + boost::asio::post(m_strand, [this, on_finish]() mutable { + shut_down_http_session(on_finish); }); +} + +template <typename I> +void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) { + ldout(m_cct, 10) << dendl; + + Request req; + req.method(boost::beast::http::verb::head); + + issue( + std::move(req), [this, size, on_finish](int r, Response&& response) { + handle_get_size(r, std::move(response), size, on_finish); + }); +} + +template <typename I> +void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size, + Context* on_finish) { + ldout(m_cct, 10) << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } else if (!response.has_content_length()) { + lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl; + on_finish->complete(-EINVAL); + return; + } + + auto content_length = response[boost::beast::http::field::content_length]; + try { + *size = boost::lexical_cast<uint64_t>(content_length); + } catch (boost::bad_lexical_cast&) { + lderr(m_cct) << "invalid content-length in response" << dendl; + on_finish->complete(-EBADMSG); + return; + } + + on_finish->complete(0); +} + +template <typename I> +void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data, + Context* on_finish) { + ldout(m_cct, 20) << dendl; + + auto aio_comp = io::AioCompletion::create_and_start( + on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ); + aio_comp->set_request_count(byte_extents.size()); + + // utilize ReadResult to assemble multiple byte extents into a single bl + // since boost::beast doesn't support multipart responses out-of-the-box + io::ReadResult read_result{data}; + aio_comp->read_result = std::move(read_result); + aio_comp->read_result.set_image_extents(byte_extents); + + // issue a range get request for each extent + uint64_t buffer_offset = 0; + for (auto [byte_offset, byte_length] : byte_extents) { + auto ctx = new io::ReadResult::C_ImageReadRequest( + aio_comp, buffer_offset, {{byte_offset, byte_length}}); + buffer_offset += byte_length; + + Request req; + req.method(boost::beast::http::verb::get); + + std::stringstream range; + ceph_assert(byte_length > 0); + range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1); + req.set(boost::beast::http::field::range, range.str()); + + issue( + std::move(req), + [this, byte_offset=byte_offset, byte_length=byte_length, ctx] + (int r, Response&& response) { + handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl, + ctx); + }); + } +} + +template <typename I> +void HttpClient<I>::handle_read(int r, Response&& response, + uint64_t byte_offset, uint64_t byte_length, + bufferlist* data, Context* on_finish) { + ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", " + << "r=" << r << dendl; + + if (r < 0) { + lderr(m_cct) << "failed to read requested byte range: " + << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } else if (response.result() != boost::beast::http::status::partial_content) { + lderr(m_cct) << "failed to retrieve requested byte range: HTTP " + << response.result() << dendl; + on_finish->complete(-EIO); + return; + } else if (byte_length != response.body().size()) { + lderr(m_cct) << "unexpected short range read: " + << "wanted=" << byte_length << ", " + << "received=" << response.body().size() << dendl; + on_finish->complete(-EINVAL); + return; + } + + data->clear(); + data->append(response.body()); + on_finish->complete(data->length()); +} + +template <typename I> +void HttpClient<I>::issue(std::shared_ptr<Work>&& work) { + boost::asio::post(m_strand, [this, work=std::move(work)]() mutable { + m_http_session->issue(std::move(work)); }); +} + +template <typename I> +void HttpClient<I>::create_http_session(Context* on_finish) { + ldout(m_cct, 15) << dendl; + + ceph_assert(m_http_session == nullptr); + switch (m_url_spec.scheme) { + case URL_SCHEME_HTTP: + m_http_session = std::make_unique<PlainHttpSession>(this); + break; + case URL_SCHEME_HTTPS: + m_http_session = std::make_unique<SslHttpSession>(this); + break; + default: + ceph_assert(false); + break; + } + + m_http_session->init(on_finish); +} + +template <typename I> +void HttpClient<I>::shut_down_http_session(Context* on_finish) { + ldout(m_cct, 15) << dendl; + + if (m_http_session == nullptr) { + on_finish->complete(0); + return; + } + + m_http_session->shut_down(on_finish); +} + +} // namespace migration +} // namespace librbd + +template class librbd::migration::HttpClient<librbd::ImageCtx>; |