app_test.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from __future__ import print_function
  2. from __future__ import unicode_literals
  3. import re
  4. import os
  5. import socket
  6. from threading import Thread, Event
  7. import ttfw_idf
  8. import ssl
  9. SERVER_CERTS_DIR = "server_certs/"
  10. def _path(f):
  11. return os.path.join(os.path.dirname(os.path.realpath(__file__)),f)
  12. def get_my_ip():
  13. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  14. try:
  15. # doesn't even have to be reachable
  16. s.connect(('10.255.255.255', 1))
  17. IP = s.getsockname()[0]
  18. except socket.error:
  19. IP = '127.0.0.1'
  20. finally:
  21. s.close()
  22. return IP
  23. # Simple TLS server
  24. class TlsServer:
  25. def __init__(self, port, negotiated_protocol=ssl.PROTOCOL_TLSv1):
  26. self.port = port
  27. self.socket = socket.socket()
  28. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  29. self.socket.settimeout(20.0)
  30. self.shutdown = Event()
  31. self.negotiated_protocol = negotiated_protocol
  32. self.conn = None
  33. self.ssl_error = None
  34. self.server_thread = None
  35. def __enter__(self):
  36. try:
  37. self.socket.bind(('', self.port))
  38. except socket.error as e:
  39. print("Bind failed:{}".format(e))
  40. raise
  41. self.socket.listen(1)
  42. self.server_thread = Thread(target=self.run_server)
  43. self.server_thread.start()
  44. return self
  45. def __exit__(self, exc_type, exc_value, traceback):
  46. self.shutdown.set()
  47. self.server_thread.join()
  48. self.socket.close()
  49. if (self.conn is not None):
  50. self.conn.close()
  51. def run_server(self):
  52. context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  53. context.load_verify_locations(cafile=_path(SERVER_CERTS_DIR + "ca.crt"))
  54. context.load_cert_chain(certfile=_path(SERVER_CERTS_DIR + "server.crt"), keyfile=_path(SERVER_CERTS_DIR + "server.key"))
  55. context.verify_flags = self.negotiated_protocol
  56. self.socket = context.wrap_socket(self.socket, server_side=True)
  57. try:
  58. print("Listening socket")
  59. self.conn, address = self.socket.accept() # accept new connection
  60. self.socket.settimeout(20.0)
  61. print(" - connection from: {}".format(address))
  62. except ssl.SSLError as e:
  63. self.conn = None
  64. self.ssl_error = str(e)
  65. print(" - SSLError: {}".format(str(e)))
  66. @ttfw_idf.idf_custom_test(env_tag="Example_WIFI", group="test-apps")
  67. def test_app_esp_openssl(env, extra_data):
  68. dut1 = env.get_dut("openssl_connect_test", "tools/test_apps/protocols/openssl", dut_class=ttfw_idf.ESP32DUT)
  69. # check and log bin size
  70. binary_file = os.path.join(dut1.app.binary_path, "openssl_connect_test.bin")
  71. bin_size = os.path.getsize(binary_file)
  72. ttfw_idf.log_performance("openssl_connect_test_bin_size", "{}KB".format(bin_size // 1024))
  73. dut1.start_app()
  74. esp_ip = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
  75. print("Got IP={}".format(esp_ip[0]))
  76. ip = get_my_ip()
  77. server_port = 2222
  78. def start_case(case, desc, negotiated_protocol, result):
  79. with TlsServer(server_port, negotiated_protocol=negotiated_protocol):
  80. print("Starting {}: {}".format(case, desc))
  81. dut1.write("conn {} {} {}".format(ip, server_port, case))
  82. dut1.expect(re.compile(result), timeout=10)
  83. return case
  84. # start test cases
  85. start_case(
  86. case="CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_NONE",
  87. desc="Connect with verify_none mode using wrong certs",
  88. negotiated_protocol=ssl.PROTOCOL_TLSv1_1,
  89. result="SSL Connection Succeed")
  90. start_case(
  91. case="CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_PEER",
  92. desc="Connect with verify_peer mode using wrong certs",
  93. negotiated_protocol=ssl.PROTOCOL_TLSv1_1,
  94. result="SSL Connection Failed")
  95. start_case(
  96. case="CONFIG_TLSV1_2_CONNECT_WRONG_CERT_VERIFY_NONE",
  97. desc="Connect with verify_none mode using wrong certs",
  98. negotiated_protocol=ssl.PROTOCOL_TLSv1_2,
  99. result="SSL Connection Succeed")
  100. start_case(
  101. case="CONFIG_TLSV1_1_CONNECT_WRONG_CERT_VERIFY_PEER",
  102. desc="Connect with verify_peer mode using wrong certs",
  103. negotiated_protocol=ssl.PROTOCOL_TLSv1_2,
  104. result="SSL Connection Failed")
  105. if __name__ == '__main__':
  106. test_app_esp_openssl()