process.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. #
  2. # Module providing the `Process` class which emulates `threading.Thread`
  3. #
  4. # multiprocessing/process.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # Licensed to PSF under a Contributor Agreement.
  8. #
  9. __all__ = ['BaseProcess', 'current_process', 'active_children']
  10. #
  11. # Imports
  12. #
  13. import os
  14. import sys
  15. import signal
  16. import itertools
  17. import threading
  18. from _weakrefset import WeakSet
  19. #
  20. #
  21. #
  22. try:
  23. ORIGINAL_DIR = os.path.abspath(os.getcwd())
  24. except OSError:
  25. ORIGINAL_DIR = None
  26. #
  27. # Public functions
  28. #
  29. def current_process():
  30. '''
  31. Return process object representing the current process
  32. '''
  33. return _current_process
  34. def active_children():
  35. '''
  36. Return list of process objects corresponding to live child processes
  37. '''
  38. _cleanup()
  39. return list(_children)
  40. #
  41. #
  42. #
  43. def _cleanup():
  44. # check for processes which have finished
  45. for p in list(_children):
  46. if p._popen.poll() is not None:
  47. _children.discard(p)
  48. #
  49. # The `Process` class
  50. #
  51. class BaseProcess(object):
  52. '''
  53. Process objects represent activity that is run in a separate process
  54. The class is analogous to `threading.Thread`
  55. '''
  56. def _Popen(self):
  57. raise NotImplementedError
  58. def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
  59. *, daemon=None):
  60. assert group is None, 'group argument must be None for now'
  61. count = next(_process_counter)
  62. self._identity = _current_process._identity + (count,)
  63. self._config = _current_process._config.copy()
  64. self._parent_pid = os.getpid()
  65. self._popen = None
  66. self._closed = False
  67. self._target = target
  68. self._args = tuple(args)
  69. self._kwargs = dict(kwargs)
  70. self._name = name or type(self).__name__ + '-' + \
  71. ':'.join(str(i) for i in self._identity)
  72. if daemon is not None:
  73. self.daemon = daemon
  74. _dangling.add(self)
  75. def _check_closed(self):
  76. if self._closed:
  77. raise ValueError("process object is closed")
  78. def run(self):
  79. '''
  80. Method to be run in sub-process; can be overridden in sub-class
  81. '''
  82. if self._target:
  83. self._target(*self._args, **self._kwargs)
  84. def start(self):
  85. '''
  86. Start child process
  87. '''
  88. self._check_closed()
  89. assert self._popen is None, 'cannot start a process twice'
  90. assert self._parent_pid == os.getpid(), \
  91. 'can only start a process object created by current process'
  92. assert not _current_process._config.get('daemon'), \
  93. 'daemonic processes are not allowed to have children'
  94. _cleanup()
  95. self._popen = self._Popen(self)
  96. self._sentinel = self._popen.sentinel
  97. # Avoid a refcycle if the target function holds an indirect
  98. # reference to the process object (see bpo-30775)
  99. del self._target, self._args, self._kwargs
  100. _children.add(self)
  101. def terminate(self):
  102. '''
  103. Terminate process; sends SIGTERM signal or uses TerminateProcess()
  104. '''
  105. self._check_closed()
  106. self._popen.terminate()
  107. def kill(self):
  108. '''
  109. Terminate process; sends SIGKILL signal or uses TerminateProcess()
  110. '''
  111. self._check_closed()
  112. self._popen.kill()
  113. def join(self, timeout=None):
  114. '''
  115. Wait until child process terminates
  116. '''
  117. self._check_closed()
  118. assert self._parent_pid == os.getpid(), 'can only join a child process'
  119. assert self._popen is not None, 'can only join a started process'
  120. res = self._popen.wait(timeout)
  121. if res is not None:
  122. _children.discard(self)
  123. def is_alive(self):
  124. '''
  125. Return whether process is alive
  126. '''
  127. self._check_closed()
  128. if self is _current_process:
  129. return True
  130. assert self._parent_pid == os.getpid(), 'can only test a child process'
  131. if self._popen is None:
  132. return False
  133. returncode = self._popen.poll()
  134. if returncode is None:
  135. return True
  136. else:
  137. _children.discard(self)
  138. return False
  139. def close(self):
  140. '''
  141. Close the Process object.
  142. This method releases resources held by the Process object. It is
  143. an error to call this method if the child process is still running.
  144. '''
  145. if self._popen is not None:
  146. if self._popen.poll() is None:
  147. raise ValueError("Cannot close a process while it is still running. "
  148. "You should first call join() or terminate().")
  149. self._popen.close()
  150. self._popen = None
  151. del self._sentinel
  152. _children.discard(self)
  153. self._closed = True
  154. @property
  155. def name(self):
  156. return self._name
  157. @name.setter
  158. def name(self, name):
  159. assert isinstance(name, str), 'name must be a string'
  160. self._name = name
  161. @property
  162. def daemon(self):
  163. '''
  164. Return whether process is a daemon
  165. '''
  166. return self._config.get('daemon', False)
  167. @daemon.setter
  168. def daemon(self, daemonic):
  169. '''
  170. Set whether process is a daemon
  171. '''
  172. assert self._popen is None, 'process has already started'
  173. self._config['daemon'] = daemonic
  174. @property
  175. def authkey(self):
  176. return self._config['authkey']
  177. @authkey.setter
  178. def authkey(self, authkey):
  179. '''
  180. Set authorization key of process
  181. '''
  182. self._config['authkey'] = AuthenticationString(authkey)
  183. @property
  184. def exitcode(self):
  185. '''
  186. Return exit code of process or `None` if it has yet to stop
  187. '''
  188. self._check_closed()
  189. if self._popen is None:
  190. return self._popen
  191. return self._popen.poll()
  192. @property
  193. def ident(self):
  194. '''
  195. Return identifier (PID) of process or `None` if it has yet to start
  196. '''
  197. self._check_closed()
  198. if self is _current_process:
  199. return os.getpid()
  200. else:
  201. return self._popen and self._popen.pid
  202. pid = ident
  203. @property
  204. def sentinel(self):
  205. '''
  206. Return a file descriptor (Unix) or handle (Windows) suitable for
  207. waiting for process termination.
  208. '''
  209. self._check_closed()
  210. try:
  211. return self._sentinel
  212. except AttributeError:
  213. raise ValueError("process not started") from None
  214. def __repr__(self):
  215. if self is _current_process:
  216. status = 'started'
  217. elif self._closed:
  218. status = 'closed'
  219. elif self._parent_pid != os.getpid():
  220. status = 'unknown'
  221. elif self._popen is None:
  222. status = 'initial'
  223. else:
  224. if self._popen.poll() is not None:
  225. status = self.exitcode
  226. else:
  227. status = 'started'
  228. if type(status) is int:
  229. if status == 0:
  230. status = 'stopped'
  231. else:
  232. status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
  233. return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
  234. status, self.daemon and ' daemon' or '')
  235. ##
  236. def _bootstrap(self):
  237. from . import util, context
  238. global _current_process, _process_counter, _children
  239. try:
  240. if self._start_method is not None:
  241. context._force_start_method(self._start_method)
  242. _process_counter = itertools.count(1)
  243. _children = set()
  244. util._close_stdin()
  245. old_process = _current_process
  246. _current_process = self
  247. try:
  248. util._finalizer_registry.clear()
  249. util._run_after_forkers()
  250. finally:
  251. # delay finalization of the old process object until after
  252. # _run_after_forkers() is executed
  253. del old_process
  254. util.info('child process calling self.run()')
  255. try:
  256. self.run()
  257. exitcode = 0
  258. finally:
  259. util._exit_function()
  260. except SystemExit as e:
  261. if not e.args:
  262. exitcode = 1
  263. elif isinstance(e.args[0], int):
  264. exitcode = e.args[0]
  265. else:
  266. sys.stderr.write(str(e.args[0]) + '\n')
  267. exitcode = 1
  268. except:
  269. exitcode = 1
  270. import traceback
  271. sys.stderr.write('Process %s:\n' % self.name)
  272. traceback.print_exc()
  273. finally:
  274. threading._shutdown()
  275. util.info('process exiting with exitcode %d' % exitcode)
  276. util._flush_std_streams()
  277. return exitcode
  278. #
  279. # We subclass bytes to avoid accidental transmission of auth keys over network
  280. #
  281. class AuthenticationString(bytes):
  282. def __reduce__(self):
  283. from .context import get_spawning_popen
  284. if get_spawning_popen() is None:
  285. raise TypeError(
  286. 'Pickling an AuthenticationString object is '
  287. 'disallowed for security reasons'
  288. )
  289. return AuthenticationString, (bytes(self),)
  290. #
  291. # Create object representing the main process
  292. #
  293. class _MainProcess(BaseProcess):
  294. def __init__(self):
  295. self._identity = ()
  296. self._name = 'MainProcess'
  297. self._parent_pid = None
  298. self._popen = None
  299. self._closed = False
  300. self._config = {'authkey': AuthenticationString(os.urandom(32)),
  301. 'semprefix': '/mp'}
  302. # Note that some versions of FreeBSD only allow named
  303. # semaphores to have names of up to 14 characters. Therefore
  304. # we choose a short prefix.
  305. #
  306. # On MacOSX in a sandbox it may be necessary to use a
  307. # different prefix -- see #19478.
  308. #
  309. # Everything in self._config will be inherited by descendant
  310. # processes.
  311. def close(self):
  312. pass
  313. _current_process = _MainProcess()
  314. _process_counter = itertools.count(1)
  315. _children = set()
  316. del _MainProcess
  317. #
  318. # Give names to some return codes
  319. #
  320. _exitcode_to_name = {}
  321. for name, signum in list(signal.__dict__.items()):
  322. if name[:3]=='SIG' and '_' not in name:
  323. _exitcode_to_name[-signum] = name
  324. # For debug and leak testing
  325. _dangling = WeakSet()