summaryrefslogtreecommitdiffstats
path: root/src/spdk/lib/event
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/spdk/lib/event
parentInitial commit. (diff)
downloadceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz
ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/lib/event')
-rw-r--r--src/spdk/lib/event/Makefile45
-rw-r--r--src/spdk/lib/event/app.c1177
-rw-r--r--src/spdk/lib/event/json_config.c630
-rw-r--r--src/spdk/lib/event/reactor.c664
-rw-r--r--src/spdk/lib/event/rpc.c87
-rw-r--r--src/spdk/lib/event/spdk_event.map46
-rw-r--r--src/spdk/lib/event/subsystem.c288
7 files changed, 2937 insertions, 0 deletions
diff --git a/src/spdk/lib/event/Makefile b/src/spdk/lib/event/Makefile
new file mode 100644
index 000000000..87a6209c7
--- /dev/null
+++ b/src/spdk/lib/event/Makefile
@@ -0,0 +1,45 @@
+#
+# BSD LICENSE
+#
+# Copyright (c) Intel Corporation.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+
+SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..)
+include $(SPDK_ROOT_DIR)/mk/spdk.common.mk
+
+SO_VER := 5
+SO_MINOR := 0
+
+LIBNAME = event
+C_SRCS = app.c reactor.c rpc.c subsystem.c json_config.c
+
+SPDK_MAP_FILE = $(abspath $(CURDIR)/spdk_event.map)
+
+include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk
diff --git a/src/spdk/lib/event/app.c b/src/spdk/lib/event/app.c
new file mode 100644
index 000000000..b6cab05a3
--- /dev/null
+++ b/src/spdk/lib/event/app.c
@@ -0,0 +1,1177 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation. All rights reserved.
+ * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+#include "spdk/version.h"
+
+#include "spdk_internal/event.h"
+
+#include "spdk/env.h"
+#include "spdk/log.h"
+#include "spdk/conf.h"
+#include "spdk/thread.h"
+#include "spdk/trace.h"
+#include "spdk/string.h"
+#include "spdk/rpc.h"
+#include "spdk/util.h"
+
+#define SPDK_APP_DEFAULT_LOG_LEVEL SPDK_LOG_NOTICE
+#define SPDK_APP_DEFAULT_LOG_PRINT_LEVEL SPDK_LOG_INFO
+#define SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES SPDK_DEFAULT_NUM_TRACE_ENTRIES
+
+#define SPDK_APP_DPDK_DEFAULT_MEM_SIZE -1
+#define SPDK_APP_DPDK_DEFAULT_MASTER_CORE -1
+#define SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL -1
+#define SPDK_APP_DPDK_DEFAULT_CORE_MASK "0x1"
+#define SPDK_APP_DPDK_DEFAULT_BASE_VIRTADDR 0x200000000000
+#define SPDK_APP_DEFAULT_CORE_LIMIT 0x140000000 /* 5 GiB */
+
+struct spdk_app {
+ struct spdk_conf *config;
+ const char *json_config_file;
+ bool json_config_ignore_errors;
+ const char *rpc_addr;
+ int shm_id;
+ spdk_app_shutdown_cb shutdown_cb;
+ int rc;
+};
+
+static struct spdk_app g_spdk_app;
+static spdk_msg_fn g_start_fn = NULL;
+static void *g_start_arg = NULL;
+static struct spdk_thread *g_app_thread = NULL;
+static bool g_delay_subsystem_init = false;
+static bool g_shutdown_sig_received = false;
+static char *g_executable_name;
+static struct spdk_app_opts g_default_opts;
+
+int
+spdk_app_get_shm_id(void)
+{
+ return g_spdk_app.shm_id;
+}
+
+/* append one empty option to indicate the end of the array */
+static const struct option g_cmdline_options[] = {
+#define CONFIG_FILE_OPT_IDX 'c'
+ {"config", required_argument, NULL, CONFIG_FILE_OPT_IDX},
+#define LIMIT_COREDUMP_OPT_IDX 'd'
+ {"limit-coredump", no_argument, NULL, LIMIT_COREDUMP_OPT_IDX},
+#define TPOINT_GROUP_MASK_OPT_IDX 'e'
+ {"tpoint-group-mask", required_argument, NULL, TPOINT_GROUP_MASK_OPT_IDX},
+#define SINGLE_FILE_SEGMENTS_OPT_IDX 'g'
+ {"single-file-segments", no_argument, NULL, SINGLE_FILE_SEGMENTS_OPT_IDX},
+#define HELP_OPT_IDX 'h'
+ {"help", no_argument, NULL, HELP_OPT_IDX},
+#define SHM_ID_OPT_IDX 'i'
+ {"shm-id", required_argument, NULL, SHM_ID_OPT_IDX},
+#define CPUMASK_OPT_IDX 'm'
+ {"cpumask", required_argument, NULL, CPUMASK_OPT_IDX},
+#define MEM_CHANNELS_OPT_IDX 'n'
+ {"mem-channels", required_argument, NULL, MEM_CHANNELS_OPT_IDX},
+#define MASTER_CORE_OPT_IDX 'p'
+ {"master-core", required_argument, NULL, MASTER_CORE_OPT_IDX},
+#define RPC_SOCKET_OPT_IDX 'r'
+ {"rpc-socket", required_argument, NULL, RPC_SOCKET_OPT_IDX},
+#define MEM_SIZE_OPT_IDX 's'
+ {"mem-size", required_argument, NULL, MEM_SIZE_OPT_IDX},
+#define NO_PCI_OPT_IDX 'u'
+ {"no-pci", no_argument, NULL, NO_PCI_OPT_IDX},
+#define VERSION_OPT_IDX 'v'
+ {"version", no_argument, NULL, VERSION_OPT_IDX},
+#define PCI_BLACKLIST_OPT_IDX 'B'
+ {"pci-blacklist", required_argument, NULL, PCI_BLACKLIST_OPT_IDX},
+#define LOGFLAG_OPT_IDX 'L'
+ {"logflag", required_argument, NULL, LOGFLAG_OPT_IDX},
+#define HUGE_UNLINK_OPT_IDX 'R'
+ {"huge-unlink", no_argument, NULL, HUGE_UNLINK_OPT_IDX},
+#define PCI_WHITELIST_OPT_IDX 'W'
+ {"pci-whitelist", required_argument, NULL, PCI_WHITELIST_OPT_IDX},
+#define SILENCE_NOTICELOG_OPT_IDX 257
+ {"silence-noticelog", no_argument, NULL, SILENCE_NOTICELOG_OPT_IDX},
+#define WAIT_FOR_RPC_OPT_IDX 258
+ {"wait-for-rpc", no_argument, NULL, WAIT_FOR_RPC_OPT_IDX},
+#define HUGE_DIR_OPT_IDX 259
+ {"huge-dir", required_argument, NULL, HUGE_DIR_OPT_IDX},
+#define NUM_TRACE_ENTRIES_OPT_IDX 260
+ {"num-trace-entries", required_argument, NULL, NUM_TRACE_ENTRIES_OPT_IDX},
+#define MAX_REACTOR_DELAY_OPT_IDX 261
+ {"max-delay", required_argument, NULL, MAX_REACTOR_DELAY_OPT_IDX},
+#define JSON_CONFIG_OPT_IDX 262
+ {"json", required_argument, NULL, JSON_CONFIG_OPT_IDX},
+#define JSON_CONFIG_IGNORE_INIT_ERRORS_IDX 263
+ {"json-ignore-init-errors", no_argument, NULL, JSON_CONFIG_IGNORE_INIT_ERRORS_IDX},
+#define IOVA_MODE_OPT_IDX 264
+ {"iova-mode", required_argument, NULL, IOVA_MODE_OPT_IDX},
+#define BASE_VIRTADDR_OPT_IDX 265
+ {"base-virtaddr", required_argument, NULL, BASE_VIRTADDR_OPT_IDX},
+};
+
+/* Global section */
+#define GLOBAL_CONFIG_TMPL \
+"# Configuration file\n" \
+"#\n" \
+"# Please write all parameters using ASCII.\n" \
+"# The parameter must be quoted if it includes whitespace.\n" \
+"#\n" \
+"# Configuration syntax:\n" \
+"# Spaces at head of line are deleted, other spaces are as separator\n" \
+"# Lines starting with '#' are comments and not evaluated.\n" \
+"# Lines ending with '\\' are concatenated with the next line.\n" \
+"# Bracketed keys are section keys grouping the following value keys.\n" \
+"# Number of section key is used as a tag number.\n" \
+"# Ex. [TargetNode1] = TargetNode section key with tag number 1\n" \
+"[Global]\n" \
+" Comment \"Global section\"\n" \
+"\n" \
+" # Users can restrict work items to only run on certain cores by\n" \
+" # specifying a ReactorMask. Default is to allow work items to run\n" \
+" # on all cores. Core 0 must be set in the mask if one is specified.\n" \
+" # Default: 0xFFFF (cores 0-15)\n" \
+" ReactorMask \"0x%s\"\n" \
+"\n" \
+" # Tracepoint group mask for spdk trace buffers\n" \
+" # Default: 0x0 (all tracepoint groups disabled)\n" \
+" # Set to 0xFFFF to enable all tracepoint groups.\n" \
+" TpointGroupMask \"0x%" PRIX64 "\"\n" \
+"\n" \
+
+static void
+app_config_dump_global_section(FILE *fp)
+{
+ struct spdk_cpuset *coremask;
+
+ if (NULL == fp) {
+ return;
+ }
+
+ coremask = spdk_app_get_core_mask();
+
+ fprintf(fp, GLOBAL_CONFIG_TMPL, spdk_cpuset_fmt(coremask),
+ spdk_trace_get_tpoint_group_mask());
+}
+
+int
+spdk_app_get_running_config(char **config_str, char *name)
+{
+ FILE *fp = NULL;
+ int fd = -1;
+ long length = 0, ret = 0;
+ char vbuf[BUFSIZ];
+ char config_template[64];
+
+ snprintf(config_template, sizeof(config_template), "/tmp/%s.XXXXXX", name);
+ /* Create temporary file to hold config */
+ fd = mkstemp(config_template);
+ if (fd == -1) {
+ SPDK_ERRLOG("mkstemp failed\n");
+ return -1;
+ }
+ fp = fdopen(fd, "wb+");
+ if (NULL == fp) {
+ SPDK_ERRLOG("error opening tmpfile fd = %d\n", fd);
+ return -1;
+ }
+
+ /* Buffered IO */
+ setvbuf(fp, vbuf, _IOFBF, BUFSIZ);
+
+ app_config_dump_global_section(fp);
+ spdk_subsystem_config(fp);
+
+ length = ftell(fp);
+
+ *config_str = malloc(length + 1);
+ if (!*config_str) {
+ SPDK_ERRLOG("out-of-memory for config\n");
+ fclose(fp);
+ return -1;
+ }
+ fseek(fp, 0, SEEK_SET);
+ ret = fread(*config_str, sizeof(char), length, fp);
+ if (ret < length) {
+ SPDK_ERRLOG("short read\n");
+ }
+ fclose(fp);
+ (*config_str)[length] = '\0';
+
+ return 0;
+}
+
+static void
+app_start_shutdown(void *ctx)
+{
+ if (g_spdk_app.shutdown_cb) {
+ g_spdk_app.shutdown_cb();
+ g_spdk_app.shutdown_cb = NULL;
+ } else {
+ spdk_app_stop(0);
+ }
+}
+
+void
+spdk_app_start_shutdown(void)
+{
+ spdk_thread_send_critical_msg(g_app_thread, app_start_shutdown);
+}
+
+static void
+__shutdown_signal(int signo)
+{
+ if (!g_shutdown_sig_received) {
+ g_shutdown_sig_received = true;
+ spdk_app_start_shutdown();
+ }
+}
+
+static int
+app_opts_validate(const char *app_opts)
+{
+ int i = 0, j;
+
+ for (i = 0; app_opts[i] != '\0'; i++) {
+ /* ignore getopt control characters */
+ if (app_opts[i] == ':' || app_opts[i] == '+' || app_opts[i] == '-') {
+ continue;
+ }
+
+ for (j = 0; SPDK_APP_GETOPT_STRING[j] != '\0'; j++) {
+ if (app_opts[i] == SPDK_APP_GETOPT_STRING[j]) {
+ return app_opts[i];
+ }
+ }
+ }
+ return 0;
+}
+
+void
+spdk_app_opts_init(struct spdk_app_opts *opts)
+{
+ if (!opts) {
+ return;
+ }
+
+ memset(opts, 0, sizeof(*opts));
+
+ opts->enable_coredump = true;
+ opts->shm_id = -1;
+ opts->mem_size = SPDK_APP_DPDK_DEFAULT_MEM_SIZE;
+ opts->master_core = SPDK_APP_DPDK_DEFAULT_MASTER_CORE;
+ opts->mem_channel = SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL;
+ opts->reactor_mask = NULL;
+ opts->base_virtaddr = SPDK_APP_DPDK_DEFAULT_BASE_VIRTADDR;
+ opts->print_level = SPDK_APP_DEFAULT_LOG_PRINT_LEVEL;
+ opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR;
+ opts->num_entries = SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES;
+ opts->delay_subsystem_init = false;
+}
+
+static int
+app_setup_signal_handlers(struct spdk_app_opts *opts)
+{
+ struct sigaction sigact;
+ sigset_t sigmask;
+ int rc;
+
+ sigemptyset(&sigmask);
+ memset(&sigact, 0, sizeof(sigact));
+ sigemptyset(&sigact.sa_mask);
+
+ sigact.sa_handler = SIG_IGN;
+ rc = sigaction(SIGPIPE, &sigact, NULL);
+ if (rc < 0) {
+ SPDK_ERRLOG("sigaction(SIGPIPE) failed\n");
+ return rc;
+ }
+
+ /* Install the same handler for SIGINT and SIGTERM */
+ g_shutdown_sig_received = false;
+ sigact.sa_handler = __shutdown_signal;
+ rc = sigaction(SIGINT, &sigact, NULL);
+ if (rc < 0) {
+ SPDK_ERRLOG("sigaction(SIGINT) failed\n");
+ return rc;
+ }
+ sigaddset(&sigmask, SIGINT);
+
+ rc = sigaction(SIGTERM, &sigact, NULL);
+ if (rc < 0) {
+ SPDK_ERRLOG("sigaction(SIGTERM) failed\n");
+ return rc;
+ }
+ sigaddset(&sigmask, SIGTERM);
+
+ if (opts->usr1_handler != NULL) {
+ sigact.sa_handler = opts->usr1_handler;
+ rc = sigaction(SIGUSR1, &sigact, NULL);
+ if (rc < 0) {
+ SPDK_ERRLOG("sigaction(SIGUSR1) failed\n");
+ return rc;
+ }
+ sigaddset(&sigmask, SIGUSR1);
+ }
+
+ pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL);
+
+ return 0;
+}
+
+static void
+app_start_application(void)
+{
+ assert(spdk_get_thread() == g_app_thread);
+
+ g_start_fn(g_start_arg);
+}
+
+static void
+app_start_rpc(int rc, void *arg1)
+{
+ if (rc) {
+ spdk_app_stop(rc);
+ return;
+ }
+
+ spdk_rpc_initialize(g_spdk_app.rpc_addr);
+ if (!g_delay_subsystem_init) {
+ spdk_rpc_set_state(SPDK_RPC_RUNTIME);
+ app_start_application();
+ }
+}
+
+static struct spdk_conf *
+app_setup_conf(const char *config_file)
+{
+ struct spdk_conf *config;
+ int rc;
+
+ config = spdk_conf_allocate();
+ assert(config != NULL);
+ if (config_file) {
+ rc = spdk_conf_read(config, config_file);
+ if (rc != 0) {
+ SPDK_ERRLOG("Could not read config file %s\n", config_file);
+ goto error;
+ }
+ if (spdk_conf_first_section(config) == NULL) {
+ SPDK_ERRLOG("Invalid config file %s\n", config_file);
+ goto error;
+ }
+ }
+ spdk_conf_set_as_default(config);
+ return config;
+
+error:
+ spdk_conf_free(config);
+ return NULL;
+}
+
+static int
+app_opts_add_pci_addr(struct spdk_app_opts *opts, struct spdk_pci_addr **list, char *bdf)
+{
+ struct spdk_pci_addr *tmp = *list;
+ size_t i = opts->num_pci_addr;
+
+ tmp = realloc(tmp, sizeof(*tmp) * (i + 1));
+ if (tmp == NULL) {
+ SPDK_ERRLOG("realloc error\n");
+ return -ENOMEM;
+ }
+
+ *list = tmp;
+ if (spdk_pci_addr_parse(*list + i, bdf) < 0) {
+ SPDK_ERRLOG("Invalid address %s\n", bdf);
+ return -EINVAL;
+ }
+
+ opts->num_pci_addr++;
+ return 0;
+}
+
+static int
+app_read_config_file_global_params(struct spdk_app_opts *opts)
+{
+ struct spdk_conf_section *sp;
+ char *bdf;
+ int i, rc = 0;
+
+ sp = spdk_conf_find_section(NULL, "Global");
+
+ if (opts->shm_id == -1) {
+ if (sp != NULL) {
+ opts->shm_id = spdk_conf_section_get_intval(sp, "SharedMemoryID");
+ }
+ }
+
+ if (opts->reactor_mask == NULL) {
+ if (sp && spdk_conf_section_get_val(sp, "ReactorMask")) {
+ SPDK_ERRLOG("ReactorMask config option is deprecated. Use -m/--cpumask\n"
+ "command line parameter instead.\n");
+ opts->reactor_mask = spdk_conf_section_get_val(sp, "ReactorMask");
+ } else {
+ opts->reactor_mask = SPDK_APP_DPDK_DEFAULT_CORE_MASK;
+ }
+ }
+
+ if (!opts->no_pci && sp) {
+ opts->no_pci = spdk_conf_section_get_boolval(sp, "NoPci", false);
+ }
+
+ if (opts->tpoint_group_mask == NULL) {
+ if (sp != NULL) {
+ opts->tpoint_group_mask = spdk_conf_section_get_val(sp, "TpointGroupMask");
+ }
+ }
+
+ if (sp == NULL) {
+ return 0;
+ }
+
+ for (i = 0; ; i++) {
+ bdf = spdk_conf_section_get_nmval(sp, "PciBlacklist", i, 0);
+ if (!bdf) {
+ break;
+ }
+
+ rc = app_opts_add_pci_addr(opts, &opts->pci_blacklist, bdf);
+ if (rc != 0) {
+ free(opts->pci_blacklist);
+ return rc;
+ }
+ }
+
+ for (i = 0; ; i++) {
+ bdf = spdk_conf_section_get_nmval(sp, "PciWhitelist", i, 0);
+ if (!bdf) {
+ break;
+ }
+
+ if (opts->pci_blacklist != NULL) {
+ SPDK_ERRLOG("PciBlacklist and PciWhitelist cannot be used at the same time\n");
+ free(opts->pci_blacklist);
+ return -EINVAL;
+ }
+
+ rc = app_opts_add_pci_addr(opts, &opts->pci_whitelist, bdf);
+ if (rc != 0) {
+ free(opts->pci_whitelist);
+ return rc;
+ }
+ }
+ return 0;
+}
+
+static int
+app_setup_env(struct spdk_app_opts *opts)
+{
+ struct spdk_env_opts env_opts = {};
+ int rc;
+
+ if (opts == NULL) {
+ rc = spdk_env_init(NULL);
+ if (rc != 0) {
+ SPDK_ERRLOG("Unable to reinitialize SPDK env\n");
+ }
+
+ return rc;
+ }
+
+
+ spdk_env_opts_init(&env_opts);
+
+ env_opts.name = opts->name;
+ env_opts.core_mask = opts->reactor_mask;
+ env_opts.shm_id = opts->shm_id;
+ env_opts.mem_channel = opts->mem_channel;
+ env_opts.master_core = opts->master_core;
+ env_opts.mem_size = opts->mem_size;
+ env_opts.hugepage_single_segments = opts->hugepage_single_segments;
+ env_opts.unlink_hugepage = opts->unlink_hugepage;
+ env_opts.hugedir = opts->hugedir;
+ env_opts.no_pci = opts->no_pci;
+ env_opts.num_pci_addr = opts->num_pci_addr;
+ env_opts.pci_blacklist = opts->pci_blacklist;
+ env_opts.pci_whitelist = opts->pci_whitelist;
+ env_opts.env_context = opts->env_context;
+ env_opts.iova_mode = opts->iova_mode;
+
+ rc = spdk_env_init(&env_opts);
+ free(env_opts.pci_blacklist);
+ free(env_opts.pci_whitelist);
+
+
+ if (rc < 0) {
+ SPDK_ERRLOG("Unable to initialize SPDK env\n");
+ }
+
+ return rc;
+}
+
+static int
+app_setup_trace(struct spdk_app_opts *opts)
+{
+ char shm_name[64];
+ uint64_t tpoint_group_mask;
+ char *end;
+
+ if (opts->shm_id >= 0) {
+ snprintf(shm_name, sizeof(shm_name), "/%s_trace.%d", opts->name, opts->shm_id);
+ } else {
+ snprintf(shm_name, sizeof(shm_name), "/%s_trace.pid%d", opts->name, (int)getpid());
+ }
+
+ if (spdk_trace_init(shm_name, opts->num_entries) != 0) {
+ return -1;
+ }
+
+ if (opts->tpoint_group_mask != NULL) {
+ errno = 0;
+ tpoint_group_mask = strtoull(opts->tpoint_group_mask, &end, 16);
+ if (*end != '\0' || errno) {
+ SPDK_ERRLOG("invalid tpoint mask %s\n", opts->tpoint_group_mask);
+ } else {
+ SPDK_NOTICELOG("Tracepoint Group Mask %s specified.\n", opts->tpoint_group_mask);
+ SPDK_NOTICELOG("Use 'spdk_trace -s %s %s %d' to capture a snapshot of events at runtime.\n",
+ opts->name,
+ opts->shm_id >= 0 ? "-i" : "-p",
+ opts->shm_id >= 0 ? opts->shm_id : getpid());
+#if defined(__linux__)
+ SPDK_NOTICELOG("Or copy /dev/shm%s for offline analysis/debug.\n", shm_name);
+#endif
+ spdk_trace_set_tpoint_group_mask(tpoint_group_mask);
+ }
+ }
+
+ return 0;
+}
+
+static void
+bootstrap_fn(void *arg1)
+{
+ if (g_spdk_app.json_config_file) {
+ g_delay_subsystem_init = false;
+ spdk_app_json_config_load(g_spdk_app.json_config_file, g_spdk_app.rpc_addr, app_start_rpc,
+ NULL, !g_spdk_app.json_config_ignore_errors);
+ } else {
+ if (!g_delay_subsystem_init) {
+ spdk_subsystem_init(app_start_rpc, NULL);
+ } else {
+ spdk_rpc_initialize(g_spdk_app.rpc_addr);
+ }
+ }
+}
+
+int
+spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn,
+ void *arg1)
+{
+ struct spdk_conf *config = NULL;
+ int rc;
+ char *tty;
+ struct spdk_cpuset tmp_cpumask = {};
+ static bool g_env_was_setup = false;
+
+ if (!opts) {
+ SPDK_ERRLOG("opts should not be NULL\n");
+ return 1;
+ }
+
+ if (!start_fn) {
+ SPDK_ERRLOG("start_fn should not be NULL\n");
+ return 1;
+ }
+
+ tty = ttyname(STDERR_FILENO);
+ if (opts->print_level > SPDK_LOG_WARN &&
+ isatty(STDERR_FILENO) &&
+ tty &&
+ !strncmp(tty, "/dev/tty", strlen("/dev/tty"))) {
+ printf("Warning: printing stderr to console terminal without -q option specified.\n");
+ printf("Suggest using --silence-noticelog to disable logging to stderr and\n");
+ printf("monitor syslog, or redirect stderr to a file.\n");
+ printf("(Delaying for 10 seconds...)\n");
+ sleep(10);
+ }
+
+ spdk_log_set_print_level(opts->print_level);
+
+#ifndef SPDK_NO_RLIMIT
+ if (opts->enable_coredump) {
+ struct rlimit core_limits;
+
+ core_limits.rlim_cur = core_limits.rlim_max = SPDK_APP_DEFAULT_CORE_LIMIT;
+ setrlimit(RLIMIT_CORE, &core_limits);
+ }
+#endif
+
+ config = app_setup_conf(opts->config_file);
+ if (config == NULL) {
+ return 1;
+ }
+
+ if (app_read_config_file_global_params(opts) < 0) {
+ spdk_conf_free(config);
+ return 1;
+ }
+
+ memset(&g_spdk_app, 0, sizeof(g_spdk_app));
+ g_spdk_app.config = config;
+ g_spdk_app.json_config_file = opts->json_config_file;
+ g_spdk_app.json_config_ignore_errors = opts->json_config_ignore_errors;
+ g_spdk_app.rpc_addr = opts->rpc_addr;
+ g_spdk_app.shm_id = opts->shm_id;
+ g_spdk_app.shutdown_cb = opts->shutdown_cb;
+ g_spdk_app.rc = 0;
+
+ spdk_log_set_level(SPDK_APP_DEFAULT_LOG_LEVEL);
+
+ /* Pass NULL to app_setup_env if SPDK app has been set up, in order to
+ * indicate that this is a reinitialization.
+ */
+ if (app_setup_env(g_env_was_setup ? NULL : opts) < 0) {
+ return 1;
+ }
+
+ spdk_log_open(opts->log);
+ SPDK_NOTICELOG("Total cores available: %d\n", spdk_env_get_core_count());
+
+ /*
+ * If mask not specified on command line or in configuration file,
+ * reactor_mask will be 0x1 which will enable core 0 to run one
+ * reactor.
+ */
+ if ((rc = spdk_reactors_init()) != 0) {
+ SPDK_ERRLOG("Reactor Initilization failed: rc = %d\n", rc);
+ return 1;
+ }
+
+ spdk_cpuset_set_cpu(&tmp_cpumask, spdk_env_get_current_core(), true);
+
+ /* Now that the reactors have been initialized, we can create an
+ * initialization thread. */
+ g_app_thread = spdk_thread_create("app_thread", &tmp_cpumask);
+ if (!g_app_thread) {
+ SPDK_ERRLOG("Unable to create an spdk_thread for initialization\n");
+ return 1;
+ }
+
+ /*
+ * Note the call to app_setup_trace() is located here
+ * ahead of app_setup_signal_handlers().
+ * That's because there is not an easy/direct clean
+ * way of unwinding alloc'd resources that can occur
+ * in app_setup_signal_handlers().
+ */
+ if (app_setup_trace(opts) != 0) {
+ return 1;
+ }
+
+ if ((rc = app_setup_signal_handlers(opts)) != 0) {
+ return 1;
+ }
+
+ g_delay_subsystem_init = opts->delay_subsystem_init;
+ g_start_fn = start_fn;
+ g_start_arg = arg1;
+
+ spdk_thread_send_msg(g_app_thread, bootstrap_fn, NULL);
+
+ /* This blocks until spdk_app_stop is called */
+ spdk_reactors_start();
+
+ g_env_was_setup = true;
+
+ return g_spdk_app.rc;
+}
+
+void
+spdk_app_fini(void)
+{
+ spdk_trace_cleanup();
+ spdk_reactors_fini();
+ spdk_env_fini();
+ spdk_conf_free(g_spdk_app.config);
+ spdk_log_close();
+}
+
+static void
+app_stop(void *arg1)
+{
+ spdk_rpc_finish();
+ spdk_subsystem_fini(spdk_reactors_stop, NULL);
+}
+
+void
+spdk_app_stop(int rc)
+{
+ if (rc) {
+ SPDK_WARNLOG("spdk_app_stop'd on non-zero\n");
+ }
+ g_spdk_app.rc = rc;
+ /*
+ * We want to run spdk_subsystem_fini() from the same thread where spdk_subsystem_init()
+ * was called.
+ */
+ spdk_thread_send_msg(g_app_thread, app_stop, NULL);
+}
+
+static void
+usage(void (*app_usage)(void))
+{
+ printf("%s [options]\n", g_executable_name);
+ printf("options:\n");
+ printf(" -c, --config <config> config file (default %s)\n",
+ g_default_opts.config_file != NULL ? g_default_opts.config_file : "none");
+ printf(" --json <config> JSON config file (default %s)\n",
+ g_default_opts.json_config_file != NULL ? g_default_opts.json_config_file : "none");
+ printf(" --json-ignore-init-errors\n");
+ printf(" don't exit on invalid config entry\n");
+ printf(" -d, --limit-coredump do not set max coredump size to RLIM_INFINITY\n");
+ printf(" -g, --single-file-segments\n");
+ printf(" force creating just one hugetlbfs file\n");
+ printf(" -h, --help show this usage\n");
+ printf(" -i, --shm-id <id> shared memory ID (optional)\n");
+ printf(" -m, --cpumask <mask> core mask for DPDK\n");
+ printf(" -n, --mem-channels <num> channel number of memory channels used for DPDK\n");
+ printf(" -p, --master-core <id> master (primary) core for DPDK\n");
+ printf(" -r, --rpc-socket <path> RPC listen address (default %s)\n", SPDK_DEFAULT_RPC_ADDR);
+ printf(" -s, --mem-size <size> memory size in MB for DPDK (default: ");
+#ifndef __linux__
+ if (g_default_opts.mem_size <= 0) {
+ printf("all hugepage memory)\n");
+ } else
+#endif
+ {
+ printf("%dMB)\n", g_default_opts.mem_size >= 0 ? g_default_opts.mem_size : 0);
+ }
+ printf(" --silence-noticelog disable notice level logging to stderr\n");
+ printf(" -u, --no-pci disable PCI access\n");
+ printf(" --wait-for-rpc wait for RPCs to initialize subsystems\n");
+ printf(" --max-delay <num> maximum reactor delay (in microseconds)\n");
+ printf(" -B, --pci-blacklist <bdf>\n");
+ printf(" pci addr to blacklist (can be used more than once)\n");
+ printf(" -R, --huge-unlink unlink huge files after initialization\n");
+ printf(" -v, --version print SPDK version\n");
+ printf(" -W, --pci-whitelist <bdf>\n");
+ printf(" pci addr to whitelist (-B and -W cannot be used at the same time)\n");
+ printf(" --huge-dir <path> use a specific hugetlbfs mount to reserve memory from\n");
+ printf(" --iova-mode <pa/va> set IOVA mode ('pa' for IOVA_PA and 'va' for IOVA_VA)\n");
+ printf(" --base-virtaddr <addr> the base virtual address for DPDK (default: 0x200000000000)\n");
+ printf(" --num-trace-entries <num> number of trace entries for each core, must be power of 2. (default %d)\n",
+ SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES);
+ spdk_log_usage(stdout, "-L");
+ spdk_trace_mask_usage(stdout, "-e");
+ if (app_usage) {
+ app_usage();
+ }
+}
+
+spdk_app_parse_args_rvals_t
+spdk_app_parse_args(int argc, char **argv, struct spdk_app_opts *opts,
+ const char *app_getopt_str, struct option *app_long_opts,
+ int (*app_parse)(int ch, char *arg),
+ void (*app_usage)(void))
+{
+ int ch, rc, opt_idx, global_long_opts_len, app_long_opts_len;
+ struct option *cmdline_options;
+ char *cmdline_short_opts = NULL;
+ enum spdk_app_parse_args_rvals retval = SPDK_APP_PARSE_ARGS_FAIL;
+ long int tmp;
+
+ memcpy(&g_default_opts, opts, sizeof(g_default_opts));
+
+ if (opts->config_file && access(opts->config_file, R_OK) != 0) {
+ SPDK_WARNLOG("Can't read legacy configuration file '%s'\n", opts->config_file);
+ opts->config_file = NULL;
+ }
+
+ if (opts->json_config_file && access(opts->json_config_file, R_OK) != 0) {
+ SPDK_WARNLOG("Can't read JSON configuration file '%s'\n", opts->json_config_file);
+ opts->json_config_file = NULL;
+ }
+
+ if (app_long_opts == NULL) {
+ app_long_opts_len = 0;
+ } else {
+ for (app_long_opts_len = 0;
+ app_long_opts[app_long_opts_len].name != NULL;
+ app_long_opts_len++);
+ }
+
+ global_long_opts_len = SPDK_COUNTOF(g_cmdline_options);
+
+ cmdline_options = calloc(global_long_opts_len + app_long_opts_len + 1, sizeof(*cmdline_options));
+ if (!cmdline_options) {
+ SPDK_ERRLOG("Out of memory\n");
+ return SPDK_APP_PARSE_ARGS_FAIL;
+ }
+
+ memcpy(&cmdline_options[0], g_cmdline_options, sizeof(g_cmdline_options));
+ if (app_long_opts) {
+ memcpy(&cmdline_options[global_long_opts_len], app_long_opts,
+ app_long_opts_len * sizeof(*app_long_opts));
+ }
+
+ if (app_getopt_str != NULL) {
+ ch = app_opts_validate(app_getopt_str);
+ if (ch) {
+ SPDK_ERRLOG("Duplicated option '%c' between the generic and application specific spdk opts.\n",
+ ch);
+ goto out;
+ }
+ }
+
+ cmdline_short_opts = spdk_sprintf_alloc("%s%s", app_getopt_str, SPDK_APP_GETOPT_STRING);
+ if (!cmdline_short_opts) {
+ SPDK_ERRLOG("Out of memory\n");
+ goto out;
+ }
+
+ g_executable_name = argv[0];
+
+ while ((ch = getopt_long(argc, argv, cmdline_short_opts, cmdline_options, &opt_idx)) != -1) {
+ switch (ch) {
+ case CONFIG_FILE_OPT_IDX:
+ opts->config_file = optarg;
+ break;
+ case JSON_CONFIG_OPT_IDX:
+ opts->json_config_file = optarg;
+ break;
+ case JSON_CONFIG_IGNORE_INIT_ERRORS_IDX:
+ opts->json_config_ignore_errors = true;
+ break;
+ case LIMIT_COREDUMP_OPT_IDX:
+ opts->enable_coredump = false;
+ break;
+ case TPOINT_GROUP_MASK_OPT_IDX:
+ opts->tpoint_group_mask = optarg;
+ break;
+ case SINGLE_FILE_SEGMENTS_OPT_IDX:
+ opts->hugepage_single_segments = true;
+ break;
+ case HELP_OPT_IDX:
+ usage(app_usage);
+ retval = SPDK_APP_PARSE_ARGS_HELP;
+ goto out;
+ case SHM_ID_OPT_IDX:
+ opts->shm_id = spdk_strtol(optarg, 0);
+ if (opts->shm_id < 0) {
+ SPDK_ERRLOG("Invalid shared memory ID %s\n", optarg);
+ goto out;
+ }
+ break;
+ case CPUMASK_OPT_IDX:
+ opts->reactor_mask = optarg;
+ break;
+ case MEM_CHANNELS_OPT_IDX:
+ opts->mem_channel = spdk_strtol(optarg, 0);
+ if (opts->mem_channel < 0) {
+ SPDK_ERRLOG("Invalid memory channel %s\n", optarg);
+ goto out;
+ }
+ break;
+ case MASTER_CORE_OPT_IDX:
+ opts->master_core = spdk_strtol(optarg, 0);
+ if (opts->master_core < 0) {
+ SPDK_ERRLOG("Invalid master core %s\n", optarg);
+ goto out;
+ }
+ break;
+ case SILENCE_NOTICELOG_OPT_IDX:
+ opts->print_level = SPDK_LOG_WARN;
+ break;
+ case RPC_SOCKET_OPT_IDX:
+ opts->rpc_addr = optarg;
+ break;
+ case MEM_SIZE_OPT_IDX: {
+ uint64_t mem_size_mb;
+ bool mem_size_has_prefix;
+
+ rc = spdk_parse_capacity(optarg, &mem_size_mb, &mem_size_has_prefix);
+ if (rc != 0) {
+ SPDK_ERRLOG("invalid memory pool size `-s %s`\n", optarg);
+ usage(app_usage);
+ goto out;
+ }
+
+ if (mem_size_has_prefix) {
+ /* the mem size is in MB by default, so if a prefix was
+ * specified, we need to manually convert to MB.
+ */
+ mem_size_mb /= 1024 * 1024;
+ }
+
+ if (mem_size_mb > INT_MAX) {
+ SPDK_ERRLOG("invalid memory pool size `-s %s`\n", optarg);
+ usage(app_usage);
+ goto out;
+ }
+
+ opts->mem_size = (int) mem_size_mb;
+ break;
+ }
+ case NO_PCI_OPT_IDX:
+ opts->no_pci = true;
+ break;
+ case WAIT_FOR_RPC_OPT_IDX:
+ opts->delay_subsystem_init = true;
+ break;
+ case PCI_BLACKLIST_OPT_IDX:
+ if (opts->pci_whitelist) {
+ free(opts->pci_whitelist);
+ opts->pci_whitelist = NULL;
+ SPDK_ERRLOG("-B and -W cannot be used at the same time\n");
+ usage(app_usage);
+ goto out;
+ }
+
+ rc = app_opts_add_pci_addr(opts, &opts->pci_blacklist, optarg);
+ if (rc != 0) {
+ free(opts->pci_blacklist);
+ opts->pci_blacklist = NULL;
+ goto out;
+ }
+ break;
+ case LOGFLAG_OPT_IDX:
+#ifndef DEBUG
+ SPDK_ERRLOG("%s must be configured with --enable-debug for -L flag\n",
+ argv[0]);
+ usage(app_usage);
+ goto out;
+#else
+ rc = spdk_log_set_flag(optarg);
+ if (rc < 0) {
+ SPDK_ERRLOG("unknown flag\n");
+ usage(app_usage);
+ goto out;
+ }
+ opts->print_level = SPDK_LOG_DEBUG;
+ break;
+#endif
+ case HUGE_UNLINK_OPT_IDX:
+ opts->unlink_hugepage = true;
+ break;
+ case PCI_WHITELIST_OPT_IDX:
+ if (opts->pci_blacklist) {
+ free(opts->pci_blacklist);
+ opts->pci_blacklist = NULL;
+ SPDK_ERRLOG("-B and -W cannot be used at the same time\n");
+ usage(app_usage);
+ goto out;
+ }
+
+ rc = app_opts_add_pci_addr(opts, &opts->pci_whitelist, optarg);
+ if (rc != 0) {
+ free(opts->pci_whitelist);
+ opts->pci_whitelist = NULL;
+ goto out;
+ }
+ break;
+ case BASE_VIRTADDR_OPT_IDX:
+ tmp = spdk_strtoll(optarg, 0);
+ if (tmp <= 0) {
+ SPDK_ERRLOG("Invalid base-virtaddr %s\n", optarg);
+ usage(app_usage);
+ goto out;
+ }
+ opts->base_virtaddr = (uint64_t)tmp;
+ break;
+ case HUGE_DIR_OPT_IDX:
+ opts->hugedir = optarg;
+ break;
+ case IOVA_MODE_OPT_IDX:
+ opts->iova_mode = optarg;
+ break;
+ case NUM_TRACE_ENTRIES_OPT_IDX:
+ tmp = spdk_strtoll(optarg, 0);
+ if (tmp <= 0) {
+ SPDK_ERRLOG("Invalid num-trace-entries %s\n", optarg);
+ usage(app_usage);
+ goto out;
+ }
+ opts->num_entries = (uint64_t)tmp;
+ if (opts->num_entries & (opts->num_entries - 1)) {
+ SPDK_ERRLOG("num-trace-entries must be power of 2\n");
+ usage(app_usage);
+ goto out;
+ }
+ break;
+ case MAX_REACTOR_DELAY_OPT_IDX:
+ SPDK_ERRLOG("Deprecation warning: The maximum allowed latency parameter is no longer supported.\n");
+ break;
+ case VERSION_OPT_IDX:
+ printf(SPDK_VERSION_STRING"\n");
+ retval = SPDK_APP_PARSE_ARGS_HELP;
+ goto out;
+ case '?':
+ /*
+ * In the event getopt() above detects an option
+ * in argv that is NOT in the getopt_str,
+ * getopt() will return a '?' indicating failure.
+ */
+ usage(app_usage);
+ goto out;
+ default:
+ rc = app_parse(ch, optarg);
+ if (rc) {
+ SPDK_ERRLOG("Parsing application specific arguments failed: %d\n", rc);
+ goto out;
+ }
+ }
+ }
+
+ if (opts->config_file && opts->json_config_file) {
+ SPDK_ERRLOG("ERROR: Legacy config and JSON config can't be used together.\n");
+ goto out;
+ }
+
+ if (opts->json_config_file && opts->delay_subsystem_init) {
+ SPDK_ERRLOG("ERROR: JSON configuration file can't be used together with --wait-for-rpc.\n");
+ goto out;
+ }
+
+ /* TBD: Replace warning by failure when RPCs for startup are prepared. */
+ if (opts->config_file && opts->delay_subsystem_init) {
+ fprintf(stderr,
+ "WARNING: --wait-for-rpc and config file are used at the same time. "
+ "- Please be careful one options might overwrite others.\n");
+ }
+
+ retval = SPDK_APP_PARSE_ARGS_SUCCESS;
+out:
+ if (retval != SPDK_APP_PARSE_ARGS_SUCCESS) {
+ free(opts->pci_blacklist);
+ opts->pci_blacklist = NULL;
+ free(opts->pci_whitelist);
+ opts->pci_whitelist = NULL;
+ }
+ free(cmdline_short_opts);
+ free(cmdline_options);
+ return retval;
+}
+
+void
+spdk_app_usage(void)
+{
+ if (g_executable_name == NULL) {
+ SPDK_ERRLOG("%s not valid before calling spdk_app_parse_args()\n", __func__);
+ return;
+ }
+
+ usage(NULL);
+}
+
+static void
+rpc_framework_start_init_cpl(int rc, void *arg1)
+{
+ struct spdk_jsonrpc_request *request = arg1;
+ struct spdk_json_write_ctx *w;
+
+ assert(spdk_get_thread() == g_app_thread);
+
+ if (rc) {
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
+ "framework_initialization failed");
+ return;
+ }
+
+ spdk_rpc_set_state(SPDK_RPC_RUNTIME);
+ app_start_application();
+
+ w = spdk_jsonrpc_begin_result(request);
+ spdk_json_write_bool(w, true);
+ spdk_jsonrpc_end_result(request, w);
+}
+
+static void
+rpc_framework_start_init(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ if (params != NULL) {
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
+ "framework_start_init requires no parameters");
+ return;
+ }
+
+ spdk_subsystem_init(rpc_framework_start_init_cpl, request);
+}
+SPDK_RPC_REGISTER("framework_start_init", rpc_framework_start_init, SPDK_RPC_STARTUP)
+SPDK_RPC_REGISTER_ALIAS_DEPRECATED(framework_start_init, start_subsystem_init)
+
+struct subsystem_init_poller_ctx {
+ struct spdk_poller *init_poller;
+ struct spdk_jsonrpc_request *request;
+};
+
+static int
+rpc_subsystem_init_poller_ctx(void *ctx)
+{
+ struct spdk_json_write_ctx *w;
+ struct subsystem_init_poller_ctx *poller_ctx = ctx;
+
+ if (spdk_rpc_get_state() == SPDK_RPC_RUNTIME) {
+ w = spdk_jsonrpc_begin_result(poller_ctx->request);
+ spdk_json_write_bool(w, true);
+ spdk_jsonrpc_end_result(poller_ctx->request, w);
+ spdk_poller_unregister(&poller_ctx->init_poller);
+ free(poller_ctx);
+ }
+
+ return SPDK_POLLER_BUSY;
+}
+
+static void
+rpc_framework_wait_init(struct spdk_jsonrpc_request *request,
+ const struct spdk_json_val *params)
+{
+ struct spdk_json_write_ctx *w;
+ struct subsystem_init_poller_ctx *ctx;
+
+ if (spdk_rpc_get_state() == SPDK_RPC_RUNTIME) {
+ w = spdk_jsonrpc_begin_result(request);
+ spdk_json_write_bool(w, true);
+ spdk_jsonrpc_end_result(request, w);
+ } else {
+ ctx = malloc(sizeof(struct subsystem_init_poller_ctx));
+ if (ctx == NULL) {
+ spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR,
+ "Unable to allocate memory for the request context\n");
+ return;
+ }
+ ctx->request = request;
+ ctx->init_poller = SPDK_POLLER_REGISTER(rpc_subsystem_init_poller_ctx, ctx, 0);
+ }
+}
+SPDK_RPC_REGISTER("framework_wait_init", rpc_framework_wait_init,
+ SPDK_RPC_STARTUP | SPDK_RPC_RUNTIME)
+SPDK_RPC_REGISTER_ALIAS_DEPRECATED(framework_wait_init, wait_subsystem_init)
diff --git a/src/spdk/lib/event/json_config.c b/src/spdk/lib/event/json_config.c
new file mode 100644
index 000000000..69a95097a
--- /dev/null
+++ b/src/spdk/lib/event/json_config.c
@@ -0,0 +1,630 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+
+#include "spdk/util.h"
+#include "spdk/file.h"
+#include "spdk/log.h"
+#include "spdk/env.h"
+#include "spdk/thread.h"
+#include "spdk/jsonrpc.h"
+#include "spdk/rpc.h"
+
+#include "spdk_internal/event.h"
+#include "spdk_internal/log.h"
+
+#define SPDK_DEBUG_APP_CFG(...) SPDK_DEBUGLOG(SPDK_LOG_APP_CONFIG, __VA_ARGS__)
+
+/* JSON configuration format is as follows
+ *
+ * {
+ * "subsystems" : [ <<== *subsystems JSON array
+ * { <<== *subsystems_it array entry pointer (iterator)
+ * "subsystem": "<< SUBSYSTEM NAME >>",
+ * "config": [ <<== *config JSON array
+ * { <<== *config_it array entry pointer (iterator)
+ * "method": "<< METHOD NAME >>", <<== *method
+ * "params": { << PARAMS >> } <<== *params
+ * },
+ * << MORE "config" ARRY ENTRIES >>
+ * ]
+ * },
+ * << MORE "subsystems" ARRAY ENTRIES >>
+ * ]
+ *
+ * << ANYTHING ELSE IS IGNORRED IN ROOT OBJECT>>
+ * }
+ *
+ */
+
+struct load_json_config_ctx;
+typedef void (*client_resp_handler)(struct load_json_config_ctx *,
+ struct spdk_jsonrpc_client_response *);
+
+#define RPC_SOCKET_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
+
+/* 1s connections timeout */
+#define RPC_CLIENT_CONNECT_TIMEOUT_US (1U * 1000U * 1000U)
+
+/*
+ * Currently there is no timeout in SPDK for any RPC command. This result that
+ * we can't put a hard limit during configuration load as it most likely randomly fail.
+ * So just print WARNLOG every 10s. */
+#define RPC_CLIENT_REQUEST_TIMEOUT_US (10U * 1000 * 1000)
+
+struct load_json_config_ctx {
+ /* Thread used during configuration. */
+ struct spdk_thread *thread;
+ spdk_subsystem_init_fn cb_fn;
+ void *cb_arg;
+ bool stop_on_error;
+
+ /* Current subsystem */
+ struct spdk_json_val *subsystems; /* "subsystems" array */
+ struct spdk_json_val *subsystems_it; /* current subsystem array position in "subsystems" array */
+
+ struct spdk_json_val *subsystem_name; /* current subsystem name */
+
+ /* Current "config" entry we are processing */
+ struct spdk_json_val *config; /* "config" array */
+ struct spdk_json_val *config_it; /* current config position in "config" array */
+
+ /* Current request id we are sending. */
+ uint32_t rpc_request_id;
+
+ /* Whole configuration file read and parsed. */
+ size_t json_data_size;
+ char *json_data;
+
+ size_t values_cnt;
+ struct spdk_json_val *values;
+
+ char rpc_socket_path_temp[RPC_SOCKET_PATH_MAX + 1];
+
+ struct spdk_jsonrpc_client *client_conn;
+ struct spdk_poller *client_conn_poller;
+
+ client_resp_handler client_resp_cb;
+
+ /* Timeout for current RPC client action. */
+ uint64_t timeout;
+};
+
+static void app_json_config_load_subsystem(void *_ctx);
+
+static void
+app_json_config_load_done(struct load_json_config_ctx *ctx, int rc)
+{
+ spdk_poller_unregister(&ctx->client_conn_poller);
+ if (ctx->client_conn != NULL) {
+ spdk_jsonrpc_client_close(ctx->client_conn);
+ }
+
+ spdk_rpc_finish();
+
+ SPDK_DEBUG_APP_CFG("Config load finished with rc %d\n", rc);
+ ctx->cb_fn(rc, ctx->cb_arg);
+
+ free(ctx->json_data);
+ free(ctx->values);
+ free(ctx);
+}
+
+static void
+rpc_client_set_timeout(struct load_json_config_ctx *ctx, uint64_t timeout_us)
+{
+ ctx->timeout = spdk_get_ticks() + timeout_us * spdk_get_ticks_hz() / (1000 * 1000);
+}
+
+static int
+rpc_client_check_timeout(struct load_json_config_ctx *ctx)
+{
+ if (ctx->timeout < spdk_get_ticks()) {
+ SPDK_WARNLOG("RPC client command timeout.\n");
+ return -ETIMEDOUT;
+ }
+
+ return 0;
+}
+
+struct json_write_buf {
+ char data[1024];
+ unsigned cur_off;
+};
+
+static int
+json_write_stdout(void *cb_ctx, const void *data, size_t size)
+{
+ struct json_write_buf *buf = cb_ctx;
+ size_t rc;
+
+ rc = snprintf(buf->data + buf->cur_off, sizeof(buf->data) - buf->cur_off,
+ "%s", (const char *)data);
+ if (rc > 0) {
+ buf->cur_off += rc;
+ }
+ return rc == size ? 0 : -1;
+}
+
+static int
+rpc_client_poller(void *arg)
+{
+ struct load_json_config_ctx *ctx = arg;
+ struct spdk_jsonrpc_client_response *resp;
+ client_resp_handler cb;
+ int rc;
+
+ assert(spdk_get_thread() == ctx->thread);
+
+ rc = spdk_jsonrpc_client_poll(ctx->client_conn, 0);
+ if (rc == 0) {
+ rc = rpc_client_check_timeout(ctx);
+ if (rc == -ETIMEDOUT) {
+ rpc_client_set_timeout(ctx, RPC_CLIENT_REQUEST_TIMEOUT_US);
+ rc = 0;
+ }
+ }
+
+ if (rc == 0) {
+ /* No response yet */
+ return SPDK_POLLER_BUSY;
+ } else if (rc < 0) {
+ app_json_config_load_done(ctx, rc);
+ return SPDK_POLLER_BUSY;
+ }
+
+ resp = spdk_jsonrpc_client_get_response(ctx->client_conn);
+ assert(resp);
+
+ if (resp->error) {
+ struct json_write_buf buf = {};
+ struct spdk_json_write_ctx *w = spdk_json_write_begin(json_write_stdout,
+ &buf, SPDK_JSON_PARSE_FLAG_DECODE_IN_PLACE);
+
+ if (w == NULL) {
+ SPDK_ERRLOG("error response: (?)\n");
+ } else {
+ spdk_json_write_val(w, resp->error);
+ spdk_json_write_end(w);
+ SPDK_ERRLOG("error response: \n%s\n", buf.data);
+ }
+ }
+
+ if (resp->error && ctx->stop_on_error) {
+ spdk_jsonrpc_client_free_response(resp);
+ app_json_config_load_done(ctx, -EINVAL);
+ } else {
+ /* We have response so we must have callback for it. */
+ cb = ctx->client_resp_cb;
+ assert(cb != NULL);
+
+ /* Mark we are done with this handler. */
+ ctx->client_resp_cb = NULL;
+ cb(ctx, resp);
+ }
+
+
+ return SPDK_POLLER_BUSY;
+}
+
+static int
+rpc_client_connect_poller(void *_ctx)
+{
+ struct load_json_config_ctx *ctx = _ctx;
+ int rc;
+
+ rc = spdk_jsonrpc_client_poll(ctx->client_conn, 0);
+ if (rc != -ENOTCONN) {
+ /* We are connected. Start regular poller and issue first request */
+ spdk_poller_unregister(&ctx->client_conn_poller);
+ ctx->client_conn_poller = SPDK_POLLER_REGISTER(rpc_client_poller, ctx, 100);
+ app_json_config_load_subsystem(ctx);
+ } else {
+ rc = rpc_client_check_timeout(ctx);
+ if (rc) {
+ app_json_config_load_done(ctx, rc);
+ }
+
+ return SPDK_POLLER_IDLE;
+ }
+
+ return SPDK_POLLER_BUSY;
+}
+
+static int
+client_send_request(struct load_json_config_ctx *ctx, struct spdk_jsonrpc_client_request *request,
+ client_resp_handler client_resp_cb)
+{
+ int rc;
+
+ assert(spdk_get_thread() == ctx->thread);
+
+ ctx->client_resp_cb = client_resp_cb;
+ rpc_client_set_timeout(ctx, RPC_CLIENT_REQUEST_TIMEOUT_US);
+ rc = spdk_jsonrpc_client_send_request(ctx->client_conn, request);
+
+ if (rc) {
+ SPDK_DEBUG_APP_CFG("Sending request to client failed (%d)\n", rc);
+ }
+
+ return rc;
+}
+
+static int
+cap_string(const struct spdk_json_val *val, void *out)
+{
+ const struct spdk_json_val **vptr = out;
+
+ if (val->type != SPDK_JSON_VAL_STRING) {
+ return -EINVAL;
+ }
+
+ *vptr = val;
+ return 0;
+}
+
+static int
+cap_object(const struct spdk_json_val *val, void *out)
+{
+ const struct spdk_json_val **vptr = out;
+
+ if (val->type != SPDK_JSON_VAL_OBJECT_BEGIN) {
+ return -EINVAL;
+ }
+
+ *vptr = val;
+ return 0;
+}
+
+
+static int
+cap_array_or_null(const struct spdk_json_val *val, void *out)
+{
+ const struct spdk_json_val **vptr = out;
+
+ if (val->type != SPDK_JSON_VAL_ARRAY_BEGIN && val->type != SPDK_JSON_VAL_NULL) {
+ return -EINVAL;
+ }
+
+ *vptr = val;
+ return 0;
+}
+
+struct config_entry {
+ char *method;
+ struct spdk_json_val *params;
+};
+
+static struct spdk_json_object_decoder jsonrpc_cmd_decoders[] = {
+ {"method", offsetof(struct config_entry, method), spdk_json_decode_string},
+ {"params", offsetof(struct config_entry, params), cap_object, true}
+};
+
+static void app_json_config_load_subsystem_config_entry(void *_ctx);
+
+static void
+app_json_config_load_subsystem_config_entry_next(struct load_json_config_ctx *ctx,
+ struct spdk_jsonrpc_client_response *resp)
+{
+ /* Don't care about the response */
+ spdk_jsonrpc_client_free_response(resp);
+
+ ctx->config_it = spdk_json_next(ctx->config_it);
+ app_json_config_load_subsystem_config_entry(ctx);
+}
+
+/* Load "config" entry */
+static void
+app_json_config_load_subsystem_config_entry(void *_ctx)
+{
+ struct load_json_config_ctx *ctx = _ctx;
+ struct spdk_jsonrpc_client_request *rpc_request;
+ struct spdk_json_write_ctx *w;
+ struct config_entry cfg = {};
+ struct spdk_json_val *params_end;
+ size_t params_len;
+ int rc;
+
+ if (ctx->config_it == NULL) {
+ SPDK_DEBUG_APP_CFG("Subsystem '%.*s': configuration done.\n", ctx->subsystem_name->len,
+ (char *)ctx->subsystem_name->start);
+ ctx->subsystems_it = spdk_json_next(ctx->subsystems_it);
+ /* Invoke later to avoid recurrency */
+ spdk_thread_send_msg(ctx->thread, app_json_config_load_subsystem, ctx);
+ return;
+ }
+
+ if (spdk_json_decode_object(ctx->config_it, jsonrpc_cmd_decoders,
+ SPDK_COUNTOF(jsonrpc_cmd_decoders), &cfg)) {
+ params_end = spdk_json_next(ctx->config_it);
+ assert(params_end != NULL);
+ params_len = params_end->start - ctx->config->start + 1;
+ SPDK_ERRLOG("Failed to decode config entry: %.*s!\n", (int)params_len, (char *)ctx->config_it);
+ app_json_config_load_done(ctx, -EINVAL);
+ goto out;
+ }
+
+ rc = spdk_rpc_is_method_allowed(cfg.method, spdk_rpc_get_state());
+ if (rc == -EPERM) {
+ SPDK_DEBUG_APP_CFG("Method '%s' not allowed -> skipping\n", cfg.method);
+ /* Invoke later to avoid recurrency */
+ ctx->config_it = spdk_json_next(ctx->config_it);
+ spdk_thread_send_msg(ctx->thread, app_json_config_load_subsystem_config_entry, ctx);
+ goto out;
+ }
+
+ /* Get _END by skipping params and going back by one element. */
+ params_end = cfg.params + spdk_json_val_len(cfg.params) - 1;
+
+ /* Need to add one character to include '}' */
+ params_len = params_end->start - cfg.params->start + 1;
+
+ SPDK_DEBUG_APP_CFG("\tmethod: %s\n", cfg.method);
+ SPDK_DEBUG_APP_CFG("\tparams: %.*s\n", (int)params_len, (char *)cfg.params->start);
+
+ rpc_request = spdk_jsonrpc_client_create_request();
+ if (!rpc_request) {
+ app_json_config_load_done(ctx, -errno);
+ goto out;
+ }
+
+ w = spdk_jsonrpc_begin_request(rpc_request, ctx->rpc_request_id, NULL);
+ if (!w) {
+ spdk_jsonrpc_client_free_request(rpc_request);
+ app_json_config_load_done(ctx, -ENOMEM);
+ goto out;
+ }
+
+ spdk_json_write_named_string(w, "method", cfg.method);
+
+ /* No need to parse "params". Just dump the whole content of "params"
+ * directly into the request and let the remote side verify it. */
+ spdk_json_write_name(w, "params");
+ spdk_json_write_val_raw(w, cfg.params->start, params_len);
+ spdk_jsonrpc_end_request(rpc_request, w);
+
+ rc = client_send_request(ctx, rpc_request, app_json_config_load_subsystem_config_entry_next);
+ if (rc != 0) {
+ app_json_config_load_done(ctx, -rc);
+ goto out;
+ }
+out:
+ free(cfg.method);
+}
+
+static void
+subsystem_init_done(int rc, void *arg1)
+{
+ struct load_json_config_ctx *ctx = arg1;
+
+ if (rc) {
+ app_json_config_load_done(ctx, rc);
+ return;
+ }
+
+ spdk_rpc_set_state(SPDK_RPC_RUNTIME);
+ /* Another round. This time for RUNTIME methods */
+ SPDK_DEBUG_APP_CFG("'framework_start_init' done - continuing configuration\n");
+
+ assert(ctx != NULL);
+ if (ctx->subsystems) {
+ ctx->subsystems_it = spdk_json_array_first(ctx->subsystems);
+ }
+
+ app_json_config_load_subsystem(ctx);
+}
+
+static struct spdk_json_object_decoder subsystem_decoders[] = {
+ {"subsystem", offsetof(struct load_json_config_ctx, subsystem_name), cap_string},
+ {"config", offsetof(struct load_json_config_ctx, config), cap_array_or_null}
+};
+
+/*
+ * Start loading subsystem pointed by ctx->subsystems_it. This must point to the
+ * beginning of the "subsystem" object in "subsystems" array or be NULL. If it is
+ * NULL then no more subsystems to load.
+ *
+ * There are two iterations:
+ *
+ * In first iteration only STARTUP RPC methods are used, other methods are ignored. When
+ * allsubsystems are walked the ctx->subsystems_it became NULL and "framework_start_init"
+ * is called to let the SPDK move to RUNTIME state (initialize all subsystems) and
+ * second iteration begins.
+ *
+ * In second iteration "subsystems" array is walked through again, this time only
+ * RUNTIME RPC methods are used. When ctx->subsystems_it became NULL second time it
+ * indicate that there is no more subsystems to load. The cb_fn is called to finish
+ * configuration.
+ */
+static void
+app_json_config_load_subsystem(void *_ctx)
+{
+ struct load_json_config_ctx *ctx = _ctx;
+
+ if (ctx->subsystems_it == NULL) {
+ if (spdk_rpc_get_state() == SPDK_RPC_STARTUP) {
+ SPDK_DEBUG_APP_CFG("No more entries for current state, calling 'framework_start_init'\n");
+ spdk_subsystem_init(subsystem_init_done, ctx);
+ } else {
+ app_json_config_load_done(ctx, 0);
+ }
+
+ return;
+ }
+
+ /* Capture subsystem name and config array */
+ if (spdk_json_decode_object(ctx->subsystems_it, subsystem_decoders,
+ SPDK_COUNTOF(subsystem_decoders), ctx)) {
+ SPDK_ERRLOG("Failed to parse subsystem configuration\n");
+ app_json_config_load_done(ctx, -EINVAL);
+ return;
+ }
+
+ SPDK_DEBUG_APP_CFG("Loading subsystem '%.*s' configuration\n", ctx->subsystem_name->len,
+ (char *)ctx->subsystem_name->start);
+
+ /* Get 'config' array first configuration entry */
+ ctx->config_it = spdk_json_array_first(ctx->config);
+ app_json_config_load_subsystem_config_entry(ctx);
+}
+
+static void *
+read_file(const char *filename, size_t *size)
+{
+ FILE *file = fopen(filename, "r");
+ void *data;
+
+ if (file == NULL) {
+ /* errno is set by fopen */
+ return NULL;
+ }
+
+ data = spdk_posix_file_load(file, size);
+ fclose(file);
+ return data;
+}
+
+static int
+app_json_config_read(const char *config_file, struct load_json_config_ctx *ctx)
+{
+ struct spdk_json_val *values = NULL;
+ void *json = NULL, *end;
+ ssize_t values_cnt, rc;
+ size_t json_size;
+
+ json = read_file(config_file, &json_size);
+ if (!json) {
+ return -errno;
+ }
+
+ rc = spdk_json_parse(json, json_size, NULL, 0, &end,
+ SPDK_JSON_PARSE_FLAG_ALLOW_COMMENTS);
+ if (rc < 0) {
+ SPDK_ERRLOG("Parsing JSON configuration failed (%zd)\n", rc);
+ goto err;
+ }
+
+ values_cnt = rc;
+ values = calloc(values_cnt, sizeof(struct spdk_json_val));
+ if (values == NULL) {
+ SPDK_ERRLOG("Out of memory\n");
+ goto err;
+ }
+
+ rc = spdk_json_parse(json, json_size, values, values_cnt, &end,
+ SPDK_JSON_PARSE_FLAG_ALLOW_COMMENTS);
+ if (rc != values_cnt) {
+ SPDK_ERRLOG("Parsing JSON configuration failed (%zd)\n", rc);
+ goto err;
+ }
+
+ ctx->json_data = json;
+ ctx->json_data_size = json_size;
+
+ ctx->values = values;
+ ctx->values_cnt = values_cnt;
+
+ return 0;
+err:
+ free(json);
+ free(values);
+ return rc;
+}
+
+void
+spdk_app_json_config_load(const char *json_config_file, const char *rpc_addr,
+ spdk_subsystem_init_fn cb_fn, void *cb_arg,
+ bool stop_on_error)
+{
+ struct load_json_config_ctx *ctx = calloc(1, sizeof(*ctx));
+ int rc;
+
+ assert(cb_fn);
+ if (!ctx) {
+ cb_fn(-ENOMEM, cb_arg);
+ return;
+ }
+
+ ctx->cb_fn = cb_fn;
+ ctx->cb_arg = cb_arg;
+ ctx->stop_on_error = stop_on_error;
+ ctx->thread = spdk_get_thread();
+
+ rc = app_json_config_read(json_config_file, ctx);
+ if (rc) {
+ goto fail;
+ }
+
+ /* Capture subsystems array */
+ rc = spdk_json_find_array(ctx->values, "subsystems", NULL, &ctx->subsystems);
+ if (rc) {
+ SPDK_WARNLOG("No 'subsystems' key JSON configuration file.\n");
+ } else {
+ /* Get first subsystem */
+ ctx->subsystems_it = spdk_json_array_first(ctx->subsystems);
+ if (ctx->subsystems_it == NULL) {
+ SPDK_NOTICELOG("'subsystems' configuration is empty\n");
+ }
+ }
+
+ /* If rpc_addr is not an Unix socket use default address as prefix. */
+ if (rpc_addr == NULL || rpc_addr[0] != '/') {
+ rpc_addr = SPDK_DEFAULT_RPC_ADDR;
+ }
+
+ /* FIXME: rpc client should use socketpair() instead of this temporary socket nonsense */
+ rc = snprintf(ctx->rpc_socket_path_temp, sizeof(ctx->rpc_socket_path_temp), "%s.%d_config",
+ rpc_addr, getpid());
+ if (rc >= (int)sizeof(ctx->rpc_socket_path_temp)) {
+ SPDK_ERRLOG("Socket name create failed\n");
+ goto fail;
+ }
+
+ /* FIXME: spdk_rpc_initialize() function should return error code. */
+ spdk_rpc_initialize(ctx->rpc_socket_path_temp);
+ ctx->client_conn = spdk_jsonrpc_client_connect(ctx->rpc_socket_path_temp, AF_UNIX);
+ if (ctx->client_conn == NULL) {
+ SPDK_ERRLOG("Failed to connect to '%s'\n", ctx->rpc_socket_path_temp);
+ goto fail;
+ }
+
+ rpc_client_set_timeout(ctx, RPC_CLIENT_CONNECT_TIMEOUT_US);
+ ctx->client_conn_poller = SPDK_POLLER_REGISTER(rpc_client_connect_poller, ctx, 100);
+ return;
+
+fail:
+ app_json_config_load_done(ctx, -EINVAL);
+}
+
+SPDK_LOG_REGISTER_COMPONENT("app_config", SPDK_LOG_APP_CONFIG)
diff --git a/src/spdk/lib/event/reactor.c b/src/spdk/lib/event/reactor.c
new file mode 100644
index 000000000..cda4a32b1
--- /dev/null
+++ b/src/spdk/lib/event/reactor.c
@@ -0,0 +1,664 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+#include "spdk/likely.h"
+
+#include "spdk_internal/event.h"
+#include "spdk_internal/log.h"
+#include "spdk_internal/thread.h"
+
+#include "spdk/log.h"
+#include "spdk/thread.h"
+#include "spdk/env.h"
+#include "spdk/util.h"
+
+#ifdef __linux__
+#include <sys/prctl.h>
+#endif
+
+#ifdef __FreeBSD__
+#include <pthread_np.h>
+#endif
+
+#define SPDK_EVENT_BATCH_SIZE 8
+
+static struct spdk_reactor *g_reactors;
+static struct spdk_cpuset g_reactor_core_mask;
+static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED;
+
+static bool g_framework_context_switch_monitor_enabled = true;
+
+static struct spdk_mempool *g_spdk_event_mempool = NULL;
+
+static void
+reactor_construct(struct spdk_reactor *reactor, uint32_t lcore)
+{
+ reactor->lcore = lcore;
+ reactor->flags.is_valid = true;
+
+ TAILQ_INIT(&reactor->threads);
+ reactor->thread_count = 0;
+
+ reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
+ assert(reactor->events != NULL);
+}
+
+struct spdk_reactor *
+spdk_reactor_get(uint32_t lcore)
+{
+ struct spdk_reactor *reactor;
+
+ if (g_reactors == NULL) {
+ SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n");
+ return NULL;
+ }
+
+ reactor = &g_reactors[lcore];
+
+ if (reactor->flags.is_valid == false) {
+ return NULL;
+ }
+
+ return reactor;
+}
+
+static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op);
+static bool reactor_thread_op_supported(enum spdk_thread_op op);
+
+int
+spdk_reactors_init(void)
+{
+ int rc;
+ uint32_t i, last_core;
+ char mempool_name[32];
+
+ snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid());
+ g_spdk_event_mempool = spdk_mempool_create(mempool_name,
+ 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
+ sizeof(struct spdk_event),
+ SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
+ SPDK_ENV_SOCKET_ID_ANY);
+
+ if (g_spdk_event_mempool == NULL) {
+ SPDK_ERRLOG("spdk_event_mempool creation failed\n");
+ return -1;
+ }
+
+ /* struct spdk_reactor must be aligned on 64 byte boundary */
+ last_core = spdk_env_get_last_core();
+ rc = posix_memalign((void **)&g_reactors, 64,
+ (last_core + 1) * sizeof(struct spdk_reactor));
+ if (rc != 0) {
+ SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
+ last_core + 1);
+ spdk_mempool_free(g_spdk_event_mempool);
+ return -1;
+ }
+
+ memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor));
+
+ spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported,
+ sizeof(struct spdk_lw_thread));
+
+ SPDK_ENV_FOREACH_CORE(i) {
+ reactor_construct(&g_reactors[i], i);
+ }
+
+ g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
+
+ return 0;
+}
+
+void
+spdk_reactors_fini(void)
+{
+ uint32_t i;
+ struct spdk_reactor *reactor;
+
+ if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) {
+ return;
+ }
+
+ spdk_thread_lib_fini();
+
+ SPDK_ENV_FOREACH_CORE(i) {
+ reactor = spdk_reactor_get(i);
+ assert(reactor != NULL);
+ assert(reactor->thread_count == 0);
+ if (reactor->events != NULL) {
+ spdk_ring_free(reactor->events);
+ }
+ }
+
+ spdk_mempool_free(g_spdk_event_mempool);
+
+ free(g_reactors);
+ g_reactors = NULL;
+}
+
+struct spdk_event *
+spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
+{
+ struct spdk_event *event = NULL;
+ struct spdk_reactor *reactor = spdk_reactor_get(lcore);
+
+ if (!reactor) {
+ assert(false);
+ return NULL;
+ }
+
+ event = spdk_mempool_get(g_spdk_event_mempool);
+ if (event == NULL) {
+ assert(false);
+ return NULL;
+ }
+
+ event->lcore = lcore;
+ event->fn = fn;
+ event->arg1 = arg1;
+ event->arg2 = arg2;
+
+ return event;
+}
+
+void
+spdk_event_call(struct spdk_event *event)
+{
+ int rc;
+ struct spdk_reactor *reactor;
+
+ reactor = spdk_reactor_get(event->lcore);
+
+ assert(reactor != NULL);
+ assert(reactor->events != NULL);
+
+ rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL);
+ if (rc != 1) {
+ assert(false);
+ }
+}
+
+static inline uint32_t
+event_queue_run_batch(struct spdk_reactor *reactor)
+{
+ unsigned count, i;
+ void *events[SPDK_EVENT_BATCH_SIZE];
+ struct spdk_thread *thread;
+ struct spdk_lw_thread *lw_thread;
+
+#ifdef DEBUG
+ /*
+ * spdk_ring_dequeue() fills events and returns how many entries it wrote,
+ * so we will never actually read uninitialized data from events, but just to be sure
+ * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
+ */
+ memset(events, 0, sizeof(events));
+#endif
+
+ count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
+ if (count == 0) {
+ return 0;
+ }
+
+ /* Execute the events. There are still some remaining events
+ * that must occur on an SPDK thread. To accomodate those, try to
+ * run them on the first thread in the list, if it exists. */
+ lw_thread = TAILQ_FIRST(&reactor->threads);
+ if (lw_thread) {
+ thread = spdk_thread_get_from_ctx(lw_thread);
+ } else {
+ thread = NULL;
+ }
+
+ spdk_set_thread(thread);
+
+ for (i = 0; i < count; i++) {
+ struct spdk_event *event = events[i];
+
+ assert(event != NULL);
+ event->fn(event->arg1, event->arg2);
+ }
+
+ spdk_set_thread(NULL);
+
+ spdk_mempool_put_bulk(g_spdk_event_mempool, events, count);
+
+ return count;
+}
+
+/* 1s */
+#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000
+
+static int
+get_rusage(struct spdk_reactor *reactor)
+{
+ struct rusage rusage;
+
+ if (getrusage(RUSAGE_THREAD, &rusage) != 0) {
+ return -1;
+ }
+
+ if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) {
+ SPDK_INFOLOG(SPDK_LOG_REACTOR,
+ "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
+ reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw,
+ rusage.ru_nivcsw - reactor->rusage.ru_nivcsw);
+ }
+ reactor->rusage = rusage;
+
+ return -1;
+}
+
+void
+spdk_framework_enable_context_switch_monitor(bool enable)
+{
+ /* This global is being read by multiple threads, so this isn't
+ * strictly thread safe. However, we're toggling between true and
+ * false here, and if a thread sees the value update later than it
+ * should, it's no big deal. */
+ g_framework_context_switch_monitor_enabled = enable;
+}
+
+bool
+spdk_framework_context_switch_monitor_enabled(void)
+{
+ return g_framework_context_switch_monitor_enabled;
+}
+
+static void
+_set_thread_name(const char *thread_name)
+{
+#if defined(__linux__)
+ prctl(PR_SET_NAME, thread_name, 0, 0, 0);
+#elif defined(__FreeBSD__)
+ pthread_set_name_np(pthread_self(), thread_name);
+#else
+#error missing platform support for thread name
+#endif
+}
+
+static int _reactor_schedule_thread(struct spdk_thread *thread);
+static uint64_t g_rusage_period;
+
+static void
+_reactor_run(struct spdk_reactor *reactor)
+{
+ struct spdk_thread *thread;
+ struct spdk_lw_thread *lw_thread, *tmp;
+ uint64_t now;
+ int rc;
+
+ event_queue_run_batch(reactor);
+
+ TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
+ thread = spdk_thread_get_from_ctx(lw_thread);
+ rc = spdk_thread_poll(thread, 0, reactor->tsc_last);
+
+ now = spdk_thread_get_last_tsc(thread);
+ if (rc == 0) {
+ reactor->idle_tsc += now - reactor->tsc_last;
+ } else if (rc > 0) {
+ reactor->busy_tsc += now - reactor->tsc_last;
+ }
+ reactor->tsc_last = now;
+
+ if (spdk_unlikely(lw_thread->resched)) {
+ lw_thread->resched = false;
+ TAILQ_REMOVE(&reactor->threads, lw_thread, link);
+ assert(reactor->thread_count > 0);
+ reactor->thread_count--;
+ _reactor_schedule_thread(thread);
+ continue;
+ }
+
+ if (spdk_unlikely(spdk_thread_is_exited(thread) &&
+ spdk_thread_is_idle(thread))) {
+ TAILQ_REMOVE(&reactor->threads, lw_thread, link);
+ assert(reactor->thread_count > 0);
+ reactor->thread_count--;
+ spdk_thread_destroy(thread);
+ continue;
+ }
+ }
+
+ if (g_framework_context_switch_monitor_enabled) {
+ if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) {
+ get_rusage(reactor);
+ reactor->last_rusage = reactor->tsc_last;
+ }
+ }
+}
+
+static int
+reactor_run(void *arg)
+{
+ struct spdk_reactor *reactor = arg;
+ struct spdk_thread *thread;
+ struct spdk_lw_thread *lw_thread, *tmp;
+ char thread_name[32];
+
+ SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
+
+ /* Rename the POSIX thread because the reactor is tied to the POSIX
+ * thread in the SPDK event library.
+ */
+ snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
+ _set_thread_name(thread_name);
+
+ reactor->tsc_last = spdk_get_ticks();
+
+ while (1) {
+ _reactor_run(reactor);
+
+ if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
+ break;
+ }
+ }
+
+ TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
+ thread = spdk_thread_get_from_ctx(lw_thread);
+ spdk_set_thread(thread);
+ spdk_thread_exit(thread);
+ }
+
+ while (!TAILQ_EMPTY(&reactor->threads)) {
+ TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
+ thread = spdk_thread_get_from_ctx(lw_thread);
+ spdk_set_thread(thread);
+ if (spdk_thread_is_exited(thread)) {
+ TAILQ_REMOVE(&reactor->threads, lw_thread, link);
+ assert(reactor->thread_count > 0);
+ reactor->thread_count--;
+ spdk_thread_destroy(thread);
+ } else {
+ spdk_thread_poll(thread, 0, 0);
+ }
+ }
+ }
+
+ return 0;
+}
+
+int
+spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
+{
+ int ret;
+ struct spdk_cpuset *validmask;
+
+ ret = spdk_cpuset_parse(cpumask, mask);
+ if (ret < 0) {
+ return ret;
+ }
+
+ validmask = spdk_app_get_core_mask();
+ spdk_cpuset_and(cpumask, validmask);
+
+ return 0;
+}
+
+struct spdk_cpuset *
+spdk_app_get_core_mask(void)
+{
+ return &g_reactor_core_mask;
+}
+
+void
+spdk_reactors_start(void)
+{
+ struct spdk_reactor *reactor;
+ struct spdk_cpuset tmp_cpumask = {};
+ uint32_t i, current_core;
+ int rc;
+ char thread_name[32];
+
+ g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC;
+ g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
+
+ current_core = spdk_env_get_current_core();
+ SPDK_ENV_FOREACH_CORE(i) {
+ if (i != current_core) {
+ reactor = spdk_reactor_get(i);
+ if (reactor == NULL) {
+ continue;
+ }
+
+ rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor);
+ if (rc < 0) {
+ SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore);
+ assert(false);
+ return;
+ }
+
+ /* For now, for each reactor spawn one thread. */
+ snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
+
+ spdk_cpuset_zero(&tmp_cpumask);
+ spdk_cpuset_set_cpu(&tmp_cpumask, i, true);
+
+ spdk_thread_create(thread_name, &tmp_cpumask);
+ }
+ spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true);
+ }
+
+ /* Start the master reactor */
+ reactor = spdk_reactor_get(current_core);
+ assert(reactor != NULL);
+ reactor_run(reactor);
+
+ spdk_env_thread_wait_all();
+
+ g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
+}
+
+void
+spdk_reactors_stop(void *arg1)
+{
+ g_reactor_state = SPDK_REACTOR_STATE_EXITING;
+}
+
+static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER;
+static uint32_t g_next_core = UINT32_MAX;
+
+static void
+_schedule_thread(void *arg1, void *arg2)
+{
+ struct spdk_lw_thread *lw_thread = arg1;
+ struct spdk_thread *thread;
+ struct spdk_cpuset *cpumask;
+ struct spdk_reactor *reactor;
+ uint32_t current_core;
+
+ current_core = spdk_env_get_current_core();
+
+ thread = spdk_thread_get_from_ctx(lw_thread);
+ cpumask = spdk_thread_get_cpumask(thread);
+ if (!spdk_cpuset_get_cpu(cpumask, current_core)) {
+ SPDK_ERRLOG("Thread was scheduled to the wrong core %d\n", current_core);
+ assert(false);
+ }
+
+ reactor = spdk_reactor_get(current_core);
+ assert(reactor != NULL);
+
+ TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link);
+ reactor->thread_count++;
+}
+
+static int
+_reactor_schedule_thread(struct spdk_thread *thread)
+{
+ uint32_t core;
+ struct spdk_lw_thread *lw_thread;
+ struct spdk_event *evt = NULL;
+ struct spdk_cpuset *cpumask;
+ uint32_t i;
+
+ cpumask = spdk_thread_get_cpumask(thread);
+
+ lw_thread = spdk_thread_get_ctx(thread);
+ assert(lw_thread != NULL);
+ memset(lw_thread, 0, sizeof(*lw_thread));
+
+ pthread_mutex_lock(&g_scheduler_mtx);
+ for (i = 0; i < spdk_env_get_core_count(); i++) {
+ if (g_next_core > spdk_env_get_last_core()) {
+ g_next_core = spdk_env_get_first_core();
+ }
+ core = g_next_core;
+ g_next_core = spdk_env_get_next_core(g_next_core);
+
+ if (spdk_cpuset_get_cpu(cpumask, core)) {
+ evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&g_scheduler_mtx);
+
+ assert(evt != NULL);
+ if (evt == NULL) {
+ SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n");
+ return -1;
+ }
+
+ lw_thread->tsc_start = spdk_get_ticks();
+
+ spdk_event_call(evt);
+
+ return 0;
+}
+
+static void
+_reactor_request_thread_reschedule(struct spdk_thread *thread)
+{
+ struct spdk_lw_thread *lw_thread;
+
+ assert(thread == spdk_get_thread());
+
+ lw_thread = spdk_thread_get_ctx(thread);
+
+ assert(lw_thread != NULL);
+
+ lw_thread->resched = true;
+}
+
+static int
+reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op)
+{
+ switch (op) {
+ case SPDK_THREAD_OP_NEW:
+ return _reactor_schedule_thread(thread);
+ case SPDK_THREAD_OP_RESCHED:
+ _reactor_request_thread_reschedule(thread);
+ return 0;
+ default:
+ return -ENOTSUP;
+ }
+}
+
+static bool
+reactor_thread_op_supported(enum spdk_thread_op op)
+{
+ switch (op) {
+ case SPDK_THREAD_OP_NEW:
+ case SPDK_THREAD_OP_RESCHED:
+ return true;
+ default:
+ return false;
+ }
+}
+
+struct call_reactor {
+ uint32_t cur_core;
+ spdk_event_fn fn;
+ void *arg1;
+ void *arg2;
+
+ uint32_t orig_core;
+ spdk_event_fn cpl;
+};
+
+static void
+on_reactor(void *arg1, void *arg2)
+{
+ struct call_reactor *cr = arg1;
+ struct spdk_event *evt;
+
+ cr->fn(cr->arg1, cr->arg2);
+
+ cr->cur_core = spdk_env_get_next_core(cr->cur_core);
+
+ if (cr->cur_core > spdk_env_get_last_core()) {
+ SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Completed reactor iteration\n");
+
+ evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2);
+ free(cr);
+ } else {
+ SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Continuing reactor iteration to %d\n",
+ cr->cur_core);
+
+ evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL);
+ }
+ assert(evt != NULL);
+ spdk_event_call(evt);
+}
+
+void
+spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl)
+{
+ struct call_reactor *cr;
+ struct spdk_event *evt;
+
+ cr = calloc(1, sizeof(*cr));
+ if (!cr) {
+ SPDK_ERRLOG("Unable to perform reactor iteration\n");
+ cpl(arg1, arg2);
+ return;
+ }
+
+ cr->fn = fn;
+ cr->arg1 = arg1;
+ cr->arg2 = arg2;
+ cr->cpl = cpl;
+ cr->orig_core = spdk_env_get_current_core();
+ cr->cur_core = spdk_env_get_first_core();
+
+ SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Starting reactor iteration from %d\n", cr->orig_core);
+
+ evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL);
+ assert(evt != NULL);
+
+ spdk_event_call(evt);
+}
+
+SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR)
diff --git a/src/spdk/lib/event/rpc.c b/src/spdk/lib/event/rpc.c
new file mode 100644
index 000000000..a42d5ebeb
--- /dev/null
+++ b/src/spdk/lib/event/rpc.c
@@ -0,0 +1,87 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+
+#include "spdk/conf.h"
+#include "spdk/env.h"
+#include "spdk/thread.h"
+#include "spdk/log.h"
+#include "spdk/rpc.h"
+
+#include "spdk_internal/event.h"
+
+#define RPC_SELECT_INTERVAL 4000 /* 4ms */
+
+static struct spdk_poller *g_rpc_poller = NULL;
+
+static int
+rpc_subsystem_poll(void *arg)
+{
+ spdk_rpc_accept();
+ return SPDK_POLLER_BUSY;
+}
+
+void
+spdk_rpc_initialize(const char *listen_addr)
+{
+ int rc;
+
+ if (listen_addr == NULL) {
+ return;
+ }
+
+ if (!spdk_rpc_verify_methods()) {
+ spdk_app_stop(-EINVAL);
+ return;
+ }
+
+ /* Listen on the requested address */
+ rc = spdk_rpc_listen(listen_addr);
+ if (rc != 0) {
+ SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr);
+ return;
+ }
+
+ spdk_rpc_set_state(SPDK_RPC_STARTUP);
+
+ /* Register a poller to periodically check for RPCs */
+ g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL);
+}
+
+void
+spdk_rpc_finish(void)
+{
+ spdk_rpc_close();
+ spdk_poller_unregister(&g_rpc_poller);
+}
diff --git a/src/spdk/lib/event/spdk_event.map b/src/spdk/lib/event/spdk_event.map
new file mode 100644
index 000000000..8208c5e1f
--- /dev/null
+++ b/src/spdk/lib/event/spdk_event.map
@@ -0,0 +1,46 @@
+{
+ global:
+
+ # Public functions
+ spdk_app_opts_init;
+ spdk_app_start;
+ spdk_app_fini;
+ spdk_app_start_shutdown;
+ spdk_app_stop;
+ spdk_app_get_running_config;
+ spdk_app_get_shm_id;
+ spdk_app_parse_core_mask;
+ spdk_app_get_core_mask;
+ spdk_app_parse_args;
+ spdk_app_usage;
+ spdk_event_allocate;
+ spdk_event_call;
+ spdk_framework_enable_context_switch_monitor;
+ spdk_framework_context_switch_monitor_enabled;
+
+ # Functions used by other SPDK libraries
+ spdk_reactors_init;
+ spdk_reactors_fini;
+ spdk_reactors_start;
+ spdk_reactors_stop;
+ spdk_reactor_get;
+ spdk_for_each_reactor;
+ spdk_subsystem_find;
+ spdk_subsystem_get_first;
+ spdk_subsystem_get_next;
+ spdk_subsystem_get_first_depend;
+ spdk_subsystem_get_next_depend;
+ spdk_add_subsystem;
+ spdk_add_subsystem_depend;
+ spdk_subsystem_init;
+ spdk_subsystem_fini;
+ spdk_subsystem_init_next;
+ spdk_subsystem_fini_next;
+ spdk_subsystem_config;
+ spdk_app_json_config_load;
+ spdk_subsystem_config_json;
+ spdk_rpc_initialize;
+ spdk_rpc_finish;
+
+ local: *;
+};
diff --git a/src/spdk/lib/event/subsystem.c b/src/spdk/lib/event/subsystem.c
new file mode 100644
index 000000000..2cff890b2
--- /dev/null
+++ b/src/spdk/lib/event/subsystem.c
@@ -0,0 +1,288 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright (c) Intel Corporation.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "spdk/stdinc.h"
+
+#include "spdk/log.h"
+#include "spdk/thread.h"
+
+#include "spdk_internal/event.h"
+#include "spdk/env.h"
+
+TAILQ_HEAD(spdk_subsystem_list, spdk_subsystem);
+struct spdk_subsystem_list g_subsystems = TAILQ_HEAD_INITIALIZER(g_subsystems);
+
+TAILQ_HEAD(spdk_subsystem_depend_list, spdk_subsystem_depend);
+struct spdk_subsystem_depend_list g_subsystems_deps = TAILQ_HEAD_INITIALIZER(g_subsystems_deps);
+static struct spdk_subsystem *g_next_subsystem;
+static bool g_subsystems_initialized = false;
+static bool g_subsystems_init_interrupted = false;
+static spdk_subsystem_init_fn g_subsystem_start_fn = NULL;
+static void *g_subsystem_start_arg = NULL;
+static spdk_msg_fn g_subsystem_stop_fn = NULL;
+static void *g_subsystem_stop_arg = NULL;
+static struct spdk_thread *g_fini_thread = NULL;
+
+void
+spdk_add_subsystem(struct spdk_subsystem *subsystem)
+{
+ TAILQ_INSERT_TAIL(&g_subsystems, subsystem, tailq);
+}
+
+void
+spdk_add_subsystem_depend(struct spdk_subsystem_depend *depend)
+{
+ TAILQ_INSERT_TAIL(&g_subsystems_deps, depend, tailq);
+}
+
+static struct spdk_subsystem *
+_subsystem_find(struct spdk_subsystem_list *list, const char *name)
+{
+ struct spdk_subsystem *iter;
+
+ TAILQ_FOREACH(iter, list, tailq) {
+ if (strcmp(name, iter->name) == 0) {
+ return iter;
+ }
+ }
+
+ return NULL;
+}
+
+struct spdk_subsystem *
+spdk_subsystem_find(const char *name)
+{
+ return _subsystem_find(&g_subsystems, name);
+}
+
+struct spdk_subsystem *
+spdk_subsystem_get_first(void)
+{
+ return TAILQ_FIRST(&g_subsystems);
+}
+
+struct spdk_subsystem *
+spdk_subsystem_get_next(struct spdk_subsystem *cur_subsystem)
+{
+ return TAILQ_NEXT(cur_subsystem, tailq);
+}
+
+
+struct spdk_subsystem_depend *
+spdk_subsystem_get_first_depend(void)
+{
+ return TAILQ_FIRST(&g_subsystems_deps);
+}
+
+struct spdk_subsystem_depend *
+spdk_subsystem_get_next_depend(struct spdk_subsystem_depend *cur_depend)
+{
+ return TAILQ_NEXT(cur_depend, tailq);
+}
+
+static void
+subsystem_sort(void)
+{
+ bool depends_on, depends_on_sorted;
+ struct spdk_subsystem *subsystem, *subsystem_tmp;
+ struct spdk_subsystem_depend *subsystem_dep;
+
+ struct spdk_subsystem_list subsystems_list = TAILQ_HEAD_INITIALIZER(subsystems_list);
+
+ while (!TAILQ_EMPTY(&g_subsystems)) {
+ TAILQ_FOREACH_SAFE(subsystem, &g_subsystems, tailq, subsystem_tmp) {
+ depends_on = false;
+ TAILQ_FOREACH(subsystem_dep, &g_subsystems_deps, tailq) {
+ if (strcmp(subsystem->name, subsystem_dep->name) == 0) {
+ depends_on = true;
+ depends_on_sorted = !!_subsystem_find(&subsystems_list, subsystem_dep->depends_on);
+ if (depends_on_sorted) {
+ continue;
+ }
+ break;
+ }
+ }
+
+ if (depends_on == false) {
+ TAILQ_REMOVE(&g_subsystems, subsystem, tailq);
+ TAILQ_INSERT_TAIL(&subsystems_list, subsystem, tailq);
+ } else {
+ if (depends_on_sorted == true) {
+ TAILQ_REMOVE(&g_subsystems, subsystem, tailq);
+ TAILQ_INSERT_TAIL(&subsystems_list, subsystem, tailq);
+ }
+ }
+ }
+ }
+
+ TAILQ_FOREACH_SAFE(subsystem, &subsystems_list, tailq, subsystem_tmp) {
+ TAILQ_REMOVE(&subsystems_list, subsystem, tailq);
+ TAILQ_INSERT_TAIL(&g_subsystems, subsystem, tailq);
+ }
+}
+
+void
+spdk_subsystem_init_next(int rc)
+{
+ /* The initialization is interrupted by the spdk_subsystem_fini, so just return */
+ if (g_subsystems_init_interrupted) {
+ return;
+ }
+
+ if (rc) {
+ SPDK_ERRLOG("Init subsystem %s failed\n", g_next_subsystem->name);
+ g_subsystem_start_fn(rc, g_subsystem_start_arg);
+ return;
+ }
+
+ if (!g_next_subsystem) {
+ g_next_subsystem = TAILQ_FIRST(&g_subsystems);
+ } else {
+ g_next_subsystem = TAILQ_NEXT(g_next_subsystem, tailq);
+ }
+
+ if (!g_next_subsystem) {
+ g_subsystems_initialized = true;
+ g_subsystem_start_fn(0, g_subsystem_start_arg);
+ return;
+ }
+
+ if (g_next_subsystem->init) {
+ g_next_subsystem->init();
+ } else {
+ spdk_subsystem_init_next(0);
+ }
+}
+
+void
+spdk_subsystem_init(spdk_subsystem_init_fn cb_fn, void *cb_arg)
+{
+ struct spdk_subsystem_depend *dep;
+
+ g_subsystem_start_fn = cb_fn;
+ g_subsystem_start_arg = cb_arg;
+
+ /* Verify that all dependency name and depends_on subsystems are registered */
+ TAILQ_FOREACH(dep, &g_subsystems_deps, tailq) {
+ if (!spdk_subsystem_find(dep->name)) {
+ SPDK_ERRLOG("subsystem %s is missing\n", dep->name);
+ g_subsystem_start_fn(-1, g_subsystem_start_arg);
+ return;
+ }
+ if (!spdk_subsystem_find(dep->depends_on)) {
+ SPDK_ERRLOG("subsystem %s dependency %s is missing\n",
+ dep->name, dep->depends_on);
+ g_subsystem_start_fn(-1, g_subsystem_start_arg);
+ return;
+ }
+ }
+
+ subsystem_sort();
+
+ spdk_subsystem_init_next(0);
+}
+
+static void
+subsystem_fini_next(void *arg1)
+{
+ assert(g_fini_thread == spdk_get_thread());
+
+ if (!g_next_subsystem) {
+ /* If the initialized flag is false, then we've failed to initialize
+ * the very first subsystem and no de-init is needed
+ */
+ if (g_subsystems_initialized) {
+ g_next_subsystem = TAILQ_LAST(&g_subsystems, spdk_subsystem_list);
+ }
+ } else {
+ if (g_subsystems_initialized || g_subsystems_init_interrupted) {
+ g_next_subsystem = TAILQ_PREV(g_next_subsystem, spdk_subsystem_list, tailq);
+ } else {
+ g_subsystems_init_interrupted = true;
+ }
+ }
+
+ while (g_next_subsystem) {
+ if (g_next_subsystem->fini) {
+ g_next_subsystem->fini();
+ return;
+ }
+ g_next_subsystem = TAILQ_PREV(g_next_subsystem, spdk_subsystem_list, tailq);
+ }
+
+ g_subsystem_stop_fn(g_subsystem_stop_arg);
+ return;
+}
+
+void
+spdk_subsystem_fini_next(void)
+{
+ if (g_fini_thread != spdk_get_thread()) {
+ spdk_thread_send_msg(g_fini_thread, subsystem_fini_next, NULL);
+ } else {
+ subsystem_fini_next(NULL);
+ }
+}
+
+void
+spdk_subsystem_fini(spdk_msg_fn cb_fn, void *cb_arg)
+{
+ g_subsystem_stop_fn = cb_fn;
+ g_subsystem_stop_arg = cb_arg;
+
+ g_fini_thread = spdk_get_thread();
+
+ spdk_subsystem_fini_next();
+}
+
+void
+spdk_subsystem_config(FILE *fp)
+{
+ struct spdk_subsystem *subsystem;
+
+ TAILQ_FOREACH(subsystem, &g_subsystems, tailq) {
+ if (subsystem->config) {
+ subsystem->config(fp);
+ }
+ }
+}
+
+void
+spdk_subsystem_config_json(struct spdk_json_write_ctx *w, struct spdk_subsystem *subsystem)
+{
+ if (subsystem && subsystem->write_config_json) {
+ subsystem->write_config_json(w);
+ } else {
+ spdk_json_write_null(w);
+ }
+}