| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
|---|---|---|
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
| commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
| tree | 64f88b554b444a49f656b6c656111a145cbbaa28 | /src/spdk/module/sock |
| parent | Initial commit. (diff) | |
| download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz, ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip | |
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/module/sock')
| -rw-r--r-- | src/spdk/module/sock/Makefile | 48 |
| -rw-r--r-- | src/spdk/module/sock/posix/Makefile | 45 |
| -rw-r--r-- | src/spdk/module/sock/posix/posix.c | 1405 |
| -rw-r--r-- | src/spdk/module/sock/uring/Makefile | 45 |
| -rw-r--r-- | src/spdk/module/sock/uring/uring.c | 1328 |
| -rw-r--r-- | src/spdk/module/sock/vpp/Makefile | 55 |
| -rw-r--r-- | src/spdk/module/sock/vpp/vpp.c | 1633 |
7 files changed, 4559 insertions, 0 deletions
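
The diffstat above covers three pluggable socket backends: posix is always built, while uring and vpp are gated behind CONFIG_URING (Linux only) and CONFIG_VPP in the top-level Makefile. Each backend registers a struct spdk_net_impl with SPDK's generic sock layer, and its tunables are exposed through the .get_opts/.set_opts callbacks. Below is a minimal sketch of how an application might tune the posix backend before creating any sockets, assuming SPDK's public spdk_sock_impl_get_opts()/spdk_sock_impl_set_opts() wrappers for those callbacks; the field names come from struct spdk_sock_impl_opts as defined in this commit:

```c
/* Sketch: tune the "posix" sock implementation's options before creating
 * sockets. spdk_sock_impl_get_opts()/spdk_sock_impl_set_opts() are assumed
 * here to be the public wrappers over the .get_opts/.set_opts callbacks this
 * module registers; the option fields match struct spdk_sock_impl_opts and
 * the g_spdk_posix_sock_impl_opts defaults in this diff. */
#include "spdk/stdinc.h"
#include "spdk/sock.h"

static int
tune_posix_impl(void)
{
	struct spdk_sock_impl_opts opts = {};
	size_t len = sizeof(opts);

	/* Read the current defaults for the "posix" implementation. */
	if (spdk_sock_impl_get_opts("posix", &opts, &len) != 0) {
		return -1;
	}

	opts.recv_buf_size = 4 * 1024 * 1024;	/* raise the SO_RCVBUF floor */
	opts.enable_recv_pipe = true;		/* keep the userspace recv pipe */
	opts.enable_zerocopy_send = false;	/* e.g. for loopback-heavy tests */

	return spdk_sock_impl_set_opts("posix", &opts, len);
}
```

Note that posix.c below still gates zero-copy at runtime: it disables it for loopback listeners and for all client (connect) sockets regardless of enable_zerocopy_send.
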
diff --git a/src/spdk/module/sock/Makefile b/src/spdk/module/sock/Makefile
new file mode 100644
index 000000000..865743d06
--- /dev/null
+++ b/src/spdk/module/sock/Makefile
@@ -0,0 +1,48 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+DIRS-y = posix
+ifeq ($(OS), Linux)
+DIRS-$(CONFIG_URING) += uring
+endif
+DIRS-$(CONFIG_VPP) += vpp
+
+.PHONY: all clean $(DIRS-y)
+
+all: $(DIRS-y)
+clean: $(DIRS-y)
+
+include $(SPDK_ROOT_DIR)/mk/spdk.subdirs.mk
diff --git a/src/spdk/module/sock/posix/Makefile b/src/spdk/module/sock/posix/Makefile
new file mode 100644
index 000000000..9783e024d
--- /dev/null
+++ b/src/spdk/module/sock/posix/Makefile
@@ -0,0 +1,45 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 2
+SO_MINOR := 0
+
+LIBNAME = sock_posix
+C_SRCS = posix.c
+
+SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/module/sock/posix/posix.c b/src/spdk/module/sock/posix/posix.c
new file mode 100644
index 000000000..4eb1bf106
--- /dev/null
+++ b/src/spdk/module/sock/posix/posix.c
@@ -0,0 +1,1405 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation. All rights reserved.
+ * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */ + +#include "spdk/stdinc.h" + +#if defined(__linux__) +#include <sys/epoll.h> +#include <linux/errqueue.h> +#elif defined(__FreeBSD__) +#include <sys/event.h> +#endif + +#include "spdk/log.h" +#include "spdk/pipe.h" +#include "spdk/sock.h" +#include "spdk/util.h" +#include "spdk/likely.h" +#include "spdk_internal/sock.h" + +#define MAX_TMPBUF 1024 +#define PORTNUMLEN 32 +#define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024) +#define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024) +#define IOV_BATCH_SIZE 64 + +#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY) +#define SPDK_ZEROCOPY +#endif + +struct spdk_posix_sock { + struct spdk_sock base; + int fd; + + uint32_t sendmsg_idx; + bool zcopy; + + struct spdk_pipe *recv_pipe; + void *recv_buf; + int recv_buf_sz; + bool pending_recv; + int so_priority; + + TAILQ_ENTRY(spdk_posix_sock) link; +}; + +struct spdk_posix_sock_group_impl { + struct spdk_sock_group_impl base; + int fd; + TAILQ_HEAD(, spdk_posix_sock) pending_recv; +}; + +static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = { + .recv_buf_size = MIN_SO_RCVBUF_SIZE, + .send_buf_size = MIN_SO_SNDBUF_SIZE, + .enable_recv_pipe = true, + .enable_zerocopy_send = true +}; + +static int +get_addr_str(struct sockaddr *sa, char *host, size_t hlen) +{ + const char *result = NULL; + + if (sa == NULL || host == NULL) { + return -1; + } + + switch (sa->sa_family) { + case AF_INET: + result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr), + host, hlen); + break; + case AF_INET6: + result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), + host, hlen); + break; + default: + break; + } + + if (result != NULL) { + return 0; + } else { + return -1; + } +} + +#define __posix_sock(sock) (struct spdk_posix_sock *)sock +#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group + +static int +posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, + char *caddr, int clen, uint16_t *cport) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc; + + assert(sock != NULL); + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); + return -1; + } + + switch (sa.ss_family) { + case AF_UNIX: + /* Acceptable connection types that don't have IPs */ + return 0; + case AF_INET: + case AF_INET6: + /* Code below will get IP addresses */ + break; + default: + /* Unsupported socket family */ + return -1; + } + + rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); + if (rc != 0) { + SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); + return -1; + } + + if (sport) { + if (sa.ss_family == AF_INET) { + *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); + } else if (sa.ss_family == AF_INET6) { + *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); + } + } + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); + return -1; + } + + rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); + if (rc != 0) { + SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); + return -1; + } + + if (cport) { + if (sa.ss_family == AF_INET) { + *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); + } else if (sa.ss_family == AF_INET6) { + *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); + } + } + + return 0; +} + +enum 
posix_sock_create_type { + SPDK_SOCK_CREATE_LISTEN, + SPDK_SOCK_CREATE_CONNECT, +}; + +static int +posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz) +{ + uint8_t *new_buf; + struct spdk_pipe *new_pipe; + struct iovec siov[2]; + struct iovec diov[2]; + int sbytes; + ssize_t bytes; + + if (sock->recv_buf_sz == sz) { + return 0; + } + + /* If the new size is 0, just free the pipe */ + if (sz == 0) { + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + sock->recv_pipe = NULL; + sock->recv_buf = NULL; + return 0; + } else if (sz < MIN_SOCK_PIPE_SIZE) { + SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); + return -1; + } + + /* Round up to next 64 byte multiple */ + new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); + if (!new_buf) { + SPDK_ERRLOG("socket recv buf allocation failed\n"); + return -ENOMEM; + } + + new_pipe = spdk_pipe_create(new_buf, sz + 1); + if (new_pipe == NULL) { + SPDK_ERRLOG("socket pipe allocation failed\n"); + free(new_buf); + return -ENOMEM; + } + + if (sock->recv_pipe != NULL) { + /* Pull all of the data out of the old pipe */ + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes > sz) { + /* Too much data to fit into the new pipe size */ + spdk_pipe_destroy(new_pipe); + free(new_buf); + return -EINVAL; + } + + sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); + assert(sbytes == sz); + + bytes = spdk_iovcpy(siov, 2, diov, 2); + spdk_pipe_writer_advance(new_pipe, bytes); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + } + + sock->recv_buf_sz = sz; + sock->recv_buf = new_buf; + sock->recv_pipe = new_pipe; + + return 0; +} + +static int +posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc; + + assert(sock != NULL); + + if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) { + rc = posix_sock_alloc_pipe(sock, sz); + if (rc) { + return rc; + } + } + + /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */ + if (sz < MIN_SO_RCVBUF_SIZE) { + sz = MIN_SO_RCVBUF_SIZE; + } + + rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); + if (rc < 0) { + return rc; + } + + return 0; +} + +static int +posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc; + + assert(sock != NULL); + + if (sz < MIN_SO_SNDBUF_SIZE) { + sz = MIN_SO_SNDBUF_SIZE; + } + + rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); + if (rc < 0) { + return rc; + } + + return 0; +} + +static struct spdk_posix_sock * +posix_sock_alloc(int fd, bool enable_zero_copy) +{ + struct spdk_posix_sock *sock; +#ifdef SPDK_ZEROCOPY + int rc; + int flag; +#endif + + sock = calloc(1, sizeof(*sock)); + if (sock == NULL) { + SPDK_ERRLOG("sock allocation failed\n"); + return NULL; + } + + sock->fd = fd; + +#ifdef SPDK_ZEROCOPY + if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) { + return sock; + } + + /* Try to turn on zero copy sends */ + flag = 1; + rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag)); + if (rc == 0) { + sock->zcopy = true; + } +#endif + + return sock; +} + +static bool +sock_is_loopback(int fd) +{ + struct ifaddrs *addrs, *tmp; + struct sockaddr_storage sa = {}; + socklen_t salen; + struct ifreq ifr = {}; + char ip_addr[256], ip_addr_tmp[256]; + int rc; + bool is_loopback = false; + + salen = sizeof(sa); + rc = getsockname(fd, (struct sockaddr *)&sa, &salen); + if (rc != 0) 
{ + return is_loopback; + } + + memset(ip_addr, 0, sizeof(ip_addr)); + rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr)); + if (rc != 0) { + return is_loopback; + } + + getifaddrs(&addrs); + for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) { + if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) && + (tmp->ifa_addr->sa_family == sa.ss_family)) { + memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp)); + rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp)); + if (rc != 0) { + continue; + } + + if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) { + memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name)); + ioctl(fd, SIOCGIFFLAGS, &ifr); + if (ifr.ifr_flags & IFF_LOOPBACK) { + is_loopback = true; + } + goto end; + } + } + } + +end: + freeifaddrs(addrs); + return is_loopback; +} + +static struct spdk_sock * +posix_sock_create(const char *ip, int port, + enum posix_sock_create_type type, + struct spdk_sock_opts *opts) +{ + struct spdk_posix_sock *sock; + char buf[MAX_TMPBUF]; + char portnum[PORTNUMLEN]; + char *p; + struct addrinfo hints, *res, *res0; + int fd, flag; + int val = 1; + int rc, sz; + bool enable_zero_copy = true; + + if (ip == NULL) { + return NULL; + } + if (ip[0] == '[') { + snprintf(buf, sizeof(buf), "%s", ip + 1); + p = strchr(buf, ']'); + if (p != NULL) { + *p = '\0'; + } + ip = (const char *) &buf[0]; + } + + snprintf(portnum, sizeof portnum, "%d", port); + memset(&hints, 0, sizeof hints); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICSERV; + hints.ai_flags |= AI_PASSIVE; + hints.ai_flags |= AI_NUMERICHOST; + rc = getaddrinfo(ip, portnum, &hints, &res0); + if (rc != 0) { + SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); + return NULL; + } + + /* try listen */ + fd = -1; + for (res = res0; res != NULL; res = res->ai_next) { +retry: + fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (fd < 0) { + /* error */ + continue; + } + + sz = g_spdk_posix_sock_impl_opts.recv_buf_size; + rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); + if (rc) { + /* Not fatal */ + } + + sz = g_spdk_posix_sock_impl_opts.send_buf_size; + rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); + if (rc) { + /* Not fatal */ + } + + rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + +#if defined(SO_PRIORITY) + if (opts != NULL && opts->priority) { + rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + } +#endif + + if (res->ai_family == AF_INET6) { + rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + } + + if (type == SPDK_SOCK_CREATE_LISTEN) { + rc = bind(fd, res->ai_addr, res->ai_addrlen); + if (rc != 0) { + SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); + switch (errno) { + case EINTR: + /* interrupted? */ + close(fd); + goto retry; + case EADDRNOTAVAIL: + SPDK_ERRLOG("IP address %s not available. 
" + "Verify IP address in config file " + "and make sure setup script is " + "run before starting spdk app.\n", ip); + /* FALLTHROUGH */ + default: + /* try next family */ + close(fd); + fd = -1; + continue; + } + } + /* bind OK */ + rc = listen(fd, 512); + if (rc != 0) { + SPDK_ERRLOG("listen() failed, errno = %d\n", errno); + close(fd); + fd = -1; + break; + } + } else if (type == SPDK_SOCK_CREATE_CONNECT) { + rc = connect(fd, res->ai_addr, res->ai_addrlen); + if (rc != 0) { + SPDK_ERRLOG("connect() failed, errno = %d\n", errno); + /* try next family */ + close(fd); + fd = -1; + continue; + } + } + + flag = fcntl(fd, F_GETFL); + if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { + SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); + close(fd); + fd = -1; + break; + } + break; + } + freeaddrinfo(res0); + + if (fd < 0) { + return NULL; + } + + if (type == SPDK_SOCK_CREATE_LISTEN) { + /* Only enable zero copy for non-loopback sockets. */ + enable_zero_copy = !sock_is_loopback(fd); + } else if (type == SPDK_SOCK_CREATE_CONNECT) { + /* Disable zero copy for client sockets until support is added */ + enable_zero_copy = false; + } + + sock = posix_sock_alloc(fd, enable_zero_copy); + if (sock == NULL) { + SPDK_ERRLOG("sock allocation failed\n"); + close(fd); + return NULL; + } + + if (opts != NULL) { + sock->so_priority = opts->priority; + } + return &sock->base; +} + +static struct spdk_sock * +posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); +} + +static struct spdk_sock * +posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); +} + +static struct spdk_sock * +posix_sock_accept(struct spdk_sock *_sock) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc, fd; + struct spdk_posix_sock *new_sock; + int flag; + + memset(&sa, 0, sizeof(sa)); + salen = sizeof(sa); + + assert(sock != NULL); + + rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); + + if (rc == -1) { + return NULL; + } + + fd = rc; + + flag = fcntl(fd, F_GETFL); + if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { + SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); + close(fd); + return NULL; + } + +#if defined(SO_PRIORITY) + /* The priority is not inherited, so call this function again */ + if (sock->base.opts.priority) { + rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); + if (rc != 0) { + close(fd); + return NULL; + } + } +#endif + + /* Inherit the zero copy feature from the listen socket */ + new_sock = posix_sock_alloc(fd, sock->zcopy); + if (new_sock == NULL) { + close(fd); + return NULL; + } + new_sock->so_priority = sock->base.opts.priority; + + return &new_sock->base; +} + +static int +posix_sock_close(struct spdk_sock *_sock) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + + assert(TAILQ_EMPTY(&_sock->pending_reqs)); + + /* If the socket fails to close, the best choice is to + * leak the fd but continue to free the rest of the sock + * memory. 
*/ + close(sock->fd); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + free(sock); + + return 0; +} + +#ifdef SPDK_ZEROCOPY +static int +_sock_check_zcopy(struct spdk_sock *sock) +{ + struct spdk_posix_sock *psock = __posix_sock(sock); + struct msghdr msgh = {}; + uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)]; + ssize_t rc; + struct sock_extended_err *serr; + struct cmsghdr *cm; + uint32_t idx; + struct spdk_sock_request *req, *treq; + bool found; + + msgh.msg_control = buf; + msgh.msg_controllen = sizeof(buf); + + while (true) { + rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE); + + if (rc < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return 0; + } + + if (!TAILQ_EMPTY(&sock->pending_reqs)) { + SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n"); + } else { + SPDK_WARNLOG("Recvmsg yielded an error!\n"); + } + return 0; + } + + cm = CMSG_FIRSTHDR(&msgh); + if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) { + SPDK_WARNLOG("Unexpected cmsg level or type!\n"); + return 0; + } + + serr = (struct sock_extended_err *)CMSG_DATA(cm); + if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { + SPDK_WARNLOG("Unexpected extended error origin\n"); + return 0; + } + + /* Most of the time, the pending_reqs array is in the exact + * order we need such that all of the requests to complete are + * in order, in the front. It is guaranteed that all requests + * belonging to the same sendmsg call are sequential, so once + * we encounter one match we can stop looping as soon as a + * non-match is found. + */ + for (idx = serr->ee_info; idx <= serr->ee_data; idx++) { + found = false; + TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) { + if (req->internal.offset == idx) { + found = true; + + rc = spdk_sock_request_put(sock, req, 0); + if (rc < 0) { + return rc; + } + + } else if (found) { + break; + } + } + + } + } + + return 0; +} +#endif + +static int +_sock_flush(struct spdk_sock *sock) +{ + struct spdk_posix_sock *psock = __posix_sock(sock); + struct msghdr msg = {}; + int flags; + struct iovec iovs[IOV_BATCH_SIZE]; + int iovcnt; + int retval; + struct spdk_sock_request *req; + int i; + ssize_t rc; + unsigned int offset; + size_t len; + + /* Can't flush from within a callback or we end up with recursive calls */ + if (sock->cb_cnt > 0) { + return 0; + } + + /* Gather an iov */ + iovcnt = 0; + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Consume any offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; + iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + iovcnt++; + + offset = 0; + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + } + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + + req = TAILQ_NEXT(req, internal.link); + } + + if (iovcnt == 0) { + return 0; + } + + /* Perform the vectored write */ + msg.msg_iov = iovs; + msg.msg_iovlen = iovcnt; +#ifdef SPDK_ZEROCOPY + if (psock->zcopy) { + flags = MSG_ZEROCOPY; + } else +#endif + { + flags = 0; + } + rc = sendmsg(psock->fd, &msg, flags); + if (rc <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + return rc; + } + + psock->sendmsg_idx++; + + /* Consume the requests that were actually written */ + req = 
TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Advance by the offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + /* Calculate the remaining length of this element */ + len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + + if (len > (size_t)rc) { + /* This element was partially sent. */ + req->internal.offset += rc; + return 0; + } + + offset = 0; + req->internal.offset += len; + rc -= len; + } + + /* Handled a full request. */ + spdk_sock_request_pend(sock, req); + + if (!psock->zcopy) { + /* The sendmsg syscall above isn't currently asynchronous, + * so it's already done. */ + retval = spdk_sock_request_put(sock, req, 0); + if (retval) { + break; + } + } else { + /* Re-use the offset field to hold the sendmsg call index. The + * index is 0 based, so subtract one here because we've already + * incremented above. */ + req->internal.offset = psock->sendmsg_idx - 1; + } + + if (rc == 0) { + break; + } + + req = TAILQ_FIRST(&sock->queued_reqs); + } + + return 0; +} + +static int +posix_sock_flush(struct spdk_sock *_sock) +{ + return _sock_flush(_sock); +} + +static ssize_t +posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt) +{ + struct iovec siov[2]; + int sbytes; + ssize_t bytes; + struct spdk_posix_sock_group_impl *group; + + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes < 0) { + errno = EINVAL; + return -1; + } else if (sbytes == 0) { + errno = EAGAIN; + return -1; + } + + bytes = spdk_iovcpy(siov, 2, diov, diovcnt); + + if (bytes == 0) { + /* The only way this happens is if diov is 0 length */ + errno = EINVAL; + return -1; + } + + spdk_pipe_reader_advance(sock->recv_pipe, bytes); + + /* If we drained the pipe, take it off the level-triggered list */ + if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + group = __posix_group_impl(sock->base.group_impl); + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + + return bytes; +} + +static inline ssize_t +posix_sock_read(struct spdk_posix_sock *sock) +{ + struct iovec iov[2]; + int bytes; + struct spdk_posix_sock_group_impl *group; + + bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); + + if (bytes > 0) { + bytes = readv(sock->fd, iov, 2); + if (bytes > 0) { + spdk_pipe_writer_advance(sock->recv_pipe, bytes); + if (sock->base.group_impl) { + group = __posix_group_impl(sock->base.group_impl); + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + sock->pending_recv = true; + } + } + } + + return bytes; +} + +static ssize_t +posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc, i; + size_t len; + + if (sock->recv_pipe == NULL) { + return readv(sock->fd, iov, iovcnt); + } + + len = 0; + for (i = 0; i < iovcnt; i++) { + len += iov[i].iov_len; + } + + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If the user is receiving a sufficiently large amount of data, + * receive directly to their buffers. 
*/ + if (len >= MIN_SOCK_PIPE_SIZE) { + return readv(sock->fd, iov, iovcnt); + } + + /* Otherwise, do a big read into our pipe */ + rc = posix_sock_read(sock); + if (rc <= 0) { + return rc; + } + } + + return posix_sock_recv_from_pipe(sock, iov, iovcnt); +} + +static ssize_t +posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len) +{ + struct iovec iov[1]; + + iov[0].iov_base = buf; + iov[0].iov_len = len; + + return posix_sock_readv(sock, iov, 1); +} + +static ssize_t +posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc; + + /* In order to process a writev, we need to flush any asynchronous writes + * first. */ + rc = _sock_flush(_sock); + if (rc < 0) { + return rc; + } + + if (!TAILQ_EMPTY(&_sock->queued_reqs)) { + /* We weren't able to flush all requests */ + errno = EAGAIN; + return -1; + } + + return writev(sock->fd, iov, iovcnt); +} + +static void +posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) +{ + int rc; + + spdk_sock_request_queue(sock, req); + + /* If there are a sufficient number queued, just flush them out immediately. */ + if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } +} + +static int +posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + int val; + int rc; + + assert(sock != NULL); + + val = nbytes; + rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val); + if (rc != 0) { + return -1; + } + return 0; +} + +static bool +posix_sock_is_ipv6(struct spdk_sock *_sock) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc; + + assert(sock != NULL); + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); + return false; + } + + return (sa.ss_family == AF_INET6); +} + +static bool +posix_sock_is_ipv4(struct spdk_sock *_sock) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc; + + assert(sock != NULL); + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); + return false; + } + + return (sa.ss_family == AF_INET); +} + +static bool +posix_sock_is_connected(struct spdk_sock *_sock) +{ + struct spdk_posix_sock *sock = __posix_sock(_sock); + uint8_t byte; + int rc; + + rc = recv(sock->fd, &byte, 1, MSG_PEEK); + if (rc == 0) { + return false; + } + + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return true; + } + + return false; + } + + return true; +} + +static int +posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) +{ + int rc = -1; + +#if defined(SO_INCOMING_NAPI_ID) + struct spdk_posix_sock *sock = __posix_sock(_sock); + socklen_t salen = sizeof(int); + + rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); + } + +#endif + return rc; +} + +static struct spdk_sock_group_impl * +posix_sock_group_impl_create(void) +{ + struct spdk_posix_sock_group_impl *group_impl; + int fd; + +#if defined(__linux__) + fd = epoll_create1(0); +#elif defined(__FreeBSD__) + fd = kqueue(); 
+#endif + if (fd == -1) { + return NULL; + } + + group_impl = calloc(1, sizeof(*group_impl)); + if (group_impl == NULL) { + SPDK_ERRLOG("group_impl allocation failed\n"); + close(fd); + return NULL; + } + + group_impl->fd = fd; + TAILQ_INIT(&group_impl->pending_recv); + + return &group_impl->base; +} + +static int +posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) +{ + struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc; + +#if defined(__linux__) + struct epoll_event event; + + memset(&event, 0, sizeof(event)); + /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */ + event.events = EPOLLIN | EPOLLERR; + event.data.ptr = sock; + + rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event); +#elif defined(__FreeBSD__) + struct kevent event; + struct timespec ts = {0}; + + EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock); + + rc = kevent(group->fd, &event, 1, NULL, 0, &ts); +#endif + + /* switched from another polling group due to scheduling */ + if (spdk_unlikely(sock->recv_pipe != NULL && + (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { + assert(sock->pending_recv == false); + sock->pending_recv = true; + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + } + + return rc; +} + +static int +posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock) +{ + struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); + struct spdk_posix_sock *sock = __posix_sock(_sock); + int rc; + + if (sock->recv_pipe != NULL) { + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) { + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + assert(sock->pending_recv == false); + } + +#if defined(__linux__) + struct epoll_event event; + + /* Event parameter is ignored but some old kernel version still require it. */ + rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event); +#elif defined(__FreeBSD__) + struct kevent event; + struct timespec ts = {0}; + + EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + + rc = kevent(group->fd, &event, 1, NULL, 0, &ts); + if (rc == 0 && event.flags & EV_ERROR) { + rc = -1; + errno = event.data; + } +#endif + + spdk_sock_abort_requests(_sock); + + return rc; +} + +static int +posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, + struct spdk_sock **socks) +{ + struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); + struct spdk_sock *sock, *tmp; + int num_events, i, rc; + struct spdk_posix_sock *psock, *ptmp; +#if defined(__linux__) + struct epoll_event events[MAX_EVENTS_PER_POLL]; +#elif defined(__FreeBSD__) + struct kevent events[MAX_EVENTS_PER_POLL]; + struct timespec ts = {0}; +#endif + + /* This must be a TAILQ_FOREACH_SAFE because while flushing, + * a completion callback could remove the sock from the + * group. 
*/ + TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } + +#if defined(__linux__) + num_events = epoll_wait(group->fd, events, max_events, 0); +#elif defined(__FreeBSD__) + num_events = kevent(group->fd, NULL, 0, events, max_events, &ts); +#endif + + if (num_events == -1) { + return -1; + } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) { + uint8_t byte; + + sock = TAILQ_FIRST(&_group->socks); + psock = __posix_sock(sock); + /* a recv is done here to busy poll the queue associated with + * first socket in list and potentially reap incoming data. + */ + if (psock->so_priority) { + recv(psock->fd, &byte, 1, MSG_PEEK); + } + } + + for (i = 0; i < num_events; i++) { +#if defined(__linux__) + sock = events[i].data.ptr; + psock = __posix_sock(sock); + +#ifdef SPDK_ZEROCOPY + if (events[i].events & EPOLLERR) { + rc = _sock_check_zcopy(sock); + /* If the socket was closed or removed from + * the group in response to a send ack, don't + * add it to the array here. */ + if (rc || sock->cb_fn == NULL) { + continue; + } + } +#endif + if ((events[i].events & EPOLLIN) == 0) { + continue; + } + +#elif defined(__FreeBSD__) + sock = events[i].udata; + psock = __posix_sock(sock); +#endif + + /* If the socket does not already have recv pending, add it now */ + if (!psock->pending_recv) { + psock->pending_recv = true; + TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); + } + } + + num_events = 0; + + TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) { + if (num_events == max_events) { + break; + } + + socks[num_events++] = &psock->base; + } + + /* Cycle the pending_recv list so that each time we poll things aren't + * in the same order. */ + for (i = 0; i < num_events; i++) { + psock = __posix_sock(socks[i]); + + TAILQ_REMOVE(&group->pending_recv, psock, link); + + if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) { + psock->pending_recv = false; + } else { + TAILQ_INSERT_TAIL(&group->pending_recv, psock, link); + } + + } + + return num_events; +} + +static int +posix_sock_group_impl_close(struct spdk_sock_group_impl *_group) +{ + struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group); + int rc; + + rc = close(group->fd); + free(group); + return rc; +} + +static int +posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len) +{ + if (!opts || !len) { + errno = EINVAL; + return -1; + } + +#define FIELD_OK(field) \ + offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len + +#define GET_FIELD(field) \ + if (FIELD_OK(field)) { \ + opts->field = g_spdk_posix_sock_impl_opts.field; \ + } + + GET_FIELD(recv_buf_size); + GET_FIELD(send_buf_size); + GET_FIELD(enable_recv_pipe); + GET_FIELD(enable_zerocopy_send); + +#undef GET_FIELD +#undef FIELD_OK + + *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts)); + return 0; +} + +static int +posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len) +{ + if (!opts) { + errno = EINVAL; + return -1; + } + +#define FIELD_OK(field) \ + offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len + +#define SET_FIELD(field) \ + if (FIELD_OK(field)) { \ + g_spdk_posix_sock_impl_opts.field = opts->field; \ + } + + SET_FIELD(recv_buf_size); + SET_FIELD(send_buf_size); + SET_FIELD(enable_recv_pipe); + SET_FIELD(enable_zerocopy_send); + +#undef SET_FIELD +#undef FIELD_OK + + return 0; +} + + +static struct spdk_net_impl g_posix_net_impl = { + .name = "posix", 
+ .getaddr = posix_sock_getaddr, + .connect = posix_sock_connect, + .listen = posix_sock_listen, + .accept = posix_sock_accept, + .close = posix_sock_close, + .recv = posix_sock_recv, + .readv = posix_sock_readv, + .writev = posix_sock_writev, + .writev_async = posix_sock_writev_async, + .flush = posix_sock_flush, + .set_recvlowat = posix_sock_set_recvlowat, + .set_recvbuf = posix_sock_set_recvbuf, + .set_sendbuf = posix_sock_set_sendbuf, + .is_ipv6 = posix_sock_is_ipv6, + .is_ipv4 = posix_sock_is_ipv4, + .is_connected = posix_sock_is_connected, + .get_placement_id = posix_sock_get_placement_id, + .group_impl_create = posix_sock_group_impl_create, + .group_impl_add_sock = posix_sock_group_impl_add_sock, + .group_impl_remove_sock = posix_sock_group_impl_remove_sock, + .group_impl_poll = posix_sock_group_impl_poll, + .group_impl_close = posix_sock_group_impl_close, + .get_opts = posix_sock_impl_get_opts, + .set_opts = posix_sock_impl_set_opts, +}; + +SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY); diff --git a/src/spdk/module/sock/uring/Makefile b/src/spdk/module/sock/uring/Makefile new file mode 100644 index 000000000..2d0e7c4e2 --- /dev/null +++ b/src/spdk/module/sock/uring/Makefile @@ -0,0 +1,45 @@ +# +# BSD LICENSE +# +# Copyright (c) Intel Corporation. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..) +include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 1 +SO_MINOR := 0 + +LIBNAME = sock_uring +C_SRCS = uring.c + +SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/src/spdk/module/sock/uring/uring.c b/src/spdk/module/sock/uring/uring.c new file mode 100644 index 000000000..3066f2d16 --- /dev/null +++ b/src/spdk/module/sock/uring/uring.c @@ -0,0 +1,1328 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" +#include "spdk/config.h" + +#include <sys/epoll.h> +#include <liburing.h> + +#include "spdk/barrier.h" +#include "spdk/likely.h" +#include "spdk/log.h" +#include "spdk/pipe.h" +#include "spdk/sock.h" +#include "spdk/string.h" +#include "spdk/util.h" + +#include "spdk_internal/sock.h" +#include "spdk_internal/assert.h" + +#define MAX_TMPBUF 1024 +#define PORTNUMLEN 32 +#define SO_RCVBUF_SIZE (2 * 1024 * 1024) +#define SO_SNDBUF_SIZE (2 * 1024 * 1024) +#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096 +#define IOV_BATCH_SIZE 64 + +enum spdk_sock_task_type { + SPDK_SOCK_TASK_POLLIN = 0, + SPDK_SOCK_TASK_WRITE, + SPDK_SOCK_TASK_CANCEL, +}; + +enum spdk_uring_sock_task_status { + SPDK_URING_SOCK_TASK_NOT_IN_USE = 0, + SPDK_URING_SOCK_TASK_IN_PROCESS, +}; + +struct spdk_uring_task { + enum spdk_uring_sock_task_status status; + enum spdk_sock_task_type type; + struct spdk_uring_sock *sock; + struct msghdr msg; + struct iovec iovs[IOV_BATCH_SIZE]; + int iov_cnt; + struct spdk_sock_request *last_req; + STAILQ_ENTRY(spdk_uring_task) link; +}; + +struct spdk_uring_sock { + struct spdk_sock base; + int fd; + struct spdk_uring_sock_group_impl *group; + struct spdk_uring_task write_task; + struct spdk_uring_task pollin_task; + struct spdk_uring_task cancel_task; + struct spdk_pipe *recv_pipe; + void *recv_buf; + int recv_buf_sz; + bool pending_recv; + int connection_status; + TAILQ_ENTRY(spdk_uring_sock) link; +}; + +struct spdk_uring_sock_group_impl { + struct spdk_sock_group_impl base; + struct io_uring uring; + uint32_t io_inflight; + uint32_t io_queued; + uint32_t io_avail; + TAILQ_HEAD(, spdk_uring_sock) pending_recv; +}; + +#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request))) + +static int +get_addr_str(struct sockaddr *sa, char *host, size_t hlen) +{ + const char *result = NULL; + + if (sa == NULL || host == NULL) { + return -1; + } + + switch (sa->sa_family) { + case AF_INET: + result = inet_ntop(AF_INET, &(((struct 
sockaddr_in *)sa)->sin_addr), + host, hlen); + break; + case AF_INET6: + result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr), + host, hlen); + break; + default: + break; + } + + if (result != NULL) { + return 0; + } else { + return -1; + } +} + +#define __uring_sock(sock) (struct spdk_uring_sock *)sock +#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group + +static int +uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, + char *caddr, int clen, uint16_t *cport) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc; + + assert(sock != NULL); + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); + return -1; + } + + switch (sa.ss_family) { + case AF_UNIX: + /* Acceptable connection types that don't have IPs */ + return 0; + case AF_INET: + case AF_INET6: + /* Code below will get IP addresses */ + break; + default: + /* Unsupported socket family */ + return -1; + } + + rc = get_addr_str((struct sockaddr *)&sa, saddr, slen); + if (rc != 0) { + SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); + return -1; + } + + if (sport) { + if (sa.ss_family == AF_INET) { + *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port); + } else if (sa.ss_family == AF_INET6) { + *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); + } + } + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno); + return -1; + } + + rc = get_addr_str((struct sockaddr *)&sa, caddr, clen); + if (rc != 0) { + SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno); + return -1; + } + + if (cport) { + if (sa.ss_family == AF_INET) { + *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port); + } else if (sa.ss_family == AF_INET6) { + *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port); + } + } + + return 0; +} + +enum uring_sock_create_type { + SPDK_SOCK_CREATE_LISTEN, + SPDK_SOCK_CREATE_CONNECT, +}; + +static int +uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz) +{ + uint8_t *new_buf; + struct spdk_pipe *new_pipe; + struct iovec siov[2]; + struct iovec diov[2]; + int sbytes; + ssize_t bytes; + + if (sock->recv_buf_sz == sz) { + return 0; + } + + /* If the new size is 0, just free the pipe */ + if (sz == 0) { + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + sock->recv_pipe = NULL; + sock->recv_buf = NULL; + return 0; + } else if (sz < MIN_SOCK_PIPE_SIZE) { + SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE); + return -1; + } + + /* Round up to next 64 byte multiple */ + new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t)); + if (!new_buf) { + SPDK_ERRLOG("socket recv buf allocation failed\n"); + return -ENOMEM; + } + + new_pipe = spdk_pipe_create(new_buf, sz + 1); + if (new_pipe == NULL) { + SPDK_ERRLOG("socket pipe allocation failed\n"); + free(new_buf); + return -ENOMEM; + } + + if (sock->recv_pipe != NULL) { + /* Pull all of the data out of the old pipe */ + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes > sz) { + /* Too much data to fit into the new pipe size */ + spdk_pipe_destroy(new_pipe); + free(new_buf); + return -EINVAL; + } + + sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov); + assert(sbytes 
== sz); + + bytes = spdk_iovcpy(siov, 2, diov, 2); + spdk_pipe_writer_advance(new_pipe, bytes); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + } + + sock->recv_buf_sz = sz; + sock->recv_buf = new_buf; + sock->recv_pipe = new_pipe; + + return 0; +} + +static int +uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + int rc; + + assert(sock != NULL); + +#ifndef __aarch64__ + /* On ARM systems, this buffering does not help. Skip it. */ + /* The size of the pipe is purely derived from benchmarks. It seems to work well. */ + rc = uring_sock_alloc_pipe(sock, sz); + if (rc) { + SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock); + return rc; + } +#endif + + if (sz < SO_RCVBUF_SIZE) { + sz = SO_RCVBUF_SIZE; + } + + rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz)); + if (rc < 0) { + return rc; + } + + return 0; +} + +static int +uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + int rc; + + assert(sock != NULL); + + if (sz < SO_SNDBUF_SIZE) { + sz = SO_SNDBUF_SIZE; + } + + rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz)); + if (rc < 0) { + return rc; + } + + return 0; +} + +static struct spdk_uring_sock * +uring_sock_alloc(int fd) +{ + struct spdk_uring_sock *sock; + + sock = calloc(1, sizeof(*sock)); + if (sock == NULL) { + SPDK_ERRLOG("sock allocation failed\n"); + return NULL; + } + + sock->fd = fd; + return sock; +} + +static struct spdk_sock * +uring_sock_create(const char *ip, int port, + enum uring_sock_create_type type, + struct spdk_sock_opts *opts) +{ + struct spdk_uring_sock *sock; + char buf[MAX_TMPBUF]; + char portnum[PORTNUMLEN]; + char *p; + struct addrinfo hints, *res, *res0; + int fd, flag; + int val = 1; + int rc; + + if (ip == NULL) { + return NULL; + } + if (ip[0] == '[') { + snprintf(buf, sizeof(buf), "%s", ip + 1); + p = strchr(buf, ']'); + if (p != NULL) { + *p = '\0'; + } + ip = (const char *) &buf[0]; + } + + snprintf(portnum, sizeof portnum, "%d", port); + memset(&hints, 0, sizeof hints); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICSERV; + hints.ai_flags |= AI_PASSIVE; + hints.ai_flags |= AI_NUMERICHOST; + rc = getaddrinfo(ip, portnum, &hints, &res0); + if (rc != 0) { + SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno); + return NULL; + } + + /* try listen */ + fd = -1; + for (res = res0; res != NULL; res = res->ai_next) { +retry: + fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (fd < 0) { + /* error */ + continue; + } + + val = SO_RCVBUF_SIZE; + rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val); + if (rc) { + /* Not fatal */ + } + + val = SO_SNDBUF_SIZE; + rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val); + if (rc) { + /* Not fatal */ + } + + rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + +#if defined(SO_PRIORITY) + if (opts != NULL && opts->priority) { + rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; + } + } +#endif + if (res->ai_family == AF_INET6) { + rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val); + if (rc != 0) { + close(fd); + /* error */ + continue; 
+ } + } + + if (type == SPDK_SOCK_CREATE_LISTEN) { + rc = bind(fd, res->ai_addr, res->ai_addrlen); + if (rc != 0) { + SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno); + switch (errno) { + case EINTR: + /* interrupted? */ + close(fd); + goto retry; + case EADDRNOTAVAIL: + SPDK_ERRLOG("IP address %s not available. " + "Verify IP address in config file " + "and make sure setup script is " + "run before starting spdk app.\n", ip); + /* FALLTHROUGH */ + default: + /* try next family */ + close(fd); + fd = -1; + continue; + } + } + /* bind OK */ + rc = listen(fd, 512); + if (rc != 0) { + SPDK_ERRLOG("listen() failed, errno = %d\n", errno); + close(fd); + fd = -1; + break; + } + } else if (type == SPDK_SOCK_CREATE_CONNECT) { + rc = connect(fd, res->ai_addr, res->ai_addrlen); + if (rc != 0) { + SPDK_ERRLOG("connect() failed, errno = %d\n", errno); + /* try next family */ + close(fd); + fd = -1; + continue; + } + } + + flag = fcntl(fd, F_GETFL); + if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) { + SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); + close(fd); + fd = -1; + break; + } + break; + } + freeaddrinfo(res0); + + if (fd < 0) { + return NULL; + } + + sock = uring_sock_alloc(fd); + if (sock == NULL) { + SPDK_ERRLOG("sock allocation failed\n"); + close(fd); + return NULL; + } + + return &sock->base; +} + +static struct spdk_sock * +uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); +} + +static struct spdk_sock * +uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); +} + +static struct spdk_sock * +uring_sock_accept(struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc, fd; + struct spdk_uring_sock *new_sock; + int flag; + + memset(&sa, 0, sizeof(sa)); + salen = sizeof(sa); + + assert(sock != NULL); + + rc = accept(sock->fd, (struct sockaddr *)&sa, &salen); + + if (rc == -1) { + return NULL; + } + + fd = rc; + + flag = fcntl(fd, F_GETFL); + if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) { + SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno); + close(fd); + return NULL; + } + +#if defined(SO_PRIORITY) + /* The priority is not inherited, so call this function again */ + if (sock->base.opts.priority) { + rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int)); + if (rc != 0) { + close(fd); + return NULL; + } + } +#endif + + new_sock = uring_sock_alloc(fd); + if (new_sock == NULL) { + close(fd); + return NULL; + } + + return &new_sock->base; +} + +static int +uring_sock_close(struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + int rc; + + assert(TAILQ_EMPTY(&_sock->pending_reqs)); + assert(sock->group == NULL); + + spdk_pipe_destroy(sock->recv_pipe); + free(sock->recv_buf); + rc = close(sock->fd); + if (rc == 0) { + free(sock); + } + + return rc; +} + +static ssize_t +uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt) +{ + struct iovec siov[2]; + int sbytes; + ssize_t bytes; + struct spdk_uring_sock_group_impl *group; + + sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov); + if (sbytes < 0) { + errno = EINVAL; + return -1; + } else if (sbytes == 0) { + errno = EAGAIN; + return -1; + } + + 
bytes = spdk_iovcpy(siov, 2, diov, diovcnt); + + if (bytes == 0) { + /* The only way this happens is if diov is 0 length */ + errno = EINVAL; + return -1; + } + + spdk_pipe_reader_advance(sock->recv_pipe, bytes); + + /* If we drained the pipe, take it off the level-triggered list */ + if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + group = __uring_group_impl(sock->base.group_impl); + TAILQ_REMOVE(&group->pending_recv, sock, link); + sock->pending_recv = false; + } + + return bytes; +} + +static inline ssize_t +uring_sock_read(struct spdk_uring_sock *sock) +{ + struct iovec iov[2]; + int bytes; + struct spdk_uring_sock_group_impl *group; + + bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov); + + if (bytes > 0) { + bytes = readv(sock->fd, iov, 2); + if (bytes > 0) { + spdk_pipe_writer_advance(sock->recv_pipe, bytes); + if (sock->base.group_impl) { + group = __uring_group_impl(sock->base.group_impl); + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + sock->pending_recv = true; + } + } + } + + return bytes; +} + +static ssize_t +uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + int rc, i; + size_t len; + + if (sock->recv_pipe == NULL) { + return readv(sock->fd, iov, iovcnt); + } + + len = 0; + for (i = 0; i < iovcnt; i++) { + len += iov[i].iov_len; + } + + if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) { + /* If the user is receiving a sufficiently large amount of data, + * receive directly to their buffers. */ + if (len >= MIN_SOCK_PIPE_SIZE) { + return readv(sock->fd, iov, iovcnt); + } + + /* Otherwise, do a big read into our pipe */ + rc = uring_sock_read(sock); + if (rc <= 0) { + return rc; + } + } + + return uring_sock_recv_from_pipe(sock, iov, iovcnt); +} + +static ssize_t +uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len) +{ + struct iovec iov[1]; + + iov[0].iov_base = buf; + iov[0].iov_len = len; + + return uring_sock_readv(sock, iov, 1); +} + +static ssize_t +uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + + if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) { + errno = EAGAIN; + return -1; + } + + return writev(sock->fd, iov, iovcnt); +} + +static int +sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index, + struct spdk_sock_request **last_req) +{ + int iovcnt, i; + struct spdk_sock_request *req; + unsigned int offset; + + /* Gather an iov */ + iovcnt = index; + if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) { + goto end; + } + + if (last_req != NULL && *last_req != NULL) { + req = TAILQ_NEXT(*last_req, internal.link); + } else { + req = TAILQ_FIRST(&_sock->queued_reqs); + } + + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Consume any offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; + iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + iovcnt++; + + offset = 0; + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + } + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + + if (last_req != NULL) { + *last_req = req; + } + req = TAILQ_NEXT(req, internal.link); + } + +end: + return iovcnt; +} + +static int +sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc) +{ + 
struct spdk_sock_request *req; + int i, retval; + unsigned int offset; + size_t len; + + /* Consume the requests that were actually written */ + req = TAILQ_FIRST(&_sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Advance by the offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + /* Calculate the remaining length of this element */ + len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + + if (len > (size_t)rc) { + /* This element was partially sent. */ + req->internal.offset += rc; + return 0; + } + + offset = 0; + req->internal.offset += len; + rc -= len; + } + + /* Handled a full request. */ + spdk_sock_request_pend(_sock, req); + + retval = spdk_sock_request_put(_sock, req, 0); + if (retval) { + return retval; + } + + if (rc == 0) { + break; + } + + req = TAILQ_FIRST(&_sock->queued_reqs); + } + + return 0; +} + +static void +_sock_flush(struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_task *task = &sock->write_task; + uint32_t iovcnt; + struct io_uring_sqe *sqe; + + if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { + return; + } + + iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req); + if (!iovcnt) { + return; + } + + task->iov_cnt = iovcnt; + assert(sock->group != NULL); + task->msg.msg_iov = task->iovs; + task->msg.msg_iovlen = task->iov_cnt; + + sock->group->io_queued++; + + sqe = io_uring_get_sqe(&sock->group->uring); + io_uring_prep_sendmsg(sqe, sock->fd, &sock->write_task.msg, 0); + io_uring_sqe_set_data(sqe, task); + task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; +} + +static void +_sock_prep_pollin(struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_task *task = &sock->pollin_task; + struct io_uring_sqe *sqe; + + /* Do not prepare pollin event */ + if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) { + return; + } + + assert(sock->group != NULL); + sock->group->io_queued++; + + sqe = io_uring_get_sqe(&sock->group->uring); + io_uring_prep_poll_add(sqe, sock->fd, POLLIN); + io_uring_sqe_set_data(sqe, task); + task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; +} + +static void +_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_task *task = &sock->cancel_task; + struct io_uring_sqe *sqe; + + if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) { + return; + } + + assert(sock->group != NULL); + sock->group->io_queued++; + + sqe = io_uring_get_sqe(&sock->group->uring); + io_uring_prep_cancel(sqe, user_data, 0); + io_uring_sqe_set_data(sqe, task); + task->status = SPDK_URING_SOCK_TASK_IN_PROCESS; +} + +static int +sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events, + struct spdk_sock **socks) +{ + int i, count, ret; + struct io_uring_cqe *cqe; + struct spdk_uring_sock *sock, *tmp; + struct spdk_uring_task *task; + int status; + + for (i = 0; i < max; i++) { + ret = io_uring_peek_cqe(&group->uring, &cqe); + if (ret != 0) { + break; + } + + if (cqe == NULL) { + break; + } + + task = (struct spdk_uring_task *)cqe->user_data; + assert(task != NULL); + sock = task->sock; + assert(sock != NULL); + assert(sock->group != NULL); + assert(sock->group == group); + sock->group->io_inflight--; + sock->group->io_avail++; + status = cqe->res; + 
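/* Snapshot the result before marking the cqe as seen; the kernel
+ * may reuse the ring slot after io_uring_cqe_seen(). */
+ 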
io_uring_cqe_seen(&group->uring, cqe);
+
+ task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
+
+ if (spdk_unlikely(status <= 0)) {
+ if (status == -EAGAIN || status == -EWOULDBLOCK) {
+ continue;
+ }
+ }
+
+ switch (task->type) {
+ case SPDK_SOCK_TASK_POLLIN:
+ if ((status & POLLIN) == POLLIN) {
+ if (sock->base.cb_fn != NULL) {
+ assert(sock->pending_recv == false);
+ sock->pending_recv = true;
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+ }
+ break;
+ case SPDK_SOCK_TASK_WRITE:
+ assert(TAILQ_EMPTY(&sock->base.pending_reqs));
+ task->last_req = NULL;
+ task->iov_cnt = 0;
+ if (spdk_unlikely(status < 0)) {
+ sock->connection_status = status;
+ spdk_sock_abort_requests(&sock->base);
+ } else {
+ sock_complete_reqs(&sock->base, status);
+ }
+
+ break;
+ case SPDK_SOCK_TASK_CANCEL:
+ /* Do nothing */
+ break;
+ default:
+ SPDK_UNREACHABLE();
+ }
+ }
+
+ if (!socks) {
+ return 0;
+ }
+ count = 0;
+ TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
+ if (count == max_read_events) {
+ break;
+ }
+
+ socks[count++] = &sock->base;
+ }
+
+ /* Cycle the pending_recv list so that each time we poll things aren't
+ * in the same order. */
+ for (i = 0; i < count; i++) {
+ sock = __uring_sock(socks[i]);
+
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+
+ if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ sock->pending_recv = false;
+ } else {
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+ }
+
+ return count;
+}
+
+static int
+_sock_flush_client(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct msghdr msg = {};
+ struct iovec iovs[IOV_BATCH_SIZE];
+ int iovcnt;
+ ssize_t rc;
+
+ /* Can't flush from within a callback or we end up with recursive calls */
+ if (_sock->cb_cnt > 0) {
+ return 0;
+ }
+
+ /* Gather an iov */
+ iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
+ if (iovcnt == 0) {
+ return 0;
+ }
+
+ /* Perform the vectored write */
+ msg.msg_iov = iovs;
+ msg.msg_iovlen = iovcnt;
+ rc = sendmsg(sock->fd, &msg, 0);
+ if (rc <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return 0;
+ }
+ return rc;
+ }
+
+ sock_complete_reqs(_sock, rc);
+
+ return 0;
+}
+
+static void
+uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc;
+
+ if (spdk_unlikely(sock->connection_status)) {
+ req->cb_fn(req->cb_arg, sock->connection_status);
+ return;
+ }
+
+ spdk_sock_request_queue(_sock, req);
+
+ if (!sock->group) {
+ if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
+ rc = _sock_flush_client(_sock);
+ if (rc) {
+ spdk_sock_abort_requests(_sock);
+ }
+ }
+ }
+}
+
+static int
+uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int val;
+ int rc;
+
+ assert(sock != NULL);
+
+ val = nbytes;
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
+ if (rc != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static bool
+uring_sock_is_ipv6(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return false;
+ }
+
+ return (sa.ss_family == AF_INET6);
+}
+
+static bool
+uring_sock_is_ipv4(struct spdk_sock *_sock)
+{
+ struct 
spdk_uring_sock *sock = __uring_sock(_sock); + struct sockaddr_storage sa; + socklen_t salen; + int rc; + + assert(sock != NULL); + + memset(&sa, 0, sizeof sa); + salen = sizeof sa; + rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno); + return false; + } + + return (sa.ss_family == AF_INET); +} + +static bool +uring_sock_is_connected(struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + uint8_t byte; + int rc; + + rc = recv(sock->fd, &byte, 1, MSG_PEEK); + if (rc == 0) { + return false; + } + + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return true; + } + + return false; + } + + return true; +} + +static int +uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) +{ + int rc = -1; + +#if defined(SO_INCOMING_NAPI_ID) + struct spdk_uring_sock *sock = __uring_sock(_sock); + socklen_t salen = sizeof(int); + + rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen); + if (rc != 0) { + SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno); + } + +#endif + return rc; +} + +static struct spdk_sock_group_impl * +uring_sock_group_impl_create(void) +{ + struct spdk_uring_sock_group_impl *group_impl; + + group_impl = calloc(1, sizeof(*group_impl)); + if (group_impl == NULL) { + SPDK_ERRLOG("group_impl allocation failed\n"); + return NULL; + } + + group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH; + + if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) { + SPDK_ERRLOG("uring I/O context setup failure\n"); + free(group_impl); + return NULL; + } + + TAILQ_INIT(&group_impl->pending_recv); + + return &group_impl->base; +} + +static int +uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, + struct spdk_sock *_sock) +{ + struct spdk_uring_sock *sock = __uring_sock(_sock); + struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); + + sock->group = group; + sock->write_task.sock = sock; + sock->write_task.type = SPDK_SOCK_TASK_WRITE; + + sock->pollin_task.sock = sock; + sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN; + + sock->cancel_task.sock = sock; + sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL; + + /* switched from another polling group due to scheduling */ + if (spdk_unlikely(sock->recv_pipe != NULL && + (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) { + assert(sock->pending_recv == false); + sock->pending_recv = true; + TAILQ_INSERT_TAIL(&group->pending_recv, sock, link); + } + + return 0; +} + +static int +uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, + struct spdk_sock **socks) +{ + struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group); + int count, ret; + int to_complete, to_submit; + struct spdk_sock *_sock, *tmp; + struct spdk_uring_sock *sock; + + if (spdk_likely(socks)) { + TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) { + sock = __uring_sock(_sock); + if (spdk_unlikely(sock->connection_status)) { + continue; + } + _sock_flush(_sock); + _sock_prep_pollin(_sock); + } + } + + to_submit = group->io_queued; + + /* For network I/O, it cannot be set with O_DIRECT, so we do not need to call spdk_io_uring_enter */ + if (to_submit > 0) { + /* If there are I/O to submit, use io_uring_submit here. + * It will automatically call io_uring_enter appropriately. 
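* (Sockets cannot be opened with O_DIRECT, so the IORING_SETUP_IOPOLL
+ * completion-polling path does not come into play for this module.)
+ 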
*/
+ ret = io_uring_submit(&group->uring);
+ if (ret < 0) {
+ return 1;
+ }
+ group->io_queued = 0;
+ group->io_inflight += to_submit;
+ group->io_avail -= to_submit;
+ }
+
+ count = 0;
+ to_complete = group->io_inflight;
+ if (to_complete > 0) {
+ count = sock_uring_group_reap(group, to_complete, max_events, socks);
+ }
+
+ return count;
+}
+
+static int
+uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
+ struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+
+ if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+ _sock_prep_cancel_task(_sock, &sock->write_task);
+ /* spdk_sock_group_remove_sock() is not an asynchronous interface, so
+ * we can simply poll in a loop here until the task is reaped. */
+ while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
+ (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ }
+
+ if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+ _sock_prep_cancel_task(_sock, &sock->pollin_task);
+ /* spdk_sock_group_remove_sock() is not an asynchronous interface, so
+ * we can simply poll in a loop here until the task is reaped. */
+ while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
+ (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ }
+
+ if (sock->pending_recv) {
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+ sock->pending_recv = false;
+ }
+ assert(sock->pending_recv == false);
+
+ sock->group = NULL;
+ return 0;
+}
+
+static int
+uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
+{
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+
+ /* Try to reap all the active I/O */
+ while (group->io_inflight) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ assert(group->io_inflight == 0);
+ assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
+
+ io_uring_queue_exit(&group->uring);
+
+ free(group);
+ return 0;
+}
+
+static int
+uring_sock_flush(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+
+ /* Group members are flushed from the group poll loop instead. */
+ if (!sock->group) {
+ return _sock_flush_client(_sock);
+ }
+
+ return 0;
+}
+
+static struct spdk_net_impl g_uring_net_impl = {
+ .name = "uring",
+ .getaddr = uring_sock_getaddr,
+ .connect = uring_sock_connect,
+ .listen = uring_sock_listen,
+ .accept = uring_sock_accept,
+ .close = uring_sock_close,
+ .recv = uring_sock_recv,
+ .readv = uring_sock_readv,
+ .writev = uring_sock_writev,
+ .writev_async = uring_sock_writev_async,
+ .flush = uring_sock_flush,
+ .set_recvlowat = uring_sock_set_recvlowat,
+ .set_recvbuf = uring_sock_set_recvbuf,
+ .set_sendbuf = uring_sock_set_sendbuf,
+ .is_ipv6 = uring_sock_is_ipv6,
+ .is_ipv4 = uring_sock_is_ipv4,
+ .is_connected = uring_sock_is_connected,
+ .get_placement_id = uring_sock_get_placement_id,
+ .group_impl_create = uring_sock_group_impl_create,
+ .group_impl_add_sock = uring_sock_group_impl_add_sock,
+ .group_impl_remove_sock = uring_sock_group_impl_remove_sock,
+ .group_impl_poll = uring_sock_group_impl_poll,
+ .group_impl_close = uring_sock_group_impl_close,
+};
+
+SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
diff --git a/src/spdk/module/sock/vpp/Makefile b/src/spdk/module/sock/vpp/Makefile
new file mode 100644
index 000000000..016018c77
--- /dev/null
+++ b/src/spdk/module/sock/vpp/Makefile
@@ -0,0 +1,55 @@
+#
+# BSD LICENSE
+#
+# 
Copyright (c) Intel Corporation. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..) +include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 2 +SO_MINOR := 0 + +C_SRCS += vpp.c +CFLAGS += -Wno-sign-compare -Wno-error=old-style-definition +CFLAGS += -Wno-error=strict-prototypes -Wno-error=ignored-qualifiers + +GCC_VERSION=$(shell $(CC) -dumpversion | cut -d. -f1) + +# disable packed member unalign warnings +ifeq ($(shell test $(GCC_VERSION) -ge 9 && echo 1), 1) +CFLAGS += -Wno-error=address-of-packed-member +endif + +LIBNAME = sock_vpp + +SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/src/spdk/module/sock/vpp/vpp.c b/src/spdk/module/sock/vpp/vpp.c new file mode 100644 index 000000000..89a92e9d1 --- /dev/null +++ b/src/spdk/module/sock/vpp/vpp.c @@ -0,0 +1,1633 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* Omit from static analysis. */ +#ifndef __clang_analyzer__ + +#include "spdk/stdinc.h" + +#include "spdk/log.h" +#include "spdk/sock.h" +#include "spdk/net.h" +#include "spdk/string.h" +#include "spdk_internal/sock.h" +#include "spdk/queue.h" +#include "spdk/event.h" +#include "spdk/thread.h" +#include "spdk_internal/log.h" + +/* _GNU_SOURCE is redefined in the vpp headers with no protection (dlmalloc.h) */ +#undef _GNU_SOURCE + +#include <svm/svm_fifo_segment.h> +#include <vlibmemory/api.h> +#include <vpp/api/vpe_msg_enum.h> +#include <vnet/session/application_interface.h> + +#define vl_typedefs /* define message structures */ +#include <vpp/api/vpe_all_api_h.h> +#undef vl_typedefs + +/* declare message handlers for each api */ + +#define vl_endianfun /* define message structures */ +#include <vpp/api/vpe_all_api_h.h> +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) +#define vl_printfun +#include <vpp/api/vpe_all_api_h.h> +#undef vl_printfun + +#define SPDK_VPP_CLIB_MEM_SIZE 256 << 20 +#define SPDK_VPP_SESSIONS_MAX 2048 +#define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX +#define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL +#define SPDK_VPP_SEGMENT_TIMEOUT 20 +#define IOV_BATCH_SIZE 64 + +/* VPP connection state */ +enum spdk_vpp_state { + VPP_STATE_START, + VPP_STATE_ENABLED, + VPP_STATE_ATTACHED, + VPP_STATE_READY, + VPP_STATE_DISCONNECTING, + VPP_STATE_FAILED +}; + +/* VPP session state */ +enum spdk_vpp_session_state { + VPP_SESSION_STATE_UNUSED = 0, + VPP_SESSION_STATE_INIT, /* Initial state */ + VPP_SESSION_STATE_READY, /* Ready for processing */ + VPP_SESSION_STATE_DISCONNECT, + VPP_SESSION_STATE_CLOSE, + VPP_SESSION_STATE_FAILED +}; + +struct spdk_vpp_session { + struct spdk_sock base; + + /* VPP app session */ + app_session_t app_session; + + uint32_t id; + + bool is_server; /* Server side session */ + bool is_listen; /* Session is listener */ + + uint64_t handle; + uint32_t context; + + /* Listener fields */ + pthread_mutex_t accept_session_lock; + uint32_t *accept_session_index_fifo; +}; + +static struct spdk_vpp_main { + int my_client_index; + enum spdk_vpp_state vpp_state; + bool vpp_initialized; + struct spdk_thread *init_thread; + + svm_fifo_segment_main_t segment_main; + svm_queue_t *vl_input_queue; + svm_queue_t *vl_output_queue; + svm_msg_q_t *app_event_queue; + + struct spdk_vpp_session sessions[SPDK_VPP_SESSIONS_MAX]; + pthread_mutex_t session_get_lock; + + struct spdk_poller *vpp_queue_poller; + struct spdk_poller *app_queue_poller; + struct spdk_poller *timeout_poller; +} g_svm; + +struct spdk_vpp_sock_group_impl { + struct spdk_sock_group_impl base; + struct spdk_sock *last_sock; +}; + +#define __vpp_session(sock) ((struct spdk_vpp_session *)sock) +#define __vpp_group_impl(group) ((struct spdk_vpp_sock_group_impl *)group) + +/****************************************************************************** + * Session management + */ +static struct spdk_vpp_session 
* +vpp_session_create(void) +{ + struct spdk_vpp_session *session; + int i; + + pthread_mutex_lock(&g_svm.session_get_lock); + for (i = 0; i < SPDK_VPP_SESSIONS_MAX && + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED; i++) { + /* Empty loop body */ + } + if (i == SPDK_VPP_SESSIONS_MAX || + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED) { + SPDK_ERRLOG("Cannot allocate space for new session\n"); + pthread_mutex_unlock(&g_svm.session_get_lock); + return NULL; + } + session = &g_svm.sessions[i]; + memset(session, 0, sizeof(struct spdk_vpp_session)); + pthread_mutex_init(&session->accept_session_lock, NULL); + + session->id = i; + session->app_session.session_state = VPP_SESSION_STATE_INIT; + + pthread_mutex_unlock(&g_svm.session_get_lock); + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Creating new session %p (%d)\n", + session, session->id); + + return session; +} + +static struct spdk_vpp_session * +vpp_session_get(uint32_t id) +{ + struct spdk_vpp_session *session = NULL; + + if (id >= SPDK_VPP_SESSIONS_MAX) { + return NULL; + } + + pthread_mutex_lock(&g_svm.session_get_lock); + if (g_svm.sessions[id].app_session.session_state != VPP_SESSION_STATE_UNUSED) { + session = &g_svm.sessions[id]; + } + pthread_mutex_unlock(&g_svm.session_get_lock); + + return session; +} + +static struct spdk_vpp_session * +vpp_session_get_by_handle(uint64_t handle, bool is_listen) +{ + struct spdk_vpp_session *session = NULL; + int i; + + for (i = 0; i < SPDK_VPP_SESSIONS_MAX; i++) { + if (g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED && + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_DISCONNECT && + g_svm.sessions[i].handle == handle && + g_svm.sessions[i].is_listen == is_listen) { + session = &g_svm.sessions[i]; + break; + } + } + + return session; +} + +static int +vpp_session_free(struct spdk_vpp_session *session) +{ + /* Remove session */ + if (session == NULL) { + SPDK_ERRLOG("Wrong session\n"); + return -EINVAL; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Free session %p (%d)\n", session, session->id); + + pthread_mutex_lock(&g_svm.session_get_lock); + session->app_session.session_state = VPP_SESSION_STATE_UNUSED; + pthread_mutex_destroy(&session->accept_session_lock); + pthread_mutex_unlock(&g_svm.session_get_lock); + + return 0; +} + +static int +vpp_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, + char *caddr, int clen, uint16_t *cport) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + const char *result = NULL; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + if (session->app_session.transport.is_ip4) { + result = inet_ntop(AF_INET, &session->app_session.transport.lcl_ip.ip4.as_u8, + saddr, slen); + } else { + result = inet_ntop(AF_INET6, &session->app_session.transport.lcl_ip.ip6.as_u8, + saddr, slen); + } + if (result == NULL) { + return -1; + } + + if (sport) { + *sport = ntohs(session->app_session.transport.lcl_port); + } + + if (session->app_session.transport.is_ip4) { + result = inet_ntop(AF_INET, &session->app_session.transport.rmt_ip.ip4.as_u8, + caddr, clen); + } else { + result = inet_ntop(AF_INET6, &session->app_session.transport.rmt_ip.ip6.as_u8, + caddr, clen); + } + if (result == NULL) { + return -1; + } + + if (cport) { + *cport = ntohs(session->app_session.transport.rmt_port); + } + + return 0; +} + +enum spdk_vpp_create_type { + SPDK_SOCK_CREATE_LISTEN, + SPDK_SOCK_CREATE_CONNECT, +}; + 
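/*
+ * Control-path summary (as implemented below):
+ * listen : VL_API_BIND_SOCK -> SESSION_CTRL_EVT_BOUND -> session_bound_handler()
+ * connect: VL_API_CONNECT_SOCK -> SESSION_CTRL_EVT_CONNECTED -> session_connected_handler()
+ * accept : SESSION_CTRL_EVT_ACCEPTED -> session_accepted_handler(), acked in vpp_sock_accept()
+ * close : VL_API_DISCONNECT_SESSION, or VL_API_UNBIND_SOCK for listeners
+ * Blocking callers wait in _wait_for_session_state_change() until the
+ * reply flips the session state.
+ */
+ 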
+/****************************************************************************** + * VPP message handlers + */ +static void +session_accepted_handler(session_accepted_msg_t *mp) +{ + svm_fifo_t *rx_fifo, *tx_fifo; + struct spdk_vpp_session *client_session, *listen_session; + + pthread_mutex_lock(&g_svm.session_get_lock); + listen_session = vpp_session_get_by_handle(mp->listener_handle, true); + pthread_mutex_unlock(&g_svm.session_get_lock); + if (!listen_session) { + SPDK_ERRLOG("Listener not found\n"); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Listeners handle is %" PRIu64 "\n", mp->listener_handle); + + /* Allocate local session for a client and set it up */ + client_session = vpp_session_create(); + if (client_session == NULL) { + SPDK_ERRLOG("Cannot create new session\n"); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Accept session %p (%d) on %p (%d/%" PRIu64 ")\n", + client_session, client_session->id, listen_session, listen_session->id, + listen_session->handle); + + rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = client_session->id; + tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = client_session->id; + + client_session->handle = mp->handle; + client_session->context = mp->context; + client_session->app_session.rx_fifo = rx_fifo; + client_session->app_session.tx_fifo = tx_fifo; + client_session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address, + svm_msg_q_t *); + + client_session->is_server = true; + client_session->app_session.transport.rmt_port = mp->port; + client_session->app_session.transport.is_ip4 = mp->is_ip4; + memcpy(&client_session->app_session.transport.rmt_ip, mp->ip, sizeof(mp->ip)); + + client_session->app_session.transport.lcl_port = listen_session->app_session.transport.lcl_port; + memcpy(&client_session->app_session.transport.lcl_ip, &listen_session->app_session.transport.lcl_ip, + sizeof(listen_session->app_session.transport.lcl_ip)); + client_session->app_session.transport.is_ip4 = listen_session->app_session.transport.is_ip4; + + client_session->app_session.session_state = VPP_SESSION_STATE_READY; + + pthread_mutex_lock(&listen_session->accept_session_lock); + + clib_fifo_add1(listen_session->accept_session_index_fifo, + client_session->id); + + pthread_mutex_unlock(&listen_session->accept_session_lock); +} + +static void +session_connected_handler(session_connected_msg_t *mp) +{ + struct spdk_vpp_session *session; + svm_fifo_t *rx_fifo, *tx_fifo; + + session = vpp_session_get(mp->context); + if (session == NULL) { + return; + } + + if (mp->retval) { + SPDK_ERRLOG("Connection failed (%d).\n", ntohl(mp->retval)); + session->app_session.session_state = VPP_SESSION_STATE_FAILED; + return; + } + + session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address, + svm_msg_q_t *); + + rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = session->id; + tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = session->id; + + session->app_session.rx_fifo = rx_fifo; + session->app_session.tx_fifo = tx_fifo; + session->handle = mp->handle; + + /* Set lcl addr */ + session->app_session.transport.is_ip4 = mp->is_ip4; + memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip)); + session->app_session.transport.lcl_port = mp->lcl_port; + + session->app_session.session_state = VPP_SESSION_STATE_READY; +} + +static void 
+session_disconnected_handler(session_disconnected_msg_t *mp)
+{
+ struct spdk_vpp_session *session = 0;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session = vpp_session_get_by_handle(mp->handle, false);
+ if (session == NULL) {
+ SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n",
+ mp->handle);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d) handler\n", session, session->id);
+
+ /* We need to postpone session deletion to inform upper layer */
+ session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT;
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+}
+
+static void
+session_reset_handler(session_reset_msg_t *mp)
+{
+ int rv = 0;
+ struct spdk_vpp_session *session = NULL;
+ app_session_evt_t app_evt;
+ session_reset_reply_msg_t *rmp;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session = vpp_session_get_by_handle(mp->handle, false);
+ if (session == NULL) {
+ SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n",
+ mp->handle);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Reset session %p (%d) handler\n", session, session->id);
+
+ session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT;
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+
+ app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt,
+ SESSION_CTRL_EVT_RESET_REPLY);
+ rmp = (session_reset_reply_msg_t *) app_evt.evt->data;
+ rmp->retval = rv;
+ rmp->handle = mp->handle;
+ app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt);
+}
+
+static void
+session_bound_handler(session_bound_msg_t *mp)
+{
+ struct spdk_vpp_session *session;
+
+ /* Context should be set to the session index */
+ session = vpp_session_get(mp->context);
+ if (session == NULL) {
+ /* Guard against a stale or unknown context from VPP */
+ SPDK_ERRLOG("Cannot find a session by context\n");
+ return;
+ }
+
+ if (mp->retval) {
+ SPDK_ERRLOG("Bind failed (%d).\n", ntohl(mp->retval));
+ session->app_session.session_state = VPP_SESSION_STATE_FAILED;
+ return;
+ }
+
+ /* Set local address */
+ session->app_session.transport.is_ip4 = mp->lcl_is_ip4;
+ memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip));
+ session->app_session.transport.lcl_port = mp->lcl_port;
+
+ /* Register listener */
+ session->handle = mp->handle;
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Bind session %p (%d/%" PRIu64 ")\n",
+ session, session->id, session->handle);
+
+ /* Session bound, set listen state */
+ session->is_listen = true;
+ session->app_session.session_state = VPP_SESSION_STATE_READY;
+}
+
+static void
+session_unlisten_reply_handler(session_unlisten_reply_msg_t *mp)
+{
+ struct spdk_vpp_session *session;
+
+ if (mp->retval != 0) {
+ SPDK_ERRLOG("Cannot unbind socket\n");
+ return;
+ }
+
+ session = vpp_session_get(mp->context);
+ if (session == NULL) {
+ SPDK_ERRLOG("Cannot find a session by context\n");
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d)\n", session, session->id);
+
+ session->app_session.session_state = VPP_SESSION_STATE_CLOSE;
+}
+
+static void
+handle_mq_event(session_event_t *e)
+{
+ switch (e->event_type) {
+ case SESSION_CTRL_EVT_BOUND:
+ session_bound_handler((session_bound_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ session_accepted_handler((session_accepted_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ session_connected_handler((session_connected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_disconnected_handler((session_disconnected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_RESET:
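+ /* Peer-initiated reset: mark the session disconnected and send
+ * SESSION_CTRL_EVT_RESET_REPLY back to VPP. */
+ 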
session_reset_handler((session_reset_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_UNLISTEN_REPLY: + session_unlisten_reply_handler((session_unlisten_reply_msg_t *) e->data); + break; + default: + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unhandled event %u\n", e->event_type); + } +} + +static int +vpp_queue_poller(void *ctx) +{ + uword msg; + + if (g_svm.vl_output_queue->cursize > 0 && + !svm_queue_sub_raw(g_svm.vl_output_queue, (u8 *)&msg)) { + vl_msg_api_handler((void *)msg); + } + + return SPDK_POLLER_BUSY; +} + +static int +app_queue_poller(void *ctx) +{ + session_event_t *e; + svm_msg_q_msg_t msg; + + if (!svm_msg_q_is_empty(g_svm.app_event_queue)) { + svm_msg_q_sub(g_svm.app_event_queue, &msg, SVM_Q_WAIT, 0); + e = svm_msg_q_msg_data(g_svm.app_event_queue, &msg); + handle_mq_event(e); + svm_msg_q_free_msg(g_svm.app_event_queue, &msg); + } + return SPDK_POLLER_BUSY; +} + +/* This is required until sock.c API changes to asynchronous */ +static int +_wait_for_session_state_change(struct spdk_vpp_session *session, enum spdk_vpp_session_state state) +{ + time_t start = time(NULL); + while (time(NULL) - start < 10) { + if (session->app_session.session_state == VPP_SESSION_STATE_FAILED) { + errno = EADDRNOTAVAIL; + return -1; + } + if (session->app_session.session_state == state) { + errno = 0; + return 0; + } + if (spdk_get_thread() == g_svm.init_thread) { + usleep(100000); + app_queue_poller(NULL); + vpp_queue_poller(NULL); + } + } + /* timeout */ + errno = ETIMEDOUT; + return -1; +} + +static int +vpp_session_connect(struct spdk_vpp_session *session) +{ + vl_api_connect_sock_t *cmp; + + cmp = vl_msg_api_alloc(sizeof(*cmp)); + if (cmp == NULL) { + return -ENOMEM; + } + memset(cmp, 0, sizeof(*cmp)); + + cmp->_vl_msg_id = ntohs(VL_API_CONNECT_SOCK); + cmp->client_index = g_svm.my_client_index; + cmp->context = session->id; + + cmp->vrf = 0 /* VPPCOM_VRF_DEFAULT */; + cmp->is_ip4 = (session->app_session.transport.is_ip4); + memcpy(cmp->ip, &session->app_session.transport.rmt_ip, sizeof(cmp->ip)); + cmp->port = session->app_session.transport.rmt_port; + cmp->proto = TRANSPORT_PROTO_TCP; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&cmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY); +} + +static void +vl_api_disconnect_session_reply_t_handler(vl_api_disconnect_session_reply_t *mp) +{ + struct spdk_vpp_session *session; + + if (mp->retval) { + SPDK_ERRLOG("Disconnecting session failed (%d).\n", ntohl(mp->retval)); + return; + } + + pthread_mutex_lock(&g_svm.session_get_lock); + session = vpp_session_get_by_handle(mp->handle, false); + if (session == NULL) { + SPDK_ERRLOG("Invalid session handler (%" PRIu64 ").\n", mp->handle); + pthread_mutex_unlock(&g_svm.session_get_lock); + return; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session disconnected %p (%d)\n", session, session->id); + session->app_session.session_state = VPP_SESSION_STATE_CLOSE; + pthread_mutex_unlock(&g_svm.session_get_lock); +} + +static int +vpp_session_disconnect(struct spdk_vpp_session *session) +{ + int rv = 0; + vl_api_disconnect_session_t *dmp; + session_disconnected_reply_msg_t *rmp; + app_session_evt_t app_evt; + + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session is already in disconnecting state %p (%d)\n", + session, session->id); + + app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt, + SESSION_CTRL_EVT_DISCONNECTED_REPLY); + rmp = (session_disconnected_reply_msg_t *) app_evt.evt->data; + rmp->retval = rv; + 
rmp->handle = session->handle; + rmp->context = session->context; + app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt); + + return 0; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d)\n", session, session->id); + + dmp = vl_msg_api_alloc(sizeof(*dmp)); + if (dmp == NULL) { + return -ENOMEM; + } + memset(dmp, 0, sizeof(*dmp)); + dmp->_vl_msg_id = ntohs(VL_API_DISCONNECT_SESSION); + dmp->client_index = g_svm.my_client_index; + dmp->handle = session->handle; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&dmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE); +} + +static int +send_unbind_sock(struct spdk_vpp_session *session) +{ + vl_api_unbind_sock_t *ump; + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d) request\n", session, session->id); + + ump = vl_msg_api_alloc(sizeof(*ump)); + if (ump == NULL) { + return -ENOMEM; + } + memset(ump, 0, sizeof(*ump)); + + ump->_vl_msg_id = ntohs(VL_API_UNBIND_SOCK); + ump->client_index = g_svm.my_client_index; + ump->handle = session->handle; + ump->context = session->id; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&ump); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE); +} + +static int +vpp_session_listen(struct spdk_vpp_session *session) +{ + vl_api_bind_sock_t *bmp; + + if (session->is_listen) { + /* Already in the listen state */ + return 0; + } + + clib_fifo_resize(session->accept_session_index_fifo, SPDK_VPP_LISTEN_QUEUE_SIZE); + + session->is_server = 1; + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_BIND_SOCK); + bmp->client_index = g_svm.my_client_index; + bmp->context = session->id; + bmp->vrf = 0; + bmp->is_ip4 = session->app_session.transport.is_ip4; + memcpy(bmp->ip, &session->app_session.transport.lcl_ip, sizeof(bmp->ip)); + bmp->port = session->app_session.transport.lcl_port; + bmp->proto = TRANSPORT_PROTO_TCP; + + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY); +} + +static struct spdk_sock * +vpp_sock_create(const char *ip, int port, enum spdk_vpp_create_type type, + struct spdk_sock_opts *opts) +{ + struct spdk_vpp_session *session; + int rc; + uint8_t is_ip4 = 0; + ip46_address_t addr_buf; + + if (!g_svm.vpp_initialized || ip == NULL) { + return NULL; + } + + session = vpp_session_create(); + if (session == NULL) { + SPDK_ERRLOG("vpp_session_create() failed\n"); + errno = ENOMEM; + return NULL; + } + + /* Check address family */ + if (inet_pton(AF_INET, ip, &addr_buf.ip4.as_u8)) { + is_ip4 = 1; + } else if (inet_pton(AF_INET6, ip, &addr_buf.ip6.as_u8)) { + is_ip4 = 0; + } else { + SPDK_ERRLOG("IP address with invalid format\n"); + errno = EAFNOSUPPORT; + goto err; + } + + if (type == SPDK_SOCK_CREATE_LISTEN) { + session->app_session.transport.is_ip4 = is_ip4; + memcpy(&session->app_session.transport.lcl_ip, &addr_buf, sizeof(addr_buf)); + session->app_session.transport.lcl_port = htons(port); + + rc = vpp_session_listen(session); + if (rc != 0) { + errno = -rc; + SPDK_ERRLOG("session_listen() failed\n"); + goto err; + } + } else if (type == SPDK_SOCK_CREATE_CONNECT) { + session->app_session.transport.is_ip4 = is_ip4; + memcpy(&session->app_session.transport.rmt_ip, &addr_buf, sizeof(addr_buf)); + session->app_session.transport.rmt_port = htons(port); + + rc = vpp_session_connect(session); + if (rc != 0) { + SPDK_ERRLOG("session_connect() 
failed\n"); + goto err; + } + } + + return &session->base; + +err: + vpp_session_free(session); + return NULL; +} + +static struct spdk_sock * +vpp_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); +} + +static struct spdk_sock * +vpp_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); +} + +static struct spdk_sock * +vpp_sock_accept(struct spdk_sock *_sock) +{ + struct spdk_vpp_session *listen_session = __vpp_session(_sock); + struct spdk_vpp_session *client_session = NULL; + u32 client_session_index = ~0; + uword elts = 0; + app_session_evt_t app_evt; + session_accepted_reply_msg_t *rmp; + + assert(listen_session != NULL); + assert(g_svm.vpp_initialized); + + if (listen_session->app_session.session_state != VPP_SESSION_STATE_READY) { + /* Listen session should be in the listen state */ + errno = EWOULDBLOCK; + return NULL; + } + + pthread_mutex_lock(&listen_session->accept_session_lock); + + if (listen_session->accept_session_index_fifo != NULL) { + elts = clib_fifo_elts(listen_session->accept_session_index_fifo); + } + + if (elts == 0) { + /* No client sessions */ + errno = EAGAIN; + pthread_mutex_unlock(&listen_session->accept_session_lock); + return NULL; + } + + clib_fifo_sub1(listen_session->accept_session_index_fifo, + client_session_index); + + pthread_mutex_unlock(&listen_session->accept_session_lock); + + client_session = vpp_session_get(client_session_index); + if (client_session == NULL) { + SPDK_ERRLOG("client session closed or aborted\n"); + errno = ECONNABORTED; + return NULL; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") accepted.\n", + client_session, client_session_index); + + /* + * Send accept session reply + */ + app_alloc_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt, + SESSION_CTRL_EVT_ACCEPTED_REPLY); + rmp = (session_accepted_reply_msg_t *) app_evt.evt->data; + rmp->handle = client_session->handle; + rmp->context = client_session->context; + app_send_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt); + + return &client_session->base; +} + +static int +vpp_sock_close(struct spdk_sock *_sock) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + if (session->is_listen) { + send_unbind_sock(session); + } else { + vpp_session_disconnect(session); + } + vpp_session_free(session); + + return 0; +} + +static ssize_t +vpp_sock_recv(struct spdk_sock *_sock, void *buf, size_t len) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + int rc; + svm_fifo_t *rx_fifo; + uint32_t bytes; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + rx_fifo = session->app_session.rx_fifo; + + bytes = svm_fifo_max_dequeue(session->app_session.rx_fifo); + if (bytes > (ssize_t)len) { + bytes = len; + } + + if (bytes == 0) { + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + /* Socket is disconnected */ + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") is disconnected.\n", + session, session->id); + errno = 0; + return 0; + } + errno = EAGAIN; + return -1; + } + + rc = app_recv_stream_raw(rx_fifo, buf, bytes, 0, 0); + if (rc < 0) { + errno = -rc; + return rc; + } + + return rc; +} + +static ssize_t +vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + ssize_t total = 0; + int i, rc; + + assert(_sock != NULL); + 
assert(g_svm.vpp_initialized); + + for (i = 0; i < iovcnt; ++i) { + rc = vpp_sock_recv(_sock, iov[i].iov_base, iov[i].iov_len); + if (rc < 0) { + if (total > 0) { + break; + } else { + errno = -rc; + return -1; + } + } else { + total += rc; + if (rc < (ssize_t)iov[i].iov_len) { + /* Read less than buffer provided, no point to continue. */ + break; + } + } + } + return total; +} + +static ssize_t +_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + ssize_t total = 0; + int i, rc; + svm_fifo_t *tx_fifo; + session_evt_type_t et; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + tx_fifo = session->app_session.tx_fifo; + et = SESSION_IO_EVT_TX; + + for (i = 0; i < iovcnt; ++i) { + if (svm_fifo_is_full(tx_fifo)) { + errno = EWOULDBLOCK; + return -1; + } + + /* We use only stream connection for now */ + rc = app_send_stream_raw(tx_fifo, session->app_session.vpp_evt_q, + iov[i].iov_base, iov[i].iov_len, et, + 1, SVM_Q_WAIT); + + if (rc < 0) { + if (total > 0) { + break; + } else { + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Buffer overflow\n"); + errno = EWOULDBLOCK; + return -1; + } + } else { + total += rc; + if (rc < (ssize_t)iov[i].iov_len) { + /* Write less than buffer provided, no point to continue. */ + break; + } + } + } + + return total; +} + +static int +_sock_flush(struct spdk_sock *sock) +{ + struct iovec iovs[IOV_BATCH_SIZE]; + int iovcnt; + int retval; + struct spdk_sock_request *req; + int i; + ssize_t rc; + unsigned int offset; + size_t len; + + /* Can't flush from within a callback or we end up with recursive calls */ + if (sock->cb_cnt > 0) { + return 0; + } + + /* Gather an iov */ + iovcnt = 0; + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Consume any offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; + iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + iovcnt++; + + offset = 0; + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + } + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + + req = TAILQ_NEXT(req, internal.link); + } + + if (iovcnt == 0) { + return 0; + } + + /* Perform the vectored write */ + rc = _vpp_sock_writev(sock, iovs, iovcnt); + if (rc <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + return rc; + } + + /* Consume the requests that were actually written */ + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Advance by the offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + /* Calculate the remaining length of this element */ + len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + + if (len > (size_t)rc) { + /* This element was partially sent. */ + req->internal.offset += rc; + return 0; + } + + offset = 0; + req->internal.offset += len; + rc -= len; + } + + /* Handled a full request. */ + req->internal.offset = 0; + spdk_sock_request_pend(sock, req); + + /* The _vpp_sock_writev above isn't currently asynchronous, + * so it's already done. 
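* app_send_stream_raw() has already copied the data into the
+ * shared-memory tx fifo, so the request can be completed at once.
+ 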
*/ + retval = spdk_sock_request_put(sock, req, 0); + + if (rc == 0 || retval) { + break; + } + + req = TAILQ_FIRST(&sock->queued_reqs); + } + + return 0; +} + +static ssize_t +vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + int rc; + + /* In order to process a writev, we need to flush any asynchronous writes + * first. */ + rc = _sock_flush(_sock); + if (rc < 0) { + return rc; + } + + if (!TAILQ_EMPTY(&_sock->queued_reqs)) { + /* We weren't able to flush all requests */ + errno = EAGAIN; + return -1; + } + + return _vpp_sock_writev(_sock, iov, iovcnt); +} + +static void +vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) +{ + int rc; + + spdk_sock_request_queue(sock, req); + + if (sock->group_impl == NULL) { + spdk_sock_request_put(sock, req, -ENOTSUP); + return; + } + + /* If there are a sufficient number queued, just flush them out immediately. */ + if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } +} + +static int +vpp_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static int +vpp_sock_set_recvbuf(struct spdk_sock *_sock, int sz) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static int +vpp_sock_set_sendbuf(struct spdk_sock *_sock, int sz) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static bool +vpp_sock_is_ipv6(struct spdk_sock *_sock) +{ + return !__vpp_session(_sock)->app_session.transport.is_ip4; +} + +static bool +vpp_sock_is_ipv4(struct spdk_sock *_sock) +{ + return __vpp_session(_sock)->app_session.transport.is_ip4; +} + +static bool +vpp_sock_is_connected(struct spdk_sock *_sock) +{ + assert(g_svm.vpp_initialized); + + return (__vpp_session(_sock)->app_session.session_state == VPP_SESSION_STATE_READY); +} + +static int +vpp_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) +{ + return -1; +} + +static struct spdk_sock_group_impl * +vpp_sock_group_impl_create(void) +{ + struct spdk_vpp_sock_group_impl *group_impl; + + if (!g_svm.vpp_initialized) { + return NULL; + } + + group_impl = calloc(1, sizeof(*group_impl)); + if (group_impl == NULL) { + SPDK_ERRLOG("sock_group allocation failed\n"); + errno = ENOMEM; + return NULL; + } + + return &group_impl->base; +} + +static int +vpp_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, + struct spdk_sock *_sock) +{ + /* We expect that higher level do it for us */ + return 0; +} + +static int +vpp_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, + struct spdk_sock *_sock) +{ + /* We expect that higher level do it for us */ + return 0; +} + +static bool +vpp_session_read_ready(struct spdk_vpp_session *session) +{ + svm_fifo_t *rx_fifo = NULL; + uint32_t ready = 0; + + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + /* If session not found force reading to close it. + * NOTE: We're expecting here that upper layer will close + * connection when next read fails. 
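+ * Returning true here makes the poller report the socket as readable,
+ * which triggers that read.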
+ */ + return true; + } + + if (session->app_session.session_state == VPP_SESSION_STATE_READY) { + rx_fifo = session->app_session.rx_fifo; + ready = svm_fifo_max_dequeue(rx_fifo); + } + + return ready > 0; +} + +static int +vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, + struct spdk_sock **socks) +{ + int num_events, rc; + struct spdk_sock *sock, *tmp; + struct spdk_vpp_session *session; + struct spdk_vpp_sock_group_impl *group; + + assert(_group != NULL); + assert(socks != NULL); + assert(g_svm.vpp_initialized); + + group = __vpp_group_impl(_group); + num_events = 0; + + /* This must be a TAILQ_FOREACH_SAFE because while flushing, + * a completion callback could remove the sock from the + * group. */ + TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } + + sock = group->last_sock; + if (sock == NULL) { + sock = TAILQ_FIRST(&group->base.socks); + } + + while (sock != NULL) { + session = __vpp_session(sock); + if (vpp_session_read_ready(session)) { + socks[num_events] = sock; + num_events++; + if (num_events >= max_events) { + sock = TAILQ_NEXT(sock, link); + break; + } + } + sock = TAILQ_NEXT(sock, link); + } + group->last_sock = sock; + + return num_events; +} + +static int +vpp_sock_group_impl_close(struct spdk_sock_group_impl *_group) +{ + free(_group); + return 0; +} + +/****************************************************************************** + * Initialize and attach to the VPP + */ +static int +vpp_app_attach(void) +{ + vl_api_application_attach_t *bmp; + u32 fifo_size = 16 << 20; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_ATTACH); + bmp->client_index = g_svm.my_client_index; + bmp->context = ntohl(0xfeedface); + + bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT; + bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT; + + bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16; + bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size; + bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size; + bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 256 << 20; + bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20; + bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 256; + + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return 0; +} +static void +vl_api_session_enable_disable_reply_t_handler(vl_api_session_enable_disable_reply_t *mp) +{ + if (mp->retval) { + SPDK_ERRLOG("Session enable failed (%d).\n", ntohl(mp->retval)); + } else { + SPDK_NOTICELOG("Session layer enabled\n"); + g_svm.vpp_state = VPP_STATE_ENABLED; + vpp_app_attach(); + } +} + +static int +vpp_session_enable(u8 is_enable) +{ + vl_api_session_enable_disable_t *bmp; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_SESSION_ENABLE_DISABLE); + bmp->client_index = g_svm.my_client_index; + bmp->context = htonl(0xfeedface); + bmp->is_enable = is_enable; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return 0; +} + +static void +vpp_application_attached(void *arg) +{ + SPDK_NOTICELOG("VPP net framework initialized.\n"); + g_svm.vpp_state = VPP_STATE_ATTACHED; + g_svm.vpp_initialized = true; + g_svm.app_queue_poller = SPDK_POLLER_REGISTER(app_queue_poller, NULL, 100); + spdk_net_framework_init_next(0); +} + +static int +ssvm_segment_attach(char *name, 
ssvm_segment_type_t type, int fd) +{ + svm_fifo_segment_create_args_t a; + int rv; + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Attaching segment %s\n", name); + + clib_memset(&a, 0, sizeof(a)); + a.segment_name = (char *) name; + a.segment_type = type; + + assert(type == SSVM_SEGMENT_MEMFD); + a.memfd_fd = fd; + + if ((rv = svm_fifo_segment_attach(&g_svm.segment_main, &a))) { + SPDK_ERRLOG("Segment '%s' attach failed (%d).\n", name, rv); + return rv; + } + + vec_reset_length(a.new_segment_indices); + return 0; +} + +static void +vl_api_application_attach_reply_t_handler(vl_api_application_attach_reply_t *mp) +{ + u32 n_fds = 0; + + if (mp->retval) { + SPDK_ERRLOG("Application attach to VPP failed (%d)\n", + ntohl(mp->retval)); + goto err; + } + + if (mp->segment_name_length == 0) { + SPDK_ERRLOG("segment_name_length zero\n"); + goto err; + } + + assert(mp->app_event_queue_address); + g_svm.app_event_queue = uword_to_pointer(mp->app_event_queue_address, svm_msg_q_t *); + + if (mp->n_fds) { + int fds[mp->n_fds]; + + vl_socket_client_recv_fd_msg(fds, mp->n_fds, 5); + + if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) { + if (ssvm_segment_attach(0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) { + goto err; + } + } + + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) { + if (ssvm_segment_attach((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[n_fds++])) { + goto err; + } + } + + if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { + svm_msg_q_set_consumer_eventfd(g_svm.app_event_queue, fds[n_fds++]); + } + } + + spdk_thread_send_msg(g_svm.init_thread, vpp_application_attached, NULL); + return; +err: + g_svm.vpp_state = VPP_STATE_FAILED; + return; +} + +/* Detach */ +static void +vpp_application_detached(void *arg) +{ + if (!g_svm.vpp_initialized) { + return; + } + + spdk_poller_unregister(&g_svm.vpp_queue_poller); + spdk_poller_unregister(&g_svm.app_queue_poller); + spdk_poller_unregister(&g_svm.timeout_poller); + + g_svm.vpp_initialized = false; + g_svm.vpp_state = VPP_STATE_START; + pthread_mutex_destroy(&g_svm.session_get_lock); + vl_socket_client_disconnect(); + + SPDK_NOTICELOG("Application detached\n"); + + spdk_net_framework_fini_next(); +} + +static int +vpp_application_detached_timeout(void *arg) +{ + if (g_svm.vpp_initialized) { + /* We need to finish detach on initial thread */ + spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL); + } + return SPDK_POLLER_BUSY; +} + +static void +vl_api_application_detach_reply_t_handler(vl_api_application_detach_reply_t *mp) +{ + if (mp->retval) { + SPDK_ERRLOG("Application detach from VPP failed (%d).\n", ntohl(mp->retval)); + g_svm.vpp_state = VPP_STATE_FAILED; + } + + /* We need to finish detach on initial thread */ + spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL); +} + +static int +vpp_app_detach(void) +{ + vl_api_application_detach_t *bmp; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_DETACH); + bmp->client_index = g_svm.my_client_index; + bmp->context = ntohl(0xfeedface); + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + g_svm.timeout_poller = SPDK_POLLER_REGISTER(vpp_application_detached_timeout, + NULL, 10000000); + + return 0; +} + +static void +vl_api_map_another_segment_t_handler(vl_api_map_another_segment_t *mp) +{ + ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM; + int fd = -1; + + if (mp->fd_flags) { + vl_socket_client_recv_fd_msg(&fd, 1, 5); + seg_type = SSVM_SEGMENT_MEMFD; + } + + if 
(ssvm_segment_attach((char *) mp->segment_name, + seg_type, fd)) { + SPDK_ERRLOG("svm_fifo_segment_attach ('%s') failed\n", + mp->segment_name); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "New segment ('%s') attached\n", + mp->segment_name); +} + +static void +vpp_net_framework_set_handlers(void) +{ + /* Set up VPP handlers */ +#define _(N,n) \ + vl_msg_api_set_handlers(VL_API_##N, #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \ + _(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ + _(APPLICATION_ATTACH_REPLY, application_attach_reply) \ + _(APPLICATION_DETACH_REPLY, application_detach_reply) \ + _(MAP_ANOTHER_SEGMENT, map_another_segment) +#undef _ +} + +static void +vpp_net_framework_init(void) +{ + char *app_name; + api_main_t *am = &api_main; + + clib_mem_init_thread_safe(0, SPDK_VPP_CLIB_MEM_SIZE); + svm_fifo_segment_main_init(&g_svm.segment_main, SPDK_VPP_SEGMENT_BASEVA, + SPDK_VPP_SEGMENT_TIMEOUT); + + app_name = spdk_sprintf_alloc("SPDK_%d", getpid()); + if (app_name == NULL) { + SPDK_ERRLOG("Cannot alloc memory for SPDK app name\n"); + return; + } + + vpp_net_framework_set_handlers(); + + if (vl_socket_client_connect((char *) API_SOCKET_FILE, app_name, + 0 /* default rx, tx buffer */)) { + SPDK_ERRLOG("Client \"%s\" failed to connect to the socket \"%s\".\n", + app_name, API_SOCKET_FILE); + goto err; + } + + if (vl_socket_client_init_shm(0, 0 /* want_pthread */)) { + SPDK_ERRLOG("SHM API initialization failed.\n"); + vl_socket_client_disconnect(); + goto err; + } + + g_svm.vl_input_queue = am->shmem_hdr->vl_input_queue; + g_svm.vl_output_queue = am->vl_input_queue; + + g_svm.my_client_index = am->my_client_index; + pthread_mutex_init(&g_svm.session_get_lock, NULL); + + free(app_name); + + g_svm.init_thread = spdk_get_thread(); + SPDK_NOTICELOG("Enable VPP session\n"); + + g_svm.vpp_queue_poller = SPDK_POLLER_REGISTER(vpp_queue_poller, NULL, 100); + + vpp_session_enable(1); + + return; + +err: + free(app_name); + spdk_net_framework_init_next(0); +} + +/****************************************************************************** + * Register components + */ +static struct spdk_net_impl g_vpp_net_impl = { + .name = "vpp", + .getaddr = vpp_sock_getaddr, + .connect = vpp_sock_connect, + .listen = vpp_sock_listen, + .accept = vpp_sock_accept, + .close = vpp_sock_close, + .recv = vpp_sock_recv, + .readv = vpp_sock_readv, + .writev = vpp_sock_writev, + .writev_async = vpp_sock_writev_async, + .set_recvlowat = vpp_sock_set_recvlowat, + .set_recvbuf = vpp_sock_set_recvbuf, + .set_sendbuf = vpp_sock_set_sendbuf, + .is_ipv6 = vpp_sock_is_ipv6, + .is_ipv4 = vpp_sock_is_ipv4, + .is_connected = vpp_sock_is_connected, + .get_placement_id = vpp_sock_get_placement_id, + .group_impl_create = vpp_sock_group_impl_create, + .group_impl_add_sock = vpp_sock_group_impl_add_sock, + .group_impl_remove_sock = vpp_sock_group_impl_remove_sock, + .group_impl_poll = vpp_sock_group_impl_poll, + .group_impl_close = vpp_sock_group_impl_close, +}; + +SPDK_NET_IMPL_REGISTER(vpp, &g_vpp_net_impl, DEFAULT_SOCK_PRIORITY + 2); + +static void +vpp_net_framework_fini(void) +{ + if (g_svm.vpp_initialized) { + vpp_app_detach(); + } else { + spdk_net_framework_fini_next(); + } +} + +static struct spdk_net_framework g_vpp_net_framework = { + .name = "vpp", + .init = vpp_net_framework_init, + .fini = vpp_net_framework_fini, +}; + +SPDK_NET_FRAMEWORK_REGISTER(vpp, 
&g_vpp_net_framework);
+
+SPDK_LOG_REGISTER_COMPONENT("sock_vpp", SPDK_SOCK_VPP)
+
+#endif /* __clang_analyzer__ */
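
For orientation, a minimal sketch of how an application drives one of the implementations registered above through the generic spdk/sock.h API, assuming an already-initialized SPDK application; the address, port, and busy loop are illustrative assumptions, not part of this patch.

#include "spdk/stdinc.h"
#include "spdk/sock.h"

static void
readable_cb(void *arg, struct spdk_sock_group *group, struct spdk_sock *sock)
{
	char buf[512];
	ssize_t n;

	/* Dispatches to uring_sock_recv()/vpp_sock_recv() per the impl. */
	n = spdk_sock_recv(sock, buf, sizeof(buf));
	if (n == 0 || (n < 0 && errno != EAGAIN)) {
		/* A sock must leave its group before it can be closed. */
		spdk_sock_group_remove_sock(group, sock);
		spdk_sock_close(&sock);
	}
}

static int
connect_and_poll(void)
{
	struct spdk_sock *sock;
	struct spdk_sock_group *group;

	/* "uring" selects g_uring_net_impl; passing NULL instead lets
	 * SPDK pick the highest-priority registered implementation. */
	sock = spdk_sock_connect("127.0.0.1", 4420, "uring");
	if (sock == NULL) {
		return -errno;
	}

	group = spdk_sock_group_create(NULL);
	if (group == NULL) {
		spdk_sock_close(&sock);
		return -ENOMEM;
	}
	spdk_sock_group_add_sock(group, sock, readable_cb, NULL);

	/* Normally run from an SPDK poller; each call reaches
	 * uring_sock_group_impl_poll(): flush queued writes, submit
	 * POLLIN sqes, and reap completions. */
	while (spdk_sock_group_poll(group) >= 0) {
	}

	return 0;
}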