esp_local_ctrl.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. from __future__ import print_function
  18. from future.utils import tobytes
  19. from builtins import input
  20. import os
  21. import sys
  22. import struct
  23. import argparse
  24. import json
  25. import ssl
  26. import textwrap
  27. import proto_lc
  28. try:
  29. import esp_prov
  30. import security
  31. except ImportError:
  32. idf_path = os.environ['IDF_PATH']
  33. sys.path.insert(0, idf_path + '/components/protocomm/python')
  34. sys.path.insert(1, idf_path + '/tools/esp_prov')
  35. import esp_prov
  36. import security
  37. # Set this to true to allow exceptions to be thrown
  38. config_throw_except = False
  39. # Property types enum
  40. PROP_TYPE_TIMESTAMP = 0
  41. PROP_TYPE_INT32 = 1
  42. PROP_TYPE_BOOLEAN = 2
  43. PROP_TYPE_STRING = 3
  44. # Property flags enum
  45. PROP_FLAG_READONLY = (1 << 0)
  46. def prop_typestr(prop):
  47. if prop["type"] == PROP_TYPE_TIMESTAMP:
  48. return "TIME(us)"
  49. elif prop["type"] == PROP_TYPE_INT32:
  50. return "INT32"
  51. elif prop["type"] == PROP_TYPE_BOOLEAN:
  52. return "BOOLEAN"
  53. elif prop["type"] == PROP_TYPE_STRING:
  54. return "STRING"
  55. return "UNKNOWN"
  56. def encode_prop_value(prop, value):
  57. try:
  58. if prop["type"] == PROP_TYPE_TIMESTAMP:
  59. return struct.pack('q', value)
  60. elif prop["type"] == PROP_TYPE_INT32:
  61. return struct.pack('i', value)
  62. elif prop["type"] == PROP_TYPE_BOOLEAN:
  63. return struct.pack('?', value)
  64. elif prop["type"] == PROP_TYPE_STRING:
  65. return tobytes(value)
  66. return value
  67. except struct.error as e:
  68. print(e)
  69. return None
  70. def decode_prop_value(prop, value):
  71. try:
  72. if prop["type"] == PROP_TYPE_TIMESTAMP:
  73. return struct.unpack('q', value)[0]
  74. elif prop["type"] == PROP_TYPE_INT32:
  75. return struct.unpack('i', value)[0]
  76. elif prop["type"] == PROP_TYPE_BOOLEAN:
  77. return struct.unpack('?', value)[0]
  78. elif prop["type"] == PROP_TYPE_STRING:
  79. return value.decode('latin-1')
  80. return value
  81. except struct.error as e:
  82. print(e)
  83. return None
  84. def str_to_prop_value(prop, strval):
  85. try:
  86. if prop["type"] == PROP_TYPE_TIMESTAMP:
  87. return int(strval)
  88. elif prop["type"] == PROP_TYPE_INT32:
  89. return int(strval)
  90. elif prop["type"] == PROP_TYPE_BOOLEAN:
  91. return bool(strval)
  92. elif prop["type"] == PROP_TYPE_STRING:
  93. return strval
  94. return strval
  95. except ValueError as e:
  96. print(e)
  97. return None
  98. def prop_is_readonly(prop):
  99. return (prop["flags"] & PROP_FLAG_READONLY) != 0
  100. def on_except(err):
  101. if config_throw_except:
  102. raise RuntimeError(err)
  103. else:
  104. print(err)
  105. def get_security(secver, pop=None, verbose=False):
  106. if secver == 1:
  107. return security.Security1(pop, verbose)
  108. elif secver == 0:
  109. return security.Security0(verbose)
  110. return None
  111. def get_transport(sel_transport, service_name, check_hostname):
  112. try:
  113. tp = None
  114. if (sel_transport == 'http'):
  115. example_path = os.environ['IDF_PATH'] + "/examples/protocols/esp_local_ctrl"
  116. cert_path = example_path + "/main/certs/rootCA.pem"
  117. ssl_ctx = ssl.create_default_context(cafile=cert_path)
  118. ssl_ctx.check_hostname = check_hostname
  119. tp = esp_prov.transport.Transport_HTTP(service_name, ssl_ctx)
  120. elif (sel_transport == 'ble'):
  121. tp = esp_prov.transport.Transport_BLE(
  122. devname=service_name, service_uuid='0000ffff-0000-1000-8000-00805f9b34fb',
  123. nu_lookup={'esp_local_ctrl/version': '0001',
  124. 'esp_local_ctrl/session': '0002',
  125. 'esp_local_ctrl/control': '0003'}
  126. )
  127. return tp
  128. except RuntimeError as e:
  129. on_except(e)
  130. return None
  131. def version_match(tp, protover, verbose=False):
  132. try:
  133. response = tp.send_data('proto-ver', protover)
  134. if verbose:
  135. print('proto-ver response : ', response)
  136. # First assume this to be a simple version string
  137. if response.lower() == protover.lower():
  138. return True
  139. try:
  140. # Else interpret this as JSON structure containing
  141. # information with versions and capabilities of both
  142. # provisioning service and application
  143. info = json.loads(response)
  144. if info['prov']['ver'].lower() == protover.lower():
  145. return True
  146. except ValueError:
  147. # If decoding as JSON fails, it means that capabilities
  148. # are not supported
  149. return False
  150. except Exception as e:
  151. on_except(e)
  152. return None
  153. def has_capability(tp, capability='none', verbose=False):
  154. # Note : default value of `capability` argument cannot be empty string
  155. # because protocomm_httpd expects non zero content lengths
  156. try:
  157. response = tp.send_data('proto-ver', capability)
  158. if verbose:
  159. print('proto-ver response : ', response)
  160. try:
  161. # Interpret this as JSON structure containing
  162. # information with versions and capabilities of both
  163. # provisioning service and application
  164. info = json.loads(response)
  165. supported_capabilities = info['prov']['cap']
  166. if capability.lower() == 'none':
  167. # No specific capability to check, but capabilities
  168. # feature is present so return True
  169. return True
  170. elif capability in supported_capabilities:
  171. return True
  172. return False
  173. except ValueError:
  174. # If decoding as JSON fails, it means that capabilities
  175. # are not supported
  176. return False
  177. except RuntimeError as e:
  178. on_except(e)
  179. return False
  180. def establish_session(tp, sec):
  181. try:
  182. response = None
  183. while True:
  184. request = sec.security_session(response)
  185. if request is None:
  186. break
  187. response = tp.send_data('esp_local_ctrl/session', request)
  188. if (response is None):
  189. return False
  190. return True
  191. except RuntimeError as e:
  192. on_except(e)
  193. return None
  194. def get_all_property_values(tp, security_ctx):
  195. try:
  196. props = []
  197. message = proto_lc.get_prop_count_request(security_ctx)
  198. response = tp.send_data('esp_local_ctrl/control', message)
  199. count = proto_lc.get_prop_count_response(security_ctx, response)
  200. if count == 0:
  201. raise RuntimeError("No properties found!")
  202. indices = [i for i in range(count)]
  203. message = proto_lc.get_prop_vals_request(security_ctx, indices)
  204. response = tp.send_data('esp_local_ctrl/control', message)
  205. props = proto_lc.get_prop_vals_response(security_ctx, response)
  206. if len(props) != count:
  207. raise RuntimeError('Incorrect count of properties!', len(props), count)
  208. for p in props:
  209. p["value"] = decode_prop_value(p, p["value"])
  210. return props
  211. except RuntimeError as e:
  212. on_except(e)
  213. return []
  214. def set_property_values(tp, security_ctx, props, indices, values, check_readonly=False):
  215. try:
  216. if check_readonly:
  217. for index in indices:
  218. if prop_is_readonly(props[index]):
  219. raise RuntimeError('Cannot set value of Read-Only property')
  220. message = proto_lc.set_prop_vals_request(security_ctx, indices, values)
  221. response = tp.send_data('esp_local_ctrl/control', message)
  222. return proto_lc.set_prop_vals_response(security_ctx, response)
  223. except RuntimeError as e:
  224. on_except(e)
  225. return False
  226. def desc_format(*args):
  227. desc = ''
  228. for arg in args:
  229. desc += textwrap.fill(replace_whitespace=False, text=arg) + '\n'
  230. return desc
  231. if __name__ == '__main__':
  232. parser = argparse.ArgumentParser(add_help=False)
  233. parser = argparse.ArgumentParser(description="Control an ESP32 running esp_local_ctrl service")
  234. parser.add_argument("--version", dest='version', type=str,
  235. help="Protocol version", default='')
  236. parser.add_argument("--transport", dest='transport', type=str,
  237. help="transport i.e http or ble", default='http')
  238. parser.add_argument("--name", dest='service_name', type=str,
  239. help="BLE Device Name / HTTP Server hostname or IP", default='')
  240. parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
  241. help=desc_format(
  242. 'Protocomm security scheme used by the provisioning service for secure '
  243. 'session establishment. Accepted values are :',
  244. '\t- 0 : No security',
  245. '\t- 1 : X25519 key exchange + AES-CTR encryption',
  246. '\t + Authentication using Proof of Possession (PoP)',
  247. 'In case device side application uses IDF\'s provisioning manager, '
  248. 'the compatible security version is automatically determined from '
  249. 'capabilities retrieved via the version endpoint'))
  250. parser.add_argument('--pop', dest='pop', type=str, default='',
  251. help=desc_format(
  252. 'This specifies the Proof of possession (PoP) when security scheme 1 '
  253. 'is used'))
  254. parser.add_argument('--dont-check-hostname', action='store_true',
  255. # If enabled, the certificate won't be rejected for hostname mismatch.
  256. # This option is hidden because it should be used only for testing purposes.
  257. help=argparse.SUPPRESS)
  258. parser.add_argument("-v", "--verbose", dest='verbose', help="increase output verbosity", action="store_true")
  259. args = parser.parse_args()
  260. if args.version != '':
  261. print("==== Esp_Ctrl Version: " + args.version + " ====")
  262. if args.service_name == '':
  263. args.service_name = 'my_esp_ctrl_device'
  264. if args.transport == 'http':
  265. args.service_name += '.local'
  266. obj_transport = get_transport(args.transport, args.service_name, not args.dont_check_hostname)
  267. if obj_transport is None:
  268. print("---- Invalid transport ----")
  269. exit(1)
  270. # If security version not specified check in capabilities
  271. if args.secver is None:
  272. # First check if capabilities are supported or not
  273. if not has_capability(obj_transport):
  274. print('Security capabilities could not be determined. Please specify \'--sec_ver\' explicitly')
  275. print('---- Invalid Security Version ----')
  276. exit(2)
  277. # When no_sec is present, use security 0, else security 1
  278. args.secver = int(not has_capability(obj_transport, 'no_sec'))
  279. print('Security scheme determined to be :', args.secver)
  280. if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
  281. if len(args.pop) == 0:
  282. print('---- Proof of Possession argument not provided ----')
  283. exit(2)
  284. elif len(args.pop) != 0:
  285. print('---- Proof of Possession will be ignored ----')
  286. args.pop = ''
  287. obj_security = get_security(args.secver, args.pop, False)
  288. if obj_security is None:
  289. print('---- Invalid Security Version ----')
  290. exit(2)
  291. if args.version != '':
  292. print("\n==== Verifying protocol version ====")
  293. if not version_match(obj_transport, args.version, args.verbose):
  294. print("---- Error in protocol version matching ----")
  295. exit(2)
  296. print("==== Verified protocol version successfully ====")
  297. print('\n==== Starting Session ====')
  298. if not establish_session(obj_transport, obj_security):
  299. print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
  300. print('---- Error in establishing session ----')
  301. exit(3)
  302. print('==== Session Established ====')
  303. while True:
  304. properties = get_all_property_values(obj_transport, obj_security)
  305. if len(properties) == 0:
  306. print("---- Error in reading property values ----")
  307. exit(4)
  308. print("\n==== Available Properties ====")
  309. print("{0: >4} {1: <16} {2: <10} {3: <16} {4: <16}".format(
  310. "S.N.", "Name", "Type", "Flags", "Value"))
  311. for i in range(len(properties)):
  312. print("[{0: >2}] {1: <16} {2: <10} {3: <16} {4: <16}".format(
  313. i + 1, properties[i]["name"], prop_typestr(properties[i]),
  314. ["","Read-Only"][prop_is_readonly(properties[i])],
  315. str(properties[i]["value"])))
  316. select = 0
  317. while True:
  318. try:
  319. inval = input('\nSelect properties to set (0 to re-read, \'q\' to quit) : ')
  320. if inval.lower() == 'q':
  321. print("Quitting...")
  322. exit(5)
  323. invals = inval.split(',')
  324. selections = [int(val) for val in invals]
  325. if min(selections) < 0 or max(selections) > len(properties):
  326. raise ValueError("Invalid input")
  327. break
  328. except ValueError as e:
  329. print(str(e) + "! Retry...")
  330. if len(selections) == 1 and selections[0] == 0:
  331. continue
  332. set_values = []
  333. set_indices = []
  334. for select in selections:
  335. while True:
  336. inval = input("Enter value to set for property (" + properties[select - 1]["name"] + ") : ")
  337. value = encode_prop_value(properties[select - 1],
  338. str_to_prop_value(properties[select - 1], inval))
  339. if value is None:
  340. print("Invalid input! Retry...")
  341. continue
  342. break
  343. set_values += [value]
  344. set_indices += [select - 1]
  345. if not set_property_values(obj_transport, obj_security, properties, set_indices, set_values):
  346. print('Failed to set values!')