/*--------------------------------------------------------------- * Copyright (c) 2019 * Broadcom Corporation * All Rights Reserved. *--------------------------------------------------------------- * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software and associated * documentation files (the "Software"), to deal in the Software * without restriction, including without limitation the * rights to use, copy, modify, merge, publish, distribute, * sublicense, and/or sell copies of the Software, and to permit * persons to whom the Software is furnished to do * so, subject to the following conditions: * * * Redistributions of source code must retain the above * copyright notice, this list of conditions and * the following disclaimers. * * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimers in the documentation and/or other materials * provided with the distribution. * * * Neither the name of Broadcom Coporation, * nor the names of its contributors may be used to endorse * or promote products derived from this Software without * specific prior written permission. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, * ARISING FROM, OUT OF OR IN CONNECTION WITH THE * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * ________________________________________________________________ * * Suppport for packet rings between threads * * by Robert J. McMahon (rjmcmahon@rjmcmahon.com, bob.mcmahon@broadcom.com) * ------------------------------------------------------------------- */ #include "headers.h" #include "packet_ring.h" #include "Condition.h" #include "Thread.h" #ifdef HAVE_THREAD_DEBUG #include "Mutex.h" static int totalpacketringcount = 0; Mutex packetringdebug_mutex; #endif struct PacketRing * packetring_init (int count, struct Condition *awake_consumer, struct Condition *awake_producer) { assert(awake_consumer != NULL); struct PacketRing *pr = NULL; if ((pr = (struct PacketRing *) calloc(1, sizeof(struct PacketRing)))) { pr->bytes = sizeof(struct PacketRing); pr->data = (struct ReportStruct *) calloc(count, sizeof(struct ReportStruct)); pr->bytes += count * sizeof(struct ReportStruct); } if (!pr || !pr->data) { fprintf(stderr, "ERROR: no memory for packet ring of size %d count, try to reduce with option --NUM_REPORT_STRUCTS\n", count); exit(1); } pr->producer = 0; pr->consumer = 0; pr->maxcount = count; pr->awake_producer = awake_producer; pr->awake_consumer = awake_consumer; if (!awake_producer) pr->mutex_enable=0; else pr->mutex_enable=1; pr->consumerdone = 0; pr->awaitcounter = 0; #ifdef HAVE_THREAD_DEBUG Mutex_Lock(&packetringdebug_mutex); totalpacketringcount++; thread_debug("Init %d element packet ring=%p consumer=%p producer=%p total rings=%d enable=%d", count, \ (void *)pr, (void *) pr->awake_consumer, (void *) pr->awake_producer, totalpacketringcount, pr->mutex_enable); Mutex_Unlock(&packetringdebug_mutex); #endif return (pr); } inline void packetring_enqueue (struct PacketRing *pr, struct ReportStruct *metapacket) { while (((pr->producer == pr->maxcount) && (pr->consumer == 0)) || \ ((pr->producer + 1) == pr->consumer)) { // Signal the consumer thread to process a full queue if (pr->mutex_enable) { assert(pr->awake_consumer != NULL); Condition_Signal(pr->awake_consumer); // Wait for the consumer to create some queue space assert(pr->awake_producer != NULL); Condition_Lock((*(pr->awake_producer))); pr->awaitcounter++; #ifdef HAVE_THREAD_DEBUG_PERF { struct timeval now; static struct timeval prev={0, 0}; gettimeofday( &now, NULL ); if (!prev.tv_sec || (TimeDifference(now, prev) > 1.0)) { prev = now; thread_debug( "Not good, traffic's packet ring %p stalled per %p", (void *)pr, (void *)&pr->awake_producer); } } #endif Condition_TimedWait(pr->awake_producer, 1); Condition_Unlock((*(pr->awake_producer))); } } int writeindex; if ((pr->producer + 1) == pr->maxcount) writeindex = 0; else writeindex = (pr->producer + 1); /* Next two lines must be maintained as is */ memcpy((pr->data + writeindex), metapacket, sizeof(struct ReportStruct)); pr->producer = writeindex; } inline struct ReportStruct *packetring_dequeue (struct PacketRing *pr) { struct ReportStruct *packet = NULL; if (pr->producer == pr->consumer) return NULL; int readindex; if ((pr->consumer + 1) == pr->maxcount) readindex = 0; else readindex = (pr->consumer + 1); packet = (pr->data + readindex); // advance the consumer pointer last pr->consumer = readindex; if (pr->mutex_enable) { // Signal the traffic thread assigned to this ring // when the ring goes from having something to empty if (pr->producer == pr->consumer) { #ifdef HAVE_THREAD_DEBUG // thread_debug( "Consumer signal packet ring %p empty per %p", (void *)pr, (void *)&pr->awake_producer); #endif assert(pr->awake_producer); Condition_Signal(pr->awake_producer); } } return packet; } inline void enqueue_ackring (struct PacketRing *pr, struct ReportStruct *metapacket) { packetring_enqueue(pr, metapacket); // Keep the latency low by signaling the consumer thread // per each enqueue #ifdef HAVE_THREAD_DEBUG // thread_debug( "Producer signal consumer ackring=%p per %p", (void *)pr, (void *)&pr->awake_consumer); #endif Condition_Signal(pr->awake_consumer); } inline struct ReportStruct *dequeue_ackring (struct PacketRing *pr) { struct ReportStruct *packet = NULL; Condition_Lock((*(pr->awake_consumer))); while ((packet = packetring_dequeue(pr)) == NULL) { Condition_TimedWait(pr->awake_consumer, 1); } Condition_Unlock((*(pr->awake_consumer))); if (packet) { // Signal the producer thread for low latency // indication of space available Condition_Signal(pr->awake_producer); } return packet; } void packetring_free (struct PacketRing *pr) { if (pr) { if (pr->awaitcounter > 1000) fprintf(stderr, "WARN: Reporter thread may be too slow, await counter=%d, " \ "consider increasing NUM_REPORT_STRUCTS\n", pr->awaitcounter); if (pr->data) { #ifdef HAVE_THREAD_DEBUG Mutex_Lock(&packetringdebug_mutex); totalpacketringcount--; thread_debug("Free packet ring=%p producer=%p (consumer=%p) awaits = %d total rings = %d", \ (void *)pr, (void *) pr->awake_producer, (void *) pr->awake_consumer, pr->awaitcounter, totalpacketringcount); Mutex_Unlock(&packetringdebug_mutex); #endif free(pr->data); } free(pr); } } void free_ackring(struct PacketRing *pr) { packetring_free(pr); Condition_Destroy(pr->awake_consumer); } /* * This is an estimate and can be incorrect as these counters * done like this is not thread safe. Use with care as there * is no guarantee the return value is accurate */ #ifdef HAVE_THREAD_DEBUG inline int packetring_getcount (struct PacketRing *pr) { int depth = 0; if (pr->producer != pr->consumer) { depth = (pr->producer > pr->consumer) ? \ (pr->producer - pr->consumer) : \ ((pr->maxcount - pr->consumer) + pr->producer); // printf("DEBUG: Depth=%d for packet ring %p\n", depth, (void *)pr); } return depth; } #endif