security2.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. # APIs for interpreting and creating protobuf packets for
  4. # protocomm endpoint with security type protocomm_security2
  5. from typing import Any, Type
  6. import proto
  7. from cryptography.hazmat.primitives.ciphers.aead import AESGCM
  8. from utils import long_to_bytes, str_to_bytes
  9. from .security import Security
  10. from .srp6a import Srp6a, generate_salt_and_verifier
  11. AES_KEY_LEN = 256 // 8
  12. # Enum for state of protocomm_security2 FSM
  13. class security_state:
  14. REQUEST1 = 0
  15. RESPONSE1_REQUEST2 = 1
  16. RESPONSE2 = 2
  17. FINISHED = 3
  18. def sec2_gen_salt_verifier(username: str, password: str, salt_len: int) -> Any:
  19. salt, verifier = generate_salt_and_verifier(username, password, len_s=salt_len)
  20. salt_str = ', '.join([format(b, '#04x') for b in salt])
  21. salt_c_arr = '\n '.join(salt_str[i: i + 96] for i in range(0, len(salt_str), 96))
  22. print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n')
  23. verifier_str = ', '.join([format(b, '#04x') for b in verifier])
  24. verifier_c_arr = '\n '.join(verifier_str[i: i + 96] for i in range(0, len(verifier_str), 96))
  25. print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n')
  26. class Security2(Security):
  27. def __init__(self, username: str, password: str, verbose: bool) -> None:
  28. # Initialize state of the security2 FSM
  29. self.session_state = security_state.REQUEST1
  30. self.username = username
  31. self.password = password
  32. self.verbose = verbose
  33. self.srp6a_ctx: Type[Srp6a]
  34. self.cipher: Type[AESGCM]
  35. self.client_pop_key = None
  36. self.nonce = None
  37. Security.__init__(self, self.security2_session)
  38. def security2_session(self, response_data: bytes) -> Any:
  39. # protocomm security2 FSM which interprets/forms
  40. # protobuf packets according to present state of session
  41. if (self.session_state == security_state.REQUEST1):
  42. self.session_state = security_state.RESPONSE1_REQUEST2
  43. return self.setup0_request()
  44. if (self.session_state == security_state.RESPONSE1_REQUEST2):
  45. self.session_state = security_state.RESPONSE2
  46. self.setup0_response(response_data)
  47. return self.setup1_request()
  48. if (self.session_state == security_state.RESPONSE2):
  49. self.session_state = security_state.FINISHED
  50. self.setup1_response(response_data)
  51. return None
  52. print('---- Unexpected state! ----')
  53. return None
  54. def _print_verbose(self, data: str) -> None:
  55. if (self.verbose):
  56. print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
  57. def setup0_request(self) -> Any:
  58. # Form SessionCmd0 request packet using client public key
  59. setup_req = proto.session_pb2.SessionData()
  60. setup_req.sec_ver = proto.session_pb2.SecScheme2
  61. setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command0
  62. setup_req.sec2.sc0.client_username = str_to_bytes(self.username)
  63. self.srp6a_ctx = Srp6a(self.username, self.password)
  64. if self.srp6a_ctx is None:
  65. raise RuntimeError('Failed to initialize SRP6a instance!')
  66. client_pubkey = long_to_bytes(self.srp6a_ctx.A)
  67. setup_req.sec2.sc0.client_pubkey = client_pubkey
  68. self._print_verbose(f'Client Public Key:\t0x{client_pubkey.hex()}')
  69. return setup_req.SerializeToString().decode('latin-1')
  70. def setup0_response(self, response_data: bytes) -> None:
  71. # Interpret SessionResp0 response packet
  72. setup_resp = proto.session_pb2.SessionData()
  73. setup_resp.ParseFromString(str_to_bytes(response_data))
  74. self._print_verbose(f'Security version:\t{str(setup_resp.sec_ver)}')
  75. if setup_resp.sec_ver != proto.session_pb2.SecScheme2:
  76. raise RuntimeError('Incorrect security scheme')
  77. # Device public key, random salt and password verifier
  78. device_pubkey = setup_resp.sec2.sr0.device_pubkey
  79. device_salt = setup_resp.sec2.sr0.device_salt
  80. self._print_verbose(f'Device Public Key:\t0x{device_pubkey.hex()}')
  81. self.client_pop_key = self.srp6a_ctx.process_challenge(device_salt, device_pubkey)
  82. def setup1_request(self) -> Any:
  83. # Form SessionCmd1 request packet using encrypted device public key
  84. setup_req = proto.session_pb2.SessionData()
  85. setup_req.sec_ver = proto.session_pb2.SecScheme2
  86. setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command1
  87. # Encrypt device public key and attach to the request packet
  88. if self.client_pop_key is None:
  89. raise RuntimeError('Failed to generate client proof!')
  90. self._print_verbose(f'Client Proof:\t0x{self.client_pop_key.hex()}')
  91. setup_req.sec2.sc1.client_proof = self.client_pop_key
  92. return setup_req.SerializeToString().decode('latin-1')
  93. def setup1_response(self, response_data: bytes) -> Any:
  94. # Interpret SessionResp1 response packet
  95. setup_resp = proto.session_pb2.SessionData()
  96. setup_resp.ParseFromString(str_to_bytes(response_data))
  97. # Ensure security scheme matches
  98. if setup_resp.sec_ver == proto.session_pb2.SecScheme2:
  99. # Read encrypyed device proof string
  100. device_proof = setup_resp.sec2.sr1.device_proof
  101. self._print_verbose(f'Device Proof:\t0x{device_proof.hex()}')
  102. self.srp6a_ctx.verify_session(device_proof)
  103. if not self.srp6a_ctx.authenticated():
  104. raise RuntimeError('Failed to verify device proof')
  105. else:
  106. raise RuntimeError('Unsupported security protocol')
  107. # Getting the shared secret
  108. shared_secret = self.srp6a_ctx.get_session_key()
  109. self._print_verbose(f'Shared Secret:\t0x{shared_secret.hex()}')
  110. # Using the first 256 bits of a 512 bit key
  111. session_key = shared_secret[:AES_KEY_LEN]
  112. self._print_verbose(f'Session Key:\t0x{session_key.hex()}')
  113. # 96-bit nonce
  114. self.nonce = setup_resp.sec2.sr1.device_nonce
  115. if self.nonce is None:
  116. raise RuntimeError('Received invalid nonce from device!')
  117. self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
  118. # Initialize the encryption engine with Shared Key and initialization vector
  119. self.cipher = AESGCM(session_key)
  120. if self.cipher is None:
  121. raise RuntimeError('Failed to initialize AES-GCM cryptographic engine!')
  122. def encrypt_data(self, data: bytes) -> Any:
  123. return self.cipher.encrypt(self.nonce, data, None)
  124. def decrypt_data(self, data: bytes) -> Any:
  125. return self.cipher.decrypt(self.nonce, data, None)