Jelajahi Sumber

1.支持消息内容为任意对象类型
2.当消息内容为结构体对象时,结构体内部指针类型支持动态内存分配和自动内存回收(需要用户定义内存回收钩子函数)
3.更新示例

slyant 5 tahun lalu
induk
melakukan
c457a50fcf

+ 99 - 63
examples/task_msg_bus_sample.c

@@ -4,9 +4,28 @@
 #include "cJSON_util.h"
 #endif
 
-#define DBG_TAG "task.msg.bus.sample"
-#define DBG_LVL DBG_LOG
-#include <rtdbg.h>
+#define LOG_TAG              "task.msg.bus.sample"
+#define LOG_LVL              LOG_LVL_DBG
+#include <ulog.h>
+
+#ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+    void msg_3_delete_hook(void *args)
+    {
+        struct msg_3_def *msg_3 = (struct msg_3_def *)args;
+        if(msg_3->buffer)
+            rt_free(msg_3->buffer);
+    }
+#endif
+
+static void net_reday_callback(task_msg_args_t args)
+{
+    LOG_D("[net_reday_callback]:TASK_MSG_NET_REDAY => args->msg_name:%d, args->msg_args:%s", args->msg_name, args->msg_args);
+}
+
+static void os_reday_callback(task_msg_args_t args)
+{
+    LOG_D("[os_reday_callback]:TASK_MSG_OS_REDAY => args is null:%s", args->msg_args==RT_NULL ? "true" : "false");
+}
 
 static void msg_wait_thread_entry(void *params)
 {
@@ -14,62 +33,65 @@ static void msg_wait_thread_entry(void *params)
     task_msg_args_t args;
     while(1)
     {
-        /*
         //测试 task_msg_wait_until
         rst = task_msg_wait_until(TASK_MSG_NET_REDAY, RT_WAITING_FOREVER, &args);
         if(rst==RT_EOK)
         {
-            LOG_D("task_msg_wait_until => args.msg_name:%d, args.msg_args_json:%s", args.msg_name, args.msg_args_json);
+            LOG_D("[task_msg_wait_until]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_args:%s", args->msg_name, args->msg_args);
+            //释放内存
+            task_msg_delete(args);
         }
-        */
-
+    }
+}
+static void msg_wait_any_thread_entry(void *params)
+{
+    rt_err_t rst;
+    task_msg_args_t args = RT_NULL;
+    while(1)
+    {
         //测试 task_msg_wait_any
-        const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_3, TASK_MSG_5};
+        const enum task_msg_name name_list[4] = {TASK_MSG_OS_REDAY, TASK_MSG_NET_REDAY, TASK_MSG_2, TASK_MSG_3};
         rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
         if(rst==RT_EOK)
         {
-            LOG_D("task_msg_wait_any => args.msg_name:%d, args.msg_args_json:%s", args->msg_name, args->msg_args_json);
-        #ifdef TASK_MSG_USING_JSON
-            cJSON *root = cJSON_Parse(args->msg_args_json);
-            if(root)
+            if(args->msg_name==TASK_MSG_OS_REDAY)
             {
-                if(args->msg_name==TASK_MSG_OS_REDAY)
-                {
-                    int os_reday, id;
-                    if(cJSON_item_get_number(root, "os_reday", &os_reday)==0)
-                        LOG_D("TASK_MSG_OS_REDAY=>os_reday:%d", os_reday);
-                    if(cJSON_item_get_number(root, "id", &id)==0)
-                        LOG_D("TASK_MSG_OS_REDAY=>id:%d", id);
-                }
-                else if(args->msg_name==TASK_MSG_NET_REDAY)
+                LOG_D("[task_msg_wait_any]:TASK_MSG_OS_REDAY => args is null:%s", args->msg_args==RT_NULL ? "true" : "false");
+            }
+            else if(args->msg_name==TASK_MSG_NET_REDAY)
+            {
+            #ifdef TASK_MSG_USING_JSON
+                cJSON *root = cJSON_Parse(args->msg_args);
+                if(root)
                 {
-                    int os_reday, id;
-                    if(cJSON_item_get_number(root, "net_reday", &os_reday)==0)
-                        LOG_D("TASK_MSG_NET_REDAY=>net_reday:%d", os_reday);
+                    int net_reday, id;
+                    cJSON_item_get_number(root, "net_reday", &net_reday);
+                    cJSON_item_get_number(root, "id", &id);
                     const char *ip = cJSON_item_get_string(root, "ip");
-                    if(ip)
-                        LOG_D("TASK_MSG_NET_REDAY=>ip:%s", ip);
-                    if(cJSON_item_get_number(root, "id", &id)==0)
-                        LOG_D("TASK_MSG_NET_REDAY=>id:%d", id);
+                    LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => net_reday:%s, ip:%s, id:%d", (net_reday==0 ? "false" : "true"), ip, id);
+                    cJSON_Delete(root);
                 }
-                else if(args->msg_name==TASK_MSG_3)
-                {
-                    int id;
-                    const char *msg_3 = cJSON_item_get_string(root, "msg_3");
-                    if(msg_3)
-                        LOG_D("TASK_MSG_3=>msg_3:%s", msg_3);
-                    const char *name = cJSON_item_get_string(root, "name");
-                    if(name)
-                        LOG_D("TASK_MSG_3=>name:%s", name);
-                    if(cJSON_item_get_number(root, "id", &id)==0)
-                        LOG_D("TASK_MSG_3=>id:%d", id);
-                }
-                cJSON_Delete(root);
+            #else
+                LOG_D("[task_msg_wait_any]:TASK_MSG_NET_REDAY => args.msg_name:%d, args.msg_args:%s", args->msg_name, args->msg_args);
+            #endif
+            }
+            else if(args->msg_name==TASK_MSG_2)
+            {
+                struct msg_2_def *msg_2 = (struct msg_2_def *)args->msg_args;
+                LOG_D("[task_msg_wait_any]:TASK_MSG_2 => msg_2.id:%d, msg_2.name:%s", msg_2->id, msg_2->name);
+            }
+        #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+            else if(args->msg_name==TASK_MSG_3)
+            {
+                struct msg_3_def *msg_3 = (struct msg_3_def *)args->msg_args;
+                LOG_D("[task_msg_wait_any]:TASK_MSG_3 => msg_3.id:%d, msg_3.name:%s", msg_3->id, msg_3->name);
+                LOG_HEX("[task_msg_wait_any]:TASK_MSG_3 => msg_3.buffer", 16, msg_3->buffer, msg_3->buffer_size);
             }
         #endif
+
+            //释放内存
+            task_msg_delete(args);
         }
-        //释放内存
-        task_msg_delete(args);
     }    
 }
 
@@ -79,49 +101,63 @@ static void msg_publish_thread_entry(void *params)
     char arg_json[50];
     while (1)
     {
-        if(i % 3 == 0)
+        if(i % 4 == 0)
         {
-            rt_snprintf(arg_json, 50, "{\"os_reday\":%d,\"id\":%ld}", 1, i);
-            task_msg_publish(TASK_MSG_OS_REDAY, arg_json);
+            //不带消息内容
+            task_msg_publish(TASK_MSG_OS_REDAY, RT_NULL);
         }
-        else if(i % 3 == 1)
+        else if(i % 4 == 1)
         {
+            //json/text消息
             rt_snprintf(arg_json, 50, "{\"net_reday\":%d,\"ip\":\"%s\",\"id\":%ld}", 1, "10.0.0.20", i);
             task_msg_publish(TASK_MSG_NET_REDAY, arg_json);
         }
+        else if(i % 4 == 2)
+        {
+            //结构体消息(内部字段无动态内存分配)
+            struct msg_2_def msg_2;
+            msg_2.id = i;
+            rt_snprintf(msg_2.name, 8, "%s\0", "hello");
+            task_msg_publish_obj(TASK_MSG_2, &msg_2, sizeof(struct msg_2_def));
+        }
         else
         {
-            rt_snprintf(arg_json, 50, "{\"msg_3\":\"%s\",\"name\":\"%s\",\"id\":%ld}", "msg3", "slyant", i);
-            task_msg_publish(TASK_MSG_3, arg_json);
+        #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+            const char buffer_test[32] = {
+                0x0F, 0x51, 0xEE, 0x89, 0x9D, 0x40, 0x80, 0x22, 0x63, 0x44, 0x43, 0x39, 0x55, 0x2D, 0x12, 0xA1,
+                0x1C, 0x91, 0xE5, 0x2C, 0xC4, 0x6A, 0x62, 0x5B, 0xB6, 0x41, 0xF0, 0xF7, 0x75, 0x48, 0x05, 0xE9
+            };
+            //结构体消息(内部字段有动态内存分配)
+            struct msg_3_def msg_3;
+            msg_3.id = i;
+            rt_snprintf(msg_3.name, 8, "%s\0", "slyant");
+            msg_3.buffer = rt_calloc(1, 32);
+            rt_memcpy(msg_3.buffer, buffer_test, 32);
+            msg_3.buffer_size = 32;
+            task_msg_publish_obj(TASK_MSG_3, &msg_3, sizeof(struct msg_3_def));
+        #endif
         }
 
-        rt_thread_mdelay(1000);
+        rt_thread_mdelay(2000);
         i++;
     }
 }
 
-static void net_reday_callback(task_msg_args_t args)
-{
-    LOG_D("net_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
-}
-
-static void os_reday_callback(task_msg_args_t args)
-{
-    LOG_D("os_reday_callback => args->msg_name:%d, args->msg_args_json:%s", args->msg_name, args->msg_args_json);
-}
-
 static int task_msg_bus_sample(void)
 {
     //初始化消息总线(线程栈大小, 优先级, 时间片)
-    task_msg_bus_init(512, 25, 10);
+    task_msg_bus_init(512, 11, 10);
     //订阅消息
     task_msg_subscribe(TASK_MSG_NET_REDAY, net_reday_callback);
     task_msg_subscribe(TASK_MSG_OS_REDAY, os_reday_callback);
     //创建一个等待消息的线程
-    rt_thread_t t_wait = rt_thread_create("msg_wait", msg_wait_thread_entry, RT_NULL, 1024*1, 20, 10);
+    rt_thread_t t_wait = rt_thread_create("msg_wt", msg_wait_thread_entry, RT_NULL, 512, 16, 10);
     rt_thread_startup(t_wait);
+    //创建一个同时等待多个消息的线程
+    rt_thread_t t_wait_any = rt_thread_create("msg_wa", msg_wait_any_thread_entry, RT_NULL, 1024, 17, 10);
+    rt_thread_startup(t_wait_any);
     //创建一个发布消息的线程
-    rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 1024*1, 15, 10);
+    rt_thread_t t_publish = rt_thread_create("msg_pub", msg_publish_thread_entry, RT_NULL, 512, 15, 10);
     rt_thread_startup(t_publish);
 
     return RT_EOK;

+ 31 - 4
examples/task_msg_name_user_def.h

@@ -1,14 +1,41 @@
 #ifndef TASK_MSG_NAME_USER_DEF_H_
 #define TASK_MSG_NAME_USER_DEF_H_
 
+struct msg_2_def
+{
+    int id;
+    char name[8];
+};
+
 enum task_msg_name{
-    TASK_MSG_OS_REDAY = 0,
-    TASK_MSG_NET_REDAY,
+    TASK_MSG_OS_REDAY = 0,  //RT_NULL
+    TASK_MSG_NET_REDAY,     //json: net_reday:int,ip:string,id:int
     TASK_MSG_1,
-    TASK_MSG_2,
-    TASK_MSG_3,
+    TASK_MSG_2,             //struct msg_2_def
+    TASK_MSG_3,             //struct msg_3_def
     TASK_MSG_4,
     TASK_MSG_5,
     TASK_MSG_COUNT
 };
+
+#ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+    struct msg_3_def
+    {
+        int id;
+        char name[8];
+        rt_uint8_t *buffer;
+        rt_size_t buffer_size;
+    };
+    extern void msg_3_delete_hook(void *args);
+    #define task_msg_delete_hooks {\
+            {TASK_MSG_OS_REDAY, RT_NULL},   \
+            {TASK_MSG_NET_REDAY, RT_NULL},  \
+            {TASK_MSG_1, RT_NULL},          \
+            {TASK_MSG_2, RT_NULL},          \
+            {TASK_MSG_3, msg_3_delete_hook},          \
+            {TASK_MSG_4, RT_NULL},          \
+            {TASK_MSG_5, RT_NULL},          \
+        }
+#endif
+
 #endif

+ 9 - 2
inc/task_msg_bus.h

@@ -18,7 +18,7 @@
 struct task_msg_args
 {
     enum task_msg_name msg_name;
-    char *msg_args_json;
+    void *msg_args;
 };
 typedef struct task_msg_args *task_msg_args_t;
 
@@ -63,10 +63,17 @@ struct task_msg_wait_any_node
 };
 typedef struct task_msg_wait_any_node *task_msg_wait_any_node_t;
 
+struct task_msg_delete_hook
+{
+    enum task_msg_name msg_name;
+    void (*hook)(void *args);
+};
+
 rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint32_t tick);
 rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
 rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
-rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json);
+rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_text);
+rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *args, rt_size_t args_size);
 rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, struct task_msg_args **out_args);
 rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_uint32_t timeout, struct task_msg_args **out_args);
 void task_msg_delete(task_msg_args_t args);

+ 11 - 0
inc/task_msg_name_def.h

@@ -13,12 +13,23 @@
 
 #ifdef TASK_MSG_NAME_USER_DEF
     #include "task_msg_name_user_def.h"
+    #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+        #ifndef task_msg_delete_hooks
+            #error "Please define 'task_msg_delete_hooks' in the header file:'task_msg_name_user_def.h"
+        #endif
+    #endif
 #else
     enum task_msg_name{
         TASK_MSG_OS_REDAY = 0,
         TASK_MSG_NET_REDAY,
         TASK_MSG_COUNT
     };
+    #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+        #define task_msg_delete_hooks {\
+                {TASK_MSG_OS_REDAY, RT_NULL},   \
+                {TASK_MSG_NET_REDAY, RT_NULL},  \
+            }
+    #endif
 #endif
 
 #endif /* TASK_MSG_NAME_DEF_H_ */

+ 51 - 7
src/task_msg_bus.c

@@ -22,6 +22,9 @@ static struct rt_mutex cb_lock;
 static struct rt_mutex wt_lock;
 static struct rt_mutex wta_lock;
 static rt_slist_t callback_slist_array[TASK_MSG_COUNT];
+#ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+    static struct task_msg_delete_hook delete_hooks[TASK_MSG_COUNT] = task_msg_delete_hooks;
+#endif
 static rt_slist_t msg_slist = RT_SLIST_OBJECT_INIT(msg_slist);
 static rt_slist_t msg_ref_slist = RT_SLIST_OBJECT_INIT(msg_ref_slist);
 static rt_slist_t wait_slist = RT_SLIST_OBJECT_INIT(wait_slist);
@@ -73,8 +76,16 @@ void task_msg_delete(task_msg_args_t args)
             if(item->ref_count <= 0)
             {
                 rt_slist_remove(&msg_ref_slist, &(item->slist));
-                if(item->args->msg_args_json)
-                    rt_free(item->args->msg_args_json);
+                if(item->args->msg_args)
+                {
+                #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+                    if(delete_hooks[item->args->msg_name].hook && delete_hooks[item->args->msg_name].msg_name == item->args->msg_name)
+                    {
+                        delete_hooks[item->args->msg_name].hook(item->args->msg_args);
+                    }
+                #endif
+                    rt_free(item->args->msg_args);
+                }
                 rt_free(item->args);
                 rt_free(item);
             }
@@ -110,6 +121,10 @@ rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t m
     {
         (*out_args)= node->args;
     }
+    else
+    {
+        task_msg_delete(node->args);
+    }
     rt_mutex_take(&wta_lock, RT_WAITING_FOREVER);
     rt_slist_remove(&wait_any_slist, &(node->slist));
     rt_mutex_release(&wta_lock);
@@ -144,6 +159,10 @@ rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, s
     {
         (*out_args) = node->args;
     }
+    else
+    {
+        task_msg_delete(node->args);
+    }
     rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
     rt_slist_remove(&wait_slist, &(node->slist));
     rt_mutex_release(&wt_lock);
@@ -155,7 +174,7 @@ rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, s
 
 rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args))
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
+    if(task_msg_bus_init_tag==RT_FALSE || callback==RT_NULL) return -RT_EINVAL;
 
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
     rt_bool_t find_tag = RT_FALSE;
@@ -192,7 +211,7 @@ rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_ms
 
 rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args))
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
+    if(task_msg_bus_init_tag==RT_FALSE || callback==RT_NULL) return -RT_EINVAL;
 
     task_msg_callback_node_t node;
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
@@ -210,7 +229,7 @@ rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_
     return RT_EOK;
 }
 
-rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json)
+rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *args, rt_size_t args_size)
 {
     if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
 
@@ -230,8 +249,22 @@ rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json)
     }
 
     msg_args->msg_name = msg_name;
-    msg_args->msg_args_json = RT_NULL;
-    if(args_json) msg_args->msg_args_json = rt_strdup(args_json);
+    msg_args->msg_args = RT_NULL;
+    if(args && args_size>0)
+    {
+        msg_args->msg_args = rt_calloc(1, args_size);
+        if(msg_args->msg_args)
+        {
+            rt_memcpy(msg_args->msg_args, args, args_size);
+        }
+        else
+        {
+            rt_free(node);
+            rt_free(msg_args);
+            LOG_E("task msg publish failed! msg_args create failed!");
+            return -RT_ENOMEM;
+        }
+    }
     node->args = msg_args;
     rt_slist_init(&(node->slist));
     rt_mutex_take(&msg_lock, RT_WAITING_FOREVER);
@@ -243,6 +276,17 @@ rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json)
     return RT_EOK;
 }
 
+rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_text)
+{
+    void *args = (void *)args_text;
+    rt_size_t args_size = 0;
+    if(args)
+    {
+        args_size = rt_strlen(args_text) + 1;
+    }
+    return task_msg_publish_obj(msg_name, args, args_size);
+}
+
 static void task_msg_callback_init(void)
 {
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);