Procházet zdrojové kódy

增加延时消息和定时消息类型

slyant před 5 roky
rodič
revize
69a839719d
2 změnil soubory, kde provedl 269 přidání a 67 odebrání
  1. 21 4
      inc/task_msg_bus.h
  2. 248 63
      src/task_msg_bus.c

+ 21 - 4
inc/task_msg_bus.h

@@ -39,7 +39,7 @@ typedef struct task_msg_args_node *task_msg_args_node_t;
 
 struct task_msg_callback_node
 {
-    void(*callback)(const task_msg_args_t msg_args);
+    void (*callback)(const task_msg_args_t msg_args);
     rt_slist_t slist;
 };
 typedef struct task_msg_callback_node *task_msg_callback_node_t;
@@ -67,11 +67,28 @@ struct task_msg_release_hook
     void (*hook)(void *args);
 };
 
-rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint32_t tick);
-rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
-rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args));
+struct task_msg_loop
+{
+    enum task_msg_name msg_name;
+    void *msg_obj;
+    rt_uint32_t msg_size;
+    rt_timer_t timer;
+};
+typedef struct task_msg_loop *task_msg_loop_t;
+
+rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t priority, rt_uint32_t tick);
+rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args));
+rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args));
 rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *msg_text);
 rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_size_t msg_size);
+rt_err_t task_msg_delay_publish(rt_uint32_t delay_ms, enum task_msg_name msg_name, const char *msg_text);
+rt_err_t task_msg_delay_publish_obj(rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
+        rt_size_t msg_size);
+task_msg_loop_t task_msg_loop_create(rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
+        rt_size_t msg_size);
+rt_err_t task_msg_loop_delete(task_msg_loop_t msg_timing);
+rt_err_t task_msg_loop_start(task_msg_loop_t msg_timing);
+rt_err_t task_msg_loop_stop(task_msg_loop_t msg_timing);
 int task_msg_subscriber_create(enum task_msg_name msg_name);
 int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len);
 void task_msg_subscriber_delete(int subscriber_id);

+ 248 - 63
src/task_msg_bus.c

@@ -23,7 +23,7 @@ static struct rt_mutex sub_lock;
 static struct rt_mutex wt_lock;
 static rt_slist_t callback_slist_array[TASK_MSG_COUNT];
 #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
-    static struct task_msg_release_hook release_hooks[TASK_MSG_COUNT] = task_msg_release_hooks;
+static struct task_msg_release_hook release_hooks[TASK_MSG_COUNT] = task_msg_release_hooks;
 #endif
 static rt_slist_t msg_slist = RT_SLIST_OBJECT_INIT(msg_slist);
 static rt_slist_t msg_ref_slist = RT_SLIST_OBJECT_INIT(msg_ref_slist);
@@ -44,17 +44,17 @@ static void msg_ref_append(task_msg_args_t args)
     rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(item, &msg_ref_slist, slist)
     {
-        if(item->args==args)
+        if (item->args == args)
         {
             is_first_append = RT_FALSE;
             item->ref_count++;
             break;
         }
     }
-    if(is_first_append)
+    if (is_first_append)
     {
         task_msg_ref_node_t node = rt_calloc(1, sizeof(struct task_msg_ref_node));
-        if(node == RT_NULL)
+        if (node == RT_NULL)
         {
             rt_mutex_release(&msg_ref_lock);
             LOG_E("there is no memory available!");
@@ -81,20 +81,21 @@ void task_msg_release(task_msg_args_t args)
     rt_mutex_take(&msg_ref_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(item, &msg_ref_slist, slist)
     {
-        if(item->args==args)
+        if (item->args == args)
         {
             item->ref_count--;
-            if(item->ref_count <= 0)
+            if (item->ref_count <= 0)
             {
                 rt_slist_remove(&msg_ref_slist, &(item->slist));
-                if(item->args->msg_obj)
+                if (item->args->msg_obj)
                 {
-                #ifdef TASK_MSG_USING_DYNAMIC_MEMORY
-                    if(release_hooks[item->args->msg_name].hook && release_hooks[item->args->msg_name].msg_name == item->args->msg_name)
+#ifdef TASK_MSG_USING_DYNAMIC_MEMORY
+                    if (release_hooks[item->args->msg_name].hook
+                            && release_hooks[item->args->msg_name].msg_name == item->args->msg_name)
                     {
                         release_hooks[item->args->msg_name].hook(item->args->msg_obj);
                     }
-                #endif
+#endif
                     rt_free(item->args->msg_obj);
                 }
                 rt_free(item->args);
@@ -113,16 +114,19 @@ void task_msg_release(task_msg_args_t args)
  */
 int task_msg_subscriber_create(enum task_msg_name msg_name)
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -1;
+    if (task_msg_bus_init_tag == RT_FALSE)
+        return -1;
 
-    task_msg_subscriber_node_t subscriber = (task_msg_subscriber_node_t)rt_calloc(1, sizeof(struct task_msg_subscriber_node));
-    if(subscriber==RT_NULL) return -1;
+    task_msg_subscriber_node_t subscriber = (task_msg_subscriber_node_t) rt_calloc(1,
+            sizeof(struct task_msg_subscriber_node));
+    if (subscriber == RT_NULL)
+        return -1;
 
     int id = subscriber_id++;
     char name[RT_NAME_MAX];
     rt_snprintf(name, RT_NAME_MAX, "sub_%d", id);
     subscriber->sem = rt_sem_create(name, 0, RT_IPC_FLAG_FIFO);
-    if(subscriber->sem==RT_NULL)
+    if (subscriber->sem == RT_NULL)
     {
         rt_free(subscriber);
         return -1;
@@ -146,20 +150,22 @@ int task_msg_subscriber_create(enum task_msg_name msg_name)
  */
 int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint8_t msg_name_list_len)
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -1;
+    if (task_msg_bus_init_tag == RT_FALSE)
+        return -1;
 
     int id = subscriber_id++;
     char name[RT_NAME_MAX];
     rt_snprintf(name, RT_NAME_MAX, "sub_%d", id);
     rt_sem_t sem = rt_sem_create(name, 0, RT_IPC_FLAG_FIFO);
-    if(sem==RT_NULL) return -1;
+    if (sem == RT_NULL)
+        return -1;
 
     int count = 0;
     rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
-    for(int i=0; i<msg_name_list_len; i++)
+    for (int i = 0; i < msg_name_list_len; i++)
     {
         task_msg_subscriber_node_t subscriber = rt_calloc(1, sizeof(struct task_msg_subscriber_node));
-        if(subscriber==RT_NULL)
+        if (subscriber == RT_NULL)
         {
             goto ERROR;
         }
@@ -172,20 +178,19 @@ int task_msg_subscriber_create2(const enum task_msg_name *msg_name_list, rt_uint
         count++;
     }
 
-    if(count>0)
+    if (count > 0)
     {
         rt_mutex_release(&sub_lock);
         return id;
     }
 
-ERROR:
-    rt_sem_delete(sem);
-    if(count>0)
+    ERROR: rt_sem_delete(sem);
+    if (count > 0)
     {
         task_msg_subscriber_node_t subscriber;
         rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
         {
-            if(subscriber->subscriber_id == id)
+            if (subscriber->subscriber_id == id)
             {
                 rt_slist_remove(&msg_subscriber_slist, &(subscriber->slist));
                 rt_free(subscriber);
@@ -210,7 +215,7 @@ void task_msg_subscriber_delete(int subscriber_id)
     rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(wait_node, &msg_wait_slist, slist)
     {
-        if(wait_node->subscriber->subscriber_id == subscriber_id)
+        if (wait_node->subscriber->subscriber_id == subscriber_id)
         {
             rt_slist_remove(&msg_wait_slist, &(wait_node->slist));
             task_msg_release(wait_node->args);
@@ -221,10 +226,10 @@ void task_msg_subscriber_delete(int subscriber_id)
     rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
     {
-        if(subscriber->subscriber_id == subscriber_id)
+        if (subscriber->subscriber_id == subscriber_id)
         {
             rt_slist_remove(&msg_subscriber_slist, &(subscriber->slist));
-            if(first_tag)
+            if (first_tag)
             {
                 rt_sem_delete(subscriber->sem);
                 first_tag = RT_FALSE;
@@ -246,7 +251,8 @@ void task_msg_subscriber_delete(int subscriber_id)
  */
 rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct task_msg_args **out_args)
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
+    if (task_msg_bus_init_tag == RT_FALSE)
+        return -RT_EINVAL;
 
     rt_err_t rst = -RT_ERROR;
     task_msg_subscriber_node_t subscriber;
@@ -255,7 +261,7 @@ rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct ta
     rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
     {
-        if(subscriber->subscriber_id == subscriber_id)
+        if (subscriber->subscriber_id == subscriber_id)
         {
             sem = subscriber->sem;
             break;
@@ -263,21 +269,21 @@ rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct ta
     }
     rt_mutex_release(&sub_lock);
 
-    if(sem==RT_NULL)
+    if (sem == RT_NULL)
     {
         rt_thread_mdelay(timeout_ms);
         return -RT_EINVAL;
     }
 
     rst = rt_sem_take(sem, timeout_ms);
-    if(rst==RT_EOK)
+    if (rst == RT_EOK)
     {
         rst = -RT_EINVAL;
         task_msg_wait_node_t wait_node;
         rt_mutex_take(&wt_lock, RT_WAITING_FOREVER);
         rt_slist_for_each_entry(wait_node, &msg_wait_slist, slist)
         {
-            if(wait_node->subscriber->subscriber_id == subscriber_id)
+            if (wait_node->subscriber->subscriber_id == subscriber_id)
             {
                 *out_args = wait_node->args;
                 rt_slist_remove(&msg_wait_slist, &(wait_node->slist));
@@ -299,29 +305,30 @@ rt_err_t task_msg_wait_until(int subscriber_id, rt_int32_t timeout_ms, struct ta
  * @param callback: callback function name
  * @return error code
  */
-rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args))
+rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args))
 {
-    if(task_msg_bus_init_tag==RT_FALSE || callback==RT_NULL) return -RT_EINVAL;
+    if (task_msg_bus_init_tag == RT_FALSE || callback == RT_NULL)
+        return -RT_EINVAL;
 
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
     rt_bool_t find_tag = RT_FALSE;
     task_msg_callback_node_t node;
     rt_slist_for_each_entry(node, &callback_slist_array[msg_name], slist)
     {
-        if(node->callback == callback)
+        if (node->callback == callback)
         {
             find_tag = RT_TRUE;
             break;
         }
     }
-    if(find_tag)
+    if (find_tag)
     {
         LOG_W("this task msg callback with msg_name[%d] is exist!", msg_name);
     }
     else
     {
         task_msg_callback_node_t callback_node = rt_calloc(1, sizeof(struct task_msg_callback_node));
-        if(callback_node == RT_NULL)
+        if (callback_node == RT_NULL)
         {
             rt_mutex_release(&cb_lock);
             LOG_E("there is no memory available!");
@@ -343,15 +350,16 @@ rt_err_t task_msg_subscribe(enum task_msg_name msg_name, void(*callback)(task_ms
  * @param callback: callback function name
  * @return error code
  */
-rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_msg_args_t msg_args))
+rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void (*callback)(task_msg_args_t msg_args))
 {
-    if(task_msg_bus_init_tag==RT_FALSE || callback==RT_NULL) return -RT_EINVAL;
+    if (task_msg_bus_init_tag == RT_FALSE || callback == RT_NULL)
+        return -RT_EINVAL;
 
     task_msg_callback_node_t node;
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
     rt_slist_for_each_entry(node, &callback_slist_array[msg_name], slist)
     {
-        if(node->callback == callback)
+        if (node->callback == callback)
         {
             rt_slist_remove(&callback_slist_array[msg_name], &(node->slist));
             rt_free(node);
@@ -373,17 +381,18 @@ rt_err_t task_msg_unsubscribe(enum task_msg_name msg_name, void(*callback)(task_
  */
 rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_size_t msg_size)
 {
-    if(task_msg_bus_init_tag==RT_FALSE) return -RT_EINVAL;
+    if (task_msg_bus_init_tag == RT_FALSE)
+        return -RT_EINVAL;
 
     task_msg_args_node_t node = rt_calloc(1, sizeof(struct task_msg_args_node));
-    if(node == RT_NULL)
+    if (node == RT_NULL)
     {
         LOG_E("task msg publish failed! args_node create failed!");
         return -RT_ENOMEM;
     }
 
     task_msg_args_t msg_args = rt_calloc(1, sizeof(struct task_msg_args));
-    if(msg_args == RT_NULL)
+    if (msg_args == RT_NULL)
     {
         rt_free(node);
         LOG_E("task msg publish failed! msg_args create failed!");
@@ -392,10 +401,10 @@ rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_siz
 
     msg_args->msg_name = msg_name;
     msg_args->msg_obj = RT_NULL;
-    if(msg_obj && msg_size>0)
+    if (msg_obj && msg_size > 0)
     {
         msg_args->msg_obj = rt_calloc(1, msg_size);
-        if(msg_args->msg_obj)
+        if (msg_args->msg_obj)
         {
             rt_memcpy(msg_args->msg_obj, msg_obj, msg_size);
         }
@@ -426,22 +435,201 @@ rt_err_t task_msg_publish_obj(enum task_msg_name msg_name, void *msg_obj, rt_siz
  */
 rt_err_t task_msg_publish(enum task_msg_name msg_name, const char *msg_text)
 {
-    void *msg_obj = (void *)msg_text;
+    void *msg_obj = (void *) msg_text;
     rt_size_t args_size = 0;
-    if(msg_obj)
+    if (msg_obj)
     {
         args_size = rt_strlen(msg_text) + 1;
     }
     return task_msg_publish_obj(msg_name, msg_obj, args_size);
 }
 
+static void msg_timing_timeout(void *params)
+{
+    task_msg_loop_t msg_timing = (task_msg_loop_t) params;
+    task_msg_publish_obj(msg_timing->msg_name, msg_timing->msg_obj, msg_timing->msg_size);
+    if (msg_timing->msg_obj)
+    {
+        rt_free(msg_timing->msg_obj);
+        msg_timing->msg_obj = RT_NULL;
+    }
+    rt_timer_delete(msg_timing->timer);
+    msg_timing->timer = RT_NULL;
+    rt_free(msg_timing);
+}
+/**
+ * Publish a delay message object.
+ * @param delay_ms: delay ms
+ * @param msg_name: message name
+ * @param msg_obj: message object
+ * @param msg_size: message size
+ * @return error code
+ */
+rt_err_t task_msg_delay_publish_obj(rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
+        rt_size_t msg_size)
+{
+    task_msg_loop_t msg_loop = rt_calloc(1, sizeof(struct task_msg_loop));
+    if (msg_loop == RT_NULL)
+        return -RT_ENOMEM;
+
+    msg_loop->msg_name = msg_name;
+    msg_loop->msg_obj = RT_NULL;
+    msg_loop->msg_size = 0;
+    if (msg_obj && msg_size > 0)
+    {
+        msg_loop->msg_obj = rt_calloc(1, msg_size);
+        if (msg_loop->msg_obj == RT_NULL)
+        {
+            rt_free(msg_loop);
+            return -RT_ENOMEM;
+        }
+        rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
+    }
+    char name[RT_NAME_MAX];
+    rt_snprintf(name, RT_NAME_MAX, "msg_t%d", msg_name);
+    msg_loop->timer = rt_timer_create(name, msg_timing_timeout, msg_loop, rt_tick_from_millisecond(delay_ms),
+    RT_TIMER_FLAG_SOFT_TIMER | RT_TIMER_FLAG_ONE_SHOT);
+    if (msg_loop->timer == RT_NULL)
+    {
+        if (msg_loop->msg_obj)
+            rt_free(msg_loop->msg_obj);
+        rt_free(msg_loop);
+        return -RT_ENOMEM;
+    }
+
+    rt_err_t rst = rt_timer_start(msg_loop->timer);
+    if (rst != RT_EOK)
+    {
+        if (msg_loop->msg_obj)
+            rt_free(msg_loop->msg_obj);
+        rt_timer_delete(msg_loop->timer);
+        rt_free(msg_loop);
+    }
+
+    return rst;
+}
+/**
+ * Publish a delay text message.
+ * @param delay_ms: delay ms
+ * @param msg_name: message name
+ * @param msg_text: message text
+ * @return error code
+ */
+rt_err_t task_msg_delay_publish(rt_uint32_t delay_ms, enum task_msg_name msg_name, const char *msg_text)
+{
+    void *msg_obj = (void *) msg_text;
+    rt_size_t args_size = 0;
+    if (msg_obj)
+    {
+        args_size = rt_strlen(msg_text) + 1;
+    }
+    return task_msg_delay_publish_obj(delay_ms, msg_name, msg_obj, args_size);
+}
+
+static void msg_loop_timeout(void *params)
+{
+    task_msg_loop_t msg_timing = (task_msg_loop_t) params;
+    task_msg_publish_obj(msg_timing->msg_name, msg_timing->msg_obj, msg_timing->msg_size);
+}
+/**
+ * create a loop message
+ * @param delay_ms
+ * @param msg_name
+ * @param msg_obj
+ * @param msg_size
+ * @return error code
+ */
+task_msg_loop_t task_msg_loop_create(rt_uint32_t delay_ms, enum task_msg_name msg_name, void *msg_obj,
+        rt_size_t msg_size)
+{
+    task_msg_loop_t msg_loop = rt_calloc(1, sizeof(struct task_msg_loop));
+    if (msg_loop == RT_NULL)
+        return RT_NULL;
+
+    msg_loop->msg_name = msg_name;
+    msg_loop->msg_obj = RT_NULL;
+    msg_loop->msg_size = 0;
+    if (msg_obj && msg_size > 0)
+    {
+        msg_loop->msg_obj = rt_calloc(1, msg_size);
+        if (msg_loop->msg_obj == RT_NULL)
+        {
+            rt_free(msg_loop);
+            return RT_NULL;
+        }
+        rt_memcpy(msg_loop->msg_obj, msg_obj, msg_size);
+    }
+    char name[RT_NAME_MAX];
+    rt_snprintf(name, RT_NAME_MAX, "msg_l%d", msg_name);
+    msg_loop->timer = rt_timer_create(name, msg_loop_timeout, msg_loop, rt_tick_from_millisecond(delay_ms),
+    RT_TIMER_FLAG_SOFT_TIMER | RT_TIMER_FLAG_PERIODIC);
+    if (msg_loop->timer == RT_NULL)
+    {
+        if (msg_loop->msg_obj)
+            rt_free(msg_loop->msg_obj);
+        rt_free(msg_loop);
+        return RT_NULL;
+    }
+
+    return msg_loop;
+}
+/**
+ * start a loop message
+ * @param msg_timing
+ * @return error code
+ */
+rt_err_t task_msg_loop_start(task_msg_loop_t msg_timing)
+{
+    if (msg_timing == RT_NULL || msg_timing->timer == RT_NULL)
+        return -RT_EEMPTY;
+
+    return rt_timer_start(msg_timing->timer);
+}
+/**
+ * stop a loop message
+ * @param msg_timing
+ * @return error code
+ */
+rt_err_t task_msg_loop_stop(task_msg_loop_t msg_timing)
+{
+    if (msg_timing == RT_NULL || msg_timing->timer == RT_NULL)
+        return -RT_EEMPTY;
+
+    return rt_timer_stop(msg_timing->timer);
+}
+/**
+ * delete a loop message
+ * @param msg_timing
+ * @return error code
+ */
+rt_err_t task_msg_loop_delete(task_msg_loop_t msg_timing)
+{
+    rt_err_t rst;
+
+    if (msg_timing == RT_NULL)
+        return -RT_EEMPTY;
+
+    if (msg_timing->msg_obj)
+    {
+        rt_free(msg_timing->msg_obj);
+        msg_timing->msg_obj = RT_NULL;
+    }
+    if (msg_timing->timer)
+    {
+        rst = rt_timer_delete(msg_timing->timer);
+        msg_timing->timer = RT_NULL;
+    }
+    rt_free(msg_timing);
+
+    return rst;
+}
 /**
  * Initialize the callback slist array.
  */
 static void task_msg_callback_init(void)
 {
     rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
-    for(int i=0; i<TASK_MSG_COUNT; i++)
+    for (int i = 0; i < TASK_MSG_COUNT; i++)
     {
         callback_slist_array[i].next = RT_NULL;
     }
@@ -454,15 +642,15 @@ static void task_msg_callback_init(void)
  */
 static void task_msg_bus_thread_entry(void *params)
 {
-    while(1)
+    while (1)
     {
-        if(rt_sem_take(&msg_sem, RT_WAITING_FOREVER) == RT_EOK)
+        if (rt_sem_take(&msg_sem, RT_WAITING_FOREVER) == RT_EOK)
         {
             task_msg_args_node_t msg_args_node;
             task_msg_callback_node_t msg_callback_node;
             task_msg_subscriber_node_t subscriber;
             task_msg_wait_node_t msg_wait_node;
-            if(rt_slist_len(&msg_slist) > 0)
+            if (rt_slist_len(&msg_slist) > 0)
             {
                 //get msg
                 msg_args_node = rt_slist_first_entry(&msg_slist, struct task_msg_args_node, slist);
@@ -471,10 +659,10 @@ static void task_msg_bus_thread_entry(void *params)
                 rt_mutex_take(&sub_lock, RT_WAITING_FOREVER);
                 rt_slist_for_each_entry(subscriber, &msg_subscriber_slist, slist)
                 {
-                    if(subscriber->msg_name == msg_args_node->args->msg_name)
+                    if (subscriber->msg_name == msg_args_node->args->msg_name)
                     {
                         msg_wait_node = rt_calloc(1, sizeof(struct task_msg_wait_node));
-                        if(msg_wait_node==RT_NULL)
+                        if (msg_wait_node == RT_NULL)
                         {
                             LOG_W("no memory to create msg_wait_node!");
                             break;
@@ -497,7 +685,7 @@ static void task_msg_bus_thread_entry(void *params)
                 rt_mutex_take(&cb_lock, RT_WAITING_FOREVER);
                 rt_slist_for_each_entry(msg_callback_node, &callback_slist_array[msg_args_node->args->msg_name], slist)
                 {
-                    if(msg_callback_node->callback)
+                    if (msg_callback_node->callback)
                     {
                         msg_callback_node->callback(msg_args_node->args);
                     }
@@ -523,9 +711,10 @@ static void task_msg_bus_thread_entry(void *params)
  * @param tick: thread tick
  * @return error code
  */
-rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint32_t tick)
+rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t priority, rt_uint32_t tick)
 {
-    if(task_msg_bus_init_tag) return -RT_EBUSY;
+    if (task_msg_bus_init_tag)
+        return -RT_EBUSY;
 
     rt_sem_init(&msg_sem, "msg_sem", 0, RT_IPC_FLAG_FIFO);
     rt_mutex_init(&msg_lock, "msg_lock", RT_IPC_FLAG_FIFO);
@@ -536,20 +725,16 @@ rt_err_t task_msg_bus_init(rt_uint32_t stack_size, rt_uint8_t  priority, rt_uint
     task_msg_callback_init();
     task_msg_bus_init_tag = RT_TRUE;
 
-    rt_thread_t t = rt_thread_create("msg_bus",
-            task_msg_bus_thread_entry,
-            RT_NULL,
-            stack_size,
-            priority,
-            tick);
-    if(t==RT_NULL)
+    rt_thread_t t = rt_thread_create("msg_bus", task_msg_bus_thread_entry,
+    RT_NULL, stack_size, priority, tick);
+    if (t == RT_NULL)
     {
         LOG_E("task msg bus initialize failed! msg_bus_thread create failed!");
         return -RT_ENOMEM;
     }
 
     rt_err_t rst = rt_thread_startup(t);
-    if(rst == RT_EOK)
+    if (rst == RT_EOK)
     {
         LOG_I("task msg bus initialize success!");
     }