dataqueue.c 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. /*
  2. * File : dataqueue.c
  3. * This file is part of RT-Thread RTOS
  4. * COPYRIGHT (C) 2012, RT-Thread Development Team
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License along
  17. * with this program; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  19. *
  20. * Change Logs:
  21. * Date Author Notes
  22. * 2012-09-30 Bernard first version.
  23. * 2016-10-31 armink fix some resume push and pop thread bugs
  24. */
  25. #include <rtthread.h>
  26. #include <rtdevice.h>
  27. #include <rthw.h>
  28. struct rt_data_item
  29. {
  30. const void *data_ptr;
  31. rt_size_t data_size;
  32. };
  33. rt_err_t
  34. rt_data_queue_init(struct rt_data_queue *queue,
  35. rt_uint16_t size,
  36. rt_uint16_t lwm,
  37. void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
  38. {
  39. RT_ASSERT(queue != RT_NULL);
  40. queue->evt_notify = evt_notify;
  41. queue->size = size;
  42. queue->lwm = lwm;
  43. queue->waiting_lwm = RT_FALSE;
  44. queue->get_index = 0;
  45. queue->put_index = 0;
  46. rt_list_init(&(queue->suspended_push_list));
  47. rt_list_init(&(queue->suspended_pop_list));
  48. queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
  49. if (queue->queue == RT_NULL)
  50. {
  51. return -RT_ENOMEM;
  52. }
  53. return RT_EOK;
  54. }
  55. RTM_EXPORT(rt_data_queue_init);
  56. rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
  57. const void *data_ptr,
  58. rt_size_t data_size,
  59. rt_int32_t timeout)
  60. {
  61. rt_ubase_t level;
  62. rt_thread_t thread;
  63. rt_err_t result;
  64. RT_ASSERT(queue != RT_NULL);
  65. result = RT_EOK;
  66. thread = rt_thread_self();
  67. level = rt_hw_interrupt_disable();
  68. while (queue->put_index - queue->get_index == queue->size)
  69. {
  70. queue->waiting_lwm = RT_TRUE;
  71. /* queue is full */
  72. if (timeout == 0)
  73. {
  74. result = -RT_ETIMEOUT;
  75. goto __exit;
  76. }
  77. /* current context checking */
  78. RT_DEBUG_NOT_IN_INTERRUPT;
  79. /* reset thread error number */
  80. thread->error = RT_EOK;
  81. /* suspend thread on the push list */
  82. rt_thread_suspend(thread);
  83. rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
  84. /* start timer */
  85. if (timeout > 0)
  86. {
  87. /* reset the timeout of thread timer and start it */
  88. rt_timer_control(&(thread->thread_timer),
  89. RT_TIMER_CTRL_SET_TIME,
  90. &timeout);
  91. rt_timer_start(&(thread->thread_timer));
  92. }
  93. /* enable interrupt */
  94. rt_hw_interrupt_enable(level);
  95. /* do schedule */
  96. rt_schedule();
  97. /* thread is waked up */
  98. result = thread->error;
  99. level = rt_hw_interrupt_disable();
  100. if (result != RT_EOK) goto __exit;
  101. }
  102. queue->queue[queue->put_index % queue->size].data_ptr = data_ptr;
  103. queue->queue[queue->put_index % queue->size].data_size = data_size;
  104. queue->put_index += 1;
  105. if (!rt_list_isempty(&(queue->suspended_pop_list)))
  106. {
  107. /* there is at least one thread in suspended list */
  108. /* get thread entry */
  109. thread = rt_list_entry(queue->suspended_pop_list.next,
  110. struct rt_thread,
  111. tlist);
  112. /* resume it */
  113. rt_thread_resume(thread);
  114. rt_hw_interrupt_enable(level);
  115. /* perform a schedule */
  116. rt_schedule();
  117. return result;
  118. }
  119. __exit:
  120. rt_hw_interrupt_enable(level);
  121. if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
  122. {
  123. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
  124. }
  125. return result;
  126. }
  127. RTM_EXPORT(rt_data_queue_push);
  128. rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
  129. const void** data_ptr,
  130. rt_size_t *size,
  131. rt_int32_t timeout)
  132. {
  133. rt_ubase_t level;
  134. rt_thread_t thread;
  135. rt_err_t result;
  136. RT_ASSERT(queue != RT_NULL);
  137. RT_ASSERT(data_ptr != RT_NULL);
  138. RT_ASSERT(size != RT_NULL);
  139. result = RT_EOK;
  140. thread = rt_thread_self();
  141. level = rt_hw_interrupt_disable();
  142. while (queue->get_index == queue->put_index)
  143. {
  144. /* queue is empty */
  145. if (timeout == 0)
  146. {
  147. result = -RT_ETIMEOUT;
  148. goto __exit;
  149. }
  150. /* current context checking */
  151. RT_DEBUG_NOT_IN_INTERRUPT;
  152. /* reset thread error number */
  153. thread->error = RT_EOK;
  154. /* suspend thread on the pop list */
  155. rt_thread_suspend(thread);
  156. rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
  157. /* start timer */
  158. if (timeout > 0)
  159. {
  160. /* reset the timeout of thread timer and start it */
  161. rt_timer_control(&(thread->thread_timer),
  162. RT_TIMER_CTRL_SET_TIME,
  163. &timeout);
  164. rt_timer_start(&(thread->thread_timer));
  165. }
  166. /* enable interrupt */
  167. rt_hw_interrupt_enable(level);
  168. /* do schedule */
  169. rt_schedule();
  170. /* thread is waked up */
  171. result = thread->error;
  172. level = rt_hw_interrupt_disable();
  173. if (result != RT_EOK)
  174. goto __exit;
  175. }
  176. *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
  177. *size = queue->queue[queue->get_index % queue->size].data_size;
  178. queue->get_index += 1;
  179. if ((queue->waiting_lwm == RT_TRUE) &&
  180. (queue->put_index - queue->get_index) <= queue->lwm)
  181. {
  182. /*
  183. * there is at least one thread in suspended list
  184. * and less than low water mark
  185. */
  186. if (!rt_list_isempty(&(queue->suspended_push_list)))
  187. {
  188. /* get thread entry */
  189. thread = rt_list_entry(queue->suspended_push_list.next,
  190. struct rt_thread,
  191. tlist);
  192. /* resume it */
  193. rt_thread_resume(thread);
  194. rt_hw_interrupt_enable(level);
  195. /* perform a schedule */
  196. rt_schedule();
  197. }
  198. else
  199. {
  200. queue->waiting_lwm = RT_FALSE;
  201. rt_hw_interrupt_enable(level);
  202. }
  203. if (queue->evt_notify != RT_NULL)
  204. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
  205. return result;
  206. }
  207. __exit:
  208. rt_hw_interrupt_enable(level);
  209. if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
  210. {
  211. queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
  212. }
  213. return result;
  214. }
  215. RTM_EXPORT(rt_data_queue_pop);
  216. rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
  217. const void** data_ptr,
  218. rt_size_t *size)
  219. {
  220. rt_ubase_t level;
  221. RT_ASSERT(queue != RT_NULL);
  222. level = rt_hw_interrupt_disable();
  223. if (queue->get_index == queue->put_index)
  224. {
  225. rt_hw_interrupt_enable(level);
  226. return -RT_EEMPTY;
  227. }
  228. *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
  229. *size = queue->queue[queue->get_index % queue->size].data_size;
  230. rt_hw_interrupt_enable(level);
  231. return RT_EOK;
  232. }
  233. RTM_EXPORT(rt_data_queue_peak);
  234. void rt_data_queue_reset(struct rt_data_queue *queue)
  235. {
  236. struct rt_thread *thread;
  237. register rt_ubase_t temp;
  238. rt_enter_critical();
  239. /* wakeup all suspend threads */
  240. /* resume on pop list */
  241. while (!rt_list_isempty(&(queue->suspended_pop_list)))
  242. {
  243. /* disable interrupt */
  244. temp = rt_hw_interrupt_disable();
  245. /* get next suspend thread */
  246. thread = rt_list_entry(queue->suspended_pop_list.next,
  247. struct rt_thread,
  248. tlist);
  249. /* set error code to RT_ERROR */
  250. thread->error = -RT_ERROR;
  251. /*
  252. * resume thread
  253. * In rt_thread_resume function, it will remove current thread from
  254. * suspend list
  255. */
  256. rt_thread_resume(thread);
  257. /* enable interrupt */
  258. rt_hw_interrupt_enable(temp);
  259. }
  260. /* resume on push list */
  261. while (!rt_list_isempty(&(queue->suspended_push_list)))
  262. {
  263. /* disable interrupt */
  264. temp = rt_hw_interrupt_disable();
  265. /* get next suspend thread */
  266. thread = rt_list_entry(queue->suspended_push_list.next,
  267. struct rt_thread,
  268. tlist);
  269. /* set error code to RT_ERROR */
  270. thread->error = -RT_ERROR;
  271. /*
  272. * resume thread
  273. * In rt_thread_resume function, it will remove current thread from
  274. * suspend list
  275. */
  276. rt_thread_resume(thread);
  277. /* enable interrupt */
  278. rt_hw_interrupt_enable(temp);
  279. }
  280. rt_exit_critical();
  281. rt_schedule();
  282. }
  283. RTM_EXPORT(rt_data_queue_reset);