console_reader.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import os
  4. import queue
  5. import time
  6. from serial.tools.miniterm import Console
  7. from .console_parser import ConsoleParser
  8. from .constants import CMD_STOP, TAG_CMD
  9. from .stoppable_thread import StoppableThread
  10. class ConsoleReader(StoppableThread):
  11. """ Read input keys from the console and push them to the queue,
  12. until stopped.
  13. """
  14. def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
  15. # type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
  16. super(ConsoleReader, self).__init__()
  17. self.console = console
  18. self.event_queue = event_queue
  19. self.cmd_queue = cmd_queue
  20. self.parser = parser
  21. self.test_mode = test_mode
  22. def run(self):
  23. # type: () -> None
  24. self.console.setup()
  25. try:
  26. while self.alive:
  27. try:
  28. if os.name == 'nt':
  29. # Windows kludge: because the console.cancel() method doesn't
  30. # seem to work to unblock getkey() on the Windows implementation.
  31. #
  32. # So we only call getkey() if we know there's a key waiting for us.
  33. import msvcrt
  34. while not msvcrt.kbhit() and self.alive: # type: ignore
  35. time.sleep(0.1)
  36. if not self.alive:
  37. break
  38. elif self.test_mode:
  39. # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
  40. # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
  41. # Therefore, we avoid calling it.
  42. while self.alive:
  43. time.sleep(0.1)
  44. break
  45. c = self.console.getkey()
  46. except KeyboardInterrupt:
  47. c = '\x03'
  48. if c is not None:
  49. ret = self.parser.parse(c)
  50. if ret is not None:
  51. (tag, cmd) = ret
  52. # stop command should be executed last
  53. if tag == TAG_CMD and cmd != CMD_STOP:
  54. self.cmd_queue.put(ret)
  55. else:
  56. self.event_queue.put(ret)
  57. finally:
  58. self.console.cleanup()
  59. def _cancel(self):
  60. # type: () -> None
  61. if os.name == 'posix' and not self.test_mode:
  62. # this is the way cancel() is implemented in pyserial 3.3 or newer,
  63. # older pyserial (3.1+) has cancellation implemented via 'select',
  64. # which does not work when console sends an escape sequence response
  65. #
  66. # even older pyserial (<3.1) does not have this method
  67. #
  68. # on Windows there is a different (also hacky) fix, applied above.
  69. #
  70. # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
  71. # TODO: introduce some workaround to make it work there.
  72. #
  73. # Note: This would throw exception in testing mode when the stdin is connected to PTY.
  74. import fcntl
  75. import termios
  76. fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')