example_test.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. from __future__ import unicode_literals
  2. import os
  3. import threading
  4. import time
  5. import serial
  6. import ttfw_idf
  7. from tiny_test_fw import Utility
  8. class SerialThread(object):
  9. '''
  10. Connect to serial port and fake responses just like from a real modem
  11. '''
  12. # Dictionary for transforming received AT command to expected response
  13. AT_FSM = {b'AT+CGMM': b'0G Dummy Model',
  14. b'AT+CGSN': b'0123456789',
  15. b'AT+CIMI': b'ESP',
  16. b'AT+COPS?': b'+COPS: 0,0,"ESP Network"',
  17. b'AT+CSQ': b'+CSQ: 4,0',
  18. b'AT+CBC': b'+CBC: 0,50',
  19. b'ATD*99***1#': b'CONNECT',
  20. }
  21. def run(self, log_path, exit_event):
  22. with serial.Serial(self.port, 115200) as ser, open(log_path, 'w') as f:
  23. buff = b''
  24. while not exit_event.is_set():
  25. time.sleep(0.1)
  26. buff += ser.read(ser.in_waiting)
  27. if not buff.endswith(b'\r'):
  28. continue # read more because the complete command wasn't yet received
  29. cmd_list = buff.split(b'\r')
  30. buff = b''
  31. for cmd in cmd_list:
  32. if len(cmd) == 0:
  33. continue
  34. snd = self.AT_FSM.get(cmd, b'')
  35. if snd != b'':
  36. snd += b'\n'
  37. snd += b'OK\n'
  38. f.write('Received: {}\n'.format(repr(cmd.decode())))
  39. f.write('Sent: {}\n'.format(repr(snd.decode())))
  40. ser.write(snd)
  41. def __init__(self, port, log_path):
  42. self.port = port
  43. self.exit_event = threading.Event()
  44. self.t = threading.Thread(target=self.run, args=(log_path, self.exit_event,))
  45. self.t.start()
  46. def __enter__(self):
  47. return self
  48. def __exit__(self, type, value, traceback):
  49. self.exit_event.set()
  50. self.t.join(60)
  51. if self.t.is_alive():
  52. Utility.console_log('The serial thread is still alive', 'O')
  53. @ttfw_idf.idf_example_test(env_tag='Example_PPP')
  54. def test_examples_pppos_client(env, extra_data):
  55. rel_project_path = 'examples/protocols/pppos_client'
  56. dut = env.get_dut('pppos_client', rel_project_path)
  57. project_path = os.path.join(dut.app.get_sdk_path(), rel_project_path)
  58. modem_port = '/dev/ttyUSB{}'.format(0 if dut.port.endswith('1') else 1)
  59. with SerialThread(modem_port, os.path.join(project_path, 'serial.log')):
  60. dut.start_app()
  61. dut.expect_all('pppos_example: Module: 0G Dummy Model',
  62. 'pppos_example: Operator: "ESP Network"',
  63. 'pppos_example: IMEI: 0123456789',
  64. 'pppos_example: IMSI: ESP',
  65. 'pppos_example: rssi: 4, ber: 0',
  66. 'pppos_example: Battery voltage: 0 mV',
  67. 'pppos_example: Modem PPP Started',
  68. timeout=60)
  69. cmd = ('pppd {} 115200 10.0.0.1:10.0.0.2 logfile {} local noauth debug nocrtscts nodetach +ipv6'
  70. ''.format(modem_port, os.path.join(project_path, 'ppp.log')))
  71. with ttfw_idf.CustomProcess(cmd, '/dev/null'): # Nothing is printed here
  72. dut.expect_all('pppos_example: Modem Connect to PPP Server',
  73. 'pppos_example: IP : 10.0.0.2',
  74. 'pppos_example: Netmask : 255.255.255.255',
  75. 'pppos_example: Gateway : 10.0.0.1',
  76. 'pppos_example: Name Server1: 0.0.0.0',
  77. 'pppos_example: Name Server2: 0.0.0.0',
  78. 'pppos_example: GOT ip event!!!',
  79. 'pppos_example: MQTT other event id: 7',
  80. # There are no fake DNS server and MQTT server set up so the example fails at this point
  81. 'esp-tls: couldn\'t get hostname',
  82. 'MQTT_CLIENT: Error transport connect',
  83. 'pppos_example: MQTT_EVENT_ERROR',
  84. 'pppos_example: MQTT_EVENT_DISCONNECTED')
  85. if __name__ == '__main__':
  86. test_examples_pppos_client()