Explorar o código

消息有多个订阅者时,仅保存1份节约内存,当所有订阅者都消费后才释放内存

slyant %!s(int64=5) %!d(string=hai) anos
pai
achega
3fb5b882cd
Modificáronse 3 ficheiros con 110 adicións e 58 borrados
  1. 7 11
      examples/task_msg_bus_sample.c
  2. 13 5
      inc/task_msg_bus.h
  3. 90 42
      src/task_msg_bus.c

+ 7 - 11
examples/task_msg_bus_sample.c

@@ -11,7 +11,7 @@
 static void msg_wait_thread_entry(void *params)
 {
     rt_err_t rst;
-    struct task_msg_args args;
+    task_msg_args_t args;
     while(1)
     {
         /*
@@ -28,12 +28,12 @@ static void msg_wait_thread_entry(void *params)
         rst = task_msg_wait_any(name_list, sizeof(name_list)/sizeof(enum task_msg_name), RT_WAITING_FOREVER, &args);
         if(rst==RT_EOK)
         {
-            LOG_D("task_msg_wait_any => args.msg_name:%d, args.msg_args_json:%s", args.msg_name, args.msg_args_json);
+            LOG_D("task_msg_wait_any => args.msg_name:%d, args.msg_args_json:%s", args->msg_name, args->msg_args_json);
         #ifdef TASK_MSG_USING_JSON
-            cJSON *root = cJSON_Parse(args.msg_args_json);
+            cJSON *root = cJSON_Parse(args->msg_args_json);
             if(root)
             {
-                if(args.msg_name==TASK_MSG_OS_REDAY)
+                if(args->msg_name==TASK_MSG_OS_REDAY)
                 {
                     int os_reday, id;
                     if(cJSON_item_get_number(root, "os_reday", &os_reday)==0)
@@ -41,7 +41,7 @@ static void msg_wait_thread_entry(void *params)
                     if(cJSON_item_get_number(root, "id", &id)==0)
                         LOG_D("TASK_MSG_OS_REDAY=>id:%d", id);
                 }
-                else if(args.msg_name==TASK_MSG_NET_REDAY)
+                else if(args->msg_name==TASK_MSG_NET_REDAY)
                 {
                     int os_reday, id;
                     if(cJSON_item_get_number(root, "net_reday", &os_reday)==0)
@@ -52,7 +52,7 @@ static void msg_wait_thread_entry(void *params)
                     if(cJSON_item_get_number(root, "id", &id)==0)
                         LOG_D("TASK_MSG_NET_REDAY=>id:%d", id);
                 }
-                else if(args.msg_name==TASK_MSG_3)
+                else if(args->msg_name==TASK_MSG_3)
                 {
                     int id;
                     const char *msg_3 = cJSON_item_get_string(root, "msg_3");
@@ -69,11 +69,7 @@ static void msg_wait_thread_entry(void *params)
         #endif
         }
         //释放内存
-        if(args.msg_args_json)
-        {
-            rt_free(args.msg_args_json);
-            args.msg_args_json = RT_NULL;
-        }
+        task_msg_delete(args);
     }    
 }
 

+ 13 - 5
inc/task_msg_bus.h

@@ -22,6 +22,14 @@ struct task_msg_args
 };
 typedef struct task_msg_args *task_msg_args_t;
 
+struct task_msg_ref_node
+{
+    task_msg_args_t args;
+    int ref_count;
+    rt_slist_t slist;
+};
+typedef struct task_msg_ref_node *task_msg_ref_node_t;
+
 struct task_msg_args_node
 {
     task_msg_args_t args;
@@ -40,7 +48,7 @@ struct task_msg_wait_node
 {
     enum task_msg_name msg_name;
     struct rt_semaphore msg_sem;
-    char *msg_args_json;
+    task_msg_args_t args;
     rt_slist_t slist;
 };
 typedef struct task_msg_wait_node *task_msg_wait_node_t;
@@ -50,8 +58,7 @@ struct task_msg_wait_any_node
     enum task_msg_name *msg_name_list;
     rt_uint8_t msg_name_list_len;
     struct rt_semaphore msg_sem;
-    enum task_msg_name msg_name;
-    char *msg_args_json;
+    task_msg_args_t args;
     rt_slist_t slist;
 };
 typedef struct task_msg_wait_any_node *task_msg_wait_any_node_t;
@@ -60,7 +67,8 @@ rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint
 rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
 rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
 rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json);
-rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, task_msg_args_t out_args);
-rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_uint32_t timeout, task_msg_args_t out_args);
+rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, struct task_msg_args **out_args);
+rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_uint32_t timeout, struct task_msg_args **out_args);
+void task_msg_delete(task_msg_args_t args);
 
 #endif /* TASK_MSG_BUS_H_ */

+ 90 - 42
src/task_msg_bus.c

@@ -17,15 +17,74 @@
 static rt_bool_t task_msg_bus_init_tag = RT_FALSE;
 static struct rt_semaphore msg_sem;
 static struct rt_mutex msg_lock;
+static struct rt_mutex msg_ref_lock;
 static struct rt_mutex cb_lock;
 static struct rt_mutex wt_lock;
 static struct rt_mutex wta_lock;
 static rt_slist_t callback_slist_array[TASK_MSG_COUNT];
 static rt_slist_t msg_slist = RT_SLIST_OBJECT_INIT(msg_slist);
+static rt_slist_t msg_ref_slist = RT_SLIST_OBJECT_INIT(msg_ref_slist);
 static rt_slist_t wait_slist = RT_SLIST_OBJECT_INIT(wait_slist);
 static rt_slist_t wait_any_slist = RT_SLIST_OBJECT_INIT(wait_any_slist);
 
-rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_uint32_t timeout, task_msg_args_t out_args)
+static void msg_ref_append(task_msg_args_t args)
+{
+    rt_bool_t is_first_append = RT_TRUE;
+    task_msg_ref_node_t item;
+    rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
+    rt_slist_for_each_entry(item, &msg_ref_slist, slist)
+    {
+        if(item->args==args)
+        {
+            is_first_append = RT_FALSE;
+            item->ref_count++;
+            break;
+        }
+    }
+    if(is_first_append)
+    {
+        char name[RT_NAME_MAX];
+        task_msg_ref_node_t node = rt_calloc(1, sizeof(struct task_msg_ref_node));
+        if(node == RT_NULL)
+        {
+            rt_mutex_release(&msg_ref_lock);
+            LOG_E("there is no memory available!");
+            return;
+        }
+        rt_uint32_t count = rt_slist_len(&msg_ref_slist);
+        rt_snprintf(name, RT_NAME_MAX, "ref_%d", count);
+        node->args = args;
+        node->ref_count = 1;
+        rt_slist_init(&(node->slist));
+        rt_slist_append(&msg_ref_slist, &(node->slist));
+    }
+    rt_mutex_release(&msg_ref_lock);
+}
+
+void task_msg_delete(task_msg_args_t args)
+{
+    task_msg_ref_node_t item;
+    rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
+    rt_slist_for_each_entry(item, &msg_ref_slist, slist)
+    {
+        if(item->args==args)
+        {
+            item->ref_count--;
+            if(item->ref_count <= 0)
+            {
+                rt_slist_remove(&msg_ref_slist, &(item->slist));
+                if(item->args->msg_args_json)
+                    rt_free(item->args->msg_args_json);
+                rt_free(item->args);
+                rt_free(item);
+            }
+            break;
+        }
+    }
+    rt_mutex_release(&msg_ref_lock);
+}
+
+rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len, rt_uint32_t timeout, struct task_msg_args **out_args)
 {
     if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
 
@@ -41,7 +100,6 @@ rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t m
     rt_sem_init(&(node->msg_sem), name, 0, RT_IPC_FLAG_FIFO);
     node->msg_name_list = (enum task_msg_name *)msg_name_list;
     node->msg_name_list_len = msg_name_list_len;
-    node->msg_args_json = RT_NULL;
     rt_slist_init(&(node->slist));
     rt_mutex_take(&wta_lock, RT_WAITING_FOREVER);
     rt_slist_append(&wait_any_slist, &(node->slist));
@@ -50,22 +108,18 @@ rt_err_t task_msg_wait_any(const enum task_msg_name *msg_name_list, rt_uint8_t m
     rt_err_t rst = rt_sem_take(&(node->msg_sem), timeout);
     if(rst==RT_EOK && out_args)
     {
-        (*out_args).msg_name = node->msg_name;
-        (*out_args).msg_args_json = RT_NULL;
-        if(node->msg_args_json) (*out_args).msg_args_json = rt_strdup(node->msg_args_json);
+        (*out_args)= node->args;
     }
     rt_mutex_take(&wta_lock, RT_WAITING_FOREVER);
     rt_slist_remove(&wait_any_slist, &(node->slist));
     rt_mutex_release(&wta_lock);
     rt_sem_detach(&(node->msg_sem));
-    if(node->msg_args_json)
-        rt_free(node->msg_args_json);
     rt_free(node);
 
     return rst;
 }
 
-rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, task_msg_args_t out_args)
+rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, struct task_msg_args **out_args)
 {
     if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
 
@@ -80,7 +134,6 @@ rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, t
     rt_snprintf(name, RT_NAME_MAX, "wt_%d", count);
     rt_sem_init(&(node->msg_sem), name, 0, RT_IPC_FLAG_FIFO);
     node->msg_name = msg_name;
-    node->msg_args_json = RT_NULL;
     rt_slist_init(&(node->slist));
     rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
     rt_slist_append(&wait_slist, &(node->slist));
@@ -89,16 +142,12 @@ rt_err_t task_msg_wait_until(enum task_msg_name msg_name, rt_uint32_t timeout, t
     rt_err_t rst = rt_sem_take(&(node->msg_sem), timeout);
     if(rst==RT_EOK && out_args)
     {
-        (*out_args).msg_name = msg_name;
-        (*out_args).msg_args_json = RT_NULL;
-        if(node->msg_args_json) (*out_args).msg_args_json = rt_strdup(node->msg_args_json);
+        (*out_args) = node->args;
     }
     rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
     rt_slist_remove(&wait_slist, &(node->slist));
     rt_mutex_release(&wt_lock);
     rt_sem_detach(&(node->msg_sem));
-    if(node->msg_args_json)
-        rt_free(node->msg_args_json);
     rt_free(node);
 
     return rst;
@@ -108,14 +157,6 @@ rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_ms
 {
     if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
 
-    task_msg_callback_node_t callback_node = rt_calloc(1, sizeof(struct task_msg_callback_node));
-    if(callback_node == RT_NULL)
-    {
-        LOG_E("there is no memory available!");
-        return RT_ENOMEM;
-    }
-    callback_node->callback = callback;
-    rt_slist_init(&(callback_node->slist));
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
     rt_bool_t find_tag = RT_FALSE;
     task_msg_callback_node_t node;
@@ -133,6 +174,15 @@ rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_ms
     }
     else
     {
+        task_msg_callback_node_t callback_node = rt_calloc(1, sizeof(struct task_msg_callback_node));
+        if(callback_node == RT_NULL)
+        {
+            rt_mutex_release(&cb_lock);
+            LOG_E("there is no memory available!");
+            return RT_ENOMEM;
+        }
+        callback_node->callback = callback;
+        rt_slist_init(&(callback_node->slist));
         rt_slist_append(&callback_slist_array[msg_name], &(callback_node->slist));
     }
     rt_mutex_release(&cb_lock);
@@ -174,6 +224,7 @@ rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *args_json)
     task_msg_args_t msg_args = rt_calloc(1, sizeof(struct task_msg_args));
     if(msg_args == RT_NULL)
     {
+        rt_free(node);
         LOG_E("task msg publish failed! msg_args create failed!");
         return -RT_ENOMEM;
     }
@@ -216,21 +267,15 @@ static void task_msg_bus_thread_entry(void *params)
             {
                 //get msg
                 msg_args_node = rt_slist_first_entry(&msg_slist, struct task_msg_args_node, slist);
-                //msg callback
-                rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
-                rt_slist_for_each_entry(msg_callback_node, &callback_slist_array[msg_args_node->args->msg_name], slist)
-                {
-                    if(msg_callback_node->callback) msg_callback_node->callback(msg_args_node->args);
-                }
-                rt_mutex_release(&cb_lock);
+                msg_ref_append(msg_args_node->args);
                 //check wait msg until
                 rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
                 rt_slist_for_each_entry(msg_wait_node, &wait_slist, slist)
                 {
                     if(msg_wait_node->msg_name==msg_args_node->args->msg_name)
                     {
-                        if(msg_args_node->args->msg_args_json)
-                            msg_wait_node->msg_args_json = rt_strdup(msg_args_node->args->msg_args_json);
+                        msg_wait_node->args = msg_args_node->args;
+                        msg_ref_append(msg_args_node->args);
                         rt_sem_release(&(msg_wait_node->msg_sem));
                     }
                 }
@@ -243,28 +288,30 @@ static void task_msg_bus_thread_entry(void *params)
                     {
                         if(msg_wait_any_node->msg_name_list[i]==msg_args_node->args->msg_name)
                         {
-                            msg_wait_any_node->msg_name = msg_args_node->args->msg_name;
-                            if(msg_args_node->args->msg_args_json)
-                                msg_wait_any_node->msg_args_json = rt_strdup(msg_args_node->args->msg_args_json);
+                            msg_wait_any_node->args = msg_args_node->args;
+                            msg_ref_append(msg_args_node->args);
                             rt_sem_release(&(msg_wait_any_node->msg_sem));
                             break;
                         }
                     }
                 }
                 rt_mutex_release(&wta_lock);
+                //msg callback
+                rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
+                rt_slist_for_each_entry(msg_callback_node, &callback_slist_array[msg_args_node->args->msg_name], slist)
+                {
+                    if(msg_callback_node->callback)
+                    {
+                        msg_callback_node->callback(msg_args_node->args);
+                    }
+                }
+                rt_mutex_release(&cb_lock);
                 //remove msg
                 rt_mutex_take(&msg_lock, RT_WAITING_FOREVER);
                 rt_slist_remove(&msg_slist, &(msg_args_node->slist));
                 rt_mutex_release(&msg_lock);
                 //free msg
-                if(msg_args_node->args)
-                {
-                    if(msg_args_node->args->msg_args_json)
-                    {
-                        rt_free(msg_args_node->args->msg_args_json);
-                    }
-                    rt_free(msg_args_node->args);
-                }
+                task_msg_delete(msg_args_node->args);
                 rt_free(msg_args_node);
             }
         }
@@ -277,6 +324,7 @@ rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint
     {
         rt_sem_init(&msg_sem, "msg_sem", 0, RT_IPC_FLAG_FIFO);
         rt_mutex_init(&msg_lock, "msg_lock", RT_IPC_FLAG_FIFO);
+        rt_mutex_init(&msg_ref_lock, "ref_lock", RT_IPC_FLAG_FIFO);
         rt_mutex_init(&cb_lock, "cb_lock", RT_IPC_FLAG_FIFO);
         rt_mutex_init(&wt_lock, "wt_lock", RT_IPC_FLAG_FIFO);
         rt_mutex_init(&wta_lock, "wta_lock", RT_IPC_FLAG_FIFO);