Procházet zdrojové kódy

improvements were made to subscribe function but still have a bit problem

FlintJ před 3 roky
rodič
revize
23b53fe4e0

+ 1 - 1
examples/mqtt/mqtt_connect.py

@@ -1,6 +1,6 @@
 import mqtt
 
-client = mqtt.MQTT('11111',port=1883,clinetID='clientid',username='name_',password='passwd_')
+client = mqtt.MQTT('broker.emqx.io',port=1883,clinetID='clientid',username='name_',password='passwd_')
 
 ret = client.connect()
 print("ret:%d" % ret)

+ 24 - 15
examples/mqtt/mqtt_subscribe.py

@@ -1,28 +1,37 @@
 import mqtt
+import PikaStdDevice
 
-client = mqtt.MQTT('192.168.1.255')
-
-client.setHost('broker.emqx.io')
-client.setPort(1883)
-client.setClientID('123456dddecetdc')
-client.setUsername('test1')
-client.setPassword('aabbccdd')
-client.setVersion('4')
-client.setKeepAlive('10')
+client = mqtt.MQTT('broker.emqx.io',port=1883,clinetID='clientid',username='name_',password='passwd_')
 
 ret = client.connect()
 print("ret:%d" % ret)
 
-client.publish('topic1234', 'hello pikascript')
+def callback0(signal):
+    print("py cb: %s:%s" % (client.recv_topic, client.recv_msg))
 
 
-def callback1(signal):
-    print("py cb: %s:%s" % (client.recv_topic, client.recv_msg))
+ret = client.subscribe('topic_pikapy_qos0', 0, callback0)
+print("ret:%d" % ret)
+# ret = client.subscribe('topic_pikapy_qos1', 1,0)
+# print("ret:%d" % ret)
+# ret = client.subscribe('topic_pikapy_qos2', 2,0)
+# print("ret:%d" % ret)
 
 
-ret = client.subscribe('topic', 1, callback1)
+#sleep wait for recv data
+T = PikaStdDevice.Time()
+T.sleep_s(5)
+    
+client.listSubscribrTopic()
+
+# client.unsubscribe('topic_pikapy_qos0');
+# client.unsubscribe('topic_pikapy_qos1');
+# client.unsubscribe('topic_pikapy_qos2');
 
+T.sleep_s(5)
 client.listSubscribrTopic()
 
-#ret = client.disconnect()
-#print("ret:%d" % ret)
+T.sleep_s(10)
+
+ret = client.disconnect()
+print("ret:%d" % ret)

+ 25 - 19
package/mqtt/_mqtt__MQTT.c

@@ -162,8 +162,8 @@ int _mqtt__MQTT_disconnect(PikaObj* self) {
 // 返 回 值:对象指针
 ///////////////////////////////////////////////////////////////////
 PikaObj* _mqtt__MQTT_listSubscribrTopic(PikaObj* self) {
-    int ret;
     mqtt_client_t* _client = obj_getPtr(self, "_client");
+    int ret;
 
     ret = mqtt_list_subscribe_topic(_client);
     if (ret == 0)
@@ -455,8 +455,9 @@ int _mqtt__MQTT_setWill(PikaObj* self,
 int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
     mqtt_client_t* _client = obj_getPtr(self, "_client");
     int ret;
+    char topic_str[MQTT_TOPIC_LEN_MAX];
 
-    if (strlen(topic) <= 0) {
+    if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) {
         __platform_printf("input topic error\r\n");
         return -1;
     }
@@ -466,22 +467,27 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
         return -1;
     }
     
-    obj_setArg(self, "eventCallBack", cb);
-    /* init event_listener for the first time */
-    if (NULL == g_mqtt_event_listener) {
-        pks_eventLisener_init(&g_mqtt_event_listener);
-    }
-    uint32_t eventId = hash_time33(topic);
-    pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
-
-    ret = mqtt_subscribe(_client, topic, qos, Subscribe_Handler);
+    // obj_setArg(self, "eventCallBack", cb);
+    // /* init event_listener for the first time */
+    // if (NULL == g_mqtt_event_listener) {
+    //     pks_eventLisener_init(&g_mqtt_event_listener);
+    // }
+    // uint32_t eventId = hash_time33(topic);
+    // pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
+
+    //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针
+    memset(topic_str,0,sizeof(topic_str));
+    sprintf(topic_str,"%s",topic);
+    obj_setStr(self, "topic_str", topic);
+    __platform_printf("topic_str:%s \r\n", topic_str);
+    ret = mqtt_subscribe(_client, obj_getStr(self, "topic_str"), qos, Subscribe_Handler);
 
     if (ret == 0) {
-        __platform_printf("MQTT_subscribe Topic :%s OK\r\n", topic);
+        __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
     } else
         __platform_printf("MQTT_subscribe Topic ERROR\r\n");
 
-    return 0;
+    return ret;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -515,12 +521,12 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
 // 返 回 值:0=成功;非0=错误码
 ///////////////////////////////////////////////////////////////////
 void Subscribe_Handler(void* client, message_data_t* msg) {
-    PikaObj* self = ((mqtt_client_t*)client)->user_data;
-    Arg* cb = obj_getArg(self, "callback");
-    obj_setStr(self, "recv_topic", msg->topic_name);
-    obj_setStr(self, "recv_msg", msg->message->payload);
-    pks_eventLisener_sendSignal(g_mqtt_event_listener,
-                                hash_time33(msg->topic_name), 1);
+    // PikaObj* self = ((mqtt_client_t*)client)->user_data;
+    // Arg* cb = obj_getArg(self, "eventCallBack");
+    // obj_setStr(self, "recv_topic", msg->topic_name);
+    // obj_setStr(self, "recv_msg", msg->message->payload);
+    // pks_eventLisener_sendSignal(g_mqtt_event_listener,
+    //                             hash_time33(msg->topic_name), 1);
 
     MQTT_LOG_I("\n>>>------------------");
     MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,

+ 1 - 1
package/mqtt/mqtt.py

@@ -10,7 +10,7 @@ class MQTT(_mqtt._MQTT):
                  password='',
                  version='3.1.1',
                  ca='',
-                 keepalive=60):
+                 keepalive=10):
         super().__init__(ip, port, clinetID,
                          username, password, version,
                          ca, keepalive)

+ 1 - 1
port/linux/package/pikascript/mqtt.py

@@ -10,7 +10,7 @@ class MQTT(_mqtt._MQTT):
                  password='',
                  version='3.1.1',
                  ca='',
-                 keepalive=60):
+                 keepalive=10):
         super().__init__(ip, port, clinetID,
                          username, password, version,
                          ca, keepalive)

+ 25 - 19
port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c

@@ -162,8 +162,8 @@ int _mqtt__MQTT_disconnect(PikaObj* self) {
 // 返 回 值:对象指针
 ///////////////////////////////////////////////////////////////////
 PikaObj* _mqtt__MQTT_listSubscribrTopic(PikaObj* self) {
-    int ret;
     mqtt_client_t* _client = obj_getPtr(self, "_client");
+    int ret;
 
     ret = mqtt_list_subscribe_topic(_client);
     if (ret == 0)
@@ -455,8 +455,9 @@ int _mqtt__MQTT_setWill(PikaObj* self,
 int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
     mqtt_client_t* _client = obj_getPtr(self, "_client");
     int ret;
+    char topic_str[MQTT_TOPIC_LEN_MAX];
 
-    if (strlen(topic) <= 0) {
+    if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) {
         __platform_printf("input topic error\r\n");
         return -1;
     }
@@ -466,22 +467,27 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
         return -1;
     }
     
-    obj_setArg(self, "eventCallBack", cb);
-    /* init event_listener for the first time */
-    if (NULL == g_mqtt_event_listener) {
-        pks_eventLisener_init(&g_mqtt_event_listener);
-    }
-    uint32_t eventId = hash_time33(topic);
-    pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
-
-    ret = mqtt_subscribe(_client, topic, qos, Subscribe_Handler);
+    // obj_setArg(self, "eventCallBack", cb);
+    // /* init event_listener for the first time */
+    // if (NULL == g_mqtt_event_listener) {
+    //     pks_eventLisener_init(&g_mqtt_event_listener);
+    // }
+    // uint32_t eventId = hash_time33(topic);
+    // pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
+
+    //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针
+    memset(topic_str,0,sizeof(topic_str));
+    sprintf(topic_str,"%s",topic);
+    obj_setStr(self, "topic_str", topic);
+    __platform_printf("topic_str:%s \r\n", topic_str);
+    ret = mqtt_subscribe(_client, obj_getStr(self, "topic_str"), qos, Subscribe_Handler);
 
     if (ret == 0) {
-        __platform_printf("MQTT_subscribe Topic :%s OK\r\n", topic);
+        __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
     } else
         __platform_printf("MQTT_subscribe Topic ERROR\r\n");
 
-    return 0;
+    return ret;
 }
 
 ////////////////////////////////////////////////////////////////////
@@ -515,12 +521,12 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
 // 返 回 值:0=成功;非0=错误码
 ///////////////////////////////////////////////////////////////////
 void Subscribe_Handler(void* client, message_data_t* msg) {
-    PikaObj* self = ((mqtt_client_t*)client)->user_data;
-    Arg* cb = obj_getArg(self, "callback");
-    obj_setStr(self, "recv_topic", msg->topic_name);
-    obj_setStr(self, "recv_msg", msg->message->payload);
-    pks_eventLisener_sendSignal(g_mqtt_event_listener,
-                                hash_time33(msg->topic_name), 1);
+    // PikaObj* self = ((mqtt_client_t*)client)->user_data;
+    // Arg* cb = obj_getArg(self, "eventCallBack");
+    // obj_setStr(self, "recv_topic", msg->topic_name);
+    // obj_setStr(self, "recv_msg", msg->message->payload);
+    // pks_eventLisener_sendSignal(g_mqtt_event_listener,
+    //                             hash_time33(msg->topic_name), 1);
 
     MQTT_LOG_I("\n>>>------------------");
     MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,

+ 1 - 1
test/mqtt-test.cpp

@@ -134,7 +134,7 @@ TEST(mqtt, publish) {
 }
 #endif
 
-#if 0
+#if 1
 //! Mqtt subscribe will break the gichub actions, 
 //! if need to test, please enable it manually.
 TEST(mqtt, subscribe) {

+ 1 - 1
test/python/mqtt/mqtt_connect.py

@@ -1,6 +1,6 @@
 import mqtt
 
-client = mqtt.MQTT('11111',port=1883,clinetID='clientid',username='name_',password='passwd_')
+client = mqtt.MQTT('broker.emqx.io',port=1883,clinetID='clientid',username='name_',password='passwd_')
 
 ret = client.connect()
 print("ret:%d" % ret)

+ 24 - 15
test/python/mqtt/mqtt_subscribe.py

@@ -1,28 +1,37 @@
 import mqtt
+import PikaStdDevice
 
-client = mqtt.MQTT('192.168.1.255')
-
-client.setHost('broker.emqx.io')
-client.setPort(1883)
-client.setClientID('123456dddecetdc')
-client.setUsername('test1')
-client.setPassword('aabbccdd')
-client.setVersion('4')
-client.setKeepAlive('10')
+client = mqtt.MQTT('broker.emqx.io',port=1883,clinetID='clientid',username='name_',password='passwd_')
 
 ret = client.connect()
 print("ret:%d" % ret)
 
-client.publish('topic1234', 'hello pikascript')
+def callback0(signal):
+    print("py cb: %s:%s" % (client.recv_topic, client.recv_msg))
 
 
-def callback1(signal):
-    print("py cb: %s:%s" % (client.recv_topic, client.recv_msg))
+ret = client.subscribe('topic_pikapy_qos0', 0, callback0)
+print("ret:%d" % ret)
+# ret = client.subscribe('topic_pikapy_qos1', 1,0)
+# print("ret:%d" % ret)
+# ret = client.subscribe('topic_pikapy_qos2', 2,0)
+# print("ret:%d" % ret)
 
 
-ret = client.subscribe('topic', 1, callback1)
+#sleep wait for recv data
+T = PikaStdDevice.Time()
+T.sleep_s(5)
+    
+client.listSubscribrTopic()
+
+# client.unsubscribe('topic_pikapy_qos0');
+# client.unsubscribe('topic_pikapy_qos1');
+# client.unsubscribe('topic_pikapy_qos2');
 
+T.sleep_s(5)
 client.listSubscribrTopic()
 
-#ret = client.disconnect()
-#print("ret:%d" % ret)
+T.sleep_s(10)
+
+ret = client.disconnect()
+print("ret:%d" % ret)