diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/spdk/module/sock/vpp | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/spdk/module/sock/vpp/Makefile | 55 | ||||
-rw-r--r-- | src/spdk/module/sock/vpp/vpp.c | 1633 |
2 files changed, 1688 insertions, 0 deletions
diff --git a/src/spdk/module/sock/vpp/Makefile b/src/spdk/module/sock/vpp/Makefile new file mode 100644 index 000000000..016018c77 --- /dev/null +++ b/src/spdk/module/sock/vpp/Makefile @@ -0,0 +1,55 @@ +# +# BSD LICENSE +# +# Copyright (c) Intel Corporation. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../../..) +include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 2 +SO_MINOR := 0 + +C_SRCS += vpp.c +CFLAGS += -Wno-sign-compare -Wno-error=old-style-definition +CFLAGS += -Wno-error=strict-prototypes -Wno-error=ignored-qualifiers + +GCC_VERSION=$(shell $(CC) -dumpversion | cut -d. -f1) + +# disable packed member unalign warnings +ifeq ($(shell test $(GCC_VERSION) -ge 9 && echo 1), 1) +CFLAGS += -Wno-error=address-of-packed-member +endif + +LIBNAME = sock_vpp + +SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/src/spdk/module/sock/vpp/vpp.c b/src/spdk/module/sock/vpp/vpp.c new file mode 100644 index 000000000..89a92e9d1 --- /dev/null +++ b/src/spdk/module/sock/vpp/vpp.c @@ -0,0 +1,1633 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* Omit from static analysis. */ +#ifndef __clang_analyzer__ + +#include "spdk/stdinc.h" + +#include "spdk/log.h" +#include "spdk/sock.h" +#include "spdk/net.h" +#include "spdk/string.h" +#include "spdk_internal/sock.h" +#include "spdk/queue.h" +#include "spdk/event.h" +#include "spdk/thread.h" +#include "spdk_internal/log.h" + +/* _GNU_SOURCE is redefined in the vpp headers with no protection (dlmalloc.h) */ +#undef _GNU_SOURCE + +#include <svm/svm_fifo_segment.h> +#include <vlibmemory/api.h> +#include <vpp/api/vpe_msg_enum.h> +#include <vnet/session/application_interface.h> + +#define vl_typedefs /* define message structures */ +#include <vpp/api/vpe_all_api_h.h> +#undef vl_typedefs + +/* declare message handlers for each api */ + +#define vl_endianfun /* define message structures */ +#include <vpp/api/vpe_all_api_h.h> +#undef vl_endianfun + +/* instantiate all the print functions we know about */ +#define vl_print(handle, ...) +#define vl_printfun +#include <vpp/api/vpe_all_api_h.h> +#undef vl_printfun + +#define SPDK_VPP_CLIB_MEM_SIZE 256 << 20 +#define SPDK_VPP_SESSIONS_MAX 2048 +#define SPDK_VPP_LISTEN_QUEUE_SIZE SPDK_VPP_SESSIONS_MAX +#define SPDK_VPP_SEGMENT_BASEVA 0x200000000ULL +#define SPDK_VPP_SEGMENT_TIMEOUT 20 +#define IOV_BATCH_SIZE 64 + +/* VPP connection state */ +enum spdk_vpp_state { + VPP_STATE_START, + VPP_STATE_ENABLED, + VPP_STATE_ATTACHED, + VPP_STATE_READY, + VPP_STATE_DISCONNECTING, + VPP_STATE_FAILED +}; + +/* VPP session state */ +enum spdk_vpp_session_state { + VPP_SESSION_STATE_UNUSED = 0, + VPP_SESSION_STATE_INIT, /* Initial state */ + VPP_SESSION_STATE_READY, /* Ready for processing */ + VPP_SESSION_STATE_DISCONNECT, + VPP_SESSION_STATE_CLOSE, + VPP_SESSION_STATE_FAILED +}; + +struct spdk_vpp_session { + struct spdk_sock base; + + /* VPP app session */ + app_session_t app_session; + + uint32_t id; + + bool is_server; /* Server side session */ + bool is_listen; /* Session is listener */ + + uint64_t handle; + uint32_t context; + + /* Listener fields */ + pthread_mutex_t accept_session_lock; + uint32_t *accept_session_index_fifo; +}; + +static struct spdk_vpp_main { + int my_client_index; + enum spdk_vpp_state vpp_state; + bool vpp_initialized; + struct spdk_thread *init_thread; + + svm_fifo_segment_main_t segment_main; + svm_queue_t *vl_input_queue; + svm_queue_t *vl_output_queue; + svm_msg_q_t *app_event_queue; + + struct spdk_vpp_session sessions[SPDK_VPP_SESSIONS_MAX]; + pthread_mutex_t session_get_lock; + + struct spdk_poller *vpp_queue_poller; + struct spdk_poller *app_queue_poller; + struct spdk_poller *timeout_poller; +} g_svm; + +struct spdk_vpp_sock_group_impl { + struct spdk_sock_group_impl base; + struct spdk_sock *last_sock; +}; + +#define __vpp_session(sock) ((struct spdk_vpp_session *)sock) +#define __vpp_group_impl(group) ((struct spdk_vpp_sock_group_impl *)group) + +/****************************************************************************** + * Session management + */ +static struct spdk_vpp_session * +vpp_session_create(void) +{ + struct spdk_vpp_session *session; + int i; + + pthread_mutex_lock(&g_svm.session_get_lock); + for (i = 0; i < SPDK_VPP_SESSIONS_MAX && + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED; i++) { + /* Empty loop body */ + } + if (i == SPDK_VPP_SESSIONS_MAX || + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED) { + SPDK_ERRLOG("Cannot allocate space for new session\n"); + pthread_mutex_unlock(&g_svm.session_get_lock); + return NULL; + } + session = &g_svm.sessions[i]; + memset(session, 0, sizeof(struct spdk_vpp_session)); + pthread_mutex_init(&session->accept_session_lock, NULL); + + session->id = i; + session->app_session.session_state = VPP_SESSION_STATE_INIT; + + pthread_mutex_unlock(&g_svm.session_get_lock); + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Creating new session %p (%d)\n", + session, session->id); + + return session; +} + +static struct spdk_vpp_session * +vpp_session_get(uint32_t id) +{ + struct spdk_vpp_session *session = NULL; + + if (id >= SPDK_VPP_SESSIONS_MAX) { + return NULL; + } + + pthread_mutex_lock(&g_svm.session_get_lock); + if (g_svm.sessions[id].app_session.session_state != VPP_SESSION_STATE_UNUSED) { + session = &g_svm.sessions[id]; + } + pthread_mutex_unlock(&g_svm.session_get_lock); + + return session; +} + +static struct spdk_vpp_session * +vpp_session_get_by_handle(uint64_t handle, bool is_listen) +{ + struct spdk_vpp_session *session = NULL; + int i; + + for (i = 0; i < SPDK_VPP_SESSIONS_MAX; i++) { + if (g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_UNUSED && + g_svm.sessions[i].app_session.session_state != VPP_SESSION_STATE_DISCONNECT && + g_svm.sessions[i].handle == handle && + g_svm.sessions[i].is_listen == is_listen) { + session = &g_svm.sessions[i]; + break; + } + } + + return session; +} + +static int +vpp_session_free(struct spdk_vpp_session *session) +{ + /* Remove session */ + if (session == NULL) { + SPDK_ERRLOG("Wrong session\n"); + return -EINVAL; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Free session %p (%d)\n", session, session->id); + + pthread_mutex_lock(&g_svm.session_get_lock); + session->app_session.session_state = VPP_SESSION_STATE_UNUSED; + pthread_mutex_destroy(&session->accept_session_lock); + pthread_mutex_unlock(&g_svm.session_get_lock); + + return 0; +} + +static int +vpp_sock_getaddr(struct spdk_sock *_sock, char *saddr, int slen, uint16_t *sport, + char *caddr, int clen, uint16_t *cport) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + const char *result = NULL; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + if (session->app_session.transport.is_ip4) { + result = inet_ntop(AF_INET, &session->app_session.transport.lcl_ip.ip4.as_u8, + saddr, slen); + } else { + result = inet_ntop(AF_INET6, &session->app_session.transport.lcl_ip.ip6.as_u8, + saddr, slen); + } + if (result == NULL) { + return -1; + } + + if (sport) { + *sport = ntohs(session->app_session.transport.lcl_port); + } + + if (session->app_session.transport.is_ip4) { + result = inet_ntop(AF_INET, &session->app_session.transport.rmt_ip.ip4.as_u8, + caddr, clen); + } else { + result = inet_ntop(AF_INET6, &session->app_session.transport.rmt_ip.ip6.as_u8, + caddr, clen); + } + if (result == NULL) { + return -1; + } + + if (cport) { + *cport = ntohs(session->app_session.transport.rmt_port); + } + + return 0; +} + +enum spdk_vpp_create_type { + SPDK_SOCK_CREATE_LISTEN, + SPDK_SOCK_CREATE_CONNECT, +}; + +/****************************************************************************** + * VPP message handlers + */ +static void +session_accepted_handler(session_accepted_msg_t *mp) +{ + svm_fifo_t *rx_fifo, *tx_fifo; + struct spdk_vpp_session *client_session, *listen_session; + + pthread_mutex_lock(&g_svm.session_get_lock); + listen_session = vpp_session_get_by_handle(mp->listener_handle, true); + pthread_mutex_unlock(&g_svm.session_get_lock); + if (!listen_session) { + SPDK_ERRLOG("Listener not found\n"); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Listeners handle is %" PRIu64 "\n", mp->listener_handle); + + /* Allocate local session for a client and set it up */ + client_session = vpp_session_create(); + if (client_session == NULL) { + SPDK_ERRLOG("Cannot create new session\n"); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Accept session %p (%d) on %p (%d/%" PRIu64 ")\n", + client_session, client_session->id, listen_session, listen_session->id, + listen_session->handle); + + rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = client_session->id; + tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = client_session->id; + + client_session->handle = mp->handle; + client_session->context = mp->context; + client_session->app_session.rx_fifo = rx_fifo; + client_session->app_session.tx_fifo = tx_fifo; + client_session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address, + svm_msg_q_t *); + + client_session->is_server = true; + client_session->app_session.transport.rmt_port = mp->port; + client_session->app_session.transport.is_ip4 = mp->is_ip4; + memcpy(&client_session->app_session.transport.rmt_ip, mp->ip, sizeof(mp->ip)); + + client_session->app_session.transport.lcl_port = listen_session->app_session.transport.lcl_port; + memcpy(&client_session->app_session.transport.lcl_ip, &listen_session->app_session.transport.lcl_ip, + sizeof(listen_session->app_session.transport.lcl_ip)); + client_session->app_session.transport.is_ip4 = listen_session->app_session.transport.is_ip4; + + client_session->app_session.session_state = VPP_SESSION_STATE_READY; + + pthread_mutex_lock(&listen_session->accept_session_lock); + + clib_fifo_add1(listen_session->accept_session_index_fifo, + client_session->id); + + pthread_mutex_unlock(&listen_session->accept_session_lock); +} + +static void +session_connected_handler(session_connected_msg_t *mp) +{ + struct spdk_vpp_session *session; + svm_fifo_t *rx_fifo, *tx_fifo; + + session = vpp_session_get(mp->context); + if (session == NULL) { + return; + } + + if (mp->retval) { + SPDK_ERRLOG("Connection failed (%d).\n", ntohl(mp->retval)); + session->app_session.session_state = VPP_SESSION_STATE_FAILED; + return; + } + + session->app_session.vpp_evt_q = uword_to_pointer(mp->vpp_event_queue_address, + svm_msg_q_t *); + + rx_fifo = uword_to_pointer(mp->server_rx_fifo, svm_fifo_t *); + rx_fifo->client_session_index = session->id; + tx_fifo = uword_to_pointer(mp->server_tx_fifo, svm_fifo_t *); + tx_fifo->client_session_index = session->id; + + session->app_session.rx_fifo = rx_fifo; + session->app_session.tx_fifo = tx_fifo; + session->handle = mp->handle; + + /* Set lcl addr */ + session->app_session.transport.is_ip4 = mp->is_ip4; + memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip)); + session->app_session.transport.lcl_port = mp->lcl_port; + + session->app_session.session_state = VPP_SESSION_STATE_READY; +} + +static void +session_disconnected_handler(session_disconnected_msg_t *mp) +{ + struct spdk_vpp_session *session = 0; + + pthread_mutex_lock(&g_svm.session_get_lock); + session = vpp_session_get_by_handle(mp->handle, false); + if (session == NULL) { + SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n", + mp->handle); + pthread_mutex_unlock(&g_svm.session_get_lock); + return; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d) handler\n", session, session->id); + + /* We need to postpone session deletion to inform upper layer */ + session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT; + pthread_mutex_unlock(&g_svm.session_get_lock); +} + +static void +session_reset_handler(session_reset_msg_t *mp) +{ + int rv = 0; + struct spdk_vpp_session *session = NULL; + app_session_evt_t app_evt; + session_reset_reply_msg_t *rmp; + + pthread_mutex_lock(&g_svm.session_get_lock); + session = vpp_session_get_by_handle(mp->handle, false); + if (session == NULL) { + SPDK_ERRLOG("Session with handle=%" PRIu64 " not found.\n", + mp->handle); + pthread_mutex_unlock(&g_svm.session_get_lock); + return; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Reset session %p (%d) handler\n", session, session->id); + + session->app_session.session_state = VPP_SESSION_STATE_DISCONNECT; + pthread_mutex_unlock(&g_svm.session_get_lock); + + app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt, + SESSION_CTRL_EVT_RESET_REPLY); + rmp = (session_reset_reply_msg_t *) app_evt.evt->data; + rmp->retval = rv; + rmp->handle = mp->handle; + app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt); +} + +static void +session_bound_handler(session_bound_msg_t *mp) +{ + struct spdk_vpp_session *session; + + /* Context should be set to the session index */ + session = vpp_session_get(mp->context); + + if (mp->retval) { + SPDK_ERRLOG("Bind failed (%d).\n", ntohl(mp->retval)); + session->app_session.session_state = VPP_SESSION_STATE_FAILED; + return; + } + + /* Set local address */ + session->app_session.transport.is_ip4 = mp->lcl_is_ip4; + memcpy(&session->app_session.transport.lcl_ip, mp->lcl_ip, sizeof(mp->lcl_ip)); + session->app_session.transport.lcl_port = mp->lcl_port; + + /* Register listener */ + session->handle = mp->handle; + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Bind session %p (%d/%" PRIu64 ")\n", + session, session->id, session->handle); + + /* Session binded, set listen state */ + session->is_listen = true; + session->app_session.session_state = VPP_SESSION_STATE_READY; +} + +static void +session_unlisten_reply_handler(session_unlisten_reply_msg_t *mp) +{ + struct spdk_vpp_session *session; + + if (mp->retval != 0) { + SPDK_ERRLOG("Cannot unbind socket\n"); + return; + } + + session = vpp_session_get(mp->context); + if (session == NULL) { + SPDK_ERRLOG("Cannot find a session by context\n"); + return; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d)\n", session, session->id); + + session->app_session.session_state = VPP_SESSION_STATE_CLOSE; +} + +static void +handle_mq_event(session_event_t *e) +{ + switch (e->event_type) { + case SESSION_CTRL_EVT_BOUND: + session_bound_handler((session_bound_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_ACCEPTED: + session_accepted_handler((session_accepted_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_CONNECTED: + session_connected_handler((session_connected_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_DISCONNECTED: + session_disconnected_handler((session_disconnected_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_RESET: + session_reset_handler((session_reset_msg_t *) e->data); + break; + case SESSION_CTRL_EVT_UNLISTEN_REPLY: + session_unlisten_reply_handler((session_unlisten_reply_msg_t *) e->data); + break; + default: + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unhandled event %u\n", e->event_type); + } +} + +static int +vpp_queue_poller(void *ctx) +{ + uword msg; + + if (g_svm.vl_output_queue->cursize > 0 && + !svm_queue_sub_raw(g_svm.vl_output_queue, (u8 *)&msg)) { + vl_msg_api_handler((void *)msg); + } + + return SPDK_POLLER_BUSY; +} + +static int +app_queue_poller(void *ctx) +{ + session_event_t *e; + svm_msg_q_msg_t msg; + + if (!svm_msg_q_is_empty(g_svm.app_event_queue)) { + svm_msg_q_sub(g_svm.app_event_queue, &msg, SVM_Q_WAIT, 0); + e = svm_msg_q_msg_data(g_svm.app_event_queue, &msg); + handle_mq_event(e); + svm_msg_q_free_msg(g_svm.app_event_queue, &msg); + } + return SPDK_POLLER_BUSY; +} + +/* This is required until sock.c API changes to asynchronous */ +static int +_wait_for_session_state_change(struct spdk_vpp_session *session, enum spdk_vpp_session_state state) +{ + time_t start = time(NULL); + while (time(NULL) - start < 10) { + if (session->app_session.session_state == VPP_SESSION_STATE_FAILED) { + errno = EADDRNOTAVAIL; + return -1; + } + if (session->app_session.session_state == state) { + errno = 0; + return 0; + } + if (spdk_get_thread() == g_svm.init_thread) { + usleep(100000); + app_queue_poller(NULL); + vpp_queue_poller(NULL); + } + } + /* timeout */ + errno = ETIMEDOUT; + return -1; +} + +static int +vpp_session_connect(struct spdk_vpp_session *session) +{ + vl_api_connect_sock_t *cmp; + + cmp = vl_msg_api_alloc(sizeof(*cmp)); + if (cmp == NULL) { + return -ENOMEM; + } + memset(cmp, 0, sizeof(*cmp)); + + cmp->_vl_msg_id = ntohs(VL_API_CONNECT_SOCK); + cmp->client_index = g_svm.my_client_index; + cmp->context = session->id; + + cmp->vrf = 0 /* VPPCOM_VRF_DEFAULT */; + cmp->is_ip4 = (session->app_session.transport.is_ip4); + memcpy(cmp->ip, &session->app_session.transport.rmt_ip, sizeof(cmp->ip)); + cmp->port = session->app_session.transport.rmt_port; + cmp->proto = TRANSPORT_PROTO_TCP; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&cmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY); +} + +static void +vl_api_disconnect_session_reply_t_handler(vl_api_disconnect_session_reply_t *mp) +{ + struct spdk_vpp_session *session; + + if (mp->retval) { + SPDK_ERRLOG("Disconnecting session failed (%d).\n", ntohl(mp->retval)); + return; + } + + pthread_mutex_lock(&g_svm.session_get_lock); + session = vpp_session_get_by_handle(mp->handle, false); + if (session == NULL) { + SPDK_ERRLOG("Invalid session handler (%" PRIu64 ").\n", mp->handle); + pthread_mutex_unlock(&g_svm.session_get_lock); + return; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session disconnected %p (%d)\n", session, session->id); + session->app_session.session_state = VPP_SESSION_STATE_CLOSE; + pthread_mutex_unlock(&g_svm.session_get_lock); +} + +static int +vpp_session_disconnect(struct spdk_vpp_session *session) +{ + int rv = 0; + vl_api_disconnect_session_t *dmp; + session_disconnected_reply_msg_t *rmp; + app_session_evt_t app_evt; + + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Session is already in disconnecting state %p (%d)\n", + session, session->id); + + app_alloc_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt, + SESSION_CTRL_EVT_DISCONNECTED_REPLY); + rmp = (session_disconnected_reply_msg_t *) app_evt.evt->data; + rmp->retval = rv; + rmp->handle = session->handle; + rmp->context = session->context; + app_send_ctrl_evt_to_vpp(session->app_session.vpp_evt_q, &app_evt); + + return 0; + } + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Disconnect session %p (%d)\n", session, session->id); + + dmp = vl_msg_api_alloc(sizeof(*dmp)); + if (dmp == NULL) { + return -ENOMEM; + } + memset(dmp, 0, sizeof(*dmp)); + dmp->_vl_msg_id = ntohs(VL_API_DISCONNECT_SESSION); + dmp->client_index = g_svm.my_client_index; + dmp->handle = session->handle; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&dmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE); +} + +static int +send_unbind_sock(struct spdk_vpp_session *session) +{ + vl_api_unbind_sock_t *ump; + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Unbind session %p (%d) request\n", session, session->id); + + ump = vl_msg_api_alloc(sizeof(*ump)); + if (ump == NULL) { + return -ENOMEM; + } + memset(ump, 0, sizeof(*ump)); + + ump->_vl_msg_id = ntohs(VL_API_UNBIND_SOCK); + ump->client_index = g_svm.my_client_index; + ump->handle = session->handle; + ump->context = session->id; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&ump); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_CLOSE); +} + +static int +vpp_session_listen(struct spdk_vpp_session *session) +{ + vl_api_bind_sock_t *bmp; + + if (session->is_listen) { + /* Already in the listen state */ + return 0; + } + + clib_fifo_resize(session->accept_session_index_fifo, SPDK_VPP_LISTEN_QUEUE_SIZE); + + session->is_server = 1; + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_BIND_SOCK); + bmp->client_index = g_svm.my_client_index; + bmp->context = session->id; + bmp->vrf = 0; + bmp->is_ip4 = session->app_session.transport.is_ip4; + memcpy(bmp->ip, &session->app_session.transport.lcl_ip, sizeof(bmp->ip)); + bmp->port = session->app_session.transport.lcl_port; + bmp->proto = TRANSPORT_PROTO_TCP; + + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return _wait_for_session_state_change(session, VPP_SESSION_STATE_READY); +} + +static struct spdk_sock * +vpp_sock_create(const char *ip, int port, enum spdk_vpp_create_type type, + struct spdk_sock_opts *opts) +{ + struct spdk_vpp_session *session; + int rc; + uint8_t is_ip4 = 0; + ip46_address_t addr_buf; + + if (!g_svm.vpp_initialized || ip == NULL) { + return NULL; + } + + session = vpp_session_create(); + if (session == NULL) { + SPDK_ERRLOG("vpp_session_create() failed\n"); + errno = ENOMEM; + return NULL; + } + + /* Check address family */ + if (inet_pton(AF_INET, ip, &addr_buf.ip4.as_u8)) { + is_ip4 = 1; + } else if (inet_pton(AF_INET6, ip, &addr_buf.ip6.as_u8)) { + is_ip4 = 0; + } else { + SPDK_ERRLOG("IP address with invalid format\n"); + errno = EAFNOSUPPORT; + goto err; + } + + if (type == SPDK_SOCK_CREATE_LISTEN) { + session->app_session.transport.is_ip4 = is_ip4; + memcpy(&session->app_session.transport.lcl_ip, &addr_buf, sizeof(addr_buf)); + session->app_session.transport.lcl_port = htons(port); + + rc = vpp_session_listen(session); + if (rc != 0) { + errno = -rc; + SPDK_ERRLOG("session_listen() failed\n"); + goto err; + } + } else if (type == SPDK_SOCK_CREATE_CONNECT) { + session->app_session.transport.is_ip4 = is_ip4; + memcpy(&session->app_session.transport.rmt_ip, &addr_buf, sizeof(addr_buf)); + session->app_session.transport.rmt_port = htons(port); + + rc = vpp_session_connect(session); + if (rc != 0) { + SPDK_ERRLOG("session_connect() failed\n"); + goto err; + } + } + + return &session->base; + +err: + vpp_session_free(session); + return NULL; +} + +static struct spdk_sock * +vpp_sock_listen(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_LISTEN, opts); +} + +static struct spdk_sock * +vpp_sock_connect(const char *ip, int port, struct spdk_sock_opts *opts) +{ + return vpp_sock_create(ip, port, SPDK_SOCK_CREATE_CONNECT, opts); +} + +static struct spdk_sock * +vpp_sock_accept(struct spdk_sock *_sock) +{ + struct spdk_vpp_session *listen_session = __vpp_session(_sock); + struct spdk_vpp_session *client_session = NULL; + u32 client_session_index = ~0; + uword elts = 0; + app_session_evt_t app_evt; + session_accepted_reply_msg_t *rmp; + + assert(listen_session != NULL); + assert(g_svm.vpp_initialized); + + if (listen_session->app_session.session_state != VPP_SESSION_STATE_READY) { + /* Listen session should be in the listen state */ + errno = EWOULDBLOCK; + return NULL; + } + + pthread_mutex_lock(&listen_session->accept_session_lock); + + if (listen_session->accept_session_index_fifo != NULL) { + elts = clib_fifo_elts(listen_session->accept_session_index_fifo); + } + + if (elts == 0) { + /* No client sessions */ + errno = EAGAIN; + pthread_mutex_unlock(&listen_session->accept_session_lock); + return NULL; + } + + clib_fifo_sub1(listen_session->accept_session_index_fifo, + client_session_index); + + pthread_mutex_unlock(&listen_session->accept_session_lock); + + client_session = vpp_session_get(client_session_index); + if (client_session == NULL) { + SPDK_ERRLOG("client session closed or aborted\n"); + errno = ECONNABORTED; + return NULL; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") accepted.\n", + client_session, client_session_index); + + /* + * Send accept session reply + */ + app_alloc_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt, + SESSION_CTRL_EVT_ACCEPTED_REPLY); + rmp = (session_accepted_reply_msg_t *) app_evt.evt->data; + rmp->handle = client_session->handle; + rmp->context = client_session->context; + app_send_ctrl_evt_to_vpp(client_session->app_session.vpp_evt_q, &app_evt); + + return &client_session->base; +} + +static int +vpp_sock_close(struct spdk_sock *_sock) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + if (session->is_listen) { + send_unbind_sock(session); + } else { + vpp_session_disconnect(session); + } + vpp_session_free(session); + + return 0; +} + +static ssize_t +vpp_sock_recv(struct spdk_sock *_sock, void *buf, size_t len) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + int rc; + svm_fifo_t *rx_fifo; + uint32_t bytes; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + rx_fifo = session->app_session.rx_fifo; + + bytes = svm_fifo_max_dequeue(session->app_session.rx_fifo); + if (bytes > (ssize_t)len) { + bytes = len; + } + + if (bytes == 0) { + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + /* Socket is disconnected */ + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Client %p(%" PRIu32 ") is disconnected.\n", + session, session->id); + errno = 0; + return 0; + } + errno = EAGAIN; + return -1; + } + + rc = app_recv_stream_raw(rx_fifo, buf, bytes, 0, 0); + if (rc < 0) { + errno = -rc; + return rc; + } + + return rc; +} + +static ssize_t +vpp_sock_readv(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + ssize_t total = 0; + int i, rc; + + assert(_sock != NULL); + assert(g_svm.vpp_initialized); + + for (i = 0; i < iovcnt; ++i) { + rc = vpp_sock_recv(_sock, iov[i].iov_base, iov[i].iov_len); + if (rc < 0) { + if (total > 0) { + break; + } else { + errno = -rc; + return -1; + } + } else { + total += rc; + if (rc < (ssize_t)iov[i].iov_len) { + /* Read less than buffer provided, no point to continue. */ + break; + } + } + } + return total; +} + +static ssize_t +_vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + struct spdk_vpp_session *session = __vpp_session(_sock); + ssize_t total = 0; + int i, rc; + svm_fifo_t *tx_fifo; + session_evt_type_t et; + + assert(session != NULL); + assert(g_svm.vpp_initialized); + + tx_fifo = session->app_session.tx_fifo; + et = SESSION_IO_EVT_TX; + + for (i = 0; i < iovcnt; ++i) { + if (svm_fifo_is_full(tx_fifo)) { + errno = EWOULDBLOCK; + return -1; + } + + /* We use only stream connection for now */ + rc = app_send_stream_raw(tx_fifo, session->app_session.vpp_evt_q, + iov[i].iov_base, iov[i].iov_len, et, + 1, SVM_Q_WAIT); + + if (rc < 0) { + if (total > 0) { + break; + } else { + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Buffer overflow\n"); + errno = EWOULDBLOCK; + return -1; + } + } else { + total += rc; + if (rc < (ssize_t)iov[i].iov_len) { + /* Write less than buffer provided, no point to continue. */ + break; + } + } + } + + return total; +} + +static int +_sock_flush(struct spdk_sock *sock) +{ + struct iovec iovs[IOV_BATCH_SIZE]; + int iovcnt; + int retval; + struct spdk_sock_request *req; + int i; + ssize_t rc; + unsigned int offset; + size_t len; + + /* Can't flush from within a callback or we end up with recursive calls */ + if (sock->cb_cnt > 0) { + return 0; + } + + /* Gather an iov */ + iovcnt = 0; + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Consume any offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + iovs[iovcnt].iov_base = SPDK_SOCK_REQUEST_IOV(req, i)->iov_base + offset; + iovs[iovcnt].iov_len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + iovcnt++; + + offset = 0; + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + } + + if (iovcnt >= IOV_BATCH_SIZE) { + break; + } + + req = TAILQ_NEXT(req, internal.link); + } + + if (iovcnt == 0) { + return 0; + } + + /* Perform the vectored write */ + rc = _vpp_sock_writev(sock, iovs, iovcnt); + if (rc <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; + } + return rc; + } + + /* Consume the requests that were actually written */ + req = TAILQ_FIRST(&sock->queued_reqs); + while (req) { + offset = req->internal.offset; + + for (i = 0; i < req->iovcnt; i++) { + /* Advance by the offset first */ + if (offset >= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len) { + offset -= SPDK_SOCK_REQUEST_IOV(req, i)->iov_len; + continue; + } + + /* Calculate the remaining length of this element */ + len = SPDK_SOCK_REQUEST_IOV(req, i)->iov_len - offset; + + if (len > (size_t)rc) { + /* This element was partially sent. */ + req->internal.offset += rc; + return 0; + } + + offset = 0; + req->internal.offset += len; + rc -= len; + } + + /* Handled a full request. */ + req->internal.offset = 0; + spdk_sock_request_pend(sock, req); + + /* The _vpp_sock_writev above isn't currently asynchronous, + * so it's already done. */ + retval = spdk_sock_request_put(sock, req, 0); + + if (rc == 0 || retval) { + break; + } + + req = TAILQ_FIRST(&sock->queued_reqs); + } + + return 0; +} + +static ssize_t +vpp_sock_writev(struct spdk_sock *_sock, struct iovec *iov, int iovcnt) +{ + int rc; + + /* In order to process a writev, we need to flush any asynchronous writes + * first. */ + rc = _sock_flush(_sock); + if (rc < 0) { + return rc; + } + + if (!TAILQ_EMPTY(&_sock->queued_reqs)) { + /* We weren't able to flush all requests */ + errno = EAGAIN; + return -1; + } + + return _vpp_sock_writev(_sock, iov, iovcnt); +} + +static void +vpp_sock_writev_async(struct spdk_sock *sock, struct spdk_sock_request *req) +{ + int rc; + + spdk_sock_request_queue(sock, req); + + if (sock->group_impl == NULL) { + spdk_sock_request_put(sock, req, -ENOTSUP); + return; + } + + /* If there are a sufficient number queued, just flush them out immediately. */ + if (sock->queued_iovcnt >= IOV_BATCH_SIZE) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } +} + +static int +vpp_sock_set_recvlowat(struct spdk_sock *_sock, int nbytes) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static int +vpp_sock_set_recvbuf(struct spdk_sock *_sock, int sz) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static int +vpp_sock_set_sendbuf(struct spdk_sock *_sock, int sz) +{ + assert(g_svm.vpp_initialized); + + return 0; +} + +static bool +vpp_sock_is_ipv6(struct spdk_sock *_sock) +{ + return !__vpp_session(_sock)->app_session.transport.is_ip4; +} + +static bool +vpp_sock_is_ipv4(struct spdk_sock *_sock) +{ + return __vpp_session(_sock)->app_session.transport.is_ip4; +} + +static bool +vpp_sock_is_connected(struct spdk_sock *_sock) +{ + assert(g_svm.vpp_initialized); + + return (__vpp_session(_sock)->app_session.session_state == VPP_SESSION_STATE_READY); +} + +static int +vpp_sock_get_placement_id(struct spdk_sock *_sock, int *placement_id) +{ + return -1; +} + +static struct spdk_sock_group_impl * +vpp_sock_group_impl_create(void) +{ + struct spdk_vpp_sock_group_impl *group_impl; + + if (!g_svm.vpp_initialized) { + return NULL; + } + + group_impl = calloc(1, sizeof(*group_impl)); + if (group_impl == NULL) { + SPDK_ERRLOG("sock_group allocation failed\n"); + errno = ENOMEM; + return NULL; + } + + return &group_impl->base; +} + +static int +vpp_sock_group_impl_add_sock(struct spdk_sock_group_impl *_group, + struct spdk_sock *_sock) +{ + /* We expect that higher level do it for us */ + return 0; +} + +static int +vpp_sock_group_impl_remove_sock(struct spdk_sock_group_impl *_group, + struct spdk_sock *_sock) +{ + /* We expect that higher level do it for us */ + return 0; +} + +static bool +vpp_session_read_ready(struct spdk_vpp_session *session) +{ + svm_fifo_t *rx_fifo = NULL; + uint32_t ready = 0; + + if (session->app_session.session_state == VPP_SESSION_STATE_DISCONNECT) { + /* If session not found force reading to close it. + * NOTE: We're expecting here that upper layer will close + * connection when next read fails. + */ + return true; + } + + if (session->app_session.session_state == VPP_SESSION_STATE_READY) { + rx_fifo = session->app_session.rx_fifo; + ready = svm_fifo_max_dequeue(rx_fifo); + } + + return ready > 0; +} + +static int +vpp_sock_group_impl_poll(struct spdk_sock_group_impl *_group, int max_events, + struct spdk_sock **socks) +{ + int num_events, rc; + struct spdk_sock *sock, *tmp; + struct spdk_vpp_session *session; + struct spdk_vpp_sock_group_impl *group; + + assert(_group != NULL); + assert(socks != NULL); + assert(g_svm.vpp_initialized); + + group = __vpp_group_impl(_group); + num_events = 0; + + /* This must be a TAILQ_FOREACH_SAFE because while flushing, + * a completion callback could remove the sock from the + * group. */ + TAILQ_FOREACH_SAFE(sock, &_group->socks, link, tmp) { + rc = _sock_flush(sock); + if (rc) { + spdk_sock_abort_requests(sock); + } + } + + sock = group->last_sock; + if (sock == NULL) { + sock = TAILQ_FIRST(&group->base.socks); + } + + while (sock != NULL) { + session = __vpp_session(sock); + if (vpp_session_read_ready(session)) { + socks[num_events] = sock; + num_events++; + if (num_events >= max_events) { + sock = TAILQ_NEXT(sock, link); + break; + } + } + sock = TAILQ_NEXT(sock, link); + } + group->last_sock = sock; + + return num_events; +} + +static int +vpp_sock_group_impl_close(struct spdk_sock_group_impl *_group) +{ + free(_group); + return 0; +} + +/****************************************************************************** + * Initialize and attach to the VPP + */ +static int +vpp_app_attach(void) +{ + vl_api_application_attach_t *bmp; + u32 fifo_size = 16 << 20; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_ATTACH); + bmp->client_index = g_svm.my_client_index; + bmp->context = ntohl(0xfeedface); + + bmp->options[APP_OPTIONS_FLAGS] = APP_OPTIONS_FLAGS_ACCEPT_REDIRECT; + bmp->options[APP_OPTIONS_FLAGS] |= APP_OPTIONS_FLAGS_ADD_SEGMENT; + + bmp->options[APP_OPTIONS_PREALLOC_FIFO_PAIRS] = 16; + bmp->options[APP_OPTIONS_RX_FIFO_SIZE] = fifo_size; + bmp->options[APP_OPTIONS_TX_FIFO_SIZE] = fifo_size; + bmp->options[APP_OPTIONS_ADD_SEGMENT_SIZE] = 256 << 20; + bmp->options[APP_OPTIONS_SEGMENT_SIZE] = 512 << 20; + bmp->options[APP_OPTIONS_EVT_QUEUE_SIZE] = 256; + + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return 0; +} +static void +vl_api_session_enable_disable_reply_t_handler(vl_api_session_enable_disable_reply_t *mp) +{ + if (mp->retval) { + SPDK_ERRLOG("Session enable failed (%d).\n", ntohl(mp->retval)); + } else { + SPDK_NOTICELOG("Session layer enabled\n"); + g_svm.vpp_state = VPP_STATE_ENABLED; + vpp_app_attach(); + } +} + +static int +vpp_session_enable(u8 is_enable) +{ + vl_api_session_enable_disable_t *bmp; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_SESSION_ENABLE_DISABLE); + bmp->client_index = g_svm.my_client_index; + bmp->context = htonl(0xfeedface); + bmp->is_enable = is_enable; + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + return 0; +} + +static void +vpp_application_attached(void *arg) +{ + SPDK_NOTICELOG("VPP net framework initialized.\n"); + g_svm.vpp_state = VPP_STATE_ATTACHED; + g_svm.vpp_initialized = true; + g_svm.app_queue_poller = SPDK_POLLER_REGISTER(app_queue_poller, NULL, 100); + spdk_net_framework_init_next(0); +} + +static int +ssvm_segment_attach(char *name, ssvm_segment_type_t type, int fd) +{ + svm_fifo_segment_create_args_t a; + int rv; + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "Attaching segment %s\n", name); + + clib_memset(&a, 0, sizeof(a)); + a.segment_name = (char *) name; + a.segment_type = type; + + assert(type == SSVM_SEGMENT_MEMFD); + a.memfd_fd = fd; + + if ((rv = svm_fifo_segment_attach(&g_svm.segment_main, &a))) { + SPDK_ERRLOG("Segment '%s' attach failed (%d).\n", name, rv); + return rv; + } + + vec_reset_length(a.new_segment_indices); + return 0; +} + +static void +vl_api_application_attach_reply_t_handler(vl_api_application_attach_reply_t *mp) +{ + u32 n_fds = 0; + + if (mp->retval) { + SPDK_ERRLOG("Application attach to VPP failed (%d)\n", + ntohl(mp->retval)); + goto err; + } + + if (mp->segment_name_length == 0) { + SPDK_ERRLOG("segment_name_length zero\n"); + goto err; + } + + assert(mp->app_event_queue_address); + g_svm.app_event_queue = uword_to_pointer(mp->app_event_queue_address, svm_msg_q_t *); + + if (mp->n_fds) { + int fds[mp->n_fds]; + + vl_socket_client_recv_fd_msg(fds, mp->n_fds, 5); + + if (mp->fd_flags & SESSION_FD_F_VPP_MQ_SEGMENT) { + if (ssvm_segment_attach(0, SSVM_SEGMENT_MEMFD, fds[n_fds++])) { + goto err; + } + } + + if (mp->fd_flags & SESSION_FD_F_MEMFD_SEGMENT) { + if (ssvm_segment_attach((char *) mp->segment_name, SSVM_SEGMENT_MEMFD, fds[n_fds++])) { + goto err; + } + } + + if (mp->fd_flags & SESSION_FD_F_MQ_EVENTFD) { + svm_msg_q_set_consumer_eventfd(g_svm.app_event_queue, fds[n_fds++]); + } + } + + spdk_thread_send_msg(g_svm.init_thread, vpp_application_attached, NULL); + return; +err: + g_svm.vpp_state = VPP_STATE_FAILED; + return; +} + +/* Detach */ +static void +vpp_application_detached(void *arg) +{ + if (!g_svm.vpp_initialized) { + return; + } + + spdk_poller_unregister(&g_svm.vpp_queue_poller); + spdk_poller_unregister(&g_svm.app_queue_poller); + spdk_poller_unregister(&g_svm.timeout_poller); + + g_svm.vpp_initialized = false; + g_svm.vpp_state = VPP_STATE_START; + pthread_mutex_destroy(&g_svm.session_get_lock); + vl_socket_client_disconnect(); + + SPDK_NOTICELOG("Application detached\n"); + + spdk_net_framework_fini_next(); +} + +static int +vpp_application_detached_timeout(void *arg) +{ + if (g_svm.vpp_initialized) { + /* We need to finish detach on initial thread */ + spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL); + } + return SPDK_POLLER_BUSY; +} + +static void +vl_api_application_detach_reply_t_handler(vl_api_application_detach_reply_t *mp) +{ + if (mp->retval) { + SPDK_ERRLOG("Application detach from VPP failed (%d).\n", ntohl(mp->retval)); + g_svm.vpp_state = VPP_STATE_FAILED; + } + + /* We need to finish detach on initial thread */ + spdk_thread_send_msg(g_svm.init_thread, vpp_application_detached, NULL); +} + +static int +vpp_app_detach(void) +{ + vl_api_application_detach_t *bmp; + + bmp = vl_msg_api_alloc(sizeof(*bmp)); + if (bmp == NULL) { + return -ENOMEM; + } + memset(bmp, 0, sizeof(*bmp)); + + bmp->_vl_msg_id = ntohs(VL_API_APPLICATION_DETACH); + bmp->client_index = g_svm.my_client_index; + bmp->context = ntohl(0xfeedface); + vl_msg_api_send_shmem(g_svm.vl_input_queue, (u8 *)&bmp); + + g_svm.timeout_poller = SPDK_POLLER_REGISTER(vpp_application_detached_timeout, + NULL, 10000000); + + return 0; +} + +static void +vl_api_map_another_segment_t_handler(vl_api_map_another_segment_t *mp) +{ + ssvm_segment_type_t seg_type = SSVM_SEGMENT_SHM; + int fd = -1; + + if (mp->fd_flags) { + vl_socket_client_recv_fd_msg(&fd, 1, 5); + seg_type = SSVM_SEGMENT_MEMFD; + } + + if (ssvm_segment_attach((char *) mp->segment_name, + seg_type, fd)) { + SPDK_ERRLOG("svm_fifo_segment_attach ('%s') failed\n", + mp->segment_name); + return; + } + + SPDK_DEBUGLOG(SPDK_SOCK_VPP, "New segment ('%s') attached\n", + mp->segment_name); +} + +static void +vpp_net_framework_set_handlers(void) +{ + /* Set up VPP handlers */ +#define _(N,n) \ + vl_msg_api_set_handlers(VL_API_##N, #n, \ + vl_api_##n##_t_handler, \ + vl_noop_handler, \ + vl_api_##n##_t_endian, \ + vl_api_##n##_t_print, \ + sizeof(vl_api_##n##_t), 1); + _(SESSION_ENABLE_DISABLE_REPLY, session_enable_disable_reply) \ + _(DISCONNECT_SESSION_REPLY, disconnect_session_reply) \ + _(APPLICATION_ATTACH_REPLY, application_attach_reply) \ + _(APPLICATION_DETACH_REPLY, application_detach_reply) \ + _(MAP_ANOTHER_SEGMENT, map_another_segment) +#undef _ +} + +static void +vpp_net_framework_init(void) +{ + char *app_name; + api_main_t *am = &api_main; + + clib_mem_init_thread_safe(0, SPDK_VPP_CLIB_MEM_SIZE); + svm_fifo_segment_main_init(&g_svm.segment_main, SPDK_VPP_SEGMENT_BASEVA, + SPDK_VPP_SEGMENT_TIMEOUT); + + app_name = spdk_sprintf_alloc("SPDK_%d", getpid()); + if (app_name == NULL) { + SPDK_ERRLOG("Cannot alloc memory for SPDK app name\n"); + return; + } + + vpp_net_framework_set_handlers(); + + if (vl_socket_client_connect((char *) API_SOCKET_FILE, app_name, + 0 /* default rx, tx buffer */)) { + SPDK_ERRLOG("Client \"%s\" failed to connect to the socket \"%s\".\n", + app_name, API_SOCKET_FILE); + goto err; + } + + if (vl_socket_client_init_shm(0, 0 /* want_pthread */)) { + SPDK_ERRLOG("SHM API initialization failed.\n"); + vl_socket_client_disconnect(); + goto err; + } + + g_svm.vl_input_queue = am->shmem_hdr->vl_input_queue; + g_svm.vl_output_queue = am->vl_input_queue; + + g_svm.my_client_index = am->my_client_index; + pthread_mutex_init(&g_svm.session_get_lock, NULL); + + free(app_name); + + g_svm.init_thread = spdk_get_thread(); + SPDK_NOTICELOG("Enable VPP session\n"); + + g_svm.vpp_queue_poller = SPDK_POLLER_REGISTER(vpp_queue_poller, NULL, 100); + + vpp_session_enable(1); + + return; + +err: + free(app_name); + spdk_net_framework_init_next(0); +} + +/****************************************************************************** + * Register components + */ +static struct spdk_net_impl g_vpp_net_impl = { + .name = "vpp", + .getaddr = vpp_sock_getaddr, + .connect = vpp_sock_connect, + .listen = vpp_sock_listen, + .accept = vpp_sock_accept, + .close = vpp_sock_close, + .recv = vpp_sock_recv, + .readv = vpp_sock_readv, + .writev = vpp_sock_writev, + .writev_async = vpp_sock_writev_async, + .set_recvlowat = vpp_sock_set_recvlowat, + .set_recvbuf = vpp_sock_set_recvbuf, + .set_sendbuf = vpp_sock_set_sendbuf, + .is_ipv6 = vpp_sock_is_ipv6, + .is_ipv4 = vpp_sock_is_ipv4, + .is_connected = vpp_sock_is_connected, + .get_placement_id = vpp_sock_get_placement_id, + .group_impl_create = vpp_sock_group_impl_create, + .group_impl_add_sock = vpp_sock_group_impl_add_sock, + .group_impl_remove_sock = vpp_sock_group_impl_remove_sock, + .group_impl_poll = vpp_sock_group_impl_poll, + .group_impl_close = vpp_sock_group_impl_close, +}; + +SPDK_NET_IMPL_REGISTER(vpp, &g_vpp_net_impl, DEFAULT_SOCK_PRIORITY + 2); + +static void +vpp_net_framework_fini(void) +{ + if (g_svm.vpp_initialized) { + vpp_app_detach(); + } else { + spdk_net_framework_fini_next(); + } +} + +static struct spdk_net_framework g_vpp_net_framework = { + .name = "vpp", + .init = vpp_net_framework_init, + .fini = vpp_net_framework_fini, +}; + +SPDK_NET_FRAMEWORK_REGISTER(vpp, &g_vpp_net_framework); + +SPDK_LOG_REGISTER_COMPONENT("sock_vpp", SPDK_SOCK_VPP) + +#endif /* __clang_analyzer__ */ |