# streams.py (24 KB)
# Names exported by this module.  The UNIX-domain helpers are appended
# just below, but only when the socket module exposes AF_UNIX.
__all__ = (
    'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
    'open_connection', 'start_server',
    'IncompleteReadError', 'LimitOverrunError',
)

import socket

if hasattr(socket, 'AF_UNIX'):
    __all__ += ('open_unix_connection', 'start_unix_server')

from . import coroutines
from . import events
from . import protocols
from .log import logger
from .tasks import sleep


# Default value of the `limit` argument accepted by StreamReader and by the
# open_*/start_* helpers in this module.
_DEFAULT_LIMIT = 2 ** 16  # 64 KiB
  15. class IncompleteReadError(EOFError):
  16. """
  17. Incomplete read error. Attributes:
  18. - partial: read bytes string before the end of stream was reached
  19. - expected: total number of expected bytes (or None if unknown)
  20. """
  21. def __init__(self, partial, expected):
  22. super().__init__(f'{len(partial)} bytes read on a total of '
  23. f'{expected!r} expected bytes')
  24. self.partial = partial
  25. self.expected = expected
  26. def __reduce__(self):
  27. return type(self), (self.partial, self.expected)
  28. class LimitOverrunError(Exception):
  29. """Reached the buffer limit while looking for a separator.
  30. Attributes:
  31. - consumed: total number of to be consumed bytes.
  32. """
  33. def __init__(self, message, consumed):
  34. super().__init__(message)
  35. self.consumed = consumed
  36. def __reduce__(self):
  37. return type(self), (self.args[0], self.consumed)
  38. async def open_connection(host=None, port=None, *,
  39. loop=None, limit=_DEFAULT_LIMIT, **kwds):
  40. """A wrapper for create_connection() returning a (reader, writer) pair.
  41. The reader returned is a StreamReader instance; the writer is a
  42. StreamWriter instance.
  43. The arguments are all the usual arguments to create_connection()
  44. except protocol_factory; most common are positional host and port,
  45. with various optional keyword arguments following.
  46. Additional optional keyword arguments are loop (to set the event loop
  47. instance to use) and limit (to set the buffer limit passed to the
  48. StreamReader).
  49. (If you want to customize the StreamReader and/or
  50. StreamReaderProtocol classes, just copy the code -- there's
  51. really nothing special here except some convenience.)
  52. """
  53. if loop is None:
  54. loop = events.get_event_loop()
  55. reader = StreamReader(limit=limit, loop=loop)
  56. protocol = StreamReaderProtocol(reader, loop=loop)
  57. transport, _ = await loop.create_connection(
  58. lambda: protocol, host, port, **kwds)
  59. writer = StreamWriter(transport, protocol, reader, loop)
  60. return reader, writer
  61. async def start_server(client_connected_cb, host=None, port=None, *,
  62. loop=None, limit=_DEFAULT_LIMIT, **kwds):
  63. """Start a socket server, call back for each client connected.
  64. The first parameter, `client_connected_cb`, takes two parameters:
  65. client_reader, client_writer. client_reader is a StreamReader
  66. object, while client_writer is a StreamWriter object. This
  67. parameter can either be a plain callback function or a coroutine;
  68. if it is a coroutine, it will be automatically converted into a
  69. Task.
  70. The rest of the arguments are all the usual arguments to
  71. loop.create_server() except protocol_factory; most common are
  72. positional host and port, with various optional keyword arguments
  73. following. The return value is the same as loop.create_server().
  74. Additional optional keyword arguments are loop (to set the event loop
  75. instance to use) and limit (to set the buffer limit passed to the
  76. StreamReader).
  77. The return value is the same as loop.create_server(), i.e. a
  78. Server object which can be used to stop the service.
  79. """
  80. if loop is None:
  81. loop = events.get_event_loop()
  82. def factory():
  83. reader = StreamReader(limit=limit, loop=loop)
  84. protocol = StreamReaderProtocol(reader, client_connected_cb,
  85. loop=loop)
  86. return protocol
  87. return await loop.create_server(factory, host, port, **kwds)
  88. if hasattr(socket, 'AF_UNIX'):
  89. # UNIX Domain Sockets are supported on this platform
  90. async def open_unix_connection(path=None, *,
  91. loop=None, limit=_DEFAULT_LIMIT, **kwds):
  92. """Similar to `open_connection` but works with UNIX Domain Sockets."""
  93. if loop is None:
  94. loop = events.get_event_loop()
  95. reader = StreamReader(limit=limit, loop=loop)
  96. protocol = StreamReaderProtocol(reader, loop=loop)
  97. transport, _ = await loop.create_unix_connection(
  98. lambda: protocol, path, **kwds)
  99. writer = StreamWriter(transport, protocol, reader, loop)
  100. return reader, writer
  101. async def start_unix_server(client_connected_cb, path=None, *,
  102. loop=None, limit=_DEFAULT_LIMIT, **kwds):
  103. """Similar to `start_server` but works with UNIX Domain Sockets."""
  104. if loop is None:
  105. loop = events.get_event_loop()
  106. def factory():
  107. reader = StreamReader(limit=limit, loop=loop)
  108. protocol = StreamReaderProtocol(reader, client_connected_cb,
  109. loop=loop)
  110. return protocol
  111. return await loop.create_unix_server(factory, path, **kwds)
  112. class FlowControlMixin(protocols.Protocol):
  113. """Reusable flow control logic for StreamWriter.drain().
  114. This implements the protocol methods pause_writing(),
  115. resume_writing() and connection_lost(). If the subclass overrides
  116. these it must call the super methods.
  117. StreamWriter.drain() must wait for _drain_helper() coroutine.
  118. """
  119. def __init__(self, loop=None):
  120. if loop is None:
  121. self._loop = events.get_event_loop()
  122. else:
  123. self._loop = loop
  124. self._paused = False
  125. self._drain_waiter = None
  126. self._connection_lost = False
  127. def pause_writing(self):
  128. assert not self._paused
  129. self._paused = True
  130. if self._loop.get_debug():
  131. logger.debug("%r pauses writing", self)
  132. def resume_writing(self):
  133. assert self._paused
  134. self._paused = False
  135. if self._loop.get_debug():
  136. logger.debug("%r resumes writing", self)
  137. waiter = self._drain_waiter
  138. if waiter is not None:
  139. self._drain_waiter = None
  140. if not waiter.done():
  141. waiter.set_result(None)
  142. def connection_lost(self, exc):
  143. self._connection_lost = True
  144. # Wake up the writer if currently paused.
  145. if not self._paused:
  146. return
  147. waiter = self._drain_waiter
  148. if waiter is None:
  149. return
  150. self._drain_waiter = None
  151. if waiter.done():
  152. return
  153. if exc is None:
  154. waiter.set_result(None)
  155. else:
  156. waiter.set_exception(exc)
  157. async def _drain_helper(self):
  158. if self._connection_lost:
  159. raise ConnectionResetError('Connection lost')
  160. if not self._paused:
  161. return
  162. waiter = self._drain_waiter
  163. assert waiter is None or waiter.cancelled()
  164. waiter = self._loop.create_future()
  165. self._drain_waiter = waiter
  166. await waiter
  167. class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
  168. """Helper class to adapt between Protocol and StreamReader.
  169. (This is a helper class instead of making StreamReader itself a
  170. Protocol subclass, because the StreamReader has other potential
  171. uses, and to prevent the user of the StreamReader to accidentally
  172. call inappropriate methods of the protocol.)
  173. """
  174. def __init__(self, stream_reader, client_connected_cb=None, loop=None):
  175. super().__init__(loop=loop)
  176. self._stream_reader = stream_reader
  177. self._stream_writer = None
  178. self._client_connected_cb = client_connected_cb
  179. self._over_ssl = False
  180. self._closed = self._loop.create_future()
  181. def connection_made(self, transport):
  182. self._stream_reader.set_transport(transport)
  183. self._over_ssl = transport.get_extra_info('sslcontext') is not None
  184. if self._client_connected_cb is not None:
  185. self._stream_writer = StreamWriter(transport, self,
  186. self._stream_reader,
  187. self._loop)
  188. res = self._client_connected_cb(self._stream_reader,
  189. self._stream_writer)
  190. if coroutines.iscoroutine(res):
  191. self._loop.create_task(res)
  192. def connection_lost(self, exc):
  193. if self._stream_reader is not None:
  194. if exc is None:
  195. self._stream_reader.feed_eof()
  196. else:
  197. self._stream_reader.set_exception(exc)
  198. if not self._closed.done():
  199. if exc is None:
  200. self._closed.set_result(None)
  201. else:
  202. self._closed.set_exception(exc)
  203. super().connection_lost(exc)
  204. self._stream_reader = None
  205. self._stream_writer = None
  206. def data_received(self, data):
  207. self._stream_reader.feed_data(data)
  208. def eof_received(self):
  209. self._stream_reader.feed_eof()
  210. if self._over_ssl:
  211. # Prevent a warning in SSLProtocol.eof_received:
  212. # "returning true from eof_received()
  213. # has no effect when using ssl"
  214. return False
  215. return True
  216. def __del__(self):
  217. # Prevent reports about unhandled exceptions.
  218. # Better than self._closed._log_traceback = False hack
  219. closed = self._closed
  220. if closed.done() and not closed.cancelled():
  221. closed.exception()
  222. class StreamWriter:
  223. """Wraps a Transport.
  224. This exposes write(), writelines(), [can_]write_eof(),
  225. get_extra_info() and close(). It adds drain() which returns an
  226. optional Future on which you can wait for flow control. It also
  227. adds a transport property which references the Transport
  228. directly.
  229. """
  230. def __init__(self, transport, protocol, reader, loop):
  231. self._transport = transport
  232. self._protocol = protocol
  233. # drain() expects that the reader has an exception() method
  234. assert reader is None or isinstance(reader, StreamReader)
  235. self._reader = reader
  236. self._loop = loop
  237. def __repr__(self):
  238. info = [self.__class__.__name__, f'transport={self._transport!r}']
  239. if self._reader is not None:
  240. info.append(f'reader={self._reader!r}')
  241. return '<{}>'.format(' '.join(info))
  242. @property
  243. def transport(self):
  244. return self._transport
  245. def write(self, data):
  246. self._transport.write(data)
  247. def writelines(self, data):
  248. self._transport.writelines(data)
  249. def write_eof(self):
  250. return self._transport.write_eof()
  251. def can_write_eof(self):
  252. return self._transport.can_write_eof()
  253. def close(self):
  254. return self._transport.close()
  255. def is_closing(self):
  256. return self._transport.is_closing()
  257. async def wait_closed(self):
  258. await self._protocol._closed
  259. def get_extra_info(self, name, default=None):
  260. return self._transport.get_extra_info(name, default)
  261. async def drain(self):
  262. """Flush the write buffer.
  263. The intended use is to write
  264. w.write(data)
  265. await w.drain()
  266. """
  267. if self._reader is not None:
  268. exc = self._reader.exception()
  269. if exc is not None:
  270. raise exc
  271. if self._transport.is_closing():
  272. # Yield to the event loop so connection_lost() may be
  273. # called. Without this, _drain_helper() would return
  274. # immediately, and code that calls
  275. # write(...); await drain()
  276. # in a loop would never call connection_lost(), so it
  277. # would not see an error when the socket is closed.
  278. await sleep(0, loop=self._loop)
  279. await self._protocol._drain_helper()
  280. class StreamReader:
  281. def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
  282. # The line length limit is a security feature;
  283. # it also doubles as half the buffer limit.
  284. if limit <= 0:
  285. raise ValueError('Limit cannot be <= 0')
  286. self._limit = limit
  287. if loop is None:
  288. self._loop = events.get_event_loop()
  289. else:
  290. self._loop = loop
  291. self._buffer = bytearray()
  292. self._eof = False # Whether we're done.
  293. self._waiter = None # A future used by _wait_for_data()
  294. self._exception = None
  295. self._transport = None
  296. self._paused = False
  297. def __repr__(self):
  298. info = ['StreamReader']
  299. if self._buffer:
  300. info.append(f'{len(self._buffer)} bytes')
  301. if self._eof:
  302. info.append('eof')
  303. if self._limit != _DEFAULT_LIMIT:
  304. info.append(f'limit={self._limit}')
  305. if self._waiter:
  306. info.append(f'waiter={self._waiter!r}')
  307. if self._exception:
  308. info.append(f'exception={self._exception!r}')
  309. if self._transport:
  310. info.append(f'transport={self._transport!r}')
  311. if self._paused:
  312. info.append('paused')
  313. return '<{}>'.format(' '.join(info))
  314. def exception(self):
  315. return self._exception
  316. def set_exception(self, exc):
  317. self._exception = exc
  318. waiter = self._waiter
  319. if waiter is not None:
  320. self._waiter = None
  321. if not waiter.cancelled():
  322. waiter.set_exception(exc)
  323. def _wakeup_waiter(self):
  324. """Wakeup read*() functions waiting for data or EOF."""
  325. waiter = self._waiter
  326. if waiter is not None:
  327. self._waiter = None
  328. if not waiter.cancelled():
  329. waiter.set_result(None)
  330. def set_transport(self, transport):
  331. assert self._transport is None, 'Transport already set'
  332. self._transport = transport
  333. def _maybe_resume_transport(self):
  334. if self._paused and len(self._buffer) <= self._limit:
  335. self._paused = False
  336. self._transport.resume_reading()
  337. def feed_eof(self):
  338. self._eof = True
  339. self._wakeup_waiter()
  340. def at_eof(self):
  341. """Return True if the buffer is empty and 'feed_eof' was called."""
  342. return self._eof and not self._buffer
  343. def feed_data(self, data):
  344. assert not self._eof, 'feed_data after feed_eof'
  345. if not data:
  346. return
  347. self._buffer.extend(data)
  348. self._wakeup_waiter()
  349. if (self._transport is not None and
  350. not self._paused and
  351. len(self._buffer) > 2 * self._limit):
  352. try:
  353. self._transport.pause_reading()
  354. except NotImplementedError:
  355. # The transport can't be paused.
  356. # We'll just have to buffer all data.
  357. # Forget the transport so we don't keep trying.
  358. self._transport = None
  359. else:
  360. self._paused = True
  361. async def _wait_for_data(self, func_name):
  362. """Wait until feed_data() or feed_eof() is called.
  363. If stream was paused, automatically resume it.
  364. """
  365. # StreamReader uses a future to link the protocol feed_data() method
  366. # to a read coroutine. Running two read coroutines at the same time
  367. # would have an unexpected behaviour. It would not possible to know
  368. # which coroutine would get the next data.
  369. if self._waiter is not None:
  370. raise RuntimeError(
  371. f'{func_name}() called while another coroutine is '
  372. f'already waiting for incoming data')
  373. assert not self._eof, '_wait_for_data after EOF'
  374. # Waiting for data while paused will make deadlock, so prevent it.
  375. # This is essential for readexactly(n) for case when n > self._limit.
  376. if self._paused:
  377. self._paused = False
  378. self._transport.resume_reading()
  379. self._waiter = self._loop.create_future()
  380. try:
  381. await self._waiter
  382. finally:
  383. self._waiter = None
  384. async def readline(self):
  385. """Read chunk of data from the stream until newline (b'\n') is found.
  386. On success, return chunk that ends with newline. If only partial
  387. line can be read due to EOF, return incomplete line without
  388. terminating newline. When EOF was reached while no bytes read, empty
  389. bytes object is returned.
  390. If limit is reached, ValueError will be raised. In that case, if
  391. newline was found, complete line including newline will be removed
  392. from internal buffer. Else, internal buffer will be cleared. Limit is
  393. compared against part of the line without newline.
  394. If stream was paused, this function will automatically resume it if
  395. needed.
  396. """
  397. sep = b'\n'
  398. seplen = len(sep)
  399. try:
  400. line = await self.readuntil(sep)
  401. except IncompleteReadError as e:
  402. return e.partial
  403. except LimitOverrunError as e:
  404. if self._buffer.startswith(sep, e.consumed):
  405. del self._buffer[:e.consumed + seplen]
  406. else:
  407. self._buffer.clear()
  408. self._maybe_resume_transport()
  409. raise ValueError(e.args[0])
  410. return line
  411. async def readuntil(self, separator=b'\n'):
  412. """Read data from the stream until ``separator`` is found.
  413. On success, the data and separator will be removed from the
  414. internal buffer (consumed). Returned data will include the
  415. separator at the end.
  416. Configured stream limit is used to check result. Limit sets the
  417. maximal length of data that can be returned, not counting the
  418. separator.
  419. If an EOF occurs and the complete separator is still not found,
  420. an IncompleteReadError exception will be raised, and the internal
  421. buffer will be reset. The IncompleteReadError.partial attribute
  422. may contain the separator partially.
  423. If the data cannot be read because of over limit, a
  424. LimitOverrunError exception will be raised, and the data
  425. will be left in the internal buffer, so it can be read again.
  426. """
  427. seplen = len(separator)
  428. if seplen == 0:
  429. raise ValueError('Separator should be at least one-byte string')
  430. if self._exception is not None:
  431. raise self._exception
  432. # Consume whole buffer except last bytes, which length is
  433. # one less than seplen. Let's check corner cases with
  434. # separator='SEPARATOR':
  435. # * we have received almost complete separator (without last
  436. # byte). i.e buffer='some textSEPARATO'. In this case we
  437. # can safely consume len(separator) - 1 bytes.
  438. # * last byte of buffer is first byte of separator, i.e.
  439. # buffer='abcdefghijklmnopqrS'. We may safely consume
  440. # everything except that last byte, but this require to
  441. # analyze bytes of buffer that match partial separator.
  442. # This is slow and/or require FSM. For this case our
  443. # implementation is not optimal, since require rescanning
  444. # of data that is known to not belong to separator. In
  445. # real world, separator will not be so long to notice
  446. # performance problems. Even when reading MIME-encoded
  447. # messages :)
  448. # `offset` is the number of bytes from the beginning of the buffer
  449. # where there is no occurrence of `separator`.
  450. offset = 0
  451. # Loop until we find `separator` in the buffer, exceed the buffer size,
  452. # or an EOF has happened.
  453. while True:
  454. buflen = len(self._buffer)
  455. # Check if we now have enough data in the buffer for `separator` to
  456. # fit.
  457. if buflen - offset >= seplen:
  458. isep = self._buffer.find(separator, offset)
  459. if isep != -1:
  460. # `separator` is in the buffer. `isep` will be used later
  461. # to retrieve the data.
  462. break
  463. # see upper comment for explanation.
  464. offset = buflen + 1 - seplen
  465. if offset > self._limit:
  466. raise LimitOverrunError(
  467. 'Separator is not found, and chunk exceed the limit',
  468. offset)
  469. # Complete message (with full separator) may be present in buffer
  470. # even when EOF flag is set. This may happen when the last chunk
  471. # adds data which makes separator be found. That's why we check for
  472. # EOF *ater* inspecting the buffer.
  473. if self._eof:
  474. chunk = bytes(self._buffer)
  475. self._buffer.clear()
  476. raise IncompleteReadError(chunk, None)
  477. # _wait_for_data() will resume reading if stream was paused.
  478. await self._wait_for_data('readuntil')
  479. if isep > self._limit:
  480. raise LimitOverrunError(
  481. 'Separator is found, but chunk is longer than limit', isep)
  482. chunk = self._buffer[:isep + seplen]
  483. del self._buffer[:isep + seplen]
  484. self._maybe_resume_transport()
  485. return bytes(chunk)
  486. async def read(self, n=-1):
  487. """Read up to `n` bytes from the stream.
  488. If n is not provided, or set to -1, read until EOF and return all read
  489. bytes. If the EOF was received and the internal buffer is empty, return
  490. an empty bytes object.
  491. If n is zero, return empty bytes object immediately.
  492. If n is positive, this function try to read `n` bytes, and may return
  493. less or equal bytes than requested, but at least one byte. If EOF was
  494. received before any byte is read, this function returns empty byte
  495. object.
  496. Returned value is not limited with limit, configured at stream
  497. creation.
  498. If stream was paused, this function will automatically resume it if
  499. needed.
  500. """
  501. if self._exception is not None:
  502. raise self._exception
  503. if n == 0:
  504. return b''
  505. if n < 0:
  506. # This used to just loop creating a new waiter hoping to
  507. # collect everything in self._buffer, but that would
  508. # deadlock if the subprocess sends more than self.limit
  509. # bytes. So just call self.read(self._limit) until EOF.
  510. blocks = []
  511. while True:
  512. block = await self.read(self._limit)
  513. if not block:
  514. break
  515. blocks.append(block)
  516. return b''.join(blocks)
  517. if not self._buffer and not self._eof:
  518. await self._wait_for_data('read')
  519. # This will work right even if buffer is less than n bytes
  520. data = bytes(self._buffer[:n])
  521. del self._buffer[:n]
  522. self._maybe_resume_transport()
  523. return data
  524. async def readexactly(self, n):
  525. """Read exactly `n` bytes.
  526. Raise an IncompleteReadError if EOF is reached before `n` bytes can be
  527. read. The IncompleteReadError.partial attribute of the exception will
  528. contain the partial read bytes.
  529. if n is zero, return empty bytes object.
  530. Returned value is not limited with limit, configured at stream
  531. creation.
  532. If stream was paused, this function will automatically resume it if
  533. needed.
  534. """
  535. if n < 0:
  536. raise ValueError('readexactly size can not be less than zero')
  537. if self._exception is not None:
  538. raise self._exception
  539. if n == 0:
  540. return b''
  541. while len(self._buffer) < n:
  542. if self._eof:
  543. incomplete = bytes(self._buffer)
  544. self._buffer.clear()
  545. raise IncompleteReadError(incomplete, n)
  546. await self._wait_for_data('readexactly')
  547. if len(self._buffer) == n:
  548. data = bytes(self._buffer)
  549. self._buffer.clear()
  550. else:
  551. data = bytes(self._buffer[:n])
  552. del self._buffer[:n]
  553. self._maybe_resume_transport()
  554. return data
  555. def __aiter__(self):
  556. return self
  557. async def __anext__(self):
  558. val = await self.readline()
  559. if val == b'':
  560. raise StopAsyncIteration
  561. return val