本节介绍如何使用 RT-Thread MicroPython 来将设备接入 OneNET 云平台,本次示例使用的接入协议为 MQTT。
urequests 模块和 umqtt.simple 模块,安装方法参考 HttpClient 和 MQTT 章节。设备接入方式和设备接入协议时,因为本次示例使用的是 MQTT 协议,所以要在设备接入方式中选择公开协议,设备接入协议选择 MQTT。本章节将介绍如何将设备接入到 OneNET 云平台上。并且演示一个云平台向设备发送命令,设备向云平台返回命令发送次数的示例。
正式环境注册码记录下来用于注册新的设备。DeviceRT_Thread_Test_Product 的设备已经注册完毕并且上线。$dpimport urequests as requests
from umqtt.simple import MQTTClient
import ujson as json
import time
class Register():
def __init__(self, url='', title='', sn='', mac=''):
self.url = url
self.title = title
self.sn = sn
self.mac = mac
self.sock = None
self.tjson = {}
self.erron = 0
self.key = ''
self.device_id = ''
def regist(self):
assert self.url is not None, "Url is not set"
_, _, host, path = self.url.split('/', 3)
if host == '':
return
device = {"mac":self.mac} if self.sn == '' else {"sn":self.sn}
if self.title != '':
device['title'] = self.title
jdata = json.dumps(device)
resp = requests.post(self.url, data=jdata)
if resp:
self.tjson = resp.json()
if self.tjson['errno'] == 0:
self.key = self.tjson['data']['key']
self.device_id = self.tjson['data']['device_id']
return 0
else:
return -1
class OneNetMqtt:
failed_count = 0
def __init__(self, client_id='', username='', password=''):
self.server = "183.230.40.39"
self.client_id = client_id
self.username = username
self.password = password
self.topic = "topic_sub" # 填入测试 topic
self.mqttClient = MQTTClient(self.client_id, self.server,6002,self.username,self.password)
self.cmd_times = 0 # publish count
def pubData(self, t):
value = {'datastreams':[{"id":"switch","datapoints":[{"value": self.cmd_times }]}]}
jdata = json.dumps(value)
jlen = len(jdata)
bdata = bytearray(jlen+3)
bdata[0] = 1 # publish data in type of json
bdata[1] = int(jlen / 256) # data lenght
bdata[2] = jlen % 256 # data lenght
bdata[3:jlen+4] = jdata.encode('ascii') # json data
print('publish data', str(self.cmd_times + 1))
try:
self.mqttClient.publish('$dp', bdata) # $dp 为特殊系统 topic,可以通过这个 topic 给系统推送信息,但是不能订阅这个 topic
self.cmd_times += 1
self.failed_count = 0
except Exception as ex:
self.failed_count += 1
print('publish failed:', ex.message())
if self.failed_count >= 3:
print('publish failed three times, esp resetting...')
reset()
def sub_callback(self, topic, msg):
print((topic,msg))
cmd = msg.decode('ascii').split(" ")
print('sub_callback')
def connect(self):
self.mqttClient.set_callback(self.sub_callback)
self.mqttClient.connect()
self.mqttClient.subscribe(self.topic)
print("Connected to %s, subscribed to %s topic." % (self.server, self.topic))
try:
while True:
self.mqttClient.check_msg()
print("pubdata")
self.pubData('x')
finally:
self.mqttClient.disconnect()
print('MQTT closed')
def main():
sn = 'RT_Thread_Test_Product' #1、填入设备唯一标识符
title = 'Device' + sn
product_id = 'XXXXX' #2、填入创建设备时获得的产品 ID
regKey = 'XXXXXXXX' #3、填入正式环境注册码
url = 'http://api.heclouds.com/register_de?register_code=' + regKey
reg = Register(url=url, title=title, sn=sn) #根据上面的信息注册设备,如果已经注册不再重复注册
if reg.regist()==0:
MQTT = OneNetMqtt(client_id=reg.device_id, username=product_id, password=reg.key) #开启 MQTT 服务
MQTT.connect()
else:
print('Error: No Client ID!')
if __name__ == "__main__":
main()