util.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. #
  2. # Module providing various facilities to other parts of the package
  3. #
  4. # multiprocessing/util.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. import os
  10. import itertools
  11. import sys
  12. import weakref
  13. import atexit
  14. import threading # we want threading to install it's
  15. # cleanup function before multiprocessing does
  16. from subprocess import _args_from_interpreter_flags
  17. from . import process
__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
    ]

#
# Logging
#

# Level values interleave with the standard logging levels: SUBDEBUG (5)
# is more verbose than DEBUG (10); SUBWARNING (25) sits above INFO (20).
NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

# Module-level logger; created lazily by get_logger().
_logger = None
# Set to True once log_to_stderr() has installed a stderr handler.
_log_to_stderr = False
  36. def sub_debug(msg, *args):
  37. if _logger:
  38. _logger.log(SUBDEBUG, msg, *args)
  39. def debug(msg, *args):
  40. if _logger:
  41. _logger.log(DEBUG, msg, *args)
  42. def info(msg, *args):
  43. if _logger:
  44. _logger.log(INFO, msg, *args)
  45. def sub_warning(msg, *args):
  46. if _logger:
  47. _logger.log(SUBWARNING, msg, *args)
def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger
    import logging

    # NOTE(review): uses logging's private module lock to serialize the
    # lazy creation against concurrent logging configuration.
    logging._acquireLock()
    try:
        if not _logger:

            _logger = logging.getLogger(LOGGER_NAME)
            # Don't forward records to the root logger's handlers.
            _logger.propagate = 0

            # XXX multiprocessing should cleanup before logging
            # Re-register _exit_function so it sits after logging's own
            # atexit handler — presumably so multiprocessing shuts down
            # before logging does; see the XXX above.
            if hasattr(atexit, 'unregister'):
                atexit.unregister(_exit_function)
                atexit.register(_exit_function)
            else:
                # Pre-3.x fallback: manipulate the private handler list.
                atexit._exithandlers.remove((_exit_function, (), {}))
                atexit._exithandlers.append((_exit_function, (), {}))
    finally:
        logging._releaseLock()

    return _logger
  69. def log_to_stderr(level=None):
  70. '''
  71. Turn on logging and add a handler which prints to stderr
  72. '''
  73. global _log_to_stderr
  74. import logging
  75. logger = get_logger()
  76. formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
  77. handler = logging.StreamHandler()
  78. handler.setFormatter(formatter)
  79. logger.addHandler(handler)
  80. if level:
  81. logger.setLevel(level)
  82. _log_to_stderr = True
  83. return _logger
  84. #
  85. # Function returning a temp directory which will be removed on exit
  86. #
  87. def _remove_temp_dir(rmtree, tempdir):
  88. rmtree(tempdir)
  89. current_process = process.current_process()
  90. # current_process() can be None if the finalizer is called
  91. # late during Python finalization
  92. if current_process is not None:
  93. current_process._config['tempdir'] = None
  94. def get_temp_dir():
  95. # get name of a temp directory which will be automatically cleaned up
  96. tempdir = process.current_process()._config.get('tempdir')
  97. if tempdir is None:
  98. import shutil, tempfile
  99. tempdir = tempfile.mkdtemp(prefix='pymp-')
  100. info('created temp directory %s', tempdir)
  101. # keep a strong reference to shutil.rmtree(), since the finalizer
  102. # can be called late during Python shutdown
  103. Finalize(None, _remove_temp_dir, args=(shutil.rmtree, tempdir),
  104. exitpriority=-100)
  105. process.current_process()._config['tempdir'] = tempdir
  106. return tempdir
#
# Support for reinitialization of objects when bootstrapping a child process
#

# Maps (creation index, id(obj), func) -> obj.  Values are held weakly,
# so an entry disappears once the registered object is garbage collected.
_afterfork_registry = weakref.WeakValueDictionary()
# Monotonic counter used as the first key element, giving callbacks a
# stable registration order.
_afterfork_counter = itertools.count()
  112. def _run_after_forkers():
  113. items = list(_afterfork_registry.items())
  114. items.sort()
  115. for (index, ident, func), obj in items:
  116. try:
  117. func(obj)
  118. except Exception as e:
  119. info('after forker raised exception %s', e)
  120. def register_after_fork(obj, func):
  121. _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj
#
# Finalization using weakrefs
#

# Maps (exitpriority, creation index) -> Finalize instance.
_finalizer_registry = {}
# Monotonic counter making each finalizer key unique and order-preserving.
_finalizer_counter = itertools.count()
  127. class Finalize(object):
  128. '''
  129. Class which supports object finalization using weakrefs
  130. '''
  131. def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
  132. if (exitpriority is not None) and not isinstance(exitpriority,int):
  133. raise TypeError(
  134. "Exitpriority ({0!r}) must be None or int, not {1!s}".format(
  135. exitpriority, type(exitpriority)))
  136. if obj is not None:
  137. self._weakref = weakref.ref(obj, self)
  138. elif exitpriority is None:
  139. raise ValueError("Without object, exitpriority cannot be None")
  140. self._callback = callback
  141. self._args = args
  142. self._kwargs = kwargs or {}
  143. self._key = (exitpriority, next(_finalizer_counter))
  144. self._pid = os.getpid()
  145. _finalizer_registry[self._key] = self
  146. def __call__(self, wr=None,
  147. # Need to bind these locally because the globals can have
  148. # been cleared at shutdown
  149. _finalizer_registry=_finalizer_registry,
  150. sub_debug=sub_debug, getpid=os.getpid):
  151. '''
  152. Run the callback unless it has already been called or cancelled
  153. '''
  154. try:
  155. del _finalizer_registry[self._key]
  156. except KeyError:
  157. sub_debug('finalizer no longer registered')
  158. else:
  159. if self._pid != getpid():
  160. sub_debug('finalizer ignored because different process')
  161. res = None
  162. else:
  163. sub_debug('finalizer calling %s with args %s and kwargs %s',
  164. self._callback, self._args, self._kwargs)
  165. res = self._callback(*self._args, **self._kwargs)
  166. self._weakref = self._callback = self._args = \
  167. self._kwargs = self._key = None
  168. return res
  169. def cancel(self):
  170. '''
  171. Cancel finalization of the object
  172. '''
  173. try:
  174. del _finalizer_registry[self._key]
  175. except KeyError:
  176. pass
  177. else:
  178. self._weakref = self._callback = self._args = \
  179. self._kwargs = self._key = None
  180. def still_active(self):
  181. '''
  182. Return whether this finalizer is still waiting to invoke callback
  183. '''
  184. return self._key in _finalizer_registry
  185. def __repr__(self):
  186. try:
  187. obj = self._weakref()
  188. except (AttributeError, TypeError):
  189. obj = None
  190. if obj is None:
  191. return '<%s object, dead>' % self.__class__.__name__
  192. x = '<%s object, callback=%s' % (
  193. self.__class__.__name__,
  194. getattr(self._callback, '__name__', self._callback))
  195. if self._args:
  196. x += ', args=' + str(self._args)
  197. if self._kwargs:
  198. x += ', kwargs=' + str(self._kwargs)
  199. if self._key[0] is not None:
  200. x += ', exitprority=' + str(self._key[0])
  201. return x + '>'
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if _finalizer_registry is None:
        # This function may be called after this module's globals are
        # destroyed.  See the _exit_function function in this module for more
        # notes.
        return

    # Predicate selecting which keys (exitpriority, index) to run.
    if minpriority is None:
        f = lambda p : p[0] is not None
    else:
        f = lambda p : p[0] is not None and p[0] >= minpriority

    # Careful: _finalizer_registry may be mutated while this function
    # is running (either by a GC run or by another thread).

    # list(_finalizer_registry) should be atomic, while
    # list(_finalizer_registry.items()) is not.
    keys = [key for key in list(_finalizer_registry) if f(key)]
    # Reverse sort runs highest priority first; within equal priority the
    # larger creation index (newest finalizer) runs first.
    keys.sort(reverse=True)

    for key in keys:
        finalizer = _finalizer_registry.get(key)
        # key may have been removed from the registry
        if finalizer is not None:
            sub_debug('calling %s', finalizer)
            try:
                finalizer()
            except Exception:
                # Report but keep going: remaining finalizers must still run.
                import traceback
                traceback.print_exc()

    if minpriority is None:
        # Full run: also discard finalizers with priority None.
        _finalizer_registry.clear()
  235. #
  236. # Clean up on exit
  237. #
  238. def is_exiting():
  239. '''
  240. Returns true if the process is shutting down
  241. '''
  242. return _exiting or _exiting is None
  243. _exiting = False
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=process.active_children,
                   current_process=process.current_process):
    # Run once at interpreter exit: fire high-priority finalizers,
    # terminate daemon children, join the rest, then run the remaining
    # finalizers.
    #
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    if not _exiting:
        _exiting = True

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')
        _run_finalizers(0)

        if current_process() is not None:
            # We check if the current process is None here because if
            # it's None, any call to ``active_children()`` will raise
            # an AttributeError (active_children winds up trying to
            # get attributes from util._current_process).  One
            # situation where this can happen is if someone has
            # manipulated sys.modules, causing this module to be
            # garbage collected.  The destructor for the module type
            # then replaces all values in the module dict with None.
            # For instance, after setuptools runs a test it replaces
            # sys.modules with a copy created earlier.  See issues
            # #9775 and #15881.  Also related: #4106, #9205, and
            # #9207.

            # Daemon children are never joined, so terminate them first.
            for p in active_children():
                if p.daemon:
                    info('calling terminate() for daemon %s', p.name)
                    p._popen.terminate()

            for p in active_children():
                info('calling join() for process %s', p.name)
                p.join()

        debug('running the remaining "atexit" finalizers')
        _run_finalizers()

atexit.register(_exit_function)
  279. #
  280. # Some fork aware types
  281. #
  282. class ForkAwareThreadLock(object):
  283. def __init__(self):
  284. self._reset()
  285. register_after_fork(self, ForkAwareThreadLock._reset)
  286. def _reset(self):
  287. self._lock = threading.Lock()
  288. self.acquire = self._lock.acquire
  289. self.release = self._lock.release
  290. def __enter__(self):
  291. return self._lock.__enter__()
  292. def __exit__(self, *args):
  293. return self._lock.__exit__(*args)
  294. class ForkAwareLocal(threading.local):
  295. def __init__(self):
  296. register_after_fork(self, lambda obj : obj.__dict__.clear())
  297. def __reduce__(self):
  298. return type(self), ()
#
# Close fds except those specified
#

try:
    # Per-process limit on the number of open file descriptors.
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    # sysconf may be unavailable or fail (e.g. non-POSIX platforms);
    # fall back to a conservative default.
    MAXFD = 256
  306. def close_all_fds_except(fds):
  307. fds = list(fds) + [-1, MAXFD]
  308. fds.sort()
  309. assert fds[-1] == MAXFD, 'fd too large'
  310. for i in range(len(fds) - 1):
  311. os.closerange(fds[i]+1, fds[i+1])
#
# Close sys.stdin and replace stdin with os.devnull
#

def _close_stdin():
    # Close the inherited stdin and rebind sys.stdin to a stream opened
    # on os.devnull.  All failures are swallowed: this is best-effort
    # detachment, not a hard requirement.
    if sys.stdin is None:
        return

    try:
        sys.stdin.close()
    except (OSError, ValueError):
        pass

    try:
        fd = os.open(os.devnull, os.O_RDONLY)
        try:
            # closefd=False: the raw fd is deliberately not owned by the
            # new file object — NOTE(review): presumably so closing
            # sys.stdin later cannot close a descriptor reused elsewhere;
            # confirm against upstream history.
            sys.stdin = open(fd, closefd=False)
        except:
            # open() failed: don't leak the descriptor before re-raising.
            os.close(fd)
            raise
    except (OSError, ValueError):
        pass
  331. #
  332. # Flush standard streams, if any
  333. #
  334. def _flush_std_streams():
  335. try:
  336. sys.stdout.flush()
  337. except (AttributeError, ValueError):
  338. pass
  339. try:
  340. sys.stderr.flush()
  341. except (AttributeError, ValueError):
  342. pass
#
# Start a program with only specified fds kept open
#

def spawnv_passfds(path, args, passfds):
    # Fork and exec *path* with argument list *args*, keeping only the
    # descriptors in *passfds* open in the child.  Returns the value of
    # _posixsubprocess.fork_exec — NOTE(review): a private CPython API
    # whose positional signature varies between versions; presumably the
    # child pid, confirm against the interpreter in use.
    import _posixsubprocess
    passfds = tuple(sorted(map(int, passfds)))
    # The pipe lets fork_exec report an exec failure back to the parent.
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [os.fsencode(path)], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, None)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)