summaryrefslogtreecommitdiffstats
path: root/src/spdk/dpdk/examples/eventdev_pipeline
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/spdk/dpdk/examples/eventdev_pipeline
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spdk/dpdk/examples/eventdev_pipeline')
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/Makefile61
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/main.c582
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/meson.build15
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/pipeline_common.h153
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_generic.c568
-rw-r--r--src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_tx.c838
6 files changed, 2217 insertions, 0 deletions
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/Makefile b/src/spdk/dpdk/examples/eventdev_pipeline/Makefile
new file mode 100644
index 00000000..1a789ccc
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/Makefile
@@ -0,0 +1,61 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2016-2017 Intel Corporation
+
+# binary name
+APP = eventdev_pipeline
+
+# all source are stored in SRCS-y
+SRCS-y := main.c
+SRCS-y += pipeline_worker_generic.c
+SRCS-y += pipeline_worker_tx.c
+
+# Build using pkg-config variables if possible
+$(shell pkg-config --exists libdpdk)
+ifeq ($(.SHELLSTATUS),0)
+
+all: shared
+.PHONY: shared static
+shared: build/$(APP)-shared
+ ln -sf $(APP)-shared build/$(APP)
+static: build/$(APP)-static
+ ln -sf $(APP)-static build/$(APP)
+
+PC_FILE := $(shell pkg-config --path libdpdk)
+CFLAGS += -O3 $(shell pkg-config --cflags libdpdk)
+LDFLAGS_SHARED = $(shell pkg-config --libs libdpdk)
+LDFLAGS_STATIC = -Wl,-Bstatic $(shell pkg-config --static --libs libdpdk)
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+
+build/$(APP)-shared: $(SRCS-y) Makefile $(PC_FILE) | build
+ $(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_SHARED)
+
+build/$(APP)-static: $(SRCS-y) Makefile $(PC_FILE) | build
+ $(CC) $(CFLAGS) $(SRCS-y) -o $@ $(LDFLAGS) $(LDFLAGS_STATIC)
+
+build:
+ @mkdir -p $@
+
+.PHONY: clean
+clean:
+ rm -f build/$(APP) build/$(APP)-static build/$(APP)-shared
+ rmdir --ignore-fail-on-non-empty build
+
+else
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+include $(RTE_SDK)/mk/rte.extapp.mk
+
+endif
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/main.c b/src/spdk/dpdk/examples/eventdev_pipeline/main.c
new file mode 100644
index 00000000..700bc696
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/main.c
@@ -0,0 +1,582 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2016-2017 Intel Corporation
+ */
+
+#include <getopt.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <signal.h>
+#include <sched.h>
+
+#include "pipeline_common.h"
+
+struct config_data cdata = {
+ .num_packets = (1L << 25), /* do ~32M packets */
+ .num_fids = 512,
+ .queue_type = RTE_SCHED_TYPE_ATOMIC,
+ .next_qid = {-1},
+ .qid = {-1},
+ .num_stages = 1,
+ .worker_cq_depth = 16
+};
+
+static bool
+core_in_use(unsigned int lcore_id) {
+ return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
+ fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
+}
+
+static void
+eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
+ void *userdata)
+{
+ int port_id = (uintptr_t) userdata;
+ unsigned int _sent = 0;
+
+ do {
+ /* Note: hard-coded TX queue */
+ _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
+ unsent - _sent);
+ } while (_sent != unsent);
+}
+
+/*
+ * Parse the coremask given as argument (hexadecimal string) and fill
+ * the global configuration (core role and core count) with the parsed
+ * value.
+ */
+static int xdigit2val(unsigned char c)
+{
+ int val;
+
+ if (isdigit(c))
+ val = c - '0';
+ else if (isupper(c))
+ val = c - 'A' + 10;
+ else
+ val = c - 'a' + 10;
+ return val;
+}
+
+static uint64_t
+parse_coremask(const char *coremask)
+{
+ int i, j, idx = 0;
+ unsigned int count = 0;
+ char c;
+ int val;
+ uint64_t mask = 0;
+ const int32_t BITS_HEX = 4;
+
+ if (coremask == NULL)
+ return -1;
+ /* Remove all blank characters ahead and after .
+ * Remove 0x/0X if exists.
+ */
+ while (isblank(*coremask))
+ coremask++;
+ if (coremask[0] == '0' && ((coremask[1] == 'x')
+ || (coremask[1] == 'X')))
+ coremask += 2;
+ i = strlen(coremask);
+ while ((i > 0) && isblank(coremask[i - 1]))
+ i--;
+ if (i == 0)
+ return -1;
+
+ for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
+ c = coremask[i];
+ if (isxdigit(c) == 0) {
+ /* invalid characters */
+ return -1;
+ }
+ val = xdigit2val(c);
+ for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
+ if ((1 << j) & val) {
+ mask |= (1UL << idx);
+ count++;
+ }
+ }
+ }
+ for (; i >= 0; i--)
+ if (coremask[i] != '0')
+ return -1;
+ if (count == 0)
+ return -1;
+ return mask;
+}
+
+static struct option long_options[] = {
+ {"workers", required_argument, 0, 'w'},
+ {"packets", required_argument, 0, 'n'},
+ {"atomic-flows", required_argument, 0, 'f'},
+ {"num_stages", required_argument, 0, 's'},
+ {"rx-mask", required_argument, 0, 'r'},
+ {"tx-mask", required_argument, 0, 't'},
+ {"sched-mask", required_argument, 0, 'e'},
+ {"cq-depth", required_argument, 0, 'c'},
+ {"work-cycles", required_argument, 0, 'W'},
+ {"mempool-size", required_argument, 0, 'm'},
+ {"queue-priority", no_argument, 0, 'P'},
+ {"parallel", no_argument, 0, 'p'},
+ {"ordered", no_argument, 0, 'o'},
+ {"quiet", no_argument, 0, 'q'},
+ {"use-atq", no_argument, 0, 'a'},
+ {"dump", no_argument, 0, 'D'},
+ {0, 0, 0, 0}
+};
+
+static void
+usage(void)
+{
+ const char *usage_str =
+ " Usage: eventdev_demo [options]\n"
+ " Options:\n"
+ " -n, --packets=N Send N packets (default ~32M), 0 implies no limit\n"
+ " -f, --atomic-flows=N Use N random flows from 1 to N (default 16)\n"
+ " -s, --num_stages=N Use N atomic stages (default 1)\n"
+ " -r, --rx-mask=core mask Run NIC rx on CPUs in core mask\n"
+ " -w, --worker-mask=core mask Run worker on CPUs in core mask\n"
+ " -t, --tx-mask=core mask Run NIC tx on CPUs in core mask\n"
+ " -e --sched-mask=core mask Run scheduler on CPUs in core mask\n"
+ " -c --cq-depth=N Worker CQ depth (default 16)\n"
+ " -W --work-cycles=N Worker cycles (default 0)\n"
+ " -P --queue-priority Enable scheduler queue prioritization\n"
+ " -o, --ordered Use ordered scheduling\n"
+ " -p, --parallel Use parallel scheduling\n"
+ " -q, --quiet Minimize printed output\n"
+ " -a, --use-atq Use all type queues\n"
+ " -m, --mempool-size=N Dictate the mempool size\n"
+ " -D, --dump Print detailed statistics before exit"
+ "\n";
+ fprintf(stderr, "%s", usage_str);
+ exit(1);
+}
+
+static void
+parse_app_args(int argc, char **argv)
+{
+ /* Parse cli options*/
+ int option_index;
+ int c;
+ opterr = 0;
+ uint64_t rx_lcore_mask = 0;
+ uint64_t tx_lcore_mask = 0;
+ uint64_t sched_lcore_mask = 0;
+ uint64_t worker_lcore_mask = 0;
+ int i;
+
+ for (;;) {
+ c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:m:paoPqDW:",
+ long_options, &option_index);
+ if (c == -1)
+ break;
+
+ int popcnt = 0;
+ switch (c) {
+ case 'n':
+ cdata.num_packets = (int64_t)atol(optarg);
+ if (cdata.num_packets == 0)
+ cdata.num_packets = INT64_MAX;
+ break;
+ case 'f':
+ cdata.num_fids = (unsigned int)atoi(optarg);
+ break;
+ case 's':
+ cdata.num_stages = (unsigned int)atoi(optarg);
+ break;
+ case 'c':
+ cdata.worker_cq_depth = (unsigned int)atoi(optarg);
+ break;
+ case 'W':
+ cdata.worker_cycles = (unsigned int)atoi(optarg);
+ break;
+ case 'P':
+ cdata.enable_queue_priorities = 1;
+ break;
+ case 'o':
+ cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
+ break;
+ case 'p':
+ cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
+ break;
+ case 'a':
+ cdata.all_type_queues = 1;
+ break;
+ case 'q':
+ cdata.quiet = 1;
+ break;
+ case 'D':
+ cdata.dump_dev = 1;
+ break;
+ case 'w':
+ worker_lcore_mask = parse_coremask(optarg);
+ break;
+ case 'r':
+ rx_lcore_mask = parse_coremask(optarg);
+ popcnt = __builtin_popcountll(rx_lcore_mask);
+ fdata->rx_single = (popcnt == 1);
+ break;
+ case 't':
+ tx_lcore_mask = parse_coremask(optarg);
+ popcnt = __builtin_popcountll(tx_lcore_mask);
+ fdata->tx_single = (popcnt == 1);
+ break;
+ case 'e':
+ sched_lcore_mask = parse_coremask(optarg);
+ popcnt = __builtin_popcountll(sched_lcore_mask);
+ fdata->sched_single = (popcnt == 1);
+ break;
+ case 'm':
+ cdata.num_mbuf = (uint64_t)atol(optarg);
+ break;
+ default:
+ usage();
+ }
+ }
+
+ cdata.worker_lcore_mask = worker_lcore_mask;
+ cdata.sched_lcore_mask = sched_lcore_mask;
+ cdata.rx_lcore_mask = rx_lcore_mask;
+ cdata.tx_lcore_mask = tx_lcore_mask;
+
+ if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
+ usage();
+
+ for (i = 0; i < MAX_NUM_CORE; i++) {
+ fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
+ fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
+ fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
+ fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));
+
+ if (fdata->worker_core[i])
+ cdata.num_workers++;
+ if (core_in_use(i))
+ cdata.active_cores++;
+ }
+}
+
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+ static const struct rte_eth_conf port_conf_default = {
+ .rxmode = {
+ .mq_mode = ETH_MQ_RX_RSS,
+ .max_rx_pkt_len = ETHER_MAX_LEN,
+ },
+ .rx_adv_conf = {
+ .rss_conf = {
+ .rss_hf = ETH_RSS_IP |
+ ETH_RSS_TCP |
+ ETH_RSS_UDP,
+ }
+ }
+ };
+ const uint16_t rx_rings = 1, tx_rings = 1;
+ const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+ struct rte_eth_conf port_conf = port_conf_default;
+ int retval;
+ uint16_t q;
+ struct rte_eth_dev_info dev_info;
+ struct rte_eth_txconf txconf;
+
+ if (!rte_eth_dev_is_valid_port(port))
+ return -1;
+
+ rte_eth_dev_info_get(port, &dev_info);
+ if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
+ port_conf.txmode.offloads |=
+ DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+
+ port_conf.rx_adv_conf.rss_conf.rss_hf &=
+ dev_info.flow_type_rss_offloads;
+ if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
+ port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
+ printf("Port %u modified RSS hash function based on hardware support,"
+ "requested:%#"PRIx64" configured:%#"PRIx64"\n",
+ port,
+ port_conf_default.rx_adv_conf.rss_conf.rss_hf,
+ port_conf.rx_adv_conf.rss_conf.rss_hf);
+ }
+
+ /* Configure the Ethernet device. */
+ retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+ if (retval != 0)
+ return retval;
+
+ /* Allocate and set up 1 RX queue per Ethernet port. */
+ for (q = 0; q < rx_rings; q++) {
+ retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+ rte_eth_dev_socket_id(port), NULL, mbuf_pool);
+ if (retval < 0)
+ return retval;
+ }
+
+ txconf = dev_info.default_txconf;
+ txconf.offloads = port_conf_default.txmode.offloads;
+ /* Allocate and set up 1 TX queue per Ethernet port. */
+ for (q = 0; q < tx_rings; q++) {
+ retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+ rte_eth_dev_socket_id(port), &txconf);
+ if (retval < 0)
+ return retval;
+ }
+
+ /* Start the Ethernet port. */
+ retval = rte_eth_dev_start(port);
+ if (retval < 0)
+ return retval;
+
+ /* Display the port MAC address. */
+ struct ether_addr addr;
+ rte_eth_macaddr_get(port, &addr);
+ printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+ " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+ (unsigned int)port,
+ addr.addr_bytes[0], addr.addr_bytes[1],
+ addr.addr_bytes[2], addr.addr_bytes[3],
+ addr.addr_bytes[4], addr.addr_bytes[5]);
+
+ /* Enable RX in promiscuous mode for the Ethernet device. */
+ rte_eth_promiscuous_enable(port);
+
+ return 0;
+}
+
+static int
+init_ports(uint16_t num_ports)
+{
+ uint16_t portid, i;
+
+ if (!cdata.num_mbuf)
+ cdata.num_mbuf = 16384 * num_ports;
+
+ struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+ /* mbufs */ cdata.num_mbuf,
+ /* cache_size */ 512,
+ /* priv_size*/ 0,
+ /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+ rte_socket_id());
+
+ RTE_ETH_FOREACH_DEV(portid)
+ if (port_init(portid, mp) != 0)
+ rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
+ portid);
+
+ RTE_ETH_FOREACH_DEV(i) {
+ void *userdata = (void *)(uintptr_t) i;
+ fdata->tx_buf[i] =
+ rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
+ if (fdata->tx_buf[i] == NULL)
+ rte_panic("Out of memory\n");
+ rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
+ rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
+ eth_tx_buffer_retry,
+ userdata);
+ }
+
+ return 0;
+}
+
+static void
+do_capability_setup(uint8_t eventdev_id)
+{
+ uint16_t i;
+ uint8_t mt_unsafe = 0;
+ uint8_t burst = 0;
+
+ RTE_ETH_FOREACH_DEV(i) {
+ struct rte_eth_dev_info dev_info;
+ memset(&dev_info, 0, sizeof(struct rte_eth_dev_info));
+
+ rte_eth_dev_info_get(i, &dev_info);
+ /* Check if it is safe ask worker to tx. */
+ mt_unsafe |= !(dev_info.tx_offload_capa &
+ DEV_TX_OFFLOAD_MT_LOCKFREE);
+ }
+
+ struct rte_event_dev_info eventdev_info;
+ memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+
+ rte_event_dev_info_get(eventdev_id, &eventdev_info);
+ burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
+ 0;
+
+ if (mt_unsafe)
+ set_worker_generic_setup_data(&fdata->cap, burst);
+ else
+ set_worker_tx_setup_data(&fdata->cap, burst);
+}
+
+static void
+signal_handler(int signum)
+{
+ if (fdata->done)
+ rte_exit(1, "Exiting on signal %d\n", signum);
+ if (signum == SIGINT || signum == SIGTERM) {
+ printf("\n\nSignal %d received, preparing to exit...\n",
+ signum);
+ fdata->done = 1;
+ }
+ if (signum == SIGTSTP)
+ rte_event_dev_dump(0, stdout);
+}
+
+static inline uint64_t
+port_stat(int dev_id, int32_t p)
+{
+ char statname[64];
+ snprintf(statname, sizeof(statname), "port_%u_rx", p);
+ return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
+}
+
+int
+main(int argc, char **argv)
+{
+ struct worker_data *worker_data;
+ uint16_t num_ports;
+ int lcore_id;
+ int err;
+
+ signal(SIGINT, signal_handler);
+ signal(SIGTERM, signal_handler);
+ signal(SIGTSTP, signal_handler);
+
+ err = rte_eal_init(argc, argv);
+ if (err < 0)
+ rte_panic("Invalid EAL arguments\n");
+
+ argc -= err;
+ argv += err;
+
+ fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
+ if (fdata == NULL)
+ rte_panic("Out of memory\n");
+
+ /* Parse cli options*/
+ parse_app_args(argc, argv);
+
+ num_ports = rte_eth_dev_count_avail();
+ if (num_ports == 0)
+ rte_panic("No ethernet ports found\n");
+
+ const unsigned int cores_needed = cdata.active_cores;
+
+ if (!cdata.quiet) {
+ printf(" Config:\n");
+ printf("\tports: %u\n", num_ports);
+ printf("\tworkers: %u\n", cdata.num_workers);
+ printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
+ printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
+ if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
+ printf("\tqid0 type: ordered\n");
+ if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
+ printf("\tqid0 type: atomic\n");
+ printf("\tCores available: %u\n", rte_lcore_count());
+ printf("\tCores used: %u\n", cores_needed);
+ }
+
+ if (rte_lcore_count() < cores_needed)
+ rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
+ cores_needed);
+
+ const unsigned int ndevs = rte_event_dev_count();
+ if (ndevs == 0)
+ rte_panic("No dev_id devs found. Pasl in a --vdev eventdev.\n");
+ if (ndevs > 1)
+ fprintf(stderr, "Warning: More than one eventdev, using idx 0");
+
+
+ do_capability_setup(0);
+ fdata->cap.check_opt();
+
+ worker_data = rte_calloc(0, cdata.num_workers,
+ sizeof(worker_data[0]), 0);
+ if (worker_data == NULL)
+ rte_panic("rte_calloc failed\n");
+
+ int dev_id = fdata->cap.evdev_setup(&cons_data, worker_data);
+ if (dev_id < 0)
+ rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");
+
+ init_ports(num_ports);
+ fdata->cap.adptr_setup(num_ports);
+
+ int worker_idx = 0;
+ RTE_LCORE_FOREACH_SLAVE(lcore_id) {
+ if (lcore_id >= MAX_NUM_CORE)
+ break;
+
+ if (!fdata->rx_core[lcore_id] &&
+ !fdata->worker_core[lcore_id] &&
+ !fdata->tx_core[lcore_id] &&
+ !fdata->sched_core[lcore_id])
+ continue;
+
+ if (fdata->rx_core[lcore_id])
+ printf(
+ "[%s()] lcore %d executing NIC Rx\n",
+ __func__, lcore_id);
+
+ if (fdata->tx_core[lcore_id])
+ printf(
+ "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
+ __func__, lcore_id, cons_data.port_id);
+
+ if (fdata->sched_core[lcore_id])
+ printf("[%s()] lcore %d executing scheduler\n",
+ __func__, lcore_id);
+
+ if (fdata->worker_core[lcore_id])
+ printf(
+ "[%s()] lcore %d executing worker, using eventdev port %u\n",
+ __func__, lcore_id,
+ worker_data[worker_idx].port_id);
+
+ err = rte_eal_remote_launch(fdata->cap.worker,
+ &worker_data[worker_idx], lcore_id);
+ if (err) {
+ rte_panic("Failed to launch worker on core %d\n",
+ lcore_id);
+ continue;
+ }
+ if (fdata->worker_core[lcore_id])
+ worker_idx++;
+ }
+
+ lcore_id = rte_lcore_id();
+
+ if (core_in_use(lcore_id))
+ fdata->cap.worker(&worker_data[worker_idx++]);
+
+ rte_eal_mp_wait_lcore();
+
+ if (cdata.dump_dev)
+ rte_event_dev_dump(dev_id, stdout);
+
+ if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
+ (uint64_t)-ENOTSUP)) {
+ printf("\nPort Workload distribution:\n");
+ uint32_t i;
+ uint64_t tot_pkts = 0;
+ uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
+ for (i = 0; i < cdata.num_workers; i++) {
+ pkts_per_wkr[i] =
+ port_stat(dev_id, worker_data[i].port_id);
+ tot_pkts += pkts_per_wkr[i];
+ }
+ for (i = 0; i < cdata.num_workers; i++) {
+ float pc = pkts_per_wkr[i] * 100 /
+ ((float)tot_pkts);
+ printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
+ i, pc, pkts_per_wkr[i]);
+ }
+
+ }
+
+ return 0;
+}
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/meson.build b/src/spdk/dpdk/examples/eventdev_pipeline/meson.build
new file mode 100644
index 00000000..0fc916b0
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/meson.build
@@ -0,0 +1,15 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2017 Intel Corporation
+
+# meson file, for building this example as part of a main DPDK build.
+#
+# To build this example as a standalone application with an already-installed
+# DPDK instance, use 'make'
+
+deps += 'eventdev'
+allow_experimental_apis = true
+sources = files(
+ 'main.c',
+ 'pipeline_worker_generic.c',
+ 'pipeline_worker_tx.c'
+)
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_common.h b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_common.h
new file mode 100644
index 00000000..9703396f
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_common.h
@@ -0,0 +1,153 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2016 Intel Corporation.
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include <stdbool.h>
+
+#include <rte_eal.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_launch.h>
+#include <rte_malloc.h>
+#include <rte_random.h>
+#include <rte_cycles.h>
+#include <rte_ethdev.h>
+#include <rte_eventdev.h>
+#include <rte_event_eth_rx_adapter.h>
+#include <rte_service.h>
+#include <rte_service_component.h>
+
+#define MAX_NUM_STAGES 8
+#define BATCH_SIZE 16
+#define MAX_NUM_CORE 64
+
+struct cons_data {
+ uint8_t dev_id;
+ uint8_t port_id;
+ uint8_t release;
+} __rte_cache_aligned;
+
+struct worker_data {
+ uint8_t dev_id;
+ uint8_t port_id;
+} __rte_cache_aligned;
+
+typedef int (*worker_loop)(void *);
+typedef int (*consumer_loop)(void);
+typedef void (*schedule_loop)(unsigned int);
+typedef int (*eventdev_setup)(struct cons_data *, struct worker_data *);
+typedef void (*rx_adapter_setup)(uint16_t nb_ports);
+typedef void (*opt_check)(void);
+
+struct setup_data {
+ worker_loop worker;
+ consumer_loop consumer;
+ schedule_loop scheduler;
+ eventdev_setup evdev_setup;
+ rx_adapter_setup adptr_setup;
+ opt_check check_opt;
+};
+
+struct fastpath_data {
+ volatile int done;
+ uint32_t tx_lock;
+ uint32_t evdev_service_id;
+ uint32_t rxadptr_service_id;
+ bool rx_single;
+ bool tx_single;
+ bool sched_single;
+ unsigned int rx_core[MAX_NUM_CORE];
+ unsigned int tx_core[MAX_NUM_CORE];
+ unsigned int sched_core[MAX_NUM_CORE];
+ unsigned int worker_core[MAX_NUM_CORE];
+ struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
+ struct setup_data cap;
+} __rte_cache_aligned;
+
+struct config_data {
+ unsigned int active_cores;
+ unsigned int num_workers;
+ int64_t num_packets;
+ uint64_t num_mbuf;
+ unsigned int num_fids;
+ int queue_type;
+ int worker_cycles;
+ int enable_queue_priorities;
+ int quiet;
+ int dump_dev;
+ int dump_dev_signal;
+ int all_type_queues;
+ unsigned int num_stages;
+ unsigned int worker_cq_depth;
+ unsigned int rx_stride;
+ /* Use rx stride value to reduce congestion in entry queue when using
+ * multiple eth ports by forming multiple event queue pipelines.
+ */
+ int16_t next_qid[MAX_NUM_STAGES+2];
+ int16_t qid[MAX_NUM_STAGES];
+ uint8_t rx_adapter_id;
+ uint64_t worker_lcore_mask;
+ uint64_t rx_lcore_mask;
+ uint64_t tx_lcore_mask;
+ uint64_t sched_lcore_mask;
+};
+
+struct port_link {
+ uint8_t queue_id;
+ uint8_t priority;
+};
+
+struct cons_data cons_data;
+
+struct fastpath_data *fdata;
+struct config_data cdata;
+
+static __rte_always_inline void
+exchange_mac(struct rte_mbuf *m)
+{
+ struct ether_hdr *eth;
+ struct ether_addr addr;
+
+ /* change mac addresses on packet (to use mbuf data) */
+ eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
+ ether_addr_copy(&eth->d_addr, &addr);
+ ether_addr_copy(&addr, &eth->d_addr);
+}
+
+static __rte_always_inline void
+work(void)
+{
+ /* do a number of cycles of work per packet */
+ volatile uint64_t start_tsc = rte_rdtsc();
+ while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
+ rte_pause();
+}
+
+static __rte_always_inline void
+schedule_devices(unsigned int lcore_id)
+{
+ if (fdata->rx_core[lcore_id]) {
+ rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
+ !fdata->rx_single);
+ }
+
+ if (fdata->sched_core[lcore_id]) {
+ rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
+ !fdata->sched_single);
+ if (cdata.dump_dev_signal) {
+ rte_event_dev_dump(0, stdout);
+ cdata.dump_dev_signal = 0;
+ }
+ }
+
+ if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
+ rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
+ fdata->cap.consumer();
+ rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
+ }
+}
+
+void set_worker_generic_setup_data(struct setup_data *caps, bool burst);
+void set_worker_tx_setup_data(struct setup_data *caps, bool burst);
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_generic.c b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_generic.c
new file mode 100644
index 00000000..2215e9eb
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_generic.c
@@ -0,0 +1,568 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2016 Intel Corporation.
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include "pipeline_common.h"
+
+static __rte_always_inline int
+worker_generic(void *arg)
+{
+ struct rte_event ev;
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev_id = data->dev_id;
+ uint8_t port_id = data->port_id;
+ size_t sent = 0, received = 0;
+ unsigned int lcore_id = rte_lcore_id();
+
+ while (!fdata->done) {
+
+ if (fdata->cap.scheduler)
+ fdata->cap.scheduler(lcore_id);
+
+ if (!fdata->worker_core[lcore_id]) {
+ rte_pause();
+ continue;
+ }
+
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+ &ev, 1, 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received++;
+
+ /* The first worker stage does classification */
+ if (ev.queue_id == cdata.qid[0])
+ ev.flow_id = ev.mbuf->hash.rss
+ % cdata.num_fids;
+
+ ev.queue_id = cdata.next_qid[ev.queue_id];
+ ev.op = RTE_EVENT_OP_FORWARD;
+ ev.sched_type = cdata.queue_type;
+
+ work();
+
+ while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
+ rte_pause();
+ sent++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu TX=%zu\n",
+ rte_lcore_id(), received, sent);
+
+ return 0;
+}
+
+static int
+worker_generic_burst(void *arg)
+{
+ struct rte_event events[BATCH_SIZE];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev_id = data->dev_id;
+ uint8_t port_id = data->port_id;
+ size_t sent = 0, received = 0;
+ unsigned int lcore_id = rte_lcore_id();
+
+ while (!fdata->done) {
+ uint16_t i;
+
+ if (fdata->cap.scheduler)
+ fdata->cap.scheduler(lcore_id);
+
+ if (!fdata->worker_core[lcore_id]) {
+ rte_pause();
+ continue;
+ }
+
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+ events, RTE_DIM(events), 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+
+ /* The first worker stage does classification */
+ if (events[i].queue_id == cdata.qid[0])
+ events[i].flow_id = events[i].mbuf->hash.rss
+ % cdata.num_fids;
+
+ events[i].queue_id = cdata.next_qid[events[i].queue_id];
+ events[i].op = RTE_EVENT_OP_FORWARD;
+ events[i].sched_type = cdata.queue_type;
+
+ work();
+ }
+ uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+ events, nb_rx);
+ while (nb_tx < nb_rx && !fdata->done)
+ nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+ events + nb_tx,
+ nb_rx - nb_tx);
+ sent += nb_tx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu TX=%zu\n",
+ rte_lcore_id(), received, sent);
+
+ return 0;
+}
+
+static __rte_always_inline int
+consumer(void)
+{
+ const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+ struct rte_event packet;
+
+ static uint64_t received;
+ static uint64_t last_pkts;
+ static uint64_t last_time;
+ static uint64_t start_time;
+ int i;
+ uint8_t dev_id = cons_data.dev_id;
+ uint8_t port_id = cons_data.port_id;
+
+ do {
+ uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+ &packet, 1, 0);
+
+ if (n == 0) {
+ RTE_ETH_FOREACH_DEV(i)
+ rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
+ return 0;
+ }
+ if (start_time == 0)
+ last_time = start_time = rte_get_timer_cycles();
+
+ received++;
+ uint8_t outport = packet.mbuf->port;
+
+ exchange_mac(packet.mbuf);
+ rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+ packet.mbuf);
+
+ if (cons_data.release)
+ rte_event_enqueue_burst(dev_id, port_id,
+ &packet, n);
+
+ /* Print out mpps every 1<22 packets */
+ if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+ const uint64_t now = rte_get_timer_cycles();
+ const uint64_t total_ms = (now - start_time) / freq_khz;
+ const uint64_t delta_ms = (now - last_time) / freq_khz;
+ uint64_t delta_pkts = received - last_pkts;
+
+ printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
+ "avg %.3f mpps [current %.3f mpps]\n",
+ __func__,
+ received,
+ total_ms,
+ received / (total_ms * 1000.0),
+ delta_pkts / (delta_ms * 1000.0));
+ last_pkts = received;
+ last_time = now;
+ }
+
+ cdata.num_packets--;
+ if (cdata.num_packets <= 0)
+ fdata->done = 1;
+ /* Be stuck in this loop if single. */
+ } while (!fdata->done && fdata->tx_single);
+
+ return 0;
+}
+
+static __rte_always_inline int
+consumer_burst(void)
+{
+ const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+ struct rte_event packets[BATCH_SIZE];
+
+ static uint64_t received;
+ static uint64_t last_pkts;
+ static uint64_t last_time;
+ static uint64_t start_time;
+ unsigned int i, j;
+ uint8_t dev_id = cons_data.dev_id;
+ uint8_t port_id = cons_data.port_id;
+
+ do {
+ uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+ packets, RTE_DIM(packets), 0);
+
+ if (n == 0) {
+ RTE_ETH_FOREACH_DEV(j)
+ rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
+ return 0;
+ }
+ if (start_time == 0)
+ last_time = start_time = rte_get_timer_cycles();
+
+ received += n;
+ for (i = 0; i < n; i++) {
+ uint8_t outport = packets[i].mbuf->port;
+
+ exchange_mac(packets[i].mbuf);
+ rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+ packets[i].mbuf);
+
+ packets[i].op = RTE_EVENT_OP_RELEASE;
+ }
+
+ if (cons_data.release) {
+ uint16_t nb_tx;
+
+ nb_tx = rte_event_enqueue_burst(dev_id, port_id,
+ packets, n);
+ while (nb_tx < n)
+ nb_tx += rte_event_enqueue_burst(dev_id,
+ port_id, packets + nb_tx,
+ n - nb_tx);
+ }
+
+ /* Print out mpps every 1<22 packets */
+ if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+ const uint64_t now = rte_get_timer_cycles();
+ const uint64_t total_ms = (now - start_time) / freq_khz;
+ const uint64_t delta_ms = (now - last_time) / freq_khz;
+ uint64_t delta_pkts = received - last_pkts;
+
+ printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
+ "avg %.3f mpps [current %.3f mpps]\n",
+ received,
+ total_ms,
+ received / (total_ms * 1000.0),
+ delta_pkts / (delta_ms * 1000.0));
+ last_pkts = received;
+ last_time = now;
+ }
+
+ cdata.num_packets -= n;
+ if (cdata.num_packets <= 0)
+ fdata->done = 1;
+ /* Be stuck in this loop if single. */
+ } while (!fdata->done && fdata->tx_single);
+
+ return 0;
+}
+
+static int
+setup_eventdev_generic(struct cons_data *cons_data,
+ struct worker_data *worker_data)
+{
+ const uint8_t dev_id = 0;
+ /* +1 stages is for a SINGLE_LINK TX stage */
+ const uint8_t nb_queues = cdata.num_stages + 1;
+ /* + 1 is one port for consumer */
+ const uint8_t nb_ports = cdata.num_workers + 1;
+ struct rte_event_dev_config config = {
+ .nb_event_queues = nb_queues,
+ .nb_event_ports = nb_ports,
+ .nb_events_limit = 4096,
+ .nb_event_queue_flows = 1024,
+ .nb_event_port_dequeue_depth = 128,
+ .nb_event_port_enqueue_depth = 128,
+ };
+ struct rte_event_port_conf wkr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
+ };
+ struct rte_event_queue_conf wkr_q_conf = {
+ .schedule_type = cdata.queue_type,
+ .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+ .nb_atomic_flows = 1024,
+ .nb_atomic_order_sequences = 1024,
+ };
+ struct rte_event_port_conf tx_p_conf = {
+ .dequeue_depth = 128,
+ .enqueue_depth = 128,
+ .new_event_threshold = 4096,
+ };
+ struct rte_event_queue_conf tx_q_conf = {
+ .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
+ .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
+ };
+
+ struct port_link worker_queues[MAX_NUM_STAGES];
+ uint8_t disable_implicit_release;
+ struct port_link tx_queue;
+ unsigned int i;
+
+ int ret, ndev = rte_event_dev_count();
+ if (ndev < 1) {
+ printf("%d: No Eventdev Devices Found\n", __LINE__);
+ return -1;
+ }
+
+ struct rte_event_dev_info dev_info;
+ ret = rte_event_dev_info_get(dev_id, &dev_info);
+ printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+ disable_implicit_release = (dev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+ wkr_p_conf.disable_implicit_release = disable_implicit_release;
+ tx_p_conf.disable_implicit_release = disable_implicit_release;
+
+ if (dev_info.max_event_port_dequeue_depth <
+ config.nb_event_port_dequeue_depth)
+ config.nb_event_port_dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (dev_info.max_event_port_enqueue_depth <
+ config.nb_event_port_enqueue_depth)
+ config.nb_event_port_enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
+
+ ret = rte_event_dev_configure(dev_id, &config);
+ if (ret < 0) {
+ printf("%d: Error configuring device\n", __LINE__);
+ return -1;
+ }
+
+ /* Q creation - one load balanced per pipeline stage*/
+ printf(" Stages:\n");
+ for (i = 0; i < cdata.num_stages; i++) {
+ if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+ printf("%d: error creating qid %d\n", __LINE__, i);
+ return -1;
+ }
+ cdata.qid[i] = i;
+ cdata.next_qid[i] = i+1;
+ worker_queues[i].queue_id = i;
+ if (cdata.enable_queue_priorities) {
+ /* calculate priority stepping for each stage, leaving
+ * headroom of 1 for the SINGLE_LINK TX below
+ */
+ const uint32_t prio_delta =
+ (RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;
+
+ /* higher priority for queues closer to tx */
+ wkr_q_conf.priority =
+ RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
+ }
+
+ const char *type_str = "Atomic";
+ switch (wkr_q_conf.schedule_type) {
+ case RTE_SCHED_TYPE_ORDERED:
+ type_str = "Ordered";
+ break;
+ case RTE_SCHED_TYPE_PARALLEL:
+ type_str = "Parallel";
+ break;
+ }
+ printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+ wkr_q_conf.priority);
+ }
+ printf("\n");
+
+ /* final queue for sending to TX core */
+ if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
+ printf("%d: error creating qid %d\n", __LINE__, i);
+ return -1;
+ }
+ tx_queue.queue_id = i;
+ tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
+
+ if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+ wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+ if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+ wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+ /* set up one port per worker, linking to all stage queues */
+ for (i = 0; i < cdata.num_workers; i++) {
+ struct worker_data *w = &worker_data[i];
+ w->dev_id = dev_id;
+ if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+ printf("Error setting up port %d\n", i);
+ return -1;
+ }
+
+ uint32_t s;
+ for (s = 0; s < cdata.num_stages; s++) {
+ if (rte_event_port_link(dev_id, i,
+ &worker_queues[s].queue_id,
+ &worker_queues[s].priority,
+ 1) != 1) {
+ printf("%d: error creating link for port %d\n",
+ __LINE__, i);
+ return -1;
+ }
+ }
+ w->port_id = i;
+ }
+
+ if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+ tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+ if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+ tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+ /* port for consumer, linked to TX queue */
+ if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
+ printf("Error setting up port %d\n", i);
+ return -1;
+ }
+ if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
+ &tx_queue.priority, 1) != 1) {
+ printf("%d: error creating link for port %d\n",
+ __LINE__, i);
+ return -1;
+ }
+ *cons_data = (struct cons_data){.dev_id = dev_id,
+ .port_id = i,
+ .release = disable_implicit_release };
+
+ ret = rte_event_dev_service_id_get(dev_id,
+ &fdata->evdev_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ printf("Error getting the service ID for sw eventdev\n");
+ return -1;
+ }
+ rte_service_runstate_set(fdata->evdev_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
+ if (rte_event_dev_start(dev_id) < 0) {
+ printf("Error starting eventdev\n");
+ return -1;
+ }
+
+ return dev_id;
+}
+
+static void
+init_rx_adapter(uint16_t nb_ports)
+{
+ int i;
+ int ret;
+ uint8_t evdev_id = 0;
+ struct rte_event_dev_info dev_info;
+
+ ret = rte_event_dev_info_get(evdev_id, &dev_info);
+
+ struct rte_event_port_conf rx_p_conf = {
+ .dequeue_depth = 8,
+ .enqueue_depth = 8,
+ .new_event_threshold = 1200,
+ };
+
+ if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+ if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+ /* Create one adapter for all the ethernet ports. */
+ ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
+ &rx_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
+ cdata.rx_adapter_id);
+
+ struct rte_event_eth_rx_adapter_queue_conf queue_conf;
+ memset(&queue_conf, 0, sizeof(queue_conf));
+ queue_conf.ev.sched_type = cdata.queue_type;
+ queue_conf.ev.queue_id = cdata.qid[0];
+
+ for (i = 0; i < nb_ports; i++) {
+ uint32_t cap;
+
+ ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to get event rx adapter "
+ "capabilities");
+
+ ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
+ -1, &queue_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Rx adapter");
+ }
+
+ ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
+ &fdata->rxadptr_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ rte_exit(EXIT_FAILURE,
+ "Error getting the service ID for sw eventdev\n");
+ }
+ rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
+
+ ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+ cdata.rx_adapter_id);
+}
+
+static void
+generic_opt_check(void)
+{
+ int i;
+ int ret;
+ uint32_t cap = 0;
+ uint8_t rx_needed = 0;
+ struct rte_event_dev_info eventdev_info;
+
+ memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+ rte_event_dev_info_get(0, &eventdev_info);
+
+ if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
+ rte_exit(EXIT_FAILURE,
+ "Event dev doesn't support all type queues\n");
+
+ RTE_ETH_FOREACH_DEV(i) {
+ ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to get event rx adapter capabilities");
+ rx_needed |=
+ !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+ }
+
+ if (cdata.worker_lcore_mask == 0 ||
+ (rx_needed && cdata.rx_lcore_mask == 0) ||
+ cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
+ && !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ printf("Core part of pipeline was not assigned any cores. "
+ "This will stall the pipeline, please check core masks "
+ "(use -h for details on setting core masks):\n"
+ "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
+ "\n\tworkers: %"PRIu64"\n",
+ cdata.rx_lcore_mask, cdata.tx_lcore_mask,
+ cdata.sched_lcore_mask,
+ cdata.worker_lcore_mask);
+ rte_exit(-1, "Fix core masks\n");
+ }
+
+ if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+ memset(fdata->sched_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+}
+
+void
+set_worker_generic_setup_data(struct setup_data *caps, bool burst)
+{
+ if (burst) {
+ caps->consumer = consumer_burst;
+ caps->worker = worker_generic_burst;
+ } else {
+ caps->consumer = consumer;
+ caps->worker = worker_generic;
+ }
+
+ caps->adptr_setup = init_rx_adapter;
+ caps->scheduler = schedule_devices;
+ caps->evdev_setup = setup_eventdev_generic;
+ caps->check_opt = generic_opt_check;
+}
diff --git a/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_tx.c b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_tx.c
new file mode 100644
index 00000000..3dbde92d
--- /dev/null
+++ b/src/spdk/dpdk/examples/eventdev_pipeline/pipeline_worker_tx.c
@@ -0,0 +1,838 @@
+/*
+ * SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2010-2014 Intel Corporation
+ * Copyright 2017 Cavium, Inc.
+ */
+
+#include "pipeline_common.h"
+
+static __rte_always_inline void
+worker_fwd_event(struct rte_event *ev, uint8_t sched)
+{
+ ev->event_type = RTE_EVENT_TYPE_CPU;
+ ev->op = RTE_EVENT_OP_FORWARD;
+ ev->sched_type = sched;
+}
+
+static __rte_always_inline void
+worker_event_enqueue(const uint8_t dev, const uint8_t port,
+ struct rte_event *ev)
+{
+ while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+ rte_pause();
+}
+
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+ struct rte_event *ev, const uint16_t nb_rx)
+{
+ uint16_t enq;
+
+ enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+ while (enq < nb_rx) {
+ enq += rte_event_enqueue_burst(dev, port,
+ ev + enq, nb_rx - enq);
+ }
+}
+
+static __rte_always_inline void
+worker_tx_pkt(struct rte_mbuf *mbuf)
+{
+ exchange_mac(mbuf);
+ while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+ rte_pause();
+}
+
+/* Single stage pipeline workers */
+
+static int
+worker_do_tx_single(void *arg)
+{
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+ struct rte_event ev;
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+ work();
+ ev.queue_id++;
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
+static int
+worker_do_tx_single_atq(void *arg)
+{
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+ struct rte_event ev;
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+ work();
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
+static int
+worker_do_tx_single_burst(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE + 1];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+ BATCH_SIZE, 0);
+
+ if (!nb_rx) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ rte_prefetch0(ev[i + 1].mbuf);
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+ worker_tx_pkt(ev[i].mbuf);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ tx++;
+
+ } else {
+ ev[i].queue_id++;
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ }
+ work();
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
+static int
+worker_do_tx_single_burst_atq(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE + 1];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
+ BATCH_SIZE, 0);
+
+ if (!nb_rx) {
+ rte_pause();
+ continue;
+ }
+
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ rte_prefetch0(ev[i + 1].mbuf);
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+
+ worker_tx_pkt(ev[i].mbuf);
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ tx++;
+ } else
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ work();
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+ return 0;
+}
+
+/* Multi stage Pipeline Workers */
+
+static int
+worker_do_tx(void *arg)
+{
+ struct rte_event ev;
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ const uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+ const uint8_t cq_id = ev.queue_id % cdata.num_stages;
+
+ if (cq_id >= lst_qid) {
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ ev.queue_id = (cq_id == lst_qid) ?
+ cdata.next_qid[ev.queue_id] : ev.queue_id;
+ } else {
+ ev.queue_id = cdata.next_qid[ev.queue_id];
+ worker_fwd_event(&ev, cdata.queue_type);
+ }
+ work();
+
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
+static int
+worker_do_tx_atq(void *arg)
+{
+ struct rte_event ev;
+
+ struct worker_data *data = (struct worker_data *)arg;
+ const uint8_t dev = data->dev_id;
+ const uint8_t port = data->port_id;
+ const uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+
+ if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) {
+ rte_pause();
+ continue;
+ }
+
+ received++;
+ const uint8_t cq_id = ev.sub_event_type % cdata.num_stages;
+
+ if (cq_id == lst_qid) {
+ if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev.mbuf);
+ tx++;
+ continue;
+ }
+
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ } else {
+ ev.sub_event_type++;
+ worker_fwd_event(&ev, cdata.queue_type);
+ }
+ work();
+
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
+static int
+worker_do_tx_burst(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev = data->dev_id;
+ uint8_t port = data->port_id;
+ uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+ ev, BATCH_SIZE, 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+ if (cq_id >= lst_qid) {
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev[i].mbuf);
+ tx++;
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ continue;
+ }
+ ev[i].queue_id = (cq_id == lst_qid) ?
+ cdata.next_qid[ev[i].queue_id] :
+ ev[i].queue_id;
+
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ } else {
+ ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
+ worker_fwd_event(&ev[i], cdata.queue_type);
+ }
+ work();
+ }
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
+static int
+worker_do_tx_burst_atq(void *arg)
+{
+ struct rte_event ev[BATCH_SIZE];
+
+ struct worker_data *data = (struct worker_data *)arg;
+ uint8_t dev = data->dev_id;
+ uint8_t port = data->port_id;
+ uint8_t lst_qid = cdata.num_stages - 1;
+ size_t fwd = 0, received = 0, tx = 0;
+
+ while (!fdata->done) {
+ uint16_t i;
+
+ const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+ ev, BATCH_SIZE, 0);
+
+ if (nb_rx == 0) {
+ rte_pause();
+ continue;
+ }
+ received += nb_rx;
+
+ for (i = 0; i < nb_rx; i++) {
+ const uint8_t cq_id = ev[i].sub_event_type %
+ cdata.num_stages;
+
+ if (cq_id == lst_qid) {
+ if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+ worker_tx_pkt(ev[i].mbuf);
+ tx++;
+ ev[i].op = RTE_EVENT_OP_RELEASE;
+ continue;
+ }
+
+ worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+ } else {
+ ev[i].sub_event_type++;
+ worker_fwd_event(&ev[i], cdata.queue_type);
+ }
+ work();
+ }
+
+ worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_rx;
+ }
+
+ if (!cdata.quiet)
+ printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+ rte_lcore_id(), received, fwd, tx);
+
+ return 0;
+}
+
+static int
+setup_eventdev_worker_tx(struct cons_data *cons_data,
+ struct worker_data *worker_data)
+{
+ RTE_SET_USED(cons_data);
+ uint8_t i;
+ const uint8_t atq = cdata.all_type_queues ? 1 : 0;
+ const uint8_t dev_id = 0;
+ const uint8_t nb_ports = cdata.num_workers;
+ uint8_t nb_slots = 0;
+ uint8_t nb_queues = rte_eth_dev_count_avail();
+
+ /*
+ * In case where all type queues are not enabled, use queues equal to
+ * number of stages * eth_dev_count and one extra queue per pipeline
+ * for Tx.
+ */
+ if (!atq) {
+ nb_queues *= cdata.num_stages;
+ nb_queues += rte_eth_dev_count_avail();
+ }
+
+ struct rte_event_dev_config config = {
+ .nb_event_queues = nb_queues,
+ .nb_event_ports = nb_ports,
+ .nb_events_limit = 4096,
+ .nb_event_queue_flows = 1024,
+ .nb_event_port_dequeue_depth = 128,
+ .nb_event_port_enqueue_depth = 128,
+ };
+ struct rte_event_port_conf wkr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
+ };
+ struct rte_event_queue_conf wkr_q_conf = {
+ .schedule_type = cdata.queue_type,
+ .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
+ .nb_atomic_flows = 1024,
+ .nb_atomic_order_sequences = 1024,
+ };
+
+ int ret, ndev = rte_event_dev_count();
+
+ if (ndev < 1) {
+ printf("%d: No Eventdev Devices Found\n", __LINE__);
+ return -1;
+ }
+
+
+ struct rte_event_dev_info dev_info;
+ ret = rte_event_dev_info_get(dev_id, &dev_info);
+ printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+
+ if (dev_info.max_event_port_dequeue_depth <
+ config.nb_event_port_dequeue_depth)
+ config.nb_event_port_dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (dev_info.max_event_port_enqueue_depth <
+ config.nb_event_port_enqueue_depth)
+ config.nb_event_port_enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
+
+ ret = rte_event_dev_configure(dev_id, &config);
+ if (ret < 0) {
+ printf("%d: Error configuring device\n", __LINE__);
+ return -1;
+ }
+
+ printf(" Stages:\n");
+ for (i = 0; i < nb_queues; i++) {
+
+ if (atq) {
+
+ nb_slots = cdata.num_stages;
+ wkr_q_conf.event_queue_cfg =
+ RTE_EVENT_QUEUE_CFG_ALL_TYPES;
+ } else {
+ uint8_t slot;
+
+ nb_slots = cdata.num_stages + 1;
+ slot = i % nb_slots;
+ wkr_q_conf.schedule_type = slot == cdata.num_stages ?
+ RTE_SCHED_TYPE_ATOMIC : cdata.queue_type;
+ }
+
+ if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
+ printf("%d: error creating qid %d\n", __LINE__, i);
+ return -1;
+ }
+ cdata.qid[i] = i;
+ cdata.next_qid[i] = i+1;
+ if (cdata.enable_queue_priorities) {
+ const uint32_t prio_delta =
+ (RTE_EVENT_DEV_PRIORITY_LOWEST) /
+ nb_slots;
+
+ /* higher priority for queues closer to tx */
+ wkr_q_conf.priority =
+ RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta *
+ (i % nb_slots);
+ }
+
+ const char *type_str = "Atomic";
+ switch (wkr_q_conf.schedule_type) {
+ case RTE_SCHED_TYPE_ORDERED:
+ type_str = "Ordered";
+ break;
+ case RTE_SCHED_TYPE_PARALLEL:
+ type_str = "Parallel";
+ break;
+ }
+ printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
+ wkr_q_conf.priority);
+ }
+
+ printf("\n");
+ if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
+ wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
+ if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
+ wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
+
+ /* set up one port per worker, linking to all stage queues */
+ for (i = 0; i < cdata.num_workers; i++) {
+ struct worker_data *w = &worker_data[i];
+ w->dev_id = dev_id;
+ if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
+ printf("Error setting up port %d\n", i);
+ return -1;
+ }
+
+ if (rte_event_port_link(dev_id, i, NULL, NULL, 0)
+ != nb_queues) {
+ printf("%d: error creating link for port %d\n",
+ __LINE__, i);
+ return -1;
+ }
+ w->port_id = i;
+ }
+ /*
+ * Reduce the load on ingress event queue by splitting the traffic
+ * across multiple event queues.
+ * for example, nb_stages = 2 and nb_ethdev = 2 then
+ *
+ * nb_queues = (2 * 2) + 2 = 6 (non atq)
+ * rx_stride = 3
+ *
+ * So, traffic is split across queue 0 and queue 3 since queue id for
+ * rx adapter is chosen <ethport_id> * <rx_stride> i.e in the above
+ * case eth port 0, 1 will inject packets into event queue 0, 3
+ * respectively.
+ *
+ * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx.
+ */
+ cdata.rx_stride = atq ? 1 : nb_slots;
+ ret = rte_event_dev_service_id_get(dev_id,
+ &fdata->evdev_service_id);
+ if (ret != -ESRCH && ret != 0) {
+ printf("Error getting the service ID\n");
+ return -1;
+ }
+ rte_service_runstate_set(fdata->evdev_service_id, 1);
+ rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
+ if (rte_event_dev_start(dev_id) < 0) {
+ printf("Error starting eventdev\n");
+ return -1;
+ }
+
+ return dev_id;
+}
+
+
+struct rx_adptr_services {
+ uint16_t nb_rx_adptrs;
+ uint32_t *rx_adpt_arr;
+};
+
+static int32_t
+service_rx_adapter(void *arg)
+{
+ int i;
+ struct rx_adptr_services *adptr_services = arg;
+
+ for (i = 0; i < adptr_services->nb_rx_adptrs; i++)
+ rte_service_run_iter_on_app_lcore(
+ adptr_services->rx_adpt_arr[i], 1);
+ return 0;
+}
+
+static void
+init_rx_adapter(uint16_t nb_ports)
+{
+ int i;
+ int ret;
+ uint8_t evdev_id = 0;
+ struct rx_adptr_services *adptr_services = NULL;
+ struct rte_event_dev_info dev_info;
+
+ ret = rte_event_dev_info_get(evdev_id, &dev_info);
+ adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
+
+ struct rte_event_port_conf rx_p_conf = {
+ .dequeue_depth = 8,
+ .enqueue_depth = 8,
+ .new_event_threshold = 1200,
+ };
+
+ if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
+ if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+
+
+ struct rte_event_eth_rx_adapter_queue_conf queue_conf;
+ memset(&queue_conf, 0, sizeof(queue_conf));
+ queue_conf.ev.sched_type = cdata.queue_type;
+
+ for (i = 0; i < nb_ports; i++) {
+ uint32_t cap;
+ uint32_t service_id;
+
+ ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to create rx adapter[%d]",
+ cdata.rx_adapter_id);
+
+ ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to get event rx adapter "
+ "capabilities");
+
+ queue_conf.ev.queue_id = cdata.rx_stride ?
+ (i * cdata.rx_stride)
+ : (uint8_t)cdata.qid[0];
+
+ ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Rx adapter");
+
+
+ /* Producer needs to be scheduled. */
+ if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
+ ret = rte_event_eth_rx_adapter_service_id_get(i,
+ &service_id);
+ if (ret != -ESRCH && ret != 0) {
+ rte_exit(EXIT_FAILURE,
+ "Error getting the service ID for rx adptr\n");
+ }
+
+ rte_service_runstate_set(service_id, 1);
+ rte_service_set_runstate_mapped_check(service_id, 0);
+
+ adptr_services->nb_rx_adptrs++;
+ adptr_services->rx_adpt_arr = rte_realloc(
+ adptr_services->rx_adpt_arr,
+ adptr_services->nb_rx_adptrs *
+ sizeof(uint32_t), 0);
+ adptr_services->rx_adpt_arr[
+ adptr_services->nb_rx_adptrs - 1] =
+ service_id;
+ }
+
+ ret = rte_event_eth_rx_adapter_start(i);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
+ cdata.rx_adapter_id);
+ }
+
+ if (adptr_services->nb_rx_adptrs) {
+ struct rte_service_spec service;
+
+ memset(&service, 0, sizeof(struct rte_service_spec));
+ snprintf(service.name, sizeof(service.name), "rx_service");
+ service.callback = service_rx_adapter;
+ service.callback_userdata = (void *)adptr_services;
+
+ int32_t ret = rte_service_component_register(&service,
+ &fdata->rxadptr_service_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Rx adapter[%d] service register failed",
+ cdata.rx_adapter_id);
+
+ rte_service_runstate_set(fdata->rxadptr_service_id, 1);
+ rte_service_component_runstate_set(fdata->rxadptr_service_id,
+ 1);
+ rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id,
+ 0);
+ } else {
+ memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
+ rte_free(adptr_services);
+ }
+
+ if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
+ (dev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
+ fdata->cap.scheduler = NULL;
+
+ if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
+ memset(fdata->sched_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+}
+
+static void
+worker_tx_opt_check(void)
+{
+ int i;
+ int ret;
+ uint32_t cap = 0;
+ uint8_t rx_needed = 0;
+ struct rte_event_dev_info eventdev_info;
+
+ memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+ rte_event_dev_info_get(0, &eventdev_info);
+
+ if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
+ rte_exit(EXIT_FAILURE,
+ "Event dev doesn't support all type queues\n");
+
+ RTE_ETH_FOREACH_DEV(i) {
+ ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "failed to get event rx adapter "
+ "capabilities");
+ rx_needed |=
+ !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
+ }
+
+ if (cdata.worker_lcore_mask == 0 ||
+ (rx_needed && cdata.rx_lcore_mask == 0) ||
+ (cdata.sched_lcore_mask == 0 &&
+ !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ printf("Core part of pipeline was not assigned any cores. "
+ "This will stall the pipeline, please check core masks "
+ "(use -h for details on setting core masks):\n"
+ "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
+ "\n\tworkers: %"PRIu64"\n",
+ cdata.rx_lcore_mask, cdata.tx_lcore_mask,
+ cdata.sched_lcore_mask,
+ cdata.worker_lcore_mask);
+ rte_exit(-1, "Fix core masks\n");
+ }
+}
+
+static worker_loop
+get_worker_loop_single_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_single_burst_atq;
+
+ return worker_do_tx_single_burst;
+}
+
+static worker_loop
+get_worker_loop_single_non_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_single_atq;
+
+ return worker_do_tx_single;
+}
+
+static worker_loop
+get_worker_loop_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_burst_atq;
+
+ return worker_do_tx_burst;
+}
+
+static worker_loop
+get_worker_loop_non_burst(uint8_t atq)
+{
+ if (atq)
+ return worker_do_tx_atq;
+
+ return worker_do_tx;
+}
+
+static worker_loop
+get_worker_single_stage(bool burst)
+{
+ uint8_t atq = cdata.all_type_queues ? 1 : 0;
+
+ if (burst)
+ return get_worker_loop_single_burst(atq);
+
+ return get_worker_loop_single_non_burst(atq);
+}
+
+static worker_loop
+get_worker_multi_stage(bool burst)
+{
+ uint8_t atq = cdata.all_type_queues ? 1 : 0;
+
+ if (burst)
+ return get_worker_loop_burst(atq);
+
+ return get_worker_loop_non_burst(atq);
+}
+
+void
+set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+{
+ if (cdata.num_stages == 1)
+ caps->worker = get_worker_single_stage(burst);
+ else
+ caps->worker = get_worker_multi_stage(burst);
+
+ memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
+
+ caps->check_opt = worker_tx_opt_check;
+ caps->consumer = NULL;
+ caps->scheduler = schedule_devices;
+ caps->evdev_setup = setup_eventdev_worker_tx;
+ caps->adptr_setup = init_rx_adapter;
+}