security1.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. #
  4. # APIs for interpreting and creating protobuf packets for
  5. # protocomm endpoint with security type protocomm_security1
  6. import proto
  7. from cryptography.hazmat.backends import default_backend
  8. from cryptography.hazmat.primitives import hashes, serialization
  9. from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
  10. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  11. from utils import long_to_bytes, str_to_bytes
  12. from .security import Security
  def a_xor_b(a: bytes, b: bytes) -> bytes:
      """Return the byte-wise XOR of ``a`` and ``b``.

      The result has exactly ``len(b)`` bytes: each byte of ``b`` is XORed
      with the byte of ``a`` at the same position. Any bytes of ``a`` beyond
      ``len(b)`` are ignored.

      Raises:
          IndexError: if ``a`` is shorter than ``b``.
      """
      # Fail loudly instead of letting zip() silently truncate the output;
      # a shortened result would otherwise be used as a (weaker) key.
      if len(a) < len(b):
          raise IndexError('a_xor_b: first operand is shorter than second operand')
      return bytes(x ^ y for x, y in zip(a, b))
  15. # Enum for state of protocomm_security1 FSM
  class security_state:
      """Integer constants naming the states of the protocomm_security1 FSM.

      The states are listed in the order the session handler walks through
      them during a handshake.
      """

      # Nothing sent yet; the next step is to send the first request
      REQUEST1 = 0
      # First response expected; it is answered with the second request
      RESPONSE1_REQUEST2 = 1
      # Second (final) response expected
      RESPONSE2 = 2
      # Handshake completed
      FINISHED = 3
  class Security1(Security):
      """Client side of the protocomm security1 session handshake.

      The handshake exchanges X25519 public keys with the device, derives an
      AES-CTR cipher from the shared key (XORed with the SHA-256 of the
      proof-of-possession when one is given) and verifies the device by
      decrypting the proof it sends back.
      """

      def __init__(self, pop, verbose):
          """Create a session in its initial state.

          Args:
              pop: proof-of-possession; converted to bytes with
                  ``str_to_bytes``. An empty value disables the PoP step.
              verbose: when truthy, intermediate handshake values are printed.
          """
          # Initialize state of the security1 FSM
          self.session_state = security_state.REQUEST1
          self.pop = str_to_bytes(pop)
          self.verbose = verbose
          super().__init__(self.security1_session)

      def security1_session(self, response_data):
          """Advance the handshake FSM by one step.

          Interprets ``response_data`` according to the present state and
          returns the next request packet to send, or ``None`` when there is
          nothing more to send (handshake finished, or unexpected state).
          """
          if self.session_state == security_state.REQUEST1:
              self.session_state = security_state.RESPONSE1_REQUEST2
              return self.setup0_request()
          elif self.session_state == security_state.RESPONSE1_REQUEST2:
              self.session_state = security_state.RESPONSE2
              self.setup0_response(response_data)
              return self.setup1_request()
          elif self.session_state == security_state.RESPONSE2:
              self.session_state = security_state.FINISHED
              self.setup1_response(response_data)
              return None

          print('Unexpected state')
          return None

      def __generate_key(self):
          """Generate the client's X25519 key pair.

          Stores the private key object in ``client_private_key`` and the raw
          public key bytes in ``client_public_key``.
          """
          self.client_private_key = X25519PrivateKey.generate()
          self.client_public_key = self.client_private_key.public_key().public_bytes(
              encoding=serialization.Encoding.Raw,
              format=serialization.PublicFormat.Raw)

      def _print_verbose(self, data):
          """Print ``data`` wrapped in ANSI colour codes when verbose is set."""
          if self.verbose:
              print(f'\x1b[32;20m++++ {data} ++++\x1b[0m')

      def setup0_request(self):
          """Form the SessionCmd0 request carrying a fresh client public key.

          Returns:
              The serialized protobuf packet decoded as a latin-1 string.
          """
          setup_req = proto.session_pb2.SessionData()
          setup_req.sec_ver = proto.session_pb2.SecScheme1
          self.__generate_key()
          setup_req.sec1.sc0.client_pubkey = self.client_public_key
          self._print_verbose(f'Client Public Key:\t0x{self.client_public_key.hex()}')
          return setup_req.SerializeToString().decode('latin-1')

      def setup0_response(self, response_data):
          """Interpret the SessionResp0 response and set up the cipher.

          Stores the device public key and initializes ``self.cipher`` with
          the derived shared key and the device-supplied random value.

          Raises:
              RuntimeError: if the response does not use security scheme 1.
          """
          setup_resp = proto.session_pb2.SessionData()
          setup_resp.ParseFromString(str_to_bytes(response_data))
          self._print_verbose('Security version:\t' + str(setup_resp.sec_ver))
          if setup_resp.sec_ver != proto.session_pb2.SecScheme1:
              raise RuntimeError('Incorrect security scheme')

          self.device_public_key = setup_resp.sec1.sr0.device_pubkey
          # Device random is the initialization vector
          device_random = setup_resp.sec1.sr0.device_random
          self._print_verbose(f'Device Public Key:\t0x{self.device_public_key.hex()}')
          self._print_verbose(f'Device Random:\t0x{device_random.hex()}')

          # Calculate Curve25519 shared key using Client private key and Device public key
          shared_key = self.client_private_key.exchange(
              X25519PublicKey.from_public_bytes(self.device_public_key))
          self._print_verbose(f'Shared Key:\t0x{shared_key.hex()}')

          # If PoP is provided, XOR SHA256 of PoP with the previously
          # calculated Shared Key to form the actual Shared Key
          if self.pop:
              # Calculate SHA256 of PoP
              pop_hash = hashes.Hash(hashes.SHA256(), backend=default_backend())
              pop_hash.update(self.pop)
              digest = pop_hash.finalize()
              # XOR with and update Shared Key
              shared_key = a_xor_b(shared_key, digest)
              self._print_verbose(
                  f'Updated Shared Key (Shared key XORed with PoP):\t0x{shared_key.hex()}')

          # Initialize the encryption engine with Shared Key and initialization vector.
          # The single encryptor context is kept for the whole session; every
          # later update() call continues the same CTR keystream.
          cipher = Cipher(algorithms.AES(shared_key), modes.CTR(device_random),
                          backend=default_backend())
          self.cipher = cipher.encryptor()

      def setup1_request(self):
          """Form the SessionCmd1 request carrying the client proof.

          The proof is the device public key encrypted with the session
          cipher.

          Returns:
              The serialized protobuf packet decoded as a latin-1 string.
          """
          setup_req = proto.session_pb2.SessionData()
          setup_req.sec_ver = proto.session_pb2.SecScheme1
          setup_req.sec1.msg = proto.sec1_pb2.Session_Command1
          # Encrypt device public key and attach to the request packet
          client_verify = self.cipher.update(self.device_public_key)
          self._print_verbose(f'Client Proof:\t0x{client_verify.hex()}')
          setup_req.sec1.sc1.client_verify_data = client_verify
          return setup_req.SerializeToString().decode('latin-1')

      def setup1_response(self, response_data):
          """Interpret the SessionResp1 response and verify the device.

          Raises:
              RuntimeError: if the response does not use security scheme 1, or
                  if the decrypted device proof does not match the client
                  public key.
          """
          setup_resp = proto.session_pb2.SessionData()
          setup_resp.ParseFromString(str_to_bytes(response_data))
          # Ensure security scheme matches
          if setup_resp.sec_ver != proto.session_pb2.SecScheme1:
              raise RuntimeError('Unsupported security protocol')

          # Read encrypted device verify string
          device_verify = setup_resp.sec1.sr1.device_verify_data
          self._print_verbose(f'Device Proof:\t0x{device_verify.hex()}')
          # Decrypt the device verify string (CTR mode: same keystream
          # operation as encryption, so the encryptor context is reused)
          decrypted_client_pubkey = self.cipher.update(device_verify)
          # Match decrypted string with client public key
          if decrypted_client_pubkey != self.client_public_key:
              raise RuntimeError('Failed to verify device!')

      def encrypt_data(self, data):
          """Return ``data`` passed through the session cipher."""
          return self.cipher.update(data)

      def decrypt_data(self, data):
          """Return ``data`` passed through the session cipher.

          Uses the same cipher context as ``encrypt_data``.
          """
          return self.cipher.update(data)