| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- import os
- import queue
- import time
- from serial.tools.miniterm import Console
- from .console_parser import ConsoleParser
- from .constants import CMD_STOP, TAG_CMD
- from .stoppable_thread import StoppableThread
- class ConsoleReader(StoppableThread):
- """ Read input keys from the console and push them to the queue,
- until stopped.
- """
- def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
- # type: (Console, queue.Queue, queue.Queue, ConsoleParser, bool) -> None
- super(ConsoleReader, self).__init__()
- self.console = console
- self.event_queue = event_queue
- self.cmd_queue = cmd_queue
- self.parser = parser
- self.test_mode = test_mode
- def run(self):
- # type: () -> None
- self.console.setup()
- try:
- while self.alive:
- try:
- if os.name == 'nt':
- # Windows kludge: because the console.cancel() method doesn't
- # seem to work to unblock getkey() on the Windows implementation.
- #
- # So we only call getkey() if we know there's a key waiting for us.
- import msvcrt
- while not msvcrt.kbhit() and self.alive: # type: ignore
- time.sleep(0.1)
- if not self.alive:
- break
- elif self.test_mode:
- # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY
- # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey().
- # Therefore, we avoid calling it.
- while self.alive:
- time.sleep(0.1)
- break
- c = self.console.getkey()
- except KeyboardInterrupt:
- c = '\x03'
- if c is not None:
- ret = self.parser.parse(c)
- if ret is not None:
- (tag, cmd) = ret
- # stop command should be executed last
- if tag == TAG_CMD and cmd != CMD_STOP:
- self.cmd_queue.put(ret)
- else:
- self.event_queue.put(ret)
- finally:
- self.console.cleanup()
- def _cancel(self):
- # type: () -> None
- if os.name == 'posix' and not self.test_mode:
- # this is the way cancel() is implemented in pyserial 3.3 or newer,
- # older pyserial (3.1+) has cancellation implemented via 'select',
- # which does not work when console sends an escape sequence response
- #
- # even older pyserial (<3.1) does not have this method
- #
- # on Windows there is a different (also hacky) fix, applied above.
- #
- # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
- # TODO: introduce some workaround to make it work there.
- #
- # Note: This would throw exception in testing mode when the stdin is connected to PTY.
- import fcntl
- import termios
- fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
|