30
30
from keystone import test
33
CERTDIR = test.rootdir("examples/pki/certs")
34
KEYDIR = test.rootdir("examples/pki/private")
35
CMSDIR = test.rootdir("examples/pki/cms")
36
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
37
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
38
CA = os.path.join(CERTDIR, 'ca.pem')
33
40
REVOCATION_LIST = None
34
41
REVOKED_TOKEN = None
35
42
REVOKED_TOKEN_HASH = None
36
43
SIGNED_REVOCATION_LIST = None
37
44
SIGNED_TOKEN_SCOPED = None
38
45
SIGNED_TOKEN_UNSCOPED = None
46
SIGNED_TOKEN_SCOPED_KEY = None
47
SIGNED_TOKEN_UNSCOPED_KEY = None
39
49
VALID_SIGNED_REVOCATION_LIST = None
41
51
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
154
FAKE_RESPONSE_STACK = []
141
157
# The data for these tests are signed using openssl and are stored in files
142
158
# in the signing subdirectory. In order to keep the values consistent between
143
159
# the tests and the signed documents, we read them in for use in the tests.
144
160
def setUpModule(self):
145
signing_path = os.path.join(os.path.dirname(__file__), 'signing')
161
signing_path = CMSDIR
146
162
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
147
163
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
148
164
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
155
171
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
156
172
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
157
173
{'signed': f.read()})
174
self.SIGNED_TOKEN_SCOPED_KEY =\
175
cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
176
self.SIGNED_TOKEN_UNSCOPED_KEY =\
177
cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
159
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED] = {
179
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
162
'id': self.SIGNED_TOKEN_SCOPED,
182
'id': self.SIGNED_TOKEN_SCOPED_KEY,
165
185
'id': 'user_id1',
198
218
self.token_expiration = None
200
220
def get(self, key):
201
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED].copy()
221
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
202
222
if not data or key != "tokens/%s" % (data['access']['token']['id']):
204
224
if not self.token_expiration:
246
class FakeStackHTTPConnection(object):
248
def __init__(self, *args, **kwargs):
251
def getresponse(self):
252
if len(FAKE_RESPONSE_STACK):
253
return FAKE_RESPONSE_STACK.pop()
254
return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
256
def request(self, *_args, **_kwargs):
226
263
class FakeHTTPConnection(object):
228
265
last_requested_url = ''
307
344
'auth_host': 'keystone.example.com',
308
345
'auth_port': 1234,
309
346
'auth_admin_prefix': '/testadmin',
310
'signing_dir': 'signing',
347
'signing_dir': CERTDIR,
313
350
self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
317
354
self.response_status = None
318
355
self.response_headers = None
319
356
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
320
self.middleware.token_revocation_list_cache_timeout =\
321
datetime.timedelta(days=1)
357
cache_timeout = datetime.timedelta(days=1)
358
self.middleware.token_revocation_list_cache_timeout = cache_timeout
322
359
self.middleware.token_revocation_list = jsonutils.dumps(
323
360
{"revoked": [], "extra": "success"})
325
globals()['SIGNED_REVOCATION_LIST'] =\
326
globals()['VALID_SIGNED_REVOCATION_LIST']
362
signed_list = 'SIGNED_REVOCATION_LIST'
363
valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
364
globals()[signed_list] = globals()[valid_signed_list]
328
366
super(BaseAuthTokenMiddlewareTest, self).setUp()
339
377
self.response_headers = dict(headers)
380
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
381
"""Auth Token middleware test setup that allows the tests to define
382
a stack of responses to HTTP requests in the test and get those
383
responses back in sequence for testing.
387
resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
388
resp2 = FakeHTTPResponse(200, jsonutils.dumps({
390
'token': {'id': 'admin_token2'},
393
FAKE_RESPONSE_STACK.append(resp1)
394
FAKE_RESPONSE_STACK.append(resp2)
396
... do your testing code here ...
400
def setUp(self, expected_env=None):
401
super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
402
self.middleware.http_client_class = FakeStackHTTPConnection
404
def test_fetch_revocation_list_with_expire(self):
405
# first response to revocation list should return 401 Unauthorized
406
# to pretend to be an expired token
407
resp1 = FakeHTTPResponse(200, jsonutils.dumps({
409
'token': {'id': 'admin_token2'},
412
resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
413
resp3 = FakeHTTPResponse(200, jsonutils.dumps({
415
'token': {'id': 'admin_token2'},
418
resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
420
# first get_admin_token() call
421
FAKE_RESPONSE_STACK.append(resp1)
422
# request revocation list, get "unauthorized" due to simulated expired
424
FAKE_RESPONSE_STACK.append(resp2)
425
# request a new admin_token
426
FAKE_RESPONSE_STACK.append(resp3)
427
# request revocation list, get the revocation list properly
428
FAKE_RESPONSE_STACK.append(resp4)
430
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
431
self.assertEqual(fetched_list, REVOCATION_LIST)
342
434
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
343
435
"""Auth Token middleware should understand Diablo keystone responses."""
564
656
self.assertEqual(self.response_status, 200)
565
657
self.assertFalse(req.headers.get('X-Service-Catalog'))
566
658
self.assertEqual(body, ['SUCCESS'])
660
def test_will_expire_soon(self):
661
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
663
self.assertTrue(auth_token.will_expire_soon(tenseconds))
664
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
666
self.assertFalse(auth_token.will_expire_soon(fortyseconds))