| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712 |
- #!/usr/bin/env python
- #
- # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # DBus-Bluez BLE library
- from __future__ import print_function
- import sys
- import time
- try:
- import dbus
- import dbus.mainloop.glib
- from gi.repository import GLib
- except ImportError as e:
- if 'linux' not in sys.platform:
- raise e
- print(e)
- print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
- print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
- raise
- from . import lib_gap, lib_gatt
- BLUEZ_SERVICE_NAME = 'org.bluez'
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- ADAPTER_IFACE = 'org.bluez.Adapter1'
- DEVICE_IFACE = 'org.bluez.Device1'
- GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
- LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
- GATT_SERVICE_IFACE = 'org.bluez.GattService1'
- GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
- class DBusException(dbus.exceptions.DBusException):
- pass
- class Characteristic:
- def __init__(self):
- self.iface = None
- self.path = None
- self.props = None
- class Service:
- def __init__(self):
- self.iface = None
- self.path = None
- self.props = None
- self.chars = []
- class Device:
- def __init__(self):
- self.iface = None
- self.path = None
- self.props = None
- self.name = None
- self.addr = None
- self.services = []
- class Adapter:
- def __init__(self):
- self.iface = None
- self.path = None
- self.props = None
- class BLE_Bluez_Client:
- def __init__(self, iface=None):
- self.bus = None
- self.hci_iface = iface
- self.adapter = Adapter()
- self.device = None
- self.gatt_app = None
- self.gatt_mgr = None
- self.mainloop = None
- self.loop_cnt = 0
- try:
- dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
- self.bus = dbus.SystemBus()
- except dbus.exceptions.DBusException as dbus_err:
- raise DBusException('Failed to initialise client: {}'.format(dbus_err))
- except Exception as err:
- raise Exception('Failed to initialise client: {}'.format(err))
- def __del__(self):
- try:
- # Cleanup
- self.disconnect()
- print('Test Exit')
- except Exception as e:
- print(e)
- def set_adapter(self):
- '''
- Discover Bluetooth Adapter
- Power On Bluetooth Adapter
- '''
- try:
- print('discovering adapter')
- dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- dbus_objs = dbus_obj_mgr.GetManagedObjects()
- for path, interfaces in dbus_objs.items():
- adapter = interfaces.get(ADAPTER_IFACE)
- if adapter is not None and path.endswith(self.hci_iface):
- self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
- self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
- self.adapter.path = path
- break
- if self.adapter.iface is None:
- print('bluetooth adapter not found')
- return False
- print('bluetooth adapter discovered')
- print('checking if bluetooth adapter is already powered on')
- # Check if adapter is already powered on
- powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
- if powered == 1:
- print('adapter already powered on')
- return True
- # Power On Adapter
- print('powering on adapter')
- self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1))
- # Check if adapter is powered on
- print('checking if adapter is powered on')
- for cnt in range(10, 0, -1):
- time.sleep(5)
- powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
- if powered_on == 1:
- # Set adapter props again with powered on value
- self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE)
- print('bluetooth adapter powered on')
- return True
- print('number of retries left({})'.format(cnt - 1))
- # Adapter not powered on
- print('bluetooth adapter not powered on')
- return False
- except Exception as err:
- raise Exception('Failed to set adapter: {}'.format(err))
- def connect(self, devname=None, devaddr=None):
- '''
- Start Discovery and Connect to the device
- '''
- try:
- device_found = None
- start_discovery = False
- self.device = Device()
- discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
- # Start Discovery
- if discovery_val == 0:
- print('starting discovery')
- self.adapter.iface.StartDiscovery()
- start_discovery = True
- for cnt in range(10, 0, -1):
- time.sleep(5)
- discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
- if discovery_val == 1:
- print('start discovery successful')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if discovery_val == 0:
- print('start discovery failed')
- return False
- # Get device
- for cnt in range(10, 0, -1):
- # Wait for device to be discovered
- time.sleep(5)
- device_found = self.get_device(
- devname=devname,
- devaddr=devaddr)
- if device_found:
- break
- # Retry
- print('number of retries left ({})'.format(cnt - 1))
- if not device_found:
- print('expected device {} [ {} ] not found'.format(devname, devaddr))
- return False
- # Connect to expected device found
- print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr))
- self.device.iface.Connect(dbus_interface=DEVICE_IFACE)
- for cnt in range(10, 0, -1):
- time.sleep(5)
- connected = self.device.props.Get(DEVICE_IFACE, 'Connected')
- if connected == 1:
- # Set device props again with connected on value
- self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE)
- print('connected to device with iface {}'.format(self.device.path))
- return True
- print('number of retries left({})'.format(cnt - 1))
- # Device not connected
- print('connection to device failed')
- return False
- except Exception as err:
- raise Exception('Connect to device failed : {}'.format(err))
- finally:
- try:
- if start_discovery:
- print('stopping discovery')
- self.adapter.iface.StopDiscovery()
- for cnt in range(10, 0, -1):
- time.sleep(5)
- discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
- if discovery_val == 0:
- print('stop discovery successful')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if discovery_val == 1:
- print('stop discovery failed')
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err))
- def get_device(self, devname=None, devaddr=None):
- '''
- Get device based on device name
- and device address and connect to device
- '''
- dev_path = None
- expected_device_addr = devaddr.lower()
- expected_device_name = devname.lower()
- print('checking if expected device {} [ {} ] is present'.format(devname, devaddr))
- dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- dbus_objs = dbus_obj_mgr.GetManagedObjects()
- # Check if expected device is present
- for path, interfaces in dbus_objs.items():
- if DEVICE_IFACE not in interfaces.keys():
- continue
- # Check expected device address is received device address
- received_device_addr_path = (path.replace('_', ':')).lower()
- if expected_device_addr not in received_device_addr_path:
- continue
- device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
- received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower()
- # Check expected device name is received device name
- if expected_device_name == received_device_name:
- # Set device iface path
- dev_path = path
- break
- if not dev_path:
- print('\nBLE device not found')
- return False
- print('device {} [ {} ] found'.format(devname, devaddr))
- # Set device details
- self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE)
- self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
- self.device.path = dev_path
- self.device.name = devname
- self.device.addr = devaddr
- return True
- def get_services(self):
- '''
- Retrieve Services found in the device connected
- '''
- try:
- # Get current dbus objects
- dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- dbus_objs = dbus_obj_mgr.GetManagedObjects()
- # Get services
- for path, interfaces in dbus_objs.items():
- if GATT_SERVICE_IFACE in interfaces.keys():
- if not path.startswith(self.device.path):
- continue
- received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
- # Retrieve all services on device iface path
- # and set each service received
- service = Service()
- service.path = path
- service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE)
- service.props = dbus.Interface(received_service, DBUS_PROP_IFACE)
- self.device.services.append(service)
- if not self.device.services:
- print('no services found for device: {}'.format(self.device.path))
- return False
- return True
- except Exception as err:
- raise Exception('Failed to get services: {}'.format(err))
- def get_chars(self):
- '''
- Get characteristics of the services set for the device connected
- '''
- try:
- if not self.device.services:
- print('No services set for device: {}'.format(self.device.path))
- return
- # Read chars for all the services received for device
- for service in self.device.services:
- char_found = False
- dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- dbus_objs = dbus_obj_mgr.GetManagedObjects()
- for path, interfaces in dbus_objs.items():
- if GATT_CHRC_IFACE in interfaces.keys():
- if not path.startswith(self.device.path):
- continue
- if not path.startswith(service.path):
- continue
- # Set characteristics
- received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
- char = Characteristic()
- char.path = path
- char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE)
- char.props = dbus.Interface(received_char, DBUS_PROP_IFACE)
- service.chars.append(char)
- char_found = True
- if not char_found:
- print('Characteristic not found for service: {}'.format(service.iface))
- except Exception as err:
- raise Exception('Failed to get characteristics : {}'.format(err))
- def read_chars(self):
- '''
- Read value of characteristics
- '''
- try:
- if not self.device.services:
- print('No services set for device: {}'.format(self.device.path))
- return
- # Read chars for all services of device
- for service in self.device.services:
- # Read properties of characteristic
- for char in service.chars:
- # Print path
- print('Characteristic: {}'.format(char.path))
- # Print uuid
- uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
- print('UUID: {}'.format(uuid))
- # Print flags
- flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
- print('Flags: {}'.format(flags))
- # Read value if `read` flag is present
- if 'read' in flags:
- value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
- print('Value: {}'.format(value))
- except Exception as err:
- raise Exception('Failed to read characteristics : {}'.format(err))
- def write_chars(self, new_value):
- '''
- Write to characteristics
- '''
- try:
- if not self.device.services:
- print('No services set for device: {}'.format(self.device.path))
- return False
- print('writing data to characteristics with read and write permission')
- # Read chars of all services of device
- for service in self.device.services:
- if not service.chars:
- print('No chars found for service: {}'.format(service.path))
- continue
- for char in service.chars:
- flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
- if not ('read' in flags and 'write' in flags):
- continue
- # Write new value to characteristic
- curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
- print('current value: {}'.format(curr_value))
- print('writing {} to characteristic {}'.format(new_value, char.path))
- char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE)
- time.sleep(5)
- updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
- print('updated value: {}'.format(updated_value))
- if not (ord(new_value) == int(updated_value[0])):
- print('write operation to {} failed'.format(char.path))
- return False
- print('write operation to {} successful'.format(char.path))
- return True
- except Exception as err:
- raise Exception('Failed to write to characteristics: {}'.format(err))
- def get_char_if_exists(self, char_uuid):
- '''
- Get char if exists for given uuid
- '''
- try:
- for service in self.device.services:
- for char in service.chars:
- curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
- if char_uuid.lower() in curr_uuid.lower():
- return char
- print('char {} not found'.format(char_uuid))
- return False
- except Exception as err:
- raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err))
- def get_service_if_exists(self, service_uuid):
- try:
- for service in self.device.services:
- uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID')
- if service_uuid.lower() in uuid.lower():
- return service
- print('service {} not found'.format(service_uuid))
- return False
- except Exception as err:
- raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err))
- def start_notify(self, char):
- try:
- notify_started = 0
- notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
- if notifying == 0:
- # Start Notify
- char.iface.StartNotify()
- notify_started = 1
- # Check notify started
- for _ in range(10, 0, -1):
- notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
- if notifying == 1:
- print('subscribe to notifications: on')
- break
- if notifying == 0:
- print('Failed to start notifications')
- return False
- # Get updated value
- for _ in range(10, 0, -1):
- time.sleep(1)
- char_value = char.props.Get(GATT_CHRC_IFACE, 'Value')
- print(char_value)
- return None
- except Exception as err:
- raise Exception('Failed to perform notification operation: {}'.format(err))
- finally:
- try:
- if notify_started == 1:
- # Stop notify
- char.iface.StopNotify()
- for _ in range(10, 0, -1):
- notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
- if notifying == 0:
- print('subscribe to notifications: off')
- break
- if notifying == 1:
- print('Failed to stop notifications')
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err))
- def _create_mainloop(self):
- '''
- Create GLibMainLoop
- '''
- if not self.mainloop:
- self.mainloop = GLib.MainLoop()
- def register_gatt_app(self):
- '''
- Create Gatt Application
- Register Gatt Application
- '''
- try:
- # Create mainloop, if does not exist
- self._create_mainloop()
- # Create Gatt Application
- self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path)
- print('GATT Application created')
- self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE)
- # Register Gatt Application
- self.gatt_mgr.RegisterApplication(
- self.gatt_app, {},
- reply_handler=self.gatt_app_success_handler,
- error_handler=self.gatt_app_error_handler)
- self.mainloop.run()
- except dbus.exceptions.DBusException as dbus_err:
- raise DBusException('Failed to create GATT Application : {}'.format(dbus_err))
- except Exception as err:
- raise Exception('Failed to register Gatt Application: {}'.format(err))
- def gatt_app_success_handler(self):
- print('GATT Application successfully registered')
- self.mainloop.quit()
- def gatt_app_error_handler(self):
- raise DBusException('Failed to register GATT Application')
- def check_le_iface(self):
- '''
- Check if LEAdvertisingManager1 interface exists
- '''
- try:
- dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- dbus_objs = dbus_obj_mgr.GetManagedObjects()
- for path, iface in dbus_objs.items():
- if LE_ADVERTISING_MANAGER_IFACE in iface:
- le_adv_iface_path = path
- break
- # Check LEAdvertisingManager1 interface is found
- assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found'
- return le_adv_iface_path
- except AssertionError:
- raise
- except Exception as err:
- raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err))
- def register_adv(self, adv_host_name, adv_type, adv_uuid):
- try:
- # Gatt Application is expected to be registered
- if not self.gatt_app:
- print('No Gatt Application is registered')
- return
- adv_iface_index = 0
- # Create mainloop, if does not exist
- self._create_mainloop()
- # Check LEAdvertisingManager1 interface exists
- le_iface_path = self.check_le_iface()
- # Create Advertisement data
- leadv_obj = lib_gap.Advertisement(
- self.bus,
- adv_iface_index,
- adv_type,
- adv_uuid,
- adv_host_name)
- print('Advertisement registered')
- # Register Advertisement
- leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE)
- leadv_mgr_iface_obj.RegisterAdvertisement(
- leadv_obj.get_path(), {},
- reply_handler=self.adv_success_handler,
- error_handler=self.adv_error_handler)
- # Handler to read events received and exit from mainloop
- GLib.timeout_add_seconds(3, self.check_adv)
- self.mainloop.run()
- except AssertionError:
- raise
- except dbus.exceptions.DBusException as dbus_err:
- raise DBusException('Failure during registering advertisement : {}'.format(dbus_err))
- except Exception as err:
- raise Exception('Failure during registering advertisement : {}'.format(err))
- else:
- try:
- try:
- # Stop Notify if not already stopped
- chars = self.gatt_app.service.get_characteristics()
- for char in chars:
- if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']:
- if char.notifying:
- char.StopNotify()
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: {}'.format(dbus_err))
- try:
- # Unregister Advertisement
- leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path())
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: {}'.format(dbus_err))
- try:
- # Remove advertising data
- dbus.service.Object.remove_from_connection(leadv_obj)
- except LookupError as err:
- print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err))
- try:
- # Unregister Gatt Application
- self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path())
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: {}'.format(dbus_err))
- try:
- # Remove Gatt Application
- dbus.service.Object.remove_from_connection(self.gatt_app)
- except LookupError as err:
- print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err))
- except RuntimeError as err:
- print('Warning: Failure during cleanup of Advertisement: {}'.format(err))
- def adv_success_handler(self):
- print('Registered Advertisement successfully')
- def adv_error_handler(self, err):
- raise DBusException('{}'.format(err))
- def check_adv(self):
- '''
- Handler to check for events triggered (read/write/subscribe)
- for advertisement registered for AlertNotificationApp
- '''
- try:
- retry = 10
- # Exit loop if read and write and subscribe is successful
- if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \
- self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \
- self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'):
- if self.mainloop.is_running():
- self.mainloop.quit()
- # return False to stop polling
- return False
- self.loop_cnt += 1
- print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt))
- # Exit loop if max retry value is reached and
- # all three events (read and write and subscribe) have not yet passed
- # Retry total 10 times
- if self.loop_cnt == (retry - 1):
- if self.mainloop.is_running():
- self.mainloop.quit()
- # return False to stop polling
- return False
- # return True to continue polling
- return True
- except RuntimeError as err:
- print('Failure in advertisment handler: {}'.format(err))
- if self.mainloop.is_running():
- self.mainloop.quit()
- # return False to stop polling
- return False
- def disconnect(self):
- '''
- Disconnect device
- '''
- try:
- if not self.device or not self.device.iface:
- return
- print('disconnecting device')
- # Disconnect device
- device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
- if device_conn == 1:
- self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE)
- for cnt in range(10, 0, -1):
- time.sleep(5)
- device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
- if device_conn == 0:
- print('device disconnected')
- break
- print('number of retries left ({})'.format(cnt - 1))
- if device_conn == 1:
- print('failed to disconnect device')
- self.adapter.iface.RemoveDevice(self.device.iface)
- self.device = None
- except dbus.exceptions.DBusException as dbus_err:
- print('Warning: {}'.format(dbus_err))
- except Exception as err:
- raise Exception('Failed to disconnect device: {}'.format(err))
|