web_socket_client.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import json
  4. import time
  5. from .output_helpers import red_print, yellow_print
  6. try:
  7. import websocket
  8. except ImportError:
  9. # This is needed for IDE integration only.
  10. pass
  11. class WebSocketClient(object):
  12. """
  13. WebSocket client used to advertise debug events to WebSocket server by sending and receiving JSON-serialized
  14. dictionaries.
  15. Advertisement of debug event:
  16. {'event': 'gdb_stub', 'port': '/dev/ttyUSB1', 'prog': 'build/elf_file'} for GDB Stub, or
  17. {'event': 'coredump', 'file': '/tmp/xy', 'prog': 'build/elf_file'} for coredump,
  18. where 'port' is the port for the connected device, 'prog' is the full path to the ELF file and 'file' is the
  19. generated coredump file.
  20. Expected end of external debugging:
  21. {'event': 'debug_finished'}
  22. """
  23. RETRIES = 3
  24. CONNECTION_RETRY_DELAY = 1
  25. def __init__(self, url): # type: (str) -> None
  26. self.url = url
  27. self._connect()
  28. def _connect(self): # type: () -> None
  29. """
  30. Connect to WebSocket server at url
  31. """
  32. self.close()
  33. for _ in range(self.RETRIES):
  34. try:
  35. self.ws = websocket.create_connection(self.url)
  36. break # success
  37. except NameError:
  38. raise RuntimeError('Please install the websocket_client package for IDE integration!')
  39. except Exception as e: # noqa
  40. red_print('WebSocket connection error: {}'.format(e))
  41. time.sleep(self.CONNECTION_RETRY_DELAY)
  42. else:
  43. raise RuntimeError('Cannot connect to WebSocket server')
  44. def close(self): # type: () -> None
  45. try:
  46. self.ws.close()
  47. except AttributeError:
  48. # Not yet connected
  49. pass
  50. except Exception as e: # noqa
  51. red_print('WebSocket close error: {}'.format(e))
  52. def send(self, payload_dict): # type: (dict) -> None
  53. """
  54. Serialize payload_dict in JSON format and send it to the server
  55. """
  56. for _ in range(self.RETRIES):
  57. try:
  58. self.ws.send(json.dumps(payload_dict))
  59. yellow_print('WebSocket sent: {}'.format(payload_dict))
  60. break
  61. except Exception as e: # noqa
  62. red_print('WebSocket send error: {}'.format(e))
  63. self._connect()
  64. else:
  65. raise RuntimeError('Cannot send to WebSocket server')
  66. def wait(self, expect_iterable): # type: (list) -> None
  67. """
  68. Wait until a dictionary in JSON format is received from the server with all (key, value) tuples from
  69. expect_iterable.
  70. """
  71. for _ in range(self.RETRIES):
  72. try:
  73. r = self.ws.recv()
  74. except Exception as e:
  75. red_print('WebSocket receive error: {}'.format(e))
  76. self._connect()
  77. continue
  78. obj = json.loads(r)
  79. if all([k in obj and obj[k] == v for k, v in expect_iterable]):
  80. yellow_print('WebSocket received: {}'.format(obj))
  81. break
  82. red_print('WebSocket expected: {}, received: {}'.format(dict(expect_iterable), obj))
  83. else:
  84. raise RuntimeError('Cannot receive from WebSocket server')