~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/words/protocols/jabber/sasl_mechanisms.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
2
 
#
3
 
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
4
 
# See LICENSE for details.
5
 
 
6
 
"""
7
 
Protocol agnostic implementations of SASL authentication mechanisms.
8
 
"""
9
 
 
10
 
import md5, binascii, random, time, os
11
 
 
12
 
from zope.interface import Interface, Attribute, implements
13
 
 
14
 
class ISASLMechanism(Interface):
15
 
    name = Attribute("""Common name for the SASL Mechanism.""")
16
 
 
17
 
    def getInitialResponse():
18
 
        """
19
 
        Get the initial client response, if defined for this mechanism.
20
 
 
21
 
        @return: initial client response string.
22
 
        @rtype: L{str}.
23
 
        """
24
 
 
25
 
 
26
 
    def getResponse(challenge):
27
 
        """
28
 
        Get the response to a server challenge.
29
 
 
30
 
        @param challenge: server challenge.
31
 
        @type challenge: L{str}.
32
 
        @return: client response.
33
 
        @rtype: L{str}.
34
 
        """
35
 
 
36
 
 
37
 
 
38
 
class Plain(object):
39
 
    """
40
 
    Implements the PLAIN SASL authentication mechanism.
41
 
 
42
 
    The PLAIN SASL authentication mechanism is defined in RFC 2595.
43
 
    """
44
 
    implements(ISASLMechanism)
45
 
 
46
 
    name = 'PLAIN'
47
 
 
48
 
    def __init__(self, authzid, authcid, password):
49
 
        self.authzid = authzid or ''
50
 
        self.authcid = authcid or ''
51
 
        self.password = password or ''
52
 
 
53
 
 
54
 
    def getInitialResponse(self):
55
 
        return "%s\x00%s\x00%s" % (self.authzid.encode('utf-8'),
56
 
                                   self.authcid.encode('utf-8'),
57
 
                                   self.password.encode('utf-8'))
58
 
 
59
 
 
60
 
 
61
 
class DigestMD5(object):
62
 
    """
63
 
    Implements the DIGEST-MD5 SASL authentication mechanism.
64
 
 
65
 
    The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
66
 
    """
67
 
    implements(ISASLMechanism)
68
 
 
69
 
    name = 'DIGEST-MD5'
70
 
 
71
 
    def __init__(self, serv_type, host, serv_name, username, password):
72
 
        self.username = username
73
 
        self.password = password
74
 
        self.defaultRealm = host
75
 
 
76
 
        self.digest_uri = '%s/%s' % (serv_type, host)
77
 
        if serv_name is not None:
78
 
            self.digest_uri += '/%s' % serv_name
79
 
 
80
 
 
81
 
    def getInitialResponse(self):
82
 
        return None
83
 
 
84
 
 
85
 
    def getResponse(self, challenge):
86
 
        directives = self._parse(challenge)
87
 
 
88
 
        # Compat for implementations that do not send this along with
89
 
        # a succesful authentication.
90
 
        if directives.has_key('rspauth'):
91
 
            return ''
92
 
 
93
 
        try:
94
 
            realm = directives['realm']
95
 
        except KeyError:
96
 
            realm = self.defaultRealm
97
 
 
98
 
        return self._gen_response(directives['charset'],
99
 
                                  realm,
100
 
                                  directives['nonce'])
101
 
 
102
 
    def _parse(self, challenge):
103
 
        """
104
 
        Parses the server challenge.
105
 
 
106
 
        Splits the challenge into a dictionary of directives with values.
107
 
 
108
 
        @return: challenge directives and their values.
109
 
        @rtype: L{dict} of L{str} to L{str}.
110
 
        """
111
 
        directive_list = challenge.split(',')
112
 
        directives = {}
113
 
        for directive in directive_list:
114
 
            name, value = directive.split('=')
115
 
            value = value.replace("'","")
116
 
            value = value.replace('"','')
117
 
            directives[name] = value
118
 
        return directives
119
 
 
120
 
 
121
 
    def _unparse(self, directives):
122
 
        """
123
 
        Create message string from directives.
124
 
 
125
 
        @param directives: dictionary of directives (names to their values).
126
 
                           For certain directives, extra quotes are added, as
127
 
                           needed.
128
 
        @type directives: L{dict} of L{str} to L{str}
129
 
        @return: message string.
130
 
        @rtype: L{str}.
131
 
        """
132
 
 
133
 
        directive_list = []
134
 
        for name, value in directives.iteritems():
135
 
            if name in ('username', 'realm', 'cnonce',
136
 
                        'nonce', 'digest-uri', 'authzid'):
137
 
                directive = '%s="%s"' % (name, value)
138
 
            else:
139
 
                directive = '%s=%s' % (name, value)
140
 
 
141
 
            directive_list.append(directive)
142
 
 
143
 
        return ','.join(directive_list)
144
 
 
145
 
 
146
 
    def _gen_response(self, charset, realm, nonce):
147
 
        """
148
 
        Generate response-value.
149
 
 
150
 
        Creates a response to a challenge according to section 2.1.2.1 of
151
 
        RFC 2831 using the L{charset}, L{realm} and L{nonce} directives
152
 
        from the challenge.
153
 
        """
154
 
 
155
 
        def H(s):
156
 
            return md5.new(s).digest()
157
 
 
158
 
        def HEX(n):
159
 
            return binascii.b2a_hex(n)
160
 
 
161
 
        def KD(k, s):
162
 
            return H('%s:%s' % (k, s))
163
 
 
164
 
        try:
165
 
            username = self.username.encode(charset)
166
 
            password = self.password.encode(charset)
167
 
        except UnicodeError:
168
 
            # TODO - add error checking
169
 
            raise
170
 
 
171
 
        nc = '%08x' % 1 # TODO: support subsequent auth.
172
 
        cnonce = self._gen_nonce()
173
 
        qop = 'auth'
174
 
 
175
 
        # TODO - add support for authzid
176
 
        a1 = "%s:%s:%s" % (H("%s:%s:%s" % (username, realm, password)),
177
 
                           nonce,
178
 
                           cnonce)
179
 
        a2 = "AUTHENTICATE:%s" % self.digest_uri
180
 
 
181
 
        response = HEX( KD ( HEX(H(a1)),
182
 
                             "%s:%s:%s:%s:%s" % (nonce, nc,
183
 
                                                 cnonce, "auth", HEX(H(a2)))))
184
 
 
185
 
        directives = {'username': username,
186
 
                      'realm' : realm,
187
 
                      'nonce' : nonce,
188
 
                      'cnonce' : cnonce,
189
 
                      'nc' : nc,
190
 
                      'qop' : qop,
191
 
                      'digest-uri': self.digest_uri,
192
 
                      'response': response,
193
 
                      'charset': charset}
194
 
 
195
 
        return self._unparse(directives)
196
 
 
197
 
 
198
 
    def _gen_nonce(self):
199
 
        return md5.new("%s:%s:%s" % (str(random.random()) , str(time.gmtime()),str(os.getpid()))).hexdigest()