| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- # APIs for interpreting and creating protobuf packets for
- # protocomm endpoint with security type protocomm_security2
- from typing import Any, Type
- import proto
- from cryptography.hazmat.primitives.ciphers.aead import AESGCM
- from utils import long_to_bytes, str_to_bytes
- from .security import Security
- from .srp6a import Srp6a, generate_salt_and_verifier
- AES_KEY_LEN = 256 // 8
- # Enum for state of protocomm_security1 FSM
- class security_state:
- REQUEST1 = 0
- RESPONSE1_REQUEST2 = 1
- RESPONSE2 = 2
- FINISHED = 3
- def sec2_gen_salt_verifier(username: str, password: str, salt_len: int) -> Any:
- salt, verifier = generate_salt_and_verifier(username, password, len_s=salt_len)
- salt_str = ', '.join([format(b, '#04x') for b in salt])
- salt_c_arr = '\n '.join(salt_str[i: i + 96] for i in range(0, len(salt_str), 96))
- print(f'static const char sec2_salt[] = {{\n {salt_c_arr}\n}};\n')
- verifier_str = ', '.join([format(b, '#04x') for b in verifier])
- verifier_c_arr = '\n '.join(verifier_str[i: i + 96] for i in range(0, len(verifier_str), 96))
- print(f'static const char sec2_verifier[] = {{\n {verifier_c_arr}\n}};\n')
- class Security2(Security):
- def __init__(self, username: str, password: str, verbose: bool) -> None:
- # Initialize state of the security2 FSM
- self.session_state = security_state.REQUEST1
- self.username = username
- self.password = password
- self.verbose = verbose
- self.srp6a_ctx: Type[Srp6a]
- self.cipher: Type[AESGCM]
- self.client_pop_key = None
- self.nonce = None
- Security.__init__(self, self.security2_session)
- def security2_session(self, response_data: bytes) -> Any:
- # protocomm security2 FSM which interprets/forms
- # protobuf packets according to present state of session
- if (self.session_state == security_state.REQUEST1):
- self.session_state = security_state.RESPONSE1_REQUEST2
- return self.setup0_request()
- if (self.session_state == security_state.RESPONSE1_REQUEST2):
- self.session_state = security_state.RESPONSE2
- self.setup0_response(response_data)
- return self.setup1_request()
- if (self.session_state == security_state.RESPONSE2):
- self.session_state = security_state.FINISHED
- self.setup1_response(response_data)
- return None
- print('---- Unexpected state! ----')
- return None
- def _print_verbose(self, data: str) -> None:
- if (self.verbose):
- print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')
- def setup0_request(self) -> Any:
- # Form SessionCmd0 request packet using client public key
- setup_req = proto.session_pb2.SessionData()
- setup_req.sec_ver = proto.session_pb2.SecScheme2
- setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command0
- setup_req.sec2.sc0.client_username = str_to_bytes(self.username)
- self.srp6a_ctx = Srp6a(self.username, self.password)
- if self.srp6a_ctx is None:
- raise RuntimeError('Failed to initialize SRP6a instance!')
- client_pubkey = long_to_bytes(self.srp6a_ctx.A)
- setup_req.sec2.sc0.client_pubkey = client_pubkey
- self._print_verbose(f'Client Public Key:\t0x{client_pubkey.hex()}')
- return setup_req.SerializeToString().decode('latin-1')
- def setup0_response(self, response_data: bytes) -> None:
- # Interpret SessionResp0 response packet
- setup_resp = proto.session_pb2.SessionData()
- setup_resp.ParseFromString(str_to_bytes(response_data))
- self._print_verbose(f'Security version:\t{str(setup_resp.sec_ver)}')
- if setup_resp.sec_ver != proto.session_pb2.SecScheme2:
- raise RuntimeError('Incorrect security scheme')
- # Device public key, random salt and password verifier
- device_pubkey = setup_resp.sec2.sr0.device_pubkey
- device_salt = setup_resp.sec2.sr0.device_salt
- self._print_verbose(f'Device Public Key:\t0x{device_pubkey.hex()}')
- self.client_pop_key = self.srp6a_ctx.process_challenge(device_salt, device_pubkey)
- def setup1_request(self) -> Any:
- # Form SessionCmd1 request packet using encrypted device public key
- setup_req = proto.session_pb2.SessionData()
- setup_req.sec_ver = proto.session_pb2.SecScheme2
- setup_req.sec2.msg = proto.sec2_pb2.S2Session_Command1
- # Encrypt device public key and attach to the request packet
- if self.client_pop_key is None:
- raise RuntimeError('Failed to generate client proof!')
- self._print_verbose(f'Client Proof:\t0x{self.client_pop_key.hex()}')
- setup_req.sec2.sc1.client_proof = self.client_pop_key
- return setup_req.SerializeToString().decode('latin-1')
- def setup1_response(self, response_data: bytes) -> Any:
- # Interpret SessionResp1 response packet
- setup_resp = proto.session_pb2.SessionData()
- setup_resp.ParseFromString(str_to_bytes(response_data))
- # Ensure security scheme matches
- if setup_resp.sec_ver == proto.session_pb2.SecScheme2:
- # Read encrypyed device proof string
- device_proof = setup_resp.sec2.sr1.device_proof
- self._print_verbose(f'Device Proof:\t0x{device_proof.hex()}')
- self.srp6a_ctx.verify_session(device_proof)
- if not self.srp6a_ctx.authenticated():
- raise RuntimeError('Failed to verify device proof')
- else:
- raise RuntimeError('Unsupported security protocol')
- # Getting the shared secret
- shared_secret = self.srp6a_ctx.get_session_key()
- self._print_verbose(f'Shared Secret:\t0x{shared_secret.hex()}')
- # Using the first 256 bits of a 512 bit key
- session_key = shared_secret[:AES_KEY_LEN]
- self._print_verbose(f'Session Key:\t0x{session_key.hex()}')
- # 96-bit nonce
- self.nonce = setup_resp.sec2.sr1.device_nonce
- if self.nonce is None:
- raise RuntimeError('Received invalid nonce from device!')
- self._print_verbose(f'Nonce:\t0x{self.nonce.hex()}')
- # Initialize the encryption engine with Shared Key and initialization vector
- self.cipher = AESGCM(session_key)
- if self.cipher is None:
- raise RuntimeError('Failed to initialize AES-GCM cryptographic engine!')
- def encrypt_data(self, data: bytes) -> Any:
- return self.cipher.encrypt(self.nonce, data, None)
- def decrypt_data(self, data: bytes) -> Any:
- return self.cipher.decrypt(self.nonce, data, None)
|