ble_cli.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. from __future__ import print_function
  16. import platform
  17. from builtins import input
  18. import utils
  19. from future.utils import iteritems
  20. fallback = True
  21. # Check if platform is Linux and required packages are installed
  22. # else fallback to console mode
  23. if platform.system() == 'Linux':
  24. try:
  25. import time
  26. import dbus
  27. import dbus.mainloop.glib
  28. fallback = False
  29. except ImportError:
  30. pass
  31. # --------------------------------------------------------------------
  32. # BLE client (Linux Only) using Bluez and DBus
  33. class BLE_Bluez_Client:
  34. def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  35. self.devname = devname
  36. self.srv_uuid_fallback = fallback_srv_uuid
  37. self.chrc_names = [name.lower() for name in chrc_names]
  38. self.device = None
  39. self.adapter = None
  40. self.adapter_props = None
  41. self.services = None
  42. self.nu_lookup = None
  43. self.characteristics = dict()
  44. self.srv_uuid_adv = None
  45. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  46. bus = dbus.SystemBus()
  47. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  48. objects = manager.GetManagedObjects()
  49. for path, interfaces in iteritems(objects):
  50. adapter = interfaces.get('org.bluez.Adapter1')
  51. if adapter is not None:
  52. if path.endswith(iface):
  53. self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
  54. self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
  55. break
  56. if self.adapter is None:
  57. raise RuntimeError('Bluetooth adapter not found')
  58. self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
  59. self.adapter.StartDiscovery()
  60. retry = 10
  61. while (retry > 0):
  62. try:
  63. if self.device is None:
  64. print('Connecting...')
  65. # Wait for device to be discovered
  66. time.sleep(5)
  67. self._connect_()
  68. print('Connected')
  69. print('Getting Services...')
  70. # Wait for services to be discovered
  71. time.sleep(5)
  72. self._get_services_()
  73. return True
  74. except Exception as e:
  75. print(e)
  76. retry -= 1
  77. print('Retries left', retry)
  78. continue
  79. self.adapter.StopDiscovery()
  80. return False
  81. def _connect_(self):
  82. bus = dbus.SystemBus()
  83. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  84. objects = manager.GetManagedObjects()
  85. dev_path = None
  86. for path, interfaces in iteritems(objects):
  87. if 'org.bluez.Device1' not in interfaces:
  88. continue
  89. if interfaces['org.bluez.Device1'].get('Name') == self.devname:
  90. dev_path = path
  91. break
  92. if dev_path is None:
  93. raise RuntimeError('BLE device not found')
  94. try:
  95. self.device = bus.get_object('org.bluez', dev_path)
  96. try:
  97. uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
  98. dbus_interface='org.freedesktop.DBus.Properties')
  99. # There should be 1 service UUID in advertising data
  100. # If bluez had cached an old version of the advertisement data
  101. # the list of uuids may be incorrect, in which case connection
  102. # or service discovery may fail the first time. If that happens
  103. # the cache will be refreshed before next retry
  104. if len(uuids) == 1:
  105. self.srv_uuid_adv = uuids[0]
  106. except dbus.exceptions.DBusException as e:
  107. print(e)
  108. self.device.Connect(dbus_interface='org.bluez.Device1')
  109. except Exception as e:
  110. print(e)
  111. self.device = None
  112. raise RuntimeError('BLE device could not connect')
  113. def _get_services_(self):
  114. bus = dbus.SystemBus()
  115. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  116. objects = manager.GetManagedObjects()
  117. service_found = False
  118. for srv_path, srv_interfaces in iteritems(objects):
  119. if 'org.bluez.GattService1' not in srv_interfaces:
  120. continue
  121. if not srv_path.startswith(self.device.object_path):
  122. continue
  123. service = bus.get_object('org.bluez', srv_path)
  124. srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
  125. dbus_interface='org.freedesktop.DBus.Properties')
  126. # If service UUID doesn't match the one found in advertisement data
  127. # then also check if it matches the fallback UUID
  128. if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
  129. continue
  130. nu_lookup = dict()
  131. characteristics = dict()
  132. for chrc_path, chrc_interfaces in iteritems(objects):
  133. if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
  134. continue
  135. if not chrc_path.startswith(service.object_path):
  136. continue
  137. chrc = bus.get_object('org.bluez', chrc_path)
  138. uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
  139. dbus_interface='org.freedesktop.DBus.Properties')
  140. characteristics[uuid] = chrc
  141. for desc_path, desc_interfaces in iteritems(objects):
  142. if 'org.bluez.GattDescriptor1' not in desc_interfaces:
  143. continue
  144. if not desc_path.startswith(chrc.object_path):
  145. continue
  146. desc = bus.get_object('org.bluez', desc_path)
  147. desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
  148. dbus_interface='org.freedesktop.DBus.Properties')
  149. if desc_uuid[4:8] != '2901':
  150. continue
  151. try:
  152. readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
  153. except dbus.exceptions.DBusException:
  154. break
  155. found_name = ''.join(chr(b) for b in readval).lower()
  156. nu_lookup[found_name] = uuid
  157. break
  158. match_found = True
  159. for name in self.chrc_names:
  160. if name not in nu_lookup:
  161. # Endpoint name not present
  162. match_found = False
  163. break
  164. # Create lookup table only if all endpoint names found
  165. self.nu_lookup = [None, nu_lookup][match_found]
  166. self.characteristics = characteristics
  167. service_found = True
  168. # If the service UUID matches that in the advertisement
  169. # we can stop the search now. If it doesn't match, we
  170. # have found the service corresponding to the fallback
  171. # UUID, in which case don't break and keep searching
  172. # for the advertised service
  173. if srv_uuid == self.srv_uuid_adv:
  174. break
  175. if not service_found:
  176. self.device.Disconnect(dbus_interface='org.bluez.Device1')
  177. if self.adapter:
  178. self.adapter.RemoveDevice(self.device)
  179. self.device = None
  180. self.nu_lookup = None
  181. self.characteristics = dict()
  182. raise RuntimeError('Provisioning service not found')
  183. def get_nu_lookup(self):
  184. return self.nu_lookup
  185. def has_characteristic(self, uuid):
  186. if uuid in self.characteristics:
  187. return True
  188. return False
  189. def disconnect(self):
  190. if self.device:
  191. self.device.Disconnect(dbus_interface='org.bluez.Device1')
  192. if self.adapter:
  193. self.adapter.RemoveDevice(self.device)
  194. self.device = None
  195. self.nu_lookup = None
  196. self.characteristics = dict()
  197. if self.adapter_props:
  198. self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
  199. def send_data(self, characteristic_uuid, data):
  200. try:
  201. path = self.characteristics[characteristic_uuid]
  202. except KeyError:
  203. raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
  204. try:
  205. path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
  206. except TypeError: # python3 compatible
  207. path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
  208. except dbus.exceptions.DBusException as e:
  209. raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
  210. try:
  211. readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
  212. except dbus.exceptions.DBusException as e:
  213. raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
  214. return ''.join(chr(b) for b in readval)
  215. # --------------------------------------------------------------------
  216. # Console based BLE client for Cross Platform support
  217. class BLE_Console_Client:
  218. def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  219. print('BLE client is running in console mode')
  220. print('\tThis could be due to your platform not being supported or dependencies not being met')
  221. print('\tPlease ensure all pre-requisites are met to run the full fledged client')
  222. print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
  223. resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
  224. if resp != 'Y' and resp != 'y':
  225. return False
  226. print('BLECLI >> List available attributes of the connected device')
  227. resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
  228. if resp != 'Y' and resp != 'y':
  229. return False
  230. return True
  231. def get_nu_lookup(self):
  232. return None
  233. def has_characteristic(self, uuid):
  234. resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
  235. if resp != 'Y' and resp != 'y':
  236. return False
  237. return True
  238. def disconnect(self):
  239. pass
  240. def send_data(self, characteristic_uuid, data):
  241. print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
  242. print('\t>> ' + utils.str_to_hexstr(data))
  243. print('BLECLI >> Enter data read from characteristic (in hex) :')
  244. resp = input('\t<< ')
  245. return utils.hexstr_to_str(resp)
  246. # --------------------------------------------------------------------
  247. # Function to get client instance depending upon platform
  248. def get_client():
  249. if fallback:
  250. return BLE_Console_Client()
  251. return BLE_Bluez_Client()