~ubuntu-branches/ubuntu/trusty/python-keystoneclient/trusty-proposed

« back to all changes in this revision

Viewing changes to .pc/skip-tests-ubuntu.patch/tests/test_auth_token_middleware.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2013-11-14 10:51:32 UTC
  • mfrom: (1.1.23)
  • Revision ID: package-import@ubuntu.com-20131114105132-p1o428l7fclasv9e
Tags: 1:0.4.1-0ubuntu1
[ Adam Gandelman ]
* debian/patches: Refreshed.
* debian/patches/use-mox-dependency.patch: Use mox instead of mox3
  dependency.

[ Chuck Short ]
* New upstream release.
* debian/control:
  - open icehouse release.
  - Dropped python-d2to1 and python-httplib2 dependency.
* debian/patches/skip-tests-ubuntu.patch: Dropped no longer needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2012 OpenStack LLC
4
 
#
5
 
# Licensed under the Apache License, Version 2.0 (the "License"); you may
6
 
# not use this file except in compliance with the License. You may obtain
7
 
# a copy of the License at
8
 
#
9
 
#      http://www.apache.org/licenses/LICENSE-2.0
10
 
#
11
 
# Unless required by applicable law or agreed to in writing, software
12
 
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
 
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
 
# License for the specific language governing permissions and limitations
15
 
# under the License.
16
 
 
17
 
import datetime
18
 
import iso8601
19
 
import os
20
 
import shutil
21
 
import stat
22
 
import sys
23
 
import tempfile
24
 
import testtools
25
 
import uuid
26
 
 
27
 
import fixtures
28
 
import webob
29
 
 
30
 
from keystoneclient.common import cms
31
 
from keystoneclient.middleware import auth_token
32
 
from keystoneclient.openstack.common import jsonutils
33
 
from keystoneclient.openstack.common import memorycache
34
 
from keystoneclient.openstack.common import timeutils
35
 
 
36
 
import client_fixtures
37
 
 
38
 
SIGNED_REVOCATION_LIST = None
39
 
VALID_SIGNED_REVOCATION_LIST = client_fixtures.SIGNED_REVOCATION_LIST
40
 
 
41
 
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
42
 
    'HTTP_X_IDENTITY_STATUS': 'Confirmed',
43
 
    'HTTP_X_TENANT_ID': 'tenant_id1',
44
 
    'HTTP_X_TENANT_NAME': 'tenant_name1',
45
 
    'HTTP_X_USER_ID': 'user_id1',
46
 
    'HTTP_X_USER_NAME': 'user_name1',
47
 
    'HTTP_X_ROLES': 'role1,role2',
48
 
    'HTTP_X_USER': 'user_name1',  # deprecated (diablo-compat)
49
 
    'HTTP_X_TENANT': 'tenant_name1',  # deprecated (diablo-compat)
50
 
    'HTTP_X_ROLE': 'role1,role2',  # deprecated (diablo-compat)
51
 
}
52
 
 
53
 
FAKE_RESPONSE_STACK = []
54
 
 
55
 
VERSION_LIST_v3 = {
56
 
    "versions": {
57
 
        "values": [
58
 
            {
59
 
                "id": "v3.0",
60
 
                "status": "stable",
61
 
                "updated": "2013-03-06T00:00:00Z",
62
 
                "links": []
63
 
            },
64
 
            {
65
 
                "id": "v2.0",
66
 
                "status": "beta",
67
 
                "updated": "2011-11-19T00:00:00Z",
68
 
                "links": []
69
 
            }
70
 
        ]
71
 
    }
72
 
}
73
 
 
74
 
VERSION_LIST_v2 = {
75
 
    "versions": {
76
 
        "values": [
77
 
            {
78
 
                "id": "v2.0",
79
 
                "status": "beta",
80
 
                "updated": "2011-11-19T00:00:00Z",
81
 
                "links": []
82
 
            }
83
 
        ]
84
 
    }
85
 
}
86
 
 
87
 
 
88
 
class NoModuleFinder(object):
89
 
    """Disallow further imports of 'module'."""
90
 
 
91
 
    def __init__(self, module):
92
 
        self.module = module
93
 
 
94
 
    def find_module(self, fullname, path):
95
 
        if fullname == self.module or fullname.startswith(self.module + '.'):
96
 
            raise ImportError
97
 
 
98
 
 
99
 
class DisableModuleFixture(fixtures.Fixture):
100
 
    """A fixture to provide support for unloading/disabling modules."""
101
 
 
102
 
    def __init__(self, module, *args, **kw):
103
 
        super(DisableModuleFixture, self).__init__(*args, **kw)
104
 
        self.module = module
105
 
        self._finders = []
106
 
        self._cleared_modules = {}
107
 
 
108
 
    def tearDown(self):
109
 
        super(DisableModuleFixture, self).tearDown()
110
 
        for finder in self._finders:
111
 
            sys.meta_path.remove(finder)
112
 
        sys.modules.update(self._cleared_modules)
113
 
 
114
 
    def clear_module(self):
115
 
        cleared_modules = {}
116
 
        for fullname in sys.modules.keys():
117
 
            if (fullname == self.module or
118
 
                    fullname.startswith(self.module + '.')):
119
 
                cleared_modules[fullname] = sys.modules.pop(fullname)
120
 
        return cleared_modules
121
 
 
122
 
    def setUp(self):
123
 
        """Ensure ImportError for the specified module."""
124
 
 
125
 
        super(DisableModuleFixture, self).setUp()
126
 
 
127
 
        # Clear 'module' references in sys.modules
128
 
        self._cleared_modules.update(self.clear_module())
129
 
 
130
 
        finder = NoModuleFinder(self.module)
131
 
        self._finders.append(finder)
132
 
        sys.meta_path.insert(0, finder)
133
 
 
134
 
 
135
 
class FakeSwiftOldMemcacheClient(memorycache.Client):
136
 
    # NOTE(vish,chmou): old swift memcache uses param timeout instead of time
137
 
    def set(self, key, value, timeout=0, min_compress_len=0):
138
 
        sup = super(FakeSwiftOldMemcacheClient, self)
139
 
        sup.set(key, value, timeout, min_compress_len)
140
 
 
141
 
 
142
 
class FakeHTTPResponse(object):
143
 
    def __init__(self, status, body):
144
 
        self.status = status
145
 
        self.body = body
146
 
 
147
 
    def read(self):
148
 
        return self.body
149
 
 
150
 
 
151
 
class BaseFakeHTTPConnection(object):
152
 
 
153
 
    def _user_token_responses(self, token_id):
154
 
        """Emulate user token responses.
155
 
 
156
 
        Return success if the token is in the list we know
157
 
        about. If the request is for revoked tokens, then return
158
 
        the revoked list, else if a different token is provided,
159
 
        return 404 indicating an unknown (therefore unauthorized) token.
160
 
 
161
 
        """
162
 
        if token_id in client_fixtures.JSON_TOKEN_RESPONSES.keys():
163
 
            status = 200
164
 
            body = client_fixtures.JSON_TOKEN_RESPONSES[token_id]
165
 
        elif token_id == "revoked":
166
 
            status = 200
167
 
            body = SIGNED_REVOCATION_LIST
168
 
        else:
169
 
            status = 404
170
 
            body = str()
171
 
        return status, body
172
 
 
173
 
    def fake_v2_responses(self, path):
174
 
        token_id = path.rsplit('/', 1)[1]
175
 
        return self._user_token_responses(token_id)
176
 
 
177
 
    def fake_v3_responses(self, path, **kwargs):
178
 
        headers = kwargs.get('headers')
179
 
        token_id = headers['X-Subject-Token']
180
 
        return self._user_token_responses(token_id)
181
 
 
182
 
    def fake_v2_admin_token(self, path):
183
 
        status = 200
184
 
        body = jsonutils.dumps({
185
 
            'access': {
186
 
                'token': {'id': 'admin_token2',
187
 
                          'expires': '2022-10-03T16:58:01Z'}
188
 
            },
189
 
        })
190
 
        return status, body
191
 
 
192
 
 
193
 
class CertificateHTTPConnection(BaseFakeHTTPConnection):
194
 
 
195
 
    signing_cert_data = 'SIGNING CERT'
196
 
    ca_cert_data = 'SIGNING CA'
197
 
 
198
 
    def __init__(self, *args, **kwargs):
199
 
        self.response = None
200
 
 
201
 
    def request(self, method, path, **kwargs):
202
 
        CertificateHTTPConnection.last_requested_url = path
203
 
 
204
 
        if method == 'GET' and path == '/testadmin/v2.0/certificates/signing':
205
 
            self.response = FakeHTTPResponse(200, self.signing_cert_data)
206
 
        elif method == 'GET' and path == '/testadmin/v2.0/certificates/ca':
207
 
            self.response = FakeHTTPResponse(200, self.ca_cert_data)
208
 
        else:
209
 
            self.response = FakeHTTPResponse(404, '')
210
 
 
211
 
    def getresponse(self):
212
 
        return self.response
213
 
 
214
 
    def close(self):
215
 
        pass
216
 
 
217
 
 
218
 
class FakeHTTPConnection(BaseFakeHTTPConnection):
219
 
    """Emulate a fake Keystone v2 server."""
220
 
 
221
 
    def __init__(self, *args, **kwargs):
222
 
        self.send_valid_revocation_list = True
223
 
        self.resp = None
224
 
 
225
 
    def request(self, method, path, **kwargs):
226
 
        """Fakes out several http responses.
227
 
 
228
 
        Support the following requests:
229
 
 
230
 
        - Create admin token ('POST /testadmin/v2.0/tokens')
231
 
        - Get versions ('GET /testadmin/')
232
 
        - Get v2 user token responses (see fake_v2_responses)
233
 
 
234
 
        """
235
 
        FakeHTTPConnection.last_requested_url = path
236
 
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
237
 
            status, body = self.fake_v2_admin_token(path)
238
 
        else:
239
 
            if path == '/testadmin/':
240
 
                # It's a GET versions call
241
 
                status = 300
242
 
                body = jsonutils.dumps(VERSION_LIST_v2)
243
 
            else:
244
 
                status, body = self.fake_v2_responses(path)
245
 
 
246
 
        self.resp = FakeHTTPResponse(status, body)
247
 
 
248
 
    def getresponse(self):
249
 
        # If self.resp is set then this is just the response to
250
 
        # the earlier request.  If it is not set, then we expect
251
 
        # a stack of responses to have been pre-prepared
252
 
        if self.resp:
253
 
            return self.resp
254
 
        else:
255
 
            if len(FAKE_RESPONSE_STACK):
256
 
                return FAKE_RESPONSE_STACK.pop()
257
 
            return FakeHTTPResponse(
258
 
                500, jsonutils.dumps('UNEXPECTED RESPONSE'))
259
 
 
260
 
    def close(self):
261
 
        pass
262
 
 
263
 
 
264
 
class v3FakeHTTPConnection(FakeHTTPConnection):
265
 
    """Emulate a fake Keystone v3 server."""
266
 
 
267
 
    def request(self, method, path, **kwargs):
268
 
        """Fakes out several http responses.
269
 
 
270
 
        Support the following requests:
271
 
 
272
 
        - Create admin token ('POST /testadmin/v2.0/tokens')
273
 
        - Get versions ('GET /testadmin/')
274
 
        - Get v2 user token responses (see fake_v2_responses)
275
 
        - Get v3 user token responses (see fake_v3_responses)
276
 
 
277
 
        """
278
 
        v3FakeHTTPConnection.last_requested_url = path
279
 
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
280
 
            status, body = self.fake_v2_admin_token(path)
281
 
        else:
282
 
            if path == '/testadmin/':
283
 
                # It's a GET versions call
284
 
                status = 300
285
 
                body = jsonutils.dumps(VERSION_LIST_v3)
286
 
            elif path.split('/')[2] == 'v2.0':
287
 
                status, body = self.fake_v2_responses(path)
288
 
            else:
289
 
                status, body = self.fake_v3_responses(path, **kwargs)
290
 
 
291
 
        self.resp = FakeHTTPResponse(status, body)
292
 
 
293
 
 
294
 
class RaisingHTTPConnection(FakeHTTPConnection):
295
 
    """An HTTPConnection that always raises."""
296
 
 
297
 
    def request(self, method, path, **kwargs):
298
 
        raise AssertionError("HTTP request was called.")
299
 
 
300
 
 
301
 
class RaisingHTTPNetworkError(FakeHTTPConnection):
302
 
    """An HTTPConnection that always raises network error."""
303
 
 
304
 
    def request(self, method, path, **kwargs):
305
 
        raise auth_token.NetworkError("Network connection error.")
306
 
 
307
 
 
308
 
class FakeApp(object):
309
 
    """This represents a WSGI app protected by the auth_token middleware."""
310
 
    def __init__(self, expected_env=None):
311
 
        expected_env = expected_env or {}
312
 
        self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
313
 
        self.expected_env.update(expected_env)
314
 
 
315
 
    def __call__(self, env, start_response):
316
 
        for k, v in self.expected_env.items():
317
 
            assert env[k] == v, '%s != %s' % (env[k], v)
318
 
 
319
 
        resp = webob.Response()
320
 
        resp.body = 'SUCCESS'
321
 
        return resp(env, start_response)
322
 
 
323
 
 
324
 
class v3FakeApp(object):
325
 
    """This represents a v3 WSGI app protected by the auth_token middleware."""
326
 
    def __init__(self, expected_env=None):
327
 
        expected_env = expected_env or {}
328
 
        # We should always get back the same v2 items
329
 
        self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
330
 
        # ...and with v3 additions, these are for the DEFAULT TOKEN
331
 
        v3_default_env_additions = {
332
 
            'HTTP_X_PROJECT_ID': 'tenant_id1',
333
 
            'HTTP_X_PROJECT_NAME': 'tenant_name1',
334
 
            'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
335
 
            'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
336
 
            'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
337
 
            'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
338
 
        }
339
 
        self.expected_env.update(v3_default_env_additions)
340
 
        # And finally update for anything passed in
341
 
        self.expected_env.update(expected_env)
342
 
 
343
 
    def __call__(self, env, start_response):
344
 
        for k, v in self.expected_env.items():
345
 
            assert env[k] == v, '%s != %s' % (env[k], v)
346
 
        resp = webob.Response()
347
 
        resp.body = 'SUCCESS'
348
 
        return resp(env, start_response)
349
 
 
350
 
 
351
 
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
352
 
    """Base test class for auth_token middleware.
353
 
 
354
 
    All the tests allow for running with auth_token
355
 
    configured for receiving v2 or v3 tokens, with the
356
 
    choice being made by passing configuration data into
357
 
    Setup().
358
 
 
359
 
    The base class will, by default, run all the tests
360
 
    expecting v2 token formats.  Child classes can override
361
 
    this to specify, for instance, v3 format.
362
 
 
363
 
    """
364
 
    def setUp(self, expected_env=None, auth_version=None,
365
 
              fake_app=None, fake_http=None, token_dict=None):
366
 
        testtools.TestCase.setUp(self)
367
 
        expected_env = expected_env or {}
368
 
 
369
 
        if token_dict:
370
 
            self.token_dict = token_dict
371
 
        else:
372
 
            self.token_dict = {
373
 
                'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
374
 
                'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
375
 
                'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
376
 
                'signed_token_scoped_expired':
377
 
                client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
378
 
                'revoked_token': client_fixtures.REVOKED_TOKEN,
379
 
                'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
380
 
            }
381
 
 
382
 
        self.conf = {
383
 
            'auth_host': 'keystone.example.com',
384
 
            'auth_port': 1234,
385
 
            'auth_admin_prefix': '/testadmin',
386
 
            'signing_dir': client_fixtures.CERTDIR,
387
 
            'auth_version': auth_version
388
 
        }
389
 
 
390
 
        # Base assumes v2 for fake app and http, can be overridden for
391
 
        # child classes by called set_middleware() directly
392
 
        self.fake_app = fake_app or FakeApp
393
 
        self.fake_http = fake_http or FakeHTTPConnection
394
 
        self.set_middleware(self.fake_app, self.fake_http,
395
 
                            expected_env, self.conf)
396
 
        # self.middleware.verify_signed_token will call
397
 
        # _ensure_subprocess, but we need
398
 
        # cms.subprocess.CalledProcessError to be imported first.
399
 
        # Explicitly call _ensure_subprocess to make sure the import
400
 
        # happens before we need it regardless of test order.
401
 
        cms._ensure_subprocess()
402
 
 
403
 
        self.response_status = None
404
 
        self.response_headers = None
405
 
 
406
 
        signed_list = 'SIGNED_REVOCATION_LIST'
407
 
        valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
408
 
        globals()[signed_list] = globals()[valid_signed_list]
409
 
 
410
 
    def set_fake_http(self, http_handler):
411
 
        """Configure the http handler for the auth_token middleware.
412
 
 
413
 
        Allows tests to override the default handler on specific tests,
414
 
        e.g. to use v2 for those parts of auth_token that still use v2
415
 
        tokens while running the v3 test class, i.e. getting an admin
416
 
        token or revocation list.
417
 
 
418
 
        """
419
 
        self.middleware.http_client_class = http_handler
420
 
 
421
 
    def set_middleware(self, fake_app=None, fake_http=None,
422
 
                       expected_env=None, conf=None):
423
 
        """Configure the class ready to call the auth_token middleware.
424
 
 
425
 
        Set up the various fake items needed to run the middleware.
426
 
        Individual tests that need to further refine these can call this
427
 
        function to override the class defaults.
428
 
 
429
 
        """
430
 
        conf = conf or self.conf
431
 
        if fake_http:
432
 
            conf['http_handler'] = fake_http
433
 
        fake_app = fake_app or self.fake_app
434
 
        self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
435
 
        self.middleware._iso8601 = iso8601
436
 
        self.middleware.revoked_file_name = tempfile.mkstemp()[1]
437
 
        self.middleware.token_revocation_list = jsonutils.dumps(
438
 
            {"revoked": [], "extra": "success"})
439
 
 
440
 
    def tearDown(self):
441
 
        testtools.TestCase.tearDown(self)
442
 
        try:
443
 
            os.remove(self.middleware.revoked_file_name)
444
 
        except OSError:
445
 
            pass
446
 
 
447
 
    def start_fake_response(self, status, headers):
448
 
        self.response_status = int(status.split(' ', 1)[0])
449
 
        self.response_headers = dict(headers)
450
 
 
451
 
 
452
 
if tuple(sys.version_info)[0:2] < (2, 7):
453
 
 
454
 
    # 2.6 doesn't have the assert dict equals so make sure that it exists
455
 
    class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
456
 
        def assertIsInstance(self, obj, cls, msg=None):
457
 
            """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
458
 
            default message.
459
 
            """
460
 
            if not isinstance(obj, cls):
461
 
                standardMsg = '%s is not an instance of %r' % (obj, cls)
462
 
                self.fail(self._formatMessage(msg, standardMsg))
463
 
 
464
 
        def assertDictEqual(self, d1, d2, msg=None):
465
 
            # Simple version taken from 2.7
466
 
            self.assertIsInstance(d1, dict,
467
 
                                  'First argument is not a dictionary')
468
 
            self.assertIsInstance(d2, dict,
469
 
                                  'Second argument is not a dictionary')
470
 
            if d1 != d2:
471
 
                if msg:
472
 
                    self.fail(msg)
473
 
                else:
474
 
                    standardMsg = '%r != %r' % (d1, d2)
475
 
                    self.fail(standardMsg)
476
 
 
477
 
    BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
478
 
 
479
 
 
480
 
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
481
 
    """Auth Token middleware test setup that allows the tests to define
482
 
    a stack of responses to HTTP requests in the test and get those
483
 
    responses back in sequence for testing.
484
 
 
485
 
    Example::
486
 
 
487
 
        resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
488
 
        resp2 = FakeHTTPResponse(200, jsonutils.dumps({
489
 
            'access': {
490
 
                'token': {'id': 'admin_token2'},
491
 
            },
492
 
            })
493
 
        FAKE_RESPONSE_STACK.append(resp1)
494
 
        FAKE_RESPONSE_STACK.append(resp2)
495
 
 
496
 
        ... do your testing code here ...
497
 
 
498
 
    """
499
 
 
500
 
    def setUp(self):
501
 
        super(StackResponseAuthTokenMiddlewareTest, self).setUp()
502
 
 
503
 
    def test_fetch_revocation_list_with_expire(self):
504
 
        # first response to revocation list should return 401 Unauthorized
505
 
        # to pretend to be an expired token
506
 
        resp1 = FakeHTTPResponse(200, jsonutils.dumps({
507
 
            'access': {
508
 
                'token': {'id': 'admin_token2'},
509
 
            },
510
 
        }))
511
 
        resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
512
 
        resp3 = FakeHTTPResponse(200, jsonutils.dumps({
513
 
            'access': {
514
 
                'token': {'id': 'admin_token2'},
515
 
            },
516
 
        }))
517
 
        resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
518
 
 
519
 
        # first get_admin_token() call
520
 
        FAKE_RESPONSE_STACK.append(resp1)
521
 
        # request revocation list, get "unauthorized" due to simulated expired
522
 
        # token
523
 
        FAKE_RESPONSE_STACK.append(resp2)
524
 
        # request a new admin_token
525
 
        FAKE_RESPONSE_STACK.append(resp3)
526
 
        # request revocation list, get the revocation list properly
527
 
        FAKE_RESPONSE_STACK.append(resp4)
528
 
 
529
 
        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
530
 
        self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
531
 
 
532
 
 
533
 
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
534
 
    """Auth Token middleware should understand Diablo keystone responses."""
535
 
    def setUp(self):
536
 
        # pre-diablo only had Tenant ID, which was also the Name
537
 
        expected_env = {
538
 
            'HTTP_X_TENANT_ID': 'tenant_id1',
539
 
            'HTTP_X_TENANT_NAME': 'tenant_id1',
540
 
            # now deprecated (diablo-compat)
541
 
            'HTTP_X_TENANT': 'tenant_id1',
542
 
        }
543
 
        super(DiabloAuthTokenMiddlewareTest, self).setUp(
544
 
            expected_env=expected_env)
545
 
 
546
 
    def test_valid_diablo_response(self):
547
 
        req = webob.Request.blank('/')
548
 
        req.headers['X-Auth-Token'] = client_fixtures.VALID_DIABLO_TOKEN
549
 
        self.middleware(req.environ, self.start_fake_response)
550
 
        self.assertEqual(self.response_status, 200)
551
 
        self.assertTrue('keystone.token_info' in req.environ)
552
 
 
553
 
 
554
 
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
555
 
 
556
 
    def setUp(self):
557
 
        super(NoMemcacheAuthToken, self).setUp()
558
 
        self.useFixture(DisableModuleFixture('memcache'))
559
 
 
560
 
    def test_nomemcache(self):
561
 
        conf = {
562
 
            'admin_token': 'admin_token1',
563
 
            'auth_host': 'keystone.example.com',
564
 
            'auth_port': 1234,
565
 
            'memcached_servers': 'localhost:11211',
566
 
        }
567
 
 
568
 
        auth_token.AuthProtocol(FakeApp(), conf)
569
 
 
570
 
    def test_not_use_cache_from_env(self):
571
 
        env = {'swift.cache': 'CACHE_TEST'}
572
 
        conf = {
573
 
            'auth_host': 'keystone.example.com',
574
 
            'auth_port': 1234,
575
 
            'auth_admin_prefix': '/testadmin',
576
 
            'memcached_servers': 'localhost:11211'
577
 
        }
578
 
        self.set_middleware(conf=conf)
579
 
        self.middleware._init_cache(env)
580
 
        self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
581
 
 
582
 
 
583
 
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
584
 
 
585
 
    def test_init_does_not_call_http(self):
586
 
        conf = {
587
 
            'auth_host': 'keystone.example.com',
588
 
            'auth_port': 1234,
589
 
            'revocation_cache_time': 1
590
 
        }
591
 
        self.set_fake_http(RaisingHTTPConnection)
592
 
        self.set_middleware(conf=conf, fake_http=RaisingHTTPConnection)
593
 
 
594
 
    def assert_valid_last_url(self, token_id):
595
 
        # Default version (v2) has id in the token, override this
596
 
        # method for v3 and other versions
597
 
        self.assertEqual("/testadmin/v2.0/tokens/%s" % token_id,
598
 
                         self.middleware.http_client_class.last_requested_url)
599
 
 
600
 
    def assert_valid_request_200(self, token, with_catalog=True):
601
 
        req = webob.Request.blank('/')
602
 
        req.headers['X-Auth-Token'] = token
603
 
        body = self.middleware(req.environ, self.start_fake_response)
604
 
        self.assertEqual(self.response_status, 200)
605
 
        if with_catalog:
606
 
            self.assertTrue(req.headers.get('X-Service-Catalog'))
607
 
        self.assertEqual(body, ['SUCCESS'])
608
 
        self.assertTrue('keystone.token_info' in req.environ)
609
 
 
610
 
    def test_valid_uuid_request(self):
611
 
        self.assert_valid_request_200(self.token_dict['uuid_token_default'])
612
 
        self.assert_valid_last_url(self.token_dict['uuid_token_default'])
613
 
 
614
 
    def test_valid_signed_request(self):
615
 
        self.middleware.http_client_class.last_requested_url = ''
616
 
        self.assert_valid_request_200(
617
 
            self.token_dict['signed_token_scoped'])
618
 
        self.assertEqual(self.middleware.conf['auth_admin_prefix'],
619
 
                         "/testadmin")
620
 
        #ensure that signed requests do not generate HTTP traffic
621
 
        self.assertEqual(
622
 
            '', self.middleware.http_client_class.last_requested_url)
623
 
 
624
 
    def test_revoked_token_receives_401(self):
625
 
        self.middleware.token_revocation_list = self.get_revocation_list_json()
626
 
        req = webob.Request.blank('/')
627
 
        req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
628
 
        self.middleware(req.environ, self.start_fake_response)
629
 
        self.assertEqual(self.response_status, 401)
630
 
 
631
 
    def get_revocation_list_json(self, token_ids=None):
632
 
        if token_ids is None:
633
 
            token_ids = [self.token_dict['revoked_token_hash']]
634
 
        revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
635
 
                                       for x in token_ids]}
636
 
        return jsonutils.dumps(revocation_list)
637
 
 
638
 
    def test_is_signed_token_revoked_returns_false(self):
639
 
        #explicitly setting an empty revocation list here to document intent
640
 
        self.middleware.token_revocation_list = jsonutils.dumps(
641
 
            {"revoked": [], "extra": "success"})
642
 
        result = self.middleware.is_signed_token_revoked(
643
 
            self.token_dict['revoked_token'])
644
 
        self.assertFalse(result)
645
 
 
646
 
    def test_is_signed_token_revoked_returns_true(self):
647
 
        self.middleware.token_revocation_list = self.get_revocation_list_json()
648
 
        result = self.middleware.is_signed_token_revoked(
649
 
            self.token_dict['revoked_token'])
650
 
        self.assertTrue(result)
651
 
 
652
 
    def test_verify_signed_token_raises_exception_for_revoked_token(self):
653
 
        self.middleware.token_revocation_list = self.get_revocation_list_json()
654
 
        self.assertRaises(auth_token.InvalidUserToken,
655
 
                          self.middleware.verify_signed_token,
656
 
                          self.token_dict['revoked_token'])
657
 
 
658
 
    def test_verify_signed_token_succeeds_for_unrevoked_token(self):
659
 
        self.middleware.token_revocation_list = self.get_revocation_list_json()
660
 
        self.middleware.verify_signed_token(
661
 
            self.token_dict['signed_token_scoped'])
662
 
 
663
 
    def test_verify_signing_dir_create_while_missing(self):
664
 
        tmp_name = uuid.uuid4().hex
665
 
        test_parent_signing_dir = "/tmp/%s" % tmp_name
666
 
        self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
667
 
        self.middleware.signing_cert_file_name = "%s/test.pem" %\
668
 
            self.middleware.signing_dirname
669
 
        self.middleware.verify_signing_dir()
670
 
        # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
671
 
        self.assertTrue(os.path.isdir(self.middleware.signing_dirname))
672
 
        self.assertTrue(os.access(self.middleware.signing_dirname, os.W_OK))
673
 
        self.assertEqual(os.stat(self.middleware.signing_dirname).st_uid,
674
 
                         os.getuid())
675
 
        self.assertEqual(
676
 
            stat.S_IMODE(os.stat(self.middleware.signing_dirname).st_mode),
677
 
            stat.S_IRWXU)
678
 
        shutil.rmtree(test_parent_signing_dir)
679
 
 
680
 
    def test_cert_file_missing(self):
681
 
        self.assertFalse(self.middleware.cert_file_missing(
682
 
                         "openstack: /tmp/haystack: No such file or directory",
683
 
                         "/tmp/needle"))
684
 
        self.assertTrue(self.middleware.cert_file_missing(
685
 
                        "openstack: /not/exist: No such file or directory",
686
 
                        "/not/exist"))
687
 
 
688
 
    def test_get_token_revocation_list_fetched_time_returns_min(self):
689
 
        self.middleware.token_revocation_list_fetched_time = None
690
 
        self.middleware.revoked_file_name = ''
691
 
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
692
 
                         datetime.datetime.min)
693
 
 
694
 
    def test_get_token_revocation_list_fetched_time_returns_mtime(self):
695
 
        self.middleware.token_revocation_list_fetched_time = None
696
 
        mtime = os.path.getmtime(self.middleware.revoked_file_name)
697
 
        fetched_time = datetime.datetime.fromtimestamp(mtime)
698
 
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
699
 
                         fetched_time)
700
 
 
701
 
    def test_get_token_revocation_list_fetched_time_returns_value(self):
702
 
        expected = self.middleware._token_revocation_list_fetched_time
703
 
        self.assertEqual(self.middleware.token_revocation_list_fetched_time,
704
 
                         expected)
705
 
 
706
 
    def test_get_revocation_list_returns_fetched_list(self):
707
 
        # auth_token uses v2 to fetch this, so don't allow the v3
708
 
        # tests to override the fake http connection
709
 
        self.set_fake_http(FakeHTTPConnection)
710
 
        self.middleware.token_revocation_list_fetched_time = None
711
 
        os.remove(self.middleware.revoked_file_name)
712
 
        self.assertEqual(self.middleware.token_revocation_list,
713
 
                         client_fixtures.REVOCATION_LIST)
714
 
 
715
 
    def test_get_revocation_list_returns_current_list_from_memory(self):
716
 
        self.assertEqual(self.middleware.token_revocation_list,
717
 
                         self.middleware._token_revocation_list)
718
 
 
719
 
    def test_get_revocation_list_returns_current_list_from_disk(self):
720
 
        in_memory_list = self.middleware.token_revocation_list
721
 
        self.middleware._token_revocation_list = None
722
 
        self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
723
 
 
724
 
    def test_invalid_revocation_list_raises_service_error(self):
725
 
        globals()['SIGNED_REVOCATION_LIST'] = "{}"
726
 
        self.assertRaises(auth_token.ServiceError,
727
 
                          self.middleware.fetch_revocation_list)
728
 
 
729
 
    def test_fetch_revocation_list(self):
730
 
        # auth_token uses v2 to fetch this, so don't allow the v3
731
 
        # tests to override the fake http connection
732
 
        self.set_fake_http(FakeHTTPConnection)
733
 
        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
734
 
        self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
735
 
 
736
 
    def test_request_invalid_uuid_token(self):
737
 
        req = webob.Request.blank('/')
738
 
        req.headers['X-Auth-Token'] = 'invalid-token'
739
 
        self.middleware(req.environ, self.start_fake_response)
740
 
        self.assertEqual(self.response_status, 401)
741
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
742
 
                         "Keystone uri='https://keystone.example.com:1234'")
743
 
 
744
 
    def test_request_invalid_signed_token(self):
745
 
        req = webob.Request.blank('/')
746
 
        req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
747
 
        self.middleware(req.environ, self.start_fake_response)
748
 
        self.assertEqual(self.response_status, 401)
749
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
750
 
                         "Keystone uri='https://keystone.example.com:1234'")
751
 
 
752
 
    def test_request_no_token(self):
753
 
        req = webob.Request.blank('/')
754
 
        self.middleware(req.environ, self.start_fake_response)
755
 
        self.assertEqual(self.response_status, 401)
756
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
757
 
                         "Keystone uri='https://keystone.example.com:1234'")
758
 
 
759
 
    def test_request_no_token_log_message(self):
760
 
        class FakeLog(object):
761
 
            def __init__(self):
762
 
                self.msg = None
763
 
                self.debugmsg = None
764
 
 
765
 
            def warn(self, msg=None, *args, **kwargs):
766
 
                self.msg = msg
767
 
 
768
 
            def debug(self, msg=None, *args, **kwargs):
769
 
                self.debugmsg = msg
770
 
 
771
 
        self.middleware.LOG = FakeLog()
772
 
        self.middleware.delay_auth_decision = False
773
 
        self.assertRaises(auth_token.InvalidUserToken,
774
 
                          self.middleware._get_user_token_from_header, {})
775
 
        self.assertIsNotNone(self.middleware.LOG.msg)
776
 
        self.assertIsNotNone(self.middleware.LOG.debugmsg)
777
 
 
778
 
    def test_request_no_token_http(self):
779
 
        req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
780
 
        self.set_middleware()
781
 
        body = self.middleware(req.environ, self.start_fake_response)
782
 
        self.assertEqual(self.response_status, 401)
783
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
784
 
                         "Keystone uri='https://keystone.example.com:1234'")
785
 
        self.assertEqual(body, [''])
786
 
 
787
 
    def test_request_blank_token(self):
788
 
        req = webob.Request.blank('/')
789
 
        req.headers['X-Auth-Token'] = ''
790
 
        self.middleware(req.environ, self.start_fake_response)
791
 
        self.assertEqual(self.response_status, 401)
792
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
793
 
                         "Keystone uri='https://keystone.example.com:1234'")
794
 
 
795
 
    def _get_cached_token(self, token):
796
 
        token_id = cms.cms_hash_token(token)
797
 
        # NOTE(vish): example tokens are expired so skip the expiration check.
798
 
        return self.middleware._cache_get(token_id, ignore_expires=True)
799
 
 
800
 
    def test_memcache(self):
801
 
        req = webob.Request.blank('/')
802
 
        token = self.token_dict['signed_token_scoped']
803
 
        req.headers['X-Auth-Token'] = token
804
 
        self.middleware(req.environ, self.start_fake_response)
805
 
        self.assertNotEqual(self._get_cached_token(token), None)
806
 
 
807
 
    def test_expired(self):
808
 
        req = webob.Request.blank('/')
809
 
        token = self.token_dict['signed_token_scoped_expired']
810
 
        req.headers['X-Auth-Token'] = token
811
 
        self.middleware(req.environ, self.start_fake_response)
812
 
        self.assertEqual(self.response_status, 401)
813
 
 
814
 
    def test_memcache_set_invalid(self):
815
 
        req = webob.Request.blank('/')
816
 
        token = 'invalid-token'
817
 
        req.headers['X-Auth-Token'] = token
818
 
        self.middleware(req.environ, self.start_fake_response)
819
 
        self.assertRaises(auth_token.InvalidUserToken,
820
 
                          self._get_cached_token, token)
821
 
 
822
 
    def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
823
 
        token_cache_time = 10
824
 
        conf = {
825
 
            'token_cache_time': token_cache_time,
826
 
            'signing_dir': client_fixtures.CERTDIR,
827
 
        }
828
 
        conf.update(extra_conf)
829
 
        self.set_middleware(conf=conf)
830
 
        req = webob.Request.blank('/')
831
 
        token = self.token_dict['signed_token_scoped']
832
 
        req.headers['X-Auth-Token'] = token
833
 
        req.environ.update(extra_environ)
834
 
        try:
835
 
            now = datetime.datetime.utcnow()
836
 
            timeutils.set_time_override(now)
837
 
            self.middleware(req.environ, self.start_fake_response)
838
 
            self.assertNotEqual(self._get_cached_token(token), None)
839
 
            expired = now + datetime.timedelta(seconds=token_cache_time)
840
 
            timeutils.set_time_override(expired)
841
 
            self.assertEqual(self._get_cached_token(token), None)
842
 
        finally:
843
 
            timeutils.clear_time_override()
844
 
 
845
 
    def test_old_swift_memcache_set_expired(self):
846
 
        extra_conf = {'cache': 'swift.cache'}
847
 
        extra_environ = {'swift.cache': FakeSwiftOldMemcacheClient()}
848
 
        self.test_memcache_set_expired(extra_conf, extra_environ)
849
 
 
850
 
    def test_swift_memcache_set_expired(self):
851
 
        extra_conf = {'cache': 'swift.cache'}
852
 
        extra_environ = {'swift.cache': memorycache.Client()}
853
 
        self.test_memcache_set_expired(extra_conf, extra_environ)
854
 
 
855
 
    def test_use_cache_from_env(self):
856
 
        env = {'swift.cache': 'CACHE_TEST'}
857
 
        conf = {
858
 
            'auth_host': 'keystone.example.com',
859
 
            'auth_port': 1234,
860
 
            'auth_admin_prefix': '/testadmin',
861
 
            'cache': 'swift.cache',
862
 
            'memcached_servers': ['localhost:11211']
863
 
        }
864
 
        self.set_middleware(conf=conf)
865
 
        self.middleware._init_cache(env)
866
 
        self.assertEqual(self.middleware._cache, 'CACHE_TEST')
867
 
 
868
 
    def test_will_expire_soon(self):
869
 
        tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
870
 
            seconds=10)
871
 
        self.assertTrue(auth_token.will_expire_soon(tenseconds))
872
 
        fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
873
 
            seconds=40)
874
 
        self.assertFalse(auth_token.will_expire_soon(fortyseconds))
875
 
 
876
 
    def test_encrypt_cache_data(self):
877
 
        conf = {
878
 
            'auth_host': 'keystone.example.com',
879
 
            'auth_port': 1234,
880
 
            'auth_admin_prefix': '/testadmin',
881
 
            'memcached_servers': ['localhost:11211'],
882
 
            'memcache_security_strategy': 'encrypt',
883
 
            'memcache_secret_key': 'mysecret'
884
 
        }
885
 
        self.set_middleware(conf=conf)
886
 
        token = 'my_token'
887
 
        data = ('this_data', 10e100)
888
 
        self.middleware._init_cache({})
889
 
        self.middleware._cache_store(token, data)
890
 
        self.assertEqual(self.middleware._cache_get(token), data[0])
891
 
 
892
 
    def test_sign_cache_data(self):
893
 
        conf = {
894
 
            'auth_host': 'keystone.example.com',
895
 
            'auth_port': 1234,
896
 
            'auth_admin_prefix': '/testadmin',
897
 
            'memcached_servers': ['localhost:11211'],
898
 
            'memcache_security_strategy': 'mac',
899
 
            'memcache_secret_key': 'mysecret'
900
 
        }
901
 
        self.set_middleware(conf=conf)
902
 
        token = 'my_token'
903
 
        data = ('this_data', 10e100)
904
 
        self.middleware._init_cache({})
905
 
        self.middleware._cache_store(token, data)
906
 
        self.assertEqual(self.middleware._cache_get(token), data[0])
907
 
 
908
 
    def test_no_memcache_protection(self):
909
 
        conf = {
910
 
            'auth_host': 'keystone.example.com',
911
 
            'auth_port': 1234,
912
 
            'auth_admin_prefix': '/testadmin',
913
 
            'memcached_servers': ['localhost:11211'],
914
 
            'memcache_secret_key': 'mysecret'
915
 
        }
916
 
        self.set_middleware(conf=conf)
917
 
        token = 'my_token'
918
 
        data = ('this_data', 10e100)
919
 
        self.middleware._init_cache({})
920
 
        self.middleware._cache_store(token, data)
921
 
        self.assertEqual(self.middleware._cache_get(token), data[0])
922
 
 
923
 
    def test_assert_valid_memcache_protection_config(self):
924
 
        # test missing memcache_secret_key
925
 
        conf = {
926
 
            'auth_host': 'keystone.example.com',
927
 
            'auth_port': 1234,
928
 
            'auth_admin_prefix': '/testadmin',
929
 
            'memcached_servers': ['localhost:11211'],
930
 
            'memcache_security_strategy': 'Encrypt'
931
 
        }
932
 
        self.assertRaises(Exception, self.set_middleware, conf)
933
 
        # test invalue memcache_security_strategy
934
 
        conf = {
935
 
            'auth_host': 'keystone.example.com',
936
 
            'auth_port': 1234,
937
 
            'auth_admin_prefix': '/testadmin',
938
 
            'memcached_servers': ['localhost:11211'],
939
 
            'memcache_security_strategy': 'whatever'
940
 
        }
941
 
        self.assertRaises(Exception, self.set_middleware, conf)
942
 
        # test missing memcache_secret_key
943
 
        conf = {
944
 
            'auth_host': 'keystone.example.com',
945
 
            'auth_port': 1234,
946
 
            'auth_admin_prefix': '/testadmin',
947
 
            'memcached_servers': ['localhost:11211'],
948
 
            'memcache_security_strategy': 'mac'
949
 
        }
950
 
        self.assertRaises(Exception, self.set_middleware, conf)
951
 
        conf = {
952
 
            'auth_host': 'keystone.example.com',
953
 
            'auth_port': 1234,
954
 
            'auth_admin_prefix': '/testadmin',
955
 
            'memcached_servers': ['localhost:11211'],
956
 
            'memcache_security_strategy': 'Encrypt',
957
 
            'memcache_secret_key': ''
958
 
        }
959
 
        self.assertRaises(Exception, self.set_middleware, conf)
960
 
        conf = {
961
 
            'auth_host': 'keystone.example.com',
962
 
            'auth_port': 1234,
963
 
            'auth_admin_prefix': '/testadmin',
964
 
            'memcached_servers': ['localhost:11211'],
965
 
            'memcache_security_strategy': 'mAc',
966
 
            'memcache_secret_key': ''
967
 
        }
968
 
        self.assertRaises(Exception, self.set_middleware, conf)
969
 
 
970
 
    def test_config_revocation_cache_timeout(self):
971
 
        conf = {
972
 
            'auth_host': 'keystone.example.com',
973
 
            'auth_port': 1234,
974
 
            'auth_admin_prefix': '/testadmin',
975
 
            'revocation_cache_time': 24
976
 
        }
977
 
        middleware = auth_token.AuthProtocol(self.fake_app, conf)
978
 
        self.assertEquals(middleware.token_revocation_list_cache_timeout,
979
 
                          datetime.timedelta(seconds=24))
980
 
 
981
 
    def test_http_error_not_cached_token(self):
982
 
        """Test to don't cache token as invalid on network errors.
983
 
 
984
 
        We use UUID tokens since they are the easiest one to reach
985
 
        get_http_connection.
986
 
        """
987
 
        req = webob.Request.blank('/')
988
 
        token = self.token_dict['uuid_token_default']
989
 
        req.headers['X-Auth-Token'] = token
990
 
        self.set_fake_http(RaisingHTTPNetworkError)
991
 
        self.middleware.http_request_max_retries = 0
992
 
        self.middleware(req.environ, self.start_fake_response)
993
 
        self.assertEqual(self._get_cached_token(token), None)
994
 
 
995
 
 
996
 
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
997
 
    def setUp(self):
998
 
        super(CertDownloadMiddlewareTest, self).setUp()
999
 
        self.base_dir = tempfile.mkdtemp()
1000
 
        self.cert_dir = os.path.join(self.base_dir, 'certs')
1001
 
        os.mkdir(self.cert_dir)
1002
 
 
1003
 
        self.conf = {
1004
 
            'auth_host': 'keystone.example.com',
1005
 
            'auth_port': 1234,
1006
 
            'auth_protocol': 'http',
1007
 
            'auth_admin_prefix': '/testadmin',
1008
 
            'signing_dir': self.cert_dir,
1009
 
        }
1010
 
 
1011
 
    def tearDown(self):
1012
 
        shutil.rmtree(self.base_dir)
1013
 
        super(CertDownloadMiddlewareTest, self).tearDown()
1014
 
 
1015
 
    # Usually we supply a signed_dir with pre-installed certificates,
1016
 
    # so invocation of /usr/bin/openssl succeeds. This time we give it
1017
 
    # an empty directory, so it fails.
1018
 
    def test_request_no_token_dummy(self):
1019
 
        self.set_middleware(fake_http=self.fake_http, conf=self.conf)
1020
 
        self.assertRaises(cms.subprocess.CalledProcessError,
1021
 
                          self.middleware.verify_signed_token,
1022
 
                          self.token_dict['signed_token_scoped'])
1023
 
 
1024
 
    def test_fetch_signing_cert(self):
1025
 
        self.set_middleware(fake_http=CertificateHTTPConnection,
1026
 
                            conf=self.conf)
1027
 
 
1028
 
        self.middleware.fetch_signing_cert()
1029
 
 
1030
 
        with open(self.middleware.signing_cert_file_name, 'r') as f:
1031
 
            self.assertEqual(f.read(),
1032
 
                             CertificateHTTPConnection.signing_cert_data)
1033
 
 
1034
 
        self.assertEqual('/testadmin/v2.0/certificates/signing',
1035
 
                         self.middleware.http_client_class.last_requested_url)
1036
 
 
1037
 
    def test_fetch_signing_ca(self):
1038
 
        self.set_middleware(fake_http=CertificateHTTPConnection,
1039
 
                            conf=self.conf)
1040
 
 
1041
 
        self.middleware.fetch_ca_cert()
1042
 
 
1043
 
        with open(self.middleware.ca_file_name, 'r') as f:
1044
 
            self.assertEqual(f.read(), CertificateHTTPConnection.ca_cert_data)
1045
 
 
1046
 
        self.assertEqual('/testadmin/v2.0/certificates/ca',
1047
 
                         self.middleware.http_client_class.last_requested_url)
1048
 
 
1049
 
    def test_prefix_trailing_slash(self):
1050
 
        self.conf['auth_admin_prefix'] = '/newadmin/'
1051
 
        self.set_middleware(fake_http=CertificateHTTPConnection,
1052
 
                            conf=self.conf)
1053
 
 
1054
 
        # the requests will return a 404, but it doesn't matter
1055
 
 
1056
 
        self.middleware.fetch_ca_cert()
1057
 
 
1058
 
        self.assertEqual('/newadmin/v2.0/certificates/ca',
1059
 
                         self.middleware.http_client_class.last_requested_url)
1060
 
 
1061
 
        self.middleware.fetch_signing_cert()
1062
 
 
1063
 
        self.assertEqual('/newadmin/v2.0/certificates/signing',
1064
 
                         self.middleware.http_client_class.last_requested_url)
1065
 
 
1066
 
    def test_without_prefix(self):
1067
 
        self.conf['auth_admin_prefix'] = ''
1068
 
 
1069
 
        self.set_middleware(fake_http=CertificateHTTPConnection,
1070
 
                            conf=self.conf)
1071
 
 
1072
 
        # the requests will return a 404, but it doesn't matter
1073
 
 
1074
 
        self.middleware.fetch_ca_cert()
1075
 
 
1076
 
        self.assertEqual('/v2.0/certificates/ca',
1077
 
                         self.middleware.http_client_class.last_requested_url)
1078
 
 
1079
 
        self.middleware.fetch_signing_cert()
1080
 
 
1081
 
        self.assertEqual('/v2.0/certificates/signing',
1082
 
                         self.middleware.http_client_class.last_requested_url)
1083
 
 
1084
 
 
1085
 
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
1086
 
    """v2 token specific tests.
1087
 
 
1088
 
    There are some differences between how the auth-token middleware handles
1089
 
    v2 and v3 tokens over and above the token formats, namely:
1090
 
 
1091
 
    - A v3 keystone server will auto scope a token to a user's default project
1092
 
      if no scope is specified. A v2 server assumes that the auth-token
1093
 
      middleware will do that.
1094
 
    - A v2 keystone server may issue a token without a catalog, even with a
1095
 
      tenant
1096
 
 
1097
 
    The tests below were originally part of the generic AuthTokenMiddlewareTest
1098
 
    class, but now, since they really are v2 specifc, they are included here.
1099
 
 
1100
 
    """
1101
 
    def assert_unscoped_default_tenant_auto_scopes(self, token):
1102
 
        """Unscoped v2 requests with a default tenant should "auto-scope."
1103
 
 
1104
 
        The implied scope is the user's tenant ID.
1105
 
 
1106
 
        """
1107
 
        req = webob.Request.blank('/')
1108
 
        req.headers['X-Auth-Token'] = token
1109
 
        body = self.middleware(req.environ, self.start_fake_response)
1110
 
        self.assertEqual(self.response_status, 200)
1111
 
        self.assertEqual(body, ['SUCCESS'])
1112
 
        self.assertTrue('keystone.token_info' in req.environ)
1113
 
 
1114
 
    def test_default_tenant_uuid_token(self):
1115
 
        self.assert_unscoped_default_tenant_auto_scopes(
1116
 
            client_fixtures.UUID_TOKEN_DEFAULT)
1117
 
 
1118
 
    def test_default_tenant_signed_token(self):
1119
 
        self.assert_unscoped_default_tenant_auto_scopes(
1120
 
            client_fixtures.SIGNED_TOKEN_SCOPED)
1121
 
 
1122
 
    def assert_unscoped_token_receives_401(self, token):
1123
 
        """Unscoped requests with no default tenant ID should be rejected."""
1124
 
        req = webob.Request.blank('/')
1125
 
        req.headers['X-Auth-Token'] = token
1126
 
        self.middleware(req.environ, self.start_fake_response)
1127
 
        self.assertEqual(self.response_status, 401)
1128
 
        self.assertEqual(self.response_headers['WWW-Authenticate'],
1129
 
                         "Keystone uri='https://keystone.example.com:1234'")
1130
 
 
1131
 
    def test_unscoped_uuid_token_receives_401(self):
1132
 
        self.assert_unscoped_token_receives_401(
1133
 
            client_fixtures.UUID_TOKEN_UNSCOPED)
1134
 
 
1135
 
    def test_unscoped_pki_token_receives_401(self):
1136
 
        self.assert_unscoped_token_receives_401(
1137
 
            client_fixtures.SIGNED_TOKEN_UNSCOPED)
1138
 
 
1139
 
    def test_request_prevent_service_catalog_injection(self):
1140
 
        req = webob.Request.blank('/')
1141
 
        req.headers['X-Service-Catalog'] = '[]'
1142
 
        req.headers['X-Auth-Token'] = \
1143
 
            client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG
1144
 
        body = self.middleware(req.environ, self.start_fake_response)
1145
 
        self.assertEqual(self.response_status, 200)
1146
 
        self.assertFalse(req.headers.get('X-Service-Catalog'))
1147
 
        self.assertEqual(body, ['SUCCESS'])
1148
 
 
1149
 
    def test_valid_uuid_request_forced_to_2_0(self):
1150
 
        """Test forcing auth_token to use lower api version.
1151
 
 
1152
 
        By installing the v3 http hander, auth_token will be get
1153
 
        a version list that looks like a v3 server - from which it
1154
 
        would normally chose v3.0 as the auth version.  However, here
1155
 
        we specify v2.0 in the configuration - which should force
1156
 
        auth_token to use that version instead.
1157
 
 
1158
 
        """
1159
 
        conf = {
1160
 
            'auth_host': 'keystone.example.com',
1161
 
            'auth_port': 1234,
1162
 
            'auth_admin_prefix': '/testadmin',
1163
 
            'signing_dir': client_fixtures.CERTDIR,
1164
 
            'auth_version': 'v2.0'
1165
 
        }
1166
 
        self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
1167
 
        # This tests will only work is auth_token has chosen to use the
1168
 
        # lower, v2, api version
1169
 
        req = webob.Request.blank('/')
1170
 
        req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT
1171
 
        body = self.middleware(req.environ, self.start_fake_response)
1172
 
        self.assertEqual(self.response_status, 200)
1173
 
        self.assertEqual("/testadmin/v2.0/tokens/%s" %
1174
 
                         client_fixtures.UUID_TOKEN_DEFAULT,
1175
 
                         v3FakeHTTPConnection.last_requested_url)
1176
 
 
1177
 
    def test_invalid_auth_version_request(self):
1178
 
        conf = {
1179
 
            'auth_host': 'keystone.example.com',
1180
 
            'auth_port': 1234,
1181
 
            'auth_admin_prefix': '/testadmin',
1182
 
            'signing_dir': client_fixtures.CERTDIR,
1183
 
            'auth_version': 'v1.0'      # v1.0 is no longer supported
1184
 
        }
1185
 
        self.assertRaises(Exception, self.set_middleware, conf)
1186
 
 
1187
 
 
1188
 
class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
1189
 
    """Test auth_token middleware with v3 tokens.
1190
 
 
1191
 
    Re-execute the AuthTokenMiddlewareTest class tests, but with the
1192
 
    the auth_token middleware configured to expect v3 tokens back from
1193
 
    a keystone server.
1194
 
 
1195
 
    This is done by configuring the AuthTokenMiddlewareTest class via
1196
 
    its Setup(), passing in v3 style data that will then be used by
1197
 
    the tests themselves.  This approach has been used to ensure we
1198
 
    really are running the same tests for both v2 and v3 tokens.
1199
 
 
1200
 
    There a few additional specific test for v3 only:
1201
 
 
1202
 
    - We allow an unscoped token to be validated (as unscoped), where
1203
 
      as for v2 tokens, the auth_token middleware is expected to try and
1204
 
      auto-scope it (and fail if there is no default tenant)
1205
 
    - Domain scoped tokens
1206
 
 
1207
 
    Since we don't specify an auth version for auth_token to use, by
1208
 
    definition we are thefore implicitely testing that it will use
1209
 
    the highest available auth version, i.e. v3.0
1210
 
 
1211
 
    """
1212
 
    def setUp(self):
1213
 
        token_dict = {
1214
 
            'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT,
1215
 
            'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1216
 
            'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED,
1217
 
            'signed_token_scoped_expired':
1218
 
            client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
1219
 
            'revoked_token': client_fixtures.REVOKED_v3_TOKEN,
1220
 
            'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH
1221
 
        }
1222
 
        super(v3AuthTokenMiddlewareTest, self).setUp(
1223
 
            auth_version='v3.0',
1224
 
            fake_app=v3FakeApp,
1225
 
            fake_http=v3FakeHTTPConnection,
1226
 
            token_dict=token_dict)
1227
 
 
1228
 
    def assert_valid_last_url(self, token_id):
1229
 
        # Token ID is not part of the url in v3, so override
1230
 
        # this assert test in the base class
1231
 
        self.assertEqual('/testadmin/v3/auth/tokens',
1232
 
                         v3FakeHTTPConnection.last_requested_url)
1233
 
 
1234
 
    def test_valid_unscoped_uuid_request(self):
1235
 
        # Remove items that won't be in an unscoped token
1236
 
        delta_expected_env = {
1237
 
            'HTTP_X_PROJECT_ID': None,
1238
 
            'HTTP_X_PROJECT_NAME': None,
1239
 
            'HTTP_X_PROJECT_DOMAIN_ID': None,
1240
 
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
1241
 
            'HTTP_X_TENANT_ID': None,
1242
 
            'HTTP_X_TENANT_NAME': None,
1243
 
            'HTTP_X_ROLES': '',
1244
 
            'HTTP_X_TENANT': None,
1245
 
            'HTTP_X_ROLE': '',
1246
 
        }
1247
 
        self.set_middleware(expected_env=delta_expected_env)
1248
 
        self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
1249
 
                                      with_catalog=False)
1250
 
        self.assertEqual('/testadmin/v3/auth/tokens',
1251
 
                         v3FakeHTTPConnection.last_requested_url)
1252
 
 
1253
 
    def test_domain_scoped_uuid_request(self):
1254
 
        # Modify items compared to default token for a domain scope
1255
 
        delta_expected_env = {
1256
 
            'HTTP_X_DOMAIN_ID': 'domain_id1',
1257
 
            'HTTP_X_DOMAIN_NAME': 'domain_name1',
1258
 
            'HTTP_X_PROJECT_ID': None,
1259
 
            'HTTP_X_PROJECT_NAME': None,
1260
 
            'HTTP_X_PROJECT_DOMAIN_ID': None,
1261
 
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
1262
 
            'HTTP_X_TENANT_ID': None,
1263
 
            'HTTP_X_TENANT_NAME': None,
1264
 
            'HTTP_X_TENANT': None
1265
 
        }
1266
 
        self.set_middleware(expected_env=delta_expected_env)
1267
 
        self.assert_valid_request_200(
1268
 
            client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED)
1269
 
        self.assertEqual('/testadmin/v3/auth/tokens',
1270
 
                         v3FakeHTTPConnection.last_requested_url)
1271
 
 
1272
 
 
1273
 
class TokenEncodingTest(testtools.TestCase):
1274
 
    def test_unquoted_token(self):
1275
 
        self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
1276
 
 
1277
 
    def test_quoted_token(self):
1278
 
        self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))