diff options
Diffstat (limited to 'fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c')
-rw-r--r-- | fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c new file mode 100644 index 000000000..7c860d11a --- /dev/null +++ b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c @@ -0,0 +1,256 @@ +/* + * Copyright (C) 2019 Intel Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + */ + +#include "bh_queue.h" + +typedef struct bh_queue_node { + struct bh_queue_node *next; + struct bh_queue_node *prev; + unsigned short tag; + unsigned int len; + void *body; + bh_msg_cleaner msg_cleaner; +} bh_queue_node; + +struct bh_queue { + bh_queue_mutex queue_lock; + bh_queue_cond queue_wait_cond; + unsigned int cnt; + unsigned int max; + unsigned int drops; + bh_queue_node *head; + bh_queue_node *tail; + + bool exit_loop_run; +}; + +char * +bh_message_payload(bh_message_t message) +{ + return message->body; +} + +uint32 +bh_message_payload_len(bh_message_t message) +{ + return message->len; +} + +int +bh_message_type(bh_message_t message) +{ + return message->tag; +} + +bh_queue * +bh_queue_create() +{ + int ret; + bh_queue *queue = bh_queue_malloc(sizeof(bh_queue)); + + if (queue) { + memset(queue, 0, sizeof(bh_queue)); + queue->max = DEFAULT_QUEUE_LENGTH; + + ret = bh_queue_mutex_init(&queue->queue_lock); + if (ret != 0) { + bh_queue_free(queue); + return NULL; + } + + ret = bh_queue_cond_init(&queue->queue_wait_cond); + if (ret != 0) { + bh_queue_mutex_destroy(&queue->queue_lock); + bh_queue_free(queue); + return NULL; + } + } + + return queue; +} + +void +bh_queue_destroy(bh_queue *queue) +{ + bh_queue_node *node; + + if (!queue) + return; + + bh_queue_mutex_lock(&queue->queue_lock); + while (queue->head) { + node = queue->head; + queue->head = node->next; + + bh_free_msg(node); + } + bh_queue_mutex_unlock(&queue->queue_lock); + + bh_queue_cond_destroy(&queue->queue_wait_cond); + bh_queue_mutex_destroy(&queue->queue_lock); + bh_queue_free(queue); +} + +bool +bh_post_msg2(bh_queue *queue, bh_queue_node *msg) +{ + if (queue->cnt >= queue->max) { + queue->drops++; + bh_free_msg(msg); + return false; + } + + bh_queue_mutex_lock(&queue->queue_lock); + + if (queue->cnt == 0) { + bh_assert(queue->head == NULL); + bh_assert(queue->tail == NULL); + queue->head = queue->tail = msg; + msg->next = msg->prev = NULL; + queue->cnt = 1; + + bh_queue_cond_signal(&queue->queue_wait_cond); + } + else { + msg->next = NULL; + msg->prev = queue->tail; + queue->tail->next = msg; + queue->tail = msg; + queue->cnt++; + } + + bh_queue_mutex_unlock(&queue->queue_lock); + + return true; +} + +bool +bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len) +{ + bh_queue_node *msg = bh_new_msg(tag, body, len, NULL); + if (msg == NULL) { + queue->drops++; + if (len != 0 && body) + BH_FREE(body); + return false; + } + + if (!bh_post_msg2(queue, msg)) { + // bh_post_msg2 already freed the msg for failure + return false; + } + + return true; +} + +bh_queue_node * +bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler) +{ + bh_queue_node *msg = + (bh_queue_node *)bh_queue_malloc(sizeof(bh_queue_node)); + if (msg == NULL) + return NULL; + memset(msg, 0, sizeof(bh_queue_node)); + msg->len = len; + msg->body = body; + msg->tag = tag; + msg->msg_cleaner = (bh_msg_cleaner)handler; + + return msg; +} + +void +bh_free_msg(bh_queue_node *msg) +{ + if (msg->msg_cleaner) { + msg->msg_cleaner(msg->body); + bh_queue_free(msg); + return; + } + + // note: sometime we just use the payload pointer for a integer value + // len!=0 is the only indicator about the body is an allocated buffer. + if (msg->body && msg->len) + bh_queue_free(msg->body); + + bh_queue_free(msg); +} + +bh_message_t +bh_get_msg(bh_queue *queue, uint64 timeout_us) +{ + bh_queue_node *msg = NULL; + bh_queue_mutex_lock(&queue->queue_lock); + + if (queue->cnt == 0) { + bh_assert(queue->head == NULL); + bh_assert(queue->tail == NULL); + + if (timeout_us == 0) { + bh_queue_mutex_unlock(&queue->queue_lock); + return NULL; + } + + bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock, + timeout_us); + } + + if (queue->cnt == 0) { + bh_assert(queue->head == NULL); + bh_assert(queue->tail == NULL); + } + else if (queue->cnt == 1) { + bh_assert(queue->head == queue->tail); + + msg = queue->head; + queue->head = queue->tail = NULL; + queue->cnt = 0; + } + else { + msg = queue->head; + queue->head = queue->head->next; + queue->head->prev = NULL; + queue->cnt--; + } + + bh_queue_mutex_unlock(&queue->queue_lock); + + return msg; +} + +unsigned +bh_queue_get_message_count(bh_queue *queue) +{ + if (!queue) + return 0; + + return queue->cnt; +} + +void +bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb, + void *arg) +{ + if (!queue) + return; + + while (!queue->exit_loop_run) { + bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER); + + if (message) { + handle_cb(message, arg); + bh_free_msg(message); + } + } +} + +void +bh_queue_exit_loop_run(bh_queue *queue) +{ + if (queue) { + queue->exit_loop_run = true; + bh_queue_cond_signal(&queue->queue_wait_cond); + } +} |