/* task_msg_bus_sample.c (4.7 KB) */

  1. #include <board.h>
  2. #include "task_msg_bus.h"
  3. #ifdef TASK_MSG_USING_JSON
  4. #include "cJSON_util.h"
  5. #endif
  6. #define DBG_TAG "task.msg.bus.sample"
  7. #define DBG_LVL DBG_LOG
  8. #include <rtdbg.h>
  9. static void msg_wait_thread_entry(void *params)
  10. {
  11. rt_err_t rst;
  12. struct task_msg_args args;
  13. while(1)
  14. {
  15. /*
  16. //测试 task_msg_wait_until
  17. rst = task_msg_wait_until(TASK_MSG_NET_REDAY, RT_WAITING_FOREVER, &args);
  18. if(rst==RT_EOK)
  19. {
  20. LOG_D("task_msg_wait_until => args.msg_name:%d, args.msg_args_json:%s", args.msg_name, args.msg_args_json);
  21. }
  22. */
  23. //测试 task_msg_wait_any
  24. const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_3, TASK_MSG_5};
  25. rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
  26. if(rst==RT_EOK)
  27. {
  28. LOG_D("task_msg_wait_any => args.msg_name:%d, args.msg_args_json:%s", args.msg_name, args.msg_args_json);
  29. #ifdef TASK_MSG_USING_JSON
  30. cJSON *root = cJSON_Parse(args.msg_args_json);
  31. if(root)
  32. {
  33. if(args.msg_name==TASK_MSG_OS_REDAY)
  34. {
  35. int os_reday, id;
  36. if(cJSON_item_get_number(root, "os_reday", &os_reday)==0)
  37. LOG_D("TASK_MSG_OS_REDAY=>os_reday:%d", os_reday);
  38. if(cJSON_item_get_number(root, "id", &id)==0)
  39. LOG_D("TASK_MSG_OS_REDAY=>id:%d", id);
  40. }
  41. else if(args.msg_name==TASK_MSG_NET_REDAY)
  42. {
  43. int os_reday, id;
  44. if(cJSON_item_get_number(root, "net_reday", &os_reday)==0)
  45. LOG_D("TASK_MSG_NET_REDAY=>net_reday:%d", os_reday);
  46. const char *ip = cJSON_item_get_string(root, "ip");
  47. if(ip)
  48. LOG_D("TASK_MSG_NET_REDAY=>ip:%s", ip);
  49. if(cJSON_item_get_number(root, "id", &id)==0)
  50. LOG_D("TASK_MSG_NET_REDAY=>id:%d", id);
  51. }
  52. else if(args.msg_name==TASK_MSG_3)
  53. {
  54. int id;
  55. const char *msg_3 = cJSON_item_get_string(root, "msg_3");
  56. if(msg_3)
  57. LOG_D("TASK_MSG_3=>msg_3:%s", msg_3);
  58. const char *name = cJSON_item_get_string(root, "name");
  59. if(name)
  60. LOG_D("TASK_MSG_3=>name:%s", name);
  61. if(cJSON_item_get_number(root, "id", &id)==0)
  62. LOG_D("TASK_MSG_3=>id:%d", id);
  63. }
  64. cJSON_Delete(root);
  65. }
  66. #endif
  67. }
  68. //释放内存
  69. if(args.msg_args_json)
  70. {
  71. rt_free(args.msg_args_json);
  72. args.msg_args_json = RT_NULL;
  73. }
  74. }
  75. }
  76. static void msg_publish_thread_entry(void *params)
  77. {
  78. static int i = 0;
  79. char arg_json[50];
  80. while (1)
  81. {
  82. if(i % 3 == 0)
  83. {
  84. rt_snprintf(arg_json, 50, "{\"os_reday\":%d,\"id\":%ld}", 1, i);
  85. task_msg_publish(TASK_MSG_OS_REDAY, arg_json);
  86. }
  87. else if(i % 3 == 1)
  88. {
  89. rt_snprintf(arg_json, 50, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
  90. task_msg_publish(TASK_MSG_NET_REDAY, arg_json);
  91. }
  92. else
  93. {
  94. rt_snprintf(arg_json, 50, "{\"msg_3\":\"%s\",\"name\":\"%s\",\"id\":%ld}", "msg3", "slyant", i);
  95. task_msg_publish(TASK_MSG_3, arg_json);
  96. }
  97. rt_thread_mdelay(1000);
  98. i++;
  99. }
  100. }
  101. static void net_reday_callback(task_msg_args_t args)
  102. {
  103. LOG_D("net_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
  104. }
  105. static void os_reday_callback(task_msg_args_t args)
  106. {
  107. LOG_D("os_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
  108. }
  109. static int task_msg_bus_sample(void)
  110. {
  111. //初始化消息总线(线程栈大小, 优先级, 时间片)
  112. task_msg_bus_init(512, 25, 10);
  113. //订阅消息
  114. task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
  115. task_msg_subscribe(TASK_MSG_OS_REDAY, os_reday_callback);
  116. //创建一个等待消息的线程
  117. rt_thread_t t_wait = rt_thread_create("msg_wait", msg_wait_thread_entry, RT_NULL, 1024*1, 20, 10);
  118. rt_thread_startup(t_wait);
  119. //创建一个发布消息的线程
  120. rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 1024*1, 15, 10);
  121. rt_thread_startup(t_publish);
  122. return RT_EOK;
  123. }
  124. INIT_APP_EXPORT(task_msg_bus_sample);