2
# Module which supports allocation of ctypes objects from shared memory
4
# multiprocessing/sharedctypes.py
6
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
13
from multiprocessing import heap, RLock
14
from multiprocessing.forking import assert_spawning, ForkingPickler
16
# Public names exported by this module on `from ... import *`.
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
23
# Maps one-character type codes to the ctypes type they stand for.
# RawValue() and RawArray() look their first argument up here and fall back
# to the argument itself when it is not a known code.
typecode_to_type = {
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double,
}
35
def _new_value(type_):
    """Create a `type_` instance backed by a newly allocated heap buffer.

    A `heap.BufferWrapper` of `ctypes.sizeof(type_)` bytes is allocated and
    passed to `rebuild_ctype`, whose result is returned unchanged.
    """
    buffer_wrapper = heap.BufferWrapper(ctypes.sizeof(type_))
    return rebuild_ctype(type_, buffer_wrapper, None)
40
def RawValue(typecode_or_type, *args):
    """Return a ctypes object allocated from shared memory.

    typecode_or_type -- a key of `typecode_to_type` or a ctypes type; any
        value that is not a known type code is used as the type itself.
    *args -- forwarded to the new object's ``__init__``.
    """
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    obj = _new_value(type_)
    # Clear the storage before initialising it.
    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    # Same initialisation pattern RawArray uses for its initializer.
    obj.__init__(*args)
    return obj
50
def RawArray(typecode_or_type, size_or_initializer):
    """Return a ctypes array allocated from shared memory.

    typecode_or_type -- a key of `typecode_to_type` or a ctypes type giving
        the element type.
    size_or_initializer -- an int gives the array length and the array is
        returned zero-filled; any other value must support ``len()`` and
        unpacking, and supplies both the length and the initial elements.
    """
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    if isinstance(size_or_initializer, int):
        type_ = type_ * size_or_initializer
        obj = _new_value(type_)
        # Zero the storage, as RawValue does; without this the array would
        # expose whatever bytes the heap buffer previously held.
        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
        return obj
    else:
        type_ = type_ * len(size_or_initializer)
        result = _new_value(type_)
        # Every element is assigned here, so no separate zeroing is needed.
        result.__init__(*size_or_initializer)
        return result
64
def Value(typecode_or_type, *args, lock=None):
    """Return a synchronization wrapper for a Value.

    typecode_or_type, *args -- passed straight to RawValue().
    lock -- None or True: a new RLock guards the value.
            False: the raw, unsynchronized object is returned.
            Anything else: used as the lock; must have an ``acquire``
            attribute.

    Raises AttributeError if a supplied lock has no ``acquire`` attribute.
    """
    obj = RawValue(typecode_or_type, *args)
    # NOTE(review): lock=False opt-out restored from the documented contract
    # of multiprocessing.Value -- confirm against the full file.
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple so a tuple-valued `lock` cannot break the format.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
77
def Array(typecode_or_type, size_or_initializer, **kwds):
    """Return a synchronization wrapper for a RawArray.

    typecode_or_type, size_or_initializer -- passed straight to RawArray().
    lock (keyword only) -- None or True: a new RLock guards the array.
            False: the raw, unsynchronized array is returned.
            Anything else: used as the lock; must have an ``acquire``
            attribute.

    Raises ValueError for any keyword other than ``lock`` and
    AttributeError if a supplied lock has no ``acquire`` attribute.
    """
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
    obj = RawArray(typecode_or_type, size_or_initializer)
    # NOTE(review): lock=False opt-out restored from the documented contract
    # of multiprocessing.Array -- confirm against the full file.
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple so a tuple-valued `lock` cannot break the format.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
94
def copy(obj):
    """Return a new object of `obj`'s ctypes type holding a copy of `obj`.

    The new object is created by `_new_value(type(obj))`; the contents are
    copied by assigning `obj` through a pointer to the new object.
    """
    new_obj = _new_value(type(obj))
    ctypes.pointer(new_obj)[0] = obj
    return new_obj
98
def synchronized(obj, lock=None):
    """Wrap the ctypes object `obj` in the matching Synchronized* class.

    Dispatch:
      * ctypes._SimpleCData instance -> Synchronized
      * ctypes.Array of c_char       -> SynchronizedString
      * any other ctypes.Array       -> SynchronizedArray
      * anything else                -> a SynchronizedBase subclass generated
        on first use, with one property per entry of the type's `_fields_`,
        and cached in `class_cache`.

    `lock` is handed to the wrapper's constructor unchanged.
    """
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)
    elif isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)
    else:
        cls = type(obj)
        try:
            scls = class_cache[cls]
        except KeyError:
            # First time this type is seen: build its wrapper class and
            # remember it for later calls.
            names = [field[0] for field in cls._fields_]
            d = {name: make_property(name) for name in names}
            classname = 'Synchronized' + cls.__name__
            scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
        return scls(obj, lock)
119
# Functions for pickling/unpickling
122
def reduce_ctype(obj):
    """Reduction function for ctypes objects created by rebuild_ctype().

    Returns ``(rebuild_ctype, args)``. For arrays the element type and the
    length are sent separately; for everything else the object's own type is
    sent with a length of None. The `_wrapper` attribute set by
    rebuild_ctype() is passed along in both cases.
    """
    # NOTE(review): restored by analogy with SynchronizedBase.__reduce__,
    # which makes the same call before reducing -- confirm against the full
    # file.
    assert_spawning(obj)
    if isinstance(obj, ctypes.Array):
        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
    else:
        return rebuild_ctype, (type(obj), obj._wrapper, None)
129
def rebuild_ctype(type_, wrapper, length):
    """Create a ctypes object that lives at `wrapper`'s address.

    type_ -- the ctypes type, or the element type when `length` is given.
    wrapper -- object providing ``get_address()``; callers in this module
        pass a `heap.BufferWrapper`.
    length -- None for a single object, otherwise the array length.

    The resulting type is registered with ForkingPickler so that instances
    are reduced by reduce_ctype().
    """
    if length is not None:
        type_ = type_ * length
    ForkingPickler.register(type_, reduce_ctype)
    obj = type_.from_address(wrapper.get_address())
    # reduce_ctype() reads this attribute back when the object is reduced.
    obj._wrapper = wrapper
    return obj
138
# Function to create properties
141
def make_property(name):
    """Return a property that forwards attribute `name` to ``self._obj``.

    The property is generated once per name from `template` and memoised in
    `prop_cache`; later calls with the same name return the same object.

    `name` is interpolated into source text passed to exec(), so it must be
    a trusted, valid identifier. Callers in this module pass literal names
    and field names taken from a ctypes type's `_fields_`.
    """
    try:
        return prop_cache[name]
    except KeyError:
        d = {}
        exec(template % ((name,)*7), d)
        prop_cache[name] = d[name]
        return d[name]

# Source template for one property. It takes seven substitutions of the same
# name, matching the `(name,)*7` tuple above.
# NOTE(review): the acquire/try/finally/release lines were restored to match
# the acquire/release attributes SynchronizedBase sets up -- confirm against
# the full file.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# name -> generated property, filled in by make_property().
prop_cache = {}
# ctypes type -> generated wrapper class, filled in by synchronized(). Keys
# are held weakly.
class_cache = weakref.WeakKeyDictionary()
170
# Synchronized wrappers
173
class SynchronizedBase(object):
    """Base class pairing a wrapped object with a lock.

    The wrapped object is stored as ``_obj`` and the lock as ``_lock``. The
    lock's ``acquire`` and ``release`` methods are re-exposed as instance
    attributes of the same names.
    """

    def __init__(self, obj, lock=None):
        """Wrap `obj`; a new RLock is created when `lock` is falsy."""
        self._obj = obj
        self._lock = lock or RLock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __reduce__(self):
        """Reduce to ``synchronized(self._obj, self._lock)``."""
        # NOTE(review): presumably rejects pickling outside of process
        # spawning -- confirm in multiprocessing.forking.
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    # NOTE(review): get_obj/get_lock restored from the documented API of
    # synchronized wrappers -- confirm against the full file.
    def get_obj(self):
        """Return the wrapped object."""
        return self._obj

    def get_lock(self):
        """Return the lock guarding the wrapped object."""
        return self._lock

    def __repr__(self):
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
195
class Synchronized(SynchronizedBase):
    """Wrapper that synchronized() returns for ctypes._SimpleCData objects.

    Adds a single attribute, `value`, built by ``make_property('value')``.
    """

    value = make_property('value')
199
class SynchronizedArray(SynchronizedBase):
    """Wrapper that synchronized() returns for ctypes arrays.

    Element and slice access is forwarded to the wrapped array ``_obj``
    between ``self.acquire()`` and ``self.release()``.

    Python 3 slicing syntax never calls ``__getslice__``/``__setslice__``;
    slices reach ``__getitem__``/``__setitem__`` as slice objects. The two
    slice methods are kept for callers that invoke them by name.
    """
    # NOTE(review): the acquire/try/finally/release framing of each method
    # was restored to fit the gaps around the visible statements -- confirm
    # against the full file.

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            return self._obj[i]
        finally:
            self.release()

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            return self._obj[start:stop]
        finally:
            self.release()

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()
233
class SynchronizedString(SynchronizedArray):
    """Wrapper that synchronized() returns for arrays of ctypes.c_char.

    Extends SynchronizedArray with two attributes, `value` and `raw`, each
    built by make_property() for the attribute of the same name.
    """

    value = make_property('value')
    raw = make_property('raw')