bh_queue.c 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * Copyright (C) 2019 Intel Corporation. All rights reserved.
  3. * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  4. */
  5. #include "bh_queue.h"
/* One message stored in a bh_queue, linked to its neighbours. */
typedef struct bh_queue_node {
    /* Next (newer) node in the queue; NULL for the tail. */
    struct bh_queue_node *next;
    /* Previous (older) node in the queue; NULL for the head. */
    struct bh_queue_node *prev;
    /* Message type, reported by bh_message_type(). */
    unsigned short tag;
    /* Payload length; bh_free_msg() only frees body when this is non-zero. */
    unsigned int len;
    /* Payload pointer (may carry a plain integer value when len == 0). */
    void *body;
    /* Optional cleanup callback; when set, bh_free_msg() calls it on body
       instead of freeing body itself. */
    bh_msg_cleaner msg_cleaner;
} bh_queue_node;
/* A bounded doubly linked message queue guarded by a lock and a
   condition variable. */
struct bh_queue {
    /* Lock taken around updates to the list, cnt and drops. */
    bh_queue_mutex queue_lock;
    /* Signalled when a message is posted to an empty queue and when the
       run loop is asked to exit. */
    bh_queue_cond queue_wait_cond;
    /* Number of messages currently linked in the queue. */
    unsigned int cnt;
    /* Capacity: posts are rejected once cnt >= max. */
    unsigned int max;
    /* Number of messages that were dropped instead of queued. */
    unsigned int drops;
    /* Oldest message (next one returned by bh_get_msg()). */
    bh_queue_node *head;
    /* Newest message (where posts are appended). */
    bh_queue_node *tail;
    /* Set to true to make bh_queue_enter_loop_run() stop looping. */
    bool exit_loop_run;
};
  24. char *
  25. bh_message_payload(bh_message_t message)
  26. {
  27. return message->body;
  28. }
  29. uint32
  30. bh_message_payload_len(bh_message_t message)
  31. {
  32. return message->len;
  33. }
  34. int
  35. bh_message_type(bh_message_t message)
  36. {
  37. return message->tag;
  38. }
  39. bh_queue *
  40. bh_queue_create()
  41. {
  42. int ret;
  43. bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
  44. if (queue) {
  45. memset(queue, 0, sizeof(bh_queue));
  46. queue->max = DEFAULT_QUEUE_LENGTH;
  47. ret = bh_queue_mutex_init(&queue->queue_lock);
  48. if (ret != 0) {
  49. bh_queue_free(queue);
  50. return NULL;
  51. }
  52. ret = bh_queue_cond_init(&queue->queue_wait_cond);
  53. if (ret != 0) {
  54. bh_queue_mutex_destroy(&queue->queue_lock);
  55. bh_queue_free(queue);
  56. return NULL;
  57. }
  58. }
  59. return queue;
  60. }
  61. void
  62. bh_queue_destroy(bh_queue *queue)
  63. {
  64. bh_queue_node *node;
  65. if (!queue)
  66. return;
  67. bh_queue_mutex_lock(&queue->queue_lock);
  68. while (queue->head) {
  69. node = queue->head;
  70. queue->head = node->next;
  71. bh_free_msg(node);
  72. }
  73. bh_queue_mutex_unlock(&queue->queue_lock);
  74. bh_queue_cond_destroy(&queue->queue_wait_cond);
  75. bh_queue_mutex_destroy(&queue->queue_lock);
  76. bh_queue_free(queue);
  77. }
  78. bool
  79. bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
  80. {
  81. bh_queue_mutex_lock(&queue->queue_lock);
  82. if (queue->cnt >= queue->max) {
  83. queue->drops++;
  84. bh_free_msg(msg);
  85. bh_queue_mutex_unlock(&queue->queue_lock);
  86. return false;
  87. }
  88. if (queue->cnt == 0) {
  89. bh_assert(queue->head == NULL);
  90. bh_assert(queue->tail == NULL);
  91. queue->head = queue->tail = msg;
  92. msg->next = msg->prev = NULL;
  93. queue->cnt = 1;
  94. bh_queue_cond_signal(&queue->queue_wait_cond);
  95. }
  96. else {
  97. msg->next = NULL;
  98. msg->prev = queue->tail;
  99. queue->tail->next = msg;
  100. queue->tail = msg;
  101. queue->cnt++;
  102. }
  103. bh_queue_mutex_unlock(&queue->queue_lock);
  104. return true;
  105. }
  106. bool
  107. bh_post_msg(bh_queue *queue, unsigned short tag, void *body, unsigned int len)
  108. {
  109. bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
  110. if (msg == NULL) {
  111. bh_queue_mutex_lock(&queue->queue_lock);
  112. queue->drops++;
  113. bh_queue_mutex_unlock(&queue->queue_lock);
  114. if (len != 0 && body)
  115. BH_FREE(body);
  116. return false;
  117. }
  118. if (!bh_post_msg2(queue, msg)) {
  119. // bh_post_msg2 already freed the msg for failure
  120. return false;
  121. }
  122. return true;
  123. }
  124. bh_queue_node *
  125. bh_new_msg(unsigned short tag, void *body, unsigned int len, void *handler)
  126. {
  127. bh_queue_node *msg =
  128. (bh_queue_node *)bh_queue_malloc(sizeof(bh_queue_node));
  129. if (msg == NULL)
  130. return NULL;
  131. memset(msg, 0, sizeof(bh_queue_node));
  132. msg->len = len;
  133. msg->body = body;
  134. msg->tag = tag;
  135. msg->msg_cleaner = (bh_msg_cleaner)handler;
  136. return msg;
  137. }
  138. void
  139. bh_free_msg(bh_queue_node *msg)
  140. {
  141. if (msg->msg_cleaner) {
  142. msg->msg_cleaner(msg->body);
  143. bh_queue_free(msg);
  144. return;
  145. }
  146. // note: sometimes we just use the payload pointer for an integer value
  147. // len!=0 is the only indicator about the body is an allocated buffer.
  148. if (msg->body && msg->len)
  149. bh_queue_free(msg->body);
  150. bh_queue_free(msg);
  151. }
  152. bh_message_t
  153. bh_get_msg(bh_queue *queue, uint64 timeout_us)
  154. {
  155. bh_queue_node *msg = NULL;
  156. bh_queue_mutex_lock(&queue->queue_lock);
  157. if (queue->cnt == 0) {
  158. bh_assert(queue->head == NULL);
  159. bh_assert(queue->tail == NULL);
  160. if (timeout_us == 0) {
  161. bh_queue_mutex_unlock(&queue->queue_lock);
  162. return NULL;
  163. }
  164. bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
  165. timeout_us);
  166. }
  167. if (queue->cnt == 0) {
  168. bh_assert(queue->head == NULL);
  169. bh_assert(queue->tail == NULL);
  170. }
  171. else if (queue->cnt == 1) {
  172. bh_assert(queue->head == queue->tail);
  173. msg = queue->head;
  174. queue->head = queue->tail = NULL;
  175. queue->cnt = 0;
  176. }
  177. else {
  178. msg = queue->head;
  179. queue->head = queue->head->next;
  180. queue->head->prev = NULL;
  181. queue->cnt--;
  182. }
  183. bh_queue_mutex_unlock(&queue->queue_lock);
  184. return msg;
  185. }
  186. unsigned
  187. bh_queue_get_message_count(bh_queue *queue)
  188. {
  189. if (!queue)
  190. return 0;
  191. return queue->cnt;
  192. }
  193. void
  194. bh_queue_enter_loop_run(bh_queue *queue, bh_queue_handle_msg_callback handle_cb,
  195. void *arg)
  196. {
  197. if (!queue)
  198. return;
  199. while (!queue->exit_loop_run) {
  200. bh_queue_node *message = bh_get_msg(queue, BHT_WAIT_FOREVER);
  201. if (message) {
  202. handle_cb(message, arg);
  203. bh_free_msg(message);
  204. }
  205. }
  206. }
  207. void
  208. bh_queue_exit_loop_run(bh_queue *queue)
  209. {
  210. if (queue) {
  211. queue->exit_loop_run = true;
  212. bh_queue_cond_signal(&queue->queue_wait_cond);
  213. }
  214. }