summaryrefslogtreecommitdiffstats
path: root/src/shrpx_http2_session.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/shrpx_http2_session.cc2435
1 files changed, 2435 insertions, 0 deletions
diff --git a/src/shrpx_http2_session.cc b/src/shrpx_http2_session.cc
new file mode 100644
index 0000000..f0e9bf0
--- /dev/null
+++ b/src/shrpx_http2_session.cc
@@ -0,0 +1,2435 @@
+/*
+ * nghttp2 - HTTP/2 C Library
+ *
+ * Copyright (c) 2012 Tatsuhiro Tsujikawa
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "shrpx_http2_session.h"
+
+#include <netinet/tcp.h>
+#ifdef HAVE_UNISTD_H
+# include <unistd.h>
+#endif // HAVE_UNISTD_H
+
+#include <vector>
+
+#include <openssl/err.h>
+
+#include "shrpx_upstream.h"
+#include "shrpx_downstream.h"
+#include "shrpx_config.h"
+#include "shrpx_error.h"
+#include "shrpx_http2_downstream_connection.h"
+#include "shrpx_client_handler.h"
+#include "shrpx_tls.h"
+#include "shrpx_http.h"
+#include "shrpx_worker.h"
+#include "shrpx_connect_blocker.h"
+#include "shrpx_log.h"
+#include "http2.h"
+#include "util.h"
+#include "base64.h"
+#include "tls.h"
+
+using namespace nghttp2;
+
+namespace shrpx {
+
+namespace {
+constexpr ev_tstamp CONNCHK_TIMEOUT = 5.;
+constexpr ev_tstamp CONNCHK_PING_TIMEOUT = 1.;
+} // namespace
+
+namespace {
+constexpr size_t MAX_BUFFER_SIZE = 32_k;
+} // namespace
+
+namespace {
+void connchk_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto http2session = static_cast<Http2Session *>(w->data);
+
+ ev_timer_stop(loop, w);
+
+ switch (http2session->get_connection_check_state()) {
+ case ConnectionCheck::STARTED:
+ // ping timeout; disconnect
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "ping timeout";
+ }
+
+ delete http2session;
+
+ return;
+ default:
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "connection check required";
+ }
+ http2session->set_connection_check_state(ConnectionCheck::REQUIRED);
+ }
+}
+} // namespace
+
+namespace {
+void settings_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto http2session = static_cast<Http2Session *>(w->data);
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "SETTINGS timeout";
+ }
+
+ downstream_failure(http2session->get_addr(), http2session->get_raddr());
+
+ if (http2session->terminate_session(NGHTTP2_SETTINGS_TIMEOUT) != 0) {
+ delete http2session;
+
+ return;
+ }
+ http2session->signal_write();
+}
+} // namespace
+
+namespace {
+void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto conn = static_cast<Connection *>(w->data);
+ auto http2session = static_cast<Http2Session *>(conn->data);
+
+ if (w == &conn->rt && !conn->expired_rt()) {
+ return;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "Timeout";
+ }
+
+ http2session->on_timeout();
+
+ delete http2session;
+}
+} // namespace
+
+namespace {
+void readcb(struct ev_loop *loop, ev_io *w, int revents) {
+ int rv;
+ auto conn = static_cast<Connection *>(w->data);
+ auto http2session = static_cast<Http2Session *>(conn->data);
+ rv = http2session->do_read();
+ if (rv != 0) {
+ delete http2session;
+
+ return;
+ }
+ http2session->connection_alive();
+}
+} // namespace
+
+namespace {
+void writecb(struct ev_loop *loop, ev_io *w, int revents) {
+ int rv;
+ auto conn = static_cast<Connection *>(w->data);
+ auto http2session = static_cast<Http2Session *>(conn->data);
+ rv = http2session->do_write();
+ if (rv != 0) {
+ delete http2session;
+
+ return;
+ }
+ http2session->reset_connection_check_timer_if_not_checking();
+}
+} // namespace
+
+namespace {
+void initiate_connection_cb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto http2session = static_cast<Http2Session *>(w->data);
+ ev_timer_stop(loop, w);
+ if (http2session->initiate_connection() != 0) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "Could not initiate backend connection";
+ }
+
+ delete http2session;
+
+ return;
+ }
+}
+} // namespace
+
+namespace {
+void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revents) {
+ auto http2session = static_cast<Http2Session *>(w->data);
+ http2session->check_retire();
+}
+} // namespace
+
+Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
+ Worker *worker,
+ const std::shared_ptr<DownstreamAddrGroup> &group,
+ DownstreamAddr *addr)
+ : dlnext(nullptr),
+ dlprev(nullptr),
+ conn_(loop, -1, nullptr, worker->get_mcpool(),
+ group->shared_addr->timeout.write, group->shared_addr->timeout.read,
+ {}, {}, writecb, readcb, timeoutcb, this,
+ get_config()->tls.dyn_rec.warmup_threshold,
+ get_config()->tls.dyn_rec.idle_timeout, Proto::HTTP2),
+ wb_(worker->get_mcpool()),
+ worker_(worker),
+ ssl_ctx_(ssl_ctx),
+ group_(group),
+ addr_(addr),
+ session_(nullptr),
+ raddr_(nullptr),
+ state_(Http2SessionState::DISCONNECTED),
+ connection_check_state_(ConnectionCheck::NONE),
+ freelist_zone_(FreelistZone::NONE),
+ settings_recved_(false),
+ allow_connect_proto_(false) {
+ read_ = write_ = &Http2Session::noop;
+
+ on_read_ = &Http2Session::read_noop;
+ on_write_ = &Http2Session::write_noop;
+
+ // We will reuse this many times, so use repeat timeout value. The
+ // timeout value is set later.
+ ev_timer_init(&connchk_timer_, connchk_timeout_cb, 0., 0.);
+
+ connchk_timer_.data = this;
+
+ // SETTINGS ACK timeout is 10 seconds for now. We will reuse this
+ // many times, so use repeat timeout value.
+ ev_timer_init(&settings_timer_, settings_timeout_cb, 0., 0.);
+
+ settings_timer_.data = this;
+
+ ev_timer_init(&initiate_connection_timer_, initiate_connection_cb, 0., 0.);
+ initiate_connection_timer_.data = this;
+
+ ev_prepare_init(&prep_, prepare_cb);
+ prep_.data = this;
+ ev_prepare_start(loop, &prep_);
+}
+
+Http2Session::~Http2Session() {
+ exclude_from_scheduling();
+ disconnect(should_hard_fail());
+}
+
+int Http2Session::disconnect(bool hard) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Disconnecting";
+ }
+ nghttp2_session_del(session_);
+ session_ = nullptr;
+
+ wb_.reset();
+
+ if (dns_query_) {
+ auto dns_tracker = worker_->get_dns_tracker();
+ dns_tracker->cancel(dns_query_.get());
+ }
+
+ conn_.rlimit.stopw();
+ conn_.wlimit.stopw();
+
+ ev_prepare_stop(conn_.loop, &prep_);
+
+ ev_timer_stop(conn_.loop, &initiate_connection_timer_);
+ ev_timer_stop(conn_.loop, &settings_timer_);
+ ev_timer_stop(conn_.loop, &connchk_timer_);
+
+ read_ = write_ = &Http2Session::noop;
+
+ on_read_ = &Http2Session::read_noop;
+ on_write_ = &Http2Session::write_noop;
+
+ conn_.disconnect();
+
+ if (proxy_htp_) {
+ proxy_htp_.reset();
+ }
+
+ connection_check_state_ = ConnectionCheck::NONE;
+ state_ = Http2SessionState::DISCONNECTED;
+
+ // When deleting Http2DownstreamConnection, it calls this object's
+ // remove_downstream_connection(). The multiple
+ // Http2DownstreamConnection objects belong to the same
+ // ClientHandler object if upstream is h2. So be careful when you
+ // delete ClientHandler here.
+ //
+ // We allow creating new pending Http2DownstreamConnection with this
+ // object. Upstream::on_downstream_reset() may add
+ // Http2DownstreamConnection to another Http2Session.
+
+ for (auto dc = dconns_.head; dc;) {
+ auto next = dc->dlnext;
+ auto downstream = dc->get_downstream();
+ auto upstream = downstream->get_upstream();
+
+ // Failure is allowed only for HTTP/1 upstream where upstream is
+ // not shared by multiple Downstreams.
+ if (upstream->on_downstream_reset(downstream, hard) != 0) {
+ delete upstream->get_client_handler();
+ }
+
+ // dc was deleted
+ dc = next;
+ }
+
+ auto streams = std::move(streams_);
+ for (auto s = streams.head; s;) {
+ auto next = s->dlnext;
+ delete s;
+ s = next;
+ }
+
+ return 0;
+}
+
+int Http2Session::resolve_name() {
+ auto dns_query = std::make_unique<DNSQuery>(
+ addr_->host, [this](DNSResolverStatus status, const Address *result) {
+ int rv;
+
+ if (status == DNSResolverStatus::OK) {
+ *resolved_addr_ = *result;
+ util::set_port(*this->resolved_addr_, this->addr_->port);
+ }
+
+ rv = this->initiate_connection();
+ if (rv != 0) {
+ delete this;
+ }
+ });
+ resolved_addr_ = std::make_unique<Address>();
+ auto dns_tracker = worker_->get_dns_tracker();
+ switch (dns_tracker->resolve(resolved_addr_.get(), dns_query.get())) {
+ case DNSResolverStatus::ERROR:
+ return -1;
+ case DNSResolverStatus::RUNNING:
+ dns_query_ = std::move(dns_query);
+ state_ = Http2SessionState::RESOLVING_NAME;
+ return 0;
+ case DNSResolverStatus::OK:
+ util::set_port(*resolved_addr_, addr_->port);
+ return 0;
+ default:
+ assert(0);
+ abort();
+ }
+}
+
+namespace {
+int htp_hdrs_completecb(llhttp_t *htp);
+} // namespace
+
+namespace {
+constexpr llhttp_settings_t htp_hooks = {
+ nullptr, // llhttp_cb on_message_begin;
+ nullptr, // llhttp_data_cb on_url;
+ nullptr, // llhttp_data_cb on_status;
+ nullptr, // llhttp_data_cb on_method;
+ nullptr, // llhttp_data_cb on_version;
+ nullptr, // llhttp_data_cb on_header_field;
+ nullptr, // llhttp_data_cb on_header_value;
+ nullptr, // llhttp_data_cb on_chunk_extension_name;
+ nullptr, // llhttp_data_cb on_chunk_extension_value;
+ htp_hdrs_completecb, // llhttp_cb on_headers_complete;
+ nullptr, // llhttp_data_cb on_body;
+ nullptr, // llhttp_cb on_message_complete;
+ nullptr, // llhttp_cb on_url_complete;
+ nullptr, // llhttp_cb on_status_complete;
+ nullptr, // llhttp_cb on_method_complete;
+ nullptr, // llhttp_cb on_version_complete;
+ nullptr, // llhttp_cb on_header_field_complete;
+ nullptr, // llhttp_cb on_header_value_complete;
+ nullptr, // llhttp_cb on_chunk_extension_name_complete;
+ nullptr, // llhttp_cb on_chunk_extension_value_complete;
+ nullptr, // llhttp_cb on_chunk_header;
+ nullptr, // llhttp_cb on_chunk_complete;
+ nullptr, // llhttp_cb on_reset;
+};
+} // namespace
+
+int Http2Session::initiate_connection() {
+ int rv = 0;
+
+ auto worker_blocker = worker_->get_connect_blocker();
+
+ if (state_ == Http2SessionState::DISCONNECTED ||
+ state_ == Http2SessionState::RESOLVING_NAME) {
+ if (worker_blocker->blocked()) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this)
+ << "Worker wide backend connection was blocked temporarily";
+ }
+ return -1;
+ }
+ }
+
+ auto &downstreamconf = *get_config()->conn.downstream;
+
+ const auto &proxy = get_config()->downstream_http_proxy;
+ if (!proxy.host.empty() && state_ == Http2SessionState::DISCONNECTED) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Connecting to the proxy " << proxy.host << ":"
+ << proxy.port;
+ }
+
+ conn_.fd = util::create_nonblock_socket(proxy.addr.su.storage.ss_family);
+
+ if (conn_.fd == -1) {
+ auto error = errno;
+ SSLOG(WARN, this) << "Backend proxy socket() failed; addr="
+ << util::to_numeric_addr(&proxy.addr)
+ << ", errno=" << error;
+
+ worker_blocker->on_failure();
+ return -1;
+ }
+
+ rv = connect(conn_.fd, &proxy.addr.su.sa, proxy.addr.len);
+ if (rv != 0 && errno != EINPROGRESS) {
+ auto error = errno;
+ SSLOG(WARN, this) << "Backend proxy connect() failed; addr="
+ << util::to_numeric_addr(&proxy.addr)
+ << ", errno=" << error;
+
+ worker_blocker->on_failure();
+
+ return -1;
+ }
+
+ raddr_ = &proxy.addr;
+
+ worker_blocker->on_success();
+
+ ev_io_set(&conn_.rev, conn_.fd, EV_READ);
+ ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
+
+ conn_.wlimit.startw();
+
+ conn_.wt.repeat = downstreamconf.timeout.connect;
+ ev_timer_again(conn_.loop, &conn_.wt);
+
+ write_ = &Http2Session::connected;
+
+ on_read_ = &Http2Session::downstream_read_proxy;
+ on_write_ = &Http2Session::downstream_connect_proxy;
+
+ proxy_htp_ = std::make_unique<llhttp_t>();
+ llhttp_init(proxy_htp_.get(), HTTP_RESPONSE, &htp_hooks);
+ proxy_htp_->data = this;
+
+ state_ = Http2SessionState::PROXY_CONNECTING;
+
+ return 0;
+ }
+
+ if (state_ == Http2SessionState::DISCONNECTED ||
+ state_ == Http2SessionState::PROXY_CONNECTED ||
+ state_ == Http2SessionState::RESOLVING_NAME) {
+ if (LOG_ENABLED(INFO)) {
+ if (state_ != Http2SessionState::RESOLVING_NAME) {
+ SSLOG(INFO, this) << "Connecting to downstream server";
+ }
+ }
+ if (addr_->tls) {
+ assert(ssl_ctx_);
+
+ if (state_ != Http2SessionState::RESOLVING_NAME) {
+ auto ssl = tls::create_ssl(ssl_ctx_);
+ if (!ssl) {
+ return -1;
+ }
+
+ tls::setup_downstream_http2_alpn(ssl);
+
+ conn_.set_ssl(ssl);
+ conn_.tls.client_session_cache = &addr_->tls_session_cache;
+
+ auto sni_name =
+ addr_->sni.empty() ? StringRef{addr_->host} : StringRef{addr_->sni};
+
+ if (!util::numeric_host(sni_name.c_str())) {
+ // TLS extensions: SNI. There is no documentation about the return
+ // code for this function (actually this is macro wrapping SSL_ctrl
+ // at the time of this writing).
+ SSL_set_tlsext_host_name(conn_.tls.ssl, sni_name.c_str());
+ }
+
+ auto tls_session = tls::reuse_tls_session(addr_->tls_session_cache);
+ if (tls_session) {
+ SSL_set_session(conn_.tls.ssl, tls_session);
+ SSL_SESSION_free(tls_session);
+ }
+ }
+
+ if (state_ == Http2SessionState::DISCONNECTED) {
+ if (addr_->dns) {
+ rv = resolve_name();
+ if (rv != 0) {
+ downstream_failure(addr_, nullptr);
+ return -1;
+ }
+ if (state_ == Http2SessionState::RESOLVING_NAME) {
+ return 0;
+ }
+ raddr_ = resolved_addr_.get();
+ } else {
+ raddr_ = &addr_->addr;
+ }
+ }
+
+ if (state_ == Http2SessionState::RESOLVING_NAME) {
+ if (dns_query_->status == DNSResolverStatus::ERROR) {
+ downstream_failure(addr_, nullptr);
+ return -1;
+ }
+ assert(dns_query_->status == DNSResolverStatus::OK);
+ state_ = Http2SessionState::DISCONNECTED;
+ dns_query_.reset();
+ raddr_ = resolved_addr_.get();
+ }
+
+ // If state_ == Http2SessionState::PROXY_CONNECTED, we have
+ // connected to the proxy using conn_.fd and tunnel has been
+ // established.
+ if (state_ == Http2SessionState::DISCONNECTED) {
+ assert(conn_.fd == -1);
+
+ conn_.fd = util::create_nonblock_socket(raddr_->su.storage.ss_family);
+ if (conn_.fd == -1) {
+ auto error = errno;
+ SSLOG(WARN, this)
+ << "socket() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+
+ worker_blocker->on_failure();
+ return -1;
+ }
+
+ worker_blocker->on_success();
+
+ rv = connect(conn_.fd,
+ // TODO maybe not thread-safe?
+ const_cast<sockaddr *>(&raddr_->su.sa), raddr_->len);
+ if (rv != 0 && errno != EINPROGRESS) {
+ auto error = errno;
+ SSLOG(WARN, this)
+ << "connect() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+
+ downstream_failure(addr_, raddr_);
+ return -1;
+ }
+
+ ev_io_set(&conn_.rev, conn_.fd, EV_READ);
+ ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
+ }
+
+ conn_.prepare_client_handshake();
+ } else {
+ if (state_ == Http2SessionState::DISCONNECTED) {
+ // Without TLS and proxy.
+ if (addr_->dns) {
+ rv = resolve_name();
+ if (rv != 0) {
+ downstream_failure(addr_, nullptr);
+ return -1;
+ }
+ if (state_ == Http2SessionState::RESOLVING_NAME) {
+ return 0;
+ }
+ raddr_ = resolved_addr_.get();
+ } else {
+ raddr_ = &addr_->addr;
+ }
+ }
+
+ if (state_ == Http2SessionState::RESOLVING_NAME) {
+ if (dns_query_->status == DNSResolverStatus::ERROR) {
+ downstream_failure(addr_, nullptr);
+ return -1;
+ }
+ assert(dns_query_->status == DNSResolverStatus::OK);
+ state_ = Http2SessionState::DISCONNECTED;
+ dns_query_.reset();
+ raddr_ = resolved_addr_.get();
+ }
+
+ if (state_ == Http2SessionState::DISCONNECTED) {
+ // Without TLS and proxy.
+ assert(conn_.fd == -1);
+
+ conn_.fd = util::create_nonblock_socket(raddr_->su.storage.ss_family);
+
+ if (conn_.fd == -1) {
+ auto error = errno;
+ SSLOG(WARN, this)
+ << "socket() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+
+ worker_blocker->on_failure();
+ return -1;
+ }
+
+ worker_blocker->on_success();
+
+ rv = connect(conn_.fd, const_cast<sockaddr *>(&raddr_->su.sa),
+ raddr_->len);
+ if (rv != 0 && errno != EINPROGRESS) {
+ auto error = errno;
+ SSLOG(WARN, this)
+ << "connect() failed; addr=" << util::to_numeric_addr(raddr_)
+ << ", errno=" << error;
+
+ downstream_failure(addr_, raddr_);
+ return -1;
+ }
+
+ ev_io_set(&conn_.rev, conn_.fd, EV_READ);
+ ev_io_set(&conn_.wev, conn_.fd, EV_WRITE);
+ }
+ }
+
+ // We have been already connected when no TLS and proxy is used.
+ if (state_ == Http2SessionState::PROXY_CONNECTED) {
+ on_read_ = &Http2Session::read_noop;
+ on_write_ = &Http2Session::write_noop;
+
+ return connected();
+ }
+
+ write_ = &Http2Session::connected;
+
+ state_ = Http2SessionState::CONNECTING;
+ conn_.wlimit.startw();
+
+ conn_.wt.repeat = downstreamconf.timeout.connect;
+ ev_timer_again(conn_.loop, &conn_.wt);
+
+ return 0;
+ }
+
+ // Unreachable
+ assert(0);
+
+ return 0;
+}
+
+namespace {
+int htp_hdrs_completecb(llhttp_t *htp) {
+ auto http2session = static_cast<Http2Session *>(htp->data);
+
+ // We only read HTTP header part. If tunneling succeeds, response
+ // body is a different protocol (HTTP/2 in this case), we don't read
+ // them here.
+
+ // We just check status code here
+ if (htp->status_code / 100 == 2) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "Tunneling success";
+ }
+ http2session->set_state(Http2SessionState::PROXY_CONNECTED);
+
+ return HPE_PAUSED;
+ }
+
+ SSLOG(WARN, http2session) << "Tunneling failed: " << htp->status_code;
+ http2session->set_state(Http2SessionState::PROXY_FAILED);
+
+ return HPE_PAUSED;
+}
+} // namespace
+
+int Http2Session::downstream_read_proxy(const uint8_t *data, size_t datalen) {
+ auto htperr = llhttp_execute(proxy_htp_.get(),
+ reinterpret_cast<const char *>(data), datalen);
+ if (htperr == HPE_PAUSED) {
+ switch (state_) {
+ case Http2SessionState::PROXY_CONNECTED:
+ // Initiate SSL/TLS handshake through established tunnel.
+ if (initiate_connection() != 0) {
+ return -1;
+ }
+ return 0;
+ case Http2SessionState::PROXY_FAILED:
+ return -1;
+ default:
+ break;
+ }
+ // should not be here
+ assert(0);
+ }
+
+ if (htperr != HPE_OK) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http2Session::downstream_connect_proxy() {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Connected to the proxy";
+ }
+
+ std::string req = "CONNECT ";
+ req.append(addr_->hostport.c_str(), addr_->hostport.size());
+ if (addr_->port == 80 || addr_->port == 443) {
+ req += ':';
+ req += util::utos(addr_->port);
+ }
+ req += " HTTP/1.1\r\nHost: ";
+ req += addr_->host;
+ req += "\r\n";
+ const auto &proxy = get_config()->downstream_http_proxy;
+ if (!proxy.userinfo.empty()) {
+ req += "Proxy-Authorization: Basic ";
+ req += base64::encode(std::begin(proxy.userinfo), std::end(proxy.userinfo));
+ req += "\r\n";
+ }
+ req += "\r\n";
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "HTTP proxy request headers\n" << req;
+ }
+ wb_.append(req);
+
+ on_write_ = &Http2Session::write_noop;
+
+ signal_write();
+ return 0;
+}
+
+void Http2Session::add_downstream_connection(Http2DownstreamConnection *dconn) {
+ dconns_.append(dconn);
+ ++addr_->num_dconn;
+}
+
+void Http2Session::remove_downstream_connection(
+ Http2DownstreamConnection *dconn) {
+ --addr_->num_dconn;
+ dconns_.remove(dconn);
+ dconn->detach_stream_data();
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Remove downstream";
+ }
+
+ if (freelist_zone_ == FreelistZone::NONE && !max_concurrency_reached()) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_
+ << ", freelist.size="
+ << addr_->http2_extra_freelist.size();
+ }
+
+ add_to_extra_freelist();
+ }
+}
+
+void Http2Session::remove_stream_data(StreamData *sd) {
+ streams_.remove(sd);
+ if (sd->dconn) {
+ sd->dconn->detach_stream_data();
+ }
+ delete sd;
+}
+
+int Http2Session::submit_request(Http2DownstreamConnection *dconn,
+ const nghttp2_nv *nva, size_t nvlen,
+ const nghttp2_data_provider *data_prd) {
+ assert(state_ == Http2SessionState::CONNECTED);
+ auto sd = std::make_unique<StreamData>();
+ sd->dlnext = sd->dlprev = nullptr;
+ // TODO Specify nullptr to pri_spec for now
+ auto stream_id =
+ nghttp2_submit_request(session_, nullptr, nva, nvlen, data_prd, sd.get());
+ if (stream_id < 0) {
+ SSLOG(FATAL, this) << "nghttp2_submit_request() failed: "
+ << nghttp2_strerror(stream_id);
+ return -1;
+ }
+
+ dconn->attach_stream_data(sd.get());
+ dconn->get_downstream()->set_downstream_stream_id(stream_id);
+ streams_.append(sd.release());
+
+ return 0;
+}
+
+int Http2Session::submit_rst_stream(int32_t stream_id, uint32_t error_code) {
+ assert(state_ == Http2SessionState::CONNECTED);
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "RST_STREAM stream_id=" << stream_id
+ << " with error_code=" << error_code;
+ }
+ int rv = nghttp2_submit_rst_stream(session_, NGHTTP2_FLAG_NONE, stream_id,
+ error_code);
+ if (rv != 0) {
+ SSLOG(FATAL, this) << "nghttp2_submit_rst_stream() failed: "
+ << nghttp2_strerror(rv);
+ return -1;
+ }
+ return 0;
+}
+
+nghttp2_session *Http2Session::get_session() const { return session_; }
+
+int Http2Session::resume_data(Http2DownstreamConnection *dconn) {
+ assert(state_ == Http2SessionState::CONNECTED);
+ auto downstream = dconn->get_downstream();
+ int rv = nghttp2_session_resume_data(session_,
+ downstream->get_downstream_stream_id());
+ switch (rv) {
+ case 0:
+ case NGHTTP2_ERR_INVALID_ARGUMENT:
+ return 0;
+ default:
+ SSLOG(FATAL, this) << "nghttp2_resume_session() failed: "
+ << nghttp2_strerror(rv);
+ return -1;
+ }
+}
+
+namespace {
+void call_downstream_readcb(Http2Session *http2session,
+ Downstream *downstream) {
+ auto upstream = downstream->get_upstream();
+ if (!upstream) {
+ return;
+ }
+ if (upstream->downstream_read(downstream->get_downstream_connection()) != 0) {
+ delete upstream->get_client_handler();
+ }
+}
+} // namespace
+
+namespace {
+int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
+ uint32_t error_code, void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session)
+ << "Stream stream_id=" << stream_id
+ << " is being closed with error code " << error_code;
+ }
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, stream_id));
+ if (sd == 0) {
+ // We might get this close callback when pushed streams are
+ // closed.
+ return 0;
+ }
+ auto dconn = sd->dconn;
+ if (dconn) {
+ auto downstream = dconn->get_downstream();
+ auto upstream = downstream->get_upstream();
+
+ if (downstream->get_downstream_stream_id() % 2 == 0 &&
+ downstream->get_request_state() == DownstreamState::INITIAL) {
+ // Downstream is canceled in backend before it is submitted in
+ // frontend session.
+
+ // This will avoid to send RST_STREAM to backend
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ upstream->cancel_premature_downstream(downstream);
+ } else {
+ if (downstream->get_upgraded() && downstream->get_response_state() ==
+ DownstreamState::HEADER_COMPLETE) {
+ // For tunneled connection, we have to submit RST_STREAM to
+ // upstream *after* whole response body is sent. We just set
+ // MSG_COMPLETE here. Upstream will take care of that.
+ downstream->get_upstream()->on_downstream_body_complete(downstream);
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+ } else if (error_code == NGHTTP2_NO_ERROR) {
+ switch (downstream->get_response_state()) {
+ case DownstreamState::MSG_COMPLETE:
+ case DownstreamState::MSG_BAD_HEADER:
+ break;
+ default:
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+ } else if (downstream->get_response_state() !=
+ DownstreamState::MSG_BAD_HEADER) {
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+ if (downstream->get_response_state() == DownstreamState::MSG_RESET &&
+ downstream->get_response_rst_stream_error_code() ==
+ NGHTTP2_NO_ERROR) {
+ downstream->set_response_rst_stream_error_code(error_code);
+ }
+ call_downstream_readcb(http2session, downstream);
+ }
+ // dconn may be deleted
+ }
+ // The life time of StreamData ends here
+ http2session->remove_stream_data(sd);
+ return 0;
+}
+} // namespace
+
+void Http2Session::start_settings_timer() {
+ auto &downstreamconf = get_config()->http2.downstream;
+
+ ev_timer_set(&settings_timer_, downstreamconf.timeout.settings, 0.);
+ ev_timer_start(conn_.loop, &settings_timer_);
+}
+
+void Http2Session::stop_settings_timer() {
+ ev_timer_stop(conn_.loop, &settings_timer_);
+}
+
+namespace {
+int on_header_callback2(nghttp2_session *session, const nghttp2_frame *frame,
+ nghttp2_rcbuf *name, nghttp2_rcbuf *value,
+ uint8_t flags, void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ return 0;
+ }
+ auto downstream = sd->dconn->get_downstream();
+
+ auto namebuf = nghttp2_rcbuf_get_buf(name);
+ auto valuebuf = nghttp2_rcbuf_get_buf(value);
+
+ auto &resp = downstream->response();
+ auto &httpconf = get_config()->http;
+
+ switch (frame->hd.type) {
+ case NGHTTP2_HEADERS: {
+ auto trailer = frame->headers.cat == NGHTTP2_HCAT_HEADERS &&
+ !downstream->get_expect_final_response();
+
+ if (resp.fs.buffer_size() + namebuf.len + valuebuf.len >
+ httpconf.response_header_field_buffer ||
+ resp.fs.num_fields() >= httpconf.max_response_header_fields) {
+ if (LOG_ENABLED(INFO)) {
+ DLOG(INFO, downstream)
+ << "Too large or many header field size="
+ << resp.fs.buffer_size() + namebuf.len + valuebuf.len
+ << ", num=" << resp.fs.num_fields() + 1;
+ }
+
+ if (trailer) {
+ // We don't care trailer part exceeds header size limit; just
+ // discard it.
+ return 0;
+ }
+
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ auto token = http2::lookup_token(namebuf.base, namebuf.len);
+ auto no_index = flags & NGHTTP2_NV_FLAG_NO_INDEX;
+
+ downstream->add_rcbuf(name);
+ downstream->add_rcbuf(value);
+
+ if (trailer) {
+ // just store header fields for trailer part
+ resp.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
+ StringRef{valuebuf.base, valuebuf.len},
+ no_index, token);
+ return 0;
+ }
+
+ resp.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
+ StringRef{valuebuf.base, valuebuf.len}, no_index,
+ token);
+ return 0;
+ }
+ case NGHTTP2_PUSH_PROMISE: {
+ auto promised_stream_id = frame->push_promise.promised_stream_id;
+ auto promised_sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, promised_stream_id));
+ if (!promised_sd || !promised_sd->dconn) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ auto promised_downstream = promised_sd->dconn->get_downstream();
+
+ auto namebuf = nghttp2_rcbuf_get_buf(name);
+ auto valuebuf = nghttp2_rcbuf_get_buf(value);
+
+ assert(promised_downstream);
+
+ auto &promised_req = promised_downstream->request();
+
+ // We use request header limit for PUSH_PROMISE
+ if (promised_req.fs.buffer_size() + namebuf.len + valuebuf.len >
+ httpconf.request_header_field_buffer ||
+ promised_req.fs.num_fields() >= httpconf.max_request_header_fields) {
+ if (LOG_ENABLED(INFO)) {
+ DLOG(INFO, downstream)
+ << "Too large or many header field size="
+ << promised_req.fs.buffer_size() + namebuf.len + valuebuf.len
+ << ", num=" << promised_req.fs.num_fields() + 1;
+ }
+
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ promised_downstream->add_rcbuf(name);
+ promised_downstream->add_rcbuf(value);
+
+ auto token = http2::lookup_token(namebuf.base, namebuf.len);
+ promised_req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
+ StringRef{valuebuf.base, valuebuf.len},
+ flags & NGHTTP2_NV_FLAG_NO_INDEX, token);
+
+ return 0;
+ }
+ }
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_invalid_header_callback2(nghttp2_session *session,
+ const nghttp2_frame *frame, nghttp2_rcbuf *name,
+ nghttp2_rcbuf *value, uint8_t flags,
+ void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ return 0;
+ }
+
+ int32_t stream_id;
+
+ if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
+ stream_id = frame->push_promise.promised_stream_id;
+ } else {
+ stream_id = frame->hd.stream_id;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ auto namebuf = nghttp2_rcbuf_get_buf(name);
+ auto valuebuf = nghttp2_rcbuf_get_buf(value);
+
+ SSLOG(INFO, http2session)
+ << "Invalid header field for stream_id=" << stream_id
+ << " in frame type=" << static_cast<uint32_t>(frame->hd.type)
+ << ": name=[" << StringRef{namebuf.base, namebuf.len} << "], value=["
+ << StringRef{valuebuf.base, valuebuf.len} << "]";
+ }
+
+ http2session->submit_rst_stream(stream_id, NGHTTP2_PROTOCOL_ERROR);
+
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+}
+} // namespace
+
+namespace {
+int on_begin_headers_callback(nghttp2_session *session,
+ const nghttp2_frame *frame, void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+
+ switch (frame->hd.type) {
+ case NGHTTP2_HEADERS: {
+ if (frame->headers.cat != NGHTTP2_HCAT_RESPONSE &&
+ frame->headers.cat != NGHTTP2_HCAT_PUSH_RESPONSE) {
+ return 0;
+ }
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ http2session->submit_rst_stream(frame->hd.stream_id,
+ NGHTTP2_INTERNAL_ERROR);
+ return 0;
+ }
+ return 0;
+ }
+ case NGHTTP2_PUSH_PROMISE: {
+ auto promised_stream_id = frame->push_promise.promised_stream_id;
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ auto downstream = sd->dconn->get_downstream();
+
+ assert(downstream);
+ assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
+
+ if (http2session->handle_downstream_push_promise(downstream,
+ promised_stream_id) != 0) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ return 0;
+ }
+ }
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_response_headers(Http2Session *http2session, Downstream *downstream,
+ nghttp2_session *session, const nghttp2_frame *frame) {
+ int rv;
+
+ auto upstream = downstream->get_upstream();
+ auto handler = upstream->get_client_handler();
+ const auto &req = downstream->request();
+ auto &resp = downstream->response();
+
+ auto &nva = resp.fs.headers();
+
+ auto config = get_config();
+ auto &loggingconf = config->logging;
+
+ downstream->set_expect_final_response(false);
+
+ auto status = resp.fs.header(http2::HD__STATUS);
+ // libnghttp2 guarantees this exists and can be parsed
+ assert(status);
+ auto status_code = http2::parse_http_status_code(status->value);
+
+ resp.http_status = status_code;
+ resp.http_major = 2;
+ resp.http_minor = 0;
+
+ downstream->set_downstream_addr_group(
+ http2session->get_downstream_addr_group());
+ downstream->set_addr(http2session->get_addr());
+
+ if (LOG_ENABLED(INFO)) {
+ std::stringstream ss;
+ for (auto &nv : nva) {
+ ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
+ }
+ SSLOG(INFO, http2session)
+ << "HTTP response headers. stream_id=" << frame->hd.stream_id << "\n"
+ << ss.str();
+ }
+
+ if (downstream->get_non_final_response()) {
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "This is non-final response.";
+ }
+
+ downstream->set_expect_final_response(true);
+ rv = upstream->on_downstream_header_complete(downstream);
+
+ // Now Dowstream's response headers are erased.
+
+ if (rv != 0) {
+ http2session->submit_rst_stream(frame->hd.stream_id,
+ NGHTTP2_PROTOCOL_ERROR);
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+
+ return 0;
+ }
+
+ downstream->set_response_state(DownstreamState::HEADER_COMPLETE);
+ downstream->check_upgrade_fulfilled_http2();
+
+ if (downstream->get_upgraded()) {
+ resp.connection_close = true;
+ // On upgrade success, both ends can send data
+ if (upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0) != 0) {
+ // If resume_read fails, just drop connection. Not ideal.
+ delete handler;
+ return -1;
+ }
+ downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session)
+ << "HTTP upgrade success. stream_id=" << frame->hd.stream_id;
+ }
+ } else {
+ auto content_length = resp.fs.header(http2::HD_CONTENT_LENGTH);
+ if (content_length) {
+ // libnghttp2 guarantees this can be parsed
+ resp.fs.content_length = util::parse_uint(content_length->value);
+ }
+
+ if (resp.fs.content_length == -1 && downstream->expect_response_body()) {
+ // Here we have response body but Content-Length is not known in
+ // advance.
+ if (req.http_major <= 0 || (req.http_major == 1 && req.http_minor == 0)) {
+ // We simply close connection for pre-HTTP/1.1 in this case.
+ resp.connection_close = true;
+ } else {
+ // Otherwise, use chunked encoding to keep upstream connection
+ // open. In HTTP2, we are supposed not to receive
+ // transfer-encoding.
+ resp.fs.add_header_token(StringRef::from_lit("transfer-encoding"),
+ StringRef::from_lit("chunked"), false,
+ http2::HD_TRANSFER_ENCODING);
+ downstream->set_chunked_response(true);
+ }
+ }
+ }
+
+ if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+ resp.headers_only = true;
+ }
+
+ if (loggingconf.access.write_early && downstream->accesslog_ready()) {
+ handler->write_accesslog(downstream);
+ downstream->set_accesslog_written(true);
+ }
+
+ rv = upstream->on_downstream_header_complete(downstream);
+ if (rv != 0) {
+ // Handling early return (in other words, response was hijacked by
+ // mruby scripting).
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ http2session->submit_rst_stream(frame->hd.stream_id, NGHTTP2_CANCEL);
+ } else {
+ http2session->submit_rst_stream(frame->hd.stream_id,
+ NGHTTP2_INTERNAL_ERROR);
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+ }
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
+ void *user_data) {
+ int rv;
+ auto http2session = static_cast<Http2Session *>(user_data);
+
+ switch (frame->hd.type) {
+ case NGHTTP2_DATA: {
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ return 0;
+ }
+ auto downstream = sd->dconn->get_downstream();
+ auto upstream = downstream->get_upstream();
+ rv = upstream->on_downstream_body(downstream, nullptr, 0, true);
+ if (rv != 0) {
+ http2session->submit_rst_stream(frame->hd.stream_id,
+ NGHTTP2_INTERNAL_ERROR);
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+
+ } else if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+
+ downstream->disable_downstream_rtimer();
+
+ if (downstream->get_response_state() ==
+ DownstreamState::HEADER_COMPLETE) {
+
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+
+ rv = upstream->on_downstream_body_complete(downstream);
+
+ if (rv != 0) {
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+ }
+ }
+
+ call_downstream_readcb(http2session, downstream);
+ return 0;
+ }
+ case NGHTTP2_HEADERS: {
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ return 0;
+ }
+ auto downstream = sd->dconn->get_downstream();
+
+ if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE ||
+ frame->headers.cat == NGHTTP2_HCAT_PUSH_RESPONSE) {
+ rv = on_response_headers(http2session, downstream, session, frame);
+
+ if (rv != 0) {
+ return 0;
+ }
+ } else if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) {
+ if (downstream->get_expect_final_response()) {
+ rv = on_response_headers(http2session, downstream, session, frame);
+
+ if (rv != 0) {
+ return 0;
+ }
+ }
+ }
+
+ if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+ downstream->disable_downstream_rtimer();
+
+ if (downstream->get_response_state() ==
+ DownstreamState::HEADER_COMPLETE) {
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+
+ auto upstream = downstream->get_upstream();
+
+ rv = upstream->on_downstream_body_complete(downstream);
+
+ if (rv != 0) {
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+ }
+ } else {
+ downstream->reset_downstream_rtimer();
+ }
+
+ // This may delete downstream
+ call_downstream_readcb(http2session, downstream);
+
+ return 0;
+ }
+ case NGHTTP2_RST_STREAM: {
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (sd && sd->dconn) {
+ auto downstream = sd->dconn->get_downstream();
+ downstream->set_response_rst_stream_error_code(
+ frame->rst_stream.error_code);
+ call_downstream_readcb(http2session, downstream);
+ }
+ return 0;
+ }
+ case NGHTTP2_SETTINGS: {
+ if ((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
+ http2session->on_settings_received(frame);
+ return 0;
+ }
+
+ http2session->stop_settings_timer();
+
+ auto addr = http2session->get_addr();
+ auto &connect_blocker = addr->connect_blocker;
+
+ connect_blocker->on_success();
+
+ return 0;
+ }
+ case NGHTTP2_PING:
+ if (frame->hd.flags & NGHTTP2_FLAG_ACK) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "PING ACK received";
+ }
+ http2session->connection_alive();
+ }
+ return 0;
+ case NGHTTP2_PUSH_PROMISE: {
+ auto promised_stream_id = frame->push_promise.promised_stream_id;
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session)
+ << "Received downstream PUSH_PROMISE stream_id="
+ << frame->hd.stream_id
+ << ", promised_stream_id=" << promised_stream_id;
+ }
+
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd || !sd->dconn) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return 0;
+ }
+
+ auto downstream = sd->dconn->get_downstream();
+
+ assert(downstream);
+ assert(downstream->get_downstream_stream_id() == frame->hd.stream_id);
+
+ auto promised_sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, promised_stream_id));
+ if (!promised_sd || !promised_sd->dconn) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return 0;
+ }
+
+ auto promised_downstream = promised_sd->dconn->get_downstream();
+
+ assert(promised_downstream);
+
+ if (http2session->handle_downstream_push_promise_complete(
+ downstream, promised_downstream) != 0) {
+ http2session->submit_rst_stream(promised_stream_id, NGHTTP2_CANCEL);
+ return 0;
+ }
+
+ return 0;
+ }
+ case NGHTTP2_GOAWAY:
+ if (LOG_ENABLED(INFO)) {
+ auto debug_data = util::ascii_dump(frame->goaway.opaque_data,
+ frame->goaway.opaque_data_len);
+
+ SSLOG(INFO, http2session)
+ << "GOAWAY received: last-stream-id=" << frame->goaway.last_stream_id
+ << ", error_code=" << frame->goaway.error_code
+ << ", debug_data=" << debug_data;
+ }
+ return 0;
+ default:
+ return 0;
+ }
+}
+} // namespace
+
+namespace {
+int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
+ int32_t stream_id, const uint8_t *data,
+ size_t len, void *user_data) {
+ int rv;
+ auto http2session = static_cast<Http2Session *>(user_data);
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, stream_id));
+ if (!sd || !sd->dconn) {
+ http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
+
+ if (http2session->consume(stream_id, len) != 0) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+ }
+ auto downstream = sd->dconn->get_downstream();
+ if (!downstream->expect_response_body()) {
+ http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
+
+ if (http2session->consume(stream_id, len) != 0) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+ }
+
+ // We don't want DATA after non-final response, which is illegal in
+ // HTTP.
+ if (downstream->get_non_final_response()) {
+ http2session->submit_rst_stream(stream_id, NGHTTP2_PROTOCOL_ERROR);
+
+ if (http2session->consume(stream_id, len) != 0) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+ }
+
+ downstream->reset_downstream_rtimer();
+
+ auto &resp = downstream->response();
+
+ resp.recv_body_length += len;
+ resp.unconsumed_body_length += len;
+
+ auto upstream = downstream->get_upstream();
+ rv = upstream->on_downstream_body(downstream, data, len, false);
+ if (rv != 0) {
+ http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
+
+ if (http2session->consume(stream_id, len) != 0) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ }
+
+ call_downstream_readcb(http2session, downstream);
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_frame_send_callback(nghttp2_session *session, const nghttp2_frame *frame,
+ void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+
+ if (frame->hd.type == NGHTTP2_DATA || frame->hd.type == NGHTTP2_HEADERS) {
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+
+ if (!sd || !sd->dconn) {
+ return 0;
+ }
+
+ auto downstream = sd->dconn->get_downstream();
+
+ if (frame->hd.type == NGHTTP2_HEADERS &&
+ frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
+ downstream->set_request_header_sent(true);
+ auto src = downstream->get_blocked_request_buf();
+ if (src->rleft()) {
+ auto dest = downstream->get_request_buf();
+ src->remove(*dest);
+ if (http2session->resume_data(sd->dconn) != 0) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ downstream->ensure_downstream_wtimer();
+ }
+ }
+
+ if ((frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == 0) {
+ return 0;
+ }
+
+ downstream->reset_downstream_rtimer();
+
+ return 0;
+ }
+
+ if (frame->hd.type == NGHTTP2_SETTINGS &&
+ (frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
+ http2session->start_settings_timer();
+ }
+ return 0;
+}
+} // namespace
+
+namespace {
+int on_frame_not_send_callback(nghttp2_session *session,
+ const nghttp2_frame *frame, int lib_error_code,
+ void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, http2session) << "Failed to send control frame type="
+ << static_cast<uint32_t>(frame->hd.type)
+ << ", lib_error_code=" << lib_error_code << ": "
+ << nghttp2_strerror(lib_error_code);
+ }
+ if (frame->hd.type != NGHTTP2_HEADERS ||
+ lib_error_code == NGHTTP2_ERR_STREAM_CLOSED ||
+ lib_error_code == NGHTTP2_ERR_STREAM_CLOSING) {
+ return 0;
+ }
+
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+ if (!sd) {
+ return 0;
+ }
+ if (!sd->dconn) {
+ return 0;
+ }
+ auto downstream = sd->dconn->get_downstream();
+
+ if (lib_error_code == NGHTTP2_ERR_START_STREAM_NOT_ALLOWED) {
+ // Migrate to another downstream connection.
+ auto upstream = downstream->get_upstream();
+
+ if (upstream->on_downstream_reset(downstream, false)) {
+ // This should be done for h1 upstream only. Deleting
+ // ClientHandler for h2 upstream may lead to crash.
+ delete upstream->get_client_handler();
+ }
+
+ return 0;
+ }
+
+ // To avoid stream hanging around, flag DownstreamState::MSG_RESET.
+ downstream->set_response_state(DownstreamState::MSG_RESET);
+ call_downstream_readcb(http2session, downstream);
+
+ return 0;
+}
+} // namespace
+
+namespace {
+constexpr auto PADDING = std::array<uint8_t, 256>{};
+} // namespace
+
+namespace {
+int send_data_callback(nghttp2_session *session, nghttp2_frame *frame,
+ const uint8_t *framehd, size_t length,
+ nghttp2_data_source *source, void *user_data) {
+ auto http2session = static_cast<Http2Session *>(user_data);
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
+
+ if (sd == nullptr) {
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ auto dconn = sd->dconn;
+ auto downstream = dconn->get_downstream();
+ auto input = downstream->get_request_buf();
+ auto wb = http2session->get_request_buf();
+
+ size_t padlen = 0;
+
+ wb->append(framehd, 9);
+ if (frame->data.padlen > 0) {
+ padlen = frame->data.padlen - 1;
+ wb->append(static_cast<uint8_t>(padlen));
+ }
+
+ input->remove(*wb, length);
+
+ wb->append(PADDING.data(), padlen);
+
+ if (input->rleft() == 0) {
+ downstream->disable_downstream_wtimer();
+ } else {
+ downstream->reset_downstream_wtimer();
+ }
+
+ if (length > 0) {
+ // This is important because it will handle flow control
+ // stuff.
+ if (downstream->get_upstream()->resume_read(SHRPX_NO_BUFFER, downstream,
+ length) != 0) {
+ // In this case, downstream may be deleted.
+ return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+ }
+
+ // Here sd->dconn could be nullptr, because
+ // Upstream::resume_read() may delete downstream which will delete
+ // dconn. Is this still really true?
+ }
+
+ return 0;
+}
+} // namespace
+
+nghttp2_session_callbacks *create_http2_downstream_callbacks() {
+ int rv;
+ nghttp2_session_callbacks *callbacks;
+
+ rv = nghttp2_session_callbacks_new(&callbacks);
+
+ if (rv != 0) {
+ return nullptr;
+ }
+
+ nghttp2_session_callbacks_set_on_stream_close_callback(
+ callbacks, on_stream_close_callback);
+
+ nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks,
+ on_frame_recv_callback);
+
+ nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
+ callbacks, on_data_chunk_recv_callback);
+
+ nghttp2_session_callbacks_set_on_frame_send_callback(callbacks,
+ on_frame_send_callback);
+
+ nghttp2_session_callbacks_set_on_frame_not_send_callback(
+ callbacks, on_frame_not_send_callback);
+
+ nghttp2_session_callbacks_set_on_header_callback2(callbacks,
+ on_header_callback2);
+
+ nghttp2_session_callbacks_set_on_invalid_header_callback2(
+ callbacks, on_invalid_header_callback2);
+
+ nghttp2_session_callbacks_set_on_begin_headers_callback(
+ callbacks, on_begin_headers_callback);
+
+ nghttp2_session_callbacks_set_send_data_callback(callbacks,
+ send_data_callback);
+
+ if (get_config()->padding) {
+ nghttp2_session_callbacks_set_select_padding_callback(
+ callbacks, http::select_padding_callback);
+ }
+
+ return callbacks;
+}
+
+int Http2Session::connection_made() {
+ int rv;
+
+ state_ = Http2SessionState::CONNECTED;
+
+ on_write_ = &Http2Session::downstream_write;
+ on_read_ = &Http2Session::downstream_read;
+
+ if (addr_->tls) {
+ const unsigned char *next_proto = nullptr;
+ unsigned int next_proto_len = 0;
+
+#ifndef OPENSSL_NO_NEXTPROTONEG
+ SSL_get0_next_proto_negotiated(conn_.tls.ssl, &next_proto, &next_proto_len);
+#endif // !OPENSSL_NO_NEXTPROTONEG
+#if OPENSSL_VERSION_NUMBER >= 0x10002000L
+ if (!next_proto) {
+ SSL_get0_alpn_selected(conn_.tls.ssl, &next_proto, &next_proto_len);
+ }
+#endif // OPENSSL_VERSION_NUMBER >= 0x10002000L
+
+ if (!next_proto) {
+ downstream_failure(addr_, raddr_);
+ return -1;
+ }
+
+ auto proto = StringRef{next_proto, next_proto_len};
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Negotiated next protocol: " << proto;
+ }
+ if (!util::check_h2_is_selected(proto)) {
+ downstream_failure(addr_, raddr_);
+ return -1;
+ }
+ }
+
+ auto config = get_config();
+ auto &http2conf = config->http2;
+
+ rv = nghttp2_session_client_new2(&session_, http2conf.downstream.callbacks,
+ this, http2conf.downstream.option);
+
+ if (rv != 0) {
+ return -1;
+ }
+
+ std::array<nghttp2_settings_entry, 5> entry;
+ size_t nentry = 3;
+ entry[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
+ entry[0].value = http2conf.downstream.max_concurrent_streams;
+
+ entry[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+ entry[1].value = http2conf.downstream.window_size;
+
+ entry[2].settings_id = NGHTTP2_SETTINGS_NO_RFC7540_PRIORITIES;
+ entry[2].value = 1;
+
+ if (http2conf.no_server_push || config->http2_proxy) {
+ entry[nentry].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+ entry[nentry].value = 0;
+ ++nentry;
+ }
+
+ if (http2conf.downstream.decoder_dynamic_table_size !=
+ NGHTTP2_DEFAULT_HEADER_TABLE_SIZE) {
+ entry[nentry].settings_id = NGHTTP2_SETTINGS_HEADER_TABLE_SIZE;
+ entry[nentry].value = http2conf.downstream.decoder_dynamic_table_size;
+ ++nentry;
+ }
+
+ rv = nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, entry.data(),
+ nentry);
+ if (rv != 0) {
+ return -1;
+ }
+
+ rv = nghttp2_session_set_local_window_size(
+ session_, NGHTTP2_FLAG_NONE, 0,
+ http2conf.downstream.connection_window_size);
+ if (rv != 0) {
+ return -1;
+ }
+
+ reset_connection_check_timer(CONNCHK_TIMEOUT);
+
+ submit_pending_requests();
+
+ signal_write();
+ return 0;
+}
+
+int Http2Session::do_read() { return read_(*this); }
+int Http2Session::do_write() { return write_(*this); }
+
+int Http2Session::on_read(const uint8_t *data, size_t datalen) {
+ return on_read_(*this, data, datalen);
+}
+
+int Http2Session::on_write() { return on_write_(*this); }
+
+int Http2Session::downstream_read(const uint8_t *data, size_t datalen) {
+ ssize_t rv;
+
+ rv = nghttp2_session_mem_recv(session_, data, datalen);
+ if (rv < 0) {
+ SSLOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
+ << nghttp2_strerror(rv);
+ return -1;
+ }
+
+ if (nghttp2_session_want_read(session_) == 0 &&
+ nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "No more read/write for this HTTP2 session";
+ }
+ return -1;
+ }
+
+ signal_write();
+ return 0;
+}
+
+int Http2Session::downstream_write() {
+ for (;;) {
+ const uint8_t *data;
+ auto datalen = nghttp2_session_mem_send(session_, &data);
+ if (datalen < 0) {
+ SSLOG(ERROR, this) << "nghttp2_session_mem_send() returned error: "
+ << nghttp2_strerror(datalen);
+ return -1;
+ }
+ if (datalen == 0) {
+ break;
+ }
+ wb_.append(data, datalen);
+
+ if (wb_.rleft() >= MAX_BUFFER_SIZE) {
+ break;
+ }
+ }
+
+ if (nghttp2_session_want_read(session_) == 0 &&
+ nghttp2_session_want_write(session_) == 0 && wb_.rleft() == 0) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "No more read/write for this session";
+ }
+ return -1;
+ }
+
+ return 0;
+}
+
+void Http2Session::signal_write() {
+ switch (state_) {
+ case Http2SessionState::DISCONNECTED:
+ if (!ev_is_active(&initiate_connection_timer_)) {
+ if (LOG_ENABLED(INFO)) {
+ LOG(INFO) << "Start connecting to backend server";
+ }
+ // Since the timer is set to 0., these will feed 2 events. We
+ // will stop the timer in the initiate_connection_timer_ to void
+ // 2nd event.
+ ev_timer_start(conn_.loop, &initiate_connection_timer_);
+ ev_feed_event(conn_.loop, &initiate_connection_timer_, 0);
+ }
+ break;
+ case Http2SessionState::CONNECTED:
+ conn_.wlimit.startw();
+ break;
+ default:
+ break;
+ }
+}
+
+struct ev_loop *Http2Session::get_loop() const {
+ return conn_.loop;
+}
+
+ev_io *Http2Session::get_wev() { return &conn_.wev; }
+
+Http2SessionState Http2Session::get_state() const { return state_; }
+
+void Http2Session::set_state(Http2SessionState state) { state_ = state; }
+
+int Http2Session::terminate_session(uint32_t error_code) {
+ int rv;
+ rv = nghttp2_session_terminate_session(session_, error_code);
+ if (rv != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+SSL *Http2Session::get_ssl() const { return conn_.tls.ssl; }
+
+int Http2Session::consume(int32_t stream_id, size_t len) {
+ int rv;
+
+ if (!session_) {
+ return 0;
+ }
+
+ rv = nghttp2_session_consume(session_, stream_id, len);
+
+ if (rv != 0) {
+ SSLOG(WARN, this) << "nghttp2_session_consume() returned error: "
+ << nghttp2_strerror(rv);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+bool Http2Session::can_push_request(const Downstream *downstream) const {
+ auto &req = downstream->request();
+ return state_ == Http2SessionState::CONNECTED &&
+ connection_check_state_ == ConnectionCheck::NONE &&
+ (req.connect_proto == ConnectProto::NONE || settings_recved_);
+}
+
+void Http2Session::start_checking_connection() {
+ if (state_ != Http2SessionState::CONNECTED ||
+ connection_check_state_ != ConnectionCheck::REQUIRED) {
+ return;
+ }
+ connection_check_state_ = ConnectionCheck::STARTED;
+
+ SSLOG(INFO, this) << "Start checking connection";
+ // If connection is down, we may get error when writing data. Issue
+ // ping frame to see whether connection is alive.
+ nghttp2_submit_ping(session_, NGHTTP2_FLAG_NONE, nullptr);
+
+ // set ping timeout and start timer again
+ reset_connection_check_timer(CONNCHK_PING_TIMEOUT);
+
+ signal_write();
+}
+
+void Http2Session::reset_connection_check_timer(ev_tstamp t) {
+ connchk_timer_.repeat = t;
+ ev_timer_again(conn_.loop, &connchk_timer_);
+}
+
+void Http2Session::reset_connection_check_timer_if_not_checking() {
+ if (connection_check_state_ != ConnectionCheck::NONE) {
+ return;
+ }
+
+ reset_connection_check_timer(CONNCHK_TIMEOUT);
+}
+
+void Http2Session::connection_alive() {
+ reset_connection_check_timer(CONNCHK_TIMEOUT);
+
+ if (connection_check_state_ == ConnectionCheck::NONE) {
+ return;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Connection alive";
+ }
+
+ connection_check_state_ = ConnectionCheck::NONE;
+
+ submit_pending_requests();
+}
+
+void Http2Session::submit_pending_requests() {
+ for (auto dconn = dconns_.head; dconn; dconn = dconn->dlnext) {
+ auto downstream = dconn->get_downstream();
+
+ if (!downstream->get_request_pending() ||
+ !downstream->request_submission_ready()) {
+ continue;
+ }
+
+ auto &req = downstream->request();
+ if (req.connect_proto != ConnectProto::NONE && !settings_recved_) {
+ continue;
+ }
+
+ auto upstream = downstream->get_upstream();
+
+ if (dconn->push_request_headers() != 0) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "backend request failed";
+ }
+
+ upstream->on_downstream_abort_request(downstream, 400);
+
+ continue;
+ }
+
+ upstream->resume_read(SHRPX_NO_BUFFER, downstream, 0);
+ }
+}
+
+void Http2Session::set_connection_check_state(ConnectionCheck state) {
+ connection_check_state_ = state;
+}
+
+ConnectionCheck Http2Session::get_connection_check_state() const {
+ return connection_check_state_;
+}
+
+int Http2Session::noop() { return 0; }
+
+int Http2Session::read_noop(const uint8_t *data, size_t datalen) { return 0; }
+
+int Http2Session::write_noop() { return 0; }
+
+int Http2Session::connected() {
+ auto sock_error = util::get_socket_error(conn_.fd);
+ if (sock_error != 0) {
+ SSLOG(WARN, this) << "Backend connect failed; addr="
+ << util::to_numeric_addr(raddr_)
+ << ": errno=" << sock_error;
+
+ downstream_failure(addr_, raddr_);
+
+ return -1;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Connection established";
+ }
+
+ // Reset timeout for write. Previously, we set timeout for connect.
+ conn_.wt.repeat = group_->shared_addr->timeout.write;
+ ev_timer_again(conn_.loop, &conn_.wt);
+
+ conn_.rlimit.startw();
+ conn_.again_rt();
+
+ read_ = &Http2Session::read_clear;
+ write_ = &Http2Session::write_clear;
+
+ if (state_ == Http2SessionState::PROXY_CONNECTING) {
+ return do_write();
+ }
+
+ if (conn_.tls.ssl) {
+ read_ = &Http2Session::tls_handshake;
+ write_ = &Http2Session::tls_handshake;
+
+ return do_write();
+ }
+
+ if (connection_made() != 0) {
+ state_ = Http2SessionState::CONNECT_FAILING;
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http2Session::read_clear() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ std::array<uint8_t, 16_k> buf;
+
+ for (;;) {
+ auto nread = conn_.read_clear(buf.data(), buf.size());
+
+ if (nread == 0) {
+ return write_clear();
+ }
+
+ if (nread < 0) {
+ return nread;
+ }
+
+ if (on_read(buf.data(), nread) != 0) {
+ return -1;
+ }
+ }
+}
+
+int Http2Session::write_clear() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ std::array<struct iovec, MAX_WR_IOVCNT> iov;
+
+ for (;;) {
+ if (wb_.rleft() > 0) {
+ auto iovcnt = wb_.riovec(iov.data(), iov.size());
+ auto nwrite = conn_.writev_clear(iov.data(), iovcnt);
+
+ if (nwrite == 0) {
+ return 0;
+ }
+
+ if (nwrite < 0) {
+ // We may have pending data in receive buffer which may
+ // contain part of response body. So keep reading. Invoke
+ // read event to get read(2) error just in case.
+ ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
+ write_ = &Http2Session::write_void;
+ break;
+ }
+
+ wb_.drain(nwrite);
+ continue;
+ }
+
+ if (on_write() != 0) {
+ return -1;
+ }
+ if (wb_.rleft() == 0) {
+ break;
+ }
+ }
+
+ conn_.wlimit.stopw();
+ ev_timer_stop(conn_.loop, &conn_.wt);
+
+ return 0;
+}
+
+int Http2Session::tls_handshake() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ ERR_clear_error();
+
+ auto rv = conn_.tls_handshake();
+
+ if (rv == SHRPX_ERR_INPROGRESS) {
+ return 0;
+ }
+
+ if (rv < 0) {
+ downstream_failure(addr_, raddr_);
+
+ return rv;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "SSL/TLS handshake completed";
+ }
+
+ if (!get_config()->tls.insecure &&
+ tls::check_cert(conn_.tls.ssl, addr_, raddr_) != 0) {
+ downstream_failure(addr_, raddr_);
+
+ return -1;
+ }
+
+ read_ = &Http2Session::read_tls;
+ write_ = &Http2Session::write_tls;
+
+ if (connection_made() != 0) {
+ state_ = Http2SessionState::CONNECT_FAILING;
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http2Session::read_tls() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ std::array<uint8_t, 16_k> buf;
+
+ ERR_clear_error();
+
+ for (;;) {
+ auto nread = conn_.read_tls(buf.data(), buf.size());
+
+ if (nread == 0) {
+ return write_tls();
+ }
+
+ if (nread < 0) {
+ return nread;
+ }
+
+ if (on_read(buf.data(), nread) != 0) {
+ return -1;
+ }
+ }
+}
+
+int Http2Session::write_tls() {
+ conn_.last_read = ev_now(conn_.loop);
+
+ ERR_clear_error();
+
+ struct iovec iov;
+
+ for (;;) {
+ if (wb_.rleft() > 0) {
+ auto iovcnt = wb_.riovec(&iov, 1);
+ if (iovcnt != 1) {
+ assert(0);
+ return -1;
+ }
+ auto nwrite = conn_.write_tls(iov.iov_base, iov.iov_len);
+
+ if (nwrite == 0) {
+ return 0;
+ }
+
+ if (nwrite < 0) {
+ // We may have pending data in receive buffer which may
+ // contain part of response body. So keep reading. Invoke
+ // read event to get read(2) error just in case.
+ ev_feed_event(conn_.loop, &conn_.rev, EV_READ);
+ write_ = &Http2Session::write_void;
+ break;
+ }
+
+ wb_.drain(nwrite);
+
+ continue;
+ }
+
+ if (on_write() != 0) {
+ return -1;
+ }
+ if (wb_.rleft() == 0) {
+ conn_.start_tls_write_idle();
+ break;
+ }
+ }
+
+ conn_.wlimit.stopw();
+ ev_timer_stop(conn_.loop, &conn_.wt);
+
+ return 0;
+}
+
+int Http2Session::write_void() {
+ conn_.wlimit.stopw();
+ return 0;
+}
+
+bool Http2Session::should_hard_fail() const {
+ switch (state_) {
+ case Http2SessionState::PROXY_CONNECTING:
+ case Http2SessionState::PROXY_FAILED:
+ return true;
+ case Http2SessionState::DISCONNECTED: {
+ const auto &proxy = get_config()->downstream_http_proxy;
+ return !proxy.host.empty();
+ }
+ default:
+ return false;
+ }
+}
+
+DownstreamAddr *Http2Session::get_addr() const { return addr_; }
+
+int Http2Session::handle_downstream_push_promise(Downstream *downstream,
+ int32_t promised_stream_id) {
+ auto upstream = downstream->get_upstream();
+ if (!upstream->push_enabled()) {
+ return -1;
+ }
+
+ auto promised_downstream =
+ upstream->on_downstream_push_promise(downstream, promised_stream_id);
+ if (!promised_downstream) {
+ return -1;
+ }
+
+ // Now we have Downstream object for pushed stream.
+ // promised_downstream->get_stream() still returns 0.
+
+ auto handler = upstream->get_client_handler();
+
+ auto promised_dconn = std::make_unique<Http2DownstreamConnection>(this);
+ promised_dconn->set_client_handler(handler);
+
+ auto ptr = promised_dconn.get();
+
+ if (promised_downstream->attach_downstream_connection(
+ std::move(promised_dconn)) != 0) {
+ return -1;
+ }
+
+ auto promised_sd = std::make_unique<StreamData>();
+
+ nghttp2_session_set_stream_user_data(session_, promised_stream_id,
+ promised_sd.get());
+
+ ptr->attach_stream_data(promised_sd.get());
+ streams_.append(promised_sd.release());
+
+ return 0;
+}
+
+int Http2Session::handle_downstream_push_promise_complete(
+ Downstream *downstream, Downstream *promised_downstream) {
+ auto &promised_req = promised_downstream->request();
+
+ auto &promised_balloc = promised_downstream->get_block_allocator();
+
+ auto authority = promised_req.fs.header(http2::HD__AUTHORITY);
+ auto path = promised_req.fs.header(http2::HD__PATH);
+ auto method = promised_req.fs.header(http2::HD__METHOD);
+ auto scheme = promised_req.fs.header(http2::HD__SCHEME);
+
+ if (!authority) {
+ authority = promised_req.fs.header(http2::HD_HOST);
+ }
+
+ auto method_token = http2::lookup_method_token(method->value);
+ if (method_token == -1) {
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Unrecognized method: " << method->value;
+ }
+
+ return -1;
+ }
+
+ // TODO Rewrite authority if we enabled rewrite host. But we
+ // really don't know how to rewrite host. Should we use the same
+ // host in associated stream?
+ if (authority) {
+ promised_req.authority = authority->value;
+ }
+ promised_req.method = method_token;
+ // libnghttp2 ensures that we don't have CONNECT method in
+ // PUSH_PROMISE, and guarantees that :scheme exists.
+ if (scheme) {
+ promised_req.scheme = scheme->value;
+ }
+
+ // For server-wide OPTIONS request, path is empty.
+ if (method_token != HTTP_OPTIONS || path->value != "*") {
+ promised_req.path = http2::rewrite_clean_path(promised_balloc, path->value);
+ }
+
+ promised_downstream->inspect_http2_request();
+
+ auto upstream = promised_downstream->get_upstream();
+
+ promised_downstream->set_request_state(DownstreamState::MSG_COMPLETE);
+ promised_downstream->set_request_header_sent(true);
+
+ if (upstream->on_downstream_push_promise_complete(downstream,
+ promised_downstream) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+size_t Http2Session::get_num_dconns() const { return dconns_.size(); }
+
+bool Http2Session::max_concurrency_reached(size_t extra) const {
+ if (!session_) {
+ return dconns_.size() + extra >= 100;
+ }
+
+ // If session does not allow further requests, it effectively means
+ // that maximum concurrency is reached.
+ return !nghttp2_session_check_request_allowed(session_) ||
+ dconns_.size() + extra >=
+ nghttp2_session_get_remote_settings(
+ session_, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
+}
+
+const std::shared_ptr<DownstreamAddrGroup> &
+Http2Session::get_downstream_addr_group() const {
+ return group_;
+}
+
+void Http2Session::add_to_extra_freelist() {
+ if (freelist_zone_ != FreelistZone::NONE) {
+ return;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Append to http2_extra_freelist, addr=" << addr_
+ << ", freelist.size="
+ << addr_->http2_extra_freelist.size();
+ }
+
+ freelist_zone_ = FreelistZone::EXTRA;
+ addr_->http2_extra_freelist.append(this);
+}
+
+void Http2Session::remove_from_freelist() {
+ switch (freelist_zone_) {
+ case FreelistZone::NONE:
+ return;
+ case FreelistZone::EXTRA:
+ if (LOG_ENABLED(INFO)) {
+ SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_
+ << ", freelist.size="
+ << addr_->http2_extra_freelist.size();
+ }
+ addr_->http2_extra_freelist.remove(this);
+ break;
+ case FreelistZone::GONE:
+ return;
+ }
+
+ freelist_zone_ = FreelistZone::NONE;
+}
+
+void Http2Session::exclude_from_scheduling() {
+ remove_from_freelist();
+ freelist_zone_ = FreelistZone::GONE;
+}
+
+DefaultMemchunks *Http2Session::get_request_buf() { return &wb_; }
+
+void Http2Session::on_timeout() {
+ switch (state_) {
+ case Http2SessionState::PROXY_CONNECTING: {
+ auto worker_blocker = worker_->get_connect_blocker();
+ worker_blocker->on_failure();
+ break;
+ }
+ case Http2SessionState::CONNECTING:
+ SSLOG(WARN, this) << "Connect time out; addr="
+ << util::to_numeric_addr(raddr_);
+
+ downstream_failure(addr_, raddr_);
+ break;
+ default:
+ break;
+ }
+}
+
+void Http2Session::check_retire() {
+ if (!group_->retired) {
+ return;
+ }
+
+ ev_prepare_stop(conn_.loop, &prep_);
+
+ if (!session_) {
+ return;
+ }
+
+ auto last_stream_id = nghttp2_session_get_last_proc_stream_id(session_);
+ nghttp2_submit_goaway(session_, NGHTTP2_FLAG_NONE, last_stream_id,
+ NGHTTP2_NO_ERROR, nullptr, 0);
+
+ signal_write();
+}
+
+const Address *Http2Session::get_raddr() const { return raddr_; }
+
+void Http2Session::on_settings_received(const nghttp2_frame *frame) {
+ // TODO This effectively disallows nghttpx to change its behaviour
+ // based on the 2nd SETTINGS.
+ if (settings_recved_) {
+ return;
+ }
+
+ settings_recved_ = true;
+
+ for (size_t i = 0; i < frame->settings.niv; ++i) {
+ auto &ent = frame->settings.iv[i];
+ if (ent.settings_id == NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL) {
+ allow_connect_proto_ = true;
+ break;
+ }
+ }
+
+ submit_pending_requests();
+}
+
+bool Http2Session::get_allow_connect_proto() const {
+ return allow_connect_proto_;
+}
+
+} // namespace shrpx