summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c')
-rw-r--r--fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c256
1 files changed, 256 insertions, 0 deletions
diff --git a/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c
new file mode 100644
index 000000000..7c860d11a
--- /dev/null
+++ b/fluent-bit/lib/wasm-micro-runtime-WAMR-1.2.2/core/shared/utils/bh_queue.c
@@ -0,0 +1,256 @@
+/*
+ * Copyright (C) 2019 Intel Corporation. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ */
+
+#include "bh_queue.h"
+
+typedef struct bh_queue_node {
+ struct bh_queue_node *next;
+ struct bh_queue_node *prev;
+ unsigned short tag;
+ unsigned int len;
+ void *body;
+ bh_msg_cleaner msg_cleaner;
+} bh_queue_node;
+
+struct bh_queue {
+ bh_queue_mutex queue_lock;
+ bh_queue_cond queue_wait_cond;
+ unsigned int cnt;
+ unsigned int max;
+ unsigned int drops;
+ bh_queue_node *head;
+ bh_queue_node *tail;
+
+ bool exit_loop_run;
+};
+
+char *
+bh_message_payload(bh_message_t message)
+{
+ return message->body;
+}
+
+uint32
+bh_message_payload_len(bh_message_t message)
+{
+ return message->len;
+}
+
+int
+bh_message_type(bh_message_t message)
+{
+ return message->tag;
+}
+
+bh_queue *
+bh_queue_create()
+{
+ int ret;
+ bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
+
+ if (queue) {
+ memset(queue, 0, sizeof(bh_queue));
+ queue->max = DEFAULT_QUEUE_LENGTH;
+
+ ret = bh_queue_mutex_init(&queue->queue_lock);
+ if (ret != 0) {
+ bh_queue_free(queue);
+ return NULL;
+ }
+
+ ret = bh_queue_cond_init(&queue->queue_wait_cond);
+ if (ret != 0) {
+ bh_queue_mutex_destroy(&queue->queue_lock);
+ bh_queue_free(queue);
+ return NULL;
+ }
+ }
+
+ return queue;
+}
+
+void
+bh_queue_destroy(bh_queue *queue)
+{
+ bh_queue_node *node;
+
+ if (!queue)
+ return;
+
+ bh_queue_mutex_lock(&queue->queue_lock);
+ while (queue->head) {
+ node = queue->head;
+ queue->head = node->next;
+
+ bh_free_msg(node);
+ }
+ bh_queue_mutex_unlock(&queue->queue_lock);
+
+ bh_queue_cond_destroy(&queue->queue_wait_cond);
+ bh_queue_mutex_destroy(&queue->queue_lock);
+ bh_queue_free(queue);
+}
+
+bool
+bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
+{
+ if (queue->cnt >= queue->max) {
+ queue->drops++;
+ bh_free_msg(msg);
+ return false;
+ }
+
+ bh_queue_mutex_lock(&queue->queue_lock);
+
+ if (queue->cnt == 0) {
+ bh_assert(queue->head == NULL);
+ bh_assert(queue->tail == NULL);
+ queue->head = queue->tail = msg;
+ msg->next = msg->prev = NULL;
+ queue->cnt = 1;
+
+ bh_queue_cond_signal(&queue->queue_wait_cond);
+ }
+ else {
+ msg->next = NULL;
+ msg->prev = queue->tail;
+ queue->tail->next = msg;
+ queue->tail = msg;
+ queue->cnt++;
+ }
+
+ bh_queue_mutex_unlock(&queue->queue_lock);
+
+ return true;
+}
+
+bool
+bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len)
+{
+ bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
+ if (msg == NULL) {
+ queue->drops++;
+ if (len != 0 && body)
+ BH_FREE(body);
+ return false;
+ }
+
+ if (!bh_post_msg2(queue, msg)) {
+ // bh_post_msg2 already freed the msg for failure
+ return false;
+ }
+
+ return true;
+}
+
+bh_queue_node *
+bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler)
+{
+ bh_queue_node *msg =
+ (bh_queue_node *)bh_queue_malloc(sizeof(bh_queue_node));
+ if (msg == NULL)
+ return NULL;
+ memset(msg, 0, sizeof(bh_queue_node));
+ msg->len = len;
+ msg->body = body;
+ msg->tag = tag;
+ msg->msg_cleaner = (bh_msg_cleaner)handler;
+
+ return msg;
+}
+
+void
+bh_free_msg(bh_queue_node *msg)
+{
+ if (msg->msg_cleaner) {
+ msg->msg_cleaner(msg->body);
+ bh_queue_free(msg);
+ return;
+ }
+
+ // note: sometime we just use the payload pointer for a integer value
+ // len!=0 is the only indicator about the body is an allocated buffer.
+ if (msg->body && msg->len)
+ bh_queue_free(msg->body);
+
+ bh_queue_free(msg);
+}
+
+bh_message_t
+bh_get_msg(bh_queue *queue, uint64 timeout_us)
+{
+ bh_queue_node *msg = NULL;
+ bh_queue_mutex_lock(&queue->queue_lock);
+
+ if (queue->cnt == 0) {
+ bh_assert(queue->head == NULL);
+ bh_assert(queue->tail == NULL);
+
+ if (timeout_us == 0) {
+ bh_queue_mutex_unlock(&queue->queue_lock);
+ return NULL;
+ }
+
+ bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
+ timeout_us);
+ }
+
+ if (queue->cnt == 0) {
+ bh_assert(queue->head == NULL);
+ bh_assert(queue->tail == NULL);
+ }
+ else if (queue->cnt == 1) {
+ bh_assert(queue->head == queue->tail);
+
+ msg = queue->head;
+ queue->head = queue->tail = NULL;
+ queue->cnt = 0;
+ }
+ else {
+ msg = queue->head;
+ queue->head = queue->head->next;
+ queue->head->prev = NULL;
+ queue->cnt--;
+ }
+
+ bh_queue_mutex_unlock(&queue->queue_lock);
+
+ return msg;
+}
+
+unsigned
+bh_queue_get_message_count(bh_queue *queue)
+{
+ if (!queue)
+ return 0;
+
+ return queue->cnt;
+}
+
+void
+bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb,
+ void *arg)
+{
+ if (!queue)
+ return;
+
+ while (!queue->exit_loop_run) {
+ bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER);
+
+ if (message) {
+ handle_cb(message, arg);
+ bh_free_msg(message);
+ }
+ }
+}
+
+void
+bh_queue_exit_loop_run(bh_queue *queue)
+{
+ if (queue) {
+ queue->exit_loop_run = true;
+ bh_queue_cond_signal(&queue->queue_wait_cond);
+ }
+}