Răsfoiți Sursa

feat(wasi-threads): Improve thread id allocator to reuse identifiers (#1809)

This PR allows reusing thread ids once they are released. That is done by using
a stack data structure to keep track of the used ids.
When a thread is created, it takes an available identifier from the stack. When
the thread exits, it returns the id to the stack of available identifiers.
Enrico Loparco 3 ani în urmă
părinte
comite
0c85cb1fe6

+ 78 - 8
core/iwasm/libraries/lib-wasi-threads/lib_wasi_threads_wrapper.c

@@ -18,6 +18,14 @@ static const char *THREAD_START_FUNCTION = "wasi_thread_start";
 
 static korp_mutex thread_id_lock;
 
+// Stack data structure to track available thread identifiers
+#define AVAIL_TIDS_INIT_SIZE CLUSTER_MAX_THREAD_NUM
+typedef struct {
+    int32 *ids;
+    uint32 pos, size;
+} AvailableThreadIds;
+static AvailableThreadIds avail_tids;
+
 typedef struct {
     /* app's entry function */
     wasm_function_inst_t start_func;
@@ -30,16 +38,56 @@ typedef struct {
 static int32
 allocate_thread_id()
 {
-    static int32 thread_id = 0;
-
-    int32 id;
+    int32 id = -1;
 
     os_mutex_lock(&thread_id_lock);
-    id = thread_id++;
+    if (avail_tids.pos == 0) { // Resize stack and push new thread ids
+        uint32 old_size = avail_tids.size;
+        uint32 new_size = avail_tids.size * 2;
+        if (new_size / 2 != avail_tids.size) {
+            LOG_ERROR("Overflow detected during new size calculation");
+            goto return_id;
+        }
+
+        size_t realloc_size = new_size * sizeof(int32);
+        if (realloc_size / sizeof(int32) != new_size) {
+            LOG_ERROR("Overflow detected during realloc");
+            goto return_id;
+        }
+        int32 *tmp =
+            (int32 *)wasm_runtime_realloc(avail_tids.ids, realloc_size);
+        if (tmp == NULL) {
+            LOG_ERROR("Thread ID allocator realloc failed");
+            goto return_id;
+        }
+
+        avail_tids.size = new_size;
+        avail_tids.pos = old_size;
+        avail_tids.ids = tmp;
+        for (uint32 i = 0; i < old_size; i++)
+            avail_tids.ids[i] = new_size - i;
+    }
+
+    // Pop available thread identifier from `avail_tids` stack
+    id = avail_tids.ids[--avail_tids.pos];
+
+return_id:
     os_mutex_unlock(&thread_id_lock);
     return id;
 }
 
+void
+deallocate_thread_id(int32 thread_id)
+{
+    os_mutex_lock(&thread_id_lock);
+
+    // Release thread identifier by pushing it into `avail_tids` stack
+    bh_assert(avail_tids.pos < avail_tids.size);
+    avail_tids.ids[avail_tids.pos++] = thread_id;
+
+    os_mutex_unlock(&thread_id_lock);
+}
+
 static void *
 thread_start(void *arg)
 {
@@ -57,6 +105,8 @@ thread_start(void *arg)
             wasm_cluster_spread_exception(exec_env);
     }
 
+    // Routine exit
+    deallocate_thread_id(thread_arg->thread_id);
     wasm_runtime_free(thread_arg);
     exec_env->thread_arg = NULL;
 
@@ -101,15 +151,19 @@ thread_spawn_wrapper(wasm_exec_env_t exec_env, uint32 start_arg)
     if (!start_func) {
         LOG_ERROR("Failed to find thread start function %s",
                   THREAD_START_FUNCTION);
-        goto thread_spawn_fail;
+        goto thread_preparation_fail;
     }
 
     if (!(thread_start_arg = wasm_runtime_malloc(sizeof(ThreadStartArg)))) {
         LOG_ERROR("Runtime args allocation failed");
-        goto thread_spawn_fail;
+        goto thread_preparation_fail;
     }
 
     thread_start_arg->thread_id = thread_id = allocate_thread_id();
+    if (thread_id < 0) {
+        LOG_ERROR("Failed to get thread identifier");
+        goto thread_preparation_fail;
+    }
     thread_start_arg->arg = start_arg;
     thread_start_arg->start_func = start_func;
 
@@ -117,7 +171,6 @@ thread_spawn_wrapper(wasm_exec_env_t exec_env, uint32 start_arg)
     ret = wasm_cluster_create_thread(exec_env, new_module_inst, thread_start,
                                      thread_start_arg);
     if (ret != 0) {
-        os_mutex_unlock(&exec_env->wait_lock);
         LOG_ERROR("Failed to spawn a new thread");
         goto thread_spawn_fail;
     }
@@ -126,9 +179,12 @@ thread_spawn_wrapper(wasm_exec_env_t exec_env, uint32 start_arg)
     return thread_id;
 
 thread_spawn_fail:
+    os_mutex_unlock(&exec_env->wait_lock);
+    deallocate_thread_id(thread_id);
+
+thread_preparation_fail:
     if (new_module_inst)
         wasm_runtime_deinstantiate_internal(new_module_inst, true);
-
     if (thread_start_arg)
         wasm_runtime_free(thread_start_arg);
 
@@ -156,11 +212,25 @@ lib_wasi_threads_init(void)
     if (0 != os_mutex_init(&thread_id_lock))
         return false;
 
+    // Initialize stack to store thread identifiers
+    avail_tids.size = AVAIL_TIDS_INIT_SIZE;
+    avail_tids.pos = avail_tids.size;
+    avail_tids.ids =
+        (int32 *)wasm_runtime_malloc(avail_tids.size * sizeof(int32));
+    if (avail_tids.ids == NULL) {
+        os_mutex_destroy(&thread_id_lock);
+        return false;
+    }
+    for (uint32 i = 0; i < avail_tids.size; i++)
+        avail_tids.ids[i] = avail_tids.size - i;
+
     return true;
 }
 
 void
 lib_wasi_threads_destroy(void)
 {
+    wasm_runtime_free(avail_tids.ids);
+    avail_tids.ids = NULL;
     os_mutex_destroy(&thread_id_lock);
 }