1
# Copyright 2012 OpenStack Foundation.
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
19
from oslo_config import cfg
24
from neutron.api import extensions
25
from neutron.api.v2 import attributes
26
from neutron.common import config
27
from neutron.common import constants
28
from neutron.common import exceptions
29
from neutron import context
30
from neutron.db import quota_db
31
from neutron import quota
32
from neutron.tests import base
33
from neutron.tests.unit import test_api_v2
34
from neutron.tests.unit import testlib_api
35
from neutron.tests.unit import testlib_plugin
37
# Dotted path of the core plugin class used by these tests: it is handed to
# setup_coreplugin() and is also the target patched with mock.patch().
TARGET_PLUGIN = 'neutron.plugins.ml2.plugin.Ml2Plugin'
39
# Module-level alias for test_api_v2._get_path; the tests below call it to
# build the path argument given to self.api.get/put/delete.
_get_path = test_api_v2._get_path
42
class QuotaExtensionTestCase(testlib_api.WebTestCase,
43
testlib_plugin.PluginSetupHelper):
46
super(QuotaExtensionTestCase, self).setUp()
47
# Ensure existing ExtensionManager is not used
48
extensions.PluginAwareExtensionManager._instance = None
50
# Save the global RESOURCE_ATTRIBUTE_MAP
51
self.saved_attr_map = {}
52
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
53
self.saved_attr_map[resource] = attrs.copy()
55
# Create the default configurations
58
# Update the plugin and extensions path
59
self.setup_coreplugin(TARGET_PLUGIN)
60
cfg.CONF.set_override(
62
['network', 'subnet', 'port', 'extra1'],
64
quota.QUOTAS = quota.QuotaEngine()
65
quota.register_resources_from_config()
66
self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
67
self.plugin = self._plugin_patcher.start()
68
self.plugin.return_value.supported_extension_aliases = ['quotas']
69
# QUOTAS will register the items in conf when starting
70
# extra1 here is added later, so have to do it manually
71
quota.QUOTAS.register_resource_by_name('extra1')
72
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
73
app = config.load_paste_app('extensions_test_app')
74
ext_middleware = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr)
75
self.api = webtest.TestApp(ext_middleware)
80
# Restore the global RESOURCE_ATTRIBUTE_MAP
81
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
82
super(QuotaExtensionTestCase, self).tearDown()
85
class QuotaExtensionDbTestCase(QuotaExtensionTestCase):
89
cfg.CONF.set_override(
91
'neutron.db.quota_db.DbQuotaDriver',
93
super(QuotaExtensionDbTestCase, self).setUp()
95
def test_quotas_loaded_right(self):
    """List quotas and expect an empty list together with HTTP 200.

    Sends a GET to the 'quotas' collection path (format taken from
    self.fmt), deserializes the reply, and checks the 'quotas' entry
    before checking the status code.
    """
    list_path = _get_path('quotas', fmt=self.fmt)
    response = self.api.get(list_path)
    # 'body' avoids shadowing the imported 'quota' module in this scope.
    body = self.deserialize(response)
    self.assertEqual([], body['quotas'])
    self.assertEqual(200, response.status_int)
101
def test_quotas_default_values(self):
102
tenant_id = 'tenant_id1'
103
env = {'neutron.context': context.Context('', tenant_id)}
104
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
106
quota = self.deserialize(res)
107
self.assertEqual(10, quota['quota']['network'])
108
self.assertEqual(10, quota['quota']['subnet'])
109
self.assertEqual(50, quota['quota']['port'])
110
self.assertEqual(-1, quota['quota']['extra1'])
112
def test_show_quotas_with_admin(self):
113
tenant_id = 'tenant_id1'
114
env = {'neutron.context': context.Context('', tenant_id + '2',
116
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
118
self.assertEqual(200, res.status_int)
119
quota = self.deserialize(res)
120
self.assertEqual(10, quota['quota']['network'])
121
self.assertEqual(10, quota['quota']['subnet'])
122
self.assertEqual(50, quota['quota']['port'])
124
def test_show_quotas_without_admin_forbidden_returns_403(self):
125
tenant_id = 'tenant_id1'
126
env = {'neutron.context': context.Context('', tenant_id + '2',
128
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
129
extra_environ=env, expect_errors=True)
130
self.assertEqual(403, res.status_int)
132
def test_show_quotas_with_owner_tenant(self):
133
tenant_id = 'tenant_id1'
134
env = {'neutron.context': context.Context('', tenant_id,
136
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
138
self.assertEqual(200, res.status_int)
139
quota = self.deserialize(res)
140
self.assertEqual(10, quota['quota']['network'])
141
self.assertEqual(10, quota['quota']['subnet'])
142
self.assertEqual(50, quota['quota']['port'])
144
def test_list_quotas_with_admin(self):
145
tenant_id = 'tenant_id1'
146
env = {'neutron.context': context.Context('', tenant_id,
148
res = self.api.get(_get_path('quotas', fmt=self.fmt),
150
self.assertEqual(200, res.status_int)
151
quota = self.deserialize(res)
152
self.assertEqual([], quota['quotas'])
154
def test_list_quotas_without_admin_forbidden_returns_403(self):
155
tenant_id = 'tenant_id1'
156
env = {'neutron.context': context.Context('', tenant_id,
158
res = self.api.get(_get_path('quotas', fmt=self.fmt),
159
extra_environ=env, expect_errors=True)
160
self.assertEqual(403, res.status_int)
162
def test_update_quotas_without_admin_forbidden_returns_403(self):
163
tenant_id = 'tenant_id1'
164
env = {'neutron.context': context.Context('', tenant_id,
166
quotas = {'quota': {'network': 100}}
167
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
168
self.serialize(quotas), extra_environ=env,
170
self.assertEqual(403, res.status_int)
172
def test_update_quotas_with_non_integer_returns_400(self):
173
tenant_id = 'tenant_id1'
174
env = {'neutron.context': context.Context('', tenant_id,
176
quotas = {'quota': {'network': 'abc'}}
177
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
178
self.serialize(quotas), extra_environ=env,
180
self.assertEqual(400, res.status_int)
182
def test_update_quotas_with_negative_integer_returns_400(self):
183
tenant_id = 'tenant_id1'
184
env = {'neutron.context': context.Context('', tenant_id,
186
quotas = {'quota': {'network': -2}}
187
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
188
self.serialize(quotas), extra_environ=env,
190
self.assertEqual(400, res.status_int)
192
def test_update_quotas_with_out_of_range_integer_returns_400(self):
193
tenant_id = 'tenant_id1'
194
env = {'neutron.context': context.Context('', tenant_id,
196
quotas = {'quota': {'network': constants.DB_INTEGER_MAX_VALUE + 1}}
197
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
198
self.serialize(quotas), extra_environ=env,
200
self.assertEqual(exc.HTTPBadRequest.code, res.status_int)
202
def test_update_quotas_to_unlimited(self):
203
tenant_id = 'tenant_id1'
204
env = {'neutron.context': context.Context('', tenant_id,
206
quotas = {'quota': {'network': -1}}
207
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
208
self.serialize(quotas), extra_environ=env,
210
self.assertEqual(200, res.status_int)
212
def test_update_quotas_exceeding_current_limit(self):
213
tenant_id = 'tenant_id1'
214
env = {'neutron.context': context.Context('', tenant_id,
216
quotas = {'quota': {'network': 120}}
217
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
218
self.serialize(quotas), extra_environ=env,
220
self.assertEqual(200, res.status_int)
222
def test_update_quotas_with_non_support_resource_returns_400(self):
223
tenant_id = 'tenant_id1'
224
env = {'neutron.context': context.Context('', tenant_id,
226
quotas = {'quota': {'abc': 100}}
227
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
228
self.serialize(quotas), extra_environ=env,
230
self.assertEqual(400, res.status_int)
232
def test_update_quotas_with_admin(self):
233
tenant_id = 'tenant_id1'
234
env = {'neutron.context': context.Context('', tenant_id + '2',
236
quotas = {'quota': {'network': 100}}
237
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
238
self.serialize(quotas), extra_environ=env)
239
self.assertEqual(200, res.status_int)
240
env2 = {'neutron.context': context.Context('', tenant_id)}
241
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
243
quota = self.deserialize(res)
244
self.assertEqual(100, quota['quota']['network'])
245
self.assertEqual(10, quota['quota']['subnet'])
246
self.assertEqual(50, quota['quota']['port'])
248
def test_update_attributes(self):
249
tenant_id = 'tenant_id1'
250
env = {'neutron.context': context.Context('', tenant_id + '2',
252
quotas = {'quota': {'extra1': 100}}
253
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
254
self.serialize(quotas), extra_environ=env)
255
self.assertEqual(200, res.status_int)
256
env2 = {'neutron.context': context.Context('', tenant_id)}
257
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
259
quota = self.deserialize(res)
260
self.assertEqual(100, quota['quota']['extra1'])
262
def test_delete_quotas_with_admin(self):
263
tenant_id = 'tenant_id1'
264
env = {'neutron.context': context.Context('', tenant_id + '2',
266
res = self.api.delete(_get_path('quotas', id=tenant_id, fmt=self.fmt),
268
self.assertEqual(204, res.status_int)
270
def test_delete_quotas_without_admin_forbidden_returns_403(self):
271
tenant_id = 'tenant_id1'
272
env = {'neutron.context': context.Context('', tenant_id,
274
res = self.api.delete(_get_path('quotas', id=tenant_id, fmt=self.fmt),
275
extra_environ=env, expect_errors=True)
276
self.assertEqual(403, res.status_int)
278
def test_quotas_loaded_bad_returns_404(self):
280
res = self.api.get(_get_path('quotas'), expect_errors=True)
281
self.assertEqual(404, res.status_int)
285
def test_quotas_limit_check(self):
286
tenant_id = 'tenant_id1'
287
env = {'neutron.context': context.Context('', tenant_id,
289
quotas = {'quota': {'network': 5}}
290
res = self.api.put(_get_path('quotas', id=tenant_id,
292
self.serialize(quotas), extra_environ=env)
293
self.assertEqual(200, res.status_int)
294
quota.QUOTAS.limit_check(context.Context('', tenant_id),
298
def test_quotas_limit_check_with_invalid_quota_value(self):
299
tenant_id = 'tenant_id1'
300
with testtools.ExpectedException(exceptions.InvalidQuotaValue):
301
quota.QUOTAS.limit_check(context.Context('', tenant_id),
305
def test_quotas_limit_check_with_not_registered_resource_fails(self):
306
tenant_id = 'tenant_id1'
307
self.assertRaises(exceptions.QuotaResourceUnknown,
308
quota.QUOTAS.limit_check,
309
context.get_admin_context(load_admin_roles=False),
313
def test_quotas_get_tenant_from_request_context(self):
314
tenant_id = 'tenant_id1'
315
env = {'neutron.context': context.Context('', tenant_id,
317
res = self.api.get(_get_path('quotas/tenant', fmt=self.fmt),
319
self.assertEqual(200, res.status_int)
320
quota = self.deserialize(res)
321
self.assertEqual(quota['tenant']['tenant_id'], tenant_id)
323
def test_quotas_get_tenant_from_empty_request_context_returns_400(self):
324
env = {'neutron.context': context.Context('', '',
326
res = self.api.get(_get_path('quotas/tenant', fmt=self.fmt),
327
extra_environ=env, expect_errors=True)
328
self.assertEqual(400, res.status_int)
331
class QuotaExtensionCfgTestCase(QuotaExtensionTestCase):
335
cfg.CONF.set_override(
337
'neutron.quota.ConfDriver',
339
super(QuotaExtensionCfgTestCase, self).setUp()
341
def test_quotas_default_values(self):
342
tenant_id = 'tenant_id1'
343
env = {'neutron.context': context.Context('', tenant_id)}
344
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
346
quota = self.deserialize(res)
347
self.assertEqual(10, quota['quota']['network'])
348
self.assertEqual(10, quota['quota']['subnet'])
349
self.assertEqual(50, quota['quota']['port'])
350
self.assertEqual(-1, quota['quota']['extra1'])
352
def test_show_quotas_with_admin(self):
353
tenant_id = 'tenant_id1'
354
env = {'neutron.context': context.Context('', tenant_id + '2',
356
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
358
self.assertEqual(200, res.status_int)
360
def test_show_quotas_without_admin_forbidden(self):
361
tenant_id = 'tenant_id1'
362
env = {'neutron.context': context.Context('', tenant_id + '2',
364
res = self.api.get(_get_path('quotas', id=tenant_id, fmt=self.fmt),
365
extra_environ=env, expect_errors=True)
366
self.assertEqual(403, res.status_int)
368
def test_update_quotas_forbidden(self):
369
tenant_id = 'tenant_id1'
370
quotas = {'quota': {'network': 100}}
371
res = self.api.put(_get_path('quotas', id=tenant_id, fmt=self.fmt),
372
self.serialize(quotas),
374
self.assertEqual(403, res.status_int)
376
def test_delete_quotas_forbidden(self):
377
tenant_id = 'tenant_id1'
378
env = {'neutron.context': context.Context('', tenant_id,
380
res = self.api.delete(_get_path('quotas', id=tenant_id, fmt=self.fmt),
381
extra_environ=env, expect_errors=True)
382
self.assertEqual(403, res.status_int)
385
class TestDbQuotaDriver(base.BaseTestCase):
386
"""Test for neutron.db.quota_db.DbQuotaDriver."""
388
def test_get_tenant_quotas_arg(self):
389
"""Call neutron.db.quota_db.DbQuotaDriver._get_quotas."""
391
driver = quota_db.DbQuotaDriver()
392
ctx = context.Context('', 'bar')
394
foo_quotas = {'network': 5}
395
default_quotas = {'network': 10}
396
target_tenant = 'foo'
398
with mock.patch.object(quota_db.DbQuotaDriver,
400
return_value=foo_quotas) as get_tenant_quotas:
402
quotas = driver._get_quotas(ctx,
406
self.assertEqual(quotas, foo_quotas)
407
get_tenant_quotas.assert_called_once_with(ctx,
412
class TestQuotaDriverLoad(base.BaseTestCase):
414
super(TestQuotaDriverLoad, self).setUp()
415
# Make sure QuotaEngine is reinitialized in each test.
416
quota.QUOTAS._driver = None
418
def _test_quota_driver(self, cfg_driver, loaded_driver,
                       with_quota_db_module=True):
    """Check which quota driver class quota.QUOTAS hands back.

    :param cfg_driver: value set for the 'quota_driver' option in the
        'QUOTAS' config group.
    :param loaded_driver: expected class name of the object returned by
        quota.QUOTAS.get_driver().
    :param with_quota_db_module: when false, 'neutron.db.quota_db' is
        removed from sys.modules (if present) before the driver is
        fetched.
    """
    cfg.CONF.set_override('quota_driver', cfg_driver, group='QUOTAS')
    # patch.dict puts sys.modules back to its prior contents on exit, so
    # the removal below does not outlive this with-block.
    with mock.patch.dict(sys.modules, {}):
        if not with_quota_db_module:
            sys.modules.pop('neutron.db.quota_db', None)
        loaded = quota.QUOTAS.get_driver()
        actual_name = loaded.__class__.__name__
        self.assertEqual(loaded_driver, actual_name)
428
def test_quota_db_driver_with_quotas_table(self):
    """Configure the DB quota driver and expect 'DbQuotaDriver' to load.

    Delegates to _test_quota_driver, passing True for its
    with_quota_db_module argument.
    """
    configured_path = 'neutron.db.quota_db.DbQuotaDriver'
    expected_class_name = 'DbQuotaDriver'
    self._test_quota_driver(configured_path, expected_class_name, True)
432
def test_quota_db_driver_fallback_conf_driver(self):
433
self._test_quota_driver('neutron.db.quota_db.DbQuotaDriver',
436
def test_quota_conf_driver(self):
437
self._test_quota_driver('neutron.quota.ConfDriver',