| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import bluetooth
- FLAG_SRV_BSL = 0x01 # 感器位置服务
- FLAG_SRV_APPEND = 0x02 # 其他附加服务
- SENSOR_LOC_OTHER = 0X00
- SENSOR_LOC_CHEST = 0X01
- SENSOR_LOC_WRIST = 0X02
- SENSOR_LOC_FINGER = 0X03
- SENSOR_LOC_HAND = 0X04
- SENSOR_LOC_EAR_LOBE = 0X05
- SENSOR_LOC_FOOT = 0X06
- class Sender():
- hr_uuid = 0x180D
- hrm_uuid = 0x2A37
- bsl_uuid = 0x2A38 # body sensor location uuid
- hrm_handle = 20
- bsl_handle = 21
- sensor_locations = ["Other","Chest","Wrist","Finger","Hand","Ear Lobe","Foot"]
- def __init__(self):
- self._ble = bluetooth.BLE()
- self._ble.active(1)
- self._ble.config(gap_name = "HeartRate")
- self._ble.config(gap_uuid = bluetooth.UUID(0x180d))
- self._ble.irq(self._self_irq)
- self._callback = None
-
- # 激活
- def active(self,active_flag = None):
- return self._ble.active(active_flag)
-
- # 修改与查询参数
- def config(self,*param_name, **kv):
- try:
- self._ble._check_active()
- except:
- raise OSError
- # 查询参数
- if len(param_name) != 0:
- first_param = param_name[0]
- if first_param == "gap_name":
- return self._ble.config("gap_name")
- elif first_param == "sensor_location":
- return self.sensor_locations[int(self._ble.gatts_read(self.bsl_handle))]
- elif first_param == "hr":
- return int(self._ble.gatts_read(self.hrm_handle)[1])
- else:
- # TODO:未实现
- # return self._ble.config(first_param)
- pass
-
- # 修改参数
- if "gap_name" in kv:
- return self._ble.config(gap_name=kv["gap_name"])
-
- if "sensor_location" in kv:
- return self._ble.gatts_write(self.bsl_handle,kv["sensor_location"])
-
- if "hr" in kv:
- self.update_hr(kv["hr"])
- return 0
-
- # 注册服务
- def register_services(self,FLAG , services_append = None):
- HR_UUID = bluetooth.UUID(self.hr_uuid)
- HR_CHAR = (bluetooth.UUID(self.hrm_uuid), bluetooth.FLAG_NOTIFY,)
- if FLAG & FLAG_SRV_BSL != 0:
- BSL_CHAR = (bluetooth.UUID(self.bsl_uuid), bluetooth.FLAG_READ,)
- # BSL_CHAR = (bluetooth.UUID(self.bsl_uuid + 1), bluetooth.FLAG_READ,)
- HR_SERVICE = (HR_UUID, (HR_CHAR,BSL_CHAR),)
- else :
- HR_SERVICE = (HR_UUID, (HR_CHAR,),)
- services = [HR_SERVICE,]
-
- if FLAG & FLAG_SRV_APPEND != 0 and services_append != None:
- services_append = list(services_append)
- services += services_append
- return self._ble.gatts_register_services(tuple(services))
-
- # 广播
- def advertise(self, interval_us,adv_data=None,resp_data=None, connectable=True, adv_data_append=True):
- return self._ble.gap_advertise(interval_us,adv_data,resp_data, connectable,adv_data_append)
- # 设置回调函数
- def irq(self,func):
- self._callback = func
- # 默认回调函数
- def _self_irq(self,event_id,data):
- if event_id == 1: # 连接
- if self._callback == None:
- print(data)
- pass
- else :
- self._callback(event_id,data)
- elif event_id == 2: # 断开连接
- if self._callback == None: # 默认继续扫描
- print("adv again")
- self._ble.gap_advertise(6250)
- # self._ble.gap_advertise(6250,adv_data=bytearray("adv"),resp_data=bytes([0x12,0x34,0x56]))
- else :
- self._callback(event_id,data)
- elif event_id == 4: # 读请求
- if self._callback == None: # 默认可读
- return 0
- else :
- self._callback(event_id,data)
- elif event_id == 18: # 订阅通知
- if self._callback == None:
- pass
- else:
- self._callback(event_id,data)
- elif event_id == 27: # 连接参数更新
- if self._callback == None:
- print("_IRQ_CONNECTION_UPDATE")
- else:
- self._callback(event_id,data)
- else :
- self._callback(event_id,data)
-
- # 更新心率
- def update_hr(self,hr):
- # TODO: 首字节默认使用0X06, 暂未弄清楚其他数字含义
- return self._ble.gatts_write(self.hrm_handle,[0x06,hr],True)
-
|