ble_cli.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # SPDX-FileCopyrightText: 2018-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. #
  4. import platform
  5. from utils import hex_str_to_bytes, str_to_bytes
  6. fallback = True
  7. # Check if required packages are installed
  8. # else fallback to console mode
  9. try:
  10. import bleak
  11. fallback = False
  12. except ImportError:
  13. pass
  14. # --------------------------------------------------------------------
  15. def device_sort(device):
  16. return device[0].address
  17. class BLE_Bleak_Client:
  18. def __init__(self):
  19. self.adapter = None
  20. self.adapter_props = None
  21. self.characteristics = dict()
  22. self.chrc_names = None
  23. self.device = None
  24. self.devname = None
  25. self.iface = None
  26. self.nu_lookup = None
  27. self.services = None
  28. self.srv_uuid_adv = None
  29. self.srv_uuid_fallback = None
  30. async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  31. self.devname = devname
  32. self.srv_uuid_fallback = fallback_srv_uuid
  33. self.chrc_names = [name.lower() for name in chrc_names]
  34. self.iface = iface
  35. print('Discovering...')
  36. try:
  37. discovery = await bleak.BleakScanner.discover(return_adv=True)
  38. devices = list(discovery.values())
  39. except bleak.exc.BleakDBusError as e:
  40. if str(e) == '[org.bluez.Error.NotReady] Resource Not Ready':
  41. raise RuntimeError('Bluetooth is not ready. Maybe try `bluetoothctl power on`?')
  42. raise
  43. found_device = None
  44. if self.devname is None:
  45. if len(devices) == 0:
  46. print('No devices found!')
  47. exit(1)
  48. while True:
  49. devices.sort(key=device_sort)
  50. print('==== BLE Discovery results ====')
  51. print('{0: >4} {1: <33} {2: <12}'.format(
  52. 'S.N.', 'Name', 'Address'))
  53. for i, _ in enumerate(devices):
  54. print('[{0: >2}] {1: <33} {2: <12}'.format(i + 1, devices[i][0].name or 'Unknown', devices[i][0].address))
  55. while True:
  56. try:
  57. select = int(input('Select device by number (0 to rescan) : '))
  58. if select < 0 or select > len(devices):
  59. raise ValueError
  60. break
  61. except ValueError:
  62. print('Invalid input! Retry')
  63. if select != 0:
  64. break
  65. discovery = await bleak.BleakScanner.discover(return_adv=True)
  66. devices = list(discovery.values())
  67. self.devname = devices[select - 1][0].name
  68. found_device = devices[select - 1]
  69. else:
  70. for d in devices:
  71. if d[0].name == self.devname:
  72. found_device = d
  73. if not found_device:
  74. raise RuntimeError('Device not found')
  75. uuids = found_device[1].service_uuids
  76. # There should be 1 service UUID in advertising data
  77. # If bluez had cached an old version of the advertisement data
  78. # the list of uuids may be incorrect, in which case connection
  79. # or service discovery may fail the first time. If that happens
  80. # the cache will be refreshed before next retry
  81. if len(uuids) == 1:
  82. self.srv_uuid_adv = uuids[0]
  83. print('Connecting...')
  84. self.device = bleak.BleakClient(found_device[0].address)
  85. await self.device.connect()
  86. # must be paired on Windows to access characteristics;
  87. # cannot be paired on Mac
  88. if platform.system() == 'Windows':
  89. await self.device.pair()
  90. print('Getting Services...')
  91. services = self.device.services
  92. service = services[self.srv_uuid_adv] or services[self.srv_uuid_fallback]
  93. if not service:
  94. await self.device.disconnect()
  95. self.device = None
  96. raise RuntimeError('Provisioning service not found')
  97. nu_lookup = dict()
  98. for characteristic in service.characteristics:
  99. for descriptor in characteristic.descriptors:
  100. if descriptor.uuid[4:8] != '2901':
  101. continue
  102. readval = await self.device.read_gatt_descriptor(descriptor.handle)
  103. found_name = ''.join(chr(b) for b in readval).lower()
  104. nu_lookup[found_name] = characteristic.uuid
  105. self.characteristics[characteristic.uuid] = characteristic
  106. match_found = True
  107. for name in self.chrc_names:
  108. if name not in nu_lookup:
  109. # Endpoint name not present
  110. match_found = False
  111. break
  112. # Create lookup table only if all endpoint names found
  113. self.nu_lookup = [None, nu_lookup][match_found]
  114. return True
  115. def get_nu_lookup(self):
  116. return self.nu_lookup
  117. def has_characteristic(self, uuid):
  118. print('checking for characteristic ' + uuid)
  119. if uuid in self.characteristics:
  120. return True
  121. return False
  122. async def disconnect(self):
  123. if self.device:
  124. print('Disconnecting...')
  125. if platform.system() == 'Windows':
  126. await self.device.unpair()
  127. await self.device.disconnect()
  128. self.device = None
  129. self.nu_lookup = None
  130. self.characteristics = dict()
  131. async def send_data(self, characteristic_uuid, data):
  132. await self.device.write_gatt_char(characteristic_uuid, bytearray(data.encode('latin-1')), True)
  133. readval = await self.device.read_gatt_char(characteristic_uuid)
  134. return ''.join(chr(b) for b in readval)
  135. # --------------------------------------------------------------------
  136. # Console based BLE client for Cross Platform support
  137. class BLE_Console_Client:
  138. async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  139. print('BLE client is running in console mode')
  140. print('\tThis could be due to your platform not being supported or dependencies not being met')
  141. print('\tPlease ensure all pre-requisites are met to run the full fledged client')
  142. print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
  143. resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
  144. if resp != 'Y' and resp != 'y':
  145. return False
  146. print('BLECLI >> List available attributes of the connected device')
  147. resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
  148. if resp != 'Y' and resp != 'y':
  149. return False
  150. return True
  151. def get_nu_lookup(self):
  152. return None
  153. def has_characteristic(self, uuid):
  154. resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
  155. if resp != 'Y' and resp != 'y':
  156. return False
  157. return True
  158. async def disconnect(self):
  159. pass
  160. async def send_data(self, characteristic_uuid, data):
  161. print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
  162. print('\t>> ' + str_to_bytes(data).hex())
  163. print('BLECLI >> Enter data read from characteristic (in hex) :')
  164. resp = input('\t<< ')
  165. return hex_str_to_bytes(resp)
  166. # --------------------------------------------------------------------
  167. # Function to get client instance depending upon platform
  168. def get_client():
  169. if fallback:
  170. return BLE_Console_Client()
  171. return BLE_Bleak_Client()