# iBeacon.py (6.8 KB)
  1. import bluetooth
  class Sender():
      """Build and broadcast an iBeacon advertisement through ``bluetooth.BLE``.

      The beacon fields are stored as lists of byte values in the order they
      are concatenated into the advertisement by ``adv``:
      company id, iBeacon id, 16-byte UUID, Major, Minor, RSSI level.
      """

      def __init__(self):
          self.ble = bluetooth.BLE()
          self._set_defaults()

      def _set_defaults(self):
          """Reset every beacon field to its default value (no BLE access)."""
          # Bluetooth Base UUID: 00000000-0000-1000-8000-00805F9B34FB.
          self._UUID_basic = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00,
                              0x80, 0x00, 0x00, 0x80, 0x5F, 0x9B, 0x34, 0xFB]
          # Work on a copy so editing the current UUID never alters the base.
          self._UUID = list(self._UUID_basic)
          self._Major = [0x00, 0x00]
          self._Minor = [0x00, 0x00]
          self._company_id = [0x4C, 0x00]
          self._ibeacon_id = [0x02, 0x15]
          self._rssi_level = [0xC0]

      def active(self, active_flag=None):
          """Query or change the BLE active state.

          With no argument the result of ``ble.pyi_check_active()`` is
          returned. A positive value (or ``True``) calls
          ``ble.pyi_active(True)``; zero (or ``False``) calls
          ``ble.pyi_active(False)``. A negative value performs no driver call
          and returns ``None``.
          """
          if active_flag is None:
              return self.ble.pyi_check_active()
          if active_flag > 0:
              return self.ble.pyi_active(True)
          if active_flag == 0:
              return self.ble.pyi_active(False)
          return None

      def _require_active(self):
          """Raise ``OSError`` if ``ble._check_active()`` raises anything."""
          try:
              self.ble._check_active()
          except Exception:
              raise OSError("BLE is not active")

      def _get_param(self, name):
          """Return the current value of the beacon field called ``name``.

          ``UUID`` is returned as ``bytes``, ``Major``/``Minor`` as 16-bit
          big-endian integers, ``company_id``/``ibeacon_id`` as lists of byte
          values and ``rssi_level`` as a single raw byte value.

          Raises:
              KeyError: ``name`` is not a known field.
          """
          if name == "UUID":
              return bytes(self._UUID)
          if name == "Major":
              # Two bytes, high byte first: the high byte weighs 256.
              return self._Major[0] * 256 + self._Major[1]
          if name == "Minor":
              return self._Minor[0] * 256 + self._Minor[1]
          if name == "company_id":
              return list(self._company_id)
          if name == "ibeacon_id":
              return list(self._ibeacon_id)
          if name == "rssi_level":
              return self._rssi_level[0]
          raise KeyError(name)

      def _uint16_field(self, value):
          """Encode ``value`` as exactly two big-endian bytes or raise ValueError."""
          encoded = self._uint2list(value)
          if len(encoded) != 2:
              # _uint2list widens to 4 bytes above 0xFFFF, which would change
              # the length of the advertisement payload.
              raise ValueError("value does not fit in 16 bits")
          return encoded

      def config(self, *param_name, **kv):
          """Read one beacon field or update several of them.

          ``config("Major")`` returns the field named by the first positional
          argument (keyword arguments are then ignored). Otherwise every
          recognised keyword (``UUID``, ``Major``, ``Minor``, ``company_id``,
          ``ibeacon_id``, ``rssi_level``) is stored and ``0`` is returned;
          unrecognised keywords are ignored.

          Raises:
              OSError: the BLE active check failed.
              KeyError: unknown field name in read mode.
              ValueError: a supplied value cannot be encoded.
          """
          self._require_active()
          if len(param_name) != 0:
              return self._get_param(param_name[0])

          if "UUID" in kv:
              self._input2UUID(kv["UUID"])
          if "Major" in kv:
              self._Major = self._uint16_field(kv["Major"])
          if "Minor" in kv:
              self._Minor = self._uint16_field(kv["Minor"])
          if "company_id" in kv:
              # NOTE(review): the value is stored high byte first, whereas the
              # default [0x4C, 0x00] looks low byte first - confirm the
              # intended byte order with callers before changing it.
              self._company_id = self._uint16_field(kv["company_id"])
          if "ibeacon_id" in kv:
              self._ibeacon_id = self._uint16_field(kv["ibeacon_id"])
          if "rssi_level" in kv:
              rssi = kv["rssi_level"]
              if isinstance(rssi, int) and -128 <= rssi < 0:
                  # Accept a signed dBm value and store its two's-complement byte.
                  rssi = rssi + 256
              self._rssi_level = self._uint2list(rssi, 1)
          return 0

      def adv(self, interval_us):
          """Start a non-connectable iBeacon advertisement.

          Returns whatever ``ble.gap_advertise`` returns.

          Raises:
              OSError: the BLE active check failed.
          """
          self._require_active()
          # Each entry is [AD type, payload bytes...]; no length byte is
          # included here (presumably added by the driver - TODO confirm).
          flags = [0x01, 0x02]
          ibeacon_data = ([0xFF] + self._company_id + self._ibeacon_id
                          + self._UUID + self._Major + self._Minor
                          + self._rssi_level)
          adv_data = [flags, ibeacon_data]
          # TODO: issue with passing the arguments by keyword (translated
          # from the original note).
          return self.ble.gap_advertise(interval_us, adv_data=adv_data,
                                        connectable=False,
                                        adv_data_append=False)

      def _parse_uuid_str(self, text):
          """Convert ``xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`` into 16 byte values."""
          if len(text) != 36:
              raise ValueError("UUID string must be 36 characters long")
          for pos in (8, 13, 18, 23):
              if text[pos] != '-':
                  raise ValueError("UUID string has a misplaced '-'")
          digits = text.replace("-", "")
          if len(digits) != 32:
              raise ValueError("UUID string must contain 32 hex digits")
          parsed = []
          for i in range(0, 32, 2):
              parsed.append(int(digits[i:i + 2], 16))
          return parsed

      def _input2UUID(self, value):
          """Store ``value`` as the beacon UUID.

          Accepted inputs: ``bytes``/``bytearray`` or ``list`` of 2, 4 or 16
          byte values, an ``int`` holding a 16- or 32-bit UUID, or a 36
          character dashed hex string. 2- and 4-byte UUIDs are expanded on
          top of a copy of the Bluetooth Base UUID.

          Raises:
              ValueError: unsupported type, length or byte value.
          """
          if isinstance(value, (bytes, bytearray)):
              value_list = list(value)
          elif isinstance(value, int):
              value_list = self._uint2list(value)
          elif isinstance(value, list):
              # Copy so later changes to the caller's list do not leak in.
              value_list = list(value)
          elif isinstance(value, str):
              value_list = self._parse_uuid_str(value)
          else:
              raise ValueError("unsupported UUID type")

          for item in value_list:
              if not isinstance(item, int) or item < 0 or item > 255:
                  raise ValueError("UUID items must be integers in 0..255")

          count = len(value_list)
          if count == 16:
              self._UUID = value_list
              return
          if count == 2:
              offset = 2  # a 16-bit UUID occupies bytes 2..3 of the base UUID
          elif count == 4:
              offset = 0  # a 32-bit UUID occupies bytes 0..3 of the base UUID
          else:
              raise ValueError("UUID must be 2, 4 or 16 bytes long")
          expanded = list(self._UUID_basic)
          for i in range(count):
              expanded[offset + i] = value_list[i]
          self._UUID = expanded

      def _uint2list(self, value, bytes_size=2):
          """Encode a non-negative integer as a big-endian list of byte values.

          With the default ``bytes_size`` of 2 the result is 2 bytes for
          values up to 0xFFFF and 4 bytes for values up to 0xFFFFFFFF. Any
          other ``bytes_size`` yields exactly that many bytes.

          Raises:
              ValueError: ``value`` is not an int, is negative, or does not fit.
          """
          if not isinstance(value, int):
              raise ValueError("value must be an integer")
          if value < 0 or bytes_size < 1:
              raise ValueError("value out of range")
          width = bytes_size
          if bytes_size == 2 and value > 0xFFFF:
              width = 4
          if value >= (1 << (8 * width)):
              raise ValueError("value out of range")
          value_list = []
          for shift in range(8 * (width - 1), -8, -8):
              value_list.append((value >> shift) & 0xFF)
          return value_list
  class Receiver():
      """Scan for BLE advertisements and report the iBeacon ones to a callback.

      The user callback registered with ``irq`` is invoked as
      ``callback(event_id, data)``:

      * event 5 (scan result), only for packets recognised as iBeacon, with
        ``data = (addr, uuid, rssi, measured_power, Major, Minor, company_id)``;
      * event 6 (scan done), with the driver's data passed through unchanged.
      """

      def __init__(self) -> None:
          # Define the callback slot before the handler is registered so the
          # handler never runs against a missing attribute.
          self.callback = None
          self._ble = bluetooth.BLE()
          self._ble.irq(self._self_irq)

      def active(self, active_flag=None):
          """Query or change the BLE active state.

          With no argument the result of ``_ble.pyi_check_active()`` is
          returned. A positive value (or ``True``) calls
          ``_ble.pyi_active(True)``; zero (or ``False``) calls
          ``_ble.pyi_active(False)``. A negative value performs no driver
          call and returns ``None``.
          """
          if active_flag is None:
              return self._ble.pyi_check_active()
          if active_flag > 0:
              return self._ble.pyi_active(True)
          if active_flag == 0:
              return self._ble.pyi_active(False)
          return None

      def scan(self, duration_ms, interval_us=1280000, window_us=11250):
          """Start a scan and return whatever ``_ble.gap_scan`` returns.

          ``duration_ms``, ``interval_us`` and ``window_us`` are forwarded to
          the driver in that order.
          """
          return self._ble.gap_scan(duration_ms, interval_us, window_us)

      def irq(self, func):
          """Register ``func`` as the user callback (``None`` disables it)."""
          self.callback = func

      def _self_irq(self, event_id, data):
          """Driver-level handler: filter events and forward them to the callback."""
          if self.callback is None:
              return
          if event_id == 5:  # _IRQ_SCAN_RESULT
              # data is (addr_type, addr, adv_type, rssi, adv_data)
              addr = data[1]
              rssi = data[3]
              payload = data[4]
              # Filter: only iBeacon advertisements reach the user callback.
              if self._is_ibeacon_packet(payload):
                  uuid, measured_power, Major, Minor, company_id = \
                      self._unpack_ibeacon(payload)
                  self.callback(event_id, (addr, uuid, rssi, measured_power,
                                           Major, Minor, company_id))
          elif event_id == 6:  # _IRQ_SCAN_DONE
              self.callback(event_id, data)

      def _unpack_ibeacon(self, value):
          """Split a 30-byte iBeacon advertisement into its fields.

          Returns:
              ``(uuid, measured_power, Major, Minor, company_id)`` where
              ``uuid`` is the slice ``value[9:25]``, ``Major``/``Minor`` are
              16-bit big-endian integers, ``measured_power`` is byte 29 read
              as a signed 8-bit value, and ``company_id`` is bytes 5..6 with
              their order swapped.
          """
          company_id = bytes([value[6], value[5]])
          uuid = value[9:25]
          Major = value[25] * 256 + value[26]
          Minor = value[27] * 256 + value[28]
          raw_power = value[29]
          # Two's-complement decode: only bytes >= 0x80 are negative.
          if raw_power > 127:
              measured_power = raw_power - 256
          else:
              measured_power = raw_power
          return uuid, measured_power, Major, Minor, company_id

      # TODO: the definitive criteria for recognising an iBeacon packet have
      # not been settled yet.
      def _is_ibeacon_packet(self, data):
          """Return True when ``data`` has the iBeacon length and marker bytes.

          Checked: total length 30; bytes 0..1 == 0x02, 0x01; bytes 3..4 ==
          0x1A, 0xFF; bytes 7..8 == 0x02, 0x15. Byte 2 and the company id
          (bytes 5..6) are not checked.
          """
          if len(data) != 30:
              return False
          return (data[0] == 0x02 and data[1] == 0x01
                  and data[3] == 0x1A and data[4] == 0xFF
                  and data[7] == 0x02 and data[8] == 0x15)