Jelajahi Sumber

Merge pull request #43 from SunXofRTT/master

net/request/mqtt修正
Bernard Xiong 7 tahun lalu
induk
melakukan
555d14499e

+ 11 - 17
rtthread-port/jerry_mqtt.c

@@ -1,6 +1,9 @@
 #include "jerry_mqtt.h"
-#include <rtdbg.h>
 
+#ifdef PKG_USING_PAHOMQTT
+
+#include <rtdbg.h>
+#define PIPE_BUFSZ    512
 static bool hasClient = false;
 
 void mqtt_event_callback(const void *args, uint32_t size)
@@ -137,12 +140,11 @@ void mqtt_offline_callback(MQTTClient *c)
 
 void mqtt_client_free_callback(void *native_p)
 {
-    rt_kprintf("=============free call back============\n");
     mqtt_info_t* mqtt_info =  (mqtt_info_t*)native_p;
 
     if(!mqtt_info)
     {
-        rt_kprintf("close >> null\n");
+		return;
     }
     
     if(mqtt_info->client->isconnected == 1)
@@ -150,15 +152,14 @@ void mqtt_client_free_callback(void *native_p)
         MQTT_CMD(mqtt_info->client,"DISCONNECT");
     }
 
-    rt_kprintf("1\n");
     if(mqtt_info->sem)
     {
         rt_sem_delete(mqtt_info->sem);
         mqtt_info->sem = RT_NULL;
+		js_destroy_emitter(mqtt_info->this_value);
     }
-    js_destroy_emitter(mqtt_info->this_value);
+    
 
-    rt_kprintf("2\n");
     if(mqtt_info->close_callback)
     {
         js_remove_callback(mqtt_info->close_callback);
@@ -174,37 +175,30 @@ void mqtt_client_free_callback(void *native_p)
         js_remove_callback(mqtt_info->fun_callback);
     }
 
-    rt_kprintf("3\n");
     if(mqtt_info->client)
     {
-        rt_kprintf("3-1\n");
         if(mqtt_info->client->uri)
         {
             free((void*)mqtt_info->client->uri);
         }
-        rt_kprintf("3-2\n");
         if(mqtt_info->client->buf)
         {
             free(mqtt_info->client->buf);
         }
-        rt_kprintf("3-3\n");
         if(mqtt_info->client->readbuf)
         {
             free(mqtt_info->client->readbuf);
         }
-        rt_kprintf("3-4\n");
         for(int i =0 ; i < MAX_MESSAGE_HANDLERS ; i++)
         {
             if(mqtt_info->client->messageHandlers[i].topicFilter)
             {
-                rt_kprintf("free topic : %s\n",mqtt_info->client->messageHandlers[i].topicFilter);
                 free(mqtt_info->client->messageHandlers[i].topicFilter);
             }
         }
         free(mqtt_info->client);
     }
 
-    rt_kprintf("4\n");
     for(int i =0 ; i < MAX_MESSAGE_HANDLERS ; i++)
     {
         if(mqtt_info->callbackHandler[i].topic)
@@ -214,11 +208,10 @@ void mqtt_client_free_callback(void *native_p)
             mqtt_info->callbackHandler[i].js_func = RT_NULL;
         }
     }
-    rt_kprintf("5\n");
+
     free(mqtt_info);
     
     hasClient = false;
-    rt_kprintf("end\n");
 }
 
 const static jerry_object_native_info_t mqtt_client_info =
@@ -645,7 +638,6 @@ static jerry_value_t mqtt_create_client(int arg_cnt,const jerry_value_t* args)
     jerry_value_t js_client = jerry_create_object();
     mqtt_info_t* client_info = RT_NULL;
 
-    rt_kprintf("hasClient  : %d \n",hasClient);
     if(hasClient == true)
     {
         goto _exit;
@@ -935,4 +927,6 @@ static jerry_value_t jerry_mqtt_init()
     return js_mqtt;
 }
 
-JS_MODULE(mqtt, jerry_mqtt_init)
+JS_MODULE(mqtt, jerry_mqtt_init)
+
+#endif

+ 65 - 29
rtthread-port/jerry_net.c

@@ -1,5 +1,8 @@
 #include "jerry_net.h"
 /* manage the pipe of socket */
+
+#define PIPE_BUFSZ    512
+
 static rt_int32_t pipe_init(struct socket_info *thiz)
 {
     char dname[8];
@@ -192,7 +195,7 @@ void net_event_callback_func(const void *args, uint32_t size)
 
     if (cb_info->js_return != RT_NULL && cb_info->js_return != cb_info->js_target)//&& strcmp(cb_info->event_name, "connection") != 0 )
     {
-        jerry_release_value(cb_info->js_return);;
+        jerry_release_value(cb_info->js_return);
     }
 
     free(cb_info->event_name);
@@ -243,20 +246,39 @@ void net_socket_callback_free(const void *args, uint32_t size)
     get_net_info((void **)&js_socket_info, close_info->js_socket);
     jerry_value_t js_server = js_socket_info->js_server;
 
+    net_writeInfo_t write_info;
+    memset(&write_info, 0, sizeof(net_writeInfo_t));
     if (js_socket_info->readData_thread != RT_NULL)
     {
-        rt_thread_delete(js_socket_info->readData_thread);
-        js_socket_info->readData_thread = RT_NULL;
+        /*Tell readThread to stop reading*/
+
+        write_info.js_data = RT_NULL;
+        write_info.js_callback = RT_NULL;
+        write_info.stop_read = true;
+        write_info.stop_sem = rt_sem_create("stop_read_sem", 1, RT_IPC_FLAG_FIFO);
+        rt_sem_take(write_info.stop_sem, RT_WAITING_FOREVER);
+        write(js_socket_info->pipe_write_fd, &write_info, sizeof(net_writeInfo_t));
     }
-    closesocket(js_socket_info->socket_id);
+
+    if (write_info.stop_sem)
+    {
+        rt_sem_take(write_info.stop_sem, RT_WAITING_FOREVER);
+        closesocket(js_socket_info->socket_id);
+        rt_sem_release(write_info.stop_sem);
+        rt_sem_delete(write_info.stop_sem);
+    }
+    else
+    {
+        closesocket(js_socket_info->socket_id);
+    }
+
+    rt_sem_release(js_socket_info->socket_sem);
     js_socket_info->socket_id = -1;
 
     js_remove_callback(js_socket_info->event_callback);
     js_remove_callback(js_socket_info->fun_callback);
     js_remove_callback(js_socket_info->close_callback);
 
-    pipe_deinit(js_socket_info);
-
     rt_sem_delete(js_socket_info->socket_sem);
     js_socket_info->socket_sem = RT_NULL;
     js_destroy_emitter(close_info->js_socket);
@@ -321,7 +343,6 @@ void net_socket_free_callback(void *native_p)
 
 void net_server_callback_free(const void *args, uint32_t size)
 {
-    rt_kprintf(">> net_server_callback_free \n");
     net_closeInfo_t *close_info = (net_closeInfo_t *)args;
 
     net_serverInfo_t *js_server_info = RT_NULL;
@@ -488,7 +509,7 @@ void net_socket_sendData(net_writeInfo_t *write_info, jerry_value_t js_socket)
     }
 }
 
-static void net_socket_readData_entry(void *p)
+void net_socket_readData_entry(void *p)
 {
     jerry_value_t js_socket = ((net_readInfo_t *)p)->js_socket;
     free(p);
@@ -548,6 +569,12 @@ static void net_socket_readData_entry(void *p)
             static net_writeInfo_t write_info;
             memset(&write_info, 0, sizeof(net_writeInfo_t));
             int bytesRead = read(js_socket_info->pipe_read_fd, &write_info, sizeof(net_writeInfo_t));
+            if (write_info.stop_read && write_info.stop_sem)
+            {
+                pipe_deinit(js_socket_info);
+                rt_sem_release(write_info.stop_sem);
+                break;
+            }
             net_socket_sendData(&write_info, js_socket);
             rt_sem_release(js_socket_info->socket_sem);
         }
@@ -561,6 +588,10 @@ void net_socket_startRead(jerry_value_t js_socket)
     struct socket_info *js_socket_info = RT_NULL;
     get_net_info((void **)&js_socket_info, js_socket);
 
+    if (js_socket_info->socket_id < 0)
+    {
+        return;
+    }
     /*create a thread to read data from server*/
     if (js_socket_info->allowHalfOpen == false)
     {
@@ -579,11 +610,6 @@ void net_socket_startRead(jerry_value_t js_socket)
     event_info->js_return = RT_NULL;
     js_send_callback(js_socket_info->event_callback, event_info, sizeof(net_event_info));
     free(event_info);
-
-    if (js_socket_info->socket_id < 0)
-    {
-        return;
-    }
 }
 
 /*update property : remoteAddress , remoteFamily, remotePort, localAddress and localPort*/
@@ -593,6 +619,7 @@ int net_socket_updateProperty(jerry_value_t js_socket)
     struct socket_info *js_socket_info = RT_NULL;
     get_net_info((void **)&js_socket_info, js_socket);
 
+    rt_kprintf("js_socket_info->socket_id  : %d\n", js_socket_info->socket_id);
     if (js_socket_info->socket_id < 0)
     {
         return 0;
@@ -748,7 +775,7 @@ DECLARE_HANDLER(socket_connect)
         if (ret >= 0)
         {
 __isConnected:
-            if(net_socket_updateProperty(this_value))
+            if (net_socket_updateProperty(this_value))
             {
                 net_socket_startRead(this_value);
             }
@@ -770,6 +797,7 @@ DECLARE_HANDLER(socket_destory)
     struct socket_info *js_socket_info = RT_NULL;
     get_net_info((void **)&js_socket_info, this_value);
 
+    rt_kprintf("socket_destory >> js_socket_info->socket_id  : %d\n", js_socket_info->socket_id);
     if (js_socket_info->socket_id == -1)
     {
         return jerry_create_undefined();
@@ -920,6 +948,8 @@ DECLARE_HANDLER(socket_write)
         {
             write_info.js_callback = RT_NULL;
         }
+        write_info.stop_read = false;
+        write_info.stop_sem = RT_NULL;
         write(js_socket_info->pipe_write_fd, &write_info, sizeof(net_writeInfo_t));
     }
     return jerry_create_undefined();
@@ -1053,13 +1083,13 @@ static void net_server_listen_entry(void *listen_info)
     /*get socket_info*/
     net_serverInfo_t *js_server_info = RT_NULL;
     get_net_info((void **)&js_server_info, js_server);
-    int socket_id = js_server_info->server_id;
+    int server_id = js_server_info->server_id;
     rt_sem_t server_sem = js_server_info->server_sem;
 
     /*start listening and accepting*/
     if (socket > 0)
     {
-        int ret = listen(socket_id, backlog); // start listen
+        int ret = listen(server_id, backlog); // start listen
         struct sockaddr_in client_addr;
         socklen_t addrlen = 1;
 
@@ -1067,7 +1097,7 @@ static void net_server_listen_entry(void *listen_info)
         while (1)
         {
             memset(&client_addr, 0, sizeof(struct sockaddr_in));
-            int client_fd = accept(socket_id, (struct sockaddr *)&client_addr, &addrlen); //accept ths client socket
+            int client_fd = accept(server_id, (struct sockaddr *)&client_addr, &addrlen); //accept ths client socket
             /*judge whether the new client are allow to connect*/
             rt_sem_take(server_sem, RT_WAITING_FOREVER);
 
@@ -1109,8 +1139,10 @@ static void net_server_listen_entry(void *listen_info)
 
                     socket_list_insert(&js_server_info->socket_list, js_socket);
                     client_count = get_socket_list_count(&js_server_info->socket_list);
-                    net_socket_updateProperty(js_socket);
-
+                    if (net_socket_updateProperty(js_socket))
+                    {
+                        net_socket_startRead(js_socket);
+                    }
                     rt_sem_release(server_sem);
                     /*emit 'connention' event*/
                     /*send the new socket to connection listener of js_server*/
@@ -1128,7 +1160,6 @@ static void net_server_listen_entry(void *listen_info)
             {
                 closesocket(client_fd);
             }
-
         }
     }
 }
@@ -1183,8 +1214,11 @@ DECLARE_HANDLER(server_listen)
     net_serverInfo_t *js_server_info = RT_NULL;
     get_net_info((void **)&js_server_info, this_value);
     /*end listen_thread*/
-    //rt_thread_delete(js_server_info->listen_thread);
-    //js_server_info->listen_thread = RT_NULL;
+    if (js_server_info->listen_thread)
+    {
+        rt_thread_delete(js_server_info->listen_thread);
+        js_server_info->listen_thread = RT_NULL;
+    }
     /*the value of option*/
     int port = -1;
     char *host = RT_NULL;
@@ -1316,21 +1350,21 @@ DECLARE_HANDLER(server_listen)
 
         const int on = 1;
         setsockopt(js_server_info->server_id, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-        int ret = bind(js_server_info->server_id, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));;
-        if (ret == 0)
+        int bind_ret = bind(js_server_info->server_id, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));
+        if (bind_ret == 0)
         {
             net_listenInfo_t *listen_info = (net_listenInfo_t *)malloc(sizeof(net_listenInfo_t));
             memset(listen_info, 0, sizeof(net_listenInfo_t));
             listen_info->js_server = this_value;
             listen_info->backlog = backlog;
 
-            struct socket_info *js_socket_info = RT_NULL;
-            get_net_info((void **)&js_socket_info, this_value);
+            net_serverInfo_t *js_server_info = RT_NULL;
+            get_net_info((void **)&js_server_info, this_value);
 
             js_server_info->listen_thread = rt_thread_create("net_listen", net_server_listen_entry, listen_info, 1024, 20, 5);
-            rt_err_t ret = rt_thread_startup(js_server_info->listen_thread);
+            rt_err_t thread_ret = rt_thread_startup(js_server_info->listen_thread);
 
-            if (ret == RT_EOK)
+            if (thread_ret == RT_EOK)
             {
                 /*emit 'listening' event*/
                 net_event_info *event_info = (net_event_info *)malloc(sizeof(net_event_info));
@@ -1340,13 +1374,15 @@ DECLARE_HANDLER(server_listen)
                 event_info->js_target = this_value;
                 event_info->js_return = RT_NULL;
 
-                js_send_callback(js_socket_info->event_callback, event_info, sizeof(net_event_info));
+                js_send_callback(js_server_info->event_callback, event_info, sizeof(net_event_info));
+
                 free(event_info);
             }
         }
     }
 
 __exit:
+    free(host);
     jerry_release_value(js_listen_cb);
     return jerry_create_undefined();
 }

+ 2 - 0
rtthread-port/jerry_net.h

@@ -84,6 +84,8 @@ struct write_data_info
 {
     jerry_value_t js_data;
     jerry_value_t js_callback;
+    bool stop_read;
+    rt_sem_t stop_sem;
 } typedef net_writeInfo_t;
 
 struct event_callback_info

+ 1 - 1
rtthread-port/jerry_request_init.c

@@ -22,7 +22,6 @@ void request_callback_func(const void *args, uint32_t size)
         jerry_release_value(cb_info->data_value);
     }
 
-	free(cb_info->callback_name);
     free(cb_info);
 }
 
@@ -365,6 +364,7 @@ int jerry_request_init(jerry_value_t obj)
 
 static jerry_value_t _jerry_request_init()
 {
+    rt_kprintf(">>>>>>>>>>>>>>>_jerry_request_init<<<<<<<<<<<<<<<<<<<<<\n");
     jerry_value_t js_requset = jerry_create_object();
 
     REGISTER_METHOD_NAME(js_requset, "request", request);