1
# -*- test-case-name: openid.test.server -*-
2
"""Tests for openid.server.
4
from openid.server import server
5
from openid import association, cryptutil, oidutil
11
from urlparse import urlparse
13
# In general, if you edit or add tests here, try to move in the direction
14
# of testing smaller units. For testing the external interfaces, we'll be
15
# developing an implementation-agnostic testing suite.
17
# for more, see /etc/ssh/moduli
19
ALT_MODULUS = 0xCAADDDEC1667FC68B5FA15D53C4E1532DD24561A1A2D47A12C01ABEA1E00731F6921AAC40742311FDF9E634BB7131BEE1AF240261554389A910425E044E88C8359B010F5AD2B80E29CB1A5B027B19D9E01A6F63A6F45E5D7ED2FF6A2A0085050A7D0CF307C3DB51D2490355907B4427C23A98DF1EB8ABEF2BA209BB7AFFE86A7
22
class CatchLogs(object):
24
self.old_logger = oidutil.log
25
oidutil.log = self.gotLogMessage
28
def gotLogMessage(self, message):
29
self.messages.append(message)
32
oidutil.log = self.old_logger
34
class TestProtocolError(unittest.TestCase):
35
def test_browserWithReturnTo(self):
36
return_to = "http://rp.unittest/consumer"
37
# will be a ProtocolError raised by Decode or CheckIDRequest.answer
39
'openid.mode': 'monkeydance',
40
'openid.identity': 'http://wagu.unittest/',
41
'openid.return_to': return_to,
43
e = server.ProtocolError(args, "plucky")
44
self.failUnless(e.hasReturnTo())
46
'openid.mode': ['error'],
47
'openid.error': ['plucky'],
50
rt_base, result_args = e.encodeToURL().split('?', 1)
51
result_args = cgi.parse_qs(result_args)
52
self.failUnlessEqual(result_args, expected_args)
54
def test_noReturnTo(self):
55
# will be a ProtocolError raised by Decode or CheckIDRequest.answer
57
'openid.mode': 'zebradance',
58
'openid.identity': 'http://wagu.unittest/',
60
e = server.ProtocolError(args, "waffles")
61
self.failIf(e.hasReturnTo())
62
expected = """error:waffles
65
self.failUnlessEqual(e.encodeToKVForm(), expected)
69
class TestDecode(unittest.TestCase):
71
self.id_url = "http://decoder.am.unittest/"
72
self.rt_url = "http://rp.unittest/foobot/?qux=zam"
73
self.tr_url = "http://rp.unittest/"
74
self.assoc_handle = "{assoc}{handle}"
75
self.decode = server.Decoder().decode
80
self.failUnlessEqual(r, None)
82
def test_irrelevant(self):
85
'sreg.mutant_power': 'decaffinator',
88
self.failUnlessEqual(r, None)
92
'openid.mode': 'twos-compliment',
93
'openid.pants': 'zippered',
95
self.failUnlessRaises(server.ProtocolError, self.decode, args)
97
def test_dictOfLists(self):
99
'openid.mode': ['checkid_setup'],
100
'openid.identity': self.id_url,
101
'openid.assoc_handle': self.assoc_handle,
102
'openid.return_to': self.rt_url,
103
'openid.trust_root': self.tr_url,
106
result = self.decode(args)
107
except TypeError, err:
108
self.failUnless(str(err).find('values') != -1, err)
110
self.fail("Expected TypeError, but got result %s" % (result,))
112
def test_checkidImmediate(self):
114
'openid.mode': 'checkid_immediate',
115
'openid.identity': self.id_url,
116
'openid.assoc_handle': self.assoc_handle,
117
'openid.return_to': self.rt_url,
118
'openid.trust_root': self.tr_url,
120
'openid.some.extension': 'junk',
122
r = self.decode(args)
123
self.failUnless(isinstance(r, server.CheckIDRequest))
124
self.failUnlessEqual(r.mode, "checkid_immediate")
125
self.failUnlessEqual(r.immediate, True)
126
self.failUnlessEqual(r.identity, self.id_url)
127
self.failUnlessEqual(r.trust_root, self.tr_url)
128
self.failUnlessEqual(r.return_to, self.rt_url)
129
self.failUnlessEqual(r.assoc_handle, self.assoc_handle)
131
def test_checkidSetup(self):
133
'openid.mode': 'checkid_setup',
134
'openid.identity': self.id_url,
135
'openid.assoc_handle': self.assoc_handle,
136
'openid.return_to': self.rt_url,
137
'openid.trust_root': self.tr_url,
139
r = self.decode(args)
140
self.failUnless(isinstance(r, server.CheckIDRequest))
141
self.failUnlessEqual(r.mode, "checkid_setup")
142
self.failUnlessEqual(r.immediate, False)
143
self.failUnlessEqual(r.identity, self.id_url)
144
self.failUnlessEqual(r.trust_root, self.tr_url)
145
self.failUnlessEqual(r.return_to, self.rt_url)
147
def test_checkidSetupNoIdentity(self):
149
'openid.mode': 'checkid_setup',
150
'openid.assoc_handle': self.assoc_handle,
151
'openid.return_to': self.rt_url,
152
'openid.trust_root': self.tr_url,
155
result = self.decode(args)
156
except server.ProtocolError, err:
157
self.failUnless(err.query)
159
self.fail("Expected ProtocolError, instead returned with %s" %
162
def test_checkidSetupNoReturn(self):
164
'openid.mode': 'checkid_setup',
165
'openid.identity': self.id_url,
166
'openid.assoc_handle': self.assoc_handle,
167
'openid.trust_root': self.tr_url,
169
self.failUnlessRaises(server.ProtocolError, self.decode, args)
171
def test_checkidSetupBadReturn(self):
173
'openid.mode': 'checkid_setup',
174
'openid.identity': self.id_url,
175
'openid.assoc_handle': self.assoc_handle,
176
'openid.return_to': 'not a url',
179
result = self.decode(args)
180
except server.ProtocolError, err:
181
self.failUnless(err.query)
183
self.fail("Expected ProtocolError, instead returned with %s" %
186
def test_checkidSetupUntrustedReturn(self):
188
'openid.mode': 'checkid_setup',
189
'openid.identity': self.id_url,
190
'openid.assoc_handle': self.assoc_handle,
191
'openid.return_to': self.rt_url,
192
'openid.trust_root': 'http://not-the-return-place.unittest/',
195
result = self.decode(args)
196
except server.UntrustedReturnURL, err:
197
self.failUnless(err.query)
199
self.fail("Expected UntrustedReturnURL, instead returned with %s" %
202
def test_checkAuth(self):
204
'openid.mode': 'check_authentication',
205
'openid.assoc_handle': '{dumb}{handle}',
206
'openid.sig': 'sigblob',
207
'openid.signed': 'foo,bar,mode',
208
'openid.foo': 'signedval1',
209
'openid.bar': 'signedval2',
210
'openid.baz': 'unsigned',
212
r = self.decode(args)
213
self.failUnless(isinstance(r, server.CheckAuthRequest))
214
self.failUnlessEqual(r.mode, 'check_authentication')
215
self.failUnlessEqual(r.sig, 'sigblob')
216
self.failUnlessEqual(r.signed, [
217
('foo', 'signedval1'),
218
('bar', 'signedval2'),
221
# XXX: test error cases (i.e. missing required fields)
224
def test_checkAuthMissingSignedField(self):
226
'openid.mode': 'check_authentication',
227
'openid.assoc_handle': '{dumb}{handle}',
228
'openid.sig': 'sigblob',
229
'openid.signed': 'foo,bar,mode',
230
'openid.foo': 'signedval1',
231
'openid.baz': 'unsigned',
233
self.failUnlessRaises(server.ProtocolError, self.decode, args)
236
def test_checkAuthMissingSignature(self):
238
'openid.mode': 'check_authentication',
239
'openid.assoc_handle': '{dumb}{handle}',
240
'openid.signed': 'foo,bar,mode',
241
'openid.foo': 'signedval1',
242
'openid.bar': 'signedval2',
243
'openid.baz': 'unsigned',
245
self.failUnlessRaises(server.ProtocolError, self.decode, args)
248
def test_checkAuthAndInvalidate(self):
250
'openid.mode': 'check_authentication',
251
'openid.assoc_handle': '{dumb}{handle}',
252
'openid.invalidate_handle': '[[SMART_handle]]',
253
'openid.sig': 'sigblob',
254
'openid.signed': 'foo,bar,mode',
255
'openid.foo': 'signedval1',
256
'openid.bar': 'signedval2',
257
'openid.baz': 'unsigned',
259
r = self.decode(args)
260
self.failUnless(isinstance(r, server.CheckAuthRequest))
261
self.failUnlessEqual(r.invalidate_handle, '[[SMART_handle]]')
264
def test_associateDH(self):
266
'openid.mode': 'associate',
267
'openid.session_type': 'DH-SHA1',
268
'openid.dh_consumer_public': "Rzup9265tw==",
270
r = self.decode(args)
271
self.failUnless(isinstance(r, server.AssociateRequest))
272
self.failUnlessEqual(r.mode, "associate")
273
self.failUnlessEqual(r.session.session_type, "DH-SHA1")
274
self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
275
self.failUnless(r.session.consumer_pubkey)
277
def test_associateDHMissingKey(self):
278
"""Trying DH assoc w/o public key"""
280
'openid.mode': 'associate',
281
'openid.session_type': 'DH-SHA1',
283
# Using DH-SHA1 without supplying dh_consumer_public is an error.
284
self.failUnlessRaises(server.ProtocolError, self.decode, args)
287
def test_associateDHpubKeyNotB64(self):
289
'openid.mode': 'associate',
290
'openid.session_type': 'DH-SHA1',
291
'openid.dh_consumer_public': "donkeydonkeydonkey",
293
self.failUnlessRaises(server.ProtocolError, self.decode, args)
296
def test_associateDHModGen(self):
297
# test dh with non-default but valid values for dh_modulus and dh_gen
299
'openid.mode': 'associate',
300
'openid.session_type': 'DH-SHA1',
301
'openid.dh_consumer_public': "Rzup9265tw==",
302
'openid.dh_modulus': cryptutil.longToBase64(ALT_MODULUS),
303
'openid.dh_gen': cryptutil.longToBase64(ALT_GEN) ,
305
r = self.decode(args)
306
self.failUnless(isinstance(r, server.AssociateRequest))
307
self.failUnlessEqual(r.mode, "associate")
308
self.failUnlessEqual(r.session.session_type, "DH-SHA1")
309
self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
310
self.failUnlessEqual(r.session.dh.modulus, ALT_MODULUS)
311
self.failUnlessEqual(r.session.dh.generator, ALT_GEN)
312
self.failUnless(r.session.consumer_pubkey)
315
def test_associateDHCorruptModGen(self):
316
# test dh with non-default but valid values for dh_modulus and dh_gen
318
'openid.mode': 'associate',
319
'openid.session_type': 'DH-SHA1',
320
'openid.dh_consumer_public': "Rzup9265tw==",
321
'openid.dh_modulus': 'pizza',
322
'openid.dh_gen': 'gnocchi',
324
self.failUnlessRaises(server.ProtocolError, self.decode, args)
327
def test_associateDHMissingModGen(self):
328
# test dh with non-default but valid values for dh_modulus and dh_gen
330
'openid.mode': 'associate',
331
'openid.session_type': 'DH-SHA1',
332
'openid.dh_consumer_public': "Rzup9265tw==",
333
'openid.dh_modulus': 'pizza',
335
self.failUnlessRaises(server.ProtocolError, self.decode, args)
338
# def test_associateDHInvalidModGen(self):
339
# # test dh with properly encoded values that are not a valid
340
# # modulus/generator combination.
342
# 'openid.mode': 'associate',
343
# 'openid.session_type': 'DH-SHA1',
344
# 'openid.dh_consumer_public': "Rzup9265tw==",
345
# 'openid.dh_modulus': cryptutil.longToBase64(9),
346
# 'openid.dh_gen': cryptutil.longToBase64(27) ,
348
# self.failUnlessRaises(server.ProtocolError, self.decode, args)
349
# test_associateDHInvalidModGen.todo = "low-priority feature"
352
def test_associateWeirdSession(self):
354
'openid.mode': 'associate',
355
'openid.session_type': 'FLCL6',
356
'openid.dh_consumer_public': "YQ==\n",
358
self.failUnlessRaises(server.ProtocolError, self.decode, args)
361
def test_associatePlain(self):
363
'openid.mode': 'associate',
365
r = self.decode(args)
366
self.failUnless(isinstance(r, server.AssociateRequest))
367
self.failUnlessEqual(r.mode, "associate")
368
self.failUnlessEqual(r.session.session_type, "plaintext")
369
self.failUnlessEqual(r.assoc_type, "HMAC-SHA1")
371
def test_nomode(self):
373
'openid.session_type': 'DH-SHA1',
374
'openid.dh_consumer_public': "my public keeey",
376
self.failUnlessRaises(server.ProtocolError, self.decode, args)
380
class TestEncode(unittest.TestCase):
382
self.encoder = server.Encoder()
383
self.encode = self.encoder.encode
385
def test_id_res(self):
386
request = server.CheckIDRequest(
387
identity = 'http://bombom.unittest/',
388
trust_root = 'http://burr.unittest/',
389
return_to = 'http://burr.unittest/999',
392
response = server.OpenIDResponse(request)
395
'identity': request.identity,
396
'return_to': request.return_to,
398
webresponse = self.encode(response)
399
self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
400
self.failUnless(webresponse.headers.has_key('location'))
402
location = webresponse.headers['location']
403
self.failUnless(location.startswith(request.return_to),
404
"%s does not start with %s" % (location,
406
query = cgi.parse_qs(urlparse(location)[4])
408
q2 = dict([(k, v[0]) for k, v in query.iteritems()])
410
[('openid.' + k, v) for k, v in response.fields.iteritems()])
411
self.failUnlessEqual(q2, expected)
413
def test_cancel(self):
414
request = server.CheckIDRequest(
415
identity = 'http://bombom.unittest/',
416
trust_root = 'http://burr.unittest/',
417
return_to = 'http://burr.unittest/999',
420
response = server.OpenIDResponse(request)
424
webresponse = self.encode(response)
425
self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
426
self.failUnless(webresponse.headers.has_key('location'))
428
def test_assocReply(self):
429
request = server.AssociateRequest.fromQuery({})
430
response = server.OpenIDResponse(request)
431
response.fields = {'assoc_handle': "every-zig"}
432
webresponse = self.encode(response)
433
body = """assoc_handle:every-zig
435
self.failUnlessEqual(webresponse.code, server.HTTP_OK)
436
self.failUnlessEqual(webresponse.headers, {})
437
self.failUnlessEqual(webresponse.body, body)
439
def test_checkauthReply(self):
440
request = server.CheckAuthRequest('a_sock_monkey',
443
response = server.OpenIDResponse(request)
446
'invalidate_handle': 'xXxX:xXXx'
448
body = """invalidate_handle:xXxX:xXXx
451
webresponse = self.encode(response)
452
self.failUnlessEqual(webresponse.code, server.HTTP_OK)
453
self.failUnlessEqual(webresponse.headers, {})
454
self.failUnlessEqual(webresponse.body, body)
456
def test_unencodableError(self):
458
'openid.identity': 'http://limu.unittest/',
460
e = server.ProtocolError(args, "wet paint")
461
self.failUnlessRaises(server.EncodingError, self.encode, e)
463
def test_encodableError(self):
465
'openid.mode': 'associate',
466
'openid.identity': 'http://limu.unittest/',
471
webresponse = self.encode(server.ProtocolError(args, "snoot"))
472
self.failUnlessEqual(webresponse.code, server.HTTP_ERROR)
473
self.failUnlessEqual(webresponse.headers, {})
474
self.failUnlessEqual(webresponse.body, body)
478
class TestSigningEncode(unittest.TestCase):
480
self._dumb_key = server.Signatory._dumb_key
481
self._normal_key = server.Signatory._normal_key
482
self.store = _memstore.MemoryStore()
483
self.request = server.CheckIDRequest(
484
identity = 'http://bombom.unittest/',
485
trust_root = 'http://burr.unittest/',
486
return_to = 'http://burr.unittest/999',
489
self.response = server.OpenIDResponse(self.request)
490
self.response.fields = {
492
'identity': self.request.identity,
493
'return_to': self.request.return_to,
495
self.response.signed.extend(['mode','identity','return_to'])
496
self.signatory = server.Signatory(self.store)
497
self.encoder = server.SigningEncoder(self.signatory)
498
self.encode = self.encoder.encode
500
def test_idres(self):
501
assoc_handle = '{bicycle}{shed}'
502
self.store.storeAssociation(
504
association.Association.fromExpiresIn(60, assoc_handle,
505
'sekrit', 'HMAC-SHA1'))
506
self.request.assoc_handle = assoc_handle
507
webresponse = self.encode(self.response)
508
self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
509
self.failUnless(webresponse.headers.has_key('location'))
511
location = webresponse.headers['location']
512
query = cgi.parse_qs(urlparse(location)[4])
513
self.failUnless('openid.sig' in query)
514
self.failUnless('openid.assoc_handle' in query)
515
self.failUnless('openid.signed' in query)
517
def test_idresDumb(self):
518
webresponse = self.encode(self.response)
519
self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
520
self.failUnless(webresponse.headers.has_key('location'))
522
location = webresponse.headers['location']
523
query = cgi.parse_qs(urlparse(location)[4])
524
self.failUnless('openid.sig' in query)
525
self.failUnless('openid.assoc_handle' in query)
526
self.failUnless('openid.signed' in query)
528
def test_forgotStore(self):
529
self.encoder.signatory = None
530
self.failUnlessRaises(ValueError, self.encode, self.response)
532
def test_cancel(self):
533
request = server.CheckIDRequest(
534
identity = 'http://bombom.unittest/',
535
trust_root = 'http://burr.unittest/',
536
return_to = 'http://burr.unittest/999',
539
response = server.OpenIDResponse(request)
540
response.fields['mode'] = 'cancel'
541
response.signed[:] = []
542
webresponse = self.encode(response)
543
self.failUnlessEqual(webresponse.code, server.HTTP_REDIRECT)
544
self.failUnless(webresponse.headers.has_key('location'))
545
location = webresponse.headers['location']
546
query = cgi.parse_qs(urlparse(location)[4])
547
self.failIf('openid.sig' in query, query.get('openid.sig'))
549
def test_assocReply(self):
550
request = server.AssociateRequest.fromQuery({})
551
response = server.OpenIDResponse(request)
552
response.fields = {'assoc_handle': "every-zig"}
553
webresponse = self.encode(response)
554
body = """assoc_handle:every-zig
556
self.failUnlessEqual(webresponse.code, server.HTTP_OK)
557
self.failUnlessEqual(webresponse.headers, {})
558
self.failUnlessEqual(webresponse.body, body)
560
def test_alreadySigned(self):
561
self.response.fields['sig'] = 'priorSig=='
562
self.failUnlessRaises(server.AlreadySigned, self.encode, self.response)
566
class TestCheckID(unittest.TestCase):
568
self.request = server.CheckIDRequest(
569
identity = 'http://bambam.unittest/',
570
trust_root = 'http://bar.unittest/',
571
return_to = 'http://bar.unittest/999',
575
def test_trustRootInvalid(self):
576
self.request.trust_root = "http://foo.unittest/17"
577
self.request.return_to = "http://foo.unittest/39"
578
self.failIf(self.request.trustRootValid())
580
def test_trustRootValid(self):
581
self.request.trust_root = "http://foo.unittest/"
582
self.request.return_to = "http://foo.unittest/39"
583
self.failUnless(self.request.trustRootValid())
585
def test_answerAllow(self):
586
answer = self.request.answer(True)
587
self.failUnlessEqual(answer.request, self.request)
588
self.failUnlessEqual(answer.fields, {
590
'identity': self.request.identity,
591
'return_to': self.request.return_to,
593
signed = answer.signed[:]
595
self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
597
def test_answerAllowNoTrustRoot(self):
598
self.request.trust_root = None
599
answer = self.request.answer(True)
600
self.failUnlessEqual(answer.request, self.request)
601
self.failUnlessEqual(answer.fields, {
603
'identity': self.request.identity,
604
'return_to': self.request.return_to,
606
signed = answer.signed[:]
608
self.failUnlessEqual(signed, ["identity", "mode", "return_to"])
610
def test_answerImmediateDeny(self):
611
self.request.mode = 'checkid_immediate'
612
self.request.immediate = True
613
server_url = "http://setup-url.unittest/"
614
# crappiting setup_url, you dirty my interface with your presence!
615
answer = self.request.answer(False, server_url=server_url)
616
self.failUnlessEqual(answer.request, self.request)
617
self.failUnlessEqual(len(answer.fields), 2, answer.fields)
618
self.failUnlessEqual(answer.fields.get('mode'), 'id_res')
619
self.failUnless(answer.fields.get('user_setup_url', '').startswith(
621
self.failUnlessEqual(answer.signed, [])
623
def test_answerSetupDeny(self):
624
answer = self.request.answer(False)
625
self.failUnlessEqual(answer.fields, {
628
self.failUnlessEqual(answer.signed, [])
630
def test_encodeToURL(self):
631
server_url = 'http://openid-server.unittest/'
632
result = self.request.encodeToURL(server_url)
634
# How to check? How about a round-trip test.
635
base, result_args = result.split('?', 1)
636
result_args = cgi.parse_qs(result_args)
637
result_args = dict([(k, v[0]) for k, v in result_args.iteritems()])
638
rebuilt_request = server.CheckIDRequest.fromQuery(result_args)
639
self.failUnlessEqual(rebuilt_request.__dict__, self.request.__dict__)
641
def test_getCancelURL(self):
642
url = self.request.getCancelURL()
643
expected = self.request.return_to + '?openid.mode=cancel'
644
self.failUnlessEqual(url, expected)
646
def test_getCancelURLimmed(self):
647
self.request.mode = 'checkid_immediate'
648
self.request.immediate = True
649
self.failUnlessRaises(ValueError, self.request.getCancelURL)
653
class TestCheckIDExtension(unittest.TestCase):
656
self.request = server.CheckIDRequest(
657
identity = 'http://bambam.unittest/',
658
trust_root = 'http://bar.unittest/',
659
return_to = 'http://bar.unittest/999',
662
self.response = server.OpenIDResponse(self.request)
663
self.response.fields['mode'] = 'id_res'
664
self.response.fields['blue'] = 'star'
665
self.response.signed.extend(['mode','identity','return_to'])
668
def test_addField(self):
670
self.response.addField(namespace, 'bright', 'potato')
671
self.failUnlessEqual(self.response.fields,
674
'mj12.bright': 'potato'})
675
self.failUnlessEqual(self.response.signed,
676
['mode', 'identity', 'return_to', 'mj12.bright'])
679
def test_addFieldNoNamespace(self):
680
self.response.addField('', 'dark', 'pages')
681
self.failUnlessEqual(self.response.fields,
686
def test_addFieldUnsigned(self):
688
self.response.addField(namespace, 'dull', 'lemon', signed=False)
689
self.failUnlessEqual(self.response.fields,
692
'mj12.dull': 'lemon'})
693
self.failUnlessEqual(self.response.signed,
694
['mode', 'identity', 'return_to'])
697
def test_addFields(self):
699
self.response.addFields(namespace, {'tangy': 'suspenders',
700
'bravo': 'inclusion'})
701
self.failUnlessEqual(self.response.fields,
704
'mi5.tangy': 'suspenders',
705
'mi5.bravo': 'inclusion'})
706
self.failUnlessEqual(self.response.signed,
707
['mode', 'identity', 'return_to',
708
'mi5.tangy', 'mi5.bravo'])
711
def test_addFieldsUnsigned(self):
713
self.response.addFields(namespace, {'strange': 'conditioner',
714
'elemental': 'blender'},
716
self.failUnlessEqual(self.response.fields,
719
'mi5.strange': 'conditioner',
720
'mi5.elemental': 'blender'})
721
self.failUnlessEqual(self.response.signed,
722
['mode', 'identity', 'return_to'])
725
def test_update(self):
726
eresponse = server.OpenIDResponse(None)
727
eresponse.fields.update({'shape': 'heart',
728
'content': 'strings,wire'})
729
eresponse.signed = ['content']
730
self.response.update('box', eresponse)
731
self.failUnlessEqual(self.response.fields,
734
'box.shape': 'heart',
735
'box.content': 'strings,wire'})
736
self.failUnlessEqual(self.response.signed,
737
['mode', 'identity', 'return_to', 'box.content'])
739
def test_updateNoNamespace(self):
740
eresponse = server.OpenIDResponse(None)
741
eresponse.fields.update({'species': 'pterodactyl',
742
'saturation': 'day-glo'})
743
eresponse.signed = ['species']
744
self.response.update(None, eresponse)
745
self.failUnlessEqual(self.response.fields,
748
'species': 'pterodactyl',
749
'saturation': 'day-glo'})
750
self.failUnlessEqual(self.response.signed,
751
['mode', 'identity', 'return_to', 'species'])
755
class MockSignatory(object):
758
def __init__(self, assoc):
759
self.assocs = [assoc]
761
def verify(self, assoc_handle, sig, signed_pairs):
764
if (True, assoc_handle) in self.assocs:
769
def getAssociation(self, assoc_handle, dumb):
770
if (dumb, assoc_handle) in self.assocs:
771
# This isn't a valid implementation for many uses of this
772
# function, mind you.
777
def invalidate(self, assoc_handle, dumb):
778
if (dumb, assoc_handle) in self.assocs:
779
self.assocs.remove((dumb, assoc_handle))
782
class TestCheckAuth(unittest.TestCase):
784
self.assoc_handle = 'mooooooooo'
785
self.request = server.CheckAuthRequest(
786
self.assoc_handle, 'signarture',
787
[('one', 'alpha'), ('two', 'beta')])
789
self.signatory = MockSignatory((True, self.assoc_handle))
791
def test_valid(self):
792
r = self.request.answer(self.signatory)
793
self.failUnlessEqual(r.fields, {'is_valid': 'true'})
794
self.failUnlessEqual(r.request, self.request)
796
def test_invalid(self):
797
self.signatory.isValid = False
798
r = self.request.answer(self.signatory)
799
self.failUnlessEqual(r.fields, {'is_valid': 'false'})
801
def test_replay(self):
802
r = self.request.answer(self.signatory)
803
r = self.request.answer(self.signatory)
804
self.failUnlessEqual(r.fields, {'is_valid': 'false'})
806
def test_invalidatehandle(self):
807
self.request.invalidate_handle = "bogusHandle"
808
r = self.request.answer(self.signatory)
809
self.failUnlessEqual(r.fields, {'is_valid': 'true',
810
'invalidate_handle': "bogusHandle"})
811
self.failUnlessEqual(r.request, self.request)
813
def test_invalidatehandleNo(self):
814
assoc_handle = 'goodhandle'
815
self.signatory.assocs.append((False, 'goodhandle'))
816
self.request.invalidate_handle = assoc_handle
817
r = self.request.answer(self.signatory)
818
self.failUnlessEqual(r.fields, {'is_valid': 'true'})
821
class TestAssociate(unittest.TestCase):
822
# TODO: test DH with non-default values for modulus and gen.
823
# (important to do because we actually had it broken for a while.)
826
self.request = server.AssociateRequest.fromQuery({})
827
self.store = _memstore.MemoryStore()
828
self.signatory = server.Signatory(self.store)
829
self.assoc = self.signatory.createAssociation(dumb=False)
832
from openid.dh import DiffieHellman
833
from openid.server.server import DiffieHellmanServerSession
834
consumer_dh = DiffieHellman.fromDefaults()
835
cpub = consumer_dh.public
836
session = DiffieHellmanServerSession(DiffieHellman.fromDefaults(), cpub)
837
self.request = server.AssociateRequest(session)
838
response = self.request.answer(self.assoc)
839
rfg = response.fields.get
840
self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
841
self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
842
self.failIf(rfg("mac_key"))
843
self.failUnlessEqual(rfg("session_type"), "DH-SHA1")
844
self.failUnless(rfg("enc_mac_key"))
845
self.failUnless(rfg("dh_server_public"))
847
enc_key = rfg("enc_mac_key").decode('base64')
848
spub = cryptutil.base64ToLong(rfg("dh_server_public"))
849
secret = consumer_dh.xorSecret(spub, enc_key)
850
self.failUnlessEqual(secret, self.assoc.secret)
853
def test_plaintext(self):
854
response = self.request.answer(self.assoc)
855
rfg = response.fields.get
857
self.failUnlessEqual(rfg("assoc_type"), "HMAC-SHA1")
858
self.failUnlessEqual(rfg("assoc_handle"), self.assoc.handle)
860
self.failUnlessEqual(
861
rfg("expires_in"), "%d" % (self.signatory.SECRET_LIFETIME,))
862
self.failUnlessEqual(
863
rfg("mac_key"), oidutil.toBase64(self.assoc.secret))
864
self.failIf(rfg("session_type"))
865
self.failIf(rfg("enc_mac_key"))
866
self.failIf(rfg("dh_server_public"))
868
class Counter(object):
875
class TestServer(unittest.TestCase, CatchLogs):
877
self.store = _memstore.MemoryStore()
878
self.server = server.Server(self.store)
879
CatchLogs.setUp(self)
881
def test_dispatch(self):
882
monkeycalled = Counter()
883
def monkeyDo(request):
885
r = server.OpenIDResponse(request)
887
self.server.openid_monkeymode = monkeyDo
888
request = server.OpenIDRequest()
889
request.mode = "monkeymode"
890
webresult = self.server.handleRequest(request)
891
self.failUnlessEqual(monkeycalled.count, 1)
893
def test_associate(self):
894
request = server.AssociateRequest.fromQuery({})
895
response = self.server.openid_associate(request)
896
self.failUnless(response.fields.has_key("assoc_handle"))
898
def test_checkAuth(self):
899
request = server.CheckAuthRequest('arrrrrf', '0x3999', [])
900
response = self.server.openid_check_authentication(request)
901
self.failUnless(response.fields.has_key("is_valid"))
903
class TestSignatory(unittest.TestCase, CatchLogs):
905
self.store = _memstore.MemoryStore()
906
self.signatory = server.Signatory(self.store)
907
self._dumb_key = self.signatory._dumb_key
908
self._normal_key = self.signatory._normal_key
909
CatchLogs.setUp(self)
912
request = server.OpenIDRequest()
913
assoc_handle = '{assoc}{lookatme}'
914
self.store.storeAssociation(
916
association.Association.fromExpiresIn(60, assoc_handle,
917
'sekrit', 'HMAC-SHA1'))
918
request.assoc_handle = assoc_handle
919
response = server.OpenIDResponse(request)
925
response.signed = ['foo', 'azu']
926
sresponse = self.signatory.sign(response)
927
self.failUnlessEqual(sresponse.fields.get('assoc_handle'),
929
self.failUnlessEqual(sresponse.fields.get('signed'),
931
self.failUnless(sresponse.fields.get('sig'))
932
self.failIf(self.messages, self.messages)
934
def test_signDumb(self):
935
request = server.OpenIDRequest()
936
request.assoc_handle = None
937
response = server.OpenIDResponse(request)
943
response.signed = ['foo', 'azu']
944
sresponse = self.signatory.sign(response)
945
assoc_handle = sresponse.fields.get('assoc_handle')
946
self.failUnless(assoc_handle)
947
assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
948
self.failUnless(assoc)
949
self.failUnlessEqual(sresponse.fields.get('signed'),
951
self.failUnless(sresponse.fields.get('sig'))
952
self.failIf(self.messages, self.messages)
954
def test_signExpired(self):
955
request = server.OpenIDRequest()
956
assoc_handle = '{assoc}{lookatme}'
957
self.store.storeAssociation(
959
association.Association.fromExpiresIn(-10, assoc_handle,
960
'sekrit', 'HMAC-SHA1'))
961
self.failUnless(self.store.getAssociation(self._normal_key, assoc_handle))
963
request.assoc_handle = assoc_handle
964
response = server.OpenIDResponse(request)
970
response.signed = ['foo', 'azu']
971
sresponse = self.signatory.sign(response)
973
new_assoc_handle = sresponse.fields.get('assoc_handle')
974
self.failUnless(new_assoc_handle)
975
self.failIfEqual(new_assoc_handle, assoc_handle)
977
self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
980
self.failUnlessEqual(sresponse.fields.get('signed'),
982
self.failUnless(sresponse.fields.get('sig'))
984
# make sure the expired association is gone
985
self.failIf(self.store.getAssociation(self._normal_key, assoc_handle))
987
# make sure the new key is a dumb mode association
988
self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
989
self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
990
self.failUnless(self.messages)
992
def test_signInvalidHandle(self):
993
request = server.OpenIDRequest()
994
assoc_handle = '{bogus-assoc}{notvalid}'
996
request.assoc_handle = assoc_handle
997
response = server.OpenIDResponse(request)
1001
'azu': 'alsosigned',
1003
response.signed = ['foo', 'azu']
1004
sresponse = self.signatory.sign(response)
1006
new_assoc_handle = sresponse.fields.get('assoc_handle')
1007
self.failUnless(new_assoc_handle)
1008
self.failIfEqual(new_assoc_handle, assoc_handle)
1010
self.failUnlessEqual(sresponse.fields.get('invalidate_handle'),
1013
self.failUnlessEqual(sresponse.fields.get('signed'), 'foo,azu')
1014
self.failUnless(sresponse.fields.get('sig'))
1016
# make sure the new key is a dumb mode association
1017
self.failUnless(self.store.getAssociation(self._dumb_key, new_assoc_handle))
1018
self.failIf(self.store.getAssociation(self._normal_key, new_assoc_handle))
1019
self.failIf(self.messages, self.messages)
1022
def test_verify(self):
1023
assoc_handle = '{vroom}{zoom}'
1024
assoc = association.Association.fromExpiresIn(60, assoc_handle,
1025
'sekrit', 'HMAC-SHA1')
1027
self.store.storeAssociation(self._dumb_key, assoc)
1029
signed_pairs = [('foo', 'bar'),
1030
('apple', 'orange')]
1032
sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
1033
verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1034
self.failUnless(verified)
1035
self.failIf(self.messages, self.messages)
1037
def test_verifyBadSig(self):
1038
assoc_handle = '{vroom}{zoom}'
1039
assoc = association.Association.fromExpiresIn(60, assoc_handle,
1040
'sekrit', 'HMAC-SHA1')
1042
self.store.storeAssociation(self._dumb_key, assoc)
1044
signed_pairs = [('foo', 'bar'),
1045
('apple', 'orange')]
1047
sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0=".encode('rot13')
1048
verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1049
self.failIf(verified)
1050
self.failIf(self.messages, self.messages)
1052
def test_verifyBadHandle(self):
1053
assoc_handle = '{vroom}{zoom}'
1054
signed_pairs = [('foo', 'bar'),
1055
('apple', 'orange')]
1057
sig = "Ylu0KcIR7PvNegB/K41KpnRgJl0="
1058
verified = self.signatory.verify(assoc_handle, sig, signed_pairs)
1059
self.failIf(verified)
1060
self.failUnless(self.messages)
1062
def test_getAssoc(self):
1063
assoc_handle = self.makeAssoc(dumb=True)
1064
assoc = self.signatory.getAssociation(assoc_handle, True)
1065
self.failUnless(assoc)
1066
self.failUnlessEqual(assoc.handle, assoc_handle)
1067
self.failIf(self.messages, self.messages)
1069
def test_getAssocExpired(self):
1070
assoc_handle = self.makeAssoc(dumb=True, lifetime=-10)
1071
assoc = self.signatory.getAssociation(assoc_handle, True)
1072
self.failIf(assoc, assoc)
1073
self.failUnless(self.messages)
1075
def test_getAssocInvalid(self):
1076
ah = 'no-such-handle'
1077
self.failUnlessEqual(
1078
self.signatory.getAssociation(ah, dumb=False), None)
1079
self.failIf(self.messages, self.messages)
1081
def test_getAssocDumbVsNormal(self):
1082
assoc_handle = self.makeAssoc(dumb=True)
1083
self.failUnlessEqual(
1084
self.signatory.getAssociation(assoc_handle, dumb=False), None)
1085
self.failIf(self.messages, self.messages)
1087
def test_createAssociation(self):
1088
assoc = self.signatory.createAssociation(dumb=False)
1089
self.failUnless(self.signatory.getAssociation(assoc.handle, dumb=False))
1090
self.failIf(self.messages, self.messages)
1092
def makeAssoc(self, dumb, lifetime=60):
1093
assoc_handle = '{bling}'
1094
assoc = association.Association.fromExpiresIn(lifetime, assoc_handle,
1095
'sekrit', 'HMAC-SHA1')
1097
self.store.storeAssociation((dumb and self._dumb_key) or self._normal_key, assoc)
1100
def test_invalidate(self):
1101
assoc_handle = '-squash-'
1102
assoc = association.Association.fromExpiresIn(60, assoc_handle,
1103
'sekrit', 'HMAC-SHA1')
1105
self.store.storeAssociation(self._dumb_key, assoc)
1106
assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1107
self.failUnless(assoc)
1108
assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1109
self.failUnless(assoc)
1110
self.signatory.invalidate(assoc_handle, dumb=True)
1111
assoc = self.signatory.getAssociation(assoc_handle, dumb=True)
1113
self.failIf(self.messages, self.messages)
1117
if __name__ == '__main__':