# ble_cli.py
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
  15. from __future__ import print_function
  16. import platform
  17. from builtins import input
  18. import utils
  19. from future.utils import iteritems
  20. fallback = True
  21. # Check if platform is Linux and required packages are installed
  22. # else fallback to console mode
  23. if platform.system() == 'Linux':
  24. try:
  25. import time
  26. import dbus
  27. import dbus.mainloop.glib
  28. fallback = False
  29. except ImportError:
  30. pass
  31. # --------------------------------------------------------------------
  32. # BLE client (Linux Only) using Bluez and DBus
  33. class BLE_Bluez_Client:
  34. def __init__(self):
  35. self.adapter_props = None
  36. def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  37. self.devname = devname
  38. self.srv_uuid_fallback = fallback_srv_uuid
  39. self.chrc_names = [name.lower() for name in chrc_names]
  40. self.device = None
  41. self.adapter = None
  42. self.services = None
  43. self.nu_lookup = None
  44. self.characteristics = dict()
  45. self.srv_uuid_adv = None
  46. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  47. bus = dbus.SystemBus()
  48. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  49. objects = manager.GetManagedObjects()
  50. adapter_path = None
  51. for path, interfaces in iteritems(objects):
  52. adapter = interfaces.get('org.bluez.Adapter1')
  53. if adapter is not None:
  54. if path.endswith(iface):
  55. self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
  56. self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
  57. adapter_path = path
  58. break
  59. if self.adapter is None:
  60. raise RuntimeError('Bluetooth adapter not found')
  61. # Power on bluetooth adapter
  62. self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
  63. print('checking if adapter is powered on')
  64. for cnt in range(10, 0, -1):
  65. time.sleep(5)
  66. powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
  67. if powered_on == 1:
  68. # Set adapter props again with powered on value
  69. self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
  70. print('bluetooth adapter powered on')
  71. break
  72. print('number of retries left({})'.format(cnt - 1))
  73. if powered_on == 0:
  74. raise RuntimeError('Failed to starte bluetooth adapter')
  75. # Start discovery if not already discovering
  76. started_discovery = 0
  77. discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
  78. if discovery_val == 0:
  79. print('starting discovery')
  80. self.adapter.StartDiscovery()
  81. # Set as start discovery is called
  82. started_discovery = 1
  83. for cnt in range(10, 0, -1):
  84. time.sleep(5)
  85. discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
  86. if discovery_val == 1:
  87. print('start discovery successful')
  88. break
  89. print('number of retries left ({})'.format(cnt - 1))
  90. if discovery_val == 0:
  91. print('start discovery failed')
  92. raise RuntimeError('Failed to start discovery')
  93. retry = 10
  94. while (retry > 0):
  95. try:
  96. if self.device is None:
  97. print('Connecting...')
  98. # Wait for device to be discovered
  99. time.sleep(5)
  100. connected = self._connect_()
  101. if connected:
  102. print('Connected')
  103. else:
  104. return False
  105. print('Getting Services...')
  106. # Wait for services to be discovered
  107. time.sleep(5)
  108. self._get_services_()
  109. return True
  110. except Exception as e:
  111. print(e)
  112. retry -= 1
  113. print('Retries left', retry)
  114. continue
  115. # Call StopDiscovery() for corresponding StartDiscovery() session
  116. if started_discovery == 1:
  117. print('stopping discovery')
  118. self.adapter.StopDiscovery()
  119. for cnt in range(10, 0, -1):
  120. time.sleep(5)
  121. discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
  122. if discovery_val == 0:
  123. print('stop discovery successful')
  124. break
  125. print('number of retries left ({})'.format(cnt - 1))
  126. if discovery_val == 1:
  127. print('stop discovery failed')
  128. return False
  129. def _connect_(self):
  130. bus = dbus.SystemBus()
  131. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  132. objects = manager.GetManagedObjects()
  133. dev_path = None
  134. for path, interfaces in iteritems(objects):
  135. if 'org.bluez.Device1' not in interfaces:
  136. continue
  137. if interfaces['org.bluez.Device1'].get('Name') == self.devname:
  138. dev_path = path
  139. break
  140. if dev_path is None:
  141. raise RuntimeError('BLE device not found')
  142. try:
  143. self.device = bus.get_object('org.bluez', dev_path)
  144. try:
  145. uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
  146. dbus_interface='org.freedesktop.DBus.Properties')
  147. # There should be 1 service UUID in advertising data
  148. # If bluez had cached an old version of the advertisement data
  149. # the list of uuids may be incorrect, in which case connection
  150. # or service discovery may fail the first time. If that happens
  151. # the cache will be refreshed before next retry
  152. if len(uuids) == 1:
  153. self.srv_uuid_adv = uuids[0]
  154. except dbus.exceptions.DBusException as e:
  155. raise RuntimeError(e)
  156. self.device.Connect(dbus_interface='org.bluez.Device1')
  157. # Check device is connected successfully
  158. for cnt in range(10, 0, -1):
  159. time.sleep(5)
  160. device_conn = self.device.Get(
  161. 'org.bluez.Device1',
  162. 'Connected',
  163. dbus_interface='org.freedesktop.DBus.Properties')
  164. if device_conn == 1:
  165. print('device is connected')
  166. break
  167. print('number of retries left ({})'.format(cnt - 1))
  168. if device_conn == 0:
  169. print('failed to connect device')
  170. return False
  171. return True
  172. except Exception as e:
  173. print(e)
  174. self.device = None
  175. raise RuntimeError('BLE device could not connect')
  176. def _get_services_(self):
  177. bus = dbus.SystemBus()
  178. manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
  179. objects = manager.GetManagedObjects()
  180. service_found = False
  181. for srv_path, srv_interfaces in iteritems(objects):
  182. if 'org.bluez.GattService1' not in srv_interfaces:
  183. continue
  184. if not srv_path.startswith(self.device.object_path):
  185. continue
  186. service = bus.get_object('org.bluez', srv_path)
  187. srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
  188. dbus_interface='org.freedesktop.DBus.Properties')
  189. # If service UUID doesn't match the one found in advertisement data
  190. # then also check if it matches the fallback UUID
  191. if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
  192. continue
  193. nu_lookup = dict()
  194. characteristics = dict()
  195. for chrc_path, chrc_interfaces in iteritems(objects):
  196. if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
  197. continue
  198. if not chrc_path.startswith(service.object_path):
  199. continue
  200. chrc = bus.get_object('org.bluez', chrc_path)
  201. uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
  202. dbus_interface='org.freedesktop.DBus.Properties')
  203. characteristics[uuid] = chrc
  204. for desc_path, desc_interfaces in iteritems(objects):
  205. if 'org.bluez.GattDescriptor1' not in desc_interfaces:
  206. continue
  207. if not desc_path.startswith(chrc.object_path):
  208. continue
  209. desc = bus.get_object('org.bluez', desc_path)
  210. desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
  211. dbus_interface='org.freedesktop.DBus.Properties')
  212. if desc_uuid[4:8] != '2901':
  213. continue
  214. try:
  215. readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
  216. except dbus.exceptions.DBusException as err:
  217. raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
  218. found_name = ''.join(chr(b) for b in readval).lower()
  219. nu_lookup[found_name] = uuid
  220. break
  221. match_found = True
  222. for name in self.chrc_names:
  223. if name not in nu_lookup:
  224. # Endpoint name not present
  225. match_found = False
  226. break
  227. # Create lookup table only if all endpoint names found
  228. self.nu_lookup = [None, nu_lookup][match_found]
  229. self.characteristics = characteristics
  230. service_found = True
  231. # If the service UUID matches that in the advertisement
  232. # we can stop the search now. If it doesn't match, we
  233. # have found the service corresponding to the fallback
  234. # UUID, in which case don't break and keep searching
  235. # for the advertised service
  236. if srv_uuid == self.srv_uuid_adv:
  237. break
  238. if not service_found:
  239. self.device.Disconnect(dbus_interface='org.bluez.Device1')
  240. # Check if device is disconnected successfully
  241. self._check_device_disconnected()
  242. if self.adapter:
  243. self.adapter.RemoveDevice(self.device)
  244. self.device = None
  245. self.nu_lookup = None
  246. self.characteristics = dict()
  247. raise RuntimeError('Provisioning service not found')
  248. def get_nu_lookup(self):
  249. return self.nu_lookup
  250. def has_characteristic(self, uuid):
  251. if uuid in self.characteristics:
  252. return True
  253. return False
  254. def disconnect(self):
  255. if self.device:
  256. self.device.Disconnect(dbus_interface='org.bluez.Device1')
  257. # Check if device is disconnected successfully
  258. self._check_device_disconnected()
  259. if self.adapter:
  260. self.adapter.RemoveDevice(self.device)
  261. self.device = None
  262. self.nu_lookup = None
  263. self.characteristics = dict()
  264. if self.adapter_props:
  265. self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
  266. def _check_device_disconnected(self):
  267. for cnt in range(10, 0, -1):
  268. time.sleep(5)
  269. device_conn = self.device.Get(
  270. 'org.bluez.Device1',
  271. 'Connected',
  272. dbus_interface='org.freedesktop.DBus.Properties')
  273. if device_conn == 0:
  274. print('device disconnected')
  275. break
  276. print('number of retries left ({})'.format(cnt - 1))
  277. if device_conn == 1:
  278. print('failed to disconnect device')
  279. def send_data(self, characteristic_uuid, data):
  280. try:
  281. path = self.characteristics[characteristic_uuid]
  282. except KeyError:
  283. raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
  284. try:
  285. path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
  286. except TypeError: # python3 compatible
  287. path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
  288. except dbus.exceptions.DBusException as e:
  289. raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
  290. try:
  291. readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
  292. except dbus.exceptions.DBusException as e:
  293. raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
  294. return ''.join(chr(b) for b in readval)
  295. # --------------------------------------------------------------------
  296. # Console based BLE client for Cross Platform support
  297. class BLE_Console_Client:
  298. def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
  299. print('BLE client is running in console mode')
  300. print('\tThis could be due to your platform not being supported or dependencies not being met')
  301. print('\tPlease ensure all pre-requisites are met to run the full fledged client')
  302. print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
  303. resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
  304. if resp != 'Y' and resp != 'y':
  305. return False
  306. print('BLECLI >> List available attributes of the connected device')
  307. resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
  308. if resp != 'Y' and resp != 'y':
  309. return False
  310. return True
  311. def get_nu_lookup(self):
  312. return None
  313. def has_characteristic(self, uuid):
  314. resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
  315. if resp != 'Y' and resp != 'y':
  316. return False
  317. return True
  318. def disconnect(self):
  319. pass
  320. def send_data(self, characteristic_uuid, data):
  321. print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
  322. print('\t>> ' + utils.str_to_hexstr(data))
  323. print('BLECLI >> Enter data read from characteristic (in hex) :')
  324. resp = input('\t<< ')
  325. return utils.hexstr_to_str(resp)
  326. # --------------------------------------------------------------------
  327. # Function to get client instance depending upon platform
  328. def get_client():
  329. if fallback:
  330. return BLE_Console_Client()
  331. return BLE_Bluez_Client()