1
"""A Socket subclass that adds some serialization methods."""
4
import cPickle as pickle
10
class SerializingSocket(zmq.Socket):
    """A Socket subclass with some extra serialization methods.

    send_zipped_pickle is just like send_pyobj, but uses
    zlib to compress the stream before sending.

    send_array sends numpy arrays with the metadata necessary
    for reconstructing the array on the other side (dtype, shape).
    """

    def send_zipped_pickle(self, obj, flags=0, protocol=-1):
        """Pickle ``obj``, compress it with zlib and send it as one frame.

        ``flags`` is forwarded to ``send``. ``protocol`` is forwarded to
        ``pickle.dumps``; a negative value selects the highest protocol
        available. Returns whatever ``send`` returns.
        """
        pobj = pickle.dumps(obj, protocol)
        zobj = zlib.compress(pobj)
        # Parenthesised single-argument form: prints the same text whether
        # ``print`` is a statement or a function.
        print('zipped pickle is %i bytes' % len(zobj))
        return self.send(zobj, flags=flags)

    def recv_zipped_pickle(self, flags=0):
        """Receive one frame and rebuild the object sent by send_zipped_pickle.

        ``flags`` is forwarded to ``recv``.

        NOTE(security): the payload is handed to ``pickle.loads``, which can
        execute arbitrary code. Only use this with peers you trust.
        """
        zobj = self.recv(flags)
        pobj = zlib.decompress(zobj)
        return pickle.loads(pobj)

    def send_array(self, A, flags=0, copy=True, track=False):
        """Send a numpy array as two frames: JSON metadata, then raw data.

        The metadata frame carries the ``dtype`` (as a string) and ``shape``
        that ``recv_array`` needs to rebuild the array. ``flags``, ``copy``
        and ``track`` are forwarded to ``send`` for the data frame. Returns
        whatever that ``send`` returns.
        """
        md = dict(
            dtype=str(A.dtype),
            shape=A.shape,
        )
        # SNDMORE ties the metadata frame and the data frame together as a
        # single multipart message.
        self.send_json(md, flags | zmq.SNDMORE)
        # The receiver reshapes the raw bytes in C order, so make sure that
        # is the layout on the wire; already-contiguous input is not copied.
        data = numpy.ascontiguousarray(A)
        return self.send(data, flags, copy=copy, track=track)

    def recv_array(self, flags=0, copy=True, track=False):
        """Receive a numpy array sent with send_array.

        Reads the JSON metadata frame, then the data frame, and returns the
        data viewed with the advertised dtype and reshaped to the advertised
        shape. ``flags``, ``copy`` and ``track`` are forwarded to ``recv``.
        """
        md = self.recv_json(flags=flags)
        msg = self.recv(flags=flags, copy=copy, track=track)
        # View the received payload through the buffer protocol instead of
        # copying it. NOTE(review): the result shares memory with the message
        # and is presumably read-only -- confirm before writing into it.
        buf = memoryview(msg)
        A = numpy.frombuffer(buf, dtype=md['dtype'])
        return A.reshape(md['shape'])
51
if __name__ == '__main__':
    # Demo: round-trip one array over an in-process REQ/REP pair, once via
    # zipped pickle and once via send_array/recv_array.
    ctx = zmq.Context.instance()
    req = SerializingSocket(ctx, zmq.REQ)
    rep = SerializingSocket(ctx, zmq.REP)
    try:
        rep.bind('inproc://a')
        req.connect('inproc://a')
        A = numpy.ones((1024, 1024))
        # nbytes is the size of the array's data buffer in bytes.
        print("Array is %i bytes" % A.nbytes)

        # send/recv with pickle+zip
        req.send_zipped_pickle(A)
        B = rep.recv_zipped_pickle()
        # now try non-copying version
        rep.send_array(A, copy=False)
        C = req.recv_array(copy=False)

        print("Checking zipped pickle...")
        print("Okay" if numpy.array_equal(A, B) else "Failed")
        print("Checking send_array...")
        # Compare against the source array, not the pickle result, so the
        # two checks are independent of each other.
        print("Okay" if numpy.array_equal(A, C) else "Failed")
    finally:
        # Release the sockets even if one of the steps above raised.
        req.close()
        rep.close()