/* Copyright (C) 2011-2022 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free * Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * version 2 along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301, USA. */ /** * \defgroup afxdppacket AF_XDP running mode * * @{ */ /** * \file * * \author Richard McConnell * * AF_XDP socket acquisition support * */ #define PCAP_DONT_INCLUDE_PCAP_BPF_H 1 #define SC_PCAP_DONT_INCLUDE_PCAP_H 1 #include "suricata-common.h" #include "suricata.h" #include "decode.h" #include "packet-queue.h" #include "threads.h" #include "threadvars.h" #include "tm-queuehandlers.h" #include "tm-modules.h" #include "tm-threads.h" #include "tm-threads-common.h" #include "conf.h" #include "util-cpu.h" #include "util-datalink.h" #include "util-debug.h" #include "util-device.h" #include "util-ebpf.h" #include "util-error.h" #include "util-privs.h" #include "util-optimize.h" #include "util-checksum.h" #include "util-ioctl.h" #include "util-host-info.h" #include "util-sysfs.h" #include "tmqh-packetpool.h" #include "source-af-xdp.h" #include "runmodes.h" #include "flow-storage.h" #include "util-validate.h" #ifdef HAVE_AF_XDP #include #include #endif #if HAVE_LINUX_IF_ETHER_H #include #endif #ifndef HAVE_AF_XDP TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **); void TmModuleReceiveAFXDPRegister(void) { tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP"; tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = NoAFXDPSupportExit; tmm_modules[TMM_RECEIVEAFXDP].Func = NULL; tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = NULL; tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = NULL; tmm_modules[TMM_RECEIVEAFXDP].cap_flags = 0; tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM; } /** * \brief Registration Function for DecodeAFXDP. */ void TmModuleDecodeAFXDPRegister(void) { tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP"; tmm_modules[TMM_DECODEAFXDP].ThreadInit = NoAFXDPSupportExit; tmm_modules[TMM_DECODEAFXDP].Func = NULL; tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL; tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = NULL; tmm_modules[TMM_DECODEAFXDP].cap_flags = 0; tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM; } /** * \brief this function prints an error message and exits. */ TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data) { SCLogError("Error creating thread %s: you do not have " "support for AF_XDP enabled, on Linux host please recompile " "with --enable-af-xdp", tv->name); exit(EXIT_FAILURE); } #else /* We have AF_XDP support */ #define POLL_TIMEOUT 100 #define NUM_FRAMES XSK_RING_PROD__DEFAULT_NUM_DESCS #define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE #define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2) #define RECONNECT_TIMEOUT 500000 /* Interface state */ enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP }; struct XskInitProtect { SCMutex queue_protect; SC_ATOMIC_DECLARE(uint8_t, queue_num); } xsk_protect; struct UmemInfo { void *buf; struct xsk_umem *umem; struct xsk_ring_prod fq; struct xsk_ring_cons cq; struct xsk_umem_config cfg; int mmap_alignment_flag; }; struct QueueAssignment { uint32_t queue_num; bool assigned; }; struct XskSockInfo { struct xsk_ring_cons rx; struct xsk_ring_prod tx; struct xsk_socket *xsk; /* Queue assignment structure */ struct QueueAssignment queue; /* Configuration items */ struct xsk_socket_config cfg; bool enable_busy_poll; uint32_t busy_poll_time; uint32_t busy_poll_budget; struct pollfd fd; }; /** * \brief Structure to hold thread specific variables. */ typedef struct AFXDPThreadVars_ { ThreadVars *tv; TmSlot *slot; LiveDevice *livedev; /* thread specific socket */ int promisc; int threads; char iface[AFXDP_IFACE_NAME_LENGTH]; uint32_t ifindex; /* AF_XDP structure */ struct UmemInfo umem; struct XskSockInfo xsk; uint32_t gro_flush_timeout; uint32_t napi_defer_hard_irqs; uint32_t prog_id; /* Handle state */ uint8_t afxdp_state; /* Stats parameters */ uint64_t pkts; uint64_t bytes; uint16_t capture_afxdp_packets; uint16_t capture_kernel_drops; uint16_t capture_afxdp_poll; uint16_t capture_afxdp_poll_timeout; uint16_t capture_afxdp_poll_failed; uint16_t capture_afxdp_empty_reads; uint16_t capture_afxdp_failed_reads; uint16_t capture_afxdp_acquire_pkt_failed; } AFXDPThreadVars; static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **); static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *); static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *); static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot); static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **); static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data); static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *); /** * \brief Registration Function for RecieveAFXDP. * \todo Unit tests are needed for this module. */ void TmModuleReceiveAFXDPRegister(void) { tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP"; tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit; tmm_modules[TMM_RECEIVEAFXDP].Func = NULL; tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop; tmm_modules[TMM_RECEIVEAFXDP].PktAcqBreakLoop = NULL; tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats; tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit; tmm_modules[TMM_RECEIVEAFXDP].cap_flags = SC_CAP_NET_RAW; tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM; } /** * \brief Registration Function for DecodeAFXDP. * \todo Unit tests are needed for this module. */ void TmModuleDecodeAFXDPRegister(void) { tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP"; tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit; tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP; tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL; tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit; tmm_modules[TMM_DECODEAFXDP].cap_flags = 0; tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM; } static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv) { struct xdp_statistics stats; socklen_t len = sizeof(struct xdp_statistics); int fd = xsk_socket__fd(ptv->xsk.xsk); if (getsockopt(fd, SOL_XDP, XDP_STATISTICS, &stats, &len) >= 0) { uint64_t rx_dropped = stats.rx_dropped + stats.rx_invalid_descs + stats.rx_ring_full; StatsAddUI64(ptv->tv, ptv->capture_kernel_drops, rx_dropped - StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops)); StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts); (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped); (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts); SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", ptv->tv->name, StatsGetLocalCounterValue(ptv->tv, ptv->capture_afxdp_packets), ptv->bytes, StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops)); ptv->pkts = 0; } } /** * \brief Init function for socket creation. * * Mutex used to synchronise initialisation - each socket opens a * different queue. The specific order in which each queue is * opened is not important, but it is vital the queue_num's * are different. * * \param tv pointer to ThreadVars */ TmEcode AFXDPQueueProtectionInit(void) { SCEnter(); SCMutexInit(&xsk_protect.queue_protect, NULL); SC_ATOMIC_SET(xsk_protect.queue_num, 0); SCReturnInt(TM_ECODE_OK); } void AFXDPMutexClean(void) { SCMutexDestroy(&xsk_protect.queue_protect); } static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv) { if (ptv->xsk.queue.assigned == false) { ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num); SC_ATOMIC_ADD(xsk_protect.queue_num, 1); /* Queue only needs assigned once, on startup */ ptv->xsk.queue.assigned = true; } SCReturnInt(TM_ECODE_OK); } static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv) { SCMutexLock(&xsk_protect.queue_protect); if ((ptv->threads - 1) == (int)ptv->xsk.queue.queue_num) { SCLogDebug("All AF_XDP capture threads are running."); } SCMutexUnlock(&xsk_protect.queue_protect); } static TmEcode AcquireBuffer(AFXDPThreadVars *ptv) { int mmap_flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag; ptv->umem.buf = mmap(NULL, MEM_BYTES, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); if (ptv->umem.buf == MAP_FAILED) { SCLogError("mmap: failed to acquire memory"); SCReturnInt(TM_ECODE_FAILED); } SCReturnInt(TM_ECODE_OK); } static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv) { if (xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq, &ptv->umem.cq, &ptv->umem.cfg)) { SCLogError("failed to create umem: %s", strerror(errno)); SCReturnInt(TM_ECODE_FAILED); } SCReturnInt(TM_ECODE_OK); } static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt) { uint32_t idx_fq = 0; uint32_t ret = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &idx_fq); if (ret != cnt) { SCLogError("Failed to initialise the fill ring."); SCReturnInt(TM_ECODE_FAILED); } for (uint32_t i = 0; i < cnt; i++) { *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = i * FRAME_SIZE; } xsk_ring_prod__submit(&ptv->umem.fq, cnt); SCReturnInt(TM_ECODE_OK); } /** * \brief Linux knobs are tuned to enable a NAPI polling context * * \param tv pointer to AFXDPThreadVars */ static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv) { char fname[SYSFS_MAX_FILENAME_SIZE]; if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface) < 0) { SCReturnInt(TM_ECODE_FAILED); } if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } if (snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface) < 0) { SCReturnInt(TM_ECODE_FAILED); } if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } SCReturnInt(TM_ECODE_OK); } static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv) { if (!ptv->xsk.enable_busy_poll) { SCReturnInt(TM_ECODE_OK); } /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8 */ if (!SCKernelVersionIsAtLeast(5, 11)) { SCLogWarning("Kernel version older than required: v5.11," " upgrade kernel version to use 'enable-busy-poll' option."); SCReturnInt(TM_ECODE_FAILED); } #if defined SO_PREFER_BUSY_POLL && defined SO_BUSY_POLL && defined SO_BUSY_POLL_BUDGET const int fd = xsk_socket__fd(ptv->xsk.xsk); int sock_opt = 1; if (WriteLinuxTunables(ptv) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } if (setsockopt(fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) { SCReturnInt(TM_ECODE_FAILED); } sock_opt = ptv->xsk.busy_poll_time; if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL, (void *)&sock_opt, sizeof(sock_opt)) < 0) { SCReturnInt(TM_ECODE_FAILED); } sock_opt = ptv->xsk.busy_poll_budget; if (setsockopt(fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, (void *)&sock_opt, sizeof(sock_opt)) < 0) { SCReturnInt(TM_ECODE_FAILED); } SCReturnInt(TM_ECODE_OK); #else SCLogWarning( "Kernel does not support busy poll, upgrade kernel or disable \"enable-busy-poll\"."); SCReturnInt(TM_ECODE_FAILED); #endif } static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state) { ptv->afxdp_state = state; } static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv) { int ret; SCMutexLock(&xsk_protect.queue_protect); if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) { SCLogError("Failed to assign queue ID"); SCReturnInt(TM_ECODE_FAILED); } if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num, ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) { SCLogError("Failed to create socket: %s", strerror(-ret)); SCReturnInt(TM_ECODE_FAILED); } SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num); /* For polling and socket options */ ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk); ptv->xsk.fd.events = POLLIN; /* Set state */ AFXDPSwitchState(ptv, AFXDP_STATE_UP); SCMutexUnlock(&xsk_protect.queue_protect); SCReturnInt(TM_ECODE_OK); } static void AFXDPCloseSocket(AFXDPThreadVars *ptv) { if (ptv->xsk.xsk) { xsk_socket__delete(ptv->xsk.xsk); ptv->xsk.xsk = NULL; } if (ptv->umem.umem) { xsk_umem__delete(ptv->umem.umem); ptv->umem.umem = NULL; } memset(&ptv->umem.fq, 0, sizeof(struct xsk_ring_prod)); memset(&ptv->umem.cq, 0, sizeof(struct xsk_ring_cons)); } static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv) { if (ConfigureXSKUmem(ptv) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } if (InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } /* Open AF_XDP socket */ if (OpenXSKSocket(ptv) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) { SCLogWarning("Failed to configure busy polling" " performance may be reduced."); } /* Has the eBPF program successfully bound? */ #ifdef HAVE_BPF_XDP_QUERY_ID if (bpf_xdp_query_id(ptv->ifindex, ptv->xsk.cfg.xdp_flags, &ptv->prog_id)) { SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev); SCReturnInt(TM_ECODE_FAILED); } #else if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags)) { SCLogError("Failed to attach eBPF program to interface: %s", ptv->livedev->dev); SCReturnInt(TM_ECODE_FAILED); } #endif SCReturnInt(TM_ECODE_OK); } /** * \brief Try to reopen AF_XDP socket * * \retval: TM_ECODE_OK in case of success * TM_ECODE_FAILED if error occurs or a condition is not met. */ static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv) { AFXDPCloseSocket(ptv); usleep(RECONNECT_TIMEOUT); int if_flags = GetIfaceFlags(ptv->iface); if (if_flags == -1) { SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface); goto sock_err; } else if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) { SCLogDebug("Interface '%s' is down", ptv->iface); goto sock_err; } if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) { SCReturnInt(TM_ECODE_FAILED); } SCLogInfo("Interface '%s' is back", ptv->iface); SCReturnInt(TM_ECODE_OK); sock_err: SCReturnInt(TM_ECODE_FAILED); } /** * \brief Write packet entry to the fill ring, freeing * this slot for re/fill with inbound packet descriptor * \param pointer to Packet * \retval: None */ static void AFXDPReleasePacket(Packet *p) { *xsk_ring_prod__fill_addr((struct xsk_ring_prod *)p->afxdp_v.fq, p->afxdp_v.fq_idx) = p->afxdp_v.orig; PacketFreeOrRelease(p); } static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump) { int stats_dumped = 0; time_t current_time = time(NULL); if (current_time != *last_dump) { AFXDPDumpCounters(ptv); *last_dump = current_time; stats_dumped = 1; } StatsSyncCountersIfSignalled(ptv->tv); return stats_dumped; } static inline ssize_t WakeupSocket(void *data) { ssize_t res = 0; AFXDPThreadVars *ptv = (AFXDPThreadVars *)data; /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */ if (ptv->xsk.enable_busy_poll || xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) { res = recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL); } return res; } /** * \brief Init function for ReceiveAFXDP. * * \param tv pointer to ThreadVars * \param initdata pointer to the interface passed from the user * \param data pointer gets populated with AFPThreadVars * * \todo Create a general AFP setup function. */ static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data) { SCEnter(); AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata; if (initdata == NULL) { SCLogError("initdata == NULL"); SCReturnInt(TM_ECODE_FAILED); } AFXDPThreadVars *ptv = SCMalloc(sizeof(AFXDPThreadVars)); if (unlikely(ptv == NULL)) { afxdpconfig->DerefFunc(afxdpconfig); SCReturnInt(TM_ECODE_FAILED); } memset(ptv, 0, sizeof(AFXDPThreadVars)); ptv->tv = tv; strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH); ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0'; ptv->ifindex = if_nametoindex(ptv->iface); ptv->livedev = LiveGetDevice(ptv->iface); if (ptv->livedev == NULL) { SCLogError("Unable to find Live device"); SCFree(ptv); SCReturnInt(TM_ECODE_FAILED); } ptv->promisc = afxdpconfig->promisc; if (ptv->promisc != 0) { /* Force promiscuous mode */ if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) { SCLogError("Failed to switch interface (%s) to promiscuous, error %s", ptv->iface, strerror(errno)); SCFree(ptv); SCReturnInt(TM_ECODE_FAILED); } } ptv->threads = afxdpconfig->threads; /* Socket configuration */ ptv->xsk.cfg.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS; ptv->xsk.cfg.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS; ptv->xsk.cfg.xdp_flags = afxdpconfig->mode; ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags; /* UMEM configuration */ ptv->umem.cfg.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2; ptv->umem.cfg.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS; ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE; ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM; ptv->umem.cfg.flags = afxdpconfig->mem_alignment; /* Use hugepages if unaligned chunk mode */ if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) { ptv->umem.mmap_alignment_flag = MAP_HUGETLB; } /* Busy polling configuration */ ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll; ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget; ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time; ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout; ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs; /* Stats registration */ ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", ptv->tv); ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", ptv->tv); ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", ptv->tv); ptv->capture_afxdp_poll_timeout = StatsRegisterCounter("capture.afxdp.poll_timeout", ptv->tv); ptv->capture_afxdp_poll_failed = StatsRegisterCounter("capture.afxdp.poll_failed", ptv->tv); ptv->capture_afxdp_empty_reads = StatsRegisterCounter("capture.afxdp.empty_reads", ptv->tv); ptv->capture_afxdp_failed_reads = StatsRegisterCounter("capture.afxdp.failed_reads", ptv->tv); ptv->capture_afxdp_acquire_pkt_failed = StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", ptv->tv); /* Reserve memory for umem */ if (AcquireBuffer(ptv) != TM_ECODE_OK) { SCFree(ptv); SCReturnInt(TM_ECODE_FAILED); } if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) { ReceiveAFXDPThreadDeinit(tv, ptv); SCReturnInt(TM_ECODE_FAILED); } *data = (void *)ptv; afxdpconfig->DerefFunc(afxdpconfig); SCReturnInt(TM_ECODE_OK); } /** * \brief Main AF_XDP reading Loop function */ static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot) { SCEnter(); Packet *p; time_t last_dump = 0; struct timeval ts; uint32_t idx_rx = 0, idx_fq = 0, rcvd; int r; AFXDPThreadVars *ptv = (AFXDPThreadVars *)data; TmSlot *s = (TmSlot *)slot; ptv->slot = s->slot_next; AFXDPAllThreadsRunning(ptv); // Indicate that the thread is actually running its application level code (i.e., it can poll // packets) TmThreadsSetFlag(tv, THV_RUNNING); PacketPoolWait(); while (1) { /* Start by checking the state of our interface */ if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) { do { usleep(RECONNECT_TIMEOUT); if (unlikely(suricata_ctl_flags != 0)) { break; } r = AFXDPTryReopen(ptv); } while (r != TM_ECODE_OK); } if (unlikely(suricata_ctl_flags != 0)) { SCLogDebug("Stopping Suricata!"); AFXDPDumpCounters(ptv); break; } /* Busy polling is not set, using poll() to maintain (relatively) decent * performance. xdp_busy_poll must be disabled for kernels < 5.11 */ if (!ptv->xsk.enable_busy_poll) { StatsIncr(ptv->tv, ptv->capture_afxdp_poll); r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT); /* Report poll results */ if (r <= 0) { if (r == 0) { StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout); } else if (r < 0) { StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed); SCLogWarning("poll failed with retval %d", r); AFXDPSwitchState(ptv, AFXDP_STATE_DOWN); } DumpStatsEverySecond(ptv, &last_dump); continue; } } rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx); if (!rcvd) { StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads); ssize_t ret = WakeupSocket(ptv); if (ret < 0) { SCLogWarning("recv failed with retval %ld", ret); AFXDPSwitchState(ptv, AFXDP_STATE_DOWN); } DumpStatsEverySecond(ptv, &last_dump); continue; } uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq); while (res != rcvd) { StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads); ssize_t ret = WakeupSocket(ptv); if (ret < 0) { SCLogWarning("recv failed with retval %ld", ret); AFXDPSwitchState(ptv, AFXDP_STATE_DOWN); continue; } res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq); } gettimeofday(&ts, NULL); ptv->pkts += rcvd; for (uint32_t i = 0; i < rcvd; i++) { p = PacketGetFromQueueOrAlloc(); if (unlikely(p == NULL)) { StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed); continue; } PKT_SET_SRC(p, PKT_SRC_WIRE); p->datalink = LINKTYPE_ETHERNET; p->livedev = ptv->livedev; p->ReleasePacket = AFXDPReleasePacket; p->flags |= PKT_IGNORE_CHECKSUM; p->ts = SCTIME_FROM_TIMEVAL(&ts); uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr; uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len; uint64_t orig = xsk_umem__extract_addr(addr); addr = xsk_umem__add_offset_to_addr(addr); uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr); ptv->bytes += len; p->afxdp_v.fq_idx = idx_fq++; p->afxdp_v.orig = orig; p->afxdp_v.fq = &ptv->umem.fq; PacketSetData(p, pkt_data, len); if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { TmqhOutputPacketpool(ptv->tv, p); SCReturnInt(EXIT_FAILURE); } } xsk_ring_prod__submit(&ptv->umem.fq, rcvd); xsk_ring_cons__release(&ptv->xsk.rx, rcvd); /* Trigger one dump of stats every second */ DumpStatsEverySecond(ptv, &last_dump); } SCReturnInt(TM_ECODE_OK); } /** * \brief DeInit function closes af-xdp socket at exit. * \param tv pointer to ThreadVars * \param data pointer that gets cast into AFXDPPThreadVars for ptv */ static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data) { AFXDPThreadVars *ptv = (AFXDPThreadVars *)data; if (ptv->xsk.xsk) { xsk_socket__delete(ptv->xsk.xsk); ptv->xsk.xsk = NULL; } if (ptv->umem.umem) { xsk_umem__delete(ptv->umem.umem); ptv->umem.umem = NULL; } munmap(ptv->umem.buf, MEM_BYTES); SCFree(ptv); SCReturnInt(TM_ECODE_OK); } /** * \brief This function prints stats to the screen at exit. * \param tv pointer to ThreadVars * \param data pointer that gets cast into AFXDPThreadVars for ptv */ static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data) { SCEnter(); AFXDPThreadVars *ptv = (AFXDPThreadVars *)data; AFXDPDumpCounters(ptv); SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "", tv->name, StatsGetLocalCounterValue(tv, ptv->capture_afxdp_packets), ptv->bytes, StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops)); } /** * \brief This function passes off to link type decoders. * * DecodeAFXDP decodes packets from AF_XDP and passes * them off to the proper link type decoder. * * \param t pointer to ThreadVars * \param p pointer to the current packet * \param data pointer that gets cast into AFXDPThreadVars for ptv */ static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data) { SCEnter(); DecodeThreadVars *dtv = (DecodeThreadVars *)data; DEBUG_VALIDATE_BUG_ON(PKT_IS_PSEUDOPKT(p)); /* update counters */ DecodeUpdatePacketCounters(tv, dtv, p); /* If suri has set vlan during reading, we increase vlan counter */ if (p->vlan_idx) { StatsIncr(tv, dtv->counter_vlan); } /* call the decoder */ DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p)); PacketDecodeFinalize(tv, dtv, p); SCReturnInt(TM_ECODE_OK); } static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data) { SCEnter(); DecodeThreadVars *dtv = DecodeThreadVarsAlloc(tv); if (dtv == NULL) SCReturnInt(TM_ECODE_FAILED); DecodeRegisterPerfCounters(dtv, tv); *data = (void *)dtv; SCReturnInt(TM_ECODE_OK); } static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data) { if (data != NULL) DecodeThreadVarsFree(tv, data); SCReturnInt(TM_ECODE_OK); } #endif /* HAVE_AF_XDP */ /* eof */ /** * @} */