gen_crt_bundle.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/env python
  2. #
  3. # ESP32 x509 certificate bundle generation utility
  4. #
  5. # Converts PEM and DER certificates to a custom bundle format which stores just the
  6. # subject name and public key to reduce space
  7. #
  8. # The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length;
  9. # crt 1 subject name; crt 1 public key; crt 2...
  10. #
  11. # Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD
  12. #
  13. # Licensed under the Apache License, Version 2.0 (the "License");
  14. # you may not use this file except in compliance with the License.
  15. # You may obtain a copy of the License at
  16. #
  17. #     http://www.apache.org/licenses/LICENSE-2.0
  18. #
  19. # Unless required by applicable law or agreed to in writing, software
  20. # distributed under the License is distributed on an "AS IS" BASIS,
  21. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  22. # See the License for the specific language governing permissions and
  23. # limitations under the License.
  24. from __future__ import with_statement
  25. import argparse
  26. import csv
  27. import os
  28. import re
  29. import struct
  30. import sys
  31. from io import open
  32. try:
  33. from cryptography import x509
  34. from cryptography.hazmat.backends import default_backend
  35. from cryptography.hazmat.primitives import serialization
  36. except ImportError:
  37. print('The cryptography package is not installed.'
  38. 'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
  39. 'setting up the required packages.')
  40. raise
  41. ca_bundle_bin_file = 'x509_crt_bundle'
  42. quiet = False
  43. def status(msg):
  44. """ Print status message to stderr """
  45. if not quiet:
  46. critical(msg)
  47. def critical(msg):
  48. """ Print critical message to stderr """
  49. sys.stderr.write('gen_crt_bundle.py: ')
  50. sys.stderr.write(msg)
  51. sys.stderr.write('\n')
  52. class CertificateBundle:
  53. def __init__(self):
  54. self.certificates = []
  55. self.compressed_crts = []
  56. if os.path.isfile(ca_bundle_bin_file):
  57. os.remove(ca_bundle_bin_file)
  58. def add_from_path(self, crts_path):
  59. found = False
  60. for file_path in os.listdir(crts_path):
  61. found |= self.add_from_file(os.path.join(crts_path, file_path))
  62. if found is False:
  63. raise InputError('No valid x509 certificates found in %s' % crts_path)
  64. def add_from_file(self, file_path):
  65. try:
  66. if file_path.endswith('.pem'):
  67. status('Parsing certificates from %s' % file_path)
  68. with open(file_path, 'r', encoding='utf-8') as f:
  69. crt_str = f.read()
  70. self.add_from_pem(crt_str)
  71. return True
  72. elif file_path.endswith('.der'):
  73. status('Parsing certificates from %s' % file_path)
  74. with open(file_path, 'rb') as f:
  75. crt_str = f.read()
  76. self.add_from_der(crt_str)
  77. return True
  78. except ValueError:
  79. critical('Invalid certificate in %s' % file_path)
  80. raise InputError('Invalid certificate')
  81. return False
  82. def add_from_pem(self, crt_str):
  83. """ A single PEM file may have multiple certificates """
  84. crt = ''
  85. count = 0
  86. start = False
  87. for strg in crt_str.splitlines(True):
  88. if strg == '-----BEGIN CERTIFICATE-----\n' and start is False:
  89. crt = ''
  90. start = True
  91. elif strg == '-----END CERTIFICATE-----\n' and start is True:
  92. crt += strg + '\n'
  93. start = False
  94. self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend()))
  95. count += 1
  96. if start is True:
  97. crt += strg
  98. if(count == 0):
  99. raise InputError('No certificate found')
  100. status('Successfully added %d certificates' % count)
  101. def add_from_der(self, crt_str):
  102. self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend()))
  103. status('Successfully added 1 certificate')
  104. def create_bundle(self):
  105. # Sort certificates in order to do binary search when looking up certificates
  106. self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()))
  107. bundle = struct.pack('>H', len(self.certificates))
  108. for crt in self.certificates:
  109. """ Read the public key as DER format """
  110. pub_key = crt.public_key()
  111. pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
  112. """ Read the subject name as DER format """
  113. sub_name_der = crt.subject.public_bytes(default_backend())
  114. name_len = len(sub_name_der)
  115. key_len = len(pub_key_der)
  116. len_data = struct.pack('>HH', name_len, key_len)
  117. bundle += len_data
  118. bundle += sub_name_der
  119. bundle += pub_key_der
  120. return bundle
  121. def add_with_filter(self, crts_path, filter_path):
  122. filter_set = set()
  123. with open(filter_path, 'r', encoding='utf-8') as f:
  124. csv_reader = csv.reader(f, delimiter=',')
  125. # Skip header
  126. next(csv_reader)
  127. for row in csv_reader:
  128. filter_set.add(row[1])
  129. status('Parsing certificates from %s' % crts_path)
  130. crt_str = []
  131. with open(crts_path, 'r', encoding='utf-8') as f:
  132. crt_str = f.read()
  133. # Split all certs into a list of (name, certificate string) tuples
  134. pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE)
  135. filtered_crts = ''
  136. for name, crt in pem_crts:
  137. if name in filter_set:
  138. filtered_crts += crt
  139. self.add_from_pem(filtered_crts)
  140. class InputError(RuntimeError):
  141. def __init__(self, e):
  142. super(InputError, self).__init__(e)
  143. def main():
  144. global quiet
  145. parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility')
  146. parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
  147. parser.add_argument('--input', '-i', nargs='+', required=True,
  148. help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files')
  149. parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \
  150. that should be included from cacrt_all.pem')
  151. args = parser.parse_args()
  152. quiet = args.quiet
  153. bundle = CertificateBundle()
  154. for path in args.input:
  155. if os.path.isfile(path):
  156. if os.path.basename(path) == 'cacrt_all.pem' and args.filter:
  157. bundle.add_with_filter(path, args.filter)
  158. else:
  159. bundle.add_from_file(path)
  160. elif os.path.isdir(path):
  161. bundle.add_from_path(path)
  162. else:
  163. raise InputError('Invalid --input=%s, is neither file nor folder' % args.input)
  164. status('Successfully added %d certificates in total' % len(bundle.certificates))
  165. crt_bundle = bundle.create_bundle()
  166. with open(ca_bundle_bin_file, 'wb') as f:
  167. f.write(crt_bundle)
  168. if __name__ == '__main__':
  169. try:
  170. main()
  171. except InputError as e:
  172. print(e)
  173. sys.exit(2)