/*
 * include/haproxy/fd.h
 * File descriptors states - exported variables and functions
 *
 * Copyright (C) 2000-2020 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#ifndef _HAPROXY_FD_H
#define _HAPROXY_FD_H

#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <haproxy/api.h>
#include <haproxy/atomic.h>
#include <haproxy/fd-t.h>
#include <haproxy/global.h>
#include <haproxy/thread.h>
#include <haproxy/ticks.h>

/* public variables */

extern struct poller cur_poller; /* the current poller */
extern int nbpollers;
extern struct poller pollers[MAX_POLLERS]; /* all registered pollers */
extern struct fdtab *fdtab;              /* array of all the file descriptors */
extern struct fdinfo *fdinfo;            /* less-often used infos for file descriptors */
extern int totalconn;                    /* total # of terminated sessions */
extern int actconn;                      /* # of active sessions */

extern volatile struct fdlist update_list[MAX_TGROUPS];
extern struct polled_mask *polled_mask;

extern THREAD_LOCAL int *fd_updt;  // FD updates list
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list

extern int poller_wr_pipe[MAX_THREADS];

extern volatile int ha_used_fds; // Number of FDs we're currently using

/* Deletes an FD from the fdsets.
 * The file descriptor is also closed.
 */
void fd_delete(int fd);
void _fd_delete_orphan(int fd);

/* makes the new fd non-blocking and clears all other O_* flags;
 * this is meant to be used on new FDs. Returns -1 on failure.
 */
int fd_set_nonblock(int fd);

/* makes the fd close-on-exec; returns -1 on failure. */
int fd_set_cloexec(int fd);

/* Migrate a FD to a new thread <new_tid>. */
void fd_migrate_on(int fd, uint new_tid);

/*
 * Take over a FD belonging to another thread.
 * Returns 0 on success, and -1 on failure.
 */
int fd_takeover(int fd, void *expected_owner);

ssize_t fd_write_frag_line(int fd, size_t maxlen,
                           const struct ist pfx[], size_t npfx,
                           const struct ist msg[], size_t nmsg, int nl);

/* close all FDs starting from <start> */
void my_closefrom(int start);

struct rlimit;
int raise_rlim_nofile(struct rlimit *old_limit, struct rlimit *new_limit);

int compute_poll_timeout(int next);
void fd_leaving_poll(int wait_time, int status);

/* disable the specified poller */
void disable_poller(const char *poller_name);

void poller_pipe_io_handler(int fd);

/*
 * Initialize the pollers till the best one is found.
 * If none works, returns 0, otherwise 1.
 * The pollers register themselves just before main() is called.
 */
int init_pollers(void);

/*
 * Deinitialize the pollers.
 */
void deinit_pollers(void);

/*
 * Some pollers may lose their connection after a fork(). It may be necessary
 * to initialize part of them again. Returns 0 in case of failure,
 * otherwise 1. The fork() function may be NULL if unused. In case of error,
 * the current poller is destroyed and the caller is responsible for trying
 * another one by calling init_pollers() again.
 */
int fork_poller(void);
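
/* Illustrative only, not part of this API: the typical preparation of a brand
 * new socket before it gets registered with fd_insert() further below. The
 * error handling shown here is hypothetical; <fd> is assumed to come straight
 * from socket() or accept():
 *
 *	if (fd_set_nonblock(fd) == -1 || fd_set_cloexec(fd) == -1) {
 *		close(fd);
 *		return -1;
 *	}
 */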
/*
 * Lists the known pollers on <out>.
 * Should be performed only before initialization.
 */
int list_pollers(FILE *out);

/*
 * Runs the polling loop
 */
void run_poller();

void fd_add_to_fd_list(volatile struct fdlist *list, int fd);
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd);
void updt_fd_polling(const int fd);
int fd_update_events(int fd, uint evts);
void fd_reregister_all(int tgrp, ulong mask);

/* Called from the poller to acknowledge we read an entry from the global
 * update list, to remove our bit from the update_mask, and remove it from
 * the list if we were the last one.
 */
static inline void done_update_polling(int fd)
{
	unsigned long update_mask;

	update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
	while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) {
		/* If we were the last one that had to update that entry, remove it from the list */
		fd_rm_from_fd_list(&update_list[tgid - 1], fd);
		update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
		if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) {
			/* Maybe it's been re-updated in the meanwhile, and we
			 * wrongly removed it from the list, if so, re-add it
			 */
			fd_add_to_fd_list(&update_list[tgid - 1], fd);
			update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
			/* And then check again, just in case after all it
			 * should be removed, even if it's very unlikely, given
			 * the current thread wouldn't have been able to take
			 * care of it yet */
		} else
			break;
	}
}

/*
 * returns true if the FD is active for recv
 */
static inline int fd_recv_active(const int fd)
{
	return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_R;
}

/*
 * returns true if the FD is ready for recv
 */
static inline int fd_recv_ready(const int fd)
{
	return (unsigned)fdtab[fd].state & FD_EV_READY_R;
}

/*
 * returns true if the FD is active for send
 */
static inline int fd_send_active(const int fd)
{
	return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_W;
}

/*
 * returns true if the FD is ready for send
 */
static inline int fd_send_ready(const int fd)
{
	return (unsigned)fdtab[fd].state & FD_EV_READY_W;
}

/*
 * returns true if the FD is active for recv or send
 */
static inline int fd_active(const int fd)
{
	return (unsigned)fdtab[fd].state & FD_EV_ACTIVE_RW;
}

/* Disable processing recv events on fd */
static inline void fd_stop_recv(int fd)
{
	if (!(fdtab[fd].state & FD_EV_ACTIVE_R) ||
	    !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
		return;
}

/* Disable processing send events on fd */
static inline void fd_stop_send(int fd)
{
	if (!(fdtab[fd].state & FD_EV_ACTIVE_W) ||
	    !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT))
		return;
}

/* Disable processing of events on fd for both directions. */
static inline void fd_stop_both(int fd)
{
	uint old, new;

	old = fdtab[fd].state;
	do {
		if (!(old & FD_EV_ACTIVE_RW))
			return;
		new = old & ~FD_EV_ACTIVE_RW;
	} while (unlikely(!_HA_ATOMIC_CAS(&fdtab[fd].state, &old, new)));
}

/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
static inline void fd_cant_recv(const int fd)
{
	/* marking ready never changes polled status */
	if (!(fdtab[fd].state & FD_EV_READY_R) ||
	    !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_R_BIT))
		return;
}

/* Report that FD <fd> may receive again without polling. */
static inline void fd_may_recv(const int fd)
{
	/* marking ready never changes polled status */
	if ((fdtab[fd].state & FD_EV_READY_R) ||
	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT))
		return;
}
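
/* Illustrative only, not part of this API: a sketch of how a hypothetical I/O
 * callback could combine the helpers above with fd_want_recv() (defined
 * further below), assuming my_recv() wraps recv() and returns -1 with
 * errno=EAGAIN once the socket is drained:
 *
 *	fd_want_recv(fd);                    // subscribe for read events
 *	...
 *	while (fd_recv_ready(fd)) {
 *		if (my_recv(fd) == -1 && errno == EAGAIN) {
 *			fd_cant_recv(fd);    // drained: wait for the poller
 *			break;
 *		}
 *	}
 *	...
 *	fd_stop_recv(fd);                    // no longer interested in reads
 */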
/* Report that FD <fd> may receive again without polling but only if it's not
 * active yet. This is in order to speculatively try to enable I/Os when it's
 * highly likely that these will succeed, but without interfering with polling.
 */
static inline void fd_cond_recv(const int fd)
{
	if ((fdtab[fd].state & (FD_EV_ACTIVE_R|FD_EV_READY_R)) == 0)
		HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_R_BIT);
}

/* Report that FD <fd> may send again without polling but only if it's not
 * active yet. This is in order to speculatively try to enable I/Os when it's
 * highly likely that these will succeed, but without interfering with polling.
 */
static inline void fd_cond_send(const int fd)
{
	if ((fdtab[fd].state & (FD_EV_ACTIVE_W|FD_EV_READY_W)) == 0)
		HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT);
}

/* Report that FD <fd> may receive and send without polling. Used at FD
 * initialization.
 */
static inline void fd_may_both(const int fd)
{
	HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_RW);
}

/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
static inline void fd_cant_send(const int fd)
{
	/* removing ready never changes polled status */
	if (!(fdtab[fd].state & FD_EV_READY_W) ||
	    !HA_ATOMIC_BTR(&fdtab[fd].state, FD_EV_READY_W_BIT))
		return;
}

/* Report that FD <fd> may send again without polling (EAGAIN not detected). */
static inline void fd_may_send(const int fd)
{
	/* marking ready never changes polled status */
	if ((fdtab[fd].state & FD_EV_READY_W) ||
	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_READY_W_BIT))
		return;
}

/* Prepare FD <fd> to try to receive */
static inline void fd_want_recv(int fd)
{
	if ((fdtab[fd].state & FD_EV_ACTIVE_R) ||
	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
		return;
	updt_fd_polling(fd);
}

/* Prepare FD <fd> to try to receive, and only create update if fd_updt exists
 * (essentially for receivers during early boot).
 */
static inline void fd_want_recv_safe(int fd)
{
	if ((fdtab[fd].state & FD_EV_ACTIVE_R) ||
	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_R_BIT))
		return;
	if (fd_updt)
		updt_fd_polling(fd);
}

/* Prepare FD <fd> to try to send */
static inline void fd_want_send(int fd)
{
	if ((fdtab[fd].state & FD_EV_ACTIVE_W) ||
	    HA_ATOMIC_BTS(&fdtab[fd].state, FD_EV_ACTIVE_W_BIT))
		return;
	updt_fd_polling(fd);
}

/* returns the tgid from an fd (masks the refcount) */
static forceinline int fd_tgid(int fd)
{
	return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF;
}

/* Release a tgid previously taken by fd_grab_tgid() */
static forceinline void fd_drop_tgid(int fd)
{
	HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
}

/* Unlock a tgid currently locked by fd_lock_tgid(). This will effectively
 * allow threads from the FD's tgid to check the masks and manipulate the FD.
 */
static forceinline void fd_unlock_tgid(int fd)
{
	HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff7fffU);
}
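
/* For reference: the refc_tgid encoding implied by the constants used in the
 * helpers around here appears to be: bits 0..14 = thread group ID, bit 15
 * (0x8000) = "group switch in progress" lock, bits 16..31 = reference count
 * (hence the 0x10000 steps). A hypothetical decoder would read:
 *
 *	uint v    = _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid);
 *	uint grp  = v & 0x7fff;         // thread group ID
 *	uint lock = !!(v & 0x8000);     // non-zero while switching groups
 *	uint refs = v >> 16;            // references currently held
 */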
/* Switch the FD's TGID to the new value with a refcount of 1 and the lock bit
 * set. It doesn't care about the current TGID, except that it will wait for
 * the FD to no longer be switching and for its refcount to drop to zero. After
 * the function returns, the caller is free to manipulate the masks, and it
 * must call fd_unlock_tgid() to drop the lock, allowing threads from the
 * designated group to use the FD. Finally a call to fd_drop_tgid() will be
 * needed to drop the reference.
 */
static inline void fd_lock_tgid(int fd, uint desired_tgid)
{
	uint old;

	BUG_ON(!desired_tgid);

	old = tgid;              // assume we start from the caller's tgid
	desired_tgid |= 0x18000; // refcount=1, lock bit=1.

	while (1) {
		old &= 0x7fff;   // expect no lock and refcount==0
		if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid))
			break;
		__ha_cpu_relax();
	}
}

/* Grab a reference to the FD's TGID, and return the tgid. Note that a TGID of
 * zero indicates the FD was closed, thus also fails (i.e. no need to drop it).
 * On non-zero (success), the caller must release it using fd_drop_tgid().
 */
static inline uint fd_take_tgid(int fd)
{
	uint old;

	old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff;
	if (likely(old))
		return old;
	HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
	return 0;
}

/* Reset a tgid without affecting the refcount */
static forceinline void fd_reset_tgid(int fd)
{
	HA_ATOMIC_AND(&fdtab[fd].refc_tgid, 0xffff0000U);
}

/* Try to grab a reference to the FD's TGID, but only if it matches the
 * requested one (i.e. it succeeds with the TGID refcount held, or fails). Note
 * that a TGID of zero indicates the FD was closed, thus also fails. It returns
 * non-zero on success, in which case the caller must then release it using
 * fd_drop_tgid(), or zero on failure. The function is optimized for use
 * when it's likely that the tgid matches the desired one, as it's by far
 * the most common case.
 */
static inline uint fd_grab_tgid(int fd, uint desired_tgid)
{
	uint old;

	old = _HA_ATOMIC_FETCH_ADD(&fdtab[fd].refc_tgid, 0x10000) & 0xffff;
	if (likely(old == desired_tgid))
		return 1;
	HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
	return 0;
}

/* Set the FD's TGID to the new value with a refcount of 1, waiting for the
 * current refcount to become 0, to cover the rare possibility that a late
 * competing thread would be touching the tgid or the running mask in parallel.
 * The caller must call fd_drop_tgid() once done.
 */
static inline void fd_claim_tgid(int fd, uint desired_tgid)
{
	uint old;

	BUG_ON(!desired_tgid);

	desired_tgid += 0x10000; // refcount=1
	old = 0;                 // assume unused (most likely)
	while (1) {
		if (_HA_ATOMIC_CAS(&fdtab[fd].refc_tgid, &old, desired_tgid))
			break;
		__ha_cpu_relax();
		old &= 0x7fff;   // keep only the tgid and drop the lock
	}
}

/* Atomically reads the running mask if the tgid matches, or returns zero if it
 * does not match. This is meant for use in code paths where the bit is expected
 * to be present and will be sufficient to protect against a short-term group
 * migration (e.g. takeover and return from iocb).
 */
static inline ulong fd_get_running(int fd, uint desired_tgid)
{
	ulong ret = 0;
	uint old;

	/* TODO: may also be checked using an atomic double-load from a DWCAS
	 * on compatible architectures, which wouldn't require to modify nor
	 * restore the original value.
	 */
	old = _HA_ATOMIC_ADD_FETCH(&fdtab[fd].refc_tgid, 0x10000);
	if (likely((old & 0xffff) == desired_tgid))
		ret = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
	_HA_ATOMIC_SUB(&fdtab[fd].refc_tgid, 0x10000);
	return ret;
}

/* Removes tid_bit from the fd's running mask and returns the value before the
 * atomic operation, so that the caller can know if it was present.
 */
static inline long fd_clr_running(int fd)
{
	return _HA_ATOMIC_FETCH_AND(&fdtab[fd].running_mask, ~ti->ltid_bit);
}
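
/* Illustrative only, not part of this API: per the fd_grab_tgid()/fd_drop_tgid()
 * contract described above, a caller wanting to safely peek at an FD's masks
 * would typically hold a tgid reference around the access (here <grp> is the
 * group the caller expects, <mask> a local variable):
 *
 *	if (fd_grab_tgid(fd, grp)) {      // fails if closed or re-grouped
 *		mask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
 *		... use mask ...
 *		fd_drop_tgid(fd);         // release the reference
 *	}
 */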
/* Prepares <fd> for being polled on all permitted threads of this group ID
 * (these will then be refined to only cover running ones).
 */
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask)
{
	extern void sock_conn_iocb(int);
	int newstate;

	/* conn_fd_handler should support edge-triggered FDs */
	newstate = 0;
	if ((global.tune.options & GTUNE_FD_ET) && iocb == sock_conn_iocb)
		newstate |= FD_ET_POSSIBLE;

	/* This must never happen and would definitely indicate a bug, in
	 * addition to overwriting some unexpected memory areas.
	 */
	BUG_ON(fd < 0);
	BUG_ON(fd >= global.maxsock);
	BUG_ON(fdtab[fd].owner != NULL);
	BUG_ON(fdtab[fd].state != 0);
	BUG_ON(tgid < 1 || tgid > MAX_TGROUPS);

	thread_mask &= tg->threads_enabled;
	BUG_ON(thread_mask == 0);

	fd_claim_tgid(fd, tgid);

	BUG_ON(fdtab[fd].running_mask);

	fdtab[fd].owner = owner;
	fdtab[fd].iocb = iocb;
	fdtab[fd].state = newstate;
	fdtab[fd].thread_mask = thread_mask;
	fd_drop_tgid(fd);

#ifdef DEBUG_FD
	fdtab[fd].event_count = 0;
#endif

	/* note: do not reset polled_mask here as it indicates which poller
	 * still knows this FD from a possible previous round.
	 */

	/* the two directions are ready until proven otherwise */
	fd_may_both(fd);
	_HA_ATOMIC_INC(&ha_used_fds);
}

/* These are replacements for FD_SET, FD_CLR, FD_ISSET, working on uints */
static inline void hap_fd_set(int fd, unsigned int *evts)
{
	_HA_ATOMIC_OR(&evts[fd / (8*sizeof(*evts))], 1U << (fd & (8*sizeof(*evts) - 1)));
}

static inline void hap_fd_clr(int fd, unsigned int *evts)
{
	_HA_ATOMIC_AND(&evts[fd / (8*sizeof(*evts))], ~(1U << (fd & (8*sizeof(*evts) - 1))));
}

static inline unsigned int hap_fd_isset(int fd, unsigned int *evts)
{
	return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1)));
}

/* send a wake-up event to this thread, only if it's asleep and not notified yet */
static inline void wake_thread(int thr)
{
	struct thread_ctx *ctx = &ha_thread_ctx[thr];

	if ((_HA_ATOMIC_FETCH_OR(&ctx->flags, TH_FL_NOTIFIED) & (TH_FL_SLEEPING|TH_FL_NOTIFIED)) == TH_FL_SLEEPING) {
		char c = 'c';

		DISGUISE(write(poller_wr_pipe[thr], &c, 1));
	}
}

#endif /* _HAPROXY_FD_H */

/*
 * Local variables:
 *  c-indent-level: 8
 *  c-basic-offset: 8
 * End:
 */