~ubuntu-branches/ubuntu/precise/keystone/precise-proposed

« back to all changes in this revision

Viewing changes to tests/test_content_types.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Joseph Heck, Adam Gandelman, Dave Walker, Andrew Glen-Young
  • Date: 2012-03-02 09:55:24 UTC
  • mfrom: (1.1.11)
  • Revision ID: package-import@ubuntu.com-20120302095524-koae5li138f7uesh
Tags: 2012.1~e4-0ubuntu1
[ Chuck Short ]
* New upstream release. 
* debian/keystone.upstart: Update for ksl.
* debian/control: Add python-keystoneclient as dependency.
* debian/control: Fix typo.
* debian/keystone.postinst: Update due to redux branch change.
* debian/keystone.templates, debian/keystone.preinst, debian/kestone.postinst,
  debian/keystone.config, debian/README.Debian: Make keystone installation 
  less interactive. (LP: #931236)
* debian/keystone.postinst: Don't create users or run a database sync
  since its not working correctly.
* debian/control: Dropped python-coverage and python-nosexcover.
* debian/changelog: Fixed changelog.
* debian/keystone.templates: Set it to false.
* debian/control: Fix lintian warnings.
* debian/patches/keystone-auth.patch: Backport auth token improvements,
  this can be dropped in the next snapshot.
* debian/control: Add python-memcache as a build dependency.
* debian/keystone-doc.docs: Fix keystone doc builds.
* debian/rules: Temporarily disable doc install.
* debian/control: Add python-ldap and python-lxml.

[ Joseph Heck ]
* debian/control: Dropped python-cli.

[ Adam Gandelman ]
* debian/control: Alphabetize python depends 
* debian/control: Add python-{eventlet, greenlet, passlib} to keystone
  depends
* debian/control: Add python-lxml to python-keystone Depends
* Drop 0001-Fix-keystone-all-failure-to-start.patch
* debian/logging.conf: Temporarily use old logging.conf until upstream
  ships something usable
* debain/patches/sql_connection.patch: Switch backends to use SQL backends
* debian/keystone.preinst: Create directories
* debian/keystone.postinst: Remove create_users stuff, add call to 'db_sync'
  on install

[ Dave Walker ]
* debian/patches/sql_connection.patch: Refreshed and reintroduced DEP-3
  headers.
* debian/control: Added Vcs-Bzr field.

[ Andrew Glen-Young ]
* debian/keystone.preinst: Set the primary group to keystone. (LP: #941905)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
import httplib
 
4
import json
 
5
 
 
6
from lxml import etree
 
7
import nose.exc
 
8
 
 
9
from keystone import test
 
10
from keystone.common import serializer
 
11
 
 
12
import default_fixtures
 
13
 
 
14
 
 
15
class RestfulTestCase(test.TestCase):
 
16
    """Performs restful tests against the WSGI app over HTTP.
 
17
 
 
18
    This class launches public & admin WSGI servers for every test, which can
 
19
    be accessed by calling ``public_request()`` or ``admin_request()``,
 
20
    respectfully.
 
21
 
 
22
    ``restful_request()`` and ``request()`` methods are also exposed if you
 
23
    need to bypass restful conventions or access HTTP details in your test
 
24
    implementation.
 
25
 
 
26
    Three new asserts are provided:
 
27
 
 
28
    * ``assertResponseSuccessful``: called automatically for every request
 
29
        unless an ``expected_status`` is provided
 
30
    * ``assertResponseStatus``: called instead of ``assertResponseSuccessful``,
 
31
        if an ``expected_status`` is provided
 
32
    * ``assertValidResponseHeaders``: validates that the response headers
 
33
        appear as expected
 
34
 
 
35
    Requests are automatically serialized according to the defined
 
36
    ``content_type``. Responses are automatically deserialized as well, and
 
37
    available in the ``response.body`` attribute. The original body content is
 
38
    available in the ``response.raw`` attribute.
 
39
 
 
40
    """
 
41
 
 
42
    # default content type to test
 
43
    content_type = 'json'
 
44
 
 
45
    def setUp(self):
 
46
        super(RestfulTestCase, self).setUp()
 
47
 
 
48
        self.load_backends()
 
49
        self.load_fixtures(default_fixtures)
 
50
 
 
51
        self.public_server = self.serveapp('keystone', name='main')
 
52
        self.admin_server = self.serveapp('keystone', name='admin')
 
53
 
 
54
        # TODO(termie): is_admin is being deprecated once the policy stuff
 
55
        #               is all working
 
56
        # TODO(termie): add an admin user to the fixtures and use that user
 
57
        # override the fixtures, for now
 
58
        self.metadata_foobar = self.identity_api.update_metadata(
 
59
                self.user_foo['id'],
 
60
                self.tenant_bar['id'],
 
61
                dict(roles=['keystone_admin'], is_admin='1'))
 
62
 
 
63
    def tearDown(self):
 
64
        """Kill running servers and release references to avoid leaks."""
 
65
        self.public_server.kill()
 
66
        self.admin_server.kill()
 
67
        self.public_server = None
 
68
        self.admin_server = None
 
69
        super(RestfulTestCase, self).tearDown()
 
70
 
 
71
    def request(self, host='0.0.0.0', port=80, method='GET', path='/',
 
72
                headers=None, body=None, expected_status=None):
 
73
        """Perform request and fetch httplib.HTTPResponse from the server."""
 
74
 
 
75
        # Initialize headers dictionary
 
76
        headers = {} if not headers else headers
 
77
 
 
78
        connection = httplib.HTTPConnection(host, port, timeout=10)
 
79
 
 
80
        # Perform the request
 
81
        connection.request(method, path, body, headers)
 
82
 
 
83
        # Retrieve the response so we can close the connection
 
84
        response = connection.getresponse()
 
85
 
 
86
        response.body = response.read()
 
87
 
 
88
        # Close the connection
 
89
        connection.close()
 
90
 
 
91
        # Automatically assert HTTP status code
 
92
        if expected_status:
 
93
            self.assertResponseStatus(response, expected_status)
 
94
        else:
 
95
            self.assertResponseSuccessful(response)
 
96
        self.assertValidResponseHeaders(response)
 
97
 
 
98
        # Contains the response headers, body, etc
 
99
        return response
 
100
 
 
101
    def assertResponseSuccessful(self, response):
 
102
        """Asserts that a status code lies inside the 2xx range.
 
103
 
 
104
        :param response: :py:class:`httplib.HTTPResponse` to be
 
105
          verified to have a status code between 200 and 299.
 
106
 
 
107
        example::
 
108
 
 
109
            >>> self.assertResponseSuccessful(response, 203)
 
110
        """
 
111
        self.assertTrue(response.status >= 200 and response.status <= 299,
 
112
            'Status code %d is outside of the expected range (2xx)\n\n%s' %
 
113
            (response.status, response.body))
 
114
 
 
115
    def assertResponseStatus(self, response, expected_status):
 
116
        """Asserts a specific status code on the response.
 
117
 
 
118
        :param response: :py:class:`httplib.HTTPResponse`
 
119
        :param assert_status: The specific ``status`` result expected
 
120
 
 
121
        example::
 
122
 
 
123
            >>> self.assertResponseStatus(response, 203)
 
124
        """
 
125
        self.assertEqual(response.status, expected_status,
 
126
            'Status code %s is not %s, as expected)\n\n%s' %
 
127
            (response.status, expected_status, response.body))
 
128
 
 
129
    def assertValidResponseHeaders(self, response):
 
130
        """Ensures that response headers appear as expected."""
 
131
        self.assertIn('X-Auth-Token', response.getheader('Vary'))
 
132
 
 
133
    def _to_content_type(self, body, headers, content_type=None):
 
134
        """Attempt to encode JSON and XML automatically."""
 
135
        content_type = content_type or self.content_type
 
136
 
 
137
        if content_type == 'json':
 
138
            headers['Accept'] = 'application/json'
 
139
            if body:
 
140
                headers['Content-Type'] = 'application/json'
 
141
                return json.dumps(body)
 
142
        elif content_type == 'xml':
 
143
            headers['Accept'] = 'application/xml'
 
144
            if body:
 
145
                headers['Content-Type'] = 'application/xml'
 
146
                return serializer.to_xml(body)
 
147
 
 
148
    def _from_content_type(self, response, content_type=None):
 
149
        """Attempt to decode JSON and XML automatically, if detected."""
 
150
        content_type = content_type or self.content_type
 
151
 
 
152
        # make the original response body available, for convenience
 
153
        response.raw = response.body
 
154
 
 
155
        if response.body is not None and response.body.strip():
 
156
            # if a body is provided, a Content-Type is also expected
 
157
            header = response.getheader('Content-Type', None)
 
158
            self.assertIn(self.content_type, header)
 
159
 
 
160
            if self.content_type == 'json':
 
161
                response.body = json.loads(response.body)
 
162
            elif self.content_type == 'xml':
 
163
                response.body = etree.fromstring(response.body)
 
164
 
 
165
    def restful_request(self, headers=None, body=None, token=None, **kwargs):
 
166
        """Serializes/deserializes json/xml as request/response body.
 
167
 
 
168
        .. WARNING::
 
169
 
 
170
            * Existing Accept header will be overwritten.
 
171
            * Existing Content-Type header will be overwritten.
 
172
 
 
173
        """
 
174
        # Initialize headers dictionary
 
175
        headers = {} if not headers else headers
 
176
 
 
177
        if token is not None:
 
178
            headers['X-Auth-Token'] = token
 
179
 
 
180
        body = self._to_content_type(body, headers)
 
181
 
 
182
        # Perform the HTTP request/response
 
183
        response = self.request(headers=headers, body=body, **kwargs)
 
184
 
 
185
        self._from_content_type(response)
 
186
 
 
187
        # we can save some code & improve coverage by always doing this
 
188
        if response.status >= 400:
 
189
            self.assertValidErrorResponse(response)
 
190
 
 
191
        # Contains the decoded response.body
 
192
        return response
 
193
 
 
194
    def _get_port(self, server):
 
195
        return server.socket_info['socket'][1]
 
196
 
 
197
    def _public_port(self):
 
198
        return self._get_port(self.public_server)
 
199
 
 
200
    def _admin_port(self):
 
201
        return self._get_port(self.admin_server)
 
202
 
 
203
    def public_request(self, port=None, **kwargs):
 
204
        kwargs['port'] = port or self._public_port()
 
205
        return self.restful_request(**kwargs)
 
206
 
 
207
    def admin_request(self, port=None, **kwargs):
 
208
        kwargs['port'] = port or self._admin_port()
 
209
        return self.restful_request(**kwargs)
 
210
 
 
211
    def get_scoped_token(self):
 
212
        """Convenience method so that we can test authenticated requests."""
 
213
        r = self.public_request(method='POST', path='/v2.0/tokens', body={
 
214
                'auth': {
 
215
                    'passwordCredentials': {
 
216
                        'username': self.user_foo['name'],
 
217
                        'password': self.user_foo['password'],
 
218
                    },
 
219
                    'tenantId': self.tenant_bar['id'],
 
220
                },
 
221
            })
 
222
        return self._get_token_id(r)
 
223
 
 
224
    def _get_token_id(self, r):
 
225
        """Helper method to return a token ID from a response.
 
226
 
 
227
        This needs to be overridden by child classes for on their content type.
 
228
 
 
229
        """
 
230
        raise NotImplementedError()
 
231
 
 
232
 
 
233
class CoreApiTests(object):
 
234
    def assertValidError(self, error):
 
235
        """Applicable to XML and JSON."""
 
236
        try:
 
237
            print error.attrib
 
238
        except:
 
239
            pass
 
240
        self.assertIsNotNone(error.get('code'))
 
241
        self.assertIsNotNone(error.get('title'))
 
242
        self.assertIsNotNone(error.get('message'))
 
243
 
 
244
    def assertValidVersion(self, version):
 
245
        """Applicable to XML and JSON.
 
246
 
 
247
        However, navigating links and media-types differs between content
 
248
        types so they need to be validated seperately.
 
249
 
 
250
        """
 
251
        self.assertIsNotNone(version)
 
252
        self.assertIsNotNone(version.get('id'))
 
253
        self.assertIsNotNone(version.get('status'))
 
254
        self.assertIsNotNone(version.get('updated'))
 
255
 
 
256
    def assertValidExtension(self, extension):
 
257
        """Applicable to XML and JSON.
 
258
 
 
259
        However, navigating extension links differs between content types.
 
260
        They need to be validated seperately with assertValidExtensionLink.
 
261
 
 
262
        """
 
263
        self.assertIsNotNone(extension)
 
264
        self.assertIsNotNone(extension.get('name'))
 
265
        self.assertIsNotNone(extension.get('namespace'))
 
266
        self.assertIsNotNone(extension.get('alias'))
 
267
        self.assertIsNotNone(extension.get('updated'))
 
268
 
 
269
    def assertValidExtensionLink(self, link):
 
270
        """Applicable to XML and JSON."""
 
271
        self.assertIsNotNone(link.get('rel'))
 
272
        self.assertIsNotNone(link.get('type'))
 
273
        self.assertIsNotNone(link.get('href'))
 
274
 
 
275
    def assertValidTenant(self, tenant):
 
276
        """Applicable to XML and JSON."""
 
277
        self.assertIsNotNone(tenant.get('id'))
 
278
        self.assertIsNotNone(tenant.get('name'))
 
279
 
 
280
    def assertValidUser(self, user):
 
281
        """Applicable to XML and JSON."""
 
282
        self.assertIsNotNone(user.get('id'))
 
283
        self.assertIsNotNone(user.get('name'))
 
284
 
 
285
    def assertValidRole(self, tenant):
 
286
        """Applicable to XML and JSON."""
 
287
        self.assertIsNotNone(tenant.get('id'))
 
288
        self.assertIsNotNone(tenant.get('name'))
 
289
 
 
290
    def test_public_multiple_choice(self):
 
291
        r = self.public_request(path='/', expected_status=300)
 
292
        self.assertValidMultipleChoiceResponse(r)
 
293
 
 
294
    def test_admin_multiple_choice(self):
 
295
        r = self.admin_request(path='/', expected_status=300)
 
296
        self.assertValidMultipleChoiceResponse(r)
 
297
 
 
298
    def test_public_version(self):
 
299
        r = self.public_request(path='/v2.0/')
 
300
        self.assertValidVersionResponse(r)
 
301
 
 
302
    def test_admin_version(self):
 
303
        r = self.admin_request(path='/v2.0/')
 
304
        self.assertValidVersionResponse(r)
 
305
 
 
306
    def test_public_extensions(self):
 
307
        self.public_request(path='/v2.0/extensions',)
 
308
 
 
309
        # TODO(dolph): can't test this without any public extensions defined
 
310
        # self.assertValidExtensionListResponse(r)
 
311
 
 
312
    def test_admin_extensions(self):
 
313
        r = self.admin_request(path='/v2.0/extensions',)
 
314
        self.assertValidExtensionListResponse(r)
 
315
 
 
316
    def test_admin_extensions_404(self):
 
317
        self.admin_request(path='/v2.0/extensions/invalid-extension',
 
318
                           expected_status=404)
 
319
 
 
320
    def test_public_osksadm_extension_404(self):
 
321
        self.public_request(path='/v2.0/extensions/OS-KSADM',
 
322
                            expected_status=404)
 
323
 
 
324
    def test_admin_osksadm_extension(self):
 
325
        r = self.admin_request(path='/v2.0/extensions/OS-KSADM')
 
326
        self.assertValidExtensionResponse(r)
 
327
 
 
328
    def test_authenticate(self):
 
329
        r = self.public_request(method='POST', path='/v2.0/tokens', body={
 
330
                'auth': {
 
331
                    'passwordCredentials': {
 
332
                        'username': self.user_foo['name'],
 
333
                        'password': self.user_foo['password'],
 
334
                    },
 
335
                    'tenantId': self.tenant_bar['id'],
 
336
                },
 
337
            },
 
338
            # TODO(dolph): creating a token should result in a 201 Created
 
339
            expected_status=200)
 
340
        self.assertValidAuthenticationResponse(r)
 
341
 
 
342
    def test_get_tenants_for_token(self):
 
343
        r = self.public_request(path='/v2.0/tenants',
 
344
            token=self.get_scoped_token())
 
345
        self.assertValidTenantListResponse(r)
 
346
 
 
347
    def test_validate_token(self):
 
348
        token = self.get_scoped_token()
 
349
        r = self.admin_request(path='/v2.0/tokens/%(token_id)s' % {
 
350
                'token_id': token,
 
351
            },
 
352
            token=token)
 
353
        self.assertValidAuthenticationResponse(r)
 
354
 
 
355
    def test_validate_token_head(self):
 
356
        """The same call as above, except using HEAD.
 
357
 
 
358
        There's no response to validate here, but this is included for the
 
359
        sake of completely covering the core API.
 
360
 
 
361
        """
 
362
        token = self.get_scoped_token()
 
363
        self.admin_request(method='HEAD', path='/v2.0/tokens/%(token_id)s' % {
 
364
                'token_id': token,
 
365
            },
 
366
            token=token,
 
367
            expected_status=204)
 
368
 
 
369
    def test_endpoints(self):
 
370
        raise nose.exc.SkipTest('Blocked by bug 933555')
 
371
 
 
372
        token = self.get_scoped_token()
 
373
        r = self.admin_request(path='/v2.0/tokens/%(token_id)s/endpoints' % {
 
374
                'token_id': token,
 
375
            },
 
376
            token=token)
 
377
        self.assertValidTokenCatalogResponse(r)
 
378
 
 
379
    def test_get_tenant(self):
 
380
        token = self.get_scoped_token()
 
381
        r = self.admin_request(path='/v2.0/tenants/%(tenant_id)s' % {
 
382
                'tenant_id': self.tenant_bar['id'],
 
383
            },
 
384
            token=token)
 
385
        self.assertValidTenantResponse(r)
 
386
 
 
387
    def test_get_user_roles(self):
 
388
        raise nose.exc.SkipTest('Blocked by bug 933565')
 
389
 
 
390
        token = self.get_scoped_token()
 
391
        r = self.admin_request(path='/v2.0/users/%(user_id)s/roles' % {
 
392
                'user_id': self.user_foo['id'],
 
393
            },
 
394
            token=token)
 
395
        self.assertValidRoleListResponse(r)
 
396
 
 
397
    def test_get_user_roles_with_tenant(self):
 
398
        token = self.get_scoped_token()
 
399
        r = self.admin_request(
 
400
            path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
 
401
                'tenant_id': self.tenant_bar['id'],
 
402
                'user_id': self.user_foo['id'],
 
403
            },
 
404
            token=token)
 
405
        self.assertValidRoleListResponse(r)
 
406
 
 
407
    def test_get_user(self):
 
408
        token = self.get_scoped_token()
 
409
        r = self.admin_request(path='/v2.0/users/%(user_id)s' % {
 
410
                'user_id': self.user_foo['id'],
 
411
            },
 
412
            token=token)
 
413
        self.assertValidUserResponse(r)
 
414
 
 
415
    def test_error_response(self):
 
416
        """This triggers assertValidErrorResponse by convention."""
 
417
        self.public_request(path='/v2.0/tenants', expected_status=401)
 
418
 
 
419
 
 
420
class JsonTestCase(RestfulTestCase, CoreApiTests):
 
421
    content_type = 'json'
 
422
 
 
423
    def _get_token_id(self, r):
 
424
        """Applicable only to JSON."""
 
425
        return r.body['access']['token']['id']
 
426
 
 
427
    def assertValidErrorResponse(self, r):
 
428
        self.assertIsNotNone(r.body.get('error'))
 
429
        self.assertValidError(r.body['error'])
 
430
        self.assertEqual(r.body['error']['code'], r.status)
 
431
 
 
432
    def assertValidExtension(self, extension):
 
433
        super(JsonTestCase, self).assertValidExtension(extension)
 
434
 
 
435
        self.assertIsNotNone(extension.get('description'))
 
436
        self.assertIsNotNone(extension.get('links'))
 
437
        self.assertTrue(len(extension.get('links')))
 
438
        for link in extension.get('links'):
 
439
            self.assertValidExtensionLink(link)
 
440
 
 
441
    def assertValidExtensionListResponse(self, r):
 
442
        self.assertIsNotNone(r.body.get('extensions'))
 
443
        self.assertIsNotNone(r.body['extensions'].get('values'))
 
444
        self.assertTrue(len(r.body['extensions'].get('values')))
 
445
        for extension in r.body['extensions']['values']:
 
446
            self.assertValidExtension(extension)
 
447
 
 
448
    def assertValidExtensionResponse(self, r):
 
449
        self.assertValidExtension(r.body.get('extension'))
 
450
 
 
451
    def assertValidAuthenticationResponse(self, r):
 
452
        self.assertIsNotNone(r.body.get('access'))
 
453
        self.assertIsNotNone(r.body['access'].get('token'))
 
454
        self.assertIsNotNone(r.body['access'].get('user'))
 
455
 
 
456
        # validate token
 
457
        self.assertIsNotNone(r.body['access']['token'].get('id'))
 
458
        self.assertIsNotNone(r.body['access']['token'].get('expires'))
 
459
        tenant = r.body['access']['token'].get('tenant')
 
460
        if tenant is not None:
 
461
            # validate tenant
 
462
            self.assertIsNotNone(tenant.get('id'))
 
463
            self.assertIsNotNone(tenant.get('name'))
 
464
 
 
465
        # validate user
 
466
        self.assertIsNotNone(r.body['access']['user'].get('id'))
 
467
        self.assertIsNotNone(r.body['access']['user'].get('name'))
 
468
 
 
469
        # validate service catalog
 
470
        if r.body['access'].get('serviceCatalog') is not None:
 
471
            self.assertTrue(len(r.body['access']['serviceCatalog']))
 
472
            for service in r.body['access']['serviceCatalog']:
 
473
                # validate service
 
474
                self.assertIsNotNone(service.get('name'))
 
475
                self.assertIsNotNone(service.get('type'))
 
476
 
 
477
                # services contain at least one endpoint
 
478
                self.assertIsNotNone(service.get('endpoints'))
 
479
                self.assertTrue(len(service['endpoints']))
 
480
                for endpoint in service['endpoints']:
 
481
                    # validate service endpoint
 
482
                    self.assertIsNotNone(endpoint.get('publicURL'))
 
483
 
 
484
    def assertValidTenantListResponse(self, r):
 
485
        self.assertIsNotNone(r.body.get('tenants'))
 
486
        self.assertTrue(len(r.body['tenants']))
 
487
        for tenant in r.body['tenants']:
 
488
            self.assertValidTenant(tenant)
 
489
            self.assertIsNotNone(tenant.get('enabled'))
 
490
            self.assertIn(tenant.get('enabled'), [True, False])
 
491
 
 
492
    def assertValidUserResponse(self, r):
 
493
        self.assertIsNotNone(r.body.get('user'))
 
494
        self.assertValidUser(r.body['user'])
 
495
 
 
496
    def assertValidTenantResponse(self, r):
 
497
        self.assertIsNotNone(r.body.get('tenant'))
 
498
        self.assertValidTenant(r.body['tenant'])
 
499
 
 
500
    def assertValidRoleListResponse(self, r):
 
501
        self.assertIsNotNone(r.body.get('roles'))
 
502
        self.assertTrue(len(r.body['roles']))
 
503
        for role in r.body['roles']:
 
504
            self.assertValidRole(role)
 
505
 
 
506
    def assertValidVersion(self, version):
 
507
        super(JsonTestCase, self).assertValidVersion(version)
 
508
 
 
509
        self.assertIsNotNone(version.get('links'))
 
510
        self.assertTrue(len(version.get('links')))
 
511
        for link in version.get('links'):
 
512
            self.assertIsNotNone(link.get('rel'))
 
513
            self.assertIsNotNone(link.get('href'))
 
514
 
 
515
        self.assertIsNotNone(version.get('media-types'))
 
516
        self.assertTrue(len(version.get('media-types')))
 
517
        for media in version.get('media-types'):
 
518
            self.assertIsNotNone(media.get('base'))
 
519
            self.assertIsNotNone(media.get('type'))
 
520
 
 
521
    def assertValidMultipleChoiceResponse(self, r):
 
522
        self.assertIsNotNone(r.body.get('versions'))
 
523
        self.assertIsNotNone(r.body['versions'].get('values'))
 
524
        self.assertTrue(len(r.body['versions']['values']))
 
525
        for version in r.body['versions']['values']:
 
526
            self.assertValidVersion(version)
 
527
 
 
528
    def assertValidVersionResponse(self, r):
 
529
        self.assertValidVersion(r.body.get('version'))
 
530
 
 
531
 
 
532
class XmlTestCase(RestfulTestCase, CoreApiTests):
 
533
    xmlns = 'http://docs.openstack.org/identity/api/v2.0'
 
534
    content_type = 'xml'
 
535
 
 
536
    def _get_token_id(self, r):
 
537
        return r.body.find(self._tag('token')).get('id')
 
538
 
 
539
    def _tag(self, tag_name, xmlns=None):
 
540
        """Helper method to build an namespaced element name."""
 
541
        return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
 
542
 
 
543
    def assertValidErrorResponse(self, r):
 
544
        xml = r.body
 
545
        self.assertEqual(xml.tag, self._tag('error'))
 
546
 
 
547
        self.assertValidError(xml)
 
548
        self.assertEqual(xml.get('code'), str(r.status))
 
549
 
 
550
    def assertValidExtension(self, extension):
 
551
        super(XmlTestCase, self).assertValidExtension(extension)
 
552
 
 
553
        self.assertIsNotNone(extension.find(self._tag('description')))
 
554
        self.assertTrue(extension.find(self._tag('description')).text)
 
555
        self.assertTrue(len(extension.findall(self._tag('link'))))
 
556
        for link in extension.findall(self._tag('link')):
 
557
            self.assertValidExtensionLink(link)
 
558
 
 
559
    def assertValidExtensionListResponse(self, r):
 
560
        xml = r.body
 
561
        self.assertEqual(xml.tag, self._tag('extensions'))
 
562
 
 
563
        self.assertTrue(len(xml.findall(self._tag('extension'))))
 
564
        for extension in xml.findall(self._tag('extension')):
 
565
            self.assertValidExtension(extension)
 
566
 
 
567
    def assertValidExtensionResponse(self, r):
 
568
        xml = r.body
 
569
        self.assertEqual(xml.tag, self._tag('extension'))
 
570
 
 
571
        self.assertValidExtension(xml)
 
572
 
 
573
    def assertValidVersion(self, version):
 
574
        super(XmlTestCase, self).assertValidVersion(version)
 
575
 
 
576
        self.assertTrue(len(version.findall(self._tag('link'))))
 
577
        for link in version.findall(self._tag('link')):
 
578
            self.assertIsNotNone(link.get('rel'))
 
579
            self.assertIsNotNone(link.get('href'))
 
580
 
 
581
        media_types = version.find(self._tag('media-types'))
 
582
        self.assertIsNotNone(media_types)
 
583
        self.assertTrue(len(media_types.findall(self._tag('media-type'))))
 
584
        for media in media_types.findall(self._tag('media-type')):
 
585
            self.assertIsNotNone(media.get('base'))
 
586
            self.assertIsNotNone(media.get('type'))
 
587
 
 
588
    def assertValidMultipleChoiceResponse(self, r):
 
589
        xml = r.body
 
590
        self.assertEqual(xml.tag, self._tag('versions'))
 
591
 
 
592
        self.assertTrue(len(xml.findall(self._tag('version'))))
 
593
        for version in xml.findall(self._tag('version')):
 
594
            self.assertValidVersion(version)
 
595
 
 
596
    def assertValidVersionResponse(self, r):
 
597
        xml = r.body
 
598
        self.assertEqual(xml.tag, self._tag('version'))
 
599
 
 
600
        self.assertValidVersion(xml)
 
601
 
 
602
    def assertValidTokenCatalogResponse(self, r):
 
603
        xml = r.body
 
604
        self.assertEqual(xml.tag, self._tag('endpoints'))
 
605
 
 
606
        self.assertTrue(len(xml.findall(self._tag('endpoint'))))
 
607
        for endpoint in xml.findall(self._tag('endpoint')):
 
608
            self.assertIsNotNone(endpoint.get('publicUrl'))
 
609
 
 
610
    def assertValidTenantResponse(self, r):
 
611
        xml = r.body
 
612
        self.assertEqual(xml.tag, self._tag('tenant'))
 
613
 
 
614
        self.assertValidTenant(xml)
 
615
 
 
616
    def assertValidUserResponse(self, r):
 
617
        xml = r.body
 
618
        self.assertEqual(xml.tag, self._tag('user'))
 
619
 
 
620
        self.assertValidUser(xml)
 
621
 
 
622
    def assertValidRoleListResponse(self, r):
 
623
        xml = r.body
 
624
        self.assertEqual(xml.tag, self._tag('roles'))
 
625
 
 
626
        self.assertTrue(len(r.body.findall(self._tag('role'))))
 
627
        for role in r.body.findall(self._tag('role')):
 
628
            self.assertValidRole(role)
 
629
 
 
630
    def assertValidAuthenticationResponse(self, r):
 
631
        xml = r.body
 
632
        self.assertEqual(xml.tag, self._tag('access'))
 
633
 
 
634
        # validate token
 
635
        token = xml.find(self._tag('token'))
 
636
        self.assertIsNotNone(token)
 
637
        self.assertIsNotNone(token.get('id'))
 
638
        self.assertIsNotNone(token.get('expires'))
 
639
        tenant = token.find(self._tag('tenant'))
 
640
        if tenant is not None:
 
641
            # validate tenant
 
642
            self.assertValidTenant(tenant)
 
643
            self.assertIn(tenant.get('enabled'), ['true', 'false'])
 
644
 
 
645
        user = xml.find(self._tag('user'))
 
646
        self.assertIsNotNone(user)
 
647
        self.assertIsNotNone(user.get('id'))
 
648
        self.assertIsNotNone(user.get('name'))
 
649
 
 
650
        serviceCatalog = xml.find(self._tag('serviceCatalog'))
 
651
        if serviceCatalog is not None:
 
652
            self.assertTrue(len(serviceCatalog.findall(self._tag('service'))))
 
653
            for service in serviceCatalog.findall(self._tag('service')):
 
654
                # validate service
 
655
                self.assertIsNotNone(service.get('name'))
 
656
                self.assertIsNotNone(service.get('type'))
 
657
 
 
658
                # services contain at least one endpoint
 
659
                self.assertTrue(len(service))
 
660
                for endpoint in service.findall(self._tag('endpoint')):
 
661
                    # validate service endpoint
 
662
                    self.assertIsNotNone(endpoint.get('publicURL'))
 
663
 
 
664
    def assertValidTenantListResponse(self, r):
 
665
        xml = r.body
 
666
        self.assertEqual(xml.tag, self._tag('tenants'))
 
667
 
 
668
        self.assertTrue(len(r.body))
 
669
        for tenant in r.body.findall(self._tag('tenant')):
 
670
            self.assertValidTenant(tenant)
 
671
            self.assertIn(tenant.get('enabled'), ['true', 'false'])