diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/spdk/dpdk/drivers/event/sw | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/dpdk/drivers/event/sw')
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/Makefile | 29 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/event_ring.h | 153 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/iq_chunk.h | 196 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/meson.build | 10 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/rte_pmd_sw_event_version.map | 3 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev.c | 1095 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev.h | 300 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev_log.h | 23 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev_scheduler.c | 568 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev_selftest.c | 3401 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev_worker.c | 186 | ||||
-rw-r--r-- | src/spdk/dpdk/drivers/event/sw/sw_evdev_xstats.c | 649 |
12 files changed, 6613 insertions, 0 deletions
diff --git a/src/spdk/dpdk/drivers/event/sw/Makefile b/src/spdk/dpdk/drivers/event/sw/Makefile new file mode 100644 index 000000000..8ea5cceb8 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/Makefile @@ -0,0 +1,29 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2016-2017 Intel Corporation + +include $(RTE_SDK)/mk/rte.vars.mk + +# library name +LIB = librte_pmd_sw_event.a + +# build flags +CFLAGS += -O3 +CFLAGS += $(WERROR_FLAGS) +LDLIBS += -lrte_eal -lrte_eventdev -lrte_kvargs -lrte_ring +LDLIBS += -lrte_mempool -lrte_mbuf +LDLIBS += -lrte_bus_vdev + +# versioning export map +EXPORT_MAP := rte_pmd_sw_event_version.map + +# library source files +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_worker.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_scheduler.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_xstats.c +SRCS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw_evdev_selftest.c + +# export include files +SYMLINK-y-include += + +include $(RTE_SDK)/mk/rte.lib.mk diff --git a/src/spdk/dpdk/drivers/event/sw/event_ring.h b/src/spdk/dpdk/drivers/event/sw/event_ring.h new file mode 100644 index 000000000..02308728b --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/event_ring.h @@ -0,0 +1,153 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +/* + * Generic ring structure for passing events from one core to another. + * + * Used by the software scheduler for the producer and consumer rings for + * each port, i.e. for passing events from worker cores to scheduler and + * vice-versa. Designed for single-producer, single-consumer use with two + * cores working on each ring. + */ + +#ifndef _EVENT_RING_ +#define _EVENT_RING_ + +#include <stdint.h> + +#include <rte_common.h> +#include <rte_memory.h> +#include <rte_malloc.h> + +#define QE_RING_NAMESIZE 32 + +struct qe_ring { + char name[QE_RING_NAMESIZE] __rte_cache_aligned; + uint32_t ring_size; /* size of memory block allocated to the ring */ + uint32_t mask; /* mask for read/write values == ring_size -1 */ + uint32_t size; /* actual usable space in the ring */ + volatile uint32_t write_idx __rte_cache_aligned; + volatile uint32_t read_idx __rte_cache_aligned; + + struct rte_event ring[0] __rte_cache_aligned; +}; + +static inline struct qe_ring * +qe_ring_create(const char *name, unsigned int size, unsigned int socket_id) +{ + struct qe_ring *retval; + const uint32_t ring_size = rte_align32pow2(size + 1); + size_t memsize = sizeof(*retval) + + (ring_size * sizeof(retval->ring[0])); + + retval = rte_zmalloc_socket(NULL, memsize, 0, socket_id); + if (retval == NULL) + goto end; + + snprintf(retval->name, sizeof(retval->name), "EVDEV_RG_%s", name); + retval->ring_size = ring_size; + retval->mask = ring_size - 1; + retval->size = size; +end: + return retval; +} + +static inline void +qe_ring_destroy(struct qe_ring *r) +{ + rte_free(r); +} + +static __rte_always_inline unsigned int +qe_ring_count(const struct qe_ring *r) +{ + return r->write_idx - r->read_idx; +} + +static __rte_always_inline unsigned int +qe_ring_free_count(const struct qe_ring *r) +{ + return r->size - qe_ring_count(r); +} + +static __rte_always_inline unsigned int +qe_ring_enqueue_burst(struct qe_ring *r, const struct rte_event *qes, + unsigned int nb_qes, uint16_t *free_count) +{ + const uint32_t size = r->size; + const uint32_t mask = r->mask; + const uint32_t read = r->read_idx; + uint32_t write = r->write_idx; + const uint32_t space = read + size - write; + uint32_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) + r->ring[write & mask] = qes[i]; + + rte_smp_wmb(); + + if (nb_qes != 0) + r->write_idx = write; + + *free_count = space - nb_qes; + + return nb_qes; +} + +static __rte_always_inline unsigned int +qe_ring_enqueue_burst_with_ops(struct qe_ring *r, const struct rte_event *qes, + unsigned int nb_qes, uint8_t *ops) +{ + const uint32_t size = r->size; + const uint32_t mask = r->mask; + const uint32_t read = r->read_idx; + uint32_t write = r->write_idx; + const uint32_t space = read + size - write; + uint32_t i; + + if (space < nb_qes) + nb_qes = space; + + for (i = 0; i < nb_qes; i++, write++) { + r->ring[write & mask] = qes[i]; + r->ring[write & mask].op = ops[i]; + } + + rte_smp_wmb(); + + if (nb_qes != 0) + r->write_idx = write; + + return nb_qes; +} + +static __rte_always_inline unsigned int +qe_ring_dequeue_burst(struct qe_ring *r, struct rte_event *qes, + unsigned int nb_qes) +{ + const uint32_t mask = r->mask; + uint32_t read = r->read_idx; + const uint32_t write = r->write_idx; + const uint32_t items = write - read; + uint32_t i; + + if (items < nb_qes) + nb_qes = items; + + + for (i = 0; i < nb_qes; i++, read++) + qes[i] = r->ring[read & mask]; + + rte_smp_rmb(); + + if (nb_qes != 0) + r->read_idx += nb_qes; + + return nb_qes; +} + +#endif diff --git a/src/spdk/dpdk/drivers/event/sw/iq_chunk.h b/src/spdk/dpdk/drivers/event/sw/iq_chunk.h new file mode 100644 index 000000000..31d013eab --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/iq_chunk.h @@ -0,0 +1,196 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2017 Intel Corporation + */ + +#ifndef _IQ_CHUNK_H_ +#define _IQ_CHUNK_H_ + +#include <stdint.h> +#include <stdbool.h> +#include <rte_eventdev.h> + +#define IQ_ROB_NAMESIZE 12 + +struct sw_queue_chunk { + struct rte_event events[SW_EVS_PER_Q_CHUNK]; + struct sw_queue_chunk *next; +} __rte_cache_aligned; + +static __rte_always_inline bool +iq_empty(struct sw_iq *iq) +{ + return (iq->count == 0); +} + +static __rte_always_inline uint16_t +iq_count(const struct sw_iq *iq) +{ + return iq->count; +} + +static __rte_always_inline struct sw_queue_chunk * +iq_alloc_chunk(struct sw_evdev *sw) +{ + struct sw_queue_chunk *chunk = sw->chunk_list_head; + sw->chunk_list_head = chunk->next; + chunk->next = NULL; + return chunk; +} + +static __rte_always_inline void +iq_free_chunk(struct sw_evdev *sw, struct sw_queue_chunk *chunk) +{ + chunk->next = sw->chunk_list_head; + sw->chunk_list_head = chunk; +} + +static __rte_always_inline void +iq_free_chunk_list(struct sw_evdev *sw, struct sw_queue_chunk *head) +{ + while (head) { + struct sw_queue_chunk *next; + next = head->next; + iq_free_chunk(sw, head); + head = next; + } +} + +static __rte_always_inline void +iq_init(struct sw_evdev *sw, struct sw_iq *iq) +{ + iq->head = iq_alloc_chunk(sw); + iq->tail = iq->head; + iq->head_idx = 0; + iq->tail_idx = 0; + iq->count = 0; +} + +static __rte_always_inline void +iq_enqueue(struct sw_evdev *sw, struct sw_iq *iq, const struct rte_event *ev) +{ + iq->tail->events[iq->tail_idx++] = *ev; + iq->count++; + + if (unlikely(iq->tail_idx == SW_EVS_PER_Q_CHUNK)) { + /* The number of chunks is defined in relation to the total + * number of inflight events and number of IQS such that + * allocation will always succeed. + */ + struct sw_queue_chunk *chunk = iq_alloc_chunk(sw); + iq->tail->next = chunk; + iq->tail = chunk; + iq->tail_idx = 0; + } +} + +static __rte_always_inline void +iq_pop(struct sw_evdev *sw, struct sw_iq *iq) +{ + iq->head_idx++; + iq->count--; + + if (unlikely(iq->head_idx == SW_EVS_PER_Q_CHUNK)) { + struct sw_queue_chunk *next = iq->head->next; + iq_free_chunk(sw, iq->head); + iq->head = next; + iq->head_idx = 0; + } +} + +static __rte_always_inline const struct rte_event * +iq_peek(struct sw_iq *iq) +{ + return &iq->head->events[iq->head_idx]; +} + +/* Note: the caller must ensure that count <= iq_count() */ +static __rte_always_inline uint16_t +iq_dequeue_burst(struct sw_evdev *sw, + struct sw_iq *iq, + struct rte_event *ev, + uint16_t count) +{ + struct sw_queue_chunk *current; + uint16_t total, index; + + count = RTE_MIN(count, iq_count(iq)); + + current = iq->head; + index = iq->head_idx; + total = 0; + + /* Loop over the chunks */ + while (1) { + struct sw_queue_chunk *next; + for (; index < SW_EVS_PER_Q_CHUNK;) { + ev[total++] = current->events[index++]; + + if (unlikely(total == count)) + goto done; + } + + /* Move to the next chunk */ + next = current->next; + iq_free_chunk(sw, current); + current = next; + index = 0; + } + +done: + if (unlikely(index == SW_EVS_PER_Q_CHUNK)) { + struct sw_queue_chunk *next = current->next; + iq_free_chunk(sw, current); + iq->head = next; + iq->head_idx = 0; + } else { + iq->head = current; + iq->head_idx = index; + } + + iq->count -= total; + + return total; +} + +static __rte_always_inline void +iq_put_back(struct sw_evdev *sw, + struct sw_iq *iq, + struct rte_event *ev, + unsigned int count) +{ + /* Put back events that fit in the current head chunk. If necessary, + * put back events in a new head chunk. The caller must ensure that + * count <= SW_EVS_PER_Q_CHUNK, to ensure that at most one new head is + * needed. + */ + uint16_t avail_space = iq->head_idx; + + if (avail_space >= count) { + const uint16_t idx = avail_space - count; + uint16_t i; + + for (i = 0; i < count; i++) + iq->head->events[idx + i] = ev[i]; + + iq->head_idx = idx; + } else if (avail_space < count) { + const uint16_t remaining = count - avail_space; + struct sw_queue_chunk *new_head; + uint16_t i; + + for (i = 0; i < avail_space; i++) + iq->head->events[i] = ev[remaining + i]; + + new_head = iq_alloc_chunk(sw); + new_head->next = iq->head; + iq->head = new_head; + iq->head_idx = SW_EVS_PER_Q_CHUNK - remaining; + + for (i = 0; i < remaining; i++) + iq->head->events[iq->head_idx + i] = ev[i]; + } + + iq->count += count; +} + +#endif /* _IQ_CHUNK_H_ */ diff --git a/src/spdk/dpdk/drivers/event/sw/meson.build b/src/spdk/dpdk/drivers/event/sw/meson.build new file mode 100644 index 000000000..985012219 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/meson.build @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2017 Intel Corporation + +sources = files('sw_evdev_scheduler.c', + 'sw_evdev_selftest.c', + 'sw_evdev_worker.c', + 'sw_evdev_xstats.c', + 'sw_evdev.c' +) +deps += ['hash', 'bus_vdev'] diff --git a/src/spdk/dpdk/drivers/event/sw/rte_pmd_sw_event_version.map b/src/spdk/dpdk/drivers/event/sw/rte_pmd_sw_event_version.map new file mode 100644 index 000000000..f9f17e4f6 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/rte_pmd_sw_event_version.map @@ -0,0 +1,3 @@ +DPDK_20.0 { + local: *; +}; diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev.c b/src/spdk/dpdk/drivers/event/sw/sw_evdev.c new file mode 100644 index 000000000..fb8e8bebb --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev.c @@ -0,0 +1,1095 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#include <inttypes.h> +#include <string.h> + +#include <rte_bus_vdev.h> +#include <rte_kvargs.h> +#include <rte_ring.h> +#include <rte_errno.h> +#include <rte_event_ring.h> +#include <rte_service_component.h> + +#include "sw_evdev.h" +#include "iq_chunk.h" + +#define EVENTDEV_NAME_SW_PMD event_sw +#define NUMA_NODE_ARG "numa_node" +#define SCHED_QUANTA_ARG "sched_quanta" +#define CREDIT_QUANTA_ARG "credit_quanta" + +static void +sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info); + +static int +sw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[], + const uint8_t priorities[], uint16_t num) +{ + struct sw_port *p = port; + struct sw_evdev *sw = sw_pmd_priv(dev); + int i; + + RTE_SET_USED(priorities); + for (i = 0; i < num; i++) { + struct sw_qid *q = &sw->qids[queues[i]]; + unsigned int j; + + /* check for qid map overflow */ + if (q->cq_num_mapped_cqs >= RTE_DIM(q->cq_map)) { + rte_errno = EDQUOT; + break; + } + + if (p->is_directed && p->num_qids_mapped > 0) { + rte_errno = EDQUOT; + break; + } + + for (j = 0; j < q->cq_num_mapped_cqs; j++) { + if (q->cq_map[j] == p->id) + break; + } + + /* check if port is already linked */ + if (j < q->cq_num_mapped_cqs) + continue; + + if (q->type == SW_SCHED_TYPE_DIRECT) { + /* check directed qids only map to one port */ + if (p->num_qids_mapped > 0) { + rte_errno = EDQUOT; + break; + } + /* check port only takes a directed flow */ + if (num > 1) { + rte_errno = EDQUOT; + break; + } + + p->is_directed = 1; + p->num_qids_mapped = 1; + } else if (q->type == RTE_SCHED_TYPE_ORDERED) { + p->num_ordered_qids++; + p->num_qids_mapped++; + } else if (q->type == RTE_SCHED_TYPE_ATOMIC || + q->type == RTE_SCHED_TYPE_PARALLEL) { + p->num_qids_mapped++; + } + + q->cq_map[q->cq_num_mapped_cqs] = p->id; + rte_smp_wmb(); + q->cq_num_mapped_cqs++; + } + return i; +} + +static int +sw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[], + uint16_t nb_unlinks) +{ + struct sw_port *p = port; + struct sw_evdev *sw = sw_pmd_priv(dev); + unsigned int i, j; + + int unlinked = 0; + for (i = 0; i < nb_unlinks; i++) { + struct sw_qid *q = &sw->qids[queues[i]]; + for (j = 0; j < q->cq_num_mapped_cqs; j++) { + if (q->cq_map[j] == p->id) { + q->cq_map[j] = + q->cq_map[q->cq_num_mapped_cqs - 1]; + rte_smp_wmb(); + q->cq_num_mapped_cqs--; + unlinked++; + + p->num_qids_mapped--; + + if (q->type == RTE_SCHED_TYPE_ORDERED) + p->num_ordered_qids--; + + continue; + } + } + } + + p->unlinks_in_progress += unlinked; + rte_smp_mb(); + + return unlinked; +} + +static int +sw_port_unlinks_in_progress(struct rte_eventdev *dev, void *port) +{ + RTE_SET_USED(dev); + struct sw_port *p = port; + return p->unlinks_in_progress; +} + +static int +sw_port_setup(struct rte_eventdev *dev, uint8_t port_id, + const struct rte_event_port_conf *conf) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + struct sw_port *p = &sw->ports[port_id]; + char buf[RTE_RING_NAMESIZE]; + unsigned int i; + + struct rte_event_dev_info info; + sw_info_get(dev, &info); + + /* detect re-configuring and return credits to instance if needed */ + if (p->initialized) { + /* taking credits from pool is done one quanta at a time, and + * credits may be spend (counted in p->inflights) or still + * available in the port (p->inflight_credits). We must return + * the sum to no leak credits + */ + int possible_inflights = p->inflight_credits + p->inflights; + rte_atomic32_sub(&sw->inflights, possible_inflights); + } + + *p = (struct sw_port){0}; /* zero entire structure */ + p->id = port_id; + p->sw = sw; + + /* check to see if rings exists - port_setup() can be called multiple + * times legally (assuming device is stopped). If ring exists, free it + * to so it gets re-created with the correct size + */ + snprintf(buf, sizeof(buf), "sw%d_p%u_%s", dev->data->dev_id, + port_id, "rx_worker_ring"); + struct rte_event_ring *existing_ring = rte_event_ring_lookup(buf); + if (existing_ring) + rte_event_ring_free(existing_ring); + + p->rx_worker_ring = rte_event_ring_create(buf, MAX_SW_PROD_Q_DEPTH, + dev->data->socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ); + if (p->rx_worker_ring == NULL) { + SW_LOG_ERR("Error creating RX worker ring for port %d\n", + port_id); + return -1; + } + + p->inflight_max = conf->new_event_threshold; + p->implicit_release = !conf->disable_implicit_release; + + /* check if ring exists, same as rx_worker above */ + snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id, + port_id, "cq_worker_ring"); + existing_ring = rte_event_ring_lookup(buf); + if (existing_ring) + rte_event_ring_free(existing_ring); + + p->cq_worker_ring = rte_event_ring_create(buf, conf->dequeue_depth, + dev->data->socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ | RING_F_EXACT_SZ); + if (p->cq_worker_ring == NULL) { + rte_event_ring_free(p->rx_worker_ring); + SW_LOG_ERR("Error creating CQ worker ring for port %d\n", + port_id); + return -1; + } + sw->cq_ring_space[port_id] = conf->dequeue_depth; + + /* set hist list contents to empty */ + for (i = 0; i < SW_PORT_HIST_LIST; i++) { + p->hist_list[i].fid = -1; + p->hist_list[i].qid = -1; + } + dev->data->ports[port_id] = p; + + rte_smp_wmb(); + p->initialized = 1; + return 0; +} + +static void +sw_port_release(void *port) +{ + struct sw_port *p = (void *)port; + if (p == NULL) + return; + + rte_event_ring_free(p->rx_worker_ring); + rte_event_ring_free(p->cq_worker_ring); + memset(p, 0, sizeof(*p)); +} + +static int32_t +qid_init(struct sw_evdev *sw, unsigned int idx, int type, + const struct rte_event_queue_conf *queue_conf) +{ + unsigned int i; + int dev_id = sw->data->dev_id; + int socket_id = sw->data->socket_id; + char buf[IQ_ROB_NAMESIZE]; + struct sw_qid *qid = &sw->qids[idx]; + + /* Initialize the FID structures to no pinning (-1), and zero packets */ + const struct sw_fid_t fid = {.cq = -1, .pcount = 0}; + for (i = 0; i < RTE_DIM(qid->fids); i++) + qid->fids[i] = fid; + + qid->id = idx; + qid->type = type; + qid->priority = queue_conf->priority; + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + char ring_name[RTE_RING_NAMESIZE]; + uint32_t window_size; + + /* rte_ring and window_size_mask require require window_size to + * be a power-of-2. + */ + window_size = rte_align32pow2( + queue_conf->nb_atomic_order_sequences); + + qid->window_size = window_size - 1; + + if (!window_size) { + SW_LOG_DBG( + "invalid reorder_window_size for ordered queue\n" + ); + goto cleanup; + } + + snprintf(buf, sizeof(buf), "sw%d_iq_%d_rob", dev_id, i); + qid->reorder_buffer = rte_zmalloc_socket(buf, + window_size * sizeof(qid->reorder_buffer[0]), + 0, socket_id); + if (!qid->reorder_buffer) { + SW_LOG_DBG("reorder_buffer malloc failed\n"); + goto cleanup; + } + + memset(&qid->reorder_buffer[0], + 0, + window_size * sizeof(qid->reorder_buffer[0])); + + snprintf(ring_name, sizeof(ring_name), "sw%d_q%d_freelist", + dev_id, idx); + + /* lookup the ring, and if it already exists, free it */ + struct rte_ring *cleanup = rte_ring_lookup(ring_name); + if (cleanup) + rte_ring_free(cleanup); + + qid->reorder_buffer_freelist = rte_ring_create(ring_name, + window_size, + socket_id, + RING_F_SP_ENQ | RING_F_SC_DEQ); + if (!qid->reorder_buffer_freelist) { + SW_LOG_DBG("freelist ring create failed"); + goto cleanup; + } + + /* Populate the freelist with reorder buffer entries. Enqueue + * 'window_size - 1' entries because the rte_ring holds only + * that many. + */ + for (i = 0; i < window_size - 1; i++) { + if (rte_ring_sp_enqueue(qid->reorder_buffer_freelist, + &qid->reorder_buffer[i]) < 0) + goto cleanup; + } + + qid->reorder_buffer_index = 0; + qid->cq_next_tx = 0; + } + + qid->initialized = 1; + + return 0; + +cleanup: + if (qid->reorder_buffer) { + rte_free(qid->reorder_buffer); + qid->reorder_buffer = NULL; + } + + if (qid->reorder_buffer_freelist) { + rte_ring_free(qid->reorder_buffer_freelist); + qid->reorder_buffer_freelist = NULL; + } + + return -EINVAL; +} + +static void +sw_queue_release(struct rte_eventdev *dev, uint8_t id) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + struct sw_qid *qid = &sw->qids[id]; + + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + rte_free(qid->reorder_buffer); + rte_ring_free(qid->reorder_buffer_freelist); + } + memset(qid, 0, sizeof(*qid)); +} + +static int +sw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id, + const struct rte_event_queue_conf *conf) +{ + int type; + + type = conf->schedule_type; + + if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg) { + type = SW_SCHED_TYPE_DIRECT; + } else if (RTE_EVENT_QUEUE_CFG_ALL_TYPES + & conf->event_queue_cfg) { + SW_LOG_ERR("QUEUE_CFG_ALL_TYPES not supported\n"); + return -ENOTSUP; + } + + struct sw_evdev *sw = sw_pmd_priv(dev); + + if (sw->qids[queue_id].initialized) + sw_queue_release(dev, queue_id); + + return qid_init(sw, queue_id, type, conf); +} + +static void +sw_init_qid_iqs(struct sw_evdev *sw) +{ + int i, j; + + /* Initialize the IQ memory of all configured qids */ + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct sw_qid *qid = &sw->qids[i]; + + if (!qid->initialized) + continue; + + for (j = 0; j < SW_IQS_MAX; j++) + iq_init(sw, &qid->iq[j]); + } +} + +static int +sw_qids_empty(struct sw_evdev *sw) +{ + unsigned int i, j; + + for (i = 0; i < sw->qid_count; i++) { + for (j = 0; j < SW_IQS_MAX; j++) { + if (iq_count(&sw->qids[i].iq[j])) + return 0; + } + } + + return 1; +} + +static int +sw_ports_empty(struct sw_evdev *sw) +{ + unsigned int i; + + for (i = 0; i < sw->port_count; i++) { + if ((rte_event_ring_count(sw->ports[i].rx_worker_ring)) || + rte_event_ring_count(sw->ports[i].cq_worker_ring)) + return 0; + } + + return 1; +} + +static void +sw_drain_ports(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + eventdev_stop_flush_t flush; + unsigned int i; + uint8_t dev_id; + void *arg; + + flush = dev->dev_ops->dev_stop_flush; + dev_id = dev->data->dev_id; + arg = dev->data->dev_stop_flush_arg; + + for (i = 0; i < sw->port_count; i++) { + struct rte_event ev; + + while (rte_event_dequeue_burst(dev_id, i, &ev, 1, 0)) { + if (flush) + flush(dev_id, ev, arg); + + ev.op = RTE_EVENT_OP_RELEASE; + rte_event_enqueue_burst(dev_id, i, &ev, 1); + } + } +} + +static void +sw_drain_queue(struct rte_eventdev *dev, struct sw_iq *iq) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + eventdev_stop_flush_t flush; + uint8_t dev_id; + void *arg; + + flush = dev->dev_ops->dev_stop_flush; + dev_id = dev->data->dev_id; + arg = dev->data->dev_stop_flush_arg; + + while (iq_count(iq) > 0) { + struct rte_event ev; + + iq_dequeue_burst(sw, iq, &ev, 1); + + if (flush) + flush(dev_id, ev, arg); + } +} + +static void +sw_drain_queues(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + unsigned int i, j; + + for (i = 0; i < sw->qid_count; i++) { + for (j = 0; j < SW_IQS_MAX; j++) + sw_drain_queue(dev, &sw->qids[i].iq[j]); + } +} + +static void +sw_clean_qid_iqs(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + int i, j; + + /* Release the IQ memory of all configured qids */ + for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) { + struct sw_qid *qid = &sw->qids[i]; + + for (j = 0; j < SW_IQS_MAX; j++) { + if (!qid->iq[j].head) + continue; + iq_free_chunk_list(sw, qid->iq[j].head); + qid->iq[j].head = NULL; + } + } +} + +static void +sw_queue_def_conf(struct rte_eventdev *dev, uint8_t queue_id, + struct rte_event_queue_conf *conf) +{ + RTE_SET_USED(dev); + RTE_SET_USED(queue_id); + + static const struct rte_event_queue_conf default_conf = { + .nb_atomic_flows = 4096, + .nb_atomic_order_sequences = 1, + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + }; + + *conf = default_conf; +} + +static void +sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id, + struct rte_event_port_conf *port_conf) +{ + RTE_SET_USED(dev); + RTE_SET_USED(port_id); + + port_conf->new_event_threshold = 1024; + port_conf->dequeue_depth = 16; + port_conf->enqueue_depth = 16; + port_conf->disable_implicit_release = 0; +} + +static int +sw_dev_configure(const struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + const struct rte_eventdev_data *data = dev->data; + const struct rte_event_dev_config *conf = &data->dev_conf; + int num_chunks, i; + + sw->qid_count = conf->nb_event_queues; + sw->port_count = conf->nb_event_ports; + sw->nb_events_limit = conf->nb_events_limit; + rte_atomic32_set(&sw->inflights, 0); + + /* Number of chunks sized for worst-case spread of events across IQs */ + num_chunks = ((SW_INFLIGHT_EVENTS_TOTAL/SW_EVS_PER_Q_CHUNK)+1) + + sw->qid_count*SW_IQS_MAX*2; + + /* If this is a reconfiguration, free the previous IQ allocation. All + * IQ chunk references were cleaned out of the QIDs in sw_stop(), and + * will be reinitialized in sw_start(). + */ + if (sw->chunks) + rte_free(sw->chunks); + + sw->chunks = rte_malloc_socket(NULL, + sizeof(struct sw_queue_chunk) * + num_chunks, + 0, + sw->data->socket_id); + if (!sw->chunks) + return -ENOMEM; + + sw->chunk_list_head = NULL; + for (i = 0; i < num_chunks; i++) + iq_free_chunk(sw, &sw->chunks[i]); + + if (conf->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT) + return -ENOTSUP; + + return 0; +} + +struct rte_eth_dev; + +static int +sw_eth_rx_adapter_caps_get(const struct rte_eventdev *dev, + const struct rte_eth_dev *eth_dev, + uint32_t *caps) +{ + RTE_SET_USED(dev); + RTE_SET_USED(eth_dev); + *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP; + return 0; +} + +static int +sw_timer_adapter_caps_get(const struct rte_eventdev *dev, + uint64_t flags, + uint32_t *caps, + const struct rte_event_timer_adapter_ops **ops) +{ + RTE_SET_USED(dev); + RTE_SET_USED(flags); + *caps = 0; + + /* Use default SW ops */ + *ops = NULL; + + return 0; +} + +static int +sw_crypto_adapter_caps_get(const struct rte_eventdev *dev, + const struct rte_cryptodev *cdev, + uint32_t *caps) +{ + RTE_SET_USED(dev); + RTE_SET_USED(cdev); + *caps = RTE_EVENT_CRYPTO_ADAPTER_SW_CAP; + return 0; +} + +static void +sw_info_get(struct rte_eventdev *dev, struct rte_event_dev_info *info) +{ + RTE_SET_USED(dev); + + static const struct rte_event_dev_info evdev_sw_info = { + .driver_name = SW_PMD_NAME, + .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV, + .max_event_queue_flows = SW_QID_NUM_FIDS, + .max_event_queue_priority_levels = SW_Q_PRIORITY_MAX, + .max_event_priority_levels = SW_IQS_MAX, + .max_event_ports = SW_PORTS_MAX, + .max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH, + .max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH, + .max_num_events = SW_INFLIGHT_EVENTS_TOTAL, + .event_dev_cap = ( + RTE_EVENT_DEV_CAP_QUEUE_QOS | + RTE_EVENT_DEV_CAP_BURST_MODE | + RTE_EVENT_DEV_CAP_EVENT_QOS | + RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE| + RTE_EVENT_DEV_CAP_RUNTIME_PORT_LINK | + RTE_EVENT_DEV_CAP_MULTIPLE_QUEUE_PORT | + RTE_EVENT_DEV_CAP_NONSEQ_MODE), + }; + + *info = evdev_sw_info; +} + +static void +sw_dump(struct rte_eventdev *dev, FILE *f) +{ + const struct sw_evdev *sw = sw_pmd_priv(dev); + + static const char * const q_type_strings[] = { + "Ordered", "Atomic", "Parallel", "Directed" + }; + uint32_t i; + fprintf(f, "EventDev %s: ports %d, qids %d\n", "todo-fix-name", + sw->port_count, sw->qid_count); + + fprintf(f, "\trx %"PRIu64"\n\tdrop %"PRIu64"\n\ttx %"PRIu64"\n", + sw->stats.rx_pkts, sw->stats.rx_dropped, sw->stats.tx_pkts); + fprintf(f, "\tsched calls: %"PRIu64"\n", sw->sched_called); + fprintf(f, "\tsched cq/qid call: %"PRIu64"\n", sw->sched_cq_qid_called); + fprintf(f, "\tsched no IQ enq: %"PRIu64"\n", sw->sched_no_iq_enqueues); + fprintf(f, "\tsched no CQ enq: %"PRIu64"\n", sw->sched_no_cq_enqueues); + uint32_t inflights = rte_atomic32_read(&sw->inflights); + uint32_t credits = sw->nb_events_limit - inflights; + fprintf(f, "\tinflight %d, credits: %d\n", inflights, credits); + +#define COL_RED "\x1b[31m" +#define COL_RESET "\x1b[0m" + + for (i = 0; i < sw->port_count; i++) { + int max, j; + const struct sw_port *p = &sw->ports[i]; + if (!p->initialized) { + fprintf(f, " %sPort %d not initialized.%s\n", + COL_RED, i, COL_RESET); + continue; + } + fprintf(f, " Port %d %s\n", i, + p->is_directed ? " (SingleCons)" : ""); + fprintf(f, "\trx %"PRIu64"\tdrop %"PRIu64"\ttx %"PRIu64 + "\t%sinflight %d%s\n", sw->ports[i].stats.rx_pkts, + sw->ports[i].stats.rx_dropped, + sw->ports[i].stats.tx_pkts, + (p->inflights == p->inflight_max) ? + COL_RED : COL_RESET, + sw->ports[i].inflights, COL_RESET); + + fprintf(f, "\tMax New: %u" + "\tAvg cycles PP: %"PRIu64"\tCredits: %u\n", + sw->ports[i].inflight_max, + sw->ports[i].avg_pkt_ticks, + sw->ports[i].inflight_credits); + fprintf(f, "\tReceive burst distribution:\n"); + float zp_percent = p->zero_polls * 100.0 / p->total_polls; + fprintf(f, zp_percent < 10 ? "\t\t0:%.02f%% " : "\t\t0:%.0f%% ", + zp_percent); + for (max = (int)RTE_DIM(p->poll_buckets); max-- > 0;) + if (p->poll_buckets[max] != 0) + break; + for (j = 0; j <= max; j++) { + if (p->poll_buckets[j] != 0) { + float poll_pc = p->poll_buckets[j] * 100.0 / + p->total_polls; + fprintf(f, "%u-%u:%.02f%% ", + ((j << SW_DEQ_STAT_BUCKET_SHIFT) + 1), + ((j+1) << SW_DEQ_STAT_BUCKET_SHIFT), + poll_pc); + } + } + fprintf(f, "\n"); + + if (p->rx_worker_ring) { + uint64_t used = rte_event_ring_count(p->rx_worker_ring); + uint64_t space = rte_event_ring_free_count( + p->rx_worker_ring); + const char *col = (space == 0) ? COL_RED : COL_RESET; + fprintf(f, "\t%srx ring used: %4"PRIu64"\tfree: %4" + PRIu64 COL_RESET"\n", col, used, space); + } else + fprintf(f, "\trx ring not initialized.\n"); + + if (p->cq_worker_ring) { + uint64_t used = rte_event_ring_count(p->cq_worker_ring); + uint64_t space = rte_event_ring_free_count( + p->cq_worker_ring); + const char *col = (space == 0) ? COL_RED : COL_RESET; + fprintf(f, "\t%scq ring used: %4"PRIu64"\tfree: %4" + PRIu64 COL_RESET"\n", col, used, space); + } else + fprintf(f, "\tcq ring not initialized.\n"); + } + + for (i = 0; i < sw->qid_count; i++) { + const struct sw_qid *qid = &sw->qids[i]; + if (!qid->initialized) { + fprintf(f, " %sQueue %d not initialized.%s\n", + COL_RED, i, COL_RESET); + continue; + } + int affinities_per_port[SW_PORTS_MAX] = {0}; + uint32_t inflights = 0; + + fprintf(f, " Queue %d (%s)\n", i, q_type_strings[qid->type]); + fprintf(f, "\trx %"PRIu64"\tdrop %"PRIu64"\ttx %"PRIu64"\n", + qid->stats.rx_pkts, qid->stats.rx_dropped, + qid->stats.tx_pkts); + if (qid->type == RTE_SCHED_TYPE_ORDERED) { + struct rte_ring *rob_buf_free = + qid->reorder_buffer_freelist; + if (rob_buf_free) + fprintf(f, "\tReorder entries in use: %u\n", + rte_ring_free_count(rob_buf_free)); + else + fprintf(f, + "\tReorder buffer not initialized\n"); + } + + uint32_t flow; + for (flow = 0; flow < RTE_DIM(qid->fids); flow++) + if (qid->fids[flow].cq != -1) { + affinities_per_port[qid->fids[flow].cq]++; + inflights += qid->fids[flow].pcount; + } + + uint32_t port; + fprintf(f, "\tPer Port Stats:\n"); + for (port = 0; port < sw->port_count; port++) { + fprintf(f, "\t Port %d: Pkts: %"PRIu64, port, + qid->to_port[port]); + fprintf(f, "\tFlows: %d\n", affinities_per_port[port]); + } + + uint32_t iq; + uint32_t iq_printed = 0; + for (iq = 0; iq < SW_IQS_MAX; iq++) { + if (!qid->iq[iq].head) { + fprintf(f, "\tiq %d is not initialized.\n", iq); + iq_printed = 1; + continue; + } + uint32_t used = iq_count(&qid->iq[iq]); + const char *col = COL_RESET; + if (used > 0) { + fprintf(f, "\t%siq %d: Used %d" + COL_RESET"\n", col, iq, used); + iq_printed = 1; + } + } + if (iq_printed == 0) + fprintf(f, "\t-- iqs empty --\n"); + } +} + +static int +sw_start(struct rte_eventdev *dev) +{ + unsigned int i, j; + struct sw_evdev *sw = sw_pmd_priv(dev); + + rte_service_component_runstate_set(sw->service_id, 1); + + /* check a service core is mapped to this service */ + if (!rte_service_runstate_get(sw->service_id)) { + SW_LOG_ERR("Warning: No Service core enabled on service %s\n", + sw->service_name); + return -ENOENT; + } + + /* check all ports are set up */ + for (i = 0; i < sw->port_count; i++) + if (sw->ports[i].rx_worker_ring == NULL) { + SW_LOG_ERR("Port %d not configured\n", i); + return -ESTALE; + } + + /* check all queues are configured and mapped to ports*/ + for (i = 0; i < sw->qid_count; i++) + if (!sw->qids[i].initialized || + sw->qids[i].cq_num_mapped_cqs == 0) { + SW_LOG_ERR("Queue %d not configured\n", i); + return -ENOLINK; + } + + /* build up our prioritized array of qids */ + /* We don't use qsort here, as if all/multiple entries have the same + * priority, the result is non-deterministic. From "man 3 qsort": + * "If two members compare as equal, their order in the sorted + * array is undefined." + */ + uint32_t qidx = 0; + for (j = 0; j <= RTE_EVENT_DEV_PRIORITY_LOWEST; j++) { + for (i = 0; i < sw->qid_count; i++) { + if (sw->qids[i].priority == j) { + sw->qids_prioritized[qidx] = &sw->qids[i]; + qidx++; + } + } + } + + sw_init_qid_iqs(sw); + + if (sw_xstats_init(sw) < 0) + return -EINVAL; + + rte_smp_wmb(); + sw->started = 1; + + return 0; +} + +static void +sw_stop(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + int32_t runstate; + + /* Stop the scheduler if it's running */ + runstate = rte_service_runstate_get(sw->service_id); + if (runstate == 1) + rte_service_runstate_set(sw->service_id, 0); + + while (rte_service_may_be_active(sw->service_id)) + rte_pause(); + + /* Flush all events out of the device */ + while (!(sw_qids_empty(sw) && sw_ports_empty(sw))) { + sw_event_schedule(dev); + sw_drain_ports(dev); + sw_drain_queues(dev); + } + + sw_clean_qid_iqs(dev); + sw_xstats_uninit(sw); + sw->started = 0; + rte_smp_wmb(); + + if (runstate == 1) + rte_service_runstate_set(sw->service_id, 1); +} + +static int +sw_close(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + uint32_t i; + + for (i = 0; i < sw->qid_count; i++) + sw_queue_release(dev, i); + sw->qid_count = 0; + + for (i = 0; i < sw->port_count; i++) + sw_port_release(&sw->ports[i]); + sw->port_count = 0; + + memset(&sw->stats, 0, sizeof(sw->stats)); + sw->sched_called = 0; + sw->sched_no_iq_enqueues = 0; + sw->sched_no_cq_enqueues = 0; + sw->sched_cq_qid_called = 0; + + return 0; +} + +static int +assign_numa_node(const char *key __rte_unused, const char *value, void *opaque) +{ + int *socket_id = opaque; + *socket_id = atoi(value); + if (*socket_id >= RTE_MAX_NUMA_NODES) + return -1; + return 0; +} + +static int +set_sched_quanta(const char *key __rte_unused, const char *value, void *opaque) +{ + int *quanta = opaque; + *quanta = atoi(value); + if (*quanta < 0 || *quanta >= 4096) + return -1; + return 0; +} + +static int +set_credit_quanta(const char *key __rte_unused, const char *value, void *opaque) +{ + int *credit = opaque; + *credit = atoi(value); + if (*credit < 0 || *credit >= 128) + return -1; + return 0; +} + + +static int32_t sw_sched_service_func(void *args) +{ + struct rte_eventdev *dev = args; + sw_event_schedule(dev); + return 0; +} + +static int +sw_probe(struct rte_vdev_device *vdev) +{ + static struct rte_eventdev_ops evdev_sw_ops = { + .dev_configure = sw_dev_configure, + .dev_infos_get = sw_info_get, + .dev_close = sw_close, + .dev_start = sw_start, + .dev_stop = sw_stop, + .dump = sw_dump, + + .queue_def_conf = sw_queue_def_conf, + .queue_setup = sw_queue_setup, + .queue_release = sw_queue_release, + .port_def_conf = sw_port_def_conf, + .port_setup = sw_port_setup, + .port_release = sw_port_release, + .port_link = sw_port_link, + .port_unlink = sw_port_unlink, + .port_unlinks_in_progress = sw_port_unlinks_in_progress, + + .eth_rx_adapter_caps_get = sw_eth_rx_adapter_caps_get, + + .timer_adapter_caps_get = sw_timer_adapter_caps_get, + + .crypto_adapter_caps_get = sw_crypto_adapter_caps_get, + + .xstats_get = sw_xstats_get, + .xstats_get_names = sw_xstats_get_names, + .xstats_get_by_name = sw_xstats_get_by_name, + .xstats_reset = sw_xstats_reset, + + .dev_selftest = test_sw_eventdev, + }; + + static const char *const args[] = { + NUMA_NODE_ARG, + SCHED_QUANTA_ARG, + CREDIT_QUANTA_ARG, + NULL + }; + const char *name; + const char *params; + struct rte_eventdev *dev; + struct sw_evdev *sw; + int socket_id = rte_socket_id(); + int sched_quanta = SW_DEFAULT_SCHED_QUANTA; + int credit_quanta = SW_DEFAULT_CREDIT_QUANTA; + + name = rte_vdev_device_name(vdev); + params = rte_vdev_device_args(vdev); + if (params != NULL && params[0] != '\0') { + struct rte_kvargs *kvlist = rte_kvargs_parse(params, args); + + if (!kvlist) { + SW_LOG_INFO( + "Ignoring unsupported parameters when creating device '%s'\n", + name); + } else { + int ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG, + assign_numa_node, &socket_id); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing numa node parameter", + name); + rte_kvargs_free(kvlist); + return ret; + } + + ret = rte_kvargs_process(kvlist, SCHED_QUANTA_ARG, + set_sched_quanta, &sched_quanta); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing sched quanta parameter", + name); + rte_kvargs_free(kvlist); + return ret; + } + + ret = rte_kvargs_process(kvlist, CREDIT_QUANTA_ARG, + set_credit_quanta, &credit_quanta); + if (ret != 0) { + SW_LOG_ERR( + "%s: Error parsing credit quanta parameter", + name); + rte_kvargs_free(kvlist); + return ret; + } + + rte_kvargs_free(kvlist); + } + } + + SW_LOG_INFO( + "Creating eventdev sw device %s, numa_node=%d, sched_quanta=%d, credit_quanta=%d\n", + name, socket_id, sched_quanta, credit_quanta); + + dev = rte_event_pmd_vdev_init(name, + sizeof(struct sw_evdev), socket_id); + if (dev == NULL) { + SW_LOG_ERR("eventdev vdev init() failed"); + return -EFAULT; + } + dev->dev_ops = &evdev_sw_ops; + dev->enqueue = sw_event_enqueue; + dev->enqueue_burst = sw_event_enqueue_burst; + dev->enqueue_new_burst = sw_event_enqueue_burst; + dev->enqueue_forward_burst = sw_event_enqueue_burst; + dev->dequeue = sw_event_dequeue; + dev->dequeue_burst = sw_event_dequeue_burst; + + if (rte_eal_process_type() != RTE_PROC_PRIMARY) + return 0; + + sw = dev->data->dev_private; + sw->data = dev->data; + + /* copy values passed from vdev command line to instance */ + sw->credit_update_quanta = credit_quanta; + sw->sched_quanta = sched_quanta; + + /* register service with EAL */ + struct rte_service_spec service; + memset(&service, 0, sizeof(struct rte_service_spec)); + snprintf(service.name, sizeof(service.name), "%s_service", name); + snprintf(sw->service_name, sizeof(sw->service_name), "%s_service", + name); + service.socket_id = socket_id; + service.callback = sw_sched_service_func; + service.callback_userdata = (void *)dev; + + int32_t ret = rte_service_component_register(&service, &sw->service_id); + if (ret) { + SW_LOG_ERR("service register() failed"); + return -ENOEXEC; + } + + dev->data->service_inited = 1; + dev->data->service_id = sw->service_id; + + return 0; +} + +static int +sw_remove(struct rte_vdev_device *vdev) +{ + const char *name; + + name = rte_vdev_device_name(vdev); + if (name == NULL) + return -EINVAL; + + SW_LOG_INFO("Closing eventdev sw device %s\n", name); + + return rte_event_pmd_vdev_uninit(name); +} + +static struct rte_vdev_driver evdev_sw_pmd_drv = { + .probe = sw_probe, + .remove = sw_remove +}; + +RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_SW_PMD, evdev_sw_pmd_drv); +RTE_PMD_REGISTER_PARAM_STRING(event_sw, NUMA_NODE_ARG "=<int> " + SCHED_QUANTA_ARG "=<int>" CREDIT_QUANTA_ARG "=<int>"); + +/* declared extern in header, for access from other .c files */ +int eventdev_sw_log_level; + +RTE_INIT(evdev_sw_init_log) +{ + eventdev_sw_log_level = rte_log_register("pmd.event.sw"); + if (eventdev_sw_log_level >= 0) + rte_log_set_level(eventdev_sw_log_level, RTE_LOG_NOTICE); +} diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev.h b/src/spdk/dpdk/drivers/event/sw/sw_evdev.h new file mode 100644 index 000000000..7c77b2495 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev.h @@ -0,0 +1,300 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#ifndef _SW_EVDEV_H_ +#define _SW_EVDEV_H_ + +#include "sw_evdev_log.h" +#include <rte_eventdev.h> +#include <rte_eventdev_pmd_vdev.h> +#include <rte_atomic.h> + +#define SW_DEFAULT_CREDIT_QUANTA 32 +#define SW_DEFAULT_SCHED_QUANTA 128 +#define SW_QID_NUM_FIDS 16384 +#define SW_IQS_MAX 4 +#define SW_Q_PRIORITY_MAX 255 +#define SW_PORTS_MAX 64 +#define MAX_SW_CONS_Q_DEPTH 128 +#define SW_INFLIGHT_EVENTS_TOTAL 4096 +/* allow for lots of over-provisioning */ +#define MAX_SW_PROD_Q_DEPTH 4096 +#define SW_FRAGMENTS_MAX 16 + +/* Should be power-of-two minus one, to leave room for the next pointer */ +#define SW_EVS_PER_Q_CHUNK 255 +#define SW_Q_CHUNK_SIZE ((SW_EVS_PER_Q_CHUNK + 1) * sizeof(struct rte_event)) + +/* report dequeue burst sizes in buckets */ +#define SW_DEQ_STAT_BUCKET_SHIFT 2 +/* how many packets pulled from port by sched */ +#define SCHED_DEQUEUE_BURST_SIZE 32 + +#define SW_PORT_HIST_LIST (MAX_SW_PROD_Q_DEPTH) /* size of our history list */ +#define NUM_SAMPLES 64 /* how many data points use for average stats */ + +#define EVENTDEV_NAME_SW_PMD event_sw +#define SW_PMD_NAME RTE_STR(event_sw) +#define SW_PMD_NAME_MAX 64 + +#define SW_SCHED_TYPE_DIRECT (RTE_SCHED_TYPE_PARALLEL + 1) + +#define SW_NUM_POLL_BUCKETS (MAX_SW_CONS_Q_DEPTH >> SW_DEQ_STAT_BUCKET_SHIFT) + +enum { + QE_FLAG_VALID_SHIFT = 0, + QE_FLAG_COMPLETE_SHIFT, + QE_FLAG_NOT_EOP_SHIFT, + _QE_FLAG_COUNT +}; + +#define QE_FLAG_VALID (1 << QE_FLAG_VALID_SHIFT) /* for NEW FWD, FRAG */ +#define QE_FLAG_COMPLETE (1 << QE_FLAG_COMPLETE_SHIFT) /* set for FWD, DROP */ +#define QE_FLAG_NOT_EOP (1 << QE_FLAG_NOT_EOP_SHIFT) /* set for FRAG only */ + +static const uint8_t sw_qe_flag_map[] = { + QE_FLAG_VALID /* NEW Event */, + QE_FLAG_VALID | QE_FLAG_COMPLETE /* FWD Event */, + QE_FLAG_COMPLETE /* RELEASE Event */, + + /* Values which can be used for future support for partial + * events, i.e. where one event comes back to the scheduler + * as multiple which need to be tracked together + */ + QE_FLAG_VALID | QE_FLAG_COMPLETE | QE_FLAG_NOT_EOP, +}; + +/* Records basic event stats at a given point. Used in port and qid structs */ +struct sw_point_stats { + uint64_t rx_pkts; + uint64_t rx_dropped; + uint64_t tx_pkts; +}; + +/* structure used to track what port a flow (FID) is pinned to */ +struct sw_fid_t { + /* which CQ this FID is currently pinned to */ + int32_t cq; + /* number of packets gone to the CQ with this FID */ + uint32_t pcount; +}; + +struct reorder_buffer_entry { + uint16_t num_fragments; /**< Number of packet fragments */ + uint16_t fragment_index; /**< Points to the oldest valid frag */ + uint8_t ready; /**< Entry is ready to be reordered */ + struct rte_event fragments[SW_FRAGMENTS_MAX]; +}; + +struct sw_iq { + struct sw_queue_chunk *head; + struct sw_queue_chunk *tail; + uint16_t head_idx; + uint16_t tail_idx; + uint16_t count; +}; + +struct sw_qid { + /* set when the QID has been initialized */ + uint8_t initialized; + /* The type of this QID */ + int8_t type; + /* Integer ID representing the queue. This is used in history lists, + * to identify the stage of processing. + */ + uint32_t id; + struct sw_point_stats stats; + + /* Internal priority rings for packets */ + struct sw_iq iq[SW_IQS_MAX]; + uint32_t iq_pkt_mask; /* A mask to indicate packets in an IQ */ + uint64_t iq_pkt_count[SW_IQS_MAX]; + + /* Information on what CQs are polling this IQ */ + uint32_t cq_num_mapped_cqs; + uint32_t cq_next_tx; /* cq to write next (non-atomic) packet */ + uint32_t cq_map[SW_PORTS_MAX]; + uint64_t to_port[SW_PORTS_MAX]; + + /* Track flow ids for atomic load balancing */ + struct sw_fid_t fids[SW_QID_NUM_FIDS]; + + /* Track packet order for reordering when needed */ + struct reorder_buffer_entry *reorder_buffer; /*< pkts await reorder */ + struct rte_ring *reorder_buffer_freelist; /* available reorder slots */ + uint32_t reorder_buffer_index; /* oldest valid reorder buffer entry */ + uint32_t window_size; /* Used to wrap reorder_buffer_index */ + + uint8_t priority; +}; + +struct sw_hist_list_entry { + int32_t qid; + int32_t fid; + struct reorder_buffer_entry *rob_entry; +}; + +struct sw_evdev; + +struct sw_port { + /* new enqueue / dequeue API doesn't have an instance pointer, only the + * pointer to the port being enqueue/dequeued from + */ + struct sw_evdev *sw; + + /* set when the port is initialized */ + uint8_t initialized; + /* A numeric ID for the port */ + uint8_t id; + + /* An atomic counter for when the port has been unlinked, and the + * scheduler has not yet acked this unlink - hence there may still be + * events in the buffers going to the port. When the unlinks in + * progress is read by the scheduler, no more events will be pushed to + * the port - hence the scheduler core can just assign zero. + */ + uint8_t unlinks_in_progress; + + int16_t is_directed; /** Takes from a single directed QID */ + /** + * For loadbalanced we can optimise pulling packets from + * producers if there is no reordering involved + */ + int16_t num_ordered_qids; + + /** Ring and buffer for pulling events from workers for scheduling */ + struct rte_event_ring *rx_worker_ring __rte_cache_aligned; + /** Ring and buffer for pushing packets to workers after scheduling */ + struct rte_event_ring *cq_worker_ring; + + /* hole */ + + /* num releases yet to be completed on this port */ + uint16_t outstanding_releases __rte_cache_aligned; + uint16_t inflight_max; /* app requested max inflights for this port */ + uint16_t inflight_credits; /* num credits this port has right now */ + uint8_t implicit_release; /* release events before dequeueing */ + + uint16_t last_dequeue_burst_sz; /* how big the burst was */ + uint64_t last_dequeue_ticks; /* used to track burst processing time */ + uint64_t avg_pkt_ticks; /* tracks average over NUM_SAMPLES burst */ + uint64_t total_polls; /* how many polls were counted in stats */ + uint64_t zero_polls; /* tracks polls returning nothing */ + uint32_t poll_buckets[SW_NUM_POLL_BUCKETS]; + /* bucket values in 4s for shorter reporting */ + + /* History list structs, containing info on pkts egressed to worker */ + uint16_t hist_head __rte_cache_aligned; + uint16_t hist_tail; + uint16_t inflights; + struct sw_hist_list_entry hist_list[SW_PORT_HIST_LIST]; + + /* track packets in and out of this port */ + struct sw_point_stats stats; + + + uint32_t pp_buf_start; + uint32_t pp_buf_count; + uint16_t cq_buf_count; + struct rte_event pp_buf[SCHED_DEQUEUE_BURST_SIZE]; + struct rte_event cq_buf[MAX_SW_CONS_Q_DEPTH]; + + uint8_t num_qids_mapped; +}; + +struct sw_evdev { + struct rte_eventdev_data *data; + + uint32_t port_count; + uint32_t qid_count; + uint32_t xstats_count; + struct sw_xstats_entry *xstats; + uint32_t xstats_count_mode_dev; + uint32_t xstats_count_mode_port; + uint32_t xstats_count_mode_queue; + + /* Contains all ports - load balanced and directed */ + struct sw_port ports[SW_PORTS_MAX] __rte_cache_aligned; + + rte_atomic32_t inflights __rte_cache_aligned; + + /* + * max events in this instance. Cached here for performance. + * (also available in data->conf.nb_events_limit) + */ + uint32_t nb_events_limit; + + /* Internal queues - one per logical queue */ + struct sw_qid qids[RTE_EVENT_MAX_QUEUES_PER_DEV] __rte_cache_aligned; + struct sw_queue_chunk *chunk_list_head; + struct sw_queue_chunk *chunks; + + /* Cache how many packets are in each cq */ + uint16_t cq_ring_space[SW_PORTS_MAX] __rte_cache_aligned; + + /* Array of pointers to load-balanced QIDs sorted by priority level */ + struct sw_qid *qids_prioritized[RTE_EVENT_MAX_QUEUES_PER_DEV]; + + /* Stats */ + struct sw_point_stats stats __rte_cache_aligned; + uint64_t sched_called; + int32_t sched_quanta; + uint64_t sched_no_iq_enqueues; + uint64_t sched_no_cq_enqueues; + uint64_t sched_cq_qid_called; + + uint8_t started; + uint32_t credit_update_quanta; + + /* store num stats and offset of the stats for each port */ + uint16_t xstats_count_per_port[SW_PORTS_MAX]; + uint16_t xstats_offset_for_port[SW_PORTS_MAX]; + /* store num stats and offset of the stats for each queue */ + uint16_t xstats_count_per_qid[RTE_EVENT_MAX_QUEUES_PER_DEV]; + uint16_t xstats_offset_for_qid[RTE_EVENT_MAX_QUEUES_PER_DEV]; + + uint32_t service_id; + char service_name[SW_PMD_NAME_MAX]; +}; + +static inline struct sw_evdev * +sw_pmd_priv(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +static inline const struct sw_evdev * +sw_pmd_priv_const(const struct rte_eventdev *eventdev) +{ + return eventdev->data->dev_private; +} + +uint16_t sw_event_enqueue(void *port, const struct rte_event *ev); +uint16_t sw_event_enqueue_burst(void *port, const struct rte_event ev[], + uint16_t num); + +uint16_t sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait); +uint16_t sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, + uint64_t wait); +void sw_event_schedule(struct rte_eventdev *dev); +int sw_xstats_init(struct sw_evdev *dev); +int sw_xstats_uninit(struct sw_evdev *dev); +int sw_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size); +int sw_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n); +uint64_t sw_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id); +int sw_xstats_reset(struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + int16_t queue_port_id, + const uint32_t ids[], + uint32_t nb_ids); + +int test_sw_eventdev(void); + +#endif /* _SW_EVDEV_H_ */ diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev_log.h b/src/spdk/dpdk/drivers/event/sw/sw_evdev_log.h new file mode 100644 index 000000000..f76825abc --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev_log.h @@ -0,0 +1,23 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2018 Intel Corporation + */ + +#ifndef _SW_EVDEV_LOG_H_ +#define _SW_EVDEV_LOG_H_ + +extern int eventdev_sw_log_level; + +#define SW_LOG_IMPL(level, fmt, args...) \ + rte_log(RTE_LOG_ ## level, eventdev_sw_log_level, "%s" fmt "\n", \ + __func__, ##args) + +#define SW_LOG_INFO(fmt, args...) \ + SW_LOG_IMPL(INFO, fmt, ## args) + +#define SW_LOG_DBG(fmt, args...) \ + SW_LOG_IMPL(DEBUG, fmt, ## args) + +#define SW_LOG_ERR(fmt, args...) \ + SW_LOG_IMPL(ERR, fmt, ## args) + +#endif /* _SW_EVDEV_LOG_H_ */ diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev_scheduler.c b/src/spdk/dpdk/drivers/event/sw/sw_evdev_scheduler.c new file mode 100644 index 000000000..cff747da8 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev_scheduler.c @@ -0,0 +1,568 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#include <rte_ring.h> +#include <rte_hash_crc.h> +#include <rte_event_ring.h> +#include "sw_evdev.h" +#include "iq_chunk.h" + +#define SW_IQS_MASK (SW_IQS_MAX-1) + +/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the + * CLZ twice is faster than caching the value due to data dependencies + */ +#define PKT_MASK_TO_IQ(pkts) \ + (__builtin_ctz(pkts | (1 << SW_IQS_MAX))) + +#if SW_IQS_MAX != 4 +#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change +#endif +#define PRIO_TO_IQ(prio) (prio >> 6) + +#define MAX_PER_IQ_DEQUEUE 48 +#define FLOWID_MASK (SW_QID_NUM_FIDS-1) +/* use cheap bit mixing, we only need to lose a few bits */ +#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK) + +static inline uint32_t +sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count) +{ + struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */ + struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE]; + uint32_t nb_blocked = 0; + uint32_t i; + + if (count > MAX_PER_IQ_DEQUEUE) + count = MAX_PER_IQ_DEQUEUE; + + /* This is the QID ID. The QID ID is static, hence it can be + * used to identify the stage of processing in history lists etc + */ + uint32_t qid_id = qid->id; + + iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count); + for (i = 0; i < count; i++) { + const struct rte_event *qe = &qes[i]; + const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id); + struct sw_fid_t *fid = &qid->fids[flow_id]; + int cq = fid->cq; + + if (cq < 0) { + uint32_t cq_idx; + if (qid->cq_next_tx >= qid->cq_num_mapped_cqs) + qid->cq_next_tx = 0; + cq_idx = qid->cq_next_tx++; + + cq = qid->cq_map[cq_idx]; + + /* find least used */ + int cq_free_cnt = sw->cq_ring_space[cq]; + for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs; + cq_idx++) { + int test_cq = qid->cq_map[cq_idx]; + int test_cq_free = sw->cq_ring_space[test_cq]; + if (test_cq_free > cq_free_cnt) { + cq = test_cq; + cq_free_cnt = test_cq_free; + } + } + + fid->cq = cq; /* this pins early */ + } + + if (sw->cq_ring_space[cq] == 0 || + sw->ports[cq].inflights == SW_PORT_HIST_LIST) { + blocked_qes[nb_blocked++] = *qe; + continue; + } + + struct sw_port *p = &sw->ports[cq]; + + /* at this point we can queue up the packet on the cq_buf */ + fid->pcount++; + p->cq_buf[p->cq_buf_count++] = *qe; + p->inflights++; + sw->cq_ring_space[cq]--; + + int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1)); + p->hist_list[head].fid = flow_id; + p->hist_list[head].qid = qid_id; + + p->stats.tx_pkts++; + qid->stats.tx_pkts++; + qid->to_port[cq]++; + + /* if we just filled in the last slot, flush the buffer */ + if (sw->cq_ring_space[cq] == 0) { + struct rte_event_ring *worker = p->cq_worker_ring; + rte_event_ring_enqueue_burst(worker, p->cq_buf, + p->cq_buf_count, + &sw->cq_ring_space[cq]); + p->cq_buf_count = 0; + } + } + iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked); + + return count - nb_blocked; +} + +static inline uint32_t +sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count, int keep_order) +{ + uint32_t i; + uint32_t cq_idx = qid->cq_next_tx; + + /* This is the QID ID. The QID ID is static, hence it can be + * used to identify the stage of processing in history lists etc + */ + uint32_t qid_id = qid->id; + + if (count > MAX_PER_IQ_DEQUEUE) + count = MAX_PER_IQ_DEQUEUE; + + if (keep_order) + /* only schedule as many as we have reorder buffer entries */ + count = RTE_MIN(count, + rte_ring_count(qid->reorder_buffer_freelist)); + + for (i = 0; i < count; i++) { + const struct rte_event *qe = iq_peek(&qid->iq[iq_num]); + uint32_t cq_check_count = 0; + uint32_t cq; + + /* + * for parallel, just send to next available CQ in round-robin + * fashion. So scan for an available CQ. If all CQs are full + * just return and move on to next QID + */ + do { + if (++cq_check_count > qid->cq_num_mapped_cqs) + goto exit; + if (cq_idx >= qid->cq_num_mapped_cqs) + cq_idx = 0; + cq = qid->cq_map[cq_idx++]; + + } while (rte_event_ring_free_count( + sw->ports[cq].cq_worker_ring) == 0 || + sw->ports[cq].inflights == SW_PORT_HIST_LIST); + + struct sw_port *p = &sw->ports[cq]; + if (sw->cq_ring_space[cq] == 0 || + p->inflights == SW_PORT_HIST_LIST) + break; + + sw->cq_ring_space[cq]--; + + qid->stats.tx_pkts++; + + const int head = (p->hist_head & (SW_PORT_HIST_LIST-1)); + p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id); + p->hist_list[head].qid = qid_id; + + if (keep_order) + rte_ring_sc_dequeue(qid->reorder_buffer_freelist, + (void *)&p->hist_list[head].rob_entry); + + sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe; + iq_pop(sw, &qid->iq[iq_num]); + + rte_compiler_barrier(); + p->inflights++; + p->stats.tx_pkts++; + p->hist_head++; + } +exit: + qid->cq_next_tx = cq_idx; + return i; +} + +static uint32_t +sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid, + uint32_t iq_num, unsigned int count __rte_unused) +{ + uint32_t cq_id = qid->cq_map[0]; + struct sw_port *port = &sw->ports[cq_id]; + + /* get max burst enq size for cq_ring */ + uint32_t count_free = sw->cq_ring_space[cq_id]; + if (count_free == 0) + return 0; + + /* burst dequeue from the QID IQ ring */ + struct sw_iq *iq = &qid->iq[iq_num]; + uint32_t ret = iq_dequeue_burst(sw, iq, + &port->cq_buf[port->cq_buf_count], count_free); + port->cq_buf_count += ret; + + /* Update QID, Port and Total TX stats */ + qid->stats.tx_pkts += ret; + port->stats.tx_pkts += ret; + + /* Subtract credits from cached value */ + sw->cq_ring_space[cq_id] -= ret; + + return ret; +} + +static uint32_t +sw_schedule_qid_to_cq(struct sw_evdev *sw) +{ + uint32_t pkts = 0; + uint32_t qid_idx; + + sw->sched_cq_qid_called++; + + for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) { + struct sw_qid *qid = sw->qids_prioritized[qid_idx]; + + int type = qid->type; + int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask); + + /* zero mapped CQs indicates directed */ + if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0) + continue; + + uint32_t pkts_done = 0; + uint32_t count = iq_count(&qid->iq[iq_num]); + + if (count > 0) { + if (type == SW_SCHED_TYPE_DIRECT) + pkts_done += sw_schedule_dir_to_cq(sw, qid, + iq_num, count); + else if (type == RTE_SCHED_TYPE_ATOMIC) + pkts_done += sw_schedule_atomic_to_cq(sw, qid, + iq_num, count); + else + pkts_done += sw_schedule_parallel_to_cq(sw, qid, + iq_num, count, + type == RTE_SCHED_TYPE_ORDERED); + } + + /* Check if the IQ that was polled is now empty, and unset it + * in the IQ mask if its empty. + */ + int all_done = (pkts_done == count); + + qid->iq_pkt_mask &= ~(all_done << (iq_num)); + pkts += pkts_done; + } + + return pkts; +} + +/* This function will perform re-ordering of packets, and injecting into + * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT* + * contiguous in that array, this function accepts a "range" of QIDs to scan. + */ +static uint16_t +sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end) +{ + /* Perform egress reordering */ + struct rte_event *qe; + uint32_t pkts_iter = 0; + + for (; qid_start < qid_end; qid_start++) { + struct sw_qid *qid = &sw->qids[qid_start]; + int i, num_entries_in_use; + + if (qid->type != RTE_SCHED_TYPE_ORDERED) + continue; + + num_entries_in_use = rte_ring_free_count( + qid->reorder_buffer_freelist); + + for (i = 0; i < num_entries_in_use; i++) { + struct reorder_buffer_entry *entry; + int j; + + entry = &qid->reorder_buffer[qid->reorder_buffer_index]; + + if (!entry->ready) + break; + + for (j = 0; j < entry->num_fragments; j++) { + uint16_t dest_qid; + uint16_t dest_iq; + + int idx = entry->fragment_index + j; + qe = &entry->fragments[idx]; + + dest_qid = qe->queue_id; + dest_iq = PRIO_TO_IQ(qe->priority); + + if (dest_qid >= sw->qid_count) { + sw->stats.rx_dropped++; + continue; + } + + pkts_iter++; + + struct sw_qid *q = &sw->qids[dest_qid]; + struct sw_iq *iq = &q->iq[dest_iq]; + + /* we checked for space above, so enqueue must + * succeed + */ + iq_enqueue(sw, iq, qe); + q->iq_pkt_mask |= (1 << (dest_iq)); + q->iq_pkt_count[dest_iq]++; + q->stats.rx_pkts++; + } + + entry->ready = (j != entry->num_fragments); + entry->num_fragments -= j; + entry->fragment_index += j; + + if (!entry->ready) { + entry->fragment_index = 0; + + rte_ring_sp_enqueue( + qid->reorder_buffer_freelist, + entry); + + qid->reorder_buffer_index++; + qid->reorder_buffer_index %= qid->window_size; + } + } + } + return pkts_iter; +} + +static __rte_always_inline void +sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port) +{ + RTE_SET_USED(sw); + struct rte_event_ring *worker = port->rx_worker_ring; + port->pp_buf_start = 0; + port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf, + RTE_DIM(port->pp_buf), NULL); +} + +static __rte_always_inline uint32_t +__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder) +{ + static struct reorder_buffer_entry dummy_rob; + uint32_t pkts_iter = 0; + struct sw_port *port = &sw->ports[port_id]; + + /* If shadow ring has 0 pkts, pull from worker ring */ + if (port->pp_buf_count == 0) + sw_refill_pp_buf(sw, port); + + while (port->pp_buf_count) { + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; + struct sw_hist_list_entry *hist_entry = NULL; + uint8_t flags = qe->op; + const uint16_t eop = !(flags & QE_FLAG_NOT_EOP); + int needs_reorder = 0; + /* if no-reordering, having PARTIAL == NEW */ + if (!allow_reorder && !eop) + flags = QE_FLAG_VALID; + + /* + * if we don't have space for this packet in an IQ, + * then move on to next queue. Technically, for a + * packet that needs reordering, we don't need to check + * here, but it simplifies things not to special-case + */ + uint32_t iq_num = PRIO_TO_IQ(qe->priority); + struct sw_qid *qid = &sw->qids[qe->queue_id]; + + /* now process based on flags. Note that for directed + * queues, the enqueue_flush masks off all but the + * valid flag. This makes FWD and PARTIAL enqueues just + * NEW type, and makes DROPS no-op calls. + */ + if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) { + const uint32_t hist_tail = port->hist_tail & + (SW_PORT_HIST_LIST - 1); + + hist_entry = &port->hist_list[hist_tail]; + const uint32_t hist_qid = hist_entry->qid; + const uint32_t hist_fid = hist_entry->fid; + + struct sw_fid_t *fid = + &sw->qids[hist_qid].fids[hist_fid]; + fid->pcount -= eop; + if (fid->pcount == 0) + fid->cq = -1; + + if (allow_reorder) { + /* set reorder ready if an ordered QID */ + uintptr_t rob_ptr = + (uintptr_t)hist_entry->rob_entry; + const uintptr_t valid = (rob_ptr != 0); + needs_reorder = valid; + rob_ptr |= + ((valid - 1) & (uintptr_t)&dummy_rob); + struct reorder_buffer_entry *tmp_rob_ptr = + (struct reorder_buffer_entry *)rob_ptr; + tmp_rob_ptr->ready = eop * needs_reorder; + } + + port->inflights -= eop; + port->hist_tail += eop; + } + if (flags & QE_FLAG_VALID) { + port->stats.rx_pkts++; + + if (allow_reorder && needs_reorder) { + struct reorder_buffer_entry *rob_entry = + hist_entry->rob_entry; + + hist_entry->rob_entry = NULL; + /* Although fragmentation not currently + * supported by eventdev API, we support it + * here. Open: How do we alert the user that + * they've exceeded max frags? + */ + int num_frag = rob_entry->num_fragments; + if (num_frag == SW_FRAGMENTS_MAX) + sw->stats.rx_dropped++; + else { + int idx = rob_entry->num_fragments++; + rob_entry->fragments[idx] = *qe; + } + goto end_qe; + } + + /* Use the iq_num from above to push the QE + * into the qid at the right priority + */ + + qid->iq_pkt_mask |= (1 << (iq_num)); + iq_enqueue(sw, &qid->iq[iq_num], qe); + qid->iq_pkt_count[iq_num]++; + qid->stats.rx_pkts++; + pkts_iter++; + } + +end_qe: + port->pp_buf_start++; + port->pp_buf_count--; + } /* while (avail_qes) */ + + return pkts_iter; +} + +static uint32_t +sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id) +{ + return __pull_port_lb(sw, port_id, 1); +} + +static uint32_t +sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id) +{ + return __pull_port_lb(sw, port_id, 0); +} + +static uint32_t +sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id) +{ + uint32_t pkts_iter = 0; + struct sw_port *port = &sw->ports[port_id]; + + /* If shadow ring has 0 pkts, pull from worker ring */ + if (port->pp_buf_count == 0) + sw_refill_pp_buf(sw, port); + + while (port->pp_buf_count) { + const struct rte_event *qe = &port->pp_buf[port->pp_buf_start]; + uint8_t flags = qe->op; + + if ((flags & QE_FLAG_VALID) == 0) + goto end_qe; + + uint32_t iq_num = PRIO_TO_IQ(qe->priority); + struct sw_qid *qid = &sw->qids[qe->queue_id]; + struct sw_iq *iq = &qid->iq[iq_num]; + + port->stats.rx_pkts++; + + /* Use the iq_num from above to push the QE + * into the qid at the right priority + */ + qid->iq_pkt_mask |= (1 << (iq_num)); + iq_enqueue(sw, iq, qe); + qid->iq_pkt_count[iq_num]++; + qid->stats.rx_pkts++; + pkts_iter++; + +end_qe: + port->pp_buf_start++; + port->pp_buf_count--; + } /* while port->pp_buf_count */ + + return pkts_iter; +} + +void +sw_event_schedule(struct rte_eventdev *dev) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + uint32_t in_pkts, out_pkts; + uint32_t out_pkts_total = 0, in_pkts_total = 0; + int32_t sched_quanta = sw->sched_quanta; + uint32_t i; + + sw->sched_called++; + if (unlikely(!sw->started)) + return; + + do { + uint32_t in_pkts_this_iteration = 0; + + /* Pull from rx_ring for ports */ + do { + in_pkts = 0; + for (i = 0; i < sw->port_count; i++) { + /* ack the unlinks in progress as done */ + if (sw->ports[i].unlinks_in_progress) + sw->ports[i].unlinks_in_progress = 0; + + if (sw->ports[i].is_directed) + in_pkts += sw_schedule_pull_port_dir(sw, i); + else if (sw->ports[i].num_ordered_qids > 0) + in_pkts += sw_schedule_pull_port_lb(sw, i); + else + in_pkts += sw_schedule_pull_port_no_reorder(sw, i); + } + + /* QID scan for re-ordered */ + in_pkts += sw_schedule_reorder(sw, 0, + sw->qid_count); + in_pkts_this_iteration += in_pkts; + } while (in_pkts > 4 && + (int)in_pkts_this_iteration < sched_quanta); + + out_pkts = sw_schedule_qid_to_cq(sw); + out_pkts_total += out_pkts; + in_pkts_total += in_pkts_this_iteration; + + if (in_pkts == 0 && out_pkts == 0) + break; + } while ((int)out_pkts_total < sched_quanta); + + sw->stats.tx_pkts += out_pkts_total; + sw->stats.rx_pkts += in_pkts_total; + + sw->sched_no_iq_enqueues += (in_pkts_total == 0); + sw->sched_no_cq_enqueues += (out_pkts_total == 0); + + /* push all the internal buffered QEs in port->cq_ring to the + * worker cores: aka, do the ring transfers batched. + */ + for (i = 0; i < sw->port_count; i++) { + struct rte_event_ring *worker = sw->ports[i].cq_worker_ring; + rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf, + sw->ports[i].cq_buf_count, + &sw->cq_ring_space[i]); + sw->ports[i].cq_buf_count = 0; + } + +} diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev_selftest.c b/src/spdk/dpdk/drivers/event/sw/sw_evdev_selftest.c new file mode 100644 index 000000000..38c21fa0f --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev_selftest.c @@ -0,0 +1,3401 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#include <stdio.h> +#include <string.h> +#include <stdint.h> +#include <errno.h> +#include <unistd.h> +#include <sys/queue.h> + +#include <rte_memory.h> +#include <rte_launch.h> +#include <rte_eal.h> +#include <rte_per_lcore.h> +#include <rte_lcore.h> +#include <rte_debug.h> +#include <rte_ethdev.h> +#include <rte_cycles.h> +#include <rte_eventdev.h> +#include <rte_pause.h> +#include <rte_service.h> +#include <rte_service_component.h> +#include <rte_bus_vdev.h> + +#include "sw_evdev.h" + +#define MAX_PORTS 16 +#define MAX_QIDS 16 +#define NUM_PACKETS (1<<18) +#define DEQUEUE_DEPTH 128 + +static int evdev; + +struct test { + struct rte_mempool *mbuf_pool; + uint8_t port[MAX_PORTS]; + uint8_t qid[MAX_QIDS]; + int nb_qids; + uint32_t service_id; +}; + +static struct rte_event release_ev; + +static inline struct rte_mbuf * +rte_gen_arp(int portid, struct rte_mempool *mp) +{ + /* + * len = 14 + 46 + * ARP, Request who-has 10.0.0.1 tell 10.0.0.2, length 46 + */ + static const uint8_t arp_request[] = { + /*0x0000:*/ 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xec, 0xa8, + 0x6b, 0xfd, 0x02, 0x29, 0x08, 0x06, 0x00, 0x01, + /*0x0010:*/ 0x08, 0x00, 0x06, 0x04, 0x00, 0x01, 0xec, 0xa8, + 0x6b, 0xfd, 0x02, 0x29, 0x0a, 0x00, 0x00, 0x01, + /*0x0020:*/ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, + 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + /*0x0030:*/ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00 + }; + struct rte_mbuf *m; + int pkt_len = sizeof(arp_request) - 1; + + m = rte_pktmbuf_alloc(mp); + if (!m) + return 0; + + memcpy((void *)((uintptr_t)m->buf_addr + m->data_off), + arp_request, pkt_len); + rte_pktmbuf_pkt_len(m) = pkt_len; + rte_pktmbuf_data_len(m) = pkt_len; + + RTE_SET_USED(portid); + + return m; +} + +static void +xstats_print(void) +{ + const uint32_t XSTATS_MAX = 1024; + uint32_t i; + uint32_t ids[XSTATS_MAX]; + uint64_t values[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + for (i = 0; i < XSTATS_MAX; i++) + ids[i] = i; + + /* Device names / values */ + int ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, 0, + xstats_names, ids, XSTATS_MAX); + if (ret < 0) { + printf("%d: xstats names get() returned error\n", + __LINE__); + return; + } + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } + + /* Port names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 0, + xstats_names, ids, XSTATS_MAX); + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 1, + ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } + + /* Queue names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, 0, + xstats_names, ids, XSTATS_MAX); + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + 1, ids, values, ret); + if (ret > (signed int)XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + for (i = 0; (signed int)i < ret; i++) { + printf("%d : %s : %"PRIu64"\n", + i, xstats_names[i].name, values[i]); + } +} + +/* initialization and config */ +static inline int +init(struct test *t, int nb_queues, int nb_ports) +{ + struct rte_event_dev_config config = { + .nb_event_queues = nb_queues, + .nb_event_ports = nb_ports, + .nb_event_queue_flows = 1024, + .nb_events_limit = 4096, + .nb_event_port_dequeue_depth = DEQUEUE_DEPTH, + .nb_event_port_enqueue_depth = 128, + }; + int ret; + + void *temp = t->mbuf_pool; /* save and restore mbuf pool */ + + memset(t, 0, sizeof(*t)); + t->mbuf_pool = temp; + + ret = rte_event_dev_configure(evdev, &config); + if (ret < 0) + printf("%d: Error configuring device\n", __LINE__); + return ret; +}; + +static inline int +create_ports(struct test *t, int num_ports) +{ + int i; + static const struct rte_event_port_conf conf = { + .new_event_threshold = 1024, + .dequeue_depth = 32, + .enqueue_depth = 64, + .disable_implicit_release = 0, + }; + if (num_ports > MAX_PORTS) + return -1; + + for (i = 0; i < num_ports; i++) { + if (rte_event_port_setup(evdev, i, &conf) < 0) { + printf("Error setting up port %d\n", i); + return -1; + } + t->port[i] = i; + } + + return 0; +} + +static inline int +create_lb_qids(struct test *t, int num_qids, uint32_t flags) +{ + int i; + + /* Q creation */ + const struct rte_event_queue_conf conf = { + .schedule_type = flags, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + for (i = t->nb_qids; i < t->nb_qids + num_qids; i++) { + if (rte_event_queue_setup(evdev, i, &conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + t->qid[i] = i; + } + t->nb_qids += num_qids; + if (t->nb_qids > MAX_QIDS) + return -1; + + return 0; +} + +static inline int +create_atomic_qids(struct test *t, int num_qids) +{ + return create_lb_qids(t, num_qids, RTE_SCHED_TYPE_ATOMIC); +} + +static inline int +create_ordered_qids(struct test *t, int num_qids) +{ + return create_lb_qids(t, num_qids, RTE_SCHED_TYPE_ORDERED); +} + + +static inline int +create_unordered_qids(struct test *t, int num_qids) +{ + return create_lb_qids(t, num_qids, RTE_SCHED_TYPE_PARALLEL); +} + +static inline int +create_directed_qids(struct test *t, int num_qids, const uint8_t ports[]) +{ + int i; + + /* Q creation */ + static const struct rte_event_queue_conf conf = { + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK, + }; + + for (i = t->nb_qids; i < t->nb_qids + num_qids; i++) { + if (rte_event_queue_setup(evdev, i, &conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + t->qid[i] = i; + + if (rte_event_port_link(evdev, ports[i - t->nb_qids], + &t->qid[i], NULL, 1) != 1) { + printf("%d: error creating link for qid %d\n", + __LINE__, i); + return -1; + } + } + t->nb_qids += num_qids; + if (t->nb_qids > MAX_QIDS) + return -1; + + return 0; +} + +/* destruction */ +static inline int +cleanup(struct test *t __rte_unused) +{ + rte_event_dev_stop(evdev); + rte_event_dev_close(evdev); + return 0; +}; + +struct test_event_dev_stats { + uint64_t rx_pkts; /**< Total packets received */ + uint64_t rx_dropped; /**< Total packets dropped (Eg Invalid QID) */ + uint64_t tx_pkts; /**< Total packets transmitted */ + + /** Packets received on this port */ + uint64_t port_rx_pkts[MAX_PORTS]; + /** Packets dropped on this port */ + uint64_t port_rx_dropped[MAX_PORTS]; + /** Packets inflight on this port */ + uint64_t port_inflight[MAX_PORTS]; + /** Packets transmitted on this port */ + uint64_t port_tx_pkts[MAX_PORTS]; + /** Packets received on this qid */ + uint64_t qid_rx_pkts[MAX_QIDS]; + /** Packets dropped on this qid */ + uint64_t qid_rx_dropped[MAX_QIDS]; + /** Packets transmitted on this qid */ + uint64_t qid_tx_pkts[MAX_QIDS]; +}; + +static inline int +test_event_dev_stats_get(int dev_id, struct test_event_dev_stats *stats) +{ + static uint32_t i; + static uint32_t total_ids[3]; /* rx, tx and drop */ + static uint32_t port_rx_pkts_ids[MAX_PORTS]; + static uint32_t port_rx_dropped_ids[MAX_PORTS]; + static uint32_t port_inflight_ids[MAX_PORTS]; + static uint32_t port_tx_pkts_ids[MAX_PORTS]; + static uint32_t qid_rx_pkts_ids[MAX_QIDS]; + static uint32_t qid_rx_dropped_ids[MAX_QIDS]; + static uint32_t qid_tx_pkts_ids[MAX_QIDS]; + + + stats->rx_pkts = rte_event_dev_xstats_by_name_get(dev_id, + "dev_rx", &total_ids[0]); + stats->rx_dropped = rte_event_dev_xstats_by_name_get(dev_id, + "dev_drop", &total_ids[1]); + stats->tx_pkts = rte_event_dev_xstats_by_name_get(dev_id, + "dev_tx", &total_ids[2]); + for (i = 0; i < MAX_PORTS; i++) { + char name[32]; + snprintf(name, sizeof(name), "port_%u_rx", i); + stats->port_rx_pkts[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &port_rx_pkts_ids[i]); + snprintf(name, sizeof(name), "port_%u_drop", i); + stats->port_rx_dropped[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &port_rx_dropped_ids[i]); + snprintf(name, sizeof(name), "port_%u_inflight", i); + stats->port_inflight[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &port_inflight_ids[i]); + snprintf(name, sizeof(name), "port_%u_tx", i); + stats->port_tx_pkts[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &port_tx_pkts_ids[i]); + } + for (i = 0; i < MAX_QIDS; i++) { + char name[32]; + snprintf(name, sizeof(name), "qid_%u_rx", i); + stats->qid_rx_pkts[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &qid_rx_pkts_ids[i]); + snprintf(name, sizeof(name), "qid_%u_drop", i); + stats->qid_rx_dropped[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &qid_rx_dropped_ids[i]); + snprintf(name, sizeof(name), "qid_%u_tx", i); + stats->qid_tx_pkts[i] = rte_event_dev_xstats_by_name_get( + dev_id, name, &qid_tx_pkts_ids[i]); + } + + return 0; +} + +/* run_prio_packet_test + * This performs a basic packet priority check on the test instance passed in. + * It is factored out of the main priority tests as the same tests must be + * performed to ensure prioritization of each type of QID. + * + * Requirements: + * - An initialized test structure, including mempool + * - t->port[0] is initialized for both Enq / Deq of packets to the QID + * - t->qid[0] is the QID to be tested + * - if LB QID, the CQ must be mapped to the QID. + */ +static int +run_prio_packet_test(struct test *t) +{ + int err; + const uint32_t MAGIC_SEQN[] = {4711, 1234}; + const uint32_t PRIORITY[] = { + RTE_EVENT_DEV_PRIORITY_NORMAL, + RTE_EVENT_DEV_PRIORITY_HIGHEST + }; + unsigned int i; + for (i = 0; i < RTE_DIM(MAGIC_SEQN); i++) { + /* generate pkt and enqueue */ + struct rte_event ev; + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + arp->seqn = MAGIC_SEQN[i]; + + ev = (struct rte_event){ + .priority = PRIORITY[i], + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .mbuf = arp + }; + err = rte_event_enqueue_burst(evdev, t->port[0], &ev, 1); + if (err != 1) { + printf("%d: error failed to enqueue\n", __LINE__); + return -1; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + struct test_event_dev_stats stats; + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: error failed to get stats\n", __LINE__); + return -1; + } + + if (stats.port_rx_pkts[t->port[0]] != 2) { + printf("%d: error stats incorrect for directed port\n", + __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + + struct rte_event ev, ev2; + uint32_t deq_pkts; + deq_pkts = rte_event_dequeue_burst(evdev, t->port[0], &ev, 1, 0); + if (deq_pkts != 1) { + printf("%d: error failed to deq\n", __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + if (ev.mbuf->seqn != MAGIC_SEQN[1]) { + printf("%d: first packet out not highest priority\n", + __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + rte_pktmbuf_free(ev.mbuf); + + deq_pkts = rte_event_dequeue_burst(evdev, t->port[0], &ev2, 1, 0); + if (deq_pkts != 1) { + printf("%d: error failed to deq\n", __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + if (ev2.mbuf->seqn != MAGIC_SEQN[0]) { + printf("%d: second packet out not lower priority\n", + __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + rte_pktmbuf_free(ev2.mbuf); + + cleanup(t); + return 0; +} + +static int +test_single_directed_packet(struct test *t) +{ + const int rx_enq = 0; + const int wrk_enq = 2; + int err; + + /* Create instance with 3 directed QIDs going to 3 ports */ + if (init(t, 3, 3) < 0 || + create_ports(t, 3) < 0 || + create_directed_qids(t, 3, t->port) < 0) + return -1; + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /************** FORWARD ****************/ + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = wrk_enq, + .mbuf = arp, + }; + + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + + const uint32_t MAGIC_SEQN = 4711; + arp->seqn = MAGIC_SEQN; + + /* generate pkt and enqueue */ + err = rte_event_enqueue_burst(evdev, rx_enq, &ev, 1); + if (err != 1) { + printf("%d: error failed to enqueue\n", __LINE__); + return -1; + } + + /* Run schedule() as dir packets may need to be re-ordered */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + struct test_event_dev_stats stats; + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: error failed to get stats\n", __LINE__); + return -1; + } + + if (stats.port_rx_pkts[rx_enq] != 1) { + printf("%d: error stats incorrect for directed port\n", + __LINE__); + return -1; + } + + uint32_t deq_pkts; + deq_pkts = rte_event_dequeue_burst(evdev, wrk_enq, &ev, 1, 0); + if (deq_pkts != 1) { + printf("%d: error failed to deq\n", __LINE__); + return -1; + } + + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_rx_pkts[wrk_enq] != 0 && + stats.port_rx_pkts[wrk_enq] != 1) { + printf("%d: error directed stats post-dequeue\n", __LINE__); + return -1; + } + + if (ev.mbuf->seqn != MAGIC_SEQN) { + printf("%d: error magic sequence number not dequeued\n", + __LINE__); + return -1; + } + + rte_pktmbuf_free(ev.mbuf); + cleanup(t); + return 0; +} + +static int +test_directed_forward_credits(struct test *t) +{ + uint32_t i; + int32_t err; + + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_directed_qids(t, 1, t->port) < 0) + return -1; + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = 0, + }; + + for (i = 0; i < 1000; i++) { + err = rte_event_enqueue_burst(evdev, 0, &ev, 1); + if (err != 1) { + printf("%d: error failed to enqueue\n", __LINE__); + return -1; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + uint32_t deq_pkts; + deq_pkts = rte_event_dequeue_burst(evdev, 0, &ev, 1, 0); + if (deq_pkts != 1) { + printf("%d: error failed to deq\n", __LINE__); + return -1; + } + + /* re-write event to be a forward, and continue looping it */ + ev.op = RTE_EVENT_OP_FORWARD; + } + + cleanup(t); + return 0; +} + + +static int +test_priority_directed(struct test *t) +{ + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_directed_qids(t, 1, t->port) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + return run_prio_packet_test(t); +} + +static int +test_priority_atomic(struct test *t) +{ + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* map the QID */ + if (rte_event_port_link(evdev, t->port[0], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping qid to port\n", __LINE__); + return -1; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + return run_prio_packet_test(t); +} + +static int +test_priority_ordered(struct test *t) +{ + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_ordered_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* map the QID */ + if (rte_event_port_link(evdev, t->port[0], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping qid to port\n", __LINE__); + return -1; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + return run_prio_packet_test(t); +} + +static int +test_priority_unordered(struct test *t) +{ + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_unordered_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* map the QID */ + if (rte_event_port_link(evdev, t->port[0], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping qid to port\n", __LINE__); + return -1; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + return run_prio_packet_test(t); +} + +static int +burst_packets(struct test *t) +{ + /************** CONFIG ****************/ + uint32_t i; + int err; + int ret; + + /* Create instance with 2 ports and 2 queues */ + if (init(t, 2, 2) < 0 || + create_ports(t, 2) < 0 || + create_atomic_qids(t, 2) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + ret = rte_event_port_link(evdev, t->port[0], &t->qid[0], NULL, 1); + if (ret != 1) { + printf("%d: error mapping lb qid0\n", __LINE__); + return -1; + } + ret = rte_event_port_link(evdev, t->port[1], &t->qid[1], NULL, 1); + if (ret != 1) { + printf("%d: error mapping lb qid1\n", __LINE__); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /************** FORWARD ****************/ + const uint32_t rx_port = 0; + const uint32_t NUM_PKTS = 2; + + for (i = 0; i < NUM_PKTS; i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: error generating pkt\n", __LINE__); + return -1; + } + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = i % 2, + .flow_id = i % 3, + .mbuf = arp, + }; + /* generate pkt and enqueue */ + err = rte_event_enqueue_burst(evdev, t->port[rx_port], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* Check stats for all NUM_PKTS arrived to sched core */ + struct test_event_dev_stats stats; + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + if (stats.rx_pkts != NUM_PKTS || stats.tx_pkts != NUM_PKTS) { + printf("%d: Sched core didn't receive all %d pkts\n", + __LINE__, NUM_PKTS); + rte_event_dev_dump(evdev, stdout); + return -1; + } + + uint32_t deq_pkts; + int p; + + deq_pkts = 0; + /******** DEQ QID 1 *******/ + do { + struct rte_event ev; + p = rte_event_dequeue_burst(evdev, t->port[0], &ev, 1, 0); + deq_pkts += p; + rte_pktmbuf_free(ev.mbuf); + } while (p); + + if (deq_pkts != NUM_PKTS/2) { + printf("%d: Half of NUM_PKTS didn't arrive at port 1\n", + __LINE__); + return -1; + } + + /******** DEQ QID 2 *******/ + deq_pkts = 0; + do { + struct rte_event ev; + p = rte_event_dequeue_burst(evdev, t->port[1], &ev, 1, 0); + deq_pkts += p; + rte_pktmbuf_free(ev.mbuf); + } while (p); + if (deq_pkts != NUM_PKTS/2) { + printf("%d: Half of NUM_PKTS didn't arrive at port 2\n", + __LINE__); + return -1; + } + + cleanup(t); + return 0; +} + +static int +abuse_inflights(struct test *t) +{ + const int rx_enq = 0; + const int wrk_enq = 2; + int err; + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[wrk_enq], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* Enqueue op only */ + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &release_ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + + /* schedule */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + struct test_event_dev_stats stats; + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + + if (stats.rx_pkts != 0 || + stats.tx_pkts != 0 || + stats.port_inflight[wrk_enq] != 0) { + printf("%d: Sched core didn't handle pkt as expected\n", + __LINE__); + return -1; + } + + cleanup(t); + return 0; +} + +static int +xstats_tests(struct test *t) +{ + const int wrk_enq = 2; + int err; + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[wrk_enq], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + const uint32_t XSTATS_MAX = 1024; + + uint32_t i; + uint32_t ids[XSTATS_MAX]; + uint64_t values[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + for (i = 0; i < XSTATS_MAX; i++) + ids[i] = i; + + /* Device names / values */ + int ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, xstats_names, ids, XSTATS_MAX); + if (ret != 6) { + printf("%d: expected 6 stats, got return %d\n", __LINE__, ret); + return -1; + } + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, ret); + if (ret != 6) { + printf("%d: expected 6 stats, got return %d\n", __LINE__, ret); + return -1; + } + + /* Port names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 0, + xstats_names, ids, XSTATS_MAX); + if (ret != 21) { + printf("%d: expected 21 stats, got return %d\n", __LINE__, ret); + return -1; + } + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 0, + ids, values, ret); + if (ret != 21) { + printf("%d: expected 21 stats, got return %d\n", __LINE__, ret); + return -1; + } + + /* Queue names / values */ + ret = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + 0, xstats_names, ids, XSTATS_MAX); + if (ret != 16) { + printf("%d: expected 16 stats, got return %d\n", __LINE__, ret); + return -1; + } + + /* NEGATIVE TEST: with wrong queue passed, 0 stats should be returned */ + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + 1, ids, values, ret); + if (ret != -EINVAL) { + printf("%d: expected 0 stats, got return %d\n", __LINE__, ret); + return -1; + } + + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + 0, ids, values, ret); + if (ret != 16) { + printf("%d: expected 16 stats, got return %d\n", __LINE__, ret); + return -1; + } + + /* enqueue packets to check values */ + for (i = 0; i < 3; i++) { + struct rte_event ev; + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + ev.queue_id = t->qid[i]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + ev.flow_id = 7; + arp->seqn = i; + + int err = rte_event_enqueue_burst(evdev, t->port[0], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* Device names / values */ + int num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, 0, + xstats_names, ids, XSTATS_MAX); + if (num_stats < 0) + goto fail; + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, num_stats); + static const uint64_t expected[] = {3, 3, 0, 1, 0, 0}; + for (i = 0; (signed int)i < ret; i++) { + if (expected[i] != values[i]) { + printf( + "%d Error xstat %d (id %d) %s : %"PRIu64 + ", expect %"PRIu64"\n", + __LINE__, i, ids[i], xstats_names[i].name, + values[i], expected[i]); + goto fail; + } + } + + ret = rte_event_dev_xstats_reset(evdev, RTE_EVENT_DEV_XSTATS_DEVICE, + 0, NULL, 0); + + /* ensure reset statistics are zero-ed */ + static const uint64_t expected_zero[] = {0, 0, 0, 0, 0, 0}; + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, num_stats); + for (i = 0; (signed int)i < ret; i++) { + if (expected_zero[i] != values[i]) { + printf( + "%d Error, xstat %d (id %d) %s : %"PRIu64 + ", expect %"PRIu64"\n", + __LINE__, i, ids[i], xstats_names[i].name, + values[i], expected_zero[i]); + goto fail; + } + } + + /* port reset checks */ + num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, 0, + xstats_names, ids, XSTATS_MAX); + if (num_stats < 0) + goto fail; + ret = rte_event_dev_xstats_get(evdev, RTE_EVENT_DEV_XSTATS_PORT, + 0, ids, values, num_stats); + + static const uint64_t port_expected[] = { + 3 /* rx */, + 0 /* tx */, + 0 /* drop */, + 0 /* inflights */, + 0 /* avg pkt cycles */, + 29 /* credits */, + 0 /* rx ring used */, + 4096 /* rx ring free */, + 0 /* cq ring used */, + 32 /* cq ring free */, + 0 /* dequeue calls */, + /* 10 dequeue burst buckets */ + 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, + }; + if (ret != RTE_DIM(port_expected)) { + printf( + "%s %d: wrong number of port stats (%d), expected %zu\n", + __func__, __LINE__, ret, RTE_DIM(port_expected)); + } + + for (i = 0; (signed int)i < ret; i++) { + if (port_expected[i] != values[i]) { + printf( + "%s : %d: Error stat %s is %"PRIu64 + ", expected %"PRIu64"\n", + __func__, __LINE__, xstats_names[i].name, + values[i], port_expected[i]); + goto fail; + } + } + + ret = rte_event_dev_xstats_reset(evdev, RTE_EVENT_DEV_XSTATS_PORT, + 0, NULL, 0); + + /* ensure reset statistics are zero-ed */ + static const uint64_t port_expected_zero[] = { + 0 /* rx */, + 0 /* tx */, + 0 /* drop */, + 0 /* inflights */, + 0 /* avg pkt cycles */, + 29 /* credits */, + 0 /* rx ring used */, + 4096 /* rx ring free */, + 0 /* cq ring used */, + 32 /* cq ring free */, + 0 /* dequeue calls */, + /* 10 dequeue burst buckets */ + 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, + }; + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, + 0, ids, values, num_stats); + for (i = 0; (signed int)i < ret; i++) { + if (port_expected_zero[i] != values[i]) { + printf( + "%d, Error, xstat %d (id %d) %s : %"PRIu64 + ", expect %"PRIu64"\n", + __LINE__, i, ids[i], xstats_names[i].name, + values[i], port_expected_zero[i]); + goto fail; + } + } + + /* QUEUE STATS TESTS */ + num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, 0, + xstats_names, ids, XSTATS_MAX); + ret = rte_event_dev_xstats_get(evdev, RTE_EVENT_DEV_XSTATS_QUEUE, + 0, ids, values, num_stats); + if (ret < 0) { + printf("xstats get returned %d\n", ret); + goto fail; + } + if ((unsigned int)ret > XSTATS_MAX) + printf("%s %d: more xstats available than space\n", + __func__, __LINE__); + + static const uint64_t queue_expected[] = { + 3 /* rx */, + 3 /* tx */, + 0 /* drop */, + 3 /* inflights */, + 0, 0, 0, 0, /* iq 0, 1, 2, 3 used */ + /* QID-to-Port: pinned_flows, packets */ + 0, 0, + 0, 0, + 1, 3, + 0, 0, + }; + for (i = 0; (signed int)i < ret; i++) { + if (queue_expected[i] != values[i]) { + printf( + "%d, Error, xstat %d (id %d) %s : %"PRIu64 + ", expect %"PRIu64"\n", + __LINE__, i, ids[i], xstats_names[i].name, + values[i], queue_expected[i]); + goto fail; + } + } + + /* Reset the queue stats here */ + ret = rte_event_dev_xstats_reset(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, 0, + NULL, + 0); + + /* Verify that the resetable stats are reset, and others are not */ + static const uint64_t queue_expected_zero[] = { + 0 /* rx */, + 0 /* tx */, + 0 /* drop */, + 3 /* inflight */, + 0, 0, 0, 0, /* 4 iq used */ + /* QID-to-Port: pinned_flows, packets */ + 0, 0, + 0, 0, + 1, 0, + 0, 0, + }; + + ret = rte_event_dev_xstats_get(evdev, RTE_EVENT_DEV_XSTATS_QUEUE, 0, + ids, values, num_stats); + int fails = 0; + for (i = 0; (signed int)i < ret; i++) { + if (queue_expected_zero[i] != values[i]) { + printf( + "%d, Error, xstat %d (id %d) %s : %"PRIu64 + ", expect %"PRIu64"\n", + __LINE__, i, ids[i], xstats_names[i].name, + values[i], queue_expected_zero[i]); + fails++; + } + } + if (fails) { + printf("%d : %d of values were not as expected above\n", + __LINE__, fails); + goto fail; + } + + cleanup(t); + return 0; + +fail: + rte_event_dev_dump(0, stdout); + cleanup(t); + return -1; +} + + +static int +xstats_id_abuse_tests(struct test *t) +{ + int err; + const uint32_t XSTATS_MAX = 1024; + const uint32_t link_port = 2; + + uint32_t ids[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + goto fail; + } + + err = rte_event_port_link(evdev, t->port[link_port], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + goto fail; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto fail; + } + + /* no test for device, as it ignores the port/q number */ + int num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, + UINT8_MAX-1, xstats_names, ids, + XSTATS_MAX); + if (num_stats != 0) { + printf("%d: expected %d stats, got return %d\n", __LINE__, + 0, num_stats); + goto fail; + } + + num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + UINT8_MAX-1, xstats_names, ids, + XSTATS_MAX); + if (num_stats != 0) { + printf("%d: expected %d stats, got return %d\n", __LINE__, + 0, num_stats); + goto fail; + } + + cleanup(t); + return 0; +fail: + cleanup(t); + return -1; +} + +static int +port_reconfig_credits(struct test *t) +{ + if (init(t, 1, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + uint32_t i; + const uint32_t NUM_ITERS = 32; + for (i = 0; i < NUM_ITERS; i++) { + const struct rte_event_queue_conf conf = { + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + if (rte_event_queue_setup(evdev, 0, &conf) < 0) { + printf("%d: error creating qid\n", __LINE__); + return -1; + } + t->qid[0] = 0; + + static const struct rte_event_port_conf port_conf = { + .new_event_threshold = 128, + .dequeue_depth = 32, + .enqueue_depth = 64, + .disable_implicit_release = 0, + }; + if (rte_event_port_setup(evdev, 0, &port_conf) < 0) { + printf("%d Error setting up port\n", __LINE__); + return -1; + } + + int links = rte_event_port_link(evdev, 0, NULL, NULL, 0); + if (links != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + goto fail; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto fail; + } + + const uint32_t NPKTS = 1; + uint32_t j; + for (j = 0; j < NPKTS; j++) { + struct rte_event ev; + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + goto fail; + } + ev.queue_id = t->qid[0]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + int err = rte_event_enqueue_burst(evdev, 0, &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + rte_event_dev_dump(0, stdout); + goto fail; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + struct rte_event ev[NPKTS]; + int deq = rte_event_dequeue_burst(evdev, t->port[0], ev, + NPKTS, 0); + if (deq != 1) + printf("%d error; no packet dequeued\n", __LINE__); + + /* let cleanup below stop the device on last iter */ + if (i != NUM_ITERS-1) + rte_event_dev_stop(evdev); + } + + cleanup(t); + return 0; +fail: + cleanup(t); + return -1; +} + +static int +port_single_lb_reconfig(struct test *t) +{ + if (init(t, 2, 2) < 0) { + printf("%d: Error initializing device\n", __LINE__); + goto fail; + } + + static const struct rte_event_queue_conf conf_lb_atomic = { + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + if (rte_event_queue_setup(evdev, 0, &conf_lb_atomic) < 0) { + printf("%d: error creating qid\n", __LINE__); + goto fail; + } + + static const struct rte_event_queue_conf conf_single_link = { + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK, + }; + if (rte_event_queue_setup(evdev, 1, &conf_single_link) < 0) { + printf("%d: error creating qid\n", __LINE__); + goto fail; + } + + struct rte_event_port_conf port_conf = { + .new_event_threshold = 128, + .dequeue_depth = 32, + .enqueue_depth = 64, + .disable_implicit_release = 0, + }; + if (rte_event_port_setup(evdev, 0, &port_conf) < 0) { + printf("%d Error setting up port\n", __LINE__); + goto fail; + } + if (rte_event_port_setup(evdev, 1, &port_conf) < 0) { + printf("%d Error setting up port\n", __LINE__); + goto fail; + } + + /* link port to lb queue */ + uint8_t queue_id = 0; + if (rte_event_port_link(evdev, 0, &queue_id, NULL, 1) != 1) { + printf("%d: error creating link for qid\n", __LINE__); + goto fail; + } + + int ret = rte_event_port_unlink(evdev, 0, &queue_id, 1); + if (ret != 1) { + printf("%d: Error unlinking lb port\n", __LINE__); + goto fail; + } + + queue_id = 1; + if (rte_event_port_link(evdev, 0, &queue_id, NULL, 1) != 1) { + printf("%d: error creating link for qid\n", __LINE__); + goto fail; + } + + queue_id = 0; + int err = rte_event_port_link(evdev, 1, &queue_id, NULL, 1); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + goto fail; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto fail; + } + + cleanup(t); + return 0; +fail: + cleanup(t); + return -1; +} + +static int +xstats_brute_force(struct test *t) +{ + uint32_t i; + const uint32_t XSTATS_MAX = 1024; + uint32_t ids[XSTATS_MAX]; + uint64_t values[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + int err = rte_event_port_link(evdev, t->port[0], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + goto fail; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto fail; + } + + for (i = 0; i < XSTATS_MAX; i++) + ids[i] = i; + + for (i = 0; i < 3; i++) { + uint32_t mode = RTE_EVENT_DEV_XSTATS_DEVICE + i; + uint32_t j; + for (j = 0; j < UINT8_MAX; j++) { + rte_event_dev_xstats_names_get(evdev, mode, + j, xstats_names, ids, XSTATS_MAX); + + rte_event_dev_xstats_get(evdev, mode, j, ids, + values, XSTATS_MAX); + } + } + + cleanup(t); + return 0; +fail: + cleanup(t); + return -1; +} + +static int +xstats_id_reset_tests(struct test *t) +{ + const int wrk_enq = 2; + int err; + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[wrk_enq], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + goto fail; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto fail; + } + +#define XSTATS_MAX 1024 + int ret; + uint32_t i; + uint32_t ids[XSTATS_MAX]; + uint64_t values[XSTATS_MAX]; + struct rte_event_dev_xstats_name xstats_names[XSTATS_MAX]; + + for (i = 0; i < XSTATS_MAX; i++) + ids[i] = i; + +#define NUM_DEV_STATS 6 + /* Device names / values */ + int num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, xstats_names, ids, XSTATS_MAX); + if (num_stats != NUM_DEV_STATS) { + printf("%d: expected %d stats, got return %d\n", __LINE__, + NUM_DEV_STATS, num_stats); + goto fail; + } + ret = rte_event_dev_xstats_get(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, + 0, ids, values, num_stats); + if (ret != NUM_DEV_STATS) { + printf("%d: expected %d stats, got return %d\n", __LINE__, + NUM_DEV_STATS, ret); + goto fail; + } + +#define NPKTS 7 + for (i = 0; i < NPKTS; i++) { + struct rte_event ev; + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + goto fail; + } + ev.queue_id = t->qid[i]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + arp->seqn = i; + + int err = rte_event_enqueue_burst(evdev, t->port[0], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + goto fail; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + static const char * const dev_names[] = { + "dev_rx", "dev_tx", "dev_drop", "dev_sched_calls", + "dev_sched_no_iq_enq", "dev_sched_no_cq_enq", + }; + uint64_t dev_expected[] = {NPKTS, NPKTS, 0, 1, 0, 0}; + for (i = 0; (int)i < ret; i++) { + unsigned int id; + uint64_t val = rte_event_dev_xstats_by_name_get(evdev, + dev_names[i], + &id); + if (id != i) { + printf("%d: %s id incorrect, expected %d got %d\n", + __LINE__, dev_names[i], i, id); + goto fail; + } + if (val != dev_expected[i]) { + printf("%d: %s value incorrect, expected %" + PRIu64" got %d\n", __LINE__, dev_names[i], + dev_expected[i], id); + goto fail; + } + /* reset to zero */ + int reset_ret = rte_event_dev_xstats_reset(evdev, + RTE_EVENT_DEV_XSTATS_DEVICE, 0, + &id, + 1); + if (reset_ret) { + printf("%d: failed to reset successfully\n", __LINE__); + goto fail; + } + dev_expected[i] = 0; + /* check value again */ + val = rte_event_dev_xstats_by_name_get(evdev, dev_names[i], 0); + if (val != dev_expected[i]) { + printf("%d: %s value incorrect, expected %"PRIu64 + " got %"PRIu64"\n", __LINE__, dev_names[i], + dev_expected[i], val); + goto fail; + } + }; + +/* 48 is stat offset from start of the devices whole xstats. + * This WILL break every time we add a statistic to a port + * or the device, but there is no other way to test + */ +#define PORT_OFF 48 +/* num stats for the tested port. CQ size adds more stats to a port */ +#define NUM_PORT_STATS 21 +/* the port to test. */ +#define PORT 2 + num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_PORT, PORT, + xstats_names, ids, XSTATS_MAX); + if (num_stats != NUM_PORT_STATS) { + printf("%d: expected %d stats, got return %d\n", + __LINE__, NUM_PORT_STATS, num_stats); + goto fail; + } + ret = rte_event_dev_xstats_get(evdev, RTE_EVENT_DEV_XSTATS_PORT, PORT, + ids, values, num_stats); + + if (ret != NUM_PORT_STATS) { + printf("%d: expected %d stats, got return %d\n", + __LINE__, NUM_PORT_STATS, ret); + goto fail; + } + static const char * const port_names[] = { + "port_2_rx", + "port_2_tx", + "port_2_drop", + "port_2_inflight", + "port_2_avg_pkt_cycles", + "port_2_credits", + "port_2_rx_ring_used", + "port_2_rx_ring_free", + "port_2_cq_ring_used", + "port_2_cq_ring_free", + "port_2_dequeue_calls", + "port_2_dequeues_returning_0", + "port_2_dequeues_returning_1-4", + "port_2_dequeues_returning_5-8", + "port_2_dequeues_returning_9-12", + "port_2_dequeues_returning_13-16", + "port_2_dequeues_returning_17-20", + "port_2_dequeues_returning_21-24", + "port_2_dequeues_returning_25-28", + "port_2_dequeues_returning_29-32", + "port_2_dequeues_returning_33-36", + }; + uint64_t port_expected[] = { + 0, /* rx */ + NPKTS, /* tx */ + 0, /* drop */ + NPKTS, /* inflight */ + 0, /* avg pkt cycles */ + 0, /* credits */ + 0, /* rx ring used */ + 4096, /* rx ring free */ + NPKTS, /* cq ring used */ + 25, /* cq ring free */ + 0, /* dequeue zero calls */ + 0, 0, 0, 0, 0, /* 10 dequeue buckets */ + 0, 0, 0, 0, 0, + }; + uint64_t port_expected_zero[] = { + 0, /* rx */ + 0, /* tx */ + 0, /* drop */ + NPKTS, /* inflight */ + 0, /* avg pkt cycles */ + 0, /* credits */ + 0, /* rx ring used */ + 4096, /* rx ring free */ + NPKTS, /* cq ring used */ + 25, /* cq ring free */ + 0, /* dequeue zero calls */ + 0, 0, 0, 0, 0, /* 10 dequeue buckets */ + 0, 0, 0, 0, 0, + }; + if (RTE_DIM(port_expected) != NUM_PORT_STATS || + RTE_DIM(port_names) != NUM_PORT_STATS) { + printf("%d: port array of wrong size\n", __LINE__); + goto fail; + } + + int failed = 0; + for (i = 0; (int)i < ret; i++) { + unsigned int id; + uint64_t val = rte_event_dev_xstats_by_name_get(evdev, + port_names[i], + &id); + if (id != i + PORT_OFF) { + printf("%d: %s id incorrect, expected %d got %d\n", + __LINE__, port_names[i], i+PORT_OFF, + id); + failed = 1; + } + if (val != port_expected[i]) { + printf("%d: %s value incorrect, expected %"PRIu64 + " got %d\n", __LINE__, port_names[i], + port_expected[i], id); + failed = 1; + } + /* reset to zero */ + int reset_ret = rte_event_dev_xstats_reset(evdev, + RTE_EVENT_DEV_XSTATS_PORT, PORT, + &id, + 1); + if (reset_ret) { + printf("%d: failed to reset successfully\n", __LINE__); + failed = 1; + } + /* check value again */ + val = rte_event_dev_xstats_by_name_get(evdev, port_names[i], 0); + if (val != port_expected_zero[i]) { + printf("%d: %s value incorrect, expected %"PRIu64 + " got %"PRIu64"\n", __LINE__, port_names[i], + port_expected_zero[i], val); + failed = 1; + } + }; + if (failed) + goto fail; + +/* num queue stats */ +#define NUM_Q_STATS 16 +/* queue offset from start of the devices whole xstats. + * This will break every time we add a statistic to a device/port/queue + */ +#define QUEUE_OFF 90 + const uint32_t queue = 0; + num_stats = rte_event_dev_xstats_names_get(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, queue, + xstats_names, ids, XSTATS_MAX); + if (num_stats != NUM_Q_STATS) { + printf("%d: expected %d stats, got return %d\n", + __LINE__, NUM_Q_STATS, num_stats); + goto fail; + } + ret = rte_event_dev_xstats_get(evdev, RTE_EVENT_DEV_XSTATS_QUEUE, + queue, ids, values, num_stats); + if (ret != NUM_Q_STATS) { + printf("%d: expected 21 stats, got return %d\n", __LINE__, ret); + goto fail; + } + static const char * const queue_names[] = { + "qid_0_rx", + "qid_0_tx", + "qid_0_drop", + "qid_0_inflight", + "qid_0_iq_0_used", + "qid_0_iq_1_used", + "qid_0_iq_2_used", + "qid_0_iq_3_used", + "qid_0_port_0_pinned_flows", + "qid_0_port_0_packets", + "qid_0_port_1_pinned_flows", + "qid_0_port_1_packets", + "qid_0_port_2_pinned_flows", + "qid_0_port_2_packets", + "qid_0_port_3_pinned_flows", + "qid_0_port_3_packets", + }; + uint64_t queue_expected[] = { + 7, /* rx */ + 7, /* tx */ + 0, /* drop */ + 7, /* inflight */ + 0, /* iq 0 used */ + 0, /* iq 1 used */ + 0, /* iq 2 used */ + 0, /* iq 3 used */ + /* QID-to-Port: pinned_flows, packets */ + 0, 0, + 0, 0, + 1, 7, + 0, 0, + }; + uint64_t queue_expected_zero[] = { + 0, /* rx */ + 0, /* tx */ + 0, /* drop */ + 7, /* inflight */ + 0, /* iq 0 used */ + 0, /* iq 1 used */ + 0, /* iq 2 used */ + 0, /* iq 3 used */ + /* QID-to-Port: pinned_flows, packets */ + 0, 0, + 0, 0, + 1, 0, + 0, 0, + }; + if (RTE_DIM(queue_expected) != NUM_Q_STATS || + RTE_DIM(queue_expected_zero) != NUM_Q_STATS || + RTE_DIM(queue_names) != NUM_Q_STATS) { + printf("%d : queue array of wrong size\n", __LINE__); + goto fail; + } + + failed = 0; + for (i = 0; (int)i < ret; i++) { + unsigned int id; + uint64_t val = rte_event_dev_xstats_by_name_get(evdev, + queue_names[i], + &id); + if (id != i + QUEUE_OFF) { + printf("%d: %s id incorrect, expected %d got %d\n", + __LINE__, queue_names[i], i+QUEUE_OFF, + id); + failed = 1; + } + if (val != queue_expected[i]) { + printf("%d: %d: %s value , expected %"PRIu64 + " got %"PRIu64"\n", i, __LINE__, + queue_names[i], queue_expected[i], val); + failed = 1; + } + /* reset to zero */ + int reset_ret = rte_event_dev_xstats_reset(evdev, + RTE_EVENT_DEV_XSTATS_QUEUE, + queue, &id, 1); + if (reset_ret) { + printf("%d: failed to reset successfully\n", __LINE__); + failed = 1; + } + /* check value again */ + val = rte_event_dev_xstats_by_name_get(evdev, queue_names[i], + 0); + if (val != queue_expected_zero[i]) { + printf("%d: %s value incorrect, expected %"PRIu64 + " got %"PRIu64"\n", __LINE__, queue_names[i], + queue_expected_zero[i], val); + failed = 1; + } + }; + + if (failed) + goto fail; + + cleanup(t); + return 0; +fail: + cleanup(t); + return -1; +} + +static int +ordered_reconfigure(struct test *t) +{ + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + const struct rte_event_queue_conf conf = { + .schedule_type = RTE_SCHED_TYPE_ORDERED, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + if (rte_event_queue_setup(evdev, 0, &conf) < 0) { + printf("%d: error creating qid\n", __LINE__); + goto failed; + } + + if (rte_event_queue_setup(evdev, 0, &conf) < 0) { + printf("%d: error creating qid, for 2nd time\n", __LINE__); + goto failed; + } + + rte_event_port_link(evdev, t->port[0], NULL, NULL, 0); + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + cleanup(t); + return 0; +failed: + cleanup(t); + return -1; +} + +static int +qid_priorities(struct test *t) +{ + /* Test works by having a CQ with enough empty space for all packets, + * and enqueueing 3 packets to 3 QIDs. They must return based on the + * priority of the QID, not the ingress order, to pass the test + */ + unsigned int i; + /* Create instance with 1 ports, and 3 qids */ + if (init(t, 3, 1) < 0 || + create_ports(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + for (i = 0; i < 3; i++) { + /* Create QID */ + const struct rte_event_queue_conf conf = { + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + /* increase priority (0 == highest), as we go */ + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL - i, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + if (rte_event_queue_setup(evdev, i, &conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + t->qid[i] = i; + } + t->nb_qids = i; + /* map all QIDs to port */ + rte_event_port_link(evdev, t->port[0], NULL, NULL, 0); + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* enqueue 3 packets, setting seqn and QID to check priority */ + for (i = 0; i < 3; i++) { + struct rte_event ev; + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + ev.queue_id = t->qid[i]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + arp->seqn = i; + + int err = rte_event_enqueue_burst(evdev, t->port[0], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* dequeue packets, verify priority was upheld */ + struct rte_event ev[32]; + uint32_t deq_pkts = + rte_event_dequeue_burst(evdev, t->port[0], ev, 32, 0); + if (deq_pkts != 3) { + printf("%d: failed to deq packets\n", __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + for (i = 0; i < 3; i++) { + if (ev[i].mbuf->seqn != 2-i) { + printf( + "%d: qid priority test: seqn %d incorrectly prioritized\n", + __LINE__, i); + } + } + + cleanup(t); + return 0; +} + +static int +unlink_in_progress(struct test *t) +{ + /* Test unlinking API, in particular that when an unlink request has + * not yet been seen by the scheduler thread, that the + * unlink_in_progress() function returns the number of unlinks. + */ + unsigned int i; + /* Create instance with 1 ports, and 3 qids */ + if (init(t, 3, 1) < 0 || + create_ports(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + for (i = 0; i < 3; i++) { + /* Create QID */ + const struct rte_event_queue_conf conf = { + .schedule_type = RTE_SCHED_TYPE_ATOMIC, + /* increase priority (0 == highest), as we go */ + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL - i, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + if (rte_event_queue_setup(evdev, i, &conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + t->qid[i] = i; + } + t->nb_qids = i; + /* map all QIDs to port */ + rte_event_port_link(evdev, t->port[0], NULL, NULL, 0); + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* unlink all ports to have outstanding unlink requests */ + int ret = rte_event_port_unlink(evdev, t->port[0], NULL, 0); + if (ret < 0) { + printf("%d: Failed to unlink queues\n", __LINE__); + return -1; + } + + /* get active unlinks here, expect 3 */ + int unlinks_in_progress = + rte_event_port_unlinks_in_progress(evdev, t->port[0]); + if (unlinks_in_progress != 3) { + printf("%d: Expected num unlinks in progress == 3, got %d\n", + __LINE__, unlinks_in_progress); + return -1; + } + + /* run scheduler service on this thread to ack the unlinks */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* active unlinks expected as 0 as scheduler thread has acked */ + unlinks_in_progress = + rte_event_port_unlinks_in_progress(evdev, t->port[0]); + if (unlinks_in_progress != 0) { + printf("%d: Expected num unlinks in progress == 0, got %d\n", + __LINE__, unlinks_in_progress); + } + + cleanup(t); + return 0; +} + +static int +load_balancing(struct test *t) +{ + const int rx_enq = 0; + int err; + uint32_t i; + + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + for (i = 0; i < 3; i++) { + /* map port 1 - 3 inclusive */ + if (rte_event_port_link(evdev, t->port[i+1], &t->qid[0], + NULL, 1) != 1) { + printf("%d: error mapping qid to port %d\n", + __LINE__, i); + return -1; + } + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /************** FORWARD ****************/ + /* + * Create a set of flows that test the load-balancing operation of the + * implementation. Fill CQ 0 and 1 with flows 0 and 1, and test + * with a new flow, which should be sent to the 3rd mapped CQ + */ + static uint32_t flows[] = {0, 1, 1, 0, 0, 2, 2, 0, 2}; + + for (i = 0; i < RTE_DIM(flows); i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .flow_id = flows[i], + .mbuf = arp, + }; + /* generate pkt and enqueue */ + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + struct test_event_dev_stats stats; + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + + if (stats.port_inflight[1] != 4) { + printf("%d:%s: port 1 inflight not correct\n", __LINE__, + __func__); + return -1; + } + if (stats.port_inflight[2] != 2) { + printf("%d:%s: port 2 inflight not correct\n", __LINE__, + __func__); + return -1; + } + if (stats.port_inflight[3] != 3) { + printf("%d:%s: port 3 inflight not correct\n", __LINE__, + __func__); + return -1; + } + + cleanup(t); + return 0; +} + +static int +load_balancing_history(struct test *t) +{ + struct test_event_dev_stats stats = {0}; + const int rx_enq = 0; + int err; + uint32_t i; + + /* Create instance with 1 atomic QID going to 3 ports + 1 prod port */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) + return -1; + + /* CQ mapping to QID */ + if (rte_event_port_link(evdev, t->port[1], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping port 1 qid\n", __LINE__); + return -1; + } + if (rte_event_port_link(evdev, t->port[2], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping port 2 qid\n", __LINE__); + return -1; + } + if (rte_event_port_link(evdev, t->port[3], &t->qid[0], NULL, 1) != 1) { + printf("%d: error mapping port 3 qid\n", __LINE__); + return -1; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* + * Create a set of flows that test the load-balancing operation of the + * implementation. Fill CQ 0, 1 and 2 with flows 0, 1 and 2, drop + * the packet from CQ 0, send in a new set of flows. Ensure that: + * 1. The new flow 3 gets into the empty CQ0 + * 2. packets for existing flow gets added into CQ1 + * 3. Next flow 0 pkt is now onto CQ2, since CQ0 and CQ1 now contain + * more outstanding pkts + * + * This test makes sure that when a flow ends (i.e. all packets + * have been completed for that flow), that the flow can be moved + * to a different CQ when new packets come in for that flow. + */ + static uint32_t flows1[] = {0, 1, 1, 2}; + + for (i = 0; i < RTE_DIM(flows1); i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + struct rte_event ev = { + .flow_id = flows1[i], + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .event_type = RTE_EVENT_TYPE_CPU, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .mbuf = arp + }; + + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + arp->hash.rss = flows1[i]; + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + /* call the scheduler */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* Dequeue the flow 0 packet from port 1, so that we can then drop */ + struct rte_event ev; + if (!rte_event_dequeue_burst(evdev, t->port[1], &ev, 1, 0)) { + printf("%d: failed to dequeue\n", __LINE__); + return -1; + } + if (ev.mbuf->hash.rss != flows1[0]) { + printf("%d: unexpected flow received\n", __LINE__); + return -1; + } + + /* drop the flow 0 packet from port 1 */ + rte_event_enqueue_burst(evdev, t->port[1], &release_ev, 1); + + /* call the scheduler */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* + * Set up the next set of flows, first a new flow to fill up + * CQ 0, so that the next flow 0 packet should go to CQ2 + */ + static uint32_t flows2[] = { 3, 3, 3, 1, 1, 0 }; + + for (i = 0; i < RTE_DIM(flows2); i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + struct rte_event ev = { + .flow_id = flows2[i], + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .event_type = RTE_EVENT_TYPE_CPU, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .mbuf = arp + }; + + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + arp->hash.rss = flows2[i]; + + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + /* schedule */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d:failed to get stats\n", __LINE__); + return -1; + } + + /* + * Now check the resulting inflights on each port. + */ + if (stats.port_inflight[1] != 3) { + printf("%d:%s: port 1 inflight not correct\n", __LINE__, + __func__); + printf("Inflights, ports 1, 2, 3: %u, %u, %u\n", + (unsigned int)stats.port_inflight[1], + (unsigned int)stats.port_inflight[2], + (unsigned int)stats.port_inflight[3]); + return -1; + } + if (stats.port_inflight[2] != 4) { + printf("%d:%s: port 2 inflight not correct\n", __LINE__, + __func__); + printf("Inflights, ports 1, 2, 3: %u, %u, %u\n", + (unsigned int)stats.port_inflight[1], + (unsigned int)stats.port_inflight[2], + (unsigned int)stats.port_inflight[3]); + return -1; + } + if (stats.port_inflight[3] != 2) { + printf("%d:%s: port 3 inflight not correct\n", __LINE__, + __func__); + printf("Inflights, ports 1, 2, 3: %u, %u, %u\n", + (unsigned int)stats.port_inflight[1], + (unsigned int)stats.port_inflight[2], + (unsigned int)stats.port_inflight[3]); + return -1; + } + + for (i = 1; i <= 3; i++) { + struct rte_event ev; + while (rte_event_dequeue_burst(evdev, i, &ev, 1, 0)) + rte_event_enqueue_burst(evdev, i, &release_ev, 1); + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + cleanup(t); + return 0; +} + +static int +invalid_qid(struct test *t) +{ + struct test_event_dev_stats stats; + const int rx_enq = 0; + int err; + uint32_t i; + + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + for (i = 0; i < 4; i++) { + err = rte_event_port_link(evdev, t->port[i], &t->qid[0], + NULL, 1); + if (err != 1) { + printf("%d: error mapping port 1 qid\n", __LINE__); + return -1; + } + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* + * Send in a packet with an invalid qid to the scheduler. + * We should see the packed enqueued OK, but the inflights for + * that packet should not be incremented, and the rx_dropped + * should be incremented. + */ + static uint32_t flows1[] = {20}; + + for (i = 0; i < RTE_DIM(flows1); i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0] + flows1[i], + .flow_id = i, + .mbuf = arp, + }; + /* generate pkt and enqueue */ + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + + /* call the scheduler */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + + /* + * Now check the resulting inflights on the port, and the rx_dropped. + */ + if (stats.port_inflight[0] != 0) { + printf("%d:%s: port 1 inflight count not correct\n", __LINE__, + __func__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + if (stats.port_rx_dropped[0] != 1) { + printf("%d:%s: port 1 drops\n", __LINE__, __func__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + /* each packet drop should only be counted in one place - port or dev */ + if (stats.rx_dropped != 0) { + printf("%d:%s: port 1 dropped count not correct\n", __LINE__, + __func__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + + cleanup(t); + return 0; +} + +static int +single_packet(struct test *t) +{ + const uint32_t MAGIC_SEQN = 7321; + struct rte_event ev; + struct test_event_dev_stats stats; + const int rx_enq = 0; + const int wrk_enq = 2; + int err; + + /* Create instance with 4 ports */ + if (init(t, 1, 4) < 0 || + create_ports(t, 4) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[wrk_enq], NULL, NULL, 0); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /************** Gen pkt and enqueue ****************/ + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + + ev.op = RTE_EVENT_OP_NEW; + ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL; + ev.mbuf = arp; + ev.queue_id = 0; + ev.flow_id = 3; + arp->seqn = MAGIC_SEQN; + + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + + if (stats.rx_pkts != 1 || + stats.tx_pkts != 1 || + stats.port_inflight[wrk_enq] != 1) { + printf("%d: Sched core didn't handle pkt as expected\n", + __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + + uint32_t deq_pkts; + + deq_pkts = rte_event_dequeue_burst(evdev, t->port[wrk_enq], &ev, 1, 0); + if (deq_pkts < 1) { + printf("%d: Failed to deq\n", __LINE__); + return -1; + } + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + return -1; + } + + err = test_event_dev_stats_get(evdev, &stats); + if (ev.mbuf->seqn != MAGIC_SEQN) { + printf("%d: magic sequence number not dequeued\n", __LINE__); + return -1; + } + + rte_pktmbuf_free(ev.mbuf); + err = rte_event_enqueue_burst(evdev, t->port[wrk_enq], &release_ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_inflight[wrk_enq] != 0) { + printf("%d: port inflight not correct\n", __LINE__); + return -1; + } + + cleanup(t); + return 0; +} + +static int +inflight_counts(struct test *t) +{ + struct rte_event ev; + struct test_event_dev_stats stats; + const int rx_enq = 0; + const int p1 = 1; + const int p2 = 2; + int err; + int i; + + /* Create instance with 4 ports */ + if (init(t, 2, 3) < 0 || + create_ports(t, 3) < 0 || + create_atomic_qids(t, 2) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[p1], &t->qid[0], NULL, 1); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + err = rte_event_port_link(evdev, t->port[p2], &t->qid[1], NULL, 1); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /************** FORWARD ****************/ +#define QID1_NUM 5 + for (i = 0; i < QID1_NUM; i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + goto err; + } + + ev.queue_id = t->qid[0]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + goto err; + } + } +#define QID2_NUM 3 + for (i = 0; i < QID2_NUM; i++) { + struct rte_mbuf *arp = rte_gen_arp(0, t->mbuf_pool); + + if (!arp) { + printf("%d: gen of pkt failed\n", __LINE__); + goto err; + } + ev.queue_id = t->qid[1]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = arp; + err = rte_event_enqueue_burst(evdev, t->port[rx_enq], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + goto err; + } + } + + /* schedule */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (err) { + printf("%d: failed to get stats\n", __LINE__); + goto err; + } + + if (stats.rx_pkts != QID1_NUM + QID2_NUM || + stats.tx_pkts != QID1_NUM + QID2_NUM) { + printf("%d: Sched core didn't handle pkt as expected\n", + __LINE__); + goto err; + } + + if (stats.port_inflight[p1] != QID1_NUM) { + printf("%d: %s port 1 inflight not correct\n", __LINE__, + __func__); + goto err; + } + if (stats.port_inflight[p2] != QID2_NUM) { + printf("%d: %s port 2 inflight not correct\n", __LINE__, + __func__); + goto err; + } + + /************** DEQUEUE INFLIGHT COUNT CHECKS ****************/ + /* port 1 */ + struct rte_event events[QID1_NUM + QID2_NUM]; + uint32_t deq_pkts = rte_event_dequeue_burst(evdev, t->port[p1], events, + RTE_DIM(events), 0); + + if (deq_pkts != QID1_NUM) { + printf("%d: Port 1: DEQUEUE inflight failed\n", __LINE__); + goto err; + } + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_inflight[p1] != QID1_NUM) { + printf("%d: port 1 inflight decrement after DEQ != 0\n", + __LINE__); + goto err; + } + for (i = 0; i < QID1_NUM; i++) { + err = rte_event_enqueue_burst(evdev, t->port[p1], &release_ev, + 1); + if (err != 1) { + printf("%d: %s rte enqueue of inf release failed\n", + __LINE__, __func__); + goto err; + } + } + + /* + * As the scheduler core decrements inflights, it needs to run to + * process packets to act on the drop messages + */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_inflight[p1] != 0) { + printf("%d: port 1 inflight NON NULL after DROP\n", __LINE__); + goto err; + } + + /* port2 */ + deq_pkts = rte_event_dequeue_burst(evdev, t->port[p2], events, + RTE_DIM(events), 0); + if (deq_pkts != QID2_NUM) { + printf("%d: Port 2: DEQUEUE inflight failed\n", __LINE__); + goto err; + } + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_inflight[p2] != QID2_NUM) { + printf("%d: port 1 inflight decrement after DEQ != 0\n", + __LINE__); + goto err; + } + for (i = 0; i < QID2_NUM; i++) { + err = rte_event_enqueue_burst(evdev, t->port[p2], &release_ev, + 1); + if (err != 1) { + printf("%d: %s rte enqueue of inf release failed\n", + __LINE__, __func__); + goto err; + } + } + + /* + * As the scheduler core decrements inflights, it needs to run to + * process packets to act on the drop messages + */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + err = test_event_dev_stats_get(evdev, &stats); + if (stats.port_inflight[p2] != 0) { + printf("%d: port 2 inflight NON NULL after DROP\n", __LINE__); + goto err; + } + cleanup(t); + return 0; + +err: + rte_event_dev_dump(evdev, stdout); + cleanup(t); + return -1; +} + +static int +parallel_basic(struct test *t, int check_order) +{ + const uint8_t rx_port = 0; + const uint8_t w1_port = 1; + const uint8_t w3_port = 3; + const uint8_t tx_port = 4; + int err; + int i; + uint32_t deq_pkts, j; + struct rte_mbuf *mbufs[3]; + struct rte_mbuf *mbufs_out[3] = { 0 }; + const uint32_t MAGIC_SEQN = 1234; + + /* Create instance with 4 ports */ + if (init(t, 2, tx_port + 1) < 0 || + create_ports(t, tx_port + 1) < 0 || + (check_order ? create_ordered_qids(t, 1) : + create_unordered_qids(t, 1)) < 0 || + create_directed_qids(t, 1, &tx_port)) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* + * CQ mapping to QID + * We need three ports, all mapped to the same ordered qid0. Then we'll + * take a packet out to each port, re-enqueue in reverse order, + * then make sure the reordering has taken place properly when we + * dequeue from the tx_port. + * + * Simplified test setup diagram: + * + * rx_port w1_port + * \ / \ + * qid0 - w2_port - qid1 + * \ / \ + * w3_port tx_port + */ + /* CQ mapping to QID for LB ports (directed mapped on create) */ + for (i = w1_port; i <= w3_port; i++) { + err = rte_event_port_link(evdev, t->port[i], &t->qid[0], NULL, + 1); + if (err != 1) { + printf("%d: error mapping lb qid\n", __LINE__); + cleanup(t); + return -1; + } + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + /* Enqueue 3 packets to the rx port */ + for (i = 0; i < 3; i++) { + struct rte_event ev; + mbufs[i] = rte_gen_arp(0, t->mbuf_pool); + if (!mbufs[i]) { + printf("%d: gen of pkt failed\n", __LINE__); + return -1; + } + + ev.queue_id = t->qid[0]; + ev.op = RTE_EVENT_OP_NEW; + ev.mbuf = mbufs[i]; + mbufs[i]->seqn = MAGIC_SEQN + i; + + /* generate pkt and enqueue */ + err = rte_event_enqueue_burst(evdev, t->port[rx_port], &ev, 1); + if (err != 1) { + printf("%d: Failed to enqueue pkt %u, retval = %u\n", + __LINE__, i, err); + return -1; + } + } + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* use extra slot to make logic in loops easier */ + struct rte_event deq_ev[w3_port + 1]; + + /* Dequeue the 3 packets, one from each worker port */ + for (i = w1_port; i <= w3_port; i++) { + deq_pkts = rte_event_dequeue_burst(evdev, t->port[i], + &deq_ev[i], 1, 0); + if (deq_pkts != 1) { + printf("%d: Failed to deq\n", __LINE__); + rte_event_dev_dump(evdev, stdout); + return -1; + } + } + + /* Enqueue each packet in reverse order, flushing after each one */ + for (i = w3_port; i >= w1_port; i--) { + + deq_ev[i].op = RTE_EVENT_OP_FORWARD; + deq_ev[i].queue_id = t->qid[1]; + err = rte_event_enqueue_burst(evdev, t->port[i], &deq_ev[i], 1); + if (err != 1) { + printf("%d: Failed to enqueue\n", __LINE__); + return -1; + } + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* dequeue from the tx ports, we should get 3 packets */ + deq_pkts = rte_event_dequeue_burst(evdev, t->port[tx_port], deq_ev, + 3, 0); + + /* Check to see if we've got all 3 packets */ + if (deq_pkts != 3) { + printf("%d: expected 3 pkts at tx port got %d from port %d\n", + __LINE__, deq_pkts, tx_port); + rte_event_dev_dump(evdev, stdout); + return 1; + } + + /* Check to see if the sequence numbers are in expected order */ + if (check_order) { + for (j = 0 ; j < deq_pkts ; j++) { + if (deq_ev[j].mbuf->seqn != MAGIC_SEQN + j) { + printf( + "%d: Incorrect sequence number(%d) from port %d\n", + __LINE__, mbufs_out[j]->seqn, tx_port); + return -1; + } + } + } + + /* Destroy the instance */ + cleanup(t); + return 0; +} + +static int +ordered_basic(struct test *t) +{ + return parallel_basic(t, 1); +} + +static int +unordered_basic(struct test *t) +{ + return parallel_basic(t, 0); +} + +static int +holb(struct test *t) /* test to check we avoid basic head-of-line blocking */ +{ + const struct rte_event new_ev = { + .op = RTE_EVENT_OP_NEW + /* all other fields zero */ + }; + struct rte_event ev = new_ev; + unsigned int rx_port = 0; /* port we get the first flow on */ + char rx_port_used_stat[64]; + char rx_port_free_stat[64]; + char other_port_used_stat[64]; + + if (init(t, 1, 2) < 0 || + create_ports(t, 2) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + int nb_links = rte_event_port_link(evdev, t->port[1], NULL, NULL, 0); + if (rte_event_port_link(evdev, t->port[0], NULL, NULL, 0) != 1 || + nb_links != 1) { + printf("%d: Error links queue to ports\n", __LINE__); + goto err; + } + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto err; + } + + /* send one packet and see where it goes, port 0 or 1 */ + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error doing first enqueue\n", __LINE__); + goto err; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + if (rte_event_dev_xstats_by_name_get(evdev, "port_0_cq_ring_used", NULL) + != 1) + rx_port = 1; + + snprintf(rx_port_used_stat, sizeof(rx_port_used_stat), + "port_%u_cq_ring_used", rx_port); + snprintf(rx_port_free_stat, sizeof(rx_port_free_stat), + "port_%u_cq_ring_free", rx_port); + snprintf(other_port_used_stat, sizeof(other_port_used_stat), + "port_%u_cq_ring_used", rx_port ^ 1); + if (rte_event_dev_xstats_by_name_get(evdev, rx_port_used_stat, NULL) + != 1) { + printf("%d: Error, first event not scheduled\n", __LINE__); + goto err; + } + + /* now fill up the rx port's queue with one flow to cause HOLB */ + do { + ev = new_ev; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + } while (rte_event_dev_xstats_by_name_get(evdev, + rx_port_free_stat, NULL) != 0); + + /* one more packet, which needs to stay in IQ - i.e. HOLB */ + ev = new_ev; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + /* check that the other port still has an empty CQ */ + if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) + != 0) { + printf("%d: Error, second port CQ is not empty\n", __LINE__); + goto err; + } + /* check IQ now has one packet */ + if (rte_event_dev_xstats_by_name_get(evdev, "qid_0_iq_0_used", NULL) + != 1) { + printf("%d: Error, QID does not have exactly 1 packet\n", + __LINE__); + goto err; + } + + /* send another flow, which should pass the other IQ entry */ + ev = new_ev; + ev.flow_id = 1; + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error with enqueue\n", __LINE__); + goto err; + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + if (rte_event_dev_xstats_by_name_get(evdev, other_port_used_stat, NULL) + != 1) { + printf("%d: Error, second flow did not pass out first\n", + __LINE__); + goto err; + } + + if (rte_event_dev_xstats_by_name_get(evdev, "qid_0_iq_0_used", NULL) + != 1) { + printf("%d: Error, QID does not have exactly 1 packet\n", + __LINE__); + goto err; + } + cleanup(t); + return 0; +err: + rte_event_dev_dump(evdev, stdout); + cleanup(t); + return -1; +} + +static void +flush(uint8_t dev_id __rte_unused, struct rte_event event, void *arg) +{ + *((uint8_t *) arg) += (event.u64 == 0xCA11BACC) ? 1 : 0; +} + +static int +dev_stop_flush(struct test *t) /* test to check we can properly flush events */ +{ + const struct rte_event new_ev = { + .op = RTE_EVENT_OP_NEW, + .u64 = 0xCA11BACC, + .queue_id = 0 + }; + struct rte_event ev = new_ev; + uint8_t count = 0; + int i; + + if (init(t, 1, 1) < 0 || + create_ports(t, 1) < 0 || + create_atomic_qids(t, 1) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* Link the queue so *_start() doesn't error out */ + if (rte_event_port_link(evdev, t->port[0], NULL, NULL, 0) != 1) { + printf("%d: Error linking queue to port\n", __LINE__); + goto err; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + goto err; + } + + for (i = 0; i < DEQUEUE_DEPTH + 1; i++) { + if (rte_event_enqueue_burst(evdev, t->port[0], &ev, 1) != 1) { + printf("%d: Error enqueuing events\n", __LINE__); + goto err; + } + } + + /* Schedule the events from the port to the IQ. At least one event + * should be remaining in the queue. + */ + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + if (rte_event_dev_stop_flush_callback_register(evdev, flush, &count)) { + printf("%d: Error installing the flush callback\n", __LINE__); + goto err; + } + + cleanup(t); + + if (count == 0) { + printf("%d: Error executing the flush callback\n", __LINE__); + goto err; + } + + if (rte_event_dev_stop_flush_callback_register(evdev, NULL, NULL)) { + printf("%d: Error uninstalling the flush callback\n", __LINE__); + goto err; + } + + return 0; +err: + rte_event_dev_dump(evdev, stdout); + cleanup(t); + return -1; +} + +static int +worker_loopback_worker_fn(void *arg) +{ + struct test *t = arg; + uint8_t port = t->port[1]; + int count = 0; + int enqd; + + /* + * Takes packets from the input port and then loops them back through + * the Eventdev. Each packet gets looped through QIDs 0-8, 16 times + * so each packet goes through 8*16 = 128 times. + */ + printf("%d: \tWorker function started\n", __LINE__); + while (count < NUM_PACKETS) { +#define BURST_SIZE 32 + struct rte_event ev[BURST_SIZE]; + uint16_t i, nb_rx = rte_event_dequeue_burst(evdev, port, ev, + BURST_SIZE, 0); + if (nb_rx == 0) { + rte_pause(); + continue; + } + + for (i = 0; i < nb_rx; i++) { + ev[i].queue_id++; + if (ev[i].queue_id != 8) { + ev[i].op = RTE_EVENT_OP_FORWARD; + enqd = rte_event_enqueue_burst(evdev, port, + &ev[i], 1); + if (enqd != 1) { + printf("%d: Can't enqueue FWD!!\n", + __LINE__); + return -1; + } + continue; + } + + ev[i].queue_id = 0; + ev[i].mbuf->udata64++; + if (ev[i].mbuf->udata64 != 16) { + ev[i].op = RTE_EVENT_OP_FORWARD; + enqd = rte_event_enqueue_burst(evdev, port, + &ev[i], 1); + if (enqd != 1) { + printf("%d: Can't enqueue FWD!!\n", + __LINE__); + return -1; + } + continue; + } + /* we have hit 16 iterations through system - drop */ + rte_pktmbuf_free(ev[i].mbuf); + count++; + ev[i].op = RTE_EVENT_OP_RELEASE; + enqd = rte_event_enqueue_burst(evdev, port, &ev[i], 1); + if (enqd != 1) { + printf("%d drop enqueue failed\n", __LINE__); + return -1; + } + } + } + + return 0; +} + +static int +worker_loopback_producer_fn(void *arg) +{ + struct test *t = arg; + uint8_t port = t->port[0]; + uint64_t count = 0; + + printf("%d: \tProducer function started\n", __LINE__); + while (count < NUM_PACKETS) { + struct rte_mbuf *m = 0; + do { + m = rte_pktmbuf_alloc(t->mbuf_pool); + } while (m == NULL); + + m->udata64 = 0; + + struct rte_event ev = { + .op = RTE_EVENT_OP_NEW, + .queue_id = t->qid[0], + .flow_id = (uintptr_t)m & 0xFFFF, + .mbuf = m, + }; + + if (rte_event_enqueue_burst(evdev, port, &ev, 1) != 1) { + while (rte_event_enqueue_burst(evdev, port, &ev, 1) != + 1) + rte_pause(); + } + + count++; + } + + return 0; +} + +static int +worker_loopback(struct test *t, uint8_t disable_implicit_release) +{ + /* use a single producer core, and a worker core to see what happens + * if the worker loops packets back multiple times + */ + struct test_event_dev_stats stats; + uint64_t print_cycles = 0, cycles = 0; + uint64_t tx_pkts = 0; + int err; + int w_lcore, p_lcore; + + if (init(t, 8, 2) < 0 || + create_atomic_qids(t, 8) < 0) { + printf("%d: Error initializing device\n", __LINE__); + return -1; + } + + /* RX with low max events */ + static struct rte_event_port_conf conf = { + .dequeue_depth = 32, + .enqueue_depth = 64, + }; + /* beware: this cannot be initialized in the static above as it would + * only be initialized once - and this needs to be set for multiple runs + */ + conf.new_event_threshold = 512; + conf.disable_implicit_release = disable_implicit_release; + + if (rte_event_port_setup(evdev, 0, &conf) < 0) { + printf("Error setting up RX port\n"); + return -1; + } + t->port[0] = 0; + /* TX with higher max events */ + conf.new_event_threshold = 4096; + if (rte_event_port_setup(evdev, 1, &conf) < 0) { + printf("Error setting up TX port\n"); + return -1; + } + t->port[1] = 1; + + /* CQ mapping to QID */ + err = rte_event_port_link(evdev, t->port[1], NULL, NULL, 0); + if (err != 8) { /* should have mapped all queues*/ + printf("%d: error mapping port 2 to all qids\n", __LINE__); + return -1; + } + + if (rte_event_dev_start(evdev) < 0) { + printf("%d: Error with start call\n", __LINE__); + return -1; + } + + p_lcore = rte_get_next_lcore( + /* start core */ -1, + /* skip master */ 1, + /* wrap */ 0); + w_lcore = rte_get_next_lcore(p_lcore, 1, 0); + + rte_eal_remote_launch(worker_loopback_producer_fn, t, p_lcore); + rte_eal_remote_launch(worker_loopback_worker_fn, t, w_lcore); + + print_cycles = cycles = rte_get_timer_cycles(); + while (rte_eal_get_lcore_state(p_lcore) != FINISHED || + rte_eal_get_lcore_state(w_lcore) != FINISHED) { + + rte_service_run_iter_on_app_lcore(t->service_id, 1); + + uint64_t new_cycles = rte_get_timer_cycles(); + + if (new_cycles - print_cycles > rte_get_timer_hz()) { + test_event_dev_stats_get(evdev, &stats); + printf( + "%d: \tSched Rx = %"PRIu64", Tx = %"PRIu64"\n", + __LINE__, stats.rx_pkts, stats.tx_pkts); + + print_cycles = new_cycles; + } + if (new_cycles - cycles > rte_get_timer_hz() * 3) { + test_event_dev_stats_get(evdev, &stats); + if (stats.tx_pkts == tx_pkts) { + rte_event_dev_dump(evdev, stdout); + printf("Dumping xstats:\n"); + xstats_print(); + printf( + "%d: No schedules for seconds, deadlock\n", + __LINE__); + return -1; + } + tx_pkts = stats.tx_pkts; + cycles = new_cycles; + } + } + rte_service_run_iter_on_app_lcore(t->service_id, 1); + /* ensure all completions are flushed */ + + rte_eal_mp_wait_lcore(); + + cleanup(t); + return 0; +} + +static struct rte_mempool *eventdev_func_mempool; + +int +test_sw_eventdev(void) +{ + struct test *t; + int ret; + + t = malloc(sizeof(struct test)); + if (t == NULL) + return -1; + /* manually initialize the op, older gcc's complain on static + * initialization of struct elements that are a bitfield. + */ + release_ev.op = RTE_EVENT_OP_RELEASE; + + const char *eventdev_name = "event_sw"; + evdev = rte_event_dev_get_dev_id(eventdev_name); + if (evdev < 0) { + printf("%d: Eventdev %s not found - creating.\n", + __LINE__, eventdev_name); + if (rte_vdev_init(eventdev_name, NULL) < 0) { + printf("Error creating eventdev\n"); + goto test_fail; + } + evdev = rte_event_dev_get_dev_id(eventdev_name); + if (evdev < 0) { + printf("Error finding newly created eventdev\n"); + goto test_fail; + } + } + + if (rte_event_dev_service_id_get(evdev, &t->service_id) < 0) { + printf("Failed to get service ID for software event dev\n"); + goto test_fail; + } + + rte_service_runstate_set(t->service_id, 1); + rte_service_set_runstate_mapped_check(t->service_id, 0); + + /* Only create mbuf pool once, reuse for each test run */ + if (!eventdev_func_mempool) { + eventdev_func_mempool = rte_pktmbuf_pool_create( + "EVENTDEV_SW_SA_MBUF_POOL", + (1<<12), /* 4k buffers */ + 32 /*MBUF_CACHE_SIZE*/, + 0, + 512, /* use very small mbufs */ + rte_socket_id()); + if (!eventdev_func_mempool) { + printf("ERROR creating mempool\n"); + goto test_fail; + } + } + t->mbuf_pool = eventdev_func_mempool; + printf("*** Running Single Directed Packet test...\n"); + ret = test_single_directed_packet(t); + if (ret != 0) { + printf("ERROR - Single Directed Packet test FAILED.\n"); + goto test_fail; + } + printf("*** Running Directed Forward Credit test...\n"); + ret = test_directed_forward_credits(t); + if (ret != 0) { + printf("ERROR - Directed Forward Credit test FAILED.\n"); + goto test_fail; + } + printf("*** Running Single Load Balanced Packet test...\n"); + ret = single_packet(t); + if (ret != 0) { + printf("ERROR - Single Packet test FAILED.\n"); + goto test_fail; + } + printf("*** Running Unordered Basic test...\n"); + ret = unordered_basic(t); + if (ret != 0) { + printf("ERROR - Unordered Basic test FAILED.\n"); + goto test_fail; + } + printf("*** Running Ordered Basic test...\n"); + ret = ordered_basic(t); + if (ret != 0) { + printf("ERROR - Ordered Basic test FAILED.\n"); + goto test_fail; + } + printf("*** Running Burst Packets test...\n"); + ret = burst_packets(t); + if (ret != 0) { + printf("ERROR - Burst Packets test FAILED.\n"); + goto test_fail; + } + printf("*** Running Load Balancing test...\n"); + ret = load_balancing(t); + if (ret != 0) { + printf("ERROR - Load Balancing test FAILED.\n"); + goto test_fail; + } + printf("*** Running Prioritized Directed test...\n"); + ret = test_priority_directed(t); + if (ret != 0) { + printf("ERROR - Prioritized Directed test FAILED.\n"); + goto test_fail; + } + printf("*** Running Prioritized Atomic test...\n"); + ret = test_priority_atomic(t); + if (ret != 0) { + printf("ERROR - Prioritized Atomic test FAILED.\n"); + goto test_fail; + } + + printf("*** Running Prioritized Ordered test...\n"); + ret = test_priority_ordered(t); + if (ret != 0) { + printf("ERROR - Prioritized Ordered test FAILED.\n"); + goto test_fail; + } + printf("*** Running Prioritized Unordered test...\n"); + ret = test_priority_unordered(t); + if (ret != 0) { + printf("ERROR - Prioritized Unordered test FAILED.\n"); + goto test_fail; + } + printf("*** Running Invalid QID test...\n"); + ret = invalid_qid(t); + if (ret != 0) { + printf("ERROR - Invalid QID test FAILED.\n"); + goto test_fail; + } + printf("*** Running Load Balancing History test...\n"); + ret = load_balancing_history(t); + if (ret != 0) { + printf("ERROR - Load Balancing History test FAILED.\n"); + goto test_fail; + } + printf("*** Running Inflight Count test...\n"); + ret = inflight_counts(t); + if (ret != 0) { + printf("ERROR - Inflight Count test FAILED.\n"); + goto test_fail; + } + printf("*** Running Abuse Inflights test...\n"); + ret = abuse_inflights(t); + if (ret != 0) { + printf("ERROR - Abuse Inflights test FAILED.\n"); + goto test_fail; + } + printf("*** Running XStats test...\n"); + ret = xstats_tests(t); + if (ret != 0) { + printf("ERROR - XStats test FAILED.\n"); + goto test_fail; + } + printf("*** Running XStats ID Reset test...\n"); + ret = xstats_id_reset_tests(t); + if (ret != 0) { + printf("ERROR - XStats ID Reset test FAILED.\n"); + goto test_fail; + } + printf("*** Running XStats Brute Force test...\n"); + ret = xstats_brute_force(t); + if (ret != 0) { + printf("ERROR - XStats Brute Force test FAILED.\n"); + goto test_fail; + } + printf("*** Running XStats ID Abuse test...\n"); + ret = xstats_id_abuse_tests(t); + if (ret != 0) { + printf("ERROR - XStats ID Abuse test FAILED.\n"); + goto test_fail; + } + printf("*** Running QID Priority test...\n"); + ret = qid_priorities(t); + if (ret != 0) { + printf("ERROR - QID Priority test FAILED.\n"); + goto test_fail; + } + printf("*** Running Unlink-in-progress test...\n"); + ret = unlink_in_progress(t); + if (ret != 0) { + printf("ERROR - Unlink in progress test FAILED.\n"); + goto test_fail; + } + printf("*** Running Ordered Reconfigure test...\n"); + ret = ordered_reconfigure(t); + if (ret != 0) { + printf("ERROR - Ordered Reconfigure test FAILED.\n"); + goto test_fail; + } + printf("*** Running Port LB Single Reconfig test...\n"); + ret = port_single_lb_reconfig(t); + if (ret != 0) { + printf("ERROR - Port LB Single Reconfig test FAILED.\n"); + goto test_fail; + } + printf("*** Running Port Reconfig Credits test...\n"); + ret = port_reconfig_credits(t); + if (ret != 0) { + printf("ERROR - Port Reconfig Credits Reset test FAILED.\n"); + goto test_fail; + } + printf("*** Running Head-of-line-blocking test...\n"); + ret = holb(t); + if (ret != 0) { + printf("ERROR - Head-of-line-blocking test FAILED.\n"); + goto test_fail; + } + printf("*** Running Stop Flush test...\n"); + ret = dev_stop_flush(t); + if (ret != 0) { + printf("ERROR - Stop Flush test FAILED.\n"); + goto test_fail; + } + if (rte_lcore_count() >= 3) { + printf("*** Running Worker loopback test...\n"); + ret = worker_loopback(t, 0); + if (ret != 0) { + printf("ERROR - Worker loopback test FAILED.\n"); + return ret; + } + + printf("*** Running Worker loopback test (implicit release disabled)...\n"); + ret = worker_loopback(t, 1); + if (ret != 0) { + printf("ERROR - Worker loopback test FAILED.\n"); + goto test_fail; + } + } else { + printf("### Not enough cores for worker loopback tests.\n"); + printf("### Need at least 3 cores for the tests.\n"); + } + + /* + * Free test instance, leaving mempool initialized, and a pointer to it + * in static eventdev_func_mempool, as it is re-used on re-runs + */ + free(t); + + printf("SW Eventdev Selftest Successful.\n"); + return 0; +test_fail: + free(t); + printf("SW Eventdev Selftest Failed.\n"); + return -1; +} diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev_worker.c b/src/spdk/dpdk/drivers/event/sw/sw_evdev_worker.c new file mode 100644 index 000000000..063b919c7 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev_worker.c @@ -0,0 +1,186 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#include <rte_atomic.h> +#include <rte_cycles.h> +#include <rte_event_ring.h> + +#include "sw_evdev.h" + +#define PORT_ENQUEUE_MAX_BURST_SIZE 64 + +static inline void +sw_event_release(struct sw_port *p, uint8_t index) +{ + /* + * Drops the next outstanding event in our history. Used on dequeue + * to clear any history before dequeuing more events. + */ + RTE_SET_USED(index); + + /* create drop message */ + struct rte_event ev; + ev.op = sw_qe_flag_map[RTE_EVENT_OP_RELEASE]; + + uint16_t free_count; + rte_event_ring_enqueue_burst(p->rx_worker_ring, &ev, 1, &free_count); + + /* each release returns one credit */ + p->outstanding_releases--; + p->inflight_credits++; +} + +/* + * special-case of rte_event_ring enqueue, with overriding the ops member on + * the events that get written to the ring. + */ +static inline unsigned int +enqueue_burst_with_ops(struct rte_event_ring *r, const struct rte_event *events, + unsigned int n, uint8_t *ops) +{ + struct rte_event tmp_evs[PORT_ENQUEUE_MAX_BURST_SIZE]; + unsigned int i; + + memcpy(tmp_evs, events, n * sizeof(events[0])); + for (i = 0; i < n; i++) + tmp_evs[i].op = ops[i]; + + return rte_event_ring_enqueue_burst(r, tmp_evs, n, NULL); +} + +uint16_t +sw_event_enqueue_burst(void *port, const struct rte_event ev[], uint16_t num) +{ + int32_t i; + uint8_t new_ops[PORT_ENQUEUE_MAX_BURST_SIZE]; + struct sw_port *p = port; + struct sw_evdev *sw = (void *)p->sw; + uint32_t sw_inflights = rte_atomic32_read(&sw->inflights); + uint32_t credit_update_quanta = sw->credit_update_quanta; + int new = 0; + + if (num > PORT_ENQUEUE_MAX_BURST_SIZE) + num = PORT_ENQUEUE_MAX_BURST_SIZE; + + for (i = 0; i < num; i++) + new += (ev[i].op == RTE_EVENT_OP_NEW); + + if (unlikely(new > 0 && p->inflight_max < sw_inflights)) + return 0; + + if (p->inflight_credits < new) { + /* check if event enqueue brings port over max threshold */ + if (sw_inflights + credit_update_quanta > sw->nb_events_limit) + return 0; + + rte_atomic32_add(&sw->inflights, credit_update_quanta); + p->inflight_credits += (credit_update_quanta); + + /* If there are fewer inflight credits than new events, limit + * the number of enqueued events. + */ + num = (p->inflight_credits < new) ? p->inflight_credits : new; + } + + for (i = 0; i < num; i++) { + int op = ev[i].op; + int outstanding = p->outstanding_releases > 0; + const uint8_t invalid_qid = (ev[i].queue_id >= sw->qid_count); + + p->inflight_credits -= (op == RTE_EVENT_OP_NEW); + p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) * + outstanding; + + new_ops[i] = sw_qe_flag_map[op]; + new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT); + + /* FWD and RELEASE packets will both resolve to taken (assuming + * correct usage of the API), providing very high correct + * prediction rate. + */ + if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) + p->outstanding_releases--; + + /* error case: branch to avoid touching p->stats */ + if (unlikely(invalid_qid && op != RTE_EVENT_OP_RELEASE)) { + p->stats.rx_dropped++; + p->inflight_credits++; + } + } + + /* returns number of events actually enqueued */ + uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i, + new_ops); + if (p->outstanding_releases == 0 && p->last_dequeue_burst_sz != 0) { + uint64_t burst_ticks = rte_get_timer_cycles() - + p->last_dequeue_ticks; + uint64_t burst_pkt_ticks = + burst_ticks / p->last_dequeue_burst_sz; + p->avg_pkt_ticks -= p->avg_pkt_ticks / NUM_SAMPLES; + p->avg_pkt_ticks += burst_pkt_ticks / NUM_SAMPLES; + p->last_dequeue_ticks = 0; + } + + /* Replenish credits if enough releases are performed */ + if (p->inflight_credits >= credit_update_quanta * 2) { + rte_atomic32_sub(&sw->inflights, credit_update_quanta); + p->inflight_credits -= credit_update_quanta; + } + + return enq; +} + +uint16_t +sw_event_enqueue(void *port, const struct rte_event *ev) +{ + return sw_event_enqueue_burst(port, ev, 1); +} + +uint16_t +sw_event_dequeue_burst(void *port, struct rte_event *ev, uint16_t num, + uint64_t wait) +{ + RTE_SET_USED(wait); + struct sw_port *p = (void *)port; + struct rte_event_ring *ring = p->cq_worker_ring; + + /* check that all previous dequeues have been released */ + if (p->implicit_release) { + struct sw_evdev *sw = (void *)p->sw; + uint32_t credit_update_quanta = sw->credit_update_quanta; + uint16_t out_rels = p->outstanding_releases; + uint16_t i; + for (i = 0; i < out_rels; i++) + sw_event_release(p, i); + + /* Replenish credits if enough releases are performed */ + if (p->inflight_credits >= credit_update_quanta * 2) { + rte_atomic32_sub(&sw->inflights, credit_update_quanta); + p->inflight_credits -= credit_update_quanta; + } + } + + /* returns number of events actually dequeued */ + uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL); + if (unlikely(ndeq == 0)) { + p->zero_polls++; + p->total_polls++; + goto end; + } + + p->outstanding_releases += ndeq; + p->last_dequeue_burst_sz = ndeq; + p->last_dequeue_ticks = rte_get_timer_cycles(); + p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++; + p->total_polls++; + +end: + return ndeq; +} + +uint16_t +sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait) +{ + return sw_event_dequeue_burst(port, ev, 1, wait); +} diff --git a/src/spdk/dpdk/drivers/event/sw/sw_evdev_xstats.c b/src/spdk/dpdk/drivers/event/sw/sw_evdev_xstats.c new file mode 100644 index 000000000..02f787418 --- /dev/null +++ b/src/spdk/dpdk/drivers/event/sw/sw_evdev_xstats.c @@ -0,0 +1,649 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2016-2017 Intel Corporation + */ + +#include <rte_event_ring.h> +#include "sw_evdev.h" +#include "iq_chunk.h" + +enum xstats_type { + /* common stats */ + rx, + tx, + dropped, + inflight, + calls, + credits, + /* device instance specific */ + no_iq_enq, + no_cq_enq, + /* port_specific */ + rx_used, + rx_free, + tx_used, + tx_free, + pkt_cycles, + poll_return, /* for zero-count and used also for port bucket loop */ + /* qid_specific */ + iq_used, + /* qid port mapping specific */ + pinned, + pkts, /* note: qid-to-port pkts */ +}; + +typedef uint64_t (*xstats_fn)(const struct sw_evdev *dev, + uint16_t obj_idx, /* port or queue id */ + enum xstats_type stat, int extra_arg); + +struct sw_xstats_entry { + struct rte_event_dev_xstats_name name; + xstats_fn fn; + uint16_t obj_idx; + enum xstats_type stat; + enum rte_event_dev_xstats_mode mode; + int extra_arg; + uint8_t reset_allowed; /* when set, this value can be reset */ + uint64_t reset_value; /* an offset to be taken away to emulate resets */ +}; + +static uint64_t +get_dev_stat(const struct sw_evdev *sw, uint16_t obj_idx __rte_unused, + enum xstats_type type, int extra_arg __rte_unused) +{ + switch (type) { + case rx: return sw->stats.rx_pkts; + case tx: return sw->stats.tx_pkts; + case dropped: return sw->stats.rx_dropped; + case calls: return sw->sched_called; + case no_iq_enq: return sw->sched_no_iq_enqueues; + case no_cq_enq: return sw->sched_no_cq_enqueues; + default: return -1; + } +} + +static uint64_t +get_port_stat(const struct sw_evdev *sw, uint16_t obj_idx, + enum xstats_type type, int extra_arg __rte_unused) +{ + const struct sw_port *p = &sw->ports[obj_idx]; + + switch (type) { + case rx: return p->stats.rx_pkts; + case tx: return p->stats.tx_pkts; + case dropped: return p->stats.rx_dropped; + case inflight: return p->inflights; + case pkt_cycles: return p->avg_pkt_ticks; + case calls: return p->total_polls; + case credits: return p->inflight_credits; + case poll_return: return p->zero_polls; + case rx_used: return rte_event_ring_count(p->rx_worker_ring); + case rx_free: return rte_event_ring_free_count(p->rx_worker_ring); + case tx_used: return rte_event_ring_count(p->cq_worker_ring); + case tx_free: return rte_event_ring_free_count(p->cq_worker_ring); + default: return -1; + } +} + +static uint64_t +get_port_bucket_stat(const struct sw_evdev *sw, uint16_t obj_idx, + enum xstats_type type, int extra_arg) +{ + const struct sw_port *p = &sw->ports[obj_idx]; + + switch (type) { + case poll_return: return p->poll_buckets[extra_arg]; + default: return -1; + } +} + +static uint64_t +get_qid_stat(const struct sw_evdev *sw, uint16_t obj_idx, + enum xstats_type type, int extra_arg __rte_unused) +{ + const struct sw_qid *qid = &sw->qids[obj_idx]; + + switch (type) { + case rx: return qid->stats.rx_pkts; + case tx: return qid->stats.tx_pkts; + case dropped: return qid->stats.rx_dropped; + case inflight: + do { + uint64_t infl = 0; + unsigned int i; + for (i = 0; i < RTE_DIM(qid->fids); i++) + infl += qid->fids[i].pcount; + return infl; + } while (0); + break; + default: return -1; + } +} + +static uint64_t +get_qid_iq_stat(const struct sw_evdev *sw, uint16_t obj_idx, + enum xstats_type type, int extra_arg) +{ + const struct sw_qid *qid = &sw->qids[obj_idx]; + const int iq_idx = extra_arg; + + switch (type) { + case iq_used: return iq_count(&qid->iq[iq_idx]); + default: return -1; + } +} + +static uint64_t +get_qid_port_stat(const struct sw_evdev *sw, uint16_t obj_idx, + enum xstats_type type, int extra_arg) +{ + const struct sw_qid *qid = &sw->qids[obj_idx]; + uint16_t port = extra_arg; + + switch (type) { + case pinned: + do { + uint64_t pin = 0; + unsigned int i; + for (i = 0; i < RTE_DIM(qid->fids); i++) + if (qid->fids[i].cq == port) + pin++; + return pin; + } while (0); + break; + case pkts: + return qid->to_port[port]; + default: return -1; + } +} + +int +sw_xstats_init(struct sw_evdev *sw) +{ + /* + * define the stats names and types. Used to build up the device + * xstats array + * There are multiple set of stats: + * - device-level, + * - per-port, + * - per-port-dequeue-burst-sizes + * - per-qid, + * - per-iq + * - per-port-per-qid + * + * For each of these sets, we have three parallel arrays, one for the + * names, the other for the stat type parameter to be passed in the fn + * call to get that stat. The third array allows resetting or not. + * All these arrays must be kept in sync + */ + static const char * const dev_stats[] = { "rx", "tx", "drop", + "sched_calls", "sched_no_iq_enq", "sched_no_cq_enq", + }; + static const enum xstats_type dev_types[] = { rx, tx, dropped, + calls, no_iq_enq, no_cq_enq, + }; + /* all device stats are allowed to be reset */ + + static const char * const port_stats[] = {"rx", "tx", "drop", + "inflight", "avg_pkt_cycles", "credits", + "rx_ring_used", "rx_ring_free", + "cq_ring_used", "cq_ring_free", + "dequeue_calls", "dequeues_returning_0", + }; + static const enum xstats_type port_types[] = { rx, tx, dropped, + inflight, pkt_cycles, credits, + rx_used, rx_free, tx_used, tx_free, + calls, poll_return, + }; + static const uint8_t port_reset_allowed[] = {1, 1, 1, + 0, 1, 0, + 0, 0, 0, 0, + 1, 1, + }; + + static const char * const port_bucket_stats[] = { + "dequeues_returning" }; + static const enum xstats_type port_bucket_types[] = { poll_return }; + /* all bucket dequeues are allowed to be reset, handled in loop below */ + + static const char * const qid_stats[] = {"rx", "tx", "drop", + "inflight" + }; + static const enum xstats_type qid_types[] = { rx, tx, dropped, + inflight + }; + static const uint8_t qid_reset_allowed[] = {1, 1, 1, + 0 + }; + + static const char * const qid_iq_stats[] = { "used" }; + static const enum xstats_type qid_iq_types[] = { iq_used }; + /* reset allowed */ + + static const char * const qid_port_stats[] = { "pinned_flows", + "packets" + }; + static const enum xstats_type qid_port_types[] = { pinned, pkts }; + static const uint8_t qid_port_reset_allowed[] = {0, 1}; + /* reset allowed */ + /* ---- end of stat definitions ---- */ + + /* check sizes, since a missed comma can lead to strings being + * joined by the compiler. + */ + RTE_BUILD_BUG_ON(RTE_DIM(dev_stats) != RTE_DIM(dev_types)); + RTE_BUILD_BUG_ON(RTE_DIM(port_stats) != RTE_DIM(port_types)); + RTE_BUILD_BUG_ON(RTE_DIM(qid_stats) != RTE_DIM(qid_types)); + RTE_BUILD_BUG_ON(RTE_DIM(qid_iq_stats) != RTE_DIM(qid_iq_types)); + RTE_BUILD_BUG_ON(RTE_DIM(qid_port_stats) != RTE_DIM(qid_port_types)); + RTE_BUILD_BUG_ON(RTE_DIM(port_bucket_stats) != + RTE_DIM(port_bucket_types)); + + RTE_BUILD_BUG_ON(RTE_DIM(port_stats) != RTE_DIM(port_reset_allowed)); + RTE_BUILD_BUG_ON(RTE_DIM(qid_stats) != RTE_DIM(qid_reset_allowed)); + + /* other vars */ + const uint32_t cons_bkt_shift = + (MAX_SW_CONS_Q_DEPTH >> SW_DEQ_STAT_BUCKET_SHIFT); + const unsigned int count = RTE_DIM(dev_stats) + + sw->port_count * RTE_DIM(port_stats) + + sw->port_count * RTE_DIM(port_bucket_stats) * + (cons_bkt_shift + 1) + + sw->qid_count * RTE_DIM(qid_stats) + + sw->qid_count * SW_IQS_MAX * RTE_DIM(qid_iq_stats) + + sw->qid_count * sw->port_count * + RTE_DIM(qid_port_stats); + unsigned int i, port, qid, iq, bkt, stat = 0; + + sw->xstats = rte_zmalloc_socket(NULL, sizeof(sw->xstats[0]) * count, 0, + sw->data->socket_id); + if (sw->xstats == NULL) + return -ENOMEM; + +#define sname sw->xstats[stat].name.name + for (i = 0; i < RTE_DIM(dev_stats); i++, stat++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_dev_stat, + .stat = dev_types[i], + .mode = RTE_EVENT_DEV_XSTATS_DEVICE, + .reset_allowed = 1, + }; + snprintf(sname, sizeof(sname), "dev_%s", dev_stats[i]); + } + sw->xstats_count_mode_dev = stat; + + for (port = 0; port < sw->port_count; port++) { + sw->xstats_offset_for_port[port] = stat; + + uint32_t count_offset = stat; + + for (i = 0; i < RTE_DIM(port_stats); i++, stat++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_port_stat, + .obj_idx = port, + .stat = port_types[i], + .mode = RTE_EVENT_DEV_XSTATS_PORT, + .reset_allowed = port_reset_allowed[i], + }; + snprintf(sname, sizeof(sname), "port_%u_%s", + port, port_stats[i]); + } + + for (bkt = 0; bkt < (rte_event_ring_get_capacity( + sw->ports[port].cq_worker_ring) >> + SW_DEQ_STAT_BUCKET_SHIFT) + 1; bkt++) { + for (i = 0; i < RTE_DIM(port_bucket_stats); i++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_port_bucket_stat, + .obj_idx = port, + .stat = port_bucket_types[i], + .mode = RTE_EVENT_DEV_XSTATS_PORT, + .extra_arg = bkt, + .reset_allowed = 1, + }; + snprintf(sname, sizeof(sname), + "port_%u_%s_%u-%u", + port, port_bucket_stats[i], + (bkt << SW_DEQ_STAT_BUCKET_SHIFT) + 1, + (bkt + 1) << SW_DEQ_STAT_BUCKET_SHIFT); + stat++; + } + } + + sw->xstats_count_per_port[port] = stat - count_offset; + } + + sw->xstats_count_mode_port = stat - sw->xstats_count_mode_dev; + + for (qid = 0; qid < sw->qid_count; qid++) { + uint32_t count_offset = stat; + sw->xstats_offset_for_qid[qid] = stat; + + for (i = 0; i < RTE_DIM(qid_stats); i++, stat++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_qid_stat, + .obj_idx = qid, + .stat = qid_types[i], + .mode = RTE_EVENT_DEV_XSTATS_QUEUE, + .reset_allowed = qid_reset_allowed[i], + }; + snprintf(sname, sizeof(sname), "qid_%u_%s", + qid, qid_stats[i]); + } + for (iq = 0; iq < SW_IQS_MAX; iq++) + for (i = 0; i < RTE_DIM(qid_iq_stats); i++, stat++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_qid_iq_stat, + .obj_idx = qid, + .stat = qid_iq_types[i], + .mode = RTE_EVENT_DEV_XSTATS_QUEUE, + .extra_arg = iq, + .reset_allowed = 0, + }; + snprintf(sname, sizeof(sname), + "qid_%u_iq_%u_%s", + qid, iq, + qid_iq_stats[i]); + } + + for (port = 0; port < sw->port_count; port++) + for (i = 0; i < RTE_DIM(qid_port_stats); i++, stat++) { + sw->xstats[stat] = (struct sw_xstats_entry){ + .fn = get_qid_port_stat, + .obj_idx = qid, + .stat = qid_port_types[i], + .mode = RTE_EVENT_DEV_XSTATS_QUEUE, + .extra_arg = port, + .reset_allowed = + qid_port_reset_allowed[i], + }; + snprintf(sname, sizeof(sname), + "qid_%u_port_%u_%s", + qid, port, + qid_port_stats[i]); + } + + sw->xstats_count_per_qid[qid] = stat - count_offset; + } + + sw->xstats_count_mode_queue = stat - + (sw->xstats_count_mode_dev + sw->xstats_count_mode_port); +#undef sname + + sw->xstats_count = stat; + + return stat; +} + +int +sw_xstats_uninit(struct sw_evdev *sw) +{ + rte_free(sw->xstats); + sw->xstats_count = 0; + return 0; +} + +int +sw_xstats_get_names(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + struct rte_event_dev_xstats_name *xstats_names, + unsigned int *ids, unsigned int size) +{ + const struct sw_evdev *sw = sw_pmd_priv_const(dev); + unsigned int i; + unsigned int xidx = 0; + + uint32_t xstats_mode_count = 0; + uint32_t start_offset = 0; + + switch (mode) { + case RTE_EVENT_DEV_XSTATS_DEVICE: + xstats_mode_count = sw->xstats_count_mode_dev; + break; + case RTE_EVENT_DEV_XSTATS_PORT: + if (queue_port_id >= (signed int)sw->port_count) + break; + xstats_mode_count = sw->xstats_count_per_port[queue_port_id]; + start_offset = sw->xstats_offset_for_port[queue_port_id]; + break; + case RTE_EVENT_DEV_XSTATS_QUEUE: + if (queue_port_id >= (signed int)sw->qid_count) + break; + xstats_mode_count = sw->xstats_count_per_qid[queue_port_id]; + start_offset = sw->xstats_offset_for_qid[queue_port_id]; + break; + default: + SW_LOG_ERR("Invalid mode received in sw_xstats_get_names()\n"); + return -EINVAL; + }; + + if (xstats_mode_count > size || !ids || !xstats_names) + return xstats_mode_count; + + for (i = 0; i < sw->xstats_count && xidx < size; i++) { + if (sw->xstats[i].mode != mode) + continue; + + if (mode != RTE_EVENT_DEV_XSTATS_DEVICE && + queue_port_id != sw->xstats[i].obj_idx) + continue; + + xstats_names[xidx] = sw->xstats[i].name; + if (ids) + ids[xidx] = start_offset + xidx; + xidx++; + } + return xidx; +} + +static int +sw_xstats_update(struct sw_evdev *sw, enum rte_event_dev_xstats_mode mode, + uint8_t queue_port_id, const unsigned int ids[], + uint64_t values[], unsigned int n, const uint32_t reset, + const uint32_t ret_if_n_lt_nstats) +{ + unsigned int i; + unsigned int xidx = 0; + RTE_SET_USED(mode); + RTE_SET_USED(queue_port_id); + + uint32_t xstats_mode_count = 0; + + switch (mode) { + case RTE_EVENT_DEV_XSTATS_DEVICE: + xstats_mode_count = sw->xstats_count_mode_dev; + break; + case RTE_EVENT_DEV_XSTATS_PORT: + if (queue_port_id >= (signed int)sw->port_count) + goto invalid_value; + xstats_mode_count = sw->xstats_count_per_port[queue_port_id]; + break; + case RTE_EVENT_DEV_XSTATS_QUEUE: + if (queue_port_id >= (signed int)sw->qid_count) + goto invalid_value; + xstats_mode_count = sw->xstats_count_per_qid[queue_port_id]; + break; + default: + SW_LOG_ERR("Invalid mode received in sw_xstats_get()\n"); + goto invalid_value; + }; + + /* this function can check num stats and return them (xstats_get() style + * behaviour) or ignore n for reset() of a single stat style behaviour. + */ + if (ret_if_n_lt_nstats && xstats_mode_count > n) + return xstats_mode_count; + + for (i = 0; i < n && xidx < xstats_mode_count; i++) { + struct sw_xstats_entry *xs = &sw->xstats[ids[i]]; + if (ids[i] > sw->xstats_count || xs->mode != mode) + continue; + + if (mode != RTE_EVENT_DEV_XSTATS_DEVICE && + queue_port_id != xs->obj_idx) + continue; + + uint64_t val = xs->fn(sw, xs->obj_idx, xs->stat, xs->extra_arg) + - xs->reset_value; + + if (values) + values[xidx] = val; + + if (xs->reset_allowed && reset) + xs->reset_value += val; + + xidx++; + } + + return xidx; +invalid_value: + return -EINVAL; +} + +int +sw_xstats_get(const struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id, + const unsigned int ids[], uint64_t values[], unsigned int n) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + const uint32_t reset = 0; + const uint32_t ret_n_lt_stats = 0; + return sw_xstats_update(sw, mode, queue_port_id, ids, values, n, + reset, ret_n_lt_stats); +} + +uint64_t +sw_xstats_get_by_name(const struct rte_eventdev *dev, + const char *name, unsigned int *id) +{ + const struct sw_evdev *sw = sw_pmd_priv_const(dev); + unsigned int i; + + for (i = 0; i < sw->xstats_count; i++) { + struct sw_xstats_entry *xs = &sw->xstats[i]; + if (strncmp(xs->name.name, name, + RTE_EVENT_DEV_XSTATS_NAME_SIZE) == 0){ + if (id != NULL) + *id = i; + return xs->fn(sw, xs->obj_idx, xs->stat, xs->extra_arg) + - xs->reset_value; + } + } + if (id != NULL) + *id = (uint32_t)-1; + return (uint64_t)-1; +} + +static void +sw_xstats_reset_range(struct sw_evdev *sw, uint32_t start, uint32_t num) +{ + uint32_t i; + for (i = start; i < start + num; i++) { + struct sw_xstats_entry *xs = &sw->xstats[i]; + if (!xs->reset_allowed) + continue; + + uint64_t val = xs->fn(sw, xs->obj_idx, xs->stat, xs->extra_arg); + xs->reset_value = val; + } +} + +static int +sw_xstats_reset_queue(struct sw_evdev *sw, uint8_t queue_id, + const uint32_t ids[], uint32_t nb_ids) +{ + const uint32_t reset = 1; + const uint32_t ret_n_lt_stats = 0; + if (ids) { + uint32_t nb_reset = sw_xstats_update(sw, + RTE_EVENT_DEV_XSTATS_QUEUE, + queue_id, ids, NULL, nb_ids, + reset, ret_n_lt_stats); + return nb_reset == nb_ids ? 0 : -EINVAL; + } + + if (ids == NULL) + sw_xstats_reset_range(sw, sw->xstats_offset_for_qid[queue_id], + sw->xstats_count_per_qid[queue_id]); + + return 0; +} + +static int +sw_xstats_reset_port(struct sw_evdev *sw, uint8_t port_id, + const uint32_t ids[], uint32_t nb_ids) +{ + const uint32_t reset = 1; + const uint32_t ret_n_lt_stats = 0; + int offset = sw->xstats_offset_for_port[port_id]; + int nb_stat = sw->xstats_count_per_port[port_id]; + + if (ids) { + uint32_t nb_reset = sw_xstats_update(sw, + RTE_EVENT_DEV_XSTATS_PORT, port_id, + ids, NULL, nb_ids, + reset, ret_n_lt_stats); + return nb_reset == nb_ids ? 0 : -EINVAL; + } + + sw_xstats_reset_range(sw, offset, nb_stat); + return 0; +} + +static int +sw_xstats_reset_dev(struct sw_evdev *sw, const uint32_t ids[], uint32_t nb_ids) +{ + uint32_t i; + if (ids) { + for (i = 0; i < nb_ids; i++) { + uint32_t id = ids[i]; + if (id >= sw->xstats_count_mode_dev) + return -EINVAL; + sw_xstats_reset_range(sw, id, 1); + } + } else { + for (i = 0; i < sw->xstats_count_mode_dev; i++) + sw_xstats_reset_range(sw, i, 1); + } + + return 0; +} + +int +sw_xstats_reset(struct rte_eventdev *dev, + enum rte_event_dev_xstats_mode mode, + int16_t queue_port_id, + const uint32_t ids[], + uint32_t nb_ids) +{ + struct sw_evdev *sw = sw_pmd_priv(dev); + uint32_t i, err; + + /* handle -1 for queue_port_id here, looping over all ports/queues */ + switch (mode) { + case RTE_EVENT_DEV_XSTATS_DEVICE: + sw_xstats_reset_dev(sw, ids, nb_ids); + break; + case RTE_EVENT_DEV_XSTATS_PORT: + if (queue_port_id == -1) { + for (i = 0; i < sw->port_count; i++) { + err = sw_xstats_reset_port(sw, i, ids, nb_ids); + if (err) + return -EINVAL; + } + } else if (queue_port_id < (int16_t)sw->port_count) + sw_xstats_reset_port(sw, queue_port_id, ids, nb_ids); + break; + case RTE_EVENT_DEV_XSTATS_QUEUE: + if (queue_port_id == -1) { + for (i = 0; i < sw->qid_count; i++) { + err = sw_xstats_reset_queue(sw, i, ids, nb_ids); + if (err) + return -EINVAL; + } + } else if (queue_port_id < (int16_t)sw->qid_count) + sw_xstats_reset_queue(sw, queue_port_id, ids, nb_ids); + break; + }; + + return 0; +} |