| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911 |
- import _bluetooth
- # 特性标志位
- FLAG_BROADCAST = 0x0001
- FLAG_READ = 0x0002
- FLAG_WRITE_NO_RESPONSE = 0x0004
- FLAG_WRITE = 0x0008
- FLAG_NOTIFY = 0x0010
- FLAG_INDICATE = 0x0020
- FLAG_AUTHENTICATED_SIGNED_WRITE = 0x0040
- # 描述符标志位
- FLAG_DSC_READ = 0x01
- FLAG_DSC_WRITE = 0x02
- FLAG_DSC_READ_ENC = 0x04
- FLAG_DSC_READ_AUTHEN = 0x08
- FLAG_DSC_READ_AUTHOR = 0x10
- FLAG_DSC_WRITE_ENC = 0x20
- FLAG_DSC_WRITE_AUTHEN = 0x40
- FLAG_DSC_WRITE_AUTHOR = 0x80
- FLAG_AUX_WRITE = 0x0100
- FLAG_READ_ENCRYPTED = 0x0200
- FLAG_READ_AUTHENTICATED = 0x0400
- FLAG_READ_AUTHORIZED = 0x0800
- FLAG_WRITE_ENCRYPTED = 0x1000
- FLAG_WRITE_AUTHENTICATED = 0x2000
- FLAG_WRITE_AUTHORIZED = 0x4000
- '''
- 创建具有指定值的 UUID 实例
- 该值可以是
- - 一个 16 位整数. 例如`0x2908`
- - 一个 32 位整数. 例如`0x29081234`
- - 一个 128 位的 UUID 字符串。例如`'6E400001-B5A3-F393-E0A9-E50E24DCCA9E'`.
- '''
- class UUID():
- def __init__(self,value):
- # try:
- self.value,self._UUID_bits = self._UUID_to_bytes(value)
- # except:
- # print("ValueError: Invalid UUID")
- # raise ValueError
-
- def _UUID_to_bytes(self,value):
- value_bytes = []
- UUID_bits = 0
- # try:
- if isinstance(value,bytearray):
- value_bytes = bytes(value.decode()) # 暂不支持bytes(bytearray())
- if len(value_bytes) > 16 :
- value_bytes = []
- # raise ValueError
- raise ValueError
- elif isinstance(value, bytes):
- value_bytes = value
- elif isinstance(value,str):
- # if len(value) == 36 and (value[8] == value[13] == value[18] == value[23] == '-'): #暂不支持连等
- if len(value) == 36 and (value[8] == '-') and (value[13] == '-') and (value[18] == '-') and (value[23] == '-'):
- value_str = value.replace("-","")
- value_list = []
- if len(value_str) == 32:
- for i in range(0, 32, 2):
- value_list.append(int(value_str[i:i+2],16))
- value_bytes = bytes(value_list)
- else :
- raise ValueError
- else :
- raise ValueError
- elif isinstance(value,int):
- value_list = []
- if value >= 0 and value < 65536 : #16 bit uuid
- value_list.append(int(value/256))
- value_list.append(value%256)
- elif value >= 0 and value < 65536 * 65536: #32 bit uuid
- value_list.append(int(value/(65536 * 256)))
- value_list.append(int((value%(65536 * 256))/65536))
- value_list.append(int((value%(65536))/256))
- value_list.append((value%(256)))
- else :
- raise ValueError
- value_bytes = bytes(value_list)
- else:
- raise ValueError
- UUID_bits = len(value_bytes) # bytes的数量
- return value_bytes,UUID_bits
-
- class BLE(_bluetooth.BLE):
- def __init__(self):
- print("BLE init")
- # a = super().__init__()
- self._last_adv_data = "" #广播内容
- self._last_resp_data = "" #回应扫描内容
- self._addr_mode = 0 #地址类型 BLE_OWN_ADDR_PUBLIC,BLE_OWN_ADDR_RANDOM,BLE_OWN_ADDR_RPA_PUBLIC_DEFAULT,BLE_OWN_ADDR_RPA_RANDOM_DEFAULT
- self._callback_func = None #回调函数
- self._basic_value_handle = 20
- self._connectable = True
- self._py2c_dict = {}
- self._c2py_dict = {}
- self._c2value_dict = {}
- self._append_adv_data = True
- a = self.init()
- self.setCallback(self._callback)
- # 测试函数
- def test(self):
- if self.pyi_check_active() == False:
- raise OSError
-
- def test2(self):
- self.test()
- # return self.pyi_test2()s
- def test3(self,connhandle,valuehandle):
- return self.pyi_test3(connhandle,valuehandle)
-
- def test4(self,data):
- pass
-
- def test_call_some_name(self):
- super().test_call_some_name()
- # 检查蓝牙活跃性
- def _check_active(self):
- if self.pyi_check_active() == False:
- raise OSError
-
- '''
- 可选择更改 BLE 无线电的活动状态,并返回当前状态。
- Args:
- - active(bool): 为 1 则开启BLE, 否则关闭BLE; 为空则查询当前BLE状态
-
- Returns:
- - bool:设置状态时,返回操作成功与否; 查询状态时, 返回状态, True为活跃
- '''
- def active(self,active_flag = None ):
- if active_flag == None:
- return self.pyi_check_active()
- else:
- if (active_flag > 0 or active_flag == True):
- return self.pyi_active(True)
- elif (active_flag == 0 or active_flag == False):
- return self.pyi_active(False)
- '''
- 获取或设置 BLE 接口的配置值。
-
- 要获得一个值,参数名称应该被引用为一个字符串,并且一次只查询一个参数。
- 如 `config("gap_name")`
-
- 要设置值,请使用关键字语法,并且一次可以设置一个或多个参数。
- 如 `config(gap_name="nimble")`
- 当前支持的值是:
- - `'mac'`:
- TODO:需要支持设置任意值嘛?有点困难
-
- - `'addr_mode'`:
- - `'gap_name'`:
-
- - `'rxbuf'`:
- TODO:暂未支持, 没找到对应函数
- - `'mtu'`:
- TODO:不明意义
- - `'bond'`:
- TODO: 目前提供的接口不支持
- - `'mitm'`:
- TODO: 目前提供的接口不支持
- - `'io'`:
- TODO:未找到对应函数
- - `'lesure'`:
- TODO:只支持设置
- '''
- def config(self, *param_name, **kv): # a.config(mac="1123",gap_name="test")
- # 获取参数属性 a.config("mac")
- try:
- self._check_active()
- except:
- raise OSError
-
- if len(param_name) != 0 :
- first_param = param_name[0]
- if first_param == "mac":
- return (self.config_addr_mode_get(),self.config_mac_get())
- elif first_param == "addr_mode":
- return self.config_addr_mode_get()
- elif first_param == "gap_name":
- return self.config_gap_name_get()
- # elif first_param == "rxbuf" :
- # self.config_addr_rxbuf_get()
- # elif first_param == "mtu" :
- # self.config_mtu_get()
- # elif first_param == "bond" :
- # self.config_bond_get()
- # elif first_param == "mitm" :
- # self.config_mitm_get()
- # elif first_param == "io":
- # self.config_io_get()
- # elif first_param == "le_secure":
- # self.config_le_secure_get()
- elif first_param == "gap_uuid":
- return self.config_gap_uuid_get()
- else:
- print("ValueError: unknown config param")
- # 设置参数
- if "mac" in kv:
- self.config_mac_update(kv["mac"])
- if ("addr_mode" in kv):
- self.config_addr_mode_update(kv["_addr_mode"])
- if ("gap_name" in kv):
- self.config_gap_name_update(kv["gap_name"])
- # if ("rxbuf" in kv):
- # return self.config_rxbuf_update(kv["rxbuf"])
- # if ("mtu" in kv):
- # return self.config_mtu_update(kv["mtu"])
- # if ("bond" in kv):
- # return self.config_bond_update(kv["bond"])
- # if ("mitm" in kv):
- # return self.config_mitm_update(kv["mitm"])
- # if ("bond" in kv):
- # return self.config_mac_update(kv["bond"])
- # if ("io" in kv):
- # return self.config_io_update(kv["io"])
- # if ("le_secire" in kv):
- # return self.config_le_secire_update(kv["le_secire"])
-
- if ("gap_uuid" in kv):
- uuid = kv["gap_uuid"]
- if isinstance(uuid,UUID):
- self.config_gap_uuid_update(uuid.value,uuid._UUID_bits)
- '''
- 为来自 BLE 堆栈的事件注册回调。
-
- 处理程序采用两个参数,event(在下面的代码中的一个) 和 data(其是值的特定事件元组)。
- '''
- # 回调事件处理函数
- def irq(self,func):
- self._callback_func = func
- return 0
-
- def _callback(self,data):
- event_id = data[0]
- # print("_callback call")
- # print("event_id",event_id)
- # print("data ",data[1])
- if self._callback_func != None:
- if event_id > 100 : #自定义回调事件
- if event_id == 101 : # 建立句柄映射
- ble_value_handles = data[1]
- length = len(ble_value_handles)
- for i in range(length):
- key = self._basic_value_handle + i
- value = ble_value_handles[i]
- # py 映射 c handle
- self._py2c_dict[str(key)] = value
- # c 映射 py handle
- self._c2py_dict[str(value)] = key
- # c handle 映射 value, 默认值为空
- self._c2value_dict[str(value)] = bytes("")
- elif event_id == 102: #nimble蓝牙协议栈读属性
- buf = self._c2value_dict[str(data[1])]
- return len(buf),buf
- else:
- if event_id == 3: # write
- self._c2_change_value(data[2], data[3])
- data = data[:3]
- elif event_id == 4: # read 请求
- rc = self._callback_func(event_id,data[1:])
- value = -99
- length = -99
- if rc == 0: #允许读
- value = self._c2value(data[2])
- length = len(value)
- return rc,length,value
- else:
- return self._callback_func(event_id,data[1:])
- '''
- 以指定的时间间隔(以微秒为单位)开始广播。
- adv_data和resp_data可以是任何实现缓冲协议的类型(例如bytes, bytearray, str)。adv_data包含在所有广播中,并且resp_data被发送以响应主动扫描。
- 注意:如果adv_data(或resp_data)是None, 则传递给前一个调用的数据 gap_advertise将被重新使用。
-
- 要清除广告有效负载, 请传递一个空值bytes, 即b''
- Args:
- - interval_us(int):广播时间间隔, 单位us. 此间隔将向下舍入到最接近的 625us的倍数。要停止广告, 请将interval_us设置 为 None。
- - adv_data(bytes, bytearray, str): 自定义广播数据(添加到默认广播数据后),默认值为none
-
- TODO:1.append模式只能是0XFF数据; 归零模式全部自定义输入
- - resp_data(bytes, bytearray, str): 扫描响应数据,默认值为none
- - connectable(bool): 是否可连接, True 为可连接.
- - adv_data_append: 是否是增加数据
- TODO:存在问题
- '''
- def gap_advertise(self, interval_us,adv_data=None,resp_data=None, connectable=None, adv_data_append=None):
- try:
- self._check_active()
- except:
- raise OSError
-
- if connectable != None:
- self._connectable = connectable
-
- if interval_us is None:
- return self.stop_advertise()
- else:
- # 设置广播载荷
- if adv_data is None: #参数为空,则使用上次数据广播数据
- adv_data = self._last_adv_data
- else :
- if adv_data_append != None: # 设置是否增加数据
- self._append_adv_data = adv_data_append
- if self._append_adv_data == True: # 新增数据
- self._last_adv_data = _to_bytes(adv_data)
- else: # 覆盖数据
- empty_list = []
- for i in range(len(adv_data)):
- empty_list.append(len(adv_data[i]))
- empty_list += adv_data[i]
- # print(empty_list)
- self._last_adv_data = _to_bytes(empty_list)
- # 设置响应载荷
- if resp_data is None:
- resp_data = self._last_resp_data
- else :
- self._last_resp_data = _to_bytes(resp_data)
- # self._adv_data_append = False
- # print("_addr_mode : ",self._addr_mode)
- # print("int(interval_us/625) : ",int(interval_us/625))
- # print("self._connectable: ",self._connectable)
- # print("self._last_adv_data : ",self._last_adv_data)
- # print("len(self._last_adv_data) : ",len(self._last_adv_data))
- # print("adv_data_append : ",self._append_adv_data)
- # print("self._b",self._b)
- # print("self._last_resp_data : ",self._last_resp_data)
- # print("len(self._last_resp_data) : ",len(self._last_resp_data))
- return self.advertise(self._addr_mode,int(interval_us/625),self._connectable,self._last_adv_data,
- len(self._last_adv_data),self._append_adv_data,self._last_resp_data,len(self._last_resp_data))
-
- '''
- 运行持续指定持续时间(以毫秒为单位)的扫描操作。
-
- 要无限期扫描, 请将duration_ms设置为0;要停止扫描,请将duration_ms设置为 None.
- 使用interval_us和window_us可选择配置占空比。扫描器将每interval_us微秒运行window_us 微秒,总共持续duration_ms毫秒。
-
- 默认间隔和窗口分别为 1.28 秒和 11.25 毫秒(后台扫描).
- 对于每个扫描结果, _IRQ_SCAN_RESULT将引发事件, 并带有事件数据。`(addr_type, addr, adv_type, rssi, adv_data)`
- Args:
- - duration_ms(int):扫描持续时间。要无限期扫描, 请将duration_ms设置为0;要停止扫描,请将duration_ms设置为 None.
- - interval_us(int): 扫描间间隔时间。默认值为1280000
- - window_us(int): 扫描窗口时间。默认值为11250
- - active(bool): 是否接受扫描响应。默认值False(不接受)
- Return:
-
- - 操作成功返回 0; 否则返回错误代码
- events:
- - 单个扫描结果触发 _IRQ_SCAN_RESULT
- - 扫描结束触发 _IRQ_SCAN_DONE
-
- '''
- def gap_scan(self, duration_ms, interval_us=1280000, window_us=11250, active=False):
- try:
- self._check_active()
- except:
- raise OSError
-
- self._check_active()
- if duration_ms is None :
- return self.gap_stop_disc()
- else:
- return self.gap_disc(self._addr_mode, duration_ms,int(interval_us/625),int(window_us/625),active)
- '''
- central 设备连接 peripherals 设备
- Args:
- - peer_addr(list,bytes,bytearray): 需要连接的设备的mac地址
- - peer_addr_type(int): 连接设备的地址类型
- - 0x00 - PUBLIC - 使用控制器的公共地址。
- - 0x01 - RANDOM - 使用生成的静态地址。
- - 0x02 - RPA - 使用可解析的私有地址。
- - 0x03 - NRPA - 使用不可解析的私有地址。
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- events:
- - 连接成功触发 CENTRAL设备:_IRQ_PERIPHERAL_CONNECT; PERIPHERAL设备:_IRQ_CENTRAL_CONNECT
- '''
- def gap_connect(self,peer_addr,peer_addr_type, scan_duration_ms=2000):
- try:
- self._check_active()
- except:
- raise OSError
- self._check_active()
- peer_addr_bytes = _to_bytes(peer_addr,6)
- return self.pyi_gap_connect(peer_addr_bytes,peer_addr_type ,scan_duration_ms)
-
- '''
- Args:
- - conn_handle(int): 连接句柄
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- events:
- - 断连成功触发 CENTRAL设备:_IRQ_PERIPHERAL_DISCONNECT; PERIPHERAL设备:_IRQ_CENTRAL_DISCONNECT
- '''
- def gap_disconnect(self, conn_handle):
- try:
- self._check_active()
- except:
- raise OSError
- self._check_active()
- return self.pyi_gap_disconnect(conn_handle)
-
- '''
- 使用指定的服务配置服务器,替换任何现有服务。
- Args:
- - services(嵌套多层tuple):
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- '''
- def gatts_register_services(self, services):
- try:
- self._check_active()
- except:
- raise OSError
-
- self._check_active()
- convert_services = _convert_ble_service_info(services)
- offset = 0
- chr_list = _count_chrs(services) #计算每个服务的特性数量
- all_chr_count = 0
- py_handles = []
- # 计算py handle返回值
- # 计算chr总数量
- for i in range(len(chr_list)):
- py_handles_one_service = []
- all_chr_count += chr_list[i]
- for j in range(chr_list[i]):
- value_handle = self._basic_value_handle + offset
- py_handles_one_service.append(value_handle)
- offset += 1
- py_handles.append(py_handles_one_service)
- rc = self.gatts_register_svcs(convert_services,all_chr_count)
-
- if rc != 0 :
- return rc
- return tuple(py_handles)
-
- '''
- Args:
- - value_handle(int): 蓝牙特性句柄
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- '''
- def gatts_read(self,value_handle):
- try:
- self._check_active()
- except:
- raise OSError
-
- return self._py2value(value_handle)
-
- '''
- Args:
- - value_handle(int): 蓝牙特性句柄
- - data(str,int,bytes,list(元素为小于256大于0的整数)): 修改的数据
- - send_update(bool):是否通知客户端, False为不通知, 默认为False
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- events:
- - send_update 为 True 时, 触发tx事件
- '''
- def gatts_write(self,value_handle, data, send_update=False):
- try:
- self._check_active()
- except:
- raise OSError
-
- print("send_update=", send_update)
- if send_update == False:
- return self._py2_change_value(value_handle,data)
- else :
- rc = self._py2_change_value(value_handle,data)
- if rc != 0:
- return rc
- return self.gatts_chr_updated(self._py2c_dict[str(value_handle)])
-
-
- '''
- 向连接的客户端发送通知请求。
- Args:
- - conn_handle(int): 连接句柄
-
- - value_handle(int): 蓝牙特性句柄
- - data(str,int,bytes,list(元素为小于256大于0的整数)): 发送的数据, 默认值为None
-
- 如果data不是None, 则该值将作为通知的一部分发送给客户端。本地值不会被修改。否则, 如果data是None, 则将发送当前本地值
- Return:
- - (int): 操作成功返回 0; 否则返回错误代码
- events:
- -
- '''
- def gatts_notify(self,conn_handle, value_handle, data=None):
- try:
- self._check_active()
- except:
- raise OSError
-
- self._check_active()
- value = ""
- if data is None:
- value = self._py2value(value_handle)
- c_value_handle = self._py2c_dict[str(value_handle)]
- else :
- if isinstance(data,bytes): #TODO:关注一下数据类型
- value = data
- elif isinstance(data,int):
- value = bytes([data])
- else:
- value = bytes(data)
- c_value_handle = self._py2c_dict[str(value_handle)]
- self.pyi_gatts_notify(conn_handle, c_value_handle,value,len(value))
-
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gatts_indicate(self,conn_handle, value_handle,data=None):
- try:
- self._check_active()
- except:
- raise OSError
-
- value = ""
- if data is None:
- value = self._py2value(value_handle)
- c_value_handle = self._py2c_dict[str(value_handle)]
- else :
- if isinstance(data,bytes):
- value = data
- elif isinstance(data,int):
- value = bytes([data])
- else:
- value = bytes(data)
- c_value_handle = self._py2c_dict[str(value_handle)]
- self.pyi_gatts_indicate(conn_handle, c_value_handle,value,len(value))
-
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gatts_set_buffer(self,value_handle, len, append=False):
- try:
- self._check_active()
- except:
- raise OSError
- # TODO:暂不清楚对照哪个函数
- pass
-
- '''
- 打印全部本地服务
- '''
- def gatts_show_local(self):
- try:
- self._check_active()
- except:
- raise OSError
- return self.pyi_gatts_show_local()
-
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gattc_discover_services(self,conn_handle, uuid:UUID=None):
- try:
- self._check_active()
- except:
- raise OSError
-
- if uuid == None:
- return self.gattc_dis_svcs(conn_handle)
- else :
- return self.gattc_dis_svcs_by_uuid(conn_handle,uuid.value,len(uuid.value))
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gattc_discover_characteristics(self,conn_handle, start_handle, end_handle, uuid:UUID=None):
- try:
- self._check_active()
- except:
- raise OSError
-
- if uuid == None:
- return self.gattc_dis_chrs(conn_handle,start_handle,end_handle)
- else :
- return self.gattc_dis_chrs_by_uuid(conn_handle, start_handle, end_handle,uuid.value,len(uuid.value))
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gattc_discover_descriptors(self,conn_handle, start_handle, end_handle):
- try:
- self._check_active()
- except:
- raise OSError
- return self.gattc_dis_dscs(conn_handle,start_handle, end_handle)
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gattc_read(self,conn_handle, value_handle):
- try:
- self._check_active()
- except:
- raise OSError
- return self.pyi_gattc_read(conn_handle, value_handle)
- '''
- Args:
- -
- Return:
- - 操作是否成功
- 注意:单次传输最多95个byte
- events:
- -
- '''
- def gattc_write(self,conn_handle, value_handle, data, mode = 0):
- try:
- self._check_active()
- except:
- raise OSError
-
- if mode == 0:
- return self.gattc_write_with_no_rsp(conn_handle, value_handle, data, len(data))
- elif mode == 1:
- return self.gattc_write_with_rsp(conn_handle, value_handle, data,len(data))
- '''
- Args:
- -
- Return:
- - 操作是否成功
- events:
- -
- '''
- def gattc_exchange_mtu(self,conn_handle):
- try:
- self._check_active()
- except:
- raise OSError
- return self.pyi_gattc_exchange_mtu(conn_handle)
- def gattc_subscribe(self,conn_handle,value_handle,subscribe = 0):
- try:
- self._check_active()
- except:
- raise OSError
- return self.pyi_gattc_subscribe(conn_handle,value_handle,subscribe)
- # 通过C handle找值
- def _c2value(self, handle):
- return self._c2value_dict[str(handle)]
-
- # 通过PY handle找值
- def _py2value(self,handle):
- c_handlue = self._py2c_dict[str(handle)]
- return self._c2value(c_handlue)
- # return self._c2value(25)
-
- # 通过C handle 改值, value均为bytes类型
- def _c2_change_value(self,handle,value):
- value_bytes = 12
- # self._c2value_dict[str(handle)] = value_bytes
- self._c2value_dict[str(handle)] = value
- return 0
- # 通过py handle 改值
- def _py2_change_value(self,handle,value):
- if len(self._py2c_dict) == 0:
- return -1
- c_handlue = self._py2c_dict[str(handle)]
- value_bytes = _to_bytes(value)
- return self._c2_change_value(c_handlue,value_bytes)
-
- # 将UUID类型转换为字符串
- def _convert_ble_service_info(data):
- new_list = []
- for i in data :
- if isinstance(i,UUID) :
- new_list.append(i.value)
- new_list.append(i._UUID_bits)
- # print(i.value)
- elif isinstance(i, tuple):
- new_list.append(_convert_ble_service_info(i))
- else:
- new_list.append(i)
- # print(i)
- return tuple(new_list)
- def _count_chrs(srvs_tuple):
- srv_count = len(srvs_tuple)
- chr_count = [] # 每个服务的特性数量
- for i in range(srv_count):
- if len(srvs_tuple[i]) > 1 :
- chr_count.append(len(srvs_tuple[i][1]))
- else :
- chr_count.append(0)
- return chr_count
- def _to_bytes(data,size=None):
- data_bytes = []
- if isinstance(data,bytes):
- data_bytes = data
- elif isinstance(data,bytearray):
- data_bytes = bytes(data.decode())
- elif isinstance(data,str):
- data_bytes = bytes(data)
- elif isinstance(data,list):
- data_bytes = bytes(data)
- elif isinstance(data,int):
- data_list = []
- if data == 0:
- data_list.append(0)
- else :
- while data > 0:
- remainder = data % 256
- data_list.insert(0, remainder)
- data //= 256
- data_bytes = bytes(data_list)
- else:
- raise ValueError
-
- if size != None and len(data_bytes) != size:
- raise ValueError
-
- return data_bytes
-
|