thread_pool.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /*
  2. * This file is part of the EasyDataManager Library.
  3. *
  4. * Copyright (C) 2013-2019 by Armink <armink.ztl@gmail.com>
  5. *
  6. * Function: a thread pool base on RT-Thread
  7. * Created on: 2013-11-14
  8. */
  9. #include <thread_pool.h>
  10. #define DBG_SECTION_NAME "thread_pool"
  11. #define DBG_LEVEL DBG_INFO
  12. #include <rtdbg.h>
  13. #ifdef PKG_USING_THREAD_POOL
  14. static thread_pool_err add_task(thread_pool_t const pool, void (*process)(void *arg), void *arg);
  15. static thread_pool_err destroy(thread_pool_t pool);
  16. static void thread_job(void* arg);
  17. static void sync_lock(thread_pool_t pool);
  18. static void sync_unlock(thread_pool_t pool);
  19. static thread_pool_err del_all(thread_pool_t const pool);
  20. /**
  21. * This function will initialize the thread pool.
  22. *
  23. * @param pool the thread_pool pointer
  24. * @param name the thread_pool name
  25. * @param max_thread_num the max thread number in this thread_pool
  26. * @param thread_stack_size the thread stack size in this thread_pool
  27. *
  28. * @return error code
  29. */
  30. thread_pool_err init_thread_pool(thread_pool_t const pool, const char* name, uint8_t max_thread_num,
  31. uint32_t thread_stack_size) {
  32. thread_pool_err errorCode = THREAD_POOL_NO_ERR;
  33. char job_name[THREAD_POOL_NAME_MAX] = { 0 };
  34. uint8_t i;
  35. RT_ASSERT(name);
  36. strncpy(pool->name, name, THREAD_POOL_NAME_MAX);
  37. strncpy(job_name, name, THREAD_POOL_NAME_MAX);
  38. pool->queue_lock = rt_mutex_create("tp_qlock", RT_IPC_FLAG_FIFO);
  39. RT_ASSERT(pool->queue_lock != NULL);
  40. pool->user_lock = rt_mutex_create("tp_ulock", RT_IPC_FLAG_FIFO);
  41. RT_ASSERT(pool->user_lock != NULL);
  42. pool->queue_ready = rt_sem_create("tp_qready", 0, RT_IPC_FLAG_FIFO);
  43. RT_ASSERT(pool->queue_ready != NULL);
  44. pool->queue_head = NULL;
  45. pool->max_thread_num = max_thread_num;
  46. pool->cur_wait_thread_num = 0;
  47. pool->is_shutdown = RT_FALSE;
  48. pool->add_task = add_task;
  49. pool->del_all = del_all;
  50. pool->destroy = destroy;
  51. pool->lock = sync_lock;
  52. pool->unlock = sync_unlock;
  53. pool->thread_id = (rt_thread_t*) rt_malloc(max_thread_num * sizeof(rt_thread_t));
  54. RT_ASSERT(pool->thread_id != NULL);
  55. for (i = 0; i < max_thread_num; i++) {
  56. rt_snprintf(job_name, THREAD_POOL_NAME_MAX, "%s_%d", name, i);
  57. pool->thread_id[i] = rt_thread_create(job_name, thread_job, pool, thread_stack_size,
  58. THREAD_POOL_JOB_DEFAULT_PRIORITY, THREAD_POOL_JOB_TICK);
  59. RT_ASSERT(pool->thread_id[i] != NULL);
  60. rt_thread_startup(pool->thread_id[i]);
  61. LOG_D("create thread success.Current total thread number is %d", i + 1);
  62. rt_thread_delay(THREAD_POOL_THREADS_INIT_TIME);
  63. }
  64. LOG_D("initialize thread pool success!");
  65. return errorCode;
  66. }
  67. /**
  68. * This function will add a task to thread pool.
  69. *
  70. * @param pool the thread_pool pointer
  71. * @param process task function pointer
  72. * @param arg task function arguments
  73. *
  74. * @return error code
  75. */
  76. static thread_pool_err add_task(thread_pool_t const pool, void (*process)(void *arg), void *arg) {
  77. thread_pool_err error_code = THREAD_POOL_NO_ERR;
  78. thread_pool_task_t member = NULL;
  79. thread_pool_task_t newtask = (thread_pool_task_t) rt_malloc(sizeof(thread_pool_task));
  80. if (!newtask) {
  81. LOG_W("Memory full!");
  82. return THREAD_POOL_MEM_FULL_ERR;
  83. }
  84. newtask->process = process;
  85. newtask->arg = arg;
  86. newtask->next = NULL;
  87. /* lock thread pool */
  88. rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
  89. member = pool->queue_head;
  90. /* task queue is NULL */
  91. if (member == NULL) {
  92. pool->queue_head = newtask;
  93. } else {
  94. /* look up for queue tail */
  95. while (member->next != NULL) {
  96. member = member->next;
  97. }
  98. member->next = newtask;
  99. }
  100. /* add current waiting thread number */
  101. pool->cur_wait_thread_num++;
  102. rt_mutex_release(pool->queue_lock);
  103. /* wake up a waiting thread to process task */
  104. rt_sem_release(pool->queue_ready);
  105. LOG_D("add a task to task queue success.");
  106. return error_code;
  107. }
  108. /**
  109. * This function will delete all wait task.
  110. *
  111. * @param pool the thread_pool pointer
  112. *
  113. * @return error code
  114. */
  115. static thread_pool_err del_all(thread_pool_t const pool) {
  116. thread_pool_err error = THREAD_POOL_NO_ERR;
  117. rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
  118. /* delete all task in queue */
  119. for (;;) {
  120. if (pool->queue_head != NULL) {
  121. rt_free(pool->queue_head);
  122. pool->queue_head = pool->queue_head->next;
  123. pool->cur_wait_thread_num--;
  124. } else {
  125. break;
  126. }
  127. }
  128. rt_sem_control(pool->queue_ready, RT_IPC_CMD_RESET, NULL);
  129. LOG_D("delete all wait task success");
  130. rt_mutex_release(pool->queue_lock);
  131. return error;
  132. }
  133. /**
  134. * This function will destroy thread pool.
  135. *
  136. * @param pool the thread_pool pointer
  137. *
  138. * @return error code
  139. */
  140. static thread_pool_err destroy(thread_pool_t pool) {
  141. thread_pool_err error = THREAD_POOL_NO_ERR;
  142. thread_pool_task_t head = NULL;
  143. uint8_t i;
  144. if (pool->is_shutdown) {/* thread already shutdown */
  145. error = THREAD_POOL_ALREADY_SHUTDOWN_ERR;
  146. }
  147. if (error == THREAD_POOL_NO_ERR) {
  148. pool->is_shutdown = RT_TRUE;
  149. /* wait all thread exit */
  150. for (i = 0; i < pool->max_thread_num; i++) {
  151. rt_thread_delete(pool->thread_id[i]);
  152. }
  153. /* wake up all thread from broadcast */
  154. /* delete mutex and semaphore then all waiting thread will wake up */
  155. rt_mutex_delete(pool->queue_lock);
  156. rt_sem_delete(pool->queue_ready);
  157. /* release memory */
  158. rt_free(pool->thread_id);
  159. pool->thread_id = NULL;
  160. /* destroy task queue */
  161. while (pool->queue_head != NULL) {
  162. head = pool->queue_head;
  163. pool->queue_head = pool->queue_head->next;
  164. rt_free(head);
  165. }
  166. /* destroy mutex */
  167. rt_mutex_delete(pool->user_lock);
  168. pool = NULL;
  169. LOG_D("Thread pool destroy success");
  170. }
  171. return error;
  172. }
  173. /**
  174. * This function is thread job.
  175. *
  176. * @param arg the thread job arguments
  177. *
  178. */
  179. static void thread_job(void* arg) {
  180. thread_pool_t pool = NULL;
  181. thread_pool_task_t task = NULL;
  182. while (1) {
  183. pool = (thread_pool_t) arg;
  184. /* lock thread pool */
  185. rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
  186. /* If waiting thread number is 0 ,and thread is not shutdown.
  187. * The thread will block.
  188. * Before thread block the queueLock will unlock.
  189. * After thread wake up ,the queueLock will relock.*/
  190. while (pool->cur_wait_thread_num == 0 && !pool->is_shutdown) {
  191. /* ququeReady is NULL,the thread will block */
  192. if (pool->queue_ready->value == 0) {
  193. rt_mutex_release(pool->queue_lock);
  194. rt_sem_take(pool->queue_ready, RT_WAITING_FOREVER);
  195. rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
  196. } else {/* ququeReady is not NULL,the ququeReady semaphore will decrease */
  197. rt_sem_take(pool->queue_ready, RT_WAITING_FOREVER);
  198. }
  199. }
  200. if (pool->is_shutdown) { /* thread pool will shutdown */
  201. rt_mutex_release(pool->queue_lock);
  202. return;
  203. }
  204. RT_ASSERT(pool->cur_wait_thread_num != 0);
  205. RT_ASSERT(pool->queue_head != NULL);
  206. /* load task to thread job */
  207. pool->cur_wait_thread_num--;
  208. task = pool->queue_head;
  209. pool->queue_head = task->next;
  210. rt_mutex_release(pool->queue_lock);
  211. /* run task */
  212. (*(task->process))(task->arg);
  213. /* release memory */
  214. rt_free(task);
  215. task = NULL;
  216. }
  217. }
  218. /**
  219. * This function will lock the synchronized lock.
  220. *
  221. * @param pool the thread_pool pointer
  222. *
  223. */
  224. static void sync_lock(thread_pool_t pool) {
  225. rt_mutex_take(pool->user_lock, RT_WAITING_FOREVER);
  226. }
  227. /**
  228. * This function will unlock the synchronized lock.
  229. *
  230. * @param pool the thread_pool pointer
  231. *
  232. */
  233. static void sync_unlock(thread_pool_t pool) {
  234. rt_mutex_release(pool->user_lock);
  235. }
  236. #endif /* PKG_USING_THREAD_POOL */