~gary/python-openid/python-openid-2.2.1-patched

« back to all changes in this revision

Viewing changes to openid/test/server.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2007-11-30 02:46:28 UTC
  • mfrom: (1.1.1 pyopenid-2.0)
  • Revision ID: launchpad@pqm.canonical.com-20071130024628-qktwsew3383iawmq
[rs=SteveA] upgrade to python-openid-2.0.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: openid.test.server -*-
2
 
"""Tests for openid.server.
3
 
"""
4
 
from openid.server import server
5
 
from openid import association, cryptutil, oidutil
6
 
import _memstore
7
 
import cgi
8
 
 
9
 
import unittest
10
 
 
11
 
from urlparse import urlparse
12
 
 
13
 
# In general, if you edit or add tests here, try to move in the direction
14
 
# of testing smaller units.  For testing the external interfaces, we'll be
15
 
# developing an implementation-agnostic testing suite.
16
 
 
17
 
# for more, see /etc/ssh/moduli
18
 
 
19
 
ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
20
 
ALT_GEN = 5
21
 
 
22
 
class CatchLogs(object):
23
 
    def setUp(self):
24
 
        self.old_logger = oidutil.log
25
 
        oidutil.log = self.gotLogMessage
26
 
        self.messages = []
27
 
 
28
 
    def gotLogMessage(self, message):
29
 
        self.messages.append(message)
30
 
 
31
 
    def tearDown(self):
32
 
        oidutil.log = self.old_logger
33
 
 
34
 
class TestProtocolError(unittest.TestCase):
35
 
    def test_browserWithReturnTo(self):
36
 
        return_to = "http://rp.unittest/consumer"
37
 
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
38
 
        args = {
39
 
            'openid.mode': 'monkeydance',
40
 
            'openid.identity': 'http://wagu.unittest/',
41
 
            'openid.return_to': return_to,
42
 
            }
43
 
        e = server.ProtocolError(args, "plucky")
44
 
        self.failUnless(e.hasReturnTo())
45
 
        expected_args = {
46
 
            'openid.mode': ['error'],
47
 
            'openid.error': ['plucky'],
48
 
            }
49
 
 
50
 
        rt_base, result_args = e.encodeToURL().split('?', 1)
51
 
        result_args = cgi.parse_qs(result_args)
52
 
        self.failUnlessEqual(result_args, expected_args)
53
 
 
54
 
    def test_noReturnTo(self):
55
 
        # will be a ProtocolError raised by Decode or CheckIDRequest.answer
56
 
        args = {
57
 
            'openid.mode': 'zebradance',
58
 
            'openid.identity': 'http://wagu.unittest/',
59
 
            }
60
 
        e = server.ProtocolError(args, "waffles")
61
 
        self.failIf(e.hasReturnTo())
62
 
        expected = """error:waffles
63
 
mode:error
64
 
"""
65
 
        self.failUnlessEqual(e.encodeToKVForm(), expected)
66
 
 
67
 
 
68
 
 
69
 
class TestDecode(unittest.TestCase):
70
 
    def setUp(self):
71
 
        self.id_url = "http://decoder.am.unittest/"
72
 
        self.rt_url = "http://rp.unittest/foobot/?qux=zam"
73
 
        self.tr_url = "http://rp.unittest/"
74
 
        self.assoc_handle = "{assoc}{handle}"
75
 
        self.decode = server.Decoder().decode
76
 
 
77
 
    def test_none(self):
78
 
        args = {}
79
 
        r = self.decode(args)
80
 
        self.failUnlessEqual(r, None)
81
 
 
82
 
    def test_irrelevant(self):
83
 
        args = {
84
 
            'pony': 'spotted',
85
 
            'sreg.mutant_power': 'decaffinator',
86
 
            }
87
 
        r = self.decode(args)
88
 
        self.failUnlessEqual(r, None)
89
 
 
90
 
    def test_bad(self):
91
 
        args = {
92
 
            'openid.mode': 'twos-compliment',
93
 
            'openid.pants': 'zippered',
94
 
            }
95
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
96
 
 
97
 
    def test_dictOfLists(self):
98
 
        args = {
99
 
            'openid.mode': ['checkid_setup'],
100
 
            'openid.identity': self.id_url,
101
 
            'openid.assoc_handle': self.assoc_handle,
102
 
            'openid.return_to': self.rt_url,
103
 
            'openid.trust_root': self.tr_url,
104
 
            }
105
 
        try:
106
 
            result = self.decode(args)
107
 
        except TypeError, err:
108
 
            self.failUnless(str(err).find('values') != -1, err)
109
 
        else:
110
 
            self.fail("Expected TypeError, but got result %s" % (result,))
111
 
 
112
 
    def test_checkidImmediate(self):
113
 
        args = {
114
 
            'openid.mode': 'checkid_immediate',
115
 
            'openid.identity': self.id_url,
116
 
            'openid.assoc_handle': self.assoc_handle,
117
 
            'openid.return_to': self.rt_url,
118
 
            'openid.trust_root': self.tr_url,
119
 
            # should be ignored
120
 
            'openid.some.extension': 'junk',
121
 
            }
122
 
        r = self.decode(args)
123
 
        self.failUnless(isinstance(r, server.CheckIDRequest))
124
 
        self.failUnlessEqual(r.mode, "checkid_immediate")
125
 
        self.failUnlessEqual(r.immediate, True)
126
 
        self.failUnlessEqual(r.identity, self.id_url)
127
 
        self.failUnlessEqual(r.trust_root, self.tr_url)
128
 
        self.failUnlessEqual(r.return_to, self.rt_url)
129
 
        self.failUnlessEqual(r.assoc_handle, self.assoc_handle)
130
 
 
131
 
    def test_checkidSetup(self):
132
 
        args = {
133
 
            'openid.mode': 'checkid_setup',
134
 
            'openid.identity': self.id_url,
135
 
            'openid.assoc_handle': self.assoc_handle,
136
 
            'openid.return_to': self.rt_url,
137
 
            'openid.trust_root': self.tr_url,
138
 
            }
139
 
        r = self.decode(args)
140
 
        self.failUnless(isinstance(r, server.CheckIDRequest))
141
 
        self.failUnlessEqual(r.mode, "checkid_setup")
142
 
        self.failUnlessEqual(r.immediate, False)
143
 
        self.failUnlessEqual(r.identity, self.id_url)
144
 
        self.failUnlessEqual(r.trust_root, self.tr_url)
145
 
        self.failUnlessEqual(r.return_to, self.rt_url)
146
 
 
147
 
    def test_checkidSetupNoIdentity(self):
148
 
        args = {
149
 
            'openid.mode': 'checkid_setup',
150
 
            'openid.assoc_handle': self.assoc_handle,
151
 
            'openid.return_to': self.rt_url,
152
 
            'openid.trust_root': self.tr_url,
153
 
            }
154
 
        try:
155
 
            result = self.decode(args)
156
 
        except server.ProtocolError, err:
157
 
            self.failUnless(err.query)
158
 
        else:
159
 
            self.fail("Expected ProtocolError, instead returned with %s" %
160
 
                      (result,))
161
 
 
162
 
    def test_checkidSetupNoReturn(self):
163
 
        args = {
164
 
            'openid.mode': 'checkid_setup',
165
 
            'openid.identity': self.id_url,
166
 
            'openid.assoc_handle': self.assoc_handle,
167
 
            'openid.trust_root': self.tr_url,
168
 
            }
169
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
170
 
 
171
 
    def test_checkidSetupBadReturn(self):
172
 
        args = {
173
 
            'openid.mode': 'checkid_setup',
174
 
            'openid.identity': self.id_url,
175
 
            'openid.assoc_handle': self.assoc_handle,
176
 
            'openid.return_to': 'not a url',
177
 
            }
178
 
        try:
179
 
            result = self.decode(args)
180
 
        except server.ProtocolError, err:
181
 
            self.failUnless(err.query)
182
 
        else:
183
 
            self.fail("Expected ProtocolError, instead returned with %s" %
184
 
                      (result,))
185
 
 
186
 
    def test_checkidSetupUntrustedReturn(self):
187
 
        args = {
188
 
            'openid.mode': 'checkid_setup',
189
 
            'openid.identity': self.id_url,
190
 
            'openid.assoc_handle': self.assoc_handle,
191
 
            'openid.return_to': self.rt_url,
192
 
            'openid.trust_root': 'http://not-the-return-place.unittest/',
193
 
            }
194
 
        try:
195
 
            result = self.decode(args)
196
 
        except server.UntrustedReturnURL, err:
197
 
            self.failUnless(err.query)
198
 
        else:
199
 
            self.fail("Expected UntrustedReturnURL, instead returned with %s" %
200
 
                      (result,))
201
 
 
202
 
    def test_checkAuth(self):
203
 
        args = {
204
 
            'openid.mode': 'check_authentication',
205
 
            'openid.assoc_handle': '{dumb}{handle}',
206
 
            'openid.sig': 'sigblob',
207
 
            'openid.signed': 'foo,bar,mode',
208
 
            'openid.foo': 'signedval1',
209
 
            'openid.bar': 'signedval2',
210
 
            'openid.baz': 'unsigned',
211
 
            }
212
 
        r = self.decode(args)
213
 
        self.failUnless(isinstance(r, server.CheckAuthRequest))
214
 
        self.failUnlessEqual(r.mode, 'check_authentication')
215
 
        self.failUnlessEqual(r.sig, 'sigblob')
216
 
        self.failUnlessEqual(r.signed, [
217
 
            ('foo', 'signedval1'),
218
 
            ('bar', 'signedval2'),
219
 
            ('mode', 'id_res'),
220
 
            ])
221
 
        # XXX: test error cases (i.e. missing required fields)
222
 
 
223
 
 
224
 
    def test_checkAuthMissingSignedField(self):
225
 
        args = {
226
 
            'openid.mode': 'check_authentication',
227
 
            'openid.assoc_handle': '{dumb}{handle}',
228
 
            'openid.sig': 'sigblob',
229
 
            'openid.signed': 'foo,bar,mode',
230
 
            'openid.foo': 'signedval1',
231
 
            'openid.baz': 'unsigned',
232
 
            }
233
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
234
 
 
235
 
 
236
 
    def test_checkAuthMissingSignature(self):
237
 
        args = {
238
 
            'openid.mode': 'check_authentication',
239
 
            'openid.assoc_handle': '{dumb}{handle}',
240
 
            'openid.signed': 'foo,bar,mode',
241
 
            'openid.foo': 'signedval1',
242
 
            'openid.bar': 'signedval2',
243
 
            'openid.baz': 'unsigned',
244
 
            }
245
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
246
 
 
247
 
 
248
 
    def test_checkAuthAndInvalidate(self):
249
 
        args = {
250
 
            'openid.mode': 'check_authentication',
251
 
            'openid.assoc_handle': '{dumb}{handle}',
252
 
            'openid.invalidate_handle': '[[SMART_handle]]',
253
 
            'openid.sig': 'sigblob',
254
 
            'openid.signed': 'foo,bar,mode',
255
 
            'openid.foo': 'signedval1',
256
 
            'openid.bar': 'signedval2',
257
 
            'openid.baz': 'unsigned',
258
 
            }
259
 
        r = self.decode(args)
260
 
        self.failUnless(isinstance(r, server.CheckAuthRequest))
261
 
        self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
262
 
 
263
 
 
264
 
    def test_associateDH(self):
265
 
        args = {
266
 
            'openid.mode': 'associate',
267
 
            'openid.session_type': 'DH-SHA1',
268
 
            'openid.dh_consumer_public': "Rzup9265tw==",
269
 
            }
270
 
        r = self.decode(args)
271
 
        self.failUnless(isinstance(r, server.AssociateRequest))
272
 
        self.failUnlessEqual(r.mode, "associate")
273
 
        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
274
 
        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
275
 
        self.failUnless(r.session.consumer_pubkey)
276
 
 
277
 
    def test_associateDHMissingKey(self):
278
 
        """Trying DH assoc w/o public key"""
279
 
        args = {
280
 
            'openid.mode': 'associate',
281
 
            'openid.session_type': 'DH-SHA1',
282
 
            }
283
 
        # Using DH-SHA1 without supplying dh_consumer_public is an error.
284
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
285
 
 
286
 
 
287
 
    def test_associateDHpubKeyNotB64(self):
288
 
        args = {
289
 
            'openid.mode': 'associate',
290
 
            'openid.session_type': 'DH-SHA1',
291
 
            'openid.dh_consumer_public': "donkeydonkeydonkey",
292
 
            }
293
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
294
 
 
295
 
 
296
 
    def test_associateDHModGen(self):
297
 
        # test dh with non-default but valid values for dh_modulus and dh_gen
298
 
        args = {
299
 
            'openid.mode': 'associate',
300
 
            'openid.session_type': 'DH-SHA1',
301
 
            'openid.dh_consumer_public': "Rzup9265tw==",
302
 
            'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
303
 
            'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) ,
304
 
            }
305
 
        r = self.decode(args)
306
 
        self.failUnless(isinstance(r, server.AssociateRequest))
307
 
        self.failUnlessEqual(r.mode, "associate")
308
 
        self.failUnlessEqual(r.session.session_type, "DH-SHA1")
309
 
        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
310
 
        self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS)
311
 
        self.failUnlessEqual(r.session.dh.generator, ALT_GEN)
312
 
        self.failUnless(r.session.consumer_pubkey)
313
 
 
314
 
 
315
 
    def test_associateDHCorruptModGen(self):
316
 
        # test dh with non-default but valid values for dh_modulus and dh_gen
317
 
        args = {
318
 
            'openid.mode': 'associate',
319
 
            'openid.session_type': 'DH-SHA1',
320
 
            'openid.dh_consumer_public': "Rzup9265tw==",
321
 
            'openid.dh_modulus': 'pizza',
322
 
            'openid.dh_gen': 'gnocchi',
323
 
            }
324
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
325
 
 
326
 
 
327
 
    def test_associateDHMissingModGen(self):
328
 
        # test dh with non-default but valid values for dh_modulus and dh_gen
329
 
        args = {
330
 
            'openid.mode': 'associate',
331
 
            'openid.session_type': 'DH-SHA1',
332
 
            'openid.dh_consumer_public': "Rzup9265tw==",
333
 
            'openid.dh_modulus': 'pizza',
334
 
            }
335
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
336
 
 
337
 
 
338
 
#     def test_associateDHInvalidModGen(self):
339
 
#         # test dh with properly encoded values that are not a valid
340
 
#         #   modulus/generator combination.
341
 
#         args = {
342
 
#             'openid.mode': 'associate',
343
 
#             'openid.session_type': 'DH-SHA1',
344
 
#             'openid.dh_consumer_public': "Rzup9265tw==",
345
 
#             'openid.dh_modulus': cryptutil.longToBase64(9),
346
 
#             'openid.dh_gen': cryptutil.longToBase64(27) ,
347
 
#             }
348
 
#         self.failUnlessRaises(server.ProtocolError, self.decode, args)
349
 
#     test_associateDHInvalidModGen.todo = "low-priority feature"
350
 
 
351
 
 
352
 
    def test_associateWeirdSession(self):
353
 
        args = {
354
 
            'openid.mode': 'associate',
355
 
            'openid.session_type': 'FLCL6',
356
 
            'openid.dh_consumer_public': "YQ==\n",
357
 
            }
358
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
359
 
 
360
 
 
361
 
    def test_associatePlain(self):
362
 
        args = {
363
 
            'openid.mode': 'associate',
364
 
            }
365
 
        r = self.decode(args)
366
 
        self.failUnless(isinstance(r, server.AssociateRequest))
367
 
        self.failUnlessEqual(r.mode, "associate")
368
 
        self.failUnlessEqual(r.session.session_type, "plaintext")
369
 
        self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
370
 
 
371
 
    def test_nomode(self):
372
 
        args = {
373
 
            'openid.session_type': 'DH-SHA1',
374
 
            'openid.dh_consumer_public': "my public keeey",
375
 
            }
376
 
        self.failUnlessRaises(server.ProtocolError, self.decode, args)
377
 
 
378
 
 
379
 
 
380
 
class TestEncode(unittest.TestCase):
381
 
    def setUp(self):
382
 
        self.encoder = server.Encoder()
383
 
        self.encode = self.encoder.encode
384
 
 
385
 
    def test_id_res(self):
386
 
        request = server.CheckIDRequest(
387
 
            identity = 'http://bombom.unittest/',
388
 
            trust_root = 'http://burr.unittest/',
389
 
            return_to = 'http://burr.unittest/999',
390
 
            immediate = False,
391
 
            )
392
 
        response = server.OpenIDResponse(request)
393
 
        response.fields = {
394
 
            'mode': 'id_res',
395
 
            'identity': request.identity,
396
 
            'return_to': request.return_to,
397
 
            }
398
 
        webresponse = self.encode(response)
399
 
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
400
 
        self.failUnless(webresponse.headers.has_key('location'))
401
 
 
402
 
        location = webresponse.headers['location']
403
 
        self.failUnless(location.startswith(request.return_to),
404
 
                        "%s does not start with %s" % (location,
405
 
                                                       request.return_to))
406
 
        query = cgi.parse_qs(urlparse(location)[4])
407
 
        # argh.
408
 
        q2 = dict([(k, v[0]) for k, v in query.iteritems()])
409
 
        expected = dict(
410
 
            [('openid.' + k, v) for k, v in response.fields.iteritems()])
411
 
        self.failUnlessEqual(q2, expected)
412
 
 
413
 
    def test_cancel(self):
414
 
        request = server.CheckIDRequest(
415
 
            identity = 'http://bombom.unittest/',
416
 
            trust_root = 'http://burr.unittest/',
417
 
            return_to = 'http://burr.unittest/999',
418
 
            immediate = False,
419
 
            )
420
 
        response = server.OpenIDResponse(request)
421
 
        response.fields = {
422
 
            'mode': 'cancel',
423
 
            }
424
 
        webresponse = self.encode(response)
425
 
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
426
 
        self.failUnless(webresponse.headers.has_key('location'))
427
 
 
428
 
    def test_assocReply(self):
429
 
        request = server.AssociateRequest.fromQuery({})
430
 
        response = server.OpenIDResponse(request)
431
 
        response.fields = {'assoc_handle': "every-zig"}
432
 
        webresponse = self.encode(response)
433
 
        body = """assoc_handle:every-zig
434
 
"""
435
 
        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
436
 
        self.failUnlessEqual(webresponse.headers, {})
437
 
        self.failUnlessEqual(webresponse.body, body)
438
 
 
439
 
    def test_checkauthReply(self):
440
 
        request = server.CheckAuthRequest('a_sock_monkey',
441
 
                                          'siggggg',
442
 
                                          [])
443
 
        response = server.OpenIDResponse(request)
444
 
        response.fields = {
445
 
            'is_valid': 'true',
446
 
            'invalidate_handle': 'xXxX:xXXx'
447
 
            }
448
 
        body = """invalidate_handle:xXxX:xXXx
449
 
is_valid:true
450
 
"""
451
 
        webresponse = self.encode(response)
452
 
        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
453
 
        self.failUnlessEqual(webresponse.headers, {})
454
 
        self.failUnlessEqual(webresponse.body, body)
455
 
 
456
 
    def test_unencodableError(self):
457
 
        args = {
458
 
            'openid.identity': 'http://limu.unittest/',
459
 
            }
460
 
        e = server.ProtocolError(args, "wet paint")
461
 
        self.failUnlessRaises(server.EncodingError, self.encode, e)
462
 
 
463
 
    def test_encodableError(self):
464
 
        args = {
465
 
            'openid.mode': 'associate',
466
 
            'openid.identity': 'http://limu.unittest/',
467
 
            }
468
 
        body="""error:snoot
469
 
mode:error
470
 
"""
471
 
        webresponse = self.encode(server.ProtocolError(args, "snoot"))
472
 
        self.failUnlessEqual(webresponse.code, server.HTTP_ERROR)
473
 
        self.failUnlessEqual(webresponse.headers, {})
474
 
        self.failUnlessEqual(webresponse.body, body)
475
 
 
476
 
 
477
 
 
478
 
class TestSigningEncode(unittest.TestCase):
479
 
    def setUp(self):
480
 
        self._dumb_key = server.Signatory._dumb_key
481
 
        self._normal_key = server.Signatory._normal_key
482
 
        self.store = _memstore.MemoryStore()
483
 
        self.request = server.CheckIDRequest(
484
 
            identity = 'http://bombom.unittest/',
485
 
            trust_root = 'http://burr.unittest/',
486
 
            return_to = 'http://burr.unittest/999',
487
 
            immediate = False,
488
 
            )
489
 
        self.response = server.OpenIDResponse(self.request)
490
 
        self.response.fields = {
491
 
            'mode': 'id_res',
492
 
            'identity': self.request.identity,
493
 
            'return_to': self.request.return_to,
494
 
            }
495
 
        self.response.signed.extend(['mode','identity','return_to'])
496
 
        self.signatory = server.Signatory(self.store)
497
 
        self.encoder = server.SigningEncoder(self.signatory)
498
 
        self.encode = self.encoder.encode
499
 
 
500
 
    def test_idres(self):
501
 
        assoc_handle = '{bicycle}{shed}'
502
 
        self.store.storeAssociation(
503
 
            self._normal_key,
504
 
            association.Association.fromExpiresIn(60, assoc_handle,
505
 
                                                  'sekrit', 'HMAC-SHA1'))
506
 
        self.request.assoc_handle = assoc_handle
507
 
        webresponse = self.encode(self.response)
508
 
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
509
 
        self.failUnless(webresponse.headers.has_key('location'))
510
 
 
511
 
        location = webresponse.headers['location']
512
 
        query = cgi.parse_qs(urlparse(location)[4])
513
 
        self.failUnless('openid.sig' in query)
514
 
        self.failUnless('openid.assoc_handle' in query)
515
 
        self.failUnless('openid.signed' in query)
516
 
 
517
 
    def test_idresDumb(self):
518
 
        webresponse = self.encode(self.response)
519
 
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
520
 
        self.failUnless(webresponse.headers.has_key('location'))
521
 
 
522
 
        location = webresponse.headers['location']
523
 
        query = cgi.parse_qs(urlparse(location)[4])
524
 
        self.failUnless('openid.sig' in query)
525
 
        self.failUnless('openid.assoc_handle' in query)
526
 
        self.failUnless('openid.signed' in query)
527
 
 
528
 
    def test_forgotStore(self):
529
 
        self.encoder.signatory = None
530
 
        self.failUnlessRaises(ValueError, self.encode, self.response)
531
 
 
532
 
    def test_cancel(self):
533
 
        request = server.CheckIDRequest(
534
 
            identity = 'http://bombom.unittest/',
535
 
            trust_root = 'http://burr.unittest/',
536
 
            return_to = 'http://burr.unittest/999',
537
 
            immediate = False,
538
 
            )
539
 
        response = server.OpenIDResponse(request)
540
 
        response.fields['mode'] = 'cancel'
541
 
        response.signed[:] = []
542
 
        webresponse = self.encode(response)
543
 
        self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
544
 
        self.failUnless(webresponse.headers.has_key('location'))
545
 
        location = webresponse.headers['location']
546
 
        query = cgi.parse_qs(urlparse(location)[4])
547
 
        self.failIf('openid.sig' in query, query.get('openid.sig'))
548
 
 
549
 
    def test_assocReply(self):
550
 
        request = server.AssociateRequest.fromQuery({})
551
 
        response = server.OpenIDResponse(request)
552
 
        response.fields = {'assoc_handle': "every-zig"}
553
 
        webresponse = self.encode(response)
554
 
        body = """assoc_handle:every-zig
555
 
"""
556
 
        self.failUnlessEqual(webresponse.code, server.HTTP_OK)
557
 
        self.failUnlessEqual(webresponse.headers, {})
558
 
        self.failUnlessEqual(webresponse.body, body)
559
 
 
560
 
    def test_alreadySigned(self):
561
 
        self.response.fields['sig'] = 'priorSig=='
562
 
        self.failUnlessRaises(server.AlreadySigned, self.encode, self.response)
563
 
 
564
 
 
565
 
 
566
 
class TestCheckID(unittest.TestCase):
567
 
    def setUp(self):
568
 
        self.request = server.CheckIDRequest(
569
 
            identity = 'http://bambam.unittest/',
570
 
            trust_root = 'http://bar.unittest/',
571
 
            return_to = 'http://bar.unittest/999',
572
 
            immediate = False,
573
 
            )
574
 
 
575
 
    def test_trustRootInvalid(self):
576
 
        self.request.trust_root = "http://foo.unittest/17"
577
 
        self.request.return_to = "http://foo.unittest/39"
578
 
        self.failIf(self.request.trustRootValid())
579
 
 
580
 
    def test_trustRootValid(self):
581
 
        self.request.trust_root = "http://foo.unittest/"
582
 
        self.request.return_to = "http://foo.unittest/39"
583
 
        self.failUnless(self.request.trustRootValid())
584
 
 
585
 
    def test_answerAllow(self):
586
 
        answer = self.request.answer(True)
587
 
        self.failUnlessEqual(answer.request, self.request)
588
 
        self.failUnlessEqual(answer.fields, {
589
 
            'mode': 'id_res',
590
 
            'identity': self.request.identity,
591
 
            'return_to': self.request.return_to,
592
 
            })
593
 
        signed = answer.signed[:]
594
 
        signed.sort()
595
 
        self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
596
 
 
597
 
    def test_answerAllowNoTrustRoot(self):
598
 
        self.request.trust_root = None
599
 
        answer = self.request.answer(True)
600
 
        self.failUnlessEqual(answer.request, self.request)
601
 
        self.failUnlessEqual(answer.fields, {
602
 
            'mode': 'id_res',
603
 
            'identity': self.request.identity,
604
 
            'return_to': self.request.return_to,
605
 
            })
606
 
        signed = answer.signed[:]
607
 
        signed.sort()
608
 
        self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
609
 
 
610
 
    def test_answerImmediateDeny(self):
611
 
        self.request.mode = 'checkid_immediate'
612
 
        self.request.immediate = True
613
 
        server_url = "http://setup-url.unittest/"
614
 
        # crappiting setup_url, you dirty my interface with your presence!
615
 
        answer = self.request.answer(False, server_url=server_url)
616
 
        self.failUnlessEqual(answer.request, self.request)
617
 
        self.failUnlessEqual(len(answer.fields), 2, answer.fields)
618
 
        self.failUnlessEqual(answer.fields.get('mode'), 'id_res')
619
 
        self.failUnless(answer.fields.get('user_setup_url', '').startswith(
620
 
            server_url))
621
 
        self.failUnlessEqual(answer.signed, [])
622
 
 
623
 
    def test_answerSetupDeny(self):
624
 
        answer = self.request.answer(False)
625
 
        self.failUnlessEqual(answer.fields, {
626
 
            'mode': 'cancel',
627
 
            })
628
 
        self.failUnlessEqual(answer.signed, [])
629
 
 
630
 
    def test_encodeToURL(self):
631
 
        server_url = 'http://openid-server.unittest/'
632
 
        result = self.request.encodeToURL(server_url)
633
 
 
634
 
        # How to check?  How about a round-trip test.
635
 
        base, result_args = result.split('?', 1)
636
 
        result_args = cgi.parse_qs(result_args)
637
 
        result_args = dict([(k, v[0]) for k, v in result_args.iteritems()])
638
 
        rebuilt_request = server.CheckIDRequest.fromQuery(result_args)
639
 
        self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
640
 
 
641
 
    def test_getCancelURL(self):
642
 
        url = self.request.getCancelURL()
643
 
        expected = self.request.return_to + '?openid.mode=cancel'
644
 
        self.failUnlessEqual(url, expected)
645
 
 
646
 
    def test_getCancelURLimmed(self):
647
 
        self.request.mode = 'checkid_immediate'
648
 
        self.request.immediate = True
649
 
        self.failUnlessRaises(ValueError, self.request.getCancelURL)
650
 
 
651
 
 
652
 
 
653
 
class TestCheckIDExtension(unittest.TestCase):
654
 
 
655
 
    def setUp(self):
656
 
        self.request = server.CheckIDRequest(
657
 
            identity = 'http://bambam.unittest/',
658
 
            trust_root = 'http://bar.unittest/',
659
 
            return_to = 'http://bar.unittest/999',
660
 
            immediate = False,
661
 
            )
662
 
        self.response = server.OpenIDResponse(self.request)
663
 
        self.response.fields['mode'] = 'id_res'
664
 
        self.response.fields['blue'] = 'star'
665
 
        self.response.signed.extend(['mode','identity','return_to'])
666
 
 
667
 
 
668
 
    def test_addField(self):
669
 
        namespace = 'mj12'
670
 
        self.response.addField(namespace, 'bright', 'potato')
671
 
        self.failUnlessEqual(self.response.fields,
672
 
                             {'blue': 'star',
673
 
                              'mode': 'id_res',
674
 
                              'mj12.bright': 'potato'})
675
 
        self.failUnlessEqual(self.response.signed,
676
 
                             ['mode', 'identity', 'return_to', 'mj12.bright'])
677
 
 
678
 
 
679
 
    def test_addFieldNoNamespace(self):
680
 
        self.response.addField('', 'dark', 'pages')
681
 
        self.failUnlessEqual(self.response.fields,
682
 
                             {'blue': 'star',
683
 
                              'mode': 'id_res',
684
 
                              'dark': 'pages'})
685
 
 
686
 
    def test_addFieldUnsigned(self):
687
 
        namespace = 'mj12'
688
 
        self.response.addField(namespace, 'dull', 'lemon', signed=False)
689
 
        self.failUnlessEqual(self.response.fields,
690
 
                             {'blue': 'star',
691
 
                              'mode': 'id_res',
692
 
                              'mj12.dull': 'lemon'})
693
 
        self.failUnlessEqual(self.response.signed,
694
 
                             ['mode', 'identity', 'return_to'])
695
 
 
696
 
 
697
 
    def test_addFields(self):
698
 
        namespace = 'mi5'
699
 
        self.response.addFields(namespace, {'tangy': 'suspenders',
700
 
                                           'bravo': 'inclusion'})
701
 
        self.failUnlessEqual(self.response.fields,
702
 
                             {'blue': 'star',
703
 
                              'mode': 'id_res',
704
 
                              'mi5.tangy': 'suspenders',
705
 
                              'mi5.bravo': 'inclusion'})
706
 
        self.failUnlessEqual(self.response.signed,
707
 
                             ['mode', 'identity', 'return_to',
708
 
                              'mi5.tangy', 'mi5.bravo'])
709
 
 
710
 
 
711
 
    def test_addFieldsUnsigned(self):
712
 
        namespace = 'mi5'
713
 
        self.response.addFields(namespace, {'strange': 'conditioner',
714
 
                                           'elemental': 'blender'},
715
 
                                signed=False)
716
 
        self.failUnlessEqual(self.response.fields,
717
 
                             {'blue': 'star',
718
 
                              'mode': 'id_res',
719
 
                              'mi5.strange': 'conditioner',
720
 
                              'mi5.elemental': 'blender'})
721
 
        self.failUnlessEqual(self.response.signed,
722
 
                             ['mode', 'identity', 'return_to'])
723
 
 
724
 
 
725
 
    def test_update(self):
726
 
        eresponse = server.OpenIDResponse(None)
727
 
        eresponse.fields.update({'shape': 'heart',
728
 
                                 'content': 'strings,wire'})
729
 
        eresponse.signed = ['content']
730
 
        self.response.update('box', eresponse)
731
 
        self.failUnlessEqual(self.response.fields,
732
 
                             {'blue': 'star',
733
 
                              'mode': 'id_res',
734
 
                              'box.shape': 'heart',
735
 
                              'box.content': 'strings,wire'})
736
 
        self.failUnlessEqual(self.response.signed,
737
 
                             ['mode', 'identity', 'return_to', 'box.content'])
738
 
 
739
 
    def test_updateNoNamespace(self):
740
 
        eresponse = server.OpenIDResponse(None)
741
 
        eresponse.fields.update({'species': 'pterodactyl',
742
 
                                 'saturation': 'day-glo'})
743
 
        eresponse.signed = ['species']
744
 
        self.response.update(None, eresponse)
745
 
        self.failUnlessEqual(self.response.fields,
746
 
                             {'blue': 'star',
747
 
                              'mode': 'id_res',
748
 
                              'species': 'pterodactyl',
749
 
                              'saturation': 'day-glo'})
750
 
        self.failUnlessEqual(self.response.signed,
751
 
                             ['mode', 'identity', 'return_to', 'species'])
752
 
 
753
 
 
754
 
 
755
 
class MockSignatory(object):
756
 
    isValid = True
757
 
 
758
 
    def __init__(self, assoc):
759
 
        self.assocs = [assoc]
760
 
 
761
 
    def verify(self, assoc_handle, sig, signed_pairs):
762
 
        assert sig
763
 
        signed_pairs[:]
764
 
        if (True, assoc_handle) in self.assocs:
765
 
            return self.isValid
766
 
        else:
767
 
            return False
768
 
 
769
 
    def getAssociation(self, assoc_handle, dumb):
770
 
        if (dumb, assoc_handle) in self.assocs:
771
 
            # This isn't a valid implementation for many uses of this
772
 
            # function, mind you.
773
 
            return True
774
 
        else:
775
 
            return None
776
 
 
777
 
    def invalidate(self, assoc_handle, dumb):
778
 
        if (dumb, assoc_handle) in self.assocs:
779
 
            self.assocs.remove((dumb, assoc_handle))
780
 
 
781
 
 
782
 
class TestCheckAuth(unittest.TestCase):
783
 
    def setUp(self):
784
 
        self.assoc_handle = 'mooooooooo'
785
 
        self.request = server.CheckAuthRequest(
786
 
            self.assoc_handle, 'signarture',
787
 
            [('one', 'alpha'), ('two', 'beta')])
788
 
 
789
 
        self.signatory = MockSignatory((True, self.assoc_handle))
790
 
 
791
 
    def test_valid(self):
792
 
        r = self.request.answer(self.signatory)
793
 
        self.failUnlessEqual(r.fields, {'is_valid': 'true'})
794
 
        self.failUnlessEqual(r.request, self.request)
795
 
 
796
 
    def test_invalid(self):
797
 
        self.signatory.isValid = False
798
 
        r = self.request.answer(self.signatory)
799
 
        self.failUnlessEqual(r.fields, {'is_valid': 'false'})
800
 
 
801
 
    def test_replay(self):
802
 
        r = self.request.answer(self.signatory)
803
 
        r = self.request.answer(self.signatory)
804
 
        self.failUnlessEqual(r.fields, {'is_valid': 'false'})
805
 
 
806
 
    def test_invalidatehandle(self):
807
 
        self.request.invalidate_handle = "bogusHandle"
808
 
        r = self.request.answer(self.signatory)
809
 
        self.failUnlessEqual(r.fields, {'is_valid': 'true',
810
 
                                        'invalidate_handle': "bogusHandle"})
811
 
        self.failUnlessEqual(r.request, self.request)
812
 
 
813
 
    def test_invalidatehandleNo(self):
814
 
        assoc_handle = 'goodhandle'
815
 
        self.signatory.assocs.append((False, 'goodhandle'))
816
 
        self.request.invalidate_handle = assoc_handle
817
 
        r = self.request.answer(self.signatory)
818
 
        self.failUnlessEqual(r.fields, {'is_valid': 'true'})
819
 
 
820
 
 
821
 
class TestAssociate(unittest.TestCase):
822
 
    # TODO: test DH with non-default values for modulus and gen.
823
 
    # (important to do because we actually had it broken for a while.)
824
 
 
825
 
    def setUp(self):
826
 
        self.request = server.AssociateRequest.fromQuery({})
827
 
        self.store = _memstore.MemoryStore()
828
 
        self.signatory = server.Signatory(self.store)
829
 
        self.assoc = self.signatory.createAssociation(dumb=False)
830
 
 
831
 
    def test_dh(self):
832
 
        from openid.dh import DiffieHellman
833
 
        from openid.server.server import DiffieHellmanServerSession
834
 
        consumer_dh = DiffieHellman.fromDefaults()
835
 
        cpub = consumer_dh.public
836
 
        session = DiffieHellmanServerSession(DiffieHellman.fromDefaults(), cpub)
837
 
        self.request = server.AssociateRequest(session)
838
 
        response = self.request.answer(self.assoc)
839
 
        rfg = response.fields.get
840
 
        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
841
 
        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
842
 
        self.failIf(rfg("mac_key"))
843
 
        self.failUnlessEqual(rfg("session_type"), "DH-SHA1")
844
 
        self.failUnless(rfg("enc_mac_key"))
845
 
        self.failUnless(rfg("dh_server_public"))
846
 
 
847
 
        enc_key = rfg("enc_mac_key").decode('base64')
848
 
        spub = cryptutil.base64ToLong(rfg("dh_server_public"))
849
 
        secret = consumer_dh.xorSecret(spub, enc_key)
850
 
        self.failUnlessEqual(secret, self.assoc.secret)
851
 
 
852
 
 
853
 
    def test_plaintext(self):
854
 
        response = self.request.answer(self.assoc)
855
 
        rfg = response.fields.get
856
 
 
857
 
        self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
858
 
        self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
859
 
 
860
 
        self.failUnlessEqual(
861
 
            rfg("expires_in"), "%d" % (self.signatory.SECRET_LIFETIME,))
862
 
        self.failUnlessEqual(
863
 
            rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
864
 
        self.failIf(rfg("session_type"))
865
 
        self.failIf(rfg("enc_mac_key"))
866
 
        self.failIf(rfg("dh_server_public"))
867
 
 
868
 
class Counter(object):
869
 
    def __init__(self):
870
 
        self.count = 0
871
 
 
872
 
    def inc(self):
873
 
        self.count += 1
874
 
 
875
 
class TestServer(unittest.TestCase, CatchLogs):
876
 
    def setUp(self):
877
 
        self.store = _memstore.MemoryStore()
878
 
        self.server = server.Server(self.store)
879
 
        CatchLogs.setUp(self)
880
 
 
881
 
    def test_dispatch(self):
882
 
        monkeycalled = Counter()
883
 
        def monkeyDo(request):
884
 
            monkeycalled.inc()
885
 
            r = server.OpenIDResponse(request)
886
 
            return r
887
 
        self.server.openid_monkeymode = monkeyDo
888
 
        request = server.OpenIDRequest()
889
 
        request.mode = "monkeymode"
890
 
        webresult = self.server.handleRequest(request)
891
 
        self.failUnlessEqual(monkeycalled.count, 1)
892
 
 
893
 
    def test_associate(self):
894
 
        request = server.AssociateRequest.fromQuery({})
895
 
        response = self.server.openid_associate(request)
896
 
        self.failUnless(response.fields.has_key("assoc_handle"))
897
 
 
898
 
    def test_checkAuth(self):
899
 
        request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
900
 
        response = self.server.openid_check_authentication(request)
901
 
        self.failUnless(response.fields.has_key("is_valid"))
902
 
 
903
 
class TestSignatory(unittest.TestCase, CatchLogs):
904
 
    def setUp(self):
905
 
        self.store = _memstore.MemoryStore()
906
 
        self.signatory = server.Signatory(self.store)
907
 
        self._dumb_key = self.signatory._dumb_key
908
 
        self._normal_key = self.signatory._normal_key
909
 
        CatchLogs.setUp(self)
910
 
 
911
 
    def test_sign(self):
912
 
        request = server.OpenIDRequest()
913
 
        assoc_handle = '{assoc}{lookatme}'
914
 
        self.store.storeAssociation(
915
 
            self._normal_key,
916
 
            association.Association.fromExpiresIn(60, assoc_handle,
917
 
                                                  'sekrit', 'HMAC-SHA1'))
918
 
        request.assoc_handle = assoc_handle
919
 
        response = server.OpenIDResponse(request)
920
 
        response.fields = {
921
 
            'foo': 'amsigned',
922
 
            'bar': 'notsigned',
923
 
            'azu': 'alsosigned',
924
 
            }
925
 
        response.signed = ['foo', 'azu']
926
 
        sresponse = self.signatory.sign(response)
927
 
        self.failUnlessEqual(sresponse.fields.get('assoc_handle'),
928
 
                             assoc_handle)
929
 
        self.failUnlessEqual(sresponse.fields.get('signed'),
930
 
                             'foo,azu')
931
 
        self.failUnless(sresponse.fields.get('sig'))
932
 
        self.failIf(self.messages, self.messages)
933
 
 
934
 
    def test_signDumb(self):
935
 
        request = server.OpenIDRequest()
936
 
        request.assoc_handle = None
937
 
        response = server.OpenIDResponse(request)
938
 
        response.fields = {
939
 
            'foo': 'amsigned',
940
 
            'bar': 'notsigned',
941
 
            'azu': 'alsosigned',
942
 
            }
943
 
        response.signed = ['foo', 'azu']
944
 
        sresponse = self.signatory.sign(response)
945
 
        assoc_handle = sresponse.fields.get('assoc_handle')
946
 
        self.failUnless(assoc_handle)
947
 
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
948
 
        self.failUnless(assoc)
949
 
        self.failUnlessEqual(sresponse.fields.get('signed'),
950
 
                             'foo,azu')
951
 
        self.failUnless(sresponse.fields.get('sig'))
952
 
        self.failIf(self.messages, self.messages)
953
 
 
954
 
    def test_signExpired(self):
955
 
        request = server.OpenIDRequest()
956
 
        assoc_handle = '{assoc}{lookatme}'
957
 
        self.store.storeAssociation(
958
 
            self._normal_key,
959
 
            association.Association.fromExpiresIn(-10, assoc_handle,
960
 
                                                  'sekrit', 'HMAC-SHA1'))
961
 
        self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
962
 
 
963
 
        request.assoc_handle = assoc_handle
964
 
        response = server.OpenIDResponse(request)
965
 
        response.fields = {
966
 
            'foo': 'amsigned',
967
 
            'bar': 'notsigned',
968
 
            'azu': 'alsosigned',
969
 
            }
970
 
        response.signed = ['foo', 'azu']
971
 
        sresponse = self.signatory.sign(response)
972
 
 
973
 
        new_assoc_handle = sresponse.fields.get('assoc_handle')
974
 
        self.failUnless(new_assoc_handle)
975
 
        self.failIfEqual(new_assoc_handle, assoc_handle)
976
 
 
977
 
        self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
978
 
                             assoc_handle)
979
 
 
980
 
        self.failUnlessEqual(sresponse.fields.get('signed'),
981
 
                             'foo,azu')
982
 
        self.failUnless(sresponse.fields.get('sig'))
983
 
 
984
 
        # make sure the expired association is gone
985
 
        self.failIf(self.store.getAssociation(self._normal_key, assoc_handle))
986
 
 
987
 
        # make sure the new key is a dumb mode association
988
 
        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
989
 
        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
990
 
        self.failUnless(self.messages)
991
 
 
992
 
    def test_signInvalidHandle(self):
993
 
        request = server.OpenIDRequest()
994
 
        assoc_handle = '{bogus-assoc}{notvalid}'
995
 
 
996
 
        request.assoc_handle = assoc_handle
997
 
        response = server.OpenIDResponse(request)
998
 
        response.fields = {
999
 
            'foo': 'amsigned',
1000
 
            'bar': 'notsigned',
1001
 
            'azu': 'alsosigned',
1002
 
            }
1003
 
        response.signed = ['foo', 'azu']
1004
 
        sresponse = self.signatory.sign(response)
1005
 
 
1006
 
        new_assoc_handle = sresponse.fields.get('assoc_handle')
1007
 
        self.failUnless(new_assoc_handle)
1008
 
        self.failIfEqual(new_assoc_handle, assoc_handle)
1009
 
 
1010
 
        self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
1011
 
                             assoc_handle)
1012
 
 
1013
 
        self.failUnlessEqual(sresponse.fields.get('signed'), 'foo,azu')
1014
 
        self.failUnless(sresponse.fields.get('sig'))
1015
 
 
1016
 
        # make sure the new key is a dumb mode association
1017
 
        self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
1018
 
        self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
1019
 
        self.failIf(self.messages, self.messages)
1020
 
 
1021
 
 
1022
 
    def test_verify(self):
1023
 
        assoc_handle = '{vroom}{zoom}'
1024
 
        assoc = association.Association.fromExpiresIn(60, assoc_handle,
1025
 
                                                      'sekrit', 'HMAC-SHA1')
1026
 
 
1027
 
        self.store.storeAssociation(self._dumb_key, assoc)
1028
 
 
1029
 
        signed_pairs = [('foo', 'bar'),
1030
 
                        ('apple', 'orange')]
1031
 
 
1032
 
        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
1033
 
        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1034
 
        self.failUnless(verified)
1035
 
        self.failIf(self.messages, self.messages)
1036
 
 
1037
 
    def test_verifyBadSig(self):
1038
 
        assoc_handle = '{vroom}{zoom}'
1039
 
        assoc = association.Association.fromExpiresIn(60, assoc_handle,
1040
 
                                                      'sekrit', 'HMAC-SHA1')
1041
 
 
1042
 
        self.store.storeAssociation(self._dumb_key, assoc)
1043
 
 
1044
 
        signed_pairs = [('foo', 'bar'),
1045
 
                        ('apple', 'orange')]
1046
 
 
1047
 
        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0=".encode('rot13')
1048
 
        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1049
 
        self.failIf(verified)
1050
 
        self.failIf(self.messages, self.messages)
1051
 
 
1052
 
    def test_verifyBadHandle(self):
1053
 
        assoc_handle = '{vroom}{zoom}'
1054
 
        signed_pairs = [('foo', 'bar'),
1055
 
                        ('apple', 'orange')]
1056
 
 
1057
 
        sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
1058
 
        verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1059
 
        self.failIf(verified)
1060
 
        self.failUnless(self.messages)
1061
 
 
1062
 
    def test_getAssoc(self):
1063
 
        assoc_handle = self.makeAssoc(dumb=True)
1064
 
        assoc = self.signatory.getAssociation(assoc_handle, True)
1065
 
        self.failUnless(assoc)
1066
 
        self.failUnlessEqual(assoc.handle, assoc_handle)
1067
 
        self.failIf(self.messages, self.messages)
1068
 
 
1069
 
    def test_getAssocExpired(self):
1070
 
        assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
1071
 
        assoc = self.signatory.getAssociation(assoc_handle, True)
1072
 
        self.failIf(assoc, assoc)
1073
 
        self.failUnless(self.messages)
1074
 
 
1075
 
    def test_getAssocInvalid(self):
1076
 
        ah = 'no-such-handle'
1077
 
        self.failUnlessEqual(
1078
 
            self.signatory.getAssociation(ah, dumb=False), None)
1079
 
        self.failIf(self.messages, self.messages)
1080
 
 
1081
 
    def test_getAssocDumbVsNormal(self):
1082
 
        assoc_handle = self.makeAssoc(dumb=True)
1083
 
        self.failUnlessEqual(
1084
 
            self.signatory.getAssociation(assoc_handle, dumb=False), None)
1085
 
        self.failIf(self.messages, self.messages)
1086
 
 
1087
 
    def test_createAssociation(self):
1088
 
        assoc = self.signatory.createAssociation(dumb=False)
1089
 
        self.failUnless(self.signatory.getAssociation(assoc.handle, dumb=False))
1090
 
        self.failIf(self.messages, self.messages)
1091
 
 
1092
 
    def makeAssoc(self, dumb, lifetime=60):
1093
 
        assoc_handle = '{bling}'
1094
 
        assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
1095
 
                                                      'sekrit', 'HMAC-SHA1')
1096
 
 
1097
 
        self.store.storeAssociation((dumb and self._dumb_key) or self._normal_key, assoc)
1098
 
        return assoc_handle
1099
 
 
1100
 
    def test_invalidate(self):
1101
 
        assoc_handle = '-squash-'
1102
 
        assoc = association.Association.fromExpiresIn(60, assoc_handle,
1103
 
                                                      'sekrit', 'HMAC-SHA1')
1104
 
 
1105
 
        self.store.storeAssociation(self._dumb_key, assoc)
1106
 
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1107
 
        self.failUnless(assoc)
1108
 
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1109
 
        self.failUnless(assoc)
1110
 
        self.signatory.invalidate(assoc_handle, dumb=True)
1111
 
        assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1112
 
        self.failIf(assoc)
1113
 
        self.failIf(self.messages, self.messages)
1114
 
 
1115
 
 
1116
 
 
1117
 
if __name__ == '__main__':
1118
 
    unittest.main()