Browse Source

component/bt: modify the implementation of osi_thread using freeRTOS queue

wangmengyang 3 năm trước cách đây
mục cha
commit
77e98e468d
1 tập tin đã thay đổi với 112 bổ sung24 xóa
  1. 112 24
      components/bt/common/osi/thread.c

+ 112 - 24
components/bt/common/osi/thread.c

@@ -19,16 +19,27 @@
 #include <string.h>
 
 #include "osi/allocator.h"
-#include "osi/fixed_queue.h"
+#include "freertos/FreeRTOS.h"
+#include "freertos/queue.h"
 #include "osi/semaphore.h"
 #include "osi/thread.h"
 
+struct work_item {
+    osi_thread_func_t func;
+    void *context;
+};
+
+struct work_queue {
+    QueueHandle_t queue;
+    size_t capacity;
+};
+
 struct osi_thread {
   TaskHandle_t thread_handle;           /*!< Store the thread object */
   int  thread_id;                       /*!< May for some OS, such as Linux */
   bool stop;
   uint8_t work_queue_num;               /*!< Work queue number */
-  fixed_queue_t **work_queues;          /*!< Point to queue array, and the priority inverse array index */
+  struct work_queue **work_queues;      /*!< Point to queue array, and the priority inverse array index */
   osi_sem_t work_sem;
   osi_sem_t stop_sem;
 };
@@ -39,13 +50,90 @@ struct osi_thread_start_arg {
   int error;
 };
 
-typedef struct {
-  osi_thread_func_t func;
-  void *context;
-} work_item_t;
-
 static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
 
+static struct work_queue *osi_work_queue_create(size_t capacity)
+{
+    if (capacity == 0) {
+        return NULL;
+    }
+
+    struct work_queue *wq = (struct work_queue *)osi_malloc(sizeof(struct work_queue));
+    if (wq != NULL) {
+        wq->queue = xQueueCreate(capacity, sizeof(struct work_item));
+        if (wq->queue != 0) {
+            wq->capacity = capacity;
+            return wq;
+        } else {
+            osi_free(wq);
+        }
+    }
+
+    return NULL;
+}
+
+static void osi_work_queue_delete(struct work_queue *wq)
+{
+    if (wq != NULL) {
+        if (wq->queue != 0) {
+            vQueueDelete(wq->queue);
+        }
+        wq->queue = 0;
+        wq->capacity = 0;
+        osi_free(wq);
+    }
+    return;
+}
+
+static bool osi_thead_work_queue_get(struct work_queue *wq, struct work_item *item)
+{
+    assert (wq != NULL);
+    assert (wq->queue != 0);
+    assert (item != NULL);
+
+    if (pdTRUE == xQueueReceive(wq->queue, item, 0)) {
+        return true;
+    } else {
+        return false;
+    }
+}
+
+static bool osi_thead_work_queue_put(struct work_queue *wq, const struct work_item *item, uint32_t timeout)
+{
+    assert (wq != NULL);
+    assert (wq->queue != 0);
+    assert (item != NULL);
+
+    bool ret = true;
+    if (timeout ==  OSI_SEM_MAX_TIMEOUT) {
+        if (xQueueSend(wq->queue, item, portMAX_DELAY) != pdTRUE) {
+            ret = false;
+        }
+    } else {
+        if (xQueueSend(wq->queue, item, timeout / portTICK_PERIOD_MS) != pdTRUE) {
+            ret = false;
+        }
+    }
+
+    return ret;
+}
+
+static size_t osi_thead_work_queue_len(struct work_queue *wq)
+{
+    assert (wq != NULL);
+    assert (wq->queue != 0);
+    assert (wq->capacity != 0);
+
+    size_t available_spaces = (size_t)uxQueueSpacesAvailable(wq->queue);
+
+    if (available_spaces <= wq->capacity) {
+        return wq->capacity - available_spaces;
+    } else {
+        assert (0);
+    }
+    return 0;
+}
+
 static void osi_thread_run(void *arg)
 {
     struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
@@ -62,11 +150,10 @@ static void osi_thread_run(void *arg)
             break;
         }
 
+        struct work_item item;
         while (!thread->stop && idx < thread->work_queue_num) {
-            work_item_t *item = fixed_queue_dequeue(thread->work_queues[idx], 0);
-            if (item) {
-                item->func(item->context);
-                osi_free(item);
+            if (osi_thead_work_queue_get(thread->work_queues[idx], &item) == true) {
+                item.func(item.context);
                 idx = 0;
                 continue;
             } else {
@@ -125,13 +212,13 @@ osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priorit
 
     thread->stop = false;
     thread->work_queue_num = work_queue_num;
-    thread->work_queues = (fixed_queue_t **)osi_malloc(sizeof(fixed_queue_t *) * work_queue_num);
+    thread->work_queues = (struct work_queue **)osi_malloc(sizeof(struct work_queue *) * work_queue_num);
     if (thread->work_queues == NULL) {
         goto _err;
     }
 
     for (int i = 0; i < thread->work_queue_num; i++) {
-        thread->work_queues[i] = fixed_queue_new(DEFAULT_WORK_QUEUE_CAPACITY);
+        thread->work_queues[i] = osi_work_queue_create(DEFAULT_WORK_QUEUE_CAPACITY);
         if (thread->work_queues[i] == NULL) {
             goto _err;
         }
@@ -175,12 +262,14 @@ _err:
 
         for (int i = 0; i < thread->work_queue_num; i++) {
             if (thread->work_queues[i]) {
-                fixed_queue_free(thread->work_queues[i], osi_free_func);
+                osi_work_queue_delete(thread->work_queues[i]);
             }
+            thread->work_queues[i] = NULL;
         }
 
         if (thread->work_queues) {
             osi_free(thread->work_queues);
+            thread->work_queues = NULL;
         }
 
         if (thread->work_sem) {
@@ -206,12 +295,14 @@ void osi_thread_free(osi_thread_t *thread)
 
     for (int i = 0; i < thread->work_queue_num; i++) {
         if (thread->work_queues[i]) {
-            fixed_queue_free(thread->work_queues[i], osi_free_func);
+            osi_work_queue_delete(thread->work_queues[i]);
+            thread->work_queues[i] = NULL;
         }
     }
 
     if (thread->work_queues) {
         osi_free(thread->work_queues);
+        thread->work_queues = NULL;
     }
 
     if (thread->work_sem) {
@@ -235,15 +326,12 @@ bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context
         return false;
     }
 
-    work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t));
-    if (item == NULL) {
-        return false;
-    }
-    item->func = func;
-    item->context = context;
+    struct work_item item;
+
+    item.func = func;
+    item.context = context;
 
-    if (fixed_queue_enqueue(thread->work_queues[queue_idx], item, timeout) == false) {
-        osi_free(item);
+    if (osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout) == false) {
         return false;
     }
 
@@ -273,5 +361,5 @@ int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
         return -1;
     }
 
-    return fixed_queue_length(thread->work_queues[wq_idx]);
+    return (int)(osi_thead_work_queue_len(thread->work_queues[wq_idx]));
 }