ppool.c 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #include <rtthread.h>
  2. #include "ppool.h"
  3. //线程池执行任务函数
  4. void ppool_run(pool_t *pool);
  5. pool_t *ppool_init(int pool_max_num)
  6. {
  7. pool_t *pool;
  8. pool_w *head;
  9. int i;
  10. pool=rt_malloc(sizeof(pool_t));
  11. if(!pool)
  12. {
  13. ppool_errno=PE_POOL_NO_MEM;
  14. return NULL;
  15. }
  16. //创建任务队列
  17. head=ppool_queue_init();
  18. if(!head)
  19. {
  20. rt_free(pool);
  21. return NULL;
  22. }
  23. pool->pool_max_num=pool_max_num;
  24. pool->rel_num=0;
  25. pool->head=head;
  26. pool->id=rt_malloc(sizeof(pthread_t)*pool_max_num);
  27. if(!pool->id)
  28. {
  29. ppool_errno=PE_THREAD_NO_MEM;
  30. rt_free(head);
  31. rt_free(pool);
  32. return NULL;
  33. }
  34. if(pthread_mutex_init(&pool->ppool_lock,NULL) != 0)
  35. {
  36. ppool_errno=PE_THREAD_MUTEX_ERROR;
  37. rt_free(pool->id);
  38. rt_free(head);
  39. rt_free(pool);
  40. return NULL;
  41. }
  42. if(pthread_mutex_init(&PPOOL_LOCK,NULL) != 0)
  43. {
  44. ppool_errno=PE_THREAD_MUTEX_ERROR;
  45. rt_free(pool->id);
  46. rt_free(head);
  47. pthread_mutex_destroy(&pool->ppool_lock);
  48. rt_free(pool);
  49. }
  50. if(pthread_cond_init(&pool->ppool_cond,NULL) != 0)
  51. {
  52. ppool_errno=PE_THREAD_COND_ERROR;
  53. rt_free(pool->id);
  54. rt_free(head);
  55. pthread_mutex_destroy(&pool->ppool_lock);
  56. rt_free(pool);
  57. return NULL;
  58. }
  59. //创建任务
  60. for(i=0;i < pool_max_num;++i)
  61. {
  62. if(pthread_create(&pool->id[i],NULL,(void *)ppool_run,pool) == 0)
  63. ++pool->rel_num;
  64. pthread_detach(pool->id[i]);
  65. }
  66. return pool;
  67. }
  68. pbool ppool_add(pool_t *pool,pool_task *task)
  69. {
  70. pool_node *node;
  71. node=ppool_queue_new(task->task,task->arg,task->priority);
  72. if(!node)
  73. return PFALSE;
  74. while(pthread_mutex_lock(&pool->ppool_lock) != 0);
  75. ppool_queue_add(pool->head,node);
  76. while(pthread_cond_broadcast(&pool->ppool_cond) != 0);
  77. while(pthread_mutex_unlock(&pool->ppool_lock) != 0);
  78. return PTRUE;
  79. }
  80. void ppool_destroy(pool_t *pool)
  81. {
  82. int i;
  83. ppool_queue_destroy(pool->head);
  84. for(i=0;i < pool->pool_max_num;++i)
  85. pthread_cancel(pool->id[i]);
  86. rt_free(pool->id);
  87. pthread_mutex_destroy(&pool->ppool_lock);
  88. pthread_mutex_destroy(&PPOOL_LOCK);
  89. pthread_cond_destroy(&pool->ppool_cond);
  90. rt_free(pool);
  91. }
  92. void ppool_run(pool_t *pool)
  93. {
  94. pool_node *task;
  95. while(1)
  96. {
  97. while(pthread_mutex_lock(&pool->ppool_lock) != 0);
  98. while(pool->head->len <= 0)
  99. pthread_cond_wait(&pool->ppool_cond,&pool->ppool_lock);
  100. task=ppool_queue_get_task(pool->head);
  101. while(pthread_mutex_unlock(&pool->ppool_lock) != 0);
  102. if(task == NULL) continue;
  103. task->task(task->arg);
  104. rt_free(task);
  105. }
  106. }