瀏覽代碼

Implement POSIX semaphore support for linux platform (#1345)

Implement POSIX semaphore support for linux platform
Huang Qi 3 年之前
父節點
當前提交
f3f8d684b3

+ 3 - 0
build-scripts/config_common.cmake

@@ -191,6 +191,9 @@ endif ()
 if (WAMR_BUILD_LIB_PTHREAD EQUAL 1)
   message ("     Lib pthread enabled")
 endif ()
+if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1)
+  message ("     Lib pthread semaphore enabled")
+endif ()
 if (WAMR_BUILD_LIBC_EMCC EQUAL 1)
   message ("     Libc emcc enabled")
 endif ()

+ 5 - 0
build-scripts/runtime_lib.cmake

@@ -86,6 +86,11 @@ elseif (WAMR_BUILD_LIBC_WASI EQUAL 1)
     include (${IWASM_DIR}/libraries/libc-wasi/libc_wasi.cmake)
 endif ()
 
+if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1)
+    # Enable the dependent feature if lib pthread semaphore is enabled
+    set (WAMR_BUILD_LIB_PTHREAD 1)
+endif ()
+
 if (WAMR_BUILD_LIB_PTHREAD EQUAL 1)
     include (${IWASM_DIR}/libraries/lib-pthread/lib_pthread.cmake)
     # Enable the dependent feature if lib pthread is enabled

+ 4 - 0
core/config.h

@@ -139,6 +139,10 @@
 #define WASM_ENABLE_LIB_PTHREAD 0
 #endif
 
+#ifndef WASM_ENABLE_LIB_PTHREAD_SEMAPHORE
+#define WASM_ENABLE_LIB_PTHREAD_SEMAPHORE 0
+#endif
+
 #ifndef WASM_ENABLE_BASE_LIB
 #define WASM_ENABLE_BASE_LIB 0
 #endif

+ 4 - 0
core/iwasm/libraries/lib-pthread/lib_pthread.cmake

@@ -5,6 +5,10 @@ set (LIB_PTHREAD_DIR ${CMAKE_CURRENT_LIST_DIR})
 
 add_definitions (-DWASM_ENABLE_LIB_PTHREAD=1)
 
+if (WAMR_BUILD_LIB_PTHREAD_SEMAPHORE EQUAL 1)
+add_definitions (-DWASM_ENABLE_LIB_PTHREAD_SEMAPHORE=1)
+endif()
+
 include_directories(${LIB_PTHREAD_DIR})
 
 file (GLOB source_all ${LIB_PTHREAD_DIR}/*.c)

+ 214 - 0
core/iwasm/libraries/lib-pthread/lib_pthread_wrapper.c

@@ -54,6 +54,7 @@ enum {
     T_THREAD,
     T_MUTEX,
     T_COND,
+    T_SEM,
 };
 
 enum thread_status_t {
@@ -73,6 +74,12 @@ enum cond_status_t {
     COND_DESTROYED,
 };
 
+enum sem_status_t {
+    SEM_CREATED,
+    SEM_CLOSED,
+    SEM_DESTROYED,
+};
+
 typedef struct ThreadKeyValueNode {
     bh_list_link l;
     wasm_exec_env_t exec_env;
@@ -111,6 +118,9 @@ typedef struct ThreadInfoNode {
         korp_tid thread;
         korp_mutex *mutex;
         korp_cond *cond;
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+        korp_sem *sem;
+#endif
         /* A copy of the thread return value */
         void *ret;
     } u;
@@ -125,7 +135,15 @@ typedef struct {
     wasm_module_inst_t module_inst;
 } ThreadRoutineArgs;
 
+typedef struct {
+    uint32 handle;
+    ThreadInfoNode *node;
+} SemCallbackArgs;
+
 static bh_list cluster_info_list;
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+static HashMap *sem_info_map;
+#endif
 static korp_mutex thread_global_lock;
 static uint32 handle_id = 1;
 
@@ -160,6 +178,12 @@ thread_info_destroy(void *node)
             os_cond_destroy(info_node->u.cond);
         wasm_runtime_free(info_node->u.cond);
     }
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+    else if (info_node->type == T_SEM) {
+        if (info_node->status != SEM_DESTROYED)
+            os_sem_close(info_node->u.sem);
+    }
+#endif
     wasm_runtime_free(info_node);
     os_mutex_unlock(&thread_global_lock);
 }
@@ -174,12 +198,23 @@ lib_pthread_init()
         os_mutex_destroy(&thread_global_lock);
         return false;
     }
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+    if (!(sem_info_map = bh_hash_map_create(
+              32, true, (HashFunc)wasm_string_hash,
+              (KeyEqualFunc)wasm_string_equal, NULL, thread_info_destroy))) {
+        os_mutex_destroy(&thread_global_lock);
+        return false;
+    }
+#endif
     return true;
 }
 
 void
 lib_pthread_destroy()
 {
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+    bh_hash_map_destroy(sem_info_map);
+#endif
     os_mutex_destroy(&thread_global_lock);
 }
 
@@ -1085,6 +1120,176 @@ posix_memalign_wrapper(wasm_exec_env_t exec_env, void **memptr, int32 align,
     return 0;
 }
 
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+
+static int32
+sem_open_wrapper(wasm_exec_env_t exec_env, const char *name, int32 oflags,
+                 int32 mode, int32 val)
+{
+    korp_sem *psem = NULL;
+    ThreadInfoNode *info_node = NULL;
+
+    /**
+     * For RTOS, global semaphore map is safe for share the same semaphore
+     * between task/pthread.
+     * For Unix like system, it's dedicated for multiple processes.
+     */
+
+    if ((info_node = bh_hash_map_find(sem_info_map, (void *)name))) {
+        return info_node->handle;
+    }
+
+    if (!(psem = os_sem_open(name, oflags, mode, val))) {
+        goto fail1;
+    }
+
+    if (!(info_node = wasm_runtime_malloc(sizeof(ThreadInfoNode))))
+        goto fail2;
+
+    memset(info_node, 0, sizeof(ThreadInfoNode));
+    info_node->exec_env = exec_env;
+    info_node->handle = allocate_handle();
+    info_node->type = T_SEM;
+    info_node->u.sem = psem;
+    info_node->status = SEM_CREATED;
+
+    if (!bh_hash_map_insert(sem_info_map, (void *)name, info_node))
+        goto fail3;
+
+    return info_node->handle;
+
+fail3:
+    wasm_runtime_free(info_node);
+fail2:
+    os_sem_close(psem);
+fail1:
+    return -1;
+}
+
+void
+sem_fetch_cb(void *key, void *value, void *user_data)
+{
+    (void)key;
+    SemCallbackArgs *args = user_data;
+    ThreadInfoNode *info_node = value;
+    if (args->handle == info_node->handle && info_node->status == SEM_CREATED) {
+        args->node = info_node;
+    }
+}
+
+static int32
+sem_close_wrapper(wasm_exec_env_t exec_env, uint32 sem)
+{
+    (void)exec_env;
+    int ret = -1;
+    SemCallbackArgs args = { sem, NULL };
+
+    bh_hash_map_traverse(sem_info_map, sem_fetch_cb, &args);
+
+    if (args.node) {
+        ret = os_sem_close(args.node->u.sem);
+        if (ret == 0) {
+            args.node->status = SEM_CLOSED;
+        }
+    }
+
+    return ret;
+}
+
+static int32
+sem_wait_wrapper(wasm_exec_env_t exec_env, uint32 sem)
+{
+    (void)exec_env;
+    SemCallbackArgs args = { sem, NULL };
+
+    bh_hash_map_traverse(sem_info_map, sem_fetch_cb, &args);
+
+    if (args.node) {
+        return os_sem_wait(args.node->u.sem);
+    }
+
+    return -1;
+}
+
+static int32
+sem_trywait_wrapper(wasm_exec_env_t exec_env, uint32 sem)
+{
+    (void)exec_env;
+    SemCallbackArgs args = { sem, NULL };
+
+    bh_hash_map_traverse(sem_info_map, sem_fetch_cb, &args);
+
+    if (args.node) {
+        return os_sem_trywait(args.node->u.sem);
+    }
+
+    return -1;
+}
+
+static int32
+sem_post_wrapper(wasm_exec_env_t exec_env, uint32 sem)
+{
+    (void)exec_env;
+    SemCallbackArgs args = { sem, NULL };
+
+    bh_hash_map_traverse(sem_info_map, sem_fetch_cb, &args);
+
+    if (args.node) {
+        return os_sem_post(args.node->u.sem);
+    }
+
+    return -1;
+}
+
+static int32
+sem_getvalue_wrapper(wasm_exec_env_t exec_env, uint32 sem, int32 *sval)
+{
+    int32 ret = -1;
+    wasm_module_inst_t module_inst = get_module_inst(exec_env);
+
+    (void)exec_env;
+    SemCallbackArgs args = { sem, NULL };
+
+    if (validate_native_addr(sval, sizeof(int32))) {
+
+        bh_hash_map_traverse(sem_info_map, sem_fetch_cb, &args);
+
+        if (args.node) {
+            ret = os_sem_getvalue(args.node->u.sem, sval);
+        }
+    }
+    return ret;
+}
+
+static int32
+sem_unlink_wrapper(wasm_exec_env_t exec_env, const char *name)
+{
+    (void)exec_env;
+    int32 ret_val;
+
+    ThreadInfoNode *info_node = bh_hash_map_find(sem_info_map, (void *)name);
+    if (!info_node || info_node->type != T_SEM)
+        return -1;
+
+    if (info_node->status != SEM_CLOSED) {
+        ret_val = os_sem_close(info_node->u.sem);
+        if (ret_val != 0) {
+            return ret_val;
+        }
+    }
+
+    ret_val = os_sem_unlink(name);
+
+    if (ret_val == 0) {
+        bh_hash_map_remove(sem_info_map, (void *)name, NULL, NULL);
+        info_node->status = SEM_DESTROYED;
+        thread_info_destroy(info_node);
+    }
+    return ret_val;
+}
+
+#endif
+
 /* clang-format off */
 #define REG_NATIVE_FUNC(func_name, signature) \
     { #func_name, func_name##_wrapper, signature, NULL }
@@ -1113,6 +1318,15 @@ static NativeSymbol native_symbols_lib_pthread[] = {
     REG_NATIVE_FUNC(pthread_getspecific, "(i)i"),
     REG_NATIVE_FUNC(pthread_key_delete, "(i)i"),
     REG_NATIVE_FUNC(posix_memalign, "(*ii)i"),
+#if WASM_ENABLE_LIB_PTHREAD_SEMAPHORE != 0
+    REG_NATIVE_FUNC(sem_open, "($iii)i"),
+    REG_NATIVE_FUNC(sem_close, "(i)i"),
+    REG_NATIVE_FUNC(sem_wait, "(i)i"),
+    REG_NATIVE_FUNC(sem_trywait, "(i)i"),
+    REG_NATIVE_FUNC(sem_post, "(i)i"),
+    REG_NATIVE_FUNC(sem_getvalue, "(i*)i"),
+    REG_NATIVE_FUNC(sem_unlink, "($)i"),
+#endif
 };
 
 uint32

+ 1 - 0
core/shared/platform/android/platform_internal.h

@@ -55,6 +55,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef sem_t korp_sem;
 
 #define os_thread_local_attribute __thread
 

+ 42 - 0
core/shared/platform/common/posix/posix_thread.c

@@ -195,6 +195,48 @@ os_cond_wait(korp_cond *cond, korp_mutex *mutex)
     return BHT_OK;
 }
 
+korp_sem *
+os_sem_open(const char *name, int oflags, int mode, int val)
+{
+    return sem_open(name, oflags, mode, val);
+}
+
+int
+os_sem_close(korp_sem *sem)
+{
+    return sem_close(sem);
+}
+
+int
+os_sem_wait(korp_sem *sem)
+{
+    return sem_wait(sem);
+}
+
+int
+os_sem_trywait(korp_sem *sem)
+{
+    return sem_trywait(sem);
+}
+
+int
+os_sem_post(korp_sem *sem)
+{
+    return sem_post(sem);
+}
+
+int
+os_sem_getvalue(korp_sem *sem, int *sval)
+{
+    return sem_getvalue(sem, sval);
+}
+
+int
+os_sem_unlink(const char *name)
+{
+    return sem_unlink(name);
+}
+
 static void
 msec_nsec_to_abstime(struct timespec *ts, uint64 usec)
 {

+ 1 - 0
core/shared/platform/darwin/platform_internal.h

@@ -56,6 +56,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef sem_t korp_sem;
 
 #define os_thread_local_attribute __thread
 

+ 1 - 0
core/shared/platform/esp-idf/platform_internal.h

@@ -36,6 +36,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef unsigned int korp_sem;
 
 #define BH_APPLET_PRESERVED_STACK_SIZE (2 * BH_KB)
 

+ 85 - 0
core/shared/platform/include/platform_api_extension.h

@@ -195,6 +195,91 @@ os_cond_signal(korp_cond *cond);
 int
 os_cond_broadcast(korp_cond *cond);
 
+/**
+ * Creates a new POSIX-like semaphore or opens an existing
+ * semaphore.  The semaphore is identified by name.  For details of
+ * the construction of name, please refer to
+ * https://man7.org/linux/man-pages/man3/sem_open.3.html.
+ *
+ * @param name semaphore name
+ * @param oflasg specifies flags that control the operation of the call
+ * @param mode permission flags
+ * @param val initial value of the named semaphore.
+ *
+ * @return korp_sem * if success, NULL otherwise
+ */
+korp_sem *
+os_sem_open(const char *name, int oflags, int mode, int val);
+
+/**
+ * Closes the named semaphore referred to by sem,
+ * allowing any resources that the system has allocated to the
+ * calling process for this semaphore to be freed.
+ *
+ * @param sem
+ *
+ * @return 0 if success
+ */
+int
+os_sem_close(korp_sem *sem);
+
+/**
+ * Decrements (locks) the semaphore pointed to by sem.
+ * If the semaphore's value is greater than zero, then the decrement
+ * proceeds, and the function returns, immediately.  If the
+ * semaphore currently has the value zero, then the call blocks
+ * until either it becomes possible to perform the decrement (i.e.,
+ * the semaphore value rises above zero), or a signal handler
+ * interrupts the call.
+ *
+ * @return 0 if success
+ */
+int
+os_sem_wait(korp_sem *sem);
+
+/**
+ * Is the same as sem_wait(), except that if the
+ * decrement cannot be immediately performed, then call returns an
+ * error (errno set to EAGAIN) instead of blocking.
+ *
+ * @return 0 if success
+ */
+int
+os_sem_trywait(korp_sem *sem);
+
+/**
+ * Increments (unlocks) the semaphore pointed to by sem.
+ * If the semaphore's value consequently becomes greater than zero,
+ * then another process or thread blocked in a sem_wait(3) call will
+ * be woken up and proceed to lock the semaphore.
+ *
+ * @return 0 if success
+ */
+int
+os_sem_post(korp_sem *sem);
+
+/**
+ * Places the current value of the semaphore pointed
+ * to sem into the integer pointed to by sval.
+ *
+ * @return 0 if success
+ */
+int
+os_sem_getvalue(korp_sem *sem, int *sval);
+
+/**
+ * Remove the named semaphore referred to by name.
+ * The semaphore name is removed immediately.  The semaphore is
+ * destroyed once all other processes that have the semaphore open
+ * close it.
+ *
+ * @param name semaphore name
+ *
+ * @return 0 if success
+ */
+int
+os_sem_unlink(const char *name);
+
 /****************************************************
  *                     Section 2                    *
  *                   Socket support                 *

+ 1 - 0
core/shared/platform/linux-sgx/platform_internal.h

@@ -50,6 +50,7 @@ typedef pthread_t korp_thread;
 typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
+typedef unsigned int korp_sem;
 
 typedef void (*os_print_function_t)(const char *message);
 void

+ 1 - 0
core/shared/platform/linux/platform_internal.h

@@ -55,6 +55,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef sem_t korp_sem;
 
 #define os_thread_local_attribute __thread
 

+ 2 - 0
core/shared/platform/nuttx/platform_internal.h

@@ -25,6 +25,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <sys/mman.h>
+#include <semaphore.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -38,6 +39,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef sem_t korp_sem;
 
 #define BH_APPLET_PRESERVED_STACK_SIZE (2 * BH_KB)
 

+ 1 - 0
core/shared/platform/riot/platform_internal.h

@@ -38,6 +38,7 @@
 typedef thread_t korp_thread;
 typedef kernel_pid_t korp_tid;
 typedef mutex_t korp_mutex;
+typedef unsigned int korp_sem;
 
 /* typedef sema_t korp_sem; */
 

+ 1 - 0
core/shared/platform/rt-thread/platform_internal.h

@@ -36,6 +36,7 @@ typedef rt_thread_t korp_tid;
 typedef struct rt_mutex korp_mutex;
 typedef struct rt_thread korp_cond;
 typedef struct rt_thread korp_thread;
+typedef unsigned int korp_sem;
 
 typedef rt_uint8_t uint8_t;
 typedef rt_int8_t int8_t;

+ 1 - 0
core/shared/platform/vxworks/platform_internal.h

@@ -54,6 +54,7 @@ typedef pthread_t korp_tid;
 typedef pthread_mutex_t korp_mutex;
 typedef pthread_cond_t korp_cond;
 typedef pthread_t korp_thread;
+typedef sem_t korp_sem;
 
 #define os_thread_local_attribute __thread
 

+ 1 - 0
core/shared/platform/zephyr/platform_internal.h

@@ -49,6 +49,7 @@
 typedef struct k_thread korp_thread;
 typedef korp_thread *korp_tid;
 typedef struct k_mutex korp_mutex;
+typedef unsigned int korp_sem;
 
 struct os_thread_wait_node;
 typedef struct os_thread_wait_node *os_thread_wait_list;

+ 22 - 1
samples/multi-thread/wasm-apps/main.c

@@ -5,9 +5,11 @@
 
 #include <stdio.h>
 #include <pthread.h>
+#include <semaphore.h>
 
 static pthread_mutex_t mutex;
 static pthread_cond_t cond;
+static sem_t *sem;
 
 static void *
 thread(void *arg)
@@ -24,6 +26,7 @@ thread(void *arg)
 
     pthread_cond_signal(&cond);
     pthread_mutex_unlock(&mutex);
+    sem_post(sem);
 
     printf("thread exit \n");
 
@@ -45,10 +48,18 @@ main(int argc, char *argv[])
         goto fail1;
     }
 
+    // O_CREAT and S_IRGRPS_IRGRP | S_IWGRP on linux (glibc), initial value is 0
+
+    if (!(sem = sem_open("tessstsem", 0100, 0x10 | 0x20, 0))) {
+        printf("Failed to open sem. %p\n", sem);
+        goto fail2;
+    }
+
     pthread_mutex_lock(&mutex);
     if (pthread_create(&tid, NULL, thread, &num) != 0) {
         printf("Failed to create thread.\n");
-        goto fail2;
+        pthread_mutex_unlock(&mutex);
+        goto fail3;
     }
 
     printf("cond wait start\n");
@@ -56,12 +67,22 @@ main(int argc, char *argv[])
     pthread_mutex_unlock(&mutex);
     printf("cond wait success.\n");
 
+    if (sem_wait(sem) != 0) {
+        printf("Failed to wait sem.\n");
+    }
+    else {
+        printf("sem wait success.\n");
+    }
+
     if (pthread_join(tid, NULL) != 0) {
         printf("Failed to join thread.\n");
     }
 
     ret = 0;
 
+fail3:
+    sem_close(sem);
+    sem_unlink("tessstsem");
 fail2:
     pthread_cond_destroy(&cond);
 fail1:

+ 44 - 0
wamr-sdk/app/libc-builtin-sysroot/include/semaphore.h

@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2019 Intel Corporation.  All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+ */
+
+#ifndef _WAMR_LIB_SEMAPHORE_H
+#define _WAMR_LIB_SEMAPHORE_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+
+typedef unsigned int sem_t;
+
+/* Semaphore APIs */
+
+sem_t *
+sem_open(const char *name, int oflag, int mode, int val);
+
+int
+sem_wait(sem_t *sem);
+
+int
+sem_trywait(sem_t *sem);
+
+int
+sem_post(sem_t *sem);
+
+int
+sem_getvalue(sem_t *restrict sem, int *sval);
+
+int
+sem_unlink(const char *name);
+
+int
+sem_close(sem_t *sem);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* end of _WAMR_LIB_SEMAPHORE_H */