# selector_events.py (37 KB)
  1. """Event loop using a selector and related classes.
  2. A selector is a "notify-when-ready" multiplexer. For a subclass which
  3. also includes support for signal handling, see the unix_events sub-module.
  4. """
  5. __all__ = 'BaseSelectorEventLoop',
  6. import collections
  7. import errno
  8. import functools
  9. import selectors
  10. import socket
  11. import warnings
  12. import weakref
  13. try:
  14. import ssl
  15. except ImportError: # pragma: no cover
  16. ssl = None
  17. from . import base_events
  18. from . import constants
  19. from . import events
  20. from . import futures
  21. from . import protocols
  22. from . import sslproto
  23. from . import transports
  24. from .log import logger
  25. def _test_selector_event(selector, fd, event):
  26. # Test if the selector is monitoring 'event' events
  27. # for the file descriptor 'fd'.
  28. try:
  29. key = selector.get_key(fd)
  30. except KeyError:
  31. return False
  32. else:
  33. return bool(key.events & event)
  34. def _check_ssl_socket(sock):
  35. if ssl is not None and isinstance(sock, ssl.SSLSocket):
  36. raise TypeError("Socket cannot be of type SSLSocket")
  37. class BaseSelectorEventLoop(base_events.BaseEventLoop):
  38. """Selector event loop.
  39. See events.EventLoop for API specification.
  40. """
  41. def __init__(self, selector=None):
  42. super().__init__()
  43. if selector is None:
  44. selector = selectors.DefaultSelector()
  45. logger.debug('Using selector: %s', selector.__class__.__name__)
  46. self._selector = selector
  47. self._make_self_pipe()
  48. self._transports = weakref.WeakValueDictionary()
  49. def _make_socket_transport(self, sock, protocol, waiter=None, *,
  50. extra=None, server=None):
  51. return _SelectorSocketTransport(self, sock, protocol, waiter,
  52. extra, server)
  53. def _make_ssl_transport(
  54. self, rawsock, protocol, sslcontext, waiter=None,
  55. *, server_side=False, server_hostname=None,
  56. extra=None, server=None,
  57. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  58. ssl_protocol = sslproto.SSLProtocol(
  59. self, protocol, sslcontext, waiter,
  60. server_side, server_hostname,
  61. ssl_handshake_timeout=ssl_handshake_timeout)
  62. _SelectorSocketTransport(self, rawsock, ssl_protocol,
  63. extra=extra, server=server)
  64. return ssl_protocol._app_transport
  65. def _make_datagram_transport(self, sock, protocol,
  66. address=None, waiter=None, extra=None):
  67. return _SelectorDatagramTransport(self, sock, protocol,
  68. address, waiter, extra)
  69. def close(self):
  70. if self.is_running():
  71. raise RuntimeError("Cannot close a running event loop")
  72. if self.is_closed():
  73. return
  74. self._close_self_pipe()
  75. super().close()
  76. if self._selector is not None:
  77. self._selector.close()
  78. self._selector = None
  79. def _close_self_pipe(self):
  80. self._remove_reader(self._ssock.fileno())
  81. self._ssock.close()
  82. self._ssock = None
  83. self._csock.close()
  84. self._csock = None
  85. self._internal_fds -= 1
  86. def _make_self_pipe(self):
  87. # A self-socket, really. :-)
  88. self._ssock, self._csock = socket.socketpair()
  89. self._ssock.setblocking(False)
  90. self._csock.setblocking(False)
  91. self._internal_fds += 1
  92. self._add_reader(self._ssock.fileno(), self._read_from_self)
  93. def _process_self_data(self, data):
  94. pass
  95. def _read_from_self(self):
  96. while True:
  97. try:
  98. data = self._ssock.recv(4096)
  99. if not data:
  100. break
  101. self._process_self_data(data)
  102. except InterruptedError:
  103. continue
  104. except BlockingIOError:
  105. break
  106. def _write_to_self(self):
  107. # This may be called from a different thread, possibly after
  108. # _close_self_pipe() has been called or even while it is
  109. # running. Guard for self._csock being None or closed. When
  110. # a socket is closed, send() raises OSError (with errno set to
  111. # EBADF, but let's not rely on the exact error code).
  112. csock = self._csock
  113. if csock is not None:
  114. try:
  115. csock.send(b'\0')
  116. except OSError:
  117. if self._debug:
  118. logger.debug("Fail to write a null byte into the "
  119. "self-pipe socket",
  120. exc_info=True)
  121. def _start_serving(self, protocol_factory, sock,
  122. sslcontext=None, server=None, backlog=100,
  123. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  124. self._add_reader(sock.fileno(), self._accept_connection,
  125. protocol_factory, sock, sslcontext, server, backlog,
  126. ssl_handshake_timeout)
  127. def _accept_connection(
  128. self, protocol_factory, sock,
  129. sslcontext=None, server=None, backlog=100,
  130. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  131. # This method is only called once for each event loop tick where the
  132. # listening socket has triggered an EVENT_READ. There may be multiple
  133. # connections waiting for an .accept() so it is called in a loop.
  134. # See https://bugs.python.org/issue27906 for more details.
  135. for _ in range(backlog):
  136. try:
  137. conn, addr = sock.accept()
  138. if self._debug:
  139. logger.debug("%r got a new connection from %r: %r",
  140. server, addr, conn)
  141. conn.setblocking(False)
  142. except (BlockingIOError, InterruptedError, ConnectionAbortedError):
  143. # Early exit because the socket accept buffer is empty.
  144. return None
  145. except OSError as exc:
  146. # There's nowhere to send the error, so just log it.
  147. if exc.errno in (errno.EMFILE, errno.ENFILE,
  148. errno.ENOBUFS, errno.ENOMEM):
  149. # Some platforms (e.g. Linux keep reporting the FD as
  150. # ready, so we remove the read handler temporarily.
  151. # We'll try again in a while.
  152. self.call_exception_handler({
  153. 'message': 'socket.accept() out of system resource',
  154. 'exception': exc,
  155. 'socket': sock,
  156. })
  157. self._remove_reader(sock.fileno())
  158. self.call_later(constants.ACCEPT_RETRY_DELAY,
  159. self._start_serving,
  160. protocol_factory, sock, sslcontext, server,
  161. backlog, ssl_handshake_timeout)
  162. else:
  163. raise # The event loop will catch, log and ignore it.
  164. else:
  165. extra = {'peername': addr}
  166. accept = self._accept_connection2(
  167. protocol_factory, conn, extra, sslcontext, server,
  168. ssl_handshake_timeout)
  169. self.create_task(accept)
  170. async def _accept_connection2(
  171. self, protocol_factory, conn, extra,
  172. sslcontext=None, server=None,
  173. ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
  174. protocol = None
  175. transport = None
  176. try:
  177. protocol = protocol_factory()
  178. waiter = self.create_future()
  179. if sslcontext:
  180. transport = self._make_ssl_transport(
  181. conn, protocol, sslcontext, waiter=waiter,
  182. server_side=True, extra=extra, server=server,
  183. ssl_handshake_timeout=ssl_handshake_timeout)
  184. else:
  185. transport = self._make_socket_transport(
  186. conn, protocol, waiter=waiter, extra=extra,
  187. server=server)
  188. try:
  189. await waiter
  190. except:
  191. transport.close()
  192. raise
  193. # It's now up to the protocol to handle the connection.
  194. except Exception as exc:
  195. if self._debug:
  196. context = {
  197. 'message':
  198. 'Error on transport creation for incoming connection',
  199. 'exception': exc,
  200. }
  201. if protocol is not None:
  202. context['protocol'] = protocol
  203. if transport is not None:
  204. context['transport'] = transport
  205. self.call_exception_handler(context)
  206. def _ensure_fd_no_transport(self, fd):
  207. fileno = fd
  208. if not isinstance(fileno, int):
  209. try:
  210. fileno = int(fileno.fileno())
  211. except (AttributeError, TypeError, ValueError):
  212. # This code matches selectors._fileobj_to_fd function.
  213. raise ValueError(f"Invalid file object: {fd!r}") from None
  214. try:
  215. transport = self._transports[fileno]
  216. except KeyError:
  217. pass
  218. else:
  219. if not transport.is_closing():
  220. raise RuntimeError(
  221. f'File descriptor {fd!r} is used by transport '
  222. f'{transport!r}')
  223. def _add_reader(self, fd, callback, *args):
  224. self._check_closed()
  225. handle = events.Handle(callback, args, self, None)
  226. try:
  227. key = self._selector.get_key(fd)
  228. except KeyError:
  229. self._selector.register(fd, selectors.EVENT_READ,
  230. (handle, None))
  231. else:
  232. mask, (reader, writer) = key.events, key.data
  233. self._selector.modify(fd, mask | selectors.EVENT_READ,
  234. (handle, writer))
  235. if reader is not None:
  236. reader.cancel()
  237. def _remove_reader(self, fd):
  238. if self.is_closed():
  239. return False
  240. try:
  241. key = self._selector.get_key(fd)
  242. except KeyError:
  243. return False
  244. else:
  245. mask, (reader, writer) = key.events, key.data
  246. mask &= ~selectors.EVENT_READ
  247. if not mask:
  248. self._selector.unregister(fd)
  249. else:
  250. self._selector.modify(fd, mask, (None, writer))
  251. if reader is not None:
  252. reader.cancel()
  253. return True
  254. else:
  255. return False
  256. def _add_writer(self, fd, callback, *args):
  257. self._check_closed()
  258. handle = events.Handle(callback, args, self, None)
  259. try:
  260. key = self._selector.get_key(fd)
  261. except KeyError:
  262. self._selector.register(fd, selectors.EVENT_WRITE,
  263. (None, handle))
  264. else:
  265. mask, (reader, writer) = key.events, key.data
  266. self._selector.modify(fd, mask | selectors.EVENT_WRITE,
  267. (reader, handle))
  268. if writer is not None:
  269. writer.cancel()
  270. def _remove_writer(self, fd):
  271. """Remove a writer callback."""
  272. if self.is_closed():
  273. return False
  274. try:
  275. key = self._selector.get_key(fd)
  276. except KeyError:
  277. return False
  278. else:
  279. mask, (reader, writer) = key.events, key.data
  280. # Remove both writer and connector.
  281. mask &= ~selectors.EVENT_WRITE
  282. if not mask:
  283. self._selector.unregister(fd)
  284. else:
  285. self._selector.modify(fd, mask, (reader, None))
  286. if writer is not None:
  287. writer.cancel()
  288. return True
  289. else:
  290. return False
  291. def add_reader(self, fd, callback, *args):
  292. """Add a reader callback."""
  293. self._ensure_fd_no_transport(fd)
  294. return self._add_reader(fd, callback, *args)
  295. def remove_reader(self, fd):
  296. """Remove a reader callback."""
  297. self._ensure_fd_no_transport(fd)
  298. return self._remove_reader(fd)
  299. def add_writer(self, fd, callback, *args):
  300. """Add a writer callback.."""
  301. self._ensure_fd_no_transport(fd)
  302. return self._add_writer(fd, callback, *args)
  303. def remove_writer(self, fd):
  304. """Remove a writer callback."""
  305. self._ensure_fd_no_transport(fd)
  306. return self._remove_writer(fd)
  307. async def sock_recv(self, sock, n):
  308. """Receive data from the socket.
  309. The return value is a bytes object representing the data received.
  310. The maximum amount of data to be received at once is specified by
  311. nbytes.
  312. """
  313. _check_ssl_socket(sock)
  314. if self._debug and sock.gettimeout() != 0:
  315. raise ValueError("the socket must be non-blocking")
  316. fut = self.create_future()
  317. self._sock_recv(fut, None, sock, n)
  318. return await fut
  319. def _sock_recv(self, fut, registered_fd, sock, n):
  320. # _sock_recv() can add itself as an I/O callback if the operation can't
  321. # be done immediately. Don't use it directly, call sock_recv().
  322. if registered_fd is not None:
  323. # Remove the callback early. It should be rare that the
  324. # selector says the fd is ready but the call still returns
  325. # EAGAIN, and I am willing to take a hit in that case in
  326. # order to simplify the common case.
  327. self.remove_reader(registered_fd)
  328. if fut.cancelled():
  329. return
  330. try:
  331. data = sock.recv(n)
  332. except (BlockingIOError, InterruptedError):
  333. fd = sock.fileno()
  334. self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
  335. except Exception as exc:
  336. fut.set_exception(exc)
  337. else:
  338. fut.set_result(data)
  339. async def sock_recv_into(self, sock, buf):
  340. """Receive data from the socket.
  341. The received data is written into *buf* (a writable buffer).
  342. The return value is the number of bytes written.
  343. """
  344. _check_ssl_socket(sock)
  345. if self._debug and sock.gettimeout() != 0:
  346. raise ValueError("the socket must be non-blocking")
  347. fut = self.create_future()
  348. self._sock_recv_into(fut, None, sock, buf)
  349. return await fut
  350. def _sock_recv_into(self, fut, registered_fd, sock, buf):
  351. # _sock_recv_into() can add itself as an I/O callback if the operation
  352. # can't be done immediately. Don't use it directly, call
  353. # sock_recv_into().
  354. if registered_fd is not None:
  355. # Remove the callback early. It should be rare that the
  356. # selector says the FD is ready but the call still returns
  357. # EAGAIN, and I am willing to take a hit in that case in
  358. # order to simplify the common case.
  359. self.remove_reader(registered_fd)
  360. if fut.cancelled():
  361. return
  362. try:
  363. nbytes = sock.recv_into(buf)
  364. except (BlockingIOError, InterruptedError):
  365. fd = sock.fileno()
  366. self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
  367. except Exception as exc:
  368. fut.set_exception(exc)
  369. else:
  370. fut.set_result(nbytes)
  371. async def sock_sendall(self, sock, data):
  372. """Send data to the socket.
  373. The socket must be connected to a remote socket. This method continues
  374. to send data from data until either all data has been sent or an
  375. error occurs. None is returned on success. On error, an exception is
  376. raised, and there is no way to determine how much data, if any, was
  377. successfully processed by the receiving end of the connection.
  378. """
  379. _check_ssl_socket(sock)
  380. if self._debug and sock.gettimeout() != 0:
  381. raise ValueError("the socket must be non-blocking")
  382. fut = self.create_future()
  383. if data:
  384. self._sock_sendall(fut, None, sock, data)
  385. else:
  386. fut.set_result(None)
  387. return await fut
  388. def _sock_sendall(self, fut, registered_fd, sock, data):
  389. if registered_fd is not None:
  390. self.remove_writer(registered_fd)
  391. if fut.cancelled():
  392. return
  393. try:
  394. n = sock.send(data)
  395. except (BlockingIOError, InterruptedError):
  396. n = 0
  397. except Exception as exc:
  398. fut.set_exception(exc)
  399. return
  400. if n == len(data):
  401. fut.set_result(None)
  402. else:
  403. if n:
  404. data = data[n:]
  405. fd = sock.fileno()
  406. self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
  407. async def sock_connect(self, sock, address):
  408. """Connect to a remote socket at address.
  409. This method is a coroutine.
  410. """
  411. _check_ssl_socket(sock)
  412. if self._debug and sock.gettimeout() != 0:
  413. raise ValueError("the socket must be non-blocking")
  414. if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
  415. resolved = await self._ensure_resolved(
  416. address, family=sock.family, proto=sock.proto, loop=self)
  417. _, _, _, _, address = resolved[0]
  418. fut = self.create_future()
  419. self._sock_connect(fut, sock, address)
  420. return await fut
  421. def _sock_connect(self, fut, sock, address):
  422. fd = sock.fileno()
  423. try:
  424. sock.connect(address)
  425. except (BlockingIOError, InterruptedError):
  426. # Issue #23618: When the C function connect() fails with EINTR, the
  427. # connection runs in background. We have to wait until the socket
  428. # becomes writable to be notified when the connection succeed or
  429. # fails.
  430. fut.add_done_callback(
  431. functools.partial(self._sock_connect_done, fd))
  432. self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
  433. except Exception as exc:
  434. fut.set_exception(exc)
  435. else:
  436. fut.set_result(None)
  437. def _sock_connect_done(self, fd, fut):
  438. self.remove_writer(fd)
  439. def _sock_connect_cb(self, fut, sock, address):
  440. if fut.cancelled():
  441. return
  442. try:
  443. err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  444. if err != 0:
  445. # Jump to any except clause below.
  446. raise OSError(err, f'Connect call failed {address}')
  447. except (BlockingIOError, InterruptedError):
  448. # socket is still registered, the callback will be retried later
  449. pass
  450. except Exception as exc:
  451. fut.set_exception(exc)
  452. else:
  453. fut.set_result(None)
  454. async def sock_accept(self, sock):
  455. """Accept a connection.
  456. The socket must be bound to an address and listening for connections.
  457. The return value is a pair (conn, address) where conn is a new socket
  458. object usable to send and receive data on the connection, and address
  459. is the address bound to the socket on the other end of the connection.
  460. """
  461. _check_ssl_socket(sock)
  462. if self._debug and sock.gettimeout() != 0:
  463. raise ValueError("the socket must be non-blocking")
  464. fut = self.create_future()
  465. self._sock_accept(fut, False, sock)
  466. return await fut
  467. def _sock_accept(self, fut, registered, sock):
  468. fd = sock.fileno()
  469. if registered:
  470. self.remove_reader(fd)
  471. if fut.cancelled():
  472. return
  473. try:
  474. conn, address = sock.accept()
  475. conn.setblocking(False)
  476. except (BlockingIOError, InterruptedError):
  477. self.add_reader(fd, self._sock_accept, fut, True, sock)
  478. except Exception as exc:
  479. fut.set_exception(exc)
  480. else:
  481. fut.set_result((conn, address))
  482. async def _sendfile_native(self, transp, file, offset, count):
  483. del self._transports[transp._sock_fd]
  484. resume_reading = transp.is_reading()
  485. transp.pause_reading()
  486. await transp._make_empty_waiter()
  487. try:
  488. return await self.sock_sendfile(transp._sock, file, offset, count,
  489. fallback=False)
  490. finally:
  491. transp._reset_empty_waiter()
  492. if resume_reading:
  493. transp.resume_reading()
  494. self._transports[transp._sock_fd] = transp
  495. def _process_events(self, event_list):
  496. for key, mask in event_list:
  497. fileobj, (reader, writer) = key.fileobj, key.data
  498. if mask & selectors.EVENT_READ and reader is not None:
  499. if reader._cancelled:
  500. self._remove_reader(fileobj)
  501. else:
  502. self._add_callback(reader)
  503. if mask & selectors.EVENT_WRITE and writer is not None:
  504. if writer._cancelled:
  505. self._remove_writer(fileobj)
  506. else:
  507. self._add_callback(writer)
  508. def _stop_serving(self, sock):
  509. self._remove_reader(sock.fileno())
  510. sock.close()
  511. class _SelectorTransport(transports._FlowControlMixin,
  512. transports.Transport):
  513. max_size = 256 * 1024 # Buffer size passed to recv().
  514. _buffer_factory = bytearray # Constructs initial value for self._buffer.
  515. # Attribute used in the destructor: it must be set even if the constructor
  516. # is not called (see _SelectorSslTransport which may start by raising an
  517. # exception)
  518. _sock = None
  519. def __init__(self, loop, sock, protocol, extra=None, server=None):
  520. super().__init__(extra, loop)
  521. self._extra['socket'] = sock
  522. try:
  523. self._extra['sockname'] = sock.getsockname()
  524. except OSError:
  525. self._extra['sockname'] = None
  526. if 'peername' not in self._extra:
  527. try:
  528. self._extra['peername'] = sock.getpeername()
  529. except socket.error:
  530. self._extra['peername'] = None
  531. self._sock = sock
  532. self._sock_fd = sock.fileno()
  533. self._protocol_connected = False
  534. self.set_protocol(protocol)
  535. self._server = server
  536. self._buffer = self._buffer_factory()
  537. self._conn_lost = 0 # Set when call to connection_lost scheduled.
  538. self._closing = False # Set when close() called.
  539. if self._server is not None:
  540. self._server._attach()
  541. loop._transports[self._sock_fd] = self
  542. def __repr__(self):
  543. info = [self.__class__.__name__]
  544. if self._sock is None:
  545. info.append('closed')
  546. elif self._closing:
  547. info.append('closing')
  548. info.append(f'fd={self._sock_fd}')
  549. # test if the transport was closed
  550. if self._loop is not None and not self._loop.is_closed():
  551. polling = _test_selector_event(self._loop._selector,
  552. self._sock_fd, selectors.EVENT_READ)
  553. if polling:
  554. info.append('read=polling')
  555. else:
  556. info.append('read=idle')
  557. polling = _test_selector_event(self._loop._selector,
  558. self._sock_fd,
  559. selectors.EVENT_WRITE)
  560. if polling:
  561. state = 'polling'
  562. else:
  563. state = 'idle'
  564. bufsize = self.get_write_buffer_size()
  565. info.append(f'write=<{state}, bufsize={bufsize}>')
  566. return '<{}>'.format(' '.join(info))
  567. def abort(self):
  568. self._force_close(None)
  569. def set_protocol(self, protocol):
  570. self._protocol = protocol
  571. self._protocol_connected = True
  572. def get_protocol(self):
  573. return self._protocol
  574. def is_closing(self):
  575. return self._closing
  576. def close(self):
  577. if self._closing:
  578. return
  579. self._closing = True
  580. self._loop._remove_reader(self._sock_fd)
  581. if not self._buffer:
  582. self._conn_lost += 1
  583. self._loop._remove_writer(self._sock_fd)
  584. self._loop.call_soon(self._call_connection_lost, None)
  585. def __del__(self):
  586. if self._sock is not None:
  587. warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
  588. source=self)
  589. self._sock.close()
  590. def _fatal_error(self, exc, message='Fatal error on transport'):
  591. # Should be called from exception handler only.
  592. if isinstance(exc, OSError):
  593. if self._loop.get_debug():
  594. logger.debug("%r: %s", self, message, exc_info=True)
  595. else:
  596. self._loop.call_exception_handler({
  597. 'message': message,
  598. 'exception': exc,
  599. 'transport': self,
  600. 'protocol': self._protocol,
  601. })
  602. self._force_close(exc)
  603. def _force_close(self, exc):
  604. if self._conn_lost:
  605. return
  606. if self._buffer:
  607. self._buffer.clear()
  608. self._loop._remove_writer(self._sock_fd)
  609. if not self._closing:
  610. self._closing = True
  611. self._loop._remove_reader(self._sock_fd)
  612. self._conn_lost += 1
  613. self._loop.call_soon(self._call_connection_lost, exc)
  614. def _call_connection_lost(self, exc):
  615. try:
  616. if self._protocol_connected:
  617. self._protocol.connection_lost(exc)
  618. finally:
  619. self._sock.close()
  620. self._sock = None
  621. self._protocol = None
  622. self._loop = None
  623. server = self._server
  624. if server is not None:
  625. server._detach()
  626. self._server = None
  627. def get_write_buffer_size(self):
  628. return len(self._buffer)
  629. def _add_reader(self, fd, callback, *args):
  630. if self._closing:
  631. return
  632. self._loop._add_reader(fd, callback, *args)
  633. class _SelectorSocketTransport(_SelectorTransport):
  634. _start_tls_compatible = True
  635. _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
  636. def __init__(self, loop, sock, protocol, waiter=None,
  637. extra=None, server=None):
  638. self._read_ready_cb = None
  639. super().__init__(loop, sock, protocol, extra, server)
  640. self._eof = False
  641. self._paused = False
  642. self._empty_waiter = None
  643. # Disable the Nagle algorithm -- small writes will be
  644. # sent without waiting for the TCP ACK. This generally
  645. # decreases the latency (in some cases significantly.)
  646. base_events._set_nodelay(self._sock)
  647. self._loop.call_soon(self._protocol.connection_made, self)
  648. # only start reading when connection_made() has been called
  649. self._loop.call_soon(self._add_reader,
  650. self._sock_fd, self._read_ready)
  651. if waiter is not None:
  652. # only wake up the waiter when connection_made() has been called
  653. self._loop.call_soon(futures._set_result_unless_cancelled,
  654. waiter, None)
  655. def set_protocol(self, protocol):
  656. if isinstance(protocol, protocols.BufferedProtocol):
  657. self._read_ready_cb = self._read_ready__get_buffer
  658. else:
  659. self._read_ready_cb = self._read_ready__data_received
  660. super().set_protocol(protocol)
  661. def is_reading(self):
  662. return not self._paused and not self._closing
  663. def pause_reading(self):
  664. if self._closing or self._paused:
  665. return
  666. self._paused = True
  667. self._loop._remove_reader(self._sock_fd)
  668. if self._loop.get_debug():
  669. logger.debug("%r pauses reading", self)
  670. def resume_reading(self):
  671. if self._closing or not self._paused:
  672. return
  673. self._paused = False
  674. self._add_reader(self._sock_fd, self._read_ready)
  675. if self._loop.get_debug():
  676. logger.debug("%r resumes reading", self)
  677. def _read_ready(self):
  678. self._read_ready_cb()
  679. def _read_ready__get_buffer(self):
  680. if self._conn_lost:
  681. return
  682. try:
  683. buf = self._protocol.get_buffer(-1)
  684. if not len(buf):
  685. raise RuntimeError('get_buffer() returned an empty buffer')
  686. except Exception as exc:
  687. self._fatal_error(
  688. exc, 'Fatal error: protocol.get_buffer() call failed.')
  689. return
  690. try:
  691. nbytes = self._sock.recv_into(buf)
  692. except (BlockingIOError, InterruptedError):
  693. return
  694. except Exception as exc:
  695. self._fatal_error(exc, 'Fatal read error on socket transport')
  696. return
  697. if not nbytes:
  698. self._read_ready__on_eof()
  699. return
  700. try:
  701. self._protocol.buffer_updated(nbytes)
  702. except Exception as exc:
  703. self._fatal_error(
  704. exc, 'Fatal error: protocol.buffer_updated() call failed.')
  705. def _read_ready__data_received(self):
  706. if self._conn_lost:
  707. return
  708. try:
  709. data = self._sock.recv(self.max_size)
  710. except (BlockingIOError, InterruptedError):
  711. return
  712. except Exception as exc:
  713. self._fatal_error(exc, 'Fatal read error on socket transport')
  714. return
  715. if not data:
  716. self._read_ready__on_eof()
  717. return
  718. try:
  719. self._protocol.data_received(data)
  720. except Exception as exc:
  721. self._fatal_error(
  722. exc, 'Fatal error: protocol.data_received() call failed.')
  723. def _read_ready__on_eof(self):
  724. if self._loop.get_debug():
  725. logger.debug("%r received EOF", self)
  726. try:
  727. keep_open = self._protocol.eof_received()
  728. except Exception as exc:
  729. self._fatal_error(
  730. exc, 'Fatal error: protocol.eof_received() call failed.')
  731. return
  732. if keep_open:
  733. # We're keeping the connection open so the
  734. # protocol can write more, but we still can't
  735. # receive more, so remove the reader callback.
  736. self._loop._remove_reader(self._sock_fd)
  737. else:
  738. self.close()
  739. def write(self, data):
  740. if not isinstance(data, (bytes, bytearray, memoryview)):
  741. raise TypeError(f'data argument must be a bytes-like object, '
  742. f'not {type(data).__name__!r}')
  743. if self._eof:
  744. raise RuntimeError('Cannot call write() after write_eof()')
  745. if self._empty_waiter is not None:
  746. raise RuntimeError('unable to write; sendfile is in progress')
  747. if not data:
  748. return
  749. if self._conn_lost:
  750. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  751. logger.warning('socket.send() raised exception.')
  752. self._conn_lost += 1
  753. return
  754. if not self._buffer:
  755. # Optimization: try to send now.
  756. try:
  757. n = self._sock.send(data)
  758. except (BlockingIOError, InterruptedError):
  759. pass
  760. except Exception as exc:
  761. self._fatal_error(exc, 'Fatal write error on socket transport')
  762. return
  763. else:
  764. data = data[n:]
  765. if not data:
  766. return
  767. # Not all was written; register write handler.
  768. self._loop._add_writer(self._sock_fd, self._write_ready)
  769. # Add it to the buffer.
  770. self._buffer.extend(data)
  771. self._maybe_pause_protocol()
  772. def _write_ready(self):
  773. assert self._buffer, 'Data should not be empty'
  774. if self._conn_lost:
  775. return
  776. try:
  777. n = self._sock.send(self._buffer)
  778. except (BlockingIOError, InterruptedError):
  779. pass
  780. except Exception as exc:
  781. self._loop._remove_writer(self._sock_fd)
  782. self._buffer.clear()
  783. self._fatal_error(exc, 'Fatal write error on socket transport')
  784. if self._empty_waiter is not None:
  785. self._empty_waiter.set_exception(exc)
  786. else:
  787. if n:
  788. del self._buffer[:n]
  789. self._maybe_resume_protocol() # May append to buffer.
  790. if not self._buffer:
  791. self._loop._remove_writer(self._sock_fd)
  792. if self._empty_waiter is not None:
  793. self._empty_waiter.set_result(None)
  794. if self._closing:
  795. self._call_connection_lost(None)
  796. elif self._eof:
  797. self._sock.shutdown(socket.SHUT_WR)
  def write_eof(self):
      """Mark the write side as finished.

      A no-op when the transport is closing or EOF was already
      requested.  Otherwise records the request and, if no data is
      buffered, shuts down the socket's write side at once; with data
      still buffered nothing else happens here.
      """
      already_finishing = self._closing or self._eof
      if already_finishing:
          return

      self._eof = True
      if self._buffer:
          return
      self._sock.shutdown(socket.SHUT_WR)
  def can_write_eof(self):
      """Return True: this transport always supports ``write_eof()``."""
      return True
  def _call_connection_lost(self, exc):
      """Run the base connection-lost handling, then fail a pending empty waiter.

      *exc* is passed through unchanged to the base class
      implementation.  Afterwards, if an empty waiter is installed and
      has not finished yet, it is failed with ``ConnectionError`` so
      that whoever awaits it is woken up.
      """
      try:
          super()._call_connection_lost(exc)
      finally:
          # Wake the waiter even if the base handler raised.
          waiter = self._empty_waiter
          # The waiter may already be finished: _make_empty_waiter()
          # resolves it immediately when the buffer is empty, and
          # _write_ready() fails it on a send error.  Calling
          # set_exception() on a finished future raises
          # InvalidStateError, so only touch a pending one.
          if waiter is not None and not waiter.done():
              waiter.set_exception(
                  ConnectionError("Connection is closed by peer"))
  811. def _make_empty_waiter(self):
  812. if self._empty_waiter is not None:
  813. raise RuntimeError("Empty waiter is already set")
  814. self._empty_waiter = self._loop.create_future()
  815. if not self._buffer:
  816. self._empty_waiter.set_result(None)
  817. return self._empty_waiter
  818. def _reset_empty_waiter(self):
  819. self._empty_waiter = None
  class _SelectorDatagramTransport(_SelectorTransport):
      """Selector-driven transport for datagram sockets.

      Datagrams that cannot be sent right away are queued in
      ``self._buffer`` as ``(data, addr)`` pairs and flushed later by
      ``_sendto_ready()``.
      """

      # Queue type with O(1) operations at both ends; presumably consumed
      # by the base class to create ``self._buffer`` (not visible here).
      _buffer_factory = collections.deque

      def __init__(self, loop, sock, protocol, address=None,
                   waiter=None, extra=None):
          """Set up the transport and schedule its start-up callbacks.

          *address* is the optional fixed remote address.  If *waiter*
          is given, it is resolved with None (unless cancelled) after
          the other start-up callbacks have been scheduled.
          """
          super().__init__(loop, sock, protocol, extra)
          self._address = address

          schedule = self._loop.call_soon
          schedule(self._protocol.connection_made, self)
          # Reading starts only after connection_made() has been called.
          schedule(self._add_reader, self._sock_fd, self._read_ready)
          if waiter is not None:
              # The waiter wakes up only after connection_made() as well.
              schedule(futures._set_result_unless_cancelled, waiter, None)

      def get_write_buffer_size(self):
          """Return the total payload size of all queued datagrams."""
          return sum(len(payload) for payload, _ in self._buffer)

      def _read_ready(self):
          """Reader callback: receive one datagram and hand it to the protocol.

          OSError is reported through ``protocol.error_received()``; any
          other exception from the receive goes to ``self._fatal_error()``.
          """
          if self._conn_lost:
              return

          try:
              payload, sender = self._sock.recvfrom(self.max_size)
          except (BlockingIOError, InterruptedError):
              # Nothing to read right now.
              return
          except OSError as exc:
              self._protocol.error_received(exc)
              return
          except Exception as exc:
              self._fatal_error(exc, 'Fatal read error on datagram transport')
              return

          self._protocol.datagram_received(payload, sender)

      def _send_datagram(self, data, addr):
          """Send one datagram, using send() when a peer name is recorded."""
          if self._extra['peername']:
              self._sock.send(data)
          else:
              self._sock.sendto(data, addr)

      def sendto(self, data, addr=None):
          """Send *data* to *addr*, queueing it if the socket is not ready.

          Raises TypeError if *data* is not bytes, bytearray or
          memoryview, and ValueError if the transport has a fixed address
          and *addr* is neither None nor that address.  Empty *data* is
          ignored.
          """
          if not isinstance(data, (bytes, bytearray, memoryview)):
              raise TypeError(f'data argument must be a bytes-like object, '
                              f'not {type(data).__name__!r}')
          if not data:
              return

          fixed_address = self._address
          if fixed_address:
              if addr not in (None, fixed_address):
                  raise ValueError(
                      f'Invalid address: must be None or {self._address}')
              addr = fixed_address

          if self._conn_lost and fixed_address:
              # Connection already gone: count the attempt and warn once
              # the counter has reached the configured threshold.
              threshold = constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES
              if self._conn_lost >= threshold:
                  logger.warning('socket.send() raised exception.')
              self._conn_lost += 1
              return

          if not self._buffer:
              # Nothing is queued, so attempt to send right away.
              try:
                  self._send_datagram(data, addr)
              except (BlockingIOError, InterruptedError):
                  # Socket busy: register the writer, then queue below.
                  self._loop._add_writer(self._sock_fd, self._sendto_ready)
              except OSError as exc:
                  self._protocol.error_received(exc)
                  return
              except Exception as exc:
                  self._fatal_error(
                      exc, 'Fatal write error on datagram transport')
                  return
              else:
                  return

          # Ensure that what we buffer is immutable.
          self._buffer.append((bytes(data), addr))
          self._maybe_pause_protocol()

      def _sendto_ready(self):
          """Writer callback: flush queued datagrams until the socket blocks.

          A datagram that fails with OSError is reported through
          ``protocol.error_received()`` and is not re-queued; any other
          exception goes to ``self._fatal_error()``.  Both cases return
          immediately.
          """
          while self._buffer:
              entry = self._buffer.popleft()
              payload, target = entry
              try:
                  self._send_datagram(payload, target)
              except (BlockingIOError, InterruptedError):
                  # Put it back at the front and try again later.
                  self._buffer.appendleft((payload, target))
                  break
              except OSError as exc:
                  self._protocol.error_received(exc)
                  return
              except Exception as exc:
                  self._fatal_error(
                      exc, 'Fatal write error on datagram transport')
                  return

          self._maybe_resume_protocol()  # May append to buffer.
          if self._buffer:
              return
          self._loop._remove_writer(self._sock_fd)
          if self._closing:
              self._call_connection_lost(None)