/*
 * Copyright (C) 2019 Intel Corporation. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 */

#include "bh_queue.h"

/* One queued message; nodes are linked into a doubly linked list. */
typedef struct bh_queue_node {
    struct bh_queue_node *next;
    struct bh_queue_node *prev;
    /* Message type, returned by bh_message_type(). */
    unsigned short tag;
    /* Payload length; non-zero marks body as an allocated buffer. */
    unsigned int len;
    /* Payload pointer (may carry a plain integer value when len == 0). */
    void *body;
    /* Optional cleaner invoked on body when the message is freed. */
    bh_msg_cleaner msg_cleaner;
} bh_queue_node;

/* FIFO message queue protected by queue_lock. */
struct bh_queue {
    bh_queue_mutex queue_lock;
    bh_queue_cond queue_wait_cond;
    /* Number of messages currently linked into the list. */
    unsigned int cnt;
    /* Capacity: posts are rejected once cnt reaches max. */
    unsigned int max;
    /* Number of posts that were rejected or failed. */
    unsigned int drops;
    bh_queue_node *head;
    bh_queue_node *tail;
    /* Set by bh_queue_exit_loop_run() to stop bh_queue_enter_loop_run(). */
    bool exit_loop_run;
};

/* Return the body pointer stored in the message. */
char *
bh_message_payload(bh_message_t message)
{
    return message->body;
}

/* Return the payload length stored in the message. */
uint32
bh_message_payload_len(bh_message_t message)
{
    return message->len;
}

/* Return the tag (message type) stored in the message. */
int
bh_message_type(bh_message_t message)
{
    return message->tag;
}

/*
 * Allocate and initialize an empty queue with capacity
 * DEFAULT_QUEUE_LENGTH.
 *
 * Returns the new queue, or NULL if the allocation or the
 * initialization of the mutex / condition variable fails.
 */
bh_queue *
bh_queue_create()
{
    int ret;
    bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));

    if (queue) {
        memset(queue, 0, sizeof(bh_queue));
        queue->max = DEFAULT_QUEUE_LENGTH;

        ret = bh_queue_mutex_init(&queue->queue_lock);
        if (ret != 0) {
            bh_queue_free(queue);
            return NULL;
        }

        ret = bh_queue_cond_init(&queue->queue_wait_cond);
        if (ret != 0) {
            /* Undo the mutex initialization before bailing out. */
            bh_queue_mutex_destroy(&queue->queue_lock);
            bh_queue_free(queue);
            return NULL;
        }
    }

    return queue;
}

/*
 * Free every message still queued, then destroy the synchronization
 * objects and the queue itself. A NULL queue is ignored.
 */
void
bh_queue_destroy(bh_queue *queue)
{
    bh_queue_node *node;

    if (!queue)
        return;

    bh_queue_mutex_lock(&queue->queue_lock);
    while (queue->head) {
        node = queue->head;
        queue->head = node->next;
        bh_free_msg(node);
    }
    bh_queue_mutex_unlock(&queue->queue_lock);

    bh_queue_cond_destroy(&queue->queue_wait_cond);
    bh_queue_mutex_destroy(&queue->queue_lock);
    bh_queue_free(queue);
}

/*
 * Append an already-built message to the tail of the queue.
 *
 * Ownership of msg always passes to this function: on success it is
 * linked into the queue, on failure (queue full) it is freed with
 * bh_free_msg() and the drop counter is incremented.
 *
 * Returns true if the message was queued, false if it was dropped.
 */
bool
bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
{
    bh_queue_mutex_lock(&queue->queue_lock);

    /* The capacity check and the drop counter must be evaluated under
       the lock: cnt and drops are shared with other posters and with
       bh_get_msg(). */
    if (queue->cnt >= queue->max) {
        queue->drops++;
        bh_queue_mutex_unlock(&queue->queue_lock);
        /* Free outside the lock so a message cleaner never runs while
           the queue is locked. */
        bh_free_msg(msg);
        return false;
    }

    if (queue->cnt == 0) {
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);
        queue->head = queue->tail = msg;
        msg->next = msg->prev = NULL;
        queue->cnt = 1;

        /* The queue just became non-empty: wake a waiting consumer. */
        bh_queue_cond_signal(&queue->queue_wait_cond);
    }
    else {
        msg->next = NULL;
        msg->prev = queue->tail;
        queue->tail->next = msg;
        queue->tail = msg;
        queue->cnt++;
    }

    bh_queue_mutex_unlock(&queue->queue_lock);

    return true;
}

/*
 * Build a message from (tag, body, len) with no cleaner and post it.
 *
 * On any failure the body is released when it is an allocated buffer
 * (body != NULL and len != 0) and the drop counter is incremented.
 *
 * Returns true if the message was queued, false otherwise.
 */
bool
bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len)
{
    bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);

    if (msg == NULL) {
        /* drops is shared state, so update it under the queue lock. */
        bh_queue_mutex_lock(&queue->queue_lock);
        queue->drops++;
        bh_queue_mutex_unlock(&queue->queue_lock);

        /* Release the body the same way bh_free_msg() would have. */
        if (len != 0 && body)
            bh_queue_free(body);
        return false;
    }

    if (!bh_post_msg2(queue, msg)) {
        /* bh_post_msg2 already freed the msg on failure */
        return false;
    }

    return true;
}

/*
 * Allocate a message node and fill in its tag, body, length and
 * cleaner. The body pointer is stored as-is, not copied.
 *
 * handler is stored as the message cleaner (may be NULL).
 *
 * Returns the new node, or NULL if the allocation fails.
 */
bh_queue_node *
bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler)
{
    bh_queue_node *msg = bh_queue_malloc(sizeof(bh_queue_node));

    if (msg == NULL)
        return NULL;

    memset(msg, 0, sizeof(bh_queue_node));
    msg->len = len;
    msg->body = body;
    msg->tag = tag;
    msg->msg_cleaner = (bh_msg_cleaner)handler;

    return msg;
}

/*
 * Release a message node and its payload. A NULL msg is ignored.
 *
 * If the message has a cleaner, the cleaner is called on the body and
 * is solely responsible for it; otherwise the body is freed only when
 * it is an allocated buffer.
 */
void
bh_free_msg(bh_queue_node *msg)
{
    if (msg == NULL)
        return;

    if (msg->msg_cleaner) {
        msg->msg_cleaner(msg->body);
        bh_queue_free(msg);
        return;
    }

    /* note: sometimes the payload pointer just carries an integer value;
       len != 0 is the only indicator that the body is an allocated
       buffer. */
    if (msg->body && msg->len)
        bh_queue_free(msg->body);

    bh_queue_free(msg);
}

/*
 * Remove and return the message at the head of the queue.
 *
 * If the queue is empty and timeout_us is 0, returns NULL at once.
 * Otherwise waits once on the condition variable for up to timeout_us
 * and then re-checks the queue; NULL is returned if it is still empty
 * after that single wait, so callers must be prepared for NULL.
 *
 * The caller owns the returned message and releases it with
 * bh_free_msg().
 */
bh_message_t
bh_get_msg(bh_queue *queue, uint64 timeout_us)
{
    bh_queue_node *msg = NULL;

    bh_queue_mutex_lock(&queue->queue_lock);

    if (queue->cnt == 0) {
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);

        if (timeout_us == 0) {
            bh_queue_mutex_unlock(&queue->queue_lock);
            return NULL;
        }

        /* presumably releases queue_lock while blocked, as a condition
           wait does -- TODO confirm against the platform layer */
        bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
                                timeout_us);
    }

    if (queue->cnt == 0) {
        /* Still empty after the wait: fall through and return NULL. */
        bh_assert(queue->head == NULL);
        bh_assert(queue->tail == NULL);
    }
    else if (queue->cnt == 1) {
        /* Taking the last message empties the list. */
        bh_assert(queue->head == queue->tail);

        msg = queue->head;
        queue->head = queue->tail = NULL;
        queue->cnt = 0;
    }
    else {
        msg = queue->head;
        queue->head = queue->head->next;
        queue->head->prev = NULL;
        queue->cnt--;
    }

    bh_queue_mutex_unlock(&queue->queue_lock);

    return msg;
}

/*
 * Return the number of messages currently queued, or 0 for a NULL
 * queue. The counter is read without taking the lock, so the value is
 * only a snapshot.
 */
unsigned
bh_queue_get_message_count(bh_queue *queue)
{
    if (!queue)
        return 0;

    return queue->cnt;
}

/*
 * Consume messages until bh_queue_exit_loop_run() is called: each
 * message is passed to handle_cb together with arg and then freed.
 * A NULL queue is ignored.
 */
void
bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb,
                        void *arg)
{
    if (!queue)
        return;

    while (!queue->exit_loop_run) {
        bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER);

        /* bh_get_msg() may return NULL (e.g. when woken up by
           bh_queue_exit_loop_run()); just re-check the exit flag. */
        if (message) {
            handle_cb(message, arg);
            bh_free_msg(message);
        }
    }
}

/*
 * Ask the loop started by bh_queue_enter_loop_run() to stop and wake a
 * consumer blocked on the queue. A NULL queue is ignored.
 *
 * NOTE(review): the flag is set and the condition is signalled without
 * holding queue_lock, so a consumer that tested the flag just before
 * blocking in bh_get_msg() may miss this wake-up -- confirm whether
 * callers rely on a later post to unblock it.
 */
void
bh_queue_exit_loop_run(bh_queue *queue)
{
    if (queue) {
        queue->exit_loop_run = true;
        bh_queue_cond_signal(&queue->queue_wait_cond);
    }
}