|
|
@@ -49,9 +49,11 @@ RT-Thread online packages
|
|
|
| rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args)); | 取消订阅消息 |
|
|
|
| rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *msg_text); | 发布text/json消息 |
|
|
|
| rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_size_t msg_size); | 发布任意数据类型消息 |
|
|
|
-| rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_int32_t timeout, struct task_msg_args **out_args); | 阻塞等待指定名称消息 |
|
|
|
-| rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_int32_t timeout, struct task_msg_args **out_args); | 阻塞等待名称列表中的任一消息 |
|
|
|
+| int task_msg_subscriber_create(enum task_msg_name msg_name); | 创建一个消息订阅者,返回订阅者ID |
|
|
|
+| int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len); | 创建一个可以订阅多个主题的消息订阅者,返回订阅者ID |
|
|
|
+| rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct task_msg_args **out_args); | 阻塞等待指定订阅者的消息 |
|
|
|
| void task_msg_release(task_msg_args_t args); | 释放已经消费的消息 |
|
|
|
+| void task_msg_subscriber_delete(int subscriber_id); | 删除一个消息订阅者 |
|
|
|
|
|
|
### 3.2 使用方法
|
|
|
* 在包管理器中取消Enable TaskMsgBus Sample选项
|
|
|
@@ -192,16 +194,24 @@ static void msg_wait_thread_entry(void *params)
|
|
|
{
|
|
|
rt_err_t rst;
|
|
|
task_msg_args_t args;
|
|
|
+ //创建消息订阅者
|
|
|
+ int subscriber_id = task_msg_subscriber_create(TASK_MSG_NET_REDAY);
|
|
|
+ if(subscriber_id < 0) return;
|
|
|
+
|
|
|
while(1)
|
|
|
{
|
|
|
- //测试 task_msg_wait_until
|
|
|
- rst = task_msg_wait_until(TASK_MSG_NET_REDAY, RT_WAITING_FOREVER, &args);
|
|
|
+ rst = task_msg_wait_until(subscriber_id, 50, &args);
|
|
|
if(rst==RT_EOK)
|
|
|
{
|
|
|
LOG_D("[task_msg_wait_until]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_obj:%s", args->msg_name, args->msg_obj);
|
|
|
+ rt_thread_mdelay(200);//模拟耗时操作,在此期间发布的消息不会丢失
|
|
|
//释放消息
|
|
|
task_msg_release(args);
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //可以做其它操作,在此期间发布的消息不会丢失
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -216,11 +226,14 @@ static void msg_wait_any_thread_entry(void *params)
|
|
|
{
|
|
|
rt_err_t rst;
|
|
|
task_msg_args_t args = RT_NULL;
|
|
|
+ const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3};
|
|
|
+ //创建 多消息订阅者
|
|
|
+ int subscriber_id = task_msg_subscriber_create2(name_list, sizeof(name_list)/sizeof(enum task_msg_name));
|
|
|
+ if(subscriber_id < 0) return;
|
|
|
+
|
|
|
while(1)
|
|
|
{
|
|
|
- //测试 task_msg_wait_any
|
|
|
- const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3};
|
|
|
- rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
|
|
|
+ rst = task_msg_wait_until(subscriber_id, 50, &args);
|
|
|
if(rst==RT_EOK)
|
|
|
{
|
|
|
if(args->msg_name==TASK_MSG_OS_REDAY)
|
|
|
@@ -229,6 +242,7 @@ static void msg_wait_any_thread_entry(void *params)
|
|
|
}
|
|
|
else if(args->msg_name==TASK_MSG_NET_REDAY)
|
|
|
{
|
|
|
+ #ifdef TASK_MSG_USING_JSON
|
|
|
cJSON *root = cJSON_Parse(args->msg_obj);
|
|
|
if(root)
|
|
|
{
|
|
|
@@ -239,21 +253,31 @@ static void msg_wait_any_thread_entry(void *params)
|
|
|
LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => net_reday:%s, ip:%s, id:%d", (net_reday==0 ? "false" : "true"), ip, id);
|
|
|
cJSON_Delete(root);
|
|
|
}
|
|
|
+ #else
|
|
|
+ LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_obj:%s", args->msg_name, args->msg_obj);
|
|
|
+ #endif
|
|
|
}
|
|
|
else if(args->msg_name==TASK_MSG_2)
|
|
|
{
|
|
|
struct msg_2_def *msg_2 = (struct msg_2_def *)args->msg_obj;
|
|
|
LOG_D("[task_msg_wait_any]:TASK_MSG_2 => msg_2.id:%d, msg_2.name:%s", msg_2->id, msg_2->name);
|
|
|
}
|
|
|
+ #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
|
|
|
else if(args->msg_name==TASK_MSG_3)
|
|
|
{
|
|
|
struct msg_3_def *msg_3 = (struct msg_3_def *)args->msg_obj;
|
|
|
LOG_D("[task_msg_wait_any]:TASK_MSG_3 => msg_3.id:%d, msg_3.name:%s", msg_3->id, msg_3->name);
|
|
|
LOG_HEX("[task_msg_wait_any]:TASK_MSG_3 => msg_3.buffer", 16, msg_3->buffer, msg_3->buffer_size);
|
|
|
}
|
|
|
+ #endif
|
|
|
+ rt_thread_mdelay(200);//模拟耗时操作,在此期间发布的消息不会丢失
|
|
|
//释放消息
|
|
|
task_msg_release(args);
|
|
|
}
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //可以做其它操作,在此期间发布的消息不会丢失
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -264,11 +288,11 @@ rt_thread_startup(t_wait_any);
|
|
|
|
|
|
## 4、注意事项
|
|
|
|
|
|
-* 不要在订阅消息的回调函数中执行消耗资源的操作,否则,请在单独的线程中,使用task_msg_wait_until或task_msg_wait_any来处理需要关注的消息。
|
|
|
+* 不要在订阅消息的回调函数中执行耗时的操作,否则,请在单独的线程中,使用task_msg_wait_until来处理需要关注的消息。
|
|
|
|
|
|
* 如果使用了结构体数据类型的消息,同时在结构体中定义了指针,且动态分配了内存,一定要设置释放内存的钩子函数,否则会造成内存泄露。
|
|
|
|
|
|
-* 在使用task_msg_wait_until或task_msg_wait_any函数接收消息时,仅当函数返回了RT_EOK时,记得使用task_msg_release函数释放该消息(在其它任何情况下都不要使用task_msg_release函数)。当所有关注该消息的订阅者全部释放了该消息时,该消息才真正从物理内存中释放。
|
|
|
+* 在使用task_msg_wait_until函数接收消息时,仅当函数返回了RT_EOK时,记得使用task_msg_release函数释放该消息(在其它任何情况下都不要使用task_msg_release函数)。当所有关注该消息的订阅者全部释放了该消息时,该消息才真正从物理内存中释放。
|
|
|
|
|
|
## 5、联系方式 & 感谢
|
|
|
|