summaryrefslogtreecommitdiffstats
path: root/src/shrpx_http2_downstream_connection.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/shrpx_http2_downstream_connection.cc')
-rw-r--r--src/shrpx_http2_downstream_connection.cc621
1 files changed, 621 insertions, 0 deletions
diff --git a/src/shrpx_http2_downstream_connection.cc b/src/shrpx_http2_downstream_connection.cc
new file mode 100644
index 0000000..d27dcc1
--- /dev/null
+++ b/src/shrpx_http2_downstream_connection.cc
@@ -0,0 +1,621 @@
+/*
+ * nghttp2 - HTTP/2 C Library
+ *
+ * Copyright (c) 2012 Tatsuhiro Tsujikawa
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "shrpx_http2_downstream_connection.h"
+
+#ifdef HAVE_UNISTD_H
+# include <unistd.h>
+#endif // HAVE_UNISTD_H
+
+#include "llhttp.h"
+
+#include "shrpx_client_handler.h"
+#include "shrpx_upstream.h"
+#include "shrpx_downstream.h"
+#include "shrpx_config.h"
+#include "shrpx_error.h"
+#include "shrpx_http.h"
+#include "shrpx_http2_session.h"
+#include "shrpx_worker.h"
+#include "shrpx_log.h"
+#include "http2.h"
+#include "util.h"
+#include "ssl_compat.h"
+
+using namespace nghttp2;
+
+namespace shrpx {
+
+Http2DownstreamConnection::Http2DownstreamConnection(Http2Session *http2session)
+ : dlnext(nullptr),
+ dlprev(nullptr),
+ http2session_(http2session),
+ sd_(nullptr) {}
+
+Http2DownstreamConnection::~Http2DownstreamConnection() {
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, this) << "Deleting";
+ }
+ if (downstream_) {
+ downstream_->disable_downstream_rtimer();
+ downstream_->disable_downstream_wtimer();
+
+ uint32_t error_code;
+ if (downstream_->get_request_state() == DownstreamState::STREAM_CLOSED &&
+ downstream_->get_upgraded()) {
+ // For upgraded connection, send NO_ERROR. Should we consider
+ // request states other than DownstreamState::STREAM_CLOSED ?
+ error_code = NGHTTP2_NO_ERROR;
+ } else {
+ error_code = NGHTTP2_INTERNAL_ERROR;
+ }
+
+ if (http2session_->get_state() == Http2SessionState::CONNECTED &&
+ downstream_->get_downstream_stream_id() != -1) {
+ submit_rst_stream(downstream_, error_code);
+
+ auto &resp = downstream_->response();
+
+ http2session_->consume(downstream_->get_downstream_stream_id(),
+ resp.unconsumed_body_length);
+
+ resp.unconsumed_body_length = 0;
+
+ http2session_->signal_write();
+ }
+ }
+ http2session_->remove_downstream_connection(this);
+
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, this) << "Deleted";
+ }
+}
+
+int Http2DownstreamConnection::attach_downstream(Downstream *downstream) {
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, this) << "Attaching to DOWNSTREAM:" << downstream;
+ }
+ http2session_->add_downstream_connection(this);
+ http2session_->signal_write();
+
+ downstream_ = downstream;
+ downstream_->reset_downstream_rtimer();
+
+ auto &req = downstream_->request();
+
+ // HTTP/2 disables HTTP Upgrade.
+ if (req.method != HTTP_CONNECT && req.connect_proto == ConnectProto::NONE) {
+ req.upgrade_request = false;
+ }
+
+ return 0;
+}
+
+void Http2DownstreamConnection::detach_downstream(Downstream *downstream) {
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, this) << "Detaching from DOWNSTREAM:" << downstream;
+ }
+
+ auto &resp = downstream_->response();
+
+ if (downstream_->get_downstream_stream_id() != -1) {
+ if (submit_rst_stream(downstream) == 0) {
+ http2session_->signal_write();
+ }
+
+ http2session_->consume(downstream_->get_downstream_stream_id(),
+ resp.unconsumed_body_length);
+
+ resp.unconsumed_body_length = 0;
+
+ http2session_->signal_write();
+ }
+
+ downstream->disable_downstream_rtimer();
+ downstream->disable_downstream_wtimer();
+ downstream_ = nullptr;
+}
+
+int Http2DownstreamConnection::submit_rst_stream(Downstream *downstream,
+ uint32_t error_code) {
+ int rv = -1;
+ if (http2session_->get_state() == Http2SessionState::CONNECTED &&
+ downstream->get_downstream_stream_id() != -1) {
+ switch (downstream->get_response_state()) {
+ case DownstreamState::MSG_RESET:
+ case DownstreamState::MSG_BAD_HEADER:
+ case DownstreamState::MSG_COMPLETE:
+ break;
+ default:
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, this) << "Submit RST_STREAM for DOWNSTREAM:" << downstream
+ << ", stream_id="
+ << downstream->get_downstream_stream_id()
+ << ", error_code=" << error_code;
+ }
+ rv = http2session_->submit_rst_stream(
+ downstream->get_downstream_stream_id(), error_code);
+ }
+ }
+ return rv;
+}
+
+namespace {
+ssize_t http2_data_read_callback(nghttp2_session *session, int32_t stream_id,
+ uint8_t *buf, size_t length,
+ uint32_t *data_flags,
+ nghttp2_data_source *source, void *user_data) {
+ int rv;
+ auto sd = static_cast<StreamData *>(
+ nghttp2_session_get_stream_user_data(session, stream_id));
+ if (!sd || !sd->dconn) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ auto dconn = sd->dconn;
+ auto downstream = dconn->get_downstream();
+ if (!downstream) {
+ // In this case, RST_STREAM should have been issued. But depending
+ // on the priority, DATA frame may come first.
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ const auto &req = downstream->request();
+ auto input = downstream->get_request_buf();
+
+ auto nread = std::min(input->rleft(), length);
+ auto input_empty = input->rleft() == nread;
+
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
+
+ if (input_empty &&
+ downstream->get_request_state() == DownstreamState::MSG_COMPLETE &&
+ // If connection is upgraded, don't set EOF flag, since HTTP/1
+ // will set MSG_COMPLETE to request state after upgrade response
+ // header is seen.
+ (!req.upgrade_request ||
+ (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE &&
+ !downstream->get_upgraded()))) {
+
+ *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+
+ const auto &trailers = req.fs.trailers();
+ if (!trailers.empty()) {
+ std::vector<nghttp2_nv> nva;
+ nva.reserve(trailers.size());
+ http2::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
+ if (!nva.empty()) {
+ rv = nghttp2_submit_trailer(session, stream_id, nva.data(), nva.size());
+ if (rv != 0) {
+ if (nghttp2_is_fatal(rv)) {
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
+ } else {
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
+ }
+ }
+ }
+ }
+
+ if (nread == 0 && (*data_flags & NGHTTP2_DATA_FLAG_EOF) == 0) {
+ downstream->disable_downstream_wtimer();
+
+ return NGHTTP2_ERR_DEFERRED;
+ }
+
+ return nread;
+}
+} // namespace
+
+int Http2DownstreamConnection::push_request_headers() {
+ int rv;
+ if (!downstream_) {
+ return 0;
+ }
+ if (!http2session_->can_push_request(downstream_)) {
+ // The HTTP2 session to the backend has not been established or
+ // connection is now being checked. This function will be called
+ // again just after it is established.
+ downstream_->set_request_pending(true);
+ http2session_->start_checking_connection();
+ return 0;
+ }
+
+ downstream_->set_request_pending(false);
+
+ const auto &req = downstream_->request();
+
+ if (req.connect_proto != ConnectProto::NONE &&
+ !http2session_->get_allow_connect_proto()) {
+ return -1;
+ }
+
+ auto &balloc = downstream_->get_block_allocator();
+
+ auto config = get_config();
+ auto &httpconf = config->http;
+ auto &http2conf = config->http2;
+
+ auto no_host_rewrite = httpconf.no_host_rewrite || config->http2_proxy ||
+ req.regular_connect_method();
+
+ // http2session_ has already in CONNECTED state, so we can get
+ // addr_idx here.
+ const auto &downstream_hostport = http2session_->get_addr()->hostport;
+
+ // For HTTP/1.0 request, there is no authority in request. In that
+ // case, we use backend server's host nonetheless.
+ auto authority = StringRef(downstream_hostport);
+
+ if (no_host_rewrite && !req.authority.empty()) {
+ authority = req.authority;
+ }
+
+ downstream_->set_request_downstream_host(authority);
+
+ size_t num_cookies = 0;
+ if (!http2conf.no_cookie_crumbling) {
+ num_cookies = downstream_->count_crumble_request_cookie();
+ }
+
+ // 11 means:
+ // 1. :method
+ // 2. :scheme
+ // 3. :path
+ // 4. :authority (or host)
+ // 5. :protocol (optional)
+ // 6. via (optional)
+ // 7. x-forwarded-for (optional)
+ // 8. x-forwarded-proto (optional)
+ // 9. te (optional)
+ // 10. forwarded (optional)
+ // 11. early-data (optional)
+ auto nva = std::vector<nghttp2_nv>();
+ nva.reserve(req.fs.headers().size() + 11 + num_cookies +
+ httpconf.add_request_headers.size());
+
+ if (req.connect_proto == ConnectProto::WEBSOCKET) {
+ nva.push_back(http2::make_nv_ll(":method", "CONNECT"));
+ nva.push_back(http2::make_nv_ll(":protocol", "websocket"));
+ } else {
+ nva.push_back(http2::make_nv_ls_nocopy(
+ ":method", http2::to_method_string(req.method)));
+ }
+
+ if (!req.regular_connect_method()) {
+ assert(!req.scheme.empty());
+
+ auto addr = http2session_->get_addr();
+ assert(addr);
+ // We will handle more protocol scheme upgrade in the future.
+ if (addr->tls && addr->upgrade_scheme && req.scheme == "http") {
+ nva.push_back(http2::make_nv_ll(":scheme", "https"));
+ } else {
+ nva.push_back(http2::make_nv_ls_nocopy(":scheme", req.scheme));
+ }
+
+ if (req.method == HTTP_OPTIONS && req.path.empty()) {
+ nva.push_back(http2::make_nv_ll(":path", "*"));
+ } else {
+ nva.push_back(http2::make_nv_ls_nocopy(":path", req.path));
+ }
+
+ if (!req.no_authority || req.connect_proto != ConnectProto::NONE) {
+ nva.push_back(http2::make_nv_ls_nocopy(":authority", authority));
+ } else {
+ nva.push_back(http2::make_nv_ls_nocopy("host", authority));
+ }
+ } else {
+ nva.push_back(http2::make_nv_ls_nocopy(":authority", authority));
+ }
+
+ auto &fwdconf = httpconf.forwarded;
+ auto &xffconf = httpconf.xff;
+ auto &xfpconf = httpconf.xfp;
+ auto &earlydataconf = httpconf.early_data;
+
+ uint32_t build_flags =
+ (fwdconf.strip_incoming ? http2::HDOP_STRIP_FORWARDED : 0) |
+ (xffconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_FOR : 0) |
+ (xfpconf.strip_incoming ? http2::HDOP_STRIP_X_FORWARDED_PROTO : 0) |
+ (earlydataconf.strip_incoming ? http2::HDOP_STRIP_EARLY_DATA : 0) |
+ http2::HDOP_STRIP_SEC_WEBSOCKET_KEY;
+
+ http2::copy_headers_to_nva_nocopy(nva, req.fs.headers(), build_flags);
+
+ if (!http2conf.no_cookie_crumbling) {
+ downstream_->crumble_request_cookie(nva);
+ }
+
+ auto upstream = downstream_->get_upstream();
+ auto handler = upstream->get_client_handler();
+
+#if OPENSSL_1_1_1_API
+ auto conn = handler->get_connection();
+
+ if (conn->tls.ssl && !SSL_is_init_finished(conn->tls.ssl)) {
+ nva.push_back(http2::make_nv_ll("early-data", "1"));
+ }
+#endif // OPENSSL_1_1_1_API
+
+ auto fwd =
+ fwdconf.strip_incoming ? nullptr : req.fs.header(http2::HD_FORWARDED);
+
+ if (fwdconf.params) {
+ auto params = fwdconf.params;
+
+ if (config->http2_proxy || req.regular_connect_method()) {
+ params &= ~FORWARDED_PROTO;
+ }
+
+ auto value = http::create_forwarded(
+ balloc, params, handler->get_forwarded_by(),
+ handler->get_forwarded_for(), req.authority, req.scheme);
+
+ if (fwd || !value.empty()) {
+ if (fwd) {
+ if (value.empty()) {
+ value = fwd->value;
+ } else {
+ value = concat_string_ref(balloc, fwd->value,
+ StringRef::from_lit(", "), value);
+ }
+ }
+
+ nva.push_back(http2::make_nv_ls_nocopy("forwarded", value));
+ }
+ } else if (fwd) {
+ nva.push_back(http2::make_nv_ls_nocopy("forwarded", fwd->value));
+ }
+
+ auto xff = xffconf.strip_incoming ? nullptr
+ : req.fs.header(http2::HD_X_FORWARDED_FOR);
+
+ if (xffconf.add) {
+ StringRef xff_value;
+ const auto &addr = upstream->get_client_handler()->get_ipaddr();
+ if (xff) {
+ xff_value = concat_string_ref(balloc, xff->value,
+ StringRef::from_lit(", "), addr);
+ } else {
+ xff_value = addr;
+ }
+ nva.push_back(http2::make_nv_ls_nocopy("x-forwarded-for", xff_value));
+ } else if (xff) {
+ nva.push_back(http2::make_nv_ls_nocopy("x-forwarded-for", xff->value));
+ }
+
+ if (!config->http2_proxy && !req.regular_connect_method()) {
+ auto xfp = xfpconf.strip_incoming
+ ? nullptr
+ : req.fs.header(http2::HD_X_FORWARDED_PROTO);
+
+ if (xfpconf.add) {
+ StringRef xfp_value;
+ // We use same protocol with :scheme header field
+ if (xfp) {
+ xfp_value = concat_string_ref(balloc, xfp->value,
+ StringRef::from_lit(", "), req.scheme);
+ } else {
+ xfp_value = req.scheme;
+ }
+ nva.push_back(http2::make_nv_ls_nocopy("x-forwarded-proto", xfp_value));
+ } else if (xfp) {
+ nva.push_back(http2::make_nv_ls_nocopy("x-forwarded-proto", xfp->value));
+ }
+ }
+
+ auto via = req.fs.header(http2::HD_VIA);
+ if (httpconf.no_via) {
+ if (via) {
+ nva.push_back(http2::make_nv_ls_nocopy("via", (*via).value));
+ }
+ } else {
+ size_t vialen = 16;
+ if (via) {
+ vialen += via->value.size() + 2;
+ }
+
+ auto iov = make_byte_ref(balloc, vialen + 1);
+ auto p = iov.base;
+
+ if (via) {
+ p = std::copy(std::begin(via->value), std::end(via->value), p);
+ p = util::copy_lit(p, ", ");
+ }
+ p = http::create_via_header_value(p, req.http_major, req.http_minor);
+ *p = '\0';
+
+ nva.push_back(http2::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
+ }
+
+ auto te = req.fs.header(http2::HD_TE);
+ // HTTP/1 upstream request can contain keyword other than
+ // "trailers". We just forward "trailers".
+ // TODO more strict handling required here.
+ if (te && http2::contains_trailers(te->value)) {
+ nva.push_back(http2::make_nv_ll("te", "trailers"));
+ }
+
+ for (auto &p : httpconf.add_request_headers) {
+ nva.push_back(http2::make_nv_nocopy(p.name, p.value));
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ std::stringstream ss;
+ for (auto &nv : nva) {
+ if (util::streq_l("authorization", nv.name, nv.namelen)) {
+ ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST
+ << ": <redacted>\n";
+ continue;
+ }
+ ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
+ << StringRef{nv.value, nv.valuelen} << "\n";
+ }
+ DCLOG(INFO, this) << "HTTP request headers\n" << ss.str();
+ }
+
+ auto transfer_encoding = req.fs.header(http2::HD_TRANSFER_ENCODING);
+
+ nghttp2_data_provider *data_prdptr = nullptr;
+ nghttp2_data_provider data_prd;
+
+ // Add body as long as transfer-encoding is given even if
+ // req.fs.content_length == 0 to forward trailer fields.
+ if (req.method == HTTP_CONNECT || req.connect_proto != ConnectProto::NONE ||
+ transfer_encoding || req.fs.content_length > 0 || req.http2_expect_body) {
+ // Request-body is expected.
+ data_prd = {{}, http2_data_read_callback};
+ data_prdptr = &data_prd;
+ }
+
+ rv = http2session_->submit_request(this, nva.data(), nva.size(), data_prdptr);
+ if (rv != 0) {
+ DCLOG(FATAL, this) << "nghttp2_submit_request() failed";
+ return -1;
+ }
+
+ if (data_prdptr) {
+ downstream_->reset_downstream_wtimer();
+ }
+
+ http2session_->signal_write();
+ return 0;
+}
+
+int Http2DownstreamConnection::push_upload_data_chunk(const uint8_t *data,
+ size_t datalen) {
+ if (!downstream_->get_request_header_sent()) {
+ auto output = downstream_->get_blocked_request_buf();
+ auto &req = downstream_->request();
+ output->append(data, datalen);
+ req.unconsumed_body_length += datalen;
+ return 0;
+ }
+
+ int rv;
+ auto output = downstream_->get_request_buf();
+ output->append(data, datalen);
+ if (downstream_->get_downstream_stream_id() != -1) {
+ rv = http2session_->resume_data(this);
+ if (rv != 0) {
+ return -1;
+ }
+
+ downstream_->ensure_downstream_wtimer();
+
+ http2session_->signal_write();
+ }
+ return 0;
+}
+
+int Http2DownstreamConnection::end_upload_data() {
+ if (!downstream_->get_request_header_sent()) {
+ downstream_->set_blocked_request_data_eof(true);
+ return 0;
+ }
+
+ int rv;
+ if (downstream_->get_downstream_stream_id() != -1) {
+ rv = http2session_->resume_data(this);
+ if (rv != 0) {
+ return -1;
+ }
+
+ downstream_->ensure_downstream_wtimer();
+
+ http2session_->signal_write();
+ }
+ return 0;
+}
+
+int Http2DownstreamConnection::resume_read(IOCtrlReason reason,
+ size_t consumed) {
+ int rv;
+
+ if (http2session_->get_state() != Http2SessionState::CONNECTED) {
+ return 0;
+ }
+
+ if (!downstream_ || downstream_->get_downstream_stream_id() == -1) {
+ return 0;
+ }
+
+ if (consumed > 0) {
+ rv = http2session_->consume(downstream_->get_downstream_stream_id(),
+ consumed);
+
+ if (rv != 0) {
+ return -1;
+ }
+
+ auto &resp = downstream_->response();
+
+ resp.unconsumed_body_length -= consumed;
+
+ http2session_->signal_write();
+ }
+
+ return 0;
+}
+
+int Http2DownstreamConnection::on_read() { return 0; }
+
+int Http2DownstreamConnection::on_write() { return 0; }
+
+void Http2DownstreamConnection::attach_stream_data(StreamData *sd) {
+ // It is possible sd->dconn is not NULL. sd is detached when
+ // on_stream_close_callback. Before that, after MSG_COMPLETE is set
+ // to Downstream::set_response_state(), upstream's readcb is called
+ // and execution path eventually could reach here. Since the
+ // response was already handled, we just detach sd.
+ detach_stream_data();
+ sd_ = sd;
+ sd_->dconn = this;
+}
+
+StreamData *Http2DownstreamConnection::detach_stream_data() {
+ if (sd_) {
+ auto sd = sd_;
+ sd_ = nullptr;
+ sd->dconn = nullptr;
+ return sd;
+ }
+ return nullptr;
+}
+
+int Http2DownstreamConnection::on_timeout() {
+ if (!downstream_) {
+ return 0;
+ }
+
+ return submit_rst_stream(downstream_, NGHTTP2_NO_ERROR);
+}
+
+const std::shared_ptr<DownstreamAddrGroup> &
+Http2DownstreamConnection::get_downstream_addr_group() const {
+ return http2session_->get_downstream_addr_group();
+}
+
+DownstreamAddr *Http2DownstreamConnection::get_addr() const { return nullptr; }
+
+} // namespace shrpx