14
14
SuccessResponse, FailureResponse, SetupNeededResponse, CancelResponse, \
15
15
DiffieHellmanSHA1ConsumerSession, Consumer, PlainTextConsumerSession, \
16
16
SetupNeededError, DiffieHellmanSHA256ConsumerSession, ServerError, \
17
ProtocolError, _httpResponseToMessage
18
18
from openid import association
19
19
from openid.server.server import \
20
20
PlainTextServerSession, DiffieHellmanSHA1ServerSession
173
173
assert new_return_to.startswith(return_to)
174
174
assert redirect_url.startswith(server_url)
176
nonce_key = consumer.openid1_nonce_query_arg_name
177
nonce = request.return_to_args[nonce_key]
176
parsed = urlparse.urlparse(new_return_to)
177
query = parseQuery(parsed[4])
181
179
'openid.mode':'id_res',
182
180
'openid.return_to':new_return_to,
183
181
'openid.identity':delegate_url,
184
182
'openid.assoc_handle':fetcher.assoc_handle,
187
185
assoc = store.getAssociation(server_url, fetcher.assoc_handle)
189
187
message = Message.fromPostArgs(query)
190
188
message = assoc.signMessage(message)
191
info = consumer.complete(message, request.endpoint)
189
info = consumer.complete(message, request.endpoint, new_return_to)
192
190
assert info.status == SUCCESS, info.message
193
191
assert info.identity_url == user_url
215
213
consumer_url = 'http://consumer.example.com/'
216
214
https_server_url = 'https://server.example.com/'
218
class TestSuccess(unittest.TestCase):
216
class TestSuccess(unittest.TestCase, CatchLogs):
219
217
server_url = http_server_url
220
218
user_url = 'http://www.example.com/user.html'
221
219
delegate_url = 'http://consumer.example.com/user'
222
CatchLogs.setUp(self)
224
223
self.links = '<link rel="openid.server" href="%s" />' % (
225
224
self.server_url,)
228
227
'<link rel="openid.delegate" href="%s" />') % (
229
228
self.server_url, self.delegate_url)
231
CatchLogs.tearDown(self)
231
233
def test_nodelegate(self):
232
234
_test_success(self.server_url, self.user_url,
233
235
self.user_url, self.links)
261
263
self.failUnlessRaises(TypeError, GenericConsumer)
264
class TestIdRes(unittest.TestCase):
266
class TestIdRes(unittest.TestCase, CatchLogs):
265
267
consumer_class = GenericConsumer
270
CatchLogs.setUp(self)
268
272
self.store = memstore.MemoryStore()
269
273
self.consumer = self.consumer_class(self.store)
270
274
self.return_to = "nonny"
282
286
self.consumer._verifyDiscoveryResults = dummyVerifyDiscover
288
def disableReturnToChecking(self):
289
def checkReturnTo(unused1, unused2):
291
self.consumer._checkReturnTo = checkReturnTo
292
complete = self.consumer.complete
293
def callCompleteWithoutReturnTo(message, endpoint):
294
return complete(message, endpoint, None)
295
self.consumer.complete = callCompleteWithoutReturnTo
285
297
class TestIdResCheckSignature(TestIdRes):
380
392
self.consumer._checkSetupNeeded = raiseSetupNeeded
382
response = self.consumer.complete(message, None)
394
response = self.consumer.complete(message, None, None)
383
395
self.failUnlessEqual(SETUP_NEEDED, response.status)
384
396
self.failUnless(setup_url_sentinel is response.setup_url)
386
398
def test_cancel(self):
    """A 'cancel' message completes with CANCEL status.

    Return-to checking is disabled first, so ``complete`` is invoked
    with only the message and the endpoint.
    """
    cancel_args = {'openid.mode': 'cancel'}
    cancel_message = Message.fromPostArgs(cancel_args)
    self.disableReturnToChecking()
    response = self.consumer.complete(cancel_message, self.endpoint)
    self.failUnlessEqual(response.status, CANCEL)
    # The response must report the claimed identifier of the endpoint
    # that was passed to complete().
    same_identity = response.identity_url == self.endpoint.claimed_id
    self.failUnless(same_identity)
405
def test_cancel_with_return_to(self):
    """A 'cancel' message completes with CANCEL status when an explicit
    return_to value is handed to ``complete``.
    """
    cancel_args = {'openid.mode': 'cancel'}
    cancel_message = Message.fromPostArgs(cancel_args)
    # Unlike test_cancel, the three-argument form of complete() is used
    # here, passing self.return_to explicitly.
    response = self.consumer.complete(
        cancel_message, self.endpoint, self.return_to)
    self.failUnlessEqual(response.status, CANCEL)
    same_identity = response.identity_url == self.endpoint.claimed_id
    self.failUnless(same_identity)
392
411
def test_error(self):
393
412
msg = 'an error message'
394
413
message = Message.fromPostArgs({'openid.mode': 'error',
395
414
'openid.error': msg,
416
self.disableReturnToChecking()
397
417
r = self.consumer.complete(message, self.endpoint)
398
418
self.failUnlessEqual(r.status, FAILURE)
399
419
self.failUnless(r.identity_url == self.endpoint.claimed_id)
406
426
'openid.error': msg,
407
427
'openid.contact': contact,
429
self.disableReturnToChecking()
409
430
r = self.consumer.complete(message, self.endpoint)
410
431
self.failUnlessEqual(r.status, FAILURE)
411
432
self.failUnless(r.identity_url == self.endpoint.claimed_id)
421
442
'openid.error': msg, 'openid.reference': reference,
422
443
'openid.contact': contact, 'openid.ns': OPENID2_NS,
424
r = self.consumer.complete(message, self.endpoint)
445
r = self.consumer.complete(message, self.endpoint, None)
425
446
self.failUnlessEqual(r.status, FAILURE)
426
447
self.failUnless(r.identity_url == self.endpoint.claimed_id)
427
448
self.failUnless(r.contact == contact)
431
452
def test_noMode(self):
432
453
message = Message.fromPostArgs({})
433
r = self.consumer.complete(message, self.endpoint)
454
r = self.consumer.complete(message, self.endpoint, None)
434
455
self.failUnlessEqual(r.status, FAILURE)
435
456
self.failUnless(r.identity_url == self.endpoint.claimed_id)
440
461
# *check_auth* failed, not because it's missing an arg, exactly.
441
462
message = Message.fromPostArgs({'openid.mode': 'id_res'})
442
463
self.failUnlessRaises(ProtocolError, self.consumer._doIdRes,
443
message, self.endpoint)
464
message, self.endpoint, None)
445
466
def test_idResURLMismatch(self):
467
class VerifiedError(Exception): pass
469
def discoverAndVerify(_to_match):
472
self.consumer._discoverAndVerify = discoverAndVerify
473
self.disableReturnToChecking()
446
475
message = Message.fromPostArgs(
447
476
{'openid.mode': 'id_res',
448
477
'openid.return_to': 'return_to (just anything)',
452
481
'openid.signed': 'identity,return_to',
454
483
self.consumer.store = GoodAssocStore()
455
r = self.consumer.complete(message, self.endpoint)
456
self.failUnlessEqual(r.status, FAILURE)
457
self.failUnlessEqual(r.identity_url, self.consumer_id)
458
self.failUnless(r.message.startswith('local_id mismatch'),
485
self.failUnlessRaises(VerifiedError,
486
self.consumer.complete,
487
message, self.endpoint)
489
self.failUnlessLogMatches('Error attempting to use stored',
490
'Attempting discovery')
463
492
class TestCompleteMissingSig(unittest.TestCase, CatchLogs):
505
535
self.message.delArg(OPENID_NS, 'claimed_id')
506
536
self.endpoint.claimed_id = None
507
537
self.message.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle')
508
r = self.consumer.complete(self.message, self.endpoint)
538
r = self.consumer.complete(self.message, self.endpoint, None)
509
539
self.failUnlessSuccess(r)
512
542
def test_idResMissingIdentitySig(self):
513
543
self.message.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle,claimed_id')
514
r = self.consumer.complete(self.message, self.endpoint)
544
r = self.consumer.complete(self.message, self.endpoint, None)
515
545
self.failUnlessEqual(r.status, FAILURE)
518
548
def test_idResMissingReturnToSig(self):
519
549
self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,assoc_handle,claimed_id')
520
r = self.consumer.complete(self.message, self.endpoint)
550
r = self.consumer.complete(self.message, self.endpoint, None)
521
551
self.failUnlessEqual(r.status, FAILURE)
524
554
def test_idResMissingAssocHandleSig(self):
525
555
self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,return_to,claimed_id')
526
r = self.consumer.complete(self.message, self.endpoint)
556
r = self.consumer.complete(self.message, self.endpoint, None)
527
557
self.failUnlessEqual(r.status, FAILURE)
530
560
def test_idResMissingClaimedIDSig(self):
531
561
self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,return_to,assoc_handle')
532
r = self.consumer.complete(self.message, self.endpoint)
562
r = self.consumer.complete(self.message, self.endpoint, None)
533
563
self.failUnlessEqual(r.status, FAILURE)
717
747
def mkSuccessTest(openid_args, signed_list):
719
749
message = Message.fromOpenIDArgs(openid_args)
720
self.consumer._idResCheckForFields(message, signed_list)
750
message.setArg(OPENID_NS, 'signed', ','.join(signed_list))
751
self.consumer._idResCheckForFields(message)
723
754
test_openid1Success = mkSuccessTest(
751
782
['return_to', 'response_nonce', 'identity',
752
783
'claimed_id', 'assoc_handle'])
754
def mkFailureTest(openid_args, signed_list, sig_fail=False):
785
def mkFailureTest(openid_args, signed_list):
756
787
message = Message.fromOpenIDArgs(openid_args)
758
self.consumer._idResCheckForFields(message, signed_list)
789
self.consumer._idResCheckForFields(message)
759
790
except ProtocolError, why:
761
self.failUnless(why[0].endswith('not signed'))
763
self.failUnless(why[0].startswith('Missing required'))
791
self.failUnless(why[0].startswith('Missing required'))
765
793
self.fail('Expected an error, but none occurred')
813
839
def test_openid1Success(self):
814
840
"""use consumer-generated nonce"""
815
self.return_to = 'http://rt.unittest/?nonce=%s' % (mkNonce(),)
841
nonce_value = mkNonce()
842
self.return_to = 'http://rt.unittest/?nonce=%s' % (nonce_value,)
816
843
self.response = Message.fromOpenIDArgs({'return_to': self.return_to})
844
self.response.setArg(BARE_NS, 'nonce', nonce_value)
817
845
self.consumer._idResCheckNonce(self.response, self.endpoint)
818
846
self.failUnlessLogEmpty()
921
949
'openid.sig': GOODSIG,
922
950
'openid.signed': 'identity,return_to',
952
self.disableReturnToChecking()
925
result = self.consumer._doIdRes(message, self.endpoint)
954
result = self.consumer._doIdRes(message, self.endpoint, None)
926
955
except CheckAuthHappened:
937
966
assoc = association.Association(
938
967
'handle', 'secret', issued, lifetime, 'HMAC-SHA1')
939
968
self.store.storeAssociation(self.server_url, assoc)
969
self.disableReturnToChecking()
941
970
message = Message.fromPostArgs({
942
971
'openid.return_to':self.return_to,
943
972
'openid.identity':self.server_id,
946
975
'openid.signed': 'identity,return_to',
949
result = self.consumer._doIdRes(message, self.endpoint)
978
result = self.consumer._doIdRes(message, self.endpoint, None)
950
979
except CheckAuthHappened:
970
999
'openid.sig': GOODSIG,
971
1000
'openid.signed': 'identity,return_to',
1002
self.disableReturnToChecking()
973
1003
self.failUnlessRaises(ProtocolError, self.consumer._doIdRes,
974
message, self.endpoint)
1004
message, self.endpoint, None)
976
1006
def test_newerAssoc(self):
997
1027
message = Message.fromOpenIDArgs(query)
998
1028
message = good_assoc.signMessage(message)
999
info = self.consumer._doIdRes(message, self.endpoint)
1029
self.disableReturnToChecking()
1030
info = self.consumer._doIdRes(message, self.endpoint, None)
1000
1031
self.failUnlessEqual(info.status, SUCCESS, info.message)
1001
1032
self.failUnlessEqual(self.consumer_id, info.identity_url)
1033
1064
# no return value, success is assumed if there are no exceptions.
1034
1065
self.consumer._verifyReturnToArgs(query)
1067
def test_returnToArgsUnexpectedArg(self):
1069
'openid.mode': 'id_res',
1070
'openid.return_to': 'http://example.com/',
1073
# no return value, success is assumed if there are no exceptions.
1074
self.failUnlessRaises(ProtocolError,
1075
self.consumer._verifyReturnToArgs, query)
1037
1077
def test_returnToMismatch(self):
1083
1123
for bad in bad_return_tos:
1084
1124
m.setArg(OPENID_NS, 'return_to', bad)
1085
result = self.consumer.complete(m, endpoint, return_to)
1086
self.failUnless(isinstance(result, FailureResponse), \
1087
"Expected FailureResponse, got %r for %s" % (result, bad))
1088
self.failUnless(result.message == \
1089
"openid.return_to does not match return URL")
1125
self.failIf(self.consumer._checkReturnTo(m, return_to))
1091
1127
def test_completeGoodReturnTo(self):
1092
1128
"""Test GenericConsumer.complete()'s handling of good
1177
1213
def test_signedList(self):
1178
1214
query = Message.fromOpenIDArgs({
1179
1215
'mode': 'id_res',
1180
1217
'sig': 'rabbits',
1181
1218
'identity': '=example',
1182
1219
'assoc_handle': 'munchkins',
1183
'signed': 'identity,mode',
1220
'ns.sreg': 'urn:sreg',
1221
'sreg.email': 'bogus@example.com',
1222
'signed': 'identity,mode,ns.sreg,sreg.email',
1186
1225
expected = Message.fromOpenIDArgs({
1188
1227
'sig': 'rabbits',
1189
1228
'assoc_handle': 'munchkins',
1190
1229
'identity': '=example',
1191
'signed': 'identity,mode'
1230
'signed': 'identity,mode,ns.sreg,sreg.email',
1231
'ns.sreg': 'urn:sreg',
1232
'sreg.email': 'bogus@example.com',
1193
1234
args = self.consumer._createCheckAuthRequest(query)
1194
1235
self.failUnlessEqual(args.toPostArgs(), expected.toPostArgs())
1315
1356
resp = mkSuccess(self.endpoint, {'return_to':'return_to'})
1316
1357
self.failUnlessEqual(resp.getReturnTo(), 'return_to')
1359
def test_displayIdentifierClaimedId(self):
    """getDisplayIdentifier() equals the endpoint's claimed_id when no
    display_identifier has been set by this test.
    """
    success = mkSuccess(self.endpoint, {})
    shown = success.getDisplayIdentifier()
    self.failUnlessEqual(shown, success.endpoint.claimed_id)
1364
def test_displayIdentifierOverride(self):
    """getDisplayIdentifier() returns the endpoint's display_identifier
    once one has been assigned.
    """
    override = "http://input.url/"
    # Assign the override before building the response so the response
    # is created from an endpoint that already carries it.
    self.endpoint.display_identifier = override
    success = mkSuccess(self.endpoint, {})
    shown = success.getDisplayIdentifier()
    self.failUnlessEqual(shown, override)
1318
1370
class StubConsumer(object):
1319
1371
def __init__(self):
1320
1372
self.assoc = object()
1352
1404
association.SessionNegotiator))
1353
1405
self.failUnlessEqual([],
1354
1406
self.consumer.consumer.negotiator.allowed_types)
1355
self.consumer.setAssociationPreference([('FOO', 'BAR')])
1356
self.failUnlessEqual([('FOO', 'BAR')],
1407
self.consumer.setAssociationPreference([('HMAC-SHA1', 'DH-SHA1')])
1408
self.failUnlessEqual([('HMAC-SHA1', 'DH-SHA1')],
1357
1409
self.consumer.consumer.negotiator.allowed_types)
1359
1411
def withDummyDiscovery(self, callable, dummy_getNextService):
1420
1472
self.failUnless(result.endpoint is self.endpoint)
1422
1474
def test_completeEmptySession(self):
1423
response = self.consumer.complete({})
1475
text = "failed complete"
1477
def checkEndpoint(message, endpoint, return_to):
1478
self.failUnless(endpoint is None)
1479
return FailureResponse(endpoint, text)
1481
self.consumer.consumer.complete = checkEndpoint
1483
response = self.consumer.complete({}, None)
1424
1484
self.failUnlessEqual(response.status, FAILURE)
1485
self.failUnlessEqual(response.message, text)
1425
1486
self.failUnless(response.identity_url is None)
1427
1488
def _doResp(self, auth_req, exp_resp):
1434
1495
# endpoint is stored in the session
1435
1496
self.failUnless(self.session)
1436
resp = self.consumer.complete({})
1497
resp = self.consumer.complete({}, None)
1438
1499
# All responses should have the same identity URL, and the
1439
1500
# session should be cleaned out
1440
self.failUnless(resp.identity_url is self.identity_url)
1501
if self.endpoint.claimed_id != IDENTIFIER_SELECT:
1502
self.failUnless(resp.identity_url is self.identity_url)
1441
1504
self.failIf(self.consumer._token_key in self.session)
1443
1506
# Expected status response
1506
1569
SetupNeededResponse(self.endpoint, setup_url))
1507
1570
self.failUnless(resp.setup_url is setup_url)
1572
def test_successDifferentURL(self):
1574
Be sure that the session gets cleaned up when the response is
1575
successful and has a different URL than the one in the
1578
# Set up a request endpoint describing an IDP URL
1579
self.identity_url = 'http://idp.url/'
1580
self.endpoint.claimed_id = self.endpoint.local_id = IDENTIFIER_SELECT
1582
# Use a response endpoint with a different URL (asserted by
1584
resp_endpoint = OpenIDServiceEndpoint()
1585
resp_endpoint.claimed_id = "http://user.url/"
1587
resp = self._doRespDisco(
1589
mkSuccess(resp_endpoint, {}))
1590
self.failUnless(self.discovery.getManager(force=True) is None)
1509
1592
def test_begin(self):
1510
1593
self.discovery.createManager([self.endpoint], self.identity_url)
1511
1594
# Should not raise an exception
1552
1635
return discovered_endpoint
1553
1636
self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
1554
1637
self.consumer._idResCheckNonce = lambda *args: True
1555
response = self.consumer._doIdRes(message, self.endpoint)
1638
self.consumer._checkReturnTo = lambda unused1, unused2 : True
1639
response = self.consumer._doIdRes(message, self.endpoint, None)
1557
1641
self.failUnlessSuccess(response)
1558
1642
self.failUnlessEqual(response.identity_url, "=directed_identifier")
1573
1657
def verifyDiscoveryResults(identifier, endpoint):
1574
1658
raise DiscoveryFailure("PHREAK!", None)
1575
1659
self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
1660
self.consumer._checkReturnTo = lambda unused1, unused2 : True
1576
1661
self.failUnlessRaises(DiscoveryFailure, self.consumer._doIdRes,
1577
message, self.endpoint)
1662
message, self.endpoint, None)
1580
1665
def failUnlessSuccess(self, response):
1620
1705
def test_otherServer(self):
1706
text = "verify failed"
1708
def discoverAndVerify(to_match):
1709
self.failUnlessEqual(self.identifier, to_match.claimed_id)
1710
raise ProtocolError(text)
1712
self.consumer._discoverAndVerify = discoverAndVerify
1621
1714
# a set of things without the stuff
1622
1715
endpoint = OpenIDServiceEndpoint()
1623
1716
endpoint.type_uris = [OPENID_2_0_TYPE]
1629
1722
r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
1630
1723
except ProtocolError, e:
1631
1724
# Should we make more ProtocolError subclasses?
1632
self.failUnless('OP Endpoint mismatch' in str(e), e)
1725
self.failUnless(str(e), text)
1634
1727
self.fail("expected ProtocolError, %r returned." % (r,))
1637
1730
def test_foreignDelegate(self):
1731
text = "verify failed"
1733
def discoverAndVerify(to_match):
1734
self.failUnlessEqual(self.identifier, to_match.claimed_id)
1735
raise ProtocolError(text)
1737
self.consumer._discoverAndVerify = discoverAndVerify
1638
1739
# a set of things with the server stuff but other delegate
1639
1740
endpoint = OpenIDServiceEndpoint()
1640
1741
endpoint.type_uris = [OPENID_2_0_TYPE]
1641
1742
endpoint.claimed_id = self.identifier
1642
1743
endpoint.server_url = self.server_url
1643
1744
endpoint.local_id = "http://unittest/juan-carlos"
1645
1747
r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
1646
1748
except ProtocolError, e:
1647
self.failUnless('local_id mismatch' in str(e), e)
1749
self.failUnlessEqual(str(e), text)
1649
self.fail("expected ProtocolError, %r returned." % (r,))
1751
self.fail("Exepected ProtocolError, %r returned" % (r,))
1652
1753
def test_nothingDiscovered(self):
1653
1754
# a set of no things.
1899
2000
ar.addExtension(ext)
1900
2001
ext_args = ar.message.getArgs(ext.ns_uri)
1901
2002
self.failUnlessEqual(ext.getExtensionArgs(), ext_args)
2006
class TestKVPost(unittest.TestCase):
2008
self.server_url = 'http://unittest/%s' % (self.id(),)
2011
from openid.fetchers import HTTPResponse
2012
response = HTTPResponse()
2013
response.status = 200
2014
response.body = "foo:bar\nbaz:quux\n"
2015
r = _httpResponseToMessage(response, self.server_url)
2016
expected_msg = Message.fromOpenIDArgs({'foo':'bar','baz':'quux'})
2017
self.failUnlessEqual(expected_msg, r)
2021
response = HTTPResponse()
2022
response.status = 400
2023
response.body = "error:bonk\nerror_code:7\n"
2025
r = _httpResponseToMessage(response, self.server_url)
2026
except ServerError, e:
2027
self.failUnlessEqual(e.error_text, 'bonk')
2028
self.failUnlessEqual(e.error_code, '7')
2030
self.fail("Expected ServerError, got return %r" % (r,))
2034
# 500 as an example of any non-200, non-400 code.
2035
response = HTTPResponse()
2036
response.status = 500
2037
response.body = "foo:bar\nbaz:quux\n"
2038
self.failUnlessRaises(fetchers.HTTPFetchingError,
2039
_httpResponseToMessage, response,
1902
2045
# Run the module's test cases through unittest's command-line runner when
# this file is executed as a script rather than imported.
if "__main__" == __name__:
    unittest.main()