2
# Copyright (c) 2010 Min Ragan-Kelley
4
# This file is part of pyzmq.
6
# pyzmq is free software; you can redistribute it and/or modify it under
7
# the terms of the Lesser GNU General Public License as published by
8
# the Free Software Foundation; either version 3 of the License, or
9
# (at your option) any later version.
11
# pyzmq is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# Lesser GNU General Public License for more details.
16
# You should have received a copy of the Lesser GNU General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1
#-----------------------------------------------------------------------------
2
# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
4
# This file is part of pyzmq
6
# Distributed under the terms of the New BSD License. The full license is in
7
# the file COPYING.BSD, distributed as part of this software.
8
#-----------------------------------------------------------------------------
20
10
#-----------------------------------------------------------------------------
27
17
from zmq import devices
28
from zmq.tests import BaseZMQTestCase
18
from zmq.tests import BaseZMQTestCase, SkipTest
29
19
from zmq.utils.strtypes import (bytes,unicode,basestring)
31
21
#-----------------------------------------------------------------------------
33
23
#-----------------------------------------------------------------------------
24
devices.ThreadDevice.context_factory = zmq.Context
36
26
class TestDevice(BaseZMQTestCase):
38
28
def test_device_types(self):
39
# a = self.context.socket(zmq.SUB)
40
29
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
41
dev = devices.Device(devtype, zmq.PAIR,zmq.PAIR)
30
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
42
31
self.assertEquals(dev.device_type, devtype)
46
34
def test_device_attributes(self):
47
# a = self.context.socket(zmq.SUB)
48
# b = self.context.socket(zmq.PUB)
49
dev = devices.Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
50
self.assert_(dev.in_type == zmq.SUB)
51
self.assert_(dev.out_type == zmq.PUB)
52
self.assertEquals(dev.device_type, zmq.FORWARDER)
35
dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
36
self.assertEquals(dev.in_type, zmq.SUB)
37
self.assertEquals(dev.out_type, zmq.PUB)
38
self.assertEquals(dev.device_type, zmq.QUEUE)
53
39
self.assertEquals(dev.daemon, True)
57
42
def test_tsdevice_attributes(self):
66
51
def test_single_socket_forwarder_connect(self):
68
dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
52
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
69
53
req = self.context.socket(zmq.REQ)
70
54
port = req.bind_to_random_port('tcp://127.0.0.1')
71
55
dev.connect_in('tcp://127.0.0.1:%i'%port)
74
msg = 'hello'.encode()
76
self.assertEquals(msg, req.recv())
60
self.assertEquals(msg, self.recv(req))
79
dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
63
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
80
64
req = self.context.socket(zmq.REQ)
81
65
port = req.bind_to_random_port('tcp://127.0.0.1')
82
66
dev.connect_out('tcp://127.0.0.1:%i'%port)
85
msg = 'hello again'.encode()
87
self.assertEquals(msg, req.recv())
71
self.assertEquals(msg, self.recv(req))
91
75
def test_single_socket_forwarder_bind(self):
93
dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
94
req = self.context.socket(zmq.REQ)
96
req.connect('tcp://127.0.0.1:%i'%port)
97
dev.bind_in('tcp://127.0.0.1:%i'%port)
100
msg = 'hello'.encode()
102
self.assertEquals(msg, req.recv())
105
dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
106
req = self.context.socket(zmq.REQ)
108
req.connect('tcp://127.0.0.1:%i'%port)
109
dev.bind_in('tcp://127.0.0.1:%i'%port)
112
msg = 'hello again'.encode()
114
self.assertEquals(msg, req.recv())
76
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
78
binder = self.context.socket(zmq.REQ)
79
port = binder.bind_to_random_port('tcp://127.0.0.1')
82
req = self.context.socket(zmq.REQ)
83
req.connect('tcp://127.0.0.1:%i'%port)
84
dev.bind_in('tcp://127.0.0.1:%i'%port)
89
self.assertEquals(msg, self.recv(req))
92
dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
94
binder = self.context.socket(zmq.REQ)
95
port = binder.bind_to_random_port('tcp://127.0.0.1')
98
req = self.context.socket(zmq.REQ)
99
req.connect('tcp://127.0.0.1:%i'%port)
100
dev.bind_in('tcp://127.0.0.1:%i'%port)
105
self.assertEquals(msg, self.recv(req))