1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2012 OpenStack LLC
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
25
from keystone.common import cms
26
from keystone.common import utils
27
from keystone.middleware import auth_token
28
from keystone.openstack.common import jsonutils
29
from keystone.openstack.common import timeutils
30
from keystone import test
33
REVOCATION_LIST = None
35
REVOKED_TOKEN_HASH = None
36
SIGNED_REVOCATION_LIST = None
37
SIGNED_TOKEN_SCOPED = None
38
SIGNED_TOKEN_UNSCOPED = None
39
SIGNED_TOKEN_SCOPED_KEY = None
40
SIGNED_TOKEN_UNSCOPED_KEY = None
42
VALID_SIGNED_REVOCATION_LIST = None
44
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
45
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
46
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
47
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
49
INVALID_SIGNED_TOKEN = string.replace(
50
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
51
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
52
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
53
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
54
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
55
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
56
0000000000000000000000000000000000000000000000000000000000000000
57
1111111111111111111111111111111111111111111111111111111111111111
58
2222222222222222222222222222222222222222222222222222222222222222
59
3333333333333333333333333333333333333333333333333333333333333333
60
4444444444444444444444444444444444444444444444444444444444444444
61
5555555555555555555555555555555555555555555555555555555555555555
62
6666666666666666666666666666666666666666666666666666666666666666
63
7777777777777777777777777777777777777777777777777777777777777777
64
8888888888888888888888888888888888888888888888888888888888888888
65
9999999999999999999999999999999999999999999999999999999999999999
66
0000000000000000000000000000000000000000000000000000000000000000
69
# JSON responses keyed by token ID
74
'id': UUID_TOKEN_DEFAULT,
75
'expires': '2999-01-01T00:00:10Z',
78
'name': 'tenant_name1',
95
'id': VALID_DIABLO_TOKEN,
96
'expires': '2999-01-01T00:00:10',
97
'tenantId': 'tenant_id1',
101
'name': 'user_name1',
109
UUID_TOKEN_UNSCOPED: {
112
'id': UUID_TOKEN_UNSCOPED,
113
'expires': '2999-01-01T00:00:10Z',
117
'name': 'user_name1',
125
UUID_TOKEN_NO_SERVICE_CATALOG: {
129
'expires': '2999-01-01T00:00:10Z',
132
'name': 'tenant_name1',
137
'name': 'user_name1',
147
FAKE_RESPONSE_STACK = []
150
# The data for these tests are signed using openssl and are stored in files
151
# in the signing subdirectory. In order to keep the values consistent between
152
# the tests and the signed documents, we read them in for use in the tests.
153
def setUpModule(self):
154
signing_path = os.path.join(os.path.dirname(__file__), 'signing')
155
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
156
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
157
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
158
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
159
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
160
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
161
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
162
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
163
self.REVOCATION_LIST = jsonutils.loads(f.read())
164
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
165
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
166
{'signed': f.read()})
167
self.SIGNED_TOKEN_SCOPED_KEY =\
168
cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
169
self.SIGNED_TOKEN_UNSCOPED_KEY =\
170
cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
172
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
175
'id': self.SIGNED_TOKEN_SCOPED_KEY,
179
'name': 'user_name1',
180
'tenantId': 'tenant_id1',
181
'tenantName': 'tenant_name1',
190
self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
193
'id': SIGNED_TOKEN_UNSCOPED_KEY,
197
'name': 'user_name1',
207
class FakeMemcache(object):
210
self.set_value = None
211
self.token_expiration = None
214
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
215
if not data or key != "tokens/%s" % (data['access']['token']['id']):
217
if not self.token_expiration:
218
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
219
self.token_expiration = dt.strftime("%s")
220
dt = datetime.datetime.now() + datetime.timedelta(hours=24)
221
ks_expires = dt.isoformat()
222
data['access']['token']['expires'] = ks_expires
223
return (data, str(self.token_expiration))
225
def set(self, key, value, time=None):
226
self.set_value = value
230
class FakeHTTPResponse(object):
231
def __init__(self, status, body):
239
class FakeStackHTTPConnection(object):
241
def __init__(self, *args, **kwargs):
244
def getresponse(self):
245
if len(FAKE_RESPONSE_STACK):
246
return FAKE_RESPONSE_STACK.pop()
247
return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
249
def request(self, *_args, **_kwargs):
256
class FakeHTTPConnection(object):
258
last_requested_url = ''
260
def __init__(self, *args):
261
self.send_valid_revocation_list = True
263
def request(self, method, path, **kwargs):
264
"""Fakes out several http responses.
266
If a POST request is made, we assume the calling code is trying
267
to get a new admin token.
269
If a GET request is made to validate a token, return success
270
if the token is 'token1'. If a different token is provided, return
271
a 404, indicating an unknown (therefore unauthorized) token.
274
FakeHTTPConnection.last_requested_url = path
277
body = jsonutils.dumps({
279
'token': {'id': 'admin_token2'},
284
token_id = path.rsplit('/', 1)[1]
285
if token_id in TOKEN_RESPONSES.keys():
287
body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
288
elif token_id == "revoked":
290
body = SIGNED_REVOCATION_LIST
295
self.resp = FakeHTTPResponse(status, body)
297
def getresponse(self):
304
class FakeApp(object):
305
"""This represents a WSGI app protected by the auth_token middleware."""
306
def __init__(self, expected_env=None):
307
expected_env = expected_env or {}
308
self.expected_env = {
309
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
310
'HTTP_X_TENANT_ID': 'tenant_id1',
311
'HTTP_X_TENANT_NAME': 'tenant_name1',
312
'HTTP_X_USER_ID': 'user_id1',
313
'HTTP_X_USER_NAME': 'user_name1',
314
'HTTP_X_ROLES': 'role1,role2',
315
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
316
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
317
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
319
self.expected_env.update(expected_env)
321
def __call__(self, env, start_response):
322
for k, v in self.expected_env.items():
323
assert env[k] == v, '%s != %s' % (env[k], v)
325
resp = webob.Response()
326
resp.body = 'SUCCESS'
327
return resp(env, start_response)
330
class BaseAuthTokenMiddlewareTest(test.TestCase):
332
def setUp(self, expected_env=None):
333
expected_env = expected_env or {}
336
'admin_token': 'admin_token1',
337
'auth_host': 'keystone.example.com',
339
'auth_admin_prefix': '/testadmin',
340
'signing_dir': 'signing',
343
self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
344
self.middleware.http_client_class = FakeHTTPConnection
345
self.middleware._iso8601 = iso8601
347
self.response_status = None
348
self.response_headers = None
349
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
350
self.middleware.token_revocation_list_cache_timeout =\
351
datetime.timedelta(days=1)
352
self.middleware.token_revocation_list = jsonutils.dumps(
353
{"revoked": [], "extra": "success"})
355
globals()['SIGNED_REVOCATION_LIST'] =\
356
globals()['VALID_SIGNED_REVOCATION_LIST']
358
super(BaseAuthTokenMiddlewareTest, self).setUp()
361
super(BaseAuthTokenMiddlewareTest, self).tearDown()
363
os.remove(self.middleware.revoked_file_name)
367
def start_fake_response(self, status, headers):
368
self.response_status = int(status.split(' ', 1)[0])
369
self.response_headers = dict(headers)
372
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
373
"""Auth Token middleware test setup that allows the tests to define
374
a stack of responses to HTTP requests in the test and get those
375
responses back in sequence for testing.
379
resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
380
resp2 = FakeHTTPResponse(200, jsonutils.dumps({
382
'token': {'id': 'admin_token2'},
385
FAKE_RESPONSE_STACK.append(resp1)
386
FAKE_RESPONSE_STACK.append(resp2)
388
... do your testing code here ...
392
def setUp(self, expected_env=None):
393
super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
394
self.middleware.http_client_class = FakeStackHTTPConnection
396
def test_fetch_revocation_list_with_expire(self):
397
# first response to revocation list should return 401 Unauthorized
398
# to pretend to be an expired token
399
resp1 = FakeHTTPResponse(200, jsonutils.dumps({
401
'token': {'id': 'admin_token2'},
404
resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
405
resp3 = FakeHTTPResponse(200, jsonutils.dumps({
407
'token': {'id': 'admin_token2'},
410
resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
412
# first get_admin_token() call
413
FAKE_RESPONSE_STACK.append(resp1)
414
# request revocation list, get "unauthorized" due to simulated expired
416
FAKE_RESPONSE_STACK.append(resp2)
417
# request a new admin_token
418
FAKE_RESPONSE_STACK.append(resp3)
419
# request revocation list, get the revocation list properly
420
FAKE_RESPONSE_STACK.append(resp4)
422
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
423
self.assertEqual(fetched_list, REVOCATION_LIST)
426
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
427
"""Auth Token middleware should understand Diablo keystone responses."""
429
# pre-diablo only had Tenant ID, which was also the Name
431
'HTTP_X_TENANT_ID': 'tenant_id1',
432
'HTTP_X_TENANT_NAME': 'tenant_id1',
433
# now deprecated (diablo-compat)
434
'HTTP_X_TENANT': 'tenant_id1',
436
super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
438
def test_valid_diablo_response(self):
439
req = webob.Request.blank('/')
440
req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
441
self.middleware(req.environ, self.start_fake_response)
442
self.assertEqual(self.response_status, 200)
445
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
446
def assert_valid_request_200(self, token):
447
req = webob.Request.blank('/')
448
req.headers['X-Auth-Token'] = token
449
body = self.middleware(req.environ, self.start_fake_response)
450
self.assertEqual(self.response_status, 200)
451
self.assertTrue(req.headers.get('X-Service-Catalog'))
452
self.assertEqual(body, ['SUCCESS'])
454
def test_valid_uuid_request(self):
455
self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
456
self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
457
FakeHTTPConnection.last_requested_url)
459
def test_valid_signed_request(self):
460
FakeHTTPConnection.last_requested_url = ''
461
self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
462
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
464
#ensure that signed requests do not generate HTTP traffic
465
self.assertEqual('', FakeHTTPConnection.last_requested_url)
467
def assert_unscoped_default_tenant_auto_scopes(self, token):
468
"""Unscoped requests with a default tenant should "auto-scope."
470
The implied scope is the user's tenant ID.
473
req = webob.Request.blank('/')
474
req.headers['X-Auth-Token'] = token
475
body = self.middleware(req.environ, self.start_fake_response)
476
self.assertEqual(self.response_status, 200)
477
self.assertEqual(body, ['SUCCESS'])
479
def test_default_tenant_uuid_token(self):
480
self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
482
def test_default_tenant_signed_token(self):
483
self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
485
def assert_unscoped_token_receives_401(self, token):
486
"""Unscoped requests with no default tenant ID should be rejected."""
487
req = webob.Request.blank('/')
488
req.headers['X-Auth-Token'] = token
489
self.middleware(req.environ, self.start_fake_response)
490
self.assertEqual(self.response_status, 401)
491
self.assertEqual(self.response_headers['WWW-Authenticate'],
492
'Keystone uri=\'https://keystone.example.com:1234\'')
494
def test_unscoped_uuid_token_receives_401(self):
495
self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
497
def test_unscoped_pki_token_receives_401(self):
498
self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
500
def test_revoked_token_receives_401(self):
501
self.middleware.token_revocation_list = self.get_revocation_list_json()
502
req = webob.Request.blank('/')
503
req.headers['X-Auth-Token'] = REVOKED_TOKEN
504
self.middleware(req.environ, self.start_fake_response)
505
self.assertEqual(self.response_status, 401)
507
def get_revocation_list_json(self, token_ids=None):
508
if token_ids is None:
509
token_ids = [REVOKED_TOKEN_HASH]
510
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
512
return jsonutils.dumps(revocation_list)
514
def test_is_signed_token_revoked_returns_false(self):
515
#explicitly setting an empty revocation list here to document intent
516
self.middleware.token_revocation_list = jsonutils.dumps(
517
{"revoked": [], "extra": "success"})
518
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
519
self.assertFalse(result)
521
def test_is_signed_token_revoked_returns_true(self):
522
self.middleware.token_revocation_list = self.get_revocation_list_json()
523
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
524
self.assertTrue(result)
526
def test_verify_signed_token_raises_exception_for_revoked_token(self):
527
self.middleware.token_revocation_list = self.get_revocation_list_json()
528
with self.assertRaises(auth_token.InvalidUserToken):
529
self.middleware.verify_signed_token(REVOKED_TOKEN)
531
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
532
self.middleware.token_revocation_list = self.get_revocation_list_json()
533
self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
535
def test_get_token_revocation_list_fetched_time_returns_min(self):
536
self.middleware.token_revocation_list_fetched_time = None
537
self.middleware.revoked_file_name = ''
538
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
539
datetime.datetime.min)
541
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
542
self.middleware.token_revocation_list_fetched_time = None
543
mtime = os.path.getmtime(self.middleware.revoked_file_name)
544
fetched_time = datetime.datetime.fromtimestamp(mtime)
545
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
548
def test_get_token_revocation_list_fetched_time_returns_value(self):
549
expected = self.middleware._token_revocation_list_fetched_time
550
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
553
def test_get_revocation_list_returns_fetched_list(self):
554
self.middleware.token_revocation_list_fetched_time = None
555
os.remove(self.middleware.revoked_file_name)
556
self.assertEqual(self.middleware.token_revocation_list,
559
def test_get_revocation_list_returns_current_list_from_memory(self):
560
self.assertEqual(self.middleware.token_revocation_list,
561
self.middleware._token_revocation_list)
563
def test_get_revocation_list_returns_current_list_from_disk(self):
564
in_memory_list = self.middleware.token_revocation_list
565
self.middleware._token_revocation_list = None
566
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
568
def test_invalid_revocation_list_raises_service_error(self):
569
globals()['SIGNED_REVOCATION_LIST'] = "{}"
570
with self.assertRaises(auth_token.ServiceError):
571
self.middleware.fetch_revocation_list()
573
def test_fetch_revocation_list(self):
574
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
575
self.assertEqual(fetched_list, REVOCATION_LIST)
577
def test_request_invalid_uuid_token(self):
578
req = webob.Request.blank('/')
579
req.headers['X-Auth-Token'] = 'invalid-token'
580
self.middleware(req.environ, self.start_fake_response)
581
self.assertEqual(self.response_status, 401)
582
self.assertEqual(self.response_headers['WWW-Authenticate'],
583
'Keystone uri=\'https://keystone.example.com:1234\'')
585
def test_request_invalid_signed_token(self):
586
req = webob.Request.blank('/')
587
req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
588
self.middleware(req.environ, self.start_fake_response)
589
self.assertEqual(self.response_status, 401)
590
self.assertEqual(self.response_headers['WWW-Authenticate'],
591
'Keystone uri=\'https://keystone.example.com:1234\'')
593
def test_request_no_token(self):
594
req = webob.Request.blank('/')
595
self.middleware(req.environ, self.start_fake_response)
596
self.assertEqual(self.response_status, 401)
597
self.assertEqual(self.response_headers['WWW-Authenticate'],
598
'Keystone uri=\'https://keystone.example.com:1234\'')
600
def test_request_blank_token(self):
601
req = webob.Request.blank('/')
602
req.headers['X-Auth-Token'] = ''
603
self.middleware(req.environ, self.start_fake_response)
604
self.assertEqual(self.response_status, 401)
605
self.assertEqual(self.response_headers['WWW-Authenticate'],
606
'Keystone uri=\'https://keystone.example.com:1234\'')
608
def test_memcache(self):
609
req = webob.Request.blank('/')
610
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
611
self.middleware._cache = FakeMemcache()
612
self.middleware(req.environ, self.start_fake_response)
613
self.assertEqual(self.middleware._cache.set_value, None)
615
def test_memcache_set_invalid(self):
616
req = webob.Request.blank('/')
617
req.headers['X-Auth-Token'] = 'invalid-token'
618
self.middleware._cache = FakeMemcache()
619
self.middleware(req.environ, self.start_fake_response)
620
self.assertEqual(self.middleware._cache.set_value, "invalid")
622
def test_memcache_set_expired(self):
623
req = webob.Request.blank('/')
624
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
625
self.middleware._cache = FakeMemcache()
626
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
627
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
628
self.middleware(req.environ, self.start_fake_response)
629
self.assertEqual(len(self.middleware._cache.set_value), 2)
631
def test_nomemcache(self):
632
self.disable_module('memcache')
635
'admin_token': 'admin_token1',
636
'auth_host': 'keystone.example.com',
638
'memcache_servers': 'localhost:11211',
641
auth_token.AuthProtocol(FakeApp(), conf)
643
def test_request_prevent_service_catalog_injection(self):
644
req = webob.Request.blank('/')
645
req.headers['X-Service-Catalog'] = '[]'
646
req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
647
body = self.middleware(req.environ, self.start_fake_response)
648
self.assertEqual(self.response_status, 200)
649
self.assertFalse(req.headers.get('X-Service-Catalog'))
650
self.assertEqual(body, ['SUCCESS'])
652
def test_will_expire_soon(self):
653
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
655
self.assertTrue(auth_token.will_expire_soon(tenseconds))
656
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
658
self.assertFalse(auth_token.will_expire_soon(fortyseconds))