~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/tests/test_socket.py

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# -*- coding: utf8 -*-
1
3
#
2
4
#    Copyright (c) 2010 Brian E. Granger
3
5
#
21
23
# Imports
22
24
#-----------------------------------------------------------------------------
23
25
 
 
26
import sys
 
27
import time
 
28
 
24
29
import zmq
25
 
from zmq.tests import BaseZMQTestCase
 
30
from zmq.tests import BaseZMQTestCase, SkipTest
 
31
from zmq.utils.strtypes import bytes, unicode
 
32
try:
 
33
    from queue import Queue
 
34
except:
 
35
    from Queue import Queue
26
36
 
27
37
#-----------------------------------------------------------------------------
28
38
# Tests
29
39
#-----------------------------------------------------------------------------
30
40
 
31
 
 
32
41
class TestSocket(BaseZMQTestCase):
33
42
 
34
43
    def test_create(self):
35
 
        ctx = zmq.Context()
36
 
        s = ctx.socket(zmq.PUB)
 
44
        s = self.context.socket(zmq.PUB)
37
45
        # Superluminal protocol not yet implemented
38
46
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://')
39
47
        self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://')
 
48
        s.close()
 
49
        # del ctx
 
50
    
 
51
    def test_unicode_sockopts(self):
 
52
        """test setting/getting sockopts with unicode strings"""
 
53
        topic = "tést"
 
54
        if str is not unicode:
 
55
            topic = topic.decode('utf8')
 
56
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
 
57
        self.assertEquals(s.send_unicode, s.send_unicode)
 
58
        self.assertEquals(p.recv_unicode, p.recv_unicode)
 
59
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
 
60
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
 
61
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
 
62
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
 
63
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
 
64
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
 
65
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.SUBSCRIBE)
 
66
        st = s.getsockopt(zmq.IDENTITY)
 
67
        self.assertEquals(st.decode('utf16'), s.getsockopt_unicode(zmq.IDENTITY, 'utf16'))
 
68
        time.sleep(0.1) # wait for connection/subscription
 
69
        p.send_unicode(topic,zmq.SNDMORE)
 
70
        p.send_unicode(topic*2, encoding='latin-1')
 
71
        self.assertEquals(topic, s.recv_unicode())
 
72
        self.assertEquals(topic*2, s.recv_unicode(encoding='latin-1'))
 
73
    
 
74
    def test_send_unicode(self):
 
75
        "test sending unicode objects"
 
76
        a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
 
77
        self.sockets.extend([a,b])
 
78
        u = "çπ§"
 
79
        if str is not unicode:
 
80
            u = u.decode('utf8')
 
81
        self.assertRaises(TypeError, a.send, u,copy=False)
 
82
        self.assertRaises(TypeError, a.send, u,copy=True)
 
83
        a.send_unicode(u)
 
84
        s = b.recv()
 
85
        self.assertEquals(s,u.encode('utf8'))
 
86
        self.assertEquals(s.decode('utf8'),u)
 
87
        a.send_unicode(u,encoding='utf16')
 
88
        s = b.recv_unicode(encoding='utf16')
 
89
        self.assertEquals(s,u)
 
90
        
 
91
    def test_tracker(self):
 
92
        "test the MessageTracker object for tracking when zmq is done with a buffer"
 
93
        addr = 'tcp://127.0.0.1'
 
94
        a = self.context.socket(zmq.XREQ)
 
95
        port = a.bind_to_random_port(addr)
 
96
        a.close()
 
97
        del a 
 
98
        iface = "%s:%i"%(addr,port)
 
99
        a = self.context.socket(zmq.XREQ)
 
100
        a.setsockopt(zmq.IDENTITY, "a".encode())
 
101
        b = self.context.socket(zmq.XREP)
 
102
        self.sockets.extend([a,b])
 
103
        a.connect(iface)
 
104
        time.sleep(0.1)
 
105
        p1 = a.send('something'.encode(), copy=False, track=True)
 
106
        self.assertTrue(isinstance(p1, zmq.MessageTracker))
 
107
        self.assertFalse(p1.done)
 
108
        p2 = a.send_multipart(list(map(str.encode, ['something', 'else'])), copy=False, track=True)
 
109
        self.assert_(isinstance(p2, zmq.MessageTracker))
 
110
        self.assertEquals(p2.done, False)
 
111
        self.assertEquals(p1.done, False)
40
112
 
 
113
        b.bind(iface)
 
114
        msg = b.recv_multipart()
 
115
        self.assertEquals(p1.done, True)
 
116
        self.assertEquals(msg, (list(map(str.encode, ['a', 'something']))))
 
117
        msg = b.recv_multipart()
 
118
        self.assertEquals(p2.done, True)
 
119
        self.assertEquals(msg, list(map(str.encode, ['a', 'something', 'else'])))
 
120
        m = zmq.Message("again".encode(), track=True)
 
121
        self.assertEquals(m.done, False)
 
122
        p1 = a.send(m, copy=False)
 
123
        p2 = a.send(m, copy=False)
 
124
        self.assertEquals(m.done, False)
 
125
        self.assertEquals(p1.done, False)
 
126
        self.assertEquals(p2.done, False)
 
127
        msg = b.recv_multipart()
 
128
        self.assertEquals(m.done, False)
 
129
        self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
 
130
        msg = b.recv_multipart()
 
131
        self.assertEquals(m.done, False)
 
132
        self.assertEquals(msg, list(map(str.encode, ['a', 'again'])))
 
133
        self.assertEquals(p1.done, False)
 
134
        self.assertEquals(p2.done, False)
 
135
        pm = m.tracker
 
136
        del m
 
137
        time.sleep(0.1)
 
138
        self.assertEquals(p1.done, True)
 
139
        self.assertEquals(p2.done, True)
 
140
        m = zmq.Message('something'.encode(), track=False)
 
141
        self.assertRaises(ValueError, a.send, m, copy=False, track=True)
 
142
        
41
143
 
42
144
    def test_close(self):
43
 
        ctx = zmq.Context()
44
 
        s = ctx.socket(zmq.PUB)
 
145
        s = self.context.socket(zmq.PUB)
45
146
        s.close()
46
 
        self.assertRaises(zmq.ZMQError, s.bind, '')
47
 
        self.assertRaises(zmq.ZMQError, s.connect, '')
48
 
        self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, '')
49
 
        self.assertRaises(zmq.ZMQError, s.send, 'asdf')
 
147
        self.assertRaises(zmq.ZMQError, s.bind, ''.encode())
 
148
        self.assertRaises(zmq.ZMQError, s.connect, ''.encode())
 
149
        self.assertRaises(zmq.ZMQError, s.setsockopt, zmq.SUBSCRIBE, ''.encode())
 
150
        self.assertRaises(zmq.ZMQError, s.send, 'asdf'.encode())
50
151
        self.assertRaises(zmq.ZMQError, s.recv)
 
152
    
51
153