blehr_test.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. from __future__ import print_function
  17. import os
  18. import re
  19. import subprocess
  20. import threading
  21. import traceback
  22. try:
  23. import Queue
  24. except ImportError:
  25. import queue as Queue
  26. import ttfw_idf
  27. from ble import lib_ble_client
  28. from tiny_test_fw import Utility
  29. # When running on local machine execute the following before running this script
  30. # > make app bootloader
  31. # > make print_flash_cmd | tail -n 1 > build/download.config
  32. def blehr_client_task(hr_obj, dut, dut_addr):
  33. interface = 'hci0'
  34. ble_devname = 'blehr_sensor_1.0'
  35. hr_srv_uuid = '180d'
  36. hr_char_uuid = '2a37'
  37. # Get BLE client module
  38. ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
  39. # Discover Bluetooth Adapter and power on
  40. is_adapter_set = ble_client_obj.set_adapter()
  41. if not is_adapter_set:
  42. return
  43. # Connect BLE Device
  44. is_connected = ble_client_obj.connect(
  45. devname=ble_devname,
  46. devaddr=dut_addr)
  47. if not is_connected:
  48. return
  49. # Get services of the connected device
  50. services = ble_client_obj.get_services()
  51. if not services:
  52. ble_client_obj.disconnect()
  53. return
  54. # Get characteristics of the connected device
  55. ble_client_obj.get_chars()
  56. '''
  57. Blehr application run:
  58. Start Notifications
  59. Retrieve updated value
  60. Stop Notifications
  61. '''
  62. # Get service if exists
  63. service = ble_client_obj.get_service_if_exists(hr_srv_uuid)
  64. if service:
  65. # Get characteristic if exists
  66. char = ble_client_obj.get_char_if_exists(hr_char_uuid)
  67. if not char:
  68. ble_client_obj.disconnect()
  69. return
  70. else:
  71. ble_client_obj.disconnect()
  72. return
  73. # Start Notify
  74. # Read updated value
  75. # Stop Notify
  76. notify = ble_client_obj.start_notify(char)
  77. if not notify:
  78. ble_client_obj.disconnect()
  79. return
  80. # Check dut responses
  81. dut.expect('subscribe event; cur_notify=1', timeout=30)
  82. dut.expect('subscribe event; cur_notify=0', timeout=30)
  83. # Call disconnect to perform cleanup operations before exiting application
  84. ble_client_obj.disconnect()
  85. # Check dut responses
  86. dut.expect('disconnect;', timeout=30)
  87. class BleHRThread(threading.Thread):
  88. def __init__(self, dut, dut_addr, exceptions_queue):
  89. threading.Thread.__init__(self)
  90. self.dut = dut
  91. self.dut_addr = dut_addr
  92. self.exceptions_queue = exceptions_queue
  93. def run(self):
  94. try:
  95. blehr_client_task(self, self.dut, self.dut_addr)
  96. except RuntimeError:
  97. self.exceptions_queue.put(traceback.format_exc(), block=False)
  98. @ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
  99. def test_example_app_ble_hr(env, extra_data):
  100. """
  101. Steps:
  102. 1. Discover Bluetooth Adapter and Power On
  103. 2. Connect BLE Device
  104. 3. Start Notifications
  105. 4. Updated value is retrieved
  106. 5. Stop Notifications
  107. """
  108. # Remove cached bluetooth devices of any previous connections
  109. subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
  110. subprocess.check_output(['hciconfig', 'hci0', 'reset'])
  111. # Acquire DUT
  112. dut = env.get_dut('blehr', 'examples/bluetooth/nimble/blehr', dut_class=ttfw_idf.ESP32DUT)
  113. # Get binary file
  114. binary_file = os.path.join(dut.app.binary_path, 'blehr.bin')
  115. bin_size = os.path.getsize(binary_file)
  116. ttfw_idf.log_performance('blehr_bin_size', '{}KB'.format(bin_size // 1024))
  117. # Upload binary and start testing
  118. Utility.console_log('Starting blehr simple example test app')
  119. dut.start_app()
  120. dut.reset()
  121. # Get device address from dut
  122. dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
  123. exceptions_queue = Queue.Queue()
  124. # Starting a py-client in a separate thread
  125. blehr_thread_obj = BleHRThread(dut, dut_addr, exceptions_queue)
  126. blehr_thread_obj.start()
  127. blehr_thread_obj.join()
  128. exception_msg = None
  129. while True:
  130. try:
  131. exception_msg = exceptions_queue.get(block=False)
  132. except Queue.Empty:
  133. break
  134. else:
  135. Utility.console_log('\n' + exception_msg)
  136. if exception_msg:
  137. raise Exception('Blehr thread did not run successfully')
  138. if __name__ == '__main__':
  139. test_example_app_ble_hr()