|
|
@@ -21,12 +21,10 @@ from __future__ import print_function
|
|
|
|
|
|
import sys
|
|
|
import time
|
|
|
-import traceback
|
|
|
|
|
|
try:
|
|
|
import dbus
|
|
|
import dbus.mainloop.glib
|
|
|
- from future.moves.itertools import zip_longest
|
|
|
from gi.repository import GLib
|
|
|
except ImportError as e:
|
|
|
if 'linux' not in sys.platform:
|
|
|
@@ -38,38 +36,6 @@ except ImportError as e:
|
|
|
|
|
|
from . import lib_gap, lib_gatt
|
|
|
|
|
|
-srv_added_old_cnt = 0
|
|
|
-srv_added_new_cnt = 0
|
|
|
-verify_signal_check = 0
|
|
|
-blecent_retry_check_cnt = 0
|
|
|
-gatt_app_retry_check_cnt = 0
|
|
|
-verify_service_cnt = 0
|
|
|
-verify_readchars_cnt = 0
|
|
|
-adv_retry_check_cnt = 0
|
|
|
-blecent_adv_uuid = '1811'
|
|
|
-gatt_app_obj_check = False
|
|
|
-gatt_app_reg_check = False
|
|
|
-adv_checks_done = False
|
|
|
-gatt_checks_done = False
|
|
|
-adv_data_check = False
|
|
|
-adv_reg_check = False
|
|
|
-read_req_check = False
|
|
|
-write_req_check = False
|
|
|
-subscribe_req_check = False
|
|
|
-ble_hr_chrc = False
|
|
|
-discovery_start = False
|
|
|
-signal_caught = False
|
|
|
-test_checks_pass = False
|
|
|
-adv_stop = False
|
|
|
-services_resolved = False
|
|
|
-service_uuid_found = False
|
|
|
-adapter_on = False
|
|
|
-device_connected = False
|
|
|
-gatt_app_registered = False
|
|
|
-adv_registered = False
|
|
|
-adv_active_instance = False
|
|
|
-chrc_value_cnt = False
|
|
|
-
|
|
|
BLUEZ_SERVICE_NAME = 'org.bluez'
|
|
|
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
|
|
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
|
|
@@ -84,811 +50,663 @@ GATT_SERVICE_IFACE = 'org.bluez.GattService1'
|
|
|
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
|
|
|
|
|
|
|
|
|
-# Set up the main loop.
|
|
|
-dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
-dbus.mainloop.glib.threads_init()
|
|
|
-# Set up the event main loop.
|
|
|
-event_loop = GLib.MainLoop()
|
|
|
+class DBusException(dbus.exceptions.DBusException):
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-def verify_signal_is_caught():
|
|
|
- global verify_signal_check
|
|
|
- verify_signal_check += 1
|
|
|
+class Characteristic:
|
|
|
+ def __init__(self):
|
|
|
+ self.iface = None
|
|
|
+ self.path = None
|
|
|
+ self.props = None
|
|
|
|
|
|
- if (not signal_caught and verify_signal_check == 15) or (signal_caught):
|
|
|
- if event_loop.is_running():
|
|
|
- event_loop.quit()
|
|
|
- return False # polling for checks will stop
|
|
|
|
|
|
- return True # polling will continue
|
|
|
+class Service:
|
|
|
+ def __init__(self):
|
|
|
+ self.iface = None
|
|
|
+ self.path = None
|
|
|
+ self.props = None
|
|
|
+ self.chars = []
|
|
|
|
|
|
|
|
|
-def set_props_status(props):
|
|
|
- """
|
|
|
- Set Adapter status if it is powered on or off
|
|
|
- """
|
|
|
- global adapter_on, services_resolved, GATT_OBJ_REMOVED, gatt_app_registered, \
|
|
|
- adv_registered, adv_active_instance, device_connected, CHRC_VALUE, chrc_value_cnt, \
|
|
|
- signal_caught
|
|
|
- is_service_uuid = False
|
|
|
- # Signal caught for change in Adapter Powered property
|
|
|
- if 'Powered' in props:
|
|
|
- if props['Powered'] == 1:
|
|
|
- signal_caught = True
|
|
|
- adapter_on = True
|
|
|
- else:
|
|
|
- signal_caught = True
|
|
|
- adapter_on = False
|
|
|
- if 'ServicesResolved' in props:
|
|
|
- if props['ServicesResolved'] == 1:
|
|
|
- signal_caught = True
|
|
|
- services_resolved = True
|
|
|
- else:
|
|
|
- signal_caught = True
|
|
|
- services_resolved = False
|
|
|
- if 'UUIDs' in props:
|
|
|
- # Signal caught for add/remove GATT data having service uuid
|
|
|
- for uuid in props['UUIDs']:
|
|
|
- if blecent_adv_uuid in uuid:
|
|
|
- is_service_uuid = True
|
|
|
- if not is_service_uuid:
|
|
|
- # Signal caught for removing GATT data having service uuid
|
|
|
- # and for unregistering GATT application
|
|
|
- gatt_app_registered = False
|
|
|
- lib_gatt.GATT_APP_OBJ = False
|
|
|
- if 'ActiveInstances' in props:
|
|
|
- # Signal caught for Advertising - add/remove Instances property
|
|
|
- if props['ActiveInstances'] == 1:
|
|
|
- adv_active_instance = True
|
|
|
- elif props['ActiveInstances'] == 0:
|
|
|
- adv_active_instance = False
|
|
|
- adv_registered = False
|
|
|
- lib_gap.ADV_OBJ = False
|
|
|
- if 'Connected' in props:
|
|
|
- # Signal caught for device connect/disconnect
|
|
|
- if props['Connected'] == 1:
|
|
|
- signal_caught = True
|
|
|
- device_connected = True
|
|
|
- else:
|
|
|
- signal_caught = True
|
|
|
- device_connected = False
|
|
|
- if 'Value' in props:
|
|
|
- # Signal caught for change in chars value
|
|
|
- if ble_hr_chrc:
|
|
|
- chrc_value_cnt += 1
|
|
|
- print(props['Value'])
|
|
|
- if chrc_value_cnt == 10:
|
|
|
- signal_caught = True
|
|
|
- return False
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
+class Device:
|
|
|
+ def __init__(self):
|
|
|
+ self.iface = None
|
|
|
+ self.path = None
|
|
|
+ self.props = None
|
|
|
+ self.name = None
|
|
|
+ self.addr = None
|
|
|
+ self.services = []
|
|
|
|
|
|
-def props_change_handler(iface, changed_props, invalidated):
|
|
|
- """
|
|
|
- PropertiesChanged Signal handler.
|
|
|
- Catch and print information about PropertiesChanged signal.
|
|
|
- """
|
|
|
|
|
|
- if iface == ADAPTER_IFACE:
|
|
|
- set_props_status(changed_props)
|
|
|
- if iface == LE_ADVERTISING_MANAGER_IFACE:
|
|
|
- set_props_status(changed_props)
|
|
|
- if iface == DEVICE_IFACE:
|
|
|
- set_props_status(changed_props)
|
|
|
- if iface == GATT_CHRC_IFACE:
|
|
|
- set_props_status(changed_props)
|
|
|
+class Adapter:
|
|
|
+ def __init__(self):
|
|
|
+ self.iface = None
|
|
|
+ self.path = None
|
|
|
+ self.props = None
|
|
|
|
|
|
|
|
|
class BLE_Bluez_Client:
|
|
|
- def __init__(self, iface, devname=None, devaddr=None):
|
|
|
+ def __init__(self, iface=None):
|
|
|
self.bus = None
|
|
|
+ self.hci_iface = iface
|
|
|
+ self.adapter = Adapter()
|
|
|
self.device = None
|
|
|
- self.devname = devname
|
|
|
- self.devaddr = devaddr
|
|
|
- self.iface = iface
|
|
|
- self.ble_objs = None
|
|
|
- self.props_iface_obj = None
|
|
|
- self.adapter_path = []
|
|
|
- self.adapter = None
|
|
|
- self.services = []
|
|
|
- self.srv_uuid = []
|
|
|
- self.chars = {}
|
|
|
- self.char_uuid = []
|
|
|
+ self.gatt_app = None
|
|
|
+ self.gatt_mgr = None
|
|
|
+ self.mainloop = None
|
|
|
+ self.loop_cnt = 0
|
|
|
|
|
|
try:
|
|
|
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
self.bus = dbus.SystemBus()
|
|
|
- om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
- self.ble_objs = om_iface_obj.GetManagedObjects()
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(e)
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ raise DBusException('Failed to initialise client: {}'.format(dbus_err))
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to initialise client: {}'.format(err))
|
|
|
|
|
|
def __del__(self):
|
|
|
try:
|
|
|
+ # Cleanup
|
|
|
+ self.disconnect()
|
|
|
print('Test Exit')
|
|
|
except Exception as e:
|
|
|
print(e)
|
|
|
- sys.exit(1)
|
|
|
|
|
|
def set_adapter(self):
|
|
|
'''
|
|
|
Discover Bluetooth Adapter
|
|
|
Power On Bluetooth Adapter
|
|
|
'''
|
|
|
- global verify_signal_check, signal_caught, adapter_on
|
|
|
- verify_signal_check = 0
|
|
|
- adapter_on = False
|
|
|
try:
|
|
|
- print('discovering adapter...')
|
|
|
- for path, interfaces in self.ble_objs.items():
|
|
|
+ print('discovering adapter')
|
|
|
+ dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
+ dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
+ for path, interfaces in dbus_objs.items():
|
|
|
adapter = interfaces.get(ADAPTER_IFACE)
|
|
|
- if adapter is not None:
|
|
|
- if path.endswith(self.iface):
|
|
|
- self.adapter = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
|
|
|
- # Create Properties Interface object only after adapter is found
|
|
|
- self.props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
|
- self.adapter_path = [path, interfaces]
|
|
|
- # Check adapter status - power on/off
|
|
|
- set_props_status(interfaces[ADAPTER_IFACE])
|
|
|
- break
|
|
|
-
|
|
|
- if self.adapter is None:
|
|
|
- raise Exception('Bluetooth adapter not found')
|
|
|
+ if adapter is not None and path.endswith(self.hci_iface):
|
|
|
+ self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
|
|
|
+ self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
|
+ self.adapter.path = path
|
|
|
+ break
|
|
|
|
|
|
- if self.props_iface_obj is None:
|
|
|
- raise Exception('Properties interface not found')
|
|
|
+ if self.adapter.iface is None:
|
|
|
+ print('bluetooth adapter not found')
|
|
|
+ return False
|
|
|
|
|
|
print('bluetooth adapter discovered')
|
|
|
-
|
|
|
+ print('checking if bluetooth adapter is already powered on')
|
|
|
# Check if adapter is already powered on
|
|
|
- if adapter_on:
|
|
|
- print('Adapter already powered on')
|
|
|
+ powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
|
|
|
+ if powered == 1:
|
|
|
+ print('adapter already powered on')
|
|
|
return True
|
|
|
-
|
|
|
# Power On Adapter
|
|
|
- print('powering on adapter...')
|
|
|
- self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
|
|
|
- self.props_iface_obj.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1))
|
|
|
-
|
|
|
- signal_caught = False
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- event_loop.run()
|
|
|
-
|
|
|
- if adapter_on:
|
|
|
- print('bluetooth adapter powered on')
|
|
|
- return True
|
|
|
- else:
|
|
|
- raise Exception('Failure: bluetooth adapter not powered on')
|
|
|
-
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
+ print('powering on adapter')
|
|
|
+ self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1))
|
|
|
+ # Check if adapter is powered on
|
|
|
+ print('checking if adapter is powered on')
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ time.sleep(5)
|
|
|
+ powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
|
|
|
+ if powered_on == 1:
|
|
|
+ # Set adapter props again with powered on value
|
|
|
+ self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE)
|
|
|
+ print('bluetooth adapter powered on')
|
|
|
+ return True
|
|
|
+ print('number of retries left({})'.format(cnt - 1))
|
|
|
+
|
|
|
+ # Adapter not powered on
|
|
|
+ print('bluetooth adapter not powered on')
|
|
|
return False
|
|
|
|
|
|
- def connect(self):
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to set adapter: {}'.format(err))
|
|
|
+
|
|
|
+ def connect(self, devname=None, devaddr=None):
|
|
|
'''
|
|
|
- Connect to the device discovered
|
|
|
- Retry 10 times to discover and connect to device
|
|
|
+ Start Discovery and Connect to the device
|
|
|
'''
|
|
|
- global discovery_start, signal_caught, device_connected, verify_signal_check
|
|
|
- device_found = False
|
|
|
- device_connected = False
|
|
|
try:
|
|
|
- self.adapter.StartDiscovery()
|
|
|
- print('\nStarted Discovery')
|
|
|
-
|
|
|
- discovery_start = True
|
|
|
+ device_found = None
|
|
|
+ start_discovery = False
|
|
|
+ self.device = Device()
|
|
|
+
|
|
|
+ discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
|
+ # Start Discovery
|
|
|
+ if discovery_val == 0:
|
|
|
+ print('starting discovery')
|
|
|
+ self.adapter.iface.StartDiscovery()
|
|
|
+ start_discovery = True
|
|
|
+
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ time.sleep(5)
|
|
|
+ discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
|
+ if discovery_val == 1:
|
|
|
+ print('start discovery successful')
|
|
|
+ break
|
|
|
+ print('number of retries left ({})'.format(cnt - 1))
|
|
|
+
|
|
|
+ if discovery_val == 0:
|
|
|
+ print('start discovery failed')
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Get device
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ # Wait for device to be discovered
|
|
|
+ time.sleep(5)
|
|
|
+ device_found = self.get_device(
|
|
|
+ devname=devname,
|
|
|
+ devaddr=devaddr)
|
|
|
+ if device_found:
|
|
|
+ break
|
|
|
+ # Retry
|
|
|
+ print('number of retries left ({})'.format(cnt - 1))
|
|
|
|
|
|
- for retry_cnt in range(10,0,-1):
|
|
|
- verify_signal_check = 0
|
|
|
- try:
|
|
|
- if self.device is None:
|
|
|
- print('\nConnecting to device...')
|
|
|
- # Wait for device to be discovered
|
|
|
- time.sleep(5)
|
|
|
- device_found = self.get_device()
|
|
|
- if device_found:
|
|
|
- self.device.Connect(dbus_interface=DEVICE_IFACE)
|
|
|
- time.sleep(15)
|
|
|
- signal_caught = False
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- event_loop.run()
|
|
|
- if device_connected:
|
|
|
- print('\nConnected to device')
|
|
|
- return True
|
|
|
- else:
|
|
|
- raise Exception
|
|
|
- except Exception as e:
|
|
|
- print(e)
|
|
|
- print('\nRetries left', retry_cnt - 1)
|
|
|
- continue
|
|
|
+ if not device_found:
|
|
|
+ print('expected device {} [ {} ] not found'.format(devname, devaddr))
|
|
|
+ return False
|
|
|
|
|
|
- # Device not found
|
|
|
+ # Connect to expected device found
|
|
|
+ print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr))
|
|
|
+ self.device.iface.Connect(dbus_interface=DEVICE_IFACE)
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ time.sleep(5)
|
|
|
+ connected = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
|
+ if connected == 1:
|
|
|
+ # Set device props again with connected on value
|
|
|
+ self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE)
|
|
|
+ print('connected to device with iface {}'.format(self.device.path))
|
|
|
+ return True
|
|
|
+ print('number of retries left({})'.format(cnt - 1))
|
|
|
+
|
|
|
+ # Device not connected
|
|
|
+ print('connection to device failed')
|
|
|
return False
|
|
|
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- self.device = None
|
|
|
- return False
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Connect to device failed : {}'.format(err))
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+ if start_discovery:
|
|
|
+ print('stopping discovery')
|
|
|
+ self.adapter.iface.StopDiscovery()
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ time.sleep(5)
|
|
|
+ discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
|
|
|
+ if discovery_val == 0:
|
|
|
+ print('stop discovery successful')
|
|
|
+ break
|
|
|
+ print('number of retries left ({})'.format(cnt - 1))
|
|
|
+ if discovery_val == 1:
|
|
|
+ print('stop discovery failed')
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err))
|
|
|
|
|
|
- def get_device(self):
|
|
|
+ def get_device(self, devname=None, devaddr=None):
|
|
|
'''
|
|
|
- Discover device based on device name
|
|
|
- and device address and connect
|
|
|
+ Get device based on device name
|
|
|
+ and device address and connect to device
|
|
|
'''
|
|
|
dev_path = None
|
|
|
+ expected_device_addr = devaddr.lower()
|
|
|
+ expected_device_name = devname.lower()
|
|
|
+
|
|
|
+ print('checking if expected device {} [ {} ] is present'.format(devname, devaddr))
|
|
|
|
|
|
- om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
- self.ble_objs = om_iface_obj.GetManagedObjects()
|
|
|
- for path, interfaces in self.ble_objs.items():
|
|
|
+ dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
+ dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
+
|
|
|
+ # Check if expected device is present
|
|
|
+ for path, interfaces in dbus_objs.items():
|
|
|
if DEVICE_IFACE not in interfaces.keys():
|
|
|
continue
|
|
|
- device_addr_iface = (path.replace('_', ':')).lower()
|
|
|
- dev_addr = self.devaddr.lower()
|
|
|
- if dev_addr in device_addr_iface and \
|
|
|
- interfaces[DEVICE_IFACE].get('Name') == self.devname:
|
|
|
+
|
|
|
+ # Check expected device address is received device address
|
|
|
+ received_device_addr_path = (path.replace('_', ':')).lower()
|
|
|
+ if expected_device_addr not in received_device_addr_path:
|
|
|
+ continue
|
|
|
+
|
|
|
+ device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
|
+ received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower()
|
|
|
+
|
|
|
+ # Check expected device name is received device name
|
|
|
+ if expected_device_name == received_device_name:
|
|
|
+ # Set device iface path
|
|
|
dev_path = path
|
|
|
break
|
|
|
|
|
|
- if dev_path is None:
|
|
|
- raise Exception('\nBLE device not found')
|
|
|
+ if not dev_path:
|
|
|
+ print('\nBLE device not found')
|
|
|
+ return False
|
|
|
|
|
|
- device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
|
|
|
- device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
|
|
|
+ print('device {} [ {} ] found'.format(devname, devaddr))
|
|
|
|
|
|
- self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path)
|
|
|
+ # Set device details
|
|
|
+ self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE)
|
|
|
+ self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
|
|
|
+ self.device.path = dev_path
|
|
|
+ self.device.name = devname
|
|
|
+ self.device.addr = devaddr
|
|
|
return True
|
|
|
|
|
|
- def srvc_iface_added_handler(self, path, interfaces):
|
|
|
- '''
|
|
|
- Add services found as lib_ble_client obj
|
|
|
- '''
|
|
|
- if self.device and path.startswith(self.device.object_path):
|
|
|
- if GATT_SERVICE_IFACE in interfaces.keys():
|
|
|
- service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
- uuid = service.Get(GATT_SERVICE_IFACE, 'UUID', dbus_interface=DBUS_PROP_IFACE)
|
|
|
- if uuid not in self.srv_uuid:
|
|
|
- self.srv_uuid.append(uuid)
|
|
|
- if path not in self.services:
|
|
|
- self.services.append(path)
|
|
|
-
|
|
|
- def verify_service_uuid_found(self, service_uuid):
|
|
|
- '''
|
|
|
- Verify service uuid found
|
|
|
- '''
|
|
|
- global service_uuid_found
|
|
|
-
|
|
|
- srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid]
|
|
|
- if srv_uuid_found:
|
|
|
- service_uuid_found = True
|
|
|
-
|
|
|
- def get_services(self, service_uuid=None):
|
|
|
+ def get_services(self):
|
|
|
'''
|
|
|
Retrieve Services found in the device connected
|
|
|
'''
|
|
|
- global signal_caught, service_uuid_found, services_resolved, verify_signal_check
|
|
|
- verify_signal_check = 0
|
|
|
- service_uuid_found = False
|
|
|
- services_resolved = False
|
|
|
- signal_caught = False
|
|
|
-
|
|
|
try:
|
|
|
- om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
- self.ble_objs = om_iface_obj.GetManagedObjects()
|
|
|
- for path, interfaces in self.ble_objs.items():
|
|
|
- self.srvc_iface_added_handler(path, interfaces)
|
|
|
- # If services not found, then they may not have been added yet on dbus
|
|
|
- if not self.services:
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler)
|
|
|
- event_loop.run()
|
|
|
- if not services_resolved:
|
|
|
- raise Exception('Services not found...')
|
|
|
-
|
|
|
- if service_uuid:
|
|
|
- self.verify_service_uuid_found(service_uuid)
|
|
|
- if not service_uuid_found:
|
|
|
- raise Exception('Service with uuid: %s not found...' % service_uuid)
|
|
|
-
|
|
|
- # Services found
|
|
|
- return self.srv_uuid
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+ # Get current dbus objects
|
|
|
+ dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
+ dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
+
|
|
|
+ # Get services
|
|
|
+ for path, interfaces in dbus_objs.items():
|
|
|
+ if GATT_SERVICE_IFACE in interfaces.keys():
|
|
|
+ if not path.startswith(self.device.path):
|
|
|
+ continue
|
|
|
+ received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
+ # Retrieve all services on device iface path
|
|
|
+ # and set each service received
|
|
|
+ service = Service()
|
|
|
+ service.path = path
|
|
|
+ service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE)
|
|
|
+ service.props = dbus.Interface(received_service, DBUS_PROP_IFACE)
|
|
|
+ self.device.services.append(service)
|
|
|
+
|
|
|
+ if not self.device.services:
|
|
|
+ print('no services found for device: {}'.format(self.device.path))
|
|
|
+ return False
|
|
|
|
|
|
- def chrc_iface_added_handler(self, path, interfaces):
|
|
|
- '''
|
|
|
- Add services found as lib_ble_client obj
|
|
|
- '''
|
|
|
- global chrc, chrc_discovered, signal_caught
|
|
|
- chrc_val = None
|
|
|
-
|
|
|
- if self.device and path.startswith(self.device.object_path):
|
|
|
- if GATT_CHRC_IFACE in interfaces.keys():
|
|
|
- chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
- chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
|
|
|
- dbus_interface=DBUS_PROP_IFACE)
|
|
|
- chrc_flags = chrc_props['Flags']
|
|
|
- if 'read' in chrc_flags:
|
|
|
- chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
- uuid = chrc_props['UUID']
|
|
|
- self.chars[path] = chrc_val, chrc_flags, uuid
|
|
|
- signal_caught = True
|
|
|
+ return True
|
|
|
|
|
|
- def read_chars(self):
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to get services: {}'.format(err))
|
|
|
+
|
|
|
+ def get_chars(self):
|
|
|
'''
|
|
|
- Read characteristics found in the device connected
|
|
|
+ Get characteristics of the services set for the device connected
|
|
|
'''
|
|
|
- global iface_added, chrc_discovered, signal_caught, verify_signal_check
|
|
|
- verify_signal_check = 0
|
|
|
- chrc_discovered = False
|
|
|
- iface_added = False
|
|
|
- signal_caught = False
|
|
|
-
|
|
|
try:
|
|
|
- om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
- self.ble_objs = om_iface_obj.GetManagedObjects()
|
|
|
- for path, interfaces in self.ble_objs.items():
|
|
|
- self.chrc_iface_added_handler(path, interfaces)
|
|
|
-
|
|
|
- # If chars not found, then they have not been added yet to interface
|
|
|
- time.sleep(15)
|
|
|
- if not self.chars:
|
|
|
- iface_added = True
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler)
|
|
|
- event_loop.run()
|
|
|
- return self.chars
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+ if not self.device.services:
|
|
|
+ print('No services set for device: {}'.format(self.device.path))
|
|
|
+ return
|
|
|
+
|
|
|
+ # Read chars for all the services received for device
|
|
|
+ for service in self.device.services:
|
|
|
+ char_found = False
|
|
|
+ dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
+ dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
+ for path, interfaces in dbus_objs.items():
|
|
|
+ if GATT_CHRC_IFACE in interfaces.keys():
|
|
|
+ if not path.startswith(self.device.path):
|
|
|
+ continue
|
|
|
+ if not path.startswith(service.path):
|
|
|
+ continue
|
|
|
+ # Set characteristics
|
|
|
+ received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
+ char = Characteristic()
|
|
|
+ char.path = path
|
|
|
+ char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE)
|
|
|
+ char.props = dbus.Interface(received_char, DBUS_PROP_IFACE)
|
|
|
+ service.chars.append(char)
|
|
|
+ char_found = True
|
|
|
+
|
|
|
+ if not char_found:
|
|
|
+ print('Characteristic not found for service: {}'.format(service.iface))
|
|
|
+
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to get characteristics : {}'.format(err))
|
|
|
|
|
|
- def write_chars(self, write_val):
|
|
|
+ def read_chars(self):
|
|
|
'''
|
|
|
- Write characteristics to the device connected
|
|
|
+ Read value of characteristics
|
|
|
'''
|
|
|
- chrc = None
|
|
|
- chrc_val = None
|
|
|
- char_write_props = False
|
|
|
-
|
|
|
try:
|
|
|
- for path, props in self.chars.items():
|
|
|
- if 'write' in props[1]: # check permission
|
|
|
- char_write_props = True
|
|
|
- chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
- chrc.WriteValue(write_val,{},dbus_interface=GATT_CHRC_IFACE)
|
|
|
- if 'read' in props[1]:
|
|
|
- chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
- else:
|
|
|
- print('Warning: Cannot read value. Characteristic does not have read permission.')
|
|
|
- if not (ord(write_val) == int(chrc_val[0])):
|
|
|
- print('\nWrite Failed')
|
|
|
- return False
|
|
|
- self.chars[path] = chrc_val, props[1], props[2] # update value
|
|
|
- if not char_write_props:
|
|
|
- raise Exception('Failure: Cannot perform write operation. Characteristic does not have write permission.')
|
|
|
-
|
|
|
- return self.chars
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
-
|
|
|
- def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid):
|
|
|
- '''
|
|
|
- Start Notifications
|
|
|
- Retrieve updated value
|
|
|
- Stop Notifications
|
|
|
+ if not self.device.services:
|
|
|
+ print('No services set for device: {}'.format(self.device.path))
|
|
|
+ return
|
|
|
+
|
|
|
+ # Read chars for all services of device
|
|
|
+ for service in self.device.services:
|
|
|
+ # Read properties of characteristic
|
|
|
+ for char in service.chars:
|
|
|
+ # Print path
|
|
|
+ print('Characteristic: {}'.format(char.path))
|
|
|
+ # Print uuid
|
|
|
+ uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
|
|
|
+ print('UUID: {}'.format(uuid))
|
|
|
+ # Print flags
|
|
|
+ flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
|
|
|
+ print('Flags: {}'.format(flags))
|
|
|
+ # Read value if `read` flag is present
|
|
|
+ if 'read' in flags:
|
|
|
+ value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
+ print('Value: {}'.format(value))
|
|
|
+
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to read characteristics : {}'.format(err))
|
|
|
+
|
|
|
+ def write_chars(self, new_value):
|
|
|
+ '''
|
|
|
+ Write to characteristics
|
|
|
'''
|
|
|
- global ble_hr_chrc, verify_signal_check, signal_caught, chrc_value_cnt
|
|
|
-
|
|
|
- srv_path = None
|
|
|
- chrc = None
|
|
|
- uuid = None
|
|
|
- chrc_path = None
|
|
|
- chars_ret = None
|
|
|
- ble_hr_chrc = True
|
|
|
- chrc_value_cnt = 0
|
|
|
-
|
|
|
try:
|
|
|
- # Get HR Measurement characteristic
|
|
|
- services = list(zip_longest(self.srv_uuid, self.services))
|
|
|
- for uuid, path in services:
|
|
|
- if hr_srv_uuid in uuid:
|
|
|
- srv_path = path
|
|
|
- break
|
|
|
-
|
|
|
- if srv_path is None:
|
|
|
- raise Exception('Failure: HR UUID:', hr_srv_uuid, 'not found')
|
|
|
-
|
|
|
- chars_ret = self.read_chars()
|
|
|
+ if not self.device.services:
|
|
|
+ print('No services set for device: {}'.format(self.device.path))
|
|
|
+ return False
|
|
|
|
|
|
- for path, props in chars_ret.items():
|
|
|
- if path.startswith(srv_path):
|
|
|
- chrc = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
- chrc_path = path
|
|
|
- if hr_char_uuid in props[2]: # uuid
|
|
|
- break
|
|
|
- if chrc is None:
|
|
|
- raise Exception('Failure: Characteristics for service: ', srv_path, 'not found')
|
|
|
+ print('writing data to characteristics with read and write permission')
|
|
|
+ # Read chars of all services of device
|
|
|
+ for service in self.device.services:
|
|
|
+ if not service.chars:
|
|
|
+ print('No chars found for service: {}'.format(service.path))
|
|
|
+ continue
|
|
|
+ for char in service.chars:
|
|
|
+ flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
|
|
|
+ if not ('read' in flags and 'write' in flags):
|
|
|
+ continue
|
|
|
|
|
|
- # Subscribe to notifications
|
|
|
- print('\nSubscribe to notifications: On')
|
|
|
- chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE)
|
|
|
+ # Write new value to characteristic
|
|
|
+ curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
+ print('current value: {}'.format(curr_value))
|
|
|
|
|
|
- chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE)
|
|
|
- chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
|
|
|
+ print('writing {} to characteristic {}'.format(new_value, char.path))
|
|
|
+ char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
|
|
|
- signal_caught = False
|
|
|
- verify_signal_check = 0
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- event_loop.run()
|
|
|
- chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE)
|
|
|
- time.sleep(2)
|
|
|
- print('\nSubscribe to notifications: Off')
|
|
|
+ time.sleep(5)
|
|
|
+ updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
|
|
|
+ print('updated value: {}'.format(updated_value))
|
|
|
|
|
|
- ble_hr_chrc = False
|
|
|
- return True
|
|
|
+ if not (ord(new_value) == int(updated_value[0])):
|
|
|
+ print('write operation to {} failed'.format(char.path))
|
|
|
+ return False
|
|
|
+ print('write operation to {} successful'.format(char.path))
|
|
|
+ return True
|
|
|
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to write to characteristics: {}'.format(err))
|
|
|
|
|
|
- def create_and_reg_gatt_app(self):
|
|
|
+ def get_char_if_exists(self, char_uuid):
|
|
|
'''
|
|
|
- Create GATT data
|
|
|
- Register GATT Application
|
|
|
+ Get char if exists for given uuid
|
|
|
'''
|
|
|
- global gatt_app_obj, gatt_manager_iface_obj, gatt_app_registered
|
|
|
-
|
|
|
- gatt_app_obj = None
|
|
|
- gatt_manager_iface_obj = None
|
|
|
- gatt_app_registered = False
|
|
|
- lib_gatt.GATT_APP_OBJ = False
|
|
|
-
|
|
|
try:
|
|
|
- gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0])
|
|
|
- gatt_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME,self.adapter_path[0]), GATT_MANAGER_IFACE)
|
|
|
-
|
|
|
- gatt_manager_iface_obj.RegisterApplication(gatt_app_obj, {},
|
|
|
- reply_handler=self.gatt_app_handler,
|
|
|
- error_handler=self.gatt_app_error_handler)
|
|
|
- return True
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
+ for service in self.device.services:
|
|
|
+ for char in service.chars:
|
|
|
+ curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
|
|
|
+ if char_uuid.lower() in curr_uuid.lower():
|
|
|
+ return char
|
|
|
+ print('char {} not found'.format(char_uuid))
|
|
|
return False
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err))
|
|
|
|
|
|
- def gatt_app_handler(self):
|
|
|
- '''
|
|
|
- GATT Application Register success handler
|
|
|
- '''
|
|
|
- global gatt_app_registered
|
|
|
- gatt_app_registered = True
|
|
|
-
|
|
|
- def gatt_app_error_handler(self, error):
|
|
|
- '''
|
|
|
- GATT Application Register error handler
|
|
|
- '''
|
|
|
- global gatt_app_registered
|
|
|
- gatt_app_registered = False
|
|
|
+ def get_service_if_exists(self, service_uuid):
|
|
|
+ try:
|
|
|
+ for service in self.device.services:
|
|
|
+ uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID')
|
|
|
+ if service_uuid.lower() in uuid.lower():
|
|
|
+ return service
|
|
|
+ print('service {} not found'.format(service_uuid))
|
|
|
+ return False
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err))
|
|
|
|
|
|
- def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid):
|
|
|
- '''
|
|
|
- Create Advertising data
|
|
|
- Register Advertisement
|
|
|
- Start Advertising
|
|
|
+ def start_notify(self, char):
|
|
|
+ try:
|
|
|
+ notify_started = 0
|
|
|
+ notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
|
+ if notifying == 0:
|
|
|
+ # Start Notify
|
|
|
+ char.iface.StartNotify()
|
|
|
+ notify_started = 1
|
|
|
+ # Check notify started
|
|
|
+ for _ in range(10, 0, -1):
|
|
|
+ notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
|
+ if notifying == 1:
|
|
|
+ print('subscribe to notifications: on')
|
|
|
+ break
|
|
|
+ if notifying == 0:
|
|
|
+ print('Failed to start notifications')
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Get updated value
|
|
|
+ for _ in range(10, 0, -1):
|
|
|
+ time.sleep(1)
|
|
|
+ char_value = char.props.Get(GATT_CHRC_IFACE, 'Value')
|
|
|
+ print(char_value)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to perform notification operation: {}'.format(err))
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+ if notify_started == 1:
|
|
|
+ # Stop notify
|
|
|
+ char.iface.StopNotify()
|
|
|
+ for _ in range(10, 0, -1):
|
|
|
+ notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
|
|
|
+ if notifying == 0:
|
|
|
+ print('subscribe to notifications: off')
|
|
|
+ break
|
|
|
+ if notifying == 1:
|
|
|
+ print('Failed to stop notifications')
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err))
|
|
|
+
|
|
|
+ def _create_mainloop(self):
|
|
|
+ '''
|
|
|
+ Create GLibMainLoop
|
|
|
+ '''
|
|
|
+ if not self.mainloop:
|
|
|
+ self.mainloop = GLib.MainLoop()
|
|
|
+
|
|
|
+ def register_gatt_app(self):
|
|
|
+ '''
|
|
|
+ Create Gatt Application
|
|
|
+ Register Gatt Application
|
|
|
'''
|
|
|
- global le_adv_obj, le_adv_manager_iface_obj, adv_active_instance, adv_registered
|
|
|
+ try:
|
|
|
+ # Create mainloop, if does not exist
|
|
|
+ self._create_mainloop()
|
|
|
|
|
|
- le_adv_obj = None
|
|
|
- le_adv_manager_iface_obj = None
|
|
|
- le_adv_iface_path = None
|
|
|
- adv_active_instance = False
|
|
|
- adv_registered = False
|
|
|
- lib_gap.ADV_OBJ = False
|
|
|
+ # Create Gatt Application
|
|
|
+ self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path)
|
|
|
+ print('GATT Application created')
|
|
|
+ self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE)
|
|
|
|
|
|
- try:
|
|
|
- print('Advertising started')
|
|
|
- gatt_app_ret = self.create_and_reg_gatt_app()
|
|
|
+ # Register Gatt Application
|
|
|
+ self.gatt_mgr.RegisterApplication(
|
|
|
+ self.gatt_app, {},
|
|
|
+ reply_handler=self.gatt_app_success_handler,
|
|
|
+ error_handler=self.gatt_app_error_handler)
|
|
|
+ self.mainloop.run()
|
|
|
|
|
|
- # Check if gatt app create and register command
|
|
|
- # is sent successfully
|
|
|
- assert gatt_app_ret
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ raise DBusException('Failed to create GATT Application : {}'.format(dbus_err))
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to register Gatt Application: {}'.format(err))
|
|
|
|
|
|
- GLib.timeout_add_seconds(2, self.verify_gatt_app_reg)
|
|
|
- event_loop.run()
|
|
|
+ def gatt_app_success_handler(self):
|
|
|
+ print('GATT Application successfully registered')
|
|
|
+ self.mainloop.quit()
|
|
|
|
|
|
- # Check if Gatt Application is registered
|
|
|
- assert gatt_app_registered
|
|
|
+ def gatt_app_error_handler(self):
|
|
|
+ raise DBusException('Failed to register GATT Application')
|
|
|
|
|
|
- for path,interface in self.ble_objs.items():
|
|
|
- if LE_ADVERTISING_MANAGER_IFACE in interface:
|
|
|
+ def check_le_iface(self):
|
|
|
+ '''
|
|
|
+ Check if LEAdvertisingManager1 interface exists
|
|
|
+ '''
|
|
|
+ try:
|
|
|
+ dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
+ dbus_objs = dbus_obj_mgr.GetManagedObjects()
|
|
|
+ for path, iface in dbus_objs.items():
|
|
|
+ if LE_ADVERTISING_MANAGER_IFACE in iface:
|
|
|
le_adv_iface_path = path
|
|
|
-
|
|
|
+ break
|
|
|
# Check LEAdvertisingManager1 interface is found
|
|
|
assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found'
|
|
|
|
|
|
- # Get device when connected
|
|
|
- if not self.device:
|
|
|
- om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
|
|
- self.ble_objs = om_iface_obj.GetManagedObjects()
|
|
|
+ return le_adv_iface_path
|
|
|
|
|
|
- for path, interfaces in self.ble_objs.items():
|
|
|
- if DEVICE_IFACE not in interfaces.keys():
|
|
|
- continue
|
|
|
- device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
|
|
|
- device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
|
|
|
- self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
|
|
|
+ except AssertionError:
|
|
|
+ raise
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err))
|
|
|
|
|
|
- le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name)
|
|
|
- le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE)
|
|
|
+ def register_adv(self, adv_host_name, adv_type, adv_uuid):
|
|
|
+ try:
|
|
|
+ # Gatt Application is expected to be registered
|
|
|
+ if not self.gatt_app:
|
|
|
+ print('No Gatt Application is registered')
|
|
|
+ return
|
|
|
+
|
|
|
+ adv_iface_index = 0
|
|
|
+
|
|
|
+ # Create mainloop, if does not exist
|
|
|
+ self._create_mainloop()
|
|
|
+
|
|
|
+ # Check LEAdvertisingManager1 interface exists
|
|
|
+ le_iface_path = self.check_le_iface()
|
|
|
+
|
|
|
+ # Create Advertisement data
|
|
|
+ leadv_obj = lib_gap.Advertisement(
|
|
|
+ self.bus,
|
|
|
+ adv_iface_index,
|
|
|
+ adv_type,
|
|
|
+ adv_uuid,
|
|
|
+ adv_host_name)
|
|
|
+ print('Advertisement registered')
|
|
|
|
|
|
- le_adv_manager_iface_obj.RegisterAdvertisement(le_adv_obj.get_path(), {},
|
|
|
- reply_handler=self.adv_handler,
|
|
|
- error_handler=self.adv_error_handler)
|
|
|
+ # Register Advertisement
|
|
|
+ leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE)
|
|
|
+ leadv_mgr_iface_obj.RegisterAdvertisement(
|
|
|
+ leadv_obj.get_path(), {},
|
|
|
+ reply_handler=self.adv_success_handler,
|
|
|
+ error_handler=self.adv_error_handler)
|
|
|
|
|
|
- GLib.timeout_add_seconds(2, self.verify_adv_reg)
|
|
|
- event_loop.run()
|
|
|
+ # Handler to read events received and exit from mainloop
|
|
|
+ GLib.timeout_add_seconds(3, self.check_adv)
|
|
|
|
|
|
- # Check if advertising is registered
|
|
|
- assert adv_registered
|
|
|
+ self.mainloop.run()
|
|
|
|
|
|
- ret_val = self.verify_blecent_app()
|
|
|
+ except AssertionError:
|
|
|
+ raise
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ raise DBusException('Failure during registering advertisement : {}'.format(dbus_err))
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failure during registering advertisement : {}'.format(err))
|
|
|
+ else:
|
|
|
+ try:
|
|
|
+ try:
|
|
|
+ # Stop Notify if not already stopped
|
|
|
+ chars = self.gatt_app.service.get_characteristics()
|
|
|
+ for char in chars:
|
|
|
+ if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']:
|
|
|
+ if char.notifying:
|
|
|
+ char.StopNotify()
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: {}'.format(dbus_err))
|
|
|
|
|
|
- # Check if blecent has passed
|
|
|
- assert ret_val
|
|
|
+ try:
|
|
|
+ # Unregister Advertisement
|
|
|
+ leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path())
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: {}'.format(dbus_err))
|
|
|
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+ try:
|
|
|
+ # Remove advertising data
|
|
|
+ dbus.service.Object.remove_from_connection(leadv_obj)
|
|
|
+ except LookupError as err:
|
|
|
+ print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err))
|
|
|
|
|
|
- def verify_blecent_app(self):
|
|
|
- '''
|
|
|
- Verify read/write/subscribe operations
|
|
|
- '''
|
|
|
- try:
|
|
|
- GLib.timeout_add_seconds(2, self.verify_blecent)
|
|
|
- event_loop.run()
|
|
|
+ try:
|
|
|
+ # Unregister Gatt Application
|
|
|
+ self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path())
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: {}'.format(dbus_err))
|
|
|
|
|
|
- return test_checks_pass
|
|
|
+ try:
|
|
|
+ # Remove Gatt Application
|
|
|
+ dbus.service.Object.remove_from_connection(self.gatt_app)
|
|
|
+ except LookupError as err:
|
|
|
+ print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err))
|
|
|
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+ except RuntimeError as err:
|
|
|
+ print('Warning: Failure during cleanup of Advertisement: {}'.format(err))
|
|
|
|
|
|
- def adv_handler(self):
|
|
|
- '''
|
|
|
- Advertisement Register success handler
|
|
|
- '''
|
|
|
- global adv_registered
|
|
|
- adv_registered = True
|
|
|
+ def adv_success_handler(self):
|
|
|
+ print('Registered Advertisement successfully')
|
|
|
+
|
|
|
+ def adv_error_handler(self, err):
|
|
|
+ raise DBusException('{}'.format(err))
|
|
|
|
|
|
- def adv_error_handler(self, error):
|
|
|
+ def check_adv(self):
|
|
|
'''
|
|
|
- Advertisement Register error handler
|
|
|
+ Handler to check for events triggered (read/write/subscribe)
|
|
|
+ for advertisement registered for AlertNotificationApp
|
|
|
'''
|
|
|
- global adv_registered
|
|
|
- adv_registered = False
|
|
|
-
|
|
|
- def verify_gatt_app_reg(self):
|
|
|
- """
|
|
|
- Verify GATT Application is registered
|
|
|
- """
|
|
|
- global gatt_app_retry_check_cnt, gatt_checks_done
|
|
|
- gatt_app_retry_check_cnt = 0
|
|
|
- gatt_checks_done = False
|
|
|
-
|
|
|
- # Check for success
|
|
|
- if lib_gatt.GATT_APP_OBJ:
|
|
|
- print('GATT Data created')
|
|
|
- if gatt_app_registered:
|
|
|
- print('GATT Application registered')
|
|
|
- gatt_checks_done = True
|
|
|
- if gatt_app_retry_check_cnt == 20:
|
|
|
- if not gatt_app_registered:
|
|
|
- print('Failure: GATT Application could not be registered')
|
|
|
- gatt_checks_done = True
|
|
|
-
|
|
|
- # End polling if app is registered or cnt has reached 10
|
|
|
- if gatt_checks_done:
|
|
|
- if event_loop.is_running():
|
|
|
- event_loop.quit()
|
|
|
- return False # polling for checks will stop
|
|
|
-
|
|
|
- gatt_app_retry_check_cnt += 1
|
|
|
- # Default return True - polling for checks will continue
|
|
|
- return True
|
|
|
-
|
|
|
- def verify_adv_reg(self):
|
|
|
- """
|
|
|
- Verify Advertisement is registered
|
|
|
- """
|
|
|
- global adv_retry_check_cnt, adv_checks_done
|
|
|
- adv_retry_check_cnt = 0
|
|
|
- adv_checks_done = False
|
|
|
-
|
|
|
- if lib_gap.ADV_OBJ:
|
|
|
- print('Advertising data created')
|
|
|
- if adv_registered or adv_active_instance:
|
|
|
- print('Advertisement registered')
|
|
|
- adv_checks_done = True
|
|
|
- if adv_retry_check_cnt == 10:
|
|
|
- if not adv_registered and not adv_active_instance:
|
|
|
- print('Failure: Advertisement could not be registered')
|
|
|
- adv_checks_done = True
|
|
|
-
|
|
|
- # End polling if success or cnt has reached 10
|
|
|
- if adv_checks_done:
|
|
|
- if event_loop.is_running():
|
|
|
- event_loop.quit()
|
|
|
- return False # polling for checks will stop
|
|
|
-
|
|
|
- adv_retry_check_cnt += 1
|
|
|
- # Default return True - polling for checks will continue
|
|
|
- return True
|
|
|
-
|
|
|
- def verify_blecent(self):
|
|
|
- """
|
|
|
- Verify blecent test commands are successful
|
|
|
- """
|
|
|
- global blecent_retry_check_cnt, read_req_check, write_req_check,\
|
|
|
- subscribe_req_check, test_checks_pass
|
|
|
-
|
|
|
- # Check for failures after 10 retries
|
|
|
- if blecent_retry_check_cnt == 10:
|
|
|
- # check for failures
|
|
|
- if not read_req_check:
|
|
|
- print('Failure: Read Request not received')
|
|
|
- if not write_req_check:
|
|
|
- print('Failure: Write Request not received')
|
|
|
- if not subscribe_req_check:
|
|
|
- print('Failure: Subscribe Request not received')
|
|
|
-
|
|
|
- # Blecent Test failed
|
|
|
- test_checks_pass = False
|
|
|
- if subscribe_req_check:
|
|
|
- lib_gatt.alert_status_char_obj.StopNotify()
|
|
|
- else:
|
|
|
- # Check for success
|
|
|
- if not read_req_check and lib_gatt.CHAR_READ:
|
|
|
- read_req_check = True
|
|
|
- if not write_req_check and lib_gatt.CHAR_WRITE:
|
|
|
- write_req_check = True
|
|
|
- if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
|
|
|
- subscribe_req_check = True
|
|
|
-
|
|
|
- if read_req_check and write_req_check and subscribe_req_check:
|
|
|
- # all checks passed
|
|
|
- # Blecent Test passed
|
|
|
- test_checks_pass = True
|
|
|
- lib_gatt.alert_status_char_obj.StopNotify()
|
|
|
-
|
|
|
- if (blecent_retry_check_cnt == 10 or test_checks_pass):
|
|
|
- if event_loop.is_running():
|
|
|
- event_loop.quit()
|
|
|
- return False # polling for checks will stop
|
|
|
-
|
|
|
- # Increment retry count
|
|
|
- blecent_retry_check_cnt += 1
|
|
|
-
|
|
|
- # Default return True - polling for checks will continue
|
|
|
- return True
|
|
|
+ try:
|
|
|
+ retry = 10
|
|
|
+ # Exit loop if read and write and subscribe is successful
|
|
|
+ if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \
|
|
|
+ self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \
|
|
|
+ self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'):
|
|
|
+ if self.mainloop.is_running():
|
|
|
+ self.mainloop.quit()
|
|
|
+ # return False to stop polling
|
|
|
+ return False
|
|
|
+
|
|
|
+ self.loop_cnt += 1
|
|
|
+ print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt))
|
|
|
+
|
|
|
+ # Exit loop if max retry value is reached and
|
|
|
+ # all three events (read and write and subscribe) have not yet passed
|
|
|
+ # Retry total 10 times
|
|
|
+ if self.loop_cnt == (retry - 1):
|
|
|
+ if self.mainloop.is_running():
|
|
|
+ self.mainloop.quit()
|
|
|
+ # return False to stop polling
|
|
|
+ return False
|
|
|
+
|
|
|
+ # return True to continue polling
|
|
|
+ return True
|
|
|
|
|
|
- def verify_blecent_disconnect(self):
|
|
|
- """
|
|
|
- Verify cleanup is successful
|
|
|
- """
|
|
|
- global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\
|
|
|
- adv_data_check, adv_reg_check, adv_stop
|
|
|
-
|
|
|
- if blecent_retry_check_cnt == 0:
|
|
|
- gatt_app_obj_check = False
|
|
|
- gatt_app_reg_check = False
|
|
|
- adv_data_check = False
|
|
|
- adv_reg_check = False
|
|
|
-
|
|
|
- # Check for failures after 10 retries
|
|
|
- if blecent_retry_check_cnt == 10:
|
|
|
- # check for failures
|
|
|
- if not gatt_app_obj_check:
|
|
|
- print('Warning: GATT Data could not be removed')
|
|
|
- if not gatt_app_reg_check:
|
|
|
- print('Warning: GATT Application could not be unregistered')
|
|
|
- if not adv_data_check:
|
|
|
- print('Warning: Advertising data could not be removed')
|
|
|
- if not adv_reg_check:
|
|
|
- print('Warning: Advertisement could not be unregistered')
|
|
|
-
|
|
|
- # Blecent Test failed
|
|
|
- adv_stop = False
|
|
|
- else:
|
|
|
- # Check for success
|
|
|
- if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
|
|
|
- print('GATT Data removed')
|
|
|
- gatt_app_obj_check = True
|
|
|
- if not gatt_app_reg_check and not gatt_app_registered:
|
|
|
- print('GATT Application unregistered')
|
|
|
- gatt_app_reg_check = True
|
|
|
- if not adv_data_check and not adv_reg_check and not (adv_registered or adv_active_instance or lib_gap.ADV_OBJ):
|
|
|
- print('Advertising data removed')
|
|
|
- print('Advertisement unregistered')
|
|
|
- adv_data_check = True
|
|
|
- adv_reg_check = True
|
|
|
- # all checks passed
|
|
|
- adv_stop = True
|
|
|
-
|
|
|
- if (blecent_retry_check_cnt == 10 or adv_stop):
|
|
|
- if event_loop.is_running():
|
|
|
- event_loop.quit()
|
|
|
- return False # polling for checks will stop
|
|
|
-
|
|
|
- # Increment retry count
|
|
|
- blecent_retry_check_cnt += 1
|
|
|
-
|
|
|
- # Default return True - polling for checks will continue
|
|
|
- return True
|
|
|
+ except RuntimeError as err:
|
|
|
+ print('Failure in advertisment handler: {}'.format(err))
|
|
|
+ if self.mainloop.is_running():
|
|
|
+ self.mainloop.quit()
|
|
|
+ # return False to stop polling
|
|
|
+ return False
|
|
|
|
|
|
def disconnect(self):
|
|
|
'''
|
|
|
- Before application exits:
|
|
|
- Advertisement is unregistered
|
|
|
- Advertisement object created is removed from dbus
|
|
|
- GATT Application is unregistered
|
|
|
- GATT Application object created is removed from dbus
|
|
|
- Disconnect device if connected
|
|
|
- Adapter is powered off
|
|
|
+ Disconnect device
|
|
|
'''
|
|
|
try:
|
|
|
- global blecent_retry_check_cnt, discovery_start, verify_signal_check, signal_caught
|
|
|
- blecent_retry_check_cnt = 0
|
|
|
- verify_signal_check = 0
|
|
|
-
|
|
|
- print('\nexiting from test...')
|
|
|
-
|
|
|
- self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
|
|
|
-
|
|
|
- if adv_registered:
|
|
|
- # Unregister Advertisement
|
|
|
- le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path())
|
|
|
-
|
|
|
- # Remove Advertising data
|
|
|
- dbus.service.Object.remove_from_connection(le_adv_obj)
|
|
|
-
|
|
|
- if gatt_app_registered:
|
|
|
- # Unregister GATT Application
|
|
|
- gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path())
|
|
|
-
|
|
|
- # Remove GATT data
|
|
|
- dbus.service.Object.remove_from_connection(gatt_app_obj)
|
|
|
-
|
|
|
- GLib.timeout_add_seconds(5, self.verify_blecent_disconnect)
|
|
|
- event_loop.run()
|
|
|
-
|
|
|
- if adv_stop:
|
|
|
- print('Stop Advertising status: ', adv_stop)
|
|
|
- else:
|
|
|
- print('Warning: Stop Advertising status: ', adv_stop)
|
|
|
-
|
|
|
+ if not self.device or not self.device.iface:
|
|
|
+ return
|
|
|
+ print('disconnecting device')
|
|
|
# Disconnect device
|
|
|
- if self.device:
|
|
|
- print('disconnecting device...')
|
|
|
- self.device.Disconnect(dbus_interface=DEVICE_IFACE)
|
|
|
- if self.adapter:
|
|
|
- self.adapter.RemoveDevice(self.device)
|
|
|
+ device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
|
+ if device_conn == 1:
|
|
|
+ self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE)
|
|
|
+ for cnt in range(10, 0, -1):
|
|
|
+ time.sleep(5)
|
|
|
+ device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
|
|
|
+ if device_conn == 0:
|
|
|
+ print('device disconnected')
|
|
|
+ break
|
|
|
+ print('number of retries left ({})'.format(cnt - 1))
|
|
|
+ if device_conn == 1:
|
|
|
+ print('failed to disconnect device')
|
|
|
+
|
|
|
+ self.adapter.iface.RemoveDevice(self.device.iface)
|
|
|
self.device = None
|
|
|
- if discovery_start:
|
|
|
- self.adapter.StopDiscovery()
|
|
|
- discovery_start = False
|
|
|
- time.sleep(15)
|
|
|
-
|
|
|
- signal_caught = False
|
|
|
- GLib.timeout_add_seconds(5, verify_signal_is_caught)
|
|
|
- event_loop.run()
|
|
|
-
|
|
|
- if not device_connected:
|
|
|
- print('device disconnected')
|
|
|
- else:
|
|
|
- print('Warning: device could not be disconnected')
|
|
|
-
|
|
|
- except Exception:
|
|
|
- print(traceback.format_exc())
|
|
|
- return False
|
|
|
+
|
|
|
+ except dbus.exceptions.DBusException as dbus_err:
|
|
|
+ print('Warning: {}'.format(dbus_err))
|
|
|
+ except Exception as err:
|
|
|
+ raise Exception('Failed to disconnect device: {}'.format(err))
|