diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_asio_frontend.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_asio_frontend.cc')
-rw-r--r-- | src/rgw/rgw_asio_frontend.cc | 1199 |
1 files changed, 1199 insertions, 0 deletions
diff --git a/src/rgw/rgw_asio_frontend.cc b/src/rgw/rgw_asio_frontend.cc new file mode 100644 index 000000000..633a29633 --- /dev/null +++ b/src/rgw/rgw_asio_frontend.cc @@ -0,0 +1,1199 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include <atomic> +#include <ctime> +#include <thread> +#include <vector> + +#include <boost/asio.hpp> +#include <boost/intrusive/list.hpp> +#include <boost/smart_ptr/intrusive_ref_counter.hpp> + +#include <boost/context/protected_fixedsize_stack.hpp> +#include <spawn/spawn.hpp> + +#include "common/async/shared_mutex.h" +#include "common/errno.h" +#include "common/strtol.h" + +#include "rgw_asio_client.h" +#include "rgw_asio_frontend.h" + +#ifdef WITH_RADOSGW_BEAST_OPENSSL +#include <boost/asio/ssl.hpp> +#endif + +#include "common/split.h" + +#include "services/svc_config_key.h" +#include "services/svc_zone.h" + +#include "rgw_zone.h" + +#include "rgw_asio_frontend_timer.h" +#include "rgw_dmclock_async_scheduler.h" + +#define dout_subsys ceph_subsys_rgw + +namespace { + +using tcp = boost::asio::ip::tcp; +namespace http = boost::beast::http; +#ifdef WITH_RADOSGW_BEAST_OPENSSL +namespace ssl = boost::asio::ssl; +#endif + +struct Connection; + +// use explicit executor types instead of the type-erased boost::asio::executor +using executor_type = boost::asio::io_context::executor_type; + +using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>; +using tcp_stream = boost::beast::basic_stream<tcp, executor_type>; + +using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock, + executor_type, Connection>; + +static constexpr size_t parse_buffer_size = 65536; +using parse_buffer = boost::beast::flat_static_buffer<parse_buffer_size>; + +// use mmap/mprotect to allocate 512k coroutine stacks +auto make_stack_allocator() { + return boost::context::protected_fixedsize_stack{512*1024}; +} + +using namespace std; + +template <typename Stream> +class StreamIO : public rgw::asio::ClientIO { + CephContext* const cct; + Stream& stream; + timeout_timer& timeout; + yield_context yield; + parse_buffer& buffer; + boost::system::error_code fatal_ec; + public: + StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout, + rgw::asio::parser_type& parser, yield_context yield, + parse_buffer& buffer, bool is_ssl, + const tcp::endpoint& local_endpoint, + const tcp::endpoint& remote_endpoint) + : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint), + cct(cct), stream(stream), timeout(timeout), yield(yield), + buffer(buffer) + {} + + boost::system::error_code get_fatal_error_code() const { return fatal_ec; } + + size_t write_data(const char* buf, size_t len) override { + boost::system::error_code ec; + timeout.start(); + auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len), + yield[ec]); + timeout.cancel(); + if (ec) { + ldout(cct, 4) << "write_data failed: " << ec.message() << dendl; + if (ec == boost::asio::error::broken_pipe) { + boost::system::error_code ec_ignored; + stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored); + } + if (!fatal_ec) { + fatal_ec = ec; + } + throw rgw::io::Exception(ec.value(), std::system_category()); + } + return bytes; + } + + size_t recv_body(char* buf, size_t max) override { + auto& message = parser.get(); + auto& body_remaining = message.body(); + body_remaining.data = buf; + body_remaining.size = max; + + while (body_remaining.size && !parser.is_done()) { + boost::system::error_code ec; + timeout.start(); + http::async_read_some(stream, buffer, parser, yield[ec]); + timeout.cancel(); + if (ec == http::error::need_buffer) { + break; + } + if (ec) { + ldout(cct, 4) << "failed to read body: " << ec.message() << dendl; + if (!fatal_ec) { + fatal_ec = ec; + } + throw rgw::io::Exception(ec.value(), std::system_category()); + } + } + return max - body_remaining.size; + } +}; + +// output the http version as a string, ie 'HTTP/1.1' +struct http_version { + unsigned major_ver; + unsigned minor_ver; + explicit http_version(unsigned version) + : major_ver(version / 10), minor_ver(version % 10) {} +}; +std::ostream& operator<<(std::ostream& out, const http_version& v) { + return out << "HTTP/" << v.major_ver << '.' << v.minor_ver; +} + +// log an http header value or '-' if it's missing +struct log_header { + const http::fields& fields; + http::field field; + std::string_view quote; + log_header(const http::fields& fields, http::field field, + std::string_view quote = "") + : fields(fields), field(field), quote(quote) {} +}; +std::ostream& operator<<(std::ostream& out, const log_header& h) { + auto p = h.fields.find(h.field); + if (p == h.fields.end()) { + return out << '-'; + } + return out << h.quote << p->value() << h.quote; +} + +// log fractional seconds in milliseconds +struct log_ms_remainder { + ceph::coarse_real_time t; + log_ms_remainder(ceph::coarse_real_time t) : t(t) {} +}; +std::ostream& operator<<(std::ostream& out, const log_ms_remainder& m) { + using namespace std::chrono; + return out << std::setfill('0') << std::setw(3) + << duration_cast<milliseconds>(m.t.time_since_epoch()).count() % 1000; +} + +// log time in apache format: day/month/year:hour:minute:second zone +struct log_apache_time { + ceph::coarse_real_time t; + log_apache_time(ceph::coarse_real_time t) : t(t) {} +}; +std::ostream& operator<<(std::ostream& out, const log_apache_time& a) { + const auto t = ceph::coarse_real_clock::to_time_t(a.t); + const auto local = std::localtime(&t); + return out << std::put_time(local, "%d/%b/%Y:%T.") << log_ms_remainder{a.t} + << std::put_time(local, " %z"); +}; + +using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>; + +template <typename Stream> +void handle_connection(boost::asio::io_context& context, + RGWProcessEnv& env, Stream& stream, + timeout_timer& timeout, size_t header_limit, + parse_buffer& buffer, bool is_ssl, + SharedMutex& pause_mutex, + rgw::dmclock::Scheduler *scheduler, + const std::string& uri_prefix, + boost::system::error_code& ec, + yield_context yield) +{ + // don't impose a limit on the body, since we read it in pieces + static constexpr size_t body_limit = std::numeric_limits<size_t>::max(); + + auto cct = env.driver->ctx(); + + // read messages from the stream until eof + for (;;) { + // configure the parser + rgw::asio::parser_type parser; + parser.header_limit(header_limit); + parser.body_limit(body_limit); + timeout.start(); + // parse the header + http::async_read_header(stream, buffer, parser, yield[ec]); + timeout.cancel(); + if (ec == boost::asio::error::connection_reset || + ec == boost::asio::error::bad_descriptor || + ec == boost::asio::error::operation_aborted || +#ifdef WITH_RADOSGW_BEAST_OPENSSL + ec == ssl::error::stream_truncated || +#endif + ec == http::error::end_of_stream) { + ldout(cct, 20) << "failed to read header: " << ec.message() << dendl; + return; + } + auto& message = parser.get(); + if (ec) { + ldout(cct, 1) << "failed to read header: " << ec.message() << dendl; + http::response<http::empty_body> response; + response.result(http::status::bad_request); + response.version(message.version() == 10 ? 10 : 11); + response.prepare_payload(); + timeout.start(); + http::async_write(stream, response, yield[ec]); + timeout.cancel(); + if (ec) { + ldout(cct, 5) << "failed to write response: " << ec.message() << dendl; + } + ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; + return; + } + + bool expect_continue = (message[http::field::expect] == "100-continue"); + + { + auto lock = pause_mutex.async_lock_shared(yield[ec]); + if (ec == boost::asio::error::operation_aborted) { + return; + } else if (ec) { + ldout(cct, 1) << "failed to lock: " << ec.message() << dendl; + return; + } + + // process the request + RGWRequest req{env.driver->get_new_req_id()}; + + auto& socket = stream.lowest_layer(); + const auto& remote_endpoint = socket.remote_endpoint(ec); + if (ec) { + ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl; + return; + } + const auto& local_endpoint = socket.local_endpoint(ec); + if (ec) { + ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl; + return; + } + + StreamIO real_client{cct, stream, timeout, parser, yield, buffer, + is_ssl, local_endpoint, remote_endpoint}; + + auto real_client_io = rgw::io::add_reordering( + rgw::io::add_buffering(cct, + rgw::io::add_chunking( + rgw::io::add_conlen_controlling( + &real_client)))); + RGWRestfulIO client(cct, &real_client_io); + optional_yield y = null_yield; + if (cct->_conf->rgw_beast_enable_async) { + y = optional_yield{context, yield}; + } + int http_ret = 0; + string user = "-"; + const auto started = ceph::coarse_real_clock::now(); + ceph::coarse_real_clock::duration latency{}; + process_request(env, &req, uri_prefix, &client, y, + scheduler, &user, &latency, &http_ret); + + if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_access, 1)) { + // access log line elements begin per Apache Combined Log Format with additions following + lsubdout(cct, rgw_access, 1) << "beast: " << std::hex << &req << std::dec << ": " + << remote_endpoint.address() << " - " << user << " [" << log_apache_time{started} << "] \"" + << message.method_string() << ' ' << message.target() << ' ' + << http_version{message.version()} << "\" " << http_ret << ' ' + << client.get_bytes_sent() + client.get_bytes_received() << ' ' + << log_header{message, http::field::referer, "\""} << ' ' + << log_header{message, http::field::user_agent, "\""} << ' ' + << log_header{message, http::field::range} << " latency=" + << latency << dendl; + } + + // process_request() can't distinguish between connection errors and + // http/s3 errors, so check StreamIO for fatal connection errors + ec = real_client.get_fatal_error_code(); + if (ec) { + return; + } + + if (real_client.sent_100_continue()) { + expect_continue = false; + } + } + + if (!parser.keep_alive()) { + return; + } + + // if we failed before reading the entire message, discard any remaining + // bytes before reading the next + while (!expect_continue && !parser.is_done()) { + static std::array<char, 1024> discard_buffer; + + auto& body = parser.get().body(); + body.size = discard_buffer.size(); + body.data = discard_buffer.data(); + + timeout.start(); + http::async_read_some(stream, buffer, parser, yield[ec]); + timeout.cancel(); + if (ec == http::error::need_buffer) { + continue; + } + if (ec == boost::asio::error::connection_reset) { + return; + } + if (ec) { + ldout(cct, 5) << "failed to discard unread message: " + << ec.message() << dendl; + return; + } + } + } +} + +// timeout support requires that connections are reference-counted, because the +// timeout_handler can outlive the coroutine +struct Connection : boost::intrusive::list_base_hook<>, + boost::intrusive_ref_counter<Connection> +{ + tcp_socket socket; + parse_buffer buffer; + + explicit Connection(tcp_socket&& socket) noexcept + : socket(std::move(socket)) {} + + void close(boost::system::error_code& ec) { + socket.close(ec); + } + + tcp_socket& get_socket() { return socket; } +}; + +class ConnectionList { + using List = boost::intrusive::list<Connection>; + List connections; + std::mutex mutex; + + void remove(Connection& c) { + std::lock_guard lock{mutex}; + if (c.is_linked()) { + connections.erase(List::s_iterator_to(c)); + } + } + public: + class Guard { + ConnectionList *list; + Connection *conn; + public: + Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {} + ~Guard() { list->remove(*conn); } + }; + [[nodiscard]] Guard add(Connection& conn) { + std::lock_guard lock{mutex}; + connections.push_back(conn); + return Guard{this, &conn}; + } + void close(boost::system::error_code& ec) { + std::lock_guard lock{mutex}; + for (auto& conn : connections) { + conn.socket.close(ec); + } + connections.clear(); + } +}; + +namespace dmc = rgw::dmclock; +class AsioFrontend { + RGWProcessEnv& env; + RGWFrontendConfig* conf; + boost::asio::io_context context; + std::string uri_prefix; + ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT); + size_t header_limit = 16384; +#ifdef WITH_RADOSGW_BEAST_OPENSSL + boost::optional<ssl::context> ssl_context; + int get_config_key_val(string name, + const string& type, + bufferlist *pbl); + int ssl_set_private_key(const string& name, bool is_ssl_cert); + int ssl_set_certificate_chain(const string& name); + int init_ssl(); +#endif + SharedMutex pause_mutex; + std::unique_ptr<rgw::dmclock::Scheduler> scheduler; + + struct Listener { + tcp::endpoint endpoint; + tcp::acceptor acceptor; + tcp_socket socket; + bool use_ssl = false; + bool use_nodelay = false; + + explicit Listener(boost::asio::io_context& context) + : acceptor(context), socket(context) {} + }; + std::vector<Listener> listeners; + + ConnectionList connections; + + // work guard to keep run() threads busy while listeners are paused + using Executor = boost::asio::io_context::executor_type; + std::optional<boost::asio::executor_work_guard<Executor>> work; + + std::vector<std::thread> threads; + std::atomic<bool> going_down{false}; + + CephContext* ctx() const { return env.driver->ctx(); } + std::optional<dmc::ClientCounters> client_counters; + std::unique_ptr<dmc::ClientConfig> client_config; + void accept(Listener& listener, boost::system::error_code ec); + + public: + AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf, + dmc::SchedulerCtx& sched_ctx) + : env(env), conf(conf), pause_mutex(context.get_executor()) + { + auto sched_t = dmc::get_scheduler_t(ctx()); + switch(sched_t){ + case dmc::scheduler_t::dmclock: + scheduler.reset(new dmc::AsyncScheduler(ctx(), + context, + std::ref(sched_ctx.get_dmc_client_counters()), + sched_ctx.get_dmc_client_config(), + *sched_ctx.get_dmc_client_config(), + dmc::AtLimit::Reject)); + break; + case dmc::scheduler_t::none: + lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl; + [[fallthrough]]; + case dmc::scheduler_t::throttler: + scheduler.reset(new dmc::SimpleThrottler(ctx())); + + } + } + + int init(); + int run(); + void stop(); + void join(); + void pause(); + void unpause(); +}; + +unsigned short parse_port(const char *input, boost::system::error_code& ec) +{ + char *end = nullptr; + auto port = std::strtoul(input, &end, 10); + if (port > std::numeric_limits<unsigned short>::max()) { + ec.assign(ERANGE, boost::system::system_category()); + } else if (port == 0 && end == input) { + ec.assign(EINVAL, boost::system::system_category()); + } + return port; +} + +tcp::endpoint parse_endpoint(boost::asio::string_view input, + unsigned short default_port, + boost::system::error_code& ec) +{ + tcp::endpoint endpoint; + + if (input.empty()) { + ec = boost::asio::error::invalid_argument; + return endpoint; + } + + if (input[0] == '[') { // ipv6 + const size_t addr_begin = 1; + const size_t addr_end = input.find(']'); + if (addr_end == input.npos) { // no matching ] + ec = boost::asio::error::invalid_argument; + return endpoint; + } + if (addr_end + 1 < input.size()) { + // :port must must follow [ipv6] + if (input[addr_end + 1] != ':') { + ec = boost::asio::error::invalid_argument; + return endpoint; + } else { + auto port_str = input.substr(addr_end + 2); + endpoint.port(parse_port(port_str.data(), ec)); + } + } else { + endpoint.port(default_port); + } + auto addr = input.substr(addr_begin, addr_end - addr_begin); + endpoint.address(boost::asio::ip::make_address_v6(addr, ec)); + } else { // ipv4 + auto colon = input.find(':'); + if (colon != input.npos) { + auto port_str = input.substr(colon + 1); + endpoint.port(parse_port(port_str.data(), ec)); + if (ec) { + return endpoint; + } + } else { + endpoint.port(default_port); + } + auto addr = input.substr(0, colon); + endpoint.address(boost::asio::ip::make_address_v4(addr, ec)); + } + return endpoint; +} + +static int drop_privileges(CephContext *ctx) +{ + uid_t uid = ctx->get_set_uid(); + gid_t gid = ctx->get_set_gid(); + std::string uid_string = ctx->get_set_uid_string(); + std::string gid_string = ctx->get_set_gid_string(); + if (gid && setgid(gid) != 0) { + int err = errno; + ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl; + return -err; + } + if (uid && setuid(uid) != 0) { + int err = errno; + ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl; + return -err; + } + if (uid && gid) { + ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid + << " (" << uid_string << ":" << gid_string << ")" << dendl; + } + return 0; +} + +int AsioFrontend::init() +{ + boost::system::error_code ec; + auto& config = conf->get_config_map(); + + if (auto i = config.find("prefix"); i != config.end()) { + uri_prefix = i->second; + } + +// Setting global timeout + auto timeout = config.find("request_timeout_ms"); + if (timeout != config.end()) { + auto timeout_number = ceph::parse<uint64_t>(timeout->second); + if (timeout_number) { + request_timeout = std::chrono::milliseconds(*timeout_number); + } else { + lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: " + << timeout->second << " setting it to the default value: " + << REQUEST_TIMEOUT << dendl; + } + } + + auto max_header_size = config.find("max_header_size"); + if (max_header_size != config.end()) { + auto limit = ceph::parse<uint64_t>(max_header_size->second); + if (!limit) { + lderr(ctx()) << "WARNING: invalid value for max_header_size: " + << max_header_size->second << ", using the default value: " + << header_limit << dendl; + } else if (*limit > parse_buffer_size) { // can't exceed parse buffer size + header_limit = parse_buffer_size; + lderr(ctx()) << "WARNING: max_header_size " << max_header_size->second + << " capped at maximum value " << header_limit << dendl; + } else { + header_limit = *limit; + } + } + +#ifdef WITH_RADOSGW_BEAST_OPENSSL + int r = init_ssl(); + if (r < 0) { + return r; + } +#endif + + // parse endpoints + auto ports = config.equal_range("port"); + for (auto i = ports.first; i != ports.second; ++i) { + auto port = parse_port(i->second.c_str(), ec); + if (ec) { + lderr(ctx()) << "failed to parse port=" << i->second << dendl; + return -ec.value(); + } + listeners.emplace_back(context); + listeners.back().endpoint.port(port); + + listeners.emplace_back(context); + listeners.back().endpoint = tcp::endpoint(tcp::v6(), port); + } + + auto endpoints = config.equal_range("endpoint"); + for (auto i = endpoints.first; i != endpoints.second; ++i) { + auto endpoint = parse_endpoint(i->second, 80, ec); + if (ec) { + lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl; + return -ec.value(); + } + listeners.emplace_back(context); + listeners.back().endpoint = endpoint; + } + // parse tcp nodelay + auto nodelay = config.find("tcp_nodelay"); + if (nodelay != config.end()) { + for (auto& l : listeners) { + l.use_nodelay = (nodelay->second == "1"); + } + } + + + bool socket_bound = false; + // start listeners + for (auto& l : listeners) { + l.acceptor.open(l.endpoint.protocol(), ec); + if (ec) { + if (ec == boost::asio::error::address_family_not_supported) { + ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint + << ", " << ec.message() << dendl; + continue; + } + + lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; + return -ec.value(); + } + + if (l.endpoint.protocol() == tcp::v6()) { + l.acceptor.set_option(boost::asio::ip::v6_only(true), ec); + if (ec) { + lderr(ctx()) << "failed to set v6_only socket option: " + << ec.message() << dendl; + return -ec.value(); + } + } + + l.acceptor.set_option(tcp::acceptor::reuse_address(true)); + l.acceptor.bind(l.endpoint, ec); + if (ec) { + lderr(ctx()) << "failed to bind address " << l.endpoint + << ": " << ec.message() << dendl; + return -ec.value(); + } + + auto it = config.find("max_connection_backlog"); + auto max_connection_backlog = boost::asio::socket_base::max_listen_connections; + if (it != config.end()) { + string err; + max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err); + if (!err.empty()) { + ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl; + max_connection_backlog = boost::asio::socket_base::max_listen_connections; + } + } + l.acceptor.listen(max_connection_backlog); + l.acceptor.async_accept(l.socket, + [this, &l] (boost::system::error_code ec) { + accept(l, ec); + }); + + ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl; + socket_bound = true; + } + if (!socket_bound) { + lderr(ctx()) << "Unable to listen at any endpoints" << dendl; + return -EINVAL; + } + + return drop_privileges(ctx()); +} + +#ifdef WITH_RADOSGW_BEAST_OPENSSL + +static string config_val_prefix = "config://"; + +namespace { + +class ExpandMetaVar { + map<string, string> meta_map; + +public: + ExpandMetaVar(rgw::sal::Zone* zone_svc) { + meta_map["realm"] = zone_svc->get_realm_name(); + meta_map["realm_id"] = zone_svc->get_realm_id(); + meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name(); + meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id(); + meta_map["zone"] = zone_svc->get_name(); + meta_map["zone_id"] = zone_svc->get_id(); + } + + string process_str(const string& in); +}; + +string ExpandMetaVar::process_str(const string& in) +{ + if (meta_map.empty()) { + return in; + } + + auto pos = in.find('$'); + if (pos == std::string::npos) { + return in; + } + + string out; + decltype(pos) last_pos = 0; + + while (pos != std::string::npos) { + if (pos > last_pos) { + out += in.substr(last_pos, pos - last_pos); + } + + string var; + const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_"; + + size_t endpos = 0; + if (in[pos+1] == '{') { + // ...${foo_bar}... + endpos = in.find_first_not_of(valid_chars, pos + 2); + if (endpos != std::string::npos && + in[endpos] == '}') { + var = in.substr(pos + 2, endpos - pos - 2); + endpos++; + } + } else { + // ...$foo... + endpos = in.find_first_not_of(valid_chars, pos + 1); + if (endpos != std::string::npos) + var = in.substr(pos + 1, endpos - pos - 1); + else + var = in.substr(pos + 1); + } + string var_source = in.substr(pos, endpos - pos); + last_pos = endpos; + + auto iter = meta_map.find(var); + if (iter != meta_map.end()) { + out += iter->second; + } else { + out += var_source; + } + pos = in.find('$', last_pos); + } + if (last_pos != std::string::npos) { + out += in.substr(last_pos); + } + + return out; +} + +} /* anonymous namespace */ + +int AsioFrontend::get_config_key_val(string name, + const string& type, + bufferlist *pbl) +{ + if (name.empty()) { + lderr(ctx()) << "bad " << type << " config value" << dendl; + return -EINVAL; + } + + int r = env.driver->get_config_key_val(name, pbl); + if (r < 0) { + lderr(ctx()) << type << " was not found: " << name << dendl; + return r; + } + return 0; +} + +int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate) +{ + boost::system::error_code ec; + + if (!boost::algorithm::starts_with(name, config_val_prefix)) { + ssl_context->use_private_key_file(name, ssl::context::pem, ec); + } else { + bufferlist bl; + int r = get_config_key_val(name.substr(config_val_prefix.size()), + "ssl_private_key", + &bl); + if (r < 0) { + return r; + } + ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()), + ssl::context::pem, ec); + } + + if (ec) { + if (!is_ssl_certificate) { + lderr(ctx()) << "failed to add ssl_private_key=" << name + << ": " << ec.message() << dendl; + } else { + lderr(ctx()) << "failed to use ssl_certificate=" << name + << " as a private key: " << ec.message() << dendl; + } + return -ec.value(); + } + + return 0; +} + +int AsioFrontend::ssl_set_certificate_chain(const string& name) +{ + boost::system::error_code ec; + + if (!boost::algorithm::starts_with(name, config_val_prefix)) { + ssl_context->use_certificate_chain_file(name, ec); + } else { + bufferlist bl; + int r = get_config_key_val(name.substr(config_val_prefix.size()), + "ssl_certificate", + &bl); + if (r < 0) { + return r; + } + ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()), + ec); + } + + if (ec) { + lderr(ctx()) << "failed to use ssl_certificate=" << name + << ": " << ec.message() << dendl; + return -ec.value(); + } + + return 0; +} + +int AsioFrontend::init_ssl() +{ + boost::system::error_code ec; + auto& config = conf->get_config_map(); + + // ssl configuration + std::optional<string> cert = conf->get_val("ssl_certificate"); + if (cert) { + // only initialize the ssl context if it's going to be used + ssl_context = boost::in_place(ssl::context::tls); + } + + std::optional<string> key = conf->get_val("ssl_private_key"); + bool have_cert = false; + + if (key && !cert) { + lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl; + return -EINVAL; + } + + std::optional<string> options = conf->get_val("ssl_options"); + if (options) { + if (!cert) { + lderr(ctx()) << "no ssl_certificate configured for ssl_options" << dendl; + return -EINVAL; + } + } else if (cert) { + options = "no_sslv2:no_sslv3:no_tlsv1:no_tlsv1_1"; + } + + if (options) { + for (auto &option : ceph::split(*options, ":")) { + if (option == "default_workarounds") { + ssl_context->set_options(ssl::context::default_workarounds); + } else if (option == "no_compression") { + ssl_context->set_options(ssl::context::no_compression); + } else if (option == "no_sslv2") { + ssl_context->set_options(ssl::context::no_sslv2); + } else if (option == "no_sslv3") { + ssl_context->set_options(ssl::context::no_sslv3); + } else if (option == "no_tlsv1") { + ssl_context->set_options(ssl::context::no_tlsv1); + } else if (option == "no_tlsv1_1") { + ssl_context->set_options(ssl::context::no_tlsv1_1); + } else if (option == "no_tlsv1_2") { + ssl_context->set_options(ssl::context::no_tlsv1_2); + } else if (option == "single_dh_use") { + ssl_context->set_options(ssl::context::single_dh_use); + } else { + lderr(ctx()) << "ignoring unknown ssl option '" << option << "'" << dendl; + } + } + } + + std::optional<string> ciphers = conf->get_val("ssl_ciphers"); + if (ciphers) { + if (!cert) { + lderr(ctx()) << "no ssl_certificate configured for ssl_ciphers" << dendl; + return -EINVAL; + } + + int r = SSL_CTX_set_cipher_list(ssl_context->native_handle(), + ciphers->c_str()); + if (r == 0) { + lderr(ctx()) << "no cipher could be selected from ssl_ciphers: " + << *ciphers << dendl; + return -EINVAL; + } + } + + auto ports = config.equal_range("ssl_port"); + auto endpoints = config.equal_range("ssl_endpoint"); + + /* + * don't try to config certificate if frontend isn't configured for ssl + */ + if (ports.first == ports.second && + endpoints.first == endpoints.second) { + return 0; + } + + bool key_is_cert = false; + + if (cert) { + if (!key) { + key = cert; + key_is_cert = true; + } + + ExpandMetaVar emv(env.driver->get_zone()); + + cert = emv.process_str(*cert); + key = emv.process_str(*key); + + int r = ssl_set_private_key(*key, key_is_cert); + bool have_private_key = (r >= 0); + if (r < 0) { + if (!key_is_cert) { + r = ssl_set_private_key(*cert, true); + have_private_key = (r >= 0); + } + } + + if (have_private_key) { + int r = ssl_set_certificate_chain(*cert); + have_cert = (r >= 0); + } + } + + // parse ssl endpoints + for (auto i = ports.first; i != ports.second; ++i) { + if (!have_cert) { + lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl; + return -EINVAL; + } + auto port = parse_port(i->second.c_str(), ec); + if (ec) { + lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl; + return -ec.value(); + } + listeners.emplace_back(context); + listeners.back().endpoint.port(port); + listeners.back().use_ssl = true; + + listeners.emplace_back(context); + listeners.back().endpoint = tcp::endpoint(tcp::v6(), port); + listeners.back().use_ssl = true; + } + + for (auto i = endpoints.first; i != endpoints.second; ++i) { + if (!have_cert) { + lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl; + return -EINVAL; + } + auto endpoint = parse_endpoint(i->second, 443, ec); + if (ec) { + lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl; + return -ec.value(); + } + listeners.emplace_back(context); + listeners.back().endpoint = endpoint; + listeners.back().use_ssl = true; + } + return 0; +} +#endif // WITH_RADOSGW_BEAST_OPENSSL + +void AsioFrontend::accept(Listener& l, boost::system::error_code ec) +{ + if (!l.acceptor.is_open()) { + return; + } else if (ec == boost::asio::error::operation_aborted) { + return; + } else if (ec) { + ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl; + return; + } + auto stream = std::move(l.socket); + stream.set_option(tcp::no_delay(l.use_nodelay), ec); + l.acceptor.async_accept(l.socket, + [this, &l] (boost::system::error_code ec) { + accept(l, ec); + }); + + // spawn a coroutine to handle the connection +#ifdef WITH_RADOSGW_BEAST_OPENSSL + if (l.use_ssl) { + spawn::spawn(context, + [this, s=std::move(stream)] (yield_context yield) mutable { + auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; + auto c = connections.add(*conn); + // wrap the tcp stream in an ssl stream + boost::asio::ssl::stream<tcp_socket&> stream{conn->socket, *ssl_context}; + auto timeout = timeout_timer{context.get_executor(), request_timeout, conn}; + // do ssl handshake + boost::system::error_code ec; + timeout.start(); + auto bytes = stream.async_handshake(ssl::stream_base::server, + conn->buffer.data(), yield[ec]); + timeout.cancel(); + if (ec) { + ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl; + return; + } + conn->buffer.consume(bytes); + handle_connection(context, env, stream, timeout, header_limit, + conn->buffer, true, pause_mutex, scheduler.get(), + uri_prefix, ec, yield); + if (!ec) { + // ssl shutdown (ignoring errors) + stream.async_shutdown(yield[ec]); + } + conn->socket.shutdown(tcp::socket::shutdown_both, ec); + }, make_stack_allocator()); + } else { +#else + { +#endif // WITH_RADOSGW_BEAST_OPENSSL + spawn::spawn(context, + [this, s=std::move(stream)] (yield_context yield) mutable { + auto conn = boost::intrusive_ptr{new Connection(std::move(s))}; + auto c = connections.add(*conn); + auto timeout = timeout_timer{context.get_executor(), request_timeout, conn}; + boost::system::error_code ec; + handle_connection(context, env, conn->socket, timeout, header_limit, + conn->buffer, false, pause_mutex, scheduler.get(), + uri_prefix, ec, yield); + conn->socket.shutdown(tcp_socket::shutdown_both, ec); + }, make_stack_allocator()); + } +} + +int AsioFrontend::run() +{ + auto cct = ctx(); + const int thread_count = cct->_conf->rgw_thread_pool_size; + threads.reserve(thread_count); + + ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; + + // the worker threads call io_context::run(), which will return when there's + // no work left. hold a work guard to keep these threads going until join() + work.emplace(boost::asio::make_work_guard(context)); + + for (int i = 0; i < thread_count; i++) { + threads.emplace_back([this]() noexcept { + // request warnings on synchronous librados calls in this thread + is_asio_thread = true; + // Have uncaught exceptions kill the process and give a + // stacktrace, not be swallowed. + context.run(); + }); + } + return 0; +} + +void AsioFrontend::stop() +{ + ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; + + going_down = true; + + boost::system::error_code ec; + // close all listeners + for (auto& listener : listeners) { + listener.acceptor.close(ec); + } + // close all connections + connections.close(ec); + pause_mutex.cancel(); +} + +void AsioFrontend::join() +{ + if (!going_down) { + stop(); + } + work.reset(); + + ldout(ctx(), 4) << "frontend joining threads..." << dendl; + for (auto& thread : threads) { + thread.join(); + } + ldout(ctx(), 4) << "frontend done" << dendl; +} + +void AsioFrontend::pause() +{ + ldout(ctx(), 4) << "frontend pausing connections..." << dendl; + + // cancel pending calls to accept(), but don't close the sockets + boost::system::error_code ec; + for (auto& l : listeners) { + l.acceptor.cancel(ec); + } + + // pause and wait for outstanding requests to complete + pause_mutex.lock(ec); + + if (ec) { + ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl; + } else { + ldout(ctx(), 4) << "frontend paused" << dendl; + } +} + +void AsioFrontend::unpause() +{ + // unpause to unblock connections + pause_mutex.unlock(); + + // start accepting connections again + for (auto& l : listeners) { + l.acceptor.async_accept(l.socket, + [this, &l] (boost::system::error_code ec) { + accept(l, ec); + }); + } + + ldout(ctx(), 4) << "frontend unpaused" << dendl; +} + +} // anonymous namespace + +class RGWAsioFrontend::Impl : public AsioFrontend { + public: + Impl(RGWProcessEnv& env, RGWFrontendConfig* conf, + rgw::dmclock::SchedulerCtx& sched_ctx) + : AsioFrontend(env, conf, sched_ctx) {} +}; + +RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env, + RGWFrontendConfig* conf, + rgw::dmclock::SchedulerCtx& sched_ctx) + : impl(new Impl(env, conf, sched_ctx)) +{ +} + +RGWAsioFrontend::~RGWAsioFrontend() = default; + +int RGWAsioFrontend::init() +{ + return impl->init(); +} + +int RGWAsioFrontend::run() +{ + return impl->run(); +} + +void RGWAsioFrontend::stop() +{ + impl->stop(); +} + +void RGWAsioFrontend::join() +{ + impl->join(); +} + +void RGWAsioFrontend::pause_for_new_config() +{ + impl->pause(); +} + +void RGWAsioFrontend::unpause_with_new_config() +{ + impl->unpause(); +} |