diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/spdk/dpdk/lib/librte_ring | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/dpdk/lib/librte_ring')
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/Makefile | 30 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/meson.build | 15 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring.c | 418 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring.h | 1037 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_c11_mem.h | 181 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_core.h | 184 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_elem.h | 1102 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_generic.h | 173 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_hts.h | 332 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_hts_c11_mem.h | 164 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_peek.h | 454 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_peek_c11_mem.h | 110 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_rts.h | 439 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_rts_c11_mem.h | 179 | ||||
-rw-r--r-- | src/spdk/dpdk/lib/librte_ring/rte_ring_version.map | 24 |
15 files changed, 4842 insertions, 0 deletions
diff --git a/src/spdk/dpdk/lib/librte_ring/Makefile b/src/spdk/dpdk/lib/librte_ring/Makefile new file mode 100644 index 000000000..83a9d0840 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/Makefile @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2010-2014 Intel Corporation + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = librte_ring.a + +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 +LDLIBS += -lrte_eal + +EXPORT_MAP := rte_ring_version.map + +# all source are stored in SRCS-y +SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c + +# install includes +SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ + rte_ring_core.h \ + rte_ring_elem.h \ + rte_ring_generic.h \ + rte_ring_c11_mem.h \ + rte_ring_hts.h \ + rte_ring_hts_c11_mem.h \ + rte_ring_peek.h \ + rte_ring_peek_c11_mem.h \ + rte_ring_rts.h \ + rte_ring_rts_c11_mem.h + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/src/spdk/dpdk/lib/librte_ring/meson.build b/src/spdk/dpdk/lib/librte_ring/meson.build new file mode 100644 index 000000000..31c0b4649 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/meson.build @@ -0,0 +1,15 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2017 Intel Corporation + +sources = files('rte_ring.c') +headers = files('rte_ring.h', + 'rte_ring_core.h', + 'rte_ring_elem.h', + 'rte_ring_c11_mem.h', + 'rte_ring_generic.h', + 'rte_ring_hts.h', + 'rte_ring_hts_c11_mem.h', + 'rte_ring_peek.h', + 'rte_ring_peek_c11_mem.h', + 'rte_ring_rts.h', + 'rte_ring_rts_c11_mem.h') diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring.c b/src/spdk/dpdk/lib/librte_ring/rte_ring.c new file mode 100644 index 000000000..ebe5ccf0d --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring.c @@ -0,0 +1,418 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2015 Intel Corporation + * Copyright (c) 2007,2008 Kip Macy kmacy@freebsd.org + * All rights reserved. 
+ * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#include <stdio.h> +#include <stdarg.h> +#include <string.h> +#include <stdint.h> +#include <inttypes.h> +#include <errno.h> +#include <sys/queue.h> + +#include <rte_common.h> +#include <rte_log.h> +#include <rte_memory.h> +#include <rte_memzone.h> +#include <rte_malloc.h> +#include <rte_launch.h> +#include <rte_eal.h> +#include <rte_eal_memconfig.h> +#include <rte_atomic.h> +#include <rte_per_lcore.h> +#include <rte_lcore.h> +#include <rte_branch_prediction.h> +#include <rte_errno.h> +#include <rte_string_fns.h> +#include <rte_spinlock.h> +#include <rte_tailq.h> + +#include "rte_ring.h" +#include "rte_ring_elem.h" + +TAILQ_HEAD(rte_ring_list, rte_tailq_entry); + +static struct rte_tailq_elem rte_ring_tailq = { + .name = RTE_TAILQ_RING_NAME, +}; +EAL_REGISTER_TAILQ(rte_ring_tailq) + +/* true if x is a power of 2 */ +#define POWEROF2(x) ((((x)-1) & (x)) == 0) + +/* by default set head/tail distance as 1/8 of ring capacity */ +#define HTD_MAX_DEF 8 + +/* return the size of memory occupied by a ring */ +ssize_t +rte_ring_get_memsize_elem(unsigned int esize, unsigned int count) +{ + ssize_t sz; + + /* Check if element size is a multiple of 4B */ + if (esize % 4 != 0) { + RTE_LOG(ERR, RING, "element size is not a multiple of 4\n"); + + return -EINVAL; + } + + /* count must be a power of 2 */ + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { + RTE_LOG(ERR, RING, + "Requested number of elements is invalid, must be power of 2, and not exceed %u\n", + RTE_RING_SZ_MASK); + + return -EINVAL; + } + + sz = sizeof(struct rte_ring) + count * esize; + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + return sz; +} + +/* return the size of memory occupied by a ring */ +ssize_t +rte_ring_get_memsize(unsigned int count) +{ + return rte_ring_get_memsize_elem(sizeof(void *), count); +} + +/* + * internal helper function to reset prod/cons head-tail values. 
+ */ +static void +reset_headtail(void *p) +{ + struct rte_ring_headtail *ht; + struct rte_ring_hts_headtail *ht_hts; + struct rte_ring_rts_headtail *ht_rts; + + ht = p; + ht_hts = p; + ht_rts = p; + + switch (ht->sync_type) { + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_ST: + ht->head = 0; + ht->tail = 0; + break; + case RTE_RING_SYNC_MT_RTS: + ht_rts->head.raw = 0; + ht_rts->tail.raw = 0; + break; + case RTE_RING_SYNC_MT_HTS: + ht_hts->ht.raw = 0; + break; + default: + /* unknown sync mode */ + RTE_ASSERT(0); + } +} + +void +rte_ring_reset(struct rte_ring *r) +{ + reset_headtail(&r->prod); + reset_headtail(&r->cons); +} + +/* + * helper function, calculates sync_type values for prod and cons + * based on input flags. Returns zero at success or negative + * errno value otherwise. + */ +static int +get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, + enum rte_ring_sync_type *cons_st) +{ + static const uint32_t prod_st_flags = + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); + static const uint32_t cons_st_flags = + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); + + switch (flags & prod_st_flags) { + case 0: + *prod_st = RTE_RING_SYNC_MT; + break; + case RING_F_SP_ENQ: + *prod_st = RTE_RING_SYNC_ST; + break; + case RING_F_MP_RTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_RTS; + break; + case RING_F_MP_HTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_HTS; + break; + default: + return -EINVAL; + } + + switch (flags & cons_st_flags) { + case 0: + *cons_st = RTE_RING_SYNC_MT; + break; + case RING_F_SC_DEQ: + *cons_st = RTE_RING_SYNC_ST; + break; + case RING_F_MC_RTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_RTS; + break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; + default: + return -EINVAL; + } + + return 0; +} + +int +rte_ring_init(struct rte_ring *r, const char *name, unsigned count, + unsigned flags) +{ + int ret; + + /* compilation-time checks */ + RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & + RTE_CACHE_LINE_MASK) != 0); + 
RTE_BUILD_BUG_ON((offsetof(struct rte_ring, cons) & + RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & + RTE_CACHE_LINE_MASK) != 0); + + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_rts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_rts_headtail, tail.val.pos)); + + /* init the ring structure */ + memset(r, 0, sizeof(*r)); + ret = strlcpy(r->name, name, sizeof(r->name)); + if (ret < 0 || ret >= (int)sizeof(r->name)) + return -ENAMETOOLONG; + r->flags = flags; + ret = get_sync_type(flags, &r->prod.sync_type, &r->cons.sync_type); + if (ret != 0) + return ret; + + if (flags & RING_F_EXACT_SZ) { + r->size = rte_align32pow2(count + 1); + r->mask = r->size - 1; + r->capacity = count; + } else { + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK)) { + RTE_LOG(ERR, RING, + "Requested size is invalid, must be power of 2, and not exceed the size limit %u\n", + RTE_RING_SZ_MASK); + return -EINVAL; + } + r->size = count; + r->mask = count - 1; + r->capacity = r->mask; + } + + /* set default values for head-tail distance */ + if (flags & RING_F_MP_RTS_ENQ) + rte_ring_set_prod_htd_max(r, r->capacity / HTD_MAX_DEF); + if (flags & RING_F_MC_RTS_DEQ) + rte_ring_set_cons_htd_max(r, r->capacity / HTD_MAX_DEF); + + return 0; +} + +/* create the ring for a given element size */ +struct rte_ring * +rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, + int socket_id, unsigned int flags) +{ + char mz_name[RTE_MEMZONE_NAMESIZE]; + struct rte_ring *r; + struct rte_tailq_entry *te; + const struct rte_memzone *mz; + ssize_t ring_size; + int mz_flags = 0; + struct rte_ring_list* 
ring_list = NULL; + const unsigned int requested_count = count; + int ret; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + /* for an exact size ring, round up from count to a power of two */ + if (flags & RING_F_EXACT_SZ) + count = rte_align32pow2(count + 1); + + ring_size = rte_ring_get_memsize_elem(esize, count); + if (ring_size < 0) { + rte_errno = ring_size; + return NULL; + } + + ret = snprintf(mz_name, sizeof(mz_name), "%s%s", + RTE_RING_MZ_PREFIX, name); + if (ret < 0 || ret >= (int)sizeof(mz_name)) { + rte_errno = ENAMETOOLONG; + return NULL; + } + + te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0); + if (te == NULL) { + RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n"); + rte_errno = ENOMEM; + return NULL; + } + + rte_mcfg_tailq_write_lock(); + + /* reserve a memory zone for this ring. If we can't get rte_config or + * we are secondary process, the memzone_reserve function will set + * rte_errno for us appropriately - hence no check in this this function */ + mz = rte_memzone_reserve_aligned(mz_name, ring_size, socket_id, + mz_flags, __alignof__(*r)); + if (mz != NULL) { + r = mz->addr; + /* no need to check return value here, we already checked the + * arguments above */ + rte_ring_init(r, name, requested_count, flags); + + te->data = (void *) r; + r->memzone = mz; + + TAILQ_INSERT_TAIL(ring_list, te, next); + } else { + r = NULL; + RTE_LOG(ERR, RING, "Cannot reserve memory\n"); + rte_free(te); + } + rte_mcfg_tailq_write_unlock(); + + return r; +} + +/* create the ring */ +struct rte_ring * +rte_ring_create(const char *name, unsigned int count, int socket_id, + unsigned int flags) +{ + return rte_ring_create_elem(name, sizeof(void *), count, socket_id, + flags); +} + +/* free the ring */ +void +rte_ring_free(struct rte_ring *r) +{ + struct rte_ring_list *ring_list = NULL; + struct rte_tailq_entry *te; + + if (r == NULL) + return; + + /* + * Ring was not created with rte_ring_create, + * therefore, there is no memzone to 
free. + */ + if (r->memzone == NULL) { + RTE_LOG(ERR, RING, + "Cannot free ring, not created with rte_ring_create()\n"); + return; + } + + if (rte_memzone_free(r->memzone) != 0) { + RTE_LOG(ERR, RING, "Cannot free memory\n"); + return; + } + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + rte_mcfg_tailq_write_lock(); + + /* find out tailq entry */ + TAILQ_FOREACH(te, ring_list, next) { + if (te->data == (void *) r) + break; + } + + if (te == NULL) { + rte_mcfg_tailq_write_unlock(); + return; + } + + TAILQ_REMOVE(ring_list, te, next); + + rte_mcfg_tailq_write_unlock(); + + rte_free(te); +} + +/* dump the status of the ring on the console */ +void +rte_ring_dump(FILE *f, const struct rte_ring *r) +{ + fprintf(f, "ring <%s>@%p\n", r->name, r); + fprintf(f, " flags=%x\n", r->flags); + fprintf(f, " size=%"PRIu32"\n", r->size); + fprintf(f, " capacity=%"PRIu32"\n", r->capacity); + fprintf(f, " ct=%"PRIu32"\n", r->cons.tail); + fprintf(f, " ch=%"PRIu32"\n", r->cons.head); + fprintf(f, " pt=%"PRIu32"\n", r->prod.tail); + fprintf(f, " ph=%"PRIu32"\n", r->prod.head); + fprintf(f, " used=%u\n", rte_ring_count(r)); + fprintf(f, " avail=%u\n", rte_ring_free_count(r)); +} + +/* dump the status of all rings on the console */ +void +rte_ring_list_dump(FILE *f) +{ + const struct rte_tailq_entry *te; + struct rte_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + rte_mcfg_tailq_read_lock(); + + TAILQ_FOREACH(te, ring_list, next) { + rte_ring_dump(f, (struct rte_ring *) te->data); + } + + rte_mcfg_tailq_read_unlock(); +} + +/* search a ring from its name */ +struct rte_ring * +rte_ring_lookup(const char *name) +{ + struct rte_tailq_entry *te; + struct rte_ring *r = NULL; + struct rte_ring_list *ring_list; + + ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list); + + rte_mcfg_tailq_read_lock(); + + TAILQ_FOREACH(te, ring_list, next) { + r = (struct rte_ring *) te->data; + if (strncmp(name, r->name, 
RTE_RING_NAMESIZE) == 0) + break; + } + + rte_mcfg_tailq_read_unlock(); + + if (te == NULL) { + rte_errno = ENOENT; + return NULL; + } + + return r; +} diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring.h b/src/spdk/dpdk/lib/librte_ring/rte_ring.h new file mode 100644 index 000000000..86faede81 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring.h @@ -0,0 +1,1037 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_H_ +#define _RTE_RING_H_ + +/** + * @file + * RTE Ring + * + * The Ring Manager is a fixed-size queue, implemented as a table of + * pointers. Head and tail pointers are modified atomically, allowing + * concurrent access to it. It has the following features: + * + * - FIFO (First In First Out) + * - Maximum size is fixed; the pointers are stored in a table. + * - Lockless implementation. + * - Multi- or single-consumer dequeue. + * - Multi- or single-producer enqueue. + * - Bulk dequeue. + * - Bulk enqueue. + * - Ability to select different sync modes for producer/consumer. + * - Dequeue start/finish (depending on consumer sync modes). + * - Enqueue start/finish (depending on producer sync mode). + * + * Note: the ring implementation is not preemptible. Refer to Programmer's + * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring + * for more information. + * + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_core.h> + +/** + * Calculate the memory size needed for a ring + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it. This value is the sum of the size of + * the structure rte_ring and the size of the memory needed by the + * objects pointers. The value is aligned to a cache line size. 
+ * + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size needed for the ring on success. + * - -EINVAL if count is not a power of 2. + */ +ssize_t rte_ring_get_memsize(unsigned count); + +/** + * Initialize a ring structure. + * + * Initialize a ring structure in memory pointed by "r". The size of the + * memory area must be large enough to store the ring structure and the + * object table. It is advised to use rte_ring_get_memsize() to get the + * appropriate size. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is + * *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is not added in RTE_TAILQ_RING global list. Indeed, the + * memory given by the caller may not be shareable among dpdk + * processes. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param name + * The name of the ring. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param flags + * An OR of the following: + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. 
+ * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. + * @return + * 0 on success, or a negative value on error. + */ +int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, + unsigned flags); + +/** + * Create a new ring named *name* in memory. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param count + * The size of the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". 
+ * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - count provided is not a power of 2 + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +struct rte_ring *rte_ring_create(const char *name, unsigned count, + int socket_id, unsigned flags); + +/** + * De-allocate all memory used by the ring. + * + * @param r + * Ring to free + */ +void rte_ring_free(struct rte_ring *r); + +/** + * Dump the status of the ring to a file. 
+ * + * @param f + * A pointer to a file for output + * @param r + * A pointer to the ring structure. + */ +void rte_ring_dump(FILE *f, const struct rte_ring *r); + +/* the actual enqueue of pointers on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions */ +#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + const uint32_t size = (r)->size; \ + uint32_t idx = prod_head & (r)->mask; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ + ring[idx] = obj_table[i]; \ + ring[idx+1] = obj_table[i+1]; \ + ring[idx+2] = obj_table[i+2]; \ + ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 2: \ + ring[idx++] = obj_table[i++]; /* fallthrough */ \ + case 1: \ + ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + ring[idx] = obj_table[i]; \ + } \ +} while (0) + +/* the actual copy of pointers on the ring to obj_table. 
+ * Placed here since identical code needed in both + * single and multi consumer dequeue functions */ +#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \ + unsigned int i; \ + uint32_t idx = cons_head & (r)->mask; \ + const uint32_t size = (r)->size; \ + obj_type *ring = (obj_type *)ring_start; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ + obj_table[i] = ring[idx]; \ + obj_table[i+1] = ring[idx+1]; \ + obj_table[i+2] = ring[idx+2]; \ + obj_table[i+3] = ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 2: \ + obj_table[i++] = ring[idx++]; /* fallthrough */ \ + case 1: \ + obj_table[i++] = ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = ring[idx]; \ + } \ +} while (0) + +/* Between load and load. there might be cpu reorder in weak model + * (powerpc/arm). + * There are 2 choices for the users + * 1.use rmb() memory barrier + * 2.use one-direction load_acquire/store_release barrier,defined by + * CONFIG_RTE_USE_C11_MEM_MODEL=y + * It depends on performance test results. + * By default, move common functions to rte_ring_generic.h + */ +#ifdef RTE_USE_C11_MEM_MODEL +#include "rte_ring_c11_mem.h" +#else +#include "rte_ring_generic.h" +#endif + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. 
+ * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sp, unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned int n, enum rte_ring_queue_behavior behavior, + unsigned int is_sc, unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + RTE_RING_SYNC_MT, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + RTE_RING_SYNC_ST, free_space); +} + +#ifdef ALLOW_EXPERIMENTAL_API +#include <rte_ring_elem.h> +#endif + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, + free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. 
+ * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_mp_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_sp_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Enqueue one object on a ring. + * + * This function calls the multi-producer or the single-producer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_enqueue(struct rte_ring *r, void *obj) +{ + return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + RTE_RING_SYNC_MT, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table, + * must be strictly positive. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED, + RTE_RING_SYNC_ST, available); +} + +/** + * Dequeue several objects from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. 
+ * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, + unsigned int *available) +{ + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk(r, obj_table, n, available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; +} + +/** + * Dequeue one object from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) +{ + return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT; +} + +/** + * Dequeue one object from a ring. 
+ * This function flushes all the elements in a ring
+ * Check whether the ring is single-producer.
+ */ +static inline int +rte_ring_is_cons_single(const struct rte_ring *r) +{ + return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST); +} + +/** + * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output + */ +void rte_ring_list_dump(FILE *f); + +/** + * Search a ring from its name + * + * @param name + * The name of the ring. + * @return + * The pointer to the ring matching the name, or NULL if not found, + * with rte_errno set appropriately. Possible rte_errno values include: + * - ENOENT - required entry not available to return. + */ +struct rte_ring *rte_ring_lookup(const char *name); + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +static __rte_always_inline unsigned +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue(r, obj_table, n, + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, + free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. 
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When the
+ * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - Number of objects dequeued + */ +static __rte_always_inline unsigned +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst(r, obj_table, n, available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, + available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, + available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + return 0; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_c11_mem.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_c11_mem.h new file mode 100644 index 000000000..0fb73a337 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_c11_mem.h @@ -0,0 +1,181 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2017,2018 HXT-semitech Corporation. + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. 
+ */ + +#ifndef _RTE_RING_C11_MEM_H_ +#define _RTE_RING_C11_MEM_H_ + +static __rte_always_inline void +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val, + uint32_t single, uint32_t enqueue) +{ + RTE_SET_USED(enqueue); + + /* + * If there are other enqueues/dequeues in progress that preceded us, + * we need to wait for them to complete + */ + if (!single) + while (unlikely(ht->tail != old_val)) + rte_pause(); + + __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE); +} + +/** + * @internal This function updates the producer head for enqueue + * + * @param r + * A pointer to the ring structure + * @param is_sp + * Indicates whether multi-producer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where enqueue starts + * @param new_head + * Returns the current/new head value i.e. where enqueue finishes + * @param free_entries + * Returns the amount of free space in the ring BEFORE head was moved + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *free_entries) +{ + const uint32_t capacity = r->capacity; + uint32_t cons_tail; + unsigned int max = n; + int success; + + *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_RELAXED); + do { + /* Reset n to the initial burst count */ + n = max; + + /* Ensure the head is read before tail */ + __atomic_thread_fence(__ATOMIC_ACQUIRE); + + /* load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + cons_tail = __atomic_load_n(&r->cons.tail, + __ATOMIC_ACQUIRE); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = (capacity + cons_tail - *old_head); + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + return 0; + + *new_head = *old_head + n; + if (is_sp) + r->prod.head = *new_head, success = 1; + else + /* on failure, *old_head is updated */ + success = __atomic_compare_exchange_n(&r->prod.head, + old_head, *new_head, + 0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED); + } while (unlikely(success == 0)); + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + * + * @param r + * A pointer to the ring structure + * @param is_sc + * Indicates whether multi-consumer path is needed or not + * @param n + * The number of elements we will want to enqueue, i.e. 
how far should the + * head be moved + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param old_head + * Returns head value as it was before the move, i.e. where dequeue starts + * @param new_head + * Returns the current/new head value i.e. where dequeue finishes + * @param entries + * Returns the number of entries in the ring BEFORE head was moved + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc, + unsigned int n, enum rte_ring_queue_behavior behavior, + uint32_t *old_head, uint32_t *new_head, + uint32_t *entries) +{ + unsigned int max = n; + uint32_t prod_tail; + int success; + + /* move cons.head atomically */ + *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED); + do { + /* Restore n as it may change every loop */ + n = max; + + /* Ensure the head is read before tail */ + __atomic_thread_fence(__ATOMIC_ACQUIRE); + + /* this load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + prod_tail = __atomic_load_n(&r->prod.tail, + __ATOMIC_ACQUIRE); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = (prod_tail - *old_head); + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
+ * This file contains definition of RTE ring structure itself,
*/ +#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \ + sizeof(RTE_RING_MZ_PREFIX) + 1) + +/** prod/cons sync types */ +enum rte_ring_sync_type { + RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */ + RTE_RING_SYNC_ST, /**< single thread only */ +#ifdef ALLOW_EXPERIMENTAL_API + RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ +#endif +}; + +/** + * structures to hold a pair of head/tail values and other metadata. + * Depending on sync_type format of that structure might be different, + * but offset for *sync_type* and *tail* values should remain the same. + */ +struct rte_ring_headtail { + volatile uint32_t head; /**< prod/consumer head. */ + volatile uint32_t tail; /**< prod/consumer tail. */ + RTE_STD_C11 + union { + /** sync type of prod/cons */ + enum rte_ring_sync_type sync_type; + /** deprecated - True if single prod/cons */ + uint32_t single; + }; +}; + +union __rte_ring_rts_poscnt { + /** raw 8B value to read/write *cnt* and *pos* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t cnt; /**< head/tail reference counter */ + uint32_t pos; /**< head/tail position */ + } val; +}; + +struct rte_ring_rts_headtail { + volatile union __rte_ring_rts_poscnt tail; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ + uint32_t htd_max; /**< max allowed distance between head/tail */ + volatile union __rte_ring_rts_poscnt head; +}; + +union __rte_ring_hts_pos { + /** raw 8B value to read/write *head* and *tail* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t head; /**< head position */ + uint32_t tail; /**< tail position */ + } pos; +}; + +struct rte_ring_hts_headtail { + volatile union __rte_ring_hts_pos ht; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ +}; + +/** + * An RTE ring structure. + * + * The producer and the consumer have a head and a tail index. 
+ * of these indexes is that they are not between 0 and size(ring). These indexes
Worst case, if a power-of-2 size is requested, half the + * ring space will be wasted. + */ +#define RING_F_EXACT_SZ 0x0004 +#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */ + +#define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ +#define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ + +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_CORE_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_elem.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_elem.h new file mode 100644 index 000000000..a5a4c46f9 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_elem.h @@ -0,0 +1,1102 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2019 Arm Limited + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_ELEM_H_ +#define _RTE_RING_ELEM_H_ + +/** + * @file + * RTE Ring with user defined element size + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_core.h> + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Calculate the memory size needed for a ring with given element size + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it and the size of the element. This value + * is the sum of the size of the structure rte_ring and the size of the + * memory needed for storing the elements. The value is aligned to a cache + * line size. + * + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param count + * The number of elements in the ring (must be a power of 2). 
+ * @return + * - The memory size needed for the ring on success. + * - -EINVAL - esize is not a multiple of 4 or count provided is not a + * power of 2. + */ +__rte_experimental +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Create a new ring named *name* that stores elements with given size. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. 
+ * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - esize is not a multiple of 4 or count provided is not a + * power of 2. 
 * - ENOSPC - the maximum number of memzones has already been allocated
 * - EEXIST - a memzone with the same name already exists
 * - ENOMEM - no appropriate memory area found in which to create memzone
 */
__rte_experimental
struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
		unsigned int count, int socket_id, unsigned int flags);

/*
 * Copy 'n' 32-bit elements from 'obj_table' into the ring storage.
 * 'size', 'idx' and 'n' are all expressed in 32-bit units; the else
 * branch handles the case where the copy wraps past the end of the ring.
 * The main loop is manually unrolled by 8 with a Duff-style switch for
 * the remainder.
 */
static __rte_always_inline void
__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
		uint32_t idx, const void *obj_table, uint32_t n)
{
	unsigned int i;
	/* ring storage starts immediately after the rte_ring header */
	uint32_t *ring = (uint32_t *)&r[1];
	const uint32_t *obj = (const uint32_t *)obj_table;
	if (likely(idx + n < size)) {
		/* contiguous copy: no wrap-around */
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
			ring[idx + 4] = obj[i + 4];
			ring[idx + 5] = obj[i + 5];
			ring[idx + 6] = obj[i + 6];
			ring[idx + 7] = obj[i + 7];
		}
		switch (n & 0x7) {
		case 7:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 6:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 5:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 4:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++]; /* fallthrough */
		}
	} else {
		/* copy up to the end of the ring ... */
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

/*
 * Copy 'n' 64-bit elements from 'obj_table' into the ring starting at
 * 'prod_head' (masked to a ring index). obj_table may be unaligned, hence
 * the unaligned_uint64_t source type.
 */
static __rte_always_inline void
__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	uint64_t *ring = (uint64_t *)&r[1];
	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
	if (likely(idx + n < size)) {
		/* contiguous copy, unrolled by 4 */
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
		}
		switch (n & 0x3) {
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++];
		}
	} else {
		/* wrap-around: copy to end of ring, then from the start */
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

/*
 * Copy 'n' 128-bit elements into the ring. Uses fixed-size memcpy
 * (32B for a pair, 16B for a single element) so the compiler can emit
 * wide loads/stores.
 */
static __rte_always_inline void
__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	rte_int128_t *ring = (rte_int128_t *)&r[1];
	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
	if (likely(idx + n < size)) {
		/* copy two elements (32 bytes) per iteration */
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		}
	} else {
		/* wrap-around: one element at a time */
		for (i = 0; idx < size; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
	}
}

/* the actual enqueue of elements on the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions.
 */
static __rte_always_inline void
__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
	else if (esize == 16)
		__rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
	else {
		uint32_t idx, scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t (esize is documented to be a
		 * multiple of 4, so the division is exact).
		 */
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		idx = prod_head & r->mask;
		nr_idx = idx * scale;
		nr_size = r->size * scale;
		__rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
				obj_table, nr_num);
	}
}

/*
 * Mirror of __rte_ring_enqueue_elems_32: copy 'n' 32-bit elements out of
 * the ring into 'obj_table'. Same unroll-by-8 + wrap-around structure.
 */
static __rte_always_inline void
__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
		uint32_t idx, void *obj_table, uint32_t n)
{
	unsigned int i;
	uint32_t *ring = (uint32_t *)&r[1];
	uint32_t *obj = (uint32_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
			obj[i + 4] = ring[idx + 4];
			obj[i + 5] = ring[idx + 5];
			obj[i + 6] = ring[idx + 6];
			obj[i + 7] = ring[idx + 7];
		}
		switch (n & 0x7) {
		case 7:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 6:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 5:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 4:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++]; /* fallthrough */
		}
	} else {
		/* wrap-around: read to end of ring, then from the start */
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

/*
 * Copy 'n' 64-bit elements out of the ring starting at 'prod_head'
 * (the parameter name follows the enqueue twin; it is the consumer head
 * here — kept as-is to match upstream).
 */
static __rte_always_inline void
__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head,
		void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	uint64_t *ring = (uint64_t *)&r[1];
	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
		}
		switch (n & 0x3) {
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++]; /* fallthrough */
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

/*
 * Copy 'n' 128-bit elements out of the ring; mirror of
 * __rte_ring_enqueue_elems_128.
 */
static __rte_always_inline void
__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head,
		void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	rte_int128_t *ring = (rte_int128_t *)&r[1];
	rte_int128_t *obj = (rte_int128_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
	}
}

/* the actual dequeue of elements from the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions.
 */
static __rte_always_inline void
__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
	else if (esize == 16)
		__rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
	else {
		uint32_t idx, scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t */
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		idx = cons_head & r->mask;
		nr_idx = idx * scale;
		nr_size = r->size * scale;
		__rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
				obj_table, nr_num);
	}
}

/* Between load and load. there might be cpu reorder in weak model
 * (powerpc/arm).
 * There are 2 choices for the users
 * 1.use rmb() memory barrier
 * 2.use one-direction load_acquire/store_release barrier,defined by
 * CONFIG_RTE_USE_C11_MEM_MODEL=y
 * It depends on performance test results.
 * By default, move common functions to rte_ring_generic.h
 */
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif

/**
 * @internal Enqueue several objects on the ring
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
 * @param is_sp
 *   Indicates whether to use single producer or multi-producer head update
 * @param free_space
 *   returns the amount of space after the enqueue operation has finished
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
		unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

	/* reserve space: may shrink n (or return 0) depending on behavior */
	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

	__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);

	/* publish: consumers may only see entries once the tail moves */
	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

/**
 * @internal Dequeue several objects from the ring
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to pull from the ring.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
 * @param is_sc
 *   Indicates whether to use single consumer or multi-consumer head update
 * @param available
 *   returns the number of remaining ring entries after the dequeue has finished
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
		unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	/* reserve entries: may shrink n (or return 0) depending on behavior */
	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);

	/* release the slots back to producers */
	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}

/**
 * Enqueue several objects on the ring (multi-producers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * producer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   The number of objects enqueued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
}

/**
 * Enqueue several objects on a ring
 *
 * @warning This API is NOT multi-producers safe
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   The number of objects enqueued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
}

#ifdef ALLOW_EXPERIMENTAL_API
#include <rte_ring_hts.h>
#include <rte_ring_rts.h>
#endif

/**
 * Enqueue several objects on a ring.
 *
 * This function calls the multi-producer or the single-producer
 * version depending on the default behavior that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
+ * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space); + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Enqueue one object on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. 
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @return
 *   - 0: Success; objects enqueued.
 *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
 */
static __rte_always_inline int
rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
{
	/* bulk returns 1 on success, 0 on failure (fixed behavior, n == 1) */
	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
								-ENOBUFS;
}

/**
 * Enqueue one object on a ring.
 *
 * This function calls the multi-producer or the single-producer
 * version, depending on the default behaviour that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj
 *   A pointer to the object to be added.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @return
 *   - 0: Success; objects enqueued.
 *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
 */
static __rte_always_inline int
rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
{
	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
								-ENOBUFS;
}

/**
 * Dequeue several objects from a ring (multi-consumers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * consumer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects dequeued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
}

/**
 * Dequeue several objects from a ring (NOT multi-consumers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table,
 *   must be strictly positive.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects dequeued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
}

/**
 * Dequeue several objects from a ring.
 *
 * This function calls the multi-consumers or the single-consumer
 * version, depending on the default behaviour that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   The number of objects dequeued, either 0 or n
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	/* dispatch on the consumer sync type recorded at ring creation */
	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n,
			available);
	case RTE_RING_SYNC_ST:
		return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n,
			available);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize,
			n, available);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize,
			n, available);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	if (available != NULL)
		*available = 0;
	return 0;
}

/**
 * Dequeue one object from a ring (multi-consumers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * consumer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_p
 *   A pointer to the object that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @return
 *   - 0: Success; objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
 *     dequeued.
 */
static __rte_always_inline int
rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
				unsigned int esize)
{
	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
								-ENOENT;
}

/**
 * Dequeue one object from a ring (NOT multi-consumers safe).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_p
 *   A pointer to the object that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @return
 *   - 0: Success; objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
 *     dequeued.
 */
static __rte_always_inline int
rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
				unsigned int esize)
{
	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
								-ENOENT;
}

/**
 * Dequeue one object from a ring.
 *
 * This function calls the multi-consumers or the single-consumer
 * version depending on the default behaviour that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_p
 *   A pointer to the object that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @return
 *   - 0: Success, objects dequeued.
 *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
 *     dequeued.
 */
static __rte_always_inline int
rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
{
	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
								-ENOENT;
}

/**
 * Enqueue several objects on the ring (multi-producers safe).
 *
 * This function uses a "compare and set" instruction to move the
 * producer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   - n: Actual number of objects enqueued.
 */
static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	/* VARIABLE behavior: enqueue as many of the n objects as fit */
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
}

/**
 * Enqueue several objects on a ring
 *
 * @warning This API is NOT multi-producers safe
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   - n: Actual number of objects enqueued.
 */
static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
}

/**
 * Enqueue several objects on a ring.
 *
 * This function calls the multi-producer or the single-producer
 * version depending on the default behavior that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to add in the ring from the obj_table.
 * @param free_space
 *   if non-NULL, returns the amount of space in the ring after the
 *   enqueue operation has finished.
 * @return
 *   - n: Actual number of objects enqueued.
 */
static __rte_always_inline unsigned
rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	/* dispatch on the producer sync type recorded at ring creation */
	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n,
			free_space);
	case RTE_RING_SYNC_ST:
		return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n,
			free_space);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize,
			n, free_space);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize,
			n, free_space);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	if (free_space != NULL)
		*free_space = 0;
	return 0;
}

/**
 * Dequeue several objects from a ring (multi-consumers safe). When the request
 * objects are more than the available objects, only dequeue the actual number
 * of objects
 *
 * This function uses a "compare and set" instruction to move the
 * consumer index atomically.
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   - n: Actual number of objects dequeued, 0 if ring is empty
 */
static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
}

/**
 * Dequeue several objects from a ring (NOT multi-consumers safe).When the
 * request objects are more than the available objects, only dequeue the
 * actual number of objects
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   - n: Actual number of objects dequeued, 0 if ring is empty
 */
static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
}

/**
 * Dequeue multiple objects from a ring up to a maximum number.
 *
 * This function calls the multi-consumers or the single-consumer
 * version, depending on the default behaviour that was specified at
 * ring creation time (see flags).
 *
 * @param r
 *   A pointer to the ring structure.
 * @param obj_table
 *   A pointer to a table of objects that will be filled.
 * @param esize
 *   The size of ring element, in bytes. It must be a multiple of 4.
 *   This must be the same value used while creating the ring. Otherwise
 *   the results are undefined.
 * @param n
 *   The number of objects to dequeue from the ring to the obj_table.
 * @param available
 *   If non-NULL, returns the number of remaining ring entries after the
 *   dequeue has finished.
 * @return
 *   - Number of objects dequeued
 */
static __rte_always_inline unsigned int
rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	/* dispatch on the consumer sync type recorded at ring creation */
	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n,
			available);
	case RTE_RING_SYNC_ST:
		return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n,
			available);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize,
			n, available);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize,
			n, available);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	if (available != NULL)
		*available = 0;
	return 0;
}

#ifdef ALLOW_EXPERIMENTAL_API
#include <rte_ring_peek.h>
#endif

#include <rte_ring.h>

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_ELEM_H_ */
diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_generic.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_generic.h
new file mode 100644
index 000000000..953cdbbd5
--- /dev/null
+++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_generic.h
@@ -0,0 +1,173 @@
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2017 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_GENERIC_H_
#define _RTE_RING_GENERIC_H_

/*
 * Publish a new tail value after the data copy has completed.
 * The barrier orders the copy before the tail store (wmb for enqueue,
 * rmb for dequeue); with multiple producers/consumers, threads publish
 * their tails strictly in the order they reserved their slots.
 */
static __rte_always_inline void
update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t new_val,
		uint32_t single, uint32_t enqueue)
{
	if (enqueue)
		rte_smp_wmb();
	else
		rte_smp_rmb();
	/*
	 * If there are other enqueues/dequeues in progress that preceded us,
	 * we need to wait for them to complete
	 */
	if (!single)
		while (unlikely(ht->tail != old_val))
			rte_pause();

	ht->tail = new_val;
}

/**
 * @internal This function updates the producer head for enqueue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sp
 *   Indicates whether multi-producer path is needed or not
 * @param n
 *   The number of elements we will want to enqueue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where enqueue starts
 * @param new_head
 *   Returns the current/new head value i.e. where enqueue finishes
 * @param free_entries
 *   Returns the amount of free space in the ring BEFORE head was moved
 * @return
 *   Actual number of objects enqueued.
 *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *free_entries)
{
	const uint32_t capacity = r->capacity;
	unsigned int max = n;
	int success;

	/* CAS retry loop: on contention another producer moved the head */
	do {
		/* Reset n to the initial burst count */
		n = max;

		*old_head = r->prod.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86
		 */
		rte_smp_rmb();

		/*
		 *  The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between 0
		 * and capacity (which is < size).
		 */
		*free_entries = (capacity + r->cons.tail - *old_head);

		/* check that we have enough room in ring */
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;

		if (n == 0)
			return 0;

		*new_head = *old_head + n;
		if (is_sp)
			/* single producer: plain store, no CAS needed */
			r->prod.head = *new_head, success = 1;
		else
			success = rte_atomic32_cmpset(&r->prod.head,
					*old_head, *new_head);
	} while (unlikely(success == 0));
	return n;
}

/**
 * @internal This function updates the consumer head for dequeue
 *
 * @param r
 *   A pointer to the ring structure
 * @param is_sc
 *   Indicates whether multi-consumer path is needed or not
 * @param n
 *   The number of elements we will want to enqueue, i.e. how far should the
 *   head be moved
 * @param behavior
 *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
 *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
 * @param old_head
 *   Returns head value as it was before the move, i.e. where dequeue starts
 * @param new_head
 *   Returns the current/new head value i.e. where dequeue finishes
 * @param entries
 *   Returns the number of entries in the ring BEFORE head was moved
 * @return
 *   - Actual number of objects dequeued.
 *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
 */
static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		uint32_t *old_head, uint32_t *new_head,
		uint32_t *entries)
{
	unsigned int max = n;
	int success;

	/* move cons.head atomically */
	do {
		/* Restore n as it may change every loop */
		n = max;

		*old_head = r->cons.head;

		/* add rmb barrier to avoid load/load reorder in weak
		 * memory model. It is noop on x86
		 */
		rte_smp_rmb();

		/* The subtraction is done between two unsigned 32bits value
		 * (the result is always modulo 32 bits even if we have
		 * cons_head > prod_tail). So 'entries' is always between 0
		 * and size(ring)-1.
		 */
		*entries = (r->prod.tail - *old_head);

		/* Set the actual entries for dequeue */
		if (n > *entries)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

		if (unlikely(n == 0))
			return 0;

		*new_head = *old_head + n;
		if (is_sc) {
			r->cons.head = *new_head;
			rte_smp_rmb();
			success = 1;
		} else {
			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
					*new_head);
		}
	} while (unlikely(success == 0));
	return n;
}

#endif /* _RTE_RING_GENERIC_H_ */
diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_hts.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_hts.h
new file mode 100644
index 000000000..c7701defc
--- /dev/null
+++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_hts.h
@@ -0,0 +1,332 @@
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_HTS_H_
#define _RTE_RING_HTS_H_

/**
 * @file rte_ring_hts.h
 * @b EXPERIMENTAL: this API may change without prior notice
 * It is not recommended to include this file directly.
 * Please include <rte_ring.h> instead.
+ * + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * at any given moment only one enqueue/dequeue operation can proceed. + * This is achieved by allowing a thread to proceed with changing head.value + * only when head.value == tail.value. + * Both head and tail values are updated atomically (as one 64-bit value). + * To achieve that 64-bit CAS is used by head update routine. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_hts_c11_mem.h> + +/** + * @internal Enqueue several objects on the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the HTS ring. 
+ * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). 
+ * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_hts_c11_mem.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_hts_c11_mem.h new file mode 100644 index 000000000..16e54b6ff --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_hts_c11_mem.h @@ -0,0 +1,164 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_C11_MEM_H_ +#define _RTE_RING_HTS_C11_MEM_H_ + +/** + * @file rte_ring_hts_c11_mem.h + * It is not recommended to include this file directly, + * include <rte_ring.h> instead. + * Contains internal helper functions for head/tail sync (HTS) ring mode. + * For more information please refer to <rte_ring_hts.h>. + */ + +/** + * @internal update tail with new value. 
+ */ +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t tail; + + RTE_SET_USED(enqueue); + + tail = old_tail + num; + __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE); +} + +/** + * @internal waits till tail will become equal to head. + * Means no writer/reader is active for that ring. + * Suppose to work as serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union __rte_ring_hts_pos *p) +{ + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + const uint32_t capacity = r->capacity; + + op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read prod head/tail *before* + * reading cons tail. + */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
+ 0 : *free_entries; + + if (n == 0) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read cons head/tail *before* + * reading prod tail. + */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_C11_MEM_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_peek.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_peek.h new file mode 100644 index 000000000..45f707dc7 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_peek.h @@ -0,0 +1,454 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_PEEK_H_ +#define _RTE_RING_PEEK_H_ + +/** + * @file + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include <rte_ring_elem.h> instead. + * + * Ring Peek API + * Introduction of rte_ring with serialized producer/consumer (HTS sync mode) + * makes possible to split public enqueue/dequeue API into two phases: + * - enqueue/dequeue start + * - enqueue/dequeue finish + * That allows user to inspect objects in the ring without removing them + * from it (aka MT safe peek). + * Note that right now this new API is available only for two sync modes: + * 1) Single Producer/Single Consumer (RTE_RING_SYNC_ST) + * 2) Serialized Producer/Serialized Consumer (RTE_RING_SYNC_MT_HTS). + * It is a user responsibility to create/init ring with appropriate sync + * modes selected. 
+ * As an example: + * // read 1 elem from the ring: + * n = rte_ring_dequeue_bulk_start(ring, &obj, 1, NULL); + * if (n != 0) { + * //examine object + * if (object_examine(obj) == KEEP) + * //decided to keep it in the ring. + * rte_ring_dequeue_finish(ring, 0); + * else + * //decided to remove it from the ring. + * rte_ring_dequeue_finish(ring, n); + * } + * Note that between _start_ and _finish_ none other thread can proceed + * with enqueue(/dequeue) operation till _finish_ completes. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_peek_c11_mem.h> + +/** + * @internal This function moves prod head value. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue_start(struct rte_ring *r, uint32_t n, + enum rte_ring_queue_behavior behavior, uint32_t *free_space) +{ + uint32_t free, head, next; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_prod_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &free); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_prod_head(r, n, behavior, + &head, &free); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + free = 0; + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_elem_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. 
+ * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_elem_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_FIXED, + free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects that can be enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return rte_ring_enqueue_bulk_elem_start(r, n, free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_elem_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * Actual number of objects that can be enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_burst_elem_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return __rte_ring_do_enqueue_start(r, n, RTE_RING_QUEUE_VARIABLE, + free_space); +} + +/** + * Start to enqueue several objects on the ring. + * Note that no actual objects are put in the queue by this function, + * it just reserves for user such ability. + * User has to call appropriate enqueue_finish() to copy objects into the + * queue and complete given enqueue operation. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * Actual number of objects that can be enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_enqueue_burst_start(struct rte_ring *r, unsigned int n, + unsigned int *free_space) +{ + return rte_ring_enqueue_burst_elem_start(r, n, free_space); +} + +/** + * Complete to enqueue several objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add to the ring from the obj_table. 
+ */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_elem_finish(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n) +{ + uint32_t tail; + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->prod, &tail, n); + if (n != 0) + __rte_ring_enqueue_elems(r, tail, obj_table, esize, n); + __rte_ring_st_set_head_tail(&r->prod, tail, n, 1); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_prod, &tail, n); + if (n != 0) + __rte_ring_enqueue_elems(r, tail, obj_table, esize, n); + __rte_ring_hts_set_head_tail(&r->hts_prod, tail, n, 1); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete to enqueue several objects on the ring. + * Note that number of objects to enqueue should not exceed previous + * enqueue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param n + * The number of objects to add to the ring from the obj_table. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_enqueue_finish(struct rte_ring *r, void * const *obj_table, + unsigned int n) +{ + rte_ring_enqueue_elem_finish(r, obj_table, sizeof(uintptr_t), n); +} + +/** + * @internal This function moves cons head value and copies up to *n* + * objects from the ring to the user provided obj_table. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue_start(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t avail, head, next; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_move_cons_head(r, RTE_RING_SYNC_ST, n, + behavior, &head, &next, &avail); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_move_cons_head(r, n, behavior, + &head, &avail); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + n = 0; + avail = 0; + } + + if (n != 0) + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + + if (available != NULL) + *available = avail - n; + return n; +} + +/** + * Start to dequeue several objects from the ring. + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk_elem_start(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_start(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Start to dequeue several objects from the ring. 
+ * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * Actual number of objects dequeued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_dequeue_bulk_elem_start(r, obj_table, sizeof(uintptr_t), + n, available); +} + +/** + * Start to dequeue several objects from the ring. + * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The actual number of objects dequeued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_burst_elem_start(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_start(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Start to dequeue several objects from the ring. 
+ * Note that user has to call appropriate dequeue_finish() + * to complete given dequeue operation and actually remove objects the ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The actual number of objects dequeued. + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_dequeue_burst_start(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_dequeue_burst_elem_start(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Complete to dequeue several objects from the ring. + * Note that number of objects to dequeue should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. + */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_elem_finish(struct rte_ring *r, unsigned int n) +{ + uint32_t tail; + + switch (r->cons.sync_type) { + case RTE_RING_SYNC_ST: + n = __rte_ring_st_get_tail(&r->cons, &tail, n); + __rte_ring_st_set_head_tail(&r->cons, tail, n, 0); + break; + case RTE_RING_SYNC_MT_HTS: + n = __rte_ring_hts_get_tail(&r->hts_cons, &tail, n); + __rte_ring_hts_set_head_tail(&r->hts_cons, tail, n, 0); + break; + case RTE_RING_SYNC_MT: + case RTE_RING_SYNC_MT_RTS: + default: + /* unsupported mode, shouldn't be here */ + RTE_ASSERT(0); + } +} + +/** + * Complete to dequeue several objects from the ring. + * Note that number of objects to dequeue should not exceed previous + * dequeue_start return value. + * + * @param r + * A pointer to the ring structure. + * @param n + * The number of objects to remove from the ring. 
+ */ +__rte_experimental +static __rte_always_inline void +rte_ring_dequeue_finish(struct rte_ring *r, unsigned int n) +{ + rte_ring_dequeue_elem_finish(r, n); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_PEEK_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_peek_c11_mem.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_peek_c11_mem.h new file mode 100644 index 000000000..99321f124 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_peek_c11_mem.h @@ -0,0 +1,110 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_PEEK_C11_MEM_H_ +#define _RTE_RING_PEEK_C11_MEM_H_ + +/** + * @file rte_ring_peek_c11_mem.h + * It is not recommended to include this file directly, + * include <rte_ring.h> instead. + * Contains internal helper functions for rte_ring peek API. + * For more information please refer to <rte_ring_peek.h>. + */ + +/** + * @internal get current tail value. + * This function should be used only for single thread producer/consumer. + * Check that user didn't request to move tail above the head. + * In that situation: + * - return zero, that will cause abort any pending changes and + * return head to its previous position. + * - throw an assert in debug mode. + */ +static __rte_always_inline uint32_t +__rte_ring_st_get_tail(struct rte_ring_headtail *ht, uint32_t *tail, + uint32_t num) +{ + uint32_t h, n, t; + + h = ht->head; + t = ht->tail; + n = h - t; + + RTE_ASSERT(n >= num); + num = (n >= num) ? num : 0; + + *tail = h; + return num; +} + +/** + * @internal set new values for head and tail. + * This function should be used only for single thread producer/consumer. + * Should be used only in conjunction with __rte_ring_st_get_tail. 
+ */ +static __rte_always_inline void +__rte_ring_st_set_head_tail(struct rte_ring_headtail *ht, uint32_t tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t pos; + + RTE_SET_USED(enqueue); + + pos = tail + num; + ht->head = pos; + __atomic_store_n(&ht->tail, pos, __ATOMIC_RELEASE); +} + +/** + * @internal get current tail value. + * This function should be used only for producer/consumer in MT_HTS mode. + * Check that user didn't request to move tail above the head. + * In that situation: + * - return zero, that will cause abort any pending changes and + * return head to its previous position. + * - throw an assert in debug mode. + */ +static __rte_always_inline uint32_t +__rte_ring_hts_get_tail(struct rte_ring_hts_headtail *ht, uint32_t *tail, + uint32_t num) +{ + uint32_t n; + union __rte_ring_hts_pos p; + + p.raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_RELAXED); + n = p.pos.head - p.pos.tail; + + RTE_ASSERT(n >= num); + num = (n >= num) ? num : 0; + + *tail = p.pos.tail; + return num; +} + +/** + * @internal set new values for head and tail as one atomic 64 bit operation. + * This function should be used only for producer/consumer in MT_HTS mode. + * Should be used only in conjunction with __rte_ring_hts_get_tail. 
+ */ +static __rte_always_inline void +__rte_ring_hts_set_head_tail(struct rte_ring_hts_headtail *ht, uint32_t tail, + uint32_t num, uint32_t enqueue) +{ + union __rte_ring_hts_pos p; + + RTE_SET_USED(enqueue); + + p.pos.head = tail + num; + p.pos.tail = p.pos.head; + + __atomic_store_n(&ht->ht.raw, p.raw, __ATOMIC_RELEASE); +} + +#endif /* _RTE_RING_PEEK_C11_MEM_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_rts.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_rts.h new file mode 100644 index 000000000..d40e9994f --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_rts.h @@ -0,0 +1,439 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_RTS_H_ +#define _RTE_RING_RTS_H_ + +/** + * @file rte_ring_rts.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include <rte_ring.h> instead. + * + * Contains functions for Relaxed Tail Sync (RTS) ring mode. + * The main idea remains the same as for our original MP/MC synchronization + * mechanism. + * The main difference is that tail value is increased not + * by every thread that finished enqueue/dequeue, + * but only by the current last one doing enqueue/dequeue. + * That allows threads to skip spinning on tail value, + * leaving actual tail value change to last thread at a given instance. + * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation: + * one for head update, second for tail update. + * As a gain it allows thread to avoid spinning/waiting on tail value. + * In comparison original MP/MC algorithm requires one 32-bit CAS + * for head update and waiting/spinning on tail value. + * + * Brief outline: + * - introduce update counter (cnt) for both head and tail. 
+ * - increment head.cnt for each head.value update + * - write head.value and head.cnt atomically (64-bit CAS) + * - move tail.value ahead only when tail.cnt + 1 == head.cnt + * (indicating that this is the last thread updating the tail) + * - increment tail.cnt when each enqueue/dequeue op finishes + * (no matter if tail.value going to change or not) + * - write tail.value and tail.cnt atomically (64-bit CAS) + * + * To avoid producer/consumer starvation: + * - limit max allowed distance between head and tail value (HTD_MAX). + * I.E. thread is allowed to proceed with changing head.value, + * only when: head.value - tail.value <= HTD_MAX + * HTD_MAX is an optional parameter. + * With HTD_MAX == 0 we'll have fully serialized ring - + * i.e. only one thread at a time will be able to enqueue/dequeue + * to/from the ring. + * With HTD_MAX >= ring.capacity - no limitation. + * By default HTD_MAX == ring.capacity / 8. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <rte_ring_rts_c11_mem.h> + +/** + * @internal Enqueue several objects on the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. 
+ */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_update_tail(&r->rts_prod); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the RTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_rts_update_tail(&r->rts_cons); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. 
+ * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. 
+ * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. 
+ * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the RTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. 
+ */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an RTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Return producer max Head-Tail-Distance (HTD). + * + * @param r + * A pointer to the ring structure. + * @return + * Producer HTD value, if producer is set in appropriate sync mode, + * or UINT32_MAX otherwise. + */ +__rte_experimental +static inline uint32_t +rte_ring_get_prod_htd_max(const struct rte_ring *r) +{ + if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_prod.htd_max; + return UINT32_MAX; +} + +/** + * Set producer max Head-Tail-Distance (HTD). + * Note that producer has to use appropriate sync mode (RTS). + * + * @param r + * A pointer to the ring structure. + * @param v + * new HTD value to setup. + * @return + * Zero on success, or negative error code otherwise. 
+ */ +__rte_experimental +static inline int +rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_prod.htd_max = v; + return 0; +} + +/** + * Return consumer max Head-Tail-Distance (HTD). + * + * @param r + * A pointer to the ring structure. + * @return + * Consumer HTD value, if consumer is set in appropriate sync mode, + * or UINT32_MAX otherwise. + */ +__rte_experimental +static inline uint32_t +rte_ring_get_cons_htd_max(const struct rte_ring *r) +{ + if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS) + return r->rts_cons.htd_max; + return UINT32_MAX; +} + +/** + * Set consumer max Head-Tail-Distance (HTD). + * Note that consumer has to use appropriate sync mode (RTS). + * + * @param r + * A pointer to the ring structure. + * @param v + * new HTD value to setup. + * @return + * Zero on success, or negative error code otherwise. + */ +__rte_experimental +static inline int +rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v) +{ + if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS) + return -ENOTSUP; + + r->rts_cons.htd_max = v; + return 0; +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_RTS_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_rts_c11_mem.h b/src/spdk/dpdk/lib/librte_ring/rte_ring_rts_c11_mem.h new file mode 100644 index 000000000..327f22796 --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_rts_c11_mem.h @@ -0,0 +1,179 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_RTS_C11_MEM_H_ +#define _RTE_RING_RTS_C11_MEM_H_ + +/** + * @file rte_ring_rts_c11_mem.h + * It is not recommended to include this file directly, + * include <rte_ring.h> instead. 
+ * Contains internal helper functions for Relaxed Tail Sync (RTS) ring mode. + * For more information please refer to <rte_ring_rts.h>. + */ + +/** + * @internal This function updates tail values. + */ +static __rte_always_inline void +__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht) +{ + union __rte_ring_rts_poscnt h, ot, nt; + + /* + * If there are other enqueues/dequeues in progress that + * might preceded us, then don't update tail with new value. + */ + + ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE); + + do { + /* on 32-bit systems we have to do atomic read here */ + h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED); + + nt.raw = ot.raw; + if (++nt.val.cnt == h.val.cnt) + nt.val.pos = h.val.pos; + + } while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw, + 0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0); +} + +/** + * @internal This function waits till head/tail distance wouldn't + * exceed pre-defined max value. + */ +static __rte_always_inline void +__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht, + union __rte_ring_rts_poscnt *h) +{ + uint32_t max; + + max = ht->htd_max; + + while (h->val.pos - ht->tail.val.pos > max) { + rte_pause(); + h->raw = __atomic_load_n(&ht->head.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue. + */ +static __rte_always_inline uint32_t +__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union __rte_ring_rts_poscnt nh, oh; + + const uint32_t capacity = r->capacity; + + oh.raw = __atomic_load_n(&r->rts_prod.head.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for prod head/tail distance, + * make sure that we read prod head *before* + * reading cons tail. 
+ */ + __rte_ring_rts_head_wait(&r->rts_prod, &oh); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - oh.val.pos; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + break; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems to the ring + */ + } while (__atomic_compare_exchange_n(&r->rts_prod.head.raw, + &oh.raw, nh.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = oh.val.pos; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union __rte_ring_rts_poscnt nh, oh; + + oh.raw = __atomic_load_n(&r->rts_cons.head.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for cons head/tail distance, + * make sure that we read cons head *before* + * reading prod tail. + */ + __rte_ring_rts_head_wait(&r->rts_cons, &oh); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - oh.val.pos; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *entries; + + if (unlikely(n == 0)) + break; + + nh.val.pos = oh.val.pos + n; + nh.val.cnt = oh.val.cnt + 1; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->rts_cons.head.raw, + &oh.raw, nh.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = oh.val.pos; + return n; +} + +#endif /* _RTE_RING_RTS_C11_MEM_H_ */ diff --git a/src/spdk/dpdk/lib/librte_ring/rte_ring_version.map b/src/spdk/dpdk/lib/librte_ring/rte_ring_version.map new file mode 100644 index 000000000..e88c143cf --- /dev/null +++ b/src/spdk/dpdk/lib/librte_ring/rte_ring_version.map @@ -0,0 +1,24 @@ +DPDK_20.0 { + global: + + rte_ring_create; + rte_ring_dump; + rte_ring_free; + rte_ring_get_memsize; + rte_ring_init; + rte_ring_list_dump; + rte_ring_lookup; + + local: *; +}; + +EXPERIMENTAL { + global: + + # added in 19.08 + rte_ring_reset; + + # added in 20.02 + rte_ring_create_elem; + rte_ring_get_memsize_elem; +}; |