summaryrefslogtreecommitdiffstats
path: root/src/proto_rhttp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/proto_rhttp.c')
-rw-r--r--src/proto_rhttp.c464
1 files changed, 464 insertions, 0 deletions
diff --git a/src/proto_rhttp.c b/src/proto_rhttp.c
new file mode 100644
index 0000000..452ee32
--- /dev/null
+++ b/src/proto_rhttp.c
@@ -0,0 +1,464 @@
+#include <stdio.h>
+#include <string.h>
+
+#include <haproxy/api.h>
+#include <haproxy/connection.h>
+#include <haproxy/errors.h>
+#include <haproxy/intops.h>
+#include <haproxy/list.h>
+#include <haproxy/listener.h>
+#include <haproxy/log.h>
+#include <haproxy/proto_tcp.h>
+#include <haproxy/protocol.h>
+#include <haproxy/proxy.h>
+#include <haproxy/sample.h>
+#include <haproxy/server.h>
+#include <haproxy/sock.h>
+#include <haproxy/ssl_sock.h>
+#include <haproxy/task.h>
+
+#include <haproxy/proto_rhttp.h>
+
+struct proto_fam proto_fam_rhttp = {
+ .name = "rhttp",
+ .sock_domain = AF_CUST_RHTTP_SRV,
+ .sock_family = AF_INET,
+ .bind = rhttp_bind_receiver,
+};
+
+struct protocol proto_rhttp = {
+ .name = "rev",
+
+ /* connection layer (no outgoing connection) */
+ .listen = rhttp_bind_listener,
+ .enable = rhttp_enable_listener,
+ .disable = rhttp_disable_listener,
+ .add = default_add_listener,
+ .unbind = rhttp_unbind_receiver,
+ .resume = default_resume_listener,
+ .accept_conn = rhttp_accept_conn,
+ .set_affinity = rhttp_set_affinity,
+
+ /* address family */
+ .fam = &proto_fam_rhttp,
+
+ /* socket layer */
+ .proto_type = PROTO_TYPE_STREAM,
+ .sock_type = SOCK_STREAM,
+ .sock_prot = IPPROTO_TCP,
+ .rx_listening = rhttp_accepting_conn,
+ .receivers = LIST_HEAD_INIT(proto_rhttp.receivers),
+};
+
+static struct connection *new_reverse_conn(struct listener *l, struct server *srv)
+{
+ struct connection *conn = conn_new(srv);
+ struct sockaddr_storage *bind_addr = NULL;
+ if (!conn)
+ goto err;
+
+ HA_ATOMIC_INC(&th_ctx->nb_rhttp_conns);
+
+ conn_set_reverse(conn, &l->obj_type);
+
+ if (alloc_bind_address(&bind_addr, srv, srv->proxy, NULL) != SRV_STATUS_OK)
+ goto err;
+ conn->src = bind_addr;
+
+ sockaddr_alloc(&conn->dst, 0, 0);
+ if (!conn->dst)
+ goto err;
+ *conn->dst = srv->addr;
+ set_host_port(conn->dst, srv->svc_port);
+
+ if (conn_prepare(conn, protocol_lookup(conn->dst->ss_family, PROTO_TYPE_STREAM, 0), srv->xprt))
+ goto err;
+
+ if (conn->ctrl->connect(conn, 0) != SF_ERR_NONE)
+ goto err;
+
+#ifdef USE_OPENSSL
+ if (srv->ssl_ctx.sni) {
+ struct sample *sni_smp = NULL;
+ /* TODO remove NULL session which can cause crash depending on the SNI sample expr used. */
+ sni_smp = sample_fetch_as_type(srv->proxy, NULL, NULL,
+ SMP_OPT_DIR_REQ | SMP_OPT_FINAL,
+ srv->ssl_ctx.sni, SMP_T_STR);
+ if (smp_make_safe(sni_smp))
+ ssl_sock_set_servername(conn, sni_smp->data.u.str.area);
+ }
+#endif /* USE_OPENSSL */
+
+ if (conn_xprt_start(conn) < 0)
+ goto err;
+
+ if (!srv->use_ssl ||
+ (!srv->ssl_ctx.alpn_str && !srv->ssl_ctx.npn_str) ||
+ srv->mux_proto) {
+ if (conn_install_mux_be(conn, NULL, NULL, NULL) < 0)
+ goto err;
+ }
+
+ /* Not expected here. */
+ BUG_ON((conn->flags & CO_FL_HANDSHAKE));
+ return conn;
+
+ err:
+ if (conn) {
+ conn_stop_tracking(conn);
+ conn_xprt_shutw(conn);
+ conn_xprt_close(conn);
+ conn_sock_shutw(conn, 0);
+ conn_ctrl_close(conn);
+
+ if (conn->destroy_cb)
+ conn->destroy_cb(conn);
+
+ /* Mark connection as non-reversable. This prevents conn_free()
+ * to reschedule rhttp task on freeing a preconnect connection.
+ */
+ conn->reverse.target = NULL;
+ conn_free(conn);
+ }
+
+ return NULL;
+}
+
+/* Report that a connection used for preconnect on listener <l> is freed before
+ * reversal is completed. This is used to cleanup any reference to the
+ * connection and rearm a new preconnect attempt.
+ */
+void rhttp_notify_preconn_err(struct listener *l)
+{
+ /* Receiver must reference a reverse connection as pending. */
+ BUG_ON(!l->rx.rhttp.pend_conn);
+
+ /* Remove reference to the freed connection. */
+ l->rx.rhttp.pend_conn = NULL;
+
+ if (l->rx.rhttp.state != LI_PRECONN_ST_ERR) {
+ send_log(l->bind_conf->frontend, LOG_ERR,
+ "preconnect %s::%s: Error encountered.\n",
+ l->bind_conf->frontend->id, l->bind_conf->rhttp_srvname);
+ l->rx.rhttp.state = LI_PRECONN_ST_ERR;
+ }
+
+ /* Rearm a new preconnect attempt. */
+ l->rx.rhttp.task->expire = MS_TO_TICKS(now_ms + 1000);
+ task_queue(l->rx.rhttp.task);
+}
+
+/* Lookup over listener <l> threads for their current count of active reverse
+ * HTTP connections. Returns the less loaded thread ID.
+ */
+static unsigned int select_thread(struct listener *l)
+{
+ unsigned long mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+ unsigned int load_min = HA_ATOMIC_LOAD(&th_ctx->nb_rhttp_conns);
+ unsigned int load_thr;
+ unsigned int ret = tid;
+ int i;
+
+ /* Returns current tid if listener runs on one thread only. */
+ if (!atleast2(mask))
+ goto end;
+
+ /* Loop over all threads and return the less loaded one. This needs to
+ * be just an approximation so it's not important if the selected
+ * thread load has varied since its selection.
+ */
+
+ for (i = tg->base; mask; mask >>= 1, i++) {
+ if (!(mask & 0x1))
+ continue;
+
+ load_thr = HA_ATOMIC_LOAD(&ha_thread_ctx[i].nb_rhttp_conns);
+ if (load_min > load_thr) {
+ ret = i;
+ load_min = load_thr;
+ }
+ }
+
+ end:
+ return ret;
+}
+
+/* Detach <task> from its thread and assign it to <new_tid> thread. The task is
+ * queued to be woken up on the new thread.
+ */
+static void task_migrate(struct task *task, uint new_tid)
+{
+ task_unlink_wq(task);
+ task->expire = TICK_ETERNITY;
+ task_set_thread(task, new_tid);
+ task_wakeup(task, TASK_WOKEN_MSG);
+}
+
+struct task *rhttp_process(struct task *task, void *ctx, unsigned int state)
+{
+ struct listener *l = ctx;
+ struct connection *conn = l->rx.rhttp.pend_conn;
+
+ if (conn) {
+ /* Either connection is on error ot the connect timeout fired. */
+ if (conn->flags & CO_FL_ERROR || tick_is_expired(task->expire, now_ms)) {
+ /* If mux already instantiated, let it release the
+ * connection along with its context. Else do cleanup
+ * directly.
+ */
+ if (conn->mux && conn->mux->destroy) {
+ conn->mux->destroy(conn->ctx);
+ }
+ else {
+ conn_stop_tracking(conn);
+ conn_xprt_shutw(conn);
+ conn_xprt_close(conn);
+ conn_sock_shutw(conn, 0);
+ conn_ctrl_close(conn);
+
+ if (conn->destroy_cb)
+ conn->destroy_cb(conn);
+ conn_free(conn);
+ }
+
+ /* conn_free() must report preconnect failure using rhttp_notify_preconn_err(). */
+ BUG_ON(l->rx.rhttp.pend_conn);
+
+ l->rx.rhttp.task->expire = TICKS_TO_MS(now_ms);
+ }
+ else {
+ /* Spurious receiver task woken up despite pend_conn not ready/on error. */
+ BUG_ON(!(conn->flags & CO_FL_ACT_REVERSING));
+
+ /* A connection is ready to be accepted. */
+ listener_accept(l);
+ l->rx.rhttp.task->expire = TICK_ETERNITY;
+ }
+ }
+ else {
+ struct server *srv = l->rx.rhttp.srv;
+
+ if ((state & TASK_WOKEN_ANY) != TASK_WOKEN_MSG) {
+ unsigned int new_tid = select_thread(l);
+ if (new_tid != tid) {
+ task_migrate(l->rx.rhttp.task, new_tid);
+ return task;
+ }
+ }
+
+ /* No pending reverse connection, prepare a new one. Store it in the
+ * listener and return NULL. Connection will be returned later after
+ * reversal is completed.
+ */
+ conn = new_reverse_conn(l, srv);
+ l->rx.rhttp.pend_conn = conn;
+
+ /* On success task will be woken up by H2 mux after reversal. */
+ l->rx.rhttp.task->expire = conn ?
+ tick_add_ifset(now_ms, srv->proxy->timeout.connect) :
+ MS_TO_TICKS(now_ms + 1000);
+ }
+
+ return task;
+}
+
+int rhttp_bind_receiver(struct receiver *rx, char **errmsg)
+{
+ rx->flags |= RX_F_BOUND;
+ return ERR_NONE;
+}
+
+int rhttp_bind_listener(struct listener *listener, char *errmsg, int errlen)
+{
+ struct task *task;
+ struct proxy *be;
+ struct server *srv;
+ struct ist be_name, sv_name;
+ char *name = NULL;
+
+ unsigned long mask;
+ uint task_tid;
+
+ if (listener->state != LI_ASSIGNED)
+ return ERR_NONE; /* already bound */
+
+ /* Retrieve the first thread usable for this listener. */
+ mask = listener->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+ task_tid = my_ffsl(mask) + ha_tgroup_info[listener->rx.bind_tgroup].base;
+ if (!(task = task_new_on(task_tid))) {
+ snprintf(errmsg, errlen, "Out of memory.");
+ goto err;
+ }
+ task->process = rhttp_process;
+ task->context = listener;
+ listener->rx.rhttp.task = task;
+ listener->rx.rhttp.state = LI_PRECONN_ST_STOP;
+
+ /* Set maxconn which is defined via the special kw nbconn for reverse
+ * connect. Use a default value of 1 if not set. This guarantees that
+ * listener will be automatically re-enable each time it fell back below
+ * it due to a connection error.
+ */
+ listener->bind_conf->maxconn = listener->bind_conf->rhttp_nbconn;
+ if (!listener->bind_conf->maxconn)
+ listener->bind_conf->maxconn = 1;
+
+ name = strdup(listener->bind_conf->rhttp_srvname);
+ if (!name) {
+ snprintf(errmsg, errlen, "Out of memory.");
+ goto err;
+ }
+
+ sv_name = ist(name);
+ be_name = istsplit(&sv_name, '/');
+ if (!istlen(sv_name)) {
+ snprintf(errmsg, errlen, "Invalid server name: '%s'.", name);
+ goto err;
+ }
+
+ if (!(be = proxy_be_by_name(ist0(be_name)))) {
+ snprintf(errmsg, errlen, "No such backend: '%s'.", name);
+ goto err;
+ }
+ if (!(srv = server_find_by_name(be, ist0(sv_name)))) {
+ snprintf(errmsg, errlen, "No such server: '%s/%s'.", ist0(be_name), ist0(sv_name));
+ goto err;
+ }
+
+ if (srv->flags & SRV_F_RHTTP) {
+ snprintf(errmsg, errlen, "Cannot use reverse HTTP server '%s/%s' as target to a reverse bind.", ist0(be_name), ist0(sv_name));
+ goto err;
+ }
+
+ if (srv_is_transparent(srv)) {
+ snprintf(errmsg, errlen, "Cannot use transparent server '%s/%s' as target to a reverse bind.", ist0(be_name), ist0(sv_name));
+ goto err;
+ }
+
+ /* Check that server uses HTTP/2 either with proto or ALPN. */
+ if ((!srv->mux_proto || !isteqi(srv->mux_proto->token, ist("h2"))) &&
+ (!srv->use_ssl || !isteqi(ist(srv->ssl_ctx.alpn_str), ist("\x02h2")))) {
+ snprintf(errmsg, errlen, "Cannot reverse connect with server '%s/%s' unless HTTP/2 is activated on it with either proto or alpn keyword.", name, ist0(sv_name));
+ goto err;
+ }
+
+ /* Prevent dynamic source address settings. */
+ if (((srv->conn_src.opts & CO_SRC_TPROXY_MASK) &&
+ (srv->conn_src.opts & CO_SRC_TPROXY_MASK) != CO_SRC_TPROXY_ADDR) ||
+ ((srv->proxy->conn_src.opts & CO_SRC_TPROXY_MASK) &&
+ (srv->proxy->conn_src.opts & CO_SRC_TPROXY_MASK) != CO_SRC_TPROXY_ADDR)) {
+ snprintf(errmsg, errlen, "Cannot reverse connect with server '%s/%s' which uses dynamic source address setting.", name, ist0(sv_name));
+ goto err;
+ }
+
+ ha_free(&name);
+
+ listener->rx.rhttp.srv = srv;
+ listener_set_state(listener, LI_LISTEN);
+
+ return ERR_NONE;
+
+ err:
+ ha_free(&name);
+ return ERR_ALERT | ERR_FATAL;
+}
+
+void rhttp_enable_listener(struct listener *l)
+{
+ if (l->rx.rhttp.state < LI_PRECONN_ST_INIT) {
+ send_log(l->bind_conf->frontend, LOG_INFO,
+ "preconnect %s::%s: Initiating.\n",
+ l->bind_conf->frontend->id, l->bind_conf->rhttp_srvname);
+ l->rx.rhttp.state = LI_PRECONN_ST_INIT;
+ }
+
+ task_wakeup(l->rx.rhttp.task, TASK_WOKEN_ANY);
+}
+
+void rhttp_disable_listener(struct listener *l)
+{
+ if (l->rx.rhttp.state < LI_PRECONN_ST_FULL) {
+ send_log(l->bind_conf->frontend, LOG_INFO,
+ "preconnect %s::%s: Running with nbconn %d reached.\n",
+ l->bind_conf->frontend->id, l->bind_conf->rhttp_srvname,
+ l->bind_conf->maxconn);
+ l->rx.rhttp.state = LI_PRECONN_ST_FULL;
+ }
+}
+
+struct connection *rhttp_accept_conn(struct listener *l, int *status)
+{
+ struct connection *conn = l->rx.rhttp.pend_conn;
+
+ if (!conn) {
+ /* Reverse connect listener must have an explicit maxconn set
+ * to ensure it is re-enabled on connection error.
+ */
+ BUG_ON(!l->bind_conf->maxconn);
+
+ /* Instantiate a new conn if maxconn not yet exceeded. */
+ if (l->nbconn <= l->bind_conf->maxconn) {
+ /* Try first if a new thread should be used for the new connection. */
+ unsigned int new_tid = select_thread(l);
+ if (new_tid != tid) {
+ task_migrate(l->rx.rhttp.task, new_tid);
+ *status = CO_AC_DONE;
+ return NULL;
+ }
+
+ /* No need to use a new thread, use the opportunity to alloc the connection right now. */
+ l->rx.rhttp.pend_conn = new_reverse_conn(l, l->rx.rhttp.srv);
+ if (!l->rx.rhttp.pend_conn) {
+ *status = CO_AC_PAUSE;
+ return NULL;
+ }
+ }
+
+ *status = CO_AC_DONE;
+ return NULL;
+ }
+
+ /* listener_accept() must not be called if no pending connection is not yet reversed. */
+ BUG_ON(!(conn->flags & CO_FL_ACT_REVERSING));
+ conn->flags &= ~CO_FL_ACT_REVERSING;
+ conn->flags |= CO_FL_REVERSED;
+ conn->mux->ctl(conn, MUX_CTL_REVERSE_CONN, NULL);
+
+ l->rx.rhttp.pend_conn = NULL;
+ *status = CO_AC_NONE;
+
+ return conn;
+}
+
+void rhttp_unbind_receiver(struct listener *l)
+{
+ l->rx.flags &= ~RX_F_BOUND;
+}
+
+int rhttp_set_affinity(struct connection *conn, int new_tid)
+{
+ /* Explicitely disable connection thread migration on accept. Indeed,
+ * it's unsafe to move a connection with its FD to another thread. Note
+ * that active reverse task thread migration should be sufficient to
+ * ensure repartition of reversed connections accross listener threads.
+ */
+ return -1;
+}
+
+int rhttp_accepting_conn(const struct receiver *rx)
+{
+ return 1;
+}
+
+INITCALL1(STG_REGISTER, protocol_register, &proto_rhttp);
+
+/* perform minimal intializations */
+static void init_rhttp()
+{
+ int i;
+
+ for (i = 0; i < MAX_THREADS; i++)
+ ha_thread_ctx[i].nb_rhttp_conns = 0;
+}
+
+INITCALL0(STG_PREPARE, init_rhttp);