summaryrefslogtreecommitdiffstats
path: root/src/packet_ring.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/packet_ring.c')
-rw-r--r--src/packet_ring.c219
1 files changed, 219 insertions, 0 deletions
diff --git a/src/packet_ring.c b/src/packet_ring.c
new file mode 100644
index 0000000..93e7578
--- /dev/null
+++ b/src/packet_ring.c
@@ -0,0 +1,219 @@
+/*---------------------------------------------------------------
+ * Copyright (c) 2019
+ * Broadcom Corporation
+ * All Rights Reserved.
+ *---------------------------------------------------------------
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated
+ * documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute,
+ * sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ *
+ * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and
+ * the following disclaimers.
+ *
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimers in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * Neither the name of Broadcom Coporation,
+ * nor the names of its contributors may be used to endorse
+ * or promote products derived from this Software without
+ * specific prior written permission.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ________________________________________________________________
+ *
+ * Suppport for packet rings between threads
+ *
+ * by Robert J. McMahon (rjmcmahon@rjmcmahon.com, bob.mcmahon@broadcom.com)
+ * -------------------------------------------------------------------
+ */
+#include "headers.h"
+#include "packet_ring.h"
+#include "Condition.h"
+#include "Thread.h"
+
+#ifdef HAVE_THREAD_DEBUG
+#include "Mutex.h"
+static int totalpacketringcount = 0;
+Mutex packetringdebug_mutex;
+#endif
+
+struct PacketRing * packetring_init (int count, struct Condition *awake_consumer, struct Condition *awake_producer) {
+ assert(awake_consumer != NULL);
+ struct PacketRing *pr = NULL;
+ if ((pr = (struct PacketRing *) calloc(1, sizeof(struct PacketRing)))) {
+ pr->bytes = sizeof(struct PacketRing);
+ pr->data = (struct ReportStruct *) calloc(count, sizeof(struct ReportStruct));
+ pr->bytes += count * sizeof(struct ReportStruct);
+ }
+ if (!pr || !pr->data) {
+ fprintf(stderr, "ERROR: no memory for packet ring of size %d count, try to reduce with option --NUM_REPORT_STRUCTS\n", count);
+ exit(1);
+ }
+ pr->producer = 0;
+ pr->consumer = 0;
+ pr->maxcount = count;
+ pr->awake_producer = awake_producer;
+ pr->awake_consumer = awake_consumer;
+ if (!awake_producer)
+ pr->mutex_enable=0;
+ else
+ pr->mutex_enable=1;
+ pr->consumerdone = 0;
+ pr->awaitcounter = 0;
+#ifdef HAVE_THREAD_DEBUG
+ Mutex_Lock(&packetringdebug_mutex);
+ totalpacketringcount++;
+ thread_debug("Init %d element packet ring=%p consumer=%p producer=%p total rings=%d enable=%d", count, \
+ (void *)pr, (void *) pr->awake_consumer, (void *) pr->awake_producer, totalpacketringcount, pr->mutex_enable);
+ Mutex_Unlock(&packetringdebug_mutex);
+#endif
+ return (pr);
+}
+
+inline void packetring_enqueue (struct PacketRing *pr, struct ReportStruct *metapacket) {
+ while (((pr->producer == pr->maxcount) && (pr->consumer == 0)) || \
+ ((pr->producer + 1) == pr->consumer)) {
+ // Signal the consumer thread to process a full queue
+ if (pr->mutex_enable) {
+ assert(pr->awake_consumer != NULL);
+ Condition_Signal(pr->awake_consumer);
+ // Wait for the consumer to create some queue space
+ assert(pr->awake_producer != NULL);
+ Condition_Lock((*(pr->awake_producer)));
+ pr->awaitcounter++;
+#ifdef HAVE_THREAD_DEBUG_PERF
+ {
+ struct timeval now;
+ static struct timeval prev={0, 0};
+ gettimeofday( &now, NULL );
+ if (!prev.tv_sec || (TimeDifference(now, prev) > 1.0)) {
+ prev = now;
+ thread_debug( "Not good, traffic's packet ring %p stalled per %p", (void *)pr, (void *)&pr->awake_producer);
+ }
+ }
+#endif
+ Condition_TimedWait(pr->awake_producer, 1);
+ Condition_Unlock((*(pr->awake_producer)));
+ }
+ }
+ int writeindex;
+ if ((pr->producer + 1) == pr->maxcount)
+ writeindex = 0;
+ else
+ writeindex = (pr->producer + 1);
+
+ /* Next two lines must be maintained as is */
+ memcpy((pr->data + writeindex), metapacket, sizeof(struct ReportStruct));
+ pr->producer = writeindex;
+}
+
+inline struct ReportStruct *packetring_dequeue (struct PacketRing *pr) {
+ struct ReportStruct *packet = NULL;
+ if (pr->producer == pr->consumer)
+ return NULL;
+
+ int readindex;
+ if ((pr->consumer + 1) == pr->maxcount)
+ readindex = 0;
+ else
+ readindex = (pr->consumer + 1);
+ packet = (pr->data + readindex);
+ // advance the consumer pointer last
+ pr->consumer = readindex;
+ if (pr->mutex_enable) {
+ // Signal the traffic thread assigned to this ring
+ // when the ring goes from having something to empty
+ if (pr->producer == pr->consumer) {
+#ifdef HAVE_THREAD_DEBUG
+ // thread_debug( "Consumer signal packet ring %p empty per %p", (void *)pr, (void *)&pr->awake_producer);
+#endif
+ assert(pr->awake_producer);
+ Condition_Signal(pr->awake_producer);
+ }
+ }
+ return packet;
+}
+
+inline void enqueue_ackring (struct PacketRing *pr, struct ReportStruct *metapacket) {
+ packetring_enqueue(pr, metapacket);
+ // Keep the latency low by signaling the consumer thread
+ // per each enqueue
+#ifdef HAVE_THREAD_DEBUG
+ // thread_debug( "Producer signal consumer ackring=%p per %p", (void *)pr, (void *)&pr->awake_consumer);
+#endif
+ Condition_Signal(pr->awake_consumer);
+}
+
+inline struct ReportStruct *dequeue_ackring (struct PacketRing *pr) {
+ struct ReportStruct *packet = NULL;
+ Condition_Lock((*(pr->awake_consumer)));
+ while ((packet = packetring_dequeue(pr)) == NULL) {
+ Condition_TimedWait(pr->awake_consumer, 1);
+ }
+ Condition_Unlock((*(pr->awake_consumer)));
+ if (packet) {
+ // Signal the producer thread for low latency
+ // indication of space available
+ Condition_Signal(pr->awake_producer);
+ }
+ return packet;
+}
+
+void packetring_free (struct PacketRing *pr) {
+ if (pr) {
+ if (pr->awaitcounter > 1000) fprintf(stderr, "WARN: Reporter thread may be too slow, await counter=%d, " \
+ "consider increasing NUM_REPORT_STRUCTS\n", pr->awaitcounter);
+ if (pr->data) {
+#ifdef HAVE_THREAD_DEBUG
+ Mutex_Lock(&packetringdebug_mutex);
+ totalpacketringcount--;
+ thread_debug("Free packet ring=%p producer=%p (consumer=%p) awaits = %d total rings = %d", \
+ (void *)pr, (void *) pr->awake_producer, (void *) pr->awake_consumer, pr->awaitcounter, totalpacketringcount);
+ Mutex_Unlock(&packetringdebug_mutex);
+#endif
+ free(pr->data);
+ }
+ free(pr);
+ }
+}
+
+void free_ackring(struct PacketRing *pr) {
+ packetring_free(pr);
+ Condition_Destroy(pr->awake_consumer);
+}
+
+/*
+ * This is an estimate and can be incorrect as these counters
+ * done like this is not thread safe. Use with care as there
+ * is no guarantee the return value is accurate
+ */
+#ifdef HAVE_THREAD_DEBUG
+inline int packetring_getcount (struct PacketRing *pr) {
+ int depth = 0;
+ if (pr->producer != pr->consumer) {
+ depth = (pr->producer > pr->consumer) ? \
+ (pr->producer - pr->consumer) : \
+ ((pr->maxcount - pr->consumer) + pr->producer);
+ // printf("DEBUG: Depth=%d for packet ring %p\n", depth, (void *)pr);
+ }
+ return depth;
+}
+#endif