~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/words/test/test_jabbersasl.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
from zope.interface import implements
 
5
from twisted.internet import defer
 
6
from twisted.trial import unittest
 
7
from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid
 
8
from twisted.words.xish import domish
 
9
 
 
10
# XML namespace URI used by the tests below when building and checking
# SASL elements (auth, challenge, failure, mechanisms).
NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
 
11
 
 
12
class DummySASLMechanism(object):
    """
    Minimal SASL mechanism used as a test double.

    The mechanism hands back the C{initialResponse} it was constructed with,
    remembers the most recent challenge it was asked to answer and always
    answers with an empty response.

    @ivar name: Mechanism name advertised by this double.
    @type name: C{str}
    @ivar challenge: Last received challenge, or C{None} if no challenge has
                     been received yet.
    @type challenge: C{unicode}.
    @ivar initialResponse: Initial response to be returned when requested
                           via C{getInitialResponse} or C{None}.
    @type initialResponse: C{unicode}
    """

    implements(sasl_mechanisms.ISASLMechanism)

    name = "DUMMY"
    challenge = None

    def __init__(self, initialResponse):
        """
        @param initialResponse: Value to hand out from
                                C{getInitialResponse}; may be C{None}.
        """
        self.initialResponse = initialResponse


    def getInitialResponse(self):
        """
        Return the initial response supplied at construction time.
        """
        return self.initialResponse


    def getResponse(self, challenge):
        """
        Record C{challenge} on the instance and reply with an empty response.

        @param challenge: The challenge to remember.
        @return: Always the empty string.
        """
        self.challenge = challenge
        return ""
 
40
 
 
41
class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer):
    """
    SASL initializer for initiating entities with a fixed mechanism.

    Instead of selecting a mechanism, C{setMechanism} always installs a
    L{DummySASLMechanism} built from the current value of
    C{initialResponse}.

    @ivar initialResponse: The initial response to be returned by the
                           dummy SASL mechanism or C{None}.
    @type initialResponse: C{unicode}.
    """

    initialResponse = None

    def setMechanism(self):
        """
        Install a L{DummySASLMechanism} carrying C{self.initialResponse} as
        C{self.mechanism}.
        """
        dummy = DummySASLMechanism(self.initialResponse)
        self.mechanism = dummy
 
57
 
 
58
 
 
59
 
 
60
class SASLInitiatingInitializerTest(unittest.TestCase):
    """
    Tests for L{sasl.SASLInitiatingInitializer}, exercised through
    L{DummySASLInitiatingInitializer}.
    """

    def setUp(self):
        """
        Build an XML stream whose sent elements are collected in
        C{self.output}, feed it a stream header and attach a
        L{DummySASLInitiatingInitializer} to it as C{self.init}.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        # Everything passed to send() ends up in self.output for inspection.
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()
        self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
                        "xmlns:stream='http://etherx.jabber.org/streams' "
                        "from='example.com' id='12345' version='1.0'>")
        self.init = DummySASLInitiatingInitializer(self.xmlstream)


    def test_onFailure(self):
        """
        Test that the SASL error condition is correctly extracted.
        """
        failure = domish.Element((NS_XMPP_SASL, 'failure'))
        failure.addElement('not-authorized')
        self.init._deferred = defer.Deferred()
        self.init.onFailure(failure)
        self.assertFailure(self.init._deferred, sasl.SASLAuthError)

        def checkCondition(exc):
            # The condition must match the child element added above.
            self.assertEqual('not-authorized', exc.condition)

        self.init._deferred.addCallback(checkCondition)
        return self.init._deferred


    def test_sendAuthInitialResponse(self):
        """
        Test starting authentication with an initial response.

        The first element sent must be an C{auth} element in the SASL
        namespace, naming the dummy mechanism and carrying the encoded
        initial response as its content.
        """
        self.init.initialResponse = "dummy"
        self.init.start()
        auth = self.output[0]
        self.assertEqual(NS_XMPP_SASL, auth.uri)
        self.assertEqual('auth', auth.name)
        self.assertEqual('DUMMY', auth['mechanism'])
        # 'ZHVtbXk=' is the base64 encoding of 'dummy'.
        self.assertEqual('ZHVtbXk=', str(auth))


    def test_sendAuthNoInitialResponse(self):
        """
        Test starting authentication without an initial response.

        With C{None} as the initial response the sent element has no
        content.
        """
        self.init.initialResponse = None
        self.init.start()
        auth = self.output[0]
        self.assertEqual('', str(auth))


    def test_sendAuthEmptyInitialResponse(self):
        """
        Test starting authentication where the initial response is empty.

        An empty (as opposed to absent) initial response is sent as a
        single C{'='}.
        """
        self.init.initialResponse = ""
        self.init.start()
        auth = self.output[0]
        self.assertEqual('=', str(auth))


    def test_onChallenge(self):
        """
        Test receiving a challenge message.

        The decoded challenge content must be handed to the mechanism.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # 'bXkgY2hhbGxlbmdl' is the base64 encoding of 'my challenge'.
        challenge.addContent('bXkgY2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertEqual('my challenge', self.init.mechanism.challenge)
        # Signal success so the deferred returned to trial gets fired.
        self.init.onSuccess(None)
        return d


    def test_onChallengeEmpty(self):
        """
        Test receiving an empty challenge message.

        A challenge element without content reaches the mechanism as the
        empty string.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        self.init.onChallenge(challenge)
        self.assertEqual('', self.init.mechanism.challenge)
        # Signal success so the deferred returned to trial gets fired.
        self.init.onSuccess(None)
        return d


    def test_onChallengeIllegalPadding(self):
        """
        Test receiving a challenge message with illegal padding.

        The content has a C{'='} in the middle of the data, which must make
        the initialization fail with L{sasl.SASLIncorrectEncodingError}.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('bXkg=Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d


    def test_onChallengeIllegalCharacters(self):
        """
        Test receiving a challenge message with illegal characters.

        The content contains a C{'*'}, which must make the initialization
        fail with L{sasl.SASLIncorrectEncodingError}.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('bXkg*Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d


    def test_onChallengeMalformed(self):
        """
        Test receiving a malformed challenge message.

        A single character of content must make the initialization fail
        with L{sasl.SASLIncorrectEncodingError}.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('a')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d
 
186
 
 
187
 
 
188
class SASLInitiatingInitializerSetMechanismTest(unittest.TestCase):
    """
    Tests for mechanism selection in
    L{sasl.SASLInitiatingInitializer.setMechanism}.
    """

    def setUp(self):
        """
        Build an XML stream whose sent elements are collected in
        C{self.output}, feed it a stream header and attach a real
        L{sasl.SASLInitiatingInitializer} to it as C{self.init}.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        # Everything passed to send() ends up in self.output.
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()

        header = ("<stream:stream xmlns='jabber:client' "
                  "xmlns:stream='http://etherx.jabber.org/streams' "
                  "from='example.com' id='12345' version='1.0'>")
        self.xmlstream.dataReceived(header)

        self.init = sasl.SASLInitiatingInitializer(self.xmlstream)


    def _setMechanism(self, name):
        """
        Advertise a single SASL mechanism on the stream and let the
        initializer choose.

        @param name: Mechanism name to place in the C{mechanisms} feature.
        @return: The C{name} attribute of the mechanism the initializer
                 selected.
        """
        mechanisms = domish.Element((NS_XMPP_SASL, 'mechanisms'))
        mechanisms.addElement('mechanism', content=name)
        featureKey = (mechanisms.uri, mechanisms.name)
        self.xmlstream.features[featureKey] = mechanisms

        self.init.setMechanism()
        chosen = self.init.mechanism
        return chosen.name


    def test_anonymous(self):
        """
        ANONYMOUS is selected when it is offered and the authenticator has
        the JID C{'example.com'} and no password.
        """
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = None

        selected = self._setMechanism("ANONYMOUS")
        self.assertEqual("ANONYMOUS", selected)


    def test_plain(self):
        """
        PLAIN is selected when it is offered and the authenticator has the
        JID C{'test@example.com'} and a password.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'

        selected = self._setMechanism("PLAIN")
        self.assertEqual("PLAIN", selected)


    def test_digest(self):
        """
        DIGEST-MD5 is selected when it is offered and the authenticator has
        the JID C{'test@example.com'} and a password.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'

        selected = self._setMechanism("DIGEST-MD5")
        self.assertEqual("DIGEST-MD5", selected)


    def test_notAcceptable(self):
        """
        Offering only an unknown mechanism raises
        L{sasl.SASLNoAcceptableMechanism}.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'

        self.assertRaises(
            sasl.SASLNoAcceptableMechanism,
            self._setMechanism,
            'SOMETHING_UNACCEPTABLE')


    def test_notAcceptableWithoutUser(self):
        """
        Offering only an unknown mechanism raises
        L{sasl.SASLNoAcceptableMechanism} when the JID is C{'example.com'}
        as well.
        """
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = 'secret'

        self.assertRaises(
            sasl.SASLNoAcceptableMechanism,
            self._setMechanism,
            'SOMETHING_UNACCEPTABLE')