BatteryService.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import bluetooth
  2. FLAG_SRV_ONLY = 0x00 # 只有当前主要服务
  3. FLAG_SRV_APPEND = 0x01 # 其他附加服务
  4. class Sender():
  5. BAS_uuid = 0x180F
  6. BL_uuid = 0x2A19
  7. BL_handle = 20
  8. def __init__(self):
  9. self._ble = bluetooth.BLE()
  10. self._ble.active(1)
  11. self._ble.config(gap_name = "BatteryService")
  12. self._ble.config(gap_uuid = bluetooth.UUID(0x180F))
  13. self._ble.irq(self._self_irq)
  14. self._callback = None
  15. # 激活
  16. def active(self,active_flag = None):
  17. return self._ble.active(active_flag)
  18. # 修改与查询参数
  19. def config(self,*param_name, **kv):
  20. # 查询参数
  21. if len(param_name) != 0:
  22. first_param = param_name[0]
  23. if first_param == "gap_name":
  24. return self._ble.config("gap_name")
  25. elif first_param == "bl":
  26. return int(self._ble.gatts_read(self.BL_handle))
  27. else:
  28. return self._ble.config(first_param)
  29. # 修改参数
  30. if "gap_name" in kv:
  31. return self._ble.config(gap_name=kv["gap_name"])
  32. if "bl" in kv:
  33. self.update_bl(kv["bl"])
  34. return 0
  35. # 注册服务
  36. def register_services(self,FLAG, services_append = None):
  37. BAS_UUID = bluetooth.UUID(self.BAS_uuid)
  38. BAS_CHAR = (bluetooth.UUID(self.BL_uuid), bluetooth.FLAG_NOTIFY|bluetooth.FLAG_READ,)
  39. BAS_SERVICE = (BAS_UUID, (BAS_CHAR,),)
  40. services = [BAS_SERVICE,]
  41. if FLAG & FLAG_SRV_APPEND != 0 and services_append != None:
  42. services_append = list(services_append)
  43. services += services_append
  44. return self._ble.gatts_register_services(tuple(services))
  45. # 广播
  46. def advertise(self, interval_us,adv_data=None,resp_data=None, connectable=True, adv_data_append=True):
  47. return self._ble.gap_advertise(interval_us,adv_data,resp_data, connectable,adv_data_append)
  48. # 设置回调函数
  49. def irq(self,func):
  50. self._callback = func
  51. # 默认回调函数
  52. def _self_irq(self,event_id,data):
  53. if event_id == 1: # 连接
  54. if self._callback == None:
  55. print(data)
  56. pass
  57. else :
  58. self._callback(event_id,data)
  59. elif event_id == 2: # 断开连接
  60. if self._callback == None: # 默认继续扫描
  61. print("adv again")
  62. self._ble.gap_advertise(6250)
  63. else :
  64. self._callback(event_id,data)
  65. elif event_id == 4: # 读请求
  66. if self._callback == None: # 默认可读
  67. return 0
  68. else :
  69. self._callback(event_id,data)
  70. elif event_id == 18: # 订阅通知
  71. if self._callback == None:
  72. pass
  73. else:
  74. self._callback(event_id,data)
  75. elif event_id == 27: # 连接参数更新
  76. if self._callback == None:
  77. print("_IRQ_CONNECTION_UPDATE")
  78. else:
  79. self._callback(event_id,data)
  80. else :
  81. if self._callback == None:
  82. pass
  83. else:
  84. self._callback(event_id,data)
  85. # 更新电池
  86. def update_bl(self,bl):
  87. return self._ble.gatts_write(self.BL_handle,bl,True)