#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import threading
import queue
import itertools
import collections
import os
import time
import traceback

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return list(map(*args))

def starmapstar(args):
    return list(itertools.starmap(args[0], args[1]))
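
# Illustrative sketch (not part of the original module): `mapstar` and
# `starmapstar` each unpack one `(func, chunk)` batch as produced by
# `Pool._get_tasks` below; `add` is a hypothetical two-argument function.
#
#   mapstar((abs, (-1, -2, -3)))           # -> [1, 2, 3]
#   starmapstar((add, [(1, 2), (3, 4)]))   # -> [3, 7]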

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    exc.__cause__ = RemoteTraceback(tb)
    return exc
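
# Illustrative sketch (not part of the original module): pickling an
# ExceptionWithTraceback invokes __reduce__, so unpickling calls
# rebuild_exc() and the formatted remote traceback resurfaces as the
# exception's __cause__; `err` is a hypothetical caught exception.
#
#   import pickle
#   wrapped = ExceptionWithTraceback(err, err.__traceback__)
#   exc = pickle.loads(pickle.dumps(wrapped))
#   assert isinstance(exc.__cause__, RemoteTraceback)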

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex
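
# Illustrative sketch (not part of the original module): every task a worker
# pulls off `inqueue` is a `(job, i, func, args, kwds)` tuple, and every
# reply it puts on `outqueue` is `(job, i, (success, value))`, e.g.:
#
#   task   = (0, 3, abs, (-5,), {})
#   result = (0, 3, (True, 5))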

#
# Class representing a process pool
#

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True

    def Process(self, *args, **kwds):
        return self._ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                util.debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild,
                                   self._wrap_exception)
                             )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        Pool must be running.
        '''
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()

    def starmap(self, func, iterable, chunksize=None):
        '''
        Like `map()` method but the elements of the `iterable` are expected
        to be iterables as well and will be unpacked as arguments. Hence
        `func` and (a, b) becomes func(a, b).
        '''
        return self._map_async(func, iterable, starmapstar, chunksize).get()

    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None):
        '''
        Asynchronous version of `starmap()` method.
        '''
        return self._map_async(func, iterable, starmapstar, chunksize,
                               callback, error_callback)
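
    # Illustrative sketch (not part of the original module): `map` passes one
    # argument per call, while `starmap` unpacks each element; `add` is a
    # hypothetical two-argument function.
    #
    #   pool.map(abs, [-1, -2, -3])          # -> [1, 2, 3]
    #   pool.starmap(add, [(1, 2), (3, 4)])  # -> [3, 7]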

    def _guarded_task_generation(self, result_job, func, iterable):
        '''Provides a generator of tasks for imap and imap_unordered with
        appropriate handling for iterables which throw exceptions during
        iteration.'''
        try:
            i = -1
            for i, x in enumerate(iterable):
                yield (result_job, i, func, (x,), {})
        except Exception as e:
            yield (result_job, i+1, _helper_reraises_exception, (e,), {})

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0:n}".format(
                        chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job, func, iterable),
                    result._set_length
                ))
            return result
        else:
            if chunksize < 1:
                raise ValueError(
                    "Chunksize must be 1+, not {0!r}".format(chunksize))
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put(
                (
                    self._guarded_task_generation(result._job,
                                                  mapstar,
                                                  task_batches),
                    result._set_length
                ))
            return (item for chunk in result for item in chunk)
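
    # Illustrative sketch (not part of the original module): unlike `map`,
    # `imap` yields results lazily and in submission order; `fetch`, `urls`
    # and `process` are hypothetical.
    #
    #   for page in pool.imap(fetch, urls, chunksize=4):
    #       process(page)   # handled as soon as its chunk completes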

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        '''
        Asynchronous version of `apply()` method.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        result = ApplyResult(self._cache, callback, error_callback)
        self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None):
        '''
        Asynchronous version of `map()` method.
        '''
        return self._map_async(func, iterable, mapstar, chunksize, callback,
                               error_callback)

    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
                   error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put(
            (
                self._guarded_task_generation(result._job,
                                              mapper,
                                              task_batches),
                None
            )
        )
        return result
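
    # Illustrative worked example (not part of the original module): with
    # 4 workers and len(iterable) == 100, divmod(100, 4 * 4) gives (6, 4);
    # the nonzero remainder bumps chunksize to 7, so the work ships in
    # batches of up to 7 items each.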

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        util.debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            try:
                # iterating taskseq cannot fail
                for task in taskseq:
                    if thread._state:
                        util.debug('task handler found thread._state != RUN')
                        break
                    try:
                        put(task)
                    except Exception as e:
                        job, idx = task[:2]
                        try:
                            cache[job]._set(idx, (False, e))
                        except KeyError:
                            pass
                else:
                    if set_length:
                        util.debug('doing set_length()')
                        idx = task[1] if task else -1
                        set_length(idx + 1)
                    continue
                break
            finally:
                task = taskseq = job = None
        else:
            util.debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            util.debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            util.debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except OSError:
            util.debug('task handler got OSError when sending sentinels')

        util.debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                util.debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                util.debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (OSError, EOFError):
                util.debug('result handler got EOFError/OSError -- exiting')
                return

            if task is None:
                util.debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass
            task = job = obj = None

        if hasattr(outqueue, '_reader'):
            util.debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (OSError, EOFError):
                pass

        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
                   len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
            'pool objects cannot be passed between processes or pickled'
            )

    def close(self):
        util.debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE

    def terminate(self):
        util.debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        util.debug('joining pool')
        if self._state == RUN:
            raise ValueError("Pool is still running")
        elif self._state not in (CLOSE, TERMINATE):
            raise ValueError("In unknown state")
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        util.debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        util.debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        util.debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        if (not result_handler.is_alive()) and (len(cache) != 0):
            raise AssertionError(
                "Cannot have cache with result_handler not alive")

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        util.debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join()

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            util.debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        util.debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join()

        util.debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join()

        if pool and hasattr(pool[0], 'terminate'):
            util.debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    util.debug('cleaning up worker %d' % p.pid)
                    p.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
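
# Illustrative sketch (not part of the original module): typical use of Pool
# as a context manager; `square` is a hypothetical module-level function (it
# must be picklable, so it cannot be a lambda or a nested function).
#
#   from multiprocessing import Pool
#
#   def square(x):
#       return x * x
#
#   if __name__ == '__main__':
#       with Pool(processes=4) as pool:
#           print(pool.map(square, range(10)))
#       # __exit__ calls terminate(); call close() and join() instead if
#       # outstanding work should be allowed to finish first.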

#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        self._error_callback = error_callback
        cache[self._job] = self

    def ready(self):
        return self._event.is_set()

    def successful(self):
        if not self.ready():
            raise ValueError("{0!r} not ready".format(self))
        return self._success

    def wait(self, timeout=None):
        self._event.wait(timeout)

    def get(self, timeout=None):
        self.wait(timeout)
        if not self.ready():
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805
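
# Illustrative sketch (not part of the original module): `apply_async`
# returns an ApplyResult immediately; `get` blocks until the worker replies
# and re-raises any worker exception locally. `square` is hypothetical.
#
#   res = pool.apply_async(square, (7,))
#   res.ready()          # False until a result arrives
#   res.get(timeout=10)  # 49, or TimeoutError after 10 seconds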

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback, error_callback):
        ApplyResult.__init__(self, cache, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._event.set()
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration from None
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]

    def _set_length(self, length):
        with self._cond:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]

#
#
#

class ThreadPool(Pool):
    _wrap_exception = False

    @staticmethod
    def Process(*args, **kwds):
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)
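
# Illustrative sketch (not part of the original module): ThreadPool exposes
# the same API as Pool but runs tasks in threads, which suits I/O-bound
# work; `fetch` and `urls` are hypothetical.
#
#   from multiprocessing.pool import ThreadPool
#
#   with ThreadPool(8) as tpool:
#       pages = tpool.map(fetch, urls)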