~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/tests/test_device.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use GitHub directly,
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump Standards-Version to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
#    Copyright (c) 2010 Min Ragan-Kelley
3
 
#
4
 
#    This file is part of pyzmq.
5
 
#
6
 
#    pyzmq is free software; you can redistribute it and/or modify it under
7
 
#    the terms of the Lesser GNU General Public License as published by
8
 
#    the Free Software Foundation; either version 3 of the License, or
9
 
#    (at your option) any later version.
10
 
#
11
 
#    pyzmq is distributed in the hope that it will be useful,
12
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
#    Lesser GNU General Public License for more details.
15
 
#
16
 
#    You should have received a copy of the Lesser GNU General Public License
17
 
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 
#
 
1
#-----------------------------------------------------------------------------
 
2
#  Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
 
3
#
 
4
#  This file is part of pyzmq
 
5
#
 
6
#  Distributed under the terms of the New BSD License.  The full license is in
 
7
#  the file COPYING.BSD, distributed as part of this software.
 
8
#-----------------------------------------------------------------------------
19
9
 
20
10
#-----------------------------------------------------------------------------
21
11
# Imports
25
15
 
26
16
import zmq
27
17
from zmq import devices
28
 
from zmq.tests import BaseZMQTestCase
 
18
from zmq.tests import BaseZMQTestCase, SkipTest
29
19
from zmq.utils.strtypes import (bytes,unicode,basestring)
30
20
 
31
21
#-----------------------------------------------------------------------------
32
22
# Tests
33
23
#-----------------------------------------------------------------------------
34
 
 
 
24
devices.ThreadDevice.context_factory = zmq.Context
35
25
 
36
26
class TestDevice(BaseZMQTestCase):
37
 
 
 
27
    
38
28
    def test_device_types(self):
39
 
        # a = self.context.socket(zmq.SUB)
40
29
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
41
 
            dev = devices.Device(devtype, zmq.PAIR,zmq.PAIR)
 
30
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
42
31
            self.assertEquals(dev.device_type, devtype)
43
32
            del dev
44
 
        # del a
45
33
    
46
34
    def test_device_attributes(self):
47
 
        # a = self.context.socket(zmq.SUB)
48
 
        # b = self.context.socket(zmq.PUB)
49
 
        dev = devices.Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
50
 
        self.assert_(dev.in_type == zmq.SUB)
51
 
        self.assert_(dev.out_type == zmq.PUB)
52
 
        self.assertEquals(dev.device_type, zmq.FORWARDER)
 
35
        dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
 
36
        self.assertEquals(dev.in_type, zmq.SUB)
 
37
        self.assertEquals(dev.out_type, zmq.PUB)
 
38
        self.assertEquals(dev.device_type, zmq.QUEUE)
53
39
        self.assertEquals(dev.daemon, True)
54
 
        # del a
55
40
        del dev
56
41
    
57
42
    def test_tsdevice_attributes(self):
64
49
        
65
50
    
66
51
    def test_single_socket_forwarder_connect(self):
67
 
        self.skip_if_pgm()
68
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
52
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
69
53
        req = self.context.socket(zmq.REQ)
70
54
        port = req.bind_to_random_port('tcp://127.0.0.1')
71
55
        dev.connect_in('tcp://127.0.0.1:%i'%port)
72
56
        dev.start()
73
57
        time.sleep(.25)
74
 
        msg = 'hello'.encode()
 
58
        msg = b'hello'
75
59
        req.send(msg)
76
 
        self.assertEquals(msg, req.recv())
 
60
        self.assertEquals(msg, self.recv(req))
77
61
        del dev
78
 
        del req
79
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
 
62
        req.close()
 
63
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
80
64
        req = self.context.socket(zmq.REQ)
81
65
        port = req.bind_to_random_port('tcp://127.0.0.1')
82
66
        dev.connect_out('tcp://127.0.0.1:%i'%port)
83
67
        dev.start()
84
68
        time.sleep(.25)
85
 
        msg = 'hello again'.encode()
 
69
        msg = b'hello again'
86
70
        req.send(msg)
87
 
        self.assertEquals(msg, req.recv())
 
71
        self.assertEquals(msg, self.recv(req))
88
72
        del dev
89
 
        del req
 
73
        req.close()
90
74
        
91
75
    def test_single_socket_forwarder_bind(self):
92
 
        self.skip_if_pgm()
93
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
94
 
        req = self.context.socket(zmq.REQ)
95
 
        port = 12345
96
 
        req.connect('tcp://127.0.0.1:%i'%port)
97
 
        dev.bind_in('tcp://127.0.0.1:%i'%port)
98
 
        dev.start()
99
 
        time.sleep(.25)
100
 
        msg = 'hello'.encode()
101
 
        req.send(msg)
102
 
        self.assertEquals(msg, req.recv())
103
 
        del dev
104
 
        del req
105
 
        dev = devices.ThreadDevice(zmq.FORWARDER, zmq.REP, -1)
106
 
        req = self.context.socket(zmq.REQ)
107
 
        port = 12346
108
 
        req.connect('tcp://127.0.0.1:%i'%port)
109
 
        dev.bind_in('tcp://127.0.0.1:%i'%port)
110
 
        dev.start()
111
 
        time.sleep(.25)
112
 
        msg = 'hello again'.encode()
113
 
        req.send(msg)
114
 
        self.assertEquals(msg, req.recv())
115
 
        del dev
116
 
        del req
 
76
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
 
77
        # select random port:
 
78
        binder = self.context.socket(zmq.REQ)
 
79
        port = binder.bind_to_random_port('tcp://127.0.0.1')
 
80
        binder.close()
 
81
        time.sleep(0.1)
 
82
        req = self.context.socket(zmq.REQ)
 
83
        req.connect('tcp://127.0.0.1:%i'%port)
 
84
        dev.bind_in('tcp://127.0.0.1:%i'%port)
 
85
        dev.start()
 
86
        time.sleep(.25)
 
87
        msg = b'hello'
 
88
        req.send(msg)
 
89
        self.assertEquals(msg, self.recv(req))
 
90
        del dev
 
91
        req.close()
 
92
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
 
93
        # select random port:
 
94
        binder = self.context.socket(zmq.REQ)
 
95
        port = binder.bind_to_random_port('tcp://127.0.0.1')
 
96
        binder.close()
 
97
        time.sleep(0.1)
 
98
        req = self.context.socket(zmq.REQ)
 
99
        req.connect('tcp://127.0.0.1:%i'%port)
 
100
        dev.bind_in('tcp://127.0.0.1:%i'%port)
 
101
        dev.start()
 
102
        time.sleep(.25)
 
103
        msg = b'hello again'
 
104
        req.send(msg)
 
105
        self.assertEquals(msg, self.recv(req))
 
106
        del dev
 
107
        req.close()
 
108