bh_queue.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. /*
  2. * Copyright (C) 2019 Intel Corporation. All rights reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "bh_queue.h"
  17. #include "bh_thread.h"
  18. #include "bh_memory.h"
  19. #include "bh_time.h"
  20. #include "bh_common.h"
  21. typedef struct _bh_queue_node {
  22. struct _bh_queue_node * next;
  23. struct _bh_queue_node * prev;
  24. unsigned short tag;
  25. unsigned long len;
  26. void * body;
  27. bh_msg_cleaner msg_cleaner;
  28. } bh_queue_node;
  29. struct bh_queue {
  30. bh_queue_mutex queue_lock;
  31. bh_queue_cond queue_wait_cond;
  32. unsigned int cnt;
  33. unsigned int max;
  34. unsigned int drops;
  35. bh_queue_node * head;
  36. bh_queue_node * tail;
  37. bool exit_loop_run;
  38. };
  39. char * bh_message_payload(bh_message_t message)
  40. {
  41. return message->body;
  42. }
  43. int bh_message_payload_len(bh_message_t message)
  44. {
  45. return message->len;
  46. }
  47. int bh_message_type(bh_message_t message)
  48. {
  49. return message->tag;
  50. }
  51. bh_queue *
  52. bh_queue_create()
  53. {
  54. int ret;
  55. bh_queue *queue = bh_queue_malloc(sizeof(bh_queue));
  56. if (queue) {
  57. memset(queue, 0, sizeof(bh_queue));
  58. queue->max = DEFAULT_QUEUE_LENGTH;
  59. ret = bh_queue_mutex_init(&queue->queue_lock);
  60. if (ret != 0) {
  61. bh_queue_free(queue);
  62. return NULL;
  63. }
  64. ret = bh_queue_cond_init(&queue->queue_wait_cond);
  65. if (ret != 0) {
  66. bh_queue_mutex_destroy(&queue->queue_lock);
  67. bh_queue_free(queue);
  68. return NULL;
  69. }
  70. }
  71. return queue;
  72. }
  73. void bh_queue_destroy(bh_queue *queue)
  74. {
  75. bh_queue_node *node;
  76. if (!queue)
  77. return;
  78. bh_queue_mutex_lock(&queue->queue_lock);
  79. while (queue->head) {
  80. node = queue->head;
  81. queue->head = node->next;
  82. bh_free_msg(node);
  83. }
  84. bh_queue_mutex_unlock(&queue->queue_lock);
  85. bh_queue_cond_destroy(&queue->queue_wait_cond);
  86. bh_queue_mutex_destroy(&queue->queue_lock);
  87. bh_queue_free(queue);
  88. }
  89. bool bh_post_msg2(bh_queue *queue, bh_queue_node *msg)
  90. {
  91. if (queue->cnt >= queue->max) {
  92. queue->drops++;
  93. bh_free_msg(msg);
  94. return false;
  95. }
  96. bh_queue_mutex_lock(&queue->queue_lock);
  97. if (queue->cnt == 0) {
  98. bh_assert(queue->head == NULL);
  99. bh_assert(queue->tail == NULL);
  100. queue->head = queue->tail = msg;
  101. msg->next = msg->prev = NULL;
  102. queue->cnt = 1;
  103. bh_queue_cond_signal(&queue->queue_wait_cond);
  104. } else {
  105. msg->next = NULL;
  106. msg->prev = queue->tail;
  107. queue->tail->next = msg;
  108. queue->tail = msg;
  109. queue->cnt++;
  110. }
  111. bh_queue_mutex_unlock(&queue->queue_lock);
  112. return true;
  113. }
  114. bool bh_post_msg(bh_queue *queue, unsigned short tag, void *body,
  115. unsigned int len)
  116. {
  117. bh_queue_node *msg = bh_new_msg(tag, body, len, NULL);
  118. if (msg == NULL) {
  119. queue->drops++;
  120. if (len != 0 && body)
  121. bh_free(body);
  122. return false;
  123. }
  124. if (!bh_post_msg2(queue, msg)) {
  125. // bh_post_msg2 already freed the msg for failure
  126. return false;
  127. }
  128. return true;
  129. }
  130. bh_queue_node * bh_new_msg(unsigned short tag, void *body, unsigned int len,
  131. void * handler)
  132. {
  133. bh_queue_node *msg = (bh_queue_node*) bh_queue_malloc(
  134. sizeof(bh_queue_node));
  135. if (msg == NULL)
  136. return NULL;
  137. memset(msg, 0, sizeof(bh_queue_node));
  138. msg->len = len;
  139. msg->body = body;
  140. msg->tag = tag;
  141. msg->msg_cleaner = (bh_msg_cleaner) handler;
  142. return msg;
  143. }
  144. void bh_free_msg(bh_queue_node *msg)
  145. {
  146. if (msg->msg_cleaner) {
  147. msg->msg_cleaner(msg->body);
  148. bh_queue_free(msg);
  149. return;
  150. }
  151. // note: sometime we just use the payload pointer for a integer value
  152. // len!=0 is the only indicator about the body is an allocated buffer.
  153. if (msg->body && msg->len)
  154. bh_queue_free(msg->body);
  155. bh_queue_free(msg);
  156. }
  157. bh_message_t bh_get_msg(bh_queue *queue, int timeout)
  158. {
  159. bh_queue_node *msg = NULL;
  160. bh_queue_mutex_lock(&queue->queue_lock);
  161. if (queue->cnt == 0) {
  162. bh_assert(queue->head == NULL);
  163. bh_assert(queue->tail == NULL);
  164. if (timeout == 0) {
  165. bh_queue_mutex_unlock(&queue->queue_lock);
  166. return NULL;
  167. }
  168. bh_queue_cond_timedwait(&queue->queue_wait_cond, &queue->queue_lock,
  169. timeout);
  170. }
  171. if (queue->cnt == 0) {
  172. bh_assert(queue->head == NULL);
  173. bh_assert(queue->tail == NULL);
  174. } else if (queue->cnt == 1) {
  175. bh_assert(queue->head == queue->tail);
  176. msg = queue->head;
  177. queue->head = queue->tail = NULL;
  178. queue->cnt = 0;
  179. } else {
  180. msg = queue->head;
  181. queue->head = queue->head->next;
  182. queue->head->prev = NULL;
  183. queue->cnt--;
  184. }
  185. bh_queue_mutex_unlock(&queue->queue_lock);
  186. return msg;
  187. }
  188. unsigned bh_queue_get_message_count(bh_queue *queue)
  189. {
  190. if (!queue)
  191. return 0;
  192. return queue->cnt;
  193. }
  194. void bh_queue_enter_loop_run(bh_queue *queue,
  195. bh_queue_handle_msg_callback handle_cb)
  196. {
  197. if (!queue)
  198. return;
  199. while (!queue->exit_loop_run) {
  200. bh_queue_node * message = bh_get_msg(queue, BH_WAIT_FOREVER);
  201. if (message) {
  202. handle_cb(message);
  203. bh_free_msg(message);
  204. }
  205. }
  206. }
  207. void bh_queue_exit_loop_run(bh_queue *queue)
  208. {
  209. if (queue) {
  210. queue->exit_loop_run = true;
  211. bh_queue_cond_signal(&queue->queue_wait_cond);
  212. }
  213. }