2
0

task_msg_bus.c 26 KB


  1. /*
  2. * Copyright (c) 2006-2020, RT-Thread Development Team
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. *
  6. * Change Logs:
  7. * Date Author Notes
  8. * 2020-03-22 sly_ant the first version
  9. */
  10. #include "task_msg_bus.h"
  11. #define DBG_TAG "task.msg.bus"
  12. #define DBG_LVL DBG_LOG
  13. #include <rtdbg.h>
  14. static rt_bool_t task_msg_bus_init_tag = RT_FALSE;
  15. static struct rt_semaphore msg_sem;
  16. static struct rt_mutex msg_lock;
  17. static struct rt_mutex msg_ref_lock;
  18. static struct rt_mutex cb_lock;
  19. static struct rt_mutex sub_lock;
  20. static struct rt_mutex wt_lock;
  21. static rt_slist_t callback_slist_array[TASK_MSG_COUNT];
  22. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  23. static struct task_msg_dup_release_hook dup_release_hooks[TASK_MSG_COUNT] = task_msg_dup_release_hooks;
  24. #endif
  25. static rt_slist_t msg_slist = RT_SLIST_OBJECT_INIT(msg_slist);
  26. static rt_slist_t msg_ref_slist = RT_SLIST_OBJECT_INIT(msg_ref_slist);
  27. static rt_slist_t msg_subscriber_slist = RT_SLIST_OBJECT_INIT(msg_subscriber_slist);
  28. static rt_slist_t msg_wait_slist = RT_SLIST_OBJECT_INIT(msg_wait_slist);
  29. static rt_uint32_t subscriber_id = 0;
  30. /**
  31. * Append a message reference to the slist:msg_ref_slist,
  32. * each message reference are added only once.
  33. *
  34. * @param args: message reference
  35. */
  36. static void msg_ref_append(task_msg_args_t args)
  37. {
  38. rt_bool_t is_first_append = RT_TRUE;
  39. task_msg_ref_node_t item;
  40. rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
  41. rt_slist_for_each_entry(item, &msg_ref_slist, slist)
  42. {
  43. if (item->args == args)
  44. {
  45. is_first_append = RT_FALSE;
  46. item->ref_count++;
  47. break;
  48. }
  49. }
  50. if (is_first_append)
  51. {
  52. task_msg_ref_node_t node = rt_calloc(1, sizeof(struct task_msg_ref_node));
  53. if (node == RT_NULL)
  54. {
  55. rt_mutex_release(&msg_ref_lock);
  56. LOG_E("there is no memory available!");
  57. return;
  58. }
  59. node->args = args;
  60. node->ref_count = 1;
  61. rt_slist_init(&(node->slist));
  62. rt_slist_append(&msg_ref_slist, &(node->slist));
  63. }
  64. rt_mutex_release(&msg_ref_lock);
  65. }
  66. /**
  67. * Release a message reference from the slist:msg_ref_slist,
  68. * only when the subscribers of all messages have consumed,
  69. * can they really free from memory.
  70. *
  71. * @param args: message reference
  72. */
  73. void task_msg_release(task_msg_args_t args)
  74. {
  75. task_msg_ref_node_t item;
  76. rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
  77. rt_slist_for_each_entry(item, &msg_ref_slist, slist)
  78. {
  79. if (item->args == args)
  80. {
  81. item->ref_count--;
  82. if (item->ref_count <= 0)
  83. {
  84. rt_slist_remove(&msg_ref_slist, &(item->slist));
  85. if (item->args->msg_obj)
  86. {
  87. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  88. if (dup_release_hooks[item->args->msg_name].release)
  89. {
  90. RT_ASSERT(dup_release_hooks[item->args->msg_name].msg_name == item->args->msg_name);
  91. dup_release_hooks[item->args->msg_name].release(item->args->msg_obj);
  92. }
  93. #endif
  94. rt_free(item->args->msg_obj);
  95. }
  96. rt_free(item->args);
  97. rt_free(item);
  98. }
  99. break;
  100. }
  101. }
  102. rt_mutex_release(&msg_ref_lock);
  103. }
  104. /**
  105. * Create a subscriber.
  106. * @param msg_name: message name
  107. * @return create failed return -1,otherwise return >=0
  108. */
  109. int task_msg_subscriber_create(enum task_msg_name msg_name)
  110. {
  111. if (task_msg_bus_init_tag == RT_FALSE)
  112. return -1;
  113. task_msg_subscriber_node_t subscriber = (task_msg_subscriber_node_t) rt_calloc(1,
  114. sizeof(struct task_msg_subscriber_node));
  115. if (subscriber == RT_NULL)
  116. return -1;
  117. int id = subscriber_id++;
  118. char name[RT_NAME_MAX];
  119. rt_snprintf(name, RT_NAME_MAX, "sub_%d", id);
  120. subscriber->sem = rt_sem_create(name, 0, RT_IPC_FLAG_PRIO);
  121. if (subscriber->sem == RT_NULL)
  122. {
  123. rt_free(subscriber);
  124. return -1;
  125. }
  126. subscriber->msg_name = msg_name;
  127. subscriber->subscriber_id = id;
  128. rt_slist_init(&(subscriber->slist));
  129. rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
  130. rt_slist_append(&msg_subscriber_slist, &(subscriber->slist));
  131. rt_mutex_release(&sub_lock);
  132. return id;
  133. }
  134. /**
  135. * Create a subscriber and allow multiple topics to be subscribed.
  136. * @param msg_name_list: message name array
  137. * @param msg_name_list_len: message name array length
  138. * @return create failed return -1,otherwise return >=0
  139. */
  140. int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len)
  141. {
  142. if (task_msg_bus_init_tag == RT_FALSE)
  143. return -1;
  144. int id = subscriber_id++;
  145. char name[RT_NAME_MAX];
  146. rt_snprintf(name, RT_NAME_MAX, "sub_%d", id);
  147. rt_sem_t sem = rt_sem_create(name, 0, RT_IPC_FLAG_PRIO);
  148. if (sem == RT_NULL)
  149. return -1;
  150. int count = 0;
  151. rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
  152. for (int i = 0; i < msg_name_list_len; i++)
  153. {
  154. task_msg_subscriber_node_t subscriber = rt_calloc(1, sizeof(struct task_msg_subscriber_node));
  155. if (subscriber == RT_NULL)
  156. {
  157. goto ERROR;
  158. }
  159. subscriber->sem = sem;
  160. subscriber->msg_name = msg_name_list[i];
  161. subscriber->subscriber_id = id;
  162. rt_slist_init(&(subscriber->slist));
  163. rt_slist_append(&msg_subscriber_slist, &(subscriber->slist));
  164. count++;
  165. }
  166. if (count > 0)
  167. {
  168. rt_mutex_release(&sub_lock);
  169. return id;
  170. }
  171. ERROR: rt_sem_delete(sem);
  172. if (count > 0)
  173. {
  174. task_msg_subscriber_node_t subscriber;
  175. rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
  176. {
  177. if (subscriber->subscriber_id == id)
  178. {
  179. rt_slist_remove(&msg_subscriber_slist, &(subscriber->slist));
  180. rt_free(subscriber);
  181. }
  182. }
  183. }
  184. rt_mutex_release(&sub_lock);
  185. return -1;
  186. }
  187. /**
  188. * Delete a subscriber.
  189. * @param subscriber_id: subscriber id
  190. */
  191. void task_msg_subscriber_delete(int subscriber_id)
  192. {
  193. task_msg_wait_node_t wait_node;
  194. task_msg_subscriber_node_t subscriber;
  195. rt_bool_t first_tag = RT_TRUE;
  196. rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
  197. rt_slist_for_each_entry(wait_node, &msg_wait_slist, slist)
  198. {
  199. if (wait_node->subscriber->subscriber_id == subscriber_id)
  200. {
  201. rt_slist_remove(&msg_wait_slist, &(wait_node->slist));
  202. task_msg_release(wait_node->args);
  203. rt_free(wait_node);
  204. }
  205. }
  206. rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
  207. rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
  208. {
  209. if (subscriber->subscriber_id == subscriber_id)
  210. {
  211. rt_slist_remove(&msg_subscriber_slist, &(subscriber->slist));
  212. if (first_tag)
  213. {
  214. rt_sem_delete(subscriber->sem);
  215. first_tag = RT_FALSE;
  216. }
  217. rt_free(subscriber);
  218. }
  219. }
  220. rt_mutex_release(&sub_lock);
  221. rt_mutex_release(&wt_lock);
  222. }
  223. /**
  224. * Blocks the current thread until a message of the specified message name is received.
  225. *
  226. * @param subscriber_id: subscriber id
  227. * @param timeout_ms: the waiting millisecond (-1:waiting forever until get resource)
  228. * @param out_args: output parameter, return the received message reference address
  229. * @return error code
  230. */
  231. rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct task_msg_args **out_args)
  232. {
  233. if (task_msg_bus_init_tag == RT_FALSE)
  234. return -RT_EINVAL;
  235. rt_err_t rst = -RT_ERROR;
  236. task_msg_subscriber_node_t subscriber;
  237. rt_sem_t sem = RT_NULL;
  238. rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
  239. rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
  240. {
  241. if (subscriber->subscriber_id == subscriber_id)
  242. {
  243. sem = subscriber->sem;
  244. break;
  245. }
  246. }
  247. rt_mutex_release(&sub_lock);
  248. if (sem == RT_NULL)
  249. {
  250. rt_thread_mdelay(timeout_ms);
  251. return -RT_EINVAL;
  252. }
  253. rst = rt_sem_take(sem, timeout_ms);
  254. if (rst == RT_EOK)
  255. {
  256. rst = -RT_EINVAL;
  257. task_msg_wait_node_t wait_node;
  258. rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
  259. rt_slist_for_each_entry(wait_node, &msg_wait_slist, slist)
  260. {
  261. if (wait_node->subscriber->subscriber_id == subscriber_id)
  262. {
  263. *out_args = wait_node->args;
  264. rt_slist_remove(&msg_wait_slist, &(wait_node->slist));
  265. rt_free(wait_node);
  266. rst = RT_EOK;
  267. break;
  268. }
  269. }
  270. rt_mutex_release(&wt_lock);
  271. }
  272. return rst;
  273. }
  274. /**
  275. * Subscribe the message with the specified name and set the callback function.
  276. *
  277. * @param msg_name: message name
  278. * @param callback: callback function name
  279. * @return error code
  280. */
  281. rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args))
  282. {
  283. if (task_msg_bus_init_tag == RT_FALSE || callback == RT_NULL)
  284. return -RT_EINVAL;
  285. rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
  286. rt_bool_t find_tag = RT_FALSE;
  287. task_msg_callback_node_t node;
  288. rt_slist_for_each_entry(node, &callback_slist_array[msg_name], slist)
  289. {
  290. if (node->callback == callback)
  291. {
  292. find_tag = RT_TRUE;
  293. break;
  294. }
  295. }
  296. if (find_tag)
  297. {
  298. LOG_W("this task msg callback with msg_name[%d] is exist!", msg_name);
  299. }
  300. else
  301. {
  302. task_msg_callback_node_t callback_node = rt_calloc(1, sizeof(struct task_msg_callback_node));
  303. if (callback_node == RT_NULL)
  304. {
  305. rt_mutex_release(&cb_lock);
  306. LOG_E("there is no memory available!");
  307. return RT_ENOMEM;
  308. }
  309. callback_node->callback = callback;
  310. rt_slist_init(&(callback_node->slist));
  311. rt_slist_append(&callback_slist_array[msg_name], &(callback_node->slist));
  312. }
  313. rt_mutex_release(&cb_lock);
  314. return RT_EOK;
  315. }
  316. /**
  317. * Unsubscribe the message with the specified name and cancle the callback function.
  318. *
  319. * @param msg_name: message name
  320. * @param callback: callback function name
  321. * @return error code
  322. */
  323. rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args))
  324. {
  325. if (task_msg_bus_init_tag == RT_FALSE || callback == RT_NULL)
  326. return -RT_EINVAL;
  327. task_msg_callback_node_t node;
  328. rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
  329. rt_slist_for_each_entry(node, &callback_slist_array[msg_name], slist)
  330. {
  331. if (node->callback == callback)
  332. {
  333. rt_slist_remove(&callback_slist_array[msg_name], &(node->slist));
  334. rt_free(node);
  335. break;
  336. }
  337. }
  338. rt_mutex_release(&cb_lock);
  339. return RT_EOK;
  340. }
  341. /**
  342. * Publish a message object.
  343. *
  344. * @param msg_name: message name
  345. * @param msg_obj: message object
  346. * @param msg_size: message size
  347. * @return error code
  348. */
  349. rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_size_t msg_size)
  350. {
  351. if (task_msg_bus_init_tag == RT_FALSE)
  352. return -RT_EINVAL;
  353. task_msg_args_node_t node = rt_calloc(1, sizeof(struct task_msg_args_node));
  354. if (node == RT_NULL)
  355. {
  356. LOG_E("task msg publish failed! args_node create failed!");
  357. return -RT_ENOMEM;
  358. }
  359. task_msg_args_t msg_args = rt_calloc(1, sizeof(struct task_msg_args));
  360. if (msg_args == RT_NULL)
  361. {
  362. rt_free(node);
  363. LOG_E("task msg publish failed! msg_args create failed!");
  364. return -RT_ENOMEM;
  365. }
  366. msg_args->msg_name = msg_name;
  367. msg_args->msg_size = msg_size;
  368. msg_args->msg_obj = RT_NULL;
  369. if (msg_obj && msg_size > 0)
  370. {
  371. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  372. if (dup_release_hooks[msg_name].dup)
  373. {
  374. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  375. msg_args->msg_obj = dup_release_hooks[msg_name].dup(msg_obj);
  376. if (msg_args->msg_obj == RT_NULL)
  377. {
  378. rt_free(node);
  379. rt_free(msg_args);
  380. LOG_E("task msg publish failed! msg_args create failed!");
  381. return -RT_ENOMEM;
  382. }
  383. }
  384. else
  385. {
  386. msg_args->msg_obj = rt_calloc(1, msg_size);
  387. if (msg_args->msg_obj)
  388. {
  389. rt_memcpy(msg_args->msg_obj, msg_obj, msg_size);
  390. }
  391. else
  392. {
  393. rt_free(node);
  394. rt_free(msg_args);
  395. LOG_E("task msg publish failed! msg_args create failed!");
  396. return -RT_ENOMEM;
  397. }
  398. }
  399. #else
  400. msg_args->msg_obj = rt_calloc(1, msg_size);
  401. if (msg_args->msg_obj)
  402. {
  403. rt_memcpy(msg_args->msg_obj, msg_obj, msg_size);
  404. }
  405. else
  406. {
  407. rt_free(node);
  408. rt_free(msg_args);
  409. LOG_E("task msg publish failed! msg_args create failed!");
  410. return -RT_ENOMEM;
  411. }
  412. #endif
  413. }
  414. node->args = msg_args;
  415. rt_slist_init(&(node->slist));
  416. rt_mutex_take(&msg_lock, RT_WAITING_FOREVER);
  417. rt_slist_append(&msg_slist, &(node->slist));
  418. rt_mutex_release(&msg_lock);
  419. rt_sem_release(&msg_sem);
  420. return RT_EOK;
  421. }
  422. /**
  423. * Publish a text message.
  424. *
  425. * @param msg_name: message name
  426. * @param msg_text: message text
  427. * @return error code
  428. */
  429. rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *msg_text)
  430. {
  431. void *msg_obj = (void *) msg_text;
  432. rt_size_t args_size = 0;
  433. if (msg_obj)
  434. {
  435. args_size = rt_strlen(msg_text) + 1;
  436. }
  437. return task_msg_publish_obj(msg_name, msg_obj, args_size);
  438. }
  439. static void msg_timing_timeout(void *params)
  440. {
  441. task_msg_loop_t msg_timing = (task_msg_loop_t) params;
  442. task_msg_publish_obj(msg_timing->msg_name, msg_timing->msg_obj, msg_timing->msg_size);
  443. if (msg_timing->msg_obj)
  444. {
  445. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  446. if (dup_release_hooks[msg_timing->msg_name].release)
  447. {
  448. RT_ASSERT(dup_release_hooks[msg_timing->msg_name].msg_name == msg_timing->msg_name)
  449. dup_release_hooks[msg_timing->msg_name].release(msg_timing->msg_obj);
  450. }
  451. #endif
  452. rt_free(msg_timing->msg_obj);
  453. msg_timing->msg_obj = RT_NULL;
  454. }
  455. rt_timer_delete(msg_timing->timer);
  456. msg_timing->timer = RT_NULL;
  457. rt_free(msg_timing);
  458. }
  459. /**
  460. * Publish a delay message object.
  461. * @param delay_ms: delay ms
  462. * @param msg_name: message name
  463. * @param msg_obj: message object
  464. * @param msg_size: message size
  465. * @return error code
  466. */
  467. rt_err_t task_msg_delay_publish_obj(rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
  468. rt_size_t msg_size)
  469. {
  470. task_msg_loop_t msg_loop = rt_calloc(1, sizeof(struct task_msg_loop));
  471. if (msg_loop == RT_NULL)
  472. return -RT_ENOMEM;
  473. msg_loop->msg_name = msg_name;
  474. msg_loop->msg_obj = RT_NULL;
  475. msg_loop->msg_size = 0;
  476. if (msg_obj && msg_size > 0)
  477. {
  478. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  479. if (dup_release_hooks[msg_name].dup)
  480. {
  481. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  482. msg_loop->msg_obj = dup_release_hooks[msg_name].dup(msg_obj);
  483. if (msg_loop->msg_obj == RT_NULL)
  484. {
  485. rt_free(msg_loop);
  486. return -RT_ENOMEM;
  487. }
  488. }
  489. else
  490. {
  491. msg_loop->msg_obj = rt_calloc(1, msg_size);
  492. if (msg_loop->msg_obj == RT_NULL)
  493. {
  494. rt_free(msg_loop);
  495. return -RT_ENOMEM;
  496. }
  497. rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
  498. }
  499. #else
  500. msg_loop->msg_obj = rt_calloc(1, msg_size);
  501. if (msg_loop->msg_obj == RT_NULL)
  502. {
  503. rt_free(msg_loop);
  504. return -RT_ENOMEM;
  505. }
  506. rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
  507. #endif
  508. msg_loop->msg_size = msg_size;
  509. }
  510. char name[RT_NAME_MAX];
  511. rt_snprintf(name, RT_NAME_MAX, "delay%d", msg_name);
  512. msg_loop->timer = rt_timer_create(name, msg_timing_timeout, msg_loop, rt_tick_from_millisecond(delay_ms),
  513. RT_TIMER_FLAG_SOFT_TIMER | RT_TIMER_FLAG_ONE_SHOT);
  514. if (msg_loop->timer == RT_NULL)
  515. {
  516. if (msg_loop->msg_obj)
  517. {
  518. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  519. if (dup_release_hooks[msg_name].release)
  520. {
  521. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  522. dup_release_hooks[msg_name].release(msg_obj);
  523. }
  524. #endif
  525. rt_free(msg_loop->msg_obj);
  526. }
  527. rt_free(msg_loop);
  528. return -RT_ENOMEM;
  529. }
  530. rt_err_t rst = rt_timer_start(msg_loop->timer);
  531. if (rst != RT_EOK)
  532. {
  533. if (msg_loop->msg_obj)
  534. {
  535. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  536. if (dup_release_hooks[msg_name].release)
  537. {
  538. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  539. dup_release_hooks[msg_name].release(msg_obj);
  540. }
  541. #endif
  542. rt_free(msg_loop->msg_obj);
  543. }
  544. rt_timer_delete(msg_loop->timer);
  545. rt_free(msg_loop);
  546. }
  547. return rst;
  548. }
  549. /**
  550. * Publish a delay text message.
  551. * @param delay_ms: delay ms
  552. * @param msg_name: message name
  553. * @param msg_text: message text
  554. * @return error code
  555. */
  556. rt_err_t task_msg_delay_publish(rt_uint32_t delay_ms, enum task_msg_name msg_name, const char *msg_text)
  557. {
  558. void *msg_obj = (void *) msg_text;
  559. rt_size_t args_size = 0;
  560. if (msg_obj)
  561. {
  562. args_size = rt_strlen(msg_text) + 1;
  563. }
  564. return task_msg_delay_publish_obj(delay_ms, msg_name, msg_obj, args_size);
  565. }
  566. static void msg_loop_timeout(void *params)
  567. {
  568. task_msg_loop_t msg_loop = (task_msg_loop_t) params;
  569. task_msg_publish_obj(msg_loop->msg_name, msg_loop->msg_obj, msg_loop->msg_size);
  570. }
  571. /**
  572. * create a loop message
  573. * @return error code
  574. */
  575. task_msg_loop_t task_msg_loop_create(void)
  576. {
  577. task_msg_loop_t msg_loop = rt_calloc(1, sizeof(struct task_msg_loop));
  578. if (msg_loop == RT_NULL)
  579. return RT_NULL;
  580. msg_loop->timer = RT_NULL;
  581. msg_loop->msg_obj = RT_NULL;
  582. msg_loop->msg_size = 0;
  583. return msg_loop;
  584. }
  585. /**
  586. * start a loop message
  587. * @param msg_loop
  588. * @param delay_ms
  589. * @param msg_name
  590. * @param msg_obj
  591. * @param msg_size
  592. * @return error code
  593. */
  594. rt_err_t task_msg_loop_start(task_msg_loop_t msg_loop, rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
  595. rt_size_t msg_size)
  596. {
  597. if (msg_loop == RT_NULL)
  598. return -RT_EEMPTY;
  599. if (msg_loop->timer == RT_NULL)
  600. {
  601. char name[RT_NAME_MAX];
  602. rt_snprintf(name, RT_NAME_MAX, "loop%d", msg_name);
  603. msg_loop->timer = rt_timer_create(name, msg_loop_timeout, msg_loop, rt_tick_from_millisecond(delay_ms),
  604. RT_TIMER_FLAG_SOFT_TIMER | RT_TIMER_FLAG_PERIODIC);
  605. if (msg_loop->timer == RT_NULL)
  606. {
  607. return -RT_ENOMEM;
  608. }
  609. }
  610. else
  611. {
  612. rt_timer_stop(msg_loop->timer);
  613. rt_tick_t delay_tick = rt_tick_from_millisecond(delay_ms);
  614. rt_timer_control(msg_loop->timer, RT_TIMER_CTRL_SET_TIME, &delay_tick);
  615. if (msg_loop->msg_obj)
  616. {
  617. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  618. if (dup_release_hooks[msg_name].release)
  619. {
  620. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  621. dup_release_hooks[msg_name].release(msg_obj);
  622. }
  623. #endif
  624. rt_free(msg_loop->msg_obj);
  625. }
  626. }
  627. msg_loop->msg_name = msg_name;
  628. msg_loop->msg_obj = RT_NULL;
  629. msg_loop->msg_size = 0;
  630. if (msg_obj && msg_size > 0)
  631. {
  632. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  633. if (dup_release_hooks[msg_name].dup)
  634. {
  635. RT_ASSERT(dup_release_hooks[msg_name].msg_name == msg_name);
  636. msg_loop->msg_obj = dup_release_hooks[msg_name].dup(msg_obj);
  637. if (msg_loop->msg_obj == RT_NULL)
  638. {
  639. return -RT_ENOMEM;
  640. }
  641. }
  642. else
  643. {
  644. msg_loop->msg_obj = rt_calloc(1, msg_size);
  645. if (msg_loop->msg_obj == RT_NULL)
  646. {
  647. return -RT_ENOMEM;
  648. }
  649. rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
  650. }
  651. #else
  652. msg_loop->msg_obj = rt_calloc(1, msg_size);
  653. if (msg_loop->msg_obj == RT_NULL)
  654. {
  655. return -RT_ENOMEM;
  656. }
  657. rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
  658. #endif
  659. msg_loop->msg_size = msg_size;
  660. }
  661. return rt_timer_start(msg_loop->timer);
  662. }
  663. /**
  664. * stop a loop message
  665. * @param msg_loop
  666. * @return error code
  667. */
  668. rt_err_t task_msg_loop_stop(task_msg_loop_t msg_loop)
  669. {
  670. if (msg_loop == RT_NULL || msg_loop->timer == RT_NULL)
  671. return -RT_EEMPTY;
  672. return rt_timer_stop(msg_loop->timer);
  673. }
  674. /**
  675. * delete a loop message
  676. * @param msg_loop
  677. * @return error code
  678. */
  679. rt_err_t task_msg_loop_delete(task_msg_loop_t msg_loop)
  680. {
  681. rt_err_t rst;
  682. if (msg_loop == RT_NULL)
  683. return -RT_EEMPTY;
  684. if (msg_loop->timer)
  685. {
  686. rt_timer_stop(msg_loop->timer);
  687. rst = rt_timer_delete(msg_loop->timer);
  688. if (rst == RT_EOK)
  689. msg_loop->timer = RT_NULL;
  690. }
  691. if (rst == RT_EOK)
  692. {
  693. if (msg_loop->msg_obj)
  694. {
  695. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  696. if (dup_release_hooks[msg_loop->msg_name].release)
  697. {
  698. RT_ASSERT(dup_release_hooks[msg_loop->msg_name].msg_name == msg_loop->msg_name);
  699. dup_release_hooks[msg_loop->msg_name].release(msg_loop->msg_obj);
  700. }
  701. #endif
  702. rt_free(msg_loop->msg_obj);
  703. }
  704. msg_loop->msg_obj = RT_NULL;
  705. msg_loop->msg_size = 0;
  706. rt_free(msg_loop);
  707. }
  708. return rst;
  709. }
  710. /**
  711. * Initialize the callback slist array.
  712. */
  713. static void task_msg_callback_init(void)
  714. {
  715. rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
  716. for (int i = 0; i < TASK_MSG_COUNT; i++)
  717. {
  718. callback_slist_array[i].next = RT_NULL;
  719. }
  720. rt_mutex_release(&cb_lock);
  721. }
  722. /**
  723. * Task message bus thread entry.
  724. * @param params
  725. */
  726. static void task_msg_bus_thread_entry(void *params)
  727. {
  728. while (1)
  729. {
  730. if (rt_sem_take(&msg_sem, RT_WAITING_FOREVER) == RT_EOK)
  731. {
  732. task_msg_args_node_t msg_args_node;
  733. task_msg_callback_node_t msg_callback_node;
  734. task_msg_subscriber_node_t subscriber;
  735. task_msg_wait_node_t msg_wait_node;
  736. if (rt_slist_len(&msg_slist) > 0)
  737. {
  738. //get msg
  739. msg_args_node = rt_slist_first_entry(&msg_slist, struct task_msg_args_node, slist);
  740. msg_ref_append(msg_args_node->args);
  741. rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
  742. rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
  743. {
  744. if (subscriber->msg_name == msg_args_node->args->msg_name)
  745. {
  746. msg_wait_node = rt_calloc(1, sizeof(struct task_msg_wait_node));
  747. if (msg_wait_node == RT_NULL)
  748. {
  749. LOG_W("no memory to create msg_wait_node!");
  750. break;
  751. }
  752. msg_ref_append(msg_args_node->args);
  753. msg_wait_node->subscriber = subscriber;
  754. msg_wait_node->args = msg_args_node->args;
  755. rt_slist_init(&(msg_wait_node->slist));
  756. rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
  757. rt_slist_append(&msg_wait_slist, &(msg_wait_node->slist));
  758. rt_mutex_release(&wt_lock);
  759. rt_sem_release(subscriber->sem);
  760. }
  761. }
  762. rt_mutex_release(&sub_lock);
  763. //msg callback
  764. rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
  765. rt_slist_for_each_entry(msg_callback_node, &callback_slist_array[msg_args_node->args->msg_name], slist)
  766. {
  767. if (msg_callback_node->callback)
  768. {
  769. msg_callback_node->callback(msg_args_node->args);
  770. }
  771. }
  772. rt_mutex_release(&cb_lock);
  773. //remove msg
  774. rt_mutex_take(&msg_lock, RT_WAITING_FOREVER);
  775. rt_slist_remove(&msg_slist, &(msg_args_node->slist));
  776. rt_mutex_release(&msg_lock);
  777. //release msg
  778. task_msg_release(msg_args_node->args);
  779. rt_free(msg_args_node);
  780. }
  781. }
  782. }
  783. }
  784. /**
  785. * Initialize message bus components.
  786. *
  787. * @param stack_size: message bus thread stack size
  788. * @param priority: thread priority
  789. * @param tick: thread tick
  790. * @return error code
  791. */
  792. int task_msg_bus_init(void)
  793. {
  794. if (task_msg_bus_init_tag)
  795. return -RT_EBUSY;
  796. rt_sem_init(&msg_sem, "msg_sem", 0, RT_IPC_FLAG_FIFO);
  797. rt_mutex_init(&msg_lock, "msg_lock", RT_IPC_FLAG_FIFO);
  798. rt_mutex_init(&msg_ref_lock, "ref_lock", RT_IPC_FLAG_FIFO);
  799. rt_mutex_init(&cb_lock, "cb_lock", RT_IPC_FLAG_FIFO);
  800. rt_mutex_init(&wt_lock, "wt_lock", RT_IPC_FLAG_FIFO);
  801. rt_mutex_init(&sub_lock, "sub_lock", RT_IPC_FLAG_FIFO);
  802. task_msg_callback_init();
  803. task_msg_bus_init_tag = RT_TRUE;
  804. rt_thread_t t = rt_thread_create("msg_bus", task_msg_bus_thread_entry,
  805. RT_NULL, TASK_MSG_THREAD_STACK_SIZE, TASK_MSG_THREAD_PRIORITY, 20);
  806. if (t == RT_NULL)
  807. {
  808. LOG_E("task msg bus initialize failed! msg_bus_thread create failed!");
  809. return -RT_ENOMEM;
  810. }
  811. rt_err_t rst = rt_thread_startup(t);
  812. if (rst == RT_EOK)
  813. {
  814. LOG_I("task msg bus initialize success!");
  815. }
  816. else
  817. {
  818. LOG_E("task msg bus initialize failed! msg_bus thread startup failed(%d)", rst);
  819. }
  820. return rst;
  821. }
  822. INIT_COMPONENT_EXPORT(task_msg_bus_init);