~ubuntu-cloud-archive/ubuntu/precise/keystone/trunk

« back to all changes in this revision

Viewing changes to tests/test_auth_token_middleware.py

  • Committer: Chuck Short
  • Date: 2012-11-26 19:24:17 UTC
  • mfrom: (34.1.3 raring-proposed)
  • Revision ID: zulcss@ubuntu.com-20121126192417-zqx5ntbbmcduz3wb
* New upstream release for the Ubuntu Cloud Archive.
* debian/tests/test_overrides.conf: Update for Grizzly test suite.
* debian/control: Drop python-nova.
* New upstream release.
* debian/rules: FTBFS if there is a missing binary.
* debian/rules: Temporarily pass the tests since you need to run
  keystone in order to run the tests.
* debian/patches/*: Refreshed.
* New upstream release.
* debian/control: Ensure keystoneclient is upgraded with keystone,
  require python-keystoneclient >= 1:0.1.3. (LP: #1073273)

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
from keystone import test
31
31
 
32
32
 
 
33
CERTDIR = test.rootdir("examples/pki/certs")
 
34
KEYDIR = test.rootdir("examples/pki/private")
 
35
CMSDIR = test.rootdir("examples/pki/cms")
 
36
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
 
37
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
 
38
CA = os.path.join(CERTDIR, 'ca.pem')
 
39
 
33
40
REVOCATION_LIST = None
34
41
REVOKED_TOKEN = None
35
42
REVOKED_TOKEN_HASH = None
36
43
SIGNED_REVOCATION_LIST = None
37
44
SIGNED_TOKEN_SCOPED = None
38
45
SIGNED_TOKEN_UNSCOPED = None
 
46
SIGNED_TOKEN_SCOPED_KEY = None
 
47
SIGNED_TOKEN_UNSCOPED_KEY = None
 
48
 
39
49
VALID_SIGNED_REVOCATION_LIST = None
40
50
 
41
51
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
69
79
        'access': {
70
80
            'token': {
71
81
                'id': UUID_TOKEN_DEFAULT,
 
82
                'expires': '2999-01-01T00:00:10Z',
72
83
                'tenant': {
73
84
                    'id': 'tenant_id1',
74
85
                    'name': 'tenant_name1',
89
100
        'access': {
90
101
            'token': {
91
102
                'id': VALID_DIABLO_TOKEN,
 
103
                'expires': '2999-01-01T00:00:10',
92
104
                'tenantId': 'tenant_id1',
93
105
            },
94
106
            'user': {
105
117
        'access': {
106
118
            'token': {
107
119
                'id': UUID_TOKEN_UNSCOPED,
 
120
                'expires': '2999-01-01T00:00:10Z',
108
121
            },
109
122
            'user': {
110
123
                'id': 'user_id1',
120
133
        'access': {
121
134
            'token': {
122
135
                'id': 'valid-token',
 
136
                'expires': '2999-01-01T00:00:10Z',
123
137
                'tenant': {
124
138
                    'id': 'tenant_id1',
125
139
                    'name': 'tenant_name1',
137
151
    },
138
152
}
139
153
 
 
154
FAKE_RESPONSE_STACK = []
 
155
 
140
156
 
141
157
# The data for these tests are signed using openssl and are stored in files
142
158
# in the signing subdirectory.  In order to keep the values consistent between
143
159
# the tests and the signed documents, we read them in for use in the tests.
144
160
def setUpModule(self):
145
 
    signing_path = os.path.join(os.path.dirname(__file__), 'signing')
 
161
    signing_path = CMSDIR
146
162
    with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
147
163
        self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
148
164
    with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
155
171
    with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
156
172
        self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
157
173
            {'signed': f.read()})
 
174
    self.SIGNED_TOKEN_SCOPED_KEY =\
 
175
        cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
 
176
    self.SIGNED_TOKEN_UNSCOPED_KEY =\
 
177
        cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
158
178
 
159
 
    self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED] = {
 
179
    self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
160
180
        'access': {
161
181
            'token': {
162
 
                'id': self.SIGNED_TOKEN_SCOPED,
 
182
                'id': self.SIGNED_TOKEN_SCOPED_KEY,
163
183
            },
164
184
            'user': {
165
185
                'id': 'user_id1',
174
194
        },
175
195
    }
176
196
 
177
 
    self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED] = {
 
197
    self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
178
198
        'access': {
179
199
            'token': {
180
 
                'id': self.SIGNED_TOKEN_UNSCOPED,
 
200
                'id': SIGNED_TOKEN_UNSCOPED_KEY,
181
201
            },
182
202
            'user': {
183
203
                'id': 'user_id1',
198
218
        self.token_expiration = None
199
219
 
200
220
    def get(self, key):
201
 
        data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED].copy()
 
221
        data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
202
222
        if not data or key != "tokens/%s" % (data['access']['token']['id']):
203
223
            return
204
224
        if not self.token_expiration:
223
243
        return self.body
224
244
 
225
245
 
 
246
class FakeStackHTTPConnection(object):
 
247
 
 
248
    def __init__(self, *args, **kwargs):
 
249
        pass
 
250
 
 
251
    def getresponse(self):
 
252
        if len(FAKE_RESPONSE_STACK):
 
253
            return FAKE_RESPONSE_STACK.pop()
 
254
        return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
 
255
 
 
256
    def request(self, *_args, **_kwargs):
 
257
        pass
 
258
 
 
259
    def close(self):
 
260
        pass
 
261
 
 
262
 
226
263
class FakeHTTPConnection(object):
227
264
 
228
265
    last_requested_url = ''
307
344
            'auth_host': 'keystone.example.com',
308
345
            'auth_port': 1234,
309
346
            'auth_admin_prefix': '/testadmin',
310
 
            'signing_dir': 'signing',
 
347
            'signing_dir': CERTDIR,
311
348
        }
312
349
 
313
350
        self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
317
354
        self.response_status = None
318
355
        self.response_headers = None
319
356
        self.middleware.revoked_file_name = tempfile.mkstemp()[1]
320
 
        self.middleware.token_revocation_list_cache_timeout =\
321
 
            datetime.timedelta(days=1)
 
357
        cache_timeout = datetime.timedelta(days=1)
 
358
        self.middleware.token_revocation_list_cache_timeout = cache_timeout
322
359
        self.middleware.token_revocation_list = jsonutils.dumps(
323
360
            {"revoked": [], "extra": "success"})
324
361
 
325
 
        globals()['SIGNED_REVOCATION_LIST'] =\
326
 
            globals()['VALID_SIGNED_REVOCATION_LIST']
 
362
        signed_list = 'SIGNED_REVOCATION_LIST'
 
363
        valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
 
364
        globals()[signed_list] = globals()[valid_signed_list]
327
365
 
328
366
        super(BaseAuthTokenMiddlewareTest, self).setUp()
329
367
 
339
377
        self.response_headers = dict(headers)
340
378
 
341
379
 
 
380
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
 
381
    """Auth Token middleware test setup that allows the tests to define
 
382
    a stack of responses to HTTP requests in the test and get those
 
383
    responses back in sequence for testing.
 
384
 
 
385
    Example::
 
386
 
 
387
        resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
 
388
        resp2 = FakeHTTPResponse(200, jsonutils.dumps({
 
389
            'access': {
 
390
                'token': {'id': 'admin_token2'},
 
391
            },
 
392
            })
 
393
        FAKE_RESPONSE_STACK.append(resp1)
 
394
        FAKE_RESPONSE_STACK.append(resp2)
 
395
 
 
396
        ... do your testing code here ...
 
397
 
 
398
    """
 
399
 
 
400
    def setUp(self, expected_env=None):
 
401
        super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
 
402
        self.middleware.http_client_class = FakeStackHTTPConnection
 
403
 
 
404
    def test_fetch_revocation_list_with_expire(self):
 
405
        # first response to revocation list should return 401 Unauthorized
 
406
        # to pretend to be an expired token
 
407
        resp1 = FakeHTTPResponse(200, jsonutils.dumps({
 
408
            'access': {
 
409
                'token': {'id': 'admin_token2'},
 
410
            },
 
411
        }))
 
412
        resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
 
413
        resp3 = FakeHTTPResponse(200, jsonutils.dumps({
 
414
            'access': {
 
415
                'token': {'id': 'admin_token2'},
 
416
            },
 
417
        }))
 
418
        resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
 
419
 
 
420
        # first get_admin_token() call
 
421
        FAKE_RESPONSE_STACK.append(resp1)
 
422
        # request revocation list, get "unauthorized" due to simulated expired
 
423
        # token
 
424
        FAKE_RESPONSE_STACK.append(resp2)
 
425
        # request a new admin_token
 
426
        FAKE_RESPONSE_STACK.append(resp3)
 
427
        # request revocation list, get the revocation list properly
 
428
        FAKE_RESPONSE_STACK.append(resp4)
 
429
 
 
430
        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
 
431
        self.assertEqual(fetched_list, REVOCATION_LIST)
 
432
 
 
433
 
342
434
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
343
435
    """Auth Token middleware should understand Diablo keystone responses."""
344
436
    def setUp(self):
564
656
        self.assertEqual(self.response_status, 200)
565
657
        self.assertFalse(req.headers.get('X-Service-Catalog'))
566
658
        self.assertEqual(body, ['SUCCESS'])
 
659
 
 
660
    def test_will_expire_soon(self):
 
661
        tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
 
662
            seconds=10)
 
663
        self.assertTrue(auth_token.will_expire_soon(tenseconds))
 
664
        fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
 
665
            seconds=40)
 
666
        self.assertFalse(auth_token.will_expire_soon(fortyseconds))