summaryrefslogtreecommitdiffstats
path: root/src/shrpx_worker.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 07:34:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 07:34:15 +0000
commitafea5f9539cbf1eeaa85ec77d79eb2f59724f470 (patch)
tree2a4eba394a6bc60d2eaa8304d91168a07225d51e /src/shrpx_worker.h
parentInitial commit. (diff)
downloadnghttp2-upstream/1.52.0.tar.xz
nghttp2-upstream/1.52.0.zip
Adding upstream version 1.52.0.upstream/1.52.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--src/shrpx_worker.h480
1 files changed, 480 insertions, 0 deletions
diff --git a/src/shrpx_worker.h b/src/shrpx_worker.h
new file mode 100644
index 0000000..3cc7b57
--- /dev/null
+++ b/src/shrpx_worker.h
@@ -0,0 +1,480 @@
+/*
+ * nghttp2 - HTTP/2 C Library
+ *
+ * Copyright (c) 2012 Tatsuhiro Tsujikawa
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+#ifndef SHRPX_WORKER_H
+#define SHRPX_WORKER_H
+
+#include "shrpx.h"
+
+#include <mutex>
+#include <vector>
+#include <random>
+#include <unordered_map>
+#include <deque>
+#include <thread>
+#include <queue>
+#ifndef NOTHREADS
+# include <future>
+#endif // NOTHREADS
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include <ev.h>
+
+#include "shrpx_config.h"
+#include "shrpx_downstream_connection_pool.h"
+#include "memchunk.h"
+#include "shrpx_tls.h"
+#include "shrpx_live_check.h"
+#include "shrpx_connect_blocker.h"
+#include "shrpx_dns_tracker.h"
+#ifdef ENABLE_HTTP3
+# include "shrpx_quic_connection_handler.h"
+# include "shrpx_quic.h"
+#endif // ENABLE_HTTP3
+#include "allocator.h"
+
+using namespace nghttp2;
+
+namespace shrpx {
+
+class Http2Session;
+class ConnectBlocker;
+class MemcachedDispatcher;
+struct UpstreamAddr;
+class ConnectionHandler;
+#ifdef ENABLE_HTTP3
+class QUICListener;
+#endif // ENABLE_HTTP3
+
+#ifdef HAVE_MRUBY
+namespace mruby {
+
+class MRubyContext;
+
+} // namespace mruby
+#endif // HAVE_MRUBY
+
+namespace tls {
+class CertLookupTree;
+} // namespace tls
+
+struct WeightGroup;
+
+struct DownstreamAddr {
+ Address addr;
+ // backend address. If |host_unix| is true, this is UNIX domain
+ // socket path.
+ StringRef host;
+ StringRef hostport;
+ // backend port. 0 if |host_unix| is true.
+ uint16_t port;
+ // true if |host| contains UNIX domain socket path.
+ bool host_unix;
+
+ // sni field to send remote server if TLS is enabled.
+ StringRef sni;
+
+ std::unique_ptr<ConnectBlocker> connect_blocker;
+ std::unique_ptr<LiveCheck> live_check;
+ // Connection pool for this particular address if session affinity
+ // is enabled
+ std::unique_ptr<DownstreamConnectionPool> dconn_pool;
+ size_t fall;
+ size_t rise;
+ // Client side TLS session cache
+ tls::TLSSessionCache tls_session_cache;
+ // List of Http2Session which is not fully utilized (i.e., the
+ // server advertised maximum concurrency is not reached). We will
+ // coalesce as much stream as possible in one Http2Session to fully
+ // utilize TCP connection.
+ DList<Http2Session> http2_extra_freelist;
+ WeightGroup *wg;
+ // total number of streams created in HTTP/2 connections for this
+ // address.
+ size_t num_dconn;
+ // the sequence number of this address to randomize the order access
+ // threads.
+ size_t seq;
+ // Application protocol used in this backend
+ Proto proto;
+ // cycle is used to prioritize this address. Lower value takes
+ // higher priority.
+ uint32_t cycle;
+ // penalty which is applied to the next cycle calculation.
+ uint32_t pending_penalty;
+ // Weight of this address inside a weight group. Its range is [1,
+ // 256], inclusive.
+ uint32_t weight;
+ // name of group which this address belongs to.
+ StringRef group;
+ // Weight of the weight group which this address belongs to. Its
+ // range is [1, 256], inclusive.
+ uint32_t group_weight;
+ // affinity hash for this address. It is assigned when strict
+ // stickiness is enabled.
+ uint32_t affinity_hash;
+ // true if TLS is used in this backend
+ bool tls;
+ // true if dynamic DNS is enabled
+ bool dns;
+ // true if :scheme pseudo header field should be upgraded to secure
+ // variant (e.g., "https") when forwarding request to a backend
+ // connected by TLS connection.
+ bool upgrade_scheme;
+ // true if this address is queued.
+ bool queued;
+};
+
+constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
+
+struct DownstreamAddrEntry {
+ DownstreamAddr *addr;
+ size_t seq;
+ uint32_t cycle;
+};
+
+struct DownstreamAddrEntryGreater {
+ bool operator()(const DownstreamAddrEntry &lhs,
+ const DownstreamAddrEntry &rhs) const {
+ auto d = lhs.cycle - rhs.cycle;
+ if (d == 0) {
+ return rhs.seq < lhs.seq;
+ }
+ return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
+ }
+};
+
+struct WeightGroup {
+ std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
+ DownstreamAddrEntryGreater>
+ pq;
+ size_t seq;
+ uint32_t weight;
+ uint32_t cycle;
+ uint32_t pending_penalty;
+ // true if this object is queued.
+ bool queued;
+};
+
+struct WeightGroupEntry {
+ WeightGroup *wg;
+ size_t seq;
+ uint32_t cycle;
+};
+
+struct WeightGroupEntryGreater {
+ bool operator()(const WeightGroupEntry &lhs,
+ const WeightGroupEntry &rhs) const {
+ auto d = lhs.cycle - rhs.cycle;
+ if (d == 0) {
+ return rhs.seq < lhs.seq;
+ }
+ return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
+ }
+};
+
+struct SharedDownstreamAddr {
+ SharedDownstreamAddr()
+ : balloc(1024, 1024),
+ affinity{SessionAffinity::NONE},
+ redirect_if_not_tls{false},
+ dnf{false},
+ timeout{} {}
+
+ SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
+ SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
+ SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
+ SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
+
+ BlockAllocator balloc;
+ std::vector<DownstreamAddr> addrs;
+ std::vector<WeightGroup> wgs;
+ std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
+ WeightGroupEntryGreater>
+ pq;
+ // Bunch of session affinity hash. Only used if affinity ==
+ // SessionAffinity::IP.
+ std::vector<AffinityHash> affinity_hash;
+ // Maps affinity hash of each DownstreamAddr to its index in addrs.
+ // It is only assigned when strict stickiness is enabled.
+ std::unordered_map<uint32_t, size_t> affinity_hash_map;
+#ifdef HAVE_MRUBY
+ std::shared_ptr<mruby::MRubyContext> mruby_ctx;
+#endif // HAVE_MRUBY
+ // Configuration for session affinity
+ AffinityConfig affinity;
+ // Session affinity
+ // true if this group requires that client connection must be TLS,
+ // and the request must be redirected to https URI.
+ bool redirect_if_not_tls;
+ // true if a request should not be forwarded to a backend.
+ bool dnf;
+ // Timeouts for backend connection.
+ struct {
+ ev_tstamp read;
+ ev_tstamp write;
+ } timeout;
+};
+
+struct DownstreamAddrGroup {
+ DownstreamAddrGroup();
+ ~DownstreamAddrGroup();
+
+ DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
+ DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
+ DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
+ DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
+
+ ImmutableString pattern;
+ std::shared_ptr<SharedDownstreamAddr> shared_addr;
+ // true if this group is no longer used for new request. If this is
+ // true, the connection made using one of address in shared_addr
+ // must not be pooled.
+ bool retired;
+};
+
+struct WorkerStat {
+ size_t num_connections;
+ size_t num_close_waits;
+};
+
+#ifdef ENABLE_HTTP3
+struct QUICPacket {
+ QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
+ const Address &local_addr, const ngtcp2_pkt_info &pi,
+ const uint8_t *data, size_t datalen)
+ : upstream_addr_index{upstream_addr_index},
+ remote_addr{remote_addr},
+ local_addr{local_addr},
+ pi{pi},
+ data{data, data + datalen} {}
+ QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {}
+ size_t upstream_addr_index;
+ Address remote_addr;
+ Address local_addr;
+ ngtcp2_pkt_info pi;
+ std::vector<uint8_t> data;
+};
+#endif // ENABLE_HTTP3
+
+enum class WorkerEventType {
+ NEW_CONNECTION = 0x01,
+ REOPEN_LOG = 0x02,
+ GRACEFUL_SHUTDOWN = 0x03,
+ REPLACE_DOWNSTREAM = 0x04,
+#ifdef ENABLE_HTTP3
+ QUIC_PKT_FORWARD = 0x05,
+#endif // ENABLE_HTTP3
+};
+
+struct WorkerEvent {
+ WorkerEventType type;
+ struct {
+ sockaddr_union client_addr;
+ size_t client_addrlen;
+ int client_fd;
+ const UpstreamAddr *faddr;
+ };
+ std::shared_ptr<TicketKeys> ticket_keys;
+ std::shared_ptr<DownstreamConfig> downstreamconf;
+#ifdef ENABLE_HTTP3
+ std::unique_ptr<QUICPacket> quic_pkt;
+#endif // ENABLE_HTTP3
+};
+
+class Worker {
+public:
+ Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
+ SSL_CTX *tls_session_cache_memcached_ssl_ctx,
+ tls::CertLookupTree *cert_tree,
+#ifdef ENABLE_HTTP3
+ SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
+ const uint8_t *cid_prefix, size_t cid_prefixlen,
+# ifdef HAVE_LIBBPF
+ size_t index,
+# endif // HAVE_LIBBPF
+#endif // ENABLE_HTTP3
+ const std::shared_ptr<TicketKeys> &ticket_keys,
+ ConnectionHandler *conn_handler,
+ std::shared_ptr<DownstreamConfig> downstreamconf);
+ ~Worker();
+ void run_async();
+ void wait();
+ void process_events();
+ void send(WorkerEvent event);
+
+ tls::CertLookupTree *get_cert_lookup_tree() const;
+#ifdef ENABLE_HTTP3
+ tls::CertLookupTree *get_quic_cert_lookup_tree() const;
+#endif // ENABLE_HTTP3
+
+ // These 2 functions make a lock m_ to get/set ticket keys
+ // atomically.
+ std::shared_ptr<TicketKeys> get_ticket_keys();
+ void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
+
+ WorkerStat *get_worker_stat();
+ struct ev_loop *get_loop() const;
+ SSL_CTX *get_sv_ssl_ctx() const;
+ SSL_CTX *get_cl_ssl_ctx() const;
+#ifdef ENABLE_HTTP3
+ SSL_CTX *get_quic_sv_ssl_ctx() const;
+#endif // ENABLE_HTTP3
+
+ void set_graceful_shutdown(bool f);
+ bool get_graceful_shutdown() const;
+
+ MemchunkPool *get_mcpool();
+ void schedule_clear_mcpool();
+
+ MemcachedDispatcher *get_session_cache_memcached_dispatcher();
+
+ std::mt19937 &get_randgen();
+
+#ifdef HAVE_MRUBY
+ int create_mruby_context();
+
+ mruby::MRubyContext *get_mruby_context() const;
+#endif // HAVE_MRUBY
+
+ std::vector<std::shared_ptr<DownstreamAddrGroup>> &
+ get_downstream_addr_groups();
+
+ ConnectBlocker *get_connect_blocker() const;
+
+ const DownstreamConfig *get_downstream_config() const;
+
+ void
+ replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
+
+ ConnectionHandler *get_connection_handler() const;
+
+#ifdef ENABLE_HTTP3
+ QUICConnectionHandler *get_quic_connection_handler();
+
+ int setup_quic_server_socket();
+
+ const uint8_t *get_cid_prefix() const;
+
+# ifdef HAVE_LIBBPF
+ bool should_attach_bpf() const;
+
+ bool should_update_bpf_map() const;
+
+ uint32_t compute_sk_index() const;
+# endif // HAVE_LIBBPF
+
+ int create_quic_server_socket(UpstreamAddr &addr);
+
+ // Returns a pointer to UpstreamAddr which matches |local_addr|.
+ const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
+#endif // ENABLE_HTTP3
+
+ DNSTracker *get_dns_tracker();
+
+private:
+#ifndef NOTHREADS
+ std::future<void> fut_;
+#endif // NOTHREADS
+#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
+ // Unique index of this worker.
+ size_t index_;
+#endif // ENABLE_HTTP3 && HAVE_LIBBPF
+ std::mutex m_;
+ std::deque<WorkerEvent> q_;
+ std::mt19937 randgen_;
+ ev_async w_;
+ ev_timer mcpool_clear_timer_;
+ ev_timer proc_wev_timer_;
+ MemchunkPool mcpool_;
+ WorkerStat worker_stat_;
+ DNSTracker dns_tracker_;
+
+#ifdef ENABLE_HTTP3
+ std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN> cid_prefix_;
+ std::vector<UpstreamAddr> quic_upstream_addrs_;
+ std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
+#endif // ENABLE_HTTP3
+
+ std::shared_ptr<DownstreamConfig> downstreamconf_;
+ std::unique_ptr<MemcachedDispatcher> session_cache_memcached_dispatcher_;
+#ifdef HAVE_MRUBY
+ std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
+#endif // HAVE_MRUBY
+ struct ev_loop *loop_;
+
+ // Following fields are shared across threads if
+ // get_config()->tls_ctx_per_worker == true.
+ SSL_CTX *sv_ssl_ctx_;
+ SSL_CTX *cl_ssl_ctx_;
+ tls::CertLookupTree *cert_tree_;
+ ConnectionHandler *conn_handler_;
+#ifdef ENABLE_HTTP3
+ SSL_CTX *quic_sv_ssl_ctx_;
+ tls::CertLookupTree *quic_cert_tree_;
+
+ QUICConnectionHandler quic_conn_handler_;
+#endif // ENABLE_HTTP3
+
+#ifndef HAVE_ATOMIC_STD_SHARED_PTR
+ std::mutex ticket_keys_m_;
+#endif // !HAVE_ATOMIC_STD_SHARED_PTR
+ std::shared_ptr<TicketKeys> ticket_keys_;
+ std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
+ // Worker level blocker for downstream connection. For example,
+ // this is used when file descriptor is exhausted.
+ std::unique_ptr<ConnectBlocker> connect_blocker_;
+
+ bool graceful_shutdown_;
+};
+
+// Selects group based on request's |hostport| and |path|. |hostport|
+// is the value taken from :authority or host header field, and may
+// contain port. The |path| may contain query part. We require the
+// catch-all pattern in place, so this function always selects one
+// group. The catch-all group index is given in |catch_all|. All
+// patterns are given in |groups|.
+size_t match_downstream_addr_group(
+ const RouterConfig &routerconfig, const StringRef &hostport,
+ const StringRef &path,
+ const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
+ size_t catch_all, BlockAllocator &balloc);
+
+// Calls this function if connecting to backend failed. |raddr| is
+// the actual address used to connect to backend, and it could be
+// nullptr. This function may schedule live check.
+void downstream_failure(DownstreamAddr *addr, const Address *raddr);
+
+#ifdef ENABLE_HTTP3
+// Creates unpredictable SHRPX_QUIC_CID_PREFIXLEN bytes sequence which
+// is used as a prefix of QUIC Connection ID. This function returns
+// -1 on failure. |server_id| must be 2 bytes long.
+int create_cid_prefix(uint8_t *cid_prefix, const uint8_t *server_id);
+#endif // ENABLE_HTTP3
+
+} // namespace shrpx
+
+#endif // SHRPX_WORKER_H