~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/tests/test_device.py

  • Committer: Package Import Robot
  • Author(s): Debian Python Modules Team
  • Date: 2011-09-23 00:16:39 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: package-import@ubuntu.com-20110923001639-girjqodpb7uv17yu
Tags: 2.1.9-1
* New upstream version
  - should build on kFreeBSD without patches (Closes: #637777).
* Build-depend on zeromq 2.1.9

Show diffs side-by-side

added

removed

Lines of Context:
31
31
#-----------------------------------------------------------------------------
32
32
# Tests
33
33
#-----------------------------------------------------------------------------
34
 
 
 
34
devices.ThreadDevice.context_factory = zmq.Context
35
35
 
36
36
class TestDevice(BaseZMQTestCase):
37
37
    
38
38
    def test_device_types(self):
39
39
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
40
 
            dev = devices.Device(devtype, zmq.PAIR,zmq.PAIR)
 
40
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
41
41
            self.assertEquals(dev.device_type, devtype)
42
42
            del dev
43
43
    
44
44
    def test_device_attributes(self):
45
 
        dev = devices.Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
 
45
        dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
46
46
        self.assertEquals(dev.in_type, zmq.SUB)
47
47
        self.assertEquals(dev.out_type, zmq.PUB)
48
 
        self.assertEquals(dev.device_type, zmq.FORWARDER)
 
48
        self.assertEquals(dev.device_type, zmq.QUEUE)
49
49
        self.assertEquals(dev.daemon, True)
50
50
        del dev
51
51
    
59
59
        
60
60
    
61
61
    def test_single_socket_forwarder_connect(self):
62
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
62
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
63
63
        req = self.context.socket(zmq.REQ)
64
64
        port = req.bind_to_random_port('tcp://127.0.0.1')
65
65
        dev.connect_in('tcp://127.0.0.1:%i'%port)
70
70
        self.assertEquals(msg, req.recv())
71
71
        del dev
72
72
        req.close()
73
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
73
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
74
74
        req = self.context.socket(zmq.REQ)
75
75
        port = req.bind_to_random_port('tcp://127.0.0.1')
76
76
        dev.connect_out('tcp://127.0.0.1:%i'%port)
83
83
        req.close()
84
84
        
85
85
    def test_single_socket_forwarder_bind(self):
86
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
86
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
87
87
        # select random port:
88
88
        binder = self.context.socket(zmq.REQ)
89
89
        port = binder.bind_to_random_port('tcp://127.0.0.1')
99
99
        self.assertEquals(msg, req.recv())
100
100
        del dev
101
101
        req.close()
102
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
102
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
103
103
        # select random port:
104
104
        binder = self.context.socket(zmq.REQ)
105
105
        port = binder.bind_to_random_port('tcp://127.0.0.1')