| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- # Copyright 2009 Brian Quinlan. All Rights Reserved.
- # Licensed to PSF under a Contributor Agreement.
- """Implements ThreadPoolExecutor."""
- __author__ = 'Brian Quinlan (brian@sweetapp.com)'
- import atexit
- from concurrent.futures import _base
- import itertools
- import queue
- import threading
- import weakref
- import os
- # Workers are created as daemon threads. This is done to allow the interpreter
- # to exit when there are still idle threads in a ThreadPoolExecutor's thread
- # pool (i.e. shutdown() was not called). However, allowing workers to die with
- # the interpreter has two undesirable properties:
- # - The workers would still be running during interpreter shutdown,
- # meaning that they would fail in unpredictable ways.
- # - The workers could be killed while evaluating a work item, which could
- # be bad if the callable being evaluated has external side-effects e.g.
- # writing to a file.
- #
- # To work around this problem, an exit handler is installed which tells the
- # workers to exit when their work queues are empty and then waits until the
- # threads finish.
# Weak mapping of worker thread -> the work queue that thread consumes.
# Entries are added when an executor starts a thread; being weakly keyed,
# an entry disappears once its thread object is no longer referenced.
_threads_queues = weakref.WeakKeyDictionary()

# Flipped to True by the exit handler below; workers and submit() read it.
_shutdown = False


def _python_exit():
    """Exit handler: ask every registered worker to stop, then wait for it.

    Sets the module-wide ``_shutdown`` flag, pushes one ``None`` wake-up
    sentinel onto each registered work queue, and finally joins each
    registered thread.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak mapping so it cannot change size while we iterate.
    registered = list(_threads_queues.items())
    # Wake all workers before joining any of them, so that the joins do not
    # wait on threads that were never told to stop.
    for _, work_queue in registered:
        work_queue.put(None)
    for thread, _ in registered:
        thread.join()


atexit.register(_python_exit)
class _WorkItem(object):
    """One queued call: a callable, its arguments, and the future to resolve.

    Attributes are plain references to the constructor arguments:
    ``future``, ``fn``, ``args`` and ``kwargs``.
    """

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Invoke the callable and store its outcome on the future.

        Nothing is executed when ``set_running_or_notify_cancel()`` returns
        a false value. Otherwise the return value is passed to
        ``set_result``; anything raised (any ``BaseException``) is passed to
        ``set_exception`` instead of propagating.
        """
        if self.future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except BaseException as error:
                self.future.set_exception(error)
                # Break a reference cycle with the exception 'error'
                self = None
            else:
                self.future.set_result(outcome)
def _worker(executor_reference, work_queue, initializer, initargs):
    """Thread target: run the initializer, then process queued work items.

    Args:
        executor_reference: Zero-argument callable returning the owning
            executor, or None when that executor no longer exists.
        work_queue: Queue yielding work items; a ``None`` entry is a wake-up
            sentinel rather than work.
        initializer: Optional callable run once before any work item.
        initargs: Positional arguments for ``initializer``.

    If the initializer raises, the failure is logged, the executor (when it
    still exists) is told through ``_initializer_failed()`` and the thread
    ends without processing any work. Any exception escaping the main loop
    is logged rather than propagated.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            owner = executor_reference()
            if owner is not None:
                owner._initializer_failed()
            return

    try:
        while True:
            task = work_queue.get(block=True)
            if task is None:
                # Woken by a sentinel: decide whether this thread must stop.
                owner = executor_reference()
                # Stop when the interpreter is shutting down, the owning
                # executor has been collected, or it has been shut down.
                if _shutdown or owner is None or owner._shutdown:
                    # Flag the executor as shutting down as early as
                    # possible if it is not gc-ed yet.
                    if owner is not None:
                        owner._shutdown = True
                    # Re-post the sentinel to notify the other workers.
                    work_queue.put(None)
                    return
                # Do not keep the executor alive while blocked on the queue.
                del owner
            else:
                task.run()
                # Delete references to object. See issue16284
                del task
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
class BrokenThreadPool(_base.BrokenExecutor):
    """Error reported when a ThreadPoolExecutor worker thread could not be
    initialized, leaving the pool unusable.
    """
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables on a pool of daemon threads.

    Threads are started lazily, one per ``submit()`` call, until
    ``max_workers`` threads exist. All of them consume the same work queue;
    a ``None`` entry on that queue is a wake-up sentinel, not work.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. Defaults to five times the CPU
                count (or 5 when the CPU count is unavailable).
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is not greater than 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._threads = set()
        # False while healthy; replaced by an error message string once a
        # worker initializer has failed.
        self._broken = False
        self._shutdown = False
        # Serializes submit(), shutdown() and _initializer_failed().
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    def submit(*args, **kwargs):
        # 'self' and 'fn' are taken from the positional arguments so that
        # the submitted callable may itself receive keyword arguments with
        # those names. 'fn' is also accepted as a keyword argument.
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless max_workers already exist."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference, so that it does not
            # keep this executor alive.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Register the thread so the module exit handler can stop it.
            _threads_queues[t] = self._work_queue

    def _iter_pending_work_items(self):
        """Yield each work item still queued, removing it from the queue.

        ``None`` wake-up sentinels are removed and skipped. Iteration ends
        as soon as the queue is found empty.
        """
        while True:
            try:
                work_item = self._work_queue.get_nowait()
            except queue.Empty:
                return
            if work_item is not None:
                yield work_item

    def _initializer_failed(self):
        """Mark the pool as broken and fail every work item still queued."""
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            for work_item in self._iter_pending_work_items():
                work_item.future.set_exception(BrokenThreadPool(self._broken))

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop accepting new work and tell the worker threads to exit.

        Args:
            wait: If True, do not return until every worker thread started
                by this executor has finished.
            cancel_futures: If True, remove every work item that is still
                waiting in the queue and cancel its future instead of
                letting a worker run it. Items already taken from the queue
                by a worker are not affected. Keyword-only; the default
                (False) leaves queued work in place.
        """
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                for work_item in self._iter_pending_work_items():
                    # The item will never reach a worker, so the step a
                    # worker would perform is done here: it moves the
                    # cancelled future to its notified state and wakes
                    # callers blocked in wait() / as_completed().
                    if work_item.future.cancel():
                        work_item.future.set_running_or_notify_cancel()
            # Wake-up sentinel: a worker blocked on the queue re-checks the
            # shutdown flag when it receives it.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
|