esp_prov.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. #
  6. import argparse
  7. import asyncio
  8. import json
  9. import os
  10. import sys
  11. import textwrap
  12. import time
  13. from getpass import getpass
  14. try:
  15. import prov
  16. import security
  17. import transport
  18. except ImportError:
  19. idf_path = os.environ['IDF_PATH']
  20. sys.path.insert(0, idf_path + '/components/protocomm/python')
  21. sys.path.insert(1, idf_path + '/tools/esp_prov')
  22. import prov
  23. import security
  24. import transport
  25. # Set this to true to allow exceptions to be thrown
  26. config_throw_except = False
  27. def on_except(err):
  28. if config_throw_except:
  29. raise RuntimeError(err)
  30. else:
  31. print(err)
  32. def get_security(secver, username, password, pop='', verbose=False):
  33. if secver == 2:
  34. return security.Security2(username, password, verbose)
  35. elif secver == 1:
  36. return security.Security1(pop, verbose)
  37. elif secver == 0:
  38. return security.Security0(verbose)
  39. return None
  40. async def get_transport(sel_transport, service_name):
  41. try:
  42. tp = None
  43. if (sel_transport == 'softap'):
  44. if service_name is None:
  45. service_name = '192.168.4.1:80'
  46. tp = transport.Transport_HTTP(service_name)
  47. elif (sel_transport == 'ble'):
  48. # BLE client is now capable of automatically figuring out
  49. # the primary service from the advertisement data and the
  50. # characteristics corresponding to each endpoint.
  51. # Below, the service_uuid field and 16bit UUIDs in the nu_lookup
  52. # table are provided only to support devices running older firmware,
  53. # in which case, the automated discovery will fail and the client
  54. # will fallback to using the provided UUIDs instead
  55. nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
  56. tp = transport.Transport_BLE(service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
  57. nu_lookup=nu_lookup)
  58. await tp.connect(devname=service_name)
  59. elif (sel_transport == 'console'):
  60. tp = transport.Transport_Console()
  61. return tp
  62. except RuntimeError as e:
  63. on_except(e)
  64. return None
  65. async def version_match(tp, protover, verbose=False):
  66. try:
  67. response = await tp.send_data('proto-ver', protover)
  68. if verbose:
  69. print('proto-ver response : ', response)
  70. # First assume this to be a simple version string
  71. if response.lower() == protover.lower():
  72. return True
  73. try:
  74. # Else interpret this as JSON structure containing
  75. # information with versions and capabilities of both
  76. # provisioning service and application
  77. info = json.loads(response)
  78. if info['prov']['ver'].lower() == protover.lower():
  79. return True
  80. except ValueError:
  81. # If decoding as JSON fails, it means that capabilities
  82. # are not supported
  83. return False
  84. except Exception as e:
  85. on_except(e)
  86. return None
  87. async def has_capability(tp, capability='none', verbose=False):
  88. # Note : default value of `capability` argument cannot be empty string
  89. # because protocomm_httpd expects non zero content lengths
  90. try:
  91. response = await tp.send_data('proto-ver', capability)
  92. if verbose:
  93. print('proto-ver response : ', response)
  94. try:
  95. # Interpret this as JSON structure containing
  96. # information with versions and capabilities of both
  97. # provisioning service and application
  98. info = json.loads(response)
  99. supported_capabilities = info['prov']['cap']
  100. if capability.lower() == 'none':
  101. # No specific capability to check, but capabilities
  102. # feature is present so return True
  103. return True
  104. elif capability in supported_capabilities:
  105. return True
  106. return False
  107. except ValueError:
  108. # If decoding as JSON fails, it means that capabilities
  109. # are not supported
  110. return False
  111. except RuntimeError as e:
  112. on_except(e)
  113. return False
  114. async def get_version(tp):
  115. response = None
  116. try:
  117. response = await tp.send_data('proto-ver', '---')
  118. except RuntimeError as e:
  119. on_except(e)
  120. response = ''
  121. return response
  122. async def establish_session(tp, sec):
  123. try:
  124. response = None
  125. while True:
  126. request = sec.security_session(response)
  127. if request is None:
  128. break
  129. response = await tp.send_data('prov-session', request)
  130. if (response is None):
  131. return False
  132. return True
  133. except RuntimeError as e:
  134. on_except(e)
  135. return None
  136. async def custom_config(tp, sec, custom_info, custom_ver):
  137. try:
  138. message = prov.custom_config_request(sec, custom_info, custom_ver)
  139. response = await tp.send_data('custom-config', message)
  140. return (prov.custom_config_response(sec, response) == 0)
  141. except RuntimeError as e:
  142. on_except(e)
  143. return None
  144. async def custom_data(tp, sec, custom_data):
  145. try:
  146. message = prov.custom_data_request(sec, custom_data)
  147. response = await tp.send_data('custom-data', message)
  148. return (prov.custom_data_response(sec, response) == 0)
  149. except RuntimeError as e:
  150. on_except(e)
  151. return None
  152. async def scan_wifi_APs(sel_transport, tp, sec):
  153. APs = []
  154. group_channels = 0
  155. readlen = 100
  156. if sel_transport == 'softap':
  157. # In case of softAP we must perform the scan on individual channels, one by one,
  158. # so that the Wi-Fi controller gets ample time to send out beacons (necessary to
  159. # maintain connectivity with authenticated stations. As scanning one channel at a
  160. # time will be slow, we can group more than one channels to be scanned in quick
  161. # succession, hence speeding up the scan process. Though if too many channels are
  162. # present in a group, the controller may again miss out on sending beacons. Hence,
  163. # the application must should use an optimum value. The following value usually
  164. # works out in most cases
  165. group_channels = 5
  166. elif sel_transport == 'ble':
  167. # Read at most 4 entries at a time. This is because if we are using BLE transport
  168. # then the response packet size should not exceed the present limit of 256 bytes of
  169. # characteristic value imposed by protocomm_ble. This limit may be removed in the
  170. # future
  171. readlen = 4
  172. try:
  173. message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
  174. start_time = time.time()
  175. response = await tp.send_data('prov-scan', message)
  176. stop_time = time.time()
  177. print('++++ Scan process executed in ' + str(stop_time - start_time) + ' sec')
  178. prov.scan_start_response(sec, response)
  179. message = prov.scan_status_request(sec)
  180. response = await tp.send_data('prov-scan', message)
  181. result = prov.scan_status_response(sec, response)
  182. print('++++ Scan results : ' + str(result['count']))
  183. if result['count'] != 0:
  184. index = 0
  185. remaining = result['count']
  186. while remaining:
  187. count = [remaining, readlen][remaining > readlen]
  188. message = prov.scan_result_request(sec, index, count)
  189. response = await tp.send_data('prov-scan', message)
  190. APs += prov.scan_result_response(sec, response)
  191. remaining -= count
  192. index += count
  193. except RuntimeError as e:
  194. on_except(e)
  195. return None
  196. return APs
  197. async def send_wifi_config(tp, sec, ssid, passphrase):
  198. try:
  199. message = prov.config_set_config_request(sec, ssid, passphrase)
  200. response = await tp.send_data('prov-config', message)
  201. return (prov.config_set_config_response(sec, response) == 0)
  202. except RuntimeError as e:
  203. on_except(e)
  204. return None
  205. async def apply_wifi_config(tp, sec):
  206. try:
  207. message = prov.config_apply_config_request(sec)
  208. response = await tp.send_data('prov-config', message)
  209. return (prov.config_apply_config_response(sec, response) == 0)
  210. except RuntimeError as e:
  211. on_except(e)
  212. return None
  213. async def get_wifi_config(tp, sec):
  214. try:
  215. message = prov.config_get_status_request(sec)
  216. response = await tp.send_data('prov-config', message)
  217. return prov.config_get_status_response(sec, response)
  218. except RuntimeError as e:
  219. on_except(e)
  220. return None
  221. async def wait_wifi_connected(tp, sec):
  222. """
  223. Wait for provisioning to report Wi-Fi is connected
  224. Returns True if Wi-Fi connection succeeded, False if connection consistently failed
  225. """
  226. TIME_PER_POLL = 5
  227. retry = 3
  228. while True:
  229. time.sleep(TIME_PER_POLL)
  230. print('\n==== Wi-Fi connection state ====')
  231. ret = await get_wifi_config(tp, sec)
  232. if ret == 'connecting':
  233. continue
  234. elif ret == 'connected':
  235. print('==== Provisioning was successful ====')
  236. return True
  237. elif retry > 0:
  238. retry -= 1
  239. print('Waiting to poll status again (status %s, %d tries left)...' % (ret, retry))
  240. else:
  241. print('---- Provisioning failed! ----')
  242. return False
  243. def desc_format(*args):
  244. desc = ''
  245. for arg in args:
  246. desc += textwrap.fill(replace_whitespace=False, text=arg) + '\n'
  247. return desc
async def main():
    """Command-line entry point: parse arguments and provision a device.

    Steps, in order: parse the command line; optionally generate security
    scheme 2 credentials and exit; open the transport; determine the
    security scheme (from device capabilities when ``--sec_ver`` is not
    given); optionally verify the protocol version; establish the secure
    session; optionally send custom data; pick an SSID interactively from
    a scan when ``--ssid`` is not given; send and apply the Wi-Fi
    credentials; poll for the connection result. The transport is
    disconnected on every exit path taken after it was opened.

    :raises ValueError: on missing scheme 2 credentials for
                        ``--sec2_gen_cred`` or an unusable security version.
    :raises RuntimeError: when any provisioning step reports failure.
    """
    parser = argparse.ArgumentParser(description=desc_format(
                                     'ESP Provisioning tool for configuring devices '
                                     'running protocomm based provisioning service.',
                                     'See esp-idf/examples/provisioning for sample applications'),
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument('--transport', required=True, dest='mode', type=str,
                        help=desc_format(
                            'Mode of transport over which provisioning is to be performed.',
                            'This should be one of "softap", "ble" or "console"'))

    parser.add_argument('--service_name', dest='name', type=str,
                        help=desc_format(
                            'This specifies the name of the provisioning service to connect to, '
                            'depending upon the mode of transport :',
                            '\t- transport "ble" : The BLE Device Name',
                            '\t- transport "softap" : HTTP Server hostname or IP',
                            '\t (default "192.168.4.1:80")'))

    parser.add_argument('--proto_ver', dest='version', type=str, default='',
                        help=desc_format(
                            'This checks the protocol version of the provisioning service running '
                            'on the device before initiating Wi-Fi configuration'))

    # default=None lets the scheme be auto-detected from capabilities below
    parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
                        help=desc_format(
                            'Protocomm security scheme used by the provisioning service for secure '
                            'session establishment. Accepted values are :',
                            '\t- 0 : No security',
                            '\t- 1 : X25519 key exchange + AES-CTR encryption',
                            '\t + Authentication using Proof of Possession (PoP)',
                            '\t- 2 : SRP6a + AES-GCM encryption',
                            'In case device side application uses IDF\'s provisioning manager, '
                            'the compatible security version is automatically determined from '
                            'capabilities retrieved via the version endpoint'))

    parser.add_argument('--pop', dest='sec1_pop', type=str, default='',
                        help=desc_format(
                            'This specifies the Proof of possession (PoP) when security scheme 1 '
                            'is used'))

    parser.add_argument('--sec2_username', dest='sec2_usr', type=str, default='',
                        help=desc_format(
                            'Username for security scheme 2 (SRP6a)'))

    parser.add_argument('--sec2_pwd', dest='sec2_pwd', type=str, default='',
                        help=desc_format(
                            'Password for security scheme 2 (SRP6a)'))

    parser.add_argument('--sec2_gen_cred', help='Generate salt and verifier for security scheme 2 (SRP6a)', action='store_true')

    parser.add_argument('--sec2_salt_len', dest='sec2_salt_len', type=int, default=16,
                        help=desc_format(
                            'Salt length for security scheme 2 (SRP6a)'))

    parser.add_argument('--ssid', dest='ssid', type=str, default='',
                        help=desc_format(
                            'This configures the device to use SSID of the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    # No default: a missing passphrase (None) triggers an interactive prompt
    parser.add_argument('--passphrase', dest='passphrase', type=str,
                        help=desc_format(
                            'This configures the device to use Passphrase for the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument('--custom_data', dest='custom_data', type=str, default='',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/wifi_prov_mgr"'))

    parser.add_argument('-v','--verbose', help='Increase output verbosity', action='store_true')

    args = parser.parse_args()

    # Credential generation mode: runs before any transport is opened and
    # exits immediately afterwards. Only taken together with --sec_ver 2.
    if args.secver == 2 and args.sec2_gen_cred:
        if not args.sec2_usr or not args.sec2_pwd:
            raise ValueError('Username/password cannot be empty for security scheme 2 (SRP6a)')

        print('==== Salt-verifier for security scheme 2 (SRP6a) ====')
        security.sec2_gen_salt_verifier(args.sec2_usr, args.sec2_pwd, args.sec2_salt_len)
        sys.exit()

    # get_transport() yields None for an unknown mode or a reported error
    obj_transport = await get_transport(args.mode.lower(), args.name)
    if obj_transport is None:
        raise RuntimeError('Failed to establish connection')

    try:
        # If security version not specified check in capabilities
        if args.secver is None:
            # First check if capabilities are supported or not
            if not await has_capability(obj_transport):
                print('Security capabilities could not be determined, please specify "--sec_ver" explicitly')
                raise ValueError('Invalid Security Version')

            # When no_sec is present, use security 0, else security 1
            args.secver = int(not await has_capability(obj_transport, 'no_sec'))
            print(f'==== Security Scheme: {args.secver} ====')

        if (args.secver == 1):
            # Prompt for the PoP only when the device does not advertise
            # 'no_pop'; if it does, any PoP given on the command line is dropped
            if not await has_capability(obj_transport, 'no_pop'):
                if len(args.sec1_pop) == 0:
                    prompt_str = 'Proof of Possession required: '
                    args.sec1_pop = getpass(prompt_str)
            elif len(args.sec1_pop) != 0:
                print('Proof of Possession will be ignored')
                args.sec1_pop = ''

        if (args.secver == 2):
            # Ask interactively for whichever scheme 2 credential is missing
            if len(args.sec2_usr) == 0:
                args.sec2_usr = input('Security Scheme 2 - SRP6a Username required: ')
            if len(args.sec2_pwd) == 0:
                prompt_str = 'Security Scheme 2 - SRP6a Password required: '
                args.sec2_pwd = getpass(prompt_str)

        # get_security() yields None for any version other than 0, 1 or 2
        obj_security = get_security(args.secver, args.sec2_usr, args.sec2_pwd, args.sec1_pop, args.verbose)
        if obj_security is None:
            raise ValueError('Invalid Security Version')

        # Protocol version check is performed only when --proto_ver was given
        if args.version != '':
            print('\n==== Verifying protocol version ====')
            if not await version_match(obj_transport, args.version, args.verbose):
                raise RuntimeError('Error in protocol version matching')
            print('==== Verified protocol version successfully ====')

        print('\n==== Starting Session ====')
        if not await establish_session(obj_transport, obj_security):
            print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
            raise RuntimeError('Error in establishing session')
        print('==== Session Established ====')

        if args.custom_data != '':
            print('\n==== Sending Custom data to Target ====')
            if not await custom_data(obj_transport, obj_security, args.custom_data):
                raise RuntimeError('Error in custom data')
            print('==== Custom data sent successfully ====')

        # Without --ssid, let the user choose a network from a device-side scan
        if args.ssid == '':
            if not await has_capability(obj_transport, 'wifi_scan'):
                raise RuntimeError('Wi-Fi Scan List is not supported by provisioning service')

            # Outer loop: repeat the scan until the user selects an entry
            while True:
                print('\n==== Scanning Wi-Fi APs ====')
                start_time = time.time()
                APs = await scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
                end_time = time.time()
                print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
                if APs is None:
                    raise RuntimeError('Error in scanning Wi-Fi APs')

                if len(APs) == 0:
                    print('No APs found!')
                    sys.exit()

                print('==== Wi-Fi Scan results ====')
                print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                    'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
                for i in range(len(APs)):
                    print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                        i + 1, APs[i]['ssid'], APs[i]['bssid'], APs[i]['channel'], APs[i]['rssi'], APs[i]['auth']))

                # Inner loop: re-prompt until the input is an integer in
                # 0..len(APs); non-numeric input also raises ValueError
                while True:
                    try:
                        select = int(input('Select AP by number (0 to rescan) : '))
                        if select < 0 or select > len(APs):
                            raise ValueError
                        break
                    except ValueError:
                        print('Invalid input! Retry')

                # 0 means rescan; any other value is a 1-based list index
                if select != 0:
                    break

            args.ssid = APs[select - 1]['ssid']

        if args.passphrase is None:
            prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
            args.passphrase = getpass(prompt_str)

        print('\n==== Sending Wi-Fi Credentials to Target ====')
        if not await send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
            raise RuntimeError('Error in send Wi-Fi config')
        print('==== Wi-Fi Credentials sent successfully ====')

        print('\n==== Applying Wi-Fi Config to Target ====')
        if not await apply_wifi_config(obj_transport, obj_security):
            raise RuntimeError('Error in apply Wi-Fi config')
        print('==== Apply config sent successfully ====')

        # NOTE(review): the True/False result is not used here, so a failed
        # connection does not raise or change the exit path.
        await wait_wifi_connected(obj_transport, obj_security)
    finally:
        # Always release the transport, including on errors and sys.exit()
        await obj_transport.disconnect()
# Run the provisioning flow only when executed as a script (not on import);
# asyncio.run() drives the main() coroutine to completion.
if __name__ == '__main__':
    asyncio.run(main())