summaryrefslogtreecommitdiffstats
path: root/src/shrpx_http3_upstream.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 19:37:08 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-10 19:37:08 +0000
commitd710a65c8b50bc3d4d0920dc6e865296f42edd5e (patch)
treed3bf9843448af9398b55f49a50a194bbaacd724e /src/shrpx_http3_upstream.cc
parentInitial commit. (diff)
downloadnghttp2-d710a65c8b50bc3d4d0920dc6e865296f42edd5e.tar.xz
nghttp2-d710a65c8b50bc3d4d0920dc6e865296f42edd5e.zip
Adding upstream version 1.59.0.upstream/1.59.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/shrpx_http3_upstream.cc')
-rw-r--r--src/shrpx_http3_upstream.cc2906
1 files changed, 2906 insertions, 0 deletions
diff --git a/src/shrpx_http3_upstream.cc b/src/shrpx_http3_upstream.cc
new file mode 100644
index 0000000..f8ae1ce
--- /dev/null
+++ b/src/shrpx_http3_upstream.cc
@@ -0,0 +1,2906 @@
+/*
+ * nghttp2 - HTTP/2 C Library
+ *
+ * Copyright (c) 2021 Tatsuhiro Tsujikawa
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+#include "shrpx_http3_upstream.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <netinet/udp.h>
+
+#include <cstdio>
+
+#include <ngtcp2/ngtcp2_crypto.h>
+
+#include "shrpx_client_handler.h"
+#include "shrpx_downstream.h"
+#include "shrpx_downstream_connection.h"
+#include "shrpx_log.h"
+#include "shrpx_quic.h"
+#include "shrpx_worker.h"
+#include "shrpx_http.h"
+#include "shrpx_connection_handler.h"
+#ifdef HAVE_MRUBY
+# include "shrpx_mruby.h"
+#endif // HAVE_MRUBY
+#include "http3.h"
+#include "util.h"
+#include "ssl_compat.h"
+
+namespace shrpx {
+
+namespace {
+void timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto upstream = static_cast<Http3Upstream *>(w->data);
+
+ if (upstream->handle_expiry() != 0 || upstream->on_write() != 0) {
+ goto fail;
+ }
+
+ return;
+
+fail:
+ auto handler = upstream->get_client_handler();
+
+ delete handler;
+}
+} // namespace
+
+namespace {
+void shutdown_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
+ auto upstream = static_cast<Http3Upstream *>(w->data);
+ auto handler = upstream->get_client_handler();
+
+ if (upstream->submit_goaway() != 0) {
+ delete handler;
+ }
+}
+} // namespace
+
+namespace {
+void prepare_cb(struct ev_loop *loop, ev_prepare *w, int revent) {
+ auto upstream = static_cast<Http3Upstream *>(w->data);
+ auto handler = upstream->get_client_handler();
+
+ if (upstream->check_shutdown() != 0) {
+ delete handler;
+ }
+}
+} // namespace
+
+namespace {
+size_t downstream_queue_size(Worker *worker) {
+ auto &downstreamconf = *worker->get_downstream_config();
+
+ if (get_config()->http2_proxy) {
+ return downstreamconf.connections_per_host;
+ }
+
+ return downstreamconf.connections_per_frontend;
+}
+} // namespace
+
+namespace {
+ngtcp2_conn *get_conn(ngtcp2_crypto_conn_ref *conn_ref) {
+ auto conn = static_cast<Connection *>(conn_ref->user_data);
+ auto handler = static_cast<ClientHandler *>(conn->data);
+ auto upstream = static_cast<Http3Upstream *>(handler->get_upstream());
+ return upstream->get_conn();
+}
+} // namespace
+
+Http3Upstream::Http3Upstream(ClientHandler *handler)
+ : handler_{handler},
+ qlog_fd_{-1},
+ hashed_scid_{},
+ conn_{nullptr},
+ httpconn_{nullptr},
+ downstream_queue_{downstream_queue_size(handler->get_worker()),
+ !get_config()->http2_proxy},
+ retry_close_{false},
+ tx_{
+ .data = std::unique_ptr<uint8_t[]>(new uint8_t[64_k]),
+ } {
+ auto conn = handler_->get_connection();
+ conn->conn_ref.get_conn = shrpx::get_conn;
+
+ ev_timer_init(&timer_, timeoutcb, 0., 0.);
+ timer_.data = this;
+
+ ngtcp2_ccerr_default(&last_error_);
+
+ ev_timer_init(&shutdown_timer_, shutdown_timeout_cb, 0., 0.);
+ shutdown_timer_.data = this;
+
+ ev_prepare_init(&prep_, prepare_cb);
+ prep_.data = this;
+ ev_prepare_start(handler_->get_loop(), &prep_);
+}
+
+Http3Upstream::~Http3Upstream() {
+ auto loop = handler_->get_loop();
+
+ ev_prepare_stop(loop, &prep_);
+ ev_timer_stop(loop, &shutdown_timer_);
+ ev_timer_stop(loop, &timer_);
+
+ nghttp3_conn_del(httpconn_);
+
+ ngtcp2_conn_del(conn_);
+
+ if (qlog_fd_ != -1) {
+ close(qlog_fd_);
+ }
+}
+
+namespace {
+void log_printf(void *user_data, const char *fmt, ...) {
+ va_list ap;
+ std::array<char, 4096> buf;
+
+ va_start(ap, fmt);
+ auto nwrite = vsnprintf(buf.data(), buf.size(), fmt, ap);
+ va_end(ap);
+
+ if (static_cast<size_t>(nwrite) >= buf.size()) {
+ nwrite = buf.size() - 1;
+ }
+
+ buf[nwrite++] = '\n';
+
+ while (write(fileno(stderr), buf.data(), nwrite) == -1 && errno == EINTR)
+ ;
+}
+} // namespace
+
+namespace {
+void qlog_write(void *user_data, uint32_t flags, const void *data,
+ size_t datalen) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ upstream->qlog_write(data, datalen, flags & NGTCP2_QLOG_WRITE_FLAG_FIN);
+}
+} // namespace
+
+void Http3Upstream::qlog_write(const void *data, size_t datalen, bool fin) {
+ assert(qlog_fd_ != -1);
+
+ while (write(qlog_fd_, data, datalen) == -1 && errno == EINTR)
+ ;
+
+ if (fin) {
+ close(qlog_fd_);
+ qlog_fd_ = -1;
+ }
+}
+
+namespace {
+void rand(uint8_t *dest, size_t destlen, const ngtcp2_rand_ctx *rand_ctx) {
+ util::random_bytes(dest, dest + destlen,
+ *static_cast<std::mt19937 *>(rand_ctx->native_handle));
+}
+} // namespace
+
+namespace {
+int get_new_connection_id(ngtcp2_conn *conn, ngtcp2_cid *cid, uint8_t *token,
+ size_t cidlen, void *user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto handler = upstream->get_client_handler();
+ auto worker = handler->get_worker();
+ auto conn_handler = worker->get_connection_handler();
+ auto &qkms = conn_handler->get_quic_keying_materials();
+ auto &qkm = qkms->keying_materials.front();
+
+ if (generate_quic_connection_id(*cid, cidlen, worker->get_cid_prefix(),
+ qkm.id, qkm.cid_encryption_key.data()) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ if (generate_quic_stateless_reset_token(token, *cid, qkm.secret.data(),
+ qkm.secret.size()) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ auto quic_connection_handler = worker->get_quic_connection_handler();
+
+ quic_connection_handler->add_connection_id(*cid, handler);
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int remove_connection_id(ngtcp2_conn *conn, const ngtcp2_cid *cid,
+ void *user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto handler = upstream->get_client_handler();
+ auto worker = handler->get_worker();
+ auto quic_conn_handler = worker->get_quic_connection_handler();
+
+ quic_conn_handler->remove_connection_id(*cid);
+
+ return 0;
+}
+} // namespace
+
+void Http3Upstream::http_begin_request_headers(int64_t stream_id) {
+ auto downstream =
+ std::make_unique<Downstream>(this, handler_->get_mcpool(), stream_id);
+ nghttp3_conn_set_stream_user_data(httpconn_, stream_id, downstream.get());
+
+ downstream->reset_upstream_rtimer();
+
+ handler_->repeat_read_timer();
+
+ auto &req = downstream->request();
+ req.http_major = 3;
+ req.http_minor = 0;
+
+ add_pending_downstream(std::move(downstream));
+}
+
+void Http3Upstream::add_pending_downstream(
+ std::unique_ptr<Downstream> downstream) {
+ downstream_queue_.add_pending(std::move(downstream));
+}
+
+namespace {
+int recv_stream_data(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
+ uint64_t offset, const uint8_t *data, size_t datalen,
+ void *user_data, void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->recv_stream_data(flags, stream_id, data, datalen) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::recv_stream_data(uint32_t flags, int64_t stream_id,
+ const uint8_t *data, size_t datalen) {
+ assert(httpconn_);
+
+ auto nconsumed = nghttp3_conn_read_stream(
+ httpconn_, stream_id, data, datalen, flags & NGTCP2_STREAM_DATA_FLAG_FIN);
+ if (nconsumed < 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_read_stream: "
+ << nghttp3_strerror(nconsumed);
+ ngtcp2_ccerr_set_application_error(
+ &last_error_, nghttp3_err_infer_quic_app_error_code(nconsumed), nullptr,
+ 0);
+ return -1;
+ }
+
+ ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
+ ngtcp2_conn_extend_max_offset(conn_, nconsumed);
+
+ return 0;
+}
+
+namespace {
+int stream_close(ngtcp2_conn *conn, uint32_t flags, int64_t stream_id,
+ uint64_t app_error_code, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
+ app_error_code = NGHTTP3_H3_NO_ERROR;
+ }
+
+ if (upstream->stream_close(stream_id, app_error_code) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::stream_close(int64_t stream_id, uint64_t app_error_code) {
+ if (!httpconn_) {
+ return 0;
+ }
+
+ auto rv = nghttp3_conn_close_stream(httpconn_, stream_id, app_error_code);
+ switch (rv) {
+ case 0:
+ break;
+ case NGHTTP3_ERR_STREAM_NOT_FOUND:
+ if (ngtcp2_is_bidi_stream(stream_id)) {
+ ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
+ }
+ break;
+ default:
+ ULOG(ERROR, this) << "nghttp3_conn_close_stream: " << nghttp3_strerror(rv);
+ ngtcp2_ccerr_set_application_error(
+ &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr, 0);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int acked_stream_data_offset(ngtcp2_conn *conn, int64_t stream_id,
+ uint64_t offset, uint64_t datalen, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->acked_stream_data_offset(stream_id, datalen) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::acked_stream_data_offset(int64_t stream_id,
+ uint64_t datalen) {
+ if (!httpconn_) {
+ return 0;
+ }
+
+ auto rv = nghttp3_conn_add_ack_offset(httpconn_, stream_id, datalen);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_add_ack_offset: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int extend_max_stream_data(ngtcp2_conn *conn, int64_t stream_id,
+ uint64_t max_data, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->extend_max_stream_data(stream_id) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::extend_max_stream_data(int64_t stream_id) {
+ if (!httpconn_) {
+ return 0;
+ }
+
+ auto rv = nghttp3_conn_unblock_stream(httpconn_, stream_id);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_unblock_stream: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int extend_max_remote_streams_bidi(ngtcp2_conn *conn, uint64_t max_streams,
+ void *user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ upstream->extend_max_remote_streams_bidi(max_streams);
+
+ return 0;
+}
+} // namespace
+
+void Http3Upstream::extend_max_remote_streams_bidi(uint64_t max_streams) {
+ nghttp3_conn_set_max_client_streams_bidi(httpconn_, max_streams);
+}
+
+namespace {
+int stream_reset(ngtcp2_conn *conn, int64_t stream_id, uint64_t final_size,
+ uint64_t app_error_code, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->http_shutdown_stream_read(stream_id) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_shutdown_stream_read(int64_t stream_id) {
+ if (!httpconn_) {
+ return 0;
+ }
+
+ auto rv = nghttp3_conn_shutdown_stream_read(httpconn_, stream_id);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_shutdown_stream_read: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int stream_stop_sending(ngtcp2_conn *conn, int64_t stream_id,
+ uint64_t app_error_code, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->http_shutdown_stream_read(stream_id) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int handshake_completed(ngtcp2_conn *conn, void *user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->handshake_completed() != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::handshake_completed() {
+ handler_->set_alpn_from_conn();
+
+ auto alpn = handler_->get_alpn();
+ if (alpn.empty()) {
+ ULOG(ERROR, this) << "NO ALPN was negotiated";
+ return -1;
+ }
+
+ auto path = ngtcp2_conn_get_path(conn_);
+
+ return send_new_token(&path->remote);
+}
+
+namespace {
+int path_validation(ngtcp2_conn *conn, uint32_t flags, const ngtcp2_path *path,
+ const ngtcp2_path *old_path,
+ ngtcp2_path_validation_result res, void *user_data) {
+ if (res != NGTCP2_PATH_VALIDATION_RESULT_SUCCESS ||
+ !(flags & NGTCP2_PATH_VALIDATION_FLAG_NEW_TOKEN)) {
+ return 0;
+ }
+
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ if (upstream->send_new_token(&path->remote) != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::send_new_token(const ngtcp2_addr *remote_addr) {
+ std::array<uint8_t, NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN + 1> token;
+ size_t tokenlen;
+
+ auto worker = handler_->get_worker();
+ auto conn_handler = worker->get_connection_handler();
+ auto &qkms = conn_handler->get_quic_keying_materials();
+ auto &qkm = qkms->keying_materials.front();
+
+ if (generate_token(token.data(), tokenlen, remote_addr->addr,
+ remote_addr->addrlen, qkm.secret.data(),
+ qkm.secret.size()) != 0) {
+ return -1;
+ }
+
+ assert(tokenlen == NGTCP2_CRYPTO_MAX_REGULAR_TOKENLEN);
+
+ token[tokenlen++] = qkm.id;
+
+ auto rv = ngtcp2_conn_submit_new_token(conn_, token.data(), tokenlen);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_submit_new_token: "
+ << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int recv_tx_key(ngtcp2_conn *conn, ngtcp2_encryption_level level,
+ void *user_data) {
+ if (level != NGTCP2_ENCRYPTION_LEVEL_1RTT) {
+ return 0;
+ }
+
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ if (upstream->setup_httpconn() != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::init(const UpstreamAddr *faddr, const Address &remote_addr,
+ const Address &local_addr,
+ const ngtcp2_pkt_hd &initial_hd,
+ const ngtcp2_cid *odcid, const uint8_t *token,
+ size_t tokenlen, ngtcp2_token_type token_type) {
+ int rv;
+
+ auto worker = handler_->get_worker();
+ auto conn_handler = worker->get_connection_handler();
+
+ auto callbacks = ngtcp2_callbacks{
+ nullptr, // client_initial
+ ngtcp2_crypto_recv_client_initial_cb,
+ ngtcp2_crypto_recv_crypto_data_cb,
+ shrpx::handshake_completed,
+ nullptr, // recv_version_negotiation
+ ngtcp2_crypto_encrypt_cb,
+ ngtcp2_crypto_decrypt_cb,
+ ngtcp2_crypto_hp_mask_cb,
+ shrpx::recv_stream_data,
+ shrpx::acked_stream_data_offset,
+ nullptr, // stream_open
+ shrpx::stream_close,
+ nullptr, // recv_stateless_reset
+ nullptr, // recv_retry
+ nullptr, // extend_max_local_streams_bidi
+ nullptr, // extend_max_local_streams_uni
+ rand,
+ get_new_connection_id,
+ remove_connection_id,
+ ngtcp2_crypto_update_key_cb,
+ shrpx::path_validation,
+ nullptr, // select_preferred_addr
+ shrpx::stream_reset,
+ shrpx::extend_max_remote_streams_bidi,
+ nullptr, // extend_max_remote_streams_uni
+ shrpx::extend_max_stream_data,
+ nullptr, // dcid_status
+ nullptr, // handshake_confirmed
+ nullptr, // recv_new_token
+ ngtcp2_crypto_delete_crypto_aead_ctx_cb,
+ ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
+ nullptr, // recv_datagram
+ nullptr, // ack_datagram
+ nullptr, // lost_datagram
+ ngtcp2_crypto_get_path_challenge_data_cb,
+ shrpx::stream_stop_sending,
+ nullptr, // version_negotiation
+ nullptr, // recv_rx_key
+ shrpx::recv_tx_key,
+ };
+
+ auto config = get_config();
+ auto &quicconf = config->quic;
+ auto &http3conf = config->http3;
+
+ auto &qkms = conn_handler->get_quic_keying_materials();
+ auto &qkm = qkms->keying_materials.front();
+
+ ngtcp2_cid scid;
+
+ if (generate_quic_connection_id(scid, SHRPX_QUIC_SCIDLEN,
+ worker->get_cid_prefix(), qkm.id,
+ qkm.cid_encryption_key.data()) != 0) {
+ return -1;
+ }
+
+ ngtcp2_settings settings;
+ ngtcp2_settings_default(&settings);
+ if (quicconf.upstream.debug.log) {
+ settings.log_printf = log_printf;
+ }
+
+ if (!quicconf.upstream.qlog.dir.empty()) {
+ auto fd = open_qlog_file(quicconf.upstream.qlog.dir, scid);
+ if (fd != -1) {
+ qlog_fd_ = fd;
+ settings.qlog_write = shrpx::qlog_write;
+ }
+ }
+
+ settings.initial_ts = quic_timestamp();
+ settings.initial_rtt = static_cast<ngtcp2_tstamp>(
+ quicconf.upstream.initial_rtt * NGTCP2_SECONDS);
+ settings.cc_algo = quicconf.upstream.congestion_controller;
+ settings.max_window = http3conf.upstream.max_connection_window_size;
+ settings.max_stream_window = http3conf.upstream.max_window_size;
+ settings.max_tx_udp_payload_size = SHRPX_QUIC_MAX_UDP_PAYLOAD_SIZE;
+ settings.rand_ctx.native_handle = &worker->get_randgen();
+ settings.token = token;
+ settings.tokenlen = tokenlen;
+ settings.token_type = token_type;
+ settings.initial_pkt_num = std::uniform_int_distribution<uint32_t>(
+ 0, std::numeric_limits<int32_t>::max())(worker->get_randgen());
+
+ ngtcp2_transport_params params;
+ ngtcp2_transport_params_default(&params);
+ params.initial_max_streams_bidi = http3conf.upstream.max_concurrent_streams;
+ // The minimum number of unidirectional streams required for HTTP/3.
+ params.initial_max_streams_uni = 3;
+ params.initial_max_data = http3conf.upstream.connection_window_size;
+ params.initial_max_stream_data_bidi_remote = http3conf.upstream.window_size;
+ params.initial_max_stream_data_uni = http3conf.upstream.window_size;
+ params.max_idle_timeout = static_cast<ngtcp2_tstamp>(
+ quicconf.upstream.timeout.idle * NGTCP2_SECONDS);
+
+#ifdef NGHTTP2_OPENSSL_IS_BORINGSSL
+ if (quicconf.upstream.early_data) {
+ ngtcp2_transport_params early_data_params;
+
+ ngtcp2_transport_params_default(&early_data_params);
+
+ early_data_params.initial_max_stream_data_bidi_local =
+ params.initial_max_stream_data_bidi_local;
+ early_data_params.initial_max_stream_data_bidi_remote =
+ params.initial_max_stream_data_bidi_remote;
+ early_data_params.initial_max_stream_data_uni =
+ params.initial_max_stream_data_uni;
+ early_data_params.initial_max_data = params.initial_max_data;
+ early_data_params.initial_max_streams_bidi =
+ params.initial_max_streams_bidi;
+ early_data_params.initial_max_streams_uni = params.initial_max_streams_uni;
+
+ // TODO include HTTP/3 SETTINGS
+
+ std::array<uint8_t, 128> quic_early_data_ctx;
+
+ auto quic_early_data_ctxlen = ngtcp2_transport_params_encode(
+ quic_early_data_ctx.data(), quic_early_data_ctx.size(),
+ &early_data_params);
+
+ assert(quic_early_data_ctxlen > 0);
+ assert(static_cast<size_t>(quic_early_data_ctxlen) <=
+ quic_early_data_ctx.size());
+
+ if (SSL_set_quic_early_data_context(handler_->get_ssl(),
+ quic_early_data_ctx.data(),
+ quic_early_data_ctxlen) != 1) {
+ ULOG(ERROR, this) << "SSL_set_quic_early_data_context failed";
+ return -1;
+ }
+ }
+#endif // NGHTTP2_OPENSSL_IS_BORINGSSL
+
+ if (odcid) {
+ params.original_dcid = *odcid;
+ params.retry_scid = initial_hd.dcid;
+ params.retry_scid_present = 1;
+ } else {
+ params.original_dcid = initial_hd.dcid;
+ }
+
+ params.original_dcid_present = 1;
+
+ rv = generate_quic_stateless_reset_token(
+ params.stateless_reset_token, scid, qkm.secret.data(), qkm.secret.size());
+ if (rv != 0) {
+ ULOG(ERROR, this) << "generate_quic_stateless_reset_token failed";
+ return -1;
+ }
+ params.stateless_reset_token_present = 1;
+
+ auto path = ngtcp2_path{
+ {
+ const_cast<sockaddr *>(&local_addr.su.sa),
+ static_cast<socklen_t>(local_addr.len),
+ },
+ {
+ const_cast<sockaddr *>(&remote_addr.su.sa),
+ static_cast<socklen_t>(remote_addr.len),
+ },
+ const_cast<UpstreamAddr *>(faddr),
+ };
+
+ rv = ngtcp2_conn_server_new(&conn_, &initial_hd.scid, &scid, &path,
+ initial_hd.version, &callbacks, &settings,
+ &params, nullptr, this);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_server_new: " << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ ngtcp2_conn_set_tls_native_handle(conn_, handler_->get_ssl());
+
+ auto quic_connection_handler = worker->get_quic_connection_handler();
+
+ if (generate_quic_hashed_connection_id(hashed_scid_, remote_addr, local_addr,
+ initial_hd.dcid) != 0) {
+ return -1;
+ }
+
+ quic_connection_handler->add_connection_id(hashed_scid_, handler_);
+ quic_connection_handler->add_connection_id(scid, handler_);
+
+ return 0;
+}
+
+int Http3Upstream::on_read() { return 0; }
+
+int Http3Upstream::on_write() {
+ int rv;
+
+ if (tx_.send_blocked) {
+ rv = send_blocked_packet();
+ if (rv != 0) {
+ return -1;
+ }
+
+ if (tx_.send_blocked) {
+ return 0;
+ }
+ }
+
+ handler_->get_connection()->wlimit.stopw();
+
+ if (write_streams() != 0) {
+ return -1;
+ }
+
+ if (httpconn_ && nghttp3_conn_is_drained(httpconn_)) {
+ return -1;
+ }
+
+ reset_timer();
+
+ return 0;
+}
+
+int Http3Upstream::write_streams() {
+ std::array<nghttp3_vec, 16> vec;
+ auto max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(conn_);
+#ifdef UDP_SEGMENT
+ auto path_max_udp_payload_size =
+ ngtcp2_conn_get_path_max_tx_udp_payload_size(conn_);
+#endif // UDP_SEGMENT
+ auto max_pktcnt = ngtcp2_conn_get_send_quantum(conn_) / max_udp_payload_size;
+ ngtcp2_pkt_info pi, prev_pi;
+ uint8_t *bufpos = tx_.data.get();
+ ngtcp2_path_storage ps, prev_ps;
+ size_t pktcnt = 0;
+ int rv;
+ size_t gso_size = 0;
+ auto ts = quic_timestamp();
+
+ ngtcp2_path_storage_zero(&ps);
+ ngtcp2_path_storage_zero(&prev_ps);
+
+ for (;;) {
+ int64_t stream_id = -1;
+ int fin = 0;
+ nghttp3_ssize sveccnt = 0;
+
+ if (httpconn_ && ngtcp2_conn_get_max_data_left(conn_)) {
+ sveccnt = nghttp3_conn_writev_stream(httpconn_, &stream_id, &fin,
+ vec.data(), vec.size());
+ if (sveccnt < 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_writev_stream: "
+ << nghttp3_strerror(sveccnt);
+ ngtcp2_ccerr_set_application_error(
+ &last_error_, nghttp3_err_infer_quic_app_error_code(sveccnt),
+ nullptr, 0);
+ return handle_error();
+ }
+ }
+
+ ngtcp2_ssize ndatalen;
+ auto v = vec.data();
+ auto vcnt = static_cast<size_t>(sveccnt);
+
+ uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
+ if (fin) {
+ flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
+ }
+
+ auto nwrite = ngtcp2_conn_writev_stream(
+ conn_, &ps.path, &pi, bufpos, max_udp_payload_size, &ndatalen, flags,
+ stream_id, reinterpret_cast<const ngtcp2_vec *>(v), vcnt, ts);
+ if (nwrite < 0) {
+ switch (nwrite) {
+ case NGTCP2_ERR_STREAM_DATA_BLOCKED:
+ assert(ndatalen == -1);
+ nghttp3_conn_block_stream(httpconn_, stream_id);
+ continue;
+ case NGTCP2_ERR_STREAM_SHUT_WR:
+ assert(ndatalen == -1);
+ nghttp3_conn_shutdown_stream_write(httpconn_, stream_id);
+ continue;
+ case NGTCP2_ERR_WRITE_MORE:
+ assert(ndatalen >= 0);
+ rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
+ if (rv != 0) {
+ ULOG(ERROR, this)
+ << "nghttp3_conn_add_write_offset: " << nghttp3_strerror(rv);
+ ngtcp2_ccerr_set_application_error(
+ &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
+ 0);
+ return handle_error();
+ }
+ continue;
+ }
+
+ assert(ndatalen == -1);
+
+ ULOG(ERROR, this) << "ngtcp2_conn_writev_stream: "
+ << ngtcp2_strerror(nwrite);
+
+ ngtcp2_ccerr_set_liberr(&last_error_, nwrite, nullptr, 0);
+
+ return handle_error();
+ } else if (ndatalen >= 0) {
+ rv = nghttp3_conn_add_write_offset(httpconn_, stream_id, ndatalen);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_add_write_offset: "
+ << nghttp3_strerror(rv);
+ ngtcp2_ccerr_set_application_error(
+ &last_error_, nghttp3_err_infer_quic_app_error_code(rv), nullptr,
+ 0);
+ return handle_error();
+ }
+ }
+
+ if (nwrite == 0) {
+ if (bufpos - tx_.data.get()) {
+ auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
+ auto data = tx_.data.get();
+ auto datalen = bufpos - data;
+
+ rv = send_packet(faddr, prev_ps.path.remote.addr,
+ prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
+ prev_ps.path.local.addrlen, prev_pi, data, datalen,
+ gso_size);
+ if (rv == SHRPX_ERR_SEND_BLOCKED) {
+ on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local,
+ prev_pi, data, datalen, gso_size);
+
+ signal_write_upstream_addr(faddr);
+ }
+ }
+
+ ngtcp2_conn_update_pkt_tx_time(conn_, ts);
+
+ return 0;
+ }
+
+ bufpos += nwrite;
+
+#ifdef UDP_SEGMENT
+ if (pktcnt == 0) {
+ ngtcp2_path_copy(&prev_ps.path, &ps.path);
+ prev_pi = pi;
+ gso_size = nwrite;
+ } else if (!ngtcp2_path_eq(&prev_ps.path, &ps.path) ||
+ prev_pi.ecn != pi.ecn ||
+ static_cast<size_t>(nwrite) > gso_size ||
+ (gso_size > path_max_udp_payload_size &&
+ static_cast<size_t>(nwrite) != gso_size)) {
+ auto faddr = static_cast<UpstreamAddr *>(prev_ps.path.user_data);
+ auto data = tx_.data.get();
+ auto datalen = bufpos - data - nwrite;
+
+ rv = send_packet(faddr, prev_ps.path.remote.addr,
+ prev_ps.path.remote.addrlen, prev_ps.path.local.addr,
+ prev_ps.path.local.addrlen, prev_pi, data, datalen,
+ gso_size);
+ switch (rv) {
+ case SHRPX_ERR_SEND_BLOCKED:
+ on_send_blocked(faddr, prev_ps.path.remote, prev_ps.path.local, prev_pi,
+ data, datalen, gso_size);
+
+ on_send_blocked(static_cast<UpstreamAddr *>(ps.path.user_data),
+ ps.path.remote, ps.path.local, pi, bufpos - nwrite,
+ nwrite, 0);
+
+ signal_write_upstream_addr(faddr);
+
+ break;
+ default: {
+ auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
+ auto data = bufpos - nwrite;
+
+ rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
+ ps.path.local.addr, ps.path.local.addrlen, pi, data,
+ nwrite, 0);
+ if (rv == SHRPX_ERR_SEND_BLOCKED) {
+ on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data,
+ nwrite, 0);
+
+ signal_write_upstream_addr(faddr);
+ }
+ }
+ }
+
+ ngtcp2_conn_update_pkt_tx_time(conn_, ts);
+
+ return 0;
+ }
+
+ if (++pktcnt == max_pktcnt || static_cast<size_t>(nwrite) < gso_size) {
+ auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
+ auto data = tx_.data.get();
+ auto datalen = bufpos - data;
+
+ rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
+ ps.path.local.addr, ps.path.local.addrlen, pi, data,
+ datalen, gso_size);
+ if (rv == SHRPX_ERR_SEND_BLOCKED) {
+ on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
+ gso_size);
+
+ signal_write_upstream_addr(faddr);
+ }
+
+ ngtcp2_conn_update_pkt_tx_time(conn_, ts);
+
+ return 0;
+ }
+#else // !UDP_SEGMENT
+ auto faddr = static_cast<UpstreamAddr *>(ps.path.user_data);
+ auto data = tx_.data.get();
+ auto datalen = bufpos - data;
+
+ rv = send_packet(faddr, ps.path.remote.addr, ps.path.remote.addrlen,
+ ps.path.local.addr, ps.path.local.addrlen, pi, data,
+ datalen, 0);
+ if (rv == SHRPX_ERR_SEND_BLOCKED) {
+ on_send_blocked(faddr, ps.path.remote, ps.path.local, pi, data, datalen,
+ 0);
+
+ ngtcp2_conn_update_pkt_tx_time(conn_, ts);
+
+ signal_write_upstream_addr(faddr);
+
+ return 0;
+ }
+
+ if (++pktcnt == max_pktcnt) {
+ ngtcp2_conn_update_pkt_tx_time(conn_, ts);
+
+ return 0;
+ }
+
+ bufpos = tx_.data.get();
+#endif // !UDP_SEGMENT
+ }
+
+ return 0;
+}
+
+int Http3Upstream::on_timeout(Downstream *downstream) { return 0; }
+
+int Http3Upstream::on_downstream_abort_request(Downstream *downstream,
+ unsigned int status_code) {
+ int rv;
+
+ rv = error_reply(downstream, status_code);
+
+ if (rv != 0) {
+ return -1;
+ }
+
+ handler_->signal_write();
+
+ return 0;
+}
+
+int Http3Upstream::on_downstream_abort_request_with_https_redirect(
+ Downstream *downstream) {
+ assert(0);
+ abort();
+}
+
+namespace {
+uint64_t
+infer_upstream_shutdown_stream_error_code(uint32_t downstream_error_code) {
+ // NGHTTP2_REFUSED_STREAM is important because it tells upstream
+ // client to retry.
+ switch (downstream_error_code) {
+ case NGHTTP2_NO_ERROR:
+ return NGHTTP3_H3_NO_ERROR;
+ case NGHTTP2_REFUSED_STREAM:
+ return NGHTTP3_H3_REQUEST_REJECTED;
+ default:
+ return NGHTTP3_H3_INTERNAL_ERROR;
+ }
+}
+} // namespace
+
+int Http3Upstream::downstream_read(DownstreamConnection *dconn) {
+ auto downstream = dconn->get_downstream();
+
+ if (downstream->get_response_state() == DownstreamState::MSG_RESET) {
+ // The downstream stream was reset (canceled). In this case,
+ // RST_STREAM to the upstream and delete downstream connection
+ // here. Deleting downstream will be taken place at
+ // on_stream_close_callback.
+ shutdown_stream(downstream,
+ infer_upstream_shutdown_stream_error_code(
+ downstream->get_response_rst_stream_error_code()));
+ downstream->pop_downstream_connection();
+ // dconn was deleted
+ dconn = nullptr;
+ } else if (downstream->get_response_state() ==
+ DownstreamState::MSG_BAD_HEADER) {
+ if (error_reply(downstream, 502) != 0) {
+ return -1;
+ }
+ downstream->pop_downstream_connection();
+ // dconn was deleted
+ dconn = nullptr;
+ } else {
+ auto rv = downstream->on_read();
+ if (rv == SHRPX_ERR_EOF) {
+ if (downstream->get_request_header_sent()) {
+ return downstream_eof(dconn);
+ }
+ return SHRPX_ERR_RETRY;
+ }
+ if (rv == SHRPX_ERR_DCONN_CANCELED) {
+ downstream->pop_downstream_connection();
+ handler_->signal_write();
+ return 0;
+ }
+ if (rv != 0) {
+ if (rv != SHRPX_ERR_NETWORK) {
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, dconn) << "HTTP parser failure";
+ }
+ }
+ return downstream_error(dconn, Downstream::EVENT_ERROR);
+ }
+
+ if (downstream->can_detach_downstream_connection()) {
+ // Keep-alive
+ downstream->detach_downstream_connection();
+ }
+ }
+
+ handler_->signal_write();
+
+ // At this point, downstream may be deleted.
+
+ return 0;
+}
+
+int Http3Upstream::downstream_write(DownstreamConnection *dconn) {
+ int rv;
+ rv = dconn->on_write();
+ if (rv == SHRPX_ERR_NETWORK) {
+ return downstream_error(dconn, Downstream::EVENT_ERROR);
+ }
+ if (rv != 0) {
+ return rv;
+ }
+ return 0;
+}
+
+int Http3Upstream::downstream_eof(DownstreamConnection *dconn) {
+ auto downstream = dconn->get_downstream();
+
+ if (LOG_ENABLED(INFO)) {
+ DCLOG(INFO, dconn) << "EOF. stream_id=" << downstream->get_stream_id();
+ }
+
+ // Delete downstream connection. If we don't delete it here, it will
+ // be pooled in on_stream_close_callback.
+ downstream->pop_downstream_connection();
+ // dconn was deleted
+ dconn = nullptr;
+ // downstream will be deleted in on_stream_close_callback.
+ if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
+ // Server may indicate the end of the request by EOF
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Downstream body was ended by EOF";
+ }
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+
+ // For tunneled connection, MSG_COMPLETE signals
+ // downstream_read_data_callback to send RST_STREAM after pending
+ // response body is sent. This is needed to ensure that RST_STREAM
+ // is sent after all pending data are sent.
+ if (on_downstream_body_complete(downstream) != 0) {
+ return -1;
+ }
+ } else if (downstream->get_response_state() !=
+ DownstreamState::MSG_COMPLETE) {
+ // If stream was not closed, then we set MSG_COMPLETE and let
+ // on_stream_close_callback delete downstream.
+ if (error_reply(downstream, 502) != 0) {
+ return -1;
+ }
+ }
+ handler_->signal_write();
+ // At this point, downstream may be deleted.
+ return 0;
+}
+
+int Http3Upstream::downstream_error(DownstreamConnection *dconn, int events) {
+ auto downstream = dconn->get_downstream();
+
+ if (LOG_ENABLED(INFO)) {
+ if (events & Downstream::EVENT_ERROR) {
+ DCLOG(INFO, dconn) << "Downstream network/general error";
+ } else {
+ DCLOG(INFO, dconn) << "Timeout";
+ }
+ if (downstream->get_upgraded()) {
+ DCLOG(INFO, dconn) << "Note: this is tunnel connection";
+ }
+ }
+
+ // Delete downstream connection. If we don't delete it here, it will
+ // be pooled in on_stream_close_callback.
+ downstream->pop_downstream_connection();
+ // dconn was deleted
+ dconn = nullptr;
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ // For SSL tunneling, we issue RST_STREAM. For other types of
+ // stream, we don't have to do anything since response was
+ // complete.
+ if (downstream->get_upgraded()) {
+ shutdown_stream(downstream, NGHTTP3_H3_NO_ERROR);
+ }
+ } else {
+ if (downstream->get_response_state() == DownstreamState::HEADER_COMPLETE) {
+ if (downstream->get_upgraded()) {
+ if (on_downstream_body_complete(downstream) != 0) {
+ return -1;
+ }
+ } else {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+ } else {
+ unsigned int status;
+ if (events & Downstream::EVENT_TIMEOUT) {
+ if (downstream->get_request_header_sent()) {
+ status = 504;
+ } else {
+ status = 408;
+ }
+ } else {
+ status = 502;
+ }
+ if (error_reply(downstream, status) != 0) {
+ return -1;
+ }
+ }
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+ }
+ handler_->signal_write();
+ // At this point, downstream may be deleted.
+ return 0;
+}
+
+ClientHandler *Http3Upstream::get_client_handler() const { return handler_; }
+
+namespace {
+nghttp3_ssize downstream_read_data_callback(nghttp3_conn *conn,
+ int64_t stream_id, nghttp3_vec *vec,
+ size_t veccnt, uint32_t *pflags,
+ void *conn_user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(conn_user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ assert(downstream);
+
+ auto body = downstream->get_response_buf();
+
+ assert(body);
+
+ if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE &&
+ body->rleft_mark() == 0) {
+ downstream->disable_upstream_wtimer();
+ return NGHTTP3_ERR_WOULDBLOCK;
+ }
+
+ downstream->reset_upstream_wtimer();
+
+ veccnt = body->riovec_mark(reinterpret_cast<struct iovec *>(vec), veccnt);
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE &&
+ body->rleft_mark() == 0) {
+ *pflags |= NGHTTP3_DATA_FLAG_EOF;
+ }
+
+ assert((*pflags & NGHTTP3_DATA_FLAG_EOF) || veccnt);
+
+ downstream->response_sent_body_length += nghttp3_vec_len(vec, veccnt);
+
+ if ((*pflags & NGHTTP3_DATA_FLAG_EOF) &&
+ upstream->shutdown_stream_read(stream_id, NGHTTP3_H3_NO_ERROR) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return veccnt;
+}
+} // namespace
+
+int Http3Upstream::on_downstream_header_complete(Downstream *downstream) {
+ int rv;
+
+ const auto &req = downstream->request();
+ auto &resp = downstream->response();
+
+ auto &balloc = downstream->get_block_allocator();
+
+ if (LOG_ENABLED(INFO)) {
+ if (downstream->get_non_final_response()) {
+ DLOG(INFO, downstream) << "HTTP non-final response header";
+ } else {
+ DLOG(INFO, downstream) << "HTTP response header completed";
+ }
+ }
+
+ auto config = get_config();
+ auto &httpconf = config->http;
+
+ if (!config->http2_proxy && !httpconf.no_location_rewrite) {
+ downstream->rewrite_location_response_header(req.scheme);
+ }
+
+#ifdef HAVE_MRUBY
+ if (!downstream->get_non_final_response()) {
+ auto dconn = downstream->get_downstream_connection();
+ const auto &group = dconn->get_downstream_addr_group();
+ if (group) {
+ const auto &dmruby_ctx = group->shared_addr->mruby_ctx;
+
+ if (dmruby_ctx->run_on_response_proc(downstream) != 0) {
+ if (error_reply(downstream, 500) != 0) {
+ return -1;
+ }
+ // Returning -1 will signal deletion of dconn.
+ return -1;
+ }
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return -1;
+ }
+ }
+
+ auto worker = handler_->get_worker();
+ auto mruby_ctx = worker->get_mruby_context();
+
+ if (mruby_ctx->run_on_response_proc(downstream) != 0) {
+ if (error_reply(downstream, 500) != 0) {
+ return -1;
+ }
+ // Returning -1 will signal deletion of dconn.
+ return -1;
+ }
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return -1;
+ }
+ }
+#endif // HAVE_MRUBY
+
+ auto nva = std::vector<nghttp3_nv>();
+ // 4 means :status and possible server, via, and set-cookie (for
+ // affinity cookie) header field.
+ nva.reserve(resp.fs.headers().size() + 4 +
+ httpconf.add_response_headers.size());
+
+ if (downstream->get_non_final_response()) {
+ auto response_status = http2::stringify_status(balloc, resp.http_status);
+
+ nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
+
+ http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(),
+ http2::HDOP_STRIP_ALL);
+
+ if (LOG_ENABLED(INFO)) {
+ log_response_headers(downstream, nva);
+ }
+
+ rv = nghttp3_conn_submit_info(httpconn_, downstream->get_stream_id(),
+ nva.data(), nva.size());
+
+ resp.fs.clear_headers();
+
+ if (rv != 0) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_info() failed";
+ return -1;
+ }
+
+ return 0;
+ }
+
+ auto striphd_flags = http2::HDOP_STRIP_ALL & ~http2::HDOP_STRIP_VIA;
+ StringRef response_status;
+
+ if (req.connect_proto == ConnectProto::WEBSOCKET && resp.http_status == 101) {
+ response_status = http2::stringify_status(balloc, 200);
+ striphd_flags |= http2::HDOP_STRIP_SEC_WEBSOCKET_ACCEPT;
+ } else {
+ response_status = http2::stringify_status(balloc, resp.http_status);
+ }
+
+ nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
+
+ http3::copy_headers_to_nva_nocopy(nva, resp.fs.headers(), striphd_flags);
+
+ if (!config->http2_proxy && !httpconf.no_server_rewrite) {
+ nva.push_back(http3::make_nv_ls_nocopy("server", httpconf.server_name));
+ } else {
+ auto server = resp.fs.header(http2::HD_SERVER);
+ if (server) {
+ nva.push_back(http3::make_nv_ls_nocopy("server", (*server).value));
+ }
+ }
+
+ if (!req.regular_connect_method() || !downstream->get_upgraded()) {
+ auto affinity_cookie = downstream->get_affinity_cookie_to_send();
+ if (affinity_cookie) {
+ auto dconn = downstream->get_downstream_connection();
+ assert(dconn);
+ auto &group = dconn->get_downstream_addr_group();
+ auto &shared_addr = group->shared_addr;
+ auto &cookieconf = shared_addr->affinity.cookie;
+ auto secure =
+ http::require_cookie_secure_attribute(cookieconf.secure, req.scheme);
+ auto cookie_str = http::create_affinity_cookie(
+ balloc, cookieconf.name, affinity_cookie, cookieconf.path, secure);
+ nva.push_back(http3::make_nv_ls_nocopy("set-cookie", cookie_str));
+ }
+ }
+
+ auto via = resp.fs.header(http2::HD_VIA);
+ if (httpconf.no_via) {
+ if (via) {
+ nva.push_back(http3::make_nv_ls_nocopy("via", (*via).value));
+ }
+ } else {
+ // we don't create more than 16 bytes in
+ // http::create_via_header_value.
+ size_t len = 16;
+ if (via) {
+ len += via->value.size() + 2;
+ }
+
+ auto iov = make_byte_ref(balloc, len + 1);
+ auto p = iov.base;
+ if (via) {
+ p = std::copy(std::begin(via->value), std::end(via->value), p);
+ p = util::copy_lit(p, ", ");
+ }
+ p = http::create_via_header_value(p, resp.http_major, resp.http_minor);
+ *p = '\0';
+
+ nva.push_back(http3::make_nv_ls_nocopy("via", StringRef{iov.base, p}));
+ }
+
+ for (auto &p : httpconf.add_response_headers) {
+ nva.push_back(http3::make_nv_nocopy(p.name, p.value));
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ log_response_headers(downstream, nva);
+ }
+
+ auto priority = resp.fs.header(http2::HD_PRIORITY);
+ if (priority) {
+ nghttp3_pri pri;
+
+ if (nghttp3_conn_get_stream_priority(httpconn_, &pri,
+ downstream->get_stream_id()) == 0 &&
+ nghttp3_pri_parse_priority(&pri, priority->value.byte(),
+ priority->value.size()) == 0) {
+ rv = nghttp3_conn_set_server_stream_priority(
+ httpconn_, downstream->get_stream_id(), &pri);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_set_server_stream_priority: "
+ << nghttp3_strerror(rv);
+ }
+ }
+ }
+
+ nghttp3_data_reader data_read;
+ data_read.read_data = downstream_read_data_callback;
+
+ nghttp3_data_reader *data_readptr;
+
+ if (downstream->expect_response_body() ||
+ downstream->expect_response_trailer()) {
+ data_readptr = &data_read;
+ } else {
+ data_readptr = nullptr;
+ }
+
+ rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
+ nva.data(), nva.size(), data_readptr);
+ if (rv != 0) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed";
+ return -1;
+ }
+
+ if (data_readptr) {
+ downstream->reset_upstream_wtimer();
+ } else if (shutdown_stream_read(downstream->get_stream_id(),
+ NGHTTP3_H3_NO_ERROR) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::on_downstream_body(Downstream *downstream,
+ const uint8_t *data, size_t len,
+ bool flush) {
+ auto body = downstream->get_response_buf();
+ body->append(data, len);
+
+ if (flush) {
+ nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
+
+ downstream->ensure_upstream_wtimer();
+ }
+
+ return 0;
+}
+
+int Http3Upstream::on_downstream_body_complete(Downstream *downstream) {
+ if (LOG_ENABLED(INFO)) {
+ DLOG(INFO, downstream) << "HTTP response completed";
+ }
+
+ auto &resp = downstream->response();
+
+ if (!downstream->validate_response_recv_body_length()) {
+ shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
+ resp.connection_close = true;
+ return 0;
+ }
+
+ if (!downstream->get_upgraded()) {
+ const auto &trailers = resp.fs.trailers();
+ if (!trailers.empty()) {
+ std::vector<nghttp3_nv> nva;
+ nva.reserve(trailers.size());
+ http3::copy_headers_to_nva_nocopy(nva, trailers, http2::HDOP_STRIP_ALL);
+ if (!nva.empty()) {
+ auto rv = nghttp3_conn_submit_trailers(
+ httpconn_, downstream->get_stream_id(), nva.data(), nva.size());
+ if (rv != 0) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_trailers() failed: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+ }
+ }
+ }
+
+ nghttp3_conn_resume_stream(httpconn_, downstream->get_stream_id());
+ downstream->ensure_upstream_wtimer();
+
+ return 0;
+}
+
+void Http3Upstream::on_handler_delete() {
+ for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
+ if (d->get_dispatch_state() == DispatchState::ACTIVE &&
+ d->accesslog_ready()) {
+ handler_->write_accesslog(d);
+ }
+ }
+
+ auto worker = handler_->get_worker();
+ auto quic_conn_handler = worker->get_quic_connection_handler();
+
+ std::vector<ngtcp2_cid> scids(ngtcp2_conn_get_scid(conn_, nullptr) + 1);
+ ngtcp2_conn_get_scid(conn_, scids.data());
+ scids.back() = hashed_scid_;
+
+ for (auto &cid : scids) {
+ quic_conn_handler->remove_connection_id(cid);
+ }
+
+ if (retry_close_ || last_error_.type == NGTCP2_CCERR_TYPE_IDLE_CLOSE) {
+ return;
+ }
+
+ // If this is not idle close, send CONNECTION_CLOSE.
+ if (!ngtcp2_conn_in_closing_period(conn_) &&
+ !ngtcp2_conn_in_draining_period(conn_)) {
+ ngtcp2_path_storage ps;
+ ngtcp2_pkt_info pi;
+ conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
+
+ ngtcp2_path_storage_zero(&ps);
+
+ ngtcp2_ccerr ccerr;
+ ngtcp2_ccerr_default(&ccerr);
+
+ if (worker->get_graceful_shutdown() &&
+ !ngtcp2_conn_get_handshake_completed(conn_)) {
+ ccerr.error_code = NGTCP2_CONNECTION_REFUSED;
+ }
+
+ auto nwrite = ngtcp2_conn_write_connection_close(
+ conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(), &ccerr,
+ quic_timestamp());
+ if (nwrite < 0) {
+ if (nwrite != NGTCP2_ERR_INVALID_STATE) {
+ ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
+ << ngtcp2_strerror(nwrite);
+ }
+
+ return;
+ }
+
+ conn_close_.resize(nwrite);
+
+ send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
+ ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
+ ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
+ }
+
+ auto d =
+ static_cast<ev_tstamp>(ngtcp2_conn_get_pto(conn_) * 3) / NGTCP2_SECONDS;
+
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Enter close-wait period " << d << "s with "
+ << conn_close_.size() << " bytes sentinel packet";
+ }
+
+ auto cw = std::make_unique<CloseWait>(worker, std::move(scids),
+ std::move(conn_close_), d);
+
+ quic_conn_handler->add_close_wait(cw.release());
+}
+
+int Http3Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
+ int rv;
+
+ if (downstream->get_dispatch_state() != DispatchState::ACTIVE) {
+ // This is error condition when we failed push_request_headers()
+ // in initiate_downstream(). Otherwise, we have
+ // DispatchState::ACTIVE state, or we did not set
+ // DownstreamConnection.
+ downstream->pop_downstream_connection();
+ handler_->signal_write();
+
+ return 0;
+ }
+
+ if (!downstream->request_submission_ready()) {
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ // We have got all response body already. Send it off.
+ downstream->pop_downstream_connection();
+ return 0;
+ }
+ // pushed stream is handled here
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ downstream->pop_downstream_connection();
+
+ handler_->signal_write();
+
+ return 0;
+ }
+
+ downstream->pop_downstream_connection();
+
+ downstream->add_retry();
+
+ std::unique_ptr<DownstreamConnection> dconn;
+
+ rv = 0;
+
+ if (no_retry || downstream->no_more_retry()) {
+ goto fail;
+ }
+
+ // downstream connection is clean; we can retry with new
+ // downstream connection.
+
+ for (;;) {
+ auto dconn = handler_->get_downstream_connection(rv, downstream);
+ if (!dconn) {
+ goto fail;
+ }
+
+ rv = downstream->attach_downstream_connection(std::move(dconn));
+ if (rv == 0) {
+ break;
+ }
+ }
+
+ rv = downstream->push_request_headers();
+ if (rv != 0) {
+ goto fail;
+ }
+
+ return 0;
+
+fail:
+ if (rv == SHRPX_ERR_TLS_REQUIRED) {
+ assert(0);
+ abort();
+ }
+
+ rv = on_downstream_abort_request(downstream, 502);
+ if (rv != 0) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+ downstream->pop_downstream_connection();
+
+ handler_->signal_write();
+
+ return 0;
+}
+
+void Http3Upstream::pause_read(IOCtrlReason reason) {}
+
+int Http3Upstream::resume_read(IOCtrlReason reason, Downstream *downstream,
+ size_t consumed) {
+ consume(downstream->get_stream_id(), consumed);
+
+ auto &req = downstream->request();
+
+ req.consume(consumed);
+
+ handler_->signal_write();
+
+ return 0;
+}
+
+int Http3Upstream::send_reply(Downstream *downstream, const uint8_t *body,
+ size_t bodylen) {
+ int rv;
+
+ nghttp3_data_reader data_read, *data_read_ptr = nullptr;
+
+ if (bodylen) {
+ data_read.read_data = downstream_read_data_callback;
+ data_read_ptr = &data_read;
+ }
+
+ const auto &resp = downstream->response();
+ auto config = get_config();
+ auto &httpconf = config->http;
+
+ auto &balloc = downstream->get_block_allocator();
+
+ const auto &headers = resp.fs.headers();
+ auto nva = std::vector<nghttp3_nv>();
+ // 2 for :status and server
+ nva.reserve(2 + headers.size() + httpconf.add_response_headers.size());
+
+ auto response_status = http2::stringify_status(balloc, resp.http_status);
+
+ nva.push_back(http3::make_nv_ls_nocopy(":status", response_status));
+
+ for (auto &kv : headers) {
+ if (kv.name.empty() || kv.name[0] == ':') {
+ continue;
+ }
+ switch (kv.token) {
+ case http2::HD_CONNECTION:
+ case http2::HD_KEEP_ALIVE:
+ case http2::HD_PROXY_CONNECTION:
+ case http2::HD_TE:
+ case http2::HD_TRANSFER_ENCODING:
+ case http2::HD_UPGRADE:
+ continue;
+ }
+ nva.push_back(http3::make_nv_nocopy(kv.name, kv.value, kv.no_index));
+ }
+
+ if (!resp.fs.header(http2::HD_SERVER)) {
+ nva.push_back(http3::make_nv_ls_nocopy("server", config->http.server_name));
+ }
+
+ for (auto &p : httpconf.add_response_headers) {
+ nva.push_back(http3::make_nv_nocopy(p.name, p.value));
+ }
+
+ rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
+ nva.data(), nva.size(), data_read_ptr);
+ if (nghttp3_err_is_fatal(rv)) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ auto buf = downstream->get_response_buf();
+
+ buf->append(body, bodylen);
+
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+
+ if (data_read_ptr) {
+ downstream->reset_upstream_wtimer();
+ }
+
+ if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
+ 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::initiate_push(Downstream *downstream, const StringRef &uri) {
+ return 0;
+}
+
+int Http3Upstream::response_riovec(struct iovec *iov, int iovcnt) const {
+ return 0;
+}
+
+void Http3Upstream::response_drain(size_t n) {}
+
+bool Http3Upstream::response_empty() const { return false; }
+
+Downstream *
+Http3Upstream::on_downstream_push_promise(Downstream *downstream,
+ int32_t promised_stream_id) {
+ return nullptr;
+}
+
+int Http3Upstream::on_downstream_push_promise_complete(
+ Downstream *downstream, Downstream *promised_downstream) {
+ return 0;
+}
+
+bool Http3Upstream::push_enabled() const { return false; }
+
+void Http3Upstream::cancel_premature_downstream(
+ Downstream *promised_downstream) {}
+
+int Http3Upstream::on_read(const UpstreamAddr *faddr,
+ const Address &remote_addr,
+ const Address &local_addr, const ngtcp2_pkt_info &pi,
+ const uint8_t *data, size_t datalen) {
+ int rv;
+
+ auto path = ngtcp2_path{
+ {
+ const_cast<sockaddr *>(&local_addr.su.sa),
+ static_cast<socklen_t>(local_addr.len),
+ },
+ {
+ const_cast<sockaddr *>(&remote_addr.su.sa),
+ static_cast<socklen_t>(remote_addr.len),
+ },
+ const_cast<UpstreamAddr *>(faddr),
+ };
+
+ rv = ngtcp2_conn_read_pkt(conn_, &path, &pi, data, datalen, quic_timestamp());
+ if (rv != 0) {
+ switch (rv) {
+ case NGTCP2_ERR_DRAINING:
+ return -1;
+ case NGTCP2_ERR_RETRY: {
+ auto worker = handler_->get_worker();
+ auto quic_conn_handler = worker->get_quic_connection_handler();
+
+ if (worker->get_graceful_shutdown()) {
+ ngtcp2_ccerr_set_transport_error(&last_error_,
+ NGTCP2_CONNECTION_REFUSED, nullptr, 0);
+
+ return handle_error();
+ }
+
+ ngtcp2_version_cid vc;
+
+ rv =
+ ngtcp2_pkt_decode_version_cid(&vc, data, datalen, SHRPX_QUIC_SCIDLEN);
+ if (rv != 0) {
+ return -1;
+ }
+
+ retry_close_ = true;
+
+ quic_conn_handler->send_retry(handler_->get_upstream_addr(), vc.version,
+ vc.dcid, vc.dcidlen, vc.scid, vc.scidlen,
+ remote_addr, local_addr, datalen * 3);
+
+ return -1;
+ }
+ case NGTCP2_ERR_CRYPTO:
+ if (!last_error_.error_code) {
+ ngtcp2_ccerr_set_tls_alert(
+ &last_error_, ngtcp2_conn_get_tls_alert(conn_), nullptr, 0);
+ }
+ break;
+ case NGTCP2_ERR_DROP_CONN:
+ return -1;
+ default:
+ if (!last_error_.error_code) {
+ ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
+ }
+ }
+
+ ULOG(ERROR, this) << "ngtcp2_conn_read_pkt: " << ngtcp2_strerror(rv);
+
+ return handle_error();
+ }
+
+ return 0;
+}
+
+int Http3Upstream::send_packet(const UpstreamAddr *faddr,
+ const sockaddr *remote_sa, size_t remote_salen,
+ const sockaddr *local_sa, size_t local_salen,
+ const ngtcp2_pkt_info &pi, const uint8_t *data,
+ size_t datalen, size_t gso_size) {
+ auto rv = quic_send_packet(faddr, remote_sa, remote_salen, local_sa,
+ local_salen, pi, data, datalen, gso_size);
+ switch (rv) {
+ case 0:
+ return 0;
+ // With GSO, sendmsg may fail with EINVAL if UDP payload is too
+ // large.
+ case -EINVAL:
+ case -EMSGSIZE:
+ // Let the packet lost.
+ break;
+ case -EAGAIN:
+#if EAGAIN != EWOULDBLOCK
+ case -EWOULDBLOCK:
+#endif // EAGAIN != EWOULDBLOCK
+ return SHRPX_ERR_SEND_BLOCKED;
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+void Http3Upstream::on_send_blocked(const UpstreamAddr *faddr,
+ const ngtcp2_addr &remote_addr,
+ const ngtcp2_addr &local_addr,
+ const ngtcp2_pkt_info &pi,
+ const uint8_t *data, size_t datalen,
+ size_t gso_size) {
+ assert(tx_.num_blocked || !tx_.send_blocked);
+ assert(tx_.num_blocked < 2);
+
+ tx_.send_blocked = true;
+
+ auto &p = tx_.blocked[tx_.num_blocked++];
+
+ memcpy(&p.local_addr.su, local_addr.addr, local_addr.addrlen);
+ memcpy(&p.remote_addr.su, remote_addr.addr, remote_addr.addrlen);
+
+ p.local_addr.len = local_addr.addrlen;
+ p.remote_addr.len = remote_addr.addrlen;
+ p.faddr = faddr;
+ p.pi = pi;
+ p.data = data;
+ p.datalen = datalen;
+ p.gso_size = gso_size;
+}
+
+int Http3Upstream::send_blocked_packet() {
+ int rv;
+
+ assert(tx_.send_blocked);
+
+ for (; tx_.num_blocked_sent < tx_.num_blocked; ++tx_.num_blocked_sent) {
+ auto &p = tx_.blocked[tx_.num_blocked_sent];
+
+ rv = send_packet(p.faddr, &p.remote_addr.su.sa, p.remote_addr.len,
+ &p.local_addr.su.sa, p.local_addr.len, p.pi, p.data,
+ p.datalen, p.gso_size);
+ if (rv == SHRPX_ERR_SEND_BLOCKED) {
+ signal_write_upstream_addr(p.faddr);
+
+ return 0;
+ }
+ }
+
+ tx_.send_blocked = false;
+ tx_.num_blocked = 0;
+ tx_.num_blocked_sent = 0;
+
+ return 0;
+}
+
+void Http3Upstream::signal_write_upstream_addr(const UpstreamAddr *faddr) {
+ auto conn = handler_->get_connection();
+
+ if (faddr->fd != conn->wev.fd) {
+ if (ev_is_active(&conn->wev)) {
+ ev_io_stop(handler_->get_loop(), &conn->wev);
+ }
+
+ ev_io_set(&conn->wev, faddr->fd, EV_WRITE);
+ }
+
+ conn->wlimit.startw();
+}
+
+int Http3Upstream::handle_error() {
+ if (ngtcp2_conn_in_closing_period(conn_) ||
+ ngtcp2_conn_in_draining_period(conn_)) {
+ return -1;
+ }
+
+ ngtcp2_path_storage ps;
+ ngtcp2_pkt_info pi;
+
+ ngtcp2_path_storage_zero(&ps);
+
+ auto ts = quic_timestamp();
+
+ conn_close_.resize(SHRPX_QUIC_CONN_CLOSE_PKTLEN);
+
+ auto nwrite = ngtcp2_conn_write_connection_close(
+ conn_, &ps.path, &pi, conn_close_.data(), conn_close_.size(),
+ &last_error_, ts);
+ if (nwrite < 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_write_connection_close: "
+ << ngtcp2_strerror(nwrite);
+ return -1;
+ }
+
+ conn_close_.resize(nwrite);
+
+ if (nwrite == 0) {
+ return -1;
+ }
+
+ send_packet(static_cast<UpstreamAddr *>(ps.path.user_data),
+ ps.path.remote.addr, ps.path.remote.addrlen, ps.path.local.addr,
+ ps.path.local.addrlen, pi, conn_close_.data(), nwrite, 0);
+
+ return -1;
+}
+
+int Http3Upstream::handle_expiry() {
+ int rv;
+
+ auto ts = quic_timestamp();
+
+ rv = ngtcp2_conn_handle_expiry(conn_, ts);
+ if (rv != 0) {
+ if (rv == NGTCP2_ERR_IDLE_CLOSE) {
+ ULOG(INFO, this) << "Idle connection timeout";
+ } else {
+ ULOG(ERROR, this) << "ngtcp2_conn_handle_expiry: " << ngtcp2_strerror(rv);
+ }
+ ngtcp2_ccerr_set_liberr(&last_error_, rv, nullptr, 0);
+ return handle_error();
+ }
+
+ return 0;
+}
+
+void Http3Upstream::reset_timer() {
+ auto ts = quic_timestamp();
+ auto expiry_ts = ngtcp2_conn_get_expiry(conn_);
+ auto loop = handler_->get_loop();
+
+ if (expiry_ts <= ts) {
+ ev_feed_event(loop, &timer_, EV_TIMER);
+ return;
+ }
+
+ timer_.repeat = static_cast<ev_tstamp>(expiry_ts - ts) / NGTCP2_SECONDS;
+
+ ev_timer_again(loop, &timer_);
+}
+
+namespace {
+int http_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
+ size_t nconsumed, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ upstream->consume(stream_id, nconsumed);
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int http_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
+ uint64_t datalen, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ assert(downstream);
+
+ if (upstream->http_acked_stream_data(downstream, datalen) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_acked_stream_data(Downstream *downstream,
+ uint64_t datalen) {
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Stream " << downstream->get_stream_id() << " "
+ << datalen << " bytes acknowledged";
+ }
+
+ auto body = downstream->get_response_buf();
+ auto drained = body->drain_mark(datalen);
+ (void)drained;
+
+ assert(datalen == drained);
+
+ if (downstream->resume_read(SHRPX_NO_BUFFER, datalen) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int http_begin_request_headers(nghttp3_conn *conn, int64_t stream_id,
+ void *user_data, void *stream_user_data) {
+ if (!ngtcp2_is_bidi_stream(stream_id)) {
+ return 0;
+ }
+
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ upstream->http_begin_request_headers(stream_id);
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int http_recv_request_header(nghttp3_conn *conn, int64_t stream_id,
+ int32_t token, nghttp3_rcbuf *name,
+ nghttp3_rcbuf *value, uint8_t flags,
+ void *user_data, void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (!downstream || downstream->get_stop_reading()) {
+ return 0;
+ }
+
+ if (upstream->http_recv_request_header(downstream, token, name, value, flags,
+ /* trailer = */ false) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+namespace {
+int http_recv_request_trailer(nghttp3_conn *conn, int64_t stream_id,
+ int32_t token, nghttp3_rcbuf *name,
+ nghttp3_rcbuf *value, uint8_t flags,
+ void *user_data, void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (!downstream || downstream->get_stop_reading()) {
+ return 0;
+ }
+
+ if (upstream->http_recv_request_header(downstream, token, name, value, flags,
+ /* trailer = */ true) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_recv_request_header(Downstream *downstream,
+ int32_t h3token,
+ nghttp3_rcbuf *name,
+ nghttp3_rcbuf *value, uint8_t flags,
+ bool trailer) {
+ auto namebuf = nghttp3_rcbuf_get_buf(name);
+ auto valuebuf = nghttp3_rcbuf_get_buf(value);
+ auto &req = downstream->request();
+ auto config = get_config();
+ auto &httpconf = config->http;
+
+ if (req.fs.buffer_size() + namebuf.len + valuebuf.len >
+ httpconf.request_header_field_buffer ||
+ req.fs.num_fields() >= httpconf.max_request_header_fields) {
+ downstream->set_stop_reading(true);
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return 0;
+ }
+
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Too large or many header field size="
+ << req.fs.buffer_size() + namebuf.len + valuebuf.len
+ << ", num=" << req.fs.num_fields() + 1;
+ }
+
+ // just ignore if this is a trailer part.
+ if (trailer) {
+ return 0;
+ }
+
+ if (error_reply(downstream, 431) != 0) {
+ return -1;
+ }
+
+ return 0;
+ }
+
+ auto token = http2::lookup_token(namebuf.base, namebuf.len);
+ auto no_index = flags & NGHTTP3_NV_FLAG_NEVER_INDEX;
+
+ downstream->add_rcbuf(name);
+ downstream->add_rcbuf(value);
+
+ if (trailer) {
+ req.fs.add_trailer_token(StringRef{namebuf.base, namebuf.len},
+ StringRef{valuebuf.base, valuebuf.len}, no_index,
+ token);
+ return 0;
+ }
+
+ req.fs.add_header_token(StringRef{namebuf.base, namebuf.len},
+ StringRef{valuebuf.base, valuebuf.len}, no_index,
+ token);
+ return 0;
+}
+
+namespace {
+int http_end_request_headers(nghttp3_conn *conn, int64_t stream_id, int fin,
+ void *user_data, void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto handler = upstream->get_client_handler();
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (!downstream || downstream->get_stop_reading()) {
+ return 0;
+ }
+
+ if (upstream->http_end_request_headers(downstream, fin) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ downstream->reset_upstream_rtimer();
+ handler->stop_read_timer();
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_end_request_headers(Downstream *downstream, int fin) {
+ auto lgconf = log_config();
+ lgconf->update_tstamp(std::chrono::system_clock::now());
+ auto &req = downstream->request();
+ req.tstamp = lgconf->tstamp;
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return 0;
+ }
+
+ auto &nva = req.fs.headers();
+
+ if (LOG_ENABLED(INFO)) {
+ std::stringstream ss;
+ for (auto &nv : nva) {
+ if (nv.name == "authorization") {
+ ss << TTY_HTTP_HD << nv.name << TTY_RST << ": <redacted>\n";
+ continue;
+ }
+ ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
+ }
+ ULOG(INFO, this) << "HTTP request headers. stream_id="
+ << downstream->get_stream_id() << "\n"
+ << ss.str();
+ }
+
+ auto content_length = req.fs.header(http2::HD_CONTENT_LENGTH);
+ if (content_length) {
+ // libnghttp3 guarantees this can be parsed
+ req.fs.content_length = util::parse_uint(content_length->value);
+ }
+
+ // presence of mandatory header fields are guaranteed by libnghttp3.
+ auto authority = req.fs.header(http2::HD__AUTHORITY);
+ auto path = req.fs.header(http2::HD__PATH);
+ auto method = req.fs.header(http2::HD__METHOD);
+ auto scheme = req.fs.header(http2::HD__SCHEME);
+
+ auto method_token = http2::lookup_method_token(method->value);
+ if (method_token == -1) {
+ if (error_reply(downstream, 501) != 0) {
+ return -1;
+ }
+ return 0;
+ }
+
+ auto faddr = handler_->get_upstream_addr();
+
+ auto config = get_config();
+
+ // For HTTP/2 proxy, we require :authority.
+ if (method_token != HTTP_CONNECT && config->http2_proxy &&
+ faddr->alt_mode == UpstreamAltMode::NONE && !authority) {
+ shutdown_stream(downstream, NGHTTP3_H3_GENERAL_PROTOCOL_ERROR);
+ return 0;
+ }
+
+ req.method = method_token;
+ if (scheme) {
+ req.scheme = scheme->value;
+ }
+
+ // nghttp2 library guarantees either :authority or host exist
+ if (!authority) {
+ req.no_authority = true;
+ authority = req.fs.header(http2::HD_HOST);
+ }
+
+ if (authority) {
+ req.authority = authority->value;
+ }
+
+ if (path) {
+ if (method_token == HTTP_OPTIONS &&
+ path->value == StringRef::from_lit("*")) {
+ // Server-wide OPTIONS request. Path is empty.
+ } else if (config->http2_proxy &&
+ faddr->alt_mode == UpstreamAltMode::NONE) {
+ req.path = path->value;
+ } else {
+ req.path = http2::rewrite_clean_path(downstream->get_block_allocator(),
+ path->value);
+ }
+ }
+
+ auto connect_proto = req.fs.header(http2::HD__PROTOCOL);
+ if (connect_proto) {
+ if (connect_proto->value != "websocket") {
+ if (error_reply(downstream, 400) != 0) {
+ return -1;
+ }
+ return 0;
+ }
+ req.connect_proto = ConnectProto::WEBSOCKET;
+ }
+
+ if (!fin) {
+ req.http2_expect_body = true;
+ } else if (req.fs.content_length == -1) {
+ req.fs.content_length = 0;
+ }
+
+ downstream->inspect_http2_request();
+
+ downstream->set_request_state(DownstreamState::HEADER_COMPLETE);
+
+ if (config->http.require_http_scheme &&
+ !http::check_http_scheme(req.scheme, /* encrypted = */ true)) {
+ if (error_reply(downstream, 400) != 0) {
+ return -1;
+ }
+ }
+
+#ifdef HAVE_MRUBY
+ auto worker = handler_->get_worker();
+ auto mruby_ctx = worker->get_mruby_context();
+
+ if (mruby_ctx->run_on_request_proc(downstream) != 0) {
+ if (error_reply(downstream, 500) != 0) {
+ return -1;
+ }
+ return 0;
+ }
+#endif // HAVE_MRUBY
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return 0;
+ }
+
+ start_downstream(downstream);
+
+ return 0;
+}
+
+void Http3Upstream::start_downstream(Downstream *downstream) {
+ if (downstream_queue_.can_activate(downstream->request().authority)) {
+ initiate_downstream(downstream);
+ return;
+ }
+
+ downstream_queue_.mark_blocked(downstream);
+}
+
+void Http3Upstream::initiate_downstream(Downstream *downstream) {
+ int rv;
+
+#ifdef HAVE_MRUBY
+ DownstreamConnection *dconn_ptr;
+#endif // HAVE_MRUBY
+
+ for (;;) {
+ auto dconn = handler_->get_downstream_connection(rv, downstream);
+ if (!dconn) {
+ if (rv == SHRPX_ERR_TLS_REQUIRED) {
+ assert(0);
+ abort();
+ }
+
+ rv = error_reply(downstream, 502);
+ if (rv != 0) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+
+ downstream->set_request_state(DownstreamState::CONNECT_FAIL);
+ downstream_queue_.mark_failure(downstream);
+
+ return;
+ }
+
+#ifdef HAVE_MRUBY
+ dconn_ptr = dconn.get();
+#endif // HAVE_MRUBY
+ rv = downstream->attach_downstream_connection(std::move(dconn));
+ if (rv == 0) {
+ break;
+ }
+ }
+
+#ifdef HAVE_MRUBY
+ const auto &group = dconn_ptr->get_downstream_addr_group();
+ if (group) {
+ const auto &mruby_ctx = group->shared_addr->mruby_ctx;
+ if (mruby_ctx->run_on_request_proc(downstream) != 0) {
+ if (error_reply(downstream, 500) != 0) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+
+ downstream_queue_.mark_failure(downstream);
+
+ return;
+ }
+
+ if (downstream->get_response_state() == DownstreamState::MSG_COMPLETE) {
+ return;
+ }
+ }
+#endif // HAVE_MRUBY
+
+ rv = downstream->push_request_headers();
+ if (rv != 0) {
+
+ if (error_reply(downstream, 502) != 0) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+
+ downstream_queue_.mark_failure(downstream);
+
+ return;
+ }
+
+ downstream_queue_.mark_active(downstream);
+
+ auto &req = downstream->request();
+ if (!req.http2_expect_body) {
+ rv = downstream->end_upload_data();
+ if (rv != 0) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+ }
+}
+
+namespace {
+int http_recv_data(nghttp3_conn *conn, int64_t stream_id, const uint8_t *data,
+ size_t datalen, void *user_data, void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (upstream->http_recv_data(downstream, data, datalen) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_recv_data(Downstream *downstream, const uint8_t *data,
+ size_t datalen) {
+ downstream->reset_upstream_rtimer();
+
+ if (downstream->push_upload_data_chunk(data, datalen) != 0) {
+ if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+
+ consume(downstream->get_stream_id(), datalen);
+
+ return 0;
+ }
+
+ return 0;
+}
+
+namespace {
+int http_end_stream(nghttp3_conn *conn, int64_t stream_id, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (!downstream || downstream->get_stop_reading()) {
+ return 0;
+ }
+
+ if (upstream->http_end_stream(downstream) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_end_stream(Downstream *downstream) {
+ downstream->disable_upstream_rtimer();
+
+ if (downstream->end_upload_data() != 0) {
+ if (downstream->get_response_state() != DownstreamState::MSG_COMPLETE) {
+ shutdown_stream(downstream, NGHTTP3_H3_INTERNAL_ERROR);
+ }
+ }
+
+ downstream->set_request_state(DownstreamState::MSG_COMPLETE);
+
+ return 0;
+}
+
+namespace {
+int http_stream_close(nghttp3_conn *conn, int64_t stream_id,
+ uint64_t app_error_code, void *conn_user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(conn_user_data);
+ auto downstream = static_cast<Downstream *>(stream_user_data);
+
+ if (!downstream) {
+ return 0;
+ }
+
+ if (upstream->http_stream_close(downstream, app_error_code) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_stream_close(Downstream *downstream,
+ uint64_t app_error_code) {
+ auto stream_id = downstream->get_stream_id();
+
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Stream stream_id=" << stream_id
+ << " is being closed with app_error_code="
+ << app_error_code;
+
+ auto body = downstream->get_response_buf();
+
+ ULOG(INFO, this) << "response unacked_left=" << body->rleft()
+ << " not_sent=" << body->rleft_mark();
+ }
+
+ auto &req = downstream->request();
+
+ consume(stream_id, req.unconsumed_body_length);
+
+ req.unconsumed_body_length = 0;
+
+ ngtcp2_conn_extend_max_streams_bidi(conn_, 1);
+
+ if (downstream->get_request_state() == DownstreamState::CONNECT_FAIL) {
+ remove_downstream(downstream);
+ // downstream was deleted
+
+ return 0;
+ }
+
+ if (downstream->can_detach_downstream_connection()) {
+ // Keep-alive
+ downstream->detach_downstream_connection();
+ }
+
+ downstream->set_request_state(DownstreamState::STREAM_CLOSED);
+
+ // At this point, downstream read may be paused.
+
+ // If shrpx_downstream::push_request_headers() failed, the
+ // error is handled here.
+ remove_downstream(downstream);
+ // downstream was deleted
+
+ return 0;
+}
+
+namespace {
+int http_stop_sending(nghttp3_conn *conn, int64_t stream_id,
+ uint64_t app_error_code, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->http_stop_sending(stream_id, app_error_code) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_stop_sending(int64_t stream_id,
+ uint64_t app_error_code) {
+ auto rv =
+ ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id, app_error_code);
+ if (ngtcp2_err_is_fatal(rv)) {
+ ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_read: "
+ << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+namespace {
+int http_reset_stream(nghttp3_conn *conn, int64_t stream_id,
+ uint64_t app_error_code, void *user_data,
+ void *stream_user_data) {
+ auto upstream = static_cast<Http3Upstream *>(user_data);
+
+ if (upstream->http_reset_stream(stream_id, app_error_code) != 0) {
+ return NGHTTP3_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+} // namespace
+
+int Http3Upstream::http_reset_stream(int64_t stream_id,
+ uint64_t app_error_code) {
+ auto rv =
+ ngtcp2_conn_shutdown_stream_write(conn_, 0, stream_id, app_error_code);
+ if (ngtcp2_err_is_fatal(rv)) {
+ ULOG(ERROR, this) << "ngtcp2_conn_shutdown_stream_write: "
+ << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::setup_httpconn() {
+ int rv;
+
+ if (ngtcp2_conn_get_streams_uni_left(conn_) < 3) {
+ return -1;
+ }
+
+ nghttp3_callbacks callbacks{
+ shrpx::http_acked_stream_data,
+ shrpx::http_stream_close,
+ shrpx::http_recv_data,
+ http_deferred_consume,
+ shrpx::http_begin_request_headers,
+ shrpx::http_recv_request_header,
+ shrpx::http_end_request_headers,
+ nullptr, // begin_trailers
+ shrpx::http_recv_request_trailer,
+ nullptr, // end_trailers
+ shrpx::http_stop_sending,
+ shrpx::http_end_stream,
+ shrpx::http_reset_stream,
+ };
+
+ auto config = get_config();
+
+ nghttp3_settings settings;
+ nghttp3_settings_default(&settings);
+ settings.qpack_max_dtable_capacity = 4_k;
+
+ if (!config->http2_proxy) {
+ settings.enable_connect_protocol = 1;
+ }
+
+ auto mem = nghttp3_mem_default();
+
+ rv = nghttp3_conn_server_new(&httpconn_, &callbacks, &settings, mem, this);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_server_new: " << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ auto params = ngtcp2_conn_get_local_transport_params(conn_);
+
+ nghttp3_conn_set_max_client_streams_bidi(httpconn_,
+ params->initial_max_streams_bidi);
+
+ int64_t ctrl_stream_id;
+
+ rv = ngtcp2_conn_open_uni_stream(conn_, &ctrl_stream_id, nullptr);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ rv = nghttp3_conn_bind_control_stream(httpconn_, ctrl_stream_id);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_bind_control_stream: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ int64_t qpack_enc_stream_id, qpack_dec_stream_id;
+
+ rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_enc_stream_id, nullptr);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ rv = ngtcp2_conn_open_uni_stream(conn_, &qpack_dec_stream_id, nullptr);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "ngtcp2_conn_open_uni_stream: " << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ rv = nghttp3_conn_bind_qpack_streams(httpconn_, qpack_enc_stream_id,
+ qpack_dec_stream_id);
+ if (rv != 0) {
+ ULOG(ERROR, this) << "nghttp3_conn_bind_qpack_streams: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::error_reply(Downstream *downstream,
+ unsigned int status_code) {
+ int rv;
+ auto &resp = downstream->response();
+
+ auto &balloc = downstream->get_block_allocator();
+
+ auto html = http::create_error_html(balloc, status_code);
+ resp.http_status = status_code;
+ auto body = downstream->get_response_buf();
+ body->append(html);
+ downstream->set_response_state(DownstreamState::MSG_COMPLETE);
+
+ nghttp3_data_reader data_read;
+ data_read.read_data = downstream_read_data_callback;
+
+ auto lgconf = log_config();
+ lgconf->update_tstamp(std::chrono::system_clock::now());
+
+ auto response_status = http2::stringify_status(balloc, status_code);
+ auto content_length = util::make_string_ref_uint(balloc, html.size());
+ auto date = make_string_ref(balloc, lgconf->tstamp->time_http);
+
+ auto nva = std::array<nghttp3_nv, 5>{
+ {http3::make_nv_ls_nocopy(":status", response_status),
+ http3::make_nv_ll("content-type", "text/html; charset=UTF-8"),
+ http3::make_nv_ls_nocopy("server", get_config()->http.server_name),
+ http3::make_nv_ls_nocopy("content-length", content_length),
+ http3::make_nv_ls_nocopy("date", date)}};
+
+ rv = nghttp3_conn_submit_response(httpconn_, downstream->get_stream_id(),
+ nva.data(), nva.size(), &data_read);
+ if (nghttp3_err_is_fatal(rv)) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_response() failed: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ downstream->reset_upstream_wtimer();
+
+ if (shutdown_stream_read(downstream->get_stream_id(), NGHTTP3_H3_NO_ERROR) !=
+ 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::shutdown_stream(Downstream *downstream,
+ uint64_t app_error_code) {
+ auto stream_id = downstream->get_stream_id();
+
+ if (LOG_ENABLED(INFO)) {
+ ULOG(INFO, this) << "Shutdown stream_id=" << stream_id
+ << " with app_error_code=" << app_error_code;
+ }
+
+ auto rv = ngtcp2_conn_shutdown_stream(conn_, 0, stream_id, app_error_code);
+ if (rv != 0) {
+ ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream() failed: "
+ << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+int Http3Upstream::shutdown_stream_read(int64_t stream_id,
+ uint64_t app_error_code) {
+ auto rv = ngtcp2_conn_shutdown_stream_read(conn_, 0, stream_id,
+ NGHTTP3_H3_NO_ERROR);
+ if (ngtcp2_err_is_fatal(rv)) {
+ ULOG(FATAL, this) << "ngtcp2_conn_shutdown_stream_read: "
+ << ngtcp2_strerror(rv);
+ return -1;
+ }
+
+ return 0;
+}
+
+void Http3Upstream::consume(int64_t stream_id, size_t nconsumed) {
+ ngtcp2_conn_extend_max_stream_offset(conn_, stream_id, nconsumed);
+ ngtcp2_conn_extend_max_offset(conn_, nconsumed);
+}
+
+void Http3Upstream::remove_downstream(Downstream *downstream) {
+ if (downstream->accesslog_ready()) {
+ handler_->write_accesslog(downstream);
+ }
+
+ nghttp3_conn_set_stream_user_data(httpconn_, downstream->get_stream_id(),
+ nullptr);
+
+ auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
+
+ if (next_downstream) {
+ initiate_downstream(next_downstream);
+ }
+
+ if (downstream_queue_.get_downstreams() == nullptr) {
+ // There is no downstream at the moment. Start idle timer now.
+ handler_->repeat_read_timer();
+ }
+}
+
+void Http3Upstream::log_response_headers(
+ Downstream *downstream, const std::vector<nghttp3_nv> &nva) const {
+ std::stringstream ss;
+ for (auto &nv : nva) {
+ ss << TTY_HTTP_HD << StringRef{nv.name, nv.namelen} << TTY_RST << ": "
+ << StringRef{nv.value, nv.valuelen} << "\n";
+ }
+ ULOG(INFO, this) << "HTTP response headers. stream_id="
+ << downstream->get_stream_id() << "\n"
+ << ss.str();
+}
+
+int Http3Upstream::check_shutdown() {
+ auto worker = handler_->get_worker();
+
+ if (!worker->get_graceful_shutdown()) {
+ return 0;
+ }
+
+ ev_prepare_stop(handler_->get_loop(), &prep_);
+
+ return start_graceful_shutdown();
+}
+
+int Http3Upstream::start_graceful_shutdown() {
+ int rv;
+
+ if (ev_is_active(&shutdown_timer_)) {
+ return 0;
+ }
+
+ if (!httpconn_) {
+ return -1;
+ }
+
+ rv = nghttp3_conn_submit_shutdown_notice(httpconn_);
+ if (rv != 0) {
+ ULOG(FATAL, this) << "nghttp3_conn_submit_shutdown_notice: "
+ << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ handler_->signal_write();
+
+ auto t = ngtcp2_conn_get_pto(conn_);
+
+ ev_timer_set(&shutdown_timer_, static_cast<ev_tstamp>(t * 3) / NGTCP2_SECONDS,
+ 0.);
+ ev_timer_start(handler_->get_loop(), &shutdown_timer_);
+
+ return 0;
+}
+
+int Http3Upstream::submit_goaway() {
+ int rv;
+
+ rv = nghttp3_conn_shutdown(httpconn_);
+ if (rv != 0) {
+ ULOG(FATAL, this) << "nghttp3_conn_shutdown: " << nghttp3_strerror(rv);
+ return -1;
+ }
+
+ handler_->signal_write();
+
+ return 0;
+}
+
+int Http3Upstream::open_qlog_file(const StringRef &dir,
+ const ngtcp2_cid &scid) const {
+ std::array<char, sizeof("20141115T125824.741+0900")> buf;
+
+ auto path = dir.str();
+ path += '/';
+ path +=
+ util::format_iso8601_basic(buf.data(), std::chrono::system_clock::now());
+ path += '-';
+ path += util::format_hex(scid.data, scid.datalen);
+ path += ".sqlog";
+
+ int fd;
+
+#ifdef O_CLOEXEC
+ while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
+ S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
+ errno == EINTR)
+ ;
+#else // !O_CLOEXEC
+ while ((fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC,
+ S_IRUSR | S_IWUSR | S_IRGRP)) == -1 &&
+ errno == EINTR)
+ ;
+
+ if (fd != -1) {
+ util::make_socket_closeonexec(fd);
+ }
+#endif // !O_CLOEXEC
+
+ if (fd == -1) {
+ auto error = errno;
+ ULOG(ERROR, this) << "Failed to open qlog file " << path
+ << ": errno=" << error;
+ return -1;
+ }
+
+ return fd;
+}
+
+ngtcp2_conn *Http3Upstream::get_conn() const { return conn_; }
+
+} // namespace shrpx