1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
9
from keystone import test
10
from keystone.common import serializer
12
import default_fixtures
15
class RestfulTestCase(test.TestCase):
16
"""Performs restful tests against the WSGI app over HTTP.
18
This class launches public & admin WSGI servers for every test, which can
be accessed by calling ``public_request()`` or ``admin_request()``,
respectively.

``restful_request()`` and ``request()`` methods are also exposed if you
need to bypass restful conventions or access HTTP details in your test.

Three new asserts are provided:

* ``assertResponseSuccessful``: called automatically for every request
  unless an ``expected_status`` is provided
* ``assertResponseStatus``: called instead of ``assertResponseSuccessful``,
  if an ``expected_status`` is provided
* ``assertValidResponseHeaders``: validates that the response headers
  appear as expected

Requests are automatically serialized according to the defined
``content_type``. Responses are automatically deserialized as well, and
available in the ``response.body`` attribute. The original body content is
available in the ``response.raw`` attribute.
"""
42
# default content type to test
46
def setUp(self):
    """Load the default fixtures and start the public and admin servers.

    After the servers are running, the fixture metadata is overridden
    through ``identity_api.update_metadata`` with the ``keystone_admin``
    role and ``is_admin='1'``; the result is kept in ``metadata_foobar``.
    """
    super(RestfulTestCase, self).setUp()

    self.load_fixtures(default_fixtures)

    self.public_server = self.serveapp('keystone', name='main')
    self.admin_server = self.serveapp('keystone', name='admin')

    # TODO(termie): is_admin is being deprecated once the policy stuff
    # TODO(termie): add an admin user to the fixtures and use that user
    # override the fixtures, for now
    # NOTE(review): the attribute name suggests a (user foo, tenant bar)
    # pair, but only the tenant id is passed here -- confirm against the
    # update_metadata() signature.
    self.metadata_foobar = self.identity_api.update_metadata(
        self.tenant_bar['id'],
        dict(roles=['keystone_admin'], is_admin='1'))
64
"""Kill running servers and release references to avoid leaks."""
65
self.public_server.kill()
66
self.admin_server.kill()
67
self.public_server = None
68
self.admin_server = None
69
super(RestfulTestCase, self).tearDown()
71
def request(self, host='0.0.0.0', port=80, method='GET', path='/',
            headers=None, body=None, expected_status=None):
    """Perform request and fetch httplib.HTTPResponse from the server.

    :param host: host name or address to connect to
    :param port: TCP port to connect to
    :param method: HTTP method to use
    :param path: request path
    :param headers: optional dict of request headers
    :param body: optional, already serialized, request body
    :param expected_status: when given, the response must carry exactly
      this status code; otherwise any 2xx status is accepted
    :returns: the :py:class:`httplib.HTTPResponse`, with the payload that
      was read from the socket stored in ``response.body``
    """
    # Initialize headers dictionary
    headers = {} if not headers else headers

    connection = httplib.HTTPConnection(host, port, timeout=10)
    try:
        connection.request(method, path, body, headers)

        # Retrieve the response so we can close the connection
        response = connection.getresponse()
        response.body = response.read()
    finally:
        # Close the connection, even if the exchange failed half way
        connection.close()

    # Automatically assert HTTP status code
    if expected_status is not None:
        self.assertResponseStatus(response, expected_status)
    else:
        self.assertResponseSuccessful(response)
    self.assertValidResponseHeaders(response)

    # Contains the response headers, body, etc
    return response
101
def assertResponseSuccessful(self, response):
    """Asserts that a status code lies inside the 2xx range.

    :param response: :py:class:`httplib.HTTPResponse` to be
      verified to have a status code between 200 and 299.

    example::

        >>> self.assertResponseSuccessful(response)
    """
    is_2xx = 200 <= response.status <= 299
    failure = ('Status code %d is outside of the expected range (2xx)\n\n%s' %
               (response.status, response.body))
    self.assertTrue(is_2xx, failure)
115
def assertResponseStatus(self, response, expected_status):
    """Asserts a specific status code on the response.

    :param response: :py:class:`httplib.HTTPResponse`
    :param expected_status: The specific ``status`` result expected

    example::

        >>> self.assertResponseStatus(response, 203)
    """
    # The failure message carries the body so a failing test shows what
    # the server actually answered.
    self.assertEqual(
        response.status, expected_status,
        'Status code %s is not %s, as expected\n\n%s' %
        (response.status, expected_status, response.body))
129
def assertValidResponseHeaders(self, response):
    """Ensures that response headers appear as expected.

    The ``Vary`` header must mention ``X-Auth-Token``.
    """
    # Default to '' so that a missing header is reported as a regular
    # assertion failure instead of a TypeError on ``in None``.
    self.assertIn('X-Auth-Token', response.getheader('Vary', ''))
133
def _to_content_type(self, body, headers, content_type=None):
134
"""Attempt to encode JSON and XML automatically."""
135
content_type = content_type or self.content_type
137
if content_type == 'json':
138
headers['Accept'] = 'application/json'
140
headers['Content-Type'] = 'application/json'
141
return json.dumps(body)
142
elif content_type == 'xml':
143
headers['Accept'] = 'application/xml'
145
headers['Content-Type'] = 'application/xml'
146
return serializer.to_xml(body)
148
def _from_content_type(self, response, content_type=None):
149
"""Attempt to decode JSON and XML automatically, if detected."""
150
content_type = content_type or self.content_type
152
# make the original response body available, for convenience
153
response.raw = response.body
155
if response.body is not None and response.body.strip():
156
# if a body is provided, a Content-Type is also expected
157
header = response.getheader('Content-Type', None)
158
self.assertIn(self.content_type, header)
160
if self.content_type == 'json':
161
response.body = json.loads(response.body)
162
elif self.content_type == 'xml':
163
response.body = etree.fromstring(response.body)
165
def restful_request(self, headers=None, body=None, token=None, **kwargs):
    """Serializes/deserializes json/xml as request/response body.

    * Existing Accept header will be overwritten.
    * Existing Content-Type header will be overwritten.

    :param headers: optional dict of extra request headers
    :param body: python object to serialize as the request body
    :param token: when given, sent as the ``X-Auth-Token`` header
    :param kwargs: passed through to ``request()``
    :returns: the response, with ``response.body`` decoded
    """
    # Work on a copy so the caller's dictionary is never modified
    headers = dict(headers) if headers else {}

    if token is not None:
        headers['X-Auth-Token'] = token

    body = self._to_content_type(body, headers)

    # Perform the HTTP request/response
    response = self.request(headers=headers, body=body, **kwargs)

    self._from_content_type(response)

    # we can save some code & improve coverage by always doing this
    if response.status >= 400:
        self.assertValidErrorResponse(response)

    # Contains the decoded response.body
    return response
194
def _get_port(self, server):
195
return server.socket_info['socket'][1]
197
def _public_port(self):
198
return self._get_port(self.public_server)
200
def _admin_port(self):
201
return self._get_port(self.admin_server)
203
def public_request(self, port=None, **kwargs):
    """Issue a ``restful_request`` against the public server.

    :param port: overrides the public server's own port when truthy
    :param kwargs: passed through to ``restful_request()``
    """
    if not port:
        port = self._public_port()
    kwargs['port'] = port
    return self.restful_request(**kwargs)
207
def admin_request(self, port=None, **kwargs):
    """Issue a ``restful_request`` against the admin server.

    :param port: overrides the admin server's own port when truthy
    :param kwargs: passed through to ``restful_request()``
    """
    if not port:
        port = self._admin_port()
    kwargs['port'] = port
    return self.restful_request(**kwargs)
211
def get_scoped_token(self):
    """Convenience method so that we can test authenticated requests.

    Authenticates the fixture user ``user_foo`` against tenant
    ``tenant_bar`` on the public API.

    :returns: whatever ``_get_token_id()`` extracts from the response
    """
    # NOTE(review): the enclosing 'auth' key is reconstructed from the
    # v2.0 token request format -- confirm against the API reference.
    r = self.public_request(method='POST', path='/v2.0/tokens', body={
        'auth': {
            'passwordCredentials': {
                'username': self.user_foo['name'],
                'password': self.user_foo['password'],
            },
            'tenantId': self.tenant_bar['id'],
        },
    })
    return self._get_token_id(r)
224
def _get_token_id(self, r):
225
"""Helper method to return a token ID from a response.
227
This needs to be overridden by child classes for on their content type.
230
raise NotImplementedError()
233
class CoreApiTests(object):
234
def assertValidError(self, error):
    """Applicable to XML and JSON.

    An error must expose ``code``, ``title`` and ``message`` via ``get``.
    """
    for field in ('code', 'title', 'message'):
        self.assertIsNotNone(error.get(field))
244
def assertValidVersion(self, version):
    """Applicable to XML and JSON.

    However, navigating links and media-types differs between content
    types so they need to be validated separately.
    """
    self.assertIsNotNone(version)
    for field in ('id', 'status', 'updated'):
        self.assertIsNotNone(version.get(field))
256
def assertValidExtension(self, extension):
    """Applicable to XML and JSON.

    However, navigating extension links differs between content types.
    They need to be validated separately with assertValidExtensionLink.
    """
    self.assertIsNotNone(extension)
    for field in ('name', 'namespace', 'alias', 'updated'):
        self.assertIsNotNone(extension.get(field))
269
def assertValidExtensionLink(self, link):
    """Applicable to XML and JSON.

    A link must expose ``rel``, ``type`` and ``href`` via ``get``.
    """
    for field in ('rel', 'type', 'href'):
        self.assertIsNotNone(link.get(field))
275
def assertValidTenant(self, tenant):
    """Applicable to XML and JSON.

    A tenant must expose ``id`` and ``name`` via ``get``.
    """
    for field in ('id', 'name'):
        self.assertIsNotNone(tenant.get(field))
280
def assertValidUser(self, user):
    """Applicable to XML and JSON.

    A user must expose ``id`` and ``name`` via ``get``.
    """
    for field in ('id', 'name'):
        self.assertIsNotNone(user.get(field))
285
def assertValidRole(self, tenant):
    """Applicable to XML and JSON.

    A role (passed in as ``tenant``) must expose ``id`` and ``name`` via
    ``get``.
    """
    for field in ('id', 'name'):
        self.assertIsNotNone(tenant.get(field))
290
def test_public_multiple_choice(self):
    """The public root path answers 300 with a valid version listing."""
    response = self.public_request(path='/', expected_status=300)
    self.assertValidMultipleChoiceResponse(response)
294
def test_admin_multiple_choice(self):
    """The admin root path answers 300 with a valid version listing."""
    response = self.admin_request(path='/', expected_status=300)
    self.assertValidMultipleChoiceResponse(response)
298
def test_public_version(self):
    """``/v2.0/`` on the public API returns a valid version document."""
    response = self.public_request(path='/v2.0/')
    self.assertValidVersionResponse(response)
302
def test_admin_version(self):
    """``/v2.0/`` on the admin API returns a valid version document."""
    response = self.admin_request(path='/v2.0/')
    self.assertValidVersionResponse(response)
306
def test_public_extensions(self):
    """The public extension listing can be fetched successfully."""
    extensions_path = '/v2.0/extensions'
    self.public_request(path=extensions_path)

    # TODO(dolph): can't test this without any public extensions defined
    # self.assertValidExtensionListResponse(r)
312
def test_admin_extensions(self):
    """The admin extension listing is a valid extension list."""
    response = self.admin_request(path='/v2.0/extensions')
    self.assertValidExtensionListResponse(response)
316
def test_admin_extensions_404(self):
    """Requesting an unknown extension on the admin API expects a 404."""
    # The call was left unterminated; the expected status is taken from
    # the test name.
    self.admin_request(path='/v2.0/extensions/invalid-extension',
                       expected_status=404)
320
def test_public_osksadm_extension_404(self):
    """Requesting OS-KSADM on the public API expects a 404."""
    # The call was left unterminated; the expected status is taken from
    # the test name.
    self.public_request(path='/v2.0/extensions/OS-KSADM',
                        expected_status=404)
324
def test_admin_osksadm_extension(self):
    """OS-KSADM on the admin API returns a valid extension document."""
    response = self.admin_request(path='/v2.0/extensions/OS-KSADM')
    self.assertValidExtensionResponse(response)
328
def test_authenticate(self):
    """Password authentication scoped to a tenant returns a valid token."""
    # NOTE(review): the enclosing 'auth' key and the explicit 200 are
    # reconstructed (v2.0 token request format, and the TODO below) --
    # confirm.
    r = self.public_request(method='POST', path='/v2.0/tokens', body={
            'auth': {
                'passwordCredentials': {
                    'username': self.user_foo['name'],
                    'password': self.user_foo['password'],
                },
                'tenantId': self.tenant_bar['id'],
            },
        },
        # TODO(dolph): creating a token should result in a 201 Created
        expected_status=200)
    self.assertValidAuthenticationResponse(r)
342
def test_get_tenants_for_token(self):
    """A scoped token can list its tenants on the public API."""
    scoped_token = self.get_scoped_token()
    response = self.public_request(path='/v2.0/tenants', token=scoped_token)
    self.assertValidTenantListResponse(response)
347
def test_validate_token(self):
    """Validating a scoped token on the admin API returns its details."""
    token = self.get_scoped_token()
    # NOTE(review): the dict entry and ``token=token`` are reconstructed;
    # the format string requires ``token_id`` -- confirm the auth token.
    r = self.admin_request(path='/v2.0/tokens/%(token_id)s' % {
            'token_id': token,
        },
        token=token)
    self.assertValidAuthenticationResponse(r)
355
def test_validate_token_head(self):
    """The same call as above, except using HEAD.

    There's no response to validate here, but this is included for the
    sake of completely covering the core API.
    """
    token = self.get_scoped_token()
    # NOTE(review): the dict entry and ``token=token`` are reconstructed.
    # No explicit status is given, so any 2xx answer is accepted.
    self.admin_request(method='HEAD', path='/v2.0/tokens/%(token_id)s' % {
            'token_id': token,
        },
        token=token)
369
def test_endpoints(self):
    """Fetch the endpoints of a token (currently skipped)."""
    raise nose.exc.SkipTest('Blocked by bug 933555')

    # Unreachable until the skip above is removed; kept as the intended
    # test body.
    # NOTE(review): the dict entry and ``token=token`` are reconstructed.
    token = self.get_scoped_token()
    r = self.admin_request(path='/v2.0/tokens/%(token_id)s/endpoints' % {
            'token_id': token,
        },
        token=token)
    self.assertValidTokenCatalogResponse(r)
379
def test_get_tenant(self):
    """Fetching ``tenant_bar`` by id returns a valid tenant document."""
    token = self.get_scoped_token()
    # NOTE(review): ``token=token`` is reconstructed; ``token`` was
    # otherwise unused in this test.
    r = self.admin_request(path='/v2.0/tenants/%(tenant_id)s' % {
            'tenant_id': self.tenant_bar['id'],
        },
        token=token)
    self.assertValidTenantResponse(r)
387
def test_get_user_roles(self):
    """List the roles of ``user_foo`` (currently skipped)."""
    raise nose.exc.SkipTest('Blocked by bug 933565')

    # Unreachable until the skip above is removed; kept as the intended
    # test body.
    # NOTE(review): ``token=token`` is reconstructed.
    token = self.get_scoped_token()
    r = self.admin_request(path='/v2.0/users/%(user_id)s/roles' % {
            'user_id': self.user_foo['id'],
        },
        token=token)
    self.assertValidRoleListResponse(r)
397
def test_get_user_roles_with_tenant(self):
    """Listing ``user_foo``'s roles on ``tenant_bar`` returns valid roles."""
    token = self.get_scoped_token()
    # NOTE(review): ``token=token`` is reconstructed; ``token`` was
    # otherwise unused in this test.
    r = self.admin_request(
        path='/v2.0/tenants/%(tenant_id)s/users/%(user_id)s/roles' % {
            'tenant_id': self.tenant_bar['id'],
            'user_id': self.user_foo['id'],
        },
        token=token)
    self.assertValidRoleListResponse(r)
407
def test_get_user(self):
    """Fetching ``user_foo`` by id returns a valid user document."""
    token = self.get_scoped_token()
    # NOTE(review): ``token=token`` is reconstructed; ``token`` was
    # otherwise unused in this test.
    r = self.admin_request(path='/v2.0/users/%(user_id)s' % {
            'user_id': self.user_foo['id'],
        },
        token=token)
    self.assertValidUserResponse(r)
415
def test_error_response(self):
    """This triggers assertValidErrorResponse by convention."""
    # No token is sent, and a 401 is the expected answer.
    tenants_path = '/v2.0/tenants'
    self.public_request(path=tenants_path, expected_status=401)
420
class JsonTestCase(RestfulTestCase, CoreApiTests):
421
content_type = 'json'
423
def _get_token_id(self, r):
424
"""Applicable only to JSON."""
425
return r.body['access']['token']['id']
427
def assertValidErrorResponse(self, r):
    """Validate a JSON error body and that its code matches the status."""
    self.assertIsNotNone(r.body.get('error'))
    error = r.body['error']
    self.assertValidError(error)
    self.assertEqual(error['code'], r.status)
432
def assertValidExtension(self, extension):
    """Validate the JSON-specific parts of an extension.

    On top of the shared checks, a description and a non-empty list of
    valid links are required.
    """
    super(JsonTestCase, self).assertValidExtension(extension)

    self.assertIsNotNone(extension.get('description'))
    links = extension.get('links')
    self.assertIsNotNone(links)
    self.assertTrue(len(links))
    for link in links:
        self.assertValidExtensionLink(link)
441
def assertValidExtensionListResponse(self, r):
    """Validate a JSON extension list: non-empty, each entry valid."""
    self.assertIsNotNone(r.body.get('extensions'))
    values = r.body['extensions'].get('values')
    self.assertIsNotNone(values)
    self.assertTrue(len(values))
    for extension in values:
        self.assertValidExtension(extension)
448
def assertValidExtensionResponse(self, r):
    """Validate the ``extension`` entry of a JSON response body."""
    extension = r.body.get('extension')
    self.assertValidExtension(extension)
451
def assertValidAuthenticationResponse(self, r):
    """Validate the JSON ``access`` document of an authentication response.

    The token and user are mandatory; the token's tenant and the service
    catalog are only validated when present.
    """
    self.assertIsNotNone(r.body.get('access'))
    access = r.body['access']
    self.assertIsNotNone(access.get('token'))
    self.assertIsNotNone(access.get('user'))

    # validate token
    token = access['token']
    self.assertIsNotNone(token.get('id'))
    self.assertIsNotNone(token.get('expires'))
    tenant = token.get('tenant')
    if tenant is not None:
        # validate tenant
        for field in ('id', 'name'):
            self.assertIsNotNone(tenant.get(field))

    # validate user
    user = access['user']
    for field in ('id', 'name'):
        self.assertIsNotNone(user.get(field))

    # validate service catalog
    catalog = access.get('serviceCatalog')
    if catalog is None:
        return
    self.assertTrue(len(catalog))
    for service in catalog:
        # validate service
        self.assertIsNotNone(service.get('name'))
        self.assertIsNotNone(service.get('type'))

        # services contain at least one endpoint
        self.assertIsNotNone(service.get('endpoints'))
        endpoints = service['endpoints']
        self.assertTrue(len(endpoints))
        for endpoint in endpoints:
            # validate service endpoint
            self.assertIsNotNone(endpoint.get('publicURL'))
484
def assertValidTenantListResponse(self, r):
    """Validate a JSON tenant list: non-empty, each tenant valid.

    Every tenant must also carry a boolean ``enabled`` value.
    """
    tenants = r.body.get('tenants')
    self.assertIsNotNone(tenants)
    self.assertTrue(len(tenants))
    for tenant in tenants:
        self.assertValidTenant(tenant)
        enabled = tenant.get('enabled')
        self.assertIsNotNone(enabled)
        self.assertIn(enabled, [True, False])
492
def assertValidUserResponse(self, r):
    """Validate the ``user`` entry of a JSON response body."""
    self.assertIsNotNone(r.body.get('user'))
    user = r.body['user']
    self.assertValidUser(user)
496
def assertValidTenantResponse(self, r):
    """Validate the ``tenant`` entry of a JSON response body."""
    self.assertIsNotNone(r.body.get('tenant'))
    tenant = r.body['tenant']
    self.assertValidTenant(tenant)
500
def assertValidRoleListResponse(self, r):
    """Validate a JSON role list: non-empty, each role valid."""
    roles = r.body.get('roles')
    self.assertIsNotNone(roles)
    self.assertTrue(len(roles))
    for role in roles:
        self.assertValidRole(role)
506
def assertValidVersion(self, version):
    """Validate the JSON-specific parts of a version document.

    On top of the shared checks, non-empty ``links`` (each with ``rel``
    and ``href``) and non-empty ``media-types`` (each with ``base`` and
    ``type``) are required.
    """
    super(JsonTestCase, self).assertValidVersion(version)

    links = version.get('links')
    self.assertIsNotNone(links)
    self.assertTrue(len(links))
    for link in links:
        for field in ('rel', 'href'):
            self.assertIsNotNone(link.get(field))

    media_types = version.get('media-types')
    self.assertIsNotNone(media_types)
    self.assertTrue(len(media_types))
    for media in media_types:
        for field in ('base', 'type'):
            self.assertIsNotNone(media.get(field))
521
def assertValidMultipleChoiceResponse(self, r):
    """Validate a JSON version listing: non-empty, each version valid."""
    self.assertIsNotNone(r.body.get('versions'))
    values = r.body['versions'].get('values')
    self.assertIsNotNone(values)
    self.assertTrue(len(values))
    for version in values:
        self.assertValidVersion(version)
528
def assertValidVersionResponse(self, r):
    """Validate the ``version`` entry of a JSON response body."""
    version = r.body.get('version')
    self.assertValidVersion(version)
532
class XmlTestCase(RestfulTestCase, CoreApiTests):
533
xmlns = 'http://docs.openstack.org/identity/api/v2.0'
536
def _get_token_id(self, r):
537
return r.body.find(self._tag('token')).get('id')
539
def _tag(self, tag_name, xmlns=None):
540
"""Helper method to build an namespaced element name."""
541
return '{%(ns)s}%(tag)s' % {'ns': xmlns or self.xmlns, 'tag': tag_name}
543
def assertValidErrorResponse(self, r):
    """Validate an XML error document and that its code matches the status."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('error'))

    self.assertValidError(xml)
    # XML attributes are strings, so compare against the textual status.
    self.assertEqual(xml.get('code'), str(r.status))
550
def assertValidExtension(self, extension):
    """Validate the XML-specific parts of an extension element.

    On top of the shared checks, a ``description`` child with text and at
    least one valid ``link`` child are required.
    """
    super(XmlTestCase, self).assertValidExtension(extension)

    description = extension.find(self._tag('description'))
    self.assertIsNotNone(description)
    self.assertTrue(description.text)
    links = extension.findall(self._tag('link'))
    self.assertTrue(len(links))
    for link in links:
        self.assertValidExtensionLink(link)
559
def assertValidExtensionListResponse(self, r):
    """Validate an XML ``extensions`` document: non-empty, each one valid."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('extensions'))

    extensions = xml.findall(self._tag('extension'))
    self.assertTrue(len(extensions))
    for extension in extensions:
        self.assertValidExtension(extension)
567
def assertValidExtensionResponse(self, r):
    """Validate an XML document whose root is a single ``extension``."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('extension'))

    self.assertValidExtension(xml)
573
def assertValidVersion(self, version):
    """Validate the XML-specific parts of a version element.

    On top of the shared checks, at least one ``link`` child (with ``rel``
    and ``href``) and a ``media-types`` child holding at least one
    ``media-type`` (with ``base`` and ``type``) are required.
    """
    super(XmlTestCase, self).assertValidVersion(version)

    links = version.findall(self._tag('link'))
    self.assertTrue(len(links))
    for link in links:
        for attribute in ('rel', 'href'):
            self.assertIsNotNone(link.get(attribute))

    media_types = version.find(self._tag('media-types'))
    self.assertIsNotNone(media_types)
    entries = media_types.findall(self._tag('media-type'))
    self.assertTrue(len(entries))
    for media in entries:
        for attribute in ('base', 'type'):
            self.assertIsNotNone(media.get(attribute))
588
def assertValidMultipleChoiceResponse(self, r):
    """Validate an XML ``versions`` document: non-empty, each one valid."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('versions'))

    versions = xml.findall(self._tag('version'))
    self.assertTrue(len(versions))
    for version in versions:
        self.assertValidVersion(version)
596
def assertValidVersionResponse(self, r):
    """Validate an XML document whose root is a single ``version``."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('version'))

    self.assertValidVersion(xml)
602
def assertValidTokenCatalogResponse(self, r):
    """Validate an XML ``endpoints`` document: non-empty endpoint list."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('endpoints'))

    endpoints = xml.findall(self._tag('endpoint'))
    self.assertTrue(len(endpoints))
    for endpoint in endpoints:
        # NOTE(review): the authentication response check in this class
        # reads 'publicURL' while this one reads 'publicUrl' -- confirm
        # which spelling the XML serialization emits.
        self.assertIsNotNone(endpoint.get('publicUrl'))
610
def assertValidTenantResponse(self, r):
    """Validate an XML document whose root is a single ``tenant``."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('tenant'))

    self.assertValidTenant(xml)
616
def assertValidUserResponse(self, r):
    """Validate an XML document whose root is a single ``user``."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('user'))

    self.assertValidUser(xml)
622
def assertValidRoleListResponse(self, r):
    """Validate an XML ``roles`` document: non-empty, each role valid."""
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('roles'))

    roles = xml.findall(self._tag('role'))
    self.assertTrue(len(roles))
    for role in roles:
        self.assertValidRole(role)
630
def assertValidAuthenticationResponse(self, r):
    """Validate the XML ``access`` document of an authentication response.

    The token and user elements are mandatory; the token's tenant and the
    service catalog are only validated when present.
    """
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('access'))

    # validate token
    token = xml.find(self._tag('token'))
    self.assertIsNotNone(token)
    self.assertIsNotNone(token.get('id'))
    self.assertIsNotNone(token.get('expires'))
    tenant = token.find(self._tag('tenant'))
    if tenant is not None:
        # validate tenant
        self.assertValidTenant(tenant)
        self.assertIn(tenant.get('enabled'), ['true', 'false'])

    # validate user
    user = xml.find(self._tag('user'))
    self.assertIsNotNone(user)
    self.assertIsNotNone(user.get('id'))
    self.assertIsNotNone(user.get('name'))

    # validate service catalog
    serviceCatalog = xml.find(self._tag('serviceCatalog'))
    if serviceCatalog is not None:
        services = serviceCatalog.findall(self._tag('service'))
        self.assertTrue(len(services))
        for service in services:
            # validate service
            self.assertIsNotNone(service.get('name'))
            self.assertIsNotNone(service.get('type'))

            # services contain at least one endpoint
            self.assertTrue(len(service))
            for endpoint in service.findall(self._tag('endpoint')):
                # validate service endpoint
                self.assertIsNotNone(endpoint.get('publicURL'))
664
def assertValidTenantListResponse(self, r):
    """Validate an XML ``tenants`` document: non-empty, each tenant valid.

    Every tenant must also carry an ``enabled`` attribute of ``'true'`` or
    ``'false'``.
    """
    # The decoded XML root lives in r.body; ``xml`` was never bound here.
    xml = r.body
    self.assertEqual(xml.tag, self._tag('tenants'))

    # len() of an element counts its children
    self.assertTrue(len(xml))
    for tenant in xml.findall(self._tag('tenant')):
        self.assertValidTenant(tenant)
        self.assertIn(tenant.get('enabled'), ['true', 'false'])