~ubuntu-branches/ubuntu/maverick/python3.1/maverick

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/sharedctypes.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-03-23 00:01:27 UTC
  • Revision ID: james.westby@ubuntu.com-20090323000127-5fstfxju4ufrhthq
Tags: upstream-3.1~a1+20090322
ImportĀ upstreamĀ versionĀ 3.1~a1+20090322

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Module which supports allocation of ctypes objects from shared memory
 
3
#
 
4
# multiprocessing/sharedctypes.py
 
5
#
 
6
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
 
7
#
 
8
 
 
9
import sys
 
10
import ctypes
 
11
import weakref
 
12
 
 
13
from multiprocessing import heap, RLock
 
14
from multiprocessing.forking import assert_spawning, ForkingPickler
 
15
 
 
16
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
17
 
 
18
#
 
19
#
 
20
#
 
21
 
 
22
typecode_to_type = {
 
23
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
 
24
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
 
25
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
 
26
    'i': ctypes.c_int,   'I': ctypes.c_uint,
 
27
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
 
28
    'f': ctypes.c_float, 'd': ctypes.c_double
 
29
    }
 
30
 
 
31
#
 
32
#
 
33
#
 
34
 
 
35
def _new_value(type_):
 
36
    size = ctypes.sizeof(type_)
 
37
    wrapper = heap.BufferWrapper(size)
 
38
    return rebuild_ctype(type_, wrapper, None)
 
39
 
 
40
def RawValue(typecode_or_type, *args):
 
41
    '''
 
42
    Returns a ctypes object allocated from shared memory
 
43
    '''
 
44
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
 
45
    obj = _new_value(type_)
 
46
    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
 
47
    obj.__init__(*args)
 
48
    return obj
 
49
 
 
50
def RawArray(typecode_or_type, size_or_initializer):
 
51
    '''
 
52
    Returns a ctypes array allocated from shared memory
 
53
    '''
 
54
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
 
55
    if isinstance(size_or_initializer, int):
 
56
        type_ = type_ * size_or_initializer
 
57
        return _new_value(type_)
 
58
    else:
 
59
        type_ = type_ * len(size_or_initializer)
 
60
        result = _new_value(type_)
 
61
        result.__init__(*size_or_initializer)
 
62
        return result
 
63
 
 
64
def Value(typecode_or_type, *args, lock=None):
 
65
    '''
 
66
    Return a synchronization wrapper for a Value
 
67
    '''
 
68
    obj = RawValue(typecode_or_type, *args)
 
69
    if lock is False:
 
70
        return obj
 
71
    if lock in (True, None):
 
72
        lock = RLock()
 
73
    if not hasattr(lock, 'acquire'):
 
74
        raise AttributeError("'%r' has no method 'acquire'" % lock)
 
75
    return synchronized(obj, lock)
 
76
 
 
77
def Array(typecode_or_type, size_or_initializer, **kwds):
 
78
    '''
 
79
    Return a synchronization wrapper for a RawArray
 
80
    '''
 
81
    lock = kwds.pop('lock', None)
 
82
    if kwds:
 
83
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
 
84
    obj = RawArray(typecode_or_type, size_or_initializer)
 
85
    if lock is False:
 
86
        return obj
 
87
    if lock in (True, None):
 
88
        lock = RLock()
 
89
    if not hasattr(lock, 'acquire'):
 
90
        raise AttributeError("'%r' has no method 'acquire'" % lock)
 
91
    return synchronized(obj, lock)
 
92
 
 
93
def copy(obj):
 
94
    new_obj = _new_value(type(obj))
 
95
    ctypes.pointer(new_obj)[0] = obj
 
96
    return new_obj
 
97
 
 
98
def synchronized(obj, lock=None):
 
99
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
 
100
 
 
101
    if isinstance(obj, ctypes._SimpleCData):
 
102
        return Synchronized(obj, lock)
 
103
    elif isinstance(obj, ctypes.Array):
 
104
        if obj._type_ is ctypes.c_char:
 
105
            return SynchronizedString(obj, lock)
 
106
        return SynchronizedArray(obj, lock)
 
107
    else:
 
108
        cls = type(obj)
 
109
        try:
 
110
            scls = class_cache[cls]
 
111
        except KeyError:
 
112
            names = [field[0] for field in cls._fields_]
 
113
            d = dict((name, make_property(name)) for name in names)
 
114
            classname = 'Synchronized' + cls.__name__
 
115
            scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
 
116
        return scls(obj, lock)
 
117
 
 
118
#
 
119
# Functions for pickling/unpickling
 
120
#
 
121
 
 
122
def reduce_ctype(obj):
 
123
    assert_spawning(obj)
 
124
    if isinstance(obj, ctypes.Array):
 
125
        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
 
126
    else:
 
127
        return rebuild_ctype, (type(obj), obj._wrapper, None)
 
128
 
 
129
def rebuild_ctype(type_, wrapper, length):
 
130
    if length is not None:
 
131
        type_ = type_ * length
 
132
    ForkingPickler.register(type_, reduce_ctype)
 
133
    obj = type_.from_address(wrapper.get_address())
 
134
    obj._wrapper = wrapper
 
135
    return obj
 
136
 
 
137
#
 
138
# Function to create properties
 
139
#
 
140
 
 
141
def make_property(name):
 
142
    try:
 
143
        return prop_cache[name]
 
144
    except KeyError:
 
145
        d = {}
 
146
        exec(template % ((name,)*7), d)
 
147
        prop_cache[name] = d[name]
 
148
        return d[name]
 
149
 
 
150
template = '''
 
151
def get%s(self):
 
152
    self.acquire()
 
153
    try:
 
154
        return self._obj.%s
 
155
    finally:
 
156
        self.release()
 
157
def set%s(self, value):
 
158
    self.acquire()
 
159
    try:
 
160
        self._obj.%s = value
 
161
    finally:
 
162
        self.release()
 
163
%s = property(get%s, set%s)
 
164
'''
 
165
 
 
166
prop_cache = {}
 
167
class_cache = weakref.WeakKeyDictionary()
 
168
 
 
169
#
 
170
# Synchronized wrappers
 
171
#
 
172
 
 
173
class SynchronizedBase(object):
 
174
 
 
175
    def __init__(self, obj, lock=None):
 
176
        self._obj = obj
 
177
        self._lock = lock or RLock()
 
178
        self.acquire = self._lock.acquire
 
179
        self.release = self._lock.release
 
180
 
 
181
    def __reduce__(self):
 
182
        assert_spawning(self)
 
183
        return synchronized, (self._obj, self._lock)
 
184
 
 
185
    def get_obj(self):
 
186
        return self._obj
 
187
 
 
188
    def get_lock(self):
 
189
        return self._lock
 
190
 
 
191
    def __repr__(self):
 
192
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
 
193
 
 
194
 
 
195
class Synchronized(SynchronizedBase):
 
196
    value = make_property('value')
 
197
 
 
198
 
 
199
class SynchronizedArray(SynchronizedBase):
 
200
 
 
201
    def __len__(self):
 
202
        return len(self._obj)
 
203
 
 
204
    def __getitem__(self, i):
 
205
        self.acquire()
 
206
        try:
 
207
            return self._obj[i]
 
208
        finally:
 
209
            self.release()
 
210
 
 
211
    def __setitem__(self, i, value):
 
212
        self.acquire()
 
213
        try:
 
214
            self._obj[i] = value
 
215
        finally:
 
216
            self.release()
 
217
 
 
218
    def __getslice__(self, start, stop):
 
219
        self.acquire()
 
220
        try:
 
221
            return self._obj[start:stop]
 
222
        finally:
 
223
            self.release()
 
224
 
 
225
    def __setslice__(self, start, stop, values):
 
226
        self.acquire()
 
227
        try:
 
228
            self._obj[start:stop] = values
 
229
        finally:
 
230
            self.release()
 
231
 
 
232
 
 
233
class SynchronizedString(SynchronizedArray):
 
234
    value = make_property('value')
 
235
    raw = make_property('raw')