task_msg_bus_sample.c 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. #include <board.h>
  2. #include "task_msg_bus.h"
  3. #define LOG_TAG "sample"
  4. #define LOG_LVL LOG_LVL_DBG
  5. #include <ulog.h>
  6. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  7. void *msg_3_dup_hook(void *args)
  8. {
  9. struct msg_3_def *msg_3 = (struct msg_3_def *) args;
  10. struct msg_3_def *r_msg_3 = rt_calloc(1, sizeof(struct msg_3_def));
  11. if (r_msg_3 == RT_NULL)
  12. {
  13. return RT_NULL;
  14. }
  15. rt_memcpy((rt_uint8_t *) r_msg_3, (rt_uint8_t *) msg_3, sizeof(struct msg_3_def));
  16. if (msg_3->buffer && msg_3->buffer_size > 0)
  17. {
  18. r_msg_3->buffer = rt_calloc(1, msg_3->buffer_size);
  19. if (r_msg_3->buffer == RT_NULL)
  20. {
  21. rt_free(r_msg_3);
  22. return RT_NULL;
  23. }
  24. rt_memcpy(r_msg_3->buffer, msg_3->buffer, msg_3->buffer_size);
  25. r_msg_3->buffer_size = msg_3->buffer_size;
  26. }
  27. return r_msg_3;
  28. }
  29. void msg_3_release_hook(void *args)
  30. {
  31. struct msg_3_def *msg_3 = (struct msg_3_def *) args;
  32. if (msg_3->buffer)
  33. rt_free(msg_3->buffer);
  34. }
  35. #endif
  36. static void net_reday_callback(task_msg_args_t args)
  37. {
  38. //这里不要做耗时操作
  39. LOG_D("[net_reday_callback]:TASK_MSG_NET_REDAY => args->msg_name:%d, args->msg_obj:%s", args->msg_name,
  40. args->msg_obj);
  41. }
  42. static void os_reday_callback(task_msg_args_t args)
  43. {
  44. //这里不要做耗时操作
  45. LOG_D("[os_reday_callback]:TASK_MSG_OS_REDAY => msg_obj is null:%s", args->msg_obj==RT_NULL ? "true" : "false");
  46. }
  47. static void msg_wait_thread_entry(void *params)
  48. {
  49. rt_err_t rst;
  50. task_msg_args_t args;
  51. //创建消息订阅者
  52. int subscriber_id = task_msg_subscriber_create(TASK_MSG_NET_REDAY);
  53. if (subscriber_id < 0)
  54. return;
  55. while (1)
  56. {
  57. rst = task_msg_wait_until(subscriber_id, 50, &args);
  58. if (rst == RT_EOK)
  59. {
  60. LOG_D("[task_msg_wait_until]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_obj:%s", args->msg_name,
  61. args->msg_obj);
  62. rt_thread_mdelay(200); //模拟耗时操作,在此期间发布的消息不会丢失
  63. //释放消息
  64. task_msg_release(args);
  65. }
  66. else
  67. {
  68. //可以做其它操作,在此期间发布的消息不会丢失
  69. }
  70. }
  71. }
  72. static void msg_wait_any_thread_entry(void *params)
  73. {
  74. rt_err_t rst;
  75. task_msg_args_t args = RT_NULL;
  76. const enum task_msg_name name_list[4] = { TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3 };
  77. //创建 多消息订阅者
  78. int subscriber_id = task_msg_subscriber_create2(name_list, sizeof(name_list) / sizeof(enum task_msg_name));
  79. if (subscriber_id < 0)
  80. return;
  81. while (1)
  82. {
  83. rst = task_msg_wait_until(subscriber_id, 50, &args);
  84. if (rst == RT_EOK)
  85. {
  86. if (args->msg_name == TASK_MSG_OS_REDAY)
  87. {
  88. LOG_D("[task_msg_wait_any]:TASK_MSG_OS_REDAY => msg_obj is null:%s",
  89. args->msg_obj==RT_NULL ? "true" : "false");
  90. }
  91. else if (args->msg_name == TASK_MSG_NET_REDAY)
  92. {
  93. LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_obj:%s", args->msg_name,
  94. args->msg_obj);
  95. }
  96. else if (args->msg_name == TASK_MSG_2)
  97. {
  98. struct msg_2_def *msg_2 = (struct msg_2_def *) args->msg_obj;
  99. LOG_D("[task_msg_wait_any]:TASK_MSG_2 => msg_2.id:%d, msg_2.name:%s", msg_2->id, msg_2->name);
  100. }
  101. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  102. else if (args->msg_name == TASK_MSG_3)
  103. {
  104. struct msg_3_def *msg_3 = (struct msg_3_def *) args->msg_obj;
  105. LOG_D("[task_msg_wait_any]:TASK_MSG_3 => msg_3.id:%d, msg_3.name:%s", msg_3->id, msg_3->name);
  106. LOG_HEX("[task_msg_wait_any]:TASK_MSG_3 => msg_3.buffer", 16, msg_3->buffer, msg_3->buffer_size);
  107. }
  108. #endif
  109. rt_thread_mdelay(200); //模拟耗时操作,在此期间发布的消息不会丢失
  110. //释放消息
  111. task_msg_release(args);
  112. }
  113. else
  114. {
  115. //可以做其它操作,在此期间发布的消息不会丢失
  116. }
  117. }
  118. }
  119. static void msg_publish_thread_entry(void *params)
  120. {
  121. static int i = 0;
  122. char msg_text[50];
  123. rt_thread_mdelay(1000);
  124. while (1)
  125. {
  126. if (i % 4 == 0)
  127. {
  128. //不带消息内容
  129. task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
  130. rt_thread_mdelay(10);
  131. task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
  132. rt_thread_mdelay(10);
  133. task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
  134. rt_thread_mdelay(10);
  135. task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
  136. }
  137. else if (i % 4 == 1)
  138. {
  139. //json/text消息
  140. rt_snprintf(msg_text, 50, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
  141. task_msg_publish(TASK_MSG_NET_REDAY, msg_text);
  142. rt_thread_mdelay(10);
  143. task_msg_publish(TASK_MSG_NET_REDAY, msg_text);
  144. rt_thread_mdelay(10);
  145. task_msg_publish(TASK_MSG_NET_REDAY, msg_text);
  146. rt_thread_mdelay(10);
  147. task_msg_publish(TASK_MSG_NET_REDAY, msg_text);
  148. }
  149. else if (i % 4 == 2)
  150. {
  151. //结构体类型的消息(内部字段无动态内存分配)
  152. struct msg_2_def msg_2;
  153. msg_2.id = i;
  154. rt_snprintf(msg_2.name, 8, "%s\0", "hello");
  155. task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
  156. rt_thread_mdelay(10);
  157. task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
  158. rt_thread_mdelay(10);
  159. task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
  160. rt_thread_mdelay(10);
  161. task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
  162. }
  163. else
  164. {
  165. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  166. const char buffer_test[32] =
  167. { 0x0F, 0x51, 0xEE, 0x89, 0x9D, 0x40, 0x80, 0x22, 0x63, 0x44, 0x43, 0x39, 0x55,
  168. 0x2D, 0x12, 0xA1, 0x1C, 0x91, 0xE5, 0x2C, 0xC4, 0x6A, 0x62, 0x5B, 0xB6, 0x41, 0xF0, 0xF7, 0x75,
  169. 0x48, 0x05, 0xE9};
  170. //结构体类型的消息(内部字段有动态内存分配)
  171. struct msg_3_def msg_3;
  172. msg_3.id = i;
  173. rt_snprintf(msg_3.name, 8, "%s\0", "slyant");
  174. msg_3.buffer = rt_calloc(1, 32);
  175. rt_memcpy(msg_3.buffer, buffer_test, 32);
  176. msg_3.buffer_size = 32;
  177. task_msg_publish_obj(TASK_MSG_3, &msg_3, sizeof(struct msg_3_def));
  178. rt_free(msg_3.buffer);
  179. #endif
  180. break;
  181. }
  182. rt_thread_mdelay(50);
  183. i++;
  184. }
  185. }
  186. static int task_msg_bus_sample(void)
  187. {
  188. //初始化消息总线(线程栈大小, 优先级, 时间片)
  189. task_msg_bus_init();
  190. //订阅消息
  191. task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
  192. task_msg_subscribe(TASK_MSG_OS_REDAY, os_reday_callback);
  193. //创建一个等待消息的线程
  194. rt_thread_t t_wait = rt_thread_create("msg_wt", msg_wait_thread_entry, RT_NULL, 1024, 17, 20);
  195. rt_thread_startup(t_wait);
  196. //创建一个同时等待多个消息的线程
  197. rt_thread_t t_wait_any = rt_thread_create("msg_wa", msg_wait_any_thread_entry, RT_NULL, 1024, 16, 20);
  198. rt_thread_startup(t_wait_any);
  199. //创建一个发布消息的线程
  200. rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 1024, 15, 20);
  201. rt_thread_startup(t_publish);
  202. return RT_EOK;
  203. }
  204. INIT_APP_EXPORT(task_msg_bus_sample);