~ubuntu-branches/ubuntu/quantal/keystone/quantal-security

« back to all changes in this revision

Viewing changes to .pc/CVE-2013-2104.patch/tests/test_auth_token_middleware.py

  • Committer: Package Import Robot
  • Author(s): Jamie Strandboge
  • Date: 2013-06-13 13:42:44 UTC
  • mfrom: (35.1.6 quantal-proposed)
  • Revision ID: package-import@ubuntu.com-20130613134244-mle4ueu39urzeknr
Tags: 2012.2.4-0ubuntu3.1
* SECURITY UPDATE: fix auth_token middleware neglecting to check the expiry
  of signed tokens when using PKI
  - debian/patches/CVE-2013-2104.patch: explicitly check the expiry on the
    tokens, and reject tokens that have expired. Also update test data
  - CVE-2013-2104
  - LP: #1179615
* debian/patches/fix-testsuite-for-2038-problem.patch: Adjust json example
  cert data to use 2037 instead of 2112 and regenerate the certs. Also
  adjust token expiry data to use 2037 instead of 2999.
* SECURITY UPDATE: fix authentication bypass when using LDAP backend
  - debian/patches/CVE-2013-2157.patch: identity/backends/ldap/core.py is
    adjusted to raise an assertion for invalid password when using LDAP and
    an empty password is submitted
  - CVE-2013-2157
  - LP: #1187305

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2012 OpenStack LLC
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#      http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
import datetime
 
18
import iso8601
 
19
import os
 
20
import string
 
21
import tempfile
 
22
 
 
23
import webob
 
24
 
 
25
from keystone.common import cms
 
26
from keystone.common import utils
 
27
from keystone.middleware import auth_token
 
28
from keystone.openstack.common import jsonutils
 
29
from keystone.openstack.common import timeutils
 
30
from keystone import test
 
31
 
 
32
 
 
33
# Placeholders for the signed fixtures; setUpModule() fills these in from the
# files in the signing subdirectory.
REVOCATION_LIST = REVOKED_TOKEN = REVOKED_TOKEN_HASH = None
SIGNED_TOKEN_SCOPED = SIGNED_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = SIGNED_TOKEN_UNSCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None

# Body served by FakeHTTPConnection for the "revoked" path; reassigned by the
# test cases.
SIGNED_REVOCATION_LIST = None

# Fixed UUID-style token IDs used as keys of TOKEN_RESPONSES.
UUID_TOKEN_DEFAULT = 'ec6c0710ec2f471498484c1b53ab4f9d'
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
 
48
 
 
49
# A syntactically token-shaped blob that is not a valid signed token.  The
# literal is wrapped for readability; the newlines are stripped with the str
# method rather than the deprecated string.replace() module function.
INVALID_SIGNED_TOKEN = """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
0000000000000000000000000000000000000000000000000000000000000000
1111111111111111111111111111111111111111111111111111111111111111
2222222222222222222222222222222222222222222222222222222222222222
3333333333333333333333333333333333333333333333333333333333333333
4444444444444444444444444444444444444444444444444444444444444444
5555555555555555555555555555555555555555555555555555555555555555
6666666666666666666666666666666666666666666666666666666666666666
7777777777777777777777777777777777777777777777777777777777777777
8888888888888888888888888888888888888888888888888888888888888888
9999999999999999999999999999999999999999999999999999999999999999
0000000000000000000000000000000000000000000000000000000000000000
xg==""".replace("\n", "")
 
68
 
 
69
# Canned token-validation payloads, keyed by token ID.  setUpModule() adds
# further entries for the signed tokens.
TOKEN_RESPONSES = {
    UUID_TOKEN_DEFAULT: {
        'access': {
            'token': {
                'id': UUID_TOKEN_DEFAULT,
                'expires': '2999-01-01T00:00:10Z',
                'tenant': {'id': 'tenant_id1', 'name': 'tenant_name1'},
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [{'name': 'role1'}, {'name': 'role2'}],
            },
            'serviceCatalog': {},
        },
    },
    VALID_DIABLO_TOKEN: {
        'access': {
            'token': {
                'id': VALID_DIABLO_TOKEN,
                'expires': '2999-01-01T00:00:10',
                'tenantId': 'tenant_id1',
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [{'name': 'role1'}, {'name': 'role2'}],
            },
        },
    },
    UUID_TOKEN_UNSCOPED: {
        'access': {
            'token': {
                'id': UUID_TOKEN_UNSCOPED,
                'expires': '2999-01-01T00:00:10Z',
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [{'name': 'role1'}, {'name': 'role2'}],
            },
        },
    },
    UUID_TOKEN_NO_SERVICE_CATALOG: {
        'access': {
            'token': {
                'id': 'valid-token',
                'expires': '2999-01-01T00:00:10Z',
                'tenant': {'id': 'tenant_id1', 'name': 'tenant_name1'},
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [{'name': 'role1'}, {'name': 'role2'}],
            },
        },
    },
}

# Responses handed out (last appended first) by
# FakeStackHTTPConnection.getresponse().
FAKE_RESPONSE_STACK = list()
 
148
 
 
149
 
 
150
# The data for these tests are signed using openssl and are stored in files
 
151
# in the signing subdirectory.  In order to keep the values consistent between
 
152
# the tests and the signed documents, we read them in for use in the tests.
 
153
def setUpModule(self):
    """Load the openssl-signed fixtures and register their token responses.

    Reads the PEM/JSON files from the ``signing`` directory next to this
    file, stores the derived values as attributes on ``self`` and adds one
    ``TOKEN_RESPONSES`` entry for each of the scoped and unscoped signed
    tokens.

    :param self: object that receives the fixture attributes.  The tests read
                 these names back as module globals, so this is presumably
                 the test module itself -- TODO confirm against the runner.
    """
    signing_path = os.path.join(os.path.dirname(__file__), 'signing')

    def read_fixture(filename):
        # Return the full text of one file from the signing directory.
        with open(os.path.join(signing_path, filename)) as f:
            return f.read()

    self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(
        read_fixture('auth_token_scoped.pem'))
    self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(
        read_fixture('auth_token_unscoped.pem'))
    self.REVOKED_TOKEN = cms.cms_to_token(
        read_fixture('auth_token_revoked.pem'))
    self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
    self.REVOCATION_LIST = jsonutils.loads(
        read_fixture('revocation_list.json'))
    self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
        {'signed': read_fixture('revocation_list.pem')})
    self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(
        self.SIGNED_TOKEN_SCOPED)
    self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(
        self.SIGNED_TOKEN_UNSCOPED)

    self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
        'access': {
            'token': {
                'id': self.SIGNED_TOKEN_SCOPED_KEY,
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'tenantId': 'tenant_id1',
                'tenantName': 'tenant_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    }

    # No trailing comma after the closing brace: with one, the stored value
    # would be a 1-tuple wrapping the dict instead of the dict itself.
    self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_KEY] = {
        'access': {
            'token': {
                'id': self.SIGNED_TOKEN_UNSCOPED_KEY,
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    }
 
205
 
 
206
 
 
207
class FakeMemcache(object):
    """In-memory stand-in for a memcache client.

    ``get`` only ever answers for the scoped signed token; ``set`` records
    the last key/value it was given so tests can inspect them.
    """

    def __init__(self):
        self.set_key = None
        self.set_value = None
        # Epoch-seconds expiry returned alongside the cached data; tests may
        # preset it, otherwise get() fills it in lazily.
        self.token_expiration = None

    def get(self, key):
        """Return ``(token_data, expiration)`` for the scoped signed token.

        Returns ``None`` when *key* is not ``"tokens/<scoped token id>"``.
        """
        import copy
        import time

        # Deep copy: 'expires' is written into a nested dict below, and a
        # shallow copy would leak that write into the shared TOKEN_RESPONSES.
        data = copy.deepcopy(TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY])
        if not data or key != "tokens/%s" % (data['access']['token']['id']):
            return None
        if not self.token_expiration:
            dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
            # time.mktime() gives the local-time epoch portably; the "%s"
            # strftime directive is a platform extension.
            self.token_expiration = str(int(time.mktime(dt.timetuple())))
        dt = datetime.datetime.now() + datetime.timedelta(hours=24)
        ks_expires = dt.isoformat()
        data['access']['token']['expires'] = ks_expires
        return (data, str(self.token_expiration))

    def set(self, key, value, time=None):
        """Record *key* and *value*; the *time* argument is ignored."""
        self.set_value = value
        self.set_key = key
 
228
 
 
229
 
 
230
class FakeHTTPResponse(object):
    """Minimal HTTP response double exposing ``status`` and ``read()``."""

    def __init__(self, status, body):
        self.status, self.body = status, body

    def read(self):
        """Return the body given at construction time."""
        return self.body
 
237
 
 
238
 
 
239
class FakeStackHTTPConnection(object):
    """HTTP connection double that serves responses from FAKE_RESPONSE_STACK.

    Constructor arguments and requests are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        pass

    def getresponse(self):
        """Pop and return the most recently appended response.

        When the stack is empty a 500 response with the JSON body
        ``'UNEXPECTED RESPONSE'`` is returned instead.
        """
        if not FAKE_RESPONSE_STACK:
            fallback_body = jsonutils.dumps('UNEXPECTED RESPONSE')
            return FakeHTTPResponse(500, fallback_body)
        return FAKE_RESPONSE_STACK.pop()

    def request(self, *_args, **_kwargs):
        pass

    def close(self):
        pass
 
254
 
 
255
 
 
256
class FakeHTTPConnection(object):
    """HTTP connection double that answers from TOKEN_RESPONSES."""

    # Path of the most recent request made through any instance.
    last_requested_url = ''

    def __init__(self, *args):
        self.send_valid_revocation_list = True

    def request(self, method, path, **kwargs):
        """Fakes out several http responses.

        If a POST request is made, we assume the calling code is trying
        to get a new admin token.

        For any other method the last path segment is treated as a token
        ID: a key of TOKEN_RESPONSES yields a 200 with that entry as JSON,
        the literal segment "revoked" yields a 200 with the current
        SIGNED_REVOCATION_LIST, and anything else yields a 404 with an
        empty body, indicating an unknown (therefore unauthorized) token.

        The response is stored for a later ``getresponse()`` call.
        """
        FakeHTTPConnection.last_requested_url = path
        if method == 'POST':
            status = 200
            body = jsonutils.dumps({
                'access': {
                    'token': {'id': 'admin_token2'},
                },
            })
        else:
            token_id = path.rsplit('/', 1)[1]
            if token_id in TOKEN_RESPONSES:
                status = 200
                body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
            elif token_id == "revoked":
                status = 200
                body = SIGNED_REVOCATION_LIST
            else:
                status = 404
                body = ''

        self.resp = FakeHTTPResponse(status, body)

    def getresponse(self):
        """Return the response prepared by the last ``request()`` call."""
        return self.resp

    def close(self):
        pass
 
302
 
 
303
 
 
304
class FakeApp(object):
    """This represents a WSGI app protected by the auth_token middleware."""

    def __init__(self, expected_env=None):
        # Headers the app expects to find in the WSGI environ; entries from
        # expected_env override these defaults.
        defaults = {
            'HTTP_X_IDENTITY_STATUS': 'Confirmed',
            'HTTP_X_TENANT_ID': 'tenant_id1',
            'HTTP_X_TENANT_NAME': 'tenant_name1',
            'HTTP_X_USER_ID': 'user_id1',
            'HTTP_X_USER_NAME': 'user_name1',
            'HTTP_X_ROLES': 'role1,role2',
            'HTTP_X_USER': 'user_name1',  # deprecated (diablo-compat)
            'HTTP_X_TENANT': 'tenant_name1',  # deprecated (diablo-compat)
            'HTTP_X_ROLE': 'role1,role2',  # deprecated (diablo-compat)
        }
        defaults.update(expected_env or {})
        self.expected_env = defaults

    def __call__(self, env, start_response):
        """Check *env* against the expected headers, then answer SUCCESS."""
        for header, wanted in self.expected_env.items():
            actual = env[header]
            assert actual == wanted, '%s != %s' % (actual, wanted)

        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
 
328
 
 
329
 
 
330
class BaseAuthTokenMiddlewareTest(test.TestCase):
    """Shared fixture: an AuthProtocol middleware wired to fake HTTP."""

    def setUp(self, expected_env=None):
        """Build the middleware around a FakeApp.

        :param expected_env: overrides for the headers FakeApp expects.
        """
        expected_env = expected_env or {}

        conf = {
            'admin_token': 'admin_token1',
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': 'signing',
        }

        self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
        self.middleware.http_client_class = FakeHTTPConnection
        self.middleware._iso8601 = iso8601

        self.response_status = None
        self.response_headers = None

        # mkstemp() returns an open descriptor together with the path; only
        # the path is needed, so close the descriptor instead of leaking one
        # per test.
        fd, revoked_path = tempfile.mkstemp()
        os.close(fd)
        self.middleware.revoked_file_name = revoked_path

        self.middleware.token_revocation_list_cache_timeout =\
            datetime.timedelta(days=1)
        self.middleware.token_revocation_list = jsonutils.dumps(
            {"revoked": [], "extra": "success"})

        # Reset the list served for "revoked" requests; individual tests may
        # overwrite this module global.
        globals()['SIGNED_REVOCATION_LIST'] =\
            globals()['VALID_SIGNED_REVOCATION_LIST']

        super(BaseAuthTokenMiddlewareTest, self).setUp()

    def tearDown(self):
        super(BaseAuthTokenMiddlewareTest, self).tearDown()
        try:
            os.remove(self.middleware.revoked_file_name)
        except OSError:
            # Best effort: some tests remove or rename the file themselves.
            pass

    def start_fake_response(self, status, headers):
        """WSGI ``start_response`` double that records status and headers."""
        self.response_status = int(status.split(' ', 1)[0])
        self.response_headers = dict(headers)
 
370
 
 
371
 
 
372
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware test setup that allows the tests to define
    a stack of responses to HTTP requests in the test and get those
    responses back for testing.

    Responses are served by ``list.pop()``, i.e. the most recently appended
    response is returned first.

    Example::

        resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
        resp2 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
            }))
        FAKE_RESPONSE_STACK.append(resp1)
        FAKE_RESPONSE_STACK.append(resp2)

        ... do your testing code here ...

    """

    def setUp(self, expected_env=None):
        super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
        self.middleware.http_client_class = FakeStackHTTPConnection
        # Start from an empty stack so leftovers cannot leak between tests.
        del FAKE_RESPONSE_STACK[:]

    def tearDown(self):
        # Drop any responses the test did not consume.
        del FAKE_RESPONSE_STACK[:]
        super(StackResponseAuthTokenMiddlewareTest, self).tearDown()

    def test_fetch_revocation_list_with_expire(self):
        # NOTE(review): FakeStackHTTPConnection.getresponse() pops from the
        # end of the list, so resp4 is the first response served; the append
        # order below is kept as it was -- confirm the intended sequence.
        admin_token_body = jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        })
        # admin token response
        resp1 = FakeHTTPResponse(200, admin_token_body)
        # "unauthorized", simulating an expired admin token
        resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
        # replacement admin token response
        resp3 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        }))
        # the signed revocation list itself
        resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)

        for response in (resp1, resp2, resp3, resp4):
            FAKE_RESPONSE_STACK.append(response)

        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
        self.assertEqual(fetched_list, REVOCATION_LIST)
 
424
 
 
425
 
 
426
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware should understand Diablo keystone responses."""

    def setUp(self):
        # pre-diablo only had Tenant ID, which was also the Name; the last
        # header is the now deprecated (diablo-compat) one.
        tenant_headers = (
            'HTTP_X_TENANT_ID',
            'HTTP_X_TENANT_NAME',
            'HTTP_X_TENANT',
        )
        diablo_env = dict.fromkeys(tenant_headers, 'tenant_id1')
        super(DiabloAuthTokenMiddlewareTest, self).setUp(diablo_env)

    def test_valid_diablo_response(self):
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
        self.middleware(request.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
 
443
 
 
444
 
 
445
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Request-level and revocation-list tests for the middleware."""

    # Challenge header expected on every 401 checked below.
    WWW_AUTHENTICATE = "Keystone uri='https://keystone.example.com:1234'"

    def _call_middleware(self, headers=None):
        """Send a blank '/' request carrying *headers* through the middleware.

        Returns ``(request, body)`` so callers can inspect both.
        """
        req = webob.Request.blank('/')
        for name, value in (headers or {}).items():
            req.headers[name] = value
        body = self.middleware(req.environ, self.start_fake_response)
        return req, body

    def _assert_unauthorized_with_challenge(self):
        """Assert the recorded response is a 401 with the Keystone challenge."""
        self.assertEqual(self.response_status, 401)
        self.assertEqual(self.response_headers['WWW-Authenticate'],
                         self.WWW_AUTHENTICATE)

    def assert_valid_request_200(self, token):
        req, body = self._call_middleware({'X-Auth-Token': token})
        self.assertEqual(self.response_status, 200)
        self.assertTrue(req.headers.get('X-Service-Catalog'))
        self.assertEqual(body, ['SUCCESS'])

    def test_valid_uuid_request(self):
        self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
        self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
                         FakeHTTPConnection.last_requested_url)

    def test_valid_signed_request(self):
        FakeHTTPConnection.last_requested_url = ''
        self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
        self.assertEqual(self.middleware.conf['auth_admin_prefix'],
                         "/testadmin")
        # ensure that signed requests do not generate HTTP traffic
        self.assertEqual('', FakeHTTPConnection.last_requested_url)

    def assert_unscoped_default_tenant_auto_scopes(self, token):
        """Unscoped requests with a default tenant should "auto-scope."

        The implied scope is the user's tenant ID.

        """
        _req, body = self._call_middleware({'X-Auth-Token': token})
        self.assertEqual(self.response_status, 200)
        self.assertEqual(body, ['SUCCESS'])

    def test_default_tenant_uuid_token(self):
        self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)

    def test_default_tenant_signed_token(self):
        self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)

    def assert_unscoped_token_receives_401(self, token):
        """Unscoped requests with no default tenant ID should be rejected."""
        self._call_middleware({'X-Auth-Token': token})
        self._assert_unauthorized_with_challenge()

    def test_unscoped_uuid_token_receives_401(self):
        self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)

    def test_unscoped_pki_token_receives_401(self):
        self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)

    def test_revoked_token_receives_401(self):
        self.middleware.token_revocation_list = self.get_revocation_list_json()
        self._call_middleware({'X-Auth-Token': REVOKED_TOKEN})
        self.assertEqual(self.response_status, 401)

    def get_revocation_list_json(self, token_ids=None):
        """Return a JSON revocation list for *token_ids*.

        Defaults to a list containing only REVOKED_TOKEN_HASH.
        """
        if token_ids is None:
            token_ids = [REVOKED_TOKEN_HASH]
        revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
                                       for x in token_ids]}
        return jsonutils.dumps(revocation_list)

    def test_is_signed_token_revoked_returns_false(self):
        # explicitly setting an empty revocation list here to document intent
        self.middleware.token_revocation_list = jsonutils.dumps(
            {"revoked": [], "extra": "success"})
        result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
        self.assertFalse(result)

    def test_is_signed_token_revoked_returns_true(self):
        self.middleware.token_revocation_list = self.get_revocation_list_json()
        result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
        self.assertTrue(result)

    def test_verify_signed_token_raises_exception_for_revoked_token(self):
        self.middleware.token_revocation_list = self.get_revocation_list_json()
        with self.assertRaises(auth_token.InvalidUserToken):
            self.middleware.verify_signed_token(REVOKED_TOKEN)

    def test_verify_signed_token_succeeds_for_unrevoked_token(self):
        # Passes as long as no exception is raised.
        self.middleware.token_revocation_list = self.get_revocation_list_json()
        self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)

    def test_get_token_revocation_list_fetched_time_returns_min(self):
        self.middleware.token_revocation_list_fetched_time = None
        self.middleware.revoked_file_name = ''
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                         datetime.datetime.min)

    def test_get_token_revocation_list_fetched_time_returns_mtime(self):
        self.middleware.token_revocation_list_fetched_time = None
        mtime = os.path.getmtime(self.middleware.revoked_file_name)
        fetched_time = datetime.datetime.fromtimestamp(mtime)
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                         fetched_time)

    def test_get_token_revocation_list_fetched_time_returns_value(self):
        expected = self.middleware._token_revocation_list_fetched_time
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                         expected)

    def test_get_revocation_list_returns_fetched_list(self):
        self.middleware.token_revocation_list_fetched_time = None
        os.remove(self.middleware.revoked_file_name)
        self.assertEqual(self.middleware.token_revocation_list,
                         REVOCATION_LIST)

    def test_get_revocation_list_returns_current_list_from_memory(self):
        self.assertEqual(self.middleware.token_revocation_list,
                         self.middleware._token_revocation_list)

    def test_get_revocation_list_returns_current_list_from_disk(self):
        in_memory_list = self.middleware.token_revocation_list
        self.middleware._token_revocation_list = None
        self.assertEqual(self.middleware.token_revocation_list, in_memory_list)

    def test_invalid_revocation_list_raises_service_error(self):
        globals()['SIGNED_REVOCATION_LIST'] = "{}"
        with self.assertRaises(auth_token.ServiceError):
            self.middleware.fetch_revocation_list()

    def test_fetch_revocation_list(self):
        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
        self.assertEqual(fetched_list, REVOCATION_LIST)

    def test_request_invalid_uuid_token(self):
        self._call_middleware({'X-Auth-Token': 'invalid-token'})
        self._assert_unauthorized_with_challenge()

    def test_request_invalid_signed_token(self):
        self._call_middleware({'X-Auth-Token': INVALID_SIGNED_TOKEN})
        self._assert_unauthorized_with_challenge()

    def test_request_no_token(self):
        self._call_middleware()
        self._assert_unauthorized_with_challenge()

    def test_request_blank_token(self):
        self._call_middleware({'X-Auth-Token': ''})
        self._assert_unauthorized_with_challenge()

    def test_memcache(self):
        self.middleware._cache = FakeMemcache()
        self._call_middleware({'X-Auth-Token': SIGNED_TOKEN_SCOPED})
        self.assertIsNone(self.middleware._cache.set_value)

    def test_memcache_set_invalid(self):
        self.middleware._cache = FakeMemcache()
        self._call_middleware({'X-Auth-Token': 'invalid-token'})
        self.assertEqual(self.middleware._cache.set_value, "invalid")

    def test_memcache_set_expired(self):
        import time

        self.middleware._cache = FakeMemcache()
        expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
        # time.mktime() yields the local-time epoch portably; the "%s"
        # strftime directive is a platform extension.
        self.middleware._cache.token_expiration = time.mktime(
            expired.timetuple())
        self._call_middleware({'X-Auth-Token': SIGNED_TOKEN_SCOPED})
        self.assertEqual(len(self.middleware._cache.set_value), 2)

    def test_nomemcache(self):
        # Constructing the middleware must not fail when the memcache module
        # is unavailable, even though memcache_servers is configured.
        self.disable_module('memcache')

        conf = {
            'admin_token': 'admin_token1',
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'memcache_servers': 'localhost:11211',
        }

        auth_token.AuthProtocol(FakeApp(), conf)

    def test_request_prevent_service_catalog_injection(self):
        req, body = self._call_middleware({
            'X-Service-Catalog': '[]',
            'X-Auth-Token': UUID_TOKEN_NO_SERVICE_CATALOG,
        })
        self.assertEqual(self.response_status, 200)
        self.assertFalse(req.headers.get('X-Service-Catalog'))
        self.assertEqual(body, ['SUCCESS'])

    def test_will_expire_soon(self):
        tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
            seconds=10)
        self.assertTrue(auth_token.will_expire_soon(tenseconds))
        fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
            seconds=40)
        self.assertFalse(auth_token.will_expire_soon(fortyseconds))