lib_gatt.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. # Creating GATT Application which then becomes available to remote devices.
  18. from __future__ import print_function
  19. import sys
  20. try:
  21. import dbus
  22. import dbus.service
  23. except ImportError as e:
  24. if 'linux' not in sys.platform:
  25. raise e
  26. print(e)
  27. print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
  28. print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
  29. raise
  30. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  31. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  32. GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
  33. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  34. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  35. GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
  36. SERVICE_UUIDS = {
  37. 'ALERT_NOTIF_SVC_UUID': '00001811-0000-1000-8000-00805f9b34fb'
  38. }
  39. CHAR_UUIDS = {
  40. 'SUPPORT_NEW_ALERT_UUID': '00002A47-0000-1000-8000-00805f9b34fb',
  41. 'ALERT_NOTIF_UUID': '00002A44-0000-1000-8000-00805f9b34fb',
  42. 'UNREAD_ALERT_STATUS_UUID': '00002A45-0000-1000-8000-00805f9b34fb'
  43. }
  44. DESCR_UUIDS = {
  45. 'CCCD_UUID': '00002902-0000-1000-8000-00805f9b34fb'
  46. }
  47. class InvalidArgsException(dbus.exceptions.DBusException):
  48. _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
  49. class NotSupportedException(dbus.exceptions.DBusException):
  50. _dbus_error_name = 'org.bluez.Error.NotSupported'
  51. class Application(dbus.service.Object):
  52. """
  53. org.bluez.GattApplication1 interface implementation
  54. """
  55. def __init__(self, bus, path):
  56. self.path = path
  57. self.services = []
  58. dbus.service.Object.__init__(self, bus, self.path)
  59. def __del__(self):
  60. pass
  61. def add_service(self, service):
  62. self.services.append(service)
  63. @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
  64. def GetManagedObjects(self):
  65. response = {}
  66. for service in self.services:
  67. response[service.get_path()] = service.get_properties()
  68. chrcs = service.get_characteristics()
  69. for chrc in chrcs:
  70. response[chrc.get_path()] = chrc.get_properties()
  71. descs = chrc.get_descriptors()
  72. for desc in descs:
  73. response[desc.get_path()] = desc.get_properties()
  74. return response
  75. def get_path(self):
  76. return dbus.ObjectPath(self.path)
  77. def Release(self):
  78. pass
  79. class AlertNotificationApp(Application):
  80. '''
  81. Alert Notification Application
  82. '''
  83. def __init__(self, bus, path):
  84. Application.__init__(self, bus, path)
  85. self.service = AlertNotificationService(bus, '0001')
  86. self.add_service(self.service)
  87. class Service(dbus.service.Object):
  88. """
  89. org.bluez.GattService1 interface implementation
  90. """
  91. PATH_BASE = '/org/bluez/hci0/service'
  92. def __init__(self, bus, index, uuid, primary=False):
  93. self.path = self.PATH_BASE + str(index)
  94. self.bus = bus
  95. self.uuid = uuid
  96. self.primary = primary
  97. self.characteristics = []
  98. dbus.service.Object.__init__(self, bus, self.path)
  99. def get_properties(self):
  100. return {
  101. GATT_SERVICE_IFACE: {
  102. 'UUID': self.uuid,
  103. 'Primary': self.primary,
  104. 'Characteristics': dbus.Array(
  105. self.get_characteristic_paths(),
  106. signature='o')
  107. }
  108. }
  109. def get_path(self):
  110. return dbus.ObjectPath(self.path)
  111. def add_characteristic(self, characteristic):
  112. self.characteristics.append(characteristic)
  113. def get_characteristic_paths(self):
  114. result = []
  115. for chrc in self.characteristics:
  116. result.append(chrc.get_path())
  117. return result
  118. def get_characteristics(self):
  119. return self.characteristics
  120. @dbus.service.method(DBUS_PROP_IFACE,
  121. in_signature='s',
  122. out_signature='a{sv}')
  123. def GetAll(self, interface):
  124. if interface != GATT_SERVICE_IFACE:
  125. raise InvalidArgsException()
  126. return self.get_properties()[GATT_SERVICE_IFACE]
  127. class Characteristic(dbus.service.Object):
  128. """
  129. org.bluez.GattCharacteristic1 interface implementation
  130. """
  131. def __init__(self, bus, index, uuid, flags, service):
  132. self.path = service.path + '/char' + str(index)
  133. self.bus = bus
  134. self.uuid = uuid
  135. self.service = service
  136. self.flags = flags
  137. self.value = [0]
  138. self.descriptors = []
  139. dbus.service.Object.__init__(self, bus, self.path)
  140. def get_properties(self):
  141. return {
  142. GATT_CHRC_IFACE: {
  143. 'Service': self.service.get_path(),
  144. 'UUID': self.uuid,
  145. 'Flags': self.flags,
  146. 'Value': self.value,
  147. 'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
  148. }
  149. }
  150. def get_path(self):
  151. return dbus.ObjectPath(self.path)
  152. def add_descriptor(self, descriptor):
  153. self.descriptors.append(descriptor)
  154. def get_descriptor_paths(self):
  155. result = []
  156. for desc in self.descriptors:
  157. result.append(desc.get_path())
  158. return result
  159. def get_descriptors(self):
  160. return self.descriptors
  161. @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
  162. def GetAll(self, interface):
  163. if interface != GATT_CHRC_IFACE:
  164. raise InvalidArgsException()
  165. return self.get_properties()[GATT_CHRC_IFACE]
  166. @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
  167. def ReadValue(self, options):
  168. print('\nDefault ReadValue called, returning error')
  169. raise NotSupportedException()
  170. @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
  171. def WriteValue(self, value, options):
  172. print('\nDefault WriteValue called, returning error')
  173. raise NotSupportedException()
  174. @dbus.service.method(GATT_CHRC_IFACE)
  175. def StartNotify(self):
  176. print('Default StartNotify called, returning error')
  177. raise NotSupportedException()
  178. @dbus.service.method(GATT_CHRC_IFACE)
  179. def StopNotify(self):
  180. print('Default StopNotify called, returning error')
  181. raise NotSupportedException()
  182. @dbus.service.signal(DBUS_PROP_IFACE,
  183. signature='sa{sv}as')
  184. def PropertiesChanged(self, interface, changed, invalidated):
  185. pass
  186. # print('\nProperties Changed')
  187. class Descriptor(dbus.service.Object):
  188. """
  189. org.bluez.GattDescriptor1 interface implementation
  190. """
  191. def __init__(self, bus, index, uuid, flags, characteristic):
  192. self.path = characteristic.path + '/desc' + str(index)
  193. self.bus = bus
  194. self.uuid = uuid
  195. self.flags = flags
  196. self.chrc = characteristic
  197. dbus.service.Object.__init__(self, bus, self.path)
  198. def get_properties(self):
  199. return {
  200. GATT_DESC_IFACE: {
  201. 'Characteristic': self.chrc.get_path(),
  202. 'UUID': self.uuid,
  203. 'Flags': self.flags,
  204. }
  205. }
  206. def get_path(self):
  207. return dbus.ObjectPath(self.path)
  208. @dbus.service.method(DBUS_PROP_IFACE,
  209. in_signature='s',
  210. out_signature='a{sv}')
  211. def GetAll(self, interface):
  212. if interface != GATT_DESC_IFACE:
  213. raise InvalidArgsException()
  214. return self.get_properties()[GATT_DESC_IFACE]
  215. @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
  216. def ReadValue(self, options):
  217. print('Default ReadValue called, returning error')
  218. raise NotSupportedException()
  219. @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
  220. def WriteValue(self, value, options):
  221. print('Default WriteValue called, returning error')
  222. raise NotSupportedException()
  223. @dbus.service.signal(DBUS_PROP_IFACE,
  224. signature='sa{sv}as')
  225. def PropertiesChanged(self, interface, changed, invalidated):
  226. pass
  227. # print('\nProperties Changed')
  228. class AlertNotificationService(Service):
  229. def __init__(self, bus, index):
  230. Service.__init__(self, bus, index, SERVICE_UUIDS['ALERT_NOTIF_SVC_UUID'], primary=True)
  231. self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
  232. self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
  233. self.add_characteristic(UnreadAlertStatusCharacteristic(bus, '0003', self))
  234. def get_char_status(self, uuid, status):
  235. for char in self.characteristics:
  236. if char.uuid == uuid:
  237. if status in char.status:
  238. return True
  239. return False
  240. class SupportedNewAlertCategoryCharacteristic(Characteristic):
  241. def __init__(self, bus, index, service):
  242. Characteristic.__init__(
  243. self, bus, index,
  244. CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'],
  245. ['read'],
  246. service)
  247. self.value = [dbus.Byte(2)]
  248. self.status = []
  249. def ReadValue(self, options):
  250. val_list = []
  251. for val in self.value:
  252. val_list.append(dbus.Byte(val))
  253. print('Read Request received\n', '\tSupportedNewAlertCategoryCharacteristic')
  254. print('\tValue:', '\t', val_list)
  255. self.status.append('read')
  256. return val_list
  257. class AlertNotificationControlPointCharacteristic(Characteristic):
  258. def __init__(self, bus, index, service):
  259. Characteristic.__init__(
  260. self, bus, index,
  261. CHAR_UUIDS['ALERT_NOTIF_UUID'],
  262. ['read', 'write'],
  263. service)
  264. self.value = [dbus.Byte(0)]
  265. self.status = []
  266. def ReadValue(self, options):
  267. val_list = []
  268. for val in self.value:
  269. val_list.append(dbus.Byte(val))
  270. print('Read Request received\n', '\tAlertNotificationControlPointCharacteristic')
  271. print('\tValue:', '\t', val_list)
  272. self.status.append('read')
  273. return val_list
  274. def WriteValue(self, value, options):
  275. print('Write Request received\n', '\tAlertNotificationControlPointCharacteristic')
  276. print('\tCurrent value:', '\t', self.value)
  277. val_list = []
  278. for val in value:
  279. val_list.append(val)
  280. self.value = val_list
  281. self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
  282. # Check if new value is written
  283. print('\tNew value:', '\t', self.value)
  284. if not self.value == value:
  285. print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
  286. self.status.append('write')
  287. class UnreadAlertStatusCharacteristic(Characteristic):
  288. def __init__(self, bus, index, service):
  289. Characteristic.__init__(
  290. self, bus, index,
  291. CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'],
  292. ['read', 'write', 'notify'],
  293. service)
  294. self.value = [dbus.Byte(0)]
  295. self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
  296. self.add_descriptor(self.cccd_obj)
  297. self.notifying = False
  298. self.status = []
  299. def StartNotify(self):
  300. try:
  301. if self.notifying:
  302. print('\nAlready notifying, nothing to do')
  303. return
  304. print('Notify Started')
  305. self.notifying = True
  306. self.ReadValue()
  307. self.WriteValue([dbus.Byte(1), dbus.Byte(0)])
  308. self.status.append('notify')
  309. except Exception as e:
  310. print(e)
  311. def StopNotify(self):
  312. if not self.notifying:
  313. print('\nNot notifying, nothing to do')
  314. return
  315. self.notifying = False
  316. print('\nNotify Stopped')
  317. def ReadValue(self, options=None):
  318. val_list = []
  319. for val in self.value:
  320. val_list.append(dbus.Byte(val))
  321. self.status.append('read')
  322. print('\tValue:', '\t', val_list)
  323. return val_list
  324. def WriteValue(self, value, options=None):
  325. val_list = []
  326. for val in value:
  327. val_list.append(val)
  328. self.value = val_list
  329. self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': self.value}, [])
  330. # Check if new value is written
  331. if not self.value == value:
  332. print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)
  333. print('New value:', '\t', self.value)
  334. self.status.append('write')
  335. class ClientCharacteristicConfigurationDescriptor(Descriptor):
  336. def __init__(self, bus, index, characteristic):
  337. self.value = [dbus.Byte(1)]
  338. Descriptor.__init__(
  339. self, bus, index,
  340. DESCR_UUIDS['CCCD_UUID'],
  341. ['read', 'write'],
  342. characteristic)
  343. def ReadValue(self, options=None):
  344. return self.value
  345. def WriteValue(self, value, options=None):
  346. val_list = []
  347. for val in value:
  348. val_list.append(val)
  349. self.value = val_list
  350. self.PropertiesChanged(GATT_DESC_IFACE, {'Value': self.value}, [])
  351. # Check if new value is written
  352. if not self.value == value:
  353. print('Failed: Write Request\n\tNew value not written\tCurrent value:', self.value)