diff options
Diffstat (limited to 'include/haproxy/fd.h')
-rw-r--r-- | include/haproxy/fd.h | 542 |
1 files changed, 542 insertions, 0 deletions
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h new file mode 100644 index 0000000..11212ff --- /dev/null +++ b/include/haproxy/fd.h @@ -0,0 +1,542 @@ +/* + * include/haproxy/fd.h + * File descriptors states - exported variables and functions + * + * Copyright (C) 2000-2020 Willy Tarreau - w@1wt.eu + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _HAPROXY_FD_H +#define _HAPROXY_FD_H + +#include <sys/time.h> +#include <sys/types.h> +#include <stdio.h> +#include <unistd.h> +#include <import/ist.h> +#include <haproxy/api.h> +#include <haproxy/atomic.h> +#include <haproxy/fd-t.h> +#include <haproxy/global.h> +#include <haproxy/thread.h> + +/* public variables */ + +extern struct poller cur_poller; /* the current poller */ +extern int nbpollers; +extern struct poller pollers[MAX_POLLERS]; /* all registered pollers */ +extern struct fdtab *fdtab; /* array of all the file descriptors */ +extern struct fdinfo *fdinfo; /* less-often used infos for file descriptors */ +extern int totalconn; /* total # of terminated sessions */ +extern int actconn; /* # of active sessions */ + +extern volatile struct fdlist update_list[MAX_TGROUPS]; +extern struct polled_mask *polled_mask; + +extern THREAD_LOCAL int *fd_updt; // FD updates list +extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list + +extern int poller_wr_pipe[MAX_THREADS]; + +extern volatile int ha_used_fds; // Number of FDs we're currently using + +/* Deletes an FD from the fdsets. + * The file descriptor is also closed. + */ +void fd_delete(int fd); +void _fd_delete_orphan(int fd); + +/* makes the new fd non-blocking and clears all other O_* flags; + * this is meant to be used on new FDs. Returns -1 on failure. + */ +int fd_set_nonblock(int fd); + +/* makes the fd close-on-exec; returns -1 on failure. */ +int fd_set_cloexec(int fd); + +/* Migrate a FD to a new thread <new_tid>. */ +void fd_migrate_on(int fd, uint new_tid); + +/* + * Take over a FD belonging to another thread. + * Returns 0 on success, and -1 on failure. + */ +int fd_takeover(int fd, void *expected_owner); + +ssize_t fd_write_frag_line(int fd, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg, int nl); + +/* close all FDs starting from <start> */ +void my_closefrom(int start); + +struct rlimit; +int raise_rlim_nofile(struct rlimit *old_limit, struct rlimit *new_limit); + +int compute_poll_timeout(int next); +void fd_leaving_poll(int wait_time, int status); + +/* disable the specified poller */ +void disable_poller(const char *poller_name); + +void poller_pipe_io_handler(int fd); + +/* + * Initialize the pollers till the best one is found. + * If none works, returns 0, otherwise 1. + * The pollers register themselves just before main() is called. + */ +int init_pollers(void); + +/* + * Deinitialize the pollers. + */ +void deinit_pollers(void); + +/* + * Some pollers may lose their connection after a fork(). It may be necessary + * to create initialize part of them again. Returns 0 in case of failure, + * otherwise 1. The fork() function may be NULL if unused. In case of error, + * the the current poller is destroyed and the caller is responsible for trying + * another one by calling init_pollers() again. + */ +int fork_poller(void); + +/* + * Lists the known pollers on <out>. + * Should be performed only before initialization. + */ +int list_pollers(FILE *out); + +/* + * Runs the polling loop + */ +void run_poller(); + +void fd_add_to_fd_list(volatile struct fdlist *list, int fd); +void fd_rm_from_fd_list(volatile struct fdlist *list, int fd); +void updt_fd_polling(const int fd); +int fd_update_events(int fd, uint evts); +void fd_reregister_all(int tgrp, ulong mask); + +/* Called from the poller to acknowledge we read an entry from the global + * update list, to remove our bit from the update_mask, and remove it from + * the list if we were the last one. + */ +static inline void done_update_polling(int fd) +{ + unsigned long update_mask; + + update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit); + while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) { + /* If we were the last one that had to update that entry, remove it from the list */ + fd_rm_from_fd_list(&update_list[tgid - 1], fd); + update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask); + if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) { + /* Maybe it's been re-updated in the meanwhile, and we + * wrongly removed it from the list, if so, re-add it + */ + fd_add_to_fd_list(&update_list[tgid - 1], fd); + update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask); + /* And then check again, just in case after all it + * should be removed, even if it's very unlikely, given + * the current thread wouldn't have been able to take + * care of it yet */ + } else + break; + } +} + +/* + * returns true if the FD is active for recv + */ +static inline int fd_recv_active(const int fd) +{ + return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_R; +} + +/* + * returns true if the FD is ready for recv + */ +static inline int fd_recv_ready(const int fd) +{ + return (unsigned)fdtab[fd].state & FD_EV_READY_R; +} + +/* + * returns true if the FD is active for send + */ +static inline int fd_send_active(const int fd) +{ + return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_W; +} + +/* + * returns true if the FD is ready for send + */ +static inline int fd_send_ready(const int fd) +{ + return (unsigned)fdtab[fd].state & FD_EV_READY_W; +} + +/* + * returns true if the FD is active for recv or send + */ +static inline int fd_active(const int fd) +{ + return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_RW; +} + +/* Disable processing recv events on fd <fd> */ +static inline void fd_stop_recv(int fd) +{ + if (!(fdtab[fd].state & FD_EV_ACTIVE_R) || + !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT)) + return; +} + +/* Disable processing send events on fd <fd> */ +static inline void fd_stop_send(int fd) +{ + if (!(fdtab[fd].state & FD_EV_ACTIVE_W) || + !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT)) + return; +} + +/* Disable processing of events on fd <fd> for both directions. */ +static inline void fd_stop_both(int fd) +{ + uint old, new; + + old = fdtab[fd].state; + do { + if (!(old & FD_EV_ACTIVE_RW)) + return; + new = old & ~FD_EV_ACTIVE_RW; + } while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new))); +} + +/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */ +static inline void fd_cant_recv(const int fd) +{ + /* marking ready never changes polled status */ + if (!(fdtab[fd].state & FD_EV_READY_R) || + !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_R_BIT)) + return; +} + +/* Report that FD <fd> may receive again without polling. */ +static inline void fd_may_recv(const int fd) +{ + /* marking ready never changes polled status */ + if ((fdtab[fd].state & FD_EV_READY_R) || + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT)) + return; +} + +/* Report that FD <fd> may receive again without polling but only if its not + * active yet. This is in order to speculatively try to enable I/Os when it's + * highly likely that these will succeed, but without interfering with polling. + */ +static inline void fd_cond_recv(const int fd) +{ + if ((fdtab[fd].state & (FD_EV_ACTIVE_R|FD_EV_READY_R)) == 0) + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT); +} + +/* Report that FD <fd> may send again without polling but only if its not + * active yet. This is in order to speculatively try to enable I/Os when it's + * highly likely that these will succeed, but without interfering with polling. + */ +static inline void fd_cond_send(const int fd) +{ + if ((fdtab[fd].state & (FD_EV_ACTIVE_W|FD_EV_READY_W)) == 0) + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT); +} + +/* Report that FD <fd> may receive and send without polling. Used at FD + * initialization. + */ +static inline void fd_may_both(const int fd) +{ + HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_RW); +} + +/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */ +static inline void fd_cant_send(const int fd) +{ + /* removing ready never changes polled status */ + if (!(fdtab[fd].state & FD_EV_READY_W) || + !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_W_BIT)) + return; +} + +/* Report that FD <fd> may send again without polling (EAGAIN not detected). */ +static inline void fd_may_send(const int fd) +{ + /* marking ready never changes polled status */ + if ((fdtab[fd].state & FD_EV_READY_W) || + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT)) + return; +} + +/* Prepare FD <fd> to try to receive */ +static inline void fd_want_recv(int fd) +{ + if ((fdtab[fd].state & FD_EV_ACTIVE_R) || + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT)) + return; + updt_fd_polling(fd); +} + +/* Prepare FD <fd> to try to receive, and only create update if fd_updt exists + * (essentially for receivers during early boot). + */ +static inline void fd_want_recv_safe(int fd) +{ + if ((fdtab[fd].state & FD_EV_ACTIVE_R) || + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT)) + return; + if (fd_updt) + updt_fd_polling(fd); +} + +/* Prepare FD <fd> to try to send */ +static inline void fd_want_send(int fd) +{ + if ((fdtab[fd].state & FD_EV_ACTIVE_W) || + HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT)) + return; + updt_fd_polling(fd); +} + +/* returns the tgid from an fd (masks the refcount) */ +static forceinline int fd_tgid(int fd) +{ + return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF; +} + +/* Release a tgid previously taken by fd_grab_tgid() */ +static forceinline void fd_drop_tgid(int fd) +{ + HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000); +} + +/* Unlock a tgid currently locked by fd_lock_tgid(). This will effectively + * allow threads from the FD's tgid to check the masks and manipulate the FD. + */ +static forceinline void fd_unlock_tgid(int fd) +{ + HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff7fffU); +} + +/* Switch the FD's TGID to the new value with a refcount of 1 and the lock bit + * set. It doesn't care about the current TGID, except that it will wait for + * the FD not to be already switching and having its refcount cleared. After + * the function returns, the caller is free to manipulate the masks, and it + * must call fd_unlock_tgid() to drop the lock, allowing threads from the + * designated group to use the FD. Finally a call to fd_drop_tgid() will be + * needed to drop the reference. + */ +static inline void fd_lock_tgid(int fd, uint desired_tgid) +{ + uint old; + + BUG_ON(!desired_tgid); + + old = tgid; // assume we start from the caller's tgid + desired_tgid |= 0x18000; // refcount=1, lock bit=1. + + while (1) { + old &= 0x7fff; // expect no lock and refcount==0 + if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid)) + break; + __ha_cpu_relax(); + } +} + +/* Grab a reference to the FD's TGID, and return the tgid. Note that a TGID of + * zero indicates the FD was closed, thus also fails (i.e. no need to drop it). + * On non-zero (success), the caller must release it using fd_drop_tgid(). + */ +static inline uint fd_take_tgid(int fd) +{ + uint old; + + old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff; + if (likely(old)) + return old; + HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000); + return 0; +} + +/* Reset a tgid without affecting the refcount */ +static forceinline void fd_reset_tgid(int fd) +{ + HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff0000U); +} + +/* Try to grab a reference to the FD's TGID, but only if it matches the + * requested one (i.e. it succeeds with TGID refcnt held, or fails). Note that + * a TGID of zero indicates the FD was closed, thus also fails. It returns + * non-zero on success, in which case the caller must then release it using + * fd_drop_tgid(), or zero on failure. The function is optimized for use + * when it's likely that the tgid matches the desired one as it's by far + * the most common. + */ +static inline uint fd_grab_tgid(int fd, uint desired_tgid) +{ + uint old; + + old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff; + if (likely(old == desired_tgid)) + return 1; + HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000); + return 0; +} + +/* Set the FD's TGID to the new value with a refcount of 1, waiting for the + * current refcount to become 0, to cover the rare possibly that a late + * competing thread would be touching the tgid or the running mask in parallel. + * The caller must call fd_drop_tgid() once done. + */ +static inline void fd_claim_tgid(int fd, uint desired_tgid) +{ + uint old; + + BUG_ON(!desired_tgid); + + desired_tgid += 0x10000; // refcount=1 + old = 0; // assume unused (most likely) + while (1) { + if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid)) + break; + __ha_cpu_relax(); + old &= 0x7fff; // keep only the tgid and drop the lock + } +} + +/* atomically read the running mask if the tgid matches, or returns zero if it + * does not match. This is meant for use in code paths where the bit is expected + * to be present and will be sufficient to protect against a short-term group + * migration (e.g. takss and return from iocb). + */ +static inline ulong fd_get_running(int fd, uint desired_tgid) +{ + ulong ret = 0; + uint old; + + /* TODO: may also be checked using an atomic double-load from a DWCAS + * on compatible architectures, which wouldn't require to modify nor + * restore the original value. + */ + old = _HA_ATOMIC_ADD_FETCH(&fdtab[fd].refc_tgid, 0x10000); + if (likely((old & 0xffff) == desired_tgid)) + ret = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask); + _HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000); + return ret; +} + +/* remove tid_bit from the fd's running mask and returns the value before the + * atomic operation, so that the caller can know if it was present. + */ +static inline long fd_clr_running(int fd) +{ + return _HA_ATOMIC_FETCH_AND(&fdtab[fd].running_mask, ~ti->ltid_bit); +} + +/* Prepares <fd> for being polled on all permitted threads of this group ID + * (these will then be refined to only cover running ones). +*/ +static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask) +{ + extern void sock_conn_iocb(int); + int newstate; + + /* conn_fd_handler should support edge-triggered FDs */ + newstate = 0; + if ((global.tune.options & GTUNE_FD_ET) && iocb == sock_conn_iocb) + newstate |= FD_ET_POSSIBLE; + + /* This must never happen and would definitely indicate a bug, in + * addition to overwriting some unexpected memory areas. + */ + BUG_ON(fd < 0); + BUG_ON(fd >= global.maxsock); + BUG_ON(fdtab[fd].owner != NULL); + BUG_ON(fdtab[fd].state != 0); + BUG_ON(tgid < 1 || tgid > MAX_TGROUPS); + + thread_mask &= tg->threads_enabled; + BUG_ON(thread_mask == 0); + + fd_claim_tgid(fd, tgid); + + BUG_ON(fdtab[fd].running_mask); + + fdtab[fd].owner = owner; + fdtab[fd].iocb = iocb; + fdtab[fd].state = newstate; + fdtab[fd].thread_mask = thread_mask; + fd_drop_tgid(fd); + +#ifdef DEBUG_FD + fdtab[fd].event_count = 0; +#endif + + /* note: do not reset polled_mask here as it indicates which poller + * still knows this FD from a possible previous round. + */ + + /* the two directions are ready until proven otherwise */ + fd_may_both(fd); + _HA_ATOMIC_INC(&ha_used_fds); +} + +/* These are replacements for FD_SET, FD_CLR, FD_ISSET, working on uints */ +static inline void hap_fd_set(int fd, unsigned int *evts) +{ + _HA_ATOMIC_OR(&evts[fd / (8*sizeof(*evts))], 1U << (fd & (8*sizeof(*evts) - 1))); +} + +static inline void hap_fd_clr(int fd, unsigned int *evts) +{ + _HA_ATOMIC_AND(&evts[fd / (8*sizeof(*evts))], ~(1U << (fd & (8*sizeof(*evts) - 1)))); +} + +static inline unsigned int hap_fd_isset(int fd, unsigned int *evts) +{ + return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1))); +} + +/* send a wake-up event to this thread, only if it's asleep and not notified yet */ +static inline void wake_thread(int thr) +{ + struct thread_ctx *ctx = &ha_thread_ctx[thr]; + + if ((_HA_ATOMIC_FETCH_OR(&ctx->flags, TH_FL_NOTIFIED) & (TH_FL_SLEEPING|TH_FL_NOTIFIED)) == TH_FL_SLEEPING) { + char c = 'c'; + DISGUISE(write(poller_wr_pipe[thr], &c, 1)); + } +} + + +#endif /* _HAPROXY_FD_H */ + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ |