author	Daniel Baumann <daniel.baumann@progress-linux.org>	2024-04-21 11:54:28 +0000
committer	Daniel Baumann <daniel.baumann@progress-linux.org>	2024-04-21 11:54:28 +0000
commit	e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree	64f88b554b444a49f656b6c656111a145cbbaa28 /src/spdk/module/sock
parent	Initial commit. (diff)
download	ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
	ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip

Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/module/sock')
-rw-r--r--	src/spdk/module/sock/Makefile	48
-rw-r--r--	src/spdk/module/sock/posix/Makefile	45
-rw-r--r--	src/spdk/module/sock/posix/posix.c	1405
-rw-r--r--	src/spdk/module/sock/uring/Makefile	45
-rw-r--r--	src/spdk/module/sock/uring/uring.c	1328
-rw-r--r--	src/spdk/module/sock/vpp/Makefile	55
-rw-r--r--	src/spdk/module/sock/vpp/vpp.c	1633
7 files changed, 4559 insertions, 0 deletions
diff --git a/src/spdk/module/sock/Makefile b/src/spdk/module/sock/Makefile
new file mode 100644
index 000000000..865743d06
--- /dev/null
+++ b/src/spdk/module/sock/Makefile
@@ -0,0 +1,48 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+DIRS-y = posix
+ifeq ($(OS), Linux)
+DIRS-$(CONFIG_URING) += uring
+endif
+DIRS-$(CONFIG_VPP) += vpp
+
+.PHONY: all clean $(DIRS-y)
+
+all: $(DIRS-y)
+clean: $(DIRS-y)
+
+include $(SPDK_ROOT_DIR)/mk/spdk.subdirs.mk
diff --git a/src/spdk/module/sock/posix/Makefile b/src/spdk/module/sock/posix/Makefile
new file mode 100644
index 000000000..9783e024d
--- /dev/null
+++ b/src/spdk/module/sock/posix/Makefile
@@ -0,0 +1,45 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 2
+SO_MINOR := 0
+
+LIBNAME = sock_posix
+C_SRCS = posix.c
+
+SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/module/sock/posix/posix.c b/src/spdk/module/sock/posix/posix.c
new file mode 100644
index 000000000..4eb1bf106
--- /dev/null
+++ b/src/spdk/module/sock/posix/posix.c
@@ -0,0 +1,1405 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation. All rights reserved.
+ * Copyright (c) 2020 Mellanox Technologies LTD. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+
+#if defined(__linux__)
+#include <sys/epoll.h>
+#include <linux/errqueue.h>
+#elif defined(__FreeBSD__)
+#include <sys/event.h>
+#endif
+
+#include "spdk/log.h"
+#include "spdk/pipe.h"
+#include "spdk/sock.h"
+#include "spdk/util.h"
+#include "spdk/likely.h"
+#include "spdk_internal/sock.h"
+
+#define MAX_TMPBUF 1024
+#define PORTNUMLEN 32
+#define MIN_SO_RCVBUF_SIZE (2 * 1024 * 1024)
+#define MIN_SO_SNDBUF_SIZE (2 * 1024 * 1024)
+#define IOV_BATCH_SIZE 64
+
+#if defined(SO_ZEROCOPY) && defined(MSG_ZEROCOPY)
+#define SPDK_ZEROCOPY
+#endif
+
+struct spdk_posix_sock {
+ struct spdk_sock base;
+ int fd;
+
+ uint32_t sendmsg_idx;
+ bool zcopy;
+
+ struct spdk_pipe *recv_pipe;
+ void *recv_buf;
+ int recv_buf_sz;
+ bool pending_recv;
+ int so_priority;
+
+ TAILQ_ENTRY(spdk_posix_sock) link;
+};
+
+struct spdk_posix_sock_group_impl {
+ struct spdk_sock_group_impl base;
+ int fd;
+ TAILQ_HEAD(, spdk_posix_sock) pending_recv;
+};
+
+static struct spdk_sock_impl_opts g_spdk_posix_sock_impl_opts = {
+ .recv_buf_size = MIN_SO_RCVBUF_SIZE,
+ .send_buf_size = MIN_SO_SNDBUF_SIZE,
+ .enable_recv_pipe = true,
+ .enable_zerocopy_send = true
+};
+
+static int
+get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
+{
+ const char *result = NULL;
+
+ if (sa == NULL || host == NULL) {
+ return -1;
+ }
+
+ switch (sa->sa_family) {
+ case AF_INET:
+ result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
+ host, hlen);
+ break;
+ case AF_INET6:
+ result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
+ host, hlen);
+ break;
+ default:
+ break;
+ }
+
+ if (result != NULL) {
+ return 0;
+ } else {
+ return -1;
+ }
+}
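
A minimal caller-side sketch (not part of the patch) of how this helper pairs with getpeername(); it assumes it sits in this file, where get_addr_str() and the spdk/stdinc.h headers are visible, and the fd is hypothetical:

static void
print_peer(int fd)
{
	struct sockaddr_storage sa = {};
	socklen_t salen = sizeof(sa);
	char host[INET6_ADDRSTRLEN];

	/* getpeername() fills sa with whichever family the socket uses;
	 * get_addr_str() then dispatches to the matching inet_ntop() call. */
	if (getpeername(fd, (struct sockaddr *)&sa, &salen) == 0 &&
	    get_addr_str((struct sockaddr *)&sa, host, sizeof(host)) == 0) {
		printf("peer: %s\n", host);
	}
}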
+
+#define __posix_sock(sock) (struct spdk_posix_sock *)sock
+#define __posix_group_impl(group) (struct spdk_posix_sock_group_impl *)group
+
+static int
+posix_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
+ char *caddr, int clen, uint16_t *cport)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ switch (sa.ss_family) {
+ case AF_UNIX:
+ /* Acceptable connection types that don't have IPs */
+ return 0;
+ case AF_INET:
+ case AF_INET6:
+ /* Code below will get IP addresses */
+ break;
+ default:
+ /* Unsupported socket family */
+ return -1;
+ }
+
+ rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ if (sport) {
+ if (sa.ss_family == AF_INET) {
+ *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
+ } else if (sa.ss_family == AF_INET6) {
+ *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
+ }
+ }
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ if (cport) {
+ if (sa.ss_family == AF_INET) {
+ *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
+ } else if (sa.ss_family == AF_INET6) {
+ *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
+ }
+ }
+
+ return 0;
+}
+
+enum posix_sock_create_type {
+ SPDK_SOCK_CREATE_LISTEN,
+ SPDK_SOCK_CREATE_CONNECT,
+};
+
+static int
+posix_sock_alloc_pipe(struct spdk_posix_sock *sock, int sz)
+{
+ uint8_t *new_buf;
+ struct spdk_pipe *new_pipe;
+ struct iovec siov[2];
+ struct iovec diov[2];
+ int sbytes;
+ ssize_t bytes;
+
+ if (sock->recv_buf_sz == sz) {
+ return 0;
+ }
+
+ /* If the new size is 0, just free the pipe */
+ if (sz == 0) {
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ sock->recv_pipe = NULL;
+ sock->recv_buf = NULL;
+ return 0;
+ } else if (sz < MIN_SOCK_PIPE_SIZE) {
+ SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
+ return -1;
+ }
+
+ /* Round up to next 64 byte multiple */
+ new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
+ if (!new_buf) {
+ SPDK_ERRLOG("socket recv buf allocation failed\n");
+ return -ENOMEM;
+ }
+
+ new_pipe = spdk_pipe_create(new_buf, sz + 1);
+ if (new_pipe == NULL) {
+ SPDK_ERRLOG("socket pipe allocation failed\n");
+ free(new_buf);
+ return -ENOMEM;
+ }
+
+ if (sock->recv_pipe != NULL) {
+ /* Pull all of the data out of the old pipe */
+ sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
+ if (sbytes > sz) {
+ /* Too much data to fit into the new pipe size */
+ spdk_pipe_destroy(new_pipe);
+ free(new_buf);
+ return -EINVAL;
+ }
+
+ sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
+ assert(sbytes == sz);
+
+ bytes = spdk_iovcpy(siov, 2, diov, 2);
+ spdk_pipe_writer_advance(new_pipe, bytes);
+
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ }
+
+ sock->recv_buf_sz = sz;
+ sock->recv_buf = new_buf;
+ sock->recv_pipe = new_pipe;
+
+ return 0;
+}
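
A sketch (not part of the patch) of the spdk_pipe round trip the resize above relies on, using only calls that already appear in this file; sizes are arbitrary, error handling is elided, and the buffer follows the same sz + 1 rule as posix_sock_alloc_pipe():

static void
pipe_round_trip(void)
{
	uint8_t *buf = calloc(4096 + 1, sizeof(uint8_t));
	struct spdk_pipe *pipe = spdk_pipe_create(buf, 4096 + 1);
	struct iovec iov[2];
	int n;

	/* Stage five bytes through the writer side; a fresh pipe hands back
	 * one contiguous region, so iov[0] is enough here. */
	n = spdk_pipe_writer_get_buffer(pipe, 5, iov);
	assert(n == 5);
	memcpy(iov[0].iov_base, "hello", 5);
	spdk_pipe_writer_advance(pipe, 5);

	/* Drain the same bytes through the reader side. */
	n = spdk_pipe_reader_get_buffer(pipe, 5, iov);
	assert(n == 5);
	spdk_pipe_reader_advance(pipe, 5);

	spdk_pipe_destroy(pipe);
	free(buf);
}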
+
+static int
+posix_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc;
+
+ assert(sock != NULL);
+
+ if (g_spdk_posix_sock_impl_opts.enable_recv_pipe) {
+ rc = posix_sock_alloc_pipe(sock, sz);
+ if (rc) {
+ return rc;
+ }
+ }
+
+ /* Set kernel buffer size to be at least MIN_SO_RCVBUF_SIZE */
+ if (sz < MIN_SO_RCVBUF_SIZE) {
+ sz = MIN_SO_RCVBUF_SIZE;
+ }
+
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
+ if (rc < 0) {
+ return rc;
+ }
+
+ return 0;
+}
+
+static int
+posix_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc;
+
+ assert(sock != NULL);
+
+ if (sz < MIN_SO_SNDBUF_SIZE) {
+ sz = MIN_SO_SNDBUF_SIZE;
+ }
+
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
+ if (rc < 0) {
+ return rc;
+ }
+
+ return 0;
+}
+
+static struct spdk_posix_sock *
+posix_sock_alloc(int fd, bool enable_zero_copy)
+{
+ struct spdk_posix_sock *sock;
+#ifdef SPDK_ZEROCOPY
+ int rc;
+ int flag;
+#endif
+
+ sock = calloc(1, sizeof(*sock));
+ if (sock == NULL) {
+ SPDK_ERRLOG("sock allocation failed\n");
+ return NULL;
+ }
+
+ sock->fd = fd;
+
+#ifdef SPDK_ZEROCOPY
+ if (!enable_zero_copy || !g_spdk_posix_sock_impl_opts.enable_zerocopy_send) {
+ return sock;
+ }
+
+ /* Try to turn on zero copy sends */
+ flag = 1;
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_ZEROCOPY, &flag, sizeof(flag));
+ if (rc == 0) {
+ sock->zcopy = true;
+ }
+#endif
+
+ return sock;
+}
+
+static bool
+sock_is_loopback(int fd)
+{
+ struct ifaddrs *addrs, *tmp;
+ struct sockaddr_storage sa = {};
+ socklen_t salen;
+ struct ifreq ifr = {};
+ char ip_addr[256], ip_addr_tmp[256];
+ int rc;
+ bool is_loopback = false;
+
+ salen = sizeof(sa);
+ rc = getsockname(fd, (struct sockaddr *)&sa, &salen);
+ if (rc != 0) {
+ return is_loopback;
+ }
+
+ memset(ip_addr, 0, sizeof(ip_addr));
+ rc = get_addr_str((struct sockaddr *)&sa, ip_addr, sizeof(ip_addr));
+ if (rc != 0) {
+ return is_loopback;
+ }
+
+ getifaddrs(&addrs);
+ for (tmp = addrs; tmp != NULL; tmp = tmp->ifa_next) {
+ if (tmp->ifa_addr && (tmp->ifa_flags & IFF_UP) &&
+ (tmp->ifa_addr->sa_family == sa.ss_family)) {
+ memset(ip_addr_tmp, 0, sizeof(ip_addr_tmp));
+ rc = get_addr_str(tmp->ifa_addr, ip_addr_tmp, sizeof(ip_addr_tmp));
+ if (rc != 0) {
+ continue;
+ }
+
+ if (strncmp(ip_addr, ip_addr_tmp, sizeof(ip_addr)) == 0) {
+ memcpy(ifr.ifr_name, tmp->ifa_name, sizeof(ifr.ifr_name));
+ ioctl(fd, SIOCGIFFLAGS, &ifr);
+ if (ifr.ifr_flags & IFF_LOOPBACK) {
+ is_loopback = true;
+ }
+ goto end;
+ }
+ }
+ }
+
+end:
+ freeifaddrs(addrs);
+ return is_loopback;
+}
+
+static struct spdk_sock *
+posix_sock_create(const char *ip, int port,
+ enum posix_sock_create_type type,
+ struct spdk_sock_opts *opts)
+{
+ struct spdk_posix_sock *sock;
+ char buf[MAX_TMPBUF];
+ char portnum[PORTNUMLEN];
+ char *p;
+ struct addrinfo hints, *res, *res0;
+ int fd, flag;
+ int val = 1;
+ int rc, sz;
+ bool enable_zero_copy = true;
+
+ if (ip == NULL) {
+ return NULL;
+ }
+ if (ip[0] == '[') {
+ snprintf(buf, sizeof(buf), "%s", ip + 1);
+ p = strchr(buf, ']');
+ if (p != NULL) {
+ *p = '\0';
+ }
+ ip = (const char *) &buf[0];
+ }
+
+ snprintf(portnum, sizeof portnum, "%d", port);
+ memset(&hints, 0, sizeof hints);
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_NUMERICSERV;
+ hints.ai_flags |= AI_PASSIVE;
+ hints.ai_flags |= AI_NUMERICHOST;
+ rc = getaddrinfo(ip, portnum, &hints, &res0);
+ if (rc != 0) {
+ SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
+ return NULL;
+ }
+
+ /* try each resolved address until one works */
+ fd = -1;
+ for (res = res0; res != NULL; res = res->ai_next) {
+retry:
+ fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (fd < 0) {
+ /* error */
+ continue;
+ }
+
+ sz = g_spdk_posix_sock_impl_opts.recv_buf_size;
+ rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
+ if (rc) {
+ /* Not fatal */
+ }
+
+ sz = g_spdk_posix_sock_impl_opts.send_buf_size;
+ rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
+ if (rc) {
+ /* Not fatal */
+ }
+
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+
+#if defined(SO_PRIORITY)
+ if (opts != NULL && opts->priority) {
+ rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ }
+#endif
+
+ if (res->ai_family == AF_INET6) {
+ rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ }
+
+ if (type == SPDK_SOCK_CREATE_LISTEN) {
+ rc = bind(fd, res->ai_addr, res->ai_addrlen);
+ if (rc != 0) {
+ SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
+ switch (errno) {
+ case EINTR:
+ /* interrupted? */
+ close(fd);
+ goto retry;
+ case EADDRNOTAVAIL:
+ SPDK_ERRLOG("IP address %s not available. "
+ "Verify IP address in config file "
+ "and make sure setup script is "
+ "run before starting spdk app.\n", ip);
+ /* FALLTHROUGH */
+ default:
+ /* try next family */
+ close(fd);
+ fd = -1;
+ continue;
+ }
+ }
+ /* bind OK */
+ rc = listen(fd, 512);
+ if (rc != 0) {
+ SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
+ close(fd);
+ fd = -1;
+ break;
+ }
+ } else if (type == SPDK_SOCK_CREATE_CONNECT) {
+ rc = connect(fd, res->ai_addr, res->ai_addrlen);
+ if (rc != 0) {
+ SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
+ /* try next family */
+ close(fd);
+ fd = -1;
+ continue;
+ }
+ }
+
+ flag = fcntl(fd, F_GETFL);
+ if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
+ SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
+ close(fd);
+ fd = -1;
+ break;
+ }
+ break;
+ }
+ freeaddrinfo(res0);
+
+ if (fd < 0) {
+ return NULL;
+ }
+
+ if (type == SPDK_SOCK_CREATE_LISTEN) {
+ /* Only enable zero copy for non-loopback sockets. */
+ enable_zero_copy = !sock_is_loopback(fd);
+ } else if (type == SPDK_SOCK_CREATE_CONNECT) {
+ /* Disable zero copy for client sockets until support is added */
+ enable_zero_copy = false;
+ }
+
+ sock = posix_sock_alloc(fd, enable_zero_copy);
+ if (sock == NULL) {
+ SPDK_ERRLOG("sock allocation failed\n");
+ close(fd);
+ return NULL;
+ }
+
+ if (opts != NULL) {
+ sock->so_priority = opts->priority;
+ }
+ return &sock->base;
+}
+
+static struct spdk_sock *
+posix_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return posix_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
+}
+
+static struct spdk_sock *
+posix_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return posix_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
+}
+
+static struct spdk_sock *
+posix_sock_accept(struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc, fd;
+ struct spdk_posix_sock *new_sock;
+ int flag;
+
+ memset(&sa, 0, sizeof(sa));
+ salen = sizeof(sa);
+
+ assert(sock != NULL);
+
+ rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
+
+ if (rc == -1) {
+ return NULL;
+ }
+
+ fd = rc;
+
+ flag = fcntl(fd, F_GETFL);
+ if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
+ SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
+ close(fd);
+ return NULL;
+ }
+
+#if defined(SO_PRIORITY)
+ /* The priority is not inherited by the accepted socket, so set it again */
+ if (sock->base.opts.priority) {
+ rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
+ if (rc != 0) {
+ close(fd);
+ return NULL;
+ }
+ }
+#endif
+
+ /* Inherit the zero copy feature from the listen socket */
+ new_sock = posix_sock_alloc(fd, sock->zcopy);
+ if (new_sock == NULL) {
+ close(fd);
+ return NULL;
+ }
+ new_sock->so_priority = sock->base.opts.priority;
+
+ return &new_sock->base;
+}
+
+static int
+posix_sock_close(struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+
+ assert(TAILQ_EMPTY(&_sock->pending_reqs));
+
+ /* If the socket fails to close, the best choice is to
+ * leak the fd but continue to free the rest of the sock
+ * memory. */
+ close(sock->fd);
+
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ free(sock);
+
+ return 0;
+}
+
+#ifdef SPDK_ZEROCOPY
+static int
+_sock_check_zcopy(struct spdk_sock *sock)
+{
+ struct spdk_posix_sock *psock = __posix_sock(sock);
+ struct msghdr msgh = {};
+ uint8_t buf[sizeof(struct cmsghdr) + sizeof(struct sock_extended_err)];
+ ssize_t rc;
+ struct sock_extended_err *serr;
+ struct cmsghdr *cm;
+ uint32_t idx;
+ struct spdk_sock_request *req, *treq;
+ bool found;
+
+ msgh.msg_control = buf;
+ msgh.msg_controllen = sizeof(buf);
+
+ while (true) {
+ rc = recvmsg(psock->fd, &msgh, MSG_ERRQUEUE);
+
+ if (rc < 0) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ return 0;
+ }
+
+ if (!TAILQ_EMPTY(&sock->pending_reqs)) {
+ SPDK_ERRLOG("Attempting to receive from ERRQUEUE yielded error, but pending list still has orphaned entries\n");
+ } else {
+ SPDK_WARNLOG("Recvmsg yielded an error!\n");
+ }
+ return 0;
+ }
+
+ cm = CMSG_FIRSTHDR(&msgh);
+ if (!cm || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
+ SPDK_WARNLOG("Unexpected cmsg level or type!\n");
+ return 0;
+ }
+
+ serr = (struct sock_extended_err *)CMSG_DATA(cm);
+ if (serr->ee_errno != 0 || serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+ SPDK_WARNLOG("Unexpected extended error origin\n");
+ return 0;
+ }
+
+ /* Most of the time, the pending_reqs list is in the exact
+ * order we need such that all of the requests to complete are
+ * in order, in the front. It is guaranteed that all requests
+ * belonging to the same sendmsg call are sequential, so once
+ * we encounter one match we can stop looping as soon as a
+ * non-match is found.
+ */
+ for (idx = serr->ee_info; idx <= serr->ee_data; idx++) {
+ found = false;
+ TAILQ_FOREACH_SAFE(req, &sock->pending_reqs, internal.link, treq) {
+ if (req->internal.offset == idx) {
+ found = true;
+
+ rc = spdk_sock_request_put(sock, req, 0);
+ if (rc < 0) {
+ return rc;
+ }
+
+ } else if (found) {
+ break;
+ }
+ }
+
+ }
+ }
+
+ return 0;
+}
+#endif
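
The [ee_info, ee_data] range above is Linux's MSG_ZEROCOPY completion protocol: the kernel numbers each sendmsg(MSG_ZEROCOPY) call and batches acknowledgements on the socket's error queue. A standalone sketch (not part of the patch) of draining those notifications, handling IPv4-level cmsgs only to match the SOL_IP check above; the fd is hypothetical:

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/errqueue.h>

static void
drain_zcopy_completions(int fd)
{
	char control[CMSG_SPACE(sizeof(struct sock_extended_err))];
	struct sock_extended_err *serr;
	struct msghdr msgh;
	struct cmsghdr *cm;

	for (;;) {
		memset(&msgh, 0, sizeof(msgh));
		msgh.msg_control = control;
		msgh.msg_controllen = sizeof(control);

		/* MSG_ERRQUEUE reads never block; this fails with EAGAIN
		 * once the queue is drained. */
		if (recvmsg(fd, &msgh, MSG_ERRQUEUE) < 0) {
			break;
		}

		cm = CMSG_FIRSTHDR(&msgh);
		if (cm == NULL || cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR) {
			continue;
		}

		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
			continue;
		}

		/* Buffers for sendmsg calls numbered ee_info..ee_data may now
		 * be reused or freed. */
		printf("zero-copy sends %u..%u completed\n", serr->ee_info, serr->ee_data);
	}
}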
+
+static int
+_sock_flush(struct spdk_sock *sock)
+{
+ struct spdk_posix_sock *psock = __posix_sock(sock);
+ struct msghdr msg = {};
+ int flags;
+ struct iovec iovs[IOV_BATCH_SIZE];
+ int iovcnt;
+ int retval;
+ struct spdk_sock_request *req;
+ int i;
+ ssize_t rc;
+ unsigned int offset;
+ size_t len;
+
+ /* Can't flush from within a callback or we end up with recursive calls */
+ if (sock->cb_cnt > 0) {
+ return 0;
+ }
+
+ /* Gather an iov */
+ iovcnt = 0;
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Consume any offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
+ iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+ iovcnt++;
+
+ offset = 0;
+
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+ }
+
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+
+ req = TAILQ_NEXT(req, internal.link);
+ }
+
+ if (iovcnt == 0) {
+ return 0;
+ }
+
+ /* Perform the vectored write */
+ msg.msg_iov = iovs;
+ msg.msg_iovlen = iovcnt;
+#ifdef SPDK_ZEROCOPY
+ if (psock->zcopy) {
+ flags = MSG_ZEROCOPY;
+ } else
+#endif
+ {
+ flags = 0;
+ }
+ rc = sendmsg(psock->fd, &msg, flags);
+ if (rc <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return 0;
+ }
+ return rc;
+ }
+
+ psock->sendmsg_idx++;
+
+ /* Consume the requests that were actually written */
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Advance by the offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ /* Calculate the remaining length of this element */
+ len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+
+ if (len > (size_t)rc) {
+ /* This element was partially sent. */
+ req->internal.offset += rc;
+ return 0;
+ }
+
+ offset = 0;
+ req->internal.offset += len;
+ rc -= len;
+ }
+
+ /* Handled a full request. */
+ spdk_sock_request_pend(sock, req);
+
+ if (!psock->zcopy) {
+ /* The sendmsg syscall above isn't currently asynchronous,
+ * so it's already done. */
+ retval = spdk_sock_request_put(sock, req, 0);
+ if (retval) {
+ break;
+ }
+ } else {
+ /* Re-use the offset field to hold the sendmsg call index. The
+ * index is 0 based, so subtract one here because we've already
+ * incremented above. */
+ req->internal.offset = psock->sendmsg_idx - 1;
+ }
+
+ if (rc == 0) {
+ break;
+ }
+
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ }
+
+ return 0;
+}
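
The consumption loop above charges the sendmsg() return value against whole iov elements until it hits a partially sent one. A standalone illustration (not part of the patch) of that arithmetic, with a hypothetical two-element, 200-byte request and a 150-byte send:

#include <stdio.h>

int
main(void)
{
	size_t iov_len[2] = {100, 100};
	size_t sent = 150;	/* rc returned by sendmsg() */
	size_t offset = 0;	/* plays the role of req->internal.offset */
	int i;

	for (i = 0; i < 2; i++) {
		if (iov_len[i] > sent) {
			offset += sent;	/* this element was only partially sent */
			break;
		}
		offset += iov_len[i];
		sent -= iov_len[i];
	}

	/* Prints 150: the next flush resumes 50 bytes into the second element. */
	printf("offset = %zu\n", offset);
	return 0;
}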
+
+static int
+posix_sock_flush(struct spdk_sock *_sock)
+{
+ return _sock_flush(_sock);
+}
+
+static ssize_t
+posix_sock_recv_from_pipe(struct spdk_posix_sock *sock, struct iovec *diov, int diovcnt)
+{
+ struct iovec siov[2];
+ int sbytes;
+ ssize_t bytes;
+ struct spdk_posix_sock_group_impl *group;
+
+ sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
+ if (sbytes < 0) {
+ errno = EINVAL;
+ return -1;
+ } else if (sbytes == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
+
+ if (bytes == 0) {
+ /* The only way this happens is if diov is 0 length */
+ errno = EINVAL;
+ return -1;
+ }
+
+ spdk_pipe_reader_advance(sock->recv_pipe, bytes);
+
+ /* If we drained the pipe, take it off the level-triggered list */
+ if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ group = __posix_group_impl(sock->base.group_impl);
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+ sock->pending_recv = false;
+ }
+
+ return bytes;
+}
+
+static inline ssize_t
+posix_sock_read(struct spdk_posix_sock *sock)
+{
+ struct iovec iov[2];
+ int bytes;
+ struct spdk_posix_sock_group_impl *group;
+
+ bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
+
+ if (bytes > 0) {
+ bytes = readv(sock->fd, iov, 2);
+ if (bytes > 0) {
+ spdk_pipe_writer_advance(sock->recv_pipe, bytes);
+ if (sock->base.group_impl) {
+ group = __posix_group_impl(sock->base.group_impl);
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ sock->pending_recv = true;
+ }
+ }
+ }
+
+ return bytes;
+}
+
+static ssize_t
+posix_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc, i;
+ size_t len;
+
+ if (sock->recv_pipe == NULL) {
+ return readv(sock->fd, iov, iovcnt);
+ }
+
+ len = 0;
+ for (i = 0; i < iovcnt; i++) {
+ len += iov[i].iov_len;
+ }
+
+ if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ /* If the user is receiving a sufficiently large amount of data,
+ * receive directly to their buffers. */
+ if (len >= MIN_SOCK_PIPE_SIZE) {
+ return readv(sock->fd, iov, iovcnt);
+ }
+
+ /* Otherwise, do a big read into our pipe */
+ rc = posix_sock_read(sock);
+ if (rc <= 0) {
+ return rc;
+ }
+ }
+
+ return posix_sock_recv_from_pipe(sock, iov, iovcnt);
+}
+
+static ssize_t
+posix_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
+{
+ struct iovec iov[1];
+
+ iov[0].iov_base = buf;
+ iov[0].iov_len = len;
+
+ return posix_sock_readv(sock, iov, 1);
+}
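
Both the pipe path and the direct readv() path surface through ordinary non-blocking recv semantics, so callers never see the buffering. A hedged usage sketch (not part of the patch) against the public spdk_sock API, with socket setup elided:

#include "spdk/sock.h"
#include <errno.h>

/* Fold EAGAIN into 0 for a polling caller. Note that a peer close also
 * yields 0 from the underlying readv(), so real callers must track both. */
static ssize_t
try_recv(struct spdk_sock *sock, void *buf, size_t len)
{
	ssize_t n = spdk_sock_recv(sock, buf, len);

	if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
		return 0;	/* nothing buffered in the pipe or the kernel; poll again */
	}
	return n;
}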
+
+static ssize_t
+posix_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc;
+
+ /* In order to process a writev, we need to flush any asynchronous writes
+ * first. */
+ rc = _sock_flush(_sock);
+ if (rc < 0) {
+ return rc;
+ }
+
+ if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
+ /* We weren't able to flush all requests */
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return writev(sock->fd, iov, iovcnt);
+}
+
+static void
+posix_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
+{
+ int rc;
+
+ spdk_sock_request_queue(sock, req);
+
+ /* If there are a sufficient number queued, just flush them out immediately. */
+ if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
+ rc = _sock_flush(sock);
+ if (rc) {
+ spdk_sock_abort_requests(sock);
+ }
+ }
+}
+
+static int
+posix_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int val;
+ int rc;
+
+ assert(sock != NULL);
+
+ val = nbytes;
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
+ if (rc != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static bool
+posix_sock_is_ipv6(struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return false;
+ }
+
+ return (sa.ss_family == AF_INET6);
+}
+
+static bool
+posix_sock_is_ipv4(struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return false;
+ }
+
+ return (sa.ss_family == AF_INET);
+}
+
+static bool
+posix_sock_is_connected(struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ uint8_t byte;
+ int rc;
+
+ rc = recv(sock->fd, &byte, 1, MSG_PEEK);
+ if (rc == 0) {
+ return false;
+ }
+
+ if (rc < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return true;
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+static int
+posix_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
+{
+ int rc = -1;
+
+#if defined(SO_INCOMING_NAPI_ID)
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ socklen_t salen = sizeof(int);
+
+ rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
+ }
+
+#endif
+ return rc;
+}
+
+static struct spdk_sock_group_impl *
+posix_sock_group_impl_create(void)
+{
+ struct spdk_posix_sock_group_impl *group_impl;
+ int fd;
+
+#if defined(__linux__)
+ fd = epoll_create1(0);
+#elif defined(__FreeBSD__)
+ fd = kqueue();
+#endif
+ if (fd == -1) {
+ return NULL;
+ }
+
+ group_impl = calloc(1, sizeof(*group_impl));
+ if (group_impl == NULL) {
+ SPDK_ERRLOG("group_impl allocation failed\n");
+ close(fd);
+ return NULL;
+ }
+
+ group_impl->fd = fd;
+ TAILQ_INIT(&group_impl->pending_recv);
+
+ return &group_impl->base;
+}
+
+static int
+posix_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc;
+
+#if defined(__linux__)
+ struct epoll_event event;
+
+ memset(&event, 0, sizeof(event));
+ /* EPOLLERR is always on even if we don't set it, but be explicit for clarity */
+ event.events = EPOLLIN | EPOLLERR;
+ event.data.ptr = sock;
+
+ rc = epoll_ctl(group->fd, EPOLL_CTL_ADD, sock->fd, &event);
+#elif defined(__FreeBSD__)
+ struct kevent event;
+ struct timespec ts = {0};
+
+ EV_SET(&event, sock->fd, EVFILT_READ, EV_ADD, 0, 0, sock);
+
+ rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
+#endif
+
+ /* switched from another polling group due to scheduling */
+ if (spdk_unlikely(sock->recv_pipe != NULL &&
+ (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
+ assert(sock->pending_recv == false);
+ sock->pending_recv = true;
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+
+ return rc;
+}
+
+static int
+posix_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, struct spdk_sock *_sock)
+{
+ struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
+ struct spdk_posix_sock *sock = __posix_sock(_sock);
+ int rc;
+
+ if (sock->recv_pipe != NULL) {
+ if (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0) {
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+ sock->pending_recv = false;
+ }
+ assert(sock->pending_recv == false);
+ }
+
+#if defined(__linux__)
+ struct epoll_event event;
+
+ /* The event parameter is ignored, but some old kernel versions still require it. */
+ rc = epoll_ctl(group->fd, EPOLL_CTL_DEL, sock->fd, &event);
+#elif defined(__FreeBSD__)
+ struct kevent event;
+ struct timespec ts = {0};
+
+ EV_SET(&event, sock->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+ rc = kevent(group->fd, &event, 1, NULL, 0, &ts);
+ if (rc == 0 && event.flags & EV_ERROR) {
+ rc = -1;
+ errno = event.data;
+ }
+#endif
+
+ spdk_sock_abort_requests(_sock);
+
+ return rc;
+}
+
+static int
+posix_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
+ struct spdk_sock **socks)
+{
+ struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
+ struct spdk_sock *sock, *tmp;
+ int num_events, i, rc;
+ struct spdk_posix_sock *psock, *ptmp;
+#if defined(__linux__)
+ struct epoll_event events[MAX_EVENTS_PER_POLL];
+#elif defined(__FreeBSD__)
+ struct kevent events[MAX_EVENTS_PER_POLL];
+ struct timespec ts = {0};
+#endif
+
+ /* This must be a TAILQ_FOREACH_SAFE because while flushing,
+ * a completion callback could remove the sock from the
+ * group. */
+ TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
+ rc = _sock_flush(sock);
+ if (rc) {
+ spdk_sock_abort_requests(sock);
+ }
+ }
+
+#if defined(__linux__)
+ num_events = epoll_wait(group->fd, events, max_events, 0);
+#elif defined(__FreeBSD__)
+ num_events = kevent(group->fd, NULL, 0, events, max_events, &ts);
+#endif
+
+ if (num_events == -1) {
+ return -1;
+ } else if (num_events == 0 && !TAILQ_EMPTY(&_group->socks)) {
+ uint8_t byte;
+
+ sock = TAILQ_FIRST(&_group->socks);
+ psock = __posix_sock(sock);
+ /* A recv() with MSG_PEEK is done here to busy poll the queue associated
+ * with the first socket in the list and potentially reap incoming data.
+ */
+ if (psock->so_priority) {
+ recv(psock->fd, &byte, 1, MSG_PEEK);
+ }
+ }
+
+ for (i = 0; i < num_events; i++) {
+#if defined(__linux__)
+ sock = events[i].data.ptr;
+ psock = __posix_sock(sock);
+
+#ifdef SPDK_ZEROCOPY
+ if (events[i].events & EPOLLERR) {
+ rc = _sock_check_zcopy(sock);
+ /* If the socket was closed or removed from
+ * the group in response to a send ack, don't
+ * add it to the array here. */
+ if (rc || sock->cb_fn == NULL) {
+ continue;
+ }
+ }
+#endif
+ if ((events[i].events & EPOLLIN) == 0) {
+ continue;
+ }
+
+#elif defined(__FreeBSD__)
+ sock = events[i].udata;
+ psock = __posix_sock(sock);
+#endif
+
+ /* If the socket does not already have recv pending, add it now */
+ if (!psock->pending_recv) {
+ psock->pending_recv = true;
+ TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
+ }
+ }
+
+ num_events = 0;
+
+ TAILQ_FOREACH_SAFE(psock, &group->pending_recv, link, ptmp) {
+ if (num_events == max_events) {
+ break;
+ }
+
+ socks[num_events++] = &psock->base;
+ }
+
+ /* Cycle the pending_recv list so that each time we poll things aren't
+ * in the same order. */
+ for (i = 0; i < num_events; i++) {
+ psock = __posix_sock(socks[i]);
+
+ TAILQ_REMOVE(&group->pending_recv, psock, link);
+
+ if (psock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(psock->recv_pipe) == 0) {
+ psock->pending_recv = false;
+ } else {
+ TAILQ_INSERT_TAIL(&group->pending_recv, psock, link);
+ }
+
+ }
+
+ return num_events;
+}
+
+static int
+posix_sock_group_impl_close(struct spdk_sock_group_impl *_group)
+{
+ struct spdk_posix_sock_group_impl *group = __posix_group_impl(_group);
+ int rc;
+
+ rc = close(group->fd);
+ free(group);
+ return rc;
+}
+
+static int
+posix_sock_impl_get_opts(struct spdk_sock_impl_opts *opts, size_t *len)
+{
+ if (!opts || !len) {
+ errno = EINVAL;
+ return -1;
+ }
+
+#define FIELD_OK(field) \
+ offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= *len
+
+#define GET_FIELD(field) \
+ if (FIELD_OK(field)) { \
+ opts->field = g_spdk_posix_sock_impl_opts.field; \
+ }
+
+ GET_FIELD(recv_buf_size);
+ GET_FIELD(send_buf_size);
+ GET_FIELD(enable_recv_pipe);
+ GET_FIELD(enable_zerocopy_send);
+
+#undef GET_FIELD
+#undef FIELD_OK
+
+ *len = spdk_min(*len, sizeof(g_spdk_posix_sock_impl_opts));
+ return 0;
+}
+
+static int
+posix_sock_impl_set_opts(const struct spdk_sock_impl_opts *opts, size_t len)
+{
+ if (!opts) {
+ errno = EINVAL;
+ return -1;
+ }
+
+#define FIELD_OK(field) \
+ offsetof(struct spdk_sock_impl_opts, field) + sizeof(opts->field) <= len
+
+#define SET_FIELD(field) \
+ if (FIELD_OK(field)) { \
+ g_spdk_posix_sock_impl_opts.field = opts->field; \
+ }
+
+ SET_FIELD(recv_buf_size);
+ SET_FIELD(send_buf_size);
+ SET_FIELD(enable_recv_pipe);
+ SET_FIELD(enable_zerocopy_send);
+
+#undef SET_FIELD
+#undef FIELD_OK
+
+ return 0;
+}
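
The offsetof() guards above let a caller built against an older, shorter spdk_sock_impl_opts still pass its struct safely; neither side reads or writes past the declared length. The same pattern in isolation (not part of the patch), with a hypothetical two-field struct:

#include <stddef.h>

struct my_opts {
	int alpha;	/* present since v1 */
	int beta;	/* added in v2 */
};

/* Copy only the fields that fit inside the caller's declared length. */
static void
get_my_opts(struct my_opts *dst, size_t len, const struct my_opts *src)
{
#define MY_FIELD_OK(field) \
	(offsetof(struct my_opts, field) + sizeof(dst->field) <= len)

	if (MY_FIELD_OK(alpha)) {
		dst->alpha = src->alpha;
	}
	if (MY_FIELD_OK(beta)) {	/* skipped when len covers only alpha */
		dst->beta = src->beta;
	}
#undef MY_FIELD_OK
}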
+
+static struct spdk_net_impl g_posix_net_impl = {
+ .name = "posix",
+ .getaddr = posix_sock_getaddr,
+ .connect = posix_sock_connect,
+ .listen = posix_sock_listen,
+ .accept = posix_sock_accept,
+ .close = posix_sock_close,
+ .recv = posix_sock_recv,
+ .readv = posix_sock_readv,
+ .writev = posix_sock_writev,
+ .writev_async = posix_sock_writev_async,
+ .flush = posix_sock_flush,
+ .set_recvlowat = posix_sock_set_recvlowat,
+ .set_recvbuf = posix_sock_set_recvbuf,
+ .set_sendbuf = posix_sock_set_sendbuf,
+ .is_ipv6 = posix_sock_is_ipv6,
+ .is_ipv4 = posix_sock_is_ipv4,
+ .is_connected = posix_sock_is_connected,
+ .get_placement_id = posix_sock_get_placement_id,
+ .group_impl_create = posix_sock_group_impl_create,
+ .group_impl_add_sock = posix_sock_group_impl_add_sock,
+ .group_impl_remove_sock = posix_sock_group_impl_remove_sock,
+ .group_impl_poll = posix_sock_group_impl_poll,
+ .group_impl_close = posix_sock_group_impl_close,
+ .get_opts = posix_sock_impl_get_opts,
+ .set_opts = posix_sock_impl_set_opts,
+};
+
+SPDK_NET_IMPL_REGISTER(posix, &g_posix_net_impl, DEFAULT_SOCK_PRIORITY);
diff --git a/src/spdk/module/sock/uring/Makefile b/src/spdk/module/sock/uring/Makefile
new file mode 100644
index 000000000..2d0e7c4e2
--- /dev/null
+++ b/src/spdk/module/sock/uring/Makefile
@@ -0,0 +1,45 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 1
+SO_MINOR := 0
+
+LIBNAME = sock_uring
+C_SRCS = uring.c
+
+SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/module/sock/uring/uring.c b/src/spdk/module/sock/uring/uring.c
new file mode 100644
index 000000000..3066f2d16
--- /dev/null
+++ b/src/spdk/module/sock/uring/uring.c
@@ -0,0 +1,1328 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+#include "spdk/config.h"
+
+#include <sys/epoll.h>
+#include <liburing.h>
+
+#include "spdk/barrier.h"
+#include "spdk/likely.h"
+#include "spdk/log.h"
+#include "spdk/pipe.h"
+#include "spdk/sock.h"
+#include "spdk/string.h"
+#include "spdk/util.h"
+
+#include "spdk_internal/sock.h"
+#include "spdk_internal/assert.h"
+
+#define MAX_TMPBUF 1024
+#define PORTNUMLEN 32
+#define SO_RCVBUF_SIZE (2 * 1024 * 1024)
+#define SO_SNDBUF_SIZE (2 * 1024 * 1024)
+#define SPDK_SOCK_GROUP_QUEUE_DEPTH 4096
+#define IOV_BATCH_SIZE 64
+
+enum spdk_sock_task_type {
+ SPDK_SOCK_TASK_POLLIN = 0,
+ SPDK_SOCK_TASK_WRITE,
+ SPDK_SOCK_TASK_CANCEL,
+};
+
+enum spdk_uring_sock_task_status {
+ SPDK_URING_SOCK_TASK_NOT_IN_USE = 0,
+ SPDK_URING_SOCK_TASK_IN_PROCESS,
+};
+
+struct spdk_uring_task {
+ enum spdk_uring_sock_task_status status;
+ enum spdk_sock_task_type type;
+ struct spdk_uring_sock *sock;
+ struct msghdr msg;
+ struct iovec iovs[IOV_BATCH_SIZE];
+ int iov_cnt;
+ struct spdk_sock_request *last_req;
+ STAILQ_ENTRY(spdk_uring_task) link;
+};
+
+struct spdk_uring_sock {
+ struct spdk_sock base;
+ int fd;
+ struct spdk_uring_sock_group_impl *group;
+ struct spdk_uring_task write_task;
+ struct spdk_uring_task pollin_task;
+ struct spdk_uring_task cancel_task;
+ struct spdk_pipe *recv_pipe;
+ void *recv_buf;
+ int recv_buf_sz;
+ bool pending_recv;
+ int connection_status;
+ TAILQ_ENTRY(spdk_uring_sock) link;
+};
+
+struct spdk_uring_sock_group_impl {
+ struct spdk_sock_group_impl base;
+ struct io_uring uring;
+ uint32_t io_inflight;
+ uint32_t io_queued;
+ uint32_t io_avail;
+ TAILQ_HEAD(, spdk_uring_sock) pending_recv;
+};
+
+#define SPDK_URING_SOCK_REQUEST_IOV(req) ((struct iovec *)((uint8_t *)req + sizeof(struct spdk_sock_request)))
+
+static int
+get_addr_str(struct sockaddr *sa, char *host, size_t hlen)
+{
+ const char *result = NULL;
+
+ if (sa == NULL || host == NULL) {
+ return -1;
+ }
+
+ switch (sa->sa_family) {
+ case AF_INET:
+ result = inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
+ host, hlen);
+ break;
+ case AF_INET6:
+ result = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
+ host, hlen);
+ break;
+ default:
+ break;
+ }
+
+ if (result != NULL) {
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
+#define __uring_sock(sock) (struct spdk_uring_sock *)sock
+#define __uring_group_impl(group) (struct spdk_uring_sock_group_impl *)group
+
+static int
+uring_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
+ char *caddr, int clen, uint16_t *cport)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ switch (sa.ss_family) {
+ case AF_UNIX:
+ /* Acceptable connection types that don't have IPs */
+ return 0;
+ case AF_INET:
+ case AF_INET6:
+ /* Code below will get IP addresses */
+ break;
+ default:
+ /* Unsupported socket family */
+ return -1;
+ }
+
+ rc = get_addr_str((struct sockaddr *)&sa, saddr, slen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ if (sport) {
+ if (sa.ss_family == AF_INET) {
+ *sport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
+ } else if (sa.ss_family == AF_INET6) {
+ *sport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
+ }
+ }
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getpeername(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getpeername() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ rc = get_addr_str((struct sockaddr *)&sa, caddr, clen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getnameinfo() failed (errno=%d)\n", errno);
+ return -1;
+ }
+
+ if (cport) {
+ if (sa.ss_family == AF_INET) {
+ *cport = ntohs(((struct sockaddr_in *) &sa)->sin_port);
+ } else if (sa.ss_family == AF_INET6) {
+ *cport = ntohs(((struct sockaddr_in6 *) &sa)->sin6_port);
+ }
+ }
+
+ return 0;
+}
+
+enum uring_sock_create_type {
+ SPDK_SOCK_CREATE_LISTEN,
+ SPDK_SOCK_CREATE_CONNECT,
+};
+
+static int
+uring_sock_alloc_pipe(struct spdk_uring_sock *sock, int sz)
+{
+ uint8_t *new_buf;
+ struct spdk_pipe *new_pipe;
+ struct iovec siov[2];
+ struct iovec diov[2];
+ int sbytes;
+ ssize_t bytes;
+
+ if (sock->recv_buf_sz == sz) {
+ return 0;
+ }
+
+ /* If the new size is 0, just free the pipe */
+ if (sz == 0) {
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ sock->recv_pipe = NULL;
+ sock->recv_buf = NULL;
+ return 0;
+ } else if (sz < MIN_SOCK_PIPE_SIZE) {
+ SPDK_ERRLOG("The size of the pipe must be larger than %d\n", MIN_SOCK_PIPE_SIZE);
+ return -1;
+ }
+
+ /* Round up to next 64 byte multiple */
+ new_buf = calloc(SPDK_ALIGN_CEIL(sz + 1, 64), sizeof(uint8_t));
+ if (!new_buf) {
+ SPDK_ERRLOG("socket recv buf allocation failed\n");
+ return -ENOMEM;
+ }
+
+ new_pipe = spdk_pipe_create(new_buf, sz + 1);
+ if (new_pipe == NULL) {
+ SPDK_ERRLOG("socket pipe allocation failed\n");
+ free(new_buf);
+ return -ENOMEM;
+ }
+
+ if (sock->recv_pipe != NULL) {
+ /* Pull all of the data out of the old pipe */
+ sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
+ if (sbytes > sz) {
+ /* Too much data to fit into the new pipe size */
+ spdk_pipe_destroy(new_pipe);
+ free(new_buf);
+ return -EINVAL;
+ }
+
+ sbytes = spdk_pipe_writer_get_buffer(new_pipe, sz, diov);
+ assert(sbytes == sz);
+
+ bytes = spdk_iovcpy(siov, 2, diov, 2);
+ spdk_pipe_writer_advance(new_pipe, bytes);
+
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ }
+
+ sock->recv_buf_sz = sz;
+ sock->recv_buf = new_buf;
+ sock->recv_pipe = new_pipe;
+
+ return 0;
+}
+
+static int
+uring_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc;
+
+ assert(sock != NULL);
+
+ /* On ARM systems this buffering does not help, so it is skipped there.
+ * The size of the pipe is purely derived from benchmarks; it seems to work well. */
+#ifndef __aarch64__
+ rc = uring_sock_alloc_pipe(sock, sz);
+ if (rc) {
+ SPDK_ERRLOG("unable to allocate sufficient recvbuf with sz=%d on sock=%p\n", sz, _sock);
+ return rc;
+ }
+#endif
+
+ if (sz < SO_RCVBUF_SIZE) {
+ sz = SO_RCVBUF_SIZE;
+ }
+
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
+ if (rc < 0) {
+ return rc;
+ }
+
+ return 0;
+}
+
+static int
+uring_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc;
+
+ assert(sock != NULL);
+
+ if (sz < SO_SNDBUF_SIZE) {
+ sz = SO_SNDBUF_SIZE;
+ }
+
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
+ if (rc < 0) {
+ return rc;
+ }
+
+ return 0;
+}
+
+static struct spdk_uring_sock *
+uring_sock_alloc(int fd)
+{
+ struct spdk_uring_sock *sock;
+
+ sock = calloc(1, sizeof(*sock));
+ if (sock == NULL) {
+ SPDK_ERRLOG("sock allocation failed\n");
+ return NULL;
+ }
+
+ sock->fd = fd;
+ return sock;
+}
+
+static struct spdk_sock *
+uring_sock_create(const char *ip, int port,
+ enum uring_sock_create_type type,
+ struct spdk_sock_opts *opts)
+{
+ struct spdk_uring_sock *sock;
+ char buf[MAX_TMPBUF];
+ char portnum[PORTNUMLEN];
+ char *p;
+ struct addrinfo hints, *res, *res0;
+ int fd, flag;
+ int val = 1;
+ int rc;
+
+ if (ip == NULL) {
+ return NULL;
+ }
+ if (ip[0] == '[') {
+ snprintf(buf, sizeof(buf), "%s", ip + 1);
+ p = strchr(buf, ']');
+ if (p != NULL) {
+ *p = '\0';
+ }
+ ip = (const char *) &buf[0];
+ }
+
+ snprintf(portnum, sizeof portnum, "%d", port);
+ memset(&hints, 0, sizeof hints);
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_NUMERICSERV;
+ hints.ai_flags |= AI_PASSIVE;
+ hints.ai_flags |= AI_NUMERICHOST;
+ rc = getaddrinfo(ip, portnum, &hints, &res0);
+ if (rc != 0) {
+ SPDK_ERRLOG("getaddrinfo() failed (errno=%d)\n", errno);
+ return NULL;
+ }
+
+ /* try each resolved address until one works */
+ fd = -1;
+ for (res = res0; res != NULL; res = res->ai_next) {
+retry:
+ fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (fd < 0) {
+ /* error */
+ continue;
+ }
+
+ val = SO_RCVBUF_SIZE;
+ rc = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof val);
+ if (rc) {
+ /* Not fatal */
+ }
+
+ val = SO_SNDBUF_SIZE;
+ rc = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof val);
+ if (rc) {
+ /* Not fatal */
+ }
+
+ val = 1;
+ rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+
+#if defined(SO_PRIORITY)
+ if (opts != NULL && opts->priority) {
+ rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &opts->priority, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ }
+#endif
+ if (res->ai_family == AF_INET6) {
+ rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof val);
+ if (rc != 0) {
+ close(fd);
+ /* error */
+ continue;
+ }
+ }
+
+ if (type == SPDK_SOCK_CREATE_LISTEN) {
+ rc = bind(fd, res->ai_addr, res->ai_addrlen);
+ if (rc != 0) {
+ SPDK_ERRLOG("bind() failed at port %d, errno = %d\n", port, errno);
+ switch (errno) {
+ case EINTR:
+ /* interrupted? */
+ close(fd);
+ goto retry;
+ case EADDRNOTAVAIL:
+ SPDK_ERRLOG("IP address %s not available. "
+ "Verify IP address in config file "
+ "and make sure setup script is "
+ "run before starting spdk app.\n", ip);
+ /* FALLTHROUGH */
+ default:
+ /* try next family */
+ close(fd);
+ fd = -1;
+ continue;
+ }
+ }
+ /* bind OK */
+ rc = listen(fd, 512);
+ if (rc != 0) {
+ SPDK_ERRLOG("listen() failed, errno = %d\n", errno);
+ close(fd);
+ fd = -1;
+ break;
+ }
+ } else if (type == SPDK_SOCK_CREATE_CONNECT) {
+ rc = connect(fd, res->ai_addr, res->ai_addrlen);
+ if (rc != 0) {
+ SPDK_ERRLOG("connect() failed, errno = %d\n", errno);
+ /* try next family */
+ close(fd);
+ fd = -1;
+ continue;
+ }
+ }
+
+ flag = fcntl(fd, F_GETFL);
+ if (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0) {
+ SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
+ close(fd);
+ fd = -1;
+ break;
+ }
+ break;
+ }
+ freeaddrinfo(res0);
+
+ if (fd < 0) {
+ return NULL;
+ }
+
+ sock = uring_sock_alloc(fd);
+ if (sock == NULL) {
+ SPDK_ERRLOG("sock allocation failed\n");
+ close(fd);
+ return NULL;
+ }
+
+ return &sock->base;
+}
+
+static struct spdk_sock *
+uring_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return uring_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
+}
+
+static struct spdk_sock *
+uring_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return uring_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
+}
+
+static struct spdk_sock *
+uring_sock_accept(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc, fd;
+ struct spdk_uring_sock *new_sock;
+ int flag;
+
+ memset(&sa, 0, sizeof(sa));
+ salen = sizeof(sa);
+
+ assert(sock != NULL);
+
+ rc = accept(sock->fd, (struct sockaddr *)&sa, &salen);
+
+ if (rc == -1) {
+ return NULL;
+ }
+
+ fd = rc;
+
+ flag = fcntl(fd, F_GETFL);
+ if ((!(flag & O_NONBLOCK)) && (fcntl(fd, F_SETFL, flag | O_NONBLOCK) < 0)) {
+ SPDK_ERRLOG("fcntl can't set nonblocking mode for socket, fd: %d (%d)\n", fd, errno);
+ close(fd);
+ return NULL;
+ }
+
+#if defined(SO_PRIORITY)
+ /* The priority is not inherited by the accepted socket, so set it again */
+ if (sock->base.opts.priority) {
+ rc = setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &sock->base.opts.priority, sizeof(int));
+ if (rc != 0) {
+ close(fd);
+ return NULL;
+ }
+ }
+#endif
+
+ new_sock = uring_sock_alloc(fd);
+ if (new_sock == NULL) {
+ close(fd);
+ return NULL;
+ }
+
+ return &new_sock->base;
+}
+
+static int
+uring_sock_close(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc;
+
+ assert(TAILQ_EMPTY(&_sock->pending_reqs));
+ assert(sock->group == NULL);
+
+ spdk_pipe_destroy(sock->recv_pipe);
+ free(sock->recv_buf);
+ rc = close(sock->fd);
+ if (rc == 0) {
+ free(sock);
+ }
+
+ return rc;
+}
+
+static ssize_t
+uring_sock_recv_from_pipe(struct spdk_uring_sock *sock, struct iovec *diov, int diovcnt)
+{
+ struct iovec siov[2];
+ int sbytes;
+ ssize_t bytes;
+ struct spdk_uring_sock_group_impl *group;
+
+ sbytes = spdk_pipe_reader_get_buffer(sock->recv_pipe, sock->recv_buf_sz, siov);
+ if (sbytes < 0) {
+ errno = EINVAL;
+ return -1;
+ } else if (sbytes == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ bytes = spdk_iovcpy(siov, 2, diov, diovcnt);
+
+ if (bytes == 0) {
+ /* The only way this happens is if diov is 0 length */
+ errno = EINVAL;
+ return -1;
+ }
+
+ spdk_pipe_reader_advance(sock->recv_pipe, bytes);
+
+ /* If we drained the pipe, take it off the level-triggered list */
+ if (sock->base.group_impl && spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ group = __uring_group_impl(sock->base.group_impl);
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+ sock->pending_recv = false;
+ }
+
+ return bytes;
+}
+
+static inline ssize_t
+uring_sock_read(struct spdk_uring_sock *sock)
+{
+ struct iovec iov[2];
+ int bytes;
+ struct spdk_uring_sock_group_impl *group;
+
+ bytes = spdk_pipe_writer_get_buffer(sock->recv_pipe, sock->recv_buf_sz, iov);
+
+ if (bytes > 0) {
+ bytes = readv(sock->fd, iov, 2);
+ if (bytes > 0) {
+ spdk_pipe_writer_advance(sock->recv_pipe, bytes);
+ if (sock->base.group_impl) {
+ group = __uring_group_impl(sock->base.group_impl);
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ sock->pending_recv = true;
+ }
+ }
+ }
+
+ return bytes;
+}
+
+static ssize_t
+uring_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc, i;
+ size_t len;
+
+ if (sock->recv_pipe == NULL) {
+ return readv(sock->fd, iov, iovcnt);
+ }
+
+ len = 0;
+ for (i = 0; i < iovcnt; i++) {
+ len += iov[i].iov_len;
+ }
+
+ if (spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ /* If the user is receiving a sufficiently large amount of data,
+ * receive directly to their buffers. */
+ if (len >= MIN_SOCK_PIPE_SIZE) {
+ return readv(sock->fd, iov, iovcnt);
+ }
+
+ /* Otherwise, do a big read into our pipe */
+ rc = uring_sock_read(sock);
+ if (rc <= 0) {
+ return rc;
+ }
+ }
+
+ return uring_sock_recv_from_pipe(sock, iov, iovcnt);
+}
+
+static ssize_t
+uring_sock_recv(struct spdk_sock *sock, void *buf, size_t len)
+{
+ struct iovec iov[1];
+
+ iov[0].iov_base = buf;
+ iov[0].iov_len = len;
+
+ return uring_sock_readv(sock, iov, 1);
+}
+
+static ssize_t
+uring_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+
+ if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return writev(sock->fd, iov, iovcnt);
+}
+
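+/* Gather iovecs from the queued requests into iovs, starting at
+ * iovs[index]. If last_req is provided, gathering resumes after *last_req,
+ * and *last_req is advanced to the last request whose iovecs were fully
+ * added. Returns the total number of iovecs gathered. */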
+static int
+sock_prep_reqs(struct spdk_sock *_sock, struct iovec *iovs, int index,
+ struct spdk_sock_request **last_req)
+{
+ int iovcnt, i;
+ struct spdk_sock_request *req;
+ unsigned int offset;
+
+ /* Gather an iov */
+ iovcnt = index;
+ if (spdk_unlikely(iovcnt >= IOV_BATCH_SIZE)) {
+ goto end;
+ }
+
+ if (last_req != NULL && *last_req != NULL) {
+ req = TAILQ_NEXT(*last_req, internal.link);
+ } else {
+ req = TAILQ_FIRST(&_sock->queued_reqs);
+ }
+
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Consume any offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
+ iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+ iovcnt++;
+
+ offset = 0;
+
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+ }
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+
+ if (last_req != NULL) {
+ *last_req = req;
+ }
+ req = TAILQ_NEXT(req, internal.link);
+ }
+
+end:
+ return iovcnt;
+}
+
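+/* Consume rc bytes from the queued requests: fully written requests are
+ * completed and released, while a partially written request records its
+ * progress in internal.offset so the next flush resumes there. */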
+static int
+sock_complete_reqs(struct spdk_sock *_sock, ssize_t rc)
+{
+ struct spdk_sock_request *req;
+ int i, retval;
+ unsigned int offset;
+ size_t len;
+
+ /* Consume the requests that were actually written */
+ req = TAILQ_FIRST(&_sock->queued_reqs);
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Advance by the offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ /* Calculate the remaining length of this element */
+ len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+
+ if (len > (size_t)rc) {
+ /* This element was partially sent. */
+ req->internal.offset += rc;
+ return 0;
+ }
+
+ offset = 0;
+ req->internal.offset += len;
+ rc -= len;
+ }
+
+ /* Handled a full request. */
+ spdk_sock_request_pend(_sock, req);
+
+ retval = spdk_sock_request_put(_sock, req, 0);
+ if (retval) {
+ return retval;
+ }
+
+ if (rc == 0) {
+ break;
+ }
+
+ req = TAILQ_FIRST(&_sock->queued_reqs);
+ }
+
+ return 0;
+}
+
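+/* Queue a single sendmsg SQE covering as many queued requests as fit in
+ * one iovec batch; the completion is handled in sock_uring_group_reap(). */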
+static void
+_sock_flush(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_task *task = &sock->write_task;
+ uint32_t iovcnt;
+ struct io_uring_sqe *sqe;
+
+ if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
+ return;
+ }
+
+ iovcnt = sock_prep_reqs(&sock->base, task->iovs, task->iov_cnt, &task->last_req);
+ if (!iovcnt) {
+ return;
+ }
+
+ task->iov_cnt = iovcnt;
+ assert(sock->group != NULL);
+ task->msg.msg_iov = task->iovs;
+ task->msg.msg_iovlen = task->iov_cnt;
+
+ sock->group->io_queued++;
+
+ sqe = io_uring_get_sqe(&sock->group->uring);
+ io_uring_prep_sendmsg(sqe, sock->fd, &task->msg, 0);
+ io_uring_sqe_set_data(sqe, task);
+ task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
+}
+
+static void
+_sock_prep_pollin(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_task *task = &sock->pollin_task;
+ struct io_uring_sqe *sqe;
+
+ /* Do not prepare a pollin event if one is already in flight or the socket already has data pending */
+ if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS || sock->pending_recv) {
+ return;
+ }
+
+ assert(sock->group != NULL);
+ sock->group->io_queued++;
+
+ sqe = io_uring_get_sqe(&sock->group->uring);
+ io_uring_prep_poll_add(sqe, sock->fd, POLLIN);
+ io_uring_sqe_set_data(sqe, task);
+ task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
+}
+
+static void
+_sock_prep_cancel_task(struct spdk_sock *_sock, void *user_data)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_task *task = &sock->cancel_task;
+ struct io_uring_sqe *sqe;
+
+ if (task->status == SPDK_URING_SOCK_TASK_IN_PROCESS) {
+ return;
+ }
+
+ assert(sock->group != NULL);
+ sock->group->io_queued++;
+
+ sqe = io_uring_get_sqe(&sock->group->uring);
+ io_uring_prep_cancel(sqe, user_data, 0);
+ io_uring_sqe_set_data(sqe, task);
+ task->status = SPDK_URING_SOCK_TASK_IN_PROCESS;
+}
+
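+/* Reap up to max completions from the ring and, if socks is non-NULL,
+ * return up to max_read_events sockets with data pending in socks. */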
+static int
+sock_uring_group_reap(struct spdk_uring_sock_group_impl *group, int max, int max_read_events,
+ struct spdk_sock **socks)
+{
+ int i, count, ret;
+ struct io_uring_cqe *cqe;
+ struct spdk_uring_sock *sock, *tmp;
+ struct spdk_uring_task *task;
+ int status;
+
+ for (i = 0; i < max; i++) {
+ ret = io_uring_peek_cqe(&group->uring, &cqe);
+ if (ret != 0) {
+ break;
+ }
+
+ if (cqe == NULL) {
+ break;
+ }
+
+ task = (struct spdk_uring_task *)cqe->user_data;
+ assert(task != NULL);
+ sock = task->sock;
+ assert(sock != NULL);
+ assert(sock->group != NULL);
+ assert(sock->group == group);
+ sock->group->io_inflight--;
+ sock->group->io_avail++;
+ status = cqe->res;
+ io_uring_cqe_seen(&group->uring, cqe);
+
+ task->status = SPDK_URING_SOCK_TASK_NOT_IN_USE;
+
+ if (spdk_unlikely(status <= 0)) {
+ if (status == -EAGAIN || status == -EWOULDBLOCK) {
+ continue;
+ }
+ }
+
+ switch (task->type) {
+ case SPDK_SOCK_TASK_POLLIN:
+ if ((status & POLLIN) == POLLIN) {
+ if (sock->base.cb_fn != NULL) {
+ assert(sock->pending_recv == false);
+ sock->pending_recv = true;
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+ }
+ break;
+ case SPDK_SOCK_TASK_WRITE:
+ assert(TAILQ_EMPTY(&sock->base.pending_reqs));
+ task->last_req = NULL;
+ task->iov_cnt = 0;
+ if (spdk_unlikely(status < 0)) {
+ sock->connection_status = status;
+ spdk_sock_abort_requests(&sock->base);
+ } else {
+ sock_complete_reqs(&sock->base, status);
+ }
+
+ break;
+ case SPDK_SOCK_TASK_CANCEL:
+ /* Do nothing */
+ break;
+ default:
+ SPDK_UNREACHABLE();
+ }
+ }
+
+ if (!socks) {
+ return 0;
+ }
+ count = 0;
+ TAILQ_FOREACH_SAFE(sock, &group->pending_recv, link, tmp) {
+ if (count == max_read_events) {
+ break;
+ }
+
+ socks[count++] = &sock->base;
+ }
+
+ /* Cycle the pending_recv list so that each time we poll things aren't
+ * in the same order. */
+ for (i = 0; i < count; i++) {
+ sock = __uring_sock(socks[i]);
+
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+
+ if (sock->recv_pipe == NULL || spdk_pipe_reader_bytes_available(sock->recv_pipe) == 0) {
+ sock->pending_recv = false;
+ } else {
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+ }
+
+ return count;
+}
+
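+/* Flush queued requests for a socket that is not attached to a polling
+ * group, using a plain sendmsg() rather than the group's io_uring. */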
+static int
+_sock_flush_client(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct msghdr msg = {};
+ struct iovec iovs[IOV_BATCH_SIZE];
+ int iovcnt;
+ ssize_t rc;
+
+ /* Can't flush from within a callback or we end up with recursive calls */
+ if (_sock->cb_cnt > 0) {
+ return 0;
+ }
+
+ /* Gather an iov */
+ iovcnt = sock_prep_reqs(_sock, iovs, 0, NULL);
+ if (iovcnt == 0) {
+ return 0;
+ }
+
+ /* Perform the vectored write */
+ msg.msg_iov = iovs;
+ msg.msg_iovlen = iovcnt;
+ rc = sendmsg(sock->fd, &msg, 0);
+ if (rc <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return 0;
+ }
+ return rc;
+ }
+
+ sock_complete_reqs(_sock, rc);
+
+ return 0;
+}
+
+static void
+uring_sock_writev_async(struct spdk_sock *_sock, struct spdk_sock_request *req)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int rc;
+
+ if (spdk_unlikely(sock->connection_status)) {
+ req->cb_fn(req->cb_arg, sock->connection_status);
+ return;
+ }
+
+ spdk_sock_request_queue(_sock, req);
+
+ if (!sock->group) {
+ if (_sock->queued_iovcnt >= IOV_BATCH_SIZE) {
+ rc = _sock_flush_client(_sock);
+ if (rc) {
+ spdk_sock_abort_requests(_sock);
+ }
+ }
+ }
+}
+
+static int
+uring_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ int val;
+ int rc;
+
+ assert(sock != NULL);
+
+ val = nbytes;
+ rc = setsockopt(sock->fd, SOL_SOCKET, SO_RCVLOWAT, &val, sizeof val);
+ if (rc != 0) {
+ return -1;
+ }
+ return 0;
+}
+
+static bool
+uring_sock_is_ipv6(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return false;
+ }
+
+ return (sa.ss_family == AF_INET6);
+}
+
+static bool
+uring_sock_is_ipv4(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct sockaddr_storage sa;
+ socklen_t salen;
+ int rc;
+
+ assert(sock != NULL);
+
+ memset(&sa, 0, sizeof sa);
+ salen = sizeof sa;
+ rc = getsockname(sock->fd, (struct sockaddr *) &sa, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockname() failed (errno=%d)\n", errno);
+ return false;
+ }
+
+ return (sa.ss_family == AF_INET);
+}
+
+static bool
+uring_sock_is_connected(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ uint8_t byte;
+ int rc;
+
+ rc = recv(sock->fd, &byte, 1, MSG_PEEK);
+ if (rc == 0) {
+ return false;
+ }
+
+ if (rc < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return true;
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+static int
+uring_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
+{
+ int rc = -1;
+
+#if defined(SO_INCOMING_NAPI_ID)
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ socklen_t salen = sizeof(int);
+
+ rc = getsockopt(sock->fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, placement_id, &salen);
+ if (rc != 0) {
+ SPDK_ERRLOG("getsockopt() failed (errno=%d)\n", errno);
+ }
+
+#endif
+ return rc;
+}
+
+static struct spdk_sock_group_impl *
+uring_sock_group_impl_create(void)
+{
+ struct spdk_uring_sock_group_impl *group_impl;
+
+ group_impl = calloc(1, sizeof(*group_impl));
+ if (group_impl == NULL) {
+ SPDK_ERRLOG("group_impl allocation failed\n");
+ return NULL;
+ }
+
+ group_impl->io_avail = SPDK_SOCK_GROUP_QUEUE_DEPTH;
+
+ if (io_uring_queue_init(SPDK_SOCK_GROUP_QUEUE_DEPTH, &group_impl->uring, 0) < 0) {
+ SPDK_ERRLOG("uring I/O context setup failure\n");
+ free(group_impl);
+ return NULL;
+ }
+
+ TAILQ_INIT(&group_impl->pending_recv);
+
+ return &group_impl->base;
+}
+
+static int
+uring_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
+ struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+
+ sock->group = group;
+ sock->write_task.sock = sock;
+ sock->write_task.type = SPDK_SOCK_TASK_WRITE;
+
+ sock->pollin_task.sock = sock;
+ sock->pollin_task.type = SPDK_SOCK_TASK_POLLIN;
+
+ sock->cancel_task.sock = sock;
+ sock->cancel_task.type = SPDK_SOCK_TASK_CANCEL;
+
+ /* switched from another polling group due to scheduling */
+ if (spdk_unlikely(sock->recv_pipe != NULL &&
+ (spdk_pipe_reader_bytes_available(sock->recv_pipe) > 0))) {
+ assert(sock->pending_recv == false);
+ sock->pending_recv = true;
+ TAILQ_INSERT_TAIL(&group->pending_recv, sock, link);
+ }
+
+ return 0;
+}
+
+static int
+uring_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
+ struct spdk_sock **socks)
+{
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+ int count, ret;
+ int to_complete, to_submit;
+ struct spdk_sock *_sock, *tmp;
+ struct spdk_uring_sock *sock;
+
+ if (spdk_likely(socks)) {
+ TAILQ_FOREACH_SAFE(_sock, &group->base.socks, link, tmp) {
+ sock = __uring_sock(_sock);
+ if (spdk_unlikely(sock->connection_status)) {
+ continue;
+ }
+ _sock_flush(_sock);
+ _sock_prep_pollin(_sock);
+ }
+ }
+
+ to_submit = group->io_queued;
+
+ /* Network I/O cannot be set up with O_DIRECT, so there is no need to call spdk_io_uring_enter() separately */
+ if (to_submit > 0) {
+ /* If there are I/O to submit, use io_uring_submit here.
+ * It will automatically call io_uring_enter appropriately. */
+ ret = io_uring_submit(&group->uring);
+ if (ret < 0) {
+ return 1;
+ }
+ group->io_queued = 0;
+ group->io_inflight += to_submit;
+ group->io_avail -= to_submit;
+ }
+
+ count = 0;
+ to_complete = group->io_inflight;
+ if (to_complete > 0) {
+ count = sock_uring_group_reap(group, to_complete, max_events, socks);
+ }
+
+ return count;
+}
+
+static int
+uring_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
+ struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+
+ if (sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+ _sock_prep_cancel_task(_sock, &sock->write_task);
+ /* Since spdk_sock_group_remove_sock() is not an asynchronous
+ * interface, we can simply poll in a loop until the cancellation
+ * completes. */
+ while ((sock->write_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
+ (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ }
+
+ if (sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) {
+ _sock_prep_cancel_task(_sock, &sock->pollin_task);
+ /* Since spdk_sock_group_remove_sock() is not an asynchronous
+ * interface, we can simply poll in a loop until the cancellation
+ * completes. */
+ while ((sock->pollin_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE) ||
+ (sock->cancel_task.status != SPDK_URING_SOCK_TASK_NOT_IN_USE)) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ }
+
+ if (sock->pending_recv) {
+ TAILQ_REMOVE(&group->pending_recv, sock, link);
+ sock->pending_recv = false;
+ }
+ assert(sock->pending_recv == false);
+
+ sock->group = NULL;
+ return 0;
+}
+
+static int
+uring_sock_group_impl_close(struct spdk_sock_group_impl *_group)
+{
+ struct spdk_uring_sock_group_impl *group = __uring_group_impl(_group);
+
+ /* try to reap all the active I/O */
+ while (group->io_inflight) {
+ uring_sock_group_impl_poll(_group, 32, NULL);
+ }
+ assert(group->io_inflight == 0);
+ assert(group->io_avail == SPDK_SOCK_GROUP_QUEUE_DEPTH);
+
+ io_uring_queue_exit(&group->uring);
+
+ free(group);
+ return 0;
+}
+
+static int
+uring_sock_flush(struct spdk_sock *_sock)
+{
+ struct spdk_uring_sock *sock = __uring_sock(_sock);
+
+ if (!sock->group) {
+ return _sock_flush_client(_sock);
+ }
+
+ return 0;
+}
+
+static struct spdk_net_impl g_uring_net_impl = {
+ .name = "uring",
+ .getaddr = uring_sock_getaddr,
+ .connect = uring_sock_connect,
+ .listen = uring_sock_listen,
+ .accept = uring_sock_accept,
+ .close = uring_sock_close,
+ .recv = uring_sock_recv,
+ .readv = uring_sock_readv,
+ .writev = uring_sock_writev,
+ .writev_async = uring_sock_writev_async,
+ .flush = uring_sock_flush,
+ .set_recvlowat = uring_sock_set_recvlowat,
+ .set_recvbuf = uring_sock_set_recvbuf,
+ .set_sendbuf = uring_sock_set_sendbuf,
+ .is_ipv6 = uring_sock_is_ipv6,
+ .is_ipv4 = uring_sock_is_ipv4,
+ .is_connected = uring_sock_is_connected,
+ .get_placement_id = uring_sock_get_placement_id,
+ .group_impl_create = uring_sock_group_impl_create,
+ .group_impl_add_sock = uring_sock_group_impl_add_sock,
+ .group_impl_remove_sock = uring_sock_group_impl_remove_sock,
+ .group_impl_poll = uring_sock_group_impl_poll,
+ .group_impl_close = uring_sock_group_impl_close,
+};
+
+SPDK_NET_IMPL_REGISTER(uring, &g_uring_net_impl, DEFAULT_SOCK_PRIORITY + 1);
diff --git a/src/spdk/module/sock/vpp/Makefile b/src/spdk/module/sock/vpp/Makefile
new file mode 100644
index 000000000..016018c77
--- /dev/null
+++ b/src/spdk/module/sock/vpp/Makefile
@@ -0,0 +1,55 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 2
+SO_MINOR := 0
+
+C_SRCS += vpp.c
+CFLAGS += -Wno-sign-compare -Wno-error=old-style-definition
+CFLAGS += -Wno-error=strict-prototypes -Wno-error=ignored-qualifiers
+
+GCC_VERSION=$(shell $(CC) -dumpversion | cut -d. -f1)
+
+# disable warnings about taking the address of packed members
+ifeq ($(shell test $(GCC_VERSION) -ge 9 && echo 1), 1)
+CFLAGS += -Wno-error=address-of-packed-member
+endif
+
+LIBNAME = sock_vpp
+
+SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/module/sock/vpp/vpp.c b/src/spdk/module/sock/vpp/vpp.c
new file mode 100644
index 000000000..89a92e9d1
--- /dev/null
+++ b/src/spdk/module/sock/vpp/vpp.c
@@ -0,0 +1,1633 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/* Omit from static analysis. */
+#ifndef __clang_analyzer__
+
+#include "spdk/stdinc.h"
+
+#include "spdk/log.h"
+#include "spdk/sock.h"
+#include "spdk/net.h"
+#include "spdk/string.h"
+#include "spdk_internal/sock.h"
+#include "spdk/queue.h"
+#include "spdk/event.h"
+#include "spdk/thread.h"
+#include "spdk_internal/log.h"
+
+/* _GNU_SOURCE is redefined in the vpp headers with no protection (dlmalloc.h) */
+#undef _GNU_SOURCE
+
+#include <svm/svm_fifo_segment.h>
+#include <vlibmemory/api.h>
+#include <vpp/api/vpe_msg_enum.h>
+#include <vnet/session/application_interface.h>
+
+#define vl_typedefs /* define message structures */
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_typedefs
+
+/* declare message handlers for each api */
+
+#define vl_endianfun /* define endian conversion functions */
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_endianfun
+
+/* instantiate all the print functions we know about */
+#define vl_print(handle, ...)
+#define vl_printfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_printfun
+
+#define SPDK_VPP_CLIB_MEM_SIZE (256 << 20)
+#define SPDK_VPP_SESSIONS_MAX 2048
+#define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX
+#define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL
+#define SPDK_VPP_SEGMENT_TIMEOUT 20
+#define IOV_BATCH_SIZE 64
+
+/* VPP connection state */
+enum spdk_vpp_state {
+ VPP_STATE_START,
+ VPP_STATE_ENABLED,
+ VPP_STATE_ATTACHED,
+ VPP_STATE_READY,
+ VPP_STATE_DISCONNECTING,
+ VPP_STATE_FAILED
+};
+
+/* VPP session state */
+enum spdk_vpp_session_state {
+ VPP_SESSION_STATE_UNUSED = 0,
+ VPP_SESSION_STATE_INIT, /* Initial state */
+ VPP_SESSION_STATE_READY, /* Ready for processing */
+ VPP_SESSION_STATE_DISCONNECT,
+ VPP_SESSION_STATE_CLOSE,
+ VPP_SESSION_STATE_FAILED
+};
+
+struct spdk_vpp_session {
+ struct spdk_sock base;
+
+ /* VPP app session */
+ app_session_t app_session;
+
+ uint32_t id;
+
+ bool is_server; /* Server side session */
+ bool is_listen; /* Session is listener */
+
+ uint64_t handle;
+ uint32_t context;
+
+ /* Listener fields */
+ pthread_mutex_t accept_session_lock;
+ uint32_t *accept_session_index_fifo;
+};
+
+static struct spdk_vpp_main {
+ int my_client_index;
+ enum spdk_vpp_state vpp_state;
+ bool vpp_initialized;
+ struct spdk_thread *init_thread;
+
+ svm_fifo_segment_main_t segment_main;
+ svm_queue_t *vl_input_queue;
+ svm_queue_t *vl_output_queue;
+ svm_msg_q_t *app_event_queue;
+
+ struct spdk_vpp_session sessions[SPDK_VPP_SESSIONS_MAX];
+ pthread_mutex_t session_get_lock;
+
+ struct spdk_poller *vpp_queue_poller;
+ struct spdk_poller *app_queue_poller;
+ struct spdk_poller *timeout_poller;
+} g_svm;
+
+struct spdk_vpp_sock_group_impl {
+ struct spdk_sock_group_impl base;
+ struct spdk_sock *last_sock;
+};
+
+#define __vpp_session(sock) ((struct spdk_vpp_session *)sock)
+#define __vpp_group_impl(group) ((struct spdk_vpp_sock_group_impl *)group)
+
+/******************************************************************************
+ * Session management
+ */
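+/* Allocate a session slot by scanning the static table for the first
+ * entry in the UNUSED state, under session_get_lock. */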
+static struct spdk_vpp_session *
+vpp_session_create(void)
+{
+ struct spdk_vpp_session *session;
+ int i;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ for (i = 0; i < SPDK_VPP_SESSIONS_MAX &&
+ g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED; i++) {
+ /* Empty loop body */
+ }
+ if (i == SPDK_VPP_SESSIONS_MAX ||
+ g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED) {
+ SPDK_ERRLOG("Cannot allocate space for new session\n");
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return NULL;
+ }
+ session = &g_svm.sessions[i];
+ memset(session, 0, sizeof(struct spdk_vpp_session));
+ pthread_mutex_init(&session->accept_session_lock, NULL);
+
+ session->id = i;
+ session->app_session.session_state = VPP_SESSION_STATE_INIT;
+
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Creating new session %p (%d)\n",
+ session, session->id);
+
+ return session;
+}
+
+static struct spdk_vpp_session *
+vpp_session_get(uint32_t id)
+{
+ struct spdk_vpp_session *session = NULL;
+
+ if (id >= SPDK_VPP_SESSIONS_MAX) {
+ return NULL;
+ }
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ if (g_svm.sessions[id].app_session.session_state != VPP_SESSION_STATE_UNUSED) {
+ session = &g_svm.sessions[id];
+ }
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+
+ return session;
+}
+
+static struct spdk_vpp_session *
+vpp_session_get_by_handle(uint64_t handle, bool is_listen)
+{
+ struct spdk_vpp_session *session = NULL;
+ int i;
+
+ for (i = 0; i < SPDK_VPP_SESSIONS_MAX; i++) {
+ if (g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED &&
+ g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_DISCONNECT &&
+ g_svm.sessions[i].handle == handle &&
+ g_svm.sessions[i].is_listen == is_listen) {
+ session = &g_svm.sessions[i];
+ break;
+ }
+ }
+
+ return session;
+}
+
+static int
+vpp_session_free(struct spdk_vpp_session *session)
+{
+ /* Remove session */
+ if (session == NULL) {
+ SPDK_ERRLOG("Wrong session\n");
+ return -EINVAL;
+ }
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Free session %p (%d)\n", session, session->id);
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session->app_session.session_state = VPP_SESSION_STATE_UNUSED;
+ pthread_mutex_destroy(&session->accept_session_lock);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+
+ return 0;
+}
+
+static int
+vpp_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport,
+ char *caddr, int clen, uint16_t *cport)
+{
+ struct spdk_vpp_session *session = __vpp_session(_sock);
+ const char *result = NULL;
+
+ assert(session != NULL);
+ assert(g_svm.vpp_initialized);
+
+ if (session->app_session.transport.is_ip4) {
+ result = inet_ntop(AF_INET, &session->app_session.transport.lcl_ip.ip4.as_u8,
+ saddr, slen);
+ } else {
+ result = inet_ntop(AF_INET6, &session->app_session.transport.lcl_ip.ip6.as_u8,
+ saddr, slen);
+ }
+ if (result == NULL) {
+ return -1;
+ }
+
+ if (sport) {
+ *sport = ntohs(session->app_session.transport.lcl_port);
+ }
+
+ if (session->app_session.transport.is_ip4) {
+ result = inet_ntop(AF_INET, &session->app_session.transport.rmt_ip.ip4.as_u8,
+ caddr, clen);
+ } else {
+ result = inet_ntop(AF_INET6, &session->app_session.transport.rmt_ip.ip6.as_u8,
+ caddr, clen);
+ }
+ if (result == NULL) {
+ return -1;
+ }
+
+ if (cport) {
+ *cport = ntohs(session->app_session.transport.rmt_port);
+ }
+
+ return 0;
+}
+
+enum spdk_vpp_create_type {
+ SPDK_SOCK_CREATE_LISTEN,
+ SPDK_SOCK_CREATE_CONNECT,
+};
+
+/******************************************************************************
+ * VPP message handlers
+ */
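+/* VPP accepted a connection on one of our listeners: allocate a local
+ * session, wire up the shared-memory FIFOs, and queue the session index
+ * on the listener's accept FIFO for vpp_sock_accept() to pick up. */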
+static void
+session_accepted_handler(session_accepted_msg_t *mp)
+{
+ svm_fifo_t *rx_fifo, *tx_fifo;
+ struct spdk_vpp_session *client_session, *listen_session;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ listen_session = vpp_session_get_by_handle(mp->listener_handle, true);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ if (!listen_session) {
+ SPDK_ERRLOG("Listener not found\n");
+ return;
+ }
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Listeners handle is %" PRIu64 "\n", mp->listener_handle);
+
+ /* Allocate local session for a client and set it up */
+ client_session = vpp_session_create();
+ if (client_session == NULL) {
+ SPDK_ERRLOG("Cannot create new session\n");
+ return;
+ }
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Accept session %p (%d) on %p (%d/%" PRIu64 ")\n",
+ client_session, client_session->id, listen_session, listen_session->id,
+ listen_session->handle);
+
+ rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *);
+ rx_fifo->client_session_index = client_session->id;
+ tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *);
+ tx_fifo->client_session_index = client_session->id;
+
+ client_session->handle = mp->handle;
+ client_session->context = mp->context;
+ client_session->app_session.rx_fifo = rx_fifo;
+ client_session->app_session.tx_fifo = tx_fifo;
+ client_session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+
+ client_session->is_server = true;
+ client_session->app_session.transport.rmt_port = mp->port;
+ client_session->app_session.transport.is_ip4 = mp->is_ip4;
+ memcpy(&client_session->app_session.transport.rmt_ip, mp->ip, sizeof(mp->ip));
+
+ client_session->app_session.transport.lcl_port = listen_session->app_session.transport.lcl_port;
+ memcpy(&client_session->app_session.transport.lcl_ip, &listen_session->app_session.transport.lcl_ip,
+ sizeof(listen_session->app_session.transport.lcl_ip));
+ client_session->app_session.transport.is_ip4 = listen_session->app_session.transport.is_ip4;
+
+ client_session->app_session.session_state = VPP_SESSION_STATE_READY;
+
+ pthread_mutex_lock(&listen_session->accept_session_lock);
+
+ clib_fifo_add1(listen_session->accept_session_index_fifo,
+ client_session->id);
+
+ pthread_mutex_unlock(&listen_session->accept_session_lock);
+}
+
+static void
+session_connected_handler(session_connected_msg_t *mp)
+{
+ struct spdk_vpp_session *session;
+ svm_fifo_t *rx_fifo, *tx_fifo;
+
+ session = vpp_session_get(mp->context);
+ if (session == NULL) {
+ return;
+ }
+
+ if (mp->retval) {
+ SPDK_ERRLOG("Connection failed (%d).\n", ntohl(mp->retval));
+ session->app_session.session_state = VPP_SESSION_STATE_FAILED;
+ return;
+ }
+
+ session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address,
+ svm_msg_q_t *);
+
+ rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *);
+ rx_fifo->client_session_index = session->id;
+ tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *);
+ tx_fifo->client_session_index = session->id;
+
+ session->app_session.rx_fifo = rx_fifo;
+ session->app_session.tx_fifo = tx_fifo;
+ session->handle = mp->handle;
+
+ /* Set lcl addr */
+ session->app_session.transport.is_ip4 = mp->is_ip4;
+ memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip));
+ session->app_session.transport.lcl_port = mp->lcl_port;
+
+ session->app_session.session_state = VPP_SESSION_STATE_READY;
+}
+
+static void
+session_disconnected_handler(session_disconnected_msg_t *mp)
+{
+ struct spdk_vpp_session *session = NULL;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session = vpp_session_get_by_handle(mp->handle, false);
+ if (session == NULL) {
+ SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n",
+ mp->handle);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d) handler\n", session, session->id);
+
+ /* Postpone freeing the session so the upper layer can observe the disconnect */
+ session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT;
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+}
+
+static void
+session_reset_handler(session_reset_msg_t *mp)
+{
+ int rv = 0;
+ struct spdk_vpp_session *session = NULL;
+ app_session_evt_t app_evt;
+ session_reset_reply_msg_t *rmp;
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session = vpp_session_get_by_handle(mp->handle, false);
+ if (session == NULL) {
+ SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n",
+ mp->handle);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Reset session %p (%d) handler\n", session, session->id);
+
+ session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT;
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+
+ app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt,
+ SESSION_CTRL_EVT_RESET_REPLY);
+ rmp = (session_reset_reply_msg_t *) app_evt.evt->data;
+ rmp->retval = rv;
+ rmp->handle = mp->handle;
+ app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt);
+}
+
+static void
+session_bound_handler(session_bound_msg_t *mp)
+{
+ struct spdk_vpp_session *session;
+
+ /* Context should be set to the session index */
+ session = vpp_session_get(mp->context);
+ if (session == NULL) {
+ return;
+ }
+
+ if (mp->retval) {
+ SPDK_ERRLOG("Bind failed (%d).\n", ntohl(mp->retval));
+ session->app_session.session_state = VPP_SESSION_STATE_FAILED;
+ return;
+ }
+
+ /* Set local address */
+ session->app_session.transport.is_ip4 = mp->lcl_is_ip4;
+ memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip));
+ session->app_session.transport.lcl_port = mp->lcl_port;
+
+ /* Register listener */
+ session->handle = mp->handle;
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Bind session %p (%d/%" PRIu64 ")\n",
+ session, session->id, session->handle);
+
+ /* Session bound; mark it as a listener */
+ session->is_listen = true;
+ session->app_session.session_state = VPP_SESSION_STATE_READY;
+}
+
+static void
+session_unlisten_reply_handler(session_unlisten_reply_msg_t *mp)
+{
+ struct spdk_vpp_session *session;
+
+ if (mp->retval != 0) {
+ SPDK_ERRLOG("Cannot unbind socket\n");
+ return;
+ }
+
+ session = vpp_session_get(mp->context);
+ if (session == NULL) {
+ SPDK_ERRLOG("Cannot find a session by context\n");
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d)\n", session, session->id);
+
+ session->app_session.session_state = VPP_SESSION_STATE_CLOSE;
+}
+
+static void
+handle_mq_event(session_event_t *e)
+{
+ switch (e->event_type) {
+ case SESSION_CTRL_EVT_BOUND:
+ session_bound_handler((session_bound_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_ACCEPTED:
+ session_accepted_handler((session_accepted_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_CONNECTED:
+ session_connected_handler((session_connected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_DISCONNECTED:
+ session_disconnected_handler((session_disconnected_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_RESET:
+ session_reset_handler((session_reset_msg_t *) e->data);
+ break;
+ case SESSION_CTRL_EVT_UNLISTEN_REPLY:
+ session_unlisten_reply_handler((session_unlisten_reply_msg_t *) e->data);
+ break;
+ default:
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unhandled event %u\n", e->event_type);
+ }
+}
+
+static int
+vpp_queue_poller(void *ctx)
+{
+ uword msg;
+
+ if (g_svm.vl_output_queue->cursize > 0 &&
+ !svm_queue_sub_raw(g_svm.vl_output_queue, (u8 *)&msg)) {
+ vl_msg_api_handler((void *)msg);
+ }
+
+ return SPDK_POLLER_BUSY;
+}
+
+static int
+app_queue_poller(void *ctx)
+{
+ session_event_t *e;
+ svm_msg_q_msg_t msg;
+
+ if (!svm_msg_q_is_empty(g_svm.app_event_queue)) {
+ svm_msg_q_sub(g_svm.app_event_queue, &msg, SVM_Q_WAIT, 0);
+ e = svm_msg_q_msg_data(g_svm.app_event_queue, &msg);
+ handle_mq_event(e);
+ svm_msg_q_free_msg(g_svm.app_event_queue, &msg);
+ }
+ return SPDK_POLLER_BUSY;
+}
+
+/* This is required until the sock.c API becomes asynchronous */
+static int
+_wait_for_session_state_change(struct spdk_vpp_session *session, enum spdk_vpp_session_state state)
+{
+ time_t start = time(NULL);
+ while (time(NULL) - start < 10) {
+ if (session->app_session.session_state == VPP_SESSION_STATE_FAILED) {
+ errno = EADDRNOTAVAIL;
+ return -1;
+ }
+ if (session->app_session.session_state == state) {
+ errno = 0;
+ return 0;
+ }
+ if (spdk_get_thread() == g_svm.init_thread) {
+ usleep(100000);
+ app_queue_poller(NULL);
+ vpp_queue_poller(NULL);
+ }
+ }
+ /* timeout */
+ errno = ETIMEDOUT;
+ return -1;
+}
+
+static int
+vpp_session_connect(struct spdk_vpp_session *session)
+{
+ vl_api_connect_sock_t *cmp;
+
+ cmp = vl_msg_api_alloc(sizeof(*cmp));
+ if (cmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(cmp, 0, sizeof(*cmp));
+
+ cmp->_vl_msg_id = ntohs(VL_API_CONNECT_SOCK);
+ cmp->client_index = g_svm.my_client_index;
+ cmp->context = session->id;
+
+ cmp->vrf = 0 /* VPPCOM_VRF_DEFAULT */;
+ cmp->is_ip4 = (session->app_session.transport.is_ip4);
+ memcpy(cmp->ip, &session->app_session.transport.rmt_ip, sizeof(cmp->ip));
+ cmp->port = session->app_session.transport.rmt_port;
+ cmp->proto = TRANSPORT_PROTO_TCP;
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&cmp);
+
+ return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY);
+}
+
+static void
+vl_api_disconnect_session_reply_t_handler(vl_api_disconnect_session_reply_t *mp)
+{
+ struct spdk_vpp_session *session;
+
+ if (mp->retval) {
+ SPDK_ERRLOG("Disconnecting session failed (%d).\n", ntohl(mp->retval));
+ return;
+ }
+
+ pthread_mutex_lock(&g_svm.session_get_lock);
+ session = vpp_session_get_by_handle(mp->handle, false);
+ if (session == NULL) {
+ SPDK_ERRLOG("Invalid session handler (%" PRIu64 ").\n", mp->handle);
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+ return;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session disconnected %p (%d)\n", session, session->id);
+ session->app_session.session_state = VPP_SESSION_STATE_CLOSE;
+ pthread_mutex_unlock(&g_svm.session_get_lock);
+}
+
+static int
+vpp_session_disconnect(struct spdk_vpp_session *session)
+{
+ int rv = 0;
+ vl_api_disconnect_session_t *dmp;
+ session_disconnected_reply_msg_t *rmp;
+ app_session_evt_t app_evt;
+
+ if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) {
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session is already in disconnecting state %p (%d)\n",
+ session, session->id);
+
+ app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt,
+ SESSION_CTRL_EVT_DISCONNECTED_REPLY);
+ rmp = (session_disconnected_reply_msg_t *) app_evt.evt->data;
+ rmp->retval = rv;
+ rmp->handle = session->handle;
+ rmp->context = session->context;
+ app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt);
+
+ return 0;
+ }
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d)\n", session, session->id);
+
+ dmp = vl_msg_api_alloc(sizeof(*dmp));
+ if (dmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(dmp, 0, sizeof(*dmp));
+ dmp->_vl_msg_id = ntohs(VL_API_DISCONNECT_SESSION);
+ dmp->client_index = g_svm.my_client_index;
+ dmp->handle = session->handle;
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&dmp);
+
+ return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE);
+}
+
+static int
+send_unbind_sock(struct spdk_vpp_session *session)
+{
+ vl_api_unbind_sock_t *ump;
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d) request\n", session, session->id);
+
+ ump = vl_msg_api_alloc(sizeof(*ump));
+ if (ump == NULL) {
+ return -ENOMEM;
+ }
+ memset(ump, 0, sizeof(*ump));
+
+ ump->_vl_msg_id = ntohs(VL_API_UNBIND_SOCK);
+ ump->client_index = g_svm.my_client_index;
+ ump->handle = session->handle;
+ ump->context = session->id;
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&ump);
+
+ return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE);
+}
+
+static int
+vpp_session_listen(struct spdk_vpp_session *session)
+{
+ vl_api_bind_sock_t *bmp;
+
+ if (session->is_listen) {
+ /* Already in the listen state */
+ return 0;
+ }
+
+ clib_fifo_resize(session->accept_session_index_fifo, SPDK_VPP_LISTEN_QUEUE_SIZE);
+
+ session->is_server = 1;
+ bmp = vl_msg_api_alloc(sizeof(*bmp));
+ if (bmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(bmp, 0, sizeof(*bmp));
+
+ bmp->_vl_msg_id = ntohs(VL_API_BIND_SOCK);
+ bmp->client_index = g_svm.my_client_index;
+ bmp->context = session->id;
+ bmp->vrf = 0;
+ bmp->is_ip4 = session->app_session.transport.is_ip4;
+ memcpy(bmp->ip, &session->app_session.transport.lcl_ip, sizeof(bmp->ip));
+ bmp->port = session->app_session.transport.lcl_port;
+ bmp->proto = TRANSPORT_PROTO_TCP;
+
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp);
+
+ return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY);
+}
+
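+/* Common constructor for listening and connecting sockets: parse the IP
+ * address, fill in the transport addressing, and issue the matching
+ * bind or connect request to VPP. */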
+static struct spdk_sock *
+vpp_sock_create(const char *ip, int port, enum spdk_vpp_create_type type,
+ struct spdk_sock_opts *opts)
+{
+ struct spdk_vpp_session *session;
+ int rc;
+ uint8_t is_ip4 = 0;
+ ip46_address_t addr_buf;
+
+ if (!g_svm.vpp_initialized || ip == NULL) {
+ return NULL;
+ }
+
+ session = vpp_session_create();
+ if (session == NULL) {
+ SPDK_ERRLOG("vpp_session_create() failed\n");
+ errno = ENOMEM;
+ return NULL;
+ }
+
+ /* Check address family */
+ if (inet_pton(AF_INET, ip, &addr_buf.ip4.as_u8)) {
+ is_ip4 = 1;
+ } else if (inet_pton(AF_INET6, ip, &addr_buf.ip6.as_u8)) {
+ is_ip4 = 0;
+ } else {
+ SPDK_ERRLOG("IP address with invalid format\n");
+ errno = EAFNOSUPPORT;
+ goto err;
+ }
+
+ if (type == SPDK_SOCK_CREATE_LISTEN) {
+ session->app_session.transport.is_ip4 = is_ip4;
+ memcpy(&session->app_session.transport.lcl_ip, &addr_buf, sizeof(addr_buf));
+ session->app_session.transport.lcl_port = htons(port);
+
+ rc = vpp_session_listen(session);
+ if (rc != 0) {
+ errno = -rc;
+ SPDK_ERRLOG("session_listen() failed\n");
+ goto err;
+ }
+ } else if (type == SPDK_SOCK_CREATE_CONNECT) {
+ session->app_session.transport.is_ip4 = is_ip4;
+ memcpy(&session->app_session.transport.rmt_ip, &addr_buf, sizeof(addr_buf));
+ session->app_session.transport.rmt_port = htons(port);
+
+ rc = vpp_session_connect(session);
+ if (rc != 0) {
+ SPDK_ERRLOG("session_connect() failed\n");
+ goto err;
+ }
+ }
+
+ return &session->base;
+
+err:
+ vpp_session_free(session);
+ return NULL;
+}
+
+static struct spdk_sock *
+vpp_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts);
+}
+
+static struct spdk_sock *
+vpp_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts)
+{
+ return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts);
+}
+
+static struct spdk_sock *
+vpp_sock_accept(struct spdk_sock *_sock)
+{
+ struct spdk_vpp_session *listen_session = __vpp_session(_sock);
+ struct spdk_vpp_session *client_session = NULL;
+ u32 client_session_index = ~0;
+ uword elts = 0;
+ app_session_evt_t app_evt;
+ session_accepted_reply_msg_t *rmp;
+
+ assert(listen_session != NULL);
+ assert(g_svm.vpp_initialized);
+
+ if (listen_session->app_session.session_state != VPP_SESSION_STATE_READY) {
+ /* Listen session should be in the listen state */
+ errno = EWOULDBLOCK;
+ return NULL;
+ }
+
+ pthread_mutex_lock(&listen_session->accept_session_lock);
+
+ if (listen_session->accept_session_index_fifo != NULL) {
+ elts = clib_fifo_elts(listen_session->accept_session_index_fifo);
+ }
+
+ if (elts == 0) {
+ /* No client sessions */
+ errno = EAGAIN;
+ pthread_mutex_unlock(&listen_session->accept_session_lock);
+ return NULL;
+ }
+
+ clib_fifo_sub1(listen_session->accept_session_index_fifo,
+ client_session_index);
+
+ pthread_mutex_unlock(&listen_session->accept_session_lock);
+
+ client_session = vpp_session_get(client_session_index);
+ if (client_session == NULL) {
+ SPDK_ERRLOG("client session closed or aborted\n");
+ errno = ECONNABORTED;
+ return NULL;
+ }
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") accepted.\n",
+ client_session, client_session_index);
+
+ /*
+ * Send accept session reply
+ */
+ app_alloc_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt,
+ SESSION_CTRL_EVT_ACCEPTED_REPLY);
+ rmp = (session_accepted_reply_msg_t *) app_evt.evt->data;
+ rmp->handle = client_session->handle;
+ rmp->context = client_session->context;
+ app_send_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt);
+
+ return &client_session->base;
+}
+
+static int
+vpp_sock_close(struct spdk_sock *_sock)
+{
+ struct spdk_vpp_session *session = __vpp_session(_sock);
+
+ assert(session != NULL);
+ assert(g_svm.vpp_initialized);
+
+ if (session->is_listen) {
+ send_unbind_sock(session);
+ } else {
+ vpp_session_disconnect(session);
+ }
+ vpp_session_free(session);
+
+ return 0;
+}
+
+static ssize_t
+vpp_sock_recv(struct spdk_sock *_sock, void *buf, size_t len)
+{
+ struct spdk_vpp_session *session = __vpp_session(_sock);
+ int rc;
+ svm_fifo_t *rx_fifo;
+ uint32_t bytes;
+
+ assert(session != NULL);
+ assert(g_svm.vpp_initialized);
+
+ rx_fifo = session->app_session.rx_fifo;
+
+ bytes = svm_fifo_max_dequeue(rx_fifo);
+ if (bytes > len) {
+ bytes = len;
+ }
+
+ if (bytes == 0) {
+ if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) {
+ /* Socket is disconnected */
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") is disconnected.\n",
+ session, session->id);
+ errno = 0;
+ return 0;
+ }
+ errno = EAGAIN;
+ return -1;
+ }
+
+ rc = app_recv_stream_raw(rx_fifo, buf, bytes, 0, 0);
+ if (rc < 0) {
+ errno = -rc;
+ return rc;
+ }
+
+ return rc;
+}
+
+static ssize_t
+vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ ssize_t total = 0;
+ int i, rc;
+
+ assert(_sock != NULL);
+ assert(g_svm.vpp_initialized);
+
+ for (i = 0; i < iovcnt; ++i) {
+ rc = vpp_sock_recv(_sock, iov[i].iov_base, iov[i].iov_len);
+ if (rc < 0) {
+ if (total > 0) {
+ break;
+ } else {
+ /* errno was already set by vpp_sock_recv() */
+ return -1;
+ }
+ } else {
+ total += rc;
+ if (rc < (ssize_t)iov[i].iov_len) {
+ /* Read less than the buffer provided; no point in continuing. */
+ break;
+ }
+ }
+ }
+ return total;
+}
+
+static ssize_t
+_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ struct spdk_vpp_session *session = __vpp_session(_sock);
+ ssize_t total = 0;
+ int i, rc;
+ svm_fifo_t *tx_fifo;
+ session_evt_type_t et;
+
+ assert(session != NULL);
+ assert(g_svm.vpp_initialized);
+
+ tx_fifo = session->app_session.tx_fifo;
+ et = SESSION_IO_EVT_TX;
+
+ for (i = 0; i < iovcnt; ++i) {
+ if (svm_fifo_is_full(tx_fifo)) {
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+
+ /* We only use stream (TCP) connections for now */
+ rc = app_send_stream_raw(tx_fifo, session->app_session.vpp_evt_q,
+ iov[i].iov_base, iov[i].iov_len, et,
+ 1, SVM_Q_WAIT);
+
+ if (rc < 0) {
+ if (total > 0) {
+ break;
+ } else {
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Buffer overflow\n");
+ errno = EWOULDBLOCK;
+ return -1;
+ }
+ } else {
+ total += rc;
+ if (rc < (ssize_t)iov[i].iov_len) {
+ /* Wrote less than the buffer provided; no point in continuing. */
+ break;
+ }
+ }
+ }
+
+ return total;
+}
+
+static int
+_sock_flush(struct spdk_sock *sock)
+{
+ struct iovec iovs[IOV_BATCH_SIZE];
+ int iovcnt;
+ int retval;
+ struct spdk_sock_request *req;
+ int i;
+ ssize_t rc;
+ unsigned int offset;
+ size_t len;
+
+ /* Can't flush from within a callback or we end up with recursive calls */
+ if (sock->cb_cnt > 0) {
+ return 0;
+ }
+
+ /* Gather an iov */
+ iovcnt = 0;
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Consume any offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset;
+ iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+ iovcnt++;
+
+ offset = 0;
+
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+ }
+
+ if (iovcnt >= IOV_BATCH_SIZE) {
+ break;
+ }
+
+ req = TAILQ_NEXT(req, internal.link);
+ }
+
+ if (iovcnt == 0) {
+ return 0;
+ }
+
+ /* Perform the vectored write */
+ rc = _vpp_sock_writev(sock, iovs, iovcnt);
+ if (rc <= 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return 0;
+ }
+ return rc;
+ }
+
+ /* Consume the requests that were actually written */
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ while (req) {
+ offset = req->internal.offset;
+
+ for (i = 0; i < req->iovcnt; i++) {
+ /* Advance by the offset first */
+ if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) {
+ offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len;
+ continue;
+ }
+
+ /* Calculate the remaining length of this element */
+ len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset;
+
+ if (len > (size_t)rc) {
+ /* This element was partially sent. */
+ req->internal.offset += rc;
+ return 0;
+ }
+
+ offset = 0;
+ req->internal.offset += len;
+ rc -= len;
+ }
+
+ /* Handled a full request. */
+ req->internal.offset = 0;
+ spdk_sock_request_pend(sock, req);
+
+ /* The _vpp_sock_writev above isn't currently asynchronous,
+ * so it's already done. */
+ retval = spdk_sock_request_put(sock, req, 0);
+
+ if (rc == 0 || retval) {
+ break;
+ }
+
+ req = TAILQ_FIRST(&sock->queued_reqs);
+ }
+
+ return 0;
+}
+
+static ssize_t
+vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt)
+{
+ int rc;
+
+ /* In order to process a writev, we need to flush any asynchronous writes
+ * first. */
+ rc = _sock_flush(_sock);
+ if (rc < 0) {
+ return rc;
+ }
+
+ if (!TAILQ_EMPTY(&_sock->queued_reqs)) {
+ /* We weren't able to flush all requests */
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return _vpp_sock_writev(_sock, iov, iovcnt);
+}
+
+static void
+vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req)
+{
+ int rc;
+
+ spdk_sock_request_queue(sock, req);
+
+ if (sock->group_impl == NULL) {
+ spdk_sock_request_put(sock, req, -ENOTSUP);
+ return;
+ }
+
+ /* If there are a sufficient number queued, just flush them out immediately. */
+ if (sock->queued_iovcnt >= IOV_BATCH_SIZE) {
+ rc = _sock_flush(sock);
+ if (rc) {
+ spdk_sock_abort_requests(sock);
+ }
+ }
+}
+
+static int
+vpp_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes)
+{
+ assert(g_svm.vpp_initialized);
+
+ return 0;
+}
+
+static int
+vpp_sock_set_recvbuf(struct spdk_sock *_sock, int sz)
+{
+ assert(g_svm.vpp_initialized);
+
+ return 0;
+}
+
+static int
+vpp_sock_set_sendbuf(struct spdk_sock *_sock, int sz)
+{
+ assert(g_svm.vpp_initialized);
+
+ return 0;
+}
+
+static bool
+vpp_sock_is_ipv6(struct spdk_sock *_sock)
+{
+ return !__vpp_session(_sock)->app_session.transport.is_ip4;
+}
+
+static bool
+vpp_sock_is_ipv4(struct spdk_sock *_sock)
+{
+ return __vpp_session(_sock)->app_session.transport.is_ip4;
+}
+
+static bool
+vpp_sock_is_connected(struct spdk_sock *_sock)
+{
+ assert(g_svm.vpp_initialized);
+
+ return (__vpp_session(_sock)->app_session.session_state == VPP_SESSION_STATE_READY);
+}
+
+static int
+vpp_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id)
+{
+ return -1;
+}
+
+static struct spdk_sock_group_impl *
+vpp_sock_group_impl_create(void)
+{
+ struct spdk_vpp_sock_group_impl *group_impl;
+
+ if (!g_svm.vpp_initialized) {
+ return NULL;
+ }
+
+ group_impl = calloc(1, sizeof(*group_impl));
+ if (group_impl == NULL) {
+ SPDK_ERRLOG("sock_group allocation failed\n");
+ errno = ENOMEM;
+ return NULL;
+ }
+
+ return &group_impl->base;
+}
+
+static int
+vpp_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group,
+ struct spdk_sock *_sock)
+{
+ /* We expect the higher level to handle this for us */
+ return 0;
+}
+
+static int
+vpp_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group,
+ struct spdk_sock *_sock)
+{
+ /* We expect the higher level to handle this for us */
+ return 0;
+}
+
+static bool
+vpp_session_read_ready(struct spdk_vpp_session *session)
+{
+ svm_fifo_t *rx_fifo = NULL;
+ uint32_t ready = 0;
+
+ if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) {
+ /* The session is disconnected: report it as readable so that
+ * the next read fails and the upper layer closes the
+ * connection.
+ */
+ return true;
+ }
+
+ if (session->app_session.session_state == VPP_SESSION_STATE_READY) {
+ rx_fifo = session->app_session.rx_fifo;
+ ready = svm_fifo_max_dequeue(rx_fifo);
+ }
+
+ return ready > 0;
+}
+
+static int
+vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events,
+ struct spdk_sock **socks)
+{
+ int num_events, rc;
+ struct spdk_sock *sock, *tmp;
+ struct spdk_vpp_session *session;
+ struct spdk_vpp_sock_group_impl *group;
+
+ assert(_group != NULL);
+ assert(socks != NULL);
+ assert(g_svm.vpp_initialized);
+
+ group = __vpp_group_impl(_group);
+ num_events = 0;
+
+ /* This must be a TAILQ_FOREACH_SAFE because while flushing,
+ * a completion callback could remove the sock from the
+ * group. */
+ TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) {
+ rc = _sock_flush(sock);
+ if (rc) {
+ spdk_sock_abort_requests(sock);
+ }
+ }
+
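+ /* Resume the scan from where the previous poll stopped so that every
+ * socket gets a fair chance to be reported. */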
+ sock = group->last_sock;
+ if (sock == NULL) {
+ sock = TAILQ_FIRST(&group->base.socks);
+ }
+
+ while (sock != NULL) {
+ session = __vpp_session(sock);
+ if (vpp_session_read_ready(session)) {
+ socks[num_events] = sock;
+ num_events++;
+ if (num_events >= max_events) {
+ sock = TAILQ_NEXT(sock, link);
+ break;
+ }
+ }
+ sock = TAILQ_NEXT(sock, link);
+ }
+ group->last_sock = sock;
+
+ return num_events;
+}
+
+static int
+vpp_sock_group_impl_close(struct spdk_sock_group_impl *_group)
+{
+ free(_group);
+ return 0;
+}
+
+/******************************************************************************
+ * Initialize and attach to the VPP
+ */
+static int
+vpp_app_attach(void)
+{
+ vl_api_application_attach_t *bmp;
+ u32 fifo_size = 16 << 20;
+
+ bmp = vl_msg_api_alloc(sizeof(*bmp));
+ if (bmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(bmp, 0, sizeof(*bmp));
+
+ bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_ATTACH);
+ bmp->client_index = g_svm.my_client_index;
+ bmp->context = ntohl(0xfeedface);
+
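+ /* Advertise that we accept redirected (cut-through) sessions and can
+ * map additional FIFO segments on demand. */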
+ bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT;
+ bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT;
+
+ bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16;
+ bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size;
+ bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size;
+ bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 256 << 20;
+ bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20;
+ bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 256;
+
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp);
+
+ return 0;
+}
+
+static void
+vl_api_session_enable_disable_reply_t_handler(vl_api_session_enable_disable_reply_t *mp)
+{
+ if (mp->retval) {
+ SPDK_ERRLOG("Session enable failed (%d).\n", ntohl(mp->retval));
+ } else {
+ SPDK_NOTICELOG("Session layer enabled\n");
+ g_svm.vpp_state = VPP_STATE_ENABLED;
+ vpp_app_attach();
+ }
+}
+
+static int
+vpp_session_enable(u8 is_enable)
+{
+ vl_api_session_enable_disable_t *bmp;
+
+ bmp = vl_msg_api_alloc(sizeof(*bmp));
+ if (bmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(bmp, 0, sizeof(*bmp));
+
+ bmp->_vl_msg_id = ntohs(VL_API_SESSION_ENABLE_DISABLE);
+ bmp->client_index = g_svm.my_client_index;
+ bmp->context = htonl(0xfeedface);
+ bmp->is_enable = is_enable;
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp);
+
+ return 0;
+}
+
+static void
+vpp_application_attached(void *arg)
+{
+ SPDK_NOTICELOG("VPP net framework initialized.\n");
+ g_svm.vpp_state = VPP_STATE_ATTACHED;
+ g_svm.vpp_initialized = true;
+ g_svm.app_queue_poller = SPDK_POLLER_REGISTER(app_queue_poller, NULL, 100);
+ spdk_net_framework_init_next(0);
+}
+
+static int
+ssvm_segment_attach(char *name, ssvm_segment_type_t type, int fd)
+{
+ svm_fifo_segment_create_args_t a;
+ int rv;
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Attaching segment %s\n", name);
+
+ clib_memset(&a, 0, sizeof(a));
+ a.segment_name = (char *) name;
+ a.segment_type = type;
+
+ assert(type == SSVM_SEGMENT_MEMFD);
+ a.memfd_fd = fd;
+
+ if ((rv = svm_fifo_segment_attach(&g_svm.segment_main, &a))) {
+ SPDK_ERRLOG("Segment '%s' attach failed (%d).\n", name, rv);
+ return rv;
+ }
+
+ vec_reset_length(a.new_segment_indices);
+ return 0;
+}
+
+static void
+vl_api_application_attach_reply_t_handler(vl_api_application_attach_reply_t *mp)
+{
+ u32 n_fds = 0;
+
+ if (mp->retval) {
+ SPDK_ERRLOG("Application attach to VPP failed (%d)\n",
+ ntohl(mp->retval));
+ goto err;
+ }
+
+ if (mp->segment_name_length == 0) {
+ SPDK_ERRLOG("segment_name_length zero\n");
+ goto err;
+ }
+
+ assert(mp->app_event_queue_address);
+ g_svm.app_event_queue = uword_to_pointer(mp->app_event_queue_address, svm_msg_q_t *);
+
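+ /* VPP hands over the shared-memory segments and the event-queue
+ * eventfd as file descriptors passed over the API socket; attach
+ * each one according to its flag. */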
+ if (mp->n_fds) {
+ int fds[mp->n_fds];
+
+ vl_socket_client_recv_fd_msg(fds, mp->n_fds, 5);
+
+ if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) {
+ if (ssvm_segment_attach(0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) {
+ goto err;
+ }
+ }
+
+ if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) {
+ if (ssvm_segment_attach((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[n_fds++])) {
+ goto err;
+ }
+ }
+
+ if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) {
+ svm_msg_q_set_consumer_eventfd(g_svm.app_event_queue, fds[n_fds++]);
+ }
+ }
+
+ spdk_thread_send_msg(g_svm.init_thread, vpp_application_attached, NULL);
+ return;
+err:
+ g_svm.vpp_state = VPP_STATE_FAILED;
+ return;
+}
+
+/* Detach */
+static void
+vpp_application_detached(void *arg)
+{
+ if (!g_svm.vpp_initialized) {
+ return;
+ }
+
+ spdk_poller_unregister(&g_svm.vpp_queue_poller);
+ spdk_poller_unregister(&g_svm.app_queue_poller);
+ spdk_poller_unregister(&g_svm.timeout_poller);
+
+ g_svm.vpp_initialized = false;
+ g_svm.vpp_state = VPP_STATE_START;
+ pthread_mutex_destroy(&g_svm.session_get_lock);
+ vl_socket_client_disconnect();
+
+ SPDK_NOTICELOG("Application detached\n");
+
+ spdk_net_framework_fini_next();
+}
+
+static int
+vpp_application_detached_timeout(void *arg)
+{
+ if (g_svm.vpp_initialized) {
+ /* We need to finish detach on initial thread */
+ spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL);
+ }
+ return SPDK_POLLER_BUSY;
+}
+
+static void
+vl_api_application_detach_reply_t_handler(vl_api_application_detach_reply_t *mp)
+{
+ if (mp->retval) {
+ SPDK_ERRLOG("Application detach from VPP failed (%d).\n", ntohl(mp->retval));
+ g_svm.vpp_state = VPP_STATE_FAILED;
+ }
+
+ /* We need to finish detach on initial thread */
+ spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL);
+}
+
+static int
+vpp_app_detach(void)
+{
+ vl_api_application_detach_t *bmp;
+
+ bmp = vl_msg_api_alloc(sizeof(*bmp));
+ if (bmp == NULL) {
+ return -ENOMEM;
+ }
+ memset(bmp, 0, sizeof(*bmp));
+
+ bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_DETACH);
+ bmp->client_index = g_svm.my_client_index;
+	bmp->context = htonl(0xfeedface);
+ vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp);
+
+ g_svm.timeout_poller = SPDK_POLLER_REGISTER(vpp_application_detached_timeout,
+ NULL, 10000000);
+
+ return 0;
+}
+
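+/*
+ * VPP requests that the client map an additional FIFO segment (e.g. when
+ * the existing ones fill up). For memfd segments the backing fd is received
+ * over the API socket first.
+ */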
+static void
+vl_api_map_another_segment_t_handler(vl_api_map_another_segment_t *mp)
+{
+ ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM;
+ int fd = -1;
+
+ if (mp->fd_flags) {
+ vl_socket_client_recv_fd_msg(&fd, 1, 5);
+ seg_type = SSVM_SEGMENT_MEMFD;
+ }
+
+ if (ssvm_segment_attach((char *) mp->segment_name,
+ seg_type, fd)) {
+ SPDK_ERRLOG("svm_fifo_segment_attach ('%s') failed\n",
+ mp->segment_name);
+ return;
+ }
+
+ SPDK_DEBUGLOG(SPDK_SOCK_VPP, "New segment ('%s') attached\n",
+ mp->segment_name);
+}
+
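+/*
+ * Register a handler for each API reply this module cares about; the `_'
+ * x-macro expands once per (message id, handler name) pair listed below.
+ */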
+static void
+vpp_net_framework_set_handlers(void)
+{
+ /* Set up VPP handlers */
+#define _(N,n) \
+ vl_msg_api_set_handlers(VL_API_##N, #n, \
+ vl_api_##n##_t_handler, \
+ vl_noop_handler, \
+ vl_api_##n##_t_endian, \
+ vl_api_##n##_t_print, \
+ sizeof(vl_api_##n##_t), 1);
+ _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \
+ _(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \
+ _(APPLICATION_ATTACH_REPLY, application_attach_reply) \
+ _(APPLICATION_DETACH_REPLY, application_detach_reply) \
+ _(MAP_ANOTHER_SEGMENT, map_another_segment)
+#undef _
+}
+
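+/*
+ * Net framework entry point: set up the clib heap and the FIFO segment
+ * main, connect to the VPP API socket, initialize the shared-memory API
+ * and request session-layer enablement. Initialization then completes
+ * asynchronously in vpp_application_attached().
+ */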
+static void
+vpp_net_framework_init(void)
+{
+ char *app_name;
+ api_main_t *am = &api_main;
+
+ clib_mem_init_thread_safe(0, SPDK_VPP_CLIB_MEM_SIZE);
+ svm_fifo_segment_main_init(&g_svm.segment_main, SPDK_VPP_SEGMENT_BASEVA,
+ SPDK_VPP_SEGMENT_TIMEOUT);
+
+ app_name = spdk_sprintf_alloc("SPDK_%d", getpid());
+ if (app_name == NULL) {
+		SPDK_ERRLOG("Cannot allocate memory for SPDK app name\n");
+ return;
+ }
+
+ vpp_net_framework_set_handlers();
+
+ if (vl_socket_client_connect((char *) API_SOCKET_FILE, app_name,
+ 0 /* default rx, tx buffer */)) {
+ SPDK_ERRLOG("Client \"%s\" failed to connect to the socket \"%s\".\n",
+ app_name, API_SOCKET_FILE);
+ goto err;
+ }
+
+ if (vl_socket_client_init_shm(0, 0 /* want_pthread */)) {
+ SPDK_ERRLOG("SHM API initialization failed.\n");
+ vl_socket_client_disconnect();
+ goto err;
+ }
+
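+	/*
+	 * The queue names follow VPP's point of view: shmem_hdr->vl_input_queue
+	 * is the queue VPP reads requests from (our send side), while api_main's
+	 * vl_input_queue is the queue this client reads replies from.
+	 */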
+ g_svm.vl_input_queue = am->shmem_hdr->vl_input_queue;
+ g_svm.vl_output_queue = am->vl_input_queue;
+
+ g_svm.my_client_index = am->my_client_index;
+ pthread_mutex_init(&g_svm.session_get_lock, NULL);
+
+ free(app_name);
+
+ g_svm.init_thread = spdk_get_thread();
+	SPDK_NOTICELOG("Enabling VPP session\n");
+
+ g_svm.vpp_queue_poller = SPDK_POLLER_REGISTER(vpp_queue_poller, NULL, 100);
+
+ vpp_session_enable(1);
+
+ return;
+
+err:
+ free(app_name);
+ spdk_net_framework_init_next(0);
+}
+
+/******************************************************************************
+ * Register components
+ */
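+/*
+ * Table of callbacks that plugs the VPP transport into SPDK's abstract sock
+ * layer; it is registered below with a higher priority value than the posix
+ * and uring modules, which appears to give it precedence over them when
+ * compiled in.
+ */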
+static struct spdk_net_impl g_vpp_net_impl = {
+ .name = "vpp",
+ .getaddr = vpp_sock_getaddr,
+ .connect = vpp_sock_connect,
+ .listen = vpp_sock_listen,
+ .accept = vpp_sock_accept,
+ .close = vpp_sock_close,
+ .recv = vpp_sock_recv,
+ .readv = vpp_sock_readv,
+ .writev = vpp_sock_writev,
+ .writev_async = vpp_sock_writev_async,
+ .set_recvlowat = vpp_sock_set_recvlowat,
+ .set_recvbuf = vpp_sock_set_recvbuf,
+ .set_sendbuf = vpp_sock_set_sendbuf,
+ .is_ipv6 = vpp_sock_is_ipv6,
+ .is_ipv4 = vpp_sock_is_ipv4,
+ .is_connected = vpp_sock_is_connected,
+ .get_placement_id = vpp_sock_get_placement_id,
+ .group_impl_create = vpp_sock_group_impl_create,
+ .group_impl_add_sock = vpp_sock_group_impl_add_sock,
+ .group_impl_remove_sock = vpp_sock_group_impl_remove_sock,
+ .group_impl_poll = vpp_sock_group_impl_poll,
+ .group_impl_close = vpp_sock_group_impl_close,
+};
+
+SPDK_NET_IMPL_REGISTER(vpp, &g_vpp_net_impl, DEFAULT_SOCK_PRIORITY + 2);
+
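+/*
+ * Shutdown entry point: start an asynchronous detach if the attach
+ * completed, otherwise continue the fini chain immediately.
+ */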
+static void
+vpp_net_framework_fini(void)
+{
+ if (g_svm.vpp_initialized) {
+ vpp_app_detach();
+ } else {
+ spdk_net_framework_fini_next();
+ }
+}
+
+static struct spdk_net_framework g_vpp_net_framework = {
+ .name = "vpp",
+ .init = vpp_net_framework_init,
+ .fini = vpp_net_framework_fini,
+};
+
+SPDK_NET_FRAMEWORK_REGISTER(vpp, &g_vpp_net_framework);
+
+SPDK_LOG_REGISTER_COMPONENT("sock_vpp", SPDK_SOCK_VPP)
+
+#endif /* __clang_analyzer__ */