lib_gatt.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. # Creating GATT Application which then becomes available to remote devices.
  18. from __future__ import print_function
  19. import sys
  20. try:
  21. import dbus
  22. import dbus.service
  23. except ImportError as e:
  24. if 'linux' not in sys.platform:
  25. raise e
  26. print(e)
  27. print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
  28. print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
  29. raise
  30. alert_status_char_obj = None
  31. GATT_APP_OBJ = False
  32. CHAR_READ = False
  33. CHAR_WRITE = False
  34. CHAR_SUBSCRIBE = False
  35. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  36. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  37. GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
  38. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  39. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  40. GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
  41. class InvalidArgsException(dbus.exceptions.DBusException):
  42. _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
  43. class NotSupportedException(dbus.exceptions.DBusException):
  44. _dbus_error_name = 'org.bluez.Error.NotSupported'
  45. class Application(dbus.service.Object):
  46. """
  47. org.bluez.GattApplication1 interface implementation
  48. """
  49. def __init__(self, bus, path):
  50. self.path = path
  51. self.services = []
  52. srv_obj = AlertNotificationService(bus, '0001')
  53. self.add_service(srv_obj)
  54. dbus.service.Object.__init__(self, bus, self.path)
  55. def __del__(self):
  56. pass
  57. def get_path(self):
  58. return dbus.ObjectPath(self.path)
  59. def add_service(self, service):
  60. self.services.append(service)
  61. @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
  62. def GetManagedObjects(self):
  63. global GATT_APP_OBJ
  64. response = {}
  65. for service in self.services:
  66. response[service.get_path()] = service.get_properties()
  67. chrcs = service.get_characteristics()
  68. for chrc in chrcs:
  69. response[chrc.get_path()] = chrc.get_properties()
  70. descs = chrc.get_descriptors()
  71. for desc in descs:
  72. response[desc.get_path()] = desc.get_properties()
  73. GATT_APP_OBJ = True
  74. return response
  75. def Release(self):
  76. pass
  77. class Service(dbus.service.Object):
  78. """
  79. org.bluez.GattService1 interface implementation
  80. """
  81. PATH_BASE = '/org/bluez/hci0/service'
  82. def __init__(self, bus, index, uuid, primary=False):
  83. self.path = self.PATH_BASE + str(index)
  84. self.bus = bus
  85. self.uuid = uuid
  86. self.primary = primary
  87. self.characteristics = []
  88. dbus.service.Object.__init__(self, bus, self.path)
  89. def get_properties(self):
  90. return {
  91. GATT_SERVICE_IFACE: {
  92. 'UUID': self.uuid,
  93. 'Primary': self.primary,
  94. 'Characteristics': dbus.Array(
  95. self.get_characteristic_paths(),
  96. signature='o')
  97. }
  98. }
  99. def get_path(self):
  100. return dbus.ObjectPath(self.path)
  101. def add_characteristic(self, characteristic):
  102. self.characteristics.append(characteristic)
  103. def get_characteristic_paths(self):
  104. result = []
  105. for chrc in self.characteristics:
  106. result.append(chrc.get_path())
  107. return result
  108. def get_characteristics(self):
  109. return self.characteristics
  110. @dbus.service.method(DBUS_PROP_IFACE,
  111. in_signature='s',
  112. out_signature='a{sv}')
  113. def GetAll(self, interface):
  114. if interface != GATT_SERVICE_IFACE:
  115. raise InvalidArgsException()
  116. return self.get_properties()[GATT_SERVICE_IFACE]
  117. class Characteristic(dbus.service.Object):
  118. """
  119. org.bluez.GattCharacteristic1 interface implementation
  120. """
  121. def __init__(self, bus, index, uuid, flags, service):
  122. self.path = service.path + '/char' + str(index)
  123. self.bus = bus
  124. self.uuid = uuid
  125. self.service = service
  126. self.flags = flags
  127. self.value = [0]
  128. self.descriptors = []
  129. dbus.service.Object.__init__(self, bus, self.path)
  130. def get_properties(self):
  131. return {
  132. GATT_CHRC_IFACE: {
  133. 'Service': self.service.get_path(),
  134. 'UUID': self.uuid,
  135. 'Flags': self.flags,
  136. 'Value': self.value,
  137. 'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
  138. }
  139. }
  140. def get_path(self):
  141. return dbus.ObjectPath(self.path)
  142. def add_descriptor(self, descriptor):
  143. self.descriptors.append(descriptor)
  144. def get_descriptor_paths(self):
  145. result = []
  146. for desc in self.descriptors:
  147. result.append(desc.get_path())
  148. return result
  149. def get_descriptors(self):
  150. return self.descriptors
  151. @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
  152. def GetAll(self, interface):
  153. if interface != GATT_CHRC_IFACE:
  154. raise InvalidArgsException()
  155. return self.get_properties()[GATT_CHRC_IFACE]
  156. @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
  157. def ReadValue(self, options):
  158. print('\nDefault ReadValue called, returning error')
  159. raise NotSupportedException()
  160. @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
  161. def WriteValue(self, value, options):
  162. print('\nDefault WriteValue called, returning error')
  163. raise NotSupportedException()
  164. @dbus.service.method(GATT_CHRC_IFACE)
  165. def StartNotify(self):
  166. print('Default StartNotify called, returning error')
  167. raise NotSupportedException()
  168. @dbus.service.method(GATT_CHRC_IFACE)
  169. def StopNotify(self):
  170. print('Default StopNotify called, returning error')
  171. raise NotSupportedException()
  172. @dbus.service.signal(DBUS_PROP_IFACE,
  173. signature='sa{sv}as')
  174. def PropertiesChanged(self, interface, changed, invalidated):
  175. print("\nProperties Changed")
  176. class Descriptor(dbus.service.Object):
  177. """
  178. org.bluez.GattDescriptor1 interface implementation
  179. """
  180. def __init__(self, bus, index, uuid, flags, characteristic):
  181. self.path = characteristic.path + '/desc' + str(index)
  182. self.bus = bus
  183. self.uuid = uuid
  184. self.flags = flags
  185. self.chrc = characteristic
  186. dbus.service.Object.__init__(self, bus, self.path)
  187. def get_properties(self):
  188. return {
  189. GATT_DESC_IFACE: {
  190. 'Characteristic': self.chrc.get_path(),
  191. 'UUID': self.uuid,
  192. 'Flags': self.flags,
  193. }
  194. }
  195. def get_path(self):
  196. return dbus.ObjectPath(self.path)
  197. @dbus.service.method(DBUS_PROP_IFACE,
  198. in_signature='s',
  199. out_signature='a{sv}')
  200. def GetAll(self, interface):
  201. if interface != GATT_DESC_IFACE:
  202. raise InvalidArgsException()
  203. return self.get_properties()[GATT_DESC_IFACE]
  204. @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
  205. def ReadValue(self, options):
  206. print('Default ReadValue called, returning error')
  207. raise NotSupportedException()
  208. @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
  209. def WriteValue(self, value, options):
  210. print('Default WriteValue called, returning error')
  211. raise NotSupportedException()
  212. class AlertNotificationService(Service):
  213. TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb'
  214. def __init__(self, bus, index):
  215. global alert_status_char_obj
  216. Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True)
  217. self.add_characteristic(SupportedNewAlertCategoryCharacteristic(bus, '0001', self))
  218. self.add_characteristic(AlertNotificationControlPointCharacteristic(bus, '0002', self))
  219. alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self)
  220. self.add_characteristic(alert_status_char_obj)
  221. class SupportedNewAlertCategoryCharacteristic(Characteristic):
  222. SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb'
  223. def __init__(self, bus, index, service):
  224. Characteristic.__init__(
  225. self, bus, index,
  226. self.SUPPORT_NEW_ALERT_UUID,
  227. ['read'],
  228. service)
  229. self.value = [dbus.Byte(2)]
  230. def ReadValue(self, options):
  231. global CHAR_READ
  232. CHAR_READ = True
  233. val_list = []
  234. for val in self.value:
  235. val_list.append(dbus.Byte(val))
  236. print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic")
  237. print("\tValue:", "\t", val_list)
  238. return val_list
  239. class AlertNotificationControlPointCharacteristic(Characteristic):
  240. ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb'
  241. def __init__(self, bus, index, service):
  242. Characteristic.__init__(
  243. self, bus, index,
  244. self.ALERT_NOTIF_UUID,
  245. ['read','write'],
  246. service)
  247. self.value = [dbus.Byte(0)]
  248. def ReadValue(self, options):
  249. val_list = []
  250. for val in self.value:
  251. val_list.append(dbus.Byte(val))
  252. print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic")
  253. print("\tValue:", "\t", val_list)
  254. return val_list
  255. def WriteValue(self, value, options):
  256. global CHAR_WRITE
  257. CHAR_WRITE = True
  258. print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic")
  259. print("\tCurrent value:", "\t", self.value)
  260. val_list = []
  261. for val in value:
  262. val_list.append(val)
  263. self.value = val_list
  264. # Check if new value is written
  265. print("\tNew value:", "\t", self.value)
  266. if not self.value == value:
  267. print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
  268. class UnreadAlertStatusCharacteristic(Characteristic):
  269. UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb'
  270. def __init__(self, bus, index, service):
  271. Characteristic.__init__(
  272. self, bus, index,
  273. self.UNREAD_ALERT_STATUS_UUID,
  274. ['read', 'write', 'notify'],
  275. service)
  276. self.value = [dbus.Byte(0)]
  277. self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
  278. self.add_descriptor(self.cccd_obj)
  279. self.notifying = False
  280. def StartNotify(self):
  281. global CHAR_SUBSCRIBE
  282. CHAR_SUBSCRIBE = True
  283. if self.notifying:
  284. print('\nAlready notifying, nothing to do')
  285. return
  286. self.notifying = True
  287. print("\nNotify Started")
  288. self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)])
  289. self.cccd_obj.ReadValue()
  290. def StopNotify(self):
  291. if not self.notifying:
  292. print('\nNot notifying, nothing to do')
  293. return
  294. self.notifying = False
  295. print("\nNotify Stopped")
  296. def ReadValue(self, options):
  297. print("Read Request received\n", "\tUnreadAlertStatusCharacteristic")
  298. val_list = []
  299. for val in self.value:
  300. val_list.append(dbus.Byte(val))
  301. print("\tValue:", "\t", val_list)
  302. return val_list
  303. def WriteValue(self, value, options):
  304. print("Write Request received\n", "\tUnreadAlertStatusCharacteristic")
  305. val_list = []
  306. for val in value:
  307. val_list.append(val)
  308. self.value = val_list
  309. # Check if new value is written
  310. print("\tNew value:", "\t", self.value)
  311. if not self.value == value:
  312. print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
  313. class ClientCharacteristicConfigurationDescriptor(Descriptor):
  314. CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb'
  315. def __init__(self, bus, index, characteristic):
  316. self.value = [dbus.Byte(0)]
  317. Descriptor.__init__(
  318. self, bus, index,
  319. self.CCCD_UUID,
  320. ['read', 'write'],
  321. characteristic)
  322. def ReadValue(self):
  323. print("\tValue on read:", "\t", self.value)
  324. return self.value
  325. def WriteValue(self, value):
  326. val_list = []
  327. for val in value:
  328. val_list.append(val)
  329. self.value = val_list
  330. # Check if new value is written
  331. print("New value on write:", "\t", self.value)
  332. if not self.value == value:
  333. print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)