esp_local_ctrl.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. #
  6. import argparse
  7. import asyncio
  8. import json
  9. import os
  10. import ssl
  11. import struct
  12. import sys
  13. import textwrap
  14. from getpass import getpass
  15. import proto_lc
  16. try:
  17. import esp_prov
  18. import security
  19. except ImportError:
  20. idf_path = os.environ['IDF_PATH']
  21. sys.path.insert(0, idf_path + '/components/protocomm/python')
  22. sys.path.insert(1, idf_path + '/tools/esp_prov')
  23. import esp_prov
  24. import security
  25. # Set this to true to allow exceptions to be thrown
  26. config_throw_except = False
  27. # Property types enum
  28. PROP_TYPE_TIMESTAMP = 0
  29. PROP_TYPE_INT32 = 1
  30. PROP_TYPE_BOOLEAN = 2
  31. PROP_TYPE_STRING = 3
  32. # Property flags enum
  33. PROP_FLAG_READONLY = (1 << 0)
  34. def prop_typestr(prop):
  35. if prop['type'] == PROP_TYPE_TIMESTAMP:
  36. return 'TIME(us)'
  37. elif prop['type'] == PROP_TYPE_INT32:
  38. return 'INT32'
  39. elif prop['type'] == PROP_TYPE_BOOLEAN:
  40. return 'BOOLEAN'
  41. elif prop['type'] == PROP_TYPE_STRING:
  42. return 'STRING'
  43. return 'UNKNOWN'
  44. def encode_prop_value(prop, value):
  45. try:
  46. if prop['type'] == PROP_TYPE_TIMESTAMP:
  47. return struct.pack('q', value)
  48. elif prop['type'] == PROP_TYPE_INT32:
  49. return struct.pack('i', value)
  50. elif prop['type'] == PROP_TYPE_BOOLEAN:
  51. return struct.pack('?', value)
  52. elif prop['type'] == PROP_TYPE_STRING:
  53. return bytes(value, encoding='latin-1')
  54. return value
  55. except struct.error as e:
  56. print(e)
  57. return None
  58. def decode_prop_value(prop, value):
  59. try:
  60. if prop['type'] == PROP_TYPE_TIMESTAMP:
  61. return struct.unpack('q', value)[0]
  62. elif prop['type'] == PROP_TYPE_INT32:
  63. return struct.unpack('i', value)[0]
  64. elif prop['type'] == PROP_TYPE_BOOLEAN:
  65. return struct.unpack('?', value)[0]
  66. elif prop['type'] == PROP_TYPE_STRING:
  67. return value.decode('latin-1')
  68. return value
  69. except struct.error as e:
  70. print(e)
  71. return None
  72. def str_to_prop_value(prop, strval):
  73. try:
  74. if prop['type'] == PROP_TYPE_TIMESTAMP:
  75. return int(strval)
  76. elif prop['type'] == PROP_TYPE_INT32:
  77. return int(strval)
  78. elif prop['type'] == PROP_TYPE_BOOLEAN:
  79. return bool(strval)
  80. elif prop['type'] == PROP_TYPE_STRING:
  81. return strval
  82. return strval
  83. except ValueError as e:
  84. print(e)
  85. return None
  86. def prop_is_readonly(prop):
  87. return (prop['flags'] & PROP_FLAG_READONLY) != 0
  88. def on_except(err):
  89. if config_throw_except:
  90. raise RuntimeError(err)
  91. else:
  92. print(err)
  93. def get_security(secver, username, password, pop='', verbose=False):
  94. if secver == 2:
  95. return security.Security2(username, password, verbose)
  96. if secver == 1:
  97. return security.Security1(pop, verbose)
  98. if secver == 0:
  99. return security.Security0(verbose)
  100. return None
  101. async def get_transport(sel_transport, service_name, check_hostname):
  102. try:
  103. tp = None
  104. if (sel_transport == 'http'):
  105. example_path = os.environ['IDF_PATH'] + '/examples/protocols/esp_local_ctrl'
  106. cert_path = example_path + '/main/certs/rootCA.pem'
  107. ssl_ctx = ssl.create_default_context(cafile=cert_path)
  108. ssl_ctx.check_hostname = check_hostname
  109. tp = esp_prov.transport.Transport_HTTP(service_name, ssl_ctx)
  110. elif (sel_transport == 'ble'):
  111. tp = esp_prov.transport.Transport_BLE(
  112. devname=service_name, service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
  113. nu_lookup={'esp_local_ctrl/version': '0001',
  114. 'esp_local_ctrl/session': '0002',
  115. 'esp_local_ctrl/control': '0003'}
  116. )
  117. await tp.connect(devname=service_name)
  118. return tp
  119. except RuntimeError as e:
  120. on_except(e)
  121. return None
  122. async def version_match(tp, protover, verbose=False):
  123. try:
  124. response = await tp.send_data('esp_local_ctrl/version', protover)
  125. if verbose:
  126. print('esp_local_ctrl/version response : ', response)
  127. # First assume this to be a simple version string
  128. if response.lower() == protover.lower():
  129. return True
  130. try:
  131. # Else interpret this as JSON structure containing
  132. # information with versions and capabilities of both
  133. # provisioning service and application
  134. info = json.loads(response)
  135. if info['prov']['ver'].lower() == protover.lower():
  136. return True
  137. except ValueError:
  138. # If decoding as JSON fails, it means that capabilities
  139. # are not supported
  140. return False
  141. except Exception as e:
  142. on_except(e)
  143. return None
  144. async def has_capability(tp, capability='none', verbose=False):
  145. # Note : default value of `capability` argument cannot be empty string
  146. # because protocomm_httpd expects non zero content lengths
  147. try:
  148. response = await tp.send_data('esp_local_ctrl/version', capability)
  149. if verbose:
  150. print('esp_local_ctrl/version response : ', response)
  151. try:
  152. # Interpret this as JSON structure containing
  153. # information with versions and capabilities of both
  154. # provisioning service and application
  155. info = json.loads(response)
  156. supported_capabilities = info['prov']['cap']
  157. if capability.lower() == 'none':
  158. # No specific capability to check, but capabilities
  159. # feature is present so return True
  160. return True
  161. elif capability in supported_capabilities:
  162. return True
  163. return False
  164. except ValueError:
  165. # If decoding as JSON fails, it means that capabilities
  166. # are not supported
  167. return False
  168. except RuntimeError as e:
  169. on_except(e)
  170. return False
  171. async def establish_session(tp, sec):
  172. try:
  173. response = None
  174. while True:
  175. request = sec.security_session(response)
  176. if request is None:
  177. break
  178. response = await tp.send_data('esp_local_ctrl/session', request)
  179. if (response is None):
  180. return False
  181. return True
  182. except RuntimeError as e:
  183. on_except(e)
  184. return None
  185. async def get_all_property_values(tp, security_ctx):
  186. try:
  187. props = []
  188. message = proto_lc.get_prop_count_request(security_ctx)
  189. response = await tp.send_data('esp_local_ctrl/control', message)
  190. count = proto_lc.get_prop_count_response(security_ctx, response)
  191. if count == 0:
  192. raise RuntimeError('No properties found!')
  193. indices = [i for i in range(count)]
  194. message = proto_lc.get_prop_vals_request(security_ctx, indices)
  195. response = await tp.send_data('esp_local_ctrl/control', message)
  196. props = proto_lc.get_prop_vals_response(security_ctx, response)
  197. if len(props) != count:
  198. raise RuntimeError('Incorrect count of properties!', len(props), count)
  199. for p in props:
  200. p['value'] = decode_prop_value(p, p['value'])
  201. return props
  202. except RuntimeError as e:
  203. on_except(e)
  204. return []
  205. async def set_property_values(tp, security_ctx, props, indices, values, check_readonly=False):
  206. try:
  207. if check_readonly:
  208. for index in indices:
  209. if prop_is_readonly(props[index]):
  210. raise RuntimeError('Cannot set value of Read-Only property')
  211. message = proto_lc.set_prop_vals_request(security_ctx, indices, values)
  212. response = await tp.send_data('esp_local_ctrl/control', message)
  213. return proto_lc.set_prop_vals_response(security_ctx, response)
  214. except RuntimeError as e:
  215. on_except(e)
  216. return False
  217. def desc_format(*args):
  218. desc = ''
  219. for arg in args:
  220. desc += textwrap.fill(replace_whitespace=False, text=arg) + '\n'
  221. return desc
  222. async def main():
  223. parser = argparse.ArgumentParser(add_help=False)
  224. parser = argparse.ArgumentParser(description='Control an ESP32 running esp_local_ctrl service')
  225. parser.add_argument('--version', dest='version', type=str,
  226. help='Protocol version', default='')
  227. parser.add_argument('--transport', dest='transport', type=str,
  228. help='transport i.e http or ble', default='http')
  229. parser.add_argument('--name', dest='service_name', type=str,
  230. help='BLE Device Name / HTTP Server hostname or IP', default='')
  231. parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
  232. help=desc_format(
  233. 'Protocomm security scheme used for secure '
  234. 'session establishment. Accepted values are :',
  235. '\t- 0 : No security',
  236. '\t- 1 : X25519 key exchange + AES-CTR encryption',
  237. '\t- 2 : SRP6a + AES-GCM encryption',
  238. '\t + Authentication using Proof of Possession (PoP)'))
  239. parser.add_argument('--pop', dest='pop', type=str, default='',
  240. help=desc_format(
  241. 'This specifies the Proof of possession (PoP) when security scheme 1 '
  242. 'is used'))
  243. parser.add_argument('--sec2_username', dest='sec2_usr', type=str, default='',
  244. help=desc_format(
  245. 'Username for security scheme 2 (SRP6a)'))
  246. parser.add_argument('--sec2_pwd', dest='sec2_pwd', type=str, default='',
  247. help=desc_format(
  248. 'Password for security scheme 2 (SRP6a)'))
  249. parser.add_argument('--sec2_gen_cred', help='Generate salt and verifier for security scheme 2 (SRP6a)', action='store_true')
  250. parser.add_argument('--sec2_salt_len', dest='sec2_salt_len', type=int, default=16,
  251. help=desc_format(
  252. 'Salt length for security scheme 2 (SRP6a)'))
  253. parser.add_argument('--dont-check-hostname', action='store_true',
  254. # If enabled, the certificate won't be rejected for hostname mismatch.
  255. # This option is hidden because it should be used only for testing purposes.
  256. help=argparse.SUPPRESS)
  257. parser.add_argument('-v', '--verbose', dest='verbose', help='increase output verbosity', action='store_true')
  258. args = parser.parse_args()
  259. if args.secver == 2 and args.sec2_gen_cred:
  260. if not args.sec2_usr or not args.sec2_pwd:
  261. raise ValueError('Username/password cannot be empty for security scheme 2 (SRP6a)')
  262. print('==== Salt-verifier for security scheme 2 (SRP6a) ====')
  263. security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
  264. sys.exit()
  265. if args.version != '':
  266. print(f'==== Esp_Ctrl Version: {args.version} ====')
  267. if args.service_name == '':
  268. args.service_name = 'my_esp_ctrl_device'
  269. if args.transport == 'http':
  270. args.service_name += '.local'
  271. obj_transport = await get_transport(args.transport, args.service_name, not args.dont_check_hostname)
  272. if obj_transport is None:
  273. raise RuntimeError('Failed to establish connection')
  274. # If security version not specified check in capabilities
  275. if args.secver is None:
  276. # First check if capabilities are supported or not
  277. if not await has_capability(obj_transport):
  278. print('Security capabilities could not be determined, please specify "--sec_ver" explicitly')
  279. raise ValueError('Invalid Security Version')
  280. # When no_sec is present, use security 0, else security 1
  281. args.secver = int(not await has_capability(obj_transport, 'no_sec'))
  282. print(f'==== Security Scheme: {args.secver} ====')
  283. if (args.secver == 1):
  284. if not await has_capability(obj_transport, 'no_pop'):
  285. if len(args.pop) == 0:
  286. print('---- Proof of Possession argument not provided ----')
  287. exit(2)
  288. elif len(args.pop) != 0:
  289. print('---- Proof of Possession will be ignored ----')
  290. args.pop = ''
  291. if (args.secver == 2):
  292. if len(args.sec2_usr) == 0:
  293. args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
  294. if len(args.sec2_pwd) == 0:
  295. prompt_str = 'Security Scheme 2 - SRP6a Password required: '
  296. args.sec2_pwd = getpass(prompt_str)
  297. obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.pop, args.verbose)
  298. if obj_security is None:
  299. raise ValueError('Invalid Security Version')
  300. if args.version != '':
  301. print('\n==== Verifying protocol version ====')
  302. if not await version_match(obj_transport, args.version, args.verbose):
  303. raise RuntimeError('Error in protocol version matching')
  304. print('==== Verified protocol version successfully ====')
  305. print('\n==== Starting Session ====')
  306. if not await establish_session(obj_transport, obj_security):
  307. print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
  308. raise RuntimeError('Error in establishing session')
  309. print('==== Session Established ====')
  310. while True:
  311. properties = await get_all_property_values(obj_transport, obj_security)
  312. if len(properties) == 0:
  313. raise RuntimeError('Error in reading property value')
  314. print('\n==== Available Properties ====')
  315. print('{0: >4} {1: <16} {2: <10} {3: <16} {4: <16}'.format(
  316. 'S.N.', 'Name', 'Type', 'Flags', 'Value'))
  317. for i in range(len(properties)):
  318. print('[{0: >2}] {1: <16} {2: <10} {3: <16} {4: <16}'.format(
  319. i + 1, properties[i]['name'], prop_typestr(properties[i]),
  320. ['','Read-Only'][prop_is_readonly(properties[i])],
  321. str(properties[i]['value'])))
  322. select = 0
  323. while True:
  324. try:
  325. inval = input('\nSelect properties to set (0 to re-read, \'q\' to quit) : ')
  326. if inval.lower() == 'q':
  327. print('Quitting...')
  328. exit(0)
  329. invals = inval.split(',')
  330. selections = [int(val) for val in invals]
  331. if min(selections) < 0 or max(selections) > len(properties):
  332. raise ValueError('Invalid input')
  333. break
  334. except ValueError as e:
  335. print(str(e) + '! Retry...')
  336. if len(selections) == 1 and selections[0] == 0:
  337. continue
  338. set_values = []
  339. set_indices = []
  340. for select in selections:
  341. while True:
  342. inval = input('Enter value to set for property (' + properties[select - 1]['name'] + ') : ')
  343. value = encode_prop_value(properties[select - 1],
  344. str_to_prop_value(properties[select - 1], inval))
  345. if value is None:
  346. print('Invalid input! Retry...')
  347. continue
  348. break
  349. set_values += [value]
  350. set_indices += [select - 1]
  351. if not await set_property_values(obj_transport, obj_security, properties, set_indices, set_values):
  352. print('Failed to set values!')
# Start the client only when the file is executed as a script, not on import
if __name__ == '__main__':
    asyncio.run(main())