1
# Copyright (c) 2012 OpenStack, LLC.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
23
from quantum.api.extensions import PluginAwareExtensionManager
24
from quantum.api.v2 import attributes
25
from quantum.api.v2.router import APIRouter
26
from quantum.common import config
27
from quantum.common.test_lib import test_config
28
from quantum import context
29
from quantum.db import api as db
30
from quantum.db import db_base_plugin_v2
31
from quantum.db import securitygroups_db
32
from quantum.extensions import securitygroup as ext_sg
33
from quantum.manager import QuantumManager
34
from quantum.openstack.common import cfg
35
from quantum.tests.unit import test_db_plugin
36
from quantum.tests.unit import test_extensions
37
from quantum.wsgi import JSONDeserializer
39
DB_PLUGIN_KLASS = ('quantum.tests.unit.test_extension_security_group.'
40
'SecurityGroupTestPlugin')
41
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
42
ETCDIR = os.path.join(ROOTDIR, 'etc')
46
return os.path.join(ETCDIR, *p)
49
class SecurityGroupTestExtensionManager(object):
51
def get_resources(self):
52
return ext_sg.Securitygroup.get_resources()
54
def get_actions(self):
57
def get_request_extensions(self):
61
class SecurityGroupsTestCase(test_db_plugin.QuantumDbPluginV2TestCase,
63
def setUp(self, plugin=None):
64
super(SecurityGroupsTestCase, self).setUp()
67
# Make sure at each test a new instance of the plugin is returned
68
QuantumManager._instance = None
69
# Make sure that for each test the extensions for the plugin are loaded
70
PluginAwareExtensionManager._instance = None
71
# Save the attributes map in case the plugin will alter it
73
# Note(salvatore-orlando): shallow copy is not good enough in
74
# this case, but copy.deepcopy does not seem to work, since it
75
# causes test failures
76
self._attribute_map_bk = {}
77
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
78
self._attribute_map_bk[item] = (attributes.
79
RESOURCE_ATTRIBUTE_MAP[item].
81
json_deserializer = JSONDeserializer()
82
self._deserializers = {
83
'application/json': json_deserializer,
87
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
89
# Create the default configurations
90
args = ['--config-file', etcdir('quantum.conf.test')]
91
# If test_config specifies some config-file, use it, as well
92
for config_file in test_config.get('config_files', []):
93
args.extend(['--config-file', config_file])
94
config.parse(args=args)
96
cfg.CONF.set_override('core_plugin', plugin)
97
self.api = APIRouter()
99
def _is_native_bulk_supported():
100
plugin_obj = QuantumManager.get_plugin()
101
native_bulk_attr_name = ("_%s__native_bulk_support"
102
% plugin_obj.__class__.__name__)
103
return getattr(plugin_obj, native_bulk_attr_name, False)
105
self._skip_native_bulk = not _is_native_bulk_supported()
107
QuantumManager.get_plugin().supported_extension_aliases = (
109
ext_mgr = SecurityGroupTestExtensionManager()
111
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
114
super(SecurityGroupsTestCase, self).tearDown()
118
# Restore the original attribute map
119
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
121
def _create_security_group(self, fmt, name, description, external_id=None,
124
data = {'security_group': {'name': name,
125
'tenant_id': kwargs.get('tenant_id',
127
'description': description}}
129
data['security_group']['external_id'] = external_id
130
security_group_req = self.new_create_request('security-groups', data,
132
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
133
# create a specific auth context for this request
134
security_group_req.environ['quantum.context'] = (
135
context.Context('', kwargs['tenant_id']))
136
return security_group_req.get_response(self.ext_api)
138
def _build_security_group_rule(self, security_group_id, direction,
139
protocol, port_range_min, port_range_max,
140
source_ip_prefix=None, source_group_id=None,
141
external_id=None, tenant_id='test_tenant'):
143
data = {'security_group_rule': {'security_group_id': security_group_id,
144
'direction': direction,
145
'protocol': protocol,
146
'port_range_min': port_range_min,
147
'port_range_max': port_range_max,
148
'tenant_id': tenant_id}}
150
data['security_group_rule']['external_id'] = external_id
153
data['security_group_rule']['source_ip_prefix'] = source_ip_prefix
156
data['security_group_rule']['source_group_id'] = source_group_id
def _create_security_group_rule(self, fmt, rules, **kwargs):
    """Issue a create request for security group rules.

    The request is built with ``self.new_create_request`` against the
    'security-group-rules' collection and dispatched to ``self.ext_api``.

    :param fmt: format argument handed to ``self.new_create_request``.
    :param rules: request body handed to ``self.new_create_request``.
    :param kwargs: optional ``set_context`` and ``tenant_id``; when
        ``set_context`` is truthy and ``tenant_id`` is supplied, the
        request environ entry 'quantum.context' is set to
        ``context.Context('', kwargs['tenant_id'])``.
    :returns: whatever ``get_response(self.ext_api)`` returns for the
        request.
    """
    security_group_rule_req = self.new_create_request(
        'security-group-rules', rules, fmt)

    if kwargs.get('set_context') and 'tenant_id' in kwargs:
        # create a specific auth context for this request
        security_group_rule_req.environ['quantum.context'] = (
            context.Context('', kwargs['tenant_id']))
    # The response is returned unconditionally: callers that pass no
    # ``set_context`` still inspect the result (e.g. ``res.status_int``).
    return security_group_rule_req.get_response(self.ext_api)
171
@contextlib.contextmanager
172
def security_group(self, name='webservers', description='webservers',
173
external_id=None, fmt='json', no_delete=False):
174
res = self._create_security_group(fmt, name, description,
176
security_group = self.deserialize(fmt, res)
177
if res.status_int >= 400:
178
raise webob.exc.HTTPClientError(code=res.status_int)
181
self._delete('security-groups',
182
security_group['security_group']['id'])
184
@contextlib.contextmanager
185
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
187
direction='ingress', protocol='tcp',
188
port_range_min='22', port_range_max='22',
189
source_ip_prefix=None, source_group_id=None,
190
external_id=None, fmt='json', no_delete=False):
192
rule = self._build_security_group_rule(security_group_id, direction,
193
protocol, port_range_min,
196
source_group_id, external_id)
197
res = self._create_security_group_rule('json', rule)
198
security_group_rule = self.deserialize(fmt, res)
199
if res.status_int >= 400:
200
raise webob.exc.HTTPClientError(code=res.status_int)
201
yield security_group_rule
203
self._delete('security-group-rules',
204
security_group_rule['security_group_rule']['id'])
207
class SecurityGroupTestPlugin(db_base_plugin_v2.QuantumDbPluginV2,
208
securitygroups_db.SecurityGroupDbMixin):
209
""" Test plugin that implements necessary calls on create/delete port for
210
associating ports with security groups.
213
supported_extension_aliases = ["security-group"]
215
def create_port(self, context, port):
216
tenant_id = self._get_tenant_id_for_create(context, port['port'])
217
default_sg = self._ensure_default_security_group(context, tenant_id)
218
if not port['port'].get(ext_sg.SECURITYGROUP):
219
port['port'][ext_sg.SECURITYGROUP] = [default_sg]
220
self._validate_security_groups_on_port(context, port)
221
session = context.session
222
with session.begin(subtransactions=True):
223
sgids = port['port'].get(ext_sg.SECURITYGROUP)
224
port = super(SecurityGroupTestPlugin, self).create_port(context,
226
self._process_port_create_security_group(context, port['id'],
228
self._extend_port_dict_security_group(context, port)
231
def update_port(self, context, id, port):
232
session = context.session
233
with session.begin(subtransactions=True):
234
self._validate_security_groups_on_port(context, port)
235
# delete the port binding and re-add it with the new rules
236
self._delete_port_security_group_bindings(context, id)
237
self._process_port_create_security_group(context, id,
239
ext_sg.SECURITYGROUP))
240
port = super(SecurityGroupTestPlugin, self).update_port(
242
self._extend_port_dict_security_group(context, port)
def delete_port(self, context, id):
    """Delete a port together with its security group bindings.

    :param context: request context; its ``session`` attribute is used
        to open the transaction.
    :param id: identifier of the port, forwarded unchanged to the parent
        class ``delete_port`` and to
        ``self._delete_port_security_group_bindings``.
    """
    session = context.session
    with session.begin(subtransactions=True):
        super(SecurityGroupTestPlugin, self).delete_port(context, id)
        # NOTE(review): the stored file lost its indentation; this call is
        # assumed to belong inside the transaction block - confirm.
        self._delete_port_security_group_bindings(context, id)
251
def create_network(self, context, network):
252
tenant_id = self._get_tenant_id_for_create(context, network['network'])
253
self._ensure_default_security_group(context, tenant_id)
254
return super(SecurityGroupTestPlugin, self).create_network(context,
class SecurityGroupDBTestCase(SecurityGroupsTestCase):
    """Security group test case pinned to ``DB_PLUGIN_KLASS``.

    ``setUp`` records the plugin class path and a
    ``SecurityGroupTestExtensionManager`` instance in ``test_config``
    before delegating to ``SecurityGroupsTestCase.setUp``.
    """

    def setUp(self, plugin=None):
        """Populate ``test_config`` and run the parent ``setUp``.

        :param plugin: accepted as part of the signature; this method
            does not read it and does not forward it to the parent
            ``setUp``.
        """
        test_config['plugin_name_v2'] = DB_PLUGIN_KLASS
        ext_mgr = SecurityGroupTestExtensionManager()
        test_config['extension_manager'] = ext_mgr
        super(SecurityGroupDBTestCase, self).setUp()
266
class TestSecurityGroups(SecurityGroupDBTestCase):
267
def test_create_security_group(self):
269
description = 'my webservers'
270
keys = [('name', name,), ('description', description)]
271
with self.security_group(name, description) as security_group:
273
self.assertEquals(security_group['security_group'][k], v)
275
def test_create_security_group_external_id(self):
276
cfg.CONF.SECURITYGROUP.proxy_mode = True
278
description = 'my webservers'
280
keys = [('name', name,), ('description', description),
281
('external_id', external_id)]
282
with self.security_group(name, description, external_id) as sg:
284
self.assertEquals(sg['security_group'][k], v)
286
def test_default_security_group(self):
288
res = self.new_list_request('security-groups')
289
groups = self.deserialize('json', res.get_response(self.ext_api))
290
self.assertEquals(len(groups['security_groups']), 1)
292
def test_create_security_group_proxy_mode_not_admin(self):
293
cfg.CONF.SECURITYGROUP.proxy_mode = True
294
res = self._create_security_group('json', 'webservers',
296
tenant_id='bad_tenant',
298
self.deserialize('json', res)
299
self.assertEquals(res.status_int, 500)
301
def test_create_security_group_no_external_id_proxy_mode(self):
302
cfg.CONF.SECURITYGROUP.proxy_mode = True
303
res = self._create_security_group('json', 'webservers',
305
self.deserialize('json', res)
306
self.assertEquals(res.status_int, 400)
308
def test_create_security_group_no_external_id_not_proxy_mode(self):
309
res = self._create_security_group('json', 'webservers',
311
self.deserialize('json', res)
312
self.assertEquals(res.status_int, 409)
314
def test_create_default_security_group_fail(self):
316
description = 'my webservers'
317
res = self._create_security_group('json', name, description)
318
self.deserialize('json', res)
319
self.assertEquals(res.status_int, 409)
321
def test_create_security_group_duplicate_external_id(self):
322
cfg.CONF.SECURITYGROUP.proxy_mode = True
324
description = 'my webservers'
326
with self.security_group(name, description, external_id):
327
res = self._create_security_group('json', name, description,
329
self.deserialize('json', res)
330
self.assertEquals(res.status_int, 409)
332
def test_list_security_groups(self):
334
description = 'my webservers'
335
with self.security_group(name, description):
336
res = self.new_list_request('security-groups')
337
groups = self.deserialize('json', res.get_response(self.ext_api))
338
self.assertEquals(len(groups['security_groups']), 2)
340
def test_get_security_group(self):
342
description = 'my webservers'
343
with self.security_group(name, description) as sg:
344
source_group_id = sg['security_group']['id']
345
res = self.new_show_request('security-groups', source_group_id)
346
group = self.deserialize('json', res.get_response(self.ext_api))
347
self.assertEquals(group['security_group']['id'], source_group_id)
349
def test_delete_security_group(self):
351
description = 'my webservers'
352
with self.security_group(name, description, no_delete=True) as sg:
353
source_group_id = sg['security_group']['id']
354
self._delete('security-groups', source_group_id, 204)
356
def test_delete_default_security_group_fail(self):
358
res = self.new_list_request('security-groups')
359
sg = self.deserialize('json', res.get_response(self.ext_api))
360
self._delete('security-groups', sg['security_groups'][0]['id'],
363
def test_default_security_group_rules(self):
365
res = self.new_list_request('security-groups')
366
groups = self.deserialize('json', res.get_response(self.ext_api))
367
self.assertEquals(len(groups['security_groups']), 1)
368
res = self.new_list_request('security-group-rules')
369
rules = self.deserialize('json', res.get_response(self.ext_api))
370
self.assertEquals(len(rules['security_group_rules']), 2)
371
# just generic rules to allow default egress and
372
# intergroup communication
373
for rule in rules['security_group_rules']:
374
self.assertEquals(rule['port_range_max'], None)
375
self.assertEquals(rule['port_range_min'], None)
376
self.assertEquals(rule['protocol'], None)
378
def test_create_security_group_rule_source_ip_prefix(self):
380
description = 'my webservers'
381
with self.security_group(name, description) as sg:
382
security_group_id = sg['security_group']['id']
383
direction = "ingress"
384
source_ip_prefix = "10.0.0.0/24"
388
keys = [('source_ip_prefix', source_ip_prefix),
389
('security_group_id', security_group_id),
390
('direction', direction),
391
('protocol', protocol),
392
('port_range_min', port_range_min),
393
('port_range_max', port_range_max)]
394
with self.security_group_rule(security_group_id, direction,
395
protocol, port_range_min,
397
source_ip_prefix) as rule:
399
self.assertEquals(rule['security_group_rule'][k], v)
401
def test_create_security_group_rule_group_id(self):
403
description = 'my webservers'
404
with self.security_group(name, description) as sg:
405
with self.security_group(name, description) as sg2:
406
security_group_id = sg['security_group']['id']
407
direction = "ingress"
408
source_group_id = sg2['security_group']['id']
412
keys = [('source_group_id', source_group_id),
413
('security_group_id', security_group_id),
414
('direction', direction),
415
('protocol', protocol),
416
('port_range_min', port_range_min),
417
('port_range_max', port_range_max)]
418
with self.security_group_rule(security_group_id, direction,
419
protocol, port_range_min,
421
source_group_id=source_group_id
424
self.assertEquals(rule['security_group_rule'][k], v)
426
def test_create_security_group_source_group_ip_and_ip_prefix(self):
427
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
428
direction = "ingress"
429
source_ip_prefix = "10.0.0.0/24"
433
source_group_id = "9cd70774-cc67-4a87-9b39-7d1db38eb087"
434
rule = self._build_security_group_rule(security_group_id, direction,
435
protocol, port_range_min,
439
res = self._create_security_group_rule('json', rule)
440
self.deserialize('json', res)
441
self.assertEquals(res.status_int, 400)
443
def test_create_security_group_rule_bad_security_group_id(self):
444
security_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
445
direction = "ingress"
446
source_ip_prefix = "10.0.0.0/24"
450
rule = self._build_security_group_rule(security_group_id, direction,
451
protocol, port_range_min,
454
res = self._create_security_group_rule('json', rule)
455
self.deserialize('json', res)
456
self.assertEquals(res.status_int, 404)
458
def test_create_security_group_rule_bad_tenant(self):
459
with self.security_group() as sg:
460
rule = {'security_group_rule':
461
{'security_group_id': sg['security_group']['id'],
462
'direction': 'ingress',
464
'port_range_min': '22',
465
'port_range_max': '22',
466
'tenant_id': "bad_tenant"}}
468
res = self._create_security_group_rule('json', rule)
469
self.deserialize('json', res)
470
self.assertEquals(res.status_int, 404)
472
def test_create_security_group_rule_exteral_id_proxy_mode(self):
473
cfg.CONF.SECURITYGROUP.proxy_mode = True
474
with self.security_group(external_id=1) as sg:
475
rule = {'security_group_rule':
476
{'security_group_id': sg['security_group']['id'],
477
'direction': 'ingress',
479
'port_range_min': '22',
480
'port_range_max': '22',
482
'tenant_id': 'test_tenant',
483
'source_group_id': sg['security_group']['id']}}
485
res = self._create_security_group_rule('json', rule)
486
self.deserialize('json', res)
487
self.assertEquals(res.status_int, 201)
489
def test_create_security_group_rule_exteral_id_not_proxy_mode(self):
490
with self.security_group() as sg:
491
rule = {'security_group_rule':
492
{'security_group_id': sg['security_group']['id'],
493
'direction': 'ingress',
495
'port_range_min': '22',
496
'port_range_max': '22',
498
'tenant_id': 'test_tenant',
499
'source_group_id': sg['security_group']['id']}}
501
res = self._create_security_group_rule('json', rule)
502
self.deserialize('json', res)
503
self.assertEquals(res.status_int, 409)
505
def test_create_security_group_rule_not_admin(self):
506
cfg.CONF.SECURITYGROUP.proxy_mode = True
507
with self.security_group(external_id='1') as sg:
508
rule = {'security_group_rule':
509
{'security_group_id': sg['security_group']['id'],
510
'direction': 'ingress',
512
'port_range_min': '22',
513
'port_range_max': '22',
514
'tenant_id': 'bad_tenant',
516
'source_group_id': sg['security_group']['id']}}
518
res = self._create_security_group_rule('json', rule,
519
tenant_id='bad_tenant',
521
self.deserialize('json', res)
522
self.assertEquals(res.status_int, 500)
524
def test_create_security_group_rule_bad_tenant_source_group_id(self):
525
with self.security_group() as sg:
526
res = self._create_security_group('json', 'webservers',
528
tenant_id='bad_tenant')
529
sg2 = self.deserialize('json', res)
530
rule = {'security_group_rule':
531
{'security_group_id': sg2['security_group']['id'],
532
'direction': 'ingress',
534
'port_range_min': '22',
535
'port_range_max': '22',
536
'tenant_id': 'bad_tenant',
537
'source_group_id': sg['security_group']['id']}}
539
res = self._create_security_group_rule('json', rule,
540
tenant_id='bad_tenant',
542
self.deserialize('json', res)
543
self.assertEquals(res.status_int, 404)
545
def test_create_security_group_rule_bad_tenant_security_group_rule(self):
546
with self.security_group() as sg:
547
res = self._create_security_group('json', 'webservers',
549
tenant_id='bad_tenant')
550
self.deserialize('json', res)
551
rule = {'security_group_rule':
552
{'security_group_id': sg['security_group']['id'],
553
'direction': 'ingress',
555
'port_range_min': '22',
556
'port_range_max': '22',
557
'tenant_id': 'bad_tenant'}}
559
res = self._create_security_group_rule('json', rule,
560
tenant_id='bad_tenant',
562
self.deserialize('json', res)
563
self.assertEquals(res.status_int, 404)
565
def test_create_security_group_rule_bad_source_group_id(self):
567
description = 'my webservers'
568
with self.security_group(name, description) as sg:
569
security_group_id = sg['security_group']['id']
570
source_group_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
571
direction = "ingress"
575
rule = self._build_security_group_rule(security_group_id, direction,
576
protocol, port_range_min,
578
source_group_id=source_group_id)
579
res = self._create_security_group_rule('json', rule)
580
self.deserialize('json', res)
581
self.assertEquals(res.status_int, 404)
583
def test_create_security_group_rule_duplicate_rules(self):
585
description = 'my webservers'
586
with self.security_group(name, description) as sg:
587
security_group_id = sg['security_group']['id']
588
with self.security_group_rule(security_group_id):
589
rule = self._build_security_group_rule(
590
sg['security_group']['id'], 'ingress', 'tcp', '22', '22')
591
self._create_security_group_rule('json', rule)
592
res = self._create_security_group_rule('json', rule)
593
self.deserialize('json', res)
594
self.assertEquals(res.status_int, 409)
596
def test_create_security_group_rule_min_port_greater_max(self):
598
description = 'my webservers'
599
with self.security_group(name, description) as sg:
600
security_group_id = sg['security_group']['id']
601
with self.security_group_rule(security_group_id):
602
rule = self._build_security_group_rule(
603
sg['security_group']['id'], 'ingress', 'tcp', '50', '22')
604
self._create_security_group_rule('json', rule)
605
res = self._create_security_group_rule('json', rule)
606
self.deserialize('json', res)
607
self.assertEquals(res.status_int, 400)
609
def test_create_security_group_rule_ports_but_no_protocol(self):
611
description = 'my webservers'
612
with self.security_group(name, description) as sg:
613
security_group_id = sg['security_group']['id']
614
with self.security_group_rule(security_group_id):
615
rule = self._build_security_group_rule(
616
sg['security_group']['id'], 'ingress', None, '22', '22')
617
self._create_security_group_rule('json', rule)
618
res = self._create_security_group_rule('json', rule)
619
self.deserialize('json', res)
620
self.assertEquals(res.status_int, 400)
622
def test_update_port_with_security_group(self):
623
with self.network() as n:
625
with self.security_group() as sg:
626
res = self._create_port('json', n['network']['id'])
627
port = self.deserialize('json', res)
629
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
630
'name': port['port']['name'],
631
ext_sg.SECURITYGROUP:
632
[sg['security_group']['id']]}}
634
req = self.new_update_request('ports', data,
636
res = self.deserialize('json', req.get_response(self.api))
637
self.assertEquals(res['port'][ext_sg.SECURITYGROUP][0],
638
sg['security_group']['id'])
639
self._delete('ports', port['port']['id'])
641
def test_update_port_with_multiple_security_groups(self):
642
with self.network() as n:
644
with self.security_group() as sg1:
645
with self.security_group() as sg2:
646
res = self._create_port(
647
'json', n['network']['id'],
648
security_groups=[sg1['security_group']['id'],
649
sg2['security_group']['id']])
650
port = self.deserialize('json', res)
651
self.assertEquals(len(
652
port['port'][ext_sg.SECURITYGROUP]), 2)
653
self._delete('ports', port['port']['id'])
655
def test_update_port_remove_security_group(self):
656
with self.network() as n:
658
with self.security_group() as sg:
659
res = self._create_port('json', n['network']['id'],
661
[sg['security_group']['id']]))
662
port = self.deserialize('json', res)
664
data = {'port': {'fixed_ips': port['port']['fixed_ips'],
665
'name': port['port']['name']}}
667
req = self.new_update_request('ports', data,
669
res = self.deserialize('json', req.get_response(self.api))
670
self.assertEquals(res['port'][ext_sg.SECURITYGROUP], [])
671
self._delete('ports', port['port']['id'])
673
def test_create_port_with_bad_security_group(self):
674
with self.network() as n:
676
res = self._create_port('json', n['network']['id'],
677
security_groups=['bad_id'])
679
self.deserialize('json', res)
680
self.assertEquals(res.status_int, 404)
682
def test_create_delete_security_group_port_in_use(self):
683
with self.network() as n:
685
with self.security_group() as sg:
686
res = self._create_port('json', n['network']['id'],
688
[sg['security_group']['id']]))
689
port = self.deserialize('json', res)
690
self.assertEquals(port['port'][ext_sg.SECURITYGROUP][0],
691
sg['security_group']['id'])
692
# try to delete security group that's in use
693
res = self._delete('security-groups',
694
sg['security_group']['id'], 409)
695
# delete the blocking port
696
self._delete('ports', port['port']['id'])
698
def test_create_security_group_rule_bulk_native(self):
699
if self._skip_native_bulk:
700
self.skipTest("Plugin does not support native bulk "
701
"security_group_rule create")
702
with self.security_group() as sg:
703
rule1 = self._build_security_group_rule(sg['security_group']['id'],
704
'ingress', 'tcp', '22',
706
rule2 = self._build_security_group_rule(sg['security_group']['id'],
707
'ingress', 'tcp', '23',
709
rules = {'security_group_rules': [rule1['security_group_rule'],
710
rule2['security_group_rule']]}
711
res = self._create_security_group_rule('json', rules)
712
self.deserialize('json', res)
713
self.assertEquals(res.status_int, 201)
715
def test_create_security_group_rule_bulk_emulated(self):
716
real_has_attr = hasattr
718
# ensures the API chooses the emulation code path
719
def fakehasattr(item, attr):
720
if attr.endswith('__native_bulk_support'):
722
return real_has_attr(item, attr)
724
with mock.patch('__builtin__.hasattr',
726
with self.security_group() as sg:
727
rule1 = self._build_security_group_rule(
728
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
730
rule2 = self._build_security_group_rule(
731
sg['security_group']['id'], 'ingress', 'tcp', '23', '23',
733
rules = {'security_group_rules': [rule1['security_group_rule'],
734
rule2['security_group_rule']]
736
res = self._create_security_group_rule('json', rules)
737
self.deserialize('json', res)
738
self.assertEquals(res.status_int, 201)
740
def test_create_security_group_rule_duplicate_rule_in_post(self):
741
if self._skip_native_bulk:
742
self.skipTest("Plugin does not support native bulk "
743
"security_group_rule create")
744
with self.security_group() as sg:
745
rule = self._build_security_group_rule(sg['security_group']['id'],
746
'ingress', 'tcp', '22',
748
rules = {'security_group_rules': [rule['security_group_rule'],
749
rule['security_group_rule']]}
750
res = self._create_security_group_rule('json', rules)
751
rule = self.deserialize('json', res)
752
self.assertEquals(res.status_int, 409)
754
def test_create_security_group_rule_duplicate_rule_in_post_emulated(self):
755
real_has_attr = hasattr
757
# ensures the API chooses the emulation code path
758
def fakehasattr(item, attr):
759
if attr.endswith('__native_bulk_support'):
761
return real_has_attr(item, attr)
763
with mock.patch('__builtin__.hasattr',
766
with self.security_group() as sg:
767
rule = self._build_security_group_rule(
768
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
770
rules = {'security_group_rules': [rule['security_group_rule'],
771
rule['security_group_rule']]}
772
res = self._create_security_group_rule('json', rules)
773
rule = self.deserialize('json', res)
774
self.assertEquals(res.status_int, 409)
776
def test_create_security_group_rule_duplicate_rule_db(self):
777
if self._skip_native_bulk:
778
self.skipTest("Plugin does not support native bulk "
779
"security_group_rule create")
780
with self.security_group() as sg:
781
rule = self._build_security_group_rule(sg['security_group']['id'],
782
'ingress', 'tcp', '22',
784
rules = {'security_group_rules': [rule]}
785
self._create_security_group_rule('json', rules)
786
res = self._create_security_group_rule('json', rules)
787
rule = self.deserialize('json', res)
788
self.assertEquals(res.status_int, 409)
790
def test_create_security_group_rule_duplicate_rule_db_emulated(self):
791
real_has_attr = hasattr
793
# ensures the API chooses the emulation code path
794
def fakehasattr(item, attr):
795
if attr.endswith('__native_bulk_support'):
797
return real_has_attr(item, attr)
799
with mock.patch('__builtin__.hasattr',
801
with self.security_group() as sg:
802
rule = self._build_security_group_rule(
803
sg['security_group']['id'], 'ingress', 'tcp', '22', '22',
805
rules = {'security_group_rules': [rule]}
806
self._create_security_group_rule('json', rules)
807
res = self._create_security_group_rule('json', rule)
808
self.deserialize('json', res)
809
self.assertEquals(res.status_int, 409)
811
def test_create_security_group_rule_differnt_security_group_ids(self):
812
if self._skip_native_bulk:
813
self.skipTest("Plugin does not support native bulk "
814
"security_group_rule create")
815
with self.security_group() as sg1:
816
with self.security_group() as sg2:
817
rule1 = self._build_security_group_rule(
818
sg1['security_group']['id'], 'ingress', 'tcp', '22', '22',
820
rule2 = self._build_security_group_rule(
821
sg2['security_group']['id'], 'ingress', 'tcp', '23', '23',
824
rules = {'security_group_rules': [rule1['security_group_rule'],
825
rule2['security_group_rule']]
827
res = self._create_security_group_rule('json', rules)
828
self.deserialize('json', res)
829
self.assertEquals(res.status_int, 400)