esp_local_ctrl.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. #
  6. import argparse
  7. import asyncio
  8. import json
  9. import os
  10. import ssl
  11. import struct
  12. import sys
  13. import textwrap
  14. from getpass import getpass
  15. import proto_lc
  16. try:
  17. import esp_prov
  18. import security
  19. except ImportError:
  20. idf_path = os.environ['IDF_PATH']
  21. sys.path.insert(0, idf_path + '/components/protocomm/python')
  22. sys.path.insert(1, idf_path + '/tools/esp_prov')
  23. import esp_prov
  24. import security
  25. # Set this to true to allow exceptions to be thrown
  26. config_throw_except = False
  27. # Property types enum
  28. PROP_TYPE_TIMESTAMP = 0
  29. PROP_TYPE_INT32 = 1
  30. PROP_TYPE_BOOLEAN = 2
  31. PROP_TYPE_STRING = 3
  32. # Property flags enum
  33. PROP_FLAG_READONLY = (1 << 0)
  34. def prop_typestr(prop):
  35. if prop['type'] == PROP_TYPE_TIMESTAMP:
  36. return 'TIME(us)'
  37. elif prop['type'] == PROP_TYPE_INT32:
  38. return 'INT32'
  39. elif prop['type'] == PROP_TYPE_BOOLEAN:
  40. return 'BOOLEAN'
  41. elif prop['type'] == PROP_TYPE_STRING:
  42. return 'STRING'
  43. return 'UNKNOWN'
  44. def encode_prop_value(prop, value):
  45. try:
  46. if prop['type'] == PROP_TYPE_TIMESTAMP:
  47. return struct.pack('q', value)
  48. elif prop['type'] == PROP_TYPE_INT32:
  49. return struct.pack('i', value)
  50. elif prop['type'] == PROP_TYPE_BOOLEAN:
  51. return struct.pack('?', value)
  52. elif prop['type'] == PROP_TYPE_STRING:
  53. return bytes(value, encoding='latin-1')
  54. return value
  55. except struct.error as e:
  56. print(e)
  57. return None
  58. def decode_prop_value(prop, value):
  59. try:
  60. if prop['type'] == PROP_TYPE_TIMESTAMP:
  61. return struct.unpack('q', value)[0]
  62. elif prop['type'] == PROP_TYPE_INT32:
  63. return struct.unpack('i', value)[0]
  64. elif prop['type'] == PROP_TYPE_BOOLEAN:
  65. return struct.unpack('?', value)[0]
  66. elif prop['type'] == PROP_TYPE_STRING:
  67. return value.decode('latin-1')
  68. return value
  69. except struct.error as e:
  70. print(e)
  71. return None
  72. def str_to_prop_value(prop, strval):
  73. try:
  74. if prop['type'] == PROP_TYPE_TIMESTAMP:
  75. return int(strval)
  76. elif prop['type'] == PROP_TYPE_INT32:
  77. return int(strval)
  78. elif prop['type'] == PROP_TYPE_BOOLEAN:
  79. return bool(strval)
  80. elif prop['type'] == PROP_TYPE_STRING:
  81. return strval
  82. return strval
  83. except ValueError as e:
  84. print(e)
  85. return None
  86. def prop_is_readonly(prop):
  87. return (prop['flags'] & PROP_FLAG_READONLY) != 0
  88. def on_except(err):
  89. if config_throw_except:
  90. raise RuntimeError(err)
  91. else:
  92. print(err)
  93. def get_security(secver, username, password, pop='', verbose=False):
  94. if secver == 2:
  95. return security.Security2(username, password, verbose)
  96. if secver == 1:
  97. return security.Security1(pop, verbose)
  98. if secver == 0:
  99. return security.Security0(verbose)
  100. return None
  101. async def get_transport(sel_transport, service_name, check_hostname):
  102. try:
  103. tp = None
  104. if (sel_transport == 'http'):
  105. tp = esp_prov.transport.Transport_HTTP(service_name, None)
  106. elif (sel_transport == 'https'):
  107. example_path = os.environ['IDF_PATH'] + '/examples/protocols/esp_local_ctrl'
  108. cert_path = example_path + '/main/certs/rootCA.pem'
  109. ssl_ctx = ssl.create_default_context(cafile=cert_path)
  110. ssl_ctx.check_hostname = check_hostname
  111. tp = esp_prov.transport.Transport_HTTP(service_name, ssl_ctx)
  112. elif (sel_transport == 'ble'):
  113. tp = esp_prov.transport.Transport_BLE(
  114. devname=service_name, service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
  115. nu_lookup={'esp_local_ctrl/version': '0001',
  116. 'esp_local_ctrl/session': '0002',
  117. 'esp_local_ctrl/control': '0003'}
  118. )
  119. await tp.connect(devname=service_name)
  120. return tp
  121. except RuntimeError as e:
  122. on_except(e)
  123. return None
  124. async def version_match(tp, protover, verbose=False):
  125. try:
  126. response = await tp.send_data('esp_local_ctrl/version', protover)
  127. if verbose:
  128. print('esp_local_ctrl/version response : ', response)
  129. # First assume this to be a simple version string
  130. if response.lower() == protover.lower():
  131. return True
  132. try:
  133. # Else interpret this as JSON structure containing
  134. # information with versions and capabilities of both
  135. # provisioning service and application
  136. info = json.loads(response)
  137. if info['prov']['ver'].lower() == protover.lower():
  138. return True
  139. except ValueError:
  140. # If decoding as JSON fails, it means that capabilities
  141. # are not supported
  142. return False
  143. except Exception as e:
  144. on_except(e)
  145. return None
  146. async def has_capability(tp, capability='none', verbose=False):
  147. # Note : default value of `capability` argument cannot be empty string
  148. # because protocomm_httpd expects non zero content lengths
  149. try:
  150. response = await tp.send_data('esp_local_ctrl/version', capability)
  151. if verbose:
  152. print('esp_local_ctrl/version response : ', response)
  153. try:
  154. # Interpret this as JSON structure containing
  155. # information with versions and capabilities of both
  156. # provisioning service and application
  157. info = json.loads(response)
  158. supported_capabilities = info['prov']['cap']
  159. if capability.lower() == 'none':
  160. # No specific capability to check, but capabilities
  161. # feature is present so return True
  162. return True
  163. elif capability in supported_capabilities:
  164. return True
  165. return False
  166. except ValueError:
  167. # If decoding as JSON fails, it means that capabilities
  168. # are not supported
  169. return False
  170. except RuntimeError as e:
  171. on_except(e)
  172. return False
  173. async def establish_session(tp, sec):
  174. try:
  175. response = None
  176. while True:
  177. request = sec.security_session(response)
  178. if request is None:
  179. break
  180. response = await tp.send_data('esp_local_ctrl/session', request)
  181. if (response is None):
  182. return False
  183. return True
  184. except RuntimeError as e:
  185. on_except(e)
  186. return None
  187. async def get_all_property_values(tp, security_ctx):
  188. try:
  189. props = []
  190. message = proto_lc.get_prop_count_request(security_ctx)
  191. response = await tp.send_data('esp_local_ctrl/control', message)
  192. count = proto_lc.get_prop_count_response(security_ctx, response)
  193. if count == 0:
  194. raise RuntimeError('No properties found!')
  195. indices = [i for i in range(count)]
  196. message = proto_lc.get_prop_vals_request(security_ctx, indices)
  197. response = await tp.send_data('esp_local_ctrl/control', message)
  198. props = proto_lc.get_prop_vals_response(security_ctx, response)
  199. if len(props) != count:
  200. raise RuntimeError('Incorrect count of properties!', len(props), count)
  201. for p in props:
  202. p['value'] = decode_prop_value(p, p['value'])
  203. return props
  204. except RuntimeError as e:
  205. on_except(e)
  206. return []
  207. async def set_property_values(tp, security_ctx, props, indices, values, check_readonly=False):
  208. try:
  209. if check_readonly:
  210. for index in indices:
  211. if prop_is_readonly(props[index]):
  212. raise RuntimeError('Cannot set value of Read-Only property')
  213. message = proto_lc.set_prop_vals_request(security_ctx, indices, values)
  214. response = await tp.send_data('esp_local_ctrl/control', message)
  215. return proto_lc.set_prop_vals_response(security_ctx, response)
  216. except RuntimeError as e:
  217. on_except(e)
  218. return False
  219. def desc_format(*args):
  220. desc = ''
  221. for arg in args:
  222. desc += textwrap.fill(replace_whitespace=False, text=arg) + '\n'
  223. return desc
  224. async def main():
  225. parser = argparse.ArgumentParser(add_help=False)
  226. parser = argparse.ArgumentParser(description='Control an ESP32 running esp_local_ctrl service')
  227. parser.add_argument('--version', dest='version', type=str,
  228. help='Protocol version', default='')
  229. parser.add_argument('--transport', dest='transport', type=str,
  230. help='transport i.e http/https/ble', default='https')
  231. parser.add_argument('--name', dest='service_name', type=str,
  232. help='BLE Device Name / HTTP Server hostname or IP', default='')
  233. parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
  234. help=desc_format(
  235. 'Protocomm security scheme used for secure '
  236. 'session establishment. Accepted values are :',
  237. '\t- 0 : No security',
  238. '\t- 1 : X25519 key exchange + AES-CTR encryption',
  239. '\t- 2 : SRP6a + AES-GCM encryption',
  240. '\t + Authentication using Proof of Possession (PoP)'))
  241. parser.add_argument('--pop', dest='pop', type=str, default='',
  242. help=desc_format(
  243. 'This specifies the Proof of possession (PoP) when security scheme 1 '
  244. 'is used'))
  245. parser.add_argument('--sec2_username', dest='sec2_usr', type=str, default='',
  246. help=desc_format(
  247. 'Username for security scheme 2 (SRP6a)'))
  248. parser.add_argument('--sec2_pwd', dest='sec2_pwd', type=str, default='',
  249. help=desc_format(
  250. 'Password for security scheme 2 (SRP6a)'))
  251. parser.add_argument('--sec2_gen_cred', help='Generate salt and verifier for security scheme 2 (SRP6a)', action='store_true')
  252. parser.add_argument('--sec2_salt_len', dest='sec2_salt_len', type=int, default=16,
  253. help=desc_format(
  254. 'Salt length for security scheme 2 (SRP6a)'))
  255. parser.add_argument('--dont-check-hostname', action='store_true',
  256. # If enabled, the certificate won't be rejected for hostname mismatch.
  257. # This option is hidden because it should be used only for testing purposes.
  258. help=argparse.SUPPRESS)
  259. parser.add_argument('-v', '--verbose', dest='verbose', help='increase output verbosity', action='store_true')
  260. args = parser.parse_args()
  261. if args.secver == 2 and args.sec2_gen_cred:
  262. if not args.sec2_usr or not args.sec2_pwd:
  263. raise ValueError('Username/password cannot be empty for security scheme 2 (SRP6a)')
  264. print('==== Salt-verifier for security scheme 2 (SRP6a) ====')
  265. security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
  266. sys.exit()
  267. if args.version != '':
  268. print(f'==== Esp_Ctrl Version: {args.version} ====')
  269. if args.service_name == '':
  270. args.service_name = 'my_esp_ctrl_device'
  271. if args.transport == 'http' or args.transport == 'https':
  272. args.service_name += '.local'
  273. obj_transport = await get_transport(args.transport, args.service_name, not args.dont_check_hostname)
  274. if obj_transport is None:
  275. raise RuntimeError('Failed to establish connection')
  276. # If security version not specified check in capabilities
  277. if args.secver is None:
  278. # First check if capabilities are supported or not
  279. if not await has_capability(obj_transport):
  280. print('Security capabilities could not be determined, please specify "--sec_ver" explicitly')
  281. raise ValueError('Invalid Security Version')
  282. # When no_sec is present, use security 0, else security 1
  283. args.secver = int(not await has_capability(obj_transport, 'no_sec'))
  284. print(f'==== Security Scheme: {args.secver} ====')
  285. if (args.secver == 1):
  286. if not await has_capability(obj_transport, 'no_pop'):
  287. if len(args.pop) == 0:
  288. print('---- Proof of Possession argument not provided ----')
  289. exit(2)
  290. elif len(args.pop) != 0:
  291. print('---- Proof of Possession will be ignored ----')
  292. args.pop = ''
  293. if (args.secver == 2):
  294. if len(args.sec2_usr) == 0:
  295. args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
  296. if len(args.sec2_pwd) == 0:
  297. prompt_str = 'Security Scheme 2 - SRP6a Password required: '
  298. args.sec2_pwd = getpass(prompt_str)
  299. obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.pop, args.verbose)
  300. if obj_security is None:
  301. raise ValueError('Invalid Security Version')
  302. if args.version != '':
  303. print('\n==== Verifying protocol version ====')
  304. if not await version_match(obj_transport, args.version, args.verbose):
  305. raise RuntimeError('Error in protocol version matching')
  306. print('==== Verified protocol version successfully ====')
  307. print('\n==== Starting Session ====')
  308. if not await establish_session(obj_transport, obj_security):
  309. print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
  310. raise RuntimeError('Error in establishing session')
  311. print('==== Session Established ====')
  312. while True:
  313. properties = await get_all_property_values(obj_transport, obj_security)
  314. if len(properties) == 0:
  315. raise RuntimeError('Error in reading property value')
  316. print('\n==== Available Properties ====')
  317. print('{0: >4} {1: <16} {2: <10} {3: <16} {4: <16}'.format(
  318. 'S.N.', 'Name', 'Type', 'Flags', 'Value'))
  319. for i in range(len(properties)):
  320. print('[{0: >2}] {1: <16} {2: <10} {3: <16} {4: <16}'.format(
  321. i + 1, properties[i]['name'], prop_typestr(properties[i]),
  322. ['','Read-Only'][prop_is_readonly(properties[i])],
  323. str(properties[i]['value'])))
  324. select = 0
  325. while True:
  326. try:
  327. inval = input('\nSelect properties to set (0 to re-read, \'q\' to quit) : ')
  328. if inval.lower() == 'q':
  329. print('Quitting...')
  330. exit(0)
  331. invals = inval.split(',')
  332. selections = [int(val) for val in invals]
  333. if min(selections) < 0 or max(selections) > len(properties):
  334. raise ValueError('Invalid input')
  335. break
  336. except ValueError as e:
  337. print(str(e) + '! Retry...')
  338. if len(selections) == 1 and selections[0] == 0:
  339. continue
  340. set_values = []
  341. set_indices = []
  342. for select in selections:
  343. while True:
  344. inval = input('Enter value to set for property (' + properties[select - 1]['name'] + ') : ')
  345. value = encode_prop_value(properties[select - 1],
  346. str_to_prop_value(properties[select - 1], inval))
  347. if value is None:
  348. print('Invalid input! Retry...')
  349. continue
  350. break
  351. set_values += [value]
  352. set_indices += [select - 1]
  353. if not await set_property_values(obj_transport, obj_security, properties, set_indices, set_values):
  354. print('Failed to set values!')
  355. if __name__ == '__main__':
  356. asyncio.run(main())