| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- #!/usr/bin/env python
- #
- # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # Creating GATT Application which then becomes available to remote devices.
- from __future__ import print_function
- import sys
- try:
- import dbus
- import dbus.service
- except ImportError as e:
- if 'linux' not in sys.platform:
- raise e
- print(e)
- print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
- print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
- raise
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
- GATT_SERVICE_IFACE = 'org.bluez.GattService1'
- GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
- GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
- SERVICE_UUIDS = {
- 'ALERT_NOTIF_SVC_UUID': '00001811-0000-1000-8000-00805f9b34fb'
- }
- CHAR_UUIDS = {
- 'SUPPORT_NEW_ALERT_UUID': '00002A47-0000-1000-8000-00805f9b34fb',
- 'ALERT_NOTIF_UUID': '00002A44-0000-1000-8000-00805f9b34fb',
- 'UNREAD_ALERT_STATUS_UUID': '00002A45-0000-1000-8000-00805f9b34fb'
- }
- DESCR_UUIDS = {
- 'CCCD_UUID': '00002902-0000-1000-8000-00805f9b34fb'
- }
- class InvalidArgsException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
- class NotSupportedException(dbus.exceptions.DBusException):
- _dbus_error_name = 'org.bluez.Error.NotSupported'
- class Application(dbus.service.Object):
- """
- org.bluez.GattApplication1 interface implementation
- """
- def __init__(self, bus, path):
- self.path = path
- self.services = []
- dbus.service.Object.__init__(self, bus, self.path)
- def __del__(self):
- pass
- def add_service(self, service):
- self.services.append(service)
- @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
- def GetManagedObjects(self):
- response = {}
- for service in self.services:
- response[service.get_path()] = service.get_properties()
- chrcs = service.get_characteristics()
- for chrc in chrcs:
- response[chrc.get_path()] = chrc.get_properties()
- descs = chrc.get_descriptors()
- for desc in descs:
- response[desc.get_path()] = desc.get_properties()
- return response
- def get_path(self):
- return dbus.ObjectPath(self.path)
- def Release(self):
- pass
- class AlertNotificationApp(Application):
- '''
- Alert Notification Application
- '''
- def __init__(self, bus, path):
- Application.__init__(self, bus, path)
- self.service = AlertNotificationService(bus, '0001')
- self.add_service(self.service)
- class Service(dbus.service.Object):
- """
- org.bluez.GattService1 interface implementation
- """
- PATH_BASE = '/org/bluez/hci0/service'
- def __init__(self, bus, index, uuid, primary=False):
- self.path = self.PATH_BASE + str(index)
- self.bus = bus
- self.uuid = uuid
- self.primary = primary
- self.characteristics = []
- dbus.service.Object.__init__(self, bus, self.path)
- def get_properties(self):
- return {
- GATT_SERVICE_IFACE: {
- 'UUID': self.uuid,
- 'Primary': self.primary,
- 'Characteristics': dbus.Array(
- self.get_characteristic_paths(),
- signature='o')
- }
- }
- def get_path(self):
- return dbus.ObjectPath(self.path)
- def add_characteristic(self, characteristic):
- self.characteristics.append(characteristic)
- def get_characteristic_paths(self):
- result = []
- for chrc in self.characteristics:
- result.append(chrc.get_path())
- return result
- def get_characteristics(self):
- return self.characteristics
- @dbus.service.method(DBUS_PROP_IFACE,
- in_signature='s',
- out_signature='a{sv}')
- def GetAll(self, interface):
- if interface != GATT_SERVICE_IFACE:
- raise InvalidArgsException()
- return self.get_properties()[GATT_SERVICE_IFACE]
- class Characteristic(dbus.service.Object):
- """
- org.bluez.GattCharacteristic1 interface implementation
- """
- def __init__(self, bus, index, uuid, flags, service):
- self.path = service.path + '/char' + str(index)
- self.bus = bus
- self.uuid = uuid
- self.service = service
- self.flags = flags
- self.value = [0]
- self.descriptors = []
- dbus.service.Object.__init__(self, bus, self.path)
- def get_properties(self):
- return {
- GATT_CHRC_IFACE: {
- 'Service': self.service.get_path(),
- 'UUID': self.uuid,
- 'Flags': self.flags,
- 'Value': self.value,
- 'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
- }
- }
- def get_path(self):
- return dbus.ObjectPath(self.path)
- def add_descriptor(self, descriptor):
- self.descriptors.append(descriptor)
- def get_descriptor_paths(self):
- result = []
- for desc in self.descriptors:
- result.append(desc.get_path())
- return result
- def get_descriptors(self):
- return self.descriptors
- @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
- def GetAll(self, interface):
- if interface != GATT_CHRC_IFACE:
- raise InvalidArgsException()
- return self.get_properties()[GATT_CHRC_IFACE]
- @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
- def ReadValue(self, options):
- print('\nDefault ReadValue called, returning error')
- raise NotSupportedException()
- @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
- def WriteValue(self, value, options):
- print('\nDefault WriteValue called, returning error')
- raise NotSupportedException()
- @dbus.service.method(GATT_CHRC_IFACE)
- def StartNotify(self):
- print('Default StartNotify called, returning error')
- raise NotSupportedException()
- @dbus.service.method(GATT_CHRC_IFACE)
- def StopNotify(self):
- print('Default StopNotify called, returning error')
- raise NotSupportedException()
- @dbus.service.signal(DBUS_PROP_IFACE,
- signature='sa{sv}as')
- def PropertiesChanged(self, interface, changed, invalidated):
- pass
- # print('\nProperties Changed')
- class Descriptor(dbus.service.Object):
- """
- org.bluez.GattDescriptor1 interface implementation
- """
- def __init__(self, bus, index, uuid, flags, characteristic):
- self.path = characteristic.path + '/desc' + str(index)
- self.bus = bus
- self.uuid = uuid
- self.flags = flags
- self.chrc = characteristic
- dbus.service.Object.__init__(self, bus, self.path)
- def get_properties(self):
- return {
- GATT_DESC_IFACE: {
- 'Characteristic': self.chrc.get_path(),
- 'UUID': self.uuid,
- 'Flags': self.flags,
- }
- }
- def get_path(self):
- return dbus.ObjectPath(self.path)
- @dbus.service.method(DBUS_PROP_IFACE,
- in_signature='s',
- out_signature='a{sv}')
- def GetAll(self, interface):
- if interface != GATT_DESC_IFACE:
- raise InvalidArgsException()
- return self.get_properties()[GATT_DESC_IFACE]
- @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
- def ReadValue(self, options):
- print('Default ReadValue called, returning error')
- raise NotSupportedException()
- @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
- def WriteValue(self, value, options):
- print('Default WriteValue called, returning error')
- raise NotSupportedException()
- @dbus.service.signal(DBUS_PROP_IFACE,
- signature='sa{sv}as')
- def PropertiesChanged(self, interface, changed, invalidated):
- pass
- # print('\nProperties Changed')
- class AlertNotificationService(Service):
- def __init__(self, bus, index):
- Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True)
- self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
- self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
- self.add_characteristic(UnreadAlertStatusCharacteristic(bus, '0003', self))
- def get_char_status(self, uuid, status):
- for char in self.characteristics:
- if char.uuid == uuid:
- if status in char.status:
- return True
- return False
- class SupportedNewAlertCategoryCharacteristic(Characteristic):
- def __init__(self, bus, index, service):
- Characteristic.__init__(
- self, bus, index,
- CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'],
- ['read'],
- service)
- self.value = [dbus.Byte(2)]
- self.status = []
- def ReadValue(self, options):
- val_list = []
- for val in self.value:
- val_list.append(dbus.Byte(val))
- print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic')
- print('\tValue:', '\t', val_list)
- self.status.append('read')
- return val_list
- class AlertNotificationControlPointCharacteristic(Characteristic):
- def __init__(self, bus, index, service):
- Characteristic.__init__(
- self, bus, index,
- CHAR_UUIDS['ALERT_NOTIF_UUID'],
- ['read', 'write'],
- service)
- self.value = [dbus.Byte(0)]
- self.status = []
- def ReadValue(self, options):
- val_list = []
- for val in self.value:
- val_list.append(dbus.Byte(val))
- print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic')
- print('\tValue:', '\t', val_list)
- self.status.append('read')
- return val_list
- def WriteValue(self, value, options):
- print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic')
- print('\tCurrent value:', '\t', self.value)
- val_list = []
- for val in value:
- val_list.append(val)
- self.value = val_list
- self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
- # Check if new value is written
- print('\tNew value:', '\t', self.value)
- if not self.value == value:
- print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
- self.status.append('write')
- class UnreadAlertStatusCharacteristic(Characteristic):
- def __init__(self, bus, index, service):
- Characteristic.__init__(
- self, bus, index,
- CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'],
- ['read', 'write', 'notify'],
- service)
- self.value = [dbus.Byte(0)]
- self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
- self.add_descriptor(self.cccd_obj)
- self.notifying = False
- self.status = []
- def StartNotify(self):
- try:
- if self.notifying:
- print('\nAlready notifying, nothing to do')
- return
- print('Notify Started')
- self.notifying = True
- self.ReadValue()
- self.WriteValue([dbus.Byte(1), dbus.Byte(0)])
- self.status.append('notify')
- except Exception as e:
- print(e)
- def StopNotify(self):
- if not self.notifying:
- print('\nNot notifying, nothing to do')
- return
- self.notifying = False
- print('\nNotify Stopped')
- def ReadValue(self, options=None):
- val_list = []
- for val in self.value:
- val_list.append(dbus.Byte(val))
- self.status.append('read')
- print('\tValue:', '\t', val_list)
- return val_list
- def WriteValue(self, value, options=None):
- val_list = []
- for val in value:
- val_list.append(val)
- self.value = val_list
- self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
- # Check if new value is written
- if not self.value == value:
- print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
- print('New value:', '\t', self.value)
- self.status.append('write')
- class ClientCharacteristicConfigurationDescriptor(Descriptor):
- def __init__(self, bus, index, characteristic):
- self.value = [dbus.Byte(1)]
- Descriptor.__init__(
- self, bus, index,
- DESCR_UUIDS['CCCD_UUID'],
- ['read', 'write'],
- characteristic)
- def ReadValue(self, options=None):
- return self.value
- def WriteValue(self, value, options=None):
- val_list = []
- for val in value:
- val_list.append(val)
- self.value = val_list
- self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, [])
- # Check if new value is written
- if not self.value == value:
- print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
|