| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- /*
- * Copyright (C) 2019 Intel Corporation. All rights reserved.
- * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
- */
- #include "bh_queue.h"
- typedef struct bh_queue_node {
- struct bh_queue_node *next;
- struct bh_queue_node *prev;
- unsigned short tag;
- unsigned int len;
- void *body;
- bh_msg_cleaner msg_cleaner;
- } bh_queue_node;
- struct bh_queue {
- bh_queue_mutex queue_lock;
- bh_queue_cond queue_wait_cond;
- unsigned int cnt;
- unsigned int max;
- unsigned int drops;
- bh_queue_node *head;
- bh_queue_node *tail;
- bool exit_loop_run;
- };
- char *
- bh_message_payload(bh_message_t message)
- {
- return message->body;
- }
- uint32
- bh_message_payload_len(bh_message_t message)
- {
- return message->len;
- }
- int
- bh_message_type(bh_message_t message)
- {
- return message->tag;
- }
- bh_queue *
- bh_queue_create()
- {
- int ret;
- bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
- if (queue) {
- memset(queue, 0, sizeof(bh_queue));
- queue->max = DEFAULT_QUEUE_LENGTH;
- ret = bh_queue_mutex_init(&queue->queue_lock);
- if (ret != 0) {
- bh_queue_free(queue);
- return NULL;
- }
- ret = bh_queue_cond_init(&queue->queue_wait_cond);
- if (ret != 0) {
- bh_queue_mutex_destroy(&queue->queue_lock);
- bh_queue_free(queue);
- return NULL;
- }
- }
- return queue;
- }
- void
- bh_queue_destroy(bh_queue *queue)
- {
- bh_queue_node *node;
- if (!queue)
- return;
- bh_queue_mutex_lock(&queue->queue_lock);
- while (queue->head) {
- node = queue->head;
- queue->head = node->next;
- bh_free_msg(node);
- }
- bh_queue_mutex_unlock(&queue->queue_lock);
- bh_queue_cond_destroy(&queue->queue_wait_cond);
- bh_queue_mutex_destroy(&queue->queue_lock);
- bh_queue_free(queue);
- }
- bool
- bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
- {
- if (queue->cnt >= queue->max) {
- queue->drops++;
- bh_free_msg(msg);
- return false;
- }
- bh_queue_mutex_lock(&queue->queue_lock);
- if (queue->cnt == 0) {
- bh_assert(queue->head == NULL);
- bh_assert(queue->tail == NULL);
- queue->head = queue->tail = msg;
- msg->next = msg->prev = NULL;
- queue->cnt = 1;
- bh_queue_cond_signal(&queue->queue_wait_cond);
- }
- else {
- msg->next = NULL;
- msg->prev = queue->tail;
- queue->tail->next = msg;
- queue->tail = msg;
- queue->cnt++;
- }
- bh_queue_mutex_unlock(&queue->queue_lock);
- return true;
- }
- bool
- bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len)
- {
- bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
- if (msg == NULL) {
- queue->drops++;
- if (len != 0 && body)
- BH_FREE(body);
- return false;
- }
- if (!bh_post_msg2(queue, msg)) {
- // bh_post_msg2 already freed the msg for failure
- return false;
- }
- return true;
- }
- bh_queue_node *
- bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler)
- {
- bh_queue_node *msg =
- (bh_queue_node *)bh_queue_malloc(sizeof(bh_queue_node));
- if (msg == NULL)
- return NULL;
- memset(msg, 0, sizeof(bh_queue_node));
- msg->len = len;
- msg->body = body;
- msg->tag = tag;
- msg->msg_cleaner = (bh_msg_cleaner)handler;
- return msg;
- }
- void
- bh_free_msg(bh_queue_node *msg)
- {
- if (msg->msg_cleaner) {
- msg->msg_cleaner(msg->body);
- bh_queue_free(msg);
- return;
- }
- // note: sometime we just use the payload pointer for a integer value
- // len!=0 is the only indicator about the body is an allocated buffer.
- if (msg->body && msg->len)
- bh_queue_free(msg->body);
- bh_queue_free(msg);
- }
- bh_message_t
- bh_get_msg(bh_queue *queue, uint64 timeout_us)
- {
- bh_queue_node *msg = NULL;
- bh_queue_mutex_lock(&queue->queue_lock);
- if (queue->cnt == 0) {
- bh_assert(queue->head == NULL);
- bh_assert(queue->tail == NULL);
- if (timeout_us == 0) {
- bh_queue_mutex_unlock(&queue->queue_lock);
- return NULL;
- }
- bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
- timeout_us);
- }
- if (queue->cnt == 0) {
- bh_assert(queue->head == NULL);
- bh_assert(queue->tail == NULL);
- }
- else if (queue->cnt == 1) {
- bh_assert(queue->head == queue->tail);
- msg = queue->head;
- queue->head = queue->tail = NULL;
- queue->cnt = 0;
- }
- else {
- msg = queue->head;
- queue->head = queue->head->next;
- queue->head->prev = NULL;
- queue->cnt--;
- }
- bh_queue_mutex_unlock(&queue->queue_lock);
- return msg;
- }
- unsigned
- bh_queue_get_message_count(bh_queue *queue)
- {
- if (!queue)
- return 0;
- return queue->cnt;
- }
- void
- bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb,
- void *arg)
- {
- if (!queue)
- return;
- while (!queue->exit_loop_run) {
- bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER);
- if (message) {
- handle_cb(message, arg);
- bh_free_msg(message);
- }
- }
- }
- void
- bh_queue_exit_loop_run(bh_queue *queue)
- {
- if (queue) {
- queue->exit_loop_run = true;
- bh_queue_cond_signal(&queue->queue_wait_cond);
- }
- }
|