# app_test.py (4.3 KB)
from __future__ import print_function, unicode_literals

import os
import re
import socket
import ssl
from threading import Event, Thread

import ttfw_idf
  8. SERVER_CERTS_DIR = 'server_certs/'
  9. def _path(f):
  10. return os.path.join(os.path.dirname(os.path.realpath(__file__)),f)
  11. def get_my_ip():
  12. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  13. try:
  14. # doesn't even have to be reachable
  15. s.connect(('10.255.255.255', 1))
  16. IP = s.getsockname()[0]
  17. except socket.error:
  18. IP = '127.0.0.1'
  19. finally:
  20. s.close()
  21. return IP
  22. # Simple TLS server
  23. class TlsServer:
  24. def __init__(self, port, negotiated_protocol=ssl.PROTOCOL_TLSv1):
  25. self.port = port
  26. self.socket = socket.socket()
  27. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  28. self.socket.settimeout(20.0)
  29. self.shutdown = Event()
  30. self.negotiated_protocol = negotiated_protocol
  31. self.conn = None
  32. self.ssl_error = None
  33. self.server_thread = None
  34. def __enter__(self):
  35. try:
  36. self.socket.bind(('', self.port))
  37. except socket.error as e:
  38. print('Bind failed:{}'.format(e))
  39. raise
  40. self.socket.listen(1)
  41. self.server_thread = Thread(target=self.run_server)
  42. self.server_thread.start()
  43. return self
  44. def __exit__(self, exc_type, exc_value, traceback):
  45. self.shutdown.set()
  46. self.server_thread.join()
  47. self.socket.close()
  48. if (self.conn is not None):
  49. self.conn.close()
  50. def run_server(self):
  51. context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  52. context.load_verify_locations(cafile=_path(SERVER_CERTS_DIR + 'ca.crt'))
  53. context.load_cert_chain(certfile=_path(SERVER_CERTS_DIR + 'server.crt'), keyfile=_path(SERVER_CERTS_DIR + 'server.key'))
  54. context.verify_flags = self.negotiated_protocol
  55. self.socket = context.wrap_socket(self.socket, server_side=True)
  56. try:
  57. print('Listening socket')
  58. self.conn, address = self.socket.accept() # accept new connection
  59. self.socket.settimeout(20.0)
  60. print(' - connection from: {}'.format(address))
  61. except ssl.SSLError as e:
  62. self.conn = None
  63. self.ssl_error = str(e)
  64. print(' - SSLError: {}'.format(str(e)))
  65. @ttfw_idf.idf_custom_test(env_tag='Example_WIFI', group='test-apps')
  66. def test_app_esp_openssl(env, extra_data):
  67. dut1 = env.get_dut('openssl_connect_test', 'tools/test_apps/protocols/openssl', dut_class=ttfw_idf.ESP32DUT)
  68. # check and log bin size
  69. binary_file = os.path.join(dut1.app.binary_path, 'openssl_connect_test.bin')
  70. bin_size = os.path.getsize(binary_file)
  71. ttfw_idf.log_performance('openssl_connect_test_bin_size', '{}KB'.format(bin_size // 1024))
  72. dut1.start_app()
  73. esp_ip = dut1.expect(re.compile(r' IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)'), timeout=30)
  74. print('Got IP={}'.format(esp_ip[0]))
  75. ip = get_my_ip()
  76. server_port = 2222
  77. def start_case(case, desc, negotiated_protocol, result):
  78. with TlsServer(server_port, negotiated_protocol=negotiated_protocol):
  79. print('Starting {}: {}'.format(case, desc))
  80. dut1.write('conn {} {} {}'.format(ip, server_port, case))
  81. dut1.expect(re.compile(result), timeout=10)
  82. return case
  83. # start test cases
  84. start_case(
  85. case='CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_NONE',
  86. desc='Connect with verify_none mode using wrong certs',
  87. negotiated_protocol=ssl.PROTOCOL_TLSv1_1,
  88. result='SSL Connection Succeed')
  89. start_case(
  90. case='CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_PEER',
  91. desc='Connect with verify_peer mode using wrong certs',
  92. negotiated_protocol=ssl.PROTOCOL_TLSv1_1,
  93. result='SSL Connection Failed')
  94. start_case(
  95. case='CONFIG_TLSV1_2_CONNECT_WRONG_CERT_VERIFY_NONE',
  96. desc='Connect with verify_none mode using wrong certs',
  97. negotiated_protocol=ssl.PROTOCOL_TLSv1_2,
  98. result='SSL Connection Succeed')
  99. start_case(
  100. case='CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_PEER',
  101. desc='Connect with verify_peer mode using wrong certs',
  102. negotiated_protocol=ssl.PROTOCOL_TLSv1_2,
  103. result='SSL Connection Failed')
  104. if __name__ == '__main__':
  105. test_app_esp_openssl()