~gary/python-openid/python-openid-2.2.1-patched

« back to all changes in this revision

Viewing changes to openid/test/test_consumer.py

  • Committer: Launchpad Patch Queue Manager
  • Date: 2007-12-10 10:29:01 UTC
  • mfrom: (2.1.2 pyopenid-2.1)
  • Revision ID: launchpad@pqm.canonical.com-20071210102901-f5mk3xr5ggx0vtwh
[rs=jamesh] Upgrade to python-openid-2.1 for better support of final OpenID 2.0 standard

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
     SuccessResponse, FailureResponse, SetupNeededResponse, CancelResponse, \
15
15
     DiffieHellmanSHA1ConsumerSession, Consumer, PlainTextConsumerSession, \
16
16
     SetupNeededError, DiffieHellmanSHA256ConsumerSession, ServerError, \
17
 
     ProtocolError
 
17
     ProtocolError, _httpResponseToMessage
18
18
from openid import association
19
19
from openid.server.server import \
20
20
     PlainTextServerSession, DiffieHellmanSHA1ServerSession
173
173
        assert new_return_to.startswith(return_to)
174
174
        assert redirect_url.startswith(server_url)
175
175
 
176
 
        nonce_key = consumer.openid1_nonce_query_arg_name
177
 
        nonce = request.return_to_args[nonce_key]
178
 
 
179
 
        query = {
180
 
            nonce_key:nonce,
 
176
        parsed = urlparse.urlparse(new_return_to)
 
177
        query = parseQuery(parsed[4])
 
178
        query.update({
181
179
            'openid.mode':'id_res',
182
180
            'openid.return_to':new_return_to,
183
181
            'openid.identity':delegate_url,
184
182
            'openid.assoc_handle':fetcher.assoc_handle,
185
 
            }
 
183
            })
186
184
 
187
185
        assoc = store.getAssociation(server_url, fetcher.assoc_handle)
188
186
 
189
187
        message = Message.fromPostArgs(query)
190
188
        message = assoc.signMessage(message)
191
 
        info = consumer.complete(message, request.endpoint)
 
189
        info = consumer.complete(message, request.endpoint, new_return_to)
192
190
        assert info.status == SUCCESS, info.message
193
191
        assert info.identity_url == user_url
194
192
 
215
213
consumer_url = 'http://consumer.example.com/'
216
214
https_server_url = 'https://server.example.com/'
217
215
 
218
 
class TestSuccess(unittest.TestCase):
 
216
class TestSuccess(unittest.TestCase, CatchLogs):
219
217
    server_url = http_server_url
220
218
    user_url = 'http://www.example.com/user.html'
221
219
    delegate_url = 'http://consumer.example.com/user'
222
220
 
223
221
    def setUp(self):
 
222
        CatchLogs.setUp(self)
224
223
        self.links = '<link rel="openid.server" href="%s" />' % (
225
224
            self.server_url,)
226
225
 
228
227
                               '<link rel="openid.delegate" href="%s" />') % (
229
228
            self.server_url, self.delegate_url)
230
229
 
 
230
    def tearDown(self):
 
231
        CatchLogs.tearDown(self)
 
232
 
231
233
    def test_nodelegate(self):
232
234
        _test_success(self.server_url, self.user_url,
233
235
                      self.user_url, self.links)
261
263
        self.failUnlessRaises(TypeError, GenericConsumer)
262
264
 
263
265
 
264
 
class TestIdRes(unittest.TestCase):
 
266
class TestIdRes(unittest.TestCase, CatchLogs):
265
267
    consumer_class = GenericConsumer
266
268
 
267
269
    def setUp(self):
 
270
        CatchLogs.setUp(self)
 
271
 
268
272
        self.store = memstore.MemoryStore()
269
273
        self.consumer = self.consumer_class(self.store)
270
274
        self.return_to = "nonny"
281
285
            return endpoint
282
286
        self.consumer._verifyDiscoveryResults = dummyVerifyDiscover
283
287
 
 
288
    def disableReturnToChecking(self):
 
289
        def checkReturnTo(unused1, unused2):
 
290
            return True
 
291
        self.consumer._checkReturnTo = checkReturnTo
 
292
        complete = self.consumer.complete
 
293
        def callCompleteWithoutReturnTo(message, endpoint):
 
294
            return complete(message, endpoint, None)
 
295
        self.consumer.complete = callCompleteWithoutReturnTo
284
296
 
285
297
class TestIdResCheckSignature(TestIdRes):
286
298
    def setUp(self):
379
391
 
380
392
        self.consumer._checkSetupNeeded = raiseSetupNeeded
381
393
 
382
 
        response = self.consumer.complete(message, None)
 
394
        response = self.consumer.complete(message, None, None)
383
395
        self.failUnlessEqual(SETUP_NEEDED, response.status)
384
396
        self.failUnless(setup_url_sentinel is response.setup_url)
385
397
 
386
398
    def test_cancel(self):
387
399
        message = Message.fromPostArgs({'openid.mode': 'cancel'})
 
400
        self.disableReturnToChecking()
388
401
        r = self.consumer.complete(message, self.endpoint)
389
402
        self.failUnlessEqual(r.status, CANCEL)
390
403
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
391
404
 
 
405
    def test_cancel_with_return_to(self):
 
406
        message = Message.fromPostArgs({'openid.mode': 'cancel'})
 
407
        r = self.consumer.complete(message, self.endpoint, self.return_to)
 
408
        self.failUnlessEqual(r.status, CANCEL)
 
409
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
 
410
 
392
411
    def test_error(self):
393
412
        msg = 'an error message'
394
413
        message = Message.fromPostArgs({'openid.mode': 'error',
395
414
                 'openid.error': msg,
396
415
                 })
 
416
        self.disableReturnToChecking()
397
417
        r = self.consumer.complete(message, self.endpoint)
398
418
        self.failUnlessEqual(r.status, FAILURE)
399
419
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
406
426
                 'openid.error': msg,
407
427
                 'openid.contact': contact,
408
428
                 })
 
429
        self.disableReturnToChecking()
409
430
        r = self.consumer.complete(message, self.endpoint)
410
431
        self.failUnlessEqual(r.status, FAILURE)
411
432
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
421
442
                 'openid.error': msg, 'openid.reference': reference,
422
443
                 'openid.contact': contact, 'openid.ns': OPENID2_NS,
423
444
                 })
424
 
        r = self.consumer.complete(message, self.endpoint)
 
445
        r = self.consumer.complete(message, self.endpoint, None)
425
446
        self.failUnlessEqual(r.status, FAILURE)
426
447
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
427
448
        self.failUnless(r.contact == contact)
430
451
 
431
452
    def test_noMode(self):
432
453
        message = Message.fromPostArgs({})
433
 
        r = self.consumer.complete(message, self.endpoint)
 
454
        r = self.consumer.complete(message, self.endpoint, None)
434
455
        self.failUnlessEqual(r.status, FAILURE)
435
456
        self.failUnless(r.identity_url == self.endpoint.claimed_id)
436
457
 
440
461
        # *check_auth* failed, not because it's missing an arg, exactly.
441
462
        message = Message.fromPostArgs({'openid.mode': 'id_res'})
442
463
        self.failUnlessRaises(ProtocolError, self.consumer._doIdRes,
443
 
                              message, self.endpoint)
 
464
                              message, self.endpoint, None)
444
465
 
445
466
    def test_idResURLMismatch(self):
 
467
        class VerifiedError(Exception): pass
 
468
 
 
469
        def discoverAndVerify(_to_match):
 
470
            raise VerifiedError
 
471
 
 
472
        self.consumer._discoverAndVerify = discoverAndVerify
 
473
        self.disableReturnToChecking()
 
474
 
446
475
        message = Message.fromPostArgs(
447
476
            {'openid.mode': 'id_res',
448
477
             'openid.return_to': 'return_to (just anything)',
452
481
             'openid.signed': 'identity,return_to',
453
482
             })
454
483
        self.consumer.store = GoodAssocStore()
455
 
        r = self.consumer.complete(message, self.endpoint)
456
 
        self.failUnlessEqual(r.status, FAILURE)
457
 
        self.failUnlessEqual(r.identity_url, self.consumer_id)
458
 
        self.failUnless(r.message.startswith('local_id mismatch'),
459
 
                        r.message)
460
 
 
461
 
 
 
484
 
 
485
        self.failUnlessRaises(VerifiedError,
 
486
                              self.consumer.complete,
 
487
                              message, self.endpoint)
 
488
 
 
489
        self.failUnlessLogMatches('Error attempting to use stored',
 
490
                                  'Attempting discovery')
462
491
 
463
492
class TestCompleteMissingSig(unittest.TestCase, CatchLogs):
464
493
 
486
515
        self.endpoint = OpenIDServiceEndpoint()
487
516
        self.endpoint.server_url = self.server_url
488
517
        self.endpoint.claimed_id = claimed_id
 
518
        self.consumer._checkReturnTo = lambda unused1, unused2 : True
489
519
 
490
520
    def tearDown(self):
491
521
        CatchLogs.tearDown(self)
496
526
            return endpoint
497
527
 
498
528
        self.consumer._verifyDiscoveryResults = _vrfy
499
 
        r = self.consumer.complete(self.message, self.endpoint)
 
529
        r = self.consumer.complete(self.message, self.endpoint, None)
500
530
        self.failUnlessSuccess(r)
501
531
 
502
532
 
505
535
        self.message.delArg(OPENID_NS, 'claimed_id')
506
536
        self.endpoint.claimed_id = None
507
537
        self.message.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle')
508
 
        r = self.consumer.complete(self.message, self.endpoint)
 
538
        r = self.consumer.complete(self.message, self.endpoint, None)
509
539
        self.failUnlessSuccess(r)
510
540
 
511
541
 
512
542
    def test_idResMissingIdentitySig(self):
513
543
        self.message.setArg(OPENID_NS, 'signed', 'return_to,response_nonce,assoc_handle,claimed_id')
514
 
        r = self.consumer.complete(self.message, self.endpoint)
 
544
        r = self.consumer.complete(self.message, self.endpoint, None)
515
545
        self.failUnlessEqual(r.status, FAILURE)
516
546
 
517
547
 
518
548
    def test_idResMissingReturnToSig(self):
519
549
        self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,assoc_handle,claimed_id')
520
 
        r = self.consumer.complete(self.message, self.endpoint)
 
550
        r = self.consumer.complete(self.message, self.endpoint, None)
521
551
        self.failUnlessEqual(r.status, FAILURE)
522
552
 
523
553
 
524
554
    def test_idResMissingAssocHandleSig(self):
525
555
        self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,return_to,claimed_id')
526
 
        r = self.consumer.complete(self.message, self.endpoint)
 
556
        r = self.consumer.complete(self.message, self.endpoint, None)
527
557
        self.failUnlessEqual(r.status, FAILURE)
528
558
 
529
559
 
530
560
    def test_idResMissingClaimedIDSig(self):
531
561
        self.message.setArg(OPENID_NS, 'signed', 'identity,response_nonce,return_to,assoc_handle')
532
 
        r = self.consumer.complete(self.message, self.endpoint)
 
562
        r = self.consumer.complete(self.message, self.endpoint, None)
533
563
        self.failUnlessEqual(r.status, FAILURE)
534
564
 
535
565
 
717
747
    def mkSuccessTest(openid_args, signed_list):
718
748
        def test(self):
719
749
            message = Message.fromOpenIDArgs(openid_args)
720
 
            self.consumer._idResCheckForFields(message, signed_list)
 
750
            message.setArg(OPENID_NS, 'signed', ','.join(signed_list))
 
751
            self.consumer._idResCheckForFields(message)
721
752
        return test
722
753
 
723
754
    test_openid1Success = mkSuccessTest(
751
782
        ['return_to', 'response_nonce', 'identity',
752
783
         'claimed_id', 'assoc_handle'])
753
784
 
754
 
    def mkFailureTest(openid_args, signed_list, sig_fail=False):
 
785
    def mkFailureTest(openid_args, signed_list):
755
786
        def test(self):
756
787
            message = Message.fromOpenIDArgs(openid_args)
757
788
            try:
758
 
                self.consumer._idResCheckForFields(message, signed_list)
 
789
                self.consumer._idResCheckForFields(message)
759
790
            except ProtocolError, why:
760
 
                if sig_fail:
761
 
                    self.failUnless(why[0].endswith('not signed'))
762
 
                else:
763
 
                    self.failUnless(why[0].startswith('Missing required'))
 
791
                self.failUnless(why[0].startswith('Missing required'))
764
792
            else:
765
793
                self.fail('Expected an error, but none occurred')
766
794
        return test
771
799
         'sig':'a signature',
772
800
         'identity':'someone',
773
801
         },
774
 
        ['identity'],
775
 
        sig_fail=True)
 
802
        ['identity'])
776
803
 
777
804
    test_openid1Missing_identitySig = mkFailureTest(
778
805
        {'return_to':'return',
780
807
         'sig':'a signature',
781
808
         'identity':'someone',
782
809
         },
783
 
        ['return_to'],
784
 
        sig_fail=True)
 
810
        ['return_to'])
785
811
 
786
812
    test_openid1MissingReturnTo = mkFailureTest(
787
813
        {'assoc_handle':'assoc handle',
812
838
 
813
839
    def test_openid1Success(self):
814
840
        """use consumer-generated nonce"""
815
 
        self.return_to = 'http://rt.unittest/?nonce=%s' % (mkNonce(),)
 
841
        nonce_value = mkNonce()
 
842
        self.return_to = 'http://rt.unittest/?nonce=%s' % (nonce_value,)
816
843
        self.response = Message.fromOpenIDArgs({'return_to': self.return_to})
 
844
        self.response.setArg(BARE_NS, 'nonce', nonce_value)
817
845
        self.consumer._idResCheckNonce(self.response, self.endpoint)
818
846
        self.failUnlessLogEmpty()
819
847
 
921
949
            'openid.sig': GOODSIG,
922
950
            'openid.signed': 'identity,return_to',
923
951
            })
 
952
        self.disableReturnToChecking()
924
953
        try:
925
 
            result = self.consumer._doIdRes(message, self.endpoint)
 
954
            result = self.consumer._doIdRes(message, self.endpoint, None)
926
955
        except CheckAuthHappened:
927
956
            pass
928
957
        else:
937
966
        assoc = association.Association(
938
967
            'handle', 'secret', issued, lifetime, 'HMAC-SHA1')
939
968
        self.store.storeAssociation(self.server_url, assoc)
940
 
 
 
969
        self.disableReturnToChecking()
941
970
        message = Message.fromPostArgs({
942
971
            'openid.return_to':self.return_to,
943
972
            'openid.identity':self.server_id,
946
975
            'openid.signed': 'identity,return_to',
947
976
            })
948
977
        try:
949
 
            result = self.consumer._doIdRes(message, self.endpoint)
 
978
            result = self.consumer._doIdRes(message, self.endpoint, None)
950
979
        except CheckAuthHappened:
951
980
            pass
952
981
        else:
970
999
            'openid.sig': GOODSIG,
971
1000
            'openid.signed': 'identity,return_to',
972
1001
            })
 
1002
        self.disableReturnToChecking()
973
1003
        self.failUnlessRaises(ProtocolError, self.consumer._doIdRes,
974
 
                              message, self.endpoint)
 
1004
                              message, self.endpoint, None)
975
1005
 
976
1006
    def test_newerAssoc(self):
977
1007
        lifetime = 1000
996
1026
 
997
1027
        message = Message.fromOpenIDArgs(query)
998
1028
        message = good_assoc.signMessage(message)
999
 
        info = self.consumer._doIdRes(message, self.endpoint)
 
1029
        self.disableReturnToChecking()
 
1030
        info = self.consumer._doIdRes(message, self.endpoint, None)
1000
1031
        self.failUnlessEqual(info.status, SUCCESS, info.message)
1001
1032
        self.failUnlessEqual(self.consumer_id, info.identity_url)
1002
1033
 
1033
1064
        # no return value, success is assumed if there are no exceptions.
1034
1065
        self.consumer._verifyReturnToArgs(query)
1035
1066
 
 
1067
    def test_returnToArgsUnexpectedArg(self):
 
1068
        query = {
 
1069
            'openid.mode': 'id_res',
 
1070
            'openid.return_to': 'http://example.com/',
 
1071
            'foo': 'bar',
 
1072
            }
 
1073
        # no return value, success is assumed if there are no exceptions.
 
1074
        self.failUnlessRaises(ProtocolError,
 
1075
                              self.consumer._verifyReturnToArgs, query)
1036
1076
 
1037
1077
    def test_returnToMismatch(self):
1038
1078
        query = {
1082
1122
 
1083
1123
        for bad in bad_return_tos:
1084
1124
            m.setArg(OPENID_NS, 'return_to', bad)
1085
 
            result = self.consumer.complete(m, endpoint, return_to)
1086
 
            self.failUnless(isinstance(result, FailureResponse), \
1087
 
                            "Expected FailureResponse, got %r for %s" % (result, bad))
1088
 
            self.failUnless(result.message == \
1089
 
                            "openid.return_to does not match return URL")
 
1125
            self.failIf(self.consumer._checkReturnTo(m, return_to))
1090
1126
 
1091
1127
    def test_completeGoodReturnTo(self):
1092
1128
        """Test GenericConsumer.complete()'s handling of good
1177
1213
    def test_signedList(self):
1178
1214
        query = Message.fromOpenIDArgs({
1179
1215
            'mode': 'id_res',
 
1216
            'ns': OPENID2_NS,
1180
1217
            'sig': 'rabbits',
1181
1218
            'identity': '=example',
1182
1219
            'assoc_handle': 'munchkins',
1183
 
            'signed': 'identity,mode',
 
1220
            'ns.sreg': 'urn:sreg',
 
1221
            'sreg.email': 'bogus@example.com',
 
1222
            'signed': 'identity,mode,ns.sreg,sreg.email',
1184
1223
            'foo': 'bar',
1185
1224
            })
1186
1225
        expected = Message.fromOpenIDArgs({
1188
1227
            'sig': 'rabbits',
1189
1228
            'assoc_handle': 'munchkins',
1190
1229
            'identity': '=example',
1191
 
            'signed': 'identity,mode'
 
1230
            'signed': 'identity,mode,ns.sreg,sreg.email',
 
1231
            'ns.sreg': 'urn:sreg',
 
1232
            'sreg.email': 'bogus@example.com',
1192
1233
            })
1193
1234
        args = self.consumer._createCheckAuthRequest(query)
1194
1235
        self.failUnlessEqual(args.toPostArgs(), expected.toPostArgs())
1315
1356
        resp = mkSuccess(self.endpoint, {'return_to':'return_to'})
1316
1357
        self.failUnlessEqual(resp.getReturnTo(), 'return_to')
1317
1358
 
 
1359
    def test_displayIdentifierClaimedId(self):
 
1360
        resp = mkSuccess(self.endpoint, {})
 
1361
        self.failUnlessEqual(resp.getDisplayIdentifier(),
 
1362
                             resp.endpoint.claimed_id)
 
1363
 
 
1364
    def test_displayIdentifierOverride(self):
 
1365
        self.endpoint.display_identifier = "http://input.url/"
 
1366
        resp = mkSuccess(self.endpoint, {})
 
1367
        self.failUnlessEqual(resp.getDisplayIdentifier(),
 
1368
                             "http://input.url/")
 
1369
 
1318
1370
class StubConsumer(object):
1319
1371
    def __init__(self):
1320
1372
        self.assoc = object()
1326
1378
        self.endpoint = service
1327
1379
        return auth_req
1328
1380
 
1329
 
    def complete(self, message, endpoint, return_to=None):
 
1381
    def complete(self, message, endpoint, return_to):
1330
1382
        assert endpoint is self.endpoint
1331
1383
        return self.response
1332
1384
 
1352
1404
                                   association.SessionNegotiator))
1353
1405
        self.failUnlessEqual([],
1354
1406
                             self.consumer.consumer.negotiator.allowed_types)
1355
 
        self.consumer.setAssociationPreference([('FOO', 'BAR')])
1356
 
        self.failUnlessEqual([('FOO', 'BAR')],
 
1407
        self.consumer.setAssociationPreference([('HMAC-SHA1', 'DH-SHA1')])
 
1408
        self.failUnlessEqual([('HMAC-SHA1', 'DH-SHA1')],
1357
1409
                             self.consumer.consumer.negotiator.allowed_types)
1358
1410
 
1359
1411
    def withDummyDiscovery(self, callable, dummy_getNextService):
1420
1472
        self.failUnless(result.endpoint is self.endpoint)
1421
1473
 
1422
1474
    def test_completeEmptySession(self):
1423
 
        response = self.consumer.complete({})
 
1475
        text = "failed complete"
 
1476
 
 
1477
        def checkEndpoint(message, endpoint, return_to):
 
1478
            self.failUnless(endpoint is None)
 
1479
            return FailureResponse(endpoint, text)
 
1480
 
 
1481
        self.consumer.consumer.complete = checkEndpoint
 
1482
 
 
1483
        response = self.consumer.complete({}, None)
1424
1484
        self.failUnlessEqual(response.status, FAILURE)
 
1485
        self.failUnlessEqual(response.message, text)
1425
1486
        self.failUnless(response.identity_url is None)
1426
1487
 
1427
1488
    def _doResp(self, auth_req, exp_resp):
1433
1494
 
1434
1495
        # endpoint is stored in the session
1435
1496
        self.failUnless(self.session)
1436
 
        resp = self.consumer.complete({})
 
1497
        resp = self.consumer.complete({}, None)
1437
1498
 
1438
1499
        # All responses should have the same identity URL, and the
1439
1500
        # session should be cleaned out
1440
 
        self.failUnless(resp.identity_url is self.identity_url)
 
1501
        if self.endpoint.claimed_id != IDENTIFIER_SELECT:
 
1502
            self.failUnless(resp.identity_url is self.identity_url)
 
1503
 
1441
1504
        self.failIf(self.consumer._token_key in self.session)
1442
1505
 
1443
1506
        # Expected status response
1506
1569
            SetupNeededResponse(self.endpoint, setup_url))
1507
1570
        self.failUnless(resp.setup_url is setup_url)
1508
1571
 
 
1572
    def test_successDifferentURL(self):
 
1573
        """
 
1574
        Be sure that the session gets cleaned up when the response is
 
1575
        successful and has a different URL than the one in the
 
1576
        request.
 
1577
        """
 
1578
        # Set up a request endpoint describing an IDP URL
 
1579
        self.identity_url = 'http://idp.url/'
 
1580
        self.endpoint.claimed_id = self.endpoint.local_id = IDENTIFIER_SELECT
 
1581
 
 
1582
        # Use a response endpoint with a different URL (asserted by
 
1583
        # the IDP)
 
1584
        resp_endpoint = OpenIDServiceEndpoint()
 
1585
        resp_endpoint.claimed_id = "http://user.url/"
 
1586
 
 
1587
        resp = self._doRespDisco(
 
1588
            True,
 
1589
            mkSuccess(resp_endpoint, {}))
 
1590
        self.failUnless(self.discovery.getManager(force=True) is None)
 
1591
 
1509
1592
    def test_begin(self):
1510
1593
        self.discovery.createManager([self.endpoint], self.identity_url)
1511
1594
        # Should not raise an exception
1552
1635
            return discovered_endpoint
1553
1636
        self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
1554
1637
        self.consumer._idResCheckNonce = lambda *args: True
1555
 
        response = self.consumer._doIdRes(message, self.endpoint)
 
1638
        self.consumer._checkReturnTo = lambda unused1, unused2 : True
 
1639
        response = self.consumer._doIdRes(message, self.endpoint, None)
1556
1640
 
1557
1641
        self.failUnlessSuccess(response)
1558
1642
        self.failUnlessEqual(response.identity_url, "=directed_identifier")
1573
1657
        def verifyDiscoveryResults(identifier, endpoint):
1574
1658
            raise DiscoveryFailure("PHREAK!", None)
1575
1659
        self.consumer._verifyDiscoveryResults = verifyDiscoveryResults
 
1660
        self.consumer._checkReturnTo = lambda unused1, unused2 : True
1576
1661
        self.failUnlessRaises(DiscoveryFailure, self.consumer._doIdRes,
1577
 
                              message, self.endpoint)
 
1662
                              message, self.endpoint, None)
1578
1663
 
1579
1664
 
1580
1665
    def failUnlessSuccess(self, response):
1618
1703
 
1619
1704
 
1620
1705
    def test_otherServer(self):
 
1706
        text = "verify failed"
 
1707
 
 
1708
        def discoverAndVerify(to_match):
 
1709
            self.failUnlessEqual(self.identifier, to_match.claimed_id)
 
1710
            raise ProtocolError(text)
 
1711
 
 
1712
        self.consumer._discoverAndVerify = discoverAndVerify
 
1713
 
1621
1714
        # a set of things without the stuff
1622
1715
        endpoint = OpenIDServiceEndpoint()
1623
1716
        endpoint.type_uris = [OPENID_2_0_TYPE]
1629
1722
            r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
1630
1723
        except ProtocolError, e:
1631
1724
            # Should we make more ProtocolError subclasses?
1632
 
            self.failUnless('OP Endpoint mismatch' in str(e), e)
 
1725
            self.failUnless(str(e), text)
1633
1726
        else:
1634
1727
            self.fail("expected ProtocolError, %r returned." % (r,))
1635
1728
            
1636
1729
 
1637
1730
    def test_foreignDelegate(self):
 
1731
        text = "verify failed"
 
1732
 
 
1733
        def discoverAndVerify(to_match):
 
1734
            self.failUnlessEqual(self.identifier, to_match.claimed_id)
 
1735
            raise ProtocolError(text)
 
1736
 
 
1737
        self.consumer._discoverAndVerify = discoverAndVerify
 
1738
 
1638
1739
        # a set of things with the server stuff but other delegate
1639
1740
        endpoint = OpenIDServiceEndpoint()
1640
1741
        endpoint.type_uris = [OPENID_2_0_TYPE]
1641
1742
        endpoint.claimed_id = self.identifier
1642
1743
        endpoint.server_url = self.server_url
1643
1744
        endpoint.local_id = "http://unittest/juan-carlos"
 
1745
 
1644
1746
        try:
1645
1747
            r = self.consumer._verifyDiscoveryResults(self.message, endpoint)
1646
1748
        except ProtocolError, e:
1647
 
            self.failUnless('local_id mismatch' in str(e), e)
 
1749
            self.failUnlessEqual(str(e), text)
1648
1750
        else:
1649
 
            self.fail("expected ProtocolError, %r returned." % (r,))
1650
 
 
 
1751
            self.fail("Exepected ProtocolError, %r returned" % (r,))
1651
1752
 
1652
1753
    def test_nothingDiscovered(self):
1653
1754
        # a set of no things.
1899
2000
        ar.addExtension(ext)
1900
2001
        ext_args = ar.message.getArgs(ext.ns_uri)
1901
2002
        self.failUnlessEqual(ext.getExtensionArgs(), ext_args)
 
2003
 
 
2004
 
 
2005
 
 
2006
class TestKVPost(unittest.TestCase):
 
2007
    def setUp(self):
 
2008
        self.server_url = 'http://unittest/%s' % (self.id(),)
 
2009
 
 
2010
    def test_200(self):
 
2011
        from openid.fetchers import HTTPResponse
 
2012
        response = HTTPResponse()
 
2013
        response.status = 200
 
2014
        response.body = "foo:bar\nbaz:quux\n"
 
2015
        r = _httpResponseToMessage(response, self.server_url)
 
2016
        expected_msg = Message.fromOpenIDArgs({'foo':'bar','baz':'quux'})
 
2017
        self.failUnlessEqual(expected_msg, r)
 
2018
 
 
2019
 
 
2020
    def test_400(self):
 
2021
        response = HTTPResponse()
 
2022
        response.status = 400
 
2023
        response.body = "error:bonk\nerror_code:7\n"
 
2024
        try:
 
2025
            r = _httpResponseToMessage(response, self.server_url)
 
2026
        except ServerError, e:
 
2027
            self.failUnlessEqual(e.error_text, 'bonk')
 
2028
            self.failUnlessEqual(e.error_code, '7')
 
2029
        else:
 
2030
            self.fail("Expected ServerError, got return %r" % (r,))
 
2031
 
 
2032
 
 
2033
    def test_500(self):
 
2034
        # 500 as an example of any non-200, non-400 code.
 
2035
        response = HTTPResponse()
 
2036
        response.status = 500
 
2037
        response.body = "foo:bar\nbaz:quux\n"
 
2038
        self.failUnlessRaises(fetchers.HTTPFetchingError,
 
2039
                              _httpResponseToMessage, response,
 
2040
                              self.server_url)
 
2041
 
 
2042
 
 
2043
 
 
2044
 
1902
2045
if __name__ == '__main__':
1903
2046
    unittest.main()