heap.py

#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):

        if sys.platform == 'linux':
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-' % os.getpid(),
                    dir=self._choose_dir(size))
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                st = os.statvfs(d)
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()

    def reduce_arena(a):
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)
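
    # Added commentary (not in the original source): under the spawn and
    # forkserver start methods, pickling an Arena goes through reduce_arena,
    # which sends (size, reduction.DupFd(fd)); rebuild_arena then re-mmaps the
    # duplicated descriptor in the receiving process, so both processes end up
    # sharing the same unlinked temporary file.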

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []                  # sorted lengths of free blocks
        self._len_to_seq = {}               # length -> list of free blocks of that length
        self._start_to_block = {}           # (arena, start) -> free block starting there
        self._stop_to_block = {}            # (arena, stop) -> free block ending there
        self._allocated_blocks = set()      # blocks returned by malloc() and not yet freed
        self._arenas = []                   # Arena objects owned by this heap
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
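        # e.g. with alignment 8: mask == 0b111, so _roundup(13, 8) == 16
        # and _roundup(16, 8) == 16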
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending to and popping from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(), self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()     # reinitialize after fork
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
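
# A minimal usage sketch (illustrative only, not part of the original module):
# Heap.malloc() hands out (arena, start, stop) triples carved out of an
# arena's mmap, e.g.
#
#     heap = Heap()
#     arena, start, stop = heap.malloc(100)    # stop - start == 104 (rounded up to 8)
#     arena.buffer[start:stop] = bytes(stop - start)
#     heap.free((arena, start, stop))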

#
# Class representing a chunk of an mmap -- can be inherited by child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
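

if __name__ == '__main__':
    # Illustrative usage sketch, not part of the original module (runnable as
    # ``python -m multiprocessing.heap`` since the relative imports above need
    # the package context).  BufferWrapper carves a block out of the shared
    # heap and exposes it as a writable memoryview of exactly the requested size.
    wrapper = BufferWrapper(16)
    view = wrapper.create_memoryview()
    view[:5] = b'hello'
    print(bytes(view[:5]), len(view))   # b'hello' 16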