# subprocess.py 7.1 KB
# Names exported by ``from <this module> import *``.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'

import subprocess

from . import events
from . import protocols
from . import streams
from . import tasks
from .log import logger

# Aliases of the special values defined by the standard ``subprocess``
# module, made available under this module's namespace.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
  11. class SubprocessStreamProtocol(streams.FlowControlMixin,
  12. protocols.SubprocessProtocol):
  13. """Like StreamReaderProtocol, but for a subprocess."""
  14. def __init__(self, limit, loop):
  15. super().__init__(loop=loop)
  16. self._limit = limit
  17. self.stdin = self.stdout = self.stderr = None
  18. self._transport = None
  19. self._process_exited = False
  20. self._pipe_fds = []
  21. def __repr__(self):
  22. info = [self.__class__.__name__]
  23. if self.stdin is not None:
  24. info.append(f'stdin={self.stdin!r}')
  25. if self.stdout is not None:
  26. info.append(f'stdout={self.stdout!r}')
  27. if self.stderr is not None:
  28. info.append(f'stderr={self.stderr!r}')
  29. return '<{}>'.format(' '.join(info))
  30. def connection_made(self, transport):
  31. self._transport = transport
  32. stdout_transport = transport.get_pipe_transport(1)
  33. if stdout_transport is not None:
  34. self.stdout = streams.StreamReader(limit=self._limit,
  35. loop=self._loop)
  36. self.stdout.set_transport(stdout_transport)
  37. self._pipe_fds.append(1)
  38. stderr_transport = transport.get_pipe_transport(2)
  39. if stderr_transport is not None:
  40. self.stderr = streams.StreamReader(limit=self._limit,
  41. loop=self._loop)
  42. self.stderr.set_transport(stderr_transport)
  43. self._pipe_fds.append(2)
  44. stdin_transport = transport.get_pipe_transport(0)
  45. if stdin_transport is not None:
  46. self.stdin = streams.StreamWriter(stdin_transport,
  47. protocol=self,
  48. reader=None,
  49. loop=self._loop)
  50. def pipe_data_received(self, fd, data):
  51. if fd == 1:
  52. reader = self.stdout
  53. elif fd == 2:
  54. reader = self.stderr
  55. else:
  56. reader = None
  57. if reader is not None:
  58. reader.feed_data(data)
  59. def pipe_connection_lost(self, fd, exc):
  60. if fd == 0:
  61. pipe = self.stdin
  62. if pipe is not None:
  63. pipe.close()
  64. self.connection_lost(exc)
  65. return
  66. if fd == 1:
  67. reader = self.stdout
  68. elif fd == 2:
  69. reader = self.stderr
  70. else:
  71. reader = None
  72. if reader is not None:
  73. if exc is None:
  74. reader.feed_eof()
  75. else:
  76. reader.set_exception(exc)
  77. if fd in self._pipe_fds:
  78. self._pipe_fds.remove(fd)
  79. self._maybe_close_transport()
  80. def process_exited(self):
  81. self._process_exited = True
  82. self._maybe_close_transport()
  83. def _maybe_close_transport(self):
  84. if len(self._pipe_fds) == 0 and self._process_exited:
  85. self._transport.close()
  86. self._transport = None
  87. class Process:
  88. def __init__(self, transport, protocol, loop):
  89. self._transport = transport
  90. self._protocol = protocol
  91. self._loop = loop
  92. self.stdin = protocol.stdin
  93. self.stdout = protocol.stdout
  94. self.stderr = protocol.stderr
  95. self.pid = transport.get_pid()
  96. def __repr__(self):
  97. return f'<{self.__class__.__name__} {self.pid}>'
  98. @property
  99. def returncode(self):
  100. return self._transport.get_returncode()
  101. async def wait(self):
  102. """Wait until the process exit and return the process return code."""
  103. return await self._transport._wait()
  104. def send_signal(self, signal):
  105. self._transport.send_signal(signal)
  106. def terminate(self):
  107. self._transport.terminate()
  108. def kill(self):
  109. self._transport.kill()
  110. async def _feed_stdin(self, input):
  111. debug = self._loop.get_debug()
  112. self.stdin.write(input)
  113. if debug:
  114. logger.debug(
  115. '%r communicate: feed stdin (%s bytes)', self, len(input))
  116. try:
  117. await self.stdin.drain()
  118. except (BrokenPipeError, ConnectionResetError) as exc:
  119. # communicate() ignores BrokenPipeError and ConnectionResetError
  120. if debug:
  121. logger.debug('%r communicate: stdin got %r', self, exc)
  122. if debug:
  123. logger.debug('%r communicate: close stdin', self)
  124. self.stdin.close()
  125. async def _noop(self):
  126. return None
  127. async def _read_stream(self, fd):
  128. transport = self._transport.get_pipe_transport(fd)
  129. if fd == 2:
  130. stream = self.stderr
  131. else:
  132. assert fd == 1
  133. stream = self.stdout
  134. if self._loop.get_debug():
  135. name = 'stdout' if fd == 1 else 'stderr'
  136. logger.debug('%r communicate: read %s', self, name)
  137. output = await stream.read()
  138. if self._loop.get_debug():
  139. name = 'stdout' if fd == 1 else 'stderr'
  140. logger.debug('%r communicate: close %s', self, name)
  141. transport.close()
  142. return output
  143. async def communicate(self, input=None):
  144. if input is not None:
  145. stdin = self._feed_stdin(input)
  146. else:
  147. stdin = self._noop()
  148. if self.stdout is not None:
  149. stdout = self._read_stream(1)
  150. else:
  151. stdout = self._noop()
  152. if self.stderr is not None:
  153. stderr = self._read_stream(2)
  154. else:
  155. stderr = self._noop()
  156. stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
  157. loop=self._loop)
  158. await self.wait()
  159. return (stdout, stderr)
  160. async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
  161. loop=None, limit=streams._DEFAULT_LIMIT,
  162. **kwds):
  163. if loop is None:
  164. loop = events.get_event_loop()
  165. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  166. loop=loop)
  167. transport, protocol = await loop.subprocess_shell(
  168. protocol_factory,
  169. cmd, stdin=stdin, stdout=stdout,
  170. stderr=stderr, **kwds)
  171. return Process(transport, protocol, loop)
  172. async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
  173. stderr=None, loop=None,
  174. limit=streams._DEFAULT_LIMIT, **kwds):
  175. if loop is None:
  176. loop = events.get_event_loop()
  177. protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
  178. loop=loop)
  179. transport, protocol = await loop.subprocess_exec(
  180. protocol_factory,
  181. program, *args,
  182. stdin=stdin, stdout=stdout,
  183. stderr=stderr, **kwds)
  184. return Process(transport, protocol, loop)