Diffstat (limited to 'src/seastar/dpdk/examples/load_balancer')
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/Makefile     57
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/config.c   1062
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/init.c      521
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/main.c      108
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/main.h      371
-rw-r--r--  src/seastar/dpdk/examples/load_balancer/runtime.c   674
6 files changed, 2793 insertions, 0 deletions
diff --git a/src/seastar/dpdk/examples/load_balancer/Makefile b/src/seastar/dpdk/examples/load_balancer/Makefile
new file mode 100644
index 00000000..2c5fd9b0
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/Makefile
@@ -0,0 +1,57 @@
+# BSD LICENSE
+#
+# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in
+# the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Intel Corporation nor the names of its
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overridden by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = load_balancer
+
+# all source are stored in SRCS-y
+SRCS-y := main.c config.c init.c runtime.c
+
+CFLAGS += -O3 -g
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS_config.o := -D_GNU_SOURCE
+
+# workaround for a gcc bug with noreturn attribute
+# http://gcc.gnu.org/bugzilla/show_bug.cgi?id=12603
+ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
+CFLAGS_main.o += -Wno-return-type
+endif
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/src/seastar/dpdk/examples/load_balancer/config.c b/src/seastar/dpdk/examples/load_balancer/config.c
new file mode 100644
index 00000000..07f92a1a
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/config.c
@@ -0,0 +1,1062 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_per_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_lpm.h>
+#include <rte_string_fns.h>
+
+#include "main.h"
+
+struct app_params app;
+
+static const char usage[] =
+" \n"
+" load_balancer <EAL PARAMS> -- <APP PARAMS> \n"
+" \n"
+"Application manadatory parameters: \n"
+" --rx \"(PORT, QUEUE, LCORE), ...\" : List of NIC RX ports and queues \n"
+" handled by the I/O RX lcores \n"
+" --tx \"(PORT, LCORE), ...\" : List of NIC TX ports handled by the I/O TX \n"
+" lcores \n"
+" --w \"LCORE, ...\" : List of the worker lcores \n"
+" --lpm \"IP / PREFIX => PORT; ...\" : List of LPM rules used by the worker \n"
+" lcores for packet forwarding \n"
+" \n"
+"Application optional parameters: \n"
+" --rsz \"A, B, C, D\" : Ring sizes \n"
+" A = Size (in number of buffer descriptors) of each of the NIC RX \n"
+" rings read by the I/O RX lcores (default value is %u) \n"
+" B = Size (in number of elements) of each of the SW rings used by the\n"
+" I/O RX lcores to send packets to worker lcores (default value is\n"
+" %u) \n"
+" C = Size (in number of elements) of each of the SW rings used by the\n"
+" worker lcores to send packets to I/O TX lcores (default value is\n"
+" %u) \n"
+" D = Size (in number of buffer descriptors) of each of the NIC TX \n"
+" rings written by I/O TX lcores (default value is %u) \n"
+" --bsz \"(A, B), (C, D), (E, F)\" : Burst sizes \n"
+" A = I/O RX lcore read burst size from NIC RX (default value is %u) \n"
+" B = I/O RX lcore write burst size to output SW rings (default value \n"
+" is %u) \n"
+" C = Worker lcore read burst size from input SW rings (default value \n"
+" is %u) \n"
+" D = Worker lcore write burst size to output SW rings (default value \n"
+" is %u) \n"
+" E = I/O TX lcore read burst size from input SW rings (default value \n"
+" is %u) \n"
+" F = I/O TX lcore write burst size to NIC TX (default value is %u) \n"
+" --pos-lb POS : Position of the 1-byte field within the input packet used by\n"
+" the I/O RX lcores to identify the worker lcore for the current \n"
+" packet (default value is %u) \n";
+
+void
+app_print_usage(void)
+{
+ printf(usage,
+ APP_DEFAULT_NIC_RX_RING_SIZE,
+ APP_DEFAULT_RING_RX_SIZE,
+ APP_DEFAULT_RING_TX_SIZE,
+ APP_DEFAULT_NIC_TX_RING_SIZE,
+ APP_DEFAULT_BURST_SIZE_IO_RX_READ,
+ APP_DEFAULT_BURST_SIZE_IO_RX_WRITE,
+ APP_DEFAULT_BURST_SIZE_WORKER_READ,
+ APP_DEFAULT_BURST_SIZE_WORKER_WRITE,
+ APP_DEFAULT_BURST_SIZE_IO_TX_READ,
+ APP_DEFAULT_BURST_SIZE_IO_TX_WRITE,
+ APP_DEFAULT_IO_RX_LB_POS
+ );
+}
+
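+/*
+ * A hypothetical invocation, consistent with the parsers below (core and
+ * port numbers are illustrative; adjust to the target machine). EAL
+ * arguments come first, application arguments follow the "--" separator:
+ *
+ *   ./load_balancer -c 0xf8 -n 4 -- \
+ *       --rx "(0,0,3),(1,0,3)" \
+ *       --tx "(0,5),(1,5)" \
+ *       --w "6,7" \
+ *       --lpm "1.0.0.0/24=>0; 1.0.1.0/24=>1;" \
+ *       --pos-lb 29
+ */
+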
+#ifndef APP_ARG_RX_MAX_CHARS
+#define APP_ARG_RX_MAX_CHARS 4096
+#endif
+
+#ifndef APP_ARG_RX_MAX_TUPLES
+#define APP_ARG_RX_MAX_TUPLES 128
+#endif
+
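+/*
+ * Split a bounded string on 'separator' and convert each token to an
+ * unsigned integer. Returns the number of values parsed, or -1 if any
+ * token is not a valid number.
+ */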
+static int
+str_to_unsigned_array(
+ const char *s, size_t sbuflen,
+ char separator,
+ unsigned num_vals,
+ unsigned *vals)
+{
+ char str[sbuflen+1];
+ char *splits[num_vals];
+ char *endptr = NULL;
+ int i, num_splits = 0;
+
+ /* copy s so we don't modify original string */
+ snprintf(str, sizeof(str), "%s", s);
+ num_splits = rte_strsplit(str, sizeof(str), splits, num_vals, separator);
+
+ errno = 0;
+ for (i = 0; i < num_splits; i++) {
+ vals[i] = strtoul(splits[i], &endptr, 0);
+ if (errno != 0 || *endptr != '\0')
+ return -1;
+ }
+
+ return num_splits;
+}
+
+static int
+str_to_unsigned_vals(
+ const char *s,
+ size_t sbuflen,
+ char separator,
+ unsigned num_vals, ...)
+{
+	unsigned i, vals[num_vals];
+	int n;
+	va_list ap;
+
+	/* A negative (error) return from str_to_unsigned_array must not be
+	 * used as the loop bound below, since num_vals is unsigned */
+	n = str_to_unsigned_array(s, sbuflen, separator, num_vals, vals);
+	if (n < 0)
+		return n;
+
+	va_start(ap, num_vals);
+	for (i = 0; i < (unsigned) n; i++) {
+		unsigned *u = va_arg(ap, unsigned *);
+		*u = vals[i];
+	}
+	va_end(ap);
+	return n;
+}
+
+static int
+parse_arg_rx(const char *arg)
+{
+ const char *p0 = arg, *p = arg;
+ uint32_t n_tuples;
+
+ if (strnlen(arg, APP_ARG_RX_MAX_CHARS + 1) == APP_ARG_RX_MAX_CHARS + 1) {
+ return -1;
+ }
+
+ n_tuples = 0;
+ while ((p = strchr(p0,'(')) != NULL) {
+ struct app_lcore_params *lp;
+ uint32_t port, queue, lcore, i;
+
+ p0 = strchr(p++, ')');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, ',', 3, &port, &queue, &lcore) != 3)) {
+ return -2;
+ }
+
+ /* Enable port and queue for later initialization */
+ if ((port >= APP_MAX_NIC_PORTS) || (queue >= APP_MAX_RX_QUEUES_PER_NIC_PORT)) {
+ return -3;
+ }
+ if (app.nic_rx_queue_mask[port][queue] != 0) {
+ return -4;
+ }
+ app.nic_rx_queue_mask[port][queue] = 1;
+
+ /* Check and assign (port, queue) to I/O lcore */
+ if (rte_lcore_is_enabled(lcore) == 0) {
+ return -5;
+ }
+
+ if (lcore >= APP_MAX_LCORES) {
+ return -6;
+ }
+ lp = &app.lcore_params[lcore];
+ if (lp->type == e_APP_LCORE_WORKER) {
+ return -7;
+ }
+ lp->type = e_APP_LCORE_IO;
+ const size_t n_queues = RTE_MIN(lp->io.rx.n_nic_queues,
+ RTE_DIM(lp->io.rx.nic_queues));
+ for (i = 0; i < n_queues; i ++) {
+ if ((lp->io.rx.nic_queues[i].port == port) &&
+ (lp->io.rx.nic_queues[i].queue == queue)) {
+ return -8;
+ }
+ }
+ if (lp->io.rx.n_nic_queues >= APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE) {
+ return -9;
+ }
+ lp->io.rx.nic_queues[lp->io.rx.n_nic_queues].port = (uint8_t) port;
+ lp->io.rx.nic_queues[lp->io.rx.n_nic_queues].queue = (uint8_t) queue;
+ lp->io.rx.n_nic_queues ++;
+
+ n_tuples ++;
+ if (n_tuples > APP_ARG_RX_MAX_TUPLES) {
+ return -10;
+ }
+ }
+
+ if (n_tuples == 0) {
+ return -11;
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_TX_MAX_CHARS
+#define APP_ARG_TX_MAX_CHARS 4096
+#endif
+
+#ifndef APP_ARG_TX_MAX_TUPLES
+#define APP_ARG_TX_MAX_TUPLES 128
+#endif
+
+static int
+parse_arg_tx(const char *arg)
+{
+ const char *p0 = arg, *p = arg;
+ uint32_t n_tuples;
+
+ if (strnlen(arg, APP_ARG_TX_MAX_CHARS + 1) == APP_ARG_TX_MAX_CHARS + 1) {
+ return -1;
+ }
+
+ n_tuples = 0;
+ while ((p = strchr(p0,'(')) != NULL) {
+ struct app_lcore_params *lp;
+ uint32_t port, lcore, i;
+
+ p0 = strchr(p++, ')');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, ',', 2, &port, &lcore) != 2)) {
+ return -2;
+ }
+
+		/* Enable port for later initialization */
+ if (port >= APP_MAX_NIC_PORTS) {
+ return -3;
+ }
+ if (app.nic_tx_port_mask[port] != 0) {
+ return -4;
+ }
+ app.nic_tx_port_mask[port] = 1;
+
+		/* Check and assign port to I/O lcore */
+ if (rte_lcore_is_enabled(lcore) == 0) {
+ return -5;
+ }
+
+ if (lcore >= APP_MAX_LCORES) {
+ return -6;
+ }
+ lp = &app.lcore_params[lcore];
+ if (lp->type == e_APP_LCORE_WORKER) {
+ return -7;
+ }
+ lp->type = e_APP_LCORE_IO;
+ const size_t n_ports = RTE_MIN(lp->io.tx.n_nic_ports,
+ RTE_DIM(lp->io.tx.nic_ports));
+ for (i = 0; i < n_ports; i ++) {
+ if (lp->io.tx.nic_ports[i] == port) {
+ return -8;
+ }
+ }
+ if (lp->io.tx.n_nic_ports >= APP_MAX_NIC_TX_PORTS_PER_IO_LCORE) {
+ return -9;
+ }
+ lp->io.tx.nic_ports[lp->io.tx.n_nic_ports] = (uint8_t) port;
+ lp->io.tx.n_nic_ports ++;
+
+ n_tuples ++;
+ if (n_tuples > APP_ARG_TX_MAX_TUPLES) {
+ return -10;
+ }
+ }
+
+ if (n_tuples == 0) {
+ return -11;
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_W_MAX_CHARS
+#define APP_ARG_W_MAX_CHARS 4096
+#endif
+
+#ifndef APP_ARG_W_MAX_TUPLES
+#define APP_ARG_W_MAX_TUPLES APP_MAX_WORKER_LCORES
+#endif
+
+static int
+parse_arg_w(const char *arg)
+{
+ const char *p = arg;
+ uint32_t n_tuples;
+
+ if (strnlen(arg, APP_ARG_W_MAX_CHARS + 1) == APP_ARG_W_MAX_CHARS + 1) {
+ return -1;
+ }
+
+ n_tuples = 0;
+ while (*p != 0) {
+ struct app_lcore_params *lp;
+ uint32_t lcore;
+
+		char *endptr;
+
+		errno = 0;
+		lcore = strtoul(p, &endptr, 0);
+		if ((errno != 0) || (endptr == p) ||
+		    ((*endptr != '\0') && (*endptr != ','))) {
+			return -2;
+		}
+
+ /* Check and enable worker lcore */
+ if (rte_lcore_is_enabled(lcore) == 0) {
+ return -3;
+ }
+
+ if (lcore >= APP_MAX_LCORES) {
+ return -4;
+ }
+ lp = &app.lcore_params[lcore];
+ if (lp->type == e_APP_LCORE_IO) {
+ return -5;
+ }
+ lp->type = e_APP_LCORE_WORKER;
+
+ n_tuples ++;
+ if (n_tuples > APP_ARG_W_MAX_TUPLES) {
+ return -6;
+ }
+
+ p = strchr(p, ',');
+ if (p == NULL) {
+ break;
+ }
+ p ++;
+ }
+
+ if (n_tuples == 0) {
+ return -7;
+ }
+
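+	/*
+	 * The number of worker lcores must be a power of 2, so the I/O RX
+	 * lcores can map the load-balancing byte to a worker index with a
+	 * bitwise AND instead of a modulo (see runtime.c).
+	 */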
+ if ((n_tuples & (n_tuples - 1)) != 0) {
+ return -8;
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_LPM_MAX_CHARS
+#define APP_ARG_LPM_MAX_CHARS 4096
+#endif
+
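+/*
+ * Parse the --lpm argument: a ';'-terminated list of rules of the form
+ * "A.B.C.D/DEPTH=>PORT", e.g. "1.0.0.0/24=>0; 1.0.1.0/24=>1;".
+ */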
+static int
+parse_arg_lpm(const char *arg)
+{
+ const char *p = arg, *p0;
+
+	if (strnlen(arg, APP_ARG_LPM_MAX_CHARS + 1) == APP_ARG_LPM_MAX_CHARS + 1) {
+ return -1;
+ }
+
+ while (*p != 0) {
+ uint32_t ip_a, ip_b, ip_c, ip_d, ip, depth, if_out;
+ char *endptr;
+
+ p0 = strchr(p, '/');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, '.', 4, &ip_a, &ip_b, &ip_c, &ip_d) != 4)) {
+ return -2;
+ }
+
+ p = p0 + 1;
+ errno = 0;
+ depth = strtoul(p, &endptr, 0);
+ if (errno != 0 || *endptr != '=') {
+ return -3;
+ }
+ p = strchr(p, '>');
+ if (p == NULL) {
+ return -4;
+ }
+ if_out = strtoul(++p, &endptr, 0);
+ if (errno != 0 || (*endptr != '\0' && *endptr != ';')) {
+ return -5;
+ }
+
+ if ((ip_a >= 256) || (ip_b >= 256) || (ip_c >= 256) || (ip_d >= 256) ||
+		    (depth == 0) || (depth > 32) ||
+ (if_out >= APP_MAX_NIC_PORTS)) {
+ return -6;
+ }
+ ip = (ip_a << 24) | (ip_b << 16) | (ip_c << 8) | ip_d;
+
+ if (app.n_lpm_rules >= APP_MAX_LPM_RULES) {
+ return -7;
+ }
+ app.lpm_rules[app.n_lpm_rules].ip = ip;
+ app.lpm_rules[app.n_lpm_rules].depth = (uint8_t) depth;
+ app.lpm_rules[app.n_lpm_rules].if_out = (uint8_t) if_out;
+ app.n_lpm_rules ++;
+
+ p = strchr(p, ';');
+ if (p == NULL) {
+ return -8;
+ }
+ p ++;
+ }
+
+ if (app.n_lpm_rules == 0) {
+ return -9;
+ }
+
+ return 0;
+}
+
+static int
+app_check_lpm_table(void)
+{
+ uint32_t rule;
+
+ /* For each rule, check that the output I/F is enabled */
+ for (rule = 0; rule < app.n_lpm_rules; rule ++)
+ {
+ uint32_t port = app.lpm_rules[rule].if_out;
+
+ if (app.nic_tx_port_mask[port] == 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int
+app_check_every_rx_port_is_tx_enabled(void)
+{
+ uint8_t port;
+
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ if ((app_get_nic_rx_queues_per_port(port) > 0) && (app.nic_tx_port_mask[port] == 0)) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_RSZ_CHARS
+#define APP_ARG_RSZ_CHARS 63
+#endif
+
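+/*
+ * Parse the --rsz argument: four comma-separated ring sizes, e.g.
+ * "1024, 1024, 1024, 1024" (NIC RX ring, I/O RX -> worker SW ring,
+ * worker -> I/O TX SW ring, NIC TX ring).
+ */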
+static int
+parse_arg_rsz(const char *arg)
+{
+ if (strnlen(arg, APP_ARG_RSZ_CHARS + 1) == APP_ARG_RSZ_CHARS + 1) {
+ return -1;
+ }
+
+ if (str_to_unsigned_vals(arg, APP_ARG_RSZ_CHARS, ',', 4,
+ &app.nic_rx_ring_size,
+ &app.ring_rx_size,
+ &app.ring_tx_size,
+ &app.nic_tx_ring_size) != 4)
+ return -2;
+
+ if ((app.nic_rx_ring_size == 0) ||
+ (app.nic_tx_ring_size == 0) ||
+ (app.ring_rx_size == 0) ||
+ (app.ring_tx_size == 0)) {
+ return -3;
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_BSZ_CHARS
+#define APP_ARG_BSZ_CHARS 63
+#endif
+
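+/*
+ * Parse the --bsz argument: three parenthesized pairs of burst sizes, e.g.
+ * "(144, 144), (144, 144), (144, 144)" for the I/O RX, worker and I/O TX
+ * stages respectively.
+ */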
+static int
+parse_arg_bsz(const char *arg)
+{
+ const char *p = arg, *p0;
+ if (strnlen(arg, APP_ARG_BSZ_CHARS + 1) == APP_ARG_BSZ_CHARS + 1) {
+ return -1;
+ }
+
+ p0 = strchr(p++, ')');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, ',', 2, &app.burst_size_io_rx_read, &app.burst_size_io_rx_write) != 2)) {
+ return -2;
+ }
+
+ p = strchr(p0, '(');
+ if (p == NULL) {
+ return -3;
+ }
+
+ p0 = strchr(p++, ')');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, ',', 2, &app.burst_size_worker_read, &app.burst_size_worker_write) != 2)) {
+ return -4;
+ }
+
+ p = strchr(p0, '(');
+ if (p == NULL) {
+ return -5;
+ }
+
+ p0 = strchr(p++, ')');
+ if ((p0 == NULL) ||
+ (str_to_unsigned_vals(p, p0 - p, ',', 2, &app.burst_size_io_tx_read, &app.burst_size_io_tx_write) != 2)) {
+ return -6;
+ }
+
+ if ((app.burst_size_io_rx_read == 0) ||
+ (app.burst_size_io_rx_write == 0) ||
+ (app.burst_size_worker_read == 0) ||
+ (app.burst_size_worker_write == 0) ||
+ (app.burst_size_io_tx_read == 0) ||
+ (app.burst_size_io_tx_write == 0)) {
+ return -7;
+ }
+
+ if ((app.burst_size_io_rx_read > APP_MBUF_ARRAY_SIZE) ||
+ (app.burst_size_io_rx_write > APP_MBUF_ARRAY_SIZE) ||
+ (app.burst_size_worker_read > APP_MBUF_ARRAY_SIZE) ||
+ (app.burst_size_worker_write > APP_MBUF_ARRAY_SIZE) ||
+ ((2 * app.burst_size_io_tx_read) > APP_MBUF_ARRAY_SIZE) ||
+ (app.burst_size_io_tx_write > APP_MBUF_ARRAY_SIZE)) {
+ return -8;
+ }
+
+ return 0;
+}
+
+#ifndef APP_ARG_NUMERICAL_SIZE_CHARS
+#define APP_ARG_NUMERICAL_SIZE_CHARS 15
+#endif
+
+static int
+parse_arg_pos_lb(const char *arg)
+{
+ uint32_t x;
+ char *endpt;
+
+ if (strnlen(arg, APP_ARG_NUMERICAL_SIZE_CHARS + 1) == APP_ARG_NUMERICAL_SIZE_CHARS + 1) {
+ return -1;
+ }
+
+ errno = 0;
+ x = strtoul(arg, &endpt, 10);
+ if (errno != 0 || endpt == arg || *endpt != '\0'){
+ return -2;
+ }
+
+ if (x >= 64) {
+ return -3;
+ }
+
+ app.pos_lb = (uint8_t) x;
+
+ return 0;
+}
+
+/* Parse the argument given in the command line of the application */
+int
+app_parse_args(int argc, char **argv)
+{
+ int opt, ret;
+ char **argvopt;
+ int option_index;
+ char *prgname = argv[0];
+ static struct option lgopts[] = {
+ {"rx", 1, 0, 0},
+ {"tx", 1, 0, 0},
+ {"w", 1, 0, 0},
+ {"lpm", 1, 0, 0},
+ {"rsz", 1, 0, 0},
+ {"bsz", 1, 0, 0},
+ {"pos-lb", 1, 0, 0},
+ {NULL, 0, 0, 0}
+ };
+ uint32_t arg_w = 0;
+ uint32_t arg_rx = 0;
+ uint32_t arg_tx = 0;
+ uint32_t arg_lpm = 0;
+ uint32_t arg_rsz = 0;
+ uint32_t arg_bsz = 0;
+ uint32_t arg_pos_lb = 0;
+
+ argvopt = argv;
+
+ while ((opt = getopt_long(argc, argvopt, "",
+ lgopts, &option_index)) != EOF) {
+
+ switch (opt) {
+ /* long options */
+ case 0:
+ if (!strcmp(lgopts[option_index].name, "rx")) {
+ arg_rx = 1;
+ ret = parse_arg_rx(optarg);
+ if (ret) {
+ printf("Incorrect value for --rx argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "tx")) {
+ arg_tx = 1;
+ ret = parse_arg_tx(optarg);
+ if (ret) {
+ printf("Incorrect value for --tx argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "w")) {
+ arg_w = 1;
+ ret = parse_arg_w(optarg);
+ if (ret) {
+ printf("Incorrect value for --w argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "lpm")) {
+ arg_lpm = 1;
+ ret = parse_arg_lpm(optarg);
+ if (ret) {
+ printf("Incorrect value for --lpm argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "rsz")) {
+ arg_rsz = 1;
+ ret = parse_arg_rsz(optarg);
+ if (ret) {
+ printf("Incorrect value for --rsz argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "bsz")) {
+ arg_bsz = 1;
+ ret = parse_arg_bsz(optarg);
+ if (ret) {
+ printf("Incorrect value for --bsz argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ if (!strcmp(lgopts[option_index].name, "pos-lb")) {
+ arg_pos_lb = 1;
+ ret = parse_arg_pos_lb(optarg);
+ if (ret) {
+ printf("Incorrect value for --pos-lb argument (%d)\n", ret);
+ return -1;
+ }
+ }
+ break;
+
+ default:
+ return -1;
+ }
+ }
+
+ /* Check that all mandatory arguments are provided */
+ if ((arg_rx == 0) || (arg_tx == 0) || (arg_w == 0) || (arg_lpm == 0)){
+ printf("Not all mandatory arguments are present\n");
+ return -1;
+ }
+
+ /* Assign default values for the optional arguments not provided */
+ if (arg_rsz == 0) {
+ app.nic_rx_ring_size = APP_DEFAULT_NIC_RX_RING_SIZE;
+ app.nic_tx_ring_size = APP_DEFAULT_NIC_TX_RING_SIZE;
+ app.ring_rx_size = APP_DEFAULT_RING_RX_SIZE;
+ app.ring_tx_size = APP_DEFAULT_RING_TX_SIZE;
+ }
+
+ if (arg_bsz == 0) {
+ app.burst_size_io_rx_read = APP_DEFAULT_BURST_SIZE_IO_RX_READ;
+ app.burst_size_io_rx_write = APP_DEFAULT_BURST_SIZE_IO_RX_WRITE;
+ app.burst_size_io_tx_read = APP_DEFAULT_BURST_SIZE_IO_TX_READ;
+ app.burst_size_io_tx_write = APP_DEFAULT_BURST_SIZE_IO_TX_WRITE;
+ app.burst_size_worker_read = APP_DEFAULT_BURST_SIZE_WORKER_READ;
+ app.burst_size_worker_write = APP_DEFAULT_BURST_SIZE_WORKER_WRITE;
+ }
+
+ if (arg_pos_lb == 0) {
+ app.pos_lb = APP_DEFAULT_IO_RX_LB_POS;
+ }
+
+ /* Check cross-consistency of arguments */
+ if ((ret = app_check_lpm_table()) < 0) {
+ printf("At least one LPM rule is inconsistent (%d)\n", ret);
+ return -1;
+ }
+ if (app_check_every_rx_port_is_tx_enabled() < 0) {
+		printf("At least one RX port is not enabled for TX.\n");
+		printf("On LPM lookup miss, the packet is sent back on the input port, so every RX port must also be enabled for TX.\n");
+ return -2;
+ }
+
+ if (optind >= 0)
+ argv[optind - 1] = prgname;
+
+ ret = optind - 1;
+ optind = 1; /* reset getopt lib */
+ return ret;
+}
+
+int
+app_get_nic_rx_queues_per_port(uint8_t port)
+{
+ uint32_t i, count;
+
+ if (port >= APP_MAX_NIC_PORTS) {
+ return -1;
+ }
+
+ count = 0;
+ for (i = 0; i < APP_MAX_RX_QUEUES_PER_NIC_PORT; i ++) {
+ if (app.nic_rx_queue_mask[port][i] == 1) {
+ count ++;
+ }
+ }
+
+ return count;
+}
+
+int
+app_get_lcore_for_nic_rx(uint8_t port, uint8_t queue, uint32_t *lcore_out)
+{
+ uint32_t lcore;
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
+ uint32_t i;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_IO) {
+ continue;
+ }
+
+ const size_t n_queues = RTE_MIN(lp->rx.n_nic_queues,
+ RTE_DIM(lp->rx.nic_queues));
+ for (i = 0; i < n_queues; i ++) {
+ if ((lp->rx.nic_queues[i].port == port) &&
+ (lp->rx.nic_queues[i].queue == queue)) {
+ *lcore_out = lcore;
+ return 0;
+ }
+ }
+ }
+
+ return -1;
+}
+
+int
+app_get_lcore_for_nic_tx(uint8_t port, uint32_t *lcore_out)
+{
+ uint32_t lcore;
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
+ uint32_t i;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_IO) {
+ continue;
+ }
+
+ const size_t n_ports = RTE_MIN(lp->tx.n_nic_ports,
+ RTE_DIM(lp->tx.nic_ports));
+ for (i = 0; i < n_ports; i ++) {
+ if (lp->tx.nic_ports[i] == port) {
+ *lcore_out = lcore;
+ return 0;
+ }
+ }
+ }
+
+ return -1;
+}
+
+int
+app_is_socket_used(uint32_t socket)
+{
+ uint32_t lcore;
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ if (app.lcore_params[lcore].type == e_APP_LCORE_DISABLED) {
+ continue;
+ }
+
+ if (socket == rte_lcore_to_socket_id(lcore)) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+uint32_t
+app_get_lcores_io_rx(void)
+{
+ uint32_t lcore, count;
+
+ count = 0;
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp_io = &app.lcore_params[lcore].io;
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp_io->rx.n_nic_queues == 0)) {
+ continue;
+ }
+
+ count ++;
+ }
+
+ return count;
+}
+
+uint32_t
+app_get_lcores_worker(void)
+{
+ uint32_t lcore, count;
+
+ count = 0;
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ count ++;
+ }
+
+ if (count > APP_MAX_WORKER_LCORES) {
+ rte_panic("Algorithmic error (too many worker lcores)\n");
+ return 0;
+ }
+
+ return count;
+}
+
+void
+app_print_params(void)
+{
+ unsigned port, queue, lcore, rule, i, j;
+
+ /* Print NIC RX configuration */
+ printf("NIC RX ports: ");
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ uint32_t n_rx_queues = app_get_nic_rx_queues_per_port((uint8_t) port);
+
+ if (n_rx_queues == 0) {
+ continue;
+ }
+
+ printf("%u (", port);
+ for (queue = 0; queue < APP_MAX_RX_QUEUES_PER_NIC_PORT; queue ++) {
+ if (app.nic_rx_queue_mask[port][queue] == 1) {
+ printf("%u ", queue);
+ }
+ }
+ printf(") ");
+ }
+ printf(";\n");
+
+ /* Print I/O lcore RX params */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp->rx.n_nic_queues == 0)) {
+ continue;
+ }
+
+ printf("I/O lcore %u (socket %u): ", lcore, rte_lcore_to_socket_id(lcore));
+
+ printf("RX ports ");
+ for (i = 0; i < lp->rx.n_nic_queues; i ++) {
+ printf("(%u, %u) ",
+ (unsigned) lp->rx.nic_queues[i].port,
+ (unsigned) lp->rx.nic_queues[i].queue);
+ }
+ printf("; ");
+
+ printf("Output rings ");
+ for (i = 0; i < lp->rx.n_rings; i ++) {
+ printf("%p ", lp->rx.rings[i]);
+ }
+ printf(";\n");
+ }
+
+ /* Print worker lcore RX params */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ printf("Worker lcore %u (socket %u) ID %u: ",
+ lcore,
+ rte_lcore_to_socket_id(lcore),
+ (unsigned)lp->worker_id);
+
+ printf("Input rings ");
+ for (i = 0; i < lp->n_rings_in; i ++) {
+ printf("%p ", lp->rings_in[i]);
+ }
+
+ printf(";\n");
+ }
+
+ printf("\n");
+
+ /* Print NIC TX configuration */
+ printf("NIC TX ports: ");
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ if (app.nic_tx_port_mask[port] == 1) {
+ printf("%u ", port);
+ }
+ }
+ printf(";\n");
+
+ /* Print I/O TX lcore params */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
+ uint32_t n_workers = app_get_lcores_worker();
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp->tx.n_nic_ports == 0)) {
+ continue;
+ }
+
+ printf("I/O lcore %u (socket %u): ", lcore, rte_lcore_to_socket_id(lcore));
+
+ printf("Input rings per TX port ");
+ for (i = 0; i < lp->tx.n_nic_ports; i ++) {
+ port = lp->tx.nic_ports[i];
+
+ printf("%u (", port);
+ for (j = 0; j < n_workers; j ++) {
+ printf("%p ", lp->tx.rings[port][j]);
+ }
+ printf(") ");
+
+ }
+
+ printf(";\n");
+ }
+
+ /* Print worker lcore TX params */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ printf("Worker lcore %u (socket %u) ID %u: \n",
+ lcore,
+ rte_lcore_to_socket_id(lcore),
+ (unsigned)lp->worker_id);
+
+ printf("Output rings per TX port ");
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ if (lp->rings_out[port] != NULL) {
+ printf("%u (%p) ", port, lp->rings_out[port]);
+ }
+ }
+
+ printf(";\n");
+ }
+
+ /* Print LPM rules */
+ printf("LPM rules: \n");
+ for (rule = 0; rule < app.n_lpm_rules; rule ++) {
+ uint32_t ip = app.lpm_rules[rule].ip;
+ uint8_t depth = app.lpm_rules[rule].depth;
+ uint8_t if_out = app.lpm_rules[rule].if_out;
+
+ printf("\t%u: %u.%u.%u.%u/%u => %u;\n",
+ rule,
+ (unsigned) (ip & 0xFF000000) >> 24,
+ (unsigned) (ip & 0x00FF0000) >> 16,
+ (unsigned) (ip & 0x0000FF00) >> 8,
+ (unsigned) ip & 0x000000FF,
+ (unsigned) depth,
+ (unsigned) if_out
+ );
+ }
+
+ /* Rings */
+ printf("Ring sizes: NIC RX = %u; Worker in = %u; Worker out = %u; NIC TX = %u;\n",
+ (unsigned) app.nic_rx_ring_size,
+ (unsigned) app.ring_rx_size,
+ (unsigned) app.ring_tx_size,
+ (unsigned) app.nic_tx_ring_size);
+
+ /* Bursts */
+ printf("Burst sizes: I/O RX (rd = %u, wr = %u); Worker (rd = %u, wr = %u); I/O TX (rd = %u, wr = %u)\n",
+ (unsigned) app.burst_size_io_rx_read,
+ (unsigned) app.burst_size_io_rx_write,
+ (unsigned) app.burst_size_worker_read,
+ (unsigned) app.burst_size_worker_write,
+ (unsigned) app.burst_size_io_tx_read,
+ (unsigned) app.burst_size_io_tx_write);
+}
diff --git a/src/seastar/dpdk/examples/load_balancer/init.c b/src/seastar/dpdk/examples/load_balancer/init.c
new file mode 100644
index 00000000..abd05a31
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/init.c
@@ -0,0 +1,521 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_per_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_string_fns.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_lpm.h>
+
+#include "main.h"
+
+static struct rte_eth_conf port_conf = {
+ .rxmode = {
+ .mq_mode = ETH_MQ_RX_RSS,
+ .split_hdr_size = 0,
+ .header_split = 0, /**< Header Split disabled */
+ .hw_ip_checksum = 1, /**< IP checksum offload enabled */
+ .hw_vlan_filter = 0, /**< VLAN filtering disabled */
+ .jumbo_frame = 0, /**< Jumbo Frame Support disabled */
+ .hw_strip_crc = 1, /**< CRC stripped by hardware */
+ },
+ .rx_adv_conf = {
+ .rss_conf = {
+ .rss_key = NULL,
+ .rss_hf = ETH_RSS_IP,
+ },
+ },
+ .txmode = {
+ .mq_mode = ETH_MQ_TX_NONE,
+ },
+};
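+
+/*
+ * RSS is enabled on the RX side so that, when a port has more than one RX
+ * queue, the NIC spreads incoming IP traffic across those queues; each
+ * queue is then polled by exactly one I/O RX lcore.
+ */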
+
+static void
+app_assign_worker_ids(void)
+{
+ uint32_t lcore, worker_id;
+
+ /* Assign ID for each worker */
+ worker_id = 0;
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_worker *lp_worker = &app.lcore_params[lcore].worker;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ lp_worker->worker_id = worker_id;
+ worker_id ++;
+ }
+}
+
+static void
+app_init_mbuf_pools(void)
+{
+ unsigned socket, lcore;
+
+ /* Init the buffer pools */
+ for (socket = 0; socket < APP_MAX_SOCKETS; socket ++) {
+ char name[32];
+ if (app_is_socket_used(socket) == 0) {
+ continue;
+ }
+
+ snprintf(name, sizeof(name), "mbuf_pool_%u", socket);
+ printf("Creating the mbuf pool for socket %u ...\n", socket);
+ app.pools[socket] = rte_pktmbuf_pool_create(
+ name, APP_DEFAULT_MEMPOOL_BUFFERS,
+ APP_DEFAULT_MEMPOOL_CACHE_SIZE,
+ 0, APP_DEFAULT_MBUF_DATA_SIZE, socket);
+ if (app.pools[socket] == NULL) {
+ rte_panic("Cannot create mbuf pool on socket %u\n", socket);
+ }
+ }
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ if (app.lcore_params[lcore].type == e_APP_LCORE_DISABLED) {
+ continue;
+ }
+
+ socket = rte_lcore_to_socket_id(lcore);
+ app.lcore_params[lcore].pool = app.pools[socket];
+ }
+}
+
+static void
+app_init_lpm_tables(void)
+{
+ unsigned socket, lcore;
+
+ /* Init the LPM tables */
+ for (socket = 0; socket < APP_MAX_SOCKETS; socket ++) {
+ char name[32];
+ uint32_t rule;
+
+ if (app_is_socket_used(socket) == 0) {
+ continue;
+ }
+
+ struct rte_lpm_config lpm_config;
+
+ lpm_config.max_rules = APP_MAX_LPM_RULES;
+ lpm_config.number_tbl8s = 256;
+ lpm_config.flags = 0;
+ snprintf(name, sizeof(name), "lpm_table_%u", socket);
+ printf("Creating the LPM table for socket %u ...\n", socket);
+ app.lpm_tables[socket] = rte_lpm_create(
+ name,
+ socket,
+ &lpm_config);
+ if (app.lpm_tables[socket] == NULL) {
+ rte_panic("Unable to create LPM table on socket %u\n", socket);
+ }
+
+ for (rule = 0; rule < app.n_lpm_rules; rule ++) {
+ int ret;
+
+ ret = rte_lpm_add(app.lpm_tables[socket],
+ app.lpm_rules[rule].ip,
+ app.lpm_rules[rule].depth,
+ app.lpm_rules[rule].if_out);
+
+ if (ret < 0) {
+ rte_panic("Unable to add entry %u (%x/%u => %u) to the LPM table on socket %u (%d)\n",
+ (unsigned) rule,
+ (unsigned) app.lpm_rules[rule].ip,
+ (unsigned) app.lpm_rules[rule].depth,
+ (unsigned) app.lpm_rules[rule].if_out,
+ socket,
+ ret);
+ }
+ }
+
+ }
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ socket = rte_lcore_to_socket_id(lcore);
+ app.lcore_params[lcore].worker.lpm_table = app.lpm_tables[socket];
+ }
+}
+
+static void
+app_init_rings_rx(void)
+{
+ unsigned lcore;
+
+ /* Initialize the rings for the RX side */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp_io = &app.lcore_params[lcore].io;
+ unsigned socket_io, lcore_worker;
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp_io->rx.n_nic_queues == 0)) {
+ continue;
+ }
+
+ socket_io = rte_lcore_to_socket_id(lcore);
+
+ for (lcore_worker = 0; lcore_worker < APP_MAX_LCORES; lcore_worker ++) {
+ char name[32];
+ struct app_lcore_params_worker *lp_worker = &app.lcore_params[lcore_worker].worker;
+ struct rte_ring *ring = NULL;
+
+ if (app.lcore_params[lcore_worker].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ printf("Creating ring to connect I/O lcore %u (socket %u) with worker lcore %u ...\n",
+ lcore,
+ socket_io,
+ lcore_worker);
+ snprintf(name, sizeof(name), "app_ring_rx_s%u_io%u_w%u",
+ socket_io,
+ lcore,
+ lcore_worker);
+ ring = rte_ring_create(
+ name,
+ app.ring_rx_size,
+ socket_io,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (ring == NULL) {
+ rte_panic("Cannot create ring to connect I/O core %u with worker core %u\n",
+ lcore,
+ lcore_worker);
+ }
+
+ lp_io->rx.rings[lp_io->rx.n_rings] = ring;
+ lp_io->rx.n_rings ++;
+
+ lp_worker->rings_in[lp_worker->n_rings_in] = ring;
+ lp_worker->n_rings_in ++;
+ }
+ }
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp_io = &app.lcore_params[lcore].io;
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp_io->rx.n_nic_queues == 0)) {
+ continue;
+ }
+
+ if (lp_io->rx.n_rings != app_get_lcores_worker()) {
+ rte_panic("Algorithmic error (I/O RX rings)\n");
+ }
+ }
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_worker *lp_worker = &app.lcore_params[lcore].worker;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ if (lp_worker->n_rings_in != app_get_lcores_io_rx()) {
+ rte_panic("Algorithmic error (worker input rings)\n");
+ }
+ }
+}
+
+static void
+app_init_rings_tx(void)
+{
+ unsigned lcore;
+
+ /* Initialize the rings for the TX side */
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_worker *lp_worker = &app.lcore_params[lcore].worker;
+ unsigned port;
+
+ if (app.lcore_params[lcore].type != e_APP_LCORE_WORKER) {
+ continue;
+ }
+
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ char name[32];
+ struct app_lcore_params_io *lp_io = NULL;
+ struct rte_ring *ring;
+ uint32_t socket_io, lcore_io;
+
+ if (app.nic_tx_port_mask[port] == 0) {
+ continue;
+ }
+
+ if (app_get_lcore_for_nic_tx((uint8_t) port, &lcore_io) < 0) {
+ rte_panic("Algorithmic error (no I/O core to handle TX of port %u)\n",
+ port);
+ }
+
+ lp_io = &app.lcore_params[lcore_io].io;
+ socket_io = rte_lcore_to_socket_id(lcore_io);
+
+ printf("Creating ring to connect worker lcore %u with TX port %u (through I/O lcore %u) (socket %u) ...\n",
+ lcore, port, (unsigned)lcore_io, (unsigned)socket_io);
+ snprintf(name, sizeof(name), "app_ring_tx_s%u_w%u_p%u", socket_io, lcore, port);
+ ring = rte_ring_create(
+ name,
+ app.ring_tx_size,
+ socket_io,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (ring == NULL) {
+ rte_panic("Cannot create ring to connect worker core %u with TX port %u\n",
+ lcore,
+ port);
+ }
+
+ lp_worker->rings_out[port] = ring;
+ lp_io->tx.rings[port][lp_worker->worker_id] = ring;
+ }
+ }
+
+ for (lcore = 0; lcore < APP_MAX_LCORES; lcore ++) {
+ struct app_lcore_params_io *lp_io = &app.lcore_params[lcore].io;
+ unsigned i;
+
+ if ((app.lcore_params[lcore].type != e_APP_LCORE_IO) ||
+ (lp_io->tx.n_nic_ports == 0)) {
+ continue;
+ }
+
+ for (i = 0; i < lp_io->tx.n_nic_ports; i ++){
+ unsigned port, j;
+
+ port = lp_io->tx.nic_ports[i];
+ for (j = 0; j < app_get_lcores_worker(); j ++) {
+ if (lp_io->tx.rings[port][j] == NULL) {
+ rte_panic("Algorithmic error (I/O TX rings)\n");
+ }
+ }
+ }
+ }
+}
+
+/* Check the link status of all ports for up to 9 seconds, then print the final status */
+static void
+check_all_ports_link_status(uint8_t port_num, uint32_t port_mask)
+{
+#define CHECK_INTERVAL 100 /* 100ms */
+#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */
+ uint8_t portid, count, all_ports_up, print_flag = 0;
+ struct rte_eth_link link;
+ uint32_t n_rx_queues, n_tx_queues;
+
+ printf("\nChecking link status");
+ fflush(stdout);
+ for (count = 0; count <= MAX_CHECK_TIME; count++) {
+ all_ports_up = 1;
+ for (portid = 0; portid < port_num; portid++) {
+ if ((port_mask & (1 << portid)) == 0)
+ continue;
+ n_rx_queues = app_get_nic_rx_queues_per_port(portid);
+ n_tx_queues = app.nic_tx_port_mask[portid];
+ if ((n_rx_queues == 0) && (n_tx_queues == 0))
+ continue;
+ memset(&link, 0, sizeof(link));
+ rte_eth_link_get_nowait(portid, &link);
+ /* print link status if flag set */
+ if (print_flag == 1) {
+ if (link.link_status)
+ printf("Port %d Link Up - speed %u "
+ "Mbps - %s\n", (uint8_t)portid,
+ (unsigned)link.link_speed,
+ (link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
+					("full-duplex") : ("half-duplex"));
+ else
+ printf("Port %d Link Down\n",
+ (uint8_t)portid);
+ continue;
+ }
+ /* clear all_ports_up flag if any link down */
+ if (link.link_status == ETH_LINK_DOWN) {
+ all_ports_up = 0;
+ break;
+ }
+ }
+ /* after finally printing all link status, get out */
+ if (print_flag == 1)
+ break;
+
+ if (all_ports_up == 0) {
+ printf(".");
+ fflush(stdout);
+ rte_delay_ms(CHECK_INTERVAL);
+ }
+
+ /* set the print_flag if all ports up or timeout */
+ if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1)) {
+ print_flag = 1;
+ printf("done\n");
+ }
+ }
+}
+
+static void
+app_init_nics(void)
+{
+ unsigned socket;
+ uint32_t lcore;
+ uint8_t port, queue;
+ int ret;
+ uint32_t n_rx_queues, n_tx_queues;
+
+ /* Init NIC ports and queues, then start the ports */
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ struct rte_mempool *pool;
+
+ n_rx_queues = app_get_nic_rx_queues_per_port(port);
+ n_tx_queues = app.nic_tx_port_mask[port];
+
+ if ((n_rx_queues == 0) && (n_tx_queues == 0)) {
+ continue;
+ }
+
+ /* Init port */
+ printf("Initializing NIC port %u ...\n", (unsigned) port);
+ ret = rte_eth_dev_configure(
+ port,
+ (uint8_t) n_rx_queues,
+ (uint8_t) n_tx_queues,
+ &port_conf);
+ if (ret < 0) {
+ rte_panic("Cannot init NIC port %u (%d)\n", (unsigned) port, ret);
+ }
+ rte_eth_promiscuous_enable(port);
+
+ /* Init RX queues */
+ for (queue = 0; queue < APP_MAX_RX_QUEUES_PER_NIC_PORT; queue ++) {
+ if (app.nic_rx_queue_mask[port][queue] == 0) {
+ continue;
+ }
+
+ app_get_lcore_for_nic_rx(port, queue, &lcore);
+ socket = rte_lcore_to_socket_id(lcore);
+ pool = app.lcore_params[lcore].pool;
+
+ printf("Initializing NIC port %u RX queue %u ...\n",
+ (unsigned) port,
+ (unsigned) queue);
+ ret = rte_eth_rx_queue_setup(
+ port,
+ queue,
+ (uint16_t) app.nic_rx_ring_size,
+ socket,
+ NULL,
+ pool);
+ if (ret < 0) {
+ rte_panic("Cannot init RX queue %u for port %u (%d)\n",
+ (unsigned) queue,
+ (unsigned) port,
+ ret);
+ }
+ }
+
+ /* Init TX queues */
+ if (app.nic_tx_port_mask[port] == 1) {
+ app_get_lcore_for_nic_tx(port, &lcore);
+ socket = rte_lcore_to_socket_id(lcore);
+ printf("Initializing NIC port %u TX queue 0 ...\n",
+ (unsigned) port);
+ ret = rte_eth_tx_queue_setup(
+ port,
+ 0,
+ (uint16_t) app.nic_tx_ring_size,
+ socket,
+ NULL);
+ if (ret < 0) {
+ rte_panic("Cannot init TX queue 0 for port %d (%d)\n",
+ port,
+ ret);
+ }
+ }
+
+ /* Start port */
+ ret = rte_eth_dev_start(port);
+ if (ret < 0) {
+ rte_panic("Cannot start port %d (%d)\n", port, ret);
+ }
+ }
+
+ check_all_ports_link_status(APP_MAX_NIC_PORTS, (~0x0));
+}
+
+void
+app_init(void)
+{
+ app_assign_worker_ids();
+ app_init_mbuf_pools();
+ app_init_lpm_tables();
+ app_init_rings_rx();
+ app_init_rings_tx();
+ app_init_nics();
+
+ printf("Initialization completed.\n");
+}
diff --git a/src/seastar/dpdk/examples/load_balancer/main.c b/src/seastar/dpdk/examples/load_balancer/main.c
new file mode 100644
index 00000000..c97bf6fa
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/main.c
@@ -0,0 +1,108 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_per_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_lpm.h>
+
+#include "main.h"
+
+int
+main(int argc, char **argv)
+{
+ uint32_t lcore;
+ int ret;
+
+ /* Init EAL */
+ ret = rte_eal_init(argc, argv);
+ if (ret < 0)
+ return -1;
+ argc -= ret;
+ argv += ret;
+
+ /* Parse application arguments (after the EAL ones) */
+ ret = app_parse_args(argc, argv);
+ if (ret < 0) {
+ app_print_usage();
+ return -1;
+ }
+
+ /* Init */
+ app_init();
+ app_print_params();
+
+ /* Launch per-lcore init on every lcore */
+ rte_eal_mp_remote_launch(app_lcore_main_loop, NULL, CALL_MASTER);
+ RTE_LCORE_FOREACH_SLAVE(lcore) {
+ if (rte_eal_wait_lcore(lcore) < 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
diff --git a/src/seastar/dpdk/examples/load_balancer/main.h b/src/seastar/dpdk/examples/load_balancer/main.h
new file mode 100644
index 00000000..d98468a7
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/main.h
@@ -0,0 +1,371 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _MAIN_H_
+#define _MAIN_H_
+
+/* Logical cores */
+#ifndef APP_MAX_SOCKETS
+#define APP_MAX_SOCKETS 2
+#endif
+
+#ifndef APP_MAX_LCORES
+#define APP_MAX_LCORES RTE_MAX_LCORE
+#endif
+
+#ifndef APP_MAX_NIC_PORTS
+#define APP_MAX_NIC_PORTS RTE_MAX_ETHPORTS
+#endif
+
+#ifndef APP_MAX_RX_QUEUES_PER_NIC_PORT
+#define APP_MAX_RX_QUEUES_PER_NIC_PORT 128
+#endif
+
+#ifndef APP_MAX_TX_QUEUES_PER_NIC_PORT
+#define APP_MAX_TX_QUEUES_PER_NIC_PORT 128
+#endif
+
+#ifndef APP_MAX_IO_LCORES
+#define APP_MAX_IO_LCORES 16
+#endif
+#if (APP_MAX_IO_LCORES > APP_MAX_LCORES)
+#error "APP_MAX_IO_LCORES is too big"
+#endif
+
+#ifndef APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE
+#define APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE 16
+#endif
+
+#ifndef APP_MAX_NIC_TX_PORTS_PER_IO_LCORE
+#define APP_MAX_NIC_TX_PORTS_PER_IO_LCORE 16
+#endif
+#if (APP_MAX_NIC_TX_PORTS_PER_IO_LCORE > APP_MAX_NIC_PORTS)
+#error "APP_MAX_NIC_TX_PORTS_PER_IO_LCORE too big"
+#endif
+
+#ifndef APP_MAX_WORKER_LCORES
+#define APP_MAX_WORKER_LCORES 16
+#endif
+#if (APP_MAX_WORKER_LCORES > APP_MAX_LCORES)
+#error "APP_MAX_WORKER_LCORES is too big"
+#endif
+
+
+/* Mempools */
+#ifndef APP_DEFAULT_MBUF_DATA_SIZE
+#define APP_DEFAULT_MBUF_DATA_SIZE RTE_MBUF_DEFAULT_BUF_SIZE
+#endif
+
+#ifndef APP_DEFAULT_MEMPOOL_BUFFERS
+#define APP_DEFAULT_MEMPOOL_BUFFERS (8192 * 4)
+#endif
+
+#ifndef APP_DEFAULT_MEMPOOL_CACHE_SIZE
+#define APP_DEFAULT_MEMPOOL_CACHE_SIZE 256
+#endif
+
+/* LPM Tables */
+#ifndef APP_MAX_LPM_RULES
+#define APP_MAX_LPM_RULES 1024
+#endif
+
+/* NIC RX */
+#ifndef APP_DEFAULT_NIC_RX_RING_SIZE
+#define APP_DEFAULT_NIC_RX_RING_SIZE 1024
+#endif
+
+/*
+ * RX and TX Prefetch, Host, and Write-back threshold values should be
+ * carefully set for optimal performance. Consult the network
+ * controller's datasheet and supporting DPDK documentation for guidance
+ * on how these parameters should be set.
+ */
+#ifndef APP_DEFAULT_NIC_RX_PTHRESH
+#define APP_DEFAULT_NIC_RX_PTHRESH 8
+#endif
+
+#ifndef APP_DEFAULT_NIC_RX_HTHRESH
+#define APP_DEFAULT_NIC_RX_HTHRESH 8
+#endif
+
+#ifndef APP_DEFAULT_NIC_RX_WTHRESH
+#define APP_DEFAULT_NIC_RX_WTHRESH 4
+#endif
+
+#ifndef APP_DEFAULT_NIC_RX_FREE_THRESH
+#define APP_DEFAULT_NIC_RX_FREE_THRESH 64
+#endif
+
+#ifndef APP_DEFAULT_NIC_RX_DROP_EN
+#define APP_DEFAULT_NIC_RX_DROP_EN 0
+#endif
+
+/* NIC TX */
+#ifndef APP_DEFAULT_NIC_TX_RING_SIZE
+#define APP_DEFAULT_NIC_TX_RING_SIZE 1024
+#endif
+
+/*
+ * These default values are optimized for use with the Intel(R) 82599 10 GbE
+ * Controller and the DPDK ixgbe PMD. Consider using other values for other
+ * network controllers and/or network drivers.
+ */
+#ifndef APP_DEFAULT_NIC_TX_PTHRESH
+#define APP_DEFAULT_NIC_TX_PTHRESH 36
+#endif
+
+#ifndef APP_DEFAULT_NIC_TX_HTHRESH
+#define APP_DEFAULT_NIC_TX_HTHRESH 0
+#endif
+
+#ifndef APP_DEFAULT_NIC_TX_WTHRESH
+#define APP_DEFAULT_NIC_TX_WTHRESH 0
+#endif
+
+#ifndef APP_DEFAULT_NIC_TX_FREE_THRESH
+#define APP_DEFAULT_NIC_TX_FREE_THRESH 0
+#endif
+
+#ifndef APP_DEFAULT_NIC_TX_RS_THRESH
+#define APP_DEFAULT_NIC_TX_RS_THRESH 0
+#endif
+
+/* Software Rings */
+#ifndef APP_DEFAULT_RING_RX_SIZE
+#define APP_DEFAULT_RING_RX_SIZE 1024
+#endif
+
+#ifndef APP_DEFAULT_RING_TX_SIZE
+#define APP_DEFAULT_RING_TX_SIZE 1024
+#endif
+
+/* Bursts */
+#ifndef APP_MBUF_ARRAY_SIZE
+#define APP_MBUF_ARRAY_SIZE 512
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_IO_RX_READ
+#define APP_DEFAULT_BURST_SIZE_IO_RX_READ 144
+#endif
+#if (APP_DEFAULT_BURST_SIZE_IO_RX_READ > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_IO_RX_READ is too big"
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_IO_RX_WRITE
+#define APP_DEFAULT_BURST_SIZE_IO_RX_WRITE 144
+#endif
+#if (APP_DEFAULT_BURST_SIZE_IO_RX_WRITE > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_IO_RX_WRITE is too big"
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_IO_TX_READ
+#define APP_DEFAULT_BURST_SIZE_IO_TX_READ 144
+#endif
+#if (APP_DEFAULT_BURST_SIZE_IO_TX_READ > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_IO_TX_READ is too big"
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_IO_TX_WRITE
+#define APP_DEFAULT_BURST_SIZE_IO_TX_WRITE 144
+#endif
+#if (APP_DEFAULT_BURST_SIZE_IO_TX_WRITE > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_IO_TX_WRITE is too big"
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_WORKER_READ
+#define APP_DEFAULT_BURST_SIZE_WORKER_READ 144
+#endif
+#if ((2 * APP_DEFAULT_BURST_SIZE_WORKER_READ) > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_WORKER_READ is too big"
+#endif
+
+#ifndef APP_DEFAULT_BURST_SIZE_WORKER_WRITE
+#define APP_DEFAULT_BURST_SIZE_WORKER_WRITE 144
+#endif
+#if (APP_DEFAULT_BURST_SIZE_WORKER_WRITE > APP_MBUF_ARRAY_SIZE)
+#error "APP_DEFAULT_BURST_SIZE_WORKER_WRITE is too big"
+#endif
+
+/* Load balancing logic */
+#ifndef APP_DEFAULT_IO_RX_LB_POS
+#define APP_DEFAULT_IO_RX_LB_POS 29
+#endif
+#if (APP_DEFAULT_IO_RX_LB_POS >= 64)
+#error "APP_DEFAULT_IO_RX_LB_POS is too big"
+#endif
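+
+/*
+ * Note: assuming untagged Ethernet/IPv4 input, byte 29 of the frame
+ * (14-byte Ethernet header + offset 15 into the IPv4 header) is the least
+ * significant byte of the source IP address.
+ */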
+
+struct app_mbuf_array {
+ struct rte_mbuf *array[APP_MBUF_ARRAY_SIZE];
+ uint32_t n_mbufs;
+};
+
+enum app_lcore_type {
+ e_APP_LCORE_DISABLED = 0,
+ e_APP_LCORE_IO,
+ e_APP_LCORE_WORKER
+};
+
+struct app_lcore_params_io {
+ /* I/O RX */
+ struct {
+ /* NIC */
+ struct {
+ uint8_t port;
+ uint8_t queue;
+ } nic_queues[APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE];
+ uint32_t n_nic_queues;
+
+ /* Rings */
+ struct rte_ring *rings[APP_MAX_WORKER_LCORES];
+ uint32_t n_rings;
+
+ /* Internal buffers */
+ struct app_mbuf_array mbuf_in;
+ struct app_mbuf_array mbuf_out[APP_MAX_WORKER_LCORES];
+ uint8_t mbuf_out_flush[APP_MAX_WORKER_LCORES];
+
+ /* Stats */
+ uint32_t nic_queues_count[APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE];
+ uint32_t nic_queues_iters[APP_MAX_NIC_RX_QUEUES_PER_IO_LCORE];
+ uint32_t rings_count[APP_MAX_WORKER_LCORES];
+ uint32_t rings_iters[APP_MAX_WORKER_LCORES];
+ } rx;
+
+ /* I/O TX */
+ struct {
+ /* Rings */
+ struct rte_ring *rings[APP_MAX_NIC_PORTS][APP_MAX_WORKER_LCORES];
+
+ /* NIC */
+ uint8_t nic_ports[APP_MAX_NIC_TX_PORTS_PER_IO_LCORE];
+ uint32_t n_nic_ports;
+
+ /* Internal buffers */
+ struct app_mbuf_array mbuf_out[APP_MAX_NIC_TX_PORTS_PER_IO_LCORE];
+ uint8_t mbuf_out_flush[APP_MAX_NIC_TX_PORTS_PER_IO_LCORE];
+
+ /* Stats */
+ uint32_t rings_count[APP_MAX_NIC_PORTS][APP_MAX_WORKER_LCORES];
+ uint32_t rings_iters[APP_MAX_NIC_PORTS][APP_MAX_WORKER_LCORES];
+ uint32_t nic_ports_count[APP_MAX_NIC_TX_PORTS_PER_IO_LCORE];
+ uint32_t nic_ports_iters[APP_MAX_NIC_TX_PORTS_PER_IO_LCORE];
+ } tx;
+};
+
+struct app_lcore_params_worker {
+ /* Rings */
+ struct rte_ring *rings_in[APP_MAX_IO_LCORES];
+ uint32_t n_rings_in;
+ struct rte_ring *rings_out[APP_MAX_NIC_PORTS];
+
+ /* LPM table */
+ struct rte_lpm *lpm_table;
+ uint32_t worker_id;
+
+ /* Internal buffers */
+ struct app_mbuf_array mbuf_in;
+ struct app_mbuf_array mbuf_out[APP_MAX_NIC_PORTS];
+ uint8_t mbuf_out_flush[APP_MAX_NIC_PORTS];
+
+ /* Stats */
+ uint32_t rings_in_count[APP_MAX_IO_LCORES];
+ uint32_t rings_in_iters[APP_MAX_IO_LCORES];
+ uint32_t rings_out_count[APP_MAX_NIC_PORTS];
+ uint32_t rings_out_iters[APP_MAX_NIC_PORTS];
+};
+
+struct app_lcore_params {
+ union {
+ struct app_lcore_params_io io;
+ struct app_lcore_params_worker worker;
+ };
+ enum app_lcore_type type;
+ struct rte_mempool *pool;
+} __rte_cache_aligned;
+
+struct app_lpm_rule {
+ uint32_t ip;
+ uint8_t depth;
+ uint8_t if_out;
+};
+
+struct app_params {
+ /* lcore */
+ struct app_lcore_params lcore_params[APP_MAX_LCORES];
+
+ /* NIC */
+ uint8_t nic_rx_queue_mask[APP_MAX_NIC_PORTS][APP_MAX_RX_QUEUES_PER_NIC_PORT];
+ uint8_t nic_tx_port_mask[APP_MAX_NIC_PORTS];
+
+ /* mbuf pools */
+ struct rte_mempool *pools[APP_MAX_SOCKETS];
+
+ /* LPM tables */
+ struct rte_lpm *lpm_tables[APP_MAX_SOCKETS];
+ struct app_lpm_rule lpm_rules[APP_MAX_LPM_RULES];
+ uint32_t n_lpm_rules;
+
+ /* rings */
+ uint32_t nic_rx_ring_size;
+ uint32_t nic_tx_ring_size;
+ uint32_t ring_rx_size;
+ uint32_t ring_tx_size;
+
+ /* burst size */
+ uint32_t burst_size_io_rx_read;
+ uint32_t burst_size_io_rx_write;
+ uint32_t burst_size_io_tx_read;
+ uint32_t burst_size_io_tx_write;
+ uint32_t burst_size_worker_read;
+ uint32_t burst_size_worker_write;
+
+ /* load balancing */
+ uint8_t pos_lb;
+} __rte_cache_aligned;
+
+extern struct app_params app;
+
+int app_parse_args(int argc, char **argv);
+void app_print_usage(void);
+void app_init(void);
+int app_lcore_main_loop(void *arg);
+
+int app_get_nic_rx_queues_per_port(uint8_t port);
+int app_get_lcore_for_nic_rx(uint8_t port, uint8_t queue, uint32_t *lcore_out);
+int app_get_lcore_for_nic_tx(uint8_t port, uint32_t *lcore_out);
+int app_is_socket_used(uint32_t socket);
+uint32_t app_get_lcores_io_rx(void);
+uint32_t app_get_lcores_worker(void);
+void app_print_params(void);
+
+#endif /* _MAIN_H_ */
diff --git a/src/seastar/dpdk/examples/load_balancer/runtime.c b/src/seastar/dpdk/examples/load_balancer/runtime.c
new file mode 100644
index 00000000..7f918aa4
--- /dev/null
+++ b/src/seastar/dpdk/examples/load_balancer/runtime.c
@@ -0,0 +1,674 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_per_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_lpm.h>
+
+#include "main.h"
+
+#ifndef APP_LCORE_IO_FLUSH
+#define APP_LCORE_IO_FLUSH 1000000
+#endif
+
+#ifndef APP_LCORE_WORKER_FLUSH
+#define APP_LCORE_WORKER_FLUSH 1000000
+#endif
+
+#ifndef APP_STATS
+#define APP_STATS 1000000
+#endif
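+
+/*
+ * The #ifndef guards above let the flush and stats thresholds be
+ * overridden at build time instead of editing this file, e.g.
+ * (assuming the standard DPDK make system):
+ *
+ *   make EXTRA_CFLAGS='-DAPP_LCORE_IO_FLUSH=100000 -DAPP_STATS=0'
+ *
+ * APP_STATS=0 compiles out the statistics blocks below entirely.
+ */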
+
+#define APP_IO_RX_DROP_ALL_PACKETS 0
+#define APP_WORKER_DROP_ALL_PACKETS 0
+#define APP_IO_TX_DROP_ALL_PACKETS 0
+
+#ifndef APP_IO_RX_PREFETCH_ENABLE
+#define APP_IO_RX_PREFETCH_ENABLE 1
+#endif
+
+#ifndef APP_WORKER_PREFETCH_ENABLE
+#define APP_WORKER_PREFETCH_ENABLE 1
+#endif
+
+#ifndef APP_IO_TX_PREFETCH_ENABLE
+#define APP_IO_TX_PREFETCH_ENABLE 1
+#endif
+
+#if APP_IO_RX_PREFETCH_ENABLE
+#define APP_IO_RX_PREFETCH0(p) rte_prefetch0(p)
+#define APP_IO_RX_PREFETCH1(p) rte_prefetch1(p)
+#else
+#define APP_IO_RX_PREFETCH0(p)
+#define APP_IO_RX_PREFETCH1(p)
+#endif
+
+#if APP_WORKER_PREFETCH_ENABLE
+#define APP_WORKER_PREFETCH0(p) rte_prefetch0(p)
+#define APP_WORKER_PREFETCH1(p) rte_prefetch1(p)
+#else
+#define APP_WORKER_PREFETCH0(p)
+#define APP_WORKER_PREFETCH1(p)
+#endif
+
+#if APP_IO_TX_PREFETCH_ENABLE
+#define APP_IO_TX_PREFETCH0(p) rte_prefetch0(p)
+#define APP_IO_TX_PREFETCH1(p) rte_prefetch1(p)
+#else
+#define APP_IO_TX_PREFETCH0(p)
+#define APP_IO_TX_PREFETCH1(p)
+#endif
+
+static inline void
+app_lcore_io_rx_buffer_to_send(
+ struct app_lcore_params_io *lp,
+ uint32_t worker,
+ struct rte_mbuf *mbuf,
+ uint32_t bsz)
+{
+ uint32_t pos;
+ int ret;
+
+ pos = lp->rx.mbuf_out[worker].n_mbufs;
+ lp->rx.mbuf_out[worker].array[pos ++] = mbuf;
+ if (likely(pos < bsz)) {
+ lp->rx.mbuf_out[worker].n_mbufs = pos;
+ return;
+ }
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rx.rings[worker],
+ (void **) lp->rx.mbuf_out[worker].array,
+ bsz,
+ NULL);
+
+ if (unlikely(ret == 0)) {
+ uint32_t k;
+ for (k = 0; k < bsz; k ++) {
+ struct rte_mbuf *m = lp->rx.mbuf_out[worker].array[k];
+ rte_pktmbuf_free(m);
+ }
+ }
+
+ lp->rx.mbuf_out[worker].n_mbufs = 0;
+ lp->rx.mbuf_out_flush[worker] = 0;
+
+#if APP_STATS
+ lp->rx.rings_iters[worker] ++;
+ /* ret is the number of mbufs enqueued: bsz on success, 0 on failure */
+ if (likely(ret > 0)) {
+ lp->rx.rings_count[worker] ++;
+ }
+ if (unlikely(lp->rx.rings_iters[worker] == APP_STATS)) {
+ unsigned lcore = rte_lcore_id();
+
+ printf("\tI/O RX %u out (worker %u): enq success rate = %.2f\n",
+ lcore,
+ (unsigned)worker,
+ ((double) lp->rx.rings_count[worker]) / ((double) lp->rx.rings_iters[worker]));
+ lp->rx.rings_iters[worker] = 0;
+ lp->rx.rings_count[worker] = 0;
+ }
+#endif
+}
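+
+/*
+ * The enqueue above is all-or-nothing: called with a NULL free_space
+ * argument, rte_ring_sp_enqueue_bulk() returns either bsz (the whole
+ * burst went in) or 0 (the ring was full and nothing went in), in
+ * which case the entire burst is dropped. A minimal sketch of the
+ * same policy, with a hypothetical ring r and burst buf[] of size n:
+ *
+ *   if (rte_ring_sp_enqueue_bulk(r, (void **) buf, n, NULL) == 0) {
+ *       uint32_t k;
+ *       for (k = 0; k < n; k ++)
+ *           rte_pktmbuf_free(buf[k]); // ring full: drop whole burst
+ *   }
+ */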
+
+static inline void
+app_lcore_io_rx(
+ struct app_lcore_params_io *lp,
+ uint32_t n_workers,
+ uint32_t bsz_rd,
+ uint32_t bsz_wr,
+ uint8_t pos_lb)
+{
+ struct rte_mbuf *mbuf_1_0, *mbuf_1_1, *mbuf_2_0, *mbuf_2_1;
+ uint8_t *data_1_0, *data_1_1 = NULL;
+ uint32_t i;
+
+ for (i = 0; i < lp->rx.n_nic_queues; i ++) {
+ uint8_t port = lp->rx.nic_queues[i].port;
+ uint8_t queue = lp->rx.nic_queues[i].queue;
+ uint32_t n_mbufs, j;
+
+ n_mbufs = rte_eth_rx_burst(
+ port,
+ queue,
+ lp->rx.mbuf_in.array,
+ (uint16_t) bsz_rd);
+
+ if (unlikely(n_mbufs == 0)) {
+ continue;
+ }
+
+#if APP_STATS
+ lp->rx.nic_queues_iters[i] ++;
+ lp->rx.nic_queues_count[i] += n_mbufs;
+ if (unlikely(lp->rx.nic_queues_iters[i] == APP_STATS)) {
+ struct rte_eth_stats stats;
+ unsigned lcore = rte_lcore_id();
+
+ rte_eth_stats_get(port, &stats);
+
+ printf("I/O RX %u in (NIC port %u): NIC drop ratio = %.2f avg burst size = %.2f\n",
+ lcore,
+ (unsigned) port,
+ (double) stats.imissed / (double) (stats.imissed + stats.ipackets),
+ ((double) lp->rx.nic_queues_count[i]) / ((double) lp->rx.nic_queues_iters[i]));
+ lp->rx.nic_queues_iters[i] = 0;
+ lp->rx.nic_queues_count[i] = 0;
+ }
+#endif
+
+#if APP_IO_RX_DROP_ALL_PACKETS
+ for (j = 0; j < n_mbufs; j ++) {
+ struct rte_mbuf *pkt = lp->rx.mbuf_in.array[j];
+ rte_pktmbuf_free(pkt);
+ }
+
+ continue;
+#endif
+
+ mbuf_1_0 = lp->rx.mbuf_in.array[0];
+ mbuf_1_1 = lp->rx.mbuf_in.array[1];
+ data_1_0 = rte_pktmbuf_mtod(mbuf_1_0, uint8_t *);
+ if (likely(n_mbufs > 1)) {
+ data_1_1 = rte_pktmbuf_mtod(mbuf_1_1, uint8_t *);
+ }
+
+ mbuf_2_0 = lp->rx.mbuf_in.array[2];
+ mbuf_2_1 = lp->rx.mbuf_in.array[3];
+ APP_IO_RX_PREFETCH0(mbuf_2_0);
+ APP_IO_RX_PREFETCH0(mbuf_2_1);
+
+ for (j = 0; j + 3 < n_mbufs; j += 2) {
+ struct rte_mbuf *mbuf_0_0, *mbuf_0_1;
+ uint8_t *data_0_0, *data_0_1;
+ uint32_t worker_0, worker_1;
+
+ mbuf_0_0 = mbuf_1_0;
+ mbuf_0_1 = mbuf_1_1;
+ data_0_0 = data_1_0;
+ data_0_1 = data_1_1;
+
+ mbuf_1_0 = mbuf_2_0;
+ mbuf_1_1 = mbuf_2_1;
+ data_1_0 = rte_pktmbuf_mtod(mbuf_2_0, uint8_t *);
+ data_1_1 = rte_pktmbuf_mtod(mbuf_2_1, uint8_t *);
+ APP_IO_RX_PREFETCH0(data_1_0);
+ APP_IO_RX_PREFETCH0(data_1_1);
+
+ mbuf_2_0 = lp->rx.mbuf_in.array[j+4];
+ mbuf_2_1 = lp->rx.mbuf_in.array[j+5];
+ APP_IO_RX_PREFETCH0(mbuf_2_0);
+ APP_IO_RX_PREFETCH0(mbuf_2_1);
+
+ worker_0 = data_0_0[pos_lb] & (n_workers - 1);
+ worker_1 = data_0_1[pos_lb] & (n_workers - 1);
+
+ app_lcore_io_rx_buffer_to_send(lp, worker_0, mbuf_0_0, bsz_wr);
+ app_lcore_io_rx_buffer_to_send(lp, worker_1, mbuf_0_1, bsz_wr);
+ }
+
+ /* Handle the last 1, 2 (when n_mbufs is even) or 3 (when n_mbufs is odd) packets */
+ for ( ; j < n_mbufs; j += 1) {
+ struct rte_mbuf *mbuf;
+ uint8_t *data;
+ uint32_t worker;
+
+ mbuf = mbuf_1_0;
+ mbuf_1_0 = mbuf_1_1;
+ mbuf_1_1 = mbuf_2_0;
+ mbuf_2_0 = mbuf_2_1;
+
+ data = rte_pktmbuf_mtod(mbuf, uint8_t *);
+
+ APP_IO_RX_PREFETCH0(mbuf_1_0);
+
+ worker = data[pos_lb] & (n_workers - 1);
+
+ app_lcore_io_rx_buffer_to_send(lp, worker, mbuf, bsz_wr);
+ }
+ }
+}
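+
+/*
+ * Worker selection above is a one-byte hash: data[pos_lb] is a single
+ * packet byte (with the default pos_lb, a byte of the IPv4 source
+ * address) and the mask requires n_workers to be a power of two for
+ * all workers to receive traffic. For example, with n_workers = 4 and
+ * a byte value of 0x2B:
+ *
+ *   worker = 0x2B & (4 - 1) = 3
+ *
+ * Note that the pipeline reads mbuf slots 1-3 before checking
+ * n_mbufs; those possibly stale pointers are only ever prefetched,
+ * never dereferenced, so short bursts fall through safely to the
+ * remainder loop.
+ */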
+
+static inline void
+app_lcore_io_rx_flush(struct app_lcore_params_io *lp, uint32_t n_workers)
+{
+ uint32_t worker;
+
+ for (worker = 0; worker < n_workers; worker ++) {
+ int ret;
+
+ if (likely((lp->rx.mbuf_out_flush[worker] == 0) ||
+ (lp->rx.mbuf_out[worker].n_mbufs == 0))) {
+ lp->rx.mbuf_out_flush[worker] = 1;
+ continue;
+ }
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rx.rings[worker],
+ (void **) lp->rx.mbuf_out[worker].array,
+ lp->rx.mbuf_out[worker].n_mbufs,
+ NULL);
+
+ if (unlikely(ret == 0)) {
+ uint32_t k;
+ for (k = 0; k < lp->rx.mbuf_out[worker].n_mbufs; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->rx.mbuf_out[worker].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->rx.mbuf_out[worker].n_mbufs = 0;
+ lp->rx.mbuf_out_flush[worker] = 1;
+ }
+}
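+
+/*
+ * mbuf_out_flush implements a two-phase flush: the send path clears
+ * the flag every time it pushes a full burst, so the first flush pass
+ * after fresh traffic only re-arms the flag, and a partially filled
+ * buffer is actually enqueued on the following pass, i.e. once it has
+ * sat idle for a full APP_LCORE_IO_FLUSH period.
+ */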
+
+static inline void
+app_lcore_io_tx(
+ struct app_lcore_params_io *lp,
+ uint32_t n_workers,
+ uint32_t bsz_rd,
+ uint32_t bsz_wr)
+{
+ uint32_t worker;
+
+ for (worker = 0; worker < n_workers; worker ++) {
+ uint32_t i;
+
+ for (i = 0; i < lp->tx.n_nic_ports; i ++) {
+ uint8_t port = lp->tx.nic_ports[i];
+ struct rte_ring *ring = lp->tx.rings[port][worker];
+ uint32_t n_mbufs, n_pkts;
+ int ret;
+
+ n_mbufs = lp->tx.mbuf_out[port].n_mbufs;
+ ret = rte_ring_sc_dequeue_bulk(
+ ring,
+ (void **) &lp->tx.mbuf_out[port].array[n_mbufs],
+ bsz_rd,
+ NULL);
+
+ if (unlikely(ret == 0))
+ continue;
+
+ n_mbufs += bsz_rd;
+
+#if APP_IO_TX_DROP_ALL_PACKETS
+ {
+ uint32_t j;
+ APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[0]);
+ APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[1]);
+
+ for (j = 0; j < n_mbufs; j ++) {
+ if (likely(j < n_mbufs - 2)) {
+ APP_IO_TX_PREFETCH0(lp->tx.mbuf_out[port].array[j + 2]);
+ }
+
+ rte_pktmbuf_free(lp->tx.mbuf_out[port].array[j]);
+ }
+
+ lp->tx.mbuf_out[port].n_mbufs = 0;
+
+ continue;
+ }
+#endif
+
+ if (unlikely(n_mbufs < bsz_wr)) {
+ lp->tx.mbuf_out[port].n_mbufs = n_mbufs;
+ continue;
+ }
+
+ n_pkts = rte_eth_tx_burst(
+ port,
+ 0,
+ lp->tx.mbuf_out[port].array,
+ (uint16_t) n_mbufs);
+
+#if APP_STATS
+ lp->tx.nic_ports_iters[port] ++;
+ lp->tx.nic_ports_count[port] += n_pkts;
+ if (unlikely(lp->tx.nic_ports_iters[port] == APP_STATS)) {
+ unsigned lcore = rte_lcore_id();
+
+ printf("\t\t\tI/O TX %u out (port %u): avg burst size = %.2f\n",
+ lcore,
+ (unsigned) port,
+ ((double) lp->tx.nic_ports_count[port]) / ((double) lp->tx.nic_ports_iters[port]));
+ lp->tx.nic_ports_iters[port] = 0;
+ lp->tx.nic_ports_count[port] = 0;
+ }
+#endif
+
+ if (unlikely(n_pkts < n_mbufs)) {
+ uint32_t k;
+ for (k = n_pkts; k < n_mbufs; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->tx.mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+ lp->tx.mbuf_out[port].n_mbufs = 0;
+ lp->tx.mbuf_out_flush[port] = 0;
+ }
+ }
+}
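+
+/*
+ * Note the decoupled burst sizes above: each dequeue pulls exactly
+ * bsz_rd mbufs from a worker ring, but nothing is handed to the NIC
+ * until at least bsz_wr mbufs have accumulated for a port, so several
+ * ring reads may be batched into one rte_eth_tx_burst() call. Any
+ * mbufs the NIC does not accept are freed rather than retried.
+ */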
+
+static inline void
+app_lcore_io_tx_flush(struct app_lcore_params_io *lp)
+{
+ uint8_t port;
+ uint32_t i;
+
+ for (i = 0; i < lp->tx.n_nic_ports; i++) {
+ uint32_t n_pkts;
+
+ port = lp->tx.nic_ports[i];
+ if (likely((lp->tx.mbuf_out_flush[port] == 0) ||
+ (lp->tx.mbuf_out[port].n_mbufs == 0))) {
+ lp->tx.mbuf_out_flush[port] = 1;
+ continue;
+ }
+
+ n_pkts = rte_eth_tx_burst(
+ port,
+ 0,
+ lp->tx.mbuf_out[port].array,
+ (uint16_t) lp->tx.mbuf_out[port].n_mbufs);
+
+ if (unlikely(n_pkts < lp->tx.mbuf_out[port].n_mbufs)) {
+ uint32_t k;
+ for (k = n_pkts; k < lp->tx.mbuf_out[port].n_mbufs; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->tx.mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->tx.mbuf_out[port].n_mbufs = 0;
+ lp->tx.mbuf_out_flush[port] = 1;
+ }
+}
+
+static void
+app_lcore_main_loop_io(void)
+{
+ uint32_t lcore = rte_lcore_id();
+ struct app_lcore_params_io *lp = &app.lcore_params[lcore].io;
+ uint32_t n_workers = app_get_lcores_worker();
+ uint64_t i = 0;
+
+ uint32_t bsz_rx_rd = app.burst_size_io_rx_read;
+ uint32_t bsz_rx_wr = app.burst_size_io_rx_write;
+ uint32_t bsz_tx_rd = app.burst_size_io_tx_read;
+ uint32_t bsz_tx_wr = app.burst_size_io_tx_write;
+
+ uint8_t pos_lb = app.pos_lb;
+
+ for ( ; ; ) {
+ if (APP_LCORE_IO_FLUSH && (unlikely(i == APP_LCORE_IO_FLUSH))) {
+ if (likely(lp->rx.n_nic_queues > 0)) {
+ app_lcore_io_rx_flush(lp, n_workers);
+ }
+
+ if (likely(lp->tx.n_nic_ports > 0)) {
+ app_lcore_io_tx_flush(lp);
+ }
+
+ i = 0;
+ }
+
+ if (likely(lp->rx.n_nic_queues > 0)) {
+ app_lcore_io_rx(lp, n_workers, bsz_rx_rd, bsz_rx_wr, pos_lb);
+ }
+
+ if (likely(lp->tx.n_nic_ports > 0)) {
+ app_lcore_io_tx(lp, n_workers, bsz_tx_rd, bsz_tx_wr);
+ }
+
+ i ++;
+ }
+}
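+
+/*
+ * Flushing is driven by an iteration counter rather than a timer: i
+ * counts main-loop passes, and buffered packets are pushed out
+ * roughly once every APP_LCORE_IO_FLUSH (default 1,000,000) passes.
+ * This bounds how long a partial burst can linger when traffic is
+ * light without putting a timestamp read on the fast path.
+ */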
+
+static inline void
+app_lcore_worker(
+ struct app_lcore_params_worker *lp,
+ uint32_t bsz_rd,
+ uint32_t bsz_wr)
+{
+ uint32_t i;
+
+ for (i = 0; i < lp->n_rings_in; i ++) {
+ struct rte_ring *ring_in = lp->rings_in[i];
+ uint32_t j;
+ int ret;
+
+ ret = rte_ring_sc_dequeue_bulk(
+ ring_in,
+ (void **) lp->mbuf_in.array,
+ bsz_rd,
+ NULL);
+
+ if (unlikely(ret == 0))
+ continue;
+
+#if APP_WORKER_DROP_ALL_PACKETS
+ for (j = 0; j < bsz_rd; j ++) {
+ struct rte_mbuf *pkt = lp->mbuf_in.array[j];
+ rte_pktmbuf_free(pkt);
+ }
+
+ continue;
+#endif
+
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);
+
+ for (j = 0; j < bsz_rd; j ++) {
+ struct rte_mbuf *pkt;
+ struct ipv4_hdr *ipv4_hdr;
+ uint32_t ipv4_dst, pos;
+ uint32_t port;
+
+ if (likely(j < bsz_rd - 1)) {
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
+ }
+ if (likely(j < bsz_rd - 2)) {
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
+ }
+
+ pkt = lp->mbuf_in.array[j];
+ ipv4_hdr = rte_pktmbuf_mtod_offset(pkt,
+ struct ipv4_hdr *,
+ sizeof(struct ether_hdr));
+ ipv4_dst = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
+
+ if (unlikely(rte_lpm_lookup(lp->lpm_table, ipv4_dst, &port) != 0)) {
+ port = pkt->port;
+ }
+
+ pos = lp->mbuf_out[port].n_mbufs;
+
+ lp->mbuf_out[port].array[pos ++] = pkt;
+ if (likely(pos < bsz_wr)) {
+ lp->mbuf_out[port].n_mbufs = pos;
+ continue;
+ }
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rings_out[port],
+ (void **) lp->mbuf_out[port].array,
+ bsz_wr,
+ NULL);
+
+#if APP_STATS
+ lp->rings_out_iters[port] ++;
+ if (ret > 0) {
+ lp->rings_out_count[port] += 1;
+ }
+ if (unlikely(lp->rings_out_iters[port] == APP_STATS)) {
+ printf("\t\tWorker %u out (NIC port %u): enq success rate = %.2f\n",
+ (unsigned) lp->worker_id,
+ (unsigned) port,
+ ((double) lp->rings_out_count[port]) / ((double) lp->rings_out_iters[port]));
+ lp->rings_out_iters[port] = 0;
+ lp->rings_out_count[port] = 0;
+ }
+#endif
+
+ if (unlikely(ret == 0)) {
+ uint32_t k;
+ for (k = 0; k < bsz_wr; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 0;
+ }
+ }
+}
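+
+/*
+ * Routing above: rte_lpm_lookup() returns 0 on a hit and fills in the
+ * next hop, which this example uses directly as the output port; on a
+ * miss the packet is bounced back out of the port it arrived on
+ * (pkt->port). A minimal sketch of installing a matching entry,
+ * assuming a hypothetical lpm handle (the real rules come from the
+ * --lpm command-line option and are installed in init.c):
+ *
+ *   rte_lpm_add(lpm, IPv4(10, 0, 0, 0), 24, 1); // 10.0.0.0/24 -> port 1
+ */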
+
+static inline void
+app_lcore_worker_flush(struct app_lcore_params_worker *lp)
+{
+ uint32_t port;
+
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ int ret;
+
+ if (unlikely(lp->rings_out[port] == NULL)) {
+ continue;
+ }
+
+ if (likely((lp->mbuf_out_flush[port] == 0) ||
+ (lp->mbuf_out[port].n_mbufs == 0))) {
+ lp->mbuf_out_flush[port] = 1;
+ continue;
+ }
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rings_out[port],
+ (void **) lp->mbuf_out[port].array,
+ lp->mbuf_out[port].n_mbufs,
+ NULL);
+
+ if (unlikely(ret == 0)) {
+ uint32_t k;
+ for (k = 0; k < lp->mbuf_out[port].n_mbufs; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 1;
+ }
+}
+
+static void
+app_lcore_main_loop_worker(void)
+{
+ uint32_t lcore = rte_lcore_id();
+ struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+ uint64_t i = 0;
+
+ uint32_t bsz_rd = app.burst_size_worker_read;
+ uint32_t bsz_wr = app.burst_size_worker_write;
+
+ for ( ; ; ) {
+ if (APP_LCORE_WORKER_FLUSH && (unlikely(i == APP_LCORE_WORKER_FLUSH))) {
+ app_lcore_worker_flush(lp);
+ i = 0;
+ }
+
+ app_lcore_worker(lp, bsz_rd, bsz_wr);
+
+ i ++;
+ }
+}
+
+int
+app_lcore_main_loop(__attribute__((unused)) void *arg)
+{
+ struct app_lcore_params *lp;
+ unsigned lcore;
+
+ lcore = rte_lcore_id();
+ lp = &app.lcore_params[lcore];
+
+ if (lp->type == e_APP_LCORE_IO) {
+ printf("Logical core %u (I/O) main loop.\n", lcore);
+ app_lcore_main_loop_io();
+ }
+
+ if (lp->type == e_APP_LCORE_WORKER) {
+ printf("Logical core %u (worker %u) main loop.\n",
+ lcore,
+ (unsigned) lp->worker.worker_id);
+ app_lcore_main_loop_worker();
+ }
+
+ return 0;
+}
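+
+/*
+ * app_lcore_main_loop() is the per-lcore entry point: main() hands it
+ * to the EAL so every enabled lcore ends up in either the I/O loop or
+ * the worker loop above. A minimal sketch of the launch, assuming the
+ * usual EAL multi-core launch sequence:
+ *
+ *   rte_eal_mp_remote_launch(app_lcore_main_loop, NULL, CALL_MASTER);
+ *   RTE_LCORE_FOREACH_SLAVE(lcore) {
+ *       if (rte_eal_wait_lcore(lcore) < 0)
+ *           return -1;
+ *   }
+ */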