1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2012 OpenStack LLC
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
30
from keystoneclient.common import cms
31
from keystoneclient.middleware import auth_token
32
from keystoneclient.openstack.common import jsonutils
33
from keystoneclient.openstack.common import memorycache
34
from keystoneclient.openstack.common import timeutils
36
import client_fixtures
38
SIGNED_REVOCATION_LIST = None
39
VALID_SIGNED_REVOCATION_LIST = client_fixtures.SIGNED_REVOCATION_LIST
41
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
42
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
43
'HTTP_X_TENANT_ID': 'tenant_id1',
44
'HTTP_X_TENANT_NAME': 'tenant_name1',
45
'HTTP_X_USER_ID': 'user_id1',
46
'HTTP_X_USER_NAME': 'user_name1',
47
'HTTP_X_ROLES': 'role1,role2',
48
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
49
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
50
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
53
FAKE_RESPONSE_STACK = []
61
"updated": "2013-03-06T00:00:00Z",
67
"updated": "2011-11-19T00:00:00Z",
80
"updated": "2011-11-19T00:00:00Z",
88
class NoModuleFinder(object):
89
"""Disallow further imports of 'module'."""
91
def __init__(self, module):
94
def find_module(self, fullname, path):
95
if fullname == self.module or fullname.startswith(self.module + '.'):
99
class DisableModuleFixture(fixtures.Fixture):
100
"""A fixture to provide support for unloading/disabling modules."""
102
def __init__(self, module, *args, **kw):
103
super(DisableModuleFixture, self).__init__(*args, **kw)
106
self._cleared_modules = {}
109
super(DisableModuleFixture, self).tearDown()
110
for finder in self._finders:
111
sys.meta_path.remove(finder)
112
sys.modules.update(self._cleared_modules)
114
def clear_module(self):
116
for fullname in sys.modules.keys():
117
if (fullname == self.module or
118
fullname.startswith(self.module + '.')):
119
cleared_modules[fullname] = sys.modules.pop(fullname)
120
return cleared_modules
123
"""Ensure ImportError for the specified module."""
125
super(DisableModuleFixture, self).setUp()
127
# Clear 'module' references in sys.modules
128
self._cleared_modules.update(self.clear_module())
130
finder = NoModuleFinder(self.module)
131
self._finders.append(finder)
132
sys.meta_path.insert(0, finder)
135
class FakeSwiftOldMemcacheClient(memorycache.Client):
136
# NOTE(vish,chmou): old swift memcache uses param timeout instead of time
137
def set(self, key, value, timeout=0, min_compress_len=0):
138
sup = super(FakeSwiftOldMemcacheClient, self)
139
sup.set(key, value, timeout, min_compress_len)
142
class FakeHTTPResponse(object):
143
def __init__(self, status, body):
151
class BaseFakeHTTPConnection(object):
153
def _user_token_responses(self, token_id):
154
"""Emulate user token responses.
156
Return success if the token is in the list we know
157
about. If the request is for revoked tokens, then return
158
the revoked list, else if a different token is provided,
159
return 404 indicating an unknown (therefore unauthorized) token.
162
if token_id in client_fixtures.JSON_TOKEN_RESPONSES.keys():
164
body = client_fixtures.JSON_TOKEN_RESPONSES[token_id]
165
elif token_id == "revoked":
167
body = SIGNED_REVOCATION_LIST
173
def fake_v2_responses(self, path):
174
token_id = path.rsplit('/', 1)[1]
175
return self._user_token_responses(token_id)
177
def fake_v3_responses(self, path, **kwargs):
178
headers = kwargs.get('headers')
179
token_id = headers['X-Subject-Token']
180
return self._user_token_responses(token_id)
182
def fake_v2_admin_token(self, path):
184
body = jsonutils.dumps({
186
'token': {'id': 'admin_token2',
187
'expires': '2022-10-03T16:58:01Z'}
193
class CertificateHTTPConnection(BaseFakeHTTPConnection):
195
signing_cert_data = 'SIGNING CERT'
196
ca_cert_data = 'SIGNING CA'
198
def __init__(self, *args, **kwargs):
201
def request(self, method, path, **kwargs):
202
CertificateHTTPConnection.last_requested_url = path
204
if method == 'GET' and path == '/testadmin/v2.0/certificates/signing':
205
self.response = FakeHTTPResponse(200, self.signing_cert_data)
206
elif method == 'GET' and path == '/testadmin/v2.0/certificates/ca':
207
self.response = FakeHTTPResponse(200, self.ca_cert_data)
209
self.response = FakeHTTPResponse(404, '')
211
def getresponse(self):
218
class FakeHTTPConnection(BaseFakeHTTPConnection):
219
"""Emulate a fake Keystone v2 server."""
221
def __init__(self, *args, **kwargs):
222
self.send_valid_revocation_list = True
225
def request(self, method, path, **kwargs):
226
"""Fakes out several http responses.
228
Support the following requests:
230
- Create admin token ('POST /testadmin/v2.0/tokens')
231
- Get versions ('GET /testadmin/')
232
- Get v2 user token responses (see fake_v2_responses)
235
FakeHTTPConnection.last_requested_url = path
236
if method == 'POST' and path == '/testadmin/v2.0/tokens':
237
status, body = self.fake_v2_admin_token(path)
239
if path == '/testadmin/':
240
# It's a GET versions call
242
body = jsonutils.dumps(VERSION_LIST_v2)
244
status, body = self.fake_v2_responses(path)
246
self.resp = FakeHTTPResponse(status, body)
248
def getresponse(self):
249
# If self.resp is set then this is just the response to
250
# the earlier request. If it is not set, then we expect
251
# a stack of responses to have been pre-prepared
255
if len(FAKE_RESPONSE_STACK):
256
return FAKE_RESPONSE_STACK.pop()
257
return FakeHTTPResponse(
258
500, jsonutils.dumps('UNEXPECTED RESPONSE'))
264
class v3FakeHTTPConnection(FakeHTTPConnection):
265
"""Emulate a fake Keystone v3 server."""
267
def request(self, method, path, **kwargs):
268
"""Fakes out several http responses.
270
Support the following requests:
272
- Create admin token ('POST /testadmin/v2.0/tokens')
273
- Get versions ('GET /testadmin/')
274
- Get v2 user token responses (see fake_v2_responses)
275
- Get v3 user token responses (see fake_v3_responses)
278
v3FakeHTTPConnection.last_requested_url = path
279
if method == 'POST' and path == '/testadmin/v2.0/tokens':
280
status, body = self.fake_v2_admin_token(path)
282
if path == '/testadmin/':
283
# It's a GET versions call
285
body = jsonutils.dumps(VERSION_LIST_v3)
286
elif path.split('/')[2] == 'v2.0':
287
status, body = self.fake_v2_responses(path)
289
status, body = self.fake_v3_responses(path, **kwargs)
291
self.resp = FakeHTTPResponse(status, body)
294
class RaisingHTTPConnection(FakeHTTPConnection):
295
"""An HTTPConnection that always raises."""
297
def request(self, method, path, **kwargs):
298
raise AssertionError("HTTP request was called.")
301
class RaisingHTTPNetworkError(FakeHTTPConnection):
302
"""An HTTPConnection that always raises network error."""
304
def request(self, method, path, **kwargs):
305
raise auth_token.NetworkError("Network connection error.")
308
class FakeApp(object):
309
"""This represents a WSGI app protected by the auth_token middleware."""
310
def __init__(self, expected_env=None):
311
expected_env = expected_env or {}
312
self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
313
self.expected_env.update(expected_env)
315
def __call__(self, env, start_response):
316
for k, v in self.expected_env.items():
317
assert env[k] == v, '%s != %s' % (env[k], v)
319
resp = webob.Response()
320
resp.body = 'SUCCESS'
321
return resp(env, start_response)
324
class v3FakeApp(object):
325
"""This represents a v3 WSGI app protected by the auth_token middleware."""
326
def __init__(self, expected_env=None):
327
expected_env = expected_env or {}
328
# We should always get back the same v2 items
329
self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
330
# ...and with v3 additions, these are for the DEFAULT TOKEN
331
v3_default_env_additions = {
332
'HTTP_X_PROJECT_ID': 'tenant_id1',
333
'HTTP_X_PROJECT_NAME': 'tenant_name1',
334
'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
335
'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
336
'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
337
'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
339
self.expected_env.update(v3_default_env_additions)
340
# And finally update for anything passed in
341
self.expected_env.update(expected_env)
343
def __call__(self, env, start_response):
344
for k, v in self.expected_env.items():
345
assert env[k] == v, '%s != %s' % (env[k], v)
346
resp = webob.Response()
347
resp.body = 'SUCCESS'
348
return resp(env, start_response)
351
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
352
"""Base test class for auth_token middleware.
354
All the tests allow for running with auth_token
355
configured for receiving v2 or v3 tokens, with the
356
choice being made by passing configuration data into
359
The base class will, by default, run all the tests
360
expecting v2 token formats. Child classes can override
361
this to specify, for instance, v3 format.
364
def setUp(self, expected_env=None, auth_version=None,
365
fake_app=None, fake_http=None, token_dict=None):
366
testtools.TestCase.setUp(self)
367
expected_env = expected_env or {}
370
self.token_dict = token_dict
373
'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
374
'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
375
'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
376
'signed_token_scoped_expired':
377
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
378
'revoked_token': client_fixtures.REVOKED_TOKEN,
379
'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
383
'auth_host': 'keystone.example.com',
385
'auth_admin_prefix': '/testadmin',
386
'signing_dir': client_fixtures.CERTDIR,
387
'auth_version': auth_version
390
# Base assumes v2 for fake app and http, can be overridden for
391
# child classes by called set_middleware() directly
392
self.fake_app = fake_app or FakeApp
393
self.fake_http = fake_http or FakeHTTPConnection
394
self.set_middleware(self.fake_app, self.fake_http,
395
expected_env, self.conf)
396
# self.middleware.verify_signed_token will call
397
# _ensure_subprocess, but we need
398
# cms.subprocess.CalledProcessError to be imported first.
399
# Explicitly call _ensure_subprocess to make sure the import
400
# happens before we need it regardless of test order.
401
cms._ensure_subprocess()
403
self.response_status = None
404
self.response_headers = None
406
signed_list = 'SIGNED_REVOCATION_LIST'
407
valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
408
globals()[signed_list] = globals()[valid_signed_list]
410
def set_fake_http(self, http_handler):
411
"""Configure the http handler for the auth_token middleware.
413
Allows tests to override the default handler on specific tests,
414
e.g. to use v2 for those parts of auth_token that still use v2
415
tokens while running the v3 test class, i.e. getting an admin
416
token or revocation list.
419
self.middleware.http_client_class = http_handler
421
def set_middleware(self, fake_app=None, fake_http=None,
422
expected_env=None, conf=None):
423
"""Configure the class ready to call the auth_token middleware.
425
Set up the various fake items needed to run the middleware.
426
Individual tests that need to further refine these can call this
427
function to override the class defaults.
430
conf = conf or self.conf
432
conf['http_handler'] = fake_http
433
fake_app = fake_app or self.fake_app
434
self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
435
self.middleware._iso8601 = iso8601
436
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
437
self.middleware.token_revocation_list = jsonutils.dumps(
438
{"revoked": [], "extra": "success"})
441
testtools.TestCase.tearDown(self)
443
os.remove(self.middleware.revoked_file_name)
447
def start_fake_response(self, status, headers):
448
self.response_status = int(status.split(' ', 1)[0])
449
self.response_headers = dict(headers)
452
if tuple(sys.version_info)[0:2] < (2, 7):
454
# 2.6 doesn't have the assert dict equals so make sure that it exists
455
class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
456
def assertIsInstance(self, obj, cls, msg=None):
457
"""Same as self.assertTrue(isinstance(obj, cls)), with a nicer
460
if not isinstance(obj, cls):
461
standardMsg = '%s is not an instance of %r' % (obj, cls)
462
self.fail(self._formatMessage(msg, standardMsg))
464
def assertDictEqual(self, d1, d2, msg=None):
465
# Simple version taken from 2.7
466
self.assertIsInstance(d1, dict,
467
'First argument is not a dictionary')
468
self.assertIsInstance(d2, dict,
469
'Second argument is not a dictionary')
474
standardMsg = '%r != %r' % (d1, d2)
475
self.fail(standardMsg)
477
BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
480
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
481
"""Auth Token middleware test setup that allows the tests to define
482
a stack of responses to HTTP requests in the test and get those
483
responses back in sequence for testing.
487
resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
488
resp2 = FakeHTTPResponse(200, jsonutils.dumps({
490
'token': {'id': 'admin_token2'},
493
FAKE_RESPONSE_STACK.append(resp1)
494
FAKE_RESPONSE_STACK.append(resp2)
496
... do your testing code here ...
501
super(StackResponseAuthTokenMiddlewareTest, self).setUp()
503
def test_fetch_revocation_list_with_expire(self):
504
# first response to revocation list should return 401 Unauthorized
505
# to pretend to be an expired token
506
resp1 = FakeHTTPResponse(200, jsonutils.dumps({
508
'token': {'id': 'admin_token2'},
511
resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
512
resp3 = FakeHTTPResponse(200, jsonutils.dumps({
514
'token': {'id': 'admin_token2'},
517
resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
519
# first get_admin_token() call
520
FAKE_RESPONSE_STACK.append(resp1)
521
# request revocation list, get "unauthorized" due to simulated expired
523
FAKE_RESPONSE_STACK.append(resp2)
524
# request a new admin_token
525
FAKE_RESPONSE_STACK.append(resp3)
526
# request revocation list, get the revocation list properly
527
FAKE_RESPONSE_STACK.append(resp4)
529
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
530
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
533
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
534
"""Auth Token middleware should understand Diablo keystone responses."""
536
# pre-diablo only had Tenant ID, which was also the Name
538
'HTTP_X_TENANT_ID': 'tenant_id1',
539
'HTTP_X_TENANT_NAME': 'tenant_id1',
540
# now deprecated (diablo-compat)
541
'HTTP_X_TENANT': 'tenant_id1',
543
super(DiabloAuthTokenMiddlewareTest, self).setUp(
544
expected_env=expected_env)
546
def test_valid_diablo_response(self):
547
req = webob.Request.blank('/')
548
req.headers['X-Auth-Token'] = client_fixtures.VALID_DIABLO_TOKEN
549
self.middleware(req.environ, self.start_fake_response)
550
self.assertEqual(self.response_status, 200)
551
self.assertTrue('keystone.token_info' in req.environ)
554
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
557
super(NoMemcacheAuthToken, self).setUp()
558
self.useFixture(DisableModuleFixture('memcache'))
560
def test_nomemcache(self):
562
'admin_token': 'admin_token1',
563
'auth_host': 'keystone.example.com',
565
'memcached_servers': 'localhost:11211',
568
auth_token.AuthProtocol(FakeApp(), conf)
570
def test_not_use_cache_from_env(self):
571
env = {'swift.cache': 'CACHE_TEST'}
573
'auth_host': 'keystone.example.com',
575
'auth_admin_prefix': '/testadmin',
576
'memcached_servers': 'localhost:11211'
578
self.set_middleware(conf=conf)
579
self.middleware._init_cache(env)
580
self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
583
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
585
def test_init_does_not_call_http(self):
587
'auth_host': 'keystone.example.com',
589
'revocation_cache_time': 1
591
self.set_fake_http(RaisingHTTPConnection)
592
self.set_middleware(conf=conf, fake_http=RaisingHTTPConnection)
594
def assert_valid_last_url(self, token_id):
595
# Default version (v2) has id in the token, override this
596
# method for v3 and other versions
597
self.assertEqual("/testadmin/v2.0/tokens/%s" % token_id,
598
self.middleware.http_client_class.last_requested_url)
600
def assert_valid_request_200(self, token, with_catalog=True):
601
req = webob.Request.blank('/')
602
req.headers['X-Auth-Token'] = token
603
body = self.middleware(req.environ, self.start_fake_response)
604
self.assertEqual(self.response_status, 200)
606
self.assertTrue(req.headers.get('X-Service-Catalog'))
607
self.assertEqual(body, ['SUCCESS'])
608
self.assertTrue('keystone.token_info' in req.environ)
610
def test_valid_uuid_request(self):
611
self.assert_valid_request_200(self.token_dict['uuid_token_default'])
612
self.assert_valid_last_url(self.token_dict['uuid_token_default'])
614
def test_valid_signed_request(self):
615
self.middleware.http_client_class.last_requested_url = ''
616
self.assert_valid_request_200(
617
self.token_dict['signed_token_scoped'])
618
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
620
#ensure that signed requests do not generate HTTP traffic
622
'', self.middleware.http_client_class.last_requested_url)
624
def test_revoked_token_receives_401(self):
625
self.middleware.token_revocation_list = self.get_revocation_list_json()
626
req = webob.Request.blank('/')
627
req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
628
self.middleware(req.environ, self.start_fake_response)
629
self.assertEqual(self.response_status, 401)
631
def get_revocation_list_json(self, token_ids=None):
632
if token_ids is None:
633
token_ids = [self.token_dict['revoked_token_hash']]
634
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
636
return jsonutils.dumps(revocation_list)
638
def test_is_signed_token_revoked_returns_false(self):
639
#explicitly setting an empty revocation list here to document intent
640
self.middleware.token_revocation_list = jsonutils.dumps(
641
{"revoked": [], "extra": "success"})
642
result = self.middleware.is_signed_token_revoked(
643
self.token_dict['revoked_token'])
644
self.assertFalse(result)
646
def test_is_signed_token_revoked_returns_true(self):
647
self.middleware.token_revocation_list = self.get_revocation_list_json()
648
result = self.middleware.is_signed_token_revoked(
649
self.token_dict['revoked_token'])
650
self.assertTrue(result)
652
def test_verify_signed_token_raises_exception_for_revoked_token(self):
653
self.middleware.token_revocation_list = self.get_revocation_list_json()
654
self.assertRaises(auth_token.InvalidUserToken,
655
self.middleware.verify_signed_token,
656
self.token_dict['revoked_token'])
658
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
659
self.middleware.token_revocation_list = self.get_revocation_list_json()
660
self.middleware.verify_signed_token(
661
self.token_dict['signed_token_scoped'])
663
def test_verify_signing_dir_create_while_missing(self):
664
tmp_name = uuid.uuid4().hex
665
test_parent_signing_dir = "/tmp/%s" % tmp_name
666
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
667
self.middleware.signing_cert_file_name = "%s/test.pem" %\
668
self.middleware.signing_dirname
669
self.middleware.verify_signing_dir()
670
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
671
self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
672
self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
673
self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
676
stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
678
shutil.rmtree(test_parent_signing_dir)
680
def test_cert_file_missing(self):
681
self.assertFalse(self.middleware.cert_file_missing(
682
"openstack: /tmp/haystack: No such file or directory",
684
self.assertTrue(self.middleware.cert_file_missing(
685
"openstack: /not/exist: No such file or directory",
688
def test_get_token_revocation_list_fetched_time_returns_min(self):
689
self.middleware.token_revocation_list_fetched_time = None
690
self.middleware.revoked_file_name = ''
691
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
692
datetime.datetime.min)
694
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
695
self.middleware.token_revocation_list_fetched_time = None
696
mtime = os.path.getmtime(self.middleware.revoked_file_name)
697
fetched_time = datetime.datetime.fromtimestamp(mtime)
698
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
701
def test_get_token_revocation_list_fetched_time_returns_value(self):
702
expected = self.middleware._token_revocation_list_fetched_time
703
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
706
def test_get_revocation_list_returns_fetched_list(self):
707
# auth_token uses v2 to fetch this, so don't allow the v3
708
# tests to override the fake http connection
709
self.set_fake_http(FakeHTTPConnection)
710
self.middleware.token_revocation_list_fetched_time = None
711
os.remove(self.middleware.revoked_file_name)
712
self.assertEqual(self.middleware.token_revocation_list,
713
client_fixtures.REVOCATION_LIST)
715
def test_get_revocation_list_returns_current_list_from_memory(self):
716
self.assertEqual(self.middleware.token_revocation_list,
717
self.middleware._token_revocation_list)
719
def test_get_revocation_list_returns_current_list_from_disk(self):
720
in_memory_list = self.middleware.token_revocation_list
721
self.middleware._token_revocation_list = None
722
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
724
def test_invalid_revocation_list_raises_service_error(self):
725
globals()['SIGNED_REVOCATION_LIST'] = "{}"
726
self.assertRaises(auth_token.ServiceError,
727
self.middleware.fetch_revocation_list)
729
def test_fetch_revocation_list(self):
730
# auth_token uses v2 to fetch this, so don't allow the v3
731
# tests to override the fake http connection
732
self.set_fake_http(FakeHTTPConnection)
733
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
734
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
736
def test_request_invalid_uuid_token(self):
737
req = webob.Request.blank('/')
738
req.headers['X-Auth-Token'] = 'invalid-token'
739
self.middleware(req.environ, self.start_fake_response)
740
self.assertEqual(self.response_status, 401)
741
self.assertEqual(self.response_headers['WWW-Authenticate'],
742
"Keystone uri='https://keystone.example.com:1234'")
744
def test_request_invalid_signed_token(self):
745
req = webob.Request.blank('/')
746
req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
747
self.middleware(req.environ, self.start_fake_response)
748
self.assertEqual(self.response_status, 401)
749
self.assertEqual(self.response_headers['WWW-Authenticate'],
750
"Keystone uri='https://keystone.example.com:1234'")
752
def test_request_no_token(self):
753
req = webob.Request.blank('/')
754
self.middleware(req.environ, self.start_fake_response)
755
self.assertEqual(self.response_status, 401)
756
self.assertEqual(self.response_headers['WWW-Authenticate'],
757
"Keystone uri='https://keystone.example.com:1234'")
759
def test_request_no_token_log_message(self):
760
class FakeLog(object):
765
def warn(self, msg=None, *args, **kwargs):
768
def debug(self, msg=None, *args, **kwargs):
771
self.middleware.LOG = FakeLog()
772
self.middleware.delay_auth_decision = False
773
self.assertRaises(auth_token.InvalidUserToken,
774
self.middleware._get_user_token_from_header, {})
775
self.assertIsNotNone(self.middleware.LOG.msg)
776
self.assertIsNotNone(self.middleware.LOG.debugmsg)
778
def test_request_no_token_http(self):
779
req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
780
self.set_middleware()
781
body = self.middleware(req.environ, self.start_fake_response)
782
self.assertEqual(self.response_status, 401)
783
self.assertEqual(self.response_headers['WWW-Authenticate'],
784
"Keystone uri='https://keystone.example.com:1234'")
785
self.assertEqual(body, [''])
787
def test_request_blank_token(self):
788
req = webob.Request.blank('/')
789
req.headers['X-Auth-Token'] = ''
790
self.middleware(req.environ, self.start_fake_response)
791
self.assertEqual(self.response_status, 401)
792
self.assertEqual(self.response_headers['WWW-Authenticate'],
793
"Keystone uri='https://keystone.example.com:1234'")
795
def _get_cached_token(self, token):
796
token_id = cms.cms_hash_token(token)
797
# NOTE(vish): example tokens are expired so skip the expiration check.
798
return self.middleware._cache_get(token_id, ignore_expires=True)
800
def test_memcache(self):
801
req = webob.Request.blank('/')
802
token = self.token_dict['signed_token_scoped']
803
req.headers['X-Auth-Token'] = token
804
self.middleware(req.environ, self.start_fake_response)
805
self.assertNotEqual(self._get_cached_token(token), None)
807
def test_expired(self):
808
req = webob.Request.blank('/')
809
token = self.token_dict['signed_token_scoped_expired']
810
req.headers['X-Auth-Token'] = token
811
self.middleware(req.environ, self.start_fake_response)
812
self.assertEqual(self.response_status, 401)
814
def test_memcache_set_invalid(self):
815
req = webob.Request.blank('/')
816
token = 'invalid-token'
817
req.headers['X-Auth-Token'] = token
818
self.middleware(req.environ, self.start_fake_response)
819
self.assertRaises(auth_token.InvalidUserToken,
820
self._get_cached_token, token)
822
def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
823
token_cache_time = 10
825
'token_cache_time': token_cache_time,
826
'signing_dir': client_fixtures.CERTDIR,
828
conf.update(extra_conf)
829
self.set_middleware(conf=conf)
830
req = webob.Request.blank('/')
831
token = self.token_dict['signed_token_scoped']
832
req.headers['X-Auth-Token'] = token
833
req.environ.update(extra_environ)
835
now = datetime.datetime.utcnow()
836
timeutils.set_time_override(now)
837
self.middleware(req.environ, self.start_fake_response)
838
self.assertNotEqual(self._get_cached_token(token), None)
839
expired = now + datetime.timedelta(seconds=token_cache_time)
840
timeutils.set_time_override(expired)
841
self.assertEqual(self._get_cached_token(token), None)
843
timeutils.clear_time_override()
845
def test_old_swift_memcache_set_expired(self):
846
extra_conf = {'cache': 'swift.cache'}
847
extra_environ = {'swift.cache': FakeSwiftOldMemcacheClient()}
848
self.test_memcache_set_expired(extra_conf, extra_environ)
850
def test_swift_memcache_set_expired(self):
851
extra_conf = {'cache': 'swift.cache'}
852
extra_environ = {'swift.cache': memorycache.Client()}
853
self.test_memcache_set_expired(extra_conf, extra_environ)
855
def test_use_cache_from_env(self):
856
env = {'swift.cache': 'CACHE_TEST'}
858
'auth_host': 'keystone.example.com',
860
'auth_admin_prefix': '/testadmin',
861
'cache': 'swift.cache',
862
'memcached_servers': ['localhost:11211']
864
self.set_middleware(conf=conf)
865
self.middleware._init_cache(env)
866
self.assertEqual(self.middleware._cache, 'CACHE_TEST')
868
def test_will_expire_soon(self):
869
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
871
self.assertTrue(auth_token.will_expire_soon(tenseconds))
872
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
874
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
876
def test_encrypt_cache_data(self):
878
'auth_host': 'keystone.example.com',
880
'auth_admin_prefix': '/testadmin',
881
'memcached_servers': ['localhost:11211'],
882
'memcache_security_strategy': 'encrypt',
883
'memcache_secret_key': 'mysecret'
885
self.set_middleware(conf=conf)
887
data = ('this_data', 10e100)
888
self.middleware._init_cache({})
889
self.middleware._cache_store(token, data)
890
self.assertEqual(self.middleware._cache_get(token), data[0])
892
def test_sign_cache_data(self):
894
'auth_host': 'keystone.example.com',
896
'auth_admin_prefix': '/testadmin',
897
'memcached_servers': ['localhost:11211'],
898
'memcache_security_strategy': 'mac',
899
'memcache_secret_key': 'mysecret'
901
self.set_middleware(conf=conf)
903
data = ('this_data', 10e100)
904
self.middleware._init_cache({})
905
self.middleware._cache_store(token, data)
906
self.assertEqual(self.middleware._cache_get(token), data[0])
908
def test_no_memcache_protection(self):
910
'auth_host': 'keystone.example.com',
912
'auth_admin_prefix': '/testadmin',
913
'memcached_servers': ['localhost:11211'],
914
'memcache_secret_key': 'mysecret'
916
self.set_middleware(conf=conf)
918
data = ('this_data', 10e100)
919
self.middleware._init_cache({})
920
self.middleware._cache_store(token, data)
921
self.assertEqual(self.middleware._cache_get(token), data[0])
923
def test_assert_valid_memcache_protection_config(self):
924
# test missing memcache_secret_key
926
'auth_host': 'keystone.example.com',
928
'auth_admin_prefix': '/testadmin',
929
'memcached_servers': ['localhost:11211'],
930
'memcache_security_strategy': 'Encrypt'
932
self.assertRaises(Exception, self.set_middleware, conf)
933
# test invalue memcache_security_strategy
935
'auth_host': 'keystone.example.com',
937
'auth_admin_prefix': '/testadmin',
938
'memcached_servers': ['localhost:11211'],
939
'memcache_security_strategy': 'whatever'
941
self.assertRaises(Exception, self.set_middleware, conf)
942
# test missing memcache_secret_key
944
'auth_host': 'keystone.example.com',
946
'auth_admin_prefix': '/testadmin',
947
'memcached_servers': ['localhost:11211'],
948
'memcache_security_strategy': 'mac'
950
self.assertRaises(Exception, self.set_middleware, conf)
952
'auth_host': 'keystone.example.com',
954
'auth_admin_prefix': '/testadmin',
955
'memcached_servers': ['localhost:11211'],
956
'memcache_security_strategy': 'Encrypt',
957
'memcache_secret_key': ''
959
self.assertRaises(Exception, self.set_middleware, conf)
961
'auth_host': 'keystone.example.com',
963
'auth_admin_prefix': '/testadmin',
964
'memcached_servers': ['localhost:11211'],
965
'memcache_security_strategy': 'mAc',
966
'memcache_secret_key': ''
968
self.assertRaises(Exception, self.set_middleware, conf)
970
def test_config_revocation_cache_timeout(self):
972
'auth_host': 'keystone.example.com',
974
'auth_admin_prefix': '/testadmin',
975
'revocation_cache_time': 24
977
middleware = auth_token.AuthProtocol(self.fake_app, conf)
978
self.assertEquals(middleware.token_revocation_list_cache_timeout,
979
datetime.timedelta(seconds=24))
981
def test_http_error_not_cached_token(self):
982
"""Test to don't cache token as invalid on network errors.
984
We use UUID tokens since they are the easiest one to reach
987
req = webob.Request.blank('/')
988
token = self.token_dict['uuid_token_default']
989
req.headers['X-Auth-Token'] = token
990
self.set_fake_http(RaisingHTTPNetworkError)
991
self.middleware.http_request_max_retries = 0
992
self.middleware(req.environ, self.start_fake_response)
993
self.assertEqual(self._get_cached_token(token), None)
996
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
998
super(CertDownloadMiddlewareTest, self).setUp()
999
self.base_dir = tempfile.mkdtemp()
1000
self.cert_dir = os.path.join(self.base_dir, 'certs')
1001
os.mkdir(self.cert_dir)
1004
'auth_host': 'keystone.example.com',
1006
'auth_protocol': 'http',
1007
'auth_admin_prefix': '/testadmin',
1008
'signing_dir': self.cert_dir,
1012
shutil.rmtree(self.base_dir)
1013
super(CertDownloadMiddlewareTest, self).tearDown()
1015
# Usually we supply a signed_dir with pre-installed certificates,
1016
# so invocation of /usr/bin/openssl succeeds. This time we give it
1017
# an empty directory, so it fails.
1018
def test_request_no_token_dummy(self):
1019
self.set_middleware(fake_http=self.fake_http, conf=self.conf)
1020
self.assertRaises(cms.subprocess.CalledProcessError,
1021
self.middleware.verify_signed_token,
1022
self.token_dict['signed_token_scoped'])
1024
def test_fetch_signing_cert(self):
1025
self.set_middleware(fake_http=CertificateHTTPConnection,
1028
self.middleware.fetch_signing_cert()
1030
with open(self.middleware.signing_cert_file_name, 'r') as f:
1031
self.assertEqual(f.read(),
1032
CertificateHTTPConnection.signing_cert_data)
1034
self.assertEqual('/testadmin/v2.0/certificates/signing',
1035
self.middleware.http_client_class.last_requested_url)
1037
def test_fetch_signing_ca(self):
1038
self.set_middleware(fake_http=CertificateHTTPConnection,
1041
self.middleware.fetch_ca_cert()
1043
with open(self.middleware.ca_file_name, 'r') as f:
1044
self.assertEqual(f.read(), CertificateHTTPConnection.ca_cert_data)
1046
self.assertEqual('/testadmin/v2.0/certificates/ca',
1047
self.middleware.http_client_class.last_requested_url)
1049
def test_prefix_trailing_slash(self):
1050
self.conf['auth_admin_prefix'] = '/newadmin/'
1051
self.set_middleware(fake_http=CertificateHTTPConnection,
1054
# the requests will return a 404, but it doesn't matter
1056
self.middleware.fetch_ca_cert()
1058
self.assertEqual('/newadmin/v2.0/certificates/ca',
1059
self.middleware.http_client_class.last_requested_url)
1061
self.middleware.fetch_signing_cert()
1063
self.assertEqual('/newadmin/v2.0/certificates/signing',
1064
self.middleware.http_client_class.last_requested_url)
1066
def test_without_prefix(self):
1067
self.conf['auth_admin_prefix'] = ''
1069
self.set_middleware(fake_http=CertificateHTTPConnection,
1072
# the requests will return a 404, but it doesn't matter
1074
self.middleware.fetch_ca_cert()
1076
self.assertEqual('/v2.0/certificates/ca',
1077
self.middleware.http_client_class.last_requested_url)
1079
self.middleware.fetch_signing_cert()
1081
self.assertEqual('/v2.0/certificates/signing',
1082
self.middleware.http_client_class.last_requested_url)
1085
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
1086
"""v2 token specific tests.
1088
There are some differences between how the auth-token middleware handles
1089
v2 and v3 tokens over and above the token formats, namely:
1091
- A v3 keystone server will auto scope a token to a user's default project
1092
if no scope is specified. A v2 server assumes that the auth-token
1093
middleware will do that.
1094
- A v2 keystone server may issue a token without a catalog, even with a
1097
The tests below were originally part of the generic AuthTokenMiddlewareTest
1098
class, but now, since they really are v2 specifc, they are included here.
1101
def assert_unscoped_default_tenant_auto_scopes(self, token):
1102
"""Unscoped v2 requests with a default tenant should "auto-scope."
1104
The implied scope is the user's tenant ID.
1107
req = webob.Request.blank('/')
1108
req.headers['X-Auth-Token'] = token
1109
body = self.middleware(req.environ, self.start_fake_response)
1110
self.assertEqual(self.response_status, 200)
1111
self.assertEqual(body, ['SUCCESS'])
1112
self.assertTrue('keystone.token_info' in req.environ)
1114
def test_default_tenant_uuid_token(self):
1115
self.assert_unscoped_default_tenant_auto_scopes(
1116
client_fixtures.UUID_TOKEN_DEFAULT)
1118
def test_default_tenant_signed_token(self):
1119
self.assert_unscoped_default_tenant_auto_scopes(
1120
client_fixtures.SIGNED_TOKEN_SCOPED)
1122
def assert_unscoped_token_receives_401(self, token):
1123
"""Unscoped requests with no default tenant ID should be rejected."""
1124
req = webob.Request.blank('/')
1125
req.headers['X-Auth-Token'] = token
1126
self.middleware(req.environ, self.start_fake_response)
1127
self.assertEqual(self.response_status, 401)
1128
self.assertEqual(self.response_headers['WWW-Authenticate'],
1129
"Keystone uri='https://keystone.example.com:1234'")
1131
def test_unscoped_uuid_token_receives_401(self):
1132
self.assert_unscoped_token_receives_401(
1133
client_fixtures.UUID_TOKEN_UNSCOPED)
1135
def test_unscoped_pki_token_receives_401(self):
1136
self.assert_unscoped_token_receives_401(
1137
client_fixtures.SIGNED_TOKEN_UNSCOPED)
1139
def test_request_prevent_service_catalog_injection(self):
1140
req = webob.Request.blank('/')
1141
req.headers['X-Service-Catalog'] = '[]'
1142
req.headers['X-Auth-Token'] = \
1143
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG
1144
body = self.middleware(req.environ, self.start_fake_response)
1145
self.assertEqual(self.response_status, 200)
1146
self.assertFalse(req.headers.get('X-Service-Catalog'))
1147
self.assertEqual(body, ['SUCCESS'])
1149
def test_valid_uuid_request_forced_to_2_0(self):
1150
"""Test forcing auth_token to use lower api version.
1152
By installing the v3 http hander, auth_token will be get
1153
a version list that looks like a v3 server - from which it
1154
would normally chose v3.0 as the auth version. However, here
1155
we specify v2.0 in the configuration - which should force
1156
auth_token to use that version instead.
1160
'auth_host': 'keystone.example.com',
1162
'auth_admin_prefix': '/testadmin',
1163
'signing_dir': client_fixtures.CERTDIR,
1164
'auth_version': 'v2.0'
1166
self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
1167
# This tests will only work is auth_token has chosen to use the
1168
# lower, v2, api version
1169
req = webob.Request.blank('/')
1170
req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT
1171
body = self.middleware(req.environ, self.start_fake_response)
1172
self.assertEqual(self.response_status, 200)
1173
self.assertEqual("/testadmin/v2.0/tokens/%s" %
1174
client_fixtures.UUID_TOKEN_DEFAULT,
1175
v3FakeHTTPConnection.last_requested_url)
1177
def test_invalid_auth_version_request(self):
1179
'auth_host': 'keystone.example.com',
1181
'auth_admin_prefix': '/testadmin',
1182
'signing_dir': client_fixtures.CERTDIR,
1183
'auth_version': 'v1.0' # v1.0 is no longer supported
1185
self.assertRaises(Exception, self.set_middleware, conf)
1188
class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
1189
"""Test auth_token middleware with v3 tokens.
1191
Re-execute the AuthTokenMiddlewareTest class tests, but with the
1192
the auth_token middleware configured to expect v3 tokens back from
1195
This is done by configuring the AuthTokenMiddlewareTest class via
1196
its Setup(), passing in v3 style data that will then be used by
1197
the tests themselves. This approach has been used to ensure we
1198
really are running the same tests for both v2 and v3 tokens.
1200
There a few additional specific test for v3 only:
1202
- We allow an unscoped token to be validated (as unscoped), where
1203
as for v2 tokens, the auth_token middleware is expected to try and
1204
auto-scope it (and fail if there is no default tenant)
1205
- Domain scoped tokens
1207
Since we don't specify an auth version for auth_token to use, by
1208
definition we are thefore implicitely testing that it will use
1209
the highest available auth version, i.e. v3.0
1214
'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT,
1215
'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1216
'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED,
1217
'signed_token_scoped_expired':
1218
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
1219
'revoked_token': client_fixtures.REVOKED_v3_TOKEN,
1220
'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH
1222
super(v3AuthTokenMiddlewareTest, self).setUp(
1223
auth_version='v3.0',
1225
fake_http=v3FakeHTTPConnection,
1226
token_dict=token_dict)
1228
def assert_valid_last_url(self, token_id):
1229
# Token ID is not part of the url in v3, so override
1230
# this assert test in the base class
1231
self.assertEqual('/testadmin/v3/auth/tokens',
1232
v3FakeHTTPConnection.last_requested_url)
1234
def test_valid_unscoped_uuid_request(self):
1235
# Remove items that won't be in an unscoped token
1236
delta_expected_env = {
1237
'HTTP_X_PROJECT_ID': None,
1238
'HTTP_X_PROJECT_NAME': None,
1239
'HTTP_X_PROJECT_DOMAIN_ID': None,
1240
'HTTP_X_PROJECT_DOMAIN_NAME': None,
1241
'HTTP_X_TENANT_ID': None,
1242
'HTTP_X_TENANT_NAME': None,
1244
'HTTP_X_TENANT': None,
1247
self.set_middleware(expected_env=delta_expected_env)
1248
self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1250
self.assertEqual('/testadmin/v3/auth/tokens',
1251
v3FakeHTTPConnection.last_requested_url)
1253
def test_domain_scoped_uuid_request(self):
1254
# Modify items compared to default token for a domain scope
1255
delta_expected_env = {
1256
'HTTP_X_DOMAIN_ID': 'domain_id1',
1257
'HTTP_X_DOMAIN_NAME': 'domain_name1',
1258
'HTTP_X_PROJECT_ID': None,
1259
'HTTP_X_PROJECT_NAME': None,
1260
'HTTP_X_PROJECT_DOMAIN_ID': None,
1261
'HTTP_X_PROJECT_DOMAIN_NAME': None,
1262
'HTTP_X_TENANT_ID': None,
1263
'HTTP_X_TENANT_NAME': None,
1264
'HTTP_X_TENANT': None
1266
self.set_middleware(expected_env=delta_expected_env)
1267
self.assert_valid_request_200(
1268
client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED)
1269
self.assertEqual('/testadmin/v3/auth/tokens',
1270
v3FakeHTTPConnection.last_requested_url)
1273
class TokenEncodingTest(testtools.TestCase):
1274
def test_unquoted_token(self):
1275
self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
1277
def test_quoted_token(self):
1278
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))