windows_events.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. """Selector and proactor event loops for Windows."""
  2. import _overlapped
  3. import _winapi
  4. import errno
  5. import math
  6. import msvcrt
  7. import socket
  8. import struct
  9. import time
  10. import weakref
  11. from . import events
  12. from . import base_subprocess
  13. from . import futures
  14. from . import proactor_events
  15. from . import selector_events
  16. from . import tasks
  17. from . import windows_utils
  18. from .log import logger
  19. __all__ = (
  20. 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
  21. 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
  22. 'WindowsProactorEventLoopPolicy',
  23. )
  24. NULL = 0
  25. INFINITE = 0xffffffff
  26. ERROR_CONNECTION_REFUSED = 1225
  27. ERROR_CONNECTION_ABORTED = 1236
  28. # Initial delay in seconds for connect_pipe() before retrying to connect
  29. CONNECT_PIPE_INIT_DELAY = 0.001
  30. # Maximum delay in seconds for connect_pipe() before retrying to connect
  31. CONNECT_PIPE_MAX_DELAY = 0.100
  32. class _OverlappedFuture(futures.Future):
  33. """Subclass of Future which represents an overlapped operation.
  34. Cancelling it will immediately cancel the overlapped operation.
  35. """
  36. def __init__(self, ov, *, loop=None):
  37. super().__init__(loop=loop)
  38. if self._source_traceback:
  39. del self._source_traceback[-1]
  40. self._ov = ov
  41. def _repr_info(self):
  42. info = super()._repr_info()
  43. if self._ov is not None:
  44. state = 'pending' if self._ov.pending else 'completed'
  45. info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
  46. return info
  47. def _cancel_overlapped(self):
  48. if self._ov is None:
  49. return
  50. try:
  51. self._ov.cancel()
  52. except OSError as exc:
  53. context = {
  54. 'message': 'Cancelling an overlapped future failed',
  55. 'exception': exc,
  56. 'future': self,
  57. }
  58. if self._source_traceback:
  59. context['source_traceback'] = self._source_traceback
  60. self._loop.call_exception_handler(context)
  61. self._ov = None
  62. def cancel(self):
  63. self._cancel_overlapped()
  64. return super().cancel()
  65. def set_exception(self, exception):
  66. super().set_exception(exception)
  67. self._cancel_overlapped()
  68. def set_result(self, result):
  69. super().set_result(result)
  70. self._ov = None
  71. class _BaseWaitHandleFuture(futures.Future):
  72. """Subclass of Future which represents a wait handle."""
  73. def __init__(self, ov, handle, wait_handle, *, loop=None):
  74. super().__init__(loop=loop)
  75. if self._source_traceback:
  76. del self._source_traceback[-1]
  77. # Keep a reference to the Overlapped object to keep it alive until the
  78. # wait is unregistered
  79. self._ov = ov
  80. self._handle = handle
  81. self._wait_handle = wait_handle
  82. # Should we call UnregisterWaitEx() if the wait completes
  83. # or is cancelled?
  84. self._registered = True
  85. def _poll(self):
  86. # non-blocking wait: use a timeout of 0 millisecond
  87. return (_winapi.WaitForSingleObject(self._handle, 0) ==
  88. _winapi.WAIT_OBJECT_0)
  89. def _repr_info(self):
  90. info = super()._repr_info()
  91. info.append(f'handle={self._handle:#x}')
  92. if self._handle is not None:
  93. state = 'signaled' if self._poll() else 'waiting'
  94. info.append(state)
  95. if self._wait_handle is not None:
  96. info.append(f'wait_handle={self._wait_handle:#x}')
  97. return info
  98. def _unregister_wait_cb(self, fut):
  99. # The wait was unregistered: it's not safe to destroy the Overlapped
  100. # object
  101. self._ov = None
  102. def _unregister_wait(self):
  103. if not self._registered:
  104. return
  105. self._registered = False
  106. wait_handle = self._wait_handle
  107. self._wait_handle = None
  108. try:
  109. _overlapped.UnregisterWait(wait_handle)
  110. except OSError as exc:
  111. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  112. context = {
  113. 'message': 'Failed to unregister the wait handle',
  114. 'exception': exc,
  115. 'future': self,
  116. }
  117. if self._source_traceback:
  118. context['source_traceback'] = self._source_traceback
  119. self._loop.call_exception_handler(context)
  120. return
  121. # ERROR_IO_PENDING means that the unregister is pending
  122. self._unregister_wait_cb(None)
  123. def cancel(self):
  124. self._unregister_wait()
  125. return super().cancel()
  126. def set_exception(self, exception):
  127. self._unregister_wait()
  128. super().set_exception(exception)
  129. def set_result(self, result):
  130. self._unregister_wait()
  131. super().set_result(result)
  132. class _WaitCancelFuture(_BaseWaitHandleFuture):
  133. """Subclass of Future which represents a wait for the cancellation of a
  134. _WaitHandleFuture using an event.
  135. """
  136. def __init__(self, ov, event, wait_handle, *, loop=None):
  137. super().__init__(ov, event, wait_handle, loop=loop)
  138. self._done_callback = None
  139. def cancel(self):
  140. raise RuntimeError("_WaitCancelFuture must not be cancelled")
  141. def set_result(self, result):
  142. super().set_result(result)
  143. if self._done_callback is not None:
  144. self._done_callback(self)
  145. def set_exception(self, exception):
  146. super().set_exception(exception)
  147. if self._done_callback is not None:
  148. self._done_callback(self)
  149. class _WaitHandleFuture(_BaseWaitHandleFuture):
  150. def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
  151. super().__init__(ov, handle, wait_handle, loop=loop)
  152. self._proactor = proactor
  153. self._unregister_proactor = True
  154. self._event = _overlapped.CreateEvent(None, True, False, None)
  155. self._event_fut = None
  156. def _unregister_wait_cb(self, fut):
  157. if self._event is not None:
  158. _winapi.CloseHandle(self._event)
  159. self._event = None
  160. self._event_fut = None
  161. # If the wait was cancelled, the wait may never be signalled, so
  162. # it's required to unregister it. Otherwise, IocpProactor.close() will
  163. # wait forever for an event which will never come.
  164. #
  165. # If the IocpProactor already received the event, it's safe to call
  166. # _unregister() because we kept a reference to the Overlapped object
  167. # which is used as a unique key.
  168. self._proactor._unregister(self._ov)
  169. self._proactor = None
  170. super()._unregister_wait_cb(fut)
  171. def _unregister_wait(self):
  172. if not self._registered:
  173. return
  174. self._registered = False
  175. wait_handle = self._wait_handle
  176. self._wait_handle = None
  177. try:
  178. _overlapped.UnregisterWaitEx(wait_handle, self._event)
  179. except OSError as exc:
  180. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  181. context = {
  182. 'message': 'Failed to unregister the wait handle',
  183. 'exception': exc,
  184. 'future': self,
  185. }
  186. if self._source_traceback:
  187. context['source_traceback'] = self._source_traceback
  188. self._loop.call_exception_handler(context)
  189. return
  190. # ERROR_IO_PENDING is not an error, the wait was unregistered
  191. self._event_fut = self._proactor._wait_cancel(self._event,
  192. self._unregister_wait_cb)
  193. class PipeServer(object):
  194. """Class representing a pipe server.
  195. This is much like a bound, listening socket.
  196. """
  197. def __init__(self, address):
  198. self._address = address
  199. self._free_instances = weakref.WeakSet()
  200. # initialize the pipe attribute before calling _server_pipe_handle()
  201. # because this function can raise an exception and the destructor calls
  202. # the close() method
  203. self._pipe = None
  204. self._accept_pipe_future = None
  205. self._pipe = self._server_pipe_handle(True)
  206. def _get_unconnected_pipe(self):
  207. # Create new instance and return previous one. This ensures
  208. # that (until the server is closed) there is always at least
  209. # one pipe handle for address. Therefore if a client attempt
  210. # to connect it will not fail with FileNotFoundError.
  211. tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
  212. return tmp
  213. def _server_pipe_handle(self, first):
  214. # Return a wrapper for a new pipe handle.
  215. if self.closed():
  216. return None
  217. flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
  218. if first:
  219. flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
  220. h = _winapi.CreateNamedPipe(
  221. self._address, flags,
  222. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  223. _winapi.PIPE_WAIT,
  224. _winapi.PIPE_UNLIMITED_INSTANCES,
  225. windows_utils.BUFSIZE, windows_utils.BUFSIZE,
  226. _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
  227. pipe = windows_utils.PipeHandle(h)
  228. self._free_instances.add(pipe)
  229. return pipe
  230. def closed(self):
  231. return (self._address is None)
  232. def close(self):
  233. if self._accept_pipe_future is not None:
  234. self._accept_pipe_future.cancel()
  235. self._accept_pipe_future = None
  236. # Close all instances which have not been connected to by a client.
  237. if self._address is not None:
  238. for pipe in self._free_instances:
  239. pipe.close()
  240. self._pipe = None
  241. self._address = None
  242. self._free_instances.clear()
  243. __del__ = close
  244. class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  245. """Windows version of selector event loop."""
  246. class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
  247. """Windows version of proactor event loop using IOCP."""
  248. def __init__(self, proactor=None):
  249. if proactor is None:
  250. proactor = IocpProactor()
  251. super().__init__(proactor)
  252. async def create_pipe_connection(self, protocol_factory, address):
  253. f = self._proactor.connect_pipe(address)
  254. pipe = await f
  255. protocol = protocol_factory()
  256. trans = self._make_duplex_pipe_transport(pipe, protocol,
  257. extra={'addr': address})
  258. return trans, protocol
  259. async def start_serving_pipe(self, protocol_factory, address):
  260. server = PipeServer(address)
  261. def loop_accept_pipe(f=None):
  262. pipe = None
  263. try:
  264. if f:
  265. pipe = f.result()
  266. server._free_instances.discard(pipe)
  267. if server.closed():
  268. # A client connected before the server was closed:
  269. # drop the client (close the pipe) and exit
  270. pipe.close()
  271. return
  272. protocol = protocol_factory()
  273. self._make_duplex_pipe_transport(
  274. pipe, protocol, extra={'addr': address})
  275. pipe = server._get_unconnected_pipe()
  276. if pipe is None:
  277. return
  278. f = self._proactor.accept_pipe(pipe)
  279. except OSError as exc:
  280. if pipe and pipe.fileno() != -1:
  281. self.call_exception_handler({
  282. 'message': 'Pipe accept failed',
  283. 'exception': exc,
  284. 'pipe': pipe,
  285. })
  286. pipe.close()
  287. elif self._debug:
  288. logger.warning("Accept pipe failed on pipe %r",
  289. pipe, exc_info=True)
  290. except futures.CancelledError:
  291. if pipe:
  292. pipe.close()
  293. else:
  294. server._accept_pipe_future = f
  295. f.add_done_callback(loop_accept_pipe)
  296. self.call_soon(loop_accept_pipe)
  297. return [server]
  298. async def _make_subprocess_transport(self, protocol, args, shell,
  299. stdin, stdout, stderr, bufsize,
  300. extra=None, **kwargs):
  301. waiter = self.create_future()
  302. transp = _WindowsSubprocessTransport(self, protocol, args, shell,
  303. stdin, stdout, stderr, bufsize,
  304. waiter=waiter, extra=extra,
  305. **kwargs)
  306. try:
  307. await waiter
  308. except Exception:
  309. transp.close()
  310. await transp._wait()
  311. raise
  312. return transp
  313. class IocpProactor:
  314. """Proactor implementation using IOCP."""
  315. def __init__(self, concurrency=0xffffffff):
  316. self._loop = None
  317. self._results = []
  318. self._iocp = _overlapped.CreateIoCompletionPort(
  319. _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
  320. self._cache = {}
  321. self._registered = weakref.WeakSet()
  322. self._unregistered = []
  323. self._stopped_serving = weakref.WeakSet()
  324. def _check_closed(self):
  325. if self._iocp is None:
  326. raise RuntimeError('IocpProactor is closed')
  327. def __repr__(self):
  328. info = ['overlapped#=%s' % len(self._cache),
  329. 'result#=%s' % len(self._results)]
  330. if self._iocp is None:
  331. info.append('closed')
  332. return '<%s %s>' % (self.__class__.__name__, " ".join(info))
  333. def set_loop(self, loop):
  334. self._loop = loop
  335. def select(self, timeout=None):
  336. if not self._results:
  337. self._poll(timeout)
  338. tmp = self._results
  339. self._results = []
  340. return tmp
  341. def _result(self, value):
  342. fut = self._loop.create_future()
  343. fut.set_result(value)
  344. return fut
  345. def recv(self, conn, nbytes, flags=0):
  346. self._register_with_iocp(conn)
  347. ov = _overlapped.Overlapped(NULL)
  348. try:
  349. if isinstance(conn, socket.socket):
  350. ov.WSARecv(conn.fileno(), nbytes, flags)
  351. else:
  352. ov.ReadFile(conn.fileno(), nbytes)
  353. except BrokenPipeError:
  354. return self._result(b'')
  355. def finish_recv(trans, key, ov):
  356. try:
  357. return ov.getresult()
  358. except OSError as exc:
  359. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  360. _overlapped.ERROR_OPERATION_ABORTED):
  361. raise ConnectionResetError(*exc.args)
  362. else:
  363. raise
  364. return self._register(ov, conn, finish_recv)
  365. def recv_into(self, conn, buf, flags=0):
  366. self._register_with_iocp(conn)
  367. ov = _overlapped.Overlapped(NULL)
  368. try:
  369. if isinstance(conn, socket.socket):
  370. ov.WSARecvInto(conn.fileno(), buf, flags)
  371. else:
  372. ov.ReadFileInto(conn.fileno(), buf)
  373. except BrokenPipeError:
  374. return self._result(b'')
  375. def finish_recv(trans, key, ov):
  376. try:
  377. return ov.getresult()
  378. except OSError as exc:
  379. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  380. _overlapped.ERROR_OPERATION_ABORTED):
  381. raise ConnectionResetError(*exc.args)
  382. else:
  383. raise
  384. return self._register(ov, conn, finish_recv)
  385. def send(self, conn, buf, flags=0):
  386. self._register_with_iocp(conn)
  387. ov = _overlapped.Overlapped(NULL)
  388. if isinstance(conn, socket.socket):
  389. ov.WSASend(conn.fileno(), buf, flags)
  390. else:
  391. ov.WriteFile(conn.fileno(), buf)
  392. def finish_send(trans, key, ov):
  393. try:
  394. return ov.getresult()
  395. except OSError as exc:
  396. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  397. _overlapped.ERROR_OPERATION_ABORTED):
  398. raise ConnectionResetError(*exc.args)
  399. else:
  400. raise
  401. return self._register(ov, conn, finish_send)
  402. def accept(self, listener):
  403. self._register_with_iocp(listener)
  404. conn = self._get_accept_socket(listener.family)
  405. ov = _overlapped.Overlapped(NULL)
  406. ov.AcceptEx(listener.fileno(), conn.fileno())
  407. def finish_accept(trans, key, ov):
  408. ov.getresult()
  409. # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
  410. buf = struct.pack('@P', listener.fileno())
  411. conn.setsockopt(socket.SOL_SOCKET,
  412. _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
  413. conn.settimeout(listener.gettimeout())
  414. return conn, conn.getpeername()
  415. async def accept_coro(future, conn):
  416. # Coroutine closing the accept socket if the future is cancelled
  417. try:
  418. await future
  419. except futures.CancelledError:
  420. conn.close()
  421. raise
  422. future = self._register(ov, listener, finish_accept)
  423. coro = accept_coro(future, conn)
  424. tasks.ensure_future(coro, loop=self._loop)
  425. return future
  426. def connect(self, conn, address):
  427. self._register_with_iocp(conn)
  428. # The socket needs to be locally bound before we call ConnectEx().
  429. try:
  430. _overlapped.BindLocal(conn.fileno(), conn.family)
  431. except OSError as e:
  432. if e.winerror != errno.WSAEINVAL:
  433. raise
  434. # Probably already locally bound; check using getsockname().
  435. if conn.getsockname()[1] == 0:
  436. raise
  437. ov = _overlapped.Overlapped(NULL)
  438. ov.ConnectEx(conn.fileno(), address)
  439. def finish_connect(trans, key, ov):
  440. ov.getresult()
  441. # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
  442. conn.setsockopt(socket.SOL_SOCKET,
  443. _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
  444. return conn
  445. return self._register(ov, conn, finish_connect)
  446. def sendfile(self, sock, file, offset, count):
  447. self._register_with_iocp(sock)
  448. ov = _overlapped.Overlapped(NULL)
  449. offset_low = offset & 0xffff_ffff
  450. offset_high = (offset >> 32) & 0xffff_ffff
  451. ov.TransmitFile(sock.fileno(),
  452. msvcrt.get_osfhandle(file.fileno()),
  453. offset_low, offset_high,
  454. count, 0, 0)
  455. def finish_sendfile(trans, key, ov):
  456. try:
  457. return ov.getresult()
  458. except OSError as exc:
  459. if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
  460. _overlapped.ERROR_OPERATION_ABORTED):
  461. raise ConnectionResetError(*exc.args)
  462. else:
  463. raise
  464. return self._register(ov, sock, finish_sendfile)
  465. def accept_pipe(self, pipe):
  466. self._register_with_iocp(pipe)
  467. ov = _overlapped.Overlapped(NULL)
  468. connected = ov.ConnectNamedPipe(pipe.fileno())
  469. if connected:
  470. # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
  471. # that the pipe is connected. There is no need to wait for the
  472. # completion of the connection.
  473. return self._result(pipe)
  474. def finish_accept_pipe(trans, key, ov):
  475. ov.getresult()
  476. return pipe
  477. return self._register(ov, pipe, finish_accept_pipe)
  478. async def connect_pipe(self, address):
  479. delay = CONNECT_PIPE_INIT_DELAY
  480. while True:
  481. # Unfortunately there is no way to do an overlapped connect to
  482. # a pipe. Call CreateFile() in a loop until it doesn't fail with
  483. # ERROR_PIPE_BUSY.
  484. try:
  485. handle = _overlapped.ConnectPipe(address)
  486. break
  487. except OSError as exc:
  488. if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
  489. raise
  490. # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
  491. delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
  492. await tasks.sleep(delay, loop=self._loop)
  493. return windows_utils.PipeHandle(handle)
  494. def wait_for_handle(self, handle, timeout=None):
  495. """Wait for a handle.
  496. Return a Future object. The result of the future is True if the wait
  497. completed, or False if the wait did not complete (on timeout).
  498. """
  499. return self._wait_for_handle(handle, timeout, False)
  500. def _wait_cancel(self, event, done_callback):
  501. fut = self._wait_for_handle(event, None, True)
  502. # add_done_callback() cannot be used because the wait may only complete
  503. # in IocpProactor.close(), while the event loop is not running.
  504. fut._done_callback = done_callback
  505. return fut
  506. def _wait_for_handle(self, handle, timeout, _is_cancel):
  507. self._check_closed()
  508. if timeout is None:
  509. ms = _winapi.INFINITE
  510. else:
  511. # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
  512. # round away from zero to wait *at least* timeout seconds.
  513. ms = math.ceil(timeout * 1e3)
  514. # We only create ov so we can use ov.address as a key for the cache.
  515. ov = _overlapped.Overlapped(NULL)
  516. wait_handle = _overlapped.RegisterWaitWithQueue(
  517. handle, self._iocp, ov.address, ms)
  518. if _is_cancel:
  519. f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
  520. else:
  521. f = _WaitHandleFuture(ov, handle, wait_handle, self,
  522. loop=self._loop)
  523. if f._source_traceback:
  524. del f._source_traceback[-1]
  525. def finish_wait_for_handle(trans, key, ov):
  526. # Note that this second wait means that we should only use
  527. # this with handles types where a successful wait has no
  528. # effect. So events or processes are all right, but locks
  529. # or semaphores are not. Also note if the handle is
  530. # signalled and then quickly reset, then we may return
  531. # False even though we have not timed out.
  532. return f._poll()
  533. self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
  534. return f
  535. def _register_with_iocp(self, obj):
  536. # To get notifications of finished ops on this objects sent to the
  537. # completion port, were must register the handle.
  538. if obj not in self._registered:
  539. self._registered.add(obj)
  540. _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
  541. # XXX We could also use SetFileCompletionNotificationModes()
  542. # to avoid sending notifications to completion port of ops
  543. # that succeed immediately.
  544. def _register(self, ov, obj, callback):
  545. self._check_closed()
  546. # Return a future which will be set with the result of the
  547. # operation when it completes. The future's value is actually
  548. # the value returned by callback().
  549. f = _OverlappedFuture(ov, loop=self._loop)
  550. if f._source_traceback:
  551. del f._source_traceback[-1]
  552. if not ov.pending:
  553. # The operation has completed, so no need to postpone the
  554. # work. We cannot take this short cut if we need the
  555. # NumberOfBytes, CompletionKey values returned by
  556. # PostQueuedCompletionStatus().
  557. try:
  558. value = callback(None, None, ov)
  559. except OSError as e:
  560. f.set_exception(e)
  561. else:
  562. f.set_result(value)
  563. # Even if GetOverlappedResult() was called, we have to wait for the
  564. # notification of the completion in GetQueuedCompletionStatus().
  565. # Register the overlapped operation to keep a reference to the
  566. # OVERLAPPED object, otherwise the memory is freed and Windows may
  567. # read uninitialized memory.
  568. # Register the overlapped operation for later. Note that
  569. # we only store obj to prevent it from being garbage
  570. # collected too early.
  571. self._cache[ov.address] = (f, ov, obj, callback)
  572. return f
  573. def _unregister(self, ov):
  574. """Unregister an overlapped object.
  575. Call this method when its future has been cancelled. The event can
  576. already be signalled (pending in the proactor event queue). It is also
  577. safe if the event is never signalled (because it was cancelled).
  578. """
  579. self._check_closed()
  580. self._unregistered.append(ov)
  581. def _get_accept_socket(self, family):
  582. s = socket.socket(family)
  583. s.settimeout(0)
  584. return s
  585. def _poll(self, timeout=None):
  586. if timeout is None:
  587. ms = INFINITE
  588. elif timeout < 0:
  589. raise ValueError("negative timeout")
  590. else:
  591. # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
  592. # round away from zero to wait *at least* timeout seconds.
  593. ms = math.ceil(timeout * 1e3)
  594. if ms >= INFINITE:
  595. raise ValueError("timeout too big")
  596. while True:
  597. status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
  598. if status is None:
  599. break
  600. ms = 0
  601. err, transferred, key, address = status
  602. try:
  603. f, ov, obj, callback = self._cache.pop(address)
  604. except KeyError:
  605. if self._loop.get_debug():
  606. self._loop.call_exception_handler({
  607. 'message': ('GetQueuedCompletionStatus() returned an '
  608. 'unexpected event'),
  609. 'status': ('err=%s transferred=%s key=%#x address=%#x'
  610. % (err, transferred, key, address)),
  611. })
  612. # key is either zero, or it is used to return a pipe
  613. # handle which should be closed to avoid a leak.
  614. if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
  615. _winapi.CloseHandle(key)
  616. continue
  617. if obj in self._stopped_serving:
  618. f.cancel()
  619. # Don't call the callback if _register() already read the result or
  620. # if the overlapped has been cancelled
  621. elif not f.done():
  622. try:
  623. value = callback(transferred, key, ov)
  624. except OSError as e:
  625. f.set_exception(e)
  626. self._results.append(f)
  627. else:
  628. f.set_result(value)
  629. self._results.append(f)
  630. # Remove unregistered futures
  631. for ov in self._unregistered:
  632. self._cache.pop(ov.address, None)
  633. self._unregistered.clear()
  634. def _stop_serving(self, obj):
  635. # obj is a socket or pipe handle. It will be closed in
  636. # BaseProactorEventLoop._stop_serving() which will make any
  637. # pending operations fail quickly.
  638. self._stopped_serving.add(obj)
  639. def close(self):
  640. if self._iocp is None:
  641. # already closed
  642. return
  643. # Cancel remaining registered operations.
  644. for address, (fut, ov, obj, callback) in list(self._cache.items()):
  645. if fut.cancelled():
  646. # Nothing to do with cancelled futures
  647. pass
  648. elif isinstance(fut, _WaitCancelFuture):
  649. # _WaitCancelFuture must not be cancelled
  650. pass
  651. else:
  652. try:
  653. fut.cancel()
  654. except OSError as exc:
  655. if self._loop is not None:
  656. context = {
  657. 'message': 'Cancelling a future failed',
  658. 'exception': exc,
  659. 'future': fut,
  660. }
  661. if fut._source_traceback:
  662. context['source_traceback'] = fut._source_traceback
  663. self._loop.call_exception_handler(context)
  664. # Wait until all cancelled overlapped complete: don't exit with running
  665. # overlapped to prevent a crash. Display progress every second if the
  666. # loop is still running.
  667. msg_update = 1.0
  668. start_time = time.monotonic()
  669. next_msg = start_time + msg_update
  670. while self._cache:
  671. if next_msg <= time.monotonic():
  672. logger.debug('%r is running after closing for %.1f seconds',
  673. self, time.monotonic() - start_time)
  674. next_msg = time.monotonic() + msg_update
  675. # handle a few events, or timeout
  676. self._poll(msg_update)
  677. self._results = []
  678. _winapi.CloseHandle(self._iocp)
  679. self._iocp = None
  680. def __del__(self):
  681. self.close()
  682. class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  683. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  684. self._proc = windows_utils.Popen(
  685. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  686. bufsize=bufsize, **kwargs)
  687. def callback(f):
  688. returncode = self._proc.poll()
  689. self._process_exited(returncode)
  690. f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
  691. f.add_done_callback(callback)
  692. SelectorEventLoop = _WindowsSelectorEventLoop
  693. class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  694. _loop_factory = SelectorEventLoop
  695. class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  696. _loop_factory = ProactorEventLoop
  697. DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy