summaryrefslogtreecommitdiffstats
path: root/include/haproxy/fd.h
diff options
context:
space:
mode:
Diffstat (limited to 'include/haproxy/fd.h')
-rw-r--r--include/haproxy/fd.h542
1 files changed, 542 insertions, 0 deletions
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h
new file mode 100644
index 0000000..11212ff
--- /dev/null
+++ b/include/haproxy/fd.h
@@ -0,0 +1,542 @@
+/*
+ * include/haproxy/fd.h
+ * File descriptors states - exported variables and functions
+ *
+ * Copyright (C) 2000-2020 Willy Tarreau - w@1wt.eu
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, version 2.1
+ * exclusively.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef _HAPROXY_FD_H
+#define _HAPROXY_FD_H
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <import/ist.h>
+#include <haproxy/api.h>
+#include <haproxy/atomic.h>
+#include <haproxy/fd-t.h>
+#include <haproxy/global.h>
+#include <haproxy/thread.h>
+
+/* public variables */
+
+extern struct poller cur_poller; /* the current poller */
+extern int nbpollers;
+extern struct poller pollers[MAX_POLLERS]; /* all registered pollers */
+extern struct fdtab *fdtab; /* array of all the file descriptors */
+extern struct fdinfo *fdinfo; /* less-often used infos for file descriptors */
+extern int totalconn; /* total # of terminated sessions */
+extern int actconn; /* # of active sessions */
+
+extern volatile struct fdlist update_list[MAX_TGROUPS];
+extern struct polled_mask *polled_mask;
+
+extern THREAD_LOCAL int *fd_updt; // FD updates list
+extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
+
+extern int poller_wr_pipe[MAX_THREADS];
+
+extern volatile int ha_used_fds; // Number of FDs we're currently using
+
+/* Deletes an FD from the fdsets.
+ * The file descriptor is also closed.
+ */
+void fd_delete(int fd);
+void _fd_delete_orphan(int fd);
+
+/* makes the new fd non-blocking and clears all other O_* flags;
+ * this is meant to be used on new FDs. Returns -1 on failure.
+ */
+int fd_set_nonblock(int fd);
+
+/* makes the fd close-on-exec; returns -1 on failure. */
+int fd_set_cloexec(int fd);
+
+/* Migrate a FD to a new thread <new_tid>. */
+void fd_migrate_on(int fd, uint new_tid);
+
+/*
+ * Take over a FD belonging to another thread.
+ * Returns 0 on success, and -1 on failure.
+ */
+int fd_takeover(int fd, void *expected_owner);
+
+ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl);
+
+/* close all FDs starting from <start> */
+void my_closefrom(int start);
+
+struct rlimit;
+int raise_rlim_nofile(struct rlimit *old_limit, struct rlimit *new_limit);
+
+int compute_poll_timeout(int next);
+void fd_leaving_poll(int wait_time, int status);
+
+/* disable the specified poller */
+void disable_poller(const char *poller_name);
+
+void poller_pipe_io_handler(int fd);
+
+/*
+ * Initialize the pollers till the best one is found.
+ * If none works, returns 0, otherwise 1.
+ * The pollers register themselves just before main() is called.
+ */
+int init_pollers(void);
+
+/*
+ * Deinitialize the pollers.
+ */
+void deinit_pollers(void);
+
+/*
+ * Some pollers may lose their connection after a fork(). It may be necessary
+ * to create initialize part of them again. Returns 0 in case of failure,
+ * otherwise 1. The fork() function may be NULL if unused. In case of error,
+ * the the current poller is destroyed and the caller is responsible for trying
+ * another one by calling init_pollers() again.
+ */
+int fork_poller(void);
+
+/*
+ * Lists the known pollers on <out>.
+ * Should be performed only before initialization.
+ */
+int list_pollers(FILE *out);
+
+/*
+ * Runs the polling loop
+ */
+void run_poller();
+
+void fd_add_to_fd_list(volatile struct fdlist *list, int fd);
+void fd_rm_from_fd_list(volatile struct fdlist *list, int fd);
+void updt_fd_polling(const int fd);
+int fd_update_events(int fd, uint evts);
+void fd_reregister_all(int tgrp, ulong mask);
+
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+ unsigned long update_mask;
+
+ update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
+ while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) {
+ /* If we were the last one that had to update that entry, remove it from the list */
+ fd_rm_from_fd_list(&update_list[tgid - 1], fd);
+ update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
+ if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) {
+ /* Maybe it's been re-updated in the meanwhile, and we
+ * wrongly removed it from the list, if so, re-add it
+ */
+ fd_add_to_fd_list(&update_list[tgid - 1], fd);
+ update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
+ /* And then check again, just in case after all it
+ * should be removed, even if it's very unlikely, given
+ * the current thread wouldn't have been able to take
+ * care of it yet */
+ } else
+ break;
+ }
+}
+
+/*
+ * returns true if the FD is active for recv
+ */
+static inline int fd_recv_active(const int fd)
+{
+ return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_R;
+}
+
+/*
+ * returns true if the FD is ready for recv
+ */
+static inline int fd_recv_ready(const int fd)
+{
+ return (unsigned)fdtab[fd].state & FD_EV_READY_R;
+}
+
+/*
+ * returns true if the FD is active for send
+ */
+static inline int fd_send_active(const int fd)
+{
+ return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_W;
+}
+
+/*
+ * returns true if the FD is ready for send
+ */
+static inline int fd_send_ready(const int fd)
+{
+ return (unsigned)fdtab[fd].state & FD_EV_READY_W;
+}
+
+/*
+ * returns true if the FD is active for recv or send
+ */
+static inline int fd_active(const int fd)
+{
+ return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_RW;
+}
+
+/* Disable processing recv events on fd <fd> */
+static inline void fd_stop_recv(int fd)
+{
+ if (!(fdtab[fd].state & FD_EV_ACTIVE_R) ||
+ !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
+ return;
+}
+
+/* Disable processing send events on fd <fd> */
+static inline void fd_stop_send(int fd)
+{
+ if (!(fdtab[fd].state & FD_EV_ACTIVE_W) ||
+ !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT))
+ return;
+}
+
+/* Disable processing of events on fd <fd> for both directions. */
+static inline void fd_stop_both(int fd)
+{
+ uint old, new;
+
+ old = fdtab[fd].state;
+ do {
+ if (!(old & FD_EV_ACTIVE_RW))
+ return;
+ new = old & ~FD_EV_ACTIVE_RW;
+ } while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
+}
+
+/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
+static inline void fd_cant_recv(const int fd)
+{
+ /* marking ready never changes polled status */
+ if (!(fdtab[fd].state & FD_EV_READY_R) ||
+ !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_R_BIT))
+ return;
+}
+
+/* Report that FD <fd> may receive again without polling. */
+static inline void fd_may_recv(const int fd)
+{
+ /* marking ready never changes polled status */
+ if ((fdtab[fd].state & FD_EV_READY_R) ||
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT))
+ return;
+}
+
+/* Report that FD <fd> may receive again without polling but only if its not
+ * active yet. This is in order to speculatively try to enable I/Os when it's
+ * highly likely that these will succeed, but without interfering with polling.
+ */
+static inline void fd_cond_recv(const int fd)
+{
+ if ((fdtab[fd].state & (FD_EV_ACTIVE_R|FD_EV_READY_R)) == 0)
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT);
+}
+
+/* Report that FD <fd> may send again without polling but only if its not
+ * active yet. This is in order to speculatively try to enable I/Os when it's
+ * highly likely that these will succeed, but without interfering with polling.
+ */
+static inline void fd_cond_send(const int fd)
+{
+ if ((fdtab[fd].state & (FD_EV_ACTIVE_W|FD_EV_READY_W)) == 0)
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT);
+}
+
+/* Report that FD <fd> may receive and send without polling. Used at FD
+ * initialization.
+ */
+static inline void fd_may_both(const int fd)
+{
+ HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_RW);
+}
+
+/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
+static inline void fd_cant_send(const int fd)
+{
+ /* removing ready never changes polled status */
+ if (!(fdtab[fd].state & FD_EV_READY_W) ||
+ !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_W_BIT))
+ return;
+}
+
+/* Report that FD <fd> may send again without polling (EAGAIN not detected). */
+static inline void fd_may_send(const int fd)
+{
+ /* marking ready never changes polled status */
+ if ((fdtab[fd].state & FD_EV_READY_W) ||
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT))
+ return;
+}
+
+/* Prepare FD <fd> to try to receive */
+static inline void fd_want_recv(int fd)
+{
+ if ((fdtab[fd].state & FD_EV_ACTIVE_R) ||
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
+ return;
+ updt_fd_polling(fd);
+}
+
+/* Prepare FD <fd> to try to receive, and only create update if fd_updt exists
+ * (essentially for receivers during early boot).
+ */
+static inline void fd_want_recv_safe(int fd)
+{
+ if ((fdtab[fd].state & FD_EV_ACTIVE_R) ||
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
+ return;
+ if (fd_updt)
+ updt_fd_polling(fd);
+}
+
+/* Prepare FD <fd> to try to send */
+static inline void fd_want_send(int fd)
+{
+ if ((fdtab[fd].state & FD_EV_ACTIVE_W) ||
+ HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT))
+ return;
+ updt_fd_polling(fd);
+}
+
+/* returns the tgid from an fd (masks the refcount) */
+static forceinline int fd_tgid(int fd)
+{
+ return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF;
+}
+
+/* Release a tgid previously taken by fd_grab_tgid() */
+static forceinline void fd_drop_tgid(int fd)
+{
+ HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
+}
+
+/* Unlock a tgid currently locked by fd_lock_tgid(). This will effectively
+ * allow threads from the FD's tgid to check the masks and manipulate the FD.
+ */
+static forceinline void fd_unlock_tgid(int fd)
+{
+ HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff7fffU);
+}
+
+/* Switch the FD's TGID to the new value with a refcount of 1 and the lock bit
+ * set. It doesn't care about the current TGID, except that it will wait for
+ * the FD not to be already switching and having its refcount cleared. After
+ * the function returns, the caller is free to manipulate the masks, and it
+ * must call fd_unlock_tgid() to drop the lock, allowing threads from the
+ * designated group to use the FD. Finally a call to fd_drop_tgid() will be
+ * needed to drop the reference.
+ */
+static inline void fd_lock_tgid(int fd, uint desired_tgid)
+{
+ uint old;
+
+ BUG_ON(!desired_tgid);
+
+ old = tgid; // assume we start from the caller's tgid
+ desired_tgid |= 0x18000; // refcount=1, lock bit=1.
+
+ while (1) {
+ old &= 0x7fff; // expect no lock and refcount==0
+ if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid))
+ break;
+ __ha_cpu_relax();
+ }
+}
+
+/* Grab a reference to the FD's TGID, and return the tgid. Note that a TGID of
+ * zero indicates the FD was closed, thus also fails (i.e. no need to drop it).
+ * On non-zero (success), the caller must release it using fd_drop_tgid().
+ */
+static inline uint fd_take_tgid(int fd)
+{
+ uint old;
+
+ old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff;
+ if (likely(old))
+ return old;
+ HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
+ return 0;
+}
+
+/* Reset a tgid without affecting the refcount */
+static forceinline void fd_reset_tgid(int fd)
+{
+ HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff0000U);
+}
+
+/* Try to grab a reference to the FD's TGID, but only if it matches the
+ * requested one (i.e. it succeeds with TGID refcnt held, or fails). Note that
+ * a TGID of zero indicates the FD was closed, thus also fails. It returns
+ * non-zero on success, in which case the caller must then release it using
+ * fd_drop_tgid(), or zero on failure. The function is optimized for use
+ * when it's likely that the tgid matches the desired one as it's by far
+ * the most common.
+ */
+static inline uint fd_grab_tgid(int fd, uint desired_tgid)
+{
+ uint old;
+
+ old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff;
+ if (likely(old == desired_tgid))
+ return 1;
+ HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
+ return 0;
+}
+
+/* Set the FD's TGID to the new value with a refcount of 1, waiting for the
+ * current refcount to become 0, to cover the rare possibly that a late
+ * competing thread would be touching the tgid or the running mask in parallel.
+ * The caller must call fd_drop_tgid() once done.
+ */
+static inline void fd_claim_tgid(int fd, uint desired_tgid)
+{
+ uint old;
+
+ BUG_ON(!desired_tgid);
+
+ desired_tgid += 0x10000; // refcount=1
+ old = 0; // assume unused (most likely)
+ while (1) {
+ if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid))
+ break;
+ __ha_cpu_relax();
+ old &= 0x7fff; // keep only the tgid and drop the lock
+ }
+}
+
+/* atomically read the running mask if the tgid matches, or returns zero if it
+ * does not match. This is meant for use in code paths where the bit is expected
+ * to be present and will be sufficient to protect against a short-term group
+ * migration (e.g. takss and return from iocb).
+ */
+static inline ulong fd_get_running(int fd, uint desired_tgid)
+{
+ ulong ret = 0;
+ uint old;
+
+ /* TODO: may also be checked using an atomic double-load from a DWCAS
+ * on compatible architectures, which wouldn't require to modify nor
+ * restore the original value.
+ */
+ old = _HA_ATOMIC_ADD_FETCH(&fdtab[fd].refc_tgid, 0x10000);
+ if (likely((old & 0xffff) == desired_tgid))
+ ret = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
+ _HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
+ return ret;
+}
+
+/* remove tid_bit from the fd's running mask and returns the value before the
+ * atomic operation, so that the caller can know if it was present.
+ */
+static inline long fd_clr_running(int fd)
+{
+ return _HA_ATOMIC_FETCH_AND(&fdtab[fd].running_mask, ~ti->ltid_bit);
+}
+
+/* Prepares <fd> for being polled on all permitted threads of this group ID
+ * (these will then be refined to only cover running ones).
+*/
+static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask)
+{
+ extern void sock_conn_iocb(int);
+ int newstate;
+
+ /* conn_fd_handler should support edge-triggered FDs */
+ newstate = 0;
+ if ((global.tune.options & GTUNE_FD_ET) && iocb == sock_conn_iocb)
+ newstate |= FD_ET_POSSIBLE;
+
+ /* This must never happen and would definitely indicate a bug, in
+ * addition to overwriting some unexpected memory areas.
+ */
+ BUG_ON(fd < 0);
+ BUG_ON(fd >= global.maxsock);
+ BUG_ON(fdtab[fd].owner != NULL);
+ BUG_ON(fdtab[fd].state != 0);
+ BUG_ON(tgid < 1 || tgid > MAX_TGROUPS);
+
+ thread_mask &= tg->threads_enabled;
+ BUG_ON(thread_mask == 0);
+
+ fd_claim_tgid(fd, tgid);
+
+ BUG_ON(fdtab[fd].running_mask);
+
+ fdtab[fd].owner = owner;
+ fdtab[fd].iocb = iocb;
+ fdtab[fd].state = newstate;
+ fdtab[fd].thread_mask = thread_mask;
+ fd_drop_tgid(fd);
+
+#ifdef DEBUG_FD
+ fdtab[fd].event_count = 0;
+#endif
+
+ /* note: do not reset polled_mask here as it indicates which poller
+ * still knows this FD from a possible previous round.
+ */
+
+ /* the two directions are ready until proven otherwise */
+ fd_may_both(fd);
+ _HA_ATOMIC_INC(&ha_used_fds);
+}
+
+/* These are replacements for FD_SET, FD_CLR, FD_ISSET, working on uints */
+static inline void hap_fd_set(int fd, unsigned int *evts)
+{
+ _HA_ATOMIC_OR(&evts[fd / (8*sizeof(*evts))], 1U << (fd & (8*sizeof(*evts) - 1)));
+}
+
+static inline void hap_fd_clr(int fd, unsigned int *evts)
+{
+ _HA_ATOMIC_AND(&evts[fd / (8*sizeof(*evts))], ~(1U << (fd & (8*sizeof(*evts) - 1))));
+}
+
+static inline unsigned int hap_fd_isset(int fd, unsigned int *evts)
+{
+ return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1)));
+}
+
+/* send a wake-up event to this thread, only if it's asleep and not notified yet */
+static inline void wake_thread(int thr)
+{
+ struct thread_ctx *ctx = &ha_thread_ctx[thr];
+
+ if ((_HA_ATOMIC_FETCH_OR(&ctx->flags, TH_FL_NOTIFIED) & (TH_FL_SLEEPING|TH_FL_NOTIFIED)) == TH_FL_SLEEPING) {
+ char c = 'c';
+ DISGUISE(write(poller_wr_pipe[thr], &c, 1));
+ }
+}
+
+
+#endif /* _HAPROXY_FD_H */
+
+/*
+ * Local variables:
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * End:
+ */