unix_events.py 40 KB


  1. """Selector event loop for Unix with signal handling."""
  2. import errno
  3. import io
  4. import os
  5. import selectors
  6. import signal
  7. import socket
  8. import stat
  9. import subprocess
  10. import sys
  11. import threading
  12. import warnings
  13. from . import base_events
  14. from . import base_subprocess
  15. from . import constants
  16. from . import coroutines
  17. from . import events
  18. from . import futures
  19. from . import selector_events
  20. from . import tasks
  21. from . import transports
  22. from .log import logger
  23. __all__ = (
  24. 'SelectorEventLoop',
  25. 'AbstractChildWatcher', 'SafeChildWatcher',
  26. 'FastChildWatcher', 'DefaultEventLoopPolicy',
  27. )
  28. if sys.platform == 'win32': # pragma: no cover
  29. raise ImportError('Signals are not really supported on Windows')
  30. def _sighandler_noop(signum, frame):
  31. """Dummy signal handler."""
  32. pass
  33. class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  34. """Unix event loop.
  35. Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
  36. """
  37. def __init__(self, selector=None):
  38. super().__init__(selector)
  39. self._signal_handlers = {}
  40. def close(self):
  41. super().close()
  42. if not sys.is_finalizing():
  43. for sig in list(self._signal_handlers):
  44. self.remove_signal_handler(sig)
  45. else:
  46. if self._signal_handlers:
  47. warnings.warn(f"Closing the loop {self!r} "
  48. f"on interpreter shutdown "
  49. f"stage, skipping signal handlers removal",
  50. ResourceWarning,
  51. source=self)
  52. self._signal_handlers.clear()
  53. def _process_self_data(self, data):
  54. for signum in data:
  55. if not signum:
  56. # ignore null bytes written by _write_to_self()
  57. continue
  58. self._handle_signal(signum)
  59. def add_signal_handler(self, sig, callback, *args):
  60. """Add a handler for a signal. UNIX only.
  61. Raise ValueError if the signal number is invalid or uncatchable.
  62. Raise RuntimeError if there is a problem setting up the handler.
  63. """
  64. if (coroutines.iscoroutine(callback) or
  65. coroutines.iscoroutinefunction(callback)):
  66. raise TypeError("coroutines cannot be used "
  67. "with add_signal_handler()")
  68. self._check_signal(sig)
  69. self._check_closed()
  70. try:
  71. # set_wakeup_fd() raises ValueError if this is not the
  72. # main thread. By calling it early we ensure that an
  73. # event loop running in another thread cannot add a signal
  74. # handler.
  75. signal.set_wakeup_fd(self._csock.fileno())
  76. except (ValueError, OSError) as exc:
  77. raise RuntimeError(str(exc))
  78. handle = events.Handle(callback, args, self, None)
  79. self._signal_handlers[sig] = handle
  80. try:
  81. # Register a dummy signal handler to ask Python to write the signal
  82. # number in the wakup file descriptor. _process_self_data() will
  83. # read signal numbers from this file descriptor to handle signals.
  84. signal.signal(sig, _sighandler_noop)
  85. # Set SA_RESTART to limit EINTR occurrences.
  86. signal.siginterrupt(sig, False)
  87. except OSError as exc:
  88. del self._signal_handlers[sig]
  89. if not self._signal_handlers:
  90. try:
  91. signal.set_wakeup_fd(-1)
  92. except (ValueError, OSError) as nexc:
  93. logger.info('set_wakeup_fd(-1) failed: %s', nexc)
  94. if exc.errno == errno.EINVAL:
  95. raise RuntimeError(f'sig {sig} cannot be caught')
  96. else:
  97. raise
  98. def _handle_signal(self, sig):
  99. """Internal helper that is the actual signal handler."""
  100. handle = self._signal_handlers.get(sig)
  101. if handle is None:
  102. return # Assume it's some race condition.
  103. if handle._cancelled:
  104. self.remove_signal_handler(sig) # Remove it properly.
  105. else:
  106. self._add_callback_signalsafe(handle)
  107. def remove_signal_handler(self, sig):
  108. """Remove a handler for a signal. UNIX only.
  109. Return True if a signal handler was removed, False if not.
  110. """
  111. self._check_signal(sig)
  112. try:
  113. del self._signal_handlers[sig]
  114. except KeyError:
  115. return False
  116. if sig == signal.SIGINT:
  117. handler = signal.default_int_handler
  118. else:
  119. handler = signal.SIG_DFL
  120. try:
  121. signal.signal(sig, handler)
  122. except OSError as exc:
  123. if exc.errno == errno.EINVAL:
  124. raise RuntimeError(f'sig {sig} cannot be caught')
  125. else:
  126. raise
  127. if not self._signal_handlers:
  128. try:
  129. signal.set_wakeup_fd(-1)
  130. except (ValueError, OSError) as exc:
  131. logger.info('set_wakeup_fd(-1) failed: %s', exc)
  132. return True
  133. def _check_signal(self, sig):
  134. """Internal helper to validate a signal.
  135. Raise ValueError if the signal number is invalid or uncatchable.
  136. Raise RuntimeError if there is a problem setting up the handler.
  137. """
  138. if not isinstance(sig, int):
  139. raise TypeError(f'sig must be an int, not {sig!r}')
  140. if not (1 <= sig < signal.NSIG):
  141. raise ValueError(f'sig {sig} out of range(1, {signal.NSIG})')
  142. def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
  143. extra=None):
  144. return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
  145. def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
  146. extra=None):
  147. return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
  148. async def _make_subprocess_transport(self, protocol, args, shell,
  149. stdin, stdout, stderr, bufsize,
  150. extra=None, **kwargs):
  151. with events.get_child_watcher() as watcher:
  152. waiter = self.create_future()
  153. transp = _UnixSubprocessTransport(self, protocol, args, shell,
  154. stdin, stdout, stderr, bufsize,
  155. waiter=waiter, extra=extra,
  156. **kwargs)
  157. watcher.add_child_handler(transp.get_pid(),
  158. self._child_watcher_callback, transp)
  159. try:
  160. await waiter
  161. except Exception:
  162. transp.close()
  163. await transp._wait()
  164. raise
  165. return transp
  166. def _child_watcher_callback(self, pid, returncode, transp):
  167. self.call_soon_threadsafe(transp._process_exited, returncode)
  168. async def create_unix_connection(
  169. self, protocol_factory, path=None, *,
  170. ssl=None, sock=None,
  171. server_hostname=None,
  172. ssl_handshake_timeout=None):
  173. assert server_hostname is None or isinstance(server_hostname, str)
  174. if ssl:
  175. if server_hostname is None:
  176. raise ValueError(
  177. 'you have to pass server_hostname when using ssl')
  178. else:
  179. if server_hostname is not None:
  180. raise ValueError('server_hostname is only meaningful with ssl')
  181. if ssl_handshake_timeout is not None:
  182. raise ValueError(
  183. 'ssl_handshake_timeout is only meaningful with ssl')
  184. if path is not None:
  185. if sock is not None:
  186. raise ValueError(
  187. 'path and sock can not be specified at the same time')
  188. path = os.fspath(path)
  189. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
  190. try:
  191. sock.setblocking(False)
  192. await self.sock_connect(sock, path)
  193. except:
  194. sock.close()
  195. raise
  196. else:
  197. if sock is None:
  198. raise ValueError('no path and sock were specified')
  199. if (sock.family != socket.AF_UNIX or
  200. sock.type != socket.SOCK_STREAM):
  201. raise ValueError(
  202. f'A UNIX Domain Stream Socket was expected, got {sock!r}')
  203. sock.setblocking(False)
  204. transport, protocol = await self._create_connection_transport(
  205. sock, protocol_factory, ssl, server_hostname,
  206. ssl_handshake_timeout=ssl_handshake_timeout)
  207. return transport, protocol
  208. async def create_unix_server(
  209. self, protocol_factory, path=None, *,
  210. sock=None, backlog=100, ssl=None,
  211. ssl_handshake_timeout=None,
  212. start_serving=True):
  213. if isinstance(ssl, bool):
  214. raise TypeError('ssl argument must be an SSLContext or None')
  215. if ssl_handshake_timeout is not None and not ssl:
  216. raise ValueError(
  217. 'ssl_handshake_timeout is only meaningful with ssl')
  218. if path is not None:
  219. if sock is not None:
  220. raise ValueError(
  221. 'path and sock can not be specified at the same time')
  222. path = os.fspath(path)
  223. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  224. # Check for abstract socket. `str` and `bytes` paths are supported.
  225. if path[0] not in (0, '\x00'):
  226. try:
  227. if stat.S_ISSOCK(os.stat(path).st_mode):
  228. os.remove(path)
  229. except FileNotFoundError:
  230. pass
  231. except OSError as err:
  232. # Directory may have permissions only to create socket.
  233. logger.error('Unable to check or remove stale UNIX socket '
  234. '%r: %r', path, err)
  235. try:
  236. sock.bind(path)
  237. except OSError as exc:
  238. sock.close()
  239. if exc.errno == errno.EADDRINUSE:
  240. # Let's improve the error message by adding
  241. # with what exact address it occurs.
  242. msg = f'Address {path!r} is already in use'
  243. raise OSError(errno.EADDRINUSE, msg) from None
  244. else:
  245. raise
  246. except:
  247. sock.close()
  248. raise
  249. else:
  250. if sock is None:
  251. raise ValueError(
  252. 'path was not specified, and no sock specified')
  253. if (sock.family != socket.AF_UNIX or
  254. sock.type != socket.SOCK_STREAM):
  255. raise ValueError(
  256. f'A UNIX Domain Stream Socket was expected, got {sock!r}')
  257. sock.setblocking(False)
  258. server = base_events.Server(self, [sock], protocol_factory,
  259. ssl, backlog, ssl_handshake_timeout)
  260. if start_serving:
  261. server._start_serving()
  262. # Skip one loop iteration so that all 'loop.add_reader'
  263. # go through.
  264. await tasks.sleep(0, loop=self)
  265. return server
  266. async def _sock_sendfile_native(self, sock, file, offset, count):
  267. try:
  268. os.sendfile
  269. except AttributeError as exc:
  270. raise events.SendfileNotAvailableError(
  271. "os.sendfile() is not available")
  272. try:
  273. fileno = file.fileno()
  274. except (AttributeError, io.UnsupportedOperation) as err:
  275. raise events.SendfileNotAvailableError("not a regular file")
  276. try:
  277. fsize = os.fstat(fileno).st_size
  278. except OSError as err:
  279. raise events.SendfileNotAvailableError("not a regular file")
  280. blocksize = count if count else fsize
  281. if not blocksize:
  282. return 0 # empty file
  283. fut = self.create_future()
  284. self._sock_sendfile_native_impl(fut, None, sock, fileno,
  285. offset, count, blocksize, 0)
  286. return await fut
  287. def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
  288. offset, count, blocksize, total_sent):
  289. fd = sock.fileno()
  290. if registered_fd is not None:
  291. # Remove the callback early. It should be rare that the
  292. # selector says the fd is ready but the call still returns
  293. # EAGAIN, and I am willing to take a hit in that case in
  294. # order to simplify the common case.
  295. self.remove_writer(registered_fd)
  296. if fut.cancelled():
  297. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  298. return
  299. if count:
  300. blocksize = count - total_sent
  301. if blocksize <= 0:
  302. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  303. fut.set_result(total_sent)
  304. return
  305. try:
  306. sent = os.sendfile(fd, fileno, offset, blocksize)
  307. except (BlockingIOError, InterruptedError):
  308. if registered_fd is None:
  309. self._sock_add_cancellation_callback(fut, sock)
  310. self.add_writer(fd, self._sock_sendfile_native_impl, fut,
  311. fd, sock, fileno,
  312. offset, count, blocksize, total_sent)
  313. except OSError as exc:
  314. if (registered_fd is not None and
  315. exc.errno == errno.ENOTCONN and
  316. type(exc) is not ConnectionError):
  317. # If we have an ENOTCONN and this isn't a first call to
  318. # sendfile(), i.e. the connection was closed in the middle
  319. # of the operation, normalize the error to ConnectionError
  320. # to make it consistent across all Posix systems.
  321. new_exc = ConnectionError(
  322. "socket is not connected", errno.ENOTCONN)
  323. new_exc.__cause__ = exc
  324. exc = new_exc
  325. if total_sent == 0:
  326. # We can get here for different reasons, the main
  327. # one being 'file' is not a regular mmap(2)-like
  328. # file, in which case we'll fall back on using
  329. # plain send().
  330. err = events.SendfileNotAvailableError(
  331. "os.sendfile call failed")
  332. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  333. fut.set_exception(err)
  334. else:
  335. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  336. fut.set_exception(exc)
  337. except Exception as exc:
  338. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  339. fut.set_exception(exc)
  340. else:
  341. if sent == 0:
  342. # EOF
  343. self._sock_sendfile_update_filepos(fileno, offset, total_sent)
  344. fut.set_result(total_sent)
  345. else:
  346. offset += sent
  347. total_sent += sent
  348. if registered_fd is None:
  349. self._sock_add_cancellation_callback(fut, sock)
  350. self.add_writer(fd, self._sock_sendfile_native_impl, fut,
  351. fd, sock, fileno,
  352. offset, count, blocksize, total_sent)
  353. def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
  354. if total_sent > 0:
  355. os.lseek(fileno, offset, os.SEEK_SET)
  356. def _sock_add_cancellation_callback(self, fut, sock):
  357. def cb(fut):
  358. if fut.cancelled():
  359. fd = sock.fileno()
  360. if fd != -1:
  361. self.remove_writer(fd)
  362. fut.add_done_callback(cb)
  363. class _UnixReadPipeTransport(transports.ReadTransport):
  364. max_size = 256 * 1024 # max bytes we read in one event loop iteration
  365. def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
  366. super().__init__(extra)
  367. self._extra['pipe'] = pipe
  368. self._loop = loop
  369. self._pipe = pipe
  370. self._fileno = pipe.fileno()
  371. self._protocol = protocol
  372. self._closing = False
  373. self._paused = False
  374. mode = os.fstat(self._fileno).st_mode
  375. if not (stat.S_ISFIFO(mode) or
  376. stat.S_ISSOCK(mode) or
  377. stat.S_ISCHR(mode)):
  378. self._pipe = None
  379. self._fileno = None
  380. self._protocol = None
  381. raise ValueError("Pipe transport is for pipes/sockets only.")
  382. os.set_blocking(self._fileno, False)
  383. self._loop.call_soon(self._protocol.connection_made, self)
  384. # only start reading when connection_made() has been called
  385. self._loop.call_soon(self._loop._add_reader,
  386. self._fileno, self._read_ready)
  387. if waiter is not None:
  388. # only wake up the waiter when connection_made() has been called
  389. self._loop.call_soon(futures._set_result_unless_cancelled,
  390. waiter, None)
  391. def __repr__(self):
  392. info = [self.__class__.__name__]
  393. if self._pipe is None:
  394. info.append('closed')
  395. elif self._closing:
  396. info.append('closing')
  397. info.append(f'fd={self._fileno}')
  398. selector = getattr(self._loop, '_selector', None)
  399. if self._pipe is not None and selector is not None:
  400. polling = selector_events._test_selector_event(
  401. selector, self._fileno, selectors.EVENT_READ)
  402. if polling:
  403. info.append('polling')
  404. else:
  405. info.append('idle')
  406. elif self._pipe is not None:
  407. info.append('open')
  408. else:
  409. info.append('closed')
  410. return '<{}>'.format(' '.join(info))
  411. def _read_ready(self):
  412. try:
  413. data = os.read(self._fileno, self.max_size)
  414. except (BlockingIOError, InterruptedError):
  415. pass
  416. except OSError as exc:
  417. self._fatal_error(exc, 'Fatal read error on pipe transport')
  418. else:
  419. if data:
  420. self._protocol.data_received(data)
  421. else:
  422. if self._loop.get_debug():
  423. logger.info("%r was closed by peer", self)
  424. self._closing = True
  425. self._loop._remove_reader(self._fileno)
  426. self._loop.call_soon(self._protocol.eof_received)
  427. self._loop.call_soon(self._call_connection_lost, None)
  428. def pause_reading(self):
  429. if self._closing or self._paused:
  430. return
  431. self._paused = True
  432. self._loop._remove_reader(self._fileno)
  433. if self._loop.get_debug():
  434. logger.debug("%r pauses reading", self)
  435. def resume_reading(self):
  436. if self._closing or not self._paused:
  437. return
  438. self._paused = False
  439. self._loop._add_reader(self._fileno, self._read_ready)
  440. if self._loop.get_debug():
  441. logger.debug("%r resumes reading", self)
  442. def set_protocol(self, protocol):
  443. self._protocol = protocol
  444. def get_protocol(self):
  445. return self._protocol
  446. def is_closing(self):
  447. return self._closing
  448. def close(self):
  449. if not self._closing:
  450. self._close(None)
  451. def __del__(self):
  452. if self._pipe is not None:
  453. warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
  454. source=self)
  455. self._pipe.close()
  456. def _fatal_error(self, exc, message='Fatal error on pipe transport'):
  457. # should be called by exception handler only
  458. if (isinstance(exc, OSError) and exc.errno == errno.EIO):
  459. if self._loop.get_debug():
  460. logger.debug("%r: %s", self, message, exc_info=True)
  461. else:
  462. self._loop.call_exception_handler({
  463. 'message': message,
  464. 'exception': exc,
  465. 'transport': self,
  466. 'protocol': self._protocol,
  467. })
  468. self._close(exc)
  469. def _close(self, exc):
  470. self._closing = True
  471. self._loop._remove_reader(self._fileno)
  472. self._loop.call_soon(self._call_connection_lost, exc)
  473. def _call_connection_lost(self, exc):
  474. try:
  475. self._protocol.connection_lost(exc)
  476. finally:
  477. self._pipe.close()
  478. self._pipe = None
  479. self._protocol = None
  480. self._loop = None
  481. class _UnixWritePipeTransport(transports._FlowControlMixin,
  482. transports.WriteTransport):
  483. def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
  484. super().__init__(extra, loop)
  485. self._extra['pipe'] = pipe
  486. self._pipe = pipe
  487. self._fileno = pipe.fileno()
  488. self._protocol = protocol
  489. self._buffer = bytearray()
  490. self._conn_lost = 0
  491. self._closing = False # Set when close() or write_eof() called.
  492. mode = os.fstat(self._fileno).st_mode
  493. is_char = stat.S_ISCHR(mode)
  494. is_fifo = stat.S_ISFIFO(mode)
  495. is_socket = stat.S_ISSOCK(mode)
  496. if not (is_char or is_fifo or is_socket):
  497. self._pipe = None
  498. self._fileno = None
  499. self._protocol = None
  500. raise ValueError("Pipe transport is only for "
  501. "pipes, sockets and character devices")
  502. os.set_blocking(self._fileno, False)
  503. self._loop.call_soon(self._protocol.connection_made, self)
  504. # On AIX, the reader trick (to be notified when the read end of the
  505. # socket is closed) only works for sockets. On other platforms it
  506. # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
  507. if is_socket or (is_fifo and not sys.platform.startswith("aix")):
  508. # only start reading when connection_made() has been called
  509. self._loop.call_soon(self._loop._add_reader,
  510. self._fileno, self._read_ready)
  511. if waiter is not None:
  512. # only wake up the waiter when connection_made() has been called
  513. self._loop.call_soon(futures._set_result_unless_cancelled,
  514. waiter, None)
  515. def __repr__(self):
  516. info = [self.__class__.__name__]
  517. if self._pipe is None:
  518. info.append('closed')
  519. elif self._closing:
  520. info.append('closing')
  521. info.append(f'fd={self._fileno}')
  522. selector = getattr(self._loop, '_selector', None)
  523. if self._pipe is not None and selector is not None:
  524. polling = selector_events._test_selector_event(
  525. selector, self._fileno, selectors.EVENT_WRITE)
  526. if polling:
  527. info.append('polling')
  528. else:
  529. info.append('idle')
  530. bufsize = self.get_write_buffer_size()
  531. info.append(f'bufsize={bufsize}')
  532. elif self._pipe is not None:
  533. info.append('open')
  534. else:
  535. info.append('closed')
  536. return '<{}>'.format(' '.join(info))
  537. def get_write_buffer_size(self):
  538. return len(self._buffer)
  539. def _read_ready(self):
  540. # Pipe was closed by peer.
  541. if self._loop.get_debug():
  542. logger.info("%r was closed by peer", self)
  543. if self._buffer:
  544. self._close(BrokenPipeError())
  545. else:
  546. self._close()
  547. def write(self, data):
  548. assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
  549. if isinstance(data, bytearray):
  550. data = memoryview(data)
  551. if not data:
  552. return
  553. if self._conn_lost or self._closing:
  554. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  555. logger.warning('pipe closed by peer or '
  556. 'os.write(pipe, data) raised exception.')
  557. self._conn_lost += 1
  558. return
  559. if not self._buffer:
  560. # Attempt to send it right away first.
  561. try:
  562. n = os.write(self._fileno, data)
  563. except (BlockingIOError, InterruptedError):
  564. n = 0
  565. except Exception as exc:
  566. self._conn_lost += 1
  567. self._fatal_error(exc, 'Fatal write error on pipe transport')
  568. return
  569. if n == len(data):
  570. return
  571. elif n > 0:
  572. data = memoryview(data)[n:]
  573. self._loop._add_writer(self._fileno, self._write_ready)
  574. self._buffer += data
  575. self._maybe_pause_protocol()
  576. def _write_ready(self):
  577. assert self._buffer, 'Data should not be empty'
  578. try:
  579. n = os.write(self._fileno, self._buffer)
  580. except (BlockingIOError, InterruptedError):
  581. pass
  582. except Exception as exc:
  583. self._buffer.clear()
  584. self._conn_lost += 1
  585. # Remove writer here, _fatal_error() doesn't it
  586. # because _buffer is empty.
  587. self._loop._remove_writer(self._fileno)
  588. self._fatal_error(exc, 'Fatal write error on pipe transport')
  589. else:
  590. if n == len(self._buffer):
  591. self._buffer.clear()
  592. self._loop._remove_writer(self._fileno)
  593. self._maybe_resume_protocol() # May append to buffer.
  594. if self._closing:
  595. self._loop._remove_reader(self._fileno)
  596. self._call_connection_lost(None)
  597. return
  598. elif n > 0:
  599. del self._buffer[:n]
  600. def can_write_eof(self):
  601. return True
  602. def write_eof(self):
  603. if self._closing:
  604. return
  605. assert self._pipe
  606. self._closing = True
  607. if not self._buffer:
  608. self._loop._remove_reader(self._fileno)
  609. self._loop.call_soon(self._call_connection_lost, None)
  610. def set_protocol(self, protocol):
  611. self._protocol = protocol
  612. def get_protocol(self):
  613. return self._protocol
  614. def is_closing(self):
  615. return self._closing
  616. def close(self):
  617. if self._pipe is not None and not self._closing:
  618. # write_eof is all what we needed to close the write pipe
  619. self.write_eof()
  620. def __del__(self):
  621. if self._pipe is not None:
  622. warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
  623. source=self)
  624. self._pipe.close()
  625. def abort(self):
  626. self._close(None)
  627. def _fatal_error(self, exc, message='Fatal error on pipe transport'):
  628. # should be called by exception handler only
  629. if isinstance(exc, OSError):
  630. if self._loop.get_debug():
  631. logger.debug("%r: %s", self, message, exc_info=True)
  632. else:
  633. self._loop.call_exception_handler({
  634. 'message': message,
  635. 'exception': exc,
  636. 'transport': self,
  637. 'protocol': self._protocol,
  638. })
  639. self._close(exc)
  640. def _close(self, exc=None):
  641. self._closing = True
  642. if self._buffer:
  643. self._loop._remove_writer(self._fileno)
  644. self._buffer.clear()
  645. self._loop._remove_reader(self._fileno)
  646. self._loop.call_soon(self._call_connection_lost, exc)
  647. def _call_connection_lost(self, exc):
  648. try:
  649. self._protocol.connection_lost(exc)
  650. finally:
  651. self._pipe.close()
  652. self._pipe = None
  653. self._protocol = None
  654. self._loop = None
  655. class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  656. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  657. stdin_w = None
  658. if stdin == subprocess.PIPE:
  659. # Use a socket pair for stdin, since not all platforms
  660. # support selecting read events on the write end of a
  661. # socket (which we use in order to detect closing of the
  662. # other end). Notably this is needed on AIX, and works
  663. # just fine on other platforms.
  664. stdin, stdin_w = socket.socketpair()
  665. try:
  666. self._proc = subprocess.Popen(
  667. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  668. universal_newlines=False, bufsize=bufsize, **kwargs)
  669. if stdin_w is not None:
  670. stdin.close()
  671. self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
  672. stdin_w = None
  673. finally:
  674. if stdin_w is not None:
  675. stdin.close()
  676. stdin_w.close()
  677. class AbstractChildWatcher:
  678. """Abstract base class for monitoring child processes.
  679. Objects derived from this class monitor a collection of subprocesses and
  680. report their termination or interruption by a signal.
  681. New callbacks are registered with .add_child_handler(). Starting a new
  682. process must be done within a 'with' block to allow the watcher to suspend
  683. its activity until the new process if fully registered (this is needed to
  684. prevent a race condition in some implementations).
  685. Example:
  686. with watcher:
  687. proc = subprocess.Popen("sleep 1")
  688. watcher.add_child_handler(proc.pid, callback)
  689. Notes:
  690. Implementations of this class must be thread-safe.
  691. Since child watcher objects may catch the SIGCHLD signal and call
  692. waitpid(-1), there should be only one active object per process.
  693. """
  694. def add_child_handler(self, pid, callback, *args):
  695. """Register a new child handler.
  696. Arrange for callback(pid, returncode, *args) to be called when
  697. process 'pid' terminates. Specifying another callback for the same
  698. process replaces the previous handler.
  699. Note: callback() must be thread-safe.
  700. """
  701. raise NotImplementedError()
  702. def remove_child_handler(self, pid):
  703. """Removes the handler for process 'pid'.
  704. The function returns True if the handler was successfully removed,
  705. False if there was nothing to remove."""
  706. raise NotImplementedError()
  707. def attach_loop(self, loop):
  708. """Attach the watcher to an event loop.
  709. If the watcher was previously attached to an event loop, then it is
  710. first detached before attaching to the new loop.
  711. Note: loop may be None.
  712. """
  713. raise NotImplementedError()
  714. def close(self):
  715. """Close the watcher.
  716. This must be called to make sure that any underlying resource is freed.
  717. """
  718. raise NotImplementedError()
  719. def __enter__(self):
  720. """Enter the watcher's context and allow starting new processes
  721. This function must return self"""
  722. raise NotImplementedError()
  723. def __exit__(self, a, b, c):
  724. """Exit the watcher's context"""
  725. raise NotImplementedError()
  726. class BaseChildWatcher(AbstractChildWatcher):
  727. def __init__(self):
  728. self._loop = None
  729. self._callbacks = {}
  730. def close(self):
  731. self.attach_loop(None)
  732. def _do_waitpid(self, expected_pid):
  733. raise NotImplementedError()
  734. def _do_waitpid_all(self):
  735. raise NotImplementedError()
  736. def attach_loop(self, loop):
  737. assert loop is None or isinstance(loop, events.AbstractEventLoop)
  738. if self._loop is not None and loop is None and self._callbacks:
  739. warnings.warn(
  740. 'A loop is being detached '
  741. 'from a child watcher with pending handlers',
  742. RuntimeWarning)
  743. if self._loop is not None:
  744. self._loop.remove_signal_handler(signal.SIGCHLD)
  745. self._loop = loop
  746. if loop is not None:
  747. loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
  748. # Prevent a race condition in case a child terminated
  749. # during the switch.
  750. self._do_waitpid_all()
  751. def _sig_chld(self):
  752. try:
  753. self._do_waitpid_all()
  754. except Exception as exc:
  755. # self._loop should always be available here
  756. # as '_sig_chld' is added as a signal handler
  757. # in 'attach_loop'
  758. self._loop.call_exception_handler({
  759. 'message': 'Unknown exception in SIGCHLD handler',
  760. 'exception': exc,
  761. })
  762. def _compute_returncode(self, status):
  763. if os.WIFSIGNALED(status):
  764. # The child process died because of a signal.
  765. return -os.WTERMSIG(status)
  766. elif os.WIFEXITED(status):
  767. # The child process exited (e.g sys.exit()).
  768. return os.WEXITSTATUS(status)
  769. else:
  770. # The child exited, but we don't understand its status.
  771. # This shouldn't happen, but if it does, let's just
  772. # return that status; perhaps that helps debug it.
  773. return status
  774. class SafeChildWatcher(BaseChildWatcher):
  775. """'Safe' child watcher implementation.
  776. This implementation avoids disrupting other code spawning processes by
  777. polling explicitly each process in the SIGCHLD handler instead of calling
  778. os.waitpid(-1).
  779. This is a safe solution but it has a significant overhead when handling a
  780. big number of children (O(n) each time SIGCHLD is raised)
  781. """
  782. def close(self):
  783. self._callbacks.clear()
  784. super().close()
  785. def __enter__(self):
  786. return self
  787. def __exit__(self, a, b, c):
  788. pass
  789. def add_child_handler(self, pid, callback, *args):
  790. if self._loop is None:
  791. raise RuntimeError(
  792. "Cannot add child handler, "
  793. "the child watcher does not have a loop attached")
  794. self._callbacks[pid] = (callback, args)
  795. # Prevent a race condition in case the child is already terminated.
  796. self._do_waitpid(pid)
  797. def remove_child_handler(self, pid):
  798. try:
  799. del self._callbacks[pid]
  800. return True
  801. except KeyError:
  802. return False
  803. def _do_waitpid_all(self):
  804. for pid in list(self._callbacks):
  805. self._do_waitpid(pid)
  806. def _do_waitpid(self, expected_pid):
  807. assert expected_pid > 0
  808. try:
  809. pid, status = os.waitpid(expected_pid, os.WNOHANG)
  810. except ChildProcessError:
  811. # The child process is already reaped
  812. # (may happen if waitpid() is called elsewhere).
  813. pid = expected_pid
  814. returncode = 255
  815. logger.warning(
  816. "Unknown child process pid %d, will report returncode 255",
  817. pid)
  818. else:
  819. if pid == 0:
  820. # The child process is still alive.
  821. return
  822. returncode = self._compute_returncode(status)
  823. if self._loop.get_debug():
  824. logger.debug('process %s exited with returncode %s',
  825. expected_pid, returncode)
  826. try:
  827. callback, args = self._callbacks.pop(pid)
  828. except KeyError: # pragma: no cover
  829. # May happen if .remove_child_handler() is called
  830. # after os.waitpid() returns.
  831. if self._loop.get_debug():
  832. logger.warning("Child watcher got an unexpected pid: %r",
  833. pid, exc_info=True)
  834. else:
  835. callback(pid, returncode, *args)
  class FastChildWatcher(BaseChildWatcher):
      """'Fast' child watcher implementation.

      This implementation reaps every terminated process by calling
      os.waitpid(-1) directly, possibly breaking other code that spawns
      processes and waits for their termination.

      There is no noticeable overhead when handling a big number of children
      (O(1) each time a child terminates).
      """

      def __init__(self):
          super().__init__()
          # Held while _forks and _zombies are updated and while a reaped
          # pid is matched against the registered callbacks.
          self._lock = threading.Lock()
          # pid -> returncode for children reaped before a handler was added.
          self._zombies = {}
          # Number of "with watcher:" blocks currently entered.
          self._forks = 0

      def close(self):
          """Drop registered callbacks and remembered zombies, then close."""
          self._callbacks.clear()
          self._zombies.clear()
          super().close()

      def __enter__(self):
          """Count one more active ``with`` block and return the watcher."""
          with self._lock:
              self._forks += 1
          return self

      def __exit__(self, a, b, c):
          """Count one active ``with`` block fewer.

          When the last active block exits while reaped pids are still
          waiting for a handler, those pids are forgotten and a warning
          listing them is logged.  The exception arguments are ignored.
          """
          orphans = None
          with self._lock:
              self._forks -= 1
              if not self._forks and self._zombies:
                  orphans = str(self._zombies)
                  self._zombies.clear()

          # Log outside the lock, and only when something was discarded.
          if orphans is not None:
              logger.warning(
                  "Caught subprocesses termination from unknown pids: %s",
                  orphans)

      def add_child_handler(self, pid, callback, *args):
          """Register *callback* to be run when the child *pid* terminates.

          Must be called inside a ``with watcher:`` block.  If the child was
          already reaped, the callback runs immediately as
          ``callback(pid, returncode, *args)``; otherwise it is stored until
          the child is reaped.

          Raise RuntimeError if no event loop is attached to the watcher.
          """
          assert self._forks, "Must use the context manager"

          if self._loop is None:
              raise RuntimeError(
                  "Cannot add child handler, "
                  "the child watcher does not have a loop attached")

          with self._lock:
              try:
                  exit_code = self._zombies.pop(pid)
              except KeyError:
                  # The child is running.
                  self._callbacks[pid] = callback, args
                  return

          # The child is dead already. We can fire the callback
          # (outside the lock).
          callback(pid, exit_code, *args)

      def remove_child_handler(self, pid):
          """Unregister the callback for *pid*; return True if one existed."""
          try:
              del self._callbacks[pid]
          except KeyError:
              removed = False
          else:
              removed = True
          return removed

      def _do_waitpid_all(self):
          """Reap every terminated child and dispatch its callback.

          A reaped pid without a registered callback is remembered in
          self._zombies while a ``with`` block is active (its handler may
          not have been added yet); otherwise a warning is logged.
          """
          # Because of signal coalescing, we must keep calling waitpid() as
          # long as we're able to reap a child.
          while True:
              try:
                  child_pid, wait_status = os.waitpid(-1, os.WNOHANG)
              except ChildProcessError:
                  # No more child processes exist.
                  return

              if child_pid == 0:
                  # A child process is still alive.
                  return

              exit_code = self._compute_returncode(wait_status)

              with self._lock:
                  try:
                      handler, handler_args = self._callbacks.pop(child_pid)
                  except KeyError:
                      # unknown child
                      if self._forks:
                          # It may not be registered yet.
                          self._zombies[child_pid] = exit_code
                          if self._loop.get_debug():
                              logger.debug('unknown process %s exited '
                                           'with returncode %s',
                                           child_pid, exit_code)
                          continue
                      handler = None
                  else:
                      if self._loop.get_debug():
                          logger.debug('process %s exited with returncode %s',
                                       child_pid, exit_code)

              # The lock is released before logging or running user code.
              if handler is None:
                  logger.warning(
                      "Caught subprocess termination from unknown pid: "
                      "%d -> %d", child_pid, exit_code)
              else:
                  handler(child_pid, exit_code, *handler_args)
  class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
      """UNIX event loop policy with a watcher for child processes."""

      _loop_factory = _UnixSelectorEventLoop

      def __init__(self):
          super().__init__()
          # Child watcher; created on demand by get_child_watcher() or
          # supplied through set_child_watcher().
          self._watcher = None

      @staticmethod
      def _in_main_thread():
          """Return True if the caller runs in the interpreter's main thread."""
          # Use the public API rather than the private threading._MainThread
          # class.  This also stays correct after a fork from a non-main
          # thread, where the surviving thread becomes the main thread
          # without being a _MainThread instance.
          return threading.current_thread() is threading.main_thread()

      def _init_watcher(self):
          """Create the default SafeChildWatcher if no watcher is set yet.

          The check and creation run while holding events._lock.  When
          called from the main thread, the new watcher is attached to the
          loop stored in self._local._loop.
          """
          with events._lock:
              if self._watcher is None:  # pragma: no branch
                  self._watcher = SafeChildWatcher()
                  if self._in_main_thread():
                      self._watcher.attach_loop(self._local._loop)

      def set_event_loop(self, loop):
          """Set the event loop.

          As a side effect, if a child watcher was set before, then calling
          .set_event_loop() from the main thread will call .attach_loop(loop) on
          the child watcher.
          """
          super().set_event_loop(loop)

          if self._watcher is not None and self._in_main_thread():
              self._watcher.attach_loop(loop)

      def get_child_watcher(self):
          """Get the watcher for child processes.

          If not yet set, a SafeChildWatcher object is automatically created.
          """
          if self._watcher is None:
              self._init_watcher()

          return self._watcher

      def set_child_watcher(self, watcher):
          """Set the watcher for child processes.

          *watcher* must be None or an AbstractChildWatcher instance.  A
          previously set watcher is closed before being replaced.
          """
          assert watcher is None or isinstance(watcher, AbstractChildWatcher)

          if self._watcher is not None:
              self._watcher.close()

          self._watcher = watcher
  # Public aliases: expose the Unix-specific implementations under the
  # platform-neutral names listed in __all__.
  SelectorEventLoop = _UnixSelectorEventLoop
  DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy