1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
# Copyright (c) 2011 OpenStack, LLC.
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
17
"""Base test case classes for the unit tests"""
26
# Extend the import search path with a 'keystone' directory that is
# located by resolving five '..' components against this file's
# absolute path.
sys.path.append(
    os.path.abspath(
        os.path.join(os.path.abspath(__file__),
                     *(['..'] * 5 + ['keystone']))))
28
import unittest2 as unittest
30
from lxml import etree, objectify
33
from keystone import server
34
import keystone.backends.sqlalchemy as db
35
import keystone.backends.api as db_api
37
# Module-level logger used by the test case helpers in this file.
logger = logging.getLogger('test.unit.base')
40
class ServiceAPITest(unittest.TestCase):
43
Base test case class for any unit test that tests the main service API.
47
The `api` attribute for this base class is the `server.ServiceApi`
50
api_class = server.ServiceApi
53
Set of dicts of tenant attributes we start each test case with
61
Attributes of the user the test creates for each test case that
62
will authenticate against the API. The `auth_user` attribute
63
will contain the created user with the following attributes.
65
auth_user_attrs = {'id': 'auth_user',
66
'password': 'auth_pass',
67
'email': 'auth_user@example.com',
69
'tenant_id': 'tenant1'}
71
Special attribute that is the identifier of the token we use in
72
authenticating. Makes it easy to test the authentication process.
74
auth_token_id = 'SPECIALAUTHTOKEN'
77
Content-type of requests. Generally, you don't need to manually
78
change this. Instead, :see test.unit.decorators
83
Version of the API to test
88
Dict of configuration options to pass to the API controller
91
'backends': "keystone.backends.sqlalchemy",
92
'keystone.backends.sqlalchemy': {
93
'sql_connection': 'sqlite://', # in-memory db
97
"['UserRoleAssociation', 'Endpoints', 'Role', 'Tenant', "
98
"'Tenant', 'User', 'Credentials', 'EndpointTemplates', "
99
"'Token', 'Service']",
101
'keystone-admin-role': 'Admin',
102
'keystone-service-admin-role': 'KeystoneServiceAdmin',
106
self.api = self.api_class(self.options)
109
self.expires = dt.datetime.utcnow() + dt.timedelta(days=1)
110
self.clear_all_data()
112
# Create all our base tenants
113
for tenant in self.tenant_fixtures:
114
self.fixture_create_tenant(**tenant)
116
# Create the user we will authenticate with
117
self.auth_user = self.fixture_create_user(**self.auth_user_attrs)
118
self.auth_token = self.fixture_create_token(
119
id=self.auth_token_id,
120
user_id=self.auth_user['id'],
121
tenant_id=self.auth_user['tenant_id'],
122
expires=self.expires,
125
self.add_verify_status_helpers()
128
self.clear_all_data()
129
setattr(self, 'req', None)
130
setattr(self, 'res', None)
132
def clear_all_data(self):
    """
    Purges the database of all data.

    Unregisters the models of the `keystone.backends.sqlalchemy`
    backend and registers them again, using the
    'keystone.backends.sqlalchemy' section of `self.options`.
    """
    db.unregister_models()
    logger.debug("Cleared all data from database")
    # The backend settings live in the same options dict that is passed
    # to the API class; look the section up explicitly instead of
    # relying on a name that is not bound inside this method.
    backend_options = self.options['keystone.backends.sqlalchemy']
    db.register_models(options=backend_options)
141
def fixture_create_credentials(self, **kwargs):
    """
    Creates a credentials fixture.

    :params **kwargs: Attributes of the credentials to create
    :returns: whatever `db_api.CREDENTIALS.create()` returned
    """
    # Work on a copy so the caller's keyword dict is never handed to
    # the backend directly.
    values = kwargs.copy()
    credentials = db_api.CREDENTIALS.create(values)
    logger.debug("Created credentials fixture %s", credentials['id'])
    return credentials
152
def fixture_create_tenant(self, **kwargs):
    """
    Creates a tenant fixture.

    :params **kwargs: Attributes of the tenant to create; an 'id'
                      entry is required because it is logged
    :returns: whatever `db_api.TENANT.create()` returned
    """
    # Work on a copy so the caller's keyword dict is never handed to
    # the backend directly.
    values = kwargs.copy()
    tenant = db_api.TENANT.create(values)
    logger.debug("Created tenant fixture %s", values['id'])
    # Hand the created record back instead of discarding it, like the
    # other fixture helpers whose results are stored by the caller.
    return tenant
163
def fixture_create_user(self, **kwargs):
165
Creates a user fixture. If the user's tenant ID is set, and the tenant
166
does not exist in the database, the tenant is created.
168
:params **kwargs: Attributes of the user to create
170
values = kwargs.copy()
171
tenant_id = values.get('tenant_id')
173
if not db_api.TENANT.get(tenant_id):
174
db_api.TENANT.create({'id': tenant_id,
177
user = db_api.USER.create(values)
178
logger.debug("Created user fixture %s", values['id'])
181
def fixture_create_token(self, **kwargs):
    """
    Creates a token fixture.

    :params **kwargs: Attributes of the token to create; an 'id'
                      entry is required because it is logged
    :returns: whatever `db_api.TOKEN.create()` returned
    """
    # Work on a copy so the caller's keyword dict is never handed to
    # the backend directly.
    values = kwargs.copy()
    token = db_api.TOKEN.create(values)
    logger.debug("Created token fixture %s", values['id'])
    # The test set-up stores this method's result as `auth_token`, so
    # the created record has to be returned.
    return token
192
def get_request(self, method, url, headers=None):
    """
    Sets the `req` attribute to a `webob.Request` object that
    is constructed with the supplied method and url. Supplied
    headers are added to appropriate Content-type headers.

    When the caller supplies no Content-type header (in any letter
    case), both `content-type` and `accept` are set to
    'application/<self.content_type>'.

    :param method: HTTP method assigned to the request
    :param url: URL passed to `webob.Request.blank()`
    :param headers: optional mapping of header names to values
    :returns: the request object that was stored in `self.req`
    """
    headers = headers or {}
    self.req = webob.Request.blank(url)
    self.req.method = method
    self.req.headers = headers
    # HTTP header names are case-insensitive: a caller passing
    # 'Content-Type' has chosen a content type and must not have it
    # replaced by the default below.
    caller_set_content_type = any(name.lower() == 'content-type'
                                  for name in headers)
    if not caller_set_content_type:
        ct = 'application/%s' % self.content_type
        self.req.headers['content-type'] = ct
        self.req.headers['accept'] = ct
    return self.req
208
def get_response(self):
210
Sets the appropriate headers for the `req` attribute for
211
the current content type, then calls `req.get_response()` and
212
sets the `res` attribute to the returned `webob.Response` object
214
self.res = self.req.get_response(self.api)
215
logger.debug("%s %s returned %s", self.req.method, self.req.path_qs,
217
if self.res.status_int != httplib.OK:
218
logger.debug("Response Body:")
219
for line in self.res.body.split("\n"):
223
def verify_status(self, status_code):
225
Simple convenience wrapper for validating a response's status
228
if not getattr(self, 'res'):
229
raise RuntimeError("Called verify_status() before calling "
232
self.assertEqual(status_code, self.res.status_int,
233
"Incorrect status code %d. Expected %d" %
234
(self.res.status_int, status_code))
236
def add_verify_status_helpers(self):
238
Adds some convenience helpers using partials...
240
self.status_ok = functools.partial(self.verify_status,
242
self.status_not_found = functools.partial(self.verify_status,
244
self.status_unauthorized = functools.partial(self.verify_status,
245
httplib.UNAUTHORIZED)
246
self.status_bad_request = functools.partial(self.verify_status,
249
def assert_dict_equal(self, expected, got):
    """
    Asserts that two dicts are equal. If they differ, the failure
    message shows both dicts pretty-printed for easy comparison.
    """
    rendered = (pprint.pformat(got), pprint.pformat(expected))
    failure_message = ("Mappings are not equal.\n"
                       "Got:\n%s\nExpected:\n%s" % rendered)
    self.assertEqual(expected, got, failure_message)
259
def assert_xml_strings_equal(self, expected, got):
    """
    Asserts that two XML strings are equal after parsing both of
    them into DOMs. If they differ, the failure message shows both
    DOMs pretty-printed for easy comparison.
    """
    # Parsing with objectify.fromstring() rather than
    # etree.fromstring() is deliberate: per the original author, the
    # objectify DOM drops differences in whitespacing.
    got_dom = objectify.fromstring(got)
    expected_dom = objectify.fromstring(expected)
    rendered = (etree.tostring(got_dom, pretty_print=True),
                etree.tostring(expected_dom, pretty_print=True))
    failure_message = ("DOMs are not equal.\n"
                       "Got:\n%s\nExpected:\n%s" % rendered)
    self.assertEqual(etree.tostring(expected_dom),
                     etree.tostring(got_dom),
                     failure_message)
277
class AdminAPITest(ServiceAPITest):
280
Base test case class for any unit test that tests the admin API. The
284
The `api` attribute for this base class is the `server.AdminApi`
287
api_class = server.AdminApi
290
Set of dicts of tenant attributes we start each test case with
301
Attributes of the user the test creates for each test case that
302
will authenticate against the API.
304
auth_user_attrs = {'id': 'admin_user',
305
'password': 'admin_pass',
306
'email': 'admin_user@example.com',
308
'tenant_id': 'tenant2'}