console_reader.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import time
  16. from serial.tools.miniterm import Console
  17. from .console_parser import ConsoleParser
  18. from .constants import CMD_STOP, TAG_CMD
  19. from .stoppable_thread import StoppableThread
  20. try:
  21. import queue
  22. except ImportError:
  23. import Queue as queue # type: ignore
  24. class ConsoleReader(StoppableThread):
  25. """ Read input keys from the console and push them to the queue,
  26. until stopped.
  27. """
  28. def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
  29. # type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
  30. super(ConsoleReader, self).__init__()
  31. self.console = console
  32. self.event_queue = event_queue
  33. self.cmd_queue = cmd_queue
  34. self.parser = parser
  35. self.test_mode = test_mode
  36. def run(self):
  37. # type: () -> None
  38. self.console.setup()
  39. try:
  40. while self.alive:
  41. try:
  42. if os.name == 'nt':
  43. # Windows kludge: because the console.cancel() method doesn't
  44. # seem to work to unblock getkey() on the Windows implementation.
  45. #
  46. # So we only call getkey() if we know there's a key waiting for us.
  47. import msvcrt
  48. while not msvcrt.kbhit() and self.alive: # type: ignore
  49. time.sleep(0.1)
  50. if not self.alive:
  51. break
  52. elif self.test_mode:
  53. # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
  54. # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
  55. # Therefore, we avoid calling it.
  56. while self.alive:
  57. time.sleep(0.1)
  58. break
  59. c = self.console.getkey()
  60. except KeyboardInterrupt:
  61. c = '\x03'
  62. if c is not None:
  63. ret = self.parser.parse(c)
  64. if ret is not None:
  65. (tag, cmd) = ret
  66. # stop command should be executed last
  67. if tag == TAG_CMD and cmd != CMD_STOP:
  68. self.cmd_queue.put(ret)
  69. else:
  70. self.event_queue.put(ret)
  71. finally:
  72. self.console.cleanup()
  73. def _cancel(self):
  74. # type: () -> None
  75. if os.name == 'posix' and not self.test_mode:
  76. # this is the way cancel() is implemented in pyserial 3.3 or newer,
  77. # older pyserial (3.1+) has cancellation implemented via 'select',
  78. # which does not work when console sends an escape sequence response
  79. #
  80. # even older pyserial (<3.1) does not have this method
  81. #
  82. # on Windows there is a different (also hacky) fix, applied above.
  83. #
  84. # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
  85. # TODO: introduce some workaround to make it work there.
  86. #
  87. # Note: This would throw exception in testing mode when the stdin is connected to PTY.
  88. import fcntl
  89. import termios
  90. fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')