~pythonregexp2.7/python/issue2636-09-01+10

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/heap.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-22 21:39:45 UTC
  • mfrom: (39055.1.33 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080922213945-23717m5eiqpamcyn
Merged in changes from the Single-Loop Engine branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Module which supports allocation of memory from an mmap
 
3
#
 
4
# multiprocessing/heap.py
 
5
#
 
6
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
 
7
#
 
8
 
 
9
import bisect
 
10
import mmap
 
11
import tempfile
 
12
import os
 
13
import sys
 
14
import threading
 
15
import itertools
 
16
 
 
17
import _multiprocessing
 
18
from multiprocessing.util import Finalize, info
 
19
from multiprocessing.forking import assert_spawning
 
20
 
 
21
__all__ = ['BufferWrapper']
 
22
 
 
23
#
 
24
# Inheirtable class which wraps an mmap, and from which blocks can be allocated
 
25
#
 
26
 
 
27
if sys.platform == 'win32':
 
28
 
 
29
    from ._multiprocessing import win32
 
30
 
 
31
    class Arena(object):
 
32
 
 
33
        _counter = itertools.count()
 
34
 
 
35
        def __init__(self, size):
 
36
            self.size = size
 
37
            self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
 
38
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
 
39
            assert win32.GetLastError() == 0, 'tagname already in use'
 
40
            self._state = (self.size, self.name)
 
41
 
 
42
        def __getstate__(self):
 
43
            assert_spawning(self)
 
44
            return self._state
 
45
 
 
46
        def __setstate__(self, state):
 
47
            self.size, self.name = self._state = state
 
48
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
 
49
            assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
 
50
 
 
51
else:
 
52
 
 
53
    class Arena(object):
 
54
 
 
55
        def __init__(self, size):
 
56
            self.buffer = mmap.mmap(-1, size)
 
57
            self.size = size
 
58
            self.name = None
 
59
 
 
60
#
 
61
# Class allowing allocation of chunks of memory from arenas
 
62
#
 
63
 
 
64
class Heap(object):
 
65
 
 
66
    _alignment = 8
 
67
 
 
68
    def __init__(self, size=mmap.PAGESIZE):
 
69
        self._lastpid = os.getpid()
 
70
        self._lock = threading.Lock()
 
71
        self._size = size
 
72
        self._lengths = []
 
73
        self._len_to_seq = {}
 
74
        self._start_to_block = {}
 
75
        self._stop_to_block = {}
 
76
        self._allocated_blocks = set()
 
77
        self._arenas = []
 
78
 
 
79
    @staticmethod
 
80
    def _roundup(n, alignment):
 
81
        # alignment must be a power of 2
 
82
        mask = alignment - 1
 
83
        return (n + mask) & ~mask
 
84
 
 
85
    def _malloc(self, size):
 
86
        # returns a large enough block -- it might be much larger
 
87
        i = bisect.bisect_left(self._lengths, size)
 
88
        if i == len(self._lengths):
 
89
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
 
90
            self._size *= 2
 
91
            info('allocating a new mmap of length %d', length)
 
92
            arena = Arena(length)
 
93
            self._arenas.append(arena)
 
94
            return (arena, 0, length)
 
95
        else:
 
96
            length = self._lengths[i]
 
97
            seq = self._len_to_seq[length]
 
98
            block = seq.pop()
 
99
            if not seq:
 
100
                del self._len_to_seq[length], self._lengths[i]
 
101
 
 
102
        (arena, start, stop) = block
 
103
        del self._start_to_block[(arena, start)]
 
104
        del self._stop_to_block[(arena, stop)]
 
105
        return block
 
106
 
 
107
    def _free(self, block):
 
108
        # free location and try to merge with neighbours
 
109
        (arena, start, stop) = block
 
110
 
 
111
        try:
 
112
            prev_block = self._stop_to_block[(arena, start)]
 
113
        except KeyError:
 
114
            pass
 
115
        else:
 
116
            start, _ = self._absorb(prev_block)
 
117
 
 
118
        try:
 
119
            next_block = self._start_to_block[(arena, stop)]
 
120
        except KeyError:
 
121
            pass
 
122
        else:
 
123
            _, stop = self._absorb(next_block)
 
124
 
 
125
        block = (arena, start, stop)
 
126
        length = stop - start
 
127
 
 
128
        try:
 
129
            self._len_to_seq[length].append(block)
 
130
        except KeyError:
 
131
            self._len_to_seq[length] = [block]
 
132
            bisect.insort(self._lengths, length)
 
133
 
 
134
        self._start_to_block[(arena, start)] = block
 
135
        self._stop_to_block[(arena, stop)] = block
 
136
 
 
137
    def _absorb(self, block):
 
138
        # deregister this block so it can be merged with a neighbour
 
139
        (arena, start, stop) = block
 
140
        del self._start_to_block[(arena, start)]
 
141
        del self._stop_to_block[(arena, stop)]
 
142
 
 
143
        length = stop - start
 
144
        seq = self._len_to_seq[length]
 
145
        seq.remove(block)
 
146
        if not seq:
 
147
            del self._len_to_seq[length]
 
148
            self._lengths.remove(length)
 
149
 
 
150
        return start, stop
 
151
 
 
152
    def free(self, block):
 
153
        # free a block returned by malloc()
 
154
        assert os.getpid() == self._lastpid
 
155
        self._lock.acquire()
 
156
        try:
 
157
            self._allocated_blocks.remove(block)
 
158
            self._free(block)
 
159
        finally:
 
160
            self._lock.release()
 
161
 
 
162
    def malloc(self, size):
 
163
        # return a block of right size (possibly rounded up)
 
164
        assert 0 <= size < sys.maxint
 
165
        if os.getpid() != self._lastpid:
 
166
            self.__init__()                     # reinitialize after fork
 
167
        self._lock.acquire()
 
168
        try:
 
169
            size = self._roundup(max(size,1), self._alignment)
 
170
            (arena, start, stop) = self._malloc(size)
 
171
            new_stop = start + size
 
172
            if new_stop < stop:
 
173
                self._free((arena, new_stop, stop))
 
174
            block = (arena, start, new_stop)
 
175
            self._allocated_blocks.add(block)
 
176
            return block
 
177
        finally:
 
178
            self._lock.release()
 
179
 
 
180
#
 
181
# Class representing a chunk of an mmap -- can be inherited
 
182
#
 
183
 
 
184
class BufferWrapper(object):
 
185
 
 
186
    _heap = Heap()
 
187
 
 
188
    def __init__(self, size):
 
189
        assert 0 <= size < sys.maxint
 
190
        block = BufferWrapper._heap.malloc(size)
 
191
        self._state = (block, size)
 
192
        Finalize(self, BufferWrapper._heap.free, args=(block,))
 
193
 
 
194
    def get_address(self):
 
195
        (arena, start, stop), size = self._state
 
196
        address, length = _multiprocessing.address_of_buffer(arena.buffer)
 
197
        assert size <= length
 
198
        return address + start
 
199
 
 
200
    def get_size(self):
 
201
        return self._state[1]