diff options
Diffstat (limited to 'src/spdk/lib/event')
-rw-r--r-- | src/spdk/lib/event/Makefile | 45 | ||||
-rw-r--r-- | src/spdk/lib/event/app.c | 1177 | ||||
-rw-r--r-- | src/spdk/lib/event/json_config.c | 630 | ||||
-rw-r--r-- | src/spdk/lib/event/reactor.c | 664 | ||||
-rw-r--r-- | src/spdk/lib/event/rpc.c | 87 | ||||
-rw-r--r-- | src/spdk/lib/event/spdk_event.map | 46 | ||||
-rw-r--r-- | src/spdk/lib/event/subsystem.c | 288 |
7 files changed, 2937 insertions, 0 deletions
diff --git a/src/spdk/lib/event/Makefile b/src/spdk/lib/event/Makefile new file mode 100644 index 000000000..87a6209c7 --- /dev/null +++ b/src/spdk/lib/event/Makefile @@ -0,0 +1,45 @@ +# +# BSD LICENSE +# +# Copyright (c) Intel Corporation. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..) +include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 5 +SO_MINOR := 0 + +LIBNAME = event +C_SRCS = app.c reactor.c rpc.c subsystem.c json_config.c + +SPDK_MAP_FILE = $(abspath $(CURDIR)/spdk_event.map) + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/src/spdk/lib/event/app.c b/src/spdk/lib/event/app.c new file mode 100644 index 000000000..b6cab05a3 --- /dev/null +++ b/src/spdk/lib/event/app.c @@ -0,0 +1,1177 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. All rights reserved. + * Copyright (c) 2019 Mellanox Technologies LTD. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" +#include "spdk/version.h" + +#include "spdk_internal/event.h" + +#include "spdk/env.h" +#include "spdk/log.h" +#include "spdk/conf.h" +#include "spdk/thread.h" +#include "spdk/trace.h" +#include "spdk/string.h" +#include "spdk/rpc.h" +#include "spdk/util.h" + +#define SPDK_APP_DEFAULT_LOG_LEVEL SPDK_LOG_NOTICE +#define SPDK_APP_DEFAULT_LOG_PRINT_LEVEL SPDK_LOG_INFO +#define SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES SPDK_DEFAULT_NUM_TRACE_ENTRIES + +#define SPDK_APP_DPDK_DEFAULT_MEM_SIZE -1 +#define SPDK_APP_DPDK_DEFAULT_MASTER_CORE -1 +#define SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL -1 +#define SPDK_APP_DPDK_DEFAULT_CORE_MASK "0x1" +#define SPDK_APP_DPDK_DEFAULT_BASE_VIRTADDR 0x200000000000 +#define SPDK_APP_DEFAULT_CORE_LIMIT 0x140000000 /* 5 GiB */ + +struct spdk_app { + struct spdk_conf *config; + const char *json_config_file; + bool json_config_ignore_errors; + const char *rpc_addr; + int shm_id; + spdk_app_shutdown_cb shutdown_cb; + int rc; +}; + +static struct spdk_app g_spdk_app; +static spdk_msg_fn g_start_fn = NULL; +static void *g_start_arg = NULL; +static struct spdk_thread *g_app_thread = NULL; +static bool g_delay_subsystem_init = false; +static bool g_shutdown_sig_received = false; +static char *g_executable_name; +static struct spdk_app_opts g_default_opts; + +int +spdk_app_get_shm_id(void) +{ + return g_spdk_app.shm_id; +} + +/* append one empty option to indicate the end of the array */ +static const struct option g_cmdline_options[] = { +#define CONFIG_FILE_OPT_IDX 'c' + {"config", required_argument, NULL, CONFIG_FILE_OPT_IDX}, +#define LIMIT_COREDUMP_OPT_IDX 'd' + {"limit-coredump", no_argument, NULL, LIMIT_COREDUMP_OPT_IDX}, +#define TPOINT_GROUP_MASK_OPT_IDX 'e' + {"tpoint-group-mask", required_argument, NULL, TPOINT_GROUP_MASK_OPT_IDX}, +#define SINGLE_FILE_SEGMENTS_OPT_IDX 'g' + {"single-file-segments", no_argument, NULL, SINGLE_FILE_SEGMENTS_OPT_IDX}, +#define HELP_OPT_IDX 'h' + {"help", no_argument, NULL, HELP_OPT_IDX}, +#define SHM_ID_OPT_IDX 'i' + {"shm-id", required_argument, NULL, SHM_ID_OPT_IDX}, +#define CPUMASK_OPT_IDX 'm' + {"cpumask", required_argument, NULL, CPUMASK_OPT_IDX}, +#define MEM_CHANNELS_OPT_IDX 'n' + {"mem-channels", required_argument, NULL, MEM_CHANNELS_OPT_IDX}, +#define MASTER_CORE_OPT_IDX 'p' + {"master-core", required_argument, NULL, MASTER_CORE_OPT_IDX}, +#define RPC_SOCKET_OPT_IDX 'r' + {"rpc-socket", required_argument, NULL, RPC_SOCKET_OPT_IDX}, +#define MEM_SIZE_OPT_IDX 's' + {"mem-size", required_argument, NULL, MEM_SIZE_OPT_IDX}, +#define NO_PCI_OPT_IDX 'u' + {"no-pci", no_argument, NULL, NO_PCI_OPT_IDX}, +#define VERSION_OPT_IDX 'v' + {"version", no_argument, NULL, VERSION_OPT_IDX}, +#define PCI_BLACKLIST_OPT_IDX 'B' + {"pci-blacklist", required_argument, NULL, PCI_BLACKLIST_OPT_IDX}, +#define LOGFLAG_OPT_IDX 'L' + {"logflag", required_argument, NULL, LOGFLAG_OPT_IDX}, +#define HUGE_UNLINK_OPT_IDX 'R' + {"huge-unlink", no_argument, NULL, HUGE_UNLINK_OPT_IDX}, +#define PCI_WHITELIST_OPT_IDX 'W' + {"pci-whitelist", required_argument, NULL, PCI_WHITELIST_OPT_IDX}, +#define SILENCE_NOTICELOG_OPT_IDX 257 + {"silence-noticelog", no_argument, NULL, SILENCE_NOTICELOG_OPT_IDX}, +#define WAIT_FOR_RPC_OPT_IDX 258 + {"wait-for-rpc", no_argument, NULL, WAIT_FOR_RPC_OPT_IDX}, +#define HUGE_DIR_OPT_IDX 259 + {"huge-dir", required_argument, NULL, HUGE_DIR_OPT_IDX}, +#define NUM_TRACE_ENTRIES_OPT_IDX 260 + {"num-trace-entries", required_argument, NULL, NUM_TRACE_ENTRIES_OPT_IDX}, +#define MAX_REACTOR_DELAY_OPT_IDX 261 + {"max-delay", required_argument, NULL, MAX_REACTOR_DELAY_OPT_IDX}, +#define JSON_CONFIG_OPT_IDX 262 + {"json", required_argument, NULL, JSON_CONFIG_OPT_IDX}, +#define JSON_CONFIG_IGNORE_INIT_ERRORS_IDX 263 + {"json-ignore-init-errors", no_argument, NULL, JSON_CONFIG_IGNORE_INIT_ERRORS_IDX}, +#define IOVA_MODE_OPT_IDX 264 + {"iova-mode", required_argument, NULL, IOVA_MODE_OPT_IDX}, +#define BASE_VIRTADDR_OPT_IDX 265 + {"base-virtaddr", required_argument, NULL, BASE_VIRTADDR_OPT_IDX}, +}; + +/* Global section */ +#define GLOBAL_CONFIG_TMPL \ +"# Configuration file\n" \ +"#\n" \ +"# Please write all parameters using ASCII.\n" \ +"# The parameter must be quoted if it includes whitespace.\n" \ +"#\n" \ +"# Configuration syntax:\n" \ +"# Spaces at head of line are deleted, other spaces are as separator\n" \ +"# Lines starting with '#' are comments and not evaluated.\n" \ +"# Lines ending with '\\' are concatenated with the next line.\n" \ +"# Bracketed keys are section keys grouping the following value keys.\n" \ +"# Number of section key is used as a tag number.\n" \ +"# Ex. [TargetNode1] = TargetNode section key with tag number 1\n" \ +"[Global]\n" \ +" Comment \"Global section\"\n" \ +"\n" \ +" # Users can restrict work items to only run on certain cores by\n" \ +" # specifying a ReactorMask. Default is to allow work items to run\n" \ +" # on all cores. Core 0 must be set in the mask if one is specified.\n" \ +" # Default: 0xFFFF (cores 0-15)\n" \ +" ReactorMask \"0x%s\"\n" \ +"\n" \ +" # Tracepoint group mask for spdk trace buffers\n" \ +" # Default: 0x0 (all tracepoint groups disabled)\n" \ +" # Set to 0xFFFF to enable all tracepoint groups.\n" \ +" TpointGroupMask \"0x%" PRIX64 "\"\n" \ +"\n" \ + +static void +app_config_dump_global_section(FILE *fp) +{ + struct spdk_cpuset *coremask; + + if (NULL == fp) { + return; + } + + coremask = spdk_app_get_core_mask(); + + fprintf(fp, GLOBAL_CONFIG_TMPL, spdk_cpuset_fmt(coremask), + spdk_trace_get_tpoint_group_mask()); +} + +int +spdk_app_get_running_config(char **config_str, char *name) +{ + FILE *fp = NULL; + int fd = -1; + long length = 0, ret = 0; + char vbuf[BUFSIZ]; + char config_template[64]; + + snprintf(config_template, sizeof(config_template), "/tmp/%s.XXXXXX", name); + /* Create temporary file to hold config */ + fd = mkstemp(config_template); + if (fd == -1) { + SPDK_ERRLOG("mkstemp failed\n"); + return -1; + } + fp = fdopen(fd, "wb+"); + if (NULL == fp) { + SPDK_ERRLOG("error opening tmpfile fd = %d\n", fd); + return -1; + } + + /* Buffered IO */ + setvbuf(fp, vbuf, _IOFBF, BUFSIZ); + + app_config_dump_global_section(fp); + spdk_subsystem_config(fp); + + length = ftell(fp); + + *config_str = malloc(length + 1); + if (!*config_str) { + SPDK_ERRLOG("out-of-memory for config\n"); + fclose(fp); + return -1; + } + fseek(fp, 0, SEEK_SET); + ret = fread(*config_str, sizeof(char), length, fp); + if (ret < length) { + SPDK_ERRLOG("short read\n"); + } + fclose(fp); + (*config_str)[length] = '\0'; + + return 0; +} + +static void +app_start_shutdown(void *ctx) +{ + if (g_spdk_app.shutdown_cb) { + g_spdk_app.shutdown_cb(); + g_spdk_app.shutdown_cb = NULL; + } else { + spdk_app_stop(0); + } +} + +void +spdk_app_start_shutdown(void) +{ + spdk_thread_send_critical_msg(g_app_thread, app_start_shutdown); +} + +static void +__shutdown_signal(int signo) +{ + if (!g_shutdown_sig_received) { + g_shutdown_sig_received = true; + spdk_app_start_shutdown(); + } +} + +static int +app_opts_validate(const char *app_opts) +{ + int i = 0, j; + + for (i = 0; app_opts[i] != '\0'; i++) { + /* ignore getopt control characters */ + if (app_opts[i] == ':' || app_opts[i] == '+' || app_opts[i] == '-') { + continue; + } + + for (j = 0; SPDK_APP_GETOPT_STRING[j] != '\0'; j++) { + if (app_opts[i] == SPDK_APP_GETOPT_STRING[j]) { + return app_opts[i]; + } + } + } + return 0; +} + +void +spdk_app_opts_init(struct spdk_app_opts *opts) +{ + if (!opts) { + return; + } + + memset(opts, 0, sizeof(*opts)); + + opts->enable_coredump = true; + opts->shm_id = -1; + opts->mem_size = SPDK_APP_DPDK_DEFAULT_MEM_SIZE; + opts->master_core = SPDK_APP_DPDK_DEFAULT_MASTER_CORE; + opts->mem_channel = SPDK_APP_DPDK_DEFAULT_MEM_CHANNEL; + opts->reactor_mask = NULL; + opts->base_virtaddr = SPDK_APP_DPDK_DEFAULT_BASE_VIRTADDR; + opts->print_level = SPDK_APP_DEFAULT_LOG_PRINT_LEVEL; + opts->rpc_addr = SPDK_DEFAULT_RPC_ADDR; + opts->num_entries = SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES; + opts->delay_subsystem_init = false; +} + +static int +app_setup_signal_handlers(struct spdk_app_opts *opts) +{ + struct sigaction sigact; + sigset_t sigmask; + int rc; + + sigemptyset(&sigmask); + memset(&sigact, 0, sizeof(sigact)); + sigemptyset(&sigact.sa_mask); + + sigact.sa_handler = SIG_IGN; + rc = sigaction(SIGPIPE, &sigact, NULL); + if (rc < 0) { + SPDK_ERRLOG("sigaction(SIGPIPE) failed\n"); + return rc; + } + + /* Install the same handler for SIGINT and SIGTERM */ + g_shutdown_sig_received = false; + sigact.sa_handler = __shutdown_signal; + rc = sigaction(SIGINT, &sigact, NULL); + if (rc < 0) { + SPDK_ERRLOG("sigaction(SIGINT) failed\n"); + return rc; + } + sigaddset(&sigmask, SIGINT); + + rc = sigaction(SIGTERM, &sigact, NULL); + if (rc < 0) { + SPDK_ERRLOG("sigaction(SIGTERM) failed\n"); + return rc; + } + sigaddset(&sigmask, SIGTERM); + + if (opts->usr1_handler != NULL) { + sigact.sa_handler = opts->usr1_handler; + rc = sigaction(SIGUSR1, &sigact, NULL); + if (rc < 0) { + SPDK_ERRLOG("sigaction(SIGUSR1) failed\n"); + return rc; + } + sigaddset(&sigmask, SIGUSR1); + } + + pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL); + + return 0; +} + +static void +app_start_application(void) +{ + assert(spdk_get_thread() == g_app_thread); + + g_start_fn(g_start_arg); +} + +static void +app_start_rpc(int rc, void *arg1) +{ + if (rc) { + spdk_app_stop(rc); + return; + } + + spdk_rpc_initialize(g_spdk_app.rpc_addr); + if (!g_delay_subsystem_init) { + spdk_rpc_set_state(SPDK_RPC_RUNTIME); + app_start_application(); + } +} + +static struct spdk_conf * +app_setup_conf(const char *config_file) +{ + struct spdk_conf *config; + int rc; + + config = spdk_conf_allocate(); + assert(config != NULL); + if (config_file) { + rc = spdk_conf_read(config, config_file); + if (rc != 0) { + SPDK_ERRLOG("Could not read config file %s\n", config_file); + goto error; + } + if (spdk_conf_first_section(config) == NULL) { + SPDK_ERRLOG("Invalid config file %s\n", config_file); + goto error; + } + } + spdk_conf_set_as_default(config); + return config; + +error: + spdk_conf_free(config); + return NULL; +} + +static int +app_opts_add_pci_addr(struct spdk_app_opts *opts, struct spdk_pci_addr **list, char *bdf) +{ + struct spdk_pci_addr *tmp = *list; + size_t i = opts->num_pci_addr; + + tmp = realloc(tmp, sizeof(*tmp) * (i + 1)); + if (tmp == NULL) { + SPDK_ERRLOG("realloc error\n"); + return -ENOMEM; + } + + *list = tmp; + if (spdk_pci_addr_parse(*list + i, bdf) < 0) { + SPDK_ERRLOG("Invalid address %s\n", bdf); + return -EINVAL; + } + + opts->num_pci_addr++; + return 0; +} + +static int +app_read_config_file_global_params(struct spdk_app_opts *opts) +{ + struct spdk_conf_section *sp; + char *bdf; + int i, rc = 0; + + sp = spdk_conf_find_section(NULL, "Global"); + + if (opts->shm_id == -1) { + if (sp != NULL) { + opts->shm_id = spdk_conf_section_get_intval(sp, "SharedMemoryID"); + } + } + + if (opts->reactor_mask == NULL) { + if (sp && spdk_conf_section_get_val(sp, "ReactorMask")) { + SPDK_ERRLOG("ReactorMask config option is deprecated. Use -m/--cpumask\n" + "command line parameter instead.\n"); + opts->reactor_mask = spdk_conf_section_get_val(sp, "ReactorMask"); + } else { + opts->reactor_mask = SPDK_APP_DPDK_DEFAULT_CORE_MASK; + } + } + + if (!opts->no_pci && sp) { + opts->no_pci = spdk_conf_section_get_boolval(sp, "NoPci", false); + } + + if (opts->tpoint_group_mask == NULL) { + if (sp != NULL) { + opts->tpoint_group_mask = spdk_conf_section_get_val(sp, "TpointGroupMask"); + } + } + + if (sp == NULL) { + return 0; + } + + for (i = 0; ; i++) { + bdf = spdk_conf_section_get_nmval(sp, "PciBlacklist", i, 0); + if (!bdf) { + break; + } + + rc = app_opts_add_pci_addr(opts, &opts->pci_blacklist, bdf); + if (rc != 0) { + free(opts->pci_blacklist); + return rc; + } + } + + for (i = 0; ; i++) { + bdf = spdk_conf_section_get_nmval(sp, "PciWhitelist", i, 0); + if (!bdf) { + break; + } + + if (opts->pci_blacklist != NULL) { + SPDK_ERRLOG("PciBlacklist and PciWhitelist cannot be used at the same time\n"); + free(opts->pci_blacklist); + return -EINVAL; + } + + rc = app_opts_add_pci_addr(opts, &opts->pci_whitelist, bdf); + if (rc != 0) { + free(opts->pci_whitelist); + return rc; + } + } + return 0; +} + +static int +app_setup_env(struct spdk_app_opts *opts) +{ + struct spdk_env_opts env_opts = {}; + int rc; + + if (opts == NULL) { + rc = spdk_env_init(NULL); + if (rc != 0) { + SPDK_ERRLOG("Unable to reinitialize SPDK env\n"); + } + + return rc; + } + + + spdk_env_opts_init(&env_opts); + + env_opts.name = opts->name; + env_opts.core_mask = opts->reactor_mask; + env_opts.shm_id = opts->shm_id; + env_opts.mem_channel = opts->mem_channel; + env_opts.master_core = opts->master_core; + env_opts.mem_size = opts->mem_size; + env_opts.hugepage_single_segments = opts->hugepage_single_segments; + env_opts.unlink_hugepage = opts->unlink_hugepage; + env_opts.hugedir = opts->hugedir; + env_opts.no_pci = opts->no_pci; + env_opts.num_pci_addr = opts->num_pci_addr; + env_opts.pci_blacklist = opts->pci_blacklist; + env_opts.pci_whitelist = opts->pci_whitelist; + env_opts.env_context = opts->env_context; + env_opts.iova_mode = opts->iova_mode; + + rc = spdk_env_init(&env_opts); + free(env_opts.pci_blacklist); + free(env_opts.pci_whitelist); + + + if (rc < 0) { + SPDK_ERRLOG("Unable to initialize SPDK env\n"); + } + + return rc; +} + +static int +app_setup_trace(struct spdk_app_opts *opts) +{ + char shm_name[64]; + uint64_t tpoint_group_mask; + char *end; + + if (opts->shm_id >= 0) { + snprintf(shm_name, sizeof(shm_name), "/%s_trace.%d", opts->name, opts->shm_id); + } else { + snprintf(shm_name, sizeof(shm_name), "/%s_trace.pid%d", opts->name, (int)getpid()); + } + + if (spdk_trace_init(shm_name, opts->num_entries) != 0) { + return -1; + } + + if (opts->tpoint_group_mask != NULL) { + errno = 0; + tpoint_group_mask = strtoull(opts->tpoint_group_mask, &end, 16); + if (*end != '\0' || errno) { + SPDK_ERRLOG("invalid tpoint mask %s\n", opts->tpoint_group_mask); + } else { + SPDK_NOTICELOG("Tracepoint Group Mask %s specified.\n", opts->tpoint_group_mask); + SPDK_NOTICELOG("Use 'spdk_trace -s %s %s %d' to capture a snapshot of events at runtime.\n", + opts->name, + opts->shm_id >= 0 ? "-i" : "-p", + opts->shm_id >= 0 ? opts->shm_id : getpid()); +#if defined(__linux__) + SPDK_NOTICELOG("Or copy /dev/shm%s for offline analysis/debug.\n", shm_name); +#endif + spdk_trace_set_tpoint_group_mask(tpoint_group_mask); + } + } + + return 0; +} + +static void +bootstrap_fn(void *arg1) +{ + if (g_spdk_app.json_config_file) { + g_delay_subsystem_init = false; + spdk_app_json_config_load(g_spdk_app.json_config_file, g_spdk_app.rpc_addr, app_start_rpc, + NULL, !g_spdk_app.json_config_ignore_errors); + } else { + if (!g_delay_subsystem_init) { + spdk_subsystem_init(app_start_rpc, NULL); + } else { + spdk_rpc_initialize(g_spdk_app.rpc_addr); + } + } +} + +int +spdk_app_start(struct spdk_app_opts *opts, spdk_msg_fn start_fn, + void *arg1) +{ + struct spdk_conf *config = NULL; + int rc; + char *tty; + struct spdk_cpuset tmp_cpumask = {}; + static bool g_env_was_setup = false; + + if (!opts) { + SPDK_ERRLOG("opts should not be NULL\n"); + return 1; + } + + if (!start_fn) { + SPDK_ERRLOG("start_fn should not be NULL\n"); + return 1; + } + + tty = ttyname(STDERR_FILENO); + if (opts->print_level > SPDK_LOG_WARN && + isatty(STDERR_FILENO) && + tty && + !strncmp(tty, "/dev/tty", strlen("/dev/tty"))) { + printf("Warning: printing stderr to console terminal without -q option specified.\n"); + printf("Suggest using --silence-noticelog to disable logging to stderr and\n"); + printf("monitor syslog, or redirect stderr to a file.\n"); + printf("(Delaying for 10 seconds...)\n"); + sleep(10); + } + + spdk_log_set_print_level(opts->print_level); + +#ifndef SPDK_NO_RLIMIT + if (opts->enable_coredump) { + struct rlimit core_limits; + + core_limits.rlim_cur = core_limits.rlim_max = SPDK_APP_DEFAULT_CORE_LIMIT; + setrlimit(RLIMIT_CORE, &core_limits); + } +#endif + + config = app_setup_conf(opts->config_file); + if (config == NULL) { + return 1; + } + + if (app_read_config_file_global_params(opts) < 0) { + spdk_conf_free(config); + return 1; + } + + memset(&g_spdk_app, 0, sizeof(g_spdk_app)); + g_spdk_app.config = config; + g_spdk_app.json_config_file = opts->json_config_file; + g_spdk_app.json_config_ignore_errors = opts->json_config_ignore_errors; + g_spdk_app.rpc_addr = opts->rpc_addr; + g_spdk_app.shm_id = opts->shm_id; + g_spdk_app.shutdown_cb = opts->shutdown_cb; + g_spdk_app.rc = 0; + + spdk_log_set_level(SPDK_APP_DEFAULT_LOG_LEVEL); + + /* Pass NULL to app_setup_env if SPDK app has been set up, in order to + * indicate that this is a reinitialization. + */ + if (app_setup_env(g_env_was_setup ? NULL : opts) < 0) { + return 1; + } + + spdk_log_open(opts->log); + SPDK_NOTICELOG("Total cores available: %d\n", spdk_env_get_core_count()); + + /* + * If mask not specified on command line or in configuration file, + * reactor_mask will be 0x1 which will enable core 0 to run one + * reactor. + */ + if ((rc = spdk_reactors_init()) != 0) { + SPDK_ERRLOG("Reactor Initilization failed: rc = %d\n", rc); + return 1; + } + + spdk_cpuset_set_cpu(&tmp_cpumask, spdk_env_get_current_core(), true); + + /* Now that the reactors have been initialized, we can create an + * initialization thread. */ + g_app_thread = spdk_thread_create("app_thread", &tmp_cpumask); + if (!g_app_thread) { + SPDK_ERRLOG("Unable to create an spdk_thread for initialization\n"); + return 1; + } + + /* + * Note the call to app_setup_trace() is located here + * ahead of app_setup_signal_handlers(). + * That's because there is not an easy/direct clean + * way of unwinding alloc'd resources that can occur + * in app_setup_signal_handlers(). + */ + if (app_setup_trace(opts) != 0) { + return 1; + } + + if ((rc = app_setup_signal_handlers(opts)) != 0) { + return 1; + } + + g_delay_subsystem_init = opts->delay_subsystem_init; + g_start_fn = start_fn; + g_start_arg = arg1; + + spdk_thread_send_msg(g_app_thread, bootstrap_fn, NULL); + + /* This blocks until spdk_app_stop is called */ + spdk_reactors_start(); + + g_env_was_setup = true; + + return g_spdk_app.rc; +} + +void +spdk_app_fini(void) +{ + spdk_trace_cleanup(); + spdk_reactors_fini(); + spdk_env_fini(); + spdk_conf_free(g_spdk_app.config); + spdk_log_close(); +} + +static void +app_stop(void *arg1) +{ + spdk_rpc_finish(); + spdk_subsystem_fini(spdk_reactors_stop, NULL); +} + +void +spdk_app_stop(int rc) +{ + if (rc) { + SPDK_WARNLOG("spdk_app_stop'd on non-zero\n"); + } + g_spdk_app.rc = rc; + /* + * We want to run spdk_subsystem_fini() from the same thread where spdk_subsystem_init() + * was called. + */ + spdk_thread_send_msg(g_app_thread, app_stop, NULL); +} + +static void +usage(void (*app_usage)(void)) +{ + printf("%s [options]\n", g_executable_name); + printf("options:\n"); + printf(" -c, --config <config> config file (default %s)\n", + g_default_opts.config_file != NULL ? g_default_opts.config_file : "none"); + printf(" --json <config> JSON config file (default %s)\n", + g_default_opts.json_config_file != NULL ? g_default_opts.json_config_file : "none"); + printf(" --json-ignore-init-errors\n"); + printf(" don't exit on invalid config entry\n"); + printf(" -d, --limit-coredump do not set max coredump size to RLIM_INFINITY\n"); + printf(" -g, --single-file-segments\n"); + printf(" force creating just one hugetlbfs file\n"); + printf(" -h, --help show this usage\n"); + printf(" -i, --shm-id <id> shared memory ID (optional)\n"); + printf(" -m, --cpumask <mask> core mask for DPDK\n"); + printf(" -n, --mem-channels <num> channel number of memory channels used for DPDK\n"); + printf(" -p, --master-core <id> master (primary) core for DPDK\n"); + printf(" -r, --rpc-socket <path> RPC listen address (default %s)\n", SPDK_DEFAULT_RPC_ADDR); + printf(" -s, --mem-size <size> memory size in MB for DPDK (default: "); +#ifndef __linux__ + if (g_default_opts.mem_size <= 0) { + printf("all hugepage memory)\n"); + } else +#endif + { + printf("%dMB)\n", g_default_opts.mem_size >= 0 ? g_default_opts.mem_size : 0); + } + printf(" --silence-noticelog disable notice level logging to stderr\n"); + printf(" -u, --no-pci disable PCI access\n"); + printf(" --wait-for-rpc wait for RPCs to initialize subsystems\n"); + printf(" --max-delay <num> maximum reactor delay (in microseconds)\n"); + printf(" -B, --pci-blacklist <bdf>\n"); + printf(" pci addr to blacklist (can be used more than once)\n"); + printf(" -R, --huge-unlink unlink huge files after initialization\n"); + printf(" -v, --version print SPDK version\n"); + printf(" -W, --pci-whitelist <bdf>\n"); + printf(" pci addr to whitelist (-B and -W cannot be used at the same time)\n"); + printf(" --huge-dir <path> use a specific hugetlbfs mount to reserve memory from\n"); + printf(" --iova-mode <pa/va> set IOVA mode ('pa' for IOVA_PA and 'va' for IOVA_VA)\n"); + printf(" --base-virtaddr <addr> the base virtual address for DPDK (default: 0x200000000000)\n"); + printf(" --num-trace-entries <num> number of trace entries for each core, must be power of 2. (default %d)\n", + SPDK_APP_DEFAULT_NUM_TRACE_ENTRIES); + spdk_log_usage(stdout, "-L"); + spdk_trace_mask_usage(stdout, "-e"); + if (app_usage) { + app_usage(); + } +} + +spdk_app_parse_args_rvals_t +spdk_app_parse_args(int argc, char **argv, struct spdk_app_opts *opts, + const char *app_getopt_str, struct option *app_long_opts, + int (*app_parse)(int ch, char *arg), + void (*app_usage)(void)) +{ + int ch, rc, opt_idx, global_long_opts_len, app_long_opts_len; + struct option *cmdline_options; + char *cmdline_short_opts = NULL; + enum spdk_app_parse_args_rvals retval = SPDK_APP_PARSE_ARGS_FAIL; + long int tmp; + + memcpy(&g_default_opts, opts, sizeof(g_default_opts)); + + if (opts->config_file && access(opts->config_file, R_OK) != 0) { + SPDK_WARNLOG("Can't read legacy configuration file '%s'\n", opts->config_file); + opts->config_file = NULL; + } + + if (opts->json_config_file && access(opts->json_config_file, R_OK) != 0) { + SPDK_WARNLOG("Can't read JSON configuration file '%s'\n", opts->json_config_file); + opts->json_config_file = NULL; + } + + if (app_long_opts == NULL) { + app_long_opts_len = 0; + } else { + for (app_long_opts_len = 0; + app_long_opts[app_long_opts_len].name != NULL; + app_long_opts_len++); + } + + global_long_opts_len = SPDK_COUNTOF(g_cmdline_options); + + cmdline_options = calloc(global_long_opts_len + app_long_opts_len + 1, sizeof(*cmdline_options)); + if (!cmdline_options) { + SPDK_ERRLOG("Out of memory\n"); + return SPDK_APP_PARSE_ARGS_FAIL; + } + + memcpy(&cmdline_options[0], g_cmdline_options, sizeof(g_cmdline_options)); + if (app_long_opts) { + memcpy(&cmdline_options[global_long_opts_len], app_long_opts, + app_long_opts_len * sizeof(*app_long_opts)); + } + + if (app_getopt_str != NULL) { + ch = app_opts_validate(app_getopt_str); + if (ch) { + SPDK_ERRLOG("Duplicated option '%c' between the generic and application specific spdk opts.\n", + ch); + goto out; + } + } + + cmdline_short_opts = spdk_sprintf_alloc("%s%s", app_getopt_str, SPDK_APP_GETOPT_STRING); + if (!cmdline_short_opts) { + SPDK_ERRLOG("Out of memory\n"); + goto out; + } + + g_executable_name = argv[0]; + + while ((ch = getopt_long(argc, argv, cmdline_short_opts, cmdline_options, &opt_idx)) != -1) { + switch (ch) { + case CONFIG_FILE_OPT_IDX: + opts->config_file = optarg; + break; + case JSON_CONFIG_OPT_IDX: + opts->json_config_file = optarg; + break; + case JSON_CONFIG_IGNORE_INIT_ERRORS_IDX: + opts->json_config_ignore_errors = true; + break; + case LIMIT_COREDUMP_OPT_IDX: + opts->enable_coredump = false; + break; + case TPOINT_GROUP_MASK_OPT_IDX: + opts->tpoint_group_mask = optarg; + break; + case SINGLE_FILE_SEGMENTS_OPT_IDX: + opts->hugepage_single_segments = true; + break; + case HELP_OPT_IDX: + usage(app_usage); + retval = SPDK_APP_PARSE_ARGS_HELP; + goto out; + case SHM_ID_OPT_IDX: + opts->shm_id = spdk_strtol(optarg, 0); + if (opts->shm_id < 0) { + SPDK_ERRLOG("Invalid shared memory ID %s\n", optarg); + goto out; + } + break; + case CPUMASK_OPT_IDX: + opts->reactor_mask = optarg; + break; + case MEM_CHANNELS_OPT_IDX: + opts->mem_channel = spdk_strtol(optarg, 0); + if (opts->mem_channel < 0) { + SPDK_ERRLOG("Invalid memory channel %s\n", optarg); + goto out; + } + break; + case MASTER_CORE_OPT_IDX: + opts->master_core = spdk_strtol(optarg, 0); + if (opts->master_core < 0) { + SPDK_ERRLOG("Invalid master core %s\n", optarg); + goto out; + } + break; + case SILENCE_NOTICELOG_OPT_IDX: + opts->print_level = SPDK_LOG_WARN; + break; + case RPC_SOCKET_OPT_IDX: + opts->rpc_addr = optarg; + break; + case MEM_SIZE_OPT_IDX: { + uint64_t mem_size_mb; + bool mem_size_has_prefix; + + rc = spdk_parse_capacity(optarg, &mem_size_mb, &mem_size_has_prefix); + if (rc != 0) { + SPDK_ERRLOG("invalid memory pool size `-s %s`\n", optarg); + usage(app_usage); + goto out; + } + + if (mem_size_has_prefix) { + /* the mem size is in MB by default, so if a prefix was + * specified, we need to manually convert to MB. + */ + mem_size_mb /= 1024 * 1024; + } + + if (mem_size_mb > INT_MAX) { + SPDK_ERRLOG("invalid memory pool size `-s %s`\n", optarg); + usage(app_usage); + goto out; + } + + opts->mem_size = (int) mem_size_mb; + break; + } + case NO_PCI_OPT_IDX: + opts->no_pci = true; + break; + case WAIT_FOR_RPC_OPT_IDX: + opts->delay_subsystem_init = true; + break; + case PCI_BLACKLIST_OPT_IDX: + if (opts->pci_whitelist) { + free(opts->pci_whitelist); + opts->pci_whitelist = NULL; + SPDK_ERRLOG("-B and -W cannot be used at the same time\n"); + usage(app_usage); + goto out; + } + + rc = app_opts_add_pci_addr(opts, &opts->pci_blacklist, optarg); + if (rc != 0) { + free(opts->pci_blacklist); + opts->pci_blacklist = NULL; + goto out; + } + break; + case LOGFLAG_OPT_IDX: +#ifndef DEBUG + SPDK_ERRLOG("%s must be configured with --enable-debug for -L flag\n", + argv[0]); + usage(app_usage); + goto out; +#else + rc = spdk_log_set_flag(optarg); + if (rc < 0) { + SPDK_ERRLOG("unknown flag\n"); + usage(app_usage); + goto out; + } + opts->print_level = SPDK_LOG_DEBUG; + break; +#endif + case HUGE_UNLINK_OPT_IDX: + opts->unlink_hugepage = true; + break; + case PCI_WHITELIST_OPT_IDX: + if (opts->pci_blacklist) { + free(opts->pci_blacklist); + opts->pci_blacklist = NULL; + SPDK_ERRLOG("-B and -W cannot be used at the same time\n"); + usage(app_usage); + goto out; + } + + rc = app_opts_add_pci_addr(opts, &opts->pci_whitelist, optarg); + if (rc != 0) { + free(opts->pci_whitelist); + opts->pci_whitelist = NULL; + goto out; + } + break; + case BASE_VIRTADDR_OPT_IDX: + tmp = spdk_strtoll(optarg, 0); + if (tmp <= 0) { + SPDK_ERRLOG("Invalid base-virtaddr %s\n", optarg); + usage(app_usage); + goto out; + } + opts->base_virtaddr = (uint64_t)tmp; + break; + case HUGE_DIR_OPT_IDX: + opts->hugedir = optarg; + break; + case IOVA_MODE_OPT_IDX: + opts->iova_mode = optarg; + break; + case NUM_TRACE_ENTRIES_OPT_IDX: + tmp = spdk_strtoll(optarg, 0); + if (tmp <= 0) { + SPDK_ERRLOG("Invalid num-trace-entries %s\n", optarg); + usage(app_usage); + goto out; + } + opts->num_entries = (uint64_t)tmp; + if (opts->num_entries & (opts->num_entries - 1)) { + SPDK_ERRLOG("num-trace-entries must be power of 2\n"); + usage(app_usage); + goto out; + } + break; + case MAX_REACTOR_DELAY_OPT_IDX: + SPDK_ERRLOG("Deprecation warning: The maximum allowed latency parameter is no longer supported.\n"); + break; + case VERSION_OPT_IDX: + printf(SPDK_VERSION_STRING"\n"); + retval = SPDK_APP_PARSE_ARGS_HELP; + goto out; + case '?': + /* + * In the event getopt() above detects an option + * in argv that is NOT in the getopt_str, + * getopt() will return a '?' indicating failure. + */ + usage(app_usage); + goto out; + default: + rc = app_parse(ch, optarg); + if (rc) { + SPDK_ERRLOG("Parsing application specific arguments failed: %d\n", rc); + goto out; + } + } + } + + if (opts->config_file && opts->json_config_file) { + SPDK_ERRLOG("ERROR: Legacy config and JSON config can't be used together.\n"); + goto out; + } + + if (opts->json_config_file && opts->delay_subsystem_init) { + SPDK_ERRLOG("ERROR: JSON configuration file can't be used together with --wait-for-rpc.\n"); + goto out; + } + + /* TBD: Replace warning by failure when RPCs for startup are prepared. */ + if (opts->config_file && opts->delay_subsystem_init) { + fprintf(stderr, + "WARNING: --wait-for-rpc and config file are used at the same time. " + "- Please be careful one options might overwrite others.\n"); + } + + retval = SPDK_APP_PARSE_ARGS_SUCCESS; +out: + if (retval != SPDK_APP_PARSE_ARGS_SUCCESS) { + free(opts->pci_blacklist); + opts->pci_blacklist = NULL; + free(opts->pci_whitelist); + opts->pci_whitelist = NULL; + } + free(cmdline_short_opts); + free(cmdline_options); + return retval; +} + +void +spdk_app_usage(void) +{ + if (g_executable_name == NULL) { + SPDK_ERRLOG("%s not valid before calling spdk_app_parse_args()\n", __func__); + return; + } + + usage(NULL); +} + +static void +rpc_framework_start_init_cpl(int rc, void *arg1) +{ + struct spdk_jsonrpc_request *request = arg1; + struct spdk_json_write_ctx *w; + + assert(spdk_get_thread() == g_app_thread); + + if (rc) { + spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, + "framework_initialization failed"); + return; + } + + spdk_rpc_set_state(SPDK_RPC_RUNTIME); + app_start_application(); + + w = spdk_jsonrpc_begin_result(request); + spdk_json_write_bool(w, true); + spdk_jsonrpc_end_result(request, w); +} + +static void +rpc_framework_start_init(struct spdk_jsonrpc_request *request, + const struct spdk_json_val *params) +{ + if (params != NULL) { + spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS, + "framework_start_init requires no parameters"); + return; + } + + spdk_subsystem_init(rpc_framework_start_init_cpl, request); +} +SPDK_RPC_REGISTER("framework_start_init", rpc_framework_start_init, SPDK_RPC_STARTUP) +SPDK_RPC_REGISTER_ALIAS_DEPRECATED(framework_start_init, start_subsystem_init) + +struct subsystem_init_poller_ctx { + struct spdk_poller *init_poller; + struct spdk_jsonrpc_request *request; +}; + +static int +rpc_subsystem_init_poller_ctx(void *ctx) +{ + struct spdk_json_write_ctx *w; + struct subsystem_init_poller_ctx *poller_ctx = ctx; + + if (spdk_rpc_get_state() == SPDK_RPC_RUNTIME) { + w = spdk_jsonrpc_begin_result(poller_ctx->request); + spdk_json_write_bool(w, true); + spdk_jsonrpc_end_result(poller_ctx->request, w); + spdk_poller_unregister(&poller_ctx->init_poller); + free(poller_ctx); + } + + return SPDK_POLLER_BUSY; +} + +static void +rpc_framework_wait_init(struct spdk_jsonrpc_request *request, + const struct spdk_json_val *params) +{ + struct spdk_json_write_ctx *w; + struct subsystem_init_poller_ctx *ctx; + + if (spdk_rpc_get_state() == SPDK_RPC_RUNTIME) { + w = spdk_jsonrpc_begin_result(request); + spdk_json_write_bool(w, true); + spdk_jsonrpc_end_result(request, w); + } else { + ctx = malloc(sizeof(struct subsystem_init_poller_ctx)); + if (ctx == NULL) { + spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INTERNAL_ERROR, + "Unable to allocate memory for the request context\n"); + return; + } + ctx->request = request; + ctx->init_poller = SPDK_POLLER_REGISTER(rpc_subsystem_init_poller_ctx, ctx, 0); + } +} +SPDK_RPC_REGISTER("framework_wait_init", rpc_framework_wait_init, + SPDK_RPC_STARTUP | SPDK_RPC_RUNTIME) +SPDK_RPC_REGISTER_ALIAS_DEPRECATED(framework_wait_init, wait_subsystem_init) diff --git a/src/spdk/lib/event/json_config.c b/src/spdk/lib/event/json_config.c new file mode 100644 index 000000000..69a95097a --- /dev/null +++ b/src/spdk/lib/event/json_config.c @@ -0,0 +1,630 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" + +#include "spdk/util.h" +#include "spdk/file.h" +#include "spdk/log.h" +#include "spdk/env.h" +#include "spdk/thread.h" +#include "spdk/jsonrpc.h" +#include "spdk/rpc.h" + +#include "spdk_internal/event.h" +#include "spdk_internal/log.h" + +#define SPDK_DEBUG_APP_CFG(...) SPDK_DEBUGLOG(SPDK_LOG_APP_CONFIG, __VA_ARGS__) + +/* JSON configuration format is as follows + * + * { + * "subsystems" : [ <<== *subsystems JSON array + * { <<== *subsystems_it array entry pointer (iterator) + * "subsystem": "<< SUBSYSTEM NAME >>", + * "config": [ <<== *config JSON array + * { <<== *config_it array entry pointer (iterator) + * "method": "<< METHOD NAME >>", <<== *method + * "params": { << PARAMS >> } <<== *params + * }, + * << MORE "config" ARRY ENTRIES >> + * ] + * }, + * << MORE "subsystems" ARRAY ENTRIES >> + * ] + * + * << ANYTHING ELSE IS IGNORRED IN ROOT OBJECT>> + * } + * + */ + +struct load_json_config_ctx; +typedef void (*client_resp_handler)(struct load_json_config_ctx *, + struct spdk_jsonrpc_client_response *); + +#define RPC_SOCKET_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path) + +/* 1s connections timeout */ +#define RPC_CLIENT_CONNECT_TIMEOUT_US (1U * 1000U * 1000U) + +/* + * Currently there is no timeout in SPDK for any RPC command. This result that + * we can't put a hard limit during configuration load as it most likely randomly fail. + * So just print WARNLOG every 10s. */ +#define RPC_CLIENT_REQUEST_TIMEOUT_US (10U * 1000 * 1000) + +struct load_json_config_ctx { + /* Thread used during configuration. */ + struct spdk_thread *thread; + spdk_subsystem_init_fn cb_fn; + void *cb_arg; + bool stop_on_error; + + /* Current subsystem */ + struct spdk_json_val *subsystems; /* "subsystems" array */ + struct spdk_json_val *subsystems_it; /* current subsystem array position in "subsystems" array */ + + struct spdk_json_val *subsystem_name; /* current subsystem name */ + + /* Current "config" entry we are processing */ + struct spdk_json_val *config; /* "config" array */ + struct spdk_json_val *config_it; /* current config position in "config" array */ + + /* Current request id we are sending. */ + uint32_t rpc_request_id; + + /* Whole configuration file read and parsed. */ + size_t json_data_size; + char *json_data; + + size_t values_cnt; + struct spdk_json_val *values; + + char rpc_socket_path_temp[RPC_SOCKET_PATH_MAX + 1]; + + struct spdk_jsonrpc_client *client_conn; + struct spdk_poller *client_conn_poller; + + client_resp_handler client_resp_cb; + + /* Timeout for current RPC client action. */ + uint64_t timeout; +}; + +static void app_json_config_load_subsystem(void *_ctx); + +static void +app_json_config_load_done(struct load_json_config_ctx *ctx, int rc) +{ + spdk_poller_unregister(&ctx->client_conn_poller); + if (ctx->client_conn != NULL) { + spdk_jsonrpc_client_close(ctx->client_conn); + } + + spdk_rpc_finish(); + + SPDK_DEBUG_APP_CFG("Config load finished with rc %d\n", rc); + ctx->cb_fn(rc, ctx->cb_arg); + + free(ctx->json_data); + free(ctx->values); + free(ctx); +} + +static void +rpc_client_set_timeout(struct load_json_config_ctx *ctx, uint64_t timeout_us) +{ + ctx->timeout = spdk_get_ticks() + timeout_us * spdk_get_ticks_hz() / (1000 * 1000); +} + +static int +rpc_client_check_timeout(struct load_json_config_ctx *ctx) +{ + if (ctx->timeout < spdk_get_ticks()) { + SPDK_WARNLOG("RPC client command timeout.\n"); + return -ETIMEDOUT; + } + + return 0; +} + +struct json_write_buf { + char data[1024]; + unsigned cur_off; +}; + +static int +json_write_stdout(void *cb_ctx, const void *data, size_t size) +{ + struct json_write_buf *buf = cb_ctx; + size_t rc; + + rc = snprintf(buf->data + buf->cur_off, sizeof(buf->data) - buf->cur_off, + "%s", (const char *)data); + if (rc > 0) { + buf->cur_off += rc; + } + return rc == size ? 0 : -1; +} + +static int +rpc_client_poller(void *arg) +{ + struct load_json_config_ctx *ctx = arg; + struct spdk_jsonrpc_client_response *resp; + client_resp_handler cb; + int rc; + + assert(spdk_get_thread() == ctx->thread); + + rc = spdk_jsonrpc_client_poll(ctx->client_conn, 0); + if (rc == 0) { + rc = rpc_client_check_timeout(ctx); + if (rc == -ETIMEDOUT) { + rpc_client_set_timeout(ctx, RPC_CLIENT_REQUEST_TIMEOUT_US); + rc = 0; + } + } + + if (rc == 0) { + /* No response yet */ + return SPDK_POLLER_BUSY; + } else if (rc < 0) { + app_json_config_load_done(ctx, rc); + return SPDK_POLLER_BUSY; + } + + resp = spdk_jsonrpc_client_get_response(ctx->client_conn); + assert(resp); + + if (resp->error) { + struct json_write_buf buf = {}; + struct spdk_json_write_ctx *w = spdk_json_write_begin(json_write_stdout, + &buf, SPDK_JSON_PARSE_FLAG_DECODE_IN_PLACE); + + if (w == NULL) { + SPDK_ERRLOG("error response: (?)\n"); + } else { + spdk_json_write_val(w, resp->error); + spdk_json_write_end(w); + SPDK_ERRLOG("error response: \n%s\n", buf.data); + } + } + + if (resp->error && ctx->stop_on_error) { + spdk_jsonrpc_client_free_response(resp); + app_json_config_load_done(ctx, -EINVAL); + } else { + /* We have response so we must have callback for it. */ + cb = ctx->client_resp_cb; + assert(cb != NULL); + + /* Mark we are done with this handler. */ + ctx->client_resp_cb = NULL; + cb(ctx, resp); + } + + + return SPDK_POLLER_BUSY; +} + +static int +rpc_client_connect_poller(void *_ctx) +{ + struct load_json_config_ctx *ctx = _ctx; + int rc; + + rc = spdk_jsonrpc_client_poll(ctx->client_conn, 0); + if (rc != -ENOTCONN) { + /* We are connected. Start regular poller and issue first request */ + spdk_poller_unregister(&ctx->client_conn_poller); + ctx->client_conn_poller = SPDK_POLLER_REGISTER(rpc_client_poller, ctx, 100); + app_json_config_load_subsystem(ctx); + } else { + rc = rpc_client_check_timeout(ctx); + if (rc) { + app_json_config_load_done(ctx, rc); + } + + return SPDK_POLLER_IDLE; + } + + return SPDK_POLLER_BUSY; +} + +static int +client_send_request(struct load_json_config_ctx *ctx, struct spdk_jsonrpc_client_request *request, + client_resp_handler client_resp_cb) +{ + int rc; + + assert(spdk_get_thread() == ctx->thread); + + ctx->client_resp_cb = client_resp_cb; + rpc_client_set_timeout(ctx, RPC_CLIENT_REQUEST_TIMEOUT_US); + rc = spdk_jsonrpc_client_send_request(ctx->client_conn, request); + + if (rc) { + SPDK_DEBUG_APP_CFG("Sending request to client failed (%d)\n", rc); + } + + return rc; +} + +static int +cap_string(const struct spdk_json_val *val, void *out) +{ + const struct spdk_json_val **vptr = out; + + if (val->type != SPDK_JSON_VAL_STRING) { + return -EINVAL; + } + + *vptr = val; + return 0; +} + +static int +cap_object(const struct spdk_json_val *val, void *out) +{ + const struct spdk_json_val **vptr = out; + + if (val->type != SPDK_JSON_VAL_OBJECT_BEGIN) { + return -EINVAL; + } + + *vptr = val; + return 0; +} + + +static int +cap_array_or_null(const struct spdk_json_val *val, void *out) +{ + const struct spdk_json_val **vptr = out; + + if (val->type != SPDK_JSON_VAL_ARRAY_BEGIN && val->type != SPDK_JSON_VAL_NULL) { + return -EINVAL; + } + + *vptr = val; + return 0; +} + +struct config_entry { + char *method; + struct spdk_json_val *params; +}; + +static struct spdk_json_object_decoder jsonrpc_cmd_decoders[] = { + {"method", offsetof(struct config_entry, method), spdk_json_decode_string}, + {"params", offsetof(struct config_entry, params), cap_object, true} +}; + +static void app_json_config_load_subsystem_config_entry(void *_ctx); + +static void +app_json_config_load_subsystem_config_entry_next(struct load_json_config_ctx *ctx, + struct spdk_jsonrpc_client_response *resp) +{ + /* Don't care about the response */ + spdk_jsonrpc_client_free_response(resp); + + ctx->config_it = spdk_json_next(ctx->config_it); + app_json_config_load_subsystem_config_entry(ctx); +} + +/* Load "config" entry */ +static void +app_json_config_load_subsystem_config_entry(void *_ctx) +{ + struct load_json_config_ctx *ctx = _ctx; + struct spdk_jsonrpc_client_request *rpc_request; + struct spdk_json_write_ctx *w; + struct config_entry cfg = {}; + struct spdk_json_val *params_end; + size_t params_len; + int rc; + + if (ctx->config_it == NULL) { + SPDK_DEBUG_APP_CFG("Subsystem '%.*s': configuration done.\n", ctx->subsystem_name->len, + (char *)ctx->subsystem_name->start); + ctx->subsystems_it = spdk_json_next(ctx->subsystems_it); + /* Invoke later to avoid recurrency */ + spdk_thread_send_msg(ctx->thread, app_json_config_load_subsystem, ctx); + return; + } + + if (spdk_json_decode_object(ctx->config_it, jsonrpc_cmd_decoders, + SPDK_COUNTOF(jsonrpc_cmd_decoders), &cfg)) { + params_end = spdk_json_next(ctx->config_it); + assert(params_end != NULL); + params_len = params_end->start - ctx->config->start + 1; + SPDK_ERRLOG("Failed to decode config entry: %.*s!\n", (int)params_len, (char *)ctx->config_it); + app_json_config_load_done(ctx, -EINVAL); + goto out; + } + + rc = spdk_rpc_is_method_allowed(cfg.method, spdk_rpc_get_state()); + if (rc == -EPERM) { + SPDK_DEBUG_APP_CFG("Method '%s' not allowed -> skipping\n", cfg.method); + /* Invoke later to avoid recurrency */ + ctx->config_it = spdk_json_next(ctx->config_it); + spdk_thread_send_msg(ctx->thread, app_json_config_load_subsystem_config_entry, ctx); + goto out; + } + + /* Get _END by skipping params and going back by one element. */ + params_end = cfg.params + spdk_json_val_len(cfg.params) - 1; + + /* Need to add one character to include '}' */ + params_len = params_end->start - cfg.params->start + 1; + + SPDK_DEBUG_APP_CFG("\tmethod: %s\n", cfg.method); + SPDK_DEBUG_APP_CFG("\tparams: %.*s\n", (int)params_len, (char *)cfg.params->start); + + rpc_request = spdk_jsonrpc_client_create_request(); + if (!rpc_request) { + app_json_config_load_done(ctx, -errno); + goto out; + } + + w = spdk_jsonrpc_begin_request(rpc_request, ctx->rpc_request_id, NULL); + if (!w) { + spdk_jsonrpc_client_free_request(rpc_request); + app_json_config_load_done(ctx, -ENOMEM); + goto out; + } + + spdk_json_write_named_string(w, "method", cfg.method); + + /* No need to parse "params". Just dump the whole content of "params" + * directly into the request and let the remote side verify it. */ + spdk_json_write_name(w, "params"); + spdk_json_write_val_raw(w, cfg.params->start, params_len); + spdk_jsonrpc_end_request(rpc_request, w); + + rc = client_send_request(ctx, rpc_request, app_json_config_load_subsystem_config_entry_next); + if (rc != 0) { + app_json_config_load_done(ctx, -rc); + goto out; + } +out: + free(cfg.method); +} + +static void +subsystem_init_done(int rc, void *arg1) +{ + struct load_json_config_ctx *ctx = arg1; + + if (rc) { + app_json_config_load_done(ctx, rc); + return; + } + + spdk_rpc_set_state(SPDK_RPC_RUNTIME); + /* Another round. This time for RUNTIME methods */ + SPDK_DEBUG_APP_CFG("'framework_start_init' done - continuing configuration\n"); + + assert(ctx != NULL); + if (ctx->subsystems) { + ctx->subsystems_it = spdk_json_array_first(ctx->subsystems); + } + + app_json_config_load_subsystem(ctx); +} + +static struct spdk_json_object_decoder subsystem_decoders[] = { + {"subsystem", offsetof(struct load_json_config_ctx, subsystem_name), cap_string}, + {"config", offsetof(struct load_json_config_ctx, config), cap_array_or_null} +}; + +/* + * Start loading subsystem pointed by ctx->subsystems_it. This must point to the + * beginning of the "subsystem" object in "subsystems" array or be NULL. If it is + * NULL then no more subsystems to load. + * + * There are two iterations: + * + * In first iteration only STARTUP RPC methods are used, other methods are ignored. When + * allsubsystems are walked the ctx->subsystems_it became NULL and "framework_start_init" + * is called to let the SPDK move to RUNTIME state (initialize all subsystems) and + * second iteration begins. + * + * In second iteration "subsystems" array is walked through again, this time only + * RUNTIME RPC methods are used. When ctx->subsystems_it became NULL second time it + * indicate that there is no more subsystems to load. The cb_fn is called to finish + * configuration. + */ +static void +app_json_config_load_subsystem(void *_ctx) +{ + struct load_json_config_ctx *ctx = _ctx; + + if (ctx->subsystems_it == NULL) { + if (spdk_rpc_get_state() == SPDK_RPC_STARTUP) { + SPDK_DEBUG_APP_CFG("No more entries for current state, calling 'framework_start_init'\n"); + spdk_subsystem_init(subsystem_init_done, ctx); + } else { + app_json_config_load_done(ctx, 0); + } + + return; + } + + /* Capture subsystem name and config array */ + if (spdk_json_decode_object(ctx->subsystems_it, subsystem_decoders, + SPDK_COUNTOF(subsystem_decoders), ctx)) { + SPDK_ERRLOG("Failed to parse subsystem configuration\n"); + app_json_config_load_done(ctx, -EINVAL); + return; + } + + SPDK_DEBUG_APP_CFG("Loading subsystem '%.*s' configuration\n", ctx->subsystem_name->len, + (char *)ctx->subsystem_name->start); + + /* Get 'config' array first configuration entry */ + ctx->config_it = spdk_json_array_first(ctx->config); + app_json_config_load_subsystem_config_entry(ctx); +} + +static void * +read_file(const char *filename, size_t *size) +{ + FILE *file = fopen(filename, "r"); + void *data; + + if (file == NULL) { + /* errno is set by fopen */ + return NULL; + } + + data = spdk_posix_file_load(file, size); + fclose(file); + return data; +} + +static int +app_json_config_read(const char *config_file, struct load_json_config_ctx *ctx) +{ + struct spdk_json_val *values = NULL; + void *json = NULL, *end; + ssize_t values_cnt, rc; + size_t json_size; + + json = read_file(config_file, &json_size); + if (!json) { + return -errno; + } + + rc = spdk_json_parse(json, json_size, NULL, 0, &end, + SPDK_JSON_PARSE_FLAG_ALLOW_COMMENTS); + if (rc < 0) { + SPDK_ERRLOG("Parsing JSON configuration failed (%zd)\n", rc); + goto err; + } + + values_cnt = rc; + values = calloc(values_cnt, sizeof(struct spdk_json_val)); + if (values == NULL) { + SPDK_ERRLOG("Out of memory\n"); + goto err; + } + + rc = spdk_json_parse(json, json_size, values, values_cnt, &end, + SPDK_JSON_PARSE_FLAG_ALLOW_COMMENTS); + if (rc != values_cnt) { + SPDK_ERRLOG("Parsing JSON configuration failed (%zd)\n", rc); + goto err; + } + + ctx->json_data = json; + ctx->json_data_size = json_size; + + ctx->values = values; + ctx->values_cnt = values_cnt; + + return 0; +err: + free(json); + free(values); + return rc; +} + +void +spdk_app_json_config_load(const char *json_config_file, const char *rpc_addr, + spdk_subsystem_init_fn cb_fn, void *cb_arg, + bool stop_on_error) +{ + struct load_json_config_ctx *ctx = calloc(1, sizeof(*ctx)); + int rc; + + assert(cb_fn); + if (!ctx) { + cb_fn(-ENOMEM, cb_arg); + return; + } + + ctx->cb_fn = cb_fn; + ctx->cb_arg = cb_arg; + ctx->stop_on_error = stop_on_error; + ctx->thread = spdk_get_thread(); + + rc = app_json_config_read(json_config_file, ctx); + if (rc) { + goto fail; + } + + /* Capture subsystems array */ + rc = spdk_json_find_array(ctx->values, "subsystems", NULL, &ctx->subsystems); + if (rc) { + SPDK_WARNLOG("No 'subsystems' key JSON configuration file.\n"); + } else { + /* Get first subsystem */ + ctx->subsystems_it = spdk_json_array_first(ctx->subsystems); + if (ctx->subsystems_it == NULL) { + SPDK_NOTICELOG("'subsystems' configuration is empty\n"); + } + } + + /* If rpc_addr is not an Unix socket use default address as prefix. */ + if (rpc_addr == NULL || rpc_addr[0] != '/') { + rpc_addr = SPDK_DEFAULT_RPC_ADDR; + } + + /* FIXME: rpc client should use socketpair() instead of this temporary socket nonsense */ + rc = snprintf(ctx->rpc_socket_path_temp, sizeof(ctx->rpc_socket_path_temp), "%s.%d_config", + rpc_addr, getpid()); + if (rc >= (int)sizeof(ctx->rpc_socket_path_temp)) { + SPDK_ERRLOG("Socket name create failed\n"); + goto fail; + } + + /* FIXME: spdk_rpc_initialize() function should return error code. */ + spdk_rpc_initialize(ctx->rpc_socket_path_temp); + ctx->client_conn = spdk_jsonrpc_client_connect(ctx->rpc_socket_path_temp, AF_UNIX); + if (ctx->client_conn == NULL) { + SPDK_ERRLOG("Failed to connect to '%s'\n", ctx->rpc_socket_path_temp); + goto fail; + } + + rpc_client_set_timeout(ctx, RPC_CLIENT_CONNECT_TIMEOUT_US); + ctx->client_conn_poller = SPDK_POLLER_REGISTER(rpc_client_connect_poller, ctx, 100); + return; + +fail: + app_json_config_load_done(ctx, -EINVAL); +} + +SPDK_LOG_REGISTER_COMPONENT("app_config", SPDK_LOG_APP_CONFIG) diff --git a/src/spdk/lib/event/reactor.c b/src/spdk/lib/event/reactor.c new file mode 100644 index 000000000..cda4a32b1 --- /dev/null +++ b/src/spdk/lib/event/reactor.c @@ -0,0 +1,664 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" +#include "spdk/likely.h" + +#include "spdk_internal/event.h" +#include "spdk_internal/log.h" +#include "spdk_internal/thread.h" + +#include "spdk/log.h" +#include "spdk/thread.h" +#include "spdk/env.h" +#include "spdk/util.h" + +#ifdef __linux__ +#include <sys/prctl.h> +#endif + +#ifdef __FreeBSD__ +#include <pthread_np.h> +#endif + +#define SPDK_EVENT_BATCH_SIZE 8 + +static struct spdk_reactor *g_reactors; +static struct spdk_cpuset g_reactor_core_mask; +static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_UNINITIALIZED; + +static bool g_framework_context_switch_monitor_enabled = true; + +static struct spdk_mempool *g_spdk_event_mempool = NULL; + +static void +reactor_construct(struct spdk_reactor *reactor, uint32_t lcore) +{ + reactor->lcore = lcore; + reactor->flags.is_valid = true; + + TAILQ_INIT(&reactor->threads); + reactor->thread_count = 0; + + reactor->events = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); + assert(reactor->events != NULL); +} + +struct spdk_reactor * +spdk_reactor_get(uint32_t lcore) +{ + struct spdk_reactor *reactor; + + if (g_reactors == NULL) { + SPDK_WARNLOG("Called spdk_reactor_get() while the g_reactors array was NULL!\n"); + return NULL; + } + + reactor = &g_reactors[lcore]; + + if (reactor->flags.is_valid == false) { + return NULL; + } + + return reactor; +} + +static int reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op); +static bool reactor_thread_op_supported(enum spdk_thread_op op); + +int +spdk_reactors_init(void) +{ + int rc; + uint32_t i, last_core; + char mempool_name[32]; + + snprintf(mempool_name, sizeof(mempool_name), "evtpool_%d", getpid()); + g_spdk_event_mempool = spdk_mempool_create(mempool_name, + 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ + sizeof(struct spdk_event), + SPDK_MEMPOOL_DEFAULT_CACHE_SIZE, + SPDK_ENV_SOCKET_ID_ANY); + + if (g_spdk_event_mempool == NULL) { + SPDK_ERRLOG("spdk_event_mempool creation failed\n"); + return -1; + } + + /* struct spdk_reactor must be aligned on 64 byte boundary */ + last_core = spdk_env_get_last_core(); + rc = posix_memalign((void **)&g_reactors, 64, + (last_core + 1) * sizeof(struct spdk_reactor)); + if (rc != 0) { + SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n", + last_core + 1); + spdk_mempool_free(g_spdk_event_mempool); + return -1; + } + + memset(g_reactors, 0, (last_core + 1) * sizeof(struct spdk_reactor)); + + spdk_thread_lib_init_ext(reactor_thread_op, reactor_thread_op_supported, + sizeof(struct spdk_lw_thread)); + + SPDK_ENV_FOREACH_CORE(i) { + reactor_construct(&g_reactors[i], i); + } + + g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED; + + return 0; +} + +void +spdk_reactors_fini(void) +{ + uint32_t i; + struct spdk_reactor *reactor; + + if (g_reactor_state == SPDK_REACTOR_STATE_UNINITIALIZED) { + return; + } + + spdk_thread_lib_fini(); + + SPDK_ENV_FOREACH_CORE(i) { + reactor = spdk_reactor_get(i); + assert(reactor != NULL); + assert(reactor->thread_count == 0); + if (reactor->events != NULL) { + spdk_ring_free(reactor->events); + } + } + + spdk_mempool_free(g_spdk_event_mempool); + + free(g_reactors); + g_reactors = NULL; +} + +struct spdk_event * +spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2) +{ + struct spdk_event *event = NULL; + struct spdk_reactor *reactor = spdk_reactor_get(lcore); + + if (!reactor) { + assert(false); + return NULL; + } + + event = spdk_mempool_get(g_spdk_event_mempool); + if (event == NULL) { + assert(false); + return NULL; + } + + event->lcore = lcore; + event->fn = fn; + event->arg1 = arg1; + event->arg2 = arg2; + + return event; +} + +void +spdk_event_call(struct spdk_event *event) +{ + int rc; + struct spdk_reactor *reactor; + + reactor = spdk_reactor_get(event->lcore); + + assert(reactor != NULL); + assert(reactor->events != NULL); + + rc = spdk_ring_enqueue(reactor->events, (void **)&event, 1, NULL); + if (rc != 1) { + assert(false); + } +} + +static inline uint32_t +event_queue_run_batch(struct spdk_reactor *reactor) +{ + unsigned count, i; + void *events[SPDK_EVENT_BATCH_SIZE]; + struct spdk_thread *thread; + struct spdk_lw_thread *lw_thread; + +#ifdef DEBUG + /* + * spdk_ring_dequeue() fills events and returns how many entries it wrote, + * so we will never actually read uninitialized data from events, but just to be sure + * (and to silence a static analyzer false positive), initialize the array to NULL pointers. + */ + memset(events, 0, sizeof(events)); +#endif + + count = spdk_ring_dequeue(reactor->events, events, SPDK_EVENT_BATCH_SIZE); + if (count == 0) { + return 0; + } + + /* Execute the events. There are still some remaining events + * that must occur on an SPDK thread. To accomodate those, try to + * run them on the first thread in the list, if it exists. */ + lw_thread = TAILQ_FIRST(&reactor->threads); + if (lw_thread) { + thread = spdk_thread_get_from_ctx(lw_thread); + } else { + thread = NULL; + } + + spdk_set_thread(thread); + + for (i = 0; i < count; i++) { + struct spdk_event *event = events[i]; + + assert(event != NULL); + event->fn(event->arg1, event->arg2); + } + + spdk_set_thread(NULL); + + spdk_mempool_put_bulk(g_spdk_event_mempool, events, count); + + return count; +} + +/* 1s */ +#define CONTEXT_SWITCH_MONITOR_PERIOD 1000000 + +static int +get_rusage(struct spdk_reactor *reactor) +{ + struct rusage rusage; + + if (getrusage(RUSAGE_THREAD, &rusage) != 0) { + return -1; + } + + if (rusage.ru_nvcsw != reactor->rusage.ru_nvcsw || rusage.ru_nivcsw != reactor->rusage.ru_nivcsw) { + SPDK_INFOLOG(SPDK_LOG_REACTOR, + "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n", + reactor->lcore, rusage.ru_nvcsw - reactor->rusage.ru_nvcsw, + rusage.ru_nivcsw - reactor->rusage.ru_nivcsw); + } + reactor->rusage = rusage; + + return -1; +} + +void +spdk_framework_enable_context_switch_monitor(bool enable) +{ + /* This global is being read by multiple threads, so this isn't + * strictly thread safe. However, we're toggling between true and + * false here, and if a thread sees the value update later than it + * should, it's no big deal. */ + g_framework_context_switch_monitor_enabled = enable; +} + +bool +spdk_framework_context_switch_monitor_enabled(void) +{ + return g_framework_context_switch_monitor_enabled; +} + +static void +_set_thread_name(const char *thread_name) +{ +#if defined(__linux__) + prctl(PR_SET_NAME, thread_name, 0, 0, 0); +#elif defined(__FreeBSD__) + pthread_set_name_np(pthread_self(), thread_name); +#else +#error missing platform support for thread name +#endif +} + +static int _reactor_schedule_thread(struct spdk_thread *thread); +static uint64_t g_rusage_period; + +static void +_reactor_run(struct spdk_reactor *reactor) +{ + struct spdk_thread *thread; + struct spdk_lw_thread *lw_thread, *tmp; + uint64_t now; + int rc; + + event_queue_run_batch(reactor); + + TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { + thread = spdk_thread_get_from_ctx(lw_thread); + rc = spdk_thread_poll(thread, 0, reactor->tsc_last); + + now = spdk_thread_get_last_tsc(thread); + if (rc == 0) { + reactor->idle_tsc += now - reactor->tsc_last; + } else if (rc > 0) { + reactor->busy_tsc += now - reactor->tsc_last; + } + reactor->tsc_last = now; + + if (spdk_unlikely(lw_thread->resched)) { + lw_thread->resched = false; + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + assert(reactor->thread_count > 0); + reactor->thread_count--; + _reactor_schedule_thread(thread); + continue; + } + + if (spdk_unlikely(spdk_thread_is_exited(thread) && + spdk_thread_is_idle(thread))) { + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + assert(reactor->thread_count > 0); + reactor->thread_count--; + spdk_thread_destroy(thread); + continue; + } + } + + if (g_framework_context_switch_monitor_enabled) { + if ((reactor->last_rusage + g_rusage_period) < reactor->tsc_last) { + get_rusage(reactor); + reactor->last_rusage = reactor->tsc_last; + } + } +} + +static int +reactor_run(void *arg) +{ + struct spdk_reactor *reactor = arg; + struct spdk_thread *thread; + struct spdk_lw_thread *lw_thread, *tmp; + char thread_name[32]; + + SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore); + + /* Rename the POSIX thread because the reactor is tied to the POSIX + * thread in the SPDK event library. + */ + snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); + _set_thread_name(thread_name); + + reactor->tsc_last = spdk_get_ticks(); + + while (1) { + _reactor_run(reactor); + + if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) { + break; + } + } + + TAILQ_FOREACH(lw_thread, &reactor->threads, link) { + thread = spdk_thread_get_from_ctx(lw_thread); + spdk_set_thread(thread); + spdk_thread_exit(thread); + } + + while (!TAILQ_EMPTY(&reactor->threads)) { + TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) { + thread = spdk_thread_get_from_ctx(lw_thread); + spdk_set_thread(thread); + if (spdk_thread_is_exited(thread)) { + TAILQ_REMOVE(&reactor->threads, lw_thread, link); + assert(reactor->thread_count > 0); + reactor->thread_count--; + spdk_thread_destroy(thread); + } else { + spdk_thread_poll(thread, 0, 0); + } + } + } + + return 0; +} + +int +spdk_app_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask) +{ + int ret; + struct spdk_cpuset *validmask; + + ret = spdk_cpuset_parse(cpumask, mask); + if (ret < 0) { + return ret; + } + + validmask = spdk_app_get_core_mask(); + spdk_cpuset_and(cpumask, validmask); + + return 0; +} + +struct spdk_cpuset * +spdk_app_get_core_mask(void) +{ + return &g_reactor_core_mask; +} + +void +spdk_reactors_start(void) +{ + struct spdk_reactor *reactor; + struct spdk_cpuset tmp_cpumask = {}; + uint32_t i, current_core; + int rc; + char thread_name[32]; + + g_rusage_period = (CONTEXT_SWITCH_MONITOR_PERIOD * spdk_get_ticks_hz()) / SPDK_SEC_TO_USEC; + g_reactor_state = SPDK_REACTOR_STATE_RUNNING; + + current_core = spdk_env_get_current_core(); + SPDK_ENV_FOREACH_CORE(i) { + if (i != current_core) { + reactor = spdk_reactor_get(i); + if (reactor == NULL) { + continue; + } + + rc = spdk_env_thread_launch_pinned(reactor->lcore, reactor_run, reactor); + if (rc < 0) { + SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor->lcore); + assert(false); + return; + } + + /* For now, for each reactor spawn one thread. */ + snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore); + + spdk_cpuset_zero(&tmp_cpumask); + spdk_cpuset_set_cpu(&tmp_cpumask, i, true); + + spdk_thread_create(thread_name, &tmp_cpumask); + } + spdk_cpuset_set_cpu(&g_reactor_core_mask, i, true); + } + + /* Start the master reactor */ + reactor = spdk_reactor_get(current_core); + assert(reactor != NULL); + reactor_run(reactor); + + spdk_env_thread_wait_all(); + + g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN; +} + +void +spdk_reactors_stop(void *arg1) +{ + g_reactor_state = SPDK_REACTOR_STATE_EXITING; +} + +static pthread_mutex_t g_scheduler_mtx = PTHREAD_MUTEX_INITIALIZER; +static uint32_t g_next_core = UINT32_MAX; + +static void +_schedule_thread(void *arg1, void *arg2) +{ + struct spdk_lw_thread *lw_thread = arg1; + struct spdk_thread *thread; + struct spdk_cpuset *cpumask; + struct spdk_reactor *reactor; + uint32_t current_core; + + current_core = spdk_env_get_current_core(); + + thread = spdk_thread_get_from_ctx(lw_thread); + cpumask = spdk_thread_get_cpumask(thread); + if (!spdk_cpuset_get_cpu(cpumask, current_core)) { + SPDK_ERRLOG("Thread was scheduled to the wrong core %d\n", current_core); + assert(false); + } + + reactor = spdk_reactor_get(current_core); + assert(reactor != NULL); + + TAILQ_INSERT_TAIL(&reactor->threads, lw_thread, link); + reactor->thread_count++; +} + +static int +_reactor_schedule_thread(struct spdk_thread *thread) +{ + uint32_t core; + struct spdk_lw_thread *lw_thread; + struct spdk_event *evt = NULL; + struct spdk_cpuset *cpumask; + uint32_t i; + + cpumask = spdk_thread_get_cpumask(thread); + + lw_thread = spdk_thread_get_ctx(thread); + assert(lw_thread != NULL); + memset(lw_thread, 0, sizeof(*lw_thread)); + + pthread_mutex_lock(&g_scheduler_mtx); + for (i = 0; i < spdk_env_get_core_count(); i++) { + if (g_next_core > spdk_env_get_last_core()) { + g_next_core = spdk_env_get_first_core(); + } + core = g_next_core; + g_next_core = spdk_env_get_next_core(g_next_core); + + if (spdk_cpuset_get_cpu(cpumask, core)) { + evt = spdk_event_allocate(core, _schedule_thread, lw_thread, NULL); + break; + } + } + pthread_mutex_unlock(&g_scheduler_mtx); + + assert(evt != NULL); + if (evt == NULL) { + SPDK_ERRLOG("Unable to schedule thread on requested core mask.\n"); + return -1; + } + + lw_thread->tsc_start = spdk_get_ticks(); + + spdk_event_call(evt); + + return 0; +} + +static void +_reactor_request_thread_reschedule(struct spdk_thread *thread) +{ + struct spdk_lw_thread *lw_thread; + + assert(thread == spdk_get_thread()); + + lw_thread = spdk_thread_get_ctx(thread); + + assert(lw_thread != NULL); + + lw_thread->resched = true; +} + +static int +reactor_thread_op(struct spdk_thread *thread, enum spdk_thread_op op) +{ + switch (op) { + case SPDK_THREAD_OP_NEW: + return _reactor_schedule_thread(thread); + case SPDK_THREAD_OP_RESCHED: + _reactor_request_thread_reschedule(thread); + return 0; + default: + return -ENOTSUP; + } +} + +static bool +reactor_thread_op_supported(enum spdk_thread_op op) +{ + switch (op) { + case SPDK_THREAD_OP_NEW: + case SPDK_THREAD_OP_RESCHED: + return true; + default: + return false; + } +} + +struct call_reactor { + uint32_t cur_core; + spdk_event_fn fn; + void *arg1; + void *arg2; + + uint32_t orig_core; + spdk_event_fn cpl; +}; + +static void +on_reactor(void *arg1, void *arg2) +{ + struct call_reactor *cr = arg1; + struct spdk_event *evt; + + cr->fn(cr->arg1, cr->arg2); + + cr->cur_core = spdk_env_get_next_core(cr->cur_core); + + if (cr->cur_core > spdk_env_get_last_core()) { + SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Completed reactor iteration\n"); + + evt = spdk_event_allocate(cr->orig_core, cr->cpl, cr->arg1, cr->arg2); + free(cr); + } else { + SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Continuing reactor iteration to %d\n", + cr->cur_core); + + evt = spdk_event_allocate(cr->cur_core, on_reactor, arg1, NULL); + } + assert(evt != NULL); + spdk_event_call(evt); +} + +void +spdk_for_each_reactor(spdk_event_fn fn, void *arg1, void *arg2, spdk_event_fn cpl) +{ + struct call_reactor *cr; + struct spdk_event *evt; + + cr = calloc(1, sizeof(*cr)); + if (!cr) { + SPDK_ERRLOG("Unable to perform reactor iteration\n"); + cpl(arg1, arg2); + return; + } + + cr->fn = fn; + cr->arg1 = arg1; + cr->arg2 = arg2; + cr->cpl = cpl; + cr->orig_core = spdk_env_get_current_core(); + cr->cur_core = spdk_env_get_first_core(); + + SPDK_DEBUGLOG(SPDK_LOG_REACTOR, "Starting reactor iteration from %d\n", cr->orig_core); + + evt = spdk_event_allocate(cr->cur_core, on_reactor, cr, NULL); + assert(evt != NULL); + + spdk_event_call(evt); +} + +SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR) diff --git a/src/spdk/lib/event/rpc.c b/src/spdk/lib/event/rpc.c new file mode 100644 index 000000000..a42d5ebeb --- /dev/null +++ b/src/spdk/lib/event/rpc.c @@ -0,0 +1,87 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" + +#include "spdk/conf.h" +#include "spdk/env.h" +#include "spdk/thread.h" +#include "spdk/log.h" +#include "spdk/rpc.h" + +#include "spdk_internal/event.h" + +#define RPC_SELECT_INTERVAL 4000 /* 4ms */ + +static struct spdk_poller *g_rpc_poller = NULL; + +static int +rpc_subsystem_poll(void *arg) +{ + spdk_rpc_accept(); + return SPDK_POLLER_BUSY; +} + +void +spdk_rpc_initialize(const char *listen_addr) +{ + int rc; + + if (listen_addr == NULL) { + return; + } + + if (!spdk_rpc_verify_methods()) { + spdk_app_stop(-EINVAL); + return; + } + + /* Listen on the requested address */ + rc = spdk_rpc_listen(listen_addr); + if (rc != 0) { + SPDK_ERRLOG("Unable to start RPC service at %s\n", listen_addr); + return; + } + + spdk_rpc_set_state(SPDK_RPC_STARTUP); + + /* Register a poller to periodically check for RPCs */ + g_rpc_poller = SPDK_POLLER_REGISTER(rpc_subsystem_poll, NULL, RPC_SELECT_INTERVAL); +} + +void +spdk_rpc_finish(void) +{ + spdk_rpc_close(); + spdk_poller_unregister(&g_rpc_poller); +} diff --git a/src/spdk/lib/event/spdk_event.map b/src/spdk/lib/event/spdk_event.map new file mode 100644 index 000000000..8208c5e1f --- /dev/null +++ b/src/spdk/lib/event/spdk_event.map @@ -0,0 +1,46 @@ +{ + global: + + # Public functions + spdk_app_opts_init; + spdk_app_start; + spdk_app_fini; + spdk_app_start_shutdown; + spdk_app_stop; + spdk_app_get_running_config; + spdk_app_get_shm_id; + spdk_app_parse_core_mask; + spdk_app_get_core_mask; + spdk_app_parse_args; + spdk_app_usage; + spdk_event_allocate; + spdk_event_call; + spdk_framework_enable_context_switch_monitor; + spdk_framework_context_switch_monitor_enabled; + + # Functions used by other SPDK libraries + spdk_reactors_init; + spdk_reactors_fini; + spdk_reactors_start; + spdk_reactors_stop; + spdk_reactor_get; + spdk_for_each_reactor; + spdk_subsystem_find; + spdk_subsystem_get_first; + spdk_subsystem_get_next; + spdk_subsystem_get_first_depend; + spdk_subsystem_get_next_depend; + spdk_add_subsystem; + spdk_add_subsystem_depend; + spdk_subsystem_init; + spdk_subsystem_fini; + spdk_subsystem_init_next; + spdk_subsystem_fini_next; + spdk_subsystem_config; + spdk_app_json_config_load; + spdk_subsystem_config_json; + spdk_rpc_initialize; + spdk_rpc_finish; + + local: *; +}; diff --git a/src/spdk/lib/event/subsystem.c b/src/spdk/lib/event/subsystem.c new file mode 100644 index 000000000..2cff890b2 --- /dev/null +++ b/src/spdk/lib/event/subsystem.c @@ -0,0 +1,288 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" + +#include "spdk/log.h" +#include "spdk/thread.h" + +#include "spdk_internal/event.h" +#include "spdk/env.h" + +TAILQ_HEAD(spdk_subsystem_list, spdk_subsystem); +struct spdk_subsystem_list g_subsystems = TAILQ_HEAD_INITIALIZER(g_subsystems); + +TAILQ_HEAD(spdk_subsystem_depend_list, spdk_subsystem_depend); +struct spdk_subsystem_depend_list g_subsystems_deps = TAILQ_HEAD_INITIALIZER(g_subsystems_deps); +static struct spdk_subsystem *g_next_subsystem; +static bool g_subsystems_initialized = false; +static bool g_subsystems_init_interrupted = false; +static spdk_subsystem_init_fn g_subsystem_start_fn = NULL; +static void *g_subsystem_start_arg = NULL; +static spdk_msg_fn g_subsystem_stop_fn = NULL; +static void *g_subsystem_stop_arg = NULL; +static struct spdk_thread *g_fini_thread = NULL; + +void +spdk_add_subsystem(struct spdk_subsystem *subsystem) +{ + TAILQ_INSERT_TAIL(&g_subsystems, subsystem, tailq); +} + +void +spdk_add_subsystem_depend(struct spdk_subsystem_depend *depend) +{ + TAILQ_INSERT_TAIL(&g_subsystems_deps, depend, tailq); +} + +static struct spdk_subsystem * +_subsystem_find(struct spdk_subsystem_list *list, const char *name) +{ + struct spdk_subsystem *iter; + + TAILQ_FOREACH(iter, list, tailq) { + if (strcmp(name, iter->name) == 0) { + return iter; + } + } + + return NULL; +} + +struct spdk_subsystem * +spdk_subsystem_find(const char *name) +{ + return _subsystem_find(&g_subsystems, name); +} + +struct spdk_subsystem * +spdk_subsystem_get_first(void) +{ + return TAILQ_FIRST(&g_subsystems); +} + +struct spdk_subsystem * +spdk_subsystem_get_next(struct spdk_subsystem *cur_subsystem) +{ + return TAILQ_NEXT(cur_subsystem, tailq); +} + + +struct spdk_subsystem_depend * +spdk_subsystem_get_first_depend(void) +{ + return TAILQ_FIRST(&g_subsystems_deps); +} + +struct spdk_subsystem_depend * +spdk_subsystem_get_next_depend(struct spdk_subsystem_depend *cur_depend) +{ + return TAILQ_NEXT(cur_depend, tailq); +} + +static void +subsystem_sort(void) +{ + bool depends_on, depends_on_sorted; + struct spdk_subsystem *subsystem, *subsystem_tmp; + struct spdk_subsystem_depend *subsystem_dep; + + struct spdk_subsystem_list subsystems_list = TAILQ_HEAD_INITIALIZER(subsystems_list); + + while (!TAILQ_EMPTY(&g_subsystems)) { + TAILQ_FOREACH_SAFE(subsystem, &g_subsystems, tailq, subsystem_tmp) { + depends_on = false; + TAILQ_FOREACH(subsystem_dep, &g_subsystems_deps, tailq) { + if (strcmp(subsystem->name, subsystem_dep->name) == 0) { + depends_on = true; + depends_on_sorted = !!_subsystem_find(&subsystems_list, subsystem_dep->depends_on); + if (depends_on_sorted) { + continue; + } + break; + } + } + + if (depends_on == false) { + TAILQ_REMOVE(&g_subsystems, subsystem, tailq); + TAILQ_INSERT_TAIL(&subsystems_list, subsystem, tailq); + } else { + if (depends_on_sorted == true) { + TAILQ_REMOVE(&g_subsystems, subsystem, tailq); + TAILQ_INSERT_TAIL(&subsystems_list, subsystem, tailq); + } + } + } + } + + TAILQ_FOREACH_SAFE(subsystem, &subsystems_list, tailq, subsystem_tmp) { + TAILQ_REMOVE(&subsystems_list, subsystem, tailq); + TAILQ_INSERT_TAIL(&g_subsystems, subsystem, tailq); + } +} + +void +spdk_subsystem_init_next(int rc) +{ + /* The initialization is interrupted by the spdk_subsystem_fini, so just return */ + if (g_subsystems_init_interrupted) { + return; + } + + if (rc) { + SPDK_ERRLOG("Init subsystem %s failed\n", g_next_subsystem->name); + g_subsystem_start_fn(rc, g_subsystem_start_arg); + return; + } + + if (!g_next_subsystem) { + g_next_subsystem = TAILQ_FIRST(&g_subsystems); + } else { + g_next_subsystem = TAILQ_NEXT(g_next_subsystem, tailq); + } + + if (!g_next_subsystem) { + g_subsystems_initialized = true; + g_subsystem_start_fn(0, g_subsystem_start_arg); + return; + } + + if (g_next_subsystem->init) { + g_next_subsystem->init(); + } else { + spdk_subsystem_init_next(0); + } +} + +void +spdk_subsystem_init(spdk_subsystem_init_fn cb_fn, void *cb_arg) +{ + struct spdk_subsystem_depend *dep; + + g_subsystem_start_fn = cb_fn; + g_subsystem_start_arg = cb_arg; + + /* Verify that all dependency name and depends_on subsystems are registered */ + TAILQ_FOREACH(dep, &g_subsystems_deps, tailq) { + if (!spdk_subsystem_find(dep->name)) { + SPDK_ERRLOG("subsystem %s is missing\n", dep->name); + g_subsystem_start_fn(-1, g_subsystem_start_arg); + return; + } + if (!spdk_subsystem_find(dep->depends_on)) { + SPDK_ERRLOG("subsystem %s dependency %s is missing\n", + dep->name, dep->depends_on); + g_subsystem_start_fn(-1, g_subsystem_start_arg); + return; + } + } + + subsystem_sort(); + + spdk_subsystem_init_next(0); +} + +static void +subsystem_fini_next(void *arg1) +{ + assert(g_fini_thread == spdk_get_thread()); + + if (!g_next_subsystem) { + /* If the initialized flag is false, then we've failed to initialize + * the very first subsystem and no de-init is needed + */ + if (g_subsystems_initialized) { + g_next_subsystem = TAILQ_LAST(&g_subsystems, spdk_subsystem_list); + } + } else { + if (g_subsystems_initialized || g_subsystems_init_interrupted) { + g_next_subsystem = TAILQ_PREV(g_next_subsystem, spdk_subsystem_list, tailq); + } else { + g_subsystems_init_interrupted = true; + } + } + + while (g_next_subsystem) { + if (g_next_subsystem->fini) { + g_next_subsystem->fini(); + return; + } + g_next_subsystem = TAILQ_PREV(g_next_subsystem, spdk_subsystem_list, tailq); + } + + g_subsystem_stop_fn(g_subsystem_stop_arg); + return; +} + +void +spdk_subsystem_fini_next(void) +{ + if (g_fini_thread != spdk_get_thread()) { + spdk_thread_send_msg(g_fini_thread, subsystem_fini_next, NULL); + } else { + subsystem_fini_next(NULL); + } +} + +void +spdk_subsystem_fini(spdk_msg_fn cb_fn, void *cb_arg) +{ + g_subsystem_stop_fn = cb_fn; + g_subsystem_stop_arg = cb_arg; + + g_fini_thread = spdk_get_thread(); + + spdk_subsystem_fini_next(); +} + +void +spdk_subsystem_config(FILE *fp) +{ + struct spdk_subsystem *subsystem; + + TAILQ_FOREACH(subsystem, &g_subsystems, tailq) { + if (subsystem->config) { + subsystem->config(fp); + } + } +} + +void +spdk_subsystem_config_json(struct spdk_json_write_ctx *w, struct spdk_subsystem *subsystem) +{ + if (subsystem && subsystem->write_config_json) { + subsystem->write_config_json(w); + } else { + spdk_json_write_null(w); + } +} |