1
# -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
3
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
"""Protocol-agnostic implementations of SASL authentication mechanisms."""
10
import md5, binascii, random, time, os
12
from zope.interface import Interface, Attribute, implements
14
class ISASLMechanism(Interface):
    """
    Interface provided by the SASL authentication mechanisms in this module.
    """
    name = Attribute("""Common name for the SASL Mechanism.""")

    def getInitialResponse():
        """
        Get the initial client response, if defined for this mechanism.

        @return: initial client response string.
        """

    def getResponse(challenge):
        """
        Get the response to a server challenge.

        @param challenge: server challenge.
        @type challenge: L{str}.
        @return: client response.
        """
40
Implements the PLAIN SASL authentication mechanism.
42
The PLAIN SASL authentication mechanism is defined in RFC 2595.
44
implements(ISASLMechanism)
48
def __init__(self, authzid, authcid, password):
49
self.authzid = authzid or ''
50
self.authcid = authcid or ''
51
self.password = password or ''
54
def getInitialResponse(self):
    """
    Build the initial client response for the PLAIN mechanism.

    The response consists of the authorization identity, the
    authentication identity and the password, each encoded as UTF-8 and
    separated by a NUL byte.

    @return: initial client response.
    @rtype: byte string.
    """
    # Join the encoded parts as bytes. Interpolating them into a native
    # string with %s is only correct where byte strings are the native
    # string type; elsewhere it would embed their repr().
    return b"\x00".join((self.authzid.encode('utf-8'),
                         self.authcid.encode('utf-8'),
                         self.password.encode('utf-8')))
61
class DigestMD5(object):
63
Implements the DIGEST-MD5 SASL authentication mechanism.
65
The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
67
implements(ISASLMechanism)
71
def __init__(self, serv_type, host, serv_name, username, password):
72
self.username = username
73
self.password = password
74
self.defaultRealm = host
76
self.digest_uri = '%s/%s' % (serv_type, host)
77
if serv_name is not None:
78
self.digest_uri += '/%s' % serv_name
81
def getInitialResponse(self):
    """
    Get the initial client response.

    In DIGEST-MD5 the exchange starts with a challenge from the server
    (RFC 2831, section 2.1), so this mechanism has no initial response.

    @return: C{None}.
    """
    return None
85
def getResponse(self, challenge):
    """
    Get the response to a DIGEST-MD5 server challenge.

    @param challenge: server challenge.
    @type challenge: L{str}.
    @return: client response; the empty string when the challenge carries
        an C{rspauth} directive.
    @raise KeyError: if the challenge lacks a C{charset} or C{nonce}
        directive.
    """
    directives = self._parse(challenge)

    # Compat for implementations that do not send this along with
    # a successful authentication.
    if 'rspauth' in directives:
        return ''

    # Servers may omit the realm; fall back to the host given at
    # construction time.
    try:
        realm = directives['realm']
    except KeyError:
        realm = self.defaultRealm

    return self._gen_response(directives['charset'],
                              realm,
                              directives['nonce'])
102
def _parse(self, challenge):
104
Parses the server challenge.
106
Splits the challenge into a dictionary of directives with values.
108
@return: challenge directives and their values.
109
@rtype: L{dict} of L{str} to L{str}.
111
directive_list = challenge.split(',')
113
for directive in directive_list:
114
name, value = directive.split('=')
115
value = value.replace("'","")
116
value = value.replace('"','')
117
directives[name] = value
121
def _unparse(self, directives):
123
Create message string from directives.
125
@param directives: dictionary of directives (names to their values).
126
For certain directives, extra quotes are added, as
128
@type directives: L{dict} of L{str} to L{str}
129
@return: message string.
134
for name, value in directives.iteritems():
135
if name in ('username', 'realm', 'cnonce',
136
'nonce', 'digest-uri', 'authzid'):
137
directive = '%s="%s"' % (name, value)
139
directive = '%s=%s' % (name, value)
141
directive_list.append(directive)
143
return ','.join(directive_list)
146
def _gen_response(self, charset, realm, nonce):
148
Generate response-value.
150
Creates a response to a challenge according to section 2.1.2.1 of
151
RFC 2831 using the L{charset}, L{realm} and L{nonce} directives
156
return md5.new(s).digest()
159
return binascii.b2a_hex(n)
162
return H('%s:%s' % (k, s))
165
username = self.username.encode(charset)
166
password = self.password.encode(charset)
168
# TODO - add error checking
171
nc = '%08x' % 1 # TODO: support subsequent auth.
172
cnonce = self._gen_nonce()
175
# TODO - add support for authzid
176
a1 = "%s:%s:%s" % (H("%s:%s:%s" % (username, realm, password)),
179
a2 = "AUTHENTICATE:%s" % self.digest_uri
181
response = HEX( KD ( HEX(H(a1)),
182
"%s:%s:%s:%s:%s" % (nonce, nc,
183
cnonce, "auth", HEX(H(a2)))))
185
directives = {'username': username,
191
'digest-uri': self.digest_uri,
192
'response': response,
195
return self._unparse(directives)
198
def _gen_nonce(self):
199
return md5.new("%s:%s:%s" % (str(random.random()) , str(time.gmtime()),str(os.getpid()))).hexdigest()