task_msg_bus_sample.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #include <board.h>
  2. #include "task_msg_bus.h"
  3. #ifdef TASK_MSG_USING_JSON
  4. #include "cJSON_util.h"
  5. #endif
  6. #define LOG_TAG "task.msg.bus.sample"
  7. #define LOG_LVL LOG_LVL_DBG
  8. #include <ulog.h>
  9. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  10. void msg_3_delete_hook(void *args)
  11. {
  12. struct msg_3_def *msg_3 = (struct msg_3_def *)args;
  13. if(msg_3->buffer)
  14. rt_free(msg_3->buffer);
  15. }
  16. #endif
  17. static void net_reday_callback(task_msg_args_t args)
  18. {
  19. LOG_D("[net_reday_callback]:TASK_MSG_NET_REDAY => args->msg_name:%d, args->msg_args:%s", args->msg_name, args->msg_args);
  20. }
  21. static void os_reday_callback(task_msg_args_t args)
  22. {
  23. LOG_D("[os_reday_callback]:TASK_MSG_OS_REDAY => args is null:%s", args->msg_args==RT_NULL ? "true" : "false");
  24. }
  25. static void msg_wait_thread_entry(void *params)
  26. {
  27. rt_err_t rst;
  28. task_msg_args_t args;
  29. while(1)
  30. {
  31. //测试 task_msg_wait_until
  32. rst = task_msg_wait_until(TASK_MSG_NET_REDAY, RT_WAITING_FOREVER, &args);
  33. if(rst==RT_EOK)
  34. {
  35. LOG_D("[task_msg_wait_until]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_args:%s", args->msg_name, args->msg_args);
  36. //释放内存
  37. task_msg_delete(args);
  38. }
  39. }
  40. }
  41. static void msg_wait_any_thread_entry(void *params)
  42. {
  43. rt_err_t rst;
  44. task_msg_args_t args = RT_NULL;
  45. while(1)
  46. {
  47. //测试 task_msg_wait_any
  48. const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3};
  49. rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
  50. if(rst==RT_EOK)
  51. {
  52. if(args->msg_name==TASK_MSG_OS_REDAY)
  53. {
  54. LOG_D("[task_msg_wait_any]:TASK_MSG_OS_REDAY => args is null:%s", args->msg_args==RT_NULL ? "true" : "false");
  55. }
  56. else if(args->msg_name==TASK_MSG_NET_REDAY)
  57. {
  58. #ifdef TASK_MSG_USING_JSON
  59. cJSON *root = cJSON_Parse(args->msg_args);
  60. if(root)
  61. {
  62. int net_reday, id;
  63. cJSON_item_get_number(root, "net_reday", &net_reday);
  64. cJSON_item_get_number(root, "id", &id);
  65. const char *ip = cJSON_item_get_string(root, "ip");
  66. LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => net_reday:%s, ip:%s, id:%d", (net_reday==0 ? "false" : "true"), ip, id);
  67. cJSON_Delete(root);
  68. }
  69. #else
  70. LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_args:%s", args->msg_name, args->msg_args);
  71. #endif
  72. }
  73. else if(args->msg_name==TASK_MSG_2)
  74. {
  75. struct msg_2_def *msg_2 = (struct msg_2_def *)args->msg_args;
  76. LOG_D("[task_msg_wait_any]:TASK_MSG_2 => msg_2.id:%d, msg_2.name:%s", msg_2->id, msg_2->name);
  77. }
  78. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  79. else if(args->msg_name==TASK_MSG_3)
  80. {
  81. struct msg_3_def *msg_3 = (struct msg_3_def *)args->msg_args;
  82. LOG_D("[task_msg_wait_any]:TASK_MSG_3 => msg_3.id:%d, msg_3.name:%s", msg_3->id, msg_3->name);
  83. LOG_HEX("[task_msg_wait_any]:TASK_MSG_3 => msg_3.buffer", 16, msg_3->buffer, msg_3->buffer_size);
  84. }
  85. #endif
  86. //释放内存
  87. task_msg_delete(args);
  88. }
  89. }
  90. }
  91. static void msg_publish_thread_entry(void *params)
  92. {
  93. static int i = 0;
  94. char arg_json[50];
  95. while (1)
  96. {
  97. if(i % 4 == 0)
  98. {
  99. //不带消息内容
  100. task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
  101. }
  102. else if(i % 4 == 1)
  103. {
  104. //json/text消息
  105. rt_snprintf(arg_json, 50, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
  106. task_msg_publish(TASK_MSG_NET_REDAY, arg_json);
  107. }
  108. else if(i % 4 == 2)
  109. {
  110. //结构体消息(内部字段无动态内存分配)
  111. struct msg_2_def msg_2;
  112. msg_2.id = i;
  113. rt_snprintf(msg_2.name, 8, "%s\0", "hello");
  114. task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
  115. }
  116. else
  117. {
  118. #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
  119. const char buffer_test[32] = {
  120. 0x0F, 0x51, 0xEE, 0x89, 0x9D, 0x40, 0x80, 0x22, 0x63, 0x44, 0x43, 0x39, 0x55, 0x2D, 0x12, 0xA1,
  121. 0x1C, 0x91, 0xE5, 0x2C, 0xC4, 0x6A, 0x62, 0x5B, 0xB6, 0x41, 0xF0, 0xF7, 0x75, 0x48, 0x05, 0xE9
  122. };
  123. //结构体消息(内部字段有动态内存分配)
  124. struct msg_3_def msg_3;
  125. msg_3.id = i;
  126. rt_snprintf(msg_3.name, 8, "%s\0", "slyant");
  127. msg_3.buffer = rt_calloc(1, 32);
  128. rt_memcpy(msg_3.buffer, buffer_test, 32);
  129. msg_3.buffer_size = 32;
  130. task_msg_publish_obj(TASK_MSG_3, &msg_3, sizeof(struct msg_3_def));
  131. #endif
  132. }
  133. rt_thread_mdelay(2000);
  134. i++;
  135. }
  136. }
  137. static int task_msg_bus_sample(void)
  138. {
  139. //初始化消息总线(线程栈大小, 优先级, 时间片)
  140. task_msg_bus_init(512, 11, 10);
  141. //订阅消息
  142. task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
  143. task_msg_subscribe(TASK_MSG_OS_REDAY, os_reday_callback);
  144. //创建一个等待消息的线程
  145. rt_thread_t t_wait = rt_thread_create("msg_wt", msg_wait_thread_entry, RT_NULL, 512, 16, 10);
  146. rt_thread_startup(t_wait);
  147. //创建一个同时等待多个消息的线程
  148. rt_thread_t t_wait_any = rt_thread_create("msg_wa", msg_wait_any_thread_entry, RT_NULL, 1024, 17, 10);
  149. rt_thread_startup(t_wait_any);
  150. //创建一个发布消息的线程
  151. rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 512, 15, 10);
  152. rt_thread_startup(t_publish);
  153. return RT_EOK;
  154. }
  155. INIT_APP_EXPORT(task_msg_bus_sample);