summaryrefslogtreecommitdiffstats
path: root/src/shrpx_live_check.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/shrpx_live_check.cc')
-rw-r--r--src/shrpx_live_check.cc799
1 files changed, 799 insertions, 0 deletions
diff --git a/src/shrpx_live_check.cc b/src/shrpx_live_check.cc
new file mode 100644
index 0000000..a81e992
--- /dev/null
+++ b/src/shrpx_live_check.cc
@@ -0,0 +1,799 @@
+/*
+ * nghttp2 - HTTP/2 C Library
+ *
+ * Copyright (c) 2016 Tatsuhiro Tsujikawa
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "shrpx_live_check.h"
+#include "shrpx_worker.h"
+#include "shrpx_connect_blocker.h"
+#include "shrpx_tls.h"
+#include "shrpx_log.h"
+
+namespace shrpx {
+
+namespace {
+constexpr size_t MAX_BUFFER_SIZE = 4_k;
+} // namespace
+
+namespace {
+void readcb(struct ev_loop *loop, ev_io *w, int revents) {
+ int rv;
+ auto conn = static_cast<Connection *>(w->data);
+ auto live_check = static_cast<LiveCheck *>(conn->data);
+
+ rv = live_check->do_read();
+ if (rv != 0) {
+ live_check->on_failure();
+ return;
+ }
+}
+} // namespace
+
+namespace {
+void writecb(struct ev_loop *loop, ev_io *w, int revents) {
+ int rv;
+ auto conn = static_cast<Connection *>(w->data);
+ auto live_check = static_cast<LiveCheck *>(conn->data);
+
+ rv = live_check->do_write();
+ if (rv != 0) {
+ live_check->on_failure();
+ return;
+ }
+}
+} // namespace
+
+namespace {
+void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto conn = static_cast<Connection *>(w->data);
+ auto live_check = static_cast<LiveCheck *>(conn->data);
+
+ if (w == &conn->rt && !conn->expired_rt()) {
+ return;
+ }
+
+ live_check->on_failure();
+}
+} // namespace
+
+namespace {
+void backoff_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
+ int rv;
+ auto live_check = static_cast<LiveCheck *>(w->data);
+
+ rv = live_check->initiate_connection();
+ if (rv != 0) {
+ live_check->on_failure();
+ return;
+ }
+}
+} // namespace
+
+namespace {
+void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto live_check = static_cast<LiveCheck *>(w->data);
+
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "SETTINGS timeout";
+ }
+
+ live_check->on_failure();
+}
+} // namespace
+
+LiveCheck::LiveCheck(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
+ DownstreamAddr *addr, std::mt19937 &gen)
+ : conn_(loop, -1, nullptr, worker->get_mcpool(),
+ worker->get_downstream_config()->timeout.write,
+ worker->get_downstream_config()->timeout.read, {}, {}, writecb,
+ readcb, timeoutcb, this, get_config()->tls.dyn_rec.warmup_threshold,
+ get_config()->tls.dyn_rec.idle_timeout, Proto::NONE),
+ wb_(worker->get_mcpool()),
+ gen_(gen),
+ read_(&LiveCheck::noop),
+ write_(&LiveCheck::noop),
+ worker_(worker),
+ ssl_ctx_(ssl_ctx),
+ addr_(addr),
+ session_(nullptr),
+ raddr_(nullptr),
+ success_count_(0),
+ fail_count_(0),
+ settings_ack_received_(false),
+ session_closing_(false) {
+ ev_timer_init(&backoff_timer_, backoff_timeoutcb, 0., 0.);
+ backoff_timer_.data = this;
+
+ // SETTINGS ACK must be received in a short timeout. Otherwise, we
+ // assume that connection is broken.
+ ev_timer_init(&settings_timer_, settings_timeout_cb, 0., 0.);
+ settings_timer_.data = this;
+}
+
+LiveCheck::~LiveCheck() {
+ disconnect();
+
+ ev_timer_stop(conn_.loop, &backoff_timer_);
+}
+
+void LiveCheck::disconnect() {
+ if (dns_query_) {
+ auto dns_tracker = worker_->get_dns_tracker();
+
+ dns_tracker->cancel(dns_query_.get());
+ }
+
+ dns_query_.reset();
+ // We can reuse resolved_addr_
+ raddr_ = nullptr;
+
+ conn_.rlimit.stopw();
+ conn_.wlimit.stopw();
+
+ ev_timer_stop(conn_.loop, &settings_timer_);
+
+ read_ = write_ = &LiveCheck::noop;
+
+ conn_.disconnect();
+
+ nghttp2_session_del(session_);
+ session_ = nullptr;
+
+ settings_ack_received_ = false;
+ session_closing_ = false;
+
+ wb_.reset();
+}
+
+// Use the similar backoff algorithm described in
+// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
+namespace {
+constexpr size_t MAX_BACKOFF_EXP = 10;
+constexpr auto MULTIPLIER = 1.6;
+constexpr auto JITTER = 0.2;
+} // namespace
+
+void LiveCheck::schedule() {
+ auto base_backoff =
+ util::int_pow(MULTIPLIER, std::min(fail_count_, MAX_BACKOFF_EXP));
+ auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
+ JITTER * base_backoff);
+
+ auto &downstreamconf = *get_config()->conn.downstream;
+
+ auto backoff =
+ std::min(downstreamconf.timeout.max_backoff, base_backoff + dist(gen_));
+
+ ev_timer_set(&backoff_timer_, backoff, 0.);
+ ev_timer_start(conn_.loop, &backoff_timer_);
+}
+
+int LiveCheck::do_read() { return read_(*this); }
+
+int LiveCheck::do_write() { return write_(*this); }
+
+int LiveCheck::initiate_connection() {
+ int rv;
+
+ auto worker_blocker = worker_->get_connect_blocker();
+ if (worker_blocker->blocked()) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Worker wide backend connection was blocked temporarily";
+ }
+ return -1;
+ }
+
+ if (!dns_query_ && addr_->tls) {
+ assert(ssl_ctx_);
+
+ auto ssl = tls::create_ssl(ssl_ctx_);
+ if (!ssl) {
+ return -1;
+ }
+
+ switch (addr_->proto) {
+ case Proto::HTTP1:
+ tls::setup_downstream_http1_alpn(ssl);
+ break;
+ case Proto::HTTP2:
+ tls::setup_downstream_http2_alpn(ssl);
+ break;
+ default:
+ assert(0);
+ }
+
+ conn_.set_ssl(ssl);
+ conn_.tls.client_session_cache = &addr_->tls_session_cache;
+ }
+
+ if (addr_->dns) {
+ if (!dns_query_) {
+ auto dns_query = std::make_unique<DNSQuery>(
+ addr_->host, [this](DNSResolverStatus status, const Address *result) {
+ int rv;
+
+ if (status == DNSResolverStatus::OK) {
+ *this->resolved_addr_ = *result;
+ }
+ rv = this->initiate_connection();
+ if (rv != 0) {
+ this->on_failure();
+ }
+ });
+ auto dns_tracker = worker_->get_dns_tracker();
+
+ if (!resolved_addr_) {
+ resolved_addr_ = std::make_unique<Address>();
+ }
+
+ switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
+ case DNSResolverStatus::ERROR:
+ return -1;
+ case DNSResolverStatus::RUNNING:
+ dns_query_ = std::move(dns_query);
+ return 0;
+ case DNSResolverStatus::OK:
+ break;
+ default:
+ assert(0);
+ }
+ } else {
+ switch (dns_query_->status) {
+ case DNSResolverStatus::ERROR:
+ dns_query_.reset();
+ return -1;
+ case DNSResolverStatus::OK:
+ dns_query_.reset();
+ break;
+ default:
+ assert(0);
+ }
+ }
+
+ util::set_port(*resolved_addr_, addr_->port);
+ raddr_ = resolved_addr_.get();
+ } else {
+ raddr_ = &addr_->addr;
+ }
+
+ conn_.fd = util::create_nonblock_socket(raddr_->su.storage.ss_family);
+
+ if (conn_.fd == -1) {
+ auto error = errno;
+ LOG(WARN) << "socket() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+ return -1;
+ }
+
+ rv = connect(conn_.fd, &raddr_->su.sa, raddr_->len);
+ if (rv != 0 && errno != EINPROGRESS) {
+ auto error = errno;
+ LOG(WARN) << "connect() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+
+ close(conn_.fd);
+ conn_.fd = -1;
+
+ return -1;
+ }
+
+ if (addr_->tls) {
+ auto sni_name =
+ addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
+ if (!util::numeric_host(sni_name.c_str())) {
+ SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str());
+ }
+
+ auto session = tls::reuse_tls_session(addr_->tls_session_cache);
+ if (session) {
+ SSL_set_session(conn_.tls.ssl, session);
+ SSL_SESSION_free(session);
+ }
+
+ conn_.prepare_client_handshake();
+ }
+
+ write_ = &LiveCheck::connected;
+
+ ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
+ ev_io_set(&conn_.rev, conn_.fd, EV_READ);
+
+ conn_.wlimit.startw();
+
+ auto &downstreamconf = *get_config()->conn.downstream;
+
+ conn_.wt.repeat = downstreamconf.timeout.connect;
+ ev_timer_again(conn_.loop, &conn_.wt);
+
+ return 0;
+}
+
+int LiveCheck::connected() {
+ auto sock_error = util::get_socket_error(conn_.fd);
+ if (sock_error != 0) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Backend connect failed; addr="
+ << util::to_numeric_addr(raddr_) << ": errno=" << sock_error;
+ }
+
+ return -1;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Connection established";
+ }
+
+ auto &downstreamconf = *get_config()->conn.downstream;
+
+ // Reset timeout for write. Previously, we set timeout for connect.
+ conn_.wt.repeat = downstreamconf.timeout.write;
+ ev_timer_again(conn_.loop, &conn_.wt);
+
+ conn_.rlimit.startw();
+ conn_.again_rt();
+
+ if (conn_.tls.ssl) {
+ read_ = &LiveCheck::tls_handshake;
+ write_ = &LiveCheck::tls_handshake;
+
+ return do_write();
+ }
+
+ if (addr_->proto == Proto::HTTP2) {
+ // For HTTP/2, we try to read SETTINGS ACK from server to make
+ // sure it is really alive, and serving HTTP/2.
+ read_ = &LiveCheck::read_clear;
+ write_ = &LiveCheck::write_clear;
+
+ if (connection_made() != 0) {
+ return -1;
+ }
+
+ return 0;
+ }
+
+ on_success();
+
+ return 0;
+}
+
+int LiveCheck::tls_handshake() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ ERR_clear_error();
+
+ auto rv = conn_.tls_handshake();
+
+ if (rv == SHRPX_ERR_INPROGRESS) {
+ return 0;
+ }
+
+ if (rv < 0) {
+ return rv;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "SSL/TLS handshake completed";
+ }
+
+ if (!get_config()->tls.insecure &&
+ tls::check_cert(conn_.tls.ssl, addr_, raddr_) != 0) {
+ return -1;
+ }
+
+ // Check negotiated ALPN
+
+ const unsigned char *next_proto = nullptr;
+ unsigned int next_proto_len = 0;
+
+#ifndef OPENSSL_NO_NEXTPROTONEG
+ SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
+#endif // !OPENSSL_NO_NEXTPROTONEG
+#if OPENSSL_VERSION_NUMBER >= 0x10002000L
+ if (next_proto == nullptr) {
+ SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
+ }
+#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
+
+ auto proto = StringRef{next_proto, next_proto_len};
+
+ switch (addr_->proto) {
+ case Proto::HTTP1:
+ if (proto.empty() || proto == StringRef::from_lit("http/1.1")) {
+ break;
+ }
+ return -1;
+ case Proto::HTTP2:
+ if (util::check_h2_is_selected(proto)) {
+ // For HTTP/2, we try to read SETTINGS ACK from server to make
+ // sure it is really alive, and serving HTTP/2.
+ read_ = &LiveCheck::read_tls;
+ write_ = &LiveCheck::write_tls;
+
+ if (connection_made() != 0) {
+ return -1;
+ }
+
+ return 0;
+ }
+ return -1;
+ default:
+ break;
+ }
+
+ on_success();
+
+ return 0;
+}
+
+int LiveCheck::read_tls() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ std::array<uint8_t, 4_k> buf;
+
+ ERR_clear_error();
+
+ for (;;) {
+ auto nread = conn_.read_tls(buf.data(), buf.size());
+
+ if (nread == 0) {
+ return 0;
+ }
+
+ if (nread < 0) {
+ return nread;
+ }
+
+ if (on_read(buf.data(), nread) != 0) {
+ return -1;
+ }
+ }
+}
+
+int LiveCheck::write_tls() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ ERR_clear_error();
+
+ struct iovec iov;
+
+ for (;;) {
+ if (wb_.rleft() > 0) {
+ auto iovcnt = wb_.riovec(&iov, 1);
+ if (iovcnt != 1) {
+ assert(0);
+ return -1;
+ }
+ auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
+
+ if (nwrite == 0) {
+ return 0;
+ }
+
+ if (nwrite < 0) {
+ return nwrite;
+ }
+
+ wb_.drain(nwrite);
+
+ continue;
+ }
+
+ if (on_write() != 0) {
+ return -1;
+ }
+
+ if (wb_.rleft() == 0) {
+ conn_.start_tls_write_idle();
+ break;
+ }
+ }
+
+ conn_.wlimit.stopw();
+ ev_timer_stop(conn_.loop, &conn_.wt);
+
+ if (settings_ack_received_) {
+ on_success();
+ }
+
+ return 0;
+}
+
+int LiveCheck::read_clear() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ std::array<uint8_t, 4_k> buf;
+
+ for (;;) {
+ auto nread = conn_.read_clear(buf.data(), buf.size());
+
+ if (nread == 0) {
+ return 0;
+ }
+
+ if (nread < 0) {
+ return nread;
+ }
+
+ if (on_read(buf.data(), nread) != 0) {
+ return -1;
+ }
+ }
+}
+
+int LiveCheck::write_clear() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ struct iovec iov;
+
+ for (;;) {
+ if (wb_.rleft() > 0) {
+ auto iovcnt = wb_.riovec(&iov, 1);
+ if (iovcnt != 1) {
+ assert(0);
+ return -1;
+ }
+ auto nwrite = conn_.write_clear(iov.iov_base, iov.iov_len);
+
+ if (nwrite == 0) {
+ return 0;
+ }
+
+ if (nwrite < 0) {
+ return nwrite;
+ }
+
+ wb_.drain(nwrite);
+
+ continue;
+ }
+
+ if (on_write() != 0) {
+ return -1;
+ }
+
+ if (wb_.rleft() == 0) {
+ break;
+ }
+ }
+
+ conn_.wlimit.stopw();
+ ev_timer_stop(conn_.loop, &conn_.wt);
+
+ if (settings_ack_received_) {
+ on_success();
+ }
+
+ return 0;
+}
+
+int LiveCheck::on_read(const uint8_t *data, size_t len) {
+ ssize_t rv;
+
+ rv = nghttp2_session_mem_recv(session_, data, len);
+ if (rv < 0) {
+ LOG(ERROR) << "nghttp2_session_mem_recv() returned error: "
+ << nghttp2_strerror(rv);
+ return -1;
+ }
+
+ if (settings_ack_received_ && !session_closing_) {
+ session_closing_ = true;
+ rv = nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
+ if (rv != 0) {
+ return -1;
+ }
+ }
+
+ if (nghttp2_session_want_read(session_) == 0 &&
+ nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "No more read/write for this session";
+ }
+
+ // If we have SETTINGS ACK already, we treat this success.
+ if (settings_ack_received_) {
+ return 0;
+ }
+
+ return -1;
+ }
+
+ signal_write();
+
+ return 0;
+}
+
+int LiveCheck::on_write() {
+ for (;;) {
+ const uint8_t *data;
+ auto datalen = nghttp2_session_mem_send(session_, &data);
+
+ if (datalen < 0) {
+ LOG(ERROR) << "nghttp2_session_mem_send() returned error: "
+ << nghttp2_strerror(datalen);
+ return -1;
+ }
+ if (datalen == 0) {
+ break;
+ }
+ wb_.append(data, datalen);
+
+ if (wb_.rleft() >= MAX_BUFFER_SIZE) {
+ break;
+ }
+ }
+
+ if (nghttp2_session_want_read(session_) == 0 &&
+ nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "No more read/write for this session";
+ }
+
+ if (settings_ack_received_) {
+ return 0;
+ }
+
+ return -1;
+ }
+
+ return 0;
+}
+
+void LiveCheck::on_failure() {
+ ++fail_count_;
+
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Liveness check for " << addr_->host << ":" << addr_->port
+ << " failed " << fail_count_ << " time(s) in a row";
+ }
+
+ disconnect();
+
+ schedule();
+}
+
+void LiveCheck::on_success() {
+ ++success_count_;
+ fail_count_ = 0;
+
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Liveness check for " << addr_->host << ":" << addr_->port
+ << " succeeded " << success_count_ << " time(s) in a row";
+ }
+
+ if (success_count_ < addr_->rise) {
+ disconnect();
+
+ schedule();
+
+ return;
+ }
+
+ LOG(NOTICE) << util::to_numeric_addr(&addr_->addr) << " is considered online";
+
+ addr_->connect_blocker->online();
+
+ success_count_ = 0;
+ fail_count_ = 0;
+
+ disconnect();
+}
+
+int LiveCheck::noop() { return 0; }
+
+void LiveCheck::start_settings_timer() {
+ auto &downstreamconf = get_config()->http2.downstream;
+
+ ev_timer_set(&settings_timer_, downstreamconf.timeout.settings, 0.);
+ ev_timer_start(conn_.loop, &settings_timer_);
+}
+
+void LiveCheck::stop_settings_timer() {
+ ev_timer_stop(conn_.loop, &settings_timer_);
+}
+
+void LiveCheck::settings_ack_received() { settings_ack_received_ = true; }
+
+namespace {
+int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
+ void *user_data) {
+ auto live_check = static_cast<LiveCheck *>(user_data);
+
+ if (frame->hd.type != NGHTTP2_SETTINGS ||
+ (frame->hd.flags & NGHTTP2_FLAG_ACK)) {
+ return 0;
+ }
+
+ live_check->start_settings_timer();
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
+ void *user_data) {
+ auto live_check = static_cast<LiveCheck *>(user_data);
+
+ if (frame->hd.type != NGHTTP2_SETTINGS ||
+ (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
+ return 0;
+ }
+
+ live_check->stop_settings_timer();
+ live_check->settings_ack_received();
+
+ return 0;
+}
+} // namespace
+
+int LiveCheck::connection_made() {
+ int rv;
+
+ nghttp2_session_callbacks *callbacks;
+ rv = nghttp2_session_callbacks_new(&callbacks);
+ if (rv != 0) {
+ return -1;
+ }
+
+ nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
+ on_frame_send_callback);
+ nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
+ on_frame_recv_callback);
+
+ rv = nghttp2_session_client_new(&session_, callbacks, this);
+
+ nghttp2_session_callbacks_del(callbacks);
+
+ if (rv != 0) {
+ return -1;
+ }
+
+ rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, nullptr, 0);
+ if (rv != 0) {
+ return -1;
+ }
+
+ auto must_terminate =
+ addr_->tls && !nghttp2::tls::check_http2_requirement(conn_.tls.ssl);
+
+ if (must_terminate) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "TLSv1.2 was not negotiated. HTTP/2 must not be negotiated.";
+ }
+
+ rv = nghttp2_session_terminate_session(session_,
+ NGHTTP2_INADEQUATE_SECURITY);
+ if (rv != 0) {
+ return -1;
+ }
+ }
+
+ signal_write();
+
+ return 0;
+}
+
+void LiveCheck::signal_write() { conn_.wlimit.startw(); }
+
+} // namespace shrpx