summaryrefslogtreecommitdiffstats
path: root/runtime/lib_ksi_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/lib_ksi_queue.c')
-rw-r--r--runtime/lib_ksi_queue.c220
1 files changed, 220 insertions, 0 deletions
diff --git a/runtime/lib_ksi_queue.c b/runtime/lib_ksi_queue.c
new file mode 100644
index 0000000..f407156
--- /dev/null
+++ b/runtime/lib_ksi_queue.c
@@ -0,0 +1,220 @@
+#include <malloc.h>
+#include <time.h>
+#include <errno.h>
+#include "lib_ksi_queue.h"
+
+RingBuffer* RingBuffer_new(size_t size) {
+ RingBuffer *p = calloc(1, sizeof (RingBuffer));
+ if (!p)
+ return NULL;
+
+ p->buffer = calloc(size, sizeof (void*));
+ p->size = size;
+ return p;
+}
+
+void RingBuffer_free(RingBuffer* this) {
+ if (this->buffer != NULL)
+ free(this->buffer);
+ free(this);
+}
+
+static bool RingBuffer_grow(RingBuffer* this) {
+ void **pTmp = calloc(this->size * RB_GROW_FACTOR, sizeof (void*));
+ void *pTmpItem = NULL;
+ if (!pTmp)
+ return false;
+
+ for (size_t i = 0; i < this->size; ++i) {
+ RingBuffer_popFront(this, &pTmpItem);
+ pTmp[i] = pTmpItem;
+ }
+
+ free(this->buffer);
+ this->buffer = pTmp;
+ this->head = 0;
+ this->tail = this->size;
+ this->count = this->size;
+ this->size = this->size * RB_GROW_FACTOR;
+ return true;
+}
+
+bool RingBuffer_pushBack(RingBuffer* this, void* item) {
+
+ if (this->size == this->count && !RingBuffer_grow(this))
+ return false;
+
+ if(this->size == 0)
+ return false;
+
+ this->buffer[this->tail] = item;
+ this->tail = (this->tail + 1) % this->size;
+ this->count += 1;
+ return true;
+}
+
+bool RingBuffer_popFront(RingBuffer* this, void** item) {
+ if (this->count == 0)
+ return false;
+
+ *item = this->buffer[this->head];
+ this->buffer[this->head] = NULL;
+ this->count -= 1;
+ this->head = (this->head + 1) % this->size;
+ return true;
+}
+
+bool RingBuffer_peekFront(RingBuffer* this, void** item) {
+ if (this->count == 0)
+ return false;
+
+ *item = this->buffer[this->head];
+ return true;
+}
+
+size_t RingBuffer_count(RingBuffer* this) {
+ return this->count;
+}
+
+bool RingBuffer_getItem(RingBuffer* this, size_t index, void** item) {
+ if (this->count == 0 || index >= this->count)
+ return false;
+
+ *item = this->buffer[(this->head + index) % this->size];
+ return true;
+}
+
+
+ProtectedQueue* ProtectedQueue_new(size_t queueSize) {
+ ProtectedQueue *p = calloc(1, sizeof (ProtectedQueue));
+ if (!p)
+ return NULL;
+
+ pthread_mutex_init(&p->mutex, 0);
+ p->bStop = false;
+ p->workItems = RingBuffer_new(queueSize);
+ return p;
+}
+
+void ProtectedQueue_free(ProtectedQueue* this) {
+ pthread_mutex_destroy(&this->mutex);
+ pthread_cond_destroy(&this->condition);
+ this->bStop = true;
+ RingBuffer_free(this->workItems);
+ free(this);
+}
+
+/// Signal stop. All threads waiting in FetchItme will be returned false from FetchItem
+
+void ProtectedQueue_stop(ProtectedQueue* this) {
+ this->bStop = true;
+ pthread_cond_broadcast(&this->condition);
+}
+
+/// Atomically adds an item into work item queue and releases a thread waiting
+/// in FetchItem
+
+bool ProtectedQueue_addItem(ProtectedQueue* this, void* item) {
+ bool ret = false;
+
+ if (this->bStop)
+ return false;
+
+ pthread_mutex_lock(&this->mutex);
+ if ((ret = RingBuffer_pushBack(this->workItems, item)) == true)
+ pthread_cond_signal(&this->condition);
+ pthread_mutex_unlock(&this->mutex);
+ return ret;
+}
+
+bool ProtectedQueue_peekFront(ProtectedQueue* this, void** item) {
+ bool ret;
+ pthread_mutex_lock(&this->mutex);
+ ret = RingBuffer_peekFront(this->workItems, item);
+ pthread_mutex_unlock(&this->mutex);
+ return ret;
+}
+
+bool ProtectedQueue_popFront(ProtectedQueue* this, void** item) {
+ bool ret;
+ pthread_mutex_lock(&this->mutex);
+ ret = RingBuffer_popFront(this->workItems, item);
+ pthread_mutex_unlock(&this->mutex);
+ return ret;
+}
+
+size_t ProtectedQueue_popFrontBatch(ProtectedQueue* this, void** items, size_t bufSize) {
+ size_t i;
+ pthread_mutex_lock(&this->mutex);
+ for (i = 0; RingBuffer_count(this->workItems) > 0 && i < bufSize; ++i)
+ RingBuffer_popFront(this->workItems, items[i]);
+ pthread_mutex_unlock(&this->mutex);
+ return i;
+}
+
+bool ProtectedQueue_getItem(ProtectedQueue* this, size_t index, void** item) {
+ bool ret=false;
+ pthread_mutex_lock(&this->mutex);
+ ret=RingBuffer_getItem(this->workItems, index, item);
+ pthread_mutex_unlock(&this->mutex);
+ return ret;
+}
+
+/* Waits for a new work item or timeout (if specified). Returns 0 in case of exit
+ * condition, 1 if item became available and ETIMEDOUT in case of timeout. */
+int ProtectedQueue_waitForItem(ProtectedQueue* this, void** item, uint64_t timeout) {
+ struct timespec ts;
+ pthread_mutex_lock(&this->mutex);
+
+ if (timeout > 0) {
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += timeout / 1000LL;
+ ts.tv_nsec += (timeout % 1000LL)*1000LL;
+ }
+
+ if (timeout) {
+ if (pthread_cond_timedwait(&this->condition, &this->mutex, &ts) == ETIMEDOUT) {
+ pthread_mutex_unlock(&this->mutex);
+ return ETIMEDOUT;
+ }
+ } else
+ pthread_cond_wait(&this->condition, &this->mutex);
+ if (this->bStop) {
+ pthread_mutex_unlock(&this->mutex);
+ return 0;
+ }
+
+ if (RingBuffer_count(this->workItems) != 0 && item != NULL)
+ RingBuffer_popFront(this->workItems, item);
+
+ pthread_mutex_unlock(&this->mutex);
+
+ return 1;
+}
+
+size_t ProtectedQueue_count(ProtectedQueue* this) {
+ size_t nCount;
+ pthread_mutex_lock(&this->mutex);
+ nCount = RingBuffer_count(this->workItems);
+ pthread_mutex_unlock(&this->mutex);
+ return nCount;
+}
+
+void *worker_thread_main(void *arg) {
+ int res;
+ void* item;
+ WorkerThreadContext* tc = (WorkerThreadContext*) arg;
+
+ while (1) {
+ item = NULL;
+ res = ProtectedQueue_waitForItem(tc->queue, &item, tc->timeout);
+ if (tc->queue->bStop)
+ return NULL;
+
+ if (res == ETIMEDOUT) {
+ if (!tc->timeoutFunc())
+ return NULL;
+ } else if (item != NULL && !tc->workerFunc(item))
+ return NULL;
+ }
+}