bh_queue.c 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. * Copyright (C) 2019 Intel Corporation. All rights reserved.
  3. * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  4. */
  5. #include "bh_queue.h"
  6. typedef struct bh_queue_node {
  7. struct bh_queue_node * next;
  8. struct bh_queue_node * prev;
  9. unsigned short tag;
  10. unsigned int len;
  11. void * body;
  12. bh_msg_cleaner msg_cleaner;
  13. } bh_queue_node;
  14. struct bh_queue {
  15. bh_queue_mutex queue_lock;
  16. bh_queue_cond queue_wait_cond;
  17. unsigned int cnt;
  18. unsigned int max;
  19. unsigned int drops;
  20. bh_queue_node * head;
  21. bh_queue_node * tail;
  22. bool exit_loop_run;
  23. };
  24. char * bh_message_payload(bh_message_t message)
  25. {
  26. return message->body;
  27. }
  28. uint32 bh_message_payload_len(bh_message_t message)
  29. {
  30. return message->len;
  31. }
  32. int bh_message_type(bh_message_t message)
  33. {
  34. return message->tag;
  35. }
  36. bh_queue *
  37. bh_queue_create()
  38. {
  39. int ret;
  40. bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
  41. if (queue) {
  42. memset(queue, 0, sizeof(bh_queue));
  43. queue->max = DEFAULT_QUEUE_LENGTH;
  44. ret = bh_queue_mutex_init(&queue->queue_lock);
  45. if (ret != 0) {
  46. bh_queue_free(queue);
  47. return NULL;
  48. }
  49. ret = bh_queue_cond_init(&queue->queue_wait_cond);
  50. if (ret != 0) {
  51. bh_queue_mutex_destroy(&queue->queue_lock);
  52. bh_queue_free(queue);
  53. return NULL;
  54. }
  55. }
  56. return queue;
  57. }
  58. void bh_queue_destroy(bh_queue *queue)
  59. {
  60. bh_queue_node *node;
  61. if (!queue)
  62. return;
  63. bh_queue_mutex_lock(&queue->queue_lock);
  64. while (queue->head) {
  65. node = queue->head;
  66. queue->head = node->next;
  67. bh_free_msg(node);
  68. }
  69. bh_queue_mutex_unlock(&queue->queue_lock);
  70. bh_queue_cond_destroy(&queue->queue_wait_cond);
  71. bh_queue_mutex_destroy(&queue->queue_lock);
  72. bh_queue_free(queue);
  73. }
  74. bool bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
  75. {
  76. if (queue->cnt >= queue->max) {
  77. queue->drops++;
  78. bh_free_msg(msg);
  79. return false;
  80. }
  81. bh_queue_mutex_lock(&queue->queue_lock);
  82. if (queue->cnt == 0) {
  83. bh_assert(queue->head == NULL);
  84. bh_assert(queue->tail == NULL);
  85. queue->head = queue->tail = msg;
  86. msg->next = msg->prev = NULL;
  87. queue->cnt = 1;
  88. bh_queue_cond_signal(&queue->queue_wait_cond);
  89. } else {
  90. msg->next = NULL;
  91. msg->prev = queue->tail;
  92. queue->tail->next = msg;
  93. queue->tail = msg;
  94. queue->cnt++;
  95. }
  96. bh_queue_mutex_unlock(&queue->queue_lock);
  97. return true;
  98. }
  99. bool bh_post_msg(bh_queue *queue, unsigned short tag, void *body,
  100. unsigned int len)
  101. {
  102. bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
  103. if (msg == NULL) {
  104. queue->drops++;
  105. if (len != 0 && body)
  106. BH_FREE(body);
  107. return false;
  108. }
  109. if (!bh_post_msg2(queue, msg)) {
  110. // bh_post_msg2 already freed the msg for failure
  111. return false;
  112. }
  113. return true;
  114. }
  115. bh_queue_node * bh_new_msg(unsigned short tag, void *body, unsigned int len,
  116. void * handler)
  117. {
  118. bh_queue_node *msg = (bh_queue_node*)
  119. bh_queue_malloc(sizeof(bh_queue_node));
  120. if (msg == NULL)
  121. return NULL;
  122. memset(msg, 0, sizeof(bh_queue_node));
  123. msg->len = len;
  124. msg->body = body;
  125. msg->tag = tag;
  126. msg->msg_cleaner = (bh_msg_cleaner) handler;
  127. return msg;
  128. }
  129. void bh_free_msg(bh_queue_node *msg)
  130. {
  131. if (msg->msg_cleaner) {
  132. msg->msg_cleaner(msg->body);
  133. bh_queue_free(msg);
  134. return;
  135. }
  136. // note: sometime we just use the payload pointer for a integer value
  137. // len!=0 is the only indicator about the body is an allocated buffer.
  138. if (msg->body && msg->len)
  139. bh_queue_free(msg->body);
  140. bh_queue_free(msg);
  141. }
  142. bh_message_t bh_get_msg(bh_queue *queue, uint64 timeout_us)
  143. {
  144. bh_queue_node *msg = NULL;
  145. bh_queue_mutex_lock(&queue->queue_lock);
  146. if (queue->cnt == 0) {
  147. bh_assert(queue->head == NULL);
  148. bh_assert(queue->tail == NULL);
  149. if (timeout_us == 0) {
  150. bh_queue_mutex_unlock(&queue->queue_lock);
  151. return NULL;
  152. }
  153. bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
  154. timeout_us);
  155. }
  156. if (queue->cnt == 0) {
  157. bh_assert(queue->head == NULL);
  158. bh_assert(queue->tail == NULL);
  159. } else if (queue->cnt == 1) {
  160. bh_assert(queue->head == queue->tail);
  161. msg = queue->head;
  162. queue->head = queue->tail = NULL;
  163. queue->cnt = 0;
  164. } else {
  165. msg = queue->head;
  166. queue->head = queue->head->next;
  167. queue->head->prev = NULL;
  168. queue->cnt--;
  169. }
  170. bh_queue_mutex_unlock(&queue->queue_lock);
  171. return msg;
  172. }
  173. unsigned bh_queue_get_message_count(bh_queue *queue)
  174. {
  175. if (!queue)
  176. return 0;
  177. return queue->cnt;
  178. }
  179. void bh_queue_enter_loop_run(bh_queue *queue,
  180. bh_queue_handle_msg_callback handle_cb,
  181. void *arg)
  182. {
  183. if (!queue)
  184. return;
  185. while (!queue->exit_loop_run) {
  186. bh_queue_node * message = bh_get_msg(queue, BHT_WAIT_FOREVER);
  187. if (message) {
  188. handle_cb(message, arg);
  189. bh_free_msg(message);
  190. }
  191. }
  192. }
  193. void bh_queue_exit_loop_run(bh_queue *queue)
  194. {
  195. if (queue) {
  196. queue->exit_loop_run = true;
  197. bh_queue_cond_signal(&queue->queue_wait_cond);
  198. }
  199. }