ble_cli.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. #
  4. import platform
  5. from utils import hex_str_to_bytes, str_to_bytes
  # Probe for the optional `bleak` package. When it cannot be imported the
  # module falls back to the console-driven client.
  try:
      import bleak
  except ImportError:
      fallback = True
  else:
      fallback = False
  14. # --------------------------------------------------------------------
  def device_sort(device):
      """Sort key for discovered devices: the device's `address` attribute."""
      address = device.address
      return address
  class BLE_Bleak_Client:
      """BLE client built on the `bleak` package.

      Discovers a device (by name, or interactively when no name is given),
      connects to it, locates the provisioning service and builds a lookup
      table from endpoint names to characteristic UUIDs.
      """

      def __init__(self):
          # NOTE: adapter, adapter_props and services are initialised here but
          # are not used by any method of this class.
          self.adapter = None
          self.adapter_props = None
          self.characteristics = dict()   # characteristic UUID -> characteristic
          self.chrc_names = None          # lower-cased endpoint names to look for
          self.device = None              # bleak.BleakClient while connected
          self.devname = None
          self.iface = None
          self.nu_lookup = None           # endpoint name -> characteristic UUID
          self.services = None
          self.srv_uuid_adv = None        # service UUID taken from advertising data
          self.srv_uuid_fallback = None

      async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
          """Discover, connect to and inspect a BLE device.

          Args:
              devname: Name of the device to connect to. When None the user is
                  asked to pick one of the discovered devices.
              iface: Stored on the instance; not otherwise used by this method.
              chrc_names: Endpoint names expected on the provisioning service.
              fallback_srv_uuid: Service UUID to look up when the advertised
                  UUID does not resolve to a service.

          Returns:
              True once the device is connected and its characteristics read.

          Raises:
              RuntimeError: Bluetooth is not ready, the device was not found,
                  or the provisioning service was not found.
          """
          self.devname = devname
          self.srv_uuid_fallback = fallback_srv_uuid
          self.chrc_names = [name.lower() for name in chrc_names]
          self.iface = iface

          # Start from a clean slate so that a retried connect() never sees
          # the advertised UUID or characteristics of an earlier attempt.
          self.srv_uuid_adv = None
          self.nu_lookup = None
          self.characteristics = dict()

          print('Discovering...')
          try:
              devices = await bleak.discover()
          except bleak.exc.BleakDBusError as e:
              if str(e) == '[org.bluez.Error.NotReady] Resource Not Ready':
                  raise RuntimeError('Bluetooth is not ready. Maybe try `bluetoothctl power on`?') from e
              raise

          found_device = None
          if self.devname is None:
              found_device = await self._select_device(devices)
              self.devname = found_device.name
          else:
              # No early exit: when several devices share the name, the last
              # one discovered is used.
              for d in devices:
                  if d.name == self.devname:
                      found_device = d

          if not found_device:
              raise RuntimeError('Device not found')

          uuids = found_device.metadata['uuids']
          # There should be 1 service UUID in advertising data
          # If bluez had cached an old version of the advertisement data
          # the list of uuids may be incorrect, in which case connection
          # or service discovery may fail the first time. If that happens
          # the cache will be refreshed before next retry
          if len(uuids) == 1:
              self.srv_uuid_adv = uuids[0]

          print('Connecting...')
          self.device = bleak.BleakClient(found_device.address)
          await self.device.connect()

          try:
              # must be paired on Windows to access characteristics;
              # cannot be paired on Mac
              if platform.system() == 'Windows':
                  await self.device.pair()

              print('Getting Services...')
              services = await self.device.get_services()
              service = services[self.srv_uuid_adv] or services[self.srv_uuid_fallback]
              if not service:
                  raise RuntimeError('Provisioning service not found')

              nu_lookup = await self._read_endpoint_names(service)
          except Exception:
              # Do not leave an open link or half-filled state behind when
              # anything fails after the connection was established.
              client, self.device = self.device, None
              self.characteristics = dict()
              await client.disconnect()
              raise

          # Create lookup table only if all endpoint names found
          match_found = all(name in nu_lookup for name in self.chrc_names)
          self.nu_lookup = nu_lookup if match_found else None

          return True

      async def _select_device(self, devices):
          """Ask the user to pick one of `devices`; entering 0 triggers a rescan."""
          import sys

          if not devices:
              print('No devices found!')
              # sys.exit rather than the interactive-only `exit` helper
              sys.exit(1)

          while True:
              devices.sort(key=device_sort)
              print('==== BLE Discovery results ====')
              print('{0: >4} {1: <33} {2: <12}'.format(
                  'S.N.', 'Name', 'Address'))
              for number, dev in enumerate(devices, start=1):
                  print('[{0: >2}] {1: <33} {2: <12}'.format(number, dev.name or 'Unknown', dev.address))

              while True:
                  try:
                      select = int(input('Select device by number (0 to rescan) : '))
                      if select < 0 or select > len(devices):
                          raise ValueError
                      break
                  except ValueError:
                      print('Invalid input! Retry')

              if select != 0:
                  return devices[select - 1]

              devices = await bleak.discover()

      async def _read_endpoint_names(self, service):
          """Read the user-description (0x2901) descriptors of `service`.

          Returns a dict mapping each lower-cased description to the UUID of
          the characteristic carrying it, and records those characteristics
          in `self.characteristics`.
          """
          nu_lookup = dict()
          for characteristic in service.characteristics:
              for descriptor in characteristic.descriptors:
                  if descriptor.uuid[4:8] != '2901':
                      continue
                  readval = await self.device.read_gatt_descriptor(descriptor.handle)
                  found_name = ''.join(chr(b) for b in readval).lower()
                  nu_lookup[found_name] = characteristic.uuid
                  self.characteristics[characteristic.uuid] = characteristic
          return nu_lookup

      def get_nu_lookup(self):
          """Return the endpoint-name to UUID table, or None if it was not built."""
          return self.nu_lookup

      def has_characteristic(self, uuid):
          """Return True when `uuid` was recorded during connect()."""
          print('checking for characteristic ' + uuid)
          return uuid in self.characteristics

      async def disconnect(self):
          """Disconnect from the device (unpairing first on Windows) and reset state."""
          if self.device:
              print('Disconnecting...')
              if platform.system() == 'Windows':
                  await self.device.unpair()
              await self.device.disconnect()
              self.device = None
              self.nu_lookup = None
              self.characteristics = dict()

      async def send_data(self, characteristic_uuid, data):
          """Write `data` to the characteristic, then read it back.

          `data` is a str encoded as latin-1 for the write; the bytes read
          back are returned as a str with one character per byte.
          """
          await self.device.write_gatt_char(characteristic_uuid, bytearray(data.encode('latin-1')), True)
          readval = await self.device.read_gatt_char(characteristic_uuid)
          return ''.join(chr(b) for b in readval)
  133. # --------------------------------------------------------------------
  134. # Console based BLE client for Cross Platform support
  class BLE_Console_Client:
      """Console-driven stand-in for the BLE client.

      Every BLE operation is delegated to the user: the class prints what to
      do with an external tool and reads the outcome from standard input.
      """

      @staticmethod
      def _is_yes(resp):
          """Return True for an affirmative answer ('y' or 'Y'), ignoring surrounding whitespace."""
          return resp.strip().lower() == 'y'

      async def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
          """Guide the user through connecting manually.

          Args:
              devname: Device name shown in the prompt; may be None, in which
                  case the prompt omits the name.
              iface: Unused.
              chrc_names: Unused.
              fallback_srv_uuid: Service UUID the user is asked to confirm.

          Returns:
              True when the user confirms both the connection and the service
              UUID, False otherwise.
          """
          print('BLE client is running in console mode')
          print('\tThis could be due to your platform not being supported or dependencies not being met')
          print('\tPlease ensure all pre-requisites are met to run the full fledged client')
          if devname is None:
              # No name was supplied; concatenating None would raise TypeError.
              print('BLECLI >> Please connect to BLE device manually using your tool of choice')
          else:
              print('BLECLI >> Please connect to BLE device `{}` manually using your tool of choice'.format(devname))
          resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
          if not self._is_yes(resp):
              return False
          print('BLECLI >> List available attributes of the connected device')
          resp = input("BLECLI >> Is the service UUID '{}' listed among available attributes? [y/n] ".format(fallback_srv_uuid))
          return self._is_yes(resp)

      def get_nu_lookup(self):
          """Always None: the console client builds no name-to-UUID table."""
          return None

      def has_characteristic(self, uuid):
          """Ask the user whether characteristic `uuid` is present on the device."""
          resp = input("BLECLI >> Is the characteristic UUID '{}' listed among available attributes? [y/n] ".format(uuid))
          return self._is_yes(resp)

      async def disconnect(self):
          """Nothing to do: the connection is managed by the user's own tool."""
          pass

      async def send_data(self, characteristic_uuid, data):
          """Print `data` as hex for the user to write, then read the reply.

          The reply is entered by the user as a hex string and converted with
          `hex_str_to_bytes` before being returned.
          """
          print("BLECLI >> Write following data to characteristic with UUID '{}' :".format(characteristic_uuid))
          print('\t>> ' + str_to_bytes(data).hex())
          print('BLECLI >> Enter data read from characteristic (in hex) :')
          resp = input('\t<< ')
          return hex_str_to_bytes(resp)
  164. # --------------------------------------------------------------------
  165. # Function to get client instance depending upon platform
  def get_client():
      """Build the BLE client: console-based when `fallback` is set, bleak-based otherwise."""
      client_cls = BLE_Console_Client if fallback else BLE_Bleak_Client
      return client_cls()