1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
4
from zope.interface import implements
5
from twisted.internet import defer
6
from twisted.trial import unittest
7
from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid
8
from twisted.words.xish import domish
10
# Namespace URI used for the SASL elements (auth, challenge, mechanisms, ...)
# built and checked by the tests in this module.
NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
12
class DummySASLMechanism(object):
    """
    Dummy SASL mechanism.

    This just returns the initialResponse passed on creation, stores any
    challenges and replies with an empty response.

    @ivar challenge: Last received challenge.
    @type challenge: C{unicode}.
    @ivar initialResponse: Initial response to be returned when requested
                           via C{getInitialResponse} or C{None}.
    @type initialResponse: C{unicode}
    """

    implements(sasl_mechanisms.ISASLMechanism)

    # No challenge has been received until getResponse is called.
    challenge = None

    # Mechanism name; the tests in this module expect 'DUMMY' to be sent as
    # the 'mechanism' attribute of the auth element.
    name = "DUMMY"

    def __init__(self, initialResponse):
        """
        @param initialResponse: Value handed back by L{getInitialResponse},
                                or C{None} for no initial response.
        """
        self.initialResponse = initialResponse


    def getInitialResponse(self):
        """
        Return the initial response passed on creation.
        """
        return self.initialResponse


    def getResponse(self, challenge):
        """
        Remember the challenge and reply with an empty response.

        @param challenge: The (decoded) challenge received from the server.
        @return: The empty string.
        """
        self.challenge = challenge
        return ""
41
class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer):
    """
    Dummy SASL Initializer for initiating entities.

    This hardwires the SASL mechanism to L{DummySASLMechanism}, that is
    instantiated with the value of C{initialResponse}.

    @ivar initialResponse: The initial response to be returned by the
                           dummy SASL mechanism or C{None}.
    @type initialResponse: C{unicode}.
    """

    initialResponse = None

    def setMechanism(self):
        """
        Select L{DummySASLMechanism}, seeded with C{self.initialResponse}.
        """
        self.mechanism = DummySASLMechanism(self.initialResponse)
60
class SASLInitiatingInitializerTest(unittest.TestCase):
    """
    Tests for L{sasl.SASLInitiatingInitializer}
    """

    def setUp(self):
        """
        Create an XML stream whose outgoing elements are captured in
        C{self.output}, feed it a stream header and attach a
        L{DummySASLInitiatingInitializer} to it.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        # Capture sent elements instead of writing them to a transport.
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()
        self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
                        "xmlns:stream='http://etherx.jabber.org/streams' "
                        "from='example.com' id='12345' version='1.0'>")
        self.init = DummySASLInitiatingInitializer(self.xmlstream)


    def test_onFailure(self):
        """
        Test that the SASL error condition is correctly extracted.
        """
        failure = domish.Element((NS_XMPP_SASL, 'failure'))
        failure.addElement('not-authorized')
        self.init._deferred = defer.Deferred()
        self.init.onFailure(failure)
        self.assertFailure(self.init._deferred, sasl.SASLAuthError)

        def checkCondition(error):
            self.assertEqual('not-authorized', error.condition)

        self.init._deferred.addCallback(checkCondition)
        return self.init._deferred


    def test_sendAuthInitialResponse(self):
        """
        Test starting authentication with an initial response.
        """
        self.init.initialResponse = "dummy"
        self.init.start()
        auth = self.output[0]
        self.assertEqual(NS_XMPP_SASL, auth.uri)
        self.assertEqual('auth', auth.name)
        self.assertEqual('DUMMY', auth['mechanism'])
        # 'ZHVtbXk=' is the base64 encoding of 'dummy'.
        self.assertEqual('ZHVtbXk=', str(auth))


    def test_sendAuthNoInitialResponse(self):
        """
        Test starting authentication without an initial response.
        """
        self.init.initialResponse = None
        self.init.start()
        auth = self.output[0]
        self.assertEqual('', str(auth))


    def test_sendAuthEmptyInitialResponse(self):
        """
        Test starting authentication where the initial response is empty.
        """
        self.init.initialResponse = ""
        self.init.start()
        auth = self.output[0]
        # An empty (as opposed to absent) response is sent as a single '='.
        self.assertEqual('=', str(auth))


    def test_onChallenge(self):
        """
        Test receiving a challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # Base64 encoding of 'my challenge'.
        challenge.addContent('bXkgY2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertEqual('my challenge', self.init.mechanism.challenge)
        self.init.onSuccess(None)
        return d


    def test_onChallengeEmpty(self):
        """
        Test receiving an empty challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        self.init.onChallenge(challenge)
        self.assertEqual('', self.init.mechanism.challenge)
        self.init.onSuccess(None)
        return d


    def test_onChallengeIllegalPadding(self):
        """
        Test receiving a challenge message with illegal padding.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('bXkg=Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        # Return the Deferred so trial waits for, and checks, the failure.
        return self.assertFailure(d, sasl.SASLIncorrectEncodingError)


    def test_onChallengeIllegalCharacters(self):
        """
        Test receiving a challenge message with illegal characters.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('bXkg*Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        # Return the Deferred so trial waits for, and checks, the failure.
        return self.assertFailure(d, sasl.SASLIncorrectEncodingError)


    def test_onChallengeMalformed(self):
        """
        Test receiving a malformed challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent('a')
        self.init.onChallenge(challenge)
        # Return the Deferred so trial waits for, and checks, the failure.
        return self.assertFailure(d, sasl.SASLIncorrectEncodingError)
188
class SASLInitiatingInitializerSetMechanismTest(unittest.TestCase):
    """
    Test for L{sasl.SASLInitiatingInitializer.setMechanism}.
    """

    def setUp(self):
        """
        Create an XML stream whose outgoing elements are captured in
        C{self.output}, feed it a stream header and attach a real
        L{sasl.SASLInitiatingInitializer} to it.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        # Capture sent elements instead of writing them to a transport.
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()
        self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
                        "xmlns:stream='http://etherx.jabber.org/streams' "
                        "from='example.com' id='12345' version='1.0'>")

        self.init = sasl.SASLInitiatingInitializer(self.xmlstream)


    def _setMechanism(self, name):
        """
        Set up the XML Stream to have a SASL feature with the given mechanism.

        @param name: Name of the single mechanism advertised in the feature.
        @return: The name of the mechanism chosen by C{setMechanism}.
        """
        feature = domish.Element((NS_XMPP_SASL, 'mechanisms'))
        feature.addElement('mechanism', content=name)
        self.xmlstream.features[(feature.uri, feature.name)] = feature

        self.init.setMechanism()
        return self.init.mechanism.name


    def test_anonymous(self):
        """
        Test setting ANONYMOUS as the authentication mechanism.
        """
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = None
        name = 'ANONYMOUS'

        self.assertEqual(name, self._setMechanism(name))


    def test_plain(self):
        """
        Test setting PLAIN as the authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'
        name = 'PLAIN'

        self.assertEqual(name, self._setMechanism(name))


    def test_digest(self):
        """
        Test setting DIGEST-MD5 as the authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'
        name = 'DIGEST-MD5'

        self.assertEqual(name, self._setMechanism(name))


    def test_notAcceptable(self):
        """
        Test using an unacceptable SASL authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = 'secret'

        self.assertRaises(sasl.SASLNoAcceptableMechanism,
                          self._setMechanism, 'SOMETHING_UNACCEPTABLE')


    def test_notAcceptableWithoutUser(self):
        """
        Test using an unacceptable SASL authentication mechanism with no JID.
        """
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = 'secret'

        self.assertRaises(sasl.SASLNoAcceptableMechanism,
                          self._setMechanism, 'SOMETHING_UNACCEPTABLE')