| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from __future__ import print_function
- import platform
- from builtins import input
- import utils
- from future.utils import iteritems
- fallback = True
- # Check if platform is Linux and required packages are installed
- # else fallback to console mode
- if platform.system() == 'Linux':
- try:
- import time
- import dbus
- import dbus.mainloop.glib
- fallback = False
- except ImportError:
- pass
- # --------------------------------------------------------------------
- # BLE client (Linux Only) using Bluez and DBus
- class BLE_Bluez_Client:
- def __init__(self):
- self.adapter_props = None
- def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
- self.devname = devname
- self.srv_uuid_fallback = fallback_srv_uuid
- self.chrc_names = [name.lower() for name in chrc_names]
- self.device = None
- self.adapter = None
- self.services = None
- self.nu_lookup = None
- self.characteristics = dict()
- self.srv_uuid_adv = None
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- bus = dbus.SystemBus()
- manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
- objects = manager.GetManagedObjects()
- adapter_path = None
- for path, interfaces in iteritems(objects):
- adapter = interfaces.get('org.bluez.Adapter1')
- if adapter is not None:
- if path.endswith(iface):
- self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
- self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
- adapter_path = path
- break
- if self.adapter is None:
- raise RuntimeError('Bluetooth adapter not found')
- # Power on bluetooth adapter
- self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
- print('checking if adapter is powered on')
- for cnt in range(10, 0, -1):
- time.sleep(5)
- powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
- if powered_on == 1:
- # Set adapter props again with powered on value
- self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
- print('bluetooth adapter powered on')
- break
- print('number of retries left({})'.format(cnt - 1))
- if powered_on == 0:
- raise RuntimeError('Failed to starte bluetooth adapter')
- # Start discovery if not already discovering
- started_discovery = 0
- discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
- if discovery_val == 0:
- print('starting discovery')
- self.adapter.StartDiscovery()
- # Set as start discovery is called
- started_discovery = 1
- for cnt in range(10, 0, -1):
- time.sleep(5)
- discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
- if discovery_val == 1:
- print('start discovery successful')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if discovery_val == 0:
- print('start discovery failed')
- raise RuntimeError('Failed to start discovery')
- retry = 10
- while (retry > 0):
- try:
- if self.device is None:
- print('Connecting...')
- # Wait for device to be discovered
- time.sleep(5)
- connected = self._connect_()
- if connected:
- print('Connected')
- else:
- return False
- print('Getting Services...')
- # Wait for services to be discovered
- time.sleep(5)
- self._get_services_()
- return True
- except Exception as e:
- print(e)
- retry -= 1
- print('Retries left', retry)
- continue
- # Call StopDiscovery() for corresponding StartDiscovery() session
- if started_discovery == 1:
- print('stopping discovery')
- self.adapter.StopDiscovery()
- for cnt in range(10, 0, -1):
- time.sleep(5)
- discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
- if discovery_val == 0:
- print('stop discovery successful')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if discovery_val == 1:
- print('stop discovery failed')
- return False
- def _connect_(self):
- bus = dbus.SystemBus()
- manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
- objects = manager.GetManagedObjects()
- dev_path = None
- for path, interfaces in iteritems(objects):
- if 'org.bluez.Device1' not in interfaces:
- continue
- if interfaces['org.bluez.Device1'].get('Name') == self.devname:
- dev_path = path
- break
- if dev_path is None:
- raise RuntimeError('BLE device not found')
- try:
- self.device = bus.get_object('org.bluez', dev_path)
- try:
- uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
- dbus_interface='org.freedesktop.DBus.Properties')
- # There should be 1 service UUID in advertising data
- # If bluez had cached an old version of the advertisement data
- # the list of uuids may be incorrect, in which case connection
- # or service discovery may fail the first time. If that happens
- # the cache will be refreshed before next retry
- if len(uuids) == 1:
- self.srv_uuid_adv = uuids[0]
- except dbus.exceptions.DBusException as e:
- raise RuntimeError(e)
- self.device.Connect(dbus_interface='org.bluez.Device1')
- # Check device is connected successfully
- for cnt in range(10, 0, -1):
- time.sleep(5)
- device_conn = self.device.Get(
- 'org.bluez.Device1',
- 'Connected',
- dbus_interface='org.freedesktop.DBus.Properties')
- if device_conn == 1:
- print('device is connected')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if device_conn == 0:
- print('failed to connect device')
- return False
- return True
- except Exception as e:
- print(e)
- self.device = None
- raise RuntimeError('BLE device could not connect')
- def _get_services_(self):
- bus = dbus.SystemBus()
- manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
- objects = manager.GetManagedObjects()
- service_found = False
- for srv_path, srv_interfaces in iteritems(objects):
- if 'org.bluez.GattService1' not in srv_interfaces:
- continue
- if not srv_path.startswith(self.device.object_path):
- continue
- service = bus.get_object('org.bluez', srv_path)
- srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
- dbus_interface='org.freedesktop.DBus.Properties')
- # If service UUID doesn't match the one found in advertisement data
- # then also check if it matches the fallback UUID
- if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
- continue
- nu_lookup = dict()
- characteristics = dict()
- for chrc_path, chrc_interfaces in iteritems(objects):
- if 'org.bluez.GattCharacteristic1' not in chrc_interfaces:
- continue
- if not chrc_path.startswith(service.object_path):
- continue
- chrc = bus.get_object('org.bluez', chrc_path)
- uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
- dbus_interface='org.freedesktop.DBus.Properties')
- characteristics[uuid] = chrc
- for desc_path, desc_interfaces in iteritems(objects):
- if 'org.bluez.GattDescriptor1' not in desc_interfaces:
- continue
- if not desc_path.startswith(chrc.object_path):
- continue
- desc = bus.get_object('org.bluez', desc_path)
- desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
- dbus_interface='org.freedesktop.DBus.Properties')
- if desc_uuid[4:8] != '2901':
- continue
- try:
- readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
- except dbus.exceptions.DBusException as err:
- raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
- found_name = ''.join(chr(b) for b in readval).lower()
- nu_lookup[found_name] = uuid
- break
- match_found = True
- for name in self.chrc_names:
- if name not in nu_lookup:
- # Endpoint name not present
- match_found = False
- break
- # Create lookup table only if all endpoint names found
- self.nu_lookup = [None, nu_lookup][match_found]
- self.characteristics = characteristics
- service_found = True
- # If the service UUID matches that in the advertisement
- # we can stop the search now. If it doesn't match, we
- # have found the service corresponding to the fallback
- # UUID, in which case don't break and keep searching
- # for the advertised service
- if srv_uuid == self.srv_uuid_adv:
- break
- if not service_found:
- self.device.Disconnect(dbus_interface='org.bluez.Device1')
- # Check if device is disconnected successfully
- self._check_device_disconnected()
- if self.adapter:
- self.adapter.RemoveDevice(self.device)
- self.device = None
- self.nu_lookup = None
- self.characteristics = dict()
- raise RuntimeError('Provisioning service not found')
- def get_nu_lookup(self):
- return self.nu_lookup
- def has_characteristic(self, uuid):
- if uuid in self.characteristics:
- return True
- return False
- def disconnect(self):
- if self.device:
- self.device.Disconnect(dbus_interface='org.bluez.Device1')
- # Check if device is disconnected successfully
- self._check_device_disconnected()
- if self.adapter:
- self.adapter.RemoveDevice(self.device)
- self.device = None
- self.nu_lookup = None
- self.characteristics = dict()
- if self.adapter_props:
- self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
- def _check_device_disconnected(self):
- for cnt in range(10, 0, -1):
- time.sleep(5)
- device_conn = self.device.Get(
- 'org.bluez.Device1',
- 'Connected',
- dbus_interface='org.freedesktop.DBus.Properties')
- if device_conn == 0:
- print('device disconnected')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if device_conn == 1:
- print('failed to disconnect device')
- def send_data(self, characteristic_uuid, data):
- try:
- path = self.characteristics[characteristic_uuid]
- except KeyError:
- raise RuntimeError('Invalid characteristic : ' + characteristic_uuid)
- try:
- path.WriteValue([ord(c) for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
- except TypeError: # python3 compatible
- path.WriteValue([c for c in data], {}, dbus_interface='org.bluez.GattCharacteristic1')
- except dbus.exceptions.DBusException as e:
- raise RuntimeError('Failed to write value to characteristic ' + characteristic_uuid + ': ' + str(e))
- try:
- readval = path.ReadValue({}, dbus_interface='org.bluez.GattCharacteristic1')
- except dbus.exceptions.DBusException as e:
- raise RuntimeError('Failed to read value from characteristic ' + characteristic_uuid + ': ' + str(e))
- return ''.join(chr(b) for b in readval)
- # --------------------------------------------------------------------
- # Console based BLE client for Cross Platform support
- class BLE_Console_Client:
- def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
- print('BLE client is running in console mode')
- print('\tThis could be due to your platform not being supported or dependencies not being met')
- print('\tPlease ensure all pre-requisites are met to run the full fledged client')
- print('BLECLI >> Please connect to BLE device `' + devname + '` manually using your tool of choice')
- resp = input('BLECLI >> Was the device connected successfully? [y/n] ')
- if resp != 'Y' and resp != 'y':
- return False
- print('BLECLI >> List available attributes of the connected device')
- resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
- if resp != 'Y' and resp != 'y':
- return False
- return True
- def get_nu_lookup(self):
- return None
- def has_characteristic(self, uuid):
- resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
- if resp != 'Y' and resp != 'y':
- return False
- return True
- def disconnect(self):
- pass
- def send_data(self, characteristic_uuid, data):
- print("BLECLI >> Write following data to characteristic with UUID '" + characteristic_uuid + "' :")
- print('\t>> ' + utils.str_to_hexstr(data))
- print('BLECLI >> Enter data read from characteristic (in hex) :')
- resp = input('\t<< ')
- return utils.hexstr_to_str(resp)
- # --------------------------------------------------------------------
- # Function to get client instance depending upon platform
- def get_client():
- if fallback:
- return BLE_Console_Client()
- return BLE_Bluez_Client()
|