From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/spdk/lib/thread/Makefile | 45 + src/spdk/lib/thread/spdk_thread.map | 55 ++ src/spdk/lib/thread/thread.c | 1636 +++++++++++++++++++++++++++++++++++ 3 files changed, 1736 insertions(+) create mode 100644 src/spdk/lib/thread/Makefile create mode 100644 src/spdk/lib/thread/spdk_thread.map create mode 100644 src/spdk/lib/thread/thread.c (limited to 'src/spdk/lib/thread') diff --git a/src/spdk/lib/thread/Makefile b/src/spdk/lib/thread/Makefile new file mode 100644 index 000000000..ceb7a394e --- /dev/null +++ b/src/spdk/lib/thread/Makefile @@ -0,0 +1,45 @@ +# +# BSD LICENSE +# +# Copyright (c) Intel Corporation. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Intel Corporation nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +SPDK_ROOT_DIR := $(abspath $(CURDIR)/../..) +include $(SPDK_ROOT_DIR)/mk/spdk.common.mk + +SO_VER := 3 +SO_MINOR := 0 + +C_SRCS = thread.c +LIBNAME = thread + +SPDK_MAP_FILE = $(abspath $(CURDIR)/spdk_thread.map) + +include $(SPDK_ROOT_DIR)/mk/spdk.lib.mk diff --git a/src/spdk/lib/thread/spdk_thread.map b/src/spdk/lib/thread/spdk_thread.map new file mode 100644 index 000000000..b71fa06eb --- /dev/null +++ b/src/spdk/lib/thread/spdk_thread.map @@ -0,0 +1,55 @@ +{ + global: + + # public functions in spdk/thread.h + spdk_thread_lib_init; + spdk_thread_lib_init_ext; + spdk_thread_lib_fini; + spdk_thread_create; + spdk_set_thread; + spdk_thread_exit; + spdk_thread_is_exited; + spdk_thread_destroy; + spdk_thread_get_ctx; + spdk_thread_get_cpumask; + spdk_thread_set_cpumask; + spdk_thread_get_from_ctx; + spdk_thread_poll; + spdk_thread_next_poller_expiration; + spdk_thread_has_active_pollers; + spdk_thread_has_pollers; + spdk_thread_is_idle; + spdk_thread_get_count; + spdk_get_thread; + spdk_thread_get_name; + spdk_thread_get_id; + spdk_thread_get_by_id; + spdk_thread_get_stats; + spdk_thread_get_last_tsc; + spdk_thread_send_msg; + spdk_thread_send_critical_msg; + spdk_for_each_thread; + spdk_poller_register; + spdk_poller_register_named; + spdk_poller_unregister; + spdk_poller_pause; + spdk_poller_resume; + spdk_io_device_register; + spdk_io_device_unregister; + spdk_get_io_channel; + spdk_put_io_channel; + spdk_io_channel_get_ctx; + spdk_io_channel_from_ctx; + spdk_io_channel_get_thread; + spdk_for_each_channel; + spdk_io_channel_iter_get_io_device; + spdk_io_channel_iter_get_channel; + spdk_io_channel_iter_get_ctx; + spdk_for_each_channel_continue; + + # internal functions in spdk_internal/thread.h + spdk_poller_state_str; + spdk_io_device_get_name; + + local: *; +}; diff --git a/src/spdk/lib/thread/thread.c b/src/spdk/lib/thread/thread.c new file mode 100644 index 000000000..65d91ce35 --- /dev/null +++ b/src/spdk/lib/thread/thread.c @@ -0,0 +1,1636 @@ +/*- + * BSD LICENSE + * + * Copyright (c) Intel Corporation. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "spdk/stdinc.h" + +#include "spdk/env.h" +#include "spdk/likely.h" +#include "spdk/queue.h" +#include "spdk/string.h" +#include "spdk/thread.h" +#include "spdk/util.h" + +#include "spdk_internal/log.h" +#include "spdk_internal/thread.h" + +#define SPDK_MSG_BATCH_SIZE 8 +#define SPDK_MAX_DEVICE_NAME_LEN 256 +#define SPDK_THREAD_EXIT_TIMEOUT_SEC 5 + +static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER; + +static spdk_new_thread_fn g_new_thread_fn = NULL; +static spdk_thread_op_fn g_thread_op_fn = NULL; +static spdk_thread_op_supported_fn g_thread_op_supported_fn; +static size_t g_ctx_sz = 0; +/* Monotonic increasing ID is set to each created thread beginning at 1. Once the + * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting + * SPDK application is required. + */ +static uint64_t g_thread_id = 1; + +struct io_device { + void *io_device; + char name[SPDK_MAX_DEVICE_NAME_LEN + 1]; + spdk_io_channel_create_cb create_cb; + spdk_io_channel_destroy_cb destroy_cb; + spdk_io_device_unregister_cb unregister_cb; + struct spdk_thread *unregister_thread; + uint32_t ctx_size; + uint32_t for_each_count; + TAILQ_ENTRY(io_device) tailq; + + uint32_t refcnt; + + bool unregistered; +}; + +static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices); + +struct spdk_msg { + spdk_msg_fn fn; + void *arg; + + SLIST_ENTRY(spdk_msg) link; +}; + +#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024 +static struct spdk_mempool *g_spdk_msg_mempool = NULL; + +static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads); +static uint32_t g_thread_count = 0; + +static __thread struct spdk_thread *tls_thread = NULL; + +static inline struct spdk_thread * +_get_thread(void) +{ + return tls_thread; +} + +static int +_thread_lib_init(size_t ctx_sz) +{ + char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN]; + + g_ctx_sz = ctx_sz; + + snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid()); + g_spdk_msg_mempool = spdk_mempool_create(mempool_name, + 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */ + sizeof(struct spdk_msg), + 0, /* No cache. We do our own. */ + SPDK_ENV_SOCKET_ID_ANY); + + if (!g_spdk_msg_mempool) { + return -1; + } + + return 0; +} + +int +spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz) +{ + assert(g_new_thread_fn == NULL); + assert(g_thread_op_fn == NULL); + + if (new_thread_fn == NULL) { + SPDK_INFOLOG(SPDK_LOG_THREAD, "new_thread_fn was not specified at spdk_thread_lib_init\n"); + } else { + g_new_thread_fn = new_thread_fn; + } + + return _thread_lib_init(ctx_sz); +} + +int +spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn, + spdk_thread_op_supported_fn thread_op_supported_fn, + size_t ctx_sz) +{ + assert(g_new_thread_fn == NULL); + assert(g_thread_op_fn == NULL); + assert(g_thread_op_supported_fn == NULL); + + if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) { + SPDK_ERRLOG("Both must be defined or undefined together.\n"); + return -EINVAL; + } + + if (thread_op_fn == NULL && thread_op_supported_fn == NULL) { + SPDK_INFOLOG(SPDK_LOG_THREAD, "thread_op_fn and thread_op_supported_fn were not specified\n"); + } else { + g_thread_op_fn = thread_op_fn; + g_thread_op_supported_fn = thread_op_supported_fn; + } + + return _thread_lib_init(ctx_sz); +} + +void +spdk_thread_lib_fini(void) +{ + struct io_device *dev; + + TAILQ_FOREACH(dev, &g_io_devices, tailq) { + SPDK_ERRLOG("io_device %s not unregistered\n", dev->name); + } + + if (g_spdk_msg_mempool) { + spdk_mempool_free(g_spdk_msg_mempool); + g_spdk_msg_mempool = NULL; + } + + g_new_thread_fn = NULL; + g_thread_op_fn = NULL; + g_thread_op_supported_fn = NULL; + g_ctx_sz = 0; +} + +static void +_free_thread(struct spdk_thread *thread) +{ + struct spdk_io_channel *ch; + struct spdk_msg *msg; + struct spdk_poller *poller, *ptmp; + + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + SPDK_ERRLOG("thread %s still has channel for io_device %s\n", + thread->name, ch->dev->name); + } + + TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { + SPDK_WARNLOG("poller %s still registered at thread exit\n", + poller->name); + } + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + free(poller); + } + + TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { + SPDK_WARNLOG("poller %s still registered at thread exit\n", + poller->name); + } + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + free(poller); + } + + TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) { + SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name); + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + free(poller); + } + + pthread_mutex_lock(&g_devlist_mutex); + assert(g_thread_count > 0); + g_thread_count--; + TAILQ_REMOVE(&g_threads, thread, tailq); + pthread_mutex_unlock(&g_devlist_mutex); + + msg = SLIST_FIRST(&thread->msg_cache); + while (msg != NULL) { + SLIST_REMOVE_HEAD(&thread->msg_cache, link); + + assert(thread->msg_cache_count > 0); + thread->msg_cache_count--; + spdk_mempool_put(g_spdk_msg_mempool, msg); + + msg = SLIST_FIRST(&thread->msg_cache); + } + + assert(thread->msg_cache_count == 0); + + spdk_ring_free(thread->messages); + free(thread); +} + +struct spdk_thread * +spdk_thread_create(const char *name, struct spdk_cpuset *cpumask) +{ + struct spdk_thread *thread; + struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE]; + int rc = 0, i; + + thread = calloc(1, sizeof(*thread) + g_ctx_sz); + if (!thread) { + SPDK_ERRLOG("Unable to allocate memory for thread\n"); + return NULL; + } + + if (cpumask) { + spdk_cpuset_copy(&thread->cpumask, cpumask); + } else { + spdk_cpuset_negate(&thread->cpumask); + } + + TAILQ_INIT(&thread->io_channels); + TAILQ_INIT(&thread->active_pollers); + TAILQ_INIT(&thread->timed_pollers); + TAILQ_INIT(&thread->paused_pollers); + SLIST_INIT(&thread->msg_cache); + thread->msg_cache_count = 0; + + thread->tsc_last = spdk_get_ticks(); + + thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY); + if (!thread->messages) { + SPDK_ERRLOG("Unable to allocate memory for message ring\n"); + free(thread); + return NULL; + } + + /* Fill the local message pool cache. */ + rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE); + if (rc == 0) { + /* If we can't populate the cache it's ok. The cache will get filled + * up organically as messages are passed to the thread. */ + for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) { + SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link); + thread->msg_cache_count++; + } + } + + if (name) { + snprintf(thread->name, sizeof(thread->name), "%s", name); + } else { + snprintf(thread->name, sizeof(thread->name), "%p", thread); + } + + pthread_mutex_lock(&g_devlist_mutex); + if (g_thread_id == 0) { + SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n"); + pthread_mutex_unlock(&g_devlist_mutex); + _free_thread(thread); + return NULL; + } + thread->id = g_thread_id++; + TAILQ_INSERT_TAIL(&g_threads, thread, tailq); + g_thread_count++; + pthread_mutex_unlock(&g_devlist_mutex); + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n", + thread->id, thread->name); + + if (g_new_thread_fn) { + rc = g_new_thread_fn(thread); + } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) { + rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW); + } + + if (rc != 0) { + _free_thread(thread); + return NULL; + } + + thread->state = SPDK_THREAD_STATE_RUNNING; + + return thread; +} + +void +spdk_set_thread(struct spdk_thread *thread) +{ + tls_thread = thread; +} + +static void +thread_exit(struct spdk_thread *thread, uint64_t now) +{ + struct spdk_poller *poller; + struct spdk_io_channel *ch; + + if (now >= thread->exit_timeout_tsc) { + SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n", + thread->name); + goto exited; + } + + TAILQ_FOREACH(poller, &thread->active_pollers, tailq) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { + SPDK_INFOLOG(SPDK_LOG_THREAD, + "thread %s still has active poller %s\n", + thread->name, poller->name); + return; + } + } + + TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) { + if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) { + SPDK_INFOLOG(SPDK_LOG_THREAD, + "thread %s still has active timed poller %s\n", + thread->name, poller->name); + return; + } + } + + TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) { + SPDK_INFOLOG(SPDK_LOG_THREAD, + "thread %s still has paused poller %s\n", + thread->name, poller->name); + return; + } + + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + SPDK_INFOLOG(SPDK_LOG_THREAD, + "thread %s still has channel for io_device %s\n", + thread->name, ch->dev->name); + return; + } + +exited: + thread->state = SPDK_THREAD_STATE_EXITED; +} + +int +spdk_thread_exit(struct spdk_thread *thread) +{ + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name); + + assert(tls_thread == thread); + + if (thread->state >= SPDK_THREAD_STATE_EXITING) { + SPDK_INFOLOG(SPDK_LOG_THREAD, + "thread %s is already exiting\n", + thread->name); + return 0; + } + + thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() * + SPDK_THREAD_EXIT_TIMEOUT_SEC); + thread->state = SPDK_THREAD_STATE_EXITING; + return 0; +} + +bool +spdk_thread_is_exited(struct spdk_thread *thread) +{ + return thread->state == SPDK_THREAD_STATE_EXITED; +} + +void +spdk_thread_destroy(struct spdk_thread *thread) +{ + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name); + + assert(thread->state == SPDK_THREAD_STATE_EXITED); + + if (tls_thread == thread) { + tls_thread = NULL; + } + + _free_thread(thread); +} + +void * +spdk_thread_get_ctx(struct spdk_thread *thread) +{ + if (g_ctx_sz > 0) { + return thread->ctx; + } + + return NULL; +} + +struct spdk_cpuset * +spdk_thread_get_cpumask(struct spdk_thread *thread) +{ + return &thread->cpumask; +} + +int +spdk_thread_set_cpumask(struct spdk_cpuset *cpumask) +{ + struct spdk_thread *thread; + + if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) { + SPDK_ERRLOG("Framework does not support reschedule operation.\n"); + assert(false); + return -ENOTSUP; + } + + thread = spdk_get_thread(); + if (!thread) { + SPDK_ERRLOG("Called from non-SPDK thread\n"); + assert(false); + return -EINVAL; + } + + spdk_cpuset_copy(&thread->cpumask, cpumask); + + /* Invoke framework's reschedule operation. If this function is called multiple times + * in a single spdk_thread_poll() context, the last cpumask will be used in the + * reschedule operation. + */ + g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED); + + return 0; +} + +struct spdk_thread * +spdk_thread_get_from_ctx(void *ctx) +{ + if (ctx == NULL) { + assert(false); + return NULL; + } + + assert(g_ctx_sz > 0); + + return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx); +} + +static inline uint32_t +msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs) +{ + unsigned count, i; + void *messages[SPDK_MSG_BATCH_SIZE]; + +#ifdef DEBUG + /* + * spdk_ring_dequeue() fills messages and returns how many entries it wrote, + * so we will never actually read uninitialized data from events, but just to be sure + * (and to silence a static analyzer false positive), initialize the array to NULL pointers. + */ + memset(messages, 0, sizeof(messages)); +#endif + + if (max_msgs > 0) { + max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE); + } else { + max_msgs = SPDK_MSG_BATCH_SIZE; + } + + count = spdk_ring_dequeue(thread->messages, messages, max_msgs); + if (count == 0) { + return 0; + } + + for (i = 0; i < count; i++) { + struct spdk_msg *msg = messages[i]; + + assert(msg != NULL); + msg->fn(msg->arg); + + if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) { + /* Insert the messages at the head. We want to re-use the hot + * ones. */ + SLIST_INSERT_HEAD(&thread->msg_cache, msg, link); + thread->msg_cache_count++; + } else { + spdk_mempool_put(g_spdk_msg_mempool, msg); + } + } + + return count; +} + +static void +poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now) +{ + struct spdk_poller *iter; + + poller->next_run_tick = now + poller->period_ticks; + + /* + * Insert poller in the thread's timed_pollers list in sorted order by next scheduled + * run time. + */ + TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) { + if (iter->next_run_tick <= poller->next_run_tick) { + TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq); + return; + } + } + + /* No earlier pollers were found, so this poller must be the new head */ + TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq); +} + +static void +thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller) +{ + if (poller->period_ticks) { + poller_insert_timer(thread, poller, spdk_get_ticks()); + } else { + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); + } +} + +static inline void +thread_update_stats(struct spdk_thread *thread, uint64_t end, + uint64_t start, int rc) +{ + if (rc == 0) { + /* Poller status idle */ + thread->stats.idle_tsc += end - start; + } else if (rc > 0) { + /* Poller status busy */ + thread->stats.busy_tsc += end - start; + } + /* Store end time to use it as start time of the next spdk_thread_poll(). */ + thread->tsc_last = end; +} + +static int +thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) +{ + uint32_t msg_count; + struct spdk_poller *poller, *tmp; + spdk_msg_fn critical_msg; + int rc = 0; + + critical_msg = thread->critical_msg; + if (spdk_unlikely(critical_msg != NULL)) { + critical_msg(NULL); + thread->critical_msg = NULL; + } + + msg_count = msg_queue_run_batch(thread, max_msgs); + if (msg_count) { + rc = 1; + } + + TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers, + active_pollers_head, tailq, tmp) { + int poller_rc; + + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + free(poller); + continue; + } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + continue; + } + + poller->state = SPDK_POLLER_STATE_RUNNING; + poller_rc = poller->fn(poller->arg); + + poller->run_count++; + if (poller_rc > 0) { + poller->busy_count++; + } + +#ifdef DEBUG + if (poller_rc == -1) { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name); + } +#endif + + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + free(poller); + } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { + poller->state = SPDK_POLLER_STATE_WAITING; + } + + if (poller_rc > rc) { + rc = poller_rc; + } + } + + TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) { + int timer_rc = 0; + + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + free(poller); + continue; + } else if (poller->state == SPDK_POLLER_STATE_PAUSING) { + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + continue; + } + + if (now < poller->next_run_tick) { + break; + } + + poller->state = SPDK_POLLER_STATE_RUNNING; + timer_rc = poller->fn(poller->arg); + + poller->run_count++; + if (timer_rc > 0) { + poller->busy_count++; + } + +#ifdef DEBUG + if (timer_rc == -1) { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name); + } +#endif + + if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) { + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + free(poller); + } else if (poller->state != SPDK_POLLER_STATE_PAUSED) { + poller->state = SPDK_POLLER_STATE_WAITING; + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + poller_insert_timer(thread, poller, now); + } + + if (timer_rc > rc) { + rc = timer_rc; + } + } + + return rc; +} + +int +spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now) +{ + struct spdk_thread *orig_thread; + int rc; + + orig_thread = _get_thread(); + tls_thread = thread; + + if (now == 0) { + now = spdk_get_ticks(); + } + + rc = thread_poll(thread, max_msgs, now); + + if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) { + thread_exit(thread, now); + } + + thread_update_stats(thread, spdk_get_ticks(), now, rc); + + tls_thread = orig_thread; + + return rc; +} + +uint64_t +spdk_thread_next_poller_expiration(struct spdk_thread *thread) +{ + struct spdk_poller *poller; + + poller = TAILQ_FIRST(&thread->timed_pollers); + if (poller) { + return poller->next_run_tick; + } + + return 0; +} + +int +spdk_thread_has_active_pollers(struct spdk_thread *thread) +{ + return !TAILQ_EMPTY(&thread->active_pollers); +} + +static bool +thread_has_unpaused_pollers(struct spdk_thread *thread) +{ + if (TAILQ_EMPTY(&thread->active_pollers) && + TAILQ_EMPTY(&thread->timed_pollers)) { + return false; + } + + return true; +} + +bool +spdk_thread_has_pollers(struct spdk_thread *thread) +{ + if (!thread_has_unpaused_pollers(thread) && + TAILQ_EMPTY(&thread->paused_pollers)) { + return false; + } + + return true; +} + +bool +spdk_thread_is_idle(struct spdk_thread *thread) +{ + if (spdk_ring_count(thread->messages) || + thread_has_unpaused_pollers(thread) || + thread->critical_msg != NULL) { + return false; + } + + return true; +} + +uint32_t +spdk_thread_get_count(void) +{ + /* + * Return cached value of the current thread count. We could acquire the + * lock and iterate through the TAILQ of threads to count them, but that + * count could still be invalidated after we release the lock. + */ + return g_thread_count; +} + +struct spdk_thread * +spdk_get_thread(void) +{ + return _get_thread(); +} + +const char * +spdk_thread_get_name(const struct spdk_thread *thread) +{ + return thread->name; +} + +uint64_t +spdk_thread_get_id(const struct spdk_thread *thread) +{ + return thread->id; +} + +struct spdk_thread * +spdk_thread_get_by_id(uint64_t id) +{ + struct spdk_thread *thread; + + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_FOREACH(thread, &g_threads, tailq) { + if (thread->id == id) { + pthread_mutex_unlock(&g_devlist_mutex); + + return thread; + } + } + pthread_mutex_unlock(&g_devlist_mutex); + + return NULL; +} + +int +spdk_thread_get_stats(struct spdk_thread_stats *stats) +{ + struct spdk_thread *thread; + + thread = _get_thread(); + if (!thread) { + SPDK_ERRLOG("No thread allocated\n"); + return -EINVAL; + } + + if (stats == NULL) { + return -EINVAL; + } + + *stats = thread->stats; + + return 0; +} + +uint64_t +spdk_thread_get_last_tsc(struct spdk_thread *thread) +{ + return thread->tsc_last; +} + +int +spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx) +{ + struct spdk_thread *local_thread; + struct spdk_msg *msg; + int rc; + + assert(thread != NULL); + + if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { + SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name); + return -EIO; + } + + local_thread = _get_thread(); + + msg = NULL; + if (local_thread != NULL) { + if (local_thread->msg_cache_count > 0) { + msg = SLIST_FIRST(&local_thread->msg_cache); + assert(msg != NULL); + SLIST_REMOVE_HEAD(&local_thread->msg_cache, link); + local_thread->msg_cache_count--; + } + } + + if (msg == NULL) { + msg = spdk_mempool_get(g_spdk_msg_mempool); + if (!msg) { + SPDK_ERRLOG("msg could not be allocated\n"); + return -ENOMEM; + } + } + + msg->fn = fn; + msg->arg = ctx; + + rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL); + if (rc != 1) { + SPDK_ERRLOG("msg could not be enqueued\n"); + spdk_mempool_put(g_spdk_msg_mempool, msg); + return -EIO; + } + + return 0; +} + +int +spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn) +{ + spdk_msg_fn expected = NULL; + + if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST)) { + return 0; + } + + return -EIO; +} + +static struct spdk_poller * +poller_register(spdk_poller_fn fn, + void *arg, + uint64_t period_microseconds, + const char *name) +{ + struct spdk_thread *thread; + struct spdk_poller *poller; + uint64_t quotient, remainder, ticks; + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return NULL; + } + + if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { + SPDK_ERRLOG("thread %s is marked as exited\n", thread->name); + return NULL; + } + + poller = calloc(1, sizeof(*poller)); + if (poller == NULL) { + SPDK_ERRLOG("Poller memory allocation failed\n"); + return NULL; + } + + if (name) { + snprintf(poller->name, sizeof(poller->name), "%s", name); + } else { + snprintf(poller->name, sizeof(poller->name), "%p", fn); + } + + poller->state = SPDK_POLLER_STATE_WAITING; + poller->fn = fn; + poller->arg = arg; + poller->thread = thread; + + if (period_microseconds) { + quotient = period_microseconds / SPDK_SEC_TO_USEC; + remainder = period_microseconds % SPDK_SEC_TO_USEC; + ticks = spdk_get_ticks_hz(); + + poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC; + } else { + poller->period_ticks = 0; + } + + thread_insert_poller(thread, poller); + + return poller; +} + +struct spdk_poller * +spdk_poller_register(spdk_poller_fn fn, + void *arg, + uint64_t period_microseconds) +{ + return poller_register(fn, arg, period_microseconds, NULL); +} + +struct spdk_poller * +spdk_poller_register_named(spdk_poller_fn fn, + void *arg, + uint64_t period_microseconds, + const char *name) +{ + return poller_register(fn, arg, period_microseconds, name); +} + +void +spdk_poller_unregister(struct spdk_poller **ppoller) +{ + struct spdk_thread *thread; + struct spdk_poller *poller; + + poller = *ppoller; + if (poller == NULL) { + return; + } + + *ppoller = NULL; + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + if (poller->thread != thread) { + SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n"); + assert(false); + return; + } + + /* If the poller was paused, put it on the active_pollers list so that + * its unregistration can be processed by spdk_thread_poll(). + */ + if (poller->state == SPDK_POLLER_STATE_PAUSED) { + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq); + poller->period_ticks = 0; + } + + /* Simply set the state to unregistered. The poller will get cleaned up + * in a subsequent call to spdk_thread_poll(). + */ + poller->state = SPDK_POLLER_STATE_UNREGISTERED; +} + +void +spdk_poller_pause(struct spdk_poller *poller) +{ + struct spdk_thread *thread; + + if (poller->state == SPDK_POLLER_STATE_PAUSED || + poller->state == SPDK_POLLER_STATE_PAUSING) { + return; + } + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + /* If a poller is paused from within itself, we can immediately move it + * on the paused_pollers list. Otherwise we just set its state to + * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It + * allows a poller to be paused from another one's context without + * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration. + */ + if (poller->state != SPDK_POLLER_STATE_RUNNING) { + poller->state = SPDK_POLLER_STATE_PAUSING; + } else { + if (poller->period_ticks > 0) { + TAILQ_REMOVE(&thread->timed_pollers, poller, tailq); + } else { + TAILQ_REMOVE(&thread->active_pollers, poller, tailq); + } + + TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq); + poller->state = SPDK_POLLER_STATE_PAUSED; + } +} + +void +spdk_poller_resume(struct spdk_poller *poller) +{ + struct spdk_thread *thread; + + if (poller->state != SPDK_POLLER_STATE_PAUSED && + poller->state != SPDK_POLLER_STATE_PAUSING) { + return; + } + + thread = spdk_get_thread(); + if (!thread) { + assert(false); + return; + } + + /* If a poller is paused it has to be removed from the paused pollers + * list and put on the active / timer list depending on its + * period_ticks. If a poller is still in the process of being paused, + * we just need to flip its state back to waiting, as it's already on + * the appropriate list. + */ + if (poller->state == SPDK_POLLER_STATE_PAUSED) { + TAILQ_REMOVE(&thread->paused_pollers, poller, tailq); + thread_insert_poller(thread, poller); + } + + poller->state = SPDK_POLLER_STATE_WAITING; +} + +const char * +spdk_poller_state_str(enum spdk_poller_state state) +{ + switch (state) { + case SPDK_POLLER_STATE_WAITING: + return "waiting"; + case SPDK_POLLER_STATE_RUNNING: + return "running"; + case SPDK_POLLER_STATE_UNREGISTERED: + return "unregistered"; + case SPDK_POLLER_STATE_PAUSING: + return "pausing"; + case SPDK_POLLER_STATE_PAUSED: + return "paused"; + default: + return NULL; + } +} + +struct call_thread { + struct spdk_thread *cur_thread; + spdk_msg_fn fn; + void *ctx; + + struct spdk_thread *orig_thread; + spdk_msg_fn cpl; +}; + +static void +_on_thread(void *ctx) +{ + struct call_thread *ct = ctx; + int rc __attribute__((unused)); + + ct->fn(ct->ctx); + + pthread_mutex_lock(&g_devlist_mutex); + ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq); + pthread_mutex_unlock(&g_devlist_mutex); + + if (!ct->cur_thread) { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n"); + + rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx); + free(ctx); + } else { + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n", + ct->cur_thread->name); + + rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx); + } + assert(rc == 0); +} + +void +spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl) +{ + struct call_thread *ct; + struct spdk_thread *thread; + int rc __attribute__((unused)); + + ct = calloc(1, sizeof(*ct)); + if (!ct) { + SPDK_ERRLOG("Unable to perform thread iteration\n"); + cpl(ctx); + return; + } + + ct->fn = fn; + ct->ctx = ctx; + ct->cpl = cpl; + + thread = _get_thread(); + if (!thread) { + SPDK_ERRLOG("No thread allocated\n"); + free(ct); + cpl(ctx); + return; + } + ct->orig_thread = thread; + + pthread_mutex_lock(&g_devlist_mutex); + ct->cur_thread = TAILQ_FIRST(&g_threads); + pthread_mutex_unlock(&g_devlist_mutex); + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n", + ct->orig_thread->name); + + rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct); + assert(rc == 0); +} + +void +spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb, + spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size, + const char *name) +{ + struct io_device *dev, *tmp; + struct spdk_thread *thread; + + assert(io_device != NULL); + assert(create_cb != NULL); + assert(destroy_cb != NULL); + + thread = spdk_get_thread(); + if (!thread) { + SPDK_ERRLOG("called from non-SPDK thread\n"); + assert(false); + return; + } + + dev = calloc(1, sizeof(struct io_device)); + if (dev == NULL) { + SPDK_ERRLOG("could not allocate io_device\n"); + return; + } + + dev->io_device = io_device; + if (name) { + snprintf(dev->name, sizeof(dev->name), "%s", name); + } else { + snprintf(dev->name, sizeof(dev->name), "%p", dev); + } + dev->create_cb = create_cb; + dev->destroy_cb = destroy_cb; + dev->unregister_cb = NULL; + dev->ctx_size = ctx_size; + dev->for_each_count = 0; + dev->unregistered = false; + dev->refcnt = 0; + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n", + dev->name, dev->io_device, thread->name); + + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_FOREACH(tmp, &g_io_devices, tailq) { + if (tmp->io_device == io_device) { + SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n", + io_device, tmp->name, dev->name); + free(dev); + pthread_mutex_unlock(&g_devlist_mutex); + return; + } + } + TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq); + pthread_mutex_unlock(&g_devlist_mutex); +} + +static void +_finish_unregister(void *arg) +{ + struct io_device *dev = arg; + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n", + dev->name, dev->io_device, dev->unregister_thread->name); + + dev->unregister_cb(dev->io_device); + free(dev); +} + +static void +io_device_free(struct io_device *dev) +{ + int rc __attribute__((unused)); + + if (dev->unregister_cb == NULL) { + free(dev); + } else { + assert(dev->unregister_thread != NULL); + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n", + dev->name, dev->io_device, dev->unregister_thread->name); + rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev); + assert(rc == 0); + } +} + +void +spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb) +{ + struct io_device *dev; + uint32_t refcnt; + struct spdk_thread *thread; + + thread = spdk_get_thread(); + if (!thread) { + SPDK_ERRLOG("called from non-SPDK thread\n"); + assert(false); + return; + } + + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_FOREACH(dev, &g_io_devices, tailq) { + if (dev->io_device == io_device) { + break; + } + } + + if (!dev) { + SPDK_ERRLOG("io_device %p not found\n", io_device); + assert(false); + pthread_mutex_unlock(&g_devlist_mutex); + return; + } + + if (dev->for_each_count > 0) { + SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n", + dev->name, io_device, dev->for_each_count); + pthread_mutex_unlock(&g_devlist_mutex); + return; + } + + dev->unregister_cb = unregister_cb; + dev->unregistered = true; + TAILQ_REMOVE(&g_io_devices, dev, tailq); + refcnt = dev->refcnt; + dev->unregister_thread = thread; + pthread_mutex_unlock(&g_devlist_mutex); + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n", + dev->name, dev->io_device, thread->name); + + if (refcnt > 0) { + /* defer deletion */ + return; + } + + io_device_free(dev); +} + +const char * +spdk_io_device_get_name(struct io_device *dev) +{ + return dev->name; +} + +struct spdk_io_channel * +spdk_get_io_channel(void *io_device) +{ + struct spdk_io_channel *ch; + struct spdk_thread *thread; + struct io_device *dev; + int rc; + + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_FOREACH(dev, &g_io_devices, tailq) { + if (dev->io_device == io_device) { + break; + } + } + if (dev == NULL) { + SPDK_ERRLOG("could not find io_device %p\n", io_device); + pthread_mutex_unlock(&g_devlist_mutex); + return NULL; + } + + thread = _get_thread(); + if (!thread) { + SPDK_ERRLOG("No thread allocated\n"); + pthread_mutex_unlock(&g_devlist_mutex); + return NULL; + } + + if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) { + SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name); + pthread_mutex_unlock(&g_devlist_mutex); + return NULL; + } + + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->dev == dev) { + ch->ref++; + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", + ch, dev->name, dev->io_device, thread->name, ch->ref); + + /* + * An I/O channel already exists for this device on this + * thread, so return it. + */ + pthread_mutex_unlock(&g_devlist_mutex); + return ch; + } + } + + ch = calloc(1, sizeof(*ch) + dev->ctx_size); + if (ch == NULL) { + SPDK_ERRLOG("could not calloc spdk_io_channel\n"); + pthread_mutex_unlock(&g_devlist_mutex); + return NULL; + } + + ch->dev = dev; + ch->destroy_cb = dev->destroy_cb; + ch->thread = thread; + ch->ref = 1; + ch->destroy_ref = 0; + TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq); + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", + ch, dev->name, dev->io_device, thread->name, ch->ref); + + dev->refcnt++; + + pthread_mutex_unlock(&g_devlist_mutex); + + rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch)); + if (rc != 0) { + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); + dev->refcnt--; + free(ch); + pthread_mutex_unlock(&g_devlist_mutex); + return NULL; + } + + return ch; +} + +static void +put_io_channel(void *arg) +{ + struct spdk_io_channel *ch = arg; + bool do_remove_dev = true; + struct spdk_thread *thread; + + thread = spdk_get_thread(); + if (!thread) { + SPDK_ERRLOG("called from non-SPDK thread\n"); + assert(false); + return; + } + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, + "Releasing io_channel %p for io_device %s (%p) on thread %s\n", + ch, ch->dev->name, ch->dev->io_device, thread->name); + + assert(ch->thread == thread); + + ch->destroy_ref--; + + if (ch->ref > 0 || ch->destroy_ref > 0) { + /* + * Another reference to the associated io_device was requested + * after this message was sent but before it had a chance to + * execute. + */ + return; + } + + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq); + pthread_mutex_unlock(&g_devlist_mutex); + + /* Don't hold the devlist mutex while the destroy_cb is called. */ + ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch)); + + pthread_mutex_lock(&g_devlist_mutex); + ch->dev->refcnt--; + + if (!ch->dev->unregistered) { + do_remove_dev = false; + } + + if (ch->dev->refcnt > 0) { + do_remove_dev = false; + } + + pthread_mutex_unlock(&g_devlist_mutex); + + if (do_remove_dev) { + io_device_free(ch->dev); + } + free(ch); +} + +void +spdk_put_io_channel(struct spdk_io_channel *ch) +{ + struct spdk_thread *thread; + int rc __attribute__((unused)); + + thread = spdk_get_thread(); + if (!thread) { + SPDK_ERRLOG("called from non-SPDK thread\n"); + assert(false); + return; + } + + if (ch->thread != thread) { + SPDK_ERRLOG("different from the thread that called get_io_channel()\n"); + assert(false); + return; + } + + SPDK_DEBUGLOG(SPDK_LOG_THREAD, + "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n", + ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref); + + ch->ref--; + + if (ch->ref == 0) { + ch->destroy_ref++; + rc = spdk_thread_send_msg(thread, put_io_channel, ch); + assert(rc == 0); + } +} + +struct spdk_io_channel * +spdk_io_channel_from_ctx(void *ctx) +{ + return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel)); +} + +struct spdk_thread * +spdk_io_channel_get_thread(struct spdk_io_channel *ch) +{ + return ch->thread; +} + +struct spdk_io_channel_iter { + void *io_device; + struct io_device *dev; + spdk_channel_msg fn; + int status; + void *ctx; + struct spdk_io_channel *ch; + + struct spdk_thread *cur_thread; + + struct spdk_thread *orig_thread; + spdk_channel_for_each_cpl cpl; +}; + +void * +spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i) +{ + return i->io_device; +} + +struct spdk_io_channel * +spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i) +{ + return i->ch; +} + +void * +spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i) +{ + return i->ctx; +} + +static void +_call_completion(void *ctx) +{ + struct spdk_io_channel_iter *i = ctx; + + if (i->cpl != NULL) { + i->cpl(i, i->status); + } + free(i); +} + +static void +_call_channel(void *ctx) +{ + struct spdk_io_channel_iter *i = ctx; + struct spdk_io_channel *ch; + + /* + * It is possible that the channel was deleted before this + * message had a chance to execute. If so, skip calling + * the fn() on this thread. + */ + pthread_mutex_lock(&g_devlist_mutex); + TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) { + if (ch->dev->io_device == i->io_device) { + break; + } + } + pthread_mutex_unlock(&g_devlist_mutex); + + if (ch) { + i->fn(i); + } else { + spdk_for_each_channel_continue(i, 0); + } +} + +void +spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx, + spdk_channel_for_each_cpl cpl) +{ + struct spdk_thread *thread; + struct spdk_io_channel *ch; + struct spdk_io_channel_iter *i; + int rc __attribute__((unused)); + + i = calloc(1, sizeof(*i)); + if (!i) { + SPDK_ERRLOG("Unable to allocate iterator\n"); + return; + } + + i->io_device = io_device; + i->fn = fn; + i->ctx = ctx; + i->cpl = cpl; + + pthread_mutex_lock(&g_devlist_mutex); + i->orig_thread = _get_thread(); + + TAILQ_FOREACH(thread, &g_threads, tailq) { + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->dev->io_device == io_device) { + ch->dev->for_each_count++; + i->dev = ch->dev; + i->cur_thread = thread; + i->ch = ch; + pthread_mutex_unlock(&g_devlist_mutex); + rc = spdk_thread_send_msg(thread, _call_channel, i); + assert(rc == 0); + return; + } + } + } + + pthread_mutex_unlock(&g_devlist_mutex); + + rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); + assert(rc == 0); +} + +void +spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status) +{ + struct spdk_thread *thread; + struct spdk_io_channel *ch; + int rc __attribute__((unused)); + + assert(i->cur_thread == spdk_get_thread()); + + i->status = status; + + pthread_mutex_lock(&g_devlist_mutex); + if (status) { + goto end; + } + thread = TAILQ_NEXT(i->cur_thread, tailq); + while (thread) { + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->dev->io_device == i->io_device) { + i->cur_thread = thread; + i->ch = ch; + pthread_mutex_unlock(&g_devlist_mutex); + rc = spdk_thread_send_msg(thread, _call_channel, i); + assert(rc == 0); + return; + } + } + thread = TAILQ_NEXT(thread, tailq); + } + +end: + i->dev->for_each_count--; + i->ch = NULL; + pthread_mutex_unlock(&g_devlist_mutex); + + rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i); + assert(rc == 0); +} + + +SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD) -- cgit v1.2.3