1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2012 OpenStack Foundation
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
# not use this file except in compliance with the License. You may obtain
7
# a copy of the License at
9
# http://www.apache.org/licenses/LICENSE-2.0
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
# License for the specific language governing permissions and limitations
33
from keystoneclient.common import cms
34
from keystoneclient.middleware import auth_token
35
from keystoneclient.openstack.common import jsonutils
36
from keystoneclient.openstack.common import memorycache
37
from keystoneclient.openstack.common import timeutils
38
from keystoneclient.tests import client_fixtures
41
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
42
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
43
'HTTP_X_TENANT_ID': 'tenant_id1',
44
'HTTP_X_TENANT_NAME': 'tenant_name1',
45
'HTTP_X_USER_ID': 'user_id1',
46
'HTTP_X_USER_NAME': 'user_name1',
47
'HTTP_X_ROLES': 'role1,role2',
48
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
49
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
50
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
54
BASE_HOST = 'https://keystone.example.com:1234'
55
BASE_URI = '%s/testadmin' % BASE_HOST
56
FAKE_ADMIN_TOKEN_ID = 'admin_token2'
57
FAKE_ADMIN_TOKEN = jsonutils.dumps(
58
{'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID,
59
'expires': '2022-10-03T16:58:01Z'}}})
62
VERSION_LIST_v3 = jsonutils.dumps({
68
"updated": "2013-03-06T00:00:00Z",
69
"links": [{'href': '%s/v3' % BASE_URI, 'rel': 'self'}]
74
"updated": "2011-11-19T00:00:00Z",
75
"links": [{'href': '%s/v2.0' % BASE_URI, 'rel': 'self'}]
81
VERSION_LIST_v2 = jsonutils.dumps({
87
"updated": "2011-11-19T00:00:00Z",
94
ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2'
97
class NoModuleFinder(object):
98
"""Disallow further imports of 'module'."""
100
def __init__(self, module):
103
def find_module(self, fullname, path):
104
if fullname == self.module or fullname.startswith(self.module + '.'):
108
class DisableModuleFixture(fixtures.Fixture):
109
"""A fixture to provide support for unloading/disabling modules."""
111
def __init__(self, module, *args, **kw):
112
super(DisableModuleFixture, self).__init__(*args, **kw)
115
self._cleared_modules = {}
118
super(DisableModuleFixture, self).tearDown()
119
for finder in self._finders:
120
sys.meta_path.remove(finder)
121
sys.modules.update(self._cleared_modules)
123
def clear_module(self):
    """Remove ``self.module`` and all of its submodules from ``sys.modules``.

    A name matches when it equals ``self.module`` or starts with
    ``self.module + '.'``.

    Returns a dict mapping each removed module name to the module object
    that was registered under it.
    """
    prefix = self.module + '.'
    # Snapshot the matching names before removing anything: popping entries
    # while iterating the live ``sys.modules`` key view raises RuntimeError
    # ("dictionary changed size during iteration") on Python 3.
    matching = [fullname for fullname in list(sys.modules)
                if fullname == self.module or fullname.startswith(prefix)]
    cleared_modules = {}
    for fullname in matching:
        cleared_modules[fullname] = sys.modules.pop(fullname)
    return cleared_modules
132
"""Ensure ImportError for the specified module."""
134
super(DisableModuleFixture, self).setUp()
136
# Clear 'module' references in sys.modules
137
self._cleared_modules.update(self.clear_module())
139
finder = NoModuleFinder(self.module)
140
self._finders.append(finder)
141
sys.meta_path.insert(0, finder)
144
class FakeSwiftOldMemcacheClient(memorycache.Client):
    """Cache client whose ``set`` takes ``timeout`` rather than ``time``."""

    # NOTE(vish,chmou): old swift memcache uses param timeout instead of time
    def set(self, key, value, timeout=0, min_compress_len=0):
        """Forward to the parent ``set``, passing ``timeout`` positionally."""
        parent_set = super(FakeSwiftOldMemcacheClient, self).set
        parent_set(key, value, timeout, min_compress_len)
151
class FakeApp(object):
152
"""This represents a WSGI app protected by the auth_token middleware."""
154
def __init__(self, expected_env=None):
155
self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
158
self.expected_env.update(expected_env)
160
def __call__(self, env, start_response):
    """Check ``env`` against ``self.expected_env``, then reply ``SUCCESS``.

    Every key in ``self.expected_env`` must be present in ``env`` with an
    equal value; otherwise the assertion fails with ``'<got> != <want>'``.
    """
    for key, wanted in self.expected_env.items():
        actual = env[key]
        assert actual == wanted, '%s != %s' % (actual, wanted)

    response = webob.Response()
    response.body = 'SUCCESS'
    return response(env, start_response)
169
class v3FakeApp(FakeApp):
170
"""This represents a v3 WSGI app protected by the auth_token middleware."""
172
def __init__(self, expected_env=None):
174
# with v3 additions, these are for the DEFAULT TOKEN
175
v3_default_env_additions = {
176
'HTTP_X_PROJECT_ID': 'tenant_id1',
177
'HTTP_X_PROJECT_NAME': 'tenant_name1',
178
'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
179
'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
180
'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
181
'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
185
v3_default_env_additions.update(expected_env)
187
super(v3FakeApp, self).__init__(v3_default_env_additions)
190
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
191
"""Base test class for auth_token middleware.
193
All the tests allow for running with auth_token
194
configured for receiving v2 or v3 tokens, with the
195
choice being made by passing configuration data into
198
The base class will, by default, run all the tests
199
expecting v2 token formats. Child classes can override
200
this to specify, for instance, v3 format.
203
def setUp(self, expected_env=None, auth_version=None, fake_app=None):
204
testtools.TestCase.setUp(self)
206
self.expected_env = expected_env or dict()
207
self.fake_app = fake_app or FakeApp
208
self.middleware = None
211
'auth_host': 'keystone.example.com',
213
'auth_protocol': 'https',
214
'auth_admin_prefix': '/testadmin',
215
'signing_dir': client_fixtures.CERTDIR,
216
'auth_version': auth_version,
217
'auth_uri': 'https://keystone.example.com:1234',
220
self.response_status = None
221
self.response_headers = None
223
def set_middleware(self, fake_app=None, expected_env=None, conf=None):
224
"""Configure the class ready to call the auth_token middleware.
226
Set up the various fake items needed to run the middleware.
227
Individual tests that need to further refine these can call this
228
function to override the class defaults.
232
self.conf.update(conf)
235
fake_app = self.fake_app
238
self.expected_env.update(expected_env)
240
self.middleware = auth_token.AuthProtocol(fake_app(self.expected_env),
242
self.middleware._iso8601 = iso8601
243
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
244
self.middleware.token_revocation_list = jsonutils.dumps(
245
{"revoked": [], "extra": "success"})
248
testtools.TestCase.tearDown(self)
251
os.remove(self.middleware.revoked_file_name)
255
def start_fake_response(self, status, headers):
    """Record the status code and headers passed by the middleware.

    ``status`` is a status line such as ``'401 Unauthorized'``; only the
    text before the first space is kept, converted to ``int``.
    ``headers`` is stored as a dict.
    """
    code_text, _sep, _reason = status.partition(' ')
    self.response_status = int(code_text)
    self.response_headers = dict(headers)
259
def assertLastPath(self, path):
261
self.assertEqual(path, httpretty.httpretty.last_request.path)
263
self.assertIsInstance(httpretty.httpretty.last_request,
264
httpretty.core.HTTPrettyRequestEmpty)
266
if tuple(sys.version_info)[0:2] < (2, 7):
268
# 2.6 doesn't have the assert dict equals so make sure that it exists
269
class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
270
def assertIsInstance(self, obj, cls, msg=None):
271
"""Same as self.assertTrue(isinstance(obj, cls)), with a nicer
274
if not isinstance(obj, cls):
275
standardMsg = '%s is not an instance of %r' % (obj, cls)
276
self.fail(self._formatMessage(msg, standardMsg))
278
def assertDictEqual(self, d1, d2, msg=None):
279
# Simple version taken from 2.7
280
self.assertIsInstance(d1, dict,
281
'First argument is not a dictionary')
282
self.assertIsInstance(d2, dict,
283
'Second argument is not a dictionary')
288
standardMsg = '%r != %r' % (d1, d2)
289
self.fail(standardMsg)
291
BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
294
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
297
def test_fetch_revocation_list_with_expire(self):
298
self.set_middleware()
300
# Get a token, then try to retrieve revocation list and get a 401.
301
# Get a new token, try to retrieve revocation list and return 200.
302
httpretty.register_uri(httpretty.POST, "%s/v2.0/tokens" % BASE_URI,
303
body=FAKE_ADMIN_TOKEN)
305
responses = [httpretty.Response(body='', status=401),
307
body=client_fixtures.SIGNED_REVOCATION_LIST)]
309
httpretty.register_uri(httpretty.GET,
310
"%s/v2.0/tokens/revoked" % BASE_URI,
313
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
314
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
316
# Check that 4 requests have been made
317
self.assertEqual(len(httpretty.httpretty.latest_requests), 4)
320
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
321
"""Auth Token middleware should understand Diablo keystone responses."""
323
# pre-diablo only had Tenant ID, which was also the Name
325
'HTTP_X_TENANT_ID': 'tenant_id1',
326
'HTTP_X_TENANT_NAME': 'tenant_id1',
327
# now deprecated (diablo-compat)
328
'HTTP_X_TENANT': 'tenant_id1',
331
super(DiabloAuthTokenMiddlewareTest, self).setUp(
332
expected_env=expected_env)
334
httpretty.httpretty.reset()
337
httpretty.register_uri(httpretty.GET,
339
body=VERSION_LIST_v2,
342
httpretty.register_uri(httpretty.POST,
343
"%s/v2.0/tokens" % BASE_URI,
344
body=FAKE_ADMIN_TOKEN)
346
self.token_id = client_fixtures.VALID_DIABLO_TOKEN
347
token_response = client_fixtures.JSON_TOKEN_RESPONSES[self.token_id]
349
httpretty.register_uri(httpretty.GET,
350
"%s/v2.0/tokens/%s" % (BASE_URI, self.token_id),
353
self.set_middleware()
357
super(DiabloAuthTokenMiddlewareTest, self).tearDown()
359
def test_valid_diablo_response(self):
    """A request carrying ``self.token_id`` gets a 200 and token info."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = self.token_id
    environ = request.environ
    self.middleware(environ, self.start_fake_response)
    self.assertEqual(self.response_status, 200)
    self.assertTrue('keystone.token_info' in environ)
367
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
370
super(NoMemcacheAuthToken, self).setUp()
371
self.useFixture(DisableModuleFixture('memcache'))
373
def test_nomemcache(self):
375
'admin_token': 'admin_token1',
376
'auth_host': 'keystone.example.com',
378
'memcached_servers': 'localhost:11211',
379
'auth_uri': 'https://keystone.example.com:1234',
382
auth_token.AuthProtocol(FakeApp(), conf)
384
def test_not_use_cache_from_env(self):
385
env = {'swift.cache': 'CACHE_TEST'}
387
'memcached_servers': 'localhost:11211'
389
self.set_middleware(conf=conf)
390
self.middleware._init_cache(env)
391
self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
394
class CommonAuthTokenMiddlewareTest(object):
396
def test_init_does_not_call_http(self):
398
'revocation_cache_time': 1
400
self.set_middleware(conf=conf)
401
self.assertLastPath(None)
403
def test_init_by_ipv6Addr_auth_host(self):
405
'auth_host': '2001:2013:1:f101::1',
407
'auth_protocol': 'http',
410
self.set_middleware(conf=conf)
411
expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
412
self.assertEqual(expected_auth_uri, self.middleware.auth_uri)
414
def assert_valid_request_200(self, token, with_catalog=True):
415
req = webob.Request.blank('/')
416
req.headers['X-Auth-Token'] = token
417
body = self.middleware(req.environ, self.start_fake_response)
418
self.assertEqual(self.response_status, 200)
420
self.assertTrue(req.headers.get('X-Service-Catalog'))
421
self.assertEqual(body, ['SUCCESS'])
422
self.assertTrue('keystone.token_info' in req.environ)
424
def test_valid_uuid_request(self):
425
self.assert_valid_request_200(self.token_dict['uuid_token_default'])
426
self.assert_valid_last_url(self.token_dict['uuid_token_default'])
428
def test_valid_signed_request(self):
429
self.assert_valid_request_200(
430
self.token_dict['signed_token_scoped'])
431
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
433
#ensure that signed requests do not generate HTTP traffic
434
self.assertLastPath(None)
436
def test_revoked_token_receives_401(self):
    """A token listed in the revocation list is answered with a 401."""
    revocation_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revocation_json
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = self.token_dict['revoked_token']
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
443
def get_revocation_list_json(self, token_ids=None):
444
if token_ids is None:
445
token_ids = [self.token_dict['revoked_token_hash']]
446
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
448
return jsonutils.dumps(revocation_list)
450
def test_is_signed_token_revoked_returns_false(self):
    """With an empty revocation list the token is reported as not revoked."""
    #explicitly setting an empty revocation list here to document intent
    empty_list = {"revoked": [], "extra": "success"}
    self.middleware.token_revocation_list = jsonutils.dumps(empty_list)
    is_revoked = self.middleware.is_signed_token_revoked(
        self.token_dict['revoked_token'])
    self.assertFalse(is_revoked)
458
def test_is_signed_token_revoked_returns_true(self):
459
self.middleware.token_revocation_list = self.get_revocation_list_json()
460
result = self.middleware.is_signed_token_revoked(
461
self.token_dict['revoked_token'])
462
self.assertTrue(result)
464
def test_verify_signed_token_raises_exception_for_revoked_token(self):
    """Verifying the revoked token raises ``InvalidUserToken``."""
    revocation_json = self.get_revocation_list_json()
    self.middleware.token_revocation_list = revocation_json
    revoked_token = self.token_dict['revoked_token']
    self.assertRaises(auth_token.InvalidUserToken,
                      self.middleware.verify_signed_token,
                      revoked_token)
470
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
471
self.middleware.token_revocation_list = self.get_revocation_list_json()
472
self.middleware.verify_signed_token(
473
self.token_dict['signed_token_scoped'])
475
def test_verify_signing_dir_create_while_missing(self):
476
tmp_name = uuid.uuid4().hex
477
test_parent_signing_dir = "/tmp/%s" % tmp_name
478
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
479
self.middleware.signing_cert_file_name = "%s/test.pem" %\
480
self.middleware.signing_dirname
481
self.middleware.verify_signing_dir()
482
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
483
self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
484
self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
485
self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
488
stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
490
shutil.rmtree(test_parent_signing_dir)
492
def test_cert_file_missing(self):
493
self.assertFalse(self.middleware.cert_file_missing(
494
"openstack: /tmp/haystack: No such file or directory",
496
self.assertTrue(self.middleware.cert_file_missing(
497
"openstack: /not/exist: No such file or directory",
500
def test_get_token_revocation_list_fetched_time_returns_min(self):
501
self.middleware.token_revocation_list_fetched_time = None
502
self.middleware.revoked_file_name = ''
503
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
504
datetime.datetime.min)
506
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
507
self.middleware.token_revocation_list_fetched_time = None
508
mtime = os.path.getmtime(self.middleware.revoked_file_name)
509
fetched_time = datetime.datetime.fromtimestamp(mtime)
510
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
513
def test_get_token_revocation_list_fetched_time_returns_value(self):
514
expected = self.middleware._token_revocation_list_fetched_time
515
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
518
def test_get_revocation_list_returns_fetched_list(self):
    """With no fetch time and no file on disk, the fixture list is returned."""
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    middleware = self.middleware
    middleware.token_revocation_list_fetched_time = None
    os.remove(middleware.revoked_file_name)
    self.assertEqual(middleware.token_revocation_list,
                     client_fixtures.REVOCATION_LIST)
526
def test_get_revocation_list_returns_current_list_from_memory(self):
527
self.assertEqual(self.middleware.token_revocation_list,
528
self.middleware._token_revocation_list)
530
def test_get_revocation_list_returns_current_list_from_disk(self):
531
in_memory_list = self.middleware.token_revocation_list
532
self.middleware._token_revocation_list = None
533
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
535
def test_invalid_revocation_list_raises_service_error(self):
536
httpretty.register_uri(httpretty.GET,
537
"%s/v2.0/tokens/revoked" % BASE_URI,
541
self.assertRaises(auth_token.ServiceError,
542
self.middleware.fetch_revocation_list)
544
def test_fetch_revocation_list(self):
    """The fetched revocation list decodes to the fixture list."""
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    raw_list = self.middleware.fetch_revocation_list()
    decoded = jsonutils.loads(raw_list)
    self.assertEqual(decoded, client_fixtures.REVOCATION_LIST)
550
def test_request_invalid_uuid_token(self):
    """A UUID token whose lookup returns 404 is rejected with a 401."""
    # remember because we are testing the middleware we stub the connection
    # to the keystone server, but this is not what gets returned
    lookup_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
    httpretty.register_uri(httpretty.GET, lookup_uri, body="", status=404)

    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = 'invalid-token'
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
563
def test_request_invalid_signed_token(self):
    """The invalid signed token fixture is rejected with a 401 challenge."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
571
def test_request_no_token(self):
    """A request without an ``X-Auth-Token`` header gets a 401 challenge."""
    environ = webob.Request.blank('/').environ
    self.middleware(environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
578
def test_request_no_token_log_message(self):
579
class FakeLog(object):
584
def warn(self, msg=None, *args, **kwargs):
587
def debug(self, msg=None, *args, **kwargs):
590
self.middleware.LOG = FakeLog()
591
self.middleware.delay_auth_decision = False
592
self.assertRaises(auth_token.InvalidUserToken,
593
self.middleware._get_user_token_from_header, {})
594
self.assertIsNotNone(self.middleware.LOG.msg)
595
self.assertIsNotNone(self.middleware.LOG.debugmsg)
597
def test_request_no_token_http(self):
    """A token-less HEAD request gets a 401 challenge and a ``['']`` body."""
    request = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
    self.set_middleware()
    response_body = self.middleware(request.environ,
                                    self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
    self.assertEqual(response_body, [''])
606
def test_request_blank_token(self):
    """An empty ``X-Auth-Token`` header is rejected with a 401 challenge."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = ''
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
614
def _get_cached_token(self, token):
    """Return the middleware cache entry stored under the hash of ``token``."""
    # NOTE(vish): example tokens are expired so skip the expiration check.
    return self.middleware._cache_get(cms.cms_hash_token(token),
                                      ignore_expires=True)
619
def test_memcache(self):
620
# NOTE(jamielennox): it appears that httpretty can mess with the
621
# memcache socket. Just disable it as it's not required here anyway.
623
req = webob.Request.blank('/')
624
token = self.token_dict['signed_token_scoped']
625
req.headers['X-Auth-Token'] = token
626
self.middleware(req.environ, self.start_fake_response)
627
self.assertNotEqual(self._get_cached_token(token), None)
629
def test_expired(self):
631
req = webob.Request.blank('/')
632
token = self.token_dict['signed_token_scoped_expired']
633
req.headers['X-Auth-Token'] = token
634
self.middleware(req.environ, self.start_fake_response)
635
self.assertEqual(self.response_status, 401)
637
def test_memcache_set_invalid_uuid(self):
    """After a 404 lookup, reading the cache raises ``InvalidUserToken``."""
    lookup_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
    httpretty.register_uri(httpretty.GET, lookup_uri, body="", status=404)

    bad_token = 'invalid-token'
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = bad_token
    self.middleware(request.environ, self.start_fake_response)
    self.assertRaises(auth_token.InvalidUserToken,
                      self._get_cached_token, bad_token)
648
def test_memcache_set_invalid_signed(self):
    """After an expired signed token, the cache raises ``InvalidUserToken``."""
    expired_token = self.token_dict['signed_token_scoped_expired']
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = expired_token
    self.middleware(request.environ, self.start_fake_response)
    self.assertRaises(auth_token.InvalidUserToken,
                      self._get_cached_token, expired_token)
656
def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
658
token_cache_time = 10
660
'token_cache_time': token_cache_time,
661
'signing_dir': client_fixtures.CERTDIR,
663
conf.update(extra_conf)
664
self.set_middleware(conf=conf)
665
req = webob.Request.blank('/')
666
token = self.token_dict['signed_token_scoped']
667
req.headers['X-Auth-Token'] = token
668
req.environ.update(extra_environ)
670
now = datetime.datetime.utcnow()
671
timeutils.set_time_override(now)
672
self.middleware(req.environ, self.start_fake_response)
673
self.assertNotEqual(self._get_cached_token(token), None)
674
expired = now + datetime.timedelta(seconds=token_cache_time)
675
timeutils.set_time_override(expired)
676
self.assertEqual(self._get_cached_token(token), None)
678
timeutils.clear_time_override()
680
def test_old_swift_memcache_set_expired(self):
    """Run the cache-expiry test with ``FakeSwiftOldMemcacheClient``."""
    self.test_memcache_set_expired(
        {'cache': 'swift.cache'},
        {'swift.cache': FakeSwiftOldMemcacheClient()})
685
def test_swift_memcache_set_expired(self):
    """Run the cache-expiry test with a ``memorycache.Client`` in the env."""
    self.test_memcache_set_expired(
        {'cache': 'swift.cache'},
        {'swift.cache': memorycache.Client()})
690
def test_use_cache_from_env(self):
691
env = {'swift.cache': 'CACHE_TEST'}
693
'cache': 'swift.cache',
694
'memcached_servers': ['localhost:11211']
696
self.set_middleware(conf=conf)
697
self.middleware._init_cache(env)
698
self.assertEqual(self.middleware._cache, 'CACHE_TEST')
700
def test_will_expire_soon(self):
701
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
703
self.assertTrue(auth_token.will_expire_soon(tenseconds))
704
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
706
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
708
def test_token_is_v2_accepts_v2(self):
    """``_token_is_v2`` is true for the default v2 UUID token response."""
    response = client_fixtures.TOKEN_RESPONSES[
        client_fixtures.UUID_TOKEN_DEFAULT]
    self.assertTrue(auth_token._token_is_v2(response))
713
def test_token_is_v2_rejects_v3(self):
    """``_token_is_v2`` is false for the default v3 UUID token response."""
    response = client_fixtures.TOKEN_RESPONSES[
        client_fixtures.v3_UUID_TOKEN_DEFAULT]
    self.assertFalse(auth_token._token_is_v2(response))
718
def test_token_is_v3_rejects_v2(self):
    """``_token_is_v3`` is false for the default v2 UUID token response."""
    response = client_fixtures.TOKEN_RESPONSES[
        client_fixtures.UUID_TOKEN_DEFAULT]
    self.assertFalse(auth_token._token_is_v3(response))
723
def test_token_is_v3_accepts_v3(self):
    """``_token_is_v3`` is true for the default v3 UUID token response."""
    response = client_fixtures.TOKEN_RESPONSES[
        client_fixtures.v3_UUID_TOKEN_DEFAULT]
    self.assertTrue(auth_token._token_is_v3(response))
728
def test_encrypt_cache_data(self):
731
'memcached_servers': ['localhost:11211'],
732
'memcache_security_strategy': 'encrypt',
733
'memcache_secret_key': 'mysecret'
735
self.set_middleware(conf=conf)
737
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
738
expires = timeutils.strtime(some_time_later)
739
data = ('this_data', expires)
740
self.middleware._init_cache({})
741
self.middleware._cache_store(token, data)
742
self.assertEqual(self.middleware._cache_get(token), data[0])
744
def test_sign_cache_data(self):
747
'memcached_servers': ['localhost:11211'],
748
'memcache_security_strategy': 'mac',
749
'memcache_secret_key': 'mysecret'
751
self.set_middleware(conf=conf)
753
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
754
expires = timeutils.strtime(some_time_later)
755
data = ('this_data', expires)
756
self.middleware._init_cache({})
757
self.middleware._cache_store(token, data)
758
self.assertEqual(self.middleware._cache_get(token), data[0])
760
def test_no_memcache_protection(self):
763
'memcached_servers': ['localhost:11211'],
764
'memcache_secret_key': 'mysecret'
766
self.set_middleware(conf=conf)
768
some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
769
expires = timeutils.strtime(some_time_later)
770
data = ('this_data', expires)
771
self.middleware._init_cache({})
772
self.middleware._cache_store(token, data)
773
self.assertEqual(self.middleware._cache_get(token), data[0])
775
def test_assert_valid_memcache_protection_config(self):
776
# test missing memcache_secret_key
778
'memcached_servers': ['localhost:11211'],
779
'memcache_security_strategy': 'Encrypt'
781
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
783
# test invalid memcache_security_strategy
785
'memcached_servers': ['localhost:11211'],
786
'memcache_security_strategy': 'whatever'
788
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
790
# test missing memcache_secret_key
792
'memcached_servers': ['localhost:11211'],
793
'memcache_security_strategy': 'mac'
795
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
798
'memcached_servers': ['localhost:11211'],
799
'memcache_security_strategy': 'Encrypt',
800
'memcache_secret_key': ''
802
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
805
'memcached_servers': ['localhost:11211'],
806
'memcache_security_strategy': 'mAc',
807
'memcache_secret_key': ''
809
self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
812
def test_config_revocation_cache_timeout(self):
814
'revocation_cache_time': 24,
815
'auth_uri': 'https://keystone.example.com:1234',
817
middleware = auth_token.AuthProtocol(self.fake_app, conf)
818
self.assertEqual(middleware.token_revocation_list_cache_timeout,
819
datetime.timedelta(seconds=24))
821
def test_http_error_not_cached_token(self):
822
"""Test to don't cache token as invalid on network errors.
824
We use UUID tokens since they are the easiest one to reach
827
req = webob.Request.blank('/')
828
req.headers['X-Auth-Token'] = ERROR_TOKEN
829
self.middleware.http_request_max_retries = 0
830
self.middleware(req.environ, self.start_fake_response)
831
self.assertEqual(self._get_cached_token(ERROR_TOKEN), None)
832
self.assert_valid_last_url(ERROR_TOKEN)
834
def test_http_request_max_retries(self):
837
req = webob.Request.blank('/')
838
req.headers['X-Auth-Token'] = ERROR_TOKEN
840
conf = {'http_request_max_retries': times_retry}
841
self.set_middleware(conf=conf)
843
with mock.patch('time.sleep') as mock_obj:
844
self.middleware(req.environ, self.start_fake_response)
846
self.assertEqual(mock_obj.call_count, times_retry)
849
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
851
super(CertDownloadMiddlewareTest, self).setUp()
852
self.base_dir = tempfile.mkdtemp()
853
self.cert_dir = os.path.join(self.base_dir, 'certs')
854
os.mkdir(self.cert_dir)
856
'signing_dir': self.cert_dir,
858
self.set_middleware(conf=conf)
864
shutil.rmtree(self.base_dir)
865
super(CertDownloadMiddlewareTest, self).tearDown()
867
# Usually we supply a signed_dir with pre-installed certificates,
868
# so invocation of /usr/bin/openssl succeeds. This time we give it
869
# an empty directory, so it fails.
870
def test_request_no_token_dummy(self):
871
cms._ensure_subprocess()
873
httpretty.register_uri(httpretty.GET,
874
"%s/v2.0/certificates/ca" % BASE_URI,
876
httpretty.register_uri(httpretty.GET,
877
"%s/v2.0/certificates/signing" % BASE_URI,
879
self.assertRaises(cms.subprocess.CalledProcessError,
880
self.middleware.verify_signed_token,
881
client_fixtures.SIGNED_TOKEN_SCOPED)
883
def test_fetch_signing_cert(self):
885
httpretty.register_uri(httpretty.GET,
886
"%s/v2.0/certificates/signing" % BASE_URI,
888
self.middleware.fetch_signing_cert()
890
with open(self.middleware.signing_cert_file_name, 'r') as f:
891
self.assertEqual(f.read(), data)
893
self.assertEqual("/testadmin/v2.0/certificates/signing",
894
httpretty.httpretty.last_request.path)
896
def test_fetch_signing_ca(self):
898
httpretty.register_uri(httpretty.GET,
899
"%s/v2.0/certificates/ca" % BASE_URI,
901
self.middleware.fetch_ca_cert()
903
with open(self.middleware.signing_ca_file_name, 'r') as f:
904
self.assertEqual(f.read(), data)
906
self.assertEqual("/testadmin/v2.0/certificates/ca",
907
httpretty.httpretty.last_request.path)
909
def test_prefix_trailing_slash(self):
910
self.conf['auth_admin_prefix'] = '/newadmin/'
912
httpretty.register_uri(httpretty.GET,
913
"%s/newadmin/v2.0/certificates/ca" % BASE_HOST,
915
httpretty.register_uri(httpretty.GET,
916
"%s/newadmin/v2.0/certificates/signing" %
917
BASE_HOST, body='FAKECERT')
919
self.set_middleware(conf=self.conf)
921
self.middleware.fetch_ca_cert()
923
self.assertEqual('/newadmin/v2.0/certificates/ca',
924
httpretty.httpretty.last_request.path)
926
self.middleware.fetch_signing_cert()
928
self.assertEqual('/newadmin/v2.0/certificates/signing',
929
httpretty.httpretty.last_request.path)
931
def test_without_prefix(self):
932
self.conf['auth_admin_prefix'] = ''
934
httpretty.register_uri(httpretty.GET,
935
"%s/v2.0/certificates/ca" % BASE_HOST,
937
httpretty.register_uri(httpretty.GET,
938
"%s/v2.0/certificates/signing" % BASE_HOST,
941
self.set_middleware(conf=self.conf)
943
self.middleware.fetch_ca_cert()
945
self.assertEqual('/v2.0/certificates/ca',
946
httpretty.httpretty.last_request.path)
948
self.middleware.fetch_signing_cert()
950
self.assertEqual('/v2.0/certificates/signing',
951
httpretty.httpretty.last_request.path)
954
def network_error_response(method, uri, headers):
    """Always raise ``auth_token.NetworkError``; the arguments are unused."""
    failure = auth_token.NetworkError("Network connection error.")
    raise failure
958
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
959
CommonAuthTokenMiddlewareTest):
960
"""v2 token specific tests.
962
There are some differences between how the auth-token middleware handles
963
v2 and v3 tokens over and above the token formats, namely:
965
- A v3 keystone server will auto scope a token to a user's default project
966
if no scope is specified. A v2 server assumes that the auth-token
967
middleware will do that.
968
- A v2 keystone server may issue a token without a catalog, even with a
971
The tests below were originally part of the generic AuthTokenMiddlewareTest
972
class, but now, since they really are v2 specific, they are included here.
977
super(v2AuthTokenMiddlewareTest, self).setUp()
980
'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
981
'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
982
'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
983
'signed_token_scoped_expired':
984
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
985
'revoked_token': client_fixtures.REVOKED_TOKEN,
986
'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
989
httpretty.httpretty.reset()
992
httpretty.register_uri(httpretty.GET,
994
body=VERSION_LIST_v2,
997
httpretty.register_uri(httpretty.POST,
998
"%s/v2.0/tokens" % BASE_URI,
999
body=FAKE_ADMIN_TOKEN)
1001
httpretty.register_uri(httpretty.GET,
1002
"%s/v2.0/tokens/revoked" % BASE_URI,
1003
body=client_fixtures.SIGNED_REVOCATION_LIST,
1006
for token in (client_fixtures.UUID_TOKEN_DEFAULT,
1007
client_fixtures.UUID_TOKEN_UNSCOPED,
1008
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG):
1009
httpretty.register_uri(httpretty.GET,
1010
"%s/v2.0/tokens/%s" % (BASE_URI, token),
1012
client_fixtures.JSON_TOKEN_RESPONSES[token])
1014
httpretty.register_uri(httpretty.GET,
1015
'%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
1016
body=network_error_response)
1018
self.set_middleware()
1022
super(v2AuthTokenMiddlewareTest, self).tearDown()
1024
def assert_unscoped_default_tenant_auto_scopes(self, token):
    """Unscoped v2 requests with a default tenant should "auto-scope."

    The implied scope is the user's tenant ID.

    Sends a request carrying ``token`` through the middleware and checks
    that the response status is 200, that the body is ``['SUCCESS']`` and
    that ``keystone.token_info`` was added to the WSGI environment.

    :param token: token ID to send in the ``X-Auth-Token`` header.

    """
    req = webob.Request.blank('/')
    req.headers['X-Auth-Token'] = token
    body = self.middleware(req.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 200)
    self.assertEqual(body, ['SUCCESS'])
    # assertIn names the missing key on failure, whereas
    # assertTrue(key in mapping) only reports that False is not true.
    self.assertIn('keystone.token_info', req.environ)
1037
def assert_valid_last_url(self, token_id):
    """Check that the last request hit the v2 validation URL for token_id."""
    expected_path = "/testadmin/v2.0/tokens/%s" % token_id
    self.assertLastPath(expected_path)
1040
def test_default_tenant_uuid_token(self):
    """Run the default-tenant auto-scope check with the default UUID token."""
    token = client_fixtures.UUID_TOKEN_DEFAULT
    self.assert_unscoped_default_tenant_auto_scopes(token)
1044
def test_default_tenant_signed_token(self):
    """Run the default-tenant auto-scope check with the signed scoped token."""
    token = client_fixtures.SIGNED_TOKEN_SCOPED
    self.assert_unscoped_default_tenant_auto_scopes(token)
1048
def assert_unscoped_token_receives_401(self, token):
    """Unscoped requests with no default tenant ID should be rejected.

    Sends ``token`` through the middleware and checks for a 401 status
    together with the expected ``WWW-Authenticate`` challenge header.
    """
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = token
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
1057
def test_unscoped_uuid_token_receives_401(self):
    """Run the 401 check with the unscoped UUID token fixture."""
    token = client_fixtures.UUID_TOKEN_UNSCOPED
    self.assert_unscoped_token_receives_401(token)
1061
def test_unscoped_pki_token_receives_401(self):
    """Run the 401 check with the unscoped signed token fixture."""
    token = client_fixtures.SIGNED_TOKEN_UNSCOPED
    self.assert_unscoped_token_receives_401(token)
1065
def test_request_prevent_service_catalog_injection(self):
    """A caller-supplied X-Service-Catalog header must not be passed on.

    The request uses a token without a service catalog, so after the
    middleware has run the header is expected to be absent or empty.
    """
    token = client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG
    request = webob.Request.blank('/')
    request.headers['X-Service-Catalog'] = '[]'
    request.headers['X-Auth-Token'] = token
    body = self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 200)
    self.assertFalse(request.headers.get('X-Service-Catalog'))
    self.assertEqual(body, ['SUCCESS'])
1076
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
1079
def test_valid_uuid_request_forced_to_2_0(self):
1080
"""Test forcing auth_token to use lower api version.
1082
By installing the v3 http handler, auth_token will get
1083
a version list that looks like a v3 server - from which it
1084
would normally choose v3.0 as the auth version. However, here
1085
we specify v2.0 in the configuration - which should force
1086
auth_token to use that version instead.
1090
'signing_dir': client_fixtures.CERTDIR,
1091
'auth_version': 'v2.0'
1094
httpretty.register_uri(httpretty.GET,
1096
body=VERSION_LIST_v3,
1099
httpretty.register_uri(httpretty.POST,
1100
"%s/v2.0/tokens" % BASE_URI,
1101
body=FAKE_ADMIN_TOKEN)
1103
token = client_fixtures.UUID_TOKEN_DEFAULT
1104
httpretty.register_uri(httpretty.GET,
1105
"%s/v2.0/tokens/%s" % (BASE_URI, token),
1107
client_fixtures.JSON_TOKEN_RESPONSES[token])
1109
self.set_middleware(conf=conf)
1111
# This test will only work if auth_token has chosen to use the
1112
# lower, v2, api version
1113
req = webob.Request.blank('/')
1114
req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT
1115
self.middleware(req.environ, self.start_fake_response)
1116
self.assertEqual(self.response_status, 200)
1117
self.assertEqual("/testadmin/v2.0/tokens/%s" %
1118
client_fixtures.UUID_TOKEN_DEFAULT,
1119
httpretty.httpretty.last_request.path)
1122
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
1123
CommonAuthTokenMiddlewareTest):
1124
"""Test auth_token middleware with v3 tokens.
1126
Re-execute the AuthTokenMiddlewareTest class tests, but with the
1127
auth_token middleware configured to expect v3 tokens back from
1130
This is done by configuring the AuthTokenMiddlewareTest class via
1131
its setUp(), passing in v3 style data that will then be used by
1132
the tests themselves. This approach has been used to ensure we
1133
really are running the same tests for both v2 and v3 tokens.
1135
There are a few additional tests specific to v3 only:
1137
- We allow an unscoped token to be validated (as unscoped), where
1138
as for v2 tokens, the auth_token middleware is expected to try and
1139
auto-scope it (and fail if there is no default tenant)
1140
- Domain scoped tokens
1142
Since we don't specify an auth version for auth_token to use, by
1143
definition we are therefore implicitly testing that it will use
1144
the highest available auth version, i.e. v3.0
1148
super(v3AuthTokenMiddlewareTest, self).setUp(
1149
auth_version='v3.0',
1153
'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT,
1154
'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1155
'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED,
1156
'signed_token_scoped_expired':
1157
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
1158
'revoked_token': client_fixtures.REVOKED_v3_TOKEN,
1159
'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH
1162
httpretty.httpretty.reset()
1165
httpretty.register_uri(httpretty.GET,
1167
body=VERSION_LIST_v3,
1170
# TODO(jamielennox): auth_token middleware uses a v2 admin token
1171
# regardless of the auth_version that is set.
1172
httpretty.register_uri(httpretty.POST,
1173
"%s/v2.0/tokens" % BASE_URI,
1174
body=FAKE_ADMIN_TOKEN)
1176
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
1177
httpretty.register_uri(httpretty.GET,
1178
"%s/v2.0/tokens/revoked" % BASE_URI,
1179
body=client_fixtures.SIGNED_REVOCATION_LIST,
1182
httpretty.register_uri(httpretty.GET,
1183
"%s/v3/auth/tokens" % BASE_URI,
1184
body=self.token_response)
1186
self.set_middleware()
1190
super(v3AuthTokenMiddlewareTest, self).tearDown()
1192
def token_response(self, request, uri, headers):
1193
auth_id = request.headers.get('X-Auth-Token')
1194
token_id = request.headers.get('X-Subject-Token')
1195
self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
1196
headers.pop('status')
1201
if token_id == ERROR_TOKEN:
1202
raise auth_token.NetworkError("Network connection error.")
1205
response = client_fixtures.JSON_TOKEN_RESPONSES[token_id]
1209
return status, headers, response
1211
def assert_valid_last_url(self, token_id):
    """Check that the last request hit the v3 token validation URL.

    ``token_id`` is not used: the expected path contains no token ID.
    """
    expected_path = '/testadmin/v3/auth/tokens'
    self.assertLastPath(expected_path)
1214
def test_valid_unscoped_uuid_request(self):
1215
# Remove items that won't be in an unscoped token
1216
delta_expected_env = {
1217
'HTTP_X_PROJECT_ID': None,
1218
'HTTP_X_PROJECT_NAME': None,
1219
'HTTP_X_PROJECT_DOMAIN_ID': None,
1220
'HTTP_X_PROJECT_DOMAIN_NAME': None,
1221
'HTTP_X_TENANT_ID': None,
1222
'HTTP_X_TENANT_NAME': None,
1224
'HTTP_X_TENANT': None,
1227
self.set_middleware(expected_env=delta_expected_env)
1228
self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1230
self.assertLastPath('/testadmin/v3/auth/tokens')
1232
def test_domain_scoped_uuid_request(self):
    """Validate a domain-scoped v3 UUID token against the v3 endpoint."""
    # Differences from the default token's expected environment: the
    # project and tenant headers are overridden with None and the domain
    # headers are set.
    delta_expected_env = dict.fromkeys([
        'HTTP_X_PROJECT_ID',
        'HTTP_X_PROJECT_NAME',
        'HTTP_X_PROJECT_DOMAIN_ID',
        'HTTP_X_PROJECT_DOMAIN_NAME',
        'HTTP_X_TENANT_ID',
        'HTTP_X_TENANT_NAME',
        'HTTP_X_TENANT',
    ])
    delta_expected_env['HTTP_X_DOMAIN_ID'] = 'domain_id1'
    delta_expected_env['HTTP_X_DOMAIN_NAME'] = 'domain_name1'

    self.set_middleware(expected_env=delta_expected_env)
    token = client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED
    self.assert_valid_request_200(token)
    self.assertLastPath('/testadmin/v3/auth/tokens')
1251
class TokenEncodingTest(testtools.TestCase):
    """Checks for ``auth_token.safe_quote``."""

    def test_unquoted_token(self):
        """A token containing a raw space comes back percent-encoded."""
        quoted = auth_token.safe_quote('foo bar')
        self.assertEqual('foo%20bar', quoted)

    def test_quoted_token(self):
        """A token that is already percent-encoded comes back unchanged."""
        quoted = auth_token.safe_quote('foo%20bar')
        self.assertEqual('foo%20bar', quoted)
1259
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
1261
super(TokenExpirationTest, self).setUp()
1262
timeutils.set_time_override()
1263
self.now = timeutils.utcnow()
1264
self.delta = datetime.timedelta(hours=1)
1265
self.one_hour_ago = timeutils.isotime(self.now - self.delta,
1267
self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
1271
super(TokenExpirationTest, self).tearDown()
1272
timeutils.clear_time_override()
1274
def create_v2_token_fixture(self, expires=None):
1279
'expires': expires or self.one_hour_earlier,
1282
'name': 'tenant_name1',
1287
'name': 'user_name1',
1293
'serviceCatalog': {}
1299
def create_v3_token_fixture(self, expires=None):
1303
'expires_at': expires or self.one_hour_earlier,
1306
'name': 'user_name1',
1309
'name': 'domain_name1'
1314
'name': 'tenant_name1',
1317
'name': 'domain_name1'
1321
{'name': 'role1', 'id': 'Role1'},
1322
{'name': 'role2', 'id': 'Role2'},
1330
def test_no_data(self):
1332
self.assertRaises(auth_token.InvalidUserToken,
1333
auth_token.confirm_token_not_expired,
1336
def test_bad_data(self):
    """A dict without a recognisable token structure is rejected."""
    bogus = {'my_happy_token_dict': 'woo'}
    self.assertRaises(auth_token.InvalidUserToken,
                      auth_token.confirm_token_not_expired,
                      bogus)
1342
def test_v2_token_not_expired(self):
    """An unexpired v2 token yields the expiry stored in the token."""
    token_data = self.create_v2_token_fixture()
    want = token_data['access']['token']['expires']
    got = auth_token.confirm_token_not_expired(token_data)
    self.assertEqual(got, want)
1348
def test_v2_token_expired(self):
    """A v2 token whose expiry is one hour in the past is rejected."""
    stale = self.create_v2_token_fixture(expires=self.one_hour_ago)
    self.assertRaises(auth_token.InvalidUserToken,
                      auth_token.confirm_token_not_expired,
                      stale)
1354
def test_v2_token_with_timezone_offset_not_expired(self):
    """A v2 expiry carrying a -05:00 offset is reported as a UTC string."""
    # Freeze the clock shortly after midnight UTC on 2000-01-01.
    frozen_now = timeutils.normalize_time(
        timeutils.parse_isotime('2000-01-01T00:01:10.000123Z'))
    timeutils.set_time_override(frozen_now)
    token_data = self.create_v2_token_fixture(
        expires='2000-01-01T00:05:10.000123-05:00')
    got = auth_token.confirm_token_not_expired(token_data)
    self.assertEqual(got, '2000-01-01T05:05:10.000123Z')
1364
def test_v2_token_with_timezone_offset_expired(self):
    """A v2 token whose +05:00 expiry is already past in UTC is rejected.

    The clock is frozen at 2000-01-01T00:01:10.000123Z. The expiry
    2000-01-01T00:05:10.000123+05:00 corresponds to
    1999-12-31T19:05:10.000123Z, which lies before the frozen time.
    """
    current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
    current_time = timeutils.normalize_time(current_time)
    timeutils.set_time_override(current_time)
    # The expiry is supplied through the fixture's ``expires`` argument
    # only, as in the v3 variant of this test; writing the same value into
    # data['access']['token']['expires'] afterwards was redundant.
    data = self.create_v2_token_fixture(
        expires='2000-01-01T00:05:10.000123+05:00')
    self.assertRaises(auth_token.InvalidUserToken,
                      auth_token.confirm_token_not_expired,
                      data)
1375
def test_v3_token_not_expired(self):
    """An unexpired v3 token yields the expiry stored in the token."""
    token_data = self.create_v3_token_fixture()
    want = token_data['token']['expires_at']
    got = auth_token.confirm_token_not_expired(token_data)
    self.assertEqual(got, want)
1381
def test_v3_token_expired(self):
    """A v3 token whose expiry is one hour in the past is rejected."""
    stale = self.create_v3_token_fixture(expires=self.one_hour_ago)
    self.assertRaises(auth_token.InvalidUserToken,
                      auth_token.confirm_token_not_expired,
                      stale)
1387
def test_v3_token_with_timezone_offset_not_expired(self):
    """A v3 expiry carrying a -05:00 offset is reported as a UTC string."""
    # Freeze the clock shortly after midnight UTC on 2000-01-01.
    frozen_now = timeutils.normalize_time(
        timeutils.parse_isotime('2000-01-01T00:01:10.000123Z'))
    timeutils.set_time_override(frozen_now)
    token_data = self.create_v3_token_fixture(
        expires='2000-01-01T00:05:10.000123-05:00')

    got = auth_token.confirm_token_not_expired(token_data)
    self.assertEqual(got, '2000-01-01T05:05:10.000123Z')
1398
def test_v3_token_with_timezone_offset_expired(self):
    """A v3 token whose +05:00 expiry is already past in UTC is rejected."""
    # Freeze the clock shortly after midnight UTC on 2000-01-01.
    frozen_now = timeutils.normalize_time(
        timeutils.parse_isotime('2000-01-01T00:01:10.000123Z'))
    timeutils.set_time_override(frozen_now)
    stale = self.create_v3_token_fixture(
        expires='2000-01-01T00:05:10.000123+05:00')
    self.assertRaises(auth_token.InvalidUserToken,
                      auth_token.confirm_token_not_expired,
                      stale)
1408
def test_cached_token_not_expired(self):
1411
self.set_middleware()
1412
self.middleware._init_cache({})
1413
some_time_later = timeutils.strtime(at=(self.now + self.delta))
1414
expires = some_time_later
1415
self.middleware._cache_put(token, data, expires)
1416
self.assertEqual(self.middleware._cache_get(token), data)
1418
def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
1419
"""Ensure we cannot retrieve a token from the cache.
1421
Getting a token from the cache should return None when the token data
1422
in the cache stores the expires time as a *nix style timestamp.
1427
self.set_middleware()
1428
self.middleware._init_cache({})
1429
some_time_later = self.now + self.delta
1430
# Store a unix timestamp in the cache.
1431
expires = calendar.timegm(some_time_later.timetuple())
1432
self.middleware._cache_put(token, data, expires)
1433
self.assertIsNone(self.middleware._cache_get(token))
1435
def test_cached_token_expired(self):
1438
self.set_middleware()
1439
self.middleware._init_cache({})
1440
some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
1441
expires = some_time_earlier
1442
self.middleware._cache_put(token, data, expires)
1443
self.assertIsNone(self.middleware._cache_get(token))
1445
def test_cached_token_with_timezone_offset_not_expired(self):
1448
self.set_middleware()
1449
self.middleware._init_cache({})
1450
timezone_offset = datetime.timedelta(hours=2)
1451
some_time_later = self.now - timezone_offset + self.delta
1452
expires = timeutils.strtime(some_time_later) + '-02:00'
1453
self.middleware._cache_put(token, data, expires)
1454
self.assertEqual(self.middleware._cache_get(token), data)
1456
def test_cached_token_with_timezone_offset_expired(self):
1459
self.set_middleware()
1460
self.middleware._init_cache({})
1461
timezone_offset = datetime.timedelta(hours=2)
1462
some_time_earlier = self.now - timezone_offset - self.delta
1463
expires = timeutils.strtime(some_time_earlier) + '-02:00'
1464
self.middleware._cache_put(token, data, expires)
1465
self.assertIsNone(self.middleware._cache_get(token))