~ubuntu-branches/ubuntu/trusty/python-keystoneclient/trusty-proposed

« back to all changes in this revision

Viewing changes to keystoneclient/tests/test_auth_token_middleware.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2013-11-14 10:51:32 UTC
  • mfrom: (1.1.23)
  • Revision ID: package-import@ubuntu.com-20131114105132-p1o428l7fclasv9e
Tags: 1:0.4.1-0ubuntu1
[ Adam Gandelman ]
* debian/patches: Refreshed.
* debian/patches/use-mox-dependency.patch: Use mox instead of mox3
  dependency.

[ Chuck Short ]
* New upstream release.
* debian/control:
  - open icehouse release.
  - Dropped python-d2to1 and python-httplib2 dependencies.
* debian/patches/skip-tests-ubuntu.patch: Dropped, no longer needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2012 OpenStack Foundation
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#      http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
import calendar
 
18
import datetime
 
19
import iso8601
 
20
import os
 
21
import shutil
 
22
import stat
 
23
import sys
 
24
import tempfile
 
25
import testtools
 
26
import uuid
 
27
 
 
28
import fixtures
 
29
import httpretty
 
30
import mock
 
31
import webob
 
32
 
 
33
from keystoneclient.common import cms
 
34
from keystoneclient.middleware import auth_token
 
35
from keystoneclient.openstack.common import jsonutils
 
36
from keystoneclient.openstack.common import memorycache
 
37
from keystoneclient.openstack.common import timeutils
 
38
from keystoneclient.tests import client_fixtures
 
39
 
 
40
 
 
41
# WSGI environment entries that FakeApp checks for by default on every
# request it receives.
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
    'HTTP_X_IDENTITY_STATUS': 'Confirmed',
    'HTTP_X_TENANT_ID': 'tenant_id1',
    'HTTP_X_TENANT_NAME': 'tenant_name1',
    'HTTP_X_USER_ID': 'user_id1',
    'HTTP_X_USER_NAME': 'user_name1',
    'HTTP_X_ROLES': 'role1,role2',
}
# Deprecated (diablo-compat) aliases of the user, tenant and role entries.
EXPECTED_V2_DEFAULT_ENV_RESPONSE.update({
    'HTTP_X_USER': 'user_name1',
    'HTTP_X_TENANT': 'tenant_name1',
    'HTTP_X_ROLE': 'role1,role2',
})
 
52
 
 
53
 
 
54
# Location of the fake keystone service; BASE_URI adds the admin prefix
# used by the test configuration.
BASE_HOST = 'https://keystone.example.com:1234'
BASE_URI = BASE_HOST + '/testadmin'

# Canned body returned for admin token requests (POST .../v2.0/tokens).
FAKE_ADMIN_TOKEN_ID = 'admin_token2'
FAKE_ADMIN_TOKEN = jsonutils.dumps({
    'access': {
        'token': {
            'id': FAKE_ADMIN_TOKEN_ID,
            'expires': '2022-10-03T16:58:01Z',
        },
    },
})


# Version document listing both a v3 and a v2.0 endpoint.
VERSION_LIST_v3 = jsonutils.dumps({
    'versions': {
        'values': [
            {
                'id': 'v3.0',
                'status': 'stable',
                'updated': '2013-03-06T00:00:00Z',
                'links': [{'href': BASE_URI + '/v3', 'rel': 'self'}],
            },
            {
                'id': 'v2.0',
                'status': 'stable',
                'updated': '2011-11-19T00:00:00Z',
                'links': [{'href': BASE_URI + '/v2.0', 'rel': 'self'}],
            },
        ],
    },
})

# Version document listing only v2.0, with no links.
VERSION_LIST_v2 = jsonutils.dumps({
    'versions': {
        'values': [
            {
                'id': 'v2.0',
                'status': 'stable',
                'updated': '2011-11-19T00:00:00Z',
                'links': [],
            },
        ],
    },
})

ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2'
 
95
 
 
96
 
 
97
class NoModuleFinder(object):
    """Disallow further imports of 'module'.

    Intended to be placed on ``sys.meta_path``: any attempt to import
    ``module`` itself or one of its submodules raises ImportError, while
    every other name is declined (``None``) so the remaining finders
    handle it.

    Both finder entry points are provided: ``find_module`` (PEP 302) and
    ``find_spec`` (PEP 451), so the block also works on interpreters whose
    import system only consults ``find_spec``.
    """

    def __init__(self, module):
        """Remember the dotted name of the module to block."""
        self.module = module

    def _is_blocked(self, fullname):
        """Return True if fullname is the blocked module or a submodule."""
        return (fullname == self.module or
                fullname.startswith(self.module + '.'))

    def find_module(self, fullname, path=None):
        """PEP 302 hook: raise ImportError for blocked names, else None."""
        if self._is_blocked(fullname):
            raise ImportError('No module named %s' % fullname)
        return None

    def find_spec(self, fullname, path=None, target=None):
        """PEP 451 hook: raise ImportError for blocked names, else None."""
        if self._is_blocked(fullname):
            raise ImportError('No module named %s' % fullname)
        return None
 
106
 
 
107
 
 
108
class DisableModuleFixture(fixtures.Fixture):
    """A fixture to provide support for unloading/disabling modules.

    setUp() removes the named module (and its submodules) from
    ``sys.modules`` and installs a NoModuleFinder at the front of
    ``sys.meta_path`` so that importing it raises ImportError.
    tearDown() removes the finders again and puts the cleared modules back.
    """

    def __init__(self, module, *args, **kw):
        super(DisableModuleFixture, self).__init__(*args, **kw)
        self.module = module
        # Finders installed by setUp(), so tearDown() can remove them.
        self._finders = []
        # Modules popped from sys.modules, keyed by their dotted name.
        self._cleared_modules = {}

    def tearDown(self):
        """Remove the installed finders and restore the cleared modules."""
        super(DisableModuleFixture, self).tearDown()
        for finder in self._finders:
            # Tolerate a finder that something else already removed.
            if finder in sys.meta_path:
                sys.meta_path.remove(finder)
        sys.modules.update(self._cleared_modules)
        # Forget what was just undone so that a later setUp()/tearDown()
        # cycle does not try to remove the same finder a second time.
        self._finders = []
        self._cleared_modules = {}

    def clear_module(self):
        """Pop the module and its submodules out of sys.modules.

        Returns a dict mapping each removed name to its module object.
        """
        cleared_modules = {}
        # Iterate over a snapshot: entries are popped inside the loop, and
        # changing a dict's size while iterating a live view of it raises
        # RuntimeError.
        for fullname in list(sys.modules):
            if (fullname == self.module or
                    fullname.startswith(self.module + '.')):
                cleared_modules[fullname] = sys.modules.pop(fullname)
        return cleared_modules

    def setUp(self):
        """Ensure ImportError for the specified module."""

        super(DisableModuleFixture, self).setUp()

        # Clear 'module' references in sys.modules
        self._cleared_modules.update(self.clear_module())

        finder = NoModuleFinder(self.module)
        self._finders.append(finder)
        sys.meta_path.insert(0, finder)
 
142
 
 
143
 
 
144
class FakeSwiftOldMemcacheClient(memorycache.Client):
    """Cache client whose set() takes 'timeout' in place of 'time'.

    NOTE(vish,chmou): old swift memcache uses param timeout instead of time
    """

    def set(self, key, value, timeout=0, min_compress_len=0):
        # Forward positionally, so 'timeout' lands in the parent's third
        # parameter whatever that parameter is called.
        super(FakeSwiftOldMemcacheClient, self).set(
            key, value, timeout, min_compress_len)
 
149
 
 
150
 
 
151
class FakeApp(object):
    """This represents a WSGI app protected by the auth_token middleware.

    When called it asserts that every expected key is present in the WSGI
    environment with the expected value, then answers with the body
    'SUCCESS'.
    """

    def __init__(self, expected_env=None):
        # Start from a private copy of the v2 defaults, then apply the
        # caller's overrides (if any).
        merged_env = EXPECTED_V2_DEFAULT_ENV_RESPONSE.copy()
        merged_env.update(expected_env or {})
        self.expected_env = merged_env

    def __call__(self, env, start_response):
        for key in self.expected_env:
            wanted = self.expected_env[key]
            actual = env[key]
            assert actual == wanted, '%s != %s' % (actual, wanted)

        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
 
167
 
 
168
 
 
169
class v3FakeApp(FakeApp):
    """This represents a v3 WSGI app protected by the auth_token middleware.

    Extends the FakeApp expectations with the project and domain headers
    listed below.
    """

    def __init__(self, expected_env=None):
        # with v3 additions, these are for the DEFAULT TOKEN
        v3_expected = {
            'HTTP_X_PROJECT_ID': 'tenant_id1',
            'HTTP_X_PROJECT_NAME': 'tenant_name1',
            'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
            'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
            'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
            'HTTP_X_USER_DOMAIN_NAME': 'domain_name1',
        }
        # Caller-supplied expectations override the v3 defaults.
        v3_expected.update(expected_env or {})

        super(v3FakeApp, self).__init__(v3_expected)
 
188
 
 
189
 
 
190
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
    """Base test class for auth_token middleware.

    All the tests allow for running with auth_token
    configured for receiving v2 or v3 tokens, with the
    choice being made by passing configuration data into
    setUp().

    The base class will, by default, run all the tests
    expecting v2 token formats.  Child classes can override
    this to specify, for instance, v3 format.

    """
    def setUp(self, expected_env=None, auth_version=None, fake_app=None):
        """Prepare the default configuration; no middleware is built yet.

        :param expected_env: environment entries the fake app should expect
        :param auth_version: value stored under conf['auth_version']
        :param fake_app: fake WSGI app class to wrap (defaults to FakeApp)
        """
        testtools.TestCase.setUp(self)

        self.expected_env = expected_env or dict()
        self.fake_app = fake_app or FakeApp
        self.middleware = None

        self.conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_protocol': 'https',
            'auth_admin_prefix': '/testadmin',
            'signing_dir': client_fixtures.CERTDIR,
            'auth_version': auth_version,
            'auth_uri': 'https://keystone.example.com:1234',
        }

        # Filled in by start_fake_response() when the middleware responds.
        self.response_status = None
        self.response_headers = None

    def _remove_revoked_file(self):
        """Best-effort removal of the current middleware's revocation file."""
        if self.middleware:
            try:
                os.remove(self.middleware.revoked_file_name)
            except OSError:
                # Already gone, or the test pointed the name elsewhere.
                pass

    def set_middleware(self, fake_app=None, expected_env=None, conf=None):
        """Configure the class ready to call the auth_token middleware.

        Set up the various fake items needed to run the middleware.
        Individual tests that need to further refine these can call this
        function to override the class defaults.

        """
        if conf:
            self.conf.update(conf)

        if not fake_app:
            fake_app = self.fake_app

        if expected_env:
            self.expected_env.update(expected_env)

        # A previous call may have left a temp file behind; tearDown() only
        # knows about the most recent middleware, so remove it here.
        self._remove_revoked_file()

        self.middleware = auth_token.AuthProtocol(fake_app(self.expected_env),
                                                  self.conf)
        self.middleware._iso8601 = iso8601
        # mkstemp() returns an open descriptor along with the path; only the
        # path is needed, so close the descriptor rather than leaking it.
        revoked_fd, revoked_path = tempfile.mkstemp()
        os.close(revoked_fd)
        self.middleware.revoked_file_name = revoked_path
        self.middleware.token_revocation_list = jsonutils.dumps(
            {"revoked": [], "extra": "success"})

    def tearDown(self):
        testtools.TestCase.tearDown(self)
        self._remove_revoked_file()

    def start_fake_response(self, status, headers):
        """WSGI start_response stand-in recording status code and headers."""
        self.response_status = int(status.split(' ', 1)[0])
        self.response_headers = dict(headers)

    def assertLastPath(self, path):
        """Assert the path of the last HTTP request seen by httpretty.

        A false path (e.g. None) asserts that no request was made at all.
        """
        if path:
            self.assertEqual(path, httpretty.httpretty.last_request.path)
        else:
            self.assertIsInstance(httpretty.httpretty.last_request,
                                  httpretty.core.HTTPrettyRequestEmpty)
 
265
 
 
266
if sys.version_info[:2] < (2, 7):

    # 2.6 doesn't have the assert dict equals so make sure that it exists
    class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
        """Base test class with the assertions Python 2.6 is missing."""

        def assertIsInstance(self, obj, cls, msg=None):
            """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
            default message.
            """
            if isinstance(obj, cls):
                return
            standardMsg = '%s is not an instance of %r' % (obj, cls)
            self.fail(self._formatMessage(msg, standardMsg))

        def assertDictEqual(self, d1, d2, msg=None):
            """Fail unless both arguments are dicts that compare equal."""
            # Simple version taken from 2.7
            self.assertIsInstance(d1, dict,
                                  'First argument is not a dictionary')
            self.assertIsInstance(d2, dict,
                                  'Second argument is not a dictionary')
            if d1 != d2:
                # A caller-supplied message wins over the default one.
                self.fail(msg or '%r != %r' % (d1, d2))

    # Everything defined below inherits from the adjusted class.
    BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
 
292
 
 
293
 
 
294
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Scenarios in which the middleware needs several HTTP round trips."""

    @httpretty.activate
    def test_fetch_revocation_list_with_expire(self):
        self.set_middleware()

        token_url = "%s/v2.0/tokens" % BASE_URI
        revoked_url = "%s/v2.0/tokens/revoked" % BASE_URI

        # Get a token, then try to retrieve revocation list and get a 401.
        # Get a new token, try to retrieve revocation list and return 200.
        httpretty.register_uri(httpretty.POST, token_url,
                               body=FAKE_ADMIN_TOKEN)

        rejected = httpretty.Response(body='', status=401)
        accepted = httpretty.Response(
            body=client_fixtures.SIGNED_REVOCATION_LIST)
        httpretty.register_uri(httpretty.GET, revoked_url,
                               responses=[rejected, accepted])

        raw_list = self.middleware.fetch_revocation_list()
        self.assertEqual(jsonutils.loads(raw_list),
                         client_fixtures.REVOCATION_LIST)

        # Check that 4 requests have been made
        request_count = len(httpretty.httpretty.latest_requests)
        self.assertEqual(request_count, 4)
 
318
 
 
319
 
 
320
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware should understand Diablo keystone responses."""
    def setUp(self):
        """Enable httpretty, register the fake endpoints, build middleware."""
        # pre-diablo only had Tenant ID, which was also the Name
        expected_env = {
            'HTTP_X_TENANT_ID': 'tenant_id1',
            'HTTP_X_TENANT_NAME': 'tenant_id1',
            # now deprecated (diablo-compat)
            'HTTP_X_TENANT': 'tenant_id1',
        }

        super(DiabloAuthTokenMiddlewareTest, self).setUp(
            expected_env=expected_env)

        httpretty.httpretty.reset()
        httpretty.enable()
        # tearDown() is not run when setUp() itself fails, so also register
        # the disable as a cleanup; otherwise a failure below would leave
        # httpretty enabled for the tests that follow.
        self.addCleanup(httpretty.disable)

        httpretty.register_uri(httpretty.GET,
                               "%s/" % BASE_URI,
                               body=VERSION_LIST_v2,
                               status=300)

        httpretty.register_uri(httpretty.POST,
                               "%s/v2.0/tokens" % BASE_URI,
                               body=FAKE_ADMIN_TOKEN)

        self.token_id = client_fixtures.VALID_DIABLO_TOKEN
        token_response = client_fixtures.JSON_TOKEN_RESPONSES[self.token_id]

        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id),
                               body=token_response)

        self.set_middleware()

    def tearDown(self):
        httpretty.disable()
        super(DiabloAuthTokenMiddlewareTest, self).tearDown()

    def test_valid_diablo_response(self):
        """A valid Diablo token yields a 200 and stores the token info."""
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = self.token_id
        self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertIn('keystone.token_info', req.environ)
 
365
 
 
366
 
 
367
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
    """Middleware checks run with imports of 'memcache' disabled."""

    def setUp(self):
        super(NoMemcacheAuthToken, self).setUp()
        memcache_blocker = DisableModuleFixture('memcache')
        self.useFixture(memcache_blocker)

    def test_nomemcache(self):
        """Constructing the middleware with memcached_servers must work."""
        conf = dict(
            admin_token='admin_token1',
            auth_host='keystone.example.com',
            auth_port=1234,
            memcached_servers='localhost:11211',
            auth_uri='https://keystone.example.com:1234',
        )

        auth_token.AuthProtocol(FakeApp(), conf)

    def test_not_use_cache_from_env(self):
        """Without a 'cache' option the env's swift.cache is not used."""
        self.set_middleware(conf={'memcached_servers': 'localhost:11211'})
        self.middleware._init_cache({'swift.cache': 'CACHE_TEST'})
        self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
 
392
 
 
393
 
 
394
class CommonAuthTokenMiddlewareTest(object):
 
395
 
 
396
    def test_init_does_not_call_http(self):
 
397
        conf = {
 
398
            'revocation_cache_time': 1
 
399
        }
 
400
        self.set_middleware(conf=conf)
 
401
        self.assertLastPath(None)
 
402
 
 
403
    def test_init_by_ipv6Addr_auth_host(self):
 
404
        conf = {
 
405
            'auth_host': '2001:2013:1:f101::1',
 
406
            'auth_port': 1234,
 
407
            'auth_protocol': 'http',
 
408
            'auth_uri': None,
 
409
        }
 
410
        self.set_middleware(conf=conf)
 
411
        expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
 
412
        self.assertEqual(expected_auth_uri, self.middleware.auth_uri)
 
413
 
 
414
    def assert_valid_request_200(self, token, with_catalog=True):
 
415
        req = webob.Request.blank('/')
 
416
        req.headers['X-Auth-Token'] = token
 
417
        body = self.middleware(req.environ, self.start_fake_response)
 
418
        self.assertEqual(self.response_status, 200)
 
419
        if with_catalog:
 
420
            self.assertTrue(req.headers.get('X-Service-Catalog'))
 
421
        self.assertEqual(body, ['SUCCESS'])
 
422
        self.assertTrue('keystone.token_info' in req.environ)
 
423
 
 
424
    def test_valid_uuid_request(self):
 
425
        self.assert_valid_request_200(self.token_dict['uuid_token_default'])
 
426
        self.assert_valid_last_url(self.token_dict['uuid_token_default'])
 
427
 
 
428
    def test_valid_signed_request(self):
 
429
        self.assert_valid_request_200(
 
430
            self.token_dict['signed_token_scoped'])
 
431
        self.assertEqual(self.middleware.conf['auth_admin_prefix'],
 
432
                         "/testadmin")
 
433
        #ensure that signed requests do not generate HTTP traffic
 
434
        self.assertLastPath(None)
 
435
 
 
436
    def test_revoked_token_receives_401(self):
 
437
        self.middleware.token_revocation_list = self.get_revocation_list_json()
 
438
        req = webob.Request.blank('/')
 
439
        req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
 
440
        self.middleware(req.environ, self.start_fake_response)
 
441
        self.assertEqual(self.response_status, 401)
 
442
 
 
443
    def get_revocation_list_json(self, token_ids=None):
 
444
        if token_ids is None:
 
445
            token_ids = [self.token_dict['revoked_token_hash']]
 
446
        revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
 
447
                                       for x in token_ids]}
 
448
        return jsonutils.dumps(revocation_list)
 
449
 
 
450
    def test_is_signed_token_revoked_returns_false(self):
 
451
        #explicitly setting an empty revocation list here to document intent
 
452
        self.middleware.token_revocation_list = jsonutils.dumps(
 
453
            {"revoked": [], "extra": "success"})
 
454
        result = self.middleware.is_signed_token_revoked(
 
455
            self.token_dict['revoked_token'])
 
456
        self.assertFalse(result)
 
457
 
 
458
    def test_is_signed_token_revoked_returns_true(self):
 
459
        self.middleware.token_revocation_list = self.get_revocation_list_json()
 
460
        result = self.middleware.is_signed_token_revoked(
 
461
            self.token_dict['revoked_token'])
 
462
        self.assertTrue(result)
 
463
 
 
464
    def test_verify_signed_token_raises_exception_for_revoked_token(self):
 
465
        self.middleware.token_revocation_list = self.get_revocation_list_json()
 
466
        self.assertRaises(auth_token.InvalidUserToken,
 
467
                          self.middleware.verify_signed_token,
 
468
                          self.token_dict['revoked_token'])
 
469
 
 
470
    def test_verify_signed_token_succeeds_for_unrevoked_token(self):
 
471
        self.middleware.token_revocation_list = self.get_revocation_list_json()
 
472
        self.middleware.verify_signed_token(
 
473
            self.token_dict['signed_token_scoped'])
 
474
 
 
475
    def test_verify_signing_dir_create_while_missing(self):
 
476
        tmp_name = uuid.uuid4().hex
 
477
        test_parent_signing_dir = "/tmp/%s" % tmp_name
 
478
        self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
 
479
        self.middleware.signing_cert_file_name = "%s/test.pem" %\
 
480
            self.middleware.signing_dirname
 
481
        self.middleware.verify_signing_dir()
 
482
        # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
 
483
        self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
 
484
        self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
 
485
        self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
 
486
                         os.getuid())
 
487
        self.assertEqual(
 
488
            stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
 
489
            stat.S_IRWXU)
 
490
        shutil.rmtree(test_parent_signing_dir)
 
491
 
 
492
    def test_cert_file_missing(self):
 
493
        self.assertFalse(self.middleware.cert_file_missing(
 
494
                         "openstack: /tmp/haystack: No such file or directory",
 
495
                         "/tmp/needle"))
 
496
        self.assertTrue(self.middleware.cert_file_missing(
 
497
                        "openstack: /not/exist: No such file or directory",
 
498
                        "/not/exist"))
 
499
 
 
500
    def test_get_token_revocation_list_fetched_time_returns_min(self):
 
501
        self.middleware.token_revocation_list_fetched_time = None
 
502
        self.middleware.revoked_file_name = ''
 
503
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
 
504
                         datetime.datetime.min)
 
505
 
 
506
    def test_get_token_revocation_list_fetched_time_returns_mtime(self):
 
507
        self.middleware.token_revocation_list_fetched_time = None
 
508
        mtime = os.path.getmtime(self.middleware.revoked_file_name)
 
509
        fetched_time = datetime.datetime.fromtimestamp(mtime)
 
510
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
 
511
                         fetched_time)
 
512
 
 
513
    def test_get_token_revocation_list_fetched_time_returns_value(self):
 
514
        expected = self.middleware._token_revocation_list_fetched_time
 
515
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
 
516
                         expected)
 
517
 
 
518
    def test_get_revocation_list_returns_fetched_list(self):
 
519
        # auth_token uses v2 to fetch this, so don't allow the v3
 
520
        # tests to override the fake http connection
 
521
        self.middleware.token_revocation_list_fetched_time = None
 
522
        os.remove(self.middleware.revoked_file_name)
 
523
        self.assertEqual(self.middleware.token_revocation_list,
 
524
                         client_fixtures.REVOCATION_LIST)
 
525
 
 
526
    def test_get_revocation_list_returns_current_list_from_memory(self):
 
527
        self.assertEqual(self.middleware.token_revocation_list,
 
528
                         self.middleware._token_revocation_list)
 
529
 
 
530
    def test_get_revocation_list_returns_current_list_from_disk(self):
 
531
        in_memory_list = self.middleware.token_revocation_list
 
532
        self.middleware._token_revocation_list = None
 
533
        self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
 
534
 
 
535
    def test_invalid_revocation_list_raises_service_error(self):
 
536
        httpretty.register_uri(httpretty.GET,
 
537
                               "%s/v2.0/tokens/revoked" % BASE_URI,
 
538
                               body="{}",
 
539
                               status=200)
 
540
 
 
541
        self.assertRaises(auth_token.ServiceError,
 
542
                          self.middleware.fetch_revocation_list)
 
543
 
 
544
    def test_fetch_revocation_list(self):
 
545
        # auth_token uses v2 to fetch this, so don't allow the v3
 
546
        # tests to override the fake http connection
 
547
        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
 
548
        self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
 
549
 
 
550
    def test_request_invalid_uuid_token(self):
 
551
        # remember because we are testing the middleware we stub the connection
 
552
        # to the keystone server, but this is not what gets returned
 
553
        invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
 
554
        httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404)
 
555
 
 
556
        req = webob.Request.blank('/')
 
557
        req.headers['X-Auth-Token'] = 'invalid-token'
 
558
        self.middleware(req.environ, self.start_fake_response)
 
559
        self.assertEqual(self.response_status, 401)
 
560
        self.assertEqual(self.response_headers['WWW-Authenticate'],
 
561
                         "Keystone uri='https://keystone.example.com:1234'")
 
562
 
 
563
    def test_request_invalid_signed_token(self):
 
564
        req = webob.Request.blank('/')
 
565
        req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
 
566
        self.middleware(req.environ, self.start_fake_response)
 
567
        self.assertEqual(self.response_status, 401)
 
568
        self.assertEqual(self.response_headers['WWW-Authenticate'],
 
569
                         "Keystone uri='https://keystone.example.com:1234'")
 
570
 
 
571
    def test_request_no_token(self):
 
572
        req = webob.Request.blank('/')
 
573
        self.middleware(req.environ, self.start_fake_response)
 
574
        self.assertEqual(self.response_status, 401)
 
575
        self.assertEqual(self.response_headers['WWW-Authenticate'],
 
576
                         "Keystone uri='https://keystone.example.com:1234'")
 
577
 
 
578
    def test_request_no_token_log_message(self):
 
579
        class FakeLog(object):
 
580
            def __init__(self):
 
581
                self.msg = None
 
582
                self.debugmsg = None
 
583
 
 
584
            def warn(self, msg=None, *args, **kwargs):
 
585
                self.msg = msg
 
586
 
 
587
            def debug(self, msg=None, *args, **kwargs):
 
588
                self.debugmsg = msg
 
589
 
 
590
        self.middleware.LOG = FakeLog()
 
591
        self.middleware.delay_auth_decision = False
 
592
        self.assertRaises(auth_token.InvalidUserToken,
 
593
                          self.middleware._get_user_token_from_header, {})
 
594
        self.assertIsNotNone(self.middleware.LOG.msg)
 
595
        self.assertIsNotNone(self.middleware.LOG.debugmsg)
 
596
 
 
597
    def test_request_no_token_http(self):
 
598
        req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
 
599
        self.set_middleware()
 
600
        body = self.middleware(req.environ, self.start_fake_response)
 
601
        self.assertEqual(self.response_status, 401)
 
602
        self.assertEqual(self.response_headers['WWW-Authenticate'],
 
603
                         "Keystone uri='https://keystone.example.com:1234'")
 
604
        self.assertEqual(body, [''])
 
605
 
 
606
    def test_request_blank_token(self):
 
607
        req = webob.Request.blank('/')
 
608
        req.headers['X-Auth-Token'] = ''
 
609
        self.middleware(req.environ, self.start_fake_response)
 
610
        self.assertEqual(self.response_status, 401)
 
611
        self.assertEqual(self.response_headers['WWW-Authenticate'],
 
612
                         "Keystone uri='https://keystone.example.com:1234'")
 
613
 
 
614
    def _get_cached_token(self, token):
 
615
        token_id = cms.cms_hash_token(token)
 
616
        # NOTE(vish): example tokens are expired so skip the expiration check.
 
617
        return self.middleware._cache_get(token_id, ignore_expires=True)
 
618
 
 
619
    def test_memcache(self):
 
620
        # NOTE(jamielennox): it appears that httpretty can mess with the
 
621
        # memcache socket. Just disable it as it's not required here anyway.
 
622
        httpretty.disable()
 
623
        req = webob.Request.blank('/')
 
624
        token = self.token_dict['signed_token_scoped']
 
625
        req.headers['X-Auth-Token'] = token
 
626
        self.middleware(req.environ, self.start_fake_response)
 
627
        self.assertNotEqual(self._get_cached_token(token), None)
 
628
 
 
629
    def test_expired(self):
 
630
        httpretty.disable()
 
631
        req = webob.Request.blank('/')
 
632
        token = self.token_dict['signed_token_scoped_expired']
 
633
        req.headers['X-Auth-Token'] = token
 
634
        self.middleware(req.environ, self.start_fake_response)
 
635
        self.assertEqual(self.response_status, 401)
 
636
 
 
637
    def test_memcache_set_invalid_uuid(self):
 
638
        invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
 
639
        httpretty.register_uri(httpretty.GET, invalid_uri, body="", status=404)
 
640
 
 
641
        req = webob.Request.blank('/')
 
642
        token = 'invalid-token'
 
643
        req.headers['X-Auth-Token'] = token
 
644
        self.middleware(req.environ, self.start_fake_response)
 
645
        self.assertRaises(auth_token.InvalidUserToken,
 
646
                          self._get_cached_token, token)
 
647
 
 
648
    def test_memcache_set_invalid_signed(self):
 
649
        req = webob.Request.blank('/')
 
650
        token = self.token_dict['signed_token_scoped_expired']
 
651
        req.headers['X-Auth-Token'] = token
 
652
        self.middleware(req.environ, self.start_fake_response)
 
653
        self.assertRaises(auth_token.InvalidUserToken,
 
654
                          self._get_cached_token, token)
 
655
 
 
656
    def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
 
657
        httpretty.disable()
 
658
        token_cache_time = 10
 
659
        conf = {
 
660
            'token_cache_time': token_cache_time,
 
661
            'signing_dir': client_fixtures.CERTDIR,
 
662
        }
 
663
        conf.update(extra_conf)
 
664
        self.set_middleware(conf=conf)
 
665
        req = webob.Request.blank('/')
 
666
        token = self.token_dict['signed_token_scoped']
 
667
        req.headers['X-Auth-Token'] = token
 
668
        req.environ.update(extra_environ)
 
669
        try:
 
670
            now = datetime.datetime.utcnow()
 
671
            timeutils.set_time_override(now)
 
672
            self.middleware(req.environ, self.start_fake_response)
 
673
            self.assertNotEqual(self._get_cached_token(token), None)
 
674
            expired = now + datetime.timedelta(seconds=token_cache_time)
 
675
            timeutils.set_time_override(expired)
 
676
            self.assertEqual(self._get_cached_token(token), None)
 
677
        finally:
 
678
            timeutils.clear_time_override()
 
679
 
 
680
    def test_old_swift_memcache_set_expired(self):
 
681
        extra_conf = {'cache': 'swift.cache'}
 
682
        extra_environ = {'swift.cache': FakeSwiftOldMemcacheClient()}
 
683
        self.test_memcache_set_expired(extra_conf, extra_environ)
 
684
 
 
685
    def test_swift_memcache_set_expired(self):
 
686
        extra_conf = {'cache': 'swift.cache'}
 
687
        extra_environ = {'swift.cache': memorycache.Client()}
 
688
        self.test_memcache_set_expired(extra_conf, extra_environ)
 
689
 
 
690
    def test_use_cache_from_env(self):
 
691
        env = {'swift.cache': 'CACHE_TEST'}
 
692
        conf = {
 
693
            'cache': 'swift.cache',
 
694
            'memcached_servers': ['localhost:11211']
 
695
        }
 
696
        self.set_middleware(conf=conf)
 
697
        self.middleware._init_cache(env)
 
698
        self.assertEqual(self.middleware._cache, 'CACHE_TEST')
 
699
 
 
700
    def test_will_expire_soon(self):
 
701
        tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
 
702
            seconds=10)
 
703
        self.assertTrue(auth_token.will_expire_soon(tenseconds))
 
704
        fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
 
705
            seconds=40)
 
706
        self.assertFalse(auth_token.will_expire_soon(fortyseconds))
 
707
 
 
708
    def test_token_is_v2_accepts_v2(self):
 
709
        token = client_fixtures.UUID_TOKEN_DEFAULT
 
710
        token_response = client_fixtures.TOKEN_RESPONSES[token]
 
711
        self.assertTrue(auth_token._token_is_v2(token_response))
 
712
 
 
713
    def test_token_is_v2_rejects_v3(self):
 
714
        token = client_fixtures.v3_UUID_TOKEN_DEFAULT
 
715
        token_response = client_fixtures.TOKEN_RESPONSES[token]
 
716
        self.assertFalse(auth_token._token_is_v2(token_response))
 
717
 
 
718
    def test_token_is_v3_rejects_v2(self):
 
719
        token = client_fixtures.UUID_TOKEN_DEFAULT
 
720
        token_response = client_fixtures.TOKEN_RESPONSES[token]
 
721
        self.assertFalse(auth_token._token_is_v3(token_response))
 
722
 
 
723
    def test_token_is_v3_accepts_v3(self):
 
724
        token = client_fixtures.v3_UUID_TOKEN_DEFAULT
 
725
        token_response = client_fixtures.TOKEN_RESPONSES[token]
 
726
        self.assertTrue(auth_token._token_is_v3(token_response))
 
727
 
 
728
    def test_encrypt_cache_data(self):
 
729
        httpretty.disable()
 
730
        conf = {
 
731
            'memcached_servers': ['localhost:11211'],
 
732
            'memcache_security_strategy': 'encrypt',
 
733
            'memcache_secret_key': 'mysecret'
 
734
        }
 
735
        self.set_middleware(conf=conf)
 
736
        token = 'my_token'
 
737
        some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
 
738
        expires = timeutils.strtime(some_time_later)
 
739
        data = ('this_data', expires)
 
740
        self.middleware._init_cache({})
 
741
        self.middleware._cache_store(token, data)
 
742
        self.assertEqual(self.middleware._cache_get(token), data[0])
 
743
 
 
744
    def test_sign_cache_data(self):
 
745
        httpretty.disable()
 
746
        conf = {
 
747
            'memcached_servers': ['localhost:11211'],
 
748
            'memcache_security_strategy': 'mac',
 
749
            'memcache_secret_key': 'mysecret'
 
750
        }
 
751
        self.set_middleware(conf=conf)
 
752
        token = 'my_token'
 
753
        some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
 
754
        expires = timeutils.strtime(some_time_later)
 
755
        data = ('this_data', expires)
 
756
        self.middleware._init_cache({})
 
757
        self.middleware._cache_store(token, data)
 
758
        self.assertEqual(self.middleware._cache_get(token), data[0])
 
759
 
 
760
    def test_no_memcache_protection(self):
 
761
        httpretty.disable()
 
762
        conf = {
 
763
            'memcached_servers': ['localhost:11211'],
 
764
            'memcache_secret_key': 'mysecret'
 
765
        }
 
766
        self.set_middleware(conf=conf)
 
767
        token = 'my_token'
 
768
        some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
 
769
        expires = timeutils.strtime(some_time_later)
 
770
        data = ('this_data', expires)
 
771
        self.middleware._init_cache({})
 
772
        self.middleware._cache_store(token, data)
 
773
        self.assertEqual(self.middleware._cache_get(token), data[0])
 
774
 
 
775
    def test_assert_valid_memcache_protection_config(self):
 
776
        # test missing memcache_secret_key
 
777
        conf = {
 
778
            'memcached_servers': ['localhost:11211'],
 
779
            'memcache_security_strategy': 'Encrypt'
 
780
        }
 
781
        self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
 
782
                          conf=conf)
 
783
        # test invalue memcache_security_strategy
 
784
        conf = {
 
785
            'memcached_servers': ['localhost:11211'],
 
786
            'memcache_security_strategy': 'whatever'
 
787
        }
 
788
        self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
 
789
                          conf=conf)
 
790
        # test missing memcache_secret_key
 
791
        conf = {
 
792
            'memcached_servers': ['localhost:11211'],
 
793
            'memcache_security_strategy': 'mac'
 
794
        }
 
795
        self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
 
796
                          conf=conf)
 
797
        conf = {
 
798
            'memcached_servers': ['localhost:11211'],
 
799
            'memcache_security_strategy': 'Encrypt',
 
800
            'memcache_secret_key': ''
 
801
        }
 
802
        self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
 
803
                          conf=conf)
 
804
        conf = {
 
805
            'memcached_servers': ['localhost:11211'],
 
806
            'memcache_security_strategy': 'mAc',
 
807
            'memcache_secret_key': ''
 
808
        }
 
809
        self.assertRaises(auth_token.ConfigurationError, self.set_middleware,
 
810
                          conf=conf)
 
811
 
 
812
    def test_config_revocation_cache_timeout(self):
 
813
        conf = {
 
814
            'revocation_cache_time': 24,
 
815
            'auth_uri': 'https://keystone.example.com:1234',
 
816
        }
 
817
        middleware = auth_token.AuthProtocol(self.fake_app, conf)
 
818
        self.assertEqual(middleware.token_revocation_list_cache_timeout,
 
819
                         datetime.timedelta(seconds=24))
 
820
 
 
821
    def test_http_error_not_cached_token(self):
 
822
        """Test to don't cache token as invalid on network errors.
 
823
 
 
824
        We use UUID tokens since they are the easiest one to reach
 
825
        get_http_connection.
 
826
        """
 
827
        req = webob.Request.blank('/')
 
828
        req.headers['X-Auth-Token'] = ERROR_TOKEN
 
829
        self.middleware.http_request_max_retries = 0
 
830
        self.middleware(req.environ, self.start_fake_response)
 
831
        self.assertEqual(self._get_cached_token(ERROR_TOKEN), None)
 
832
        self.assert_valid_last_url(ERROR_TOKEN)
 
833
 
 
834
    def test_http_request_max_retries(self):
 
835
        times_retry = 10
 
836
 
 
837
        req = webob.Request.blank('/')
 
838
        req.headers['X-Auth-Token'] = ERROR_TOKEN
 
839
 
 
840
        conf = {'http_request_max_retries': times_retry}
 
841
        self.set_middleware(conf=conf)
 
842
 
 
843
        with mock.patch('time.sleep') as mock_obj:
 
844
            self.middleware(req.environ, self.start_fake_response)
 
845
 
 
846
        self.assertEqual(mock_obj.call_count, times_retry)
 
847
 
 
848
 
 
849
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Certificate download tests run against an empty signing directory."""

    def setUp(self):
        """Point the middleware at a fresh, empty temporary cert directory."""
        super(CertDownloadMiddlewareTest, self).setUp()
        self.base_dir = tempfile.mkdtemp()
        # Register removal immediately: cleanups still run when a later
        # setUp step raises, whereas tearDown would be skipped and the
        # directory would be left behind.
        self.addCleanup(shutil.rmtree, self.base_dir)
        self.cert_dir = os.path.join(self.base_dir, 'certs')
        os.mkdir(self.cert_dir)
        conf = {
            'signing_dir': self.cert_dir,
        }
        self.set_middleware(conf=conf)

        httpretty.enable()

    def tearDown(self):
        """Disable httpretty; the temp directory is removed by a cleanup."""
        httpretty.disable()
        super(CertDownloadMiddlewareTest, self).tearDown()

    # Usually we supply a signing_dir with pre-installed certificates,
    # so invocation of /usr/bin/openssl succeeds. This time we give it
    # an empty directory, so it fails.
    def test_request_no_token_dummy(self):
        """Verification fails when neither certificate can be fetched."""
        cms._ensure_subprocess()

        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/ca" % BASE_URI,
                               status=404)
        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/signing" % BASE_URI,
                               status=404)
        self.assertRaises(cms.subprocess.CalledProcessError,
                          self.middleware.verify_signed_token,
                          client_fixtures.SIGNED_TOKEN_SCOPED)

    def test_fetch_signing_cert(self):
        """The fetched signing cert is written to signing_cert_file_name."""
        data = 'FAKE CERT'
        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/signing" % BASE_URI,
                               body=data)
        self.middleware.fetch_signing_cert()

        with open(self.middleware.signing_cert_file_name, 'r') as f:
            self.assertEqual(f.read(), data)

        self.assertEqual("/testadmin/v2.0/certificates/signing",
                         httpretty.httpretty.last_request.path)

    def test_fetch_signing_ca(self):
        """The fetched CA cert is written to signing_ca_file_name."""
        data = 'FAKE CA'
        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/ca" % BASE_URI,
                               body=data)
        self.middleware.fetch_ca_cert()

        with open(self.middleware.signing_ca_file_name, 'r') as f:
            self.assertEqual(f.read(), data)

        self.assertEqual("/testadmin/v2.0/certificates/ca",
                         httpretty.httpretty.last_request.path)

    def test_prefix_trailing_slash(self):
        """A trailing slash on auth_admin_prefix is not doubled in paths."""
        self.conf['auth_admin_prefix'] = '/newadmin/'

        httpretty.register_uri(httpretty.GET,
                               "%s/newadmin/v2.0/certificates/ca" % BASE_HOST,
                               body='FAKECA')
        httpretty.register_uri(httpretty.GET,
                               "%s/newadmin/v2.0/certificates/signing" %
                               BASE_HOST, body='FAKECERT')

        self.set_middleware(conf=self.conf)

        self.middleware.fetch_ca_cert()

        self.assertEqual('/newadmin/v2.0/certificates/ca',
                         httpretty.httpretty.last_request.path)

        self.middleware.fetch_signing_cert()

        self.assertEqual('/newadmin/v2.0/certificates/signing',
                         httpretty.httpretty.last_request.path)

    def test_without_prefix(self):
        """With an empty auth_admin_prefix the paths start at /v2.0."""
        self.conf['auth_admin_prefix'] = ''

        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/ca" % BASE_HOST,
                               body='FAKECA')
        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/certificates/signing" % BASE_HOST,
                               body='FAKECERT')

        self.set_middleware(conf=self.conf)

        self.middleware.fetch_ca_cert()

        self.assertEqual('/v2.0/certificates/ca',
                         httpretty.httpretty.last_request.path)

        self.middleware.fetch_signing_cert()

        self.assertEqual('/v2.0/certificates/signing',
                         httpretty.httpretty.last_request.path)
 
952
 
 
953
 
 
954
def network_error_response(method, uri, headers):
    """Always raise auth_token.NetworkError, whatever the arguments are."""
    error = auth_token.NetworkError("Network connection error.")
    raise error
 
956
 
 
957
 
 
958
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
                                CommonAuthTokenMiddlewareTest):
    """v2 token specific tests.

    There are some differences between how the auth-token middleware handles
    v2 and v3 tokens over and above the token formats, namely:

    - A v3 keystone server will auto scope a token to a user's default project
      if no scope is specified. A v2 server assumes that the auth-token
      middleware will do that.
    - A v2 keystone server may issue a token without a catalog, even with a
      tenant

    The tests below were originally part of the generic AuthTokenMiddlewareTest
    class, but now, since they really are v2 specific, they are included here.

    """

    def setUp(self):
        """Register the fake v2 keystone endpoints and build the middleware."""
        super(v2AuthTokenMiddlewareTest, self).setUp()

        self.token_dict = {
            'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
            'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
            'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
            'signed_token_scoped_expired':
            client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
            'revoked_token': client_fixtures.REVOKED_TOKEN,
            'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
        }

        httpretty.httpretty.reset()
        httpretty.enable()

        httpretty.register_uri(httpretty.GET,
                               "%s/" % BASE_URI,
                               body=VERSION_LIST_v2,
                               status=300)

        httpretty.register_uri(httpretty.POST,
                               "%s/v2.0/tokens" % BASE_URI,
                               body=FAKE_ADMIN_TOKEN)

        httpretty.register_uri(httpretty.GET,
                               "%s/v2.0/tokens/revoked" % BASE_URI,
                               body=client_fixtures.SIGNED_REVOCATION_LIST,
                               status=200)

        for token in (client_fixtures.UUID_TOKEN_DEFAULT,
                      client_fixtures.UUID_TOKEN_UNSCOPED,
                      client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG):
            token_body = client_fixtures.JSON_TOKEN_RESPONSES[token]
            httpretty.register_uri(httpretty.GET,
                                   "%s/v2.0/tokens/%s" % (BASE_URI, token),
                                   body=token_body)

        # Validating ERROR_TOKEN raises a NetworkError from the callback.
        httpretty.register_uri(httpretty.GET,
                               '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
                               body=network_error_response)

        self.set_middleware()

    def tearDown(self):
        """Disable httpretty before the base class tears down."""
        httpretty.disable()
        super(v2AuthTokenMiddlewareTest, self).tearDown()

    def assert_unscoped_default_tenant_auto_scopes(self, token):
        """Unscoped v2 requests with a default tenant should "auto-scope."

        The implied scope is the user's tenant ID.

        """
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = token
        body = self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertEqual(body, ['SUCCESS'])
        # assertIn reports the environ contents when the key is missing.
        self.assertIn('keystone.token_info', req.environ)

    def assert_valid_last_url(self, token_id):
        """Check the last request hit the v2 validation URL for token_id."""
        self.assertLastPath("/testadmin/v2.0/tokens/%s" % token_id)

    def test_default_tenant_uuid_token(self):
        self.assert_unscoped_default_tenant_auto_scopes(
            client_fixtures.UUID_TOKEN_DEFAULT)

    def test_default_tenant_signed_token(self):
        self.assert_unscoped_default_tenant_auto_scopes(
            client_fixtures.SIGNED_TOKEN_SCOPED)

    def assert_unscoped_token_receives_401(self, token):
        """Unscoped requests with no default tenant ID should be rejected."""
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = token
        self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 401)
        self.assertEqual(self.response_headers['WWW-Authenticate'],
                         "Keystone uri='https://keystone.example.com:1234'")

    def test_unscoped_uuid_token_receives_401(self):
        self.assert_unscoped_token_receives_401(
            client_fixtures.UUID_TOKEN_UNSCOPED)

    def test_unscoped_pki_token_receives_401(self):
        self.assert_unscoped_token_receives_401(
            client_fixtures.SIGNED_TOKEN_UNSCOPED)

    def test_request_prevent_service_catalog_injection(self):
        """A client-supplied X-Service-Catalog header must not survive."""
        req = webob.Request.blank('/')
        req.headers['X-Service-Catalog'] = '[]'
        req.headers['X-Auth-Token'] = (
            client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG)
        body = self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertFalse(req.headers.get('X-Service-Catalog'))
        self.assertEqual(body, ['SUCCESS'])
 
1074
 
 
1075
 
 
1076
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Tests where the configured auth version differs from the server's."""

    @httpretty.activate
    def test_valid_uuid_request_forced_to_2_0(self):
        """Test forcing auth_token to use a lower API version.

        By installing the v3 http handler, auth_token will get a version
        list that looks like a v3 server - from which it would normally
        choose v3.0 as the auth version.  However, here we specify v2.0 in
        the configuration - which should force auth_token to use that
        version instead.

        """
        uuid_token = client_fixtures.UUID_TOKEN_DEFAULT
        validation_path = "/v2.0/tokens/%s" % uuid_token

        httpretty.register_uri(httpretty.GET,
                               "%s/" % BASE_URI,
                               body=VERSION_LIST_v3,
                               status=300)

        httpretty.register_uri(httpretty.POST,
                               "%s/v2.0/tokens" % BASE_URI,
                               body=FAKE_ADMIN_TOKEN)

        httpretty.register_uri(
            httpretty.GET,
            "%s%s" % (BASE_URI, validation_path),
            body=client_fixtures.JSON_TOKEN_RESPONSES[uuid_token])

        self.set_middleware(conf={
            'signing_dir': client_fixtures.CERTDIR,
            'auth_version': 'v2.0'
        })

        # This test will only work if auth_token has chosen to use the
        # lower, v2, api version
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = uuid_token
        self.middleware(request.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertEqual("/testadmin%s" % validation_path,
                         httpretty.httpretty.last_request.path)
 
1120
 
 
1121
 
 
1122
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
                                CommonAuthTokenMiddlewareTest):
    """Test auth_token middleware with v3 tokens.

    Re-execute the AuthTokenMiddlewareTest class tests, but with the
    auth_token middleware configured to expect v3 tokens back from
    a keystone server.

    This is done by configuring the AuthTokenMiddlewareTest class via
    its setUp(), passing in v3 style data that will then be used by
    the tests themselves.  This approach has been used to ensure we
    really are running the same tests for both v2 and v3 tokens.

    There are a few additional tests that are specific to v3:

    - We allow an unscoped token to be validated (as unscoped), whereas
      for v2 tokens, the auth_token middleware is expected to try and
      auto-scope it (and fail if there is no default tenant)
    - Domain scoped tokens

    Since we don't specify an auth version for auth_token to use, by
    definition we are therefore implicitly testing that it will use
    the highest available auth version, i.e. v3.0

    """
    def setUp(self):
        """Register the fake v3 keystone endpoints and build the middleware."""
        super(v3AuthTokenMiddlewareTest, self).setUp(auth_version='v3.0',
                                                     fake_app=v3FakeApp)

        self.token_dict = dict(
            uuid_token_default=client_fixtures.v3_UUID_TOKEN_DEFAULT,
            uuid_token_unscoped=client_fixtures.v3_UUID_TOKEN_UNSCOPED,
            signed_token_scoped=client_fixtures.SIGNED_v3_TOKEN_SCOPED,
            signed_token_scoped_expired=(
                client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED),
            revoked_token=client_fixtures.REVOKED_v3_TOKEN,
            revoked_token_hash=client_fixtures.REVOKED_v3_TOKEN_HASH,
        )

        httpretty.httpretty.reset()
        httpretty.enable()

        register = httpretty.register_uri

        register(httpretty.GET,
                 "%s" % BASE_URI,
                 body=VERSION_LIST_v3,
                 status=300)

        # TODO(jamielennox): auth_token middleware uses a v2 admin token
        # regardless of the auth_version that is set.
        register(httpretty.POST,
                 "%s/v2.0/tokens" % BASE_URI,
                 body=FAKE_ADMIN_TOKEN)

        # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
        register(httpretty.GET,
                 "%s/v2.0/tokens/revoked" % BASE_URI,
                 body=client_fixtures.SIGNED_REVOCATION_LIST,
                 status=200)

        register(httpretty.GET,
                 "%s/v3/auth/tokens" % BASE_URI,
                 body=self.token_response)

        self.set_middleware()

    def tearDown(self):
        """Disable httpretty before the base class tears down."""
        httpretty.disable()
        super(v3AuthTokenMiddlewareTest, self).tearDown()

    def token_response(self, request, uri, headers):
        """Answer a v3 token validation request.

        Returns a ``(status, headers, body)`` tuple: 200 with the JSON
        fixture for the token named in X-Subject-Token, or 404 with an
        empty body when no fixture exists. Raises NetworkError for
        ERROR_TOKEN.
        """
        admin_token = request.headers.get('X-Auth-Token')
        subject_token = request.headers.get('X-Subject-Token')
        self.assertEqual(admin_token, FAKE_ADMIN_TOKEN_ID)
        headers.pop('status')

        if subject_token == ERROR_TOKEN:
            raise auth_token.NetworkError("Network connection error.")

        try:
            payload = client_fixtures.JSON_TOKEN_RESPONSES[subject_token]
        except KeyError:
            return 404, headers, ""
        else:
            return 200, headers, payload

    def assert_valid_last_url(self, token_id):
        """Check the last request hit the v3 validation URL."""
        self.assertLastPath('/testadmin/v3/auth/tokens')

    def test_valid_unscoped_uuid_request(self):
        """An unscoped v3 token validates without project/tenant headers."""
        # Remove items that won't be in an unscoped token
        unscoped_env = {
            'HTTP_X_PROJECT_ID': None,
            'HTTP_X_PROJECT_NAME': None,
            'HTTP_X_PROJECT_DOMAIN_ID': None,
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
            'HTTP_X_TENANT_ID': None,
            'HTTP_X_TENANT_NAME': None,
            'HTTP_X_ROLES': '',
            'HTTP_X_TENANT': None,
            'HTTP_X_ROLE': '',
        }
        self.set_middleware(expected_env=unscoped_env)
        self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
                                      with_catalog=False)
        self.assertLastPath('/testadmin/v3/auth/tokens')

    def test_domain_scoped_uuid_request(self):
        """A domain scoped v3 token yields domain, not project, headers."""
        # Modify items compared to default token for a domain scope
        domain_env = {
            'HTTP_X_DOMAIN_ID': 'domain_id1',
            'HTTP_X_DOMAIN_NAME': 'domain_name1',
            'HTTP_X_PROJECT_ID': None,
            'HTTP_X_PROJECT_NAME': None,
            'HTTP_X_PROJECT_DOMAIN_ID': None,
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
            'HTTP_X_TENANT_ID': None,
            'HTTP_X_TENANT_NAME': None,
            'HTTP_X_TENANT': None
        }
        self.set_middleware(expected_env=domain_env)
        self.assert_valid_request_200(
            client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED)
        self.assertLastPath('/testadmin/v3/auth/tokens')
 
1249
 
 
1250
 
 
1251
class TokenEncodingTest(testtools.TestCase):
    """Check auth_token.safe_quote on unquoted and already-quoted input."""

    def test_unquoted_token(self):
        """A space in the token is percent-encoded."""
        quoted = auth_token.safe_quote('foo bar')
        self.assertEqual('foo%20bar', quoted)

    def test_quoted_token(self):
        """An already percent-encoded token comes back unchanged."""
        quoted = auth_token.safe_quote('foo%20bar')
        self.assertEqual('foo%20bar', quoted)
 
1257
 
 
1258
 
 
1259
class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
 
1260
    def setUp(self):
 
1261
        super(TokenExpirationTest, self).setUp()
 
1262
        timeutils.set_time_override()
 
1263
        self.now = timeutils.utcnow()
 
1264
        self.delta = datetime.timedelta(hours=1)
 
1265
        self.one_hour_ago = timeutils.isotime(self.now - self.delta,
 
1266
                                              subsecond=True)
 
1267
        self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
 
1268
                                                  subsecond=True)
 
1269
 
 
1270
    def tearDown(self):
 
1271
        super(TokenExpirationTest, self).tearDown()
 
1272
        timeutils.clear_time_override()
 
1273
 
 
1274
    def create_v2_token_fixture(self, expires=None):
 
1275
        v2_fixture = {
 
1276
            'access': {
 
1277
                'token': {
 
1278
                    'id': 'blah',
 
1279
                    'expires': expires or self.one_hour_earlier,
 
1280
                    'tenant': {
 
1281
                        'id': 'tenant_id1',
 
1282
                        'name': 'tenant_name1',
 
1283
                    },
 
1284
                },
 
1285
                'user': {
 
1286
                    'id': 'user_id1',
 
1287
                    'name': 'user_name1',
 
1288
                    'roles': [
 
1289
                        {'name': 'role1'},
 
1290
                        {'name': 'role2'},
 
1291
                    ],
 
1292
                },
 
1293
                'serviceCatalog': {}
 
1294
            },
 
1295
        }
 
1296
 
 
1297
        return v2_fixture
 
1298
 
 
1299
    def create_v3_token_fixture(self, expires=None):
 
1300
 
 
1301
        v3_fixture = {
 
1302
            'token': {
 
1303
                'expires_at': expires or self.one_hour_earlier,
 
1304
                'user': {
 
1305
                    'id': 'user_id1',
 
1306
                    'name': 'user_name1',
 
1307
                    'domain': {
 
1308
                        'id': 'domain_id1',
 
1309
                        'name': 'domain_name1'
 
1310
                    }
 
1311
                },
 
1312
                'project': {
 
1313
                    'id': 'tenant_id1',
 
1314
                    'name': 'tenant_name1',
 
1315
                    'domain': {
 
1316
                        'id': 'domain_id1',
 
1317
                        'name': 'domain_name1'
 
1318
                    }
 
1319
                },
 
1320
                'roles': [
 
1321
                    {'name': 'role1', 'id': 'Role1'},
 
1322
                    {'name': 'role2', 'id': 'Role2'},
 
1323
                ],
 
1324
                'catalog': {}
 
1325
            }
 
1326
        }
 
1327
 
 
1328
        return v3_fixture
 
1329
 
 
1330
    def test_no_data(self):
 
1331
        data = {}
 
1332
        self.assertRaises(auth_token.InvalidUserToken,
 
1333
                          auth_token.confirm_token_not_expired,
 
1334
                          data)
 
1335
 
 
1336
    def test_bad_data(self):
 
1337
        data = {'my_happy_token_dict': 'woo'}
 
1338
        self.assertRaises(auth_token.InvalidUserToken,
 
1339
                          auth_token.confirm_token_not_expired,
 
1340
                          data)
 
1341
 
 
1342
    def test_v2_token_not_expired(self):
 
1343
        data = self.create_v2_token_fixture()
 
1344
        expected_expires = data['access']['token']['expires']
 
1345
        actual_expires = auth_token.confirm_token_not_expired(data)
 
1346
        self.assertEqual(actual_expires, expected_expires)
 
1347
 
 
1348
    def test_v2_token_expired(self):
 
1349
        data = self.create_v2_token_fixture(expires=self.one_hour_ago)
 
1350
        self.assertRaises(auth_token.InvalidUserToken,
 
1351
                          auth_token.confirm_token_not_expired,
 
1352
                          data)
 
1353
 
 
1354
    def test_v2_token_with_timezone_offset_not_expired(self):
 
1355
        current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
 
1356
        current_time = timeutils.normalize_time(current_time)
 
1357
        timeutils.set_time_override(current_time)
 
1358
        data = self.create_v2_token_fixture(
 
1359
            expires='2000-01-01T00:05:10.000123-05:00')
 
1360
        expected_expires = '2000-01-01T05:05:10.000123Z'
 
1361
        actual_expires = auth_token.confirm_token_not_expired(data)
 
1362
        self.assertEqual(actual_expires, expected_expires)
 
1363
 
 
1364
    def test_v2_token_with_timezone_offset_expired(self):
 
1365
        current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
 
1366
        current_time = timeutils.normalize_time(current_time)
 
1367
        timeutils.set_time_override(current_time)
 
1368
        data = self.create_v2_token_fixture(
 
1369
            expires='2000-01-01T00:05:10.000123+05:00')
 
1370
        data['access']['token']['expires'] = '2000-01-01T00:05:10.000123+05:00'
 
1371
        self.assertRaises(auth_token.InvalidUserToken,
 
1372
                          auth_token.confirm_token_not_expired,
 
1373
                          data)
 
1374
 
 
1375
    def test_v3_token_not_expired(self):
 
1376
        data = self.create_v3_token_fixture()
 
1377
        expected_expires = data['token']['expires_at']
 
1378
        actual_expires = auth_token.confirm_token_not_expired(data)
 
1379
        self.assertEqual(actual_expires, expected_expires)
 
1380
 
 
1381
    def test_v3_token_expired(self):
 
1382
        data = self.create_v3_token_fixture(expires=self.one_hour_ago)
 
1383
        self.assertRaises(auth_token.InvalidUserToken,
 
1384
                          auth_token.confirm_token_not_expired,
 
1385
                          data)
 
1386
 
 
1387
    def test_v3_token_with_timezone_offset_not_expired(self):
 
1388
        current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
 
1389
        current_time = timeutils.normalize_time(current_time)
 
1390
        timeutils.set_time_override(current_time)
 
1391
        data = self.create_v3_token_fixture(
 
1392
            expires='2000-01-01T00:05:10.000123-05:00')
 
1393
        expected_expires = '2000-01-01T05:05:10.000123Z'
 
1394
 
 
1395
        actual_expires = auth_token.confirm_token_not_expired(data)
 
1396
        self.assertEqual(actual_expires, expected_expires)
 
1397
 
 
1398
    def test_v3_token_with_timezone_offset_expired(self):
 
1399
        current_time = timeutils.parse_isotime('2000-01-01T00:01:10.000123Z')
 
1400
        current_time = timeutils.normalize_time(current_time)
 
1401
        timeutils.set_time_override(current_time)
 
1402
        data = self.create_v3_token_fixture(
 
1403
            expires='2000-01-01T00:05:10.000123+05:00')
 
1404
        self.assertRaises(auth_token.InvalidUserToken,
 
1405
                          auth_token.confirm_token_not_expired,
 
1406
                          data)
 
1407
 
 
1408
    def test_cached_token_not_expired(self):
 
1409
        token = 'mytoken'
 
1410
        data = 'this_data'
 
1411
        self.set_middleware()
 
1412
        self.middleware._init_cache({})
 
1413
        some_time_later = timeutils.strtime(at=(self.now + self.delta))
 
1414
        expires = some_time_later
 
1415
        self.middleware._cache_put(token, data, expires)
 
1416
        self.assertEqual(self.middleware._cache_get(token), data)
 
1417
 
 
1418
    def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
 
1419
        """Ensure we cannot retrieve a token from the cache.
 
1420
 
 
1421
        Getting a token from the cache should return None when the token data
 
1422
        in the cache stores the expires time as a *nix style timestamp.
 
1423
 
 
1424
        """
 
1425
        token = 'mytoken'
 
1426
        data = 'this_data'
 
1427
        self.set_middleware()
 
1428
        self.middleware._init_cache({})
 
1429
        some_time_later = self.now + self.delta
 
1430
        # Store a unix timestamp in the cache.
 
1431
        expires = calendar.timegm(some_time_later.timetuple())
 
1432
        self.middleware._cache_put(token, data, expires)
 
1433
        self.assertIsNone(self.middleware._cache_get(token))
 
1434
 
 
1435
    def test_cached_token_expired(self):
 
1436
        token = 'mytoken'
 
1437
        data = 'this_data'
 
1438
        self.set_middleware()
 
1439
        self.middleware._init_cache({})
 
1440
        some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
 
1441
        expires = some_time_earlier
 
1442
        self.middleware._cache_put(token, data, expires)
 
1443
        self.assertIsNone(self.middleware._cache_get(token))
 
1444
 
 
1445
    def test_cached_token_with_timezone_offset_not_expired(self):
 
1446
        token = 'mytoken'
 
1447
        data = 'this_data'
 
1448
        self.set_middleware()
 
1449
        self.middleware._init_cache({})
 
1450
        timezone_offset = datetime.timedelta(hours=2)
 
1451
        some_time_later = self.now - timezone_offset + self.delta
 
1452
        expires = timeutils.strtime(some_time_later) + '-02:00'
 
1453
        self.middleware._cache_put(token, data, expires)
 
1454
        self.assertEqual(self.middleware._cache_get(token), data)
 
1455
 
 
1456
    def test_cached_token_with_timezone_offset_expired(self):
 
1457
        token = 'mytoken'
 
1458
        data = 'this_data'
 
1459
        self.set_middleware()
 
1460
        self.middleware._init_cache({})
 
1461
        timezone_offset = datetime.timedelta(hours=2)
 
1462
        some_time_earlier = self.now - timezone_offset - self.delta
 
1463
        expires = timeutils.strtime(some_time_earlier) + '-02:00'
 
1464
        self.middleware._cache_put(token, data, expires)
 
1465
        self.assertIsNone(self.middleware._cache_get(token))