25
from keystone.common import cms
26
from keystone.common import utils
21
27
from keystone.middleware import auth_token
22
28
from keystone.openstack.common import jsonutils
23
from keystone import config
29
from keystone.openstack.common import timeutils
24
30
from keystone import test
33
# Placeholders for the signed fixtures.  setUpModule() assigns most of these
# on the object it receives (presumably this module -- confirm) from the
# files in the ``signing`` subdirectory.  REVOKED_TOKEN is declared here as
# well so that every fixture name the tests read exists at import time.
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
SIGNED_TOKEN_UNSCOPED = None
VALID_SIGNED_REVOCATION_LIST = None

# Literal token strings referenced by the tests.
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
46
INVALID_SIGNED_TOKEN = string.replace(
47
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
48
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
49
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
50
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
51
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
52
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
53
0000000000000000000000000000000000000000000000000000000000000000
54
1111111111111111111111111111111111111111111111111111111111111111
55
2222222222222222222222222222222222222222222222222222222222222222
56
3333333333333333333333333333333333333333333333333333333333333333
57
4444444444444444444444444444444444444444444444444444444444444444
58
5555555555555555555555555555555555555555555555555555555555555555
59
6666666666666666666666666666666666666666666666666666666666666666
60
7777777777777777777777777777777777777777777777777777777777777777
61
8888888888888888888888888888888888888888888888888888888888888888
62
9999999999999999999999999999999999999999999999999999999999999999
63
0000000000000000000000000000000000000000000000000000000000000000
27
66
# JSON responses keyed by token ID
28
67
TOKEN_RESPONSES = {
71
'id': UUID_TOKEN_DEFAULT,
34
73
'id': 'tenant_id1',
35
74
'name': 'tenant_name1',
141
# The data for these tests are signed using openssl and are stored in files
142
# in the signing subdirectory. In order to keep the values consistent between
143
# the tests and the signed documents, we read them in for use in the tests.
144
def setUpModule(self):
145
signing_path = os.path.join(os.path.dirname(__file__), 'signing')
146
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
147
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
148
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
149
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
150
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
151
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
152
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
153
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
154
self.REVOCATION_LIST = jsonutils.loads(f.read())
155
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
156
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
157
{'signed': f.read()})
159
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED] = {
162
'id': self.SIGNED_TOKEN_SCOPED,
166
'name': 'user_name1',
167
'tenantId': 'tenant_id1',
168
'tenantName': 'tenant_name1',
177
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED] = {
180
'id': self.SIGNED_TOKEN_UNSCOPED,
184
'name': 'user_name1',
119
194
class FakeMemcache(object):
120
195
def __init__(self):
121
196
self.set_key = None
251
347
'HTTP_X_TENANT_ID': 'tenant_id1',
252
348
'HTTP_X_TENANT_NAME': 'tenant_id1',
253
'HTTP_X_TENANT': 'tenant_id1', # now deprecated (diablo-compat)
349
# now deprecated (diablo-compat)
350
'HTTP_X_TENANT': 'tenant_id1',
255
352
super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
257
def test_diablo_response(self):
354
def test_valid_diablo_response(self):
258
355
req = webob.Request.blank('/')
259
req.headers['X-Auth-Token'] = 'valid-diablo-token'
260
body = self.middleware(req.environ, self.start_fake_response)
356
req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
357
self.middleware(req.environ, self.start_fake_response)
261
358
self.assertEqual(self.response_status, 200)
262
self.assertEqual(body, ['SUCCESS'])
265
361
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
266
def test_valid_request(self):
362
def assert_valid_request_200(self, token):
267
363
req = webob.Request.blank('/')
268
req.headers['X-Auth-Token'] = 'valid-token'
364
req.headers['X-Auth-Token'] = token
269
365
body = self.middleware(req.environ, self.start_fake_response)
366
self.assertEqual(self.response_status, 200)
367
self.assertTrue(req.headers.get('X-Service-Catalog'))
368
self.assertEqual(body, ['SUCCESS'])
370
def test_valid_uuid_request(self):
    """The default UUID token passes and its validation URL is recorded.

    After the shared 200-response check, the last URL requested through
    FakeHTTPConnection must be the admin token path for that token.
    """
    self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
    expected_url = "/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT
    self.assertEqual(expected_url, FakeHTTPConnection.last_requested_url)
375
def test_valid_signed_request(self):
376
FakeHTTPConnection.last_requested_url = ''
377
self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
270
378
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
272
self.assertEqual("/testadmin/v2.0/tokens/valid-token",
273
FakeHTTPConnection.last_requested_url)
274
self.assertEqual(self.response_status, 200)
275
self.assertTrue(req.headers.get('X-Service-Catalog'))
276
self.assertEqual(body, ['SUCCESS'])
380
# Ensure that signed requests do not generate HTTP traffic.
381
self.assertEqual('', FakeHTTPConnection.last_requested_url)
278
def test_default_tenant_token(self):
383
def assert_unscoped_default_tenant_auto_scopes(self, token):
279
384
"""Unscoped requests with a default tenant should "auto-scope."
281
386
The implied scope is the user's tenant ID.
284
389
req = webob.Request.blank('/')
285
req.headers['X-Auth-Token'] = 'default-tenant-token'
390
req.headers['X-Auth-Token'] = token
286
391
body = self.middleware(req.environ, self.start_fake_response)
287
392
self.assertEqual(self.response_status, 200)
288
393
self.assertEqual(body, ['SUCCESS'])
290
def test_unscoped_token(self):
395
def test_default_tenant_uuid_token(self):
    """Run the default-tenant auto-scope check with the default UUID token."""
    token = UUID_TOKEN_DEFAULT
    self.assert_unscoped_default_tenant_auto_scopes(token)
398
def test_default_tenant_signed_token(self):
    """Run the auto-scope check with the scoped signed token."""
    token = SIGNED_TOKEN_SCOPED
    self.assert_unscoped_default_tenant_auto_scopes(token)
401
def assert_unscoped_token_receives_401(self, token):
291
402
"""Unscoped requests with no default tenant ID should be rejected."""
292
403
req = webob.Request.blank('/')
293
req.headers['X-Auth-Token'] = 'unscoped-token'
404
req.headers['X-Auth-Token'] = token
294
405
self.middleware(req.environ, self.start_fake_response)
295
406
self.assertEqual(self.response_status, 401)
296
407
self.assertEqual(self.response_headers['WWW-Authenticate'],
297
408
'Keystone uri=\'https://keystone.example.com:1234\'')
299
def test_request_invalid_token(self):
410
def test_unscoped_uuid_token_receives_401(self):
    """Run the unscoped-token 401 check with the unscoped UUID token."""
    token = UUID_TOKEN_UNSCOPED
    self.assert_unscoped_token_receives_401(token)
413
def test_unscoped_pki_token_receives_401(self):
    """Run the unscoped-token 401 check with the unscoped signed token."""
    token = SIGNED_TOKEN_UNSCOPED
    self.assert_unscoped_token_receives_401(token)
416
def test_revoked_token_receives_401(self):
    """A request carrying the revoked token is answered with a 401."""
    # Install the default revocation list before issuing the request.
    revoked_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revoked_json

    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = REVOKED_TOKEN
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
423
def get_revocation_list_json(self, token_ids=None):
424
if token_ids is None:
425
token_ids = [REVOKED_TOKEN_HASH]
426
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
428
return jsonutils.dumps(revocation_list)
430
def test_is_signed_token_revoked_returns_false(self):
    """is_signed_token_revoked() is falsy when nothing is revoked."""
    # An empty revocation list is installed explicitly to document intent.
    empty_list = {"revoked": [], "extra": "success"}
    self.middleware.token_revocation_list = jsonutils.dumps(empty_list)
    self.assertFalse(self.middleware.is_signed_token_revoked(REVOKED_TOKEN))
437
def test_is_signed_token_revoked_returns_true(self):
    """is_signed_token_revoked() is truthy with the default revoked list."""
    revoked_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revoked_json
    self.assertTrue(self.middleware.is_signed_token_revoked(REVOKED_TOKEN))
442
def test_verify_signed_token_raises_exception_for_revoked_token(self):
    """verify_signed_token() raises InvalidUserToken for the revoked token."""
    revoked_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revoked_json
    self.assertRaises(auth_token.InvalidUserToken,
                      self.middleware.verify_signed_token,
                      REVOKED_TOKEN)
447
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
    """verify_signed_token() completes for the scoped signed token.

    The default revocation list is installed first; the test treats the
    scoped signed token as unrevoked.
    """
    revoked_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revoked_json
    # No assertion: the call only has to return without raising.
    self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
451
def test_get_token_revocation_list_fetched_time_returns_min(self):
    """The fetched time reads back as datetime.min with no time or file.

    Both the stored fetch time and the revoked-file name are cleared before
    the attribute is read.
    """
    middleware = self.middleware
    middleware.token_revocation_list_fetched_time = None
    middleware.revoked_file_name = ''
    fetched_time = middleware.token_revocation_list_fetched_time
    self.assertEqual(fetched_time, datetime.datetime.min)
457
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
458
self.middleware.token_revocation_list_fetched_time = None
459
mtime = os.path.getmtime(self.middleware.revoked_file_name)
460
fetched_time = datetime.datetime.fromtimestamp(mtime)
461
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
464
def test_get_token_revocation_list_fetched_time_returns_value(self):
465
expected = self.middleware._token_revocation_list_fetched_time
466
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
469
def test_get_revocation_list_returns_fetched_list(self):
470
self.middleware.token_revocation_list_fetched_time = None
471
os.remove(self.middleware.revoked_file_name)
472
self.assertEqual(self.middleware.token_revocation_list,
475
def test_get_revocation_list_returns_current_list_from_memory(self):
    """token_revocation_list equals the ``_token_revocation_list`` value."""
    public_list = self.middleware.token_revocation_list
    private_list = self.middleware._token_revocation_list
    self.assertEqual(public_list, private_list)
479
def test_get_revocation_list_returns_current_list_from_disk(self):
    """The list reads back unchanged after the private copy is cleared.

    Going by the test name, the second read is presumably served from the
    on-disk copy -- confirm against the middleware implementation.
    """
    expected = self.middleware.token_revocation_list
    self.middleware._token_revocation_list = None
    reloaded = self.middleware.token_revocation_list
    self.assertEqual(reloaded, expected)
484
def test_invalid_revocation_list_raises_service_error(self):
    """fetch_revocation_list() raises ServiceError for an invalid list.

    The module-level ``SIGNED_REVOCATION_LIST`` is replaced with ``"{}"``
    for the duration of the call (presumably the value the fake HTTP layer
    serves -- confirm).  The previous value is put back afterwards so the
    override cannot leak into tests that run later.
    """
    module_globals = globals()
    previous = module_globals['SIGNED_REVOCATION_LIST']
    module_globals['SIGNED_REVOCATION_LIST'] = "{}"
    try:
        with self.assertRaises(auth_token.ServiceError):
            self.middleware.fetch_revocation_list()
    finally:
        # Restore even when the assertion fails, to keep tests independent.
        module_globals['SIGNED_REVOCATION_LIST'] = previous
489
def test_fetch_revocation_list(self):
    """The fetched revocation list decodes to REVOCATION_LIST."""
    raw_list = self.middleware.fetch_revocation_list()
    self.assertEqual(jsonutils.loads(raw_list), REVOCATION_LIST)
493
def test_request_invalid_uuid_token(self):
300
494
req = webob.Request.blank('/')
301
495
req.headers['X-Auth-Token'] = 'invalid-token'
302
496
self.middleware(req.environ, self.start_fake_response)