/* task_msg_bus_sample.c - sample usage of the task message bus */

  1. #include <board.h>
  2. #include "task_msg_bus.h"
  3. #ifdef TASK_MSG_USING_JSON
  4. #include "cJSON_util.h"
  5. #endif
  6. #define DBG_TAG "task.msg.bus.sample"
  7. #define DBG_LVL DBG_LOG
  8. #include <rtdbg.h>
  9. static void msg_wait_thread_entry(void *params)
  10. {
  11. rt_err_t rst;
  12. task_msg_args_t args;
  13. while(1)
  14. {
  15. /*
  16. //测试 task_msg_wait_until
  17. rst = task_msg_wait_until(TASK_MSG_NET_REDAY, RT_WAITING_FOREVER, &args);
  18. if(rst==RT_EOK)
  19. {
  20. LOG_D("task_msg_wait_until => args.msg_name:%d, args.msg_args_json:%s", args.msg_name, args.msg_args_json);
  21. }
  22. */
  23. //测试 task_msg_wait_any
  24. const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_3, TASK_MSG_5};
  25. rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
  26. if(rst==RT_EOK)
  27. {
  28. LOG_D("task_msg_wait_any => args.msg_name:%d, args.msg_args_json:%s", args->msg_name, args->msg_args_json);
  29. #ifdef TASK_MSG_USING_JSON
  30. cJSON *root = cJSON_Parse(args->msg_args_json);
  31. if(root)
  32. {
  33. if(args->msg_name==TASK_MSG_OS_REDAY)
  34. {
  35. int os_reday, id;
  36. if(cJSON_item_get_number(root, "os_reday", &os_reday)==0)
  37. LOG_D("TASK_MSG_OS_REDAY=>os_reday:%d", os_reday);
  38. if(cJSON_item_get_number(root, "id", &id)==0)
  39. LOG_D("TASK_MSG_OS_REDAY=>id:%d", id);
  40. }
  41. else if(args->msg_name==TASK_MSG_NET_REDAY)
  42. {
  43. int os_reday, id;
  44. if(cJSON_item_get_number(root, "net_reday", &os_reday)==0)
  45. LOG_D("TASK_MSG_NET_REDAY=>net_reday:%d", os_reday);
  46. const char *ip = cJSON_item_get_string(root, "ip");
  47. if(ip)
  48. LOG_D("TASK_MSG_NET_REDAY=>ip:%s", ip);
  49. if(cJSON_item_get_number(root, "id", &id)==0)
  50. LOG_D("TASK_MSG_NET_REDAY=>id:%d", id);
  51. }
  52. else if(args->msg_name==TASK_MSG_3)
  53. {
  54. int id;
  55. const char *msg_3 = cJSON_item_get_string(root, "msg_3");
  56. if(msg_3)
  57. LOG_D("TASK_MSG_3=>msg_3:%s", msg_3);
  58. const char *name = cJSON_item_get_string(root, "name");
  59. if(name)
  60. LOG_D("TASK_MSG_3=>name:%s", name);
  61. if(cJSON_item_get_number(root, "id", &id)==0)
  62. LOG_D("TASK_MSG_3=>id:%d", id);
  63. }
  64. cJSON_Delete(root);
  65. }
  66. #endif
  67. }
  68. //释放内存
  69. task_msg_delete(args);
  70. }
  71. }
  72. static void msg_publish_thread_entry(void *params)
  73. {
  74. static int i = 0;
  75. char arg_json[50];
  76. while (1)
  77. {
  78. if(i % 3 == 0)
  79. {
  80. rt_snprintf(arg_json, 50, "{\"os_reday\":%d,\"id\":%ld}", 1, i);
  81. task_msg_publish(TASK_MSG_OS_REDAY, arg_json);
  82. }
  83. else if(i % 3 == 1)
  84. {
  85. rt_snprintf(arg_json, 50, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
  86. task_msg_publish(TASK_MSG_NET_REDAY, arg_json);
  87. }
  88. else
  89. {
  90. rt_snprintf(arg_json, 50, "{\"msg_3\":\"%s\",\"name\":\"%s\",\"id\":%ld}", "msg3", "slyant", i);
  91. task_msg_publish(TASK_MSG_3, arg_json);
  92. }
  93. rt_thread_mdelay(1000);
  94. i++;
  95. }
  96. }
  97. static void net_reday_callback(task_msg_args_t args)
  98. {
  99. LOG_D("net_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
  100. }
  101. static void os_reday_callback(task_msg_args_t args)
  102. {
  103. LOG_D("os_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
  104. }
  105. static int task_msg_bus_sample(void)
  106. {
  107. //初始化消息总线(线程栈大小, 优先级, 时间片)
  108. task_msg_bus_init(512, 25, 10);
  109. //订阅消息
  110. task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
  111. task_msg_subscribe(TASK_MSG_OS_REDAY, os_reday_callback);
  112. //创建一个等待消息的线程
  113. rt_thread_t t_wait = rt_thread_create("msg_wait", msg_wait_thread_entry, RT_NULL, 1024*1, 20, 10);
  114. rt_thread_startup(t_wait);
  115. //创建一个发布消息的线程
  116. rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 1024*1, 15, 10);
  117. rt_thread_startup(t_publish);
  118. return RT_EOK;
  119. }
  120. INIT_APP_EXPORT(task_msg_bus_sample);