context.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. import os
  2. import sys
  3. import threading
  4. from . import process
  5. from . import reduction
  6. __all__ = [] # things are copied from here to __init__.py
  7. #
  8. # Exceptions
  9. #
  10. class ProcessError(Exception):
  11. pass
  12. class BufferTooShort(ProcessError):
  13. pass
  14. class TimeoutError(ProcessError):
  15. pass
  16. class AuthenticationError(ProcessError):
  17. pass
  18. #
  19. # Base type for contexts
  20. #
  21. class BaseContext(object):
  22. ProcessError = ProcessError
  23. BufferTooShort = BufferTooShort
  24. TimeoutError = TimeoutError
  25. AuthenticationError = AuthenticationError
  26. current_process = staticmethod(process.current_process)
  27. active_children = staticmethod(process.active_children)
  28. def cpu_count(self):
  29. '''Returns the number of CPUs in the system'''
  30. num = os.cpu_count()
  31. if num is None:
  32. raise NotImplementedError('cannot determine number of cpus')
  33. else:
  34. return num
  35. def Manager(self):
  36. '''Returns a manager associated with a running server process
  37. The managers methods such as `Lock()`, `Condition()` and `Queue()`
  38. can be used to create shared objects.
  39. '''
  40. from .managers import SyncManager
  41. m = SyncManager(ctx=self.get_context())
  42. m.start()
  43. return m
  44. def Pipe(self, duplex=True):
  45. '''Returns two connection object connected by a pipe'''
  46. from .connection import Pipe
  47. return Pipe(duplex)
  48. def Lock(self):
  49. '''Returns a non-recursive lock object'''
  50. from .synchronize import Lock
  51. return Lock(ctx=self.get_context())
  52. def RLock(self):
  53. '''Returns a recursive lock object'''
  54. from .synchronize import RLock
  55. return RLock(ctx=self.get_context())
  56. def Condition(self, lock=None):
  57. '''Returns a condition object'''
  58. from .synchronize import Condition
  59. return Condition(lock, ctx=self.get_context())
  60. def Semaphore(self, value=1):
  61. '''Returns a semaphore object'''
  62. from .synchronize import Semaphore
  63. return Semaphore(value, ctx=self.get_context())
  64. def BoundedSemaphore(self, value=1):
  65. '''Returns a bounded semaphore object'''
  66. from .synchronize import BoundedSemaphore
  67. return BoundedSemaphore(value, ctx=self.get_context())
  68. def Event(self):
  69. '''Returns an event object'''
  70. from .synchronize import Event
  71. return Event(ctx=self.get_context())
  72. def Barrier(self, parties, action=None, timeout=None):
  73. '''Returns a barrier object'''
  74. from .synchronize import Barrier
  75. return Barrier(parties, action, timeout, ctx=self.get_context())
  76. def Queue(self, maxsize=0):
  77. '''Returns a queue object'''
  78. from .queues import Queue
  79. return Queue(maxsize, ctx=self.get_context())
  80. def JoinableQueue(self, maxsize=0):
  81. '''Returns a queue object'''
  82. from .queues import JoinableQueue
  83. return JoinableQueue(maxsize, ctx=self.get_context())
  84. def SimpleQueue(self):
  85. '''Returns a queue object'''
  86. from .queues import SimpleQueue
  87. return SimpleQueue(ctx=self.get_context())
  88. def Pool(self, processes=None, initializer=None, initargs=(),
  89. maxtasksperchild=None):
  90. '''Returns a process pool object'''
  91. from .pool import Pool
  92. return Pool(processes, initializer, initargs, maxtasksperchild,
  93. context=self.get_context())
  94. def RawValue(self, typecode_or_type, *args):
  95. '''Returns a shared object'''
  96. from .sharedctypes import RawValue
  97. return RawValue(typecode_or_type, *args)
  98. def RawArray(self, typecode_or_type, size_or_initializer):
  99. '''Returns a shared array'''
  100. from .sharedctypes import RawArray
  101. return RawArray(typecode_or_type, size_or_initializer)
  102. def Value(self, typecode_or_type, *args, lock=True):
  103. '''Returns a synchronized shared object'''
  104. from .sharedctypes import Value
  105. return Value(typecode_or_type, *args, lock=lock,
  106. ctx=self.get_context())
  107. def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
  108. '''Returns a synchronized shared array'''
  109. from .sharedctypes import Array
  110. return Array(typecode_or_type, size_or_initializer, lock=lock,
  111. ctx=self.get_context())
  112. def freeze_support(self):
  113. '''Check whether this is a fake forked process in a frozen executable.
  114. If so then run code specified by commandline and exit.
  115. '''
  116. if sys.platform == 'win32' and getattr(sys, 'frozen', False):
  117. from .spawn import freeze_support
  118. freeze_support()
  119. def get_logger(self):
  120. '''Return package logger -- if it does not already exist then
  121. it is created.
  122. '''
  123. from .util import get_logger
  124. return get_logger()
  125. def log_to_stderr(self, level=None):
  126. '''Turn on logging and add a handler which prints to stderr'''
  127. from .util import log_to_stderr
  128. return log_to_stderr(level)
  129. def allow_connection_pickling(self):
  130. '''Install support for sending connections and sockets
  131. between processes
  132. '''
  133. # This is undocumented. In previous versions of multiprocessing
  134. # its only effect was to make socket objects inheritable on Windows.
  135. from . import connection
  136. def set_executable(self, executable):
  137. '''Sets the path to a python.exe or pythonw.exe binary used to run
  138. child processes instead of sys.executable when using the 'spawn'
  139. start method. Useful for people embedding Python.
  140. '''
  141. from .spawn import set_executable
  142. set_executable(executable)
  143. def set_forkserver_preload(self, module_names):
  144. '''Set list of module names to try to load in forkserver process.
  145. This is really just a hint.
  146. '''
  147. from .forkserver import set_forkserver_preload
  148. set_forkserver_preload(module_names)
  149. def get_context(self, method=None):
  150. if method is None:
  151. return self
  152. try:
  153. ctx = _concrete_contexts[method]
  154. except KeyError:
  155. raise ValueError('cannot find context for %r' % method) from None
  156. ctx._check_available()
  157. return ctx
  158. def get_start_method(self, allow_none=False):
  159. return self._name
  160. def set_start_method(self, method, force=False):
  161. raise ValueError('cannot set start method of concrete context')
  162. @property
  163. def reducer(self):
  164. '''Controls how objects will be reduced to a form that can be
  165. shared with other processes.'''
  166. return globals().get('reduction')
  167. @reducer.setter
  168. def reducer(self, reduction):
  169. globals()['reduction'] = reduction
  170. def _check_available(self):
  171. pass
  172. #
  173. # Type of default context -- underlying context can be set at most once
  174. #
  175. class Process(process.BaseProcess):
  176. _start_method = None
  177. @staticmethod
  178. def _Popen(process_obj):
  179. return _default_context.get_context().Process._Popen(process_obj)
  180. class DefaultContext(BaseContext):
  181. Process = Process
  182. def __init__(self, context):
  183. self._default_context = context
  184. self._actual_context = None
  185. def get_context(self, method=None):
  186. if method is None:
  187. if self._actual_context is None:
  188. self._actual_context = self._default_context
  189. return self._actual_context
  190. else:
  191. return super().get_context(method)
  192. def set_start_method(self, method, force=False):
  193. if self._actual_context is not None and not force:
  194. raise RuntimeError('context has already been set')
  195. if method is None and force:
  196. self._actual_context = None
  197. return
  198. self._actual_context = self.get_context(method)
  199. def get_start_method(self, allow_none=False):
  200. if self._actual_context is None:
  201. if allow_none:
  202. return None
  203. self._actual_context = self._default_context
  204. return self._actual_context._name
  205. def get_all_start_methods(self):
  206. if sys.platform == 'win32':
  207. return ['spawn']
  208. else:
  209. if reduction.HAVE_SEND_HANDLE:
  210. return ['fork', 'spawn', 'forkserver']
  211. else:
  212. return ['fork', 'spawn']
  213. DefaultContext.__all__ = [x for x in dir(DefaultContext) if x[0] != '_']
  214. #
  215. # Context types for fixed start method
  216. #
  217. if sys.platform != 'win32':
  218. class ForkProcess(process.BaseProcess):
  219. _start_method = 'fork'
  220. @staticmethod
  221. def _Popen(process_obj):
  222. from .popen_fork import Popen
  223. return Popen(process_obj)
  224. class SpawnProcess(process.BaseProcess):
  225. _start_method = 'spawn'
  226. @staticmethod
  227. def _Popen(process_obj):
  228. from .popen_spawn_posix import Popen
  229. return Popen(process_obj)
  230. class ForkServerProcess(process.BaseProcess):
  231. _start_method = 'forkserver'
  232. @staticmethod
  233. def _Popen(process_obj):
  234. from .popen_forkserver import Popen
  235. return Popen(process_obj)
  236. class ForkContext(BaseContext):
  237. _name = 'fork'
  238. Process = ForkProcess
  239. class SpawnContext(BaseContext):
  240. _name = 'spawn'
  241. Process = SpawnProcess
  242. class ForkServerContext(BaseContext):
  243. _name = 'forkserver'
  244. Process = ForkServerProcess
  245. def _check_available(self):
  246. if not reduction.HAVE_SEND_HANDLE:
  247. raise ValueError('forkserver start method not available')
  248. _concrete_contexts = {
  249. 'fork': ForkContext(),
  250. 'spawn': SpawnContext(),
  251. 'forkserver': ForkServerContext(),
  252. }
  253. _default_context = DefaultContext(_concrete_contexts['fork'])
  254. else:
  255. class SpawnProcess(process.BaseProcess):
  256. _start_method = 'spawn'
  257. @staticmethod
  258. def _Popen(process_obj):
  259. from .popen_spawn_win32 import Popen
  260. return Popen(process_obj)
  261. class SpawnContext(BaseContext):
  262. _name = 'spawn'
  263. Process = SpawnProcess
  264. _concrete_contexts = {
  265. 'spawn': SpawnContext(),
  266. }
  267. _default_context = DefaultContext(_concrete_contexts['spawn'])
  268. #
  269. # Force the start method
  270. #
  271. def _force_start_method(method):
  272. _default_context._actual_context = _concrete_contexts[method]
  273. #
  274. # Check that the current thread is spawning a child process
  275. #
  276. _tls = threading.local()
  277. def get_spawning_popen():
  278. return getattr(_tls, 'spawning_popen', None)
  279. def set_spawning_popen(popen):
  280. _tls.spawning_popen = popen
  281. def assert_spawning(obj):
  282. if get_spawning_popen() is None:
  283. raise RuntimeError(
  284. '%s objects should only be shared between processes'
  285. ' through inheritance' % type(obj).__name__
  286. )