61
62
SREG_NS = "http://openid.net/sreg/1.0"
62
63
AX_NS = "http://openid.net/srv/ax/1.0"
65
@override_session_serializer
67
OPENID_USE_EMAIL_FOR_USERNAME=False,
68
OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
69
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
70
OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
71
class OpenIDBackendTests(TestCase):
74
super(OpenIDBackendTests, self).setUp()
75
self.backend = OpenIDBackend()
77
def make_openid_response(self, sreg_args=None, teams_args=None):
78
endpoint = OpenIDServiceEndpoint()
79
endpoint.claimed_id = 'some-id'
80
endpoint.server_url = 'http://example.com/'
82
message = Message(OPENID2_NS)
83
if sreg_args is not None:
84
for key, value in sreg_args.items():
85
message.setArg(SREG_NS, key, value)
86
if teams_args is not None:
87
for key, value in teams_args.items():
88
message.setArg(TEAMS_NS, key, value)
89
response = SuccessResponse(
90
endpoint, message, signed_fields=message.toPostArgs().keys())
94
self, schema="http://axschema.org/",
95
fullname="Some User", nickname="someuser", email="foo@example.com",
96
first=None, last=None, verified=False):
97
endpoint = OpenIDServiceEndpoint()
98
endpoint.claimed_id = 'some-id'
99
endpoint.server_url = 'http://example.com/'
101
message = Message(OPENID2_NS)
64
SERVER_URL = 'http://example.com'
67
def make_claimed_id(id_):
    """Return ``id_`` resolved against ``SERVER_URL`` via ``urljoin``.

    Used to build the claimed-identity URLs that the tests hand to the
    OpenID endpoint and to ``UserOpenID`` rows.
    """
    claimed_id = urljoin(SERVER_URL, id_)
    return claimed_id
71
class TestMessage(Message):
72
"""Convenience class to construct test OpenID messages and responses."""
74
def __init__(self, openid_namespace=OPENID2_NS):
    """Initialise the message and attach a ready-made service endpoint.

    The endpoint is stored on ``self.endpoint`` with its ``claimed_id``
    set to ``make_claimed_id('some-id')`` and its ``server_url`` set to
    ``SERVER_URL``.
    """
    super(TestMessage, self).__init__(openid_namespace=openid_namespace)
    self.endpoint = OpenIDServiceEndpoint()
    self.endpoint.claimed_id = make_claimed_id('some-id')
    self.endpoint.server_url = SERVER_URL
83
email="foo@example.com",
88
schema="http://axschema.org/",
103
92
("nickname", schema + "namePerson/friendly", nickname),
104
93
("fullname", schema + "namePerson", fullname),
114
103
attributes.append(
115
104
("last", "http://axschema.org/namePerson/last", last))
117
message.setArg(AX_NS, "mode", "fetch_response")
106
self.setArg(AX_NS, "mode", "fetch_response")
118
107
for (alias, uri, value) in attributes:
119
message.setArg(AX_NS, "type.%s" % alias, uri)
120
message.setArg(AX_NS, "value.%s" % alias, value)
108
self.setArg(AX_NS, "type.%s" % alias, uri)
109
self.setArg(AX_NS, "value.%s" % alias, value)
111
def set_pape_args(self, *auth_policies):
    """Record the given PAPE auth policies on this message.

    The policies are joined with single spaces and stored under the
    ``auth_policies`` key of the ``pape.ns_uri`` namespace.
    """
    joined_policies = ' '.join(auth_policies)
    self.setArg(pape.ns_uri, 'auth_policies', joined_policies)
114
def set_sreg_args(self, **kwargs):
    """Set each keyword argument as a field in the ``SREG_NS`` namespace."""
    for field in kwargs:
        self.setArg(SREG_NS, field, kwargs[field])
118
def set_team_args(self, **kwargs):
    """Set each keyword argument as a field in the ``TEAMS_NS`` namespace."""
    for field in kwargs:
        self.setArg(TEAMS_NS, field, kwargs[field])
122
def to_response(self):
121
123
return SuccessResponse(
122
endpoint, message, signed_fields=message.toPostArgs().keys())
124
def make_user_openid(self, user=None,
125
claimed_id='http://example.com/existing_identity',
126
display_id='http://example.com/existing_identity'):
124
self.endpoint, self, signed_fields=self.toPostArgs().keys())
127
@override_session_serializer
129
OPENID_USE_EMAIL_FOR_USERNAME=False,
130
OPENID_LAUNCHPAD_TEAMS_REQUIRED=[],
131
OPENID_LAUNCHPAD_TEAMS_MAPPING_AUTO=False,
132
OPENID_EMAIL_WHITELIST_REGEXP_LIST=[])
133
class OpenIDBackendTests(TestCase):
136
super(OpenIDBackendTests, self).setUp()
137
self.backend = OpenIDBackend()
138
self.message = TestMessage()
140
def make_user_openid(
141
self, user=None, claimed_id=make_claimed_id('existing_identity')):
128
143
user = User.objects.create_user(
129
144
username='someuser', email='someuser@example.com',
130
145
password='12345678')
132
147
user_openid, created = UserOpenID.objects.get_or_create(
133
user=user, claimed_id=claimed_id, display_id=display_id)
148
user=user, claimed_id=claimed_id, display_id=claimed_id)
134
149
return user_openid
136
151
def _assert_account_verified(self, user, expected):
170
185
expected['last_name']),
171
186
'email': expected['email'],
173
response = self.make_openid_response(sreg_args=data)
188
self.message.set_sreg_args(**data)
175
details = self.backend._extract_user_details(response)
190
details = self.backend._extract_user_details(
191
self.message.to_response())
176
192
self.assertEqual(details, expected)
178
194
def test_extract_user_details_ax(self):
179
response = self.make_response_ax(
180
fullname="Some User", nickname="someuser", email="foo@example.com")
182
data = self.backend._extract_user_details(response)
195
self.message.set_ax_args(
196
email="foo@example.com",
197
fullname="Some User",
200
data = self.backend._extract_user_details(self.message.to_response())
184
202
self.assertEqual(data, {"nickname": "someuser",
185
203
"first_name": "Some",
247
264
self.assert_account_not_verified(user)
249
266
# get a response including verification status
250
response = self.make_response_ax()
267
self.message.set_ax_args()
251
268
data = dict(first_name=u"Some56789012345678901234567890123",
252
269
last_name=u"User56789012345678901234567890123",
253
270
email=u"someotheruser@example.com",
254
271
account_verified=verified)
255
self.backend.update_user_details(user, data, response)
272
self.backend.update_user_details(
273
user, data, self.message.to_response())
257
275
# refresh object from the database
258
276
user = User.objects.get(pk=user.pk)
283
301
initially_verified=False, verified=False)
285
303
def test_extract_user_details_name_with_trailing_space(self):
286
response = self.make_response_ax(fullname="SomeUser ")
304
self.message.set_ax_args(fullname="SomeUser ")
288
data = self.backend._extract_user_details(response)
306
data = self.backend._extract_user_details(self.message.to_response())
290
308
self.assertEqual("", data['first_name'])
291
309
self.assertEqual("SomeUser", data['last_name'])
293
311
def test_extract_user_details_name_with_thin_space(self):
294
response = self.make_response_ax(fullname=u"Some\u2009User")
312
self.message.set_ax_args(fullname=u"Some\u2009User")
296
data = self.backend._extract_user_details(response)
314
data = self.backend._extract_user_details(self.message.to_response())
298
316
self.assertEqual("Some", data['first_name'])
299
317
self.assertEqual("User", data['last_name'])
356
374
def test_authenticate_when_member_of_teams_required(self):
357
375
Group.objects.create(name='team')
359
response = self.make_openid_response(
360
sreg_args=dict(nickname='someuser'),
361
teams_args=dict(is_member='foo,team'))
362
user = self.backend.authenticate(openid_response=response)
377
self.message.set_sreg_args(nickname='someuser')
378
self.message.set_team_args(is_member='foo,team')
379
user = self.backend.authenticate(
380
openid_response=self.message.to_response())
364
382
self.assertIsNotNone(user)
366
384
@override_settings(OPENID_LAUNCHPAD_TEAMS_REQUIRED=[])
367
385
def test_authenticate_when_no_teams_required(self):
368
response = self.make_openid_response(
369
sreg_args=dict(nickname='someuser'),
370
teams_args=dict(is_member='team'))
371
user = self.backend.authenticate(openid_response=response)
386
self.message.set_sreg_args(nickname='someuser')
387
self.message.set_team_args(is_member='team')
388
user = self.backend.authenticate(
389
openid_response=self.message.to_response())
373
391
self.assertIsNotNone(user)
378
396
def test_authenticate_when_member_of_at_least_one_team(self):
379
397
Group.objects.create(name='team1')
381
response = self.make_openid_response(
382
sreg_args=dict(nickname='someuser'),
383
teams_args=dict(is_member='foo,team1'))
384
user = self.backend.authenticate(openid_response=response)
399
self.message.set_sreg_args(nickname='someuser')
400
self.message.set_team_args(is_member='foo,team1')
401
user = self.backend.authenticate(
402
openid_response=self.message.to_response())
386
404
self.assertIsNotNone(user)
394
412
assert Group.objects.filter(name='team').count() == 0
396
response = self.make_openid_response(
397
sreg_args=dict(nickname='someuser', email='foo@foo.com'),
398
teams_args=dict(is_member='foo'))
399
user = self.backend.authenticate(openid_response=response)
414
self.message.set_sreg_args(
415
nickname='someuser', email='foo@foo.com')
416
user = self.backend.authenticate(
417
openid_response=self.message.to_response())
401
419
self.assertIsNotNone(user)
403
response = self.make_openid_response(
404
sreg_args=dict(nickname='someuser', email='foo+bar@foo.com'),
405
teams_args=dict(is_member='foo'))
406
user = self.backend.authenticate(openid_response=response)
421
self.message.set_sreg_args(
422
nickname='someuser', email='foo+bar@foo.com')
423
user = self.backend.authenticate(
424
openid_response=self.message.to_response())
408
426
self.assertIsNotNone(user)
460
477
@override_settings(OPENID_CREATE_USERS=False)
461
478
def test_auth_no_create_users(self):
462
response = self.make_openid_response(
463
sreg_args=dict(email='bar@foo.com'),
464
teams_args=dict(is_member='foo'))
465
user = self.backend.authenticate(openid_response=response)
479
self.message.set_sreg_args()
480
user = self.backend.authenticate(
481
openid_response=self.message.to_response())
467
483
self.assertIsNone(user)
468
484
self.assert_no_users_created()
470
486
@override_settings(OPENID_CREATE_USERS=False)
471
487
def test_auth_no_create_users_existing_user(self):
472
response = self.make_openid_response(
473
sreg_args=dict(email='bar@foo.com'),
474
teams_args=dict(is_member='foo'))
488
self.message.set_sreg_args()
475
489
existing_openid = self.make_user_openid(
476
claimed_id=response.identity_url)
490
claimed_id=self.message.endpoint.claimed_id)
477
491
expected_user_count = User.objects.count()
479
user = self.backend.authenticate(openid_response=response)
492
user = self.backend.authenticate(
493
openid_response=self.message.to_response())
481
495
self.assertIsNotNone(user)
482
496
self.assertEqual(user, existing_openid.user)
536
verified_response = self.make_response_ax(**kwargs)
551
self.message.set_ax_args(**kwargs)
537
552
verified_user = self.backend.authenticate(
538
openid_response=verified_response)
553
openid_response=self.message.to_response())
540
555
self.assert_account_verified(verified_user)
541
556
expected_user_count = User.objects.count()
543
558
kwargs['verified'] = False
544
unverified_response = self.make_response_ax(**kwargs)
559
self.message.set_ax_args(**kwargs)
545
560
unverified_user = self.backend.authenticate(
546
openid_response=unverified_response)
561
openid_response=self.message.to_response())
548
563
self.assertEqual(verified_user, unverified_user)
549
564
self.assert_account_not_verified(unverified_user)
552
567
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
553
568
def test_physical_multifactor_required_not_given(self):
554
response = self.make_openid_response()
569
response = self.message.to_response()
556
571
with self.assertRaises(MissingPhysicalMultiFactor):
557
572
self.backend.authenticate(openid_response=response)
560
UserOpenID.objects.filter(claimed_id='some-id').exists(),
575
UserOpenID.objects.filter(
576
claimed_id=self.message.endpoint.claimed_id).exists(),
561
577
'User must be created anyways.')
563
579
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
564
580
def test_physical_multifactor_required_invalid_auth_policy(self):
565
response = self.make_openid_response()
566
message = response.message
568
pape.ns_uri, 'auth_policies',
569
pape.AUTH_MULTI_FACTOR + ' ' + pape.AUTH_PHISHING_RESISTANT)
570
response = SuccessResponse(
571
response.endpoint, message,
572
signed_fields=message.toPostArgs().keys())
581
self.message.set_pape_args(
582
pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT)
574
584
with self.assertRaises(MissingPhysicalMultiFactor):
575
self.backend.authenticate(openid_response=response)
585
self.backend.authenticate(
586
openid_response=self.message.to_response())
578
UserOpenID.objects.filter(claimed_id='some-id').exists(),
589
UserOpenID.objects.filter(
590
claimed_id=self.message.endpoint.claimed_id).exists(),
579
591
'User must be created anyways.')
581
593
@override_settings(OPENID_PHYSICAL_MULTIFACTOR_REQUIRED=True)
582
594
def test_physical_multifactor_required_valid_auth_policy(self):
583
response = self.make_openid_response()
584
message = response.message
586
pape.ns_uri, 'auth_policies',
587
pape.AUTH_MULTI_FACTOR_PHYSICAL)
588
response = SuccessResponse(
589
response.endpoint, message,
590
signed_fields=message.toPostArgs().keys())
592
user = self.backend.authenticate(openid_response=response)
595
self.message.set_pape_args(
596
pape.AUTH_MULTI_FACTOR, pape.AUTH_MULTI_FACTOR_PHYSICAL,
597
pape.AUTH_PHISHING_RESISTANT)
598
user = self.backend.authenticate(
599
openid_response=self.message.to_response())
594
601
self.assertIsNotNone(user)
596
603
@override_settings(OPENID_STRICT_USERNAMES=True)
597
604
def test_auth_strict_usernames(self):
598
605
username = 'nickname'
599
response = self.make_openid_response(
600
sreg_args=dict(nickname=username, email='bar@foo.com'),
601
teams_args=dict(is_member='foo'))
602
user = self.backend.authenticate(openid_response=response)
606
self.message.set_sreg_args(nickname=username)
607
user = self.backend.authenticate(
608
openid_response=self.message.to_response())
604
610
self.assertIsNotNone(user, 'User must be created')
605
611
self.assertEqual(user.username, username)
607
613
@override_settings(OPENID_STRICT_USERNAMES=True)
608
614
def test_auth_strict_usernames_no_nickname(self):
609
response = self.make_openid_response(
610
sreg_args=dict(nickname='', email='bar@foo.com'),
611
teams_args=dict(is_member='foo'))
615
self.message.set_sreg_args(nickname='')
614
618
"An attribute required for logging in was not returned (nickname)")
616
620
with self.assertRaisesRegexp(RequiredAttributeNotReturned, msg):
617
self.backend.authenticate(openid_response=response)
621
self.backend.authenticate(
622
openid_response=self.message.to_response())
619
624
self.assert_no_users_created()
624
629
def test_auth_strict_usernames_conflict(self):
625
630
existing_openid = self.make_user_openid()
626
631
expected_user_count = User.objects.count()
628
response = self.make_openid_response(
630
nickname=existing_openid.user.username, email='bar@foo.com'),
631
teams_args=dict(is_member='foo'))
632
self.message.set_sreg_args(
633
nickname=existing_openid.user.username)
633
635
with self.assertRaises(DuplicateUsernameViolation):
634
self.backend.authenticate(openid_response=response)
636
self.backend.authenticate(
637
openid_response=self.message.to_response())
636
639
self.assert_no_users_created(expected_count=expected_user_count)
641
644
OPENID_UPDATE_DETAILS_FROM_SREG=True)
642
645
def test_auth_follow_renames(self):
643
646
new_username = 'new'
644
original_response = self.make_openid_response(
645
sreg_args=dict(nickname='username', email='bar@foo.com'),
646
teams_args=dict(is_member='foo'))
647
rename_response = self.make_openid_response(
648
sreg_args=dict(nickname=new_username, email='bar@foo.com'),
649
teams_args=dict(is_member='foo'))
650
user = self.backend.authenticate(openid_response=original_response)
647
self.message.set_sreg_args(nickname='username')
648
user = self.backend.authenticate(
649
openid_response=self.message.to_response())
651
650
expected_user_count = User.objects.count()
653
652
self.assertIsNotNone(user, 'User must be created')
654
self.message.set_sreg_args(nickname=new_username)
655
655
renamed_user = self.backend.authenticate(
656
openid_response=rename_response)
656
openid_response=self.message.to_response())
658
658
self.assertEqual(user.pk, renamed_user.pk)
659
659
self.assertEqual(renamed_user.username, new_username)
664
664
OPENID_STRICT_USERNAMES=True,
665
665
OPENID_UPDATE_DETAILS_FROM_SREG=True)
666
666
def test_auth_follow_renames_strict_usernames_no_nickname(self):
667
response = self.make_openid_response(
668
sreg_args=dict(nickname='nickame', email='bar@foo.com'),
669
teams_args=dict(is_member='foo'))
670
user = self.backend.authenticate(openid_response=response)
667
self.message.set_sreg_args(nickname='nickame')
668
user = self.backend.authenticate(
669
openid_response=self.message.to_response())
671
670
expected_user_count = User.objects.count()
673
672
self.assertIsNotNone(user, 'User must be created')
675
response = self.make_openid_response(
676
sreg_args=dict(nickname='', email='bar@foo.com'),
677
teams_args=dict(is_member='foo'))
674
self.message.set_sreg_args(nickname='')
679
676
# XXX: Check possibilities to normalize this error into a
680
677
# `RequiredAttributeNotReturned`.
681
678
with self.assertRaises(MissingUsernameViolation):
682
self.backend.authenticate(openid_response=response)
679
self.backend.authenticate(
680
openid_response=self.message.to_response())
684
682
self.assert_no_users_created(expected_count=expected_user_count)
690
688
def test_auth_follow_renames_strict_usernames_rename_conflict(self):
691
689
existing_openid = self.make_user_openid()
692
690
original_username = 'nickame'
693
good_response = self.make_openid_response(
694
sreg_args=dict(nickname=original_username, email='bar@foo.com'),
695
teams_args=dict(is_member='foo'))
696
conflict_response = self.make_openid_response(
698
nickname=existing_openid.user.username, email='bar@foo.com'),
699
teams_args=dict(is_member='foo'))
700
user = self.backend.authenticate(openid_response=good_response)
691
self.message.set_sreg_args(nickname=original_username)
692
user = self.backend.authenticate(
693
openid_response=self.message.to_response())
701
694
expected_user_count = User.objects.count()
703
696
self.assertIsNotNone(user, 'First request should succeed')
698
self.message.set_sreg_args(
699
nickname=existing_openid.user.username)
705
701
with self.assertRaises(DuplicateUsernameViolation):
706
self.backend.authenticate(openid_response=conflict_response)
702
self.backend.authenticate(
703
openid_response=self.message.to_response())
708
705
db_user = User.objects.get(pk=user.pk)
709
706
self.assertEqual(db_user.username, original_username)
726
723
duplicate_username_handler, sender=User, weak=False,
727
724
dispatch_uid='testing')
729
response = self.make_openid_response(
731
nickname=existing_openid.user.username, email='bar@foo.com'),
732
teams_args=dict(is_member='foo'))
726
self.message.set_sreg_args(
727
nickname=existing_openid.user.username)
734
729
with self.assertRaises(DuplicateUsernameViolation):
735
self.backend.authenticate(openid_response=response)
730
self.backend.authenticate(
731
openid_response=self.message.to_response())
737
733
self.assertIn('username', signal_kwargs)
738
734
self.assertEqual(
756
752
duplicate_username_handler, sender=User, weak=False,
757
753
dispatch_uid='testing')
759
response = self.make_openid_response(
761
nickname=existing_openid.user.username, email='bar@foo.com'),
762
teams_args=dict(is_member='foo'))
763
user = self.backend.authenticate(openid_response=response)
755
self.message.set_sreg_args(
756
nickname=existing_openid.user.username)
757
user = self.backend.authenticate(
758
openid_response=self.message.to_response())
765
760
self.assertIsNotNone(user)
766
761
self.assertNotEqual(user, existing_openid.user)