bh_queue.c 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. /*
  2. * Copyright (C) 2019 Intel Corporation. All rights reserved.
  3. * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  4. */
  5. #include "bh_queue.h"
  6. #include "bh_thread.h"
  7. #include "bh_memory.h"
  8. #include "bh_time.h"
  9. #include "bh_common.h"
  10. typedef struct _bh_queue_node {
  11. struct _bh_queue_node * next;
  12. struct _bh_queue_node * prev;
  13. unsigned short tag;
  14. unsigned int len;
  15. void * body;
  16. bh_msg_cleaner msg_cleaner;
  17. } bh_queue_node;
  18. struct bh_queue {
  19. bh_queue_mutex queue_lock;
  20. bh_queue_cond queue_wait_cond;
  21. unsigned int cnt;
  22. unsigned int max;
  23. unsigned int drops;
  24. bh_queue_node * head;
  25. bh_queue_node * tail;
  26. bool exit_loop_run;
  27. };
  28. char * bh_message_payload(bh_message_t message)
  29. {
  30. return message->body;
  31. }
  32. uint32 bh_message_payload_len(bh_message_t message)
  33. {
  34. return message->len;
  35. }
  36. int bh_message_type(bh_message_t message)
  37. {
  38. return message->tag;
  39. }
  40. bh_queue *
  41. bh_queue_create()
  42. {
  43. int ret;
  44. bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
  45. if (queue) {
  46. memset(queue, 0, sizeof(bh_queue));
  47. queue->max = DEFAULT_QUEUE_LENGTH;
  48. ret = bh_queue_mutex_init(&queue->queue_lock);
  49. if (ret != 0) {
  50. bh_queue_free(queue);
  51. return NULL;
  52. }
  53. ret = bh_queue_cond_init(&queue->queue_wait_cond);
  54. if (ret != 0) {
  55. bh_queue_mutex_destroy(&queue->queue_lock);
  56. bh_queue_free(queue);
  57. return NULL;
  58. }
  59. }
  60. return queue;
  61. }
  62. void bh_queue_destroy(bh_queue *queue)
  63. {
  64. bh_queue_node *node;
  65. if (!queue)
  66. return;
  67. bh_queue_mutex_lock(&queue->queue_lock);
  68. while (queue->head) {
  69. node = queue->head;
  70. queue->head = node->next;
  71. bh_free_msg(node);
  72. }
  73. bh_queue_mutex_unlock(&queue->queue_lock);
  74. bh_queue_cond_destroy(&queue->queue_wait_cond);
  75. bh_queue_mutex_destroy(&queue->queue_lock);
  76. bh_queue_free(queue);
  77. }
  78. bool bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
  79. {
  80. if (queue->cnt >= queue->max) {
  81. queue->drops++;
  82. bh_free_msg(msg);
  83. return false;
  84. }
  85. bh_queue_mutex_lock(&queue->queue_lock);
  86. if (queue->cnt == 0) {
  87. bh_assert(queue->head == NULL);
  88. bh_assert(queue->tail == NULL);
  89. queue->head = queue->tail = msg;
  90. msg->next = msg->prev = NULL;
  91. queue->cnt = 1;
  92. bh_queue_cond_signal(&queue->queue_wait_cond);
  93. } else {
  94. msg->next = NULL;
  95. msg->prev = queue->tail;
  96. queue->tail->next = msg;
  97. queue->tail = msg;
  98. queue->cnt++;
  99. }
  100. bh_queue_mutex_unlock(&queue->queue_lock);
  101. return true;
  102. }
  103. bool bh_post_msg(bh_queue *queue, unsigned short tag, void *body,
  104. unsigned int len)
  105. {
  106. bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
  107. if (msg == NULL) {
  108. queue->drops++;
  109. if (len != 0 && body)
  110. bh_free(body);
  111. return false;
  112. }
  113. if (!bh_post_msg2(queue, msg)) {
  114. // bh_post_msg2 already freed the msg for failure
  115. return false;
  116. }
  117. return true;
  118. }
  119. bh_queue_node * bh_new_msg(unsigned short tag, void *body, unsigned int len,
  120. void * handler)
  121. {
  122. bh_queue_node *msg = (bh_queue_node*) bh_queue_malloc(
  123. sizeof(bh_queue_node));
  124. if (msg == NULL)
  125. return NULL;
  126. memset(msg, 0, sizeof(bh_queue_node));
  127. msg->len = len;
  128. msg->body = body;
  129. msg->tag = tag;
  130. msg->msg_cleaner = (bh_msg_cleaner) handler;
  131. return msg;
  132. }
  133. void bh_free_msg(bh_queue_node *msg)
  134. {
  135. if (msg->msg_cleaner) {
  136. msg->msg_cleaner(msg->body);
  137. bh_queue_free(msg);
  138. return;
  139. }
  140. // note: sometime we just use the payload pointer for a integer value
  141. // len!=0 is the only indicator about the body is an allocated buffer.
  142. if (msg->body && msg->len)
  143. bh_queue_free(msg->body);
  144. bh_queue_free(msg);
  145. }
  146. bh_message_t bh_get_msg(bh_queue *queue, int timeout)
  147. {
  148. bh_queue_node *msg = NULL;
  149. bh_queue_mutex_lock(&queue->queue_lock);
  150. if (queue->cnt == 0) {
  151. bh_assert(queue->head == NULL);
  152. bh_assert(queue->tail == NULL);
  153. if (timeout == 0) {
  154. bh_queue_mutex_unlock(&queue->queue_lock);
  155. return NULL;
  156. }
  157. bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
  158. timeout);
  159. }
  160. if (queue->cnt == 0) {
  161. bh_assert(queue->head == NULL);
  162. bh_assert(queue->tail == NULL);
  163. } else if (queue->cnt == 1) {
  164. bh_assert(queue->head == queue->tail);
  165. msg = queue->head;
  166. queue->head = queue->tail = NULL;
  167. queue->cnt = 0;
  168. } else {
  169. msg = queue->head;
  170. queue->head = queue->head->next;
  171. queue->head->prev = NULL;
  172. queue->cnt--;
  173. }
  174. bh_queue_mutex_unlock(&queue->queue_lock);
  175. return msg;
  176. }
  177. unsigned bh_queue_get_message_count(bh_queue *queue)
  178. {
  179. if (!queue)
  180. return 0;
  181. return queue->cnt;
  182. }
  183. void bh_queue_enter_loop_run(bh_queue *queue,
  184. bh_queue_handle_msg_callback handle_cb,
  185. void *arg)
  186. {
  187. if (!queue)
  188. return;
  189. while (!queue->exit_loop_run) {
  190. bh_queue_node * message = bh_get_msg(queue, (int)BH_WAIT_FOREVER);
  191. if (message) {
  192. handle_cb(message, arg);
  193. bh_free_msg(message);
  194. }
  195. }
  196. }
  197. void bh_queue_exit_loop_run(bh_queue *queue)
  198. {
  199. if (queue) {
  200. queue->exit_loop_run = true;
  201. bh_queue_cond_signal(&queue->queue_wait_cond);
  202. }
  203. }