workqueue.c

/*
 * Copyright (c) 2006-2018, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     bernard      fix the re-work issue.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP
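
/*
 * Signal completion of the work item that just ran. rt_sem_trytake()
 * returns -RT_ETIMEOUT while the semaphore count is zero, so the
 * release/retake sequence below wakes any thread blocked in
 * rt_workqueue_cancel_work_sync() while leaving the count at zero.
 */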
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t result;

    rt_enter_critical();
    while (1)
    {
        /* try to take the condition semaphore */
        result = rt_sem_trytake(&(queue->sem));
        if (result == -RT_ETIMEOUT)
        {
            /* timed out: release this semaphore */
            rt_sem_release(&(queue->sem));
        }
        else if (result == RT_EOK)
        {
            /* taken back: keep the sem value = 0 */
            break;
        }
        else
        {
            result = -RT_ERROR;
            break;
        }
    }
    rt_exit_critical();

    return result;
}
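
/*
 * Worker thread entry: suspend while the work list is empty; otherwise
 * dequeue the head item, run its callback, then acknowledge completion.
 */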
static void _workqueue_thread_entry(void *parameter)
{
    rt_base_t level;
    struct rt_work *work;
    struct rt_workqueue *queue;

    queue = (struct rt_workqueue *)parameter;
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* no work to do, suspend self. */
            rt_thread_suspend(rt_thread_self());
            rt_schedule();
        }

        /* we have work to do. */
        level = rt_hw_interrupt_disable();
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
        rt_hw_interrupt_enable(level);

        /* do the work */
        work->work_func(work, work->work_data);

        level = rt_hw_interrupt_disable();
        /* clear the current work */
        queue->work_current = RT_NULL;
        rt_hw_interrupt_enable(level);

        /* acknowledge work completion */
        _workqueue_work_completion(queue);
    }
}
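
/*
 * Create a workqueue backed by a single worker thread with the given
 * name, stack size and priority. Returns RT_NULL on allocation or
 * thread-creation failure.
 */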
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
{
    struct rt_workqueue *queue = RT_NULL;

    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
    if (queue != RT_NULL)
    {
        /* initialize work list */
        rt_list_init(&(queue->work_list));
        queue->work_current = RT_NULL;
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
}
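
/* delete the worker thread, clean up the semaphore and free the queue */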
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
{
    RT_ASSERT(queue != RT_NULL);

    rt_thread_delete(queue->work_thread);
    /* detach the semaphore initialized in rt_workqueue_create() so it is
     * removed from the kernel object list before the memory is freed */
    rt_sem_detach(&(queue->sem));
    RT_KERNEL_FREE(queue);

    return RT_EOK;
}
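
/*
 * Queue a work item at the tail of the work list. Returns -RT_EBUSY if
 * the item is the one currently executing; resumes the worker thread if
 * it is idle.
 */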
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized first */
    rt_list_remove(&(work->list));
    rt_list_insert_after(queue->work_list.prev, &(work->list));

    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}
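
/*
 * Queue a "critical" work item. In this version it follows the same path
 * as rt_workqueue_dowork(): the item is appended at the tail of the work
 * list and the worker thread is woken if idle.
 */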
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized first */
    rt_list_remove(&(work->list));
    rt_list_insert_after(queue->work_list.prev, &(work->list));

    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}
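
/* remove a pending work item; returns -RT_EBUSY if it is currently executing */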
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }
    rt_list_remove(&(work->list));
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}
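
/*
 * Cancel a work item synchronously: if it is currently executing, block
 * on the queue semaphore until _workqueue_work_completion() signals that
 * it has finished; otherwise just unlink it from the work list.
 */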
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work) /* it's the current work in the queue */
    {
        /* wait for work completion */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
        rt_list_remove(&(work->list));
    }
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}
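
/*
 * Unlink every pending work item from the queue. The item currently
 * executing, if any, is not waited for.
 */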
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
{
    struct rt_list_node *node, *next;

    RT_ASSERT(queue != RT_NULL);

    rt_enter_critical();
    for (node = queue->work_list.next; node != &(queue->work_list); node = next)
    {
        next = node->next;
        rt_list_remove(node);
    }
    rt_exit_critical();

    return RT_EOK;
}

#endif /* RT_USING_HEAP */
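
/*
 * Usage sketch (illustrative; not part of the original file). It assumes
 * the rt_work_init() helper from <rtdevice.h> to bind a callback and user
 * data to a work item; the demo_* names, the stack size and the priority
 * are arbitrary choices for the example.
 */
#if 0
static struct rt_work demo_work;

static void demo_work_func(struct rt_work *work, void *work_data)
{
    rt_kprintf("work executed, data: %s\n", (char *)work_data);
}

static void demo_workqueue(void)
{
    struct rt_workqueue *queue;

    /* one worker thread: 2048-byte stack, priority 16 */
    queue = rt_workqueue_create("demo_wq", 2048, 16);
    if (queue == RT_NULL)
        return;

    /* bind the callback and user data, then queue the item; the worker
     * thread will run demo_work_func() in its own context */
    rt_work_init(&demo_work, demo_work_func, "hello");
    rt_workqueue_dowork(queue, &demo_work);
}
#endif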